摘要:無證連接進行異常記錄并關閉連接。離線消息檢測到上線立即推送這是消息推送需要實現的基本功能之一了,詳見代碼。主要功能協助進行初始化,心跳包檢測,斷線自動重連消息推送的第二種方式在下篇中再編寫
消息重發中需要注意的問題
由于最近工作中接觸了比較多關閉消息推送以及異常重發機制的問題,終于得空總結一下經驗
目前接觸的消息推送分為兩種
主動推送:一般為websocket建立長連接實現,此處網上多有各種實現方式。下面貼出本人結合實際應用場景使用的長連接方式。
websocket服務端代碼
import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import redis.clients.jedis.Jedis; import javax.annotation.PostConstruct; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @ServerEndpoint(value = "/websocket/{id}") @Component @Slf4j public class WebSocket { // 靜態變量,用來記錄當前在線連接數。應該把它設計成線程安全的。 private static int onlineCount = 0; // concurrent包的線程安全Set,用來存放每個客戶端對應的MyWebSocket對象。 private static ConcurrentHashMapwebSocketSet = new ConcurrentHashMap<>(); // 保存允許建立連接的id private static List idList = Lists.newArrayList(); private String id = ""; /** * 這里使用AutoWired注入的bean會出現無法持續保存而出現null的情況。 * 具體原因暫時沒有深究,如果有需要時,可以再init初始化方法中手動將臨時的beanTmp類存入static常量中即可正常使用該bean類。 * @Autowired * private RedisCacheUtil redisTmp; * private static RedisCacheUtil redis; * */ // 與某個客戶端的連接會話,需要通過它來給客戶端發送數據 private Session session; public void closeConn(String appId) { // 關閉連接 try { WebSocket socket = webSocketSet.get(appId); if (null != socket) { if (socket.session.isOpen()) { socket.session.close(); } } } catch (IOException e) { System.out.println("IO異常"); e.printStackTrace(); } idList.remove(appId); } /** * 連接/注冊時去重 */ public void conn(String appId) { // 去重 if (!idList.contains(appId)) { idList.add(appId); } } /** * 獲取注冊在websocket進行連接的id */ public static List getIdList() { return idList; } /** * 初始化方法 * @author caoting * @date 2019年2月13日 */ @PostConstruct public void init() { try { /** * TODO 這里的設計是在項目啟動時從DB或者緩存中獲取注冊了允許建立連接的id * 然后將獲取到的id存入內存--idList * // 從數據庫獲取idList * List ids = wsIdsServiceTmp.selectList(null); */ // TODO 初始化時將剛注入的對象進行靜態保存 // redis = redisTmp; } catch (Exception e) { // TODO 項目啟動錯誤信息 } } /** * 連接啟動時查詢是否有滯留的新郵件提醒 * @param id * * @author caoting * @throws IOException * @date 2019年2月28日 */ private void selectOfflineMail(String id) throws IOException { // 查詢緩存中是否存在離線郵件消息 Jedis jedis = redis.getConnection(); try { List mails = jedis.lrange(Constant.MAIL_OFFLINE+id, 0, -1); if (CommomUtil.isNotEmpty(mails)) { for (String mailuuid : mails) { String mail = jedis.get(mailuuid); if (StringUtils.isNotEmpty(mail)) sendToUser(Constant.MESSAGE_MAIL + mail, id); Thread.sleep(1000); } // 發送完成從緩存中移除 jedis.del(Constant.MAIL_OFFLINE+id); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { jedis.close(); } } /** * 連接建立成功調用的方法 * @param id */ @OnOpen public void onOpen(@PathParam(value = "id") String id, Session session) { try { // 注:ws-admin是管理員內部使用通道 不受監控 謹慎使用 if (!id.contains(Constant.WS_ADMIN)) { this.session = session; this.id = id;//接收到發送消息的人員編號 // 驗證id是否在允許 if (idList.contains(id)) { // 判斷是否已存在相同id WebSocket socket = webSocketSet.get(id); if (socket == null) { webSocketSet.put(id, this); //加入set中 addOnlineCount(); // 在線數加1 this.sendMessage("Hello:::" + id); System.out.println("用戶"+id+"加入!當前在線人數為" + getOnlineCount()); // 檢查是否存在離線推送消息 selectOfflineMail(id); } else { this.sendMessage(Constant.MESSAGE_ERROR+"連接id重復--連接即將關閉"); this.session.close(); } } else { // 查詢數據庫中是否存在數據 WsIds wsIds = wsIdsService.selectByAppId(id); if ( null != wsIds ) { idList.add(id); webSocketSet.put(id, this); //加入set中 addOnlineCount(); // 在線數加1 this.sendMessage("Hello:::" + id); log.debug("用戶"+id+"加入!當前在線人數為" + getOnlineCount()); // 檢查是否存在離線推送消息 selectOfflineMail(id); } else { // 關閉 this.sendMessage(Constant.MESSAGE_ERROR+"暫無連接權限,連接即將關閉,請確認連接申請是否過期!"); this.session.close(); log.warn("有異常應用嘗試與服務器進行長連接 使用id為:"+id); } } } else { this.session = session; this.id = id;//接收到發送消息的人員編號 webSocketSet.put(id, this); //加入set中 addOnlineCount(); // 在線數加1 this.sendMessage("Hello:::" + id); log.debug("用戶"+id+"加入!當前在線人數為" + getOnlineCount()); } } catch (IOException e) { e.printStackTrace(); } } /** * 連接關閉調用的方法 */ @OnClose public void onClose() { webSocketSet.remove(this.id); // 從set中刪除 subOnlineCount(); // 在線數減1 log.debug("有一連接關閉!當前在線人數為" + getOnlineCount()); } /** * 收到客戶端消息后調用的方法 * * @param message * 客戶端發送過來的消息 */ @OnMessage public void onMessage(String message, Session session) { log.debug("來自客戶端的消息:" + message); // TODO 收到客戶端消息后的操作 } /** * 發生錯誤時調用 */ @OnError public void onError(Session session, Throwable error) { log.debug("發生錯誤"); error.printStackTrace(); } public void sendMessage(String message) throws IOException { this.session.getAsyncRemote().sendText(message); } /** * 發送信息給指定ID用戶,如果用戶不在線則返回不在線信息給自己 * @param message * @param sendUserId * @throws IOException */ public Boolean sendToUser(String message, String sendUserId) throws IOException { Boolean flag = true; WebSocket socket = webSocketSet.get(sendUserId); if (socket != null) { try { if (socket.session.isOpen()) { socket.sendMessage(message); } else { flag = false; } } catch (Exception e) { flag = false; e.printStackTrace(); } } else { flag = false; log.warn("【" + sendUserId + "】 該用戶不在線"); } return flag; } /** * 群發自定義消息 */ public void sendToAll(String message) throws IOException { for (String key : webSocketSet.keySet()) { try { WebSocket socket = webSocketSet.get(key); if (socket.session.isOpen()) { socket.sendMessage(message); } } catch (IOException e) { continue; } } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocket.onlineCount++; } public static synchronized void subOnlineCount() { if (WebSocket.onlineCount > 0) WebSocket.onlineCount--; } }
這里使用的是較為原始的websocket連接方式,事實上springboot已經融合了websocket,工作關系沒有空暫未研究。記錄一下有空了再去寫寫demo。這個socket服務端主要實現了:1. 連接控制,建立連接時驗證id的合法性。無證連接進行異常記錄并關閉連接。2. 離線消息檢測到上線立即推送 這是消息推送需要實現的基本功能之一了,詳見代碼。3. 統計在線人數 依舊是基本功能
下面是websocket服務端配置類WebSocketServerConfig
import lombok.extern.slf4j.Slf4j; import org.apache.catalina.session.StandardSessionFacade; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; import javax.servlet.http.HttpSession; import javax.websocket.HandshakeResponse; import javax.websocket.server.HandshakeRequest; import javax.websocket.server.ServerEndpointConfig; import javax.websocket.server.ServerEndpointConfig.Configurator; @Configuration @Slf4j public class WebSocketServerConfig extends Configurator { @Override public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) { /* 如果沒有監聽器,那么這里獲取到的HttpSession是null */ StandardSessionFacade ssf = (StandardSessionFacade) request.getHttpSession(); if (ssf != null) { HttpSession httpSession = (HttpSession) request.getHttpSession(); // 關鍵操作 sec.getUserProperties().put("sessionId", httpSession.getId()); log.debug("獲取到的SessionID:" + httpSession.getId()); } } /** * 如果使用獨立的servlet容器,而不是直接使用springboot的內置容器 * 就不要注入ServerEndpointExporter,因為它將由容器自己提供和管理。 * 即:生產環境中在獨立的tomcat運行時請注釋掉這個bean * * @return * * @author caoting * @date 2019年2月20日 */ @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
這里其實有個坑,就是上述代碼中的bean類 serverEndpointExporter,開發環境如果不是配置獨立的tomcat運行的話是需要注入的,但是生產環境下在獨立的tomcat容器運行時是需要注釋掉的,否則會報錯。
很重要的session監聽器 RequestListener
import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.servlet.ServletRequestEvent; import javax.servlet.ServletRequestListener; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpSession; /** * 監聽器類:主要任務是用ServletRequest將我們的HttpSession攜帶過去 * 此注解千萬千萬不要忘記,它的主要作用就是將這個監聽器納入到Spring容器中進行管理,相當于注冊監聽 */ @Component @Slf4j public class RequestListener implements ServletRequestListener { @Override public void requestInitialized(ServletRequestEvent sre) { // 將所有request請求都攜帶上httpSession HttpSession httpSession = ((HttpServletRequest) sre.getServletRequest()).getSession(); log.debug("將所有request請求都攜帶上httpSession " + httpSession.getId()); } public RequestListener() { } @Override public void requestDestroyed(ServletRequestEvent arg0) { } }
以上就是一個websocket服務端需要的所有配置和類
websocket客戶端代碼
import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.curator.shaded.com.google.common.collect.Lists; import org.apache.curator.shaded.com.google.common.collect.Maps; import redis.clients.jedis.Jedis; import javax.websocket.*; import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; /** * @author caoting * @date 2018年9月27日 */ @Slf4j @ClientEndpoint public class MailWebSocketClient { private static RedisCacheUtil redis; protected void setRedis(RedisCacheUtil redisTmp) { redis = redisTmp; } /** * @author caoting * @date 2019年3月11日 */ public static void doSomething() { // TODO 由于這個類沒有寫初始化方法,但是有些初始化操作必須完成, // 因此在socket配置類中調用此方法可以完成一些需要初始化注入的操作 } private Session session; @OnOpen public void open(Session session) { log.info("連接開啟..."); this.session = session; } @OnMessage public void onMessage(String message) { log.info("來自服務端的消息: " + message); // TODO 對消息進行過濾判斷處理 // 不做過多操作影響性能 直接交給異步任務處理--這個辦法還是比較low的現在springboot有更好的解決辦法@Async 有空再記錄下多線程異步處理任務調度的相關代碼。 ExecutorService executor = Executors.newSingleThreadExecutor(); FutureTaskfuture = new FutureTask (new Callable () {// 使用Callable接口作為構造參數 public Boolean call() { return pushMsg(message); } }); executor.execute(future); Boolean res = CommomUtil.timeOutTask(future, executor, 600); if (res != null && res) log.info("操作成功"); else log.info("操作失敗"); } /** * @author caoting * @date 2019年3月11日 */ private Boolean pushMailMsg(String message) { Boolean flag = true; // 推送消息 ReceiverRes resObj = new ReceiverRes(); try { resObj = restTemplate.httpPostMediaTypeJson(url, ReceiverRes.class, message); } catch (Exception e) { // 這里異常一般是http接口服務宕機了,所以放進緩存在對方上線時進行重新推送 resObj.setCode(500); log.error(e.getMessage(), e); } // ====推送完成后的后續異常檢查與數據重發工作 這里是一個redis任務調度 處理失敗任務的典型案例 看不懂就刪掉 Integer code = resObj.getCode(); if (code == 500) { // 發送失敗存進redis緩存 按照約定好的狀態碼進行判斷 jedis.lpush(Constant.PUSH_ERROR, mailMapJson); } else { // 發送成功以后查詢以前出錯的數據進行重新推送。--這種辦法只適合消息很頻繁的,畢竟不頻繁的等下次發消息又不知道是何時了,因此需要采用別的方法 while (true) { // 查詢以往的異常發送數據 重新發送 String jsonMap = jedis.rpoplpush(Constant.PUSH_ERROR, Constant.PUSH_ERROR_TMP); if (StringUtils.isEmpty(jsonMap)) { break; } try { errObj = restTemplate.httpPostMediaTypeJson(receiverUrl, ReceiverRes.class, message); } catch (Exception e) { errObj.setCode(500); log.error(e.getMessage(), e); } if (errObj.getCode() == 500) { // 再次失敗 彈回原隊列 jedis.rpoplpush(Constant.PUSH_ERROR_TMP, Constant.PUSH_ERROR); } else { jedis.rpop(Constant.PUSH_ERROR_TMP); } } } return flag; } @OnClose public void onClose() { log.info("長連接關閉..."); } @OnError public void onError(Session session, Throwable t) { t.printStackTrace(); } public void send(String message) { this.session.getAsyncRemote().sendText(message); } public void close() throws IOException { if (this.session.isOpen()) { this.session.close(); } } }
上面是websocket客戶端的代碼。其中主要有:1、http推送失敗重發機制 2、redis任務調度經典案例
websocket客戶端配置類WebSocketConfig
import com.hnpolice.business.service.ApplicationService; import com.hnpolice.sso.common.ex.BaseException; import com.hnpolice.sso.common.utils.RedisCacheUtil; import com.hnpolice.sync.RestTemplateFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; import javax.websocket.ContainerProvider; import javax.websocket.WebSocketContainer; import java.net.URI; @Slf4j @Component public class WebSocketConfig implements ApplicationRunner { @Autowired private RedisCacheUtil redisTmp; private static Boolean isOk; private MailWebSocketClient client; private static WebSocketContainer conmtainer = ContainerProvider.getWebSocketContainer(); @Override public void run(ApplicationArguments args) throws Exception { // 跟隨項目啟動的方法可以在這里做一些初始化工作 // websocket客戶端初始化 wsClientInit(); } public void wsClientInit() { try { client = new MailWebSocketClient(); client.setRedis(redisTmp); MailWebSocketClient.dosomething(); conmtainer.connectToServer(client, new URI(##socket服務連接地址##)); isOk = true; } catch (Exception e) { isOk = false; log.error(e); } // 斷線重連 while (true) { if (isOk != null && isOk) { try { client.send("ping:"+appId); } catch (Exception e) { isOk = false; } } else { // 系統連接失敗進行重試 log.warn("系統連接失敗,正在重連..."); try { client.send("ping:"+appId); log.warn("系統重連成功!"); isOk = true; } catch (Exception e) { try { client = new MailWebSocketClient(); conmtainer.connectToServer(client, new URI(mailUrl)); isOk = true; } catch (Exception e1) { isOk = false; } if (isOk != null && isOk) { log.warn("系統重連成功!"); } } } try { Thread.sleep(30000); } catch (InterruptedException e) { log.error(BaseException.collectExceptionStackMsg(e)); e.printStackTrace(); } } } }
這是websocket客戶端的配置類,實現ApplicationRunner 接口是為了在項目啟動時完成一些初始化工作,并非必須。主要功能:1、協助websocketCient進行初始化,2、心跳包檢測,斷線自動重連
消息推送的第二種方式在下篇中再編寫
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/74103.html
摘要:修改記錄版本的通知欄消息功能上并未發生變化,右上角的縮減為了。增加了,允許可穿戴設備遠程控制通知欄消息。鎖屏狀態下,可以控制通知欄消息的隱私程度。但是谷歌規定,自定義布局展示的通知欄消息最大高度是。具體適配不正常的機型有。 此文已由作者黎星授權網易云社區發布。 歡迎訪問網易云社區,了解更多網易技術產品運營經驗。 由于歷史原因,Android在發布之初對通知欄Notification的設...
摘要:在對事實性要求沒有那么高的情況下,可以用基于最大努力交付消息隊列以及消息存儲來解決最終一致性。可靠消息服務和消息組件,協調上下游消息的傳遞,并確保上下游數據的一致性。下游應用通知可靠消息服務該消息已經成功消費。 本文對比 二階段事務、最大努力交付以及消息最終一致性,并給出部分解決方案,最終一致性方案參考阿里RockMQ事務消息:http://blog.csdn.net/chunlong...
摘要:一小小推廣講座本話題已收入視頻講座分布式事務解決方案大家不妨圍觀下開源項目我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀。 一 小小推廣 講座 本話題已收入視頻講座《Spring Cloud分布式事務解決方案》大家不妨圍觀下 開源項目 我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀。可以參考Github CoolMQ源碼,項目支持網站: http:/...
摘要:在端,盡管開發人員對其功能的需求很高,但出于某些原因,推送通知被引入的時間比較晚。發送推送通知在服務器上實現調用,該調用觸發到用戶設備的推送消息。推送服務推送服務是接收請求驗證請求并將推送消息發送到對應的瀏覽器。 這是專門探索 JavaScript 及其所構建的組件的系列文章的第9篇。 想閱讀更多優質文章請猛戳GitHub博客,一年百來篇優質文章等著你! 如果你錯過了前面的章節,可以在...
閱讀 2857·2023-04-26 02:49
閱讀 3441·2021-11-25 09:43
閱讀 3370·2021-10-09 09:43
閱讀 2985·2021-09-28 09:44
閱讀 2446·2021-09-22 15:29
閱讀 4507·2021-09-14 18:02
閱讀 2773·2021-09-03 10:48
閱讀 3426·2019-08-30 12:47