前端小白也能懂!实时消息小红点学会使用 RabbitMQ 推送消息

一、前言

在Web应用程序中使用实时消息推送可以为用户提供更好的体验。例如,在聊天室或社交媒体上,用户会看到其他用户发送的消息的实时更新,这提高了用户参与感,并为用户提供了方便。实时消息推送技术在许多场景中都是必不可少的,比如在线游戏,实时股票报价等。

实时消息推送常用的技术包括 WebSocket,SSE 和长轮询。除此之外,我们还可以使用消息队列来实现实时通信,其中 RabbitMQ 是一个非常受欢迎的消息队列方案。

在本文中,我们将介绍每种实时消息推送技术的原理、应用场景和实现方式,然后深入了解 RabbitMQ,掌握它的特点、主要组件和应用场景。最后,我们将讨论如何将 RabbitMQ 集成到 Web 应用程序中,以便实现实时消息推送并添加未读消息提示功能。

官方网站:www.rabbitmq.com

二、实时消息推送技术概述

2.1 WebSocket

WebSocket 是一种在单个 TCP 连接上进行全双工通信的网络协议。它使用 HTTP/1.1 协议进行初始握手,并在随后的通信中切换到全双工模式。

WebSocket 的应用场景包括在线聊天室、多玩家在线游戏、实时协作和数据可视化等。

使用 WebSocket 可以轻松实现实时消息推送功能,其实现方式如下:

  1. 客户端使用 JavaScript API new WebSocket(url) 创建一个 WebSocket 对象。
  2. 客户端通过 WebSocket.send(data) 方法向服务器发送数据。
  3. 服务器通过 WebSocket.onmessage 事件接收客户端发送的数据,并通过 WebSocket.send(data) 方法向客户端发送数据。

以下是使用 Node.js 和 WebSocket 实现的一个简单的实时聊天室例子。

首先,我们需要安装 ws 模块,该模块提供了 WebSocket 的实现。可以使用 npm 命令进行安装:

npm install ws

然后,创建一个 WebSocket 服务器,如下所示:

const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });





wss.on('connection', (ws) => {

  console.log('client connected');


  // 监听客户端发来的消息
  ws.on('message', (message) => {
    console.log(`received: ${message}`);



    // 将消息广播给所有客户端
    wss.clients.forEach((client) => {
      if (client.readyState === WebSocket.OPEN) {
        client.send(message);
      }
    });
  });
});

这段代码创建了一个新的 WebSocket 服务器,监听 8080 端口。当有新客户端连接时,服务器记录该事件,并在客户端发送消息后将该消息广播给所有连接的客户端。

接下来,我们可以使用以下 HTML 和 JavaScript 代码在客户端创建一个 WebSocket 对象,并使用它发送和接收消息:

<!DOCTYPE html>



<html>



<head>


  <meta charset="UTF-8">


  <title>WebSocket Chat</title>
</head>


<body>


  <input type="text" id="message" placeholder="Type your message and press enter...">
  <ul id="messages"></ul>



  <script>
    const ws = new WebSocket('ws://localhost:8080');

    // 监听连接成功事件
    ws.addEventListener('open', (event) => {
      console.log('connected to server');
    });

    // 监听收到消息事件
    ws.addEventListener('message', (event) => {
      const messages = document.getElementById('messages');
      const li = document.createElement('li');
      li.textContent = event.data;
      messages.appendChild(li);
    });

    // 监听输入框按下回车键事件
    const input = document.getElementById('message');
    input.addEventListener('keypress', (event) => {
      if (event.keyCode === 13) {
        ws.send(input.value);
        input.value = '';
      }
    });
  </script>
</body>
</html>

以上代码创建了一个包含文本输入框和消息列表的 HTML 页面。当用户在文本输入框中输入消息并按下回车键时,网页将发送该消息到 WebSocket 服务器。当服务器接收到消息后,它会将消息广播给所有与之连接的客户端,并显示在每个客户端的消息列表中。

2.2 SSE

SSE(Server-Sent Events)是一种仅允许服务器向客户端发送事件的 Web 技术。与 WebSocket 不同,SSE 是基于 HTTP/1.1 的,它使用了 HTTP 连接的长连接,并且只支持服务器向客户端推送数据。

SSE 的应用场景包括实时股票报价、新闻和天气预报等。

