Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
name: Rust CI

on:
pull_request:
paths:
- "packages/**"
- "Cargo.toml"
- "Cargo.lock"
- ".github/workflows/rust.yml"
push:
branches: [develop, master]
paths:
- "packages/**"
- "Cargo.toml"
- "Cargo.lock"
- ".github/workflows/rust.yml"

env:
CARGO_TERM_COLOR: always

jobs:
check:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- uses: dtolnay/rust-toolchain@stable

- uses: Swatinem/rust-cache@v2

- name: cargo fmt
run: cargo fmt --all -- --check

- name: cargo check
run: cargo check --workspace

- name: cargo test
run: cargo test --workspace

# Scoped to new crates — sdex-backfill has pre-existing clippy issues.
- name: cargo clippy
run: cargo clippy -p extractors-core -p phoenix-extractor -p ledger-processor -- -D warnings
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
[workspace]
resolver = "3"
members = ["packages/sdex-backfill"]
members = [
"packages/sdex-backfill",
"packages/extractors-core",
"packages/phoenix-extractor",
"packages/ledger-processor",
]

[workspace.dependencies]
stellar-xdr = { version = "=26.0.0", features = ["curr"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,85 @@ and add a runtime warning if an unrecognized hash appears.

## Acceptance Criteria

- [ ] Consumer's Phoenix venue lookup does not silently drop pools
- [x] Consumer's Phoenix venue lookup does not silently drop pools
whose WASM hash differs from the most common XYK build.
- [ ] Classifier documented as `pool_type + event_count`, with a unit
- [x] Classifier documented as `pool_type + event_count`, with a unit
test covering both XYK pool variants by config-fixture.
- [ ] PHO/USDC swaps (pool `CD5XNKK3...IAA`) verified end-to-end
through the consumer in a staging run.
through the consumer in a staging run. (deferred — requires live environment)

## Implementation Notes

### Crates created

This task also delivered the 0037 skeleton as a prerequisite since no
consumer code existed. Three new workspace members under `packages/`:

| Crate | Path | Role |
|-------|------|------|
| `extractors-core` | `packages/extractors-core` | `SwapExtractor` trait, `SorobanEventRow`, `TaggedValue`, `TradeRow`, `Venue` enum — transcribed from 0018 Appendix A |
| `phoenix-extractor` | `packages/phoenix-extractor` | `PhoenixPoolRegistry` (contract_id → pool_type lookup) + `PhoenixXykExtractor` (8-event grouping decoder) |
| `ledger-processor` | `packages/ledger-processor` | lib + stub binary; `dispatch()` routes by venue, then `(pool_type, event_count)` for Phoenix |

### Classifier design

`PhoenixPoolRegistry` keys lookup by **contract_id** and stores
`pool_type: u32` from the factory's `query_config()`. WASM hash is
stored as `Option<[u8; 32]>` metadata but is **never consulted for
extractor selection**. Routing logic in `dispatch_phoenix()`:

- `pool_type == 0` AND `rows.len() >= 8` → `PhoenixXykExtractor`
- `pool_type != 0` AND `rows.len() >= 6` → stable path (stub, no
mainnet stable pools exist yet per 0032)

This survives future Phoenix XYK rebuilds without code changes.

### Tests (14 total)

**phoenix-extractor (8 tests):**
- Registry fixture construction + lookup for both WASM variants
- Proof that different WASM hashes both resolve as XYK via pool_type
- XYK extractor: 8-event group decode, PHO/USDC alt-WASM pool,
insufficient rows rejection, unordered field tolerance

**ledger-processor (6 tests):**
- Dispatch routes XLM/USDC (common WASM) correctly
- Dispatch routes PHO/USDC (alt WASM) identically
- Explicit proof that dispatch uses pool_type, not WASM hash
- Stable pool (pool_type != 0) returns error (intentionally unimplemented)
- Unknown venue skipped, empty rows return empty

### CI

Added `.github/workflows/rust.yml` — runs `cargo fmt`, `cargo check`,
`cargo test`, `cargo clippy` on PRs touching `packages/` or `Cargo.*`.

## Design Decisions

### From Plan

1. **`pool_type + event_count` classifier**: per 0032 S-note §"So what?"
recommendation. WASM hash stored but never used for routing.

2. **Per-venue extractor trait**: `SwapExtractor` with
`extract(&[SorobanEventRow]) -> ExtractResult` per 0018 Appendix A.

### Emerged

3. **Absorbed 0037 skeleton into this task**: no consumer code existed,
so the 0037 crate layout was a prerequisite. Built the minimum
skeleton (3 crates) needed for 0034's classifier to compile and test.

4. **Field-name-based extraction over positional**: the XYK extractor
matches fields by `topic[1]` string name rather than relying on
emission order. This tolerates reordered events within a group
(tested explicitly).

5. **`TaggedValue` enum for CH-level data**: models BE's tagged-JSON
encoding (`type` + `value`) from `R-be-storage-format.md` rather
than raw XDR `ScVal`. This is what the consumer actually reads from
ClickHouse.

6. **Scoped clippy in CI**: runs clippy only on the three new crates,
not workspace-wide, because `sdex-backfill` has pre-existing clippy
issues unrelated to this task.
8 changes: 8 additions & 0 deletions packages/extractors-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[package]
name = "extractors-core"
version = "0.1.0"
edition = "2024"
description = "Per-venue swap extractor trait and shared types for the Tranche 1 Ledger Processor"

[dependencies]
thiserror = { workspace = true }
89 changes: 89 additions & 0 deletions packages/extractors-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Venue {
Soroswap,
Aquarius,
Phoenix,
}

#[derive(Debug, Clone)]
pub struct SorobanEventRow {
pub contract_id: String,
pub transaction_id: String,
pub ledger_sequence: u64,
pub event_index: u32,
pub topics: Vec<TaggedValue>,
pub data: TaggedValue,
}

#[derive(Debug, Clone, PartialEq)]
pub enum TaggedValue {
Symbol(String),
String(String),
Address(String),
I128(i128),
Map(Vec<(TaggedValue, TaggedValue)>),
Vec(Vec<TaggedValue>),
Null,
}

impl TaggedValue {
pub fn as_str(&self) -> Option<&str> {
match self {
TaggedValue::Symbol(s) | TaggedValue::String(s) | TaggedValue::Address(s) => Some(s),
_ => None,
}
}

pub fn as_i128(&self) -> Option<i128> {
match self {
TaggedValue::I128(v) => Some(*v),
_ => None,
}
}

pub fn as_address(&self) -> Option<&str> {
match self {
TaggedValue::Address(s) => Some(s),
_ => None,
}
}
}

#[derive(Debug, Clone)]
pub struct TradeRow {
pub venue: Venue,
pub contract_id: String,
pub transaction_id: String,
pub ledger_sequence: u64,
pub first_event_index: u32,
pub token_in: String,
pub token_out: String,
pub amount_in: i128,
pub amount_out: i128,
pub fee: Option<i128>,
pub trader: Option<String>,
}

#[derive(Debug)]
pub struct ExtractResult {
pub trades: Vec<TradeRow>,
pub rows_consumed: usize,
}

pub trait SwapExtractor {
fn extract(&self, rows: &[SorobanEventRow]) -> Result<ExtractResult, ExtractError>;
}

#[derive(Debug, thiserror::Error)]
pub enum ExtractError {
#[error("not enough rows: need {expected}, got {actual}")]
InsufficientRows { expected: usize, actual: usize },
#[error("missing field in event group: {0}")]
MissingField(String),
#[error("unexpected topic shape in row at event_index {0}")]
UnexpectedTopicShape(u32),
}

pub type VenueRegistry = HashMap<String, Venue>;
21 changes: 21 additions & 0 deletions packages/ledger-processor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[package]
name = "ledger-processor"
version = "0.1.0"
edition = "2024"
description = "Tranche 1 Ledger Processor — dispatches Soroban swap events to per-venue extractors"

[lib]
name = "ledger_processor"
path = "src/lib.rs"

[[bin]]
name = "ledger-processor"
path = "src/main.rs"

[dependencies]
extractors-core = { path = "../extractors-core" }
phoenix-extractor = { path = "../phoenix-extractor" }
thiserror = { workspace = true }

[dev-dependencies]
phoenix-extractor = { path = "../phoenix-extractor", features = ["test-fixtures"] }
Loading
Loading