Skip to content

Commit 86a3431

Browse files
[OGUI-1747] Create simple client p2p connection (#3021)
* implements fetching to other clients via P2P connection and handles listnening on peer-end.
1 parent ea83779 commit 86a3431

File tree

13 files changed

+912
-171
lines changed

13 files changed

+912
-171
lines changed

Tokenization/backend/proto/wrapper.proto

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@ service CentralSystem {
2525
rpc ClientStream(stream Payload) returns (stream Payload);
2626
}
2727

28+
// Peer2Peer service handling HTTP-like requests between wrapper clients
29+
service Peer2Peer {
30+
rpc Fetch (HttpLikeRequest) returns (HttpLikeResponse);
31+
}
32+
33+
2834
// ======================================
2935
// MESSAGES
3036
// ======================================
@@ -51,14 +57,37 @@ message Payload {
5157
}
5258
}
5359

60+
// http method enum
61+
enum HttpMethod {
62+
HTTP_METHOD_UNSPECIFIED = 0;
63+
GET = 1;
64+
POST = 2;
65+
PUT = 3;
66+
PATCH = 4;
67+
DELETE = 5;
68+
HEAD = 6;
69+
OPTIONS = 7;
70+
}
71+
72+
message HttpLikeRequest {
73+
HttpMethod method = 1; // GET/POST/...
74+
string path = 2; // request path e.g. "/orders/add"
75+
map<string,string> headers = 3; // "content-type": "application/json"
76+
bytes body = 4; // body (e.g. JSON)
77+
}
78+
79+
message HttpLikeResponse {
80+
int32 status = 1;
81+
map<string,string> headers = 2;
82+
bytes body = 3;
83+
}
84+
85+
5486
// ======================================
5587
// ENUMS
5688
// ======================================
5789

