feat: introduce MemWAL regional writer and MemTable reader #5709
Conversation
Thanks for the fast turnaround! I will take a look tonight. Meanwhile, I think this code path deserves some benchmarks; can you add those?
Force-pushed from 4fae180 to eb46adc
rust/lance-io/src/object_store.rs (Outdated)
/// (e.g., another writer already wrote the same WAL entry).
///
/// Returns `Err` with `AlreadyExists` if the destination file exists.
pub async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
Why do we need rename_if_not_exists and copy_if_not_exists? We can fulfill the rename operation with existing object_store APIs; see how we do that in commit.rs for the atomic manifest commit on local storage.
if pk_fields.is_empty() {
    return Err(Error::invalid_input(
        "MemWAL requires a primary key on the dataset. \
         Define a primary key using the 'lance-schema:unenforced-primary-key' field metadata.",
nit: Arrow field metadata
#[derive(Debug, Clone, Default)]
pub struct MemWalConfig {
    /// Region specification for partitioning writes.
    pub region_specs: Vec<RegionSpec>,
This should just take a single RegionSpec to begin with, and it should be optional (you can create a MemWAL index without a region spec).
In the future we can add APIs like add_region_spec (we can add a TODO for those; no need to add them now).
///
/// This opens the vector index and extracts the IVF model and product
/// quantizer needed for in-memory index maintenance.
async fn load_vector_index_config(
can these just be functions, not methods within Dataset?
/// When false:
/// - Index updates are deferred
/// - New data may not appear in index-accelerated queries immediately
pub indexed_writes: bool,
This name is confusing because it can be read as indexed vs. not indexed. What about sync_indexed_write?
})?;

// Best-effort update version hint
self.write_version_hint(version).await;
This should log a warning if it fails.
    ..Default::default()
};

self.object_store
this should also differentiate between local and cloud, since local will use a rename to ensure atomicity?
}

// Parallel scan forward with batches of HEAD requests
let batch_size = 8;
This should be a config in the region writer config.
The batch size can just default to 2.
All review comments have been addressed in commit cda38c0.
Force-pushed from ea29405 to 447d443
- Move the time-based flush check into maybe_trigger_wal_flush(), called after each write, eliminating the separate Tick ticker process
- This fixes the priority conflict where Tick (with higher priority in the biased select) would flush everything before queued size-based triggers
- Both time and size triggers now use the TriggerWalFlush message with a captured end_batch_id, ensuring sequential processing
- Rename TriggerFlush to TriggerWalFlush for clarity

Co-Authored-By: Jack Ye <yezhaoqin@gmail.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Force-pushed from 447d443 to c49d5d7
The MemTableFlushHandler's freeze_memtable() was missing the call to
reset_for_new_memtable() after swapping memtables. This caused
pending_bytes to remain stale after a freeze, leading to:
1. Spurious FlushMemTable triggers based on stale pending_bytes
2. Attempting to flush empty memtables ("Cannot flush empty MemTable")
3. Wasted freeze operations on memtables with 0 batches
The fix adds reset_for_new_memtable() after the memtable swap,
matching the behavior in RegionWriter::flush_memtable().
Co-Authored-By: Jack Ye <yezhaoqin@gmail.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
… underflow

When reset_for_new_memtable() sets pending_bytes to 0, in-flight WAL flushes from the old memtable may still complete and try to subtract their flushed bytes. This caused underflow to near-max u64 values like 18446744073699071552, leading to spurious flush triggers.

The fix adds saturating_sub_pending_bytes(), which uses a CAS loop to ensure the subtraction never goes below 0.

Co-Authored-By: Jack Ye <yezhaoqin@gmail.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Use memtable.estimated_size() for size-based flush triggers instead of tracking pending bytes separately. This simplifies the code by:
- Removing the pending_bytes atomic counter from WalBuffer
- Simplifying track_batch() to only return the durability watcher
- Adding has_pending_batches() to check if unflushed batches exist
- Using total memtable bytes for flush trigger thresholds
- Removing saturating_sub_pending_bytes() and related tracking

The range-based flushing (last_flushed_id to end_batch_id) already determines what to flush, so tracking bytes separately was redundant.

Co-Authored-By: Jack Ye <yezhaoqin@gmail.com>
Co-Authored-By: Claude <noreply@anthropic.com>
…hold

- Remove the overflow mechanism from LockFreeIvfPartitionStorage
- Add a soft_capacity field (90% of hard capacity) for flush triggering
- is_at_capacity() now returns true at 90% to leave a 10% buffer
- Update test_region_writer_s3 to include IVF-PQ index creation
- Simplify get_partition_entries() without overflow handling

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
…osed storage

The pre-transposed column-major storage was causing extremely slow index updates (~45 seconds per WAL flush for 157 batches). Replace it with a simple SkipMap that stores (partition_id, row_position) -> pq_code directly.

Key changes:
- Remove LockFreeIvfPartitionStorage with its complex pre-allocated column-major layout
- Use SkipMap<IvfPqKey, Vec<u8>> for O(log n) inserts instead of O(m) per insert
- Add a transpose_codes() method to convert to column-major format at read time
- Remove partition capacity limits (capacity is managed by memtable size)

Trade-off: slightly slower reads (transpose at read time) for much faster writes.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Force-pushed from 6b5341d to 45ed939
Add HnswMemIndex using the hnsw_rs crate as an alternative to IvfPqMemIndex for faster index updates. HNSW builds incrementally without pre-training, making it better suited for streaming writes.

- Add a MemVectorIndexType enum with IvfPq (default) and Hnsw variants
- Add a vector_index_type field to RegionWriterConfig with a builder method
- Implement HnswMemIndex with support for L2, Cosine, and Dot distances
- Load the distance type from the base IVF-PQ index for HNSW compatibility

Co-Authored-By: Jack Ye <yezhaoqin@gmail.com>
Co-Authored-By: Claude <noreply@anthropic.com>
Force-pushed from 45ed939 to 1277042
IVF-PQ works well in release mode with SIMD optimizations, making HNSW unnecessary for now. This simplifies the codebase by removing the configurable vector index type.

Changes:
- Remove the hnsw_rs dependency from the workspace and the lance crate
- Remove the MemVectorIndexType enum and vector_index_type config
- Remove HnswMemIndex, HnswIndexConfig, and related code from indexes.rs
- Simplify api.rs to always use IVF-PQ for vector indexes
- Replace eprintln! with debug! logging across mem_wal modules

Co-Authored-By: Jack Ye <yezhaoqin@gmail.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Update test_region_writer_s3_ivfpq to maintain all three index types:
- BTree index on the id column
- FTS (Inverted) index on the text column
- IVF-PQ index on the vector column

This tests the full index maintenance capability of the MemWAL writer.

Co-Authored-By: Jack Ye <yezhaoqin@gmail.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Integration tests run with --nocapture need eprintln! for visible output. tracing::debug! requires a subscriber to be initialized, which isn't set up in the test environment.

Co-Authored-By: Jack Ye <yezhaoqin@gmail.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Restore the comprehensive eprintln! statements that were removed in the HNSW removal commit. These debug statements, tagged [FLUSHER], [WAL_FLUSH], [TRIGGER], [FREEZE], and [LANCE_FLUSH], are essential for debugging MemWAL operations.

Added #![allow(clippy::print_stderr)] to the affected modules:
- dispatcher.rs
- batch_write.rs
- flush.rs
- wal.rs
- writer.rs

Co-Authored-By: Jack Ye <yezhaoqin@gmail.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Changed insert_batches_parallel to spawn a thread for each individual index rather than grouping by type. This maximizes parallelism when multiple indexes are maintained.

Before: 3 threads max (one per type: BTree, IVF-PQ, FTS)
After: N threads (one per index)

Co-Authored-By: Jack Ye <yezhaoqin@gmail.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Log individual index update times to identify slow indexes:

[INDEX_UPDATE] 3140 rows, 157 batches: id_btree(btree)=5ms, text_fts(fts)=12ms, vector_idx(ivfpq)=280ms

This helps diagnose which index type is the bottleneck during high-throughput writes.

Co-Authored-By: Jack Ye <yezhaoqin@gmail.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Move the WAL flush watermark from the global WalBuffer to the per-memtable LockFreeBatchStore. Each memtable now tracks its own wal_flushed_batch_id independently.
- Make freeze_memtable instant (sub-millisecond) by removing the synchronous WAL flush. Freeze now just swaps memtables and marks the old one as frozen.
- Remove the ImmutableMemTable wrapper entirely. Durability is guaranteed by the WAL, not by the Lance flush. Use Arc<MemTable> directly in the frozen_memtables queue.
- Add freeze(wal_id), frozen_at_wal_id(), and is_frozen() methods to MemTable for tracking frozen state.
- Update flush_oldest_immutable to finish the WAL flush for the frozen memtable before the Lance flush, using the memtable's own batch_store and indexes.

This allows writes to continue immediately after a freeze while background processing handles WAL and Lance flushes independently.

Co-Authored-By: Jack Ye <yezhaoqin@gmail.com>
Co-Authored-By: Claude <noreply@anthropic.com>
Set batch_capacity based on the actual number of batches (with a 10% buffer) to avoid "batch store is full" errors when running with large batch counts.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
…ibility

- Add LockFreeIvfPqPartitionStore with pre-transposed PQ codes for zero-cost search-time access
- Implement hybrid storage: primary (pre-allocated, fast) + overflow (SkipMap, graceful degradation when the primary is full)
- Update IvfPqMemIndex to use partition-based storage instead of a SkipMap
- Fix MVCC visibility filtering for all index types (IVF-PQ, BTree, FTS):
  - Push max_row_position into the search to filter before top-k truncation
  - Remove the invalid seq==0 visibility check
  - Use Option<u64> for max_visible_row to handle empty visibility

Co-Authored-By: Jack Ye <yezhaoqin@gmail.com>
Co-Authored-By: Claude <noreply@anthropic.com>
ACTION NEEDED: The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification. For details on the error, inspect the "PR Title Check" action.
Based on a draft shared by @jackye1995; cleaned it up and published for review. Also added a custom fix: RegionWriter uses Arc<EpochGuard> instead of EpochGuard to avoid unnecessarily incrementing the epoch.

MemTable read performance: