摘要:定義任務處理方法。讀取來自命令行的參數,開始執行任務。該函數有兩個參數和,是引用類型,用來存儲子進程的狀態,有兩個可選常量,分別表示不等待子進程結束立即返回和等待子進程結束。
用PHP來實現異步任務一直是個難題,現有的解決方案中:PHP知名的異步框架有 swoole 和 Workerman,但都是無法在 web 環境中直接使用的,即便強行搭建 web 環境,異步調用也是使用多進程模式實現的。但有時真的不需要用啟動服務的方式,讓服務端一直等待客戶端消息,何況中間還不能改動服務端代碼。本文就介紹一下不使用任何框架和第三方庫的情況下,在 CLI 環境中如何實現多進程以及在web環境中的異步調用。在 web 環境的異步調用
常用的方式有兩種
1. 使用 socket 連接這種方式就是典型的C/S架構,需要有服務端支持。
// 1. 創建socket套接字 $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); // 2. 進行socket連接 socket_connect($socket, "127.0.0.1", "3939"); //socket_set_nonblock($socket); // 以非阻塞模式運行,由于在客戶端不實用,所以這里不考慮 // 3. 向服務端發送請求 socket_write($socket, $request, strlen($request)); // 4. 接受服務端的回應消息(忽略非阻塞的情況,如果服務端不是提供異步服務,那這一步可以省略) $recv = socket_read($socket, 2048); // 5. 關閉socket連接 socket_close($socket);2. 使用 popen 打開進程管道
這種方式是使用操作系統命令,由操作系統直接執行。
本文討論的異步調用就是使用這種方式。
$sf = "/path/to/cli_async_task.php"; //要執行的腳本文件 $op = "call"; //腳本文件接收的參數1 $data = base64_encode(serialize(["TestTask", "arg1", "arg2"])); //腳本文件接收的參數2 pclose(popen("php "$sf" --op $op --data $data &", "r")); //打開之后接著就關閉進程管道,讓該進程以守護模式運行 echo PHP_EOL."異步任務已執行。".PHP_EOL;
這種方式的優點就是:一步解決,當前進程不需要任何開銷。
缺點也很明顯:無法跟蹤任務腳本的運行狀態。
所以重頭戲會是在執行任務的腳本文件上,下面就介紹任務處理和多進程的實現方式。
注意:多進程模式僅支持Linux,不支持Windows!!
這里會從0開始(未使用任何框架和類庫)介紹每一個步驟,最后會附帶一份完整的代碼。
任何腳本不可忽視的地方就是錯誤處理。所以寫一個任務處理腳本首先就是寫錯誤處理方式。
在PHP中就是調用 set_exception_handler set_error_handler register_shutdown_function 這三個函數,然后寫上自定義的處理方法。
接著是定義自動加載函數 spl_autoload_register 免去每使用一個新類都要 require / include 的煩惱。
定義日志操作方法。
定義任務處理方法。
讀取來自命令行的參數,開始執行任務。
2. 多進程處理PHP 創建多進程是使用 pcntl_fork 函數,該函數會 fork 一份當前進程(影分身術),于是就有了兩個進程,當前進程是主進程(本體),fork 出的進程是子進程(影分身)。需要注意的是兩個進程代碼環境是一樣的,兩個進程都是執行到了 pcntl_fork 函數位置。區別就是 getmypid 獲得的進程號不一樣,最重要的區分是當調用 pcntl_fork函數時,子進程獲得的返回值是 0,而主進程獲得的是子進程的進程號 pid。
好了,當我們知道誰是子進程后,就可以讓該子進程執行任務了。
那么主進程是如何得知子進程的狀態呢?
使用 pcntl_wait。該函數有兩個參數 $status 和 $options ,$status 是引用類型,用來存儲子進程的狀態,$options 有兩個可選常量WNOHANG| WUNTRACED,分別表示不等待子進程結束立即返回和等待子進程結束。很明顯使用WUNTRACED會阻塞主進程。(也可以使用 pcntl_waitpid 函數獲取特定 pid 子進程狀態)
在多進程中,主進程要做的就是管理每個子進程的狀態,否則子進程很可能無法退出而變成僵尸進程。
關于多進程間的消息通信
這一塊需要涉及具體的業務邏輯,所以只能簡單的提一下。不考慮使用第三方比如 redis 等服務的情況下,PHP原生可以實現就是管道通信和共享內存等方式。實現起來都比較簡單,缺點就是可使用的數據容量有限,只能用簡單文本協議交換數據。
如何手動結束所有進程任務
如果多進程處理不當,很可能導致進程任務卡死,甚至占用過多系統資源,此時只能手動結束進程。
除了一個個的根據進程號來結束,還有一個快速的方法是首先在任務腳本里自定義進程名稱,就是調用cli_set_process_title函數,然后在命令行輸入:ps aux|grep cli_async_worker |grep -v grep|awk "{print $2}"|xargs kill -9 (里面的 cli_async_worker 就是自定義的進程名稱),這樣就可以快速結束多進程任務了。
未完待續...
以下是完整的任務執行腳本代碼:
可能無法直接使用,需要修改的地方有:
腳本目錄和日志目錄常量
自動加載任務類的方法(默認是加載腳本目錄中以Task結尾的文件)
其他的如:錯誤和日志處理方式和文本格式就隨意吧...
如果命名管道文件設置有錯誤,可能導致進程假死,你可能需要手動刪除進程管道通信的代碼。
多進程的例子:execAsyncTask("multi", [ "test" => ["a", "b", "c"], "grab" => [["url" => "https://www.baidu.com", "callback" => "http://localhost"]] ]);。執行情況可以在日志文件中查看。execAsyncTask函數參考【__使用popen打開進程管道__】。
[%s] %s (%s)". " ". "%s", $time, $e->getMessage(), $e->getCode(), $e->getTraceAsString() ); file_put_contents(TASK_LOGS_PATH ."/exception-".date("Ymd").".log", $msg.PHP_EOL, FILE_APPEND|LOCK_EX); }); set_error_handler(function($errno, $errmsg, $filename, $line) { if (!(error_reporting() & $errno)) return; ob_start(); debug_print_backtrace(); $backtrace = ob_get_contents(); ob_end_clean(); $datetime = date("Y-m-d H:i:s", time()); $msg = <<$header) { if (!is_numeric($_k)) $header = sprintf("%s: %s", $_k, $header); $_headers .= $header . " "; } } $headers = "Connection: close " . $_headers; $opts = array( "http" => array( "method" => strtoupper(@$job["method"] ?: "get"), "content" => @$job["data"] ?: null, "header" => $headers, "user_agent" => @$job["args"]["user_agent"] ?: "HTTPGRAB/1.0 (compatible)", "proxy" => @$job["args"]["proxy"] ?: null, "timeout" => intval(@$job["args"]["timeout"] ?: 120), "protocol_version" => @$job["args"]["protocol_version"] ?: "1.1", "max_redirects" => 3, "ignore_errors" => true ) ); $ret = @file_get_contents($url, false, stream_context_create($opts)); //debug_log($url." -->".strlen($ret)); if ($ret and isset($job["callback"])) { $postdata = http_build_query(array( "msg_id" => @$job["msg_id"] ?: 0, "url" => @$job["url"], "result" => $ret )); $opts = array( "http" => array( "method" => "POST", "header" => "Content-type:application/x-www-form-urlencoded". " ", "content" => $postdata, "timeout" => 30 ) ); file_get_contents($job["callback"], false, stream_context_create($opts)); //debug_log(json_encode(@$http_response_header)); //debug_log($job["callback"]." -->".$ret2); } return $ret; } function clean($tmpdirs, $expires=3600*24*7) { $ret = []; foreach ((array)$tmpdirs as $tmpdir) { $ret[$tmpdir] = 0; foreach (glob($tmpdir.DIRECTORY_SEPARATOR."*") as $_file) { if (fileatime($_file) < (time()-$expires)) { if (@unlink($_file)) $ret[$tmpdir]++; } } } return $ret; } function backup($file, $dest) { $zip = new ipArchive(); if (!$zip->open($file, ipArchive::CREATE)) { return false; } _backup_dir($zip, $dest); $zip->close(); return $file; } function _backup_dir($zip, $dest, $sub="") { $dest = rtrim($dest, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR; $sub = rtrim($sub, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR; $dir = opendir($dest); if (!$dir) return false; while (false !== ($file = readdir($dir))) { if (is_file($dest . $file)) { $zip->addFile($dest . $file, $sub . $file); } else { if ($file != "." and $file != ".." and is_dir($dest . $file)) { //$zip->addEmptyDir($sub . $file . DIRECTORY_SEPARATOR); _backup_dir($zip, $dest . $file, $file); } } } closedir($dir); return true; } function execute_task($op, $data) { debug_log("Start..."); $t1 = microtime(true); switch($op) { case "call": //執行任務腳本類 $cmd = $data; if (is_string($cmd) and class_exists($cmd)) $cmd = new $cmd; elseif (is_array($cmd)) { if (is_string($cmd[0]) and class_exists($cmd[0])) $cmd[0] = new $cmd[0]; } $ret = call($cmd); break; case "grab": //抓取網頁 if (is_string($data)) $data = ["url" => $data]; if (is_array($data)) $ret = grab($data); else throw new Exception("無效的命令參數!"); break; case "clean": //清理緩存文件夾:dirs 需要清理的文件夾列表,expires 過期時間(秒,默認7天) if (isset($data["dirs"])) { $ret = clean($data["dirs"], @$data["expires"]); } else { $ret = clean($data); } break; case "backup": //備份文件:zip 備份到哪個zip文件,dest 需要備份的文件夾 if (isset($data["zip"]) and is_dir($data["dest"])) $ret = backup($data["zip"], $data["dest"]); else throw new Exception("沒有指定需要備份的文件!"); break; case "require": //加載腳本文件 if (is_file($data)) $ret = require($data); else throw new Exception("不是可請求的文件!"); break; case "test": sleep(rand(1, 5)); $ret = ucfirst(strval($data)). ".PID:". getmypid(); break; case "multi": //多進程處理模式 $results = $childs = []; $fifo = TASK_LOGS_PATH . DIRECTORY_SEPARATOR . "pipe.". posix_getpid(); if (!file_exists($fifo)) { if (!posix_mkfifo($fifo, 0666)) { //開啟進程數據通信管道 throw new Exception("make pipe failed!"); } } //$shmid = shmop_open(ftok(__FILE__, "h"), "c", 0644, 4096); //共享內存 //shmop_write($shmid, serialize([]), 0); //$data = unserialize(shmop_read($shmid, 0, 4096)); //shmop_delete($shmid); //shmop_close($shmid); foreach($data as $_op => $_datas) { $_datas = (array)$_datas; //data 格式為數組表示一個 op 有多個執行數據 foreach($_datas as $_data) { $pid = pcntl_fork(); if ($pid == 0) { //子進程中執行任務 $_ret = execute_task($_op, $_data); $_pid = getmypid(); $pipe = fopen($fifo, "w"); //寫 //stream_set_blocking($pipe, false); $_ret = serialize(["pid" => $_pid, "op" => $_op, "args" => $_data, "result" => $_ret]); if (strlen($_ret) > 4096) //寫入管道的數據最大4K $_ret = serialize(["pid" => $_pid, "op" => $_op, "args" => $_data, "result" => "[RESPONSE_TOO_LONG]"]); //debug_log("write pipe: ".$_ret); fwrite($pipe, $_ret.PHP_EOL); fflush($pipe); fclose($pipe); exit(0); //退出子進程 } elseif ($pid > 0) { //主進程中記錄任務 $childs[] = $pid; $results[$pid] = 0; debug_log("fork by child: ".$pid); //pcntl_wait($status, WNOHANG); } elseif ($pid == -1) { throw new Exception("could not fork at ". getmygid()); } } } $pipe = fopen($fifo, "r+"); //讀 stream_set_blocking($pipe, true); //阻塞模式,PID與讀取的管道數據可能會不一致。 $n = 0; while(count($childs) > 0) { foreach($childs as $i => $pid) { $res = pcntl_waitpid($pid, $status, WNOHANG); if (-1 == $res || $res > 0) { $_ret = @unserialize(fgets($pipe)); //讀取管道數據 $results[$pid] = $_ret; unset($childs[$i]); debug_log("read child: ".$pid . " - " . json_encode($_ret, 64|256)); } if ($n > 1000) posix_kill($pid, SIGTERM); //超時(10分鐘)結束子進程 } usleep(200000); $n++; } debug_log("child process completed."); @fclose($pipe); @unlink($fifo); $ret = json_encode($results, 64|256); break; default: throw new Exception("沒有可執行的任務!"); break; } $t2 = microtime(true); $times = round(($t2 - $t1) * 1000, 2); $log = sprintf("[%s] %s --> (%s) %sms", strtoupper($op), @json_encode($data, 64|256), @strlen($ret)<65?$ret:@strlen($ret), $times); debug_log($log); return $ret; } // 讀取 CLI 命令行參數 $params = getopt("", array("op:", "data:")); $op = $params["op"]; $data = unserialize(base64_decode($params["data"])); // 開始執行任務 execute_task($op, $data); function __autoload($classname) { $parts = explode("", ltrim($classname, "")); if (false !== strpos(end($parts), "_")) { array_splice($parts, -1, 1, explode("_", current($parts))); } $filename = implode(DIRECTORY_SEPARATOR, $parts) . ".php"; if ($filename = stream_resolve_include_path($filename)) { include $filename; } else if (preg_match("/.*Task$/", $classname)) { //查找以Task結尾的任務腳本類 include TASK_PATH . DIRECTORY_SEPARATOR . $classname . ".php"; } else { return false; } }
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/30016.html
摘要:消費者開發本例我們使用的多進程開發工具來完成這個需求,通常使用常駐進程來處理隊列的消費,所以我們使用的類型,模式。中進程負責執行郵件發送任務。此時終端將打印成功收到測試郵件官網 注意:這個是 MixPHP V1 的范例 郵件發送是很常見的需求,由于發送郵件的操作一般是比較耗時的,所以我們一般采用異步處理來提升用戶體驗,而異步通常我們使用消息隊列來實現。 傳統 MVC 框架由于缺少多進程...
摘要:管理進程會監視所有子進程的退出事件,當進程發生致命錯誤或者運行生命周期結束時,管理進程會回收此進程,并創建新的進程。換句話也就是說,對于進程的創建回收等操作全權有保姆進程進行管理。跟的交互請求到達實際上是與進程中的某個線程發生了連接。 showImg(https://segmentfault.com/img/bVbrhb2?w=600&h=360); 一、進程的基本知識 什么是進程,所...
摘要:下文如無特殊聲明將使用進程同時表示進程線程。收到數據后服務器程序進行處理然后使用向客戶端發送響應。現在各種高并發異步的服務器程序都是基于實現的,比如。 并發 IO 問題一直是服務器端編程中的技術難題,從最早的同步阻塞直接 Fork 進程,到 Worker 進程池/線程池,到現在的異步IO、協程。PHP 程序員因為有強大的 LAMP 框架,對這類底層方面的知識知之甚少,本文目的就是詳細介...
摘要:嚴格來說,并不是單線程的。其他異步和事件驅動相關的線程通過來實現內部的線程池和線程調度。線程是最小的進程,因此也是單進程的。子進程中執行的是非程序,提供一組參數后,執行的結果以回調的形式返回。在子進程中通過和的機制來接收和發送消息。 ??node遵循的是單線程單進程的模式,node的單線程是指js的引擎只有一個實例,且在nodejs的主線程中執行,同時node以事件驅動的方式處理IO...
摘要:啟動和如下信息則表示成功查看版本安裝擴展從下載最新擴展需下載最新源碼包,并解壓縮安裝安裝成功后信息然后,配置文件增加內容重啟后,出現如下信息則表示安裝擴展成功。 首發于 樊浩柏科學院 Gearman 是一個分布式任務分發系統,通過程序調用(API,跨語言)分布式地把工作委派給更適合做某項工作的機器,且這些機器可以以并發的、負載均衡的形式來共同完成某項工作。當計算密集型場景時,適合在后...
閱讀 1748·2023-04-25 16:28
閱讀 684·2021-11-23 09:51
閱讀 1467·2019-08-30 15:54
閱讀 1149·2019-08-30 15:53
閱讀 2816·2019-08-30 15:53
閱讀 3413·2019-08-30 15:43
閱讀 3250·2019-08-30 11:18
閱讀 3261·2019-08-26 10:25