摘要:異步轉同步業務需求有些接口查詢反饋結果是異步返回的,無法立刻獲取查詢結果。正常處理邏輯觸發異步操作,然后傳遞一個唯一標識。等到異步結果返回,根據傳入的唯一標識,匹配此次結果。異步轉同步查詢空循環短暫等待。
異步轉同步 業務需求
有些接口查詢反饋結果是異步返回的,無法立刻獲取查詢結果。
正常處理邏輯
觸發異步操作,然后傳遞一個唯一標識。
等到異步結果返回,根據傳入的唯一標識,匹配此次結果。
如何轉換為同步
正常的應用場景很多,但是有時候不想做數據存儲,只是想簡單獲取調用結果。
即想達到同步操作的結果,怎么辦呢?
思路發起異步操作
在異步結果返回之前,一直等待(可以設置超時)
結果返回之后,異步操作結果統一返回
循環等待LoopQuery.java
使用 query(),將異步的操作 remoteCallback() 執行完成后,同步返回。
public class LoopQuery implements Async { private String result; private static final Logger LOGGER = LogManager.getLogger(LoopQuery.class.getName()); @Override public String query(String key) { startQuery(key); new Thread(new Runnable() { @Override public void run() { remoteCallback(key); } }).start(); final String queryResult = endQuery(); LOGGER.info("查詢結果: {}", queryResult); return queryResult; } /** * 開始查詢 * @param key 查詢條件 */ private void startQuery(final String key) { LOGGER.info("執行查詢: {}", key); } /** * 遠程的回調是等待是隨機的 * * @param key 查詢條件 */ private void remoteCallback(final String key) { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } this.result = key + "-result"; LOGGER.info("remoteCallback set result: {}", result); } /** * 結束查詢 * @return 返回結果 */ private String endQuery() { while (true) { if (null == result) { try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } else { return result; } } } }
main()
public static void main(String[] args) { new LoopQuery().query("12345"); }
測試結果
18:14:16.491 [main] INFO com.github.houbb.thread.learn.aysnc.loop.LoopQuery - 執行查詢: 12345 18:14:21.498 [Thread-1] INFO com.github.houbb.thread.learn.aysnc.loop.LoopQuery - remoteCallback set result: 12345-result 18:14:21.548 [main] INFO com.github.houbb.thread.learn.aysnc.loop.LoopQuery - 查詢結果: 12345-resultCountDownLatch
AsyncQuery.java
使用 CountDownLatch 類達到同步的效果。
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class AsyncQuery { private static final Logger LOGGER = LogManager.getLogger(AsyncQuery.class.getName()); /** * 結果 */ private String result; /** * 異步轉同步查詢 * @param key */ public void asyncQuery(final String key) { final CountDownLatch latch = new CountDownLatch(1); this.startQuery(key); new Thread(new Runnable() { @Override public void run() { LOGGER.info("遠程回調線程開始"); remoteCallback(key, latch); LOGGER.info("遠程回調線程結束"); } }).start(); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } this.endQuery(); } private void startQuery(final String key) { LOGGER.info("執行查詢: {}", key); } /** * 遠程的回調是等待是隨機的 * @param key */ private void remoteCallback(final String key, CountDownLatch latch) { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } this.result = key + "-result"; latch.countDown(); } private void endQuery() { LOGGER.info("查詢結果: {}", result); } }
main()
public static void main(String[] args) { AsyncQuery asyncQuery = new AsyncQuery(); final String key = "123456"; asyncQuery.asyncQuery(key); }
日志
18:19:12.714 [main] INFO com.github.houbb.thread.learn.aysnc.countdownlatch.AsyncQuery - 執行查詢: 123456 18:19:12.716 [Thread-1] INFO com.github.houbb.thread.learn.aysnc.countdownlatch.AsyncQuery - 遠程回調線程開始 18:19:17.720 [main] INFO com.github.houbb.thread.learn.aysnc.countdownlatch.AsyncQuery - 查詢結果: 123456-result 18:19:17.720 [Thread-1] INFO com.github.houbb.thread.learn.aysnc.countdownlatch.AsyncQuery - 遠程回調線程結束Spring EventListener
使用觀察者模式也可以。(對方案一的優化)
此處結合 spring 進行使用。
BookingCreatedEvent.java
定義一個傳輸屬性的對象。
public class BookingCreatedEvent extends ApplicationEvent { private static final long serialVersionUID = -1387078212317348344L; private String info; public BookingCreatedEvent(Object source) { super(source); } public BookingCreatedEvent(Object source, String info) { super(source); this.info = info; } public String getInfo() { return info; } }
BookingService.java
說明:當 this.context.publishEvent(bookingCreatedEvent); 觸發時,
會被 @EventListener 指定的方法監聽到。
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; import java.util.concurrent.TimeUnit; @Service public class BookingService { @Autowired private ApplicationContext context; private volatile BookingCreatedEvent bookingCreatedEvent; /** * 異步轉同步查詢 * @param info * @return */ public String asyncQuery(final String info) { query(info); new Thread(new Runnable() { @Override public void run() { remoteCallback(info); } }).start(); while(bookingCreatedEvent == null) { //.. 空循環 // 短暫等待。 try { TimeUnit.MILLISECONDS.sleep(1); } catch (InterruptedException e) { //... } //2. 使用兩個多帶帶的 event... } final String result = bookingCreatedEvent.getInfo(); bookingCreatedEvent = null; return result; } @EventListener public void onApplicationEvent(BookingCreatedEvent bookingCreatedEvent) { System.out.println("監聽到遠程的信息: " + bookingCreatedEvent.getInfo()); this.bookingCreatedEvent = bookingCreatedEvent; System.out.println("監聽到遠程消息后: " + this.bookingCreatedEvent.getInfo()); } /** * 執行查詢 * @param info */ public void query(final String info) { System.out.println("開始查詢: " + info); } /** * 遠程回調 * @param info */ public void remoteCallback(final String info) { System.out.println("遠程回調開始: " + info); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } // 重發結果事件 String result = info + "-result"; BookingCreatedEvent bookingCreatedEvent = new BookingCreatedEvent(this, result); //觸發event this.context.publishEvent(bookingCreatedEvent); } }
測試方法
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = SpringConfig.class) public class BookServiceTest { @Autowired private BookingService bookingService; @Test public void asyncQueryTest() { bookingService.asyncQuery("1234"); } }
日志
2018-08-10 18:27:05.958 INFO [main] com.github.houbb.spring.lean.core.ioc.event.BookingService:84 - 開始查詢:1234 2018-08-10 18:27:05.959 INFO [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:93 - 遠程回調開始:1234 接收到信息: 1234-result 2018-08-10 18:27:07.964 INFO [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:73 - 監聽到遠程的信息: 1234-result 2018-08-10 18:27:07.964 INFO [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:75 - 監聽到遠程消息后: 1234-result 2018-08-10 18:27:07.964 INFO [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:106 - 已經觸發event 2018-08-10 18:27:07.964 INFO [main] com.github.houbb.spring.lean.core.ioc.event.BookingService:67 - 查詢結果: 1234-result 2018-08-10 18:27:07.968 INFO [Thread-1] org.springframework.context.support.GenericApplicationContext:993 - Closing org.springframework.context.support.GenericApplicationContext@5cee5251: startup date [Fri Aug 10 18:27:05 CST 2018]; root of context hierarchy超時和空循環 空循環
空循環會導致 cpu 飆升
while(true) { }
解決方式
while(true) { // 小睡即可 TimeUnit.sleep(1); }超時編寫
不可能一直等待反饋,可以設置超時時間。
/** * 循環等待直到獲取結果 * @param key key * @param timeoutInSeconds 超時時間 * @param代碼地址泛型 * @return 結果。如果超時則拋出異常 */ public T loopWaitForValue(final String key, long timeoutInSeconds) { long startTime = System.nanoTime(); long deadline = startTime + TimeUnit.SECONDS.toNanos(timeoutInSeconds); //1. 如果沒有新回調,或者 key 對應元素不存在。則一直循環 while(ObjectUtil.isNull(map.get(key))) { try { TimeUnit.MILLISECONDS.sleep(5); } catch (InterruptedException e) { LOGGER.warn("Loop meet InterruptedException, just ignore it.", e); } // 超時判斷 long currentTime = System.nanoTime(); if(currentTime >= deadline) { throw new BussinessException(ErrorCode.READ_TIME_OUT); } } final T target = (T) map.get(key); LOGGER.debug("loopWaitForValue get value:{} for key:{}", JSON.toJSON(target), key); //2. 獲取到元素之后,需要移除掉對應的值 map.remove(key); return target; }
loop
countdownlatch
spring-event-listener
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/76695.html
摘要:創建線程的方式方式一將類聲明為的子類。將該線程標記為守護線程或用戶線程。其中方法隱含的線程為父線程。恢復線程,已過時。等待該線程銷毀終止。更多的使當前線程在鎖存器倒計數至零之前一直等待,除非線 知識體系圖: showImg(https://segmentfault.com/img/bVbef6v?w=1280&h=960); 1、線程是什么? 線程是進程中獨立運行的子任務。 2、創建線...
摘要:在多線程編程中我們會遇到很多需要使用線程同步機制去解決的并發問題,而這些同步機制就是多線程編程中影響正確性和運行效率的重中之重。這五個方法之所以能指定同步器的行為,則是因為中的其他方法就是通過對這五個方法的調用來實現的。 在多線程編程中我們會遇到很多需要使用線程同步機制去解決的并發問題,而這些同步機制就是多線程編程中影響正確性和運行效率的重中之重。這不禁讓我感到好奇,這些同步機制是如何...
摘要:典型地,和被用在等待另一個線程產生的結果的情形測試發現結果還沒有產生后,讓線程阻塞,另一個線程產生了結果后,調用使其恢復。使當前線程放棄當前已經分得的時間,但不使當前線程阻塞,即線程仍處于可執行狀態,隨時可能再次分得時間。 1、說說進程,線程,協程之間的區別 簡而言之,進程是程序運行和資源分配的基本單位,一個程序至少有一個進程,一個進程至少有一個線程.進程在執行過程中擁有獨立的內存單元...
摘要:在創建對象時,需要轉入一個值,用于初始化的成員變量,該成員變量表示屏障攔截的線程數。當到達屏障的線程數小于時,這些線程都會被阻塞住。當所有線程到達屏障后,將會被更新,表示進入新一輪的運行輪次中。 1.簡介 在分析完AbstractQueuedSynchronizer(以下簡稱 AQS)和ReentrantLock的原理后,本文將分析 java.util.concurrent 包下的兩個...
摘要:但是單核我們還是要應用多線程,就是為了防止阻塞。多線程可以防止這個問題,多條線程同時運行,哪怕一條線程的代碼執行讀取數據阻塞,也不會影響其它任務的執行。 1、多線程有什么用?一個可能在很多人看來很扯淡的一個問題:我會用多線程就好了,還管它有什么用?在我看來,這個回答更扯淡。所謂知其然知其所以然,會用只是知其然,為什么用才是知其所以然,只有達到知其然知其所以然的程度才可以說是把一個知識點...
閱讀 1869·2023-04-26 02:46
閱讀 1999·2021-11-25 09:43
閱讀 1144·2021-09-29 09:35
閱讀 2101·2019-08-30 15:56
閱讀 3423·2019-08-30 15:54
閱讀 2633·2019-08-29 16:35
閱讀 3120·2019-08-29 15:25
閱讀 3291·2019-08-29 14:01