298 lines
6.3 KiB
PHP
Raw Normal View History

2020-07-05 16:36:28 +08:00
<?php
// +----------------------------------------------------------------------
// | CatchAdmin [Just Like ]
// +----------------------------------------------------------------------
// | Copyright (c) 2017~2020 http://catchadmin.com All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( https://github.com/yanwenwu/catch-admin/blob/master/LICENSE.txt )
// +----------------------------------------------------------------------
// | Author: JaguarJack [ njphper@gmail.com ]
// +----------------------------------------------------------------------
2020-07-07 16:58:34 +08:00
namespace catcher\library\crontab;
use Swoole\Process;
use catcher\library\crontab\Process as MProcess;
2020-07-08 17:48:45 +08:00
use Swoole\Timer;
2020-07-07 16:58:34 +08:00
2020-07-09 21:03:16 +08:00
/**
 * Master process of the crontab scheduler.
 *
 * On start() it registers a one-second timer, boots `staticNum` long-lived
 * worker processes and records them in the shared table. Every timer tick it
 * instantiates the configured schedule kernel, and each task that reports
 * itself as runnable is either pushed to a worker returned by
 * hasWaitingProcess() or executed in a short-lived temporary process.
 */
class Master
{
    use RegisterSignal, MProcess, Store, Table;

    /**
     * Upper bound used when deciding whether another temporary process may
     * be spawned (see isCanCreateTemporaryProcess()).
     *
     * @var int
     */
    protected $maxNum;

    /**
     * Number of long-lived worker processes created at start-up.
     *
     * @var int
     */
    protected $staticNum;

    /**
     * Number of temporary processes started by createProcess().
     *
     * @var int
     */
    protected $temporaryNum = 0;

    /**
     * Long-lived worker processes, keyed by pid.
     *
     * @var array
     */
    protected $processes = [];

    /**
     * Pid of the master process as returned by getmypid().
     *
     * @var int|false
     */
    protected $master_pid;

    /**
     * Class name of the schedule kernel; a fresh instance is created on
     * every scheduler tick.
     *
     * @var string
     */
    protected $kernel;

    /**
     * Name of the pid file.
     *
     * NOTE(review): the property name looks like a typo of "master"; it is
     * kept as-is because it may be referenced outside this class.
     *
     * @var string
     */
    protected $mater = 'catch-master';

    /**
     * Unix timestamp taken when the master was started.
     *
     * @var int
     */
    protected $master_start_at;

    /**
     * Whether start() should daemonize the process.
     *
     * @var bool
     */
    protected $daemon = false;

    // Version
    const VERSION = '1.0.0';

    // Process state: waiting for work
    const WAITING = 'waiting';

    // Process state: busy
    const BUSYING = 'busying';

    /**
     * Start the master process.
     *
     * @time 2020-07-07
     * @return void
     */
    public function start()
    {
        // Daemonize first so that the pid recorded below is the pid of the
        // process that keeps running.
        if ($this->daemon) {
            // Arguments are (nochdir, noclose) - see the Swoole documentation.
            Process::daemon(true, false);
        }

        // Run the scheduler once per second. The callback reads
        // $this->kernel lazily, each time it fires.
        $this->timeTick(1000, $this->schedule());

        // Register signal handlers
        $this->registerSignal();

        $this->master_pid = getmypid();
        $this->master_start_at = time();

        // Load configuration, logging, status file and table
        $this->init();

        // Persist the master pid
        $this->storeMasterPid($this->master_pid);

        // Boot the long-lived workers
        $this->initProcesses();
    }

    /**
     * Register a repeating timer with coroutine support turned off.
     *
     * @time 2020-07-08
     * @param int $time interval passed to Timer::tick()
     * @param $callable callback invoked on every tick
     * @return void
     */
    protected function timeTick(int $time, $callable)
    {
        // Disable coroutines for timer callbacks
        Timer::set([
            'enable_coroutine' => false,
        ]);

        Timer::tick($time, $callable);
    }

