Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ duckdb-bundled = ["stac-duckdb/bundled"]

[dependencies]
anyhow.workspace = true
chrono.workspace = true
async-stream.workspace = true
axum.workspace = true
clap = { workspace = true, features = ["derive"] }
Expand Down
190 changes: 190 additions & 0 deletions crates/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,56 @@ pub enum Command {
shell: clap_complete::Shell,
},

/// Compute each item's spatio-temporal hash and prefix its id.
///
/// Creates a sortable Z-order curve hash from each item's datetime and
/// bounding box center, then prefixes item ids with the hex-encoded
/// hash and optionally sorts items by hash value.
///
/// If --spatial-precision and --temporal-precision are not provided, the hasher
/// automatically uses maximum precision (21 bits per dimension).
Hash {
/// The input file.
///
/// To read from standard input, pass `-` or don't provide an argument at all.
infile: Option<String>,

/// The output file.
///
/// To write to standard output, pass `-` or don't provide an argument at all.
outfile: Option<String>,

/// Minimum spatial cell size in degrees.
///
/// Must be provided together with --temporal-precision. If neither is
/// provided, maximum precision is used automatically.
#[arg(long, requires = "temporal_precision")]
spatial_precision: Option<f64>,

/// Minimum temporal cell size, as an ISO 8601 duration.
///
/// Must be provided together with --spatial-precision. If neither is
/// provided, maximum precision is used automatically.
///
/// Examples: P1D (1 day), PT1H (1 hour), P30D (30 days)
#[arg(long, requires = "spatial_precision")]
temporal_precision: Option<String>,

/// Sort items by hash value.
///
/// Forces all items to be loaded into memory, even when --temporal-extent
/// would otherwise allow streaming.
#[arg(long, default_value_t = false)]
sort: bool,

/// Temporal extent for the hasher, as a '/'-separated RFC 3339 interval.
///
/// Examples: 2020-01-01T00:00:00Z/2025-01-01T00:00:00Z
///
/// If provided, the hasher is created directly from this extent and
/// items are streamed instead of loaded into memory (unless --sort
/// is also provided).
#[arg(long)]
temporal_extent: Option<String>,
},

/// Generate a STAC collection from one or more items
Collection {
/// The input file.
Expand Down Expand Up @@ -636,6 +686,94 @@ impl Rustac {
clap_complete::generate(shell, &mut command, "rustac", &mut std::io::stdout());
Ok(())
}
// Hash each item's datetime + bbox center into a sortable Z-order value
// and prefix the item id with the hex-encoded hash.
Command::Hash {
ref infile,
ref outfile,
spatial_precision,
ref temporal_precision,
sort,
ref temporal_extent,
} => {
// clap's `requires` links guarantee the two precision flags arrive
// together, so only the paired case needs handling; anything else
// means "use automatic (maximum) precision".
let precisions = match (spatial_precision, temporal_precision) {
(Some(s), Some(t)) => Some((s, parse_iso8601_duration(t)?)),
_ => None,
};
if let Some(temporal_extent) = temporal_extent {
// With an explicit extent the hasher can be built before reading any
// items, which allows streaming instead of buffering them in memory.
// Open-ended intervals are rejected: the hasher needs a bounded range.
let (start, end) = stac::datetime::parse(temporal_extent)?;
let start = start
.ok_or_else(|| anyhow!("temporal extent start must not be open"))?
.to_utc();
let end = end
.ok_or_else(|| anyhow!("temporal extent end must not be open"))?
.to_utc();
let hasher = if let Some((spatial_precision, temporal_precision)) = precisions {
stac::hash::Hasher::new(spatial_precision, temporal_precision, start..end)?
} else {
stac::hash::Hasher::from_temporal_extent(start..end)?
};
if sort {
// Sorting needs the full set, so collect before ordering by hash.
let items = self.get_item_stream(infile.as_deref()).await?;
let mut hashed: Vec<(u64, Item)> = items
.map(|item| {
let mut item = item?;
// NOTE(review): a failed hash (presumably an item missing a
// datetime or geometry — confirm) silently becomes 0 and
// therefore sorts first; consider surfacing the error.
let hash = item.hash(&hasher).unwrap_or(0);
item.id = format!("{hash:016x}-{}", item.id);
Ok((hash, item))
})
.collect::<Result<Vec<_>>>()?;
hashed.sort_by_key(|(hash, _)| *hash);
let items: Vec<Item> = hashed.into_iter().map(|(_, item)| item).collect();
self.put_item_stream(outfile.as_deref(), items.into_iter().map(Ok))
.await
} else {
// No sort requested: prefix ids lazily and stream straight through.
let items = self.get_item_stream(infile.as_deref()).await?;
let items = items.map(move |item| {
let mut item = item?;
let hash = item.hash(&hasher).unwrap_or(0);
item.id = format!("{hash:016x}-{}", item.id);
Ok(item)
});
self.put_item_stream(outfile.as_deref(), items).await
}
} else {
// Without an explicit extent the hasher must be derived from the
// items themselves, which forces loading everything into memory first.
let value = self.get(infile.as_deref()).await?;
let items = match value {
stac::Value::ItemCollection(ic) => ic.items,
stac::Value::Item(item) => vec![item],
other => {
return Err(anyhow!(
"expected an item collection or item, got {}",
other.type_name()
));
}
};
// Both constructors return None when no item carries a datetime,
// in which case there is nothing meaningful to hash.
let hasher = if let Some((spatial_precision, temporal_precision)) = precisions {
stac::hash::Hasher::from_items(
&items,
spatial_precision,
temporal_precision,
)?
.ok_or_else(|| anyhow!("no items have datetimes"))?
} else {
stac::hash::Hasher::from_items_auto(&items)?
.ok_or_else(|| anyhow!("no items have datetimes"))?
};
let mut hashed: Vec<(u64, Item)> = items
.into_iter()
.map(|mut item| {
// Same silent-zero fallback as the streaming path above.
let hash = item.hash(&hasher).unwrap_or(0);
item.id = format!("{hash:016x}-{}", item.id);
(hash, item)
})
.collect();
if sort {
hashed.sort_by_key(|(hash, _)| *hash);
}
let items: Vec<Item> = hashed.into_iter().map(|(_, item)| item).collect();
self.put_item_stream(outfile.as_deref(), items.into_iter().map(Ok))
.await
}
}
Command::Collection {
ref infile,
ref outfile,
Expand Down Expand Up @@ -1018,5 +1156,57 @@ async fn crawl(value: stac::Value, store: StacStore) -> impl TryStream<Item = Re
}
}

/// Parses a restricted ISO 8601 duration string into a [chrono::TimeDelta].
///
/// Supported units are years (`Y`, approximated as 365 days), weeks (`W`),
/// and days (`D`) in the date part, and hours (`H`), minutes (`M`), and
/// seconds (`S`) in the time part. Calendar months and fractional values
/// are not supported.
///
/// # Errors
///
/// Returns an error when the string does not start with `P`, contains an
/// unknown unit, has digits with no trailing unit, or describes a
/// non-positive or out-of-range duration.
fn parse_iso8601_duration(s: &str) -> Result<chrono::TimeDelta> {
    let body = s
        .strip_prefix('P')
        .ok_or_else(|| anyhow!("ISO 8601 duration must start with 'P': {s}"))?;
    let (date_part, time_part) = match body.find('T') {
        Some(idx) => (&body[..idx], Some(&body[idx + 1..])),
        None => (body, None),
    };
    let mut total_seconds: i64 = 0;
    let mut num_buf = String::new();
    for ch in date_part.chars() {
        if ch.is_ascii_digit() {
            num_buf.push(ch);
        } else {
            let n: i64 = num_buf
                .parse()
                .map_err(|_| anyhow!("invalid number in duration: {s}"))?;
            num_buf.clear();
            // A year is approximated as 365 days; calendar-exact semantics are
            // not needed for a precision hint.
            match ch {
                'Y' => total_seconds += n * 365 * 86400,
                'W' => total_seconds += n * 7 * 86400,
                'D' => total_seconds += n * 86400,
                _ => return Err(anyhow!("unknown date duration unit '{ch}' in: {s}")),
            }
        }
    }
    // Reject trailing digits that carry no unit (e.g. "P5"). Previously these
    // were silently dropped — or worse, leaked into the time part, so that
    // "P5T1H" parsed as 51 hours.
    if !num_buf.is_empty() {
        return Err(anyhow!("trailing digits without a unit in duration: {s}"));
    }
    if let Some(time_part) = time_part {
        for ch in time_part.chars() {
            if ch.is_ascii_digit() {
                num_buf.push(ch);
            } else {
                let n: i64 = num_buf
                    .parse()
                    .map_err(|_| anyhow!("invalid number in duration: {s}"))?;
                num_buf.clear();
                match ch {
                    'H' => total_seconds += n * 3600,
                    'M' => total_seconds += n * 60,
                    'S' => total_seconds += n,
                    _ => return Err(anyhow!("unknown time duration unit '{ch}' in: {s}")),
                }
            }
        }
        // Same check for the time part (e.g. "PT1H30").
        if !num_buf.is_empty() {
            return Err(anyhow!("trailing digits without a unit in duration: {s}"));
        }
    }
    if total_seconds == 0 {
        return Err(anyhow!("duration must be positive: {s}"));
    }
    chrono::TimeDelta::try_seconds(total_seconds)
        .ok_or_else(|| anyhow!("duration out of range: {s}"))
}

#[cfg(test)]
use {assert_cmd as _, rstest as _, tempfile as _};
38 changes: 38 additions & 0 deletions crates/core/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,44 @@ impl Collection {
self.update_extents(item);
self.maybe_add_item_link(item)
}

/// Creates a [Hasher](crate::hash::Hasher) from this collection's temporal extent.
///
/// The temporal extent is taken from the first interval of the collection's
/// temporal extent; `spatial_precision` and `temporal_precision` set the
/// hasher's minimum cell sizes.
///
/// Returns `None` when the first temporal interval is missing, or has an
/// open start or end.
///
/// # Examples
///
/// ```
/// use chrono::{TimeDelta, TimeZone, Utc};
/// use stac::Collection;
///
/// let mut collection = Collection::new("an-id", "a description");
/// collection.extent.temporal.interval =
///     vec![[
///         Some(Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 0).unwrap()),
///         Some(Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap()),
///     ]];
///
/// let hasher = collection.hasher(1.0, TimeDelta::days(1)).unwrap();
/// assert!(hasher.is_some());
/// ```
pub fn hasher(
    &self,
    spatial_precision: f64,
    temporal_precision: chrono::TimeDelta,
) -> std::result::Result<Option<crate::hash::Hasher>, crate::hash::Error> {
    match self.extent.temporal.interval.first() {
        Some([Some(start), Some(end)]) => {
            crate::hash::Hasher::new(spatial_precision, temporal_precision, *start..*end)
                .map(Some)
        }
        _ => Ok(None),
    }
}
}

impl Provider {
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/geoarrow/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,8 @@ pub fn from_record_batch_reader<R: RecordBatchReader>(
Ok(rows)
}

pub(crate) fn record_batch_to_json_rows(
/// Converts a record batch to JSON values.
pub fn record_batch_to_json_rows(
record_batch: RecordBatch,
) -> Result<Vec<JsonMap<String, Value>>, Error> {
let mut rows: Vec<Option<JsonMap<String, Value>>> =
Expand Down
Loading