Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions objectstore-server/src/auth/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use objectstore_service::service::{DeleteResponse, GetResponse, InsertResponse,
use objectstore_service::{ClientStream, StorageService};
use objectstore_types::auth::Permission;
use objectstore_types::metadata::Metadata;
use objectstore_types::range::ByteRange;

use crate::auth::{AuthContext, AuthError};
use crate::endpoints::common::ApiResult;
Expand Down Expand Up @@ -110,9 +111,13 @@ impl AuthAwareService {
}

/// Auth-aware wrapper around [`StorageService::get_object`].
pub async fn get_object(&self, id: ObjectId) -> ApiResult<GetResponse> {
pub async fn get_object(
&self,
id: ObjectId,
range: Option<ByteRange>,
) -> ApiResult<GetResponse> {
self.assert_authorized(Permission::ObjectRead, id.context())?;
Ok(self.service.get_object(id).await?)
Ok(self.service.get_object(id, range).await?)
}

/// Auth-aware wrapper around [`StorageService::delete_object`].
Expand Down
2 changes: 1 addition & 1 deletion objectstore-server/src/endpoints/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ async fn convert_to_part(
match result {
Ok(OpResponse::Got {
key,
response: Some((metadata, stream)),
response: Some((metadata, _content_range, stream)),
}) => got_to_part(idx, key, metadata, stream, state, context)
.await
.unwrap_or_else(|e| create_error_part(idx, &e)),
Expand Down
3 changes: 3 additions & 0 deletions objectstore-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ impl ApiError {

ApiError::Service(ServiceError::Client(_)) => StatusCode::BAD_REQUEST,
ApiError::Service(ServiceError::Metadata(_)) => StatusCode::BAD_REQUEST,
ApiError::Service(ServiceError::RangeNotSatisfiable { .. }) => {
StatusCode::RANGE_NOT_SATISFIABLE
}
ApiError::Service(ServiceError::InvalidUploadId(_)) => StatusCode::BAD_REQUEST,
ApiError::Service(ServiceError::AtCapacity) => StatusCode::TOO_MANY_REQUESTS,
ApiError::Service(ServiceError::NotImplemented) => StatusCode::NOT_IMPLEMENTED,
Expand Down
59 changes: 55 additions & 4 deletions objectstore-server/src/endpoints/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use axum::{Json, Router};
use objectstore_service::error::Error as ServiceError;
use objectstore_service::id::{ObjectContext, ObjectId};
use objectstore_types::metadata::Metadata;
use objectstore_types::range::{ByteRange, RangeError};
use serde::Serialize;

use crate::auth::AuthAwareService;
Expand Down Expand Up @@ -64,15 +65,65 @@ async fn object_get(
service: AuthAwareService,
State(state): State<ServiceState>,
Xt(id): Xt<ObjectId>,
headers: HeaderMap,
) -> ApiResult<Response> {
let byte_range = match headers.get(http::header::RANGE) {
Some(value) => {
let header_str = value
.to_str()
.map_err(|_| ApiError::Client("invalid Range header".into()))?;
match ByteRange::try_from(header_str) {
Ok(range) => Some(range),
Err(RangeError::InvalidUnit(_) | RangeError::MultiRange) => None,
Err(e) => return Err(ApiError::Client(format!("invalid Range header: {e}"))),
}
}
None => None,
};

let context = id.context().clone();
let Some((metadata, stream)) = service.get_object(id).await? else {
return Ok(StatusCode::NOT_FOUND.into_response());
let result = service.get_object(id, byte_range).await;

let (metadata, content_range, stream) = match result {
Ok(Some(tuple)) => tuple,
Ok(None) => return Ok(StatusCode::NOT_FOUND.into_response()),
Err(ApiError::Service(ServiceError::RangeNotSatisfiable { total })) => {
return Ok((
StatusCode::RANGE_NOT_SATISFIABLE,
[(http::header::CONTENT_RANGE, format!("bytes */{total}"))],
[(http::header::ACCEPT_RANGES, "bytes")],
)
.into_response());
}
Err(e) => return Err(e),
};

let stream = state.meter_stream(stream, &context);
let metadata_headers = metadata.to_headers("").map_err(ServiceError::from)?;

let headers = metadata.to_headers("").map_err(ServiceError::from)?;
Ok((headers, Body::from_stream(stream)).into_response())
let is_partial = !content_range.is_full();
let status = if is_partial {
StatusCode::PARTIAL_CONTENT
} else {
StatusCode::OK
};
let mut response = (status, metadata_headers, Body::from_stream(stream)).into_response();

let headers = response.headers_mut();
headers.insert(
http::header::ACCEPT_RANGES,
http::header::HeaderValue::from_static("bytes"),
);
if is_partial {
// When `is_partial == false`, `CONTENT_LENGTH is already part of `metadata_headers`.
headers.insert(
http::header::CONTENT_LENGTH,
content_range.len_to_header_value(),
);
headers.insert(http::header::CONTENT_RANGE, content_range.to_header_value());
}

Ok(response)
}

async fn object_head(service: AuthAwareService, Xt(id): Xt<ObjectId>) -> ApiResult<Response> {
Expand Down
242 changes: 242 additions & 0 deletions objectstore-server/tests/range_requests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
//! End-to-end tests for HTTP Range request support.

use anyhow::Result;
use objectstore_server::config::{AuthZ, Config};
use objectstore_test::server::TestServer;

async fn setup() -> (TestServer, String) {
let server = TestServer::with_config(Config {
auth: AuthZ {
enforce: false,
..Default::default()
},
..Default::default()
})
.await;

let client = reqwest::Client::new();
let payload = "Hello, Range Requests!"; // 22 bytes

let resp = client
.post(server.url("/v1/objects/test/org=1/"))
.header("content-type", "text/plain")
.body(payload)
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::CREATED);
let body: serde_json::Value = resp.json().await.unwrap();
let key = body["key"].as_str().unwrap().to_string();

(server, key)
}

#[tokio::test]
async fn no_range_returns_200_with_accept_ranges() -> Result<()> {
let (server, key) = setup().await;
let client = reqwest::Client::new();

let resp = client
.get(server.url(&format!("/v1/objects/test/org=1/{key}")))
.send()
.await?;

assert_eq!(resp.status(), reqwest::StatusCode::OK);
assert_eq!(
resp.headers().get("accept-ranges").unwrap().to_str()?,
"bytes"
);
assert!(
resp.headers().get("content-range").is_none(),
"200 response must not include Content-Range"
);

let body = resp.text().await?;
assert_eq!(body, "Hello, Range Requests!");
Ok(())
}

#[tokio::test]
async fn range_prefix_returns_206() -> Result<()> {
let (server, key) = setup().await;
let client = reqwest::Client::new();

let resp = client
.get(server.url(&format!("/v1/objects/test/org=1/{key}")))
.header("range", "bytes=0-4")
.send()
.await?;

assert_eq!(resp.status(), reqwest::StatusCode::PARTIAL_CONTENT);
assert_eq!(
resp.headers().get("accept-ranges").unwrap().to_str()?,
"bytes"
);
assert_eq!(
resp.headers().get("content-range").unwrap().to_str()?,
"bytes 0-4/22"
);
assert_eq!(resp.headers().get("content-length").unwrap().to_str()?, "5");

let body = resp.text().await?;
assert_eq!(body, "Hello");
Ok(())
}

#[tokio::test]
async fn range_suffix_returns_206() -> Result<()> {
let (server, key) = setup().await;
let client = reqwest::Client::new();

let resp = client
.get(server.url(&format!("/v1/objects/test/org=1/{key}")))
.header("range", "bytes=-9")
.send()
.await?;

assert_eq!(resp.status(), reqwest::StatusCode::PARTIAL_CONTENT);
assert_eq!(
resp.headers().get("content-range").unwrap().to_str()?,
"bytes 13-21/22"
);

let body = resp.text().await?;
assert_eq!(body, "Requests!");
Ok(())
}

#[tokio::test]
async fn range_from_offset_returns_206() -> Result<()> {
let (server, key) = setup().await;
let client = reqwest::Client::new();

let resp = client
.get(server.url(&format!("/v1/objects/test/org=1/{key}")))
.header("range", "bytes=7-")
.send()
.await?;

assert_eq!(resp.status(), reqwest::StatusCode::PARTIAL_CONTENT);
assert_eq!(
resp.headers().get("content-range").unwrap().to_str()?,
"bytes 7-21/22"
);
assert_eq!(
resp.headers().get("content-length").unwrap().to_str()?,
"15"
);

let body = resp.text().await?;
assert_eq!(body, "Range Requests!");
Ok(())
}

#[tokio::test]
async fn unknown_range_unit_returns_200_full_body() -> Result<()> {
let (server, key) = setup().await;
let client = reqwest::Client::new();

let resp = client
.get(server.url(&format!("/v1/objects/test/org=1/{key}")))
.header("range", "items=0-10")
.send()
.await?;

assert_eq!(resp.status(), reqwest::StatusCode::OK);
let body = resp.text().await?;
assert_eq!(body, "Hello, Range Requests!");
Ok(())
}

#[tokio::test]
async fn invalid_bytes_range_returns_400() -> Result<()> {
let (server, key) = setup().await;
let client = reqwest::Client::new();

let resp = client
.get(server.url(&format!("/v1/objects/test/org=1/{key}")))
.header("range", "bytes=abc-def")
.send()
.await?;

assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST);
Ok(())
}

#[tokio::test]
async fn multi_range_falls_back_to_full_body() -> Result<()> {
let (server, key) = setup().await;
let client = reqwest::Client::new();

let resp = client
.get(server.url(&format!("/v1/objects/test/org=1/{key}")))
.header("range", "bytes=0-4, 10-14")
.send()
.await?;

assert_eq!(resp.status(), reqwest::StatusCode::OK);
let body = resp.text().await?;
assert_eq!(body, "Hello, Range Requests!");
Ok(())
}

#[tokio::test]
async fn unsatisfiable_range_returns_416() -> Result<()> {
let (server, key) = setup().await;
let client = reqwest::Client::new();

let resp = client
.get(server.url(&format!("/v1/objects/test/org=1/{key}")))
.header("range", "bytes=100-200")
.send()
.await?;

assert_eq!(resp.status(), reqwest::StatusCode::RANGE_NOT_SATISFIABLE);
assert_eq!(
resp.headers().get("content-range").unwrap().to_str()?,
"bytes */22"
);
assert_eq!(
resp.headers().get("accept-ranges").unwrap().to_str()?,
"bytes"
);
Ok(())
}

#[tokio::test]
async fn range_on_nonexistent_object_returns_404() -> Result<()> {
let (server, _key) = setup().await;
let client = reqwest::Client::new();

let resp = client
.get(server.url("/v1/objects/test/org=1/nonexistent"))
.header("range", "bytes=0-10")
.send()
.await?;

assert_eq!(resp.status(), reqwest::StatusCode::NOT_FOUND);
Ok(())
}

#[tokio::test]
async fn full_range_returns_200() -> Result<()> {
let (server, key) = setup().await;
let client = reqwest::Client::new();

// Request the full object as a range — should still get 200 since it's the full content.
let resp = client
.get(server.url(&format!("/v1/objects/test/org=1/{key}")))
.header("range", "bytes=0-21")
.send()
.await?;

assert_eq!(resp.status(), reqwest::StatusCode::OK);
assert!(
resp.headers().get("content-range").is_none(),
"full-object range should be 200 without Content-Range"
);

let body = resp.text().await?;
assert_eq!(body, "Hello, Range Requests!");
Ok(())
}
Loading
Loading