Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions ethmonitor/ethmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/0xsequence/ethkit/util"
"github.com/goware/breaker"
cachestore "github.com/goware/cachestore2"
"github.com/goware/calc"
"github.com/goware/channel"
"github.com/goware/superr"
"github.com/zeebo/xxh3"
Expand Down Expand Up @@ -626,7 +625,7 @@ func (m *Monitor) buildCanonicalChain(ctx context.Context, nextBlock *types.Bloc

// let's always take a pause between any reorg for the polling interval time
// to allow nodes to sync to the correct chain
pause := calc.Max(2*m.options.PollingInterval, 2*time.Second)
pause := max(2*m.options.PollingInterval, 2*time.Second)
time.Sleep(pause)

// Fetch/connect the broken chain backwards by traversing recursively via parent hashes
Expand Down
182 changes: 105 additions & 77 deletions ethreceipts/ethreceipts.go

Large diffs are not rendered by default.

36 changes: 21 additions & 15 deletions ethreceipts/ethreceipts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,22 +199,26 @@ func TestFetchTransactionReceiptBasic(t *testing.T) {
go func(i int, txnHash common.Hash) {
defer wg.Done()

receipt, waitFinality, err := receiptsListener.FetchTransactionReceipt(ctx, txnHash, 7)
receipt, err := receiptsListener.FetchTransactionReceipt(ctx, txnHash, 7)
require.NoError(t, err)
require.NotNil(t, receipt)
require.True(t, receipt.Status() == types.ReceiptStatusSuccessful)
require.False(t, receipt.Final)

t.Logf("=> MINED %d :: %s", i, receipt.TransactionHash().String())

_ = waitFinality
finalReceipt, err := waitFinality(context.Background())
require.NoError(t, err)
require.NotNil(t, finalReceipt)
require.True(t, finalReceipt.Status() == types.ReceiptStatusSuccessful)
require.True(t, finalReceipt.Final)

t.Logf("=> FINAL %d :: %s", i, receipt.TransactionHash().String())
// receipt, waitFinality, err := receiptsListener.FetchTransactionReceiptWithFinality(ctx, txnHash, 7)
// require.NoError(t, err)
// require.NotNil(t, receipt)
// require.True(t, receipt.Status() == types.ReceiptStatusSuccessful)
// require.False(t, receipt.Final)
// t.Logf("=> MINED %d :: %s", i, receipt.TransactionHash().String())
//
// finalReceipt, err := waitFinality(context.Background())
// require.NoError(t, err)
// require.NotNil(t, finalReceipt)
// require.True(t, finalReceipt.Status() == types.ReceiptStatusSuccessful)
// require.True(t, finalReceipt.Final)
// t.Logf("=> FINAL %d :: %s", i, receipt.TransactionHash().String())
}(i, txnHash)
}
wg.Wait()
Expand All @@ -226,7 +230,7 @@ func TestFetchTransactionReceiptBasic(t *testing.T) {
require.Equal(t, 1, monitor.NumSubscribers())

// Testing exhausted filter after maxWait period is unable to find non-existant txn hash
receipt, waitFinality, err := receiptsListener.FetchTransactionReceipt(ctx, ethkit.Hash{1, 2, 3, 4}, 5)
receipt, waitFinality, err := receiptsListener.FetchTransactionReceiptWithFinality(ctx, ethkit.Hash{1, 2, 3, 4}, 5)
require.Error(t, err)
require.True(t, errors.Is(err, ethreceipts.ErrFilterExhausted))
require.Nil(t, receipt)
Expand All @@ -245,7 +249,7 @@ func TestFetchTransactionReceiptBasic(t *testing.T) {
monitor.PurgeHistory()
receiptsListener.PurgeHistory()

receipt, waitFinality, err = receiptsListener.FetchTransactionReceipt(ctx, txnHashes[0])
receipt, waitFinality, err = receiptsListener.FetchTransactionReceiptWithFinality(ctx, txnHashes[0])
require.NoError(t, err)
require.NotNil(t, receipt)
finalReceipt, err = waitFinality(context.Background())
Expand All @@ -256,7 +260,7 @@ func TestFetchTransactionReceiptBasic(t *testing.T) {
// wait enough time, so that the fetched receipt will come as finalized right away
time.Sleep(5 * time.Second)

receipt, waitFinality, err = receiptsListener.FetchTransactionReceipt(ctx, txnHashes[1])
receipt, waitFinality, err = receiptsListener.FetchTransactionReceiptWithFinality(ctx, txnHashes[1])
require.NoError(t, err)
require.NotNil(t, receipt)
require.True(t, receipt.Final)
Expand Down Expand Up @@ -349,7 +353,7 @@ func TestFetchTransactionReceiptBlast(t *testing.T) {
go func(i int, txnHash common.Hash) {
defer wg.Done()

receipt, receiptFinality, err := receiptsListener.FetchTransactionReceipt(ctx, txnHash)
receipt, receiptFinality, err := receiptsListener.FetchTransactionReceiptWithFinality(ctx, txnHash)
assert.NoError(t, err)
assert.NotNil(t, receipt)
assert.True(t, receipt.Status() == types.ReceiptStatusSuccessful)
Expand Down Expand Up @@ -574,7 +578,7 @@ func TestReceiptsListenerERC20(t *testing.T) {
ethreceipts.FilterLogTopic(erc20TransferTopic).Finalize(true).ID(9999).MaxWait(3),

// won't be found..
ethreceipts.FilterFrom(ethkit.Address{}).MaxWait(0).ID(8888),
ethreceipts.FilterFrom(ethkit.Address{1, 2, 3, 4}).MaxWait(0).ID(8888),

// ethreceipts.FilterLogs(func(logs []*types.Log) bool {
// for _, log := range logs {
Expand Down Expand Up @@ -674,6 +678,8 @@ loop:
assert.True(t, found, "looking for matched receipt %s", mr.TransactionHash().String())
}

t.Logf("matchedCount: %d", matchedCount)
t.Logf("erc20Receipts: %d", len(erc20Receipts))
require.Equal(t, matchedCount, len(erc20Receipts)*2)
}

Expand Down
39 changes: 30 additions & 9 deletions ethreceipts/filterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ func FilterTxnHash(txnHash ethkit.Hash) FilterQuery {
// default options for TxnHash filter. Note, other filter conds
// have a different set of defaults.
options: FilterOptions{
Finalize: true,
LimitOne: true,
SearchCache: true,
SearchOnChain: true,
Finalize: true,
LimitOne: true,
SearchCache: true,
QueryOnChainTxnHash: true,

// wait up to NumBlocksToFinality*2 number of blocks between
// filter matches before unsubcribing if no matches occured
Expand Down Expand Up @@ -116,8 +116,13 @@ type FilterQuery interface {
Finalize(bool) FilterQuery
LimitOne(bool) FilterQuery
SearchCache(bool) FilterQuery
SearchOnChain(bool) FilterQuery
QueryOnChainTxnHash(bool) FilterQuery
QueryOnChain(func(context.Context) (*types.Receipt, error)) FilterQuery
MaxWait(int) FilterQuery

// DEPRECATED: please use QueryOnChainTxnHash instead, which is the same thing, renamed to be more clear
// in addition, see new QueryChain(fn) as additional feature.
SearchOnChain(bool) FilterQuery
}

type FilterOptions struct {
Expand All @@ -133,9 +138,14 @@ type FilterOptions struct {
// ..
SearchCache bool

// SearchOnChain will search for txn hash on-chain. This is only useful
// when used in combination with TxnHash filter cond.
SearchOnChain bool
// QueryOnChainTxnHash will query the chain for the txn hash at the start
// of a filter subscription. This is only useful when used in combination with
// TxnHash filter cond, to search for past transactions which may have been
// mined before the filter was created.
QueryOnChainTxnHash bool

// ..
QueryOnChain func(context.Context) (*types.Receipt, error)

// MaxWait filter option waits some number of blocks without a filter match after
// which point will auto-unsubscribe the filter. This is useful to help automatically
Expand Down Expand Up @@ -196,8 +206,19 @@ func (f *filter) SearchCache(searchCache bool) FilterQuery {
return f
}

// DEPRECATED: please use QueryChainForTxnHash instead, which is the same thing, renamed to be more clear
func (f *filter) SearchOnChain(searchOnChain bool) FilterQuery {
f.options.SearchOnChain = searchOnChain
f.options.QueryOnChainTxnHash = searchOnChain
return f
}

func (f *filter) QueryOnChainTxnHash(queryOnChainTxnHash bool) FilterQuery {
f.options.QueryOnChainTxnHash = queryOnChainTxnHash
return f
}

func (f *filter) QueryOnChain(fn func(context.Context) (*types.Receipt, error)) FilterQuery {
f.options.QueryOnChain = fn
return f
}

Expand Down
2 changes: 1 addition & 1 deletion ethreceipts/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (f *finalizer) enqueue(filterID uint64, receipt Receipt, blockNum *big.Int)
txnID := txnHash
if filterID > 0 {
for i := 0; i < 8; i++ {
txnID[i] = txnID[i] + byte(filterID>>i)
txnID[i] = txnID[i] + byte(filterID>>uint(i))
}
}

Expand Down
59 changes: 47 additions & 12 deletions ethreceipts/receipt.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package ethreceipts

import (
"fmt"
"math/big"
"sync/atomic"

"github.com/0xsequence/ethkit"
"github.com/0xsequence/ethkit/ethtxn"
"github.com/0xsequence/ethkit/go-ethereum/common"
"github.com/0xsequence/ethkit/go-ethereum/core"
"github.com/0xsequence/ethkit/go-ethereum/core/types"
Expand All @@ -14,10 +17,14 @@ type Receipt struct {
Final bool // flags that this receipt is finalized
Reorged bool // chain reorged / removed the txn

chainID *big.Int
transaction *types.Transaction
message *core.Message // TODOXXX: this intermediate type is lame.. with new ethrpc we can remove
receipt *types.Receipt
logs []*types.Log

// TODOXXX: this intermediate type is lame.. with new ethrpc we can remove
// NOTE: we only use this for From/To address resolution currently
message atomic.Value
}

func (r *Receipt) Receipt() *types.Receipt {
Expand Down Expand Up @@ -143,24 +150,52 @@ func (r *Receipt) Logs() []*types.Log {
func (r *Receipt) From() common.Address {
if r.receipt != nil {
return r.receipt.From
} else if r.message != nil {
return r.message.From
} else {
return common.Address{}
if msg, _ := r.AsMessage(); msg != nil {
return msg.From
}
}
return common.Address{}
}

func (r *Receipt) To() common.Address {
if r.receipt != nil {
return r.receipt.To
} else if r.message != nil {
to := r.message.To
if to == nil {
return common.Address{}
} else {
return *to
}
} else {
return common.Address{}
if msg, _ := r.AsMessage(); msg != nil {
to := msg.To
if to == nil {
return common.Address{}
} else {
return *to
}
}
}
return common.Address{}
}

func (r *Receipt) AsMessage() (*core.Message, error) {
msg, ok := r.message.Load().(*core.Message)
if !ok {
return nil, fmt.Errorf("ethreceipts: Receipt.message type-assertion fail, unexpected")
}
if msg != nil {
return msg, nil
}

// TODOXXX: avoid using AsMessage as its fairly expensive operation, especially
// to do it for every txn for every filter.
// TODO: in order to do this, we'll have to update ethrpc with a different
// implementation to just use raw types, aka, ethrpc/types.go with Block/Transaction/Receipt/Log ..
txnMsg, err := ethtxn.AsMessage(r.transaction, r.chainID)
if err != nil {
// NOTE: this should never happen, but lets log in case it does. In the
// future, we should just not use go-ethereum for these types.
// l.log.Warn(fmt.Sprintf("unexpected failure of txn (%s index %d) on block %d (total txns=%d) AsMessage(..): %s",
// txn.Hash(), i, block.NumberU64(), len(block.Transactions()), err,
// ))
return nil, err
}
r.message.Store(txnMsg)
return txnMsg, nil
}
6 changes: 3 additions & 3 deletions ethreceipts/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func (s *subscriber) AddFilter(filterQueries ...FilterQuery) {
}

s.mu.Lock()

if len(s.filters)+len(filters) > maxFiltersPerListener {
// too many filters, ignore the extra filter. not ideal, but better than
// deadlocking
Expand All @@ -112,7 +111,6 @@ func (s *subscriber) AddFilter(filterQueries ...FilterQuery) {
s.mu.Unlock()
return
}

s.filters = append(s.filters, filters...)
s.mu.Unlock()

Expand Down Expand Up @@ -143,7 +141,9 @@ func (s *subscriber) ClearFilters() {
s.filters = s.filters[:0]
}

func (s *subscriber) matchFilters(ctx context.Context, filterers []Filterer, receipts []Receipt) ([]bool, error) {
// matchFiltersAndPublish matches the given receipts against the provided filterers,
// fetches any missing receipt data as needed, and notifies the subscriber of matches.
func (s *subscriber) matchFiltersAndPublish(ctx context.Context, filterers []Filterer, receipts []Receipt) ([]bool, error) {
oks := make([]bool, len(filterers))

// Collect matches that need receipt fetching
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ require (
github.com/goware/cachestore-mem v0.2.2
github.com/goware/cachestore-redis v0.2.1
github.com/goware/cachestore2 v0.12.3
github.com/goware/calc v0.2.0
github.com/goware/channel v0.5.0
github.com/goware/pp v0.0.3
github.com/goware/superr v0.0.2
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,6 @@ github.com/goware/cachestore-redis v0.2.1 h1:bMdkzGuy6Dsybq2chv3kRwu8UGoJz3aQBO9
github.com/goware/cachestore-redis v0.2.1/go.mod h1:+8rOAfL1qNLNiXHe8+WACPk+I9kaLOAfArj2Z7FDiWg=
github.com/goware/cachestore2 v0.12.3 h1:V4VODChSAV29p8htHj8Lb36Hvv28CLrJsw49gx0h+ks=
github.com/goware/cachestore2 v0.12.3/go.mod h1:PR+lXK8UXa/wjKB7mpIj6HtRhC7vbcRXx4b5F1Av/ik=
github.com/goware/calc v0.2.0 h1:3B9qjXYpE0kgS4LhyklbM6X/0cOvZLdUZG7sdAuVCb4=
github.com/goware/calc v0.2.0/go.mod h1:BSQUbfS6ICW9RvSV9SikDY+t6/HQKI+CUxIpjE3VD28=
github.com/goware/channel v0.5.0 h1:cOllKceCH5Xhibs0v8jtPJ81ez3L7WpYri/OU+9IBfg=
github.com/goware/channel v0.5.0/go.mod h1:Eai0KCjphDZ44M/qT7G1ZE6lZfywiTFvwV3Xc6cDPdo=
github.com/goware/pp v0.0.3 h1:2Yv0IFGOpVjCDayPYzrqskCe9qmGoKBIyu6Uy//LVUU=
Expand Down
Loading