Skip to content

Commit 4aacfb8

Browse files
authored
Merge pull request #310 from MatrixAI/nodeconnectionmanager
Extracting Node Connection Management out of `NodeManager` to `NodeConnectionManager`
2 parents ae21266 + b09e000 commit 4aacfb8

File tree

100 files changed

+6083
-2597
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

100 files changed

+6083
-2597
lines changed

.eslintrc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@
5252
"ignoreConsecutiveComments": true
5353
}
5454
],
55+
"curly": [
56+
"error",
57+
"multi-line",
58+
"consistent"
59+
],
5560
"import/order": [
5661
"error",
5762
{

src/PolykeyAgent.ts

Lines changed: 66 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { FileSystem } from './types';
22
import type { PolykeyWorkerManagerInterface } from './workers/types';
33
import type { Host, Port } from './network/types';
4-
import type { NodeMapping } from './nodes/types';
4+
import type { SeedNodes } from './nodes/types';
55

66
import type { RootKeyPairChangeData } from './keys/types';
77
import path from 'path';
@@ -14,15 +14,16 @@ import { Status } from './status';
1414
import { Schema } from './schema';
1515
import { VaultManager } from './vaults';
1616
import { ACL } from './acl';
17-
import { NodeManager } from './nodes';
17+
import { NodeConnectionManager, NodeGraph, NodeManager } from './nodes';
1818
import { NotificationsManager } from './notifications';
1919
import { GestaltGraph } from './gestalts';
2020
import { Sigchain } from './sigchain';
2121
import { Discovery } from './discovery';
2222
import { SessionManager } from './sessions';
2323
import { GRPCServer } from './grpc';
2424
import { IdentitiesManager, providers } from './identities';
25-
import { ForwardProxy, ReverseProxy } from './network';
25+
import ForwardProxy from './network/ForwardProxy';
26+
import ReverseProxy from './network/ReverseProxy';
2627
import { EventBus, captureRejectionSymbol } from './events';
2728
import { createAgentService, AgentServiceService } from './agent';
2829
import { createClientService, ClientServiceService } from './client';
@@ -61,6 +62,7 @@ class PolykeyAgent {
6162
networkConfig = {},
6263
forwardProxyConfig = {},
6364
reverseProxyConfig = {},
65+
nodeConnectionManagerConfig = {},
6466
seedNodes = {},
6567
// Optional dependencies
6668
status,
@@ -73,6 +75,8 @@ class PolykeyAgent {
7375
gestaltGraph,
7476
fwdProxy,
7577
revProxy,
78+
nodeGraph,
79+
nodeConnectionManager,
7680
nodeManager,
7781
discovery,
7882
vaultManager,
@@ -102,8 +106,13 @@ class PolykeyAgent {
102106
connConnectTime?: number;
103107
connTimeoutTime?: number;
104108
};
109+
nodeConnectionManagerConfig?: {
110+
connConnectTime?: number;
111+
connTimeoutTime?: number;
112+
initialClosestNodes?: number;
113+
};
105114
networkConfig?: NetworkConfig;
106-
seedNodes?: NodeMapping;
115+
seedNodes?: SeedNodes;
107116
status?: Status;
108117
schema?: Schema;
109118
keyManager?: KeyManager;
@@ -114,6 +123,8 @@ class PolykeyAgent {
114123
gestaltGraph?: GestaltGraph;
115124
fwdProxy?: ForwardProxy;
116125
revProxy?: ReverseProxy;
126+
nodeGraph?: NodeGraph;
127+
nodeConnectionManager?: NodeConnectionManager;
117128
nodeManager?: NodeManager;
118129
discovery?: Discovery;
119130
vaultManager?: VaultManager;
@@ -146,6 +157,10 @@ class PolykeyAgent {
146157
...config.defaults.reverseProxyConfig,
147158
...utils.filterEmptyObject(reverseProxyConfig),
148159
};
160+
const nodeConnectionManagerConfig_ = {
161+
...config.defaults.nodeConnectionManagerConfig,
162+
...utils.filterEmptyObject(nodeConnectionManagerConfig),
163+
};
149164
await utils.mkdirExists(fs, nodePath);
150165
const statusPath = path.join(nodePath, config.defaults.statusBase);
151166
const statusLockPath = path.join(nodePath, config.defaults.statusLockBase);
@@ -254,26 +269,45 @@ class PolykeyAgent {
254269
...reverseProxyConfig_,
255270
logger: logger.getChild(ReverseProxy.name),
256271
});
272+
nodeGraph =
273+
nodeGraph ??
274+
(await NodeGraph.createNodeGraph({
275+
db,
276+
fresh,
277+
keyManager,
278+
logger: logger.getChild(NodeGraph.name),
279+
}));
280+
nodeConnectionManager =
281+
nodeConnectionManager ??
282+
new NodeConnectionManager({
283+
keyManager,
284+
nodeGraph,
285+
fwdProxy,
286+
revProxy,
287+
seedNodes,
288+
...nodeConnectionManagerConfig_,
289+
logger: logger.getChild(NodeConnectionManager.name),
290+
});
257291
nodeManager =
258292
nodeManager ??
259-
(await NodeManager.createNodeManager({
293+
new NodeManager({
260294
db,
261-
seedNodes,
262295
sigchain,
263296
keyManager,
264-
fwdProxy,
265-
revProxy,
297+
nodeGraph,
298+
nodeConnectionManager,
266299
logger: logger.getChild(NodeManager.name),
267-
fresh,
268-
}));
300+
});
269301
// Discovery uses in-memory CreateDestroy pattern
270302
// Therefore it should be destroyed during stop
271303
discovery =
272304
discovery ??
273305
(await Discovery.createDiscovery({
306+
keyManager,
274307
gestaltGraph,
275308
identitiesManager,
276309
nodeManager,
310+
sigchain,
277311
logger: logger.getChild(Discovery.name),
278312
}));
279313
vaultManager =
@@ -282,7 +316,7 @@ class PolykeyAgent {
282316
vaultsKey: keyManager.vaultKey,
283317
vaultsPath,
284318
keyManager,
285-
nodeManager,
319+
nodeConnectionManager,
286320
gestaltGraph,
287321
acl,
288322
db,
@@ -295,6 +329,7 @@ class PolykeyAgent {
295329
(await NotificationsManager.createNotificationsManager({
296330
acl,
297331
db,
332+
nodeConnectionManager,
298333
nodeManager,
299334
keyManager,
300335
logger: logger.getChild(NotificationsManager.name),
@@ -324,7 +359,6 @@ class PolykeyAgent {
324359
await notificationsManager?.stop();
325360
await vaultManager?.stop();
326361
await discovery?.destroy();
327-
await nodeManager?.stop();
328362
await revProxy?.stop();
329363
await fwdProxy?.stop();
330364
await gestaltGraph?.stop();
@@ -349,6 +383,8 @@ class PolykeyAgent {
349383
gestaltGraph,
350384
fwdProxy,
351385
revProxy,
386+
nodeGraph,
387+
nodeConnectionManager,
352388
nodeManager,
353389
discovery,
354390
vaultManager,
@@ -380,6 +416,8 @@ class PolykeyAgent {
380416
public readonly gestaltGraph: GestaltGraph;
381417
public readonly fwdProxy: ForwardProxy;
382418
public readonly revProxy: ReverseProxy;
419+
public readonly nodeGraph: NodeGraph;
420+
public readonly nodeConnectionManager: NodeConnectionManager;
383421
public readonly nodeManager: NodeManager;
384422
public readonly discovery: Discovery;
385423
public readonly vaultManager: VaultManager;
@@ -404,6 +442,8 @@ class PolykeyAgent {
404442
gestaltGraph,
405443
fwdProxy,
406444
revProxy,
445+
nodeGraph,
446+
nodeConnectionManager,
407447
nodeManager,
408448
discovery,
409449
vaultManager,
@@ -426,6 +466,8 @@ class PolykeyAgent {
426466
gestaltGraph: GestaltGraph;
427467
fwdProxy: ForwardProxy;
428468
revProxy: ReverseProxy;
469+
nodeGraph: NodeGraph;
470+
nodeConnectionManager: NodeConnectionManager;
429471
nodeManager: NodeManager;
430472
discovery: Discovery;
431473
vaultManager: VaultManager;
@@ -449,6 +491,8 @@ class PolykeyAgent {
449491
this.gestaltGraph = gestaltGraph;
450492
this.fwdProxy = fwdProxy;
451493
this.revProxy = revProxy;
494+
this.nodeGraph = nodeGraph;
495+
this.nodeConnectionManager = nodeConnectionManager;
452496
this.nodeManager = nodeManager;
453497
this.discovery = discovery;
454498
this.vaultManager = vaultManager;
@@ -513,7 +557,9 @@ class PolykeyAgent {
513557
keyManager: this.keyManager,
514558
vaultManager: this.vaultManager,
515559
nodeManager: this.nodeManager,
560+
nodeGraph: this.nodeGraph,
516561
sigchain: this.sigchain,
562+
nodeConnectionManager: this.nodeConnectionManager,
517563
notificationsManager: this.notificationsManager,
518564
});
519565
const clientService = createClientService({
@@ -522,6 +568,8 @@ class PolykeyAgent {
522568
gestaltGraph: this.gestaltGraph,
523569
identitiesManager: this.identitiesManager,
524570
keyManager: this.keyManager,
571+
nodeGraph: this.nodeGraph,
572+
nodeConnectionManager: this.nodeConnectionManager,
525573
nodeManager: this.nodeManager,
526574
notificationsManager: this.notificationsManager,
527575
sessionManager: this.sessionManager,
@@ -575,9 +623,9 @@ class PolykeyAgent {
575623
ingressPort: networkConfig_.ingressPort,
576624
tlsConfig,
577625
});
578-
await this.nodeManager.start({ fresh });
579-
await this.nodeManager.getConnectionsToSeedNodes();
580-
await this.nodeManager.syncNodeGraph();
626+
await this.nodeConnectionManager.start();
627+
await this.nodeGraph.start({ fresh });
628+
await this.nodeConnectionManager.syncNodeGraph();
581629
await this.vaultManager.start({ fresh });
582630
await this.notificationsManager.start({ fresh });
583631
await this.sessionManager.start({ fresh });
@@ -597,7 +645,6 @@ class PolykeyAgent {
597645
await this.notificationsManager?.stop();
598646
await this.vaultManager?.stop();
599647
await this.discovery?.destroy();
600-
await this.nodeManager?.stop();
601648
await this.revProxy?.stop();
602649
await this.fwdProxy?.stop();
603650
await this.grpcServerAgent?.stop();
@@ -625,7 +672,8 @@ class PolykeyAgent {
625672
await this.notificationsManager.stop();
626673
await this.vaultManager.stop();
627674
await this.discovery.destroy();
628-
await this.nodeManager.stop();
675+
await this.nodeConnectionManager.stop();
676+
await this.nodeGraph.stop();
629677
await this.revProxy.stop();
630678
await this.fwdProxy.stop();
631679
await this.grpcServerAgent.stop();
@@ -649,7 +697,7 @@ class PolykeyAgent {
649697
await this.sessionManager.destroy();
650698
await this.notificationsManager.destroy();
651699
await this.vaultManager.destroy();
652-
await this.nodeManager.destroy();
700+
await this.nodeGraph.destroy();
653701
await this.gestaltGraph.destroy();
654702
await this.acl.destroy();
655703
await this.sigchain.destroy();

src/agent/GRPCClientAgent.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,16 @@ class GRPCClientAgent extends GRPCClient<AgentServiceClient> {
3333
tlsConfig,
3434
proxyConfig,
3535
timeout = Infinity,
36+
destroyCallback = async () => {},
3637
logger = new Logger(this.name),
3738
}: {
3839
nodeId: NodeId;
3940
host: Host;
4041
port: Port;
41-
proxyConfig?: ProxyConfig;
4242
tlsConfig?: Partial<TLSConfig>;
43+
proxyConfig?: ProxyConfig;
4344
timeout?: number;
45+
destroyCallback?: () => Promise<void>;
4446
logger?: Logger;
4547
}): Promise<GRPCClientAgent> {
4648
const { client, serverCertChain, flowCountInterceptor } =
@@ -63,6 +65,7 @@ class GRPCClientAgent extends GRPCClient<AgentServiceClient> {
6365
proxyConfig,
6466
serverCertChain,
6567
flowCountInterceptor,
68+
destroyCallback,
6669
logger,
6770
});
6871
return grpcClientAgent;

src/agent/service/index.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import type { KeyManager } from '../../keys';
22
import type { VaultManager } from '../../vaults';
3-
import type { NodeManager } from '../../nodes';
3+
import type {
4+
NodeGraph,
5+
NodeManager,
6+
NodeConnectionManager,
7+
} from '../../nodes';
48
import type { NotificationsManager } from '../../notifications';
59
import type { Sigchain } from '../../sigchain';
610
import type { IAgentServiceServer } from '../../proto/js/polykey/v1/agent_service_grpc_pb';
@@ -20,7 +24,9 @@ import { AgentServiceService } from '../../proto/js/polykey/v1/agent_service_grp
2024
function createService(container: {
2125
keyManager: KeyManager;
2226
vaultManager: VaultManager;
27+
nodeConnectionManager: NodeConnectionManager;
2328
nodeManager: NodeManager;
29+
nodeGraph: NodeGraph;
2430
notificationsManager: NotificationsManager;
2531
sigchain: Sigchain;
2632
}) {

src/agent/service/nodesChainDataGet.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,25 @@
11
import type * as grpc from '@grpc/grpc-js';
2-
import type { ClaimIdEncoded } from '../../claims/types';
3-
import type { NodeManager } from '../../nodes';
2+
import type { Sigchain } from '../../sigchain';
43
import type * as utilsPB from '../../proto/js/polykey/v1/utils/utils_pb';
4+
import type { ClaimIdEncoded } from '../../claims/types';
55
import { utils as grpcUtils } from '../../grpc';
66
import * as nodesPB from '../../proto/js/polykey/v1/nodes/nodes_pb';
77

88
/**
99
* Retrieves the ChainDataEncoded of this node.
1010
*/
11-
function nodesChainDataGet({ nodeManager }: { nodeManager: NodeManager }) {
11+
function nodesChainDataGet({ sigchain }: { sigchain: Sigchain }) {
1212
return async (
1313
call: grpc.ServerUnaryCall<utilsPB.EmptyMessage, nodesPB.ChainData>,
1414
callback: grpc.sendUnaryData<nodesPB.ChainData>,
1515
): Promise<void> => {
1616
try {
1717
const response = new nodesPB.ChainData();
18-
const chainData = await nodeManager.getChainData();
18+
const chainData = await sigchain.getChainData();
1919
// Iterate through each claim in the chain, and serialize for transport
20-
for (const c in chainData) {
21-
const claimId = c as ClaimIdEncoded;
22-
const claim = chainData[claimId];
20+
let claimIdEncoded: ClaimIdEncoded;
21+
for (claimIdEncoded in chainData) {
22+
const claim = chainData[claimIdEncoded];
2323
const claimMessage = new nodesPB.AgentClaim();
2424
// Will always have a payload (never undefined) so cast as string
2525
claimMessage.setPayload(claim.payload as string);
@@ -32,7 +32,7 @@ function nodesChainDataGet({ nodeManager }: { nodeManager: NodeManager }) {
3232
claimMessage.getSignaturesList().push(signature);
3333
}
3434
// Add the serialized claim
35-
response.getChainDataMap().set(claimId, claimMessage);
35+
response.getChainDataMap().set(claimIdEncoded, claimMessage);
3636
}
3737
callback(null, response);
3838
return;

src/agent/service/nodesClosestLocalNodesGet.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type * as grpc from '@grpc/grpc-js';
2-
import type { NodeManager } from '../../nodes';
2+
import type { NodeConnectionManager } from '../../nodes';
33
import type { NodeId } from '../../nodes/types';
44
import { utils as grpcUtils } from '../../grpc';
55
import { utils as nodesUtils } from '../../nodes';
@@ -12,9 +12,9 @@ import * as nodesPB from '../../proto/js/polykey/v1/nodes/nodes_pb';
1212
* to some provided node ID.
1313
*/
1414
function nodesClosestLocalNodesGet({
15-
nodeManager,
15+
nodeConnectionManager,
1616
}: {
17-
nodeManager: NodeManager;
17+
nodeConnectionManager: NodeConnectionManager;
1818
}) {
1919
return async (
2020
call: grpc.ServerUnaryCall<nodesPB.Node, nodesPB.NodeTable>,
@@ -38,7 +38,9 @@ function nodesClosestLocalNodesGet({
3838
},
3939
);
4040
// Get all local nodes that are closest to the target node from the request
41-
const closestNodes = await nodeManager.getClosestLocalNodes(nodeId);
41+
const closestNodes = await nodeConnectionManager.getClosestLocalNodes(
42+
nodeId,
43+
);
4244
for (const node of closestNodes) {
4345
const addressMessage = new nodesPB.Address();
4446
addressMessage.setHost(node.address.host);

0 commit comments

Comments
 (0)