修复定时任务无法准时启动

This commit is contained in:
JaguarJack 2020-07-08 17:48:45 +08:00
parent 4fec98f939
commit bcc4237140
4 changed files with 75 additions and 30 deletions

View File

@ -106,17 +106,17 @@ class Cron
* @time 2020年07月04日 * @time 2020年07月04日
* @return bool * @return bool
*/ */
protected function can() public function can()
{ {
if ($this->second) { if ($this->second) {
$now = date('s', time()); $now = date('s', time());
return in_array($now, [--$now, $now, ++$now]); return ($now % 5) == 0;
} }
if ($this->expression) { if ($this->expression) {
$cron = CronExpression::factory($this->expression); $cron = CronExpression::factory($this->expression);
return $cron->isDue('now'); return $cron->getNextRunDate(date('Y-m-d H:i:s'), 0 , true)->getTimestamp() == time();
} }
return false; return false;

View File

@ -12,6 +12,7 @@ namespace catcher\library\crontab;
use Swoole\Process; use Swoole\Process;
use catcher\library\crontab\Process as MProcess; use catcher\library\crontab\Process as MProcess;
use Swoole\Timer;
class ManageProcess class ManageProcess
{ {
@ -29,7 +30,7 @@ class ManageProcess
* *
* @var int * @var int
*/ */
protected $staticNum = 2; protected $staticNum = 1;
/** /**
* 存储 process 信息 * 存储 process 信息
@ -78,9 +79,9 @@ class ManageProcess
// 守护进程 // 守护进程
// Process::daemon(true, false); // Process::daemon(true, false);
// alarm 信号 // alarm 信号
Process::alarm(1000 * 1000); // Process::alarm(1000 * 1000);
// 1s 调度一次 // 1s 调度一次
swoole_timer_tick(1000, $this->schedule());//不会继承 $this->timeTick(1000, $this->schedule());
// 注册信号 // 注册信号
$this->registerSignal(); $this->registerSignal();
// 存储 pid // 存储 pid
@ -91,6 +92,24 @@ class ManageProcess
$this->initProcesses(); $this->initProcesses();
} }
/**
* 自定义 tick 关闭协程
*
* @time 2020年07月08日
* @param int $time
* @param $callable
* @return void
*/
protected function timeTick(int $time, $callable)
{
// 关闭协程
Timer::set([
'enable_coroutine' => false,
]);
Timer::tick($time, $callable);
}
/** /**
* 调度 * 调度
* *
@ -101,13 +120,16 @@ class ManageProcess
{ {
return function () { return function () {
$schedule = new Schedule(); $schedule = new Schedule();
$schedule->command('route:list')->everyFiveMinutes(); $schedule->command('catch:cache')->everyThirtySeconds();
foreach ($schedule->getCronTask() as $cron) { foreach ($schedule->getCronTask() as $cron) {
if ($cron->can()) { if ($cron->can()) {
list($waiting, $process) = $this->hasWaitingProcess(); list($waiting, $process) = $this->hasWaitingProcess();
if ($waiting) { if ($waiting) {
// 向 process 投递 cron // 向 process 投递 cron
// var_dump(serialize($cron));
//$process->push(serialize($cron));
var_dump($process->pop());
} else { } else {
// 创建临时 process 处理,处理完自动销毁 // 创建临时 process 处理,处理完自动销毁
$this->createProcess($cron); $this->createProcess($cron);
@ -127,11 +149,12 @@ class ManageProcess
protected function createProcess(Cron $cron) protected function createProcess(Cron $cron)
{ {
$process = new Process(function (Process $process) use($cron) { $process = new Process(function (Process $process) use($cron) {
$cron->run(); echo 'hello world';
//$cron->run();
$process->exit(); $process->exit();
}); });
$process->name(sprintf('worker: ')); // $process->name(sprintf('worker: '));
$process->start(); $process->start();
} }
@ -144,7 +167,12 @@ class ManageProcess
*/ */
protected function createStaticProcess() protected function createStaticProcess()
{ {
return new Process($this->createProcessCallback()); $process = new Process($this->createProcessCallback());
// 使用非阻塞队列
$process->useQueue(1, 2|Process::IPC_NOWAIT);
return $process;
} }
/** /**
@ -164,4 +192,15 @@ class ManageProcess
$this->process[$process->pid] = $this->processInfo($process); $this->process[$process->pid] = $this->processInfo($process);
} }
} }
/**
* 记录日志
*
* @time 2020年07月07日
* @return void
*/
protected function log()
{
fwrite(STDOUT, runtime_path('schedule') . 'error.log');
}
} }

View File

@ -12,6 +12,7 @@ namespace catcher\library\crontab;
use catcher\CatchAdmin; use catcher\CatchAdmin;
use think\console\Table; use think\console\Table;
use think\facade\Log;
trait Process trait Process
{ {
@ -27,30 +28,32 @@ trait Process
$quit = true; $quit = true;
}); });
pcntl_signal(SIGUSR1, function (){
// todo
});
while (true) { while (true) {
//$data = $worker->pop(); //$data = $worker->pop();
$this->beforeTask($process->pid); /**if ($cron = $process->pop()) {
$this->afterTask($process->pid); if (is_string($cron) && $cron) {
var_dump($cron);
//$cron = unserialize($cron);
var_dump($this->process); $this->beforeTask($process->pid);
var_dump(isset($this->process[$process->pid]), $process->pid);
// 处理任务前
// 处理任务中
// 处理任务后
//var_dump(unserialize($data));
// echo "来自主进程的消息:". $worker->pop().',来自管道'.$worker->pipe.',当前的进程id为'.$worker->pid.PHP_EOL;
// $worker->push('hello 主进程'); //不要当做管道使用
// 睡眠一秒 让出 CPU 调度 //$cron->run();
// var_dump(123); $this->afterTask($process->pid);
//$process->push('from process' . $process->pid);
}
}*/
pcntl_signal_dispatch(); pcntl_signal_dispatch();
sleep(5); sleep(1);
// 如果收到安全退出的信号,需要在最后任务处理完成之后退出 // 如果收到安全退出的信号,需要在最后任务处理完成之后退出
if ($quit) { if ($quit) {
var_dump(1000);
$process->exit(0); $process->exit(0);
} }
} }
@ -88,7 +91,7 @@ trait Process
foreach ($this->process as $process) { foreach ($this->process as $process) {
if ($process['status'] == self::WAITING) { if ($process['status'] == self::WAITING) {
$waiting = [true, $process]; $waiting = [true, $process['name']];
break; break;
} }
} }
@ -122,7 +125,6 @@ trait Process
if (isset($this->process[$pid])) { if (isset($this->process[$pid])) {
$this->process[$pid]['status'] = self::WAITING; $this->process[$pid]['status'] = self::WAITING;
$this->process[$pid]['deal_num'] += 1; $this->process[$pid]['deal_num'] += 1;
var_dump($this->process);
} }
} }

View File

@ -82,7 +82,7 @@ trait RegisterSignal
foreach ($this->process as $process) { foreach ($this->process as $process) {
Process::kill($process['pid'], SIGTERM); Process::kill($process['pid'], SIGTERM);
} }
// 退出 master
Process::kill($this->master_pid, SIGKILL); Process::kill($this->master_pid, SIGKILL);
}; };
} }
@ -96,7 +96,11 @@ trait RegisterSignal
protected function workerStatus() protected function workerStatus()
{ {
return function () { return function () {
$this->storeStatus(); // $this->storeStatus();
var_dump(123);
foreach ($this->process as $process) {
Process::kill($process['pid'], SIGUSR1);
}
}; };
} }
@ -109,14 +113,14 @@ trait RegisterSignal
protected function smoothReloadWorkers() protected function smoothReloadWorkers()
{ {
return function () { return function () {
// 使用队列, 会发生主进程往一个不存在的进程发送消息吗?
var_dump('send');
foreach ($this->process as $process) { foreach ($this->process as $process) {
var_dump($process['pid']);
Process::kill((int)$process['pid'], SIGTERM); Process::kill((int)$process['pid'], SIGTERM);
} }
}; };
} }
/** /**
* 管道破裂信号 * 管道破裂信号
* *