Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 29 additions & 16 deletions core/integration/dcs/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,25 +287,48 @@ func (p *Plugin) Init(instanceId string) error {
in := &dcspb.SubscriptionRequest{
InstanceId: instanceId,
}
evStream, err := p.dcsClient.Subscribe(context.Background(), in, grpc.EmptyCallOption{})
if err != nil {
return fmt.Errorf("failed to subscribe to DCS service on %s, possible network issue or DCS gateway malfunction", viper.GetString("dcsServiceEndpoint"))
}

// Always start the goroutine, even if initial subscription fails
go func() {
var evStream dcspb.Configurator_SubscribeClient
var err error

for {
// Try to establish subscription if we don't have one
if evStream == nil {
log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
Debug("attempting to subscribe to DCS service")

evStream, err = p.dcsClient.Subscribe(context.Background(), in, grpc.EmptyCallOption{})
if err != nil {
log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
WithError(err).
Warnf("failed to subscribe to DCS service, possible network issue or DCS gateway malfunction")
time.Sleep(3 * time.Second)
continue
} else {
log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
WithField("level", infologger.IL_Support).
Info("successfully subscribed to DCS service")
}
}

// Process events from the stream
for {
if evStream == nil {
break
}
ev, streamErr := evStream.Recv()
if streamErr == io.EOF {
log.Info("unexpected EOF from DCS service, possible DCS gateway malfunction")
evStream = nil
break
}

if streamErr != nil {
log.WithError(streamErr).
Error("stream error or bad event from DCS service, dropping stream")
evStream = nil
time.Sleep(3 * time.Second)
break
}
Expand All @@ -330,20 +353,10 @@ func (p *Plugin) Init(instanceId string) error {
Debug("received DCS event")
}

// If we reach here, the stream was dropped and evStream is nil
// The loop will continue and try to reestablish the subscription
log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
Info("DCS stream dropped, attempting reconnect")

evStream, err = p.dcsClient.Subscribe(context.Background(), in, grpc.EmptyCallOption{})
if err != nil {
log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
WithError(err).
Warnf("failed to resubscribe to DCS service, possible network issue or DCS gateway malfunction")
time.Sleep(3 * time.Second)
} else {
log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
WithField("level", infologger.IL_Support).
Info("successfully resubscribed to DCS service")
}
}
}()
}
Expand Down
Loading