国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

python3連接kafka模塊pykafka生產者簡單封裝

fizz / 2246人閱讀

摘要:安裝模塊基本使用生產者簡單封裝初始化實例連接地址設置獲取當前所有獲取當前生產者對象發送數據需要傳入的可迭代對象連接切換設置新的獲取當前設置的獲取所有要發送的可迭代對象引用來源博客園測試集群知乎使用生成器把寫入效率提高倍

1.1安裝模塊
pip install pykafka
1.2基本使用
  # -* coding:utf8 *-  
  from pykafka import KafkaClient  
  host = "IP:9092, IP:9092, IP:9092"
  client = KafkaClient(hosts = host)  
  # 生產者  
  topicdocu = client.topics["my-topic"]  
  producer = topicdocu.get_producer()  
  for i in range(100):  
      print i  
      producer.produce("test message " + str(i ** 2))  
  producer.stop()
1.3簡單封裝
  class KafkaProduct():

      def __init__(self,hosts,topic):
          """
          初始化實例
          :param hosts: 連接地址
          :param topic:
          """
          self.__client = KafkaClient(hosts=hosts)
          self.__topic = self.__client.topics[topic.encode()]
  
      def __set_topic(self, topic):
          self.__topic = self.__client.topics[topic.encode()]
  
      def set_topic(self, topic):
          """
          設置topic
          :param topic:
          :return:
          """
          self.__set_topic(topic)
  
      def get_topics(self):
          """
          獲取當前所有topic
          :return:
          """
          return self.__client.topics
  
      def get_topic(self):
          """
          獲取當前topic
          :return:
          """
          return self.__topic
  
      def Producer(self):
          """
          生產者對象
          :return:
          """
          with self.__topic.get_producer(delivery_reports=True) as producer:
              next_data = ""
              while True:
                  if next_data:
                      producer.produce(str(next_data).encode())
                  next_data = yield True
  
      def send_data(self,datas):
          """
          發送數據
          :param datas:需要傳入的可迭代對象
          :return:
          """
          c = self.Producer()
          next(c)
          for i in datas:
              c.send(i)

if __name__ == "__main__":

    hosts = "1.2.3.4:9999,2.3.4.5:9090" #連接hosts
    topic = "test_523"
    K = KafkaProduct(hosts=hosts, topic=topic)  #
    #K.set_topic("test")  #切換設置新的topic
    K.get_topic()  #獲取當前設置的topic
    #K.get_topics() #獲取所有topic
    data = range(10000)  #要發送的可迭代對象
    K.send_data(data)
1.4引用來源

博客園:Python測試Kafka集群(pykafka)

知乎:使用生成器把Kafka寫入效率提高1000倍

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/43910.html

相關文章

  • FAQs Kafka消息隊列 UKafka

    摘要:大量的和分區會嚴重影響集群性能。介紹可參考收到離線分區總數異常告警一般是某個節點宕機或者服務異常導致。若服務卡住,可在評估后在控制臺重啟該節點服務。若想了解當前請求延時情況,建議關注平均請求延時監控項。 FAQs本篇目錄一個UKafka集群可以創建多少個Topic?如何增加Topic的副本數量(ReplicationFactor)?收到離線分區總數>=10.0個告警,離線分區總數是什么,怎么...

    ernest.wang 評論0 收藏2407
  • Kafka學習筆記之掃盲

    摘要:相關概念協議高級消息隊列協議是一個標準開放的應用層的消息中間件協議。可以用命令與不同,不是線程安全的。手動提交執行相關邏輯提交注意點將寫成單例模式,有助于減少端占用的資源。自身是線程安全的類,只要封裝得當就能最恰當的發揮好的作用。 本文使用的Kafka版本0.11 先思考些問題: 我想分析一下用戶行為(pageviews),以便我能設計出更好的廣告位 我想對用戶的搜索關鍵詞進行統計,...

    GT 評論0 收藏0
  • Kafka】《Kafka權威指南》入門

    摘要:主題和分區的悄息通過主題進行分類。在給定的分區里,每個悄息的偏移量都是唯一的。消費者把每個分區最后讀取的悄息偏移量保存在或上,如果悄費者關閉或重啟,它的讀取狀態不會丟失。主題可以配置自己的保留策略,可以將悄息保留到不再使用它們為止。發布與訂閱消息系統 在正式討論Apache Kafka (以下簡稱Kafka)之前,先來了解發布與訂閱消息系統的概念, 并認識這個系統的重要性。數據(消息)的發送...

    番茄西紅柿 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<