Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
c38d880
wip
lcian May 8, 2026
393ea96
wip
lcian May 8, 2026
8d0a49d
wip
lcian May 8, 2026
c20008f
wip
lcian May 8, 2026
5928985
wip
lcian May 8, 2026
5c12e5e
wip
lcian May 8, 2026
b418b62
wip
lcian May 8, 2026
8b43e4a
wip
lcian May 11, 2026
2ab6b42
revert unrelated changes
lcian May 11, 2026
8f51b3f
fix(test): set compression(None) in full_upload_flow multipart test
lcian May 11, 2026
0571711
fix(test): pre-compress parts in full_upload_flow multipart test
lcian May 11, 2026
5b7e1a0
revert unrelated changes
lcian May 11, 2026
b5ee8fc
revert unrelated changes
lcian May 11, 2026
7ed2130
improve
lcian May 11, 2026
adb0317
improve
lcian May 11, 2026
0452d6e
improve
lcian May 11, 2026
59b2dec
improve
lcian May 11, 2026
697d21e
fix: correct broken intra-doc link in multipart.rs
lcian May 11, 2026
2d1828e
feat(rust-client): Make Usecase compression optional
lcian May 11, 2026
7e5fdf6
improve
lcian May 11, 2026
d0518be
fix(rust-client): don't zstd-compress parts in uncompressed multipart…
lcian May 11, 2026
cd8ba49
improve
lcian May 11, 2026
9b399c1
improve
lcian May 11, 2026
d86cecb
improve
lcian May 11, 2026
b1a695f
improve
lcian May 11, 2026
d84038f
improve
lcian May 13, 2026
a2515a6
fix
lcian May 13, 2026
b1e6a42
add comment on decoder.multiple_members(true)
lcian May 27, 2026
175e3fc
revert spurious pyproject.toml and uv.lock changes
lcian May 29, 2026
db3745f
ref: move UploadId and PartNumber to objectstore-types
lcian May 29, 2026
8b8396e
ref(rust-client): accept u32/String at the public API boundary
lcian May 29, 2026
59da6dd
ref(rust-client): use specific error variants for validation failures
lcian May 29, 2026
bb111e4
fix(server): map InvalidUploadId to 400 Bad Request
lcian May 29, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions clients/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ publish = true

[dependencies]
async-compression = { version = "0.4.27", features = ["tokio", "zstd"] }
base64 = { version = "0.22.1", optional = true }
percent-encoding = { workspace = true }
bytes = { workspace = true }
futures-util = { workspace = true }
Expand Down Expand Up @@ -39,6 +40,13 @@ zstd = "0.13.3"

[features]
default = ["native-tls", "hickory-dns"]
multipart = ["dep:base64"]

rustls = ["reqwest/rustls"]
native-tls = ["reqwest/native-tls"]
hickory-dns = ["reqwest/hickory-dns"]

[[test]]
name = "multipart"
path = "tests/multipart.rs"
required-features = ["multipart"]
83 changes: 83 additions & 0 deletions clients/rust/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,89 @@ session.put("payload")
.send().await?;
```

### Multipart Upload API

For large objects, use multipart uploads to upload parts concurrently with bounded
parallelism.

**Important:** unlike single-object uploads, multipart uploads do **not** auto-compress.
The caller must pre-compress each part according to the compression set as part of the metadata
when initiating the upload.

```rust,ignore
use futures_util::StreamExt as _;
use futures_util::stream;
use objectstore_client::Compression;

let upload = session
.initiate_multipart_upload()
.key("my-large-object")
.compression(Compression::Zstd)
.send()
.await?;

let parts: Vec<(Vec<u8>, u32)> = vec![
(zstd::encode_all(&part1_data[..], 0)?, 1),
(zstd::encode_all(&part2_data[..], 0)?, 2),
];

let results: Vec<_> = stream::iter(
parts
.into_iter()
.map(|(data, part_number)| upload.put(data, part_number, None)),
)
.buffer_unordered(8)
.collect()
.await;

let mut done = Vec::new();
let mut errors = Vec::new();
for result in results {
match result {
Ok(part) => done.push(part),
Err(e) => errors.push(e),
}
}

if !errors.is_empty() {
// reupload failed parts...
}

let key = upload.complete(done).await?;
// or
upload.abort().await?;
```

You can also resume an in-progress multipart upload, e.g. after a process restart.

```rust,ignore
use futures_util::{StreamExt as _, TryStreamExt as _};
use futures_util::stream;
use objectstore_client::CompletePart;

let upload = session.resume_multipart_upload("my-large-object", saved_upload_id)?;

let existing = upload.list_parts().await?;
let total_parts = 10;
let uploaded: Vec<u32> = existing.iter().map(|p| p.part_number.get()).collect();
let missing: Vec<u32> = (1..=total_parts)
.filter(|n| !uploaded.contains(n))
.collect();

let mut done: Vec<_> = stream::iter(
missing
.into_iter()
.map(|part_number| upload.put(get_part_data(part_number), part_number, None)),
)
.buffer_unordered(8)
.try_collect()
.await?;

done.extend(existing.into_iter().map(CompletePart::from));

