简介
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的”轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
使用docker安装mqtt服务 emqx
1
| docker run -d --name emqx -p 18083:18083 -p 1883:1883 emqx:4.4
|
访问控制界面
http://localhost:18083 admin/public
使用spring mqtt
MQTT Support (spring.io)
依赖
1 2 3 4
| <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
|
inboud(收消息)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| @SpringBootApplication public class MqttJavaApplication {
public static void main(String[] args) { new SpringApplicationBuilder(MqttJavaApplication.class) .web(false) .run(args); } @Bean public MqttConnectOptions getMqttConnectOptions() { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setUserName(""); mqttConnectOptions.setPassword("".toCharArray()); mqttConnectOptions.setServerURIs(new String[]{hostUrl}); return mqttConnectOptions; }
@Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOptions()); return factory; } @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); }
@Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( "testClient", mqttClientFactory(), topic1", "topic2"); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; }
@Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() {
@Override public void handleMessage(Message<?> message) throws MessagingException { System.out.println(message.getPayload()); }
}; } }
|
outbound(发消息)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| @SpringBootApplication @IntegrationComponentScan public class MqttJavaApplication {
public static void main(String[] args) { ConfigurableApplicationContext context = new SpringApplicationBuilder(MqttJavaApplication.class) .web(false) .run(args); MyGateway gateway = context.getBean(MyGateway.class); gateway.sendToMqtt("foo"); }
@Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" }); options.setUserName("username"); options.setPassword("password".toCharArray()); factory.setConnectionOptions(options); return factory; }
@Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("testClient", mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic("testTopic"); return messageHandler; }
@Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); }
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MyGateway { void sendToMqtt(String data); void sendToMqtt(String data,@Header(MqttHeaders.TOPIC) String topic); /** * 发送信息到MQTT服务器 * * @param topic 主题 * @param qos 对消息处理的几种机制。 * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。 * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。 * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 * @param payload 消息主体 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); }
}
|
测试客户端
MQTT Explorer