Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .claude/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"permissions": {
"allow": [
"Bash(npm test *)",
"Bash(npm install *)",
"Bash(npx prisma *)",
"Bash(npx tsc *)",
"Bash(git add *)"
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AddUniqueConstraint
-- Enforce at most one StreamEvent row per (transactionHash, eventType) pair.
-- This backs the Prisma compound key `transactionHash_eventType` used by the
-- event worker's duplicate checks / upserts, making event ingestion idempotent
-- at the database level (a re-delivered chain event cannot insert twice).
ALTER TABLE "StreamEvent" ADD CONSTRAINT "StreamEvent_transactionHash_eventType_key" UNIQUE ("transactionHash", "eventType");
1 change: 1 addition & 0 deletions backend/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,5 @@ model StreamEvent {
@@index([transactionHash])
@@index([createdAt])
@@index([streamId, createdAt])
@@unique([transactionHash, eventType])
}
4 changes: 4 additions & 0 deletions backend/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { swaggerSpec } from './config/swagger.js';
import { apiVersionMiddleware, type VersionedRequest } from './middleware/api-version.middleware.js';
import { sandboxMiddleware } from './middleware/sandbox.middleware.js';
import { globalRateLimiter } from './middleware/rate-limiter.middleware.js';
import { requestIdMiddleware } from './middleware/requestId.js';
import v1Routes from './routes/v1/index.js';

import healthRoutes from './routes/health.routes.js';
Expand All @@ -25,6 +26,9 @@ if (!process.env.CORS_ALLOWED_ORIGINS && !isProduction) {
// Apply global rate limiter first
app.use(globalRateLimiter);

// Request ID tracing
app.use(requestIdMiddleware);

app.disable('x-powered-by');

// Helmet-equivalent core headers without external dependency.
Expand Down
4 changes: 3 additions & 1 deletion backend/src/controllers/sse.controller.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { Request, Response } from 'express';
import { sseService } from '../services/sse.service.js';
import { prisma } from '../lib/prisma.js';
import { requestContext } from '../logger.js';
import type { AuthenticatedRequest } from '../types/auth.types.js';
import { z } from 'zod';

Expand Down Expand Up @@ -72,7 +73,8 @@ export const subscribe = async (req: Request, res: Response) => {
'X-Accel-Buffering': 'no',
});

res.write(`data: ${JSON.stringify({ type: 'connected', clientId })}\n\n`);
const requestId = requestContext.getStore()?.requestId;
res.write(`data: ${JSON.stringify({ type: 'connected', clientId, requestId })}\n\n`);

sseService.addClient(clientId, res, subscriptions, sourceIp);
} catch (error: any) {
Expand Down
8 changes: 8 additions & 0 deletions backend/src/logger.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
import { AsyncLocalStorage } from 'async_hooks';
import { createLogger, format, transports } from 'winston';

export const requestContext = new AsyncLocalStorage<{ requestId: string }>();

const logger = createLogger({
level: process.env.LOG_LEVEL || 'info',
format: format.combine(
format.timestamp(),
format((info) => {
const ctx = requestContext.getStore();
if (ctx?.requestId) info.requestId = ctx.requestId;
return info;
})(),
format.json(),
),
transports: [new transports.Console()],
Expand Down
32 changes: 32 additions & 0 deletions backend/src/middleware/requestId.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { randomUUID } from 'crypto';
import type { Request, Response, NextFunction } from 'express';
import logger, { requestContext } from '../logger.js';

const MAX_REQUEST_ID_LENGTH = 128;

// Restrict inbound IDs to URL-safe characters. Node's `res.setHeader` throws
// (ERR_INVALID_CHAR) on header values containing control characters such as
// CR/LF, so echoing an unvalidated client-supplied `X-Request-ID` back would
// let a malicious header crash the request with an unhandled TypeError — and
// would also allow log-line injection via embedded newlines.
const REQUEST_ID_PATTERN = /^[\w.-]+$/;

/**
 * Assigns a request ID to every request for log correlation.
 *
 * Honors a well-formed inbound `X-Request-ID` header (non-empty, at most
 * {@link MAX_REQUEST_ID_LENGTH} chars, safe charset) so IDs can propagate
 * across services; otherwise generates a fresh UUID. The ID is echoed on the
 * response, stored in `requestContext` (AsyncLocalStorage) so the winston
 * format can stamp it onto every log line emitted while handling the request,
 * and included in the request-received / response-sent timing logs.
 */
export function requestIdMiddleware(req: Request, res: Response, next: NextFunction): void {
  const header = req.headers['x-request-id'];
  const requestId =
    typeof header === 'string' &&
    header.length > 0 &&
    header.length <= MAX_REQUEST_ID_LENGTH &&
    REQUEST_ID_PATTERN.test(header)
      ? header
      : randomUUID();

  res.setHeader('X-Request-ID', requestId);

  const startMs = Date.now();

  // NOTE: the 'finish' listener fires in the emitter's context, which may be
  // outside the AsyncLocalStorage scope below — so requestId is passed
  // explicitly in the log metadata rather than relying on the context.
  res.on('finish', () => {
    logger.info('response sent', {
      method: req.method,
      path: req.path,
      status: res.statusCode,
      durationMs: Date.now() - startMs,
      requestId,
    });
  });

  // Run the rest of the request pipeline inside the ALS context so any log
  // call made downstream picks up this requestId automatically.
  requestContext.run({ requestId }, () => {
    logger.info('request received', { method: req.method, path: req.path, requestId });
    next();
  });
}
180 changes: 120 additions & 60 deletions backend/src/workers/soroban-event-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -355,17 +355,27 @@
},
});

await tx.streamEvent.create({
data: {
streamId,
eventType: 'CREATED',
amount: depositedAmount,
transactionHash: event.txHash,
ledgerSequence: event.ledger,
timestamp: startTime,
metadata: JSON.stringify({ tokenAddress, ratePerSecond }),
},
const existingEvent = await tx.streamEvent.findUnique({

Check failure on line 358 in backend/src/workers/soroban-event-worker.ts

View workflow job for this annotation

GitHub Actions / Backend npm test

src/__tests__/integration/streams.test.ts > Stream Lifecycle Integration Tests > Indexer processes stream_created event -> stream appears in GET /v1/streams/{id}

TypeError: tx.streamEvent.findUnique is not a function ❯ src/workers/soroban-event-worker.ts:358:50
where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'CREATED' } },
select: { id: true },
});
if (existingEvent) {
logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=CREATED`);
} else {
await tx.streamEvent.upsert({
where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'CREATED' } },
create: {
streamId,
eventType: 'CREATED',
amount: depositedAmount,
transactionHash: event.txHash,
ledgerSequence: event.ledger,
timestamp: startTime,
metadata: JSON.stringify({ tokenAddress, ratePerSecond }),
},
update: {},
});
}
});

sseService.broadcastToStream(String(streamId), 'stream.created', {
Expand Down Expand Up @@ -414,17 +424,27 @@
},
});

await tx.streamEvent.create({
data: {
streamId,
eventType: 'TOPPED_UP',
amount,
transactionHash: event.txHash,
ledgerSequence: event.ledger,
timestamp,
metadata: JSON.stringify({ newDepositedAmount }),
},
const existingEvent = await tx.streamEvent.findUnique({
where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'TOPPED_UP' } },
select: { id: true },
});
if (existingEvent) {
logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=TOPPED_UP`);
} else {
await tx.streamEvent.upsert({
where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'TOPPED_UP' } },
create: {
streamId,
eventType: 'TOPPED_UP',
amount,
transactionHash: event.txHash,
ledgerSequence: event.ledger,
timestamp,
metadata: JSON.stringify({ newDepositedAmount }),
},
update: {},
});
}
});

