UN-3431 [FIX] Restore log_events_id on tool-run dispatch and persist without UI subscriber#1960
Conversation
…without UI subscriber

structure_tool_task lost log_events_id from both the agentic_table and structure_pipeline ExecutionContexts during the agentic_table refactor; the executor shim therefore received an empty log_events_id and bailed before publishing anything. Tool-run lines stopped reaching the workflow logs UI for every dispatch through these paths.

Two changes:
- structure_tool_task: thread log_events_id into both ExecutionContexts.
- executor_tool_shim.stream_log: gate only the PROGRESS path on log_events_id; the LOG payload now falls back to execution_id as the routing channel so logs persist to execution_log even when no websocket subscriber exists (API deployments).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
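A minimal sketch of the gating described above, assuming hypothetical class and publisher shapes (the real `ExecutorToolShim` internals may differ):

```python
class ExecutorToolShimSketch:
    """Hypothetical sketch of the stream_log change; real names may differ."""

    def __init__(self, execution_id, organization_id, log_events_id=None):
        self.execution_id = execution_id
        self.organization_id = organization_id
        self.log_events_id = log_events_id
        self.published = []  # stand-in for the real log publisher

    def stream_log(self, message):
        # LOG payload: fall back to execution_id as the routing channel so
        # rows persist to execution_log even with no UI subscriber.
        if self.execution_id and self.organization_id:
            channel = self.log_events_id or self.execution_id
            self.published.append(("LOG", channel, message))
        # PROGRESS only matters to a live UI subscriber, so it stays
        # gated on log_events_id.
        if self.log_events_id:
            self.published.append(("PROGRESS", self.log_events_id, message))
```

With no `log_events_id` (the API-deployment case), only the LOG payload is published, routed via `execution_id`; with one, both payloads route via `log_events_id`.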
No actionable comments were generated in the recent review. 🎉

📒 Files selected for processing (3)
💤 Files with no reviewable changes (1)
Summary by CodeRabbit
Walkthrough

Workflow log streaming is decoupled from progress-event publishing: task dispatch now forwards `log_events_id`.

Changes

- Workflow log publishing resilience
- Executor runtime logging & adapter-once instrumentation
Sequence Diagram

```mermaid
sequenceDiagram
    participant Dispatcher as Dispatcher
    participant Executor as Executor (worker)
    participant Shim as ExecutorToolShim
    participant LogPub as LogPublisher
    participant DB as StateStore/TaskState
    Dispatcher->>DB: read log_events_id
    Dispatcher->>Executor: dispatch task with ExecutionContext(log_events_id)
    Executor->>Shim: stream_log(message)
    alt execution_id & organization_id present
        Shim->>LogPub: publish(workflow_log, channel_id = log_events_id or execution_id)
    end
    alt log_events_id present
        Shim->>LogPub: publish(progress_event, channel_id = log_events_id)
    end
    Executor->>Shim: log_adapter_once(kind, adapter_id, adapter)
    Shim->>LogPub: stream_log("Using {kind}: `label`") [only first time per adapter_id]
```
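The `log_adapter_once` dedup in the diagram could look roughly like this; the helper shape is an assumption, and only the once-per-`adapter_id` behaviour and the `get_model_name() or adapter_id` fallback come from the review summary:

```python
class AdapterOnceLogger:
    """Hypothetical per-shim dedup for 'Using <kind>: `<label>`' lines."""

    def __init__(self, stream_log):
        self._stream_log = stream_log
        self._seen_adapter_ids = set()

    def log_adapter_once(self, kind, adapter_id, adapter):
        # Emit the adapter-identity line at most once per unique adapter id.
        if adapter_id in self._seen_adapter_ids:
            return
        self._seen_adapter_ids.add(adapter_id)
        # Fall back to the adapter id when the model name is unset,
        # avoiding a literal `None` in the UI.
        label = adapter.get_model_name() or adapter_id
        self._stream_log(f"Using {kind}: `{label}`")
```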
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes. Pre-merge checks: ✅ 5 passed.
| Filename | Overview |
|---|---|
| workers/file_processing/structure_tool_task.py | Restores missing log_events_id kwarg on two ExecutionContext dispatches (agentic_table, structure_pipeline), fixing the core regression. |
| workers/executor/executor_tool_shim.py | Moves PROGRESS publish behind log_events_id guard; LOG payload falls back to execution_id channel; adds log_adapter_once dedup helper. Logic is sound. |
| workers/executor/executors/legacy_executor.py | Consolidates noisy step logs into single-line summaries; adds log_adapter_once calls; llm.get_model_name() in summarize path is called without None-fallback, producing backtick-wrapped 'None' if model name is unset. |
| workers/executor/executors/index.py | Removes two verbose intermediate stream_log calls; clean-up only, no logic change. |
Sequence Diagram

```mermaid
sequenceDiagram
    participant STT as structure_tool_task
    participant D as Dispatcher
    participant AT as agentic_table executor
    participant LE as legacy executor (structure_pipeline)
    participant ETS as ExecutorToolShim.stream_log
    participant LP as LogPublisher
    STT->>D: dispatch(at_ctx, log_events_id=log_events_id)
    D->>AT: execute(context)
    AT->>ETS: stream_log(msg)
    alt log_events_id present
        ETS->>LP: publish(channel=log_events_id, PROGRESS)
    end
    ETS->>LP: publish(channel=log_events_id OR execution_id, LOG)
    LP-->>ETS: persisted to execution_log
    STT->>D: dispatch(pipeline_ctx, log_events_id=log_events_id)
    D->>LE: execute(context)
    LE->>ETS: stream_log(Run config)
    alt log_events_id present
        ETS->>LP: publish(channel=log_events_id, PROGRESS)
    end
    ETS->>LP: publish(channel=log_events_id OR execution_id, LOG)
    LE->>ETS: stream_log(Pipeline completed N/M)
    ETS->>LP: publish(channel=log_events_id OR execution_id, LOG)
```
Prompt To Fix All With AI
Fix the following code review issue, proposing a concise change.
---
### Issue 1 of 1
workers/executor/executors/legacy_executor.py:2298-2300
`llm.get_model_name()` is called without a None/empty fallback here. If the adapter returns `None` or an empty string, the log line will display `` `None` `` in the UI. The `log_adapter_once` helper already handles this gracefully with `get_model() or adapter_id`; the same pattern should be applied here.
```suggestion
model_label = llm.get_model_name() or llm_adapter_id
shim.stream_log(
f"Summarizing extracted text using LLM: `{model_label}`"
)
```
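The `or` fallback in the suggestion covers both `None` and the empty string, since both are falsy in Python (the adapter-id value below is a made-up placeholder):

```python
# Both None and "" are falsy, so a single `or` expression handles an
# unset model name in either form, avoiding a backtick-wrapped `None`
# or an empty label in the UI.
for model_name in (None, ""):
    assert (model_name or "adapter-id-123") == "adapter-id-123"

# A real model name passes through unchanged.
assert ("gpt-4" or "adapter-id-123") == "gpt-4"
```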
Reviews (3): Last reviewed commit: "UN-3431 [MISC] Improve tool-run log narr..."
Reshape the per-run shim.stream_log emissions so the workflow logs UI
reads as a per-phase narrative with one start, one end, and adapter
identity surfaced exactly once per unique adapter:
- Add a non-sensitive run-config preamble at the top of
_handle_structure_pipeline: prompt count + single_pass / summarize /
challenge flags. No prompt names or text are logged.
- Introduce ExecutorToolShim.log_adapter_once(kind, adapter_id, adapter)
with a per-shim dedup set so "Using LLM/Embedding/Vector DB:
`<model>`" appears at most once per unique adapter id. Used from
_initialize_adapters, _handle_index, and the summarize path.
- Drop intermediate / redundant lines that did not add information
on their own: "Initializing text extractor", "Using text extractor"
(rolled into the start line), "Extracting text from document",
"Saving extraction metadata", "Initialized embedding and vector DB
adapters", "Indexing file", "Adding nodes to vector db".
- Collapse the index-status trio ("Document already indexed,
re-indexing" + "Indexing document for the first time" + "Indexing
document into vector store") into a single "Indexing document" /
"Re-indexing document" line driven by doc_id_found.
- Gate "Retrieving context for" and "Retrieved N chunks via RAG for"
on chunk_size > 0 so single-pass / full-context paths do not emit
a misleading retrieval line.
- Combine summarize start into one line that names the LLM model.
- Wrap dynamic identifiers (adapter labels, extractor class, prompt
names) in backticks; drop trailing "..." across all stream_log
emissions.
- Emit a final "Pipeline completed: N/M prompts answered" with a
non-null count from structured_output[OUTPUT].
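The `chunk_size > 0` gating described above can be sketched as follows; the function name and signature are hypothetical, only the gating rule is taken from the list:

```python
def emit_retrieval_logs(stream_log, chunk_size, prompt_name, chunks=None):
    """Hypothetical sketch: emit retrieval lines only when retrieval happens."""
    # Single-pass / full-context paths run with chunk_size == 0 and perform
    # no retrieval, so they emit no retrieval lines at all.
    if chunk_size <= 0:
        return
    stream_log(f"Retrieving context for `{prompt_name}`")
    if chunks is not None:
        stream_log(f"Retrieved {len(chunks)} chunks via RAG for `{prompt_name}`")
```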
Pairs with the cloud-side log cleanup PR.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Test Results

- Summary
- Runner Tests - Full Report
- SDK1 Tests - Full Report
What

- `workers/file_processing/structure_tool_task.py`: thread `log_events_id` into the `agentic_table` and `structure_pipeline` `ExecutionContext` dispatches.
- `workers/executor/executor_tool_shim.py`: gate only the PROGRESS publish on `log_events_id`; the LOG payload now falls back to `execution_id` as the routing channel.

Why

- The `agentic_table` refactor dropped the dispatched `log_events_id`. Tool-run logs ("Pipeline step 1: …", "Processing prompt: about", etc.) never reached the workflow execution logs UI for any run that went through `structure_pipeline`, which is the dominant code path.
- `ExecutorToolShim.stream_log` additionally short-circuited at `if not self.log_events_id: return` before the LOG-payload publish, so even contexts with a valid `execution_id` + `organization_id` but no websocket subscriber (API deployments) silently dropped their `execution_log` rows.

How

- Restore the `log_events_id=log_events_id` kwarg on the two regressed `ExecutionContext` constructions; the third dispatch (`agentic_extraction`) already had it.
- In `ExecutorToolShim.stream_log`, move the PROGRESS branch behind a truthy `log_events_id` check and use `self.log_events_id or self.execution_id` as the LOG channel so persistence works without a UI subscriber. This mirrors the backend `WorkflowLog` channel-fallback behaviour.

Can this PR break any existing features? If yes, please list possible items. If no, please explain why. (PS: Admins do not merge the PR without this section filled)

- No. The PROGRESS publish is still gated on a truthy `log_events_id`, so its behaviour is unchanged. LOG persistence only gains coverage: when `log_events_id` was present, the LOG already used it as the channel; when it was absent, the code previously returned and now publishes via `execution_id` instead.

Database Migrations
Env Config
Relevant Docs
Related Issues or PRs
Zipstack/unstract-cloud#1491.

Dependencies Versions
Notes on Testing
Built `unstract/worker-unified:test` from this branch (with the cloud plugin fix overlaid) and recreated all v2 workers locally. Smoke tests to run:

- … persist to `unstract.execution_log` and stream live in the workflow logs panel.
- … `execution_log`.
- `Reading document context...` and `Running single-pass extraction with N fields...` lines should appear (covered jointly by this PR + the cloud plugin PR).
- … `execution_log` (unchanged).

Screenshots
Checklist
I have read and understood the Contribution Guidelines.