Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 35 additions & 98 deletions rsworkspace/crates/acp-nats/src/agent/authenticate.rs
Original file line number Diff line number Diff line change
@@ -1,51 +1,10 @@
use super::Bridge;
use crate::error::AGENT_UNAVAILABLE;
use crate::error::map_nats_error;
use crate::nats::{self, RequestClient, agent};
use agent_client_protocol::{AuthenticateRequest, AuthenticateResponse, Error, ErrorCode, Result};
use tracing::{info, instrument, warn};
use trogon_nats::NatsError;
use agent_client_protocol::{AuthenticateRequest, AuthenticateResponse, Result};
use tracing::{info, instrument};
use trogon_std::time::GetElapsed;

fn map_authenticate_error(e: NatsError) -> Error {
match &e {
NatsError::Timeout { subject } => {
warn!(subject = %subject, "authenticate request timed out");
Error::new(
ErrorCode::Other(AGENT_UNAVAILABLE).into(),
"Authenticate request timed out; agent may be overloaded or unavailable",
)
}
NatsError::Request { subject, error } => {
warn!(subject = %subject, error = %error, "authenticate NATS request failed");
Error::new(
ErrorCode::Other(AGENT_UNAVAILABLE).into(),
"Agent unavailable",
)
}
NatsError::Serialize(inner) => {
warn!(error = %inner, "failed to serialize authenticate request");
Error::new(
ErrorCode::InternalError.into(),
format!("Failed to serialize authenticate request: {}", inner),
)
}
NatsError::Deserialize(inner) => {
warn!(error = %inner, "failed to deserialize authenticate response");
Error::new(
ErrorCode::InternalError.into(),
"Invalid response from agent",
)
}
_ => {
warn!(error = %e, "authenticate NATS request failed");
Error::new(
ErrorCode::InternalError.into(),
"Authenticate request failed",
)
}
}
}

#[instrument(
name = "acp.authenticate",
skip(bridge, args),
Expand All @@ -67,7 +26,7 @@ pub async fn handle<N: RequestClient, C: GetElapsed>(
bridge.config.operation_timeout,
)
.await
.map_err(map_authenticate_error);
.map_err(map_nats_error);

bridge.metrics.record_request(
"authenticate",
Expand All @@ -80,7 +39,7 @@ pub async fn handle<N: RequestClient, C: GetElapsed>(

#[cfg(test)]
mod tests {
use super::{Bridge, map_authenticate_error};
use super::Bridge;
use crate::config::Config;
use crate::error::AGENT_UNAVAILABLE;
use agent_client_protocol::{Agent, AuthenticateRequest, AuthenticateResponse, ErrorCode};
Expand All @@ -91,13 +50,13 @@ mod tests {
PeriodicReader, SdkMeterProvider, in_memory_exporter::InMemoryMetricExporter,
};
use std::time::Duration;
use trogon_nats::{AdvancedMockNatsClient, NatsError};
use trogon_nats::AdvancedMockNatsClient;

fn assert_authenticate_metric_recorded(
fn has_authenticate_metric(
finished_metrics: &[opentelemetry_sdk::metrics::data::ResourceMetrics],
expected_success: bool,
) {
let found = finished_metrics
) -> bool {
finished_metrics
.iter()
.flat_map(|rm| rm.scope_metrics())
.any(|sm| {
Expand All @@ -123,9 +82,15 @@ mod tests {
method_ok && success_ok
})
})
});
})
}

