Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions objectstore-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ anyhow = { workspace = true }
argh = "0.1.13"
async-stream = "0.3.6"
axum = { version = "0.8.4", features = ["multipart"] }
base64 = "0.22"
axum-extra = "0.10.1"
bytes = { workspace = true }
console = "0.16.1"
Expand Down
2 changes: 1 addition & 1 deletion objectstore-server/config/local.example.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Use a port different from devservices
http_addr: 0.0.0.0:18888
http_addr: 0.0.0.0:8888

long_term_storage:
type: filesystem
Expand Down
10 changes: 6 additions & 4 deletions objectstore-server/src/auth/service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use objectstore_service::id::{ObjectContext, ObjectId};
use objectstore_service::{PayloadStream, StorageService};
use objectstore_service::{
DeleteResponse, GetResponse, InsertResponse, PayloadStream, StorageService,
};
use objectstore_types::{Metadata, Permission};

use crate::auth::AuthContext;
Expand Down Expand Up @@ -56,7 +58,7 @@ impl AuthAwareService {
key: Option<String>,
metadata: &Metadata,
stream: PayloadStream,
) -> ApiResult<ObjectId> {
) -> ApiResult<InsertResponse> {
self.assert_authorized(Permission::ObjectWrite, &context)?;
Ok(self
.service
Expand All @@ -65,13 +67,13 @@ impl AuthAwareService {
}

/// Auth-aware wrapper around [`StorageService::get_object`].
pub async fn get_object(&self, id: &ObjectId) -> ApiResult<Option<(Metadata, PayloadStream)>> {
pub async fn get_object(&self, id: &ObjectId) -> ApiResult<GetResponse> {
self.assert_authorized(Permission::ObjectRead, id.context())?;
Ok(self.service.get_object(id).await?)
}

/// Auth-aware wrapper around [`StorageService::delete_object`].
pub async fn delete_object(&self, id: &ObjectId) -> ApiResult<()> {
pub async fn delete_object(&self, id: &ObjectId) -> ApiResult<DeleteResponse> {
self.assert_authorized(Permission::ObjectDelete, id.context())?;
Ok(self.service.delete_object(id).await?)
}
Expand Down
2 changes: 2 additions & 0 deletions objectstore-server/src/batch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/// Name of the per-part header that carries the key of the object a batch operation refers to.
///
/// The batch endpoint writes this header with the key bytes base64-encoded.
pub const HEADER_BATCH_OPERATION_KEY: &str = "x-sn-batch-operation-key";
/// Name of the per-part header that carries the kind of a batch operation.
///
/// The batch endpoint writes one of `get`, `insert` or `delete` here.
pub const HEADER_BATCH_OPERATION_KIND: &str = "x-sn-batch-operation-kind";
301 changes: 290 additions & 11 deletions objectstore-server/src/endpoints/batch.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,31 @@
use std::time::SystemTime;

use axum::Router;
use axum::extract::DefaultBodyLimit;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::extract::{DefaultBodyLimit, State};
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use multer directly to easily set limits on the request and its parts

use axum::response::Response;
use axum::routing;
use objectstore_service::id::ObjectContext;
use base64::Engine;
use base64::prelude::BASE64_STANDARD;
use bytes::{Bytes, BytesMut};
use futures::StreamExt;
use futures::TryStreamExt;
use futures::stream::BoxStream;
use http::header::CONTENT_TYPE;
use http::{HeaderMap, HeaderValue, StatusCode};
use objectstore_service::id::{ObjectContext, ObjectId, ObjectKey};
use objectstore_types::Metadata;

use crate::auth::AuthAwareService;
use crate::endpoints::common::ApiResult;
use crate::extractors::{BatchRequest, Xt};
use crate::batch::{HEADER_BATCH_OPERATION_KEY, HEADER_BATCH_OPERATION_KIND};
use crate::endpoints::common::{ApiError, ApiErrorResponse, ApiResult};
use crate::extractors::Xt;
use crate::extractors::batch::{BatchError, BatchOperationStream, Operation};
use crate::multipart::{IntoMultipartResponse, Part};
use crate::rate_limits::MeteredPayloadStream;
use crate::state::ServiceState;

/// Upper bound for the request body of the batch endpoint, applied through `DefaultBodyLimit`.
const MAX_BODY_SIZE: usize = 1024 * 1024 * 1024; // 1 GB
/// Name of the per-part response header holding the outcome of a single batch operation,
/// formatted as the numeric status code followed by its canonical reason phrase (if any).
const HEADER_BATCH_OPERATION_STATUS: &str = "x-sn-batch-operation-status";

pub fn router() -> Router<ServiceState> {
Router::new()
Expand All @@ -19,10 +34,274 @@ pub fn router() -> Router<ServiceState> {
.layer(DefaultBodyLimit::max(MAX_BODY_SIZE))
}

struct GetResponse {
pub key: ObjectKey,
pub result: ApiResult<Option<(Metadata, Bytes)>>,
Comment thread
lcian marked this conversation as resolved.
}

/// Manual `Debug` implementation that prints only the object key and leaves the
/// `result` field out of the output.
impl std::fmt::Debug for GetResponse {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("GetResponse");
        builder.field("key", &self.key);
        builder.finish()
    }
}

/// Outcome of a single insert operation inside a batch request.
#[derive(Debug)]
struct InsertResponse {
/// Key the insert operation was issued for.
pub key: ObjectKey,
/// Result returned by the auth-aware service's `insert_object` call.
pub result: ApiResult<objectstore_service::InsertResponse>,
}

/// Outcome of a single delete operation inside a batch request.
#[derive(Debug)]
struct DeleteResponse {
/// Key the delete operation was issued for.
pub key: ObjectKey,
/// Result returned by the auth-aware service's `delete_object` call.
pub result: ApiResult<objectstore_service::DeleteResponse>,
}

/// A batch operation that was rejected before it reached the storage service, e.g. because
/// the operation could not be read from the request or the rate limit check failed.
#[derive(Debug)]
struct ErrorResponse {
/// Key of the affected operation; `None` when the operation itself could not be read.
pub key: Option<ObjectKey>,
/// Kind of the affected operation (`get`, `insert` or `delete`); `None` when unknown.
pub kind: Option<&'static str>,
/// The error reported back to the client for this operation.
pub error: ApiError,
}

/// Result of one operation of a batch request; each value is turned into one multipart
/// [`Part`] of the batch response.
#[derive(Debug)]
enum OperationResponse {
/// Outcome of a get operation.
Get(GetResponse),
/// Outcome of an insert operation.
Insert(InsertResponse),
/// Outcome of a delete operation.
Delete(DeleteResponse),
/// The operation failed before it was dispatched to the service.
Error(ErrorResponse),
}

async fn batch(
_service: AuthAwareService,
Xt(_context): Xt<ObjectContext>,
_request: BatchRequest,
) -> ApiResult<Response> {
Ok(StatusCode::NOT_IMPLEMENTED.into_response())
service: AuthAwareService,
State(state): State<ServiceState>,
Xt(context): Xt<ObjectContext>,
mut requests: BatchOperationStream,
) -> Response {
let responses: BoxStream<OperationResponse> = async_stream::stream! {
while let Some(operation) = requests.0.next().await {
let (operation, key, kind) = match operation {
Ok(op) => {
let key = op.key().clone();
let kind = match &op {
Operation::Get(_) => "get",
Operation::Insert(_) => "insert",
Operation::Delete(_) => "delete",
};
(Ok(op), Some(key), Some(kind))
}
Err(e) => (Err(e), None, None),
};

if !state.rate_limiter.check(&context) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to rethink the placement of our ratelimiter calls. The rate limiter is called once in the extractor for ObjectContext. In the case of batch endpoint, that is invalid since we need to call it exactly once per operation - now it's one too many.

Apart from that, the placement is inconsistent now.

tracing::debug!("Batch operation rejected due to rate limits");
yield OperationResponse::Error(ErrorResponse {
key,
kind,
error: BatchError::RateLimited.into(),
});
continue;
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra rate limit check in batch endpoint extractor

High Severity

The batch endpoint performs rate limiting once in the Xt<ObjectContext> extractor (before the handler runs) and then again for each operation in the handler. This causes N+1 rate limit checks for N operations, consuming an extra token per batch request. The extractor-level check at line 130 of id.rs triggers before the per-operation checks in batch.rs, resulting in overcounting against rate limits.

Fix in Cursor Fix in Web


let result = match operation {
Ok(operation) => match operation {
Operation::Get(get) => {
let key = get.key.clone();
let result = service
.get_object(&ObjectId::new(context.clone(), get.key))
.await;

let result = match result {
Ok(Some((metadata, stream))) => {
let metered_stream = MeteredPayloadStream::from(
stream,
state.rate_limiter.bytes_accumulator()
);
match metered_stream.try_collect::<BytesMut>().await {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're buffering the entire object contents instead of streaming them out to the client. This can cause significant memory pressure - we need to look into passing the stream through.

Ok(bytes) => Ok(Some((metadata, bytes.freeze()))),
Err(e) => Err(ApiError::Service(e.into())),
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GET responses fully buffered causing memory pressure

Medium Severity

Object payloads retrieved via GET operations are fully buffered into memory using try_collect::<BytesMut>() rather than being streamed. For large objects or many concurrent batch requests, this can cause significant memory pressure and potentially OOM conditions on the server.

Fix in Cursor Fix in Web

}
Ok(None) => Ok(None),
Err(e) => Err(e),
};

OperationResponse::Get(GetResponse { key, result })
}
Operation::Insert(insert) => {
let key = insert.key.clone();
let mut metadata = insert.metadata;
metadata.time_created = Some(SystemTime::now());

let payload_len = insert.payload.len() as u64;
state.rate_limiter.bytes_accumulator()
.fetch_add(payload_len, std::sync::atomic::Ordering::Relaxed);

let stream = futures_util::stream::once(async { Ok(insert.payload) }).boxed();
let result = service
.insert_object(context.clone(), Some(insert.key), &metadata, stream)
.await;
OperationResponse::Insert(InsertResponse { key, result })
}
Operation::Delete(delete) => {
let key = delete.key.clone();
let result = service
.delete_object(&ObjectId::new(context.clone(), delete.key))
.await;
OperationResponse::Delete(DeleteResponse { key, result })
}
},
Err(error) => OperationResponse::Error(ErrorResponse {
key,
kind,
error: error.into(),
}),
};
yield result;
}
}
.boxed();

let r = rand::random::<u128>();
responses.into_multipart_response(r)
}

/// Sets the [`HEADER_BATCH_OPERATION_KEY`] header on `headers`.
///
/// The raw key bytes are base64-encoded (standard engine) before being stored as the header
/// value.
fn insert_key_header(headers: &mut HeaderMap, key: &ObjectKey) {
    let value = BASE64_STANDARD
        .encode(key.as_bytes())
        .parse()
        .expect("base64 encoded string is always a valid header value");
    headers.insert(HEADER_BATCH_OPERATION_KEY, value);
}

/// Sets the [`HEADER_BATCH_OPERATION_KIND`] header on `headers` to `kind`.
///
/// # Panics
///
/// Panics if `kind` cannot be parsed as a header value.
fn insert_kind_header(headers: &mut HeaderMap, kind: &str) {
    let value = kind
        .parse()
        .expect("operation kind is always a valid header value");
    headers.insert(HEADER_BATCH_OPERATION_KIND, value);
}

/// Sets the [`HEADER_BATCH_OPERATION_STATUS`] header on `headers`.
///
/// The value is the numeric status code followed by a space and the canonical reason phrase.
/// When the status has no canonical reason, only the code is written.
fn insert_status_header(headers: &mut HeaderMap, status: StatusCode) {
    let reason = status.canonical_reason().unwrap_or("");
    let line = format!("{} {}", status.as_u16(), reason);

    // Trimming removes the trailing space left behind when there is no reason phrase.
    let value = line.trim().parse().expect("always a valid header value");
    headers.insert(HEADER_BATCH_OPERATION_STATUS, value);
}

/// Builds the multipart [`Part`] for a batch operation that completed without an error.
///
/// The part always carries the operation key, kind and status headers. Entries from
/// `additional_headers` are merged in afterwards, except for names that would collide with
/// those batch headers. `body` and `content_type` are passed to [`Part::new`] unchanged.
fn create_success_part(
    key: &ObjectKey,
    kind: &str,
    status: StatusCode,
    content_type: Option<HeaderValue>,
    body: Bytes,
    additional_headers: Option<HeaderMap>,
) -> Part {
    let mut headers = HeaderMap::new();
    insert_key_header(&mut headers, key);
    insert_kind_header(&mut headers, kind);
    insert_status_header(&mut headers, status);

    if let Some(mut additional) = additional_headers {
        // `HeaderMap::extend` replaces the values of names that already exist. At this point
        // `headers` holds exactly the batch headers, so drop colliding entries first: the
        // operation key, kind and status must never be overwritten by merged-in headers.
        for reserved in headers.keys() {
            additional.remove(reserved);
        }
        headers.extend(additional);
    }

    Part::new(body, headers, content_type)
}

fn create_error_part(key: Option<&ObjectKey>, kind: Option<&str>, error: &ApiError) -> Part {
let mut headers = HeaderMap::new();
if let Some(key) = key {
insert_key_header(&mut headers, key);
}
if let Some(kind) = kind {
insert_kind_header(&mut headers, kind);
}
insert_status_header(&mut headers, error.status());

let error_body = serde_json::to_vec(&ApiErrorResponse::from_error(error))
.inspect_err(|err| {
tracing::error!(
error = err as &dyn std::error::Error,
"error serializing ApiErrorResponse, this should never happen"
)
})
.unwrap_or_default();
Part::new(Bytes::from(error_body), headers, None)
}

impl From<OperationResponse> for Part {
fn from(value: OperationResponse) -> Self {
match value {
OperationResponse::Get(GetResponse { key, result }) => match result {
Ok(Some((metadata, bytes))) => {
let mut metadata_headers = match metadata.to_headers("", false) {
Ok(headers) => headers,
Err(err) => {
let err = BatchError::ResponseSerialization {
context: "serializing object metadata".to_owned(),
cause: Box::new(err),
}
.into();
return create_error_part(Some(&key), Some("get"), &err);
}
};
create_success_part(
&key,
"get",
StatusCode::OK,
metadata_headers.remove(CONTENT_TYPE),
bytes,
Some(metadata_headers),
)
}
Ok(None) => create_success_part(
&key,
"get",
StatusCode::NOT_FOUND,
None,
Bytes::new(),
None,
),
Err(error) => create_error_part(Some(&key), Some("get"), &error),
},
OperationResponse::Insert(InsertResponse { key, result }) => match result {
// XXX: this could actually be either StatusCode::OK or StatusCode::CREATED, the service
// layer doesn't allow us to distinguish between them currently
Ok(_) => create_success_part(
&key,
"insert",
StatusCode::CREATED,
None,
Bytes::new(),
None,
),
Err(error) => create_error_part(Some(&key), Some("insert"), &error),
},
OperationResponse::Delete(DeleteResponse { key, result }) => match result {
Ok(_) => create_success_part(
&key,
"delete",
StatusCode::NO_CONTENT,
None,
Bytes::new(),
None,
),
Err(error) => create_error_part(Some(&key), Some("delete"), &error),
},
OperationResponse::Error(ErrorResponse { key, kind, error }) => {
create_error_part(key.as_ref(), kind, &error)
}
}
}
}
Loading