Skip to content

Commit 4dbd446

Browse files
Nick Ficano and claude committed
fix: address review issues #22 through #30 (non-doc)
Bug fixes:
- #22 (critical): server now requires AllowAnonymousAuth=true before it will accept auth.scheme="none" handshakes, so a configured bearer verifier is no longer silently bypassed
- #23 (high): WebSocket client transport serialises sends through a SemaphoreSlim so the BCL WebSocket's one-send-at-a-time contract is respected across auto-ack, pong, submit, cancel, and close paths
- #24 (high): JobSubmitFlow claims the idempotency key before registering the job record; a duplicate claim now short-circuits with a single error response and no second job is launched
- #25 (medium): credential-provisioner failures unwind the partially accepted job — record removed, idempotency key released, watchdog disposed, cancellation source disposed, and any tracked credentials revoked before responding with the error
- #26 (medium): Lease.isSubset accepts conservative narrower-glob children (literal-prefix narrowing for `prefix/**`, literal child under single-star parent) and the misleading unit test now uses a genuinely narrower pattern
- #29 (medium): client dispatch tears down the JobHandle when a ResultChunk is out of order or undecodable; ChunkAssembler also catches FormatException / DecoderFallbackException and surfaces them as InvalidRequest instead of throwing through the receive loop

Performance:
- #27 (medium): EventLog per-session buffer uses Queue<T> so Append cap-eviction and EvictExpired are O(1) per removed entry instead of O(n) RemoveAt(0) shifts

Testing (#30):
- Coverage rises from 73.4 % / 56.5 % to ~89 % line / ~75 % branch (above the 80 % line target). New focused tests cover stdio framing, WebSocket send serialisation and close behavior, EventLog replay/eviction edge cases, JobContext authority and event emission, ChunkAssembler index, Pending registry, JobHandle, full Codec round-trip for every message type and event body, bearer-verifier variants, error taxonomy, trace ids, JobManager lifecycle, and table-driven property tests for id round-trips, glob coverage, and lease subset laws
- CONTRIBUTING.md documents the dotnet test + reportgenerator command pair used to regenerate the coverage report

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 18b99ec commit 4dbd446

33 files changed

Lines changed: 2357 additions & 109 deletions

CONTRIBUTING.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,25 @@ Two layers must pass before a PR merges:
9696
CI runs both on every PR. A PR that changes which feature flags the SDK
9797
negotiates must also update the README feature matrix in the same change.
9898

99+
### Measuring coverage
100+
101+
The full coverage report is regenerated with:
102+
103+
```sh
104+
dotnet test ARCP.slnx --collect:"XPlat Code Coverage" \
105+
--results-directory TestResults/review-coverage
106+
reportgenerator \
107+
-reports:"TestResults/review-coverage/*/coverage.cobertura.xml" \
108+
-targetdir:"TestResults/coverage-report" \
109+
-reporttypes:"TextSummary"
110+
```
111+
112+
Install the report tool once with
113+
`dotnet tool install -g dotnet-reportgenerator-globaltool`. The summary lands
114+
at `TestResults/coverage-report/Summary.txt`. The target is ≥ 80 % line
115+
coverage; transport and async-state-machine paths drive most of the
116+
remaining branch gaps, and additions there are welcome.
117+
99118
## Coding standards
100119

101120
Formatting is enforced by [Fantomas](https://fsprojects.github.io/fantomas/)

samples/Stdio/Program.fs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ let main _argv =
1515
ArcpServer(
1616
{ ArcpServerOptions.defaults with
1717
Features = Features.All
18+
AllowAnonymousAuth = true
1819
}
1920
)
2021

src/Arcp.Cli/Program.fs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ let private serveStdio (token: string option) : Task<int> =
6262
let options =
6363
{ ArcpServerOptions.defaults with
6464
BearerVerifier = buildVerifier token
65+
AllowAnonymousAuth = Option.isNone token
6566
}
6667

6768
let server = ArcpServer(options)

src/Arcp.Client/ArcpClient.fs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,16 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) =
4949
match payload.Body with
5050
| JobEventBody.ResultChunk(rid, chunkSeq, data, enc, more) ->
5151
let assembler = w.ChunkIndex.GetOrCreate rid
52-
assembler.Append(chunkSeq, data, enc, more) |> ignore
53-
w.Channel.Writer.TryWrite payload.Body |> ignore
52+
53+
match assembler.Append(chunkSeq, data, enc, more) with
54+
| Ok _ -> w.Channel.Writer.TryWrite payload.Body |> ignore
55+
| Error err ->
56+
// Out-of-order or undecodable chunk: tear down
57+
// the handle so callers don't sit on a job that
58+
// will never produce a usable result.
59+
handles.TryRemove jid |> ignore
60+
w.Channel.Writer.TryComplete() |> ignore
61+
w.ResultSetter.TrySetResult(Error err) |> ignore
5462
| other -> w.Channel.Writer.TryWrite other |> ignore
5563
| _ -> ()
5664

src/Arcp.Client/Internal/ChunkAssembler.fs

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,30 @@ type internal ChunkAssembler() =
2626
ARCPError.InvalidRequest(sprintf "Out-of-order chunk: expected %d, got %d" expectedSeq chunkSeq, None)
2727
)
2828
else
29-
let bytes =
30-
match encoding with
31-
| ChunkEncoding.Utf8 -> Encoding.UTF8.GetBytes data
32-
| ChunkEncoding.Base64 -> Convert.FromBase64String data
29+
let bytesResult =
30+
try
31+
let bytes =
32+
match encoding with
33+
| ChunkEncoding.Utf8 -> Encoding.UTF8.GetBytes data
34+
| ChunkEncoding.Base64 -> Convert.FromBase64String data
3335

