Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 70 additions & 23 deletions pkg/chipingress/batch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package batch
import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -252,34 +253,80 @@ func (b *Client) sendBatch(ctx context.Context, messages []*messageWithCallback)

go func() {
defer func() { <-b.maxConcurrentSends }()
// this is specifically to prevent long running network calls
ctxTimeout, cancel := context.WithTimeout(ctx, b.maxPublishTimeout)
defer cancel()

events := make([]*chipingress.CloudEventPb, len(messages))
for i, msg := range messages {
events[i] = msg.event
}
batchReq := &chipingress.CloudEventBatch{Events: events}
batchBytes := proto.Size(batchReq)
startedAt := time.Now()
_, err := b.client.PublishBatch(ctxTimeout, batchReq)
b.metrics.recordSend(context.Background(), len(messages), batchBytes, time.Since(startedAt), err == nil)
if err != nil {
b.log.Errorw("failed to publish batch", "error", err)
}
// the callbacks are placed in their own goroutine to not block releasing the semaphore
// we use a wait group, to ensure all callbacks are completed if .Stop() is called.
b.callbackWg.Go(func() {
for _, msg := range messages {
if msg.callback != nil {
msg.callback(err)
}
for _, batchMessages := range splitMessagesByRequestSize(messages, b.maxGRPCRequestSize) {
batchReq, batchBytes := newBatchRequest(batchMessages)
if b.maxGRPCRequestSize > 0 && batchBytes > b.maxGRPCRequestSize {
err := fmt.Errorf("publish batch serialized size %d exceeds max gRPC request size %d", batchBytes, b.maxGRPCRequestSize)
b.metrics.recordSend(context.Background(), len(batchMessages), batchBytes, 0, false)
b.log.Errorw("failed to publish batch", "error", err)
b.completeBatchCallbacks(batchMessages, err)
continue
}
})

// this is specifically to prevent long running network calls
ctxTimeout, cancel := context.WithTimeout(ctx, b.maxPublishTimeout)
startedAt := time.Now()
_, err := b.client.PublishBatch(ctxTimeout, batchReq)
cancel()

b.metrics.recordSend(context.Background(), len(batchMessages), batchBytes, time.Since(startedAt), err == nil)
if err != nil {
b.log.Errorw("failed to publish batch", "error", err)
}
b.completeBatchCallbacks(batchMessages, err)
}
}()
}

// completeBatchCallbacks invokes each message's callback (when set) with err,
// which is nil on a successful publish. Callbacks run on their own goroutine
// so the caller is not blocked (e.g. while holding the send semaphore), and
// they are tracked by callbackWg so Stop() can wait for them to finish.
func (b *Client) completeBatchCallbacks(messages []*messageWithCallback, err error) {
	// The parameters are per-call values, so the closure can capture them
	// directly — no rebinding needed.
	// The callbacks are placed in their own goroutine to not block releasing
	// the semaphore; the wait group ensures all callbacks complete if .Stop()
	// is called.
	b.callbackWg.Go(func() {
		for _, msg := range messages {
			if msg.callback != nil {
				msg.callback(err)
			}
		}
	})
}

// splitMessagesByRequestSize partitions messages into batches whose serialized
// CloudEventBatch size does not exceed maxRequestSize bytes. A single message
// that alone exceeds the limit is still emitted as its own one-element batch;
// the caller is responsible for rejecting it. maxRequestSize <= 0 disables
// splitting and returns all messages as one batch. An empty input yields nil.
//
// Sizes are accumulated incrementally: in the protobuf wire format each
// element of a repeated field is encoded independently (tag + length prefix +
// payload), so a batch's size is the empty-batch size plus the sum of each
// event's single-event contribution. This serializes every message once,
// O(n), instead of re-serializing the whole candidate batch on each append,
// O(n^2), while producing exactly the same split decisions.
func splitMessagesByRequestSize(messages []*messageWithCallback, maxRequestSize int) [][]*messageWithCallback {
	if len(messages) == 0 {
		return nil
	}
	if maxRequestSize <= 0 {
		return [][]*messageWithCallback{messages}
	}

	// Fixed overhead of an event-less batch (normally 0, but computed rather
	// than assumed so the arithmetic matches proto.Size exactly).
	_, baseBytes := newBatchRequest(nil)

	var batches [][]*messageWithCallback
	current := make([]*messageWithCallback, 0, len(messages))
	currentBytes := baseBytes
	for _, msg := range messages {
		// This event's marginal contribution to any batch it joins.
		_, singleBytes := newBatchRequest([]*messageWithCallback{msg})
		msgBytes := singleBytes - baseBytes
		if len(current) > 0 && currentBytes+msgBytes > maxRequestSize {
			batches = append(batches, current)
			current = []*messageWithCallback{msg}
			currentBytes = baseBytes + msgBytes
			continue
		}
		current = append(current, msg)
		currentBytes += msgBytes
	}
	if len(current) > 0 {
		batches = append(batches, current)
	}
	return batches
}

