287 lines
8.7 KiB
PHP
Raw Normal View History

2020-07-07 16:58:16 +08:00
<?php
// +----------------------------------------------------------------------
// | CatchAdmin [Just Like ]
// +----------------------------------------------------------------------
// | Copyright (c) 2017~2020 http://catchadmin.com All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( https://github.com/yanwenwu/catch-admin/blob/master/LICENSE.txt )
// +----------------------------------------------------------------------
// | Author: JaguarJack [ njphper@gmail.com ]
// +----------------------------------------------------------------------
2020-07-07 16:58:34 +08:00
namespace catcher\library\crontab;
use catcher\CatchAdmin;
use think\console\Table;
2020-07-09 21:03:16 +08:00
use think\facade\Log;
2020-07-07 16:58:34 +08:00
trait Process
{
2020-08-08 20:43:58 +08:00
/**
 * Quit flag. Set to true by the SIGTERM handler or when the memory limit is
 * exceeded; the worker loop exits once it sees the flag.
 *
 * @var bool
 */
protected $quit =false;
/**
 * Maximum memory a worker may use before it quits, in bytes (256M).
 *
 * @var int
 */
protected $maxMemory = 256 * 1024 * 1024;
/**
 * Build the callback that runs as the body of a worker process.
 *
 * The returned closure registers signal handlers and then loops forever:
 * pop a serialized task from the process queue, run it, record the outcome
 * in the shared table, dispatch pending signals, and exit once the quit flag
 * is set or the memory limit is exceeded.
 *
 * @return \Closure closure taking the \Swoole\Process it runs in
 */
protected function createProcessCallback()
{
    return function (\Swoole\Process $process) {
        // Handlers must be registered with pcntl_signal. Per the original author's
        // note, Swoole\Process::signal relies on signalfd and the EventLoop (async IO)
        // and cannot be used in a blocking program: the registered callbacks would
        // never be scheduled. Synchronous blocking code can use pcntl_signal instead.
        $requestQuit = function () {
            // Safe shutdown: only raise the flag, the loop exits after the current task.
            $this->quit = true;
        };
        $refreshStats = function () use ($process) {
            // todo
            $this->updateTask($process->pid);
        };
        pcntl_signal(SIGTERM, $requestQuit);
        pcntl_signal(SIGUSR1, $refreshStats);

        for (;;) {
            $payload = $process->pop();

            if (is_string($payload) && $payload) {
                // NOTE(review): the payload is unserialized as-is; this assumes only
                // trusted producers push to this queue — confirm.
                $task = unserialize($payload);

                $this->beforeTask($process->pid);
                try {
                    $task->run();
                } catch (\Throwable $error) {
                    $this->addErrors($process->pid);
                    $location = $error->getFile() . ' ' . $error->getLine() . '行';
                    Log::error(
                        $error->getMessage() . ': at ' . $location . PHP_EOL . $error->getTraceAsString()
                    );
                }
                $this->afterTask($process->pid);
            }

            pcntl_signal_dispatch();
            sleep(1);

            // Memory limit exceeded: treat it like a quit request.
            if (memory_get_usage() > $this->maxMemory) {
                $this->quit = true;
            }

            // A quit request is honoured only here, after the current task has finished.
            if (!$this->quit) {
                continue;
            }

            Log::info('worker quit');
            $process->exit(0);
        }
    };
}
/**
 * Build the initial status record for a process.
 *
 * @time 2020-07-05
 * @param object $process object exposing a `pid` property
 * @return array record with pid, memory, start_at, running_time, status, deal_tasks and errors
 */
protected function processInfo($process)
{
    $info = ['pid' => $process->pid];

    // Snapshot taken at creation time; counters start at zero.
    $info['memory'] = memory_get_usage();
    $info['start_at'] = time();
    $info['running_time'] = 0;
    $info['status'] = self::WAITING;
    $info['deal_tasks'] = 0;
    $info['errors'] = 0;

    return $info;
}
/**
 * Look for a process that is waiting for a task.
 *
 * Scans the rows of $this->table and returns the first one whose status is
 * self::WAITING AND whose pid has a handle in $this->processes. Rows without
 * a matching handle are skipped, so they no longer hide other idle workers.
 *
 * @time 2020-07-07
 * @return array [true, process handle] when one is found, otherwise [false, null]
 */
protected function hasWaitingProcess()
{
    foreach ($this->table as $process) {
        // Loose comparison kept on purpose: the stored column type may differ
        // from the constant's type.
        if ($process['status'] != self::WAITING) {
            continue;
        }

        $pid = $process['pid'];

        // Only a process we actually hold a handle for can receive a task.
        if (isset($this->processes[$pid])) {
            return [true, $this->processes[$pid]];
        }
    }

    return [false, null];
}
/**
 * Mark a process as busy right before it runs a task.
 *
 * Refreshes the status, running time and memory figures of the process row
 * in the shared table. Does nothing when the row is not found.
 *
 * @time 2020-07-07
 * @param int $pid process id
 * @return void
 */
protected function beforeTask($pid)
{
    $row = $this->table->get($this->getColumnKey($pid));
    if (!$row) {
        return;
    }

    $row['status'] = self::BUSYING;
    $row['running_time'] = time() - $row['start_at'];
    $row['memory'] = memory_get_usage();

    $this->table->set($this->getColumnKey($pid), $row);
}
/**
 * Mark a process as waiting again after a task has finished.
 *
 * Refreshes running time and memory, and increments the handled-task counter
 * of the process row in the shared table. Does nothing when the row is not found.
 *
 * @time 2020-07-07
 * @param int $pid process id
 * @return void
 */
protected function afterTask($pid)
{
    $row = $this->table->get($this->getColumnKey($pid));
    if (!$row) {
        return;
    }

    $row['status'] = self::WAITING;
    $row['running_time'] = time() - $row['start_at'];
    $row['memory'] = memory_get_usage();
    $row['deal_tasks'] = $row['deal_tasks'] + 1;

    $this->table->set($this->getColumnKey($pid), $row);
}
2020-07-09 08:28:29 +08:00
2020-07-09 21:03:16 +08:00
/**
 * Refresh the running time and memory figures of a process row.
 *
 * Status and counters are left untouched. Does nothing when the row is not found.
 *
 * @time 2020-07-09
 * @param int $pid process id
 * @return void
 */
protected function updateTask($pid)
{
    $row = $this->table->get($this->getColumnKey($pid));
    if (!$row) {
        return;
    }

    $row['running_time'] = time() - $row['start_at'];
    $row['memory'] = memory_get_usage();

    $this->table->set($this->getColumnKey($pid), $row);
}
2020-07-09 08:28:29 +08:00
2020-07-09 21:03:16 +08:00
/**
 * Increment the error counter of a process row.
 *
 * Does nothing when the row is not found.
 *
 * @time 2020-07-09
 * @param int $pid process id
 * @return void
 */
protected function addErrors($pid)
{
    $row = $this->table->get($this->getColumnKey($pid));
    if (!$row) {
        return;
    }

    $row['errors'] = $row['errors'] + 1;

    $this->table->set($this->getColumnKey($pid), $row);
}
/**
 * Stop the service: send SIGTERM to the master process.
 *
 * @time 2020-07-07
 * @return void
 */
public function stop()
{
    $masterPid = $this->getMasterPid();

    \Swoole\Process::kill($masterPid, SIGTERM);
}
/**
 * Request a status output: send SIGUSR1 to the master process.
 *
 * @time 2020-07-07
 * @return void
 */
public function status()
{
    $masterPid = $this->getMasterPid();

    \Swoole\Process::kill($masterPid, SIGUSR1);
}
/**
 * Request a restart of the child processes: send SIGUSR2 to the master process.
 *
 * @time 2020-07-07
 * @return void
 */
public function reload()
{
    $masterPid = $this->getMasterPid();

    \Swoole\Process::kill($masterPid, SIGUSR2);
}
/**
 * Render the scheduler banner and the per-process status table as a string.
 *
 * The banner shows the schedule, CatchAdmin and PHP versions, the number of
 * rows in the shared table, the current memory usage, and the master start
 * time and running time. Below it, one table row is rendered per process.
 *
 * Durations are formatted with gmdate() rather than gmstrftime(), which is
 * deprecated since PHP 8.1; the output format (HH:MM:SS, UTC) is the same.
 *
 * @time 2020-07-05
 * @return string
 */
public function renderProcessesStatusToString()
{
    $scheduleV = self::VERSION;
    $adminV = CatchAdmin::VERSION;
    $phpV = PHP_VERSION;
    $processNumber = $this->table->count();
    $memory = (int)(memory_get_usage()/1024/1024). 'M';
    $startAt = date('Y-m-d H:i:s', $this->master_start_at);
    // Elapsed seconds rendered as a clock value; wraps after 24 hours, as before.
    $runtime = gmdate('H:i:s', (int)(time() - $this->master_start_at));

    $info = <<<EOT
-------------------------------------------------------------------------------------------------------
| ____ _ _ _ _ _ ____ _ _ _ |
| / ___|__ _| |_ ___| |__ / \ __| |_ __ ___ (_)_ __ / ___| ___| |__ ___ __| |_ _| | ___ |
| | | / _` | __/ __| '_ \ / _ \ / _` | '_ ` _ \| | '_ \ \___ \ / __| '_ \ / _ \/ _` | | | | |/ _ \ |
| | |__| (_| | || (__| | | |/ ___ \ (_| | | | | | | | | | | ___) | (__| | | | __/ (_| | |_| | | __/ |
| \____\__,_|\__\___|_| |_/_/ \_\__,_|_| |_| |_|_|_| |_| |____/ \___|_| |_|\___|\__,_|\__,_|_|\___| |
| ----------------------------------------- CatchAdmin Schedule ---------------------------------------|
| Schedule Version: $scheduleV CatchAdmin Version: $adminV PHP Version: $phpV |
| Process Number: $processNumber Memory: $memory Start at: $startAt |
| Running Time: $runtime |
|------------------------------------------------------------------------------------------------------|
EOT;

    $table = new Table();
    // NOTE(review): the second argument (2) is presumably the alignment mode — confirm against think\console\Table.
    $table->setHeader([
        'pid', 'memory', 'start_at', 'running_time', 'status', 'deal_tasks','errors'
    ], 2);

    $processes = [];
    foreach ($this->table as $process) {
        $processes[] = [
            $process['pid'],
            (int)($process['memory']/1024/1024) . 'M',
            date('Y-m-d H:i', $process['start_at']),
            gmdate('H:i:s', (int)$process['running_time']),
            $process['status'],
            $process['deal_tasks'],
            $process['errors'],
        ];
    }
    $table->setRows($processes, 2);

    // Render once; the previous extra render() call discarded its result.
    return $info . PHP_EOL . $table->render();
}
}