A comprehensive Go SDK for building flowctl data pipeline components with minimal boilerplate.
- Three Complete SDKs: Source, Processor, and Consumer
- Event-First API: Work with strongly-typed `*flowctlv1.Event` objects
- Automatic Control Plane Integration: Registration, heartbeats, and discovery
- Built-in Observability: Health checks, metrics, and logging
- Production-Ready: Graceful shutdown, error handling, and backpressure
- Developer-Friendly: 70-85% less code than manual gRPC implementation
go get github.com/withObsrvr/flowctl-sdk

Build a processor that transforms events:
package main
import (
"context"
"fmt"
"github.com/withObsrvr/flowctl-sdk/pkg/processor"
flowctlv1 "github.com/withObsrvr/flow-proto/go/gen/flowctl/v1"
)
func main() {
proc, _ := processor.New(processor.DefaultConfig())
proc.OnProcess(
func(ctx context.Context, event *flowctlv1.Event) (*flowctlv1.Event, error) {
// Transform the event
return &flowctlv1.Event{
Id: fmt.Sprintf("%s-processed", event.Id),
Type: "example.processed.event",
Payload: append(event.Payload, []byte(" - processed")...),
}, nil
},
[]string{"example.event"}, // Input types
[]string{"example.processed.event"}, // Output types
)
proc.Start(context.Background())
waitForSignal()
proc.Stop()
}

Build a source that produces events:
import (
"github.com/withObsrvr/flowctl-sdk/pkg/source"
flowctlv1 "github.com/withObsrvr/flow-proto/go/gen/flowctl/v1"
)
func main() {
src, _ := source.New(source.DefaultConfig())
src.OnProduce(func(ctx context.Context, req *flowctlv1.StreamRequest) (<-chan *flowctlv1.Event, error) {
eventCh := make(chan *flowctlv1.Event, 100)
go func() {
defer close(eventCh)
for i := 0; ; i++ {
event := &flowctlv1.Event{
Id: fmt.Sprintf("event-%d", i),
Type: "example.event",
Payload: []byte(fmt.Sprintf("data-%d", i)),
}
select {
case <-ctx.Done():
return
case eventCh <- event:
}
}
}()
return eventCh, nil
})
src.Start(context.Background())
waitForSignal()
src.Stop()
}

Build a consumer that stores events:
import (
"github.com/withObsrvr/flowctl-sdk/pkg/consumer"
flowctlv1 "github.com/withObsrvr/flow-proto/go/gen/flowctl/v1"
)
func main() {
cons, _ := consumer.New(consumer.DefaultConfig())
cons.OnConsume(func(ctx context.Context, event *flowctlv1.Event) error {
// Store event in database
return db.Insert(event)
})
cons.Start(context.Background())
waitForSignal()
cons.Stop()
}

Enable flowctl integration in your processor configuration:
config := processor.DefaultConfig()
config.ID = "my-processor"
config.Name = "My Custom Processor"
config.Description = "Processes events in a custom way"
config.Endpoint = ":50052"
// Enable flowctl integration
config.FlowctlConfig.Enabled = true
config.FlowctlConfig.Endpoint = "flowctl-control-plane:8080"
config.FlowctlConfig.ServiceID = "my-processor-1" // Optional, will generate if not provided
proc, err := processor.New(config)
if err != nil {
log.Fatalf("Failed to create processor: %v", err)
}

Track custom metrics easily:
// Add custom metrics
proc.Metrics().AddCounter("events_processed_by_type_x", 1)
proc.Metrics().AddGauge("queue_depth", 42.5)
// Get all metrics
metrics := proc.GetMetrics()

The SDK automatically provides HTTP health check endpoints:
- `/health` - Simple health check
- `/ready` - Readiness probe (returns 200 when the processor is ready)
- `/live` - Liveness probe (returns 200 when the processor is running)
- `/metrics` - Returns current metrics as JSON
config := processor.DefaultConfig()
// Basic configuration
config.ID = "custom-processor"
config.Name = "Custom Event Processor"
config.Description = "A processor that handles custom events"
config.Version = "1.2.3"
config.Endpoint = ":8888"
config.MaxConcurrent = 200
// Flowctl configuration
config.FlowctlConfig.Enabled = true
config.FlowctlConfig.Endpoint = "flowctl.example.com:8080"
config.FlowctlConfig.HeartbeatInterval = 15 * time.Second
config.FlowctlConfig.Metadata = map[string]string{
"deployment": "production",
"region": "us-west-2",
}
// Health check configuration
config.HealthPort = 9090

Check out the examples directory for complete examples:
- Basic Processor - A simple processor example
- Advanced Processor - More complex processor with custom handling logic
Contributions are welcome! Please feel free to submit a Pull Request.
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.