let key = upload.complete(done).await?;
```

### Many API

The Many API allows you to enqueue multiple requests that the client can execute using Objectstore's batch endpoint, minimizing network overhead.
Expand Down
60 changes: 54 additions & 6 deletions clients/rust/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl ClientBuilder {
#[derive(Debug, Clone)]
pub struct Usecase {
name: Arc<str>,
compression: Compression,
compression: Option<Compression>,
expiration_policy: ExpirationPolicy,
}

Expand All @@ -160,7 +160,7 @@ impl Usecase {
pub fn new(name: &str) -> Self {
Self {
name: name.into(),
compression: Compression::Zstd,
compression: Some(Compression::Zstd),
expiration_policy: Default::default(),
}
}
Expand All @@ -173,18 +173,18 @@ impl Usecase {

/// Returns the compression algorithm to use for operations within this usecase.
#[inline]
pub fn compression(&self) -> Compression {
pub fn compression(&self) -> Option<Compression> {
self.compression
}

/// Sets the compression algorithm to use for operations within this usecase.
///
/// It's still possible to override this default on each operation's builder.
///
/// By default, [`Compression::Zstd`] is used.
pub fn with_compression(self, compression: Compression) -> Self {
/// By default, [`Compression::Zstd`] is used. Pass [`None`] to disable compression.
pub fn with_compression(self, compression: impl Into<Option<Compression>>) -> Self {
Self {
compression,
compression: compression.into(),
..self
}
}
Expand Down Expand Up @@ -464,6 +464,41 @@ impl Session {
url
}

#[cfg(feature = "multipart")]
fn multipart_url(
&self,
suffix: Option<&'static str>,
object_key: Option<&str>,
query_pairs: Option<Vec<(&str, String)>>,
) -> Url {
let mut url = self.client.service_url.clone();

// `path_segments_mut` can only error if the url is cannot-be-a-base,
// and we check that in `ClientBuilder::new`, therefore this will never panic.
let mut segments = url.path_segments_mut().unwrap();
segments
.push("v1")
.push(match suffix {
Some("parts") => "objects:multipart:parts",
Some("complete") => "objects:multipart:complete",
_ => "objects:multipart",
})
.push(&self.scope.usecase.name)
.push(&self.scope.scopes.as_api_path().to_string());
if let Some(object_key) = object_key.filter(|key| !key.is_empty()) {
segments.extend(object_key.split("/"));
}
drop(segments);
if let Some(query_pairs) = query_pairs {
let mut pairs = url.query_pairs_mut();
for (key, value) in query_pairs {
pairs.append_pair(key, &value);
}
}
Comment thread
lcian marked this conversation as resolved.

url
}

fn prepare_builder(&self, mut builder: RequestBuilder) -> crate::Result<RequestBuilder> {
if let Some(token) = self.mint_token()? {
builder = builder.header("x-os-auth", format!("Bearer {token}"));
Expand Down Expand Up @@ -493,6 +528,19 @@ impl Session {
let builder = self.client.reqwest.post(url);
self.prepare_builder(builder)
}

#[cfg(feature = "multipart")]
pub(crate) fn multipart_request(
&self,
method: reqwest::Method,
action: Option<&'static str>,
object_key: Option<&str>,
query_pairs: Option<Vec<(&str, String)>>,
) -> crate::Result<RequestBuilder> {
let url = self.multipart_url(action, object_key, query_pairs);
let builder = self.client.reqwest.request(method, url);
self.prepare_builder(builder)
}
}

#[cfg(test)]
Expand Down
14 changes: 14 additions & 0 deletions clients/rust/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,20 @@ pub enum Error {
/// The error message.
message: String,
},
/// Error when part number validation fails (must be >= 1).
#[error("invalid part number: {0}")]
InvalidPartNumber(u32),
/// Error when upload ID validation fails.
#[error(transparent)]
InvalidUploadId(#[from] objectstore_types::multipart::InvalidUploadId),
/// Error returned when attempting to complete a multipart upload.
#[error("multipart complete failed ({code}): {message}")]
MultipartComplete {
/// The error code or kind.
code: String,
/// The error message.
message: String,
},
}

/// A convenience alias that defaults our [`Error`] type.
Expand Down
25 changes: 24 additions & 1 deletion clients/rust/src/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,12 @@ pub(crate) fn maybe_decompress(
match (metadata.compression, decompress && !encoding_accepted) {
(Some(Compression::Zstd), true) => {
metadata.compression = None;
ReaderStream::new(ZstdDecoder::new(StreamReader::new(stream))).boxed()
let mut decoder = ZstdDecoder::new(StreamReader::new(stream));
// Multipart uploads with compression, when each part is compressed individually,
// will consist of multiple concatenated zstd frames.
// This allows the client to handle automatic decompression for these objects transparently.
decoder.multiple_members(true);
Comment thread
lcian marked this conversation as resolved.
ReaderStream::new(decoder).boxed()
}
_ => stream,
}
Expand Down Expand Up @@ -230,4 +235,22 @@ mod tests {
assert_eq!(collect(out).await, payload);
assert_eq!(metadata.compression, None);
}

#[tokio::test]
async fn zstd_concatenated_frames_decompress() {
let payload1 = b"hello ";
let payload2 = b"world";
let compressed1 = collect(compressed_zstd_stream(payload1)).await;
let compressed2 = collect(compressed_zstd_stream(payload2)).await;
let stream = futures_util::stream::iter([
Ok::<_, std::io::Error>(bytes::Bytes::from(compressed1)),
Ok::<_, std::io::Error>(bytes::Bytes::from(compressed2)),
])
.boxed();

let mut metadata = zstd_metadata();
let out = maybe_decompress(stream, &mut metadata, true, &[]);
assert_eq!(collect(out).await, b"hello world");
assert_eq!(metadata.compression, None);
}
}
4 changes: 4 additions & 0 deletions clients/rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ mod get;
mod head;
mod key;
mod many;
#[cfg(feature = "multipart")]
mod multipart;
mod put;
pub mod utils;

Expand All @@ -23,4 +25,6 @@ pub use get::*;
pub use head::*;
pub use key::*;
pub use many::*;
#[cfg(feature = "multipart")]
pub use multipart::*;
pub use put::*;
Loading
Loading