Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/ripe-plums-agree.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink-deployments-framework": minor
---

(feat) Support catalog deletes for the datastore
164 changes: 42 additions & 122 deletions datastore/catalog/remote/address_ref_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,97 +212,31 @@ func (s *catalogAddressRefStore) Filter(
}

func (s *catalogAddressRefStore) Add(_ context.Context, record datastore.AddressRef) error {
// Convert the datastore record to protobuf
protoRef := s.addressRefToProto(record)

// Create the edit request with INSERT semantics
editRequest := &pb.AddressReferenceEditRequest{
Record: protoRef,
Semantics: pb.EditSemantics_SEMANTICS_INSERT,
}

// Create the request
editReq := &pb.DataAccessRequest{
req := &pb.DataAccessRequest{
Operation: &pb.DataAccessRequest_AddressReferenceEditRequest{
AddressReferenceEditRequest: editRequest,
AddressReferenceEditRequest: &pb.AddressReferenceEditRequest{
Record: s.addressRefToProto(record),
Semantics: pb.EditSemantics_SEMANTICS_INSERT,
},
},
}

// Create a bidirectional stream with the initial request for HMAC
stream, err := s.client.DataAccess(editReq)
if err != nil {
return fmt.Errorf("failed to create data access stream: %w", err)
}

if sendErr := stream.Send(editReq); sendErr != nil {
return fmt.Errorf("failed to send edit request: %w", sendErr)
}

// Receive the edit response
editResponse, err := stream.Recv()
if err != nil {
return fmt.Errorf("failed to receive edit response: %w", err)
}

// Check for errors in the edit response
if err := parseResponseStatus(editResponse.Status); err != nil {
return fmt.Errorf("add address ref failed: %w", err)
}

// Extract the edit response to validate it
editResp := editResponse.GetAddressReferenceEditResponse()
if editResp == nil {
return errors.New("unexpected edit response type")
}

return nil
return executeEdit(s.client, req,
(*pb.DataAccessResponse).GetAddressReferenceEditResponse, nil)
}

func (s *catalogAddressRefStore) Upsert(_ context.Context, record datastore.AddressRef) error {
// Convert the datastore record to protobuf
protoRef := s.addressRefToProto(record)

// Create the edit request with UPSERT semantics
editRequest := &pb.AddressReferenceEditRequest{
Record: protoRef,
Semantics: pb.EditSemantics_SEMANTICS_UPSERT,
}

// Create the request
request := &pb.DataAccessRequest{
req := &pb.DataAccessRequest{
Operation: &pb.DataAccessRequest_AddressReferenceEditRequest{
AddressReferenceEditRequest: editRequest,
AddressReferenceEditRequest: &pb.AddressReferenceEditRequest{
Record: s.addressRefToProto(record),
Semantics: pb.EditSemantics_SEMANTICS_UPSERT,
},
},
}

// Create a bidirectional stream with the initial request for HMAC
stream, err := s.client.DataAccess(request)
if err != nil {
return fmt.Errorf("failed to create data access stream: %w", err)
}

if sendErr := stream.Send(request); sendErr != nil {
return fmt.Errorf("failed to send edit request: %w", sendErr)
}

// Receive the response
response, err := stream.Recv()
if err != nil {
return fmt.Errorf("failed to receive response: %w", err)
}

// Check for errors in the response
if err := parseResponseStatus(response.Status); err != nil {
return fmt.Errorf("upsert address ref failed: %w", err)
}

// Extract the edit response to validate it
editResponse := response.GetAddressReferenceEditResponse()
if editResponse == nil {
return errors.New("unexpected response type")
}

return nil
return executeEdit(s.client, req,
(*pb.DataAccessResponse).GetAddressReferenceEditResponse, nil)
}

func (s *catalogAddressRefStore) Update(ctx context.Context, record datastore.AddressRef) error {
Expand All @@ -319,56 +253,42 @@ func (s *catalogAddressRefStore) Update(ctx context.Context, record datastore.Ad
}

// Record exists, proceed with updating it
// Convert the datastore record to protobuf
protoRef := s.addressRefToProto(record)

// Create the edit request with UPDATE semantics
editRequest := &pb.AddressReferenceEditRequest{
Record: protoRef,
Semantics: pb.EditSemantics_SEMANTICS_UPDATE,
}

// Create the request
editReq := &pb.DataAccessRequest{
req := &pb.DataAccessRequest{
Operation: &pb.DataAccessRequest_AddressReferenceEditRequest{
AddressReferenceEditRequest: editRequest,
AddressReferenceEditRequest: &pb.AddressReferenceEditRequest{
Record: s.addressRefToProto(record),
Semantics: pb.EditSemantics_SEMANTICS_UPDATE,
},
},
}

// Create a bidirectional stream with the initial request for HMAC
stream, streamErr := s.client.DataAccess(editReq)
if streamErr != nil {
return fmt.Errorf("failed to create data access stream: %w", streamErr)
}

if sendErr := stream.Send(editReq); sendErr != nil {
return fmt.Errorf("failed to send edit request: %w", sendErr)
}

// Receive the edit response
editResponse, err := stream.Recv()
if err != nil {
return fmt.Errorf("failed to receive edit response: %w", err)
}
return executeEdit(s.client, req,
(*pb.DataAccessResponse).GetAddressReferenceEditResponse, nil)
}

// Check for errors in the edit response
if err := parseResponseStatus(editResponse.Status); err != nil {
return fmt.Errorf("update address ref failed: %w", err)
}
func (s *catalogAddressRefStore) Delete(_ context.Context, key datastore.AddressRefKey) error {
return s.deleteRecord(key)
}

// Extract the edit response to validate it
editResp := editResponse.GetAddressReferenceEditResponse()
if editResp == nil {
return errors.New("unexpected edit response type")
func (s *catalogAddressRefStore) deleteRecord(key datastore.AddressRefKey) error {
req := &pb.DataAccessRequest{
Operation: &pb.DataAccessRequest_AddressReferenceEditRequest{
AddressReferenceEditRequest: &pb.AddressReferenceEditRequest{
Record: &pb.AddressReference{
Domain: s.domain,
Environment: s.environment,
ChainSelector: key.ChainSelector(),
ContractType: string(key.Type()),
Version: key.Version().String(),
Qualifier: key.Qualifier(),
},
Semantics: pb.EditSemantics_SEMANTICS_DELETE,
},
},
}

return nil
}

func (s *catalogAddressRefStore) Delete(_ context.Context, _ datastore.AddressRefKey) error {
// The catalog API does not support delete operations
// This is intentional as catalogs are typically immutable reference stores
return errors.New("delete operation not supported by catalog API")
return executeEdit(s.client, req,
(*pb.DataAccessResponse).GetAddressReferenceEditResponse, nil)
}

// keyToFilter converts a datastore.AddressRefKey to a protobuf AddressReferenceKeyFilter
Expand Down
94 changes: 85 additions & 9 deletions datastore/catalog/remote/address_ref_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,17 +288,93 @@ func TestCatalogAddressRefStore_Upsert(t *testing.T) {

func TestCatalogAddressRefStore_Delete(t *testing.T) {
t.Parallel()
store := setupTestStore(t, "", "")

version := semver.MustParse("1.0.0")
key := datastore.NewAddressRefKey(12345, "LinkToken", version, "test")
tests := []struct {
name string
run func(t *testing.T, store *catalogAddressRefStore)
}{
{
name: "delete_existing_record",
run: func(t *testing.T, store *catalogAddressRefStore) {
t.Helper()
ref := newRandomAddressRef()
require.NoError(t, store.Add(t.Context(), ref))

key := datastore.NewAddressRefKey(ref.ChainSelector, ref.Type, ref.Version, ref.Qualifier)
require.NoError(t, store.Delete(t.Context(), key))

_, err := store.Get(t.Context(), key)
require.ErrorIs(t, err, datastore.ErrAddressRefNotFound)
},
},
{
name: "delete_nonexistent_key_is_noop",
run: func(t *testing.T, store *catalogAddressRefStore) {
t.Helper()
ref := newRandomAddressRef()
key := datastore.NewAddressRefKey(ref.ChainSelector, ref.Type, ref.Version, ref.Qualifier)
require.NoError(t, store.Delete(t.Context(), key))
},
},
{
name: "delete_already_deleted_is_noop",
run: func(t *testing.T, store *catalogAddressRefStore) {
t.Helper()
ref := newRandomAddressRef()
require.NoError(t, store.Add(t.Context(), ref))

key := datastore.NewAddressRefKey(ref.ChainSelector, ref.Type, ref.Version, ref.Qualifier)
require.NoError(t, store.Delete(t.Context(), key))
require.NoError(t, store.Delete(t.Context(), key))
},
},
{
name: "delete_then_readd",
run: func(t *testing.T, store *catalogAddressRefStore) {
t.Helper()
ref := newRandomAddressRef()
require.NoError(t, store.Add(t.Context(), ref))

key := datastore.NewAddressRefKey(ref.ChainSelector, ref.Type, ref.Version, ref.Qualifier)
require.NoError(t, store.Delete(t.Context(), key))

resurrected := ref
resurrected.Address = "0x" + randomHex(40)
resurrected.Labels = datastore.NewLabelSet("resurrected")
require.NoError(t, store.Add(t.Context(), resurrected))

got, err := store.Get(t.Context(), key)
require.NoError(t, err)
require.Equal(t, resurrected.Address, got.Address)
require.Equal(t, resurrected.Labels.List(), got.Labels.List())
},
},
{
name: "delete_excluded_from_find",
run: func(t *testing.T, store *catalogAddressRefStore) {
t.Helper()
ref := newRandomAddressRef()
require.NoError(t, store.Add(t.Context(), ref))

// Execute
err := store.Delete(t.Context(), key)
key := datastore.NewAddressRefKey(ref.ChainSelector, ref.Type, ref.Version, ref.Qualifier)
_, err := store.Get(t.Context(), key)
require.NoError(t, err)

// Verify
require.Error(t, err)
require.Contains(t, err.Error(), "delete operation not supported")
require.NoError(t, store.Delete(t.Context(), key))

_, err = store.Get(t.Context(), key)
require.ErrorIs(t, err, datastore.ErrAddressRefNotFound)
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
store := setupTestStore(t, "test-domain", "catalog_testing")
tt.run(t, store)
})
}
}

func TestCatalogAddressRefStore_FetchAndFilter(t *testing.T) {
Expand Down Expand Up @@ -657,7 +733,7 @@ func setupTestStore(t *testing.T, domain, environment string) *catalogAddressRef
}

// randomHex generates a random hex string of specified length
func randomHex(length int) string {
func randomHex(length int) string { //nolint:unparam // this is a test function and we usually want a 40 digit hex string
bytes := make([]byte, length/2)
if _, err := rand.Read(bytes); err != nil {
panic(fmt.Sprintf("failed to generate random bytes: %v", err))
Expand Down
Loading
Loading