Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions src/chains/stellar/announcements.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ export class RetentionExceededError extends Error {
* ```
*
* @see {@link getDeployment}
*
* @deprecated Prefer {@link fetchAnnouncementsStream} for memory-efficient streaming.
*/
export async function fetchAnnouncements(
chain?: string,
Expand Down Expand Up @@ -177,6 +179,86 @@ export async function fetchAnnouncements(
return returnsCursor ? { announcements: all, nextCursor } : all;
}

/**
* Streaming version of announcement fetching. Yields announcements page by page
* from the Soroban RPC as they arrive, never holding more than one page in memory.
*
* Cancellation is automatic: breaking out of the `for-await` loop stops the stream.
*
* @param chain The chain identifier (default: "stellar").
* @param sorobanUrl Optional override for the Soroban RPC URL.
*/
export async function* fetchAnnouncementsStream(
chain: string = 'stellar',
sorobanUrl?: string,
): AsyncGenerator<Announcement> {
const deployment = getDeployment(chain);
const url = sorobanUrl || deployment.sorobanUrl;
const announcerContract = deployment.contracts.announcer;

const probeRes = await fetch(url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
jsonrpc: '2.0',
id: 0,
method: 'getEvents',
params: {
startLedger: 1,
filters: [{ type: 'contract', contractIds: [announcerContract] }],
pagination: { limit: 1 },
},
}),
});

const probeData = await probeRes.json();
let startLedger = 1;

if (probeData.error?.message) {
const range = parseLedgerRange(probeData.error.message);
if (range) {
startLedger = Math.max(range.oldest, range.latest - 5000);
} else {
return;
}
}

let cursor: string | undefined;
let hasMore = true;

while (hasMore) {
const params: Record<string, unknown> = {
filters: [{ type: 'contract', contractIds: [announcerContract] }],
pagination: cursor ? { limit: 1000, cursor } : { limit: 1000 },
};

if (!cursor) {
params.startLedger = startLedger;
}

const res = await fetch(url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ jsonrpc: '2.0', id: 2, method: 'getEvents', params }),
});

const data = await res.json();
const events: Record<string, unknown>[] = data.result?.events ?? [];

for (const event of events) {
const ann = parseAnnouncementEvent(event);
if (ann) yield ann;
}

if (events.length < 1000) {
hasMore = false;
} else {
cursor = data.result?.cursor;
if (!cursor) hasMore = false;
}
}
}

async function getSorobanLedgerWindow(
sorobanUrl: string,
announcerContract: string,
Expand Down
4 changes: 2 additions & 2 deletions src/chains/stellar/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ export { deriveStealthKeys } from './keys';
export { STEALTH_SIGNING_MESSAGE, SCHEME_ID, META_ADDRESS_PREFIX } from './constants';
export { encodeStealthMetaAddress, decodeStealthMetaAddress } from './meta-address';
export { generateStealthAddress, computeSharedSecret, computeViewTag } from './stealth';
export { checkStealthAddress, scanAnnouncements } from './scan';
export { checkStealthAddress, scanAnnouncements, scanAnnouncementsStream } from './scan';
export { deriveStealthPrivateScalar, signStellarTransaction } from './spend';
export {
seedToScalar,
Expand All @@ -13,7 +13,7 @@ export {
L,
} from './scalar';
export { bytesToHex, hexToBytes } from './utils';
export { fetchAnnouncements, RetentionExceededError } from './announcements';
export { fetchAnnouncements, fetchAnnouncementsStream, RetentionExceededError } from './announcements';
export type { FetchAnnouncementsOptions, FetchAnnouncementsResult } from './announcements';
export { DEPLOYMENTS, getDeployment } from './deployments';
export type { StellarChainDeployment } from './deployments';
Expand Down
73 changes: 73 additions & 0 deletions src/chains/stellar/scan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,75 @@ import { SCHEME_ID } from './constants';
import type { Announcement, MatchedAnnouncement } from './types';
import { hexToBytes } from './utils';

/**
* Streaming announcement scanner. Pulls announcements from `source` in windows of
* `opts.window` (default 64), scans each window, and yields matches immediately.
*
* Peak memory is O(window) — never accumulates the full announcement set.
* Cancellation is clean: breaking out of the `for-await` loop triggers the `finally`
* block which calls `.return()` on the source iterator, stopping upstream I/O.
*
* @param source Async iterable of announcements (e.g. from {@link fetchAnnouncementsStream}).
* @param opts.window Max announcements buffered at once. Smaller = less memory, larger = fewer
* async round-trips to the source. Default: 64.
*/
export async function* scanAnnouncementsStream(
source: AsyncIterable<Announcement>,
viewingKey: Uint8Array,
spendingPubKey: Uint8Array,
spendingScalar: bigint,
opts: { window?: number } = {},
): AsyncGenerator<MatchedAnnouncement> {
const windowSize = Math.max(1, opts.window ?? 64);
const iter = source[Symbol.asyncIterator]();

try {
while (true) {
// Prefetch up to windowSize announcements — bounds peak memory to O(window)
const batch: Announcement[] = [];
for (let i = 0; i < windowSize; i++) {
const next = await iter.next();
if (next.done) break;
batch.push(next.value);
}

if (batch.length === 0) break;

for (const ann of batch) {
if (ann.schemeId !== SCHEME_ID) continue;

const metadataBytes = hexToBytes(ann.metadata);
if (metadataBytes.length === 0) continue;
const viewTag = metadataBytes[0];

const ephPubKey = hexToBytes(ann.ephemeralPubKey);
if (ephPubKey.length !== 32) continue;

const result = checkStealthAddress(ephPubKey, viewingKey, spendingPubKey, viewTag);

if (
result.isMatch &&
result.stealthAddress === ann.stealthAddress &&
result.hashScalar !== null &&
result.stealthPubKeyBytes !== null
) {
const stealthPrivateScalar = (spendingScalar + result.hashScalar) % L;
yield {
...ann,
stealthPrivateScalar,
stealthPubKeyBytes: result.stealthPubKeyBytes,
};
}
}

if (batch.length < windowSize) break;
}
} finally {
// Signal upstream to stop I/O when consumer cancels early
await iter.return?.();
}
}

/**
* Checks whether one Stellar announcement can belong to a recipient.
*
Expand Down Expand Up @@ -61,6 +130,10 @@ export function checkStealthAddress(
/**
* Scans Stellar stealth announcements and returns the ones a recipient can spend.
*
* @deprecated Prefer {@link scanAnnouncementsStream} for memory-efficient streaming.
* For large announcement sets this loads the full array into memory, which can
* exhaust TEE memory budgets. This function is kept for backward compatibility.
*
* Use this after fetching Soroban announcements. The spending scalar is required
* because matched results include the derived stealth private scalar for later
* transaction signing.
Expand Down
Loading