摘要:源碼分析一該類繼承了類,是協議實現的核心。屬性默認端口號不支持協議的服務暴露,拋出異常可以看到不支持服務暴露。后記該部分相關的源碼解析地址該文章講解了遠程調用中關于協議實現的部分,邏輯比較簡單。
遠程調用——redis協議
目標:介紹redis協議的設計和實現,介紹dubbo-rpc-redis的源碼。前言
dubbo支持的redis協議是基于Redis的,Redis 是一個高效的 KV 存儲服務器,跟memcached協議實現差不多,在dubbo中也沒有涉及到關于redis協議的服務暴露,只有服務引用,因為在訪問服務器時,Redis客戶端可以在服務器上存儲也可以獲取。
源碼分析 (一)RedisProtocol該類繼承了AbstractProtocol類,是redis協議實現的核心。
1.屬性/** * 默認端口號 */ public static final int DEFAULT_PORT = 6379;2.export
@Override publicExporter export(final Invoker invoker) throws RpcException { // 不支持redis協議的服務暴露,拋出異常 throw new UnsupportedOperationException("Unsupported export redis service. url: " + invoker.getUrl()); }
可以看到不支持服務暴露。
3.refer@Override publicInvoker refer(final Class type, final URL url) throws RpcException { try { // 實例化對象池 GenericObjectPoolConfig config = new GenericObjectPoolConfig(); // 如果 testOnBorrow 被設置,pool 會在 borrowObject 返回對象之前使用 PoolableObjectFactory的 validateObject 來驗證這個對象是否有效 // 要是對象沒通過驗證,這個對象會被丟棄,然后重新選擇一個新的對象。 config.setTestOnBorrow(url.getParameter("test.on.borrow", true)); // 如果 testOnReturn 被設置, pool 會在 returnObject 的時候通過 PoolableObjectFactory 的validateObject 方法驗證對象 // 如果對象沒通過驗證,對象會被丟棄,不會被放到池中。 config.setTestOnReturn(url.getParameter("test.on.return", false)); // 指定空閑對象是否應該使用 PoolableObjectFactory 的 validateObject 校驗,如果校驗失敗,這個對象會從對象池中被清除。 // 這個設置僅在 timeBetweenEvictionRunsMillis 被設置成正值( >0) 的時候才會生效。 config.setTestWhileIdle(url.getParameter("test.while.idle", false)); if (url.getParameter("max.idle", 0) > 0) // 控制一個pool最多有多少個狀態為空閑的jedis實例。 config.setMaxIdle(url.getParameter("max.idle", 0)); if (url.getParameter("min.idle", 0) > 0) // 控制一個pool最少有多少個狀態為空閑的jedis實例。 config.setMinIdle(url.getParameter("min.idle", 0)); if (url.getParameter("max.active", 0) > 0) // 控制一個pool最多有多少個jedis實例。 config.setMaxTotal(url.getParameter("max.active", 0)); if (url.getParameter("max.total", 0) > 0) config.setMaxTotal(url.getParameter("max.total", 0)); if (url.getParameter("max.wait", 0) > 0) //表示當引入一個jedis實例時,最大的等待時間,如果超過等待時間,則直接拋出JedisConnectionException; config.setMaxWaitMillis(url.getParameter("max.wait", 0)); if (url.getParameter("num.tests.per.eviction.run", 0) > 0) // 設置驅逐線程每次檢測對象的數量。這個設置僅在 timeBetweenEvictionRunsMillis 被設置成正值( >0)的時候才會生效。 config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 0)); if (url.getParameter("time.between.eviction.runs.millis", 0) > 0) // 指定驅逐線程的休眠時間。如果這個值不是正數( >0),不會有驅逐線程運行。 config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0)); if (url.getParameter("min.evictable.idle.time.millis", 0) > 0) // 指定最小的空閑驅逐的時間間隔(空閑超過指定的時間的對象,會被清除掉)。 // 這個設置僅在 timeBetweenEvictionRunsMillis 被設置成正值( >0)的時候才會生效。 config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0)); // 創建redis連接池 final JedisPool jedisPool = new JedisPool(config, url.getHost(), url.getPort(DEFAULT_PORT), url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT), StringUtils.isBlank(url.getPassword()) ? null : url.getPassword(), url.getParameter("db.index", 0)); // 獲得值的過期時間 final int expiry = url.getParameter("expiry", 0); // 獲得get命令 final String get = url.getParameter("get", "get"); // 獲得set命令 final String set = url.getParameter("set", Map.class.equals(type) ? "put" : "set"); // 獲得delete命令 final String delete = url.getParameter("delete", Map.class.equals(type) ? "remove" : "delete"); return new AbstractInvoker (type, url) { @Override protected Result doInvoke(Invocation invocation) throws Throwable { Jedis resource = null; try { resource = jedisPool.getResource(); // 如果是get命令 if (get.equals(invocation.getMethodName())) { // get 命令必須只有一個參數 if (invocation.getArguments().length != 1) { throw new IllegalArgumentException("The redis get method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url); } // 獲得值 byte[] value = resource.get(String.valueOf(invocation.getArguments()[0]).getBytes()); if (value == null) { return new RpcResult(); } // 反序列化 ObjectInput oin = getSerialization(url).deserialize(url, new ByteArrayInputStream(value)); return new RpcResult(oin.readObject()); } else if (set.equals(invocation.getMethodName())) { // 如果是set命令,參數長度必須是2 if (invocation.getArguments().length != 2) { throw new IllegalArgumentException("The redis set method arguments mismatch, must be two arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url); } // byte[] key = String.valueOf(invocation.getArguments()[0]).getBytes(); ByteArrayOutputStream output = new ByteArrayOutputStream(); // 對需要存入對值進行序列化 ObjectOutput value = getSerialization(url).serialize(url, output); value.writeObject(invocation.getArguments()[1]); // 存入值 resource.set(key, output.toByteArray()); // 設置該key過期時間,不能大于1000s if (expiry > 1000) { resource.expire(key, expiry / 1000); } return new RpcResult(); } else if (delete.equals(invocation.getMethodName())) { // 如果是刪除命令,則參數長度必須是1 if (invocation.getArguments().length != 1) { throw new IllegalArgumentException("The redis delete method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url); } // 刪除該值 resource.del(String.valueOf(invocation.getArguments()[0]).getBytes()); return new RpcResult(); } else { // 否則拋出該操作不支持的異常 throw new UnsupportedOperationException("Unsupported method " + invocation.getMethodName() + " in redis service."); } } catch (Throwable t) { RpcException re = new RpcException("Failed to invoke redis service method. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url + ", cause: " + t.getMessage(), t); if (t instanceof TimeoutException || t instanceof SocketTimeoutException) { // 拋出超時異常 re.setCode(RpcException.TIMEOUT_EXCEPTION); } else if (t instanceof JedisConnectionException || t instanceof IOException) { // 拋出網絡異常 re.setCode(RpcException.NETWORK_EXCEPTION); } else if (t instanceof JedisDataException) { // 拋出序列化異常 re.setCode(RpcException.SERIALIZATION_EXCEPTION); } throw re; } finally { if (resource != null) { try { jedisPool.returnResource(resource); } catch (Throwable t) { logger.warn("returnResource error: " + t.getMessage(), t); } } } } @Override public void destroy() { super.destroy(); try { // 關閉連接池 jedisPool.destroy(); } catch (Throwable e) { logger.warn(e.getMessage(), e); } } }; } catch (Throwable t) { throw new RpcException("Failed to refer redis service. interface: " + type.getName() + ", url: " + url + ", cause: " + t.getMessage(), t); } }
可以看到首先是對連接池的配置賦值,然后創建連接池后,根據redis的get、set、delete命令來進行相關操作。
后記該部分相關的源碼解析地址:https://github.com/CrazyHZM/i...
該文章講解了遠程調用中關于redis協議實現的部分,邏輯比較簡單。接下來我將開始對rpc模塊關于rest協議部分進行講解。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/73237.html
摘要:大揭秘異步化改造目標從源碼的角度分析的新特性中對于異步化的改造原理。看源碼解析四十六消費端發送請求過程講到的十四的,在以前的邏輯會直接在方法中根據配置區分同步異步單向調用。改為關于可以參考源碼解析十遠程通信層的六。 2.7大揭秘——異步化改造 目標:從源碼的角度分析2.7的新特性中對于異步化的改造原理。 前言 dubbo中提供了很多類型的協議,關于協議的系列可以查看下面的文章: du...
摘要:可以參考源碼解析二十四遠程調用協議的八。十六的該類也是用了適配器模式,該類主要的作用就是增加了心跳功能,可以參考源碼解析十遠程通信層的四。二十的可以參考源碼解析十七遠程通信的一。 2.7大揭秘——消費端發送請求過程 目標:從源碼的角度分析一個服務方法調用經歷怎么樣的磨難以后到達服務端。 前言 前一篇文章講到的是引用服務的過程,引用服務無非就是創建出一個代理。供消費者調用服務的相關方法。...
摘要:源碼分析一該類繼承,是協議實現的核心。屬性默認端口號不支持服務暴露可以看到,服務暴露方法直接拋出異常。后記該部分相關的源碼解析地址該文章講解了遠程調用中關于協議實現的部分,邏輯比較簡單。 遠程調用——memcached協議 目標:介紹memcached協議的設計和實現,介紹dubbo-rpc-memcached的源碼。 前言 dubbo實現memcached協議是基于Memcached...
摘要:而存在的意義就是保證請求或響應對象可在線程池中被解碼,解碼完成后,就會分發到的。 2.7大揭秘——服務端處理請求過程 目標:從源碼的角度分析服務端接收到請求后的一系列操作,最終把客戶端需要的值返回。 前言 上一篇講到了消費端發送請求的過程,該篇就要將服務端處理請求的過程。也就是當服務端收到請求數據包后的一系列處理以及如何返回最終結果。我們也知道消費端在發送請求的時候已經做了編碼,所以我...
摘要:遠程調用協議目標介紹遠程調用中跟協議相關的設計和實現,介紹的源碼。二該類繼承了,是協議中獨有的服務暴露者。八該類也是對的裝飾,其中增強了調用次數多功能。 遠程調用——dubbo協議 目標:介紹遠程調用中跟dubbo協議相關的設計和實現,介紹dubbo-rpc-dubbo的源碼。 前言 Dubbo 缺省協議采用單一長連接和 NIO 異步通訊,適合于小數據量大并發的服務調用,以及服務消費者...
閱讀 695·2021-11-15 11:37
閱讀 3316·2021-10-27 14:14
閱讀 6039·2021-09-13 10:30
閱讀 2961·2021-09-04 16:48
閱讀 1926·2021-08-18 10:22
閱讀 2125·2019-08-30 14:19
閱讀 729·2019-08-30 10:54
閱讀 1745·2019-08-29 18:40