How to transmit via MQTT and receive on AMQP with RabbitMQ and Spring-AMQP

I've gotten MQTT -> MQTT and AMQP -> AMQP to work, but the MQTT -> AMQP translation doesn't seem to be working somewhere. Here's my test. It passes if my "listener" also uses MQTT via Paho, but it fails with this RabbitMQ implementation.

@SpringBootTest
internal open class ProvisioningTest @Autowired constructor(
    private val mqtt: IMqttAsyncClient,
    private val mapper: ObjectMapper
) {

    @Test
    fun provision() {
        val entity = Foley(
            rfid = UUID.randomUUID().toString(),
        )

        // Flag flipped when a message arrives on the "foley/created" topic.
        val called = AtomicBoolean(false)
        mqtt.subscribe("foley/created", 1) { _, _ -> called.set(true) }

        // Publish the new entity as JSON over MQTT.
        mqtt.publish("foley/new", MqttMessage(mapper.writeValueAsBytes(entity)))

        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilTrue(called)
    }
}

This is the listener that publishes the saved entity to the other queue; it never gets called when I publish to MQTT.

@Component
open class Provisioning(private val repo: FoleyRepo) {
    private val log: Logger = LogManager.getLogger(this::class.java)

    // Declares the "foley.new" queue and consumes from it.
    @RabbitListener(queuesToDeclare = [Queue("foley.new")])
    open fun listen(entity: Foley): Foley {
        log.trace("saving: {}", entity)
        val save = repo.save(entity)
        log.debug("saved: {}", save)
        return save
    }
}



@Configuration
open class MessagingConfig {

    @Bean
    open fun client(
        // "$" is escaped so Kotlin passes the placeholders through to Spring.
        @Value("tcp://\${mqtt.client.host:localhost}:\${mqtt.client.port:1883}") uri: String,
        @Value("\${mqtt.client.user:#{null}}") user: String?,
        @Value("\${mqtt.client.pass:#{null}}") pass: CharArray?
    ): IMqttAsyncClient {

        val connOpt = MqttConnectOptions()
        user?.let { connOpt.userName = it }
        pass?.let { connOpt.password = it }
        connOpt.isCleanSession = false
        connOpt.isAutomaticReconnect = true
        val client = MqttAsyncClient(uri, MqttAsyncClient.generateClientId(), MemoryPersistence())
        // Connect with the options built above and wait until the connection is up.
        client.connect(connOpt).waitForCompletion()
        return client
    }

    @Bean
    open fun messageConverter(om: ObjectMapper): MessageConverter {
        return Jackson2JsonMessageConverter(om)
    }

    @Bean
    open fun builder(): Jackson2ObjectMapperBuilderCustomizer {
        return Jackson2ObjectMapperBuilderCustomizer {
            it.modules(JavaTimeModule(), KotlinModule())
        }
    }
}

I'm using the official Docker RabbitMQ image with the MQTT plugin enabled.


What do I need to correct to make this work?



The MQTT plugin publishes to the amq.topic exchange, with the MQTT topic name as the routing key.
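
One detail worth keeping in mind when picking the routing key: as far as I understand the plugin, the MQTT level separator "/" becomes the AMQP word separator ".", and the single-level wildcard "+" in a subscription becomes "*". The sketch below only illustrates that mapping; the class and method names are hypothetical and this is not the plugin's own code.

```java
public class MqttTopicToRoutingKey {

    // Hypothetical helper: routing key assumed to be used for a published MQTT topic.
    static String toRoutingKey(String mqttTopic) {
        return mqttTopic.replace('/', '.');
    }

    // Hypothetical helper: binding key assumed to be used for an MQTT subscription filter.
    // "#" is the multi-level wildcard on both sides, so it is left untouched.
    static String toBindingKey(String mqttTopicFilter) {
        return mqttTopicFilter.replace('/', '.').replace('+', '*');
    }

    public static void main(String[] args) {
        System.out.println(toRoutingKey("foley/new"));   // prints "foley.new"
        System.out.println(toRoutingKey("so54995261"));  // prints "so54995261"
        System.out.println(toBindingKey("foley/+"));     // prints "foley.*"
    }
}
```

Under that assumption, a message published to the MQTT topic foley/new would arrive at amq.topic with the routing key foley.new.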

On the consumer side, it binds an auto-delete queue to that exchange, with the routing key; in the following example, the queue is named mqtt-subscription-mqttConsumerqos1.

In order to receive MQTT messages over AMQP, you need to bind your own queue to the exchange. Here is an example:

@SpringBootApplication
public class So54995261Application {

    public static void main(String[] args) {
        SpringApplication.run(So54995261Application.class, args);
    }

    // Outbound side: anything sent to the "toMQTT" channel is published over MQTT.
    @Bean
    @ServiceActivator(inputChannel = "toMQTT")
    public MqttPahoMessageHandler sendIt(MqttPahoClientFactory clientFactory) {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler("clientId", clientFactory);
        // Topic used when a message carries no topic header.
        handler.setDefaultTopic("so54995261");
        return handler;
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://localhost:1883" });
        factory.setConnectionOptions(options);
        return factory;
    }

    // MQTT consumer; the broker creates mqtt-subscription-mqttConsumerqos1 for it.
    @Bean
    public MessageProducerSupport mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("mqttConsumer",
                mqttClientFactory(), "so54995261");
        return adapter;
    }

    // Print whatever arrives over MQTT.
    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows.from(mqttInbound())
                .handle(System.out::println)
                .get();
    }

    // AMQP consumer: receives the same message through the queue bound below.
    @RabbitListener(queues = "so54995261")
    public void listen(byte[] in) {
        System.out.println(new String(in));
    }

    @Bean
    public Queue queue() {
        return new Queue("so54995261");
    }

    // Bind our own queue to amq.topic, using the MQTT topic name as the routing key.
    @Bean
    public Binding binding() {
        return new Binding("so54995261", DestinationType.QUEUE, "amq.topic", "so54995261", null);
    }

    @Bean
    public ApplicationRunner runner(MessageChannel toMQTT) {
        return args -> toMQTT.send(new GenericMessage<>("foo"));
    }

}
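
To see why the binding is what makes the difference, here is a rough, broker-free sketch of how a topic exchange such as amq.topic decides whether a routing key matches a binding key: keys are split into dot-separated words, "*" matches exactly one word, and "#" matches zero or more words. The class and method names are hypothetical; this is an illustration of the matching rules, not RabbitMQ's implementation.

```java
public class TopicBindingMatch {

    // Returns true if a topic exchange would route routingKey to a queue bound with bindingKey.
    static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            // Pattern exhausted: it is a match only if all words were consumed too.
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // "#" may consume zero or more words; try every possibility.
            for (int next = w; next <= words.length; next++) {
                if (matches(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            // Pattern still expects a word but none are left.
            return false;
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            // "*" matches any single word; otherwise the words must be equal.
            return matches(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("so54995261", "so54995261")); // prints "true"
        System.out.println(matches("foley.*", "foley.new"));     // prints "true"
        System.out.println(matches("foley.created", "foley.new")); // prints "false"
    }
}
```

A queue that is declared but never bound to amq.topic has no binding key for the exchange to match against, so messages published by the MQTT plugin never reach it.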


