Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bindings/java/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ fn make_write_options<'a>(
if_none_match: convert::read_string_field(env, options, "ifNoneMatch")?,
if_not_exists: convert::read_bool_field(env, options, "ifNotExists").unwrap_or_default(),
user_metadata: convert::read_map_field(env, options, "userMetadata")?,
expires: None,
concurrent,
chunk: convert::read_jlong_field_to_usize(env, options, "chunk")?,
})
Expand Down
1 change: 1 addition & 0 deletions bindings/nodejs/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ impl From<WriteOptions> for opendal::options::WriteOptions {
if_match: value.if_match,
if_none_match: value.if_none_match,
if_not_exists: value.if_not_exists.unwrap_or_default(),
expires: None,
concurrent: value.concurrent.unwrap_or_default() as usize,
}
}
Expand Down
1 change: 1 addition & 0 deletions bindings/python/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ impl From<WriteOptions> for ocore::options::WriteOptions {
if_match: opts.if_match,
if_none_match: opts.if_none_match,
if_not_exists: opts.if_not_exists.unwrap_or(false),
expires: None,
}
}
}
Expand Down
25 changes: 25 additions & 0 deletions core/core/src/layers/correctness_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ impl<A: Access> LayeredAccess for CorrectnessAccessor<A> {
return Err(err);
}
}
if args.expires().is_some() && !capability.write_with_expires {
return Err(new_unsupported_error(
&self.info,
Operation::Write,
"expires",
));
}

self.inner.write(path, args).await
}
Expand Down Expand Up @@ -460,6 +467,24 @@ mod tests {
});
let res = op.writer_with("path").append(true).await;
assert!(res.is_ok());

let res = op
.write_with("path", "".as_bytes())
.expires(Duration::from_secs(60))
.await;
assert!(res.is_err());
assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);

let op = new_test_operator(Capability {
write: true,
write_with_expires: true,
..Default::default()
});
let res = op
.write_with("path", "".as_bytes())
.expires(Duration::from_secs(60))
.await;
assert!(res.is_ok());
}

#[tokio::test]
Expand Down
13 changes: 13 additions & 0 deletions core/core/src/raw/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,7 @@ pub struct OpWrite {
if_none_match: Option<String>,
if_not_exists: bool,
user_metadata: Option<HashMap<String, String>>,
expires: Option<Duration>,
}

impl OpWrite {
Expand Down Expand Up @@ -833,6 +834,17 @@ impl OpWrite {
pub fn user_metadata(&self) -> Option<&HashMap<String, String>> {
self.user_metadata.as_ref()
}

/// Set the expiration duration of the op.
pub fn with_expires(mut self, expires: Duration) -> Self {
self.expires = Some(expires);
self
}

/// Get the expiration duration from the op.
pub fn expires(&self) -> Option<Duration> {
self.expires
}
}

/// Args for `writer` operation.
Expand Down Expand Up @@ -884,6 +896,7 @@ impl From<options::WriteOptions> for (OpWrite, OpWriter) {
if_none_match: value.if_none_match,
if_not_exists: value.if_not_exists,
user_metadata: value.user_metadata,
expires: value.expires,
},
OpWriter { chunk: value.chunk },
)
Expand Down
2 changes: 2 additions & 0 deletions core/core/src/types/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ pub struct Capability {
pub write_with_if_not_exists: bool,
/// Indicates if custom user metadata can be attached during write operations.
pub write_with_user_metadata: bool,
/// Indicates if write operations can expire the object after a duration.
pub write_with_expires: bool,
/// Maximum size supported for multipart uploads.
/// For example, AWS S3 supports up to 5GiB per part in multipart uploads.
pub write_multi_max_size: Option<usize>,
Expand Down
48 changes: 48 additions & 0 deletions core/core/src/types/operator/operator_futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -921,6 +921,29 @@ impl<F: Future<Output = Result<Metadata>>> FutureWrite<F> {
self.args.0.user_metadata = Some(HashMap::from_iter(data));
self
}

/// Sets a duration after which the written object should expire.
///
/// Refer to [`options::WriteOptions::expires`] for more details.
///
/// ### Example
///
/// ```
/// # use opendal_core::Result;
/// # use opendal_core::Operator;
/// # use std::time::Duration;
/// # async fn test(op: Operator) -> Result<()> {
/// let _ = op
/// .write_with("path/to/file", vec![0; 4096])
/// .expires(Duration::from_secs(60))
/// .await?;
/// # Ok(())
/// # }
/// ```
pub fn expires(mut self, v: Duration) -> Self {
self.args.0.expires = Some(v);
self
}
}

