Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,8 @@ RATE_LIMIT_WINDOW_MS=900000
RATE_LIMIT_MAX=100
AUTH_RATE_LIMIT_WINDOW_MS=900000
AUTH_RATE_LIMIT_MAX=20

# Admin API (required in production)
# Generate with: openssl rand -hex 32
# Used to authenticate requests to /api/admin/* endpoints
ADMIN_API_TOKEN=your_admin_api_token_here
5 changes: 4 additions & 1 deletion src/controllers/__tests__/auth.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,10 @@ describe('AuthMiddleware.validateJwt', () => {
token: 'valid.token',
expiresAt: new Date(Date.now() + 60_000),
walletAddress: 'GBTEST',
user: { id: 'user-1', walletAddress: 'GBTEST' },
userId: 'user-1',
id: 'session-1',
network: 'TESTNET',
user: { id: 'user-1', walletAddress: 'GBTEST', isActive: true },
};

beforeEach(() => {
Expand Down
89 changes: 84 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import { type Server } from 'node:http'
import express from 'express'
import cors, { type CorsOptions } from 'cors'
import helmet from 'helmet'
import { config } from './config/env'
import { errorHandler } from './middleware/errorHandler'
import { requestLogger } from './middleware/logger'
import { rateLimiter, authRateLimiter } from './middleware/rateLimiter'

import { logger } from './utils/logger'
import { startAgentLoop } from './agent/loop'
import { startAgentLoop, stopAgentLoop } from './agent/loop'
import { connectDb } from './db'
import { scheduleSessionCleanup } from './jobs/sessionCleanup'
import { startEventListener } from './stellar/events'
import { DeadLetterQueue } from './stellar/dlq'
import { startEventListener, stopEventListener } from './stellar/events'
import healthRouter from './routes/health'
import agentRouter from './routes/agent'
import authRouter from './routes/auth'
Expand Down Expand Up @@ -42,6 +41,10 @@ const serviceStatus: Record<string, ServiceStatus> = {
agentLoop: { ready: false },
}

let isShuttingDown = false
let httpServer: Server | null = null
const REQUEST_DRAIN_TIMEOUT_MS = 30000

function allServicesReady(): boolean {
return Object.values(serviceStatus).every(s => s.ready)
}
Expand Down Expand Up @@ -77,6 +80,14 @@ app.get('/health/live', (_req, res) => {
})

app.get('/health/ready', (_req, res) => {
if (isShuttingDown) {
return res.status(503).json({
status: 'shutting_down',
services: serviceStatus,
timestamp: new Date().toISOString(),
})
}

if (allServicesReady()) {
res.status(200).json({
status: 'ready',
Expand Down Expand Up @@ -112,13 +123,77 @@ app.use(payloadSizeErrorHandler)

// Generic error handler — must always be last
app.use(errorHandler)
// ── Graceful shutdown ────────────────────────────────────────────────────────
//
// On SIGTERM/SIGINT, we:
// 1. Mark as shutting down so readiness probe returns 503
// 2. Close the HTTP server (stops accepting new connections)
// 3. Wait up to REQUEST_DRAIN_TIMEOUT_MS for in-flight requests to complete
// 4. Stop event listener
// 5. Stop agent cron jobs
// 6. Disconnect Prisma
// 7. Exit cleanly

async function gracefulShutdown(signal: string): Promise<void> {
logger.info(`[Shutdown] Received ${signal}, initiating graceful shutdown...`)
isShuttingDown = true

if (!httpServer) {
logger.warn('[Shutdown] No HTTP server to close')
process.exit(0)
}

logger.info('[Shutdown] Closing HTTP server (no new requests accepted)')
httpServer.close(async () => {
logger.info('[Shutdown] HTTP server closed')

try {
logger.info('[Shutdown] Stopping event listener...')
stopEventListener()

logger.info('[Shutdown] Stopping agent loop...')
await stopAgentLoop()

logger.info('[Shutdown] Disconnecting Prisma...')
// Importing here to avoid circular dependency
const db = await import('./db').then(m => m.default)
await db.$disconnect()

logger.info('[Shutdown] ✓ All services stopped gracefully')
process.exit(0)
} catch (error) {
logger.error('[Shutdown] Error during graceful shutdown:', {
error: error instanceof Error ? error.message : String(error),
})
process.exit(1)
}
})

// Force shutdown after timeout
setTimeout(() => {
logger.error('[Shutdown] Timeout reached, forcing shutdown...')
process.exit(1)
}, REQUEST_DRAIN_TIMEOUT_MS)
}

// ── Startup sequence ──────────────────────────────────────────────────────────
//
// The HTTP server does NOT start accepting connections until every critical
// service initialises successfully. If any service fails, we log clearly
// and exit with a nonzero code so process supervisors / K8s restart us.

async function initServices(): Promise<void> {
// 0. Validate production configuration
if (config.nodeEnv === 'production') {
const adminToken = process.env.ADMIN_API_TOKEN
if (!adminToken || adminToken.length < 8) {
const msg = 'ADMIN_API_TOKEN must be set to a strong value in production'
logger.error('[Startup] Configuration validation failed — cannot continue', { error: msg })
throw new Error(msg)
}
logger.info('[Startup] Admin API token configured ✓')
}

// 1. Database
try {
await connectDb()
Expand Down Expand Up @@ -176,11 +251,15 @@ async function main(): Promise<void> {
}

// All services healthy — now accept traffic
app.listen(config.port, () => {
httpServer = app.listen(config.port, () => {
logger.info(`[Startup] HTTP server listening on port ${config.port} ✓`)
logger.info('[Startup] All systems operational — ready to serve requests')
})

// Register graceful shutdown handlers
process.on('SIGTERM', () => gracefulShutdown('SIGTERM'))
process.on('SIGINT', () => gracefulShutdown('SIGINT'))

// Non-critical jobs start after the server is up
scheduleSessionCleanup()
}
Expand Down
16 changes: 14 additions & 2 deletions src/middleware/authenticate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export class AuthMiddleware {
const authorization = req.header('Authorization');

if (!authorization) {
res.status(401).json({ error: 'No token provided' });
res.status(401).json({ error: 'Unauthorized' });
return;
}

Expand Down Expand Up @@ -59,9 +59,21 @@ export class AuthMiddleware {
return;
}

// 4. Attach authenticated identity to request
// 4. Reject inactive users
if (!session.user.isActive) {
res.status(401).json({ error: 'User account is inactive' });
return;
}

// 5. Attach authenticated identity to request
req.userId = session.user.id;
req.stellarPubKey = session.walletAddress;
req.auth = {
userId: session.userId,
sessionId: session.id,
walletAddress: session.walletAddress,
network: session.network,
};

next();
} catch (error) {
Expand Down
12 changes: 12 additions & 0 deletions src/middleware/rateLimiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,16 @@ export const authRateLimiter = rateLimit({
message: {
error: 'Too many authentication attempts. Please try again in 15 minutes.'
}
})

// Stricter rate limit for admin endpoints (10 requests per 15 minutes)
export const adminRateLimiter = rateLimit({
windowMs: 15 * 60 * 1000,
max: 10,
standardHeaders: true,
legacyHeaders: false,
message: {
error: 'Too many admin requests. Please try again later.'
},
skip: () => process.env.NODE_ENV === 'test',
})
Loading
Loading