1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
| package pl.myapp.mqtt;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; import org.springframework.context.annotation.Bean; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.ExecutorChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.converter.StringMessageConverter; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Service; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
/**
 * Spring Integration wiring for an MQTT v5 client.
 *
 * <p>Declares an inbound adapter that feeds {@code mqttInputChannel}, a handler that prints
 * every received message, an outbound handler fed by {@code mqttOutboundChannel}, and the
 * {@link MyGateway} messaging gateway used to publish.
 *
 * <p>The class owns the two thread pools backing its executor channels and releases them in
 * {@link #close()}, which Spring invokes for {@link AutoCloseable} singleton beans when the
 * application context shuts down.
 */
@Service
public class Mqtt implements AutoCloseable {

    // Owned per instance (not static) so that shutting them down in close() cannot break
    // another application context living in the same JVM.
    private final ExecutorService inputExecutors =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    private final ExecutorService outputExecutors =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    /**
     * Builds a fresh set of connection options on every call, so the inbound and outbound
     * clients do not share a mutable options instance.
     *
     * @return options with clean-start disabled, automatic reconnect enabled, a connection
     *         timeout and keep-alive interval of 10, and the broker URI and credentials set
     */
    public MqttConnectionOptions getMqttConnectOptions() {
        var options = new MqttConnectionOptions();
        options.setCleanStart(false);
        options.setConnectionTimeout(10);
        options.setKeepAliveInterval(10);
        options.setAutomaticReconnect(true);
        // NOTE(review): broker URI and credentials are hard-coded in source; consider moving
        // them to externalised configuration.
        options.setServerURIs(new String[]{"tcp://example.com:1883"});
        options.setUserName("myUser");
        options.setPassword("myPassword".getBytes(StandardCharsets.UTF_8));
        return options;
    }

    /**
     * Channel that receives inbound MQTT messages and dispatches them on the input pool.
     *
     * @return an executor-backed channel
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new ExecutorChannel(inputExecutors);
    }

    /**
     * Inbound adapter subscribed to {@code my/topic/+} with QoS 1, using a random client id
     * prefixed with {@code read-client-}.
     *
     * @param mqttInputChannel channel the adapter sends received messages to
     * @return the configured adapter
     */
    @Bean
    public MessageProducer inbound(MessageChannel mqttInputChannel) {
        var adapter = new Mqttv5PahoMessageDrivenChannelAdapter(
                getMqttConnectOptions(), "read-client-" + UUID.randomUUID(), "my/topic/+");
        adapter.setCompletionTimeout(5000);
        adapter.setMessageConverter(new StringMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel);
        return adapter;
    }

    /**
     * Handler for {@code mqttInputChannel} that prints the received topic followed by the
     * payload as a hex string.
     *
     * @return the message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
            String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
            Object payload = message.getPayload();
            // The inbound adapter is configured with a StringMessageConverter, so the payload
            // may arrive as a String instead of byte[]; accept both rather than failing with a
            // ClassCastException. Any other type still fails the cast, as before.
            byte[] bytes = payload instanceof String
                    ? ((String) payload).getBytes(StandardCharsets.UTF_8)
                    : (byte[]) payload;
            System.out.println(topic + " " + Hex.encodeHexString(bytes));
        };
    }

    /**
     * Outbound handler for {@code mqttOutboundChannel}: publishes asynchronously with a random
     * client id prefixed with {@code client-write-}; {@code void} is the default topic.
     *
     * @return the configured outbound handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        var messageHandler = new Mqttv5PahoMessageHandler(
                getMqttConnectOptions(), "client-write-" + UUID.randomUUID());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("void");
        return messageHandler;
    }

    /**
     * Channel that carries outgoing messages to {@link #mqttOutbound()} on the output pool.
     *
     * @return an executor-backed channel
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new ExecutorChannel(outputExecutors);
    }

    /** Gateway for publishing; requests go to {@code mqttOutboundChannel}. */
    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        /**
         * Sends a binary payload.
         *
         * @param topic value of the {@code mqtt_topic} header
         * @param payload message body
         */
        void send(@Header("mqtt_topic") String topic, byte[] payload);

        /**
         * Sends a text payload.
         *
         * @param topic value of the {@code mqtt_topic} header
         * @param payload message body
         */
        void send(@Header("mqtt_topic") String topic, String payload);
    }

    /**
     * Shuts down both thread pools so their worker threads do not outlive this bean.
     * Tasks that were already submitted are still executed; no new tasks are accepted.
     */
    @Override
    public void close() {
        inputExecutors.shutdown();
        outputExecutors.shutdown();
    }
}
|