Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion objectstore-service/src/backend/bigtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Time to debounce bumping an object with configured TTI.
const TTI_DEBOUNCE: Duration = Duration::from_secs(24 * 3600); // 1 day

/// How often to retry failed `mutate` operations
/// How many times to retry failed operations
const REQUEST_RETRY_COUNT: usize = 2;

/// Column that stores the raw payload (compressed).
Expand Down
238 changes: 158 additions & 80 deletions objectstore-service/src/backend/gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ const DEFAULT_ENDPOINT: &str = "https://storage.googleapis.com";
const TOKEN_SCOPES: &[&str] = &["https://www.googleapis.com/auth/devstorage.read_write"];
/// Time to debounce bumping an object with configured TTI.
const TTI_DEBOUNCE: Duration = Duration::from_secs(24 * 3600); // 1 day
/// Maximum number of times a failed operation is retried, not counting the initial attempt.
const REQUEST_RETRY_COUNT: usize = 2;

/// Prefix for our built-in metadata stored in GCS metadata field
const BUILTIN_META_PREFIX: &str = "x-sn-";
Expand Down Expand Up @@ -223,6 +225,26 @@ impl serde::Serialize for GcsMetaKey {
}
}

/// Reports whether a failed request is worth another attempt.
///
/// Returns `true` when the error is a timeout, a connection error, or a request error, and
/// otherwise only when the error carries one of the HTTP status codes matched below.
fn is_retryable(error: &reqwest::Error) -> bool {
    // Timeouts, connection errors and request errors could be transient network failures.
    let maybe_transient = error.is_timeout() || error.is_connect() || error.is_request();
    if maybe_transient {
        return true;
    }

    // Errors that carry no HTTP status are not retried. For the retried status codes, see
    // https://docs.cloud.google.com/storage/docs/json_api/v1/status-codes
    match error.status() {
        Some(
            StatusCode::REQUEST_TIMEOUT
            | StatusCode::TOO_MANY_REQUESTS
            | StatusCode::INTERNAL_SERVER_ERROR
            | StatusCode::BAD_GATEWAY
            | StatusCode::SERVICE_UNAVAILABLE
            | StatusCode::GATEWAY_TIMEOUT,
        ) => true,
        Some(_) | None => false,
    }
}

pub struct GcsBackend {
client: reqwest::Client,
endpoint: Url,
Expand Down Expand Up @@ -307,22 +329,33 @@ impl GcsBackend {
custom_time: SystemTime,
}

self.request(Method::PATCH, object_url)
.await?
.json(&CustomTimeRequest { custom_time })
.send()
.await
.map_err(|cause| ServiceError::Reqwest {
context: "GCS: failed to send update custom time request".to_string(),
cause,
})?
.error_for_status()
.map_err(|cause| ServiceError::Reqwest {
context: "GCS: failed to update expiration time for object with TTI".to_string(),
cause,
})?;

Ok(())
let mut retry_count = 0;
loop {
let result = self
.request(Method::PATCH, object_url.clone())
.await?
.json(&CustomTimeRequest { custom_time })
.send()
.await
.and_then(|resp| resp.error_for_status());

match result {
Ok(_) => return Ok(()),
Err(e) if retry_count < REQUEST_RETRY_COUNT && is_retryable(&e) => {
retry_count += 1;
merni::counter!("gcs.update_custom_time_retry": 1);
tracing::debug!(retry_count, "Retrying update custom time");
}
Err(cause) => {
merni::counter!("gcs.update_custom_time_failures": 1);
return Err(ServiceError::Reqwest {
context: "GCS: failed to update expiration time for object with TTI"
.to_string(),
cause,
});
}
}
}
}
}

