Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ jobs:

- name: Rust tests
run: cd native && cargo test --lib --tests
env:
RUST_BACKTRACE: 1

build-and-test-linux:
name: Build & Test (Linux X64)
Expand All @@ -53,3 +55,5 @@ jobs:

- name: Rust tests
run: cd native && cargo test --lib --tests
env:
RUST_BACKTRACE: 1
33 changes: 14 additions & 19 deletions native/src/disk_cache/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

#[cfg(test)]
mod tests {
use std::time::Duration;

use tempfile::TempDir;

use super::super::*;
Expand Down Expand Up @@ -79,7 +77,7 @@ mod tests {
let data: Vec<u8> = (0..10000).map(|i| (i % 256) as u8).collect();

cache.put("s3://bucket/path", "split-001", "term", None, &data);
std::thread::sleep(Duration::from_millis(100));
cache.flush_blocking();

let retrieved = cache.get("s3://bucket/path", "split-001", "term", None);
assert!(retrieved.is_some());
Expand All @@ -96,7 +94,7 @@ mod tests {
let range = Some(0u64..5000u64);

cache.put("s3://bucket", "split-002", "idx", range.clone(), &data);
std::thread::sleep(Duration::from_millis(100));
cache.flush_blocking();

let retrieved = cache.get("s3://bucket", "split-002", "idx", range);
assert!(retrieved.is_some());
Expand All @@ -115,20 +113,20 @@ mod tests {

let data: Vec<u8> = (0..10000).map(|i| (i % 256) as u8).collect();
cache.put("s3://bucket", "split-001", "term", None, &data);
std::thread::sleep(Duration::from_millis(100));
cache.flush_blocking();

assert!(cache.get_total_bytes() > 0);
assert_eq!(cache.get_split_count(), 1);
assert_eq!(cache.get_component_count(), 1);

cache.put("s3://bucket", "split-001", "idx", None, &data);
std::thread::sleep(Duration::from_millis(100));
cache.flush_blocking();

assert_eq!(cache.get_split_count(), 1);
assert_eq!(cache.get_component_count(), 2);

cache.put("s3://bucket", "split-002", "term", None, &data);
std::thread::sleep(Duration::from_millis(100));
cache.flush_blocking();

assert_eq!(cache.get_split_count(), 2);
assert_eq!(cache.get_component_count(), 3);
Expand All @@ -145,14 +143,14 @@ mod tests {
cache.put("s3://bucket", "split-001", "term", None, &data);
cache.put("s3://bucket", "split-001", "idx", None, &data);
cache.put("s3://bucket", "split-002", "term", None, &data);
std::thread::sleep(Duration::from_millis(150));
cache.flush_blocking();

assert_eq!(cache.get_split_count(), 2);
assert_eq!(cache.get_component_count(), 3);
let bytes_before = cache.get_total_bytes();

cache.evict_split("s3://bucket", "split-001");
std::thread::sleep(Duration::from_millis(100));
cache.flush_blocking();

assert_eq!(cache.get_split_count(), 1);
assert_eq!(cache.get_component_count(), 1);
Expand All @@ -177,7 +175,7 @@ mod tests {
let data: Vec<u8> = (0..10000).map(|i| (i % 256) as u8).collect();

cache.put("s3://bucket", "split-001", "term", None, &data);
std::thread::sleep(Duration::from_millis(100));
cache.flush_blocking();

let retrieved = cache.get("s3://bucket", "split-001", "term", None);
assert!(retrieved.is_some());
Expand Down Expand Up @@ -216,10 +214,9 @@ mod tests {
let data: Vec<u8> = (0..10000).map(|i| (i % 256) as u8).collect();
cache.put("s3://bucket", "split-001", "term", None, &data);
cache.put("s3://bucket", "split-002", "idx", None, &data);
std::thread::sleep(Duration::from_millis(100));
cache.flush_blocking();

cache.sync_manifest().unwrap();
std::thread::sleep(Duration::from_millis(100));
}

let manifest_path = cache_path.join(CACHE_SUBDIR).join("manifest.json");
Expand Down Expand Up @@ -249,7 +246,7 @@ mod tests {

let data: Vec<u8> = (0..10000).map(|i| (i % 256) as u8).collect();
cache.put("s3://bucket", "split-001", "term", None, &data);
std::thread::sleep(Duration::from_millis(100));
cache.flush_blocking();
cache.sync_manifest().unwrap();
}

Expand Down Expand Up @@ -290,11 +287,9 @@ mod tests {

for i in 0..6 {
cache.put("s3://bucket", &format!("split-{:03}", i), "term", None, &data);
std::thread::sleep(Duration::from_millis(50));
cache.flush_blocking();
}

std::thread::sleep(Duration::from_millis(200));

let total_bytes = cache.get_total_bytes();
assert!(total_bytes <= 50000);

Expand Down Expand Up @@ -333,7 +328,7 @@ mod tests {
handle.join().unwrap();
}

std::thread::sleep(Duration::from_millis(200));
cache.flush_blocking();

let mut read_handles = vec![];
for i in 0..4 {
Expand Down Expand Up @@ -372,7 +367,7 @@ mod tests {

cache.put("s3://bucket", "split-001", "idx", Some(0..5000), &data1);
cache.put("s3://bucket", "split-001", "idx", Some(5000..10000), &data2);
std::thread::sleep(Duration::from_millis(100));
cache.flush_blocking();

let r1 = cache.get("s3://bucket", "split-001", "idx", Some(0..5000));
let r2 = cache.get("s3://bucket", "split-001", "idx", Some(5000..10000));
Expand Down Expand Up @@ -407,7 +402,7 @@ mod tests {
cache.put("s3://bucket-a", "split-001", "term", None, &data);
cache.put("s3://bucket-b", "split-001", "term", None, &data);
cache.put("azure://container", "split-001", "term", None, &data);
std::thread::sleep(Duration::from_millis(150));
cache.flush_blocking();

assert!(cache.get("s3://bucket-a", "split-001", "term", None).is_some());
assert!(cache.get("s3://bucket-b", "split-001", "term", None).is_some());
Expand Down
17 changes: 16 additions & 1 deletion native/src/parquet_companion/indexing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2495,10 +2495,25 @@ mod tests {

// Read the manifest from the split bundle
let split_bytes = std::fs::read(&split_path).unwrap();
eprintln!(
"🔍 TEST: Split file size = {} bytes, path = {}",
split_bytes.len(),
split_path.display()
);
let file_slice = tantivy::directory::FileSlice::new(Arc::new(
tantivy::directory::OwnedBytes::new(split_bytes),
));
let bundle = quickwit_directories::BundleDirectory::open_split(file_slice).unwrap();
let bundle = quickwit_directories::BundleDirectory::open_split(file_slice)
.unwrap_or_else(|e| {
// On failure, dump the bundle footer for diagnosis
let split_bytes2 = std::fs::read(&split_path).unwrap();
let len = split_bytes2.len();
let footer_preview = if len > 256 { &split_bytes2[len-256..] } else { &split_bytes2[..] };
eprintln!("❌ BundleDirectory::open_split FAILED: {}", e);
eprintln!(" Split file size: {} bytes", len);
eprintln!(" Last 256 bytes (hex): {:02x?}", footer_preview);
panic!("BundleDirectory::open_split failed: {}", e);
});

use tantivy::directory::Directory;
use crate::parquet_companion::manifest_io::{MANIFEST_FILENAME, deserialize_manifest};
Expand Down
Loading