Skip to content

Commit ced3e2b

Browse files
nficano and claude committed
feat(runtime): terminal-job retention + resume-token sweeper + configurable buffer
Three bookkeeping fixes that share an axis (memory hygiene + spec §6.3/§7.6 buffer sizing) and motivate the same new options:

- `ArcpServerOptions.EventLogCapacity` (default 4096) sizes the session-scoped `EventLog` AND the per-job subscription-history buffer. Previously the per-job buffer was hard-coded at 1024, so subscribers with `history: true` saw a shorter window than resumers — inconsistent with spec §7.6 "same buffer window".
- `ArcpServerOptions.TerminalJobRetentionSec` (default 600) bounds how long terminal jobs hang around in `_jobs` / `_idempotency`. `JobManager.ScheduleTerminalCleanup` evicts them after the window. Previously these dictionaries grew without bound for the lifetime of the runtime.
- A background `PeriodicTimer`-driven sweeper in `ArcpServer` purges expired resume tokens, their reverse-index entries, and dormant session shells. Cancelled on `DisposeAsync`. Previously these were only cleaned opportunistically on lookup.

Plumb the two new option values (`EventLogCapacity`, `TerminalJobRetentionSec`) through the `JobManager` constructor; bump the public-API tracker accordingly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 2137e35 commit ced3e2b

6 files changed

Lines changed: 103 additions & 10 deletions

File tree

src/Arcp.Runtime/ArcpServer.cs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ public sealed class ArcpServer : IAsyncDisposable
2626
private readonly ConcurrentDictionary<SessionId, string> _resumeTokenBySession = new();
2727
private readonly ConcurrentDictionary<SessionId, SessionState> _resumableSessions = new();
2828
private readonly ILoggerFactory _loggerFactory;
29+
private readonly CancellationTokenSource _sweeperCts = new();
30+
private readonly Task? _sweeperTask;
2931

3032
private readonly record struct ResumeRegistration(SessionId SessionId, DateTimeOffset ExpiresAt);
3133

@@ -71,11 +73,41 @@ public ArcpServer(ArcpServerOptions options, ILoggerFactory? loggerFactory = nul
7173
_loggerFactory,
7274
CredentialManager,
7375
options.IdempotencyWindowSec,
76+
options.EventLogCapacity,
77+
options.TerminalJobRetentionSec,
7478
options.FatalBudgetExhaustion);
7579
if (CredentialManager is not null)
7680
{
7781
_ = Task.Run(() => RevokeOutstandingCredentialsAsync(CancellationToken.None));
7882
}
83+
_sweeperTask = Task.Run(() => RunResumeTokenSweeperAsync(_sweeperCts.Token));
84+
}
85+
86+
/// <summary>Background loop that periodically purges expired resume tokens, their reverse
/// (session → token) index entries, and the dormant session shells kept for resumption
/// (spec §6.3). Without this, long-running runtimes accumulate one entry per session
/// forever.</summary>
/// <param name="cancellationToken">Cancelled to stop the loop. The resulting
/// <see cref="OperationCanceledException"/> is swallowed so the task completes normally.</param>
/// <returns>A task that completes once the sweeper has stopped.</returns>
private async Task RunResumeTokenSweeperAsync(CancellationToken cancellationToken)
{
    // Sweep four times per resume window, but never more often than every 30 seconds and
    // never less often than every 5 minutes.
    var interval = TimeSpan.FromSeconds(Math.Clamp(Options.ResumeWindowSec / 4, 30, 300));
    try
    {
        using var timer = new PeriodicTimer(interval, Options.TimeProvider);
        while (await timer.WaitForNextTickAsync(cancellationToken).ConfigureAwait(false))
        {
            var now = Options.TimeProvider.GetUtcNow();

            // Enumerating a ConcurrentDictionary while removing from it is supported.
            foreach (var entry in _resumeTokens)
            {
                if (entry.Value.ExpiresAt > now)
                {
                    continue;
                }

                // Compare-and-remove: only evict the exact registration that was judged
                // expired, so a registration refreshed after the check above is not lost.
                if (!_resumeTokens.TryRemove(entry))
                {
                    continue;
                }

                var sessionId = entry.Value.SessionId;

                // Drop the reverse index only if it still points at this expired token.
                // NOTE(review): presumably a session id can be re-registered under a newer
                // token (e.g. after a resume); in that case the newer token's index entry
                // and session shell must survive this sweep — confirm against the
                // registration path.
                var indexPointedHere = _resumeTokenBySession.TryRemove(
                    System.Collections.Generic.KeyValuePair.Create(sessionId, entry.Key));
                if (indexPointedHere || !_resumeTokenBySession.ContainsKey(sessionId))
                {
                    _resumableSessions.TryRemove(sessionId, out _);
                }
            }
        }
    }
    catch (OperationCanceledException) { /* shutdown */ }
}
80112

