Real-Time Message Push with RabbitMQ

This post implements real-time message delivery between a server and the browser.

Enable the rabbitmq_web_stomp plugin

rabbitmq-plugins enable rabbitmq_management rabbitmq_web_stomp rabbitmq_stomp

Restart RabbitMQ (the command below assumes a Homebrew installation on macOS)

brew services restart rabbitmq

Producer code

<?php

date_default_timezone_set('Asia/Shanghai');

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$exchangeName = 'topic_exchange';
$exchangeType = 'topic';
$queueName = 'topic_queue';
$routingKey = 'topic.routing_key';

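// Declare a durable topic exchange (passive=false, durable=true, auto_delete=false).
$channel->exchange_declare($exchangeName, $exchangeType, false, true, false);
// Declare a durable, non-exclusive queue (passive=false, durable=true, exclusive=false, auto_delete=false).
$channel->queue_declare($queueName, false, true, false, false);
// Bind the queue to the exchange so messages published with this routing key reach it.
$channel->queue_bind($queueName, $exchangeName, $routingKey);

// Publish the current server time once per second, 1000 times.
for ($i = 0; $i < 1000; $i++) {
    $data = date('Y-m-d H:i:s');
    $msg = new AMQPMessage($data);
    $channel->basic_publish($msg, $exchangeName, $routingKey);
    sleep(1);
}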

$channel->close();
$connection->close();
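
The producer binds topic_queue with the key topic.routing_key and publishes with the same key, so the match is exact. As a rough illustration of how a topic exchange compares a binding key with a routing key in general, here is a minimal sketch in JavaScript. The function name topicMatches is hypothetical and this is not RabbitMQ's actual implementation; it only encodes the documented rules that words are separated by dots, `*` stands for exactly one word, and `#` stands for zero or more words.

```javascript
// Sketch of topic-exchange matching (hypothetical helper, not RabbitMQ code).
// "*" matches exactly one word, "#" matches zero or more words.
function topicMatches(bindingKey, routingKey) {
  const pattern = bindingKey.split(".");
  const words = routingKey.split(".");

  // p indexes into the pattern, w indexes into the routing-key words.
  function match(p, w) {
    if (p === pattern.length) {
      return w === words.length;
    }
    if (pattern[p] === "#") {
      // Let "#" absorb zero, one, two, ... of the remaining words.
      for (let k = w; k <= words.length; k++) {
        if (match(p + 1, k)) {
          return true;
        }
      }
      return false;
    }
    if (w === words.length) {
      return false;
    }
    if (pattern[p] === "*" || pattern[p] === words[w]) {
      return match(p + 1, w + 1);
    }
    return false;
  }

  return match(0, 0);
}

console.log(topicMatches("topic.routing_key", "topic.routing_key"));
console.log(topicMatches("topic.*", "topic.routing_key"));
console.log(topicMatches("topic.*", "topic.a.b"));
```

Under these rules a binding such as topic.* or topic.# would also deliver the producer's messages to the queue, while topic.* would not match a three-word key like topic.a.b.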

Consumer code

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>rabbitmq_stomp</title>
</head>
<body>
Server time: <span id="time"></span>
<script src="js/stomp.js"></script>
<script>
    // Stomp.js boilerplate
    var ws = new WebSocket('ws://127.0.0.1:15674/ws');
    // Init Client
    var client = Stomp.over(ws);
    // Disable heart-beats
    client.heartbeat.outgoing = 0;
    client.heartbeat.incoming = 0;
    // Declare on_connect
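    var on_connect = function(x) {
        // Subscribe to the queue declared by the producer
        client.subscribe("/amq/queue/topic_queue", function(d) {
            // textContent avoids interpreting the message body as HTML
            document.getElementById('time').textContent = d.body;
        });
    };
    // Declare on_error
    var on_error = function (err) {
        console.log('error', err);
    };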
    // Connect to RabbitMQ
    client.connect('guest', 'guest', on_connect, on_error, '/');
</script>
</body>
</html>
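
The page above subscribes to /amq/queue/topic_queue, a queue shared by every subscriber, so if several browser tabs are open each message is delivered to only one of them. The RabbitMQ STOMP plugin also accepts destinations of the form /exchange/<name>/<pattern>, for which it creates a private auto-delete queue per subscription, so every tab receives every message. The sketch below shows both destination forms. The helper names (existingQueueDestination, exchangeDestination, subscribeToTime) are hypothetical and not part of stomp.js, and the `client` argument is assumed to be the connected object created with Stomp.over in the page above.

```javascript
// Hypothetical helpers that build RabbitMQ STOMP destination strings.

// A queue declared outside the STOMP plugin, as used in the page above.
function existingQueueDestination(queueName) {
  return "/amq/queue/" + queueName;
}

// An exchange plus binding pattern; the broker creates a private
// auto-delete queue for each subscription to this kind of destination.
function exchangeDestination(exchangeName, bindingKey) {
  return "/exchange/" + exchangeName + "/" + bindingKey;
}

// Subscribe a connected stomp.js client to the exchange used by the producer
// and pass each message body to the onTime callback.
function subscribeToTime(client, onTime) {
  const destination = exchangeDestination("topic_exchange", "topic.routing_key");
  return client.subscribe(destination, function (frame) {
    onTime(frame.body);
  });
}

console.log(existingQueueDestination("topic_queue"));
console.log(exchangeDestination("topic_exchange", "topic.routing_key"));
```

Inside on_connect this could be called as subscribeToTime(client, function (t) { document.getElementById('time').textContent = t; }). Note that topic_queue would still receive a copy of every message for as long as it stays bound to the exchange.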

RabbitMQ management console

View connections

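  • The publisher still connects to RabbitMQ over the AMQP 0-9-1 protocol.
  • The consumer connects over Web STOMP (STOMP 1.1 over WebSocket), provided by the rabbitmq_web_stomp plugin enabled earlier.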
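
To make the second connection less opaque, here is a minimal sketch of the text frames a STOMP 1.1 client sends over the WebSocket: a command line, header lines of the form name:value, a blank line, an optional body, and a terminating NUL byte. The buildFrame function and the subscription id sub-0 are hypothetical; stomp.js builds these frames internally, and this sketch skips the header escaping the specification requires for values that contain colons, newlines, or backslashes.

```javascript
// Hypothetical helper: serialize a STOMP frame as plain text.
// Header values are assumed to need no escaping.
function buildFrame(command, headers, body = "") {
  const headerLines = Object.entries(headers).map(
    ([name, value]) => `${name}:${value}`
  );
  // Command, headers, blank line, body, then the NUL terminator.
  return [command, ...headerLines, "", body].join("\n") + "\0";
}

// Roughly what the consumer page asks the broker for after connecting.
const subscribeFrame = buildFrame("SUBSCRIBE", {
  id: "sub-0",
  destination: "/amq/queue/topic_queue",
});

// JSON.stringify makes the newlines and the NUL byte visible.
console.log(JSON.stringify(subscribeFrame));
```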

View channels

View exchanges

The newly created topic_exchange is listed.

Exchange details

View queues

Queue details

Browser consumer console