使用 SSE 可以轻松实现实时消息推送功能,其实现方式如下:

  1. 客户端使用 JavaScript API new EventSource(url) 创建一个 SSE 对象。
  2. 服务器向客户端发送数据时,要求使用特定格式的事件流(Event Stream)格式。
  3. 客户端通过 SSE.onmessage 事件接收从服务器发送的数据。

以下是使用 Node.js 和 SSE 实现的一个简单例子:

首先,我们需要安装 express 模块和 sse-express 模块,它们分别提供了 Web 应用程序框架和 SSE 的实现。可以使用 npm 命令进行安装:

npm install express sse-express

然后,创建一个 SSE 服务器,如下所示:

const express = require('express');

const sseExpress = require('sse-express');
const app = express();

app.get('/events', sseExpress(), (req, res) => {
  console.log('client connected');

  // 发送一条消息给客户端
  res.sse(`data: ${new Date().toISOString()}\n\n`);



  // 每 5 秒钟发送一条消息给客户端
  setInterval(() => {
    res.sse(`data: ${new Date().toISOString()}\n\n`);
  }, 5000);
});

app.listen(8080, () => console.log('app listening on port 8080'));

这段代码创建了一个 SSE 服务器,它可以在客户端连接时将当前时间发送到客户端,并每隔 5 秒钟将时间发送到客户端。

接下来,我们可以使用以下 HTML 和 JavaScript 代码在客户端创建一个 SSE 对象,并使用它接收从服务器发送的数据:

<!DOCTYPE html>



<html>



<head>


  <meta charset="UTF-8">


  <title>SSE Example</title>
</head>


<body>


  <ul id="messages"></ul>



  <script>

    const es = new EventSource('/events');

    // 监听收到消息事件
    es.addEventListener('message', (event) => {
      const messages = document.getElementById('messages');
      const li = document.createElement('li');
      li.textContent = event.data;
      messages.appendChild(li);
    });
  </script>
</body>
</html>

以上代码创建了一个包含消息列表的 HTML 页面。当用户打开该页面时,网页将连接到 SSE 服务器,侦听从服务器发送的消息,并在每次接收到新消息时将该消息添加到消息列表中。

2.3 长轮询

长轮询是指客户端向服务器发送一个请求并保持该请求打开状态,直到服务器有数据可返回时才响应。在每次对话结束后,客户端将立即发送一个新的请求,并继续等待服务器的响应。

长轮询的应用场景包括在线游戏、实时协作和数据可视化等。

使用长轮询可以轻松实现实时消息推送功能,其实现方式如下:

  1. 客户端使用 AJAX API 向服务器发送一个请求。
  2. 服务器保留该请求,直到有数据可返回时才响应。
  3. 客户端接收到响应后立即发送一个新的请求,并继续等待服务器的响应。

以下是使用 Node.js 和长轮询实现的一个简单例子:

首先,创建一个 Express 服务器,如下所示:

const express = require('express');

const app = express();




app.use(express.static('public'));

// 返回一个 JSON 对象
app.get('/api/messages', (req, res) => {
  const messages = ['Hello', 'World', 'Goodbye'];
  res.json(messages);
});

app.listen(8080, () => console.log('app listening on port 8080'));

这段代码创建了一个 Express 服务器,其中 /api/messages 端点将返回一个包含三个字符串的 JSON 对象。

接下来,我们可以使用以下 HTML 和 JavaScript 代码在客户端发起长轮询请求,并在每次接收到新数据时更新网页:

<!DOCTYPE html>



<html>



<head>


  <meta charset="UTF-8">


  <title>Long Polling Example</title>
</head>


<body>


  <ul id="messages"></ul>



  <script>

    function poll() {
      // 发起 AJAX 请求
      const xhr = new XMLHttpRequest();
      xhr.open('GET', '/api/messages', true);

      // 监听收到响应事件
      xhr.onreadystatechange = function() {
        if (xhr.readyState === 4 && xhr.status === 200) {
          const messages = JSON.parse(xhr.responseText);
          const ul = document.getElementById('messages');

          // 更新网页
          while (ul.firstChild) {
            ul.removeChild(ul.firstChild);
          }
          for (let i = 0; i < messages.length; i++) {
            const li = document.createElement('li');
            li.textContent = messages[i];
            ul.appendChild(li);
          }

          // 发起下一次请求
          poll();
        }
      };

      xhr.send();
    }

    poll();
  </script>
