Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rsworkspace/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rsworkspace/crates/acp-nats-agent/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl std::fmt::Display for DispatchError {
}
}

const DEFAULT_OPERATION_TIMEOUT: Duration = Duration::from_secs(30);
use crate::constants::DEFAULT_OPERATION_TIMEOUT;

pub struct AgentSideNatsConnection<N> {
nats: N,
Expand Down
3 changes: 3 additions & 0 deletions rsworkspace/crates/acp-nats-agent/src/constants.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
use std::time::Duration;

pub const DEFAULT_OPERATION_TIMEOUT: Duration = Duration::from_secs(30);
1 change: 1 addition & 0 deletions rsworkspace/crates/acp-nats-agent/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod connection;
pub mod constants;

pub use connection::{AgentSideNatsConnection, ConnectionError};
6 changes: 3 additions & 3 deletions rsworkspace/crates/acp-nats-ws/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use acp_nats::{AcpPrefix, AcpPrefixError, Config, NatsConfig};
use clap::Parser;
use std::net::{IpAddr, Ipv4Addr};
use std::net::IpAddr;
use trogon_std::env::ReadEnv;

const DEFAULT_HOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
const DEFAULT_PORT: u16 = 8080;
use crate::constants::{DEFAULT_HOST, DEFAULT_PORT};

#[derive(Parser, Debug)]
#[command(name = "acp-nats-ws")]
Expand Down Expand Up @@ -50,6 +49,7 @@ pub fn apply_timeout_overrides<E: ReadEnv>(mut ws: WsConfig, env_provider: &E) -
#[cfg(test)]
mod tests {
use super::*;
use std::net::Ipv4Addr;
use trogon_std::env::InMemoryEnv;

fn config_from_env(env: &InMemoryEnv) -> WsConfig {
Expand Down
2 changes: 1 addition & 1 deletion rsworkspace/crates/acp-nats-ws/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tokio::sync::watch;
use tracing::{error, info, warn};
use trogon_std::time::SystemClock;

const DUPLEX_BUFFER_SIZE: usize = 64 * 1024;
use crate::constants::DUPLEX_BUFFER_SIZE;

/// Handles a single WebSocket connection by bridging it to NATS via ACP.
pub async fn handle<N>(
Expand Down
6 changes: 6 additions & 0 deletions rsworkspace/crates/acp-nats-ws/src/constants.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use std::net::{IpAddr, Ipv4Addr};

pub const DEFAULT_HOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
pub const DEFAULT_PORT: u16 = 8080;
pub const DUPLEX_BUFFER_SIZE: usize = 64 * 1024;
pub const THREAD_NAME: &str = "acp-ws-local";
3 changes: 2 additions & 1 deletion rsworkspace/crates/acp-nats-ws/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod config;
mod connection;
mod constants;
mod upgrade;

use tokio::sync::{mpsc, watch};
Expand Down Expand Up @@ -82,7 +83,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
#[cfg(coverage)]
fn main() {}

const THREAD_NAME: &str = "acp-ws-local";
use constants::THREAD_NAME;

/// Runs a single-threaded tokio runtime with a
/// `LocalSet`. All WebSocket connections are processed here because the ACP
Expand Down
3 changes: 1 addition & 2 deletions rsworkspace/crates/acp-nats/src/acp_prefix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@

use std::sync::Arc;

use crate::constants::MAX_PREFIX_LENGTH;
use crate::nats::token;
use crate::subject_token_violation::SubjectTokenViolation;

const MAX_PREFIX_LENGTH: usize = 128;

/// Error returned when [`AcpPrefix`] validation fails.
#[derive(Debug, Clone, PartialEq)]
pub struct AcpPrefixError(pub SubjectTokenViolation);
Expand Down
9 changes: 1 addition & 8 deletions rsworkspace/crates/acp-nats/src/agent/bridge.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::cell::RefCell;
use std::time::Duration;

use crate::config::Config;
use crate::nats::{
Expand Down Expand Up @@ -30,13 +29,7 @@ use super::{
set_session_mode, set_session_model,
};

/// Delay before publishing `session.ready` to NATS.
///
/// The `Agent` trait returns the response value *before* the transport layer
/// serializes and writes it to the client. Without a delay the spawned task
/// could publish `session.ready` before the client has received the
/// `session/new` response, violating the ordering guarantee.
const SESSION_READY_DELAY: Duration = Duration::from_millis(100);
use crate::constants::SESSION_READY_DELAY;

pub struct Bridge<N, C: GetElapsed> {
pub(crate) nats: N,
Expand Down
4 changes: 1 addition & 3 deletions rsworkspace/crates/acp-nats/src/client/rpc_reply.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
pub use crate::constants::{CONTENT_TYPE_JSON, CONTENT_TYPE_PLAIN};
use crate::nats::{FlushClient, PublishClient, headers_with_trace_context};
use agent_client_protocol::{Error, ErrorCode, RequestId, Response};
use bytes::Bytes;
use tracing::warn;
use trogon_std::JsonSerialize;

pub const CONTENT_TYPE_JSON: &str = "application/json";
pub const CONTENT_TYPE_PLAIN: &str = "text/plain";

pub fn error_response_fallback_bytes<S: JsonSerialize>(serializer: &S) -> (Bytes, &'static str) {
match serializer.to_vec(&Response::<()>::Error {
id: RequestId::Null,
Expand Down
19 changes: 6 additions & 13 deletions rsworkspace/crates/acp-nats/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,13 @@ use trogon_nats::NatsConfig;
use trogon_std::env::ReadEnv;

use crate::acp_prefix::AcpPrefix;
use crate::constants::{
DEFAULT_CONNECT_TIMEOUT_SECS, DEFAULT_MAX_CONCURRENT_CLIENT_TASKS, DEFAULT_OPERATION_TIMEOUT,
DEFAULT_PROMPT_TIMEOUT, ENV_CONNECT_TIMEOUT_SECS, ENV_OPERATION_TIMEOUT_SECS,
ENV_PROMPT_TIMEOUT_SECS, MIN_TIMEOUT_SECS,
};

const DEFAULT_OPERATION_TIMEOUT: Duration = Duration::from_secs(30);
const DEFAULT_PROMPT_TIMEOUT: Duration = Duration::from_secs(7200);
const DEFAULT_MAX_CONCURRENT_CLIENT_TASKS: usize = 256;

const MIN_TIMEOUT_SECS: u64 = 1;
const DEFAULT_CONNECT_TIMEOUT_SECS: u64 = 10;

pub const ENV_ACP_PREFIX: &str = "ACP_PREFIX";
pub const DEFAULT_ACP_PREFIX: &str = "acp";

const ENV_OPERATION_TIMEOUT_SECS: &str = "ACP_OPERATION_TIMEOUT_SECS";
const ENV_PROMPT_TIMEOUT_SECS: &str = "ACP_PROMPT_TIMEOUT_SECS";
const ENV_CONNECT_TIMEOUT_SECS: &str = "ACP_NATS_CONNECT_TIMEOUT_SECS";
pub use crate::constants::{DEFAULT_ACP_PREFIX, ENV_ACP_PREFIX};

#[derive(Clone)]
pub struct Config {
Expand Down
30 changes: 30 additions & 0 deletions rsworkspace/crates/acp-nats/src/constants.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use std::time::Duration;

pub const ENV_ACP_PREFIX: &str = "ACP_PREFIX";
pub const DEFAULT_ACP_PREFIX: &str = "acp";

pub const ENV_OPERATION_TIMEOUT_SECS: &str = "ACP_OPERATION_TIMEOUT_SECS";
pub const ENV_PROMPT_TIMEOUT_SECS: &str = "ACP_PROMPT_TIMEOUT_SECS";
pub const ENV_CONNECT_TIMEOUT_SECS: &str = "ACP_NATS_CONNECT_TIMEOUT_SECS";

pub const DEFAULT_OPERATION_TIMEOUT: Duration = Duration::from_secs(30);
pub const DEFAULT_PROMPT_TIMEOUT: Duration = Duration::from_secs(7200);
pub const DEFAULT_MAX_CONCURRENT_CLIENT_TASKS: usize = 256;
pub const DEFAULT_CONNECT_TIMEOUT_SECS: u64 = 10;
pub const MIN_TIMEOUT_SECS: u64 = 1;

pub const SESSION_READY_DELAY: Duration = Duration::from_millis(100);
pub const PROMPT_TIMEOUT_WARNING_SUPPRESSION_WINDOW: Duration = Duration::from_secs(5);

pub const MAX_PREFIX_LENGTH: usize = 128;
pub const MAX_SESSION_ID_LENGTH: usize = 128;
pub const MAX_METHOD_NAME_LENGTH: usize = 128;

pub const AGENT_UNAVAILABLE: i32 = -32001;

pub const AGENT_MARKER: &str = ".agent.";
pub const AGENT_EXT_PREFIX: &str = "agent.ext.";
pub const EXT_SUBJECT_PREFIX: &str = "client.ext.";

pub const CONTENT_TYPE_JSON: &str = "application/json";
pub const CONTENT_TYPE_PLAIN: &str = "text/plain";
4 changes: 1 addition & 3 deletions rsworkspace/crates/acp-nats/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ use agent_client_protocol::{Error, ErrorCode};
use tracing::warn;
use trogon_nats::NatsError;

/// JSON-RPC reserved range -32001: agent unavailable / overloaded (retryable).
/// Returned for timeout and request failures; clients should retry with backoff.
pub const AGENT_UNAVAILABLE: i32 = -32001;
pub use crate::constants::AGENT_UNAVAILABLE;

pub fn map_nats_error(e: NatsError) -> Error {
match &e {
Expand Down
3 changes: 1 addition & 2 deletions rsworkspace/crates/acp-nats/src/ext_method_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@

use std::sync::Arc;

use crate::constants::MAX_METHOD_NAME_LENGTH;
use crate::nats::token;
use crate::subject_token_violation::SubjectTokenViolation;

const MAX_METHOD_NAME_LENGTH: usize = 128;

/// Error returned when [`ExtMethodName`] validation fails.
#[derive(Debug, Clone, PartialEq)]
pub struct ExtMethodNameError(pub SubjectTokenViolation);
Expand Down
1 change: 1 addition & 0 deletions rsworkspace/crates/acp-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod agent;
pub mod client;
pub mod client_proxy;
pub mod config;
pub mod constants;
pub mod error;
pub mod ext_method_name;
pub(crate) mod in_flight_slot_guard;
Expand Down
10 changes: 1 addition & 9 deletions rsworkspace/crates/acp-nats/src/nats/parsing.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use crate::constants::{AGENT_EXT_PREFIX, AGENT_MARKER, EXT_SUBJECT_PREFIX};
use crate::ext_method_name::ExtMethodName;
use crate::session_id::AcpSessionId;

const AGENT_MARKER: &str = ".agent.";
const AGENT_EXT_PREFIX: &str = "agent.ext.";

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AgentMethod {
Initialize,
Expand Down Expand Up @@ -86,12 +84,6 @@ pub fn parse_agent_subject(subject: &str) -> Option<ParsedAgentSubject> {
None
}

/// NATS subject prefix for generic extension methods.
/// `client.ext.{name}` — the `ext` token makes extensions explicit in subjects.
/// `ExtSessionPromptResponse` is matched first as a specific ext, so it won't
/// collide with this catch-all.
const EXT_SUBJECT_PREFIX: &str = "client.ext.";

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClientMethod {
FsReadTextFile,
Expand Down
3 changes: 1 addition & 2 deletions rsworkspace/crates/acp-nats/src/pending_prompt_waiters.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::Duration;

use agent_client_protocol::{PromptResponse, SessionId};
use tokio::sync::oneshot;
use trogon_std::time::GetElapsed;

const PROMPT_TIMEOUT_WARNING_SUPPRESSION_WINDOW: Duration = Duration::from_secs(5);
use crate::constants::PROMPT_TIMEOUT_WARNING_SUPPRESSION_WINDOW;

#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
pub(crate) struct PromptToken(pub u64);
Expand Down
3 changes: 1 addition & 2 deletions rsworkspace/crates/acp-nats/src/session_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@
//! TODO: Consider extracting to `trogon-nats` as a generic `NatsSubject` (or `NatsToken`) type
//! so prefix, session_id, and other subject tokens share the same validation.

use crate::constants::MAX_SESSION_ID_LENGTH;
use crate::subject_token_violation::SubjectTokenViolation;

const MAX_SESSION_ID_LENGTH: usize = 128;

/// Error returned when [`AcpSessionId`] validation fails.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionIdError(pub SubjectTokenViolation);
Expand Down
3 changes: 3 additions & 0 deletions rsworkspace/crates/acp-telemetry/src/constants.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
use std::time::Duration;

pub const METRIC_EXPORT_INTERVAL: Duration = Duration::from_secs(30);
1 change: 1 addition & 0 deletions rsworkspace/crates/acp-telemetry/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod constants;
mod log;
mod metric;
mod service_name;
Expand Down
5 changes: 1 addition & 4 deletions rsworkspace/crates/acp-telemetry/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@ use opentelemetry_otlp::MetricExporter;
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
use std::sync::OnceLock;
use std::time::Duration;

/// OTLP periodic reader pushes accumulated metrics at this cadence.
/// 30 s keeps export volume low while still surfacing near-real-time data.
const METRIC_EXPORT_INTERVAL: Duration = Duration::from_secs(30);
use crate::constants::METRIC_EXPORT_INTERVAL;

pub(crate) static METER_PROVIDER: OnceLock<SdkMeterProvider> = OnceLock::new();

Expand Down
12 changes: 4 additions & 8 deletions rsworkspace/crates/trogon-nats/src/auth.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
use std::path::PathBuf;
use trogon_std::env::ReadEnv;

const ENV_NATS_URL: &str = "NATS_URL";
const ENV_NATS_CREDS: &str = "NATS_CREDS";
const ENV_NATS_NKEY: &str = "NATS_NKEY";
const ENV_NATS_USER: &str = "NATS_USER";
const ENV_NATS_PASSWORD: &str = "NATS_PASSWORD";
const ENV_NATS_TOKEN: &str = "NATS_TOKEN";

const DEFAULT_NATS_URL: &str = "localhost:4222";
use crate::constants::{
DEFAULT_NATS_URL, ENV_NATS_CREDS, ENV_NATS_NKEY, ENV_NATS_PASSWORD, ENV_NATS_TOKEN,
ENV_NATS_URL, ENV_NATS_USER,
};

/// NATS authentication method.
///
Expand Down
3 changes: 1 addition & 2 deletions rsworkspace/crates/trogon-nats/src/connect.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::auth::{NatsAuth, NatsConfig};
use crate::constants::MAX_RECONNECT_DELAY;
use async_nats::{Client, ConnectOptions, Event};
use std::time::Duration;
use tracing::{info, instrument, warn};
Expand Down Expand Up @@ -38,8 +39,6 @@ impl std::error::Error for ConnectError {
}
}

const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30);

fn reconnect_delay(attempts: usize) -> Duration {
let delay = Duration::from_secs(std::cmp::min(
MAX_RECONNECT_DELAY.as_secs(),
Expand Down
14 changes: 14 additions & 0 deletions rsworkspace/crates/trogon-nats/src/constants.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use std::time::Duration;

pub const ENV_NATS_URL: &str = "NATS_URL";
pub const ENV_NATS_CREDS: &str = "NATS_CREDS";
pub const ENV_NATS_NKEY: &str = "NATS_NKEY";
pub const ENV_NATS_USER: &str = "NATS_USER";
pub const ENV_NATS_PASSWORD: &str = "NATS_PASSWORD";
pub const ENV_NATS_TOKEN: &str = "NATS_TOKEN";
pub const DEFAULT_NATS_URL: &str = "localhost:4222";

pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
pub const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30);

pub const REQ_ID_HEADER: &str = "X-Req-Id";
6 changes: 4 additions & 2 deletions rsworkspace/crates/trogon-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
pub mod auth;
pub mod client;
pub mod connect;
pub mod constants;
pub mod messaging;

#[cfg(feature = "test-support")]
Expand All @@ -48,10 +49,11 @@ pub use async_nats::subject::ToSubject;
pub use auth::{NatsAuth, NatsConfig};
pub use client::{FlushClient, PublishClient, RequestClient, SubscribeClient};
pub use connect::{ConnectError, connect};
pub use constants::REQ_ID_HEADER;
pub use messaging::{
FlushPolicy, NatsError, PublishOperationError, PublishOptions, PublishOptionsBuilder,
REQ_ID_HEADER, RetryPolicy, build_request_headers, headers_with_trace_context,
inject_trace_context, publish, request, request_with_timeout,
RetryPolicy, build_request_headers, headers_with_trace_context, inject_trace_context, publish,
request, request_with_timeout,
};

#[cfg(feature = "test-support")]
Expand Down
4 changes: 1 addition & 3 deletions rsworkspace/crates/trogon-nats/src/messaging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::time::Duration;
use tracing::Span;
use tracing_opentelemetry::OpenTelemetrySpanExt;

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
use crate::constants::{DEFAULT_TIMEOUT, REQ_ID_HEADER};

struct HeaderMapCarrier<'a>(&'a mut HeaderMap);

Expand All @@ -23,8 +23,6 @@ pub fn inject_trace_context(headers: &mut HeaderMap) {
});
}

pub const REQ_ID_HEADER: &str = "X-Req-Id";

pub fn headers_with_trace_context() -> HeaderMap {
let mut headers = HeaderMap::new();
inject_trace_context(&mut headers);
Expand Down
Loading