摘要:已經(jīng)取消了參數(shù),都用來(lái)執(zhí)行。取數(shù)據(jù)的過(guò)程事物處理已經(jīng)打開。取得符合條件的隊(duì)列后程序會(huì)更新該條數(shù)據(jù),并且更新完后即。
"connections" => [ .... "database" => [ "driver" => "database", "table" => "jobs", "queue" => "default", "expire" => 60, ], "redis" => [ "driver" => "redis", "connection" => "default", "queue" => "default", "expire" => 180, ], .... ],
Laravel5.2隊(duì)列驅(qū)動(dòng)config/queue.php配置文件,“database”和“redis”有個(gè)expire參數(shù),手冊(cè)上解釋是“隊(duì)列任務(wù)過(guò)期時(shí)間(秒)”,默認(rèn)為60秒。
(注:5.2和之后的配置文件發(fā)生了變化,改為"retry_after" 參數(shù),具體見手冊(cè))
網(wǎng)上搜了一下這個(gè)配置,沒(méi)有太多說(shuō)明,但是實(shí)際使用的過(guò)程中,發(fā)現(xiàn)對(duì)于執(zhí)行時(shí)間超過(guò)expire設(shè)置時(shí)間的隊(duì)列進(jìn)程,還有使用隊(duì)列進(jìn)行分布式程序部署,這個(gè)參數(shù)和這種設(shè)計(jì)模式是個(gè)大坑。。。
發(fā)現(xiàn)這個(gè)問(wèn)題是想使用分布式程序部署處理隊(duì)列,兩臺(tái)服務(wù)器部署Laravel框架artisan腳本,連接一個(gè)MYSQL數(shù)據(jù)庫(kù),使用一張jobs隊(duì)列表。
部署的后,分別啟動(dòng)兩臺(tái)服務(wù)器的腳本,發(fā)現(xiàn)后執(zhí)行的腳本,在隊(duì)列驅(qū)動(dòng)中取數(shù)據(jù),如MYSQL的jobs表,遇到先執(zhí)行的腳本隊(duì)列數(shù)據(jù)時(shí)不會(huì)跳過(guò),而是把這條數(shù)據(jù)視為Failed,儲(chǔ)存一條新數(shù)據(jù)到failed_jobs表(Laravel隊(duì)列失敗時(shí)會(huì)將隊(duì)列數(shù)據(jù)儲(chǔ)存到failed_jobs表),造成數(shù)據(jù)重復(fù)。
之前在一臺(tái)服務(wù)器啟動(dòng)3個(gè)進(jìn)程執(zhí)行腳本,并不會(huì)發(fā)生這種錯(cuò)誤,后執(zhí)行的腳本不會(huì)取得前一個(gè)進(jìn)程的隊(duì)列數(shù)據(jù),更不會(huì)判斷成Failed,多服務(wù)處理時(shí)是什么原因造成隊(duì)列驅(qū)動(dòng)中的數(shù)據(jù)錯(cuò)誤呢?
根據(jù)隊(duì)列執(zhí)行的流程,程序執(zhí)行時(shí),隊(duì)列到隊(duì)列驅(qū)動(dòng)中取任務(wù),獲得任務(wù)的過(guò)程隊(duì)列驅(qū)動(dòng)應(yīng)該做事物處理,這樣第二個(gè)進(jìn)程取任務(wù)會(huì)跳過(guò)正在執(zhí)行的隊(duì)列數(shù)據(jù)。
查了一些資料,了解了Laravel隊(duì)列的原理,最后還得看Queue的源碼。
Laravel的Queue的源碼都在IlluminateQueue目錄下。
先分析以MYSQL為驅(qū)動(dòng)的jobs表:
CREATE TABLE `jobs` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `queue` varchar(255) COLLATE utf8_unicode_ci NOT NULL, `payload` longtext COLLATE utf8_unicode_ci NOT NULL, `attempts` tinyint(3) unsigned NOT NULL, `reserved` tinyint(3) unsigned NOT NULL, `reserved_at` int(10) unsigned DEFAULT NULL, `available_at` int(10) unsigned NOT NULL, `created_at` int(10) unsigned NOT NULL, PRIMARY KEY (`id`), KEY `jobs_queue_reserved_reserved_at_index` (`queue`,`reserved`,`reserved_at`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
手冊(cè)中主要介紹了隊(duì)列任務(wù)的保存,payload字段儲(chǔ)存的是序列化后的任務(wù),Laravel隊(duì)列可以將數(shù)據(jù)模型序列化,執(zhí)行時(shí)候隊(duì)列系統(tǒng)會(huì)自動(dòng)從數(shù)據(jù)庫(kù)中獲取整個(gè)模型實(shí)例,具體說(shuō)明見手冊(cè)。
但是其他幾個(gè)狀態(tài)和時(shí)間字段才是保證隊(duì)列事物處理的關(guān)鍵字段。
“attempts”執(zhí)行次數(shù),“reserved”執(zhí)行的狀態(tài),“reserved_at”執(zhí)行開始時(shí)間,‘a(chǎn)vailable_at’預(yù)訂執(zhí)行時(shí)間,‘created_at’是隊(duì)列創(chuàng)建時(shí)間。
監(jiān)聽事件的腳本有Listener.php和Worker.php兩個(gè)腳本,看源碼說(shuō)明Listener可以處理指定的隊(duì)列,connection參數(shù),但是實(shí)際上最后都是通過(guò)work來(lái)處理隊(duì)列的。Laravel5.4已經(jīng)取消了queue:listen參數(shù),都用queue:work來(lái)執(zhí)行。不過(guò)我這里說(shuō)的是Laravel5.2的問(wèn)題,不知道是不是下面的原因使Laravel優(yōu)化去掉了listen。
繼續(xù)分析隊(duì)列處理的Worker類的源碼,取隊(duì)列數(shù)據(jù)時(shí)用pop方法,這個(gè)方法會(huì)根據(jù)傳遞的驅(qū)動(dòng)類型如database或redis,調(diào)用該驅(qū)動(dòng)的pop方法。
$connection = $this->manager->connection($connectionName); $job = $this->getNextJob($connection, $queue); // If we"re able to pull a job off of the stack, we will process it and // then immediately return back out. If there is no job on the queue // we will "sleep" the worker for the specified number of seconds. if (! is_null($job)) { return $this->process( $this->manager->getName($connectionName), $job, $maxTries, $delay ); }
下面是DatabaseQueue.php的pop方法。
/** * Pop the next job off of the queue. * * @param string $queue * @return IlluminateContractsQueueJob|null */ public function pop($queue = null) { $queue = $this->getQueue($queue); $this->database->beginTransaction(); if ($job = $this->getNextAvailableJob($queue)) { $job = $this->markJobAsReserved($job); $this->database->commit(); return new DatabaseJob( $this->container, $this, $job, $queue ); } $this->database->commit(); }
取數(shù)據(jù)的過(guò)程事物處理已經(jīng)打開。
取隊(duì)列數(shù)據(jù)的核心還是$this->getNextAvailableJob($queue)。
打開sql日志,看看隊(duì)列數(shù)據(jù)是如何查詢出來(lái)的。
/** * Get the next available job for the queue. * * @param string|null $queue * @return StdClass|null */ protected function getNextAvailableJob($queue) { $this->database->enableQueryLog(); $job = $this->database->table($this->table) ->lockForUpdate() ->where("queue", $this->getQueue($queue)) ->where(function ($query) { $this->isAvailable($query); $this->isReservedButExpired($query); }) ->orderBy("id", "asc") ->first(); var_dump($this->database->getQueryLog()); return $job ? (object) $job : null; } array(1) { [0] => array(3) { "query" => string(165) "select * from `jobs` where `queue` = ? and ((`reserved` = ? and `available_at` <= ?) or (`reserved` = ? and `reserved_at` <= ?)) order by `id` asc limit 1 for update" "bindings" => array(5) { [0] => string(7) "default" [1] => int(0) [2] => int(1493634233) [3] => int(1) [4] => int(1493634173) } "time" => double(1.55) }
從sql語(yǔ)句中可以看出,取隊(duì)列數(shù)據(jù)有兩個(gè)條件
reserved為0時(shí),available_at時(shí)間小于當(dāng)前時(shí)間,這個(gè)條件是待執(zhí)行的隊(duì)列;reserved為1時(shí),reserved_at執(zhí)行開始時(shí)間小于計(jì)算出的時(shí)間($this->isReservedButExpired),即當(dāng)前時(shí)間減去超時(shí)秒Carbon::now()->subSeconds($this->expire)->getTimestamp(),這個(gè)條件是判斷隊(duì)列任務(wù)是否過(guò)期。
整個(gè)select過(guò)程是 “for update”的,有排他鎖。
取得符合條件的隊(duì)列后
/** * Mark the given job ID as reserved. * * @param stdClass $job * @return stdClass */ protected function markJobAsReserved($job) { $job->reserved = 1; $job->attempts = $job->attempts + 1; $job->reserved_at = $this->getTime(); $this->database->table($this->table)->where("id", $job->id)->update([ "reserved" => $job->reserved, "reserved_at" => $job->reserved_at, "attempts" => $job->attempts, ]); return $job; }
程序會(huì)更新該條數(shù)據(jù),并且更新完后即commit。
同一服務(wù)器,第二個(gè)進(jìn)程取數(shù)據(jù)時(shí)候遇到悲觀鎖,需要等第一個(gè)進(jìn)程取數(shù)據(jù)更新reserved和時(shí)間后執(zhí)行。也就是說(shuō)Laravel隊(duì)列使用database時(shí),并發(fā)的進(jìn)程并不是同時(shí)取多條數(shù)據(jù),而是取同一條數(shù)據(jù)等待其中一個(gè)進(jìn)程update數(shù)據(jù)狀態(tài)和執(zhí)行時(shí)間,隊(duì)列取得數(shù)據(jù)成功后第一個(gè)操作就是更新,所以第二個(gè)進(jìn)程不會(huì)取到第一進(jìn)程的同樣數(shù)據(jù),除非是隊(duì)列過(guò)期。
在DatabaseQueue.php的pop方法中,取得隊(duì)列數(shù)據(jù)后,“$this->database->commit(); ”前 sleep(10),會(huì)很明顯的看到第二隊(duì)列沒(méi)有獲取其他隊(duì)列數(shù)據(jù),說(shuō)明“for update”只是update級(jí)排他鎖,不會(huì)排斥select。
Laravel使用database隊(duì)列有時(shí)候會(huì)有阻塞現(xiàn)象,不知道是不是這個(gè)原因造成的。
如果執(zhí)行時(shí)間過(guò)長(zhǎng),超過(guò)‘expire’參數(shù)設(shè)置時(shí)間,第二隊(duì)列會(huì)取得第一個(gè)隊(duì)列數(shù)據(jù),判斷超時(shí),這時(shí)候就會(huì)根據(jù)設(shè)置的最大執(zhí)行次數(shù)tries來(lái)判斷是插入新隊(duì)列數(shù)據(jù)繼續(xù)嘗試執(zhí)行,還是插入到錯(cuò)誤隊(duì)列“failed_jobs”表判斷隊(duì)列執(zhí)行失敗。
以上就是Laravel使用mysql執(zhí)行隊(duì)列的邏輯,之前提到的兩臺(tái)服務(wù)器部署Laravel框架執(zhí)行artisan腳本,一個(gè)jobs表隊(duì)列Failed的問(wèn)題就是服務(wù)器時(shí)間不一致的原因,后一臺(tái)服務(wù)器執(zhí)行時(shí)候?qū)⑶耙魂?duì)列數(shù)據(jù)判斷為超時(shí)而插入到“failed_jobs”一條新數(shù)據(jù),已經(jīng)達(dá)到最大失敗次數(shù)的情況,否則還會(huì)插入新的數(shù)據(jù)繼續(xù)嘗試。
最后是隊(duì)列執(zhí)行完的處理邏輯。
如果隊(duì)列執(zhí)行成功會(huì)刪除jobs的數(shù)據(jù),這沒(méi)什么問(wèn)題。如果失敗,包括超時(shí)、異常等,會(huì)根據(jù)設(shè)置的最大失敗次數(shù)判斷是否插入一條新數(shù)據(jù),或者插入一條Failed數(shù)據(jù)到“failed_jobs”表。
出現(xiàn)錯(cuò)誤時(shí),handleJobException的異常處理調(diào)用DatabaseQueue.php的release方法,$job->release($delay),最終是pushToDatabase實(shí)現(xiàn)。
插入新數(shù)據(jù)時(shí)候,attempts是失敗次數(shù),reserved為0,available_at為當(dāng)前時(shí)間戳加上延時(shí)時(shí)間參數(shù),這樣整個(gè)隊(duì)列處理就形成了完整的數(shù)據(jù)邏輯操作。
Laravel5.4對(duì)隊(duì)列功能進(jìn)行了很大的修改,手冊(cè)中的提示
任務(wù)過(guò)期和超時(shí)任務(wù)執(zhí)行時(shí)間
在配置文件 config/queue.php 中,每個(gè)連接都定義了 retry_after 項(xiàng)。該配置項(xiàng)的目的是定義任務(wù)在執(zhí)行以后多少秒后釋放回隊(duì)列。如果retry_after 設(shè)定的值為 90, 任務(wù)在運(yùn)行 90 秒后還未完成,那么將被釋放回隊(duì)列而不是刪除掉。毫無(wú)疑問(wèn),你需要把 retry_after 的值設(shè)定為任務(wù)執(zhí)行時(shí)間的最大可能值。
Laravel5.4去掉了queue的listen命令,work也增加了超時(shí)參數(shù)。Laravel5.5出來(lái)的時(shí)候應(yīng)該升級(jí)上去。
附錄:Laravel5.2測(cè)試的腳本,之前網(wǎng)上搜出來(lái)的都比較早,還是把job寫成命令的方式,其實(shí)5.2以后job使用非常簡(jiǎn)單。
jobs下定義job任務(wù),handle可以增加一些測(cè)試方案,比如我這種拋出異常,直接Failed的
class MyJob extends Job implements ShouldQueue { use InteractsWithQueue, SerializesModels; private $key; private $value; /** * Create a new job instance. * * @return void */ public function __construct($key, $value) { $this->key = $key; $this->value = $value; } /** * Execute the job. * * @return void */ public function handle() { for($i=0;$i<20;$i++){ echo "$i "; sleep(1); } echo "sss ".$this->key." ".date("Y-m-d H:i:s")." "; throw new Exception("測(cè)試 "); // Redis::hset("queue.test", $this->key, $this->value); } public function failed() { dump("failed"); } }
控制器訪問(wèn)設(shè)置任務(wù)隊(duì)列,key和value之前用了測(cè)試redis插入的,可以按自己的測(cè)試方案設(shè)置job參數(shù)。
for($i = 0; $i < 5; $i ++) { echo "$i"; $job = (new MyJob($i, $i))->delay(20); $this->dispatch($job); }
我的例子設(shè)置了5個(gè)隊(duì)列,開啟多個(gè)shell并發(fā)執(zhí)行artisan測(cè)試吧。
本來(lái)想將redis隊(duì)列代碼讀完,一起發(fā)出來(lái)的,最近事情太多,redis代碼也沒(méi)怎么看。
redis驅(qū)動(dòng)可以參考 http://www.cnblogs.com/z12987... 這篇文章對(duì)Laravel隊(duì)列redis驅(qū)動(dòng)邏輯介紹的很詳細(xì)了,redis驅(qū)動(dòng)使用的list和zset結(jié)構(gòu)儲(chǔ)存隊(duì)列,執(zhí)行過(guò)程會(huì)移除轉(zhuǎn)存隊(duì)列,沒(méi)有數(shù)據(jù)庫(kù)的“for update” 操作,所以應(yīng)該不是存在隊(duì)列阻塞的情況。
BUT隊(duì)列任務(wù)過(guò)期時(shí)間設(shè)置和數(shù)據(jù)庫(kù)驅(qū)動(dòng)是一樣的,所以同樣
queue:listen的執(zhí)行時(shí)間參數(shù) --timeout=60,一定要設(shè)置小于隊(duì)列任務(wù)過(guò)期時(shí)間expire參數(shù)!終于寫完了。。。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/22916.html
摘要:配置項(xiàng)用于配置失敗隊(duì)列任務(wù)存放的數(shù)據(jù)庫(kù)及數(shù)據(jù)表。要使用隊(duì)列驅(qū)動(dòng),需要在配置文件中配置數(shù)據(jù)庫(kù)連接。如果應(yīng)用使用了,那么可以使用時(shí)間或并發(fā)來(lái)控制隊(duì)列任務(wù)。你可以使用命令運(yùn)行這個(gè)隊(duì)列進(jìn)程。如果隊(duì)列進(jìn)程意外關(guān)閉,它會(huì)自動(dòng)重啟啟動(dòng)隊(duì)列進(jìn)程。 一、概述 在Web開發(fā)中,我們經(jīng)常會(huì)遇到需要批量處理任務(wù)的場(chǎng)景,比如群發(fā)郵件、秒殺資格獲取等,我們將這些耗時(shí)或者高并發(fā)的操作放到隊(duì)列中異步執(zhí)行可以有效緩解系...
摘要:我在前面的文章中也提到了應(yīng)該怎么做自我介紹與項(xiàng)目介紹,詳情可以查看這篇文章備戰(zhàn)春招秋招系列初出茅廬的程序員該如何準(zhǔn)備面試。因此基于事件消息對(duì)象驅(qū)動(dòng)的業(yè)務(wù)架構(gòu)可以是一系列流程。 showImg(https://user-gold-cdn.xitu.io/2018/11/14/16711ac29c2ae52c?w=928&h=531&f=png&s=798562); 一 消息隊(duì)列MQ的...
摘要:深入淺出一直想致力于寫一篇關(guān)于廣義講解系統(tǒng)的文章,苦于時(shí)間有限,資源有限。事件驅(qū)動(dòng)機(jī)制是通過(guò)內(nèi)部單線程高效率地維護(hù)事件循環(huán)隊(duì)列來(lái)實(shí)現(xiàn)的,沒(méi)有多線程的資源占用和上下文的切換。 深入淺出Node.js 一直想致力于寫一篇關(guān)于廣義講解Node.js系統(tǒng)的文章,苦于時(shí)間有限,資源有限。這篇文章是在結(jié)合自己的學(xué)習(xí)心得以及與行業(yè)大佬共同探討下爭(zhēng)對(duì)于熟練掌握J(rèn)S語(yǔ)言后的廣義Node.js.至于為什么...
閱讀 1872·2019-08-30 15:53
閱讀 3198·2019-08-30 15:44
閱讀 2811·2019-08-26 13:31
閱讀 1953·2019-08-26 12:10
閱讀 799·2019-08-26 11:01
閱讀 2128·2019-08-23 15:32
閱讀 1588·2019-08-23 13:43
閱讀 2536·2019-08-23 11:58