Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 93 additions & 31 deletions core/services/dropbox/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,30 +74,7 @@ impl Access for DropboxBackend {
let bytes = resp.into_body();
let decoded_response: DropboxMetadataResponse =
serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
let entry_mode: EntryMode = match decoded_response.tag.as_str() {
"file" => EntryMode::FILE,
"folder" => EntryMode::DIR,
_ => EntryMode::Unknown,
};

let mut metadata = Metadata::new(entry_mode);
// Only set last_modified and size if entry_mode is FILE, because Dropbox API
// returns last_modified and size only for files.
// FYI: https://www.dropbox.com/developers/documentation/http/documentation#files-get_metadata
if entry_mode == EntryMode::FILE {
let date_utc_last_modified =
decoded_response.client_modified.parse::<Timestamp>()?;
metadata.set_last_modified(date_utc_last_modified);

if let Some(size) = decoded_response.size {
metadata.set_content_length(size);
} else {
return Err(Error::new(
ErrorKind::Unexpected,
format!("no size found for file {path}"),
));
}
}
let metadata = decoded_response.parse_metadata()?;
Ok(RpStat::new(metadata))
}
_ => Err(parse_error(resp)),
Expand Down Expand Up @@ -139,15 +116,79 @@ impl Access for DropboxBackend {
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
Ok((
RpList::default(),
oio::PageLister::new(DropboxLister::new(
let recursive = args.recursive();
let limit = args.limit();

let lister = if path.is_empty() || path == "/" || path.ends_with('/') {
DropboxLister::new(
self.core.clone(),
path.to_string(),
args.recursive(),
args.limit(),
)),
))
recursive,
limit,
Some(dir_entry(path)),
None,
true,
)
} else {
let resp = self.core.dropbox_get_metadata(path).await?;
match resp.status() {
StatusCode::OK => {
let bytes = resp.into_body();
let decoded_response: DropboxMetadataResponse =
serde_json::from_reader(bytes.reader())
.map_err(new_json_deserialize_error)?;

if decoded_response.entry_mode().is_dir() {
DropboxLister::new(
self.core.clone(),
path.to_string(),
recursive,
limit,
Some(dir_entry(path)),
None,
recursive,
)
} else if recursive {
DropboxLister::new(
self.core.clone(),
parent_list_path(path),
true,
limit,
None,
Some(path.to_string()),
true,
)
} else {
DropboxLister::new(
self.core.clone(),
String::new(),
false,
limit,
Some(oio::Entry::new(path, decoded_response.parse_metadata()?)),
None,
false,
)
}
}
_ => {
let err = parse_error(resp);
match err.kind() {
ErrorKind::NotFound => DropboxLister::new(
self.core.clone(),
parent_list_path(path),
recursive,
limit,
None,
Some(path.to_string()),
true,
),
_ => return Err(err),
}
}
}
};

Ok((RpList::default(), oio::PageLister::new(lister)))
}

async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
Expand Down Expand Up @@ -184,3 +225,24 @@ impl Access for DropboxBackend {
}
}
}

fn parent_list_path(path: &str) -> String {
let parent = get_parent(path);
if parent == "/" {
String::new()
} else {
parent.to_string()
}
}

fn dir_entry(path: &str) -> oio::Entry {
let path = if path.is_empty() || path == "/" {
"/".to_string()
} else if path.ends_with('/') {
path.to_string()
} else {
format!("{path}/")
};

oio::Entry::new(&path, Metadata::new(EntryMode::DIR))
}
55 changes: 55 additions & 0 deletions core/services/dropbox/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,61 @@ pub struct DropboxMetadataResponse {
pub size: Option<u64>,
}

impl DropboxMetadataResponse {
pub(crate) fn entry_mode(&self) -> EntryMode {
match self.tag.as_str() {
"file" => EntryMode::FILE,
"folder" => EntryMode::DIR,
_ => EntryMode::Unknown,
}
}

pub(crate) fn parse_metadata(&self) -> Result<Metadata> {
let entry_mode = self.entry_mode();
let mut metadata = Metadata::new(entry_mode);

if let Some(content_hash) = self.content_hash.as_deref() {
metadata.set_etag(content_hash);
}
if let Some(rev) = self.rev.as_deref() {
metadata.set_version(rev);
}

if entry_mode == EntryMode::FILE {
let modified = self
.server_modified
.as_deref()
.unwrap_or(self.client_modified.as_str());
metadata.set_last_modified(modified.parse::<Timestamp>()?);

if let Some(size) = self.size {
metadata.set_content_length(size);
} else {
return Err(Error::new(
ErrorKind::Unexpected,
format!("no size found for file {}", self.path_display),
));
}
}

Ok(metadata)
}

pub(crate) fn entry_path(&self, root: &str) -> String {
let mut path = if self.path_display == root {
"/".to_string()
} else {
build_rel_path(root, &self.path_display)
};

if self.entry_mode().is_dir() && path != "/" && !path.ends_with('/') {
path.push('/');
}

path
}
}

