摘要:此時的協程實現無法完美的支持語法,其根本原因在于沒有保存棧信息。這是因為調用函數時,底層指令已經將入棧了。協程創建時,底層通過函數實現了棧的創建創建并初始化棧為結構分配空間創建新的執行數據結構從代碼中可以看到結構是直接存儲在棧的底部。
作者:李樂
??本文基于Swoole-4.3.2和PHP-7.1.0版本
??Swoole4為PHP語言提供了強大的CSP協程編程模式,用戶可以通過go函數創建一個協程,以達到并發執行的效果,如下面代碼所示:
??其實在Swoole4之前就實現了多協程編程模式,在協程創建、切換以及結束的時候,相應的操作php棧即可(創建、切換以及回收php棧)。
??此時的協程實現無法完美的支持php語法,其根本原因在于沒有保存c棧信息。(vm內部或者某些擴展提供的API是通過c函數實現的,調用這些函數時如果發生協程切換,c棧該如何處理?)
??Swoole4新增了c棧的管理,在協程創建、切換以及結束的同時會伴隨著c棧的創建、切換以及回收。
??Swoole4協程實現方案如下圖所示:
??其中:
API層是提供給用戶使用的協程相關函數,比如go()函數用于創建協程;Co::yield()使得當前協程讓出CPU;Co::resume()可恢復某個協程執行;
Swoole4協程需要同時管理c棧與php棧,Coroutine用于管理c棧,PHPCoroutine用于管理php棧;其中Coroutine(),yield(),resume()實現了c棧的創建以及換入換出;create_func(),on_yield(),on_resume()實現了php棧的創建以及換入換出;
Swoole4在管理c棧時,用到了 boost.context庫,make_fcontext()和jump_fcontext()函數均使用匯編語言編寫,實現了c棧上下文的創建以及切換;
Swoole4對boost.context進行了簡單封裝,即Context層,Context(),SwapIn()以及SwapOut()
對應c棧的創建以及換入換出。
深入理解C棧??函數是對代碼的封裝,對外暴露的只是一組指定的參數和一個可選的返回值;假設函數P調用函數Q,Q執行后返回函數P,實現該函數調用需要考慮以下三點:
指令跳轉:進入函數Q的時候,程序計數器必須被設置為Q的代碼的起始地址;在返回時,程序計數器需要設置為P中調用Q后面那條指令的地址;
數據傳遞:P能夠向Q提供一個或多個參數,Q能夠向P返回一個值;
內存分配與釋放:Q開始執行時,可能需要為局部變量分配內存空間,而在返回前,又需要釋放這些內存空間;
??大多數語言的函數調用都采用了棧結構實現,函數的調用與返回即對應的是一系列的入棧與出棧操作,我們通常稱之為函數棧幀(stack frame)。示意圖如下:
??上面提到的程序計數器即寄存器%rip,另外還有兩個寄存器需要重點關注:%rbp指向棧幀底部,%rsp指向棧幀頂部。
??下面將通過具體的代碼事例,為讀者講解函數棧幀。c代碼與匯編代碼如下:
int add(int x, int y) { int a, b; a = 10; b = 5; return x+y; } int main() { int sum = add(1,2); }main: pushq %rbp movq %rsp, %rbp subq $16, %rsp movl $2, %esi movl $1, %edi call add movl %eax, -4(%rbp) leave ret add: pushq %rbp movq %rsp, %rbp movl %edi, -20(%rbp) movl %esi, -24(%rbp) movl $10, -4(%rbp) movl $5, -8(%rbp) movl -24(%rbp), %eax movl -20(%rbp), %edx addl %edx, %eax popq %rbp ret??分析匯編代碼:
main函數與add函數入口,首先將寄存器%rbp壓入棧中用于保存其值,其次移動%rbp指向當前棧頂部(此時%rbp,%rsp都指向棧頂,開始新的函數棧幀);
main函數"subq $16, %rsp",是為main函數棧幀預留16個字節的內存空間;
調用add函數時,第一個參數和第二個參數分別保存在寄存器%edi和%esi,返回值保存在寄存器%eax;
call指令用于函數調用,實現了兩個功能:寄存器%rip壓入棧中,跳轉到新的代碼位置;
ret指令用于函數返回,彈出棧頂內容到寄存器%rip,依次實現代碼跳轉;
leave指令等同于兩條指令:movq %rsp,%rbp和popq %rbp,用于釋放main函數棧幀,恢復前一個函數棧幀;
注意add函數棧幀,并沒有為其分配空間,寄存器%rsp和%rbp都指向棧幀底部;根本因為是add函數沒有調用其他函數。
該程序的棧結構示意圖如下:
??問題:觀察上面的匯編代碼,輸入參數分別使用的是寄存器%edi和%esi,返回值使用的是寄存器%eax,輸入輸出參數不應該保存在棧上嗎?寄存器比內存訪問要快的多,現代處理器寄存器數目也比較多,因此傾向于將參數優先保存在寄存器。比如%rdi, %rsi, %rdx, %rcx, %r8d, %r9d 六個寄存器用于存儲函數調用時的前6個參數,那么當輸入參數數目超過6個時,如何處理?這些輸入參數只能存儲在棧上了。
(%rdi等表示64位寄存器,%edi等表示32位寄存器)//add函數需要9個參數 add(1,2,3,4,5,6,7,8,9); //參數7,8,9存儲在棧上 movl $9, 16(%rsp) movl $8, 8(%rsp) movl $7, (%rsp) movl $6, %r9d movl $5, %r8d movl $4, %ecx movl $3, %edx movl $2, %esi movl $1, %ediSwoole C棧管理??通過學習c棧基本知識,我們知道最主要有三個寄存器:%rip程序計數器指向下一條需要執行的指令,%rbp指向函數棧幀底部,%rsp指向函數棧幀頂部。這三個寄存器可以確定一個c棧執行上下文,c棧的管理其實就是這些寄存器的管理。
??第一節我們提到Swoole在管理c棧時,用到了 boost.context庫,其中make_fcontext()和jump_fcontext()函數均使用匯編語言編寫,實現了c棧執行上下文的創建以及切換;函聲明命如下:
fcontext_t make_fcontext(void *sp, size_t size, void (*fn)(intptr_t)); intptr_t jump_fcontext(fcontext_t *ofc, fcontext_t nfc, intptr_t vp, bool preserve_fpu = false);??make_fcontext函數用于創建一個執行上下文,其中參數sp指向內存最高地址處(在堆中分配一塊內存作為該執行上下文的c棧),參數size為棧大小,參數fn是一個函數指針,指向該執行上下文的入口函數;代碼主要邏輯如下:
/*%rdi表示第一個參數sp,指向棧頂*/ movq %rdi, %rax //保證%rax指向的地址按照16字節對齊 andq $-16, %rax //將%rax向低地址處偏移0x48字節 leaq -0x48(%rax), %rax /* %rdx表示第三個參數fn,保存在%rax偏移0x38位置處 */ movq %rdx, 0x38(%rax) stmxcsr (%rax) fnstcw 0x4(%rax) leaq finish(%rip), %rcx movq %rcx, 0x40(%rax) //返回值保存在%rax寄存器 ret??make_fcontext函數創建的執行上下文示意圖如下(可以看到預留了若干字節用于保存上下文信息):
??Swoole協程實現的Context層封裝了上下文的創建,創建上下文函數實現如下:
Context::Context(size_t stack_size, coroutine_func_t fn, void* private_data) : fn_(fn), stack_size_(stack_size), private_data_(private_data) { stack_ = (char*) sw_malloc(stack_size_); void* sp = (void*) ((char*) stack_ + stack_size_); ctx_ = make_fcontext(sp, stack_size_, (void (*)(intptr_t))&context_func); }??可以看到c棧執行上下文是通過sw_malloc函數在堆上分配的一塊內存,默認大小為2M字節;參數sp指向的是內存最高地址處;執行上下文的入口函數為Context::context_func()。
??jump_fcontext函數用于切換c棧上下文:1)函數會將當前上下文(寄存器)保存在當前棧頂(push),同時將%rsp寄存器內容保存在ofc地址;2)函數從nfc地址處恢復%rsp寄存器內容,同時從棧頂恢復上下文信息(pop)。代碼主要邏輯如下:
//-------------------保存當前c棧上下文------------------- pushq %rbp /* save RBP */ pushq %rbx /* save RBX */ pushq %r15 /* save R15 */ pushq %r14 /* save R14 */ pushq %r13 /* save R13 */ pushq %r12 /* save R12 */ leaq -0x8(%rsp), %rsp stmxcsr (%rsp) fnstcw 0x4(%rsp) //%rdi表示第一個參數,即ofc,保存%rsp到ofc地址處 movq %rsp, (%rdi) //-------------------從nfc中恢復上下文------------------- //%rsi表示第二個參數,即nfc,從nfc地址處恢復%rsp movq %rsi, %rsp ldmxcsr (%rsp) fldcw 0x4(%rsp) leaq 0x8(%rsp), %rsp popq %r12 /* restrore R12 */ popq %r13 /* restrore R13 */ popq %r14 /* restrore R14 */ popq %r15 /* restrore R15 */ popq %rbx /* restrore RBX */ popq %rbp /* restrore RBP */ //這里彈出的其實是之前保存的%rip popq %r8 //%rdx表示第三個參數,%rax用于存儲函數返回值; movq %rdx, %rax //%rdi用于存儲第一個參數 movq %rdx, %rdi //跳轉到%r8指向的地址 jmp *%r8??觀察jump_fcontext函數的匯編代碼,可以看到保存上下文與恢復上下文的代碼基本是對稱的。恢復上下文時"popq %r8"用于彈出上一次保存的程序計數器%rip的內容,然而并沒有看到保存寄存器%rip的代碼。這是因為調用jump_fcontext函數時,底層call指令已經將%rip入棧了。
??Swoole協程實現的Context層封裝了上下文的換入換出,可以在上下文swap_ctx_和ctx_之間隨時換入換出,代碼實現如下:
bool Context::SwapIn() { jump_fcontext(&swap_ctx_, ctx_, (intptr_t) this, true); return true; } bool Context::SwapOut() { jump_fcontext(&ctx_, swap_ctx_, (intptr_t) this, true); return true; }??上下文示意圖如下所示:
Swoole PHP棧管理??php代碼在執行時,同樣存在函數棧幀的分配與回收。php將此抽象為兩個結構,php棧zend_vm_stack,與執行數據(函數棧幀)zend_execute_data。
??php棧結構與c棧結構基本類似,定義如下:
struct _zend_vm_stack { zval *top; zval *end; zend_vm_stack prev; };??其中top字段指向棧頂位置,end字段指向棧底位置;prev指向上一個棧,形成鏈表,當棧空間不夠時,可以進行擴容。php虛擬機申請棧空間時默認大小為256K,Swoole創建棧空間時默認大小為8K。
??執行數據結構體,我們需要重點關注這幾個字段:當前函數編譯后的指令集(opline指向指令集數組中的某一個元素,虛擬機只需要遍歷該數組并執行所有指令即可),函數返回值,以及調用該函數的執行數據;結構定義如下:
struct _zend_execute_data { //當前執行指令 const zend_op *opline; zend_execute_data *call; //函數返回值 zval *return_value; zend_function *func; zval This; /* this + call_info + num_args */ //調用當前函數的棧幀 zend_execute_data *prev_execute_data; //符號表 zend_array *symbol_table; #if ZEND_EX_USE_RUN_TIME_CACHE void **run_time_cache; #endif #if ZEND_EX_USE_LITERALS //常量數組 zval *literals; #endif };??php棧初始化函數為zend_vm_stack_init;當執行用戶函數調用時,虛擬機通過函數zend_vm_stack_push_call_frame在php棧上分配新的執行數據,并執行該函數代碼;函數執行完成后,釋放該執行數據。代碼邏輯如下:
ZEND_API void zend_execute(zend_op_array *op_array, zval *return_value) { //分配新的執行數據 execute_data = zend_vm_stack_push_call_frame(ZEND_CALL_TOP_CODE | ZEND_CALL_HAS_SYMBOL_TABLE, (zend_function*)op_array, 0, zend_get_called_scope(EG(current_execute_data)), zend_get_this_object(EG(current_execute_data))); //設置prev execute_data->prev_execute_data = EG(current_execute_data); //初始化當前執行數據,op_array即為當前函數編譯得到的指令集 i_init_execute_data(execute_data, op_array, return_value); //執行函數代碼 zend_execute_ex(execute_data); //釋放執行數據 zend_vm_stack_free_call_frame(execute_data); }??php棧幀結構示意圖如下:
??Swoole協程實現,需要自己管理php棧,在發生協程創建以及切換時,對應的創建新的php棧,切換php棧,同時保存和恢復php棧上下文信息。這里涉及到一個很重要的結構體php_coro_task:
struct php_coro_task { zval *vm_stack_top; zval *vm_stack_end; zend_vm_stack vm_stack; zend_execute_data *execute_data; };??這里列出了php_coro_task結構體的若干關鍵字段,這些字段用于保存和恢復php上下文信息。
??協程創建時,底層通過函數PHPCoroutine::create_func實現了php棧的創建:
void PHPCoroutine::create_func(void *arg) { //創建并初始化php棧 vm_stack_init(); call = (zend_execute_data *) (EG(vm_stack_top)); //為結構php_coro_task分配空間 task = (php_coro_task *) EG(vm_stack_top); EG(vm_stack_top) = (zval *) ((char *) call + PHP_CORO_TASK_SLOT * sizeof(zval)); //創建新的執行數據結構 call = zend_vm_stack_push_call_frame( ZEND_CALL_TOP_FUNCTION | ZEND_CALL_ALLOCATED, func, argc, fci_cache.called_scope, fci_cache.object ); }??從代碼中可以看到結構php_coro_task是直接存儲在php棧的底部。
??當通過yield函數讓出CPU時,底層會調用函數PHPCoroutine::on_yield切換php棧:
void PHPCoroutine::on_yield(void *arg) { php_coro_task *task = (php_coro_task *) arg; php_coro_task *origin_task = get_origin_task(task); //保存當前php棧上下文信息到php_coro_task結構 save_task(task); //從php_coro_task結構中恢復php棧上下文信息 restore_task(origin_task); }Swoole協程實現??前面我們簡單介紹了Swoole協程的實現方案,以及Swoole對c棧與php棧的管理,接下來將結合前面的知識,系統性的介紹Swoole協程的實現原理。
swoole協程數據模型??話不多說,先看一張圖:
每個協程都需要管理自己的c棧與php棧;
Context封裝了c棧的管理操作;ctx_字段保存的是寄存器%rsp的內容(指向c棧棧頂位置);swap_ctx_字段保存的是將被換出的協程寄存器%rsp內容(即,將被換出的協程的c棧棧頂位置);SwapIn()對應協程換入操作;SwapOut()對應協程換出操作;
參考jump_fcontext實現,協程在換出時,會將寄存器%rip,%rbp等暫存在c棧棧頂;協程在換入時,相應的會從棧頂恢復這些寄存器的內容;
Coroutine管理著協程所有內容;cid字段表示當前協程的ID;task字段指向當前協程的php_coro_task結構,該結構中保存的是當前協程的php棧信息(vm_stack_top,execute_data等);ctx字段指向的是當前協程的Context對象;origin字段指向的是另一個協程Coroutine對象;yield()和resume()對應的是協程的換出換入操作;
注意到php_coro_task結構的co字段指向其對應的協程對象Coroutine;
Coroutine還有一些靜態屬性,靜態屬性的屬于類屬性,所有協程共享的;last_cid字段存儲的是當前最大的協程ID,創建協程時可用于生成協程ID;current字段指向的是當前正在運行的協程Coroutine對象;on_yield和on_resume是兩個函數指針,用于實現php棧的切換操作,實際指向的是方法PHPCoroutine::on_yield和PHPCoroutine::on_resume;
swoole協程實現 協程創建??Swoole創建協程可以使用go()函數,底層實現對應的是PHP_FUNCTION(swoole_coroutine_create),其函數實現如下:
PHP_FUNCTION(swoole_coroutine_create) { …… long cid = PHPCoroutine::create(&fci_cache, fci.param_count, fci.params); } long PHPCoroutine::create(zend_fcall_info_cache *fci_cache, uint32_t argc, zval *argv) { …… save_task(get_task()); return Coroutine::create(create_func, (void*) &php_coro_args); } class Coroutine { public: static inline long create(coroutine_func_t fn, void* args = nullptr) { return (new Coroutine(fn, args))->run(); } }注意Coroutine::create函數第一個參數偉create_func,該函數后續用于創建php棧,并開始協程代碼的執行;
可以看到PHPCoroutine::create在調用Coroutine::create創建創建協程之前,保存了當前php棧信息到php_coro_task結構中。
注意主程序的php棧是虛擬機創建的,結構與上面畫的協程php棧不同,主程序的php_coro_task結構并沒有存儲在php棧上,而是一個靜態變量PHPCoroutine::main_task,從get_task方法可以看出,主程序中get_current_task()返回的是null,因此最后獲得的php_coro_task結構是PHPCoroutine::main_task。
class PHPCoroutine { public: static inline php_coro_task* get_task() { php_coro_task *task = (php_coro_task *) Coroutine::get_current_task(); return task ? task : &main_task; } }在Coroutine的構造函數中完成了協程對象Coroutine的創建與初始化,以及Context對象的創建與初始化(創建了c棧);run()函數執行了協程的換入,從而開始協程的運行;
//全局協程map std::unordered_mapCoroutine::coroutines; class Coroutine { protected: Coroutine(coroutine_func_t fn, void *private_data) : ctx(stack_size, fn, private_data) { cid = ++last_cid; coroutines[cid] = this; } inline long run() { long cid = this->cid; origin = current; current = this; ctx.SwapIn(); if (ctx.end) { close(); } return cid; } } 可以看到創建協程對象Coroutine時,通過last_cid來計算當前協程的ID,同時將該協程對象加入到全局map中;代碼ctx(stack_size, fn, private_data)創建并初始化了Context對象;
run()函數將該協程換入執行時,賦值origin為當前協程(主程序中current為null),同時設置current為當前協程對象Coroutine;調用SwapIn()函數完成協程的換入執行;最后如果協程執行完畢,則關閉并釋放該協程對象Coroutine;
初始化Context對象時,可以看到其構造函數Context::Context(size_t stack_size, coroutine_func_t fn, void* private_data),其中參數fn為協程入口函數(PHPCoroutine::create_func),可以看到其賦值給ontext對象的字段fn_,但是在創建c棧上下文時,其傳入的入口函數為context_func;
Context::Context(size_t stack_size, coroutine_func_t fn, void* private_data) : fn_(fn), stack_size_(stack_size), private_data_(private_data) { …… ctx_ = make_fcontext(sp, stack_size_, (void (*)(intptr_t))&context_func); }函數context_func內部其實調用的就是方法PHPCoroutine::create_func;當協程執行結束時,會標記end字段為true,同時將該協程換出;
void Context::context_func(void *arg) { Context *_this = (Context *) arg; _this->fn_(_this->private_data_); _this->end = true; _this->SwapOut(); }??問題:參數arg為什么是Context對象呢,是如何傳遞的呢?這就涉及到jump_fcontext匯編實現,以及jump_fcontext的調用了
jump_fcontext(&swap_ctx_, ctx_, (intptr_t) this, true); jump_fcontext: movq %rdx, %rdi??調用jump_fcontext函數時,第三個參數傳遞的是this,即當前Context對象;而函數jump_fcontext匯編實現時,將第三個參數的內容拷貝到%rdi寄存器中,當協程換入執行函數context_func時,寄存器%rdi存儲的就是第一個參數,即Context對象。
方法PHPCoroutine::create_func就是創建并初始化php棧,執行協程代碼;這里不做過多介紹。
??問題:Coroutine的靜態屬性on_yield和on_resume時什么時候賦值的?
??在Swoole模塊初始化時,會調用函數swoole_coroutine_util_init(該函數同時聲明了"Co"等短名稱),該函數進一步的調用PHPCoroutine::init()方法,該方法完成了靜態屬性的賦值操作。
void PHPCoroutine::init() { Coroutine::set_on_yield(on_yield); Coroutine::set_on_resume(on_resume); Coroutine::set_on_close(on_close); }協程切換??用戶可以通過Co::yield()和Co::resume()實現協程的讓出和恢復,
Co::yield()的底層實現函數為PHP_METHOD(swoole_coroutine_util, yield),Co::resume()的底層實現函數為PHP_METHOD(swoole_coroutine_util, resume)。本節將為讀者講述協程切換的實現原理。static unordered_mapuser_yield_coros; static PHP_METHOD(swoole_coroutine_util, yield) { Coroutine* co = Coroutine::get_current_safe(); user_yield_coros[co->get_cid()] = co; co->yield(); RETURN_TRUE; } static PHP_METHOD(swoole_coroutine_util, resume) { …… auto coroutine_iterator = user_yield_coros.find(cid); if (coroutine_iterator == user_yield_coros.end()) { swoole_php_fatal_error(E_WARNING, "you can not resume the coroutine which is in IO operation"); RETURN_FALSE; } user_yield_coros.erase(cid); co->resume(); } 調用Co::resume()恢復某個協程之前,該協程必然已經調用Co::yield()讓出CPU;因此在Co::yield()時,會將該協程對象添加到全局map中;Co::resume()時做相應校驗,如果校驗通過則恢復協程,并從map種刪除該協程對象;
co->yield()實現了協程的讓出操作;1)設置協程狀態為SW_CORO_WAITING;2)回調on_yield方法,即PHPCoroutine::on_yield,保存當前協程(task代表協程)的php棧上下文,恢復另一個協程的php棧上下文(origin代表另一個協程對象);3)設置當前協程對象為origin;4)換出該協程;
void Coroutine::yield() { state = SW_CORO_WAITING; if (on_yield) { on_yield(task); } current = origin; ctx.SwapOut(); }co->resume()實現了協程的恢復操作:1)設置協程狀態為SW_CORO_RUNNING;2)回調on_resume方法,即PHPCoroutine::on_resume,保存當前協程(current協程)的php棧上下文,恢復另一個協程(task代表協程)的php棧上下文;3)設置origin為當前協程對象,current為即將要換入的協程對象;4)換入協程;
void Coroutine::resume() { state = SW_CORO_RUNNING; if (on_resume) { on_resume(task); } origin = current; current = this; ctx.SwapIn(); if (ctx.end) { close(); } }Swoole協程有四種狀態:初始化,運行中,等待運行,運行結束;定義如下:
typedef enum { SW_CORO_INIT = 0, SW_CORO_WAITING, SW_CORO_RUNNING, SW_CORO_END, } sw_coro_state;協程之間可以通過Coroutine對象的origin字段形成一個類似鏈表的結構;Co::yield()換出當前協程時,會換入origin協程;在A協程種調用Co::resume()恢復B協程時,會換出A協程,換入B協程,同時標記A協程為B的origin協程;
??協程切換過程比較簡單,這里不做過多詳述。
協程調度??當我們調用Co::sleep()讓協程休眠時,會換出當前協程;或者調用CoroutineSocket->recv()從socket接收數據,但socket數據還沒有準備好時,會阻塞當前協程,從而使得協程換出。那么問題來了,什么時候再換入執行這個協程呢?
socket讀寫實現
??Swoole的socket讀寫使用的成熟的IO多路復用模型:epoll/kqueue/select/poll等,并且將其封裝在結構體_swReactor中,其定義如下:
struct _swReactor { //超時時間 int32_t timeout_msec; //fd的讀寫事件處理函數 swReactor_handle handle[SW_MAX_FDTYPE]; swReactor_handle write_handle[SW_MAX_FDTYPE]; swReactor_handle error_handle[SW_MAX_FDTYPE]; //fd事件的注冊修改刪除以及wait //函數指針,(以epoll為例)指向的是epoll_ctl、epoll_wait int (*add)(swReactor *, int fd, int fdtype); int (*set)(swReactor *, int fd, int fdtype); int (*del)(swReactor *, int fd); int (*wait)(swReactor *, struct timeval *); void (*free)(swReactor *); //超時回調函數,結束、開始回調函數 void (*onTimeout)(swReactor *); void (*onFinish)(swReactor *); void (*onBegin)(swReactor *); }??在調用函數PHPCoroutine::create創建協程時,會校驗是否已經初始化_swReactor對象,如果沒有則會調用php_swoole_reactor_init函數創建并初始化main_reactor對象;
void php_swoole_reactor_init() { if (SwooleG.main_reactor == NULL) { SwooleG.main_reactor = (swReactor *) sw_malloc(sizeof(swReactor)); if (swReactor_create(SwooleG.main_reactor, SW_REACTOR_MAXEVENTS) < 0) { } …… php_swoole_register_shutdown_function_prepend("swoole_event_wait"); } }??我們以epoll為例,main_reactor各回調函數如下:
reactor->onFinish = swReactor_onFinish; reactor->onTimeout = swReactor_onTimeout; reactor->add = swReactorEpoll_add; reactor->set = swReactorEpoll_set; reactor->del = swReactorEpoll_del; reactor->wait = swReactorEpoll_wait; reactor->free = swReactorEpoll_free;??注意:這里注冊了一個函數swoole_event_wait,在生命周期register_shutdown階段會執行該函數,開始Swoole的事件循環,阻擋了php生命周期的結束。
??類Socket封裝了socket讀寫相關的所有操作以及數據結構,其定義如下:
class Socket { public: swConnection *socket = nullptr; //讀寫函數 ssize_t recv(void *__buf, size_t __n); ssize_t send(const void *__buf, size_t __n); …… private: swReactor *reactor = nullptr; Coroutine *read_co = nullptr; Coroutine *write_co = nullptr; //連接超時時間,接收數據、發送數據超時時間 double connect_timeout = default_connect_timeout; double read_timeout = default_read_timeout; double write_timeout = default_write_timeout; }socket字段類型為swConnection,代表傳輸層連接;
reactor字段指向結構體swReactor對象,用于fd事件的注冊、修改、刪除以及wait;
當調用recv()函數接收數據,阻塞了該協程時,read_co字段指向該協程對象Coroutine;
當調用send()函數接收數據,阻塞了該協程時,write_co字段指向該協程對象Coroutine;
類Socket初始化函數為Socket::init_sock:
void Socket::init_sock(int _fd) { reactor = SwooleG.main_reactor; //設置協程類型fd(SW_FD_CORO_SOCKET)的讀寫事件處理函數 if (!swReactor_handle_isset(reactor, SW_FD_CORO_SOCKET)) { reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_READ, readable_event_callback); reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_WRITE, writable_event_callback); reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_ERROR, error_event_callback); } }??當我們調用CoroutineSocket->recv接收數據時,底層實現如下:
Socket::timeout_setter ts(sock->socket, timeout, SW_TIMEOUT_READ); ssize_t bytes = all ? sock->socket->recv_all(ZSTR_VAL(buf), length) : sock->socket->recv(ZSTR_VAL(buf), length);??類timeout_setter會設置socket的接收數據超時時間read_timeout為timeout。
??函數socket->recv_all會循環讀取數據,直到讀取到指定長度的數據,或者底層返回等待標識阻塞當前協程:
ssize_t Socket::recv_all(void *__buf, size_t __n) { timer_controller timer(&read_timer, read_timeout, this, timer_callback); while (true) { do { retval = swConnection_recv(socket, (char *) __buf + total_bytes, __n - total_bytes, 0); } while (retval < 0 && swConnection_error(errno) == SW_WAIT && timer.start() && wait_event(SW_EVENT_READ)); if (unlikely(retval <= 0)) { break; } total_bytes += retval; if ((size_t) total_bytes == __n) { break; } } }函數首先創建timer_controller對象,設置其超時時間為read_timeout,以及超時回調函數為timer_callback;
while (true)死循環讀取fd數據,當讀取數據量等于__n時,讀取操作結束,break該循環;如果讀取操作swConnection_recv返回值小于0,并且錯誤標識為SW_WAIT,說明需要等待數據到來,此時阻塞當前協程等待數據到來(函數wait_event會換出當前協程),阻塞超時時間為read_timeout(函數timer.start()用于設置超時時間)。
class timer_controller { public: bool start() { if (timeout > 0) { *timer_pp = swTimer_add(&SwooleG.timer, (long) (timeout * 1000), 0, data, callback); } } }函數swTimer_add用于添加一個定時器;Swoole底層定時任務是通過最小堆實現的,堆頂元素的超時時間最近;結構體_swTimer維護著Swoole內部所有的定時任務:
struct _swTimer { swHeap *heap; //最小堆 swHashMap *map; //map,定時器ID作為key //最早的定時任務觸發時間 long _next_msec; //函數指針,指向swReactorTimer_set int (*set)(swTimer *timer, long exec_msec); //函數指針,指向swReactorTimer_free void (*free)(swTimer *timer); };當調用swTimer_add向_swTimer結構中添加定時任務時,需要更新_swTimer中最早的定時任務觸發時間_next_msec,同時更新main_reactor對象的超時時間:
if (timer->_next_msec < 0 || timer->_next_msec > _msec) { timer->set(timer, _msec); timer->_next_msec = _msec; } static int swReactorTimer_set(swTimer *timer, long exec_msec) { SwooleG.main_reactor->timeout_msec = exec_msec; return SW_OK; }函數wait_event負責將當前協程換出,直到注冊的事件發生
bool Socket::wait_event(const enum swEvent_type event, const void **__buf, size_t __n) { if (unlikely(!add_event(event))) { return false; } if (likely(event == SW_EVENT_READ)) { read_co = co; read_co->yield(); read_co = nullptr; } else // if (event == SW_EVENT_WRITE) { write_co = co; write_co->yield(); write_co = nullptr; } }函數add_event用于添加事件,底層調用reactor->add添加fd的監聽事件;
read_co = co或者write_co = co,用于記錄當前哪個協程阻塞在該socket對象上,當該socket對象的讀寫事件被觸發時,可以恢復該協程執行;
函數yield()將該協程換出;
??上面提到,創建協程時,注冊了一個函數swoole_event_wait,在生命周期register_shutdown階段會執行該函數,開始Swoole的事件循環,阻擋了php生命周期的結束。函數swoole_event_wait底層就是調用main_reactor->wait等待fd讀寫事件的產生;我們以epoll為例講述事件循環的邏輯:
static int swReactorEpoll_wait(swReactor *reactor, struct timeval *timeo) { while (reactor->running > 0) { n = epoll_wait(epoll_fd, events, max_event_num, swReactor_get_timeout_msec(reactor)); if (n == 0) { if (reactor->onTimeout != NULL) { reactor->onTimeout(reactor); } SW_REACTOR_CONTINUE; } for (i = 0; i < n; i++) { if ((events[i].events & EPOLLIN) && !event.socket->removed) { handle = swReactor_getHandle(reactor, SW_EVENT_READ, event.type); ret = handle(reactor, &event); } if ((events[i].events & EPOLLOUT) && !event.socket->removed) { handle = swReactor_getHandle(reactor, SW_EVENT_WRITE, event.type); ret = handle(reactor, &event); } } } }swReactorEpoll_wait是對函數epoll_wait的封裝;當有讀寫事件發生時,執行相應的handle,根據上面的講解我們知道讀寫事件的handle分別為readable_event_callback和writable_event_callback;
int Socket::readable_event_callback(swReactor *reactor, swEvent *event) { Socket *socket = (Socket *) event->socket->object; socket->read_co->resume(); }可以看到函數readable_event_callback只是簡單的恢復read_co協程即可;
當epoll_wait發生超時,最終調用的是函數swReactor_onTimeout,該函數會從Swoole維護的一系列定時任務swTimer中查找已經超時的定時任務,同時執行其callback回調;
while ((tmp = swHeap_top(timer->heap))) { tnode = tmp->data; if (tnode->exec_msec > now_msec || tnode->round == timer->round) { break; } timer->_current_id = tnode->id; if (!tnode->remove) { tnode->callback(timer, tnode); } …… } //該定時任務沒有超時,需要更新需要更新_swTimer中最早的定時任務觸發時間_next_msec long next_msec = tnode->exec_msec - now_msec; if (next_msec <= 0) { next_msec = 1; } //同時更新main_reactor對象的超時時間,實現函數為swReactorTimer_set timer->set(timer, next_msec);該callback回調函數即為上面設置的timer_callback:
void Socket::timer_callback(swTimer *timer, swTimer_node *tnode) { Socket *socket = (Socket *) tnode->data; socket->set_err(ETIMEDOUT); if (likely(tnode == socket->read_timer)) { socket->read_timer = nullptr; socket->read_co->resume(); } else if (tnode == socket->write_timer) { socket->write_timer = nullptr; socket->write_co->resume(); } }同樣的,timer_callback函數只是簡單的恢復read_co或者write_co協程即可
sleep實現
??Co::sleep()的實現函數為PHP_METHOD(swoole_coroutine_util, sleep),該函數通過調用Coroutine::sleep實現了協程休眠的功能:
int Coroutine::sleep(double sec) { Coroutine* co = Coroutine::get_current_safe(); if (swTimer_add(&SwooleG.timer, (long) (sec * 1000), 0, co, sleep_timeout) == NULL) { return -1; } co->yield(); return 0; }??可以看到,與socket讀寫事件超時處理相同,sleep內部實現時通過swTimer_add添加定時任務,同時換出當前協程實現的。該定時任務會導致main_reactor對象的超時時間的改變,即修改了epoll_wait的超時時間。
??sleep的超時處理函數為sleep_timeout,只需要換入該阻塞協程對象即可,實現如下:
static void sleep_timeout(swTimer *timer, swTimer_node *tnode) { ((Coroutine *) tnode->data)->resume(); }
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/31412.html
摘要:協程完全有用戶態程序控制,所以也被成為用戶態的線程。目前支持協程的語言有很多,例如等。協程之旅前篇結束,下一篇文章我們將深入分析原生協程部分的實現。 寫在最前 ??Swoole協程經歷了幾個里程碑,我們需要在前進的道路上不斷總結與回顧自己的發展歷程,正所謂溫故而知新,本系列文章將分為協程之旅前、中、后三篇。 前篇主要介紹協程的概念和Swoole幾個版本協程實現的主要方案技術; 中篇主...
摘要:從入門到放棄三一進程子進程創建成功后要執行的函數重定向子進程的標準輸入和輸出。默認為阻塞讀取。是否創建管道,啟用后,此選項將忽略用戶參數,強制為。 swoole——從入門到放棄(三) 一、進程 swoole_process SwooleProcess swoole_process::__construct(callable $function, $redirect_stdin...
摘要:從入門到放棄三一進程子進程創建成功后要執行的函數重定向子進程的標準輸入和輸出。默認為阻塞讀取。是否創建管道,啟用后,此選項將忽略用戶參數,強制為。 swoole——從入門到放棄(三) 一、進程 swoole_process SwooleProcess swoole_process::__construct(callable $function, $redirect_stdin...
摘要:年開發并發布框架現已停止維護。經過一年實戰,年月日,一周年之際正式發布版本。宇潤部分開源項目我已通過碼云平臺,向項目力所能及地捐款,聊表心意。所以,目前主打的還是單體應用開發。協議的開發,也是帶來的一大優勢。 imi 介紹 showImg(https://segmentfault.com/img/bVbuab9?w=291&h=187); imi 是基于 PHP 協程應用開發框架,它支...
摘要:易用穩定,本次想通過對的學習和個人解析,吸收框架的思想和設計知識,加強自己對的認知和理解。當然,筆者能力水平有限,后續的文章如有錯誤,還請指出和諒解。目錄如下后續添加文章都會記錄在此服務啟動過程以及主體設計流程源碼解析 前言 swoole是什么?官網的原話介紹是這樣的: Swoole 使用純 C 語言編寫,提供了 PHP 語言的異步多線程服務器,異步 TCP/UDP 網絡客戶端,異步 ...
閱讀 826·2021-11-22 11:59
閱讀 3230·2021-11-17 09:33
閱讀 2308·2021-09-29 09:34
閱讀 1941·2021-09-22 15:25
閱讀 1955·2019-08-30 15:55
閱讀 1321·2019-08-30 15:55
閱讀 530·2019-08-30 15:53
閱讀 3346·2019-08-29 13:55