@@ -49,20 +49,20 @@ import { SYSTEM_SUBBLOCK_IDS } from '@/triggers/constants'
4949const logger = createLogger ( 'BlockExecutor' )
5050
5151export class BlockExecutor {
52+ private execLogger : Logger
53+
5254 constructor (
5355 private blockHandlers : BlockHandler [ ] ,
5456 private resolver : VariableResolver ,
5557 private contextExtensions : ContextExtensions ,
5658 private state : BlockStateWriter
57- ) { }
58-
59- private loggerFor ( ctx : ExecutionContext ) : Logger {
60- return logger . withMetadata ( {
61- workflowId : ctx . workflowId ,
62- workspaceId : ctx . workspaceId ,
63- executionId : ctx . executionId ,
64- userId : ctx . userId ,
65- requestId : ctx . metadata . requestId ,
59+ ) {
60+ this . execLogger = logger . withMetadata ( {
61+ workflowId : this . contextExtensions . metadata ?. workflowId ,
62+ workspaceId : this . contextExtensions . workspaceId ,
63+ executionId : this . contextExtensions . executionId ,
64+ userId : this . contextExtensions . userId ,
65+ requestId : this . contextExtensions . metadata ?. requestId ,
6666 } )
6767 }
6868
@@ -71,7 +71,6 @@ export class BlockExecutor {
7171 node : DAGNode ,
7272 block : SerializedBlock
7373 ) : Promise < NormalizedBlockOutput > {
74- const execLogger = this . loggerFor ( ctx )
7574 const handler = this . findHandler ( block )
7675 if ( ! handler ) {
7776 throw buildBlockExecutionError ( {
@@ -86,9 +85,9 @@ export class BlockExecutor {
8685
8786 let blockLog : BlockLog | undefined
8887 if ( ! isSentinel ) {
89- blockLog = this . createBlockLog ( execLogger , ctx , node . id , block , node )
88+ blockLog = this . createBlockLog ( ctx , node . id , block , node )
9089 ctx . blockLogs . push ( blockLog )
91- await this . callOnBlockStart ( execLogger , ctx , node , block , blockLog . executionOrder )
90+ await this . callOnBlockStart ( ctx , node , block , blockLog . executionOrder )
9291 }
9392
9493 const startTime = performance . now ( )
@@ -117,7 +116,6 @@ export class BlockExecutor {
117116 } catch ( error ) {
118117 cleanupSelfReference ?.( )
119118 return await this . handleBlockError (
120- execLogger ,
121119 error ,
122120 ctx ,
123121 node ,
@@ -145,7 +143,6 @@ export class BlockExecutor {
145143
146144 if ( ctx . onStream ) {
147145 await this . handleStreamingExecution (
148- execLogger ,
149146 ctx ,
150147 node ,
151148 block ,
@@ -193,7 +190,6 @@ export class BlockExecutor {
193190 block,
194191 } )
195192 await this . callOnBlockComplete (
196- execLogger ,
197193 ctx ,
198194 node ,
199195 block ,
@@ -210,7 +206,6 @@ export class BlockExecutor {
210206 return normalizedOutput
211207 } catch ( error ) {
212208 return await this . handleBlockError (
213- execLogger ,
214209 error ,
215210 ctx ,
216211 node ,
@@ -242,7 +237,6 @@ export class BlockExecutor {
242237 }
243238
244239 private async handleBlockError (
245- execLogger : Logger ,
246240 error : unknown ,
247241 ctx : ExecutionContext ,
248242 node : DAGNode ,
@@ -289,7 +283,7 @@ export class BlockExecutor {
289283 }
290284 }
291285
292- execLogger . error (
286+ this . execLogger . error (
293287 phase === 'input_resolution' ? 'Failed to resolve block inputs' : 'Block execution failed' ,
294288 {
295289 blockId : node . id ,
@@ -304,7 +298,6 @@ export class BlockExecutor {
304298 : undefined
305299 const displayOutput = filterOutputForLog ( block . metadata ?. id || '' , errorOutput , { block } )
306300 await this . callOnBlockComplete (
307- execLogger ,
308301 ctx ,
309302 node ,
310303 block ,
@@ -323,7 +316,7 @@ export class BlockExecutor {
323316 if ( blockLog ) {
324317 blockLog . errorHandled = true
325318 }
326- execLogger . info ( 'Block has error port - returning error output instead of throwing' , {
319+ this . execLogger . info ( 'Block has error port - returning error output instead of throwing' , {
327320 blockId : node . id ,
328321 error : errorMessage ,
329322 } )
@@ -353,7 +346,6 @@ export class BlockExecutor {
353346 }
354347
355348 private createBlockLog (
356- execLogger : Logger ,
357349 ctx : ExecutionContext ,
358350 blockId : string ,
359351 block : SerializedBlock ,
@@ -376,7 +368,7 @@ export class BlockExecutor {
376368 blockName = `${ blockName } (iteration ${ loopScope . iteration } )`
377369 iterationIndex = loopScope . iteration
378370 } else {
379- execLogger . warn ( 'Loop scope not found for block' , { blockId, loopId } )
371+ this . execLogger . warn ( 'Loop scope not found for block' , { blockId, loopId } )
380372 }
381373 }
382374 }
@@ -458,7 +450,6 @@ export class BlockExecutor {
458450 }
459451
460452 private async callOnBlockStart (
461- execLogger : Logger ,
462453 ctx : ExecutionContext ,
463454 node : DAGNode ,
464455 block : SerializedBlock ,
@@ -481,7 +472,7 @@ export class BlockExecutor {
481472 ctx . childWorkflowContext
482473 )
483474 } catch ( error ) {
484- execLogger . warn ( 'Block start callback failed' , {
475+ this . execLogger . warn ( 'Block start callback failed' , {
485476 blockId,
486477 blockType,
487478 error : error instanceof Error ? error . message : String ( error ) ,
@@ -491,7 +482,6 @@ export class BlockExecutor {
491482 }
492483
493484 private async callOnBlockComplete (
494- execLogger : Logger ,
495485 ctx : ExecutionContext ,
496486 node : DAGNode ,
497487 block : SerializedBlock ,
@@ -528,7 +518,7 @@ export class BlockExecutor {
528518 ctx . childWorkflowContext
529519 )
530520 } catch ( error ) {
531- execLogger . warn ( 'Block completion callback failed' , {
521+ this . execLogger . warn ( 'Block completion callback failed' , {
532522 blockId,
533523 blockType,
534524 error : error instanceof Error ? error . message : String ( error ) ,
@@ -608,7 +598,6 @@ export class BlockExecutor {
608598 }
609599
610600 private async handleStreamingExecution (
611- execLogger : Logger ,
612601 ctx : ExecutionContext ,
613602 node : DAGNode ,
614603 block : SerializedBlock ,
@@ -625,15 +614,7 @@ export class BlockExecutor {
625614
626615 const stream = streamingExec . stream
627616 if ( typeof stream . tee !== 'function' ) {
628- await this . forwardStream (
629- execLogger ,
630- ctx ,
631- blockId ,
632- streamingExec ,
633- stream ,
634- responseFormat ,
635- selectedOutputs
636- )
617+ await this . forwardStream ( ctx , blockId , streamingExec , stream , responseFormat , selectedOutputs )
637618 return
638619 }
639620
@@ -652,7 +633,6 @@ export class BlockExecutor {
652633 }
653634
654635 const executorConsumption = this . consumeExecutorStream (
655- execLogger ,
656636 executorStream ,
657637 streamingExec ,
658638 blockId ,
@@ -663,7 +643,7 @@ export class BlockExecutor {
663643 try {
664644 await ctx . onStream ?.( clientStreamingExec )
665645 } catch ( error ) {
666- execLogger . error ( 'Error in onStream callback' , { blockId, error } )
646+ this . execLogger . error ( 'Error in onStream callback' , { blockId, error } )
667647 // Cancel the client stream to release the tee'd buffer
668648 await processedClientStream . cancel ( ) . catch ( ( ) => { } )
669649 }
@@ -673,7 +653,6 @@ export class BlockExecutor {
673653 }
674654
675655 private async forwardStream (
676- execLogger : Logger ,
677656 ctx : ExecutionContext ,
678657 blockId : string ,
679658 streamingExec : { stream : ReadableStream ; execution : any } ,
@@ -694,13 +673,12 @@ export class BlockExecutor {
694673 stream : processedStream ,
695674 } )
696675 } catch ( error ) {
697- execLogger . error ( 'Error in onStream callback' , { blockId, error } )
676+ this . execLogger . error ( 'Error in onStream callback' , { blockId, error } )
698677 await processedStream . cancel ( ) . catch ( ( ) => { } )
699678 }
700679 }
701680
702681 private async consumeExecutorStream (
703- execLogger : Logger ,
704682 stream : ReadableStream ,
705683 streamingExec : { execution : any } ,
706684 blockId : string ,
@@ -719,7 +697,7 @@ export class BlockExecutor {
719697 const tail = decoder . decode ( )
720698 if ( tail ) chunks . push ( tail )
721699 } catch ( error ) {
722- execLogger . error ( 'Error reading executor stream for block' , { blockId, error } )
700+ this . execLogger . error ( 'Error reading executor stream for block' , { blockId, error } )
723701 } finally {
724702 try {
725703 await reader . cancel ( ) . catch ( ( ) => { } )
@@ -750,7 +728,10 @@ export class BlockExecutor {
750728 }
751729 return
752730 } catch ( error ) {
753- execLogger . warn ( 'Failed to parse streamed content for response format' , { blockId, error } )
731+ this . execLogger . warn ( 'Failed to parse streamed content for response format' , {
732+ blockId,
733+ error,
734+ } )
754735 }
755736 }
756737
0 commit comments