Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -342,61 +342,64 @@ impl ClusterSandbox {
.connect_lazy()
}

/// Returns a client to one of the nodes that runs the specified service
pub fn rest_client(&self, service: QuickwitService) -> QuickwitClient {
let node_config = self.find_node_for_service(service);

let certificate = if let Some(tls_conf) = &node_config.rest_config.tls {
let cert_bytes = std::fs::read(&tls_conf.ca_path).unwrap();
Some(reqwest::tls::Certificate::from_pem(&cert_bytes).unwrap())
fn tls_parts(
node_config: &NodeConfig,
) -> (
Option<reqwest::tls::Certificate>,
Option<reqwest::tls::Identity>,
) {
let Some(tls_conf) = &node_config.rest_config.tls else {
return (None, None);
};
let ca_bytes = std::fs::read(&tls_conf.ca_path).unwrap();
let ca_cert = reqwest::tls::Certificate::from_pem(&ca_bytes).unwrap();
let identity = if tls_conf.validate_client {
let mut pem = std::fs::read(&tls_conf.key_path).unwrap();
pem.extend(std::fs::read(&tls_conf.cert_path).unwrap());
Some(reqwest::tls::Identity::from_pem(&pem).unwrap())
} else {
None
};
(Some(ca_cert), identity)
}

/// Returns a client to one of the nodes that runs the specified service
pub fn rest_client(&self, service: QuickwitService) -> QuickwitClient {
let node_config = self.find_node_for_service(service);
let (ca_cert, identity) = Self::tls_parts(&node_config);
QuickwitClientBuilder::new(transport_url(
node_config.rest_config.listen_addr,
certificate.is_some(),
ca_cert.is_some(),
))
.set_tls_ca(certificate)
.set_tls_ca(ca_cert)
.set_tls_identity(identity)
.build()
}

/// A client configured to ingest documents and return detailed parse failures.
pub fn detailed_ingest_client(&self) -> QuickwitClient {
let node_config = self.find_node_for_service(QuickwitService::Indexer);

let certificate = if let Some(tls_conf) = &node_config.rest_config.tls {
let cert_bytes = std::fs::read(&tls_conf.ca_path).unwrap();
Some(reqwest::tls::Certificate::from_pem(&cert_bytes).unwrap())
} else {
None
};

let (ca_cert, identity) = Self::tls_parts(&node_config);
QuickwitClientBuilder::new(transport_url(
node_config.rest_config.listen_addr,
certificate.is_some(),
ca_cert.is_some(),
))
.set_tls_ca(certificate)
.set_tls_ca(ca_cert)
.set_tls_identity(identity)
.detailed_response(true)
.build()
}

// TODO(#5604)
pub fn rest_client_legacy_indexer(&self) -> QuickwitClient {
let node_config = self.find_node_for_service(QuickwitService::Indexer);

let certificate = if let Some(tls_conf) = &node_config.rest_config.tls {
let cert_bytes = std::fs::read(&tls_conf.ca_path).unwrap();
Some(reqwest::tls::Certificate::from_pem(&cert_bytes).unwrap())
} else {
None
};

let (ca_cert, identity) = Self::tls_parts(&node_config);
QuickwitClientBuilder::new(transport_url(
node_config.rest_config.listen_addr,
certificate.is_some(),
ca_cert.is_some(),
))
.set_tls_ca(certificate)
.set_tls_ca(ca_cert)
.set_tls_identity(identity)
.use_legacy_ingest(true)
.build()
}
Expand Down
58 changes: 58 additions & 0 deletions quickwit/quickwit-integration-tests/src/tests/tls_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,61 @@ async fn test_tls_grpc() {

sandbox.shutdown().await.unwrap();
}

#[tokio::test]
async fn test_mtls_rest() {
quickwit_common::setup_logging_for_tests();
let mut sandbox_config = ClusterSandboxBuilder::default()
.add_node(QuickwitService::supported_services())
.build_config()
.await;
// Reuse the server cert/key as the client identity — it is signed by the same CA, so the
// server's WebPkiClientVerifier will accept it.
sandbox_config.node_configs[0].0.rest_config.tls = Some(quickwit_config::TlsConfig {
cert_path: concat!(env!("CARGO_MANIFEST_DIR"), "/test_data/server.crt").to_string(),
key_path: concat!(env!("CARGO_MANIFEST_DIR"), "/test_data/server.key").to_string(),
ca_path: concat!(env!("CARGO_MANIFEST_DIR"), "/test_data/ca.crt").to_string(),
expected_name: None,
validate_client: true,
});
let sandbox = sandbox_config.start().await;
let listen_addr = sandbox.node_configs[0].0.rest_config.listen_addr;

let ca_bytes = std::fs::read(concat!(env!("CARGO_MANIFEST_DIR"), "/test_data/ca.crt")).unwrap();
let ca_cert = reqwest::tls::Certificate::from_pem(&ca_bytes).unwrap();

// reqwest::Identity::from_pem expects a single buffer with key + cert concatenated.
let mut identity_pem =
std::fs::read(concat!(env!("CARGO_MANIFEST_DIR"), "/test_data/server.key")).unwrap();
identity_pem.extend(
std::fs::read(concat!(env!("CARGO_MANIFEST_DIR"), "/test_data/server.crt")).unwrap(),
);
let client_identity = reqwest::tls::Identity::from_pem(&identity_pem).unwrap();

// A client with no certificate should be rejected during the TLS handshake.
let client_no_cert = reqwest::Client::builder()
.add_root_certificate(ca_cert.clone())
.build()
.unwrap();
let url = format!("https://{listen_addr}/api/v1/indexes");
client_no_cert
.get(&url)
.send()
.await
.expect_err("connection without client certificate should fail");

// A client presenting the client certificate signed by the trusted CA should succeed.
let client_with_cert = reqwest::Client::builder()
.add_root_certificate(ca_cert)
.identity(client_identity)
.build()
.unwrap();
let resp = client_with_cert
.get(&url)
.send()
.await
.expect("connection with valid client certificate should succeed");
assert!(resp.status().is_success());

sandbox.shutdown().await.unwrap();
}
15 changes: 14 additions & 1 deletion quickwit/quickwit-rest-client/src/rest_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use quickwit_serve::{
ListSplitsQueryParams, ListSplitsResponse, RestIngestResponse, SearchRequestQueryString,
};
use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderValue};
use reqwest::tls::Certificate;
use reqwest::tls::{Certificate, Identity};
use reqwest::{ClientBuilder as ReqwestClientBuilder, Method, StatusCode, Url};
use reqwest_middleware::{ClientBuilder as ReqwestMiddlewareClientBuilder, ClientWithMiddleware};
use reqwest_retry::RetryTransientMiddleware;
Expand Down Expand Up @@ -58,6 +58,7 @@ impl Transport {
endpoint: Url,
connect_timeout: Timeout,
ca_cert: Option<Certificate>,
client_identity: Option<Identity>,
num_retries: u32,
) -> Self {
let base_url = endpoint;
Expand All @@ -73,6 +74,9 @@ impl Transport {
.tls_built_in_root_certs(false)
.add_root_certificate(ca_cert);
}
if let Some(identity) = client_identity {
reqwest_client_builder = reqwest_client_builder.identity(identity);
}
let retry_policy = ExponentialBackoff::builder()
.retry_bounds(Duration::from_secs(1), Duration::from_secs(60))
.build_with_max_retries(num_retries);
Expand Down Expand Up @@ -147,6 +151,8 @@ pub struct QuickwitClientBuilder {
detailed_response: bool,
/// Validate against a custom TLS certificate authority
ca_cert: Option<Certificate>,
/// Client certificate + key for mTLS
client_identity: Option<Identity>,
/// Maximum number of retries for transient errors.
num_retries: u32,
}
Expand All @@ -163,6 +169,7 @@ impl QuickwitClientBuilder {
use_legacy_ingest: false,
detailed_response: false,
ca_cert: None,
client_identity: None,
num_retries: 0,
}
}
Expand Down Expand Up @@ -208,6 +215,11 @@ impl QuickwitClientBuilder {
self
}

pub fn set_tls_identity(mut self, identity: Option<Identity>) -> Self {
self.client_identity = identity;
self
}

pub fn num_retries(mut self, num_retries: u32) -> Self {
self.num_retries = num_retries;
self
Expand All @@ -218,6 +230,7 @@ impl QuickwitClientBuilder {
self.base_url,
self.connect_timeout,
self.ca_cert,
self.client_identity,
self.num_retries,
);
QuickwitClient {
Expand Down
23 changes: 15 additions & 8 deletions quickwit/quickwit-serve/src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,9 @@ mod tls {
use std::{fs, io};

use quickwit_config::TlsConfig;
use rustls::RootCertStore;
use rustls::pki_types::{CertificateDer, PrivateKeyDer};
use rustls::server::WebPkiClientVerifier;
use tokio_rustls::rustls::ServerConfig;

fn io_error(error: String) -> io::Error {
Expand Down Expand Up @@ -580,14 +582,19 @@ mod tls {
let certs = load_certs(&config.cert_path)?;
let key = load_private_key(&config.key_path)?;

// TODO we could add support for client authorization, it seems less important than on the
// gRPC side though
if config.validate_client {
anyhow::bail!("mTLS isn't supported on rest api");
}

let mut cfg = rustls::ServerConfig::builder()
.with_no_client_auth()
let builder = rustls::ServerConfig::builder();
let builder = if config.validate_client {
let ca_certs = load_certs(&config.ca_path)?;
let mut roots = RootCertStore::empty();
for ca_cert in ca_certs {
roots.add(ca_cert)?;
}
let verifier = WebPkiClientVerifier::builder(Arc::new(roots)).build()?;
builder.with_client_cert_verifier(verifier)
} else {
builder.with_no_client_auth()
};
let mut cfg = builder
.with_single_cert(certs, key)
.map_err(|error| io_error(error.to_string()))?;
// Configure ALPN to accept HTTP/2, HTTP/1.1, and HTTP/1.0 in that order.
Expand Down
Loading