国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

kafka

W4n9Hu1 / 2087人閱讀

摘要:生產者發送消息到指定的下,消息者從這個下消費消息。消費組,用于歸組同類消費者。中的消息序列是有序的消息序列。在使用偏移量來指定消息的位置。

什么是Kafka
Kafka是一個分布式流處理系統,流處理系統使它可以像消息隊列一樣publish或者subscribe消息,分布式提供了容錯性,并發處理消息的機制。

Kafka的基本概念

kafka運行在集群上,集群包含一個或多個服務器。kafka把消息存在topic中,每一條消息包含鍵值(key),值(value)和時間戳(timestamp)。

kafka有以下一些基本概念:

Producer

消息生產者,就是向kafka broker發消息的客戶端。

Consumer

消息消費者,是消息的使用方,負責消費Kafka服務器上的消息。

Topic

主題,由用戶定義并配置在Kafka服務器,用于建立Producer和Consumer之間的訂閱關系。生產者發送消息到指定的Topic下,消息者從這個Topic下消費消息。

Partition

消息分區,一個topic可以分為多個 partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。

Broker

一臺kafka服務器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic。

Consumer Group

消費組,用于歸組同類消費者。每個consumer屬于一個特定的consumer group,多個消費組可以共同消息一個Topic下的消息,每消費組中的消費者消費Topic的部分消息,這些消費者就組成了一個分組。

Offset

消息在partition中的偏移量。每一條消息在partition都有唯一的偏移量,消息者可以指定偏移量來指定要消費的消息。
Kafka分布式架構

如上圖所示,kafka將topic中的消息存在不同的partition中。如果存在鍵值(key),消息按照鍵值(key)做分類存在不同的partiition中,如果不存在鍵值(key),消息按照輪詢(Round Robin)機制存在不同的partition中。默認情況下,鍵值(key)決定了一條消息會被存在哪個partition中。

partition中的消息序列是有序的消息序列。kafka在partition使用偏移量(offset)來指定消息的位置。一個topic的一個partition只能被一個consumer group中的一個consumer消費,多個consumer消費同一個partition中的數據是不允許的,但是一個consumer可以消費多個partition中的數據。

kafka將partition的數據復制到不同的broker,提供了partition數據的備份。每一個partition都有一個broker作為leader,若干個broker作為follower。所有的數據讀寫都通過leader所在的服務器進行,并且leader在不同broker之間復制數據。

上圖中,對于Partition 0,broker 1是它的leader,broker 2和broker 3是follower。對于Partition 1,broker 2是它的leader,broker 1和broker 3是follower。

在上圖中,當有Client(也就是Producer)要寫入數據到Partition 0時,會寫入到leader Broker 1,Broker 1再將數據復制到follower Broker 2和Broker 3。

在上圖中,Client向Partition 1中寫入數據時,會寫入到Broker 2,因為Broker 2是Partition 1的Leader,然后Broker 2再將數據復制到follower Broker 1和Broker 3中。

上圖中的topic一共有3個partition,對每個partition的讀寫都由不同的broker處理,因此總的吞吐量得到了提升。

實驗一:kafka-python實現生產者消費者

kafka-python是一個python的Kafka客戶端,可以用來向kafka的topic發送消息、消費消息。

這個實驗會實現一個producer和一個consumer,producer向kafka發送消息,consumer從topic中消費消息。結構如下圖

producer代碼

#-*- coding: utf-8 -*-

from kafka import KafkaProducer
import time

producer = KafkaProducer(bootstrap_servers="localhost:9092")

i = 1000
while True:
    ts = int(time.time() * 1000)
    producer.send(topic="py_test", value=str(i), key=str(i), timestamp_ms=ts)
    producer.flush()
    print i
    i += 1
    time.sleep(1)

consumer代碼

#-*- coding: utf-8 -*-
from kafka import KafkaConsumer

consumer = KafkaConsumer("py_test", bootstrap_servers=["localhost:9092"])
for message in consumer:
    print message

接下來創建test topic

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

打開兩個窗口中,我們在window1中運行producer,如下:

在window2中運行consumer,如下:

實驗二:消費組實現容錯性機制

這個實驗將展示消費組的容錯性的特點。這個實驗中將創建一個有2個partition的topic,和2個consumer,這2個consumer共同消費同一個topic中的數據。結構如下所示

producer部分代碼和實驗一相同,這里不再重復。consumer需要指定所屬的consumer group,代碼如下

#-*- coding: utf-8 -*-
from kafka import KafkaConsumer

consumer = KafkaConsumer("py_test", group_id="testgt", bootstrap_servers=["localhost:9092"])
for message in consumer:
    print message

接下來我們創建topic,名字test,設置partition數量為2

打開三個窗口,一個窗口運行producer,還有兩個窗口運行consumer。
運行consumer的兩個窗口的輸出如下:

可以看到兩個consumer同時運行的情況下,它們分別消費不同partition中的數據。window1中的consumer消費partition 0中的數據,window2中的consumer消費parition 1中的數據。
我們嘗試關閉window1中的consumer,可以看到如下結果

