摘要:在中,用戶主要通過配置文件的塊來控制和調(diào)節(jié)事件模塊的參數(shù)。中,事件會使用結(jié)構(gòu)體來表示。初始化定時器,該定時器就是一顆紅黑樹,根據(jù)時間對事件進(jìn)行排序。
運(yùn)營研發(fā)團(tuán)隊 譚淼
一、nginx模塊介紹高并發(fā)是nginx最大的優(yōu)勢之一,而高并發(fā)的原因就是nginx強(qiáng)大的事件模塊。本文將重點(diǎn)介紹nginx是如果利用Linux系統(tǒng)的epoll來完成高并發(fā)的。
首先介紹nginx的模塊,nginx1.15.5源碼中,自帶的模塊主要分為core模塊、conf模塊、event模塊、http模塊和mail模塊五大類。其中mail模塊比較特殊,本文暫不討論。
查看nginx模塊屬于哪一類也很簡單,對于每一個模塊,都有一個ngx_module_t類型的結(jié)構(gòu)體,該結(jié)構(gòu)體的type字段就是標(biāo)明該模塊是屬于哪一類模塊的。以ngx_http_module為例:
ngx_module_t ngx_http_module = { NGX_MODULE_V1, &ngx_http_module_ctx, /* module context */ ngx_http_commands, /* module directives */ NGX_CORE_MODULE, /* module type */ NULL, /* init master */ NULL, /* init module */ NULL, /* init process */ NULL, /* init thread */ NULL, /* exit thread */ NULL, /* exit process */ NULL, /* exit master */ NGX_MODULE_V1_PADDING };
可以ngx_core_module是屬于NGX_CORE_MODULE類型的模塊。
由于本文主要介紹使用epoll來完成nginx的事件驅(qū)動,故主要介紹core模塊的ngx_events_module與event模塊的ngx_event_core_module、ngx_epoll_module。
二、epoll介紹 2.1、epoll原理關(guān)于epoll的實(shí)現(xiàn)原理,本文不會具體介紹,這里只是介紹epoll的工作流程。具體的實(shí)現(xiàn)參考:https://titenwang.github.io/2...
epoll的使用是三個函數(shù):
int epoll_create(int size); int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
首先epoll_create函數(shù)會在內(nèi)核中創(chuàng)建一塊獨(dú)立的內(nèi)存存儲一個eventpoll結(jié)構(gòu)體,該結(jié)構(gòu)體包括一顆紅黑樹和一個鏈表,如下圖所示:
然后通過epoll_ctl函數(shù),可以完成兩件事。
(1)將事件添加到紅黑樹中,這樣可以防止重復(fù)添加事件;
(2)將事件與網(wǎng)卡建立回調(diào)關(guān)系,當(dāng)事件發(fā)生時,網(wǎng)卡驅(qū)動會回調(diào)ep_poll_callback函數(shù),將事件添加到epoll_create創(chuàng)建的鏈表中。
最后,通過epoll_wait函數(shù),檢查并返回鏈表中是否有事件。該函數(shù)是阻塞函數(shù),阻塞時間為timeout,當(dāng)雙向鏈表有事件或者超時的時候就會返回鏈表長度(發(fā)生事件的數(shù)量)。
2.2、epoll相關(guān)函數(shù)的參數(shù)(1)epoll_create函數(shù)的參數(shù)size表示該紅黑樹的大致數(shù)量,實(shí)際上很多操作系統(tǒng)沒有使用這個參數(shù)。
(2)epoll_ctl函數(shù)的參數(shù)為epfd,op,fd和event。
(3)epoll_wait函數(shù)的參數(shù)為epfd,events,maxevents和timeout
三、事件模塊的初始化眾所周知,nginx是master/worker框架,在nginx啟動時是一個進(jìn)程,在啟動的過程中master會fork出了多個子進(jìn)程作為worker。master主要是管理worker,本身并不處理請求。而worker負(fù)責(zé)處理請求。因此,事件模塊的初始化也是分成兩部分。一部分發(fā)生在fork出worker前,主要是配置文件解析等操作,另外一部分發(fā)生在fork之后,主要是向epoll中添加監(jiān)聽事件。
3.1 啟動進(jìn)程對事件模塊的初始化啟動進(jìn)程對事件模塊的初始化分為配置文件解析、開始監(jiān)聽端口和ngx_event_core_module模塊的初始化。這三個步驟均在ngx_init_cycle函數(shù)進(jìn)行。
調(diào)用關(guān)系:main() ---> ngx_init_cycle()
下圖是ngx_init_cycle函數(shù)的流程,紅框是本節(jié)將要介紹的三部分內(nèi)容。
3.1.1 配置文件解析啟動進(jìn)程的一個主要工作是解析配置文件。在nginx中,用戶主要通過nginx配置文件nginx.conf的event塊來控制和調(diào)節(jié)事件模塊的參數(shù)。下面是一個event塊配置的示例:
user nobody; worker_processes 1; error_log logs/error.log; pid logs/nginx.pid; ...... events { use epoll; worker_connections 1024; accept_mutex on; } http { ...... }
首先我們先看看nginx是如何解析event塊,并將event塊存儲在什么地方。
在nginx中,解析配置文件的工作是調(diào)用ngx_init_cycle函數(shù)完成的。下圖是該函數(shù)在解析配置文件部分的一個流程:
(1)ngx_init_cycle函數(shù)首先會進(jìn)行一些初始化工作,包括更新時間,創(chuàng)建內(nèi)存池和創(chuàng)建并更新ngx_cycle_t結(jié)構(gòu)體cycle;
(2)調(diào)用各個core模塊的create_conf方法,可以創(chuàng)建cycle的conf_ctx數(shù)組,該階段完成后cycle->conf_ctx如下圖所示:
(3)初始化ngx_conf_t類型的結(jié)構(gòu)體conf,將cycle->conf_ctx結(jié)構(gòu)體賦值給conf的ctx字段
(4)解析配置文件
解析配置文件會調(diào)用ngx_conf_parse函數(shù),該函數(shù)會解析一行命令,當(dāng)遇到塊時會遞歸調(diào)用自身。解析的方法也很簡單,就是讀取一個命令,然后在所有模塊的cmd數(shù)組中尋找該命令,若找到則調(diào)用該命令的cmd->set(),完成參數(shù)的解析。下面介紹event塊的解析。
event命令是在event/ngx_event.c文件中定義的,代碼如下。
static ngx_command_t ngx_events_commands[] = { { ngx_string("events"), NGX_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS, ngx_events_block, 0, 0, NULL }, ngx_null_command };
在從配置文件中讀取到event后,會調(diào)用ngx_events_block函數(shù)。下面是ngx_events_block函數(shù)的主要工作:
解析完配置文件中的event塊后,cycle->conf_ctx如下圖所示:
(5)解析完整個配置文件后,調(diào)用各個core類型模塊的init_conf方法。ngx_event_module的ctx的init_conf方法為ngx_event_init_conf。該方法并沒有實(shí)際的用途,暫不詳述。
3.1.2 監(jiān)聽socket雖然監(jiān)聽socket和事件模塊并沒有太多的關(guān)系,但是為了使得整個流程完整,此處會簡單介紹一下啟動進(jìn)程是如何監(jiān)聽端口的。
該過程首先檢查old_cycle,如果old_cycle中有和cycle中相同的socket,就直接把old_cycle中的fd賦值給cycle。之后會調(diào)用ngx_open_listening_socket函數(shù),監(jiān)聽端口。
下面是ngx_open_listening_sockets函數(shù),該函數(shù)的作用是遍歷所有需要監(jiān)聽的端口,然后調(diào)用socket(),bind()和listen()函數(shù),該函數(shù)會重試5次。
ngx_int_t ngx_open_listening_sockets(ngx_cycle_t *cycle) { ...... /* 重試5次 */ for (tries = 5; tries; tries--) { failed = 0; /* 遍歷需要監(jiān)聽的端口 */ ls = cycle->listening.elts; for (i = 0; i < cycle->listening.nelts; i++) { ...... /* ngx_socket函數(shù)就是socket函數(shù) */ s = ngx_socket(ls[i].sockaddr->sa_family, ls[i].type, 0); ...... /* 設(shè)置socket屬性 */ if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const void *) &reuseaddr, sizeof(int)) == -1) { ...... } ...... /* IOCP事件操作 */ if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) { if (ngx_nonblocking(s) == -1) { ...... } } ...... /* 綁定socket和地址 */ if (bind(s, ls[i].sockaddr, ls[i].socklen) == -1) { ...... } ...... /* 開始監(jiān)聽 */ if (listen(s, ls[i].backlog) == -1) { ...... } ls[i].listen = 1; ls[i].fd = s; } ...... /* 兩次重試間隔500ms */ ngx_msleep(500); } ...... return NGX_OK; }3.1.3 ngx_event_core_module模塊的初始化
在ngx_init_cycle函數(shù)監(jiān)聽完端口,并提交新的cycle后,便會調(diào)用ngx_init_modules函數(shù),該方法會遍歷所有模塊并調(diào)用其init_module方法。對于該階段,和事件驅(qū)動模塊有關(guān)系的只有ngx_event_core_module的ngx_event_module_init方法。該方法主要做了下面三個工作:
(1)獲取core模塊配置結(jié)構(gòu)體中的時間精度timer_resolution,用在epoll里更新緩存時間
(2)調(diào)用getrlimit方法,檢查連接數(shù)是否超過系統(tǒng)的資源限制
(3)利用 mmap 分配一塊共享內(nèi)存,存儲負(fù)載均衡鎖(ngx_accept_mutex)、連接計數(shù)器(ngx_connection_counter)
3.2 worker進(jìn)程對事件模塊的初始化啟動進(jìn)程在完成一系列操作后,會fork出master進(jìn)程,并自我關(guān)閉,讓master進(jìn)程繼續(xù)完成初始化工作。master進(jìn)程會在ngx_spawn_process函數(shù)中fork出worker進(jìn)程,并讓worker進(jìn)程調(diào)用ngx_worker_process_cycle函數(shù)。ngx_worker_process_cycle函數(shù)是worker進(jìn)程的主循環(huán)函數(shù),該函數(shù)首先會調(diào)用ngx_worker_process_init函數(shù)完成worker的初始化,然后就會進(jìn)入到一個循環(huán)中,持續(xù)監(jiān)聽處理請求。
事件模塊的初始化就發(fā)生在ngx_worker_process_init函數(shù)中。
其調(diào)用關(guān)系:main() ---> ngx_master_process_cycle() ---> ngx_start_worker_processes() ---> ngx_spawn_process() ---> ngx_worker_process_cycle() ---> ngx_worker_process_init()。
對于ngx_worker_process_init函數(shù),會調(diào)用各個模塊的init_process方法:
static void ngx_worker_process_init(ngx_cycle_t *cycle, ngx_int_t worker) { ...... for (i = 0; cycle->modules[i]; i++) { if (cycle->modules[i]->init_process) { if (cycle->modules[i]->init_process(cycle) == NGX_ERROR) { /* fatal */ exit(2); } } } ...... }
在此處,會調(diào)用ngx_event_core_module的ngx_event_process_init函數(shù)。該函數(shù)較為關(guān)鍵,將會重點(diǎn)解析。在介紹ngx_event_process_init函數(shù)前,先介紹兩個終于的結(jié)構(gòu)體,由于這兩個結(jié)構(gòu)體較為復(fù)雜,故只介紹部分字段:
(1)ngx_event_s結(jié)構(gòu)體。nginx中,事件會使用ngx_event_s結(jié)構(gòu)體來表示。
ngx_event_s struct ngx_event_s { /* 通常指向ngx_connection_t結(jié)構(gòu)體 */ void *data; /* 事件可寫 */ unsigned write:1; /* 事件可建立新連接 */ unsigned accept:1; /* 檢測事件是否過期 */ unsigned instance:1; /* 通常將事件加入到epoll中會將該字段置為1 */ unsigned active:1; ...... /* 事件超時 */ unsigned timedout:1; /* 事件是否在定時器中 */ unsigned timer_set:1; ...... /* 事件是否在延遲處理隊列中 */ unsigned posted:1; ...... /* 事件的處理函數(shù) */ ngx_event_handler_pt handler; ...... /* 定時器紅黑樹節(jié)點(diǎn) */ ngx_rbtree_node_t timer; /* 延遲處理隊列節(jié)點(diǎn) */ ngx_queue_t queue; ...... };
(2)ngx_connection_s結(jié)構(gòu)體代表一個nginx連接
struct ngx_connection_s { /* 若該結(jié)構(gòu)體未使用,則指向下一個為使用的ngx_connection_s,若已使用,則指向ngx_http_request_t */ void *data; /* 指向一個讀事件結(jié)構(gòu)體,這個讀事件結(jié)構(gòu)體表示該連接的讀事件 */ ngx_event_t *read; /* 指向一個寫事件結(jié)構(gòu)體,這個寫事件結(jié)構(gòu)體表示該連接的寫事件 */ ngx_event_t *write; /* 連接的套接字 */ ngx_socket_t fd; ...... /* 該連接對應(yīng)的監(jiān)聽端口,表示是由該端口建立的連接 */ ngx_listening_t *listening; ...... };
下面介紹ngx_event_process_init函數(shù)的實(shí)現(xiàn),代碼如下:
/* 此方法在worker進(jìn)程初始化時調(diào)用 */ static ngx_int_t ngx_event_process_init(ngx_cycle_t *cycle) { ...... /* 打開accept_mutex負(fù)載均衡鎖,用于防止驚群 */ if (ccf->master && ccf->worker_processes > 1 && ecf->accept_mutex) { ngx_use_accept_mutex = 1; ngx_accept_mutex_held = 0; ngx_accept_mutex_delay = ecf->accept_mutex_delay; } else { ngx_use_accept_mutex = 0; } /* 初始化兩個隊列,一個用于存放不能及時處理的建立連接事件,一個用于存儲不能及時處理的讀寫事件 */ ngx_queue_init(&ngx_posted_accept_events); ngx_queue_init(&ngx_posted_events); /* 初始化定時器 */ if (ngx_event_timer_init(cycle->log) == NGX_ERROR) { return NGX_ERROR; } /** * 調(diào)用使用的ngx_epoll_module的ctx的actions的init方法,即ngx_epoll_init函數(shù) * 該函數(shù)主要的作用是調(diào)用epoll_create()和創(chuàng)建用于epoll_wait()返回事件鏈表的event_list **/ for (m = 0; cycle->modules[m]; m++) { ...... if (module->actions.init(cycle, ngx_timer_resolution) != NGX_OK) { exit(2); } break; } /* 如果在配置中設(shè)置了timer_resolution,則要設(shè)置控制時間精度。通過setitimer方法會設(shè)置一個定時器,每隔timer_resolution的時間會發(fā)送一個SIGALRM信號 */ if (ngx_timer_resolution && !(ngx_event_flags & NGX_USE_TIMER_EVENT)) { ...... sa.sa_handler = ngx_timer_signal_handler; sigemptyset(&sa.sa_mask); if (sigaction(SIGALRM, &sa, NULL) == -1) { ...... } itv.it_interval.tv_sec = ngx_timer_resolution / 1000; ...... if (setitimer(ITIMER_REAL, &itv, NULL) == -1) { ...... } } ...... /* 分配連接池空間 */ cycle->connections = ngx_alloc(sizeof(ngx_connection_t) * cycle->connection_n, cycle->log); ...... c = cycle->connections; /* 分配讀事件結(jié)構(gòu)體數(shù)組空間,并初始化讀事件的closed和instance */ cycle->read_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n, cycle->log); ...... rev = cycle->read_events; for (i = 0; i < cycle->connection_n; i++) { rev[i].closed = 1; rev[i].instance = 1; } /* 分配寫事件結(jié)構(gòu)體數(shù)組空間,并初始化寫事件的closed */ cycle->write_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n, cycle->log); ...... wev = cycle->write_events; for (i = 0; i < cycle->connection_n; i++) { wev[i].closed = 1; } /* 將序號為i的讀事件結(jié)構(gòu)體和寫事件結(jié)構(gòu)體賦值給序號為i的connections結(jié)構(gòu)體的元素 */ i = cycle->connection_n; next = NULL; do { i--; /* 將connection的data字段設(shè)置為下一個connection */ c[i].data = next; c[i].read = &cycle->read_events[i]; c[i].write = &cycle->write_events[i]; c[i].fd = (ngx_socket_t) -1; next = &c[i]; } while (i); /* 初始化cycle->free_connections */ cycle->free_connections = next; cycle->free_connection_n = cycle->connection_n; /* 為每個監(jiān)聽端口分配連接 */ ls = cycle->listening.elts; for (i = 0; i < cycle->listening.nelts; i++) { ...... c = ngx_get_connection(ls[i].fd, cycle->log); ...... rev = c->read; ...... /* 為監(jiān)聽的端口的connection結(jié)構(gòu)體的read事件設(shè)置回調(diào)函數(shù) */ rev->handler = (c->type == SOCK_STREAM) ? ngx_event_accept : ngx_event_recvmsg; /* 將監(jiān)聽的connection的read事件添加到事件驅(qū)動模塊(epoll) */ ...... if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) { return NGX_ERROR; } } return NGX_OK; }
該方法主要做了下面幾件事:
(1)打開accept_mutex負(fù)載均衡鎖,用于防止驚群。驚群是指當(dāng)多個worker都處于等待事件狀態(tài),如果突然來了一個請求,就會同時喚醒多個worker,但是只有一個worker會處理該請求,這就造成系統(tǒng)資源浪費(fèi)。為了解決這個問題,nginx使用了accept_mutex負(fù)載均衡鎖。各個worker首先會搶鎖,搶到鎖的worker才會監(jiān)聽各個端口。
(2)初始化兩個隊列,一個用于存放不能及時處理的建立連接事件,一個用于存儲不能及時處理的讀寫事件。
(3)初始化定時器,該定時器就是一顆紅黑樹,根據(jù)時間對事件進(jìn)行排序。
(4)調(diào)用使用的ngx_epoll_module的ctx的actions的init方法,即ngx_epoll_init函數(shù)。該函數(shù)較為簡單,主要的作用是調(diào)用epoll_create()和創(chuàng)建用于存儲epoll_wait()返回事件的鏈表event_list。
(5)如果再配置中設(shè)置了timer_resolution,則要設(shè)置控制時間精度,用于控制nginx時間。這部分在第五部分重點(diǎn)講解。
(6)分配連接池空間、讀事件結(jié)構(gòu)體數(shù)組、寫事件結(jié)構(gòu)體數(shù)組。
上文介紹了ngx_connection_s和ngx_event_s結(jié)構(gòu)體,我們了解到每一個ngx_connection_s結(jié)構(gòu)體都有兩個ngx_event_s結(jié)構(gòu)體,一個讀事件,一個寫事件。在這個階段,會向內(nèi)存池中申請三個數(shù)組:cycle->connections、cycle->read_events和cycle->write_events,并將序號為i的讀事件結(jié)構(gòu)體和寫事件結(jié)構(gòu)體賦值給序號為i的connections結(jié)構(gòu)體的元素。并將cycle->free_connections指向第一個未使用的ngx_connections結(jié)構(gòu)體。
(7)為每個監(jiān)聽端口分配連接
在此階段,會獲取cycle->listening數(shù)組中的ngx_listening_s結(jié)構(gòu)體元素。在3.1.2小節(jié)中,我們已經(jīng)講了nginx啟動進(jìn)程會監(jiān)聽端口,并將socket連接的fd存儲在cycle->listening數(shù)組中。在這里,會獲取到3.1.2小節(jié)中監(jiān)聽的端口,并為每個監(jiān)聽分配連接結(jié)構(gòu)體。
(8)為每個監(jiān)聽端口的連接的讀事件設(shè)置handler
在為cycle->listening的元素分配完ngx_connection_s類型的連接后,會為連接的讀事件設(shè)置回調(diào)方法handler。這里handler為ngx_event_accept函數(shù),對于該函數(shù),將在后文講解。
(9)將每個監(jiān)聽端口的連接的讀事件添加到epoll中
在此處,會調(diào)用ngx_epoll_module的ngx_epoll_add_event函數(shù),將監(jiān)聽端口的連接的讀事件(ls[i].connection->read)添加到epoll中。ngx_epoll_add_event函數(shù)的流程如下:
在向epoll中添加事件前,需要判斷之前是否添加過該連接的事件。
至此,ngx_event_process_init的工作完成,事件模塊的初始化也完成了。后面worker開始進(jìn)入循環(huán)監(jiān)聽階段。
四、事件處理 4.1 worker的主循環(huán)函數(shù)ngx_worker_process_cycleworker在初始化完成之后,開始循環(huán)監(jiān)聽端口,并處理請求。下面開始我們開始講解worker是如何處理事件的。worker的循環(huán)代碼如下:
static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data) { ngx_int_t worker = (intptr_t) data; ngx_process = NGX_PROCESS_WORKER; ngx_worker = worker; /* 初始化worker */ ngx_worker_process_init(cycle, worker); ngx_setproctitle("worker process"); for ( ;; ) { if (ngx_exiting) { ...... } ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle"); /* 處理IO事件和時間事件 */ ngx_process_events_and_timers(cycle); if (ngx_terminate) { ...... } if (ngx_quit) { ...... } if (ngx_reopen) { ...... } } }
可以看到,在worker初始化后進(jìn)入一個for循環(huán),所有的IO事件和時間事件都是在函數(shù)ngx_process_events_and_timers中處理的。
4.2 worker的事件處理函數(shù)ngx_process_events_and_timers在worker的主循環(huán)中,所有的事件都是通過函數(shù)ngx_process_events_and_timers處理的,該函數(shù)的代碼如下:
/* 事件處理函數(shù)和定時器處理函數(shù) */ void ngx_process_events_and_timers(ngx_cycle_t *cycle) { ngx_uint_t flags; ngx_msec_t timer, delta; /* timer_resolution模式,設(shè)置epoll_wait函數(shù)阻塞ngx_timer_resolution的時間 */ if (ngx_timer_resolution) { /* timer_resolution模式 */ timer = NGX_TIMER_INFINITE; flags = 0; } else { /* 非timer_resolution模式,epoll_wait函數(shù)等待至下一個定時器事件到來時返回 */ timer = ngx_event_find_timer(); flags = NGX_UPDATE_TIME; } /* 是否使用accept_mutex */ if (ngx_use_accept_mutex) { /** * 該worker是否負(fù)載過高,若負(fù)載過高則不搶鎖 * 判斷負(fù)載過高是判斷該worker建立的連接數(shù)是否大于該worker可以建立的最大連接數(shù)的7/8 **/ if (ngx_accept_disabled > 0) { ngx_accept_disabled--; } else { /* 搶鎖 */ if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) { return; } if (ngx_accept_mutex_held) { /* 搶到鎖,則收到事件后暫不處理,先扔到事件隊列中 */ flags |= NGX_POST_EVENTS; } else { /* 未搶到鎖,要修改worker在epoll_wait函數(shù)等待的時間,使其不要過大 */ if (timer == NGX_TIMER_INFINITE || timer > ngx_accept_mutex_delay) { timer = ngx_accept_mutex_delay; } } } } /* delta用于計算ngx_process_events的耗時 */ delta = ngx_current_msec; /* 事件處理函數(shù),epoll使用的是ngx_epoll_process_events函數(shù) */ (void) ngx_process_events(cycle, timer, flags); delta = ngx_current_msec - delta; ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "timer delta: %M", delta); /* 處理ngx_posted_accept_events隊列的連接事件 */ ngx_event_process_posted(cycle, &ngx_posted_accept_events); /* 若持有accept_mutex,則釋放鎖 */ if (ngx_accept_mutex_held) { ngx_shmtx_unlock(&ngx_accept_mutex); } /* 若事件處理函數(shù)的執(zhí)行時間不為0,則要處理定時器事件 */ if (delta) { ngx_event_expire_timers(); } /* 處理ngx_posted_events隊列的讀寫事件 */ ngx_event_process_posted(cycle, &ngx_posted_events); }
ngx_process_events_and_timers函數(shù)是nginx處理事件的核心函數(shù),主要的工作可以分為下面幾部分:
(1)設(shè)置nginx更新時間的方式。
nginx會將時間存儲在內(nèi)存中,每隔一段時間調(diào)用ngx_time_update函數(shù)更新時間。那么多久更新一次呢?nginx提供兩種方式:
方式一:timer_resolution模式。在nginx配置文件中,可以使用timer_resolution之類來選擇此方式。如果使用此方式,會將epoll_wait的阻塞時間設(shè)置為無窮大,即一直阻塞。那么如果nginx一直都沒有收到事件,會一直阻塞嗎?答案是不會的。在本文3.2節(jié)中講解的ngx_event_process_init函數(shù)(第5步)將會設(shè)置一個時間定時器和一個信號處理函數(shù),其中時間定時器會每隔timer_resolution的時間發(fā)送一個SIGALRM信號,而當(dāng)worker收到時間定時器發(fā)送的信號,會將epoll_wait函數(shù)終端,同時調(diào)用SIGALRM信號的中斷處理函數(shù),將全局變量ngx_event_timer_alarm置為1。后面會檢查該變量,調(diào)用ngx_time_update函數(shù)來更新nginx的時間。
方式二:如果不在配置文件中設(shè)置timer_resolution,nginx默認(rèn)會使用方式二來更新nginx的時間。首先會調(diào)用ngx_event_find_timer函數(shù)來設(shè)置epoll_wait的阻塞時間,ngx_event_find_timer函數(shù)返回的是下一個時間事件發(fā)生的時間與當(dāng)前時間的差值,即讓epoll_wait阻塞到下一個時間事件發(fā)生為止。當(dāng)使用這種模式,每當(dāng)epoll_wait返回,都會調(diào)用ngx_time_update函數(shù)更新時間。
(2)使用負(fù)載均衡鎖ngx_use_accept_mutex。
上文曾經(jīng)提過一個問題,當(dāng)多個worker都處于等待事件狀態(tài),如果突然來了一個請求,就會同時喚醒多個worker,但是只有一個worker會處理該請求,這就造成系統(tǒng)資源浪費(fèi)。nginx如果解決這個問題呢?答案就是使用一個鎖來解決。在監(jiān)聽事件前,各個worker會進(jìn)行一次搶鎖行為,只有搶到鎖的worker才會監(jiān)聽端口,而其他worker值處理已經(jīng)建立連接的事件。
首先函數(shù)會通過ngx_accept_disabled是否大于0來判斷是否過載,過載的worker是不允許搶鎖的。ngx_accept_disabled的計算方式如下。
/** * ngx_cycle->connection_n是每個進(jìn)程最大連接數(shù),也是連接池的總連接數(shù),ngx_cycle->free_connection_n是連接池中未使用的連接數(shù)量。 * 當(dāng)未使用的數(shù)量小于總數(shù)量的1/8時,會使ngx_accept_disabled大于0。這時認(rèn)為該worker過載。 **/ ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n;
若ngx_accept_disabled小于0,worker可以搶鎖。這時會通過ngx_trylock_accept_mutex函數(shù)搶鎖。該函數(shù)的流程如下圖所示:
在搶鎖結(jié)束后,若worker搶到鎖,設(shè)置該worker的flag為NGX_POST_EVENTS,表示搶到鎖的這個worker在收到事件后并不會立即調(diào)用事件的處理函數(shù),而是會把事件放到一個隊列里,后期處理。
(3)調(diào)用事件處理函數(shù)ngx_process_events,epoll使用的是ngx_epoll_process_events函數(shù)。此代碼較為重要,下面是代碼:
static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags) { int events; uint32_t revents; ngx_int_t instance, i; ngx_uint_t level; ngx_err_t err; ngx_event_t *rev, *wev; ngx_queue_t *queue; ngx_connection_t *c; ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll timer: %M", timer); /* 調(diào)用epoll_wait,從epoll中獲取發(fā)生的事件 */ events = epoll_wait(ep, event_list, (int) nevents, timer); err = (events == -1) ? ngx_errno : 0; /* 兩種方式更新nginx時間,timer_resolution模式ngx_event_timer_alarm為1,非timer_resolution模式flags & NGX_UPDATE_TIME不為0,均會進(jìn)入if條件 */ if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) { ngx_time_update(); } /* 處理epoll_wait返回為-1的情況 */ if (err) { /** * 對于timer_resolution模式,如果worker接收到SIGALRM信號,會調(diào)用該信號的處理函數(shù),將ngx_event_timer_alarm置為1,從而更新時間。 * 同時如果在epoll_wait阻塞的過程中接收到SIGALRM信號,會中斷epoll_wait,使其返回NGX_EINTR。由于上一步已經(jīng)更新了時間,這里要把ngx_event_timer_alarm置為0。 **/ if (err == NGX_EINTR) { if (ngx_event_timer_alarm) { ngx_event_timer_alarm = 0; return NGX_OK; } level = NGX_LOG_INFO; } else { level = NGX_LOG_ALERT; } ngx_log_error(level, cycle->log, err, "epoll_wait() failed"); return NGX_ERROR; } /* 若events返回為0,判斷是因?yàn)閑poll_wait超時還是其他原因 */ if (events == 0) { if (timer != NGX_TIMER_INFINITE) { return NGX_OK; } ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "epoll_wait() returned no events without timeout"); return NGX_ERROR; } /* 對epoll_wait返回的鏈表進(jìn)行遍歷 */ for (i = 0; i < events; i++) { c = event_list[i].data.ptr; /* 從data中獲取connection & instance的值,并解析出instance和connection */ instance = (uintptr_t) c & 1; c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1); /* 取出connection的read事件 */ rev = c->read; /* 判斷讀事件是否過期 */ if (c->fd == -1 || rev->instance != instance) { ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: stale event %p", c); continue; } /* 取出事件的類型 */ revents = event_list[i].events; ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: fd:%d ev:%04XD d:%p", c->fd, revents, event_list[i].data.ptr); /* 若連接發(fā)生錯誤,則將EPOLLIN、EPOLLOUT添加到revents中,在調(diào)用讀寫事件時能夠處理連接的錯誤 */ if (revents & (EPOLLERR|EPOLLHUP)) { ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll_wait() error on fd:%d ev:%04XD", c->fd, revents); revents |= EPOLLIN|EPOLLOUT; } /* 事件為讀事件且讀事件在epoll中 */ if ((revents & EPOLLIN) && rev->active) { #if (NGX_HAVE_EPOLLRDHUP) if (revents & EPOLLRDHUP) { rev->pending_eof = 1; } rev->available = 1; #endif rev->ready = 1; /* 事件是否需要延遲處理?對于搶到鎖監(jiān)聽端口的worker,會將事件延遲處理 */ if (flags & NGX_POST_EVENTS) { /* 根據(jù)事件的是否是accept事件,加到不同的隊列中 */ queue = rev->accept ? &ngx_posted_accept_events : &ngx_posted_events; ngx_post_event(rev, queue); } else { /* 若不需要延遲處理,直接調(diào)用read事件的handler */ rev->handler(rev); } } /* 取出connection的write事件 */ wev = c->write; /* 事件為寫事件且寫事件在epoll中 */ if ((revents & EPOLLOUT) && wev->active) { /* 判斷寫事件是否過期 */ if (c->fd == -1 || wev->instance != instance) { ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: stale event %p", c); continue; } wev->ready = 1; #if (NGX_THREADS) wev->complete = 1; #endif /* 事件是否需要延遲處理?對于搶到鎖監(jiān)聽端口的worker,會將事件延遲處理 */ if (flags & NGX_POST_EVENTS) { ngx_post_event(wev, &ngx_posted_events); } else { /* 若不需要延遲處理,直接調(diào)用write事件的handler */ wev->handler(wev); } } } return NGX_OK; }
該函數(shù)的流程圖如下:
(4)計算ngx_process_events函數(shù)的調(diào)用時間。
(5)處理ngx_posted_accept_events隊列的連接事件。這里就是遍歷ngx_posted_accept_events隊列,調(diào)用事件的handler方法,這里accept事件的handler為ngx_event_accept。
(6)釋放負(fù)載均衡鎖。
(7)處理定時器事件,具體操作是在定時器紅黑樹中查找過期的事件,調(diào)用其handler方法。
(8)處理ngx_posted_events隊列的讀寫事件,即遍歷ngx_posted_events隊列,調(diào)用事件的handler方法。
結(jié)束至此,我們介紹完了nginx事件模塊的事件處理函數(shù)ngx_process_events_and_timers。nginx事件模塊的相關(guān)知識也初步介紹完了。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/40152.html
摘要:限流算法最簡單粗暴的限流算法就是計數(shù)器法了,而比較常用的有漏桶算法和令牌桶算法計數(shù)器計數(shù)器法是限流算法里最簡單也是最容易實(shí)現(xiàn)的一種算法。 運(yùn)營研發(fā)團(tuán)隊 李樂 高并發(fā)系統(tǒng)有三把利器:緩存、降級和限流; 限流的目的是通過對并發(fā)訪問/請求進(jìn)行限速來保護(hù)系統(tǒng),一旦達(dá)到限制速率則可以拒絕服務(wù)(定向到錯誤頁)、排隊等待(秒殺)、降級(返回兜底數(shù)據(jù)或默認(rèn)數(shù)據(jù)); 高并發(fā)系統(tǒng)常見的限流有:限制總并發(fā)...
摘要:上圖中,每個紅圈表示一個請求,每一層的請求分別是上一層請求的子請求。換而言之,父請求是依賴于子請求的。特別地,的子請求運(yùn)行時,會阻塞父請求掛起其對應(yīng)的協(xié)程。 張超:又拍云系統(tǒng)開發(fā)高級工程師,負(fù)責(zé)又拍云 CDN 平臺相關(guān)組件的更新及維護(hù)。Github ID: tokers,活躍于 OpenResty 社區(qū)和 Nginx 郵件列表等開源社區(qū),專注于服務(wù)端技術(shù)的研究;曾為 ngx_lua 貢...
摘要:而對于堆內(nèi)存,通常需要程序員進(jìn)行管理。二內(nèi)存池管理說明本部分使用的版本為具體源碼參見文件實(shí)現(xiàn)使用流程內(nèi)存池的使用較為簡單可以分為步,調(diào)用函數(shù)獲取指針。將內(nèi)存塊按照的整數(shù)次冪進(jìn)行劃分最小為最大為。 運(yùn)營研發(fā)團(tuán)隊 施洪寶 一. 概述 應(yīng)用程序的內(nèi)存可以簡單分為堆內(nèi)存,棧內(nèi)存。對于棧內(nèi)存而言,在函數(shù)編譯時,編譯器會插入移動棧當(dāng)前指針位置的代碼,實(shí)現(xiàn)棧空間的自管理。而對于堆內(nèi)存,通常需要程序...
摘要:本文將從源碼從此深入分析配置文件的解析,配置存儲,與配置查找。在學(xué)習(xí)配置文件的解析過程之前,需要先了解一下模塊與指令的一些基本知識。 運(yùn)營研發(fā)團(tuán)隊 李樂 配置文件是nginx的基礎(chǔ),對于學(xué)習(xí)nginx源碼甚至開發(fā)nginx模塊的同學(xué)來說更是必須深究。本文將從源碼從此深入分析nginx配置文件的解析,配置存儲,與配置查找。 看本文之前讀者可以先思考兩個問題: 1.nginx源碼中隨處可以...
摘要:反向代理反向代理反向代理負(fù)載均衡鑒權(quán)限流等邏輯架構(gòu)在邏輯上分為入口層,模塊化的功能處理層,系統(tǒng)調(diào)用層。多個共同監(jiān)聽事件并處理,反向代理會把請求轉(zhuǎn)發(fā)給后端服務(wù)。 一.概述 本文將深入剖析nginx的架構(gòu)。 第一部分介紹nginx現(xiàn)有框架,用典型的4+1視圖闡述,包括邏輯架構(gòu),開發(fā)架構(gòu),運(yùn)行架構(gòu),物理架構(gòu),功能用例,nginx為單機(jī)服務(wù),不考慮物理架構(gòu)。其中功能用例概述nginx功能;邏輯...
閱讀 955·2023-04-25 23:50
閱讀 1954·2021-11-19 09:40
閱讀 598·2019-08-30 13:50
閱讀 2727·2019-08-29 17:11
閱讀 1041·2019-08-29 16:37
閱讀 2986·2019-08-29 12:54
閱讀 2792·2019-08-28 18:17
閱讀 2636·2019-08-26 16:55