国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專(zhuān)欄INFORMATION COLUMN

從0到1搭建RPC框架

stdying / 3409人閱讀

摘要:前言此博客所述項(xiàng)目代碼已在開(kāi)源歡迎大家一起貢獻(xiàn)點(diǎn)此進(jìn)入最近一次寫(xiě)博客還是年底謝謝大家持久以來(lái)的關(guān)注本篇博文將會(huì)教大家如何從到搭建一個(gè)簡(jiǎn)單高效且拓展性強(qiáng)的框架什么是相信大家都或多或少使用過(guò)框架比如阿里的谷歌的的等等那么究竟什么是翻譯成中文

Cool-Rpc 前言

此博客所述項(xiàng)目代碼已在github開(kāi)源,歡迎大家一起貢獻(xiàn)!
點(diǎn)此進(jìn)入:Cool-RPC

最近一次寫(xiě)博客還是17年底,謝謝大家持久以來(lái)的關(guān)注
本篇博文將會(huì)教大家如何從0到1,搭建一個(gè)簡(jiǎn)單、高效且拓展性強(qiáng)的rpc框架.

什么是RPC

相信大家都或多或少使用過(guò)RPC框架,比如阿里的Dubbo、谷歌的grpc、Facebook的Thrift等等

那么究竟什么是rpc?
rpc翻譯成中文叫做遠(yuǎn)程過(guò)程調(diào)用,通俗易懂點(diǎn):將單應(yīng)用架構(gòu)成分布式系統(tǒng)架構(gòu)后,多個(gè)系統(tǒng)間數(shù)據(jù)怎么交互,這就是rpc的職責(zé).

從服務(wù)的角度來(lái)看,rpc分為服務(wù)提供者(provider)和服務(wù)消費(fèi)者(consumer)兩大類(lèi),中間會(huì)有一些共用java接口,叫做開(kāi)放api接口
也就是說(shuō),接口服務(wù)實(shí)現(xiàn)類(lèi)所處的地方叫做provider,接口服務(wù)調(diào)用類(lèi)所處的地方叫consumer

因?yàn)樘幱诜植际江h(huán)境中,那consumer調(diào)用provider時(shí),如何知道對(duì)方服務(wù)器的IP和開(kāi)放端口呢?
這時(shí)需要一個(gè)組件叫做注冊(cè)中心,consumer通過(guò)服務(wù)名后,去注冊(cè)中心上查找該服務(wù)的IP+Port,拿到地址數(shù)據(jù)后,再去請(qǐng)求該地址的服務(wù)

如圖:

Cool-Rpc技術(shù)簡(jiǎn)介

此項(xiàng)目基于傳輸層(TCP/IP協(xié)議)進(jìn)行通訊,傳輸層框架使用netty編寫(xiě),github上會(huì)有mina版本
提供多套序列化框架,默認(rèn)使用Protostuff序列化,可配置使用java序列化等
注冊(cè)中心默認(rèn)zookeeper,可配置使用redis(只要有節(jié)點(diǎn)數(shù)據(jù)存儲(chǔ)和消息通知功能的組件即可)

consumer通過(guò)java動(dòng)態(tài)代理的方式使用執(zhí)行遠(yuǎn)程調(diào)用
將所要執(zhí)行的類(lèi)名,方法,參數(shù)等通知provider,之后provider拿著數(shù)據(jù)調(diào)用本地實(shí)現(xiàn)類(lèi),將處理后得到的結(jié)果通知給consumer

注冊(cè)中心

廢話了那么多,開(kāi)始上干貨,建議大家從github克隆完整代碼,本篇博文只講重點(diǎn)代碼

注冊(cè)中心以api接口名為key,IP+Port為value,將數(shù)據(jù)持久化,以供消費(fèi)者查詢調(diào)用

以zookeeper為例:

為了更靈活地實(shí)現(xiàn)服務(wù)注冊(cè)者和發(fā)現(xiàn)者,這里添加一個(gè)注冊(cè)中心適配器

public abstract class ServiceCenterAdapter implements ServiceCenter{

    String host;
    int port = 0;
    String passWord;

