Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions yarn-project/foundation/src/config/env_var.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ export type EnvVar =
| 'P2P_TX_POOL_DELETE_TXS_AFTER_REORG'
| 'P2P_MIN_TX_POOL_AGE_MS'
| 'P2P_RPC_PRICE_BUMP_PERCENTAGE'
| 'P2P_TX_VALIDATION_CACHE_SIZE'
| 'DEBUG_P2P_INSTRUMENT_MESSAGES'
| 'PEER_ID_PRIVATE_KEY'
| 'PEER_ID_PRIVATE_KEY_PATH'
Expand Down
14 changes: 13 additions & 1 deletion yarn-project/p2p/src/client/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
createTxValidatorForTransactionsEnteringPendingTxPool,
getDefaultAllowedSetupFunctions,
} from '../msg_validators/index.js';
import { TxValidationCache } from '../msg_validators/tx_validator/tx_validation_cache.js';
import { DummyP2PService } from '../services/dummy_service.js';
import { LibP2PService } from '../services/index.js';
import { createFileStoreTxSources } from '../services/tx_collection/file_store_tx_source.js';
Expand Down Expand Up @@ -137,6 +138,9 @@ export async function createP2PClient(
attestationPool: deps.attestationPool ?? new AttestationPool(attestationStore, telemetry),
};

const txValidationCache =
config.txValidationCacheSize > 0 ? new TxValidationCache(config.txValidationCacheSize) : undefined;

