Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions payjoin-mailroom/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ pub struct Config {
pub storage_dir: PathBuf,
#[serde(deserialize_with = "deserialize_duration_secs")]
pub timeout: Duration,
#[serde(deserialize_with = "deserialize_optional_duration_secs")]
pub ohttp_keys_max_age: Option<Duration>,
pub v1: Option<V1Config>,
#[cfg(feature = "telemetry")]
pub telemetry: Option<TelemetryConfig>,
Expand Down Expand Up @@ -85,6 +87,7 @@ impl Default for Config {
listener: "[::]:8080".parse().expect("valid default listener address"),
storage_dir: PathBuf::from("./data"),
timeout: Duration::from_secs(30),
ohttp_keys_max_age: Some(Duration::from_secs(7 * 24 * 60 * 60)),
v1: None,
#[cfg(feature = "telemetry")]
telemetry: None,
Expand All @@ -104,17 +107,33 @@ where
Ok(Duration::from_secs(secs))
}

fn deserialize_optional_duration_secs<'de, D>(deserializer: D) -> Result<Option<Duration>, D::Error>
where
D: serde::Deserializer<'de>,
{
let secs: Option<u64> = Option::deserialize(deserializer)?;
match secs {
None => Ok(None),
Some(0) => Err(<D::Error as serde::de::Error>::custom(
"ohttp_keys_max_age must be greater than 0 seconds when set",
)),
Some(s) => Ok(Some(Duration::from_secs(s))),
}
}

impl Config {
pub fn new(
listener: ListenerAddress,
storage_dir: PathBuf,
timeout: Duration,
ohttp_keys_max_age: Option<Duration>,
v1: Option<V1Config>,
) -> Self {
Self {
listener,
storage_dir,
timeout,
ohttp_keys_max_age,
v1,
#[cfg(feature = "telemetry")]
telemetry: None,
Expand Down
214 changes: 192 additions & 22 deletions payjoin-mailroom/src/directory.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use std::path::PathBuf;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use anyhow::Result;
use axum::body::{Body, Bytes};
use axum::http::header::{HeaderValue, ACCESS_CONTROL_ALLOW_ORIGIN, CONTENT_TYPE};
use axum::http::header::{HeaderValue, ACCESS_CONTROL_ALLOW_ORIGIN, CACHE_CONTROL, CONTENT_TYPE};
use axum::http::{Method, Request, Response, StatusCode, Uri};
use http_body_util::BodyExt;
use payjoin::directory::{ShortId, ShortIdError, ENCAPSULATED_MESSAGE_BYTES};
use tokio::sync::RwLock;
use tracing::{debug, error, trace, warn};

use crate::db::{Db, Error as DbError, SendableError};
Expand All @@ -28,6 +31,97 @@ const V1_VERSION_UNSUPPORTED_RES_JSON: &str =

pub type BoxError = Box<dyn std::error::Error + Send + Sync>;

// Two-slot OHTTP key set supporting rotation overlap.
//
// Key IDs alternate between 1 and 2. The current key is served to new
// clients; both slots are accepted for decapsulation so that clients
// with a cached previous key still work during the overlap window.
#[derive(Debug)]
pub struct KeyRotatingServer {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming nit: I would suggest a more descriptive name, e.g. `RotatingKeyConfig`.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to how the ohttp crate is structured, a `Server` has only a single `KeyConfig`, so unless that is refactored (which is a big divergence from the upstream ohttp crate's API), this struct multiplexes `Server`s, not `KeyConfig`s.

keys: [Option<ohttp::Server>; 2],
Copy link
Copy Markdown
Collaborator

@arminsabouri arminsabouri Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would we ever have more than 2 active keys? If not, I would suggest having two fields in this struct instead of indexing

{
current_key: ohttp::Server,
previous_key: Option<ohttp::Server>,
current_key_created_at: Instant,
}

This at least ensures that the primary key is always Some. And may obviate the need for key ids to be in {1,2}

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we would have two keys for a short while: the grace period of the previous key before it is retired.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand. However, it seems odd to me that they are optional. Shouldn't the current key always be Some?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

both keys should always be Some #1449 (comment)

current_key_id: u8,
current_key_created_at: Instant,
}

impl KeyRotatingServer {
pub fn from_single(server: ohttp::Server, key_id: u8) -> Self {
assert!(key_id == 1 || key_id == 2, "key_id must be 1 or 2");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, key identifiers are an arbitrary u8: https://www.ietf.org/rfc/rfc9458.html#section-3.1

so probably easier to use key ids 0 and 1 instead of 1 and 2? i think i said 1 and 2 somewhere so sorry if i made it seem like that was an important detail

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be addressed. Ohttp targets can set whatever key id they want

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but the mailroom only deals with mailroom ohttp targets and the only reason to support more than 1 key is to do graceful rotation

this may change if say a PQ KEM is added to the OHTTP layer or something like that, but right now supporting more than two distinct keyids serves no purpose that i can see and neither rfc 9458 nor our gateway opt-in extension allows for arbitrary targets, only the mailbox service

let mut keys = [None, None];
keys[(key_id - 1) as usize] = Some(server);
Copy link
Copy Markdown
Contributor

@nothingmuch nothingmuch Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it'd be simpler to just initialize both and avoid the Option IMO, no need to pattern match or expect everywhere

btw that's also why i suggested changing "created_at" to "valid_from", since both are created at the same time but only one is valid from Instant::now()

Self { current_key_id: key_id, keys, current_key_created_at: Instant::now() }
}

pub fn from_pair(
current: (u8, ohttp::Server),
previous: Option<(u8, ohttp::Server)>,
current_key_age: Duration,
) -> Self {
assert!(current.0 == 1 || current.0 == 2, "key_id must be 1 or 2");
let mut keys = [None, None];
keys[(current.0 - 1) as usize] = Some(current.1);
if let Some((id, server)) = previous {
assert!(id == 1 || id == 2, "key_id must be 1 or 2");
keys[(id - 1) as usize] = Some(server);
}
let created_at = Instant::now().checked_sub(current_key_age).unwrap_or_else(Instant::now);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should `.expect()`, not `.unwrap_or_else`. If there's a key claimed to be from the future, it's best to crash the server because the host machine is clearly misconfigured.

Self { current_key_id: current.0, keys, current_key_created_at: created_at }
}

/// Returns the identifier of the key currently marked as current.
pub fn current_key_id(&self) -> u8 {
    self.current_key_id
}
/// Returns the instant recorded as the creation time of the current key.
pub fn current_key_created_at(&self) -> Instant {
    self.current_key_created_at
}
/// Returns the key id that is not the current one: `2` when the current id
/// is `1`, and `1` otherwise.
pub fn next_key_id(&self) -> u8 {
    match self.current_key_id {
        1 => 2,
        _ => 1,
    }
}

// Look up the server matching the key_id in an OHTTP message and
// decapsulate. The first byte of an OHTTP encapsulated request is the
// key identifier (RFC 9458 Section 4.3).
pub fn decapsulate(
&self,
ohttp_body: &[u8],
) -> std::result::Result<(Vec<u8>, ohttp::ServerResponse), ohttp::Error> {
let key_id = ohttp_body.first().copied().unwrap_or(0);
let server = key_id
.checked_sub(1)
.filter(|&i| (i as usize) < 2)
.and_then(|i| self.keys[i as usize].as_ref());
match server {
Some(s) => s.decapsulate(ohttp_body),
None => Err(ohttp::Error::KeyId),
}
}

/// Encodes the key configuration of the current key so it can be served to
/// clients.
///
/// # Panics
/// Panics if the slot for the current key id is empty.
pub fn encode_current(&self) -> std::result::Result<Vec<u8>, ohttp::Error> {
    let slot = (self.current_key_id - 1) as usize;
    let current = self.keys[slot].as_ref().expect("current key must exist");
    current.config().encode()
}

// Install a new key as current, displacing whatever occupied that slot.
// The old current key remains in its slot for overlap decapsulation
// until retire() clears it.
pub fn rotate(&mut self, server: ohttp::Server) {
let new_key_id = self.next_key_id();
self.keys[(new_key_id - 1) as usize] = Some(server);
self.current_key_id = new_key_id;
self.current_key_created_at = Instant::now();
Comment on lines +113 to +114
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.current_key_id = new_key_id;
self.current_key_created_at = Instant::now();

It would be best if this rotates in the next key but doesn't yet activate it; see my previous comments discussing why we want two keys and why we want a grace period. If clients whose clock is running a bit fast expire their local key, they will request the same key repeatedly. So it's better to already offer the next key (and decapsulate from it) a few seconds before the current key expires, and to still accept an expired key a few seconds after expiry. Then, after this grace period, already generate the new key to invalidate the old one, which removes the need for `retire` as well.

}

/// Empties the slot for `key_id` so that key is no longer accepted for
/// decapsulation.
///
/// # Panics
/// Panics if `key_id` is not `1` or `2`, or if it is the current key id.
pub fn retire(&mut self, key_id: u8) {
    assert!(matches!(key_id, 1 | 2), "key_id must be 1 or 2");
    assert_ne!(key_id, self.current_key_id, "cannot retire the current key");
    let slot = usize::from(key_id - 1);
    self.keys[slot] = None;
}
}

/// Opaque blocklist of Bitcoin addresses stored as script pubkeys.
///
/// Addresses are converted to `ScriptBuf` at parse time so that
Expand Down Expand Up @@ -91,7 +185,8 @@ fn parse_address_lines(text: &str) -> std::collections::HashSet<bitcoin::ScriptB
#[derive(Clone)]
pub struct Service<D: Db> {
db: D,
ohttp: ohttp::Server,
ohttp: Arc<RwLock<KeyRotatingServer>>,
ohttp_keys_max_age: Option<Duration>,
sentinel_tag: SentinelTag,
v1: Option<V1>,
}
Expand All @@ -117,10 +212,18 @@ where
}

impl<D: Db> Service<D> {
pub fn new(db: D, ohttp: ohttp::Server, sentinel_tag: SentinelTag, v1: Option<V1>) -> Self {
Self { db, ohttp, sentinel_tag, v1 }
/// Assembles a `Service` from its parts.
///
/// * `db` - storage backend.
/// * `ohttp` - shared, lock-protected rotating OHTTP key set.
/// * `ohttp_keys_max_age` - optional maximum key age; when set, it is used
///   to derive the `Cache-Control` lifetime on the OHTTP keys response.
/// * `sentinel_tag` - sentinel tag stored on the service.
/// * `v1` - optional V1 state.
pub fn new(
    db: D,
    ohttp: Arc<RwLock<KeyRotatingServer>>,
    ohttp_keys_max_age: Option<Duration>,
    sentinel_tag: SentinelTag,
    v1: Option<V1>,
) -> Self {
    Self {
        db,
        ohttp,
        ohttp_keys_max_age,
        sentinel_tag,
        v1,
    }
}

/// Borrows the shared handle to the rotating OHTTP key set held by this
/// service.
pub fn ohttp_key_set(&self) -> &Arc<RwLock<KeyRotatingServer>> {
    &self.ohttp
}

async fn serve_request<B>(&self, req: Request<B>) -> Result<Response<Body>>
where
B: axum::body::HttpBody<Data = Bytes> + Send + 'static,
Expand Down Expand Up @@ -200,11 +303,15 @@ impl<D: Db> Service<D> {
.map_err(|e| HandlerError::BadRequest(anyhow::anyhow!(e.into())))?
.to_bytes();

// Decapsulate OHTTP request
let (bhttp_req, res_ctx) = self
.ohttp
.decapsulate(&ohttp_body)
.map_err(|e| HandlerError::OhttpKeyRejection(e.into()))?;
// Decapsulate OHTTP request using the key matching the message's key_id.
// Drop the read guard immediately so long-polling handlers don't
// block key rotation or other readers waiting behind a queued writer.
let (bhttp_req, res_ctx) = {
let keyset = self.ohttp.read().await;
keyset
.decapsulate(&ohttp_body)
.map_err(|e| HandlerError::OhttpKeyRejection(e.into()))?
};
let mut cursor = std::io::Cursor::new(bhttp_req);
let req = bhttp::Message::read_bhttp(&mut cursor)
.map_err(|e| HandlerError::BadRequest(e.into()))?;
Expand Down Expand Up @@ -378,13 +485,22 @@ impl<D: Db> Service<D> {
}

async fn get_ohttp_keys(&self) -> Result<Response<Body>, HandlerError> {
let ohttp_keys = self
.ohttp
.config()
.encode()
.map_err(|e| HandlerError::InternalServerError(e.into()))?;
let keyset = self.ohttp.read().await;
let ohttp_keys =
keyset.encode_current().map_err(|e| HandlerError::InternalServerError(e.into()))?;
let mut res = Response::new(full(ohttp_keys));
res.headers_mut().insert(CONTENT_TYPE, HeaderValue::from_static("application/ohttp-keys"));
if let Some(max_age) = self.ohttp_keys_max_age {
let remaining = max_age.saturating_sub(keyset.current_key_created_at().elapsed());
res.headers_mut().insert(
CACHE_CONTROL,
HeaderValue::from_str(&format!(
"public, s-maxage={}, immutable",
remaining.as_secs()
))
.expect("valid header value"),
);
}
Ok(res)
}

Expand Down Expand Up @@ -412,6 +528,60 @@ impl<D: Db> Service<D> {
}
}

// Grace period after rotation during which the old key is still
// accepted. Accounts for network latency and reasonable clock skew
// without letting two valid key IDs persist long enough to fingerprint
// clients by which key they present.
const ROTATION_OVERLAP: Duration = Duration::from_secs(30);

// Background task that rotates OHTTP keys on a fixed interval.
//
// Every interval the task generates a fresh key, persists it to
// keys_dir, and installs it as the current key. The previous key is
// accepted for ROTATION_OVERLAP after rotation, then retired so that
// only a single valid key exists most of the time.
pub fn spawn_key_rotation(
keyset: Arc<RwLock<KeyRotatingServer>>,
keys_dir: PathBuf,
interval: Duration,
) {
tokio::spawn(async move {
loop {
tokio::time::sleep(interval).await;

let new_key_id = keyset.read().await.next_key_id();

let config = match crate::key_config::gen_ohttp_server_config_with_id(new_key_id) {
Ok(c) => c,
Err(e) => {
tracing::error!("Failed to generate OHTTP key: {e}");
continue;
}
};
let _ = tokio::fs::remove_file(keys_dir.join(format!("{new_key_id}.ikm"))).await;
if let Err(e) = crate::key_config::persist_key_config(&config, &keys_dir) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IO, especially blocking should not be done while holding a write lock on the whole keyset, since that prevents requests from being handled

See also my suggestion to use an `Arc<[Box<RwLock<Server>>; 2]>` inside of keyset, as that eliminates this concern. I think it's perfectly fine to write the key while holding a lock there, but `persist_key_config` should be async.

tracing::error!("Failed to persist OHTTP key: {e}");
continue;
}

let old_key_id = {
let mut ks = keyset.write().await;
let old = ks.current_key_id();
ks.rotate(config.into_server());
old
};
tracing::info!("Rotated OHTTP keys: key_id {old_key_id} -> {new_key_id}");

tokio::time::sleep(ROTATION_OVERLAP).await;

keyset.write().await.retire(old_key_id);
let old_path = keys_dir.join(format!("{old_key_id}.ikm"));
let _ = tokio::fs::remove_file(&old_path).await;
tracing::info!("Retired OHTTP key_id {old_key_id}");
}
});
}

fn handle_peek<E: SendableError>(
result: Result<Arc<Vec<u8>>, DbError<E>>,
timeout_response: Response<Body>,
Expand Down Expand Up @@ -485,8 +655,8 @@ impl HandlerError {
}
HandlerError::OhttpKeyRejection(e) => {
const OHTTP_KEY_REJECTION_RES_JSON: &str = r#"{"type":"https://iana.org/assignments/http-problem-types#ohttp-key", "title": "key identifier unknown"}"#;
warn!("Bad request: Key configuration rejected: {}", e);
*res.status_mut() = StatusCode::BAD_REQUEST;
warn!("Key configuration rejected: {}", e);
*res.status_mut() = StatusCode::UNPROCESSABLE_ENTITY;
res.headers_mut()
.insert(CONTENT_TYPE, HeaderValue::from_static("application/problem+json"));
*res.body_mut() = full(OHTTP_KEY_REJECTION_RES_JSON);
Expand Down Expand Up @@ -592,9 +762,9 @@ mod tests {
async fn test_service(v1: Option<V1>) -> Service<FilesDb> {
let dir = tempfile::tempdir().expect("tempdir");
let db = FilesDb::init(Duration::from_millis(100), dir.keep()).await.expect("db init");
let ohttp: ohttp::Server =
crate::key_config::gen_ohttp_server_config().expect("ohttp config").into();
Service::new(db, ohttp, SentinelTag::new([0u8; 32]), v1)
let config = crate::key_config::gen_ohttp_server_config().expect("ohttp config");
let keyset = Arc::new(RwLock::new(KeyRotatingServer::from_single(config.into_server(), 1)));
Service::new(db, keyset, None, SentinelTag::new([0u8; 32]), v1)
}

/// A valid ShortId encoded as bech32 for use in URL paths.
Expand Down Expand Up @@ -826,9 +996,9 @@ mod tests {
let dir = tempfile::tempdir().expect("tempdir");
let db = FilesDb::init(Duration::from_millis(100), dir.keep()).await.expect("db init");
let db = MetricsDb::new(db, metrics);
let ohttp: ohttp::Server =
crate::key_config::gen_ohttp_server_config().expect("ohttp config").into();
let svc = Service::new(db, ohttp, SentinelTag::new([0u8; 32]), None);
let config = crate::key_config::gen_ohttp_server_config().expect("ohttp config");
let keyset = Arc::new(RwLock::new(KeyRotatingServer::from_single(config.into_server(), 1)));
let svc = Service::new(db, keyset, None, SentinelTag::new([0u8; 32]), None);

let id = valid_short_id_path();
let res = svc
Expand Down
Loading
Loading