摘要:并沒有使用命名管道。的創(chuàng)建創(chuàng)建匿名管道就是調(diào)用函數(shù),程序自動(dòng)設(shè)置管道為非阻塞式。函數(shù)同樣的獲取管道文件描述符根據(jù)來決定。模塊負(fù)責(zé)為進(jìn)程創(chuàng)建與。當(dāng)線程啟動(dòng)的時(shí)候,會(huì)將加入的監(jiān)控當(dāng)中。
前言
管道是進(jìn)程間通信 IPC 的最基礎(chǔ)的方式,管道有兩種類型:命名管道和匿名管道,匿名管道專門用于具有血緣關(guān)系的進(jìn)程之間,完成數(shù)據(jù)傳遞,命名管道可以用于任何兩個(gè)進(jìn)程之間。swoole 中的管道都是匿名管道。
在 swoole 中,有三種不同類型的管道,其中 swPipeBase 是最基礎(chǔ)的管道,swPipeUnsock 是利用 socketpair 實(shí)現(xiàn)的管道,swPipeEventfd 是 eventfd 實(shí)現(xiàn)的管道。swoole 并沒有使用 FIFO 命名管道。
Pipe 數(shù)據(jù)結(jié)構(gòu)不管哪種類型的管道,其基礎(chǔ)都是 swPipe,該結(jié)構(gòu)體包含一個(gè)具體的 pipe 類 object,代表著是否阻塞的 blocking,超時(shí)時(shí)間 timeout,還有對(duì)管道的操作函數(shù)read、write、getfd、close
typedef struct _swPipe { void *object; int blocking; double timeout; int (*read)(struct _swPipe *, void *recv, int length); int (*write)(struct _swPipe *, void *send, int length); int (*getFd)(struct _swPipe *, int master); int (*close)(struct _swPipe *); } swPipe;swPipeBase 匿名管道 swPipeBase 數(shù)據(jù)結(jié)構(gòu)
數(shù)據(jù)結(jié)構(gòu)非常簡(jiǎn)單,就是一個(gè)數(shù)組,存放著 pipe 的讀端和寫端。值得注意的是,swPipeBase 是半全工的管道,也就是說 pipes[0] 只能用于讀,pipes[1] 只能用于寫。
當(dāng)多個(gè)進(jìn)程共享這個(gè)管道的時(shí)候,所有的進(jìn)程讀取都需要 read 讀端 pipes[0],進(jìn)程寫入消息都要 write 寫端 pipes[1]。
因此使用這個(gè)匿名管道的時(shí)候,一般情形是一個(gè)進(jìn)程只負(fù)責(zé)寫,另一個(gè)進(jìn)程只負(fù)責(zé)讀,只能單向傳遞消息,不能雙向傳遞,否則很有可能讀到了自己剛剛發(fā)送的消息。
typedef struct _swPipeBase { int pipes[2]; } swPipeBase;swPipeBase 的創(chuàng)建
創(chuàng)建匿名管道就是調(diào)用 pipe 函數(shù),程序自動(dòng)設(shè)置管道為非阻塞式。
int swPipeBase_create(swPipe *p, int blocking) { int ret; swPipeBase *object = sw_malloc(sizeof(swPipeBase)); if (object == NULL) { return -1; } p->blocking = blocking; ret = pipe(object->pipes); if (ret < 0) { swWarn("pipe() failed. Error: %s[%d]", strerror(errno), errno); sw_free(object); return -1; } else { //Nonblock swSetNonBlock(object->pipes[0]); swSetNonBlock(object->pipes[1]); p->timeout = -1; p->object = object; p->read = swPipeBase_read; p->write = swPipeBase_write; p->getFd = swPipeBase_getFd; p->close = swPipeBase_close; } return 0; }swPipeBase_read 管道的讀
由于匿名管道被設(shè)置為非阻塞式,無法實(shí)現(xiàn)超時(shí)等待寫入。如果想要阻塞式的向管道寫入數(shù)據(jù),設(shè)置一定超時(shí)時(shí)間,就需要利用 poll 函數(shù)。當(dāng) pipefd 可讀時(shí),poll 立刻返回,或者達(dá)到超時(shí)時(shí)間。
static int swPipeBase_read(swPipe *p, void *data, int length) { swPipeBase *object = p->object; if (p->blocking == 1 && p->timeout > 0) { if (swSocket_wait(object->pipes[0], p->timeout * 1000, SW_EVENT_READ) < 0) { return SW_ERR; } } return read(object->pipes[0], data, length); } int swSocket_wait(int fd, int timeout_ms, int events) { struct pollfd event; event.fd = fd; event.events = 0; if (events & SW_EVENT_READ) { event.events |= POLLIN; } if (events & SW_EVENT_WRITE) { event.events |= POLLOUT; } while (1) { int ret = poll(&event, 1, timeout_ms); if (ret == 0) { return SW_ERR; } else if (ret < 0 && errno != EINTR) { swWarn("poll() failed. Error: %s[%d]", strerror(errno), errno); return SW_ERR; } else { return SW_OK; } } return SW_OK; }swPipeBase_write 管道的寫入
管道的寫入直接調(diào)用 write 即可,非阻塞式 IO 會(huì)立刻返回結(jié)果。
static int swPipeBase_write(swPipe *p, void *data, int length) { swPipeBase *this = p->object; return write(this->pipes[1], data, length); }swPipeBase_getFd
本函數(shù)用于獲取管道的讀端或者寫端。
static int swPipeBase_getFd(swPipe *p, int isWriteFd) { swPipeBase *this = p->object; return (isWriteFd == 0) ? this->pipes[0] : this->pipes[1]; }swPipeBase_close 關(guān)閉管道
static int swPipeBase_close(swPipe *p) { int ret1, ret2; swPipeBase *this = p->object; ret1 = close(this->pipes[0]); ret2 = close(this->pipes[1]); sw_free(this); return 0 - ret1 - ret2; }swPipeEventfd 管道 swPipeEventfd 數(shù)據(jù)結(jié)構(gòu)
數(shù)據(jù)結(jié)構(gòu)中僅僅存放 eventfd 函數(shù)返回的文件描述符。
和 pipe 管道不同的是,eventfd 只有一個(gè)文件描述符,讀和寫都是對(duì)這個(gè)文件描述符進(jìn)行操作。
該管道同樣也是只適用于進(jìn)程間單向通信。
typedef struct _swPipeEventfd { int event_fd; } swPipeEventfd;swPipeEventfd_read 管道的讀取
類似于匿名管道,eventfd 也不支持超時(shí)等待,因此還是利用 poll 函數(shù)進(jìn)行超時(shí)等待。
由于 eventfd 可能是阻塞式,因此 read 時(shí)可能會(huì)被信號(hào)打斷。
static int swPipeEventfd_read(swPipe *p, void *data, int length) { int ret = -1; swPipeEventfd *object = p->object; //eventfd not support socket timeout if (p->blocking == 1 && p->timeout > 0) { if (swSocket_wait(object->event_fd, p->timeout * 1000, SW_EVENT_READ) < 0) { return SW_ERR; } } while (1) { ret = read(object->event_fd, data, sizeof(uint64_t)); if (ret < 0 && errno == EINTR) { continue; } break; } return ret; }swPipeEventfd_write 管道的寫入
寫入和讀取的過程類似,注意被信號(hào)打斷后繼續(xù)循環(huán)即可。
static int swPipeEventfd_write(swPipe *p, void *data, int length) { int ret; swPipeEventfd *this = p->object; while (1) { ret = write(this->event_fd, data, sizeof(uint64_t)); if (ret < 0) { if (errno == EINTR) { continue; } } break; } return ret; }swPipeEventfd_getFd
static int swPipeEventfd_getFd(swPipe *p, int isWriteFd) { return ((swPipeEventfd *) (p->object))->event_fd; }swPipeEventfd_close 關(guān)閉管道
static int swPipeEventfd_close(swPipe *p) { int ret; ret = close(((swPipeEventfd *) (p->object))->event_fd); sw_free(p->object); return ret; }swPipeUnsock 管道 swPipeUnsock 數(shù)據(jù)結(jié)構(gòu)
不同于 pipe 的匿名管道,swPipeUnsock 管道是雙向通信的管道。
因此兩個(gè)進(jìn)程利用 swPipeUnsock 管道進(jìn)行通信的時(shí)候,獨(dú)占一個(gè) sock,也就是說 A 進(jìn)程讀寫都是用 socks[0],B 進(jìn)程讀寫都是用 socks[1],socks[0] 寫入的消息會(huì)在 socks[1] 讀出來,反之,socks[0] 讀出的消息是 sock[1] 寫入的,這樣就實(shí)現(xiàn)了兩個(gè)進(jìn)程的雙向通信。
typedef struct _swPipeUnsock { /** * master : socks[1] * worker : socks[0] */ int socks[2]; /** * master pipe is closed */ uint8_t pipe_master_closed; /** * worker pipe is closed */ uint8_t pipe_worker_closed; } swPipeUnsock;swPipeUnsock 的創(chuàng)建
swPipeUnsock 的創(chuàng)建主要是調(diào)用 socketpair 函數(shù),protocol 決定了創(chuàng)建的 socket 是 SOCK_DGRAM 類型還是 SOCK_STREAM 類型。
int swPipeUnsock_create(swPipe *p, int blocking, int protocol) { int ret; swPipeUnsock *object = sw_malloc(sizeof(swPipeUnsock)); if (object == NULL) { swWarn("malloc() failed."); return SW_ERR; } bzero(object, sizeof(swPipeUnsock)); p->blocking = blocking; ret = socketpair(AF_UNIX, protocol, 0, object->socks); if (ret < 0) { swWarn("socketpair() failed. Error: %s [%d]", strerror(errno), errno); sw_free(object); return SW_ERR; } else { //Nonblock if (blocking == 0) { swSetNonBlock(object->socks[0]); swSetNonBlock(object->socks[1]); } int sbsize = SwooleG.socket_buffer_size; swSocket_set_buffer_size(object->socks[0], sbsize); swSocket_set_buffer_size(object->socks[1], sbsize); p->object = object; p->read = swPipeUnsock_read; p->write = swPipeUnsock_write; p->getFd = swPipeUnsock_getFd; p->close = swPipeUnsock_close; } return 0; } int swSocket_set_buffer_size(int fd, int buffer_size) { if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buffer_size, sizeof(buffer_size)) < 0) { swSysError("setsockopt(%d, SOL_SOCKET, SO_SNDBUF, %d) failed.", fd, buffer_size); return SW_ERR; } if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buffer_size, sizeof(buffer_size)) < 0) { swSysError("setsockopt(%d, SOL_SOCKET, SO_RCVBUF, %d) failed.", fd, buffer_size); return SW_ERR; } return SW_OK; }swPipeUnsock_getFd 函數(shù)
同樣的獲取管道文件描述符根據(jù) master 來決定。
static int swPipeUnsock_getFd(swPipe *p, int master) { swPipeUnsock *this = p->object; return master == 1 ? this->socks[1] : this->socks[0]; }swPipeUnsock_close 關(guān)閉管道
關(guān)閉管道就是調(diào)用 close 來依次關(guān)閉兩個(gè) socket.
static int swPipeUnsock_close(swPipe *p) { swPipeUnsock *object = p->object; int ret = swPipeUnsock_close_ext(p, 0); sw_free(object); return ret; } int swPipeUnsock_close_ext(swPipe *p, int which) { int ret1 = 0, ret2 = 0; swPipeUnsock *object = p->object; if (which == SW_PIPE_CLOSE_MASTER) { if (object->pipe_master_closed) { return SW_ERR; } ret1 = close(object->socks[1]); object->pipe_master_closed = 1; } else if (which == SW_PIPE_CLOSE_WORKER) { if (object->pipe_worker_closed) { return SW_ERR; } ret1 = close(object->socks[0]); object->pipe_worker_closed = 1; } else { ret1 = swPipeUnsock_close_ext(p, SW_PIPE_CLOSE_MASTER); ret2 = swPipeUnsock_close_ext(p, SW_PIPE_CLOSE_WORKER); } return 0 - ret1 - ret2; }管道的應(yīng)用 tasker 模塊
當(dāng)調(diào)用 taskwait 函數(shù)后,投遞的 worker 進(jìn)程會(huì)阻塞在 serv->task_notify[SwooleWG.id] 管道的讀取中,tasker 模塊處理完畢后,會(huì)向 serv->task_notify[source_worker_id] 管道寫入數(shù)據(jù)。
這個(gè)就是 pipe 函數(shù)或者 eventfd 創(chuàng)建的匿名管道的用途,用于單向的進(jìn)程通信(tasker 進(jìn)程向 worker 進(jìn)程傳遞數(shù)據(jù))。
static inline int swPipeNotify_auto(swPipe *p, int blocking, int semaphore) { #ifdef HAVE_EVENTFD return swPipeEventfd_create(p, blocking, semaphore, 0); #else return swPipeBase_create(p, blocking); #endif }worker 模塊
manager 負(fù)責(zé)為 worker 進(jìn)程創(chuàng)建 pipe_master 與 pipe_worker。用于 reactor 線程與 worker 進(jìn)程直接進(jìn)行通信。
int swManager_start(swFactory *factory) { ... for (i = 0; i < serv->worker_num; i++) { if (swPipeUnsock_create(&object->pipes[i], 1, SOCK_DGRAM) < 0) { return SW_ERR; } serv->workers[i].pipe_master = object->pipes[i].getFd(&object->pipes[i], SW_PIPE_MASTER); serv->workers[i].pipe_worker = object->pipes[i].getFd(&object->pipes[i], SW_PIPE_WORKER); serv->workers[i].pipe_object = &object->pipes[i]; swServer_store_pipe_fd(serv, serv->workers[i].pipe_object); } ... }
當(dāng) reactor 線程啟動(dòng)的時(shí)候,會(huì)將 pipe_master 加入 reactor 的監(jiān)控當(dāng)中。
static int swReactorThread_loop(swThreadParam *param) { ... for (i = 0; i < serv->worker_num; i++) { if (i % serv->reactor_num == reactor_id) { pipe_fd = serv->workers[i].pipe_master; swSetNonBlock(pipe_fd); reactor->add(reactor, pipe_fd, SW_FD_PIPE); if (thread->notify_pipe == 0) { thread->notify_pipe = serv->workers[i].pipe_worker; } } } ... }
在 worker 進(jìn)程中,會(huì)將 pipe_worker 作為另一端 socket 放入 worker 的 reactor 事件循環(huán)中進(jìn)行監(jiān)控。
int swWorker_loop(swFactory *factory, int worker_id) { ... int pipe_worker = worker->pipe_worker; swSetNonBlock(pipe_worker); SwooleG.main_reactor->ptr = serv; SwooleG.main_reactor->add(SwooleG.main_reactor, pipe_worker, SW_FD_PIPE | SW_EVENT_READ); SwooleG.main_reactor->setHandle(SwooleG.main_reactor, SW_FD_PIPE, swWorker_onPipeReceive); SwooleG.main_reactor->setHandle(SwooleG.main_reactor, SW_FD_WRITE, swReactor_onWrite); ... }tasker 進(jìn)程
tasker 進(jìn)程中管道的創(chuàng)建是 swProcessPool_create 函數(shù)完成的。
int swProcessPool_create(swProcessPool *pool, int worker_num, int max_request, key_t msgqueue_key, int ipc_mode) { ... else if (ipc_mode == SW_IPC_UNIXSOCK) { pool->pipes = sw_calloc(worker_num, sizeof(swPipe)); if (pool->pipes == NULL) { swWarn("malloc[2] failed."); return SW_ERR; } swPipe *pipe; int i; for (i = 0; i < worker_num; i++) { pipe = &pool->pipes[i]; if (swPipeUnsock_create(pipe, 1, SOCK_DGRAM) < 0) { return SW_ERR; } pool->workers[i].pipe_master = pipe->getFd(pipe, SW_PIPE_MASTER); pool->workers[i].pipe_worker = pipe->getFd(pipe, SW_PIPE_WORKER); pool->workers[i].pipe_object = pipe; } } ... }
向 tasker 進(jìn)程發(fā)布任務(wù)的時(shí)候,會(huì)調(diào)用 swProcessPool_dispatch 函數(shù),進(jìn)而會(huì)向 pipe_master 管道寫入任務(wù)數(shù)據(jù)。
int swProcessPool_dispatch(swProcessPool *pool, swEventData *data, int *dst_worker_id) { ... ret = swWorker_send2worker(worker, data, sendn, SW_PIPE_MASTER | SW_PIPE_NONBLOCK); ... } int swWorker_send2worker(swWorker *dst_worker, void *buf, int n, int flag) { int pipefd, ret; if (flag & SW_PIPE_MASTER) { pipefd = dst_worker->pipe_master; } else { pipefd = dst_worker->pipe_worker; } ... if ((flag & SW_PIPE_NONBLOCK) && SwooleG.main_reactor) { return SwooleG.main_reactor->write(SwooleG.main_reactor, pipefd, buf, n); } else { ret = swSocket_write_blocking(pipefd, buf, n); } return ret; }
tasker 進(jìn)程并沒有 reactor 事件循環(huán),只會(huì)阻塞在某個(gè)系統(tǒng)調(diào)用中,如果 tasker 進(jìn)程采用的是 unix socket 進(jìn)行投遞任務(wù)的時(shí)候,就會(huì)阻塞在對(duì)管道的 read 當(dāng)中。
static int swProcessPool_worker_loop(swProcessPool *pool, swWorker *worker) { ... while (SwooleG.running > 0 && task_n > 0) { ... else { n = read(worker->pipe_worker, &out.buf, sizeof(out.buf)); if (n < 0 && errno != EINTR) { swSysError("[Worker#%d] read(%d) failed.", worker->id, worker->pipe_worker); } } ... } ... }
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/30864.html
摘要:清空主進(jìn)程殘留的定時(shí)器與信號(hào)。設(shè)定為執(zhí)行回調(diào)函數(shù)如果在回調(diào)函數(shù)中調(diào)用了異步系統(tǒng),啟動(dòng)函數(shù)進(jìn)行事件循環(huán)。因此為了區(qū)分兩者,規(guī)定并不允許兩者同時(shí)存在。 前言 swoole-1.7.2 增加了一個(gè)進(jìn)程管理模塊,用來替代 PHP 的 pcntl 擴(kuò)展。 PHP自帶的pcntl,存在很多不足,如 pcntl 沒有提供進(jìn)程間通信的功能 pcntl 不支持重定向標(biāo)準(zhǔn)輸入和輸出 pcntl 只...
摘要:是緩存區(qū)高水位線,達(dá)到了說明緩沖區(qū)即將滿了創(chuàng)建線程函數(shù)用于將監(jiān)控的存放于中向中添加監(jiān)聽的文件描述符等待所有的線程開啟事件循環(huán)利用創(chuàng)建線程,線程啟動(dòng)函數(shù)是保存監(jiān)聽本函數(shù)將用于監(jiān)聽的存放到當(dāng)中,并設(shè)置相應(yīng)的屬性 Server 的啟動(dòng) 在 server 啟動(dòng)之前,swoole 首先要調(diào)用 php_swoole_register_callback 將 PHP 的回調(diào)函數(shù)注冊(cè)到 server...
摘要:的部分是基于以及協(xié)議的。例如父進(jìn)程向中寫入子進(jìn)程從中讀取子進(jìn)程向中寫入父進(jìn)程從中讀取。默認(rèn)使用對(duì)進(jìn)程進(jìn)行分配交給對(duì)應(yīng)的線程進(jìn)行監(jiān)聽線程收到某個(gè)進(jìn)程的數(shù)據(jù)后會(huì)進(jìn)行處理值得注意的是這個(gè)線程可能并不是發(fā)送請(qǐng)求的那個(gè)線程。 作者:施洪寶 一. 基礎(chǔ)知識(shí) 1.1 swoole swoole是面向生產(chǎn)環(huán)境的php異步網(wǎng)絡(luò)通信引擎, php開發(fā)人員可以利用swoole開發(fā)出高性能的server服務(wù)。...
摘要:配合模塊,創(chuàng)建的子進(jìn)程可以異步的事件驅(qū)動(dòng)模式。默認(rèn)為阻塞讀取。函數(shù)用于將一個(gè)加入到的事件監(jiān)聽中。為事件類型的掩碼,可選擇關(guān)閉開啟可讀可寫事件,如,,或者。在程序中使用,可以理解為在進(jìn)程中將此注冊(cè)到事件中。 Process Process是swoole內(nèi)置的進(jìn)程管理模塊,用來替代PHP的pcntl擴(kuò)展。 swoole_process支持重定向標(biāo)準(zhǔn)輸入和輸出,在子進(jìn)程內(nèi)echo不會(huì)打印屏...
摘要:如果為,就不斷循環(huán),殺死或者啟動(dòng)相應(yīng)的進(jìn)程,如果為,那么就關(guān)閉所有的進(jìn)程,調(diào)用函數(shù)退出程序。調(diào)用函數(shù),監(jiān)控已結(jié)束的進(jìn)程如果函數(shù)返回異常,很有可能是被信號(hào)打斷。函數(shù)主要用于調(diào)用函數(shù),進(jìn)而調(diào)用函數(shù) swManager_loop 函數(shù) manager 進(jìn)程管理 manager 進(jìn)程開啟的時(shí)候,首先要調(diào)用 onManagerStart 回調(diào) 添加信號(hào)處理函數(shù) swSignal_add,S...
閱讀 3226·2021-10-13 09:40
閱讀 3688·2019-08-30 15:54
閱讀 1309·2019-08-30 13:20
閱讀 2993·2019-08-30 11:26
閱讀 475·2019-08-29 11:33
閱讀 1099·2019-08-26 14:00
閱讀 2357·2019-08-26 13:58
閱讀 3366·2019-08-26 10:39