Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions pdp/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math/big"
"time"

"github.com/data-preservation-programs/go-synapse/constants"
"github.com/data-preservation-programs/go-synapse/contracts"
Expand All @@ -20,6 +21,8 @@ import (
// createDataSet. defined in Fees.sol as SYBIL_FEE.
var SybilFee = big.NewInt(100000000000000000) // 0.1 FIL in attoFIL

const defaultReceiptTimeout = 90 * time.Second

// ProofSetManager provides high-level operations for managing PDP proof sets
type ProofSetManager interface {
// CreateProofSet creates a new proof set on-chain
Expand Down Expand Up @@ -229,7 +232,7 @@ func (m *Manager) CreateProofSet(ctx context.Context, opts CreateProofSetOptions
// Mark as sent only after successful contract call
txSent = true

receipt, err := txutil.WaitForReceipt(ctx, m.client, tx.Hash(), txutil.DefaultRetryConfig().MaxBackoff*3)
receipt, err := txutil.WaitForReceipt(ctx, m.client, tx.Hash(), defaultReceiptTimeout)
if err != nil {
// Error waiting for receipt - transaction may be pending, don't release nonce
return nil, fmt.Errorf("failed to wait for receipt: %w", err)
Expand Down Expand Up @@ -355,7 +358,7 @@ func (m *Manager) AddRoots(ctx context.Context, proofSetID *big.Int, roots []Roo
// Mark as sent only after successful contract call
txSent = true

receipt, err := txutil.WaitForReceipt(ctx, m.client, tx.Hash(), txutil.DefaultRetryConfig().MaxBackoff*3)
receipt, err := txutil.WaitForReceipt(ctx, m.client, tx.Hash(), defaultReceiptTimeout)
if err != nil {
// Error waiting for receipt - transaction may be pending, don't release nonce
return nil, fmt.Errorf("failed to wait for receipt: %w", err)
Expand Down Expand Up @@ -436,7 +439,7 @@ func (m *Manager) DeleteProofSet(ctx context.Context, proofSetID *big.Int, extra
// Mark as sent only after successful contract call
txSent = true

_, err = txutil.WaitForReceipt(ctx, m.client, tx.Hash(), txutil.DefaultRetryConfig().MaxBackoff*3)
_, err = txutil.WaitForReceipt(ctx, m.client, tx.Hash(), defaultReceiptTimeout)
if err != nil {
// Error waiting for receipt - transaction may be pending, don't release nonce
return fmt.Errorf("failed to wait for receipt: %w", err)
Expand Down
123 changes: 37 additions & 86 deletions pkg/txutil/confirmation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/ethereum/go-ethereum"
Expand All @@ -12,22 +13,17 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
)

// Error types for receipt waiting
var (
// ErrReceiptTimeout is returned when waiting for a receipt times out
ErrReceiptTimeout = errors.New("timeout waiting for transaction receipt")
// ErrReceiptRPCFailure is returned when too many consecutive RPC errors occur
ErrReceiptTimeout = errors.New("timeout waiting for transaction receipt")
ErrReceiptRPCFailure = errors.New("receipt fetch failed due to repeated RPC errors")
)

// ReceiptWaitConfig configures the WaitForReceipt behavior
type ReceiptWaitConfig struct {
Timeout time.Duration // Total timeout for waiting (default: 5 minutes)
PollInterval time.Duration // How often to poll (default: 1 second)
MaxConsecutiveErrors int // Max consecutive RPC errors before failing (default: 5)
Timeout time.Duration
PollInterval time.Duration
MaxConsecutiveErrors int
}

// DefaultReceiptWaitConfig returns the default configuration
func DefaultReceiptWaitConfig() ReceiptWaitConfig {
return ReceiptWaitConfig{
Timeout: 5 * time.Minute,
Expand All @@ -36,78 +32,8 @@ func DefaultReceiptWaitConfig() ReceiptWaitConfig {
}
}

// WaitForConfirmation waits for a transaction to be confirmed with the specified number of confirmations
func WaitForConfirmation(ctx context.Context, client *ethclient.Client, txHash common.Hash, confirmations uint64) (*types.Receipt, error) {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()

consecutiveErrors := 0
pollCount := 0
var lastErr error

for {
select {
case <-ctx.Done():
if lastErr != nil {
return nil, fmt.Errorf("%w after %d polls: %v (last error: %v)", ErrReceiptTimeout, pollCount, ctx.Err(), lastErr)
}
return nil, fmt.Errorf("%w after %d polls: %v", ErrReceiptTimeout, pollCount, ctx.Err())
case <-ticker.C:
pollCount++
receipt, err := client.TransactionReceipt(ctx, txHash)
if err != nil {
// Distinguish between "not found yet" and actual RPC errors
if errors.Is(err, ethereum.NotFound) {
// Transaction not mined yet - this is expected, reset error counter
consecutiveErrors = 0
continue
}
if !IsRetryableError(err) {
return nil, fmt.Errorf("%w: non-retryable error: %v", ErrReceiptRPCFailure, err)
}
// Actual RPC error
consecutiveErrors++
lastErr = err
if consecutiveErrors >= 5 {
return nil, fmt.Errorf("%w: %d consecutive errors, last error: %v", ErrReceiptRPCFailure, consecutiveErrors, lastErr)
}
continue
}

consecutiveErrors = 0

if receipt.Status != types.ReceiptStatusSuccessful {
return receipt, fmt.Errorf("transaction failed with status %d", receipt.Status)
}

if confirmations == 0 {
return receipt, nil
}

currentBlock, err := client.BlockNumber(ctx)
if err != nil {
if !IsRetryableError(err) {
return nil, fmt.Errorf("%w: non-retryable error: %v", ErrReceiptRPCFailure, err)
}
consecutiveErrors++
lastErr = err
if consecutiveErrors >= 5 {
return nil, fmt.Errorf("%w: %d consecutive errors, last error: %v", ErrReceiptRPCFailure, consecutiveErrors, lastErr)
}
continue
}

consecutiveErrors = 0

if receipt.BlockNumber.Uint64()+confirmations <= currentBlock {
return receipt, nil
}
}
}
}

// WaitForReceipt waits for a transaction receipt without confirmation requirements.
// Uses a default timeout of 5 minutes. For custom configuration, use WaitForReceiptWithConfig.
// WaitForReceipt polls until the receipt for txHash is available or timeout
// elapses. Default timeout is 5 minutes when timeout is zero.
func WaitForReceipt(ctx context.Context, client *ethclient.Client, txHash common.Hash, timeout time.Duration) (*types.Receipt, error) {
config := DefaultReceiptWaitConfig()
if timeout > 0 {
Expand All @@ -116,7 +42,6 @@ func WaitForReceipt(ctx context.Context, client *ethclient.Client, txHash common
return WaitForReceiptWithConfig(ctx, client, txHash, config)
}

// WaitForReceiptWithConfig waits for a transaction receipt with custom configuration
func WaitForReceiptWithConfig(ctx context.Context, client *ethclient.Client, txHash common.Hash, config ReceiptWaitConfig) (*types.Receipt, error) {
ctx, cancel := context.WithTimeout(ctx, config.Timeout)
defer cancel()
Expand Down Expand Up @@ -148,16 +73,14 @@ func WaitForReceiptWithConfig(ctx context.Context, client *ethclient.Client, txH
pollCount++
receipt, err := client.TransactionReceipt(ctx, txHash)
if err != nil {
// Distinguish between "not found yet" and actual RPC errors
if errors.Is(err, ethereum.NotFound) {
// Transaction not mined yet - this is expected, reset error counter
// not mined yet -- expected, reset error counter
consecutiveErrors = 0
continue
}
if !IsRetryableError(err) {
if !isRetryableError(err) {
return nil, fmt.Errorf("%w: non-retryable error: %v", ErrReceiptRPCFailure, err)
}
// Actual RPC error
consecutiveErrors++
lastErr = err
if consecutiveErrors >= maxErrors {
Expand All @@ -173,3 +96,31 @@ func WaitForReceiptWithConfig(ctx context.Context, client *ethclient.Client, txH
}
}
}

// isRetryableError returns true for transient RPC errors worth retrying.
// Matches by string fragment because go-ethereum surfaces these as plain errors.
func isRetryableError(err error) bool {
if err == nil {
return false
}
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return false
}

errStr := strings.ToLower(err.Error())
for _, retryable := range []string{
"nonce too low",
"replacement transaction underpriced",
"already known",
"timeout",
"connection refused",
"connection reset",
"broken pipe",
"i/o timeout",
} {
if strings.Contains(errStr, retryable) {
return true
}
}
return false
}
70 changes: 70 additions & 0 deletions pkg/txutil/confirmation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package txutil

import (
"context"
"errors"
"testing"
)

func TestIsRetryableError(t *testing.T) {
tests := []struct {
name string
err error
expected bool
}{
{
name: "nil error",
err: nil,
expected: false,
},
{
name: "nonce too low",
err: errors.New("nonce too low"),
expected: true,
},
{
name: "replacement transaction underpriced",
err: errors.New("replacement transaction underpriced"),
expected: true,
},
{
name: "already known",
err: errors.New("already known"),
expected: true,
},
{
name: "timeout error",
err: errors.New("timeout occurred"),
expected: true,
},
{
name: "connection refused",
err: errors.New("connection refused"),
expected: true,
},
{
name: "non-retryable error",
err: errors.New("insufficient funds"),
expected: false,
},
{
name: "context deadline exceeded",
err: context.DeadlineExceeded,
expected: false,
},
{
name: "context canceled",
err: context.Canceled,
expected: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := isRetryableError(tt.err)
if result != tt.expected {
t.Errorf("isRetryableError() = %v, want %v", result, tt.expected)
}
})
}
}
60 changes: 7 additions & 53 deletions pkg/txutil/nonce.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ package txutil
import (
"context"
"fmt"
"math/big"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
)

// NonceManager manages nonces for transaction sending
// NonceManager allocates and tracks transaction nonces for a single sender.
type NonceManager struct {
client *ethclient.Client
address common.Address
Expand All @@ -19,7 +18,6 @@ type NonceManager struct {
pendingTxs map[uint64]bool
}

// NewNonceManager creates a new nonce manager
func NewNonceManager(client *ethclient.Client, address common.Address) *NonceManager {
return &NonceManager{
client: client,
Expand All @@ -28,7 +26,8 @@ func NewNonceManager(client *ethclient.Client, address common.Address) *NonceMan
}
}

// GetNonce returns the next available nonce
// GetNonce returns the next available nonce, fetching from the network on
// first call (or after MarkFailed clears the cache).
func (nm *NonceManager) GetNonce(ctx context.Context) (uint64, error) {
nm.mu.Lock()
defer nm.mu.Unlock()
Expand All @@ -48,65 +47,20 @@ func (nm *NonceManager) GetNonce(ctx context.Context) (uint64, error) {
return currentNonce, nil
}

// MarkConfirmed marks a nonce as confirmed (transaction mined)
func (nm *NonceManager) MarkConfirmed(nonce uint64) {
nm.mu.Lock()
defer nm.mu.Unlock()
delete(nm.pendingTxs, nonce)
}

// MarkFailed releases a nonce that was never successfully sent to the network.
// This should be called when a transaction fails before being sent (e.g., gas estimation
// failure, signing error) to prevent nonce leaks that would block future transactions.
//
// IMPORTANT: Only call this for local failures before the transaction is sent.
// Do NOT call this for network errors after sending - those transactions may still
// be pending in the mempool and should be tracked until confirmed or replaced.
//
// The cached nonce is cleared to force a refresh from the network on the next GetNonce.
// Call only for local failures before send (gas estimation, signing); not for
// network errors after sending, since those tx may still be pending in the
// mempool. The cached nonce is cleared so the next GetNonce refreshes from
// the network.
func (nm *NonceManager) MarkFailed(nonce uint64) {
nm.mu.Lock()
defer nm.mu.Unlock()
delete(nm.pendingTxs, nonce)
nm.nonce = nil
}

// Reset resets the nonce manager (fetches fresh nonce from network)
func (nm *NonceManager) Reset(ctx context.Context) error {
nm.mu.Lock()
defer nm.mu.Unlock()

nonce, err := nm.client.PendingNonceAt(ctx, nm.address)
if err != nil {
return fmt.Errorf("failed to reset nonce: %w", err)
}

nm.nonce = &nonce
nm.pendingTxs = make(map[uint64]bool)
return nil
}

// GetPendingCount returns the number of pending transactions
func (nm *NonceManager) GetPendingCount() int {
nm.mu.Lock()
defer nm.mu.Unlock()
return len(nm.pendingTxs)
}

// GetFreshNonce gets a fresh nonce from the network without caching
func GetFreshNonce(ctx context.Context, client *ethclient.Client, address common.Address) (uint64, error) {
nonce, err := client.PendingNonceAt(ctx, address)
if err != nil {
return 0, fmt.Errorf("failed to get nonce: %w", err)
}
return nonce, nil
}

// GetChainID returns the chain ID from the client
func GetChainID(ctx context.Context, client *ethclient.Client) (*big.Int, error) {
chainID, err := client.ChainID(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get chain ID: %w", err)
}
return chainID, nil
}
Loading
Loading