摘要:安裝模塊基本使用生產者簡單封裝初始化實例連接地址設置獲取當前所有獲取當前生產者對象發送數據需要傳入的可迭代對象連接切換設置新的獲取當前設置的獲取所有要發送的可迭代對象引用來源博客園測試集群知乎使用生成器把寫入效率提高倍
1.1安裝模塊
pip install pykafka1.2基本使用
# -* coding:utf8 *- from pykafka import KafkaClient host = "IP:9092, IP:9092, IP:9092" client = KafkaClient(hosts = host) # 生產者 topicdocu = client.topics["my-topic"] producer = topicdocu.get_producer() for i in range(100): print i producer.produce("test message " + str(i ** 2)) producer.stop()1.3簡單封裝
class KafkaProduct(): def __init__(self,hosts,topic): """ 初始化實例 :param hosts: 連接地址 :param topic: """ self.__client = KafkaClient(hosts=hosts) self.__topic = self.__client.topics[topic.encode()] def __set_topic(self, topic): self.__topic = self.__client.topics[topic.encode()] def set_topic(self, topic): """ 設置topic :param topic: :return: """ self.__set_topic(topic) def get_topics(self): """ 獲取當前所有topic :return: """ return self.__client.topics def get_topic(self): """ 獲取當前topic :return: """ return self.__topic def Producer(self): """ 生產者對象 :return: """ with self.__topic.get_producer(delivery_reports=True) as producer: next_data = "" while True: if next_data: producer.produce(str(next_data).encode()) next_data = yield True def send_data(self,datas): """ 發送數據 :param datas:需要傳入的可迭代對象 :return: """ c = self.Producer() next(c) for i in datas: c.send(i) if __name__ == "__main__": hosts = "1.2.3.4:9999,2.3.4.5:9090" #連接hosts topic = "test_523" K = KafkaProduct(hosts=hosts, topic=topic) # #K.set_topic("test") #切換設置新的topic K.get_topic() #獲取當前設置的topic #K.get_topics() #獲取所有topic data = range(10000) #要發送的可迭代對象 K.send_data(data)1.4引用來源
博客園:Python測試Kafka集群(pykafka)
知乎:使用生成器把Kafka寫入效率提高1000倍
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/43910.html
摘要:大量的和分區會嚴重影響集群性能。介紹可參考收到離線分區總數異常告警一般是某個節點宕機或者服務異常導致。若服務卡住,可在評估后在控制臺重啟該節點服務。若想了解當前請求延時情況,建議關注平均請求延時監控項。 FAQs本篇目錄一個UKafka集群可以創建多少個Topic?如何增加Topic的副本數量(ReplicationFactor)?收到離線分區總數>=10.0個告警,離線分區總數是什么,怎么...
摘要:相關概念協議高級消息隊列協議是一個標準開放的應用層的消息中間件協議。可以用命令與不同,不是線程安全的。手動提交執行相關邏輯提交注意點將寫成單例模式,有助于減少端占用的資源。自身是線程安全的類,只要封裝得當就能最恰當的發揮好的作用。 本文使用的Kafka版本0.11 先思考些問題: 我想分析一下用戶行為(pageviews),以便我能設計出更好的廣告位 我想對用戶的搜索關鍵詞進行統計,...
摘要:主題和分區的悄息通過主題進行分類。在給定的分區里,每個悄息的偏移量都是唯一的。消費者把每個分區最后讀取的悄息偏移量保存在或上,如果悄費者關閉或重啟,它的讀取狀態不會丟失。主題可以配置自己的保留策略,可以將悄息保留到不再使用它們為止。發布與訂閱消息系統 在正式討論Apache Kafka (以下簡稱Kafka)之前,先來了解發布與訂閱消息系統的概念, 并認識這個系統的重要性。數據(消息)的發送...
閱讀 955·2019-08-30 14:24
閱讀 987·2019-08-30 14:13
閱讀 1799·2019-08-29 17:21
閱讀 2660·2019-08-29 13:44
閱讀 1654·2019-08-29 11:04
閱讀 438·2019-08-26 10:44
閱讀 2564·2019-08-23 14:04
閱讀 908·2019-08-23 12:08