Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 128 additions & 14 deletions crates/iceberg/src/io/storage/opendal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@

//! OpenDAL-based storage implementation.

use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use async_trait::async_trait;
#[cfg(feature = "storage-azdls")]
Expand Down Expand Up @@ -122,20 +123,24 @@ impl StorageFactory for OpenDalStorageFactory {
configured_scheme: "s3".to_string(),
config: s3_config_parse(config.props().clone())?.into(),
customized_credential_load: customized_credential_load.clone(),
operator_cache: default_operator_cache(),
})),
#[cfg(feature = "storage-gcs")]
OpenDalStorageFactory::Gcs => Ok(Arc::new(OpenDalStorage::Gcs {
config: gcs_config_parse(config.props().clone())?.into(),
operator_cache: default_operator_cache(),
})),
#[cfg(feature = "storage-oss")]
OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss {
config: oss_config_parse(config.props().clone())?.into(),
operator_cache: default_operator_cache(),
})),
#[cfg(feature = "storage-azdls")]
OpenDalStorageFactory::Azdls { configured_scheme } => {
Ok(Arc::new(OpenDalStorage::Azdls {
configured_scheme: configured_scheme.clone(),
config: azdls_config_parse(config.props().clone())?.into(),
operator_cache: default_operator_cache(),
}))
}
#[cfg(all(
Expand All @@ -160,6 +165,11 @@ fn default_memory_operator() -> Operator {
memory_config_build().expect("Failed to create default memory operator")
}

/// Default empty operator cache for serde deserialization.
fn default_operator_cache() -> Arc<Mutex<HashMap<String, Operator>>> {
Arc::new(Mutex::new(HashMap::new()))
}

/// OpenDAL-based storage implementation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum OpenDalStorage {
Expand All @@ -180,18 +190,27 @@ pub enum OpenDalStorage {
/// Custom AWS credential loader.
#[serde(skip)]
customized_credential_load: Option<CustomAwsCredentialLoader>,
/// Reuses operators across calls to avoid rebuilding credential chains.
#[serde(skip, default = "default_operator_cache")]
operator_cache: Arc<Mutex<HashMap<String, Operator>>>,
},
/// GCS storage variant.
#[cfg(feature = "storage-gcs")]
Gcs {
/// GCS configuration.
config: Arc<GcsConfig>,
/// Reuses operators across calls to avoid rebuilding credential chains.
#[serde(skip, default = "default_operator_cache")]
operator_cache: Arc<Mutex<HashMap<String, Operator>>>,
},
/// OSS storage variant.
#[cfg(feature = "storage-oss")]
Oss {
/// OSS configuration.
config: Arc<OssConfig>,
/// Reuses operators across calls to avoid rebuilding credential chains.
#[serde(skip, default = "default_operator_cache")]
operator_cache: Arc<Mutex<HashMap<String, Operator>>>,
},
/// Azure Data Lake Storage variant.
/// Expects paths of the form
Expand All @@ -206,9 +225,32 @@ pub enum OpenDalStorage {
configured_scheme: AzureStorageScheme,
/// Azure DLS configuration.
config: Arc<AzdlsConfig>,
/// Reuses operators across calls to avoid rebuilding credential chains.
#[serde(skip, default = "default_operator_cache")]
operator_cache: Arc<Mutex<HashMap<String, Operator>>>,
},
}

/// Returns a cached operator for the given key, or creates and caches a new one.
fn get_or_create_operator(
cache: &Mutex<HashMap<String, Operator>>,
key: &str,
build: impl FnOnce() -> Result<Operator>,
) -> Result<Operator> {
let mut guard = cache.lock().map_err(|e| {
Error::new(
ErrorKind::Unexpected,
format!("Operator cache lock poisoned: {e}"),
)
})?;
if let Some(op) = guard.get(key) {
return Ok(op.clone());
}
let op = build()?.layer(RetryLayer::new());
guard.insert(key.to_string(), op.clone());
Ok(op)
}

