摘要:當(dāng)此時(shí)的套接字不可寫的時(shí)候,會(huì)自動(dòng)放入緩沖區(qū)中。當(dāng)大于高水線時(shí),會(huì)自動(dòng)調(diào)用回調(diào)函數(shù)。寫就緒狀態(tài)當(dāng)監(jiān)控到套接字進(jìn)入了寫就緒狀態(tài)時(shí),就會(huì)調(diào)用函數(shù)。如果為,說(shuō)明此時(shí)異步客戶端雖然建立了連接,但是還沒(méi)有調(diào)用回調(diào)函數(shù),因此這時(shí)要調(diào)用函數(shù)。
前言
上一章我們說(shuō)了客戶端的連接 connect,對(duì)于同步客戶端來(lái)說(shuō),連接已經(jīng)建立成功;但是對(duì)于異步客戶端來(lái)說(shuō),此時(shí)可能還在進(jìn)行 DNS 的解析,onConnect 回調(diào)函數(shù)還未執(zhí)行。
本節(jié)中,我們將繼續(xù)說(shuō)明客戶端發(fā)送數(shù)據(jù)的流程,同時(shí)我們可以看到 TCP 異步客戶端執(zhí)行 onConnect 回調(diào)函數(shù)的過(guò)程。
send 入口本入口函數(shù)邏輯非常簡(jiǎn)單,從 PHP 函數(shù)中獲取數(shù)據(jù) data,然后調(diào)用 connect 函數(shù)。
static PHP_METHOD(swoole_client, send) { char *data; zend_size_t data_len; zend_long flags = 0; #ifdef FAST_ZPP ZEND_PARSE_PARAMETERS_START(1, 2) Z_PARAM_STRING(data, data_len) Z_PARAM_OPTIONAL Z_PARAM_LONG(flags) ZEND_PARSE_PARAMETERS_END(); #else if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|l", &data, &data_len, &flags) == FAILURE) { return; } #endif swClient *cli = client_get_ptr(getThis() TSRMLS_CC); //clear errno SwooleG.error = 0; int ret = cli->send(cli, data, data_len, flags); if (ret < 0) { swoole_php_sys_error(E_WARNING, "failed to send(%d) %zd bytes.", cli->socket->fd, data_len); zend_update_property_long(swoole_client_class_entry_ptr, getThis(), SW_STRL("errCode")-1, SwooleG.error TSRMLS_CC); RETVAL_FALSE; } else { RETURN_LONG(ret); } }swClient_tcp_send_sync 同步 TCP 客戶端
對(duì)于同步客戶端來(lái)說(shuō),發(fā)送數(shù)據(jù)是通過(guò) swConnection_send 函數(shù)來(lái)進(jìn)行阻塞式調(diào)用 send,當(dāng)返回的錯(cuò)誤是 EAGAIN 的時(shí)候調(diào)用 swSocket_wait 等待 1s。
static int swClient_tcp_send_sync(swClient *cli, char *data, int length, int flags) { int written = 0; int n; assert(length > 0); assert(data != NULL); while (written < length) { n = swConnection_send(cli->socket, data, length - written, flags); if (n < 0) { if (errno == EINTR) { continue; } else if (errno == EAGAIN) { swSocket_wait(cli->socket->fd, 1000, SW_EVENT_WRITE); continue; } else { SwooleG.error = errno; return SW_ERR; } } written += n; data += n; } return written; }swClient_tcp_send_async 異步 TCP 客戶端
由于異步客戶端已經(jīng)設(shè)置為非阻塞,并且加入了 reactor 的監(jiān)控,因此發(fā)送數(shù)據(jù)只需要 reactor->write 函數(shù)即可。當(dāng)此時(shí)的套接字不可寫的時(shí)候,會(huì)自動(dòng)放入 out_buff 緩沖區(qū)中。
當(dāng) out_buffer 大于高水線時(shí),會(huì)自動(dòng)調(diào)用 onBufferFull 回調(diào)函數(shù)。
static int swClient_tcp_send_async(swClient *cli, char *data, int length, int flags) { int n = length; if (cli->reactor->write(cli->reactor, cli->socket->fd, data, length) < 0) { if (SwooleG.error == SW_ERROR_OUTPUT_BUFFER_OVERFLOW) { n = -1; cli->socket->high_watermark = 1; } else { return SW_ERR; } } if (cli->onBufferFull && cli->socket->out_buffer && cli->socket->high_watermark == 0 && cli->socket->out_buffer->length >= cli->buffer_high_watermark) { cli->socket->high_watermark = 1; cli->onBufferFull(cli); } return n; }swClient_udp_send UDP 客戶端
對(duì)于 UDP 客戶端來(lái)說(shuō),如果 Socket 緩存區(qū)塞滿,并不會(huì)像 TCP 進(jìn)行等待 reactor 可寫狀態(tài),而是直接返回了結(jié)果。對(duì)于異步客戶端來(lái)說(shuō),僅僅是非阻塞調(diào)用 sendto 函數(shù)。
static int swClient_udp_send(swClient *cli, char *data, int len, int flags) { int n; n = sendto(cli->socket->fd, data, len, 0, (struct sockaddr *) &cli->server_addr.addr, cli->server_addr.len); if (n < 0 || n < len) { return SW_ERR; } else { return n; } }sendto UDP 客戶端
類似于 send 函數(shù),sendto 函數(shù)專門針對(duì) UDP 客戶端,與 send 函數(shù)不同的是,sendto 函數(shù)在底層套接字緩沖區(qū)塞滿的時(shí)候,會(huì)調(diào)用 swSocket_wait 進(jìn)行阻塞等待。
static PHP_METHOD(swoole_client, sendto) { char* ip; zend_size_t ip_len; long port; char *data; zend_size_t len; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sls", &ip, &ip_len, &port, &data, &len) == FAILURE) { return; } swClient *cli = (swClient *) swoole_get_object(getThis()); int ret; if (cli->type == SW_SOCK_UDP) { ret = swSocket_udp_sendto(cli->socket->fd, ip, port, data, len); } else if (cli->type == SW_SOCK_UDP6) { ret = swSocket_udp_sendto6(cli->socket->fd, ip, port, data, len); } else { swoole_php_fatal_error(E_WARNING, "only supports SWOOLE_SOCK_UDP or SWOOLE_SOCK_UDP6."); RETURN_FALSE; } SW_CHECK_RETURN(ret); } int swSocket_udp_sendto(int server_sock, char *dst_ip, int dst_port, char *data, uint32_t len) { struct sockaddr_in addr; if (inet_aton(dst_ip, &addr.sin_addr) == 0) { swWarn("ip[%s] is invalid.", dst_ip); return SW_ERR; } addr.sin_family = AF_INET; addr.sin_port = htons(dst_port); return swSocket_sendto_blocking(server_sock, data, len, 0, (struct sockaddr *) &addr, sizeof(addr)); } int swSocket_udp_sendto6(int server_sock, char *dst_ip, int dst_port, char *data, uint32_t len) { struct sockaddr_in6 addr; bzero(&addr, sizeof(addr)); if (inet_pton(AF_INET6, dst_ip, &addr.sin6_addr) < 0) { swWarn("ip[%s] is invalid.", dst_ip); return SW_ERR; } addr.sin6_port = (uint16_t) htons(dst_port); addr.sin6_family = AF_INET6; return swSocket_sendto_blocking(server_sock, data, len, 0, (struct sockaddr *) &addr, sizeof(addr)); } int swSocket_sendto_blocking(int fd, void *__buf, size_t __n, int flag, struct sockaddr *__addr, socklen_t __addr_len) { int n = 0; while (1) { n = sendto(fd, __buf, __n, flag, __addr, __addr_len); if (n >= 0) { break; } else { if (errno == EINTR) { continue; } else if (swConnection_error(errno) == SW_WAIT) { swSocket_wait(fd, 1000, SW_EVENT_WRITE); continue; } else { break; } } } return n; }swClient_onWrite 寫就緒狀態(tài)
當(dāng) reactor 監(jiān)控到套接字進(jìn)入了寫就緒狀態(tài)時(shí),就會(huì)調(diào)用 swClient_onWrite 函數(shù)。
從上一章我們知道,異步客戶端建立連接過(guò)程中 swoole 調(diào)用了 connect 函數(shù),該返回 0,或者返回錯(cuò)誤碼 EINPROGRESS 都會(huì)對(duì)寫就緒進(jìn)行監(jiān)控。無(wú)論哪種情況,swClient_onWrite 被調(diào)用就說(shuō)明此時(shí)連接已經(jīng)建立成功,三次握手已經(jīng)完成,但是 cli->socket->active 還是 0。
如果 cli->socket->active 為 0,說(shuō)明此時(shí)異步客戶端雖然建立了連接,但是還沒(méi)有調(diào)用 onConnect 回調(diào)函數(shù),因此這時(shí)要調(diào)用 execute_onConnect 函數(shù)。如果使用了 SSL 隧道加密,還要進(jìn)行 SSL 握手,并且設(shè)置 _socket->ssl_state = SW_SSL_STATE_WAIT_STREAM。
當(dāng) active 為 1 的時(shí)候,就可以調(diào)用 swReactor_onWrite 來(lái)發(fā)送數(shù)據(jù)。
static int swClient_onWrite(swReactor *reactor, swEvent *event) { swClient *cli = event->socket->object; swConnection *_socket = cli->socket; if (cli->socket->active) { #ifdef SW_USE_OPENSSL if (cli->open_ssl && _socket->ssl_state == SW_SSL_STATE_WAIT_STREAM) { if (swClient_ssl_handshake(cli) < 0) { goto connect_fail; } else if (_socket->ssl_state == SW_SSL_STATE_READY) { goto connect_success; } else { if (_socket->ssl_want_read) { cli->reactor->set(cli->reactor, event->fd, SW_FD_STREAM_CLIENT | SW_EVENT_READ); } return SW_OK; } } #endif if (swReactor_onWrite(cli->reactor, event) < 0) { return SW_ERR; } if (cli->onBufferEmpty && _socket->high_watermark && _socket->out_buffer->length <= cli->buffer_low_watermark) { _socket->high_watermark = 0; cli->onBufferEmpty(cli); } return SW_OK; } socklen_t len = sizeof(SwooleG.error); if (getsockopt(event->fd, SOL_SOCKET, SO_ERROR, &SwooleG.error, &len) < 0) { swWarn("getsockopt(%d) failed. Error: %s[%d]", event->fd, strerror(errno), errno); return SW_ERR; } //success if (SwooleG.error == 0) { //listen read event cli->reactor->set(cli->reactor, event->fd, SW_FD_STREAM_CLIENT | SW_EVENT_READ); //connected _socket->active = 1; #ifdef SW_USE_OPENSSL if (cli->open_ssl) { if (swClient_enable_ssl_encrypt(cli) < 0) { goto connect_fail; } if (swClient_ssl_handshake(cli) < 0) { goto connect_fail; } else { _socket->ssl_state = SW_SSL_STATE_WAIT_STREAM; } return SW_OK; } connect_success: #endif if (cli->onConnect) { execute_onConnect(cli); } } else { #ifdef SW_USE_OPENSSL connect_fail: #endif _socket->active = 0; cli->close(cli); if (cli->onError) { cli->onError(cli); } } return SW_OK; } static sw_inline void execute_onConnect(swClient *cli) { if (cli->timer) { swTimer_del(&SwooleG.timer, cli->timer); cli->timer = NULL; } cli->onConnect(cli); }client_onConnect
static void client_onConnect(swClient *cli) { zval *zobject = (zval *) cli->object; #ifdef SW_USE_OPENSSL if (cli->ssl_wait_handshake) { client_execute_callback(zobject, SW_CLIENT_CB_onSSLReady); } else #endif if (!cli->redirect) { client_execute_callback(zobject, SW_CLIENT_CB_onConnect); } else { client_callback *cb = (client_callback *) swoole_get_property(zobject, 0); if (!cb || !cb->onReceive) { swoole_php_fatal_error(E_ERROR, "has no "onReceive" callback function."); } } }
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/29498.html
摘要:新建可以看到,自動(dòng)采用包長(zhǎng)檢測(cè)的方法該函數(shù)主要功能是設(shè)置各種回調(diào)函數(shù)值得注意的是第三個(gè)參數(shù)代表是否異步。發(fā)送數(shù)據(jù)函數(shù)并不是直接發(fā)送數(shù)據(jù),而是將數(shù)據(jù)存儲(chǔ)在,等著寫事件就緒之后調(diào)用發(fā)送數(shù)據(jù)。 swReactorThread_dispatch 發(fā)送數(shù)據(jù) reactor 線程會(huì)通過(guò) swReactorThread_dispatch 發(fā)送數(shù)據(jù),當(dāng)采用 stream 發(fā)送數(shù)據(jù)的時(shí)候,會(huì)調(diào)用 sw...
前言 作為一個(gè)網(wǎng)絡(luò)框架,最為核心的就是消息的接受與發(fā)送。高效的 reactor 模式一直是眾多網(wǎng)絡(luò)框架的首要選擇,本節(jié)主要講解 swoole 中的 reactor 模塊。 UNP 學(xué)習(xí)筆記——IO 復(fù)用 Reactor 的數(shù)據(jù)結(jié)構(gòu) Reactor 的數(shù)據(jù)結(jié)構(gòu)比較復(fù)雜,首先 object 是具體 Reactor 對(duì)象的首地址,ptr 是擁有 Reactor 對(duì)象的類的指針, event_nu...
摘要:兩個(gè)函數(shù)是可選回調(diào)函數(shù)。附帶了一組可信任證書(shū)。應(yīng)該注意的是,驗(yàn)證失敗并不意味著連接不能使用。在對(duì)證書(shū)進(jìn)行驗(yàn)證時(shí),有一些安全性檢查并沒(méi)有執(zhí)行,包括證書(shū)的失效檢查和對(duì)證書(shū)中通用名的有效性驗(yàn)證。 前言 swoole_client 提供了 tcp/udp socket 的客戶端的封裝代碼,使用時(shí)僅需 new swoole_client 即可。 swoole 的 socket client 對(duì)比...
摘要:對(duì)于服務(wù)端來(lái)說(shuō),緩存默認(rèn)是不能使用的,可以通過(guò)調(diào)用函數(shù)來(lái)進(jìn)行設(shè)置生效。在回調(diào)函數(shù)中,首先申請(qǐng)一個(gè)大數(shù)數(shù)據(jù)結(jié)構(gòu),然后將其設(shè)定為,該值表示公鑰指數(shù),然后利用函數(shù)生成秘鑰。此時(shí)需要調(diào)用函數(shù)將新的連接與綁定。 前言 上一篇文章我們講了 OpenSSL 的原理,接下來(lái),我們來(lái)說(shuō)說(shuō)如何利用 openssl 第三方庫(kù)進(jìn)行開(kāi)發(fā),來(lái)為 tcp 層進(jìn)行 SSL 隧道加密 OpenSSL 初始化 在 sw...
摘要:之后如果仍然有剩余未發(fā)送的數(shù)據(jù),那么就如果已經(jīng)沒(méi)有剩余數(shù)據(jù)了,繼續(xù)去取下一個(gè)數(shù)據(jù)包。拿到后,要用函數(shù)轉(zhuǎn)化為相應(yīng)的類型即可得到包長(zhǎng)值。 swPort_onRead_check_eof EOF 自動(dòng)分包 我們前面說(shuō)過(guò),swPort_onRead_raw 是最簡(jiǎn)單的向 worker 進(jìn)程發(fā)送數(shù)據(jù)包的方法,swoole 會(huì)將從客戶端接受到的數(shù)據(jù)包,立刻發(fā)送給 worker 進(jìn)程,用戶自己把...
閱讀 2993·2021-10-13 09:39
閱讀 2693·2021-09-27 13:34
閱讀 2031·2019-08-30 15:55
閱讀 3259·2019-08-30 15:43
閱讀 3630·2019-08-30 11:16
閱讀 1748·2019-08-26 18:28
閱讀 1282·2019-08-26 13:56
閱讀 913·2019-08-26 13:35