Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ batch-export is a tool to retrieve Ethereum event logs for specific contracts, p
- Handles large block ranges by querying in smaller chunks.
- Supports rate limiting for RPC requests.
- Saves retrieved logs to a specified output file (default: `export.ndjson`) in NDJSON format.
- Supports file compression using **Gzip**, **Zstd**, or **Xz** (LZMA2).
- Graceful shutdown on interrupt signals (Ctrl+C).

## Requirements
Expand All @@ -34,7 +35,7 @@ The primary command is export.
```sh
./dist/batch-export export \
--block-range-limit=10000 \
--compress=true \
--compression-algo=zstd \
--end=0 \
--endpoint <YOUR_GNOSIS_RPC_ENDPOINT>
```
Expand All @@ -43,7 +44,8 @@ The primary command is export.

```sh
-b, --block-range-limit uint32 Max blocks per log query (default 5)
-c, --compress Compress to GZIP
-c, --compress Compress to GZIP (deprecated, use --compression-algo)
--compression-algo string Compression algorithm (gzip, zstd, xz, none) (default "gzip")
--end uint End block (optional, uses latest block if 0) (default 39810670)
-e, --endpoint string Ethereum RPC endpoint URL
-h, --help help for export
Expand Down
33 changes: 27 additions & 6 deletions cmd/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"sync"
"time"

"github.com/ethersphere/batch-export/pkg/compressor"
ethclient "github.com/ethersphere/batch-export/pkg/ethclientwrapper"
"github.com/ethersphere/batch-export/pkg/eventfetcher"
"github.com/ethersphere/batch-export/pkg/filestore"
"github.com/ethersphere/batch-export/pkg/gzipstore"
"github.com/ethersphere/bee/v2/pkg/config"
"github.com/ethersphere/bee/v2/pkg/util/abiutil"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -91,12 +91,32 @@ The process can be interrupted at any time (Ctrl+C), and it will attempt to save
}()

compressFunc := func() error {
algo, _ := cmd.Flags().GetString("compression-algo")
if compress {
if err := gzipstore.CompressFile(outputFile, outputFile+".gzip"); err != nil {
return fmt.Errorf("error compressing file: %w", err)
}
c.log.Info("File compressed", "outputFile", outputFile+".gzip")
algo = "gzip" // backward compatibility
}

if algo == "none" {
return nil
}

outputExt := ""
switch algo {
case "gzip":
outputExt = ".gzip"
case "zstd":
outputExt = ".zst"
case "xz":
outputExt = ".xz"
default:
return fmt.Errorf("unsupported compression algorithm: %s", algo)
}

compressedFile := outputFile + outputExt
if err := compressor.CompressFile(outputFile, compressedFile, algo); err != nil {
return fmt.Errorf("error compressing file: %w", err)
}
c.log.Info("File compressed", "outputFile", compressedFile)
return nil
}

Expand Down Expand Up @@ -138,7 +158,8 @@ The process can be interrupted at any time (Ctrl+C), and it will attempt to save
cmd.Flags().IntVarP(&maxRequest, "max-request", "m", 15, "Max RPC requests/sec")
cmd.Flags().Uint32VarP(&blockRangeLimit, "block-range-limit", "b", 5, "Max blocks per log query")
cmd.Flags().StringVarP(&outputFile, "output", "o", "export.ndjson", "Output file path (NDJSON)")
cmd.Flags().BoolVarP(&compress, "compress", "c", false, "Compress to GZIP")
cmd.Flags().BoolVarP(&compress, "compress", "c", false, "Compress to GZIP (deprecated, use --compression-algo)")
cmd.Flags().String("compression-algo", "gzip", "Compression algorithm (gzip, zstd, xz, none)")

c.root.AddCommand(cmd)

Expand Down
12 changes: 12 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,22 @@ go 1.24.0
require (
github.com/ethereum/go-ethereum v1.14.5
github.com/ethersphere/bee/v2 v2.5.0
github.com/klauspost/compress v1.17.6
github.com/spf13/cobra v1.9.1
golang.org/x/time v0.11.0
)

require (
github.com/stretchr/testify v1.8.4
github.com/ulikunitz/xz v0.5.15
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

require (
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/StackExchange/wmi v1.2.1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+F
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
github.com/tyler-smith/go-bip39 v1.1.0 h1:5eUemwrMargf3BSLRRCalXT93Ns6pQJIjYQN2nyfOP8=
github.com/tyler-smith/go-bip39 v1.1.0/go.mod h1:gUYDtqQw1JS3ZJ8UWVcGTGqqr6YIN3CWg+kkNaLt55U=
github.com/ulikunitz/xz v0.5.15 h1:9DNdB5s+SgV3bQ2ApL10xRc35ck0DuIX/isZvIk+ubY=
github.com/ulikunitz/xz v0.5.15/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs=
github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=
Expand Down Expand Up @@ -204,6 +206,8 @@ golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
Expand Down
53 changes: 53 additions & 0 deletions pkg/compressor/compressor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package compressor

import (
"compress/gzip"
"fmt"
"io"
"os"

"github.com/klauspost/compress/zstd"
"github.com/ulikunitz/xz"
)

// CompressFile compresses the file at inputFilePath into outputFilePath using
// the given algorithm ("gzip", "zstd", or "xz"). The input file is left
// untouched. Errors from finalizing the compressed stream are propagated:
// for all three codecs, Close flushes buffered data, so ignoring its error
// could silently leave a truncated archive on disk.
func CompressFile(inputFilePath string, outputFilePath string, algo string) error {
	inputFile, err := os.Open(inputFilePath)
	if err != nil {
		return fmt.Errorf("failed to open input file '%s': %w", inputFilePath, err)
	}
	defer inputFile.Close()

	outputFile, err := os.Create(outputFilePath)
	if err != nil {
		return fmt.Errorf("failed to create output file '%s': %w", outputFilePath, err)
	}
	// Deferred Close covers early-error returns; the success path closes
	// explicitly below so the error can be reported.
	defer outputFile.Close()

	var writer io.WriteCloser
	switch algo {
	case "gzip":
		writer = gzip.NewWriter(outputFile)
	case "zstd":
		writer, err = zstd.NewWriter(outputFile)
		if err != nil {
			return fmt.Errorf("failed to create zstd writer: %w", err)
		}
	case "xz":
		writer, err = xz.NewWriter(outputFile)
		if err != nil {
			return fmt.Errorf("failed to create xz writer: %w", err)
		}
	default:
		return fmt.Errorf("unsupported compression algorithm: %s", algo)
	}

	if _, err := io.Copy(writer, inputFile); err != nil {
		writer.Close() // best effort; the copy error is the primary failure
		return fmt.Errorf("failed to compress data: %w", err)
	}

	// Close flushes any remaining compressed data; a bare `defer writer.Close()`
	// would discard this error and could report success on a corrupt output.
	if err := writer.Close(); err != nil {
		return fmt.Errorf("failed to finalize compressed stream: %w", err)
	}

	if err := outputFile.Close(); err != nil {
		return fmt.Errorf("failed to close output file '%s': %w", outputFilePath, err)
	}

	return nil
}
51 changes: 51 additions & 0 deletions pkg/compressor/compressor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package compressor_test

import (
	"fmt"
	"os"
	"path/filepath"
	"testing"

	"github.com/ethersphere/batch-export/pkg/compressor"
	"github.com/stretchr/testify/assert"
)

// TestCompressionSizes verifies that every supported algorithm produces a
// valid archive that is smaller than the highly repetitive NDJSON input,
// and prints a size comparison for manual inspection.
func TestCompressionSizes(t *testing.T) {
	// NDJSON-style test data resembling exported event logs. Each record is
	// newline-terminated so the fixture is actually valid NDJSON.
	line := []byte(`{"address":"0x123","block":100,"event":"Transfer","data":"0xabc"}` + "\n")
	inputContent := make([]byte, 0, 1000*len(line))
	for i := 0; i < 1000; i++ {
		inputContent = append(inputContent, line...)
	}

	// t.TempDir is cleaned up automatically and keeps test artifacts out of
	// the working directory.
	dir := t.TempDir()
	inputFile := filepath.Join(dir, "test_input.json")
	err := os.WriteFile(inputFile, inputContent, 0o644)
	assert.NoError(t, err)

	algos := []string{"gzip", "zstd", "xz"}

	fmt.Printf("\n--- Compression Size Comparison (Input size: %d bytes) ---\n", len(inputContent))

	for _, algo := range algos {
		outputFile := filepath.Join(dir, "test_output."+algo)

		err := compressor.CompressFile(inputFile, outputFile, algo)
		assert.NoError(t, err)

		info, err := os.Stat(outputFile)
		assert.NoError(t, err)

		// Highly repetitive input must shrink under any working codec; a
		// non-shrinking output indicates a broken writer for that algorithm.
		assert.Less(t, info.Size(), int64(len(inputContent)), "algo %s did not shrink the input", algo)

		fmt.Printf("%-5s: %d bytes (%.2f%% of original)\n", algo, info.Size(), float64(info.Size())/float64(len(inputContent))*100)
	}
	fmt.Println("----------------------------------------------------------")
}