国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

跟著大彬讀源碼 - Redis 4 - 服務器的事件驅動有什么含義?

姘存按 / 2785人閱讀

摘要:服務器會為執行不同任務的套接字關聯不同的事件處理器。當服務器套接字變得可寫時,套接字會產生事件。多路復用程序允許服務器同時監聽套接字的事件和事件。

眾所周知,Redis 服務器是一個事件驅動程序。那么事件驅動對于 Redis 而言有什么含義?源碼中又是如何實現事件驅動的呢?今天,我們一起來認識下 Redis 服務器的事件驅動。

對于 Redis 而言,服務器需要處理以下兩類事件:

文件事件(file event):Redis 服務器通過套接字與客戶端進行連接,而文件事件就是服務器對套接字操作的抽象。服務器與客戶端的通信會產生相應的文件事件,而服務器則通過監聽并處理這些事件來完成一系列的網絡通信操作。

時間時間(time event):Redis 服務器中的一些操作(比如 serverCron 函數)需要在給定的時間點執行,而時間事件就是服務器對這類定時操作的抽象。

接下來,我們先來認識下文件事件。

1 文件事件

Redis 基于 Reactor 模式開發了自己的網絡事件處理器,這個處理器被稱為文件事件處理器(file event handler)

文件事件處理器使用 IO 多路復用程序來同時監聽多個套接字,并根據套接字目前執行的任務來為套接字關聯不同的事件處理器。

當被監聽的套接字準備好執行連接應答(accept)、讀取(read)、寫入(write)、關閉(close)等操作時,與操作相對應的文件事件就會產生,這時文件事件處理器就會調用套接字之前關聯好的事件處理器來處理這些事件。

雖然文件處理器以單線程方式運行,但通過 IO 多路復用程序監聽多個套接字,既實現了高性能的網絡通信模型,又可以很好的與 Redis 服務器中其它同樣以單線程運行的模塊進行對接,保持了 Redis 內部單線程設計的簡潔。

1.1 文件事件處理器的構成

圖 1 展示了文件事件處理器的四個組成部分:

套接字;

IO 多路復用程序;

文件事件分派器(dispatcher);

事件處理器;

文件事件是對套接字的抽象。每當一個套接字準備好執行連接應答(accept)、寫入、讀取、關閉等操作時,就好產生一個文件事件。因為一個服務器通常會連接多個套接字,所以多個文件事件有可能會并發的出現。

而 IO 多了復用程序負責監聽多個套接字,并向文件事件分派器分發那些產生事件的套接字。

盡管多個文件事件可能會并發的出現,但 IO 多路復用程序總是會將所有產生事件的套接字都放到一個隊列里面,然后通過這個隊列,以有序、同步的方式,把每一個套接字傳輸給文件事件分派器。當上一個套接字產生的事件被處理完畢之后(即,該套接字為事件所關聯的事件處理器執行完畢),IO 多路復用程序才會繼續向文件事件分派器傳送下一個套接字。如圖 2 所示:

文件事件分派器接收 IO 多路復用程序傳來的套接字,并根據套接字產生的事件類型,調用相應的事件處理器。

服務器會為執行不同任務的套接字關聯不同的事件處理器。這些處理器本質上就是一個個函數。它們定義了某個事件發生時,服務器應該執行的動作。

1.2 IO 多路復用程序的實現

Redis 的 IO 多路復用程序的所有功能都是通過包裝常見的 select、epoll、evport 和 kqueue 這些 IO 多路復用函數庫來實現的。每個 IO 多路復用函數庫在 Redis 源碼中都對應一個多帶帶的文件,比如 ae_select.c、ae_poll.c、ae_kqueue.c 等。

由于 Redis 為每個 IO 多路復用函數庫都實現了相同的 API,所以 IO 多路復用程序的底層實現是可以互換的,如圖 3 所示:

