@@ -8,13 +8,15 @@ import {
88 EDGE_OPERATIONS ,
99 EDGES_OPERATIONS ,
1010 OPERATION_TARGETS ,
11+ SUBBLOCK_OPERATIONS ,
1112 SUBFLOW_OPERATIONS ,
1213 VARIABLE_OPERATIONS ,
1314 WORKFLOW_OPERATIONS ,
1415} from '@sim/realtime-protocol/constants'
1516import { getActiveWorkflowContext } from '@sim/workflow-authz'
1617import { loadWorkflowFromNormalizedTablesRaw } from '@sim/workflow-persistence/load'
1718import { mergeSubBlockValues } from '@sim/workflow-persistence/subblocks'
19+ import { isWorkflowBlockProtected } from '@sim/workflow-types/workflow'
1820import { and , eq , inArray , isNull , or , sql } from 'drizzle-orm'
1921import { drizzle } from 'drizzle-orm/postgres-js'
2022import postgres from 'postgres'
@@ -46,26 +48,6 @@ interface DbBlockRef {
4648 data : unknown
4749}
4850
49- /**
50- * Checks if a block is protected (locked or inside a locked ancestor).
51- * Works with raw DB records.
52- */
53- function isDbBlockProtected ( blockId : string , blocksById : Record < string , DbBlockRef > ) : boolean {
54- const block = blocksById [ blockId ]
55- if ( ! block ) return false
56- if ( block . locked ) return true
57- const visited = new Set < string > ( )
58- let parentId = ( block . data as Record < string , unknown > | null ) ?. parentId as string | undefined
59- while ( parentId && ! visited . has ( parentId ) ) {
60- visited . add ( parentId )
61- if ( blocksById [ parentId ] ?. locked ) return true
62- parentId = ( blocksById [ parentId ] ?. data as Record < string , unknown > | null ) ?. parentId as
63- | string
64- | undefined
65- }
66- return false
67- }
68-
6951/**
7052 * Finds all descendant block IDs of a container (recursive).
7153 * Works with raw DB block arrays.
@@ -251,6 +233,9 @@ export async function persistWorkflowOperation(workflowId: string, operation: an
251233 case OPERATION_TARGETS . SUBFLOW :
252234 await handleSubflowOperationTx ( tx , workflowId , op , payload )
253235 break
236+ case OPERATION_TARGETS . SUBBLOCK :
237+ await handleSubblockOperationTx ( tx , workflowId , op , payload )
238+ break
254239 case OPERATION_TARGETS . VARIABLE :
255240 await handleVariableOperationTx ( tx , workflowId , op , payload )
256241 break
@@ -876,7 +861,7 @@ async function handleBlocksOperationTx(
876861 )
877862
878863 // Filter out protected blocks from deletion request
879- const deletableIds = ids . filter ( ( id ) => ! isDbBlockProtected ( id , blocksById ) )
864+ const deletableIds = ids . filter ( ( id ) => ! isWorkflowBlockProtected ( id , blocksById ) )
880865 if ( deletableIds . length === 0 ) {
881866 logger . info ( 'All requested blocks are protected, skipping deletion' )
882867 return
@@ -991,14 +976,14 @@ async function handleBlocksOperationTx(
991976 // Collect all blocks to toggle including descendants of containers
992977 for ( const id of blockIds ) {
993978 const block = blocksById [ id ]
994- if ( ! block || isDbBlockProtected ( id , blocksById ) ) continue
979+ if ( ! block || isWorkflowBlockProtected ( id , blocksById ) ) continue
995980
996981 blocksToToggle . add ( id )
997982
998983 // If it's a loop or parallel, also include all non-locked descendants
999984 if ( block . type === 'loop' || block . type === 'parallel' ) {
1000985 for ( const descId of findDbDescendants ( id , allBlocks ) ) {
1001- if ( ! isDbBlockProtected ( descId , blocksById ) ) {
986+ if ( ! isWorkflowBlockProtected ( descId , blocksById ) ) {
1002987 blocksToToggle . add ( descId )
1003988 }
1004989 }
@@ -1053,7 +1038,7 @@ async function handleBlocksOperationTx(
10531038
10541039 // Filter to only toggle handles on unprotected blocks
10551040 const blocksToToggle = blockIds . filter (
1056- ( id ) => blocksById [ id ] && ! isDbBlockProtected ( id , blocksById )
1041+ ( id ) => blocksById [ id ] && ! isWorkflowBlockProtected ( id , blocksById )
10571042 )
10581043 if ( blocksToToggle . length === 0 ) {
10591044 logger . info ( 'All requested blocks are protected, skipping handles toggle' )
@@ -1165,13 +1150,13 @@ async function handleBlocksOperationTx(
11651150 if ( ! id ) continue
11661151
11671152 // Skip protected blocks (locked or inside locked container)
1168- if ( isDbBlockProtected ( id , blocksById ) ) {
1153+ if ( isWorkflowBlockProtected ( id , blocksById ) ) {
11691154 logger . info ( `Skipping block ${ id } parent update - block is protected` )
11701155 continue
11711156 }
11721157
11731158 // Skip if trying to move into a locked container (or any of its ancestors)
1174- if ( parentId && isDbBlockProtected ( parentId , blocksById ) ) {
1159+ if ( parentId && isWorkflowBlockProtected ( parentId , blocksById ) ) {
11751160 logger . info ( `Skipping block ${ id } parent update - target parent ${ parentId } is protected` )
11761161 continue
11771162 }
@@ -1295,7 +1280,7 @@ async function handleEdgeOperationTx(tx: any, workflowId: string, operation: str
12951280 }
12961281 }
12971282
1298- if ( isDbBlockProtected ( payload . target , blocksById ) ) {
1283+ if ( isWorkflowBlockProtected ( payload . target , blocksById ) ) {
12991284 logger . info ( `Skipping edge add - target block is protected` )
13001285 break
13011286 }
@@ -1383,7 +1368,7 @@ async function handleEdgeOperationTx(tx: any, workflowId: string, operation: str
13831368 }
13841369 }
13851370
1386- if ( isDbBlockProtected ( edgeToRemove . targetBlockId , blocksById ) ) {
1371+ if ( isWorkflowBlockProtected ( edgeToRemove . targetBlockId , blocksById ) ) {
13871372 logger . info ( `Skipping edge remove - target block is protected` )
13881373 break
13891374 }
@@ -1494,7 +1479,7 @@ async function handleEdgesOperationTx(
14941479 }
14951480
14961481 const safeEdgeIds = edgesToRemove
1497- . filter ( ( e : EdgeToRemove ) => ! isDbBlockProtected ( e . targetBlockId , blocksById ) )
1482+ . filter ( ( e : EdgeToRemove ) => ! isWorkflowBlockProtected ( e . targetBlockId , blocksById ) )
14981483 . map ( ( e : EdgeToRemove ) => e . id )
14991484
15001485 if ( safeEdgeIds . length === 0 ) {
@@ -1581,7 +1566,7 @@ async function handleEdgesOperationTx(
15811566
15821567 // Filter edges - only add edges where target block is not protected
15831568 const safeEdges = ( edges as Array < Record < string , unknown > > ) . filter (
1584- ( e ) => ! isDbBlockProtected ( e . target as string , blocksById )
1569+ ( e ) => ! isWorkflowBlockProtected ( e . target as string , blocksById )
15851570 )
15861571
15871572 if ( safeEdges . length === 0 ) {
@@ -1734,6 +1719,86 @@ async function handleSubflowOperationTx(
17341719 }
17351720}
17361721
1722+ function valuesEqual ( left : unknown , right : unknown ) : boolean {
1723+ return JSON . stringify ( left ) === JSON . stringify ( right )
1724+ }
1725+
1726+ // Subblock operations - targeted value updates without replacing workflow state
1727+ async function handleSubblockOperationTx (
1728+ tx : any ,
1729+ workflowId : string ,
1730+ operation : string ,
1731+ payload : any
1732+ ) {
1733+ switch ( operation ) {
1734+ case SUBBLOCK_OPERATIONS . BATCH_UPDATE : {
1735+ const updates = payload . updates
1736+ if ( ! Array . isArray ( updates ) || updates . length === 0 ) {
1737+ return
1738+ }
1739+
1740+ const allBlocks = await tx
1741+ . select ( {
1742+ id : workflowBlocks . id ,
1743+ subBlocks : workflowBlocks . subBlocks ,
1744+ locked : workflowBlocks . locked ,
1745+ data : workflowBlocks . data ,
1746+ } )
1747+ . from ( workflowBlocks )
1748+ . where ( eq ( workflowBlocks . workflowId , workflowId ) )
1749+
1750+ type SubblockUpdateBlockRecord = ( typeof allBlocks ) [ number ]
1751+ const blocksById : Record < string , SubblockUpdateBlockRecord > = Object . fromEntries (
1752+ allBlocks . map ( ( block : SubblockUpdateBlockRecord ) => [ block . id , block ] )
1753+ )
1754+
1755+ for ( const update of updates ) {
1756+ const { blockId, subblockId, value, expectedValue } = update
1757+ if ( ! blockId || ! subblockId ) {
1758+ throw new Error ( 'Missing required fields for subblock batch update' )
1759+ }
1760+
1761+ const block = blocksById [ blockId ]
1762+ if ( ! block ) {
1763+ throw new Error ( `Block ${ blockId } not found` )
1764+ }
1765+
1766+ if ( isWorkflowBlockProtected ( blockId , blocksById ) ) {
1767+ throw new Error ( `Block ${ blockId } is locked or inside a locked container` )
1768+ }
1769+
1770+ const subBlocks = { ...( ( block . subBlocks as Record < string , any > ) || { } ) }
1771+ const currentSubBlock = subBlocks [ subblockId ]
1772+ const currentValue = currentSubBlock ?. value
1773+ if ( expectedValue !== undefined && ! valuesEqual ( currentValue , expectedValue ) ) {
1774+ throw new Error ( `Subblock ${ blockId } .${ subblockId } changed since replacement was planned` )
1775+ }
1776+
1777+ subBlocks [ subblockId ] = currentSubBlock
1778+ ? { ...currentSubBlock , value }
1779+ : { id : subblockId , type : 'unknown' , value }
1780+
1781+ await tx
1782+ . update ( workflowBlocks )
1783+ . set ( {
1784+ subBlocks,
1785+ updatedAt : new Date ( ) ,
1786+ } )
1787+ . where ( and ( eq ( workflowBlocks . id , blockId ) , eq ( workflowBlocks . workflowId , workflowId ) ) )
1788+
1789+ blocksById [ blockId ] = { ...block , subBlocks }
1790+ }
1791+
1792+ logger . debug ( `Batch updated ${ updates . length } subblocks for workflow ${ workflowId } ` )
1793+ break
1794+ }
1795+
1796+ default :
1797+ logger . warn ( `Unknown subblock operation: ${ operation } ` )
1798+ throw new Error ( `Unsupported subblock operation: ${ operation } ` )
1799+ }
1800+ }
1801+
17371802// Variable operations - updates workflow.variables JSON field
17381803async function handleVariableOperationTx (
17391804 tx : any ,
0 commit comments