摘要:為了預防消息丟失,提供了,即工作進程在收到消息并處理后,發送給,告知這時候可以把該消息從隊列中刪除了。如果工作進程掛掉了,沒有收到,那么會把該消息重新分發給其他工作進程。之前在發布消息時,的值為即使用。
HelloWorld 簡介
RabbitMQ:接受消息再傳遞消息,可以視為一個“郵局”。發送者和接受者通過隊列來進行交互,隊列的大小可以視為無限的,多個發送者可以發生給一個隊列,多個接收者也可以從一個隊列中接受消息。
coderabbitmq使用的協議是amqp,用于python的推薦客戶端是pika
pip install pika -i https://pypi.douban.com/simple/
send.py
# coding: utf8 import pika # 建立一個連接 connection = pika.BlockingConnection(pika.ConnectionParameters( "localhost")) # 連接本地的RabbitMQ服務器 channel = connection.channel() # 獲得channel
這里鏈接的是本機的,如果想要連接其他機器上的服務器,只要填入地址或主機名即可。
接下來我們開始發送消息了,注意要確保接受消息的隊列是存在的,否則rabbitmq就丟棄掉該消息
channel.queue_declare(queue="hello") # 在RabbitMQ中創建hello這個隊列 channel.basic_publish(exchange="", # 使用默認的exchange來發送消息到隊列 routing_key="hello", # 發送到該隊列 hello 中 body="Hello World!") # 消息內容 connection.close() # 關閉 同時flush
RabbitMQ默認需要1GB的空閑磁盤空間,否則發送會失敗。
這時已在本地隊列hello中存放了一個消息,如果使用 rabbitmqctl list_queues 可看到
hello 1
說明有一個hello隊列 里面存放了一個消息
receive.py
# coding: utf8 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( "localhost")) channel = connection.channel()
還是先鏈接到服務器,和之前發送時相同
channel.queue_declare(queue="hello") # 此處就是聲明了 來確保該隊列 hello 存在 可以多次聲明 這里主要是為了防止接受程序先運行時出錯 def callback(ch, method, properties, body): # 用于接收到消息后的回調 print(" [x] Received %r" % body) channel.basic_consume(callback, queue="hello", # 收指定隊列hello的消息 no_ack=True) #在處理完消息后不發送ack給服務器 channel.start_consuming() # 啟動消息接受 這會進入一個死循環工作隊列(任務隊列)
工作隊列是用于分發耗時任務給多個工作進程的。不立即做那些耗費資源的任務(需要等待這些任務完成),而是安排這些任務之后執行。例如我們把task作為message發送到隊列里,啟動工作進程來接受并最終執行,且可啟動多個工作進程來工作。這適用于web應用,即不應在一個http請求的處理窗口內完成復雜任務。
channel.basic_publish(exchange="", routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # 使得消息持久化 ))
分配消息的方式為 輪詢 即每個工作進程獲得相同的消息數。
消息ack如果消息分配給某個工作進程,但是該工作進程未處理完成就崩潰了,可能該消息就丟失了,因為rabbitmq一旦把一個消息分發給工作進程,它就把該消息刪掉了。
為了預防消息丟失,rabbitmq提供了ack,即工作進程在收到消息并處理后,發送ack給rabbitmq,告知rabbitmq這時候可以把該消息從隊列中刪除了。如果工作進程掛掉 了,rabbitmq沒有收到ack,那么會把該消息 重新分發給其他工作進程。不需要設置timeout,即使該任務需要很長時間也可以處理。
ack默認是開啟的,之前我們的工作進程顯示指定了no_ack=True
channel.basic_consume(callback, queue="hello") # 會啟用ack
帶ack的callback:
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count(".") ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) # 發送ack消息持久化
但是,有時RabbitMQ重啟了,消息也會丟失。可在創建隊列時設置持久化:
(隊列的性質一旦確定無法改變)
channel.queue_declare(queue="task_queue", durable=True)
同時在發送消息時也得設置該消息的持久化屬性:
channel.basic_publish(exchange="",
routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
但是,如果在RabbitMQ剛接收到消息還沒來得及存儲,消息還是會丟失。同時,RabbitMQ也不是在接受到每個消息都進行存盤操作。如果還需要更完善的保證,需要使用publisher confirm。
公平的消息分發輪詢模式的消息分發可能并不公平,例如奇數的消息都是繁重任務的話,某些進程則會一直運行繁 重任務。即使某工作進程上有積壓的消息未處理,如很多都沒發ack,但是RabbitMQ還是會按照順序發消息給它。可以在接受進程中加設置:
channel.basic_qos(prefetch_count=1)
告知RabbitMQ,這樣在一個工作進程沒回發ack情況下是不會再分配消息給它。
群發一般情況下,一條消息是發送給一個工作進程,然后完成,有時想把一條消息同時發送給多個進程:
exchange發送者是不是直接發送消息到隊列中的,事實上發生者根本不知道消息會發送到那個隊列,發送者只能把消息發送到exchange里。exchange一方面收生產者的消息,另一方面把他們推送到隊列中。所以作為exchange,它需要知道當收到消息時它需要做什么,是應該把它加到一個特殊的隊列中還是放到很多的隊列中,或者丟棄。exchange有direct、topic、headers、fanout等種類,而群發使用的即fanout。之前在發布消息時,exchange的值為 "" 即使用default exchange。
channel.exchange_declare(exchange="logs", type="fanout") # 該exchange會把消息發送給所有它知道的隊列中臨時隊列
result = channel.queue_declare() # 創建一個隨機隊列 result = channel.queue_declare(exclusive=True) # 創建一個隨機隊列,同時在沒有接收者連接該隊列后則銷毀它 queue_name = result.method.queue
這樣result.method.queue即是隊列名稱,在發送或接受時即可使用。
綁定exchange 和 隊列channel.queue_bind(exchange="logs", queue="hello")
logs在發送消息時給hello也發一份。
在發送消息是使用剛剛創建的 logs exchangechannel.basic_publish(exchange="logs", routing_key="", body=message)路由
之前已經使用過bind,即建立exchange和queue的關系(該隊列對來自該exchange的消息有興趣),bind時可另外指定routing_key選項。
使用direct exchange將對應routing key的消息發送到綁定相同routing key的隊列中
channel.exchange_declare(exchange="direct_logs", type="direct")
發送函數,發布不同severity的消息:
channel.basic_publish(exchange="direct_logs", routing_key=severity, body=message)
接受函數中綁定對應severity的:
channel.queue_bind(exchange="direct_logs", queue=queue_name, routing_key=severity)使用topic exchange
之前使用的direct exchange 只能綁定一個routing key,可以使用這種可以拿.隔開routing key的topic exchange,例如:
"stock.usd.nyse" "nyse.vmw"
和direct exchange一樣,在接受者那邊綁定的key與發送時指定的routing key相同即可,另外有些特殊的值:
* 代表1個單詞 # 代表0個或多個單詞
如果發送者發出的routing key都是3個部分的,如:celerity.colour.species。
Q1: *.orange.* 對應的是中間的colour都為orange的 Q2: *.*.rabbit 對應的是最后部分的species為rabbit的 lazy.# 對應的是第一部分是lazy的
qucik.orange.rabbit Q1 Q2都可接收到,quick.orange.fox 只有Q1能接受到,對于lazy.pink.rabbit雖然匹配到了Q2兩次,但是只會發送一次。如果綁定時直接綁定#,則會收到所有的。
RPC在遠程機器上運行一個函數然后獲得結果。
1、客戶端啟動 同時設置一個臨時隊列用于接受回調,綁定該隊列
self.connection = pika.BlockingConnection(pika.ConnectionParameters( host="localhost")) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue)
2、客戶端發送rpc請求,同時附帶reply_to對應回調隊列,correlation_id設置為每個請求的唯一id(雖然說可以為每一次RPC請求都創建一個回調隊列,但是這樣效率不高,如果一個客戶端只使用一個隊列,則需要使用correlation_id來匹配是哪個請求),之后阻塞在回調隊列直到收到回復
注意:如果收到了非法的correlation_id直接丟棄即可,因為有這種情況--服務器已經發了響應但是還沒發ack就掛了,等一會服務器重啟了又會重新處理該任務,又發了一遍相應,但是這時那個請求已經被處理掉了
channel.basic_publish(exchange="", routing_key="rpc_queue", properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = self.corr_id, ), body=str(n)) # 發出調用 while self.response is None: # 這邊就相當于阻塞了 self.connection.process_data_events() # 查看回調隊列 return int(self.response)
3、請求會發送到rpc_queue隊列
4、RPC服務器從rpc_queue中取出,執行,發送回復
channel.basic_consume(on_request, queue="rpc_queue") # 綁定 等待請求 # 處理之后: ch.basic_publish(exchange="", routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = props.correlation_id), body=str(response)) # 發送回復到回調隊列 ch.basic_ack(delivery_tag = method.delivery_tag) # 發送ack
5、客戶端從回調隊列中取出數據,檢查correlation_id,執行相應操作
if self.corr_id == props.correlation_id: self.response = body
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/38486.html
摘要:它還為具有偵聽器容器的消息驅動的提供支持。接收消息當存在基礎結構時,可以使用任何來注釋以創建偵聽器端點。默認情況下,如果禁用重試并且偵聽器拋出異常,則會無限期地重試傳遞。 Spring-amqp-tutorial Spring AMQP項目將核心Spring概念應用于基于AMQP的消息傳遞解決方案的開發。它提供了一個模板作為發送和接收消息的高級抽象。它還為具有偵聽器容器的消息驅動的PO...
摘要:前提好幾周沒更新博客了,對不斷支持我博客的童鞋們說聲抱歉了。熟悉我的人都知道我寫博客的時間比較早,而且堅持的時間也比較久,一直到現在也是一直保持著更新狀態。 showImg(https://segmentfault.com/img/remote/1460000014076586?w=1920&h=1080); 前提 好幾周沒更新博客了,對不斷支持我博客的童鞋們說聲:抱歉了!。自己這段時...
摘要:它通過使用來連接消息代理中間件以實現消息事件驅動的微服務應用。該示例主要目標是構建一個基于的微服務應用,這個微服務應用將通過使用消息中間件來接收消息并將消息打印到日志中。下面我們通過編寫生產消息的單元測試用例來完善我們的入門內容。 之前在寫Spring Boot基礎教程的時候寫過一篇《Spring Boot中使用RabbitMQ》。在該文中,我們通過簡單的配置和注解就能實現向Rabbi...
摘要:就是交換機生產者發送消息給交換機,然后由交換機將消息轉發給隊列。對應于中則是發送一個,處理完成之后將其返回給。這樣來說一個是級別而不是級別的了。當然這些也都是官網的入門例子,后續有機會的話再深入研究。 一、前言 RabbitMQ其實是我最早接觸的一個MQ框架,我記得當時是在大學的時候跑到圖書館一個人去看,由于RabbitMQ官網的英文還不算太難,因此也是參考官網學習的,一共有6章,當時...
閱讀 3250·2023-04-25 22:47
閱讀 3765·2021-10-11 10:59
閱讀 2300·2021-09-07 10:12
閱讀 4243·2021-08-11 11:15
閱讀 3432·2019-08-30 13:15
閱讀 1750·2019-08-30 13:00
閱讀 968·2019-08-29 14:02
閱讀 1680·2019-08-26 13:57