Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion src/storage/src/storage/perform_upload/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::error::WriteError;
use crate::storage::checksum::details::{
Checksum, update as checksum_update, validate as checksum_validate,
};
use base64::Engine;
use gaxi::attempt_info::AttemptInfo;
use gaxi::http::HttpRequestBuilder;
use gaxi::http::reqwest::{HeaderValue, Method};
Expand Down Expand Up @@ -84,22 +85,29 @@ where
url: &mut Option<String>,
attempt_count: u32,
) -> Result<Object> {
let is_resume = url.is_some();
let upload_url = if let Some(u) = url.as_deref() {
u
} else {
let u = self.start_resumable_upload_attempt(attempt_count).await?;
url.insert(u).as_str()
};

let mut is_partial_resume = false;
if progress.needs_query() {
match self.query_resumable_upload_attempt(upload_url, 0).await? {
ResumableUploadStatus::Finalized(object) => return Ok(*object),
ResumableUploadStatus::Partial(persisted_size) => {
if persisted_size > 0 {
is_partial_resume = true;
}
progress.handle_partial(persisted_size)?;
}
};
}

let should_send_checksum = !is_resume && !is_partial_resume;

loop {
progress
.next_buffer(&mut *self.payload.lock().await)
Expand All @@ -112,7 +120,9 @@ where
"//storage.googleapis.com/{}",
self.resource().bucket
)));
let builder = self.partial_upload_request(upload_url, progress).await?;
let builder = self
.partial_upload_request(upload_url, progress, should_send_checksum)
.await?;
// TODO(#4862) - maybe this should also use attempt_count ?
let response = builder.send(options, AttemptInfo::new(0)).await?;
match super::query_resumable_upload_handle_response(response).await {
Expand All @@ -134,6 +144,7 @@ where
&self,
upload_url: &str,
progress: &mut InProgressUpload,
should_send_checksum: bool,
) -> Result<HttpRequestBuilder> {
let range = progress.range_header();
let builder = self
Expand All @@ -148,6 +159,17 @@ where
);

let builder = apply_customer_supplied_encryption_headers(builder, &self.params);

let mut builder = builder;
if should_send_checksum && progress.is_last_chunk() {
let computed = self.payload.lock().await.final_checksum();
if let Some(crc32c) = computed.crc32c {
let bytes = crc32c.to_be_bytes();
let encoded = base64::prelude::BASE64_STANDARD.encode(bytes);
builder = builder.header("x-goog-hash", format!("crc32c={encoded}"));
}
}

Ok(builder.body(progress.put_body()))
}

Expand Down
13 changes: 12 additions & 1 deletion src/storage/src/storage/perform_upload/buffered/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ pub struct InProgressUpload {
///
/// When getting data from the source stream we may retrieve more data.
remainder: VecDeque<bytes::Bytes>,
/// Indicates if the source stream has ended.
source_ended: bool,
}

struct Summary<'a>(&'a VecDeque<bytes::Bytes>);
Expand Down Expand Up @@ -102,6 +104,14 @@ impl InProgressUpload {
self.persisted_size.is_none_or(|x| x != self.offset)
}

pub fn is_last_chunk(&self) -> bool {
self.source_ended
|| self
.hint
.exact()
.is_some_and(|len| self.offset + self.buffer_size as u64 == len)
}

pub async fn next_buffer<S>(&mut self, payload: &mut S) -> Result<()>
where
S: StreamingSource,
Expand Down Expand Up @@ -146,6 +156,7 @@ impl InProgressUpload {
}
self.buffer = buffer;
self.buffer_size = size;
self.source_ended = true;
Ok(())
}

