Skip to content

Commit 5b31372

Browse files
Copilot and knopers8 authored
OCTRL-1008 Attempt reconnecting to ecs-dcs gateway at core startup (#734)
* Initial plan

* Initial repository exploration and dependency setup

Co-authored-by: knopers8 <14327588+knopers8@users.noreply.github.com>

* Fix DCS plugin to reconnect even after initialization failure

- Move subscription logic into goroutine to enable continuous retry
- Always start reconnection goroutine regardless of initial connection state
- Plugin initialization now succeeds even if DCS gateway is unavailable
- Add comprehensive test for unavailable gateway scenario
- Resolves issue where plugin never attempted reconnection after init failure

Co-authored-by: knopers8 <14327588+knopers8@users.noreply.github.com>

* Remove unrelated proto-generated files from DCS plugin fix PR

Co-authored-by: knopers8 <14327588+knopers8@users.noreply.github.com>

* remove test which tests almost nothing

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: knopers8 <14327588+knopers8@users.noreply.github.com>
Co-authored-by: Piotr Konopka <piotr.jan.konopka@cern.ch>
1 parent fd5d7a5 commit 5b31372

File tree

1 file changed

+29
-16
lines changed

1 file changed

+29
-16
lines changed

core/integration/dcs/plugin.go

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -289,25 +289,48 @@ func (p *Plugin) Init(instanceId string) error {
289289
in := &dcspb.SubscriptionRequest{
290290
InstanceId: instanceId,
291291
}
292-
evStream, err := p.dcsClient.Subscribe(context.Background(), in, grpc.EmptyCallOption{})
293-
if err != nil {
294-
return fmt.Errorf("failed to subscribe to DCS service on %s, possible network issue or DCS gateway malfunction", viper.GetString("dcsServiceEndpoint"))
295-
}
292+
293+
// Always start the goroutine, even if initial subscription fails
296294
go func() {
295+
var evStream dcspb.Configurator_SubscribeClient
296+
var err error
297+
297298
for {
299+
// Try to establish subscription if we don't have one
300+
if evStream == nil {
301+
log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
302+
Debug("attempting to subscribe to DCS service")
303+
304+
evStream, err = p.dcsClient.Subscribe(context.Background(), in, grpc.EmptyCallOption{})
305+
if err != nil {
306+
log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
307+
WithError(err).
308+
Warnf("failed to subscribe to DCS service, possible network issue or DCS gateway malfunction")
309+
time.Sleep(3 * time.Second)
310+
continue
311+
} else {
312+
log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
313+
WithField("level", infologger.IL_Support).
314+
Info("successfully subscribed to DCS service")
315+
}
316+
}
317+
318+
// Process events from the stream
298319
for {
299320
if evStream == nil {
300321
break
301322
}
302323
ev, streamErr := evStream.Recv()
303324
if streamErr == io.EOF {
304325
log.Info("unexpected EOF from DCS service, possible DCS gateway malfunction")
326+
evStream = nil
305327
break
306328
}
307329

308330
if streamErr != nil {
309331
log.WithError(streamErr).
310332
Error("stream error or bad event from DCS service, dropping stream")
333+
evStream = nil
311334
time.Sleep(3 * time.Second)
312335
break
313336
}
@@ -332,20 +355,10 @@ func (p *Plugin) Init(instanceId string) error {
332355
Debug("received DCS event")
333356
}
334357

358+
// If we reach here, the stream was dropped and evStream is nil
359+
// The loop will continue and try to reestablish the subscription
335360
log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
336361
Info("DCS stream dropped, attempting reconnect")
337-
338-
evStream, err = p.dcsClient.Subscribe(context.Background(), in, grpc.EmptyCallOption{})
339-
if err != nil {
340-
log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
341-
WithError(err).
342-
Warnf("failed to resubscribe to DCS service, possible network issue or DCS gateway malfunction")
343-
time.Sleep(3 * time.Second)
344-
} else {
345-
log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
346-
WithField("level", infologger.IL_Support).
347-
Info("successfully resubscribed to DCS service")
348-
}
349362
}
350363
}()
351364
}

0 commit comments

Comments
 (0)