Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions cli/aeon/cmd/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,23 @@ import (
// It returns an error if fails to collect a configuration,
// instantiate a cluster config or find an instance in the cluster.
func FillConnectCtx(connectCtx *ConnectCtx, uriOpts libconnect.UriOpts,
instanceName string, dataCollectors libcluster.DataCollectorFactory,
instanceName string, factory libcluster.Factory,
) error {
connOpts := libcluster.ConnectOpts{
Username: connectCtx.Username,
Password: connectCtx.Password,
}
collector, cancel, err := libcluster.CreateDataCollector(dataCollectors,
connOpts, uriOpts)
stor, cleanup, storageType, err := libcluster.NewStorageConnection(connOpts, uriOpts)
if err != nil {
return err
}
defer cancel()
defer cleanup()

collector, err := factory.NewRemoteStorage(stor, uriOpts.Prefix,
uriOpts.Params["key"], uriOpts.Timeout, storageType)
if err != nil {
return fmt.Errorf("failed to create %s collector: %w", storageType, err)
}

rawBytes, err := cluster.CollectDataBytes(context.Background(), collector)
if err != nil {
Expand Down
59 changes: 37 additions & 22 deletions cli/cluster/cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,39 +129,54 @@ func validateInstanceConfig(instCfg goconfig.Config, name string) error {
return nil
}

// createPublisherAndCollector creates a new data publisher and collector based on UriOpts.
func createPublisherAndCollector(
publishers libcluster.DataPublisherFactory,
collectors libcluster.DataCollectorFactory,
// openRemoteCollector dials the remote storage described by opts and binds a
// collector-flavored *RawStorage using the given factory's integrity options.
// The returned cleanup releases the connection.
func openRemoteCollector(factory libcluster.Factory,
connOpts libcluster.ConnectOpts, opts libconnect.UriOpts,
) (libcluster.DataCollector, func(), error) {
stor, cleanup, storageType, err := libcluster.NewStorageConnection(connOpts, opts)
if err != nil {
return nil, nil, err
}
collector, err := factory.NewRemoteStorage(stor, opts.Prefix,
opts.Params["key"], opts.Timeout, storageType)
if err != nil {
cleanup()
return nil, nil, fmt.Errorf("failed to create %s collector: %w", storageType, err)
}
return collector, func() { cleanup() }, nil
}

// openCollectorAndPublisher dials the remote storage described by opts and
// binds two *RawStorage instances on the same connection — one for collecting
// (using collectors' integrity verifiers) and one for publishing (using
// publishers' integrity signer/verifiers).
func openCollectorAndPublisher(
collectors, publishers libcluster.Factory,
connOpts libcluster.ConnectOpts,
opts libconnect.UriOpts,
) (libcluster.DataPublisher, libcluster.DataCollector, func(), error) {
) (libcluster.DataCollector, libcluster.DataPublisher, func(), error) {
prefix, key, timeout := opts.Prefix, opts.Params["key"], opts.Timeout

stor, cleanup, storageType, err := libcluster.NewStorageConnection(connOpts, opts)
if err != nil {
return nil, nil, nil, err
}

var publisher libcluster.DataPublisher
if publishers != nil {
publisher, err = publishers.NewRemoteStorage(stor, prefix, key, timeout, storageType)
if err != nil {
cleanup()
return nil, nil, nil,
fmt.Errorf("failed to create %s publisher: %w", storageType, err)
}
publisher, err := publishers.NewRemoteStorage(stor, prefix, key, timeout, storageType)
if err != nil {
cleanup()
return nil, nil, nil,
fmt.Errorf("failed to create %s publisher: %w", storageType, err)
}

var collector libcluster.DataCollector
if collectors != nil {
collector, err = collectors.NewRemoteStorage(stor, prefix, key, timeout, storageType)
if err != nil {
cleanup()
return nil, nil, nil,
fmt.Errorf("failed to create %s collector: %w", storageType, err)
}
collector, err := collectors.NewRemoteStorage(stor, prefix, key, timeout, storageType)
if err != nil {
cleanup()
return nil, nil, nil,
fmt.Errorf("failed to create %s collector: %w", storageType, err)
}

return publisher, collector, func() { cleanup() }, nil
return collector, publisher, func() { cleanup() }, nil
}
33 changes: 29 additions & 4 deletions cli/cluster/cmd/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,31 @@ type SwitchStatusCtx struct {
TaskID string
}

// connectFailoverStorage dials the config storage at uriOpts and returns a
// RawStorage scoped to the configured prefix. Used by the failover commands
// which talk to the storage directly via Get/Put/Watch. Close on the returned
// storage releases the underlying connection.
func connectFailoverStorage(uriOpts connect.UriOpts,
connOpts libcluster.ConnectOpts,
) (*libcluster.RawStorage, error) {
stor, cleanup, storageType, err := libcluster.NewStorageConnection(connOpts, uriOpts)
if err != nil {
return nil, fmt.Errorf("unable to connect to config storage: %w", err)
}

// Failover commands live in a sibling namespace to cluster config; bind the
// storage with an empty objectLocation so keys are not implicitly nested
// under "/config".
raw, err := libcluster.NewStorage(stor, uriOpts.Prefix, uriOpts.Timeout, "",
storageType, nil, "")
if err != nil {
cleanup()
return nil, fmt.Errorf("unable to bind %s storage: %w", storageType, err)
}
raw.SetCleanup(cleanup)
return raw, nil
}

// Switch master instance.
func Switch(url string, switchCtx SwitchCtx) error {
uriOpts, err := connect.CreateUriOpts(url)
Expand All @@ -72,9 +97,9 @@ func Switch(url string, switchCtx SwitchCtx) error {
Password: switchCtx.Password,
}

conn, err := libcluster.ConnectCStorage(uriOpts, connOpts)
conn, err := connectFailoverStorage(uriOpts, connOpts)
if err != nil {
return fmt.Errorf("unable to connect to config storage: %w", err)
return err
}
defer conn.Close()

Expand Down Expand Up @@ -151,9 +176,9 @@ func SwitchStatus(url string, switchCtx SwitchStatusCtx) error {
return fmt.Errorf("invalid URL %q: %w", url, err)
}
var connOpts libcluster.ConnectOpts
conn, err := libcluster.ConnectCStorage(uriOpts, connOpts)
conn, err := connectFailoverStorage(uriOpts, connOpts)
if err != nil {
return fmt.Errorf("unable to connect to config storage: %w", err)
return err
}
defer conn.Close()

Expand Down
15 changes: 6 additions & 9 deletions cli/cluster/cmd/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ type PublishCtx struct {
// is omitted.
Force bool
// Publishers defines a used data publishers factory.
Publishers libcluster.DataPublisherFactory
Publishers libcluster.Factory
// Collectors defines a used data collectors factory.
Collectors libcluster.DataCollectorFactory
Collectors libcluster.Factory
// Src is raw YAML data to publish.
Src []byte
// Config is the decoded payload from Src (map[string]any from YAML
Expand All @@ -47,9 +47,9 @@ func PublishUri(publishCtx PublishCtx, opts connect.UriOpts) error {
Username: publishCtx.Username,
Password: publishCtx.Password,
}
publisher, collector, cancel, err := createPublisherAndCollector(
publishCtx.Publishers,
collector, publisher, cancel, err := openCollectorAndPublisher(
publishCtx.Collectors,
publishCtx.Publishers,
connOpts, opts)
if err != nil {
return err
Expand Down Expand Up @@ -82,7 +82,7 @@ func PublishCluster(publishCtx PublishCtx, path, instance string) error {
return err
}

publisher, err := publishCtx.Publishers.NewFile(path)
publisher, err := publishCtx.Publishers.NewFilePublisher(path)
if err != nil {
return fmt.Errorf("failed to create a file publisher: %w", err)
}
Expand All @@ -92,10 +92,7 @@ func PublishCluster(publishCtx PublishCtx, path, instance string) error {
return publisher.Publish(0, publishCtx.Src)
}

collector, err := publishCtx.Collectors.NewFile(path)
if err != nil {
return fmt.Errorf("failed to create a file collector: %w", err)
}
collector := publishCtx.Collectors.NewFileCollector(path)
collectedBytes, err := cluster.CollectDataBytes(context.Background(), collector)
if err != nil {
return fmt.Errorf("failed to get a cluster configuration to update an instance %q: %w",
Expand Down
4 changes: 2 additions & 2 deletions cli/cluster/cmd/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ func TestPublishCluster_FileWithIntegrity_Errors(t *testing.T) {

ctx := PublishCtx{
Force: true, // skip validation
Publishers: libcluster.NewDataPublisherFactory(
Publishers: libcluster.NewFactory(
libcluster.WithIntegrity(libcluster.IntegrityOptions{}),
),
Collectors: libcluster.NewDataCollectorFactory(),
Collectors: libcluster.NewFactory(),
Src: src,
}

Expand Down
49 changes: 22 additions & 27 deletions cli/cluster/cmd/replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (publisher dataKeyPublisher) Publish(key string, revision int64, data []byt
}

// makeStoragePublisher creates publisher for a generic remote storage.
func makeStoragePublisher(factory libcluster.DataPublisherFactory,
func makeStoragePublisher(factory libcluster.Factory,
storage storage.Storage, storageType, prefix string, timeout time.Duration,
) replicaset.DataPublisher {
return dataKeyPublisher(func(key string, revision int64, data []byte) error {
Expand All @@ -38,9 +38,9 @@ type PromoteCtx struct {
// InstName is an instance name to promote.
InstName string
// Publishers is data publisher factory.
Publishers libcluster.DataPublisherFactory
Publishers libcluster.Factory
// Collectors is data collector factory.
Collectors libcluster.DataCollectorFactory
Collectors libcluster.Factory
// Username defines a username for connection.
Username string
// Password defines a password for connection.
Expand Down Expand Up @@ -81,32 +81,27 @@ func pickPatchKey(keys []string, force bool, pathMsg string) (int, error) {
return pos, nil
}

// createDataCollectorAndKeyPublisher creates a new data collector and key publisher.
// createDataCollectorAndKeyPublisher creates a new data collector and a
// per-key replicaset.DataPublisher (one fresh RawStorage is built per Publish
// call so each call can target a different key).
func createDataCollectorAndKeyPublisher(
collectors libcluster.DataCollectorFactory,
publishers libcluster.DataPublisherFactory,
opts connect.UriOpts, connOpts libcluster.ConnectOpts) (
libcluster.DataCollector, replicaset.DataPublisher, func(), error,
) {
collectors libcluster.Factory,
publishers libcluster.Factory,
opts connect.UriOpts, connOpts libcluster.ConnectOpts,
) (libcluster.DataCollector, replicaset.DataPublisher, func(), error) {
prefix, key, timeout := opts.Prefix, opts.Params["key"], opts.Timeout
storage, closeFunc, storageType, err := libcluster.NewStorageConnection(connOpts, opts)
stor, closeFunc, storageType, err := libcluster.NewStorageConnection(connOpts, opts)
if err != nil {
return nil, nil, nil, err
}

var collector libcluster.DataCollector
if collectors != nil {
collector, err = collectors.NewRemoteStorage(storage, prefix, key, timeout, storageType)
if err != nil {
closeFunc()
return nil, nil, nil, fmt.Errorf("failed to create storage collector: %w", err)
}
collector, err := collectors.NewRemoteStorage(stor, prefix, key, timeout, storageType)
if err != nil {
closeFunc()
return nil, nil, nil, fmt.Errorf("failed to create storage collector: %w", err)
}

var publisher replicaset.DataPublisher
if publishers != nil {
publisher = makeStoragePublisher(publishers, storage, storageType, prefix, timeout)
}
publisher := makeStoragePublisher(publishers, stor, storageType, prefix, timeout)

return collector, publisher, closeFunc, nil
}
Expand Down Expand Up @@ -146,9 +141,9 @@ type DemoteCtx struct {
// InstName is an instance name to demote.
InstName string
// Publishers is data publisher factory.
Publishers libcluster.DataPublisherFactory
Publishers libcluster.Factory
// Collectors is data collector factory.
Collectors libcluster.DataCollectorFactory
Collectors libcluster.Factory
// Username defines a username for connection.
Username string
// Password defines a password for connection.
Expand Down Expand Up @@ -193,9 +188,9 @@ type ExpelCtx struct {
// InstName is an instance name to demote.
InstName string
// Publishers is data publisher factory.
Publishers libcluster.DataPublisherFactory
Publishers libcluster.Factory
// Collectors is data collector factory.
Collectors libcluster.DataCollectorFactory
Collectors libcluster.Factory
// Username defines a username for connection.
Username string
// Password defines a password for connection.
Expand Down Expand Up @@ -247,9 +242,9 @@ type RolesChangeCtx struct {
// RoleName is a name of role to add/remove.
RoleName string
// Publishers is data publisher factory.
Publishers libcluster.DataPublisherFactory
Publishers libcluster.Factory
// Collectors is data collector factory.
Collectors libcluster.DataCollectorFactory
Collectors libcluster.Factory
// Username defines a username for connection.
Username string
// Password defines a password for connection.
Expand Down
7 changes: 2 additions & 5 deletions cli/cluster/cmd/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type ShowCtx struct {
// Password defines a password for connection.
Password string
// Collectors defines a used data collectors factory for URI-based show.
Collectors libcluster.DataCollectorFactory
Collectors libcluster.Factory
// Integrity holds the integrity context used for file-based show.
Integrity integrity.IntegrityCtx
// Validate defines whether the command will check the showed
Expand All @@ -32,10 +32,7 @@ func ShowUri(showCtx ShowCtx, opts connect.UriOpts) error {
Username: showCtx.Username,
Password: showCtx.Password,
}
_, collector, cancel, err := createPublisherAndCollector(
nil,
showCtx.Collectors,
connOpts, opts)
collector, cancel, err := openRemoteCollector(showCtx.Collectors, connOpts, opts)
if err != nil {
return err
}
Expand Down
Loading
Loading