    ServiceCenterAdapter(){}

    ServiceCenterAdapter(String host){
        this.host = host;
    }

    ServiceCenterAdapter(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public String discover(String serviceName) {
        return null;
    }

    @Override
    public void register(String serviceName, String serviceAddress) {}

    @Override
    public void setHost(String host){
        this.host = host;
    };

    @Override
    public void setPort(int port){
        this.port = port;
    };

    @Override
    public void setPassWord(String passWord){
        this.passWord = passWord;
    };
    //獲取 IP:端口
    @Override
    public String getAddress(){
        if ("".equals(host) || host == null || port == 0){
            throw new RuntimeException("the zookeeper host or port error");
        }
        return host+":"+String.valueOf(port);
    };
}

zookeeper的服務(wù)注冊(cè)(provider使用):
在實(shí)際項(xiàng)目中,需要構(gòu)造此類(lèi),并注入相應(yīng)的IP和端口,最后以bean的形式注入到IOC容器中

public class ZooKeeperServiceRegistry extends ServiceCenterAdapter {

    private static final Logger log = LoggerFactory.getLogger(ZooKeeperServiceRegistry.class);

    private ZkClient zkClient;

    {
        this.port = 2181;
        zkClient = new ZkClient(getAddress(), CoolConstant.ZK_SESSION_TIMEOUT, CoolConstant.ZK_CONNECTION_TIMEOUT);
        log.info("connect zookeeper");
    }

    public ZooKeeperServiceRegistry(String zkHost) {
        super(zkHost);
    }

    public ZooKeeperServiceRegistry(String zkHost, int zkPort) {
        super(zkHost, zkPort);
    }

    // 注冊(cè)服務(wù) serviceName=接口名  serviceAddress=IP+Port
    @Override
    public void register(String serviceName, String serviceAddress) {
        // create cool node permanent
        String registryPath = CoolConstant.ZK_REGISTRY_PATH;
        if (!zkClient.exists(registryPath)) {
            zkClient.createPersistent(registryPath);
            log.info("create registry node: {}", registryPath);
        }
        // create service node permanent
        String servicePath = registryPath + "/" + serviceName;
        if (!zkClient.exists(servicePath)) {
            zkClient.createPersistent(servicePath);
            log.info("create service node: {}", servicePath);
        }
        // create service address node temp
        String addressPath = servicePath + "/address-";
        String addressNode = zkClient.createEphemeralSequential(addressPath, serviceAddress);
        log.info("create address node: {}", addressNode);
    }

}

zookeeper的服務(wù)發(fā)現(xiàn)者(consumer使用):
同上,也需要配置相應(yīng)的IP和端口,并以bean注入到項(xiàng)目ioc容器中

public class ZooKeeperServiceDiscovery extends ServiceCenterAdapter {

    private static final Logger log = LoggerFactory.getLogger(ZooKeeperServiceDiscovery.class);

    {
        super.port = 2181;
    }

    public ZooKeeperServiceDiscovery(){};

    public ZooKeeperServiceDiscovery(String zkHost){
        super(zkHost);
    }

    public ZooKeeperServiceDiscovery(String zkHost, int zkPort){
        super(zkHost, zkPort);
    }

    // 服務(wù)發(fā)現(xiàn)    name=api接口名
    @Override
    public String discover(String name) {

        ZkClient zkClient = new ZkClient(getAddress(), CoolConstant.ZK_SESSION_TIMEOUT, CoolConstant.ZK_CONNECTION_TIMEOUT);
        log.debug("connect zookeeper");
        try {
            String servicePath = CoolConstant.ZK_REGISTRY_PATH + "/" + name;
            if (!zkClient.exists(servicePath)) {
                throw new RuntimeException(String.format("can not find any service node on path: %s", servicePath));
            }
            List addressList = zkClient.getChildren(servicePath);
            if (addressList.size() == 0) {
                throw new RuntimeException(String.format("can not find any address node on path: %s", servicePath));
            }
            String address;
            int size = addressList.size();
            if (size == 1) {
                address = addressList.get(0);
                log.debug("get only address node: {}", address);
            } else {
                address = addressList.get(ThreadLocalRandom.current().nextInt(size));
                log.debug("get random address node: {}", address);
            }
            String addressPath = servicePath + "/" + address;
            return zkClient.readData(addressPath);
        } finally {
            zkClient.close();
        }
    }

}
服務(wù)端TCP處理器

此篇博文的TCP數(shù)據(jù)(包括編解碼器、處理器)全部以netty編寫(xiě)

服務(wù)端的netty引導(dǎo)類(lèi):

public class CoolRpcServer implements ApplicationContextAware {