Redis 在 IO 多路復用程序的實現源碼中用 #include 宏定義了相應的規則,**程序會在編譯時自動選擇系統中性能最高的 IO 多路復用函數庫來作為 Redis 的 IO 多路復用程序的底層實現,這保證了 Redis 在各個平臺的兼容性和高性能。對應源碼如下:

/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif
1.3 事件的類型

IO 多路復用程序可以監聽多個套接字的 ae.h/AE_READABLEae.h/AE_WRITABLE 事件,這兩類事件和套接字操作之間有以下對應關系:

當服務器套接字變得可讀時,套接字會產生 AE_READABLE 事件。此處的套接字可讀,是指客戶端對套接字執行 write、close 操作,或者有新的可應答(acceptable)套接字出現時(客戶端對服務器的監聽套接字執行 connect 操作),套接字會產生 AE_READABLE 事件。

當服務器套接字變得可寫時,套接字會產生 AE_WRITABLE 事件。

IO 多路復用程序允許服務器同時監聽套接字的 AR_READABLE 事件和 AE_WRITABLE 事件。如果一個套接字同時產生了兩個事件,那么文件分派器會優先處理 AE_READABLE 事件,然后再處理 AE_WRITABLE 事件。簡單來說,如果一個套接字既可讀又可寫,那么服務器將先讀套接字,后寫套接字。

1.4 文件事件處理器

Redis 為文件事件編寫了多個處理器,這些事件處理器分別用于實現不同的網絡通信需求。比如說:

為了對連接服務器的各個客戶端進行應答,服務器要為監聽套接字關聯連接應答處理器

為了接收客戶端傳了的命令請求,服務器要為客戶端套接字關聯命令請求處理器

為了向客戶端返回命令執行結果,服務器要為客戶端套接字關聯命令回復處理器

當主服務器和從服務器進行復制操作時,主從服務器都需要關聯復制處理器

在這些事件處理器中,服務器最常用的是與客戶端進行通信的連接應答處理器、命令請求處理器和命令回復處理器

1)連接應答處理器

networking.c/acceptTcpHandle 函數是 Redis 的連接應答處理器,這個處理器用于對連接服務器監聽套接字的客戶端進行應答,具體實現為 sys/socket.h/accept 函數的包裝。

當 Redis 服務器進行初始化的時候,程序會將這個連接應答處理器和服務器監聽套接字的 AE_READABLE 事件關聯。對應源碼如下

# server.c/initServer
...
/* Create an event handler for accepting new connections in TCP and Unix
 * domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
        acceptTcpHandler,NULL) == AE_ERR)
        {
            serverPanic(
                "Unrecoverable error creating server.ipfd file event.");
        }
}
...

當有客戶端用 sys/scoket.h/connect 函數連接服務器監聽套接字時,套接字就會產生 AE_READABLE 事件,引發連接應答處理器執行,并執行相應的套接字應答操作。如圖 4 所示:

2)命令請求處理器
networking.c/readQueryFromClient 函數是 Redis 的命令請求處理器,這個處理器負責從套接字中讀入客戶端發送的命令請求內容,具體實現為 unistd.h/read 函數的包裝。

當一個客戶端通過連接應答處理器成功連接到服務器之后,服務器會將客戶端套接字的 AE_READABLE 事件和命令請求處理器關聯起來(networking.c/acceptCommonHandler 函數)。

當客戶端向服務器發送命令請求的時候,套接字就會產生 AR_READABLE 事件,引發命令請求處理器執行,并執行相應的套接字讀入操作,如圖 5 所示:

在客戶端連接服務器的整個過程中,服務器都會一直為客戶端套接字的 AE_READABLE 事件關聯命令請求處理器。

3)命令回復處理器
networking.c/sendReplToClient 函數是 Redis 的命令回復處理器,這個處理器負責將服務器執行命令后得到的命令回復通過套接字返回給客戶端。

當服務器有命令回復需要發給客戶端時,服務器會將客戶端套接字的 AE_WRITABLE 事件和命令回復處理器關聯(networking.c/handleClientsWithPendingWrites 函數)。

當客戶端準備好接收服務器傳回的命令回復時,就會產生 AE_WRITABLE 事件,引發命令回復處理器執行,并執行相應的套接字寫入操作。如圖 6 所示:

當命令回復發送完畢之后,服務器就會解除命令回復處理器與客戶端套接字的 AE_WRITABLE 事件的關聯。對應源碼如下:

# networking.c/writeToClient
...
if (!clientHasPendingReplies(c)) {
    c->sentlen = 0;
    # buffer 緩沖區命令回復已發送,刪除套接字和事件的關聯
    if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);

    /* Close connection after entire reply has been sent. */
    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
        freeClient(c);
        return C_ERR;
    }
}
...
1.5 客戶端與服務器連接事件

