Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions native/src/parquet_companion/arrow_ffi_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::quickwit_split::{
SplitConfig, QuickwitSplitMetadata, FooterOffsets,
default_split_config, create_quickwit_split,
};
use crate::quickwit_split::json_discovery::extract_doc_mapping_from_index;
use crate::quickwit_split::json_discovery::{extract_doc_mapping_from_index, write_doc_mapping_to_dir};

/// Context for streaming Arrow FFI split creation.
/// Wrapped in Arc<Mutex<>> at the JNI layer (IndexWriter is !Sync).
Expand Down Expand Up @@ -898,8 +898,15 @@ async fn finalize_partition_writer_into_split(
skipped_splits: Vec::new(),
};

if let Ok(doc_mapping_json) = extract_doc_mapping_from_index(&pw.index) {
metadata.doc_mapping_json = Some(doc_mapping_json);
match extract_doc_mapping_from_index(&pw.index) {
Ok(doc_mapping_json) => {
// Persist so future merges recover JSON sub-fields from file instead of re-sampling.
write_doc_mapping_to_dir(&index_dir_path, &doc_mapping_json)?;
metadata.doc_mapping_json = Some(doc_mapping_json);
}
Err(e) => {
debug_println!("⚠️ ARROW_FFI: Failed to extract doc_mapping from index: {}. Split will have no doc_mapping.", e);
}
}

let footer = create_quickwit_split(
Expand Down
298 changes: 291 additions & 7 deletions native/src/parquet_companion/indexing.rs

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion native/src/parquet_companion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ pub mod cached_reader;
pub mod docid_mapping;
pub mod doc_retrieval;
pub mod validation;
pub mod store_stub;
pub mod schema_derivation;
pub mod indexing;
pub mod augmented_directory;
Expand Down
183 changes: 168 additions & 15 deletions native/src/parquet_companion/schema_derivation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,18 +146,20 @@ fn add_field_for_arrow_type(
if config.json_fields.contains(name) {
match data_type {
DataType::Utf8 | DataType::LargeUtf8 => {
// STRING column forced to JSON: stored + indexed + fast
// STRING column forced to JSON: indexed + fast (stored only if requested)
// Fast fields are required for range queries on numeric sub-fields
// Use "raw" tokenizer to match TANT batch path behavior (exact string matching)
let opts = JsonObjectOptions::default()
.set_stored()
let mut opts = JsonObjectOptions::default()
.set_indexing_options(
TextFieldIndexing::default()
.set_tokenizer("raw")
.set_index_option(IndexRecordOption::Basic)
.set_fieldnorms(config.fieldnorms_enabled),
)
.set_fast(None);
if config.store_fields {
opts = opts.set_stored();
}
builder.add_json_field(name, opts);
return Ok(());
}
Expand Down Expand Up @@ -341,18 +343,20 @@ fn add_field_for_arrow_type(
}

DataType::List(_) | DataType::LargeList(_) | DataType::FixedSizeList(_, _) | DataType::Map(_, _) | DataType::Struct(_) => {
// Complex types → JSON object field: stored + indexed + fast
// Complex types → JSON object field: indexed + fast (stored only if requested)
// Fast fields are required for range queries on numeric sub-fields
// Use "raw" tokenizer to match TANT batch path behavior (exact string matching)
let opts = JsonObjectOptions::default()
.set_stored()
let mut opts = JsonObjectOptions::default()
.set_indexing_options(
TextFieldIndexing::default()
.set_tokenizer("raw")
.set_index_option(IndexRecordOption::Basic)
.set_fieldnorms(config.fieldnorms_enabled),
)
.set_fast(None);
if config.store_fields {
opts = opts.set_stored();
}
builder.add_json_field(name, opts);
}

Expand Down Expand Up @@ -1110,24 +1114,100 @@ mod tests {
assert!(payload_entry.is_fast(), "JSON fields should be fast for range queries on sub-fields");
}

/// Exhaustive test: verify that EVERY column type branch in add_field_for_arrow_type
/// respects store_fields=false (companion mode). No field should be stored.
#[test]
fn test_json_fields_stored_and_indexed() {
fn test_store_fields_false_no_field_stored_for_any_type() {
let arrow = ArrowSchema::new(vec![
Field::new("payload", DataType::Utf8, true),
// Numeric types
Field::new("col_bool", DataType::Boolean, true),
Field::new("col_i32", DataType::Int32, true),
Field::new("col_i64", DataType::Int64, true),
Field::new("col_u32", DataType::UInt32, true),
Field::new("col_u64", DataType::UInt64, true),
Field::new("col_f32", DataType::Float32, true),
Field::new("col_f64", DataType::Float64, true),
// Text types
Field::new("col_utf8", DataType::Utf8, true),
Field::new("col_large_utf8", DataType::LargeUtf8, true),
// Decimal types
Field::new("col_decimal128", DataType::Decimal128(10, 2), true),
Field::new("col_decimal256", DataType::Decimal256(20, 4), true),
// Binary types
Field::new("col_binary", DataType::Binary, true),
Field::new("col_large_binary", DataType::LargeBinary, true),
// Date/time types
Field::new("col_date32", DataType::Date32, true),
Field::new("col_date64", DataType::Date64, true),
Field::new("col_timestamp", DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None), true),
// Complex types → JSON
Field::new("col_list", DataType::List(std::sync::Arc::new(Field::new("item", DataType::Utf8, true))), true),
Field::new("col_struct", DataType::Struct(
vec![Field::new("a", DataType::Int32, true)].into(),
), true),
// IP address (Utf8 declared as IP)
Field::new("col_ip", DataType::Utf8, true),
// JSON (Utf8 declared as JSON)
Field::new("col_json", DataType::Utf8, true),
]);

let mut config = SchemaDerivationConfig::default();
config.json_fields.insert("payload".to_string());
config.store_fields = false; // Companion mode
config.ip_address_fields.insert("col_ip".to_string());
config.json_fields.insert("col_json".to_string());

let schema = derive_tantivy_schema(&arrow, &config).unwrap();

let payload_field = schema.get_field("payload").unwrap();
let payload_entry = schema.get_field_entry(payload_field);
// Every field must NOT be stored
for (_field, entry) in schema.fields() {
assert!(!entry.is_stored(),
"Field '{}' is stored with store_fields=false — companion mode must not store any field",
entry.name());
}
}

/// Verify that store_fields=true (standard/FFI mode) correctly stores all fields.
/// Uses the same exhaustive schema as the false test to ensure symmetric coverage.
#[test]
fn test_store_fields_true_all_fields_stored_for_all_types() {
let arrow = ArrowSchema::new(vec![
Field::new("col_bool", DataType::Boolean, true),
Field::new("col_i32", DataType::Int32, true),
Field::new("col_i64", DataType::Int64, true),
Field::new("col_u32", DataType::UInt32, true),
Field::new("col_u64", DataType::UInt64, true),
Field::new("col_f32", DataType::Float32, true),
Field::new("col_f64", DataType::Float64, true),
Field::new("col_utf8", DataType::Utf8, true),
Field::new("col_large_utf8", DataType::LargeUtf8, true),
Field::new("col_decimal128", DataType::Decimal128(10, 2), true),
Field::new("col_decimal256", DataType::Decimal256(20, 4), true),
Field::new("col_binary", DataType::Binary, true),
Field::new("col_large_binary", DataType::LargeBinary, true),
Field::new("col_date32", DataType::Date32, true),
Field::new("col_date64", DataType::Date64, true),
Field::new("col_timestamp", DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None), true),
Field::new("col_list", DataType::List(std::sync::Arc::new(Field::new("item", DataType::Utf8, true))), true),
Field::new("col_struct", DataType::Struct(
vec![Field::new("a", DataType::Int32, true)].into(),
), true),
Field::new("col_ip", DataType::Utf8, true),
Field::new("col_json", DataType::Utf8, true),
]);

// Should be stored
assert!(payload_entry.is_stored(), "JSON field should be stored");
let mut config = SchemaDerivationConfig::default();
config.store_fields = true; // Standard/FFI mode
config.ip_address_fields.insert("col_ip".to_string());
config.json_fields.insert("col_json".to_string());

// Should be indexed (JsonObject with indexing options)
assert!(payload_entry.is_indexed(), "JSON field should be indexed");
let schema = derive_tantivy_schema(&arrow, &config).unwrap();

// Every field must be stored
for (_field, entry) in schema.fields() {
assert!(entry.is_stored(),
"Field '{}' is NOT stored with store_fields=true — standard mode must store all fields",
entry.name());
}
}

// ── String indexing mode tests ─────────────────────────────────────
Expand Down Expand Up @@ -1225,6 +1305,79 @@ mod tests {
assert!(schema.get_field("log_line__uuids").is_ok());
}

#[test]
fn test_json_field_stored_respects_store_fields_flag() {
let arrow = ArrowSchema::new(vec![
Field::new("payload", DataType::Utf8, true),
]);

// Case 1: store_fields=false (companion mode) → JSON field must NOT be stored
let mut config_no_store = SchemaDerivationConfig::default();
config_no_store.json_fields.insert("payload".to_string());
config_no_store.store_fields = false;

let schema = derive_tantivy_schema(&arrow, &config_no_store).unwrap();
let entry = schema.get_field_entry(schema.get_field("payload").unwrap());
assert!(
matches!(entry.field_type(), tantivy::schema::FieldType::JsonObject(_)),
"payload should be JsonObject type"
);
assert!(!entry.is_stored(),
"JSON field must NOT be stored when store_fields=false (companion mode)");

// Case 2: store_fields=true (standard split) → JSON field must be stored
let mut config_store = SchemaDerivationConfig::default();
config_store.json_fields.insert("payload".to_string());
config_store.store_fields = true;

let schema = derive_tantivy_schema(&arrow, &config_store).unwrap();
let entry = schema.get_field_entry(schema.get_field("payload").unwrap());
assert!(
matches!(entry.field_type(), tantivy::schema::FieldType::JsonObject(_)),
"payload should be JsonObject type"
);
assert!(entry.is_stored(),
"JSON field must be stored when store_fields=true (standard split)");
}

#[test]
fn test_complex_type_json_stored_respects_store_fields_flag() {
// Tests the List/Map/Struct JSON path (line ~345), separate from the Utf8 override path
let arrow = ArrowSchema::new(vec![
Field::new(
"tags",
DataType::List(std::sync::Arc::new(Field::new("item", DataType::Utf8, true))),
true,
),
]);

// Case 1: store_fields=false (companion mode) → complex-type JSON field must NOT be stored
let mut config_no_store = SchemaDerivationConfig::default();
config_no_store.store_fields = false;

let schema = derive_tantivy_schema(&arrow, &config_no_store).unwrap();
let entry = schema.get_field_entry(schema.get_field("tags").unwrap());
assert!(
matches!(entry.field_type(), tantivy::schema::FieldType::JsonObject(_)),
"List field should be JsonObject type"
);
assert!(!entry.is_stored(),
"Complex-type JSON field must NOT be stored when store_fields=false (companion mode)");

// Case 2: store_fields=true (standard split) → complex-type JSON field must be stored
let mut config_store = SchemaDerivationConfig::default();
config_store.store_fields = true;

let schema = derive_tantivy_schema(&arrow, &config_store).unwrap();
let entry = schema.get_field_entry(schema.get_field("tags").unwrap());
assert!(
matches!(entry.field_type(), tantivy::schema::FieldType::JsonObject(_)),
"List field should be JsonObject type"
);
assert!(entry.is_stored(),
"Complex-type JSON field must be stored when store_fields=true (standard split)");
}

#[test]
fn test_text_custom_strip_creates_text_only() {
let arrow = ArrowSchema::new(vec![
Expand Down
95 changes: 0 additions & 95 deletions native/src/parquet_companion/store_stub.rs

This file was deleted.

Loading
Loading