    private static Logger log = LoggerFactory.getLogger(CoolRpcServer.class);
    private Channel channel;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private ServerBootstrap bootstrap;
    private HandlerInitializer handlerInitializer;
    private ServiceCenter serviceRegistry;
    private String serviceIP;
    private int port;
    public static Map servicesMap ;

    {
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup();
        bootstrap = new ServerBootstrap();
        handlerInitializer = new HandlerInitializer();
        servicesMap = new HashMap<>(16);
    }

    public CoolRpcServer(ServiceCenter serviceRegistry, String serviceIP, int port){
        this.serviceRegistry = serviceRegistry;
        this.serviceIP = serviceIP;
        this.port = port;

    }

    /**
     * start and init tcp server if ioc contain is booting
     */
    @SuppressWarnings("unchecked")
    public void initServer() throws InterruptedException {

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(handlerInitializer);

        bootstrap.option(ChannelOption.SO_BACKLOG, 128);
        bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        // the most send bytes ( 256KB )
        bootstrap.childOption(ChannelOption.SO_SNDBUF, 1024 * 256);
        // the most receive bytes ( 2048KB )
        bootstrap.childOption(ChannelOption.SO_RCVBUF, 1024 * 1024 * 2);

        channel = bootstrap.bind(serviceIP,port).sync().channel();

        if (servicesMap != null && servicesMap.size() > 0){
            for (String beanName: servicesMap.keySet()){
                serviceRegistry.register(beanName, serviceIP + ":" + String.valueOf(port));
                log.info("register service name = {}", beanName);
            }
        }
        log.info("TCP server started successfully, port:{}", port);

        channel.closeFuture().sync();
    }


    /**
     * close ioc contain and stop tcp server
     */
    public void stopServer(){

        if (channel != null && channel.isActive()) {
            channel.close();
        }
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }

        log.info("TCP server stopped successfully, port: {}", port);
    }

    /**
     *  scan Annotation of CoolService
     */
    @Override
    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        Map beans = ctx.getBeansWithAnnotation(CoolService.class);
        if (beans != null && beans.size()>0){
            for (Object bean : beans.values()){
                String name = bean.getClass().getAnnotation(CoolService.class).value().getName();
                servicesMap.put(name, bean);
            }
        }
    }

}

此項(xiàng)目的開(kāi)放api接口實(shí)現(xiàn)類(lèi)需要用@CoolService注解標(biāo)識(shí),服務(wù)端容器啟動(dòng)時(shí),會(huì)掃描所有帶有此注解的實(shí)現(xiàn)類(lèi),并注入到注冊(cè)中心

服務(wù)端處理器(netty handler):

@ChannelHandler.Sharable
public class CoolServerHandler extends ChannelInboundHandlerAdapter {

    private static Logger log = LoggerFactory.getLogger(CoolServerHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        CoolResponse response = new CoolResponse();
        CoolRequest request = (CoolRequest) msg;

        try {
            Object result = invoke(request);
            response.setRequestID(request.getRequestID());
            response.setResult(result);
        } catch (Throwable error) {
            response.setError(error);
        }
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }


    private Object invoke(CoolRequest request) throws Throwable{
        if (request == null){
            throw new Throwable("cool rpc request not found");
        }

        String className = request.getClassName();
        String methodName = request.getMethodName();
        Object[] parameters = request.getParameters();
        Object service = CoolRpcServer.servicesMap.get(className);
        if (service == null){
            throw new Throwable("cool rpc service not exist");
        }

        Class serviceClass = service.getClass();
        Class[] parameterTypes = request.getParameterTypes();

        FastClass fastClass = FastClass.create(serviceClass);
        FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes);
        return fastMethod.invoke(service, parameters);

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("server caught exception", cause);
        ctx.close();
    }

}

