摘要:值得一提的是目前的服務(wù)即服務(wù),暫沒有其他的服務(wù)功能,所以基本上相關(guān)的配置指代的就是。會將請求傳遞給各個中間件,最終最終傳遞給處理。源碼剖析系列目錄
作者:bromine
鏈接:https://www.jianshu.com/p/411...
來源:簡書
著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對原文進(jìn)行了重新的排版。
Swoft Github: https://github.com/swoft-clou...
Swoft提供了一個自建RPC(遠(yuǎn)程方法調(diào)用)實現(xiàn),讓你可以方便的調(diào)用其他Swoft上的服務(wù)。
RPC服務(wù)端的初始化RPC有兩種啟動方式Http伴隨啟動和RPC多帶帶啟動。值得一提的是目前swoole的tcp服務(wù)即RPC服務(wù),暫沒有其他的tcp服務(wù)功能,所以基本上tcp相關(guān)的配置指代的就是RPC。
Http伴隨啟動Swoft 的 RPC 服務(wù)在Http服務(wù)啟動時候伴隨啟動
//SwoftHttpServerHttpHttpServer.php /** * Http Server */ class HttpServer extends AbstractServer /** * Start Server * * @throws SwoftExceptionRuntimeException */ public function start() { //code ... //根據(jù).env配置文件Server區(qū)段的TCPABLE字段決定是否啟動RPC服務(wù) if ((int)$this->serverSetting["tcpable"] === 1) { $this->registerRpcEvent(); } //code .... } }Swoole監(jiān)聽
初始化流程即根據(jù)相關(guān)注解注冊一個swoole監(jiān)聽
//SwoftHttpServerHttpHttpServer.php /** * Register rpc event, swoft/rpc-server required * * @throws SwoftExceptionRuntimeException */ protected function registerRpcEvent() { //含有@SwooleListener且type為SwooleEvent::TYPE_PORT的Bean,即RpcEventListener $swooleListeners = SwooleListenerCollector::getCollector(); if (!isset($swooleListeners[SwooleEvent::TYPE_PORT][0]) || empty($swooleListeners[SwooleEvent::TYPE_PORT][0])) { throw new RuntimeException("Please use swoft/rpc-server, run "composer require swoft/rpc-server""); } //添加swoole RPC相關(guān)的tcp監(jiān)聽端口,使用的是.env文件中的TCP區(qū)段配置 $this->listen = $this->server->listen($this->tcpSetting["host"], $this->tcpSetting["port"], $this->tcpSetting["type"]); $tcpSetting = $this->getListenTcpSetting(); $this->listen->set($tcpSetting); //根據(jù)RpcEventListener的相關(guān)注解添加監(jiān)聽處理句柄 $swooleRpcPortEvents = $swooleListeners[SwooleEvent::TYPE_PORT][0]; $this->registerSwooleEvents($this->listen, $swooleRpcPortEvents); }
由于是初版,根據(jù)@SwooleListener獲取RPC監(jiān)聽Bean的相關(guān)處理暫時還有點生硬。
目前swoft中type為SwooleEvent::TYPE_PORT的@SwooleListener只有RpcEventListener一個,如果添加了同類Bean容易出問題,穩(wěn)定版出的時候應(yīng)該會有相關(guān)優(yōu)化。
入口從SwoftHttpServerCommandServerCommand換成SwoftRpcServerCommandRpcCommand,流程和Http大同小異,就是swoole的設(shè)定監(jiān)聽,僅僅是少了HTTP相關(guān)的監(jiān)聽接口和事件而已,此處不再贅述。
RPC請求處理RPC服務(wù)器和HTTP服務(wù)器的區(qū)別僅僅在于與客戶端交互報文格式和報文所在的網(wǎng)絡(luò)層(Swoft的RPC基于TCP層次),運行原理基本相通,都是路由,中間件,RPC Service(對應(yīng)Http的Controller),你完全可以以Http服務(wù)的思路去理解他。
Swoole的RPC-TCP監(jiān)聽設(shè)置好后,RPC服務(wù)端就可以開始接受請求了。RpcEventListener的負(fù)責(zé)的工作僅僅是把收到的數(shù)據(jù)轉(zhuǎn)發(fā)給SwoftRpcServerServiceDispatcher分發(fā)。Dispatcher會將請求傳遞給各個Middleware中間件,最終最終傳遞給HandlerAdapterMiddleware處理。
PackerMiddlewarePackerMiddleware 是RPC中比較重要的一個中間件,負(fù)責(zé)將TCP請求中數(shù)據(jù)流解包和數(shù)據(jù)流封包。
getAttribute(self::ATTRIBUTE_DATA); $data = $packer->unpack($data); // 觸發(fā)一個RpcServerEvent::BEFORE_RECEIVE事件,默認(rèn)只有一個用于添加請求上下文信息的BeforeReceiveListener // 利用中間件觸發(fā)流程關(guān)鍵事件的做法耦合有點高,猜測以后會調(diào)整 App::trigger(RpcServerEvent::BEFORE_RECEIVE, null, $data); //替換解包后的解包到Request中,提供給后續(xù)中間件和Handler使用 $request = $request->withAttribute(self::ATTRIBUTE_DATA, $data); /* @var SwoftRpcServerRpcResponse $response */ $response = $handler->handle($request); //為Response封包返回給RPC客戶端 $serviceResult = $response->getAttribute(HandlerAdapter::ATTRIBUTE); $serviceResult = $packer->pack($serviceResult); return $response->withAttribute(HandlerAdapter::ATTRIBUTE, $serviceResult); } }RouterMiddleware
RouterMiddleware負(fù)責(zé)根據(jù)RPC請求的method,version,interface 獲取處理的RPC服務(wù)類,充當(dāng)了路由的作用
getAttribute(PackerMiddleware::ATTRIBUTE_DATA); $method = $data["method"]??""; $version = $data["version"]??""; $interface = $data["interface"]??""; /* @var SwoftRpcServerRouterHandlerMapping $serviceRouter */ $serviceRouter = App::getBean("serviceRouter"); //路由匹配,即向SwoftRpcServerRouterHandlerMapping->$routes獲取RPC服務(wù)信息 $serviceHandler = $serviceRouter->getHandler($interface, $version, $method); // deliver service data $request = $request->withAttribute(self::ATTRIBUTE, $serviceHandler); return $handler->handle($request); } }
Swoft啟動階段會掃描并初始化注解信息(參考注解章節(jié)),注解初始化完畢后會觸發(fā)一個AppEvent::APPLICATION_LOADER事件,此時會將來自@Service的所有RPC的路由信息注冊到SwoftRpcServerRouterHandlerMapping->$routes中,用于serviceRouter Bean的路由匹配。
HandlerAdapterMiddlewareHandlerAdapterMiddleware最終轉(zhuǎn)發(fā)請求給HandlerAdapter處理,HandlerAdapter會使用剛剛RouterMiddleware匹配到的服務(wù)類信息轉(zhuǎn)發(fā)請求并封裝Response最終返回給ServiceDispatcher,ServiceDispatcher會返回TCP流給客戶端然后結(jié)束本次請求。
getAttribute(PackerMiddleware::ATTRIBUTE_DATA); $params = $data["params"] ?? []; //路由解析出來的,處理該請求的服務(wù)Bean和方法 list($serviceClass, $method) = $handler; $service = App::getBean($serviceClass); // execute handler with params $response = PhpHelper::call([$service, $method], $params); $response = ResponseHelper::formatData($response); // 構(gòu)造Response返回客戶端 if (! $response instanceof Response) { $response = (new Response())->withAttribute(self::ATTRIBUTE, $response); } return $response; } }RPC客戶端的實現(xiàn)
在Bean的屬性中聲明@Reference,Swoft即會根據(jù)@var聲明的類型注入相應(yīng)的RPC客戶端實例。
/** * @Reference(name="useraaaaaa") * * @var DemoInterface */ private $demoService;
依賴注入的實現(xiàn)會專門另外用一篇文章多帶帶解釋,這里先看看RPC客戶端的相關(guān)代碼。
遠(yuǎn)程代理namespace SwoftRpcClientService; /** * The proxy of service */ class ServiceProxy { /** * @param string $className * @param string $interfaceClass */ public static function loadProxyClass(string $className, string $interfaceClass) { $reflectionClass = new ReflectionClass($interfaceClass); $reflectionMethods = $reflectionClass->getMethods(ReflectionMethod::IS_PUBLIC); $template = "class $className extends SwoftRpcClientService implements {$interfaceClass} {"; //SwoftRpcClientService::class // the template of methods $template .= self::getMethodsTemplate($reflectionMethods); $template .= "}"; eval($template); } //code ... }
和AOP一樣,原理一樣是使用了動態(tài)代理,更具體的說法是動態(tài)遠(yuǎn)程代理。
RPC動態(tài)客戶端類實現(xiàn)了客戶端聲明的Interface類型(如DemoInterface)并繼承了SwoftRpcClientService類。
動態(tài)類的實現(xiàn)很簡單,對于接口顯式聲明的方法,實際上都是調(diào)用SwoftRpcClientService->call()方法。
interface DemoInterface { /** * @param array $ids * @return array */ public function getUsers(array $ids); }
class 動態(tài)生成RPC客戶端類 extends SwoftRpcClientService implements AppLibDemoInterface { public function getUsers ( array $ids ) { $params = func_get_args(); return $this->call("getUsers", $params); } //code ... }
對于自動生成的defer方法,則是通過魔術(shù)方法__call(),調(diào)用SwoftRpcClientService->deferCall()
/** * @param string $name * @param array $arguments * * @return ResultInterface * @throws RpcClientException */ function __call(string $name, array $arguments) { $method = $name; $prefix = self::DEFER_PREFIX;//"defer" if (strpos($name, $prefix) !== 0) { throw new RpcClientException(sprintf("the method of %s is not exist! ", $name)); } if ($name == $prefix) { $method = array_shift($arguments); } elseif (strpos($name, $prefix) === 0) { $method = lcfirst(ltrim($name, $prefix)); } return $this->deferCall($method, $arguments); }
我們這里只看具有代表性的call()方法,deferCall()大致相同。
RPC客戶端動態(tài)類的本質(zhì)是將客戶端的參數(shù)和接口信息根據(jù)Swoft自己的格式傳遞給RPC服務(wù)端,然后將服務(wù)器返回的數(shù)據(jù)解包取出返回值返回給RPC的調(diào)用者,對外偽裝成一個普通的對象,屏蔽遠(yuǎn)程調(diào)用操作。
// SwoftRpcClientService.php /** * Do call service * * @param string $func * @param array $params * * @throws Throwable * @return mixed */ public function call(string $func, array $params) { $profileKey = $this->interface . "->" . $func; //根據(jù)@reference的fallback屬性獲取降級處理句柄,在RPC服務(wù)調(diào)用失敗的時候可以會使用fallback句柄代替 $fallback = $this->getFallbackHandler($func); try { $connectPool = $this->getPool(); $circuitBreaker = $this->getBreaker(); /* @var $client AbstractServiceConnection */ $client = $connectPool->getConnection(); //數(shù)據(jù)封包,和RPC服務(wù)端一致 $packer = service_packer(); $type = $this->getPackerName(); $data = $packer->formatData($this->interface, $this->version, $func, $params); $packData = $packer->pack($data, $type); //通過熔斷器調(diào)用接口 $result = $circuitBreaker->call([$client, "send"], [$packData], $fallback); if ($result === null || $result === false) { return null; } //和defercall不一致這里直接收包,解包 App::profileStart($profileKey); $result = $client->recv(); App::profileEnd($profileKey); $connectPool->release($client); App::debug(sprintf("%s call %s success, data=%", $this->interface, $func, json_encode($data, JSON_UNESCAPED_UNICODE))); $result = $packer->unpack($result); $data = $packer->checkData($result); } catch (Throwable $throwable) { if (empty($fallback)) { throw $throwable; } //RPC調(diào)用失敗則調(diào)用降級句柄,代替實際RPC服務(wù)直接返回 $data = PhpHelper::call($fallback, $params); } return $data; }熔斷器
熔斷器的swoft-RPC的另一重要概念,RPC的所有請求都通過熔斷器發(fā)送。
熔斷器使用狀態(tài)模式實現(xiàn),熔斷器有開啟,半開,關(guān)閉 3種狀態(tài),不同狀態(tài)下熔斷器會持有不同的狀態(tài)實例,狀態(tài)根據(jù)RPC調(diào)用情況切換,熔斷器根據(jù)持有狀態(tài)實例的不同,行為也有所不同。
circuitBreaker->serviceName . "服務(wù),連接建立失敗(null)"); } if ($class instanceof Client && $class->isConnected() == false) { throw new Exception($this->circuitBreaker->serviceName . "服務(wù),當(dāng)前連接已斷開"); } //調(diào)用swoole協(xié)程客戶端的send()方法發(fā)送數(shù)據(jù) $data = $class->$method(...$params); } catch (Exception $e) { //遞增失敗計數(shù) if ($this->circuitBreaker->isClose()) { $this->circuitBreaker->incFailCount(); } App::error($this->circuitBreaker->serviceName . "服務(wù),當(dāng)前[關(guān)閉狀態(tài)],服務(wù)端調(diào)用失敗,開始服務(wù)降級容錯處理,error=" . $e->getMessage()); //RPC調(diào)用失敗則使用降級接口 $data = $this->circuitBreaker->fallback($fallback); } //失敗次數(shù)過線則切換狀態(tài) $failCount = $this->circuitBreaker->getFailCounter(); $switchToFailCount = $this->circuitBreaker->getSwitchToFailCount(); if ($failCount >= $switchToFailCount && $this->circuitBreaker->isClose()) { App::trace($this->circuitBreaker->serviceName . "服務(wù),當(dāng)前[關(guān)閉狀態(tài)],服務(wù)失敗次數(shù)達(dá)到上限,開始切換為開啟狀態(tài),failCount=" . $failCount); $this->circuitBreaker->switchToOpenState(); } App::trace($this->circuitBreaker->serviceName . "服務(wù),當(dāng)前[關(guān)閉狀態(tài)],failCount=" . $this->circuitBreaker->getFailCounter()); return $data; } }熔斷器開啟狀態(tài)策略
circuitBreaker->fallback(); App::trace($this->getServiceName() . "服務(wù),當(dāng)前[開啟狀態(tài)],執(zhí)行服務(wù)fallback服務(wù)降級容錯處理"); $nowTime = time(); if ($this->circuitBreaker->isOpen() && $nowTime > $this->circuitBreaker->getSwitchOpenToHalfOpenTime() ) { $delayTime = $this->circuitBreaker->getDelaySwitchTimer(); // swoole定時器不是嚴(yán)格的,3s容錯時間 ,定時切換狀態(tài)的半開 $switchToHalfStateTime = $nowTime + ($delayTime / 1000) + 3; App::getTimer()->addAfterTimer("openState", $delayTime, [$this, "delayCallback"]); $this->circuitBreaker->setSwitchOpenToHalfOpenTime($switchToHalfStateTime); App::trace($this->getServiceName() . "服務(wù),當(dāng)前[開啟狀態(tài)],創(chuàng)建延遲觸發(fā)器,一段時間后狀態(tài)切換為半開狀態(tài)"); } return $data; } }熔斷器半開狀態(tài)策略
半開熔斷器是熔斷器關(guān)閉狀態(tài)和熔斷器開啟狀態(tài)的過度狀態(tài),半開熔斷器的所有RPC調(diào)用都是加鎖的,連續(xù)成功或者連續(xù)失敗到閾值后會切換到關(guān)閉狀態(tài)或者開啟狀態(tài),代碼類似,此處不再累述,有興趣的讀者可以自行研究。
Swoft源碼剖析系列目錄:https://segmentfault.com/a/11...
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/28694.html
摘要:和服務(wù)關(guān)系最密切的進(jìn)程是中的進(jìn)程組,絕大部分業(yè)務(wù)處理都在該進(jìn)程中進(jìn)行。隨后觸發(fā)一個事件各組件通過該事件進(jìn)行配置文件加載路由注冊。事件每個請求到來時僅僅會觸發(fā)事件。服務(wù)器生命周期和服務(wù)基本一致,詳情參考源碼剖析功能實現(xiàn) 作者:bromine鏈接:https://www.jianshu.com/p/4c0...來源:簡書著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對原文進(jìn)行了重新的排版。S...
摘要:作者鏈接來源簡書著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對原文進(jìn)行了重新的排版。同時順手整理個人對源碼的相關(guān)理解,希望能夠稍微填補學(xué)習(xí)領(lǐng)域的空白。系列文章只會節(jié)選關(guān)鍵代碼輔以思路講解,請自行配合源碼閱讀。 作者:bromine鏈接:https://www.jianshu.com/p/2f6...來源:簡書著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對原文進(jìn)行了重新的排版。Swoft...
摘要:作者鏈接來源簡書著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對原文進(jìn)行了重新的排版。前言為應(yīng)用提供一個完整的容器作為依賴管理方案,是功能,模塊等功能的實現(xiàn)基礎(chǔ)。的依賴注入管理方案基于服務(wù)定位器。源碼剖析系列目錄 作者:bromine鏈接:https://www.jianshu.com/p/a23...來源:簡書著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對原文進(jìn)行了重新的排版。Swof...
摘要:作為定時任務(wù)的執(zhí)行者,通過每喚醒自身一次,然后把執(zhí)行表遍歷一次,挑選當(dāng)下需要執(zhí)行的任務(wù),通過投遞出去并更新該任務(wù)執(zhí)行表中的狀態(tài)。 作者:bromine鏈接:https://www.jianshu.com/p/b44...來源:簡書著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對原文進(jìn)行了重新的排版。Swoft Github: https://github.com/swoft-clou.....
摘要:基于擴(kuò)展實現(xiàn)真正的數(shù)據(jù)庫連接池這種方案中,項目占用的連接數(shù)僅僅為。一種是連接暫時不再使用,其占用狀態(tài)解除,可以從使用者手中交回到空閑隊列中這種我們稱為連接的歸隊。源碼剖析系列目錄 作者:bromine鏈接:https://www.jianshu.com/p/1a7...來源:簡書著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對原文進(jìn)行了重新的排版。Swoft Github: https:...
閱讀 2947·2023-04-25 22:16
閱讀 2093·2021-10-11 11:11
閱讀 3247·2019-08-29 13:26
閱讀 593·2019-08-29 12:32
閱讀 3409·2019-08-26 11:49
閱讀 2987·2019-08-26 10:30
閱讀 1939·2019-08-23 17:59
閱讀 1507·2019-08-23 17:57