afterTask($process->pid);
            });

            while (true) {
                //$data = $worker->pop();
                /**if ($cron = $process->pop()) {
                    if (is_string($cron) && $cron) {
                        var_dump($cron);
                        //$cron = unserialize($cron);
                        $this->beforeTask($process->pid);
                        //$cron->run();
                        $this->afterTask($process->pid);
                        //$process->push('from process' . $process->pid);
                    }
                }*/
                pcntl_signal_dispatch();
                sleep(1);
                // Once a graceful-exit signal has been received, exit only after the last task has finished.
                if ($quit) {
                    $process->exit(0);
                }
            }
        };
    }

    /**
     * Build the initial status record for a worker process.
     *
     * The record starts in the WAITING state with zeroed counters; `start_at`
     * is the current Unix timestamp and `memory` is the current
     * memory_get_usage() reading of the calling process.
     *
     * @time 2020-07-05
     * @param object $process Object exposing a `pid` property (presumably a \Swoole\Process; confirm against the caller).
     * @return array Record with the keys pid, status, start_at, running_time, memory, deal_tasks and errors.
     */
    protected function processInfo($process)
    {
        return [
            'pid' => $process->pid,
            'status' => self::WAITING,
            'start_at' => time(),
            'running_time' => 0,
            'memory' => memory_get_usage(),
            'deal_tasks' => 0,
            'errors' => 0,
        ];
    }

    /**
     * Look for a worker process that is currently waiting for work.
     *
     * Scans the status records returned by getProcessesStatus() for the first
     * entry whose status equals self::WAITING and resolves its pid through
     * $this->processes.
     *
     * @time 2020-07-07
     * @return array Two-element list: [true, <entry of $this->processes>] when a waiting
     *               process is found and registered, otherwise [false, null].
     */
    protected function hasWaitingProcess()
    {
        // null means "no waiting process found". A numeric sentinel such as 0
        // could collide with a real key of $this->processes and yield a false positive.
        $waitingPid = null;

        foreach ($this->getProcessesStatus() as $process) {
            // Loose comparison kept on purpose: the type of the stored status value is not visible here.
            if ($process['status'] == self::WAITING) {
                $waitingPid = $process['pid'];
                break;
            }
        }

        if ($waitingPid === null || !isset($this->processes[$waitingPid])) {
            return [false, null];
        }

        return [true, $this->processes[$waitingPid]];
    }

    /**
     * Hook executed before a task is handled: flags the process as busy.
     *
     * @time 2020-07-07
     * @param int $pid Pid of the process that is about to handle a task.
     * @return void
     */
    protected function beforeTask($pid)
    {
        $this->applyTaskStatus($pid, self::BUSYING);
    }

    /**
     * Hook executed after a task has been handled: flags the process as waiting again.
     *
     * @time 2020-07-07
     * @param int $pid Pid of the process that has finished handling a task.
     * @return void
     */
    protected function afterTask($pid)
    {
        $this->applyTaskStatus($pid, self::WAITING);
    }

    /**
     * Update the status record of one process and persist all records.
     *
     * Sets the given status on the first record whose pid matches, refreshes
     * its `running_time` (seconds since `start_at`) and `memory`
     * (memory_get_usage() of the calling process), then hands the complete
     * list to writeStatusToFile(). The list is written back even when no
     * record matches.
     *
     * @param int   $pid    Pid of the record to update.
     * @param mixed $status New status value (self::BUSYING or self::WAITING).
     * @return void
     */
    private function applyTaskStatus($pid, $status)
    {
        $processes = $this->getProcessesStatus();

        foreach ($processes as &$process) {
            if ($process['pid'] == $pid) {
                $process['status'] = $status;
                $process['running_time'] = time() - $process['start_at'];
                $process['memory'] = memory_get_usage();
                break;
            }
        }
        // Release the by-reference loop variable so no array element stays aliased.
        unset($process);

        $this->writeStatusToFile($processes);
    }

    /**
     * Stop the service by sending SIGTERM to the master process.
     *
     * @time 2020-07-07
     * @return void
*/
    public function stop()
    {
        \Swoole\Process::kill($this->getMasterPid(), SIGTERM);
    }

    /**
     * Status output: sends SIGUSR1 to the master process.
     *
     * NOTE(review): the signal handler that reacts to SIGUSR1 is not visible in
     * this part of the file; presumably it prints the process status table.
     *
     * @time 2020-07-07
     * @return void
     */
    public function status()
    {
        \Swoole\Process::kill($this->getMasterPid(), SIGUSR1);
    }

    /**
     * Restart the child processes: sends SIGUSR2 to the master process.
     *
     * NOTE(review): the signal handler that reacts to SIGUSR2 is not visible in
     * this part of the file; confirm that it performs the reload.
     *
     * @time 2020-07-07
     * @return void
     */
    public function reload()
    {
        \Swoole\Process::kill($this->getMasterPid(), SIGUSR2);
    }

    /**
     * Render the status of all processes as a string.
     *
     * Builds one table row per record returned by getProcessesStatus() and
     * returns $info, a line break and the rendered table.
     *
     * @time 2020-07-05
     * @return string
     */
    public function renderProcessesStatusToString()
    {
        // Version values; presumably interpolated into the $info banner (see the note below).
        $scheduleV = self::VERSION;
        $adminV = CatchAdmin::VERSION;
        $phpV = PHP_VERSION;

        // NOTE(review): the next statement looks truncated in this copy of the file.
        // The heredoc that builds $info and the construction of $table appear to
        // be missing between "<<" and "setHeader". Restore them from the upstream source.
        $info = <<setHeader([
            'pid',
            'memory',
            'start_at',
            'running_time',
            'status',
            'deal_tasks','errors'
        ], 2);

        $processes = [];

        foreach ($this->getProcessesStatus() as $process) {
            // Column order matches the header list above.
            $processes[] = [
                $process['pid'],
                // Divided by 1024 twice and truncated to an integer; presumably bytes to whole megabytes.
                (int)($process['memory']/1024/1024) . 'M',
                date('Y-m-d H:i', $process['start_at']),
                // Formats the running time (seconds) as HH:MM:SS.
                // NOTE(review): gmstrftime() is deprecated as of PHP 8.1; consider gmdate('H:i:s', ...).
                gmstrftime('%H:%M:%S', $process['running_time']),
                $process['status'],
                $process['deal_tasks'],
                $process['errors'],
            ];
        }

        $table->setRows($processes, 2);

        // NOTE(review): render() is called twice and the first result is discarded.
        // Confirm whether the first call is needed.
        $table->render();

        return $info . PHP_EOL . $table->render();
    }
}