將客戶端傳輸過(guò)來(lái)的請(qǐng)求數(shù)據(jù)(類(lèi)名,方法,參數(shù))在本地以cglib的方式反射調(diào)用
調(diào)用成功后,將處理完畢的結(jié)果編碼返回給客戶端,并且關(guān)閉TCP連接

客戶端TCP處理器

consumer只有api接口,并沒(méi)有其實(shí)現(xiàn)類(lèi),所以我們可以用java動(dòng)態(tài)代理的方式去自定義方法實(shí)現(xiàn),代理的方法實(shí)現(xiàn)便是建立TCP握手連接,有provider來(lái)執(zhí)行方法,將得到的結(jié)果返回給代理類(lèi),由此造成一種單憑接口就能調(diào)用實(shí)現(xiàn)類(lèi)方法的假象

第一步: 使用java動(dòng)態(tài)代理new出代理對(duì)象

public class CoolProxy {

    private static Logger log = LoggerFactory.getLogger(CoolProxy.class);

    private ServiceCenter serviceDiscovery;

    public CoolProxy(ServiceCenter serviceDiscovery){
        this.serviceDiscovery = serviceDiscovery;
    }

    @SuppressWarnings("unchecked")
    public  T getInstance(Class cls){

        return (T)Proxy.newProxyInstance(cls.getClassLoader(),
                new Class[]{cls},
                (proxy, method, args) -> {

                    CoolRequest request = new CoolRequest();
                    request.setRequestID(UUID.randomUUID().toString());
                    request.setClassName(method.getDeclaringClass().getName());
                    request.setMethodName(method.getName());
                    request.setParameters(args);
                    request.setParameterTypes(method.getParameterTypes());

                    String[] addr = serviceDiscovery.discover(cls.getName()).split(":",2);

                    CoolRpcClient client = new CoolRpcClient(addr[0],
                            Integer.parseInt(addr[1]));

                    CoolResponse response = client.send(request);
                    if (response.getError()!=null){
                        throw response.getError();
                    } else {
                        return response.getResult();
                    }

                });
    }

}

第二步: 在代理方法中,使用遠(yuǎn)程過(guò)程調(diào)用(rpc)

客戶端引導(dǎo)類(lèi):

public class CoolRpcClient {

    private static Logger log = LoggerFactory.getLogger(CoolRpcClient.class);

    private CountDownLatch countDownLatch;
    private EventLoopGroup group;
    private Bootstrap bootstrap;
    private CoolResponse response;
    private String serviceIP;
    private int port;

    {
        response = new CoolResponse();
        countDownLatch = new CountDownLatch(1);
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
    }


    public CoolRpcClient(String serviceIP, int port){
        this.serviceIP = serviceIP;
        this.port = port;
    }


    public CoolResponse send(CoolRequest request){
        try {
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer() {

                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline()
                            .addLast(new CoolRpcDecoder(CoolResponse.class))
                            .addLast(new CoolRpcEncoder(CoolRequest.class))
                            .addLast(new CoolClientHandler(countDownLatch, response));
                }
            });
            bootstrap.option(ChannelOption.TCP_NODELAY, true);

            Channel channel = bootstrap.connect(serviceIP, port).sync().channel();
            channel.writeAndFlush(request).sync();
            countDownLatch.await();
            channel.closeFuture().sync();

            return response;
        } catch (Exception e){
            e.printStackTrace();
            return null;
        } finally {
            group.shutdownGracefully();
        }
    }


}

客戶端處理器(handler):

@ChannelHandler.Sharable
public class CoolClientHandler extends ChannelInboundHandlerAdapter {

    private static Logger log = LoggerFactory.getLogger(CoolClientHandler.class);

    private CountDownLatch latch;
    private CoolResponse response;

