Skip to content

feat: multi-part AOF persistence, BGREWRITEAOF, optimized RDB loader#37

Closed
TinDang97 wants to merge 10 commits into
mainfrom
feat/persistence-overhaul
Closed

feat: multi-part AOF persistence, BGREWRITEAOF, optimized RDB loader#37
TinDang97 wants to merge 10 commits into
mainfrom
feat/persistence-overhaul

Conversation

@TinDang97
Copy link
Copy Markdown
Collaborator

@TinDang97 TinDang97 commented Apr 6, 2026

Summary

Major persistence overhaul: multi-part AOF format (Redis 7+ compatible), crash recovery, BGREWRITEAOF for monoio, and 3x faster RDB loading.

  • 5 critical bugs fixed: dual SO_REUSEPORT listener, io_uring accept cancel/resubmit, missing tokio cfg gates, AOF not replayed on monoio startup, BGREWRITEAOF missing from sharded handlers
  • Multi-part AOF: appendonlydir/ with base.rdb + incr.aof + manifest (matches Redis 7+ format)
  • BGREWRITEAOF: RDB preamble compaction, automatic old file cleanup, 62MB → 9.3MB (1.16x Redis)
  • RDB loader 3x faster: insert_for_load (skip duplicate check), DashTable pre-sizing, direct Vec→CompactValue path, cached timestamps
  • 100% crash recovery: 38/38 real use-case consistency checks across 6 scenarios

Benchmarks (AOF everysec, 1 shard, monoio)

Command p=16 p=64
SET 2.06x Redis 2.15x Redis (5.4M/s)
GET 1.12x 2.17x Redis (11.7M/s)

RDB load: 632K keys in 107ms = 5.9M keys/sec

Test plan

  • 1,541 unit tests pass (monoio default)
  • Tokio feature set compiles clean
  • 100% crash recovery (10 scenarios, 38 consistency checks)
  • BGREWRITEAOF: 68/68 data integrity checks
  • Multi-part AOF: 8/8 lifecycle tests (write → rewrite → more writes → crash → recover)
  • Recovery performance benchmarked at 10K/100K/500K/1M/2M keys

Summary by CodeRabbit

  • New Features
    • Sharded BGREWRITEAOF and multi-part AOF (base+incremental+manifest)
  • Improvements
    • AOF replay supports RDB preamble + RESP tail; manifest auto-initializes when missing
    • Faster bulk RDB serialization/restore with fewer copies and preallocation
    • Persistence commands now honor ACLs and report when AOF is disabled
    • Reduced per-shard accept overhead for connection handling
  • Documentation
    • Updated benchmarks and persistence docs (multi-part AOF, fast RDB loader)
  • Chores
    • Removed legacy consistency test script; some tests gated to tokio runtime

…sh recovery

Major persistence overhaul fixing 5 critical bugs and implementing Redis 7+
compatible multi-part AOF format (base.rdb + incr.aof + manifest).

Bugs fixed:
- Dual SO_REUSEPORT listener: monoio central listener stole ~50% connections
- io_uring accept cancel/resubmit: select! dropped accept future every 1ms tick
- Missing cfg gates: tokio tests failed on monoio default runtime
- AOF not replayed on monoio startup: writes went to global AOF, recovery read WAL
- BGREWRITEAOF missing from sharded handlers

Multi-part AOF:
- appendonlydir/ with moon.aof.N.base.rdb + moon.aof.N.incr.aof + manifest
- BGREWRITEAOF snapshots to RDB base, creates fresh incr, advances sequence
- Old files cleaned up automatically on rewrite
- Backward compatible: legacy single-file AOF still loads

Recovery: 100% across 10 crash scenarios, 38/38 real use-case consistency checks.
Compaction: 62MB → 9.3MB (1.16x Redis). Benchmarks: SET 1.99x Redis at p=64 with AOF.
… keys

Six optimizations to the RDB load path:

1. insert_for_load(): skip duplicate check + memory accounting during bulk load,
   single recalculate_memory() pass after all inserts (biggest win: ~2-3x)
2. Pre-sized DashTable: two-pass load counts entries per db first, then
   Database::reserve() eliminates ~9K segment splits for 500K keys
3. Zero-copy read_bytes_zero_copy(): Bytes::slice() from shared buffer
   instead of Vec alloc + copy per field
4. Cached current_secs(): derive from now_ms once, not syscall per entry
5. read_entry_zero_copy(): combines fixes 3+4 for the main entry parser
6. count_entries_per_db(): fast scan of type tags without parsing values

Results (after BGREWRITEAOF, recovery from RDB base):
  10K keys:  125ms → 4ms   (31x faster, 2.0x faster than Redis)
  100K keys: 109ms → 64ms  (1.7x faster)
  500K keys: 312ms → 111ms (2.8x faster)
  Mixed 50K: 121ms → 7ms   (17x faster, 2.7x faster than Redis)
…es overhead

Root cause: CompactValue::heap_string(data.to_vec()) during RDB load created an
intermediate Bytes via RedisValue::String, then converted back to Vec inside
CompactValue. This double-conversion (Vec→Bytes→Vec) was the dominant cost.

Fix: heap_string_vec_direct(Vec<u8>) builds CompactValue directly from an owned
Vec, skipping the RedisValue intermediate entirely. read_bytes_vec() returns
Vec<u8> instead of Bytes for the string fast path in read_entry_zero_copy.

Also adds heap_string_owned(Bytes) for the normal SET command path where the
protocol parser already provides Bytes — calls Bytes::into::<Vec<u8>> which is
zero-copy when Bytes has unique ownership.

632K keys load: 107ms warm (5.9M keys/sec), down from 320ms original (2.0M/s).
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 6, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Adds multi-part AOF (manifest, base RDB + incremental AOF), sharded BGREWRITEAOF enqueueing/handling, AOF replay with embedded RDB preamble, in-memory RDB save/load helpers, startup replay into shard[0], per-shard WAL skip when global AOF enabled, listener/accept refactors, and bulk-load storage helpers.

Changes

Cohort / File(s) Summary
AOF Manifest & Multi‑part Replay
src/persistence/aof_manifest.rs, src/persistence/mod.rs
New AofManifest module: manifest file, per-seq *.base.rdb and *.incr.aof; supports initialize, load, atomic write_manifest, advance() (rotate base+incr) and replay_multi_part() (load base RDB then replay incremental RESP).
AOF Core, Messages & Rewrite
src/persistence/aof.rs
Added AofMessage::RewriteSharded(Arc<...::ShardDatabases>); writer uses manifest-driven incr files; monoio/tokio rewrite paths implemented for sharded/single modes; persistent I/O error handling; replay_aof detects RDB preamble + RESP and returns combined counts.
Startup Integration & Command Handler Wiring
src/main.rs, src/command/persistence.rs, src/server/conn/handler_monoio.rs, src/server/conn/handler_sharded.rs
Startup: when appendonly == "yes" attempt AofManifest::load() + replay_multi_part() into shards[0], fallback to legacy replay_aof(); added bgrewriteaof_start_sharded(...); connection handlers route BGREWRITEAOF to sharded enqueue or return -ERR AOF is not enabled.
RDB Serialization & Loader
src/persistence/rdb.rs
Added save_to_bytes() / save_snapshot_to_bytes() and load_from_bytes(); refactored save/load to use shared Bytes buffers, pre-counting/reserving, zero-copy reads, stricter corruption handling, and returning (keys_loaded, bytes_consumed) for embedded-RDB use.
Sharded Accept / Event Loop Changes
src/shard/event_loop.rs, src/server/listener.rs
Monoio per-shard accept moved to dedicated spawned accept task (take+spawn loop); central listener binding made conditional when per_shard_accept is enabled; accept control flow simplified.
Storage Bulk‑Load & CompactValue Optimizations
src/storage/db.rs, src/storage/compact_value.rs, src/shard/shared_databases.rs
Added Database::insert_for_load, reserve, recalculate_memory; new heap-string APIs heap_string_owned/heap_string_vec_direct and optimized heap-string allocation; exposed ShardDatabases::all_shard_dbs() for cross-shard iteration.
Server Handlers: ACL & Queueing Adjustments
src/server/conn/handler_monoio.rs, src/server/conn/handler_sharded.rs
Moved BGSAVE/SAVE/LASTSAVE behind ACL checks; in sharded handler in_multi queueing moved earlier; persistence handlers short-circuit dispatch and BGREWRITEAOF support enqueued for sharded mode.
Tests, Docs & Misc
tests/integration.rs, tests/replication_test.rs, src/shard/dispatch.rs, README.md, scripts/test-consistency.sh
Gated integration/replication tests behind runtime-tokio; three async tests behind runtime-tokio; README updated with multi-part AOF and platform benchmarks; removed scripts/test-consistency.sh.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Handler as Connection Handler
    participant AofTx as AOF Channel
    participant AofTask as AOF Worker
    participant RDB as RDB Module
    participant Manifest as AofManifest
    participant Disk as Filesystem

    Client->>Handler: BGREWRITEAOF
    Handler->>AofTx: try_send(RewriteSharded)
    AofTx->>AofTask: receive RewriteSharded
    AofTask->>AofTask: snapshot per-shard DBs
    AofTask->>RDB: save_to_bytes(databases)
    RDB-->>AofTask: rdb_bytes
    AofTask->>Manifest: advance(rdb_bytes)
    Manifest->>Disk: write new base.rdb (tmp+rename)
    Manifest->>Disk: create next incr.aof
    Manifest->>Disk: write manifest atomically
    Manifest-->>AofTask: new incr path
    AofTask->>Disk: remove old files (best-effort)
    AofTask-->>Handler: complete
    Handler-->>Client: "Background append only file rewriting started"
Loading
sequenceDiagram
    participant Startup as Main Startup
    participant Manifest as AofManifest
    participant Disk as Filesystem
    participant RDB as RDB Module
    participant Shard as Shards[0]
    participant Replay as Replay Engine

    Startup->>Manifest: load(persistence_dir)
    alt manifest exists
        Manifest-->>Startup: Some(manifest)
        Startup->>Disk: read base.rdb bytes
        Startup->>RDB: load_from_bytes(base_bytes)
        RDB->>Shard: insert_for_load() keys
        Startup->>Disk: read incr.aof
        Startup->>Replay: replay_multi_part(...)
        Replay->>Shard: dispatch RESP commands
    else manifest missing
        Manifest-->>Startup: None
        Startup->>Disk: check legacy appendfile
        alt legacy file exists
            Startup->>RDB: replay_aof(appendfile) (detect RDB preamble + RESP)
            RDB->>Shard: restore entries
        end
    end
    Startup-->>Startup: continue init
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested labels

enhancement

🐰
I nibble manifests, pack RDB in rows,
Shards hum as incremental rivers flow,
BGREWRITE hops, writers dance and write,
Recovery wakes in morning light.

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The PR title clearly summarizes the main changes: multi-part AOF persistence, BGREWRITEAOF command, and RDB loader optimizations.
Description check ✅ Passed The PR description includes a detailed summary, performance benchmarks, and a comprehensive test plan, covering all major sections of the template except explicitly listing cargo checks.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/persistence-overhaul

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@qodo-code-review
Copy link
Copy Markdown

Review Summary by Qodo

Multi-part AOF persistence, BGREWRITEAOF for sharded mode, 3x faster RDB loader

✨ Enhancement 🐞 Bug fix

Grey Divider

Walkthroughs

Description
• Multi-part AOF persistence (Redis 7+ compatible) with RDB preamble + incremental RESP
• BGREWRITEAOF support for sharded mode with automatic old file cleanup
• RDB loader optimized 2-30x faster via pre-sizing, zero-copy reads, cached timestamps
• Fixed 5 critical bugs: dual SO_REUSEPORT listener, io_uring accept cancel/resubmit, missing cfg
  gates, AOF replay on monoio, BGREWRITEAOF in sharded handlers
Diagram
flowchart LR
  A["RDB/AOF Load"] --> B["Multi-part AOF<br/>base.rdb + incr.aof"]
  A --> C["Legacy AOF<br/>RDB preamble"]
  B --> D["Manifest Tracking<br/>seq + files"]
  D --> E["BGREWRITEAOF<br/>Snapshot → Advance"]
  E --> F["Auto Cleanup<br/>Old Files"]
  G["RDB Loader"] --> H["Pre-sized DashTable"]
  G --> I["Zero-copy Reads"]
  G --> J["Cached Timestamps"]
  H --> K["2-30x Faster"]
  I --> K
  J --> K
Loading

Grey Divider

File Changes

1. src/persistence/aof_manifest.rs ✨ Enhancement +297/-0

New multi-part AOF manifest tracking system

src/persistence/aof_manifest.rs


2. src/persistence/aof.rs ✨ Enhancement +266/-29

Multi-part AOF writer with RDB preamble and sharded rewrite

src/persistence/aof.rs


3. src/persistence/rdb.rs ✨ Enhancement +511/-42

Optimized RDB loader with pre-sizing and zero-copy reads

src/persistence/rdb.rs


View more (13)
4. src/main.rs ✨ Enhancement +46/-0

AOF replay on startup for multi-part and legacy formats

src/main.rs


5. src/command/persistence.rs ✨ Enhancement +15/-0

BGREWRITEAOF handler for sharded mode

src/command/persistence.rs


6. src/server/listener.rs 🐞 Bug fix +17/-9

Fix dual SO_REUSEPORT listener bug in sharded mode

src/server/listener.rs


7. src/shard/event_loop.rs 🐞 Bug fix +73/-36

Dedicated accept loop to fix io_uring cancel/resubmit bug

src/shard/event_loop.rs


8. src/server/conn/handler_sharded.rs ✨ Enhancement +13/-0

Add BGREWRITEAOF command support in sharded handler

src/server/conn/handler_sharded.rs


9. src/server/conn/handler_monoio.rs ✨ Enhancement +13/-0

Add BGREWRITEAOF command support in monoio sharded handler

src/server/conn/handler_monoio.rs


10. src/shard/shared_databases.rs ✨ Enhancement +8/-0

Expose all_shard_dbs for AOF rewrite snapshot access

src/shard/shared_databases.rs


11. src/shard/dispatch.rs 🐞 Bug fix +3/-0

Add cfg gates for tokio-only async tests

src/shard/dispatch.rs


12. src/storage/db.rs ✨ Enhancement +31/-0

Bulk-load insert, memory recalculation, and table pre-sizing

src/storage/db.rs


13. src/storage/compact_value.rs ✨ Enhancement +32/-15

Fast path for heap string creation from owned Vec/Bytes

src/storage/compact_value.rs


14. src/persistence/mod.rs ✨ Enhancement +1/-0

Export new aof_manifest module

src/persistence/mod.rs


15. tests/integration.rs 🐞 Bug fix +3/-0

Add tokio runtime feature gate for integration tests

tests/integration.rs


16. tests/replication_test.rs 🐞 Bug fix +3/-0

Add tokio runtime feature gate for replication tests

tests/replication_test.rs


Grey Divider

Qodo Logo

@qodo-code-review
Copy link
Copy Markdown

qodo-code-review Bot commented Apr 6, 2026

Code Review by Qodo

🐞 Bugs (2) 📘 Rule violations (1) 📎 Requirement gaps (0) 🎨 UX Issues (0)

Grey Divider


Action required

1. AOF replay targets shard0 🐞 Bug ≡ Correctness
Description
In sharded startup, AOF replay is applied only to shards[0].databases, so recovered keys/commands
end up on shard 0 even when the runtime routes requests by key_to_shard(). After restart, most
keys will be looked up on a different shard than where they were restored, causing widespread
misses/inconsistent state.
Code

src/main.rs[R227-251]

+            let base_dir = std::path::PathBuf::from(dir);
+            let target_dbs = &mut shards[0].databases;
+
+            if let Some(manifest) = AofManifest::load(&base_dir) {
+                // Multi-part AOF: load base RDB + replay incremental RESP
+                match moon::persistence::aof_manifest::replay_multi_part(
+                    target_dbs,
+                    &manifest,
+                    &DispatchReplayEngine,
+                ) {
+                    Ok(n) => info!(
+                        "AOF loaded (multi-part seq {}): {} keys/commands",
+                        manifest.seq, n
+                    ),
+                    Err(e) => tracing::error!("AOF multi-part load failed: {}", e),
+                }
+            } else {
+                // Legacy single-file AOF (backward compatible)
+                let aof_path = base_dir.join(&config.appendfilename);
+                if aof_path.exists() {
+                    match aof::replay_aof(
+                        target_dbs,
+                        &aof_path,
+                        &DispatchReplayEngine,
+                    ) {
Evidence
main.rs hard-codes the AOF replay target to shard 0’s database vector. The runtime’s sharding
logic hashes each key to an owning shard, so restoring all keys into shard 0 breaks routing
invariants and makes most keys unreachable on their correct shards.

src/main.rs[204-260]
src/shard/dispatch.rs[138-147]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
AOF replay during sharded startup is applied only to `shards[0].databases`, which breaks the invariant that keys must live on the shard chosen by `key_to_shard()`. This causes recovered data to be routed to the wrong shard after restart.
### Issue Context
The sharded runtime routes operations by hashing keys; persistence replay must restore keys into the same shard layout.
### Fix Focus Areas
- src/main.rs[217-260]
- src/shard/dispatch.rs[138-147]
### Implementation direction
- For **multi-part AOF base RDB**: load into temporary `Vec<Database>` and then distribute keys to shard databases using existing sharding logic (e.g., the same approach as RDB distribution utilities).
- For **incremental RESP replay**: parse each command and route it to the correct shard before applying (single-key commands via `key_to_shard`, multi-key commands need command-specific routing consistent with the live dispatcher).
- Alternatively, if global AOF is not intended for multi-shard restore, gate this path behind `num_shards == 1` and document/enforce that constraint.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


2. WAL and AOF both replayed🐞 Bug ≡ Correctness
Description
Each shard already restores state via snapshot+per-shard WAL, but startup then replays the global
AOF afterwards, and both WAL and AOF record the same serialized write commands. This can apply
non-idempotent writes twice (e.g., INCR/LPUSH) and/or overwrite newer WAL-derived state with an
older AOF base snapshot.
Code

src/main.rs[R214-261]

  })
  .collect();
+    // Replay AOF data from disk (supplements per-shard WAL restore above).
+    //
+    // Priority order:
+    // 1. Multi-part AOF (appendonlydir/ with manifest) — base RDB + incremental RESP
+    // 2. Legacy single-file AOF (appendonly.aof) — RDB preamble or pure RESP
+    if config.appendonly == "yes" {
+        if let Some(ref dir) = persistence_dir {
+            use moon::persistence::aof_manifest::AofManifest;
+            use moon::persistence::replay::DispatchReplayEngine;
+
+            let base_dir = std::path::PathBuf::from(dir);
+            let target_dbs = &mut shards[0].databases;
+
+            if let Some(manifest) = AofManifest::load(&base_dir) {
+                // Multi-part AOF: load base RDB + replay incremental RESP
+                match moon::persistence::aof_manifest::replay_multi_part(
+                    target_dbs,
+                    &manifest,
+                    &DispatchReplayEngine,
+                ) {
+                    Ok(n) => info!(
+                        "AOF loaded (multi-part seq {}): {} keys/commands",
+                        manifest.seq, n
+                    ),
+                    Err(e) => tracing::error!("AOF multi-part load failed: {}", e),
+                }
+            } else {
+                // Legacy single-file AOF (backward compatible)
+                let aof_path = base_dir.join(&config.appendfilename);
+                if aof_path.exists() {
+                    match aof::replay_aof(
+                        target_dbs,
+                        &aof_path,
+                        &DispatchReplayEngine,
+                    ) {
+                        Ok(n) => info!(
+                            "AOF loaded (legacy): {} commands from {}",
+                            n, aof_path.display()
+                        ),
+                        Err(e) => tracing::error!("AOF load failed: {}", e),
+                    }
+                }
+            }
+        }
+    }
Evidence
On startup, restore_from_persistence() replays each shard’s WAL into its databases. The
connection/shard execution path also appends successful write commands to WAL and separately appends
the same serialized command bytes to the global AOF. Replaying both recovery streams in one startup
sequence therefore re-executes the same mutations twice and breaks ordering vs the AOF base
snapshot.

src/shard/mod.rs[57-95]
src/shard/spsc_handler.rs[213-224]
src/server/conn/handler_sharded.rs[1355-1364]
src/server/conn/handler_sharded.rs[1420-1423]
src/main.rs[204-260]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
Sharded startup currently restores per-shard snapshot+WAL, then replays the global AOF. Since successful write commands are persisted to **both** WAL and AOF (using the same `aof::serialize_command` bytes), recovery can execute the same non-idempotent mutations twice and/or reconstruct state out of chronological order.
### Issue Context
- `Shard::restore_from_persistence()` replays per-shard WAL into the shard’s databases.
- Shard execution appends serialized write commands to WAL.
- Connection handler appends the same serialized write commands to the global AOF.
- `main.rs` performs both restores in one boot.
### Fix Focus Areas
- src/main.rs[204-260]
- src/shard/mod.rs[57-95]
- src/shard/spsc_handler.rs[213-224]
- src/server/conn/handler_sharded.rs[1355-1364]
- src/server/conn/handler_sharded.rs[1420-1423]
### Implementation direction (pick one clear source of truth)
1) **Global AOF is authoritative:**
- Skip replaying per-shard WAL into DB state (or gate it off when `appendonly=yes`), and use WAL only for components that truly need it (e.g., vector store) with a separate log/stream.
2) **Per-shard WAL is authoritative:**
- Do not replay the global AOF in sharded mode (or do not write to it in sharded mode).
3) If both must exist:
- Ensure WAL and AOF contain disjoint data and define strict ordering (base snapshot -> incremental log) without reapplying the same command stream twice.
Add an explicit startup decision + logging so operators can see which persistence source was used.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools



