国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Swoole 源碼分析——Server模塊之Stream 模式

wums / 1486人閱讀

摘要:新建可以看到,自動采用包長檢測的方法該函數主要功能是設置各種回調函數值得注意的是第三個參數代表是否異步。發送數據函數并不是直接發送數據,而是將數據存儲在,等著寫事件就緒之后調用發送數據。

swReactorThread_dispatch 發送數據

reactor 線程會通過 swReactorThread_dispatch 發送數據,當采用 stream 發送數據的時候,會調用 swStream_new 新建 stream,利用 swStream_send 發送數據。

</>復制代碼

  1. int swReactorThread_dispatch(swConnection *conn, char *data, uint32_t length)
  2. {
  3. ...
  4. if (serv->dispatch_mode == SW_DISPATCH_STREAM)
  5. {
  6. swStream *stream = swStream_new(serv->stream_socket, 0, SW_SOCK_UNIX_STREAM);
  7. if (stream == NULL)
  8. {
  9. return SW_ERR;
  10. }
  11. stream->response = swReactorThread_onStreamResponse;
  12. stream->session_id = conn->session_id;
  13. swListenPort *port = swServer_get_port(serv, conn->fd);
  14. swStream_set_max_length(stream, port->protocol.package_max_length);
  15. task.data.info.fd = conn->session_id;
  16. task.data.info.type = SW_EVENT_PACKAGE_END;
  17. task.data.info.len = 0;
  18. if (swStream_send(stream, (char*) &task.data.info, sizeof(task.data.info)) < 0)
  19. {
  20. return SW_ERR;
  21. }
  22. if (swStream_send(stream, data, length) < 0)
  23. {
  24. stream->cancel = 1;
  25. return SW_ERR;
  26. }
  27. return SW_OK;
  28. }
  29. ...
  30. }
swStream_new 新建 stream

可以看到,stream 自動采用包長檢測的方法

該函數主要功能是設置各種回調函數

值得注意的是 swClient_create 第三個參數代表是否異步。在這里設置的是 1,也就是說,無論 connect 還是 send 都是異步。

</>復制代碼

  1. typedef struct _swStream
  2. {
  3. swString *buffer;
  4. uint32_t session_id;
  5. uint8_t cancel;
  6. void (*response)(struct _swStream *stream, char *data, uint32_t length);
  7. swClient client;
  8. } swStream;
  9. swStream* swStream_new(char *dst_host, int dst_port, int type)
  10. {
  11. swStream *stream = (swStream*) sw_malloc(sizeof(swStream));
  12. bzero(stream, sizeof(swStream));
  13. swClient *cli = &stream->client;
  14. if (swClient_create(cli, type, 1) < 0)
  15. {
  16. swStream_free(stream);
  17. return NULL;
  18. }
  19. cli->onConnect = swStream_onConnect;
  20. cli->onReceive = swStream_onReceive;
  21. cli->onError = swStream_onError;
  22. cli->onClose = swStream_onClose;
  23. cli->object = stream;
  24. cli->open_length_check = 1;
  25. swStream_set_protocol(&cli->protocol);
  26. if (cli->connect(cli, dst_host, dst_port, -1, 0) < 0)
  27. {
  28. swSysError("failed to connect to [%s:%d].", dst_host, dst_port);
  29. swStream_free(stream);
  30. return NULL;
  31. }
  32. else
  33. {
  34. return stream;
  35. }
  36. }
  37. void swStream_set_protocol(swProtocol *protocol)
  38. {
  39. protocol->get_package_length = swProtocol_get_package_length;
  40. protocol->package_length_size = 4;
  41. protocol->package_length_type = "N";
  42. protocol->package_body_offset = 4;
  43. protocol->package_length_offset = 0;
  44. }
swStream_onConnect 連接回調函數

swStream_onConnect 不僅是連接成功的回調函數,還是每次 onWrite 寫事件的回調函數,因此每次都需要調用 cli->send 函數,發送存儲在 stream->buffer 數據。值得注意的是,每次發送數據,都要將數據長度存放在 buffer 的頭部,否則包長檢測會失敗。

</>復制代碼

  1. static void swStream_onConnect(swClient *cli)
  2. {
  3. swStream *stream = (swStream*) cli->object;
  4. if (stream->cancel)
  5. {
  6. cli->close(cli);
  7. }
  8. *((uint32_t *) stream->buffer->str) = ntohl(stream->buffer->length - 4);
  9. if (cli->send(cli, stream->buffer->str, stream->buffer->length, 0) < 0)
  10. {
  11. cli->close(cli);
  12. }
  13. else
  14. {
  15. swString_free(stream->buffer);
  16. stream->buffer = NULL;
  17. }
  18. }
swStream_send 發送數據

