摘要:場(chǎng)景說明用于處理比較耗時(shí)的請(qǐng)求,例如批量發(fā)送郵件,如果直接在網(wǎng)頁觸發(fā)執(zhí)行發(fā)送,程序會(huì)出現(xiàn)超時(shí)高并發(fā)場(chǎng)景,當(dāng)某個(gè)時(shí)刻請(qǐng)求瞬間增加時(shí),可以把請(qǐng)求寫入到隊(duì)列,后臺(tái)在去處理這些請(qǐng)求搶購場(chǎng)景,先入先出的模式命令或往列表右側(cè)推入數(shù)據(jù)客戶端阻塞直到隊(duì)列有
場(chǎng)景說明:
用于處理比較耗時(shí)的請(qǐng)求,例如批量發(fā)送郵件,如果直接在網(wǎng)頁觸發(fā)執(zhí)行發(fā)送,程序會(huì)出現(xiàn)超時(shí)
高并發(fā)場(chǎng)景,當(dāng)某個(gè)時(shí)刻請(qǐng)求瞬間增加時(shí),可以把請(qǐng)求寫入到隊(duì)列,后臺(tái)在去處理這些請(qǐng)求
搶購場(chǎng)景,先入先出的模式
命令:rpush + blpop 或 lpush + brpop
rpush : 往列表右側(cè)推入數(shù)據(jù)簡(jiǎn)單隊(duì)列: simple.php
blpop : 客戶端阻塞直到隊(duì)列有值輸出
$stmt = $pdo->prepare("select id, cid, name from zc_goods limit 200000"); $stmt->execute(); while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) { $redis->rPush("goods:task", json_encode($row)); } $redis->close();
獲取20000萬個(gè)商品,并把json化后的數(shù)據(jù)推入goods:task隊(duì)列
queueBlpop.php// 出隊(duì) while (true) { // 阻塞設(shè)置超時(shí)時(shí)間為3秒 $task = $redis->blPop(array("goods:task"), 3); if ($task) { $redis->rPush("goods:success:task", $task[1]); $task = json_decode($task[1], true); echo $task["id"] . ":" . $task["cid"] . ":" . "handle success"; echo PHP_EOL; } else { echo "nothing" . PHP_EOL; sleep(5); } }
設(shè)置blpop阻塞時(shí)間為3秒,當(dāng)有數(shù)據(jù)出隊(duì)時(shí)保存到goods:success:task表示執(zhí)行成功,當(dāng)隊(duì)列沒有數(shù)據(jù)時(shí),程序睡眠10秒重新檢查goods:task是否有數(shù)據(jù)出隊(duì)
php simple.php php queueBlpop.php優(yōu)先級(jí)隊(duì)列
blpop 有多個(gè)鍵時(shí),blpop會(huì)從左至右遍歷鍵,一旦一個(gè)鍵能彈出元素,客戶端立即返回。例如:
blpop key1 key2 key3 key4
從key1到key4遍歷,如果哪個(gè)key有值,則彈出這個(gè)值,若多個(gè)key同時(shí)有值時(shí),優(yōu)先彈出排在左邊的key。
priority.php// 設(shè)置優(yōu)先級(jí)隊(duì)列 $high = "goods:high:task"; $mid = "goods:mid:task"; $low = "goods:low:task"; $stmt = $pdo->prepare("select id, cid, name from zc_goods limit 200000"); $stmt->execute(); while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) { // cid 小于100放在低級(jí)隊(duì)列 if ($row["cid"] < 100) { $redis->rPush($low, json_encode($row)); } // cid 100到600之間放在中級(jí)隊(duì)列 elseif ($row["cid"] > 100 && $row["cid"] < 600) { $redis->rPush($mid, json_encode($row)); } // cid 大于600放在高級(jí)隊(duì)列 else { $redis->rPush($high, json_encode($row)); } } $redis->close();priorityBlop.php
// 優(yōu)先級(jí)隊(duì)列 $high = "goods:high:task"; $mid = "goods:mid:task"; $low = "goods:low:task"; // 出隊(duì) while(true){ // 優(yōu)先級(jí)高的隊(duì)列放在左側(cè) $task = $redis->blPop(array($high, $mid, $low), 3); if ($task) { $task = json_decode($task[1], true); echo $task["id"] . ":" . $task["cid"] . ":" . "handle success"; echo PHP_EOL; } else { echo "nothing" . PHP_EOL; sleep(5); } }
優(yōu)先級(jí)高的隊(duì)列放在blpop命令左側(cè),依次排序,blpop命令會(huì)依次彈出high, mid, low隊(duì)列的值
php priority.php php priorityBlpop.php延遲隊(duì)列
可以用一個(gè)有序集合來保存延遲任務(wù),member保存任務(wù)內(nèi)容,score保存(當(dāng)前時(shí)間 + 延時(shí)時(shí)間)。用時(shí)間作為score。程序只要用有序集合的第一條任務(wù)的score和當(dāng)前時(shí)間做比較,如果當(dāng)前時(shí)間比score小,說明有序集合的所有任務(wù)還沒到執(zhí)行時(shí)間。
delay.php$stmt = $pdo->prepare("select id, cid, name from zc_goods limit 200000"); $stmt->execute(); while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) { $redis->zAdd("goods:delay:task", time() + rand(1, 300), json_encode($row)); }
將20萬條任務(wù)導(dǎo)入有序集合goods:delay:task,所有任務(wù)延遲到之后的1秒到300秒內(nèi)執(zhí)行
delayHandle.phpwhile (true) { // 因?yàn)槭怯行蚣希灰袛嗟谝粭l記錄的延時(shí)時(shí)間,例如第一條未到執(zhí)行時(shí)間 // 相對(duì)說明集合的其他任務(wù)未到執(zhí)行時(shí)間 $rs = $redis->zRange("goods:delay:task", 0, 0, true); // 集合沒有任務(wù),睡眠時(shí)間設(shè)置為5秒 if (empty($rs)) { echo "no tasks , sleep 5 seconds" . PHP_EOL; sleep(5); continue; } $taskJson = key($rs); $delay = $rs[$taskJson]; $task = json_decode($taskJson, true); $now = time(); // 到時(shí)間執(zhí)行延時(shí)任務(wù) if ($delay <= $now) { // 對(duì)當(dāng)前任務(wù)加鎖,避免移動(dòng)移動(dòng)延時(shí)任務(wù)到任務(wù)隊(duì)列時(shí)被其他客戶端修改 if (!($identifier = acquireLock($task["id"]))) { continue; } // 移動(dòng)延時(shí)任務(wù)到任務(wù)隊(duì)列 $redis->zRem("goods:delay:task", $taskJson); $redis->rPush("goods:task", $taskJson); echo $task["id"] . " run " . PHP_EOL; // 釋放鎖 releaseLock($task["id"], $identifier); } else { // 延時(shí)任務(wù)未到執(zhí)行時(shí)間 $sleep = $delay - $now; // 最大值設(shè)置為2秒,保證如果有新的任務(wù)(延時(shí)時(shí)間1秒)進(jìn)入集合時(shí)能夠及時(shí)的被處理 // $sleep = $sleep > 2 ? 2 :$sleep; echo "wait " . $sleep . " seconds " . PHP_EOL; sleep($sleep); } }
這個(gè)文件對(duì)有序集合內(nèi)的延遲任務(wù)做處理,如果延遲任務(wù)到了執(zhí)行時(shí)間,則把延遲任務(wù)移動(dòng)到任務(wù)隊(duì)列中
queueBlpop.php// 出隊(duì) while (true) { // 阻塞設(shè)置超時(shí)時(shí)間為3秒 $task = $redis->blPop(array("goods:task"), 3); if ($task) { $redis->rPush("goods:success:task", $task[1]); $task = json_decode($task[1], true); echo $task["id"] . ":" . $task["cid"] . ":" . "handle success"; echo PHP_EOL; } else { echo "nothing" . PHP_EOL; sleep(5); } }
處理任務(wù)隊(duì)列中的任務(wù)
php delay.php php delayHanlde.php php queueBlpop.php
完整代碼:https://github.com/wuzhc/demo...
相關(guān)項(xiàng)目: https://github.com/wuzhc/gmq
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/40821.html
摘要:如果任務(wù)沒有在規(guī)定時(shí)間內(nèi)完成,那么該有序集合的任務(wù)將會(huì)被重新放入隊(duì)列中。這兩個(gè)進(jìn)程操縱了三個(gè)隊(duì)列,其中一個(gè),負(fù)責(zé)即時(shí)任務(wù),兩個(gè),負(fù)責(zé)延時(shí)任務(wù)與待處理任務(wù)。如果任務(wù)執(zhí)行成功,就會(huì)刪除中的任務(wù),否則會(huì)被重新放入隊(duì)列中。 在實(shí)際的項(xiàng)目開發(fā)中,我們經(jīng)常會(huì)遇到需要輕量級(jí)隊(duì)列的情形,例如發(fā)短信、發(fā)郵件等,這些任務(wù)不足以使用 kafka、RabbitMQ 等重量級(jí)的消息隊(duì)列,但是又的確需要異步、重試...
摘要:配置項(xiàng)用于配置失敗隊(duì)列任務(wù)存放的數(shù)據(jù)庫及數(shù)據(jù)表。要使用隊(duì)列驅(qū)動(dòng),需要在配置文件中配置數(shù)據(jù)庫連接。如果應(yīng)用使用了,那么可以使用時(shí)間或并發(fā)來控制隊(duì)列任務(wù)。你可以使用命令運(yùn)行這個(gè)隊(duì)列進(jìn)程。如果隊(duì)列進(jìn)程意外關(guān)閉,它會(huì)自動(dòng)重啟啟動(dòng)隊(duì)列進(jìn)程。 一、概述 在Web開發(fā)中,我們經(jīng)常會(huì)遇到需要批量處理任務(wù)的場(chǎng)景,比如群發(fā)郵件、秒殺資格獲取等,我們將這些耗時(shí)或者高并發(fā)的操作放到隊(duì)列中異步執(zhí)行可以有效緩解系...
閱讀 2384·2023-04-26 02:54
閱讀 2307·2021-10-14 09:43
閱讀 3341·2021-09-22 15:19
閱讀 2837·2019-08-30 15:44
閱讀 2697·2019-08-30 12:54
閱讀 980·2019-08-29 18:43
閱讀 1932·2019-08-29 17:12
閱讀 1325·2019-08-29 16:40