Skip to content
This repository was archived by the owner on Jun 18, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions kurrentdb/append_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (

// AppendToStreamOptions options of the append stream request.
type AppendToStreamOptions struct {
// Asks the server to check that the stream receiving the event is at the given expected version.
ExpectedRevision ExpectedRevision
// Asks the server to check that the stream receiving the event is at the expected state.
StreamState StreamState
// Asks for authenticated request.
Authenticated *Credentials
// A length of time to use for gRPC deadlines.
Expand All @@ -33,7 +33,7 @@ func (o *AppendToStreamOptions) requiresLeader() bool {
}

func (o *AppendToStreamOptions) setDefaults() {
if o.ExpectedRevision == nil {
o.ExpectedRevision = Any{}
if o.StreamState == nil {
o.StreamState = Any{}
}
}
8 changes: 4 additions & 4 deletions kurrentdb/append_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func appendToStreamSingleEventNoStream(db *kurrentdb.Client) TestCall {
defer cancel()

opts := kurrentdb.AppendToStreamOptions{
ExpectedRevision: kurrentdb.NoStream{},
StreamState: kurrentdb.NoStream{},
}

_, err := db.AppendToStream(context, streamID.String(), opts, testEvent)
Expand Down Expand Up @@ -103,7 +103,7 @@ func appendWithInvalidStreamRevision(db *kurrentdb.Client) TestCall {
defer cancel()

opts := kurrentdb.AppendToStreamOptions{
ExpectedRevision: kurrentdb.StreamExists{},
StreamState: kurrentdb.StreamExists{},
}

_, err := db.AppendToStream(context, streamID.String(), opts, createTestEvent())
Expand Down Expand Up @@ -139,7 +139,7 @@ func appendToSystemStreamWithIncorrectCredentials(container *Container) TestCall
defer cancel()

opts := kurrentdb.AppendToStreamOptions{
ExpectedRevision: kurrentdb.Any{},
StreamState: kurrentdb.Any{},
}

_, err = db.AppendToStream(context, streamID.String(), opts, createTestEvent())
Expand All @@ -156,7 +156,7 @@ func metadataOperation(db *kurrentdb.Client) TestCall {
defer cancel()

opts := kurrentdb.AppendToStreamOptions{
ExpectedRevision: kurrentdb.Any{},
StreamState: kurrentdb.Any{},
}

_, err := db.AppendToStream(context, streamID.String(), opts, createTestEvent())
Expand Down
6 changes: 3 additions & 3 deletions kurrentdb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (client *Client) AppendToStream(
return nil, fmt.Errorf("could not construct append operation. Reason: %w", err)
}

header := toAppendHeader(streamID, opts.ExpectedRevision)
header := toAppendHeader(streamID, opts.StreamState)

if err := appendOperation.Send(header); err != nil {
err = client.grpcClient.handleError(handle, trailers, err)
Expand Down Expand Up @@ -248,7 +248,7 @@ func (client *Client) DeleteStream(
callOptions := []grpc.CallOption{grpc.Header(&headers), grpc.Trailer(&trailers)}
callOptions, ctx, cancel := configureGrpcCall(parent, client.config, &opts, callOptions, client.grpcClient.perRPCCredentials)
defer cancel()
deleteRequest := toDeleteRequest(streamID, opts.ExpectedRevision)
deleteRequest := toDeleteRequest(streamID, opts.StreamState)
deleteResponse, err := streamsClient.Delete(ctx, deleteRequest, callOptions...)
if err != nil {
err = client.grpcClient.handleError(handle, trailers, err)
Expand Down Expand Up @@ -278,7 +278,7 @@ func (client *Client) TombstoneStream(
callOptions := []grpc.CallOption{grpc.Header(&headers), grpc.Trailer(&trailers)}
callOptions, ctx, cancel := configureGrpcCall(parent, client.config, &opts, callOptions, client.grpcClient.perRPCCredentials)
defer cancel()
tombstoneRequest := toTombstoneRequest(streamID, opts.ExpectedRevision)
tombstoneRequest := toTombstoneRequest(streamID, opts.StreamState)
tombstoneResponse, err := streamsClient.Tombstone(ctx, tombstoneRequest, callOptions...)

if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion kurrentdb/client_certificates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func testInvalidUserCertificates(t *testing.T, endpoint string) {

streamID := uuid.NewString()
opts := kurrentdb.AppendToStreamOptions{
ExpectedRevision: kurrentdb.Any{},
StreamState: kurrentdb.Any{},
}

result, err := c.AppendToStream(context.Background(), streamID, opts, testEvent)
Expand Down
4 changes: 2 additions & 2 deletions kurrentdb/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func closeConnection(container *Container) TestCall {
context, cancel := context.WithTimeout(context.Background(), time.Duration(5)*time.Second)
defer cancel()
opts := kurrentdb.AppendToStreamOptions{
ExpectedRevision: kurrentdb.NoStream{},
StreamState: kurrentdb.NoStream{},
}
_, err := db.AppendToStream(context, streamID.String(), opts, testEvent)

Expand All @@ -36,7 +36,7 @@ func closeConnection(container *Container) TestCall {
}

db.Close()
opts.ExpectedRevision = kurrentdb.Any{}
opts.StreamState = kurrentdb.Any{}
_, err = db.AppendToStream(context, streamID.String(), opts, testEvent)

esdbErr, ok := kurrentdb.FromError(err)
Expand Down
8 changes: 4 additions & 4 deletions kurrentdb/delete_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (

// DeleteStreamOptions options of the delete stream request.
type DeleteStreamOptions struct {
// Asks the server to check that the stream receiving the event is at the given expected version.
ExpectedRevision ExpectedRevision
// Asks the server to check that the stream receiving the event is at the expected state.
StreamState StreamState
// Asks for authenticated request.
Authenticated *Credentials
// A length of time to use for gRPC deadlines.
Expand All @@ -33,7 +33,7 @@ func (o *DeleteStreamOptions) requiresLeader() bool {
}

func (o *DeleteStreamOptions) setDefaults() {
if o.ExpectedRevision == nil {
o.ExpectedRevision = Any{}
if o.StreamState == nil {
o.StreamState = Any{}
}
}
4 changes: 2 additions & 2 deletions kurrentdb/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func DeleteTests(t *testing.T, db *kurrentdb.Client) {
func canDeleteStream(db *kurrentdb.Client) TestCall {
return func(t *testing.T) {
opts := kurrentdb.DeleteStreamOptions{
ExpectedRevision: kurrentdb.Revision(0),
StreamState: kurrentdb.Revision(0),
}

streamID := NAME_GENERATOR.Generate()
Expand All @@ -43,7 +43,7 @@ func canTombstoneStream(db *kurrentdb.Client) TestCall {

_, err := db.AppendToStream(context.Background(), streamId, kurrentdb.AppendToStreamOptions{}, createTestEvent())
deleteResult, err := db.TombstoneStream(context.Background(), streamId, kurrentdb.TombstoneStreamOptions{
ExpectedRevision: kurrentdb.Revision(0),
StreamState: kurrentdb.Revision(0),
})

if err != nil {
Expand Down
24 changes: 12 additions & 12 deletions kurrentdb/persistent_subscription_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func persistentSubscription_ToExistingStream_StartFromBeginning_AndEventsInIt(cl
streamID := NAME_GENERATOR.Generate()
// append events to StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, Events);
opts := kurrentdb.AppendToStreamOptions{
ExpectedRevision: kurrentdb.NoStream{},
StreamState: kurrentdb.NoStream{},
}

_, err := clientInstance.AppendToStream(context.Background(), streamID, opts, events...)
Expand Down Expand Up @@ -131,7 +131,7 @@ func persistentSubscription_ToNonExistingStream_StartFromBeginning_AppendEventsA
require.NoError(t, err)
// append events to StreamsClient.AppendToStreamAsync(Stream, stream_revision.StreamRevisionNoStream, Events);
opts := kurrentdb.AppendToStreamOptions{
ExpectedRevision: kurrentdb.NoStream{},
StreamState: kurrentdb.NoStream{},
}
_, err = clientInstance.AppendToStream(context.Background(), streamID, opts, events...)
require.NoError(t, err)
Expand Down Expand Up @@ -160,7 +160,7 @@ func persistentSubscription_ToExistingStream_StartFromEnd_EventsInItAndAppendEve
streamID := NAME_GENERATOR.Generate()
// append events to StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, Events);
opts := kurrentdb.AppendToStreamOptions{
ExpectedRevision: kurrentdb.NoStream{},
StreamState: kurrentdb.NoStream{},
}
_, err := clientInstance.AppendToStream(context.Background(), streamID, opts, events[:10]...)
require.NoError(t, err)
Expand All @@ -177,7 +177,7 @@ func persistentSubscription_ToExistingStream_StartFromEnd_EventsInItAndAppendEve
require.NoError(t, err)

// append 1 event to StreamsClient.AppendToStreamAsync(Stream, new StreamRevision(9), event[10])
opts.ExpectedRevision = kurrentdb.Revision(9)
opts.StreamState = kurrentdb.Revision(9)
_, err = clientInstance.AppendToStream(context.Background(), streamID, opts, events[10:]...)
require.NoError(t, err)

Expand Down Expand Up @@ -205,7 +205,7 @@ func persistentSubscription_ToExistingStream_StartFromEnd_EventsInIt(clientInsta
streamID := NAME_GENERATOR.Generate()
// append events to StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, Events);
opts := kurrentdb.AppendToStreamOptions{
ExpectedRevision: kurrentdb.NoStream{},
StreamState: kurrentdb.NoStream{},
}

_, err := clientInstance.AppendToStream(context.Background(), streamID, opts, events[:10]...)
Expand Down Expand Up @@ -274,7 +274,7 @@ func persistentSubscription_ToNonExistingStream_StartFromTwo_AppendEventsAfterwa
require.NoError(t, err)
// append 3 event to StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, events)
opts := kurrentdb.AppendToStreamOptions{
ExpectedRevision: kurrentdb.NoStream{},
StreamState: kurrentdb.NoStream{},
}
_, err = clientInstance.AppendToStream(context.Background(), streamID, opts, events...)
require.NoError(t, err)
Expand Down Expand Up @@ -302,7 +302,7 @@ func persistentSubscription_ToExistingStream_StartFrom10_EventsInItAppendEventsA

// append 10 events to StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, events[:10]);
opts := kurrentdb.AppendToStreamOptions{
ExpectedRevision: kurrentdb.NoStream{},
StreamState: kurrentdb.NoStream{},
}
streamID := NAME_GENERATOR.Generate()
_, err := clientInstance.AppendToStream(context.Background(), streamID, opts, events[:10]...)
Expand All @@ -322,7 +322,7 @@ func persistentSubscription_ToExistingStream_StartFrom10_EventsInItAppendEventsA

// append 1 event to StreamsClient.AppendToStreamAsync(Stream, StreamRevision(9), events[10:)
opts = kurrentdb.AppendToStreamOptions{
ExpectedRevision: kurrentdb.Revision(9),
StreamState: kurrentdb.Revision(9),
}
_, err = clientInstance.AppendToStream(context.Background(), streamID, opts, events[10:]...)
require.NoError(t, err)
Expand Down Expand Up @@ -351,7 +351,7 @@ func persistentSubscription_ToExistingStream_StartFrom4_EventsInIt(clientInstanc

// append 10 events to StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, events[:10]);
opts := kurrentdb.AppendToStreamOptions{
ExpectedRevision: kurrentdb.NoStream{},
StreamState: kurrentdb.NoStream{},
}
streamID := NAME_GENERATOR.Generate()
_, err := clientInstance.AppendToStream(context.Background(), streamID, opts, events[:10]...)
Expand All @@ -372,7 +372,7 @@ func persistentSubscription_ToExistingStream_StartFrom4_EventsInIt(clientInstanc

// append 1 event to StreamsClient.AppendToStreamAsync(Stream, StreamRevision(9), events)
opts = kurrentdb.AppendToStreamOptions{
ExpectedRevision: kurrentdb.Revision(9),
StreamState: kurrentdb.Revision(9),
}
_, err = clientInstance.AppendToStream(context.Background(), streamID, opts, events[10:]...)
require.NoError(t, err)
Expand Down Expand Up @@ -402,7 +402,7 @@ func persistentSubscription_ToExistingStream_StartFromHigherRevisionThenEventsIn
// append 10 events to StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, events[:10]);
streamID := NAME_GENERATOR.Generate()
opts := kurrentdb.AppendToStreamOptions{
ExpectedRevision: kurrentdb.NoStream{},
StreamState: kurrentdb.NoStream{},
}
_, err := clientInstance.AppendToStream(context.Background(), streamID, opts, events[:11]...)
require.NoError(t, err)
Expand All @@ -421,7 +421,7 @@ func persistentSubscription_ToExistingStream_StartFromHigherRevisionThenEventsIn

// append event to StreamsClient.AppendToStreamAsync(Stream, StreamRevision(10), events[11:])
opts = kurrentdb.AppendToStreamOptions{
ExpectedRevision: kurrentdb.Revision(10),
StreamState: kurrentdb.Revision(10),
}

_, err = clientInstance.AppendToStream(context.Background(), streamID, opts, events[11])
Expand Down
2 changes: 1 addition & 1 deletion kurrentdb/persistent_subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func pushEventsToStream(t *testing.T,
events []kurrentdb.EventData) {

opts := kurrentdb.AppendToStreamOptions{
ExpectedRevision: kurrentdb.NoStream{},
StreamState: kurrentdb.NoStream{},
}
_, err := clientInstance.AppendToStream(context.Background(), streamID, opts, events...)

Expand Down
6 changes: 3 additions & 3 deletions kurrentdb/protobuf_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const systemMetadataKeysContentType = "content-type"
const systemMetadataKeysCreated = "created"

// toAppendHeader ...
func toAppendHeader(streamID string, streamRevision ExpectedRevision) *api.AppendReq {
func toAppendHeader(streamID string, streamRevision StreamState) *api.AppendReq {
appendReq := &api.AppendReq{
Content: &api.AppendReq_Options_{
Options: &api.AppendReq_Options{},
Expand Down Expand Up @@ -206,7 +206,7 @@ func toFilterOptions(options *SubscriptionFilterOptions) (*api.ReadReq_Options_F
return &filterOptions, nil
}

func toDeleteRequest(streamID string, streamRevision ExpectedRevision) *api.DeleteReq {
func toDeleteRequest(streamID string, streamRevision StreamState) *api.DeleteReq {
deleteReq := &api.DeleteReq{
Options: &api.DeleteReq_Options{
StreamIdentifier: &shared.StreamIdentifier{
Expand Down Expand Up @@ -237,7 +237,7 @@ func toDeleteRequest(streamID string, streamRevision ExpectedRevision) *api.Dele
return deleteReq
}

func toTombstoneRequest(streamID string, streamRevision ExpectedRevision) *api.TombstoneReq {
func toTombstoneRequest(streamID string, streamRevision StreamState) *api.TombstoneReq {
tombstoneReq := &api.TombstoneReq{
Options: &api.TombstoneReq_Options{
StreamIdentifier: &shared.StreamIdentifier{
Expand Down
2 changes: 1 addition & 1 deletion kurrentdb/read_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func readStreamReturnsEOFAfterCompletion(db *kurrentdb.Client) TestCall {
}

opts := kurrentdb.AppendToStreamOptions{
ExpectedRevision: kurrentdb.NoStream{},
StreamState: kurrentdb.NoStream{},
}

streamID := NAME_GENERATOR.Generate()
Expand Down
6 changes: 3 additions & 3 deletions kurrentdb/revision.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ type StreamExists struct{}
// NoStream means the stream being written to should not yet exist.
type NoStream struct{}

// ExpectedRevision the use of expected revision can be a bit tricky especially when discussing guaranties given by
// StreamState the use of expected revision can be a bit tricky especially when discussing guaranties given by
// KurrentDB server. The KurrentDB server will assure idempotency for all requests using any value in
// ExpectedRevision except Any. When using Any, the KurrentDB server will do its best to assure idempotency but
// StreamState except Any. When using Any, the KurrentDB server will do its best to assure idempotency but
// will not guarantee it.
type ExpectedRevision interface {
type StreamState interface {
isExpectedRevision()
}

Expand Down
2 changes: 1 addition & 1 deletion kurrentdb/subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func streamSubscriptionDeliversAllEventsInStreamAndListensForNewEvents(db *kurre

// Write a new event
opts2 := kurrentdb.AppendToStreamOptions{
ExpectedRevision: kurrentdb.Revision(5_999),
StreamState: kurrentdb.Revision(5_999),
}
writeResult, err := db.AppendToStream(context.Background(), streamID, opts2, testEvent)
require.NoError(t, err)
Expand Down
8 changes: 4 additions & 4 deletions kurrentdb/tombstone_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import "time"

// TombstoneStreamOptions options of the tombstone stream request.
type TombstoneStreamOptions struct {
// Asks the server to check that the stream receiving the event is at the given expected version.
ExpectedRevision ExpectedRevision
// Asks the server to check that the stream receiving the event is at the expected state.
StreamState StreamState
// Asks for authenticated request.
Authenticated *Credentials
// A length of time to use for gRPC deadlines.
Expand All @@ -31,7 +31,7 @@ func (o *TombstoneStreamOptions) requiresLeader() bool {
}

func (o *TombstoneStreamOptions) setDefaults() {
if o.ExpectedRevision == nil {
o.ExpectedRevision = Any{}
if o.StreamState == nil {
o.StreamState = Any{}
}
}
6 changes: 3 additions & 3 deletions samples/appendingEvents.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func AppendToStream(db *kurrentdb.Client) {
}

options := kurrentdb.AppendToStreamOptions{
ExpectedRevision: kurrentdb.NoStream{},
StreamState: kurrentdb.NoStream{},
}

result, err := db.AppendToStream(context.Background(), "some-stream", options, kurrentdb.EventData{
Expand Down Expand Up @@ -89,7 +89,7 @@ func AppendWithNoStream(db *kurrentdb.Client) {
}

options := kurrentdb.AppendToStreamOptions{
ExpectedRevision: kurrentdb.NoStream{},
StreamState: kurrentdb.NoStream{},
}

_, err = db.AppendToStream(context.Background(), "same-event-stream", options, kurrentdb.EventData{
Expand Down Expand Up @@ -151,7 +151,7 @@ func AppendWithConcurrencyCheck(db *kurrentdb.Client) {
}

aopts := kurrentdb.AppendToStreamOptions{
ExpectedRevision: lastEvent.OriginalStreamRevision(),
StreamState: lastEvent.OriginalStreamRevision(),
}

_, err = db.AppendToStream(context.Background(), "concurrency-stream", aopts, kurrentdb.EventData{
Expand Down