    public CoolClientHandler(CountDownLatch latch, CoolResponse response){
        this.latch = latch;
        this.response = response;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        CoolResponse enResponse = (CoolResponse) msg;
        this.response.sync(enResponse);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        latch.countDown();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("api caught exception", cause);
        ctx.close();
    }

}

最后使用CountDownLatch同步通知調(diào)用者,rpc調(diào)用完畢

結(jié)束語(yǔ)

以上便是Cool-Rpc的簡(jiǎn)單講解,如有更好的想法請(qǐng)聯(lián)系我
熱烈歡迎大家一起維護(hù)此項(xiàng)目Cool-RPC

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/76893.html

相關(guān)文章

  • RPC框架實(shí)踐之:Apache Thrift

    摘要:在文章微服務(wù)調(diào)用鏈追蹤中心搭建一文中模擬出來(lái)的調(diào)用鏈就是一個(gè)遠(yuǎn)程調(diào)用的例子,只不過(guò)這篇文章里是通過(guò)這種同步調(diào)用方式,利用的是協(xié)議在應(yīng)用層完成的,這種方法雖然奏效,但有時(shí)效率并不高。 showImg(https://segmentfault.com/img/remote/1460000014858219); 一、概述 RPC(Remote Procedure Call)即 遠(yuǎn)程過(guò)程調(diào)...

    Gilbertat 評(píng)論0 收藏0
  • RPC框架實(shí)踐之:Apache Thrift

    摘要:在文章微服務(wù)調(diào)用鏈追蹤中心搭建一文中模擬出來(lái)的調(diào)用鏈就是一個(gè)遠(yuǎn)程調(diào)用的例子,只不過(guò)這篇文章里是通過(guò)這種同步調(diào)用方式,利用的是協(xié)議在應(yīng)用層完成的,這種方法雖然奏效,但有時(shí)效率并不高。 showImg(https://segmentfault.com/img/remote/1460000014858219); 一、概述 RPC(Remote Procedure Call)即 遠(yuǎn)程過(guò)程調(diào)...

    keithxiaoy 評(píng)論0 收藏0
  • RPC框架實(shí)踐之:Google gRPC

    摘要:與文章框架實(shí)踐之一文中實(shí)踐的另一種通用框架能通過(guò)自動(dòng)生成對(duì)應(yīng)語(yǔ)言的接口類(lèi)似,也能自動(dòng)地生成和的存根,我們只需要一個(gè)命令就能快速搭建起運(yùn)行環(huán)境。類(lèi)似于之前對(duì)于框架的實(shí)踐步驟,下面一一闡述。 showImg(https://segmentfault.com/img/remote/1460000014946557); 概述 gRPC是Google開(kāi)源的通用高性能RPC框架,它支持的是使用P...

    malakashi 評(píng)論0 收藏0
  • RPC框架實(shí)踐之:Google gRPC

    摘要:與文章框架實(shí)踐之一文中實(shí)踐的另一種通用框架能通過(guò)自動(dòng)生成對(duì)應(yīng)語(yǔ)言的接口類(lèi)似,也能自動(dòng)地生成和的存根,我們只需要一個(gè)命令就能快速搭建起運(yùn)行環(huán)境。類(lèi)似于之前對(duì)于框架的實(shí)踐步驟,下面一一闡述。 showImg(https://segmentfault.com/img/remote/1460000014946557); 概述 gRPC是Google開(kāi)源的通用高性能RPC框架,它支持的是使用P...

    vibiu 評(píng)論0 收藏0
  • 超詳細(xì),新手都能看懂 !使用SpringBoot+Dubbo 搭建一個(gè)簡(jiǎn)單的分布式服務(wù)

    Github 地址:https://github.com/Snailclimb/springboot-integration-examples ,歡迎各位 Star。 目錄: 使用 SpringBoot+Dubbo 搭建一個(gè)簡(jiǎn)單分布式服務(wù) 實(shí)戰(zhàn)之前,先來(lái)看幾個(gè)重要的概念 什么是分布式? 什么是 Duboo? Dubbo 架構(gòu) 什么是 RPC? 為什么要用 Dubbo? 開(kāi)始實(shí)戰(zhàn) 1 ...

    chengtao1633 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<