Remediation recommended

3. to_vec() allocations in shard init 📘 Rule violation ➹ Performance
Description
src/shard/event_loop.rs (a listed hot-path module) introduces heap allocations via to_vec() and
multiple .clone() calls when spawning the per-shard accept task. This violates the hot-path
allocation avoidance rule and can add overhead during shard startup/reload paths.
Code

src/shard/event_loop.rs[R346-367]

+            let tls_cfg = tls_config.clone();
+            let shard_dbs = shard_databases.clone();
+            let dtx = dispatch_tx.clone();
+            let ps = pubsub_arc.clone();
+            let blk = blocking_rc.clone();
+            let sd = shutdown.clone();
+            let atx = aof_tx.clone();
+            let trk = tracking_rc.clone();
+            let lua = lua_rc.clone();
+            let sc = script_cache_rc.clone();
+            let acl = acl_table.clone();
+            let rtcfg = runtime_config.clone();
+            let svcfg = server_config.clone();
+            let notifs = all_notifiers.to_vec();
+            let snap_tx = snapshot_trigger_tx.clone();
+            let rstate = repl_state.clone();
+            let cstate = cluster_state.clone();
+            let clock = cached_clock.clone();
+            let rsm = remote_sub_map_arc.clone();
+            let all_ps = all_pubsub_registries.to_vec();
+            let all_rsm = all_remote_sub_maps.to_vec();
+            let aff = affinity_tracker.clone();
Evidence
The compliance rule forbids introducing allocations/conversions like clone()/to_vec() in
hot-path modules including src/shard/event_loop.rs. The added code copies multiple collections
with to_vec() and clones many values while wiring the new accept loop task.

CLAUDE.md
src/shard/event_loop.rs[346-367]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
New code in `src/shard/event_loop.rs` introduces `to_vec()` allocations and multiple `.clone()` operations in a hot-path module when spawning the per-shard accept loop.
## Issue Context
Compliance requires avoiding heap allocations/expensive conversions in hot-path modules (including `src/shard/event_loop.rs`). Even if this runs mainly at shard startup, the rule’s failure criteria flags these patterns when introduced in hot-path code.
## Fix Focus Areas
- src/shard/event_loop.rs[346-367]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


4. Reserve can drop existing data 🐞 Bug ≡ Correctness
Description
Database::reserve() replaces self.data with a new DashTable, which discards existing entries
when additional > self.data.len(). Since rdb::load() calls reserve() before inserting,
invoking RDB loads on a non-empty database (e.g., after another restore path) can silently wipe
previously restored data.
Code

src/storage/db.rs[R428-435]

+    /// Pre-size the internal hash table for an expected key count.
+    /// Eliminates segment splits during bulk load.
+    pub fn reserve(&mut self, additional: usize) {
+        if additional > self.data.len() {
+            let new_table = DashTable::with_capacity(additional);
+            self.data = new_table;
+        }
+    }
Evidence
The new reserve() implementation is not a capacity reservation; it swaps in a fresh table. The RDB
loader now calls reserve(count) up-front based on a first-pass count, so calling rdb::load() on
any non-empty Database can lose state even before inserts start.

src/storage/db.rs[428-435]
src/persistence/rdb.rs[237-245]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`Database::reserve()` currently replaces the whole backing table, which is destructive if called on a non-empty DB. The RDB loader uses this method, so future/alternate call orders can cause silent data loss.
### Issue Context
This was introduced as a performance optimization, but the method name/visibility suggests safe reservation semantics.
### Fix Focus Areas
- src/storage/db.rs[428-435]
- src/persistence/rdb.rs[237-245]
### Implementation direction
- If `DashTable` supports reserving/growing, use that.
- If it does not, then:
- Rename to something explicit like `reset_with_capacity_for_load(cap)` and keep it `pub(crate)`.
- Add `debug_assert!(self.data.is_empty())` (or return an error) to enforce the only-safe use case.
- Alternatively, rehash/migrate existing entries into the new table instead of discarding.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


5. Monoio AOF init exits writer🐞 Bug ☼ Reliability
Description
In the monoio AOF writer path, a manifest initialization failure returns from aof_writer_task,
leaving AOF enabled but with no writer consuming messages. Callers ignore try_send failures when
appending AOF entries, so durability can be lost after startup without client-visible errors.
Code

src/persistence/aof.rs[R125-137]

