-
Notifications
You must be signed in to change notification settings - Fork 250
[DO NOT MERGE] ai moonshot #3079
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| --- | ||
| name: Benchmarks | ||
| permissions: {} | ||
| "on": | ||
| push: | ||
| branches: | ||
| - main | ||
| workflow_dispatch: | ||
|
|
||
| jobs: | ||
| evm-benchmark: | ||
| name: EVM Contract Benchmark | ||
| runs-on: ubuntu-latest | ||
| timeout-minutes: 30 | ||
| permissions: | ||
| contents: write | ||
| issues: write | ||
| steps: | ||
| - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 | ||
| - name: Set up Go | ||
| uses: actions/setup-go@d35c59abb061a4a6fb18e82ac0862c26744d6ab5 # v5.5.0 | ||
| with: | ||
| go-version-file: ./go.mod | ||
| - name: Set up Docker Buildx | ||
| uses: docker/setup-buildx-action@b5ca514318bd6ebac0fb2aedd5d36ec1b5c232a2 # v3.10.0 | ||
| - name: Build binaries | ||
| run: make build-evm build-da | ||
| - name: Run EVM benchmarks | ||
| run: | | ||
| cd test/e2e && go test -tags evm -bench=. -benchmem -run='^$' \ | ||
| -timeout=10m --evm-binary=../../build/evm | tee output.txt | ||
| - name: Store benchmark result | ||
| uses: benchmark-action/github-action-benchmark@4bdcce38c94cec68da58d012ac24b7b1155efe8b # v1.20.7 | ||
| with: | ||
| name: EVM Contract Roundtrip | ||
| tool: 'go' | ||
| output-file-path: test/e2e/output.txt | ||
| auto-push: true | ||
| github-token: ${{ secrets.GITHUB_TOKEN }} | ||
| alert-threshold: '150%' | ||
| fail-on-alert: true | ||
| comment-on-alert: true |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,6 +29,15 @@ import ( | |
|
|
||
| var _ BlockProducer = (*Executor)(nil) | ||
|
|
||
| // lastBlockCacheEntry caches the last produced block's header hash, data hash, | ||
| // and signature to avoid store reads in CreateBlock. | ||
| type lastBlockCacheEntry struct { | ||
| height uint64 | ||
| headerHash types.Hash | ||
| dataHash types.Hash | ||
| signature types.Signature | ||
| } | ||
|
|
||
| // Executor handles block production, transaction processing, and state management | ||
| type Executor struct { | ||
| // Core components | ||
|
|
@@ -71,6 +80,19 @@ type Executor struct { | |
| // blockProducer is the interface used for block production operations. | ||
| // defaults to self, but can be wrapped with tracing. | ||
| blockProducer BlockProducer | ||
|
|
||
| // lastBlock caches last produced header/data/signature to avoid store reads | ||
| // in CreateBlock. Protected by lastBlockMu. | ||
| lastBlockMu sync.Mutex | ||
| lastBlockInfo *lastBlockCacheEntry | ||
|
|
||
| // cachedSignerInfo caches pubKey and validatorHash (never change after init). | ||
| cachedPubKey crypto.PubKey | ||
| cachedValidatorHash types.Hash | ||
| signerInfoCached bool | ||
|
|
||
| // cachedChainID avoids per-block allocation in RetrieveBatch | ||
| cachedChainID []byte | ||
| } | ||
|
|
||
| // NewExecutor creates a new block executor. | ||
|
|
@@ -133,6 +155,7 @@ func NewExecutor( | |
| txNotifyCh: make(chan struct{}, 1), | ||
| errorCh: errorCh, | ||
| logger: logger.With().Str("component", "executor").Logger(), | ||
| cachedChainID: []byte(genesis.ChainID), | ||
| } | ||
| e.blockProducer = e | ||
| return e, nil | ||
|
|
@@ -432,18 +455,21 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { | |
| header *types.SignedHeader | ||
| data *types.Data | ||
| batchData *BatchData | ||
| err error | ||
| ) | ||
|
|
||
| // Check if there's an already stored block at the newHeight | ||
| // If there is use that instead of creating a new block | ||
| pendingHeader, pendingData, err := e.getPendingBlock(ctx) | ||
| if err == nil && pendingHeader != nil && pendingHeader.Height() == newHeight { | ||
| // Check if there's an already stored pending block at the newHeight. | ||
| // This handles crash recovery — a previous run may have saved a pending block. | ||
| pendingHeader, pendingData, pendErr := e.getPendingBlock(ctx) | ||
| if pendErr == nil && pendingHeader != nil && pendingHeader.Height() == newHeight { | ||
| e.logger.Info().Uint64("height", newHeight).Msg("using pending block") | ||
| header = pendingHeader | ||
| data = pendingData | ||
| } else if err != nil && !errors.Is(err, datastore.ErrNotFound) { | ||
| return fmt.Errorf("failed to get block data: %w", err) | ||
| } else { | ||
| } else if pendErr != nil && !errors.Is(pendErr, datastore.ErrNotFound) { | ||
| return fmt.Errorf("failed to get block data: %w", pendErr) | ||
| } | ||
|
|
||
| if header == nil { | ||
| // get batch from sequencer | ||
| batchData, err = e.blockProducer.RetrieveBatch(ctx) | ||
| if errors.Is(err, common.ErrNoBatch) { | ||
|
|
@@ -459,8 +485,12 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { | |
| if err != nil { | ||
| return fmt.Errorf("failed to create block: %w", err) | ||
| } | ||
| if err := e.savePendingBlock(ctx, header, data); err != nil { | ||
| return fmt.Errorf("failed to save block data: %w", err) | ||
| // Only persist pending block for raft crash recovery — skip for non-raft | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is wrong, we do want to persist it every time. |
||
| // to avoid serialization + store write overhead on every block. | ||
| if e.raftNode != nil { | ||
| if err := e.savePendingBlock(ctx, header, data); err != nil { | ||
| return fmt.Errorf("failed to save block data: %w", err) | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -485,12 +515,10 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { | |
| } | ||
| header.Signature = signature | ||
|
|
||
| if err := e.blockProducer.ValidateBlock(ctx, currentState, header, data); err != nil { | ||
| e.sendCriticalError(fmt.Errorf("failed to validate block: %w", err)) | ||
| e.logger.Error().Err(err).Msg("CRITICAL: Permanent block validation error - halting block production") | ||
| return fmt.Errorf("failed to validate block: %w", err) | ||
| } | ||
| // ValidateBlock is only needed for blocks we didn't produce (syncer path). | ||
| // On the sequencer, we just built this block — skip self-validation. | ||
|
|
||
| // Prepare store batch synchronously. Only the commit is deferred. | ||
| batch, err := e.store.NewBatch(ctx) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to create batch: %w", err) | ||
|
|
@@ -533,29 +561,43 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { | |
| } | ||
| e.logger.Debug().Uint64("height", newHeight).Msg("proposed block to raft") | ||
| } | ||
| if err := e.deletePendingBlock(batch); err != nil { | ||
| e.logger.Warn().Err(err).Uint64("height", newHeight).Msg("failed to delete pending block metadata") | ||
| if e.raftNode != nil { | ||
| if err := e.deletePendingBlock(batch); err != nil { | ||
| e.logger.Warn().Err(err).Uint64("height", newHeight).Msg("failed to delete pending block metadata") | ||
| } | ||
| } | ||
|
|
||
| // Commit synchronously — DA submitter reads store height. | ||
| if err := batch.Commit(); err != nil { | ||
| return fmt.Errorf("failed to commit batch: %w", err) | ||
| } | ||
|
|
||
| // Update in-memory state after successful commit | ||
| // Update in-memory state after successful commit. | ||
| e.setLastState(newState) | ||
|
|
||
| // broadcast header and data to P2P network | ||
| g, broadcastCtx := errgroup.WithContext(e.ctx) | ||
| g.Go(func() error { | ||
| return e.headerBroadcaster.WriteToStoreAndBroadcast(broadcastCtx, &types.P2PSignedHeader{SignedHeader: header}) | ||
| }) | ||
| g.Go(func() error { | ||
| return e.dataBroadcaster.WriteToStoreAndBroadcast(broadcastCtx, &types.P2PData{Data: data}) | ||
| }) | ||
| if err := g.Wait(); err != nil { | ||
| e.logger.Error().Err(err).Msg("failed to broadcast header and/data") | ||
| // don't fail block production on broadcast error | ||
| } | ||
| // Cache this block for the next CreateBlock call (avoids 2 store reads). | ||
| e.lastBlockMu.Lock() | ||
| e.lastBlockInfo = &lastBlockCacheEntry{ | ||
| height: newHeight, | ||
| headerHash: header.Hash(), | ||
| dataHash: data.Hash(), | ||
| signature: signature, | ||
| } | ||
| e.lastBlockMu.Unlock() | ||
|
|
||
| // P2P broadcast is fire-and-forget — doesn't block next block production. | ||
| go func() { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. EDIT: maybe it isn't preferred as we could broadcast out of order, and this will fail go-header verification. |
||
| g, broadcastCtx := errgroup.WithContext(e.ctx) | ||
| g.Go(func() error { | ||
| return e.headerBroadcaster.WriteToStoreAndBroadcast(broadcastCtx, &types.P2PSignedHeader{SignedHeader: header}) | ||
| }) | ||
| g.Go(func() error { | ||
| return e.dataBroadcaster.WriteToStoreAndBroadcast(broadcastCtx, &types.P2PData{Data: data}) | ||
| }) | ||
| if err := g.Wait(); err != nil { | ||
| e.logger.Error().Err(err).Msg("failed to broadcast header and/data") | ||
| } | ||
| }() | ||
|
|
||
| e.recordBlockMetrics(newState, data) | ||
|
|
||
|
|
@@ -570,7 +612,7 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { | |
| // RetrieveBatch gets the next batch of transactions from the sequencer. | ||
| func (e *Executor) RetrieveBatch(ctx context.Context) (*BatchData, error) { | ||
| req := coresequencer.GetNextBatchRequest{ | ||
| Id: []byte(e.genesis.ChainID), | ||
| Id: e.cachedChainID, | ||
| MaxBytes: common.DefaultMaxBlobSize, | ||
| LastBatchData: [][]byte{}, // Can be populated if needed for sequencer context | ||
| } | ||
|
|
@@ -612,25 +654,40 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba | |
| if height > e.genesis.InitialHeight { | ||
| headerTime = uint64(batchData.UnixNano()) | ||
|
|
||
| lastHeader, lastData, err := e.store.GetBlockData(ctx, height-1) | ||
| if err != nil { | ||
| return nil, nil, fmt.Errorf("failed to get last block: %w", err) | ||
| } | ||
| lastHeaderHash = lastHeader.Hash() | ||
| lastDataHash = lastData.Hash() | ||
| // Try cache first (hot path — avoids 2 store reads). | ||
| e.lastBlockMu.Lock() | ||
| cached := e.lastBlockInfo | ||
| e.lastBlockMu.Unlock() | ||
|
|
||
| if cached != nil && cached.height == height-1 { | ||
| lastHeaderHash = cached.headerHash | ||
| lastDataHash = cached.dataHash | ||
| lastSignature = cached.signature | ||
| } else { | ||
| // Cache miss (first block after restart) — fall back to store. | ||
| lastHeader, lastData, err := e.store.GetBlockData(ctx, height-1) | ||
| if err != nil { | ||
| return nil, nil, fmt.Errorf("failed to get last block: %w", err) | ||
| } | ||
| lastHeaderHash = lastHeader.Hash() | ||
| lastDataHash = lastData.Hash() | ||
|
|
||
| lastSignaturePtr, err := e.store.GetSignature(ctx, height-1) | ||
| if err != nil { | ||
| return nil, nil, fmt.Errorf("failed to get last signature: %w", err) | ||
| lastSignaturePtr, err := e.store.GetSignature(ctx, height-1) | ||
| if err != nil { | ||
| return nil, nil, fmt.Errorf("failed to get last signature: %w", err) | ||
| } | ||
| lastSignature = *lastSignaturePtr | ||
| } | ||
| lastSignature = *lastSignaturePtr | ||
| } | ||
|
|
||
| // Get signer info and validator hash | ||
| // Get signer info and validator hash (cached after first call). | ||
| var pubKey crypto.PubKey | ||
| var validatorHash types.Hash | ||
|
|
||
| if e.signer != nil { | ||
| if e.signerInfoCached { | ||
| pubKey = e.cachedPubKey | ||
| validatorHash = e.cachedValidatorHash | ||
| } else if e.signer != nil { | ||
| var err error | ||
| pubKey, err = e.signer.GetPublic() | ||
| if err != nil { | ||
|
|
@@ -641,13 +698,18 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba | |
| if err != nil { | ||
| return nil, nil, fmt.Errorf("failed to get validator hash: %w", err) | ||
| } | ||
| e.cachedPubKey = pubKey | ||
| e.cachedValidatorHash = validatorHash | ||
| e.signerInfoCached = true | ||
| } else { | ||
| // For based sequencer without signer, use nil pubkey and compute validator hash | ||
| var err error | ||
| validatorHash, err = e.options.ValidatorHasherProvider(e.genesis.ProposerAddress, nil) | ||
| if err != nil { | ||
| return nil, nil, fmt.Errorf("failed to get validator hash: %w", err) | ||
| } | ||
| e.cachedValidatorHash = validatorHash | ||
| e.signerInfoCached = true | ||
| } | ||
|
|
||
| // Create header | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be removed, we are caching at store level, so this isn't an issue (see cached_store.go)