浏览器和服务器之间的实时消息传递实现
开启 rabbitmq_web_stomp 扩展
rabbitmq-plugins enable rabbitmq_management rabbitmq_web_stomp rabbitmq_stomp
重启RabbitMQ
brew services restart rabbitmq
Producer代码
<?php
date_default_timezone_set('Asia/Shanghai');
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchangeName = 'topic_exchange';
$exchangeType = 'topic';
$queueName = 'topic_queue';
$routingKey = 'topic.routing_key';
$channel->exchange_declare($exchangeName, $exchangeType, false, true, false);
$channel->queue_declare($queueName, false, true, false, false);
$channel->queue_bind($queueName, $exchangeName, $routingKey);
for ($i = 0; $i < 1000; $i++) {
$data = date('Y-m-d H:i:s');
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, $exchangeName, $routingKey);
sleep(1);
}
$channel->close();
$connection->close();
Consumer代码
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>rabbitmq_stomp</title>
</head>
<body>
服务端时间为: <span id="time"></span>
<script src="js/stomp.js"></script>
<script>
// Stomp.js boilerplate
var ws = new WebSocket('ws://127.0.0.1:15674/ws');
// Init Client
var client = Stomp.over(ws);
// Disable heart-beats
client.heartbeat.outgoing = 0;
client.heartbeat.incoming = 0;
// Declare on_connect
var on_connect = function(x) {
client.subscribe("/amq/queue/topic_queue", function(d) {
document.getElementById('time').innerHTML = d.body;
});
};
// Declare on_error
var on_error = function () {
console.log('error');
};
// Connect to RabbitMQ
client.connect('guest', 'guest', on_connect, on_error, '/');
</script>
</body>
</html>
RabbitMQ控台
查看Connection
- Publisher连接RabbitMQ,使用的还是
AMQP 0-9-1
协议 - Consumer连接RabbitMQ,使用的是之前开启STOMP扩展的
Web STOMP 1.1
协议
查看Channel
查看Exchange
有新创建的 topic_exchange