Spring MQTT pub/sub

Moki Lv6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package pl.myapp.mqtt;

import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Service
public class Mqtt {

public MqttConnectionOptions getMqttConnectOptions() {
var mqttConnectOptions = new MqttConnectionOptions();
mqttConnectOptions.setCleanStart(false);
mqttConnectOptions.setConnectionTimeout(10);
mqttConnectOptions.setKeepAliveInterval(10);
mqttConnectOptions.setAutomaticReconnect(true);

mqttConnectOptions.setServerURIs(new String[]{"tcp://example.com:1883"});
mqttConnectOptions.setUserName("myUser");
mqttConnectOptions.setPassword("myPassword".getBytes(StandardCharsets.UTF_8));
return mqttConnectOptions;
}


// input

private static final ExecutorService inputExecutors = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

@Bean
public MessageChannel mqttInputChannel() {
return new ExecutorChannel(inputExecutors);
}

@Bean
public MessageProducer inbound(MessageChannel mqttInputChannel) {
var adapter = new Mqttv5PahoMessageDrivenChannelAdapter(getMqttConnectOptions(), "read-client-" + UUID.randomUUID(), "my/topic/+");
adapter.setCompletionTimeout(5000);
adapter.setMessageConverter(new StringMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel);
return adapter;
}

@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return message -> {
String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
byte[] payload=(byte[]) message.getPayload();

System.out.println(topic + " " + Hex.encodeHexString(payload));
};
}


// output

@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
var messageHandler = new Mqttv5PahoMessageHandler(getMqttConnectOptions(), "client-write-" + UUID.randomUUID());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("void");
return messageHandler;
}

private static final ExecutorService outputExecutors = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

@Bean
public MessageChannel mqttOutboundChannel() {
return new ExecutorChannel(outputExecutors);
}

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MyGateway {

void send(@Header("mqtt_topic") String topic, byte[] payload);

void send(@Header("mqtt_topic") String topic, String payload);

}

}
On this page
Spring MQTT pub/sub