Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/lro/src/internal/aip151.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ where
async fn poll(&mut self) -> Option<PollingResult<(), M>> {
self.poller.poll().await.map(self::map_polling_result)
}
fn operation_name(&self) -> Option<&str> {
self.poller.operation_name()
}
async fn backoff(&mut self, state: &PollingState) {
self.poller.backoff(state).await
}
Expand Down Expand Up @@ -164,6 +167,9 @@ where
async fn poll(&mut self) -> Option<PollingResult<R, ()>> {
self.poller.poll().await.map(self::map_polling_metadata)
}
fn operation_name(&self) -> Option<&str> {
self.poller.operation_name()
}
async fn backoff(&mut self, state: &PollingState) {
self.poller.backoff(state).await
}
Expand Down Expand Up @@ -281,6 +287,9 @@ where
}
None
}
fn operation_name(&self) -> Option<&str> {
self.operation.as_deref()
}
async fn backoff(&mut self, state: &PollingState) {
let backoff = self.backoff_policy.wait_period(state);
tokio::time::sleep(backoff).await;
Expand Down Expand Up @@ -347,7 +356,9 @@ mod tests {
start,
query,
);
assert_eq!(poller.operation_name(), None);
let p0 = poller.poll().await;
assert_eq!(poller.operation_name(), Some("test-only-name"));
match p0.unwrap() {
PollingResult::InProgress(m) => {
assert_eq!(m, Some(Timestamp::clamp(123, 0)));
Expand Down
30 changes: 30 additions & 0 deletions src/lro/src/internal/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ where
}
None
}
fn operation_name(&self) -> Option<&str> {
self.operation.as_deref()
}
async fn backoff(&mut self, state: &PollingState) {
let backoff = self.backoff_policy.wait_period(state);
tokio::time::sleep(backoff).await;
Expand Down Expand Up @@ -232,6 +235,33 @@ mod tests {
);
}

#[tokio::test]
async fn poller_operation_name() {
let start = || async move {
let op = TestOperation {
name: Some("test-operation-abc".into()),
..TestOperation::default()
};
Ok(op)
};
let query = |_name| async move {
let op = TestOperation {
done: true,
..TestOperation::default()
};
Ok(op)
};
let mut poller = new_discovery_poller(
Arc::new(AlwaysContinue),
Arc::new(test_backoff()),
start,
query,
);
assert_eq!(poller.operation_name(), None);
let _ = poller.poll().await;
assert_eq!(poller.operation_name(), Some("test-operation-abc"));
}

#[tokio::test]
async fn poller_until_done_success_with_transient() {
let start = || async move {
Expand Down
8 changes: 8 additions & 0 deletions src/lro/src/internal/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ where
Self::Right(s) => s.poll().await,
}
}
fn operation_name(&self) -> Option<&str> {
match self {
Self::Left(s) => s.operation_name(),
Self::Right(s) => s.operation_name(),
}
}
async fn backoff(&mut self, state: &PollingState) {
match self {
Self::Left(s) => s.backoff(state).await,
Expand Down Expand Up @@ -79,6 +85,7 @@ mod tests {
impl sealed::Poller for PollerA {}
impl Poller<ResponseType, MetadataType> for PollerA {
async fn poll(&mut self) -> Option<PollingResult<ResponseType, MetadataType>>;
fn operation_name<'a>(&'a self) -> Option<&'a str>;
async fn backoff(&mut self, state: &PollingState);
async fn until_done(self) -> google_cloud_gax::Result<ResponseType>;
#[cfg(feature = "unstable-stream")]
Expand All @@ -92,6 +99,7 @@ mod tests {
impl sealed::Poller for PollerB {}
impl Poller<ResponseType, MetadataType> for PollerB {
async fn poll(&mut self) -> Option<PollingResult<ResponseType, MetadataType>>;
fn operation_name<'a>(&'a self) -> Option<&'a str>;
async fn backoff(&mut self, state: &PollingState);
async fn until_done(self) -> google_cloud_gax::Result<ResponseType>;
#[cfg(feature = "unstable-stream")]
Expand Down
1 change: 1 addition & 0 deletions src/lro/src/internal/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ mod tests {
impl sealed::Poller for PollerA {}
impl Poller<ResponseType, MetadataType> for PollerA {
async fn poll(&mut self) -> Option<PollingResult<ResponseType, MetadataType>>;
fn operation_name<'a>(&'a self) -> Option<&'a str>;
async fn backoff(&mut self, state: &PollingState);
async fn until_done(self) -> google_cloud_gax::Result<ResponseType>;
#[cfg(feature = "unstable-stream")]
Expand Down
3 changes: 3 additions & 0 deletions src/lro/src/internal/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ where
let span = info_span!("LRO Poll");
self.inner.poll().instrument(span).await
}
fn operation_name(&self) -> Option<&str> {
self.inner.operation_name()
}
async fn backoff(&mut self, state: &PollingState) {
let span = info_span!("LRO Sleep");
self.inner.backoff(state).instrument(span).await
Expand Down
3 changes: 3 additions & 0 deletions src/lro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ pub trait Poller<ResponseType, MetadataType>: Send + sealed::Poller {
&mut self,
) -> impl Future<Output = Option<PollingResult<ResponseType, MetadataType>>> + Send;

/// Returns the name of the operation, if known.
fn operation_name(&self) -> Option<&str>;
Comment on lines +152 to +153
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine. It is a new public API, but not a big deal. But I just realized that backoff() is also public, and I wonder if we should have not made it so...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found an alternative to avoid touching the public API: #5695.

Instead of adding operation_name() to query the inner poller, Tracing<P>::poll now enters the T2 span scope before delegating the poll call. The T2 span is now active in the thread context, and the pollers can record the operation name directly onto it using tracing::Span::current().record().

If you think this works, I can go ahead and close this PR.


/// Sleep until the backoff time has elapsed.
fn backoff(&mut self, state: &PollingState) -> impl Future<Output = ()> + Send;

Expand Down
Loading