国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Kafka - PHP 使用 Rdkafka 生產/消費數據

BetaRabbit / 3636人閱讀

摘要:集群部署安裝依賴可以參閱支持的客戶端版本生產者連接集群,創建,生產數據。鏈接集群創建消費者自動分配,,。消費者指定消費。沒有消費組的概念,也可以認為每個消費者都屬于一個獨立消費組。

Kafka集群部署

安裝rdkafka

rdkafka 依賴 libkafka

yum install rdkafka rdkafka-devel
pecl install rdkafka
php --ri rdkafka

http://pecl.php.net/package/r... 可以參閱支持的kafka客戶端版本

生產者

連接集群,創建 topic,生產數據。

setLogLevel(LOG_DEBUG);

// 鏈接kafka集群
$rk->addBrokers("192.168.20.6:9092,192.168.20.6:9093");

// 創建topic
$topic = $rk->newTopic("topic_1");

while (true) {
    $message = "hello kafka " . date("Y-m-d H:i:s");
    echo "hello kafka " . date("Y-m-d H:i:s") . PHP_EOL;

    try {
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, $message);
        sleep(2);
    } catch (Exception $e) {
        echo $e->getMessage() . PHP_EOL;
    }
}
消費者-HighLevel

自動分配partitionrebalancecomsumer group

setRebalanceCb(function (RdKafkaKafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;
        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            echo "Revoke: ";
            var_dump($partitions);
            $kafka->assign(null);
            break;
        default:
            throw new Exception($err);
    }
});

// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
$conf->set("group.id", "group_1");

// Initial list of Kafka brokers
$conf->set("metadata.broker.list", "192.168.20.6:9092,192.168.20.6:9093");

$topicConf = new RdKafkaTopicConf();

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// "smallest": start from the beginning
$topicConf->set("auto.offset.reset", "smallest");

// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafkaKafkaConsumer($conf);

// Subscribe to topic "topic_1"
$consumer->subscribe(["topic_1"]);

echo "Waiting for partition assignment... (make take some time when
";
echo "quickly re-joining the group after leaving it.)
";

while (true) {
    $message = $consumer->consume(3e3);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            sleep(2);
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo $message->errstr() . PHP_EOL;
            break;
        default:
            throw new Exception($message->errstr(), $message->err);
            break;
    }
}
消費者-LowLevel

指定partition消費。
php consumer_lowlevel.php [partitonNuo]
LowLevel 沒有消費組的概念,也可以認為每個消費者都屬于一個獨立消費組。

set("group.id", "group_2");

$rk = new RdKafkaConsumer($conf);
$rk->addBrokers("192.168.20.6:9092,192.168.20.6:9093");

$topicConf = new RdKafkaTopicConf();
$topicConf->set("auto.commit.interval.ms", 2000);

// Set the offset store method to "file"
// $topicConf->set("offset.store.method", "file");
// $topicConf->set("offset.store.path", sys_get_temp_dir());

// Alternatively, set the offset store method to "broker"
$topicConf->set("offset.store.method", "broker");

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// "smallest": start from the beginning
$topicConf->set("auto.offset.reset", "smallest");

$topic = $rk->newTopic($topic, $topicConf);

// Start consuming partition 0
$topic->consumeStart($partition, RD_KAFKA_OFFSET_STORED);

while (true) {
    $message = $topic->consume($partition, 3 * 1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo $message->errstr() . PHP_EOL;
            break;
        default:
            throw new Exception($message->errstr(), $message->err);
            break;
    }
}

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/31031.html

相關文章

  • PHPkafka的實踐

    摘要:消息以為類別記錄將消息種子分類每一類的消息稱之為一個主題。這意味著生產者不等待來自同步完成的確認繼續發送下一條批消息。這意味著在已成功收到的數據并得到確認后發送下一條。三種機制,性能依次遞減吞吐量降低,數據健壯性則依次遞增。 kafka 簡介 Kafka 是一種高吞吐量的分布式發布訂閱消息系統 kafka角色必知 producer:生產者。 consumer:消費者。 topic: 消...

    Codeing_ls 評論0 收藏0
  • PHP 使用 Kafka 安裝拾遺

    摘要:最近項目開發中需要使用消息隊列。不過在環境中安裝的過程中出現了以下報錯開始以為是因為安裝缺少了一些依賴。然后使用了源碼編譯的方式進行安裝同樣報錯了。然后安裝它再執行,執行。擴展包使用純粹的編寫的客戶端,目前支持以上版本的。 最近項目開發中需要使用 Kafka 消息隊列。經過檢索,PHP下面有通用的兩種方式來調用 Kafka 。 php-rdkafka 擴展 以 PHP 擴展的形式進行...

    SimonMa 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<