5890
enum MessageEvent {
59-
// Unspecified event type
60-
MESSAGE_EVENT_UNSPECIFIED = 0;
61-
6291
// Default value, represents an empty event
6392
MESSAGE_EVENT_EMPTY = 0;
6493

Tokenization/backend/wrapper/src/client/commands/newToken/newToken.handler.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ export class NewTokenHandler implements CommandHandler<NewTokenCommand> {
4646

4747
for (const dir of directions) {
4848
let conn = this.manager.getConnectionByAddress(targetAddress, dir);
49-
conn ??= this.manager.createNewConnection(targetAddress, dir, token);
49+
conn ??= await this.manager.createNewConnection(targetAddress, dir, token);
5050
conn.token = token;
5151
}
5252
}

Tokenization/backend/wrapper/src/client/connection/Connection.ts

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
*/
1414

1515
import type { ConnectionDirection } from '../../models/message.model';
16+
import type { ConnectionHeaders, FetchOptions, FetchResponse } from '../../models/connection.model';
1617
import { ConnectionStatus } from '../../models/connection.model';
18+
import * as grpc from '@grpc/grpc-js';
1719

1820
/**
1921
* @description This class represents a connection to a target client and manages sending messages to it.
@@ -22,6 +24,7 @@ export class Connection {
2224
private _token: string;
2325
private _targetAddress: string;
2426
private _status: ConnectionStatus;
27+
private _peerClient: any;
2528
public direction: ConnectionDirection;
2629

2730
/**
@@ -31,9 +34,10 @@ export class Connection {
3134
* @param targetAddress - The unique address of the target client.
3235
* @param direction - The direction of the connection (e.g., sending or receiving).
3336
*/
34-
constructor(token: string, targetAddress: string, direction: ConnectionDirection) {
37+
constructor(token: string, targetAddress: string, direction: ConnectionDirection, peerCtor: any) {
3538
this._token = token;
3639
this._targetAddress = targetAddress;
40+
this._peerClient = new peerCtor(targetAddress, grpc.credentials.createInsecure());
3741
this.direction = direction;
3842

3943
this._status = ConnectionStatus.CONNECTED;
@@ -72,11 +76,63 @@ export class Connection {
7276
return this._status;
7377
}
7478

79+
/**
80+
* Sets the status of this connection.
81+
* @param status The new status of the connection.
82+
*/
83+
public set status(status: ConnectionStatus) {
84+
this._status = status;
85+
}
86+
7587
/**
7688
* Returns target address for this Connection object
7789
* @returns Target address
7890
*/
7991
public get targetAddress(): string {
8092
return this._targetAddress;
8193
}
94+
95+
/**
96+
* "HTTP-like" fetch via gRPC protocol
97+
* @returns Promise with peer's response
98+
*/
99+
public fetch(options: FetchOptions = {}): Promise<FetchResponse> {
100+
if (!this._peerClient) {
101+
return Promise.reject(new Error(`Peer client not attached for ${this.targetAddress}`));
102+
}
103+
104+
// Build a request object
105+
const method = (options.method ?? 'POST').toUpperCase();
106+
const path = options.path ?? '/';
107+
const headers: ConnectionHeaders = { ...(options.headers ?? {}) };
108+
109+
let bodyBuf: Buffer = Buffer.alloc(0);
110+
const b = options.body;
111+
if (b != null) {
112+
if (Buffer.isBuffer(b)) bodyBuf = b;
113+
else if (b instanceof Uint8Array) bodyBuf = Buffer.from(b);
114+
else if (typeof b === 'string') bodyBuf = Buffer.from(b, 'utf8');
115+
else return Promise.reject(new Error('Body must be a string/Buffer/Uint8Array'));
116+
}
117+
118+
const req = { method, path, headers, body: bodyBuf };
119+
120+
// Return promise with response
121+
return new Promise<FetchResponse>((resolve, reject) => {
122+
this._peerClient.Fetch(req, (err: any, resp: any) => {
123+
if (err) return reject(err);
124+
125+
const resBody = resp?.body ? Buffer.from(resp.body) : Buffer.alloc(0);
126+
const fetchResponse: FetchResponse = {
127+
status: Number(resp?.status ?? 200),
128+
headers: resp?.headers ?? {},
129+
body: resBody,
130+
text: async () => resBody.toString('utf8'),
131+
json: async () => JSON.parse(resBody.toString('utf8')),
132+
};
133+
134+
resolve(fetchResponse);
135+
});
136+
});
137+
}
82138
}

Tokenization/backend/wrapper/src/client/connectionManager/ConnectionManager.ts

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,9 @@ import { LogManager } from '@aliceo2/web-ui';
2121
import type { Command, CommandHandler } from 'models/commands.model';
2222
import type { DuplexMessageEvent } from '../../models/message.model';
2323
import { ConnectionDirection } from '../../models/message.model';
24+
import { ConnectionStatus } from '../../models/connection.model';
25+
import { peerListener } from '../../utils/connection/peerListener';
2426

25-
/**
26-
* @description Manages all the connection between clients and central system.
27-
*/
2827
/**
2928
* Manages the lifecycle and connection logic for a gRPC client communicating with the central system.
3029
*
@@ -45,6 +44,10 @@ export class ConnectionManager {
4544
private _centralConnection: CentralConnection;
4645
private _sendingConnections = new Map<string, Connection>();
4746
private _receivingConnections = new Map<string, Connection>();
47+
private _wrapper: any;
48+
private _peerCtor: any;
49+
private _peerServer: grpc.Server | undefined;
50+
private _baseAPIPath: string = '';
4851

4952
/**
5053
* Initializes a new instance of the ConnectionManager class.
@@ -64,9 +67,10 @@ export class ConnectionManager {
6467
});
6568

6669
const proto = grpc.loadPackageDefinition(packageDef) as any;
67-
const wrapper = proto.webui.tokenization;
70+
this._wrapper = proto.webui.tokenization;
71+
this._peerCtor = this._wrapper.Peer2Peer;
6872

69-
const client = new wrapper.CentralSystem(centralAddress, grpc.credentials.createInsecure());
73+
const client = new this._wrapper.CentralSystem(centralAddress, grpc.credentials.createInsecure());
7074

7175
// Event dispatcher for central system events
7276
this._centralDispatcher = new CentralCommandDispatcher();
@@ -109,13 +113,15 @@ export class ConnectionManager {
109113
* @param token Optional token for connection
110114
*/
111115
createNewConnection(address: string, direction: ConnectionDirection, token?: string) {
112-
const conn = new Connection(token ?? '', address, direction);
116+
const conn = new Connection(token ?? '', address, direction, this._peerCtor);
113117

114118
if (direction === ConnectionDirection.RECEIVING) {
115119
this._receivingConnections.set(address, conn);
116120
} else {
117121
this._sendingConnections.set(address, conn);
118122
}
123+
conn.status = ConnectionStatus.CONNECTED;
124+
this._logger.infoMessage(`Connection with ${address} has been estabilished. Status: ${conn.status}`);
119125

120126
return conn;
121127
}
@@ -150,4 +156,26 @@ export class ConnectionManager {
150156
receiving: [...this._receivingConnections.values()],
151157
};
152158
}
159+
160+
/** Starts a listener server for p2p connections */
161+
public async listenForPeers(port: number, baseAPIPath?: string): Promise<void> {
162+
if (baseAPIPath) this._baseAPIPath = baseAPIPath;
163+
164+
if (this._peerServer) {
165+
this._peerServer.forceShutdown();
166+
this._peerServer = undefined;
167+
}
168+
169+
this._peerServer = new grpc.Server();
170+
this._peerServer.addService(this._wrapper.Peer2Peer.service, {
171+
Fetch: async (call: grpc.ServerUnaryCall<any, any>, callback: grpc.sendUnaryData<any>) =>
172+
peerListener(call, callback, this._logger, this._receivingConnections, this._peerCtor, this._baseAPIPath),
173+
});
174+
175+
await new Promise<void>((resolve, reject) => {
176+
this._peerServer?.bindAsync(`localhost:${port}`, grpc.ServerCredentials.createInsecure(), (err) => (err ? reject(err) : resolve()));
177+
});
178+
179+
this._logger.infoMessage(`Peer server listening on localhost:${port}`);
180+
}
153181
}

