Skip to content

Commit b0beded

Browse files
[OGUI-1754] Create reconnection scheduler (#3034)
* In the scope of the PR the reconnection scheduling was introduced in order to have an automatic reconnaction fallback logic in case of loosing connection to the CentralSystem
1 parent 4efd384 commit b0beded

File tree

4 files changed

+228
-18
lines changed

4 files changed

+228
-18
lines changed

Tokenization/backend/wrapper/src/client/connectionManager/CentralConnection.ts

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import type * as grpc from '@grpc/grpc-js';
1616
import { LogManager } from '@aliceo2/web-ui';
1717
import type { CentralCommandDispatcher } from './eventManagement/CentralCommandDispatcher';
1818
import type { DuplexMessageModel } from '../../models/message.model';
19+
import { ReconnectionScheduler } from '../../utils/connection/reconnectionScheduler';
1920

2021
/**
2122
* This class manages the duplex stream with the CentralSystem gRPC service.
@@ -24,6 +25,11 @@ import type { DuplexMessageModel } from '../../models/message.model';
2425
export class CentralConnection {
2526
private _logger = LogManager.getLogger('CentralConnection');
2627
private _stream?: grpc.ClientDuplexStream<unknown, unknown>;
28+
private _reconnectionScheduler: ReconnectionScheduler = new ReconnectionScheduler(
29+
() => this.connect(),
30+
{ initialDelay: 1000, maxDelay: 30000 },
31+
this._logger
32+
);
2733

2834
/**
2935
* Constructor for the CentralConnection class.
@@ -44,32 +50,23 @@ export class CentralConnection {
4450

4551
this._stream?.on('data', (payload: DuplexMessageModel) => {
4652
this._logger.debugMessage(`Received payload: ${JSON.stringify(payload)}`);
53+
this._reconnectionScheduler.reset();
4754
this._dispatcher.dispatch(payload);
4855
});
4956

5057
this._stream?.on('end', () => {
5158
this._logger.infoMessage(`Stream ended, attempting to reconnect...`);
5259
this._stream = undefined;
53-
this.scheduleReconnect();
60+
this._reconnectionScheduler.schedule();
5461
});
5562

5663
this._stream?.on('error', (err: any) => {
5764
this._logger.infoMessage('Stream error:', err, ' attempting to reconnect...');
5865
this._stream = undefined;
59-
this.scheduleReconnect();
66+
this._reconnectionScheduler.schedule();
6067
});
6168
}
6269

63-
/**
64-
* Schedules a reconnect with exponential backoff.
65-
*/
66-
private scheduleReconnect() {
67-
setTimeout(() => {
68-
this._logger.infoMessage(`Trying to reconnect...`);
69-
this.connect();
70-
}, 2000);
71-
}
72-
7370
/**
7471
* Starts the connection to the central system.
7572
*/
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/**
2+
* @license
3+
* Copyright 2019-2020 CERN and copyright holders of ALICE O2.
4+
* See http://alice-o2.web.cern.ch/copyright for details of the copyright holders.
5+
* All rights not expressly granted are reserved.
6+
*
7+
* This software is distributed under the terms of the GNU General Public
8+
* License v3 (GPL Version 3), copied verbatim in the file "COPYING".
9+
*
10+
* In applying this license CERN does not waive the privileges and immunities
11+
* granted to it by virtue of its status as an Intergovernmental Organization
12+
* or submit itself to any jurisdiction.
13+
*/
14+
15+
import { ReconnectionScheduler } from '../../../utils/connection/reconnectionScheduler';
16+
17+
describe('ReconnectionScheduler', () => {
18+
let reconnectCallback: jest.Mock;
19+
let logger: { infoMessage: jest.Mock; errorMessage?: jest.Mock };
20+
let scheduler: ReconnectionScheduler;
21+
22+
beforeEach(() => {
23+
jest.useFakeTimers();
24+
reconnectCallback = jest.fn();
25+
logger = {
26+
infoMessage: jest.fn(),
27+
errorMessage: jest.fn(),
28+
};
29+
30+
scheduler = new ReconnectionScheduler(
31+
reconnectCallback,
32+
{
33+
initialDelay: 1000,
34+
maxDelay: 8000,
35+
},
36+
logger as any
37+
);
38+
});
39+
40+
afterEach(() => {
41+
jest.clearAllTimers();
42+
jest.useRealTimers();
43+
});
44+
45+
test("schedule's first attempt should schedule and call reconnectCallback", () => {
46+
scheduler.schedule();
47+
48+
expect(logger.infoMessage).toHaveBeenCalledWith('Recconection attempt #1: Sleep for 2000 ms.');
49+
50+
expect(reconnectCallback).not.toHaveBeenCalled();
51+
jest.advanceTimersByTime(1999);
52+
expect(reconnectCallback).not.toHaveBeenCalled();
53+
jest.advanceTimersByTime(1);
54+
expect(reconnectCallback).toHaveBeenCalledTimes(1);
55+
});
56+
57+
test('Schedule attempts should be exponential', () => {
58+
scheduler.schedule();
59+
jest.advanceTimersByTime(2000);
60+
61+
scheduler.schedule();
62+
expect(logger.infoMessage).toHaveBeenLastCalledWith('Recconection attempt #2: Sleep for 4000 ms.');
63+
jest.advanceTimersByTime(4000);
64+
expect(reconnectCallback).toHaveBeenCalledTimes(2);
65+
});
66+
67+
test("schedule's delay should be limited by maxDelay", () => {
68+
scheduler = new ReconnectionScheduler(
69+
reconnectCallback,
70+
{
71+
initialDelay: 1000,
72+
maxDelay: 3000,
73+
},
74+
logger as any
75+
);
76+
77+
scheduler.schedule();
78+
expect(logger.infoMessage).toHaveBeenLastCalledWith('Recconection attempt #1: Sleep for 2000 ms.');
79+
jest.advanceTimersByTime(2000);
80+
81+
scheduler.schedule();
82+
expect(logger.infoMessage).toHaveBeenLastCalledWith('Recconection attempt #2: Sleep for 3000 ms.');
83+
jest.advanceTimersByTime(3000);
84+
85+
scheduler.schedule();
86+
expect(logger.infoMessage).toHaveBeenLastCalledWith('Recconection attempt #3: Sleep for 3000 ms.');
87+
});
88+
89+
test('schedule() should not schedule again if it is scheduled', () => {
90+
scheduler.schedule();
91+
scheduler.schedule();
92+
93+
expect(logger.infoMessage).toHaveBeenCalledTimes(1);
94+
95+
jest.advanceTimersByTime(100000);
96+
expect(reconnectCallback).toHaveBeenCalledTimes(1);
97+
});
98+
99+
test('reset() should clear timer, reset attemptCount and currentDelay', () => {
100+
scheduler.schedule();
101+
102+
jest.advanceTimersByTime(500);
103+
expect(reconnectCallback).not.toHaveBeenCalled();
104+
105+
scheduler.reset();
106+
107+
jest.advanceTimersByTime(100000);
108+
expect(reconnectCallback).not.toHaveBeenCalled();
109+
110+
scheduler.schedule();
111+
expect(logger.infoMessage).toHaveBeenLastCalledWith('Recconection attempt #1: Sleep for 2000 ms.');
112+
});
113+
114+
test('reset() should ignore another reset due to isResseting variable', () => {
115+
scheduler.schedule();
116+
scheduler.reset();
117+
scheduler.reset();
118+
scheduler.schedule();
119+
120+
jest.advanceTimersByTime(2000);
121+
expect(reconnectCallback).toHaveBeenCalledTimes(1);
122+
});
123+
});
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/**
2+
* @license
3+
* Copyright 2019-2020 CERN and copyright holders of ALICE O2.
4+
* See http://alice-o2.web.cern.ch/copyright for details of the copyright holders.
5+
* All rights not expressly granted are reserved.
6+
*
7+
* This software is distributed under the terms of the GNU General Public
8+
* License v3 (GPL Version 3), copied verbatim in the file "COPYING".
9+
*
10+
* In applying this license CERN does not waive the privileges and immunities
11+
* granted to it by virtue of its status as an Intergovernmental Organization
12+
* or submit itself to any jurisdiction.
13+
*/
14+
15+
export interface ReconnectionOptions {
16+
initialDelay?: number; // Initial delay in ms
17+
maxDelay?: number; // Maximum delay in ms
18+
}
19+
20+
/**
21+
* A scheduler that manages reconnection attempts with an exponential backoff.
22+
*/
23+
export class ReconnectionScheduler {
24+
private reconnectCallback: any;
25+
private initialDelay: number;
26+
private maxDelay: number;
27+
private currentDelay: number;
28+
private attemptCount: number;
29+
private timeoutId: any;
30+
private logger: Logger;
31+
32+
private isResetting: boolean = false;
33+
private isScheduling: boolean = false;
34+
35+
/**
36+
* Creates a new instance of the ReconnectionScheduler.
37+
* @param {any} reconnectCallback - The callback to be called when a reconnection attempt is scheduled.
38+
* @param {ReconnectionOptions} [options] - Options for the reconnection schedule.
39+
* @param {Logger} logger - The logger instance to be used for logging messages.
40+
*/
41+
constructor(reconnectCallback: any, options: ReconnectionOptions = {}, logger: Logger) {
42+
this.reconnectCallback = reconnectCallback;
43+
this.initialDelay = options.initialDelay ?? 1000;
44+
this.maxDelay = options.maxDelay ?? 30000;
45+
46+
this.currentDelay = this.initialDelay;
47+
this.attemptCount = 0;
48+
this.timeoutId = null;
49+
50+
this.logger = logger;
51+
}
52+
53+
/**
54+
* Schedules the next reconnection attempt using exponential backoff.
55+
*/
56+
schedule() {
57+
if (this.isScheduling) return;
58+
this.isScheduling = true;
59+
this.isResetting = false;
60+
this.attemptCount++;
61+
62+
// Exponential backoff calculation
63+
const delay = this.initialDelay * Math.pow(2, this.attemptCount);
64+
65+
this.currentDelay = Math.min(this.maxDelay, delay);
66+
67+
this.logger.infoMessage(`Recconection attempt #${this.attemptCount}: Sleep for ${this.currentDelay.toFixed(0)} ms.`);
68+
69+
// Plan the reconnection attempt
70+
this.timeoutId = setTimeout(() => {
71+
this.isScheduling = false;
72+
this.reconnectCallback();
73+
}, this.currentDelay);
74+
}
75+
76+
/**
77+
* Resets the scheduler to its initial state.
78+
*/
79+
reset() {
80+
if (this.isResetting) return;
81+
this.isScheduling = false;
82+
this.isResetting = true;
83+
84+
clearTimeout(this.timeoutId);
85+
this.attemptCount = 0;
86+
this.currentDelay = this.initialDelay;
87+
}
88+
}

Tokenization/backend/wrapper/src/utils/types/webui.d.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@
1414

1515
declare module '@aliceo2/web-ui' {
1616
export const LogManager: {
17-
getLogger: (name: string) => {
18-
infoMessage: (...args: any[]) => void;
19-
errorMessage: (...args: any[]) => void;
20-
warnMessage: (...args: any[]) => void;
21-
debugMessage: (...args: any[]) => void;
22-
};
17+
getLogger: (name: string) => Logger;
2318
};
2419
}
20+
21+
declare interface Logger {
22+
infoMessage: (...args: any[]) => void;
23+
errorMessage: (...args: any[]) => void;
24+
warnMessage: (...args: any[]) => void;
25+
debugMessage: (...args: any[]) => void;
26+
}

0 commit comments

Comments
 (0)