-
Notifications
You must be signed in to change notification settings - Fork 130
impl(storage): add appendable upload builder and writer #5697
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,60 @@ | ||
| // Copyright 2026 Google LLC | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // https://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| use crate::Result; | ||
| use crate::model::{Object, WriteObjectSpec}; | ||
| use crate::model_ext::OpenAppendableObjectRequest; | ||
| use crate::storage::append_object::writer::AppendableObjectWriter; | ||
| use crate::storage::request_options::RequestOptions; | ||
| use std::sync::Arc; | ||
|
|
||
| /// A builder for configuring and initiating an appendable object upload. | ||
| #[derive(Debug)] | ||
| pub struct OpenAppendableObject<S = crate::storage::transport::Storage> { | ||
| stub: Arc<S>, | ||
| request: OpenAppendableObjectRequest, | ||
| options: RequestOptions, | ||
| } | ||
|
|
||
| impl<S> OpenAppendableObject<S> | ||
| where | ||
| S: crate::storage::stub::Storage + 'static, | ||
| { | ||
| pub(crate) fn new( | ||
| stub: Arc<S>, | ||
| bucket: impl Into<String>, | ||
| object: impl Into<String>, | ||
| options: RequestOptions, | ||
| ) -> Self { | ||
| let resource = Object::new().set_bucket(bucket).set_name(object); | ||
|
|
||
| let spec = WriteObjectSpec::new() | ||
| .set_resource(resource) | ||
| .set_appendable(true); | ||
|
|
||
| Self { | ||
| stub, | ||
| request: OpenAppendableObjectRequest { spec, params: None }, | ||
| options, | ||
| } | ||
| } | ||
|
|
||
| /// Opens the stream to append data. | ||
| pub async fn send(self) -> Result<AppendableObjectWriter> { | ||
| // TODO(#5716) - Add a test that verifies this builder sets up the request correctly and calls send. | ||
| self.stub | ||
| .open_appendable_object(self.request, self.options) | ||
| .await | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,68 @@ | ||
| // Copyright 2026 Google LLC | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // https://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| use crate::Result; | ||
| use crate::model::Object; | ||
| use bytes::Bytes; | ||
|
|
||
| /// A handle to an in-progress appendable object upload. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe you want to add a TODO(#5716) at the top of the file? |
||
| #[derive(Debug)] | ||
| #[non_exhaustive] | ||
| pub struct AppendableObjectWriter { | ||
| // TODO: The request channel, shared response state, and coalescing/replay | ||
| // buffers will be added later. | ||
| } | ||
|
|
||
| impl AppendableObjectWriter { | ||
| /// Appends bytes to the object. | ||
| /// | ||
| /// Small chunks are coalesced before being sent, | ||
| /// so a call does not necessarily produce wire traffic immediately. Appending | ||
| /// an empty buffer is a no-op. | ||
| pub async fn append(&mut self, _bytes: impl Into<Bytes>) -> Result<()> { | ||
| unimplemented!() | ||
| } | ||
|
|
||
| /// Flushes any pending data to the server and awaits durable persistence on | ||
| /// the server. | ||
| pub async fn flush(&mut self) -> Result<()> { | ||
| unimplemented!() | ||
| } | ||
|
|
||
| /// Finalizes the upload, sending any pending data. Returns the final [Object][crate::model::Object]. | ||
| pub async fn finalize(self) -> Result<Object> { | ||
| unimplemented!() | ||
| } | ||
|
|
||
| /// Relinquishes the writer without finalizing, draining any pending data to the server. | ||
| /// Returns the final persisted size of the object. | ||
| pub async fn close(self) -> Result<i64> { | ||
| unimplemented!() | ||
| } | ||
|
|
||
| /// Returns the latest durable offset confirmed by the server. | ||
| pub fn persisted_size(&self) -> i64 { | ||
| unimplemented!() | ||
| } | ||
|
|
||
| /// Returns the generation of the object being appended to. | ||
| pub fn generation(&self) -> i64 { | ||
| unimplemented!() | ||
| } | ||
|
|
||
| /// Returns the latest known object metadata. | ||
| pub fn object(&self) -> Option<Object> { | ||
| unimplemented!() | ||
|
Comment on lines
+34
to
+66
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. While features can be partially implemented if documented, please avoid using References
|
||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,6 +16,8 @@ use super::request_options::RequestOptions; | |
| use crate::builder::storage::ReadObject; | ||
| use crate::builder::storage::WriteObject; | ||
| use crate::read_resume_policy::ReadResumePolicy; | ||
| #[cfg(google_cloud_unstable_storage_bidi)] | ||
| use crate::storage::append_object::builder::OpenAppendableObject; | ||
| use crate::storage::bidi::OpenObject; | ||
| use crate::storage::common_options::CommonOptions; | ||
| use crate::streaming_source::Payload; | ||
|
|
@@ -192,6 +194,42 @@ where | |
| ) | ||
| } | ||
|
|
||
| /// Open a new object for appendable uploads. | ||
| /// | ||
| /// This method allows you to incrementally append data to a single object. | ||
| /// | ||
| /// # Example | ||
| /// ``` | ||
| /// # use google_cloud_storage::client::Storage; | ||
| /// # use bytes::Bytes; | ||
| /// # async fn sample(client: &Storage) -> anyhow::Result<()> { | ||
| /// let mut writer = client | ||
| /// .open_appendable_object("projects/_/buckets/my-bucket", "my-object") | ||
| /// .send() | ||
| /// .await?; | ||
| /// | ||
| /// writer.append(Bytes::from_static(b"hello ")).await?; | ||
| /// writer.append(Bytes::from_static(b"world")).await?; | ||
| /// writer.flush().await?; | ||
| /// | ||
| /// let object = writer.finalize().await?; | ||
|
Comment on lines
+211
to
+215
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As you know, this inverts the control from the existing writes (and yes, Rust is inverted from the other SDKs). You could consider: client.append_to_object("bucket", "object", stream).send().await?where enum {
// Add some data
Data(Bytes),
// Relinquish maybe with some extra bytes.
Close(Option<Data>),
// Ditto but finalize
Finalize(Option<Data>),
}Dunno, may be overly complicated.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the pointer. We'll have a think about it. For now, can we get this merged so that we can continue working on the internal plumbing? We should be able to modify this later as long as it's done before we release the API. |
||
| /// println!("Finalized object size: {:?}", object.size); | ||
| /// # Ok(()) } | ||
| /// ``` | ||
| /// | ||
| /// # Parameters | ||
| /// * `bucket` - the bucket name containing the object. In | ||
| /// `projects/_/buckets/{bucket_id}` format. | ||
| /// * `object` - the object name. | ||
| #[cfg(google_cloud_unstable_storage_bidi)] | ||
| pub fn open_appendable_object( | ||
| &self, | ||
| bucket: impl Into<String>, | ||
| object: impl Into<String>, | ||
| ) -> OpenAppendableObject<S> { | ||
| OpenAppendableObject::new(self.stub.clone(), bucket, object, self.options.clone()) | ||
| } | ||
|
|
||
| /// Reads the contents of an object. | ||
| /// | ||
| /// # Example | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.