Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ Zig helpers for the wire formats of **[ethp2p](https://github.com/ethp2p/ethp2p)
| `PartialMessagesExtension` (nested in `RPC.partial`) | libp2p `rpc.proto` field 10 body | `encodePartialMessagesExtension`, `decodePartialMessagesExtensionOwned` |
| Unsigned-varint length prefix before `RPC` body | common libp2p framing | `encodeRpcLengthPrefixed`, `decodeRpcLengthPrefixedPrefix` |
| In-process duplex for length-prefixed `RPC` (simnet-style, no TCP/QUIC) | pair of `Endpoint`s over bounded byte queues | `sim.gossipsub_rpc_host` (`Link`, `Endpoint.sendRpc` / `recvRpcOwned`) |
| QUIC transport + UNI stream alignment | [`sim/host.go`](https://github.com/ethp2p/ethp2p/blob/main/sim/host.go), `peer.go`, `peer_ctrl.go`, `peer_in.go` | `transport.eth_ec_quic`: IETF QUIC, TLS 1.3, ALPN `eth-ec-broadcast`, **unidirectional** BCAST/SESS/CHUNK streams matching `OpenUniStream`/`AcceptUniStream`; `PeerConn` lifecycle sketch. See [QUIC transport](#quic-transport-lsquic-build). |
| QUIC transport + UNI stream alignment | [`sim/host.go`](https://github.com/ethp2p/ethp2p/blob/main/sim/host.go), `peer.go`, `peer_ctrl.go`, `peer_in.go` | `transport.eth_ec_quic`: IETF QUIC, TLS 1.3, ALPN `eth-ec-broadcast`, **unidirectional** BCAST/SESS/CHUNK streams matching `OpenUniStream`/`AcceptUniStream`; `PeerConn` + `broadcast.engine_quic` (`EngineQuicHost`) wire inbound SESS/CHUNK into `Engine` / `ChannelRs`. See [QUIC transport](#quic-transport-lsquic-build). |
| **Still open** | — | [Pending work](#pending-work) |

## Scope on `main` (at a glance)
Expand All @@ -52,7 +52,7 @@ This is **what is already implemented** — not the backlog. Per-module mapping
- **EC scheme id:** `layer.ec_scheme` (`EcSchemeKind`, `"reed-solomon"` wire name); only Reed–Solomon is wired end-to-end ([#14](https://github.com/ch4r10t33r/zig-ethp2p/issues/14) tracks RLNC and further schemes).
- **Abstract RS mesh:** heap-backed graphs and `PeerSessionStats` (`sim.rs_mesh`): 2-node, 4-node ring, 6-node `TestNetwork`-style topology, **partition/heal** line test, chunk-len variant; with `ZIG_ETHP2P_STRESS=1`, larger six-node budget plus **8-** and **16-node** rings.
- **Gossipsub (sim / wire helpers):** transport, protocol, broadcast, interop, `RPC` encode/decode (including **`partial`** / `PartialMessagesExtension`), full `ControlMessage`, varint length prefix, in-process **`gossipsub_rpc_host`** for tests (`sim.gossipsub_*`, `broadcast.gossip`).
- **QUIC:** `transport.eth_ec_quic` — IETF QUIC, TLS 1.3, ALPN `eth-ec-broadcast`, **unidirectional** BCAST/SESS/CHUNK streams matching the ethp2p Go reference (`peer.go` `OpenUniStream`/`AcceptUniStream`). `PeerConn` poll-driven lifecycle sketch included. See [QUIC transport](#quic-transport-lsquic-build).
- **QUIC:** `transport.eth_ec_quic` — IETF QUIC, TLS 1.3, ALPN `eth-ec-broadcast`, **unidirectional** BCAST/SESS/CHUNK streams matching the ethp2p Go reference (`peer.go` `OpenUniStream`/`AcceptUniStream`). `PeerConn` is poll-driven; `broadcast.engine_quic.EngineQuicHost` connects inbound streams to the RS channel path. See [QUIC transport](#quic-transport-lsquic-build).
- **CI:** aligned with [ethp2p's `ci.yml`](https://github.com/ethp2p/ethp2p/blob/main/.github/workflows/ci.yml): `zig build test-broadcast`, `test-sim-rs`, `test-sim-gossipsub` (Debug + TSan), `test-quic` (**`quic-transport`** job: vendored TLS, **45m** job timeout + **`timeout 40m`** on the command so a hung poll loop cannot exhaust the runner), `test-stress-ci` on **`main` only**, plus lint (`zig fmt --check`, `zig build`, `zig ast-check`). `build.zig.zon` **`minimum_zig_version`** must match workflow **`ZIG_VERSION`**; `just check-zig-ci-align` checks that locally.
- **One-shot local verification:** `zig build test` runs the full suite.

Expand All @@ -69,6 +69,7 @@ lsquic + BoringSSL are always compiled — no build flag is needed. **Windows**
| `eth_ec_quic_common.zig` / `eth_ec_quic_enabled.zig` | Shared config and ALPN string; **enabled** path implements `listenImpl` / `dialImpl` and integration tests. |
| `eth_ec_quic.zig` | Public `transport.eth_ec_quic`: `listen`, `dial`, `logInit` (programmatic lsquic logger), listener wrapper. |
| `eth_ec_quic_peer.zig` | `PeerConn` poll-driven state machine: `idle → handshaking → active → closed`; symmetric BCAST UNI handshake + `runAcceptLoop` dispatch by protocol selector byte. |
| `broadcast/engine_quic.zig` | `EngineQuicHost`: SESS `session_open` → `ChannelRs.attachRelaySession`, CHUNK → `relayIngestChunkVerifiedEngine` (issue #37). |
| `vendor/lsquic_zig/patch_uni.sh` | Build-time patch that removes `static` from `create_uni_stream_out` in lsquic and appends a public `lsquic_conn_make_uni_stream()` wrapper (lsquic 4.3 has no public API for outgoing UNI streams). |

**Operation**
Expand All @@ -90,7 +91,7 @@ Call **`quic.logInit("debug")`** (or any `lsquic_set_log_level` level) to enable

## Pending work

- **`PeerConn` wiring:** connect `eth_ec_quic_peer.zig` to `Engine`/channel tables (currently a lifecycle sketch only).
- **Outbound chunk path:** origin `ChannelRs` publish/drain over QUIC (sending CHUNK streams to peers) is not wired yet; inbound relay via `EngineQuicHost` is implemented (#37).
- **Erasure coding:** **RLNC** (strategy, preamble, chunk layout) and any further `Scheme` types beyond Reed–Solomon ([#14](https://github.com/ch4r10t33r/zig-ethp2p/issues/14)).
- **libp2p:** Noise handshake, multistream-select, Yamux, identify — handled by zeam's rust-libp2p layer; out of scope for this repo.

Expand Down
2 changes: 1 addition & 1 deletion UPSTREAM.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ Zig alignment:
- `lsquic_quic_shim.zig` detects stream type via `lsquic_stream_id() & 0x2` (bit 1 = unidirectional per RFC 9000 §2.1)
- `incoming_uni_streams` queue holds peer-initiated UNI streams; `tryAcceptIncomingUniStream` pops them
- `streamMakeUni` opens an outgoing UNI stream via `lsquic_conn_make_uni_stream`
- `eth_ec_quic_peer.zig` sketches the `PeerConn` poll-driven state machine (handshake + accept-loop)
- `eth_ec_quic_peer.zig` implements the `PeerConn` poll-driven state machine (handshake + accept-loop); `broadcast/engine_quic.zig` (`EngineQuicHost`) forwards inbound SESS/CHUNK into `Engine` / `ChannelRs` (#37)

### lsquic vendor patch

Expand Down
5 changes: 5 additions & 0 deletions src/broadcast/engine.zig
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ pub const Engine = struct {
try self.channels.put(self.allocator, key, ch);
return ch;
}

/// Look up an RS channel by id (e.g. QUIC `EngineQuicHost` inbound routing).
pub fn channelRs(self: *Engine, channel_id: []const u8) ?*ChannelRs {
return self.channels.get(channel_id);
}
};

pub const Error = error{ChannelExists};
204 changes: 204 additions & 0 deletions src/broadcast/engine_quic.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
//! Wire `transport/eth_ec_quic_peer.zig` `PeerConn` inbound SESS/CHUNK streams into
//! `broadcast/engine.zig` `Engine` / `ChannelRs` (issue #37).
//!
//! After TLS + BCAST handshake, call `wireEngine`, then `finishBcastHandshakeRead`
//! to capture the peer id. Drive `quic.poll` on both endpoints, then `PeerConn.drive`
//! (via `engineQuicDrive`) so inbound SESS opens relay sessions and CHUNK frames
//! call `relayIngestChunkVerifiedEngine`.

const std = @import("std");
const quic = @import("quic");
const peer_mod = @import("../transport/eth_ec_quic_peer.zig");
const Engine = @import("engine.zig").Engine;
const rs_strategy = @import("../layer/rs_strategy.zig");
const chunk_stream = @import("../wire/chunk_stream.zig");
const sess_stream = @import("../wire/sess_stream.zig");
const bcast_stream = @import("../wire/bcast_stream.zig");
const protocol = @import("../wire/protocol.zig");
const wire_rs = @import("../wire/rs.zig");

const PeerConn = peer_mod.PeerConn;

/// Bridges one QUIC `PeerConn` to an `Engine` for inbound RS relay traffic.
pub const EngineQuicHost = struct {
engine: *Engine,
allocator: std.mem.Allocator,
peer: PeerConn,
/// Set by `finishBcastHandshakeRead` from the peer's BCAST `peer_handshake`.
remote_peer_id: []u8 = &.{},
/// Peer endpoint to co-poll while draining UNI streams (loopback tests).
peer_ep: ?*quic.QuicEndpoint = null,

pub fn init(allocator: std.mem.Allocator, engine: *Engine, conn: *quic.QuicConnection, ep: *quic.QuicEndpoint) EngineQuicHost {
return .{
.engine = engine,
.allocator = allocator,
.peer = PeerConn.init(allocator, conn, ep),
};
}

pub fn deinit(self: *EngineQuicHost) void {
self.peer.close();
if (self.remote_peer_id.len != 0) {
self.allocator.free(self.remote_peer_id);
}
}

/// Install SESS/CHUNK handlers that forward into `Engine` channels.
pub fn wireEngine(self: *EngineQuicHost) void {
self.peer.user_data = @ptrCast(self);
self.peer.on_sess_stream = engineQuicOnSessStream;
self.peer.on_chunk_stream = engineQuicOnChunkStream;
}

pub fn setPeerEndpoint(self: *EngineQuicHost, peer_ep: ?*quic.QuicEndpoint) void {
self.peer_ep = peer_ep;
}

/// After `PeerConn` reaches `.active`, read our inbound BCAST stream and
/// store the peer's `peer_id` (for chunk/session relay attribution).
pub fn finishBcastHandshakeRead(self: *EngineQuicHost) !void {
const st = self.peer.bcast_in orelse return error.MissingBcastIn;
const buf = try drainUniStream(self.allocator, st, self.peer.ep, self.peer_ep);
defer self.allocator.free(buf);
var fbs = std.io.fixedBufferStream(buf);
var owned = try bcast_stream.readBcastPeerHandshake(self.allocator, fbs.reader());
defer owned.deinit(self.allocator);
switch (owned) {
.peer_handshake => |h| {
if (self.remote_peer_id.len != 0) self.allocator.free(self.remote_peer_id);
self.remote_peer_id = try self.allocator.dupe(u8, h.peer_id);
},
else => return error.ExpectedPeerHandshake,
}
}

pub fn drive(self: *EngineQuicHost) bool {
return self.peer.drive();
}
};

fn engineQuicOnSessStream(user_data: ?*anyopaque, pc: *PeerConn, st: *quic.QuicStream) void {
_ = pc;
const host: *EngineQuicHost = @ptrCast(@alignCast(user_data orelse return));
handleSessStream(host, st) catch {};
}

fn engineQuicOnChunkStream(user_data: ?*anyopaque, pc: *PeerConn, st: *quic.QuicStream) void {
_ = pc;
const host: *EngineQuicHost = @ptrCast(@alignCast(user_data orelse return));
handleChunkStream(host, st) catch {};
}

fn preambleOwnedToRs(allocator: std.mem.Allocator, owned: wire_rs.PreambleOwned) (std.mem.Allocator.Error || error{InvalidPreambleHash})!rs_strategy.RsPreamble {
if (owned.hash.len != 32) {
var o = owned;
o.deinit(allocator);
return error.InvalidPreambleHash;
}
var msg_hash: [32]u8 = undefined;
@memcpy(&msg_hash, owned.hash);

const n = owned.hashes.len;
const nd = owned.num_data;
const np = owned.num_parity;
const ml = owned.length;

const hashes = try allocator.alloc([]u8, n);
errdefer {
for (hashes) |row| allocator.free(row);
allocator.free(hashes);
}
for (owned.hashes, 0..) |h, i| {
hashes[i] = try allocator.dupe(u8, h);
}
var o = owned;
o.deinit(allocator);

return .{
.data_chunks = nd,
.parity_chunks = np,
.message_length = ml,
.chunk_hashes = hashes,
.message_hash = msg_hash,
};
}

fn handleSessStream(host: *EngineQuicHost, st: *quic.QuicStream) !void {
const buf = try drainUniStream(host.allocator, st, host.peer.ep, host.peer_ep);
defer host.allocator.free(buf);
var fbs = std.io.fixedBufferStream(buf);
const r = fbs.reader();
const sel = try protocol.readSelectorByte(r);
if (sel != .sess) return;
var open_msg = try sess_stream.readSessSessionOpenAfterSelector(host.allocator, r);
defer open_msg.deinit(host.allocator);
const open = switch (open_msg) {
.session_open => |o| o,
else => return,
};

const preamble_owned = try wire_rs.decodePreamble(host.allocator, open.preamble);
var rs_pre = try preambleOwnedToRs(host.allocator, preamble_owned);
errdefer rs_pre.deinit(host.allocator);

const ch = host.engine.channelRs(open.channel) orelse return error.UnknownChannel;
try ch.attachRelaySession(open.message_id, &rs_pre);
rs_pre.deinit(host.allocator);
}

fn handleChunkStream(host: *EngineQuicHost, st: *quic.QuicStream) !void {
if (host.remote_peer_id.len == 0) return error.MissingRemotePeerId;

const buf = try drainUniStream(host.allocator, st, host.peer.ep, host.peer_ep);
defer host.allocator.free(buf);
var fbs = std.io.fixedBufferStream(buf);
var chunk_in = try chunk_stream.readChunkStream(host.allocator, fbs.reader());
defer chunk_in.deinit(host.allocator);

const ch = host.engine.channelRs(chunk_in.header.channel) orelse return error.UnknownChannel;

const ident = try wire_rs.decodeChunkIdent(host.allocator, chunk_in.header.chunk_id);

_ = try ch.relayIngestChunkVerifiedEngine(
chunk_in.header.message_id,
host.remote_peer_id,
.{ .index = ident.index },
chunk_in.payload,
null,
);
}

/// Poll until a UNI stream's buffered length stabilizes, then copy out.
fn drainUniStream(
allocator: std.mem.Allocator,
st: *quic.QuicStream,
ep: *quic.QuicEndpoint,
peer: ?*quic.QuicEndpoint,
) ![]u8 {
var i: u32 = 0;
while (i < 30_000) : (i += 1) {
try quic.poll(ep, 0);
if (peer) |p| try quic.poll(p, 0);
const raw = quic.streamReadSlice(st);
if (raw.len == 0) continue;

var last = raw.len;
var stable: u32 = 0;
var j: u32 = 0;
while (j < 2_000) : (j += 1) {
try quic.poll(ep, 0);
if (peer) |p| try quic.poll(p, 0);
const r2 = quic.streamReadSlice(st);
if (r2.len == last) {
stable += 1;
if (stable >= 2) return try allocator.dupe(u8, r2);
} else {
last = r2.len;
stable = 0;
}
}
return try allocator.dupe(u8, quic.streamReadSlice(st));
}
return error.StreamDrainTimeout;
}
3 changes: 3 additions & 0 deletions src/root.zig
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ pub const layer = struct {
/// QUIC listen/dial — see `transport/eth_ec_quic.zig` and README; mapping streams to `wire.*` is issue **#27**.
pub const transport = struct {
pub const eth_ec_quic = @import("transport/eth_ec_quic.zig");
pub const eth_ec_quic_peer = @import("transport/eth_ec_quic_peer.zig");
pub const shared_udp_socket = @import("transport/shared_udp_socket.zig");
};

pub const broadcast = struct {
pub const observer = @import("broadcast/observer.zig");
pub const engine = @import("broadcast/engine.zig");
pub const engine_quic = @import("broadcast/engine_quic.zig");
pub const channel_rs = @import("broadcast/channel_rs.zig");
pub const session_rs = @import("broadcast/session_rs.zig");
pub const gossip = @import("broadcast/gossip.zig");
Expand Down Expand Up @@ -75,5 +77,6 @@ test {
_ = broadcast.observer;
_ = broadcast.gossip;
_ = broadcast.relay_async_verify;
_ = broadcast.engine_quic;
_ = discovery;
}
Loading
Loading