国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

PHP+RabbitMQ實現消息隊列(代碼全篇)

weakish / 3293人閱讀

摘要:前言先安裝對應的這里用的是不同的擴展實現方式會有細微的差異擴展地址具體以官網為準介紹配置信息基類生產者類消費者類消費者可有多個配置交換機路由生產者路由只控制發送成功不接受消費者是否收到頻道

前言

先安裝PHP對應的RabbitMQ,這里用的是 php_amqp 不同的擴展實現方式會有細微的差異.
php擴展地址: http://pecl.php.net/package/amqp
具體以官網為準  http://www.rabbitmq.com/getstarted.html 

介紹

config.php 配置信息
BaseMQ.php MQ基類
ProductMQ.php 生產者類
ConsumerMQ.php 消費者類
Consumer2MQ.php 消費者2(可有多個)

config.php

     [
            "host" => "127.0.0.1",
            "port" => "5672",
            "login" => "guest",
            "password" => "guest",
            "vhost"=>"/",
        ],
        //交換機
        "exchange"=>"word",
        //路由
        "routes" => [],
    ];

BaseMQ.php

    conf     = $conf["host"] ;
            $this->exchange = $conf["exchange"] ;
            $this->AMQPConnection = new AMQPConnection($this->conf);
            if (!$this->AMQPConnection->connect())
                throw new AMQPConnectionException("Cannot connect to the broker!
");
        }
    
        /**
         * close link
         */
        public function close()
        {
            $this->AMQPConnection->disconnect();
        }
    
        /** Channel
         * @return AMQPChannel
         * @throws AMQPConnectionException
         */
        public function channel()
        {
            if(!$this->AMQPChannel) {
                $this->AMQPChannel =  new AMQPChannel($this->AMQPConnection);
            }
            return $this->AMQPChannel;
        }
    
        /** Exchange
         * @return AMQPExchange
         * @throws AMQPConnectionException
         * @throws AMQPExchangeException
         */
        public function exchange()
        {
            if(!$this->AMQPExchange) {
                $this->AMQPExchange = new AMQPExchange($this->channel());
                $this->AMQPExchange->setName($this->exchange);
            }
            return $this->AMQPExchange ;
        }
    
        /** queue
         * @return AMQPQueue
         * @throws AMQPConnectionException
         * @throws AMQPQueueException
         */
        public function queue()
        {
            if(!$this->AMQPQueue) {
                $this->AMQPQueue = new AMQPQueue($this->channel());
            }
            return $this->AMQPQueue ;
        }
    
        /** Envelope
         * @return AMQPEnvelope
         */
        public function envelope()
        {
            if(!$this->AMQPEnvelope) {
                $this->AMQPEnvelope = new AMQPEnvelope();
            }
            return $this->AMQPEnvelope;
        }
    }

ProductMQ.php

    channel();
            //創建交換機對象
            $ex = $this->exchange();
            //消息內容
            $message = "product message ".rand(1,99999);
            //開始事務
            $channel->startTransaction();
            $sendEd = true ;
            foreach ($this->routes as $route) {
                $sendEd = $ex->publish($message, $route) ;
                echo "Send Message:".$sendEd."
";
            }
            if(!$sendEd) {
                $channel->rollbackTransaction();
            }
            $channel->commitTransaction(); //提交事務
            $this->close();
            die ;
        }
    }
    try{
        (new ProductMQ())->run();
    }catch (Exception $exception){
        var_dump($exception->getMessage()) ;
    }

ConsumerMQ.php

    exchange();
            $ex->setType(AMQP_EX_TYPE_DIRECT); //direct類型
            $ex->setFlags(AMQP_DURABLE); //持久化
            //echo "Exchange Status:".$ex->declare()."
";
    
            //創建隊列
            $q = $this->queue();
            //var_dump($q->declare());exit();
            $q->setName($this->q_name);
            $q->setFlags(AMQP_DURABLE); //持久化
            //echo "Message Total:".$q->declareQueue()."
