From ea32a4cd3337564715709cb22114e549a662ac4e Mon Sep 17 00:00:00 2001 From: JaguarJack Date: Thu, 9 Jul 2020 08:28:29 +0800 Subject: [PATCH] =?UTF-8?q?worker=E7=8A=B6=E6=80=81=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../catcher/library/crontab/ManageProcess.php | 7 ++- extend/catcher/library/crontab/Process.php | 46 +++++++++++++++---- .../library/crontab/RegisterSignal.php | 14 +++--- extend/catcher/library/crontab/Store.php | 21 +++++---- 4 files changed, 59 insertions(+), 29 deletions(-) diff --git a/extend/catcher/library/crontab/ManageProcess.php b/extend/catcher/library/crontab/ManageProcess.php index 96b72f5..d981b97 100644 --- a/extend/catcher/library/crontab/ManageProcess.php +++ b/extend/catcher/library/crontab/ManageProcess.php @@ -37,7 +37,7 @@ class ManageProcess * * @var array */ - protected $process = []; + protected $processes = []; /** * 主进程ID @@ -187,9 +187,12 @@ class ManageProcess $process = $this->createStaticProcess(); // $worker->name("[$i+1]catch-worker"); + $process->start(); - $this->process[$process->pid] = $this->processInfo($process); + $this->processes[$process->pid] = $process; + + $this->storeStatus($this->processInfo($process)); } } diff --git a/extend/catcher/library/crontab/Process.php b/extend/catcher/library/crontab/Process.php index 8499701..f3dbe7a 100644 --- a/extend/catcher/library/crontab/Process.php +++ b/extend/catcher/library/crontab/Process.php @@ -70,12 +70,13 @@ trait Process protected function processInfo($process) { return [ - 'name' => $process, 'pid' => $process->pid, 'status' => self::WAITING, 'start_at' => time(), - 'deal_num' => 0, - 'error' => 0, + 'running_time' => 0, + 'memory' => memory_get_usage(), + 'deal_tasks' => 0, + 'errors' => 0, ]; } @@ -89,12 +90,20 @@ trait Process { $waiting = [false, null]; - foreach ($this->process as $process) { + $pid = 0; + + // 获取等待状态的 worker + $processes = $this->getProcessesStatus(); + foreach ($processes as $process) { if ($process['status'] == self::WAITING) { - $waiting = [true, $process['name']]; + $pid = $process['pid']; break; } } + // 获取相应的状态 + if (isset($this->processes[$pid])) { + return [true, $this->processes[$pid]]; + } return $waiting; } @@ -108,9 +117,18 @@ trait Process */ protected function beforeTask($pid) { - if (isset($this->process[$pid])) { - $this->process[$pid]['status'] = self::BUSYING; + $processes = $this->getProcessesStatus(); + + foreach ($processes as &$process) { + if ($process['pid'] == $pid) { + $process['status'] = self::BUSYING; + $process['running_time'] = time() - $process['start_at']; + $process['memory'] = memory_get_usage(); + break; + } } + + $this->writeStatusToFile($processes); } /** @@ -122,10 +140,18 @@ trait Process */ protected function afterTask($pid) { - if (isset($this->process[$pid])) { - $this->process[$pid]['status'] = self::WAITING; - $this->process[$pid]['deal_num'] += 1; + $processes = $this->getProcessesStatus(); + + foreach ($processes as &$process) { + if ($process['pid'] == $pid) { + $process['status'] = self::WAITING; + $process['running_time'] = time() - $process['start_at']; + $process['memory'] = memory_get_usage(); + break; + } } + + $this->writeStatusToFile($processes); } /** diff --git a/extend/catcher/library/crontab/RegisterSignal.php b/extend/catcher/library/crontab/RegisterSignal.php index ff326d1..40c5650 100644 --- a/extend/catcher/library/crontab/RegisterSignal.php +++ b/extend/catcher/library/crontab/RegisterSignal.php @@ -62,9 +62,9 @@ trait RegisterSignal { return function () { while ($res = Process::wait(false)) { - if (isset($this->process[$res['pid']])) { + if (isset($this->processes[$res['pid']])) { $this->unsetWorkerStatus($res['pid']); - unset($this->process[$res['pid']]); + unset($this->processes[$res['pid']]); } } }; @@ -80,8 +80,8 @@ trait RegisterSignal { return function () { // 发送停止信号给子进程 等待结束后自动退出 - foreach ($this->process as $process) { - Process::kill($process['pid'], SIGTERM); + foreach ($this->processes as $pid => $process) { + Process::kill($pid, SIGTERM); } // 退出 master Process::kill($this->master_pid, SIGKILL); @@ -98,8 +98,8 @@ trait RegisterSignal { return function () { // $this->storeStatus(); - foreach ($this->process as $process) { - Process::kill($process['pid'], SIGUSR1); + foreach ($this->processes as $pid => $process) { + Process::kill($pid, SIGUSR1); } }; } @@ -114,7 +114,7 @@ trait RegisterSignal { return function () { // 使用队列, 会发生主进程往一个不存在的进程发送消息吗? - foreach ($this->process as $process) { + foreach ($this->processes as $process) { Process::kill((int)$process['pid'], SIGTERM); } }; diff --git a/extend/catcher/library/crontab/Store.php b/extend/catcher/library/crontab/Store.php index 395f6e9..ab1f52d 100644 --- a/extend/catcher/library/crontab/Store.php +++ b/extend/catcher/library/crontab/Store.php @@ -35,11 +35,12 @@ trait Store */ public function storeStatus(array $status) { - if (file_exists($this->getWorkerStatusPath())) { + if (file_exists($this->getProcessStatusPath())) { - $workersStatus = $this->getWorkersStatus(); + $workersStatus = $this->getProcessesStatus(); // ['PID',, 'START_AT', 'STATUS', 'DEAL_TASKS', 'ERRORS', 'running_time', 'memory']; $pids = array_column($workersStatus, 'pid'); + if (!in_array($status['pid'], $pids)) { $workersStatus = array_merge($workersStatus, $status); } else { @@ -52,7 +53,7 @@ trait Store } $this->writeStatusToFile($workersStatus); } else { - file_put_contents($this->getWorkerStatusPath(), \json_encode($status)); + file_put_contents($this->getProcessStatusPath(), \json_encode([$status])); } } @@ -62,9 +63,9 @@ trait Store * @time 2020年07月08日 * @return mixed */ - protected function getWorkersStatus() + protected function getProcessesStatus() { - return \json_decode(file_get_contents($this->getWorkerStatusPath()), true); + return \json_decode(file_get_contents($this->getProcessStatusPath()), true); } /** @@ -76,7 +77,7 @@ trait Store */ protected function unsetWorkerStatus($pid) { - $workers = $this->getWorkersStatus(); + $workers = $this->getProcessesStatus(); foreach ($workers as $k => $worker) { if ($worker['pid'] == $pid) { @@ -96,7 +97,7 @@ trait Store */ protected function writeStatusToFile($status) { - $file = new \SplFileObject($this->getWorkerStatusPath(), 'rw+'); + $file = new \SplFileObject($this->getProcessStatusPath(), 'rw+'); // 加锁 防止多进程写入混乱 $file->flock(LOCK_EX); $file->fwrite(\json_encode($status)); @@ -114,7 +115,7 @@ trait Store // 等待信号输出 sleep(1); - return file_exists($this->getWorkerStatusPath()) ? file_get_contents($this->getWorkerStatusPath()) : ''; + return file_exists($this->getProcessStatusPath()) ? file_get_contents($this->getProcessStatusPath()) : ''; } /** @@ -153,7 +154,7 @@ trait Store * @time 2020年07月07日 * @return string */ - protected function getWorkerStatusPath() + protected function getProcessStatusPath() { $path = runtime_path('schedule' . DIRECTORY_SEPARATOR); @@ -161,6 +162,6 @@ trait Store mkdir($path, 0777, true); } - return $path . 'worker-status.php'; + return $path . 'worker-status.json'; } } \ No newline at end of file