摘要:關閉套接字和上下文備注說明如何利用使用首先下載所需的包,解壓以后將和文件放到自己電腦中的安裝路徑中的文件夾下,最后需要將之前解壓后的包放在項目的中或者資源下載鏈接密碼項目源碼下載鏈接鏈接密碼
在講ZeroMQ前先給大家講一下什么是消息隊列。
消息隊列簡介:消息隊列中間件是分布式系統中重要的組件,主要解決應用耦合,異步消息,流量削鋒等問題。實現高性能,高可用,可伸縮和最終一致性架構。是大型分布式系統不可缺少的中間件。目前在生產環境,使用較多的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。其實簡單點說,消息隊列就是如何使各分載器如何實現負載均衡使得完成分布式目標。
ZeroMQ簡介:ZeroMQ是一種基于消息隊列的多線程網絡庫,其對套接字類型、連接處理、幀、甚至路由的底層細節進行抽象,提供跨越多種傳輸協議的套接字。ZeroMQ是網絡通信中新的一層,介于應用層和傳輸層之間(按照TCP/IP劃分),其是一個可伸縮層,可并行運行,分散在分布式系統間。ZeroMQ幾乎所有的I/O操作都是異步的,主線程不會被阻塞。ZeroMQ會根據用戶調用zmq_init函數時傳入的接口參數,創建對應數量的I/O Thread。每個I/O Thread都有與之綁定的Poller,Poller采用經典的Reactor模式實現,Poller根據不同操作系統平臺使用不同的網絡I/O模型(select、poll、epoll、devpoll、kequeue等)。主線程與I/O線程通過Mail Box傳遞消息來進行通信。Server開始監聽或者Client發起連接時,在主線程中創建zmq_connecter或zmq_listener,通過Mail Box發消息的形式將其綁定到I/O線程,I/O線程會把zmq_connecter或zmq_listener添加到Poller中用以偵聽讀/寫事件。Server與Client在第一次通信時,會創建zmq_init來發送identity,用以進行認證。認證結束后,雙方會為此次連接創建Session,以后雙方就通過Session進行通信。每個Session都會關聯到相應的讀/寫管道, 主線程收發消息只是分別從管道中讀/寫數據。Session并不實際跟kernel交換I/O數據,而是通過plugin到Session中的Engine來與kernel交換I/O數據。
ZeroMQ三種模型講解及實例【1】Request-Response
由請求端發起請求,然后等待回應端應答。一個請求必須對應一個回應,從請求端的角度來看是發-收配對,從回應端的角度是收-發對。跟一對一結對模型的區別在于請求端可以是1~N個。該模型主要用于遠程調用及任務分配等。Echo服務就是這種經典模型的應用。
下面通過Java實現這一模型:
server port
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
public class Server {
public static void main(String[] args) throws InterruptedException { //實現服務器端的上下文及套接字 Context context = ZMQ.context(1); Socket responder = context.socket(ZMQ.REP); //使服務器端通過tcp協議通信,監聽5555端口 responder.bind("tcp://*:5555"); while (!Thread.currentThread().isInterrupted()) { byte[] request = responder.recv(0); System.out.println("Received Hello"); Thread.sleep(1000); String reply = "World"; responder.send(reply.getBytes(), 0); } //關閉服務器端的上下文及套接字 responder.close(); context.close(); }
}
client port
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
public class Client {
public static void main(String[] args) { //創立客戶端的上下文捷套接字 Context context = ZMQ.context(1); System.out.println("Connecting to hello world server…"); Socket requester = context.socket(ZMQ.REQ); //講客戶端綁定在5555端口 requester.connect("tcp://localhost:5555"); for (int requestNbr = 0; requestNbr != 100; requestNbr++) { String request = "Hello"; System.out.println("Sending Hello " + requestNbr); requester.send(request.getBytes(), 0); byte[] reply = requester.recv(0); System.out.println("Received " + new String(reply) + " " + requestNbr); } //關閉客戶端的上下文套接字 requester.close(); context.term(); }
}
【2】Publisher/Subscriber model
發布端單向分發數據,且不關心是否把全部信息發送給訂閱端。如果發布端開始發布信息時,訂閱端尚未連接上來,則這些信息會被直接丟棄。訂閱端未連接導致信息丟失的問題,可以通過與請求回應模型組合來解決。訂閱端只負責接收,而不能反饋,且在訂閱端消費速度慢于發布端的情況下,會在訂閱端堆積數據。該模型主要用于數據分發。天氣預報、微博明星粉絲可以應用這種經典模型。
Server Port
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
public class ZMQ_PUB {
public static void main(String[] args) throws InterruptedException { Context context = ZMQ.context(1); Socket publisher = context.socket(ZMQ.PUB); publisher.bind("tcp://*:5555"); Thread.sleep(3000); for(int i=0;i<100;i++){ publisher.send(("admin " + i).getBytes(), ZMQ.NOBLOCK); System.out.println("pub msg " + i); Thread.sleep(1000); } context.close(); publisher.close(); }
}
Client Port
import org.zeromq.ZMQ; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket; public class ZMQ_SUB { public static void main(String[] args) { Context context = ZMQ.context(1); Socket subscriber = context.socket(ZMQ.SUB); subscriber.connect("tcp://localhost:5555"); subscriber.subscribe("".getBytes()); for (int i=0;i<100;i++) { //Receive a message. String string = new String(subscriber.recv(0)); System.out.println("recv 1" + string); } //關閉套接字和上下文 subscriber.close(); context.term(); } }
【3】push/pull
push port import org.zeromq.ZMQ; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket; public class Push { public static void main(String[] args) { Context context = ZMQ.context(1); Socket push = context.socket(ZMQ.PUSH); push.bind("ipc://fjs"); for (int i = 0; i < 10000000; i++) { push.send("hello".getBytes(), i); } push.close(); context.term(); } }
pull port
import java.util.concurrent.atomic.AtomicInteger;
import org.zeromq.ZMQ;
public class Pull {
public static void main(String args[]) { final AtomicInteger number = new AtomicInteger(0); for (int i = 0; i < 5; i++) { new Thread(new Runnable(){ private int here = 0; public void run() { // TODO Auto-generated method stub ZMQ.Context context = ZMQ.context(1); ZMQ.Socket pull = context.socket(ZMQ.PULL); pull.connect("ipc://fjs"); //pull.connect("ipc://fjs"); while (true) { String message = new String(pull.recv()); int now = number.incrementAndGet(); here++; if (now % 1000000 == 0) { System.out.println(now + " here is : " + here); } } } }).start(); } }
}
備注說明:
【1】如何利用Java使用ZeroMQ
首先下載zmq所需的zip包,解壓以后將libzmq.dll和jzmq.dll文件放到自己電腦中的jdk安裝路徑中的bin文件夾下,最后需要將之前解壓后的zmq.jar包放在項目的lib中或者
zeromq資源下載:
鏈接:http://pan.baidu.com/s/1miuvSfQ 密碼:ttss
項目源碼下載鏈接:
鏈接:http://pan.baidu.com/s/1dE5Plr7 密碼:vqze
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/68009.html
摘要:用于控制活動人數,將超過此一定閥值的訂單直接丟棄。緩解短時間的高流量壓垮應用。目前比較推薦的就是我們手動然后將消費錯誤的消息轉移到其它的消息隊列中,做補償處理消費者該方案是默認的方式不太推薦。 SpringBoot 是為了簡化 Spring 應用的創建、運行、調試、部署等一系列問題而誕生的產物,自動裝配的特性讓我們可以更好的關注業務本身而不是外部的XML配置,我們只需遵循規范,引入相...
摘要:設備改造上傳結果數據的技術實現一項目需求及分析按照領導的要求,要改造一臺儀器,添加點功能,將測量數據上傳到服務器。所以選擇用提交,的通信可以多線程調度。考慮到新增的上傳功能不能影響之前的測量節拍,所以要多線程實現。 **設備改造——上傳結果數據的技術實現 一、項目需求及分析 按照領導的要求,要改造一臺儀器,添加點功能,將測量數據上傳到服務器。儀器測量節拍大概是20s,數據量目前不大,...
摘要:的明確目標是成為標準網絡協議棧的一部分,之后進入內核。實現端測試消息已發送端正在轉發端輸出結果已發送已發送已發送正在轉發正在轉發正在轉發測試消息測試消息測試消息 簡介 ZMQ (以下 ZeroMQ 簡稱 ZMQ)是一個簡單好用的傳輸層,像框架一樣的一個 socket library,他使得 Socket 編程更加簡單、簡潔和性能更高。是一個消息處理隊列庫,可在多個線程、內核和主機盒之間...
摘要:我推薦你使用進行日志收集,將作為的出口。集群目前暫時沒有提供日志查看機制。以如下的形式啟動容器,容器日志將發往配置的。 【作者barnett】本文介紹了k8s官方提供的日志收集方法,并介紹了Fluentd日志收集器并與其他產品做了比較。最后介紹了好雨云幫如何對k8s進行改造并使用ZeroMQ以消息的形式將日志傳輸到統一的日志處理中心。 容器日志存在形式 目前容器日志有兩種輸出形式: ...
閱讀 1760·2023-04-26 00:20
閱讀 1804·2021-11-08 13:21
閱讀 1930·2021-09-10 10:51
閱讀 1557·2021-09-10 10:50
閱讀 3249·2019-08-30 15:54
閱讀 2131·2019-08-30 14:22
閱讀 1429·2019-08-29 16:10
閱讀 3089·2019-08-26 11:50