feat(lore-0034): pool_type + event_count classifier for Phoenix XYK dispatch#31
Conversation
…ispatch Scaffold Tranche 1 Ledger Processor crates (extractors-core, phoenix-extractor, ledger-processor) and implement the multi-WASM tolerance fix: Phoenix pool registry keys by contract_id and routes swap events by (pool_type, event_count) — never by WASM hash. Both production XYK builds (167ab414…506c and 13b158655e…f2ca) are covered by fixture tests proving the PHO/USDC pool is not silently dropped from price feeds.
…stable-pool test - Add cargo fmt --all check to CI - Format all workspace crates (including sdex-backfill) - Remove empty [dev-dependencies] from phoenix-extractor - Extract shared test_fixtures module behind feature gate - Add dispatch_stable_pool_returns_error_unimplemented test
There was a problem hiding this comment.
Pull request overview
This PR scaffolds the initial “Tranche 1” Rust workspace crates for Soroban swap extraction/dispatch and implements Phoenix XYK multi-WASM tolerance by routing based on (pool_type, event_count) and registry lookup by contract_id (not WASM hash). It also adds a Rust CI workflow and applies rustfmt-driven formatting updates to sdex-backfill.
Changes:
- Add new crates:
extractors-core(shared types/traits),phoenix-extractor(Phoenix registry + XYK extractor),ledger-processor(venue + Phoenix shape dispatcher). - Implement Phoenix multi-WASM tolerance via
contract_idregistry keying and(pool_type, event_count)classifier, with fixture-based unit tests. - Add Rust CI workflow (
fmt/check/test/clippy) and format existing Rust code (notablysdex-backfill).
Reviewed changes
Copilot reviewed 22 out of 23 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| packages/sdex-backfill/src/sync.rs | Rustfmt formatting of tracing logs and function signature. |
| packages/sdex-backfill/src/sink.rs | Rustfmt formatting of queries/logs and a match arm. |
| packages/sdex-backfill/src/run.rs | Rustfmt formatting around preflight/logging/spawn call. |
| packages/sdex-backfill/src/obs.rs | Rustfmt formatting of EnvFilter initialization. |
| packages/sdex-backfill/src/ingest.rs | Rustfmt formatting of logging and match arms. |
| packages/sdex-backfill/src/filter.rs | Rustfmt formatting of claim-to-trade call and match arm. |
| packages/sdex-backfill/src/canonical.rs | Rustfmt formatting of struct literal. |
| packages/sdex-backfill/src/bucket.rs | Rustfmt formatting of drain/map/collect pipeline. |
| packages/phoenix-extractor/src/xyk.rs | New Phoenix XYK (8-event) swap extractor + unit tests. |
| packages/phoenix-extractor/src/test_fixtures.rs | New shared test fixtures for Phoenix events and WASM hash variants. |
| packages/phoenix-extractor/src/registry.rs | New Phoenix pool registry keyed by contract_id with fixture helpers + tests. |
| packages/phoenix-extractor/src/lib.rs | New crate exports/constants and test-fixtures feature gating. |
| packages/phoenix-extractor/Cargo.toml | New crate manifest and test-fixtures feature. |
| packages/ledger-processor/src/main.rs | New stub binary entrypoint. |
| packages/ledger-processor/src/lib.rs | New library module export. |
| packages/ledger-processor/src/dispatch.rs | New venue dispatcher + Phoenix (pool_type, event_count) routing + tests. |
| packages/ledger-processor/Cargo.toml | New crate manifest and dev-dependency on Phoenix fixtures. |
| packages/extractors-core/src/lib.rs | New shared types (SorobanEventRow, TaggedValue, TradeRow), SwapExtractor, and errors. |
| packages/extractors-core/Cargo.toml | New crate manifest. |
| lore/1-tasks/active/0034_FEATURE_consumer-multi-xyk-wasm-tolerance.md | Document implementation notes/design decisions and update acceptance checklist. |
| Cargo.toml | Add new Rust crates to workspace members. |
| Cargo.lock | Lockfile updates for new workspace crates. |
| .github/workflows/rust.yml | Add Rust CI workflow (fmt/check/test/clippy). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| let result = PhoenixXykExtractor.extract(rows)?; | ||
| Ok(result.trades) |
There was a problem hiding this comment.
By design — dispatch_phoenix receives a pre-grouped (transaction_id, contract_id) slice from the caller (one logical swap = one call). The upstream grouping logic (not yet implemented — arrives with the CH reader in task 0038) is responsible for chunking rows by tx+contract before calling dispatch. rows_consumed in ExtractResult is preserved so the future grouper can use it to advance through a multi-swap batch. No change needed here.
| match venue { | ||
| Some(Venue::Phoenix) => dispatch_phoenix(rows, phoenix_registry), | ||
| Some(Venue::Soroswap) => { | ||
| todo!("Soroswap extractor not yet implemented") | ||
| } | ||
| Some(Venue::Aquarius) => { | ||
| todo!("Aquarius extractor not yet implemented") | ||
| } | ||
| None => Ok(vec![]), |
There was a problem hiding this comment.
Resolved in 1e25f4b — replaced todo!() with Err(DispatchError::VenueNotImplemented { venue, contract_id }).
| let contract_id = &rows[0].contract_id; | ||
| let pool = registry | ||
| .lookup(contract_id) | ||
| .ok_or_else(|| DispatchError::UnknownPhoenixShape { | ||
| contract_id: contract_id.clone(), | ||
| pool_type: u32::MAX, | ||
| event_count: rows.len(), | ||
| })?; |
There was a problem hiding this comment.
Resolved in 1e25f4b — added a dedicated UnknownPool { contract_id } error variant, no more u32::MAX sentinel.
| for row in group { | ||
| let field_name = row | ||
| .topics | ||
| .get(1) | ||
| .and_then(|t| t.as_str()) | ||
| .ok_or(ExtractError::UnexpectedTopicShape(row.event_index))?; | ||
|
|
||
| match field_name { | ||
| "sender" => sender = row.data.as_address().map(|s| s.to_string()), | ||
| "sell_token" => sell_token = row.data.as_address().map(|s| s.to_string()), | ||
| "offer_amount" => offer_amount = row.data.as_i128(), | ||
| "buy_token" => buy_token = row.data.as_address().map(|s| s.to_string()), | ||
| "return_amount" => return_amount = row.data.as_i128(), | ||
| "actual received amount" | "spread_amount" | "referral_fee_amount" => {} | ||
| _ => {} | ||
| } |
There was a problem hiding this comment.
Good catch — adding a topics[0] == "swap" validation guard now.
There was a problem hiding this comment.
Resolved in e071f6a — added topics[0] == "swap" validation guard. Non-swap events now get rejected with UnexpectedTopicShape before field parsing.
…ariants Add UnknownPool and VenueNotImplemented error variants to DispatchError. Removes u32::MAX sentinel for unregistered pools and todo!() panics in the Soroswap/Aquarius dispatch arms.
Reject event rows where topics[0] is not "swap" before parsing the field name from topics[1]. Prevents non-swap events from being silently misinterpreted.
Summary
extractors-core,phoenix-extractor,ledger-processor) per ADR 0006 and task 0037 speccontract_idand routes swap events by(pool_type, event_count)— never by WASM hash167ab414…506c× 10 pools,13b158655e…f2ca× PHO/USDC) are covered by fixture tests proving the alt-WASM pool is not silently dropped from price feedscargo check,cargo test,cargo clippy)Crates
extractors-coreSwapExtractortrait,SorobanEventRow,TaggedValue,TradeRow,Venueenum (per 0018 Appendix A)phoenix-extractorPhoenixPoolRegistry+PhoenixXykExtractor(8-event grouping decoder)ledger-processordispatch()routing by venue →(pool_type, event_count)for PhoenixTest plan
cargo test -p phoenix-extractor— 8 tests (registry fixture lookup, both WASM variants, XYK decode, edge cases)cargo test -p ledger-processor— 5 tests (dispatch routing both pools, pool_type-not-hash proof, unknown venue, empty rows)cargo clippyclean on all three crates