Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion src/discovery/Discovery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,11 @@ class Discovery {
return;
}
// Iterate over each of the claims in the chain (already verified).
for (const signedClaim of Object.values(vertexChainData)) {
const processedClaimIds: Set<string> = new Set();
for (const [claimIdString, signedClaim] of Object.entries(
vertexChainData,
)) {
processedClaimIds.add(claimIdString);
switch (signedClaim.payload.typ) {
case 'ClaimLinkNode':
await this.processClaimLinkNode(
Expand All @@ -483,6 +487,23 @@ class Discovery {
);
}
}
// Queue up known linked vertices that weren't just processed
for await (const [gestaltId, gestaltLink] of this.gestaltGraph.getLinks([
'node',
nodeId,
])) {
const claimIdString = decodeClaimId(
gestaltLink[1].claim.payload.jti,
)!.toString();
if (!processedClaimIds.has(claimIdString)) {
await this.scheduleDiscoveryForVertex(
gestaltId,
undefined,
lastProcessedCutoffTime,
['node', nodeId],
);
}
}
await this.gestaltGraph.setVertexProcessedTime(
gestaltNodeId,
processedTime,
Expand Down
203 changes: 134 additions & 69 deletions src/nodes/NodeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import type {
NodeAddress,
NodeBucket,
NodeBucketIndex,
NodeContact,
NodeContactAddressData,
NodeId,
NodeIdEncoded,
Expand Down Expand Up @@ -1145,48 +1146,68 @@ class NodeManager<Manifest extends AgentClientManifestNodeManager> {
nodeConnectionsQueue: NodeConnectionQueue,
ctx: ContextTimed,
) {
await this.nodeConnectionManager.withConnF(nodeId, ctx, async (conn) => {
const nodeIdEncoded = nodesUtils.encodeNodeId(nodeIdTarget);
const closestConnectionsRequestP = (async () => {
const resultStream =
await conn.rpcClient.methods.nodesClosestActiveConnectionsGet(
{
nodeIdEncoded: nodeIdEncoded,
},
ctx,
);
// Collecting results
for await (const result of resultStream) {
ctx.signal.throwIfAborted();
const nodeIdNew = nodesUtils.decodeNodeId(result.nodeId);
if (nodeIdNew == null) {
utils.never(`failed to decode NodeId "${result.nodeId}"`);
const nodeIdEncoded = nodesUtils.encodeNodeId(nodeIdTarget);
const closestConnectionsRequestP = (async () => {
const data = await this.nodeConnectionManager.withConnF(
nodeId,
ctx,
async (conn) => {
const resultStream =
await conn.rpcClient.methods.nodesClosestActiveConnectionsGet(
{
nodeIdEncoded: nodeIdEncoded,
},
ctx,
);
const connections: Array<NodeId> = [];
// Collecting results
for await (const result of resultStream) {
ctx.signal.throwIfAborted();
const nodeIdNew = nodesUtils.decodeNodeId(result.nodeId);
if (nodeIdNew == null) {
utils.never(`failed to decode NodeId "${result.nodeId}"`);
}
connections.push(nodeIdNew);
}
nodeConnectionsQueue.queueNodeSignal(nodeIdNew, nodeId);
}
})();
const closestNodesRequestP = (async () => {
const resultStream =
await conn.rpcClient.methods.nodesClosestLocalNodesGet(
{
nodeIdEncoded: nodeIdEncoded,
},
ctx,
);
for await (const { nodeIdEncoded, nodeContact } of resultStream) {
ctx.signal.throwIfAborted();
const nodeId = nodesUtils.decodeNodeId(nodeIdEncoded);
if (nodeId == null) {
utils.never(`failed to decode NodeId "${nodeIdEncoded}"`);
return connections;
},
);
for (const nodeIdNew of data) {
nodeConnectionsQueue.queueNodeSignal(nodeIdNew, nodeId);
}
})();
const closestNodesRequestP = (async () => {
const data = await this.nodeConnectionManager.withConnF(
nodeId,
ctx,
async (conn) => {
const resultStream =
await conn.rpcClient.methods.nodesClosestLocalNodesGet(
{
nodeIdEncoded: nodeIdEncoded,
},
ctx,
);
const data: Array<[NodeId, NodeContact]> = [];
for await (const { nodeIdEncoded, nodeContact } of resultStream) {
ctx.signal.throwIfAborted();
const nodeId = nodesUtils.decodeNodeId(nodeIdEncoded);
if (nodeId == null) {
utils.never(`failed to decode NodeId "${nodeIdEncoded}"`);
}
data.push([nodeId, nodeContact]);
}
nodeConnectionsQueue.queueNodeDirect(nodeId, nodeContact);
}
})();
await Promise.allSettled([
closestConnectionsRequestP,
closestNodesRequestP,
]);
});
return data;
},
);
for (const [nodeId, nodeContact] of data) {
nodeConnectionsQueue.queueNodeDirect(nodeId, nodeContact);
}
})();
await Promise.allSettled([
closestConnectionsRequestP,
closestNodesRequestP,
]);
}

/**
Expand Down Expand Up @@ -1256,44 +1277,75 @@ class NodeManager<Manifest extends AgentClientManifestNodeManager> {
}
}

/**
* Will attempt to make a direct connection without ICE.
* This will only succeed due to these conditions
* 1. connection already exists to target.
* 2. Nat already allows port due to already being punched.
* 3. Port is publicly accessible due to nat configuration .
* Will return true if connection was established or already exists, false otherwise.
*/
public pingNodeAddressMultiple(
nodeId: NodeId,
addresses: Array<[Host, Port]>,
ctx?: Partial<ContextTimedInput>,
): PromiseCancellable<boolean>;
@startStop.ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
@decorators.timedCancellable(
true,
(nodeConnectionManager: NodeConnectionManager<Manifest>) =>
nodeConnectionManager.connectionConnectTimeoutTime,
)
public async pingNodeAddressMultiple(
nodeId: NodeId,
addresses: Array<[Host, Port]>,
@decorators.context ctx: ContextTimed,
): Promise<boolean> {
if (this.nodeConnectionManager.hasConnection(nodeId)) return true;
try {
await this.nodeConnectionManager.createConnectionMultiple(
[nodeId],
addresses,
ctx,
);
return true;
} catch (e) {
if (!nodesUtils.isConnectionError(e)) throw e;
return false;
}
}

/**
* Connects to the target node, and retrieves its sigchain data.
* Verifies and returns the decoded chain as ChainData. Note: this will drop
* any unverifiable claims.
* For node1 -> node2 claims, the verification process also involves connecting
* to node2 to verify the claim (to retrieve its signing public key).
* @param targetNodeId Id of the node to connect request the chain data of.
* @param _claimId If set then we get the claims newer that this claim ID.
* @param claimId If set then we get the claims newer that this claim ID.
* @param ctx
*/
public requestChainData(
targetNodeId: NodeId,
_claimId?: ClaimId,
claimId?: ClaimId,
ctx?: Partial<ContextTimed>,
): PromiseCancellable<Record<ClaimId, SignedClaim>>;
@decorators.timedCancellable(true)
public async requestChainData(
targetNodeId: NodeId,
_claimId: ClaimId | undefined,
claimId: ClaimId | undefined,
@decorators.context ctx: ContextTimed,
): Promise<Record<ClaimId, SignedClaim>> {
// Verify the node's chain with its own public key
return await this.withConnF(targetNodeId, ctx, async (connection) => {
const claims: Record<ClaimId, SignedClaim> = {};
const client = connection.getClient();

// Let claimIdEncoded: ClaimIdEncoded | undefined;

// if (claimId != null) {
// claimIdEncoded = claimsUtils.encodeClaimId(claimId);
// } else {
// claimIdEncoded = undefined;
// }

const claimIdEncoded: ClaimIdEncoded | undefined =
claimId != null ? claimsUtils.encodeClaimId(claimId) : undefined;
for await (const agentClaim of await client.methods.nodesClaimsGet(
{
// Needs to be addressed later - causes test failures in Discovery.test.ts
// seek: claimIdEncoded,
seek: claimIdEncoded,
},
ctx,
)) {
Expand Down Expand Up @@ -2339,7 +2391,7 @@ class NodeManager<Manifest extends AgentClientManifestNodeManager> {
let removedNodes = 0;
const unsetLock = new Lock();
const pendingPromises: Array<Promise<void>> = [];
for (const [nodeId] of bucket) {
for (const [nodeId, nodeContact] of bucket) {
if (removedNodes >= pendingNodes.size) break;
await semaphore.waitForUnlock(ctx);
if (ctx.signal?.aborted === true) break;
Expand All @@ -2351,21 +2403,34 @@ class NodeManager<Manifest extends AgentClientManifestNodeManager> {
signal: ctx.signal,
timer: connectionConnectTimeoutTime,
};
const pingResult = await this.pingNode(nodeId, pingCtx);
if (pingResult != null) {
// Succeeded so update
const [nodeAddress, nodeContactAddressData] = pingResult;
await this.setNode(
nodeId,
nodeAddress,
nodeContactAddressData,
false,
false,
undefined,
tran,
ctx,
);
} else {
// Getting known addresses for the ping
const desiredAddresses: Array<NodeAddress> = [];
for (const [
nodeContactAddress,
nodeContactAddressData,
] of Object.entries(nodeContact)) {
if (nodeContactAddressData.mode === 'direct') {
desiredAddresses.push(
nodesUtils.parseNodeContactAddress(nodeContactAddress),
);
}
}

const resolvedAddresses = await networkUtils.resolveHostnames(
desiredAddresses,
undefined,
this.dnsServers,
ctx,
);

const pingResult = await this.pingNodeAddressMultiple(
nodeId,
resolvedAddresses,
pingCtx,
);

// If ping fails we remove it, otherwise we don't update
if (!pingResult) {
// We don't remove node the ping was aborted
if (ctx.signal.aborted) return;
// We need to lock this since it's concurrent
Expand Down
4 changes: 1 addition & 3 deletions src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@ function sleepCancellable(ms: number): PromiseCancellable<void> {
/**
* Checks if value is an object.
* Arrays are also considered objects.
* The type guard here says `o is any`.
* TODO: When TS 4.9.x is released, change this to `o is object`.
* At that point `'x' in o` checks become type guards that
* can assert the property's existence.
*/
Expand Down Expand Up @@ -298,8 +296,8 @@ function promise<T = void>(): PromiseDeconstructed<T> {
* Promise constructed from signal
* This rejects when the signal is aborted
*/
// fixme: There is also a one signal to many `signalPromise` relationship in the NM connection queue that needs to be fixed.
function signalPromise(signal: AbortSignal): PromiseCancellable<void> {
setMaxListeners(signal);
return new PromiseCancellable((resolve, _, signalCancel) => {
// Short circuit if signal already aborted
if (signal.aborted) return resolve();
Expand Down
16 changes: 8 additions & 8 deletions tests/nodes/NodeManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ describe(`${NodeManager.name}`, () => {
);
});
test('should not add new node if bucket is full and old nodes are responsive', async () => {
const mockedPingNode = jest.spyOn(nodeManager, 'pingNode');
const mockedPingNode = jest.spyOn(nodeManager, 'pingNodeAddressMultiple');
// Fill bucket
const nodeId = generateNodeIdForBucket(keyRing.getNodeId(), 255, 0);
for (let i = 0; i < 20; i++) {
Expand All @@ -362,7 +362,7 @@ describe(`${NodeManager.name}`, () => {
});
}

mockedPingNode.mockResolvedValue([nodeAddress, nodeContactAddressData]);
mockedPingNode.mockResolvedValue(true);
// Add 21st node
await nodeManager.setNode(
nodeId,
Expand All @@ -374,7 +374,7 @@ describe(`${NodeManager.name}`, () => {
expect(await nodeGraph.getNodeContact(nodeId)).toBeUndefined();
});
test('should add new node if bucket is full and old nodes are responsive but force is set', async () => {
const mockedPingNode = jest.spyOn(nodeManager, 'pingNode');
const mockedPingNode = jest.spyOn(nodeManager, 'pingNodeAddressMultiple');
// Fill bucket
const nodeId = generateNodeIdForBucket(keyRing.getNodeId(), 255, 0);
for (let i = 0; i < 20; i++) {
Expand All @@ -386,7 +386,7 @@ describe(`${NodeManager.name}`, () => {
});
}

mockedPingNode.mockResolvedValue([nodeAddress, nodeContactAddressData]);
mockedPingNode.mockResolvedValue(true);
// Add 21st node
await nodeManager.setNode(
nodeId,
Expand All @@ -399,15 +399,15 @@ describe(`${NodeManager.name}`, () => {
expect(await nodeGraph.getNodeContact(nodeId)).toBeDefined();
});
test('should add new node if bucket is full and old nodes are unresponsive', async () => {
const mockedPingNode = jest.spyOn(nodeManager, 'pingNode');
const mockedPingNode = jest.spyOn(nodeManager, 'pingNodeAddressMultiple');
// Fill bucket
const nodeId = generateNodeIdForBucket(keyRing.getNodeId(), 255, 0);
for (let i = 0; i < 20; i++) {
const nodeId = generateNodeIdForBucket(keyRing.getNodeId(), 255, i + 1);
await nodeManager.setNode(nodeId, nodeAddress, nodeContactAddressData);
}

mockedPingNode.mockResolvedValue(undefined);
mockedPingNode.mockResolvedValue(false);
// Add 21st node
await nodeManager.setNode(
nodeId,
Expand All @@ -419,7 +419,7 @@ describe(`${NodeManager.name}`, () => {
expect(await nodeGraph.getNodeContact(nodeId)).toBeDefined();
});
test('should not block when bucket is full', async () => {
const mockedPingNode = jest.spyOn(nodeManager, 'pingNode');
const mockedPingNode = jest.spyOn(nodeManager, 'pingNodeAddressMultiple');
// Fill bucket
const nodeId = generateNodeIdForBucket(keyRing.getNodeId(), 255, 0);
for (let i = 0; i < 20; i++) {
Expand All @@ -436,7 +436,7 @@ describe(`${NodeManager.name}`, () => {
mockedPingNode.mockImplementation(() => {
return new PromiseCancellable(async (resolve) => {
await waitP;
resolve(undefined);
resolve(false);
});
});
// Add 21st node
Expand Down