From bcc423714036f75931be6145cfee5bbb35ad1b10 Mon Sep 17 00:00:00 2001 From: JaguarJack Date: Wed, 8 Jul 2020 17:48:45 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=AE=9A=E6=97=B6=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E6=97=A0=E6=B3=95=E5=87=86=E6=97=B6=E5=90=AF=E5=8A=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- extend/catcher/library/crontab/Cron.php | 6 +-- .../catcher/library/crontab/ManageProcess.php | 53 ++++++++++++++++--- extend/catcher/library/crontab/Process.php | 34 ++++++------ .../library/crontab/RegisterSignal.php | 12 +++-- 4 files changed, 75 insertions(+), 30 deletions(-) diff --git a/extend/catcher/library/crontab/Cron.php b/extend/catcher/library/crontab/Cron.php index 2613fd4..64d009f 100644 --- a/extend/catcher/library/crontab/Cron.php +++ b/extend/catcher/library/crontab/Cron.php @@ -106,17 +106,17 @@ class Cron * @time 2020年07月04日 * @return bool */ - protected function can() + public function can() { if ($this->second) { $now = date('s', time()); - return in_array($now, [--$now, $now, ++$now]); + return ($now % 5) == 0; } if ($this->expression) { $cron = CronExpression::factory($this->expression); - return $cron->isDue('now'); + return $cron->getNextRunDate(date('Y-m-d H:i:s'), 0 , true)->getTimestamp() == time(); } return false; diff --git a/extend/catcher/library/crontab/ManageProcess.php b/extend/catcher/library/crontab/ManageProcess.php index 737067c..96b72f5 100644 --- a/extend/catcher/library/crontab/ManageProcess.php +++ b/extend/catcher/library/crontab/ManageProcess.php @@ -12,6 +12,7 @@ namespace catcher\library\crontab; use Swoole\Process; use catcher\library\crontab\Process as MProcess; +use Swoole\Timer; class ManageProcess { @@ -29,7 +30,7 @@ class ManageProcess * * @var int */ - protected $staticNum = 2; + protected $staticNum = 1; /** * 存储 process 信息 @@ -78,9 +79,9 @@ class ManageProcess // 守护进程 // Process::daemon(true, false); // alarm 信号 - Process::alarm(1000 * 1000); + // Process::alarm(1000 * 1000); // 1s 调度一次 - swoole_timer_tick(1000, $this->schedule());//不会继承 + $this->timeTick(1000, $this->schedule()); // 注册信号 $this->registerSignal(); // 存储 pid @@ -91,6 +92,24 @@ class ManageProcess $this->initProcesses(); } + /** + * 自定义 tick 关闭协程 + * + * @time 2020年07月08日 + * @param int $time + * @param $callable + * @return void + */ + protected function timeTick(int $time, $callable) + { + // 关闭协程 + Timer::set([ + 'enable_coroutine' => false, + ]); + + Timer::tick($time, $callable); + } + /** * 调度 * @@ -101,13 +120,16 @@ class ManageProcess { return function () { $schedule = new Schedule(); - $schedule->command('route:list')->everyFiveMinutes(); + $schedule->command('catch:cache')->everyThirtySeconds(); foreach ($schedule->getCronTask() as $cron) { if ($cron->can()) { list($waiting, $process) = $this->hasWaitingProcess(); if ($waiting) { // 向 process 投递 cron + // var_dump(serialize($cron)); + //$process->push(serialize($cron)); + var_dump($process->pop()); } else { // 创建临时 process 处理,处理完自动销毁 $this->createProcess($cron); @@ -127,11 +149,12 @@ class ManageProcess protected function createProcess(Cron $cron) { $process = new Process(function (Process $process) use($cron) { - $cron->run(); + echo 'hello world'; + //$cron->run(); $process->exit(); }); - $process->name(sprintf('worker: ')); + // $process->name(sprintf('worker: ')); $process->start(); } @@ -144,7 +167,12 @@ class ManageProcess */ protected function createStaticProcess() { - return new Process($this->createProcessCallback()); + $process = new Process($this->createProcessCallback()); + + // 使用非阻塞队列 + $process->useQueue(1, 2|Process::IPC_NOWAIT); + + return $process; } /** @@ -164,4 +192,15 @@ class ManageProcess $this->process[$process->pid] = $this->processInfo($process); } } + + /** + * 记录日志 + * + * @time 2020年07月07日 + * @return void + */ + protected function log() + { + fwrite(STDOUT, runtime_path('schedule') . 'error.log'); + } } \ No newline at end of file diff --git a/extend/catcher/library/crontab/Process.php b/extend/catcher/library/crontab/Process.php index 9612674..8499701 100644 --- a/extend/catcher/library/crontab/Process.php +++ b/extend/catcher/library/crontab/Process.php @@ -12,6 +12,7 @@ namespace catcher\library\crontab; use catcher\CatchAdmin; use think\console\Table; +use think\facade\Log; trait Process { @@ -27,30 +28,32 @@ trait Process $quit = true; }); + pcntl_signal(SIGUSR1, function (){ + // todo + }); + while (true) { //$data = $worker->pop(); - $this->beforeTask($process->pid); - $this->afterTask($process->pid); + /**if ($cron = $process->pop()) { + if (is_string($cron) && $cron) { + var_dump($cron); + //$cron = unserialize($cron); - var_dump($this->process); - var_dump(isset($this->process[$process->pid]), $process->pid); - // 处理任务前 - // 处理任务中 - // 处理任务后 - //var_dump(unserialize($data)); - // echo "来自主进程的消息:". $worker->pop().',来自管道'.$worker->pipe.',当前的进程id为'.$worker->pid.PHP_EOL; - // $worker->push('hello 主进程'); //不要当做管道使用 + $this->beforeTask($process->pid); - // 睡眠一秒 让出 CPU 调度 + //$cron->run(); - // var_dump(123); + $this->afterTask($process->pid); + + //$process->push('from process' . $process->pid); + } + }*/ pcntl_signal_dispatch(); - sleep(5); + sleep(1); // 如果收到安全退出的信号,需要在最后任务处理完成之后退出 if ($quit) { - var_dump(1000); $process->exit(0); } } @@ -88,7 +91,7 @@ trait Process foreach ($this->process as $process) { if ($process['status'] == self::WAITING) { - $waiting = [true, $process]; + $waiting = [true, $process['name']]; break; } } @@ -122,7 +125,6 @@ trait Process if (isset($this->process[$pid])) { $this->process[$pid]['status'] = self::WAITING; $this->process[$pid]['deal_num'] += 1; - var_dump($this->process); } } diff --git a/extend/catcher/library/crontab/RegisterSignal.php b/extend/catcher/library/crontab/RegisterSignal.php index 819321c..6346a9b 100644 --- a/extend/catcher/library/crontab/RegisterSignal.php +++ b/extend/catcher/library/crontab/RegisterSignal.php @@ -82,7 +82,7 @@ trait RegisterSignal foreach ($this->process as $process) { Process::kill($process['pid'], SIGTERM); } - + // 退出 master Process::kill($this->master_pid, SIGKILL); }; } @@ -96,7 +96,11 @@ trait RegisterSignal protected function workerStatus() { return function () { - $this->storeStatus(); + // $this->storeStatus(); + var_dump(123); + foreach ($this->process as $process) { + Process::kill($process['pid'], SIGUSR1); + } }; } @@ -109,14 +113,14 @@ trait RegisterSignal protected function smoothReloadWorkers() { return function () { + // 使用队列, 会发生主进程往一个不存在的进程发送消息吗? + var_dump('send'); foreach ($this->process as $process) { - var_dump($process['pid']); Process::kill((int)$process['pid'], SIGTERM); } }; } - /** * 管道破裂信号 *