Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ All ethp2p application protocols use UNI streams — both peers independently op

## Pending work

- **Outbound chunk path:** origin `ChannelRs` publish/drain over QUIC (sending CHUNK streams to peers) is not wired yet; inbound relay via `EngineQuicHost` is implemented (#37).
- **Outbound chunk path:** use `broadcast.engine_quic.peerSendRsChunk` with `ChannelRs.sessionDrainOutboundOverQuic` / `SessionRs.drainOutboundOverQuic` (integration test in `eth_ec_quic_enabled.zig`). Inbound relay via `EngineQuicHost` is implemented (#37).
- **Erasure coding:** **RLNC** (strategy, preamble, chunk layout) and any further `Scheme` types beyond Reed–Solomon ([#14](https://github.com/ch4r10t33r/zig-ethp2p/issues/14)).
- **libp2p:** Noise handshake, multistream-select, Yamux, identify — handled by zeam's rust-libp2p layer; out of scope for this repo.

Expand Down
36 changes: 25 additions & 11 deletions src/broadcast/channel_rs.zig
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ const std = @import("std");
const broadcast_types = @import("../layer/broadcast_types.zig");
const dedup_mod = @import("../layer/dedup.zig");
const dedup_registry_mod = @import("../layer/dedup_registry.zig");
const emit_planner = @import("../layer/emit_planner.zig");
const rs_init = @import("../layer/rs_init.zig");
const rs_strategy = @import("../layer/rs_strategy.zig");
const Engine = @import("engine.zig").Engine;
const errors = @import("errors.zig");
const SendRsChunkFn = @import("session_rs.zig").SendRsChunkFn;
const SessionRs = @import("session_rs.zig").SessionRs;

const Allocator = std.mem.Allocator;
Expand Down Expand Up @@ -71,7 +74,7 @@ pub const ChannelRs = struct {

/// Origin session: encodes `payload` and attaches current members.
pub fn publish(self: *ChannelRs, message_id: []const u8, payload: []const u8) !void {
if (self.sessions.get(message_id) != null) return error.DuplicateMessage;
if (self.sessions.get(message_id) != null) return error.InvalidMessage;

const mid = try self.allocator.dupe(u8, message_id);
errdefer self.allocator.free(mid);
Expand Down Expand Up @@ -113,7 +116,7 @@ pub const ChannelRs = struct {

/// Relay-side session for an existing preamble (same members as `publish`).
pub fn attachRelaySession(self: *ChannelRs, message_id: []const u8, preamble: *const rs_strategy.RsPreamble) !void {
if (self.sessions.get(message_id) != null) return error.DuplicateMessage;
if (self.sessions.get(message_id) != null) return error.InvalidMessage;

const mid = try self.allocator.dupe(u8, message_id);
errdefer self.allocator.free(mid);
Expand Down Expand Up @@ -153,12 +156,23 @@ pub const ChannelRs = struct {
}

pub fn sessionDrainOutbound(self: *ChannelRs, message_id: []const u8) !usize {
const slot = self.sessions.getPtr(message_id) orelse return error.UnknownMessage;
const slot = self.sessions.getPtr(message_id) orelse return error.InvalidMessage;
return slot.*.drainOutbound();
}

/// Same as [`SessionRs.drainOutboundOverQuic`](`SessionRs.drainOutboundOverQuic`) for `message_id` on this channel.
pub fn sessionDrainOutboundOverQuic(
self: *ChannelRs,
message_id: []const u8,
ctx: *anyopaque,
send_chunk: SendRsChunkFn,
) (Allocator.Error || emit_planner.PlannerError || anyerror)!usize {
const slot = self.sessions.getPtr(message_id) orelse return error.InvalidMessage;
return slot.*.drainOutboundOverQuic(self.id, ctx, send_chunk);
}

pub fn sessionDecode(self: *ChannelRs, message_id: []const u8) ![]u8 {
const slot = self.sessions.getPtr(message_id) orelse return error.UnknownMessage;
const slot = self.sessions.getPtr(message_id) orelse return error.InvalidMessage;
return slot.*.strategy.decode();
}

Expand Down Expand Up @@ -186,8 +200,8 @@ pub const ChannelRs = struct {
chunk_id: rs_strategy.ChunkIdent,
data: []const u8,
dedup: ?*broadcast_types.DedupCancel,
) (Allocator.Error || error{UnknownMessage})!broadcast_types.ChunkIngestResult {
const strat = self.sessionStrategy(message_id) orelse return error.UnknownMessage;
) (Allocator.Error || errors.Error)!broadcast_types.ChunkIngestResult {
const strat = self.sessionStrategy(message_id) orelse return error.InvalidMessage;
if (registry) |reg| {
const first = try reg.claim(self.allocator, self.id, message_id, chunk_id.index);
if (!first) {
Expand All @@ -205,7 +219,7 @@ pub const ChannelRs = struct {
chunk_id: rs_strategy.ChunkIdent,
data: []const u8,
dedup: ?*broadcast_types.DedupCancel,
) (Allocator.Error || error{UnknownMessage})!broadcast_types.ChunkIngestResult {
) (Allocator.Error || errors.Error)!broadcast_types.ChunkIngestResult {
return self.relayIngestChunk(self.engine.dedupRegistryPtr(), message_id, peer, chunk_id, data, dedup);
}

Expand All @@ -218,8 +232,8 @@ pub const ChannelRs = struct {
chunk_id: rs_strategy.ChunkIdent,
data: []const u8,
dedup: ?*broadcast_types.DedupCancel,
) (Allocator.Error || error{UnknownMessage})!broadcast_types.ChunkIngestResult {
const strat = self.sessionStrategy(message_id) orelse return error.UnknownMessage;
) (Allocator.Error || errors.Error)!broadcast_types.ChunkIngestResult {
const strat = self.sessionStrategy(message_id) orelse return error.InvalidMessage;
const v = strat.verifyChunk(chunk_id, data);
if (v != .accepted) {
return .{ .verdict = v, .complete = false };
Expand All @@ -235,7 +249,7 @@ pub const ChannelRs = struct {
chunk_id: rs_strategy.ChunkIdent,
data: []const u8,
dedup: ?*broadcast_types.DedupCancel,
) (Allocator.Error || error{UnknownMessage})!broadcast_types.ChunkIngestResult {
) (Allocator.Error || errors.Error)!broadcast_types.ChunkIngestResult {
return self.relayIngestChunkVerified(self.engine.dedupRegistryPtr(), message_id, peer, chunk_id, data, dedup);
}
};
Expand Down Expand Up @@ -323,7 +337,7 @@ test "relayIngestChunk unknown message" {

const ch = try eng.attachChannelRs("topic", cfg);
try std.testing.expectError(
error.UnknownMessage,
error.InvalidMessage,
ch.relayIngestChunk(eng.dedupRegistryPtr(), "missing", "peer", .{ .index = 0 }, &.{}, null),
);
}
Expand Down
7 changes: 4 additions & 3 deletions src/broadcast/engine.zig
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
const std = @import("std");
const ChannelRs = @import("channel_rs.zig").ChannelRs;
const dedup_registry_mod = @import("../layer/dedup_registry.zig");
const errors = @import("errors.zig");
const observer_mod = @import("observer.zig");

const Allocator = std.mem.Allocator;

pub const Error = errors.Error;

pub const EngineConfig = struct {
observer: observer_mod.Observer = .{},
/// When set, `Engine` owns a `DedupRegistry` for `relayIngestChunk`-style helpers.
Expand Down Expand Up @@ -64,7 +67,7 @@ pub const Engine = struct {
self: *Engine,
channel_id: []const u8,
cfg: @import("../layer/rs_init.zig").RsConfig,
) !*ChannelRs {
) (Allocator.Error || Error)!*ChannelRs {
if (self.channels.get(channel_id) != null) return error.ChannelExists;
const key = try self.allocator.dupe(u8, channel_id);
errdefer self.allocator.free(key);
Expand All @@ -80,5 +83,3 @@ pub const Engine = struct {
return self.channels.get(channel_id);
}
};

pub const Error = error{ChannelExists};
36 changes: 30 additions & 6 deletions src/broadcast/engine_quic.zig
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
//! Wire `transport/eth_ec_quic_peer.zig` `PeerConn` inbound SESS/CHUNK streams into
//! `broadcast/engine.zig` `Engine` / `ChannelRs` (issue #37).
//! Wire `transport/eth_ec_quic_peer.zig` `PeerConn` to `Engine` / `ChannelRs`:
//! - **Inbound:** SESS/CHUNK into relay ingest (#37).
//! - **Outbound:** `peerSendRsChunk` sends origin RS shards on new CHUNK UNI streams; pair with
//! `SessionRs.drainOutboundOverQuic` / `ChannelRs.sessionDrainOutboundOverQuic`.
//!
//! After TLS + BCAST handshake, call `wireEngine`, then `finishBcastHandshakeRead`
//! to capture the peer id. Drive `quic.poll` on both endpoints, then `PeerConn.drive`
//! (via `engineQuicDrive`) so inbound SESS opens relay sessions and CHUNK frames
//! call `relayIngestChunkVerifiedEngine`.
//! so inbound SESS opens relay sessions and CHUNK frames call `relayIngestChunkVerifiedEngine`.

const std = @import("std");
const quic = @import("quic");
const errors = @import("errors.zig");
const peer_mod = @import("../transport/eth_ec_quic_peer.zig");
const Engine = @import("engine.zig").Engine;
const rs_strategy = @import("../layer/rs_strategy.zig");
Expand Down Expand Up @@ -142,7 +144,7 @@ fn handleSessStream(host: *EngineQuicHost, st: *quic.QuicStream) !void {
var rs_pre = try preambleOwnedToRs(host.allocator, preamble_owned);
errdefer rs_pre.deinit(host.allocator);

const ch = host.engine.channelRs(open.channel) orelse return error.UnknownChannel;
const ch = host.engine.channelRs(open.channel) orelse return error.ChannelNotFound;
try ch.attachRelaySession(open.message_id, &rs_pre);
rs_pre.deinit(host.allocator);
}
Expand All @@ -156,7 +158,7 @@ fn handleChunkStream(host: *EngineQuicHost, st: *quic.QuicStream) !void {
var chunk_in = try chunk_stream.readChunkStream(host.allocator, fbs.reader());
defer chunk_in.deinit(host.allocator);

const ch = host.engine.channelRs(chunk_in.header.channel) orelse return error.UnknownChannel;
const ch = host.engine.channelRs(chunk_in.header.channel) orelse return error.ChannelNotFound;

const ident = try wire_rs.decodeChunkIdent(host.allocator, chunk_in.header.chunk_id);

Expand Down Expand Up @@ -202,3 +204,25 @@ fn drainUniStream(
}
return error.StreamDrainTimeout;
}

/// Open a new outbound UNI stream and write one RS shard CHUNK (`wire/chunk_stream.writeRsShardChunk`).
/// `poll_peer` is the remote QUIC endpoint to co-poll (same as other `eth_ec_quic_enabled` tests).
pub fn peerSendRsChunk(
pc: *PeerConn,
poll_peer: ?*quic.QuicEndpoint,
channel_id: []const u8,
message_id: []const u8,
shard_index: i32,
payload: []const u8,
) (std.mem.Allocator.Error || errors.Error)!void {
const st = quic.streamMakeUni(pc.conn, poll_peer) catch return error.ChunkWriteFail;
var buf = std.ArrayList(u8).empty;
defer buf.deinit(pc.allocator);
{
const w = buf.writer(pc.allocator);
chunk_stream.writeRsShardChunk(w, pc.allocator, channel_id, message_id, shard_index, payload) catch return error.ChunkMarshal;
}
try quic.streamQueueWrite(st, buf.items);
const peer_ep = poll_peer orelse pc.ep;
quic.streamDrainWrites(st, peer_ep, 10_000) catch return error.ChunkWriteFail;
}
30 changes: 30 additions & 0 deletions src/broadcast/errors.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//! Stable error names aligned with ethp2p [`broadcast/errors.go`](https://github.com/ethp2p/ethp2p/blob/main/broadcast/errors.go).

pub const Error = error{
EngineClosed,
ChannelExists,
ChannelNotFound,
PeerExists,
PeerNotFound,
InvalidMessage,
ProtocolMismatch,
UnexpectedMsgType,
AlreadySubscribed,
UnbufferedSubscription,
ChunkMarshal,
ChunkPeerGone,
ChunkSlotFull,
ChunkWriteFail,
ChunkCancelled,
SessionClosing,
};

/// Mirrors `ChunkProcessError` in the reference (peer + channel + message + wrapped cause).
pub fn ChunkProcessError(comptime PeerId: type, comptime ChannelId: type, comptime MessageId: type) type {
return struct {
peer: PeerId,
channel_id: ChannelId,
message_id: MessageId,
err: anyerror,
};
}
8 changes: 4 additions & 4 deletions src/broadcast/relay_async_verify.zig
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const rs_strategy = @import("../layer/rs_strategy.zig");
const verify_queue_mod = @import("../layer/verify_queue.zig");
const verify_workers_mod = @import("../layer/verify_workers.zig");
const ChannelRs = @import("channel_rs.zig").ChannelRs;
const errors = @import("errors.zig");

const Allocator = std.mem.Allocator;

Expand All @@ -33,8 +34,7 @@ pub const RelayAsyncVerifier = struct {
dedup: ?*broadcast_types.DedupCancel,
};

pub const Error = Allocator.Error || error{
UnknownMessage,
pub const Error = Allocator.Error || errors.Error || error{
InvalidChunkIndex,
OrphanVerifyRecord,
SystemResources,
Expand Down Expand Up @@ -106,7 +106,7 @@ pub const RelayAsyncVerifier = struct {
data: []const u8,
dedup: ?*broadcast_types.DedupCancel,
) Error!void {
const strat = self.channel.sessionStrategy(message_id) orelse return error.UnknownMessage;
const strat = self.channel.sessionStrategy(message_id) orelse return error.InvalidMessage;
const idx_i = chunk_id.index;
if (idx_i < 0) return error.InvalidChunkIndex;
const idx: usize = @intCast(idx_i);
Expand Down Expand Up @@ -159,7 +159,7 @@ pub const RelayAsyncVerifier = struct {
data: []const u8,
dedup: ?*broadcast_types.DedupCancel,
) Error!void {
const strat = self.channel.sessionStrategy(message_id) orelse return error.UnknownMessage;
const strat = self.channel.sessionStrategy(message_id) orelse return error.InvalidMessage;
const idx_i = chunk_id.index;
if (idx_i < 0) return error.InvalidChunkIndex;
const idx: usize = @intCast(idx_i);
Expand Down
35 changes: 35 additions & 0 deletions src/broadcast/session_rs.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,21 @@

const std = @import("std");
const broadcast_types = @import("../layer/broadcast_types.zig");
const emit_planner = @import("../layer/emit_planner.zig");
const rs_strategy = @import("../layer/rs_strategy.zig");

const Allocator = std.mem.Allocator;
const RsStrategy = rs_strategy.RsStrategy;

/// Context + `peerSendRsChunk` (or custom transport) for [`drainOutboundOverQuic`](SessionRs.drainOutboundOverQuic).
pub const SendRsChunkFn = *const fn (
ctx: *anyopaque,
channel_id: []const u8,
message_id: []const u8,
shard_index: i32,
payload: []const u8,
) anyerror!void;

pub const SessionRs = struct {
allocator: Allocator,
/// Owned by the parent `ChannelRs` map key; not freed here.
Expand Down Expand Up @@ -42,4 +52,29 @@ pub const SessionRs = struct {
}
return total;
}

/// Drain the RS emit planner by sending each scheduled chunk via `send_chunk` (e.g. QUIC UNI per
/// [`engine_quic.peerSendRsChunk`](`@import("engine_quic.zig").peerSendRsChunk`)), then `chunkSent` with success.
pub fn drainOutboundOverQuic(
self: *SessionRs,
channel_id: []const u8,
ctx: *anyopaque,
send_chunk: SendRsChunkFn,
) (Allocator.Error || emit_planner.PlannerError || anyerror)!usize {
var total: usize = 0;
while (true) {
const out = try self.strategy.pollChunks();
defer self.allocator.free(out);
if (out.len == 0) break;
for (out) |disp| {
send_chunk(ctx, channel_id, self.message_id, disp.chunk_id.index, disp.data) catch |err| {
self.strategy.chunkSent(disp.peer, disp.chunk_id.handle(), false);
return err;
};
self.strategy.chunkSent(disp.peer, disp.chunk_id.handle(), true);
total += 1;
}
}
return total;
}
};
1 change: 1 addition & 0 deletions src/ci_root_broadcast.zig
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ test {
_ = @import("layer/rs_strategy.zig");
_ = @import("layer/verify_queue.zig");
_ = @import("layer/verify_workers.zig");
_ = @import("broadcast/errors.zig");
_ = @import("broadcast/observer.zig");
_ = @import("broadcast/engine.zig");
_ = @import("broadcast/channel_rs.zig");
Expand Down
2 changes: 2 additions & 0 deletions src/root.zig
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub const transport = struct {
};

pub const broadcast = struct {
pub const errors = @import("broadcast/errors.zig");
pub const observer = @import("broadcast/observer.zig");
pub const engine = @import("broadcast/engine.zig");
pub const engine_quic = @import("broadcast/engine_quic.zig");
Expand All @@ -73,6 +74,7 @@ test {
_ = sim.gossipsub_interop;
_ = sim.gossipsub_rpc_pb;
_ = sim.gossipsub_rpc_host;
_ = broadcast.errors;
_ = broadcast.engine;
_ = broadcast.channel_rs;
_ = broadcast.session_rs;
Expand Down
Loading
Loading