Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions native/src/ffi_profiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,13 @@ pub enum Section {

// -- FR4 range filter elimination (1) --
RangeFilterElimination = 60,

// -- TxLog parallel improvements (#153) (2) --
TxLogCheckpointVerify = 61,
TxLogBackwardProbe = 62,
}

pub const NUM_SECTIONS: usize = 61;
pub const NUM_SECTIONS: usize = 63;

impl Section {
pub const fn name(self) -> &'static str {
Expand Down Expand Up @@ -176,6 +180,8 @@ impl Section {
Self::TxLogDataSkip => "txlog_data_skip",
Self::TxLogArrowExport => "txlog_arrow_export",
Self::RangeFilterElimination => "range_filter_elimination",
Self::TxLogCheckpointVerify => "txlog_checkpoint_verify",
Self::TxLogBackwardProbe => "txlog_backward_probe",
}
}

Expand Down Expand Up @@ -203,6 +209,7 @@ impl Section {
Self::TxLogSnapshot, Self::TxLogManifestPrune, Self::TxLogManifestRead,
Self::TxLogPartitionFilter, Self::TxLogDataSkip, Self::TxLogArrowExport,
Self::RangeFilterElimination,
Self::TxLogCheckpointVerify, Self::TxLogBackwardProbe,
];
}