81113
/// <summary>Register agent.</summary>
@@ -181,6 +213,14 @@ internal bool TryResume(string resumeToken, out SessionState? session)
181213
/// <summary>Dispose (asynchronous).</summary>
182214
public async ValueTask DisposeAsync()
183215
{
216+
try { _sweeperCts.Cancel(); } catch (ObjectDisposedException) { /* already disposed */ }
217+
if (_sweeperTask is not null)
218+
{
219+
try { await _sweeperTask.ConfigureAwait(false); }
220+
catch (OperationCanceledException) { /* expected */ }
221+
}
222+
_sweeperCts.Dispose();
223+
184224
foreach (var session in _sessions.Values)
185225
{
186226
try { await session.CloseAsync().ConfigureAwait(false); } catch { /* shutdown */ }

src/Arcp.Runtime/ArcpServerOptions.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,16 @@ public sealed class ArcpServerOptions
3131
/// (spec §7.2). Submissions with the same key after this window create a fresh job.</summary>
3232
public int IdempotencyWindowSec { get; init; } = 3600;
3333

34+
/// <summary>Capacity of the session-scoped event log (spec §6.3 resume window). Per-job
35+
/// subscription-history buffers (spec §7.6) inherit this size so a subscriber with
36+
/// <c>history: true</c> sees the same window as a resume client.</summary>
37+
public int EventLogCapacity { get; init; } = 4096;
38+
39+
/// <summary>How long, in seconds, terminal jobs are retained in <c>session.list_jobs</c> and
40+
/// idempotency records before being garbage-collected. Set to <c>0</c> or a negative value
41+
/// to retain indefinitely (legacy behavior).</summary>
42+
// NOTE(review): the default (600) is shorter than the IdempotencyWindowSec default (3600), so
// a finished job's idempotency record is evicted before its idempotency window elapses —
// confirm this is intended.
public int TerminalJobRetentionSec { get; init; } = 600;
43+
3444
/// <summary>Whether a <c>cost.budget</c> exhaustion terminates the job with
3545
/// <c>BUDGET_EXHAUSTED</c> (legacy v1.0 behavior) or surfaces a non-fatal
3646
/// <c>tool_result.error</c> so the agent may emit a partial result and return normally

src/Arcp.Runtime/Job.cs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public sealed class Job
2626
private readonly List<IssuedCredential> _credentials = [];
2727
private readonly object _eventBufferGate = new();
2828
private readonly List<Envelope> _eventBuffer = [];
29-
private const int EventBufferCapacity = 1024;
29+
private readonly int _eventBufferCapacity;
3030

3131
/// <summary>Gets the job id.</summary>
3232
public JobId JobId { get; }
@@ -117,7 +117,8 @@ internal Job(JobId jobId, SessionId sessionId, AgentRef agent, Lease lease, Leas
117117
JsonElement? input, string? idempotencyKey, TraceId? traceId, string? parentJobId,
118118
string? submitterPrincipal, int? maxRuntimeSec, DateTimeOffset createdAt,
119119
Func<Envelope, CancellationToken, ValueTask> emit, TimeProvider time,
120-
CancellationToken parentCancellation)
120+
CancellationToken parentCancellation,
121+
int eventBufferCapacity = 4096)
121122
{
122123
JobId = jobId;
123124
SessionId = sessionId;
@@ -133,6 +134,7 @@ internal Job(JobId jobId, SessionId sessionId, AgentRef agent, Lease lease, Leas
133134
CreatedAt = createdAt;
134135
_emit = emit;
135136
_time = time;
137+
_eventBufferCapacity = eventBufferCapacity > 0 ? eventBufferCapacity : 4096;
136138
CancellationSource = CancellationTokenSource.CreateLinkedTokenSource(parentCancellation);
137139
BudgetLedger.Initialize(lease);
138140
}
@@ -193,14 +195,14 @@ public async ValueTask EmitEventAsync(string kind, object body, CancellationToke
193195
/// <summary>Append an envelope to the bounded per-job history, discarding the oldest
/// entries once the configured capacity is exceeded.</summary>
/// <param name="env">The envelope to record.</param>
private void BufferEvent(Envelope env)
{
    // Spec §7.6: bounded per-job history so a later subscriber with `history: true`
    // can receive prior events in order before live events. Sized from
    // `ArcpServerOptions.EventLogCapacity` so subscribers see the same window resumers do.
    lock (_eventBufferGate)
    {
        _eventBuffer.Add(env);

        // Number of entries beyond the cap; trimmed from the front (oldest first).
        var overflow = _eventBuffer.Count - _eventBufferCapacity;
        if (overflow > 0)
        {
            _eventBuffer.RemoveRange(0, overflow);
        }
    }
}

src/Arcp.Runtime/JobManager.cs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ public sealed partial class JobManager
3535
private readonly ILoggerFactory _loggers;
3636
private readonly CredentialManager? _credentials;
3737
private readonly int _idempotencyWindowSec;
38+
private readonly int _eventBufferCapacity;
39+
private readonly int _terminalRetentionSec;
3840
private readonly bool _fatalBudgetExhaustion;
3941

4042
/// <summary>Stored record for an idempotency key: original submission fingerprint plus issue time.</summary>
@@ -48,6 +50,8 @@ public JobManager(
4850
ILoggerFactory loggers,
4951
CredentialManager? credentials = null,
5052
int idempotencyWindowSec = 3600,
53+
int eventBufferCapacity = 4096,
54+
int terminalRetentionSec = 600,
5155
bool fatalBudgetExhaustion = false)
5256
{
5357
_agents = agents;
@@ -56,6 +60,8 @@ public JobManager(
5660
_loggers = loggers;
5761
_credentials = credentials;
5862
_idempotencyWindowSec = idempotencyWindowSec > 0 ? idempotencyWindowSec : 3600;
63+
_eventBufferCapacity = eventBufferCapacity > 0 ? eventBufferCapacity : 4096;
64+
_terminalRetentionSec = terminalRetentionSec;
5965
_fatalBudgetExhaustion = fatalBudgetExhaustion;
6066
}
6167

@@ -134,7 +140,8 @@ public bool TryGet(JobId id, out Job? job)
134140
jobId, sessionId, resolved, lease, submit.LeaseConstraints,
135141
submit.Input, submit.IdempotencyKey, traceId, submit.ParentJobId, submitterPrincipal,
136142
submit.MaxRuntimeSec,
137-
_time.GetUtcNow(), emit, _time, parentCancellation);
143+
_time.GetUtcNow(), emit, _time, parentCancellation,
144+
eventBufferCapacity: _eventBufferCapacity);
138145
if (_credentials is not null)
139146
{
140147
await _credentials.IssueForJobAsync(job, cancellationToken).ConfigureAwait(false);
@@ -281,9 +288,38 @@ public async Task RunAsync(Job job, IAgent agent, Func<Envelope, CancellationTok
281288
try { watchdogCts.Cancel(); } catch (ObjectDisposedException) { /* race on dispose */ }
282289
await AwaitWatchdogAsync(watchdog).ConfigureAwait(false);
283290
await AwaitWatchdogAsync(runtimeWatchdog).ConfigureAwait(false);
291+
292+
// Spec §6.6 / hygiene: terminal jobs stay listable for the retention window so
293+
// session.list_jobs surfaces them briefly, then are GC'd to bound memory.
294+
ScheduleTerminalCleanup(job);
284295
}
285296
}
286297

298+
/// <summary>Schedule removal of a terminal job's bookkeeping after
/// <see cref="ArcpServerOptions.TerminalJobRetentionSec"/>. Negative / zero retention disables
/// cleanup (legacy "retain forever" behavior).</summary>
/// <param name="job">The job whose <c>_jobs</c> entry and, if it still belongs to this job,
/// idempotency record are evicted once the retention delay has elapsed.</param>
private void ScheduleTerminalCleanup(Job job)
{
    if (_terminalRetentionSec <= 0)
    {
        return;
    }

    // Fire-and-forget: the returned task is intentionally discarded.
    _ = Task.Run(async () =>
    {
        // No cancellation token is supplied, so this delay always runs to completion.
        await Task.Delay(TimeSpan.FromSeconds(_terminalRetentionSec), _time).ConfigureAwait(false);

        _jobs.TryRemove(job.JobId, out _);
        if (job.IdempotencyKey is not { } key)
        {
            return;
        }

        // NOTE(review): this evicts the idempotency record after the retention delay even
        // if the idempotency window has not yet elapsed — confirm that is intended.
        var idemKey = $"{job.SubmitterPrincipal ?? "*"}|{key}";
        if (_idempotency.TryGetValue(idemKey, out var rec) && rec.JobId.Equals(job.JobId))
        {
            // Compare-and-remove: if the key was re-registered for a different job between
            // the lookup above and this call, the newer record is left untouched.
            _idempotency.TryRemove(System.Collections.Generic.KeyValuePair.Create(idemKey, rec));
        }
    });
}
322+
287323
private Task? StartLeaseWatchdog(Job job, Func<Envelope, CancellationToken, ValueTask> emit, CancellationToken cancellationToken)
288324
{
289325
// Spec §9.5.

src/Arcp.Runtime/PublicAPI.Unshipped.txt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,14 +197,18 @@ Arcp.Runtime.Credentials.InMemoryCredentialStore.RemoveAsync(Arcp.Core.Ids.JobId
197197
Arcp.Runtime.Job.Credentials.get -> System.Collections.Generic.IReadOnlyList<Arcp.Core.Messages.ProvisionedCredential!>!
198198
Arcp.Runtime.JobContext.Credentials.get -> System.Collections.Generic.IReadOnlyList<Arcp.Core.Messages.ProvisionedCredential!>!
199199
Arcp.Runtime.JobContext.RotateCredentialAsync(string! credentialId, Arcp.Core.Messages.ProvisionedCredential! next, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
200-
Arcp.Runtime.JobManager.JobManager(Arcp.Runtime.Agents.AgentRegistry! agents, Arcp.Runtime.Leases.LeaseManager! leases, System.TimeProvider! time, Microsoft.Extensions.Logging.ILoggerFactory! loggers, Arcp.Runtime.Credentials.CredentialManager? credentials = null, int idempotencyWindowSec = 3600, bool fatalBudgetExhaustion = false) -> void
200+
Arcp.Runtime.JobManager.JobManager(Arcp.Runtime.Agents.AgentRegistry! agents, Arcp.Runtime.Leases.LeaseManager! leases, System.TimeProvider! time, Microsoft.Extensions.Logging.ILoggerFactory! loggers, Arcp.Runtime.Credentials.CredentialManager? credentials = null, int idempotencyWindowSec = 3600, int eventBufferCapacity = 4096, int terminalRetentionSec = 600, bool fatalBudgetExhaustion = false) -> void
201201
Arcp.Runtime.Job.LeaseExpired.get -> bool
202202
Arcp.Runtime.Job.MaxRuntimeSec.get -> int?
203203
Arcp.Runtime.Job.RuntimeLimitExceeded.get -> bool
204+
Arcp.Runtime.ArcpServerOptions.EventLogCapacity.get -> int
205+
Arcp.Runtime.ArcpServerOptions.EventLogCapacity.init -> void
204206
Arcp.Runtime.ArcpServerOptions.FatalBudgetExhaustion.get -> bool
205207
Arcp.Runtime.ArcpServerOptions.FatalBudgetExhaustion.init -> void
206208
Arcp.Runtime.ArcpServerOptions.IdempotencyWindowSec.get -> int
207209
Arcp.Runtime.ArcpServerOptions.IdempotencyWindowSec.init -> void
210+
Arcp.Runtime.ArcpServerOptions.TerminalJobRetentionSec.get -> int
211+
Arcp.Runtime.ArcpServerOptions.TerminalJobRetentionSec.init -> void
208212
Arcp.Runtime.JobManager.SubmitAsync(Arcp.Core.Messages.JobSubmitPayload! submit, Arcp.Core.Ids.SessionId sessionId, string? submitterPrincipal, System.Func<Arcp.Core.Wire.Envelope!, System.Threading.CancellationToken, System.Threading.Tasks.ValueTask>! emit, Arcp.Core.Ids.TraceId? inboundTraceId, System.Threading.CancellationToken parentCancellation, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<(Arcp.Runtime.Job! Job, Arcp.Core.Messages.JobAcceptedPayload! Accepted)>!
209213
Arcp.Runtime.Leases.LeaseManager.AuthorizeModelUse(Arcp.Core.Leases.Lease! lease, Arcp.Core.Leases.LeaseConstraints? constraints, string! modelId) -> void
210214
override Arcp.Runtime.Credentials.CredentialIssueContext.Equals(object? obj) -> bool

src/Arcp.Runtime/SessionState.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ public sealed partial class SessionState : IAsyncDisposable
4343
/// <summary>Gets the effective features.</summary>
4444
public IReadOnlyList<string> EffectiveFeatures { get; private set; } = Array.Empty<string>();
4545

46-
/// <summary>Gets the event log.</summary>
47-
public EventLog EventLog { get; private set; } = new();
46+
/// <summary>Gets the event log. Sized from <see cref="ArcpServerOptions.EventLogCapacity"/>.</summary>
47+
public EventLog EventLog { get; private set; }
4848

4949
/// <summary>Adopt the durable resumable state from a prior session of the same id. Called
5050
/// when a client supplies a still-valid resume token (spec §6.3).</summary>
@@ -69,6 +69,7 @@ internal SessionState(ITransport transport, ArcpServer server, ArcpServerOptions
6969
SingleWriter = false,
7070
FullMode = BoundedChannelFullMode.Wait,
7171
});
72+
EventLog = new EventLog(options.EventLogCapacity);
7273
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellation);
7374
_lastInboundAt = options.TimeProvider.GetUtcNow();
7475
}

0 commit comments

Comments
 (0)