Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion src/gax-internal/src/observability/client_signals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ macro_rules! client_request_signals {
"gcp.client.version" = $info.client_version,
"gcp.schema.url" = SCHEMA_URL_VALUE,
"otel.status_code" = "UNSET",
// Fields to be recorded later
"rpc.method" = Empty,
"otel.status_description" = Empty,
"error.type" = Empty,
Expand All @@ -117,6 +116,9 @@ macro_rules! client_request_signals {
"http.request.resend_count" = Empty,
"http.response.status_code" = Empty,
"gcp.resource.destination.id" = Empty,
"gcp.longrunning.poll_attempt_count" = Empty,
"gcp.longrunning.done" = Empty,
"gcp.longrunning.status_code" = Empty,
)
}};
}
Expand Down
4 changes: 4 additions & 0 deletions src/lro/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ google-cloud-wkt = { workspace = true }
[target.'cfg(google_cloud_unstable_tracing)'.dependencies]
tracing = { workspace = true }

[target.'cfg(google_cloud_unstable_tracing)'.dev-dependencies]
tracing-subscriber = { workspace = true, features = ["registry"] }
google-cloud-test-utils = { workspace = true }

[features]
unstable-stream = ["dep:futures", "dep:pin-project"]
# DO NOT USE: this allows us to detect semver changes in types used in the
Expand Down
2 changes: 1 addition & 1 deletion src/lro/src/details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl<R, M> Operation<R, M> {
}
}