fn assert_authenticate_metric_recorded(
finished_metrics: &[opentelemetry_sdk::metrics::data::ResourceMetrics],
expected_success: bool,
) {
assert!(
found,
has_authenticate_metric(finished_metrics, expected_success),
"expected acp.requests datapoint with method=authenticate, success={}",
expected_success
);
Expand Down Expand Up @@ -179,13 +144,6 @@ mod tests {
mock.set_response(subject, bytes.into());
}

struct FailsSerialize;
impl serde::Serialize for FailsSerialize {
fn serialize<S: serde::Serializer>(&self, _s: S) -> Result<S::Ok, S::Error> {
Err(serde::ser::Error::custom("test serialize failure"))
}
}

#[tokio::test]
async fn authenticate_forwards_request_and_returns_response() {
let (mock, bridge) = mock_bridge();
Expand Down Expand Up @@ -252,44 +210,23 @@ mod tests {
}

#[test]
fn map_authenticate_error_timeout() {
let err = map_authenticate_error(NatsError::Timeout {
subject: "acp.agent.authenticate".into(),
});
assert!(err.to_string().contains("timed out"));
assert_eq!(err.code, ErrorCode::Other(AGENT_UNAVAILABLE));
}

#[test]
fn map_authenticate_error_request() {
let err = map_authenticate_error(NatsError::Request {
subject: "acp.agent.authenticate".into(),
error: "connection refused".into(),
});
assert!(err.to_string().contains("Agent unavailable"));
assert_eq!(err.code, ErrorCode::Other(AGENT_UNAVAILABLE));
}

#[test]
fn map_authenticate_error_serialize() {
let serde_err = serde_json::to_vec(&FailsSerialize).unwrap_err();
let err = map_authenticate_error(NatsError::Serialize(serde_err));
assert!(err.to_string().contains("serialize"));
assert_eq!(err.code, ErrorCode::InternalError);
}

#[test]
fn map_authenticate_error_deserialize() {
let serde_err = serde_json::from_str::<AuthenticateResponse>("[]").unwrap_err();
let err = map_authenticate_error(NatsError::Deserialize(serde_err));
assert!(err.to_string().contains("Invalid response from agent"));
assert_eq!(err.code, ErrorCode::InternalError);
}

#[test]
fn map_authenticate_error_other() {
let err = map_authenticate_error(NatsError::Other("misc failure".into()));
assert!(err.to_string().contains("Authenticate request failed"));
assert_eq!(err.code, ErrorCode::InternalError);
fn has_authenticate_metric_returns_false_when_metric_is_histogram() {
let exporter = InMemoryMetricExporter::default();
let reader = PeriodicReader::builder(exporter.clone())
.with_interval(Duration::from_millis(100))
.build();
let provider = SdkMeterProvider::builder().with_reader(reader).build();
let meter = provider.meter("test");
let counter = meter.u64_counter("acp.other_metric").build();
counter.add(1, &[]);
let histogram = meter
.f64_histogram("acp.requests")
.with_description("test")
.build();
histogram.record(1.0, &[]);
provider.force_flush().unwrap();
let finished_metrics = exporter.get_finished_metrics().unwrap();
assert!(!has_authenticate_metric(&finished_metrics, true));
provider.shutdown().unwrap();
}
}
98 changes: 4 additions & 94 deletions rsworkspace/crates/acp-nats/src/agent/ext_method.rs
Original file line number Diff line number Diff line change
@@ -1,52 +1,11 @@
use super::Bridge;
use crate::error::AGENT_UNAVAILABLE;
use crate::error::map_nats_error;
use crate::ext_method_name::ExtMethodName;
use crate::nats::{self, RequestClient, agent};
use agent_client_protocol::{Error, ErrorCode, ExtRequest, ExtResponse, Result};
use tracing::{info, instrument, warn};
use trogon_nats::NatsError;
use tracing::{info, instrument};
use trogon_std::time::GetElapsed;

fn map_ext_method_error(e: NatsError) -> Error {
match &e {
NatsError::Timeout { subject } => {
warn!(subject = %subject, "ext_method request timed out");
Error::new(
ErrorCode::Other(AGENT_UNAVAILABLE).into(),
"Extension method request timed out; agent may be overloaded or unavailable",
)
}
NatsError::Request { subject, error } => {
warn!(subject = %subject, error = %error, "ext_method NATS request failed");
Error::new(
ErrorCode::Other(AGENT_UNAVAILABLE).into(),
format!("Agent unavailable: {}", error),
)
}
NatsError::Serialize(inner) => {
warn!(error = %inner, "failed to serialize ext_method request");
Error::new(
ErrorCode::InternalError.into(),
format!("Failed to serialize ext_method request: {}", inner),
)
}
NatsError::Deserialize(inner) => {
warn!(error = %inner, "failed to deserialize ext_method response");
Error::new(
ErrorCode::InternalError.into(),
"Invalid response from agent",
)
}
_ => {
warn!(error = %e, "ext_method NATS request failed");
Error::new(
ErrorCode::InternalError.into(),
"Extension method request failed",
)
}
}
}

#[instrument(
name = "acp.ext",
skip(bridge, args),
Expand Down Expand Up @@ -85,7 +44,7 @@ pub async fn handle<N: RequestClient, C: GetElapsed>(
bridge.config.operation_timeout(),
)
.await
.map_err(map_ext_method_error);
.map_err(map_nats_error);

bridge.metrics.record_request(
"ext_method",
Expand All @@ -109,7 +68,7 @@ mod tests {
};
use serde_json::value::RawValue;
use std::time::Duration;
use trogon_nats::{AdvancedMockNatsClient, NatsError};
use trogon_nats::AdvancedMockNatsClient;

fn mock_bridge() -> (
AdvancedMockNatsClient,
Expand Down Expand Up @@ -381,53 +340,4 @@ mod tests {
));
provider.shutdown().unwrap();
}

#[test]
fn map_error_timeout() {
let err = map_ext_method_error(NatsError::Timeout {
subject: "acp.agent.ext.my_method".into(),
});
assert!(err.to_string().contains("timed out"));
assert_eq!(err.code, ErrorCode::Other(crate::error::AGENT_UNAVAILABLE));
}

#[test]
fn map_error_request() {
let err = map_ext_method_error(NatsError::Request {
subject: "acp.agent.ext.my_method".into(),
error: "connection refused".into(),
});
assert!(err.to_string().contains("Agent unavailable"));
assert_eq!(err.code, ErrorCode::Other(crate::error::AGENT_UNAVAILABLE));
}

#[test]
fn map_error_serialize() {
let serde_err = serde_json::to_vec(&FailsSerialize).unwrap_err();
let err = map_ext_method_error(NatsError::Serialize(serde_err));
assert!(err.to_string().contains("serialize"));
assert_eq!(err.code, ErrorCode::InternalError);
}

#[test]
fn map_error_deserialize() {
let serde_err = serde_json::from_str::<ExtResponse>("invalid").unwrap_err();
let err = map_ext_method_error(NatsError::Deserialize(serde_err));
assert!(err.to_string().contains("Invalid response from agent"));
assert_eq!(err.code, ErrorCode::InternalError);
}

#[test]
fn map_error_other() {
let err = map_ext_method_error(NatsError::Other("misc failure".into()));
assert!(err.to_string().contains("Extension method request failed"));
assert_eq!(err.code, ErrorCode::InternalError);
}

struct FailsSerialize;
impl serde::Serialize for FailsSerialize {
fn serialize<S: serde::Serializer>(&self, _s: S) -> std::result::Result<S::Ok, S::Error> {
Err(serde::ser::Error::custom("test serialize failure"))
}
}
}
Loading
Loading