";
    
            //綁定交換機與隊列,并指定路由鍵
            echo "Queue Bind: ".$q->bind($this->exchange, $this->route)."
";
    
            //阻塞模式接收消息
            echo "Message:
";
            while(True){
                $q->consume(function ($envelope,$queue){
                    $msg = $envelope->getBody();
                    echo $msg."
"; //處理消息
                    $queue->ack($envelope->getDeliveryTag()); //手動發送ACK應答
                });
                //$q->consume("processMessage", AMQP_AUTOACK); //自動ACK應答
            }
            $this->close();
        }
    }
    try{
        (new ConsumerMQ)->run();
    }catch (Exception $exception){
        var_dump($exception->getMessage()) ;
    }

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/31023.html

相關文章

  • RabbitMQ使用

    摘要:的定義是使用語言開發的開源消息隊列系統,完整的實現了高級抽象層消息通信協議。交換機接受發送的消息,并根據綁定規則轉發到對應的隊列。默認是無名交換使用空字符串標識。消息隊列是內部對象,用于存儲未被消費的消息。 RabbitMQ的定義 RabbitMQ是使用erlang語言開發的開源消息隊列系統,完整的實現了AMPQ(高級抽象層消息通信協議)。 Mac下RabbitMQ安裝 使用Hom...

    codeKK 評論0 收藏0
  • RabbitMQ發布訂閱實戰-實現延時重試隊列

    摘要:本文將會講解如何使用實現延時重試和失敗消息隊列,實現可靠的消息消費,消費失敗后,自動延時將消息重新投遞,當達到一定的重試次數后,將消息投遞到失敗消息隊列,等待人工介入處理。 RabbitMQ是一款使用Erlang開發的開源消息隊列。本文假設讀者對RabbitMQ是什么已經有了基本的了解,如果你還不知道它是什么以及可以用來做什么,建議先從官網的 RabbitMQ Tutorials 入門...

    Heier 評論0 收藏0
  • RabbitMQ發布訂閱實戰-實現延時重試隊列

    摘要:本文將會講解如何使用實現延時重試和失敗消息隊列,實現可靠的消息消費,消費失敗后,自動延時將消息重新投遞,當達到一定的重試次數后,將消息投遞到失敗消息隊列,等待人工介入處理。 RabbitMQ是一款使用Erlang開發的開源消息隊列。本文假設讀者對RabbitMQ是什么已經有了基本的了解,如果你還不知道它是什么以及可以用來做什么,建議先從官網的 RabbitMQ Tutorials 入門...

    vslam 評論0 收藏0
  • 轉: RabbitMQPHP(一)

    摘要:需要特別明確的概念交換機的持久化,并不等于消息的持久化。消息的處理,是有兩種方式,一次性。在上述示例中,使用的,意味著接收全部的消息。注意與是兩個不同的隊列。后端處理,可以針對每一個啟動一個或多個,以提高消息處理的實時性。 RabbitMQ與PHP(一) 項目中使用RabbitMQ作為隊列處理用戶消息通知,消息由前端PHP代碼產生,處理消息使用Python,這就導致代碼一致性問題,調...

    wpw 評論0 收藏0
  • RabbitMQ+PHP 教程一(Hello World)

    摘要:在中間的框是一個隊列的消息緩沖區,保持代表的消費。本教程介紹,這是一個開放的通用的協議消息。我們將在本教程中使用,解決依賴管理。發送者將連接到,發送一條消息,然后退出。注意,這與發送發布的隊列匹配。 介紹 RabbitMQ是一個消息代理器:它接受和轉發消息。你可以把它當作一個郵局:當你把郵件放在信箱里時,你可以肯定郵差先生最終會把郵件送到你的收件人那里。在這個比喻中,RabbitMQ就...

    silencezwm 評論0 收藏0

發表評論

0條評論

weakish

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<