摘要:它是事件驅動的,我們不斷的發送消息接受消息處理消息。使用消息實現事件通信的概念被稱為消息驅動架構,也被稱為消息驅動架構。許可證服務收到該消息后清除對應的緩存。通過綁定器,使得開發人員不必依賴于特定平臺的庫和來發布和消費消息。
springcloud 總集:https://www.tapme.top/blog/detail/2019-02-28-11-33
代碼見文章結尾
??想想平常生活中做飯的場景,在用電飯鍋做飯的同時,我們可以洗菜、切菜,等待電飯鍋發出飯做好的提示我們回去拔下電飯鍋電源(或者什么也不知讓它處于保溫狀態),反正這個時候我們知道飯做好了,接下來可以炒菜了。從這里可以看出我們在日常生活中與世界的互動并不是同步的、線性的,不是簡單的請求--響應模型。它是事件驅動的,我們不斷的發送消息、接受消息、處理消息。
??同樣在軟件世界中也不全是請求--響應模型,也會需要進行異步的消息通信。使用消息實現事件通信的概念被稱為消息驅動架構(Event Driven Architecture,EDA),也被稱為消息驅動架構(Message Driven Architecture,MDA)。使用這類架構可以構建高度解耦的系統,該系統能夠對變化做出響應,且不需要與特定的庫或者服務緊密耦合。
??在 Spring Cloud 項目中可以使用Spirng Cloud Stream輕而易舉的構建基于消息傳遞的解決方案。
為什么使用消息傳遞??要解答這個問題,讓我們從一個例子開始,之前一直使用的兩個服務:許可證服務和組織服務。每次對許可證服務進行請求,許可證服務都要通過 http 請求到組織服務上查詢組織信息。顯而易見這次額外的 http 請求會花費較長的時間。如果能夠將緩存組織數據的讀操作,將會大幅提高許可證服務的響應時間。但是緩存數據有如下 2 個要求:
緩存的數據需要在許可證服務的所有實例之間保存一致——這意味著不能將數據緩存到服務實例的內存中。
在更新或者刪除一個組織數據時,許可證服務緩存的數據需要失效——避免讀取到過期數據,需要盡早讓過時數據失效并刪除。
??要實現上面的要求,現在有兩種辦法。
使用同步請求--響應模型來實現。組織服務在組織數據變化時調用許可證服務的接口通知組織服務已經變化,或者直接操作許可證服務的緩存。
使用事件驅動。組織服務發出一個異步消息。許可證服務收到該消息后清除對應的緩存。
同步請求-響應方式??許可證服務在 redis 中緩存從組織服務中查詢到的服務信息,當組織數據更新時,組織服務同步 http 請求通知許可證服務數據過期。這種方式有以下幾個問題:
組織服務和許可證服務緊密耦合
這種方式不夠靈活,如果要為組織服務添加新的消費者,必須修改組織服務代碼,以讓其通知新的服務數據變動。
使用消息傳遞方式??同樣的許可證服務在 redis 中緩存從組織服務中查詢到的服務信息,當組織數據更新時,組織服務將更新信息寫入到隊列中。許可證服務監聽消息隊列。使用消息傳遞有一下 4 個好處:
松耦合性:將服務間的依賴,變成了服務對隊列的依賴,依賴關系變弱了。
耐久性:即使服務消費者已經關閉了,也可以繼續往里發送消息,等消費者開啟后處理
可伸縮性: 消息發送者不用等待消息消費者的響應,它們可以繼續做各自的工作
靈活性:消息發送者不用知道誰會消費這個消息,因此在有新的消息消費者時無需修改消息發送代碼
spring cloud 中使用消息傳遞??spring cloud 項目中可以通過 spring cloud stream 框架來輕松集成消息傳遞。該框架最大的特點是抽象了消息傳遞平臺的細節,因此可以在支持的消息隊列中隨意切換(包括 Apache Kafka 和 RabbitMQ)。
spring cloud stream 架構??spring cloud stream 中有 4 個組件涉及到消息發布和消息消費,分別為:
發射器
??當一個服務準備發送消息時,它將使用發射器發布消息。發射器是一個 Spring 注解接口,它接收一個普通 Java 對象,表示要發布的消息。發射器接收消息,然后序列化(默認序列化為 JSON)后發布到通道中。
通道
??通道是對隊列的一個抽象。通道名稱是與目標隊列名稱相關聯的。但是隊列名稱并不會直接公開在代碼中,代碼永遠只會使用通道名。
綁定器
??綁定器是 spring cloud stream 框架的一部分,它是與特定消息平臺對話的 Spring 代碼。通過綁定器,使得開發人員不必依賴于特定平臺的庫和 API 來發布和消費消息。
接收器
??服務通過接收器來從隊列中接收消息,并將消息反序列化。
處理邏輯如下:
實戰??繼續使用之前的項目,在許可證服務中緩存組織數據到 redis 中。
建立 redis 服務??為方便起見,使用 docker 創建 redis,建立腳本如下:
docker run -itd --name redis --net host redis:建立 kafka 服務 在組織服務中編寫消息生產者
??首先在 organization 服務中引入 spring cloud stream 和 kafka 的依賴。
org.springframework.cloud spring-cloud-stream org.springframework.cloud spring-cloud-starter-stream-kafka
??然后在 events 類中編寫SimpleSouce類,用于組織數據修改,產生一條消息到隊列中。代碼如下:
@EnableBinding(Source.class) public class SimpleSource { private Logger logger = LoggerFactory.getLogger(SimpleSource.class); private Source source; @Autowired public SimpleSource(Source source) { this.source = source; } public void publishOrChange(String action, String orgId) { logger.info("在請求:{}中,發送kafka消息:{} for Organization Id:{}", UserContextHolder.getContext().id, action, orgId); OrganizationChange change = new OrganizationChange(action, orgId, UserContextHolder.getContext().id); source.output().send(MessageBuilder.withPayload(change).build()); } }
這里使用的是默認通道,Source 類定義的 output 通道發消息。后面通過 Sink 定義的 input 通道收消息。
??然后在OrganizationController類中定義一個 delete 方法,并注入 SimpleSouce 類,代碼如下:
@Autowired private SimpleSource simpleSource; @DeleteMapping(value = "/organization/{orgId}") public void deleteOne(@PathVariable("orgId") String id) { logger.debug("刪除了組織:{}", id); simpleSource.publishOrChange("delete", id); }
??最后在配置文件中加入消息隊列的配置:
# 省略了其他配置 spring: cloud: stream: bindings: output: destination: orgChangeTopic content-type: application/json kafka: binder: # 替換為部署kafka的ip和端口 zk-nodes: 192.168.226.5:2181 brokers: 192.168.226.5:9092
??現在我們可以測試下訪問localhost:5555/apis/org/organization/12,可以看到控制臺打印消息生成的日志。
在許可證服務中編寫消息消費者??集成 redis 的方法,參看[]()。這里不作說明。
??首先引入依賴,依賴項同上面組織服務。
??然后在 event 包下創建OrgChange的類,代碼如下:
@EnableBinding(Sink.class) //使用Sink接口中定義的通道來監聽傳入消息 public class OrgChange { private Logger logger = LoggerFactory.getLogger(OrgChange.class); @StreamListener(Sink.INPUT) public void loggerSink(OrganizationChange change){ logger.info("收到一個消息,組織id為:{},關聯id為:{}",change.getOrgId(),change.getId()); //刪除失效緩存 RedisUtils.del(RedisKeyUtils.getOrgCacheKey(change.getOrgId())); } } //下面兩個都在util包下 //RedisKeyUtils.java代碼如下 public class RedisKeyUtils { private static final String ORG_CACHE_PREFIX = "orgCache_"; public static String getOrgCacheKey(String orgId){ return ORG_CACHE_PREFIX+orgId; } } //RedisUtils.java代碼如下 @Component @SuppressWarnings("all") public class RedisUtils { public static RedisTemplate redisTemplate; @Autowired public void setRedisTemplate(RedisTemplate redisTemplate) { RedisUtils.redisTemplate = redisTemplate; } public static boolean setObj(String key,Object value){ return setObj(key,value,0); } /** * Description: * * @author fanxb * @date 2019/2/21 15:21 * @param key 鍵 * @param value 值 * @param time 過期時間,單位ms * @return boolean 是否成功 */ public static boolean setObj(String key,Object value,long time){ try{ if(time<=0){ redisTemplate.opsForValue().set(key,value); }else{ redisTemplate.opsForValue().set(key,value,time,TimeUnit.MILLISECONDS); } return true; }catch (Exception e){ e.printStackTrace(); return false; } } public static Object get(String key){ if(key==null){ return null; } try{ Object obj = redisTemplate.opsForValue().get(key); return obj; }catch (Exception e){ e.printStackTrace(); return null; } } public static void del(String... key){ if(key!=null && key.length>0){ redisTemplate.delete(CollectionUtils.arrayToList(key)); } } }
??上面用到的是 Sink.INPUT 通道,這個和之前的 Source.OUTPUT 通道剛好一隊,一個負責收,一個負責發。
??然后修改OrganizationByRibbonService.java文件中的getOrganizationWithRibbon方法:
public Organization getOrganizationWithRibbon(String id) { String key = RedisKeyUtils.getOrgCacheKey(id); //先從redis緩存取數據 Object res = RedisUtils.get(key); if (res == null) { logger.info("當前數據無緩存:{}", id); try{ ResponseEntityresponseEntity = restTemplate.exchange("http://organizationservice/organization/{id}", HttpMethod.GET, null, Organization.class, id); res = responseEntity.getBody(); RedisUtils.setObj(key, res); }catch (Exception e){ e.printStackTrace(); } } else { logger.info("當前數據為緩存數據:{}", id); } return (Organization) res; }
??最后修改配置文件,為 input 通道指定 topic,配置如下:
spring: cloud: stream: bindings: input: destination: orgChangeTopic content-type: application/json # 定義將要消費消息的消費者組的名稱 # 可能多個服務監聽同一個消息隊列。如果定義了消費者組,那么同組中只要有一個消費了消息,剩余的不會再次消費該消息,保證只有消息的 # 一個副本會被該組的某個實例所消費 group: licensingGroup kafka: binder: zk-nodes: 192.168.226.5:2181 brokers: 192.168.226.5:9092
基本和發送的配置相同,只是這里是為input通道映射隊列,然后還定義了一個組名,避免一個消息被重復消費。
??現在來多次訪問localhost:5555/apis/licensingservice/licensingByRibbon/12,可以看到 licensingservice 控制臺打印數據從緩存中讀取,如下所示:
然后再以 delete 訪問localhost:5555/apis/org/organization/12清除緩存,再次訪問 licensingservice 服務,結果如下:
自定義通道??上面用的是Spring Cloud Stream自帶的 input/output 通道,那么要如何自定義通道呢?下面以自定義customInput/customOutput通道為例。
自定義發數據通道public interface CustomOutput { @Output("customOutput") MessageChannel out(); }
??對于每個自定義的發數據通道,需使用@OutPut 注解標記的返回 MessageChannel 類的方法。
自定義收數據通道public interface CustomInput { @Input("customInput") SubscribableChannel in(); }
??同上,對應自定義的收數據通道,需要使用@Input 注解標記的返回 SubscribableChannel 類的方法。
結束??看完本篇你應該已經能夠在 Spring Cloud 中集成 Spring Cloud Stream 消息隊列了,貌似這個也能用到普通的 spring boot 項目中,比直接集成 mq 更加的優雅。
2019,Fighting!
本篇原創發布于:FleyX 的個人博客
本篇所用全部代碼:FleyX 的 github
掃碼關注微信公眾號:FleyX 學習筆記,獲取更多干貨
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/77850.html
摘要:授權框架使第三方應用程序來獲取對服務的有限訪問機會。無論是通過編排資源所有者和服務之間的交互批準的資源所有者,或通過允許第三方應用程序來獲取自己的訪問權限。 SpringCloud打造微服務平臺--概覽 簡述 SpringCloud是什么 Spring Boot和SpringCloud是什么關系 Spring Boot是Spring的一套快速WEB開發的腳手架,可建立獨立的Sprin...
摘要:它就是史上最簡單的教程第三篇服務消費者后端掘金上一篇文章,講述了通過去消費服務,這篇文章主要講述通過去消費服務。概覽和架構設計掘金技術征文后端掘金是基于的一整套實現微服務的框架。 Spring Boot 配置文件 – 在坑中實踐 - 后端 - 掘金作者:泥瓦匠鏈接:Spring Boot 配置文件 – 在坑中實踐版權歸作者所有,轉載請注明出處本文提綱一、自動配置二、自定義屬性三、ran...
摘要:調用百度實現圖像識別使用渲染導出的制作的超級炫酷的三維模型一個代碼庫本人本人瀏覽器調試及所有錯誤代碼整合千峰超級好用的各種開發自學文檔這是它對應的學習視頻使用教程詳細虛擬機安裝系統詳解版網易開源鏡像站在線數據互轉使 1.Java調用百度API實現圖像識別 2.使用Three.js渲染Sketchup導出的dae 3.three.js制作的超級炫酷的三維模型 4.three.js - 一...
摘要:調用百度實現圖像識別使用渲染導出的制作的超級炫酷的三維模型一個代碼庫本人本人瀏覽器調試及所有錯誤代碼整合千峰超級好用的各種開發自學文檔這是它對應的學習視頻使用教程詳細虛擬機安裝系統詳解版網易開源鏡像站在線數據互轉使 1.Java調用百度API實現圖像識別 2.使用Three.js渲染Sketchup導出的dae 3.three.js制作的超級炫酷的三維模型 4.three.js - 一...
閱讀 3161·2023-04-25 19:09
閱讀 3873·2021-10-22 09:54
閱讀 1742·2021-09-29 09:35
閱讀 2904·2021-09-08 09:45
閱讀 2231·2021-09-06 15:00
閱讀 2766·2019-08-29 15:32
閱讀 1028·2019-08-28 18:30
閱讀 369·2019-08-26 13:43