16 changes: 8 additions & 8 deletions pkg/loop/internal/relayer/relayer.go
@@ -61,7 +61,7 @@ func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, key
pb.RegisterKeystoreServer(s, ks.NewServer(keystore))
})
if err != nil {
return 0, deps, fmt.Errorf("Failed to create relayer client: failed to serve keystore: %w", err)
return 0, nil, fmt.Errorf("Failed to create relayer client: failed to serve keystore: %w", err)
}
deps.Add(ksRes)

@@ -70,15 +70,15 @@ func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, key
pb.RegisterKeystoreServer(s, ks.NewServer(csaKeystore))
})
if err != nil {
return 0, deps, fmt.Errorf("Failed to create relayer client: failed to serve CSA keystore: %w", err)
return 0, nil, fmt.Errorf("Failed to create relayer client: failed to serve CSA keystore: %w", err)
}
deps.Add(ksCSARes)

capabilityRegistryID, capabilityRegistryResource, err := p.ServeNew("CapabilitiesRegistry", func(s *grpc.Server) {
pb.RegisterCapabilitiesRegistryServer(s, capability.NewCapabilitiesRegistryServer(p.BrokerExt, capabilityRegistry))
})
if err != nil {
return 0, deps, fmt.Errorf("failed to serve new capability registry: %w", err)
return 0, nil, fmt.Errorf("failed to serve new capability registry: %w", err)
}
deps.Add(capabilityRegistryResource)

@@ -89,9 +89,9 @@ func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, key
CapabilityRegistryID: capabilityRegistryID,
})
if err != nil {
return 0, deps, fmt.Errorf("Failed to create relayer client: failed request: %w", err)
return 0, nil, fmt.Errorf("Failed to create relayer client: failed request: %w", err)
}
-return reply.RelayerID, deps, nil
+return reply.RelayerID, nil, nil
})
return newRelayerClient(p.BrokerExt, cc), nil
}
@@ -127,7 +127,7 @@ func (p *pluginRelayerServer) NewRelayer(ctx context.Context, request *pb.NewRel
p.CloseAll(ksRes)
return nil, net.ErrConnDial{Name: "CSAKeystore", ID: request.KeystoreCSAID, Err: err}
}
-ksCSARes := net.Resource{Closer: ksCSAConn, Name: "CSAKeystore"}
+ksCSARes := net.Resource{Closer: ksConn, Name: "CSAKeystore"}

capRegistryConn, err := p.Dial(request.CapabilityRegistryID)
if err != nil {
@@ -324,7 +324,7 @@ func (r *relayerClient) NewCCIPProvider(ctx context.Context, cargs types.CCIPPro
ccipocr3pb.RegisterExtraDataCodecBundleServer(s, ccipocr3loop.NewExtraDataCodecBundleServer(cargs.ExtraDataCodecBundle))
})
if err != nil {
return 0, deps, fmt.Errorf("failed to serve ExtraDataCodecBundle: %w", err)
return 0, nil, fmt.Errorf("failed to serve ExtraDataCodecBundle: %w", err)
}
deps.Add(edcRes)
extraDataCodecBundleID = edcID
@@ -344,7 +344,7 @@ func (r *relayerClient) NewCCIPProvider(ctx context.Context, cargs types.CCIPPro
},
})
if err != nil {
-return 0, deps, err
+return 0, nil, err
}
return reply.CcipProviderID, deps, nil
})
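Note: the server-side hunk above shows the dial pattern this file uses throughout. Each brokered connection is wrapped in a named net.Resource, and everything opened so far is closed via p.CloseAll when a later dial fails; on the client side, deps.Add collects the equivalent set of resources. A minimal sketch of that rollback pattern, using simplified stand-in types (resource, closeAll, dialAll are illustrations, not the chainlink-common net API):

package main

import (
	"errors"
	"fmt"
	"io"
)

// resource is a simplified stand-in for net.Resource: a closer plus a
// name used in error messages.
type resource struct {
	io.Closer
	name string
}

// closeAll mirrors the p.CloseAll(...) call: best-effort close of every
// resource opened before a later step failed.
func closeAll(resources ...resource) {
	for _, r := range resources {
		_ = r.Close()
	}
}

// dialAll opens the named connections in order; on the first failure it
// rolls back the ones already opened and returns only the error.
func dialAll(dial func(name string) (io.Closer, error), names []string) ([]resource, error) {
	open := make([]resource, 0, len(names))
	for _, name := range names {
		conn, err := dial(name)
		if err != nil {
			closeAll(open...) // roll back partial progress
			return nil, fmt.Errorf("failed to dial %s: %w", name, err)
		}
		open = append(open, resource{Closer: conn, name: name})
	}
	return open, nil
}

func main() {
	// Fake dialer that fails on the CSA keystore, as in the error path above.
	dial := func(name string) (io.Closer, error) {
		if name == "CSAKeystore" {
			return nil, errors.New("connection refused")
		}
		return io.NopCloser(nil), nil
	}
	if _, err := dialAll(dial, []string{"Keystore", "CSAKeystore"}); err != nil {
		fmt.Println(err) // failed to dial CSAKeystore: connection refused
	}
}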
25 changes: 18 additions & 7 deletions pkg/workflows/dontime/pb/dontime.pb.go

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions pkg/workflows/dontime/pb/dontime.proto
@@ -5,6 +5,9 @@ option go_package = "github.com/smartcontractkit/chainlink-common/pkg/workflows/
message Observation {
int64 timestamp = 1;
map<string, int64> requests = 2;
+// Flag to roll out execution pruning fix.
+// TODO(CRE-2497): Remove after rollout.
+bool prune_executions = 3;
}

message Observations {
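Why a lone bool works as a rollout gate here: proto3 scalars default to their zero value, so an Observation serialized by a node built before field 3 existed decodes with PruneExecutions == false on upgraded nodes, and the Outcome logic in plugin.go below only enables pruning when every parsed observation has the flag set. A mixed-version DON therefore keeps legacy behavior. A quick sketch of the decoding behavior (the import path is this repo's generated package):

package main

import (
	"fmt"

	"google.golang.org/protobuf/proto"

	"github.com/smartcontractkit/chainlink-common/pkg/workflows/dontime/pb"
)

func main() {
	// An observation as an old node would produce it: field 3 never set.
	old := &pb.Observation{Timestamp: 1700000000000}
	raw, err := proto.Marshal(old)
	if err != nil {
		panic(err)
	}

	decoded := &pb.Observation{}
	if err := proto.Unmarshal(raw, decoded); err != nil {
		panic(err)
	}
	// proto3 default: a missing bool decodes as false, so one stale node
	// is enough to veto pruning in the Outcome phase.
	fmt.Println(decoded.PruneExecutions) // false
}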
104 changes: 93 additions & 11 deletions pkg/workflows/dontime/plugin.go
@@ -8,6 +8,7 @@ import (
"sync"
"time"

"go.opentelemetry.io/otel/metric"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"

@@ -16,10 +17,51 @@ import (
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"github.com/smartcontractkit/libocr/quorumhelper"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/dontime/pb"
)

+type pluginMetrics struct {
+donTime metric.Int64Gauge
+donTimeEntries metric.Int64Gauge
+outcomeSize metric.Int64Gauge
+}
+
+func newPluginMetrics() (pluginMetrics, error) {
+meter := beholder.GetMeter()
+
+donTime, err := meter.Int64Gauge("platform_dontime_outcome_don_time_ms",
+metric.WithDescription("DON consensus timestamp included in the latest outcome, in milliseconds"),
+metric.WithUnit("ms"),
+)
+if err != nil {
+return pluginMetrics{}, fmt.Errorf("failed to create don_time gauge: %w", err)
+}
+
+donTimeEntries, err := meter.Int64Gauge("platform_dontime_outcome_entries",
+metric.WithDescription("Number of workflow execution entries tracked in the latest outcome"),
+metric.WithUnit("{entry}"),
+)
+if err != nil {
+return pluginMetrics{}, fmt.Errorf("failed to create don_time_entries gauge: %w", err)
+}
+
+outcomeSize, err := meter.Int64Gauge("platform_dontime_outcome_size_bytes",
+metric.WithDescription("Serialised size of the latest outcome in bytes"),
+metric.WithUnit("By"),
+)
+if err != nil {
+return pluginMetrics{}, fmt.Errorf("failed to create outcome_size gauge: %w", err)
+}
+
+return pluginMetrics{
+donTime: donTime,
+donTimeEntries: donTimeEntries,
+outcomeSize: outcomeSize,
+}, nil
+}

type Plugin struct {
mu sync.RWMutex

@@ -30,6 +72,8 @@ type Plugin struct {

batchSize int
minTimeIncrease int64
+
+metrics pluginMetrics
}

var _ ocr3types.ReportingPlugin[[]byte] = (*Plugin)(nil)
@@ -45,13 +89,19 @@ func NewPlugin(store *Store, config ocr3types.ReportingPluginConfig, offchainCfg
return nil, errors.New("execution removal time must be positive")
}

+metrics, err := newPluginMetrics()
+if err != nil {
+return nil, err
+}
+
return &Plugin{
store: store,
config: config,
offChainConfig: offchainCfg,
lggr: logger.Named(lggr, "DONTimePlugin"),
batchSize: int(offchainCfg.MaxBatchSize),
minTimeIncrease: offchainCfg.MinTimeIncrease / int64(time.Millisecond),
+metrics: metrics,
}, nil
}

@@ -100,8 +150,9 @@ func (p *Plugin) Observation(_ context.Context, outctx ocr3types.OutcomeContext,
}

observation := &pb.Observation{
-Timestamp: time.Now().UTC().UnixMilli(),
-Requests: requests,
+Timestamp: time.Now().UTC().UnixMilli(),
+Requests: requests,
+PruneExecutions: true,
}

return proto.MarshalOptions{Deterministic: true}.Marshal(observation)
@@ -115,7 +166,7 @@ func (p *Plugin) ObservationQuorum(_ context.Context, _ ocr3types.OutcomeContext
return quorumhelper.ObservationCountReachesObservationQuorum(quorumhelper.QuorumTwoFPlusOne, p.config.N, p.config.F, aos), nil
}

-func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ types.Query, aos []types.AttributedObservation) (ocr3types.Outcome, error) {
+func (p *Plugin) Outcome(ctx context.Context, outctx ocr3types.OutcomeContext, _ types.Query, aos []types.AttributedObservation) (ocr3types.Outcome, error) {
observationCounts := map[string]int64{} // counts how many nodes reported where a new DON timestamp might be needed
type timestampNodePair struct {
Timestamp int64
@@ -132,14 +183,33 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t
prevOutcome.ObservedDonTimes = make(map[string]*pb.ObservedDonTimes)
}

+// Unmarshal all observations once and compute pruneExecutions.
+// Only prune when all nodes are updated. Even if this rolls back, the logic is still correct.
+parsedAOs := make([]*pb.Observation, len(aos))
+pruneExecutions := true
+for idx, ao := range aos {
+observation := &pb.Observation{}
+if err := proto.Unmarshal(ao.Observation, observation); err != nil {
+p.lggr.Errorf("failed to unmarshal observation in Outcome phase")
+continue
+}
+parsedAOs[idx] = observation
+if !observation.PruneExecutions {
+pruneExecutions = false // need all nodes to agree
+}
+}

+for idx, observation := range parsedAOs {
+if observation == nil {
+continue
+}

for id, requestSeqNum := range observation.Requests {
+if !pruneExecutions { // TODO(CRE-2497): legacy behavior, remove after rollout
if _, ok := prevOutcome.ObservedDonTimes[id]; !ok {
prevOutcome.ObservedDonTimes[id] = &pb.ObservedDonTimes{}
}
+}
var currSeqNum int64
if times, ok := prevOutcome.ObservedDonTimes[id]; ok {
currSeqNum = int64(len(times.Timestamps))
@@ -199,14 +269,23 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t

// Remove expired and empty workflow executions
for id, observedTimes := range outcome.ObservedDonTimes {
-if observedTimes == nil || len(observedTimes.Timestamps) == 0 {
-delete(outcome.ObservedDonTimes, id)
-p.store.deleteExecutionID(id)
-continue
-}
-if donTime >= observedTimes.Timestamps[0]+p.offChainConfig.ExecutionRemovalTime.AsDuration().Milliseconds() {
-delete(outcome.ObservedDonTimes, id)
-p.store.deleteExecutionID(id)
+if !pruneExecutions { // TODO(CRE-2497): legacy behavior, remove after rollout
+if observedTimes != nil && len(observedTimes.Timestamps) > 0 {
+if donTime >= observedTimes.Timestamps[0]+p.offChainConfig.ExecutionRemovalTime.AsDuration().Milliseconds() {
+delete(outcome.ObservedDonTimes, id)
+p.store.deleteExecutionID(id)
+}
+}
+} else {
+if observedTimes == nil || len(observedTimes.Timestamps) == 0 {
+delete(outcome.ObservedDonTimes, id)
+p.store.deleteExecutionID(id)
+continue
+}
+if donTime >= observedTimes.Timestamps[0]+p.offChainConfig.ExecutionRemovalTime.AsDuration().Milliseconds() {
+delete(outcome.ObservedDonTimes, id)
+p.store.deleteExecutionID(id)
+}
+}
}
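The retention rule above (in both the legacy and the new path) prunes an execution once the consensus DON time reaches the first observed timestamp plus ExecutionRemovalTime, with an inclusive cutoff. A tiny worked sketch of the arithmetic, with made-up values:

package main

import (
	"fmt"
	"time"
)

func main() {
	first := int64(1_700_000_000_000)            // first observed DON time (UnixMilli)
	removal := (10 * time.Minute).Milliseconds() // hypothetical ExecutionRemovalTime
	donTime := first + removal                   // round lands exactly on the cutoff
	fmt.Println(donTime >= first+removal)        // true: the execution is pruned
}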

@@ -215,6 +294,9 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t
"observedDonTimesEntries", len(outcome.ObservedDonTimes),
"outcomeSizeBytes", len(outcomeBytes),
)
+p.metrics.donTime.Record(ctx, outcome.Timestamp)
+p.metrics.donTimeEntries.Record(ctx, int64(len(outcome.ObservedDonTimes)))
+p.metrics.outcomeSize.Record(ctx, int64(len(outcomeBytes)))
return outcomeBytes, err
}

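The three gauges recorded above follow the usual OpenTelemetry synchronous-gauge pattern: create the instruments once at construction, then record the latest value each round, which is also why Outcome now takes a named ctx. A standalone sketch of the same pattern, using the plain OTel global meter in place of beholder.GetMeter() (an assumption for this example only):

package main

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
)

func main() {
	meter := otel.Meter("dontime-example")

	outcomeSize, err := meter.Int64Gauge("platform_dontime_outcome_size_bytes",
		metric.WithDescription("Serialised size of the latest outcome in bytes"),
		metric.WithUnit("By"), // UCUM code for bytes
	)
	if err != nil {
		panic(err)
	}

	// A gauge keeps the last recorded value; the plugin records once per
	// OCR round from the Outcome phase.
	outcomeSize.Record(context.Background(), 1024)
}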