Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 107 additions & 4 deletions apps/mesh/src/api/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ import {
createMeshContextFactory,
} from "../core/context-factory";
import type { MeshContext } from "../core/mesh-context";
import { getDb, type MeshDatabase } from "../database";
import { closeDatabase, getDb, type MeshDatabase } from "../database";
import { createEventBus, type EventBus } from "../event-bus";
import {
flushMonitoringData,
meter,
prometheusExporter,
tracer,
Expand Down Expand Up @@ -200,6 +201,7 @@ export interface CreateAppOptions {
*/
export async function createApp(options: CreateAppOptions = {}) {
const database = options.database ?? getDb();
let isShuttingDown = false;

// Clear previous monitoring retention timer (cleanup during HMR)
if (currentRetentionTimer) {
Expand Down Expand Up @@ -341,7 +343,6 @@ export async function createApp(options: CreateAppOptions = {}) {
mcpListCache.teardown();
modelListCache.teardown();
setMcpListCache(null);
natsProvider?.drain().catch(() => {});
};

const app = new Hono<Env>();
Expand Down Expand Up @@ -410,7 +411,7 @@ export async function createApp(options: CreateAppOptions = {}) {
// Health Check & Metrics
// ============================================================================

// Health check endpoint (no auth required)
// Health check endpoint (no auth required) — kept for backwards compatibility
app.get(SYSTEM_PATHS.HEALTH, (c) => {
return c.json({
status: "ok",
Expand All @@ -419,6 +420,55 @@ export async function createApp(options: CreateAppOptions = {}) {
});
});

// Liveness probe — the process is alive and the event loop is not stuck
app.get(SYSTEM_PATHS.HEALTH_LIVE, (c) => {
return c.json({ status: "ok" });
});

// Readiness probe — returns 503 during shutdown so K8s drains traffic before liveness fails,
// and checks that DB and NATS are reachable
app.get(SYSTEM_PATHS.HEALTH_READY, async (c) => {
  // During shutdown, fail readiness immediately so the load balancer stops
  // routing traffic to this pod before the process actually exits.
  if (isShuttingDown) {
    return c.json({ status: "shutting_down" }, 503);
  }

  // DB check: cheap single-row SELECT, capped at 2s so a hung connection
  // pool cannot stall the probe. Track the timer and clear it once the race
  // settles — otherwise every successful probe leaves a live 2s timer handle.
  let dbTimeoutTimer: ReturnType<typeof setTimeout> | undefined;
  const dbCheck = Promise.race([
    database.db
      .selectFrom("connections")
      .select("id")
      .limit(1)
      .executeTakeFirst(),
    new Promise<never>((_, reject) => {
      dbTimeoutTimer = setTimeout(() => reject(new Error("timeout")), 2_000);
    }),
  ]).finally(() => clearTimeout(dbTimeoutTimer));

  // NATS check: synchronous inspection of the connection state, wrapped in a
  // promise so both checks can be awaited together.
  const natsCheck = Promise.resolve().then(() => {
    if (!natsProvider) return "ok"; // local mode: no NATS
    const nc = natsProvider.getConnection();
    if (nc.isClosed() || nc.isDraining()) {
      throw new Error("NATS connection unavailable");
    }
    return "ok";
  });

  // allSettled (not all): we always want both verdicts to report per-check
  // status in the response body, even when one of them fails.
  const [dbResult, natsResult] = await Promise.allSettled([dbCheck, natsCheck]);

  const dbOk = dbResult.status === "fulfilled";
  const natsOk = natsResult.status === "fulfilled";
  const ready = dbOk && natsOk;

  return c.json(
    {
      status: ready ? "ready" : "not_ready",
      checks: {
        db: dbOk ? "ok" : "error",
        nats: natsOk ? "ok" : "error",
      },
    },
    ready ? 200 : 503,
  );
});

// Prometheus metrics endpoint
app.get(SYSTEM_PATHS.METRICS, async (c) => {
try {
Expand Down Expand Up @@ -1278,5 +1328,58 @@ export async function createApp(options: CreateAppOptions = {}) {
);
});

return app;
const markShuttingDown = () => {
isShuttingDown = true;
};

const shutdown = async () => {
console.log("[shutdown] Stopping workers...");

// Phase 1: Stop all workers/consumers in parallel (independent of each other)
await Promise.allSettled([
currentEventBus?.isRunning() ? currentEventBus.stop() : Promise.resolve(),
sseHub.stop(),
currentCronWorkerCleanup
? Promise.resolve(currentCronWorkerCleanup()).finally(() => {
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
currentCronWorkerCleanup = null;
})
: Promise.resolve(),
currentDecopilotCleanup
? Promise.resolve(currentDecopilotCleanup()).finally(() => {
currentDecopilotCleanup = null;
})
: Promise.resolve(),
]);

// Phase 2: Clear timers
if (currentRetentionTimer) {
clearInterval(currentRetentionTimer);
currentRetentionTimer = null;
}

// Phase 3: Drain NATS (after all consumers stopped)
if (natsProvider) {
await natsProvider
.drain()
.catch((err: unknown) =>
console.error("[shutdown] NATS drain error:", err),
);
}

// Phase 4: Flush telemetry
console.log("[shutdown] Flushing telemetry...");
await flushMonitoringData().catch((err: unknown) =>
console.error("[shutdown] Telemetry flush error:", err),
);

// Phase 5: Close database (last — other steps may need DB)
console.log("[shutdown] Closing database...");
await closeDatabase(database).catch((err: unknown) =>
console.error("[shutdown] Database close error:", err),
);

console.log("[shutdown] Cleanup complete.");
};

return Object.assign(app, { markShuttingDown, shutdown });
}
4 changes: 4 additions & 0 deletions apps/mesh/src/api/utils/paths.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
/** System paths that don't require authentication or special handling */
export const SYSTEM_PATHS = {
  // Legacy combined health endpoint — kept for backwards compatibility
  HEALTH: "/health",
  // Liveness probe: process is alive and the event loop is responsive
  HEALTH_LIVE: "/health/live",
  // Readiness probe: returns 503 during shutdown or when DB/NATS are unreachable
  HEALTH_READY: "/health/ready",
  // Prometheus metrics scrape endpoint
  METRICS: "/metrics",
} as const;

Expand All @@ -29,6 +31,8 @@ const STATIC_FILE_PATTERN =
function isSystemPath(path: string): boolean {
return (
path === SYSTEM_PATHS.HEALTH ||
path === SYSTEM_PATHS.HEALTH_LIVE ||
path === SYSTEM_PATHS.HEALTH_READY ||
path === SYSTEM_PATHS.METRICS ||
path.startsWith(PATH_PREFIXES.WELL_KNOWN)
);
Expand Down
7 changes: 5 additions & 2 deletions apps/mesh/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ const envSchema = z
OTEL_SERVICE_NAME: z.string().default("mesh"),

// Event Bus & Networking
NATS_URL: z.string().default("nats://localhost:4222"),
NATS_URL: z
.string()
.default("nats://localhost:4222")
.transform((s) => s.split(",").map((u) => u.trim())),

// Config files
CONFIG_PATH: z.string().default("./config.json"),
Expand Down Expand Up @@ -173,7 +176,7 @@ function logConfiguration(e: Env) {
r("OTEL_SERVICE_NAME", e.OTEL_SERVICE_NAME);

sect("Event Bus & Networking");
r("NATS_URL", e.NATS_URL);
r("NATS_URL", e.NATS_URL.join(", "));

sect("Config Files");
r("CONFIG_PATH", e.CONFIG_PATH);
Expand Down
46 changes: 45 additions & 1 deletion apps/mesh/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ if (!process.env.DECO_CLI) {
logConfiguration(env);
}

Bun.serve({
const server = Bun.serve({
// This was necessary because MCP has SSE endpoints (like notification) that disconnects after 10 seconds (default bun idle timeout)
idleTimeout: 0,
port,
Expand Down Expand Up @@ -123,3 +123,47 @@ if (env.DECOCMS_LOCAL_MODE) {
}
});
}

// ============================================================================
// Graceful Shutdown
// ============================================================================

let shuttingDown = false;

/**
 * Graceful shutdown sequence: drain traffic, stop the server, tear down
 * workers/telemetry/DB, then exit. Triggered by SIGTERM/SIGINT.
 */
async function gracefulShutdown(signal: string) {
  // Idempotent: a second signal while already shutting down is ignored.
  if (shuttingDown) return;
  shuttingDown = true;
  console.log(`\n[shutdown] Received ${signal}, shutting down gracefully...`);

  // Hard deadline below the 60s terminationGracePeriodSeconds in the Helm
  // values, so we exit on our own terms before K8s sends SIGKILL.
  const forceExitTimer = setTimeout(() => {
    console.error("[shutdown] Timed out after 55s, forcing exit.");
    process.exit(1);
  }, 55_000);
  // unref (where available) so this timer alone cannot keep the process alive.
  forceExitTimer.unref?.();

  let exitCode = 0;
  try {
    // 1. Mark as shutting down — readiness returns 503 immediately
    app.markShuttingDown();

    // 2. Give K8s time to notice the 503 and stop routing traffic before
    //    we close connections (~2s is enough for most configurations)
    await new Promise((r) => setTimeout(r, 2_000));

    // 3. Stop accepting new connections, force-close active ones
    //    (SSE streams are long-lived and would block graceful drain indefinitely)
    await server.stop(true);

    // 4. Stop workers, flush telemetry, close DB
    await app.shutdown();
  } catch (err) {
    console.error("[shutdown] Error during shutdown:", err);
    exitCode = 1;
  }

  clearTimeout(forceExitTimer);
  process.exit(exitCode);
}

process.on("SIGTERM", () => gracefulShutdown("SIGTERM"));
process.on("SIGINT", () => gracefulShutdown("SIGINT"));
88 changes: 88 additions & 0 deletions deploy/docker-compose/docker-compose.dev.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Local development infrastructure
#
# Starts PostgreSQL and a 3-node NATS JetStream cluster — mirrors production topology.
# The studio server itself runs from source via: bun run --cwd=apps/mesh dev:server
#
# Usage:
# docker compose -f deploy/docker-compose/docker-compose.dev.yml up -d
#
# Connection strings to use when running the server locally:
# DATABASE_URL=postgresql://postgres:postgres@localhost:5432/postgres
#   NATS_URL=nats://localhost:4222          (client_advertise propagates the other nodes to clients automatically)
#
# Example:
# DECOCMS_LOCAL_MODE=true bun run --cwd=apps/mesh dev:server

services:
# ============================================================================
# PostgreSQL
# ============================================================================
postgres:
image: postgres:15-alpine
container_name: decocms-dev-postgres
restart: unless-stopped
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
ports:
- "5432:5432"
volumes:
- postgres-dev-data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 5s
timeout: 3s
retries: 10

# ============================================================================
# NATS JetStream cluster (3 nodes — mirrors production)
# ============================================================================
nats-1:
image: nats:latest
container_name: decocms-dev-nats-1
restart: unless-stopped
ports:
- "4222:4222" # client
- "8222:8222" # monitoring (http://localhost:8222)
command: >
-js
-n nats-1
-cluster_name decocms-dev
-client_advertise localhost:4222
-cluster nats://0.0.0.0:6222
-routes nats://nats-2:6222,nats://nats-3:6222
-m 8222

nats-2:
image: nats:latest
container_name: decocms-dev-nats-2
restart: unless-stopped
ports:
- "4223:4222" # client
command: >
-js
-n nats-2
-cluster_name decocms-dev
-client_advertise localhost:4223
-cluster nats://0.0.0.0:6222
-routes nats://nats-1:6222,nats://nats-3:6222

nats-3:
image: nats:latest
container_name: decocms-dev-nats-3
restart: unless-stopped
ports:
- "4224:4222" # client
command: >
-js
-n nats-3
-cluster_name decocms-dev
-client_advertise localhost:4224
-cluster nats://0.0.0.0:6222
-routes nats://nats-1:6222,nats://nats-2:6222


volumes:
postgres-dev-data:
driver: local
2 changes: 1 addition & 1 deletion deploy/helm/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apiVersion: v2
name: chart-deco-studio
description: A Helm chart for deco Studio self-hosted deployment
type: application
version: 0.1.40
version: 0.1.41
appVersion: "latest"

dependencies:
Expand Down
6 changes: 4 additions & 2 deletions deploy/helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -97,21 +97,23 @@ securityContext:

livenessProbe:
httpGet:
path: /health
path: /health/live
port: http
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 3
readinessProbe:
httpGet:
path: /health
path: /health/ready
port: http
initialDelaySeconds: 10
periodSeconds: 5
timeoutSeconds: 3
failureThreshold: 4

terminationGracePeriodSeconds: 60

# Optional lifecycle hooks (preStop, postStart)
# lifecycle: {}

Expand Down
Loading