From a3f378eca84a50a54f753529d83b54b65ec20918 Mon Sep 17 00:00:00 2001 From: JaguarJack Date: Wed, 8 Jul 2020 19:47:35 +0800 Subject: [PATCH] =?UTF-8?q?=E5=86=99=E5=85=A5=20worker=20=E7=8A=B6?= =?UTF-8?q?=E6=80=81=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../library/crontab/RegisterSignal.php | 3 +- extend/catcher/library/crontab/Store.php | 74 ++++++++++++++++++- 2 files changed, 71 insertions(+), 6 deletions(-) diff --git a/extend/catcher/library/crontab/RegisterSignal.php b/extend/catcher/library/crontab/RegisterSignal.php index 6346a9b..ff326d1 100644 --- a/extend/catcher/library/crontab/RegisterSignal.php +++ b/extend/catcher/library/crontab/RegisterSignal.php @@ -63,6 +63,7 @@ trait RegisterSignal return function () { while ($res = Process::wait(false)) { if (isset($this->process[$res['pid']])) { + $this->unsetWorkerStatus($res['pid']); unset($this->process[$res['pid']]); } } @@ -97,7 +98,6 @@ trait RegisterSignal { return function () { // $this->storeStatus(); - var_dump(123); foreach ($this->process as $process) { Process::kill($process['pid'], SIGUSR1); } @@ -114,7 +114,6 @@ trait RegisterSignal { return function () { // 使用队列, 会发生主进程往一个不存在的进程发送消息吗? - var_dump('send'); foreach ($this->process as $process) { Process::kill((int)$process['pid'], SIGTERM); } diff --git a/extend/catcher/library/crontab/Store.php b/extend/catcher/library/crontab/Store.php index 3faedf3..395f6e9 100644 --- a/extend/catcher/library/crontab/Store.php +++ b/extend/catcher/library/crontab/Store.php @@ -30,11 +30,77 @@ trait Store * 存储信息 * * @time 2020年07月07日 - * @return false|int + * @param array $status + * @return void */ - public function storeStatus() + public function storeStatus(array $status) { - return file_put_contents($this->getWorkerStatusPath(), $this->getWorkerStatus()); + if (file_exists($this->getWorkerStatusPath())) { + + $workersStatus = $this->getWorkersStatus(); + // ['PID',, 'START_AT', 'STATUS', 'DEAL_TASKS', 'ERRORS', 'running_time', 'memory']; + $pids = array_column($workersStatus, 'pid'); + if (!in_array($status['pid'], $pids)) { + $workersStatus = array_merge($workersStatus, $status); + } else { + foreach ($workersStatus as &$workerStatus) { + if ($workersStatus['pid'] == $status['pid']) { + $workersStatus = $status; + break; + } + } + } + $this->writeStatusToFile($workersStatus); + } else { + file_put_contents($this->getWorkerStatusPath(), \json_encode($status)); + } + } + + /** + * 获取进程间信息 + * + * @time 2020年07月08日 + * @return mixed + */ + protected function getWorkersStatus() + { + return \json_decode(file_get_contents($this->getWorkerStatusPath()), true); + } + + /** + * 清除退出的 worker 信息 + * + * @time 2020年07月08日 + * @param $pid + * @return void + */ + protected function unsetWorkerStatus($pid) + { + $workers = $this->getWorkersStatus(); + + foreach ($workers as $k => $worker) { + if ($worker['pid'] == $pid) { + unset($workers[$k]); + } + } + + $this->writeStatusToFile($workers); + } + + /** + * 写入文件 + * + * @time 2020年07月08日 + * @param $status + * @return void + */ + protected function writeStatusToFile($status) + { + $file = new \SplFileObject($this->getWorkerStatusPath(), 'rw+'); + // 加锁 防止多进程写入混乱 + $file->flock(LOCK_EX); + $file->fwrite(\json_encode($status)); + $file->flock(LOCK_UN); } /** @@ -95,6 +161,6 @@ trait Store mkdir($path, 0777, true); } - return $path . 'worker-status.txt'; + return $path . 'worker-status.php'; } } \ No newline at end of file