Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/_shared-check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
uses: golangci/golangci-lint-action@4afd733a84b1f43292c63897423277bb7f4313a9 # v8.0.0
with:
# Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version
version: v2.1.6
version: v2.7.2

# Optional: working directory, useful for monorepos
# working-directory: somedir
Expand Down
11 changes: 10 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ linters:
- unparam
- unused
- whitespace
- wsl
- wsl_v5
settings:
errcheck:
check-type-assertions: true
Expand All @@ -47,6 +47,10 @@ linters:
nolintlint:
require-explanation: true
require-specific: true
wsl_v5:
allow-first-in-block: true
allow-whole-block: false
branch-max-lines: 2
exclusions:
generated: lax
presets:
Expand All @@ -58,6 +62,11 @@ linters:
- third_party$
- builtin$
- examples$
rules:
- linters:
- revive
text: "var-naming: avoid meaningless package names"
path: "pkg/coordinator/(types|web/types|web/utils|web/api)/"
formatters:
enable:
- gofmt
Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func main() {
runChan := make(chan bool)

var execErr error

go func() {
execErr = cmd.Execute(ctx)

Expand All @@ -31,6 +32,7 @@ func main() {
case sig := <-signalChan:
log.Printf("Caught signal: %v, shutdown gracefully...", sig)
cancel()

select {
case <-runChan:
// graceful shutdown completed
Expand Down
1 change: 1 addition & 0 deletions pkg/coordinator/clients/consensus/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func (block *Block) GetSeenBy() []*Client {
func (block *Block) SetSeenBy(client *Client) {
block.seenMutex.Lock()
defer block.seenMutex.Unlock()

block.seenMap[client.clientIdx] = client
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/coordinator/clients/consensus/blockcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func NewBlockCache(ctx context.Context, logger logrus.FieldLogger, followDistanc
logger.WithError(err2).Errorf("uncaught panic in BlockCache.runCacheCleanup subroutine: %v, stack: %v", err, string(debug.Stack()))
}
}()

cache.runCacheCleanup(ctx)
}()

Expand Down Expand Up @@ -202,6 +203,7 @@ func (cache *BlockCache) GetWallclock() *ethwallclock.EthereumBeaconChain {

func (cache *BlockCache) SetFinalizedCheckpoint(finalizedEpoch phase0.Epoch, finalizedRoot phase0.Root) {
cache.finalizedMutex.Lock()

if finalizedEpoch <= cache.finalizedEpoch {
cache.finalizedMutex.Unlock()
return
Expand Down
2 changes: 2 additions & 0 deletions pkg/coordinator/clients/consensus/clientlogic.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ func (client *Client) processBlock(root phase0.Root, slot phase0.Slot, header *p
}

client.headMutex.Lock()

if bytes.Equal(client.headRoot[:], root[:]) {
client.headMutex.Unlock()
return nil
Expand All @@ -316,6 +317,7 @@ func (client *Client) processBlock(root phase0.Root, slot phase0.Slot, header *p

func (client *Client) setFinalizedHead(epoch phase0.Epoch, root phase0.Root) error {
client.headMutex.Lock()

if bytes.Equal(client.finalizedRoot[:], root[:]) {
client.headMutex.Unlock()
return nil
Expand Down
1 change: 1 addition & 0 deletions pkg/coordinator/clients/consensus/forks.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type HeadFork struct {
func (pool *Pool) resetHeadForkCache() {
pool.forkCacheMutex.Lock()
defer pool.forkCacheMutex.Unlock()

pool.forkCache = map[int64][]*HeadFork{}
}

Expand Down
46 changes: 45 additions & 1 deletion pkg/coordinator/clients/consensus/rpc/beaconapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ func (bc *BeaconClient) getJSON(ctx context.Context, requrl string, returnValue
}

func (bc *BeaconClient) postJSON(ctx context.Context, requrl string, postData, returnValue interface{}) error {
return bc.postJSONWithHeaders(ctx, requrl, postData, returnValue, nil)
}

func (bc *BeaconClient) postJSONWithHeaders(ctx context.Context, requrl string, postData, returnValue interface{}, extraHeaders map[string]string) error {
logurl := getRedactedURL(requrl)

postDataBytes, err := json.Marshal(postData)
Expand All @@ -125,8 +129,8 @@ func (bc *BeaconClient) postJSON(ctx context.Context, requrl string, postData, r
}

reader := bytes.NewReader(postDataBytes)
req, err := nethttp.NewRequestWithContext(ctx, "POST", requrl, reader)

req, err := nethttp.NewRequestWithContext(ctx, "POST", requrl, reader)
if err != nil {
return err
}
Expand All @@ -137,6 +141,10 @@ func (bc *BeaconClient) postJSON(ctx context.Context, requrl string, postData, r
req.Header.Set(headerKey, headerVal)
}

for headerKey, headerVal := range extraHeaders {
req.Header.Set(headerKey, headerVal)
}

client := &nethttp.Client{Timeout: time.Second * 300}

resp, err := client.Do(req)
Expand Down Expand Up @@ -497,6 +505,42 @@ func (bc *BeaconClient) SubmitProposerSlashing(ctx context.Context, slashing *ph
return nil
}

type apiAttestationData struct {
Data *phase0.AttestationData `json:"data"`
}

func (bc *BeaconClient) GetAttestationData(ctx context.Context, slot, committeeIndex uint64) (*phase0.AttestationData, error) {
var attestationData apiAttestationData

err := bc.getJSON(ctx, fmt.Sprintf("%s/eth/v1/validator/attestation_data?slot=%d&committee_index=%d", bc.endpoint, slot, committeeIndex), &attestationData)
if err != nil {
return nil, fmt.Errorf("error retrieving attestation data: %v", err)
}

return attestationData.Data, nil
}

// SingleAttestation represents the Electra single attestation format for the v2 API.
type SingleAttestation struct {
CommitteeIndex uint64 `json:"committee_index,string"`
AttesterIndex uint64 `json:"attester_index,string"`
Data *phase0.AttestationData `json:"data"`
Signature string `json:"signature"`
}

func (bc *BeaconClient) SubmitAttestations(ctx context.Context, attestations []*SingleAttestation) error {
headers := map[string]string{
"Eth-Consensus-Version": "electra",
}

err := bc.postJSONWithHeaders(ctx, fmt.Sprintf("%s/eth/v2/beacon/pool/attestations", bc.endpoint), attestations, nil, headers)
if err != nil {
return err
}

return nil
}

type NodeIdentity struct {
PeerID string `json:"peer_id"`
ENR string `json:"enr"`
Expand Down
6 changes: 4 additions & 2 deletions pkg/coordinator/clients/consensus/rpc/beaconstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func (bs *BeaconStream) startStream() {
bs.ReadyChan <- true
case err := <-stream.Errors:
logger.WithField("client", bs.client.name).Warnf("beacon block stream error: %v", err)

select {
case bs.ReadyChan <- false:
case <-bs.ctx.Done():
Expand Down Expand Up @@ -136,8 +137,8 @@ func (bs *BeaconStream) subscribeStream(endpoint string, events uint16) *eventst
var stream *eventstream.Stream

streamURL := fmt.Sprintf("%s/eth/v1/events?topics=%v", endpoint, topics.String())
req, err := http.NewRequestWithContext(bs.ctx, "GET", streamURL, http.NoBody)

req, err := http.NewRequestWithContext(bs.ctx, "GET", streamURL, http.NoBody)
if err == nil {
for headerKey, headerVal := range bs.client.headers {
req.Header.Set(headerKey, headerVal)
Expand All @@ -148,6 +149,7 @@ func (bs *BeaconStream) subscribeStream(endpoint string, events uint16) *eventst

if err != nil {
logger.WithField("client", bs.client.name).Warnf("Error while subscribing beacon event stream %v: %v", getRedactedURL(streamURL), err)

select {
case <-bs.ctx.Done():
return nil
Expand All @@ -163,11 +165,11 @@ func (bs *BeaconStream) processBlockEvent(evt eventsource.Event) {
var parsed v1.BlockEvent

err := json.Unmarshal([]byte(evt.Data()), &parsed)

if err != nil {
logger.WithField("client", bs.client.name).Warnf("beacon block stream failed to decode block event: %v", err)
return
}

bs.EventChan <- &BeaconStreamEvent{
Event: StreamBlockEvent,
Data: &parsed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,15 @@ func (stream *Stream) receiveEvents(r io.ReadCloser) {
ev, err := dec.Decode()

stream.closeMutex.Lock()

if stream.isClosed {
stream.closeMutex.Unlock()
return
}

if err != nil {
stream.Errors <- err

stream.closeMutex.Unlock()

return
Expand All @@ -209,6 +211,7 @@ func (stream *Stream) receiveEvents(r io.ReadCloser) {
}

stream.Events <- pub

stream.closeMutex.Unlock()
}
}
Expand Down Expand Up @@ -237,11 +240,14 @@ func (stream *Stream) retryRestartStream() {
}

stream.closeMutex.Lock()

if stream.isClosed {
stream.closeMutex.Unlock()
return
}

stream.Errors <- err

stream.closeMutex.Unlock()

backoff = 10 * time.Second
Expand Down
1 change: 1 addition & 0 deletions pkg/coordinator/clients/consensus/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Dispatcher[T interface{}] struct {
func (d *Dispatcher[T]) Subscribe(capacity int) *Subscription[T] {
d.mutex.Lock()
defer d.mutex.Unlock()

subscription := &Subscription[T]{
channel: make(chan T, capacity),
dispatcher: d,
Expand Down
1 change: 1 addition & 0 deletions pkg/coordinator/clients/execution/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (block *Block) GetSeenBy() []*Client {
func (block *Block) SetSeenBy(client *Client) {
block.seenMutex.Lock()
defer block.seenMutex.Unlock()

block.seenMap[client.clientIdx] = client

if block.seenChan != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/coordinator/clients/execution/blockcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func NewBlockCache(ctx context.Context, logger logrus.FieldLogger, followDistanc
logger.WithError(err2).Errorf("uncaught panic in BlockCache.runCacheCleanup subroutine: %v, stack: %v", err, string(debug.Stack()))
}
}()

cache.runCacheCleanup(ctx)
}()

Expand Down
2 changes: 1 addition & 1 deletion pkg/coordinator/clients/execution/clientlogic.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ func (client *Client) runClientLoop() {

for {
err := client.checkClient()

if err == nil {
err = client.runClientLogic()
}
Expand Down Expand Up @@ -216,6 +215,7 @@ func (client *Client) processBlock(hash common.Hash, number uint64, block *types
}

client.headMutex.Lock()

if bytes.Equal(client.headHash[:], hash[:]) {
client.headMutex.Unlock()
return nil
Expand Down
1 change: 1 addition & 0 deletions pkg/coordinator/clients/execution/forks.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type HeadFork struct {
func (pool *Pool) resetHeadForkCache() {
pool.forkCacheMutex.Lock()
defer pool.forkCacheMutex.Unlock()

pool.forkCache = map[int64][]*HeadFork{}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/coordinator/clients/execution/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ func (pool *Pool) GetBlockCache() *BlockCache {
func (pool *Pool) AddEndpoint(endpoint *ClientConfig) (*Client, error) {
clientIdx := pool.clientCounter
pool.clientCounter++
client, err := pool.newPoolClient(clientIdx, endpoint)

client, err := pool.newPoolClient(clientIdx, endpoint)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/coordinator/clients/execution/rpc/ethconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ func (ec *ExecutionClient) GetEthConfig(ctx context.Context) (*EthConfigResponse
defer reqCtxCancel()

var result json.RawMessage
err := ec.rpcClient.CallContext(reqCtx, &result, "eth_config")

err := ec.rpcClient.CallContext(reqCtx, &result, "eth_config")
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/coordinator/clients/execution/rpc/executionapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func (ec *ExecutionClient) GetEthClient() *ethclient.Client {

func (ec *ExecutionClient) GetClientVersion(ctx context.Context) (string, error) {
var result string

err := ec.rpcClient.CallContext(ctx, &result, "web3_clientVersion")

return result, err
Expand Down
1 change: 1 addition & 0 deletions pkg/coordinator/clients/execution/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Dispatcher[T interface{}] struct {
func (d *Dispatcher[T]) Subscribe(capacity int) *Subscription[T] {
d.mutex.Lock()
defer d.mutex.Unlock()

subscription := &Subscription[T]{
channel: make(chan T, capacity),
dispatcher: d,
Expand Down
6 changes: 4 additions & 2 deletions pkg/coordinator/db/task_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ func (db *Database) UpsertTaskResult(tx *sqlx.Tx, result *TaskResult) error {

func (db *Database) GetTaskResultByIndex(runID, taskID uint64, resultType string, index int) (*TaskResult, error) {
var result TaskResult

err := db.reader.Get(&result, `
SELECT * FROM task_results
WHERE run_id = $1 AND task_id = $2 AND result_type = $3 AND result_index = $4`,
runID, taskID, resultType, index)

if err != nil {
return nil, err
}
Expand All @@ -55,11 +55,11 @@ func (db *Database) GetTaskResultByIndex(runID, taskID uint64, resultType string

func (db *Database) GetTaskResultByName(runID, taskID uint64, resultType, name string) (*TaskResult, error) {
var result TaskResult

err := db.reader.Get(&result, `
SELECT * FROM task_results
WHERE run_id = $1 AND task_id = $2 AND result_type = $3 AND name = $4`,
runID, taskID, resultType, name)

if err != nil {
return nil, err
}
Expand All @@ -69,6 +69,7 @@ func (db *Database) GetTaskResultByName(runID, taskID uint64, resultType, name s

func (db *Database) GetTaskResults(runID, taskID uint64, summaryType string) ([]TaskResult, error) {
var results []TaskResult

err := db.reader.Select(&results, `
SELECT * FROM task_results
WHERE run_id = $1 AND task_id = $2 AND result_type = $3`,
Expand All @@ -79,6 +80,7 @@ func (db *Database) GetTaskResults(runID, taskID uint64, summaryType string) ([]

func (db *Database) GetAllTaskResultHeaders(runID uint64) ([]TaskResultHeader, error) {
var headers []TaskResultHeader

err := db.reader.Select(&headers, `
SELECT task_id, result_type, result_index, name, size FROM task_results
WHERE run_id = $1`,
Expand Down
Loading
Loading