Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions clients/rust/tests/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::LazyLock;
use objectstore_client::{Client, Error, SecretKey, TokenGenerator, Usecase};
use objectstore_test::server::{TEST_EDDSA_KID, TEST_EDDSA_PRIVKEY_PATH, TestServer, config};
use objectstore_types::Compression;
use reqwest::StatusCode;

pub static TEST_EDDSA_PRIVKEY: LazyLock<String> =
LazyLock::new(|| std::fs::read_to_string(&*TEST_EDDSA_PRIVKEY_PATH).unwrap());
Expand Down Expand Up @@ -199,6 +200,8 @@ async fn fails_with_insufficient_auth_token_perms() {

let put_result = session.put("initial body").send().await;
println!("{:?}", put_result);
// TODO: When server errors cause appropriate status codes to be returned, ensure this is 403
assert!(matches!(put_result, Err(Error::Reqwest(_))));
match put_result {
Err(Error::Reqwest(err)) => assert_eq!(err.status().unwrap(), StatusCode::FORBIDDEN),
_ => panic!("Expected error"),
}
}
10 changes: 6 additions & 4 deletions objectstore-server/src/auth/service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use objectstore_service::id::{ObjectContext, ObjectId};
use objectstore_service::{PayloadStream, StorageService};
use objectstore_service::{
DeleteResponse, GetResponse, InsertResponse, PayloadStream, StorageService,
};
use objectstore_types::{Metadata, Permission};