impl OpenDalStorage {
/// Convert iceberg config to opendal config.
///
Expand All @@ -230,21 +272,25 @@ impl OpenDalStorage {
customized_credential_load: extensions
.get::<CustomAwsCredentialLoader>()
.map(Arc::unwrap_or_clone),
operator_cache: default_operator_cache(),
}),
#[cfg(feature = "storage-gcs")]
Scheme::Gcs => Ok(Self::Gcs {
config: gcs_config_parse(props)?.into(),
operator_cache: default_operator_cache(),
}),
#[cfg(feature = "storage-oss")]
Scheme::Oss => Ok(Self::Oss {
config: oss_config_parse(props)?.into(),
operator_cache: default_operator_cache(),
}),
#[cfg(feature = "storage-azdls")]
Scheme::Azdls => {
let scheme = scheme_str.parse::<AzureStorageScheme>()?;
Ok(Self::Azdls {
config: azdls_config_parse(props)?.into(),
configured_scheme: scheme,
operator_cache: default_operator_cache(),
})
}
// Update doc on [`FileIO`] when adding new schemes.
Expand Down Expand Up @@ -276,15 +322,16 @@ impl OpenDalStorage {
let (operator, relative_path): (Operator, &str) = match self {
#[cfg(feature = "storage-memory")]
OpenDalStorage::Memory(op) => {
let op = op.clone().layer(RetryLayer::new());
if let Some(stripped) = path.strip_prefix("memory:/") {
(op.clone(), stripped)
(op, stripped)
} else {
(op.clone(), &path[1..])
(op, &path[1..])
}
}
#[cfg(feature = "storage-fs")]
OpenDalStorage::LocalFs => {
let op = fs_config_build()?;
let op = fs_config_build()?.layer(RetryLayer::new());
if let Some(stripped) = path.strip_prefix("file:/") {
(op, stripped)
} else {
Expand All @@ -296,11 +343,21 @@ impl OpenDalStorage {
configured_scheme,
config,
customized_credential_load,
operator_cache,
} => {
let op = s3_config_build(config, customized_credential_load, path)?;
let url = url::Url::parse(path)?;
let bucket = url.host_str().ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!("Invalid s3 url: {path}, missing bucket"),
)
})?;

let op = get_or_create_operator(operator_cache, bucket, || {
s3_config_build(config, customized_credential_load, path)
})?;
let op_info = op.info();

// Check prefix of s3 path.
let prefix = format!("{}://{}/", configured_scheme, op_info.name());
if path.starts_with(&prefix) {
(op, &path[prefix.len()..])
Expand All @@ -312,8 +369,21 @@ impl OpenDalStorage {
}
}
#[cfg(feature = "storage-gcs")]
OpenDalStorage::Gcs { config } => {
let operator = gcs_config_build(config, path)?;
OpenDalStorage::Gcs {
config,
operator_cache,
} => {
let url = url::Url::parse(path)?;
let bucket = url.host_str().ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!("Invalid gcs url: {path}, missing bucket"),
)
})?;

let operator = get_or_create_operator(operator_cache, bucket, || {
gcs_config_build(config, path)
})?;
let prefix = format!("gs://{}/", operator.info().name());
if path.starts_with(&prefix) {
(operator, &path[prefix.len()..])
Expand All @@ -325,8 +395,21 @@ impl OpenDalStorage {
}
}
#[cfg(feature = "storage-oss")]
OpenDalStorage::Oss { config } => {
let op = oss_config_build(config, path)?;
OpenDalStorage::Oss {
config,
operator_cache,
} => {
let url = url::Url::parse(path)?;
let bucket = url.host_str().ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!("Invalid oss url: {path}, missing bucket"),
)
})?;

let op = get_or_create_operator(operator_cache, bucket, || {
oss_config_build(config, path)
})?;
let prefix = format!("oss://{}/", op.info().name());
if path.starts_with(&prefix) {
(op, &path[prefix.len()..])
Expand All @@ -341,7 +424,41 @@ impl OpenDalStorage {
OpenDalStorage::Azdls {
configured_scheme,
config,
} => azdls_create_operator(path, config, configured_scheme)?,
operator_cache,
} => {
let url = url::Url::parse(path)?;
let filesystem = url.username().to_string();

let op = {
let guard = operator_cache.lock().map_err(|e| {
Error::new(
ErrorKind::Unexpected,
format!("Operator cache lock poisoned: {e}"),
)
})?;
guard.get(&filesystem).cloned()
};

match op {
Some(op) => {
let relative_path = &path[path.len() - url.path().len()..];
(op, relative_path)
}
None => {
let (op, relative_path) =
azdls_create_operator(path, config, configured_scheme)?;
let op = op.layer(RetryLayer::new());
let mut guard = operator_cache.lock().map_err(|e| {
Error::new(
ErrorKind::Unexpected,
format!("Operator cache lock poisoned: {e}"),
)
})?;
guard.insert(filesystem, op.clone());
(op, relative_path)
}
}
}
#[cfg(all(
not(feature = "storage-s3"),
not(feature = "storage-fs"),
Expand All @@ -357,9 +474,6 @@ impl OpenDalStorage {
}
};

// Transient errors are common for object stores; however there's no
// harm in retrying temporary failures for other storage backends as well.
let operator = operator.layer(RetryLayer::new());
Ok((operator, relative_path))
}

Expand Down
Loading