摘要:本文主要實現的目標是連接并且成功發送消息給。發送消息網上找了一圈,終于找到一個可以用的也可以用代碼如下發送消息到不同的參考文章最后附一張截圖
本文主要實現的目標是php連接kafka并且成功發送消息給kafka。為了驗證這個連接和發送,另外配置了logstash監聽kafka相對應的消息,然后轉發到redis,原來我不知道對kafka比較陌生,不知道怎么看里面的消息內容(我知道安裝包里有個consumer和producer的腳本) ^ _ ^
消息發送路徑:php->kafka->logstash->redis
1.安裝kafka
下載地址:https://kafka.apache.org/
下載解壓后進入根目錄,
bin/zookeeper-server-start.sh config/zookeeper.properties & 開啟zookeeper bin/kafka-server-start.sh config/server.properties & 開啟kafka
另開一個終端然后
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka
這樣就創建了一個topic為kafka的消息通道
如果這個步驟成功的話,可以通過另開終端發送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka
執行之后就可以輸入消息發送了。
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka --from-beginning
來接受上面終端發送的消息
2.安裝php zookeeper擴展,并且用php發送消息
安裝擴展之前需要安裝zookeeper的c client,擴展有依賴,步驟如下
1.通過apt-get來安裝(樓主用的是ubuntu)
2.通過源碼安裝,地址:https://github.com/apache/zoo...
下載下來
cd zookeeper ./configure make && sudo make install
按照如上步驟,/usr/local/bin目錄下就會多一個cli_mt
php的zookeeper的源碼可以去pecl.php.net下載,然后老步驟
phpize ./configure --with-libzookeeper=/usr/local/bin/cli_mt (如果你安裝擴展的php不是默認的php,則需要帶上--with-php-config參數) make && sudo make install
最后別忘了添加extension=zookeeper.so到php.ini
3.配置logstash
下載地址:https://www.elastic.co/downlo...
修改配置文件,由于樓主的logstash版本已經是5.2的了,所以又是一陣谷歌,才發現很多網上的配置都是1.2版本的,已經不兼容了。
input{ kafka{ bootstrap_servers=>"localhost:9092" topics=>["kafka"] } } output{ redis{ host=>"127.0.0.1" port=>6379 key=>"kafka" data_type=>"list" password=>"123456" } }
4.php發送消息
網上找了一圈,終于找到一個可以用的
https://github.com/nmred/kafk...
也可以用
composer require "nmred/kafka-php"
php代碼如下:
require "./vendor/autoload.php"; $produce = KafkaProduce::getInstance("10.37.129.2:2181", 3000); $produce->setRequireAck(-1); $topicName = "kafka"; $partitions = $produce->getAvailablePartitions($topicName); $partCount = count($partitions); var_dump("$partCount:".$partCount); $count = 0; $message = json_encode(array("uid" => $count, "age" => $count%100, "datetime" => date("Y-m-d H:i:s"))); //發送消息到不同的partition $partitionId = $count%$partCount; $produce->setMessages($topicName, $partitionId, array($message)); $result = $produce->send(); var_dump($result);
參考文章:http://blog.kazaff.me/2015/06...
最后附一張截圖
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/22525.html
摘要:現在用方式調用接口,中使用方式輸入內容日志平臺網關層基于。日志平臺網關層基于到此為止,提取經過網關的接口信息,并將其寫入日志文件就完成了,所有的接口日志都寫入了文件中。 背景介紹 1、問題現狀與嘗試 沒有做日志記錄的線上系統,絕對是給系統運維人員留下的坑。尤其是前后端分離的項目,后端的接口日志可以解決對接、測試和運維時的很多問題。之前項目上發布的接口都是通過Oracle Service...
摘要:是一個日志收集器,支持非常多的輸入源和輸出源。這個庫支持展開文件路徑,而且會記錄一個叫的數據庫文件來跟蹤被監聽的日志文件的當前讀取位置。 1.Zookeeper 對于Zookeeper我們用一條簡單的命令來測試一下: echo ruok|nc localhost 2181 你應該可以看到: imok 2.Kafka Kafka 是由 Linked 開發并且開源的一套分布式的流平臺,它類...
閱讀 1858·2021-09-22 15:45
閱讀 1639·2019-08-30 15:55
閱讀 1829·2019-08-29 11:16
閱讀 3302·2019-08-26 11:44
閱讀 702·2019-08-23 17:58
閱讀 2698·2019-08-23 12:25
閱讀 1624·2019-08-22 17:15
閱讀 3597·2019-08-22 16:09