Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions objectstore-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ serde_json = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
tonic = { version = "0.13.1", default-features = false }
tracing = { workspace = true }
uuid = { workspace = true, features = ["v7"] }

Expand Down
84 changes: 62 additions & 22 deletions objectstore-service/src/backend/bigtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ use std::fmt;
use std::time::{Duration, SystemTime};

use anyhow::Result;
use bigtable_rs::bigtable::BigTableConnection;
use bigtable_rs::bigtable::{BigTableConnection, Error as BigTableError};
use bigtable_rs::google::bigtable::v2::{self, mutation};
use futures_util::{StreamExt, TryStreamExt, stream};
use objectstore_types::{ExpirationPolicy, Metadata};
use tokio::runtime::Handle;
use tonic::Code;

use crate::PayloadStream;
use crate::backend::common::Backend;
Expand Down Expand Up @@ -51,6 +52,36 @@ impl fmt::Debug for BigTableBackend {
}
}

fn is_retryable(error: &BigTableError) -> bool {
match error {
// Transient errors on auth token refresh
BigTableError::GCPAuthError(_) => true,
// Transient GRPC network failures
BigTableError::TransportError(_) => true,
// These could also indicate transient network failures
BigTableError::IoError(_) => true,
BigTableError::TimeoutError(_) => true,

// See https://docs.cloud.google.com/bigtable/docs/status-codes
BigTableError::RpcError(status) => match status.code() {
// Generic retriable status
Code::Unavailable => true,
// Timeouts
Code::Cancelled => true,
Code::DeadlineExceeded => true,
// Token might have refreshed too late
Code::Unauthenticated => true,
// Unspecified, attempt to retry anyways
Code::Aborted => true,
Code::Internal => true,
Code::FailedPrecondition => true,
Code::Unknown => true,
_ => false,
},
_ => false,
}
}

impl BigTableBackend {
pub async fn new(
endpoint: Option<&str>,
Expand Down Expand Up @@ -121,19 +152,24 @@ impl BigTableBackend {
.mutate_row(request.clone())
.await
.map(|res| res.into_inner());
// TODO: Stop retrying if the object doesn't exist
if response.is_ok() || retry_count >= REQUEST_RETRY_COUNT {
if response.is_err() {
merni::counter!("bigtable.mutate_failures": 1, "action" => action);

match response {
Ok(res) => return Ok(res),
Err(e) => {
if retry_count >= REQUEST_RETRY_COUNT || !is_retryable(&e) {
merni::counter!("bigtable.mutate_failures": 1, "action" => action);
return Err(ServiceError::Generic {
context: format!(
"Bigtable: failed mutating row performing a `{action}`"
),
cause: Some(Box::new(e)),
});
}
retry_count += 1;
merni::counter!("bigtable.mutate_retry": 1, "action" => action);
tracing::debug!(retry_count = retry_count, action, "Retrying mutate");
}
return response.map_err(|e| ServiceError::Generic {
context: format!("Bigtable: failed mutating row performing a `{action}`"),
cause: Some(Box::new(e)),
});
}
retry_count += 1;
merni::counter!("bigtable.mutate_retry": 1, "action" => action);
tracing::debug!(retry_count = retry_count, action, "Retrying mutate");
}
}

Expand Down Expand Up @@ -221,18 +257,22 @@ impl Backend for BigTableBackend {
let mut retry_count = 0;
let response = loop {
let response = client.read_rows(request.clone()).await;
if response.is_ok() || retry_count >= REQUEST_RETRY_COUNT {
if response.is_err() {
merni::counter!("bigtable.read_failures": 1);

match response {
Ok(res) => break res,
Err(e) => {
if retry_count >= REQUEST_RETRY_COUNT || !is_retryable(&e) {
merni::counter!("bigtable.read_failures": 1);
return Err(ServiceError::Generic {
context: "Bigtable: failed to read rows".to_string(),
cause: Some(Box::new(e)),
});
}
retry_count += 1;
merni::counter!("bigtable.read_retry": 1);
tracing::debug!(retry_count = retry_count, "Retrying read");
Copy link
Member Author

@lcian lcian Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was supposed to be a tracing::debug I think so I changed it.
tracing::warn was ending up in Sentry, even though we already capture this as a metric in DD.

}
break response.map_err(|e| ServiceError::Generic {
context: "Bigtable: failed to read rows".to_string(),
cause: Some(Box::new(e)),
})?;
}
retry_count += 1;
merni::counter!("bigtable.read_retry": 1);
tracing::warn!(retry_count = retry_count, "Retrying read");
};
debug_assert!(response.len() <= 1, "Expected at most one row");

Expand Down