Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions src/config.zig
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ pub const Config = struct {
lean_api_path: []const u8,
poll_interval_ms: u64,
request_timeout_ms: u64,
/// Max concurrent upstream HTTP polls per tick (multi-upstream mode only).
/// Avoids spawning one OS thread per node and limits parallel 16MB SSZ downloads.
poll_max_concurrency: u32,
stale_after_ms: u64,
static_dir: ?[]const u8,
upstreams_config: ?[]const u8,
Expand All @@ -28,6 +31,7 @@ pub fn load(allocator: std.mem.Allocator) !Config {
var lean_api_path = try allocator.dupe(u8, defaults.lean_api_path);
var poll_interval_ms: u64 = defaults.poll_interval_ms;
var request_timeout_ms: u64 = defaults.request_timeout_ms;
var poll_max_concurrency: u32 = defaults.poll_max_concurrency;
var stale_after_ms: u64 = defaults.stale_after_ms;
var static_dir: ?[]const u8 = null;
var upstreams_config: ?[]const u8 = null;
Expand Down Expand Up @@ -56,6 +60,10 @@ pub fn load(allocator: std.mem.Allocator) !Config {
request_timeout_ms = try parseU64(val);
allocator.free(val);
}
if (try getEnvOwned(allocator, "LEANPOINT_POLL_CONCURRENCY")) |val| {
poll_max_concurrency = try parseU32(val);
allocator.free(val);
}
if (try getEnvOwned(allocator, "LEANPOINT_STALE_MS")) |val| {
stale_after_ms = try parseU64(val);
allocator.free(val);
Expand Down Expand Up @@ -94,6 +102,9 @@ pub fn load(allocator: std.mem.Allocator) !Config {
} else if (std.mem.eql(u8, arg, "--timeout-ms")) {
const value = try needArg(&args, "--timeout-ms");
request_timeout_ms = try parseU64(value);
} else if (std.mem.eql(u8, arg, "--poll-concurrency")) {
const value = try needArg(&args, "--poll-concurrency");
poll_max_concurrency = try parseU32(value);
} else if (std.mem.eql(u8, arg, "--stale-ms")) {
const value = try needArg(&args, "--stale-ms");
stale_after_ms = try parseU64(value);
Expand All @@ -118,13 +129,16 @@ pub fn load(allocator: std.mem.Allocator) !Config {
lean_api_path = prefixed;
}

if (poll_max_concurrency == 0) poll_max_concurrency = defaults.poll_max_concurrency;

return Config{
.bind_address = bind_address,
.bind_port = bind_port,
.lean_api_base_url = lean_api_base_url,
.lean_api_path = lean_api_path,
.poll_interval_ms = poll_interval_ms,
.request_timeout_ms = request_timeout_ms,
.poll_max_concurrency = poll_max_concurrency,
.stale_after_ms = stale_after_ms,
.static_dir = static_dir,
.upstreams_config = upstreams_config,
Expand All @@ -138,6 +152,7 @@ const Defaults = struct {
lean_api_path: []const u8 = "/status",
poll_interval_ms: u64 = 10_000,
request_timeout_ms: u64 = 5_000,
poll_max_concurrency: u32 = 64,
stale_after_ms: u64 = 30_000,
};

Expand All @@ -156,6 +171,10 @@ fn parseU16(value: []const u8) !u16 {
return std.fmt.parseInt(u16, value, 10);
}

fn parseU32(value: []const u8) !u32 {
return std.fmt.parseInt(u32, value, 10);
}

fn needArg(args: *std.process.ArgIterator, flag: []const u8) ![]const u8 {
if (args.next()) |val| return val;
std.debug.print("Missing value for {s}\n", .{flag});
Expand All @@ -176,15 +195,17 @@ fn printUsage() void {
\\ --lean-path <path> LeanEthereum path (default /status)
\\ --upstreams-config <file> JSON config file with multiple upstreams
\\ --poll-ms <ms> Poll interval in milliseconds
\\ --timeout-ms <ms> Request timeout in milliseconds
\\ --timeout-ms <ms> Request timeout per upstream HTTP call
\\ --poll-concurrency <n> Max parallel upstream polls (multi-upstream; default 64)
\\ --stale-ms <ms> Stale threshold in milliseconds
\\ --static-dir <dir> Optional static frontend directory
\\ --help Show this help
\\
\\Env vars:
\\ LEANPOINT_BIND_ADDR, LEANPOINT_BIND_PORT, LEANPOINT_LEAN_URL,
\\ LEANPOINT_LEAN_PATH, LEANPOINT_POLL_MS, LEANPOINT_TIMEOUT_MS,
\\ LEANPOINT_STALE_MS, LEANPOINT_STATIC_DIR, LEANPOINT_UPSTREAMS_CONFIG
\\ LEANPOINT_POLL_CONCURRENCY, LEANPOINT_STALE_MS, LEANPOINT_STATIC_DIR,
\\ LEANPOINT_UPSTREAMS_CONFIG
\\
\\Multi-upstream mode:
\\ When --upstreams-config is specified, leanpoint polls multiple lean
Expand Down
15 changes: 10 additions & 5 deletions src/poller.zig
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,17 @@ pub const Poller = struct {
}
}

/// Poll multiple upstreams with consensus
/// Poll multiple upstreams with consensus (bounded worker pool in upstreams.zig).
/// `self.client` is only passed for API compatibility; workers use their own clients.
fn pollMulti(self: *Poller, manager: *upstreams_mod.UpstreamManager, now_ms: i64) !void {
// Poll all upstreams concurrently and get consensus.
// self.client is passed for API compatibility but each upstream spawns its own
// client internally to avoid shared-state hangs.
var state_ssz: ?[]u8 = null;
const consensus_slots = manager.pollUpstreams(&self.client, now_ms, self.config.request_timeout_ms, &state_ssz);
const consensus_slots = manager.pollUpstreams(
&self.client,
now_ms,
self.config.request_timeout_ms,
self.config.poll_max_concurrency,
&state_ssz,
);

if (consensus_slots) |slots| {
const latency_ms: u64 = 0; // Latency not tracked in multi-upstream mode
Expand Down Expand Up @@ -140,6 +144,7 @@ test "poller initialization" {
.lean_api_path = try std.testing.allocator.dupe(u8, "/status"),
.poll_interval_ms = 10_000,
.request_timeout_ms = 5_000,
.poll_max_concurrency = 16,
.stale_after_ms = 30_000,
.static_dir = null,
.upstreams_config = null,
Expand Down
Loading
Loading