剛開始window2中的consumer只消費partition1中的數據,當window1中的consumer退出后,window2中的consumer中也開始消費partition 0中的數據了。

實驗三:offset管理

kafka允許consumer將當前消費的消息的offset提交到kafka中,這樣如果consumer因異常退出后,下次啟動仍然可以從上次記錄的offset開始向后繼續消費消息。

這個實驗的結構和實驗一的結構是一樣的,使用一個producer,一個consumer,test topic的partition數量設為1。

producer的代碼和實驗一中的一樣,這里不再重復。consumer的代碼稍作修改,這里consumer中打印出下一個要被消費的消息的offset。consumer代碼如下

#-*- coding: utf-8 -*-
from kafka import KafkaConsumer, TopicPartition

tp = TopicPartition("py_test", 0)
consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"], group_id="test_g", auto_offset_reset="earliest", enable_auto_commit=False)
consumer.assign([tp])
print "starting offset is", consumer.position(tp)
for message in consumer:
    # pass
    print message.value
auto.offset.reset值含義解釋
earliest 
當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 
latest 
當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據 
none 
topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常

在一個窗口中啟動producer,在另一個窗口并且啟動consumer。consumer的輸出如下


可以嘗試退出consumer,再啟動consumer。每一次重新啟動,consumer都是從offset=98的消息開始消費的。
修改consumer的代碼如下 在consumer消費每一條消息后將offset提交回kafka

#-*- coding: utf-8 -*-
from kafka import KafkaConsumer, TopicPartition

tp = TopicPartition("py_test", 0)
consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"], group_id="test", auto_offset_reset="earliest", enable_auto_commit=True)
consumer.assign([tp])
print "starting offset is", consumer.position(tp)
for message in consumer:
    print message.offset
    # consumer.commit() 也可以主動提交offset

啟動consumer

可以看到consumer從offset=98的消息開始消費,到offset=829時,我們Ctrl+C退出consumer。

我們再次啟動consumer

可以看到重新啟動后,consumer從上一次記錄的offset開始繼續消費消息。之后每一次consumer重新啟動,consumer都會從上一次停止的地方繼續開始消費。

不同的消費組有不同的offset管理,相互不影響
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from kafka import KafkaConsumer, TopicPartition

tp = TopicPartition("py_test", 0)
consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"], group_id="test_1", auto_offset_reset="earliest", enable_auto_commit=False)
consumer.assign([tp])
print "starting offset is", consumer.position(tp)
for message in consumer:
    print message.offset
    # consumer.commit()

換一個group_id test_1,會從starting offset is 0開始輸出:

starting offset is 0
0

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/42705.html

相關文章

  • U大使獎勵規則全新發布

    U大使重要風控規則提醒——一、以下行為嚴令禁止,一旦發現虛假推廣行為,將對月結傭金進行凍結扣除并終身終止推廣合作:1. U大使利用發現的活動規則漏洞等增加推薦業績、獲得不合理的服務費用;2. 鏈接劫持、強制捆綁、違反法律法規等的非正當方式推廣方式;3. U大使私自承諾向新用戶返利;4. 與 UCloud 銷售人員、其他U大使、或被推薦用戶相互串通,弄虛作假,騙取服務費用;5. 將UCloud發放的...

    UCloud小助手 評論0 收藏0
  • U大使推廣獎勵規則

    新用戶通過點擊U大使的邀請鏈接注冊UCloud賬戶,并在注冊90日內購買指定范圍內的產品,UCloud將按照新用戶自首日訂單起90日內現金支付金額乘以約定獎勵比例進行現金獎勵。一、推廣資格本活動U大使僅限UCloud已實名的個人用戶,如推廣賬號由個人認證變更為企業認證,未發放的推廣傭金將不再發放;UCloud(前)員工及其家屬、與UCloud有合作關系的銷售工作人員及代理商,不能參加本活動。立即加...

    UCloud小助手 評論0 收藏0
  • 開源組件Flink性能優化之實時計算延遲填坑記

    開源組件Flink性能優化之實時計算延遲填坑記 img{ display:block; margin:0 auto !important; width:100%; } body{ width:75%; ...

    IT那活兒 評論0 收藏1513
  • GoldenGate間斷性休眠的troubleshooting

    GoldenGate間斷性休眠的troubleshooting img{ display:block; margin:0 auto !important; width:100%; } body{ width:75...

    IT那活兒 評論0 收藏1099
  • GreenPlumn數據采集踩坑事記

    GreenPlumn數據采集踩坑事記 img{ display:block; margin:0 auto !important; width:100%; } body{ width:75%; ...

    IT那活兒 評論0 收藏2508
  • 奧卡姆剃刀原則解決flink日志延時問題

    奧卡姆剃刀原則解決flink日志延時問題 img{ display:block; margin:0 auto !important; width:100%; } body{ width:75%; ...

    IT那活兒 評論0 收藏1466

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<