Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions objectstore-server/docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,23 @@ All object operations live under the `/v1/` prefix:
| `DELETE` | `/v1/objects/{usecase}/{scopes}/{key}` | Delete object |
| `POST` | `/v1/objects:batch/{usecase}/{scopes}/` | Batch operations (multipart) |

### Multipart Upload Endpoints

| Method | Path | Description |
|-----------|--------------------------------------------------------------|--------------------------------------|
| `POST` | `/v1/objects:multipart:initiate/{usecase}/{scopes}/` | Initiate upload (server-generated key) |
| `PUT` | `/v1/objects:multipart:initiate/{usecase}/{scopes}/{key}` | Initiate upload (user-provided key) |
| `PUT` | `/v1/objects:multipart:parts/{usecase}/{scopes}/{key}` | Upload a part (`uploadId`, `partNumber` query params) |
| `GET` | `/v1/objects:multipart:parts/{usecase}/{scopes}/{key}` | List uploaded parts (`uploadId` query param) |
| `POST` | `/v1/objects:multipart:complete/{usecase}/{scopes}/{key}` | Complete upload (`uploadId` query param) |
| `DELETE` | `/v1/objects:multipart/{usecase}/{scopes}/{key}` | Abort upload (`uploadId` query param) |

The complete endpoint returns `200 OK` immediately, with a streaming body that
will contain the error (if any) as JSON. Whitespace is sent in the streaming body
to keep the connection open.
Clients must parse the body to determine the actual outcome, and not rely on the
status code.

Scopes are encoded in the URL path using Matrix URI syntax:
`org=123;project=456`. An underscore (`_`) represents empty scopes.

Expand Down
79 changes: 79 additions & 0 deletions objectstore-server/src/auth/service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use objectstore_service::id::{ObjectContext, ObjectId};
use objectstore_service::multipart::{
AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
ListPartsResponse, PartNumber, UploadId, UploadPartResponse,
};
use objectstore_service::service::{DeleteResponse, GetResponse, InsertResponse, MetadataResponse};
use objectstore_service::{ClientStream, StorageService};
use objectstore_types::auth::Permission;
Expand Down Expand Up @@ -116,4 +120,79 @@ impl AuthAwareService {
self.assert_authorized(Permission::ObjectDelete, id.context())?;
Ok(self.service.delete_object(id).await?)
}

// --- Multipart upload operations ---

/// Auth-aware wrapper around [`StorageService::initiate_multipart`].
pub async fn initiate_multipart(
&self,
id: ObjectId,
metadata: Metadata,
) -> ApiResult<InitiateMultipartResponse> {
self.assert_authorized(Permission::ObjectWrite, id.context())?;
Ok(self.service.initiate_multipart(id, metadata).await?)
}

/// Auth-aware wrapper around [`StorageService::upload_part`].
pub async fn upload_part(
&self,
id: ObjectId,
upload_id: UploadId,
part_number: PartNumber,
content_length: u64,
content_md5: Option<String>,
body: ClientStream,
) -> ApiResult<UploadPartResponse> {
self.assert_authorized(Permission::ObjectWrite, id.context())?;
Ok(self
.service
.upload_part(
id,
upload_id,
part_number,
content_length,
content_md5,
body,
)
.await?)
}

/// Auth-aware wrapper around [`StorageService::list_parts`].
pub async fn list_parts(
&self,
id: ObjectId,
upload_id: UploadId,
max_parts: Option<u32>,
part_number_marker: Option<PartNumber>,
) -> ApiResult<ListPartsResponse> {
self.assert_authorized(Permission::ObjectWrite, id.context())?;
Ok(self
.service
.list_parts(id, upload_id, max_parts, part_number_marker)
.await?)
}

/// Auth-aware wrapper around [`StorageService::abort_multipart`].
pub async fn abort_multipart(
&self,
id: ObjectId,
upload_id: UploadId,
) -> ApiResult<AbortMultipartResponse> {
self.assert_authorized(Permission::ObjectWrite, id.context())?;
Ok(self.service.abort_multipart(id, upload_id).await?)
}

/// Auth-aware wrapper around [`StorageService::complete_multipart`].
pub async fn complete_multipart(
&self,
id: ObjectId,
upload_id: UploadId,
parts: Vec<CompletedPart>,
) -> ApiResult<CompleteMultipartResponse> {
self.assert_authorized(Permission::ObjectWrite, id.context())?;
Ok(self
.service
.complete_multipart(id, upload_id, parts)
.await?)
}
}
10 changes: 10 additions & 0 deletions objectstore-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ pub enum ApiError {
/// Errors encountered when parsing or executing a batch request.
#[error("batch error: {0}")]
Batch(#[from] BatchError),

/// Internal server errors.
#[error("internal error: {0}")]
Internal(String),
}

/// Result type for API operations.
Expand Down Expand Up @@ -90,10 +94,16 @@ impl ApiError {
ApiError::Service(ServiceError::Client(_)) => StatusCode::BAD_REQUEST,
ApiError::Service(ServiceError::Metadata(_)) => StatusCode::BAD_REQUEST,
ApiError::Service(ServiceError::AtCapacity) => StatusCode::TOO_MANY_REQUESTS,
ApiError::Service(ServiceError::NotImplemented) => StatusCode::NOT_IMPLEMENTED,
ApiError::Service(_) => {
objectstore_log::error!(!!self, "error handling request");
StatusCode::INTERNAL_SERVER_ERROR
}

ApiError::Internal(_) => {
objectstore_log::error!(!!self, "internal error");
Comment thread
lcian marked this conversation as resolved.
StatusCode::INTERNAL_SERVER_ERROR
}
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion objectstore-server/src/endpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod batch;
pub mod common;
pub mod health;
mod keda;
mod multipart;
mod objects;

/// Returns `true` for internal endpoints that are exempt from metrics and concurrency limits.
Expand All @@ -24,7 +25,8 @@ pub fn is_internal_route(route: &str) -> bool {
pub fn routes() -> Router<ServiceState> {
let routes_v1 = Router::new()
.merge(objects::router())
.merge(batch::router());
.merge(batch::router())
.merge(multipart::router());

Router::new()
.merge(health::router())
Expand Down
Loading
Loading