fn name(&self) -> String {
pub(crate) fn name(&self) -> String {
self.inner.name.clone()
}
fn done(&self) -> bool {
Expand Down
4 changes: 4 additions & 0 deletions src/lro/src/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,8 @@ pub use either::Either;
#[cfg(google_cloud_unstable_tracing)]
pub use ext::{PollerExt, PollerOptions, TracingDetails};
#[cfg(google_cloud_unstable_tracing)]
pub(crate) use tracing::LRO_RECORDER;
#[cfg(all(google_cloud_unstable_tracing, test))]
pub(crate) use tracing::LroRecorder;
#[cfg(google_cloud_unstable_tracing)]
pub use tracing::Tracing;
194 changes: 190 additions & 4 deletions src/lro/src/internal/aip151.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,11 @@ where
async fn poll(&mut self) -> Option<PollingResult<ResponseType, MetadataType>> {
if let Some(start) = self.start.take() {
let result = start().await;
#[cfg(google_cloud_unstable_tracing)]
if let Ok(ref op) = result {
let name = op.name();
let _ = crate::internal::LRO_RECORDER.try_with(|r| r.record_destination_id(&name));
}
let (op, poll) = crate::details::handle_start(result);
self.operation = op;
return Some(poll);
Expand All @@ -287,6 +292,11 @@ where
let result = (self.query)(name.clone()).await;
let (op, poll) =
crate::details::handle_poll(self.error_policy.clone(), &self.state, name, result);
#[cfg(google_cloud_unstable_tracing)]
if let Some(ref next_name) = op {
let _ =
crate::internal::LRO_RECORDER.try_with(|r| r.record_destination_id(next_name));
}
self.operation = op;
return Some(poll);
}
Expand Down Expand Up @@ -328,6 +338,35 @@ mod tests {
use google_cloud_wkt::{Any, Duration, Timestamp};
use std::time::Duration as StdDuration;

#[cfg(not(google_cloud_unstable_tracing))]
pub(crate) struct DummySpan;

#[cfg(not(google_cloud_unstable_tracing))]
fn test_span() -> DummySpan {
DummySpan
}

#[cfg(not(google_cloud_unstable_tracing))]
pub(crate) trait Instrument: Sized {
fn instrument(self, _span: DummySpan) -> Self {
self
}
}

#[cfg(not(google_cloud_unstable_tracing))]
impl<T> Instrument for T {}

#[cfg(google_cloud_unstable_tracing)]
use tracing::Instrument;

#[cfg(google_cloud_unstable_tracing)]
fn test_span() -> tracing::Span {
tracing::info_span!(
"test_span",
gcp.resource.destination.id = tracing::field::Empty,
)
}

type ResponseType = Duration;
type MetadataType = Timestamp;
type TestOperation = Operation<ResponseType, MetadataType>;
Expand Down Expand Up @@ -362,7 +401,7 @@ mod tests {
start,
query,
);
let p0 = poller.poll().await;
let p0 = poller.poll().instrument(test_span()).await;
match p0.unwrap() {
PollingResult::InProgress(m) => {
assert_eq!(m, Some(Timestamp::clamp(123, 0)));
Expand All @@ -372,7 +411,7 @@ mod tests {
}
}

let p1 = poller.poll().await;
let p1 = poller.poll().instrument(test_span()).await;
match p1.unwrap() {
PollingResult::Completed(r) => {
let response = r.unwrap();
Expand All @@ -383,7 +422,7 @@ mod tests {
}
}

let p2 = poller.poll().await;
let p2 = poller.poll().instrument(test_span()).await;
assert!(p2.is_none(), "{p2:?}");
}

Expand Down Expand Up @@ -474,7 +513,7 @@ mod tests {
start,
query,
);
let response = poller.until_done().await?;
let response = poller.until_done().instrument(test_span()).await?;
assert_eq!(response, Duration::clamp(234, 0));

Ok(())
Expand Down Expand Up @@ -1019,4 +1058,151 @@ mod tests {
fn polling_error() -> Error {
Error::io("something failed")
}

#[cfg(google_cloud_unstable_tracing)]
#[tokio::test]
async fn test_poller_tracing() {
let guard = google_cloud_test_utils::test_layer::TestLayer::initialize();

let start = || async move {
let any = Any::from_msg(&Timestamp::clamp(123, 0))
.expect("test message deserializes via Any::from_msg");
let op = OperationAny::default()
.set_name("test-operation-123")
.set_metadata(any);
let op = TestOperation::new(op);
Ok::<TestOperation, Error>(op)
};

let count = Arc::new(std::sync::Mutex::new(0));
let query_count = count.clone();
let query = move |_: String| {
let mut c = query_count.lock().unwrap();
*c += 1;
let is_done = *c > 1;
async move {
if is_done {
let any = Any::from_msg(&Duration::clamp(234, 0))
.expect("test message deserializes via Any::from_msg");
let result = ResultAny::Response(any.into());
let op = OperationAny::default().set_done(true).set_result(result);
let op = TestOperation::new(op);
Ok::<TestOperation, Error>(op)
} else {
let any = Any::from_msg(&Timestamp::clamp(123, 0))
.expect("test message deserializes via Any::from_msg");
let op = OperationAny::default()
.set_name("test-operation-123")
.set_metadata(any);
let op = TestOperation::new(op);
Ok::<TestOperation, Error>(op)
}
}
};

let mut poller = PollerImpl::new(
Arc::new(AlwaysContinue),
Arc::new(ExponentialBackoff::default()),
start,
query,
);

let span = test_span();
let poller_ref = &mut poller;
let recorder = crate::internal::LroRecorder::new(span.clone());
let _ = crate::internal::LRO_RECORDER
.scope(
recorder,
async move { poller_ref.poll().instrument(span).await },
)
.await;

{
let captured = google_cloud_test_utils::test_layer::TestLayer::capture(&guard);
let got = captured
.iter()
.find(|s| s.name == "test_span")
.unwrap_or_else(|| panic!("missing `test_span` in captured spans: {captured:?}"));
assert_eq!(
got.attributes
.get("gcp.resource.destination.id")
.and_then(|v| v.as_string()),
Some("test-operation-123".to_string())
);
}

let span = test_span();
let poller_ref2 = &mut poller;
let recorder2 = crate::internal::LroRecorder::new(span.clone());
let _ = crate::internal::LRO_RECORDER
.scope(recorder2, async move {
poller_ref2.poll().instrument(span).await
})
.await;

{
let captured = google_cloud_test_utils::test_layer::TestLayer::capture(&guard);
let got = captured
.iter()
.find(|s| s.name == "test_span")
.unwrap_or_else(|| panic!("missing `test_span` in captured spans: {captured:?}"));
assert_eq!(
got.attributes
.get("gcp.resource.destination.id")
.and_then(|v| v.as_string()),
Some("test-operation-123".to_string())
);
}
}

#[cfg(google_cloud_unstable_tracing)]
#[tokio::test]
async fn test_poller_tracing_immediate_done() {
let guard = google_cloud_test_utils::test_layer::TestLayer::initialize();

let start = || async move {
let any = Any::from_msg(&Duration::clamp(234, 0))
.expect("test message deserializes via Any::from_msg");
let result = ResultAny::Response(any.into());
let op = OperationAny::default()
.set_name("immediate-operation-123")
.set_done(true)
.set_result(result);
let op = TestOperation::new(op);
Ok::<TestOperation, Error>(op)
};

let query = |_: String| async move { panic!("should not query") };

let mut poller = PollerImpl::new(
Arc::new(AlwaysContinue),
Arc::new(ExponentialBackoff::default()),
start,
query,
);

let span = test_span();
let poller_ref = &mut poller;
let recorder = crate::internal::LroRecorder::new(span.clone());
let _ = crate::internal::LRO_RECORDER
.scope(
recorder,
async move { poller_ref.poll().instrument(span).await },
)
.await;

{
let captured = google_cloud_test_utils::test_layer::TestLayer::capture(&guard);
let got = captured
.iter()
.find(|s| s.name == "test_span")
.unwrap_or_else(|| panic!("missing `test_span` in captured spans: {captured:?}"));
assert_eq!(
got.attributes
.get("gcp.resource.destination.id")
.and_then(|v| v.as_string()),
Some("immediate-operation-123".to_string())
);
}
}
}
Loading
Loading