#[derive(Default, Debug, Deserialize)]
#[serde(default)]
pub struct DropboxMetadataFileLockInfo {
Expand Down
63 changes: 39 additions & 24 deletions core/services/dropbox/src/lister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,37 +26,63 @@ use opendal_core::*;

pub struct DropboxLister {
core: Arc<DropboxCore>,
path: String,
list_path: String,
recursive: bool,
limit: Option<usize>,
self_entry: Option<oio::Entry>,
filter_prefix: Option<String>,
fetch_entries: bool,
}

impl DropboxLister {
pub fn new(
core: Arc<DropboxCore>,
path: String,
list_path: String,
recursive: bool,
limit: Option<usize>,
self_entry: Option<oio::Entry>,
filter_prefix: Option<String>,
fetch_entries: bool,
) -> Self {
Self {
core,
path,
list_path,
recursive,
limit,
self_entry,
filter_prefix,
fetch_entries,
}
}

fn build_entry(&self, entry: DropboxMetadataResponse) -> Result<oio::Entry> {
let path = entry.entry_path(self.core.root.as_str());
let metadata = entry.parse_metadata()?;
Ok(oio::Entry::new(&path, metadata))
}
}

impl oio::PageList for DropboxLister {
async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
if ctx.token.is_empty() {
if let Some(entry) = self.self_entry.clone() {
ctx.entries.push_back(entry);
}

if !self.fetch_entries {
ctx.done = true;
return Ok(());
}
}

// The token is set when obtaining entries and returning `has_more` flag.
// When the token exists, we should retrieve more entries using the Dropbox continue API.
// Refer: https://www.dropbox.com/developers/documentation/http/documentation#files-list_folder-continue
let response = if !ctx.token.is_empty() {
self.core.dropbox_list_continue(&ctx.token).await?
} else {
self.core
.dropbox_list(&self.path, self.recursive, self.limit)
.dropbox_list(&self.list_path, self.recursive, self.limit)
.await?
};

Expand All @@ -79,31 +105,20 @@ impl oio::PageList for DropboxLister {
serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;

for entry in decoded_response.entries {
let entry_mode = match entry.tag.as_str() {
"file" => EntryMode::FILE,
"folder" => EntryMode::DIR,
_ => EntryMode::Unknown,
};

let mut name = entry.name;
let mut meta = Metadata::new(entry_mode);
let entry = self.build_entry(entry)?;

// Dropbox will return folder names that do not end with '/'.
if entry_mode == EntryMode::DIR && !name.ends_with('/') {
name.push('/');
if let Some(prefix) = self.filter_prefix.as_deref() {
if !entry.path().starts_with(prefix) {
continue;
}
}

// The behavior here aligns with Dropbox's stat function.
if entry_mode == EntryMode::FILE {
let date_utc_last_modified = entry.client_modified.parse::<Timestamp>()?;
meta.set_last_modified(date_utc_last_modified);

if let Some(size) = entry.size {
meta.set_content_length(size);
if let Some(self_entry) = self.self_entry.as_ref() {
if self_entry.path() == entry.path() {
continue;
}
}

ctx.entries.push_back(oio::Entry::with(name, meta));
ctx.entries.push_back(entry);
}

if decoded_response.has_more {
Expand Down
28 changes: 1 addition & 27 deletions core/services/dropbox/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,6 @@ impl DropboxWriter {
pub fn new(core: Arc<DropboxCore>, op: OpWrite, path: String) -> Self {
DropboxWriter { core, op, path }
}

fn parse_metadata(decoded_response: DropboxMetadataResponse) -> Result<Metadata> {
let mut metadata = Metadata::default();

if let Some(size) = decoded_response.size {
metadata.set_content_length(size);
}

if let Some(content_hash) = decoded_response.content_hash {
metadata.set_etag(&content_hash);
}

if let Some(rev) = decoded_response.rev {
metadata.set_version(&rev);
}

if let Some(server_modified) = decoded_response.server_modified {
let date_utc = server_modified.parse::<Timestamp>()?;
metadata.set_last_modified(date_utc);
} else {
let date_utc = decoded_response.client_modified.parse::<Timestamp>()?;
metadata.set_last_modified(date_utc);
}

Ok(metadata)
}
}

impl oio::OneShotWrite for DropboxWriter {
Expand All @@ -75,7 +49,7 @@ impl oio::OneShotWrite for DropboxWriter {
let bytes = resp.into_body();
let decoded_response: DropboxMetadataResponse =
serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
let metadata = DropboxWriter::parse_metadata(decoded_response)?;
let metadata = decoded_response.parse_metadata()?;
Ok(metadata)
}
_ => Err(parse_error(resp)),
Expand Down
Loading