    /**
     * Build the scheduler callback that is run on every timer tick.
     *
     * @time 2020-07-07
     * @return \Closure
     */
    protected function schedule()
    {
        return function () {
            $kernel = new $this->kernel;

            foreach ($kernel->tasks() as $cron) {
                if (!$cron->can()) {
                    continue;
                }

                list($waiting, $process) = $this->hasWaitingProcess();

                if ($waiting) {
                    // Deliver the serialized cron to the waiting process
                    $process->push(serialize($cron));
                } else {
                    // No waiting process: run the cron in a temporary
                    // process that exits when the cron has finished
                    $this->createProcess($cron);
                }
            }
        };
    }

    /**
     * Run a cron in a temporary process, provided the process limit allows it.
     *
     * The temporary-process counter is only increased when the process was
     * actually started; a failed start leaves no child behind, so counting
     * it would permanently shrink the capacity for temporary processes.
     *
     * @time 2019-08-06
     * @param Cron $cron
     * @return void
     */
    protected function createProcess(Cron $cron)
    {
        if (!$this->isCanCreateTemporaryProcess()) {
            return;
        }

        $process = new Process(function (Process $process) use ($cron) {
            $cron->run();

            $process->exit();
        });

        // start() returns false when the child could not be created
        if ($process->start() !== false) {
            $this->temporaryNum += 1;
        }
    }

    /**
     * Whether another temporary process may be created.
     *
     * True while the table row count plus the number of temporary processes
     * is below `maxNum`.
     *
     * @time 2020-07-09
     * @return bool
     */
    protected function isCanCreateTemporaryProcess()
    {
        return ($this->table->count() + $this->temporaryNum) < $this->maxNum;
    }

    /**
     * Create (but do not start) a long-lived worker process.
     *
     * @time 2020-07-05
     * @return Process
     */
    protected function createStaticProcess()
    {
        $process = new Process($this->createProcessCallback());

        // Use a non-blocking message queue
        $process->useQueue(1, 2 | Process::IPC_NOWAIT);

        return $process;
    }

    /**
     * Start the long-lived workers and register each one in the table.
     *
     * A worker whose start() fails is skipped, so that no entry is recorded
     * for a process that does not exist.
     *
     * @time 2020-07-03
     * @return void
     */
    protected function initProcesses()
    {
        for ($i = 0; $i < $this->staticNum; $i++) {
            $process = $this->createStaticProcess();

            // start() returns false when the child could not be created
            if ($process->start() === false) {
                continue;
            }

            $this->processes[$process->pid] = $process;

            $this->addColumn($this->getColumnKey($process->pid), $this->processInfo($process));
        }
    }

    /**
     * Table key for a process.
     *
     * @time 2020-07-09
     * @param $pid
     * @return string
     */
    protected function getColumnKey($pid)
    {
        return 'process_' . $pid;
    }

    /**
     * Load the scheduler configuration and prepare log, status file and table.
     *
     * @time 2020-07-09
     * @return void
     */
    protected function init()
    {
        $this->staticNum = config('catch.schedule.static_worker_number');

        $this->maxNum = config('catch.schedule.max_worker_number');

        $this->initLog();

        // Create or truncate the process status file
        file_put_contents($this->getSaveProcessStatusFile(), '');

        $this->createTable();

        $this->kernel = config('catch.schedule.schedule_kernel');
    }

    /**
     * Set up logging for the scheduler.
     *
     * Copies the `file` log channel, points its path at
     * `catch.schedule.store_path`, registers the copy as the `schedule`
     * channel and makes that channel the default.
     *
     * @time 2020-07-09
     * @return void
     */
    protected function initLog()
    {
        $log = config('log.channels.file');
        $log['path'] = config('catch.schedule.store_path');

        $channels = config('log.channels');
        $channels['schedule'] = $log;

        config([
            'channels' => $channels,
            'default' => 'schedule',
        ], 'log');
    }

    /**
     * Enable daemon mode; start() will then daemonize the process.
     *
     * @time 2020-07-09
     * @return $this
     */
    public function daemon()
    {
        $this->daemon = true;

        return $this;
    }
}