摘要:作為定時任務的執(zhí)行者,通過每喚醒自身一次,然后把執(zhí)行表遍歷一次,挑選當下需要執(zhí)行的任務,通過投遞出去并更新該任務執(zhí)行表中的狀態(tài)。
作者:bromine
鏈接:https://www.jianshu.com/p/b44...
來源:簡書
著作權歸作者所有,本文已獲得作者授權轉載,并對原文進行了重新的排版。
Swoft Github: https://github.com/swoft-clou...
Swoft源碼剖析系列目錄:https://segmentfault.com/a/11...前言
Swoft的任務功能基于Swoole的Task機制,或者說Swoft的Task機制本質就是對Swoole的Task機制的封裝和加強。
任務投遞//SwoftTaskTask.php class Task { /** * Deliver coroutine or async task * * @param string $taskName * @param string $methodName * @param array $params * @param string $type * @param int $timeout * * @return bool|array * @throws TaskException */ public static function deliver(string $taskName, string $methodName, array $params = [], string $type = self::TYPE_CO, $timeout = 3) { $data = TaskHelper::pack($taskName, $methodName, $params, $type); if(!App::isWorkerStatus() && !App::isCoContext()){ return self::deliverByQueue($data);//見下文Command章節(jié) } if(!App::isWorkerStatus() && App::isCoContext()){ throw new TaskException("Please deliver task by http!"); } $server = App::$server->getServer(); // Delier coroutine task if ($type == self::TYPE_CO) { $tasks[0] = $data; $prifleKey = "task" . "." . $taskName . "." . $methodName; App::profileStart($prifleKey); $result = $server->taskCo($tasks, $timeout); App::profileEnd($prifleKey); return $result; } // Deliver async task return $server->task($data); } }
任務投遞Task::deliver()將調用參數打包后根據$type參數通過Swoole的$server->taskCo()或$server->task()接口投遞到Task進程。
Task本身始終是同步執(zhí)行的,$type僅僅影響投遞這一操作的行為,Task::TYPE_ASYNC對應的$server->task()是異步投遞,Task::deliver()調用后馬上返回;Task::TYPE_CO對應的$server->taskCo()是協(xié)程投遞,投遞后讓出協(xié)程控制,任務完成或執(zhí)行超時后Task::deliver()才從協(xié)程返回。
//SwoftTaskBootstrapListenersTaskEventListener /** * The listener of swoole task * @SwooleListener({ * SwooleEvent::ON_TASK, * SwooleEvent::ON_FINISH, * }) */ class TaskEventListener implements TaskInterface, FinishInterface { /** * @param SwooleServer $server * @param int $taskId * @param int $workerId * @param mixed $data * @return mixed * @throws InvalidArgumentException */ public function onTask(Server $server, int $taskId, int $workerId, $data) { try { /* @var TaskExecutor $taskExecutor*/ $taskExecutor = App::getBean(TaskExecutor::class); $result = $taskExecutor->run($data); } catch (Throwable $throwable) { App::error(sprintf("TaskExecutor->run %s file=%s line=%d ", $throwable->getMessage(), $throwable->getFile(), $throwable->getLine())); $result = false; // Release system resources App::trigger(AppEvent::RESOURCE_RELEASE); App::trigger(TaskEvent::AFTER_TASK); } return $result; } }
此處是swoole.onTask的事件回調,其職責僅僅是將將Worker進程投遞來的打包后的數據轉發(fā)給TaskExecutor。
Swoole的Task機制的本質是Worker進程將耗時任務投遞給同步的Task進程(又名TaskWorker)處理,所以swoole.onTask的事件回調是在Task進程中執(zhí)行的。上文說過,Worker進程是你大部分HTTP服務代碼執(zhí)行的環(huán)境,但是從TaskEventListener.onTask()方法開始,代碼的執(zhí)行環(huán)境都是Task進程,也就是說,TaskExecutor和具體的TaskBean都是執(zhí)行在Task進程中的。
//SwoftTaskTaskExecutor /** * The task executor * * @Bean() */ class TaskExecutor { /** * @param string $data * @return mixed */ public function run(string $data) { $data = TaskHelper::unpack($data); $name = $data["name"]; $type = $data["type"]; $method = $data["method"]; $params = $data["params"]; $logid = $data["logid"] ?? uniqid("", true); $spanid = $data["spanid"] ?? 0; $collector = TaskCollector::getCollector(); if (!isset($collector["task"][$name])) { return false; } list(, $coroutine) = $collector["task"][$name]; $task = App::getBean($name); if ($coroutine) { $result = $this->runCoTask($task, $method, $params, $logid, $spanid, $name, $type); } else { $result = $this->runSyncTask($task, $method, $params, $logid, $spanid, $name, $type); } return $result; } }
任務執(zhí)行思路很簡單,將Worker進程發(fā)過來的數據解包還原成原來的調用參數,根據$name參數找到對應的TaskBean并調用其對應的task()方法。其中TaskBean使用類級別注解@Task(name="TaskName")或者@Task("TaskName")聲明。
值得一提的一點是,@Task注解除了name屬性,還有一個coroutine屬性,上述代碼會根據該參數選擇使用協(xié)程的runCoTask()或者同步的runSyncTask()執(zhí)行Task。但是由于而且由于Swoole的Task進程的執(zhí)行是完全同步的,不支持協(xié)程,所以目前版本請該參數不要配置為true。同樣的在TaskBean中編寫的任務代碼必須的同步阻塞的或者是要能根據環(huán)境自動將異步非阻塞和協(xié)程降級為同步阻塞的
從Process中投遞任務前面我們提到:
Swoole的Task機制的本質是Worker進程將耗時任務投遞給同步的Task進程(又名 TaskWorker)處理。
換句話說,Swoole的$server->taskCo()或$server->task()都只能在Worker進程中使用。
這個限制大大的限制了使用場景。 如何能夠為了能夠在Process中投遞任務呢?Swoft為了繞過這個限制提供了Task::deliverByProcess()方法。其實現(xiàn)原理也很簡單,通過Swoole的$server->sendMessage()方法將調用信息從Process中投遞到Worker進程中,然后由Worker進程替其投遞到Task進程當中,相關代碼如下:
//SwoftTaskTask.php /** * Deliver task by process * * @param string $taskName * @param string $methodName * @param array $params * @param string $type * @param int $timeout * @param int $workId * * @return bool */ public static function deliverByProcess(string $taskName, string $methodName, array $params = [], int $timeout = 3, int $workId = 0, string $type = self::TYPE_ASYNC): bool { /* @var PipeMessageInterface $pipeMessage */ $server = App::$server->getServer(); $pipeMessage = App::getBean(PipeMessage::class); $data = [ "name" => $taskName, "method" => $methodName, "params" => $params, "timeout" => $timeout, "type" => $type, ]; $message = $pipeMessage->pack(PipeMessage::MESSAGE_TYPE_TASK, $data); return $server->sendMessage($message, $workId); }
數據打包后使用$server->sendMessage()投遞給Worker:
//SwoftBootstrapServerServerTrait.php /** * onPipeMessage event callback * * @param SwooleServer $server * @param int $srcWorkerId * @param string $message * @return void * @throws InvalidArgumentException */ public function onPipeMessage(Server $server, int $srcWorkerId, string $message) { /* @var PipeMessageInterface $pipeMessage */ $pipeMessage = App::getBean(PipeMessage::class); list($type, $data) = $pipeMessage->unpack($message); App::trigger(AppEvent::PIPE_MESSAGE, null, $type, $data, $srcWorkerId); }
$server->sendMessage后,Worker進程收到數據時會觸發(fā)一個swoole.pipeMessage事件的回調,Swoft會將其轉換成自己的swoft.pipeMessage事件并觸發(fā).
//SwoftTaskEventListenersPipeMessageListener.php /** * The pipe message listener * * @Listener(event=AppEvent::PIPE_MESSAGE) */ class PipeMessageListener implements EventHandlerInterface { /** * @param SwoftEventEventInterface $event */ public function handle(EventInterface $event) { $params = $event->getParams(); if (count($params) < 3) { return; } list($type, $data, $srcWorkerId) = $params; if ($type != PipeMessage::MESSAGE_TYPE_TASK) { return; } $type = $data["type"]; $taskName = $data["name"]; $params = $data["params"]; $timeout = $data["timeout"]; $methodName = $data["method"]; // delever task Task::deliver($taskName, $methodName, $params, $type, $timeout); } }
swoft.pipeMessage事件最終由PipeMessageListener處理。在相關的監(jiān)聽其中,如果發(fā)現(xiàn)swoft.pipeMessage事件由Task::deliverByProcess()產生的,Worker進程會替其執(zhí)行一次Task::deliver(),最終將任務數據投遞到TaskWorker進程中。
一道簡單的回顧練習:從Task::deliverByProcess()到某TaskBean 最終執(zhí)行任務,經歷了哪些進程,而調用鏈的哪些部分又分別是在哪些進程中執(zhí)行?
從Command進程或其子進程中投遞任務//SwoftTaskQueueTask.php /** * @param string $data * @param int $taskWorkerId * @param int $srcWorkerId * * @return bool */ public function deliver(string $data, int $taskWorkerId = null, $srcWorkerId = null) { if ($taskWorkerId === null) { $taskWorkerId = mt_rand($this->workerNum + 1, $this->workerNum + $this->taskNum); } if ($srcWorkerId === null) { $srcWorkerId = mt_rand(0, $this->workerNum - 1); } $this->check(); $data = $this->pack($data, $srcWorkerId); $result = msg_send($this->queueId, $taskWorkerId, $data, false); if (!$result) { return false; } return true; }
對于Command進程的任務投遞,情況會更復雜一點。
上文提到的Process,其往往衍生于Http/Rpc服務,作為同一個Manager的子孫進程,他們能夠拿到SwooleServer的句柄變量,從而通過$server->sendMessage(),$server->task()等方法進行任務投遞。
但在Swoft的體系中,還有一個十分路人的角色: Command。
Command的進程從shell或cronb獨立啟動,和Http/Rpc服務相關的進程沒有親緣關系。因此Command進程以及從Command中啟動的Process進程是沒有辦法拿到SwooleServer的調用句柄直接通過UnixSocket進行任務投遞的。
為了為這種進程提供任務投遞支持,Swoft利用了Swoole的Task進程的一個特殊功能----消息隊列。
同一個項目中Command和HttpRpcServer 通過約定一個message_queue_key獲取到系統(tǒng)內核中的同一條消息隊列,然后Comand進程就可以通過該消息隊列向Task進程投遞任務了。
該機制沒有提供對外的公開方法,僅僅被包含在Task::deliver()方法中,Swoft會根據當前環(huán)境隱式切換投遞方式。但該消息隊列的實現(xiàn)依賴Semaphore拓展,如果你想使用,需要在編譯PHP時加上--enable-sysvmsg參數。
除了手動執(zhí)行的普通任務,Swoft還提供了精度為秒的定時任務功能用來在項目中替代Linux的Crontab功能.
Swoft用兩個前置Process---任務計劃進程:CronTimerProcess和任務執(zhí)行進程CronExecProcess
,和兩張內存數據表-----RunTimeTable(任務(配置)表)OriginTable((任務)執(zhí)行表)用于定時任務的管理調度。
兩張表的每行記錄的結構如下:
SwoftTaskCrontabTableCrontab.php /** * 任務表,記錄用戶配置的任務信息 * 表每行記錄包含的字段如下,其中`rule`,`taskClass`,`taskMethod`生成key唯一確定一條記錄 * @var array $originStruct */ private $originStruct = [ "rule" => [SwooleTable::TYPE_STRING, 100],//定時任務執(zhí)行規(guī)則,對應@Scheduled注解的cron屬性 "taskClass" => [SwooleTable::TYPE_STRING, 255],//任務名 對應@Task的name屬性(默認為類名) "taskMethod" => [SwooleTable::TYPE_STRING, 255],//Task方法,對應@Scheduled注解所在方法 "add_time" => [SwooleTable::TYPE_STRING, 11],//初始化該表內容時的10位時間戳 ]; /** * 執(zhí)行表,記錄短時間內要執(zhí)行的任務列表及其執(zhí)行狀態(tài) * 表每行記錄包含的字段如下,其中`taskClass`,`taskMethod`,`minute`,`sec`生成key唯一確定一條記錄 * @var array $runTimeStruct */ private $runTimeStruct = [ "taskClass" => [SwooleTable::TYPE_STRING, 255],//同上 "taskMethod" => [SwooleTable::TYPE_STRING, 255],//同上 "minute" => [SwooleTable::TYPE_STRING, 20],//需要執(zhí)行任務的時間,精確到分鐘 格式date("YmdHi") "sec" => [SwooleTable::TYPE_STRING, 20],//需要執(zhí)行任務的時間,精確到分鐘 10位時間戳 "runStatus" => [SwooleTABLE::TYPE_INT, 4],//任務狀態(tài),有 0(未執(zhí)行) 1(已執(zhí)行) 2(執(zhí)行中) 三種。 //注意:這里的執(zhí)行是一個容易誤解的地方,此處的執(zhí)行并不是指任務本身的執(zhí)行,而是值`任務投遞`這一操作的執(zhí)行,從宏觀上看換成 _未投遞_,_已投遞_,_投遞中_描述會更準確。 ];此處為何要使用Swoole的內存Table?
Swoft的的定時任務管理是分別由 任務計劃進程 和 任務執(zhí)行進程 進程負責的。兩個進程的運行共同管理定時任務,如果使用進程間獨立的array()等結構,兩個進程必然需要頻繁的進程間通信。而使用跨進程的Table(本文的Table,除非特別說明,都指Swoole的SwooleTable結構)直接進行進程間數據共享,不僅性能高,操作簡單 還解耦了兩個進程。
為了Table能夠在兩個進程間共同使用,Table必須在Swoole Server啟動前創(chuàng)建并分配內存。具體代碼在SwoftTaskBootstrapListeners->onBeforeStart()中,比較簡單,有興趣的可以自行閱讀。
背景介紹完了,我們來看看這兩個定時任務進程的行為
//SwoftTaskBootstrapProcessCronTimerProcess.php /** * Crontab timer process * * @Process(name="cronTimer", boot=true) */ class CronTimerProcess implements ProcessInterface { /** * @param SwoftProcessProcess $process */ public function run(SwoftProcess $process) { //code.... /* @var SwoftTaskCrontabCrontab $cron*/ $cron = App::getBean("crontab"); // Swoole/HttpServer $server = App::$server->getServer(); $time = (60 - date("s")) * 1000; $server->after($time, function () use ($server, $cron) { // Every minute check all tasks, and prepare the tasks that next execution point needs $cron->checkTask(); $server->tick(60 * 1000, function () use ($cron) { $cron->checkTask(); }); }); } }
//SwoftTaskCrontabCrontab.php /** * 初始化runTimeTable數據 * * @param array $task 任務 * @param array $parseResult 解析crontab命令規(guī)則結果,即Task需要在當前分鐘內的哪些秒執(zhí)行 * @return bool */ private function initRunTimeTableData(array $task, array $parseResult): bool { $runTimeTableTasks = $this->getRunTimeTable()->table; $min = date("YmdHi"); $sec = strtotime(date("Y-m-d H:i")); foreach ($parseResult as $time) { $this->checkTaskQueue(false); $key = $this->getKey($task["rule"], $task["taskClass"], $task["taskMethod"], $min, $time + $sec); $runTimeTableTasks->set($key, [ "taskClass" => $task["taskClass"], "taskMethod" => $task["taskMethod"], "minute" => $min, "sec" => $time + $sec, "runStatus" => self::NORMAL ]); } return true; }
CronTimerProcess是Swoft的定時任務調度進程,其核心方法是Crontab->initRunTimeTableData()。
該進程使用了Swoole的定時器功能,通過SwooleTimer在每分鐘首秒時執(zhí)行的回調,CronTimerProcess每次被喚醒后都會遍歷任務表計算出當前這一分鐘內的60秒分別需要執(zhí)行的任務清單,寫入執(zhí)行表并標記為 未執(zhí)行。
//SwoftTaskBootstrapProcess /** * Crontab process * * @Process(name="cronExec", boot=true) */ class CronExecProcess implements ProcessInterface { /** * @param SwoftProcessProcess $process */ public function run(SwoftProcess $process) { $pname = App::$server->getPname(); $process->name(sprintf("%s cronexec process", $pname)); /** @var SwoftTaskCrontabCrontab $cron */ $cron = App::getBean("crontab"); // Swoole/HttpServer $server = App::$server->getServer(); $server->tick(0.5 * 1000, function () use ($cron) { $tasks = $cron->getExecTasks(); if (!empty($tasks)) { foreach ($tasks as $task) { // Diliver task Task::deliverByProcess($task["taskClass"], $task["taskMethod"]); $cron->finishTask($task["key"]); } } }); } }
CronExecProcess作為定時任務的執(zhí)行者,通過SwooleTimer每0.5s喚醒自身一次,然后把 執(zhí)行表 遍歷一次,挑選當下需要執(zhí)行的任務,通過sendMessage()投遞出去并更新該 任務執(zhí)行表中的狀態(tài)。
該執(zhí)行進程只負責任務的投遞,任務的實際實際執(zhí)行仍然在Task進程中由TaskExecutor處理。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/29080.html
摘要:作者鏈接來源簡書著作權歸作者所有,本文已獲得作者授權轉載,并對原文進行了重新的排版。同時順手整理個人對源碼的相關理解,希望能夠稍微填補學習領域的空白。系列文章只會節(jié)選關鍵代碼輔以思路講解,請自行配合源碼閱讀。 作者:bromine鏈接:https://www.jianshu.com/p/2f6...來源:簡書著作權歸作者所有,本文已獲得作者授權轉載,并對原文進行了重新的排版。Swoft...
摘要:和服務關系最密切的進程是中的進程組,絕大部分業(yè)務處理都在該進程中進行。隨后觸發(fā)一個事件各組件通過該事件進行配置文件加載路由注冊。事件每個請求到來時僅僅會觸發(fā)事件。服務器生命周期和服務基本一致,詳情參考源碼剖析功能實現(xiàn) 作者:bromine鏈接:https://www.jianshu.com/p/4c0...來源:簡書著作權歸作者所有,本文已獲得作者授權轉載,并對原文進行了重新的排版。S...
摘要:在中的應用官網源碼解讀號外號外歡迎大家我們開發(fā)組定了一個就線下聚一次的小目標上一篇源碼解讀反響還不錯不少同學推薦再加一篇講解一下中使用到的功能幫助大家開啟的實戰(zhàn)之旅服務器開發(fā)涉及到的相關技術領域的知識非常多不日積月累打好基礎是很難真正 date: 2017-12-14 21:34:51title: swoole 在 swoft 中的應用 swoft 官網: https://www.sw...
摘要:官網源碼解讀號外號外歡迎大家我們開發(fā)組定了一個就線下聚一次的小目標里面的框架算是非常重的了這里的重先不具體到性能層面主要是框架的設計思想和框架集成的服務讓框架可以既可以快速解決很多問題又可以輕松擴展中的框架有在應該無出其右了這次解讀的源碼 官網: https://www.swoft.org/源碼解讀: http://naotu.baidu.com/file/8... 號外號外, 歡迎大...
摘要:我們項目使用的是框架,所以我就想到用框架的定時器。,以及的結構注在定時器這塊使用到兩個一個是用于存儲任務的實例。 這兩天老大給了個需求想把商城熱點數據同步到redis緩存。我們項目使用的是swoft框架,所以我就想到用框架的Crontab定時器。但是在測試的時候發(fā)現(xiàn)把Table的size設置為1024時(實際上設置為任何大小都一樣,貼上swoole的解釋)發(fā)現(xiàn)內存溢出了 showImg...
閱讀 908·2023-04-25 18:51
閱讀 1863·2021-09-09 11:39
閱讀 3276·2019-08-30 15:53
閱讀 2090·2019-08-30 13:03
閱讀 1304·2019-08-29 16:17
閱讀 574·2019-08-29 11:33
閱讀 1878·2019-08-26 14:00
閱讀 2118·2019-08-26 13:41