/// Future that generated by [`Operator::writer_with`].
Expand Down Expand Up @@ -1260,6 +1283,31 @@ impl<F: Future<Output = Result<Writer>>> FutureWriter<F> {
self.args.user_metadata = Some(HashMap::from_iter(data));
self
}

/// Sets a duration after which the written object should expire.
///
/// Refer to [`options::WriteOptions::expires`] for more details.
///
/// ### Example
///
/// ```
/// # use opendal_core::Result;
/// # use opendal_core::Operator;
/// # use std::time::Duration;
/// # async fn test(op: Operator) -> Result<()> {
/// let mut w = op
/// .writer_with("path/to/file")
/// .expires(Duration::from_secs(60))
/// .await?;
/// w.write(vec![0; 4096]).await?;
/// w.close().await?;
/// # Ok(())
/// # }
/// ```
pub fn expires(mut self, v: Duration) -> Self {
self.args.expires = Some(v);
self
}
}

/// Future that generated by [`Operator::delete_with`].
Expand Down
17 changes: 16 additions & 1 deletion core/core/src/types/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Options module provides options definitions for operations.

use crate::raw::{BytesRange, Timestamp};
use crate::raw::{BytesRange, Duration, Timestamp};
use std::collections::HashMap;

/// Options for delete operations.
Expand Down Expand Up @@ -420,6 +420,21 @@ pub struct WriteOptions {
/// This metadata can be retrieved later when reading the object.
pub user_metadata: Option<HashMap<String, String>>,

/// Sets a duration after which the written object should expire.
///
/// ### Capability
///
/// Check [`Capability::write_with_expires`] before using this feature.
///
/// ### Behavior
///
/// - If supported, the target object will expire after the provided duration.
/// - Services without native expiration support will return an unsupported error.
/// - If a service also has a configured default TTL, this per-write value takes precedence.
///
/// This operation is useful for cache-like backends such as Redis and Memcached.
pub expires: Option<Duration>,

/// Sets If-Match header for this write request.
///
/// ### Capability
Expand Down
8 changes: 6 additions & 2 deletions core/services/memcached/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ impl MemcachedBackend {
stat: true,
write: true,
write_can_empty: true,
write_with_expires: true,
delete: true,
shared: true,
..Default::default()
Expand Down Expand Up @@ -251,9 +252,12 @@ impl Access for MemcachedBackend {
Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
}

async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let p = build_abs_path(&self.root, path);
Ok((RpWrite::new(), MemcachedWriter::new(self.core.clone(), p)))
Ok((
RpWrite::new(),
MemcachedWriter::new(self.core.clone(), p, args),
))
}

async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
Expand Down
7 changes: 3 additions & 4 deletions core/services/memcached/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,16 +203,15 @@ impl MemcachedCore {
Ok(result.map(Buffer::from))
}

pub async fn set(&self, key: &str, value: Buffer) -> Result<()> {
pub async fn set(&self, key: &str, value: Buffer, expires: Option<Duration>) -> Result<()> {
let mut conn = self.conn().await?;
let ttl = expires.or(self.default_ttl);

conn.set(
&percent_encode_path(key),
&value.to_vec(),
// Set expiration to 0 if ttl not set.
self.default_ttl
.map(|v| v.as_secs() as u32)
.unwrap_or_default(),
ttl.map(|v| v.as_secs() as u32).unwrap_or_default(),
)
.await
}
Expand Down
8 changes: 5 additions & 3 deletions core/services/memcached/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,24 @@

use std::sync::Arc;

use opendal_core::raw::oio;
use opendal_core::raw::{OpWrite, oio};
use opendal_core::*;

use super::core::*;

pub struct MemcachedWriter {
core: Arc<MemcachedCore>,
path: String,
op: OpWrite,
buffer: oio::QueueBuf,
}

impl MemcachedWriter {
pub fn new(core: Arc<MemcachedCore>, path: String) -> Self {
pub fn new(core: Arc<MemcachedCore>, path: String, op: OpWrite) -> Self {
Self {
core,
path,
op,
buffer: oio::QueueBuf::new(),
}
}
Expand All @@ -47,7 +49,7 @@ impl oio::Write for MemcachedWriter {
async fn close(&mut self) -> Result<Metadata> {
let buf = self.buffer.clone().collect();
let length = buf.len() as u64;
self.core.set(&self.path, buf).await?;
self.core.set(&self.path, buf, self.op.expires()).await?;

let meta = Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length);
Ok(meta)
Expand Down
5 changes: 3 additions & 2 deletions core/services/redis/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ impl RedisBackend {
delete: true,
stat: true,
write_can_empty: true,
write_with_expires: true,
shared: true,
..Default::default()
});
Expand Down Expand Up @@ -361,9 +362,9 @@ impl Access for RedisBackend {
Ok((RpRead::new(), buffer))
}

async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let p = build_abs_path(&self.root, path);
Ok((RpWrite::new(), RedisWriter::new(self.core.clone(), p)))
Ok((RpWrite::new(), RedisWriter::new(self.core.clone(), p, args)))
}

async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
Expand Down
4 changes: 2 additions & 2 deletions core/services/redis/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ impl RedisCore {
Ok(result.map(Buffer::from))
}

pub async fn set(&self, key: &str, value: Buffer) -> Result<()> {
pub async fn set(&self, key: &str, value: Buffer, expires: Option<Duration>) -> Result<()> {
let mut conn = self.conn().await?;
let value = value.to_vec();
if let Some(dur) = self.default_ttl {
if let Some(dur) = expires.or(self.default_ttl) {
let _: () = conn
.set_ex(key, value, dur.as_secs())
.await
Expand Down
6 changes: 4 additions & 2 deletions core/services/redis/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ use super::core::RedisCore;
pub struct RedisWriter {
core: Arc<RedisCore>,
path: String,
op: OpWrite,
buffer: oio::QueueBuf,
}

impl RedisWriter {
pub fn new(core: Arc<RedisCore>, path: String) -> Self {
pub fn new(core: Arc<RedisCore>, path: String, op: OpWrite) -> Self {
Self {
core,
path,
op,
buffer: oio::QueueBuf::new(),
}
}
Expand All @@ -47,7 +49,7 @@ impl oio::Write for RedisWriter {
async fn close(&mut self) -> Result<Metadata> {
let buf = self.buffer.clone().collect();
let length = buf.len() as u64;
self.core.set(&self.path, buf).await?;
self.core.set(&self.path, buf, self.op.expires()).await?;

let meta = Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length);
Ok(meta)
Expand Down
23 changes: 23 additions & 0 deletions core/tests/behavior/async_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use std::collections::HashMap;
use std::time::Duration;

use anyhow::Result;
use bytes::Bytes;
Expand Down Expand Up @@ -46,6 +47,7 @@ pub fn tests(op: &Operator, tests: &mut Vec<Trial>) {
test_write_with_if_not_exists,
test_write_with_if_match,
test_write_with_user_metadata,
test_write_with_expires,
test_write_returns_metadata,
test_writer_write,
test_writer_write_with_overwrite,
Expand Down Expand Up @@ -249,6 +251,27 @@ pub async fn test_write_with_user_metadata(op: Operator) -> Result<()> {
Ok(())
}

/// Write a single file with expires should expire the object.
pub async fn test_write_with_expires(op: Operator) -> Result<()> {
if !op.info().full_capability().write_with_expires {
return Ok(());
}

let (path, content, size) = TEST_FIXTURE.new_file(op.clone());

op.write_with(&path, content)
.expires(Duration::from_secs(1))
.await?;

let meta = op.stat(&path).await.expect("stat must succeed");
assert_eq!(meta.content_length(), size as u64);

tokio::time::sleep(Duration::from_secs(2)).await;
assert!(!op.exists(&path).await?);

Ok(())
}

pub async fn test_write_returns_metadata(op: Operator) -> Result<()> {
let (path, content, _) = TEST_FIXTURE.new_file(op.clone());

Expand Down
Loading