摘要:支持消息刪除業務使用方,可以隨時刪除指定消息。消息傳輸可靠性消息進入到延遲隊列后,保證至少被消費一次。
延遲隊列,顧名思義它是一種帶有延遲功能的消息隊列。 那么,是在什么場景下我才需要這樣的隊列呢?
一、背景先看看一下業務場景:
1.會員過期前3天發送召回通知
2.訂單支付成功后,5分鐘后檢測下游環節是否都正常,比如用戶購買會員后,各種會員狀態是否都設置成功
3.如何定期檢查處于退款狀態的訂單是否已經退款成功?
4.實現通知失敗,1,3,5,7分鐘重復通知,直到對方回復?
通常解決以上問題,最簡單直接的辦法就是定時去掃表。
掃表存在的問題是:
1.掃表與數據庫長時間連接,在數量量大的情況容易出現連接異常中斷,需要更多的異常處理,對程序健壯性要求高
2.在數據量大的情況下延時較高,規定內處理不完,影響業務,雖然可以啟動多個進程來處理,這樣會帶來額外的維護成本,不能從根本上解決。
3.每個業務都要維護一個自己的掃表邏輯。 當業務越來越多時,發現掃表部分的邏輯會重復開發,但是非常類似
延時隊列能對于上述需求能很好的解決
二、調研調研了市場上一些開源的方案,以下:
1.有贊科技:只有原理,沒有開源代碼
2.github個人的:https://github.com/ouqiang/de...
1.基于redis實現,redis只能配置一個,如果redis掛了整個服務不可用,可用性差點
2.消費端實現的是拉模式,接入成本大,每個項目都得去實現一遍接入代碼
3.在star使用的人數不多,放在生產環境,存在風險,加之對go語言不了解,出了問題難以維護
3.SchedulerX-阿里開源的: 功能很強大,但是運維復雜,依賴組件多,不夠輕量
4.RabbitMQ-延時任務: 本身沒有延時功能,需要借助一特性自己實現,而且公司沒有部署這個隊列,去多帶帶部署一個這個來做延時隊列成本有點高,而且還需要專門的運維來維護,目前團隊不支持
基本以上原因打算自己寫一個,平常使用php多,項目基本redis的zset結構作為存儲,用php語言實現 ,實現原理參考了有贊團隊:https://tech.youzan.com/queui...
三、目標輕量級:有較少的php的拓展就能直接運行,不需要引入網絡框架,比如swoole,workman之類的
穩定性:采用master-work架構,master不做業務處理,只負責管理子進程,子進程異常退出時自動拉起
可用性:
1.支持多實例部署,每個實例無狀態,一個實例掛掉不影響服務
2.支持配置多個redis,一個redis掛了只影響部分消息
3.業務方接入方便,在后臺只需填寫相關消息類型和回掉接口
拓展性: 當消費進程存在瓶頸時,可以配置加大消費進程數,當寫入存在瓶頸時,可增加實例數寫入性能可線性提高
實時性:允許存在一定的時間誤差。
支持消息刪除:業務使用方,可以隨時刪除指定消息。
消息傳輸可靠性:消息進入到延遲隊列后,保證至少被消費一次。
寫入性能:qps>1000+
四、架構設計與說明總體架構
采用master-work架構模式,主要包括6個模塊:
1.dq-mster: 主進程,負責管理子進程的創建,銷毀,回收以及信號通知
2.dq-server: 負責消息寫入,讀取,刪除功能以及維護redis連接池
3.dq-timer-N: 負責從redis的zset結構中掃描到期的消息,并負責寫入ready 隊列,個數可配置,一般2個就行了,因為消息在zset結構是按時間有序的
4.dq-consume-N: 負責從ready隊列中讀取消息并通知給對應回掉接口,個數可配置
5.dq-redis-checker: 負責檢查redis的服務狀態,如果redis宕機,發送告警郵件
6.dq-http-server: 提供web后臺界面,用于注冊topic
五、部署環境依賴:PHP 5.4+ 安裝sockets,redis,pcntl,pdo_mysql 拓展
create database dq; #存放告警信息 CREATE TABLE `dq_alert` ( `id` int(11) NOT NULL AUTO_INCREMENT, `host` varchar(255) NOT NULL DEFAULT "", `port` int(11) NOT NULL DEFAULT "0", `user` varchar(255) NOT NULL DEFAULT "", `pwd` varchar(255) NOT NULL DEFAULT "", `ext` varchar(2048) NOT NULL DEFAULT "", PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8; #存放redis信息 CREATE TABLE `dq_redis` ( `id` int(11) NOT NULL AUTO_INCREMENT, `t_name` varchar(200) NOT NULL DEFAULT "", `t_content` varchar(2048) NOT NULL DEFAULT "", PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8; #存儲注冊信息 CREATE TABLE `dq_topic` ( `id` int(11) NOT NULL AUTO_INCREMENT, `t_name` varchar(1024) NOT NULL DEFAULT "", `delay` int(11) NOT NULL DEFAULT "0", `callback` varchar(1024) NOT NULL DEFAULT "", `timeout` int(11) NOT NULL DEFAULT "3000", `email` varchar(1024) NOT NULL DEFAULT "", `topic` varchar(255) NOT NULL DEFAULT "", `createor` varchar(1024) NOT NULL DEFAULT "", `status` tinyint(4) NOT NULL DEFAULT "1", `method` varchar(32) NOT NULL DEFAULT "GET", PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;
在DqConf.php文件中修改php了路徑 $logPath
命令:
php DqHttpServer.php --port 8088
訪問:http://127.0.0.1:8088,出現配置界面
redis信息格式:host:post:auth 比如 127.0.0.1:6379:12345
php DqInit.php --port 6789
看到如下信息說明啟動成功
addServer($server); $topic ="order_openvip_checker"; //topic在后臺注冊 $id = uniqid(); $data=array( "id"=>$id, "body"=>array( "a"=>1, "b"=>2, "c"=>3, "ext"=>str_repeat("a",64), ), //可選,設置后以這個通知時間為準,默認延時時間在注冊topic的時候指定 "fix_time"=>date("Y-m-d 23:50:50"), ); //添加 $boolRet = $dqClient->add($topic, $data); echo "add耗時:".(msectime() - $time)."ms "; //查詢 $time = msectime(); $result = $dqClient->get($topic, $id); echo "get耗時:".(msectime() - $time)."ms "; //刪除 $time = msectime(); $boolRet = $dqClient->del($topic,$id); echo "del耗時:".(msectime() - $time)."ms ";
執行php test.php
默認日志目錄在項目目錄的logs目錄下,在DqConf.php修改$logPath
1.請求日志:request_ymd.txt
2.通知日志:notify_ymd.txt
3.錯誤日志:err_ymd.txt
1.系統會自動檢測配置文件新,如果有改動,會自動退出(沒有找到較好的熱更新的方案),需要重啟,可以在crontab里面建個任務,1分鐘執行一次,程序有check_self的判斷
2.優雅退出命令: master檢測偵聽了USR2信號,收到信號后會通知所有子進程,子進程完成當前任務后會自動退出
ps -ef | grep dq-master| grep -v grep | head -n 1 | awk "{print $2}" | xargs kill -USR2六、性能測試
需要安裝pthreads拓展:
測試原理:使用多線程模擬并發,在1s內能成功返回請求成功的個數
php DqBench concurrency requests concurrency:并發數 requests: 每個并發產生的請求數 測試環境:內存 8G ,8核cpu,2個redis和1個dq-server 部署在一個機器上,數據包64字節 qps:2400七、值得一提的性能優化點:
1.redis multi命令:將多個對redis的操作打包成一個減少網絡開銷
2.計數的操作異步處理,在異步邏輯里面用函數的static變量來保存,當寫入redis成功后釋放static變量,可以在redis出現異常時計數仍能保持一致,除非進程退出
3.內存泄露檢測有必要: 所有的內存分配在底層都是調用了brk或者mmap,只要程序只有大量brk或者mmap的系統調用,內存泄露可能性非常高 ,檢測命令: strace -c -p pid | grep "mmap| brk"
4.檢測程序的系統調用情況:strace -c -p pid ,發現某個系統函數調用是其他的數倍,可能大概率程序存在問題
八、異常處理如果調用通知接口在超時時間內,沒有收到回復認為通知失敗,系統會重新把數據放入隊列,重新通知,系統默認最大通知10次(可以在Dqconf.php文件中修改$notify_exp_nums)通知間隔為2n+1,比如第一次1分鐘,通知失敗,第二次3分鐘后,直到收到回復,超出最大通知次數后系統自動丟棄,同時發郵件通知
ps:網絡抖動在所難免,通知接口如果涉及到核心的服務,一定要保證冪等!!
九、線上情況線上部署了兩個實例每個機房部一個,4個redis作存儲,服務穩定運行數月,各項指標均符合預期
主要接入業務:
訂單10分鐘召回通知
接口超時或者失敗補償
項目地址: https://github.com/chenlinzho...
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/29595.html
摘要:背景當下視頻直播如此紅火,打造一個在線直播間涉及到哪些技術呢視頻直播由主播的直播端以及觀眾的觀看端組成。保持心跳斷開重連快速搭建在線直播間按前文所述,搭建直播間有非常多的細節需要考慮,包括采集推流分發播放體驗優化聊天室性能調優等。 背景 當下視頻直播如此紅火,打造一個在線直播間涉及到哪些技術呢? 視頻直播由主播的直播端以及觀眾的觀看端組成。一個簡單的觀看端最起碼應包含播放器以及聊天室。...
摘要:本文將會講解如何使用實現延時重試和失敗消息隊列,實現可靠的消息消費,消費失敗后,自動延時將消息重新投遞,當達到一定的重試次數后,將消息投遞到失敗消息隊列,等待人工介入處理。 RabbitMQ是一款使用Erlang開發的開源消息隊列。本文假設讀者對RabbitMQ是什么已經有了基本的了解,如果你還不知道它是什么以及可以用來做什么,建議先從官網的 RabbitMQ Tutorials 入門...
摘要:本文將會講解如何使用實現延時重試和失敗消息隊列,實現可靠的消息消費,消費失敗后,自動延時將消息重新投遞,當達到一定的重試次數后,將消息投遞到失敗消息隊列,等待人工介入處理。 RabbitMQ是一款使用Erlang開發的開源消息隊列。本文假設讀者對RabbitMQ是什么已經有了基本的了解,如果你還不知道它是什么以及可以用來做什么,建議先從官網的 RabbitMQ Tutorials 入門...
閱讀 3996·2021-11-18 13:22
閱讀 1823·2021-11-17 09:33
閱讀 2882·2021-09-26 09:46
閱讀 1213·2021-08-21 14:11
閱讀 2891·2019-08-30 15:53
閱讀 2710·2019-08-30 15:52
閱讀 1894·2019-08-30 10:52
閱讀 1521·2019-08-29 15:30