摘要:如果在調用之前我們設置了,但是不在第二個進程啟動前這個套接字,那么第二個進程仍然會在調用函數的時候出錯。
前言
本節主要介紹 server 模塊進行初始化的代碼,關于初始化過程中,各個屬性的意義,可以參考官方文檔:
SERVER 配置選項
關于初始化過程中,用于監聽的 socket 綁定問題,可以參考:
UNP 學習筆記——基本 TCP 套接字編程
UNP 學習筆記——套接字選項
構造 server 對象構造 server 對象最重要的是兩件事:swServer_init 初始化 server、為 server 添加端口:
PHP_METHOD(swoole_server, __construct) { zend_size_t host_len = 0; char *serv_host; long sock_type = SW_SOCK_TCP; long serv_port = 0; long serv_mode = SW_MODE_PROCESS; swServer *serv = sw_malloc(sizeof (swServer)); swServer_init(serv); serv->factory_mode = serv_mode; if (serv_port == 0 && strcasecmp(serv_host, "SYSTEMD") == 0) { if (swserver_add_systemd_socket(serv) <= 0) { swoole_php_fatal_error(E_ERROR, "failed to add systemd socket."); return; } } else { swListenPort *port = swServer_add_port(serv, sock_type, serv_host, serv_port); } }swServer_init 函數
swServer_init 函數主要為 serv 對象賦值初值,如果想要更改 serv 對象各個屬性,可以調用 set 函數
serv->gs 是全局共享內存
void swServer_init(swServer *serv) { swoole_init(); bzero(serv, sizeof(swServer)); serv->factory_mode = SW_MODE_BASE; serv->reactor_num = SW_REACTOR_NUM > SW_REACTOR_MAX_THREAD ? SW_REACTOR_MAX_THREAD : SW_REACTOR_NUM; serv->dispatch_mode = SW_DISPATCH_FDMOD; serv->worker_num = SW_CPU_NUM; serv->max_connection = SwooleG.max_sockets < SW_SESSION_LIST_SIZE ? SwooleG.max_sockets : SW_SESSION_LIST_SIZE; serv->max_request = 0; serv->max_wait_time = SW_WORKER_MAX_WAIT_TIME; //http server serv->http_parse_post = 1; serv->upload_tmp_dir = sw_strdup("/tmp"); //heartbeat check serv->heartbeat_idle_time = SW_HEARTBEAT_IDLE; serv->heartbeat_check_interval = SW_HEARTBEAT_CHECK; serv->buffer_input_size = SW_BUFFER_INPUT_SIZE; serv->buffer_output_size = SW_BUFFER_OUTPUT_SIZE; serv->task_ipc_mode = SW_TASK_IPC_UNIXSOCK; /** * alloc shared memory */ serv->stats = SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(swServerStats)); if (serv->stats == NULL) { swError("[Master] Fatal Error: failed to allocate memory for swServer->stats."); } serv->gs = SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(swServerGS)); if (serv->gs == NULL) { swError("[Master] Fatal Error: failed to allocate memory for swServer->gs."); } SwooleG.serv = serv; }swoole_init 函數
swoole_init 函數用于初始化全局變量 SwooleG 的各個屬性
SwooleGS 是全局的共享內存
SwooleTG 是線程特有數據,每個線程都有自己獨特的數據
extern swServerG SwooleG; //Local Global Variable extern SwooleGS_t *SwooleGS; //Share Memory Global Variable extern __thread swThreadG SwooleTG; //Thread Global Variable typedef struct { swLock lock; swLock lock_2; } SwooleGS_t; void swoole_init(void) { struct rlimit rlmt; if (SwooleG.running) { return; } bzero(&SwooleG, sizeof(SwooleG)); bzero(&SwooleWG, sizeof(SwooleWG)); bzero(sw_error, SW_ERROR_MSG_SIZE); SwooleG.running = 1; SwooleG.enable_coroutine = 1; sw_errno = 0; SwooleG.log_fd = STDOUT_FILENO; SwooleG.cpu_num = sysconf(_SC_NPROCESSORS_ONLN); SwooleG.pagesize = getpagesize(); SwooleG.pid = getpid(); SwooleG.socket_buffer_size = SW_SOCKET_BUFFER_SIZE; #ifdef SW_DEBUG SwooleG.log_level = 0; #else SwooleG.log_level = SW_LOG_INFO; #endif //get system uname uname(&SwooleG.uname); //random seed srandom(time(NULL)); //init global shared memory SwooleG.memory_pool = swMemoryGlobal_new(SW_GLOBAL_MEMORY_PAGESIZE, 1); if (SwooleG.memory_pool == NULL) { printf("[Master] Fatal Error: global memory allocation failure."); exit(1); } SwooleGS = SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(SwooleGS_t)); if (SwooleGS == NULL) { printf("[Master] Fatal Error: failed to allocate memory for SwooleGS."); exit(2); } //init global lock swMutex_create(&SwooleGS->lock, 1); swMutex_create(&SwooleGS->lock_2, 1); swMutex_create(&SwooleG.lock, 0); if (getrlimit(RLIMIT_NOFILE, &rlmt) < 0) { swWarn("getrlimit() failed. Error: %s[%d]", strerror(errno), errno); SwooleG.max_sockets = 1024; } else { SwooleG.max_sockets = (uint32_t) rlmt.rlim_cur; } SwooleTG.buffer_stack = swString_new(8192); if (SwooleTG.buffer_stack == NULL) { exit(3); } if (!SwooleG.task_tmpdir) { SwooleG.task_tmpdir = sw_strndup(SW_TASK_TMP_FILE, sizeof(SW_TASK_TMP_FILE)); SwooleG.task_tmpdir_len = sizeof(SW_TASK_TMP_FILE); } char *tmp_dir = swoole_dirname(SwooleG.task_tmpdir); //create tmp dir if (access(tmp_dir, R_OK) < 0 && swoole_mkdir_recursive(tmp_dir) < 0) { swWarn("create task tmp dir(%s) failed.", tmp_dir); } if (tmp_dir) { sw_free(tmp_dir); } //init signalfd #ifdef HAVE_SIGNALFD swSignalfd_init(); SwooleG.use_signalfd = 1; SwooleG.enable_signalfd = 1; #endif //timerfd #ifdef HAVE_TIMERFD SwooleG.use_timerfd = 1; #endif SwooleG.use_timer_pipe = 1; }swServer_add_port 函數
swServer_add_port 函數為服務端添加監聽的端口
首先需要檢測 listen_port_num 已監聽的端口不能大于 SW_MAX_LISTEN_PORT(默認為 60000)
如果 socket 的類型不是 unix sock,那么端口號必須大于等于 0,小于 65535
host 主域名長度也不能大于 SW_HOST_MAXSIZE(104)
然后從共享內存池中申請一個 swListenPort 類型的對象,然后調用 swPort_init 對端口對象進行初始化
利用函數 swSocket_create 創建一個 socket 對象,并返回其文件描述符
調用 swSocket_bind 函數將 socket 綁定到對應的主域與端口上來
如果協議是數據報(UDP),而不是數據流時,需要設置 socket 的發送緩存與接收緩存為 socket_buffer_size
設置 socket 為非阻塞、O_CLOEXEC(exec 之后文件描述符自動關閉)
根據協議類型設置 have_udp_sock、have_tcp_sock、udp_socket_ipv4/udp_socket_ipv6 等等屬性
遞增 listen_port_num ,向單鏈表 listen_list 中添加 swListenPort 對象
enum swSocket_type { SW_SOCK_TCP = 1, SW_SOCK_UDP = 2, SW_SOCK_TCP6 = 3, SW_SOCK_UDP6 = 4, SW_SOCK_UNIX_DGRAM = 5, //unix sock dgram SW_SOCK_UNIX_STREAM = 6, //unix sock stream }; swListenPort* swServer_add_port(swServer *serv, int type, char *host, int port) { if (serv->listen_port_num >= SW_MAX_LISTEN_PORT) { swoole_error_log(SW_LOG_ERROR, SW_ERROR_SERVER_TOO_MANY_LISTEN_PORT, "allows up to %d ports to listen", SW_MAX_LISTEN_PORT); return NULL; } if (!(type == SW_SOCK_UNIX_DGRAM || type == SW_SOCK_UNIX_STREAM) && (port < 0 || port > 65535)) { swoole_error_log(SW_LOG_ERROR, SW_ERROR_SERVER_INVALID_LISTEN_PORT, "invalid port [%d]", port); return NULL; } if (strlen(host) + 1 > SW_HOST_MAXSIZE) { swoole_error_log(SW_LOG_ERROR, SW_ERROR_NAME_TOO_LONG, "address "%s" exceeds %d characters limit", host, SW_HOST_MAXSIZE - 1); return NULL; } swListenPort *ls = SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(swListenPort)); if (ls == NULL) { swError("alloc failed"); return NULL; } swPort_init(ls); ls->type = type; ls->port = port; strncpy(ls->host, host, strlen(host) + 1); if (type & SW_SOCK_SSL) { type = type & (~SW_SOCK_SSL); if (swSocket_is_stream(type)) { ls->type = type; ls->ssl = 1; #ifdef SW_USE_OPENSSL ls->ssl_config.prefer_server_ciphers = 1; ls->ssl_config.session_tickets = 0; ls->ssl_config.stapling = 1; ls->ssl_config.stapling_verify = 1; ls->ssl_config.ciphers = sw_strdup(SW_SSL_CIPHER_LIST); ls->ssl_config.ecdh_curve = sw_strdup(SW_SSL_ECDH_CURVE); #endif } } //create server socket int sock = swSocket_create(ls->type); if (sock < 0) { swSysError("create socket failed."); return NULL; } //bind address and port if (swSocket_bind(sock, ls->type, ls->host, &ls->port) < 0) { close(sock); return NULL; } //dgram socket, setting socket buffer size if (swSocket_is_dgram(ls->type)) { setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &ls->socket_buffer_size, sizeof(int)); setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &ls->socket_buffer_size, sizeof(int)); } //O_NONBLOCK & O_CLOEXEC swoole_fcntl_set_option(sock, 1, 1); ls->sock = sock; if (swSocket_is_dgram(ls->type)) { serv->have_udp_sock = 1; serv->dgram_port_num++; if (ls->type == SW_SOCK_UDP) { serv->udp_socket_ipv4 = sock; } else if (ls->type == SW_SOCK_UDP6) { serv->udp_socket_ipv6 = sock; } } else { serv->have_tcp_sock = 1; } LL_APPEND(serv->listen_list, ls); serv->listen_port_num++; return ls; }swPort_init 函數
swPort_init 函數用于初始化 swListenPort 對象
backlog、tcp_keepcount、tcp_keepidle 等等都是相應 socket 的屬性
在外網通信時,有些客戶端發送數據的速度較慢,每次只能發送一小段數據。這樣 onReceive 到的數據就不是一個完整的包。 還有些客戶端是逐字節發送數據的,如果每次回調 onReceive 會拖慢整個系統。Length_Check 和 EOF_Check 的使用。package_length_type、package_eof 等等就是相關參數的具體參數。
#define SW_DATA_EOF " " void swPort_init(swListenPort *port) { port->sock = 0; port->ssl = 0; //listen backlog port->backlog = SW_BACKLOG; //tcp keepalive port->tcp_keepcount = SW_TCP_KEEPCOUNT; port->tcp_keepinterval = SW_TCP_KEEPINTERVAL; port->tcp_keepidle = SW_TCP_KEEPIDLE; port->open_tcp_nopush = 1; port->protocol.package_length_type = "N"; port->protocol.package_length_size = 4; port->protocol.package_body_offset = 4; port->protocol.package_max_length = SW_BUFFER_INPUT_SIZE; port->socket_buffer_size = SwooleG.socket_buffer_size; char eof[] = SW_DATA_EOF; port->protocol.package_eof_len = sizeof(SW_DATA_EOF) - 1; memcpy(port->protocol.package_eof, eof, port->protocol.package_eof_len); }
c:有符號、1字節
C:無符號、1字節
s :有符號、主機字節序、2字節
S:無符號、主機字節序、2字節
n:無符號、網絡字節序、2字節
N:無符號、網絡字節序、4字節
l:有符號、主機字節序、4字節(小寫L)
L:無符號、主機字節序、4字節(大寫L)
v:無符號、小端字節序、2字節
V:無符號、小端字節序、4字節
swSocket_create 創建 socketswSocket_create 函數會根據 type 的類型來調用 socket 系統調用
int swSocket_create(int type) { int _domain; int _type; switch (type) { case SW_SOCK_TCP: _domain = PF_INET; _type = SOCK_STREAM; break; case SW_SOCK_TCP6: _domain = PF_INET6; _type = SOCK_STREAM; break; case SW_SOCK_UDP: _domain = PF_INET; _type = SOCK_DGRAM; break; case SW_SOCK_UDP6: _domain = PF_INET6; _type = SOCK_DGRAM; break; case SW_SOCK_UNIX_DGRAM: _domain = PF_UNIX; _type = SOCK_DGRAM; break; case SW_SOCK_UNIX_STREAM: _domain = PF_UNIX; _type = SOCK_STREAM; break; default: swWarn("unknown socket type [%d]", type); return SW_ERR; } return socket(_domain, _type, 0); }swSocket_bind 綁定端口
SO_REUSEADDR 允許啟動一個監聽服務器并捆綁眾所周知端口,即使以前建立的該端口用作它們的本地端口的連接仍存在。
如果不對TCP的套接字選項進行任何限制時,如果啟動兩個進程,第二個進程就會在調用bind函數的時候出錯(Address already in use)。
如果在調用bind之前我們設置了SO_REUSEADDR,但是不在第二個進程啟動前close這個套接字,那么第二個進程仍然會在調用bind函數的時候出錯(Address already in use)。
如果在調用bind之前我們設置了SO_REUSEADDR,并接收了一個客戶端連接,并且在第二個進程啟動前關閉了bind的套接字,這個時候第一個進程只擁有一個套接字(與客戶端的連接),那么第二個進程則可以bind成功,符合預期。
相對 SO_REUSEADDR 來說,SO_REUSEPORT 沒有那么多的限制條件,允許兩個毫無血緣關系的進程使用相同的 IP 地址同時監聽同一個端口,并且不會出現驚群效應
對于 UNIX SOCKET,需要設置 sun_family 與 sun_path
對于 IPV4,需要設置 sin_family、sin_port、sin_addr;對于 IPV6,需要設置 sin6_family、sin6_port、sin6_addr,然后調用 bind 函數;
如果 port 為0,說明服務器綁定的是任意端口,bind 函數會將系統所選擇的端口返回給 sockaddr 對象
int swSocket_bind(int sock, int type, char *host, int *port) { int ret; struct sockaddr_in addr_in4; struct sockaddr_in6 addr_in6; struct sockaddr_un addr_un; socklen_t len; //SO_REUSEADDR option int option = 1; if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(int)) < 0) { swoole_error_log(SW_LOG_WARNING, SW_ERROR_SYSTEM_CALL_FAIL, "setsockopt(%d, SO_REUSEADDR) failed.", sock); } //reuse port #ifdef HAVE_REUSEPORT if (SwooleG.reuse_port) { if (setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &option, sizeof(int)) < 0) { swSysError("setsockopt(SO_REUSEPORT) failed."); SwooleG.reuse_port = 0; } } #endif //unix socket if (type == SW_SOCK_UNIX_DGRAM || type == SW_SOCK_UNIX_STREAM) { bzero(&addr_un, sizeof(addr_un)); unlink(host); addr_un.sun_family = AF_UNIX; strncpy(addr_un.sun_path, host, sizeof(addr_un.sun_path) - 1); ret = bind(sock, (struct sockaddr*) &addr_un, sizeof(addr_un)); } //IPv6 else if (type > SW_SOCK_UDP) { bzero(&addr_in6, sizeof(addr_in6)); inet_pton(AF_INET6, host, &(addr_in6.sin6_addr)); addr_in6.sin6_port = htons(*port); addr_in6.sin6_family = AF_INET6; ret = bind(sock, (struct sockaddr *) &addr_in6, sizeof(addr_in6)); if (ret == 0 && *port == 0) { len = sizeof(addr_in6); if (getsockname(sock, (struct sockaddr *) &addr_in6, &len) != -1) { *port = ntohs(addr_in6.sin6_port); } } } //IPv4 else { bzero(&addr_in4, sizeof(addr_in4)); inet_pton(AF_INET, host, &(addr_in4.sin_addr)); addr_in4.sin_port = htons(*port); addr_in4.sin_family = AF_INET; ret = bind(sock, (struct sockaddr *) &addr_in4, sizeof(addr_in4)); if (ret == 0 && *port == 0) { len = sizeof(addr_in4); if (getsockname(sock, (struct sockaddr *) &addr_in4, &len) != -1) { *port = ntohs(addr_in4.sin_port); } } } //bind failed if (ret < 0) { swoole_error_log(SW_LOG_WARNING, SW_ERROR_SYSTEM_CALL_FAIL, "bind(%s:%d) failed. Error: %s [%d]", host, *port, strerror(errno), errno); return SW_ERR; } return ret; }swoole_fcntl_set_option 函數為文件描述符設置選項
此函數主要是利用 fcntl 函數為文件描述符設置阻塞/非阻塞、CLOEXEC 等屬性。
void swoole_fcntl_set_option(int sock, int nonblock, int cloexec) { int opts, ret; if (nonblock >= 0) { do { opts = fcntl(sock, F_GETFL); } while (opts < 0 && errno == EINTR); if (opts < 0) { swSysError("fcntl(%d, GETFL) failed.", sock); } if (nonblock) { opts = opts | O_NONBLOCK; } else { opts = opts & ~O_NONBLOCK; } do { ret = fcntl(sock, F_SETFL, opts); } while (ret < 0 && errno == EINTR); if (ret < 0) { swSysError("fcntl(%d, SETFL, opts) failed.", sock); } } #ifdef FD_CLOEXEC if (cloexec >= 0) { do { opts = fcntl(sock, F_GETFD); } while (opts < 0 && errno == EINTR); if (opts < 0) { swSysError("fcntl(%d, GETFL) failed.", sock); } if (cloexec) { opts = opts | FD_CLOEXEC; } else { opts = opts & ~FD_CLOEXEC; } do { ret = fcntl(sock, F_SETFD, opts); } while (ret < 0 && errno == EINTR); if (ret < 0) { swSysError("fcntl(%d, SETFD, opts) failed.", sock); } } #endif }
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/29216.html
摘要:消息隊列的接受消息隊列的接受是利用函數,其中是消息的類型,該參數會取出指定類型的消息,如果設定的是爭搶模式,該值會統一為,否則該值就是消息發送目的的。環形隊列的消息入隊發送消息首先要確定環形隊列的隊尾。取模操作可以優化 前言 swoole 的底層隊列有兩種:進程間通信 IPC 的消息隊列 swMsgQueue,與環形隊列 swRingQueue。IPC 的消息隊列用于 task_wor...
摘要:當此時的套接字不可寫的時候,會自動放入緩沖區中。當大于高水線時,會自動調用回調函數。寫就緒狀態當監控到套接字進入了寫就緒狀態時,就會調用函數。如果為,說明此時異步客戶端雖然建立了連接,但是還沒有調用回調函數,因此這時要調用函數。 前言 上一章我們說了客戶端的連接 connect,對于同步客戶端來說,連接已經建立成功;但是對于異步客戶端來說,此時可能還在進行 DNS 的解析,on...
摘要:新建可以看到,自動采用包長檢測的方法該函數主要功能是設置各種回調函數值得注意的是第三個參數代表是否異步。發送數據函數并不是直接發送數據,而是將數據存儲在,等著寫事件就緒之后調用發送數據。 swReactorThread_dispatch 發送數據 reactor 線程會通過 swReactorThread_dispatch 發送數據,當采用 stream 發送數據的時候,會調用 sw...
摘要:是緩存區高水位線,達到了說明緩沖區即將滿了創建線程函數用于將監控的存放于中向中添加監聽的文件描述符等待所有的線程開啟事件循環利用創建線程,線程啟動函數是保存監聽本函數將用于監聽的存放到當中,并設置相應的屬性 Server 的啟動 在 server 啟動之前,swoole 首先要調用 php_swoole_register_callback 將 PHP 的回調函數注冊到 server...
摘要:的數據結構數據結構中是鏈表元素的個數,是緩沖區創建時,鏈表元素約定的大小實際大小不一定是這個值,是實際上緩沖區占用的內存總大小。中的有三種,分別應用于緩存數據發送文件提醒連接關閉三種情景。指的是元素的內存大小。 前言 swoole 中數據的接受與發送(例如 reactor 線程接受客戶端消息、發送給客戶端的消息、接受到的來自 worker 的消息、要發送給 worker 的消息等等)都...
閱讀 1537·2021-11-04 16:10
閱讀 2774·2021-09-30 09:48
閱讀 2838·2019-08-29 11:31
閱讀 1577·2019-08-28 18:22
閱讀 3225·2019-08-26 13:44
閱讀 1319·2019-08-26 13:42
閱讀 2844·2019-08-26 10:20
閱讀 754·2019-08-23 17:00