worker状态切换到table内存操作

This commit is contained in:
JaguarJack 2020-07-09 21:03:16 +08:00
parent 40ed25816c
commit 9335556197
4 changed files with 122 additions and 154 deletions

View File

@ -14,9 +14,9 @@ use Swoole\Process;
use catcher\library\crontab\Process as MProcess; use catcher\library\crontab\Process as MProcess;
use Swoole\Timer; use Swoole\Timer;
class ManageProcess class Master
{ {
use RegisterSignal, MProcess, Store; use RegisterSignal, MProcess, Store, Table;
/** /**
* 动态扩展的最大 process 数量 * 动态扩展的最大 process 数量
@ -58,12 +58,14 @@ class ManageProcess
*/ */
protected $master_start_at; protected $master_start_at;
protected $table;
/** /**
* process status 存储文件 * 日志
* *
* @var string * @var
*/ */
protected $processStatus = 'process-status'; protected $logHandle;
// 版本 // 版本
const VERSION = '1.0.0'; const VERSION = '1.0.0';
@ -82,7 +84,7 @@ class ManageProcess
public function start() public function start()
{ {
// 守护进程 // 守护进程
// Process::daemon(true, false); //Process::daemon(true, false);
// alarm 信号 // alarm 信号
// Process::alarm(1000 * 1000); // Process::alarm(1000 * 1000);
// 1s 调度一次 // 1s 调度一次
@ -92,10 +94,11 @@ class ManageProcess
// pid // pid
$this->master_pid = getmypid(); $this->master_pid = getmypid();
$this->master_start_at = time(); $this->master_start_at = time();
// 初始化
$this->init();
// 存储 pid // 存储 pid
$this->storeMasterPid($this->master_pid); $this->storeMasterPid($this->master_pid);
// 初始化文件
$this->initFiles();
// 初始化进程 // 初始化进程
$this->initProcesses(); $this->initProcesses();
} }
@ -135,7 +138,6 @@ class ManageProcess
list($waiting, $process) = $this->hasWaitingProcess(); list($waiting, $process) = $this->hasWaitingProcess();
if ($waiting) { if ($waiting) {
// 向 process 投递 cron // 向 process 投递 cron
var_dump(serialize($cron));
$process->push(serialize($cron)); $process->push(serialize($cron));
} else { } else {
// 创建临时 process 处理,处理完自动销毁 // 创建临时 process 处理,处理完自动销毁
@ -189,8 +191,6 @@ class ManageProcess
*/ */
protected function initProcesses() protected function initProcesses()
{ {
for ($i = 0; $i < $this->staticNum; $i++) { for ($i = 0; $i < $this->staticNum; $i++) {
$process = $this->createStaticProcess(); $process = $this->createStaticProcess();
@ -200,10 +200,20 @@ class ManageProcess
$this->processes[$process->pid] = $process; $this->processes[$process->pid] = $process;
$this->storeStatus($this->processInfo($process)); $this->addColumn($this->getColumnKey($process->pid), $this->processInfo($process));
} }
}
$this->saveProcessStatus(); /**
* 栏目 KEY
*
* @time 2020年07月09日
* @param $pid
* @return string
*/
protected function getColumnKey($pid)
{
return 'process_'. $pid;
} }
/** /**
@ -212,9 +222,37 @@ class ManageProcess
* @time 2020年07月09日 * @time 2020年07月09日
* @return void * @return void
*/ */
protected function initFiles() protected function init()
{ {
file_put_contents($this->getProcessStatusPath(), ''); $this->staticNum = config('catch.schedule.static_worker_number');
file_put_contents($this->schedulePath() . 'error.log', '');
$this->maxNum = config('catch.schedule.max_worker_number');
$this->initLog();
$this->createTable();
}
/**
* 日志初始化
*
* @time 2020年07月09日
* @return void
*/
protected function initLog()
{
$log = config('log.channels.file');
$log['path'] = config('catch.schedule.store_path');
$channels = config('log.channels');
$channels['schedule'] = $log;
config([
'channels' => $channels,
'default' => 'schedule',
], 'log');
} }
} }

View File

@ -12,6 +12,7 @@ namespace catcher\library\crontab;
use catcher\CatchAdmin; use catcher\CatchAdmin;
use think\console\Table; use think\console\Table;
use think\facade\Log;
trait Process trait Process
{ {
@ -29,31 +30,28 @@ trait Process
pcntl_signal(SIGUSR1, function () use ($process){ pcntl_signal(SIGUSR1, function () use ($process){
// todo // todo
$this->afterTask($process->pid); $this->updateTask($process->pid);
}); });
while (true) { while (true) {
if ($cron = $process->pop()) { $cron = $process->pop();
if (is_string($cron) && $cron) { if ($cron && is_string($cron)) {
$cron = unserialize($cron);
$cron = unserialize($cron); $this->beforeTask($process->pid);
try {
$this->beforeTask($process->pid); $cron->run();
} catch (\Throwable $e) {
try { $this->addErrors($process->pid);
$cron->run(); Log::error($e->getMessage() . ': at ' . $e->getFile() . ' ' . $e->getLine() . '行'.
} catch (\Throwable $e) { PHP_EOL . $e->getTraceAsString());
file_put_contents($this->schedulePath() . 'schedule.log', $e . PHP_EOL, FILE_APPEND);
; }
$this->afterTask($process->pid);
} }
$this->afterTask($process->pid);
} }
pcntl_signal_dispatch(); pcntl_signal_dispatch();
sleep(1); sleep(1);
// 如果收到安全退出的信号,需要在最后任务处理完成之后退出 // 如果收到安全退出的信号,需要在最后任务处理完成之后退出
if ($quit) { if ($quit) {
Log::info('worker quit');
$process->exit(0); $process->exit(0);
} }
} }
@ -71,10 +69,10 @@ trait Process
{ {
return [ return [
'pid' => $process->pid, 'pid' => $process->pid,
'status' => self::WAITING, 'memory' => memory_get_usage(),
'start_at' => time(), 'start_at' => time(),
'running_time' => 0, 'running_time' => 0,
'memory' => memory_get_usage(), 'status' => self::WAITING,
'deal_tasks' => 0, 'deal_tasks' => 0,
'errors' => 0, 'errors' => 0,
]; ];
@ -92,11 +90,8 @@ trait Process
$pid = 0; $pid = 0;
// 获取等待状态的 worker
$processes = $this->getProcessesStatus();
// $processIds // $processIds
foreach ($processes as $process) { foreach ($this->table as $process) {
if ($process['status'] == self::WAITING) { if ($process['status'] == self::WAITING) {
$pid = $process['pid']; $pid = $process['pid'];
break; break;
@ -120,18 +115,12 @@ trait Process
*/ */
protected function beforeTask($pid) protected function beforeTask($pid)
{ {
$processes = $this->getProcessesStatus(); if ($process = $this->table->get($this->getColumnKey($pid))) {
$process['status'] = self::BUSYING;
foreach ($processes as &$process) { $process['running_time'] = time() - $process['start_at'];
if ($process['pid'] == $pid) { $process['memory'] = memory_get_usage();
$process['status'] = self::BUSYING; $this->table->set($this->getColumnKey($pid), $process);
$process['running_time'] = time() - $process['start_at'];
$process['memory'] = memory_get_usage();
break;
}
} }
$this->writeStatusToFile($processes);
} }
/** /**
@ -143,18 +132,44 @@ trait Process
*/ */
protected function afterTask($pid) protected function afterTask($pid)
{ {
$processes = $this->getProcessesStatus(); if ($process = $this->table->get($this->getColumnKey($pid))) {
$process['status'] = self::WAITING;
foreach ($processes as &$process) { $process['running_time'] = time() - $process['start_at'];
if ($process['pid'] == $pid) { $process['memory'] = memory_get_usage();
$process['status'] = self::WAITING; $process['deal_tasks'] += 1;
$process['running_time'] = time() - $process['start_at']; $this->table->set($this->getColumnKey($pid), $process);
$process['memory'] = memory_get_usage();
break;
}
} }
}
$this->writeStatusToFile($processes); /**
* 更新信息
*
* @time 2020年07月09日
* @param $pid
* @return void
*/
protected function updateTask($pid)
{
if ($process = $this->table->get($this->getColumnKey($pid))) {
$process['running_time'] = time() - $process['start_at'];
$process['memory'] = memory_get_usage();
$this->table->set($this->getColumnKey($pid), $process);
}
}
/**
* 增加错误
*
* @time 2020年07月09日
* @param $pid
* @return void
*/
protected function addErrors($pid)
{
if ($process = $this->table->get($this->getColumnKey($pid))) {
$process['errors'] += 1;
$this->table->set($this->getColumnKey($pid), $process);
}
} }
/** /**
@ -202,7 +217,7 @@ trait Process
$adminV = CatchAdmin::VERSION; $adminV = CatchAdmin::VERSION;
$phpV = PHP_VERSION; $phpV = PHP_VERSION;
$processNumber = count($this->processes); $processNumber = $this->table->count();
$memory = (int)(memory_get_usage()/1024/1024). 'M'; $memory = (int)(memory_get_usage()/1024/1024). 'M';
$startAt = date('Y-m-d H:i:s', $this->master_start_at); $startAt = date('Y-m-d H:i:s', $this->master_start_at);
$runtime = gmstrftime('%H:%M:%S', time() - $this->master_start_at); $runtime = gmstrftime('%H:%M:%S', time() - $this->master_start_at);
@ -227,7 +242,7 @@ EOT;
$processes = []; $processes = [];
foreach ($this->getProcessesStatus() as $process) { foreach ($this->table as $process) {
$processes[] = [ $processes[] = [
$process['pid'], $process['pid'],
(int)($process['memory']/1024/1024) . 'M', (int)($process['memory']/1024/1024) . 'M',

View File

@ -100,6 +100,10 @@ trait RegisterSignal
foreach ($this->processes as $pid => $process) { foreach ($this->processes as $pid => $process) {
Process::kill($pid, SIGUSR1); Process::kill($pid, SIGUSR1);
} }
usleep(100);
$this->saveProcessStatus();
}; };
} }

View File

@ -26,48 +26,6 @@ trait Store
return file_put_contents($path, $pid); return file_put_contents($path, $pid);
} }
/**
* 存储信息
*
* @time 2020年07月07日
* @param array $status
* @return void
*/
public function storeStatus(array $status)
{
$workersStatus = $this->getProcessesStatus();
if (empty($workersStatus)) {
$this->writeStatusToFile([$status]);
} else {
// ['PID',, 'START_AT', 'STATUS', 'DEAL_TASKS', 'ERRORS', 'running_time', 'memory'];
$pids = array_column($workersStatus, 'pid');
if (!in_array($status['pid'], $pids)) {
$workersStatus = array_merge($workersStatus, $status);
} else {
foreach ($workersStatus as &$workerStatus) {
if ($workersStatus['pid'] == $status['pid']) {
$workersStatus = $status;
break;
}
}
}
$this->writeStatusToFile($workersStatus);
}
}
/**
* 获取进程间信息
*
* @time 2020年07月08日
* @return mixed
*/
protected function getProcessesStatus()
{
return \json_decode(file_get_contents($this->getProcessStatusPath()), true);
}
/** /**
* 清除退出的 worker 信息 * 清除退出的 worker 信息
* *
@ -77,44 +35,9 @@ trait Store
*/ */
protected function unsetWorkerStatus($pid) protected function unsetWorkerStatus($pid)
{ {
$workers = $this->getProcessesStatus(); $this->table->del($this->getColumnKey($pid));
foreach ($workers as $k => $worker) {
if ($worker['pid'] == $pid) {
unset($workers[$k]);
}
}
$this->writeStatusToFile($workers);
} }
/**
* 写入文件
*
* @time 2020年07月08日
* @param $status
* @return void
*/
protected function writeStatusToFile($status)
{
$this->writeContentToFile($this->getProcessStatusPath(), \json_encode($status));
}
/**
* 写入内容
*
* @time 2020年07月09日
* @param $path
* @param $content
* @return void
*/
protected function writeContentToFile($path, $content)
{
$file = new \SplFileObject($path, 'rw+');
$file->flock(LOCK_EX);
$file->fwrite($content);
$file->flock(LOCK_UN);
}
/** /**
* 输出 * 输出
@ -125,7 +48,7 @@ trait Store
public function output() public function output()
{ {
// 等待信号输出 // 等待信号输出
sleep(1); usleep(500);
return $this->getProcessStatusInfo(); return $this->getProcessStatusInfo();
} }
@ -151,7 +74,7 @@ trait Store
*/ */
protected function getMasterPidPath() protected function getMasterPidPath()
{ {
return $this->schedulePath() . 'master.pid'; return config('catch.schedule.master_pid_file');
} }
/** /**
@ -162,7 +85,7 @@ trait Store
*/ */
protected function schedulePath() protected function schedulePath()
{ {
$path = runtime_path('schedule' . DIRECTORY_SEPARATOR); $path = config('catch.schedule.store_path');
if (!is_dir($path)) { if (!is_dir($path)) {
mkdir($path, 0777, true); mkdir($path, 0777, true);
@ -171,16 +94,6 @@ trait Store
return $path; return $path;
} }
/**
* 获取 worker 状态存储地址
*
* @time 2020年07月07日
* @return string
*/
protected function getProcessStatusPath()
{
return $this->schedulePath() . 'worker-status.json';
}
/** /**
* 进程状态文件 * 进程状态文件
@ -212,8 +125,6 @@ trait Store
*/ */
protected function getProcessStatusInfo() protected function getProcessStatusInfo()
{ {
$this->saveProcessStatus();
return file_get_contents($this->getSaveProcessStatusFile()); return file_get_contents($this->getSaveProcessStatusFile());
} }
} }