timeTick(1000, $this->schedule());

        // Register signals
        $this->registerSignal();

        // Store the master pid
        $this->storeMasterPid(getmypid());

        // Keep the master pid on this instance
        $this->master_pid = getmypid();

        // Initialize the worker processes
        $this->initProcesses();
    }

    /**
     * Register a repeating timer with coroutines disabled.
     *
     * Timer::set() is called with 'enable_coroutine' => false before the
     * tick is registered, so the setting is in place when the callback is
     * scheduled.
     *
     * @time 2020-07-08
     * @param int $time interval handed to Timer::tick()
     *                  (presumably milliseconds, per Swoole's Timer API - confirm)
     * @param callable $callable callback handed to Timer::tick()
     * @return void
     */
    protected function timeTick(int $time, $callable)
    {
        Timer::set([
            'enable_coroutine' => false,
        ]);

        Timer::tick($time, $callable);
    }

    /**
     * Build the callback that is executed on every timer tick.
     *
     * The callback creates a Schedule, registers the 'catch:cache' command
     * and walks over the cron tasks. Each task whose can() returns true is
     * either pushed to a waiting worker's queue or, when no worker is
     * waiting, executed in a freshly created temporary process.
     *
     * @time 2020-07-07
     * @return \Closure
     */
    protected function schedule()
    {
        return function () {
            $schedule = new Schedule();

            $schedule->command('catch:cache')->everyThirtySeconds();

            foreach ($schedule->getCronTask() as $cron) {
                if (!$cron->can()) {
                    continue;
                }

                [$waiting, $process] = $this->hasWaitingProcess();

                if ($waiting) {
                    // Deliver the task to the waiting worker through its queue.
                    // The master must push here, never pop: popping would steal
                    // messages that are meant for the worker.
                    // NOTE(review): assumes the worker callback pops and
                    // unserializes the payload - confirm in createProcessCallback().
                    $process->push(serialize($cron));
                } else {
                    // No worker is waiting: run the task in a temporary process
                    // that exits once the task is done.
                    $this->createProcess($cron);
                }
            }
        };
    }

    /**
     * Run a cron task in a temporary process.
     *
     * The child process runs the task and then exits.
     *
     * @time 2019-08-06
     * @param Cron $cron task to execute in the child process
     * @return void
     */
    protected function createProcess(Cron $cron)
    {
        $process = new Process(function (Process $process) use ($cron) {
            // NOTE(review): assumes Cron exposes run() - confirm against the Cron class.
            $cron->run();

            $process->exit();
        });

        $process->start();
    }

    /**
     * Create a static worker process (not started yet).
     *
     * @time 2020-07-05
     * @return Process
     */
    protected function createStaticProcess()
    {
        $process = new Process($this->createProcessCallback());

        // Use a non-blocking queue: mode 2 combined with IPC_NOWAIT
        $process->useQueue(1, 2 | Process::IPC_NOWAIT);

        return $process;
    }

    /**
     * Start the static workers and register them by pid.
     *
     * Creates $this->staticNum workers; every worker that starts
     * successfully is stored in $this->process under its pid.
     *
     * @time 2020-07-03
     * @return void
     */
    protected function initProcesses()
    {
        for ($i = 0; $i < $this->staticNum; $i++) {
            $process = $this->createStaticProcess();

            // start() yields false when the child could not be created;
            // such a process has no usable pid and must not be registered.
            if ($process->start() === false) {
                continue;
            }

            $this->process[$process->pid] = $this->processInfo($process);
        }
    }

    /**
     * Write the schedule error-log path to STDOUT.
     *
     * NOTE(review): this emits only the string
     * runtime_path('schedule') . 'error.log', not a log message; it looks
     * unfinished - confirm the intended behavior.
     *
     * @time 2020-07-07
     * @return void
     */
    protected function log()
    {
        fwrite(STDOUT, runtime_path('schedule') . 'error.log');
    }
}