Vue集成mqtt

与后端的思路一样,我们需要在前端集成mqtt的客户端,这里使用mqtt.js包。通过执行以下命令引入:

css
复制代码







 npm i mqtt -s

为了减少客户端实例,我们通过vuex来统一管理客户端,再通过封装vue组件的形式,方便不同的页面订阅不同的主题。

具体思路如下:

  • vuex中增加mqtt模块,state中维护客户端、以及每个topic及其接收到消息后的回调方法
  • vuex提供客户端的连接、销毁、订阅、发布功能
  • 提供mqtt-subscribe-client组件,在mounted方法中,订阅传入的主题
  • app启动时,调用客户端的连接方法,创建实例
  • 在需要使用mqtt的页面,引入组件,初始化时订阅主题

下面开始实现代码。

环境变量

.env.development中增加mqtt服务器的地址信息:

ini

复制代码







 VUE_APP_BASE_EMQX_SERVER_HOST='ws://localhost:63084/mqtt'

注意与服务端的地址的区别,这里需要拼接上/mqtt后缀!!

注意.env中的属性必须以VUE_APP开头,否则vue程序无法读取到配置。并且增加了配置之后,记得重新install,热加载也是无法读取到新的配置的。

vuex模块

/store/modules目录下创建mqtt.js文件

javascript



复制代码







 import { guid } from '@/utils/guid'
 import mqtt from 'mqtt'
 ​
 const state = {
   // 连接服务器的url
   url: undefined,
   // 客户端实例
   client: undefined,
   // 订阅主题的集合,key为topic, value为接收到该topic时需要执行的回调
   subscribeMembers: {}
 }
 ​
 const clientOptions = {
   mqttVersion: 5,
   clean: true, // true: 清除会话, false: 保留会话
   connectTimeout: 4000, // 超时时间
   keepAlive:60,
   // 认证信息
   clientId: 'pc_mqtt_client_' + guid(),
   username: 'xxxx',
   password: 'xxxx',
   qos: 1
 }
 ​
 const mutations = {
   INIT_SERVER_URL(state, url) {
     state.url = url
   },
   INIT(state, client) {
     state.client = client
   },
   DESTROY(state) {
     state.client = undefined
     state.subscribeMembers = {}
   },
   SUBSCRIBE(state, { topic, callback }) {
     state.subscribeMembers[topic] = callback
   },
   UNSUBSCRIBE(state, topic) {
     state.subscribeMembers[topic] = undefined
   }
 }
 ​
 const actions = {
   // 创建mqtt连接
   connect({ commit }, url) {
     let client = mqtt.connect(url, clientOptions)
     client.on("connect", onConnect)
     client.on("reconnect", onReconnect)
     client.on("error", onError)
     client.on("message", onMessage)
     commit('INIT', client)
     commit('INIT_SERVER_URL', url)
   },
 ​
   disconnect({ commit }) {
     state.client.end()
     commit('DESTROY')
     console.log(`服务器已断开连接!`)
   },
 ​
   /**
    *  订阅
    * @param commit
    * @param dispatch
    * @param topic            消息主题
    * @param subscribeOption  订阅设置
    * @param callback         接收消息的回调
    */
   subscribe({ commit, dispatch }, {topic, callback, subscribeOption}) {
     if (!state.client) {
       dispatch('connect')
     }
     state.client.subscribe(topic,
       subscribeOption || {qos: clientOptions.qos},
       (error, granted) => {
         if (error) {
           console.log(`订阅主题: ${topic}失败: `,
             error)
         } else {
           console.log(`订阅主题: ${topic}成功`)
         }
       })
     commit('SUBSCRIBE', { topic, callback })
   },
 ​
   /**
    *  取消订阅
    * @param commit
    * @param topic  消息主题
    */
   unsubscribe({ commit }, topic) {
     if (!state.client) {
       return
     }
     state.client.unsubscribe(topic,
       {},
       (error, granted) => {
         if (error) {
           console.log(`客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}失败: `,
             error)
         } else {
           console.log(`客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}成功`)
         }
       })
     commit('UNSUBSCRIBE', topic)
   },
 ​
   publish({ commit, dispatch }, { topic, message }) {
     if (!state.client || !state.client.connected) {
       dispatch('connect')
     }
     state.client.publish(topic, message, {qos: clientOptions.qos}, (e) => {
       if(e) {
         console.log(`客户端: ${clientOptions.clientId}, 发送主题为: ${topic} 的消息, 发送失败: `, e)
       }
     })
   }
 ​
 }
 ​
 const onConnect = (e) => {
   console.log(`客户端: ${clientOptions.clientId}, 连接emqx服务器成功:`, e)
 }
 const onReconnect = (error) => {
   console.log(`客户端: ${clientOptions.clientId}, 正在重连:`, error)
 }
 const onError = (error) => {
   console.log(`客户端: ${clientOptions.clientId}, 连接失败:`, error)
 }
 const onMessage = (topic, message) => {
   console.log(`客户端: ${clientOptions.clientId}, 接收到来自主题: ${topic} 的消息: `, message?.toString())
   let callback = state.subscribeMembers[topic]
   callback?.(topic, message?.toString())
 }
 ​
 export default {
   namespaced: true,
   state,
   mutations,
   actions
 }