swStream_send 函數并不是直接發送數據,而是將數據存儲在 stream->buffer,等著寫事件就緒之后調用 swStream_onConnect 發送數據。值得注意的是,每次新建 buffer 的時候,要預留 4 個字節來存儲 buffer 的數據長度

</>復制代碼

  1. int swStream_send(swStream *stream, char *data, size_t length)
  2. {
  3. if (stream->buffer == NULL)
  4. {
  5. stream->buffer = swString_new(swoole_size_align(length + 4, SwooleG.pagesize));
  6. if (stream->buffer == NULL)
  7. {
  8. return SW_ERR;
  9. }
  10. stream->buffer->length = 4;
  11. }
  12. if (swString_append_ptr(stream->buffer, data, length) < 0)
  13. {
  14. return SW_ERR;
  15. }
  16. return SW_OK;
  17. }
swStream_onReceive 函數

swStream_onReceive 函數是 stream 讀事件就緒的回調函數,worker 進程發送給客戶端的數據將會發送到本函數。如果 length 為 4,說明 worker 只發送了一個 length 的空數據包,代表著 worker 進程已消費完畢,這時我們可以關閉 stream。

</>復制代碼

  1. static void swStream_onReceive(swClient *cli, char *data, uint32_t length)
  2. {
  3. swStream *stream = (swStream*) cli->object;
  4. if (length == 4)
  5. {
  6. cli->socket->close_wait = 1;
  7. }
  8. else
  9. {
  10. stream->response(stream, data + 4, length - 4);
  11. }
  12. }
  13. static void swReactorThread_onStreamResponse(swStream *stream, char *data, uint32_t length)
  14. {
  15. swSendData response;
  16. swConnection *conn = swServer_connection_verify(SwooleG.serv, stream->session_id);
  17. if (!conn)
  18. {
  19. swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SESSION_NOT_EXIST, "connection[fd=%d] does not exists.", stream->session_id);
  20. return;
  21. }
  22. response.info.fd = conn->session_id;
  23. response.info.type = SW_EVENT_TCP;
  24. response.info.len = 0;
  25. response.length = length;
  26. response.data = data;
  27. swReactorThread_send(&response);
  28. }
swWorker_onStreamAccept 接受連接請求

接受請求和主進程的 reactor 接受連接大致一致,略有不同的是 conn->socket_type 設置為了 SW_SOCK_UNIX_STREAM

