摘要:消費者開發使用本例時,請確保你使用的編譯時開啟了本例我們采用的守護程序協程池來完成一個超高性能的郵件發送程序。
去年 Mix PHP V1 發布時,我寫了一個多進程的郵件發送實例: 使用 mixphp 打造多進程異步郵件發送,今年 Mix PHP V2 發布,全面的協程支持讓我們可以使用一個進程就可達到之前多個進程都無法達到的更高 IO 性能,所以今天重寫一個協程池版本的郵件發送實例。
郵件發送是很常見的需求,由于發送郵件的操作一般是比較耗時的,所以我們一般采用異步處理來提升用戶體驗,而異步通常我們使用消息隊列來實現。
下面演示一個異步郵件發送系統的開發過程,涉及知識點:
異步
消息隊列
守護進程
協程池
如何使用消息隊列實現異步PHP 使用消息隊列通常是使用中間件來實現,常用的消息中間件有:
redis
rabbitmq
kafka
本次我們選用 Redis 來實現異步郵件發送,Redis 的數據類型中有一個 list 類型,可實現消息隊列,使用以下命令:
// 入列 $redis->lpush($key, $data); // 出列 $data = $redis->rpop($key); // 阻塞出列 $data = $redis->brpop($key, 10);架構設計
本實例由傳統 MVC 框架投遞郵件發送需求(生產者),Mix PHP 編寫的守護程序執行發送任務(消費者)。
郵件發送庫選型以往我們通常使用框架提供的郵件發送庫,或者網上下載別的用戶分享的庫,composer 出現后,https://packagist.org/ 上有大量優質的庫,我們只需選擇一個最好的即可,本例選擇 swiftmailer。
由于發送任務是由 Mix PHP 執行,所以 swiftmailer 是安裝在 Mix PHP 項目中,在項目根目錄中執行以下命令安裝:
composer require swiftmailer/swiftmailer生產者開發
在郵件發送這個需求中生產者是指投遞發送任務的一方,這一方通常是一個接口或網頁,這個部分并不一定需 Mix PHP 開發,TP、CI、YII 這些都可以,只需在接口或網頁中把任務信息投遞到消息隊列中即可。
在傳統 MVC 框架的控制器中增加如下代碼:
通常框架中使用 Redis 會安裝一個類庫來使用,本例使用原生代碼,便于理解。
// 連接 $redis = new Redis(); if (!$redis->connect("127.0.0.1", 6379)) { throw new Exception("Redis connect failed."); } $redis->auth(""); $redis->select(0); // 投遞任務 $data = [ "to" => "***@qq.com", "body" => "The message content", "subject" => "The title content", ]; $redis->lpush("queue:email", serialize($data));
通常異步開發中,投遞完成后就會立即響應一個消息給用戶,當然此時該任務并沒有在生產者中執行,而是待消息被消費者獲取后才執行。
消費者開發使用本例時,請確保你使用的 Swoole 編譯時開啟了 openssl
本例我們采用 Mix PHP V2 的守護程序、協程池來完成一個超高性能的郵件發送程序。
因為我們是開發一個守護程序,所以我們在 applications/daemon 模塊中開發,首先我們在配置 applications/daemon/config/main.php 中注冊一個命令:
// 命令 "commands" => [ "mailer" => ["Mailer", "description" => "Mailer daemon."], ],
注冊的命令中指定的 Mailer 命令類,接下來我們編寫一個 MailerCommand 類:
applications/daemon/src/Commands/MailerCommand.php
*/ class MailerCommand { /** * 退出 * @var bool */ public $quit = false; /** * 主函數 */ public function main() { // 捕獲信號 ProcessHelper::signal([SIGHUP, SIGINT, SIGTERM, SIGQUIT], function ($signal) { $this->quit = true; ProcessHelper::signal([SIGHUP, SIGINT, SIGTERM, SIGQUIT], null); }); // 協程池執行任務 xgo(function () { $maxWorkers = 20; $maxQueue = 20; $jobQueue = new Channel($maxQueue); $dispatch = new Dispatcher([ "jobQueue" => $jobQueue, "maxWorkers" => $maxWorkers, ]); $dispatch->start(MailerWorker::class); // 投放任務 $redis = app()->redisPool->getConnection(); while (true) { if ($this->quit) { $dispatch->stop(); return; } try { $data = $redis->brPop(["queue:email"], 3); } catch (Throwable $e) { $dispatch->stop(); return; } if (!$data) { continue; } $data = array_pop($data); // brPop命令最后一個鍵才是值 $jobQueue->push($data); } }); } }
從 $data = $redis->brPop(["queue:email"], 3); 外部的異常捕獲可得知,當 Redis 連接出錯時,比如 Redis 重啟、連接異常時協程池會安全退出,也就是說當進程異常退出后用戶需使用 supervisor、pm2 等工具重啟守護進程。
上面是一個 Mix PHP 協程池的使用代碼,基本可以直接復制使用,框架默認包含了協程池的 Demo,本次實例只是修改了協程池的 Worker,本命令主要是完成從 Redis 隊列中獲取消息然后 push 到 jobQueue 中,jobQueue 中的數據會被 20 個 Worker 實例中某一個搶占后并行執行,本例的郵件發送代碼邏輯就在 MailerWorker 類中:
applications/daemon/src/Libraries/MailerWorker.php
*/ class MailerWorker extends AbstractWorker implements WorkerInterface { /** * 郵件發送器 * @var Mailer */ public $mailer; /** * 初始化事件 */ public function onInitialize() { parent::onInitialize(); // TODO: Change the autogenerated stub // 實例化一些需重用的對象 $this->mailer = new Mailer(); } /** * 處理 * @param $data */ public function handle($data) { // TODO: Implement handle() method. $data = unserialize($data); if (empty($data)) { return; } try { $this->mailer->send($data["to"], $data["subject"], $data["body"]); app()->log->info("Mail sent successfully:to {to} subject {subject}", $data); } catch (Throwable $e) { app()->log->error("Mail failed to send:to {to} subject {subject} error {error}", array_merge($data, ["error" => $e->getMessage()])); } } }
由以上代碼可見,Worker 在初始化時,新增了一個 Mailer 類的屬性,當 jobQueue 消息投遞過來時消息會傳遞到 handle 方法,在該方法中使用 Mailer 類的實例完成郵件發送任務,所以我們要編寫了一個 Mailer 發送程序:
applications/daemon/src/Libraries/Mailer.php
*/ class Mailer { /** * 配置信息 */ const HOST = "smtpdm.aliyun.com"; const PORT = 465; const SECURITY = "ssl"; const USERNAME = "***"; const PASSWORD = "***"; /** * Mailer constructor. */ public function __construct() { // 開啟協程鉤子 Coroutine::enableHook(); } /** * 發送 * @param $to * @param $subject * @param $body * @return int */ public function send($to, $subject, $body) { // Create the Transport $transport = (new Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY)) ->setUsername(self::USERNAME) ->setPassword(self::PASSWORD); // Create the Mailer using your created Transport $mailer = new Swift_Mailer($transport); // Create a message $message = (new Swift_Message($subject)) ->setFrom([self::USERNAME => "**網"]) ->setTo($to) ->setBody($body); // Send the message return $mailer->send($message); } }
在 Mailer 發送程序中我們使用了前面 composer 安裝的 swiftmailer 庫來發送郵件,以上就完成了全部的代碼邏輯,現在我們開始測試。
先啟動消費者守護程序:
[root@localhost bin]# ./mix-daemon mailer
將上文的生產者腳本命名為 push.php 然后在 CLI 中執行 (開一個新終端):
[root@localhost bin]# php /tmp/push.php
消費者守護程序結果:
[root@localhost bin]# ./mix-daemon mailer [info] 2019-04-15 11:48:36 [message] Mail sent successfully:to ***@qq.com subject The title content
命令行終端打印了發送成功的日志,發送完成。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/31259.html
摘要:前些時間我們發布了實例協程池異步郵件發送守護程序范例,這一次我們提供一個使用大廠通過協程化來并行執行短信發送任務,本文是一個代碼簡單性能極強的范例。 前些時間我們發布了 Mix PHP V2 實例:協程池異步郵件發送守護程序 范例,這一次我們提供一個使用大廠 SDK 通過 Swoole Hook 協程化來并行執行短信發送任務,本文是一個代碼簡單、IO 性能極強的范例。 請先升級到 m...
摘要:主函數查詢數據不手動釋放的連接不會歸還連接池,會在析構時丟棄執行結果為,說明是并行執行的。主函數查詢數據即便拋出了異常,仍然能執行到,沒有導致內的一直處于阻塞狀態。主函數一次性定時持續定時停止定時 協程 Mix PHP V2 基于 Swoole 4 的 PHP Stream Hook 協程技術開發,協程使用方式與 Golang 幾乎一致,包括框架封裝的協程池、連接池、命令行處理都大量參...
摘要:是一個非常流行的的客戶端,現在各大廠的也都開始基于開發,因為只支持的協程,而默認是使用擴展的,所以開發了,能在不修改源碼的情況下讓協程化。 Guzzle 是一個非常流行的 PHP 的 HTTP 客戶端,現在各大廠的 SDK 也都開始基于 Guzzle 開發,因為 Swoole 只支持 PHP Stream 的協程 Hook ,而 Guzzle 默認是使用 cURL 擴展的,所以 Mix...
摘要:之前過行代碼實現通用協程池今天看了下相關文檔,用也實現了一個,由于沒有的,所以實現的有點簡單,但是實用性還可以,通過工廠函數實現了通用性。官方的協程池是用只能用在。因為協程池代碼層耦合了實例化邏輯。 之前過golang40行代碼實現通用協程池 今天看了下swoole相關文檔,用PHP也實現了一個,由于swoole沒有golang的select,所以實現的有點簡單,但是實用性還可以,通過...
閱讀 3601·2021-11-23 09:51
閱讀 1473·2021-11-04 16:08
閱讀 3547·2021-09-02 09:54
閱讀 3616·2019-08-30 15:55
閱讀 2595·2019-08-30 15:54
閱讀 958·2019-08-29 16:30
閱讀 2047·2019-08-29 16:15
閱讀 2317·2019-08-29 14:05