Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { sleep } from '@aztec/foundation/sleep';
import { emptyChainConfig } from '@aztec/stdlib/config';
import type { WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server';
import { makeBlockHeader, makeBlockProposal, mockTx } from '@aztec/stdlib/testing';
import { Tx, TxHash } from '@aztec/stdlib/tx';
import { Tx, TxHash, type TxValidator } from '@aztec/stdlib/tx';

import { describe, expect, it, jest } from '@jest/globals';
import { type MockProxy, mock } from 'jest-mock-extended';
Expand All @@ -20,7 +20,6 @@ import type { AttestationPool } from '../../mem_pools/attestation_pool/attestati
import type { TxPoolV2 } from '../../mem_pools/tx_pool_v2/interfaces.js';
import { BatchTxRequester } from '../../services/reqresp/batch-tx-requester/batch_tx_requester.js';
import type { BatchTxRequesterLibP2PService } from '../../services/reqresp/batch-tx-requester/interface.js';
import type { IBatchRequestTxValidator } from '../../services/reqresp/batch-tx-requester/tx_validator.js';
import type { ConnectionSampler } from '../../services/reqresp/connection-sampler/connection_sampler.js';
import { RequestTracker } from '../../services/tx_collection/request_tracker.js';
import { generatePeerIdPrivateKeys } from '../../test-helpers/generate-peer-id-private-keys.js';
Expand All @@ -39,7 +38,7 @@ describe('p2p client integration batch txs', () => {

let mockP2PService: MockProxy<BatchTxRequesterLibP2PService>;
let connectionSampler: MockProxy<ConnectionSampler>;
let txValidator: IBatchRequestTxValidator;
let txValidator: TxValidator;

let logger: Logger;
let p2pBaseConfig: P2PConfig;
Expand All @@ -58,8 +57,7 @@ describe('p2p client integration batch txs', () => {
validateRequestedBlockTxsConsistency: () => Promise.resolve(true),
});
txValidator = {
validateRequestedTx: () => Promise.resolve({ result: 'valid' }),
validateRequestedTxs: txs => Promise.resolve(txs.map(() => ({ result: 'valid' }))),
validateTx: () => Promise.resolve({ result: 'valid' }),
};

logger = createLogger('p2p:test:integration:batch');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { sleep } from '@aztec/foundation/sleep';
import { DateProvider } from '@aztec/foundation/timer';
import { type BlockProposal, PeerErrorSeverity } from '@aztec/stdlib/p2p';
import { makeBlockHeader, makeBlockProposal } from '@aztec/stdlib/testing';
import { Tx, TxArray, TxHash, type TxValidationResult } from '@aztec/stdlib/tx';
import { Tx, TxArray, TxHash, type TxValidationResult, type TxValidator } from '@aztec/stdlib/tx';

import { describe, expect, it, jest } from '@jest/globals';
import type { PeerId } from '@libp2p/interface';
Expand All @@ -26,16 +26,12 @@ import { BatchTxRequester } from './batch_tx_requester.js';
import { DEFAULT_BATCH_TX_REQUESTER_BAD_PEER_THRESHOLD, DEFAULT_BATCH_TX_REQUESTER_TX_BATCH_SIZE } from './config.js';
import type { BatchTxRequesterLibP2PService, IPeerPenalizer } from './interface.js';
import { type IPeerCollection, PeerCollection, RATE_LIMIT_EXCEEDED_PEER_CACHE_TTL } from './peer_collection.js';
import type { IBatchRequestTxValidator } from './tx_validator.js';

/** Mock tx validator for testing that always returns valid */
class AlwaysValidTxValidator implements IBatchRequestTxValidator {
validateRequestedTx(_tx: Tx): Promise<TxValidationResult> {
class AlwaysValidTxValidator implements TxValidator {
validateTx(_tx: Tx): Promise<TxValidationResult> {
return Promise.resolve({ result: 'valid' });
}
validateRequestedTxs(txs: Tx[]): Promise<TxValidationResult[]> {
return Promise.resolve(txs.map(() => ({ result: 'valid' })));
}
}

const TEST_TIMEOUT = 15_000;
Expand All @@ -49,7 +45,7 @@ describe('BatchTxRequester', () => {
let connectionSampler: MockProxy<ConnectionSampler>;
let reqResp: MockProxy<ReqRespInterface>;
let mockP2PService: MockProxy<BatchTxRequesterLibP2PService>;
let txValidator: IBatchRequestTxValidator;
let txValidator: TxValidator;

beforeEach(async () => {
logger = createLogger('test');
Expand Down Expand Up @@ -1191,13 +1187,12 @@ describe('BatchTxRequester', () => {

const invalidTxIndices = new Set([2, 3, 7]); // Mark transactions at indices 2, 3, and 7 as invalid

const customValidator: IBatchRequestTxValidator = {
validateRequestedTx: (tx: Tx) => {
const customValidator: TxValidator = {
validateTx: (tx: Tx) => {
const txIndex = missing.findIndex(h => h.equals(tx.txHash));
const isInvalid = invalidTxIndices.has(txIndex);
return Promise.resolve(isInvalid ? { result: 'invalid', reason: ['test invalid'] } : { result: 'valid' });
},
validateRequestedTxs: (txs: Tx[]) => Promise.all(txs.map(tx => customValidator.validateRequestedTx(tx))),
};

const { mockImplementation } = createRequestLogger(blockProposal, new Set(), peerTransactions);
Expand Down Expand Up @@ -1271,13 +1266,12 @@ describe('BatchTxRequester', () => {
// Validator that rejects transactions at specific indices
// Even indices are rejected, odd indices are accepted
const invalidTxIndices = new Set([0, 2, 4, 6, 8, 10]);
const mixedValidator: IBatchRequestTxValidator = {
validateRequestedTx: (tx: Tx) => {
const mixedValidator: TxValidator = {
validateTx: (tx: Tx) => {
const txIndex = missing.findIndex(h => h.equals(tx.txHash));
const isInvalid = invalidTxIndices.has(txIndex);
return Promise.resolve(isInvalid ? { result: 'invalid', reason: ['test invalid'] } : { result: 'valid' });
},
validateRequestedTxs: (txs: Tx[]) => Promise.all(txs.map(tx => mixedValidator.validateRequestedTx(tx))),
};

const { mockImplementation } = createRequestLogger(blockProposal, new Set(), peerTransactions);
Expand Down Expand Up @@ -1329,8 +1323,8 @@ describe('BatchTxRequester', () => {
connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue([peer]);

// Validator that throws errors for specific transactions
const throwingValidator: IBatchRequestTxValidator = {
validateRequestedTx: (tx: Tx) => {
const throwingValidator: TxValidator = {
validateTx: (tx: Tx) => {
const txIndex = missing.findIndex(h => h.equals(tx.txHash));

// Throw error for transactions at indices 1 and 3
Expand All @@ -1345,7 +1339,6 @@ describe('BatchTxRequester', () => {

return Promise.resolve({ result: 'valid' });
},
validateRequestedTxs: (txs: Tx[]) => Promise.all(txs.map(tx => throwingValidator.validateRequestedTx(tx))),
};

const peerTransactions = new Map([[peer.toString(), Array.from({ length: txCount }, (_, i) => i)]]);
Expand Down Expand Up @@ -1698,13 +1691,12 @@ describe('BatchTxRequester', () => {

const invalidTxIndices = new Set([1, 6]); // Mark some transactions as invalid

const customValidator: IBatchRequestTxValidator = {
validateRequestedTx: (tx: Tx) => {
const customValidator: TxValidator = {
validateTx: (tx: Tx) => {
const txIndex = missing.findIndex(h => h.equals(tx.txHash));
const isInvalid = invalidTxIndices.has(txIndex);
return Promise.resolve(isInvalid ? { result: 'invalid', reason: ['test invalid'] } : { result: 'valid' });
},
validateRequestedTxs: (txs: Tx[]) => Promise.all(txs.map(tx => customValidator.validateRequestedTx(tx))),
};

const { mockImplementation } = createRequestLogger(blockProposal, new Set(), peerTransactions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { FifoMemoryQueue, type ISemaphore, Semaphore } from '@aztec/foundation/q
import { sleep } from '@aztec/foundation/sleep';
import { DateProvider } from '@aztec/foundation/timer';
import { PeerErrorSeverity } from '@aztec/stdlib/p2p';
import { Tx, TxArray, TxHash } from '@aztec/stdlib/tx';
import { Tx, TxArray, TxHash, type TxValidator } from '@aztec/stdlib/tx';

import type { PeerId } from '@libp2p/interface';

Expand All @@ -21,7 +21,7 @@ import {
import type { BatchTxRequesterLibP2PService, BatchTxRequesterOptions, ITxMetadataCollection } from './interface.js';
import { MissingTxMetadataCollection } from './missing_txs.js';
import { type IPeerCollection, PeerCollection } from './peer_collection.js';
import { BatchRequestTxValidator, type IBatchRequestTxValidator } from './tx_validator.js';
import { createBatchRequestTxValidator } from './tx_validator.js';

/*
* Tries to fetch all missing transaction until deadline is hit.
Expand Down Expand Up @@ -51,7 +51,7 @@ export class BatchTxRequester {
private readonly txsMetadata: ITxMetadataCollection;
private readonly smartRequesterSemaphore: ISemaphore;
private readonly txQueue: FifoMemoryQueue<Tx>;
private readonly txValidator: IBatchRequestTxValidator;
private readonly txValidator: TxValidator;
private readonly smartParallelWorkerCount: number;
private readonly dumbParallelWorkerCount: number;
private readonly txBatchSize: number;
Expand All @@ -78,7 +78,7 @@ export class BatchTxRequester {
this.opts.dumbParallelWorkerCount ?? DEFAULT_BATCH_TX_REQUESTER_DUMB_PARALLEL_WORKER_COUNT;
this.txBatchSize = this.opts.txBatchSize ?? DEFAULT_BATCH_TX_REQUESTER_TX_BATCH_SIZE;
this.txQueue = new FifoMemoryQueue(this.logger);
this.txValidator = this.opts.txValidator ?? new BatchRequestTxValidator(this.p2pService.txValidatorConfig);
this.txValidator = this.opts.txValidator ?? createBatchRequestTxValidator(this.p2pService.txValidatorConfig);

if (this.opts.peerCollection) {
this.peers = this.opts.peerCollection;
Expand Down Expand Up @@ -509,7 +509,7 @@ export class BatchTxRequester {
const validationResults = await Promise.allSettled(
newTxs.map(async tx => ({
tx,
isValid: (await this.txValidator.validateRequestedTx(tx)).result === 'valid',
isValid: (await this.txValidator.validateTx(tx)).result === 'valid',
})),
);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import type { ISemaphore } from '@aztec/foundation/queue';
import type { PeerErrorSeverity } from '@aztec/stdlib/p2p';
import type { Tx, TxHash } from '@aztec/stdlib/tx';
import type { Tx, TxHash, TxValidator } from '@aztec/stdlib/tx';

import type { PeerId } from '@libp2p/interface';

import type { ConnectionSampler } from '../connection-sampler/connection_sampler.js';
import type { BlockTxsRequest, BlockTxsResponse } from '../index.js';
import type { ReqRespInterface } from '../interface.js';
import type { IPeerCollection } from './peer_collection.js';
import type { BatchRequestTxValidatorConfig, IBatchRequestTxValidator } from './tx_validator.js';
import type { BatchRequestTxValidatorConfig } from './tx_validator.js';

export interface IPeerPenalizer {
penalizePeer(peerId: PeerId, penalty: PeerErrorSeverity): void;
Expand Down Expand Up @@ -57,5 +57,5 @@ export interface BatchTxRequesterOptions {
semaphore?: ISemaphore;
peerCollection?: IPeerCollection;
/** Optional tx validator for testing - if not provided, one is created from p2pService.txValidatorConfig */
txValidator?: IBatchRequestTxValidator;
txValidator?: TxValidator;
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { ClientProtocolCircuitVerifier } from '@aztec/stdlib/interfaces/server';
import { Tx, type TxValidationResult, type TxValidator } from '@aztec/stdlib/tx';
import type { TxValidator } from '@aztec/stdlib/tx';

import { createTxValidatorForOnDemandReceivedTxs } from '../../../msg_validators/index.js';

Expand All @@ -9,29 +9,9 @@ export interface BatchRequestTxValidatorConfig {
proofVerifier: ClientProtocolCircuitVerifier;
}

export interface IBatchRequestTxValidator {
validateRequestedTx(tx: Tx): Promise<TxValidationResult>;
validateRequestedTxs(txs: Tx[]): Promise<TxValidationResult[]>;
}

export class BatchRequestTxValidator implements IBatchRequestTxValidator {
readonly txValidator: TxValidator;
constructor(private readonly config: BatchRequestTxValidatorConfig) {
this.txValidator = BatchRequestTxValidator.createRequestedTxValidator(this.config);
}

public async validateRequestedTx(tx: Tx): Promise<TxValidationResult> {
return await this.txValidator.validateTx(tx);
}

public async validateRequestedTxs(txs: Tx[]): Promise<TxValidationResult[]> {
return await Promise.all(txs.map(tx => this.validateRequestedTx(tx)));
}

static createRequestedTxValidator(config: BatchRequestTxValidatorConfig): TxValidator {
return createTxValidatorForOnDemandReceivedTxs(config.proofVerifier, {
l1ChainId: config.l1ChainId,
rollupVersion: config.rollupVersion,
});
}
export function createBatchRequestTxValidator(config: BatchRequestTxValidatorConfig): TxValidator {
return createTxValidatorForOnDemandReceivedTxs(config.proofVerifier, {
l1ChainId: config.l1ChainId,
rollupVersion: config.rollupVersion,
});
}
9 changes: 3 additions & 6 deletions yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import type { DataStoreConfig } from '@aztec/stdlib/kv-store';
import { type BlockProposal, P2PMessage } from '@aztec/stdlib/p2p';
import { ChonkProof } from '@aztec/stdlib/proofs';
import { makeAztecAddress, makeBlockHeader, makeBlockProposal, mockTx } from '@aztec/stdlib/testing';
import { Tx, TxHash, type TxValidationResult } from '@aztec/stdlib/tx';
import { Tx, TxHash, type TxValidationResult, type TxValidator } from '@aztec/stdlib/tx';
import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client';

import type { Message, PeerId } from '@libp2p/interface';
Expand All @@ -38,7 +38,6 @@ import { LibP2PService } from '../services/index.js';
import type { PeerManager } from '../services/peer-manager/peer_manager.js';
import { BatchTxRequester } from '../services/reqresp/batch-tx-requester/batch_tx_requester.js';
import type { BatchTxRequesterLibP2PService } from '../services/reqresp/batch-tx-requester/interface.js';
import type { IBatchRequestTxValidator } from '../services/reqresp/batch-tx-requester/tx_validator.js';
import { RateLimitStatus } from '../services/reqresp/rate-limiter/rate_limiter.js';
import type { ReqResp } from '../services/reqresp/reqresp.js';
import type { PeerDiscoveryService } from '../services/service.js';
Expand Down Expand Up @@ -276,10 +275,8 @@ async function runAggregatorBenchmark(
}
}

const noopTxValidator: IBatchRequestTxValidator = {
validateRequestedTx: (_tx: Tx): Promise<TxValidationResult> => Promise.resolve({ result: 'valid' }),
validateRequestedTxs: (txs: Tx[]): Promise<TxValidationResult[]> =>
Promise.resolve(txs.map(() => ({ result: 'valid' }))),
const noopTxValidator: TxValidator = {
validateTx: (_tx: Tx): Promise<TxValidationResult> => Promise.resolve({ result: 'valid' }),
};

timer = new Timer();
Expand Down
Loading