摘要:要說明的實現,需要先介紹的運行方式。封裝與實現細節通過上文的分析可以明顯看到,的通知機制其實和的通知機制非常類似。是用來處理上文提到的返回結果的。儲存的是和收到的消息。小結最后簡要總結一下的封裝和實現過程。
作者: 李建俊
上一篇《gRPC Server 的初始化和啟動流程》為大家介紹了 gRPC Server 的初始化和啟動流程,本篇將帶大家深入到 grpc-rs 這個庫里,查看 RPC 請求是如何被封裝和派發的,以及它是怎么和 Rust Future 進行結合的。
gRPC C CoregRPC 包括了一系列復雜的協議和流控機制,如果要為每個語言都實現一遍這些機制和協議,將會是一個很繁重的工作。因此 gRPC 提供了一個統一的庫來提供基本的實現,其他語言再基于這個實現進行封裝和適配,提供更符合相應語言習慣或生態的接口。這個庫就是 gRPC C Core,grpc-rs 就是基于 gRPC C Core 進行封裝的。
要說明 grpc-rs 的實現,需要先介紹 gRPC C Core 的運行方式。gRPC C Core 有三個很關鍵的概念 grpc_channel、grpc_completion_queue、grpc_call。grpc_channel 在 RPC 里就是底層的連接,grpc_completion_queue 就是一個處理完成事件的隊列。grpc_call 代表的是一個 RPC。要進行一次 RPC,首先從 grpc_channel 創建一個 grpc_call,然后再給這個 grpc_call 發送請求,收取響應。而這個過程都是異步,所以需要調用 grpc_completion_queue 的接口去驅動消息處理。整個過程可以通過以下代碼來解釋(為了讓代碼更可讀一些,以下代碼和實際可編譯運行的代碼有一些出入)。
grpc_completion_queue* queue = grpc_completion_queue_create_for_next(NULL); grpc_channel* ch = grpc_insecure_channel_create("example.com", NULL); grpc_call* call = grpc_channel_create_call(ch, NULL, 0, queue, "say_hello"); grpc_op ops[6]; memset(ops, 0, sizeof(ops)); char* buffer = (char*) malloc(100); ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; ops[1].op = GRPC_OP_SEND_MESSAGE; ops[1].data.send_message.send_message = "gRPC"; ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; ops[3].op = GRPC_OP_RECV_INITIAL_METADATA; ops[4].op = GRPC_OP_RECV_MESSAGE; ops[4].data.recv_message.recv_message = buffer; ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT; void* tag = malloc(1); grpc_call_start_batch(call, ops, 6, tag); grpc_event ev = grpc_completion_queue_next(queue); ASSERT_EQ(ev.tag, tag); ASSERT(strcmp(buffer, "Hello gRPC"));
可以看到,對 grpc_call 的操作是通過一次 grpc_call_start_batch 來指定的。這個 start batch 會將指定的操作放在內存 buffer 當中,然后通過 grpc_completion_queue_next 來實際執行相關操作,如收發消息。這里需要注意的是 tag 這個變量。當這些操作都完成以后,grpc_completion_queue_next 會返回一個包含 tag 的消息來通知這個操作完成了。所以在代碼的末尾就可以在先前指定的 buffer 讀出預期的字符串。
由于篇幅有限,對于 gRPC C Core 的解析就不再深入了,對這部分很感興趣的朋友也可以在 github.com/grpc/grpc 閱讀相關文檔和源碼。
封裝與實現細節通過上文的分析可以明顯看到,gRPC C Core 的通知機制其實和 Rust Future 的通知機制非常類似。Rust Future 提供一個 poll 方法來檢驗當前 Future 是否已經 ready。如果尚未 ready,poll 方法會注冊一個通知鉤子 task。等到 ready 時,task 會被調用,從而觸發對這個 Future 的再次 poll,獲取結果。task 其實和上文中的 tag 正好對應起來了,而在 grpc-rs 中,tag 就是一個儲存了 task 的 enum。
pub enum CallTag { Batch(BatchPromise), Request(RequestCallback), UnaryRequest(UnaryRequestCallback), Abort(Abort), Shutdown(ShutdownPromise), Spawn(SpawnNotify), }
tag 之所以是一個 enum 是因為不同的 call 會對應不同的行為,如對于服務器端接受請求的處理和客戶端發起請求的處理就不太一樣。
grpc-rs 在初始化時會創建多個線程來不斷調用 grpc_completion_queue_next 來獲取已經完成的 tag,然后根據 tag 的類型,將數據存放在結構體中并通知 task 來獲取。下面是這個流程的代碼。
// event loop fn poll_queue(cq: Arc) { let id = thread::current().id(); let cq = CompletionQueue::new(cq, id); loop { let e = cq.next(); match e.event_type { EventType::QueueShutdown => break, // timeout should not happen in theory. EventType::QueueTimeout => continue, EventType::OpComplete => {} } let tag: Box = unsafe { Box::from_raw(e.tag as _) }; tag.resolve(&cq, e.success != 0); } }
可以看到,tag 會被強轉成為一個 CallTag,然后調用 resolve 方法來處理結果。不同的 enum 類型會有不同的 resolve 方式,這里挑選其中 CallTag::Batch 和 CallTag::Request 來進行解釋,其他的 CallTag 流程類似。
BatchPromise 是用來處理上文提到的 grpc_call_start_batch 返回結果的 tag。RequestCallback 則用來接受新的 RPC 請求。下面是 BatchPromise 的定義及其 resolve 方法。
/// A promise used to resolve batch jobs. pub struct BatchPromise { ty: BatchType, ctx: BatchContext, inner: Arc>>, } impl BatchPromise { fn handle_unary_response(&mut self) { let task = { let mut guard = self.inner.lock(); let status = self.ctx.rpc_status(); if status.status == RpcStatusCode::Ok { guard.set_result(Ok(self.ctx.recv_message())) } else { guard.set_result(Err(Error::RpcFailure(status))) } }; task.map(|t| t.notify()); } pub fn resolve(mut self, success: bool) { match self.ty { BatchType::CheckRead => { assert!(success); self.handle_unary_response(); } BatchType::Finish => { self.finish_response(success); } BatchType::Read => { self.read_one_msg(success); } } } }
上面代碼中的 ctx 是用來儲存響應的字段,包括響應頭、數據之類的。當 next 返回時,gRPC C Core 會將對應內容填充到這個結構體里。inner 儲存的是 task 和收到的消息。當 resolve 被調用時,先判斷這個 tag 要執行的是什么任務。BatchType::CheckRead 表示是一問一答式的讀取任務,Batch::Finish 表示的是沒有返回數據的任務,BatchType::Read 表示的是流式響應里讀取單個消息的任務。拿 CheckRead 舉例,它會將拉取到的數據存放在 inner 里,并通知 task。而 task 對應的 Future 再被 poll 時就可以拿到對應的數據了。這個 Future 的定義如下:
/// A future object for task that is scheduled to `CompletionQueue`. pub struct CqFuture{ inner: Arc >, } impl Future for CqFuture { type Item = T; type Error = Error; fn poll(&mut self) -> Poll { let mut guard = self.inner.lock(); if guard.stale { panic!("Resolved future is not supposed to be polled again."); } if let Some(res) = guard.result.take() { guard.stale = true; return Ok(Async::Ready(res?)); } // So the task has not been finished yet, add notification hook. if guard.task.is_none() || !guard.task.as_ref().unwrap().will_notify_current() { guard.task = Some(task::current()); } Ok(Async::NotReady) } }
Inner 是一個 SpinLock。如果在 poll 時還沒拿到結果時,會將 task 存放在鎖里,在有結果的時候,存放結果并通過 task 通知再次 poll。如果有結果則直接返回結果。
下面是 RequestCallback 的定義和 resolve 方法。
pub struct RequestCallback { ctx: RequestContext, } impl RequestCallback { pub fn resolve(mut self, cq: &CompletionQueue, success: bool) { let mut rc = self.ctx.take_request_call_context().unwrap(); if !success { server::request_call(rc, cq); return; } match self.ctx.handle_stream_req(cq, &mut rc) { Ok(_) => server::request_call(rc, cq), Err(ctx) => ctx.handle_unary_req(rc, cq), } } }
上面代碼中的 ctx 是用來儲存請求的字段,主要包括請求頭。和 BatchPromise 類似,ctx 的內容也是在調用 next 方法時被填充。在 resolve 時,如果失敗,則再次調用 request_call 來接受下一個 RPC,否則會調用對應的 RPC 方法。
handle_stream_req 的定義如下:
pub fn handle_stream_req( self, cq: &CompletionQueue, rc: &mut RequestCallContext, ) -> result::Result<(), Self> { let handler = unsafe { rc.get_handler(self.method()) }; match handler { Some(handler) => match handler.method_type() { MethodType::Unary | MethodType::ServerStreaming => Err(self), _ => { execute(self, cq, None, handler); Ok(()) } }, None => { execute_unimplemented(self, cq.clone()); Ok(()) } } }
從上面可以看到,整個過程先通過 get_handler,根據 RPC 想要執行的方法名字拿到方法并調用,如果方法不存在,則向客戶端報錯。可以看到這里對于 Unary 和 ServerStreaming 返回了錯誤。這是因為這兩種請求都是客戶端只發一次請求,所以返回錯誤讓 resolve 繼續拉取消息體然后再執行對應的方法。
為什么 get_handler 可以知道調用的是什么方法呢?這是因為 gRPC 編譯器在生成代碼里對這些方法進行了映射,具體的細節在生成的 create_xxx_service 里,本文就不再展開了。
小結最后簡要總結一下 grpc-rs 的封裝和實現過程。當 grpc-rs 初始化時,會創建數個線程輪詢消息隊列(grpc_completion_queue)并 resolve。當 server 被創建時,RPC 會被注冊起來,server 啟動時,grpc-rs 會創建數個 RequestCall 來接受請求。當有 RPC 請求發到服務器端時,CallTag::Request 就會被返回并 resolve,并在 resolve 中調用對應的 RPC 方法。而 client 在調用 RPC 時,其實都是創建了一個 Call,并產生相應的 BatchPromise 來異步通知 RPC 方法是否已經完成。
還有很多 grpc-rs 的源碼在我們的文章中暫未涉及,其中還有不少有趣的技巧,比如,如何減少喚醒線程的次數而減少切換、如何無鎖地注冊調用各個 service 鉤子等。歡迎有好奇心的小伙伴自行閱讀源碼,也歡迎大家提 issue 或 PR 一起來完善這個項目。
原文閱讀:https://www.pingcap.com/blog-cn/tikv-source-code-reading-8/
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/18039.html
摘要:作者屈鵬本篇源碼解析將為大家介紹的另一周邊組件。這個函數會往完成隊列中注冊若干個,相當于用往一個中注冊一些事件的關注。在函數返回之后,服務端的初始化及啟動過程便結束了。 作者:屈鵬 本篇 TiKV 源碼解析將為大家介紹 TiKV 的另一周邊組件—— grpc-rs。grpc-rs 是 PingCAP 實現的一個 gRPC 的 Rust 綁定,其 Server/Client 端的代碼框架...
摘要:而源碼解析系列文章則是會從源碼層面給大家抽絲剝繭,讓大家知道我們內部到底是如何實現的。我們希望通過該源碼解析系列,能讓大家對有一個更深刻的理解。 作者:唐劉 TiKV 是一個支持事務的分布式 Key-Value 數據庫,有很多社區開發者基于 TiKV 來開發自己的應用,譬如 titan、tidis。尤其是在 TiKV 成為 CNCF 的 Sandbox 項目之后,吸引了越來越多開發者的...
閱讀 1668·2021-11-19 09:40
閱讀 2923·2021-09-24 10:27
閱讀 3214·2021-09-02 15:15
閱讀 1876·2019-08-30 15:54
閱讀 1201·2019-08-30 15:54
閱讀 1368·2019-08-30 13:12
閱讀 625·2019-08-28 18:05
閱讀 2793·2019-08-27 10:53