</>復制代碼

  1. static int swWorker_onStreamAccept(swReactor *reactor, swEvent *event)
  2. {
  3. int fd = 0;
  4. swSocketAddress client_addr;
  5. socklen_t client_addrlen = sizeof(client_addr);
  6. #ifdef HAVE_ACCEPT4
  7. fd = accept4(event->fd, (struct sockaddr *) &client_addr, &client_addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
  8. #else
  9. fd = accept(event->fd, (struct sockaddr *) &client_addr, &client_addrlen);
  10. #endif
  11. if (fd < 0)
  12. {
  13. switch (errno)
  14. {
  15. case EINTR:
  16. case EAGAIN:
  17. return SW_OK;
  18. default:
  19. swoole_error_log(SW_LOG_ERROR, SW_ERROR_SYSTEM_CALL_FAIL, "accept() failed. Error: %s[%d]", strerror(errno),
  20. errno);
  21. return SW_OK;
  22. }
  23. }
  24. #ifndef HAVE_ACCEPT4
  25. else
  26. {
  27. swoole_fcntl_set_option(fd, 1, 1);
  28. }
  29. #endif
  30. swConnection *conn = swReactor_get(reactor, fd);
  31. bzero(conn, sizeof(swConnection));
  32. conn->fd = fd;
  33. conn->active = 1;
  34. conn->socket_type = SW_SOCK_UNIX_STREAM;
  35. memcpy(&conn->info.addr, &client_addr, sizeof(client_addr));
  36. if (reactor->add(reactor, fd, SW_FD_STREAM | SW_EVENT_READ) < 0)
  37. {
  38. return SW_ERR;
  39. }
  40. return SW_OK;
  41. }
swWorker_onStreamRead 讀取數據

swWorker_onStreamRead 讀取數據核心是調用 swProtocol_recv_check_length 函數收取數據放入 serv->buffer_pool 單鏈表中,swProtocol_recv_check_length 函數我們在 reactor 線程的事件循環中已經了解了,我們這里不再重復,我們知道,該函數獲取數據之后,會調用 onPackage 函數,也就是 swWorker_onStreamPackage 函數

</>復制代碼

  1. void swStream_set_protocol(swProtocol *protocol)
  2. {
  3. protocol->get_package_length = swProtocol_get_package_length;
  4. protocol->package_length_size = 4;
  5. protocol->package_length_type = "N";
  6. protocol->package_body_offset = 4;
  7. protocol->package_length_offset = 0;
  8. }
  9. static int swWorker_onStreamRead(swReactor *reactor, swEvent *event)
  10. {
  11. swConnection *conn = event->socket;
  12. swServer *serv = SwooleG.serv;
  13. swProtocol *protocol = &serv->stream_protocol;
  14. swString *buffer;
  15. if (!event->socket->recv_buffer)
  16. {
  17. buffer = swLinkedList_shift(serv->buffer_pool);
  18. if (buffer == NULL)
  19. {
  20. buffer = swString_new(8192);
  21. if (!buffer)
  22. {
  23. return SW_ERR;
  24. }
  25. }
  26. event->socket->recv_buffer = buffer;
  27. }
  28. else
  29. {
  30. buffer = event->socket->recv_buffer;
  31. }
  32. if (swProtocol_recv_check_length(protocol, conn, buffer) < 0)
  33. {
  34. swWorker_onStreamClose(reactor, event);
  35. }
  36. return SW_OK;
  37. }
swWorker_onStreamPackage 函數

swWorker_onStreamPackage 函數用于將數據包投送到 swWorker_onTask 函數進行消費。消費完畢會發送一個只含長度 0 的數據包,告知 reactor worker 已經結束。

</>復制代碼

  1. static int swWorker_onStreamPackage(swConnection *conn, char *data, uint32_t length)
  2. {
  3. swServer *serv = SwooleG.serv;
  4. swEventData *task = (swEventData *) (data + 4);
  5. serv->last_stream_fd = conn->fd;
  6. swString *package = swWorker_get_buffer(serv, task->info.from_id);
  7. uint32_t data_length = length - sizeof(task->info) - 4;
  8. //merge data to package buffer
  9. swString_append_ptr(package, data + sizeof(task->info) + 4, data_length);
  10. swWorker_onTask(&serv->factory, task);
  11. int _end = htonl(0);
  12. SwooleG.main_reactor->write(SwooleG.main_reactor, conn->fd, (void *) &_end, sizeof(_end));
  13. return SW_OK;
  14. }

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/29277.html

相關文章

  • Swoole 源碼分析——Client模塊Send

    摘要:當此時的套接字不可寫的時候,會自動放入緩沖區中。當大于高水線時,會自動調用回調函數。寫就緒狀態當監控到套接字進入了寫就緒狀態時,就會調用函數。如果為,說明此時異步客戶端雖然建立了連接,但是還沒有調用回調函數,因此這時要調用函數。 前言 上一章我們說了客戶端的連接 connect,對于同步客戶端來說,連接已經建立成功;但是對于異步客戶端來說,此時可能還在進行 DNS 的解析,on...

    caozhijian 評論0 收藏0
  • Swoole 源碼分析——Server模塊TaskWorker事件循環

    摘要:函數事件循環在事件循環時,如果使用的是消息隊列,那么就不斷的調用從消息隊列中取出數據。獲取后的數據調用回調函數消費消息之后,向中發送空數據,告知進程已消費,并且關閉新連接。 swManager_start 創建進程流程 task_worker 進程的創建可以分為三個步驟:swServer_create_task_worker 申請所需的內存、swTaskWorker_init 初始化...

    用戶83 評論0 收藏0
  • Swoole 源碼分析——Server模塊初始化

    摘要:如果在調用之前我們設置了,但是不在第二個進程啟動前這個套接字,那么第二個進程仍然會在調用函數的時候出錯。 前言 本節主要介紹 server 模塊進行初始化的代碼,關于初始化過程中,各個屬性的意義,可以參考官方文檔: SERVER 配置選項 關于初始化過程中,用于監聽的 socket 綁定問題,可以參考: UNP 學習筆記——基本 TCP 套接字編程 UNP 學習筆記——套接字選項 構造...

    Half 評論0 收藏0
  • Swoole 源碼分析——Reactor模塊ReactorBase

    前言 作為一個網絡框架,最為核心的就是消息的接受與發送。高效的 reactor 模式一直是眾多網絡框架的首要選擇,本節主要講解 swoole 中的 reactor 模塊。 UNP 學習筆記——IO 復用 Reactor 的數據結構 Reactor 的數據結構比較復雜,首先 object 是具體 Reactor 對象的首地址,ptr 是擁有 Reactor 對象的類的指針, event_nu...

    baukh789 評論0 收藏0
  • Swoole 源碼分析——Client模塊Connect

    摘要:兩個函數是可選回調函數。附帶了一組可信任證書。應該注意的是,驗證失敗并不意味著連接不能使用。在對證書進行驗證時,有一些安全性檢查并沒有執行,包括證書的失效檢查和對證書中通用名的有效性驗證。 前言 swoole_client 提供了 tcp/udp socket 的客戶端的封裝代碼,使用時僅需 new swoole_client 即可。 swoole 的 socket client 對比...

    Charles 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<