Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
346 changes: 321 additions & 25 deletions package-lock.json

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@
"benchmark:compare": "ts-node scripts/compare-benchmarks.ts"
},
"dependencies": {
"@bull-board/api": "^7.1.5",
"@bull-board/express": "^7.1.5",
"@bull-board/nestjs": "^7.1.5",
"@libsql/client": "^0.17.0",
"@nestjs/bullmq": "^11.0.4",
"@nestjs/common": "^11.1.12",
"@nestjs/config": "^4.0.2",
"@nestjs/core": "^11.1.12",
Expand All @@ -42,6 +46,7 @@
"@prisma/adapter-libsql": "^7.3.0",
"@prisma/client": "^7.3.0",
"@types/pg": "^8.16.0",
"bullmq": "^5.77.6",
"class-transformer": "^0.5.1",
"class-validator": "^0.14.3",
"dotenv": "^17.2.3",
Expand Down
19 changes: 19 additions & 0 deletions src/app.module.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { Module, Logger } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { BullModule } from '@nestjs/bullmq';
import { BullBoardModule } from '@bull-board/nestjs';
import { ExpressAdapter } from '@bull-board/express';
import { ThrottlerModule } from '@nestjs/throttler';
import { APP_GUARD, APP_INTERCEPTOR } from '@nestjs/core';
import { AppController } from './app.controller';
Expand Down Expand Up @@ -47,12 +50,12 @@

async increment(
key: string,
ttl: number,

Check failure on line 53 in src/app.module.ts

View workflow job for this annotation

GitHub Actions / Lint

Async method 'increment' has no 'await' expression
limit: number,
blockDuration: number,
throttlerName: string,
): Promise<{ totalHits: number; timeToExpire: number; isBlocked: boolean; timeToBlockExpire: number }> {
const now = Date.now();

Check failure on line 58 in src/app.module.ts

View workflow job for this annotation

GitHub Actions / Lint

'throttlerName' is defined but never used
const record = this.storage.get(key);

if (!record) {
Expand Down Expand Up @@ -126,7 +129,7 @@
ttl: number,
limit: number,
blockDuration: number,
throttlerName: string,

Check failure on line 132 in src/app.module.ts

View workflow job for this annotation

GitHub Actions / Lint

Unsafe assignment of an `any` value
): Promise<{ totalHits: number; timeToExpire: number; isBlocked: boolean; timeToBlockExpire: number }> {
const blockKey = `${key}:blocked`;
const [blocked, blockTimeToExpire] = await Promise.all([
Expand All @@ -135,7 +138,7 @@
]);

if (blocked) {
const timeToExpire = await this.redis.pttl(key);

Check failure on line 141 in src/app.module.ts

View workflow job for this annotation

GitHub Actions / Lint

'throttlerName' is defined but never used
return {
totalHits: await this.redis.get(key).then((value: string | null) => Number(value) || limit + 1),
timeToExpire: timeToExpire > 0 ? timeToExpire : ttl,
Expand All @@ -143,9 +146,9 @@
timeToBlockExpire: blockTimeToExpire > 0 ? blockTimeToExpire : 0,
};
}

Check failure on line 149 in src/app.module.ts

View workflow job for this annotation

GitHub Actions / Lint

Unsafe array destructuring of a tuple element with an `any` value

Check failure on line 149 in src/app.module.ts

View workflow job for this annotation

GitHub Actions / Lint

Unsafe array destructuring of a tuple element with an `any` value
const [totalHits, existingTtl] = await Promise.all([

Check failure on line 150 in src/app.module.ts

View workflow job for this annotation

GitHub Actions / Lint

Unsafe member access .exists on an `any` value

Check failure on line 150 in src/app.module.ts

View workflow job for this annotation

GitHub Actions / Lint

Unsafe call of an `any` typed value
this.redis.incr(key),

Check failure on line 151 in src/app.module.ts

View workflow job for this annotation

GitHub Actions / Lint

Unsafe call of an `any` typed value
this.redis.pttl(key),
]);

Expand Down Expand Up @@ -245,6 +248,22 @@
};
},
}),
BullModule.forRootAsync({
imports: [ConfigModule],
inject: [ConfigService],
useFactory: (configService: ConfigService) => ({
connection: {
host: configService.get<string>('REDIS_HOST', 'localhost'),
port: configService.get<number>('REDIS_PORT', 6379),
password: configService.get<string>('REDIS_PASSWORD'),
db: configService.get<number>('REDIS_DB', 0),
},
}),
}),
BullBoardModule.forRoot({
route: '/admin/queues',
adapter: ExpressAdapter,
}),
RedisModule,
LoggerModule,
AuthModule,
Expand Down
9 changes: 5 additions & 4 deletions src/audit/services/audit-trail.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { AuditActionType, AuditEntityType } from '../entities/audit-log.entity';