34-
buffer.Add bytes
35-
expectedSeq <- expectedSeq + 1L
36+
Ok bytes
37+
with
38+
| :? FormatException as fx ->
39+
Error(ARCPError.InvalidRequest(sprintf "Invalid base64 chunk: %s" fx.Message, None))
40+
| :? DecoderFallbackException as dx ->
41+
Error(ARCPError.InvalidRequest(sprintf "Invalid utf-8 chunk: %s" dx.Message, None))
3642

37-
if not more then
38-
closed <- true
43+
match bytesResult with
44+
| Error e -> Error e
45+
| Ok bytes ->
46+
buffer.Add bytes
47+
expectedSeq <- expectedSeq + 1L
3948

40-
Ok closed
49+
if not more then
50+
closed <- true
51+
52+
Ok closed
4153

4254
/// Materialise the assembled bytes. Throws if the stream has
4355
/// not yet seen its terminating chunk.

src/Arcp.Client/Transport/WebSocket.fs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,24 @@ open ARCP.Client
1818
/// One text frame per envelope. The receive loop reassembles
1919
/// continuation frames into a single message.
2020
type WebSocketClientTransport(socket: WebSocket, ownsSocket: bool) =
21-
let sendLock = obj ()
21+
// The BCL WebSocket allows only one outstanding send at a time,
22+
// so we serialise concurrent callers (auto-ack, pong, submit,
23+
// cancel, close) through an async-aware semaphore — a plain
24+
// `lock` only covers the synchronous call to `SendAsync` and
25+
// would release before the returned Task completes.
26+
let sendLock = new SemaphoreSlim(1, 1)
2227
let mutable closed = false
2328

