摘要:因公司業務需要,最近在設計一個通用隊列功能模塊,主體要求兩大點用實現事務型消息隊列當然,主流的隊列服務可使用或者等,此處討論的是實現多進程消費隊列消息用實現事務型消息隊列消息隊列的作用有異步化解耦和消除峰值等。
因公司業務需要,最近在設計一個通用隊列功能模塊,主體要求兩大點:
用MySql實現事務型消息隊列(當然,主流的隊列服務可使用redis或者rabbitmq等,此處討論的是mysql實現)
php多進程消費隊列消息
用MySql實現事務型消息隊列消息隊列的作用有:異步化、解耦和消除峰值等。目前異步化對于我來說使用最頻繁,在很多業務場景下,我們可以將實時性要求較低的請求轉為異步處理,減小系統負載壓力,提高系統穩定性。在離線數據異步處理過程中,消息隊列要滿足以下要求:
消息不能丟失,即使在系統失敗的情況下。消息一旦被插入就一定會被至少處理一次(只被處理一次是最好的,但是實現起來有難度,所以只要求at-least-once semantic);
FIFO順序。(mysql id自增可滿足此特性。當然,可以設計特殊參數做特殊處理)
支持多生產者(mysql支持并發操作,支持此特點)
支持多消費者。每個消息只能被其中一個消費者處理(業務的處理需要考慮冪等性)。
以上是隊列實現的說明,具體用MySql實現事務型消息隊列可以參考文章
https://spockwangs.github.io/...
此次設計的表結構如下:
CREATE TABLE `comom_queue` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT "自增id", `type` tinyint(4) NOT NULL DEFAULT "0" COMMENT "隊列類型,代碼業務備注", `conn_id` int(11) NOT NULL DEFAULT "0" COMMENT "消費者標識", `param_content` text COMMENT "隊列入參", `callback` varchar(255) NOT NULL DEFAULT "" COMMENT "隊列消費回調函數", `status` tinyint(2) NOT NULL DEFAULT "0" COMMENT "0新建 1消費中 2成功 3失敗 4需重試", `create_time` int(11) NOT NULL DEFAULT "0" COMMENT "創建時間", `update_time` int(11) NOT NULL DEFAULT "0" COMMENT "狀態變更時間", `preexec_time` int(11) NOT NULL DEFAULT "0" COMMENT "預消費時間", `p_key` varchar(100) NOT NULL DEFAULT "" COMMENT "業務唯一標識key,查詢用", `mark` varchar(255) NOT NULL DEFAULT "" COMMENT "備注", PRIMARY KEY (`id`), KEY `indx_s` (`p_key`,`type`) USING BTREE, KEY `indx_exec` (`conn_id`,`status`) USING BTREE, KEY `indx_ty` (`type`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
說明下幾個字段的設計:
callback 隊列中不同的業務消息有不同的業務處理,利用callback值回調對應的業務方法
type 隊列業務類型,區分不同的業務,可用不同的消費者分開消費。在FIFO的特點外,可多帶帶開消費者對有特殊要求(消息優先級高)的業務消息進行消費
preexec_time 預消費時間,有的業務消息有消費時間要求,可設置出隊列時間
php多進程消費設計此次php多進程的實現依賴pcntl,posix擴展,讀者可自行檢查是否安裝了此拓展。queue隊列服務設計和實現包括以下功能點:
主進程和子進程的運行時間可配
主進程(master進程)創建和監聽子進程行為
創建定時器信號,主進程(master進程)定時監聽隊列信息,可用于消息堆積通知等
子進程(worker進程)消費消息
針對不同的業務消息可配置不同數量的子進程
各個業務子進程數可配置正常拉起數和最大進程數,根據隊列積壓情況,子進程動態啟動進程數(暫未實現,后續添加)
不多說了,直接看代碼,抽離出來的queue服務類代碼如下:
"process_num"] protected $child = []; // 子進程pid數組 protected $result = []; // 計算的結果 protected $overTime = 0; //主進程超時時間 protected $startTime; //主進程運行時間 protected $childOverTime = 3600; //子進程超時時間 protected $alarm_time = 2; public function __construct($process = [], $overTime = 0, $childOverTime = 3600) { if (!function_exists("pcntl_fork")) { die("pcntl_fork not existing"); } $this->process = $process; $this->overTime = $overTime; $this->childOverTime = $childOverTime; $this->startTime = time(); } /** * 設置子進程 */ public function setProcess($process) { $this->process = $process; } /** * 設置檢測時間間隔 單位s */ public function setAlarmTime($time){ $this->alarm_time = $time; } /** * fork 子進程 */ protected function forkProcess() { //循環創建每個type 的消費子進程 $process = $this->process; foreach($process as $key => $num) { for ($i = 0; $i < $num; $i++){ $this->forkOneProcess($key); } } return $this; } /** * 創建子進程操作 * @param $key * @return $this */ private function forkOneProcess($key) { $pid = pcntl_fork(); if ($pid == 0) { $id = getmypid(); $this->processDo($id, $key); exit(0); } else if ($pid > 0) { //記錄子進程信息 $childProcess = array( "pid" => $pid, "type" => $key, "create_time" => time() ); $this->child[$pid] = $childProcess; } return $this; } /** * 子進程做的事情,消費者 */ abstract protected function processDo($id, $key); /** * 隊列數量檢測 */ abstract protected function checkQueueNum(); /** * 等待子進程結束 */ protected function waiteProcess() { while(count($this->child)) { foreach($this->child as $pid => $item){ $res = pcntl_waitpid($pid,$status,WNOHANG); pcntl_signal_dispatch(); if ( -1 == $res || $res > 0 ) { unset($this->child[$pid]); echo "pid $pid 退出", PHP_EOL; //判斷主進程是否超時 未超時拉起新的子進程 $leftTime = time() - $this->startTime; if ($this->overTime > $leftTime){ $this->forkOneProcess($item["type"]); echo "創建新進程", PHP_EOL; } }//判斷子進程是否存在且超時,超過時限20分鐘則強制退出 elseif (posix_kill($pid, 0) && (time() - $item["create_time"] - 20*60) > $this->childOverTime){ posix_kill($pid, SIGUSR1); echo "pid $pid 退出2", PHP_EOL; } } } return $this; } /** * 隊列檢測 */ protected function timeHandler(){ $this->checkQueueNum(); pcntl_alarm($this->alarm_time); } /** * 啟動 */ public function runProcess() { //注冊信號 pcntl_signal(SIGALRM, array($this, "timeHandler")); pcntl_alarm($this->alarm_time); $leftTime = time() - $this->startTime; while(($this->overTime ==0 || $this->overTime > $leftTime)){ echo "新進程processlist", PHP_EOL; $this->forkProcess()->waiteProcess(); $leftTime = time() - $this->startTime; } } }
最后一個功能點:各個業務子進程數可配置正常拉起數和最大進程數,根據隊列積壓情況,子進程動態啟動進程數 暫未實現。目前的queue服務設計如上,請各位看官多多指教!
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/29768.html
閱讀 2352·2021-11-23 09:51
閱讀 2002·2021-10-14 09:43
閱讀 2768·2021-09-27 13:35
閱讀 1150·2021-09-22 15:54
閱讀 2503·2021-09-13 10:36
閱讀 3800·2019-08-30 15:56
閱讀 3410·2019-08-30 14:09
閱讀 1718·2019-08-30 12:57