4 changes: 4 additions & 0 deletions quickwit/quickwit-parquet-engine/src/storage/mod.rs
@@ -27,6 +27,10 @@ pub use inspect::{
     verify_partition_prefix,
 };
 pub use split_writer::ParquetSplitWriter;
+pub use streaming_reader::{
+    ColumnPageStream, Page, ParquetReadError, RemoteByteSource, StreamingParquetReader,
+    StreamingReaderConfig,
+};
 // Re-export metadata constants for use by the merge module and tests.
 pub(crate) use writer::{
     PARQUET_META_NUM_MERGE_OPS, PARQUET_META_RG_PARTITION_PREFIX_LEN, PARQUET_META_ROW_KEYS,
117 changes: 112 additions & 5 deletions quickwit/quickwit-parquet-engine/src/storage/streaming_reader.rs
@@ -66,7 +66,7 @@ use tokio::io::{AsyncRead, AsyncReadExt};
 /// no business pulling in). Callers in `quickwit-indexing` provide a
 /// thin adapter that delegates to `quickwit_storage::Storage`.
 #[async_trait]
-pub(crate) trait RemoteByteSource: Send + Sync {
+pub trait RemoteByteSource: Send + Sync {
     /// Total file length in bytes.
     async fn file_size(&self, path: &Path) -> io::Result<u64>;
 
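The hunk above makes `RemoteByteSource` public (the rest of the trait is collapsed in this diff). As a rough sketch of how a caller might use the one visible method, the helper below computes the tail byte range to prefetch for the parquet footer. The helper name is hypothetical, and the 256 KiB constant is taken from the `StreamingReaderConfig` doc comment shown further down; neither is part of this PR.

```rust
use std::io;
use std::ops::Range;
use std::path::Path;

// Hypothetical helper, not part of this PR: size the initial tail fetch
// using only `RemoteByteSource::file_size`. The 256 KiB constant mirrors
// the `StreamingReaderConfig` default documented below (assumption: that
// config value is a plain byte count).
async fn footer_prefetch_range(
    source: &dyn RemoteByteSource,
    path: &Path,
) -> io::Result<Range<u64>> {
    const TAIL_PREFETCH: u64 = 256 * 1024;
    let len = source.file_size(path).await?;
    // saturating_sub clamps the range for files smaller than the window.
    Ok(len.saturating_sub(TAIL_PREFETCH)..len)
}
```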
@@ -85,7 +85,7 @@ pub(crate) trait RemoteByteSource: Send + Sync {
 
 /// Configuration for [`StreamingParquetReader`].
 #[derive(Debug, Clone)]
-pub(crate) struct StreamingReaderConfig {
+pub struct StreamingReaderConfig {
     /// Bytes prefetched from the file tail to capture the footer.
     /// Default 256 KiB — sized for a 50 MB metrics split with the
     /// writer config we ship in production.
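The struct body is collapsed in this diff, so the field name in the following sketch is a guess, flagged as such; it only illustrates the struct-update idiom a caller might use to widen the tail prefetch for files with larger footers.

```rust
// Sketch only: `footer_prefetch_bytes` is a hypothetical field name, since
// the actual field is elided from this diff. Shown to illustrate overriding
// one knob while keeping the remaining defaults.
let config = StreamingReaderConfig {
    footer_prefetch_bytes: 1024 * 1024, // 1 MiB instead of the 256 KiB default
    ..StreamingReaderConfig::default()
};
```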
@@ -107,7 +107,7 @@ impl Default for StreamingReaderConfig {
 
 /// Errors from the streaming reader.
 #[derive(Error, Debug)]
-pub(crate) enum ParquetReadError {
+pub enum ParquetReadError {
     /// I/O error from the underlying [`RemoteByteSource`].
     #[error("io error: {0}")]
     Io(#[from] io::Error),
@@ -162,7 +162,7 @@
 /// an output writer (PR-6's direct page copy) or decompress + decode
 /// for sort-column inspection.
 #[derive(Debug)]
-pub(crate) struct Page {
+pub struct Page {
     /// Row group this page belongs to.
     pub rg_idx: usize,
     /// Column chunk this page belongs to (within the row group).
@@ -177,12 +177,53 @@ pub(crate) struct Page {
     pub bytes: Bytes,
 }
 
+/// Object-safe page stream — the contract that PR-6's merge engine
+/// consumes for every input file.
+///
+/// Two implementations:
+/// - [`StreamingParquetReader`]: streams pages directly from a remote byte source (the new-format
+///   fast path).
+/// - The legacy adapter (PR-5): buffers a whole file into a `RecordBatch`, re-encodes it as a
+///   single-row-group parquet stream in memory, and presents it through this trait — used when an
+///   input's row-group boundaries do not align with the sort prefix (`qh.rg_partition_prefix_len ==
+///   0` AND `num_row_groups > 1`).
+///
+/// # Contract
+/// - [`Self::metadata`] returns the file's parsed metadata. Callable any time after construction;
+///   does not consume the stream.
+/// - [`Self::next_page`] yields pages in storage order: row-group-major,
+///   column-major-within-row-group, page-major-within-column. Returns `Ok(None)` once the file is
+///   fully drained, and stays `Ok(None)` on subsequent calls (idempotent EOF).
+/// - I/O failures surface as [`ParquetReadError::Io`]; they are not masked as decode errors.
+#[async_trait]
+pub trait ColumnPageStream: Send {
+    /// Parsed file metadata. Schema and row-group layout come from
+    /// here; the caller does not need to issue any further I/O to
+    /// inspect them.
+    fn metadata(&self) -> &Arc<ParquetMetaData>;
+
+    /// Read the next page in storage order. Returns `Ok(None)` after
+    /// the last page; further calls continue to return `Ok(None)`.
+    async fn next_page(&mut self) -> Result<Option<Page>, ParquetReadError>;
+}
+
+#[async_trait]
+impl ColumnPageStream for StreamingParquetReader {
+    fn metadata(&self) -> &Arc<ParquetMetaData> {
+        StreamingParquetReader::metadata(self)
+    }
+
+    async fn next_page(&mut self) -> Result<Option<Page>, ParquetReadError> {
+        StreamingParquetReader::next_page(self).await
+    }
+}
+
 /// Page-level streaming Parquet reader.
 ///
 /// See module docs for the contract. Caller must consume pages in
 /// storage order via [`Self::next_page`]; the body stream is forward-
 /// only.
-pub(crate) struct StreamingParquetReader {
+pub struct StreamingParquetReader {
     source: Arc<dyn RemoteByteSource>,
     path: PathBuf,
     file_size: u64,
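Because `ColumnPageStream` is object-safe, a PR-6-style consumer can be written once against `&mut dyn ColumnPageStream` and reused for both implementations. A minimal sketch under that assumption follows; the row-group callback and the function name are illustrative, not part of this PR.

```rust
// Sketch of a consumer written against the trait surface above: drain
// pages in storage order, invoking a caller-supplied callback whenever a
// new row group starts. `on_rg_start` is a placeholder for whatever
// boundary handling the merge engine needs.
async fn drain_by_row_group(
    stream: &mut dyn ColumnPageStream,
    mut on_rg_start: impl FnMut(usize),
) -> Result<usize, ParquetReadError> {
    let mut current_rg = None;
    let mut page_count = 0;
    while let Some(page) = stream.next_page().await? {
        // Pages arrive row-group-major, so a change in rg_idx marks a
        // row-group boundary.
        if current_rg != Some(page.rg_idx) {
            on_rg_start(page.rg_idx);
            current_rg = Some(page.rg_idx);
        }
        page_count += 1;
    }
    Ok(page_count)
}
```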
@@ -1396,4 +1437,70 @@ mod tests {
             }
         }
     }
+
+    // -------- ColumnPageStream trait dispatch --------
+
+    /// Drain a stream behind `&mut dyn ColumnPageStream`. Same contract
+    /// as the concrete-typed `drain_pages`, just exercising the trait
+    /// surface PR-5 and PR-6 will consume.
+    async fn drain_pages_via_trait(stream: &mut dyn ColumnPageStream) -> Vec<Page> {
+        let mut pages = Vec::new();
+        while let Some(p) = stream.next_page().await.unwrap() {
+            pages.push(p);
+        }
+        pages
+    }
+
+    /// `StreamingParquetReader` must satisfy the `ColumnPageStream`
+    /// contract when reached through `&mut dyn ColumnPageStream`.
+    /// Same row count, same storage order, same idempotent EOF as the
+    /// concrete-typed path.
+    #[tokio::test]
+    async fn test_streaming_reader_satisfies_column_page_stream_trait() {
+        let batch = make_metrics_batch(2048);
+        let bytes = write_test_file_multi_page(std::slice::from_ref(&batch), 256);
+
+        // Concrete-typed reference run.
+        let source_concrete = InMemorySource::new(bytes.clone());
+        let mut reader_concrete = StreamingParquetReader::try_open(source_concrete, dummy_path())
+            .await
+            .unwrap();
+        let pages_concrete = drain_pages(&mut reader_concrete).await;
+
+        // Trait-object run.
+        let source_trait = InMemorySource::new(bytes);
+        let reader_trait = StreamingParquetReader::try_open(source_trait, dummy_path())
+            .await
+            .unwrap();
+        // Also exercise `metadata()` through the trait surface and
+        // confirm it agrees with the concrete impl before draining.
+        let trait_metadata_num_rgs = {
+            let stream: &dyn ColumnPageStream = &reader_trait;
+            stream.metadata().num_row_groups()
+        };
+        assert_eq!(
+            trait_metadata_num_rgs,
+            reader_concrete.metadata().num_row_groups(),
+        );
+
+        let mut reader_trait = reader_trait;
+        let pages_trait = drain_pages_via_trait(&mut reader_trait).await;
+
+        // Idempotent EOF through the trait surface — second call after
+        // drain still returns Ok(None).
+        {
+            let stream: &mut dyn ColumnPageStream = &mut reader_trait;
+            assert!(stream.next_page().await.unwrap().is_none());
+        }
+
+        // Same number of pages, same (rg, col, page_idx_in_col) tuple
+        // sequence — i.e., trait dispatch preserves storage order.
+        assert_eq!(pages_concrete.len(), pages_trait.len());
+        for (a, b) in pages_concrete.iter().zip(pages_trait.iter()) {
+            assert_eq!(a.rg_idx, b.rg_idx);
+            assert_eq!(a.col_idx, b.col_idx);
+            assert_eq!(a.page_idx_in_col, b.page_idx_in_col);
+            assert_eq!(a.header.compressed_page_size, b.header.compressed_page_size);
+        }
+    }
 }
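For reference, the tests above imply the end-to-end call shape of the newly public surface: `try_open(source, path)`, then `metadata()`, then a `next_page()` loop. The sketch below restates that shape in one place; `handle` is a placeholder for the downstream page consumer, and the exact parameter types of `try_open` are taken from the test call sites since its signature is collapsed in this diff.

```rust
use std::path::PathBuf;

// End-to-end shape implied by the tests above. `InMemorySource` is the
// test double from this file; real callers would pass the
// quickwit-indexing adapter mentioned in the `RemoteByteSource` docs.
// `handle` is a stand-in for the downstream consumer, e.g. PR-6's page
// copier.
async fn copy_all_pages(
    source: InMemorySource,
    path: PathBuf,
    mut handle: impl FnMut(Page),
) -> Result<(), ParquetReadError> {
    let mut reader = StreamingParquetReader::try_open(source, path).await?;
    // Schema and row-group layout are available up front, no extra I/O.
    let _num_row_groups = reader.metadata().num_row_groups();
    while let Some(page) = reader.next_page().await? {
        handle(page); // pages arrive in storage order
    }
    Ok(())
}
```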