2429
let sendOne (env: Envelope) (ct: CancellationToken) : Task =
2530
task {
2631
let json = Codec.writeEnvelope env
2732
let bytes = Encoding.UTF8.GetBytes json
28-
// Concurrent sends are not allowed on a single ClientWebSocket;
29-
// serialise through a lock + a write task.
30-
do!
31-
lock sendLock (fun () ->
32-
socket.SendAsync(ArraySegment<byte>(bytes), WebSocketMessageType.Text, true, ct))
33+
do! sendLock.WaitAsync ct
34+
35+
try
36+
do! socket.SendAsync(ArraySegment<byte>(bytes), WebSocketMessageType.Text, true, ct)
37+
finally
38+
sendLock.Release() |> ignore
3339
}
3440
:> Task
3541

src/Arcp.Core/Lease.fs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,14 +116,58 @@ module Lease =
116116
let private violation (msg: string) : ARCPError =
117117
ARCPError.LeaseSubsetViolation(msg, None)
118118

119+
/// Leading portion of `pattern` that precedes its first glob
/// metacharacter (`*` or `?`). When the pattern contains neither,
/// the whole pattern is returned.
let private literalPrefix (pattern: string) : string =
    match pattern.IndexOfAny([| '*'; '?' |]) with
    | -1 -> pattern
    | firstMeta -> pattern.Substring(0, firstMeta)
131+
132+
/// True when `pattern` contains no glob metacharacter (`*` or `?`),
/// i.e. it can only ever match itself.
let private isLiteralPattern (pattern: string) : bool =
    pattern.IndexOfAny([| '*'; '?' |]) < 0
134+
135+
/// Conservative glob coverage: returns true only when every
/// string matched by `child` is also matched by `parent`.
/// Returns false when coverage cannot be proven (callers should
/// then reject the child as not provably a subset).
///
/// Prefix and suffix tests are ordinal. The single-argument
/// `String.StartsWith` / `String.EndsWith` overloads compare using
/// the current culture, which can treat ignorable code points
/// (soft hyphen, zero-width joiner, ...) as absent and so accept a
/// child whose prefix is only linguistically equal to the
/// parent's — not acceptable for an authority check.
let private globCovers (parent: string) (child: string) : bool =
    if parent = child then
        true
    elif parent = "**" then
        true
    elif parent.EndsWith("/**", System.StringComparison.Ordinal) then
        // Parent of the form `prefix/**` matches any string starting
        // with `prefix/`. A child is provably a subset when its
        // literal prefix already starts with `prefix/` — any glob
        // suffix after that can only produce strings matched by the
        // parent.
        let prefix = parent.Substring(0, parent.Length - 3)
        let childLiteral = literalPrefix child
        childLiteral.StartsWith(prefix + "/", System.StringComparison.Ordinal)
    elif isLiteralPattern parent then
        // Literal parent only covers identical patterns.
        false
    elif isLiteralPattern child then
        // Parent has globs, child is concrete — provable by direct
        // regex match (e.g. parent `render.*`, child `render.foo`).
        Glob.isMatch parent child
    else
        // Conservatively reject mixed glob shapes the checker can't
        // prove are subsets.
        false
162+
119163
let private checkNamespace (parent: LeaseGrant) ((ns: string), (childGlobs: string list)) : ARCPError option =
120164
match Map.tryFind ns parent.Capabilities with
121165
| None -> Some(violation (sprintf "Child lease has namespace %s not in parent" ns))
122166
| Some _ when ns = Capabilities.CostBudget -> None
123167
| Some parentGlobs ->
124168
childGlobs
125169
|> List.tryPick (fun cg ->
126-
if parentGlobs |> List.exists (fun pg -> pg = cg || pg = "**") then
170+
if parentGlobs |> List.exists (fun pg -> globCovers pg cg) then
127171
None
128172
else
129173
Some(violation (sprintf "Child glob %s in %s not covered by parent" cg ns)))