</body>
</html>

以上代码创建了一个包含消息列表的 HTML 页面。当用户打开该页面时,网页将发起长轮询请求,等待服务器返回包含消息的 JSON 对象。当接收到新数据时,网页将更新消息列表,并发起下一次长轮询请求。

三、RabbitMQ 简介

image.png

3.1 定义与特点

RabbitMQ 是一个开源的 AMQP(Advanced Message Queuing Protocol)消息代理,可以在分布式系统中进行消息传递。RabbitMQ 非常灵活,具有速度快、稳定性好和跨平台等特性。

RabbitMQ 主要特点包括:

  • 灵活:支持多种消息传输协议,如 AMQP、STOMP 和 MQTT 等。
  • 可靠:保证消息传递的可靠性,确保消息不会丢失。
  • 互通性:支持多种编程语言和平台。
  • 可扩展:支持集群和分布式部署。

3.2 主要组件

RabbitMQ 由多个组件组成,其中主要组件包括:

  • Producer(生产者):创建并发送消息的程序。
  • Consumer(消费者):接收并处理消息的程序。
  • Exchange(交换机):接收生产者发送的消息,并将其路由到一个或多个队列中。
  • Queue(队列):用于存储消息的缓冲区。
  • Binding(绑定):定义 Exchange 和 Queue 之间的关系。
  • Virtual Host(虚拟主机):逻辑上分隔不同应用程序的隔离环境。

3.3 应用场景

RabbitMQ 可在以下场景中发挥作用:

  • 消息队列:通过队列存储和转发消息,实现不同应用之间的解耦和异步调用。
  • 负载均衡:通过集群和多机部署来实现负载均衡,提高系统的稳定性和可靠性。
  • 分布式系统:通过 AMQP 协议进行通信,处理并发请求,避免系统出现单点故障。
  • 订阅发布模式:通过交换机将消息广播给所有订阅者,实现实时通知和事件驱动的开发。

四、前端与 RabbitMQ 集成方法

RabbitMQ 是一个消息队列系统,可以用于在应用程序之间传递消息。如果您想将前端与 RabbitMQ 集成,可以考虑以下几种方法:

(1) 通过 WebSocket 实现实时通信:使用 WebSocket 可以在前端和后端之间建立实时通信,从而可以实时接收和发送消息。您可以使用 RabbitMQ 的 WebSocket 接口来连接到消息队列,并使用 WebSocket 协议将消息发送到前端。

(2) 使用 HTTP API:您可以使用 RabbitMQ 的 HTTP API 来发送和接收消息。您可以在前端使用 AJAX 或 Fetch 等技术来发送 HTTP 请求,然后在后端使用 RabbitMQ 的 HTTP API 将消息发送到队列。同样,您可以在后端使用 RabbitMQ 的 HTTP API 来接收消息,然后在前端使用 AJAX 或 Fetch 等技术来接收响应。

(3) 使用 AMQP 协议:AMQP 是 RabbitMQ 使用的协议,它可以在应用程序之间传递消息。您可以在前端使用 AMQP 客户端库来连接到 RabbitMQ,并使用 AMQP 协议将消息发送到队列。同样,您可以在后端使用 AMQP 客户端库来接收消息。

对于前端与 RabbitMQ 集成方法,主要有以下几个方面需要注意:

(1) 使用 WebSocket 连接

由于 RabbitMQ 的消息传递是基于 AMQP 协议进行的,如果要将消息推送给浏览器或移动端应用,需要将其转换成 WebSocket 或其他可用协议。因此,我们需要使用 WebSocket 连接来实现前端与 RabbitMQ 的通信。

(2)前端消息处理

在接收到 RabbitMQ 发送的消息后,前端需要对消息进行处理,例如更新UI界面、提示用户等操作。为了方便操作,可以使用现有的前端框架如 Vue、React 等,并结合 WebSocket 相关库进行开发。

(3)RabbitMQ 消息处理

在 RabbitMQ 中,消息发送者称为生产者,消息接收者称为消费者。在前端集成 RabbitMQ 时,我们需要编写相应的生产者和消费者代码,用于将消息发送到 RabbitMQ 队列中,并从队列中接收消息并进行处理。

RabbitMQ 的基础搭建就不详细说了,自行百度一步一步搞问题不大,这里主要写个简单的示例,让有个大体的印象,具体操作自己工作之余有时间可以慢慢探索。

