与后端的思路一样,我们需要在前端集成mqtt
的客户端,这里使用mqtt.js
包。通过执行以下命令引入:
css
复制代码
npm i mqtt -s
为了减少客户端实例,我们通过vuex
来统一管理客户端,再通过封装vue
组件的形式,方便不同的页面订阅不同的主题。
具体思路如下:
- 在
vuex
中增加mqtt
模块,state
中维护客户端、以及每个topic
及其接收到消息后的回调方法 - 在
vuex
提供客户端的连接、销毁、订阅、发布功能 - 提供
mqtt-subscribe-client
组件,在mounted
方法中,订阅传入的主题 - 在
app
启动时,调用客户端的连接方法,创建实例 - 在需要使用mqtt的页面,引入组件,初始化时订阅主题
下面开始实现代码。
环境变量
在.env.development
中增加mqtt
服务器的地址信息:
ini
复制代码
VUE_APP_BASE_EMQX_SERVER_HOST='ws://localhost:63084/mqtt'
注意与服务端的地址的区别,这里需要拼接上/mqtt后缀!!
注意.env
中的属性必须以VUE_APP
开头,否则vue程序无法读取到配置。并且增加了配置之后,记得重新install
,热加载也是无法读取到新的配置的。
vuex模块
在/store/modules
目录下创建mqtt.js
文件
javascript
复制代码
import { guid } from '@/utils/guid'
import mqtt from 'mqtt'
const state = {
// 连接服务器的url
url: undefined,
// 客户端实例
client: undefined,
// 订阅主题的集合,key为topic, value为接收到该topic时需要执行的回调
subscribeMembers: {}
}
const clientOptions = {
mqttVersion: 5,
clean: true, // true: 清除会话, false: 保留会话
connectTimeout: 4000, // 超时时间
keepAlive:60,
// 认证信息
clientId: 'pc_mqtt_client_' + guid(),
username: 'xxxx',
password: 'xxxx',
qos: 1
}
const mutations = {
INIT_SERVER_URL(state, url) {
state.url = url
},
INIT(state, client) {
state.client = client
},
DESTROY(state) {
state.client = undefined
state.subscribeMembers = {}
},
SUBSCRIBE(state, { topic, callback }) {
state.subscribeMembers[topic] = callback
},
UNSUBSCRIBE(state, topic) {
state.subscribeMembers[topic] = undefined
}
}
const actions = {
// 创建mqtt连接
connect({ commit }, url) {
let client = mqtt.connect(url, clientOptions)
client.on("connect", onConnect)
client.on("reconnect", onReconnect)
client.on("error", onError)
client.on("message", onMessage)
commit('INIT', client)
commit('INIT_SERVER_URL', url)
},
disconnect({ commit }) {
state.client.end()
commit('DESTROY')
console.log(`服务器已断开连接!`)
},
/**
* 订阅
* @param commit
* @param dispatch
* @param topic 消息主题
* @param subscribeOption 订阅设置
* @param callback 接收消息的回调
*/
subscribe({ commit, dispatch }, {topic, callback, subscribeOption}) {
if (!state.client) {
dispatch('connect')
}
state.client.subscribe(topic,
subscribeOption || {qos: clientOptions.qos},
(error, granted) => {
if (error) {
console.log(`订阅主题: ${topic}失败: `,
error)
} else {
console.log(`订阅主题: ${topic}成功`)
}
})
commit('SUBSCRIBE', { topic, callback })
},
/**
* 取消订阅
* @param commit
* @param topic 消息主题
*/
unsubscribe({ commit }, topic) {
if (!state.client) {
return
}
state.client.unsubscribe(topic,
{},
(error, granted) => {
if (error) {
console.log(`客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}失败: `,
error)
} else {
console.log(`客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}成功`)
}
})
commit('UNSUBSCRIBE', topic)
},
publish({ commit, dispatch }, { topic, message }) {
if (!state.client || !state.client.connected) {
dispatch('connect')
}
state.client.publish(topic, message, {qos: clientOptions.qos}, (e) => {
if(e) {
console.log(`客户端: ${clientOptions.clientId}, 发送主题为: ${topic} 的消息, 发送失败: `, e)
}
})
}
}
const onConnect = (e) => {
console.log(`客户端: ${clientOptions.clientId}, 连接emqx服务器成功:`, e)
}
const onReconnect = (error) => {
console.log(`客户端: ${clientOptions.clientId}, 正在重连:`, error)
}
const onError = (error) => {
console.log(`客户端: ${clientOptions.clientId}, 连接失败:`, error)
}
const onMessage = (topic, message) => {
console.log(`客户端: ${clientOptions.clientId}, 接收到来自主题: ${topic} 的消息: `, message?.toString())
let callback = state.subscribeMembers[topic]
callback?.(topic, message?.toString())
}
export default {
namespaced: true,
state,
mutations,
actions
}
clientOptions
中同样填入在管理页面创建的账号,clientId
的逻辑与后端代码不一样,这里使用固定前缀加上guid
的形式,如果是移动设备,可以使用设备的序列号来确保id的唯一性。
使用vuex
时需要注意mutations
和actions
的区别,异步操作确保在actions
中执行,mutations
中只做对于state
中属性的更新。
mutations
和actions
的方法中如果要传入多个参数,需要用对象包裹,否则是接收不到后面的参数的,因为他们的方法中定义的参数只有一个payload
,ts
的好处在这里就体现出来了。
这里着重讲以下onMessage
,在subscribe
方法中,我们传入了topic
及其回调方法,以key-value的形式放入到subscribeMembers
中。在onMessage
方法中,我们根据topic
拿到对应的回调函数之后,再将topic
和消息体作为入参传入回调,从而达到通知组件的目的。
最后,将mqtt
模块注册到vuex
实例上。
javascript
复制代码
import settings from './modules/settings'
import getters from './getters'
import mqtt from './modules/mqtt'
Vue.use(Vuex)
const store = new Vuex.Store({
modules: {
settings,
getters,
mqtt
},
getters
})
export default store
订阅组件封装
xml
复制代码
<template>
<div></div>
</template>
<script>
export default {
name: 'MqttSubscribeClient',
props: {
topic: {
type: String,
required: true
},
global: {
type: Boolean,
default: false
}
},
computed: {
subscribeTopic() {
return this.global ? this.topic : `${this.topic}/${this.$store.getters.name}`
}
},
mounted() {
this.$store.dispatch('mqtt/subscribe', {topic: this.subscribeTopic, callback: this.subscribeCallback})
},
beforeDestroy() {
this.$store.dispatch('mqtt/unsubscribe', this.subscribeTopic)
},
data() {
return {
}
},
methods: {
subscribeCallback(topic, message) {
this.$emit("receive", topic, JSON.parse(message))
}
}
}
</script>
<style scoped>
</style>
组件需要传入topic
和是否为全局消息订阅,在mounted()
方法中,订阅消息,在beforeDestroy()
方法中取消订阅。
创建连接
在App.vue
中,即程序的入口中,统一管理mqtt
连接的创建和销毁。
javascript
复制代码
crated() {
// 创建mqtt连接
this.$store.dispatch('mqtt/connect', process.env.VUE_APP_BASE_EMQX_SERVER_HOST)
},
beforeDestroy() {
this.$store.dispatch('mqtt/disconnect')
}
为什么不把url直接放到mqtt
模块中呢,因为它只是一个工具,而连接的配置属于默认约定,直接放到mqtt
模块中,限制太大。如果在系统中需要与多个服务器连接,那么在外部传入url的话,扩展性更好。
这里如果修改了代码,导致浏览器页面刷新,连接也是会重新建立的,当然这也是可以忍受的。
如果还希望优化,可以在beforeDestroy()
中将连接信息写入localStorage
,而不是直接销毁连接,然后在created()
方法中,先读缓存,如果没有再创建连接。当然也要注意缓存中的客户端的连接状态,如果时间太久,那么很可能服务端长时间收不到心跳,会自动剔除客户端。
使用订阅组件
ini
复制代码
<mqtt-subscribe-client :topic="'your/topic'" @receive="receiveMessage"/>
javascript
复制代码
receiveMessage(topic, message) {
// 处理消息
}
至此,vue
集成mqtt
客户端就完成了。需要注意的地方有:
-
如果使用
websocket
连接,服务端URL需要拼接上/mqtt
-
使用
vuex
统一管理客户端实例,注意在程序退出时,销毁实例 -
使用自定义组件封装订阅行为,方便业务组件调用,注意回调函数的入参