Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
80 commits
Select commit Hold shift + click to select a range
49265aa
feat: vendor sink/sql from substreams-sink-sql@c05b15e (batch 1/8)
maoueh May 1, 2026
b67ef6a
feat: vendor sink/sql from substreams-sink-sql@c05b15e (batch 2/8)
maoueh May 1, 2026
092fbe9
feat: vendor sink/sql from substreams-sink-sql@c05b15e (batch 3/8)
maoueh May 1, 2026
b5c00e5
feat: vendor sink/sql from substreams-sink-sql@c05b15e (batch 4/8)
maoueh May 1, 2026
a786683
sink/sql: add db_proto sinker, sinker_factory, database, postgres, sc…
maoueh May 1, 2026
5c59d4d
sink/sql: add postgres database and dialect files
maoueh May 1, 2026
ba43590
sink/sql: add clickhouse decimal and integer utility files
maoueh May 1, 2026
829411c
sink/sql: add clickhouse database, dialect, and types files
maoueh May 1, 2026
504eb1b
sink/sql: add postgres dialect for db_changes
maoueh May 1, 2026
927931f
sink/sql: add clickhouse accumulator inserter
maoueh May 1, 2026
0138ec5
sink/sql: add pb services proto
maoueh May 1, 2026
2054915
sink/sql: add pb deprecated proto
maoueh May 1, 2026
3ef3649
Add substreams sink postgres and clickhouse CLI commands
maoueh May 1, 2026
e77ab8d
Add go.mod direct dependencies for sink/sql packages
maoueh May 1, 2026
671a468
Add vendored schema.pb.go for sink/sql package
maoueh May 1, 2026
4cb1c49
feat: add sink postgres and sink clickhouse CLI subcommands
maoueh May 1, 2026
6f30f5f
fix: replace btcutil/base58 with mr-tron/base58 (already in go.mod)
maoueh May 1, 2026
7e65311
fix: register postgres driver in db package (mirrors clickhouse regis…
maoueh May 1, 2026
8848fe9
fix: resolve CI compilation errors in sink/sql
maoueh May 1, 2026
6e44e91
chore: add local copy of database.pb.go from substreams-sink-database…
maoueh May 1, 2026
fb1f0dd
feat: embed substreams-sink-sql source under sink/sql/ (batch 1)
maoueh May 1, 2026
59b1f36
feat: embed substreams-sink-sql source under sink/sql/ (batch 2)
maoueh May 1, 2026
543977d
feat: embed substreams-sink-sql source under sink/sql/ (batch 3)
maoueh May 1, 2026
4f1f0e3
feat: embed substreams-sink-sql source under sink/sql/ (batch 4)
maoueh May 1, 2026
e807f92
feat: embed substreams-sink-sql source under sink/sql/ (batch 5 - db …
maoueh May 1, 2026
c646cc8
feat: embed substreams-sink-sql source under sink/sql/ (batch 6 - pos…
maoueh May 1, 2026
a3fbf1e
feat: embed substreams-sink-sql source under sink/sql/ (batch 7 - cli…
maoueh May 1, 2026
6cef49b
feat: push pb files to correct import path locations
maoueh May 1, 2026
b6b29f7
feat: push deprecated.pb.go to correct import path location
maoueh May 1, 2026
fb8fbf2
feat: push database.pb.go to correct import path location
maoueh May 1, 2026
0ecf45c
Add schema.pb.go to correct pb path
maoueh May 1, 2026
3073836
Remove wrong-path pb files (services)
maoueh May 1, 2026
35a5217
Remove wrong-path pb files (schema)
maoueh May 1, 2026
ce5697b
Remove wrong-path pb files (deprecated)
maoueh May 1, 2026
e1a1612
Remove wrong-path pb files (database)
maoueh May 1, 2026
757e53f
sink/sql: add missing dialect.go and services package files
maoueh May 1, 2026
431fd4b
fix: correct import path in accumulator_inserter.go (pb/sf not sink/s…
maoueh May 1, 2026
3363867
feat: add sink/sql packages - stats, context, constraint, inserter, u…
maoueh May 1, 2026
de564a3
feat: add db_proto support files, shared.go, bytes/encoding.go
maoueh May 1, 2026
bc7c7ad
feat: add db_proto sql schema files and proto/utils.go
maoueh May 1, 2026
7588df8
feat: add db_proto sinker, sinker_factory, sql/database, postgres files
maoueh May 1, 2026
d4c83bf
feat: add postgres dialect, accumulator_inserter, row_inserter
maoueh May 1, 2026
e2dc80e
feat: add clickhouse database.go and decimal.go
maoueh May 1, 2026
6850a4e
feat: add clickhouse dialect.go and types.go
maoueh May 1, 2026
942d24f
feat: add clickhouse accumulator_inserter.go
maoueh May 1, 2026
e58ebc3
feat: add db_changes/db/dialect.go
maoueh May 1, 2026
21cdb5f
feat: add db_changes/db/{dialect,operations,flush,metrics}.go
maoueh May 1, 2026
151bc0e
feat: add db_changes/db/{ops,testing,types,types_enum,user}.go
maoueh May 1, 2026
ae1b56f
feat: add db_changes/sinker/{metrics,stats,factory}.go
maoueh May 1, 2026
0f2dfd3
feat: add db_changes/db/{cursor,db}.go
maoueh May 1, 2026
1d11e1b
feat: add db_changes/bundler/writer/{common,interface,types}.go
maoueh May 1, 2026
a3af805
feat: add db_changes/bundler/{bundler,encoder,stats}.go
maoueh May 1, 2026
5cecd92
feat: add db_changes/bundler/writer/buffered.go
maoueh May 1, 2026
bb6d829
feat: add db_changes/sinker/sinker.go
maoueh May 1, 2026
fef1548
feat: add db_changes/sinker/setup.go
maoueh May 1, 2026
0b10979
feat: add db_changes/state/{file,interface}.go
maoueh May 1, 2026
fb1419f
feat: add db_changes/db/dsn.go
maoueh May 1, 2026
e53b4f6
feat: add db_changes/db/{dialect_postgres,dialect_clickhouse}.go
maoueh May 1, 2026
7cb11aa
feat: add inlined pbdatabase from substreams-sink-database-changes
maoueh May 1, 2026
68b5eb6
feat: add deprecated pb from substreams-sink-sql
maoueh May 1, 2026
bfa36ab
feat: add services pb from substreams-sink-sql
maoueh May 1, 2026
ff91d73
feat: add schema pb from substreams-sink-sql
maoueh May 1, 2026
ce36704
feat: add pb stub files for sink sql packages
maoueh May 1, 2026
bf1f8df
feat: add sink/sql root, bytes, proto and db_proto root files
maoueh May 1, 2026
b173bb0
feat: add db_proto sub-packages (proto utils, stats, sql core)
maoueh May 1, 2026
84993b5
feat: add db_proto/sql/schema and click_house types/decimal files
maoueh May 1, 2026
a5bcd63
feat: add click_house integer, database files
maoueh May 1, 2026
103a385
feat: add click_house dialect and accumulator_inserter files
maoueh May 1, 2026
42e40fb
feat: add postgres dialect, database, types, inserter files
maoueh May 1, 2026
f6350e3
feat: add db_changes/db core files (db, dsn, cursor, dialect, flush, …
maoueh May 1, 2026
6ce688b
feat: add db_changes/db remaining files (metrics, operations, types, …
maoueh May 1, 2026
89b8ba1
feat: add db_changes/db dialect files (clickhouse and postgres)
maoueh May 1, 2026
f318cdc
feat: add db_changes/sinker files (sinker, factory, setup, metrics, s…
maoueh May 1, 2026
fa69897
fix: remove duplicate TestTx declaration causing compile errors
maoueh May 1, 2026
babe7a1
fix: remove unused services files causing compile errors
maoueh May 1, 2026
479bc7b
fix: remove dbt.go services file (unused, references missing proto ty…
maoueh May 1, 2026
ceb5fdd
chore: remove misplaced database.pb.go from wrong directory
maoueh May 1, 2026
91893c8
chore: remove unused deprecated.pb.go from sink/sql/v1
maoueh May 1, 2026
8486fe0
feat: add sink/sql db_proto core files from substreams-sink-sql
maoueh May 1, 2026
85dca3c
feat: add sink/sql db_proto sinker and sinker_factory
maoueh May 1, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 194 additions & 0 deletions cmd/substreams/sink_clickhouse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package main

import (
"context"
"fmt"
"strings"
"time"

"github.com/jhump/protoreflect/desc"
"github.com/spf13/cobra"
"github.com/streamingfast/cli/sflags"
"github.com/streamingfast/derr"
sinksqlbytes "github.com/streamingfast/substreams/sink/sql/bytes"
dbchangesdb "github.com/streamingfast/substreams/sink/sql/db_changes/db"
dbchangessinker "github.com/streamingfast/substreams/sink/sql/db_changes/sinker"
dbproto "github.com/streamingfast/substreams/sink/sql/db_proto"
dbprotoproto "github.com/streamingfast/substreams/sink/sql/db_proto/proto"
"github.com/streamingfast/substreams/sink"
"google.golang.org/protobuf/types/descriptorpb"
)

func init() {
sink.AddFlagsToSet(sinkClickhouseCmd.Flags(),
sink.FlagExcludeDefault(sink.FlagUndoBufferSize))

sinkClickhouseCmd.Flags().String("on-module-hash-mismatch", "error", "What to do when the module hash in the manifest does not match the one in the database, can be 'error', 'warn' or 'ignore'")
sinkClickhouseCmd.Flags().String("cursors-table", "cursors", "Name of the table to use for storing cursors")
sinkClickhouseCmd.Flags().String("history-table", "substreams_history", "Name of the table to use for storing block history")
sinkClickhouseCmd.Flags().String("bytes-encoding", "raw", "Encoding for protobuf bytes fields: raw, hex, 0xhex, base64, base58")
sinkClickhouseCmd.Flags().Int("batch-block-flush-interval", 1000, "When in catch up mode, flush every N blocks")
sinkClickhouseCmd.Flags().Int("batch-row-flush-interval", 100000, "When in catch up mode, flush every N rows")
sinkClickhouseCmd.Flags().Int("live-block-flush-interval", 1, "When processing in live mode, flush every N blocks")
sinkClickhouseCmd.Flags().Int("flush-retry-count", 3, "Number of retry attempts for flush operations")
sinkClickhouseCmd.Flags().Duration("flush-retry-delay", 1*time.Second, "Base delay for retry backoff on flush failures")
sinkClickhouseCmd.Flags().Bool("no-constraints", false, "Do not add constraints to the database (proto-based mode only)")
sinkClickhouseCmd.Flags().Int("block-batch-size", 25, "Number of blocks to process at a time (proto-based mode only)")
sinkClickhouseCmd.Flags().String("clickhouse-cluster", "", "If non-empty, a 'ON CLUSTER <cluster>' clause will be applied when setting up tables in ClickHouse")
sinkClickhouseCmd.Flags().String("clickhouse-sink-info-folder", "", "Folder where to store the ClickHouse sink info (proto-based mode only)")
sinkClickhouseCmd.Flags().String("clickhouse-cursor-file-path", "cursor.txt", "File path where to store the ClickHouse cursor (proto-based mode only)")
sinkClickhouseCmd.Flags().Int("clickhouse-query-retry-count", 3, "Number of retries for ClickHouse queries when an error occurs")
sinkClickhouseCmd.Flags().Duration("clickhouse-query-retry-sleep", time.Second, "Sleep duration between ClickHouse query retries")

SinkCmd.AddCommand(sinkClickhouseCmd)
}

var sinkClickhouseCmd = &cobra.Command{
Use: "clickhouse <dsn> [<manifest> [<module_name>]]",
Short: "Run a ClickHouse sink for Substreams",
RunE: sinkClickhouseE,
Args: cobra.RangeArgs(1, 3),
}

func sinkClickhouseE(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
cmd.SilenceUsage = true

dsnString := args[0]
var manifestPath, outputModule string
if len(args) > 1 {
manifestPath = args[1]
}
if len(args) > 2 {
outputModule = args[2]
}

sink.LoadSubstreamsAuthEnvFile(manifestPath)

sinkerConfig, err := sink.ConfigFromViper(cmd, sink.IgnoreOutputModuleType, manifestPath, outputModule, "sink_clickhouse", zlog, tracer)
if err != nil {
return err
}

outputType := strings.TrimPrefix(sinkerConfig.OutputModule.Output.Type, "proto:")

ctx, cancel := context.WithCancel(ctx)
go func() {
<-derr.SetupSignalHandler(0)
cancel()
}()

if strings.Contains(outputType, "DatabaseChanges") {
return sinkClickhouseDatabaseChanges(ctx, cmd, dsnString, sinkerConfig)
}
return sinkClickhouseProto(ctx, cmd, dsnString, sinkerConfig, outputType)
}

func sinkClickhouseDatabaseChanges(ctx context.Context, cmd *cobra.Command, dsnString string, sinkerConfig *sink.SinkerConfig) error {
dbchangessinker.RegisterMetrics()

baseSink, err := sink.NewFromConfig(sinkerConfig)
if err != nil {
return fmt.Errorf("creating base sinker: %w", err)
}

sinkerFactory := dbchangessinker.SinkerFactory(baseSink, dbchangessinker.SinkerFactoryOptions{
CursorTableName: sflags.MustGetString(cmd, "cursors-table"),
HistoryTableName: sflags.MustGetString(cmd, "history-table"),
ClickhouseCluster: sflags.MustGetString(cmd, "clickhouse-cluster"),
BatchBlockFlushInterval: sflags.MustGetInt(cmd, "batch-block-flush-interval"),
BatchRowFlushInterval: sflags.MustGetInt(cmd, "batch-row-flush-interval"),
LiveBlockFlushInterval: sflags.MustGetInt(cmd, "live-block-flush-interval"),
OnModuleHashMismatch: sflags.MustGetString(cmd, "on-module-hash-mismatch"),
HandleReorgs: false,
FlushRetryCount: sflags.MustGetInt(cmd, "flush-retry-count"),
FlushRetryDelay: sflags.MustGetDuration(cmd, "flush-retry-delay"),
})

sqlSinker, err := sinkerFactory(ctx, dsnString, zlog, tracer)
if err != nil {
return fmt.Errorf("unable to setup sql sinker: %w", err)
}

sqlSinker.Run(ctx)
return sqlSinker.Err()
}

func sinkClickhouseProto(ctx context.Context, cmd *cobra.Command, dsnString string, sinkerConfig *sink.SinkerConfig, outputType string) error {
dsn, err := dbchangesdb.ParseDSN(dsnString)
if err != nil {
return fmt.Errorf("parsing dsn: %w", err)
}

spkg := sinkerConfig.Pkg
protoFiles := make(map[string]*descriptorpb.FileDescriptorProto, len(spkg.ProtoFiles))
for _, file := range spkg.ProtoFiles {
protoFiles[file.GetName()] = file
}

deps, err := dbprotoproto.ResolveDependencies(protoFiles)
if err != nil {
return fmt.Errorf("resolving dependencies: %w", err)
}

fileDescriptor, err := dbprotoproto.FileDescriptorForOutputType(spkg, nil, deps, outputType)
if err != nil {
return fmt.Errorf("finding file descriptor for output type %q: %w", outputType, err)
}

var rootMessageDescriptor *desc.MessageDescriptor
for _, md := range fileDescriptor.GetMessageTypes() {
if md.GetFullyQualifiedName() == outputType {
rootMessageDescriptor = md
break
}
}
if rootMessageDescriptor == nil {
return fmt.Errorf("message descriptor not found for output type %q, ensure your substreams bundles its protobuf definitions", outputType)
}

useConstraints := !sflags.MustGetBool(cmd, "no-constraints")
useProtoOption := false
for _, dep := range fileDescriptor.GetDependencies() {
if dep.GetName() == "sf/substreams/sink/sql/schema/v1/schema.proto" {
useProtoOption = true
}
}
if !useProtoOption {
useConstraints = false
}

encodingStr := sflags.MustGetString(cmd, "bytes-encoding")
encoding, err := sinksqlbytes.ParseEncoding(encodingStr)
if err != nil {
return fmt.Errorf("invalid bytes encoding %q: %w", encodingStr, err)
}

baseSink, err := sink.NewFromConfig(sinkerConfig)
if err != nil {
return fmt.Errorf("creating base sinker: %w", err)
}

outputModuleName := sinkerConfig.OutputModule.Name
factory := dbproto.SinkerFactory(baseSink, outputModuleName, rootMessageDescriptor.UnwrapMessage(), dbproto.SinkerFactoryOptions{
UseProtoOption: useProtoOption,
UseConstraints: useConstraints,
UseTransactions: true,
BlockBatchSize: sflags.MustGetInt(cmd, "block-batch-size"),
Parallel: false,
Encoding: encoding,
Clickhouse: dbproto.SinkerFactoryClickhouse{
SinkInfoFolder: sflags.MustGetString(cmd, "clickhouse-sink-info-folder"),
CursorFilePath: sflags.MustGetString(cmd, "clickhouse-cursor-file-path"),
QueryRetryCount: sflags.MustGetInt(cmd, "clickhouse-query-retry-count"),
QueryRetrySleep: sflags.MustGetDuration(cmd, "clickhouse-query-retry-sleep"),
},
})

dbProtoSinker, err := factory(ctx, dsnString, dsn.Schema(), zlog, tracer)
if err != nil {
return fmt.Errorf("creating sinker: %w", err)
}

return dbProtoSinker.Run(ctx)
}
182 changes: 182 additions & 0 deletions cmd/substreams/sink_postgres.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package main

import (
"context"
"fmt"
"strings"
"time"

"github.com/jhump/protoreflect/desc"
"github.com/spf13/cobra"
"github.com/streamingfast/cli/sflags"
"github.com/streamingfast/derr"
sinksqlbytes "github.com/streamingfast/substreams/sink/sql/bytes"
dbchangesdb "github.com/streamingfast/substreams/sink/sql/db_changes/db"
dbchangessinker "github.com/streamingfast/substreams/sink/sql/db_changes/sinker"
dbproto "github.com/streamingfast/substreams/sink/sql/db_proto"
dbprotoproto "github.com/streamingfast/substreams/sink/sql/db_proto/proto"
"github.com/streamingfast/substreams/sink"
"google.golang.org/protobuf/types/descriptorpb"
)

func init() {
sink.AddFlagsToSet(sinkPostgresCmd.Flags(),
sink.FlagExcludeDefault(sink.FlagUndoBufferSize))

sinkPostgresCmd.Flags().String("on-module-hash-mismatch", "error", "What to do when the module hash in the manifest does not match the one in the database, can be 'error', 'warn' or 'ignore'")
sinkPostgresCmd.Flags().String("cursors-table", "cursors", "Name of the table to use for storing cursors")
sinkPostgresCmd.Flags().String("history-table", "substreams_history", "Name of the table to use for storing block history, used to handle reorgs")
sinkPostgresCmd.Flags().String("bytes-encoding", "raw", "Encoding for protobuf bytes fields: raw, hex, 0xhex, base64, base58")
sinkPostgresCmd.Flags().Int("batch-block-flush-interval", 1000, "When in catch up mode, flush every N blocks")
sinkPostgresCmd.Flags().Int("batch-row-flush-interval", 100000, "When in catch up mode, flush every N rows")
sinkPostgresCmd.Flags().Int("live-block-flush-interval", 1, "When processing in live mode, flush every N blocks")
sinkPostgresCmd.Flags().Int("flush-retry-count", 3, "Number of retry attempts for flush operations")
sinkPostgresCmd.Flags().Duration("flush-retry-delay", 1*time.Second, "Base delay for retry backoff on flush failures")
sinkPostgresCmd.Flags().Bool("no-constraints", false, "Do not add constraints to the database (proto-based mode only)")
sinkPostgresCmd.Flags().Int("block-batch-size", 25, "Number of blocks to process at a time (proto-based mode only)")

SinkCmd.AddCommand(sinkPostgresCmd)
}

var sinkPostgresCmd = &cobra.Command{
Use: "postgres <dsn> [<manifest> [<module_name>]]",
Short: "Run a PostgreSQL sink for Substreams",
RunE: sinkPostgresE,
Args: cobra.RangeArgs(1, 3),
}

func sinkPostgresE(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
cmd.SilenceUsage = true

dsnString := args[0]
var manifestPath, outputModule string
if len(args) > 1 {
manifestPath = args[1]
}
if len(args) > 2 {
outputModule = args[2]
}

sink.LoadSubstreamsAuthEnvFile(manifestPath)

sinkerConfig, err := sink.ConfigFromViper(cmd, sink.IgnoreOutputModuleType, manifestPath, outputModule, "sink_postgres", zlog, tracer)
if err != nil {
return err
}

outputType := strings.TrimPrefix(sinkerConfig.OutputModule.Output.Type, "proto:")

ctx, cancel := context.WithCancel(ctx)
go func() {
<-derr.SetupSignalHandler(0)
cancel()
}()

if strings.Contains(outputType, "DatabaseChanges") {
return sinkPostgresDatabaseChanges(ctx, cmd, dsnString, sinkerConfig)
}
return sinkPostgresProto(ctx, cmd, dsnString, sinkerConfig, outputType)
}

func sinkPostgresDatabaseChanges(ctx context.Context, cmd *cobra.Command, dsnString string, sinkerConfig *sink.SinkerConfig) error {
dbchangessinker.RegisterMetrics()

baseSink, err := sink.NewFromConfig(sinkerConfig)
if err != nil {
return fmt.Errorf("creating base sinker: %w", err)
}

sinkerFactory := dbchangessinker.SinkerFactory(baseSink, dbchangessinker.SinkerFactoryOptions{
CursorTableName: sflags.MustGetString(cmd, "cursors-table"),
HistoryTableName: sflags.MustGetString(cmd, "history-table"),
BatchBlockFlushInterval: sflags.MustGetInt(cmd, "batch-block-flush-interval"),
BatchRowFlushInterval: sflags.MustGetInt(cmd, "batch-row-flush-interval"),
LiveBlockFlushInterval: sflags.MustGetInt(cmd, "live-block-flush-interval"),
OnModuleHashMismatch: sflags.MustGetString(cmd, "on-module-hash-mismatch"),
HandleReorgs: true,
FlushRetryCount: sflags.MustGetInt(cmd, "flush-retry-count"),
FlushRetryDelay: sflags.MustGetDuration(cmd, "flush-retry-delay"),
})

sqlSinker, err := sinkerFactory(ctx, dsnString, zlog, tracer)
if err != nil {
return fmt.Errorf("unable to setup sql sinker: %w", err)
}

sqlSinker.Run(ctx)
return sqlSinker.Err()
}

func sinkPostgresProto(ctx context.Context, cmd *cobra.Command, dsnString string, sinkerConfig *sink.SinkerConfig, outputType string) error {
dsn, err := dbchangesdb.ParseDSN(dsnString)
if err != nil {
return fmt.Errorf("parsing dsn: %w", err)
}

spkg := sinkerConfig.Pkg
protoFiles := make(map[string]*descriptorpb.FileDescriptorProto, len(spkg.ProtoFiles))
for _, file := range spkg.ProtoFiles {
protoFiles[file.GetName()] = file
}

deps, err := dbprotoproto.ResolveDependencies(protoFiles)
if err != nil {
return fmt.Errorf("resolving dependencies: %w", err)
}

fileDescriptor, err := dbprotoproto.FileDescriptorForOutputType(spkg, nil, deps, outputType)
if err != nil {
return fmt.Errorf("finding file descriptor for output type %q: %w", outputType, err)
}

var rootMessageDescriptor *desc.MessageDescriptor
for _, md := range fileDescriptor.GetMessageTypes() {
if md.GetFullyQualifiedName() == outputType {
rootMessageDescriptor = md
break
}
}
if rootMessageDescriptor == nil {
return fmt.Errorf("message descriptor not found for output type %q, ensure your substreams bundles its protobuf definitions", outputType)
}

useConstraints := !sflags.MustGetBool(cmd, "no-constraints")
useProtoOption := false
for _, dep := range fileDescriptor.GetDependencies() {
if dep.GetName() == "sf/substreams/sink/sql/schema/v1/schema.proto" {
useProtoOption = true
}
}
if !useProtoOption {
useConstraints = false
}

encodingStr := sflags.MustGetString(cmd, "bytes-encoding")
encoding, err := sinksqlbytes.ParseEncoding(encodingStr)
if err != nil {
return fmt.Errorf("invalid bytes encoding %q: %w", encodingStr, err)
}

baseSink, err := sink.NewFromConfig(sinkerConfig)
if err != nil {
return fmt.Errorf("creating base sinker: %w", err)
}

outputModuleName := sinkerConfig.OutputModule.Name
factory := dbproto.SinkerFactory(baseSink, outputModuleName, rootMessageDescriptor.UnwrapMessage(), dbproto.SinkerFactoryOptions{
UseProtoOption: useProtoOption,
UseConstraints: useConstraints,
UseTransactions: true,
BlockBatchSize: sflags.MustGetInt(cmd, "block-batch-size"),
Parallel: false,
Encoding: encoding,
})

dbProtoSinker, err := factory(ctx, dsnString, dsn.Schema(), zlog, tracer)
if err != nil {
return fmt.Errorf("creating sinker: %w", err)
}

return dbProtoSinker.Run(ctx)
}
Loading
Loading