describe('AuditTrailService - IP Security', () => {
let service: AuditTrailService;
let repository: jest.Mocked<Repository<AuditLog>>;
let repository: any;
let mockRequest: any;

beforeEach(async () => {
Expand Down Expand Up @@ -153,6 +153,7 @@ describe('AuditTrailService - IP Security', () => {

const serviceWithoutRequest =
moduleWithoutRequest.get<AuditTrailService>(AuditTrailService);
const innerRepo = moduleWithoutRequest.get(getRepositoryToken(AuditLog)) as any;

const auditInput = {
actionType: AuditActionType.CLAIM_CREATED,
Expand All @@ -161,15 +162,15 @@ describe('AuditTrailService - IP Security', () => {
description: 'Test without request',
};

repository.create.mockReturnValue({
innerRepo.create.mockReturnValue({
...auditInput,
ipAddress: undefined,
});
repository.save.mockResolvedValue({ id: 'audit-4' });
innerRepo.save.mockResolvedValue({ id: 'audit-4' });

await serviceWithoutRequest.log(auditInput);

expect(repository.create).toHaveBeenCalledWith(
expect(innerRepo.create).toHaveBeenCalledWith(
expect.objectContaining({
ipAddress: undefined,
}),
Expand Down
2 changes: 1 addition & 1 deletion src/claims/claim-resolution.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { ClaimResolutionService } from "./claim-resolution.service";

describe('Confidence Scoring', () => {
const service = new ClaimResolutionService(null as any);
const service = new ClaimResolutionService(null as any, null as any);

it('returns high confidence for strong consensus', () => {
const score = service.computeConfidenceScore({
Expand Down
19 changes: 19 additions & 0 deletions src/claims/claims.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Test, TestingModule } from '@nestjs/testing';
import { BadRequestException } from '@nestjs/common';
import { getRepositoryToken } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { ClaimsService } from './claims.service';
Expand Down Expand Up @@ -194,6 +195,24 @@ describe('ClaimsService', () => {

expect(redisService.del).toHaveBeenCalledWith('claims:latest');
});

it('should throw BadRequestException if claim content length exceeds 5000 characters', async () => {
const longContent = 'a'.repeat(5001);
const createClaimDto = ClaimFactory.createCreateClaimDto({ content: longContent });

await expect(service.createClaim(createClaimDto)).rejects.toThrow(
new BadRequestException('Claim content exceeds maximum length of 5000 characters')
);
});

it('should throw BadRequestException if claim title length exceeds 200 characters', async () => {
const longTitle = 'a'.repeat(201);
const createClaimDto = ClaimFactory.createCreateClaimDto({ title: longTitle });

await expect(service.createClaim(createClaimDto)).rejects.toThrow(
new BadRequestException('Claim title exceeds maximum length of 200 characters')
);
});
});

describe('findOne', () => {
Expand Down
23 changes: 9 additions & 14 deletions src/claims/claims.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Injectable, Logger } from '@nestjs/common';
import { Injectable, Logger, BadRequestException } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { Claim } from './entities/claim.entity';
Expand Down Expand Up @@ -89,11 +89,17 @@ export class ClaimsService {
captureAfterState: true,
})
async createClaim(createClaimDto: CreateClaimDto): Promise<Claim> {
if (createClaimDto.title && createClaimDto.title.length > 200) {
throw new BadRequestException('Claim title exceeds maximum length of 200 characters');
}
if (createClaimDto.content && createClaimDto.content.length > 5000) {
throw new BadRequestException('Claim content exceeds maximum length of 5000 characters');
}
const claim = this.claimRepo.create({
title: createClaimDto.title,
content: createClaimDto.content,
source: createClaimDto.source,
metadata: createClaimDto.metadata,
source: createClaimDto.source ?? null,
metadata: createClaimDto.metadata ?? null,
resolvedVerdict: null, // Will be computed later
confidenceScore: null, // Will be computed later
finalized: false,
Expand Down Expand Up @@ -172,16 +178,5 @@ export class ClaimsService {

return updatedClaim;
}
async findOne(id: string): Promise<Claim> {
const claim = await this.repo.findOne({
where: { id },
});

if (!claim) {
throw new NotFoundException(`Claim with id ${id} not found`);
}

return claim;
}
}

2 changes: 1 addition & 1 deletion src/claims/entities/claim.entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export class Claim {
@Column({ type: 'varchar', length: 200 })
title: string;

@Column({ type: 'text' })
@Column({ type: 'varchar', length: 5000 })
content: string;

@Column({ type: 'varchar', length: 500, nullable: true })
Expand Down
21 changes: 18 additions & 3 deletions src/jobs/jobs.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,25 @@ import { Wallet } from '../entities/wallet.entity';
import { Claim } from '../claims/entities/claim.entity';
import { User } from '../entities/user.entity';
import { AggregationModule } from '../aggregation/aggregation.module';
import { BullModule } from '@nestjs/bullmq';
import { BullBoardModule } from '@bull-board/nestjs';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { JobsProcessor } from './jobs.processor';

@Module({
imports: [RedisModule, TypeOrmModule.forFeature([Stake, Wallet, Claim, User]), AggregationModule],
providers: [JobsService],
exports: [JobsService],
imports: [
RedisModule,
TypeOrmModule.forFeature([Stake, Wallet, Claim, User]),
AggregationModule,
BullModule.registerQueue({
name: 'jobs-queue',
}),
BullBoardModule.forFeature({
name: 'jobs-queue',
adapter: BullMQAdapter,
}),
],
providers: [JobsService, JobsProcessor],
exports: [JobsService, BullModule],
})
export class JobsModule {}
28 changes: 28 additions & 0 deletions src/jobs/jobs.processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
import { Injectable, Logger } from '@nestjs/common';
import { JobsService } from './jobs.service';

@Processor('jobs-queue')
@Injectable()
export class JobsProcessor extends WorkerHost {
private readonly logger = new Logger(JobsProcessor.name);

constructor(private readonly jobsService: JobsService) {
super();
}

async process(job: Job<any, any, string>): Promise<any> {
this.logger.log(`Processing job ${job.id} of name ${job.name}`);
switch (job.name) {
case 'compute-scores':
await this.jobsService.computeScores();
return { success: true };
case 'compute-reputation':
await this.jobsService.computeReputation();
return { success: true };
default:
throw new Error(`Unknown job name: ${job.name}`);
}
}
}
150 changes: 150 additions & 0 deletions src/jobs/jobs.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import { Test, TestingModule } from '@nestjs/testing';
import { getRepositoryToken } from '@nestjs/typeorm';
import { JobsService } from './jobs.service';
import { JobsProcessor } from './jobs.processor';
import { Stake } from '../staking/entities/stake.entity';
import { Wallet } from '../entities/wallet.entity';
import { Claim } from '../claims/entities/claim.entity';
import { User } from '../entities/user.entity';
import { ClaimsCache } from '../cache/claims.cache';
import { RedisService } from '../redis/redis.service';
import { AggregationService } from '../aggregation/aggregation.service';
import { getQueueToken } from '@nestjs/bullmq';
import { Repository } from 'typeorm';
import { Job } from 'bullmq';

describe('Jobs (BullMQ & Scheduling)', () => {
let service: JobsService;
let processor: JobsProcessor;
let queueMock: any;
let stakeRepo: Repository<Stake>;
let walletRepo: Repository<Wallet>;
let claimRepo: Repository<Claim>;
let userRepo: Repository<User>;

beforeEach(async () => {
queueMock = {
getRepeatableJobs: jest.fn().mockResolvedValue([
{ key: 'old-scores-key' },
{ key: 'old-reputation-key' },
]),
removeRepeatableByKey: jest.fn().mockResolvedValue(true),
add: jest.fn().mockResolvedValue({ id: 'new-job' }),
};

const module: TestingModule = await Test.createTestingModule({
providers: [
JobsService,
JobsProcessor,
{
provide: getQueueToken('jobs-queue'),
useValue: queueMock,
},
{
provide: getRepositoryToken(Stake),
useClass: Repository,
},
{
provide: getRepositoryToken(Wallet),
useClass: Repository,
},
{
provide: getRepositoryToken(Claim),
useClass: Repository,
},
{
provide: getRepositoryToken(User),
useClass: Repository,
},
{
provide: ClaimsCache,
useValue: {
invalidateClaim: jest.fn(),
},
},
{
provide: RedisService,
useValue: {},
},
{
provide: AggregationService,
useValue: {
aggregate: jest.fn().mockReturnValue({
confidence: 60,
status: 'VERIFIED_TRUE',
}),
},
},
],
}).compile();

service = module.get<JobsService>(JobsService);
processor = module.get<JobsProcessor>(JobsProcessor);
stakeRepo = module.get<Repository<Stake>>(getRepositoryToken(Stake));
walletRepo = module.get<Repository<Wallet>>(getRepositoryToken(Wallet));
claimRepo = module.get<Repository<Claim>>(getRepositoryToken(Claim));
userRepo = module.get<Repository<User>>(getRepositoryToken(User));
});

it('should be defined', () => {
expect(service).toBeDefined();
expect(processor).toBeDefined();
});

describe('onModuleInit', () => {
it('should clear old repeatable jobs and schedule new ones', async () => {
await service.onModuleInit();

expect(queueMock.getRepeatableJobs).toHaveBeenCalled();
expect(queueMock.removeRepeatableByKey).toHaveBeenCalledTimes(2);
expect(queueMock.removeRepeatableByKey).toHaveBeenNthCalledWith(1, 'old-scores-key');
expect(queueMock.removeRepeatableByKey).toHaveBeenNthCalledWith(2, 'old-reputation-key');

expect(queueMock.add).toHaveBeenCalledTimes(2);
expect(queueMock.add).toHaveBeenNthCalledWith(1, 'compute-scores', {}, expect.any(Object));
expect(queueMock.add).toHaveBeenNthCalledWith(2, 'compute-reputation', {}, expect.any(Object));
});
});

describe('JobsProcessor', () => {
it('should invoke computeScores when processing compute-scores job', async () => {
const computeScoresSpy = jest.spyOn(service, 'computeScores').mockResolvedValue(undefined);

const mockJob = {
id: '1',
name: 'compute-scores',
data: {},
} as Job;

const result = await processor.process(mockJob);

expect(computeScoresSpy).toHaveBeenCalled();
expect(result).toEqual({ success: true });
});

it('should invoke computeReputation when processing compute-reputation job', async () => {
const computeReputationSpy = jest.spyOn(service, 'computeReputation').mockResolvedValue(undefined);

const mockJob = {
id: '2',
name: 'compute-reputation',
data: {},
} as Job;

const result = await processor.process(mockJob);

expect(computeReputationSpy).toHaveBeenCalled();
expect(result).toEqual({ success: true });
});

it('should throw error for unknown job name', async () => {
const mockJob = {
id: '3',
name: 'unknown-job',
data: {},
} as Job;

await expect(processor.process(mockJob)).rejects.toThrow('Unknown job name: unknown-job');
});
});
});
Loading
Loading