
feat: add ChipIngress batch emitter support #21327

Open
thomaska wants to merge 25 commits into develop from infoplat-3436-chipingress-publishBatch

Conversation


@thomaska thomaska commented Feb 27, 2026

Ticket: https://smartcontract-it.atlassian.net/browse/INFOPLAT-3436

Summary

  • Add ChipIngressBatchEmitterEnabled telemetry config flag (default false)
    to toggle batch mode per-node without a code change
  • Implement PublishBatch on the chip-testsink gRPC server so CRE system tests
    work when batch mode is enabled
  • Bump chainlink-common to include batch-emitter feature-flag support

Detail

Config flag – new ChipIngressBatchEmitterEnabled boolean in [Telemetry].
Wired through the config interface, TOML types, beholder globals, docs, and
test fixtures.
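As a sketch, the flag described above would appear in a node's TOML config like this; only the flag name and its default come from this PR, and all surrounding [Telemetry] keys are omitted:

```toml
[Telemetry]
# Default is false; set true to switch this node's ChIP ingress
# emitter to batch mode without a code change.
ChipIngressBatchEmitterEnabled = true
```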

chip-testsink – the test-helper server previously implemented only the single-event Publish RPC, inheriting UNIMPLEMENTED for PublishBatch. It now delegates each batch event to the configured PublishFunc and forwards the full batch upstream in one RPC.

Why

Nine CRE system tests depend on chip-testsink. Without this change, they fail with gRPC UNIMPLEMENTED errors once batch mode becomes the default.

Requires

smartcontractkit/chainlink-common#1862

Copilot AI review requested due to automatic review settings February 27, 2026 14:43
@thomaska thomaska requested review from a team as code owners February 27, 2026 14:43
@github-actions
Contributor

👋 thomaska, thanks for creating this pull request!

To help reviewers, please consider creating future PRs as drafts first. This allows you to self-review and make any final changes before notifying the team.

Once you're ready, you can mark it as "Ready for review" to request feedback. Thanks!

@github-actions
Contributor

github-actions bot commented Feb 27, 2026

✅ No conflicts with other open PRs targeting develop


Copilot AI left a comment


Pull request overview

Adds PublishBatch support to the chip-testsink gRPC server so CRE/system tests don’t fail with UNIMPLEMENTED when nodes emit batched ChIP ingress events.

Changes:

  • Implement PublishBatch on the chip-testsink ChipIngressServer.
  • Delegate batch handling to the existing Publish flow (including configured PublishFunc and optional upstream forwarding).


for _, event := range batch.Events {
	if _, err := s.Publish(ctx, event); err != nil {

Copilot AI Feb 27, 2026


Calling s.Publish() inside the batch loop triggers the per-event async upstream forwarding goroutine in Publish(). For large batches this can create a burst of goroutines and N upstream RPCs. Consider handling upstream forwarding in PublishBatch with a single PublishBatch call (or at least a bounded worker/pool), and calling the configured PublishFunc directly for local handling to avoid unbounded goroutine/RPC fan-out per batch.

Suggested change
- if _, err := s.Publish(ctx, event); err != nil {
+ // Forward upstream synchronously to avoid spawning a goroutine per event.
+ if s.cfg.UpstreamEndpoint != "" {
+ 	forwardCtx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
+ 	_, err := s.upstream.Publish(forwardCtx, event)
+ 	cancelFn()
+ 	if err != nil {
+ 		log.Printf("failed to forward to upstream: %v", err)
+ 	}
+ }
+ if _, err := s.cfg.PublishFunc(ctx, event); err != nil {

Comment on lines +124 to +125
// It delegates each event in the batch to the configured PublishFunc,
// mirroring how the real ChIP Ingress processes batches atomically.

Copilot AI Feb 27, 2026


The doc comment claims this mirrors how the real ChIP ingress processes batches "atomically", but this implementation is not atomic: it publishes events one-by-one and can return an error after earlier events have already been accepted/forwarded. Please either adjust the comment to reflect best-effort sequential processing, or change the implementation to provide the atomicity guarantees being documented.

Suggested change
- // It delegates each event in the batch to the configured PublishFunc,
- // mirroring how the real ChIP Ingress processes batches atomically.
+ // It delegates each event in the batch to the configured PublishFunc
+ // sequentially, returning an error on the first failure. Earlier events
+ // in the batch may already have been published or forwarded when an error
+ // is returned, so processing is best-effort rather than atomic.

@trunk-io

trunk-io bot commented Feb 27, 2026


jmank88
jmank88 previously approved these changes Feb 27, 2026
@pkcll pkcll added the build-publish Build and Publish image to SDLC label Feb 27, 2026
@github-actions
Copy link
Contributor

github-actions bot commented Mar 2, 2026

I see you updated files related to core. Please run make gocs in the root directory to add a changeset, and include at least one of the following tags in the text:

  • #added For any new functionality added.
  • #breaking_change For any functionality that requires manual action for the node to boot.
  • #bugfix For bug fixes.
  • #changed For any change to the existing functionality.
  • #db_update For any feature that introduces updates to database schema.
  • #deprecation_notice For any upcoming deprecation functionality.
  • #internal For changesets that need to be excluded from the final changelog.
  • #nops For any feature that is NOP facing and needs to be in the official Release Notes for the release.
  • #removed For any functionality/config that is removed.
  • #updated For any functionality that is updated.
  • #wip For any change that is not ready yet and external communication about it should be held off till it is feature complete.

@thomaska thomaska requested a review from a team as a code owner March 2, 2026 14:02
@thomaska thomaska changed the title from "Amend chip-testsink" to "feat: add ChipIngress batch emitter support" Mar 2, 2026
thomaska added 8 commits March 6, 2026 23:28
# Conflicts:
#	core/scripts/go.mod
#	core/scripts/go.sum
#	deployment/go.mod
#	deployment/go.sum
#	go.mod
#	go.sum
#	integration-tests/go.mod
#	integration-tests/go.sum
#	integration-tests/load/go.sum
#	system-tests/lib/go.mod
#	system-tests/lib/go.sum
#	system-tests/tests/go.sum
@pkcll
Copy link
Collaborator

pkcll commented Mar 17, 2026

Based on my thorough review of both PRs, here is a concrete implementation plan for registering ChipIngressBatchEmitter as a managed service in NewApplication.


Implementation Plan

Problem

The ChipIngressBatchEmitter is created and started inside beholder.NewGRPCClient() (called from initGlobals in shell.go), but it's not registered in the chainlink application's service list. This means:

  • No health check visibility (/health)
  • No ordered shutdown participation
  • No service lifecycle management

Step 1: Expose the batch emitter from beholder.Client (chainlink-common PR #1862)

Currently in pkg/beholder/client.go, the batchEmitterService is a local variable inside NewGRPCClient:

var batchEmitterService *ChipIngressBatchEmitter

Changes needed:

A. Add a field to the Client struct:

type Client struct {
    // ... existing fields ...
    BatchEmitter *ChipIngressBatchEmitter // nil when batch mode is disabled
}

B. Store the emitter when constructing the client (line ~261):

Change the return statement to include the batch emitter:

return &Client{cfg, logger, tracer, meter, emitter, chipIngressClient, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, signer, onClose, batchEmitterService}, nil

C. Add a public getter:

// GetChipIngressBatchEmitter returns the batch emitter service, or nil if batch mode is disabled.
func (c *Client) GetChipIngressBatchEmitter() *ChipIngressBatchEmitter {
    if c == nil {
        return nil
    }
    return c.BatchEmitter
}

Step 2: Register the batch emitter in NewApplication (chainlink PR #21327)

In core/services/chainlink/application.go, after telemetryManager is appended to srvcs (around line 447–448), add:

telemetryManager := telemetry.NewManager(cfg.TelemetryIngress(), csaKeystore, globalLogger)
srvcs = append(srvcs, telemetryManager)

// Register ChipIngressBatchEmitter for health checks and ordered shutdown.
// The emitter is already started by beholder.NewGRPCClient during initGlobals;
// appending it here gives us health visibility and ensures Close() runs on shutdown.
if beholderClient := beholder.GetClient(); beholderClient != nil {
    if batchEmitter := beholderClient.GetChipIngressBatchEmitter(); batchEmitter != nil {
        srvcs = append(srvcs, batchEmitter)
    }
}

Step 3: No changes to initGlobals in shell.go

The current PR #21327 changes to shell.go are already correct — they pass ChipIngressBatchEmitterEnabled and ChipIngressLogger through to beholder.Config. Since beholder creates/starts the emitter internally and we just retrieve a reference afterward, initGlobals doesn't need any additional changes.


Why this works

  • Initialization order: initGlobals → beholder.NewGRPCClient runs in beforeNode() before NewApplication, so beholder.GetClient() is already populated.
  • Double-start: services.Engine's Start() is idempotent; the service framework calling Start() again is a no-op since the emitter is already running.
  • Nil safety: when ChipIngressBatchEmitterEnabled = false (the default), batchEmitterService remains nil, and both nil checks protect against it.
  • Shutdown order: services in srvcs are stopped in reverse order, so the emitter is closed before beholder's own Client.Close() (which closes the gRPC connection), matching the drain-before-disconnect requirement already implemented in PR #1862's reordered Client.Close().
  • Health checks: ChipIngressBatchEmitter implements services.Service via services.Engine, exposing Name(), Ready(), and HealthReport(), all needed for /health.

Files summary

  • pkg/beholder/client.go (chainlink-common, PR #1862): add a BatchEmitter field to Client, store it in the constructor, and add the GetChipIngressBatchEmitter() getter.
  • core/services/chainlink/application.go (chainlink, PR #21327): after telemetryManager, retrieve the batch emitter and append it to srvcs.

Note: The PR #21327 file list may be incomplete (API results limited to 30 files).

# Conflicts:
#	core/scripts/cre/environment/examples/workflows/v1/proof-of-reserve/cron-based/go.mod
#	core/scripts/cre/environment/examples/workflows/v1/proof-of-reserve/cron-based/go.sum
#	core/scripts/cre/environment/examples/workflows/v1/proof-of-reserve/web-trigger-based/go.mod
#	core/scripts/cre/environment/examples/workflows/v1/proof-of-reserve/web-trigger-based/go.sum
#	core/scripts/cre/environment/examples/workflows/v2/proof-of-reserve/cron-based/go.mod
#	core/scripts/cre/environment/examples/workflows/v2/proof-of-reserve/cron-based/go.sum
#	core/scripts/go.mod
#	core/scripts/go.sum
#	deployment/go.mod
#	deployment/go.sum
#	devenv/go.mod
#	devenv/go.sum
#	go.mod
#	go.sum
#	integration-tests/go.mod
#	integration-tests/go.sum
#	integration-tests/load/go.mod
#	integration-tests/load/go.sum
#	system-tests/lib/go.mod
#	system-tests/lib/go.sum
#	system-tests/tests/canaries_sentinels/proof-of-reserve/cron-based/go.mod
#	system-tests/tests/canaries_sentinels/proof-of-reserve/cron-based/go.sum
#	system-tests/tests/go.mod
#	system-tests/tests/go.sum


Labels

build-publish Build and Publish image to SDLC


4 participants