Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ The server exposes the following gRPC services:
- `GetLatestChannelEntriesThatProvide(group, version, kind)` - Stream latest entries providing an API
- `GetDefaultBundleThatProvides(group, version, kind)` - Get default bundle providing an API
- `ListBundles()` - Stream all bundles
- `ListPackageCustomSchemas(schema, packageName)` - Stream custom schema FBC objects as `google.protobuf.Struct`; empty `packageName` queries packageless blobs

**Health Service:**
- Standard gRPC health check service for monitoring
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ fix-lint: $(GOLANGCI_LINT)

.PHONY: codegen
codegen: $(PROTOC) $(PROTOC_GEN_GO_GRPC)
$(PROTOC) --plugin=protoc-gen-go=$(PROTOC_GEN_GO_GRPC) -I pkg/api/ --go_out=pkg/api pkg/api/*.proto
$(PROTOC) --plugin=protoc-gen-go-grpc=$(PROTOC_GEN_GO_GRPC) -I pkg/api/ --go-grpc_out=pkg/api pkg/api/*.proto
$(PROTOC) --plugin=protoc-gen-go=$(PROTOC_GEN_GO_GRPC) -I pkg/api/ -I ./tools/bin/include --go_out=pkg/api pkg/api/*.proto
$(PROTOC) --plugin=protoc-gen-go-grpc=$(PROTOC_GEN_GO_GRPC) -I pkg/api/ -I ./tools/bin/include --go-grpc_out=pkg/api pkg/api/*.proto
Comment thread
perdasilva marked this conversation as resolved.

.PHONY: generate-fakes
generate-fakes:
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ GetBundleForChannel
GetBundleThatReplaces
GetChannelEntriesThatProvide
GetChannelEntriesThatReplace
ListPackageCustomSchemas
GetDefaultBundleThatProvides
Comment thread
perdasilva marked this conversation as resolved.
GetLatestChannelEntriesThatProvide
GetPackage
Expand Down
1,058 changes: 376 additions & 682 deletions pkg/api/registry.pb.go

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion pkg/api/registry.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ syntax = "proto3";

option go_package = ".;api";

import "google/protobuf/struct.proto";

package api;

service Registry {
Expand All @@ -17,6 +19,7 @@ service Registry {
rpc GetLatestChannelEntriesThatProvide(GetLatestProvidersRequest) returns (stream ChannelEntry) {}
rpc GetDefaultBundleThatProvides(GetDefaultProviderRequest) returns (Bundle) {}
rpc ListBundles(ListBundlesRequest) returns (stream Bundle) {}
rpc ListPackageCustomSchemas(ListPackageCustomSchemasRequest) returns (stream google.protobuf.Struct) {}
}

message Channel{
Expand Down Expand Up @@ -130,4 +133,10 @@ message GetDefaultProviderRequest{

message Deprecation{
string message = 1;
}
}

message ListPackageCustomSchemasRequest{
string schema = 1;
string packageName = 2;
}

65 changes: 65 additions & 0 deletions pkg/api/registry_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type Cache interface {
Build(ctx context.Context, fbc fs.FS) error
Load(ctc context.Context) error
Close() error

ListPackageCustomSchemas(ctx context.Context, schema, packageName string, sender func([]byte) error) error
}

type backend interface {
Expand All @@ -45,6 +47,9 @@ type backend interface {
GetBundle(context.Context, bundleKey) (*api.Bundle, error)
PutBundle(context.Context, bundleKey, *api.Bundle) error

PutMeta(context.Context, metaKey, []byte) error
SendMetas(context.Context, metaKey, func([]byte) error) error

GetDigest(context.Context) (string, error)
ComputeDigest(context.Context, fs.FS) (string, error)
PutDigest(context.Context, string) error
Expand Down Expand Up @@ -237,6 +242,14 @@ func (c *cache) GetBundleThatProvides(ctx context.Context, group, version, kind
return c.packageIndex.GetBundleThatProvides(ctx, c, group, version, kind)
}

func (c *cache) ListPackageCustomSchemas(ctx context.Context, schema, packageName string, sender func([]byte) error) error {
mk, err := newValidatedMetaKey(schema, packageName)
if err != nil {
return fmt.Errorf("invalid custom schema query: %v", err)
}
return c.backend.SendMetas(ctx, mk, sender)
}

func (c *cache) CheckIntegrity(ctx context.Context, fbc fs.FS) error {
existingDigest, err := c.backend.GetDigest(ctx)
if err != nil {
Expand Down Expand Up @@ -290,6 +303,21 @@ func (c *cache) Build(ctx context.Context, fbcFsys fs.FS) error {

walkMu.Lock()
defer walkMu.Unlock()

switch meta.Schema {
case declcfg.SchemaPackage, declcfg.SchemaChannel, declcfg.SchemaBundle, declcfg.SchemaDeprecation:
default:
mk, err := newValidatedMetaKey(meta.Schema, packageName)
if err != nil {
return fmt.Errorf("invalid custom schema meta: %v", err)
}
if err := c.backend.PutMeta(ctx, mk, meta.Blob); err != nil {
return fmt.Errorf("store custom schema meta %v: %v", mk, err)
}
if packageName == "" {
return nil
}
}
if _, err := tmpFile.Write(meta.Blob); err != nil {
return err
}
Expand Down
157 changes: 157 additions & 0 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,163 @@ func TestCache_ListPackages(t *testing.T) {
}
}

var customSchemaFS = fstest.MapFS{
".": &fstest.MapFile{
Mode: fs.ModeDir,
},
"catalog.json": &fstest.MapFile{
Data: []byte(`{
"schema": "olm.package",
"name": "testpkg",
"defaultChannel": "stable"
}
{
"schema": "olm.channel",
"package": "testpkg",
"name": "stable",
"entries": [{"name": "testpkg.v1.0.0"}]
}
{
"schema": "olm.bundle",
"name": "testpkg.v1.0.0",
"package": "testpkg",
"image": "quay.io/test/testpkg:v1.0.0",
"properties": [{"type": "olm.package", "value": {"packageName": "testpkg", "version": "1.0.0"}}]
}
{
"schema": "custom.operator.io",
"package": "testpkg",
"name": "my-custom-resource",
"customField": "customValue"
}
{
"schema": "custom.operator.io",
"package": "testpkg",
"name": "another-custom-resource",
"data": {"key": "value"}
}
{
"schema": "other.custom.schema",
"package": "testpkg",
"name": "other-custom",
"info": "test"
}
`),
},
}

func TestCache_ListPackageCustomSchemas(t *testing.T) {
for name, testCache := range genTestCaches(t, customSchemaFS) {
t.Run(name, func(t *testing.T) {
collect := func(schema, pkg string) []string {
t.Helper()
var blobs []string
err := testCache.ListPackageCustomSchemas(context.TODO(), schema, pkg,
func(blob []byte) error {
blobs = append(blobs, string(blob))
return nil
})
require.NoError(t, err)
return blobs
}

// Multiple blobs for same (schema, package)
blobs := collect("custom.operator.io", "testpkg")
require.Len(t, blobs, 2)

// Different schema
blobs = collect("other.custom.schema", "testpkg")
require.Len(t, blobs, 1)
require.Contains(t, blobs[0], `"info"`)

// Nonexistent schema returns empty
blobs = collect("nonexistent.schema", "testpkg")
require.Empty(t, blobs)

// Nonexistent package returns empty
blobs = collect("custom.operator.io", "nonexistent")
require.Empty(t, blobs)
})
}
}

func TestCache_ListPackageCustomSchemas_PackagelessBlob(t *testing.T) {
packagelessFS := fstest.MapFS{
".": &fstest.MapFile{
Mode: fs.ModeDir,
},
"catalog.json": &fstest.MapFile{
Data: []byte(`{
"schema": "olm.package",
"name": "testpkg",
"defaultChannel": "stable"
}
{
"schema": "olm.channel",
"package": "testpkg",
"name": "stable",
"entries": [{"name": "testpkg.v1.0.0"}]
}
{
"schema": "olm.bundle",
"name": "testpkg.v1.0.0",
"package": "testpkg",
"image": "quay.io/test/testpkg:v1.0.0",
"properties": [{"type": "olm.package", "value": {"packageName": "testpkg", "version": "1.0.0"}}]
}
{
"schema": "custom.packageless",
"data": "no-package-blob"
}
`),
},
}
for name, testCache := range genTestCaches(t, packagelessFS) {
t.Run(name, func(t *testing.T) {
// Querying with empty packageName returns packageless blobs
var blobs []string
err := testCache.ListPackageCustomSchemas(context.TODO(), "custom.packageless", "",
func(blob []byte) error {
blobs = append(blobs, string(blob))
return nil
})
require.NoError(t, err)
require.Len(t, blobs, 1)
require.Contains(t, blobs[0], `"no-package-blob"`)

// Querying with a specific packageName does not return packageless blobs
var blobs2 []string
err = testCache.ListPackageCustomSchemas(context.TODO(), "custom.packageless", "testpkg",
func(blob []byte) error {
blobs2 = append(blobs2, string(blob))
return nil
})
require.NoError(t, err)
require.Empty(t, blobs2)

// Packageless blobs must not leak into the package index
packages, err := testCache.ListPackages(context.TODO())
require.NoError(t, err)
require.Equal(t, []string{"testpkg"}, packages)
})
}
}

func TestCache_ListPackageCustomSchemas_NoCustomSchemas(t *testing.T) {
for name, testCache := range genTestCaches(t, validFS) {
t.Run(name, func(t *testing.T) {
var blobs []string
err := testCache.ListPackageCustomSchemas(context.TODO(), "custom.operator.io", "cockroachdb",
func(blob []byte) error {
blobs = append(blobs, string(blob))
return nil
})
require.NoError(t, err)
require.Empty(t, blobs)
})
}
}

func genTestCaches(t *testing.T, fbcFS fs.FS) map[string]Cache {
t.Helper()

Expand Down
Loading