摘要:基于有序集實現延遲任務執行,比如某個時間給某個用戶發短信,訂單過期處理,等等我是在框架上寫的,實現起來很簡單對于一些不是很復雜的應用足夠了,目前在公司項目中使用,后臺進程并沒有實現多進程,不多說,貼代碼,不回排版,見諒命令行腳本執行方法這
基于redis有序集實現延遲任務執行,比如某個時間給某個用戶發短信,訂單過期處理,等等
我是在tp5框架上寫的,實現起來很簡單,對于一些不是很復雜的應用足夠了,目前在公司項目中使用,后臺進程并沒有實現多進程,
不多說,貼代碼,不回排版,見諒
1、命令行腳本 執行方法:php think delay-queue queuename(這是有序集的key)
namespace appcommand; use appcommonlibdelayqueueDelayQueue; use thinkconsoleCommand; use thinkconsoleInput; use thinkconsoleOutput; use thinkDb; class DelayQueueWorker extends Command { const COMMAND_ARGV_1 = "queue"; protected function configure() { $this->setName("delay-queue")->setDescription("延遲隊列任務進程"); $this->addArgument(self::COMMAND_ARGV_1); } protected function execute(Input $input, Output $output) { $queue = $input->getArgument(self::COMMAND_ARGV_1); //參數1 延遲隊列表名,對應與redis的有序集key名 while (true) { DelayQueue::getInstance($queue)->perform(); usleep(300000); } } }
庫類目錄結構
config.php 里是redis連接參數配置
RedisHandler.php只實現有序集的操作,重連機制還沒有實現
namespace appcommonlibdelayqueue; class RedisHandler { public $provider; private static $_instance = null; private function __construct() { $this->provider = new Redis(); //host port $config = require_once "config.php"; $this->provider->connect($config["redis_host"], $config["redis_port"]); } final private function __clone() {} public static function getInstance() { if(!self::$_instance) { self::$_instance = new RedisHandler(); } return self::$_instance; } /** * @param string $key 有序集key * @param number $score 排序值 * @param string $value 格式化的數據 * @return int */ public function zAdd($key, $score, $value) { return $this->provider->zAdd($key, $score, $value); } /** * 獲取有序集數據 * @param $key * @param $start * @param $end * @param null $withscores * @return array */ public function zRange($key, $start, $end, $withscores = null) { return $this->provider->zRange($key, $start, $end, $withscores); } /** * 刪除有序集數據 * @param $key * @param $member * @return int */ public function zRem($key,$member) { return $this->provider->zRem($key,$member); } }
延遲隊列類
namespace appcommonlibdelayqueue; class DelayQueue { private $prefix = "delay_queue:"; private $queue; private static $_instance = null; private function __construct($queue) { $this->queue = $queue; } final private function __clone() {} public static function getInstance($queue = "") { if(!self::$_instance) { self::$_instance = new DelayQueue($queue); } return self::$_instance; } /** * 添加任務信息到隊列 * * demo DelayQueue::getInstance("test")->addTask( * "appcommonlibdelayqueuejobTest", * strtotime("2018-05-02 20:55:20"), * ["abc"=>111] * ); * * @param $jobClass * @param int $runTime 執行時間 * @param array $args */ public function addTask($jobClass, $runTime, $args = null) { $key = $this->prefix.$this->queue; $params = [ "class" => $jobClass, "args" => $args, "runtime" => $runTime, ]; RedisHandler::getInstance()->zAdd( $key, $runTime, serialize($params) ); } /** * 執行job * @return bool */ public function perform() { $key = $this->prefix.$this->queue; //取出有序集第一個元素 $result = RedisHandler::getInstance()->zRange($key, 0 ,0); if (!$result) { return false; } $jobInfo = unserialize($result[0]); print_r("job: ".$jobInfo["class"]." will run at: ". date("Y-m-d H:i:s",$jobInfo["runtime"]).PHP_EOL); $jobClass = $jobInfo["class"]; if(!@class_exists($jobClass)) { print_r($jobClass." undefined". PHP_EOL); RedisHandler::getInstance()->zRem($key, $result[0]); return false; } // 到時間執行 if (time() >= $jobInfo["runtime"]) { $job = new $jobClass; $job->setPayload($jobInfo["args"]); $jobResult = $job->preform(); if ($jobResult) { // 將任務移除 RedisHandler::getInstance()->zRem($key, $result[0]); return true; } } return false; } }
異步任務基類:
namespace appcommonlibdelayqueue; class DelayJob { protected $payload; public function preform () { // todo return true; } public function setPayload($args = null) { $this->payload = $args; } }
所有異步執行的任務都卸載job目錄下,且要繼承DelayJob,你可以實現任何你想延遲執行的任務
如:
namespace appcommonlibdelayqueuejob; use appcommonlibdelayqueueDelayJob; class Test extends DelayJob { public function preform() { // payload 里應該有處理任務所需的參數,通過DelayQueue的addTask傳入 print_r("test job".PHP_EOL); return true; } }
使用方法:
假設用戶創建了一個訂單,訂單在10分鐘后失效,那么在訂單創建后加入:
DelayQueue::getInstance("close_order")->addTask( "appcommonlibdelayqueuejobCloseOrder", // 自己實現的job strtotime("2018-05-02 20:55:20"), // 訂單失效時間 ["order_id"=>123456] // 傳遞給job的參數 );
close_order 是有序集的key
命令行啟動進程
php think delay-queue close_order
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/30732.html
摘要:場景說明用于處理比較耗時的請求,例如批量發送郵件,如果直接在網頁觸發執行發送,程序會出現超時高并發場景,當某個時刻請求瞬間增加時,可以把請求寫入到隊列,后臺在去處理這些請求搶購場景,先入先出的模式命令或往列表右側推入數據客戶端阻塞直到隊列有 場景說明: 用于處理比較耗時的請求,例如批量發送郵件,如果直接在網頁觸發執行發送,程序會出現超時 高并發場景,當某個時刻請求瞬間增加時,可以把請...
摘要:場景說明用于處理比較耗時的請求,例如批量發送郵件,如果直接在網頁觸發執行發送,程序會出現超時高并發場景,當某個時刻請求瞬間增加時,可以把請求寫入到隊列,后臺在去處理這些請求搶購場景,先入先出的模式命令或往列表右側推入數據客戶端阻塞直到隊列有 場景說明: 用于處理比較耗時的請求,例如批量發送郵件,如果直接在網頁觸發執行發送,程序會出現超時 高并發場景,當某個時刻請求瞬間增加時,可以把請...
摘要:場景說明用于處理比較耗時的請求,例如批量發送郵件,如果直接在網頁觸發執行發送,程序會出現超時高并發場景,當某個時刻請求瞬間增加時,可以把請求寫入到隊列,后臺在去處理這些請求搶購場景,先入先出的模式命令或往列表右側推入數據客戶端阻塞直到隊列有 場景說明: 用于處理比較耗時的請求,例如批量發送郵件,如果直接在網頁觸發執行發送,程序會出現超時 高并發場景,當某個時刻請求瞬間增加時,可以把請...
閱讀 2226·2021-09-22 15:25
閱讀 3610·2019-08-30 12:48
閱讀 2197·2019-08-30 11:25
閱讀 2332·2019-08-30 11:05
閱讀 720·2019-08-29 17:28
閱讀 3279·2019-08-26 12:16
閱讀 2602·2019-08-26 11:31
閱讀 1683·2019-08-23 17:08