摘要:遠程通訊目標介紹的相關實現(xiàn)邏輯介紹中的包內的源碼解析。源碼分析一處理對應的命令命令該接口上命令處理器接口,是一個可擴展接口。關閉通道五該類實現(xiàn)了接口,封裝了命令的實現(xiàn)。下一篇我會講解基于實現(xiàn)遠程通信部分。
遠程通訊——Telnet
目標:介紹telnet的相關實現(xiàn)邏輯、介紹dubbo-remoting-api中的telnet包內的源碼解析。前言
從dubbo 2.0.5開始,dubbo開始支持通過 telnet 命令來進行服務治理。本文就是講解一些公用的telnet命令的實現(xiàn)。下面來看一下telnet實現(xiàn)的類圖:
可以看到,實現(xiàn)了TelnetHandler接口的有六個類,除了TelnetHandlerAdapter是以外,其他五個分別對應了clear、exit、help、log、status命令的實現(xiàn),具體用來干嘛,請看官方文檔的介紹。
源碼分析 (一)TelnetHandler@SPI public interface TelnetHandler { /** * telnet. * 處理對應的telnet命令 * @param channel * @param message telnet命令 */ String telnet(Channel channel, String message) throws RemotingException; }
該接口上telnet命令處理器接口,是一個可擴展接口。它定義了一個方法,就是處理相關的telnet命令。
(二)TelnetHandlerAdapter該類繼承了ChannelHandlerAdapter,實現(xiàn)了TelnetHandler接口,是TelnetHandler的適配器類,負責在接收到HeaderExchangeHandler發(fā)來的telnet命令后分發(fā)給對應的TelnetHandler實現(xiàn)類去實現(xiàn),并且返回命令結果。
public class TelnetHandlerAdapter extends ChannelHandlerAdapter implements TelnetHandler { /** * 擴展加載器 */ private final ExtensionLoaderextensionLoader = ExtensionLoader.getExtensionLoader(TelnetHandler.class); @Override public String telnet(Channel channel, String message) throws RemotingException { // 獲得提示鍵配置,用于nc獲取信息時不顯示提示符 String prompt = channel.getUrl().getParameterAndDecoded(Constants.PROMPT_KEY, Constants.DEFAULT_PROMPT); boolean noprompt = message.contains("--no-prompt"); message = message.replace("--no-prompt", ""); StringBuilder buf = new StringBuilder(); // 刪除頭尾空白符的字符串 message = message.trim(); String command; // 獲得命令 if (message.length() > 0) { int i = message.indexOf(" "); if (i > 0) { // 獲得命令 command = message.substring(0, i).trim(); // 獲得參數(shù) message = message.substring(i + 1).trim(); } else { command = message; message = ""; } } else { command = ""; } if (command.length() > 0) { // 如果有該命令的擴展實現(xiàn)類 if (extensionLoader.hasExtension(command)) { try { // 執(zhí)行相應命令的實現(xiàn)類的telnet String result = extensionLoader.getExtension(command).telnet(channel, message); if (result == null) { return null; } // 返回結果 buf.append(result); } catch (Throwable t) { buf.append(t.getMessage()); } } else { buf.append("Unsupported command: "); buf.append(command); } } if (buf.length() > 0) { buf.append(" "); } // 添加 telnet 提示語 if (prompt != null && prompt.length() > 0 && !noprompt) { buf.append(prompt); } return buf.toString(); } }
該類只實現(xiàn)了telnet方法,其中的邏輯還是比較清晰,就是根據(jù)對應的命令去讓對應的實現(xiàn)類產(chǎn)生命令結果。
(三)ClearTelnetHandler該類實現(xiàn)了TelnetHandler接口,封裝了clear命令的實現(xiàn)。
@Activate @Help(parameter = "[lines]", summary = "Clear screen.", detail = "Clear screen.") public class ClearTelnetHandler implements TelnetHandler { @Override public String telnet(Channel channel, String message) { // 清除屏幕上的內容行數(shù) int lines = 100; if (message.length() > 0) { // 如果不是一個數(shù)字 if (!StringUtils.isInteger(message)) { return "Illegal lines " + message + ", must be integer."; } lines = Integer.parseInt(message); } StringBuilder buf = new StringBuilder(); // 一行一行清除 for (int i = 0; i < lines; i++) { buf.append(" "); } return buf.toString(); } }(四)ExitTelnetHandler
該類實現(xiàn)了TelnetHandler接口,封裝了exit命令的實現(xiàn)。
@Activate @Help(parameter = "", summary = "Exit the telnet.", detail = "Exit the telnet.") public class ExitTelnetHandler implements TelnetHandler { @Override public String telnet(Channel channel, String message) { // 關閉通道 channel.close(); return null; } }(五)HelpTelnetHandler
該類實現(xiàn)了TelnetHandler接口,封裝了help命令的實現(xiàn)。
@Activate @Help(parameter = "[command]", summary = "Show help.", detail = "Show help.") public class HelpTelnetHandler implements TelnetHandler { /** * 擴展加載器 */ private final ExtensionLoaderextensionLoader = ExtensionLoader.getExtensionLoader(TelnetHandler.class); @Override public String telnet(Channel channel, String message) { // 如果需要查看某一個命令的幫助 if (message.length() > 0) { if (!extensionLoader.hasExtension(message)) { return "No such command " + message; } // 獲得對應的擴展實現(xiàn)類 TelnetHandler handler = extensionLoader.getExtension(message); Help help = handler.getClass().getAnnotation(Help.class); StringBuilder buf = new StringBuilder(); // 生成命令和幫助信息 buf.append("Command: "); buf.append(message + " " + help.parameter().replace(" ", " ").replace(" ", " ")); buf.append(" Summary: "); buf.append(help.summary().replace(" ", " ").replace(" ", " ")); buf.append(" Detail: "); buf.append(help.detail().replace(" ", " ").replace(" ", " ")); return buf.toString(); // 如果查看所有命令的幫助 } else { List > table = new ArrayList
>(); // 獲得所有命令的提示信息 List
handlers = extensionLoader.getActivateExtension(channel.getUrl(), "telnet"); if (handlers != null && !handlers.isEmpty()) { for (TelnetHandler handler : handlers) { Help help = handler.getClass().getAnnotation(Help.class); List row = new ArrayList (); String parameter = " " + extensionLoader.getExtensionName(handler) + " " + (help != null ? help.parameter().replace(" ", " ").replace(" ", " ") : ""); row.add(parameter.length() > 50 ? parameter.substring(0, 50) + "..." : parameter); String summary = help != null ? help.summary().replace(" ", " ").replace(" ", " ") : ""; row.add(summary.length() > 50 ? summary.substring(0, 50) + "..." : summary); table.add(row); } } return "Please input "help [command]" show detail. " + TelnetUtils.toList(table); } } }
help分為了需要查看某一個命令的幫助還是查看全部命令的幫助。
(六)LogTelnetHandler該類實現(xiàn)了TelnetHandler接口,封裝了log命令的實現(xiàn)。
@Activate @Help(parameter = "level", summary = "Change log level or show log ", detail = "Change log level or show log") public class LogTelnetHandler implements TelnetHandler { public static final String SERVICE_KEY = "telnet.log"; @Override public String telnet(Channel channel, String message) { long size = 0; File file = LoggerFactory.getFile(); StringBuffer buf = new StringBuffer(); if (message == null || message.trim().length() == 0) { buf.append("EXAMPLE: log error / log 100"); } else { String str[] = message.split(" "); if (!StringUtils.isInteger(str[0])) { // 設置日志級別 LoggerFactory.setLevel(Level.valueOf(message.toUpperCase())); } else { // 獲得日志長度 int SHOW_LOG_LENGTH = Integer.parseInt(str[0]); if (file != null && file.exists()) { try { FileInputStream fis = new FileInputStream(file); try { FileChannel filechannel = fis.getChannel(); try { size = filechannel.size(); ByteBuffer bb; if (size <= SHOW_LOG_LENGTH) { // 分配緩沖區(qū) bb = ByteBuffer.allocate((int) size); // 讀日志數(shù)據(jù) filechannel.read(bb, 0); } else { int pos = (int) (size - SHOW_LOG_LENGTH); // 分配緩沖區(qū) bb = ByteBuffer.allocate(SHOW_LOG_LENGTH); // 讀取日志數(shù)據(jù) filechannel.read(bb, pos); } bb.flip(); String content = new String(bb.array()).replace("<", "<") .replace(">", ">").replace(" ", "
"); buf.append(" content:" + content); buf.append(" modified:" + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") .format(new Date(file.lastModified())))); buf.append(" size:" + size + " "); } finally { filechannel.close(); } } finally { fis.close(); } } catch (Exception e) { buf.append(e.getMessage()); } } else { size = 0; buf.append(" MESSAGE: log file not exists or log appender is console ."); } } } buf.append(" CURRENT LOG LEVEL:" + LoggerFactory.getLevel()) .append(" CURRENT LOG APPENDER:" + (file == null ? "console" : file.getAbsolutePath())); return buf.toString(); } }
log命令實現(xiàn)原理就是從日志文件中把日志信息讀取出來。
(七)StatusTelnetHandler該類實現(xiàn)了TelnetHandler接口,封裝了status命令的實現(xiàn)。
@Activate @Help(parameter = "[-l]", summary = "Show status.", detail = "Show status.") public class StatusTelnetHandler implements TelnetHandler { private final ExtensionLoader(八)HelpextensionLoader = ExtensionLoader.getExtensionLoader(StatusChecker.class); @Override public String telnet(Channel channel, String message) { // 顯示狀態(tài)列表 if (message.equals("-l")) { List checkers = extensionLoader.getActivateExtension(channel.getUrl(), "status"); String[] header = new String[]{"resource", "status", "message"}; List > table = new ArrayList
>(); Map
statuses = new HashMap (); if (checkers != null && !checkers.isEmpty()) { // 遍歷各個資源的狀態(tài),如果一個當全部 OK 時則顯示 OK,只要有一個 ERROR 則顯示 ERROR,只要有一個 WARN 則顯示 WARN for (StatusChecker checker : checkers) { String name = extensionLoader.getExtensionName(checker); Status stat; try { stat = checker.check(); } catch (Throwable t) { stat = new Status(Status.Level.ERROR, t.getMessage()); } statuses.put(name, stat); if (stat.getLevel() != null && stat.getLevel() != Status.Level.UNKNOWN) { List row = new ArrayList (); row.add(name); row.add(String.valueOf(stat.getLevel())); row.add(stat.getMessage() == null ? "" : stat.getMessage()); table.add(row); } } } Status stat = StatusUtils.getSummaryStatus(statuses); List row = new ArrayList (); row.add("summary"); row.add(String.valueOf(stat.getLevel())); row.add(stat.getMessage()); table.add(row); return TelnetUtils.toTable(header, table); } else if (message.length() > 0) { return "Unsupported parameter " + message + " for status."; } String status = channel.getUrl().getParameter("status"); Map statuses = new HashMap (); if (status != null && status.length() > 0) { String[] ss = Constants.COMMA_SPLIT_PATTERN.split(status); for (String s : ss) { StatusChecker handler = extensionLoader.getExtension(s); Status stat; try { stat = handler.check(); } catch (Throwable t) { stat = new Status(Status.Level.ERROR, t.getMessage()); } statuses.put(s, stat); } } Status stat = StatusUtils.getSummaryStatus(statuses); return String.valueOf(stat.getLevel()); } }
該接口是幫助文檔接口
@Documented @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE}) public @interface Help { String parameter() default ""; String summary(); String detail() default ""; }
可以看上在每個命令的實現(xiàn)類上都加上了@Help注解,為了添加一些幫助文案。
(九)TelnetUtils該類是Telnet命令的工具類,其中邏輯我就不介紹了。
(十)TelnetCodec該類繼承了TransportCodec,是telnet的編解碼類。
1.屬性private static final Logger logger = LoggerFactory.getLogger(TelnetCodec.class); /** * 歷史命令列表 */ private static final String HISTORY_LIST_KEY = "telnet.history.list"; /** * 歷史命令位置,就是用上下鍵來找歷史命令 */ private static final String HISTORY_INDEX_KEY = "telnet.history.index"; /** * 向上鍵 */ private static final byte[] UP = new byte[]{27, 91, 65}; /** * 向下鍵 */ private static final byte[] DOWN = new byte[]{27, 91, 66}; /** * 回車 */ private static final List> ENTER = Arrays.asList(new Object[]{new byte[]{" ", " "} /* Windows Enter */, new byte[]{" "} /* Linux Enter */}); /** * 退出 */ private static final List> EXIT = Arrays.asList(new Object[]{new byte[]{3} /* Windows Ctrl+C */, new byte[]{-1, -12, -1, -3, 6} /* Linux Ctrl+C */, new byte[]{-1, -19, -1, -3, 6} /* Linux Pause */});2.getCharset
private static Charset getCharset(Channel channel) { if (channel != null) { // 獲得屬性設置 Object attribute = channel.getAttribute(Constants.CHARSET_KEY); // 返回指定字符集的charset對象。 if (attribute instanceof String) { try { return Charset.forName((String) attribute); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } else if (attribute instanceof Charset) { return (Charset) attribute; } URL url = channel.getUrl(); if (url != null) { String parameter = url.getParameter(Constants.CHARSET_KEY); if (parameter != null && parameter.length() > 0) { try { return Charset.forName(parameter); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } } } // 默認的編碼是utf-8 try { return Charset.forName(Constants.DEFAULT_CHARSET); } catch (Throwable t) { logger.warn(t.getMessage(), t); } return Charset.defaultCharset(); }
該方法是獲得通道的字符集,根據(jù)url中編碼來獲得字符集,默認是utf-8。
3.encode@Override public void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException { // 如果需要編碼的是 telnet 命令結果 if (message instanceof String) { //如果為客戶端側的通道m(xù)essage直接返回 if (isClientSide(channel)) { message = message + " "; } // 獲得字節(jié)數(shù)組 byte[] msgData = ((String) message).getBytes(getCharset(channel).name()); // 寫入緩沖區(qū) buffer.writeBytes(msgData); } else { super.encode(channel, buffer, message); } }
該方法是編碼方法。
4.decode@Override public Object decode(Channel channel, ChannelBuffer buffer) throws IOException { // 獲得緩沖區(qū)可讀的字節(jié) int readable = buffer.readableBytes(); byte[] message = new byte[readable]; // 從緩沖區(qū)讀數(shù)據(jù) buffer.readBytes(message); return decode(channel, buffer, readable, message); } @SuppressWarnings("unchecked") protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] message) throws IOException { // 如果是客戶端側,直接返回結果 if (isClientSide(channel)) { return toString(message, getCharset(channel)); } // 檢驗消息長度 checkPayload(channel, readable); if (message == null || message.length == 0) { return DecodeResult.NEED_MORE_INPUT; } // 如果回退 if (message[message.length - 1] == "") { // Windows backspace echo try { boolean doublechar = message.length >= 3 && message[message.length - 3] < 0; // double byte char channel.send(new String(doublechar ? new byte[]{32, 32, 8, 8} : new byte[]{32, 8}, getCharset(channel).name())); } catch (RemotingException e) { throw new IOException(StringUtils.toString(e)); } return DecodeResult.NEED_MORE_INPUT; } // 如果命令是退出 for (Object command : EXIT) { if (isEquals(message, (byte[]) command)) { if (logger.isInfoEnabled()) { logger.info(new Exception("Close channel " + channel + " on exit command: " + Arrays.toString((byte[]) command))); } // 關閉通道 channel.close(); return null; } } boolean up = endsWith(message, UP); boolean down = endsWith(message, DOWN); // 如果用上下鍵找歷史命令 if (up || down) { LinkedListhistory = (LinkedList ) channel.getAttribute(HISTORY_LIST_KEY); if (history == null || history.isEmpty()) { return DecodeResult.NEED_MORE_INPUT; } Integer index = (Integer) channel.getAttribute(HISTORY_INDEX_KEY); Integer old = index; if (index == null) { index = history.size() - 1; } else { // 向上 if (up) { index = index - 1; if (index < 0) { index = history.size() - 1; } } else { // 向下 index = index + 1; if (index > history.size() - 1) { index = 0; } } } // 獲得歷史命令,并發(fā)送給客戶端 if (old == null || !old.equals(index)) { // 設置當前命令位置 channel.setAttribute(HISTORY_INDEX_KEY, index); // 獲得歷史命令 String value = history.get(index); // 清除客戶端原有命令,用查到的歷史命令替代 if (old != null && old >= 0 && old < history.size()) { String ov = history.get(old); StringBuilder buf = new StringBuilder(); for (int i = 0; i < ov.length(); i++) { buf.append(""); } for (int i = 0; i < ov.length(); i++) { buf.append(" "); } for (int i = 0; i < ov.length(); i++) { buf.append(""); } value = buf.toString() + value; } try { channel.send(value); } catch (RemotingException e) { throw new IOException(StringUtils.toString(e)); } } // 返回,需要更多指令 return DecodeResult.NEED_MORE_INPUT; } // 關閉命令 for (Object command : EXIT) { if (isEquals(message, (byte[]) command)) { if (logger.isInfoEnabled()) { logger.info(new Exception("Close channel " + channel + " on exit command " + command)); } channel.close(); return null; } } byte[] enter = null; // 如果命令是回車 for (Object command : ENTER) { if (endsWith(message, (byte[]) command)) { enter = (byte[]) command; break; } } if (enter == null) { return DecodeResult.NEED_MORE_INPUT; } LinkedList history = (LinkedList ) channel.getAttribute(HISTORY_LIST_KEY); Integer index = (Integer) channel.getAttribute(HISTORY_INDEX_KEY); // 移除歷史命令 channel.removeAttribute(HISTORY_INDEX_KEY); // 將歷史命令拼接 if (history != null && !history.isEmpty() && index != null && index >= 0 && index < history.size()) { String value = history.get(index); if (value != null) { byte[] b1 = value.getBytes(); byte[] b2 = new byte[b1.length + message.length]; System.arraycopy(b1, 0, b2, 0, b1.length); System.arraycopy(message, 0, b2, b1.length, message.length); message = b2; } } // 將命令字節(jié)數(shù)組,轉成具體的一條命令 String result = toString(message, getCharset(channel)); if (result.trim().length() > 0) { if (history == null) { history = new LinkedList (); channel.setAttribute(HISTORY_LIST_KEY, history); } if (history.isEmpty()) { history.addLast(result); } else if (!result.equals(history.getLast())) { history.remove(result); // 添加當前命令到歷史尾部 history.addLast(result); // 超過上限,移除歷史的頭部 if (history.size() > 10) { history.removeFirst(); } } } return result; }
該方法是編碼。
后記該部分相關的源碼解析地址:https://github.com/CrazyHZM/i...
該文章講解了telnet的相關實現(xiàn)邏輯,本文有興趣的朋友可以看看。下一篇我會講解基于grizzly實現(xiàn)遠程通信部分。
文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/72810.html
摘要:而存在的意義就是保證請求或響應對象可在線程池中被解碼,解碼完成后,就會分發(fā)到的。 2.7大揭秘——服務端處理請求過程 目標:從源碼的角度分析服務端接收到請求后的一系列操作,最終把客戶端需要的值返回。 前言 上一篇講到了消費端發(fā)送請求的過程,該篇就要將服務端處理請求的過程。也就是當服務端收到請求數(shù)據(jù)包后的一系列處理以及如何返回最終結果。我們也知道消費端在發(fā)送請求的時候已經(jīng)做了編碼,所以我...
摘要:而編碼器是講應用程序的數(shù)據(jù)轉化為網(wǎng)絡格式,解碼器則是講網(wǎng)絡格式轉化為應用程序,同時具備這兩種功能的單一組件就叫編解碼器。在中是老的編解碼器接口,而是新的編解碼器接口,并且已經(jīng)用把適配成了。 遠程通訊——開篇 目標:介紹之后解讀遠程通訊模塊的內容如何編排、介紹dubbo-remoting-api中的包結構設計以及最外層的的源碼解析。 前言 服務治理框架中可以大致分為服務通信和服務管理兩個...
摘要:可以參考源碼解析二十四遠程調用協(xié)議的八。十六的該類也是用了適配器模式,該類主要的作用就是增加了心跳功能,可以參考源碼解析十遠程通信層的四。二十的可以參考源碼解析十七遠程通信的一。 2.7大揭秘——消費端發(fā)送請求過程 目標:從源碼的角度分析一個服務方法調用經(jīng)歷怎么樣的磨難以后到達服務端。 前言 前一篇文章講到的是引用服務的過程,引用服務無非就是創(chuàng)建出一個代理。供消費者調用服務的相關方法。...
摘要:大揭秘異步化改造目標從源碼的角度分析的新特性中對于異步化的改造原理。看源碼解析四十六消費端發(fā)送請求過程講到的十四的,在以前的邏輯會直接在方法中根據(jù)配置區(qū)分同步異步單向調用。改為關于可以參考源碼解析十遠程通信層的六。 2.7大揭秘——異步化改造 目標:從源碼的角度分析2.7的新特性中對于異步化的改造原理。 前言 dubbo中提供了很多類型的協(xié)議,關于協(xié)議的系列可以查看下面的文章: du...
摘要:和斷開,處理措施不一樣,會分別做出重連和關閉通道的操作。取消定時器取消大量已排隊任務,用于回收空間該方法是停止現(xiàn)有心跳,也就是停止定時器,釋放空間。做到異步處理返回結果時能給準確的返回給對應的請求。 遠程通訊——Exchange層 目標:介紹Exchange層的相關設計和邏輯、介紹dubbo-remoting-api中的exchange包內的源碼解析。 前言 上一篇文章我講的是dubb...
閱讀 3239·2021-11-15 11:37
閱讀 2458·2021-09-29 09:48
閱讀 3821·2021-09-22 15:55
閱讀 3021·2021-09-22 10:02
閱讀 2641·2021-08-25 09:40
閱讀 3235·2021-08-03 14:03
閱讀 1700·2019-08-29 13:11
閱讀 1575·2019-08-29 12:49