Expand All @@ -154,7 +165,7 @@ impl InProgressUpload {
(0, 0, Some(len)) => format!("bytes */{len}"),
(n, o, Some(len)) => format!("bytes {o}-{}/{len}", o + n - 1),
(0, o, None) => format!("bytes */{o}"),
(n, o, None) if n < self.target_size as u64 => {
(n, o, None) if n < self.target_size as u64 || self.source_ended => {
format!("bytes {o}-{}/{}", o + n - 1, o + n)
}
(n, o, _) => format!("bytes {o}-{}/*", o + n - 1),
Expand Down
145 changes: 145 additions & 0 deletions src/storage/src/storage/perform_upload/tests/checksums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ mod buffered_single_shot {

mod buffered_resumable {
use super::*;
use base64::Engine;

fn prepare_server(body: Value) -> Server {
super::resumable_server(body)
Expand Down Expand Up @@ -126,6 +127,55 @@ mod buffered_resumable {
Ok(())
}

#[tokio::test]
async fn computed_match_sends_checksum() -> Result {
let server = Server::run();
let session = server.url("/upload/session/test-only-001");
let path = session.path().to_string();
server.expect(
Expectation::matching(all_of![
request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
request::query(url_decoded(contains(("name", "test-object")))),
request::query(url_decoded(contains(("uploadType", "resumable")))),
])
.respond_with(status_code(200).append_header("location", session.to_string())),
);
let len = VEXING.len();
let crc32c = vexing_crc32c();
let bytes = crc32c.to_be_bytes();
let encoded = base64::prelude::BASE64_STANDARD.encode(bytes);

server.expect(
Expectation::matching(all_of![
request::method_path("PUT", path.clone()),
request::headers(contains((
"content-range",
format!("bytes 0-{}/{len}", len - 1)
))),
request::headers(contains(("x-goog-hash", format!("crc32c={encoded}"))))
])
.respond_with(
status_code(200)
.append_header("content-type", "application/json")
.body(good_checksums_body().to_string()),
),
);

let client = test_builder()
.with_endpoint(format!("http://{}", server.addr()))
.with_resumable_upload_threshold(0_usize)
.build()
.await?;

let object = client
.write_object("projects/_/buckets/test-bucket", "test-object", VEXING)
.send_buffered()
.await?;

assert_eq!(object.name, "test-object");
Ok(())
}

#[tokio::test]
async fn precomputed_mismatch() -> Result {
let server = prepare_server(bad_checksums_body());
Expand Down Expand Up @@ -157,6 +207,101 @@ mod buffered_resumable {
assert_eq!(object.name, "test-object");
Ok(())
}

#[tokio::test]
async fn resumed_computed_match_does_not_send_checksum() -> Result {
use crate::streaming_source::tests::UnknownSize;

const QUANTUM: usize = 256 * 1024;
let mut data = vec![0_u8; QUANTUM];
data.extend_from_slice(VEXING.as_bytes());
let full_len = data.len();

let crc = crc32c::crc32c(&data);
let crc_base64 = base64::prelude::BASE64_STANDARD.encode(crc.to_be_bytes());
let md5 = md5::compute(&data);
let md5_base64 = base64::prelude::BASE64_STANDARD.encode(md5.0);

let server = Server::run();
let session = server.url("/upload/session/test-only-001");
let path = session.path().to_string();

server.expect(
Expectation::matching(all_of![
request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
request::query(url_decoded(contains(("name", "test-object")))),
request::query(url_decoded(contains(("uploadType", "resumable")))),
])
.respond_with(status_code(200).append_header("location", session.to_string())),
);

// 1. First PUT fails with 429
server.expect(
Expectation::matching(all_of![
request::method_path("PUT", path.clone()),
request::headers(contains((
"content-range",
format!("bytes 0-{}/*", QUANTUM - 1)
)))
])
.respond_with(status_code(429).body("try-again")),
);

// 2. Query returns that the first chunk was persisted
server.expect(
Expectation::matching(all_of![
request::method_path("PUT", path.clone()),
request::headers(contains(("content-range", "bytes */*"))),
request::headers(contains(("content-length", "0"))),
])
.respond_with(
status_code(308).append_header("range", format!("bytes=0-{}", QUANTUM - 1)),
),
);

// 3. Second PUT (last chunk) should NOT contain x-goog-hash
server.expect(
Expectation::matching(all_of![
request::method_path("PUT", path.clone()),
request::headers(contains((
"content-range",
format!("bytes {}-{}/{}", QUANTUM, full_len - 1, full_len)
))),
not(request::headers(contains(key("x-goog-hash"))))
])
.respond_with(
status_code(200)
.append_header("content-type", "application/json")
.body(
json!({
"bucket": "projects/_/buckets/test-bucket",
"name": "test-object",
"crc32c": crc_base64,
"md5Hash": md5_base64,
"size": full_len,
})
.to_string(),
),
),
);

let client = test_builder()
.with_endpoint(format!("http://{}", server.addr()))
.with_resumable_upload_threshold(0_usize)
.with_resumable_upload_buffer_size(QUANTUM)
.build()
.await?;

let payload = UnknownSize::new(BytesSource::new(bytes::Bytes::from(data)));

let object = client
.write_object("projects/_/buckets/test-bucket", "test-object", payload)
.send_buffered()
.await?;

assert_eq!(object.name, "test-object");
Ok(())
}
}

mod unbuffered_single_shot {
Expand Down
Loading