use crate::auth::AuthContext;
Expand Down Expand Up @@ -56,7 +58,7 @@ impl AuthAwareService {
key: Option<String>,
metadata: &Metadata,
stream: PayloadStream,
) -> ApiResult<ObjectId> {
) -> ApiResult<InsertResponse> {
self.assert_authorized(Permission::ObjectWrite, &context)?;
Ok(self
.service
Expand All @@ -65,13 +67,13 @@ impl AuthAwareService {
}

/// Auth-aware wrapper around [`StorageService::get_object`].
pub async fn get_object(&self, id: &ObjectId) -> ApiResult<Option<(Metadata, PayloadStream)>> {
pub async fn get_object(&self, id: &ObjectId) -> ApiResult<GetResponse> {
self.assert_authorized(Permission::ObjectRead, id.context())?;
Ok(self.service.get_object(id).await?)
}

/// Auth-aware wrapper around [`StorageService::delete_object`].
pub async fn delete_object(&self, id: &ObjectId) -> ApiResult<()> {
pub async fn delete_object(&self, id: &ObjectId) -> ApiResult<DeleteResponse> {
self.assert_authorized(Permission::ObjectDelete, id.context())?;
Ok(self.service.delete_object(id).await?)
}
Expand Down
11 changes: 5 additions & 6 deletions objectstore-service/src/backend/bigtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ use futures_util::{StreamExt, TryStreamExt, stream};
use objectstore_types::{ExpirationPolicy, Metadata};
use tokio::runtime::Handle;

use crate::PayloadStream;
use crate::backend::common::Backend;
use crate::backend::common::{Backend, DeleteResponse, GetResponse, PutResponse};
use crate::id::ObjectId;
use crate::{ServiceError, ServiceResult};
use crate::{PayloadStream, ServiceError, ServiceResult};

/// Connection timeout used for the initial connection to BigQuery.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
Expand Down Expand Up @@ -187,7 +186,7 @@ impl Backend for BigTableBackend {
id: &ObjectId,
metadata: &Metadata,
mut stream: PayloadStream,
) -> ServiceResult<()> {
) -> ServiceResult<PutResponse> {
tracing::debug!("Writing to Bigtable backend");
let path = id.as_storage_path().to_string().into_bytes();

Expand All @@ -201,7 +200,7 @@ impl Backend for BigTableBackend {
}

#[tracing::instrument(level = "trace", fields(?id), skip_all)]
async fn get_object(&self, id: &ObjectId) -> ServiceResult<Option<(Metadata, PayloadStream)>> {
async fn get_object(&self, id: &ObjectId) -> ServiceResult<GetResponse> {
tracing::debug!("Reading from Bigtable backend");
let path = id.as_storage_path().to_string().into_bytes();
let rows = v2::RowSet {
Expand Down Expand Up @@ -294,7 +293,7 @@ impl Backend for BigTableBackend {
}

#[tracing::instrument(level = "trace", fields(?id), skip_all)]
async fn delete_object(&self, id: &ObjectId) -> ServiceResult<()> {
async fn delete_object(&self, id: &ObjectId) -> ServiceResult<DeleteResponse> {
tracing::debug!("Deleting from Bigtable backend");

let path = id.as_storage_path().to_string().into_bytes();
Expand Down
16 changes: 11 additions & 5 deletions objectstore-service/src/backend/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,21 @@ use std::fmt::Debug;

use objectstore_types::Metadata;

use crate::PayloadStream;
use crate::ServiceResult;
use crate::id::ObjectId;
use crate::{PayloadStream, ServiceResult};

/// User agent string used for outgoing requests.
///
/// This intentionally has a "sentry" prefix so that it can easily be traced back to us.
pub const USER_AGENT: &str = concat!("sentry-objectstore/", env!("CARGO_PKG_VERSION"));

/// Backend response for put operations.
pub(super) type PutResponse = ();
/// Backend response for get operations.
pub(super) type GetResponse = Option<(Metadata, PayloadStream)>;
/// Backend response for delete operations.
pub(super) type DeleteResponse = ();

/// A type-erased [`Backend`] instance.
pub type BoxedBackend = Box<dyn Backend>;

Expand All @@ -25,13 +31,13 @@ pub trait Backend: Debug + Send + Sync + 'static {
id: &ObjectId,
metadata: &Metadata,
stream: PayloadStream,
) -> ServiceResult<()>;
) -> ServiceResult<PutResponse>;

/// Retrieves an object at the given path, returning its metadata and a stream of bytes.
async fn get_object(&self, id: &ObjectId) -> ServiceResult<Option<(Metadata, PayloadStream)>>;
async fn get_object(&self, id: &ObjectId) -> ServiceResult<GetResponse>;

/// Deletes the object at the given path.
async fn delete_object(&self, id: &ObjectId) -> ServiceResult<()>;
async fn delete_object(&self, id: &ObjectId) -> ServiceResult<DeleteResponse>;
}

/// Creates a reqwest client with required defaults.
Expand Down
11 changes: 5 additions & 6 deletions objectstore-service/src/backend/gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ use objectstore_types::{ExpirationPolicy, Metadata};
use reqwest::{Body, IntoUrl, Method, RequestBuilder, StatusCode, Url, header, multipart};
use serde::{Deserialize, Serialize};

use crate::PayloadStream;
use crate::backend::common::{self, Backend};
use crate::backend::common::{self, Backend, DeleteResponse, GetResponse, PutResponse};
use crate::id::ObjectId;
use crate::{ServiceError, ServiceResult};
use crate::{PayloadStream, ServiceError, ServiceResult};

/// Default endpoint used to access the GCS JSON API.
const DEFAULT_ENDPOINT: &str = "https://storage.googleapis.com";
Expand Down Expand Up @@ -347,7 +346,7 @@ impl Backend for GcsBackend {
id: &ObjectId,
metadata: &Metadata,
stream: PayloadStream,
) -> ServiceResult<()> {
) -> ServiceResult<PutResponse> {
tracing::debug!("Writing to GCS backend");
let gcs_metadata = GcsObject::from_metadata(metadata);

Expand Down Expand Up @@ -401,7 +400,7 @@ impl Backend for GcsBackend {
}

#[tracing::instrument(level = "trace", fields(?id), skip_all)]
async fn get_object(&self, id: &ObjectId) -> ServiceResult<Option<(Metadata, PayloadStream)>> {
async fn get_object(&self, id: &ObjectId) -> ServiceResult<GetResponse> {
tracing::debug!("Reading from GCS backend");
let object_url = self.object_url(id)?;
let metadata_response = self
Expand Down Expand Up @@ -485,7 +484,7 @@ impl Backend for GcsBackend {
}

#[tracing::instrument(level = "trace", fields(?id), skip_all)]
async fn delete_object(&self, id: &ObjectId) -> ServiceResult<()> {
async fn delete_object(&self, id: &ObjectId) -> ServiceResult<DeleteResponse> {
tracing::debug!("Deleting from GCS backend");
let response = self
.request(Method::DELETE, self.object_url(id)?)
Expand Down
11 changes: 5 additions & 6 deletions objectstore-service/src/backend/local_fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ use tokio::fs::OpenOptions;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio_util::io::{ReaderStream, StreamReader};

use crate::PayloadStream;
use crate::backend::common::Backend;
use crate::backend::common::{Backend, DeleteResponse, GetResponse, PutResponse};
use crate::id::ObjectId;
use crate::{ServiceError, ServiceResult};
use crate::{PayloadStream, ServiceError, ServiceResult};

#[derive(Debug)]
pub struct LocalFsBackend {
Expand All @@ -36,7 +35,7 @@ impl Backend for LocalFsBackend {
id: &ObjectId,
metadata: &Metadata,
stream: PayloadStream,
) -> ServiceResult<()> {
) -> ServiceResult<PutResponse> {
let path = self.path.join(id.as_storage_path().to_string());
tracing::debug!(path=%path.display(), "Writing to local_fs backend");
tokio::fs::create_dir_all(path.parent().unwrap()).await?;
Expand Down Expand Up @@ -69,7 +68,7 @@ impl Backend for LocalFsBackend {

// TODO: Return `Ok(None)` if object is found but past expiry
#[tracing::instrument(level = "trace", fields(?id), skip_all)]
async fn get_object(&self, id: &ObjectId) -> ServiceResult<Option<(Metadata, PayloadStream)>> {
async fn get_object(&self, id: &ObjectId) -> ServiceResult<GetResponse> {
tracing::debug!("Reading from local_fs backend");
let path = self.path.join(id.as_storage_path().to_string());
let file = match OpenOptions::new().read(true).open(path).await {
Expand Down Expand Up @@ -97,7 +96,7 @@ impl Backend for LocalFsBackend {
}

#[tracing::instrument(level = "trace", fields(?id), skip_all)]
async fn delete_object(&self, id: &ObjectId) -> ServiceResult<()> {
async fn delete_object(&self, id: &ObjectId) -> ServiceResult<DeleteResponse> {
tracing::debug!("Deleting from local_fs backend");
let path = self.path.join(id.as_storage_path().to_string());
let result = tokio::fs::remove_file(path).await;
Expand Down
11 changes: 5 additions & 6 deletions objectstore-service/src/backend/s3_compatible.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ use futures_util::{StreamExt, TryStreamExt};
use objectstore_types::{ExpirationPolicy, Metadata};
use reqwest::{Body, IntoUrl, Method, RequestBuilder, StatusCode};

use crate::PayloadStream;
use crate::backend::common::{self, Backend};
use crate::backend::common::{self, Backend, DeleteResponse, GetResponse, PutResponse};
use crate::id::ObjectId;
use crate::{ServiceError, ServiceResult};
use crate::{PayloadStream, ServiceError, ServiceResult};

/// Prefix used for custom metadata in headers for the GCS backend.
///
Expand Down Expand Up @@ -156,7 +155,7 @@ impl<T: TokenProvider> Backend for S3CompatibleBackend<T> {
id: &ObjectId,
metadata: &Metadata,
stream: PayloadStream,
) -> ServiceResult<()> {
) -> ServiceResult<PutResponse> {
tracing::debug!("Writing to s3_compatible backend");
self.request(Method::PUT, self.object_url(id))
.await?
Expand All @@ -178,7 +177,7 @@ impl<T: TokenProvider> Backend for S3CompatibleBackend<T> {
}

#[tracing::instrument(level = "trace", fields(?id), skip_all)]
async fn get_object(&self, id: &ObjectId) -> ServiceResult<Option<(Metadata, PayloadStream)>> {
async fn get_object(&self, id: &ObjectId) -> ServiceResult<GetResponse> {
tracing::debug!("Reading from s3_compatible backend");
let object_url = self.object_url(id);

Expand Down Expand Up @@ -231,7 +230,7 @@ impl<T: TokenProvider> Backend for S3CompatibleBackend<T> {
}

#[tracing::instrument(level = "trace", fields(?id), skip_all)]
async fn delete_object(&self, id: &ObjectId) -> ServiceResult<()> {
async fn delete_object(&self, id: &ObjectId) -> ServiceResult<DeleteResponse> {
tracing::debug!("Deleting from s3_compatible backend");
let response = self
.request(Method::DELETE, self.object_url(id))
Expand Down
18 changes: 11 additions & 7 deletions objectstore-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ enum BackendChoice {
LongTerm,
}

/// Service response for get operations.
pub type GetResponse = Option<(Metadata, PayloadStream)>;
/// Service response for insert operations.
pub type InsertResponse = ObjectId;
/// Service response for delete operations.
pub type DeleteResponse = ();

// TODO(ja): Move into a submodule
/// High-level asynchronous service for storing and retrieving objects.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -114,7 +121,7 @@ impl StorageService {
key: Option<String>,
metadata: &Metadata,
mut stream: PayloadStream,
) -> ServiceResult<ObjectId> {
) -> ServiceResult<InsertResponse> {
let start = Instant::now();

let mut first_chunk = BytesMut::new();
Expand Down Expand Up @@ -219,10 +226,7 @@ impl StorageService {
}

/// Streams the contents of an object stored at the given key.
pub async fn get_object(
&self,
id: &ObjectId,
) -> ServiceResult<Option<(Metadata, PayloadStream)>> {
pub async fn get_object(&self, id: &ObjectId) -> ServiceResult<GetResponse> {
let start = Instant::now();

let mut backend_choice = "high-volume";
Expand Down Expand Up @@ -259,7 +263,7 @@ impl StorageService {
}

/// Deletes an object stored at the given key, if it exists.
pub async fn delete_object(&self, id: &ObjectId) -> ServiceResult<()> {
pub async fn delete_object(&self, id: &ObjectId) -> ServiceResult<DeleteResponse> {
let start = Instant::now();

if let Some((metadata, _stream)) = self.0.high_volume_backend.get_object(id).await? {
Expand All @@ -278,7 +282,7 @@ impl StorageService {
}
}

fn is_tombstoned(result: &Option<(Metadata, PayloadStream)>) -> bool {
fn is_tombstoned(result: &GetResponse) -> bool {
matches!(
result,
Some((
Expand Down