Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/ci_root_broadcast.zig
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ test {
_ = @import("layer/dedup_registry.zig");
_ = @import("layer/ec_scheme.zig");
_ = @import("layer/emit_planner.zig");
_ = @import("layer/latency_tier.zig");
_ = @import("layer/rs_encode.zig");
_ = @import("layer/rs_init.zig");
_ = @import("layer/rs_strategy.zig");
Expand Down
30 changes: 5 additions & 25 deletions src/discovery/peering/score.zig
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,12 @@
//! are derived directly from RTT and flow into chunk dispatch ordering.

const std = @import("std");
const latency_tier = @import("../../layer/latency_tier.zig");

// ---------------------------------------------------------------------------
// Latency tiers (mirror of broadcast layer tiers, 002-ec-broadcast.md)
// ---------------------------------------------------------------------------

/// Inner tier — chunks sent first. RTT < 60 ms.
pub const rtt_inner_ms: u32 = 60;
/// Mid tier — second priority. RTT 60–120 ms.
pub const rtt_mid_ms: u32 = 120;
/// Outer tier — everything above rtt_mid_ms.
pub const LatencyTier = enum { inner, mid, outer };

pub fn latencyTier(rtt_ms: u32) LatencyTier {
if (rtt_ms < rtt_inner_ms) return .inner;
if (rtt_ms < rtt_mid_ms) return .mid;
return .outer;
}
pub const rtt_inner_ms = latency_tier.rtt_inner_ms;
pub const rtt_mid_ms = latency_tier.rtt_mid_ms;
pub const LatencyTier = latency_tier.LatencyTier;
pub const latencyTier = latency_tier.latencyTier;

// ---------------------------------------------------------------------------
// Score decay
Expand Down Expand Up @@ -138,15 +127,6 @@ pub const Score = struct {
// Tests
// ---------------------------------------------------------------------------

test "latencyTier boundaries" {
try std.testing.expectEqual(LatencyTier.inner, latencyTier(0));
try std.testing.expectEqual(LatencyTier.inner, latencyTier(59));
try std.testing.expectEqual(LatencyTier.mid, latencyTier(60));
try std.testing.expectEqual(LatencyTier.mid, latencyTier(119));
try std.testing.expectEqual(LatencyTier.outer, latencyTier(120));
try std.testing.expectEqual(LatencyTier.outer, latencyTier(500));
}

test "recordRtt lowers composite for high RTT" {
var s = Score{};
s.recordRtt(300, 0);
Expand Down
4 changes: 4 additions & 0 deletions src/layer/broadcast_types.zig
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Broadcast-layer enums and aliases aligned with ethp2p `broadcast/types.go`.

const std = @import("std");

pub const ChunkHandle = u64;

pub const protocol_v1: u32 = 1;
Expand Down Expand Up @@ -33,6 +35,8 @@ pub const DedupCancel = struct {
/// Per-peer per-session stats; the session owns and mutates fields. Strategy holds a pointer only.
pub const PeerSessionStats = struct {
peer_id: []const u8 = &.{},
/// Measured RTT in ms; `maxInt(u32)` means unknown (lowest dispatch preference).
rtt_ms: u32 = std.math.maxInt(u32),
};

pub fn ChunkDispatch(comptime ChunkId: type) type {
Expand Down
25 changes: 25 additions & 0 deletions src/layer/latency_tier.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
//! RTT → latency tier for RS chunk dispatch and peering (002-ec-broadcast).

const std = @import("std");

/// Inner tier — chunks sent first. RTT < 60 ms.
pub const rtt_inner_ms: u32 = 60;
/// Mid tier — second priority. RTT 60–120 ms.
pub const rtt_mid_ms: u32 = 120;
/// Outer tier — everything above `rtt_mid_ms`.
pub const LatencyTier = enum { inner, mid, outer };

pub fn latencyTier(rtt_ms: u32) LatencyTier {
if (rtt_ms < rtt_inner_ms) return .inner;
if (rtt_ms < rtt_mid_ms) return .mid;
return .outer;
}

test "latencyTier boundaries" {
try std.testing.expectEqual(LatencyTier.inner, latencyTier(0));
try std.testing.expectEqual(LatencyTier.inner, latencyTier(59));
try std.testing.expectEqual(LatencyTier.mid, latencyTier(60));
try std.testing.expectEqual(LatencyTier.mid, latencyTier(119));
try std.testing.expectEqual(LatencyTier.outer, latencyTier(120));
try std.testing.expectEqual(LatencyTier.outer, latencyTier(500));
}
63 changes: 61 additions & 2 deletions src/layer/rs_strategy.zig
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const std = @import("std");
const bitmap_mod = @import("bitmap.zig");
const broadcast_types = @import("broadcast_types.zig");
const emit_planner = @import("emit_planner.zig");
const latency_tier = @import("latency_tier.zig");
const rs_encode = @import("rs_encode.zig");
const rs_init = @import("rs_init.zig");

Expand Down Expand Up @@ -405,16 +406,36 @@ pub const RsStrategy = struct {
return out;
}

const PollPeer = struct {
peer: []const u8,
ps: *PeerState,
};

fn lessPollPeer(_: void, a: PollPeer, b: PollPeer) bool {
const ta = latency_tier.latencyTier(a.ps.stats.rtt_ms);
const tb = latency_tier.latencyTier(b.ps.stats.rtt_ms);
if (ta != tb) return @intFromEnum(ta) < @intFromEnum(tb);
return a.ps.stats.rtt_ms < b.ps.stats.rtt_ms;
}

pub fn pollChunks(self: *RsStrategy) (Allocator.Error || emit_planner.PlannerError)![]broadcast_types.ChunkDispatch(ChunkIdent) {
const allocator = self.allocator;
var list: std.ArrayListUnmanaged(broadcast_types.ChunkDispatch(ChunkIdent)) = .{};
errdefer list.deinit(allocator);

var order: std.ArrayListUnmanaged(PollPeer) = .{};
defer order.deinit(allocator);

var it = self.peers.iterator();
while (it.next()) |kv| {
if (kv.value_ptr.completed) continue;
const peer = kv.key_ptr.*;
if (try self.allocate(peer, kv.value_ptr)) |disp| {
try order.append(allocator, .{ .peer = kv.key_ptr.*, .ps = kv.value_ptr });
}

std.sort.pdq(PollPeer, order.items, {}, lessPollPeer);

for (order.items) |entry| {
if (try self.allocate(entry.peer, entry.ps)) |disp| {
try list.append(allocator, disp);
}
}
Expand Down Expand Up @@ -526,6 +547,44 @@ test "origin decode roundtrip" {
try std.testing.expectEqualSlices(u8, &msg, out);
}

test "pollChunks prefers inner-tier peer by RTT" {
const gpa = std.testing.allocator;
const msg = [_]u8{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
const cfg = RsConfig{
.data_shards = 4,
.parity_shards = 2,
.chunk_len = 0,
.bitmap_threshold = 50,
.forward_multiplier = 4,
.disable_bitmap = false,
};

var origin = try RsStrategy.newOrigin(gpa, cfg, &msg);
defer origin.deinit();

var relay = try RsStrategy.newRelay(gpa, cfg, &origin.preamble);
defer relay.deinit();

const peer_outer = "outer";
const peer_inner = "inner";
var stats_outer: broadcast_types.PeerSessionStats = .{ .peer_id = peer_outer, .rtt_ms = 200 };
var stats_inner: broadcast_types.PeerSessionStats = .{ .peer_id = peer_inner, .rtt_ms = 20 };
try relay.attachPeer(peer_inner, &stats_inner);
try relay.attachPeer(peer_outer, &stats_outer);

// Ingest from a sender not in `peers` so downstream peer bitmaps stay empty for forwarding.
const upstream = "upstream";
for (0..4) |i| {
const r = try relay.takeChunk(upstream, .{ .index = @intCast(i) }, origin.chunks[i], null);
try std.testing.expectEqual(broadcast_types.Verdict.accepted, r.verdict);
}

const outgoing = try relay.pollChunks();
defer gpa.free(outgoing);
try std.testing.expect(outgoing.len >= 1);
try std.testing.expectEqualStrings(peer_inner, outgoing[0].peer);
}

test "relay takeChunk and decode" {
const gpa = std.testing.allocator;
const msg = [_]u8{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
Expand Down
2 changes: 2 additions & 0 deletions src/root.zig
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub const layer = struct {
pub const rs_encode = @import("layer/rs_encode.zig");
pub const rs_init = @import("layer/rs_init.zig");
pub const rs_strategy = @import("layer/rs_strategy.zig");
pub const latency_tier = @import("layer/latency_tier.zig");
pub const dedup = @import("layer/dedup.zig");
pub const dedup_registry = @import("layer/dedup_registry.zig");
pub const verify_queue = @import("layer/verify_queue.zig");
Expand Down Expand Up @@ -62,6 +63,7 @@ test {
_ = layer.verify_queue;
_ = layer.verify_workers;
_ = layer.ec_scheme;
_ = layer.latency_tier;
_ = sim.rs_mesh;
_ = sim.gossipsub_transport;
_ = sim.gossipsub_protocol;
Expand Down
Loading