之前我們通過 debug 的形式大致認識了客戶端與服務器的連接過程。現在,我們站在文件事件的角度,再一次來追蹤 Redis 客戶端與服務器進行連接并發送命令的整個過程,看看在過程中會產生什么事件,這些事件又是如何被處理的。

先來看客戶端與服務器建立連接的過程:

先啟動我們的 Redis 服務器(127.0.0.1-8379)。成功啟動后,服務器套接字(127.0.0.1-8379) AE_READABLE 事件正處于被監聽狀態,而該事件對應連接應答處理器。(server.c/initServer())。

使用 redis-cli 連接服務器。這是,服務器套接字(127.0.0.1-8379)將產生 AR_READABLE 事件,觸發連接應答處理器執行(networking.c/acceptTcpHandler())。

對客戶端的連接請求進行應答,創建客戶端套接字,保存客戶端狀態信息,并將客戶端套接字的 AE_READABLE 事件與命令請求處理器(networking.c/acceptCommonHandler())進行關聯,使得服務器可以接收該客戶端發來的命令請求。

此時,客戶端已成功與服務器建立連接了。上述過程,我們仍然可以用 gdb 調試,查看函數的執行過程。具體調試過程如下:

gdb ./src/redis-server
(gdb) b acceptCommonHandler    # 給 acceptCommonHandler 函數設置斷點
(gdb) r redis-conf --port 8379 # 啟動服務器

另外開一個窗口,使用 redis-cli 連接服務器:redis-cli -p 8379

回到服務器窗口,我們會看到已進入 gdb 調試模式,輸入:info stack,可以看到如圖 6 所示的堆棧信息。

現在,我們再來認識命令的執行過程:

客戶端向服務器發送一個命令請求,客戶端套接字產生 AE_READABLE 事件,引發命令請求處理器(readQueryFromClient)執行,讀取客戶端的命令內容;

根據客戶端發送命令內容,格式化客戶端 argc、argv 等相關值屬性值;

根據命令名稱查找對應函數。server.c/processCommad()lookupCommand 函數調用;

執行與命令名關聯的函數,獲得返回結果,客戶端套接字產生 。server.c/processCommad()call 函數調用。

返回命令回復,刪除客戶端套接字與 AE_WRITABLE 事件的關聯。network.c/writeToClient() 函數。

圖 7 展示了命令執行過程的堆棧信息。圖 8 則展示了命令回復過程的堆棧信息。

上一節我們一起認識了文件事件。接下來,讓我們再來認識下時間事件。

2 時間事件

Redis 的時間時間分為以下兩類:

定時時間:讓一段程序在指定的時間之后執行一次。比如,讓程序 M 在當前時間的 60 毫秒后執行一次。

周期性事件:讓一段程序每隔指定時間就執行一次。比如,讓程序 N 每隔 30 毫秒執行一次。

對于時間事件,數據結構源碼(ae.h/aeTimeEvent):

/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *next;
} aeTimeEvent;

主要屬性說明:

id:服務器為時間事件創建的全局唯一 ID。ID 號按從小到大的順序遞增。

