Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 167 additions & 103 deletions src/__tests__/sse-bridge.test.ts
Original file line number Diff line number Diff line change
@@ -1,166 +1,230 @@
/**
* Tests for services/sse-bridge.ts — SSE endpoint auth, tenant scoping, connection limiting.
*
* Issue #4393: /sse endpoint was unauthenticated and unscoped.
*/
import { describe, it, expect, vi, beforeEach } from 'vitest';
import type { EventBus, BusEvent, BusEventHandler } from '../event-bus.js';

class MockEventBus implements EventBus {
handlers = new Map<string, Set<BusEventHandler>>();
events: BusEvent[] = [];
nextId = 1;

publish(channel: string, type: string, data: Record<string, unknown>): number {
const id = this.nextId++;
const ev: BusEvent = { channel, id, type, timestamp: new Date().toISOString(), data };
this.events.push(ev);
for (const [pattern, handlers] of this.handlers.entries()) {
if (pattern === channel || pattern.includes('*')) {
for (const h of handlers) h(ev);
}
}
return id;
}

subscribe(channel: string, handler: BusEventHandler): () => void {
let set = this.handlers.get(channel);
if (!set) { set = new Set(); this.handlers.set(channel, set); }
set.add(handler);
return () => set!.delete(handler);
}

async replaySince(channel: string, lastEventId: number): Promise<BusEvent[]> {
return this.events.filter(e => e.channel === channel && e.id > lastEventId);
}

destroy(): void {
this.handlers.clear();
}
import { registerSSEBridge } from '../services/sse-bridge.js';
import type { RouteContext } from '../routes/context.js';

// Mock isGlobalEventVisibleToRequest — always visible in basic tests
vi.mock('../routes/events.js', () => ({
isGlobalEventVisibleToRequest: vi.fn(() => true),
}));

function makeRouteCtx(overrides: Record<string, any> = {}): RouteContext {
return {
sessions: {
getSession: vi.fn(),
listSessions: vi.fn(() => []),
approve: vi.fn(),
} as any,
auth: {} as any,
quotas: {} as any,
config: {} as any,
metrics: {} as any,
monitor: {} as any,
eventBus: {
subscribeGlobal: vi.fn(() => vi.fn()),
getGlobalEventsSince: vi.fn(() => []),
} as any,
channels: {} as any,
jsonlWatcher: {} as any,
pipelines: {} as any,
toolRegistry: {} as any,
getAuditLogger: vi.fn(() => undefined),
alertManager: {} as any,
sseLimiter: {
acquire: vi.fn(() => ({ allowed: true, connectionId: 'c1', current: 1, limit: 10 })),
release: vi.fn(),
registerWriter: vi.fn(),
unregisterWriter: vi.fn(),
} as any,
memoryBridge: null,
requestKeyMap: new Map(),
validateWorkDir: vi.fn(async () => '/tmp'),
draining: false,
...overrides,
} as any;
}

function createMockFastify() {
const routes: Array<{ method: string; url: string; handler: (req: any, reply: any) => void }> = [];
const routes: Array<{ method: string; url: string; handler: (req: any, reply: any) => any }> = [];
return {
get: vi.fn((url: string, opts: any, handler: (req: any, reply: any) => void) => {
// Fastify .get(url, opts, handler) or .get(url, handler)
const actualHandler = typeof opts === 'function' ? opts : handler;
get: vi.fn((url: string, optsOrHandler: any, handler?: (req: any, reply: any) => any) => {
const actualHandler = typeof optsOrHandler === 'function' ? optsOrHandler : handler!;
routes.push({ method: 'GET', url, handler: actualHandler });
}),
_routes: routes,
} as any;
}

function createMockReqReply(query: Record<string, string> = {}) {
function createMockReqReply(overrides: Record<string, any> = {}) {
const written: string[] = [];
const headers: Record<string, string> = {};
const closeHandlers: Array<() => void> = [];
const raw = {
setHeader: vi.fn((k: string, v: string) => { headers[k] = v; }),
writeHead: vi.fn((_status: number, hdrs: Record<string, string>) => { Object.assign(headers, hdrs); }),
write: vi.fn((data: string) => { written.push(data); return true; }),
_written: written,
_headers: headers,
};
const closeHandlers: Array<() => void> = [];
return {
request: { query, raw: { on: vi.fn((event: string, handler: () => void) => { if (event === 'close') closeHandlers.push(handler); }) }, id: 'test-req' },
reply: { raw, _closeHandlers: closeHandlers },
request: {
ip: '127.0.0.1',
id: 'test-req',
tenantId: undefined,
authKeyId: undefined,
authRole: undefined,
raw: { on: vi.fn((event: string, handler: () => void) => { if (event === 'close') closeHandlers.push(handler); }) },
...overrides,
},
reply: {
raw,
status: vi.fn(function(this: any, _code: number) { return this; }),
send: vi.fn(function(this: any, _body: any) { return this; }),
_closeHandlers: closeHandlers,
_written: written,
},
written,
};
}

describe('SSE Bridge', () => {
let eventBus: MockEventBus;
describe('SSE Bridge (#4393)', () => {
let routeCtx: RouteContext;
let mockFastify: ReturnType<typeof createMockFastify>;

beforeEach(() => {
eventBus = new MockEventBus();
vi.clearAllMocks();
routeCtx = makeRouteCtx();
mockFastify = createMockFastify();
});

it('registers /sse route on the Fastify server', async () => {
const { createSSEBridge } = await import('../services/sse-bridge.js');
const bridge = createSSEBridge(eventBus, mockFastify as any);
bridge.register(mockFastify as any);
expect(mockFastify.get).toHaveBeenCalled();
bridge.destroy();
it('registers /v1/sse route (not /sse)', () => {
registerSSEBridge(mockFastify, routeCtx);
expect(mockFastify.get).toHaveBeenCalledWith('/v1/sse', expect.any(Function));
});

it('sends SSE events to connected clients', async () => {
const { createSSEBridge } = await import('../services/sse-bridge.js');
const bridge = createSSEBridge(eventBus, mockFastify as any);
bridge.register(mockFastify as any);
it('rejects connection when SSE limiter denies (per-IP limit)', async () => {
(routeCtx.sseLimiter.acquire as any).mockReturnValue({
allowed: false,
reason: 'per_ip_limit',
current: 11,
limit: 10,
});

const { request, reply, written } = createMockReqReply();
registerSSEBridge(mockFastify, routeCtx);
const handler = mockFastify._routes[0].handler;
await handler(request, reply);

eventBus.publish('global', 'test', { foo: 'bar' });
const { request, reply } = createMockReqReply();

const output = written.join('');
expect(output).toContain('event: test');
expect(output).toContain('"foo":"bar"');
await handler(request, reply);

bridge.destroy();
expect(reply.status).toHaveBeenCalledWith(429);
expect(reply.send).toHaveBeenCalledWith(expect.objectContaining({ reason: 'per_ip_limit' }));
});

it('replays events when lastEventId is provided', async () => {
const { createSSEBridge } = await import('../services/sse-bridge.js');
const bridge = createSSEBridge(eventBus, mockFastify as any);
bridge.register(mockFastify as any);

eventBus.publish('global', 'before1', {});
eventBus.publish('global', 'before2', {});
it('rejects connection when SSE limiter denies (global limit)', async () => {
(routeCtx.sseLimiter.acquire as any).mockReturnValue({
allowed: false,
reason: 'global_limit',
current: 101,
limit: 100,
});

const { request, reply, written } = createMockReqReply({ lastEventId: '1' });
registerSSEBridge(mockFastify, routeCtx);
const handler = mockFastify._routes[0].handler;
await handler(request, reply);
const { request, reply } = createMockReqReply();

const output = written.join('');
expect(output).toContain('before2');
await handler(request, reply);

bridge.destroy();
expect(reply.status).toHaveBeenCalledWith(503);
});

it('removes client on connection close', async () => {
const { createSSEBridge } = await import('../services/sse-bridge.js');
const bridge = createSSEBridge(eventBus, mockFastify as any);
bridge.register(mockFastify as any);
it('subscribes to global events and forwards to client', async () => {
const unsubscribe = vi.fn();
(routeCtx.eventBus.subscribeGlobal as any).mockReturnValue(unsubscribe);

const { request, reply, written } = createMockReqReply();
registerSSEBridge(mockFastify, routeCtx);
const handler = mockFastify._routes[0].handler;
const { request, reply, written } = createMockReqReply();

await handler(request, reply);

for (const h of reply._closeHandlers) h();
// Should have subscribed
expect(routeCtx.eventBus.subscribeGlobal).toHaveBeenCalledWith(expect.any(Function));

const lenBefore = written.length;
eventBus.publish('global', 'after-close', {});
expect(written.length).toBe(lenBefore);
// Simulate an event
const subscribeHandler = (routeCtx.eventBus.subscribeGlobal as any).mock.calls[0][0];
subscribeHandler({ id: 1, event: 'status.dead', sessionId: 's1' });

bridge.destroy();
});
expect(written.join('')).toContain('status.dead');

it('destroy unsubscribes from EventBus', async () => {
const { createSSEBridge } = await import('../services/sse-bridge.js');
const bridge = createSSEBridge(eventBus, mockFastify as any);
bridge.register(mockFastify as any);
// Cleanup
for (const h of reply._closeHandlers) h();
expect(unsubscribe).toHaveBeenCalled();
expect(routeCtx.sseLimiter.release).toHaveBeenCalledWith('c1');
});

it('sends connected event on successful connection', async () => {
registerSSEBridge(mockFastify, routeCtx);
const handler = mockFastify._routes[0].handler;
const { request, reply, written } = createMockReqReply();
await mockFastify._routes[0].handler(request, reply);

bridge.destroy();
await handler(request, reply);

const lenBefore = written.length;
eventBus.publish('global', 'post-destroy', {});
expect(written.length).toBe(lenBefore);
expect(written.join('')).toContain('connected');
});

it('sets correct SSE headers', async () => {
const { createSSEBridge } = await import('../services/sse-bridge.js');
const bridge = createSSEBridge(eventBus, mockFastify as any);
bridge.register(mockFastify as any);
registerSSEBridge(mockFastify, routeCtx);
const handler = mockFastify._routes[0].handler;
const { request, reply } = createMockReqReply();

await handler(request, reply);

expect(reply.raw.writeHead).toHaveBeenCalledWith(200, expect.objectContaining({
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
}));
});

it('returns 500 if subscription fails', async () => {
(routeCtx.eventBus.subscribeGlobal as any).mockImplementation(() => {
throw new Error('subscription failed');
});

registerSSEBridge(mockFastify, routeCtx);
const handler = mockFastify._routes[0].handler;
const { request, reply } = createMockReqReply();
await mockFastify._routes[0].handler(request, reply);

expect(reply.raw._headers['Content-Type']).toBe('text/event-stream');
expect(reply.raw._headers['Cache-Control']).toBe('no-cache');
expect(reply.raw._headers['Connection']).toBe('keep-alive');
await handler(request, reply);

expect(reply.status).toHaveBeenCalledWith(500);
expect(routeCtx.sseLimiter.release).toHaveBeenCalledWith('c1');
});

it('uses request tenant context for event filtering', async () => {
const { isGlobalEventVisibleToRequest } = await import('../routes/events.js');
const mockVisible = vi.mocked(isGlobalEventVisibleToRequest);
mockVisible.mockReturnValue(false); // Block all events

bridge.destroy();
registerSSEBridge(mockFastify, routeCtx);
const handler = mockFastify._routes[0].handler;
const { request, reply, written } = createMockReqReply({
tenantId: 'tenant-42',
authKeyId: 'key-1',
});

await handler(request, reply);

// Fire an event — should be filtered out
const subscribeHandler = (routeCtx.eventBus.subscribeGlobal as any).mock.calls[0][0];
subscribeHandler({ id: 1, sessionId: 's1' });

// The event data should NOT be written (only the initial connected event)
const output = written.join('');
expect(output).not.toContain('"sessionId"');
expect(output).toContain('connected'); // Initial event still sent
});
});
2 changes: 1 addition & 1 deletion src/middleware/auth-setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ export function setupAuth(app: FastifyInstance, ctx: AppContext): void {
}
}

const isSSERoute = /^\/v1\/events$|^\/v1\/sessions\/[^/]+\/(events|stream)$/.test(urlPath);
const isSSERoute = /^\/v1\/(events|sse)$|^\/v1\/sessions\/[^/]+\/(events|stream)$/.test(urlPath);
let token: string | undefined;
const header = req.headers.authorization;
if (header?.startsWith('Bearer ')) {
Expand Down
9 changes: 3 additions & 6 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ import { SessionEventBus } from './events.js';

import { SSEConnectionLimiter } from './sse-limiter.js';
import { PipelineManager } from './pipeline.js';
import { createSSEBridge } from './services/sse-bridge.js';
import { LocalEventBus } from './local-event-bus.js';
import { registerSSEBridge } from './services/sse-bridge.js';
import { ToolRegistry } from './tool-registry.js';
import {
AuthManager,
Expand Down Expand Up @@ -546,10 +545,8 @@ new JsonFileBackend(path.join(ctx.config.stateDir, 'analytics-cache.json')),
requestKeyMap,
});

// Wire SSE bridge (EventBus -> /sse) so dashboard can consume live events
const sseEventBus = new LocalEventBus();
const sseBridge = createSSEBridge(sseEventBus, app);
sseBridge.register(app);
// Issue #4393: Wire SSE bridge (/v1/sse) — auth-protected, tenant-scoped, connection-limited
registerSSEBridge(app, routeCtx);

// Issue #361: Store interval refs so graceful shutdown can clear them
timers.setInterval(() => reapStaleSessions(ctx.config.maxSessionAgeMs, ctx, { logger, eventBus, channels }), ctx.config.reaperIntervalMs);
Expand Down
Loading
Loading