ChIP Durable Emitter#1911
Conversation
5f7ace8 to
a268cb0
Compare
✅ API Diff Results -
|
…inlink-common into durable-emitter-poc
| d.metrics.publishBatchEvOK.Add(context.Background(), int64(len(batch))) | ||
| } | ||
|
|
||
| // Async MarkDelivered: the DB UPDATE runs in a background goroutine so |
There was a problem hiding this comment.
due to its async nature, is there a risk that a call to ListPending creates rework of tasks because this is in-flight?
There was a problem hiding this comment.
Yeah async mark implies ListPending can still see those rows and re-enqueue them until mark lands, which is a tradeoff. Not a big deal since we guarantee at least once delivery.
We can fix this in a follow-up PR
| // avoiding send vs close races on publishCh. | ||
| func (d *DurableEmitter) Close() error { | ||
| close(d.stopCh) | ||
| if d.insertCh != nil { |
There was a problem hiding this comment.
should we drain insertCh as a graceful shutdown?
|
|
||
| // rawConn is the underlying gRPC connection when the client exposes it. | ||
| // Non-nil enables zero-copy batch publishing (protowire + ForceCodec). | ||
| rawConn *grpc.ClientConn |
There was a problem hiding this comment.
Can we still fit into the interface if we consider implementing the emitter with a pool of conns?
There was a problem hiding this comment.
maybe we should do a first version without raw grpc calls and leave as a future optimization option
There was a problem hiding this comment.
That should be doable, probably not necessary for now though
Creates ChIP
DurableEmitterto persist and re-send Chip messages.DurableEmitter Integration Testing + ORM