摘要:前言此博客所述項(xiàng)目代碼已在開(kāi)源歡迎大家一起貢獻(xiàn)點(diǎn)此進(jìn)入最近一次寫(xiě)博客還是年底謝謝大家持久以來(lái)的關(guān)注本篇博文將會(huì)教大家如何從到搭建一個(gè)簡(jiǎn)單高效且拓展性強(qiáng)的框架什么是相信大家都或多或少使用過(guò)框架比如阿里的谷歌的的等等那么究竟什么是翻譯成中文
Cool-Rpc 前言
此博客所述項(xiàng)目代碼已在github開(kāi)源,歡迎大家一起貢獻(xiàn)!
點(diǎn)此進(jìn)入:Cool-RPC
最近一次寫(xiě)博客還是17年底,謝謝大家持久以來(lái)的關(guān)注
本篇博文將會(huì)教大家如何從0到1,搭建一個(gè)簡(jiǎn)單、高效且拓展性強(qiáng)的rpc框架.
相信大家都或多或少使用過(guò)RPC框架,比如阿里的Dubbo、谷歌的grpc、Facebook的Thrift等等
那么究竟什么是rpc?
rpc翻譯成中文叫做遠(yuǎn)程過(guò)程調(diào)用,通俗易懂點(diǎn):將單應(yīng)用架構(gòu)成分布式系統(tǒng)架構(gòu)后,多個(gè)系統(tǒng)間數(shù)據(jù)怎么交互,這就是rpc的職責(zé).
從服務(wù)的角度來(lái)看,rpc分為服務(wù)提供者(provider)和服務(wù)消費(fèi)者(consumer)兩大類(lèi),中間會(huì)有一些共用java接口,叫做開(kāi)放api接口
也就是說(shuō),接口服務(wù)實(shí)現(xiàn)類(lèi)所處的地方叫做provider,接口服務(wù)調(diào)用類(lèi)所處的地方叫consumer
因?yàn)樘幱诜植际江h(huán)境中,那consumer調(diào)用provider時(shí),如何知道對(duì)方服務(wù)器的IP和開(kāi)放端口呢?
這時(shí)需要一個(gè)組件叫做注冊(cè)中心,consumer通過(guò)服務(wù)名后,去注冊(cè)中心上查找該服務(wù)的IP+Port,拿到地址數(shù)據(jù)后,再去請(qǐng)求該地址的服務(wù)
如圖:
Cool-Rpc技術(shù)簡(jiǎn)介此項(xiàng)目基于傳輸層(TCP/IP協(xié)議)進(jìn)行通訊,傳輸層框架使用netty編寫(xiě),github上會(huì)有mina版本
提供多套序列化框架,默認(rèn)使用Protostuff序列化,可配置使用java序列化等
注冊(cè)中心默認(rèn)zookeeper,可配置使用redis(只要有節(jié)點(diǎn)數(shù)據(jù)存儲(chǔ)和消息通知功能的組件即可)
consumer通過(guò)java動(dòng)態(tài)代理的方式使用執(zhí)行遠(yuǎn)程調(diào)用
將所要執(zhí)行的類(lèi)名,方法,參數(shù)等通知provider,之后provider拿著數(shù)據(jù)調(diào)用本地實(shí)現(xiàn)類(lèi),將處理后得到的結(jié)果通知給consumer
廢話了那么多,開(kāi)始上干貨,建議大家從github克隆完整代碼,本篇博文只講重點(diǎn)代碼
注冊(cè)中心以api接口名為key,IP+Port為value,將數(shù)據(jù)持久化,以供消費(fèi)者查詢調(diào)用
以zookeeper為例:
為了更靈活地實(shí)現(xiàn)服務(wù)注冊(cè)者和發(fā)現(xiàn)者,這里添加一個(gè)注冊(cè)中心適配器
public abstract class ServiceCenterAdapter implements ServiceCenter{ String host; int port = 0; String passWord; ServiceCenterAdapter(){} ServiceCenterAdapter(String host){ this.host = host; } ServiceCenterAdapter(String host, int port) { this.host = host; this.port = port; } @Override public String discover(String serviceName) { return null; } @Override public void register(String serviceName, String serviceAddress) {} @Override public void setHost(String host){ this.host = host; }; @Override public void setPort(int port){ this.port = port; }; @Override public void setPassWord(String passWord){ this.passWord = passWord; }; //獲取 IP:端口 @Override public String getAddress(){ if ("".equals(host) || host == null || port == 0){ throw new RuntimeException("the zookeeper host or port error"); } return host+":"+String.valueOf(port); }; }
zookeeper的服務(wù)注冊(cè)(provider使用):
在實(shí)際項(xiàng)目中,需要構(gòu)造此類(lèi),并注入相應(yīng)的IP和端口,最后以bean的形式注入到IOC容器中
public class ZooKeeperServiceRegistry extends ServiceCenterAdapter { private static final Logger log = LoggerFactory.getLogger(ZooKeeperServiceRegistry.class); private ZkClient zkClient; { this.port = 2181; zkClient = new ZkClient(getAddress(), CoolConstant.ZK_SESSION_TIMEOUT, CoolConstant.ZK_CONNECTION_TIMEOUT); log.info("connect zookeeper"); } public ZooKeeperServiceRegistry(String zkHost) { super(zkHost); } public ZooKeeperServiceRegistry(String zkHost, int zkPort) { super(zkHost, zkPort); } // 注冊(cè)服務(wù) serviceName=接口名 serviceAddress=IP+Port @Override public void register(String serviceName, String serviceAddress) { // create cool node permanent String registryPath = CoolConstant.ZK_REGISTRY_PATH; if (!zkClient.exists(registryPath)) { zkClient.createPersistent(registryPath); log.info("create registry node: {}", registryPath); } // create service node permanent String servicePath = registryPath + "/" + serviceName; if (!zkClient.exists(servicePath)) { zkClient.createPersistent(servicePath); log.info("create service node: {}", servicePath); } // create service address node temp String addressPath = servicePath + "/address-"; String addressNode = zkClient.createEphemeralSequential(addressPath, serviceAddress); log.info("create address node: {}", addressNode); } }
zookeeper的服務(wù)發(fā)現(xiàn)者(consumer使用):
同上,也需要配置相應(yīng)的IP和端口,并以bean注入到項(xiàng)目ioc容器中
public class ZooKeeperServiceDiscovery extends ServiceCenterAdapter { private static final Logger log = LoggerFactory.getLogger(ZooKeeperServiceDiscovery.class); { super.port = 2181; } public ZooKeeperServiceDiscovery(){}; public ZooKeeperServiceDiscovery(String zkHost){ super(zkHost); } public ZooKeeperServiceDiscovery(String zkHost, int zkPort){ super(zkHost, zkPort); } // 服務(wù)發(fā)現(xiàn) name=api接口名 @Override public String discover(String name) { ZkClient zkClient = new ZkClient(getAddress(), CoolConstant.ZK_SESSION_TIMEOUT, CoolConstant.ZK_CONNECTION_TIMEOUT); log.debug("connect zookeeper"); try { String servicePath = CoolConstant.ZK_REGISTRY_PATH + "/" + name; if (!zkClient.exists(servicePath)) { throw new RuntimeException(String.format("can not find any service node on path: %s", servicePath)); } List服務(wù)端TCP處理器addressList = zkClient.getChildren(servicePath); if (addressList.size() == 0) { throw new RuntimeException(String.format("can not find any address node on path: %s", servicePath)); } String address; int size = addressList.size(); if (size == 1) { address = addressList.get(0); log.debug("get only address node: {}", address); } else { address = addressList.get(ThreadLocalRandom.current().nextInt(size)); log.debug("get random address node: {}", address); } String addressPath = servicePath + "/" + address; return zkClient.readData(addressPath); } finally { zkClient.close(); } } }
此篇博文的TCP數(shù)據(jù)(包括編解碼器、處理器)全部以netty編寫(xiě)
服務(wù)端的netty引導(dǎo)類(lèi):
public class CoolRpcServer implements ApplicationContextAware { private static Logger log = LoggerFactory.getLogger(CoolRpcServer.class); private Channel channel; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; private ServerBootstrap bootstrap; private HandlerInitializer handlerInitializer; private ServiceCenter serviceRegistry; private String serviceIP; private int port; public static MapservicesMap ; { bossGroup = new NioEventLoopGroup(1); workerGroup = new NioEventLoopGroup(); bootstrap = new ServerBootstrap(); handlerInitializer = new HandlerInitializer(); servicesMap = new HashMap<>(16); } public CoolRpcServer(ServiceCenter serviceRegistry, String serviceIP, int port){ this.serviceRegistry = serviceRegistry; this.serviceIP = serviceIP; this.port = port; } /** * start and init tcp server if ioc contain is booting */ @SuppressWarnings("unchecked") public void initServer() throws InterruptedException { bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(handlerInitializer); bootstrap.option(ChannelOption.SO_BACKLOG, 128); bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); // the most send bytes ( 256KB ) bootstrap.childOption(ChannelOption.SO_SNDBUF, 1024 * 256); // the most receive bytes ( 2048KB ) bootstrap.childOption(ChannelOption.SO_RCVBUF, 1024 * 1024 * 2); channel = bootstrap.bind(serviceIP,port).sync().channel(); if (servicesMap != null && servicesMap.size() > 0){ for (String beanName: servicesMap.keySet()){ serviceRegistry.register(beanName, serviceIP + ":" + String.valueOf(port)); log.info("register service name = {}", beanName); } } log.info("TCP server started successfully, port:{}", port); channel.closeFuture().sync(); } /** * close ioc contain and stop tcp server */ public void stopServer(){ if (channel != null && channel.isActive()) { channel.close(); } if (bossGroup != null) { bossGroup.shutdownGracefully(); } if (workerGroup != null) { workerGroup.shutdownGracefully(); } log.info("TCP server stopped successfully, port: {}", port); } /** * scan Annotation of CoolService */ @Override public void setApplicationContext(ApplicationContext ctx) throws BeansException { Map beans = ctx.getBeansWithAnnotation(CoolService.class); if (beans != null && beans.size()>0){ for (Object bean : beans.values()){ String name = bean.getClass().getAnnotation(CoolService.class).value().getName(); servicesMap.put(name, bean); } } } }
此項(xiàng)目的開(kāi)放api接口實(shí)現(xiàn)類(lèi)需要用@CoolService注解標(biāo)識(shí),服務(wù)端容器啟動(dòng)時(shí),會(huì)掃描所有帶有此注解的實(shí)現(xiàn)類(lèi),并注入到注冊(cè)中心
服務(wù)端處理器(netty handler):
@ChannelHandler.Sharable public class CoolServerHandler extends ChannelInboundHandlerAdapter { private static Logger log = LoggerFactory.getLogger(CoolServerHandler.class); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { CoolResponse response = new CoolResponse(); CoolRequest request = (CoolRequest) msg; try { Object result = invoke(request); response.setRequestID(request.getRequestID()); response.setResult(result); } catch (Throwable error) { response.setError(error); } ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } private Object invoke(CoolRequest request) throws Throwable{ if (request == null){ throw new Throwable("cool rpc request not found"); } String className = request.getClassName(); String methodName = request.getMethodName(); Object[] parameters = request.getParameters(); Object service = CoolRpcServer.servicesMap.get(className); if (service == null){ throw new Throwable("cool rpc service not exist"); } Class> serviceClass = service.getClass(); Class>[] parameterTypes = request.getParameterTypes(); FastClass fastClass = FastClass.create(serviceClass); FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes); return fastMethod.invoke(service, parameters); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.error("server caught exception", cause); ctx.close(); } }
將客戶端傳輸過(guò)來(lái)的請(qǐng)求數(shù)據(jù)(類(lèi)名,方法,參數(shù))在本地以cglib的方式反射調(diào)用
調(diào)用成功后,將處理完畢的結(jié)果編碼返回給客戶端,并且關(guān)閉TCP連接
consumer只有api接口,并沒(méi)有其實(shí)現(xiàn)類(lèi),所以我們可以用java動(dòng)態(tài)代理的方式去自定義方法實(shí)現(xiàn),代理的方法實(shí)現(xiàn)便是建立TCP握手連接,有provider來(lái)執(zhí)行方法,將得到的結(jié)果返回給代理類(lèi),由此造成一種單憑接口就能調(diào)用實(shí)現(xiàn)類(lèi)方法的假象
第一步: 使用java動(dòng)態(tài)代理new出代理對(duì)象
public class CoolProxy { private static Logger log = LoggerFactory.getLogger(CoolProxy.class); private ServiceCenter serviceDiscovery; public CoolProxy(ServiceCenter serviceDiscovery){ this.serviceDiscovery = serviceDiscovery; } @SuppressWarnings("unchecked") publicT getInstance(Class cls){ return (T)Proxy.newProxyInstance(cls.getClassLoader(), new Class>[]{cls}, (proxy, method, args) -> { CoolRequest request = new CoolRequest(); request.setRequestID(UUID.randomUUID().toString()); request.setClassName(method.getDeclaringClass().getName()); request.setMethodName(method.getName()); request.setParameters(args); request.setParameterTypes(method.getParameterTypes()); String[] addr = serviceDiscovery.discover(cls.getName()).split(":",2); CoolRpcClient client = new CoolRpcClient(addr[0], Integer.parseInt(addr[1])); CoolResponse response = client.send(request); if (response.getError()!=null){ throw response.getError(); } else { return response.getResult(); } }); } }
第二步: 在代理方法中,使用遠(yuǎn)程過(guò)程調(diào)用(rpc)
客戶端引導(dǎo)類(lèi):
public class CoolRpcClient { private static Logger log = LoggerFactory.getLogger(CoolRpcClient.class); private CountDownLatch countDownLatch; private EventLoopGroup group; private Bootstrap bootstrap; private CoolResponse response; private String serviceIP; private int port; { response = new CoolResponse(); countDownLatch = new CountDownLatch(1); group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); } public CoolRpcClient(String serviceIP, int port){ this.serviceIP = serviceIP; this.port = port; } public CoolResponse send(CoolRequest request){ try { bootstrap.group(group); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new CoolRpcDecoder(CoolResponse.class)) .addLast(new CoolRpcEncoder(CoolRequest.class)) .addLast(new CoolClientHandler(countDownLatch, response)); } }); bootstrap.option(ChannelOption.TCP_NODELAY, true); Channel channel = bootstrap.connect(serviceIP, port).sync().channel(); channel.writeAndFlush(request).sync(); countDownLatch.await(); channel.closeFuture().sync(); return response; } catch (Exception e){ e.printStackTrace(); return null; } finally { group.shutdownGracefully(); } } }
客戶端處理器(handler):
@ChannelHandler.Sharable public class CoolClientHandler extends ChannelInboundHandlerAdapter { private static Logger log = LoggerFactory.getLogger(CoolClientHandler.class); private CountDownLatch latch; private CoolResponse response; public CoolClientHandler(CountDownLatch latch, CoolResponse response){ this.latch = latch; this.response = response; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { CoolResponse enResponse = (CoolResponse) msg; this.response.sync(enResponse); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { latch.countDown(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("api caught exception", cause); ctx.close(); } }
最后使用CountDownLatch同步通知調(diào)用者,rpc調(diào)用完畢
結(jié)束語(yǔ)以上便是Cool-Rpc的簡(jiǎn)單講解,如有更好的想法請(qǐng)聯(lián)系我
熱烈歡迎大家一起維護(hù)此項(xiàng)目Cool-RPC
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/76893.html
摘要:在文章微服務(wù)調(diào)用鏈追蹤中心搭建一文中模擬出來(lái)的調(diào)用鏈就是一個(gè)遠(yuǎn)程調(diào)用的例子,只不過(guò)這篇文章里是通過(guò)這種同步調(diào)用方式,利用的是協(xié)議在應(yīng)用層完成的,這種方法雖然奏效,但有時(shí)效率并不高。 showImg(https://segmentfault.com/img/remote/1460000014858219); 一、概述 RPC(Remote Procedure Call)即 遠(yuǎn)程過(guò)程調(diào)...
摘要:在文章微服務(wù)調(diào)用鏈追蹤中心搭建一文中模擬出來(lái)的調(diào)用鏈就是一個(gè)遠(yuǎn)程調(diào)用的例子,只不過(guò)這篇文章里是通過(guò)這種同步調(diào)用方式,利用的是協(xié)議在應(yīng)用層完成的,這種方法雖然奏效,但有時(shí)效率并不高。 showImg(https://segmentfault.com/img/remote/1460000014858219); 一、概述 RPC(Remote Procedure Call)即 遠(yuǎn)程過(guò)程調(diào)...
摘要:與文章框架實(shí)踐之一文中實(shí)踐的另一種通用框架能通過(guò)自動(dòng)生成對(duì)應(yīng)語(yǔ)言的接口類(lèi)似,也能自動(dòng)地生成和的存根,我們只需要一個(gè)命令就能快速搭建起運(yùn)行環(huán)境。類(lèi)似于之前對(duì)于框架的實(shí)踐步驟,下面一一闡述。 showImg(https://segmentfault.com/img/remote/1460000014946557); 概述 gRPC是Google開(kāi)源的通用高性能RPC框架,它支持的是使用P...
摘要:與文章框架實(shí)踐之一文中實(shí)踐的另一種通用框架能通過(guò)自動(dòng)生成對(duì)應(yīng)語(yǔ)言的接口類(lèi)似,也能自動(dòng)地生成和的存根,我們只需要一個(gè)命令就能快速搭建起運(yùn)行環(huán)境。類(lèi)似于之前對(duì)于框架的實(shí)踐步驟,下面一一闡述。 showImg(https://segmentfault.com/img/remote/1460000014946557); 概述 gRPC是Google開(kāi)源的通用高性能RPC框架,它支持的是使用P...
Github 地址:https://github.com/Snailclimb/springboot-integration-examples ,歡迎各位 Star。 目錄: 使用 SpringBoot+Dubbo 搭建一個(gè)簡(jiǎn)單分布式服務(wù) 實(shí)戰(zhàn)之前,先來(lái)看幾個(gè)重要的概念 什么是分布式? 什么是 Duboo? Dubbo 架構(gòu) 什么是 RPC? 為什么要用 Dubbo? 開(kāi)始實(shí)戰(zhàn) 1 ...
閱讀 2771·2021-10-11 11:08
閱讀 1489·2021-09-30 09:48
閱讀 1049·2021-09-22 15:29
閱讀 1037·2019-08-30 15:54
閱讀 976·2019-08-29 15:19
閱讀 527·2019-08-29 13:12
閱讀 3161·2019-08-26 13:53
閱讀 958·2019-08-26 13:28