Expand Down Expand Up @@ -404,37 +437,56 @@ impl Backend for GcsBackend {
async fn get_object(&self, id: &ObjectId) -> ServiceResult<Option<(Metadata, PayloadStream)>> {
tracing::debug!("Reading from GCS backend");
let object_url = self.object_url(id)?;
let metadata_response = self
.request(Method::GET, object_url.clone())
.await?
.send()
.await
.map_err(|cause| ServiceError::Reqwest {
context: "GCS: failed to send get metadata request".to_string(),
cause,
})?;

if metadata_response.status() == StatusCode::NOT_FOUND {
tracing::debug!("Object not found");
return Ok(None);
}

let metadata_response =
metadata_response
.error_for_status()
.map_err(|cause| ServiceError::Reqwest {
context: "GCS: failed to get object metadata".to_string(),
cause,
})?;

let gcs_metadata: GcsObject =
metadata_response
.json()
.await
.map_err(|cause| ServiceError::Reqwest {
context: "GCS: failed to parse object metadata response".to_string(),
cause,
})?;
let mut retry_count = 0;
let gcs_metadata: GcsObject = loop {
let result = self
.request(Method::GET, object_url.clone())
.await?
.send()
.await;

match result {
// Do not error for objects that do not exist
Ok(resp) if resp.status() == StatusCode::NOT_FOUND => {
tracing::debug!("Object not found");
return Ok(None);
}
result => {
let result = result.and_then(|r| r.error_for_status());
match result {
Ok(resp) => match resp.json().await {
Ok(meta) => break meta,
Err(e) if retry_count < REQUEST_RETRY_COUNT && is_retryable(&e) => {
retry_count += 1;
merni::counter!("gcs.get_metadata_retry": 1);
tracing::debug!(retry_count, "Retrying get metadata");
}
Err(cause) => {
merni::counter!("gcs.get_metadata_failures": 1);
return Err(ServiceError::Reqwest {
context: "GCS: failed to parse object metadata response"
.to_string(),
cause,
});
}
},
Err(e) if retry_count < REQUEST_RETRY_COUNT && is_retryable(&e) => {
retry_count += 1;
merni::counter!("gcs.get_metadata_retry": 1);
tracing::debug!(retry_count, "Retrying get metadata");
}
Err(cause) => {
merni::counter!("gcs.get_metadata_failures": 1);
return Err(ServiceError::Reqwest {
context: "GCS: failed to get object metadata".to_string(),
cause,
});
}
}
}
}
};

// TODO: Store custom_time directly in metadata.
let expire_at = gcs_metadata.custom_time;
Expand All @@ -461,20 +513,32 @@ impl Backend for GcsBackend {

let mut download_url = object_url;
download_url.query_pairs_mut().append_pair("alt", "media");
let payload_response = self
.request(Method::GET, download_url)
.await?
.send()
.await
.map_err(|cause| ServiceError::Reqwest {
context: "GCS: failed to send get payload request".to_string(),
cause,
})?
.error_for_status()
.map_err(|cause| ServiceError::Reqwest {
context: "GCS: failed to get object payload".to_string(),
cause,
})?;

let mut retry_count = 0;
let payload_response = loop {
let result = self
.request(Method::GET, download_url.clone())
.await?
.send()
.await
.and_then(|resp| resp.error_for_status());

match result {
Ok(resp) => break resp,
Err(e) if retry_count < REQUEST_RETRY_COUNT && is_retryable(&e) => {
retry_count += 1;
merni::counter!("gcs.get_payload_retry": 1);
tracing::debug!(retry_count, "Retrying get payload");
}
Err(cause) => {
merni::counter!("gcs.get_payload_failures": 1);
return Err(ServiceError::Reqwest {
context: "GCS: failed to get object payload".to_string(),
cause,
});
}
}
};

let stream = payload_response
.bytes_stream()
Expand All @@ -487,28 +551,42 @@ impl Backend for GcsBackend {
#[tracing::instrument(level = "trace", fields(?id), skip_all)]
async fn delete_object(&self, id: &ObjectId) -> ServiceResult<()> {
tracing::debug!("Deleting from GCS backend");
let response = self
.request(Method::DELETE, self.object_url(id)?)
.await?
.send()
.await
.map_err(|cause| ServiceError::Reqwest {
context: "GCS: failed to send delete request".to_string(),
cause,
})?;
let object_url = self.object_url(id)?;

// Do not error for objects that do not exist
if response.status() != StatusCode::NOT_FOUND {
tracing::debug!("Object not found");
response
.error_for_status()
.map_err(|cause| ServiceError::Reqwest {
context: "GCS: failed to delete object".to_string(),
cause,
})?;
let mut retry_count = 0;
loop {
let result = self
.request(Method::DELETE, object_url.clone())
.await?
.send()
.await;

match result {
// Do not error for objects that do not exist
Ok(resp) if resp.status() == StatusCode::NOT_FOUND => {
tracing::debug!("Object not found");
return Ok(());
}
result => {
let result = result.and_then(|r| r.error_for_status());
match result {
Ok(_) => return Ok(()),
Err(e) if retry_count < REQUEST_RETRY_COUNT && is_retryable(&e) => {
retry_count += 1;
merni::counter!("gcs.delete_retry": 1);
tracing::debug!(retry_count, "Retrying delete");
}
Err(cause) => {
merni::counter!("gcs.delete_failures": 1);
return Err(ServiceError::Reqwest {
context: "GCS: failed to delete object".to_string(),
cause,
});
}
}
}
}
}

Ok(())
}
}

Expand Down