Skip to content

Commit 9504519

Browse files
fix: sync parent update socket side effects
1 parent dc8bea3 commit 9504519

File tree

2 files changed

+289
-26
lines changed

2 files changed

+289
-26
lines changed

apps/sim/socket/handlers/operations.test.ts

Lines changed: 203 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
*/
44

55
import { beforeEach, describe, expect, it, vi } from 'vitest'
6+
import { ZodError } from 'zod'
67

78
const { mockPersistWorkflowOperation, mockCheckRolePermission, mockWorkflowOperationParse } =
89
vi.hoisted(() => ({
@@ -26,7 +27,12 @@ vi.mock('@/socket/validation/schemas', () => ({
2627
},
2728
}))
2829

29-
import { BLOCKS_OPERATIONS, EDGES_OPERATIONS, OPERATION_TARGETS } from '@/socket/constants'
30+
import {
31+
BLOCK_OPERATIONS,
32+
BLOCKS_OPERATIONS,
33+
EDGES_OPERATIONS,
34+
OPERATION_TARGETS,
35+
} from '@/socket/constants'
3036
import { setupOperationsHandlers } from '@/socket/handlers/operations'
3137

3238
describe('setupOperationsHandlers', () => {
@@ -696,6 +702,202 @@ describe('setupOperationsHandlers', () => {
696702
)
697703
})
698704

705+
it('broadcasts single block parent side-effect edges to the whole workflow', async () => {
706+
mockPersistWorkflowOperation.mockResolvedValue({
707+
removedEdgeIds: ['edge-removed'],
708+
addedEdges: [
709+
{
710+
id: 'edge-added',
711+
source: 'loop-1',
712+
target: 'block-1',
713+
sourceHandle: 'loop-start-source',
714+
targetHandle: 'target',
715+
type: 'workflowEdge',
716+
},
717+
],
718+
})
719+
720+
const socketEmit = vi.fn()
721+
const socketRoomEmit = vi.fn()
722+
const emitToWorkflow = vi.fn()
723+
const socketHandlers = new Map<string, (data: unknown) => Promise<void>>()
724+
725+
const socket = {
726+
id: 'socket-1',
727+
on: vi.fn((event: string, handler: (data: unknown) => Promise<void>) => {
728+
socketHandlers.set(event, handler)
729+
}),
730+
emit: socketEmit,
731+
to: vi.fn(() => ({
732+
emit: socketRoomEmit,
733+
})),
734+
}
735+
736+
const roomManager = {
737+
io: {} as never,
738+
initialize: vi.fn(),
739+
isReady: vi.fn(() => true),
740+
shutdown: vi.fn(),
741+
addUserToRoom: vi.fn(),
742+
removeUserFromRoom: vi.fn(),
743+
getWorkflowIdForSocket: vi.fn().mockResolvedValue('workflow-1'),
744+
getUserSession: vi.fn().mockResolvedValue({ userId: 'user-1', userName: 'Test User' }),
745+
getWorkflowUsers: vi.fn().mockResolvedValue([
746+
{
747+
socketId: 'socket-1',
748+
userId: 'user-1',
749+
workflowId: 'workflow-1',
750+
userName: 'Test User',
751+
joinedAt: Date.now(),
752+
lastActivity: Date.now(),
753+
role: 'admin',
754+
},
755+
]),
756+
hasWorkflowRoom: vi.fn().mockResolvedValue(true),
757+
updateUserActivity: vi.fn(),
758+
updateRoomLastModified: vi.fn(),
759+
broadcastPresenceUpdate: vi.fn(),
760+
emitToWorkflow,
761+
getUniqueUserCount: vi.fn(),
762+
getTotalActiveConnections: vi.fn(),
763+
handleWorkflowDeletion: vi.fn(),
764+
handleWorkflowRevert: vi.fn(),
765+
handleWorkflowUpdate: vi.fn(),
766+
}
767+
768+
setupOperationsHandlers(socket as never, roomManager)
769+
770+
const workflowOperationHandler = socketHandlers.get('workflow-operation')
771+
772+
await workflowOperationHandler?.({
773+
operationId: 'op-single-parent',
774+
operation: BLOCK_OPERATIONS.UPDATE_PARENT,
775+
target: OPERATION_TARGETS.BLOCK,
776+
payload: { id: 'block-1', parentId: 'loop-1', position: { x: 10, y: 20 } },
777+
timestamp: 123,
778+
})
779+
780+
expect(socketRoomEmit).toHaveBeenCalledWith(
781+
'workflow-operation',
782+
expect.objectContaining({
783+
operation: BLOCK_OPERATIONS.UPDATE_PARENT,
784+
target: OPERATION_TARGETS.BLOCK,
785+
payload: { id: 'block-1', parentId: 'loop-1', position: { x: 10, y: 20 } },
786+
})
787+
)
788+
expect(emitToWorkflow).toHaveBeenNthCalledWith(
789+
1,
790+
'workflow-1',
791+
'workflow-operation',
792+
expect.objectContaining({
793+
operation: EDGES_OPERATIONS.BATCH_REMOVE_EDGES,
794+
target: OPERATION_TARGETS.EDGES,
795+
payload: { ids: ['edge-removed'] },
796+
})
797+
)
798+
expect(emitToWorkflow).toHaveBeenNthCalledWith(
799+
2,
800+
'workflow-1',
801+
'workflow-operation',
802+
expect.objectContaining({
803+
operation: EDGES_OPERATIONS.BATCH_ADD_EDGES,
804+
target: OPERATION_TARGETS.EDGES,
805+
payload: {
806+
edges: [
807+
expect.objectContaining({
808+
id: 'edge-added',
809+
source: 'loop-1',
810+
target: 'block-1',
811+
}),
812+
],
813+
},
814+
})
815+
)
816+
expect(socketEmit).toHaveBeenCalledWith(
817+
'operation-confirmed',
818+
expect.objectContaining({
819+
operationId: 'op-single-parent',
820+
serverTimestamp: expect.any(Number),
821+
})
822+
)
823+
})
824+
825+
it('includes operationId when a zod error happens after parsing', async () => {
826+
const socketEmit = vi.fn()
827+
const socketHandlers = new Map<string, (data: unknown) => Promise<void>>()
828+
829+
const socket = {
830+
id: 'socket-1',
831+
on: vi.fn((event: string, handler: (data: unknown) => Promise<void>) => {
832+
socketHandlers.set(event, handler)
833+
}),
834+
emit: socketEmit,
835+
to: vi.fn(() => ({
836+
emit: vi.fn(),
837+
})),
838+
}
839+
840+
const roomManager = {
841+
io: {} as never,
842+
initialize: vi.fn(),
843+
isReady: vi.fn(() => true),
844+
shutdown: vi.fn(),
845+
addUserToRoom: vi.fn(),
846+
removeUserFromRoom: vi.fn(),
847+
getWorkflowIdForSocket: vi.fn().mockResolvedValue('workflow-1'),
848+
getUserSession: vi.fn().mockResolvedValue({ userId: 'user-1', userName: 'Test User' }),
849+
getWorkflowUsers: vi.fn().mockResolvedValue([
850+
{
851+
socketId: 'socket-1',
852+
userId: 'user-1',
853+
workflowId: 'workflow-1',
854+
userName: 'Test User',
855+
joinedAt: Date.now(),
856+
lastActivity: Date.now(),
857+
role: 'admin',
858+
},
859+
]),
860+
hasWorkflowRoom: vi.fn().mockResolvedValue(true),
861+
updateUserActivity: vi.fn().mockRejectedValue(
862+
new ZodError([
863+
{
864+
code: 'custom',
865+
path: ['payload'],
866+
message: 'Invalid payload',
867+
},
868+
])
869+
),
870+
updateRoomLastModified: vi.fn(),
871+
broadcastPresenceUpdate: vi.fn(),
872+
emitToWorkflow: vi.fn(),
873+
getUniqueUserCount: vi.fn(),
874+
getTotalActiveConnections: vi.fn(),
875+
handleWorkflowDeletion: vi.fn(),
876+
handleWorkflowRevert: vi.fn(),
877+
handleWorkflowUpdate: vi.fn(),
878+
}
879+
880+
setupOperationsHandlers(socket as never, roomManager)
881+
882+
const workflowOperationHandler = socketHandlers.get('workflow-operation')
883+
884+
await workflowOperationHandler?.({
885+
operationId: 'op-zod-after-parse',
886+
operation: BLOCKS_OPERATIONS.BATCH_ADD_BLOCKS,
887+
target: OPERATION_TARGETS.BLOCKS,
888+
payload: { blocks: [], edges: [], loops: {}, parallels: {}, subBlockValues: {} },
889+
timestamp: 123,
890+
})
891+
892+
expect(socketEmit).toHaveBeenCalledWith(
893+
'operation-failed',
894+
expect.objectContaining({
895+
operationId: 'op-zod-after-parse',
896+
error: 'Invalid operation format',
897+
})
898+
)
899+
})
900+
699901
it('does not emit edge side-effect syncs when the operation has no handler support', async () => {
700902
mockPersistWorkflowOperation.mockResolvedValue({
701903
removedEdgeIds: ['edge-removed'],

apps/sim/socket/handlers/operations.ts

Lines changed: 86 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,42 @@ function getPayloadArray(payload: unknown, key: string): unknown[] {
3333
return Array.isArray(value) ? value : []
3434
}
3535

36+
function emitParentUpdateSideEffects(
37+
roomManager: IRoomManager,
38+
workflowId: string,
39+
result: Awaited<ReturnType<typeof persistWorkflowOperation>>,
40+
operationTimestamp: number,
41+
senderId: string,
42+
userId: string,
43+
userName: string
44+
) {
45+
if (result?.removedEdgeIds && result.removedEdgeIds.length > 0) {
46+
roomManager.emitToWorkflow(workflowId, 'workflow-operation', {
47+
operation: EDGES_OPERATIONS.BATCH_REMOVE_EDGES,
48+
target: OPERATION_TARGETS.EDGES,
49+
payload: { ids: result.removedEdgeIds },
50+
timestamp: operationTimestamp,
51+
senderId,
52+
userId,
53+
userName,
54+
metadata: { workflowId, operationId: crypto.randomUUID() },
55+
})
56+
}
57+
58+
if (result?.addedEdges && result.addedEdges.length > 0) {
59+
roomManager.emitToWorkflow(workflowId, 'workflow-operation', {
60+
operation: EDGES_OPERATIONS.BATCH_ADD_EDGES,
61+
target: OPERATION_TARGETS.EDGES,
62+
payload: { edges: result.addedEdges },
63+
timestamp: operationTimestamp,
64+
senderId,
65+
userId,
66+
userName,
67+
metadata: { workflowId, operationId: crypto.randomUUID() },
68+
})
69+
}
70+
}
71+
3672
export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
3773
socket.on('workflow-operation', async (data) => {
3874
const emitOperationError = (
@@ -553,39 +589,63 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
553589
metadata: { workflowId, operationId: crypto.randomUUID() },
554590
})
555591

556-
// Broadcast edge removals if the parent update cleaned up boundary edges
557-
if (result?.removedEdgeIds && result.removedEdgeIds.length > 0) {
558-
roomManager.emitToWorkflow(workflowId, 'workflow-operation', {
559-
operation: EDGES_OPERATIONS.BATCH_REMOVE_EDGES,
560-
target: OPERATION_TARGETS.EDGES,
561-
payload: { ids: result.removedEdgeIds },
562-
timestamp: operationTimestamp,
563-
senderId: socket.id,
564-
userId: session.userId,
565-
userName: session.userName,
566-
metadata: { workflowId, operationId: crypto.randomUUID() },
567-
})
568-
}
592+
emitParentUpdateSideEffects(
593+
roomManager,
594+
workflowId,
595+
result,
596+
operationTimestamp,
597+
socket.id,
598+
session.userId,
599+
session.userName
600+
)
569601

570-
// Broadcast auto-connected edges so clients add them to local state
571-
if (result?.addedEdges && result.addedEdges.length > 0) {
572-
roomManager.emitToWorkflow(workflowId, 'workflow-operation', {
573-
operation: EDGES_OPERATIONS.BATCH_ADD_EDGES,
574-
target: OPERATION_TARGETS.EDGES,
575-
payload: { edges: result.addedEdges },
576-
timestamp: operationTimestamp,
577-
senderId: socket.id,
578-
userId: session.userId,
579-
userName: session.userName,
580-
metadata: { workflowId, operationId: crypto.randomUUID() },
602+
if (operationId) {
603+
socket.emit('operation-confirmed', {
604+
operationId,
605+
serverTimestamp: Date.now(),
606+
appliedPayload: broadcastPayload,
581607
})
582608
}
583609

610+
return
611+
}
612+
613+
if (target === OPERATION_TARGETS.BLOCK && operation === BLOCK_OPERATIONS.UPDATE_PARENT) {
614+
const result = await persistWorkflowOperation(workflowId, {
615+
operation,
616+
target,
617+
payload,
618+
timestamp: operationTimestamp,
619+
userId: session.userId,
620+
})
621+
622+
await roomManager.updateRoomLastModified(workflowId)
623+
624+
socket.to(workflowId).emit('workflow-operation', {
625+
operation,
626+
target,
627+
payload,
628+
timestamp: operationTimestamp,
629+
senderId: socket.id,
630+
userId: session.userId,
631+
userName: session.userName,
632+
metadata: { workflowId, operationId: crypto.randomUUID() },
633+
})
634+
635+
emitParentUpdateSideEffects(
636+
roomManager,
637+
workflowId,
638+
result,
639+
operationTimestamp,
640+
socket.id,
641+
session.userId,
642+
session.userName
643+
)
644+
584645
if (operationId) {
585646
socket.emit('operation-confirmed', {
586647
operationId,
587648
serverTimestamp: Date.now(),
588-
appliedPayload: broadcastPayload,
589649
})
590650
}
591651

@@ -784,6 +844,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
784844
if (error instanceof ZodError) {
785845
logger.error('Invalid operation format:', error.errors)
786846
socket.emit('operation-failed', {
847+
...(operationId ? { operationId } : {}),
787848
error: 'Invalid operation format',
788849
details: error.errors,
789850
})

0 commit comments

Comments
 (0)