4.1 RabbitMQ 后端服务

首先,我们需要搭建一个 RabbitMQ 后端服务,并在服务中创建相应的队列。可以使用各种语言的 RabbitMQ 客户端库,在服务端创建生产者和消费者等相关代码。这里我们使用 Node.js 平台和 amqplib 库作为示例,代码如下:

const amqp = require('amqplib');
const queueName = 'my-queue';




(async () => {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();
  
  await channel.assertQueue(queueName, { durable: false });
  
  // 生产者
  setInterval(() => {
    const message = `Hello, ${new Date()}!`;
    channel.sendToQueue(queueName, Buffer.from(message));
    console.log(`Sent message: ${message}`);
  }, 1000);
  

  // 消费者
  channel.consume(queueName, (message) => {
    console.log(`Received message: ${message.content.toString()}`);
  }, { noAck: true });
})();

在服务中启动生产者和消费者,并每秒向队列中发送一条消息,同时监听队列,并输出接收到的消息。

4.2 WebSocket 服务器

在前端中,我们需要编写一个 WebSocket 服务器,用于与 RabbitMQ 后端服务进行通信。可以使用 Node.js 平台和 ws 库来搭建 WebSocket 服务器,代码如下:

const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });





wss.on('connection', (ws) => {

  console.log('Client connected.');


  // 连接 RabbitMQ 服务器
  const amqp = require('amqplib');
  const queueName = 'my-queue';
  
  (async () => {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();
  
    await channel.assertQueue(queueName, { durable: false });
  

    channel.consume(queueName, (message) => {
      ws.send(message.content.toString());
    }, { noAck: true });

    console.log('Connected to RabbitMQ server.');
  })();
});

在客户端连接成功后,建立一个 WebSocket 连接和 RabbitMQ 服务器之间的连接,并监听消息队列,将收到的消息发送给客户端。

4.3 前端 UI 界面

最后,在前端中,我们需要编写相应的 UI 界面,并集成 WebSocket 相关库,用于接收并显示从 RabbitMQ 服务器推送的消息。下面是一个使用 Vue.js 框架的示例,代码如下:

<!DOCTYPE html>



<html>



  <head>
    <meta charset="utf-8">
    <title>RabbitMQ Integration Demo</title>
  </head>
  <body>
    <h1>RabbitMQ Integration Demo</h1>
    <ul>
      <li v-for="message in messages">{{ message }}</li>
    </ul>
  </body>
  <script src="https://cdn.jsdelivr.net/npm/vue"></script>
  <script>
    const host = window.location.host;
    const ws = new WebSocket(`ws://${host}`);

    new Vue({
      el: 'body',
      data: {
        messages: []
      },
      mounted() {
        ws.onmessage = (event) => {
          this.messages.push(event.data);
        }
      }
    });
  </script>
</html>

在 HTML 文件中引入 Vue.js 库,并创建一个 WebSocket 连接,用于接收数据并更新 UI 界面。在数据模型中定义一个 messages 字段,用于存储接收到的消息,并在 mounted 生命周期钩子函数中,监听 WebSocket 的 onmessage 事件,将接收到的消息保存到 messages 数组中。

五、小结一下

在这篇关于 RabbitMQ 实时消息推送的文章中,教大家认识了如何利用这个高效、可靠、可扩展的消息传递系统实现前端实时消息推送。相信有些同学也已经对 RabbitMQ 有了更深入的了解。

但是,在学习过程中我们也不可避免地遇到了一些困难和挑战。比如说,安装和配置 RabbitMQ 可能会让我们头疼不已,而使用发布/订阅模型来实现消息传递也需要一定的技巧和经验。

不过,就像学习任何新技术一样,只要持之以恒、勇往直前,总会有所收获。所以,鼓励大家不要放弃,相信自己能够掌握 RabbitMQ 实时消息推送的技能!

另外,如果您觉得学习 RabbitMQ 的过程中有些痛苦,那么请相信,这种痛苦比起被领导要求加班加点、被客户催着交付项目等等,真的算不了什么。所以,还是让我们心情愉悦、满怀信心地继续前行吧!

总之,学习 RabbitMQ 实时消息推送是一次有趣且富有意义的旅程,相信它会让我们对前端开发有更深刻的认识和体会。

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MYVduELd' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片