Add Inbound Channel Adapter for MQTT based on hivemq#10989
Open
mjd507 wants to merge 1 commit into
Open
Conversation
mjd507
commented
May 6, 2026
| GenericContainer<?> HIVEMQ_CONTAINER = new GenericContainer<>("hivemq/hivemq-ce:2024.3") | ||
| .withExposedPorts(CONTAINER_PORT) | ||
| .withCreateContainerCmdModifier(cmd -> cmd.withHostConfig(new HostConfig() | ||
| .withPortBindings(new PortBinding(Ports.Binding.bindPort(MAPPED_PORT), new ExposedPort(CONTAINER_PORT))) |
Contributor
Author
There was a problem hiding this comment.
due to mapped port will be changed after container stop then start,
I haven't found a way to set the new mapped port to the already built MqttClient,
so nows set a fixed mapped port here.
Related to: spring-projects#3102 - introduce a new `spring-integration-hivemq` module for MQTT adapters. - basically, the inbound message driven adapters are based on the `MqttClient`(either `Mqtt5AsyncClient` or `Mqtt3AsyncClient`) and a `topic`. - abstract common properties setup (topic/qos/manualAck/executor) for both mqtt v3 and v5 from hivemq side. - abstract common properties setup (messageConverter/payloadType) for both mqtt v3 and v5 from SI side. - for Mqtt5MessageDrivenChannelAdapter, there are some additional properties (noLocal/retainHandling/retainAsPublished) for subscription, also a `Mqtt5HeaderMapper` is used for mapping v5 specific headers like contentType/responseTopic/correlationData/UserProperties. - introduce `MqttClientConnectionCoordinator` for coordinating MQTT client connections and disconnections. avoid race-induced state exception. these v3 and v5 Coordinators are intent for internal use only. - provide `setMqttConnect(MqttConnect)` in v5 and `setMqtt3ConnectView(Mqtt3ConnectView)` in v3, if specific setup needed in connect. for example: cleanStart/keepAlive. - if connect or subscribe failed, a `MqttConnectionFailedEvent` will be published. after subscribe success, a `MqttSubscribedEvent` will be published. - if AutomaticReconnect is applied by the mqttClient, after reconnect, topic will be automatic subscribed as well. Signed-off-by: Jiandong Ma <jiandong.ma.cn@gmail.com>
Contributor
Author
|
here is the PR in my own repo for outbound v3/v5 adapters (mjd507#1). |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Related to: #3102
spring-integration-hivemqmodule for MQTT adapters.MqttClient(eitherMqtt5AsyncClientorMqtt3AsyncClient) and atopic.Mqtt5HeaderMapperis used for mapping v5 specific headers like contentType/responseTopic/correlationData/UserProperties.MqttClientConnectionCoordinatorfor coordinating MQTT client connections and disconnections. avoid race-induced state exception. these v3 and v5 Coordinators are intent for internal use only.setMqttConnect(MqttConnect)in v5 andsetMqtt3ConnectView(Mqtt3ConnectView)in v3, if specific setup needed in connect. for example: cleanStart/keepAlive.MqttConnectionFailedEventwill be published. after subscribe success, aMqttSubscribedEventwill be published.