src/Arcp.Runtime/ArcpServer.fs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ type ArcpServerOptions =
1919
HeartbeatIntervalSec: int
2020
ResumeWindowSec: int
2121
BearerVerifier: IBearerVerifier
22+
/// When true, clients may handshake with `auth.scheme = "none"`
23+
/// and receive an `AnonymousPrincipal`. Defaults to false so
24+
/// that configuring a bearer verifier is not silently bypassed.
25+
AllowAnonymousAuth: bool
2226
TimeProvider: TimeProvider
2327
Provisioner: ICredentialProvisioner option
2428
CredentialStore: ICredentialStore option
@@ -27,7 +31,9 @@ type ArcpServerOptions =
2731
[<RequireQualifiedAccess>]
2832
module ArcpServerOptions =
2933
/// Sensible defaults: dev-mode bearer auth, every feature flag,
30-
/// 30s heartbeat, 600s resume window.
34+
/// 30s heartbeat, 600s resume window. Anonymous auth is off by
35+
/// default — opt in via `AllowAnonymousAuth = true` for local
36+
/// stdio/dev setups.
3137
let defaults: ArcpServerOptions =
3238
{
3339
Runtime =
@@ -39,6 +45,7 @@ module ArcpServerOptions =
3945
HeartbeatIntervalSec = 30
4046
ResumeWindowSec = 600
4147
BearerVerifier = DevModeBearerVerifier() :> IBearerVerifier
48+
AllowAnonymousAuth = false
4249
TimeProvider = TimeProvider.System
4350
Provisioner = None
4451
CredentialStore = None
@@ -187,6 +194,7 @@ type ArcpServer(options: ArcpServerOptions) =
187194
transport
188195
options.Runtime
189196
options.BearerVerifier
197+
options.AllowAnonymousAuth
190198
options.TimeProvider
191199
eventLog
192200
supportedFeatures

src/Arcp.Runtime/Internal/JobSubmitFlow.fs

Lines changed: 95 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -166,73 +166,109 @@ module internal JobSubmitFlow =
166166
| Error err -> do! EnvelopeOut.respondWithError ctx requestId err ct
167167
| Ok constraints ->
168168
let jobId = JobId.newId ()
169-
let budgets = BudgetCounters()
170-
budgets.SetInitial(Lease.initialBudgets lease)
171-
let cts = new CancellationTokenSource()
172-
let watchdog = buildWatchdog timeProvider jobs credentialRegistry jobId constraints
173-
174-
match submit.IdempotencyKey with
175-
| Some key ->
176-
match jobs.TryClaimIdempotencyKey(key, jobId) with
177-
| Error err -> do! EnvelopeOut.respondWithError ctx requestId err ct
178-
| Ok() -> ()
179-
| None -> ()
180-
181-
let record: JobRecord =
182-
{
183-
JobId = jobId
184-
SessionId = ctx.SessionId
185-
Principal = ctx.Principal
186-
Agent = resolvedAgent
187-
Input = submit.Input
188-
Lease = lease
189-
Constraints = constraints
190-
Credentials = []
191-
Budgets = budgets
192-
ParentJobId = None
193-
TraceId = traceIdOpt
194-
CreatedAt = timeProvider.GetUtcNow()
195-
Cancellation = cts
196-
Watchdog = watchdog
197-
Status = JobStatus.Pending
198-
LastEventSeq = 0L
199-
}
200169

201-
jobs.Register record
202-
let! issued = issueCredentialsAsync provisioner credentialRegistry record ct
170+
// Claim the idempotency key first so a duplicate
171+
// submission short-circuits before any side effects
172+
// (record registration, watchdog start, provisioner
173+
// call). Without this, two concurrent submits with
174+
// the same key both fell through and created jobs.
175+
let claimResult =
176+
match submit.IdempotencyKey with
177+
| Some key -> jobs.TryClaimIdempotencyKey(key, jobId)
178+
| None -> Ok()
203179

204-
match issued with
180+
match claimResult with
205181
| Error err -> do! EnvelopeOut.respondWithError ctx requestId err ct
206-
| Ok credentials ->
207-
record.Credentials <- credentials
208-
209-
let initialBudget =
210-
if budgets.Snapshot() = Map.empty then
211-
None
212-
else
213-
Some(budgets.Snapshot())
182+
| Ok() ->
183+
let budgets = BudgetCounters()
184+
budgets.SetInitial(Lease.initialBudgets lease)
185+
let cts = new CancellationTokenSource()
186+
let watchdog = buildWatchdog timeProvider jobs credentialRegistry jobId constraints
214187

215-
let accepted: JobAcceptedPayload =
188+
let record: JobRecord =
216189
{
217-
JobId = jobId.Value
190+
JobId = jobId
191+
SessionId = ctx.SessionId
192+
Principal = ctx.Principal
193+
Agent = resolvedAgent
194+
Input = submit.Input
218195
Lease = lease
219-
LeaseConstraints = constraints
220-
Budget = initialBudget
221-
Credentials = if List.isEmpty credentials then None else Some credentials
222-
AcceptedAt = record.CreatedAt
196+
Constraints = constraints
197+
Credentials = []
198+
Budgets = budgets
199+
ParentJobId = None
223200
TraceId = traceIdOpt
201+
CreatedAt = timeProvider.GetUtcNow()
202+
Cancellation = cts
203+
Watchdog = watchdog
204+
Status = JobStatus.Pending
205+
LastEventSeq = 0L
224206
}
225207

226-
do! sendAccepted ctx.Transport ctx.SessionId requestId jobId accepted ct
227-
228-
match agentHandlers.TryGetValue resolvedAgent with
229-
| true, handler -> JobLauncher.launch jobs credentialRegistry timeProvider record handler
230-
| _ ->
231-
do!
232-
EnvelopeOut.respondWithError
233-
ctx
234-
requestId
235-
(ARCPError.AgentNotAvailable resolvedAgent)
236-
ct
208+
jobs.Register record
209+
let! issued = issueCredentialsAsync provisioner credentialRegistry record ct
210+
211+
match issued with
212+
| Error err ->
213+
// Acceptance failed after registration —
214+
// unwind state so the failed job does not
215+
// surface in list/get and the idempotency
216+
// key is free for a retry.
217+
jobs.Unregister jobId
218+
219+
match submit.IdempotencyKey with
220+
| Some key -> jobs.ReleaseIdempotencyKey(key, jobId)
221+
| None -> ()
222+
223+
watchdog |> Option.iter (fun w -> (w :> IDisposable).Dispose())
224+
225+
try
226+
cts.Cancel()
227+
with _ ->
228+
()
229+
230+
try
231+
cts.Dispose()
232+
with _ ->
233+
()
234+
235+
try
236+
do! credentialRegistry.RevokeJobAsync(jobId, ct)
237+
with _ ->
238+
()
239+
240+
do! EnvelopeOut.respondWithError ctx requestId err ct
241+
| Ok credentials ->
242+
record.Credentials <- credentials
243+
244+
let initialBudget =
245+
if budgets.Snapshot() = Map.empty then
246+
None
247+
else
248+
Some(budgets.Snapshot())
249+
250+
let accepted: JobAcceptedPayload =
251+
{
252+
JobId = jobId.Value
253+
Lease = lease
254+
LeaseConstraints = constraints
255+
Budget = initialBudget
256+
Credentials = if List.isEmpty credentials then None else Some credentials
257+
AcceptedAt = record.CreatedAt
258+
TraceId = traceIdOpt
259+
}
260+
261+
do! sendAccepted ctx.Transport ctx.SessionId requestId jobId accepted ct
262+
263+
match agentHandlers.TryGetValue resolvedAgent with
264+
| true, handler ->
265+
JobLauncher.launch jobs credentialRegistry timeProvider record handler
266+
| _ ->
267+
do!
268+
EnvelopeOut.respondWithError
269+
ctx
270+
requestId
271+
(ARCPError.AgentNotAvailable resolvedAgent)
272+
ct
237273
}
238274
:> Task

0 commit comments

Comments
 (0)