Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion build/devenv/components/committeeccv/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ func (c *component) RunPhase3(
}
impls, _ := priorOutputs["_impls"].([]cciptestinterfaces.CCIP17Configuration)
blockchains, _ := priorOutputs["blockchains"].([]*ctfblockchain.Input)
useLegacyConfigureLane, _ := priorOutputs["_use_legacy_configure_lane"].(bool)
var useLegacyConfigureLane bool
if pcMap, ok := globalConfig["protocol_contracts"].(map[string]any); ok {
useLegacyConfigureLane, _ = pcMap["use_legacy_configure_lane"].(bool)
}

// Step 1: Generate HMAC client credentials for all aggregators before launching verifiers.
for _, agg := range aggregators {
Expand Down
107 changes: 43 additions & 64 deletions build/devenv/components/executor/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@ func (c *component) ValidateConfig(componentConfig any) error {
}

// RunPhase3 launches standalone executor containers, registers them with JD,
// and emits FundingEffect requests for transmitter addresses. Job spec
// generation and proposal are deferred to Phase 4 because they require
// deployed contract addresses.
// emits FundingEffect requests for transmitter addresses, generates job specs,
// and emits JobProposalEffect for each standalone executor.
func (c *component) RunPhase3(
ctx context.Context,
_ map[string]any,
Expand Down Expand Up @@ -133,61 +132,6 @@ func (c *component) RunPhase3(
}
}

return map[string]any{configKey: executors}, effects, nil
}

// registerWithJD registers all standalone executors with JD and waits for their
// WSRPC connections. Mirrors the logic in environment.go:registerExecutorsWithJD.
func registerWithJD(ctx context.Context, executors []*executorsvc.Input, jdInfra *jobs.JDInfrastructure) error {
var standalone []*executorsvc.Input
for _, exec := range executors {
if exec != nil && exec.Mode == services.Standalone && exec.Out != nil {
standalone = append(standalone, exec)
}
}
if len(standalone) == 0 {
return nil
}

g, gCtx := errgroup.WithContext(ctx)
var mu sync.Mutex

for _, exec := range standalone {
g.Go(func() error {
if exec.Out.BootstrapKeys.CSAPublicKey == "" {
return fmt.Errorf("executor %s started but CSAPublicKey not available", exec.ContainerName)
}
reg := &jobs.BootstrapJDRegistration{
Name: exec.ContainerName,
CSAPublicKey: exec.Out.BootstrapKeys.CSAPublicKey,
}
if err := jobs.RegisterBootstrapWithJD(gCtx, jdInfra.OffchainClient, reg); err != nil {
return fmt.Errorf("failed to register executor %s with JD: %w", exec.ContainerName, err)
}
mu.Lock()
exec.Out.JDNodeID = reg.NodeID
mu.Unlock()
if err := jobs.WaitForBootstrapConnection(gCtx, jdInfra.OffchainClient, reg.NodeID, 60*time.Second); err != nil {
return fmt.Errorf("executor %s failed to connect to JD: %w", exec.ContainerName, err)
}
return nil
})
}
return g.Wait()
}