const p2pService = await createP2PService(
config,
archiver,
Expand All @@ -151,9 +155,15 @@ export async function createP2PClient(
packageVersion,
logger.createChild('libp2p_service'),
telemetry,
txValidationCache,
);

const txValidatorForTxCollection = createTxValidatorForReqResponseReceivedTxs(proofVerifier, config);
const txValidatorForTxCollection = createTxValidatorForReqResponseReceivedTxs(
proofVerifier,
config,
/*bindings=*/ undefined,
txValidationCache,
);
const nodeSources = [
...createNodeRpcTxSources(config.txCollectionNodeRpcUrls, txValidatorForTxCollection, config),
...(deps.rpcTxProviders ?? []).map(
Expand Down Expand Up @@ -230,6 +240,7 @@ async function createP2PService(
packageVersion: string,
logger: Logger,
telemetry: TelemetryClient,
txValidationCache?: TxValidationCache,
) {
if (!config.p2pEnabled) {
logger.verbose('P2P is disabled. Using dummy P2P service.');
Expand All @@ -253,6 +264,7 @@ async function createP2PService(
blockMinFeesProvider,
telemetry,
logger: logger.createChild(`libp2p_service`),
txValidationCache,
});

return p2pService;
Expand Down
8 changes: 8 additions & 0 deletions yarn-project/p2p/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ export interface P2PConfig
/** The node's seen message ID cache size */
seenMessageCacheSize: number;

/** Maximum number of (validator, tx) pairs to keep in the tx validation LRU cache. */
txValidationCacheSize: number;

/** True to disable the status handshake on peer connected. */
p2pDisableStatusHandshake?: boolean;

Expand Down Expand Up @@ -471,6 +474,11 @@ export const p2pConfigMappings: ConfigMappingsType<P2PConfig> = {
description: 'The number of messages to keep in the seen message cache',
...numberConfigHelper(100_000), // 100K
},
txValidationCacheSize: {
env: 'P2P_TX_VALIDATION_CACHE_SIZE',
description: 'Maximum number of (validator, tx) pairs to keep in the tx validation LRU cache.',
...numberConfigHelper(10_000),
},
p2pDisableStatusHandshake: {
env: 'P2P_DISABLE_STATUS_HANDSHAKE',
description: 'True to disable the status handshake on peer connected.',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { mockTx } from '@aztec/stdlib/testing';
import type { AnyTx, TxValidationResult, TxValidator } from '@aztec/stdlib/tx';

import { jest } from '@jest/globals';

import { CachedTxValidator } from './cached_tx_validator.js';
import type { ITxValidationCache } from './tx_validation_cache.js';

describe('CachedTxValidator', () => {
class TestValidator implements TxValidator<AnyTx> {
public readonly identifier = Symbol('TestValidator');

constructor(private readonly validateImpl: (tx: AnyTx) => Promise<TxValidationResult>) {}

public validateTx(tx: AnyTx): Promise<TxValidationResult> {
return this.validateImpl(tx);
}
}

class TestTxValidatorCache implements ITxValidationCache {
public readonly getOrValidate: jest.MockedFunction<ITxValidationCache['getOrValidate']>;

constructor(impl?: ITxValidationCache['getOrValidate']) {
this.getOrValidate = jest.fn(impl ?? ((_s, _h, validate) => validate()));
}
}

it('returns inner validator unchanged when cache is not provided', () => {
const inner = new TestValidator(() => Promise.resolve({ result: 'valid' }));

const wrapped = CachedTxValidator.new(inner, undefined);

expect(wrapped).toBe(inner);
});

it('delegates validation to cache.getOrValidate using validator identifier and tx hash', async () => {
const tx = await mockTx(1);
const validate = jest.fn<(tx: AnyTx) => Promise<TxValidationResult>>().mockResolvedValue({ result: 'valid' });
const inner = new TestValidator(txArg => validate(txArg));
const cache = new TestTxValidatorCache();

const wrapped = CachedTxValidator.new(inner, cache);
await wrapped.validateTx(tx);

expect(cache.getOrValidate).toHaveBeenCalledTimes(1);
expect(cache.getOrValidate).toHaveBeenCalledWith(inner.identifier, tx.getTxHash(), expect.any(Function));
expect(validate).toHaveBeenCalledTimes(1);
});

it('returns the value produced by cache.getOrValidate', async () => {
const tx = await mockTx(2);
const result: TxValidationResult = { result: 'invalid', reason: ['cache-hit'] };
const validate = jest.fn<(tx: AnyTx) => Promise<TxValidationResult>>().mockResolvedValue({ result: 'valid' });
const inner = new TestValidator(txArg => validate(txArg));
const cache = new TestTxValidatorCache(() => Promise.resolve(result));

const wrapped = CachedTxValidator.new(inner, cache);

await expect(wrapped.validateTx(tx)).resolves.toEqual(result);
expect(validate).not.toHaveBeenCalled();
});

it('propagates rejections from cache.getOrValidate', async () => {
const tx = await mockTx(3);
const error = new Error('cache failed');
const cache = new TestTxValidatorCache(() => Promise.reject(error));
const wrapped = CachedTxValidator.new(new TestValidator(() => Promise.resolve({ result: 'valid' })), cache);

await expect(wrapped.validateTx(tx)).rejects.toThrow(error.message);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import type { AnyTx, TxValidationResult, TxValidator } from '@aztec/stdlib/tx';
import { getTxHash } from '@aztec/stdlib/tx';

import type { ITxValidationCache } from './tx_validation_cache.js';

/** Wraps a {@link TxValidator} to cache its results in a shared {@link ITxValidationCache}. */
export class CachedTxValidator<T extends AnyTx> implements TxValidator<T> {
constructor(
private readonly inner: TxValidator<T>,
private readonly validatorSymbol: symbol,
private readonly cache: ITxValidationCache,
) {}

public static new<T extends AnyTx>(
inner: TxValidator<T> & { identifier: symbol },
cache?: ITxValidationCache,
): TxValidator<T> {
return CachedTxValidator.newWithIdentifier(inner, inner.identifier, cache);
}

public static newWithIdentifier<T extends AnyTx>(
inner: TxValidator<T>,
identifier: symbol,
cache?: ITxValidationCache,
): TxValidator<T> {
return cache ? new CachedTxValidator(inner, identifier, cache) : inner;
}

public validateTx(tx: T): Promise<TxValidationResult> {
return this.cache.getOrValidate(this.validatorSymbol, getTxHash(tx), () => this.inner.validateTx(tx));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import {
} from '@aztec/stdlib/tx';

export class DataTxValidator implements TxValidator<Tx> {
public readonly identifier: symbol = Symbol('DataTxValidator');

#log: Logger;

constructor(bindings?: LoggerBindings) {
Expand Down
30 changes: 21 additions & 9 deletions yarn-project/p2p/src/msg_validators/tx_validator/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import type { TxMetaData } from '../../mem_pools/tx_pool_v2/tx_metadata.js';
import { AggregateTxValidator } from './aggregate_tx_validator.js';
import { ArchiveCache } from './archive_cache.js';
import { type ArchiveSource, BlockHeaderTxValidator } from './block_header_validator.js';
import { CachedTxValidator } from './cached_tx_validator.js';
import { ContractInstanceTxValidator } from './contract_instance_validator.js';
import { DataTxValidator } from './data_validator.js';
import { DoubleSpendTxValidator, type NullifierSource } from './double_spend_validator.js';
Expand All @@ -64,6 +65,7 @@ import { SizeTxValidator } from './size_validator.js';
import { TimestampTxValidator } from './timestamp_validator.js';
import { TxPermittedValidator } from './tx_permitted_validator.js';
import { TxProofValidator } from './tx_proof_validator.js';
import { TxValidationCache } from './tx_validation_cache.js';

/**
* A validator paired with a peer penalty severity.
Expand Down Expand Up @@ -99,6 +101,7 @@ export function createFirstStageTxValidationsForGossipedTransactions(
allowedInSetup: AllowedElement[] = [],
bindings?: LoggerBindings,
gasLimitOpts?: { rollupManaLimit?: number; maxBlockL2Gas?: number; maxBlockDAGas?: number },
cache?: TxValidationCache,
): Record<string, TransactionValidator> {
const merkleTree = worldStateSynchronizer.getCommitted();

Expand Down Expand Up @@ -165,7 +168,7 @@ export function createFirstStageTxValidationsForGossipedTransactions(
severity: PeerErrorSeverity.MidToleranceError,
},
dataValidator: {
validator: new DataTxValidator(bindings),
validator: CachedTxValidator.new(new DataTxValidator(bindings), cache),
severity: PeerErrorSeverity.MidToleranceError,
},
contractInstanceValidator: {
Expand All @@ -185,10 +188,11 @@ export function createFirstStageTxValidationsForGossipedTransactions(
export function createSecondStageTxValidationsForGossipedTransactions(
proofVerifier: ClientProtocolCircuitVerifier,
bindings?: LoggerBindings,
cache?: TxValidationCache,
): Record<string, TransactionValidator> {
return {
proofValidator: {
validator: new TxProofValidator(proofVerifier, bindings),
validator: CachedTxValidator.new(new TxProofValidator(proofVerifier, bindings), cache),
severity: PeerErrorSeverity.LowToleranceError,
},
};
Expand All @@ -210,8 +214,9 @@ function createTxValidatorForMinimumTxIntegrityChecks(
rollupVersion: number;
},
bindings?: LoggerBindings,
cache?: TxValidationCache,
): TxValidator {
return new AggregateTxValidator(
const aggregate = new AggregateTxValidator(
new MetadataTxValidator(
{
l1ChainId: new Fr(l1ChainId),
Expand All @@ -222,10 +227,14 @@ function createTxValidatorForMinimumTxIntegrityChecks(
bindings,
),
new SizeTxValidator(bindings),
new DataTxValidator(bindings),
CachedTxValidator.new(new DataTxValidator(bindings), cache),
new ContractInstanceTxValidator(bindings),
new TxProofValidator(verifier, bindings),
CachedTxValidator.new(new TxProofValidator(verifier, bindings), cache),
);

// This validator is not state-dependent so we can cache it.
const identifier = Symbol('TxValidatorForMinimumTxIntegrityChecks');
return CachedTxValidator.newWithIdentifier(aggregate, identifier, cache);
}

/**
Expand All @@ -244,8 +253,9 @@ export function createTxValidatorForReqResponseReceivedTxs(
rollupVersion: number;
},
bindings?: LoggerBindings,
cache?: TxValidationCache,
): TxValidator {
return createTxValidatorForMinimumTxIntegrityChecks(verifier, { l1ChainId, rollupVersion }, bindings);
return createTxValidatorForMinimumTxIntegrityChecks(verifier, { l1ChainId, rollupVersion }, bindings, cache);
}

/**
Expand All @@ -263,8 +273,9 @@ export function createTxValidatorForBlockProposalReceivedTxs(
rollupVersion: number;
},
bindings?: LoggerBindings,
cache?: TxValidationCache,
): TxValidator {
return createTxValidatorForMinimumTxIntegrityChecks(verifier, { l1ChainId, rollupVersion }, bindings);
return createTxValidatorForMinimumTxIntegrityChecks(verifier, { l1ChainId, rollupVersion }, bindings, cache);
}

/**
Expand Down Expand Up @@ -303,6 +314,7 @@ export function createTxValidatorForAcceptingTxsOverRPC(
maxBlockDAGas?: number;
},
bindings?: LoggerBindings,
cache?: TxValidationCache,
): TxValidator<Tx> {
const validators: TxValidator<Tx>[] = [
new TxPermittedValidator(txsPermitted, bindings),
Expand All @@ -326,7 +338,7 @@ export function createTxValidatorForAcceptingTxsOverRPC(
new PhasesTxValidator(contractDataSource, setupAllowList, timestamp, bindings),
new BlockHeaderTxValidator(new ArchiveCache(db), bindings),
new DoubleSpendTxValidator(new NullifierCache(db), bindings),
new DataTxValidator(bindings),
CachedTxValidator.new(new DataTxValidator(bindings), cache),
new ContractInstanceTxValidator(bindings),
];

Expand All @@ -341,7 +353,7 @@ export function createTxValidatorForAcceptingTxsOverRPC(
}

if (verifier) {
validators.push(new TxProofValidator(verifier, bindings));
validators.push(CachedTxValidator.new(new TxProofValidator(verifier, bindings), cache));
}

return new AggregateTxValidator(...validators);
Expand Down
2 changes: 2 additions & 0 deletions yarn-project/p2p/src/msg_validators/tx_validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ export * from './tx_permitted_validator.js';
export * from './timestamp_validator.js';
export * from './size_validator.js';
export * from './factory.js';
export * from './tx_validation_cache.js';
export * from './cached_tx_validator.js';
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import type { ClientProtocolCircuitVerifier } from '@aztec/stdlib/interfaces/ser
import { TX_ERROR_INVALID_PROOF, Tx, type TxValidationResult, type TxValidator } from '@aztec/stdlib/tx';

export class TxProofValidator implements TxValidator<Tx> {
public readonly identifier: symbol = Symbol('TxProofValidator');

#log: Logger;

constructor(
Expand Down
Loading
Loading