摘要:清空主進程殘留的定時器與信號。設定為執行回調函數如果在回調函數中調用了異步系統,啟動函數進行事件循環。因此為了區分兩者,規定并不允許兩者同時存在。
前言
swoole-1.7.2 增加了一個進程管理模塊,用來替代 PHP 的 pcntl 擴展。
PHP自帶的pcntl,存在很多不足,如
pcntl 沒有提供進程間通信的功能
pcntl 不支持重定向標準輸入和輸出
pcntl 只提供了 fork 這樣原始的接口,容易使用錯誤
swoole_process 提供了比 pcntl 更強大的功能,更易用的 API,使 PHP 在多進程編程方面更加輕松。
swoole_process::__construct 創建子進程在進程初始化的時候,首先要判斷當前的環境:
非 CLI 模式下不能使用
在 server master 進程下并且已經啟動了 server 后是不能創建進程的,因為此時 master 進程已經創建了 多個 reator 線程,fork 后會將多線程也復制下來。
同樣的道理,使用了異步的 AIO 的進程使用了線程池,fork 會出現非常復雜的帶線程 fork 問題。
如果當前環境可以創建進程,那么需要初始化以下屬性:
process->id:如果是普通的客戶端進程,或者是 master 進程未啟動 server 的狀態, php_swoole_worker_round_id 就是創建的 process 進程數量,此時只需要遞增即可;如果 server 已啟動,那么 php_swoole_worker_round_id 還要加上所有 worker 進程的數量。 php_swoole_worker_round_id 遞增就是 process->id。
設置重定向,讓進程的輸入輸出與主進程管道相關聯
swPipeUnsock_create 函數新建管道
static PHP_METHOD(swoole_process, __construct) { zend_bool redirect_stdin_and_stdout = 0; long pipe_type = 2; zval *callback; //only cli env if (!SWOOLE_G(cli)) { swoole_php_fatal_error(E_ERROR, "swoole_process only can be used in PHP CLI mode."); RETURN_FALSE; } if (SwooleG.serv && SwooleG.serv->gs->start == 1 && swIsMaster()) { swoole_php_fatal_error(E_ERROR, "swoole_process can"t be used in master process."); RETURN_FALSE; } if (SwooleAIO.init) { swoole_php_fatal_error(E_ERROR, "unable to create process with async-io threads."); RETURN_FALSE; } if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z|bl", &callback, &redirect_stdin_and_stdout, &pipe_type) == FAILURE) { RETURN_FALSE; } char *func_name = NULL; if (!sw_zend_is_callable(callback, 0, &func_name TSRMLS_CC)) { swoole_php_fatal_error(E_ERROR, "function "%s" is not callable", func_name); efree(func_name); RETURN_FALSE; } efree(func_name); swWorker *process = emalloc(sizeof(swWorker)); bzero(process, sizeof(swWorker)); int base = 1; if (SwooleG.serv && SwooleG.serv->gs->start) { base = SwooleG.serv->worker_num + SwooleG.serv->task_worker_num + SwooleG.serv->user_worker_num; } if (php_swoole_worker_round_id == 0) { php_swoole_worker_round_id = base; } process->id = php_swoole_worker_round_id++; if (redirect_stdin_and_stdout) { process->redirect_stdin = 1; process->redirect_stdout = 1; process->redirect_stderr = 1; /** * Forced to use stream pipe */ pipe_type = 1; } if (pipe_type > 0) { swPipe *_pipe = emalloc(sizeof(swPipe)); int socket_type = pipe_type == 1 ? SOCK_STREAM : SOCK_DGRAM; if (swPipeUnsock_create(_pipe, 1, socket_type) < 0) { RETURN_FALSE; } process->pipe_object = _pipe; process->pipe_master = _pipe->getFd(_pipe, SW_PIPE_MASTER); process->pipe_worker = _pipe->getFd(_pipe, SW_PIPE_WORKER); process->pipe = process->pipe_master; zend_update_property_long(swoole_process_class_entry_ptr, getThis(), ZEND_STRL("pipe"), process->pipe_master TSRMLS_CC); } swoole_set_object(getThis(), process); zend_update_property(swoole_process_class_entry_ptr, getThis(), ZEND_STRL("callback"), callback TSRMLS_CC); }swoole_process->start 啟動進程
swoole_process->start 函數用于 fork 一個新進程,并且調用 php_swoole_process_start
static PHP_METHOD(swoole_process, start) { swWorker *process = swoole_get_object(getThis()); if (process->pid > 0 && kill(process->pid, 0) == 0) { swoole_php_fatal_error(E_WARNING, "process has already been started."); RETURN_FALSE; } pid_t pid = fork(); if (pid < 0) { swoole_php_fatal_error(E_WARNING, "fork() failed. Error: %s[%d]", strerror(errno), errno); RETURN_FALSE; } else if (pid > 0) { process->pid = pid; process->child_process = 0; zend_update_property_long(swoole_server_class_entry_ptr, getThis(), ZEND_STRL("pid"), process->pid TSRMLS_CC); RETURN_LONG(pid); } else { process->child_process = 1; SW_CHECK_RETURN(php_swoole_process_start(process, getThis() TSRMLS_CC)); } RETURN_TRUE; }
php_swoole_process_start 函數用于設定重定向和清理主進程殘留的一些功能:
將 STDIN_FILENO 輸入、STDOUT_FILENO 輸出、STDERR_FILENO 錯誤輸出與 pipe_worker 相綁定,實現重定向功能。
如果存在 SwooleG.main_reactor,刪除并釋放相關內存。
清空主進程殘留的定時器與信號。
設定 process_type 為 0
執行 _construct 回調函數
如果在回調函數中調用了異步系統,啟動 php_swoole_event_wait 函數進行事件循環。
int php_swoole_process_start(swWorker *process, zval *object TSRMLS_DC) { process->pipe = process->pipe_worker; process->pid = getpid(); if (process->redirect_stdin) { if (dup2(process->pipe, STDIN_FILENO) < 0) { swoole_php_fatal_error(E_WARNING, "dup2() failed. Error: %s[%d]", strerror(errno), errno); } } if (process->redirect_stdout) { if (dup2(process->pipe, STDOUT_FILENO) < 0) { swoole_php_fatal_error(E_WARNING, "dup2() failed. Error: %s[%d]", strerror(errno), errno); } } if (process->redirect_stderr) { if (dup2(process->pipe, STDERR_FILENO) < 0) { swoole_php_fatal_error(E_WARNING, "dup2() failed. Error: %s[%d]", strerror(errno), errno); } } /** * Close EventLoop */ if (SwooleG.main_reactor) { SwooleG.main_reactor->free(SwooleG.main_reactor); SwooleG.main_reactor = NULL; swTraceLog(SW_TRACE_PHP, "destroy reactor"); } bzero(&SwooleWG, sizeof(SwooleWG)); SwooleG.pid = process->pid; if (SwooleG.process_type != SW_PROCESS_USERWORKER) { SwooleG.process_type = 0; } SwooleWG.id = process->id; if (SwooleG.timer.fd) { swTimer_free(&SwooleG.timer); bzero(&SwooleG.timer, sizeof(SwooleG.timer)); } swSignal_clear(); zend_update_property_long(swoole_process_class_entry_ptr, object, ZEND_STRL("pid"), process->pid TSRMLS_CC); zend_update_property_long(swoole_process_class_entry_ptr, object, ZEND_STRL("pipe"), process->pipe_worker TSRMLS_CC); zval *zcallback = sw_zend_read_property(swoole_process_class_entry_ptr, object, ZEND_STRL("callback"), 0 TSRMLS_CC); zval **args[1]; if (zcallback == NULL || ZVAL_IS_NULL(zcallback)) { swoole_php_fatal_error(E_ERROR, "no callback."); return SW_ERR; } zval *retval = NULL; args[0] = &object; sw_zval_add_ref(&object); if (sw_call_user_function_ex(EG(function_table), NULL, zcallback, &retval, 1, args, 0, NULL TSRMLS_CC) == FAILURE) { swoole_php_fatal_error(E_ERROR, "callback function error"); return SW_ERR; } if (EG(exception)) { zend_exception_error(EG(exception), E_ERROR TSRMLS_CC); } if (retval) { sw_zval_ptr_dtor(&retval); } if (SwooleG.main_reactor) { php_swoole_event_wait(); } SwooleG.running = 0; zend_bailout(); return SW_OK; }swoole_process->write/ swoole_process->read
主進程與子進程之間進行通信可以使用 write 與 read,如果使用了 swoole_event,會自動將管道轉為非阻塞模式,由 reactor 進行事件循環讀寫,否則就會采用阻塞式讀寫。
static PHP_METHOD(swoole_process, write) { char *data = NULL; zend_size_t data_len = 0; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &data, &data_len) == FAILURE) { RETURN_FALSE; } if (data_len < 1) { swoole_php_fatal_error(E_WARNING, "the data to send is empty."); RETURN_FALSE; } swWorker *process = swoole_get_object(getThis()); if (process->pipe == 0) { swoole_php_fatal_error(E_WARNING, "no pipe, can not write into pipe."); RETURN_FALSE; } int ret; //async write if (SwooleG.main_reactor) { swConnection *_socket = swReactor_get(SwooleG.main_reactor, process->pipe); if (_socket && _socket->nonblock) { ret = SwooleG.main_reactor->write(SwooleG.main_reactor, process->pipe, data, (size_t) data_len); } else { goto _blocking_read; } } else { _blocking_read: ret = swSocket_write_blocking(process->pipe, data, data_len); } if (ret < 0) { swoole_php_error(E_WARNING, "write() failed. Error: %s[%d]", strerror(errno), errno); RETURN_FALSE; } ZVAL_LONG(return_value, ret); } static PHP_METHOD(swoole_process, read) { long buf_size = 8192; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|l", &buf_size) == FAILURE) { RETURN_FALSE; } if (buf_size > 65536) { buf_size = 65536; } swWorker *process = swoole_get_object(getThis()); if (process->pipe == 0) { swoole_php_fatal_error(E_WARNING, "no pipe, can not read from pipe."); RETURN_FALSE; } char *buf = emalloc(buf_size + 1); int ret = read(process->pipe, buf, buf_size);; if (ret < 0) { efree(buf); if (errno != EINTR) { swoole_php_error(E_WARNING, "read() failed. Error: %s[%d]", strerror(errno), errno); } RETURN_FALSE; } buf[ret] = 0; SW_ZVAL_STRINGL(return_value, buf, ret, 0); efree(buf); }swoole_process::signal 設置信號處理函數
為異步的程序添加信號處理函數。首先程序會檢查當前的進程環境與注冊的信號,不符合條件的直接返回,例如:swoole_server 中不能設置 SIGTERM 和 SIGALAM 信號,這兩個信號是 swoole 需要保留的,用戶不能進行修改。
如果此前該信號已存在信號處理函數,該函數會覆蓋以前的回調函數,之前的邏輯會再次執行一次,之后就會被銷毀。
static PHP_METHOD(swoole_process, signal) { zval *callback = NULL; long signo = 0; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "lz", &signo, &callback) == FAILURE) { return; } if (!SWOOLE_G(cli)) { swoole_php_fatal_error(E_ERROR, "cannot use swoole_process::signal here."); RETURN_FALSE; } if (SwooleG.serv && SwooleG.serv->gs->start) { if ((swIsWorker() || swIsTaskWorker()) && signo == SIGTERM) { swoole_php_fatal_error(E_WARNING, "unable to register SIGTERM in worker/task process."); RETURN_FALSE; } else if (swIsManager() && (signo == SIGTERM || signo == SIGUSR1 || signo == SIGUSR2 || signo == SIGALRM)) { swoole_php_fatal_error(E_WARNING, "unable to register SIGTERM/SIGUSR1/SIGUSR2/SIGALRM in manager process."); RETURN_FALSE; } else if (swIsMaster() && (signo == SIGTERM || signo == SIGUSR1 || signo == SIGUSR2 || signo == SIGALRM || signo == SIGCHLD)) { swoole_php_fatal_error(E_WARNING, "unable to register SIGTERM/SIGUSR1/SIGUSR2/SIGALRM/SIGCHLD in manager process."); RETURN_FALSE; } } php_swoole_check_reactor(); swSignalHander handler; if (callback == NULL || ZVAL_IS_NULL(callback)) { callback = signal_callback[signo]; if (callback) { swSignal_add(signo, NULL); SwooleG.main_reactor->defer(SwooleG.main_reactor, free_signal_callback, callback); signal_callback[signo] = NULL; RETURN_TRUE; } else { swoole_php_error(E_WARNING, "no callback."); RETURN_FALSE; } } else if (Z_TYPE_P(callback) == IS_LONG && Z_LVAL_P(callback) == (long) SIG_IGN) { handler = NULL; } else { char *func_name; if (!sw_zend_is_callable(callback, 0, &func_name TSRMLS_CC)) { swoole_php_error(E_WARNING, "function "%s" is not callable", func_name); efree(func_name); RETURN_FALSE; } efree(func_name); callback = sw_zval_dup(callback); sw_zval_add_ref(&callback); handler = php_swoole_onSignal; } /** * for swSignalfd_setup */ SwooleG.main_reactor->check_signalfd = 1; //free the old callback if (signal_callback[signo]) { SwooleG.main_reactor->defer(SwooleG.main_reactor, free_signal_callback, signal_callback[signo]); } signal_callback[signo] = callback; /** * use user settings */ SwooleG.use_signalfd = SwooleG.enable_signalfd; swSignal_add(signo, handler); RETURN_TRUE; }swoole_process::alarm 進程定時器
對比 SwooleTimer 來說,swoole_process::alarm 并不是一個非常好的選擇,swoole_process::alarm 更加類似于真是的進程 alarm 定時器,alarm 只允許設定一個 alarm 信號,而 SwooleTimer 由于實現了一個定時任務最小堆,可以在不同的時間間隔執行不同的任務。因此為了區分兩者,swoole 規定并不允許兩者同時存在。
swoole_process::alarm 函數需要與 swoole_process::signal 相結合,因為其內部調用 setitimer,會周期發送 alarm 信號,需要在 swoole_process::signal 函數中設置 alarm 信號的回調函數。
static PHP_METHOD(swoole_process, alarm) { long usec = 0; long type = ITIMER_REAL; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l|l", &usec, &type) == FAILURE) { return; } if (!SWOOLE_G(cli)) { swoole_php_fatal_error(E_ERROR, "cannot use swoole_process::alarm here."); RETURN_FALSE; } if (SwooleG.timer.fd != 0) { swoole_php_fatal_error(E_WARNING, "cannot use both "timer" and "alarm" at the same time."); RETURN_FALSE; } struct timeval now; if (gettimeofday(&now, NULL) < 0) { swoole_php_error(E_WARNING, "gettimeofday() failed. Error: %s[%d]", strerror(errno), errno); RETURN_FALSE; } struct itimerval timer_set; bzero(&timer_set, sizeof(timer_set)); if (usec > 0) { long _sec = usec / 1000000; long _usec = usec - (_sec * 1000000); timer_set.it_interval.tv_sec = _sec; timer_set.it_interval.tv_usec = _usec; timer_set.it_value.tv_sec = _sec; timer_set.it_value.tv_usec = _usec; if (timer_set.it_value.tv_usec > 1e6) { timer_set.it_value.tv_usec = timer_set.it_value.tv_usec - 1e6; timer_set.it_value.tv_sec += 1; } } if (setitimer(type, &timer_set, NULL) < 0) { swoole_php_error(E_WARNING, "setitimer() failed. Error: %s[%d]", strerror(errno), errno); RETURN_FALSE; } RETURN_TRUE; }swoole_process->useQueue 消息隊列
useQueue 會利用 swMsgQueue_create 創建 process->queue。
static PHP_METHOD(swoole_process, useQueue) { long msgkey = 0; long mode = 2; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|ll", &msgkey, &mode) == FAILURE) { RETURN_FALSE; } swWorker *process = swoole_get_object(getThis()); if (msgkey <= 0) { msgkey = ftok(sw_zend_get_executed_filename(), 1); } swMsgQueue *queue = emalloc(sizeof(swMsgQueue)); if (swMsgQueue_create(queue, 1, msgkey, 0) < 0) { RETURN_FALSE; } if (mode & MSGQUEUE_NOWAIT) { swMsgQueue_set_blocking(queue, 0); mode = mode & (~MSGQUEUE_NOWAIT); } process->queue = queue; process->ipc_mode = mode; zend_update_property_long(swoole_process_class_entry_ptr, getThis(), ZEND_STRL("msgQueueId"), queue->msg_id TSRMLS_CC); zend_update_property_long(swoole_process_class_entry_ptr, getThis(), ZEND_STRL("msgQueueKey"), msgkey TSRMLS_CC); RETURN_TRUE; }swoole_process->push/swoole_process->pop 消息通信
推送和消費消息就是利用 swMsgQueue_push/swMsgQueue_pop 函數。
static PHP_METHOD(swoole_process, push) { char *data; zend_size_t length; struct { long type; char data[SW_MSGMAX]; } message; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &data, &length) == FAILURE) { RETURN_FALSE; } if (length <= 0) { swoole_php_fatal_error(E_WARNING, "the data to push is empty."); RETURN_FALSE; } else if (length >= sizeof(message.data)) { swoole_php_fatal_error(E_WARNING, "the data to push is too big."); RETURN_FALSE; } swWorker *process = swoole_get_object(getThis()); if (!process->queue) { swoole_php_fatal_error(E_WARNING, "no msgqueue, can not use push()"); RETURN_FALSE; } message.type = process->id; memcpy(message.data, data, length); if (swMsgQueue_push(process->queue, (swQueue_data *)&message, length) < 0) { RETURN_FALSE; } RETURN_TRUE; } static PHP_METHOD(swoole_process, pop) { long maxsize = SW_MSGMAX; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|l", &maxsize) == FAILURE) { RETURN_FALSE; } if (maxsize > SW_MSGMAX || maxsize <= 0) { maxsize = SW_MSGMAX; } swWorker *process = swoole_get_object(getThis()); if (!process->queue) { swoole_php_fatal_error(E_WARNING, "no msgqueue, can not use pop()"); RETURN_FALSE; } struct { long type; char data[SW_MSGMAX]; } message; if (process->ipc_mode == 2) { message.type = 0; } else { message.type = process->id; } int n = swMsgQueue_pop(process->queue, (swQueue_data *) &message, maxsize); if (n < 0) { RETURN_FALSE; } SW_RETURN_STRINGL(message.data, n, 1); }swoole_process::kill/swoole_process::wait
向進程發送信號 kill 與回收子進程 wait 邏輯比較簡單,就是調用對應的函數。值得注意的是 kill 之后的錯誤如果是 ESRCH,代表著相應的進程不存在。
static PHP_METHOD(swoole_process, kill) { long pid; long sig = SIGTERM; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l|l", &pid, &sig) == FAILURE) { RETURN_FALSE; } int ret = kill((int) pid, (int) sig); if (ret < 0) { if (!(sig == 0 && errno == ESRCH)) { swoole_php_error(E_WARNING, "kill(%d, %d) failed. Error: %s[%d]", (int) pid, (int) sig, strerror(errno), errno); } RETURN_FALSE; } RETURN_TRUE; } static PHP_METHOD(swoole_process, wait) { int status; zend_bool blocking = 1; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|b", &blocking) == FAILURE) { RETURN_FALSE; } int options = 0; if (!blocking) { options |= WNOHANG; } pid_t pid = swWaitpid(-1, &status, options); if (pid > 0) { array_init(return_value); add_assoc_long(return_value, "pid", pid); add_assoc_long(return_value, "code", WEXITSTATUS(status)); add_assoc_long(return_value, "signal", WTERMSIG(status)); } else { RETURN_FALSE; } } static sw_inline int swWaitpid(pid_t __pid, int *__stat_loc, int __options) { int ret; do { ret = waitpid(__pid, __stat_loc, __options); if (ret < 0 && errno == EINTR) { continue; } break; } while(1); return ret; }
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/29433.html
摘要:消息隊列更常見的用途是主進程分配任務,子進程消費執行。子進程前面加了個,這是為了防止父進程還未往消息隊列中加入內容直接退出。 前面幾節都是講解pcntl擴展實現的多進程程序。本節給大家介紹swoole擴展的swoole_process模塊。 swoole多進程 swoole_process 是swoole提供的進程管理模塊,用來替代PHP的pcntl擴展。 首先,確保安裝的swoole...
摘要:修復添加超過萬個以上定時器時發生崩潰的問題增加模塊,下高性能序列化庫修復監聽端口設置無效的問題等。線程來處理網絡事件輪詢,讀取數據。當的三次握手成功了以后,由這個線程將連接成功的消息告訴進程,再由進程轉交給進程。此時進程觸發事件。 本文示例代碼詳見:https://github.com/52fhy/swoo...。 簡介 Swoole是一個PHP擴展,提供了PHP語言的異步多線程服務器...
摘要:首先,確保安裝的版本大于實例說明本例里待消費的是三個命令,會分別創建一個子進程來消費。通過管道發數據到子進程。,重定向子進程的標準輸入和輸出。 簡介 swoole_process 是swoole提供的進程管理模塊,用來替代PHP的pcntl擴展。 首先,確保安裝的swoole版本大于1.7.2: $ php --ri swoole swoole swoole support => ...
摘要:話不多說直接上代碼創建的子進程獲取異步獲取更高性能啟動子進程子進程處理邏輯異步非阻塞網關連接失敗讀取父進程管道消息父進程獲取子進程的管道消息子進程消息子進程的客戶端可以忽略不計,本只是 話不多說直接上代碼 創建的子進程: public function __construct() { $this->redis = Container::get(SwooleR...
閱讀 954·2021-11-25 09:43
閱讀 2291·2019-08-30 15:55
閱讀 3153·2019-08-30 15:44
閱讀 2053·2019-08-29 16:20
閱讀 1453·2019-08-29 12:12
閱讀 1609·2019-08-26 12:19
閱讀 2283·2019-08-26 11:49
閱讀 1712·2019-08-26 11:42