when_sec:秒精度的 UNIX 時間戳,記錄了時間事件的到達時間。

when_ms:毫秒精度的 UNIX 時間戳,記錄了時間事件的到達時間。

timeProc:時間事件處理器,對應一個函數。當時間事件發生時,服務器就會調用相應的處理器來處理事件。

時間事件進程執行的函數為 ae.c/processTimeEvents()

此外,對于時間事件的類型區分,取決于時間事件處理器的返回值:

返回值是 ae.h/AE_NOMORE,為定時事件。該事件在到達一次后就會被刪除;

返回值不是 ae.h/AE_NOMORE,為周期事件。當一個周期時間事件到達后,服務器會根據事件處理器返回的值,對時間事件的 when_sec 和 when_ms 屬性進行更新,讓這個事件在一段時間之后再次到達,并以這種方式一致更新運行。比如,如果一個時間事件處理器返回 30,那么服務器應該對這個時間事件進行更新,讓這個事件在 30 毫秒后再次執行。

2.1 時間事件之 serverCron 函數

持續運行的 Redis 服務器需要定期對自身的資源和狀態進行檢查和調整,從而確保服務可以長期、穩定的運行。這些定期操作由 server.c/serverCron() 函數負責執行。主要操作包括:

更新服務器的各類統計信息。比如時間、內存占用、數據庫占用情況等。

清理數據庫中的過期鍵值對。

關閉和清理連接失效的客戶端。

嘗試進行 AOF 或 RDB 持久化操作。

如果服務器是主服務器,對從 服務器進行定期同步。

如果處于集群模式,對集群進行定期同步和連接測試。

Redis 服務器以周期性事件的方式來運行 serverCron 函數,在服務器運行期間,每隔一段時間,serverCron 就會執行一次,直到服務器關閉為止。

關于執行次數,可參見 redis.conf 文件中的 hz 選項。默認為 10,表示每秒運行 10 次。

3 事件調度與執行

由于服務器同時存在文件事件和時間事件,所以服務器必須對這兩種事件進行調度,來決定何時處理文件事件,何時處理時間事件,以及花多少時間來處理它們等等。

事件的調度和執行有 ae.c/aeProcessEvents() 函數負責。源碼如下:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;
    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* 首先判斷是否存在需要監聽的文件事件,如果存在需要監聽的文件事件,那么通過IO多路復用程序獲取
     * 準備就緒的文件事件,至于IO多路復用程序是否等待以及等待多久的時間,依發生時間距離現在最近的時間事件確定;
     * 如果eventLoop->maxfd == -1表示沒有需要監聽的文件事件,但是時間事件肯定是存在的(serverCron()),
     * 如果此時沒有設置 AE_DONT_WAIT 標志位,此時調用IO多路復用,其目的不是為了監聽文件事件是否準備就緒,
     * 而是為了使線程休眠到發生時間距離現在最近的時間事件的發生時間(作用類似于unix中的sleep函數),
     * 這種休眠操作的目的是為了避免線程一直不停的遍歷時間事件形成的無序鏈表,造成不必要的資源浪費 */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        /* 尋找發生時間距離現在最近的時間事件,該時間事件的發生時間與當前時間之差就是IO多路復用程序應該等待的時間 */
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            // 創建 timeval 結構
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            /* 如果時間之差大于0,說明時間事件到時時間未到,則等待對應的時間;
             * 如果時間間隔小于0,說明時間事件已經到時,此時如果沒有
             * 文件事件準備就緒,那么IO多路復用程序應該立即返回,以免
             * 耽誤處理時間事件*/
            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        // 阻塞并等等文件事件產生,最大阻塞事件由 timeval 結構決定
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            // 處理所有已產生的文件事件
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */
            int invert = fe->mask & AE_BARRIER;

            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        // 處理所有已到達的時間事件
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

aeProcessEvents 函數置于一個循環里面,加上初始化和清理函數,就構成了 Redis 服務器的主函數 server.c/main()。以下是主函數的偽代碼:

