摘要:背景通過接口實現調用發送數據,接口返回值為發送數據的對應結果。接口為同步阻塞,為異步回調方式。接收數據回調接收到數據后,通過閉鎖釋放阻塞的線程,同時設置結果返回給調用者
背景
通過HTTP接口實現調用MQTT Client發送數據,HTTP接口返回值為MQTT Client發送數據的對應結果。 HTTP接口為同步阻塞,MQTT Client 為異步回調方式。
如何實現在HTTP接口中調用MQTT Client發送數據后,能夠阻塞等待MQTT返回結果,然后將結果返回?
CountDownLatch + Callbale+FutureTask
1.CountDownLatch作用
CountDownLatch實現在MQTT Client 發送數據后 到接收數據后這段時間的阻塞。 HTTP每次請求,新建一個CountDownLatch,然后將CountDownLatch作為值和deviceId作為KEY保存到Map中, 調用MQTT Client 發送數據后,countDownLatch.await(),進行同步等待 在MQTT Client接收數據的回調方法中更加deviceId取出CountDwonLatch然后計數減一
2.Callbale+FutureTask作用
將調用MQTT Client發送數據的過程,封裝成Callable,投遞發送任務時,通過返回的FutureTask的get()方法, 同步阻塞,直到結果返回。關鍵代碼
1.Map保存CountDownObj用于同步阻塞等待MQTT Client返回結果,以及將返回結果傳遞個FutureTask
private final static ConcurrentMapcountDownLatchMap = new ConcurrentHashMap<>(); //線程池 private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 5, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), runnable -> { Thread thread = new Thread(runnable, "mqtt thread"); return thread; });
2.HTTP API 調用的發送MQTT 消息數據的接口
/** * HTTP API 調用的發送MQTT 消息數據的接口 * 同步阻塞 */ public Integer send(Long packageId, String deviceId) throws Exception { ...... FutureTaskfutureTask = sendTask(publishDto)); return futureTask.get() }
3.投遞發送MQTT指令的task方法
/** * 投遞MQTT發送指令任務 * 同步阻塞 */ private FutureTasksendTask(PublishDto publishDto) throws Exception { FutureTask futureTask = new FutureTask<>(new GetDatapointValueCallable(publishDto)); threadPoolExecutor.execute(futureTask); //阻塞線程 return futureTask; }
4.封裝CountDownLatch 和 Integer的對象,用于CountDownLatch阻塞控制和返回結果
/** * 封裝CountDownLatch 和 Integer * 用于CountDownLatch阻塞控制和返回結果 */ private class CountDownObj { private final CountDownLatch countDownLatch; private volatile Integer value; private CountDownObj(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } public CountDownLatch getCountDownLatch() { return countDownLatch; } public Integer getValue() { return value; } public void setValue(Integer value) { this.value = value; } }
5.具體發送MQTT數據的Callbale線程Task,會新建CountDownLatch,并通過CountDownLatch.await()方法阻塞,直到MQTT回調接收到數據或者超時。
/** * 發送MQTT消息的任務Callable */ private class GetDatapointValueCallable implements Callable{ private final PublishDto publishDto; GetDatapointValueCallable(PublishDto publishDto) { this.publishDto = publishDto; } @Override public Integer call() throws Exception { //mqtt client 發送數據,此處具體代碼省略 ...... CountDownLatch countDownLatch = new CountDownLatch(1); countDownLatchMap.putIfAbsent(publishDto.getDeviceId(), new CountDownObj(countDownLatch)); //阻塞,超時時間3s countDownLatch.await(3, TimeUnit.SECONDS); //返回mqtt指令對應的結果或者null return countDownLatchMap.remove(publishDto.getDeviceId()).getValue(); } }
6.MQTT接收數據回調,這里通過deviceId從MAP里面取到CountDownObj,釋放閉鎖(結束callable線程的等待)和設置MQTT返回的結果(即callable中call()返回的結果,也就是FutureTask的get()方法返回的結果)。
/** * MQTT 接收數據回調 */ void mqttReceiveCallback(String deviceId, String datapointId, String value) { ...... //接收到數據后,通過閉鎖釋放阻塞的線程,同時設置結果返回給調用者 CountDownObj countDownObj=countDownLatchMap.get(deviceId); if(countDownObj!=null) { countDownObj.setValue(Integer.parseInt(value)); countDownObj.getCountDownLatch().countDown(); } ....... }
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/68941.html
摘要:線程啟動規則對象的方法先行發生于此線程的每一個動作。所以局部變量是不被多個線程所共享的,也就不會出現并發問題。通過獲取到數據,放入當前線程處理完之后將當前線程中的信息移除。主線程必須在啟動其他線程后立即調用方法。 一、線程安全性 定義:當多個線程訪問某個類時,不管運行時環境采用何種調度方式,或者這些線程將如何交替執行,并且在主調代碼中不需要任何額外的同步或協同,這個類都能表現出正確的行...
摘要:但是單核我們還是要應用多線程,就是為了防止阻塞。多線程可以防止這個問題,多條線程同時運行,哪怕一條線程的代碼執行讀取數據阻塞,也不會影響其它任務的執行。 1、多線程有什么用?一個可能在很多人看來很扯淡的一個問題:我會用多線程就好了,還管它有什么用?在我看來,這個回答更扯淡。所謂知其然知其所以然,會用只是知其然,為什么用才是知其所以然,只有達到知其然知其所以然的程度才可以說是把一個知識點...
摘要:典型地,和被用在等待另一個線程產生的結果的情形測試發現結果還沒有產生后,讓線程阻塞,另一個線程產生了結果后,調用使其恢復。使當前線程放棄當前已經分得的時間,但不使當前線程阻塞,即線程仍處于可執行狀態,隨時可能再次分得時間。 1、說說進程,線程,協程之間的區別 簡而言之,進程是程序運行和資源分配的基本單位,一個程序至少有一個進程,一個進程至少有一個線程.進程在執行過程中擁有獨立的內存單元...
摘要:大多數待遇豐厚的開發職位都要求開發者精通多線程技術并且有豐富的程序開發調試優化經驗,所以線程相關的問題在面試中經常會被提到。掌握了這些技巧,你就可以輕松應對多線程和并發面試了。進入等待通行準許時,所提供的對象。 最近看到網上流傳著,各種面試經驗及面試題,往往都是一大堆技術題目貼上去,而沒有答案。 不管你是新程序員還是老手,你一定在面試中遇到過有關線程的問題。Java語言一個重要的特點就...
摘要:每個工作線程在結束前將門栓計數器減一,門栓的計數變為就表明工作完成。常用方法遞減鎖存器的計數,如果計數到達零,則釋放所有等待的線程。使當前線程在鎖存器倒計數至零之前一直等待,除非線程被中斷或超出了指定的等待時間。 【同步器 java.util.concurrent包包含幾個能幫助人們管理相互合作的線程集的類。這些機制具有為線程直間的共用集結點模式提供的‘預制功能’。如果有一個相互合作的...
閱讀 2048·2019-08-30 15:52
閱讀 2440·2019-08-29 18:37
閱讀 790·2019-08-29 12:33
閱讀 2839·2019-08-29 11:04
閱讀 1522·2019-08-27 10:57
閱讀 2092·2019-08-26 13:38
閱讀 2759·2019-08-26 12:25
閱讀 2445·2019-08-26 12:23