摘要:本篇文章將介紹協議的解析思路。消息體實體類以下是對整個消息體抽象出來的一個實體類。
[TOC]
本篇文章將介紹JT808協議的解析思路。
另請大神繞路,不喜勿噴!
先寫個大致的思路,有疑問可以聯系本人,聯系方式:
emial: hylexus@163.com
1 JT808協議掃盲 1.1 數據類型數據類型 | 描述及要求 |
---|---|
BYTE | 無符號單字節整形(字節, 8 位) |
WORD | 無符號雙字節整形(字, 16 位) |
DWORD | 無符號四字節整形(雙字, 32 位) |
BYTE[n] | n 字節 |
BCD[n] | 8421 碼, n 字節 |
STRING | GBK 編碼,若無數據,置空 |
標識位 | 消息頭 | 消息體 | 校驗碼 | 標識位 |
---|---|---|---|---|
1byte(0x7e) | 16byte | 1byte | 1byte(0x7e) |
消息ID(0-1) 消息體屬性(2-3) 終端手機號(4-9) 消息流水號(10-11) 消息包封裝項(12-15) byte[0-1] 消息ID word(16) byte[2-3] 消息體屬性 word(16) bit[0-9] 消息體長度 bit[10-12] 數據加密方式 此三位都為 0,表示消息體不加密 第 10 位為 1,表示消息體經過 RSA 算法加密 其它保留 bit[13] 分包 1:消息體衛長消息,進行分包發送處理,具體分包信息由消息包封裝項決定 0:則消息頭中無消息包封裝項字段 bit[14-15] 保留 byte[4-9] 終端手機號或設備ID bcd[6] 根據安裝后終端自身的手機號轉換 手機號不足12 位,則在前面補 0 byte[10-11] 消息流水號 word(16) 按發送順序從 0 開始循環累加 byte[12-15] 消息包封裝項 byte[0-1] 消息包總數(word(16)) 該消息分包后得總包數 byte[2-3] 包序號(word(16)) 從 1 開始 如果消息體屬性中相關標識位確定消息分包處理,則該項有內容 否則無該項2 解析
整個消息體結構中最復雜的就是消息頭了。
2.1 消息體實體類以下是對整個消息體抽象出來的一個java實體類。
import java.nio.channels.Channel; public class PackageData { /** * 16byte 消息頭 */ protected MsgHeader msgHeader; // 消息體字節數組 protected byte[] msgBodyBytes; /** * 校驗碼 1byte */ protected int checkSum; //記錄每個客戶端的channel,以便下發信息給客戶端 protected Channel channel; public MsgHeader getMsgHeader() { return msgHeader; } //TODO set 和 get 方法在此處省略 //消息頭 public static class MsgHeader { // 消息ID protected int msgId; /////// ========消息體屬性 // byte[2-3] protected int msgBodyPropsField; // 消息體長度 protected int msgBodyLength; // 數據加密方式 protected int encryptionType; // 是否分包,true==>有消息包封裝項 protected boolean hasSubPackage; // 保留位[14-15] protected String reservedBit; /////// ========消息體屬性 // 終端手機號 protected String terminalPhone; // 流水號 protected int flowId; //////// =====消息包封裝項 // byte[12-15] protected int packageInfoField; // 消息包總數(word(16)) protected long totalSubPackage; // 包序號(word(16))這次發送的這個消息包是分包中的第幾個消息包, 從 1 開始 protected long subPackageSeq; //////// =====消息包封裝項 //TODO set 和 get 方法在此處省略 } }2.2 字節數組到消息體實體類的轉換 2.2.1 消息轉換器
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cn.hylexus.jt808.util.BCD8421Operater; import cn.hylexus.jt808.util.BitOperator; import cn.hylexus.jt808.vo.PackageData; import cn.hylexus.jt808.vo.PackageData.MsgHeader; public class MsgDecoder { private static final Logger log = LoggerFactory.getLogger(MsgDecoder.class); private BitOperator bitOperator; private BCD8421Operater bcd8421Operater; public MsgDecoder() { this.bitOperator = new BitOperator(); this.bcd8421Operater = new BCD8421Operater(); } //字節數組到消息體實體類 public PackageData queueElement2PackageData(byte[] data) { PackageData ret = new PackageData(); // 1. 16byte 或 12byte 消息頭 MsgHeader msgHeader = this.parseMsgHeaderFromBytes(data); ret.setMsgHeader(msgHeader); int msgBodyByteStartIndex = 12; // 2. 消息體 // 有子包信息,消息體起始字節后移四個字節:消息包總數(word(16))+包序號(word(16)) if (msgHeader.isHasSubPackage()) { msgBodyByteStartIndex = 16; } byte[] tmp = new byte[msgHeader.getMsgBodyLength()]; System.arraycopy(data, msgBodyByteStartIndex, tmp, 0, tmp.length); ret.setMsgBodyBytes(tmp); // 3. 去掉分隔符之后,最后一位就是校驗碼 // int checkSumInPkg = // this.bitOperator.oneByteToInteger(data[data.length - 1]); int checkSumInPkg = data[data.length - 1]; int calculatedCheckSum = this.bitOperator.getCheckSum4JT808(data, 0, data.length - 1); ret.setCheckSum(checkSumInPkg); if (checkSumInPkg != calculatedCheckSum) { log.warn("檢驗碼不一致,msgid:{},pkg:{},calculated:{}", msgHeader.getMsgId(), checkSumInPkg, calculatedCheckSum); } return ret; } private MsgHeader parseMsgHeaderFromBytes(byte[] data) { MsgHeader msgHeader = new MsgHeader(); // 1. 消息ID word(16) // byte[] tmp = new byte[2]; // System.arraycopy(data, 0, tmp, 0, 2); // msgHeader.setMsgId(this.bitOperator.twoBytesToInteger(tmp)); msgHeader.setMsgId(this.parseIntFromBytes(data, 0, 2)); // 2. 消息體屬性 word(16)=================> // System.arraycopy(data, 2, tmp, 0, 2); // int msgBodyProps = this.bitOperator.twoBytesToInteger(tmp); int msgBodyProps = this.parseIntFromBytes(data, 2, 2); msgHeader.setMsgBodyPropsField(msgBodyProps); // [ 0-9 ] 0000,0011,1111,1111(3FF)(消息體長度) msgHeader.setMsgBodyLength(msgBodyProps & 0x1ff); // [10-12] 0001,1100,0000,0000(1C00)(加密類型) msgHeader.setEncryptionType((msgBodyProps & 0xe00) >> 10); // [ 13_ ] 0010,0000,0000,0000(2000)(是否有子包) msgHeader.setHasSubPackage(((msgBodyProps & 0x2000) >> 13) == 1); // [14-15] 1100,0000,0000,0000(C000)(保留位) msgHeader.setReservedBit(((msgBodyProps & 0xc000) >> 14) + ""); // 消息體屬性 word(16)<================= // 3. 終端手機號 bcd[6] // tmp = new byte[6]; // System.arraycopy(data, 4, tmp, 0, 6); // msgHeader.setTerminalPhone(this.bcd8421Operater.bcd2String(tmp)); msgHeader.setTerminalPhone(this.parseBcdStringFromBytes(data, 4, 6)); // 4. 消息流水號 word(16) 按發送順序從 0 開始循環累加 // tmp = new byte[2]; // System.arraycopy(data, 10, tmp, 0, 2); // msgHeader.setFlowId(this.bitOperator.twoBytesToInteger(tmp)); msgHeader.setFlowId(this.parseIntFromBytes(data, 10, 2)); // 5. 消息包封裝項 // 有子包信息 if (msgHeader.isHasSubPackage()) { // 消息包封裝項字段 msgHeader.setPackageInfoField(this.parseIntFromBytes(data, 12, 4)); // byte[0-1] 消息包總數(word(16)) // tmp = new byte[2]; // System.arraycopy(data, 12, tmp, 0, 2); // msgHeader.setTotalSubPackage(this.bitOperator.twoBytesToInteger(tmp)); msgHeader.setTotalSubPackage(this.parseIntFromBytes(data, 12, 2)); // byte[2-3] 包序號(word(16)) 從 1 開始 // tmp = new byte[2]; // System.arraycopy(data, 14, tmp, 0, 2); // msgHeader.setSubPackageSeq(this.bitOperator.twoBytesToInteger(tmp)); msgHeader.setSubPackageSeq(this.parseIntFromBytes(data, 12, 2)); } return msgHeader; } protected String parseStringFromBytes(byte[] data, int startIndex, int lenth) { return this.parseStringFromBytes(data, startIndex, lenth, null); } private String parseStringFromBytes(byte[] data, int startIndex, int lenth, String defaultVal) { try { byte[] tmp = new byte[lenth]; System.arraycopy(data, startIndex, tmp, 0, lenth); return new String(tmp, "UTF-8"); } catch (Exception e) { log.error("解析字符串出錯:{}", e.getMessage()); e.printStackTrace(); return defaultVal; } } private String parseBcdStringFromBytes(byte[] data, int startIndex, int lenth) { return this.parseBcdStringFromBytes(data, startIndex, lenth, null); } private String parseBcdStringFromBytes(byte[] data, int startIndex, int lenth, String defaultVal) { try { byte[] tmp = new byte[lenth]; System.arraycopy(data, startIndex, tmp, 0, lenth); return this.bcd8421Operater.bcd2String(tmp); } catch (Exception e) { log.error("解析BCD(8421碼)出錯:{}", e.getMessage()); e.printStackTrace(); return defaultVal; } } private int parseIntFromBytes(byte[] data, int startIndex, int length) { return this.parseIntFromBytes(data, startIndex, length, 0); } private int parseIntFromBytes(byte[] data, int startIndex, int length, int defaultVal) { try { // 字節數大于4,從起始索引開始向后處理4個字節,其余超出部分丟棄 final int len = length > 4 ? 4 : length; byte[] tmp = new byte[len]; System.arraycopy(data, startIndex, tmp, 0, len); return bitOperator.byteToInteger(tmp); } catch (Exception e) { log.error("解析整數出錯:{}", e.getMessage()); e.printStackTrace(); return defaultVal; } } }2.2.2 用到的工具類 2.2.2.1 BCD操作工具類
package cn.hylexus.jt808.util; public class BCD8421Operater { /** * BCD字節數組===>String * * @param bytes * @return 十進制字符串 */ public String bcd2String(byte[] bytes) { StringBuilder temp = new StringBuilder(bytes.length * 2); for (int i = 0; i < bytes.length; i++) { // 高四位 temp.append((bytes[i] & 0xf0) >>> 4); // 低四位 temp.append(bytes[i] & 0x0f); } return temp.toString().substring(0, 1).equalsIgnoreCase("0") ? temp.toString().substring(1) : temp.toString(); } /** * 字符串==>BCD字節數組 * * @param str * @return BCD字節數組 */ public byte[] string2Bcd(String str) { // 奇數,前補零 if ((str.length() & 0x1) == 1) { str = "0" + str; } byte ret[] = new byte[str.length() / 2]; byte bs[] = str.getBytes(); for (int i = 0; i < ret.length; i++) { byte high = ascII2Bcd(bs[2 * i]); byte low = ascII2Bcd(bs[2 * i + 1]); // TODO 只遮罩BCD低四位? ret[i] = (byte) ((high << 4) | low); } return ret; } private byte ascII2Bcd(byte asc) { if ((asc >= "0") && (asc <= "9")) return (byte) (asc - "0"); else if ((asc >= "A") && (asc <= "F")) return (byte) (asc - "A" + 10); else if ((asc >= "a") && (asc <= "f")) return (byte) (asc - "a" + 10); else return (byte) (asc - 48); } }2.2.2.2 位操作工具類
package cn.hylexus.jt808.util; import java.util.Arrays; import java.util.List; public class BitOperator { /** * 把一個整形該為byte * * @param value * @return * @throws Exception */ public byte integerTo1Byte(int value) { return (byte) (value & 0xFF); } /** * 把一個整形該為1位的byte數組 * * @param value * @return * @throws Exception */ public byte[] integerTo1Bytes(int value) { byte[] result = new byte[1]; result[0] = (byte) (value & 0xFF); return result; } /** * 把一個整形改為2位的byte數組 * * @param value * @return * @throws Exception */ public byte[] integerTo2Bytes(int value) { byte[] result = new byte[2]; result[0] = (byte) ((value >>> 8) & 0xFF); result[1] = (byte) (value & 0xFF); return result; } /** * 把一個整形改為3位的byte數組 * * @param value * @return * @throws Exception */ public byte[] integerTo3Bytes(int value) { byte[] result = new byte[3]; result[0] = (byte) ((value >>> 16) & 0xFF); result[1] = (byte) ((value >>> 8) & 0xFF); result[2] = (byte) (value & 0xFF); return result; } /** * 把一個整形改為4位的byte數組 * * @param value * @return * @throws Exception */ public byte[] integerTo4Bytes(int value){ byte[] result = new byte[4]; result[0] = (byte) ((value >>> 24) & 0xFF); result[1] = (byte) ((value >>> 16) & 0xFF); result[2] = (byte) ((value >>> 8) & 0xFF); result[3] = (byte) (value & 0xFF); return result; } /** * 把byte[]轉化位整形,通常為指令用 * * @param value * @return * @throws Exception */ public int byteToInteger(byte[] value) { int result; if (value.length == 1) { result = oneByteToInteger(value[0]); } else if (value.length == 2) { result = twoBytesToInteger(value); } else if (value.length == 3) { result = threeBytesToInteger(value); } else if (value.length == 4) { result = fourBytesToInteger(value); } else { result = fourBytesToInteger(value); } return result; } /** * 把一個byte轉化位整形,通常為指令用 * * @param value * @return * @throws Exception */ public int oneByteToInteger(byte value) { return (int) value & 0xFF; } /** * 把一個2位的數組轉化位整形 * * @param value * @return * @throws Exception */ public int twoBytesToInteger(byte[] value) { // if (value.length < 2) { // throw new Exception("Byte array too short!"); // } int temp0 = value[0] & 0xFF; int temp1 = value[1] & 0xFF; return ((temp0 << 8) + temp1); } /** * 把一個3位的數組轉化位整形 * * @param value * @return * @throws Exception */ public int threeBytesToInteger(byte[] value) { int temp0 = value[0] & 0xFF; int temp1 = value[1] & 0xFF; int temp2 = value[2] & 0xFF; return ((temp0 << 16) + (temp1 << 8) + temp2); } /** * 把一個4位的數組轉化位整形,通常為指令用 * * @param value * @return * @throws Exception */ public int fourBytesToInteger(byte[] value) { // if (value.length < 4) { // throw new Exception("Byte array too short!"); // } int temp0 = value[0] & 0xFF; int temp1 = value[1] & 0xFF; int temp2 = value[2] & 0xFF; int temp3 = value[3] & 0xFF; return ((temp0 << 24) + (temp1 << 16) + (temp2 << 8) + temp3); } /** * 把一個4位的數組轉化位整形 * * @param value * @return * @throws Exception */ public long fourBytesToLong(byte[] value) throws Exception { // if (value.length < 4) { // throw new Exception("Byte array too short!"); // } int temp0 = value[0] & 0xFF; int temp1 = value[1] & 0xFF; int temp2 = value[2] & 0xFF; int temp3 = value[3] & 0xFF; return (((long) temp0 << 24) + (temp1 << 16) + (temp2 << 8) + temp3); } /** * 把一個數組轉化長整形 * * @param value * @return * @throws Exception */ public long bytes2Long(byte[] value) { long result = 0; int len = value.length; int temp; for (int i = 0; i < len; i++) { temp = (len - 1 - i) * 8; if (temp == 0) { result += (value[i] & 0x0ff); } else { result += (value[i] & 0x0ff) << temp; } } return result; } /** * 把一個長整形改為byte數組 * * @param value * @return * @throws Exception */ public byte[] longToBytes(long value){ return longToBytes(value, 8); } /** * 把一個長整形改為byte數組 * * @param value * @return * @throws Exception */ public byte[] longToBytes(long value, int len) { byte[] result = new byte[len]; int temp; for (int i = 0; i < len; i++) { temp = (len - 1 - i) * 8; if (temp == 0) { result[i] += (value & 0x0ff); } else { result[i] += (value >>> temp) & 0x0ff; } } return result; } /** * 得到一個消息ID * * @return * @throws Exception */ public byte[] generateTransactionID() throws Exception { byte[] id = new byte[16]; System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 0, 2); System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 2, 2); System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 4, 2); System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 6, 2); System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 8, 2); System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 10, 2); System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 12, 2); System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 14, 2); return id; } /** * 把IP拆分位int數組 * * @param ip * @return * @throws Exception */ public int[] getIntIPValue(String ip) throws Exception { String[] sip = ip.split("[.]"); // if (sip.length != 4) { // throw new Exception("error IPAddress"); // } int[] intIP = { Integer.parseInt(sip[0]), Integer.parseInt(sip[1]), Integer.parseInt(sip[2]), Integer.parseInt(sip[3]) }; return intIP; } /** * 把byte類型IP地址轉化位字符串 * * @param address * @return * @throws Exception */ public String getStringIPValue(byte[] address) throws Exception { int first = this.oneByteToInteger(address[0]); int second = this.oneByteToInteger(address[1]); int third = this.oneByteToInteger(address[2]); int fourth = this.oneByteToInteger(address[3]); return first + "." + second + "." + third + "." + fourth; } /** * 合并字節數組 * * @param first * @param rest * @return */ public byte[] concatAll(byte[] first, byte[]... rest) { int totalLength = first.length; for (byte[] array : rest) { if (array != null) { totalLength += array.length; } } byte[] result = Arrays.copyOf(first, totalLength); int offset = first.length; for (byte[] array : rest) { if (array != null) { System.arraycopy(array, 0, result, offset, array.length); offset += array.length; } } return result; } /** * 合并字節數組 * * @param rest * @return */ public byte[] concatAll(List2.3 和netty結合 2.3.1 netty處理器鏈rest) { int totalLength = 0; for (byte[] array : rest) { if (array != null) { totalLength += array.length; } } byte[] result = new byte[totalLength]; int offset = 0; for (byte[] array : rest) { if (array != null) { System.arraycopy(array, 0, result, offset, array.length); offset += array.length; } } return result; } public float byte2Float(byte[] bs) { return Float.intBitsToFloat( (((bs[3] & 0xFF) << 24) + ((bs[2] & 0xFF) << 16) + ((bs[1] & 0xFF) << 8) + (bs[0] & 0xFF))); } public float byteBE2Float(byte[] bytes) { int l; l = bytes[0]; l &= 0xff; l |= ((long) bytes[1] << 8); l &= 0xffff; l |= ((long) bytes[2] << 16); l &= 0xffffff; l |= ((long) bytes[3] << 24); return Float.intBitsToFloat(l); } public int getCheckSum4JT808(byte[] bs, int start, int end) { if (start < 0 || end > bs.length) throw new ArrayIndexOutOfBoundsException("getCheckSum4JT808 error : index out of bounds(start=" + start + ",end=" + end + ",bytes length=" + bs.length + ")"); int cs = 0; for (int i = start; i < end; i++) { cs ^= bs[i]; } return cs; } public int getBitRange(int number, int start, int end) { if (start < 0) throw new IndexOutOfBoundsException("min index is 0,but start = " + start); if (end >= Integer.SIZE) throw new IndexOutOfBoundsException("max index is " + (Integer.SIZE - 1) + ",but end = " + end); return (number << Integer.SIZE - (end + 1)) >>> Integer.SIZE - (end - start + 1); } public int getBitAt(int number, int index) { if (index < 0) throw new IndexOutOfBoundsException("min index is 0,but " + index); if (index >= Integer.SIZE) throw new IndexOutOfBoundsException("max index is " + (Integer.SIZE - 1) + ",but " + index); return ((1 << index) & number) >> index; } public int getBitAtS(int number, int index) { String s = Integer.toBinaryString(number); return Integer.parseInt(s.charAt(index) + ""); } @Deprecated public int getBitRangeS(int number, int start, int end) { String s = Integer.toBinaryString(number); StringBuilder sb = new StringBuilder(s); while (sb.length() < Integer.SIZE) { sb.insert(0, "0"); } String tmp = sb.reverse().substring(start, end + 1); sb = new StringBuilder(tmp); return Integer.parseInt(sb.reverse().toString(), 2); } }
import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cn.kkbc.tpms.tcp.service.TCPServerHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.Future; public class TCPServer2 { private Logger log = LoggerFactory.getLogger(getClass()); private volatile boolean isRunning = false; private EventLoopGroup bossGroup = null; private EventLoopGroup workerGroup = null; private int port; public TCPServer2() { } public TCPServer2(int port) { this(); this.port = port; } private void bind() throws Exception { this.bossGroup = new NioEventLoopGroup(); this.workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup)// .channel(NioServerSocketChannel.class) // .childHandler(new ChannelInitializer2.3.2 netty針對于JT808的消息處理器() { // @Override public void initChannel(SocketChannel ch) throws Exception { //超過15分鐘未收到客戶端消息則自動斷開客戶端連接 ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(15, 0, 0, TimeUnit.MINUTES)); //ch.pipeline().addLast(new Decoder4LoggingOnly()); // 1024表示單條消息的最大長度,解碼器在查找分隔符的時候,達到該長度還沒找到的話會拋異常 ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(new byte[] { 0x7e }), Unpooled.copiedBuffer(new byte[] { 0x7e, 0x7e }))); ch.pipeline().addLast(new TCPServerHandler()); } }).option(ChannelOption.SO_BACKLOG, 128) // .childOption(ChannelOption.SO_KEEPALIVE, true); this.log.info("TCP服務啟動完畢,port={}", this.port); ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); channelFuture.channel().closeFuture().sync(); } public synchronized void startServer() { if (this.isRunning) { throw new IllegalStateException(this.getName() + " is already started ."); } this.isRunning = true; new Thread(() -> { try { this.bind(); } catch (Exception e) { this.log.info("TCP服務啟動出錯:{}", e.getMessage()); e.printStackTrace(); } }, this.getName()).start(); } public synchronized void stopServer() { if (!this.isRunning) { throw new IllegalStateException(this.getName() + " is not yet started ."); } this.isRunning = false; try { Future> future = this.workerGroup.shutdownGracefully().await(); if (!future.isSuccess()) { log.error("workerGroup 無法正常停止:{}", future.cause()); } future = this.bossGroup.shutdownGracefully().await(); if (!future.isSuccess()) { log.error("bossGroup 無法正常停止:{}", future.cause()); } } catch (InterruptedException e) { e.printStackTrace(); } this.log.info("TCP服務已經停止..."); } private String getName() { return "TCP-Server"; } public static void main(String[] args) throws Exception { TCPServer2 server = new TCPServer2(20048); server.startServer(); // Thread.sleep(3000); // server.stopServer(); } }
package cn.hylexus.jt808.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cn.hylexus.jt808.server.SessionManager; import cn.hylexus.jt808.service.codec.MsgDecoder; import cn.hylexus.jt808.vo.PackageData; import cn.hylexus.jt808.vo.Session; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.ReferenceCountUtil; public class TCPServerHandler extends ChannelInboundHandlerAdapter { // (1) private final Logger logger = LoggerFactory.getLogger(getClass()); // 一個維護客戶端連接的類 private final SessionManager sessionManager; private MsgDecoder decoder = new MsgDecoder(); public TCPServerHandler() { this.sessionManager = SessionManager.getInstance(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException { // (2) try { ByteBuf buf = (ByteBuf) msg; if (buf.readableBytes() <= 0) { // ReferenceCountUtil.safeRelease(msg); return; } byte[] bs = new byte[buf.readableBytes()]; buf.readBytes(bs); PackageData jt808Msg = this.decoder.queueElement2PackageData(bs); // 處理客戶端消息 this.processClientMsg(jt808Msg); } finally { release(msg); } } private void processClientMsg(PackageData jt808Msg) { // TODO 更加消息ID的不同,分別實現自己的業務邏輯 if (jt808Msg.getMsgHeader().getMsgId() == 0x900) { // TODO ... } else if (jt808Msg.getMsgHeader().getMsgId() == 0x9001) { // TODO ... } // else if(){} // else if(){} // else if(){} // else if(){} // ... else { logger.error("位置消息,消息ID={}", jt808Msg.getMsgHeader().getMsgId()); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4) logger.error("發生異常:{}", cause.getMessage()); cause.printStackTrace(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Session session = Session.buildSession(ctx.channel()); sessionManager.put(session.getId(), session); logger.debug("終端連接:{}", session); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { final String sessionId = ctx.channel().id().asLongText(); Session session = sessionManager.findBySessionId(sessionId); this.sessionManager.removeBySessionId(sessionId); logger.debug("終端斷開連接:{}", session); ctx.channel().close(); // ctx.close(); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { Session session = this.sessionManager.removeBySessionId(Session.buildId(ctx.channel())); logger.error("服務器主動斷開連接:{}", session); ctx.close(); } } } private void release(Object msg) { try { ReferenceCountUtil.release(msg); } catch (Exception e) { e.printStackTrace(); } } }2.3.3 用到的其他類
package cn.hylexus.jt808.server; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; import java.util.stream.Collectors; import cn.hylexus.jt808.vo.Session; public class SessionManager { private static volatile SessionManager instance = null; // netty生成的sessionID和Session的對應關系 private Map3 demo級別java示例sessionIdMap; // 終端手機號和netty生成的sessionID的對應關系 private Map phoneMap; public static SessionManager getInstance() { if (instance == null) { synchronized (SessionManager.class) { if (instance == null) { instance = new SessionManager(); } } } return instance; } public SessionManager() { this.sessionIdMap = new ConcurrentHashMap<>(); this.phoneMap = new ConcurrentHashMap<>(); } public boolean containsKey(String sessionId) { return sessionIdMap.containsKey(sessionId); } public boolean containsSession(Session session) { return sessionIdMap.containsValue(session); } public Session findBySessionId(String id) { return sessionIdMap.get(id); } public Session findByTerminalPhone(String phone) { String sessionId = this.phoneMap.get(phone); if (sessionId == null) return null; return this.findBySessionId(sessionId); } public synchronized Session put(String key, Session value) { if (value.getTerminalPhone() != null && !"".equals(value.getTerminalPhone().trim())) { this.phoneMap.put(value.getTerminalPhone(), value.getId()); } return sessionIdMap.put(key, value); } public synchronized Session removeBySessionId(String sessionId) { if (sessionId == null) return null; Session session = sessionIdMap.remove(sessionId); if (session == null) return null; if (session.getTerminalPhone() != null) this.phoneMap.remove(session.getTerminalPhone()); return session; } public Set keySet() { return sessionIdMap.keySet(); } public void forEach(BiConsumer super String, ? super Session> action) { sessionIdMap.forEach(action); } public Set > entrySet() { return sessionIdMap.entrySet(); } public List toList() { return this.sessionIdMap.entrySet().stream().map(e -> e.getValue()).collect(Collectors.toList()); } }
請移步: https://github.com/hylexus/jt...
另請不要吝嗇,在GitHub給個star讓小可裝裝逼…………^_^
注急急忙忙寫的博客,先寫個大致的思路,有疑問可以聯系本人,聯系方式:
emial: hylexus@163.com
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/66624.html
摘要:是定位協議通訊協議基礎協議其他協議基于該協議進行擴展。是轉發協議監管協議第三方平臺通過向進行數據獲取與事件下發。蘇標主動安全協議高級駕駛輔助報警駕駛員狀態報警胎壓監測報警盲區監測報警在觸發報警時需要上報附件視頻圖片文本。 ...
摘要:將接收到的消息還原轉義后除去消息標識和校驗位,按位異或得到的結果就是這條消息的校驗碼,和校驗位比對驗證其的一致性。將要發出的消息封裝好后出去標示位外,按位異或,得到的校驗碼放在消息尾部,然后轉義。 終端是指obd設備,既車載obd設備。 平臺是指上文中說到的通過短信設置的上報IP指向的機器所提供的網關服務。 這兩種消息一是終端設備發出的,一是平臺發出的,都是通用應答的格式,所謂通用既是...
摘要:并返回合理錯誤提示。如果不在則再輸入密碼,成功則增加用戶信息到文件中,密碼進行加密處理。作業增加用戶名,密碼的合法化判斷和錯誤提示。 課時5:字符串-基礎 切片,索引 s = use python do somenthing s[1],s[-1],s[1:3],s[1:6:2],s[1:],s[:-1],s[:] spilt,join,[start:stop:step] 常用方法集...
單選按鈕+復選框 單選按鈕、復選框是什么這個都知道,不做解釋。上代碼(自己寫著玩的,排班不太好)package jframe;import java.awt.BorderLayout;import java.awt.Container;import java.awt.FlowLayout;import java.awt.event.ActionEvent;import java.awt.event...
摘要:的好處在于,在診斷問題的時候能夠知道的原因推薦使用帶有的操作函數作用用于掛起當前線程,如果許可可用,會立馬返回,并消費掉許可。 LockSupport是用來創建locks的基本線程阻塞基元,比如AQS中實現線程掛起的方法,就是park,對應喚醒就是unpark。JDK中有使用的如下 showImg(https://segmentfault.com/img/bVblcXS?w=884&h...
閱讀 1311·2021-11-24 10:24
閱讀 4089·2021-11-22 15:29
閱讀 1085·2019-08-30 15:53
閱讀 2788·2019-08-30 10:54
閱讀 1977·2019-08-29 17:26
閱讀 1271·2019-08-29 17:08
閱讀 605·2019-08-28 17:55
閱讀 1576·2019-08-26 14:01