sseService.broadcastToStream(String(streamId), 'stream.topped_up', {
Expand Down Expand Up @@ -470,17 +490,27 @@
},
});

await tx.streamEvent.create({
data: {
streamId,
eventType: 'WITHDRAWN',
amount,
transactionHash: event.txHash,
ledgerSequence: event.ledger,
timestamp,
metadata: JSON.stringify({ recipient }),
},
const existingEvent = await tx.streamEvent.findUnique({
where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'WITHDRAWN' } },
select: { id: true },
});
if (existingEvent) {
logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=WITHDRAWN`);
} else {
await tx.streamEvent.upsert({
where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'WITHDRAWN' } },
create: {
streamId,
eventType: 'WITHDRAWN',
amount,
transactionHash: event.txHash,
ledgerSequence: event.ledger,
timestamp,
metadata: JSON.stringify({ recipient }),
},
update: {},
});
}
});

sseService.broadcastToStream(String(streamId), 'stream.withdrawn', {
Expand Down Expand Up @@ -518,17 +548,27 @@
},
});

await tx.streamEvent.create({
data: {
streamId,
eventType: 'CANCELLED',
amount: refundedAmount,
transactionHash: event.txHash,
ledgerSequence: event.ledger,
timestamp,
metadata: JSON.stringify({ amountWithdrawn, refundedAmount }),
},
const existingEvent = await tx.streamEvent.findUnique({

Check failure on line 551 in backend/src/workers/soroban-event-worker.ts

View workflow job for this annotation

GitHub Actions / Backend npm test

src/__tests__/integration/streams.test.ts > Stream Lifecycle Integration Tests > Indexer processes stream_cancelled -> stream isActive = false

TypeError: tx.streamEvent.findUnique is not a function ❯ src/workers/soroban-event-worker.ts:551:50
where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'CANCELLED' } },
select: { id: true },
});
if (existingEvent) {
logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=CANCELLED`);
} else {
await tx.streamEvent.upsert({
where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'CANCELLED' } },
create: {
streamId,
eventType: 'CANCELLED',
amount: refundedAmount,
transactionHash: event.txHash,
ledgerSequence: event.ledger,
timestamp,
metadata: JSON.stringify({ amountWithdrawn, refundedAmount }),
},
update: {},
});
}
});

