摘要:是一個(gè)分布式服務(wù)框架,以及治理方案。手寫注意要點(diǎn)手寫注意要點(diǎn)基于上文中對于協(xié)議的理解,如果我們自己去實(shí)現(xiàn),需要考慮哪些技術(shù)呢其實(shí)基于圖的整個(gè)流程應(yīng)該有一個(gè)大概的理解?;谑謱憣?shí)現(xiàn)基于手寫實(shí)現(xiàn)理解了協(xié)議后,我們基于來實(shí)現(xiàn)一個(gè)通信框架。
閱讀這篇文章之前,建議先閱讀和這篇文章關(guān)聯(lián)的內(nèi)容。
[1]詳細(xì)剖析分布式微服務(wù)架構(gòu)下網(wǎng)絡(luò)通信的底層實(shí)現(xiàn)原理(圖解)
[2][年薪60W的技巧]工作了5年,你真的理解Netty以及為什么要用嗎?(深度干貨)
[3]深度解析Netty中的核心組件(圖解+實(shí)例)
[4]BAT面試必問細(xì)節(jié):關(guān)于Netty中的ByteBuf詳解
[5]通過大量實(shí)戰(zhàn)案例分解Netty中是如何解決拆包黏包問題的?
[6]基于Netty實(shí)現(xiàn)自定義消息通信協(xié)議(協(xié)議設(shè)計(jì)及解析應(yīng)用實(shí)戰(zhàn))
[7]全網(wǎng)最詳細(xì)最齊全的序列化技術(shù)及深度解析與應(yīng)用實(shí)戰(zhàn)
在前面的內(nèi)容中,我們已經(jīng)由淺入深的理解了Netty的基礎(chǔ)知識和實(shí)現(xiàn)原理,相信大家已經(jīng)對Netty有了一個(gè)較為全面的理解。那么接下來,我們通過一個(gè)手寫RPC通信的實(shí)戰(zhàn)案例來帶大家了解Netty的實(shí)際應(yīng)用。
為什么要選擇RPC來作為實(shí)戰(zhàn)呢?因?yàn)镹etty本身就是解決通信問題,而在實(shí)際應(yīng)用中,RPC協(xié)議框架是我們接觸得最多的一種,所以這個(gè)實(shí)戰(zhàn)能讓大家了解到Netty實(shí)際應(yīng)用之外,還能理解RPC的底層原理。
RPC全稱為(Remote Procedure Call),是一種通過網(wǎng)絡(luò)從遠(yuǎn)程計(jì)算機(jī)程序上請求服務(wù),而不需要了解底層網(wǎng)絡(luò)技術(shù)的協(xié)議,簡單理解就是讓開發(fā)者能夠像調(diào)用本地服務(wù)一樣調(diào)用遠(yuǎn)程服務(wù)。
既然是協(xié)議,那么它必然有協(xié)議的規(guī)范,如圖6-1所示。
為了達(dá)到“讓開發(fā)者能夠像調(diào)用本地服務(wù)那樣調(diào)用遠(yuǎn)程服務(wù)”的目的,RPC協(xié)議需像圖6-1那樣實(shí)現(xiàn)遠(yuǎn)程交互。
凡是滿足RPC協(xié)議的框架,我們成為RPC框架,在實(shí)際開發(fā)中,我們可以使用開源且相對成熟的RPC框架解決微服務(wù)架構(gòu)下的遠(yuǎn)程通信問題,常見的rpc框架:
基于上文中對于RPC協(xié)議的理解,如果我們自己去實(shí)現(xiàn),需要考慮哪些技術(shù)呢? 其實(shí)基于圖6-1的整個(gè)流程應(yīng)該有一個(gè)大概的理解。
理解了RPC協(xié)議后,我們基于Netty來實(shí)現(xiàn)一個(gè)RPC通信框架。
代碼詳見附件 netty-rpc-example
需要引入的jar包:
org.springframework.boot spring-boot-starter org.projectlombok lombok com.alibaba fastjson 1.2.72 io.netty netty-all
模塊依賴關(guān)系:
provider依賴 netty-rpc-protocol和netty-rpc-api
cosumer依賴 netty-rpc-protocol和netty-rpc-api
public interface IUserService { String saveUser(String name);}
@Service@Slf4jpublic class UserServiceImpl implements IUserService { @Override public String saveUser(String name) { log.info("begin saveUser:"+name); return "Save User Success!"; }}
注意,在當(dāng)前步驟中,描述了case的部分,暫時(shí)先不用加,后續(xù)再加上
@ComponentScan(basePackages = {"com.example.spring","com.example.service"}) //case1(后續(xù)再加上)@SpringBootApplicationpublic class NettyRpcProviderMain { public static void main(String[] args) throws Exception { SpringApplication.run(NettyRpcProviderMain.class, args); new NettyServer("127.0.0.1",8080).startNettyServer(); //case2(后續(xù)再加上) }}
開始寫通信協(xié)議模塊,這個(gè)模塊主要做幾個(gè)事情
之前我們講過自定義消息協(xié)議,我們在這里可以按照下面這個(gè)協(xié)議格式來定義好。
/* +----------------------------------------------+ | 魔數(shù) 2byte | 序列化算法 1byte | 請求類型 1byte | +----------------------------------------------+ | 消息 ID 8byte | 數(shù)據(jù)長度 4byte | +----------------------------------------------+ */
@AllArgsConstructor@Datapublic class Header implements Serializable { /* +----------------------------------------------+ | 魔數(shù) 2byte | 序列化算法 1byte | 請求類型 1byte | +----------------------------------------------+ | 消息 ID 8byte | 數(shù)據(jù)長度 4byte | +----------------------------------------------+ */ private short magic; //魔數(shù)-用來驗(yàn)證報(bào)文的身份(2個(gè)字節(jié)) private byte serialType; //序列化類型(1個(gè)字節(jié)) private byte reqType; //操作類型(1個(gè)字節(jié)) private long requestId; //請求id(8個(gè)字節(jié)) private int length; //數(shù)據(jù)長度(4個(gè)字節(jié))}
@Datapublic class RpcRequest implements Serializable { private String className; private String methodName; private Object[] params; private Class>[] parameterTypes;}
@Datapublic class RpcResponse implements Serializable { private Object data; private String msg;}
@Datapublic class RpcProtocol implements Serializable { private Header header; private T content;}
上述消息協(xié)議定義中,涉及到幾個(gè)枚舉相關(guān)的類,定義如下
消息類型
public enum ReqType { REQUEST((byte)1), RESPONSE((byte)2), HEARTBEAT((byte)3); private byte code; private ReqType(byte code) { this.code=code; } public byte code(){ return this.code; } public static ReqType findByCode(int code) { for (ReqType msgType : ReqType.values()) { if (msgType.code() == code) { return msgType; } } return null; }}
序列化類型
public enum SerialType { JSON_SERIAL((byte)0), JAVA_SERIAL((byte)1); private byte code; SerialType(byte code) { this.code=code; } public byte code(){ return this.code; }}
public class RpcConstant { //header部分的總字節(jié)數(shù) public final static int HEAD_TOTAL_LEN=16; //魔數(shù) public final static short MAGIC=0xca;}
這里演示兩種,一種是JSON方式,另一種是Java原生的方式
public interface ISerializer { byte[] serialize(T obj); T deserialize(byte[] data,Class clazz); byte getType();}
public class JavaSerializer implements ISerializer{ @Override public byte[] serialize(T obj) { ByteArrayOutputStream byteArrayOutputStream= new ByteArrayOutputStream(); try { ObjectOutputStream outputStream= new ObjectOutputStream(byteArrayOutputStream); outputStream.writeObject(obj); return byteArrayOutputStream.toByteArray(); } catch (IOException e) { e.printStackTrace(); } return new byte[0]; } @Override public T deserialize(byte[] data, Class clazz) { ByteArrayInputStream byteArrayInputStream=new ByteArrayInputStream(data); try { ObjectInputStream objectInputStream= new ObjectInputStream(byteArrayInputStream); return (T) objectInputStream.readObject(); } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } return null; } @Override public byte getType() { return SerialType.JAVA_SERIAL.code(); }}
public class JsonSerializer implements ISerializer{ @Override public byte[] serialize(T obj) { return JSON.toJSONString(obj).getBytes(); } @Override public T deserialize(byte[] data, Class clazz) { return JSON.parseObject(new String(data),clazz); } @Override public byte getType() { return SerialType.JSON_SERIAL.code(); }}
實(shí)現(xiàn)對序列化機(jī)制的管理
public class SerializerManager { private final static ConcurrentHashMap serializers=new ConcurrentHashMap(); static { ISerializer jsonSerializer=new JsonSerializer(); ISerializer javaSerializer=new JavaSerializer(); serializers.put(jsonSerializer.getType(),jsonSerializer); serializers.put(javaSerializer.getType(),javaSerializer); } public static ISerializer getSerializer(byte key){ ISerializer serializer=serializers.get(key); if(serializer==null){ return new JavaSerializer(); } return serializer; }}
由于自定義了消息協(xié)議,所以 需要自己實(shí)現(xiàn)編碼和解碼,代碼如下
@Slf4jpublic class RpcDecoder extends ByteToMessageDecoder { /* +----------------------------------------------+ | 魔數(shù) 2byte | 序列化算法 1byte | 請求類型 1byte | +----------------------------------------------+ | 消息 ID 8byte | 數(shù)據(jù)長度 4byte | +----------------------------------------------+ */ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List
@Slf4jpublic class RpcEncoder extends MessageToByteEncoder> { /* +----------------------------------------------+ | 魔數(shù) 2byte | 序列化算法 1byte | 請求類型 1byte | +----------------------------------------------+ | 消息 ID 8byte | 數(shù)據(jù)長度 4byte | +----------------------------------------------+ */ @Override protected void encode(ChannelHandlerContext ctx, RpcProtocol
實(shí)現(xiàn)NettyServer構(gòu)建。
@Slf4jpublic class NettyServer{ private String serverAddress; //地址 private int serverPort; //端口 public NettyServer(String serverAddress, int serverPort) { this.serverAddress = serverAddress; this.serverPort = serverPort; } public void startNettyServer() throws Exception { log.info("begin start Netty Server"); EventLoopGroup bossGroup=new NioEventLoopGroup(); EventLoopGroup workGroup=new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new RpcServerInitializer()); ChannelFuture channelFuture = bootstrap.bind(this.serverAddress, this.serverPort).sync(); log.info("Server started Success on Port:{}", this.serverPort); channelFuture.channel().closeFuture().sync(); }catch (Exception e){ log.error("Rpc Server Exception",e); }finally { workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } }}
public class RpcServerInitializer extends ChannelInitializer { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,12,4,0,0)) .addLast(new RpcDecoder()) .addLast(new RpcEncoder()) .addLast(new RpcServerHandler()); }}
public class RpcServerHandler extends SimpleChannelInboundHandler> { @Override protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol msg) throws Exception { RpcProtocol resProtocol=new RpcProtocol<>(); Header header=msg.getHeader(); header.setReqType(ReqType.RESPONSE.code()); Object result=invoke(msg.getContent()); resProtocol.setHeader(header); RpcResponse response=new RpcResponse(); response.setData(result); response.setMsg("success"); resProtocol.setContent(response); ctx.writeAndFlush(resProtocol); } private Object invoke(RpcRequest request){ try { Class> clazz=Class.forName(request.getClassName()); Object bean= SpringBeansManager.getBean(clazz); //獲取實(shí)例對象(CASE) Method declaredMethod=clazz.getDeclaredMethod(request.getMethodName(),request.getParameterTypes()); return declaredMethod.invoke(bean,request.getParams()); } catch (ClassNotFoundException | NoSuchMethodException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } return null; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); }}
@Componentpublic class SpringBeansManager implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { SpringBeansManager.applicationContext=applicationContext; } public static T getBean(Class clazz){ return applicationContext.getBean(clazz); }}
需要注意,這個(gè)類的構(gòu)建好之后,需要在netty-rpc-provider模塊的main方法中增加compone-scan進(jìn)行掃描
@ComponentScan(basePackages = {"com.example.spring","com.example.service"}) //修改這里@SpringBootApplicationpublic class NettyRpcProviderMain { public static void main(String[] args) throws Exception { SpringApplication.run(NettyRpcProviderMain.class, args); new NettyServer("127.0.0.1",8080).startNettyServer(); // 修改這里 }}
接下來開始實(shí)現(xiàn)消費(fèi)端
public class RpcClientProxy { public T clientProxy(final Class interfaceCls,final String host,final int port){ return (T) Proxy.newProxyInstance (interfaceCls.getClassLoader(), new Class>[]{interfaceCls}, new RpcInvokerProxy(host,port)); }}
@Slf4jpublic class RpcInvokerProxy implements InvocationHandler { private String serviceAddress; private int servicePort; public RpcInvokerProxy(String serviceAddress, int servicePort) { this.serviceAddress = serviceAddress; this.servicePort = servicePort; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { log.info("begin invoke target server"); //組裝參數(shù) RpcProtocol protocol=new RpcProtocol<>(); long requestId= RequestHolder.REQUEST_ID.incrementAndGet(); Header header=new Header(RpcConstant.MAGIC, SerialType.JSON_SERIAL.code(), ReqType.REQUEST.code(),requestId,0); protocol.setHeader(header); RpcRequest request=new RpcRequest(); request.setClassName(method.getDeclaringClass().getName()); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); request.setParams(args); protocol.setContent(request); //發(fā)送請求 NettyClient nettyClient=new NettyClient(serviceAddress,servicePort); //構(gòu)建異步數(shù)據(jù)處理 RpcFuture future=new RpcFuture<>(new DefaultPromise<>(new DefaultEventLoop())); RequestHolder.REQUEST_MAP.put(requestId,future); nettyClient.sendRequest(protocol); return future.getPromise().get().getData(); }}
在netty-rpc-protocol這個(gè)模塊的protocol包路徑下,創(chuàng)建NettyClient
@Slf4jpublic class NettyClient { private final Bootstrap bootstrap; private final EventLoopGroup eventLoopGroup=new NioEventLoopGroup(); private String serviceAddress; private int servicePort; public NettyClient(String serviceAddress,int servicePort){ log.info("begin init NettyClient"); bootstrap=new Bootstrap(); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .handler(new RpcClientInitializer()); this.serviceAddress=serviceAddress; this.servicePort=servicePort; } public void sendRequest(RpcProtocol protocol) throws InterruptedException { ChannelFuture future=bootstrap.connect(this.serviceAddress,this.servicePort).sync(); future.addListener(listener->{ if(future.isSuccess()){ log.info("connect rpc server {} success.",this.serviceAddress); }else{ log.error("connect rpc server {} failed .",this.serviceAddress); future.cause().printStackTrace(); eventLoopGroup.shutdownGracefully(); } }); log.info("begin transfer data"); future.channel().writeAndFlush(protocol); }}
@Slf4jpublic class RpcClientInitializer extends ChannelInitializer { @Override protected void initChannel(SocketChannel ch) throws Exception { log.info("begin initChannel"); ch.pipeline() .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,12,4,0,0)) .addLast(new LoggingHandler()) .addLast(new RpcEncoder()) .addLast(new RpcDecoder()) .addLast(new RpcClientHandler()); }}
需要注意,Netty的通信過程是基于入站出站分離的,所以在獲取結(jié)果時(shí),我們需要借助一個(gè)Future對象來完成。
@Slf4jpublic class RpcClientHandler extends SimpleChannelInboundHandler> { @Override protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol msg) throws Exception { log.info("receive rpc server result"); long requestId=msg.getHeader().getRequestId(); RpcFuture future=RequestHolder.REQUEST_MAP.remove(requestId); future.getPromise().setSuccess(msg.getContent()); //返回結(jié)果 }}
在netty-rpc-protocol模塊中添加rpcFuture實(shí)現(xiàn)
@Datapublic class RpcFuture { //Promise是可寫的 Future, Future自身并沒有寫操作相關(guān)的接口, // Netty通過 Promise對 Future進(jìn)行擴(kuò)展,用于設(shè)置IO操作的結(jié)果 private Promise promise; public RpcFuture(Promise promise) { this.promise = promise; }}
保存requestid和future的對應(yīng)結(jié)果
public class RequestHolder { public static final AtomicLong REQUEST_ID=new AtomicLong(); public static final Map REQUEST_MAP=new ConcurrentHashMap<>();}
需要源碼的同學(xué),請關(guān)注公眾號[跟著Mic學(xué)架構(gòu)],回復(fù)關(guān)鍵字[rpc],即可獲得
版權(quán)聲明:本博客所有文章除特別聲明外,均采用 CC BY-NC-SA 4.0 許可協(xié)議。轉(zhuǎn)載請注明來自
Mic帶你學(xué)架構(gòu)
!
如果本篇文章對您有幫助,還請幫忙點(diǎn)個(gè)關(guān)注和贊,您的堅(jiān)持是我不斷創(chuàng)作的動(dòng)力。歡迎關(guān)注「跟著Mic學(xué)架構(gòu)」公眾號公眾號獲取更多技術(shù)干貨!
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/123637.html
摘要:簡單來說就是把注冊的動(dòng)作異步化,當(dāng)異步執(zhí)行結(jié)束后會(huì)把執(zhí)行結(jié)果回填到中抽象類一般就是公共邏輯的處理,而這里的處理主要就是針對一些參數(shù)的判斷,判斷完了之后再調(diào)用方法。 閱讀這篇文章之前,建議先閱讀和這篇文章關(guān)聯(lián)的內(nèi)容。 1. 詳細(xì)剖析分布式微服務(wù)架構(gòu)下網(wǎng)絡(luò)通信的底層實(shí)現(xiàn)原理(圖解) 2. (年薪60W的技巧)工作了5年,你真的理解Netty以及為什么要用嗎?(深度干貨)...
摘要:英文全名為,也叫遠(yuǎn)程過程調(diào)用,其實(shí)就是一個(gè)計(jì)算機(jī)通信協(xié)議,它是一種通過網(wǎng)絡(luò)從遠(yuǎn)程計(jì)算機(jī)程序上請求服務(wù)而不需要了解底層網(wǎng)絡(luò)技術(shù)的協(xié)議。 Hello,Dubbo 你好,dubbo,初次見面,我想和你交個(gè)朋友。 Dubbo你到底是什么? 先給出一套官方的說法:Apache Dubbo是一款高性能、輕量級基于Java的RPC開源框架。 那么什么是RPC? 文檔地址:http://dubbo.a...
摘要:前言此博客所述項(xiàng)目代碼已在開源歡迎大家一起貢獻(xiàn)點(diǎn)此進(jìn)入最近一次寫博客還是年底謝謝大家持久以來的關(guān)注本篇博文將會(huì)教大家如何從到搭建一個(gè)簡單高效且拓展性強(qiáng)的框架什么是相信大家都或多或少使用過框架比如阿里的谷歌的的等等那么究竟什么是翻譯成中文 Cool-Rpc 前言 此博客所述項(xiàng)目代碼已在github開源,歡迎大家一起貢獻(xiàn)! 點(diǎn)此進(jìn)入:Cool-RPC 最近一次寫博客還是17年底,謝謝大家...
摘要:從使用到原理學(xué)習(xí)線程池關(guān)于線程池的使用,及原理分析分析角度新穎面向切面編程的基本用法基于注解的實(shí)現(xiàn)在軟件開發(fā)中,分散于應(yīng)用中多出的功能被稱為橫切關(guān)注點(diǎn)如事務(wù)安全緩存等。 Java 程序媛手把手教你設(shè)計(jì)模式中的撩妹神技 -- 上篇 遇一人白首,擇一城終老,是多么美好的人生境界,她和他歷經(jīng)風(fēng)雨慢慢變老,回首走過的點(diǎn)點(diǎn)滴滴,依然清楚的記得當(dāng)初愛情萌芽的模樣…… Java 進(jìn)階面試問題列表 -...
閱讀 724·2023-04-25 19:43
閱讀 3921·2021-11-30 14:52
閱讀 3794·2021-11-30 14:52
閱讀 3859·2021-11-29 11:00
閱讀 3790·2021-11-29 11:00
閱讀 3882·2021-11-29 11:00
閱讀 3562·2021-11-29 11:00
閱讀 6138·2021-11-29 11:00