摘要:作者屈鵬本篇源碼解析將為大家介紹的另一周邊組件。這個函數(shù)會往完成隊列中注冊若干個,相當(dāng)于用往一個中注冊一些事件的關(guān)注。在函數(shù)返回之后,服務(wù)端的初始化及啟動過程便結(jié)束了。
作者:屈鵬
本篇 TiKV 源碼解析將為大家介紹 TiKV 的另一周邊組件—— grpc-rs。grpc-rs 是 PingCAP 實(shí)現(xiàn)的一個 gRPC 的 Rust 綁定,其 Server/Client 端的代碼框架都基于 Future,事件驅(qū)動的 EventLoop 被隱藏在了庫的內(nèi)部,所以非常易于使用。本文將以一個簡單的 gRPC 服務(wù)作為例子,展示 grpc-rs 會生成的服務(wù)端代碼框架和需要服務(wù)的實(shí)現(xiàn)者填寫的內(nèi)容,然后會深入介紹服務(wù)器在啟動時如何將后臺的事件循環(huán)與這個框架掛鉤,并在后臺線程中運(yùn)行實(shí)現(xiàn)者的代碼。
基本的代碼生成及服務(wù)端 APIgRPC 使用 protobuf 定義一個服務(wù),之后調(diào)用相關(guān)的代碼生成工具就可以生成服務(wù)端、客戶端的代碼框架了,這個過程可以參考我們的 官方文檔。客戶端可以直接調(diào)用這些生成的代碼,向服務(wù)端發(fā)送請求并接收響應(yīng),而服務(wù)端則需要服務(wù)的實(shí)現(xiàn)者自己來定制對請求的處理邏輯,生成響應(yīng)并發(fā)回給客戶端。舉一個例子:
#[derive(Clone)] struct MyHelloService {} impl Hello for MyHelloService { // trait 中的函數(shù)簽名由 grpc-rs 生成,內(nèi)部實(shí)現(xiàn)需要用戶自己填寫 fn hello(&mut self, ctx: RpcContext, req: HelloRequest, sink: UnarySink) { let mut resp = HelloResponse::new(); resp.set_to(req.get_from()); ctx.spawn( sink.success(resp) .map(|_| println!("send hello response back success")) .map_err(|e| println!("send hello response back fail: {}", e)) ); } }
我們定義了一個名為 Hello 的服務(wù),里面只有一個名為 hello 的 RPC。grpc-rs 會為服務(wù)生成一個 trait,里面的方法就是這個服務(wù)包含的所有 RPC。在這個例子中唯一的 RPC 中,我們從 HelloRequest 中拿到客戶端的名字,然后再將這個名字放到 HelloResponse 中發(fā)回去,非常簡單,只是展示一下函數(shù)簽名中各個參數(shù)的用法。
然后,我們需要考慮的是如何把這個服務(wù)運(yùn)行起來,監(jiān)聽一個端口,真正能夠響應(yīng)客戶端的請求呢?下面的代碼片段展示了如何運(yùn)行這個服務(wù):
fn main() { // 創(chuàng)建一個 Environment,里面包含一個 Completion Queue let env = Arc::new(EnvBuilder::new().cq_count(4).build()); let channel_args = ChannelBuilder::new(env.clone()).build_args(); let my_service = MyHelloWorldService::new(); let mut server = ServerBuilder::new(env.clone()) // 使用 MyHelloWorldService 作為服務(wù)端的實(shí)現(xiàn),注冊到 gRPC server 中 .register_service(create_hello(my_service)) .bind("0.0.0.0", 44444) .channel_args(channel_args) .build() .unwrap(); server.start(); thread::park(); }
以上代碼展示了 grpc-rs 的足夠簡潔的 API 接口,各行代碼的意義如其注釋所示。
Server 的創(chuàng)建和啟動下面我們來看一下這個 gRPC server 是如何接收客戶端的請求,并路由到我們實(shí)現(xiàn)的服務(wù)端代碼中進(jìn)行后續(xù)的處理的。
第一步我們初始化一個 Environment,并設(shè)置 Completion Queue(完成隊列)的個數(shù)為 4 個。完成隊列是 gRPC 的一個核心概念,grpc-rs 為每一個完成隊列創(chuàng)建一個線程,并在線程中運(yùn)行一個事件循環(huán),類似于 Linux 網(wǎng)絡(luò)編程中不斷地調(diào)用 epoll_wait 來獲取事件,進(jìn)行處理:
// event loop fn poll_queue(cq: Arc) { let id = thread::current().id(); let cq = CompletionQueue::new(cq, id); loop { let e = cq.next(); match e.event_type { EventType::QueueShutdown => break, EventType::QueueTimeout => continue, EventType::OpComplete => {} } let tag: Box = unsafe { Box::from_raw(e.tag as _) }; tag.resolve(&cq, e.success != 0); } }
事件被封裝在 Tag 中。我們暫時忽略對事件的具體處理邏輯,目前我們只需要知道,當(dāng)這個 Environment 被創(chuàng)建好之后,這些后臺線程便開始運(yùn)行了。那么剩下的任務(wù)就是監(jiān)聽一個端口,將網(wǎng)絡(luò)上的事件路由到這幾個事件循環(huán)中。這個過程在 Server 的 start 方法中:
/// Start the server. pub fn start(&mut self) { unsafe { grpc_sys::grpc_server_start(self.core.server); for cq in self.env.completion_queues() { let registry = self .handlers .iter() .map(|(k, v)| (k.to_owned(), v.box_clone())) .collect(); let rc = RequestCallContext { server: self.core.clone(), registry: Arc::new(UnsafeCell::new(registry)), }; for _ in 0..self.core.slots_per_cq { request_call(rc.clone(), cq); } } } }
首先調(diào)用 grpc_server_start 來啟動這個 Server,然后對每一個完成隊列,復(fù)制一份 handler 字典。這個字典的 key 是一個字符串,而 value 是一個函數(shù)指針,指向?qū)@個類型的請求的處理函數(shù)——其實(shí)就是前面所述的服務(wù)的具體實(shí)現(xiàn)邏輯。key 的構(gòu)造方式其實(shí)就是 /
接著我們創(chuàng)建一個 RequestCallContext,然后對每個完成隊列調(diào)用幾次 request_call。這個函數(shù)會往完成隊列中注冊若干個 Call,相當(dāng)于用 epoll_ctl 往一個 epoll fd 中注冊一些事件的關(guān)注。Call 是 gRPC 在進(jìn)行遠(yuǎn)程過程調(diào)用時的基本單元,每一個 RPC 在建立的時候都會從完成隊列里取出一個 Call 對象,后者會在這個 RPC 結(jié)束時被回收。因此,在 start 函數(shù)中每一個完成隊列上注冊的 Call 個數(shù)決定了這個完成隊列上可以并發(fā)地處理多少個 RPC,在 grpc-rs 中默認(rèn)的值是 1024 個。
小結(jié)以上代碼基本都在 grpc-rs 倉庫中的 src/server.rs 文件中。在 start 函數(shù)返回之后,服務(wù)端的初始化及啟動過程便結(jié)束了。現(xiàn)在,可以快速地用幾句話回顧一下:首先創(chuàng)建一個 Environment,內(nèi)部會為每一個完成隊列啟動一個線程;接著創(chuàng)建 Server 對象,綁定端口,并將一個或多個服務(wù)注冊到這個 Server 上;最后調(diào)用 Server 的 start 方法,將服務(wù)的具體實(shí)現(xiàn)關(guān)聯(lián)到若干個 Call 上,并塞進(jìn)所有的完成隊列中。在這之后,網(wǎng)絡(luò)上新來的 RPC 請求便可以在后臺的事件循環(huán)中被取出,并根據(jù)具體實(shí)現(xiàn)的字典分別執(zhí)行了。最后,不要忘記 start 是一個非阻塞的方法,調(diào)用它的主線程在之后可以繼續(xù)執(zhí)行別的邏輯或者掛起。
本篇源碼解析就到這里,下篇關(guān)于 grpc-rs 的文章我們會進(jìn)一步介紹一個 Call 或者 RPC 的生命周期,以及每一階段在 Server 端的完成隊列中對應(yīng)哪一種事件、會被如何處理,這一部分是 grpc-rs 的核心代碼,敬請期待!
原文鏈接:https://www.pingcap.com/blog-cn/tikv-source-code-reading-7/
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/18027.html
摘要:要說明的實(shí)現(xiàn),需要先介紹的運(yùn)行方式。封裝與實(shí)現(xiàn)細(xì)節(jié)通過上文的分析可以明顯看到,的通知機(jī)制其實(shí)和的通知機(jī)制非常類似。是用來處理上文提到的返回結(jié)果的。儲存的是和收到的消息。小結(jié)最后簡要總結(jié)一下的封裝和實(shí)現(xiàn)過程。 作者: 李建俊 上一篇《gRPC Server 的初始化和啟動流程》為大家介紹了 gRPC Server 的初始化和啟動流程,本篇將帶大家深入到 grpc-rs 這個庫里,查看 R...
閱讀 2860·2021-10-14 09:50
閱讀 1217·2021-10-08 10:21
閱讀 3645·2021-10-08 10:16
閱讀 3062·2021-09-27 14:02
閱讀 3135·2021-09-23 11:21
閱讀 2108·2021-09-07 10:17
閱讀 406·2019-08-30 14:00
閱讀 2105·2019-08-29 17:26