Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 1 addition & 21 deletions apps/api/src/controllers/live.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,10 @@ import {
transformMinimalEvent,
} from '@openpanel/db';
import { setSuperJson } from '@openpanel/json';
import {
psubscribeToPublishedEvent,
subscribeToPublishedEvent,
} from '@openpanel/redis';
import { subscribeToPublishedEvent } from '@openpanel/redis';
import { getProjectAccess } from '@openpanel/trpc';
import { getOrganizationAccess } from '@openpanel/trpc/src/access';

export function getLiveEventInfo(key: string) {
return key.split(':').slice(2) as [string, string];
}

export function wsVisitors(
socket: WebSocket,
req: FastifyRequest<{
Expand All @@ -36,21 +29,8 @@ export function wsVisitors(
}
});

const punsubscribe = psubscribeToPublishedEvent(
'__keyevent@0__:expired',
(key) => {
const [projectId] = getLiveEventInfo(key);
if (projectId && projectId === params.projectId) {
eventBuffer.getActiveVisitorCount(params.projectId).then((count) => {
socket.send(String(count));
});
}
},
);

socket.on('close', () => {
unsubscribe();
punsubscribe();
});
}

Expand Down
69 changes: 44 additions & 25 deletions packages/db/src/buffers/event-buffer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ describe('EventBuffer', () => {
// Get initial count
const initialCount = await eventBuffer.getBufferSize();

// Add event
await eventBuffer.add(event);
// Add event and flush (events are micro-batched)
eventBuffer.add(event);
await eventBuffer.flush();

// Buffer counter should increase by 1
const newCount = await eventBuffer.getBufferSize();
Expand Down Expand Up @@ -109,7 +110,8 @@ describe('EventBuffer', () => {

// Add first screen_view
const count1 = await eventBuffer.getBufferSize();
await eventBuffer.add(view1);
eventBuffer.add(view1);
await eventBuffer.flush();

// Should be stored as "last" but NOT in queue yet
const count2 = await eventBuffer.getBufferSize();
Expand All @@ -124,7 +126,8 @@ describe('EventBuffer', () => {
expect(last1!.createdAt.toISOString()).toBe(view1.created_at);

// Add second screen_view
await eventBuffer.add(view2);
eventBuffer.add(view2);
await eventBuffer.flush();

// Now view1 should be in buffer
const count3 = await eventBuffer.getBufferSize();
Expand All @@ -138,7 +141,8 @@ describe('EventBuffer', () => {
expect(last2!.createdAt.toISOString()).toBe(view2.created_at);

// Add third screen_view
await eventBuffer.add(view3);
eventBuffer.add(view3);
await eventBuffer.flush();

// Now view2 should also be in buffer
const count4 = await eventBuffer.getBufferSize();
Expand Down Expand Up @@ -174,14 +178,16 @@ describe('EventBuffer', () => {

// Add screen_view
const count1 = await eventBuffer.getBufferSize();
await eventBuffer.add(view);
eventBuffer.add(view);
await eventBuffer.flush();

// Should be stored as "last", not in buffer yet
const count2 = await eventBuffer.getBufferSize();
expect(count2).toBe(count1);

// Add session_end
await eventBuffer.add(sessionEnd);
eventBuffer.add(sessionEnd);
await eventBuffer.flush();

// Both should now be in buffer (+2)
const count3 = await eventBuffer.getBufferSize();
Expand All @@ -207,7 +213,8 @@ describe('EventBuffer', () => {
} as any;

const count1 = await eventBuffer.getBufferSize();
await eventBuffer.add(sessionEnd);
eventBuffer.add(sessionEnd);
await eventBuffer.flush();

// Only session_end should be in buffer (+1)
const count2 = await eventBuffer.getBufferSize();
Expand All @@ -224,7 +231,8 @@ describe('EventBuffer', () => {
created_at: new Date().toISOString(),
} as any;

await eventBuffer.add(view);
eventBuffer.add(view);
await eventBuffer.flush();

// Query by profileId
const result = await eventBuffer.getLastScreenView({
Expand All @@ -248,7 +256,8 @@ describe('EventBuffer', () => {
created_at: new Date().toISOString(),
} as any;

await eventBuffer.add(view);
eventBuffer.add(view);
await eventBuffer.flush();

// Query by sessionId
const result = await eventBuffer.getLastScreenView({
Expand All @@ -275,43 +284,47 @@ describe('EventBuffer', () => {
expect(await eventBuffer.getBufferSize()).toBe(0);

// Add regular event
await eventBuffer.add({
eventBuffer.add({
project_id: 'p6',
name: 'event1',
created_at: new Date().toISOString(),
} as any);
await eventBuffer.flush();

expect(await eventBuffer.getBufferSize()).toBe(1);

// Add another regular event
await eventBuffer.add({
eventBuffer.add({
project_id: 'p6',
name: 'event2',
created_at: new Date().toISOString(),
} as any);
await eventBuffer.flush();

expect(await eventBuffer.getBufferSize()).toBe(2);

// Add screen_view (not counted until flushed)
await eventBuffer.add({
eventBuffer.add({
project_id: 'p6',
profile_id: 'u6',
session_id: 'session_6',
name: 'screen_view',
created_at: new Date().toISOString(),
} as any);
await eventBuffer.flush();

// Still 2 (screen_view is pending)
expect(await eventBuffer.getBufferSize()).toBe(2);

// Add another screen_view (first one gets flushed)
await eventBuffer.add({
eventBuffer.add({
project_id: 'p6',
profile_id: 'u6',
session_id: 'session_6',
name: 'screen_view',
created_at: new Date(Date.now() + 1000).toISOString(),
} as any);
await eventBuffer.flush();

// Now 3 (2 regular + 1 flushed screen_view)
expect(await eventBuffer.getBufferSize()).toBe(3);
Expand All @@ -330,8 +343,9 @@ describe('EventBuffer', () => {
created_at: new Date(Date.now() + 1000).toISOString(),
} as any;

await eventBuffer.add(event1);
await eventBuffer.add(event2);
eventBuffer.add(event1);
eventBuffer.add(event2);
await eventBuffer.flush();

expect(await eventBuffer.getBufferSize()).toBe(2);

Expand Down Expand Up @@ -361,12 +375,13 @@ describe('EventBuffer', () => {

// Add 4 events
for (let i = 0; i < 4; i++) {
await eb.add({
eb.add({
project_id: 'p8',
name: `event${i}`,
created_at: new Date(Date.now() + i).toISOString(),
} as any);
}
await eb.flush();

const insertSpy = vi
.spyOn(ch, 'insert')
Expand Down Expand Up @@ -396,7 +411,8 @@ describe('EventBuffer', () => {
created_at: new Date().toISOString(),
} as any;

await eventBuffer.add(event);
eventBuffer.add(event);
await eventBuffer.flush();

const count = await eventBuffer.getActiveVisitorCount('p9');
expect(count).toBeGreaterThanOrEqual(1);
Expand Down Expand Up @@ -439,10 +455,11 @@ describe('EventBuffer', () => {
created_at: new Date(t0 + 2000).toISOString(),
} as any;

await eventBuffer.add(view1a);
await eventBuffer.add(view2a);
await eventBuffer.add(view1b); // Flushes view1a
await eventBuffer.add(view2b); // Flushes view2a
eventBuffer.add(view1a);
eventBuffer.add(view2a);
eventBuffer.add(view1b); // Flushes view1a
eventBuffer.add(view2b); // Flushes view2a
await eventBuffer.flush();

// Should have 2 events in buffer (one from each session)
expect(await eventBuffer.getBufferSize()).toBe(2);
Expand Down Expand Up @@ -470,7 +487,8 @@ describe('EventBuffer', () => {
} as any;

const count1 = await eventBuffer.getBufferSize();
await eventBuffer.add(view);
eventBuffer.add(view);
await eventBuffer.flush();

// Should go directly to buffer (no session_id)
const count2 = await eventBuffer.getBufferSize();
Expand Down Expand Up @@ -498,8 +516,9 @@ describe('EventBuffer', () => {
created_at: new Date(t0 + 1000).toISOString(),
} as any;

await eventBuffer.add(view1);
await eventBuffer.add(view2);
eventBuffer.add(view1);
eventBuffer.add(view2);
await eventBuffer.flush();

// Both sessions should have their own "last"
const lastSession1 = await eventBuffer.getLastScreenView({
Expand Down
Loading