Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 76 additions & 29 deletions p2p/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"io"
"math/big"
"time"

Expand All @@ -14,6 +15,7 @@ import (
"github.com/ethereum/go-ethereum/eth/protocols/eth"
ethp2p "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -370,16 +372,25 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error {
}

func (c *conn) handleTransactions(ctx context.Context, msg ethp2p.Msg) error {
var txs eth.TransactionsPacket
if err := msg.Decode(&txs); err != nil {
return err
payload, err := io.ReadAll(msg.Payload)
if err != nil {
return fmt.Errorf("failed to read transactions payload: %w", err)
}

var rawTxs []rlp.RawValue
if err := rlp.DecodeBytes(payload, &rawTxs); err != nil {
c.logger.Warn().Err(err).Msg("Failed to decode transactions")
return nil
}

txs := decodeTxs(rawTxs)
tfs := time.Now()

c.countMsgReceived(txs.Name(), float64(len(txs)))
c.countMsgReceived((&eth.TransactionsPacket{}).Name(), float64(len(txs)))

c.db.WriteTransactions(ctx, c.node, txs, tfs)
if len(txs) > 0 {
c.db.WriteTransactions(ctx, c.node, txs, tfs)
}

return nil
}
Expand Down Expand Up @@ -449,18 +460,18 @@ func (c *conn) handleGetBlockBodies(msg ethp2p.Msg) error {
}

func (c *conn) handleBlockBodies(ctx context.Context, msg ethp2p.Msg) error {
var packet eth.BlockBodiesPacket
var packet eth.BlockBodiesRLPPacket
if err := msg.Decode(&packet); err != nil {
return err
}

tfs := time.Now()

if len(packet.BlockBodiesResponse) == 0 {
if len(packet.BlockBodiesRLPResponse) == 0 {
return nil
}

c.countMsgReceived(packet.Name(), float64(len(packet.BlockBodiesResponse)))
c.countMsgReceived((&eth.BlockBodiesPacket{}).Name(), float64(len(packet.BlockBodiesRLPResponse)))

hash, ok := c.requests.Get(packet.RequestId)
if !ok {
Expand All @@ -474,7 +485,18 @@ func (c *conn) handleBlockBodies(ctx context.Context, msg ethp2p.Msg) error {
return nil
}

body := packet.BlockBodiesResponse[0]
var decoded rawBlockBody
if err := rlp.DecodeBytes(packet.BlockBodiesRLPResponse[0], &decoded); err != nil {
c.logger.Warn().Err(err).Msg("Failed to decode block body")
return nil
}

body := &eth.BlockBody{
Transactions: decodeTxs(decoded.Transactions),
Uncles: decoded.Uncles,
Withdrawals: decoded.Withdrawals,
}

c.db.WriteBlockBody(ctx, body, hash, tfs)

// Update cache to store body
Expand All @@ -487,26 +509,39 @@ func (c *conn) handleBlockBodies(ctx context.Context, msg ethp2p.Msg) error {
}

func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error {
var block eth.NewBlockPacket
if err := msg.Decode(&block); err != nil {
return err
payload, err := io.ReadAll(msg.Payload)
if err != nil {
return fmt.Errorf("failed to read new block payload: %w", err)
}

var raw rawNewBlockPacket
if err := rlp.DecodeBytes(payload, &raw); err != nil {
c.logger.Warn().Err(err).Msg("Failed to decode new block")
return nil
}

block := types.NewBlockWithHeader(raw.Block.Header).WithBody(types.Body{
Transactions: decodeTxs(raw.Block.Txs),
Uncles: raw.Block.Uncles,
Withdrawals: raw.Block.Withdrawals,
})
packet := &eth.NewBlockPacket{Block: block, TD: raw.TD}

tfs := time.Now()
hash := block.Block.Hash()
hash := packet.Block.Hash()

c.countMsgReceived(block.Name(), 1)
c.countMsgReceived(packet.Name(), 1)

// Set the head block if newer.
if c.conns.UpdateHeadBlock(block) {
if c.conns.UpdateHeadBlock(*packet) {
c.logger.Info().
Str("hash", hash.Hex()).
Uint64("number", block.Block.Number().Uint64()).
Str("td", block.TD.String()).
Uint64("number", packet.Block.Number().Uint64()).
Str("td", packet.TD.String()).
Msg("Updated head block")
}

if err := c.getParentBlock(ctx, block.Block.Header()); err != nil {
if err := c.getParentBlock(ctx, packet.Block.Header()); err != nil {
return err
}

Expand All @@ -515,17 +550,17 @@ func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error {
return nil
}

c.db.WriteBlock(ctx, c.node, block.Block, block.TD, tfs)
c.db.WriteBlock(ctx, c.node, packet.Block, packet.TD, tfs)

// Update cache to store the full block
c.conns.Blocks().Add(hash, BlockCache{
Header: block.Block.Header(),
Header: packet.Block.Header(),
Body: &eth.BlockBody{
Transactions: block.Block.Transactions(),
Uncles: block.Block.Uncles(),
Withdrawals: block.Block.Withdrawals(),
Transactions: packet.Block.Transactions(),
Uncles: packet.Block.Uncles(),
Withdrawals: packet.Block.Withdrawals(),
},
TD: block.TD,
TD: packet.TD,
})

return nil
Expand All @@ -549,7 +584,7 @@ func (c *conn) handleNewPooledTransactionHashes(version uint, msg ethp2p.Msg) er
var name string

switch version {
case 67, 68:
case 67, 68, 69:
var txs eth.NewPooledTransactionHashesPacket
if err := msg.Decode(&txs); err != nil {
return err
Expand All @@ -572,16 +607,28 @@ func (c *conn) handleNewPooledTransactionHashes(version uint, msg ethp2p.Msg) er
}

func (c *conn) handlePooledTransactions(ctx context.Context, msg ethp2p.Msg) error {
var packet eth.PooledTransactionsPacket
if err := msg.Decode(&packet); err != nil {
return err
payload, err := io.ReadAll(msg.Payload)
if err != nil {
return fmt.Errorf("failed to read pooled transactions payload: %w", err)
}

var raw rawPooledTransactionsPacket
if err := rlp.DecodeBytes(payload, &raw); err != nil {
c.logger.Warn().Err(err).Msg("Failed to decode pooled transactions")
return nil
}

packet := &eth.PooledTransactionsPacket{
PooledTransactionsResponse: decodeTxs(raw.Txs),
}

tfs := time.Now()

c.countMsgReceived(packet.Name(), float64(len(packet.PooledTransactionsResponse)))

c.db.WriteTransactions(ctx, c.node, packet.PooledTransactionsResponse, tfs)
if len(packet.PooledTransactionsResponse) > 0 {
c.db.WriteTransactions(ctx, c.node, packet.PooledTransactionsResponse, tfs)
}

return nil
}
Expand Down
80 changes: 80 additions & 0 deletions p2p/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,22 @@ package p2p
import (
"crypto/ecdsa"
"fmt"
"math/big"
"slices"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/stateless"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/rlpx"
"github.com/ethereum/go-ethereum/rlp"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

type Message interface {
Expand Down Expand Up @@ -402,3 +406,79 @@ func (c *rlpxConn) hasCap(name string, version uint) bool {
cap := p2p.Cap{Name: name, Version: version}
return slices.Contains(c.peerCaps, cap) && slices.Contains(c.caps, cap)
}

// rawBlockBody is used to decode block bodies with any transaction type.
type rawBlockBody struct {
Transactions []rlp.RawValue
Uncles []*types.Header
Withdrawals []*types.Withdrawal `rlp:"optional"`
}

// rawBlock is used to decode blocks with any transaction type.
type rawBlock struct {
Header *types.Header
Txs []rlp.RawValue
Uncles []*types.Header
Withdrawals []*types.Withdrawal `rlp:"optional"`
}

// rawNewBlockPacket is used to decode NewBlockMsg with any transaction type.
type rawNewBlockPacket struct {
Block rawBlock
TD *big.Int
}

// rawPooledTransactionsPacket is used to decode PooledTransactionsMsg with any transaction type.
type rawPooledTransactionsPacket struct {
RequestId uint64
Txs []rlp.RawValue
}

// decodeTx attempts to decode a transaction from an RLP-encoded raw value.
func decodeTx(raw []byte) *types.Transaction {
if len(raw) == 0 {
return nil
}

var bytes []byte
if rlp.DecodeBytes(raw, &bytes) == nil {
tx := new(types.Transaction)
if tx.UnmarshalBinary(bytes) == nil {
return tx
}

log.Warn().
Uint8("type", bytes[0]).
Int("size", len(bytes)).
Str("hash", crypto.Keccak256Hash(bytes).Hex()).
Msg("Failed to decode transaction")

return nil
}

tx := new(types.Transaction)
if tx.UnmarshalBinary(raw) == nil {
return tx
}

log.Warn().
Uint8("prefix", raw[0]).
Int("size", len(raw)).
Str("hash", crypto.Keccak256Hash(raw).Hex()).
Msg("Failed to decode transaction")

return nil
}

// decodeTxs decodes a list of transactions, returning only successfully decoded ones.
func decodeTxs(rawTxs []rlp.RawValue) []*types.Transaction {
var txs []*types.Transaction

for _, raw := range rawTxs {
if tx := decodeTx(raw); tx != nil {
txs = append(txs, tx)
}
}

return txs
}