Expand Down Expand Up @@ -256,7 +263,7 @@ impl CacheCounter {
// Compile-time safety: ensure ALL array and enum stay in sync
// ═══════════════════════════════════════════════════════════════════════

const _: () = assert!(Section::RangeFilterElimination as usize == NUM_SECTIONS - 1,
const _: () = assert!(Section::TxLogBackwardProbe as usize == NUM_SECTIONS - 1,
"Section enum last variant must equal NUM_SECTIONS - 1");
const _: () = assert!(CacheCounter::PqColumnMiss as usize == NUM_CACHE_COUNTERS - 1,
"CacheCounter enum last variant must equal NUM_CACHE_COUNTERS - 1");
Expand Down
212 changes: 175 additions & 37 deletions native/src/txlog/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use futures::stream::{self, StreamExt};

/// Extract the maximum concurrent read limit from a config map.
/// Key: `max_concurrent_reads` (integer, default 32, 0 → default).
fn extract_max_concurrent(config_map: &HashMap<String, String>) -> usize {
pub(crate) fn extract_max_concurrent(config_map: &HashMap<String, String>) -> usize {
config_map.get("max_concurrent_reads")
.and_then(|s| s.parse::<usize>().ok())
.map(|n| if n == 0 { 32 } else { n })
Expand Down Expand Up @@ -83,6 +83,33 @@ async fn read_versions_concurrent(
results
}

/// Drive a batch of fallible futures with bounded parallelism, returning their
/// outputs in input order.
///
/// At most `max_concurrent` futures are in flight at once via
/// `buffer_unordered` (same pattern as `read_versions_concurrent`). Each future
/// is tagged with its position in `futures` so that the outputs, which arrive
/// in completion order, can be put back into the original order.
///
/// A `max_concurrent` of 0 is treated as 1 so the call always makes progress.
///
/// # Errors
///
/// All futures are driven to completion. If any of them failed, the first
/// error in completion order is returned and the successful outputs are
/// discarded.
pub(crate) async fn join_all_bounded<F, T>(
    futures: Vec<F>,
    max_concurrent: usize,
) -> Result<Vec<T>>
where
    F: std::future::Future<Output = Result<T>>,
{
    // With a limit of 0, `buffer_unordered` never pulls a future off the
    // source stream, so the combined stream would stay pending forever.
    let limit = max_concurrent.max(1);

    // Pair every output with the index of the future that produced it.
    let indexed = futures
        .into_iter()
        .enumerate()
        .map(|(i, f)| async move { f.await.map(|v| (i, v)) });

    let mut results: Vec<(usize, T)> = stream::iter(indexed)
        .buffer_unordered(limit)
        .collect::<Vec<Result<(usize, T)>>>()
        .await
        .into_iter()
        .collect::<Result<Vec<(usize, T)>>>()?;

    // Indices are unique, so an unstable sort restores insertion order.
    results.sort_unstable_by_key(|(i, _)| *i);
    Ok(results.into_iter().map(|(_, v)| v).collect())
}

/// Returns true if the error represents a "not found" condition from
/// any object store backend (S3, Azure, local filesystem).
fn is_not_found_error(e: &TxLogError) -> bool {
Expand Down Expand Up @@ -232,7 +259,11 @@ pub async fn get_txlog_snapshot_info_with_cache(
Ok(checkpoint_data) => {
// Normal path: checkpoint exists — verify version hint isn't stale
let mut last_cp: LastCheckpointInfo = serde_json::from_slice(&checkpoint_data)?;
let _t_verify = std::time::Instant::now();
let verified_version = verify_checkpoint_version(&storage, last_cp.version).await;
if crate::ffi_profiler::is_enabled() {
crate::ffi_profiler::record(crate::ffi_profiler::Section::TxLogCheckpointVerify, _t_verify.elapsed().as_nanos() as u64);
}
if verified_version > last_cp.version {
debug_println!("⚠️ DISTRIBUTED: _last_checkpoint stale: hint={}, actual={}",
last_cp.version, verified_version);
Expand Down Expand Up @@ -293,30 +324,67 @@ async fn snapshot_from_checkpoint(
// Mirrors the Scala reader's getMetadata() fallback (OptimizedTransactionLog.scala:1321-1330),
// which scans backwards through version files. After TRUNCATE TIME TRAVEL the version files
// are gone, but a previous checkpoint's state manifest may still carry the cached metadata.
//
// Issue #153: Batched concurrent probes — probe BATCH_SIZE versions at a time
// instead of one-at-a-time to amortize S3/Azure round-trip latency.
//
// Semantic improvement over the original sequential code: the old code stopped
// probing on the first missing state directory (Err), meaning a gap at version N
// would prevent finding valid metadata at version N-1. The batched version
// tolerates gaps — it issues all probes in a batch concurrently and skips
// individual failures, which is more resilient after partial GC or failed
// checkpoint writes.
if metadata.schema_string.is_empty() && last_cp.version > 0 {
let mut probe_version = last_cp.version - 1;
while metadata.schema_string.is_empty() {
let probe_state_dir = TxLogStorage::state_dir_name(probe_version);
match super::avro::state_reader::read_state_manifest(storage, &probe_state_dir).await {
Ok(prev_manifest) => {
if let Some(ref md_str) = prev_manifest.metadata {
if let Some(m) = parse_metadata_json(md_str) {
if !m.schema_string.is_empty() {
metadata = m;
// Also merge the earlier checkpoint's schema_registry so that
// older doc_mapping_ref hashes can be resolved (Fix 2c).
for (k, v) in &prev_manifest.schema_registry {
metadata.configuration.entry(k.clone())
.or_insert_with(|| v.clone());
let _t_backward = std::time::Instant::now();
const BACKWARD_BATCH: i64 = 4;
let mut probe_ceiling = last_cp.version - 1;

'outer: while metadata.schema_string.is_empty() && probe_ceiling >= 0 {
let probe_floor = std::cmp::max(0, probe_ceiling - BACKWARD_BATCH + 1);
// N.B. `.rev()` makes join_all return results in DESCENDING version order,
// so the loop below processes newest-first and takes the first valid match.
let probe_futs: Vec<_> = (probe_floor..=probe_ceiling).rev().map(|v| {
let dir = TxLogStorage::state_dir_name(v);
async move {
(v, super::avro::state_reader::read_state_manifest(storage, &dir).await)
}
}).collect();

let results = futures::future::join_all(probe_futs).await;

// Results are in descending version order (due to .rev() above).
// Take the first (newest) version that carries valid metadata.
for (v, result) in results {
match result {
Ok(prev_manifest) => {
if let Some(ref md_str) = prev_manifest.metadata {
if let Some(m) = parse_metadata_json(md_str) {
if !m.schema_string.is_empty() {
metadata = m;
for (k, val) in &prev_manifest.schema_registry {
metadata.configuration.entry(k.clone())
.or_insert_with(|| val.clone());
}
break 'outer;
}
}
}
}
if probe_version == 0 { break; }
probe_version -= 1;
Err(_) => {
// State directory doesn't exist at this version — skip
// and continue checking earlier versions in the batch.
// This is intentionally more resilient than the old sequential
// code which stopped on the first gap.
debug_println!("⚠️ DISTRIBUTED: backward probe: state dir missing at v{}", v);
}
}
Err(_) => break, // State directory doesn't exist — stop probing
}

if probe_floor == 0 { break; }
probe_ceiling = probe_floor - 1;
}
if crate::ffi_profiler::is_enabled() {
crate::ffi_profiler::record(crate::ffi_profiler::Section::TxLogBackwardProbe, _t_backward.elapsed().as_nanos() as u64);
}
}

Expand Down Expand Up @@ -1111,34 +1179,67 @@ async fn read_existing_checkpoint_info(
/// - HEAD per file → ~30–50 ms each
/// - For the common case (no commits since checkpoint), cost is 1 HEAD → ~50 ms
///
/// Uses batched concurrent probes (issue #153): issues BATCH_SIZE HEAD requests
/// at a time. If every probe in a batch hits, we advance and issue another
/// batch. The first batch with any miss tells us the frontier.
///
/// Falls back to a full LIST once the probe count reaches `MAX_PROBE` (100) to
/// avoid issuing thousands of serial HEADs on a very stale table.
/// avoid issuing thousands of HEADs on a very stale table.
pub(crate) async fn probe_versions_since(
storage: &TxLogStorage,
since_version: i64,
) -> Result<Vec<i64>> {
const MAX_PROBE: i64 = 100;
const BATCH_SIZE: i64 = 8;

let mut versions = Vec::new();
let mut v = since_version + 1;

loop {
if v - (since_version + 1) >= MAX_PROBE {
let probed_so_far = v - (since_version + 1);
if probed_so_far >= MAX_PROBE {
// Fell off the fast path — too many versions; switch to a full LIST
debug_println!("⚠️ DISTRIBUTED: probe_versions_since hit limit at v{}, falling back to list_versions", v);
let all = storage.list_versions().await?;
let remaining: Vec<i64> = all.into_iter().filter(|&x| x >= v).collect();
versions.extend(remaining);
break;
}
let path = TxLogStorage::version_path(v);
match storage.exists(&path).await {
Ok(true) => { versions.push(v); v += 1; }
Ok(false) => break,
Err(e) => {
debug_println!("⚠️ DISTRIBUTED: probe_versions_since HEAD error at v{}: {}", v, e);

let batch_end = std::cmp::min(BATCH_SIZE, MAX_PROBE - probed_so_far);
let probe_futs: Vec<_> = (0..batch_end).map(|offset| {
let probe_v = v + offset;
let path = TxLogStorage::version_path(probe_v);
async move {
(probe_v, storage.exists(&path).await.unwrap_or(false))
}
}).collect();

let results = futures::future::join_all(probe_futs).await;

// Results are in ascending version order (matching 0..batch_end iterator).
// Find the highest contiguous version that exists.
let mut any_hit = false;
for (probe_v, exists) in &results {
if *exists && *probe_v == v {
versions.push(v);
v += 1;
any_hit = true;
} else {
break;
}
}

if !any_hit {
break; // No newer version exists
}

// If not all probes in the batch hit, we found the frontier
if results.iter().any(|(_, exists)| !exists) {
break;
}
}

Ok(versions)
}

Expand All @@ -1148,29 +1249,66 @@ pub(crate) async fn probe_versions_since(

/// Verify a checkpoint version hint by probing for newer state directories.
///
/// Matches Scala's `verifyCheckpointVersion`: sequentially checks for state-v(N+1),
/// state-v(N+2), etc. until no more exist. Returns the actual latest version.
/// Uses batched concurrent probes (issue #153): instead of sequential HEAD
/// requests, issues BATCH_SIZE probes at a time. Each batch checks versions
/// [base+1, base+BATCH_SIZE] concurrently via HEAD requests (`storage.exists`).
/// If every probe in a batch hits, we advance and issue another batch.
/// The first batch with any miss tells us the frontier — the highest
/// contiguous version is the answer.
///
/// Batch size of 4 balances between:
/// - Common case (0-1 stale): 3-4 wasted HEAD probes (cheap)
/// - Stale case (many versions behind): 4× fewer round-trips than sequential
async fn verify_checkpoint_version(
storage: &TxLogStorage,
hint_version: i64,
) -> i64 {
const MAX_VERSION_PROBE: i64 = 1000;
const BATCH_SIZE: i64 = 4;

let mut version = hint_version;
let mut probed = 0i64;
let mut total_probed: i64 = 0;

loop {
if probed >= MAX_VERSION_PROBE {
if total_probed >= MAX_VERSION_PROBE {
debug_println!("⚠️ DISTRIBUTED: verify_checkpoint_version hit probe limit ({}) at v{}", MAX_VERSION_PROBE, version);
break;
}
let next_dir = TxLogStorage::state_dir_name(version + 1);
let next_manifest = format!("{}/{}", next_dir, STATE_MANIFEST_FILENAME);
match storage.get(&next_manifest).await {
Ok(_) => {
version += 1;
probed += 1;

let batch_end = std::cmp::min(BATCH_SIZE, MAX_VERSION_PROBE - total_probed);
let probe_futs: Vec<_> = (1..=batch_end).map(|offset| {
let v = version + offset;
let next_dir = TxLogStorage::state_dir_name(v);
let next_manifest = format!("{}/{}", next_dir, STATE_MANIFEST_FILENAME);
async move {
// Use HEAD (exists) instead of GET to avoid downloading manifest bodies.
(v, storage.exists(&next_manifest).await.unwrap_or(false))
}
}).collect();

let results = futures::future::join_all(probe_futs).await;

// Results are in ascending version order (matching the (1..=batch_end) iterator).
// Find the highest contiguous version that exists.
let mut advanced = false;
for (v, exists) in &results {
if *exists && *v == version + 1 {
version = *v;
total_probed += 1;
advanced = true;
debug_println!("📊 DISTRIBUTED: _last_checkpoint regression: found state at v{}", version);
} else {
break;
}
Err(_) => break,
}

if !advanced {
break; // No newer version exists
}

// If not all probes in the batch hit, we found the frontier
if results.iter().any(|(_, exists)| !exists) {
break;
}
}
version
Expand Down
20 changes: 12 additions & 8 deletions native/src/txlog/list_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub async unsafe fn list_files_arrow_ffi(
schema_addrs: &[i64],
cache_manager: Option<&Arc<GlobalSplitCacheManager>>,
) -> Result<ListFilesResult> {
let max_concurrent = distributed::extract_max_concurrent(config_map);
// 1. Get snapshot info (cached by table_path + TTL)
let _t_snapshot = std::time::Instant::now();
let snapshot = distributed::get_txlog_snapshot_info_with_cache(
Expand Down Expand Up @@ -144,15 +145,18 @@ pub async unsafe fn list_files_arrow_ffi(
crate::ffi_profiler::record(crate::ffi_profiler::Section::TxLogManifestPrune, _t_prune.elapsed().as_nanos() as u64);
}

// 3. Read manifests (executors would do this in Spark)
// 3. Read manifests concurrently (parallelized — issue #153)
let _t_read = std::time::Instant::now();
let mut checkpoint_entries: Vec<FileEntry> = Vec::new();
for manifest in &surviving_manifests {
let entries = distributed::read_manifest(
table_path, config, &snapshot.state_dir, &manifest.path, &metadata_config,
).await?;
checkpoint_entries.extend(entries);
}
let mut checkpoint_entries: Vec<FileEntry> = {
let manifest_futs: Vec<_> = surviving_manifests.iter().map(|manifest| {
distributed::read_manifest(
table_path, config, &snapshot.state_dir, &manifest.path, &metadata_config,
)
}).collect();

let results = distributed::join_all_bounded(manifest_futs, max_concurrent).await?;
results.into_iter().flatten().collect()
};

// 3b. Apply checkpoint tombstones (from incremental checkpoints)
if !snapshot.tombstones.is_empty() {
Expand Down
2 changes: 2 additions & 0 deletions native/src/txlog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,5 @@ mod integration_tests;
mod protocol_regression_tests;
#[cfg(test)]
mod purge_tests;
#[cfg(test)]
mod parallel_bench_tests;
Loading
Loading