摘要:集群部署安裝依賴可以參閱支持的客戶端版本生產者連接集群,創建,生產數據。鏈接集群創建消費者自動分配,,。消費者指定消費。沒有消費組的概念,也可以認為每個消費者都屬于一個獨立消費組。
Kafka集群部署
安裝rdkafkardkafka 依賴 libkafka
yum install rdkafka rdkafka-devel pecl install rdkafka php --ri rdkafka
http://pecl.php.net/package/r... 可以參閱支持的kafka客戶端版本
生產者連接集群,創建 topic,生產數據。
setLogLevel(LOG_DEBUG); // 鏈接kafka集群 $rk->addBrokers("192.168.20.6:9092,192.168.20.6:9093"); // 創建topic $topic = $rk->newTopic("topic_1"); while (true) { $message = "hello kafka " . date("Y-m-d H:i:s"); echo "hello kafka " . date("Y-m-d H:i:s") . PHP_EOL; try { $topic->produce(RD_KAFKA_PARTITION_UA, 0, $message); sleep(2); } catch (Exception $e) { echo $e->getMessage() . PHP_EOL; } }消費者-HighLevel
自動分配partition,rebalance,comsumer group。
setRebalanceCb(function (RdKafkaKafkaConsumer $kafka, $err, array $partitions = null) { switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: echo "Assign: "; var_dump($partitions); $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: echo "Revoke: "; var_dump($partitions); $kafka->assign(null); break; default: throw new Exception($err); } }); // Configure the group.id. All consumer with the same group.id will consume // different partitions. $conf->set("group.id", "group_1"); // Initial list of Kafka brokers $conf->set("metadata.broker.list", "192.168.20.6:9092,192.168.20.6:9093"); $topicConf = new RdKafkaTopicConf(); // Set where to start consuming messages when there is no initial offset in // offset store or the desired offset is out of range. // "smallest": start from the beginning $topicConf->set("auto.offset.reset", "smallest"); // Set the configuration to use for subscribed/assigned topics $conf->setDefaultTopicConf($topicConf); $consumer = new RdKafkaKafkaConsumer($conf); // Subscribe to topic "topic_1" $consumer->subscribe(["topic_1"]); echo "Waiting for partition assignment... (make take some time when "; echo "quickly re-joining the group after leaving it.) "; while (true) { $message = $consumer->consume(3e3); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: var_dump($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: sleep(2); case RD_KAFKA_RESP_ERR__TIMED_OUT: echo $message->errstr() . PHP_EOL; break; default: throw new Exception($message->errstr(), $message->err); break; } }消費者-LowLevel
指定partition消費。
php consumer_lowlevel.php [partitonNuo]
LowLevel 沒有消費組的概念,也可以認為每個消費者都屬于一個獨立消費組。
set("group.id", "group_2"); $rk = new RdKafkaConsumer($conf); $rk->addBrokers("192.168.20.6:9092,192.168.20.6:9093"); $topicConf = new RdKafkaTopicConf(); $topicConf->set("auto.commit.interval.ms", 2000); // Set the offset store method to "file" // $topicConf->set("offset.store.method", "file"); // $topicConf->set("offset.store.path", sys_get_temp_dir()); // Alternatively, set the offset store method to "broker" $topicConf->set("offset.store.method", "broker"); // Set where to start consuming messages when there is no initial offset in // offset store or the desired offset is out of range. // "smallest": start from the beginning $topicConf->set("auto.offset.reset", "smallest"); $topic = $rk->newTopic($topic, $topicConf); // Start consuming partition 0 $topic->consumeStart($partition, RD_KAFKA_OFFSET_STORED); while (true) { $message = $topic->consume($partition, 3 * 1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: var_dump($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: case RD_KAFKA_RESP_ERR__TIMED_OUT: echo $message->errstr() . PHP_EOL; break; default: throw new Exception($message->errstr(), $message->err); break; } }
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/31031.html
摘要:消息以為類別記錄將消息種子分類每一類的消息稱之為一個主題。這意味著生產者不等待來自同步完成的確認繼續發送下一條批消息。這意味著在已成功收到的數據并得到確認后發送下一條。三種機制,性能依次遞減吞吐量降低,數據健壯性則依次遞增。 kafka 簡介 Kafka 是一種高吞吐量的分布式發布訂閱消息系統 kafka角色必知 producer:生產者。 consumer:消費者。 topic: 消...
摘要:最近項目開發中需要使用消息隊列。不過在環境中安裝的過程中出現了以下報錯開始以為是因為安裝缺少了一些依賴。然后使用了源碼編譯的方式進行安裝同樣報錯了。然后安裝它再執行,執行。擴展包使用純粹的編寫的客戶端,目前支持以上版本的。 最近項目開發中需要使用 Kafka 消息隊列。經過檢索,PHP下面有通用的兩種方式來調用 Kafka 。 php-rdkafka 擴展 以 PHP 擴展的形式進行...
閱讀 533·2023-04-26 01:39
閱讀 4485·2021-11-16 11:45
閱讀 2610·2021-09-27 13:37
閱讀 882·2021-09-01 10:50
閱讀 3579·2021-08-16 10:50
閱讀 2217·2019-08-30 15:55
閱讀 2979·2019-08-30 15:55
閱讀 2259·2019-08-30 14:07