clientOptions中同样填入在管理页面创建的账号,clientId的逻辑与后端代码不一样,这里使用固定前缀加上guid的形式,如果是移动设备,可以使用设备的序列号来确保id的唯一性。

使用vuex时需要注意mutationsactions的区别,异步操作确保在actions中执行,mutations中只做对于state中属性的更新。

mutationsactions的方法中如果要传入多个参数,需要用对象包裹,否则是接收不到后面的参数的,因为他们的方法中定义的参数只有一个payloadts的好处在这里就体现出来了。

这里着重讲以下onMessage,在subscribe方法中,我们传入了topic及其回调方法,以key-value的形式放入到subscribeMembers中。在onMessage方法中,我们根据topic拿到对应的回调函数之后,再将topic和消息体作为入参传入回调,从而达到通知组件的目的。

最后,将mqtt模块注册到vuex实例上。

javascript



复制代码







 ​
 import settings from './modules/settings'
 import getters from './getters'
 import mqtt from './modules/mqtt'
 ​
 Vue.use(Vuex)
 ​
 const store = new Vuex.Store({
   modules: {
     settings,
     getters,
     mqtt
   },
   getters
 })
 ​
 export default store

订阅组件封装

xml
复制代码







 <template>
   <div></div>
 </template>
 ​
 <script>
 ​
 export default {
   name: 'MqttSubscribeClient',
   props: {
     topic: {
       type: String,
       required: true
     },
     global: {
       type: Boolean,
       default: false
     }
   },
   computed: {
     subscribeTopic() {
       return this.global ? this.topic : `${this.topic}/${this.$store.getters.name}`
     }
   },
   mounted() {
     this.$store.dispatch('mqtt/subscribe', {topic: this.subscribeTopic, callback: this.subscribeCallback})
   },
   beforeDestroy() {
     this.$store.dispatch('mqtt/unsubscribe', this.subscribeTopic)
   },
   data() {
     return {
 ​
     }
   },
   methods: {
     subscribeCallback(topic, message) {
       this.$emit("receive", topic, JSON.parse(message))
     }
   }
 }
 </script>
 ​
 <style scoped>
 ​
 </style>
 ​

组件需要传入topic和是否为全局消息订阅,在mounted()方法中,订阅消息,在beforeDestroy()方法中取消订阅。

创建连接

App.vue中,即程序的入口中,统一管理mqtt连接的创建和销毁。

javascript



复制代码







 crated() {
   // 创建mqtt连接
   this.$store.dispatch('mqtt/connect', process.env.VUE_APP_BASE_EMQX_SERVER_HOST)
 },
 beforeDestroy() {
   this.$store.dispatch('mqtt/disconnect')
 }

为什么不把url直接放到mqtt模块中呢,因为它只是一个工具,而连接的配置属于默认约定,直接放到mqtt模块中,限制太大。如果在系统中需要与多个服务器连接,那么在外部传入url的话,扩展性更好。

这里如果修改了代码,导致浏览器页面刷新,连接也是会重新建立的,当然这也是可以忍受的。

如果还希望优化,可以在beforeDestroy()中将连接信息写入localStorage,而不是直接销毁连接,然后在created()方法中,先读缓存,如果没有再创建连接。当然也要注意缓存中的客户端的连接状态,如果时间太久,那么很可能服务端长时间收不到心跳,会自动剔除客户端。

使用订阅组件

ini

复制代码







 <mqtt-subscribe-client :topic="'your/topic'" @receive="receiveMessage"/>
javascript



复制代码







 receiveMessage(topic, message) {
   // 处理消息
 }

至此,vue集成mqtt客户端就完成了。需要注意的地方有:

  • 如果使用websocket连接,服务端URL需要拼接上/mqtt

  • 使用vuex统一管理客户端实例,注意在程序退出时,销毁实例

  • 使用自定义组件封装订阅行为,方便业务组件调用,注意回调函数的入参

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MYZUuENC' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片