摘要:場景說明用于處理比較耗時的請求,例如批量發送郵件,如果直接在網頁觸發執行發送,程序會出現超時高并發場景,當某個時刻請求瞬間增加時,可以把請求寫入到隊列,后臺在去處理這些請求搶購場景,先入先出的模式命令或往列表右側推入數據客戶端阻塞直到隊列有
場景說明:
用于處理比較耗時的請求,例如批量發送郵件,如果直接在網頁觸發執行發送,程序會出現超時
高并發場景,當某個時刻請求瞬間增加時,可以把請求寫入到隊列,后臺在去處理這些請求
搶購場景,先入先出的模式
命令:rpush + blpop 或 lpush + brpop
rpush : 往列表右側推入數據簡單隊列: simple.php
blpop : 客戶端阻塞直到隊列有值輸出
$stmt = $pdo->prepare("select id, cid, name from zc_goods limit 200000"); $stmt->execute(); while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) { $redis->rPush("goods:task", json_encode($row)); } $redis->close();
獲取20000萬個商品,并把json化后的數據推入goods:task隊列
queueBlpop.php// 出隊 while (true) { // 阻塞設置超時時間為3秒 $task = $redis->blPop(array("goods:task"), 3); if ($task) { $redis->rPush("goods:success:task", $task[1]); $task = json_decode($task[1], true); echo $task["id"] . ":" . $task["cid"] . ":" . "handle success"; echo PHP_EOL; } else { echo "nothing" . PHP_EOL; sleep(5); } }
設置blpop阻塞時間為3秒,當有數據出隊時保存到goods:success:task表示執行成功,當隊列沒有數據時,程序睡眠10秒重新檢查goods:task是否有數據出隊
php simple.php php queueBlpop.php優先級隊列
blpop 有多個鍵時,blpop會從左至右遍歷鍵,一旦一個鍵能彈出元素,客戶端立即返回。例如:
blpop key1 key2 key3 key4
從key1到key4遍歷,如果哪個key有值,則彈出這個值,若多個key同時有值時,優先彈出排在左邊的key。
priority.php// 設置優先級隊列 $high = "goods:high:task"; $mid = "goods:mid:task"; $low = "goods:low:task"; $stmt = $pdo->prepare("select id, cid, name from zc_goods limit 200000"); $stmt->execute(); while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) { // cid 小于100放在低級隊列 if ($row["cid"] < 100) { $redis->rPush($low, json_encode($row)); } // cid 100到600之間放在中級隊列 elseif ($row["cid"] > 100 && $row["cid"] < 600) { $redis->rPush($mid, json_encode($row)); } // cid 大于600放在高級隊列 else { $redis->rPush($high, json_encode($row)); } } $redis->close();priorityBlop.php
// 優先級隊列 $high = "goods:high:task"; $mid = "goods:mid:task"; $low = "goods:low:task"; // 出隊 while(true){ // 優先級高的隊列放在左側 $task = $redis->blPop(array($high, $mid, $low), 3); if ($task) { $task = json_decode($task[1], true); echo $task["id"] . ":" . $task["cid"] . ":" . "handle success"; echo PHP_EOL; } else { echo "nothing" . PHP_EOL; sleep(5); } }
優先級高的隊列放在blpop命令左側,依次排序,blpop命令會依次彈出high, mid, low隊列的值
php priority.php php priorityBlpop.php延遲隊列
可以用一個有序集合來保存延遲任務,member保存任務內容,score保存(當前時間 + 延時時間)。用時間作為score。程序只要用有序集合的第一條任務的score和當前時間做比較,如果當前時間比score小,說明有序集合的所有任務還沒到執行時間。
delay.php$stmt = $pdo->prepare("select id, cid, name from zc_goods limit 200000"); $stmt->execute(); while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) { $redis->zAdd("goods:delay:task", time() + rand(1, 300), json_encode($row)); }
將20萬條任務導入有序集合goods:delay:task,所有任務延遲到之后的1秒到300秒內執行
delayHandle.phpwhile (true) { // 因為是有序集合,只要判斷第一條記錄的延時時間,例如第一條未到執行時間 // 相對說明集合的其他任務未到執行時間 $rs = $redis->zRange("goods:delay:task", 0, 0, true); // 集合沒有任務,睡眠時間設置為5秒 if (empty($rs)) { echo "no tasks , sleep 5 seconds" . PHP_EOL; sleep(5); continue; } $taskJson = key($rs); $delay = $rs[$taskJson]; $task = json_decode($taskJson, true); $now = time(); // 到時間執行延時任務 if ($delay <= $now) { // 對當前任務加鎖,避免移動移動延時任務到任務隊列時被其他客戶端修改 if (!($identifier = acquireLock($task["id"]))) { continue; } // 移動延時任務到任務隊列 $redis->zRem("goods:delay:task", $taskJson); $redis->rPush("goods:task", $taskJson); echo $task["id"] . " run " . PHP_EOL; // 釋放鎖 releaseLock($task["id"], $identifier); } else { // 延時任務未到執行時間 $sleep = $delay - $now; // 最大值設置為2秒,保證如果有新的任務(延時時間1秒)進入集合時能夠及時的被處理 // $sleep = $sleep > 2 ? 2 :$sleep; echo "wait " . $sleep . " seconds " . PHP_EOL; sleep($sleep); } }
這個文件對有序集合內的延遲任務做處理,如果延遲任務到了執行時間,則把延遲任務移動到任務隊列中
queueBlpop.php// 出隊 while (true) { // 阻塞設置超時時間為3秒 $task = $redis->blPop(array("goods:task"), 3); if ($task) { $redis->rPush("goods:success:task", $task[1]); $task = json_decode($task[1], true); echo $task["id"] . ":" . $task["cid"] . ":" . "handle success"; echo PHP_EOL; } else { echo "nothing" . PHP_EOL; sleep(5); } }
處理任務隊列中的任務
php delay.php php delayHanlde.php php queueBlpop.php
完整代碼:https://github.com/wuzhc/demo...
相關項目: https://github.com/wuzhc/gmq
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/70394.html
摘要:如果任務沒有在規定時間內完成,那么該有序集合的任務將會被重新放入隊列中。這兩個進程操縱了三個隊列,其中一個,負責即時任務,兩個,負責延時任務與待處理任務。如果任務執行成功,就會刪除中的任務,否則會被重新放入隊列中。 在實際的項目開發中,我們經常會遇到需要輕量級隊列的情形,例如發短信、發郵件等,這些任務不足以使用 kafka、RabbitMQ 等重量級的消息隊列,但是又的確需要異步、重試...
摘要:配置項用于配置失敗隊列任務存放的數據庫及數據表。要使用隊列驅動,需要在配置文件中配置數據庫連接。如果應用使用了,那么可以使用時間或并發來控制隊列任務。你可以使用命令運行這個隊列進程。如果隊列進程意外關閉,它會自動重啟啟動隊列進程。 一、概述 在Web開發中,我們經常會遇到需要批量處理任務的場景,比如群發郵件、秒殺資格獲取等,我們將這些耗時或者高并發的操作放到隊列中異步執行可以有效緩解系...
閱讀 1871·2021-11-25 09:43
閱讀 3161·2021-11-15 11:38
閱讀 2708·2019-08-30 13:04
閱讀 483·2019-08-29 11:07
閱讀 1492·2019-08-26 18:37
閱讀 2697·2019-08-26 14:07
閱讀 583·2019-08-26 13:52
閱讀 2278·2019-08-26 12:09