本文主要讲解关于SpringBoot整合MOTT动态读取数据库连接信息并连接MQTT服务端相关内容,让我们来一起学习下吧!
SpringBoot整合MOTT动态读取数据库连接信息并连接MQTT服务端
MQTT介绍:
概述:
MQTT是一种轻量级的消息传输协议(Message Queuing Telemetry Transport),旨在实现设备之间的低带宽和高延迟的通信。它是基于发布/订阅模式(Publish/Subscribe)的消息协议,最初由IBM开发,现在成为了一种开放的标准,被广泛应用于物联网(IoT)领域。
MQTT特点包括:
1、轻量级:MQTT协议设计简单,消息头部轻量,适用于受限环境的设备,如传感器、嵌入式设备等。 2、简单易用:MQTT采用发布/订阅模式,消息的发送者(发布者)和接收者(订阅者)之间解耦,通信过程简单易理解。 3、低带宽、高延迟:MQTT协议设计考虑了网络带宽受限和延迟较高的情况,能够在不理想的网络环境下保持稳定的消息传输。 4、可靠性:MQTT支持消息的持久化和确认机制,确保消息的可靠传输,同时提供了QoS(Quality of Service)等级,可以根据实际需求进行灵活配置。 5、灵活性:MQTT支持多种消息格式和负载类型,可以传输文本、二进制数据等多种类型的消息,同时支持SSL/TLS加密,保障通信安全。 5、适用于多种场景:由于其轻量级和灵活性,MQTT被广泛应用于物联网、传感器网络、远程监控、消息通知等场景,成为连接设备的重要通信协议之一。
话不多说,直接看代码如何连接
因项目需求,本次做的是在项目启动时,动态读取数据库中已经配置好的mqtt连接信息,并且根据这些信息动态的循环连接服务端,在接收到消息后进行持久化和相关逻辑处理。
一、首先加载依赖
<!-- mqtt -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
二、因为是要在项目启动时候连接,但是又要等项目初始化后拿到要用的mapper,所以在这个类中需要实现ApplicationRunner接口,而没有用其他的方法,有多种实现但是我用的这个
package com.ruoyi;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ObjUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ruoyi.system.domain.mqtt.MqttBean;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author 1097022316
* 启动后建立MQTT连接 并对数据持久化和相关逻辑处理
*/
@Component
public class StartInit implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
MemoryPersistence persistence = new MemoryPersistence();
//这里是从数据库中查询出所有的mqtt连接相关信息,如ip、topic等
List<DeviceCollector> collectorList = collectorService.list(new QueryWrapper<DeviceCollector>().isNotNull("collector_ip").ne("collector_ip", "").eq("is_del", "0").groupBy("collector_topic"));
System.err.println("collectorList = " + collectorList);
//如果没有任何连接 直接结束
if (CollUtil.isEmpty(collectorList)) {
return;
}
List<MqttBean> mqttBeans = new ArrayList<>();
//把从数据库中查询出来的信息组装成mqttclient连接所需要的对象
//一般都是ip、port、username、password、topic、clientid 这里是简单的用法,如有高级用法可自行摸索(顺便在下面评论教一下)
collectorList.forEach(mqtt -> {
MqttBean bean = new MqttBean("tcp://" + mqtt.getCollectorIp() + ":" + mqtt.getCollectorPort(), mqtt.getCollectorUsername(), mqtt.getCollectorPassword(), mqtt.getCollectorTopic(), mqtt.getCollectorClientId());
mqttBeans.add(bean);
});
//对我们组装的mqtt连接对象信息进行遍历循环连接
mqttBeans.forEach(bean -> {
try {
MqttClient mqttClient = new MqttClient(bean.getUrl(), bean.getClientId(), persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
//设置相关连接参数,有些是必要有些是非必要 可自行点进去查看源码
connOpts.setAutomaticReconnect(false);
connOpts.setCleanSession(true);
connOpts.setUserName(bean.getUserName());
connOpts.setPassword(bean.getPassword().toCharArray());
mqttClient.connect(connOpts);
//把连接对象加入到全局
mqttClients.put(bean.getTopics(), mqttClient);
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
if (reconnect) {
System.out.println("Reconnected successfully. url = " + serverURI);
} else {
System.out.println("Connected successfully for the first time.");
}
}
/**
* 设置重连机制
*/
@Override
public void connectionLost(Throwable cause) {
System.err.println(bean.getTopics() + "连接丢失" + cause.getMessage());
if (!mqttClient.isConnected()) {
try {
Thread.sleep(1000 * 60 * 5);
//尝试连接
System.out.println("bean = " + bean);
boolean flag = MyMqttUtils.connectMqtt(new MqttBean(bean.getUrl(), bean.getUserName(), bean.getPassword(), bean.getTopics(), bean.getClientId()));
if (flag) {
System.err.println(bean.getTopics() + "重新连接,重新订阅!");
}
} catch (InterruptedException e) {
MyMqttUtils.saveToTxt(bean.getTopics() + "MQTT连接出异常了" + e.getMessage(), "CLCW");
throw new RuntimeException(e);
}
}
}
@Override
public void messageArrived(String topic, MqttMessage message) {
//这里是当消息推送时我们做的事情
System.out.println("dosomethings...");
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("消息发送完整");
}
});
mqttClient.subscribe(bean.getTopics(), 2);
} catch (Exception e) {
e.printStackTrace();
System.err.println(bean.getTopics() + "MQTT连接出异常了");
MyMqttUtils.saveToTxt(bean.getTopics() + "MQTT连接出异常了"+e.getMessage(),"LJCW");
try {
Thread.sleep(1000 * 60 * 30);
//尝试连接
System.out.println("bean = " + bean);
boolean flag = MyMqttUtils.connectMqtt(new MqttBean(bean.getUrl(), bean.getUserName(), bean.getPassword(), bean.getTopics(), bean.getClientId()));
if (flag) {
System.err.println(bean.getTopics() + "重新连接,重新订阅!");
}
} catch (InterruptedException ex) {
//可以在这里把报错信息存入本地看看
throw new RuntimeException(ex);
}
}
});
}
}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author 1097022316
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MqttBean {
private String url;
private String userName;
private String password;
private String topics;
private String clientId;
}
重点是MqttClient中几个参数和配置参数,以及那几个重写的方法,看下源码就好了。这里用的比较粗糙,只是简单的实现了连接和重连,一些复杂的如心跳或者遗嘱啥的都没用,要研究可自行查看
以上就是关于SpringBoot整合MOTT动态读取数据库连接信息并连接MQTT服务端相关的全部内容,希望对你有帮助。欢迎持续关注程序员导航网,学习愉快哦!