worker状态管理

This commit is contained in:
JaguarJack 2020-07-09 08:28:29 +08:00
parent a3f378eca8
commit ea32a4cd33
4 changed files with 59 additions and 29 deletions

View File

@ -37,7 +37,7 @@ class ManageProcess
* *
* @var array * @var array
*/ */
protected $process = []; protected $processes = [];
/** /**
* 主进程ID * 主进程ID
@ -187,9 +187,12 @@ class ManageProcess
$process = $this->createStaticProcess(); $process = $this->createStaticProcess();
// $worker->name("[$i+1]catch-worker"); // $worker->name("[$i+1]catch-worker");
$process->start(); $process->start();
$this->process[$process->pid] = $this->processInfo($process); $this->processes[$process->pid] = $process;
$this->storeStatus($this->processInfo($process));
} }
} }

View File

@ -70,12 +70,13 @@ trait Process
protected function processInfo($process) protected function processInfo($process)
{ {
return [ return [
'name' => $process,
'pid' => $process->pid, 'pid' => $process->pid,
'status' => self::WAITING, 'status' => self::WAITING,
'start_at' => time(), 'start_at' => time(),
'deal_num' => 0, 'running_time' => 0,
'error' => 0, 'memory' => memory_get_usage(),
'deal_tasks' => 0,
'errors' => 0,
]; ];
} }
@ -89,12 +90,20 @@ trait Process
{ {
$waiting = [false, null]; $waiting = [false, null];
foreach ($this->process as $process) { $pid = 0;
// 获取等待状态的 worker
$processes = $this->getProcessesStatus();
foreach ($processes as $process) {
if ($process['status'] == self::WAITING) { if ($process['status'] == self::WAITING) {
$waiting = [true, $process['name']]; $pid = $process['pid'];
break; break;
} }
} }
// 获取相应的状态
if (isset($this->processes[$pid])) {
return [true, $this->processes[$pid]];
}
return $waiting; return $waiting;
} }
@ -108,9 +117,18 @@ trait Process
*/ */
protected function beforeTask($pid) protected function beforeTask($pid)
{ {
if (isset($this->process[$pid])) { $processes = $this->getProcessesStatus();
$this->process[$pid]['status'] = self::BUSYING;
foreach ($processes as &$process) {
if ($process['pid'] == $pid) {
$process['status'] = self::BUSYING;
$process['running_time'] = time() - $process['start_at'];
$process['memory'] = memory_get_usage();
break;
}
} }
$this->writeStatusToFile($processes);
} }
/** /**
@ -122,10 +140,18 @@ trait Process
*/ */
protected function afterTask($pid) protected function afterTask($pid)
{ {
if (isset($this->process[$pid])) { $processes = $this->getProcessesStatus();
$this->process[$pid]['status'] = self::WAITING;
$this->process[$pid]['deal_num'] += 1; foreach ($processes as &$process) {
if ($process['pid'] == $pid) {
$process['status'] = self::WAITING;
$process['running_time'] = time() - $process['start_at'];
$process['memory'] = memory_get_usage();
break;
}
} }
$this->writeStatusToFile($processes);
} }
/** /**

View File

@ -62,9 +62,9 @@ trait RegisterSignal
{ {
return function () { return function () {
while ($res = Process::wait(false)) { while ($res = Process::wait(false)) {
if (isset($this->process[$res['pid']])) { if (isset($this->processes[$res['pid']])) {
$this->unsetWorkerStatus($res['pid']); $this->unsetWorkerStatus($res['pid']);
unset($this->process[$res['pid']]); unset($this->processes[$res['pid']]);
} }
} }
}; };
@ -80,8 +80,8 @@ trait RegisterSignal
{ {
return function () { return function () {
// 发送停止信号给子进程 等待结束后自动退出 // 发送停止信号给子进程 等待结束后自动退出
foreach ($this->process as $process) { foreach ($this->processes as $pid => $process) {
Process::kill($process['pid'], SIGTERM); Process::kill($pid, SIGTERM);
} }
// 退出 master // 退出 master
Process::kill($this->master_pid, SIGKILL); Process::kill($this->master_pid, SIGKILL);
@ -98,8 +98,8 @@ trait RegisterSignal
{ {
return function () { return function () {
// $this->storeStatus(); // $this->storeStatus();
foreach ($this->process as $process) { foreach ($this->processes as $pid => $process) {
Process::kill($process['pid'], SIGUSR1); Process::kill($pid, SIGUSR1);
} }
}; };
} }
@ -114,7 +114,7 @@ trait RegisterSignal
{ {
return function () { return function () {
// 使用队列, 会发生主进程往一个不存在的进程发送消息吗? // 使用队列, 会发生主进程往一个不存在的进程发送消息吗?
foreach ($this->process as $process) { foreach ($this->processes as $process) {
Process::kill((int)$process['pid'], SIGTERM); Process::kill((int)$process['pid'], SIGTERM);
} }
}; };

View File

@ -35,11 +35,12 @@ trait Store
*/ */
public function storeStatus(array $status) public function storeStatus(array $status)
{ {
if (file_exists($this->getWorkerStatusPath())) { if (file_exists($this->getProcessStatusPath())) {
$workersStatus = $this->getWorkersStatus(); $workersStatus = $this->getProcessesStatus();
// ['PID',, 'START_AT', 'STATUS', 'DEAL_TASKS', 'ERRORS', 'running_time', 'memory']; // ['PID',, 'START_AT', 'STATUS', 'DEAL_TASKS', 'ERRORS', 'running_time', 'memory'];
$pids = array_column($workersStatus, 'pid'); $pids = array_column($workersStatus, 'pid');
if (!in_array($status['pid'], $pids)) { if (!in_array($status['pid'], $pids)) {
$workersStatus = array_merge($workersStatus, $status); $workersStatus = array_merge($workersStatus, $status);
} else { } else {
@ -52,7 +53,7 @@ trait Store
} }
$this->writeStatusToFile($workersStatus); $this->writeStatusToFile($workersStatus);
} else { } else {
file_put_contents($this->getWorkerStatusPath(), \json_encode($status)); file_put_contents($this->getProcessStatusPath(), \json_encode([$status]));
} }
} }
@ -62,9 +63,9 @@ trait Store
* @time 2020年07月08日 * @time 2020年07月08日
* @return mixed * @return mixed
*/ */
protected function getWorkersStatus() protected function getProcessesStatus()
{ {
return \json_decode(file_get_contents($this->getWorkerStatusPath()), true); return \json_decode(file_get_contents($this->getProcessStatusPath()), true);
} }
/** /**
@ -76,7 +77,7 @@ trait Store
*/ */
protected function unsetWorkerStatus($pid) protected function unsetWorkerStatus($pid)
{ {
$workers = $this->getWorkersStatus(); $workers = $this->getProcessesStatus();
foreach ($workers as $k => $worker) { foreach ($workers as $k => $worker) {
if ($worker['pid'] == $pid) { if ($worker['pid'] == $pid) {
@ -96,7 +97,7 @@ trait Store
*/ */
protected function writeStatusToFile($status) protected function writeStatusToFile($status)
{ {
$file = new \SplFileObject($this->getWorkerStatusPath(), 'rw+'); $file = new \SplFileObject($this->getProcessStatusPath(), 'rw+');
// 加锁 防止多进程写入混乱 // 加锁 防止多进程写入混乱
$file->flock(LOCK_EX); $file->flock(LOCK_EX);
$file->fwrite(\json_encode($status)); $file->fwrite(\json_encode($status));
@ -114,7 +115,7 @@ trait Store
// 等待信号输出 // 等待信号输出
sleep(1); sleep(1);
return file_exists($this->getWorkerStatusPath()) ? file_get_contents($this->getWorkerStatusPath()) : ''; return file_exists($this->getProcessStatusPath()) ? file_get_contents($this->getProcessStatusPath()) : '';
} }
/** /**
@ -153,7 +154,7 @@ trait Store
* @time 2020年07月07日 * @time 2020年07月07日
* @return string * @return string
*/ */
protected function getWorkerStatusPath() protected function getProcessStatusPath()
{ {
$path = runtime_path('schedule' . DIRECTORY_SEPARATOR); $path = runtime_path('schedule' . DIRECTORY_SEPARATOR);
@ -161,6 +162,6 @@ trait Store
mkdir($path, 0777, true); mkdir($path, 0777, true);
} }
return $path . 'worker-status.php'; return $path . 'worker-status.json';
} }
} }