diff --git a/extend/catcher/library/crontab/Master.php b/extend/catcher/library/crontab/Master.php index 6374784..f0e5ba9 100644 --- a/extend/catcher/library/crontab/Master.php +++ b/extend/catcher/library/crontab/Master.php @@ -14,9 +14,9 @@ use Swoole\Process; use catcher\library\crontab\Process as MProcess; use Swoole\Timer; -class ManageProcess +class Master { - use RegisterSignal, MProcess, Store; + use RegisterSignal, MProcess, Store, Table; /** * 动态扩展的最大 process 数量 @@ -58,12 +58,14 @@ class ManageProcess */ protected $master_start_at; + + protected $table; /** - * process status 存储文件 + * 日志 * - * @var string + * @var */ - protected $processStatus = 'process-status'; + protected $logHandle; // 版本 const VERSION = '1.0.0'; @@ -82,7 +84,7 @@ class ManageProcess public function start() { // 守护进程 - // Process::daemon(true, false); + //Process::daemon(true, false); // alarm 信号 // Process::alarm(1000 * 1000); // 1s 调度一次 @@ -92,10 +94,11 @@ class ManageProcess // pid $this->master_pid = getmypid(); $this->master_start_at = time(); + // 初始化 + $this->init(); // 存储 pid $this->storeMasterPid($this->master_pid); - // 初始化文件 - $this->initFiles(); + // 初始化进程 $this->initProcesses(); } @@ -135,7 +138,6 @@ class ManageProcess list($waiting, $process) = $this->hasWaitingProcess(); if ($waiting) { // 向 process 投递 cron - var_dump(serialize($cron)); $process->push(serialize($cron)); } else { // 创建临时 process 处理,处理完自动销毁 @@ -189,8 +191,6 @@ class ManageProcess */ protected function initProcesses() { - - for ($i = 0; $i < $this->staticNum; $i++) { $process = $this->createStaticProcess(); @@ -200,10 +200,20 @@ class ManageProcess $this->processes[$process->pid] = $process; - $this->storeStatus($this->processInfo($process)); + $this->addColumn($this->getColumnKey($process->pid), $this->processInfo($process)); } + } - $this->saveProcessStatus(); + /** + * 栏目 KEY + * + * @time 2020年07月09日 + * @param $pid + * @return string + */ + protected function getColumnKey($pid) + { + return 'process_'. $pid; } /** @@ -212,9 +222,37 @@ class ManageProcess * @time 2020年07月09日 * @return void */ - protected function initFiles() + protected function init() { - file_put_contents($this->getProcessStatusPath(), ''); - file_put_contents($this->schedulePath() . 'error.log', ''); + $this->staticNum = config('catch.schedule.static_worker_number'); + + $this->maxNum = config('catch.schedule.max_worker_number'); + + $this->initLog(); + + $this->createTable(); + } + + /** + * 日志初始化 + * + * @time 2020年07月09日 + * @return void + */ + protected function initLog() + { + $log = config('log.channels.file'); + + $log['path'] = config('catch.schedule.store_path'); + + $channels = config('log.channels'); + + $channels['schedule'] = $log; + + + config([ + 'channels' => $channels, + 'default' => 'schedule', + ], 'log'); } } \ No newline at end of file diff --git a/extend/catcher/library/crontab/Process.php b/extend/catcher/library/crontab/Process.php index 0ea652f..3fd9e0c 100644 --- a/extend/catcher/library/crontab/Process.php +++ b/extend/catcher/library/crontab/Process.php @@ -12,6 +12,7 @@ namespace catcher\library\crontab; use catcher\CatchAdmin; use think\console\Table; +use think\facade\Log; trait Process { @@ -29,31 +30,28 @@ trait Process pcntl_signal(SIGUSR1, function () use ($process){ // todo - $this->afterTask($process->pid); + $this->updateTask($process->pid); }); while (true) { - if ($cron = $process->pop()) { - if (is_string($cron) && $cron) { - - $cron = unserialize($cron); - - $this->beforeTask($process->pid); - - try { - $cron->run(); - } catch (\Throwable $e) { - file_put_contents($this->schedulePath() . 'schedule.log', $e . PHP_EOL, FILE_APPEND); -; } - - $this->afterTask($process->pid); + $cron = $process->pop(); + if ($cron && is_string($cron)) { + $cron = unserialize($cron); + $this->beforeTask($process->pid); + try { + $cron->run(); + } catch (\Throwable $e) { + $this->addErrors($process->pid); + Log::error($e->getMessage() . ': at ' . $e->getFile() . ' ' . $e->getLine() . '行'. + PHP_EOL . $e->getTraceAsString()); } + $this->afterTask($process->pid); } pcntl_signal_dispatch(); sleep(1); - // 如果收到安全退出的信号,需要在最后任务处理完成之后退出 if ($quit) { + Log::info('worker quit'); $process->exit(0); } } @@ -71,10 +69,10 @@ trait Process { return [ 'pid' => $process->pid, - 'status' => self::WAITING, + 'memory' => memory_get_usage(), 'start_at' => time(), 'running_time' => 0, - 'memory' => memory_get_usage(), + 'status' => self::WAITING, 'deal_tasks' => 0, 'errors' => 0, ]; @@ -92,11 +90,8 @@ trait Process $pid = 0; - // 获取等待状态的 worker - $processes = $this->getProcessesStatus(); - // $processIds - foreach ($processes as $process) { + foreach ($this->table as $process) { if ($process['status'] == self::WAITING) { $pid = $process['pid']; break; @@ -120,18 +115,12 @@ trait Process */ protected function beforeTask($pid) { - $processes = $this->getProcessesStatus(); - - foreach ($processes as &$process) { - if ($process['pid'] == $pid) { - $process['status'] = self::BUSYING; - $process['running_time'] = time() - $process['start_at']; - $process['memory'] = memory_get_usage(); - break; - } + if ($process = $this->table->get($this->getColumnKey($pid))) { + $process['status'] = self::BUSYING; + $process['running_time'] = time() - $process['start_at']; + $process['memory'] = memory_get_usage(); + $this->table->set($this->getColumnKey($pid), $process); } - - $this->writeStatusToFile($processes); } /** @@ -143,18 +132,44 @@ trait Process */ protected function afterTask($pid) { - $processes = $this->getProcessesStatus(); - - foreach ($processes as &$process) { - if ($process['pid'] == $pid) { - $process['status'] = self::WAITING; - $process['running_time'] = time() - $process['start_at']; - $process['memory'] = memory_get_usage(); - break; - } + if ($process = $this->table->get($this->getColumnKey($pid))) { + $process['status'] = self::WAITING; + $process['running_time'] = time() - $process['start_at']; + $process['memory'] = memory_get_usage(); + $process['deal_tasks'] += 1; + $this->table->set($this->getColumnKey($pid), $process); } + } - $this->writeStatusToFile($processes); + /** + * 更新信息 + * + * @time 2020年07月09日 + * @param $pid + * @return void + */ + protected function updateTask($pid) + { + if ($process = $this->table->get($this->getColumnKey($pid))) { + $process['running_time'] = time() - $process['start_at']; + $process['memory'] = memory_get_usage(); + $this->table->set($this->getColumnKey($pid), $process); + } + } + + /** + * 增加错误 + * + * @time 2020年07月09日 + * @param $pid + * @return void + */ + protected function addErrors($pid) + { + if ($process = $this->table->get($this->getColumnKey($pid))) { + $process['errors'] += 1; + $this->table->set($this->getColumnKey($pid), $process); + } } /** @@ -202,7 +217,7 @@ trait Process $adminV = CatchAdmin::VERSION; $phpV = PHP_VERSION; - $processNumber = count($this->processes); + $processNumber = $this->table->count(); $memory = (int)(memory_get_usage()/1024/1024). 'M'; $startAt = date('Y-m-d H:i:s', $this->master_start_at); $runtime = gmstrftime('%H:%M:%S', time() - $this->master_start_at); @@ -227,7 +242,7 @@ EOT; $processes = []; - foreach ($this->getProcessesStatus() as $process) { + foreach ($this->table as $process) { $processes[] = [ $process['pid'], (int)($process['memory']/1024/1024) . 'M', diff --git a/extend/catcher/library/crontab/RegisterSignal.php b/extend/catcher/library/crontab/RegisterSignal.php index d61ee16..28ccd27 100644 --- a/extend/catcher/library/crontab/RegisterSignal.php +++ b/extend/catcher/library/crontab/RegisterSignal.php @@ -100,6 +100,10 @@ trait RegisterSignal foreach ($this->processes as $pid => $process) { Process::kill($pid, SIGUSR1); } + + usleep(100); + + $this->saveProcessStatus(); }; } diff --git a/extend/catcher/library/crontab/Store.php b/extend/catcher/library/crontab/Store.php index bae7ee4..b34342c 100644 --- a/extend/catcher/library/crontab/Store.php +++ b/extend/catcher/library/crontab/Store.php @@ -26,48 +26,6 @@ trait Store return file_put_contents($path, $pid); } - /** - * 存储信息 - * - * @time 2020年07月07日 - * @param array $status - * @return void - */ - public function storeStatus(array $status) - { - $workersStatus = $this->getProcessesStatus(); - - if (empty($workersStatus)) { - $this->writeStatusToFile([$status]); - } else { - // ['PID',, 'START_AT', 'STATUS', 'DEAL_TASKS', 'ERRORS', 'running_time', 'memory']; - $pids = array_column($workersStatus, 'pid'); - - if (!in_array($status['pid'], $pids)) { - $workersStatus = array_merge($workersStatus, $status); - } else { - foreach ($workersStatus as &$workerStatus) { - if ($workersStatus['pid'] == $status['pid']) { - $workersStatus = $status; - break; - } - } - } - $this->writeStatusToFile($workersStatus); - } - } - - /** - * 获取进程间信息 - * - * @time 2020年07月08日 - * @return mixed - */ - protected function getProcessesStatus() - { - return \json_decode(file_get_contents($this->getProcessStatusPath()), true); - } - /** * 清除退出的 worker 信息 * @@ -77,44 +35,9 @@ trait Store */ protected function unsetWorkerStatus($pid) { - $workers = $this->getProcessesStatus(); - - foreach ($workers as $k => $worker) { - if ($worker['pid'] == $pid) { - unset($workers[$k]); - } - } - - $this->writeStatusToFile($workers); + $this->table->del($this->getColumnKey($pid)); } - /** - * 写入文件 - * - * @time 2020年07月08日 - * @param $status - * @return void - */ - protected function writeStatusToFile($status) - { - $this->writeContentToFile($this->getProcessStatusPath(), \json_encode($status)); - } - - /** - * 写入内容 - * - * @time 2020年07月09日 - * @param $path - * @param $content - * @return void - */ - protected function writeContentToFile($path, $content) - { - $file = new \SplFileObject($path, 'rw+'); - $file->flock(LOCK_EX); - $file->fwrite($content); - $file->flock(LOCK_UN); - } /** * 输出 @@ -125,7 +48,7 @@ trait Store public function output() { // 等待信号输出 - sleep(1); + usleep(500); return $this->getProcessStatusInfo(); } @@ -151,7 +74,7 @@ trait Store */ protected function getMasterPidPath() { - return $this->schedulePath() . 'master.pid'; + return config('catch.schedule.master_pid_file'); } /** @@ -162,7 +85,7 @@ trait Store */ protected function schedulePath() { - $path = runtime_path('schedule' . DIRECTORY_SEPARATOR); + $path = config('catch.schedule.store_path'); if (!is_dir($path)) { mkdir($path, 0777, true); @@ -171,16 +94,6 @@ trait Store return $path; } - /** - * 获取 worker 状态存储地址 - * - * @time 2020年07月07日 - * @return string - */ - protected function getProcessStatusPath() - { - return $this->schedulePath() . 'worker-status.json'; - } /** * 进程状态文件 @@ -212,8 +125,6 @@ trait Store */ protected function getProcessStatusInfo() { - $this->saveProcessStatus(); - return file_get_contents($this->getSaveProcessStatusFile()); } } \ No newline at end of file