摘要:方案介紹該方案出來的場(chǎng)景一天有一個(gè)業(yè)務(wù)需求,需要把我方的一些信息或訂單狀態(tài)等異步發(fā)起請(qǐng)求同步給第三方,這里就會(huì)出現(xiàn)定時(shí)時(shí)間和延遲時(shí)間消息的處理,考慮過很多消息隊(duì)列方案如云消息服務(wù)等。
方案介紹
方案架構(gòu)圖 mysql的任務(wù)隊(duì)列表該方案出來的場(chǎng)景:一天有一個(gè)業(yè)務(wù)需求,需要把我方的一些信息或訂單狀態(tài)等異步發(fā)起請(qǐng)求同步給第三方,這里就會(huì)出現(xiàn)定時(shí)時(shí)間和延遲時(shí)間消息的處理,考慮過很多消息隊(duì)列方案(如:rabbitmq、云消息服務(wù)等)。
不過最后公司定了因?yàn)樵摌I(yè)務(wù)流量很小,不用做那么麻煩。所以就直接出了這個(gè)方案
該方案在50條消息/s,應(yīng)該壓力不大,量大了就會(huì)出現(xiàn)一個(gè)消息延遲問題,如果不注重這個(gè)業(yè)務(wù)時(shí)間準(zhǔn)確性,該方案承載的秒級(jí)處理在1000內(nèi)應(yīng)該問題也不大
CREATE TABLE `open_queue` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT, `type` tinyint(4) unsigned NOT NULL DEFAULT "0" COMMENT "任務(wù)類型,可用于后續(xù)讀取不同任務(wù)失敗次數(shù)區(qū)分", `order_id` int(10) unsigned NOT NULL DEFAULT "0" COMMENT "業(yè)務(wù)綁定的訂單id等,根據(jù)自己自行設(shè)計(jì)業(yè)務(wù)id", `event_identity` varchar(30) NOT NULL DEFAULT "" COMMENT "事件表示分類,可不用,同type", `data` text NOT NULL COMMENT "需要處理的數(shù)據(jù),json格式化", `try` tinyint(4) unsigned NOT NULL DEFAULT "1" COMMENT "特定任務(wù)需要當(dāng)時(shí)就處理的次數(shù),而不是發(fā)起回調(diào)請(qǐng)求的任務(wù)", `again` tinyint(1) unsigned NOT NULL DEFAULT "0" COMMENT "0不是重試記錄 1重試記錄,失敗后發(fā)起任務(wù)未1,否者未0", `status` tinyint(1) unsigned NOT NULL DEFAULT "0" COMMENT "是否被執(zhí)行 1是 0否", `create_time` int(10) unsigned NOT NULL DEFAULT "0" COMMENT "任務(wù)創(chuàng)建的時(shí)間戳", `at_time` int(11) unsigned NOT NULL DEFAULT "0" COMMENT "任務(wù)時(shí)間的時(shí)間戳", `task_status` tinyint(1) NOT NULL DEFAULT "0" COMMENT "執(zhí)行結(jié)果 -1重復(fù)消息系統(tǒng)主動(dòng)取消 0未執(zhí)行 1執(zhí)行成功 2執(zhí)行失敗", `task_time` int(10) unsigned NOT NULL DEFAULT "0" COMMENT "執(zhí)行時(shí)間", PRIMARY KEY (`id`), KEY `IDX_EVENT` (`event_identity`), KEY `IDX_AT_TIME` (`at_time`) USING BTREE, KEY `IDX_ORDER` (`order_id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;方案實(shí)現(xiàn)源碼
redis同步到mysql下方代碼都是偽代碼,大家自行根據(jù)思路自己設(shè)計(jì)業(yè)務(wù)
該步驟可以省略,因?yàn)槲曳疆?dāng)時(shí)redis是單機(jī)部署,也沒做持久化,
$redisTypeLock; //lock類型 if (!IS_WIN) { $file = fopen($path . "/lock/{$redisTypeLock}.lock","w+"); if (!flock($file, LOCK_EX | LOCK_NB)) { flock($file,LOCK_UN); fclose($file); exit; } } //成功獲得鎖 開始業(yè)務(wù)執(zhí)行 $count = get_redis_lLen("msgevent:" . $redisType); if (!$count) { //無需要入數(shù)據(jù)庫(kù)訂單隊(duì)列,直接返回 return false; } $successCount = 0; for ($i = 0; $i < $loopLen; $i++) { $data = get_redis_lPop("msgevent:" . $redisType); //把redis數(shù)據(jù)格式化存入到數(shù)據(jù)庫(kù)持久化 $result = $this->saveToMysql($data); if (!$result) { //如果執(zhí)行失敗,重新推入redis隊(duì)列 get_redis_lPush("msgevent:" . $redisType, $data); } $successCount++; } if (!IS_WIN) { flock($file, LOCK_UN); fclose($file); }定時(shí)執(zhí)行任務(wù)的的入口
這個(gè)入口是定時(shí)秒級(jí)處理腳本和處理小于當(dāng)前時(shí)間的腳本調(diào)用的入口
public function task_run() { $timeType = $_GET["tt"] ? : "now"; //獲取當(dāng)前秒需要處理的所有消息, 依次路由后執(zhí)行 //如果你redis無需存入mysql,你可以redis的list結(jié)構(gòu)實(shí)現(xiàn), 一次全部取出數(shù)據(jù)集合,并刪除list //如果接下去業(yè)務(wù)里發(fā)現(xiàn)發(fā)起請(qǐng)求失敗了,重新把任務(wù)分配給redis建立恢復(fù)list,list名為t+需要什么時(shí)間執(zhí)行的時(shí)間戳 $lists = $this->getTaskLists($timeType); foreach ($lists as $item) { $this->task("event:" . $item["event_type"]); //$this->task("event:demo1"); //$this->task("event:demo2"); //$this->task("event:demo3"); } }實(shí)際處理業(yè)務(wù)(發(fā)起通知請(qǐng)求)
# 模擬一個(gè)發(fā)起的請(qǐng)求事件demo private function pushDemo1Event($info) { $sendInfoData = $this->getSendInfoFromInfo($info); //發(fā)起請(qǐng)求,這里做的真正的發(fā)起給對(duì)方的請(qǐng)求,你可以根據(jù)$sendInfo拼接各種事件或消息分類給對(duì)方 $isSuccess = $this->sendInfo($sendInfoData); $data = array( "status" => 1, "task_status" => $isSuccess ? 1 : 2, "task_time" => NOW_TIME ); $db->save($data); //如果執(zhí)行失敗,重新插入一條記錄,并把a(bǔ)t_time生成下次執(zhí)行的時(shí)間戳,這樣定時(shí)器根據(jù)at_time字段可以取出判斷執(zhí)行到當(dāng)時(shí)的秒級(jí) if (!$isSuccess) { $tryCount = $db->where(...)->count(); if ($tryCount >= 5) { return false; } $this->newTask($info, $tryCount, 30); } } private function newTask($info, $tryCount, $timeout = 30) { $newTime = $this->nextTaskTime($info, $timeout); $data = array( ... "at_time" => newTime //執(zhí)行的時(shí)間(時(shí)間戳) ); $db->add($data); }定時(shí)消費(fèi)方
$timeType = $_GET["time_type"] ? : "now"; $lists = $this->getMsgAll($timeType); if (!$lists) { return false; } foreach ($lists as $item) { if ($item["type"] == "1") { //根據(jù)業(yè)務(wù)生成我方真實(shí)訂單 call_user_func_array(array($this, "add_real_order"), array($item)); } if (in_array($item["type"], array(2,3))) { $this->push("push:toengineer", $item); //指派或取消工程師 } if ($item["type"] == "5") { $this->push("push:edit", $item); //編輯 } if ($item["type"] == "6") { $this->push("push:cancel", $item); //取消 } if ($item["type"] == "7" || $item["type"] == "8" || $item["type"] == "9") { $this->push("push:status", $item); //簡(jiǎn)單訂單狀態(tài) } }借助crontab+shell外力實(shí)現(xiàn)秒級(jí)執(zhí)行
讀取redis任務(wù)隊(duì)列到mysql存儲(chǔ)因?yàn)閏rontab是最小單位是分鐘,所以需要借助shell腳本來實(shí)現(xiàn)秒級(jí)執(zhí)行
#!/bin/bash cd /web/project/src for (( i = 1; i < 60; i = i + 1 )) do #執(zhí)行php直接返回,不會(huì)阻塞 不要忘記最后面的 & $(/usr/bin/php index.php redis2mysql > /dev/null 2>&1 &) sleep 1 done exit 0處理秒級(jí)時(shí)間sh腳本
#!/bin/bash cd /web/project/src for (( i = 1; i < 60; i = i + 1 )) do $(/usr/bin/php index.php task_run > /dev/null 2>&1 &) sleep 1 done exit 0處理秒級(jí)小于當(dāng)前時(shí)間sh腳本
#!/bin/bash cd /web/project/src for (( i = 1; i < 60; i = i + 1 )) do $(/usr/bin/php index.php task_run/tt/lt > /dev/null 2>&1 &) sleep 1 done exit 0linux下的crontab
* * * * * sh /web/project/src/shell/repair_build_order.sh * * * * * sh /web/project/src/shell/repair_sync_second.sh * * * * * sh /web/project/src/shell/repair_sync_lt_time.sh
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/26001.html
摘要:的毫秒級(jí)超時(shí)也有問題。。中超時(shí)實(shí)現(xiàn)一初級(jí)最簡(jiǎn)單的超時(shí)實(shí)現(xiàn)秒級(jí)超時(shí)思路很簡(jiǎn)單鏈接一個(gè)后端,然后設(shè)置為非阻塞模式,如果沒有連接上就一直循環(huán),判斷當(dāng)前時(shí)間和超時(shí)時(shí)間之間的差異。實(shí)際處理這個(gè)調(diào)用的部件在完成后,通過狀態(tài)通知和回調(diào)來通知調(diào)用者。 概述 在PHP開發(fā)中工作里非常多使用到超時(shí)處理到超時(shí)的場(chǎng)合,我說幾個(gè)場(chǎng)景: 異步獲取數(shù)據(jù)如果某個(gè)后端數(shù)據(jù)源獲取不成功則跳過,不影響整個(gè)頁面展現(xiàn) 為了保...
摘要:在軟件項(xiàng)目中,定時(shí)器也被應(yīng)用到了各方各面,本文將從項(xiàng)目入手,講述定時(shí)器,本文的例子都以為例。定時(shí)器總類定時(shí)器有兩種對(duì)應(yīng)重復(fù)任務(wù)和一次性任務(wù)。 在大規(guī)模分布式系統(tǒng)中,每個(gè)業(yè)務(wù)都可能是集群,每個(gè)業(yè)務(wù)機(jī)都會(huì)產(chǎn)生定時(shí)任務(wù),不同的業(yè)務(wù)會(huì)有不同的任務(wù)管理需求,統(tǒng)一的任務(wù)調(diào)度和管理變得非常有必要。 定時(shí)如何準(zhǔn)確,大量的定時(shí)被同時(shí)觸發(fā)怎么辦? 定時(shí)結(jié)束的時(shí)候,怎么通知業(yè)務(wù)機(jī)去處理呢? 某臺(tái)業(yè)務(wù)機(jī)下線...
摘要:在軟件項(xiàng)目中,定時(shí)器也被應(yīng)用到了各方各面,本文將從項(xiàng)目入手,講述定時(shí)器,本文的例子都以為例。定時(shí)器總類定時(shí)器有兩種對(duì)應(yīng)重復(fù)任務(wù)和一次性任務(wù)。 在大規(guī)模分布式系統(tǒng)中,每個(gè)業(yè)務(wù)都可能是集群,每個(gè)業(yè)務(wù)機(jī)都會(huì)產(chǎn)生定時(shí)任務(wù),不同的業(yè)務(wù)會(huì)有不同的任務(wù)管理需求,統(tǒng)一的任務(wù)調(diào)度和管理變得非常有必要。 定時(shí)如何準(zhǔn)確,大量的定時(shí)被同時(shí)觸發(fā)怎么辦? 定時(shí)結(jié)束的時(shí)候,怎么通知業(yè)務(wù)機(jī)去處理呢? 某臺(tái)業(yè)務(wù)機(jī)下線...
摘要:諸如此類,隊(duì)列的應(yīng)用范圍是如此之廣。方案抽象到更高一層,開發(fā)一套通用異步處理隊(duì)列適用于任何復(fù)雜的業(yè)務(wù)邏輯那么,作為架構(gòu)師,使用隊(duì)列的做法,將抽象層和業(yè)務(wù)層分離,可具有良好的擴(kuò)展性和可維護(hù)性。 一、隊(duì)列使用場(chǎng)景:為什么需要隊(duì)列 在web開發(fā)中,我們經(jīng)常會(huì)遇到需要處理批量任務(wù)的時(shí)候,這些批量任務(wù)可能是用戶提交的,也可能是當(dāng)系統(tǒng)被某個(gè)事件觸發(fā)時(shí)需要進(jìn)行批量處理的,面對(duì)這樣的任務(wù),如果是用戶提...
閱讀 3406·2021-11-25 09:43
閱讀 3464·2021-11-19 09:40
閱讀 2464·2021-10-14 09:48
閱讀 1283·2021-09-09 11:39
閱讀 1920·2019-08-30 15:54
閱讀 2821·2019-08-30 15:44
閱讀 1994·2019-08-29 13:12
閱讀 1543·2019-08-29 12:59