Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions giga/executor/executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package executor

import (
"context"

"github.com/sei-protocol/sei-chain/giga/executor/tracks"
)

type Config struct {
StatelessWorkerCount int
ReceiptSinkWorkerCount int
blocksBuffer int
receiptsBuffer int
changeSetsBuffer int
}

func RunExecutor[RawBlock, Block tracks.Identifiable, Receipt, ChangeSet any](
ctx context.Context,
config Config,
rawBlocks <-chan RawBlock, // channel where the consensus layer produces to
statelessFn func(RawBlock) Block, // TODO: function that checks sig, nonce, etc.
schedulerFn func(Block, chan<- Receipt) ChangeSet, // TODO: main processing logic
commitFn func(ChangeSet), // TODO: commit to working set after a block is fully done
receiptSinkFn func(Receipt), // TODO: persist receipts to disk
historicalStateSinkFn func(ChangeSet), // TODO: persist historical state to disk
prevBlock uint64, // TODO: the last executed block id,
) {
blocks := make(chan Block, config.blocksBuffer)
receipts := make(chan Receipt, config.receiptsBuffer)
changeSets := make(chan ChangeSet, config.changeSetsBuffer)

// spins off `StatelessWorkerCount` goroutines.
statelessTrack := tracks.NewStatelessTrack(rawBlocks, blocks, statelessFn, config.StatelessWorkerCount, prevBlock)
// spins off 1 goroutine.
executionTrack := tracks.NewExecutionTrack(blocks, receipts, changeSets, schedulerFn, commitFn)
// spins off `ReceiptSinkWorkerCount` goroutines.
receiptSinkTrack := tracks.NewReceiptSinkTrack(receipts, receiptSinkFn, config.ReceiptSinkWorkerCount)
// spins off 1 goroutine.
historicalStateSinkTrack := tracks.NewHistoricalStateSinkTrack(changeSets, historicalStateSinkFn)

statelessTrack.Start()
executionTrack.Start()
receiptSinkTrack.Start()
historicalStateSinkTrack.Start()

<-ctx.Done()

statelessTrack.Stop()
executionTrack.Stop()
receiptSinkTrack.Stop()
historicalStateSinkTrack.Stop()
}
34 changes: 34 additions & 0 deletions giga/executor/tracks/execution.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package tracks

type ExecutionTrack[Block, Receipt, ChangeSet any] struct {
blocks <-chan Block
receipts chan<- Receipt
changeSets chan<- ChangeSet
schedulerFn func(Block, chan<- Receipt) ChangeSet
commitFn func(ChangeSet)
}

func NewExecutionTrack[Block, Receipt, ChangeSet any](
blocks <-chan Block,
receipts chan<- Receipt,
changeSets chan<- ChangeSet,
schedulerFn func(Block, chan<- Receipt) ChangeSet,
commitFn func(ChangeSet),
) *ExecutionTrack[Block, Receipt, ChangeSet] {
return &ExecutionTrack[Block, Receipt, ChangeSet]{blocks, receipts, changeSets, schedulerFn, commitFn}
}

func (t *ExecutionTrack[Block, Receipt, ChangeSet]) Start() {
go func() {
for block := range t.blocks {
rcs := t.schedulerFn(block, t.receipts)
t.commitFn(rcs)
t.changeSets <- rcs
}
}()
Comment on lines +22 to +28

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism
}

func (t *ExecutionTrack[Block, Receipt, ChangeSet]) Stop() {
close(t.receipts)
close(t.changeSets)
}
47 changes: 47 additions & 0 deletions giga/executor/tracks/sinks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package tracks

type ReceiptSinkTrack[Receipt any] struct {
receipts <-chan Receipt
sinkFn func(Receipt)
workerCount int
}

func NewReceiptSinkTrack[Receipt any](
receipts <-chan Receipt,
sinkFn func(Receipt),
workerCount int,
) *ReceiptSinkTrack[Receipt] {
return &ReceiptSinkTrack[Receipt]{receipts, sinkFn, workerCount}
}

