摘要:把因執行超時的隊列從集合重新到當前執行的隊列中。從要執行的隊列中取任務可以看到在取要執行的隊列的時候,同時會放一份到一個有序集合中,并使用過期時間戳作為分值。
(原文鏈接:https://blog.tanteng.me/2017/...)
在 Laravel 中使用 Redis 處理隊列任務,框架提供的功能非常強大,但是最近遇到一個問題,就是發現一個任務被多次執行,這是為什么呢?
先說原因:因為在 Laravel 中如果一個隊列(任務)執行時間大于 60 秒,就會被認為執行失敗并重新加入隊列中,這樣就會導致重復執行同一個任務。
這個任務的邏輯就是給用戶推送內容,需要根據隊列內容取出用戶并遍歷,通過請求后端 HTTP 接口發送。比如有 10000 個用戶,在用戶數量多或接口處理速度沒那么快的情況下,執行時間肯定會大于 60 秒,于是這個任務就被重新加入隊列。情況更糟糕一點,前面的任務如果都沒有在 60 秒執行完,就都會重新加入隊列,這樣同一個任務就不止重復執行一次了,而是多次。
下面從 Laravel 源代碼找一下罪魁禍首。
源代碼文件:vendor/laravel/framework/src/Illuminate/Queue/RedisQueue.php
/** * The expiration time of a job. * * @var int|null */ protected $expire = 60;
這個 $expire 成員變量是一個固定的值,Laravel 認為一個隊列再怎么 60 秒也該執行完了吧。取隊列方法:
public function pop($queue = null) { $original = $queue ?: $this->default; $queue = $this->getQueue($queue); $this->migrateExpiredJobs($queue.":delayed", $queue); if (! is_null($this->expire)) { $this->migrateExpiredJobs($queue.":reserved", $queue); } list($job, $reserved) = $this->getConnection()->eval( LuaScripts::pop(), 2, $queue, $queue.":reserved", $this->getTime() + $this->expire ); if ($reserved) { return new RedisJob($this->container, $this, $job, $reserved, $original); } }
取隊列有幾步操作,因為隊列執行失敗,或執行超時等都會放入另外的集合保存起來,以便重試,過程如下:
1.把因執行失敗的隊列從 delayed 集合重新 rpush 到當前執行的隊列中。
2.把因執行超時的隊列從 reserved 集合重新 rpush 到當前執行的隊列中。
3.然后才是從隊列中取任務開始執行,同時把隊列放入 reserved 的有序集合。
這里使用了 eval 命令執行這個過程,用到了幾個 lua 腳本。
從要執行的隊列中取任務:
local job = redis.call("lpop", KEYS[1]) local reserved = false if(job ~= false) then reserved = cjson.decode(job) reserved["attempts"] = reserved["attempts"] + 1 reserved = cjson.encode(reserved) redis.call("zadd", KEYS[2], ARGV[1], reserved) end return {job, reserved}
可以看到 Laravel 在取 Redis 要執行的隊列的時候,同時會放一份到一個有序集合中,并使用過期時間戳作為分值。
只有當這個任務完成后,再把有序集合中這個任務移除。從這個有序集合移除隊列的代碼就省略,我們看一下 Laravel 如何處理執行時間大于 60 秒的隊列。
也就是這段 lua 腳本執行的操作:
local val = redis.call("zrangebyscore", KEYS[1], "-inf", ARGV[1]) if(next(val) ~= nil) then redis.call("zremrangebyrank", KEYS[1], 0, #val - 1) for i = 1, #val, 100 do redis.call("rpush", KEYS[2], unpack(val, i, math.min(i+99, #val))) end end return true
這里 zrangebyscore 找出分值從無限小到當前時間戳的元素,也就是 60 秒之前加入到集合的任務,然后通過 zremrangebyrank 從集合移除這些元素并 rpush 到隊列中。
看到這里應該就恍然大悟了。
如果一個隊列 60 秒沒執行完,那么進程在取隊列的時候從 reserved 集合中把這些任務又重新 rpush 到隊列中。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/26260.html
摘要:已經取消了參數,都用來執行。取數據的過程事物處理已經打開。取得符合條件的隊列后程序會更新該條數據,并且更新完后即。 connections => [ .... database => [ driver => database, table => jobs, queue => defaul...
摘要:對于定時任務的基本用法,官網文檔已經描述得很詳細了,這里不再多說。這種情況下如果定時任務能夠并行執行,就不會有這樣的問題。這個時候我們希望能夠像隊列那樣,將定時任務分散到多臺服務器上。 定時任務 Scheduled Tasks 是 Laravel 提供的組件之一,稍微上點規模的項目應該都會用到,比如開發微信應用時通過定時任務去刷新access token,比如每天定時發推送提現用戶要記...
摘要:當查詢數據時,本地范圍允許我們創建自己的查詢構造器鏈式方法。這樣便會知道這是一個本地范圍并且可以在查詢構造器中使用。某些查詢構造器不可用或者說可用但是方法名不同,關于這些請查閱所有集合的方法。 showImg(https://segmentfault.com/img/remote/1460000017877956?w=800&h=267); Laravel 因可編寫出干凈,可用可調試的...
摘要:高性能高精度定時服務,輕松管理千萬級定時任務。支持任務到期觸發和。支持創建延時任務和定時到期任務,和原生保持相同接口,輕松使用。不支持任務輸出任務鉤子及維護模式。是不指定任務名時自動生成,每個任務名必須唯一,相同任務名重復定義將會自動覆蓋。 Forsun高性能高精度定時服務,輕松管理千萬級定時任務。 定時服務項目地址:https://github.com/snower/forsun l...
閱讀 2458·2021-09-27 13:36
閱讀 2163·2019-08-29 18:47
閱讀 2129·2019-08-29 15:21
閱讀 1394·2019-08-29 11:14
閱讀 1979·2019-08-28 18:29
閱讀 1623·2019-08-28 18:04
閱讀 568·2019-08-26 13:58
閱讀 3206·2019-08-26 12:12