Skip to content

Add Inbound Channel Adapter for MQTT based on hivemq#10989

Open
mjd507 wants to merge 1 commit into
spring-projects:mainfrom
mjd507:hivemq-mqtt
Open

Add Inbound Channel Adapter for MQTT based on hivemq#10989
mjd507 wants to merge 1 commit into
spring-projects:mainfrom
mjd507:hivemq-mqtt

Conversation

@mjd507
Copy link
Copy Markdown
Contributor

@mjd507 mjd507 commented May 6, 2026

Related to: #3102

  • introduce a new spring-integration-hivemq module for MQTT adapters.
  • basically, the inbound message driven adapters are based on the MqttClient(either Mqtt5AsyncClient or Mqtt3AsyncClient) and a topic.
  • abstract common properties setup (topic/qos/manualAck/executor) for both mqtt v3 and v5 from hivemq side.
  • abstract common properties setup (messageConverter/payloadType) for both mqtt v3 and v5 from SI side.
  • for Mqtt5MessageDrivenChannelAdapter, there are some additional properties (noLocal/retainHandling/retainAsPublished) for subscription, also a Mqtt5HeaderMapper is used for mapping v5 specific headers like contentType/responseTopic/correlationData/UserProperties.
  • introduce MqttClientConnectionCoordinator for coordinating MQTT client connections and disconnections. avoid race-induced state exception. these v3 and v5 Coordinators are intent for internal use only.
  • provide setMqttConnect(MqttConnect) in v5 and setMqtt3ConnectView(Mqtt3ConnectView) in v3, if specific setup needed in connect. for example: cleanStart/keepAlive.
  • if connect or subscribe failed, a MqttConnectionFailedEvent will be published. after subscribe success, a MqttSubscribedEvent will be published.
  • if AutomaticReconnect is applied by the mqttClient, after reconnect, topic will be automatic subscribed as well.

GenericContainer<?> HIVEMQ_CONTAINER = new GenericContainer<>("hivemq/hivemq-ce:2024.3")
.withExposedPorts(CONTAINER_PORT)
.withCreateContainerCmdModifier(cmd -> cmd.withHostConfig(new HostConfig()
.withPortBindings(new PortBinding(Ports.Binding.bindPort(MAPPED_PORT), new ExposedPort(CONTAINER_PORT)))
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

due to mapped port will be changed after container stop then start,
I haven't found a way to set the new mapped port to the already built MqttClient,
so nows set a fixed mapped port here.

Related to: spring-projects#3102

- introduce a new `spring-integration-hivemq` module for MQTT adapters.
- basically, the inbound message driven adapters are based on the `MqttClient`(either `Mqtt5AsyncClient` or `Mqtt3AsyncClient`) and a `topic`.
- abstract common properties setup (topic/qos/manualAck/executor) for both mqtt v3 and v5 from hivemq side.
- abstract common properties setup (messageConverter/payloadType) for both mqtt v3 and v5 from SI side.
- for Mqtt5MessageDrivenChannelAdapter, there are some additional properties (noLocal/retainHandling/retainAsPublished) for subscription, also a `Mqtt5HeaderMapper` is used for mapping v5 specific headers like contentType/responseTopic/correlationData/UserProperties.
- introduce `MqttClientConnectionCoordinator` for coordinating MQTT client connections and disconnections. avoid race-induced state exception. these v3 and v5 Coordinators are intent for internal use only.
- provide `setMqttConnect(MqttConnect)` in v5 and `setMqtt3ConnectView(Mqtt3ConnectView)` in v3, if specific setup needed in connect. for example: cleanStart/keepAlive.
- if connect or subscribe failed, a `MqttConnectionFailedEvent` will be published. after subscribe success, a `MqttSubscribedEvent` will be published.
- if AutomaticReconnect is applied by the mqttClient, after reconnect, topic will be automatic subscribed as well.

Signed-off-by: Jiandong Ma <jiandong.ma.cn@gmail.com>
@mjd507
Copy link
Copy Markdown
Contributor Author

mjd507 commented May 12, 2026

here is the PR in my own repo for outbound v3/v5 adapters (mjd507#1).
I will modify and raise it once the inbound review completed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant