-
-
Notifications
You must be signed in to change notification settings - Fork 6
feat(server): Implement batch endpoint #275
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
fd19052
752d3cc
dd487a3
5478ec6
a1a9a4d
16c0bdc
1e8070b
e57fea8
ac25a9d
66fb421
708e11b
ee0836c
adc8294
de2c49c
bb08d10
2316f5d
d6dbeee
778cda4
730bec6
0465ea7
93ebb98
3269cee
0d20f36
07ecd11
0b2350f
3b821aa
7b15a42
e1d991b
3907e3c
f74f66d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| pub const HEADER_BATCH_OPERATION_KEY: &str = "x-sn-batch-operation-key"; | ||
| pub const HEADER_BATCH_OPERATION_KIND: &str = "x-sn-batch-operation-kind"; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,16 +1,31 @@ | ||
| use std::time::SystemTime; | ||
|
|
||
| use axum::Router; | ||
| use axum::extract::DefaultBodyLimit; | ||
| use axum::http::StatusCode; | ||
| use axum::response::{IntoResponse, Response}; | ||
| use axum::extract::{DefaultBodyLimit, State}; | ||
| use axum::response::Response; | ||
| use axum::routing; | ||
| use objectstore_service::id::ObjectContext; | ||
| use base64::Engine; | ||
| use base64::prelude::BASE64_STANDARD; | ||
| use bytes::{Bytes, BytesMut}; | ||
| use futures::StreamExt; | ||
| use futures::TryStreamExt; | ||
| use futures::stream::BoxStream; | ||
| use http::header::CONTENT_TYPE; | ||
| use http::{HeaderMap, HeaderValue, StatusCode}; | ||
| use objectstore_service::id::{ObjectContext, ObjectId, ObjectKey}; | ||
| use objectstore_types::Metadata; | ||
|
|
||
| use crate::auth::AuthAwareService; | ||
| use crate::endpoints::common::ApiResult; | ||
| use crate::extractors::{BatchRequest, Xt}; | ||
| use crate::batch::{HEADER_BATCH_OPERATION_KEY, HEADER_BATCH_OPERATION_KIND}; | ||
| use crate::endpoints::common::{ApiError, ApiErrorResponse, ApiResult}; | ||
| use crate::extractors::Xt; | ||
| use crate::extractors::batch::{BatchError, BatchOperationStream, Operation}; | ||
| use crate::multipart::{IntoMultipartResponse, Part}; | ||
| use crate::rate_limits::MeteredPayloadStream; | ||
| use crate::state::ServiceState; | ||
|
|
||
| const MAX_BODY_SIZE: usize = 1024 * 1024 * 1024; // 1 GB | ||
| const HEADER_BATCH_OPERATION_STATUS: &str = "x-sn-batch-operation-status"; | ||
|
|
||
| pub fn router() -> Router<ServiceState> { | ||
| Router::new() | ||
|
|
@@ -19,10 +34,274 @@ pub fn router() -> Router<ServiceState> { | |
| .layer(DefaultBodyLimit::max(MAX_BODY_SIZE)) | ||
| } | ||
|
|
||
| struct GetResponse { | ||
| pub key: ObjectKey, | ||
| pub result: ApiResult<Option<(Metadata, Bytes)>>, | ||
|
lcian marked this conversation as resolved.
|
||
| } | ||
|
|
||
| impl std::fmt::Debug for GetResponse { | ||
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
| f.debug_struct("GetResponse") | ||
| .field("key", &self.key) | ||
| .finish() | ||
| } | ||
| } | ||
|
|
||
| #[derive(Debug)] | ||
| struct InsertResponse { | ||
| pub key: ObjectKey, | ||
| pub result: ApiResult<objectstore_service::InsertResponse>, | ||
| } | ||
|
|
||
| #[derive(Debug)] | ||
| struct DeleteResponse { | ||
| pub key: ObjectKey, | ||
| pub result: ApiResult<objectstore_service::DeleteResponse>, | ||
| } | ||
|
|
||
| #[derive(Debug)] | ||
| struct ErrorResponse { | ||
| pub key: Option<ObjectKey>, | ||
| pub kind: Option<&'static str>, | ||
| pub error: ApiError, | ||
| } | ||
|
|
||
| #[derive(Debug)] | ||
| enum OperationResponse { | ||
| Get(GetResponse), | ||
| Insert(InsertResponse), | ||
| Delete(DeleteResponse), | ||
| Error(ErrorResponse), | ||
| } | ||
|
|
||
| async fn batch( | ||
| _service: AuthAwareService, | ||
| Xt(_context): Xt<ObjectContext>, | ||
| _request: BatchRequest, | ||
| ) -> ApiResult<Response> { | ||
| Ok(StatusCode::NOT_IMPLEMENTED.into_response()) | ||
| service: AuthAwareService, | ||
| State(state): State<ServiceState>, | ||
| Xt(context): Xt<ObjectContext>, | ||
| mut requests: BatchOperationStream, | ||
| ) -> Response { | ||
| let responses: BoxStream<OperationResponse> = async_stream::stream! { | ||
| while let Some(operation) = requests.0.next().await { | ||
| let (operation, key, kind) = match operation { | ||
| Ok(op) => { | ||
| let key = op.key().clone(); | ||
| let kind = match &op { | ||
| Operation::Get(_) => "get", | ||
| Operation::Insert(_) => "insert", | ||
| Operation::Delete(_) => "delete", | ||
| }; | ||
| (Ok(op), Some(key), Some(kind)) | ||
| } | ||
| Err(e) => (Err(e), None, None), | ||
| }; | ||
|
|
||
| if !state.rate_limiter.check(&context) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to rethink the placement of our ratelimiter calls. The rate limiter is called once in the extractor for Apart from that, the placement is inconsistent now. |
||
| tracing::debug!("Batch operation rejected due to rate limits"); | ||
| yield OperationResponse::Error(ErrorResponse { | ||
| key, | ||
| kind, | ||
| error: BatchError::RateLimited.into(), | ||
| }); | ||
| continue; | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra rate limit check in batch endpoint extractorHigh Severity The batch endpoint performs rate limiting once in the |
||
|
|
||
| let result = match operation { | ||
| Ok(operation) => match operation { | ||
| Operation::Get(get) => { | ||
| let key = get.key.clone(); | ||
| let result = service | ||
| .get_object(&ObjectId::new(context.clone(), get.key)) | ||
| .await; | ||
|
|
||
| let result = match result { | ||
| Ok(Some((metadata, stream))) => { | ||
| let metered_stream = MeteredPayloadStream::from( | ||
| stream, | ||
| state.rate_limiter.bytes_accumulator() | ||
| ); | ||
| match metered_stream.try_collect::<BytesMut>().await { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We're buffering the entire object contents instead of streaming them out to the client. This can cause significant memory pressure - we need to look into passing the stream through. |
||
| Ok(bytes) => Ok(Some((metadata, bytes.freeze()))), | ||
| Err(e) => Err(ApiError::Service(e.into())), | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. GET responses fully buffered causing memory pressureMedium Severity Object payloads retrieved via GET operations are fully buffered into memory using |
||
| } | ||
| Ok(None) => Ok(None), | ||
| Err(e) => Err(e), | ||
| }; | ||
|
|
||
| OperationResponse::Get(GetResponse { key, result }) | ||
| } | ||
| Operation::Insert(insert) => { | ||
| let key = insert.key.clone(); | ||
| let mut metadata = insert.metadata; | ||
| metadata.time_created = Some(SystemTime::now()); | ||
|
|
||
| let payload_len = insert.payload.len() as u64; | ||
| state.rate_limiter.bytes_accumulator() | ||
| .fetch_add(payload_len, std::sync::atomic::Ordering::Relaxed); | ||
|
|
||
| let stream = futures_util::stream::once(async { Ok(insert.payload) }).boxed(); | ||
| let result = service | ||
| .insert_object(context.clone(), Some(insert.key), &metadata, stream) | ||
| .await; | ||
| OperationResponse::Insert(InsertResponse { key, result }) | ||
| } | ||
| Operation::Delete(delete) => { | ||
| let key = delete.key.clone(); | ||
| let result = service | ||
| .delete_object(&ObjectId::new(context.clone(), delete.key)) | ||
| .await; | ||
| OperationResponse::Delete(DeleteResponse { key, result }) | ||
| } | ||
| }, | ||
| Err(error) => OperationResponse::Error(ErrorResponse { | ||
| key, | ||
| kind, | ||
| error: error.into(), | ||
| }), | ||
| }; | ||
| yield result; | ||
| } | ||
| } | ||
| .boxed(); | ||
|
|
||
| let r = rand::random::<u128>(); | ||
| responses.into_multipart_response(r) | ||
| } | ||
|
|
||
| fn insert_key_header(headers: &mut HeaderMap, key: &ObjectKey) { | ||
| let encoded = BASE64_STANDARD.encode(key.as_bytes()); | ||
| headers.insert( | ||
| HEADER_BATCH_OPERATION_KEY, | ||
| encoded | ||
| .parse() | ||
| .expect("base64 encoded string is always a valid header value"), | ||
| ); | ||
| } | ||
|
|
||
| fn insert_kind_header(headers: &mut HeaderMap, kind: &str) { | ||
| headers.insert( | ||
| HEADER_BATCH_OPERATION_KIND, | ||
| kind.parse() | ||
| .expect("operation kind is always a valid header value"), | ||
| ); | ||
| } | ||
|
|
||
| fn insert_status_header(headers: &mut HeaderMap, status: StatusCode) { | ||
| let status_str = format!( | ||
| "{} {}", | ||
| status.as_u16(), | ||
| status.canonical_reason().unwrap_or("") | ||
| ) | ||
| .trim() | ||
| .to_owned(); | ||
|
|
||
| headers.insert( | ||
| HEADER_BATCH_OPERATION_STATUS, | ||
| status_str.parse().expect("always a valid header value"), | ||
| ); | ||
| } | ||
|
|
||
| fn create_success_part( | ||
| key: &ObjectKey, | ||
| kind: &str, | ||
| status: StatusCode, | ||
| content_type: Option<HeaderValue>, | ||
| body: Bytes, | ||
| additional_headers: Option<HeaderMap>, | ||
| ) -> Part { | ||
| let mut headers = HeaderMap::new(); | ||
| insert_key_header(&mut headers, key); | ||
| insert_kind_header(&mut headers, kind); | ||
| insert_status_header(&mut headers, status); | ||
| if let Some(additional) = additional_headers { | ||
| headers.extend(additional); | ||
| } | ||
| Part::new(body, headers, content_type) | ||
| } | ||
|
|
||
| fn create_error_part(key: Option<&ObjectKey>, kind: Option<&str>, error: &ApiError) -> Part { | ||
| let mut headers = HeaderMap::new(); | ||
| if let Some(key) = key { | ||
| insert_key_header(&mut headers, key); | ||
| } | ||
| if let Some(kind) = kind { | ||
| insert_kind_header(&mut headers, kind); | ||
| } | ||
| insert_status_header(&mut headers, error.status()); | ||
|
|
||
| let error_body = serde_json::to_vec(&ApiErrorResponse::from_error(error)) | ||
| .inspect_err(|err| { | ||
| tracing::error!( | ||
| error = err as &dyn std::error::Error, | ||
| "error serializing ApiErrorResponse, this should never happen" | ||
| ) | ||
| }) | ||
| .unwrap_or_default(); | ||
| Part::new(Bytes::from(error_body), headers, None) | ||
| } | ||
|
|
||
| impl From<OperationResponse> for Part { | ||
| fn from(value: OperationResponse) -> Self { | ||
| match value { | ||
| OperationResponse::Get(GetResponse { key, result }) => match result { | ||
| Ok(Some((metadata, bytes))) => { | ||
| let mut metadata_headers = match metadata.to_headers("", false) { | ||
| Ok(headers) => headers, | ||
| Err(err) => { | ||
| let err = BatchError::ResponseSerialization { | ||
| context: "serializing object metadata".to_owned(), | ||
| cause: Box::new(err), | ||
| } | ||
| .into(); | ||
| return create_error_part(Some(&key), Some("get"), &err); | ||
| } | ||
| }; | ||
| create_success_part( | ||
| &key, | ||
| "get", | ||
| StatusCode::OK, | ||
| metadata_headers.remove(CONTENT_TYPE), | ||
| bytes, | ||
| Some(metadata_headers), | ||
| ) | ||
| } | ||
| Ok(None) => create_success_part( | ||
| &key, | ||
| "get", | ||
| StatusCode::NOT_FOUND, | ||
| None, | ||
| Bytes::new(), | ||
| None, | ||
| ), | ||
| Err(error) => create_error_part(Some(&key), Some("get"), &error), | ||
| }, | ||
| OperationResponse::Insert(InsertResponse { key, result }) => match result { | ||
| // XXX: this could actually be either StatusCode::OK or StatusCode::CREATED, the service | ||
| // layer doesn't allow us to distinguish between them currently | ||
| Ok(_) => create_success_part( | ||
| &key, | ||
| "insert", | ||
| StatusCode::CREATED, | ||
| None, | ||
| Bytes::new(), | ||
| None, | ||
| ), | ||
| Err(error) => create_error_part(Some(&key), Some("insert"), &error), | ||
| }, | ||
| OperationResponse::Delete(DeleteResponse { key, result }) => match result { | ||
| Ok(_) => create_success_part( | ||
| &key, | ||
| "delete", | ||
| StatusCode::NO_CONTENT, | ||
| None, | ||
| Bytes::new(), | ||
| None, | ||
| ), | ||
| Err(error) => create_error_part(Some(&key), Some("delete"), &error), | ||
| }, | ||
| OperationResponse::Error(ErrorResponse { key, kind, error }) => { | ||
| create_error_part(key.as_ref(), kind, &error) | ||
| } | ||
| } | ||
| } | ||
| } | ||


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should use multer directly to easily set limits on the request and its parts