Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ SSL is supported for the following service connections:
* Cassandra
* Elasticsearch
* MongoDB
* RabbitMQ (excluding streams)
* RabbitMQ
* RabbitMQ Streams
* Redis

To enable SSL support for a service, you can use https://docs.docker.com/reference/compose-file/services/#labels[service labels].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ If you need to create more javadoc:org.springframework.rabbit.stream.producer.Ra



[[messaging.amqp.sending-stream.ssl]]
=== SSL
To use SSL with RabbitMQ Streams, set configprop:spring.rabbitmq.stream.ssl.enabled[] to `true` or set configprop:spring.rabbitmq.stream.ssl.bundle[] to configure the xref:features/ssl.adoc#features.ssl.bundles[SSL bundle] to use.



[[messaging.amqp.receiving]]
== Receiving a Message

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ The SSL annotations are supported for the following service connections:
* Elasticsearch
* Kafka
* MongoDB
* RabbitMQ (excluding streams)
* RabbitMQ
* RabbitMQ Streams
* Redis

The `ElasticsearchContainer` additionally supports automatic detection of server side SSL.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ void runCreatesConnectionDetails(RabbitConnectionDetails connectionDetails) {
}

@DockerComposeTest(composeFile = "rabbit-ssl-compose.yaml", image = TestImage.RABBITMQ,
additionalResources = { "ca.crt", "server.crt", "server.key", "client.crt", "client.key", "rabbitmq.conf" })
additionalResources = { "../../ca.crt", "../../server.crt", "../../server.key", "../../client.crt",
"../../client.key", "rabbitmq-ssl.conf" })
void runWithSslCreatesConnectionDetails(RabbitConnectionDetails connectionDetails) {
assertConnectionDetails(connectionDetails);
SslBundle sslBundle = connectionDetails.getSslBundle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.springframework.boot.amqp.autoconfigure.RabbitStreamConnectionDetails;
import org.springframework.boot.docker.compose.service.connection.test.DockerComposeTest;
import org.springframework.boot.ssl.SslBundle;
import org.springframework.boot.testsupport.container.TestImage;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -34,6 +35,15 @@ void runCreatesConnectionDetails(RabbitStreamConnectionDetails connectionDetails
assertConnectionDetails(connectionDetails);
}

@DockerComposeTest(composeFile = "rabbit-stream-ssl-compose.yaml", image = TestImage.RABBITMQ,
additionalResources = { "../../ca.crt", "../../server.crt", "../../server.key", "../../client.crt",
"../../client.key", "rabbitmq-stream-ssl.conf" })
void runWithSslCreatesConnectionDetails(RabbitStreamConnectionDetails connectionDetails) {
assertConnectionDetails(connectionDetails);
SslBundle sslBundle = connectionDetails.getSslBundle();
assertThat(sslBundle).isNotNull();
}

private void assertConnectionDetails(RabbitStreamConnectionDetails connectionDetails) {
assertThat(connectionDetails.getUsername()).isEqualTo("myuser");
assertThat(connectionDetails.getPassword()).isEqualTo("secret");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright 2012-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.boot.amqp.testcontainers;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import com.rabbitmq.stream.Address;
import com.rabbitmq.stream.Environment;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.rabbitmq.RabbitMQContainer;
import org.testcontainers.utility.MountableFile;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.amqp.autoconfigure.EnvironmentBuilderCustomizer;
import org.springframework.boot.amqp.autoconfigure.RabbitAutoConfiguration;
import org.springframework.boot.amqp.autoconfigure.RabbitConnectionDetails;
import org.springframework.boot.amqp.autoconfigure.RabbitStreamConnectionDetails;
import org.springframework.boot.amqp.testcontainers.RabbitContainerConnectionDetailsFactory.RabbitMqContainerConnectionDetails;
import org.springframework.boot.amqp.testcontainers.RabbitStreamContainerConnectionDetailsFactory.RabbitMqStreamContainerConnectionDetails;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.testcontainers.service.connection.PemKeyStore;
import org.springframework.boot.testcontainers.service.connection.PemTrustStore;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.boot.testsupport.container.TestImage;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
import org.springframework.rabbit.stream.support.StreamAdmin;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import static org.assertj.core.api.Assertions.assertThat;

/**
* Tests for {@link RabbitStreamContainerConnectionDetailsFactory} with SSL.
*
* @author Eddú Meléndez
* @author Andy Wilkinson
*/
@SpringJUnitConfig
@TestPropertySource(
properties = { "spring.rabbitmq.stream.name=stream.queue1", "spring.rabbitmq.listener.type=stream" })
@Testcontainers(disabledWithoutDocker = true)
class RabbitStreamWithSslContainerConnectionDetailsFactoryIntegrationTests {

private static final int RABBITMQ_STREAMS_TLS_PORT = 5551;

@Container
@ServiceConnection(type = RabbitStreamConnectionDetails.class)
@PemTrustStore(certificate = "classpath:org/springframework/boot/amqp/ca.crt")
@PemKeyStore(certificate = "classpath:org/springframework/boot/amqp/client.crt",
privateKey = "classpath:org/springframework/boot/amqp/client.key")
static final RabbitMQContainer rabbit = getRabbitMqStreamContainer();

private static RabbitMQContainer getRabbitMqStreamContainer() {
RabbitMQContainer container = TestImage.container(RabbitMQContainer.class);
container.addExposedPorts(RABBITMQ_STREAMS_TLS_PORT);
String enabledPlugins = "[rabbitmq_stream,rabbitmq_prometheus].";
container.withCopyToContainer(Transferable.of(enabledPlugins), "/etc/rabbitmq/enabled_plugins");
container.withCopyFileToContainer(
MountableFile
.forClasspathResource("org/springframework/boot/amqp/testcontainers/rabbitmq-stream-ssl.conf"),
"/etc/rabbitmq/rabbitmq.conf");
container.withCopyFileToContainer(MountableFile.forClasspathResource("org/springframework/boot/amqp/ca.crt"),
"/etc/rabbitmq/ca.crt");
container.withCopyFileToContainer(
MountableFile.forClasspathResource("org/springframework/boot/amqp/server.key"),
"/etc/rabbitmq/server.key");
container.withCopyFileToContainer(
MountableFile.forClasspathResource("org/springframework/boot/amqp/server.crt"),
"/etc/rabbitmq/server.crt");
return container;
}

@Autowired(required = false)
private RabbitConnectionDetails connectionDetails;

@Autowired(required = false)
private RabbitStreamConnectionDetails streamConnectionDetails;

@Autowired
private RabbitStreamTemplate rabbitStreamTemplate;

@Autowired
private TestListener listener;

@Test
void connectionCanBeMadeToRabbitContainer() {
assertThat(this.connectionDetails).isNotInstanceOf(RabbitMqContainerConnectionDetails.class);
assertThat(this.streamConnectionDetails).isInstanceOf(RabbitMqStreamContainerConnectionDetails.class);
assertThat(this.streamConnectionDetails.getSslBundle()).isNotNull();
this.rabbitStreamTemplate.convertAndSend("message");
Awaitility.waitAtMost(Duration.ofMinutes(4))
.untilAsserted(() -> assertThat(this.listener.messages).containsExactly("message"));
}

@Configuration(proxyBeanMethods = false)
@ImportAutoConfiguration(RabbitAutoConfiguration.class)
static class TestConfiguration {

@Bean
StreamAdmin streamAdmin(Environment env) {
return new StreamAdmin(env, (sc) -> sc.stream("stream.queue1").create());
}

@Bean
EnvironmentBuilderCustomizer environmentBuilderCustomizer() {
return (env) -> env.addressResolver(
(address) -> new Address(rabbit.getHost(), rabbit.getMappedPort(RABBITMQ_STREAMS_TLS_PORT)));
}

@Bean
TestListener testListener() {
return new TestListener();
}

}

static class TestListener {

private final List<String> messages = new ArrayList<>();

@RabbitListener(queues = "stream.queue1")
void processMessage(String message) {
this.messages.add(message);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ services:
- ssl-key
- ssl-cert
volumes:
- ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro
- ./rabbitmq-ssl.conf:/etc/rabbitmq/rabbitmq.conf:ro
labels:
- 'org.springframework.boot.sslbundle.pem.keystore.certificate=client.crt'
- 'org.springframework.boot.sslbundle.pem.keystore.private-key=client.key'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
services:
rabbitmq:
image: '{imageName}'
environment:
- 'RABBITMQ_DEFAULT_USER=myuser'
- 'RABBITMQ_DEFAULT_PASS=secret'
configs:
- source: plugins
target: /etc/rabbitmq/enabled_plugins
ports:
- '5551'
- '5552'
secrets:
- ssl-ca
- ssl-key
- ssl-cert
volumes:
- ./rabbitmq-stream-ssl.conf:/etc/rabbitmq/rabbitmq.conf:ro
labels:
- 'org.springframework.boot.sslbundle.pem.keystore.certificate=client.crt'
- 'org.springframework.boot.sslbundle.pem.keystore.private-key=client.key'
- 'org.springframework.boot.sslbundle.pem.truststore.certificate=ca.crt'
secrets:
ssl-ca:
file: 'ca.crt'
ssl-key:
file: 'server.key'
ssl-cert:
file: 'server.crt'
configs:
plugins:
content: "[rabbitmq_stream]."
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
stream.listeners.ssl.1 = 5551

ssl_options.cacertfile=/run/secrets/ssl-ca
ssl_options.certfile=/run/secrets/ssl-cert
ssl_options.keyfile=/run/secrets/ssl-key

ssl_options.verify=verify_peer
ssl_options.fail_if_no_peer_cert=true
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
stream.listeners.ssl.1 = 5551

ssl_options.cacertfile=/etc/rabbitmq/ca.crt
ssl_options.certfile=/etc/rabbitmq/server.crt
ssl_options.keyfile=/etc/rabbitmq/server.key

ssl_options.verify=verify_peer
ssl_options.fail_if_no_peer_cert=true
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
* @author Scott Frederick
* @author Lasse Wulff
* @author Yanming Zhou
* @author Jay Choi
* @since 4.0.0
*/
@ConfigurationProperties("spring.rabbitmq")
Expand Down Expand Up @@ -1311,6 +1312,11 @@ public static final class Stream {
*/
private @Nullable String name;

/**
* SSL configuration for RabbitMQ instance with the Stream plugin enabled.
*/
private final Ssl ssl = new Ssl();

public String getHost() {
return this.host;
}
Expand Down Expand Up @@ -1359,6 +1365,45 @@ public void setName(@Nullable String name) {
this.name = name;
}

public Ssl getSsl() {
return this.ssl;
}

public static class Ssl {

/**
* Whether to enable SSL support. Enabled automatically if "bundle" is
* provided.
*/
private @Nullable Boolean enabled;

/**
* SSL bundle name.
*/
private @Nullable String bundle;

public @Nullable Boolean getEnabled() {
return this.enabled;
}

public boolean determineEnabled() {
return Boolean.TRUE.equals(getEnabled()) || this.bundle != null;
}

public void setEnabled(@Nullable Boolean enabled) {
this.enabled = enabled;
}

public @Nullable String getBundle() {
return this.bundle;
}

public void setBundle(@Nullable String bundle) {
this.bundle = bundle;
}

}

}

}
Loading
Loading