摘要:消息隊(duì)列的接受消息隊(duì)列的接受是利用函數(shù),其中是消息的類型,該參數(shù)會(huì)取出指定類型的消息,如果設(shè)定的是爭搶模式,該值會(huì)統(tǒng)一為,否則該值就是消息發(fā)送目的的。環(huán)形隊(duì)列的消息入隊(duì)發(fā)送消息首先要確定環(huán)形隊(duì)列的隊(duì)尾。取模操作可以優(yōu)化
前言
swoole 的底層隊(duì)列有兩種:進(jìn)程間通信 IPC 的消息隊(duì)列 swMsgQueue,與環(huán)形隊(duì)列 swRingQueue。IPC 的消息隊(duì)列用于 task_worker 進(jìn)程接受投遞消息,環(huán)形隊(duì)列用于 SW_MODE_THREAD 線程模式下 task_worker 接受投遞消息的方法。
swMsgQueue 消息隊(duì)列數(shù)據(jù)結(jié)構(gòu)swoole 使用的消息隊(duì)列并不是 POSIX 下的 mq_xx 系統(tǒng)函數(shù),而是 SystemV 下的 msgxxx 系列函數(shù),原因猜測是 systemv 系統(tǒng)函數(shù)可以指定 mtype,也就是消息的類型,這樣就可以實(shí)現(xiàn)對(duì)指定的 task_worker 的投放。
swMsgQueue 的數(shù)據(jù)結(jié)構(gòu)比較簡單,blocking 指定消息隊(duì)列是否是阻塞式,msg_id 是創(chuàng)建的消息隊(duì)列的 id,flags 也是指定阻塞式還是非阻塞式,perms 指定消息隊(duì)列的權(quán)限。
typedef struct _swMsgQueue { int blocking; int msg_id; int flags; int perms; } swMsgQueue;swMsgQueue 消息隊(duì)列 swMsgQueue 消息隊(duì)列的創(chuàng)建
創(chuàng)建消息隊(duì)列就是調(diào)用 msgget 函數(shù),這個(gè)函數(shù)的 msg_key 就是 server 端配置的 message_queue_key,task 隊(duì)列在 server 結(jié)束后不會(huì)銷毀,重新啟動(dòng)程序后,task 進(jìn)程仍然會(huì)接著處理隊(duì)列中的任務(wù)。如果不設(shè)置該值,那么程序會(huì)自動(dòng)生成: ftok($php_script_file, 1)
void swMsgQueue_set_blocking(swMsgQueue *q, uint8_t blocking) { if (blocking == 0) { q->flags = q->flags | IPC_NOWAIT; } else { q->flags = q->flags & (~IPC_NOWAIT); } } int swMsgQueue_create(swMsgQueue *q, int blocking, key_t msg_key, int perms) { if (perms <= 0 || perms >= 01000) { perms = 0666; } int msg_id; msg_id = msgget(msg_key, IPC_CREAT | perms); if (msg_id < 0) { swSysError("msgget() failed."); return SW_ERR; } else { bzero(q, sizeof(swMsgQueue)); q->msg_id = msg_id; q->perms = perms; q->blocking = blocking; swMsgQueue_set_blocking(q, blocking); } return 0; }swMsgQueue 消息隊(duì)列的發(fā)送
消息隊(duì)列的發(fā)送主要利用 msgsnd 函數(shù),flags 指定發(fā)送是阻塞式還是非阻塞式,在 task_worker 進(jìn)程中都是采用阻塞式發(fā)送的方法。
int swMsgQueue_push(swMsgQueue *q, swQueue_data *in, int length) { int ret; while (1) { ret = msgsnd(q->msg_id, in, length, q->flags); if (ret < 0) { SwooleG.error = errno; if (errno == EINTR) { continue; } else if (errno == EAGAIN) { return -1; } else { swSysError("msgsnd(%d, %d, %ld) failed.", q->msg_id, length, in->mtype); return -1; } } else { return ret; } } return 0; }swMsgQueue 消息隊(duì)列的接受
消息隊(duì)列的接受是利用 msgrcv 函數(shù),其中 mtype 是消息的類型,該參數(shù)會(huì)取出指定類型的消息,如果 task_ipc_mode 設(shè)定的是爭搶模式,該值會(huì)統(tǒng)一為 0,否則該值就是消息發(fā)送目的 task_worker 的 id。
task_worker 進(jìn)程的主循環(huán)會(huì)阻塞在本函數(shù)中,直到有消息到達(dá)。
int swMsgQueue_pop(swMsgQueue *q, swQueue_data *data, int length) { int ret = msgrcv(q->msg_id, data, length, data->mtype, q->flags); if (ret < 0) { SwooleG.error = errno; if (errno != ENOMSG && errno != EINTR) { swSysError("msgrcv(%d, %d, %ld) failed.", q->msg_id, length, data->mtype); } } return ret; }swRingQueue 環(huán)形隊(duì)列的數(shù)據(jù)結(jié)構(gòu)
環(huán)形隊(duì)列在之前的文章中從來沒有出現(xiàn),因?yàn)樵撽?duì)列是用于 SW_MODE_THREAD 模式下的 worker 線程中。由于并不是進(jìn)程間的通訊,而是線程間的通訊,因此效率會(huì)更高。
swoole 的環(huán)形隊(duì)列有兩種,一種是普通的環(huán)形隊(duì)列,另一種是線程安全的環(huán)形隊(duì)列,本文只會(huì)講線程安全的環(huán)形隊(duì)列,
swoole 為了環(huán)形隊(duì)列更加高效,并沒有使用線程鎖,而是使用了無鎖結(jié)構(gòu),只會(huì)利用 atomic 原子鎖。
值得注意的是數(shù)據(jù)結(jié)構(gòu)中的 flags,該值只會(huì)是 0-4 中的一個(gè),該值都是利用原子鎖來改動(dòng),以此來實(shí)現(xiàn)互斥的作用。
typedef struct _swRingQueue { void **data; /* 隊(duì)列空間 */ char *flags; // 0:push ready 1: push now // 2:pop ready; 3: pop now uint size; /* 隊(duì)列總尺寸 */ uint num; /* 隊(duì)列當(dāng)前入隊(duì)數(shù)量 */ uint head; /* 頭部,出隊(duì)列方向*/ uint tail; /* 尾部,入隊(duì)列方向*/ } swRingQueue;swRingQueue 環(huán)形隊(duì)列 swRingQueue 環(huán)形隊(duì)列的創(chuàng)建
環(huán)形隊(duì)列的創(chuàng)建很簡單,就是初始化隊(duì)列數(shù)據(jù)結(jié)構(gòu)中的各種屬性。
int swRingQueue_init(swRingQueue *queue, int buffer_size) { queue->size = buffer_size; queue->flags = (char *)sw_malloc(queue->size); if (queue->flags == NULL) { return -1; } queue->data = (void **)sw_calloc(queue->size, sizeof(void*)); if (queue->data == NULL) { sw_free(queue->flags); return -1; } queue->head = 0; queue->tail = 0; memset(queue->flags, 0, queue->size); memset(queue->data, 0, queue->size * sizeof(void*)); return 0; }swRingQueue 環(huán)形隊(duì)列的消息入隊(duì)
發(fā)送消息首先要確定環(huán)形隊(duì)列的隊(duì)尾。queue->flags 是一個(gè)數(shù)組,里面存儲(chǔ)著所有的隊(duì)列元素當(dāng)前的狀態(tài)。如果當(dāng)前隊(duì)尾元素的狀態(tài)不是 0,說明已經(jīng)有其他線程對(duì)該隊(duì)列元素進(jìn)行操作,我們當(dāng)前線程暫時(shí)不能對(duì)當(dāng)前隊(duì)尾進(jìn)行操作,要等其他線程將隊(duì)尾元素向后移動(dòng)一位,我們才能進(jìn)行更新。
當(dāng)線程將當(dāng)前隊(duì)尾的狀態(tài)從 0 改變?yōu)?1 之后,我們就要立刻更新隊(duì)尾的 offset,讓其他線程繼續(xù)入隊(duì)數(shù)據(jù)。接著將數(shù)據(jù)放入 queue->data,僅僅將數(shù)據(jù)的地址保存即可。
最后,將 cur_tail_flag_index 原子加 1,將隊(duì)列元素狀態(tài)改為待讀;將 queue->num 原子加 1
int swRingQueue_push(swRingQueue *queue, void * ele) { if (!(queue->num < queue->size)) { return -1; } int cur_tail_index = queue->tail; char * cur_tail_flag_index = queue->flags + cur_tail_index; //TODO Scheld while (!sw_atomic_cmp_set(cur_tail_flag_index, 0, 1)) { cur_tail_index = queue->tail; cur_tail_flag_index = queue->flags + cur_tail_index; } // 兩個(gè)入隊(duì)線程之間的同步 //TODO 取模操作可以優(yōu)化 int update_tail_index = (cur_tail_index + 1) % queue->size; // 如果已經(jīng)被其他的線程更新過,則不需要更新; // 否則,更新為 (cur_tail_index+1) % size; sw_atomic_cmp_set(&queue->tail, cur_tail_index, update_tail_index); // 申請(qǐng)到可用的存儲(chǔ)空間 *(queue->data + cur_tail_index) = ele; sw_atomic_fetch_add(cur_tail_flag_index, 1); sw_atomic_fetch_add(&queue->num, 1); return 0; }swRingQueue 環(huán)形隊(duì)列的消息出隊(duì)
與入隊(duì)相反,出隊(duì)需要確定當(dāng)前隊(duì)列的隊(duì)首位置,如果隊(duì)首的狀態(tài)不是 2,那么說明有其他線程已經(jīng)進(jìn)行了出隊(duì)操作,等待其他線程更新隊(duì)首位置即可。
獲取到隊(duì)首元素之后,要立刻更新隊(duì)首的新位置,然后將數(shù)據(jù)的首地址傳遞給 ele,然后將隊(duì)首元素狀態(tài)復(fù)原,減少隊(duì)列的 num。
int swRingQueue_pop(swRingQueue *queue, void **ele) { if (!(queue->num > 0)) return -1; int cur_head_index = queue->head; char * cur_head_flag_index = queue->flags + cur_head_index; while (!sw_atomic_cmp_set(cur_head_flag_index, 2, 3)) { cur_head_index = queue->head; cur_head_flag_index = queue->flags + cur_head_index; } //TODO 取模操作可以優(yōu)化 int update_head_index = (cur_head_index + 1) % queue->size; sw_atomic_cmp_set(&queue->head, cur_head_index, update_head_index); *ele = *(queue->data + cur_head_index); sw_atomic_fetch_sub(cur_head_flag_index, 3); sw_atomic_fetch_sub(&queue->num, 1); return 0; }
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/30844.html
摘要:前言內(nèi)存數(shù)據(jù)結(jié)構(gòu),類似于的通道,底層基于共享內(nèi)存互斥鎖實(shí)現(xiàn),可實(shí)現(xiàn)用戶態(tài)的高性能內(nèi)存隊(duì)列。是當(dāng)前隊(duì)列占用的內(nèi)存大小,用來指定是否使用共享內(nèi)存是否使用鎖是否使用通知。 前言 內(nèi)存數(shù)據(jù)結(jié)構(gòu) Channel,類似于 Go 的 chan 通道,底層基于 共享內(nèi)存 + Mutex 互斥鎖實(shí)現(xiàn),可實(shí)現(xiàn)用戶態(tài)的高性能內(nèi)存隊(duì)列。Channel 可用于多進(jìn)程環(huán)境下,底層在讀取寫入時(shí)會(huì)自動(dòng)加鎖,應(yīng)用層不需...
摘要:函數(shù)事件循環(huán)在事件循環(huán)時(shí),如果使用的是消息隊(duì)列,那么就不斷的調(diào)用從消息隊(duì)列中取出數(shù)據(jù)。獲取后的數(shù)據(jù)調(diào)用回調(diào)函數(shù)消費(fèi)消息之后,向中發(fā)送空數(shù)據(jù),告知進(jìn)程已消費(fèi),并且關(guān)閉新連接。 swManager_start 創(chuàng)建進(jìn)程流程 task_worker 進(jìn)程的創(chuàng)建可以分為三個(gè)步驟:swServer_create_task_worker 申請(qǐng)所需的內(nèi)存、swTaskWorker_init 初始化...
摘要:官網(wǎng)源碼解讀號(hào)外號(hào)外歡迎大家我們開發(fā)組定了一個(gè)就線下聚一次的小目標(biāo)里面的框架算是非常重的了這里的重先不具體到性能層面主要是框架的設(shè)計(jì)思想和框架集成的服務(wù)讓框架可以既可以快速解決很多問題又可以輕松擴(kuò)展中的框架有在應(yīng)該無出其右了這次解讀的源碼 官網(wǎng): https://www.swoft.org/源碼解讀: http://naotu.baidu.com/file/8... 號(hào)外號(hào)外, 歡迎大...
摘要:清空主進(jìn)程殘留的定時(shí)器與信號(hào)。設(shè)定為執(zhí)行回調(diào)函數(shù)如果在回調(diào)函數(shù)中調(diào)用了異步系統(tǒng),啟動(dòng)函數(shù)進(jìn)行事件循環(huán)。因此為了區(qū)分兩者,規(guī)定并不允許兩者同時(shí)存在。 前言 swoole-1.7.2 增加了一個(gè)進(jìn)程管理模塊,用來替代 PHP 的 pcntl 擴(kuò)展。 PHP自帶的pcntl,存在很多不足,如 pcntl 沒有提供進(jìn)程間通信的功能 pcntl 不支持重定向標(biāo)準(zhǔn)輸入和輸出 pcntl 只...
摘要:基于擴(kuò)展實(shí)現(xiàn)真正的數(shù)據(jù)庫連接池這種方案中,項(xiàng)目占用的連接數(shù)僅僅為。一種是連接暫時(shí)不再使用,其占用狀態(tài)解除,可以從使用者手中交回到空閑隊(duì)列中這種我們稱為連接的歸隊(duì)。源碼剖析系列目錄 作者:bromine鏈接:https://www.jianshu.com/p/1a7...來源:簡書著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對(duì)原文進(jìn)行了重新的排版。Swoft Github: https:...
閱讀 1215·2023-04-26 00:47
閱讀 3568·2021-11-16 11:53
閱讀 795·2021-10-08 10:05
閱讀 2739·2021-09-22 15:19
閱讀 2981·2019-08-30 15:55
閱讀 2754·2019-08-29 16:55
閱讀 2920·2019-08-29 15:20
閱讀 1111·2019-08-23 16:13