func (t *ReceiptSinkTrack[Receipt]) Start() {
go func() {
for receipt := range t.receipts {
t.sinkFn(receipt)
}
}()
Comment on lines +18 to +22

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism
}

func (t *ReceiptSinkTrack[Receipt]) Stop() {}

type HistoricalStateSinkTrack[ChangeSet any] struct {
changeSets <-chan ChangeSet
sinkFn func(ChangeSet)
}

func NewHistoricalStateSinkTrack[ChangeSet any](
changeSets <-chan ChangeSet,
sinkFn func(ChangeSet),
) *HistoricalStateSinkTrack[ChangeSet] {
return &HistoricalStateSinkTrack[ChangeSet]{changeSets, sinkFn}
}

func (t *HistoricalStateSinkTrack[ChangeSet]) Start() {
go func() {
for changeSet := range t.changeSets {
t.sinkFn(changeSet)
}
}()
Comment on lines +40 to +44

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism
}

func (t *HistoricalStateSinkTrack[ChangeSet]) Stop() {}
59 changes: 59 additions & 0 deletions giga/executor/tracks/stateless.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package tracks

import (
"sync"
"time"
)

type Identifiable interface {
GetID() uint64
}

type StatelessTrack[RawBlock, Block Identifiable] struct {
inputs <-chan RawBlock
outputs chan<- Block
processFn func(RawBlock) Block
workerCount int
prevBlock uint64
}

func NewStatelessTrack[RawBlock, Block Identifiable](
inputs <-chan RawBlock,
outputs chan<- Block,
processFn func(RawBlock) Block,
workerCount int,
prevBlock uint64,
) *StatelessTrack[RawBlock, Block] {
return &StatelessTrack[RawBlock, Block]{inputs, outputs, processFn, workerCount, prevBlock}
}

func (t *StatelessTrack[RawBlock, Block]) Start() {
// completion signals are used to ensure outputs are in order.
completionSignals := sync.Map{}
lastBlockSignal := make(chan struct{}, 1)
lastBlockSignal <- struct{}{}
completionSignals.Store(t.prevBlock, lastBlockSignal)
for range t.workerCount {
go func() {
for input := range t.inputs {
completionSignal := make(chan struct{}, 1)
completionSignals.Store(input.GetID(), completionSignal)
output := t.processFn(input)
prevCompletionSignal, ok := completionSignals.Load(input.GetID() - 1)
// in practice it's almost impossible for ok == false, but we'll handle it anyway
for !ok {
time.Sleep(10 * time.Millisecond)
prevCompletionSignal, ok = completionSignals.Load(input.GetID() - 1)
}
<-prevCompletionSignal.(chan struct{})
t.outputs <- output
completionSignal <- struct{}{}
completionSignals.Delete(input.GetID() - 1)
}
}()
Comment on lines +37 to +53

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism
}
}

func (t *StatelessTrack[RawBlock, Block]) Stop() {
close(t.outputs)
}
36 changes: 36 additions & 0 deletions giga/executor/tracks/stateless_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package tracks_test

import (
"math/rand"
"testing"
"time"

"github.com/sei-protocol/sei-chain/giga/executor/tracks"
"github.com/tendermint/tendermint/libs/utils/require"
)

type testBlock struct{ id uint64 }

func (b *testBlock) GetID() uint64 { return b.id }

func TestStatelessTrack(t *testing.T) {
inputs := make(chan *testBlock, 100)
outputs := make(chan *testBlock, 100)
processFn := func(input *testBlock) *testBlock {
sleepDuration := time.Duration(rand.Intn(100)) * time.Millisecond
time.Sleep(sleepDuration)
return &testBlock{id: input.GetID()}
}
workerCount := 10
lastBlock := uint64(100)
statelessTrack := tracks.NewStatelessTrack(inputs, outputs, processFn, workerCount, lastBlock)
statelessTrack.Start()
defer statelessTrack.Stop()
for i := range 100 {
inputs <- &testBlock{id: uint64(i) + 1 + lastBlock}
}
for i := range 100 {
output := <-outputs
require.Equal(t, uint64(i)+1+lastBlock, output.GetID())
}
}
Loading