sseService.broadcastToStream(String(streamId), 'stream.cancelled', {
Expand Down Expand Up @@ -566,17 +606,27 @@
},
});

await tx.streamEvent.create({
data: {
streamId,
eventType: 'COMPLETED',
amount: totalWithdrawn,
transactionHash: event.txHash,
ledgerSequence: event.ledger,
timestamp,
metadata: JSON.stringify({ recipient }),
},
const existingEvent = await tx.streamEvent.findUnique({
where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'COMPLETED' } },
select: { id: true },
});
if (existingEvent) {
logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=COMPLETED`);
} else {
await tx.streamEvent.upsert({
where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'COMPLETED' } },
create: {
streamId,
eventType: 'COMPLETED',
amount: totalWithdrawn,
transactionHash: event.txHash,
ledgerSequence: event.ledger,
timestamp,
metadata: JSON.stringify({ recipient }),
},
update: {},
});
}
});

sseService.broadcastToStream(String(streamId), 'stream.completed', {
Expand Down Expand Up @@ -605,20 +655,30 @@
const token = decodeAddress(body['token']);
const timestamp = Math.floor(Date.now() / 1000);

await prisma.streamEvent.create({
data: {
streamId,
eventType: 'FEE_COLLECTED',
amount: feeAmount,
transactionHash: event.txHash,
ledgerSequence: event.ledger,
timestamp,
metadata: JSON.stringify({ treasury, token }),
},
const existingEvent = await prisma.streamEvent.findUnique({
where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'FEE_COLLECTED' } },
select: { id: true },
});
if (existingEvent) {
logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=FEE_COLLECTED`);
} else {
await prisma.streamEvent.upsert({
where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'FEE_COLLECTED' } },
create: {
streamId,
eventType: 'FEE_COLLECTED',
amount: feeAmount,
transactionHash: event.txHash,
ledgerSequence: event.ledger,
timestamp,
metadata: JSON.stringify({ treasury, token }),
},
update: {},
});
}

// Broadcast to admin channel for treasury reporting
sseService.broadcastToAdmin('stream.fee_collected', {

Check failure on line 681 in backend/src/workers/soroban-event-worker.ts

View workflow job for this annotation

GitHub Actions / Backend npm test

tests/soroban-event-worker.test.ts > SorobanEventWorker > Event processing idempotency > should handle duplicate fee collection events

TypeError: sseService.broadcastToAdmin is not a function ❯ SorobanEventWorker.handleFeeCollected src/workers/soroban-event-worker.ts:681:16 ❯ tests/soroban-event-worker.test.ts:197:7
streamId,
treasury,
feeAmount,
Expand Down
Loading
Loading