Tokenization/backend/wrapper/src/client/gRPCWrapper.ts

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414

1515
import { ConnectionManager } from './connectionManager/ConnectionManager';
1616
import { RevokeTokenHandler } from './commands/revokeToken/revokeToken.handler';
17-
import { DuplexMessageEvent } from '../models/message.model';
18-
import type { Connection } from './connection/Connection';
17+
import { ConnectionDirection, DuplexMessageEvent } from '../models/message.model';
1918
import { NewTokenHandler } from './commands/newToken/newToken.handler';
19+
import type { Connection } from './connection/Connection';
2020

2121
/**
2222
* @description Wrapper class for managing secure gRPC wrapper.
@@ -56,12 +56,37 @@ export class gRPCWrapper {
5656
}
5757

5858
/**
59-
* Starts the Connection Manager stream connection with Central System
59+
* Connects to the central system using the underlying ConnectionManager.
60+
*
61+
* @remarks
62+
* This method starts the duplex stream connection with the central gRPC server.
6063
*/
6164
public connectToCentralSystem() {
6265
this._connectionManager.connectToCentralSystem();
6366
}
6467

68+
/**
69+
* Establishes a new connection to a target client.
70+
*
71+
* @param address - The target address of the client.
72+
* @param token - Optional authentication token for the connection.
73+
*
74+
* @returns A promise that resolves to the newly created connection ready to use for fetching data.
75+
*/
76+
public async connectToClient(address: string, token?: string): Promise<Connection> {
77+
return this._connectionManager.createNewConnection(address, ConnectionDirection.SENDING, token ?? '');
78+
}
79+
80+
/**
81+
* Starts a listener server for p2p connections.
82+
* @param port The port number to bind the p2p server to.
83+
* @param baseAPIPath Optional base API path to forward requests to e.g. '/api'.
84+
* @returns A promise that resolves when the p2p listener server is started.
85+
*/
86+
public async listenForPeers(port: number, baseAPIPath?: string): Promise<void> {
87+
return this._connectionManager.listenForPeers(port, baseAPIPath);
88+
}
89+
6590
/**
6691
* Returns all saved connections.
6792
*

Tokenization/backend/wrapper/src/models/connection.model.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,3 +44,35 @@ export enum ConnectionStatus {
4444
// The connection is refreshing its authentication token
4545
TOKEN_REFRESH = 'TOKEN_REFRESH',
4646
}
47+
48+
export type ConnectionHeaders = Record<string, string>;
49+
50+
export type FetchOptions = {
51+
method?: string;
52+
path?: string;
53+
headers?: ConnectionHeaders;
54+
body?: string | Buffer | Uint8Array | null;
55+
};
56+
57+
export type FetchResponse = {
58+
status: number;
59+
headers: ConnectionHeaders;
60+
body: Buffer;
61+
text: () => Promise<string>;
62+
json: () => Promise<any>;
63+
};
64+
65+
export type HttpLikeRequest = {
66+
method: string;
67+
path: string;
68+
headers: Headers;
69+
body: Buffer;
70+
correlation_id?: string;
71+
sequence_number?: number;
72+
};
73+
74+
export type HttpLikeResponse = {
75+
status: number;
76+
headers: Headers;
77+
body: Buffer;
78+
};

Tokenization/backend/wrapper/src/models/message.model.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
* @property MESSAGE_EVENT_REVOKE_TOKEN: Event for revoking an existing token.
2424
*/
2525
export enum DuplexMessageEvent {
26-
MESSAGE_EVENT_UNSPECIFIED = 'MESSAGE_EVENT_UNSPECIFIED',
2726
MESSAGE_EVENT_EMPTY = 'MESSAGE_EVENT_EMPTY',
2827
MESSAGE_EVENT_NEW_TOKEN = 'MESSAGE_EVENT_NEW_TOKEN',
2928
MESSAGE_EVENT_REVOKE_TOKEN = 'MESSAGE_EVENT_REVOKE_TOKEN',

Tokenization/backend/wrapper/src/test/client/commands/newToken.test.ts

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ import { Connection } from '../../../client/connection/Connection';
1818
import { ConnectionManager } from '../../../client/connectionManager/ConnectionManager';
1919
import { Command } from 'models/commands.model';
2020
import { ConnectionDirection, DuplexMessageEvent } from '../../../models/message.model';
21+
import * as grpc from '@grpc/grpc-js';
22+
import * as protoLoader from '@grpc/proto-loader';
23+
import path from 'path';
2124

2225
/**
2326
* Helper to create a new token command with given address, direction, and token.
@@ -36,6 +39,19 @@ function createEventMessage(targetAddress: string, connectionDirection: Connecti
3639
describe('NewTokenHandler', () => {
3740
let manager: ConnectionManager;
3841

42+
const protoPath = path.join(__dirname, '..', '..', '..', '..', '..', 'proto', 'wrapper.proto');
43+
const packageDef = protoLoader.loadSync(protoPath, {
44+
keepCase: true,
45+
longs: String,
46+
enums: String,
47+
defaults: true,
48+
oneofs: true,
49+
});
50+
51+
const proto = grpc.loadPackageDefinition(packageDef) as any;
52+
const wrapper = proto.webui.tokenization;
53+
const peerCtor = wrapper.Peer2Peer;
54+
3955
beforeEach(() => {
4056
manager = {
4157
sendingConnections: new Map<string, Connection>(),
@@ -49,7 +65,7 @@ describe('NewTokenHandler', () => {
4965
return undefined;
5066
}),
5167
createNewConnection: jest.fn(function (this: any, address: string, dir: ConnectionDirection, token: string) {
52-
const conn = new Connection(token, address, dir);
68+
const conn = new Connection(token, address, dir, peerCtor);
5369
if (dir === ConnectionDirection.SENDING) {
5470
this.sendingConnections.set(address, conn);
5571
} else {
@@ -62,7 +78,7 @@ describe('NewTokenHandler', () => {
6278

6379
it('should update token on existing SENDING connection', async () => {
6480
const targetAddress = 'peer-123';
65-
const conn = new Connection('old-token', targetAddress, ConnectionDirection.SENDING);
81+
const conn = new Connection('old-token', targetAddress, ConnectionDirection.SENDING, peerCtor);
6682
(manager as any).sendingConnections.set(targetAddress, conn);
6783

6884
const handler = new NewTokenHandler(manager);

0 commit comments

Comments
 (0)