+        // Load or create manifest
+        let mut manifest = match AofManifest::load(&base_dir) {
+            Some(m) => m,
+            None => {
+                // First run or migration from legacy single-file AOF.
+                // Initialize multi-part with seq 1.
+                match AofManifest::initialize(&base_dir) {
+                    Ok(m) => m,
+                    Err(e) => {
+                        error!("Failed to initialize AOF manifest: {}", e);
+                        // Fallback: write to legacy path
+                        return;
+                    }
Evidence
The monoio writer returns immediately on AofManifest::initialize error, despite a comment claiming
fallback. The sharded connection handler intentionally discards the try_send result when appending
AOF bytes, so once the writer exits, persistence silently degrades to ‘best effort’ memory-only
until restart.

src/persistence/aof.rs[125-137]
src/server/conn/handler_sharded.rs[1420-1423]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
Monoio AOF writer task exits on manifest init failure, but the rest of the server continues to run with AOF enabled and producers dropping send errors. This risks losing durability without clear operator/client visibility.
### Issue Context
The code comment says “Fallback: write to legacy path” but the implementation returns.
### Fix Focus Areas
- src/persistence/aof.rs[125-137]
- src/server/conn/handler_sharded.rs[1420-1423]
### Implementation direction
- Either:
1) Implement the stated fallback: open and append to the legacy `aof_path` if manifest init fails; or
2) Treat this as fatal when `appendonly=yes` (bubble error up and abort startup), so operators don’t run with broken persistence.
- Additionally, consider logging/metrics when `try_send` fails so persistence backpressure/failure is observable.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


Grey Divider

ⓘ The new review experience is currently in Beta. Learn more

Grey Divider

Qodo Logo

Comment thread src/main.rs Outdated
Comment on lines +227 to +251
let base_dir = std::path::PathBuf::from(dir);
let target_dbs = &mut shards[0].databases;

if let Some(manifest) = AofManifest::load(&base_dir) {
// Multi-part AOF: load base RDB + replay incremental RESP
match moon::persistence::aof_manifest::replay_multi_part(
target_dbs,
&manifest,
&DispatchReplayEngine,
) {
Ok(n) => info!(
"AOF loaded (multi-part seq {}): {} keys/commands",
manifest.seq, n
),
Err(e) => tracing::error!("AOF multi-part load failed: {}", e),
}
} else {
// Legacy single-file AOF (backward compatible)
let aof_path = base_dir.join(&config.appendfilename);
if aof_path.exists() {
match aof::replay_aof(
target_dbs,
&aof_path,
&DispatchReplayEngine,
) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. Aof replay targets shard0 🐞 Bug ≡ Correctness

In sharded startup, AOF replay is applied only to shards[0].databases, so recovered keys/commands
end up on shard 0 even when the runtime routes requests by key_to_shard(). After restart, most
keys will be looked up on a different shard than where they were restored, causing widespread
misses/inconsistent state.
Agent Prompt
### Issue description
AOF replay during sharded startup is applied only to `shards[0].databases`, which breaks the invariant that keys must live on the shard chosen by `key_to_shard()`. This causes recovered data to be routed to the wrong shard after restart.

### Issue Context
The sharded runtime routes operations by hashing keys; persistence replay must restore keys into the same shard layout.

### Fix Focus Areas
- src/main.rs[217-260]
- src/shard/dispatch.rs[138-147]

### Implementation direction
- For **multi-part AOF base RDB**: load into temporary `Vec<Database>` and then distribute keys to shard databases using existing sharding logic (e.g., the same approach as RDB distribution utilities).
- For **incremental RESP replay**: parse each command and route it to the correct shard before applying (single-key commands via `key_to_shard`, multi-key commands need command-specific routing consistent with the live dispatcher).
- Alternatively, if global AOF is not intended for multi-shard restore, gate this path behind `num_shards == 1` and document/enforce that constraint.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

Comment thread src/main.rs
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 11

🧹 Nitpick comments (3)
src/shard/shared_databases.rs (1)

114-120: Prefer a narrower accessor than exposing raw shard lock containers.

all_shard_dbs() couples callers to internal storage shape. Consider exposing an iterator/helper API for AOF rewrite traversal instead of returning &[Vec<RwLock<Database>>].

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/shard/shared_databases.rs` around lines 114 - 120, The method
all_shard_dbs() exposes the internal shards: Vec<RwLock<Database>> layout;
instead create a narrower API for callers (especially the AOF rewrite path) by
replacing the accessor with a controlled traversal helper such as pub fn
for_each_db<F: FnMut(&RwLock<Database>)>(&self, f: F) { for shard in
&self.shards { for db in shard { f(db) } } } or by returning a custom iterator
type (e.g., AllDbsIter) that yields &RwLock<Database> while keeping self.shards
private; update callers to use for_each_db or the iterator instead of relying on
&[Vec<RwLock<Database>>].
src/storage/db.rs (1)

406-435: Move the bulk-load helpers out of db.rs.

This file is already past the repo's size limit, and adding restore-only helpers here makes the hot path harder to audit. A small db/bulk_load.rs or separate impl Database module would keep load-path logic isolated.

As per coding guidelines, "No single .rs file should exceed 1500 lines; split into submodules if approaching this limit."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/storage/db.rs` around lines 406 - 435, Extract the restore-only helpers
insert_for_load, recalculate_memory, and reserve from the large Database impl in
db.rs into a new submodule (e.g., bulk_load) as an impl block for the same
Database type; create a new db/bulk_load.rs with the three methods (keeping
signatures and visibility) and add a mod bulk_load; use self::bulk_load or
pub(crate) as appropriate so callers in the restore path still find them; update
any use/imports in modules that referenced these methods and run tests to ensure
visibility (pub/pub(crate)) matches existing callers so behavior is unchanged.
src/persistence/rdb.rs (1)

312-734: Consider extracting the fast-load path into a submodule.

rdb.rs is already beyond the repo limit, and this PR adds most of a second loader implementation inline. Splitting the counting/skip/zero-copy helpers would make the corruption paths much easier to audit.

As per coding guidelines, "No single .rs file should exceed 1500 lines; split into submodules if approaching this limit."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/persistence/rdb.rs` around lines 312 - 734, The file is too large and the
new fast-load logic should be extracted into a submodule; move the counting/skip
helpers and zero-copy loader into a new module (e.g., rdb::fast_load) by
extracting functions count_entries_per_db, skip_entry, read_u32_raw,
skip_bytes_field, and read_entry_zero_copy into that module, update their
visibility (pub(crate) as needed), import them from load_from_bytes, and keep
load_from_bytes in the parent rdb.rs to orchestrate reading/CRC and DB
insertion; ensure any referenced types (Entry, RedisValue, StreamId, Database,
MoonError, constants like EOF_MARKER/DB_SELECTOR/TYPE_*) are re-exported or
brought into scope in the new module and update mod declarations and use
statements and run tests to confirm no public-API changes.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/main.rs`:
- Around line 227-257: The current code replays AofManifest::load and
aof::replay_aof into &mut shards[0].databases (target_dbs), which routes all
recovered commands to shard 0; update the replay logic so it is shard-aware:
call moon::persistence::aof_manifest::replay_multi_part and aof::replay_aof into
a temporary Databases container (not shards[0]) or into per-shard databases,
then iterate the recovered keys/commands and redistribute them into the correct
shard-owned databases (shards[i].databases) based on the same sharding function
used elsewhere; ensure DispatchReplayEngine usage remains but apply the shard
mapping when inserting/committing replayed entries so shards 1..N are populated
correctly.

In `@src/persistence/aof_manifest.rs`:
- Around line 208-219: The code currently swallows errors from
crate::persistence::rdb::load when manifest.base_path() exists and continues to
replay incr.aof; instead, change the behavior so that if
crate::persistence::rdb::load(databases, &base_path) returns Err(e) you
propagate that error upward (return Err(e) or use the ? operator) instead of
logging and continuing — this prevents applying incr.aof deltas against a
missing/invalid base; update the block around manifest.base_path() /
crate::persistence::rdb::load to propagate the error to the caller so startup
can decide how to handle the failure.

In `@src/persistence/aof.rs`:
- Around line 125-139: Currently AofManifest::initialize() (called inside the
match in AofManifest::load / this block) is creating appendonlydir/ and the
manifest before startup recovery determines whether legacy single-file AOF
should be used; move manifest creation out of the load path and defer
initialization until after recovery/format detection (or when the writer thread
is started) so AofManifest::load(&base_dir) returns None when only legacy
appendfilename exists; specifically, stop calling AofManifest::initialize()
inside this failure branch of AofManifest::load, instead return None (or detect
legacy appendfilename presence) and perform AofManifest::initialize() later in
the writer-start code path after recovery completes.
- Around line 652-675: The snapshot currently walks live databases while shards
may continue enqueuing Append messages, causing commands to land both in the
base snapshot and later incremental AOF; implement a rewrite barrier/COW buffer
around the snapshot boundary: when beginning the snapshot (before iterating DB
locks in the block that builds snapshot, e.g., where Database::new and
guard.read() are used), atomically switch the append stream to a per-rewrite COW
buffer (or mark a barrier sequence number) so subsequent Append messages are
buffered instead of being written to the old incr; take the snapshot from the
live data while the buffer collects new appends, then call manifest.advance and
open the new incr file, and finally flush the buffered appends into that new
incr (or reconcile by sequence number) so no command is duplicated between the
base rdb_bytes and the incremental file opened via
std::fs::OpenOptions::new().open(&new_incr).
- Around line 308-317: The current branch that detects a MOON RDB preamble calls
crate::persistence::rdb::load_from_bytes(data) but on Err(it) it falls back to
RESP by returning (0,0); instead fail fast: when data.starts_with(b"MOON") and
load_from_bytes returns Err, propagate or return an error from the surrounding
function (or panic/terminate replay) so replay aborts rather than resetting
resp_start to 0; update the rdb preamble handling around the rdb_keys/resp_start
assignment (the match on crate::persistence::rdb::load_from_bytes) to return
Err(e) (or call the existing error-return path) and remove the fallback branch
that returns (0,0).

In `@src/persistence/rdb.rs`:
- Around line 665-710: In load_from_bytes(), mirror the same strict validation
that load() performs: after reading the version byte (variable version) validate
it against the supported RDB version(s) and return an RdbError::Corrupted (or
convert into the same error type used in load()) if the version is
unknown/unsupported; likewise when handling the DB_SELECTOR tag ensure the
parsed db_idx (db_idx[0]) is validated against databases.len() and return an
RdbError::Corrupted (or appropriate Io/Corrupted error) instead of silently
clamping/accepting out-of-range values—update the logic around current_db and
any pre-sizing to fail fast on invalid preamble data, using the same error
messages/types load() uses for consistency.

In `@src/server/conn/handler_monoio.rs`:
- Around line 1198-1210: The handler_monoio.rs file has grown too large; extract
the AOF-related command handling into a new submodule to keep the file under
1500 lines. Create a new module (e.g., aof_commands.rs) that exposes a function
(e.g., handle_aof_command) which accepts the same inputs used here (cmd bytes,
aof_tx: Option<...>, shard_databases: ..., responses: &mut Vec<Frame>) and
contains the BGREWRITEAOF branch logic (the cmd.eq_ignore_ascii_case check, the
aof_tx Some/None push behavior and use of
crate::command::persistence::bgrewriteaof_start_sharded). In handler_monoio.rs
replace the inline branch with a call to that new function, add a mod
declaration and any necessary use imports, and ensure types and cloning
(shard_databases.clone()) and behavior remain unchanged so existing
tests/passages still work.
- Around line 1198-1210: The BGREWRITEAOF branch currently runs before the ACL
gate and thus skips authorization; move the BGREWRITEAOF handling so it executes
after the existing ACL check (or explicitly run the same ACL check used for
other commands) instead of before it. Specifically, ensure the code that
inspects cmd.eq_ignore_ascii_case(b"BGREWRITEAOF") and calls
crate::command::persistence::bgrewriteaof_start_sharded(tx,
shard_databases.clone()) only executes if the ACL check for the BGREWRITEAOF
command has passed (use the same ACL helper/logic used at the ACL gate around
cmd), or replicate that check inline before pushing the response when aof_tx is
Some.

In `@src/server/conn/handler_sharded.rs`:
- Around line 1230-1242: The file handler_sharded.rs has grown too large;
extract the persistence-related command routing (e.g., BGREWRITEAOF handling
that references aof_tx, shard_databases, bgrewriteaof_start_sharded, responses
and Frame::Error) into a new submodule (for example a persistence.rs under the
same module or a handler_sharded/persistence mod), move the routing logic into a
small function like handle_persistence_command(tx, shard_databases, cmd,
responses) and export it, then replace the inline block in handler_sharded.rs
with a call to that new function and add the appropriate mod/import statements
so types like Frame, Bytes, and bgrewriteaof_start_sharded remain in scope.
Ensure tests/build still pass and keep public/private visibility consistent.

In `@src/storage/compact_value.rs`:
- Around line 341-342: The doc comment for as_bytes_mut is incorrect: it says
"returns the underlying Bytes" but the function returns Option<&mut Vec<u8>>
from the HeapString backing store; update the documentation for the method
as_bytes_mut (and any mention of HeapString) to state that it returns a mutable
reference to the underlying Vec<u8> (or Option<&mut Vec<u8>>) and clarify that
the Vec can be replaced but not mutated in-place as appropriate.

---

Nitpick comments:
In `@src/persistence/rdb.rs`:
- Around line 312-734: The file is too large and the new fast-load logic should
be extracted into a submodule; move the counting/skip helpers and zero-copy
loader into a new module (e.g., rdb::fast_load) by extracting functions
count_entries_per_db, skip_entry, read_u32_raw, skip_bytes_field, and
read_entry_zero_copy into that module, update their visibility (pub(crate) as
needed), import them from load_from_bytes, and keep load_from_bytes in the
parent rdb.rs to orchestrate reading/CRC and DB insertion; ensure any referenced
types (Entry, RedisValue, StreamId, Database, MoonError, constants like
EOF_MARKER/DB_SELECTOR/TYPE_*) are re-exported or brought into scope in the new
module and update mod declarations and use statements and run tests to confirm
no public-API changes.

In `@src/shard/shared_databases.rs`:
- Around line 114-120: The method all_shard_dbs() exposes the internal shards:
Vec<RwLock<Database>> layout; instead create a narrower API for callers
(especially the AOF rewrite path) by replacing the accessor with a controlled
traversal helper such as pub fn for_each_db<F: FnMut(&RwLock<Database>)>(&self,
f: F) { for shard in &self.shards { for db in shard { f(db) } } } or by
returning a custom iterator type (e.g., AllDbsIter) that yields
&RwLock<Database> while keeping self.shards private; update callers to use
for_each_db or the iterator instead of relying on &[Vec<RwLock<Database>>].

In `@src/storage/db.rs`:
- Around line 406-435: Extract the restore-only helpers insert_for_load,
recalculate_memory, and reserve from the large Database impl in db.rs into a new
submodule (e.g., bulk_load) as an impl block for the same Database type; create
a new db/bulk_load.rs with the three methods (keeping signatures and visibility)
and add a mod bulk_load; use self::bulk_load or pub(crate) as appropriate so
callers in the restore path still find them; update any use/imports in modules
that referenced these methods and run tests to ensure visibility
(pub/pub(crate)) matches existing callers so behavior is unchanged.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: eb87cae7-9505-4f2b-98de-5b02055780ad

📥 Commits

Reviewing files that changed from the base of the PR and between 6a5de2f and eb2610e.

📒 Files selected for processing (16)
  • src/command/persistence.rs
  • src/main.rs
  • src/persistence/aof.rs
  • src/persistence/aof_manifest.rs
  • src/persistence/mod.rs
  • src/persistence/rdb.rs
  • src/server/conn/handler_monoio.rs
  • src/server/conn/handler_sharded.rs
  • src/server/listener.rs
  • src/shard/dispatch.rs
  • src/shard/event_loop.rs
  • src/shard/shared_databases.rs
  • src/storage/compact_value.rs
  • src/storage/db.rs
  • tests/integration.rs
  • tests/replication_test.rs

Comment thread src/main.rs
Comment on lines +227 to +257
let base_dir = std::path::PathBuf::from(dir);
let target_dbs = &mut shards[0].databases;

if let Some(manifest) = AofManifest::load(&base_dir) {
// Multi-part AOF: load base RDB + replay incremental RESP
match moon::persistence::aof_manifest::replay_multi_part(
target_dbs,
&manifest,
&DispatchReplayEngine,
) {
Ok(n) => info!(
"AOF loaded (multi-part seq {}): {} keys/commands",
manifest.seq, n
),
Err(e) => tracing::error!("AOF multi-part load failed: {}", e),
}
} else {
// Legacy single-file AOF (backward compatible)
let aof_path = base_dir.join(&config.appendfilename);
if aof_path.exists() {
match aof::replay_aof(
target_dbs,
&aof_path,
&DispatchReplayEngine,
) {
Ok(n) => info!(
"AOF loaded (legacy): {} commands from {}",
n, aof_path.display()
),
Err(e) => tracing::error!("AOF load failed: {}", e),
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Don't replay the merged AOF into only shards[0].

src/shard/mod.rs:30-55 and src/shard/mod.rs:62-95 show that each shard owns its own databases and has already been restored independently before this block runs. Replaying the manifest/base into &mut shards[0].databases routes all recovered AOF data to shard 0 and leaves shards 1..N stale. Load into temporary databases and redistribute by shard, or make the replay path shard-aware.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main.rs` around lines 227 - 257, The current code replays
AofManifest::load and aof::replay_aof into &mut shards[0].databases
(target_dbs), which routes all recovered commands to shard 0; update the replay
logic so it is shard-aware: call
moon::persistence::aof_manifest::replay_multi_part and aof::replay_aof into a
temporary Databases container (not shards[0]) or into per-shard databases, then
iterate the recovered keys/commands and redistribute them into the correct
shard-owned databases (shards[i].databases) based on the same sharding function
used elsewhere; ensure DispatchReplayEngine usage remains but apply the shard
mapping when inserting/committing replayed entries so shards 1..N are populated
correctly.

Comment thread src/persistence/aof_manifest.rs
Comment thread src/persistence/aof.rs Outdated
Comment thread src/persistence/aof.rs
Comment thread src/persistence/aof.rs Outdated
Comment thread src/persistence/rdb.rs
Comment thread src/server/conn/handler_monoio.rs Outdated
Comment on lines +1230 to +1242
if cmd.eq_ignore_ascii_case(b"BGREWRITEAOF") {
if let Some(ref tx) = aof_tx {
responses.push(crate::command::persistence::bgrewriteaof_start_sharded(
tx,
shard_databases.clone(),
));
} else {
responses.push(Frame::Error(Bytes::from_static(
b"ERR AOF is not enabled",
)));
}
continue;
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

This module should be split to satisfy the repo max file-size rule.

The command-routing surface keeps growing in a file already over the cap.

As per coding guidelines "No single .rs file should exceed 1500 lines; split into submodules if approaching this limit".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_sharded.rs` around lines 1230 - 1242, The file
handler_sharded.rs has grown too large; extract the persistence-related command
routing (e.g., BGREWRITEAOF handling that references aof_tx, shard_databases,
bgrewriteaof_start_sharded, responses and Frame::Error) into a new submodule
(for example a persistence.rs under the same module or a
handler_sharded/persistence mod), move the routing logic into a small function
like handle_persistence_command(tx, shard_databases, cmd, responses) and export
it, then replace the inline block in handler_sharded.rs with a call to that new
function and add the appropriate mod/import statements so types like Frame,
Bytes, and bgrewriteaof_start_sharded remain in scope. Ensure tests/build still
pass and keep public/private visibility consistent.

Comment thread src/storage/compact_value.rs Outdated
Comment thread src/storage/db.rs
Comment on lines +430 to +434
pub fn reserve(&mut self, additional: usize) {
if additional > self.data.len() {
let new_table = DashTable::with_capacity(additional);
self.data = new_table;
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

reserve() currently erases populated databases.

DashTable::with_capacity() builds a brand-new empty table, so assigning it to self.data drops every existing entry. That becomes destructive as soon as this helper is called on a non-empty DB instead of a fresh load target.

Possible guard
 pub fn reserve(&mut self, additional: usize) {
-    if additional > self.data.len() {
+    if self.data.is_empty() && additional > 0 {
         let new_table = DashTable::with_capacity(additional);
         self.data = new_table;
     }
 }

…rsistence

- Add note that all benchmarks are ARM64 (macOS + Linux)
- Add Linux ARM64 benchmark table (11.7M GET/s, 5.4M SET/s, 2.15x Redis)
- Update persistence section with multi-part AOF, BGREWRITEAOF, fast RDB loader
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
README.md (1)

102-106: Consider moving implementation details to architecture section.

The persistence features are well-documented and match the code changes. However, Line 106's technical details ("pre-sized hash tables and direct Vec→CompactValue path") are quite specific for a high-level feature list. Consider simplifying to focus on user-facing benefits (e.g., "Fast RDB loader - 3x faster than Redis with optimized bulk loading") and moving the implementation details to the Architecture section or a dedicated ARCHITECTURE.md file.

📝 Proposed simplification
-- **Fast RDB loader** - 5.9M keys/sec with pre-sized hash tables and direct Vec→CompactValue path
+- **Fast RDB loader** - 3x faster than Redis (5.9M keys/sec) with optimized bulk loading

Then add implementation details to the Architecture section:

### RDB Loader Optimizations
- Pre-sized DashTable segments based on key count
- Direct Vec→CompactValue conversion (zero intermediate allocations)
- Skipped duplicate checks during bulk load
- Cached timestamps to avoid syscalls
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@README.md` around lines 102 - 106, Replace the hyper-specific phrase in the
README feature list ("Fast RDB loader - 5.9M keys/sec with pre-sized hash tables
and direct Vec→CompactValue path") with a concise, user-focused line like "Fast
RDB loader — significantly faster bulk loading than Redis" and move the
implementation details into a new or existing Architecture section (e.g.,
ARCHITECTURE.md or a "RDB Loader Optimizations" subsection). In that
Architecture section document the technical bullets from the review (pre-sized
DashTable segments, direct Vec→CompactValue conversion, skipped duplicate checks
during bulk load, cached timestamps) and reference the README line "Fast RDB
loader" so readers can drill down for details.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@README.md`:
- Line 102: Update the README entry that currently says "Multi-part AOF - Redis
7+ compatible format" to explicitly state that Moon's AOF is only inspired by
Redis 7+ but is incompatible: replace the compatibility claim with a short
explanation that Moon uses file naming like `moon.aof.<seq>.base.rdb`,
`moon.aof.<seq>.incr.aof` and a `moon.aof.manifest` with custom
`seq`/`base`/`incr` lines, whereas Redis 7+ expects
`appendonly.aof.<seq>.base.rdb`/`appendonly.aof.<seq>.incr.aof` and a manifest
with per-entry `file <name> seq <n> type <b|i>` lines; finally add an explicit
note that direct migration to Redis is not supported and the formats are
incompatible.

---

Nitpick comments:
In `@README.md`:
- Around line 102-106: Replace the hyper-specific phrase in the README feature
list ("Fast RDB loader - 5.9M keys/sec with pre-sized hash tables and direct
Vec→CompactValue path") with a concise, user-focused line like "Fast RDB loader
— significantly faster bulk loading than Redis" and move the implementation
details into a new or existing Architecture section (e.g., ARCHITECTURE.md or a
"RDB Loader Optimizations" subsection). In that Architecture section document
the technical bullets from the review (pre-sized DashTable segments, direct
Vec→CompactValue conversion, skipped duplicate checks during bulk load, cached
timestamps) and reference the README line "Fast RDB loader" so readers can drill
down for details.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 2d9b1245-4bbe-4e0f-97b2-c3e9c4f91513

📥 Commits

Reviewing files that changed from the base of the PR and between eb2610e and 825bb8c.

📒 Files selected for processing (1)
  • README.md

Comment thread README.md Outdated
…ons)

- Run cargo fmt on all modified files
- Gate do_rewrite_single/do_rewrite_sharded with #[cfg(feature = "runtime-monoio")]
- Add #[allow(dead_code)] to read_bytes_zero_copy (retained for future use)
- CI Test timeout is pre-existing (vector recall benchmarks exceed 15min limit)
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

♻️ Duplicate comments (1)
src/persistence/aof.rs (1)

785-802: ⚠️ Potential issue | 🔴 Critical

Same synchronization issue applies to tokio's sharded rewrite path.

The rewrite_aof_sharded_sync function (lines 793-801) has the same race condition as do_rewrite_sharded: iterating shard locks without a global barrier means concurrent writes can land in both the snapshot and subsequent appends.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/persistence/aof.rs` around lines 785 - 802, rewrite_aof_sharded_sync has
the same race as do_rewrite_sharded: it iterates per-shard read locks
(lock.read()) without a global barrier so concurrent writers can modify state
while you copy into merged_dbs; fix by acquiring exclusive/write locks for all
shards before copying (e.g., replace the per-shard read() with acquiring the
shard write lock for each lock returned by shard_dbs.all_shard_dbs(), copy
entries into merged_dbs using guard.data()/guard.base_timestamp(), then release
all write locks once the snapshot is complete) so the snapshot written by
Database::set reflects a consistent point-in-time view.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/persistence/aof_manifest.rs`:
- Around line 145-171: The current ordering writes the new base RDB first then
updates the manifest, which can leave an orphaned base if write_manifest()
fails; change advance() so it sets self.seq = new_seq and calls write_manifest()
before writing/renaming the base and creating the new incremental file, and add
rollback logic: if writing the base (base_path_seq/new_base via
std::fs::write/rename) or creating the incr (incr_path_seq) fails, restore the
previous seq and rewrite the manifest to its prior state to avoid a manifest/FS
mismatch; reference functions/fields: advance(), write_manifest(),
base_path_seq(), incr_path_seq(), and self.seq.
- Around line 268-286: The reported "commands replayed" count is being
incremented for skipped/non-command frames; remove the premature increments and
only increment count when a command is actually replayed. Specifically, in the
match handling for Frame (the branches matching Frame::Array and the inner
command name match for Frame::BulkString/Frame::SimpleString), delete the count
+= 1 statements in the skip branches and move/ensure a single count += 1 occurs
immediately after calling engine.replay_command(databases, cmd, cmd_args, &mut
selected_db) so that count reflects only successfully replayed commands.

In `@src/persistence/aof.rs`:
- Around line 740-755: rewrite_aof_sync currently takes a snapshot without
accounting for concurrently enqueued Append messages, which can race and end up
in both the RDB snapshot and the incremental AOF; fix by draining or capturing
pending Append messages from the AOF append queue before or during snapshot and
applying them to the temp Database instances so the snapshot reflects all
enqueued appends. Concretely, before building snapshot in rewrite_aof_sync,
read/peek/drain the append channel (Append messages) or acquire the same
append-enqueue mutex used by the command handler, collect those pending
commands, and apply them into the per-shard temp Database structures (the same
way Database::set is used) so the snapshot includes them; ensure you reference
the Append message type, the append queue/receiver used by the AOF writer, and
rewrite_aof_sync/Database when implementing this change.

---

Duplicate comments:
In `@src/persistence/aof.rs`:
- Around line 785-802: rewrite_aof_sharded_sync has the same race as
do_rewrite_sharded: it iterates per-shard read locks (lock.read()) without a
global barrier so concurrent writers can modify state while you copy into
merged_dbs; fix by acquiring exclusive/write locks for all shards before copying
(e.g., replace the per-shard read() with acquiring the shard write lock for each
lock returned by shard_dbs.all_shard_dbs(), copy entries into merged_dbs using
guard.data()/guard.base_timestamp(), then release all write locks once the
snapshot is complete) so the snapshot written by Database::set reflects a
consistent point-in-time view.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 0f27a001-c2f1-46c7-bf3e-c306ca8ee0cf

📥 Commits

Reviewing files that changed from the base of the PR and between 825bb8c and 9a40cda.

📒 Files selected for processing (6)
  • scripts/test-consistency.sh
  • src/main.rs
  • src/persistence/aof.rs
  • src/persistence/aof_manifest.rs
  • src/persistence/rdb.rs
  • src/server/conn/handler_monoio.rs
💤 Files with no reviewable changes (1)
  • scripts/test-consistency.sh
✅ Files skipped from review due to trivial changes (1)
  • src/server/conn/handler_monoio.rs
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/main.rs
  • src/persistence/rdb.rs

Comment on lines +145 to +171
// 1. Write new base RDB (atomic: tmp + rename)
let new_base = self.base_path_seq(new_seq);
let tmp_base = new_base.with_extension("rdb.tmp");
std::fs::write(&tmp_base, rdb_bytes).map_err(|e| crate::error::AofError::Io {
path: tmp_base.clone(),
source: e,
})?;
std::fs::rename(&tmp_base, &new_base).map_err(|e| {
crate::error::AofError::RewriteFailed {
detail: format!("rename base: {}", e),
}
})?;

// 2. Create empty new incremental file
let new_incr = self.incr_path_seq(new_seq);
std::fs::File::create(&new_incr).map_err(|e| crate::error::AofError::Io {
path: new_incr.clone(),
source: e,
})?;

// 3. Update manifest (atomic)
self.seq = new_seq;
self.write_manifest()
.map_err(|e| crate::error::AofError::Io {
path: self.manifest_path(),
source: e,
})?;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Potential inconsistency if manifest write fails after base RDB is written.

If manifest.advance() successfully writes the new base RDB (lines 145-156) but write_manifest() fails (lines 167-171), the system will have orphaned files: a new base.rdb exists on disk but the manifest still references the old sequence. On restart, load() returns the old seq, and recovery uses stale files while the new base is ignored.

Consider reversing the order: update the manifest before deleting old files, but this is already the case. The issue is that the base RDB is written before the manifest is updated. A more robust approach would be to write the manifest first (pointing to the new seq), then write the base RDB—though this has its own tradeoffs.

This is a minor edge case since the old data remains recoverable, but worth noting for durability guarantees.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/persistence/aof_manifest.rs` around lines 145 - 171, The current ordering
writes the new base RDB first then updates the manifest, which can leave an
orphaned base if write_manifest() fails; change advance() so it sets self.seq =
new_seq and calls write_manifest() before writing/renaming the base and creating
the new incremental file, and add rollback logic: if writing the base
(base_path_seq/new_base via std::fs::write/rename) or creating the incr
(incr_path_seq) fails, restore the previous seq and rewrite the manifest to its
prior state to avoid a manifest/FS mismatch; reference functions/fields:
advance(), write_manifest(), base_path_seq(), incr_path_seq(), and self.seq.

Comment on lines +268 to +286
let (cmd, cmd_args) = match &frame {
Frame::Array(arr) if !arr.is_empty() => {
let name = match &arr[0] {
Frame::BulkString(s) => s.as_ref(),
Frame::SimpleString(s) => s.as_ref(),
_ => {
count += 1;
continue;
}
};
(name as &[u8], &arr[1..])
}
_ => {
count += 1;
continue;
}
};
engine.replay_command(databases, cmd, cmd_args, &mut selected_db);
count += 1;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Counting non-command frames as replayed commands.

The function documents returning "commands replayed", but lines 274-275 and 281-282 increment count for frames that are skipped (non-BulkString command names, non-Array frames). These aren't actually replayed—they're discarded. This inflates the reported count and could mislead callers about actual recovery state.

Remove count increments for skipped frames
                     let name = match &arr[0] {
                         Frame::BulkString(s) => s.as_ref(),
                         Frame::SimpleString(s) => s.as_ref(),
                         _ => {
-                            count += 1;
                             continue;
                         }
                     };
                     (name as &[u8], &arr[1..])
                 }
                 _ => {
-                    count += 1;
                     continue;
                 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/persistence/aof_manifest.rs` around lines 268 - 286, The reported
"commands replayed" count is being incremented for skipped/non-command frames;
remove the premature increments and only increment count when a command is
actually replayed. Specifically, in the match handling for Frame (the branches
matching Frame::Array and the inner command name match for
Frame::BulkString/Frame::SimpleString), delete the count += 1 statements in the
skip branches and move/ensure a single count += 1 occurs immediately after
calling engine.replay_command(databases, cmd, cmd_args, &mut selected_db) so
that count reflects only successfully replayed commands.

Comment thread src/persistence/aof.rs
Comment on lines +740 to +755
fn rewrite_aof_sync(db: &SharedDatabases, aof_path: &Path) -> Result<(), MoonError> {
// Snapshot under read locks, build temp Database objects for RDB serialization
let snapshot: Vec<Database> = db
.iter()
.map(|lock| {
let guard = lock.read();
let mut temp = Database::new();
let now_ms = current_time_ms();
for (k, v) in guard.data().iter() {
if !v.is_expired_at(guard.base_timestamp(), now_ms) {
temp.set(k.to_bytes(), v.clone());
}
}
temp
})
.collect();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Single-shard rewrite has same potential race with append stream.

While single-shard mode serializes command execution through one event loop, the AOF writer task runs separately. During the snapshot (lines 742-755), new Append messages can still be enqueued from the command handler. These could land in both the RDB snapshot (if the affected database hasn't been iterated yet) and the new incremental file.

The severity is lower than sharded mode since command execution is serialized, but the window still exists between command completion enqueueing Append and the snapshot iteration reaching that database.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/persistence/aof.rs` around lines 740 - 755, rewrite_aof_sync currently
takes a snapshot without accounting for concurrently enqueued Append messages,
which can race and end up in both the RDB snapshot and the incremental AOF; fix
by draining or capturing pending Append messages from the AOF append queue
before or during snapshot and applying them to the temp Database instances so
the snapshot reflects all enqueued appends. Concretely, before building snapshot
in rewrite_aof_sync, read/peek/drain the append channel (Append messages) or
acquire the same append-enqueue mutex used by the command handler, collect those
pending commands, and apply them into the per-shard temp Database structures
(the same way Database::set is used) so the snapshot includes them; ensure you
reference the Append message type, the append queue/receiver used by the AOF
writer, and rewrite_aof_sync/Database when implementing this change.

1. ACL bypass: persistence commands (BGSAVE, SAVE, LASTSAVE, BGREWRITEAOF)
   now execute AFTER the ACL permission check in both handler_monoio.rs and
   handler_sharded.rs, preventing unprivileged users from triggering them.

2. RDB preamble fail-fast: when AOF file starts with MOON magic but RDB
   load fails, propagate the error instead of falling back to RESP parsing
   (which would produce garbage results from binary data).

3. load_from_bytes validation: validate RDB version byte and db_idx bounds
   in the AOF preamble loader, matching the strict checks in load().

4. Manifest init race: AOF writer no longer creates appendonlydir/ manifest
   on startup. main.rs creates it AFTER recovery completes, eliminating the
   race where the writer could create a manifest before legacy AOF migration
   is detected.

5. Doc fix: as_bytes_mut() doc now correctly says Option<&mut Vec<u8>>
   instead of claiming it returns Bytes.

6. RDB base error propagation: replay_multi_part() now returns Err when
   base RDB load fails, instead of applying incremental deltas against a
   missing/corrupt base (which gives wrong results).

7. Skip per-shard WAL when appendonly=yes: restore_from_persistence() takes
   skip_wal flag to avoid double-replay, since global AOF is the write
   source of truth when appendonly is enabled.
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 9

♻️ Duplicate comments (1)
src/main.rs (1)

228-249: ⚠️ Potential issue | 🔴 Critical

Don't replay the global AOF into only shards[0].

replay_multi_part() / replay_aof() are loading into &mut shards[0].databases, so every recovered write lands on shard 0 even though the live AOF contains writes from all shards. Recover into a temporary database slice and redistribute by shard before building ShardDatabases; you already have moon::persistence::rdb::distribute_loaded_to_shards() for that last step.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main.rs` around lines 228 - 249, The current recovery loads both
multi-part and legacy AOF into &mut shards[0].databases (target_dbs), so all
recovered writes land on shard 0; instead, load the AOF into a temporary,
non-sharded database collection (e.g., a local Vec/HashMap for loaded databases)
by calling AofManifest::load +
moon::persistence::aof_manifest::replay_multi_part(...) or aof::replay_aof(...)
into that temp structure (using DispatchReplayEngine), then call
moon::persistence::rdb::distribute_loaded_to_shards(temp_loaded, &mut shards) to
redistribute entries across shards before constructing/using the existing
ShardDatabases; replace uses of target_dbs (shards[0].databases) with the temp
loader and the final call to distribute_loaded_to_shards.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/main.rs`:
- Around line 206-212: The code currently calls Shard::new(...) and
shard.restore_from_persistence(dir, skip_shard_wal) but ignores failures when
skip_shard_wal may have skipped WAL replay; change restore_from_persistence to
return/propagate a Result (or check its Result) and if persistence restore/AOF
replay fails while skip_shard_wal is true, abort startup (return Err from main
or call process::exit(1)) instead of continuing with snapshot-only state. Update
the code paths around Shard::new and shard.restore_from_persistence to handle
and propagate errors (and mirror the same fix in the other restore loop region
mentioned) so any restore error causes immediate process termination with a
clear error log.
- Around line 223-271: The current code eagerly calls
AofManifest::initialize(&base_dir) after recovery which can flip startups to
multi-part AOF even when only legacy appendfilename was loaded; modify the logic
in the second persistence_dir block (near AofManifest::load/is_none) to only
create/initialize the multi-part manifest after confirming migration or to
initialize the manifest seeded with the recovered in-memory state/sequence:
detect whether legacy AOF (config.appendfilename) was used (see the earlier
branch that calls aof::replay_aof and
moon::persistence::aof_manifest::replay_multi_part on
shards[0].databases/target_dbs), and if legacy AOF was loaded either (a) defer
creating AofManifest until you perform/flag a migration to multi-part, or (b)
call an initialize-with-seq routine (or extend AofManifest::initialize) to write
a manifest with a sequence/value that reflects the recovered state so subsequent
startups don’t ignore the legacy AOF; update the code paths around
AofManifest::load, AofManifest::initialize, aof::replay_aof, and
replay_multi_part accordingly.

In `@src/persistence/aof_manifest.rs`:
- Around line 208-229: The current logic treats a missing base RDB as
recoverable; instead, check the manifest sequence and fail when the base is
missing for sequences beyond the initial one. In the block that uses
manifest.base_path() and crate::persistence::rdb::load, call manifest.seq() (or
the manifest's sequence accessor) and if seq != 1 and base_path.exists() is
false, log an error and return Err (rather than warn and continue) so we don't
silently drop data when only an incremental AOF is present.

In `@src/persistence/aof.rs`:
- Around line 660-676: The snapshotting code incorrectly rebuilds temporary
Database instances (using Database::new() and temp.set(...)) which rebases entry
expiries to the new databases' base_timestamp and yields wrong TTLs when
save_to_bytes(&snapshot) serializes via each Database's base_timestamp; instead
capture and persist each DB's raw entries plus its base_timestamp (e.g.,
snapshot as Vec<(EntriesType, base_ts)>) and change the serialization call to
use those (entries, base_ts) tuples so save_to_bytes (or a new helper) writes
expiries relative to the original base_timestamp; update the map closure that
currently creates temp DBs (and the other similar blocks) to collect
(guard.data().clone() or entries iterator, guard.base_timestamp()) and then pass
that structure into rdb::save_to_bytes and manifest.advance.
- Around line 164-199: The AOF write/flush/sync calls in the AofMessage handlers
(AofMessage::Append, Shutdown, Rewrite, RewriteSharded) currently ignore Result
values (file.write_all, file.flush, file.sync_data), which can silently drop
persistence failures; update these call sites to check and handle errors: if
write_all/flush/sync_data returns Err, log the error with context (including
manifest.seq and which operation), set/mark persistent-failure state or send a
shutdown/error message to stop acknowledging writes (or break the loop) so
persistence doesn't silently fail, and ensure
do_rewrite_single/do_rewrite_sharded error handling propagates fatal I/O errors
similarly instead of only logging; make sure last_fsync/FsyncPolicy logic
preserves behavior but reacts to any sync failures by handling the Result rather
than discarding it.

In `@src/persistence/rdb.rs`:
- Around line 281-299: The match on read_entry_zero_copy currently logs a
warning and breaks on Err(e), allowing the loader to later return Ok despite
partial/failed load; change this to propagate the error instead of breaking:
when read_entry_zero_copy returns Err(e) return Err(...) (or convert into the
crate's RdbLoadError) including context (cursor.position(), e, total_keys) so
the caller sees a failure; apply the same change to the other parser block (the
similar match at the 804-815 region) so both code paths fail-fast rather than
returning Ok after a corrupted entry.
- Around line 689-824: The file exceeds the size guideline because helpers
around byte-slice loading are embedded; extract load_from_bytes, the pre-pass
scanner that finds EOF+CRC (the loop that sets rdb_end), and raw-skip/read
helpers (e.g., count_entries_per_db and read_entry_zero_copy) into a new
submodule (e.g., rdb::bytes or rdb::preamble): move their implementations into a
new file, make the necessary functions pub(crate) or pub as needed, re-export or
adjust use statements so current_time_ms, EOF_MARKER, DB_SELECTOR, RDB_MAGIC,
RDB_VERSION, and types like Database, MoonError, RdbError, Hasher, Bytes, Cursor
still resolve, and update callers in the original module to call the new
submodule functions (e.g., bytes::load_from_bytes or re-export load_from_bytes
from mod.rs) so behavior and visibility are unchanged.
- Around line 704-724: The EOF+CRC scan can index past the buffer for
short/corrupt input; update the scanning logic around EOF_MARKER so we never
slice beyond data. Specifically, in the loop that sets rdb_end (the for i in
5..data.len().saturating_sub(3) loop), tighten the bounds or add an explicit
guard so you only attempt to read checksum_bytes when i + 5 <= data.len() (e.g.,
iterate to data.len().saturating_sub(4) or check if
data.get(i+1..i+5).is_some()). Ensure the code that constructs checksum_bytes
and computes Hasher::new()/hasher.update(payload) only runs when the 4 CRC bytes
are actually available, and return the Corrupted error for too-short inputs
instead of panicking.

In `@src/storage/compact_value.rs`:
- Around line 183-186: Public heap_string currently delegates to heap_string_vec
which contains a debug-only assertion that len > SSO_MAX_LEN, causing
debug/release behavior mismatch for short slices; update heap_string to
explicitly handle short inputs (either route to the small-string constructor,
e.g., small_string or compact inline branch, or validate and panic/document the
precondition) by checking data.len() against SSO_MAX_LEN and only calling
heap_string_vec when data.len() > SSO_MAX_LEN, and add a clear doc comment on
heap_string about the expected length behavior; reference heap_string,
heap_string_vec, SSO_MAX_LEN, and the small-string constructor to locate the
spots to change.

---

Duplicate comments:
In `@src/main.rs`:
- Around line 228-249: The current recovery loads both multi-part and legacy AOF
into &mut shards[0].databases (target_dbs), so all recovered writes land on
shard 0; instead, load the AOF into a temporary, non-sharded database collection
(e.g., a local Vec/HashMap for loaded databases) by calling AofManifest::load +
moon::persistence::aof_manifest::replay_multi_part(...) or aof::replay_aof(...)
into that temp structure (using DispatchReplayEngine), then call
moon::persistence::rdb::distribute_loaded_to_shards(temp_loaded, &mut shards) to
redistribute entries across shards before constructing/using the existing
ShardDatabases; replace uses of target_dbs (shards[0].databases) with the temp
loader and the final call to distribute_loaded_to_shards.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 0f2ffc5b-f6b6-4c52-abdb-49321efa0f33

📥 Commits

Reviewing files that changed from the base of the PR and between 9a40cda and b170861.

📒 Files selected for processing (8)
  • src/main.rs
  • src/persistence/aof.rs
  • src/persistence/aof_manifest.rs
  • src/persistence/rdb.rs
  • src/server/conn/handler_monoio.rs
  • src/server/conn/handler_sharded.rs
  • src/shard/mod.rs
  • src/storage/compact_value.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/server/conn/handler_sharded.rs

Comment thread src/main.rs
Comment on lines +206 to +212
let skip_shard_wal = config.appendonly == "yes";
let mut shards: Vec<Shard> = (0..num_shards)
.map(|id| {
let mut shard =
Shard::new(id, num_shards, config.databases, config.to_runtime_config());
if let Some(ref dir) = persistence_dir {
shard.restore_from_persistence(dir);
shard.restore_from_persistence(dir, skip_shard_wal);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Abort startup instead of continuing with snapshot-only state.

skip_shard_wal disables per-shard WAL replay before this block runs. If AOF replay then fails, the process just logs and boots with stale snapshots, silently dropping writes newer than the last snapshot.

💥 Suggested fail-fast change
-                    Err(e) => tracing::error!("AOF multi-part load failed: {}", e),
+                    Err(e) => {
+                        return Err(anyhow::anyhow!("AOF multi-part load failed: {}", e));
+                    }
@@
-                        Err(e) => tracing::error!("AOF load failed: {}", e),
+                        Err(e) => {
+                            return Err(anyhow::anyhow!("AOF load failed: {}", e));
+                        }

Also applies to: 223-255

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main.rs` around lines 206 - 212, The code currently calls Shard::new(...)
and shard.restore_from_persistence(dir, skip_shard_wal) but ignores failures
when skip_shard_wal may have skipped WAL replay; change restore_from_persistence
to return/propagate a Result (or check its Result) and if persistence
restore/AOF replay fails while skip_shard_wal is true, abort startup (return Err
from main or call process::exit(1)) instead of continuing with snapshot-only
state. Update the code paths around Shard::new and
shard.restore_from_persistence to handle and propagate errors (and mirror the
same fix in the other restore loop region mentioned) so any restore error causes
immediate process termination with a clear error log.

Comment thread src/main.rs
Comment on lines +223 to +271
if config.appendonly == "yes" {
if let Some(ref dir) = persistence_dir {
use moon::persistence::aof_manifest::AofManifest;
use moon::persistence::replay::DispatchReplayEngine;

let base_dir = std::path::PathBuf::from(dir);
let target_dbs = &mut shards[0].databases;

if let Some(manifest) = AofManifest::load(&base_dir) {
// Multi-part AOF: load base RDB + replay incremental RESP
match moon::persistence::aof_manifest::replay_multi_part(
target_dbs,
&manifest,
&DispatchReplayEngine,
) {
Ok(n) => info!(
"AOF loaded (multi-part seq {}): {} keys/commands",
manifest.seq, n
),
Err(e) => tracing::error!("AOF multi-part load failed: {}", e),
}
} else {
// Legacy single-file AOF (backward compatible)
let aof_path = base_dir.join(&config.appendfilename);
if aof_path.exists() {
match aof::replay_aof(target_dbs, &aof_path, &DispatchReplayEngine) {
Ok(n) => info!(
"AOF loaded (legacy): {} commands from {}",
n,
aof_path.display()
),
Err(e) => tracing::error!("AOF load failed: {}", e),
}
}
}
}

// Ensure multi-part AOF manifest exists for the writer thread.
// Recovery is now complete, so it's safe to create the manifest
// without racing against legacy AOF migration detection.
if let Some(ref dir) = persistence_dir {
use moon::persistence::aof_manifest::AofManifest;
let base_dir = std::path::PathBuf::from(dir);
if AofManifest::load(&base_dir).is_none() {
if let Err(e) = AofManifest::initialize(&base_dir) {
tracing::error!("Failed to initialize AOF manifest after recovery: {}", e);
}
}
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Don't create a multi-part manifest until the recovered state has actually been migrated.

This block flips future startups to appendonlydir/ even though the recovered dataset may only exist in the legacy appendfilename. Under runtime-tokio it's worse: the writer still appends only to the legacy file, so the next restart will prefer an empty/nonexistent multi-part set and ignore the real AOF. Either seed seq 1 from the current in-memory state immediately, or keep using legacy recovery/writes until that migration has happened.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main.rs` around lines 223 - 271, The current code eagerly calls
AofManifest::initialize(&base_dir) after recovery which can flip startups to
multi-part AOF even when only legacy appendfilename was loaded; modify the logic
in the second persistence_dir block (near AofManifest::load/is_none) to only
create/initialize the multi-part manifest after confirming migration or to
initialize the manifest seeded with the recovered in-memory state/sequence:
detect whether legacy AOF (config.appendfilename) was used (see the earlier
branch that calls aof::replay_aof and
moon::persistence::aof_manifest::replay_multi_part on
shards[0].databases/target_dbs), and if legacy AOF was loaded either (a) defer
creating AofManifest until you perform/flag a migration to multi-part, or (b)
call an initialize-with-seq routine (or extend AofManifest::initialize) to write
a manifest with a sequence/value that reflects the recovered state so subsequent
startups don’t ignore the legacy AOF; update the code paths around
AofManifest::load, AofManifest::initialize, aof::replay_aof, and
replay_multi_part accordingly.

Comment on lines +208 to +229
// Load base RDB
let base_path = manifest.base_path();
if base_path.exists() {
match crate::persistence::rdb::load(databases, &base_path) {
Ok(n) => {
info!(
"AOF base RDB loaded: {} keys from {}",
n,
base_path.display()
);
total += n;
}
Err(e) => {
// Base RDB is corrupt or unreadable — applying incremental
// deltas on top of missing/corrupt base gives wrong results.
error!("AOF base RDB load failed: {}", e);
return Err(e);
}
}
} else {
warn!("AOF base RDB not found: {}", base_path.display());
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Don't treat a missing base as recoverable once seq has advanced.

For seq 1, missing base.rdb is fine because the incremental file still holds the whole history. After the first rewrite, though, moon.aof.<seq>.incr.aof is only the delta after moon.aof.<seq>.base.rdb; warning and continuing here silently drops most of the dataset.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/persistence/aof_manifest.rs` around lines 208 - 229, The current logic
treats a missing base RDB as recoverable; instead, check the manifest sequence
and fail when the base is missing for sequences beyond the initial one. In the
block that uses manifest.base_path() and crate::persistence::rdb::load, call
manifest.seq() (or the manifest's sequence accessor) and if seq != 1 and
base_path.exists() is false, log an error and return Err (rather than warn and
continue) so we don't silently drop data when only an incremental AOF is
present.

Comment thread src/persistence/aof.rs Outdated
Comment thread src/persistence/aof.rs Outdated
Comment thread src/persistence/rdb.rs
Comment on lines +281 to 299
match read_entry_zero_copy(&mut cursor, type_tag, &shared_buf, now_secs) {
Ok((key, entry)) => {
// Skip entries whose TTL is already in the past
if entry.has_expiry() && entry.is_expired_at(current_secs(), now_ms) {
if entry.has_expiry() && entry.is_expired_at(now_secs, now_ms) {
continue;
}
if current_db < databases.len() {
databases[current_db].set(key, entry);
// Fix #3: skip duplicate check + memory accounting
databases[current_db].insert_for_load(key, entry);
total_keys += 1;
}
}
Err(e) => {
let offset = cursor.position();
tracing::warn!(
"RDB load: skipping corrupted entry at offset {}: {}",
offset,
e
);
skipped_entries += 1;
// Cannot reliably skip to next entry in a variable-length
// format without framing, so break out of the loop.
// Entries loaded so far are valid (checksum passed).
tracing::warn!(
"RDB load: stopping mid-stream recovery after {} skipped entries; \
{} keys loaded successfully",
skipped_entries,
"RDB load: corrupted entry at offset {}: {}. {} keys loaded.",
cursor.position(),
e,
total_keys
);
break;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Corrupt RDB entries are still reported as a successful load.

Both parsers stop on read_entry_zero_copy() errors and then return Ok(...), which means a checksummed-but-malformed snapshot/preamble is accepted after only a prefix of the dataset was loaded. That defeats the fail-fast behavior the new AOF/base replay paths depend on.

Also applies to: 804-815

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/persistence/rdb.rs` around lines 281 - 299, The match on
read_entry_zero_copy currently logs a warning and breaks on Err(e), allowing the
loader to later return Ok despite partial/failed load; change this to propagate
the error instead of breaking: when read_entry_zero_copy returns Err(e) return
Err(...) (or convert into the crate's RdbLoadError) including context
(cursor.position(), e, total_keys) so the caller sees a failure; apply the same
change to the other parser block (the similar match at the 804-815 region) so
both code paths fail-fast rather than returning Ok after a corrupted entry.

Comment thread src/persistence/rdb.rs
Comment on lines +689 to +824
/// Load an RDB snapshot from a byte slice (for AOF RDB-preamble format).
///
/// Returns `(keys_loaded, bytes_consumed)`. The caller can use `bytes_consumed`
/// to find the start of any RESP commands appended after the RDB preamble.
pub fn load_from_bytes(
databases: &mut [Database],
data: &[u8],
) -> Result<(usize, usize), MoonError> {
if data.len() < RDB_MAGIC.len() + 1 + 1 + 4 {
return Err(RdbError::Corrupted {
detail: "RDB preamble too small".into(),
}
.into());
}

// Find EOF_MARKER to determine RDB section length.
// The RDB section is: header + entries + EOF_MARKER(1) + CRC32(4).
// We scan for EOF_MARKER (0xFF) — the first one after the header that's
// immediately followed by a valid CRC32 of the preceding bytes.
let mut rdb_end = None;
// Start scanning after header (MOON + version = 5 bytes)
for i in 5..data.len().saturating_sub(3) {
if data[i] == EOF_MARKER {
let payload = &data[..=i]; // everything up to and including EOF_MARKER
let checksum_bytes = &data[i + 1..i + 5];
if checksum_bytes.len() == 4 {
let stored = u32::from_le_bytes([
checksum_bytes[0],
checksum_bytes[1],
checksum_bytes[2],
checksum_bytes[3],
]);
let mut hasher = Hasher::new();
hasher.update(payload);
if hasher.finalize() == stored {
rdb_end = Some(i + 5); // past CRC32
break;
}
}
}
}

let rdb_len = rdb_end.ok_or_else(|| {
MoonError::from(RdbError::Corrupted {
detail: "RDB preamble: no valid EOF+CRC found".into(),
})
})?;

// Load using the same logic as `load`, but from the byte slice
let payload = &data[..rdb_len - 4]; // exclude CRC32
let mut cursor = Cursor::new(payload);

// Skip magic + version
let mut magic = [0u8; 4];
cursor.read_exact(&mut magic).map_err(|e| RdbError::Io {
path: std::path::PathBuf::from("<aof-preamble>"),
source: e,
})?;
if &magic != RDB_MAGIC {
return Err(RdbError::Corrupted {
detail: "invalid RDB magic in AOF preamble".into(),
}
.into());
}
let mut version = [0u8; 1];
cursor.read_exact(&mut version).map_err(|e| RdbError::Io {
path: std::path::PathBuf::from("<aof-preamble>"),
source: e,
})?;
if version[0] != RDB_VERSION {
return Err(RdbError::UnsupportedVersion {
version: version[0] as u32,
}
.into());
}

let now_ms = current_time_ms();
let now_secs = (now_ms / 1000) as u32;
let shared_buf = Bytes::copy_from_slice(data);
let mut total_keys = 0usize;
let mut current_db: usize = 0;

// Pre-size DashTables
let entry_counts = count_entries_per_db(&cursor, databases.len());
for (db_idx, &count) in entry_counts.iter().enumerate() {
if count > 0 && db_idx < databases.len() {
databases[db_idx].reserve(count);
}
}

loop {
let mut tag = [0u8; 1];
if cursor.read_exact(&mut tag).is_err() {
break;
}
match tag[0] {
EOF_MARKER => break,
DB_SELECTOR => {
let mut db_idx = [0u8; 1];
cursor.read_exact(&mut db_idx).map_err(|e| RdbError::Io {
path: std::path::PathBuf::from("<aof-preamble>"),
source: e,
})?;
current_db = db_idx[0] as usize;
if current_db >= databases.len() {
return Err(RdbError::Corrupted {
detail: format!(
"RDB preamble references database {} but only {} configured",
current_db,
databases.len()
),
}
.into());
}
}
type_tag => match read_entry_zero_copy(&mut cursor, type_tag, &shared_buf, now_secs) {
Ok((key, entry)) => {
if entry.has_expiry() && entry.is_expired_at(now_secs, now_ms) {
continue;
}
if current_db < databases.len() {
databases[current_db].insert_for_load(key, entry);
total_keys += 1;
}
}
Err(_) => break,
},
}
}

for db in databases.iter_mut() {
db.recalculate_memory();
}

Ok((total_keys, rdb_len))
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Please extract the byte-slice loader helpers into a submodule.

load_from_bytes(), the pre-pass scanner, and the raw-skip helpers are already a standalone boundary, and this module is now well past the repository's 1500-line cap. As per coding guidelines "No single .rs file should exceed 1500 lines; split into submodules if approaching this limit".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/persistence/rdb.rs` around lines 689 - 824, The file exceeds the size
guideline because helpers around byte-slice loading are embedded; extract
load_from_bytes, the pre-pass scanner that finds EOF+CRC (the loop that sets
rdb_end), and raw-skip/read helpers (e.g., count_entries_per_db and
read_entry_zero_copy) into a new submodule (e.g., rdb::bytes or rdb::preamble):
move their implementations into a new file, make the necessary functions
pub(crate) or pub as needed, re-export or adjust use statements so
current_time_ms, EOF_MARKER, DB_SELECTOR, RDB_MAGIC, RDB_VERSION, and types like
Database, MoonError, RdbError, Hasher, Bytes, Cursor still resolve, and update
callers in the original module to call the new submodule functions (e.g.,
bytes::load_from_bytes or re-export load_from_bytes from mod.rs) so behavior and
visibility are unchanged.

Comment thread src/persistence/rdb.rs
Comment on lines +183 to +186
/// Create a heap-allocated string CompactValue from a byte slice (copies data).
pub fn heap_string(data: &[u8]) -> Self {
Self::heap_string_vec(data.to_vec())
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Make heap_string behavior explicit for short inputs.

Line 185 delegates to a helper that assumes len > SSO_MAX_LEN (Line 206 via debug assertion), but the public API/docs at Line 183 do not communicate that precondition. This can create debug-vs-release behavioral differences for short slices.

💡 Suggested fix
-    /// Create a heap-allocated string CompactValue from a byte slice (copies data).
+    /// Create a CompactValue from a byte slice (copies data).
+    /// Uses inline storage for inputs <= SSO_MAX_LEN.
     pub fn heap_string(data: &[u8]) -> Self {
-        Self::heap_string_vec(data.to_vec())
+        Self::heap_string_vec_direct(data.to_vec())
     }

Also applies to: 205-207

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/storage/compact_value.rs` around lines 183 - 186, Public heap_string
currently delegates to heap_string_vec which contains a debug-only assertion
that len > SSO_MAX_LEN, causing debug/release behavior mismatch for short
slices; update heap_string to explicitly handle short inputs (either route to
the small-string constructor, e.g., small_string or compact inline branch, or
validate and panic/document the precondition) by checking data.len() against
SSO_MAX_LEN and only calling heap_string_vec when data.len() > SSO_MAX_LEN, and
add a clear doc comment on heap_string about the expected length behavior;
reference heap_string, heap_string_vec, SSO_MAX_LEN, and the small-string
constructor to locate the spots to change.

Two correctness fixes from code review:

1. AOF writer I/O errors: all write_all/flush/sync_data calls now check
   Results. On write failure, sets write_error flag to stop acknowledging
   appends (prevents silent data loss). Sync failures logged with seq number.
   Successful rewrite resets error state.

2. TTL rebase during BGREWRITEAOF: snapshot was rebuilding temp Database
   objects (Database::new() with fresh base_timestamp), causing TTL deltas
   to be computed against the wrong base. Now captures (entries, base_ts)
   tuples directly and uses rdb::save_snapshot_to_bytes() which preserves
   the original base_timestamp for correct absolute TTL serialization.

NOT fixed (verified as non-issues):
- Snapshot/append race: single-threaded recv loop is the synchronization
  point — no appends processed during snapshot
- Manifest advance ordering: base-first-then-manifest is correct (orphaned
  base is harmless, manifest-first would point to nonexistent file)
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (5)
src/persistence/rdb.rs (3)

729-864: 🛠️ Refactor suggestion | 🟠 Major

Split byte-slice loader logic into a submodule to satisfy file-size rule.

This file is now significantly beyond the repository’s 1500-line limit; extracting load_from_bytes + scan/skip helpers will improve maintainability and align with project standards.

As per coding guidelines No single \.rs` file should exceed 1500 lines; split into submodules if approaching this limit`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/persistence/rdb.rs` around lines 729 - 864, The file is too large;
extract the AOF/RDB byte-slice loader into a submodule by moving load_from_bytes
and its related helpers (the EOF+CRC scanning loop, payload/CRC handling,
count_entries_per_db, and any helper that advances/reads entries like
read_entry_zero_copy and scan/skip helpers) into a new module, make the
functions public(crate) or re-exported so rdb.rs can call them, and replace the
in-file implementation with a thin wrapper that delegates to the new submodule;
ensure types used (Database, RdbError, Hasher, current_time_ms, Bytes) are
imported into the new module and update visibility/signatures so compilation and
tests pass.

750-754: ⚠️ Potential issue | 🔴 Critical

EOF+CRC scan can index past buffer bounds on corrupt preambles.

At Line 753, data[i + 1..i + 5] is reachable when fewer than 4 bytes remain after i, which can panic instead of returning a corruption error.

Suggested fix
-    for i in 5..data.len().saturating_sub(3) {
+    for i in 5..data.len().saturating_sub(4) {
         if data[i] == EOF_MARKER {
             let payload = &data[..=i]; // everything up to and including EOF_MARKER
-            let checksum_bytes = &data[i + 1..i + 5];
-            if checksum_bytes.len() == 4 {
+            if let Some(checksum_bytes) = data.get(i + 1..i + 5) {
                 let stored = u32::from_le_bytes([
                     checksum_bytes[0],
                     checksum_bytes[1],
                     checksum_bytes[2],
                     checksum_bytes[3],
                 ]);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/persistence/rdb.rs` around lines 750 - 754, The EOF+CRC scan can slice
past the buffer when fewer than 4 bytes remain after an EOF_MARKER; update the
loop in the scan (the for i in 5..... block that creates payload and
checksum_bytes) to first check that i + 5 <= data.len() (or equivalently that
data.len().saturating_sub(i + 1) >= 4) before taking checksum_bytes = &data[i +
1..i + 5]; alternatively use safe indexing (get(..)) and bail with a corruption
error if the 4 checksum bytes are not available, ensuring no panics occur on
corrupt preambles.

332-340: ⚠️ Potential issue | 🟠 Major

Corrupt entry parsing still returns partial success instead of failing the load.

Both loaders break on entry parse error and then return Ok(...), which can accept partially loaded/corrupt snapshots.

Suggested fix
-                    Err(e) => {
-                        tracing::warn!(
-                            "RDB load: corrupted entry at offset {}: {}. {} keys loaded.",
-                            cursor.position(),
-                            e,
-                            total_keys
-                        );
-                        break;
-                    }
+                    Err(e) => {
+                        return Err(RdbError::Corrupted {
+                            detail: format!(
+                                "corrupted entry at offset {} after {} keys: {}",
+                                cursor.position(),
+                                total_keys,
+                                e
+                            ),
+                        }
+                        .into());
+                    }
@@
-                Err(_) => break,
+                Err(e) => {
+                    return Err(RdbError::Corrupted {
+                        detail: format!(
+                            "AOF preamble corrupted entry at offset {} after {} keys: {}",
+                            cursor.position(),
+                            total_keys,
+                            e
+                        ),
+                    }
+                    .into());
+                }

Also applies to: 844-855

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/persistence/rdb.rs` around lines 332 - 340, The loader currently catches
entry parse errors in the match arm `Err(e) => { tracing::warn!(...); break; }`
(using `cursor.position()` and `total_keys`) and then returns `Ok(...)`,
allowing partial/corrupt snapshots to be treated as success; change this to
propagate a failure instead: replace the `warn + break` with an early return of
an Err constructed with contextual information (include the
offset/cursor.position() and the source error `e`) so the caller receives a
proper error; apply the same fix to the other identical handler around the
second occurrence (the block covering lines ~844-855) so both loaders fail on
corrupt entries rather than returning partial success.
src/persistence/aof.rs (2)

788-803: ⚠️ Potential issue | 🟠 Major

Tokio rewrite helpers still rebase TTL due temp Database::new() snapshots.

These paths rebuild temporary databases and serialize with save_to_bytes, which uses new base timestamps; TTL deltas can drift versus live state.

Suggested fix
-fn rewrite_aof_sync(db: &SharedDatabases, aof_path: &Path) -> Result<(), MoonError> {
-    let snapshot: Vec<Database> = db
+fn rewrite_aof_sync(db: &SharedDatabases, aof_path: &Path) -> Result<(), MoonError> {
+    let snapshot: Vec<(Vec<(CompactKey, Entry)>, u32)> = db
         .iter()
         .map(|lock| {
             let guard = lock.read();
-            let mut temp = Database::new();
+            let base_ts = guard.base_timestamp();
             let now_ms = current_time_ms();
+            let mut entries = Vec::new();
             for (k, v) in guard.data().iter() {
-                if !v.is_expired_at(guard.base_timestamp(), now_ms) {
-                    temp.set(k.to_bytes(), v.clone());
+                if !v.is_expired_at(base_ts, now_ms) {
+                    entries.push((k.clone(), v.clone()));
                 }
             }
-            temp
+            (entries, base_ts)
         })
         .collect();
-
-    let rdb_bytes = crate::persistence::rdb::save_to_bytes(&snapshot)?;
+    let rdb_bytes = crate::persistence::rdb::save_snapshot_to_bytes(&snapshot)?;

Also applies to: 835-851

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/persistence/aof.rs` around lines 788 - 803, The snapshot code creates
temp Database instances with Database::new(), which resets base timestamps and
rebases TTLs before calling crate::persistence::rdb::save_to_bytes; change the
temp creation to preserve the source guard's base timestamp (e.g., construct
temp with the same base timestamp or call a setter like
Database::with_base_timestamp(guard.base_timestamp()) or
temp.set_base_timestamp(guard.base_timestamp())) before copying entries so
is_expired_at/TTL calculations remain consistent; apply the same change to the
other identical snapshot block (the one around lines 835-851) so save_to_bytes
serializes using the original base timestamps rather than a freshly created one.

693-712: ⚠️ Potential issue | 🔴 Critical

Rewrite snapshot still races with concurrent append stream.

During rewrite, live DBs are read without a snapshot barrier/COW append buffer, so writes around the boundary can be captured in both the base snapshot and subsequent incremental AOF.

Also applies to: 747-760

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/persistence/aof.rs` around lines 693 - 712, The snapshot code races with
concurrent appends because it reads each DB via db.iter() using guard.read(),
calling base_timestamp() and data() separately, allowing writes to land in both
the captured snapshot and the incremental AOF; fix by creating an atomic
snapshot barrier per database before collecting entries — either acquire a lock
that blocks appends (e.g., upgrade to a write lock or call a freeze/snapshot
API) or switch the append stream to a new COW/active buffer/generation and then
read base_timestamp() and data() from that frozen generation so entries and
base_ts come from the same consistent generation; look for the snapshot
variable, db.iter(), guard.read(), base_timestamp(), and data() to implement the
barrier or buffer-swap solution.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/persistence/aof.rs`:
- Around line 286-289: The rewrite path for AofMessage::RewriteSharded currently
discards errors from writer.flush().await and
writer.get_ref().sync_data().await, risking silent loss of durability before
calling rewrite_aof_sharded_sync; change the code to check and handle the Result
from writer.flush().await and sync_data().await (log and return Err or
propagate) before proceeding to rewrite_aof_sharded_sync so failures abort the
rewrite flow, and apply the same checks to the other rewrite branch that mirrors
these calls; reference the async calls writer.flush().await,
writer.get_ref().sync_data().await and the rewrite function
rewrite_aof_sharded_sync to locate and update the logic.

---

Duplicate comments:
In `@src/persistence/aof.rs`:
- Around line 788-803: The snapshot code creates temp Database instances with
Database::new(), which resets base timestamps and rebases TTLs before calling
crate::persistence::rdb::save_to_bytes; change the temp creation to preserve the
source guard's base timestamp (e.g., construct temp with the same base timestamp
or call a setter like Database::with_base_timestamp(guard.base_timestamp()) or
temp.set_base_timestamp(guard.base_timestamp())) before copying entries so
is_expired_at/TTL calculations remain consistent; apply the same change to the
other identical snapshot block (the one around lines 835-851) so save_to_bytes
serializes using the original base timestamps rather than a freshly created one.
- Around line 693-712: The snapshot code races with concurrent appends because
it reads each DB via db.iter() using guard.read(), calling base_timestamp() and
data() separately, allowing writes to land in both the captured snapshot and the
incremental AOF; fix by creating an atomic snapshot barrier per database before
collecting entries — either acquire a lock that blocks appends (e.g., upgrade to
a write lock or call a freeze/snapshot API) or switch the append stream to a new
COW/active buffer/generation and then read base_timestamp() and data() from that
frozen generation so entries and base_ts come from the same consistent
generation; look for the snapshot variable, db.iter(), guard.read(),
base_timestamp(), and data() to implement the barrier or buffer-swap solution.

In `@src/persistence/rdb.rs`:
- Around line 729-864: The file is too large; extract the AOF/RDB byte-slice
loader into a submodule by moving load_from_bytes and its related helpers (the
EOF+CRC scanning loop, payload/CRC handling, count_entries_per_db, and any
helper that advances/reads entries like read_entry_zero_copy and scan/skip
helpers) into a new module, make the functions public(crate) or re-exported so
rdb.rs can call them, and replace the in-file implementation with a thin wrapper
that delegates to the new submodule; ensure types used (Database, RdbError,
Hasher, current_time_ms, Bytes) are imported into the new module and update
visibility/signatures so compilation and tests pass.
- Around line 750-754: The EOF+CRC scan can slice past the buffer when fewer
than 4 bytes remain after an EOF_MARKER; update the loop in the scan (the for i
in 5..... block that creates payload and checksum_bytes) to first check that i +
5 <= data.len() (or equivalently that data.len().saturating_sub(i + 1) >= 4)
before taking checksum_bytes = &data[i + 1..i + 5]; alternatively use safe
indexing (get(..)) and bail with a corruption error if the 4 checksum bytes are
not available, ensuring no panics occur on corrupt preambles.
- Around line 332-340: The loader currently catches entry parse errors in the
match arm `Err(e) => { tracing::warn!(...); break; }` (using `cursor.position()`
and `total_keys`) and then returns `Ok(...)`, allowing partial/corrupt snapshots
to be treated as success; change this to propagate a failure instead: replace
the `warn + break` with an early return of an Err constructed with contextual
information (include the offset/cursor.position() and the source error `e`) so
the caller receives a proper error; apply the same fix to the other identical
handler around the second occurrence (the block covering lines ~844-855) so both
loaders fail on corrupt entries rather than returning partial success.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 8cbdc4dd-bf63-4b12-8387-6b60a1809b7d

📥 Commits

Reviewing files that changed from the base of the PR and between b170861 and 608d018.

📒 Files selected for processing (3)
  • README.md
  • src/persistence/aof.rs
  • src/persistence/rdb.rs

Comment thread src/persistence/aof.rs
@qodo-code-review
Copy link
Copy Markdown

CI Feedback 🧐

A test triggered by this PR failed. Here is an AI-generated analysis of the failure:

Action: Test

Failed stage: Run cargo test --no-default-features --features runtime-tokio,jemalloc [❌]

Failed test name: ""

Failure summary:

The action failed due to a GitHub Actions job timeout, not because of failing tests.
- The step
cargo test --no-default-features --features runtime-tokio,jemalloc exceeded the configured 15-minute
limit and was terminated (##[error]... has timed out after 15 minutes. at log line 2094).
- The test
suite was still running long-running benchmark-like tests (e.g., recall_10k_* and other vector
recall/search tests reporting "has been running for over 60 seconds" at lines 2085-2091) when the
timeout occurred.

Relevant error logs:
1:  ##[group]Runner Image Provisioner
2:  Hosted Compute Agent
...

158:  env:
159:  CARGO_TERM_COLOR: always
160:  targets: 
161:  components: 
162:  ##[endgroup]
163:  ##[group]Run : set $CARGO_HOME
164:  �[36;1m: set $CARGO_HOME�[0m
165:  �[36;1mecho CARGO_HOME=${CARGO_HOME:-"$HOME/.cargo"} >> $GITHUB_ENV�[0m
166:  shell: /usr/bin/bash --noprofile --norc -e -o pipefail {0}
167:  env:
168:  CARGO_TERM_COLOR: always
169:  ##[endgroup]
170:  ##[group]Run : install rustup if needed
171:  �[36;1m: install rustup if needed�[0m
172:  �[36;1mif ! command -v rustup &>/dev/null; then�[0m
173:  �[36;1m  curl --proto '=https' --tlsv1.2 --retry 10 --retry-connrefused --location --silent --show-error --fail https://sh.rustup.rs | sh -s -- --default-toolchain none -y�[0m
174:  �[36;1m  echo "$CARGO_HOME/bin" >> $GITHUB_PATH�[0m
...

235:  �[36;1mif [ -z "${CARGO_REGISTRIES_CRATES_IO_PROTOCOL+set}" -o -f "/home/runner/work/_temp"/.implicit_cargo_registries_crates_io_protocol ]; then�[0m
236:  �[36;1m  if rustc +stable --version --verbose | grep -q '^release: 1\.6[89]\.'; then�[0m
237:  �[36;1m    touch "/home/runner/work/_temp"/.implicit_cargo_registries_crates_io_protocol || true�[0m
238:  �[36;1m    echo CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse >> $GITHUB_ENV�[0m
239:  �[36;1m  elif rustc +stable --version --verbose | grep -q '^release: 1\.6[67]\.'; then�[0m
240:  �[36;1m    touch "/home/runner/work/_temp"/.implicit_cargo_registries_crates_io_protocol || true�[0m
241:  �[36;1m    echo CARGO_REGISTRIES_CRATES_IO_PROTOCOL=git >> $GITHUB_ENV�[0m
242:  �[36;1m  fi�[0m
243:  �[36;1mfi�[0m
244:  shell: /usr/bin/bash --noprofile --norc -e -o pipefail {0}
245:  env:
246:  CARGO_TERM_COLOR: always
247:  CARGO_HOME: /home/runner/.cargo
248:  CARGO_INCREMENTAL: 0
249:  ##[endgroup]
250:  ##[group]Run : work around spurious network errors in curl 8.0
251:  �[36;1m: work around spurious network errors in curl 8.0�[0m
252:  �[36;1m# https://rust-lang.zulipchat.com/#narrow/stream/246057-t-cargo/topic/timeout.20investigation�[0m
...

326:  ... Restoring cache ...
327:  Cache hit for: v0-rust-test-Linux-x64-8b64806b-a58781a7
328:  Received 268435456 of 366758905 (73.2%), 246.9 MBs/sec
329:  Received 366758905 of 366758905 (100.0%), 253.3 MBs/sec
330:  Cache Size: ~350 MB (366758905 B)
331:  [command]/usr/bin/tar -xf /home/runner/work/_temp/bae94685-b876-43a5-a6bf-3d6d6a29af0f/cache.tzst -P -C /home/runner/work/moon/moon --use-compress-program unzstd
332:  Cache restored successfully
333:  Restored from cache key "v0-rust-test-Linux-x64-8b64806b-a58781a7" full match: true.
334:  ##[group]Run cargo test --no-default-features --features runtime-tokio,jemalloc
335:  �[36;1mcargo test --no-default-features --features runtime-tokio,jemalloc�[0m
336:  shell: /usr/bin/bash -e {0}
337:  env:
338:  CARGO_TERM_COLOR: always
339:  CARGO_HOME: /home/runner/.cargo
340:  CARGO_INCREMENTAL: 0
341:  CACHE_ON_FAILURE: false
342:  MOON_NO_URING: 1
...

393:  test acl::table::tests::test_check_command_permission_denied ... ok
394:  test acl::table::tests::test_del_user ... ok
395:  test acl::table::tests::test_list_users_sorted ... ok
396:  test acl::table::tests::test_check_key_permission_multikey ... ok
397:  test blocking::tests::test_fifo_order ... ok
398:  test acl::table::tests::test_check_key_permission_read_write_patterns ... ok
399:  test acl::table::tests::test_load_or_default_nopass ... ok
400:  test blocking::tests::test_has_waiters_empty ... ok
401:  test blocking::tests::test_register_and_pop_front ... ok
402:  test blocking::tests::test_remove_wait_cross_key ... ok
403:  test cluster::command::tests::test_addslots_updates_bitmap ... ok
404:  test cluster::command::tests::test_cluster_meet_adds_node ... ok
405:  test cluster::command::tests::test_cluster_info_contains_enabled ... ok
406:  test cluster::command::tests::test_delslots ... ok
407:  test cluster::command::tests::test_cluster_nodes_format ... ok
408:  test cluster::command::tests::test_failover_force_promotes_replica ... ok
409:  test acl::table::tests::test_load_or_default_with_password ... ok
410:  test cluster::command::tests::test_failover_invalid_subcommand ... ok
411:  test cluster::command::tests::test_failover_normal_sets_waiting_delay ... ok
412:  test cluster::command::tests::test_failover_rejects_on_master ... ok
413:  test cluster::command::tests::test_cluster_myid_length ... ok
414:  test cluster::command::tests::test_setslot_node_clears_migration ... ok
415:  test cluster::command::tests::test_setslot_migrating_importing ... ok
416:  test cluster::command::tests::test_failover_takeover_promotes_replica ... ok
417:  test cluster::command::tests::test_keyslot_foo ... ok
418:  test cluster::failover::tests::test_compute_failover_delay_includes_rank ... ok
419:  test cluster::failover::tests::test_try_mark_fail_needs_majority ... ok
420:  test cluster::gossip::tests::test_bad_magic_returns_err ... ok
421:  test cluster::gossip::tests::test_gossip_section_roundtrip ... ok
422:  test cluster::failover::tests::test_failover_initiates_when_master_fail ... ok
423:  test cluster::failover::tests::test_no_failover_when_master_healthy ... ok
424:  test cluster::gossip::tests::test_ping_roundtrip ... ok
425:  test cluster::failover::tests::test_failover_vote_epoch_guard ... ok
426:  test cluster::gossip::tests::test_pong_with_sections_roundtrip ... ok
427:  test cluster::gossip::tests::test_truncated_returns_err ... ok
428:  test cluster::migration::tests::test_get_keys_in_slot_filters_correctly ... ok
429:  test cluster::slots::tests::test_empty_hash_tag_uses_full_key ... ok
430:  test cluster::slots::tests::test_error_format ... ok
431:  test cluster::slots::tests::test_foo_slot ... ok
432:  test cluster::slots::tests::test_hash_tag_co_location ... ok
433:  test cluster::migration::tests::test_migrating_slot_returns_ask_route ... ok
434:  test cluster::slots::tests::test_local_shard_for_slot ... ok
435:  test cluster::tests::test_asking_flag_with_importing_slot ... ok
436:  test cluster::tests::test_asking_without_importing_still_moved ... ok
437:  test cluster::migration::tests::test_nodes_conf_roundtrip ... ok
438:  test cluster::tests::test_moved_error_frame_format ... ok
439:  test cluster::tests::test_my_node_id ... ok
440:  test cluster::tests::test_owns_slot_bitmap ... ok
441:  test cluster::tests::test_route_local_owned_slot ... ok
442:  test cluster::tests::test_route_moved_for_peer_slot ... ok
443:  test command::acl::tests::test_acl_cat_unknown_category ... ok
444:  test command::acl::tests::test_acl_cat_all_categories ... ok
445:  test command::acl::tests::test_acl_cat_string_category ... ok
446:  test command::acl::tests::test_acl_deluser ... ok
447:  test command::acl::tests::test_acl_list ... ok
448:  test command::acl::tests::test_acl_getuser_nonexistent ... ok
449:  test command::acl::tests::test_acl_load_no_aclfile ... ok
450:  test command::acl::tests::test_acl_deluser_default_fails ... ok
451:  test command::acl::tests::test_acl_log_and_reset ... ok
452:  test command::acl::tests::test_acl_save_no_aclfile ... ok
453:  test command::acl::tests::test_acl_setuser_and_getuser ... ok
454:  test command::acl::tests::test_acl_unknown_subcommand ... ok
455:  test command::client::tests::test_parse_tracking_off ... ok
456:  test command::acl::tests::test_acl_whoami ... ok
457:  test command::client::tests::test_parse_tracking_on ... ok
458:  test command::client::tests::test_parse_tracking_on_bcast ... ok
459:  test command::acl::tests::test_acl_save_and_load ... ok
460:  test command::acl::tests::test_acl_log_with_count ... ok
461:  test command::client::tests::test_parse_tracking_on_bcast_noloop_multiple_prefixes ... ok
462:  test command::client::tests::test_parse_tracking_on_bcast_prefix ... ok
463:  test command::client::tests::test_parse_tracking_on_noloop ... ok
464:  test command::client::tests::test_parse_tracking_on_redirect ... ok
465:  test command::client::tests::test_parse_tracking_prefix_without_bcast_fails ... ok
466:  test command::client::tests::test_parse_tracking_redirect_invalid_int ... ok
...

480:  test command::connection::tests::test_auth_acl_2arg_wrong_password ... ok
481:  test command::connection::tests::test_auth_no_password_configured ... ok
482:  test command::connection::tests::test_auth_acl_disabled_user ... ok
483:  test command::connection::tests::test_auth_acl_wrong_arity ... ok
484:  test command::connection::tests::test_auth_correct_password ... ok
485:  test command::connection::tests::test_auth_wrong_arity ... ok
486:  test command::connection::tests::test_auth_wrong_password ... ok
487:  test command::connection::tests::test_client_id_returns_integer ... ok
488:  test command::connection::tests::test_command_bare ... ok
489:  test command::connection::tests::test_command_docs ... ok
490:  test command::connection::tests::test_command_docs_lowercase ... ok
491:  test command::connection::tests::test_echo ... ok
492:  test command::connection::tests::test_echo_wrong_arity ... ok
493:  test command::connection::tests::test_hello_acl_no_args ... ok
494:  test command::connection::tests::test_hello_downgrade_to_resp2 ... ok
495:  test command::connection::tests::test_hello_acl_with_auth_failure ... ok
496:  test command::connection::tests::test_hello_no_args_returns_current_proto ... ok
497:  test command::connection::tests::test_hello_upgrade_to_resp3 ... ok
498:  test command::connection::tests::test_hello_noproto ... ok
499:  test command::connection::tests::test_hello_with_auth_failure ... ok
500:  test command::connection::tests::test_hello_with_auth_success ... ok
...

528:  test command::hash::tests::test_hincrbyfloat ... ok
529:  test command::hash::tests::test_hincrbyfloat_non_float_field ... ok
530:  test command::hash::tests::test_hkeys_hvals ... ok
531:  test command::hash::tests::test_hlen ... ok
532:  test command::hash::tests::test_hkeys_hvals_missing ... ok
533:  test command::hash::tests::test_hmget_missing_key ... ok
534:  test command::hash::tests::test_hmset_hmget ... ok
535:  test command::hash::tests::test_hscan_basic ... ok
536:  test command::hash::tests::test_hscan_missing_key ... ok
537:  test command::hash::tests::test_hscan_with_count ... ok
538:  test command::hash::tests::test_hscan_with_match ... ok
539:  test command::hash::tests::test_hset_wrong_args ... ok
540:  test command::hash::tests::test_hset_new_fields ... ok
541:  test command::hash::tests::test_hset_update_existing ... ok
542:  test command::hash::tests::test_hsetnx ... ok
543:  test command::hash::tests::test_wrongtype_error ... ok
544:  test command::key::tests::test_del_multiple ... ok
...

850:  test command::vector_search::tests::test_end_to_end_create_insert_search ... ok
851:  test command::tests::test_object_help ... ok
852:  test command::vector_search::tests::test_ft_create_duplicate ... ok
853:  test command::vector_search::tests::test_ft_create_missing_dim ... ok
854:  test command::vector_search::tests::test_ft_dropindex ... ok
855:  test command::vector_search::tests::test_ft_info_returns_correct_data ... ok
856:  test command::vector_search::tests::test_ft_search_dimension_mismatch ... ok
857:  test command::vector_search::tests::test_ft_search_empty_index ... ok
858:  test command::vector_search::tests::test_ft_create_parse_full_syntax ... ok
859:  test command::tests::test_object_encoding_hash_upgrade ... ok
860:  test command::vector_search::tests::test_ft_search_unknown_index ... ok
861:  test command::vector_search::tests::test_ft_search_with_filter_no_regression ... ok
862:  test command::vector_search::tests::test_merge_search_results_empty ... ok
863:  test command::vector_search::tests::test_ft_info ... ok
864:  test command::vector_search::tests::test_merge_search_results_combines_shards ... ok
865:  test command::vector_search::tests::test_merge_search_results_handles_errors ... ok
866:  test command::vector_search::tests::test_parse_filter_clause_none ... ok
...

877:  test config::tests::test_aclfile_default_none ... ok
878:  test config::tests::test_aclfile_custom ... ok
879:  test config::tests::test_custom_port ... ok
880:  test config::tests::test_custom_bind_and_databases ... ok
881:  test config::tests::test_default_values ... ok
882:  test config::tests::test_maxmemory_custom ... ok
883:  test config::tests::test_maxmemory_defaults ... ok
884:  test config::tests::test_persistence_defaults ... ok
885:  test config::tests::test_persistence_custom_values ... ok
886:  test config::tests::test_requirepass_default_none ... ok
887:  test config::tests::test_requirepass ... ok
888:  test config::tests::test_shards_custom ... ok
889:  test config::tests::test_shards_default ... ok
890:  test config::tests::test_to_runtime_config ... ok
891:  test config::tests::test_runtime_config_default ... ok
892:  test error::tests::moon_error_from_aof_error ... ok
893:  test error::tests::moon_error_from_io_error ... ok
894:  test error::tests::moon_error_from_rdb_error ... ok
895:  test config::tests::test_to_runtime_config_aclfile ... ok
896:  test error::tests::moon_error_from_snapshot_error ... ok
897:  test error::tests::moon_error_from_wal_error ... ok
898:  test error::tests::moon_result_alias_works ... ok
899:  test io::buf_ring::tests::test_buf_ring_manager_new ... ok
...

923:  test io::static_responses::tests::test_ok_bytes ... ok
924:  test io::static_responses::tests::test_null_array_bytes ... ok
925:  test io::static_responses::tests::test_null_bulk_bytes ... ok
926:  test io::static_responses::tests::test_one_bytes ... ok
927:  test io::static_responses::tests::test_pong_bytes ... ok
928:  test io::static_responses::tests::test_queued_bytes ... ok
929:  test io::static_responses::tests::test_zero_bytes ... ok
930:  test io::tests::test_conn_id_truncated_to_24_bits ... ok
931:  test io::tests::test_encode_decode_all_event_types ... ok
932:  test io::tests::test_encode_decode_max_aux ... ok
933:  test io::tests::test_encode_decode_max_conn_id ... ok
934:  test io::tests::test_encode_decode_roundtrip ... ok
935:  test io::tests::test_event_constants ... ok
936:  test io::tests::test_event_constants_unique ... ok
937:  test io::tokio_driver::tests::test_tokio_driver_type_exists ... ok
938:  test persistence::aof::tests::test_aof_replay_corrupt_truncated_logs_error_loads_what_it_can ... ok
939:  test persistence::aof::tests::test_aof_replay_collection_types ... ok
...

945:  test persistence::aof::tests::test_fsync_policy_from_str ... ok
946:  test persistence::aof::tests::test_generate_aof_command_produces_valid_resp_that_round_trips ... ok
947:  test persistence::aof::tests::test_aof_replay_with_select_switches_databases ... ok
948:  test persistence::aof::tests::test_generate_rewrite_commands_with_ttl ... ok
949:  test persistence::aof::tests::test_serialize_command_round_trip_hset ... ok
950:  test persistence::aof::tests::test_generate_rewrite_commands_all_5_types ... ok
951:  test persistence::auto_save::tests::test_parse_save_rules_empty_string ... ok
952:  test persistence::auto_save::tests::test_parse_save_rules_none ... ok
953:  test persistence::auto_save::tests::test_parse_save_rules_odd_count ... ok
954:  test persistence::auto_save::tests::test_parse_save_rules_single ... ok
955:  test persistence::auto_save::tests::test_parse_save_rules_standard ... ok
956:  test persistence::auto_save::tests::test_parse_save_rules_three_pairs ... ok
957:  test persistence::aof::tests::test_generate_rewrite_round_trip_preserves_state ... ok
958:  test persistence::rdb::tests::test_empty_database_produces_valid_rdb ... ok
959:  test persistence::rdb::tests::test_expired_keys_skipped_during_save ... ok
960:  test persistence::rdb::tests::test_missing_file_returns_error ... ok
961:  test persistence::rdb::tests::test_multi_database_round_trip ... ok
...

1005:  test persistence::snapshot::tests::test_snapshot_per_segment_crc32 ... ok
1006:  test persistence::wal::tests::test_wal_replay_round_trip ... ok
1007:  test persistence::wal::tests::test_wal_multiple_appends_batched ... ok
1008:  test persistence::wal::tests::test_wal_v1_backward_compat ... ok
1009:  test persistence::wal::tests::test_wal_replay_with_collections ... ok
1010:  test persistence::wal::tests::test_wal_v2_block_crc_valid ... ok
1011:  test persistence::wal::tests::test_wal_v2_empty_after_header ... ok
1012:  test persistence::wal::tests::test_wal_truncate_after_snapshot ... ok
1013:  test protocol::frame::tests::frame_size_measurement ... ok
1014:  test protocol::frame::tests::test_frame_empty_array_is_valid ... ok
1015:  test protocol::frame::tests::test_frame_null_not_equal_to_empty_bulk_string ... ok
1016:  test protocol::frame::tests::test_frame_simple_string_debug_clone_partialeq ... ok
1017:  test protocol::frame::tests::test_parse_config_default_max_array_depth ... ok
1018:  test protocol::frame::tests::test_parse_config_default_max_array_length ... ok
1019:  test protocol::frame::tests::test_parse_config_default_max_bulk_string_size ... ok
1020:  test protocol::frame::tests::test_parse_error_incomplete_display ... ok
1021:  test persistence::wal::tests::test_wal_v2_header_format ... ok
1022:  test protocol::frame::tests::test_parse_error_invalid_display ... ok
1023:  test protocol::inline::tests::test_parse_inline_buffer_consumed ... ok
...

1031:  test protocol::inline::tests::test_parse_inline_ping ... ok
1032:  test protocol::inline::tests::test_parse_inline_whitespace_only ... ok
1033:  test protocol::inline::tests::test_parse_inline_tab_separated ... ok
1034:  test protocol::inline::tests::test_parse_inline_set_key_value ... ok
1035:  test protocol::inline::tests::test_parse_inline_sequential ... ok
1036:  test protocol::parse::tests::test_buffer_consumed_after_parse ... ok
1037:  test protocol::parse::tests::test_parse_array_depth_exceeding_max ... ok
1038:  test protocol::parse::tests::test_parse_array_of_bulk_strings ... ok
1039:  test protocol::parse::tests::test_parse_bulk_string_exceeding_max_size ... ok
1040:  test protocol::parse::tests::test_parse_array_with_null_element ... ok
1041:  test protocol::parse::tests::test_parse_binary_data_in_bulk_string ... ok
1042:  test protocol::parse::tests::test_parse_bulk_string ... ok
1043:  test protocol::parse::tests::test_parse_empty_array ... ok
1044:  test protocol::parse::tests::test_parse_empty_buffer ... ok
1045:  test protocol::parse::tests::test_parse_empty_bulk_string ... ok
1046:  test protocol::parse::tests::test_parse_error ... ok
1047:  test protocol::parse::tests::test_parse_incomplete_bulk_string ... ok
...

1067:  test protocol::parse::tests::test_parse_resp3_null ... ok
1068:  test protocol::parse::tests::test_parse_resp3_push ... ok
1069:  test protocol::parse::tests::test_parse_resp3_set ... ok
1070:  test protocol::parse::tests::test_parse_resp3_verbatim_string ... ok
1071:  test protocol::parse::tests::test_parse_resp_array_not_inline ... ok
1072:  test protocol::parse::tests::test_parse_resp_simple_string_not_inline ... ok
1073:  test protocol::parse::tests::test_parse_simple_string ... ok
1074:  test protocol::parse::tests::test_parse_simple_string_long ... ok
1075:  test protocol::parse::tests::test_parse_two_frames_sequentially ... ok
1076:  test protocol::resp3::tests::test_array_to_map ... ok
1077:  test protocol::resp3::tests::test_array_to_map_empty_passthrough ... ok
1078:  test protocol::resp3::tests::test_array_to_set ... ok
1079:  test protocol::resp3::tests::test_bulk_to_double ... ok
1080:  test protocol::resp3::tests::test_bulk_to_double_null ... ok
1081:  test protocol::resp3::tests::test_int_to_bool ... ok
1082:  test protocol::resp3::tests::test_maybe_convert_error_passthrough ... ok
1083:  test protocol::resp3::tests::test_maybe_convert_get_unchanged ... ok
1084:  test protocol::resp3::tests::test_maybe_convert_hgetall_resp2_unchanged ... ok
1085:  test protocol::resp3::tests::test_maybe_convert_null_passthrough ... ok
1086:  test protocol::resp3::tests::test_maybe_convert_hgetall_resp3 ... ok
1087:  test protocol::resp3::tests::test_maybe_convert_sismember_resp3 ... ok
1088:  test protocol::resp3::tests::test_maybe_convert_smembers_resp3 ... ok
1089:  test protocol::resp3::tests::test_maybe_convert_zscore_resp3 ... ok
1090:  test protocol::serialize::tests::test_resp2_downgrade_boolean_to_integer ... ok
1091:  test protocol::serialize::tests::test_resp2_downgrade_double_to_bulk_string ... ok
1092:  test protocol::serialize::tests::test_resp2_downgrade_map_to_flat_array ... ok
1093:  test protocol::serialize::tests::test_resp2_downgrade_set_to_array ... ok
1094:  test protocol::serialize::tests::test_resp2_null_still_dollar_minus_one ... ok
1095:  test protocol::serialize::tests::test_round_trip_array ... ok
1096:  test protocol::serialize::tests::test_round_trip_bulk_string ... ok
1097:  test protocol::serialize::tests::test_round_trip_error ... ok
1098:  test protocol::serialize::tests::test_round_trip_integer ... ok
...

1102:  test protocol::serialize::tests::test_round_trip_resp3_big_number ... ok
1103:  test protocol::serialize::tests::test_round_trip_resp3_boolean ... ok
1104:  test protocol::serialize::tests::test_round_trip_resp3_double ... ok
1105:  test protocol::serialize::tests::test_round_trip_resp3_map ... ok
1106:  test protocol::parse::tests::test_parse_null_array ... ok
1107:  test protocol::parse::tests::test_parse_resp3_boolean_invalid ... ok
1108:  test protocol::serialize::tests::test_round_trip_resp3_null ... ok
1109:  test protocol::serialize::tests::test_round_trip_resp3_push ... ok
1110:  test protocol::serialize::tests::test_round_trip_resp3_verbatim_string ... ok
1111:  test protocol::serialize::tests::test_round_trip_resp3_set ... ok
1112:  test protocol::serialize::tests::test_round_trip_simple_string ... ok
1113:  test protocol::serialize::tests::test_serialize_array_of_bulk_strings ... ok
1114:  test protocol::serialize::tests::test_serialize_bulk_string ... ok
1115:  test protocol::serialize::tests::test_serialize_empty_array ... ok
1116:  test protocol::serialize::tests::test_serialize_empty_bulk_string ... ok
1117:  test protocol::serialize::tests::test_serialize_error ... ok
1118:  test protocol::serialize::tests::test_serialize_integer_negative ... ok
...

1196:  test scripting::sandbox::tests::test_sandbox_allows_string_math_table ... ok
1197:  test scripting::cache::tests::test_flush ... ok
1198:  test scripting::sandbox::tests::test_sandbox_blocks_os_other_fns ... ok
1199:  test scripting::sandbox::tests::test_sandbox_removes_dangerous_globals ... ok
1200:  test scripting::tests::test_handle_eval_basic ... ok
1201:  test scripting::tests::test_handle_evalsha_after_eval ... ok
1202:  test scripting::sandbox::tests::test_timeout_hook ... ok
1203:  test scripting::tests::test_handle_evalsha_noscript ... ok
1204:  test scripting::tests::test_handle_script_subcommand_exists ... ok
1205:  test scripting::tests::test_handle_script_subcommand_flush ... ok
1206:  test scripting::tests::test_handle_script_subcommand_load ... ok
1207:  test scripting::tests::test_parse_eval_args_basic ... ok
1208:  test scripting::tests::test_parse_eval_args_too_few_args ... ok
1209:  test scripting::tests::test_parse_eval_args_numkeys_exceeds_args ... ok
1210:  test scripting::tests::test_parse_eval_args_with_keys_and_argv ... ok
1211:  test scripting::tests::test_run_script_redis_pcall_catches_error ... ok
1212:  test scripting::tests::test_run_script_keys_argv ... ok
1213:  test scripting::tests::test_run_script_simple ... ok
1214:  test scripting::tests::test_run_script_type_conversions ... ok
1215:  test scripting::tests::test_run_script_with_redis_call ... ok
1216:  test scripting::types::tests::test_frame_array_to_lua ... ok
1217:  test scripting::tests::test_setup_lua_vm ... ok
1218:  test scripting::types::tests::test_frame_boolean_to_lua ... ok
1219:  test scripting::types::tests::test_frame_bulk_string_to_lua ... ok
1220:  test scripting::types::tests::test_frame_double_to_lua ... ok
1221:  test scripting::types::tests::test_frame_integer_to_lua ... ok
1222:  test scripting::types::tests::test_frame_error_to_lua ... ok
1223:  test scripting::types::tests::test_frame_null_to_lua ... ok
...

1697:  test vector::mvcc::visibility::tests::test_non_transactional_read_sees_committed ... ok
1698:  test vector::mvcc::visibility::tests::test_read_your_own_writes_even_after_snapshot ... ok
1699:  test vector::mvcc::visibility::tests::test_read_your_own_writes_visible ... ok
1700:  test vector::persistence::recovery::tests::test_recover_checkpoint_records_lsn ... ok
1701:  test vector::persistence::recovery::tests::test_recover_committed_txn_survives ... ok
1702:  test vector::persistence::recovery::tests::test_recover_corrupt_crc_stops_replay ... ok
1703:  test vector::persistence::recovery::tests::test_recover_empty_wal_and_no_segments ... ok
1704:  test vector::persistence::recovery::tests::test_recover_mutable_delete_marks_entry ... ok
1705:  test vector::persistence::recovery::tests::test_recover_mutable_delete_nonexistent_no_panic ... ok
1706:  test vector::persistence::recovery::tests::test_recover_mutable_upsert_count ... ok
1707:  test vector::persistence::recovery::tests::test_recover_txn_abort_rolls_back ... ok
1708:  test vector::persistence::recovery::tests::test_recover_uncommitted_at_eof_rolled_back ... ok
1709:  test vector::persistence::recovery::tests::test_recover_vector_store_from_wal ... ok
1710:  test vector::persistence::recovery::tests::test_wal_writer_append_vector_record_roundtrip ... ok
1711:  test vector::persistence::segment_io::tests::test_checksum_mismatch_on_read ... ok
1712:  test vector::persistence::segment_io::tests::test_missing_graph_file_returns_error ... ok
1713:  test vector::persistence::segment_io::tests::test_roundtrip_preserves_counts ... ok
1714:  test vector::persistence::segment_io::tests::test_roundtrip_search_works ... ok
1715:  test vector::persistence::segment_io::tests::test_segment_meta_valid_json ... ok
1716:  test vector::persistence::segment_io::tests::test_write_creates_4_files ... ok
1717:  test vector::persistence::wal_record::tests::test_checkpoint_roundtrip ... ok
1718:  test vector::persistence::wal_record::tests::test_crc_mismatch_returns_error ... ok
1719:  test vector::persistence::wal_record::tests::test_delete_roundtrip ... ok
1720:  test vector::persistence::wal_record::tests::test_from_wal_frame_rejects_bad_tag ... ok
1721:  test vector::persistence::wal_record::tests::test_to_wal_frame_has_tag_and_length ... ok
1722:  test vector::persistence::wal_record::tests::test_truncated_frame_returns_error ... ok
1723:  test vector::persistence::wal_record::tests::test_txn_abort_roundtrip ... ok
1724:  test vector::persistence::wal_record::tests::test_txn_commit_roundtrip ... ok
1725:  test vector::persistence::wal_record::tests::test_upsert_roundtrip ... ok
1726:  test vector::segment::compaction::tests::test_assign_to_cells_partitions_all_vectors ... ok
1727:  test vector::segment::compaction::tests::test_compact_100_vectors ... ok
1728:  test vector::segment::compaction::tests::test_compact_empty_returns_error ... ok
1729:  test vector::segment::compaction::tests::test_compact_filters_deleted ... ok
...

1801:  test vector::turbo_quant::codebook::tests::test_centroids_sorted_ascending ... ok
1802:  test vector::turbo_quant::codebook::tests::test_centroids_symmetric ... ok
1803:  test vector::turbo_quant::codebook::tests::test_code_bytes_per_vector ... ok
1804:  test vector::turbo_quant::codebook::tests::test_codebook_version ... ok
1805:  test vector::turbo_quant::codebook::tests::test_quantize_centroids_are_fixed_points ... ok
1806:  test vector::turbo_quant::codebook::tests::test_quantize_extreme_values ... ok
1807:  test vector::turbo_quant::codebook::tests::test_quantize_just_below_boundary ... ok
1808:  test vector::turbo_quant::codebook::tests::test_quantize_with_boundaries_n_1bit ... ok
1809:  test vector::turbo_quant::codebook::tests::test_quantize_with_boundaries_n_2bit ... ok
1810:  test vector::turbo_quant::codebook::tests::test_quantize_with_boundaries_n_3bit ... ok
1811:  test vector::turbo_quant::codebook::tests::test_scaled_centroids_n_sizes ... ok
1812:  test vector::turbo_quant::codebook::tests::test_scaled_centroids_n_values ... ok
1813:  test vector::turbo_quant::collection::tests::test_bits_helper ... ok
1814:  test vector::turbo_quant::collection::tests::test_checksum_changes_when_quantization_changes ... ok
1815:  test vector::turbo_quant::collection::tests::test_checksum_deterministic ... ok
1816:  test vector::turbo_quant::collection::tests::test_checksum_mismatch_error_display ... ok
1817:  test vector::turbo_quant::collection::tests::test_code_bytes_per_vector ... ok
...

1902:  test vector::turbo_quant::tq_adc::tests::test_tq_l2_adc_multibit_self_distance_2bit ... ok
1903:  test vector::turbo_quant::tq_adc::tests::test_tq_l2_adc_multibit_self_distance_3bit ... ok
1904:  test vector::turbo_quant::tq_adc::tests::test_tq_l2_distant_vectors ... ok
1905:  test vector::turbo_quant::tq_adc::tests::test_tq_l2_matches_decoded_l2 ... ok
1906:  test vector::turbo_quant::tq_adc::tests::test_tq_l2_non_negative ... ok
1907:  test vector::turbo_quant::tq_adc::tests::test_tq_l2_norm_scaling ... ok
1908:  test vector::turbo_quant::tq_adc::tests::test_tq_l2_self_distance_small ... ok
1909:  test vector::types::tests::test_distance_metric_repr ... ok
1910:  test vector::types::tests::test_search_result_ordering ... ok
1911:  test vector::types::tests::test_vector_id_newtype ... ok
1912:  test vector::segment::ivf::tests::test_recall_at_10_nprobe_32 ... ok
1913:  test vector::hnsw::search_sq::tests::test_f32_recall_10k_128d has been running for over 60 seconds
1914:  test vector::hnsw::search_sq::tests::test_f32_recall_1k_768d has been running for over 60 seconds
1915:  test vector::hnsw::search_sq::tests::test_f32_recall_1k_768d ... ok
1916:  test vector::hnsw::search_sq::tests::test_f32_recall_10k_128d ... ok
1917:  test result: ok. 1558 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 291.60s
1918:  �[1m�[92m     Running�[0m unittests src/main.rs (target/debug/deps/moon-948e28c1944183ef)
1919:  running 0 tests
1920:  test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s
1921:  �[1m�[92m     Running�[0m tests/integration.rs (target/debug/deps/integration-4146a66fb58437b2)
...

2023:  test test_sharded_pubsub_psubscribe ... ok
2024:  test test_sharded_pubsub_subscribe ... ok
2025:  test test_sharded_pubsub_publish_count ... ok
2026:  test test_sharded_scan_all_shards ... ok
2027:  test test_sharded_set_commands ... ok
2028:  test test_sharded_sorted_set_commands ... ok
2029:  test test_sharded_pubsub_unsubscribe_cleanup ... ok
2030:  test test_sharded_set_get_across_shards ... ok
2031:  test test_sorted_set_commands ... ok
2032:  test test_type_command ... ok
2033:  test test_sharded_transaction_same_shard ... ok
2034:  test test_type_command_all_types ... ok
2035:  test test_unlink ... ok
2036:  test test_watch_abort ... ok
2037:  test test_watch_success ... ok
2038:  test test_wrongtype_error ... ok
2039:  test result: ok. 106 passed; 0 failed; 10 ignored; 0 measured; 0 filtered out; finished in 3.70s
2040:  �[1m�[92m     Running�[0m tests/replication_test.rs (target/debug/deps/replication_test-1fcdc07635938e19)
2041:  running 5 tests
2042:  test test_readonly_replica ... ok
2043:  test test_replconf_ok ... ok
2044:  test test_info_replication_master ... ok
2045:  test test_replicaof_no_one ... ok
2046:  test test_wait_no_replicas ... ok
2047:  test result: ok. 5 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.21s
2048:  �[1m�[92m     Running�[0m tests/vector_edge_cases.rs (target/debug/deps/vector_edge_cases-e23b5615a763ee78)
2049:  running 16 tests
2050:  test test_delete_nonexistent_id ... ok
2051:  test test_drop_nonexistent_index ... ok
2052:  test test_duplicate_index_create ... ok
2053:  test test_ft_create_missing_args ... ok
2054:  test test_ft_create_invalid_dim ... ok
2055:  test test_empty_index_search ... ok
2056:  test test_ft_create_missing_schema ... ok
2057:  test test_ft_dropindex_missing_args ... ok
2058:  test test_ft_info_nonexistent_index ... ok
2059:  test test_ft_search_missing_query_vector ... ok
2060:  test test_ft_search_dimension_mismatch_returns_error ... ok
2061:  test test_ft_search_nonexistent_index ... ok
2062:  test test_search_k_larger_than_index ... ok
2063:  test test_search_k_zero ... ok
2064:  test test_zero_vector_insert_and_search ... ok
2065:  test test_max_dimension_3072 ... ok
2066:  test result: ok. 16 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s
2067:  �[1m�[92m     Running�[0m tests/vector_insert_bench.rs (target/debug/deps/vector_insert_bench-b06e58f0dc73c6ac)
2068:  running 4 tests
2069:  test bench_full_insert_pipeline_128d ... ok
2070:  test bench_raw_append_768d ... ok
2071:  test bench_raw_append_128d ... ok
2072:  test bench_full_insert_pipeline_768d ... ok
2073:  test result: ok. 4 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 6.19s
2074:  �[1m�[92m     Running�[0m tests/vector_memory_audit.rs (target/debug/deps/vector_memory_audit-428c84ae3d631081)
2075:  running 4 tests
2076:  test test_memory_budget_1m_768d_tq4 ... ok
2077:  test test_aligned_buffer_no_waste ... ok
2078:  test test_struct_sizes ... ok
2079:  test test_per_vector_overhead_breakdown ... ok
2080:  test result: ok. 4 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.03s
2081:  �[1m�[92m     Running�[0m tests/vector_recall_benchmark.rs (target/debug/deps/vector_recall_benchmark-cc7d8c28982961e2)
2082:  running 8 tests
2083:  test recall_1k_128d_ef128 ... ok
2084:  test recall_1k_128d_ef64 ... ok
2085:  test recall_10k_128d_ef128 has been running for over 60 seconds
2086:  test recall_10k_768d_ef128 has been running for over 60 seconds
2087:  test recall_10k_768d_ef256 has been running for over 60 seconds
2088:  test recall_1k_768d_ef128 has been running for over 60 seconds
2089:  test recall_1k_768d_ef128 ... ok
2090:  test recall_debug_1k_128d ... ok
2091:  test recall_f32_hnsw_10k_128d_ef200 has been running for over 60 seconds
2092:  test recall_10k_128d_ef128 ... ok
2093:  test recall_f32_hnsw_10k_128d_ef200 ... ok
2094:  ##[error]The action 'Run cargo test --no-default-features --features runtime-tokio,jemalloc' has timed out after 15 minutes.
2095:  Post job cleanup.

TinDang97 added a commit that referenced this pull request Apr 8, 2026
Ports the multi-part AOF persistence work from feat/persistence-overhaul
(PR #37) as a fresh squash onto post-disk-offload main, dropping the
stale branch's obsolete accept-loop / SO_REUSEPORT / cfg-gate changes.

Additive content:
- src/persistence/aof_manifest.rs — appendonlydir/ manifest + multi-part replay
- src/persistence/rdb.rs — save_to_bytes / load_from_bytes + fast bulk loader
  (count_entries_per_db + Database::reserve + insert_for_load)
- src/persistence/aof.rs — writer rewrite: waits for manifest, handles
  AofMessage::Rewrite{,Sharded} via do_rewrite_{single,sharded}, detects
  MOON magic prefix for RDB-preamble replay
- src/command/persistence.rs — bgrewriteaof_start_sharded
- src/storage/compact_value.rs — heap_string_owned / heap_string_vec_direct
- src/storage/db.rs — insert_for_load / reserve / recalculate_memory
- src/shard/shared_databases.rs — all_shard_dbs()
- src/server/conn/handler_{sharded,monoio}.rs — BGREWRITEAOF dispatch
- src/main.rs — replay_multi_part layered on v2/v3 recovery, manifest
  initialized after recovery so the writer thread can unblock

Coexistence rule: when appendonlydir/ manifest is present it is
authoritative; legacy appendonly.aof fallback (handled by v2 recovery
inside restore_from_persistence) only fires when no manifest exists —
covering first-upgrade from pre-multi-part moon.

Known limitation: multi-part replay is single-shard only; multi-shard
clusters log a warning and fall back to v2/v3 recovery.

Validation (moon-dev OrbStack, Linux aarch64):
  cargo fmt --check                                           ok
  cargo clippy --release -- -D warnings                       ok
  cargo clippy --no-default-features --features runtime-tokio,jemalloc -- -D warnings  ok
  cargo test --release --lib                                  1858 pass
  cargo test --no-default-features --features runtime-tokio,jemalloc --lib  1877 pass

Two failures in cargo test --release --lib (test_inline_set,
test_inline_set_with_aof) and tests/replication_test.rs reproduce on
clean main — pre-existing, not introduced by this change.
@TinDang97
Copy link
Copy Markdown
Collaborator Author

Superseded by #63, which ports the multi-part AOF + BGREWRITEAOF + fast RDB loader as a fresh squash onto post-disk-offload main and fixes six correctness issues found in review (including a P0 double-apply bug in the rewrite ordering). This branch was 247 commits behind main and rebasing was infeasible.

@TinDang97 TinDang97 closed this Apr 8, 2026
pilotspacex-byte pushed a commit that referenced this pull request Apr 9, 2026
* feat: multi-part AOF + BGREWRITEAOF monoio + RDB loader 3x

Ports the multi-part AOF persistence work from feat/persistence-overhaul
(PR #37) as a fresh squash onto post-disk-offload main, dropping the
stale branch's obsolete accept-loop / SO_REUSEPORT / cfg-gate changes.

Additive content:
- src/persistence/aof_manifest.rs — appendonlydir/ manifest + multi-part replay
- src/persistence/rdb.rs — save_to_bytes / load_from_bytes + fast bulk loader
  (count_entries_per_db + Database::reserve + insert_for_load)
- src/persistence/aof.rs — writer rewrite: waits for manifest, handles
  AofMessage::Rewrite{,Sharded} via do_rewrite_{single,sharded}, detects
  MOON magic prefix for RDB-preamble replay
- src/command/persistence.rs — bgrewriteaof_start_sharded
- src/storage/compact_value.rs — heap_string_owned / heap_string_vec_direct
- src/storage/db.rs — insert_for_load / reserve / recalculate_memory
- src/shard/shared_databases.rs — all_shard_dbs()
- src/server/conn/handler_{sharded,monoio}.rs — BGREWRITEAOF dispatch
- src/main.rs — replay_multi_part layered on v2/v3 recovery, manifest
  initialized after recovery so the writer thread can unblock

Coexistence rule: when appendonlydir/ manifest is present it is
authoritative; legacy appendonly.aof fallback (handled by v2 recovery
inside restore_from_persistence) only fires when no manifest exists —
covering first-upgrade from pre-multi-part moon.

Known limitation: multi-part replay is single-shard only; multi-shard
clusters log a warning and fall back to v2/v3 recovery.

Validation (moon-dev OrbStack, Linux aarch64):
  cargo fmt --check                                           ok
  cargo clippy --release -- -D warnings                       ok
  cargo clippy --no-default-features --features runtime-tokio,jemalloc -- -D warnings  ok
  cargo test --release --lib                                  1858 pass
  cargo test --no-default-features --features runtime-tokio,jemalloc --lib  1877 pass

Two failures in cargo test --release --lib (test_inline_set,
test_inline_set_with_aof) and tests/replication_test.rs reproduce on
clean main — pre-existing, not introduced by this change.

* fix(aof): correctness issues in multi-part rewrite and manifest loading

Addresses senior-rust-engineer review of #63. Six fixes across P0/P1:

1. AofManifest::load returns Result<Option<Self>, io::Error>
   Previous silent-None-on-corruption caused main.rs to call
   initialize() and overwrite the corrupt manifest, destroying the
   reference to the real base RDB and losing all persisted data.
   Corrupt manifest is now fatal at startup.

2. Orphan cleanup on manifest load
   Scans appendonlydir/ for stray moon.aof.{N}.{base.rdb,incr.aof,
   *.tmp} with N != current seq and deletes them. Previously a crash
   between advance() phases 1-3 left zombie base RDBs that never got
   referenced by any manifest, filling disk over repeated failures.

3. replay_incr_resp: fail-hard on parse error
   Previous impl did buf.split_to(1) + scan for next '*' byte, silently
   dropping runs of corrupt commands. '*' can legitimately appear
   inside bulk-string payloads, so resync was unsound. Recovery of a
   corrupt incr log is now an error, not silent data loss.

4. Rewrite ordering: drain + lock + snapshot + drain
   P0 bug: non-idempotent commands (INCR, LPUSH, SADD, ZADD, HINCRBY,
   APPEND, etc.) were double-applied on recovery after BGREWRITEAOF.
   The handler applies the write to the DB synchronously, then sends
   an AofMessage::Append asynchronously. During rewrite, appends queue
   in the channel while the writer thread is in do_rewrite_*. After
   rewrite, queued appends are processed into the NEW incr. If the
   write happened BEFORE the snapshot captured its shard, the write
   is both in base AND in new incr → replay double-applies.

   Fix: in do_rewrite_single/sharded, (a) drain the channel to the
   old incr and fsync, (b) acquire write locks on all (shard, db)
   pairs simultaneously, (c) drain once more to catch appends completed
   between step a and step b, (d) snapshot under the locks, (e) release
   locks, (f) write new base + advance manifest + reopen.

   Invariant: any write captured in the new base is NOT in the new
   incr (handlers were blocked), and any write NOT in the new base IS
   in the new incr (queued after lock release). Creates a brief global
   write pause during snapshot — acceptable cost for correctness.

5. AOF writer honors corrupt-manifest error
   Writer thread exits with a loud log instead of spinning on load()
   forever when the manifest is corrupt, so server startup fails fast.

6. Database::reserve debug_assert empty table
   Previous impl silently replaced the DashTable regardless of current
   contents — caller who misused reserve() on a populated database
   would lose all data without warning. Debug assertion catches the
   misuse in tests.

Validation (moon-dev OrbStack):
  cargo fmt --check                                           ok
  cargo clippy --release -- -D warnings                       ok
  cargo clippy --no-default-features --features runtime-tokio,jemalloc -- -D warnings  ok
  cargo test --release --lib                                  1858 pass
  (test_inline_set* pre-existing failures on main, unrelated)

* fix(aof): legacy upgrade double-replay, base rdb fsync, missing-base guard

Addresses four real issues from PR #63 review (qodo + coderabbit). Three
were P0/P1 correctness bugs; one is correctness-adjacent.

1. Legacy AOF double-replay on upgrade (qodo #2 / coderabbit critical)
   On first upgrade from legacy single-file AOF, restore_from_persistence
   replays appendonly.aof into the databases, then main.rs used to call
   initialize() which created an empty manifest (no base). On the NEXT
   boot the multi-part replay path ran clear() and then had nothing to
   load → all legacy state lost. Additionally the legacy file remained
   on disk, so v2 recovery kept replaying it on subsequent boots,
   double-applying on top of whatever multi-part state existed.

   Fix: at first upgrade, if restore_from_persistence loaded any state,
   serialize it via rdb::save_to_bytes and create the manifest with a
   real base seq 1 via AofManifest::initialize_with_base(). Rename the
   legacy appendonly.aof to appendonly.aof.legacy so v2 recovery on the
   next boot can't find it. Also retire the legacy file after a
   successful replay_multi_part for the second-boot case.

2. Base RDB not fsynced before manifest publish (qodo #6)
   AofManifest::advance used std::fs::write + rename, which renames an
   open file whose contents aren't guaranteed to be on disk. A crash
   after the manifest write could publish a seq pointing at a base whose
   contents weren't durable.

   Fix: explicit File::create + write_all + sync_data + rename. Same
   pattern applied to initialize_with_base.

3. Multi-part replay clears databases before loading
   Prevents the double-apply of non-idempotent commands from any state
   that earlier recovery phases (WAL, legacy AOF) may have loaded. The
   multi-part AOF is the authoritative source.

4. Missing base + non-empty incr is now an error (coderabbit minor)
   Previously warned and continued, which would apply deltas on empty
   state. Now returns an error. Empty-incr case (fresh initialize) is
   still tolerated.

Already addressed in earlier commits (noted in review):
 - coderabbit: infinite busy-wait on missing manifest — ca2ec51
 - coderabbit: silent corruption skip in replay_incr_resp — ca2ec51
 - qodo #3: monoio manifest wait can hang on corrupt — ca2ec51
 - qodo #4: corrupt manifest reinitialized — ca2ec51

Validation on moon-dev (Linux aarch64):
  cargo fmt --check                                           ok
  cargo clippy --release -- -D warnings                       ok
  cargo clippy --no-default-features --features runtime-tokio,jemalloc -- -D warnings  ok
  cargo test --release --lib                                  1858 pass

Manual smoke tests passed:
  1. BGREWRITEAOF with mixed R/W load — seq advanced, manifest correct
  2. Crash-recovery kill -9 mid-rewrite — 3000/3000 keys recovered
  3. Double-apply regression: 2000 concurrent INCRs during BGREWRITEAOF —
     base had snapshot state, incr had remainder, restart counter=2000
  4. First-upgrade from legacy appendonly.aof — state captured as base
     seq 1, legacy file retired, all keys survive next boot without
     BGREWRITEAOF
  5. Corrupt manifest (seq 0) — server refuses to start with clear error

* chore(rdb): remove dead zero-copy plumbing

Addresses the last two cosmetic PR #63 comments:

- qodo: #[allow(dead_code)] on read_bytes_zero_copy lacked justification
- coderabbit: unnecessary Bytes::copy_from_slice(data) full-buffer copy
  fed into read_entry_zero_copy's ignored _shared_buf parameter

The "zero-copy" path through read_entry_zero_copy was never actually
zero-copy — read_bytes_zero_copy was defined but never called, and
read_entry_zero_copy ignored the buffer it was given. This commit:

- Deletes read_bytes_zero_copy (truly dead)
- Removes the unused _shared_buf parameter from read_entry_zero_copy
- Removes the Bytes::copy_from_slice(data) allocation in
  load_from_bytes that existed solely to feed that parameter
- Updates the two call sites
- Documents the rationale so a future zero-copy revival adds the
  plumbing as part of a landed change, not as vestigial code

No behavior change; smaller binary, one fewer full-buffer allocation
on RDB load_from_bytes.

Validation (moon-dev):
  cargo fmt --check                                            ok
  cargo clippy --release -- -D warnings                        ok
  cargo clippy --no-default-features --features runtime-tokio,jemalloc -- -D warnings  ok

* fix(rdb): load into temp databases then swap, preventing stale key survival

An RDB file (both standalone .rdb and AOF RDB-preamble) is a full
point-in-time snapshot. Loading it must replace the in-memory state,
not merge into it. Previously both rdb::load() and rdb::load_from_bytes()
called insert_for_load() directly on the live databases, so keys that
existed before the load but were absent from the RDB snapshot silently
survived — producing mixed state.

Fix: both load paths now create temporary Database instances, load
entries into them, then swap the temps into the live slots on success.
This provides:
  - Correctness: old keys not in the snapshot are gone after load.
  - Atomicity: if the load fails partway, original state is untouched.
  - Consistent metadata: recalculate_memory runs on temps before swap,
    so used_memory reflects exactly the loaded state.

The swap is safe w.r.t. cold_index/cold_shard_dir because main.rs
initializes those fields after restore_from_persistence completes.

Validation (moon-dev):
  cargo fmt --check                                            ok
  cargo clippy --release / --runtime-tokio,jemalloc            ok
  cargo test --release --lib                                   1858 pass
  Manual: 150 keys through BGREWRITEAOF + incr + restart       PASS

* fix(aof,rdb): manifest validation, RDB error propagation, CRC O(n), writer timeout

Four fixes from code review:

1. aof_manifest::load — validate base/incr records before orphan cleanup

   A truncated manifest containing only "seq 2" (no base/incr lines)
   would pass the seq > 0 check, then cleanup_orphans would delete
   files matching the PREVIOUS valid seq — destroying the actual
   recovery data. Now load() requires all three records (seq, base,
   incr) to be present. Truncated manifests return Err, which callers
   already treat as fatal.

2. rdb::load + rdb::load_from_bytes — return Err on corrupted entries

   Both load paths now load into temp_dbs, but on Err from
   read_entry_zero_copy they used to break and fall through to the
   swap, committing partially-loaded temp databases into live state.
   Now both paths return Err immediately, leaving live databases
   untouched. The error includes byte offset and key count for
   diagnosis.

3. rdb::load_from_bytes — single-pass CRC scan (O(n) vs O(n²))

   The EOF_MARKER search was re-hashing data[0..=i] from scratch for
   each candidate byte. Now maintains a running crc32fast::Hasher
   updated byte-by-byte, cloning at each candidate position. On a
   10MB RDB preamble with k candidates, this reduces from O(n*k)
   hash bytes to O(n) total.

4. aof::aof_writer_task — bound manifest wait with cancel + timeout

   The monoio writer's manifest wait loop now checks the
   CancellationToken each iteration and enforces a 60s hard timeout.
   Previously if main.rs failed to create the manifest (disk full,
   permission error), the writer would spin forever, blocking graceful
   shutdown. Now exits cleanly with a diagnostic log.

Skipped: rdb.rs submodule split (1726 lines, above 1500 limit but
high-risk churn for a correctness PR — tracked for follow-up).

Validation (moon-dev):
  cargo fmt --check                                            ok
  cargo clippy --release / --runtime-tokio,jemalloc            ok
  cargo test --release --lib                                   1858 pass
  Smoke: 700 keys + INCR counter through BGREWRITEAOF + restart PASS
@pilotspacex-byte pilotspacex-byte deleted the feat/persistence-overhaul branch April 10, 2026 05:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant