Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions crates/examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ rust-version = { workspace = true }
version = { workspace = true }

[dependencies]
datafusion = { workspace = true }
futures = { workspace = true }
iceberg = { workspace = true }
iceberg-catalog-rest = { workspace = true }
iceberg-datafusion = { workspace = true }
tokio = { workspace = true, features = ["full"] }

[[example]]
Expand All @@ -43,6 +45,10 @@ name = "oss-backend"
path = "src/oss_backend.rs"
required-features = ["storage-oss"]

[[example]]
name = "datafusion-incremental-read"
path = "src/datafusion_incremental_read.rs"

[features]
default = []
storage-oss = ["iceberg/storage-oss"]
157 changes: 157 additions & 0 deletions crates/examples/src/datafusion_incremental_read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Example demonstrating incremental reads with DataFusion.
//!
//! Incremental reads allow you to scan only the data that was added between
//! two snapshots. This is useful for:
//! - Change data capture (CDC) pipelines
//! - Incremental data processing
//! - Efficiently reading only new data since last checkpoint
//!
//! # Prerequisites
//!
//! This example requires a running iceberg-rest catalog on port 8181 with
//! a table that has multiple snapshots. You can set this up using the official
//! [quickstart documentation](https://iceberg.apache.org/spark-quickstart/).

use std::collections::HashMap;
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use iceberg::{Catalog, CatalogBuilder, TableIdent};
use iceberg_catalog_rest::{REST_CATALOG_PROP_URI, RestCatalogBuilder};
use iceberg_datafusion::IcebergStaticTableProvider;

/// REST catalog endpoint; assumes a local iceberg-rest catalog on port 8181.
const REST_URI: &str = "http://localhost:8181";
/// Namespace that contains the demo table.
const NAMESPACE: &str = "default";
/// Name of the table to read incrementally.
const TABLE_NAME: &str = "incremental_test";

/// This example demonstrates how to perform incremental reads using DataFusion.
///
/// Incremental reads scan only the data files that were added between two snapshots,
/// which is much more efficient than scanning the entire table when you only need
/// the new data.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create the REST iceberg catalog.
    let catalog = RestCatalogBuilder::default()
        .load(
            "rest",
            HashMap::from([(REST_CATALOG_PROP_URI.to_string(), REST_URI.to_string())]),
        )
        .await?;

    // Load the table.
    let table_ident = TableIdent::from_strs([NAMESPACE, TABLE_NAME])?;
    let table = catalog.load_table(&table_ident).await?;

    // Collect the available snapshots and sort them chronologically.
    // `snapshots()` iterates an internal map, so its order is not guaranteed;
    // without an explicit sort, `snapshots[0]` is not necessarily the oldest
    // snapshot and the "from"/"to" endpoints below could be picked backwards.
    let mut snapshots: Vec<_> = table.metadata().snapshots().collect();
    snapshots.sort_by_key(|snapshot| snapshot.timestamp_ms());

    println!("Table has {} snapshots:", snapshots.len());
    for snapshot in &snapshots {
        println!(
            " - Snapshot {}: {:?}",
            snapshot.snapshot_id(),
            snapshot.summary().operation
        );
    }

    // An incremental read needs two distinct endpoints; bail out gracefully
    // rather than erroring when the table is too young.
    if snapshots.len() < 2 {
        println!("Need at least 2 snapshots for incremental read demo.");
        println!("Try inserting some data into the table to create more snapshots.");
        return Ok(());
    }

    // Use the oldest snapshot as the "from" endpoint and the newest as "to".
    let from_snapshot_id = snapshots[0].snapshot_id();
    let to_snapshot_id = snapshots[snapshots.len() - 1].snapshot_id();

    println!("Performing incremental read from snapshot {from_snapshot_id} to {to_snapshot_id}");

    // ANCHOR: incremental_read
    // Create a DataFusion session
    let ctx = SessionContext::new();

    // Method 1: Scan changes between two specific snapshots (exclusive from)
    // This returns only data added AFTER from_snapshot_id up to and including to_snapshot_id
    let provider = IcebergStaticTableProvider::try_new_incremental(
        table.clone(),
        from_snapshot_id,
        to_snapshot_id,
    )
    .await?;

    ctx.register_table("incremental_changes", Arc::new(provider))?;

    // Query the incremental changes
    let df = ctx
        .sql("SELECT * FROM incremental_changes LIMIT 10")
        .await?;
    println!("\nIncremental changes (first 10 rows):");
    df.show().await?;
    // ANCHOR_END: incremental_read

    // ANCHOR: appends_after
    // Method 2: Scan all appends after a specific snapshot up to current
    // Useful for "give me all new data since my last checkpoint"
    let provider =
        IcebergStaticTableProvider::try_new_appends_after(table.clone(), from_snapshot_id).await?;

    ctx.register_table("new_data", Arc::new(provider))?;

    let df = ctx.sql("SELECT COUNT(*) as new_rows FROM new_data").await?;
    println!("\nNew rows since snapshot {from_snapshot_id}:");
    df.show().await?;
    // ANCHOR_END: appends_after

    // ANCHOR: incremental_inclusive
    // Method 3: Inclusive incremental read (includes the from_snapshot)
    let provider = IcebergStaticTableProvider::try_new_incremental_inclusive(
        table.clone(),
        from_snapshot_id,
        to_snapshot_id,
    )
    .await?;

    ctx.register_table("inclusive_changes", Arc::new(provider))?;

    let df = ctx
        .sql("SELECT COUNT(*) as total_rows FROM inclusive_changes")
        .await?;
    println!("\nRows including from_snapshot:");
    df.show().await?;
    // ANCHOR_END: incremental_inclusive

    // ANCHOR: with_filters
    // You can combine incremental reads with filters and projections
    let provider =
        IcebergStaticTableProvider::try_new_appends_after(table.clone(), from_snapshot_id).await?;

    ctx.register_table("filtered_changes", Arc::new(provider))?;

    // Example: Get only specific columns with a filter
    // (adjust column names based on your actual table schema)
    let df = ctx.sql("SELECT * FROM filtered_changes LIMIT 5").await?;
    println!("\nFiltered incremental data:");
    df.show().await?;
    // ANCHOR_END: with_filters

    println!("\nIncremental read example completed successfully!");

    Ok(())
}
39 changes: 34 additions & 5 deletions crates/iceberg/src/scan/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ use crate::expr::{Bind, BoundPredicate, Predicate};
use crate::io::object_cache::ObjectCache;
use crate::scan::{
BoundPredicates, ExpressionEvaluatorCache, FileScanTask, ManifestEvaluatorCache,
PartitionFilterCache,
PartitionFilterCache, SnapshotRange,
};
use crate::spec::{
ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, SchemaRef, SnapshotRef,
TableMetadataRef,
ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, ManifestStatus, SchemaRef,
SnapshotRef, TableMetadataRef,
};
use crate::{Error, ErrorKind, Result};

Expand All @@ -47,6 +47,8 @@ pub(crate) struct ManifestFileContext {
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
delete_file_index: DeleteFileIndex,
case_sensitive: bool,
/// Optional snapshot range for incremental scans
snapshot_range: Option<Arc<SnapshotRange>>,
}

/// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
Expand Down Expand Up @@ -76,12 +78,36 @@ impl ManifestFileContext {
mut sender,
expression_evaluator_cache,
delete_file_index,
..
case_sensitive,
snapshot_range,
} = self;

let manifest = object_cache.get_manifest(&manifest_file).await?;

for manifest_entry in manifest.entries() {
// For incremental scans, filter entries to only include those:
// 1. With status ADDED (not EXISTING or DELETED)
// 2. With a snapshot_id that falls within the range
if let Some(ref range) = snapshot_range {
// Only include entries with status ADDED
if manifest_entry.status() != ManifestStatus::Added {
continue;
}

// Only include entries from snapshots in the range
match manifest_entry.snapshot_id() {
Some(entry_snapshot_id) => {
if !range.contains(entry_snapshot_id) {
continue;
}
}
None => {
// Skip entries without a snapshot_id in incremental mode
continue;
}
}
}

let manifest_entry_context = ManifestEntryContext {
// TODO: refactor to avoid the expensive ManifestEntry clone
manifest_entry: manifest_entry.clone(),
Expand All @@ -91,7 +117,7 @@ impl ManifestFileContext {
bound_predicates: bound_predicates.clone(),
snapshot_schema: snapshot_schema.clone(),
delete_file_index: delete_file_index.clone(),
case_sensitive: self.case_sensitive,
case_sensitive,
};

sender
Expand Down Expand Up @@ -160,6 +186,8 @@ pub(crate) struct PlanContext {
pub partition_filter_cache: Arc<PartitionFilterCache>,
pub manifest_evaluator_cache: Arc<ManifestEvaluatorCache>,
pub expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
/// Optional snapshot range for incremental scans
pub snapshot_range: Option<Arc<SnapshotRange>>,
}

impl PlanContext {
Expand Down Expand Up @@ -282,6 +310,7 @@ impl PlanContext {
expression_evaluator_cache: self.expression_evaluator_cache.clone(),
delete_file_index,
case_sensitive: self.case_sensitive,
snapshot_range: self.snapshot_range.clone(),
}
}
}
Loading
Loading