// RunPhase4 generates executor job specs using deployed contract addresses from
// Phase 3 and emits JobProposalEffect for each standalone executor.
func (c *component) RunPhase4(
_ context.Context,
_ map[string]any,
_ any,
priorOutputs map[string]any,
) (map[string]any, []devenvruntime.Effect, error) {
executors, ok := priorOutputs[configKey].([]*executorsvc.Input)
if !ok || len(executors) == 0 {
return map[string]any{}, nil, nil
}
e, ok := priorOutputs["_env"].(*deployment.Environment)
if !ok || e == nil {
return nil, nil, fmt.Errorf("executor: _env not found in phase outputs")
Expand All @@ -200,17 +144,12 @@ func (c *component) RunPhase4(
if !ok {
return nil, nil, fmt.Errorf("executor: _ds not found in phase outputs")
}
blockchainOutputs, ok := priorOutputs["blockchainOutputs"].([]*ctfblockchain.Output)
if !ok {
return nil, nil, fmt.Errorf("executor: blockchainOutputs not found in phase outputs")
}

jobSpecs, err := buildExecutorJobSpecs(e, executors, topology, ds)
if err != nil {
return nil, nil, err
}

var effects []devenvruntime.Effect
for _, exec := range executors {
if exec == nil || exec.Mode != services.Standalone {
continue
Expand Down Expand Up @@ -244,7 +183,47 @@ func (c *component) RunPhase4(
})
}

return map[string]any{}, effects, nil
return map[string]any{configKey: executors}, effects, nil
}

// registerWithJD registers all standalone executors with JD and waits for their
// WSRPC connections. Mirrors the logic in environment.go:registerExecutorsWithJD.
func registerWithJD(ctx context.Context, executors []*executorsvc.Input, jdInfra *jobs.JDInfrastructure) error {
var standalone []*executorsvc.Input
for _, exec := range executors {
if exec != nil && exec.Mode == services.Standalone && exec.Out != nil {
standalone = append(standalone, exec)
}
}
if len(standalone) == 0 {
return nil
}

g, gCtx := errgroup.WithContext(ctx)
var mu sync.Mutex

for _, exec := range standalone {
g.Go(func() error {
if exec.Out.BootstrapKeys.CSAPublicKey == "" {
return fmt.Errorf("executor %s started but CSAPublicKey not available", exec.ContainerName)
}
reg := &jobs.BootstrapJDRegistration{
Name: exec.ContainerName,
CSAPublicKey: exec.Out.BootstrapKeys.CSAPublicKey,
}
if err := jobs.RegisterBootstrapWithJD(gCtx, jdInfra.OffchainClient, reg); err != nil {
return fmt.Errorf("failed to register executor %s with JD: %w", exec.ContainerName, err)
}
mu.Lock()
exec.Out.JDNodeID = reg.NodeID
mu.Unlock()
if err := jobs.WaitForBootstrapConnection(gCtx, jdInfra.OffchainClient, reg.NodeID, 60*time.Second); err != nil {
return fmt.Errorf("executor %s failed to connect to JD: %w", exec.ContainerName, err)
}
return nil
})
}
return g.Wait()
}

type executorJobSpec struct {
Expand Down
21 changes: 7 additions & 14 deletions build/devenv/components/protocol_contracts/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,6 @@ func (p *component) RunPhase2(
if envTopology == nil {
return nil, nil, fmt.Errorf("environment_topology is required but not found in config")
}
var useLegacyConfigureLane bool
if m, ok := componentConfig.(map[string]any); ok {
if v, ok := m["use_legacy_configure_lane"].(bool); ok {
useLegacyConfigureLane = v
}
}
timeTrack := timing.New(p.lggr)
ctx = p.lggr.WithContext(ctx)

Expand Down Expand Up @@ -213,14 +207,13 @@ func (p *component) RunPhase2(
}

return map[string]any{
"_cldf": cldf,
"_env": e,
"_topology": topology,
"_selectors": selectors,
"_ds": ds,
"_impls": impls,
"_time_track": timeTrack,
"_use_legacy_configure_lane": useLegacyConfigureLane,
"_cldf": cldf,
"_env": e,
"_topology": topology,
"_selectors": selectors,
"_ds": ds,
"_impls": impls,
"_time_track": timeTrack,
}, nil, nil
}

Expand Down
2 changes: 1 addition & 1 deletion build/devenv/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ type Cfg struct {
Aggregator []*services.AggregatorInput `toml:"aggregator" validate:"required"`
JD *jd.Input `toml:"jd" validate:"required"`
Blockchains []*blockchain.Input `toml:"blockchains" validate:"required"`
NodeSets []*ns.Input `toml:"nodesets" validate:"required"`
NodeSets []*ns.Input `toml:"nodesets" validate:"omitempty"`
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I believe omitempty is placed after a comma in the toml tag: toml:"nodesets,omitempty". From what I can see the validate tags aren't actually being used, since the github.com/asaskevich/govalidator dep is indirect in the devenv go.mod.

CLNodesFundingETH float64 `toml:"cl_nodes_funding_eth"`
CLNodesFundingLink float64 `toml:"cl_nodes_funding_link"`
// HighAvailability enables devenv-level service redundancy. When true,
Expand Down
58 changes: 4 additions & 54 deletions build/devenv/runtime/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ func NewEnvironmentWithRegistry(ctx context.Context, rawConfig map[string]any, r
if effectExecutor == nil {
effectExecutor = noopEffectExecutor{}
}
specific, fallback, err := r.instantiate(nil)
specific, err := r.instantiate(nil)
if err != nil {
return nil, err
}
if err := r.validate(rawConfig, specific, fallback); err != nil {
if err := r.validate(rawConfig, specific); err != nil {
return nil, err
}

Expand All @@ -51,23 +51,15 @@ func NewEnvironmentWithRegistry(ctx context.Context, rawConfig map[string]any, r
ls.SetLogger(logger)
}
}
if fallback != nil {
if ls, ok := fallback.(LogSetter); ok {
ls.SetLogger(logger)
}
}

// The fallback component receives all config keys not claimed by a specific
// registered component, rather than a single top-level key slice.
unclaimed := unclaimedKeys(rawConfig, r.factories)
if len(unclaimed) > 0 && fallback == nil {
if len(unclaimed) > 0 {
keys := make([]string, 0, len(unclaimed))
for k := range unclaimed {
keys = append(keys, k)
}
// TODO: Make this an error.
// logger.Error().Strs("keys", keys).Msg("unclaimed config keys with no fallback component registered")
logger.Warn().Strs("keys", keys).Msg("unclaimed config keys with no fallback component registered")
logger.Warn().Strs("keys", keys).Msg("unclaimed config keys")
}
accumulated := map[string]any{}

Expand All @@ -91,16 +83,6 @@ func NewEnvironmentWithRegistry(ctx context.Context, rawConfig map[string]any, r
phaseEffects = append(phaseEffects, effects...)
}
}
if p1, ok := fallback.(Phase1Component); ok {
out, effects, err := p1.RunPhase1(ctx, rawConfig, unclaimed)
if err != nil {
return nil, fmt.Errorf("phase1 fallback: %w", err)
}
if err := mergeNoOverwrite(accumulated, out, phase, fallbackOwner); err != nil {
return nil, err
}
phaseEffects = append(phaseEffects, effects...)
}
if err := effectExecutor.Execute(ctx, phaseEffects, accumulated); err != nil {
return nil, fmt.Errorf("phase1 effects: %w", err)
}
Expand All @@ -127,16 +109,6 @@ func NewEnvironmentWithRegistry(ctx context.Context, rawConfig map[string]any, r
phaseEffects = append(phaseEffects, effects...)
}
}
if p2, ok := fallback.(Phase2Component); ok {
out, effects, err := p2.RunPhase2(ctx, rawConfig, unclaimed, maps.Clone(phaseSnapshot))
if err != nil {
return nil, fmt.Errorf("phase2 fallback: %w", err)
}
if err := mergeNoOverwrite(accumulated, out, phase, fallbackOwner); err != nil {
return nil, err
}
phaseEffects = append(phaseEffects, effects...)
}
if err := effectExecutor.Execute(ctx, phaseEffects, accumulated); err != nil {
return nil, fmt.Errorf("phase2 effects: %w", err)
}
Expand All @@ -163,16 +135,6 @@ func NewEnvironmentWithRegistry(ctx context.Context, rawConfig map[string]any, r
phaseEffects = append(phaseEffects, effects...)
}
}
if p3, ok := fallback.(Phase3Component); ok {
out, effects, err := p3.RunPhase3(ctx, rawConfig, unclaimed, maps.Clone(phaseSnapshot))
if err != nil {
return nil, fmt.Errorf("phase3 fallback: %w", err)
}
if err := mergeNoOverwrite(accumulated, out, phase, fallbackOwner); err != nil {
return nil, err
}
phaseEffects = append(phaseEffects, effects...)
}
if err := effectExecutor.Execute(ctx, phaseEffects, accumulated); err != nil {
return nil, fmt.Errorf("phase3 effects: %w", err)
}
Expand All @@ -199,16 +161,6 @@ func NewEnvironmentWithRegistry(ctx context.Context, rawConfig map[string]any, r
phaseEffects = append(phaseEffects, effects...)
}
}
if p4, ok := fallback.(Phase4Component); ok {
out, effects, err := p4.RunPhase4(ctx, rawConfig, unclaimed, maps.Clone(phaseSnapshot))
if err != nil {
return nil, fmt.Errorf("phase4 fallback: %w", err)
}
if err := mergeNoOverwrite(accumulated, out, phase, fallbackOwner); err != nil {
return nil, err
}
phaseEffects = append(phaseEffects, effects...)
}
if err := effectExecutor.Execute(ctx, phaseEffects, accumulated); err != nil {
return nil, fmt.Errorf("phase4 effects: %w", err)
}
Expand All @@ -217,8 +169,6 @@ func NewEnvironmentWithRegistry(ctx context.Context, rawConfig map[string]any, r
return accumulated, nil
}

const fallbackOwner = "<fallback>"

// mergeNoOverwrite copies src into dst, returning an error if any key in src
// already exists in dst. The phase number and owner identify the offending
// component in the error message.
Expand Down
34 changes: 0 additions & 34 deletions build/devenv/runtime/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,37 +232,3 @@ func TestPhase1_OverwriteDetection(t *testing.T) {
require.Contains(t, err.Error(), `"B"`)
require.Contains(t, err.Error(), `"shared"`)
}

func TestFallbackHonorsSnapshot(t *testing.T) {
specific := newP2Comp(t, map[string]any{"from-specific": 1}, nil)

var fallbackPrior map[string]any
fb := newP2Comp(t, map[string]any{"from-fallback": 2}, func(p map[string]any) { fallbackPrior = p })

r := devenvruntime.NewRegistry()
require.NoError(t, r.Register("Specific", compFactory(specific)))
r.SetFallback(compFactory(fb))

out, err := runEnv(t, r, map[string]any{"Specific": nil, "Other": nil})
require.NoError(t, err)

require.NotContains(t, fallbackPrior, "from-specific",
"fallback must see the phase-start snapshot, not the post-Specific state")
require.Equal(t, 1, out["from-specific"])
require.Equal(t, 2, out["from-fallback"])
}

func TestFallbackOverwriteDetection(t *testing.T) {
specific := newP2Comp(t, map[string]any{"shared": "specific"}, nil)
fb := newP2Comp(t, map[string]any{"shared": "fallback"}, nil)

r := devenvruntime.NewRegistry()
require.NoError(t, r.Register("Specific", compFactory(specific)))
r.SetFallback(compFactory(fb))

_, err := runEnv(t, r, map[string]any{"Specific": nil})
require.Error(t, err)
require.Contains(t, err.Error(), "phase 2")
require.Contains(t, err.Error(), `"<fallback>"`)
require.Contains(t, err.Error(), `"shared"`)
}
Loading
Loading