def main():
    // 初始化服務器
    init_server();
    // 一直處理事件,直到服務器關閉為止
    while server_is_not_shutdown():
        aeProcessEvents();
    // 服務器關閉,執行清理操作
    clear_server()

從事件處理的角度來看,Redis 服務器的運行流程可以用流程圖 1 來概括:

以下是事件的調度和執行規則:

aeApiPoll 函數的最大阻塞事件由到達時間最接近當前時間的時間事件決定。這個方法既可以避免服務器對時間事件進行頻繁的輪詢,也可以確保 aeApiPoll 函數不會阻塞過長時間。

因為文件事件是隨機出現的,如果等待并處理完一次文件事件之后,仍未有任何時間事件到達,那么服務器將再次等待并處理文件事件。隨著文件事件的不斷執行,時間會逐漸向時間事件所設置的到達時間逼近,并最終來到,這時服務器就可以開始處理到達的時間事件了。

對文件事件和時間事件的處理都是同步、有序、原子地執行。服務器不會中途中斷事件處理,也不會對事件進行搶占。因此,不管是文件事件的處理器,還是時間事件的處理器,它們斗毆盡可能的減少程序的阻塞事件,并在有需要時主動讓出執行權,從而降低事件饑餓的可能性。舉個栗子,在命令回復處理器將一個命令回復寫入到客戶端套接字時,如果寫入字節數超過了一個預設常量,命令回復處理器就會主動用 break 跳出寫入循環,將余下的數據留到下次再寫。另外,時間事件也會將非常耗時的持久化操作放到子線程或者子進程中執行。

因為時間事件在文件事件之后執行,并且事件之間不會出現搶占,所以時間事件的實際處理時間,通常會比時間事件設定的時間稍晚一些。

總結

Redis 服務器是一個事件驅動程序,服務器處理的事件分為時間事件文件事件兩類。

文件事件是對套接字操作的抽象。**每次套接字變得可應答(acceptable)、可寫(writable)或者可讀(readable)時,相應的文件事件就會產生。

文件事件分為 AE_READABLE 事件(讀事件)和 AE_WRITABLE 事件(寫事件)兩類。

時間事件分為定時事件和周期事件。定時事件只在指定時間執行一次,而周期事件則每隔指定時間執行一次。

服務器一般情況下只執行 serverCron 函數這一個周期性時間事件。

時間事件和文件事件之間是合作關系。服務器會輪流處理這兩種事件,并且處理事件的過程中不會進行搶占。

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/62141.html

相關文章

  • 跟著彬讀源碼 - Redis 1 - 啟動服務,程序都干了什么

    摘要:此時服務器處于休眠狀態,并使用進行事件輪詢,等待監聽事件的發生。繼續執行被調試程序,直至下一個斷點或程序結束縮寫。服務啟動包括初始化基礎配置數據結構對外提供服務的準備工作還原數據庫執行事件循環等。 一直很羨慕那些能讀 Redis 源碼的童鞋,也一直想自己解讀一遍,但迫于 C 大魔王的壓力,解讀日期遙遙無期。 相信很多小伙伴應該也都對或曾對源碼感興趣,但一來覺得自己不會 C 語言,二來也...

    sewerganger 評論0 收藏0
  • 跟著彬讀源碼 - Redis 5 - 對象和數據類型(上)

    摘要:對象源碼結構如下對象類型對象編碼引用統計指向底層實現數據結構的指針字段對象類型,就是我們常說的。。對象編碼對應跳躍表壓縮列表集合動態字符串等八種底層數據結構。 相信很多人應該都知道 Redis 有五種數據類型:字符串、列表、哈希、集合和有序集合。但這五種數據類型是什么含義?Redis 的數據又是怎樣存儲的?今天我們一起來認識下 Redis 這五種數據結構的含義及其底層實現。 首先要明確...

    antz 評論0 收藏0

發表評論

0條評論

姘存按

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<