daemon) { Process::daemon(); } if ($this->interval) { Process::alarm($this->interval); } $this->init(); // 初始化进程池 $this->initWorkers(); // 设置进程名称 Process::setWorkerName($this->name . ' master'); // 注册信号 $this->registerSignal(); // 写入进程状态 $this->setWorkerStatus($this->name . ' master'); // 信号发送 while (true) { Process::dispatch(); $pid = pcntl_waitpid(-1, $status, WNOHANG); Process::dispatch(); if ($pid > 0) { if (isset($this->workerIds[$pid])) { unset($this->workerIds[$pid]); $this->deleteWorkerStatusFile($pid); $this->worker_start_at = time(); // 如果进程挂掉,正常退出码都是 0,当然这里可以自己定义,看 exit($status) 设置什么 // 真实的 exit code pcntl_wexitstatus 函数获取 // exit code > 0 都是由于异常导致的 $exitCode = pcntl_wexitstatus($status); if (!in_array($exitCode, [255, 250])) { $this->forkStatic(); } } // 如果静态工作进程全部退出,会发生 CPU 空转,所以这里需要 sleep 1 if (!count($this->workerIds)) { // sleep(1); self::exitMasterDo(); exit(0); } } usleep(500 * 1000); } } catch (\Throwable $exception) { // todo echo sprintf('[%s]: ', date('Y-m-d H:i:s')) . $exception->getMessage(); } } /** * 初始化 * @throws Exception */ protected function init() { $this->displayErrors(); $this->start_at = $this->worker_start_at = time(); // 记录 masterID FileSystem::put(self::masterPidStorePath(), posix_getpid()); // 保存信息 $this->saveTaskInfo(); // 初始化进程数量 $this->allWorkersCount = $this->static; // 显示UI $this->display(); // 重定向 $this->dup(); } /** * 初始化进程池 * * @time 2020年07月21日 * @return void */ protected function initWorkers() { $this->redis = $this->getRedisHandle(); for ($i = 0; $i < $this->static; $i++) { $this->forkStatic(); } } /** * fork 进程 * * @time 2020年07月21日 * @return void */ protected function forkDynamic() { $process = new Process(function (Process $process) { $redis = $this->getRedisHandle(); while($crontab = $redis->rpop($this->crontabQueueName)) { $task = $this->getTaskObject(\json_decode($crontab, true)); $task->run(); } $process->exit(); }); $process->start(); $this->workerIds[$process->pid] = true; } /** * 静态进程 * * @time 2020年07月21日 * @return void */ protected function forkStatic() { $process = new Process(function (Process $process) { $process->initMemory(); $name = $this->name . ' worker'; $this->setWorkerStatus($name, $this->dealNum, $this->status); Process::setWorkerName($name); Process::signal(SIGUSR2, function ($signal) use ($name) { $this->setWorkerStatus($name, $this->dealNum, $this->status); }); // 该信号保证进程完成任务后安全退出 Process::signal(SIGTERM, function ($signal) { $this->exitSafely = true; }); while (true) { /************** 任务 *********************/ $this->status = 'busying'; $this->setWorkerStatus($name, $this->dealNum, 'busying'); // 处理定时任务 while ($crontab = $this->redis->rpop($this->crontabQueueName)) { $task = $this->getTaskObject(\json_decode($crontab, true)); $task->run(); if ($task->shouldExit()) { $process->exit(250); } $this->dealNum += 1; } $this->status = 'waiting'; $this->setWorkerStatus($name, $this->dealNum, 'waiting'); /****************处理*********************/ // 暂停一秒 让出CPU调度 sleep(1); // 信号调度 Process::dispatch(); // 是否需要安全退出 || 查看内存是否溢出 if ($this->exitSafely || $process->isMemoryOverflow()) { $process->exit(); //exit(0); } } }); $process->start(); $this->workerIds[$process->pid] = true; } /** * 重定向文件流 * * @time 2020年07月22日 * @return void * @throws Exception */ protected function dup() { if (!$this->daemon) { return; } global $stdout, $stderr; // 关闭标准输入输出 fclose(STDOUT); fclose(STDIN); fclose(STDERR); // 重定向输出&错误 $stdoutPath = self::$stdout ?: self::stdoutPath(); !file_exists($stdoutPath) && touch($stdoutPath); // 等待 100 毫秒 usleep(100 * 1000); $stdout = fopen($stdoutPath, 'a'); $stderr = fopen($stdoutPath, 'a'); return; } /** * 输出 * * @time 2020年07月21日 * @return string */ public function output() { $isShowCtrlC = $this->daemon ? '' : 'Ctrl+c to stop' . "\r\n"; $info = <<output(); return fwrite(STDOUT, $this->renderStatus()); } /** * 获取 redis 句柄 * * @time 2020年09月15日 * @return object|null */ protected function getRedisHandle() { return Cache::store('redis')->handler(); } }