Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions core/core/src/layers/correctness_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,38 @@ impl<A: Access> LayeredAccess for CorrectnessAccessor<A> {
"if_match",
));
}
if args.source_if_match().is_some() && !capability.copy_with_source_if_match {
return Err(new_unsupported_error(
&self.info,
Operation::Copy,
"source_if_match",
));
}
if args.source_if_none_match().is_some() && !capability.copy_with_source_if_none_match {
return Err(new_unsupported_error(
&self.info,
Operation::Copy,
"source_if_none_match",
));
}
if args.source_if_modified_since().is_some()
&& !capability.copy_with_source_if_modified_since
{
return Err(new_unsupported_error(
&self.info,
Operation::Copy,
"source_if_modified_since",
));
}
if args.source_if_unmodified_since().is_some()
&& !capability.copy_with_source_if_unmodified_since
{
return Err(new_unsupported_error(
&self.info,
Operation::Copy,
"source_if_unmodified_since",
));
}

self.inner.copy(from, to, args, opts).await
}
Expand Down
64 changes: 64 additions & 0 deletions core/core/src/raw/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,10 @@ impl From<options::WriteOptions> for (OpWrite, OpWriter) {
pub struct OpCopy {
if_not_exists: bool,
if_match: Option<String>,
source_if_match: Option<String>,
source_if_none_match: Option<String>,
source_if_modified_since: Option<Timestamp>,
source_if_unmodified_since: Option<Timestamp>,
}

impl OpCopy {
Expand Down Expand Up @@ -930,6 +934,62 @@ impl OpCopy {
pub fn if_match(&self) -> Option<&str> {
self.if_match.as_deref()
}

/// Set the source_if_match condition for the operation.
///
/// When set, the copy operation will only proceed if the source object's
/// ETag matches the given value.
pub fn with_source_if_match(mut self, source_if_match: impl Into<String>) -> Self {
self.source_if_match = Some(source_if_match.into());
self
}

/// Get source_if_match condition.
pub fn source_if_match(&self) -> Option<&str> {
self.source_if_match.as_deref()
}

/// Set the source_if_none_match condition for the operation.
///
/// When set, the copy operation will only proceed if the source object's
/// ETag does not match the given value.
pub fn with_source_if_none_match(mut self, source_if_none_match: impl Into<String>) -> Self {
self.source_if_none_match = Some(source_if_none_match.into());
self
}

/// Get source_if_none_match condition.
pub fn source_if_none_match(&self) -> Option<&str> {
self.source_if_none_match.as_deref()
}

/// Set the source_if_modified_since condition for the operation.
///
/// When set, the copy operation will only proceed if the source object has
/// been modified after the given timestamp.
pub fn with_source_if_modified_since(mut self, v: Timestamp) -> Self {
self.source_if_modified_since = Some(v);
self
}

/// Get source_if_modified_since condition.
pub fn source_if_modified_since(&self) -> Option<Timestamp> {
self.source_if_modified_since
}

/// Set the source_if_unmodified_since condition for the operation.
///
/// When set, the copy operation will only proceed if the source object has
/// not been modified after the given timestamp.
pub fn with_source_if_unmodified_since(mut self, v: Timestamp) -> Self {
self.source_if_unmodified_since = Some(v);
self
}

/// Get source_if_unmodified_since condition.
pub fn source_if_unmodified_since(&self) -> Option<Timestamp> {
self.source_if_unmodified_since
}
}

/// Args for `copier` operation.
Expand Down Expand Up @@ -986,6 +1046,10 @@ impl From<options::CopyOptions> for (OpCopy, OpCopier) {
OpCopy {
if_not_exists: value.if_not_exists,
if_match: value.if_match,
source_if_match: value.source_if_match,
source_if_none_match: value.source_if_none_match,
source_if_modified_since: value.source_if_modified_since,
source_if_unmodified_since: value.source_if_unmodified_since,
},
OpCopier {
concurrent: value.concurrent.max(1),
Expand Down
8 changes: 8 additions & 0 deletions core/core/src/types/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,14 @@ pub struct Capability {
pub copy_with_if_not_exists: bool,
/// Indicates if conditional copy operations with if-match are supported.
pub copy_with_if_match: bool,
/// Indicates if conditional copy operations with source-side if-match are supported.
pub copy_with_source_if_match: bool,
/// Indicates if conditional copy operations with source-side if-none-match are supported.
pub copy_with_source_if_none_match: bool,
/// Indicates if conditional copy operations with source-side if-modified-since are supported.
pub copy_with_source_if_modified_since: bool,
/// Indicates if conditional copy operations with source-side if-unmodified-since are supported.
pub copy_with_source_if_unmodified_since: bool,
/// Indicates if copy operations can be split into multiple server-side tasks.
pub copy_can_multi: bool,
/// Maximum size supported for segmented copy tasks.
Expand Down
36 changes: 36 additions & 0 deletions core/core/src/types/operator/operator_futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1437,6 +1437,42 @@ impl<F: Future<Output = Result<Metadata>>> FutureCopy<F> {
self
}

/// Sets the condition that copy operation will succeed only if the source
/// object currently has the given ETag.
///
/// Refer to [`options::CopyOptions::source_if_match`] for more details.
pub fn source_if_match(mut self, etag: &str) -> Self {
self.args.0.source_if_match = Some(etag.to_string());
self
}

/// Sets the condition that copy operation will succeed only if the source
/// object's ETag does not match the given value.
///
/// Refer to [`options::CopyOptions::source_if_none_match`] for more details.
pub fn source_if_none_match(mut self, etag: &str) -> Self {
self.args.0.source_if_none_match = Some(etag.to_string());
self
}

/// Sets the condition that copy operation will succeed only if the source
/// object has been modified after the given timestamp.
///
/// Refer to [`options::CopyOptions::source_if_modified_since`] for more details.
pub fn source_if_modified_since(mut self, v: Timestamp) -> Self {
self.args.0.source_if_modified_since = Some(v);
self
}

/// Sets the condition that copy operation will succeed only if the source
/// object has not been modified after the given timestamp.
///
/// Refer to [`options::CopyOptions::source_if_unmodified_since`] for more details.
pub fn source_if_unmodified_since(mut self, v: Timestamp) -> Self {
self.args.0.source_if_unmodified_since = Some(v);
self
}

/// Sets concurrent copy operations for this copy.
///
/// Refer to [`options::CopyOptions::concurrent`] for more details.
Expand Down
55 changes: 55 additions & 0 deletions core/core/src/types/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,61 @@ pub struct CopyOptions {
/// destination object's ETag matches the given value.
pub if_match: Option<String>,

/// Sets the condition that copy operation will succeed only if the source
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

S3 copy supports multiple conditions, do we want to support them all, so we could declare we support S3 condition copy with source check?

x-amz-copy-source: CopySource
x-amz-copy-source-if-match: CopySourceIfMatch
x-amz-copy-source-if-modified-since: CopySourceIfModifiedSince
x-amz-copy-source-if-none-match: CopySourceIfNoneMatch
x-amz-copy-source-if-unmodified-since: CopySourceIfUnmodifiedSince

ref: https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html#API_CopyObject_RequestSyntax

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it, makes sense, I will extend this PR to support all of them

/// object currently has the given ETag.
///
/// ### Capability
///
/// Check [`Capability::copy_with_source_if_match`] before using this feature.
///
/// ### Behavior
///
/// - If supported, the copy operation will only succeed when the source
/// object's ETag matches the given value.
pub source_if_match: Option<String>,

/// Sets the condition that copy operation will succeed only if the source
/// object's ETag does not match the given value.
///
/// ### Capability
///
/// Check [`Capability::copy_with_source_if_none_match`] before using this
/// feature.
///
/// ### Behavior
///
/// - If supported, the copy operation will only succeed when the source
/// object's ETag does not match the given value.
pub source_if_none_match: Option<String>,

/// Sets the condition that copy operation will succeed only if the source
/// object has been modified after the given timestamp.
///
/// ### Capability
///
/// Check [`Capability::copy_with_source_if_modified_since`] before using
/// this feature.
///
/// ### Behavior
///
/// - If supported, the copy operation will only succeed when the source
/// object has been modified after the given timestamp.
pub source_if_modified_since: Option<Timestamp>,

/// Sets the condition that copy operation will succeed only if the source
/// object has not been modified after the given timestamp.
///
/// ### Capability
///
/// Check [`Capability::copy_with_source_if_unmodified_since`] before using
/// this feature.
///
/// ### Behavior
///
/// - If supported, the copy operation will only succeed when the source
/// object has not been modified after the given timestamp.
pub source_if_unmodified_since: Option<Timestamp>,

/// Known content length of the source object.
///
/// This is an execution hint that allows OpenDAL to avoid extra metadata
Expand Down
32 changes: 32 additions & 0 deletions core/layers/capability-check/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,38 @@ impl<A: Access> LayeredAccess for CapabilityAccessor<A> {
"if_not_exists",
));
}
if args.source_if_match().is_some() && !capability.copy_with_source_if_match {
return Err(new_unsupported_error(
self.info.as_ref(),
Operation::Copy,
"source_if_match",
));
}
if args.source_if_none_match().is_some() && !capability.copy_with_source_if_none_match {
return Err(new_unsupported_error(
self.info.as_ref(),
Operation::Copy,
"source_if_none_match",
));
}
if args.source_if_modified_since().is_some()
&& !capability.copy_with_source_if_modified_since
{
return Err(new_unsupported_error(
self.info.as_ref(),
Operation::Copy,
"source_if_modified_since",
));
}
if args.source_if_unmodified_since().is_some()
&& !capability.copy_with_source_if_unmodified_since
{
return Err(new_unsupported_error(
self.info.as_ref(),
Operation::Copy,
"source_if_unmodified_since",
));
}

self.inner.copy(from, to, args, opts).await
}
Expand Down
4 changes: 4 additions & 0 deletions core/services/s3/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,10 @@ impl Builder for S3Builder {
copy_can_multi: true,
copy_with_if_not_exists: true,
copy_with_if_match: true,
copy_with_source_if_match: true,
copy_with_source_if_none_match: true,
copy_with_source_if_modified_since: true,
copy_with_source_if_unmodified_since: true,
// The min multipart size of S3 is 5 MiB.
//
// ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
Expand Down
46 changes: 46 additions & 0 deletions core/services/s3/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ use opendal_core::*;
pub mod constants {
pub const X_AMZ_COPY_SOURCE: &str = "x-amz-copy-source";
pub const X_AMZ_COPY_SOURCE_RANGE: &str = "x-amz-copy-source-range";
pub const X_AMZ_COPY_SOURCE_IF_MATCH: &str = "x-amz-copy-source-if-match";
pub const X_AMZ_COPY_SOURCE_IF_NONE_MATCH: &str = "x-amz-copy-source-if-none-match";
pub const X_AMZ_COPY_SOURCE_IF_MODIFIED_SINCE: &str = "x-amz-copy-source-if-modified-since";
pub const X_AMZ_COPY_SOURCE_IF_UNMODIFIED_SINCE: &str = "x-amz-copy-source-if-unmodified-since";

pub const X_AMZ_SERVER_SIDE_ENCRYPTION: &str = "x-amz-server-side-encryption";
pub const X_AMZ_SERVER_REQUEST_PAYER: (&str, &str) = ("x-amz-request-payer", "requester");
Expand Down Expand Up @@ -669,6 +673,27 @@ impl S3Core {
if let Some(if_match) = args.if_match() {
req = req.header(IF_MATCH, if_match);
}
if let Some(source_if_match) = args.source_if_match() {
req = req.header(constants::X_AMZ_COPY_SOURCE_IF_MATCH, source_if_match);
}
if let Some(source_if_none_match) = args.source_if_none_match() {
req = req.header(
constants::X_AMZ_COPY_SOURCE_IF_NONE_MATCH,
source_if_none_match,
);
}
if let Some(v) = args.source_if_modified_since() {
req = req.header(
constants::X_AMZ_COPY_SOURCE_IF_MODIFIED_SINCE,
v.format_http_date(),
);
}
if let Some(v) = args.source_if_unmodified_since() {
req = req.header(
constants::X_AMZ_COPY_SOURCE_IF_UNMODIFIED_SINCE,
v.format_http_date(),
);
}

// Set SSE headers.
req = self.insert_sse_headers(req, true);
Expand Down Expand Up @@ -1119,6 +1144,27 @@ impl S3Core {
if let Some(if_match) = args.if_match() {
req = req.header(IF_MATCH, if_match);
}
if let Some(source_if_match) = args.source_if_match() {
req = req.header(constants::X_AMZ_COPY_SOURCE_IF_MATCH, source_if_match);
}
if let Some(source_if_none_match) = args.source_if_none_match() {
req = req.header(
constants::X_AMZ_COPY_SOURCE_IF_NONE_MATCH,
source_if_none_match,
);
}
if let Some(v) = args.source_if_modified_since() {
req = req.header(
constants::X_AMZ_COPY_SOURCE_IF_MODIFIED_SINCE,
v.format_http_date(),
);
}
if let Some(v) = args.source_if_unmodified_since() {
req = req.header(
constants::X_AMZ_COPY_SOURCE_IF_UNMODIFIED_SINCE,
v.format_http_date(),
);
}

// Set request payer header if enabled.
req = self.insert_request_payer_header(req);
Expand Down
Loading
Loading