// newBatchRequest collects the events from messages into a CloudEventBatch
// request and returns it together with its serialized protobuf size in bytes.
func newBatchRequest(messages []*messageWithCallback) (*chipingress.CloudEventBatch, int) {
	events := make([]*chipingress.CloudEventPb, 0, len(messages))
	for _, msg := range messages {
		events = append(events, msg.event)
	}
	req := &chipingress.CloudEventBatch{Events: events}
	return req, proto.Size(req)
}

// WithBatchSize sets the number of messages to accumulate before sending a batch
func WithBatchSize(batchSize int) Opt {
return func(c *Client) {
Expand Down
120 changes: 120 additions & 0 deletions pkg/chipingress/batch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (
"testing"
"time"

cepb "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
"github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks"
Expand Down Expand Up @@ -211,6 +213,112 @@ func TestSendBatch(t *testing.T) {

mockClient.AssertExpectations(t)
})

t.Run("splits oversized batch by max gRPC request size", func(t *testing.T) {
events := []*chipingress.CloudEventPb{
largeTestEvent("test-id-1"),
largeTestEvent("test-id-2"),
largeTestEvent("test-id-3"),
largeTestEvent("test-id-4"),
largeTestEvent("test-id-5"),
}
maxRequestSize := proto.Size(&chipingress.CloudEventBatch{Events: events[:2]})
require.LessOrEqual(t, proto.Size(&chipingress.CloudEventBatch{Events: events[:1]}), maxRequestSize)
require.Greater(t, proto.Size(&chipingress.CloudEventBatch{Events: events[:3]}), maxRequestSize)

mockClient := mocks.NewClient(t)
done := make(chan struct{})
callbackDone := make(chan error, len(events))
var mu sync.Mutex
var publishedIDs []string
var publishedSizes []int

mockClient.
On("PublishBatch",
mock.Anything,
mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool {
return len(batch.Events) > 0 && proto.Size(batch) <= maxRequestSize
}),
).
Return(&chipingress.PublishResponse{}, nil).
Run(func(args mock.Arguments) {
batch := args.Get(1).(*chipingress.CloudEventBatch)
mu.Lock()
for _, event := range batch.Events {
publishedIDs = append(publishedIDs, event.Id)
}
publishedSizes = append(publishedSizes, proto.Size(batch))
if len(publishedIDs) == len(events) {
close(done)
}
mu.Unlock()
}).
Times(3)

client, err := NewBatchClient(mockClient, WithMaxGRPCRequestSize(maxRequestSize))
require.NoError(t, err)

messages := make([]*messageWithCallback, 0, len(events))
for _, event := range events {
messages = append(messages, &messageWithCallback{
event: event,
callback: func(err error) {
callbackDone <- err
},
})
}

client.sendBatch(t.Context(), messages)

select {
case <-done:
case <-time.After(time.Second):
t.Fatal("timeout waiting for split batches to be sent")
}
for range events {
select {
case err := <-callbackDone:
require.NoError(t, err)
case <-time.After(time.Second):
t.Fatal("timeout waiting for split batch callback")
}
}

assert.Equal(t, []string{"test-id-1", "test-id-2", "test-id-3", "test-id-4", "test-id-5"}, publishedIDs)
for _, size := range publishedSizes {
assert.LessOrEqual(t, size, maxRequestSize)
}
mockClient.AssertExpectations(t)
})

t.Run("doesn't publish a single event over max gRPC request size", func(t *testing.T) {
mockClient := mocks.NewClient(t)
callbackDone := make(chan error, 1)
event := largeTestEvent("oversized-id")
maxRequestSize := proto.Size(&chipingress.CloudEventBatch{Events: []*chipingress.CloudEventPb{event}}) - 1

client, err := NewBatchClient(mockClient, WithMaxGRPCRequestSize(maxRequestSize))
require.NoError(t, err)

client.sendBatch(t.Context(), []*messageWithCallback{
{
event: event,
callback: func(err error) {
callbackDone <- err
},
},
})

select {
case err := <-callbackDone:
require.Error(t, err)
assert.Contains(t, err.Error(), "exceeds max gRPC request size")
case <-time.After(time.Second):
t.Fatal("timeout waiting for oversized batch callback")
}

mockClient.AssertNotCalled(t, "PublishBatch", mock.Anything, mock.Anything)
})
}

func TestStart(t *testing.T) {
Expand Down Expand Up @@ -903,6 +1011,18 @@ func countCounters(counters *sync.Map) int {
return n
}

// largeTestEvent builds a CloudEventPb fixture with the given id and a fixed
// 36-byte binary payload, used by the batch tests to exercise size limits.
func largeTestEvent(id string) *chipingress.CloudEventPb {
	payload := []byte("0123456789abcdefghijklmnopqrstuvwxyz")
	event := &chipingress.CloudEventPb{
		Id:          id,
		Source:      "test-source",
		Type:        "test.event.type",
		SpecVersion: "1.0",
	}
	event.Data = &cepb.CloudEvent_BinaryData{BinaryData: payload}
	return event
}

func TestSeqnum(t *testing.T) {
t.Run("dropped messages consume seqnum and create detectable gaps", func(t *testing.T) {
client, err := NewBatchClient(nil, WithMessageBuffer(1))
Expand Down
Loading