Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ repository.workspace = true
rust-version.workspace = true

[features]
stream = ["dep:futures", "dep:stream-kmerge"]
std = []
geo = ["dep:geo"]
geoarrow = [
Expand Down Expand Up @@ -51,6 +52,8 @@ serde_json = { workspace = true, features = ["preserve_order"] }
serde_urlencoded.workspace = true
stac-derive = { version = "0.3.0", path = "../derive" }
thiserror.workspace = true
stream-kmerge = { version = "0.2.0", optional = true }
futures = { workspace = true, optional = true }
tracing.workspace = true
url = { workspace = true, features = ["serde"] }
wkb = { workspace = true, optional = true }
Expand All @@ -59,7 +62,7 @@ wkb = { workspace = true, optional = true }
assert-json-diff.workspace = true
bytes.workspace = true
rstest.workspace = true
tokio = { workspace = true, features = ["macros"] }
tokio = { workspace = true, features = ["macros", "rt"] }
tokio-test.workspace = true

[package.metadata.docs.rs]
Expand Down
113 changes: 113 additions & 0 deletions crates/core/src/api/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@ use super::{ItemCollection, Items, Search};
use crate::{Collection, Error, Item};
use std::future::Future;

#[cfg(feature = "stream")]
use std::pin::Pin;

/// A STAC API Item — a JSON map that may be a full or partial [`Item`].
///
/// Re-exported here to avoid importing from `super` in trait signatures.
#[cfg(feature = "stream")]
type ApiItem = super::Item;

/// A client that can search for STAC items.
///
/// [`SearchClient::search`] is the only required method. [`SearchClient::item`]
Expand Down Expand Up @@ -61,6 +70,110 @@ pub trait SearchClient: Send + Sync {
}
}

/// A client that can produce a paginated stream of STAC API items.
///
/// This is the streaming counterpart to [`SearchClient::search`]. While
/// `SearchClient` returns a single page, `StreamSearchClient` returns a
/// stream that automatically pages through all results.
///
/// For types that already implement [`SearchClient`], a default paginating
/// stream can be obtained via the [`stream_search_pages`] helper function.
///
/// # Examples
///
/// ```no_run
/// use stac::api::{Search, StreamSearchClient};
/// use futures::StreamExt;
///
/// # async fn example(client: impl StreamSearchClient<Error = stac::Error>) {
/// let mut stream = client.search_stream(Search::default()).await.unwrap();
/// while let Some(result) = stream.next().await {
/// let item = result.unwrap();
/// println!("{:?}", item.get("id"));
/// }
/// # }
/// ```
#[cfg(feature = "stream")]
pub trait StreamSearchClient: Send + Sync {
/// The error type for this client.
type Error: Send;

/// Returns a stream of API items by paging through results.
fn search_stream(
&self,
search: Search,
) -> impl Future<
Output = Result<
Pin<Box<dyn futures::Stream<Item = Result<ApiItem, Self::Error>> + Send + '_>>,
Self::Error,
>,
> + Send;
}

/// Creates a paginated stream from a [`SearchClient`].
///
/// This helper repeatedly calls [`SearchClient::search`], yielding each item
/// from each page. Pagination tokens are carried forward by merging
/// [`ItemCollection::next`] into [`Search::items::additional_fields`].
///
/// Use this inside a [`StreamSearchClient`] implementation to get token-based
/// pagination for free:
///
/// ```ignore
/// impl StreamSearchClient for MyBackend {
/// type Error = MyError;
/// async fn search_stream(&self, search: Search)
/// -> Result<Pin<Box<dyn Stream<Item = Result<ApiItem, Self::Error>> + Send + '_>>, Self::Error>
/// {
/// Ok(stream_search_pages(self, search))
/// }
/// }
/// ```
#[cfg(feature = "stream")]
pub fn stream_search_pages<'a, C>(
client: &'a C,
search: Search,
) -> Pin<Box<dyn futures::Stream<Item = Result<ApiItem, C::Error>> + Send + 'a>>
where
C: SearchClient + ?Sized,
{
use futures::stream;

let stream = stream::unfold(
(client, Some(search), Vec::<ApiItem>::new()),
|(client, next_search_opt, mut buffer)| async move {
// Yield buffered items first.
if !buffer.is_empty() {
let item = buffer.remove(0);
return Some((Ok(item), (client, next_search_opt, buffer)));
}

// Fetch the next page.
let search = next_search_opt?;
match client.search(search.clone()).await {
Ok(page) => {
if page.items.is_empty() {
return None;
}
let next = page.next.and_then(|next_params| {
let mut next_search = search;
for (k, v) in next_params {
let _ = next_search.items.additional_fields.insert(k, v);
}
Some(next_search)
});
let mut items = page.items;
let first = items.remove(0);
Some((Ok(first), (client, next, items)))
}
Err(e) => Some((Err(e), (client, None, Vec::new()))),
}
},
);

Box::pin(stream)
}

/// A client that can retrieve STAC collections.
///
/// [`CollectionSearchClient::collections`] is the only required method.
Expand Down
Loading
Loading