UN-3266 [FEAT] Async Executor Backend for Prompt Studio#1849
harini-venkataraman wants to merge 81 commits into main from feat/execution-backend
Conversation
Conflicts resolved:
- docker-compose.yaml: Use main's dedicated dashboard_metric_events queue for worker-metrics
- PromptCard.jsx: Keep tool_id matching condition from our async socket feature
- PromptRun.jsx: Merge useEffect import from main with our branch
- ToolIde.jsx: Keep fire-and-forget socket approach (spinner waits for socket event)
- SocketMessages.js: Keep both session-store and socket-custom-tool imports + updateCusToolMessages dep
- SocketContext.js: Keep simpler path-based socket connection approach
- usePromptRun.js: Keep Celery fire-and-forget with socket delivery over polling
- setupProxy.js: Accept main's deletion (migrated to Vite)
for more information, see https://pre-commit.ci
… into feat/execution-backend
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Rename PascalCase local variables to snake_case to comply with S117:
- legacy_executor.py: rename tuple-unpacked _get_prompt_deps() results (AnswerPromptService→answer_prompt_svc, RetrievalService→retrieval_svc, VariableReplacementService→variable_replacement_svc, LLM→llm_cls, EmbeddingCompat→embedding_compat_cls, VectorDB→vector_db_cls) and update all downstream usages including _apply_type_conversion and _handle_summarize
- test_phase1_log_streaming.py: rename Mock* local variables to mock_* snake_case equivalents
- test_sanity_phase3.py: rename MockDispatcher→mock_dispatcher_cls and MockShim→mock_shim_cls across all 10 test methods
- test_sanity_phase5.py: rename MockShim→mock_shim, MockX2Text→mock_x2text in 6 test methods; MockDispatcher→mock_dispatcher_cls in dispatch test; fix LLM_cls→llm_cls, EmbeddingCompat→embedding_compat_cls, VectorDB→vector_db_cls in _mock_prompt_deps helper

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- test_sanity_phase2/4.py, test_answer_prompt.py: rename PascalCase local variables in _mock_prompt_deps/_mock_deps to snake_case (RetrievalService→retrieval_svc, VariableReplacementService→variable_replacement_svc, Index→index_cls, LLM_cls→llm_cls, EmbeddingCompat→embedding_compat_cls, VectorDB→vector_db_cls, AnswerPromptService→answer_prompt_svc_cls) — fixes S117
- test_sanity_phase3.py: remove unused local variable "result" — fixes S1481
- structure_tool_task.py: remove redundant json.JSONDecodeError from except clause (subclass of ValueError) — fixes S5713
- shared/workflow/execution/service.py: replace generic Exception with RuntimeError for structure tool failure — fixes S112
- run-worker-docker.sh: define EXECUTOR_WORKER_TYPE constant and replace 10 literal "executor" occurrences — fixes S1192

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…iolations
- Reduce cognitive complexity in answer_prompt.py:
  - Extract _build_grammar_notes, _run_webhook_postprocess helpers
  - _is_safe_public_url: extracted _resolve_host_addresses helper
  - handle_json: early-return pattern eliminates nesting
  - construct_prompt: delegates grammar loop to _build_grammar_notes
- Reduce cognitive complexity in legacy_executor.py:
  - Extract _execute_single_prompt, _run_table_extraction helpers
  - Extract _run_challenge_if_enabled, _run_evaluation_if_enabled
  - Extract _inject_table_settings, _finalize_pipeline_result
  - Extract _convert_number_answer, _convert_scalar_answer
  - Extract _sanitize_dict_values helper
  - _handle_answer_prompt CC reduced from 50 to ~7
- Reduce CC in structure_tool_task.py: guard-clause refactor
- Reduce CC in backend: dto.py, deployment_helper.py, api_deployment_views.py, prompt_studio_helper.py
- Fix S117: rename PascalCase local vars in test_answer_prompt.py
- Fix S1192: extract EXECUTOR_WORKER_TYPE constant in run-worker.sh
- Fix S1172: remove unused params from structure_tool_task.py
- Fix S5713: remove redundant JSONDecodeError in json_repair_helper.py
- Fix S112/S5727 in test_execution.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…er_prompt Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…002192
- Add @staticmethod to _sanitize_null_values (fixes S2325 missing self)
- Reduce _execute_single_prompt params from 25 to 11 (S107) by grouping services as deps tuple and extracting exec params from context.executor_params
- Add NOSONAR suppression for raise exc in test helper (S112)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
execution_id, file_hash, log_events_id, custom_data are now extracted inside _execute_single_prompt from context.executor_params. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: harini-venkataraman <115449948+harini-venkataraman@users.noreply.github.com>
Actionable comments posted: 4
Note
Due to the large number of review comments, Critical severity comments were prioritized as inline comments.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py (1)
1717-1720: ⚠️ Potential issue | 🟠 Major
Broaden the cleanup around `dynamic_indexer()` setup.
The indexing flag is set before platform-key lookup, child-context creation, and dispatch, but the cleanup only runs for `(IndexingError, IndexingAPIError, SdkError)`. A local setup failure outside that tuple leaves the document permanently marked as indexing.
Also applies to: 1740-1779
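The failure mode can be shown in miniature. Below is a minimal sketch, assuming a stand-in `DocumentIndexingService` and a generic `setup` callable (both hypothetical, not the real helpers in `prompt_studio_helper.py`): any setup exception, not just the indexing-specific tuple, clears the flag before propagating, while a successful dispatch leaves it set until the async completion event.

```python
# Sketch only: stand-ins for the real DocumentIndexingService / dispatch flow.
class DocumentIndexingService:
    _indexing: set[str] = set()  # doc_ids currently marked as indexing

    @classmethod
    def set_document_indexing(cls, doc_id: str) -> None:
        cls._indexing.add(doc_id)

    @classmethod
    def mark_document_indexed(cls, doc_id: str) -> None:
        cls._indexing.discard(doc_id)

    @classmethod
    def is_document_indexing(cls, doc_id: str) -> bool:
        return doc_id in cls._indexing


def index_document(doc_id: str, setup) -> None:
    """Set the indexing flag, then run setup + dispatch."""
    DocumentIndexingService.set_document_indexing(doc_id)
    try:
        setup()  # platform-key lookup, child context creation, dispatch ...
    except Exception:
        # Clear the flag on ANY failure, not just
        # (IndexingError, IndexingAPIError, SdkError).
        DocumentIndexingService.mark_document_indexed(doc_id)
        raise
```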
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py` around lines 1717 - 1720, The code sets DocumentIndexingService.set_document_indexing(...) before doing dynamic_indexer() platform-key lookup, child-context creation, and dispatch but only clears the flag on (IndexingError, IndexingAPIError, SdkError), which can leave a document permanently marked as indexing if any other local setup error occurs; wrap the entire setup+dispatch sequence that begins with DocumentIndexingService.set_document_indexing and includes dynamic_indexer(), platform key lookup and child context creation in a try/finally (or move the set_document_indexing call to after successful setup) so that the cleanup call that clears the indexing flag always runs regardless of exception type, and apply the same change to the analogous block referenced at the later section (around lines 1740-1779) to ensure consistent behavior.
🟠 Major comments (33)
workers/shared/enums/task_enums.py-36-37 (1)
36-37: ⚠️ Potential issue | 🟠 Major
Add a file-processing route for `execute_structure_tool`.
`TaskName.EXECUTE_STRUCTURE_TOOL` is introduced here, but `workers/shared/infrastructure/config/registry.py` still does not route that task to `QueueName.FILE_PROCESSING`. If this task is queued by name without an explicit queue, it can fall back to the default queue and never reach the intended worker.
Suggested follow-up in `workers/shared/infrastructure/config/registry.py`:
```diff
 WorkerType.FILE_PROCESSING: WorkerTaskRouting(
     worker_type=WorkerType.FILE_PROCESSING,
     routes=[
         TaskRoute("process_file_batch", QueueName.FILE_PROCESSING),
         TaskRoute("process_file_batch_api", QueueName.FILE_PROCESSING_API),
+        TaskRoute("execute_structure_tool", QueueName.FILE_PROCESSING),
     ],
 ),
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/shared/enums/task_enums.py` around lines 36 - 37, TaskName.EXECUTE_STRUCTURE_TOOL was added but not routed to the file-processing queue; update the task routing in the registry so this task maps to QueueName.FILE_PROCESSING. In workers/shared/infrastructure/config/registry.py locate the routing map (the dict or function that assigns TaskName values to queues) and add an entry for TaskName.EXECUTE_STRUCTURE_TOOL -> QueueName.FILE_PROCESSING (or include it in whatever list/group is used for file-processing tasks), ensuring any default/fallback logic won’t send it to the default queue.
workers/tests/conftest.py-13-14 (1)
13-14: ⚠️ Potential issue | 🟠 Major
Force `.env.test` to override ambient env vars.
`load_dotenv()` keeps existing variables by default (`override=False`), so developer or CI values for `INTERNAL_API_BASE_URL`/`INTERNAL_SERVICE_API_KEY` will take precedence over `.env.test`. This makes the test suite environment-dependent and can send tests to the wrong backend.
Suggested fix:
```diff
 _env_test = Path(__file__).resolve().parent.parent / ".env.test"
-load_dotenv(_env_test)
+load_dotenv(_env_test, override=True)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/tests/conftest.py` around lines 13 - 14, The call to load_dotenv currently preserves existing environment variables, causing ambient CI/dev values to override .env.test; update the call in workers/tests/conftest.py to force loading .env.test by passing override=True (e.g., change load_dotenv(_env_test) to load_dotenv(_env_test, override=True)) so _env_test always replaces existing vars like INTERNAL_API_BASE_URL and INTERNAL_SERVICE_API_KEY during tests.
docker/sample.compose.override.yaml-323-327 (1)
323-327: ⚠️ Potential issue | 🟠 Major
Add the new executor worker to the dev override too.
This PR adds `worker-executor-v2` in `docker/docker-compose.yaml`, but the sample override only wires up the callback worker. In local dev, Compose will still try to pull `unstract/worker-unified:${VERSION}` for the executor, and even if that image exists it won’t include local `workers/` changes, so the new async flow can’t be exercised reliably.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docker/sample.compose.override.yaml` around lines 323 - 327, The dev override is missing the new service definition for worker-executor-v2 so Compose will pull the remote image instead of building the local worker; add a service block named worker-executor-v2 to the sample override (matching how worker-prompt-studio-callback is defined) that points to the local build (dockerfile: docker/dockerfiles/backend.Dockerfile and context: ..), and include the same volumes, environment and any depends_on or networks used by other local worker services so the executor is built from local workers/ sources and participates in the local async flow.
frontend/src/components/custom-tools/tool-ide/ToolIde.jsx-267-275 (1)
267-275: ⚠️ Potential issue | 🟠 Major
Clear the indexing state on non-async success too.
`docId` now gets removed from `indexDocs` only when the POST fails. If `async_prompt_execution` is off, or the backend returns a successful response without a follow-up socket event, the document stays permanently “indexing” and future retries are blocked.
Suggested change:
```diff
 pushIndexDoc(docId);
-return axiosPrivate(requestOptions).catch((err) => {
-  // Only clear spinner on POST network failure (not 2xx).
-  // On success the spinner stays until a socket event arrives.
-  deleteIndexDoc(docId);
-  setAlertDetails(
-    handleException(err, `${doc?.document_name} - Failed to index`),
-  );
-});
+return axiosPrivate(requestOptions)
+  .then((res) => {
+    if (res.status !== 202 || !res.data?.task_id) {
+      deleteIndexDoc(docId);
+    }
+    return res;
+  })
+  .catch((err) => {
+    deleteIndexDoc(docId);
+    setAlertDetails(
+      handleException(err, `${doc?.document_name} - Failed to index`),
+    );
+  });
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/src/components/custom-tools/tool-ide/ToolIde.jsx` around lines 267 - 275, The code only calls deleteIndexDoc(docId) inside the .catch branch so the doc remains "indexing" when the POST succeeds without a socket event; change the axiosPrivate call to ensure deleteIndexDoc(docId) runs for successful non-async responses as well (e.g., call deleteIndexDoc in a .finally or in a .then branch after inspecting the response), keep setAlertDetails(handleException(...)) in the .catch branch, and preserve pushIndexDoc(docId) before the request; update the axiosPrivate(requestOptions) call around pushIndexDoc/deleteIndexDoc to call deleteIndexDoc(docId) on both success and failure (use the existing pushIndexDoc, deleteIndexDoc, axiosPrivate, setAlertDetails, and handleException symbols to locate and modify the code).
workers/executor/executors/retrievers/base_retriever.py-10-35 (1)
10-35: ⚠️ Potential issue | 🟠 Major
Remove `@staticmethod` and raise `NotImplementedError` to enforce the base contract.
The current implementation silently returns an empty set, which masks incomplete implementations. All 7 concrete retrievers properly override this method, but the base class should fail fast if called directly. Ideally, promote `BaseRetriever` to an abstract base class with `@abstractmethod` to prevent instantiation entirely—this pattern already exists elsewhere in the codebase (`BaseTool`, `FileStorageInterface`).
Suggested change:
```diff
 class BaseRetriever:
     def __init__(
         self,
@@ -30,6 +30,6 @@ class BaseRetriever:
         self.top_k = top_k
         self.llm = llm if llm else None

-    @staticmethod
-    def retrieve() -> set[str]:
-        return set()
+    def retrieve(self) -> set[str]:
+        raise NotImplementedError
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/executor/executors/retrievers/base_retriever.py` around lines 10 - 35, The BaseRetriever class currently defines retrieve as a `@staticmethod` returning an empty set; change this to an abstract instance method that enforces the contract: remove the `@staticmethod` decorator on BaseRetriever.retrieve, make BaseRetriever inherit from abc.ABC (or otherwise mark it abstract), and implement retrieve(self, ...) to raise NotImplementedError (or use `@abstractmethod`) so calling the base method fails fast; reference the BaseRetriever class and its retrieve method when making this change so concrete retrievers continue to override the instance method.
workers/executor/worker.py-36-48 (1)
36-48: ⚠️ Potential issue | 🟠 Major
Don't report a worker with zero executors as healthy.
`ExecutorRegistry.list_executors()` can return an empty list when import-time registration breaks, but both the registered health check and the task-level `healthcheck` still report success. That makes a worker that cannot execute anything look ready to orchestration and monitoring. Please degrade when no executors are registered, and have the task reuse that computed status/details instead of hardcoding `"healthy"`.
Also applies to: 67-75
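A minimal sketch of the degraded-status idea, with stand-in `HealthStatus`/`HealthCheckResult` types (the worker's real types may differ); the same computed result would then be reused by the task-level `healthcheck` instead of a hardcoded "healthy":

```python
from dataclasses import dataclass
from enum import Enum


class HealthStatus(Enum):
    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"


@dataclass
class HealthCheckResult:
    status: HealthStatus
    message: str


def check_executor_health(executors: list[str]) -> HealthCheckResult:
    # A worker with zero registered executors cannot execute anything,
    # so report it as unhealthy instead of ready.
    if not executors:
        return HealthCheckResult(HealthStatus.UNHEALTHY, "no executors registered")
    return HealthCheckResult(
        HealthStatus.HEALTHY, f"{len(executors)} executor(s) registered"
    )
```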
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/executor/worker.py` around lines 36 - 48, The health check currently always returns HealthStatus.HEALTHY and hardcodes message/details even when ExecutorRegistry.list_executors() returns an empty list; change logic in the health check (the function that builds the HealthCheckResult using ExecutorRegistry.list_executors()) to detect an empty executors list and set status to HealthStatus.UNHEALTHY (or a degraded status), adjust the message and details appropriately (e.g., note "no executors registered"), and then reuse that same computed HealthCheckResult/details in the task-level healthcheck (instead of hardcoding "healthy"/queues) so both the registered health check and the task-level healthcheck reflect the computed status and details. Ensure references: ExecutorRegistry.list_executors(), HealthCheckResult, HealthStatus, and the task-level healthcheck function are updated.
frontend/src/hooks/usePromptStudioSocket.js-45-72 (1)
45-72: ⚠️ Potential issue | 🟠 Major
Completed events need the same cleanup fallback as failed events.
`handleCompleted()` only clears local state from `result`. If the callback succeeds with `[]` or with a payload that omits the IDs you need, the pending prompt/index status never gets removed and the UI spinner stays stuck. Pass `extra` into the success path and reuse the metadata-based fallback clearing you already have in `handleFailed()`.
Also applies to: 132-135
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/src/hooks/usePromptStudioSocket.js` around lines 45 - 72, handleCompleted currently only clears state using result payload and can leave pending statuses if result is [] or missing IDs; update handleCompleted (and the similar block around the other occurrence at lines ~132-135) to accept the extra metadata argument and call the same metadata-based fallback clearing used in handleFailed: pass extra into handleCompleted, and inside each success branch (fetch_response, single_pass_extraction, index_document) call clearResultStatuses(result, extra) or invoke the metadata-based cleanup logic used by handleFailed (using prompt/index IDs from extra when result lacks them) so updatePromptOutputState, updateCustomTool, deleteIndexDoc, and setAlertDetails remain but pending statuses are always cleared via the extra fallback.
frontend/src/hooks/usePromptRun.js-51-67 (1)
51-67: ⚠️ Potential issue | 🟠 Major
Timeout callback is not cleaned up on unmount - potential memory leak.
The `setTimeout` created in the `.then()` callback is not tracked or cleared. If the component unmounts before the 5-minute timeout fires, the callback will still execute, potentially causing:
- State updates on unmounted component
- Stale closure issues with store references
Since this hook is used in `PromptRun.jsx`, which may unmount when navigating away, this could cause issues.
🐛 Proposed fix - track and clear timeouts:
```diff
+import { useRef, useEffect } from "react";
+
 const usePromptRun = () => {
+  const timeoutRefs = useRef(new Map());
+
+  // Cleanup timeouts on unmount
+  useEffect(() => {
+    return () => {
+      timeoutRefs.current.forEach((timeoutId) => clearTimeout(timeoutId));
+      timeoutRefs.current.clear();
+    };
+  }, []);
+
   // ... existing code ...
   const runPromptApi = (api) => {
     const [promptId, docId, profileId] = api.split("__");
     const runId = generateUUID();
+    const timeoutKey = `${promptId}__${docId}__${profileId}`;
     // ... body and requestOptions ...
     makeApiRequest(requestOptions)
       .then(() => {
-        setTimeout(() => {
+        const timeoutId = setTimeout(() => {
+          timeoutRefs.current.delete(timeoutKey);
           const statusKey = generateApiRunStatusId(docId, profileId);
           // ... rest of timeout logic
         }, SOCKET_TIMEOUT_MS);
+        timeoutRefs.current.set(timeoutKey, timeoutId);
       })
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/src/hooks/usePromptRun.js` around lines 51 - 67, The timeout created after makeApiRequest in the hook is not tracked or cleared, risking state updates after unmount; modify the hook to store the timeout ID (e.g., in a ref or local variable tied to the hook) when calling setTimeout (the block that uses generateApiRunStatusId, usePromptRunStatusStore, removePromptStatus, and setAlertDetails with SOCKET_TIMEOUT_MS) and ensure you clearTimeout on cleanup/unmount and/or before scheduling a new timeout (also clear it when the API request or socket result arrives) so the callback cannot run against an unmounted component or stale closures.
workers/executor/tasks.py-19-27 (1)
19-27: ⚠️ Potential issue | 🟠 Major
Avoid blanket autoretry for executor operations.
This retries the entire task, not just a transport call. A timeout after an indexing/LLM/vector-store request has already been accepted will rerun the operation and can double-write or double-bill unless every executor path is idempotent end-to-end. Prefer retries inside the specific client calls, or carry an idempotency key/checkpoint through the executor flow before enabling task-level retries.
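The idempotency-key alternative can be sketched as follows; the in-memory store, key format, and function names are illustrative only (a real worker would persist the checkpoint in shared storage such as Redis or the database):

```python
# Illustrative only: an in-memory checkpoint store keyed by idempotency key.
_completed: dict[str, dict] = {}


def execute_with_idempotency(idempotency_key: str, operation) -> dict:
    """Run a side-effecting operation at most once per idempotency key."""
    if idempotency_key in _completed:
        # A retried task reuses the checkpointed result instead of
        # re-running an already-accepted indexing/LLM call.
        return _completed[idempotency_key]
    result = operation()
    _completed[idempotency_key] = result
    return result
```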
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/executor/tasks.py` around lines 19 - 27, The `@shared_task` decorator on TaskName.EXECUTE_EXTRACTION currently applies blanket autoretry for the whole executor task which can cause double-writes/double-billing; change this by removing or disabling task-level autoretry for the execute_extraction task and instead implement targeted retries inside the specific client calls (e.g., indexing, LLM, vector-store client functions) or propagate an idempotency key/checkpoint through the executor flow so retries are safe end-to-end; locate the decorator annotation used on the execute_extraction task and update the retry strategy accordingly, adding per-client retry logic around transport calls or adding an idempotency token passed into execute_extraction and checked before performing side-effecting operations.
workers/executor/executors/retrieval.py-71-87 (1)
71-87: ⚠️ Potential issue | 🟠 Major
Preserve retrieval rank instead of coercing unordered chunks to a list.
The new retrievers currently expose `set[str]`, so `return list(context)` makes output order hash-dependent rather than relevance-dependent. If callers concatenate chunks in returned order, answer quality becomes nondeterministic. Please move the contract to an ordered sequence and dedupe without dropping rank.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/executor/executors/retrieval.py` around lines 71 - 87, The current code does list(context) which converts a set to an unordered list and breaks relevance ranking; instead, treat the retriever result as an ordered sequence and preserve rank while removing duplicates: iterate over the returned context from retriever.retrieve() in order (do not call list(context) directly), build a new list by appending items only if not seen before (use a local seen set keyed by chunk id or the chunk itself) and return that deduped list; keep the existing metrics/logging (context_retrieval_metrics, prompt_key, doc_id, retrieval_type, top_k) unchanged and apply deduping before len(context) and the returned value.
workers/executor/executors/retrievers/router.py-37-59 (1)
37-59: ⚠️ Potential issue | 🟠 Major
`keyword_search` is still the same semantic engine.
This helper builds the same query-engine path as the base tool and only changes `similarity_top_k`. The router metadata promises exact-term matching, but the implementation never switches retrieval strategy, so exact-match queries can be routed to a tool that does not actually provide keyword behavior.
🔧 Minimal fallback if a real keyword backend is not ready yet:
```diff
-name="keyword_search",
+name="expanded_vector_search",
 description=(
-    "Best for finding specific terms, names, numbers, dates, "
-    "or exact phrases. Use when looking for precise matches."
+    "Broader semantic search with more candidates."
 ),
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/executor/executors/retrievers/router.py` around lines 37 - 59, The current _add_keyword_search_tool creates a semantic engine (vector_store_index.as_query_engine) and only changes similarity_top_k so it does not perform true exact-term matching; update _add_keyword_search_tool to construct a keyword/term-based retriever instead of the default semantic retriever: try to obtain a keyword retriever from vector_store_index (e.g., a method like as_retriever(search_type="keyword") or a retriever configured for exact matching) and pass that retriever into vector_store_index.as_query_engine (or build a QueryEngine that uses that retriever) before creating the QueryEngineTool with metadata name="keyword_search"; if the vector store has no native keyword retriever, fall back to a simple filter/exact-match pass-through (or a low-level text/term scan) and ensure exceptions are caught and logged from this new retrieval creation path.
workers/executor/executors/variable_replacement.py-31-35 (1)
31-35: ⚠️ Potential issue | 🟠 Major
Don't treat valid falsey values as “missing.”
Both branches skip replacement on `if not output_value`, so `0`, `False`, empty strings, and empty collections leave unresolved template tokens in the prompt even when the key exists.
Proposed fix:
```diff
-        if not output_value:
+        if output_value is None:
             return prompt
@@
-        if not output_value:
+        if output_value is None:
             return prompt
```
Also applies to: 90-94
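A quick illustration of the difference the `None` check makes; `replace_variable` is a hypothetical stand-in for the real helper:

```python
def replace_variable(prompt: str, token: str, output_value) -> str:
    # Only a genuinely missing value should skip replacement;
    # 0, False, "" and [] are valid outputs a prompt may reference.
    if output_value is None:
        return prompt
    return prompt.replace(token, str(output_value))
```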
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/executor/executors/variable_replacement.py` around lines 31 - 35, The code treats any falsy output_value as missing, causing valid values like 0, False, "" or [] to skip replacement; update the checks in variable_replacement.py (where VariableReplacementHelper.check_static_variable_run_status is called) to only treat None as missing (e.g., change "if not output_value" to "if output_value is None"), and apply the same change to the other identical branch later in the file so only None (or an explicit sentinel) prevents replacement.
workers/executor/executors/postprocessor.py-34-53 (1)
34-53: ⚠️ Potential issue | 🟠 Major
Validate the webhook response shape before inspecting it.
`response.json()` can return any JSON value, not just an object. If the webhook answers with a scalar like `true` or `42`, this membership check raises instead of falling back to `parsed_data`, which turns a bad webhook payload into a hard failure.
Proposed fix:
```diff
 def _process_successful_response(
-    response_data: dict, parsed_data: dict, highlight_data: list | None
+    response_data: Any, parsed_data: dict, highlight_data: list | None
 ) -> tuple[dict[str, Any], list | None]:
     """Process successful webhook response."""
+    if not isinstance(response_data, dict):
+        logger.warning("Ignoring postprocessing due to invalid webhook response type")
+        return parsed_data, highlight_data
+
     if "structured_output" not in response_data:
         logger.warning("Response missing 'structured_output' key")
         return parsed_data, highlight_data
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/executor/executors/postprocessor.py` around lines 34 - 53, The function _process_successful_response assumes response_data is a dict and does "if 'structured_output' not in response_data", which will raise when response_data is a non-object JSON (e.g., a scalar); guard against that by first checking isinstance(response_data, dict) (or dict-like) and return parsed_data, highlight_data if it's not a dict, then proceed with existing logic (use response_data.get("structured_output")/response_data.get("highlight_data") and keep calling _validate_structured_output and _validate_highlight_data as before).
workers/file_processing/structure_tool_task.py-645-651 (1)
645-651: ⚠️ Potential issue | 🟠 Major
Don't reset `METADATA.json` after a failed read.
If `fs.read()` or `json.loads()` fails here, the code falls back to `{}` and then rewrites the file, which drops prior `tool_metadata` and `total_elapsed_time`. Log the read error and abort this update instead of silently overwriting accumulated metadata.
Proposed fix:
```diff
 if fs.exists(metadata_path):
     try:
         existing_raw = fs.read(path=metadata_path, mode="r")
         if existing_raw:
             existing = json.loads(existing_raw)
-    except Exception:
-        pass
+    except Exception as e:
+        logger.warning("Failed to read existing METADATA.json: %s", e)
+        return
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/file_processing/structure_tool_task.py` around lines 645 - 651, When reading METADATA.json (metadata_path) you currently swallow exceptions from fs.read() and json.loads(), then fall back to an empty dict and overwrite accumulated fields; instead catch the exception, log the error (including the exception details) and abort this update so you do not reset prior tool_metadata or total_elapsed_time. Specifically, in the block around fs.read(path=metadata_path, mode="r") and json.loads(existing_raw) (the variables existing_raw and existing), replace the bare except: pass with logging the exception (e.g., logger.exception or process_logger.error with the exception) and return/raise to skip rewriting METADATA.json.
workers/executor/executors/variable_replacement.py-65-69 (1)
65-69: ⚠️ Potential issue | 🟠 Major
Catch `TypeError` in the JSON-to-string fallback.
`json.dumps()` raises `TypeError` for non-serializable objects. Right now values like `datetime`, `Decimal`, or SDK objects will escape this helper and abort prompt rendering instead of falling back to `str()`.
Proposed fix:
```diff
 def handle_json_and_str_types(value: Any) -> str:
     try:
         formatted_value = json.dumps(value)
-    except ValueError:
+    except (TypeError, ValueError):
         formatted_value = str(value)
     return formatted_value
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/executor/executors/variable_replacement.py` around lines 65 - 69, The helper handle_json_and_str_types currently only catches ValueError from json.dumps, but json.dumps raises TypeError for non-serializable objects (e.g., datetime, Decimal, SDK objects), causing failures; update the except clause to catch TypeError as well (or catch both ValueError and TypeError) and then fall back to str(value) so non-JSON-serializable inputs are safely converted for prompt rendering.
workers/file_processing/structure_tool_task.py-270-287 (1)
270-287: ⚠️ Potential issue | 🟠 Major
Preserve exported tool defaults when no workflow override is provided.
These `get(..., False)` calls overwrite the fetched tool’s own settings with `False`, so summarize/highlight/single-pass/challenge can be disabled just because the instance payload omitted the key. The agentic path already falls back to exported metadata; the regular path should do the same.
Proposed fix:
```diff
 # ---- Extract settings from tool_metadata ----
 settings = tool_instance_metadata
-is_challenge_enabled = settings.get(_SK.ENABLE_CHALLENGE, False)
-is_summarization_enabled = settings.get(_SK.SUMMARIZE_AS_SOURCE, False)
-is_single_pass_enabled = settings.get(_SK.SINGLE_PASS_EXTRACTION_MODE, False)
-challenge_llm = settings.get(_SK.CHALLENGE_LLM_ADAPTER_ID, "")
-is_highlight_enabled = settings.get(_SK.ENABLE_HIGHLIGHT, False)
-is_word_confidence_enabled = settings.get(_SK.ENABLE_WORD_CONFIDENCE, False)
+tool_id = tool_metadata[_SK.TOOL_ID]
+tool_settings = tool_metadata[_SK.TOOL_SETTINGS]
+outputs = tool_metadata[_SK.OUTPUTS]
+is_challenge_enabled = settings.get(
+    _SK.ENABLE_CHALLENGE, tool_settings.get(_SK.ENABLE_CHALLENGE, False)
+)
+is_summarization_enabled = settings.get(
+    _SK.SUMMARIZE_AS_SOURCE, tool_settings.get(_SK.SUMMARIZE_AS_SOURCE, False)
+)
+is_single_pass_enabled = settings.get(
+    _SK.SINGLE_PASS_EXTRACTION_MODE,
+    tool_settings.get(_SK.ENABLE_SINGLE_PASS_EXTRACTION, False),
+)
+challenge_llm = settings.get(
+    _SK.CHALLENGE_LLM_ADAPTER_ID, tool_settings.get(_SK.CHALLENGE_LLM, "")
+)
+is_highlight_enabled = settings.get(
+    _SK.ENABLE_HIGHLIGHT, tool_settings.get(_SK.ENABLE_HIGHLIGHT, False)
+)
+is_word_confidence_enabled = settings.get(
+    _SK.ENABLE_WORD_CONFIDENCE,
+    tool_settings.get(_SK.ENABLE_WORD_CONFIDENCE, False),
+)
@@
-tool_id = tool_metadata[_SK.TOOL_ID]
-tool_settings = tool_metadata[_SK.TOOL_SETTINGS]
-outputs = tool_metadata[_SK.OUTPUTS]
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/file_processing/structure_tool_task.py` around lines 270 - 287, The instance-level settings (variable settings = tool_instance_metadata) currently use settings.get(..., False) which unintentionally overrides exported tool defaults; update the lookups for is_challenge_enabled, is_summarization_enabled, is_single_pass_enabled, challenge_llm, is_highlight_enabled, and is_word_confidence_enabled to fall back to the exported tool defaults from tool_settings (tool_metadata[_SK.TOOL_SETTINGS]) when the key is missing, e.g. use settings.get(KEY, tool_settings.get(KEY)) or similar so the tool’s exported defaults are preserved when the instance payload omits a key; keep using the same _SK key symbols (e.g., _SK.ENABLE_CHALLENGE, _SK.SUMMARIZE_AS_SOURCE, _SK.SINGLE_PASS_EXTRACTION_MODE, _SK.CHALLENGE_LLM_ADAPTER_ID, _SK.ENABLE_HIGHLIGHT, _SK.ENABLE_WORD_CONFIDENCE) to locate and change the lookups.
frontend/src/components/custom-tools/prompt-card/PromptCard.jsx-75-79 (1)
75-79: ⚠️ Potential issue | 🟠 Major

Don't bind per-card progress to tool-global socket messages.
`details?.tool_id` is shared by every prompt card in the tool, so this predicate lets any tool-scoped INFO/ERROR message leak into every card's `progressMsg`. During concurrent runs, one prompt can end up showing another prompt's progress or error state.

Suggested narrowing
```diff
 .find(
   (item) =>
     (item?.component?.prompt_id === promptDetailsState?.prompt_id ||
-      item?.component?.prompt_key === promptKey ||
-      item?.component?.tool_id === details?.tool_id) &&
+      item?.component?.prompt_key === promptKey ||
+      (!item?.component?.prompt_id &&
+        !item?.component?.prompt_key &&
+        item?.component?.tool_id === details?.tool_id)) &&
     (item?.level === "INFO" || item?.level === "ERROR"),
 );
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/src/components/custom-tools/prompt-card/PromptCard.jsx` around lines 75 - 79, the predicate in PromptCard.jsx is too broad because checking details?.tool_id allows tool-global INFO/ERROR socket messages to apply to every card; update the filter used for progressMsg so it only accepts messages explicitly targeted to this prompt instance by requiring a match on a unique prompt identifier (e.g., item?.component?.prompt_id === promptDetailsState?.prompt_id or item?.component?.prompt_key === promptKey) and not just details?.tool_id, or if messages include a run/instance id use that (e.g., item?.component?.run_id or instance_id) combined with prompt_id/prompt_key to correlate; change the predicate that references item?.component?.prompt_id, promptDetailsState?.prompt_id, promptKey and details?.tool_id so it narrows scope to the specific prompt instance rather than any message with the same tool_id.

workers/executor/executors/dto.py-26-31 (1)
26-31: ⚠️ Potential issue | 🟠 Major

Validate the full chunking invariant here.
Only rejecting `chunk_size == 0` still allows negative sizes and `chunk_overlap >= chunk_size`; that produces a zero/negative stride for chunkers and can break or hang indexing downstream.

Suggested fix

```diff
 def __post_init__(self) -> None:
-    if self.chunk_size == 0:
-        raise ValueError(
-            "Indexing cannot be done for zero chunks."
-            "Please provide a valid chunk_size."
-        )
+    if self.chunk_size <= 0:
+        raise ValueError("chunk_size must be greater than 0")
+    if self.chunk_overlap < 0:
+        raise ValueError("chunk_overlap cannot be negative")
+    if self.chunk_overlap >= self.chunk_size:
+        raise ValueError(
+            "chunk_overlap must be smaller than chunk_size"
+        )
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/executor/executors/dto.py` around lines 26 - 31, in __post_init__ validate the full chunking invariant: ensure chunk_size is > 0, chunk_overlap is >= 0, and chunk_overlap < chunk_size (so stride = chunk_size - chunk_overlap is positive); if any check fails raise a ValueError with a clear message mentioning chunk_size and chunk_overlap; update the validation logic in the __post_init__ method (which currently only checks chunk_size == 0) to perform these three checks.

workers/tests/test_legacy_executor_scaffold.py-252-268 (1)
252-268: ⚠️ Potential issue | 🟠 Major

This Flask check is session-global, not module-specific.
Scanning all `flask*` entries in `sys.modules` makes the test pass or fail based on unrelated imports from the rest of the test run, not on what `executor.executors.exceptions` did. Please assert on imports triggered by this module itself instead of inspecting global interpreter state afterward.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/tests/test_legacy_executor_scaffold.py` around lines 252 - 268, the test_no_flask_import currently inspects global sys.modules, which is session-global; change it to check only imports caused by importing executor.executors.exceptions by recording sys.modules before and after the import inside the test_no_flask_import function and asserting that no new keys starting with "flask" were added; specifically, capture a snapshot (e.g., pre_modules = set(sys.modules)), import or reload the module under test (executor.executors.exceptions), compute added = set(sys.modules) - pre_modules, and then assert that no name in added startswith "flask" to ensure the module itself didn't pull in Flask.

backend/prompt_studio/prompt_studio_core_v2/views.py-389-423 (1)
389-423: ⚠️ Potential issue | 🟠 Major

Keep the async rollout behind `async_prompt_execution`.

These branches now always dispatch to Celery and return 202 Accepted. That removes the synchronous fallback instead of guarding it behind the rollout flag, so backend behavior changes even when the async feature is supposed to be off.

Also applies to: 464-504, 561-595
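The intended guard can be sketched as a plain function; `execute_prompt`, `run_sync`, and `dispatch_with_callback` here are stand-ins for the real view and dispatcher APIs, not the actual code:

```python
import uuid


def execute_prompt(flags, dispatcher, run_sync):
    """Gate the Celery hand-off behind the rollout flag."""
    if flags.get("async_prompt_execution", False):
        # Flag on: fire-and-forget dispatch; the client waits for a
        # socket event (or polls task_status) using the returned id.
        executor_task_id = str(uuid.uuid4())
        dispatcher.dispatch_with_callback(task_id=executor_task_id)
        return {"status": 202, "task_id": executor_task_id}
    # Flag off: preserve the original synchronous behaviour.
    return {"status": 200, "result": run_sync()}
```

With this shape, turning the flag off restores the pre-PR contract exactly, which is what a staged rollout needs.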
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/prompt_studio/prompt_studio_core_v2/views.py` around lines 389 - 423, the current code in PromptStudio views (around the dispatch block that calls PromptStudioHelper._get_dispatcher() and dispatch_with_callback, currently pre-generating executor_task_id and returning HTTP_202_ACCEPTED) always uses Celery; wrap this async dispatch and the 202 response behind the async rollout flag (async_prompt_execution) and preserve the synchronous fallback when the flag is false. Concretely, in the view(s) that build context via PromptStudioHelper.build_index_payload, fetch the rollout flag (async_prompt_execution), and only call PromptStudioHelper._get_dispatcher(), generate executor_task_id, call dispatcher.dispatch_with_callback(...) and return Response(..., status=HTTP_202_ACCEPTED) when the flag is true; otherwise keep the existing synchronous execution path (no dispatcher, no task id) and return the original synchronous result. Apply the same guard to the other similar dispatch blocks referenced (lines around 464-504 and 561-595) so all async Celery dispatches are behind async_prompt_execution.

backend/prompt_studio/prompt_studio_core_v2/views.py-452-453 (1)
452-453: ⚠️ Potential issue | 🟠 Major

Scope prompt/document lookups to the current tool.
These raw `objects.get(pk=...)` calls let callers mix IDs across projects. A user who can access one tool can submit a prompt or document UUID from another tool and have this endpoint operate on it. Resolve both through the current tool and return 404 when they do not belong to it.

Also applies to: 461-462, 536-537
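A minimal sketch of the scoped-lookup behaviour, using an in-memory list in place of the Django ORM (in the real view this would be `get_object_or_404(ToolStudioPrompt, pk=prompt_id, tool=current_tool)`); `NotFound` and `get_prompt_for_tool` are hypothetical names:

```python
class NotFound(Exception):
    """Stands in for Django's Http404."""


def get_prompt_for_tool(prompts, prompt_id, tool_id):
    """Resolve a prompt only when it belongs to the given tool."""
    for prompt in prompts:
        if prompt["pk"] == prompt_id and prompt["tool_id"] == tool_id:
            return prompt
    # A valid ID from another tool must behave exactly like a bad ID,
    # so callers cannot probe for objects they shouldn't see.
    raise NotFound(f"Prompt {prompt_id} not found in tool {tool_id}")
```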
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/prompt_studio/prompt_studio_core_v2/views.py` around lines 452 - 453, scope all prompt and document lookups to the current tool: replace raw ToolStudioPrompt.objects.get(pk=prompt_id) (and the similar lookups at the other occurrences) with a scoped lookup that includes the current tool, e.g. use get_object_or_404(ToolStudioPrompt, pk=prompt_id, tool=current_tool) or ToolStudioPrompt.objects.get(pk=prompt_id, tool=current_tool) wrapped to raise Http404; do the same for the document model lookups (e.g. ToolDocument or the document class used at the other locations) so requests for IDs from other tools return 404.

backend/api_v2/deployment_helper.py-279-284 (1)
279-284: ⚠️ Potential issue | 🟠 Major

`include_metrics=True` currently keeps full inner metadata.

Both branches only call `remove_inner_result_metadata()` when both flags are false. After usage enrichment, the `include_metrics and not include_metadata` path still returns the entire `result.metadata` object, not just usage/metrics, which broadens the response contract unexpectedly.

Also applies to: 483-488
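A sketch of the intended branching, assuming a toy `Result` object whose method names mirror the review's description (the real response class's API may differ):

```python
class Result:
    """Toy stand-in for the deployment result object."""

    def __init__(self):
        self.metadata = {"usage": {"tokens": 10}, "internal": {"trace": "..."}}
        self.metrics = {"latency_ms": 120}

    def remove_inner_result_metadata(self):
        # Keep only usage; drop internal bookkeeping.
        self.metadata = {"usage": self.metadata.get("usage", {})}

    def remove_result_metrics(self):
        self.metrics = {}


def shape_response(result, include_metadata, include_metrics):
    if not include_metadata:
        # Strip inner metadata even when metrics were requested --
        # the two flags must be evaluated independently.
        result.remove_inner_result_metadata()
    if not include_metrics:
        result.remove_result_metrics()
    return result
```

Decoupling the two `if` statements is the whole fix: each flag controls exactly one slice of the payload.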
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/api_v2/deployment_helper.py` around lines 279 - 284, the current logic enriches usage metadata via cls._enrich_result_with_usage_metadata(result) but only calls result.remove_inner_result_metadata() when both include_metadata and include_metrics are false, leaving full inner metadata when include_metrics=True and include_metadata=False; change the branching so that after enrichment you unconditionally call result.remove_inner_result_metadata() whenever include_metadata is False (i.e., if not include_metadata: result.remove_inner_result_metadata()), and still call result.remove_result_metrics() when not include_metrics; apply this fix to both occurrences around the cls._enrich_result_with_usage_metadata/result.remove_inner_result_metadata blocks (the block shown and the similar block at lines 483-488) so metrics-only responses no longer contain full inner metadata.

backend/prompt_studio/prompt_studio_core_v2/test_tasks.py-289-336 (1)
289-336: ⚠️ Potential issue | 🟠 Major

These tests are pinned to deleted implementation details.
`views.py` in this PR no longer calls `run_*.apply_async` or reads `StateStore` directly; it builds an `ExecutionContext` and dispatches through the new dispatcher. The Phase 8 assertions will fail against the current code, and the Phase 9 cases only grep source, so they do not verify `task_status()` at runtime.

Also applies to: 361-394
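A behavioural alternative to source-grepping can be sketched with `unittest.mock`; `index_document` below is a stand-in for the view body, not the real Django view:

```python
from unittest import mock


def index_document(get_dispatcher, context):
    """Stand-in for the view body: build context, dispatch, return 202."""
    dispatcher = get_dispatcher()
    dispatcher.dispatch_with_callback(context=context)
    return 202


def test_index_document_dispatches():
    # Mock the dispatcher factory and assert on observable behaviour:
    # the view dispatched once with the context it built.
    dispatcher = mock.Mock()
    status = index_document(lambda: dispatcher, context={"operation": "index"})
    assert status == 202
    dispatcher.dispatch_with_callback.assert_called_once_with(
        context={"operation": "index"}
    )
```

A test shaped like this fails when the dispatch contract changes, which is exactly what the grep-based Phase 9 cases cannot do.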
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/prompt_studio/prompt_studio_core_v2/views.py` around lines 597 - 629, the task_status action skips resolving the CustomTool and thus bypasses object-level permission checks; fix it by calling self.get_object() at the start of task_status to enforce IsOwnerOrSharedUserOrSharedToOrg, then after obtaining the AsyncResult (using AsyncResult and get_worker_celery_app()) verify that the Celery task actually belongs to that tool/run (e.g., compare the tool's PK to an identifier stored on the task result/meta/payload) and return HTTP 403 if it does not match before returning task status/result.

unstract/sdk1/src/unstract/sdk1/execution/dispatcher.py-133-164 (1)
597-629: ⚠️ Potential issue | 🟠 Major

Validate task ownership before returning Celery results.
This detail action never resolves the `CustomTool`, so any object-level permission logic on `IsOwnerOrSharedUserOrSharedToOrg` is skipped, and `pk` is not tied to `task_id` at all. A caller who learns another task ID can query it through any tool they can access unless you check that the task belongs to this tool/run first.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@unstract/sdk1/src/unstract/sdk1/execution/dispatcher.py` around lines 133 - 164, the dispatch() method currently only wraps async_result.get(...) in try/except, leaving self._app.send_task(...) and ExecutionResult.from_dict(...) able to raise; update dispatch() to make the entire send/get/decode sequence failure-safe by expanding the try block (or adding an outer try) to include the call to self._app.send_task(...) and the call to ExecutionResult.from_dict(...), catch Exception as exc, log the error with context.executor_name, context.operation, context.run_id and the exception, and return ExecutionResult.failure(error=f"{type(exc).__name__}: {exc}") so broker send failures and malformed worker payloads return a failure result instead of raising.

backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py-390-393 (1)
133-164: ⚠️ Potential issue | 🟠 Major

Keep `dispatch()` failure-safe around broker send and result decoding.

`send_task()` and `ExecutionResult.from_dict()` are outside the `try`, so a broker outage or malformed worker payload will raise instead of returning `ExecutionResult.failure(...)`. That breaks the public contract of this synchronous API.

Suggested fix
```diff
-    async_result = self._app.send_task(
-        _TASK_NAME,
-        args=[context.to_dict()],
-        queue=queue,
-    )
-    logger.info(
-        "Task sent: celery_task_id=%s, waiting for result...",
-        async_result.id,
-    )
     try:
+        async_result = self._app.send_task(
+            _TASK_NAME,
+            args=[context.to_dict()],
+            queue=queue,
+        )
+        logger.info(
+            "Task sent: celery_task_id=%s, waiting for result...",
+            async_result.id,
+        )
         # disable_sync_subtasks=False: safe because the executor task
         # runs on a separate worker pool (worker-v2) — no deadlock
         # risk even when dispatch() is called from inside a Django
         # Celery task.
         result_dict = async_result.get(
             timeout=timeout,
             disable_sync_subtasks=False,
         )
+        return ExecutionResult.from_dict(result_dict)
     except Exception as exc:
         logger.error(
             "Dispatch failed: executor=%s operation=%s run_id=%s error=%s",
             context.executor_name,
             context.operation,
             context.run_id,
             exc,
         )
         return ExecutionResult.failure(
             error=f"{type(exc).__name__}: {exc}",
         )
-
-    return ExecutionResult.from_dict(result_dict)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py` around lines 390 - 393, build_index_payload currently performs a side-effect by calling DocumentIndexingService.set_document_indexing(org_id=org_id, user_id=user_id, doc_id_key=doc_id_key) before returning the ExecutionContext; remove that call from build_index_payload so the function is side-effect free and only constructs/returns the payload/ExecutionContext, then ensure the caller (the code that publishes the indexing task) invokes DocumentIndexingService.set_document_indexing only after the task publish/queueing succeeds (use the same org_id, user_id, doc_id_key parameters), so a document is marked indexing only when a worker will actually receive the job.

backend/prompt_studio/prompt_studio_core_v2/tasks.py-142-144 (1)
390-393: ⚠️ Potential issue | 🟠 Major

Keep `build_index_payload()` side-effect free.

This helper marks the document as indexing before it even returns the `ExecutionContext`. If anything fails between this return and the eventual task publish, no worker ever sees the job and the document is left stuck in the pending state.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/prompt_studio/prompt_studio_core_v2/tasks.py` around lines 142 - 144, the generic callback exception path doesn't clear the in-progress marker like the executor-failure branch, so ensure DocumentIndexingService.remove_document_indexing(org_id=org_id, user_id=user_id, doc_id_key=doc_id_key) is called on all error paths (including the generic exception handler that emits the websocket error and re-raises) and when post-success bookkeeping raises before mark_document_indexed(); move or add the remove_document_indexing call into a shared cleanup/finally block around mark_document_indexed()/post-success bookkeeping in the task function so both the executor-failure branch and the generic callback exception path (and the post-success error case) always clear the "being indexed" marker.

backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py-186-188 (1)
142-144: ⚠️ Potential issue | 🟠 Major

Mirror the indexing cleanup in the generic callback exception path.

Only the explicit executor-failure branch clears the in-progress marker. If post-success bookkeeping raises before `mark_document_indexed()` finishes, this path emits the websocket error and re-raises, but the document can remain stuck as "being indexed".

Also applies to: 174-183
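The shared-cleanup idea reduces to a `try`/`finally`; the `service` object and `finish_indexing` below are stand-ins for `DocumentIndexingService` and the real task function:

```python
def finish_indexing(service, mark_indexed, doc_id_key):
    """Clear the in-progress marker on every exit path."""
    try:
        mark_indexed()  # post-success bookkeeping may raise
    finally:
        # Runs on success AND on any exception, mirroring what the
        # executor-failure branch already does, so documents cannot
        # stay stuck as "being indexed".
        service.remove_document_indexing(doc_id_key=doc_id_key)
```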
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py` around lines 186 - 188, the early return when profile_manager_owner is None skips ownership checks; instead remove that early return and enforce ownership validation: if profile_manager_owner is None, try to derive owner from profile_manager.created_by, and if created_by is also missing treat the profile manager as orphaned and require that every adapter in profile_manager.adapters has shared_to_org == True (otherwise raise an authorization error). Update the logic in the ownership-checking function that references profile_manager_owner/profile_manager to perform this fallback and to deny access when neither owner nor created_by is present unless all adapters are shared_to_org.

backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py-572-577 (1)
186-188: ⚠️ Potential issue | 🟠 Major

Don't fail open when `created_by` is missing.

This early return skips every adapter ownership check. An orphaned or imported `ProfileManager` can then execute with adapters that are not `shared_to_org`, which effectively bypasses the permission gate this method is supposed to enforce.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py` around lines 572 - 577, the code changes extract_path to the summarize path before the summary file is created, causing subsequent reads/hashes to fail; before assigning extract_path to Path(.../"summarize"/(p.stem+".txt")) ensure the summary artifact is created/written first (e.g., call the existing summary-writing routine or write the summarized content to that target path using the current payload) and only then set profile_manager.chunk_size and replace extract_path; apply the same fix to the other similar blocks referenced in the diff (the other summarize branches around the other occurrences).

workers/executor/executors/legacy_executor.py-694-704 (1)
572-577: ⚠️ Potential issue | 🟠 Major

Create the summary artifact before switching the payload path.

Both builders rewrite the payload to `.../summarize/<stem>.txt`, but neither code path generates that file first. On a first-run document or after cache cleanup, the subsequent hash/read will fail because the summary artifact does not exist yet.

Also applies to: 666-667, 815-821
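A sketch of the create-before-switch ordering, with an illustrative `summarize` callable standing in for the real summarization step (the helper name and exact paths are assumptions):

```python
from pathlib import Path


def switch_to_summary(extract_path, summarize):
    """Write the summary artifact before repointing the payload at it."""
    summary_path = extract_path.parent / "summarize" / f"{extract_path.stem}.txt"
    if not summary_path.exists():
        # First run or cache cleanup: create the file before any
        # downstream hash/read can touch it.
        summary_path.parent.mkdir(parents=True, exist_ok=True)
        summary_path.write_text(summarize(extract_path.read_text()))
    return summary_path
```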
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/executor/executors/legacy_executor.py` around lines 694 - 704, the deduplication/indexing param_key is built using the global tool_settings instead of each output's own adapter settings, causing different outputs to be collapsed; update the code that constructs param_key (and the similar block later around the creation of index_ctx) to read vector-db, embedding, and x2text_adapter from the specific output's settings (e.g. output.get("tool_settings", {}) or the per-output metadata) rather than from the outer tool_settings, so param_key reflects chunk_size, chunk_overlap and the output-specific adapter tuple and the child index_ctx is created with the correct backend IDs.

workers/tests/test_sanity_phase4.py-168-351 (1)
694-704: ⚠️ Potential issue | 🟠 Major

Use each output's adapter tuple when deduplicating and indexing.
This code always reads `vector-db`, `embedding`, and `x2text_adapter` from `tool_settings`, even though those fields can vary per output. Mixed-output pipelines will currently collapse distinct adapter combinations onto one `param_key` and may build the child `index_ctx` with the wrong backend IDs.

Suggested fix
```diff
-    vector_db = tool_settings.get("vector-db", "")
-    embedding = tool_settings.get("embedding", "")
-    x2text = tool_settings.get("x2text_adapter", "")
+    vector_db = output.get("vector-db", tool_settings.get("vector-db", ""))
+    embedding = output.get("embedding", tool_settings.get("embedding", ""))
+    x2text = output.get(
+        "x2text_adapter", tool_settings.get("x2text_adapter", "")
+    )
```

Also applies to: 725-738
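The per-output deduplication can be sketched as a key-building helper; the dict keys follow the snippet above, while `index_param_keys` itself is a hypothetical name:

```python
def index_param_keys(outputs, tool_settings):
    """One index job per distinct adapter tuple, not per tool."""
    keys = set()
    for output in outputs:
        keys.add(
            (
                # Per-output value wins; tool_settings is only a fallback.
                output.get("vector-db", tool_settings.get("vector-db", "")),
                output.get("embedding", tool_settings.get("embedding", "")),
                output.get("x2text_adapter", tool_settings.get("x2text_adapter", "")),
                output.get("chunk-size", tool_settings.get("chunk-size")),
                output.get("chunk-overlap", tool_settings.get("chunk-overlap")),
            )
        )
    return keys
```

Building the key from the output first means two outputs that differ only in, say, embedding model get two index jobs instead of silently sharing one.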
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/tests/test_sanity_phase4.py` around lines 168 - 351, the tests currently replicate the exact payload shape instead of using the real builder, so update the factories (_make_ide_prompt, _ide_extract_ctx, _ide_index_ctx, _ide_answer_prompt_ctx, _ide_single_pass_ctx) to build their payloads via the production prompt studio helper (or a shared DTO/factory) rather than hardcoding literals; replace the direct dict constructions with calls to the real helper functions in prompt_studio_helper.py (or a new shared factory used by both prod and tests) and pass overrides through that helper so tests fail if the real contract changes.

workers/tests/test_sanity_phase4.py-810-829 (1)
168-351: ⚠️ Potential issue | 🟠 Major

These helpers mirror the payload contract instead of exercising it.

`_make_ide_prompt()` and the `_ide_*_ctx()` factories hardcode the same shape that `prompt_studio_helper.py` is supposed to produce, and `TestIDEPayloadKeyCompatibility` only re-asserts those local literals. If the real helper drifts, this module can still go green because both sides of the "contract" live here. Build the contexts through the production helper path or a shared factory/DTO so these tests actually fail on contract regressions.

Also applies to: 835-899
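One way to close the gap is a single factory that both production code and tests import; `build_ide_index_payload` and its fields are illustrative, not the real helper:

```python
def build_ide_index_payload(tool_id, doc_id, **overrides):
    """Single source of truth for the IDE index payload shape.

    Production builds its context through this function and tests do
    too, so any drift in the contract fails the suite instead of being
    re-asserted on both sides.
    """
    payload = {
        "operation": "index",
        "tool_id": tool_id,
        "doc_id": doc_id,
        "execution_source": "ide",
    }
    payload.update(overrides)
    return payload
```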
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/tests/test_sanity_phase4.py` around lines 810 - 829, the test currently only checks result.success and sets var_service.is_variables_present.return_value = False which prevents the variable-replacement branch; change the mock so the VariableReplacementService will run the replacement (set is_variables_present.return_value = True or otherwise allow the replacement to be invoked), then after calling _run_task(eager_app, ctx.to_dict()) assert that the variable-replacement method on var_service (e.g., replace_variables / perform_replacement) was called and that its call/kwargs include is_ide=True, using the existing mocks (var_service, _ide_answer_prompt_ctx, _run_task, ExecutionResult) to prove the IDE-specific path executed.

workers/tests/test_sanity_phase4.py-797-805 (1)
810-829: ⚠️ Potential issue | 🟠 Major

This test never proves the IDE-specific variable-replacement path ran.

`result.success` only shows the request completed. Also, if `is_variables_present()` gates replacement, returning `False` here bypasses the very branch the docstring says this test covers. Drive the IDE path and assert on the mocked variable-replacement call/kwargs that consume the IDE flag.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/tests/test_sanity_phase4.py` around lines 797 - 805, the test lacks an assertion that mock_fs.dump_json was not called for IDE execution_source; update the test_sanity_phase4.py test (the block invoking _run_task(eager_app, ctx.to_dict()) and creating result via ExecutionResult.from_dict) to assert that the filesystem mock's dump_json (mock_fs.dump_json or the specific mock variable used in the test) was not called, i.e., add an assertion like assert mock_fs.dump_json.call_count == 0 or assert not mock_fs.dump_json.called after verifying result.success, so _update_exec_metadata's no-write behavior in IDE mode is actually enforced.
797-805: ⚠️ Potential issue | 🟠 Major

Assert the no-write behavior you describe.

This test says IDE mode skips metadata writes, but there is no assertion on `mock_fs`. As written, it still passes if `_update_exec_metadata` calls `dump_json()` in IDE mode.

✅ Add the missing assertion
```diff
 result_dict = _run_task(eager_app, ctx.to_dict())
 result = ExecutionResult.from_dict(result_dict)
 assert result.success is True
-
-# For IDE source, _update_exec_metadata should NOT write
-# (it only writes for execution_source="tool")
-# This is verified by the fact that no dump_json was called
-# on the fs mock. In IDE mode, whisper_hash metadata is skipped.
+mock_fs.dump_json.assert_not_called()
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/tests/test_sanity_phase4.py` around lines 797 - 805, The test lacks an assertion that mock_fs.dump_json was not called for IDE execution_source; update the test_sanity_phase4.py test (the block invoking _run_task(eager_app, ctx.to_dict()) and creating result via ExecutionResult.from_dict) to assert that the filesystem mock's dump_json (mock_fs.dump_json or the specific mock variable used in the test) was not called—i.e., add an assertion like assert mock_fs.dump_json.call_count == 0 or assert not mock_fs.dump_json.called after verifying result.success, so _update_exec_metadata's no-write behavior in IDE mode is actually enforced.
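The same pattern in isolation, with a stand-in `update_exec_metadata` (the real `_update_exec_metadata` takes more arguments); `Mock.assert_not_called()` turns the no-write contract into an executable check:

```python
from unittest import mock


def update_exec_metadata(fs, execution_source, metadata):
    """Stand-in for _update_exec_metadata: only tool runs persist metadata."""
    if execution_source == "tool":
        fs.dump_json(metadata)


mock_fs = mock.Mock()
update_exec_metadata(mock_fs, execution_source="ide", metadata={"whisper_hash": "x"})
# Fails loudly if IDE mode ever starts writing.
mock_fs.dump_json.assert_not_called()

update_exec_metadata(mock_fs, execution_source="tool", metadata={"whisper_hash": "x"})
mock_fs.dump_json.assert_called_once()
```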
```python
from unstract.sdk1.file_storage import FileStorage, FileStorageProvider
from unstract.sdk1.platform import PlatformHelper
from unstract.sdk1.tool.base import BaseTool
from unstract.sdk1.tool.stream import StreamMixin
```
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Description: Verify StreamMixin interface and callers of generate_index_key
echo "=== 1. Check StreamMixin class definition ==="
ast-grep --pattern $'class StreamMixin$_:
$$$
'
echo ""
echo "=== 2. Check PlatformHelper.get_adapter_config signature and usage of first param ==="
ast-grep --pattern $'def get_adapter_config($_, $_):
$$$
'
echo ""
echo "=== 3. Find all callers of generate_index_key ==="
rg -n -B2 -A5 'generate_index_key\(' --type py
echo ""
echo "=== 4. Check if BaseTool inherits from or includes StreamMixin ==="
rg -n 'class BaseTool' --type py -A10
🏁 Script executed:
head -25 unstract/sdk1/src/unstract/sdk1/utils/indexing.py
🏁 Script executed:
cat unstract/sdk1/src/unstract/sdk1/utils/indexing.py
Critical breaking change: tool parameter is now required but no callers pass it.
The tool: StreamMixin parameter (line 18) is required without a default value, and the function uses it in three places (lines 50-52). However, all ~15 existing callers in the codebase do not pass this parameter, which will cause TypeError: missing 1 required positional argument: 'tool' at runtime.
Either:
- All callers must be updated to pass a `StreamMixin`-compatible object (e.g., a tool instance), or
- The parameter should have a default value if it's optional.
Note: The type narrowing from BaseTool to StreamMixin itself is not an issue since BaseTool already extends StreamMixin.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@unstract/sdk1/src/unstract/sdk1/utils/indexing.py` at line 5, The new
required parameter tool: StreamMixin causes callers to break; change the
function signature to make tool optional (tool: Optional[StreamMixin] = None)
and update all uses where tool is invoked (the three places that reference tool
around lines 50–52) to guard against None (e.g., if tool is not None:
tool.method(...) or use a local no-op fallback) so existing callers work without
passing a tool; ensure imports include Optional and keep the StreamMixin type
for clarity.
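The backwards-compatible option can be sketched as follows; the body is illustrative (the real `generate_index_key` computes its key differently), only the optional-parameter shape matters:

```python
import hashlib
import json
from typing import Optional


def generate_index_key(payload: dict, tool: Optional[object] = None) -> str:
    """Backwards-compatible signature: existing callers pass no tool."""
    if tool is not None:
        # Guard every use so callers that pass None (or nothing)
        # simply skip the streaming side channel.
        tool.stream_log("Generating index key")
    raw = json.dumps(payload, sort_keys=True)
    return hashlib.sha256(raw.encode()).hexdigest()
```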
```python
index_metrics = self._run_pipeline_index(
    context=context,
    index_template=index_template,
    answer_params=answer_params,
    extracted_text=extracted_text,
)
```
Abort structure_pipeline when the index step fails.
_run_pipeline_index() currently degrades a failed _handle_index() to a warning, and _handle_structure_pipeline() only gets a metrics dict back. The pipeline then continues into _handle_answer_prompt() against an index that was never created, which hides the real failure behind secondary prompt errors or empty output.
Also applies to: 740-746
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@workers/executor/executors/legacy_executor.py` around lines 493 - 498, The
index step failure is being downgraded to a warning in
_run_pipeline_index/_handle_index, causing _handle_structure_pipeline to proceed
with only a metrics dict and then call _handle_answer_prompt against a
non-existent index; update _run_pipeline_index (and underlying _handle_index) to
return a clear failure signal (raise a specific exception or return None/False)
and modify _handle_structure_pipeline to check that signal and abort/raise
immediately instead of continuing to _handle_answer_prompt; make the same change
for the other pipeline call site (the block that mirrors this logic around the
second call) so any index creation failure short-circuits the pipeline and
surfaces the original error.
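The short-circuit can be sketched with a sentinel-checking wrapper; `IndexingError` and `run_structure_pipeline` are stand-ins for the real methods:

```python
class IndexingError(RuntimeError):
    """Raised when the index step fails so the pipeline stops early."""


def run_structure_pipeline(index_step, answer_step):
    index_metrics = index_step()
    if index_metrics is None:
        # Surface the original failure instead of letting the answer
        # stage run against an index that was never created.
        raise IndexingError("index step failed; aborting pipeline")
    return answer_step(index_metrics)
```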
```python
all_retriever = vector_store_index.as_retriever(
    similarity_top_k=1000,  # Get all nodes
    filters=MetadataFilters(
        filters=[
            ExactMatchFilter(key="doc_id", value=self.doc_id),
        ],
    ),
)

# Retrieve all nodes to build keyword index
all_nodes = all_retriever.retrieve(" ")
```
🧩 Analysis chain
🏁 Script executed:
# First, let's see the full context of the file
wc -l workers/executor/executors/retrievers/keyword_table.py
🏁 Script executed:
# Read the relevant section with context
cat -n workers/executor/executors/retrievers/keyword_table.py | head -60
🏁 Script executed:
# Search for other patterns of node retrieval or docstore usage in the codebase
rg "docstore|doc_store" --type py -A 3 -B 3
🏁 Script executed:
# Check for imports and understand what vector_store_index is
rg "vector_store_index|VectorStoreIndex" workers/executor/executors/retrievers/ -B 2 -A 2
🌐 Web query:
LlamaIndex VectorStoreIndex as_retriever retrieve with MetadataFilters top-k behavior
💡 Result:
In VectorStoreIndex.as_retriever(...), the “top‑k” you set (via similarity_top_k=) is passed into the retriever and then into the underlying VectorStoreQuery together with your MetadataFilters (filters=). The vector store is asked to return up to similarity_top_k results subject to those filters. [1], [2]
What that means in practice
- Filters constrain the candidate set; top-k is then taken within that constrained set. For example, `SimpleVectorStore` explicitly prefilters node IDs by metadata (`build_metadata_filter_fn(...)`) and only then computes similarities and selects `get_top_k_*` with `similarity_top_k=query.similarity_top_k`. So top-k is effectively "top-k among the filtered nodes." If fewer than k nodes match the filter, you'll get fewer than k results. [3]
- Some vector stores over-fetch internally (especially for MMR / ANN) but still target returning k final results. E.g., the Chroma integration calculates a `prefetch_k` (at least `similarity_top_k`, often larger) to get enough candidates for MMR-style selection. This affects internal candidate retrieval, not the meaning of `similarity_top_k` as the requested result count. [4]
Why you might see “less than k” even if your index is large
- Your metadata filter matches < k nodes (common / expected). [3]
- Your backing vector DB's filtered-ANN implementation may need overfetch to reliably find k filtered neighbors; in those cases, look for a store-specific "prefetch/overfetch/num_candidates" knob (like Chroma's `prefetch_k`) or increase the relevant vector-store kwargs. [4]
Sources:
[1] Vector retriever API / retriever construction showing filters + similarity_top_k wiring (LlamaIndex docs) (docs.llamaindex.ai)
[2] VectorIndexRetriever._build_vector_store_query(...) includes both similarity_top_k and filters (LlamaIndex docs) (docs.llamaindex.ai)
[3] SimpleVectorStore.query(...) prefilters by metadata, then applies similarity_top_k in get_top_k_* (LlamaIndex docs) (docs.llamaindex.ai)
[4] Chroma vector store uses prefetch_k computed from similarity_top_k (LlamaIndex docs) (docs.llamaindex.ai)
Citations:
- 1: https://docs.llamaindex.ai/en/stable/api_reference/retrievers/vector/
- 2: https://docs.llamaindex.ai/en/stable/api_reference/retrievers/vector/
- 3: https://docs.llamaindex.ai/en/stable/api_reference/storage/vector_store/simple/
- 4: https://docs.llamaindex.ai/en/stable/api_reference/storage/vector_store/chroma/
🌐 Web query:
LlamaIndex load all nodes by doc_id docstore API
💡 Result:
To fetch all nodes for a given document id in LlamaIndex, use the docstore’s ref doc tracking:
# doc_id == ref_doc_id
ref_doc_id = "doc_id_0"
ref_info = index.docstore.get_ref_doc_info(ref_doc_id)
if ref_info is None:
    nodes = []
else:
    nodes = index.docstore.get_nodes(ref_info.node_ids)
- get_ref_doc_info(ref_doc_id) returns a RefDocInfo (which includes the node_ids created from that document). [1]
- get_nodes(node_ids) loads the full BaseNode objects from the docstore. [1]
- You can also inspect all tracked documents via index.ref_doc_info (maps doc_id -> RefDocInfo(node_ids=...)). [2]
- Or from the docstore directly: get_all_ref_doc_info() returns ref_doc_id -> RefDocInfo for all ingested documents. [1]
Sources:
[1] LlamaIndex docstore API reference (BaseDocumentStore: get_ref_doc_info, get_nodes, get_all_ref_doc_info) (docs.llamaindex.ai)
[2] LlamaIndex “Document Tracking” showing index.ref_doc_info contains node_ids per doc_id (llamaindexxx.readthedocs.io)
Citations:
- 1: https://docs.llamaindex.ai/en/stable/api_reference/storage/docstore/
- 2: https://llamaindexxx.readthedocs.io/en/latest/module_guides/indexing/document_management.html
Load all document nodes via docstore instead of vector retrieval.
as_retriever(...).retrieve(" ") performs top-k similarity selection on a whitespace query, which does not guarantee all nodes in the document are returned. Since the keyword index must cover all document nodes to be effective, use the docstore API instead:
ref_info = index.docstore.get_ref_doc_info(self.doc_id)
if ref_info:
    all_nodes = index.docstore.get_nodes(ref_info.node_ids)

This ensures the keyword index is built from the complete set of document nodes regardless of similarity scoring.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@workers/executor/executors/retrievers/keyword_table.py` around lines 30 - 40,
Replace the current vector-based retrieval (vector_store_index.as_retriever(...)
and all_retriever.retrieve(" ")) with docstore-based loading so you get every
node for the document; call index.docstore.get_ref_doc_info(self.doc_id) and, if
it returns ref_info, load the complete node list via
index.docstore.get_nodes(ref_info.node_ids) and assign that to all_nodes so the
keyword index covers all document nodes.
def fetch_dynamic_variable_value(url: str, data: str) -> Any:
    """Fetch dynamic variable value from an external URL.

    Ported from prompt-service make_http_request — simplified to direct
    requests.post since we don't need Flask error classes.
    """
    headers = {"Content-Type": "text/plain"}
    try:
        response = pyrequests.post(url, data=data, headers=headers, timeout=30)
        response.raise_for_status()
        if response.headers.get("content-type") == "application/json":
            return response.json()
        return response.text
    except RequestException as e:
        logger.error("HTTP request error fetching dynamic variable: %s", e)
        status_code = None
        if getattr(e, "response", None) is not None:
            status_code = getattr(e.response, "status_code", None)
        raise LegacyExecutorError(
            message=f"HTTP POST to {url} failed: {e!s}",
            code=status_code or 500,
        ) from e
There was a problem hiding this comment.
Harden dynamic-variable HTTP calls against SSRF.
The target URL comes straight from the prompt template and is POSTed without any allowlist/public-host validation or redirect blocking. That lets a crafted template drive the worker toward internal network targets. Reuse the webhook URL safety check here via a shared helper and send the request with redirects disabled.
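The reviewer's helper names (is_safe_webhook_url / validate_webhook_url) refer to project code not shown here. As a stdlib-only sketch of the kind of check such a helper might perform before calling pyrequests.post(..., allow_redirects=False) — the function name and exact policy below are assumptions, not the project's actual implementation:

```python
import ipaddress
from urllib.parse import urlparse

def is_safe_external_url(url: str) -> bool:
    """Reject URLs that could reach internal network targets (SSRF guard).

    Minimal sketch: a production check should also resolve DNS names and
    validate every resolved address, and must be re-applied on redirects
    (or redirects disabled entirely via allow_redirects=False).
    """
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        return False  # block file://, gopher://, etc.
    host = parsed.hostname
    if not host:
        return False
    try:
        addr = ipaddress.ip_address(host)
    except ValueError:
        # Hostname, not a literal IP; a real implementation resolves it
        # and re-checks each resolved address here.
        return True
    # Block loopback, private, link-local, and reserved ranges.
    return not (addr.is_private or addr.is_loopback
                or addr.is_link_local or addr.is_reserved)

print(is_safe_external_url("http://127.0.0.1/admin"))       # False
print(is_safe_external_url("http://169.254.169.254/meta"))  # False (cloud metadata)
print(is_safe_external_url("file:///etc/passwd"))           # False
print(is_safe_external_url("https://api.example.com/x"))    # True
```

The redirect half of the fix matters as much as the allowlist: without allow_redirects=False, a "safe" public URL can 302 to an internal address after the check has passed.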
🧰 Tools
🪛 Ruff (0.15.5)
[warning] 174-174: Consider moving this statement to an else block
(TRY300)
[warning] 176-176: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@workers/executor/executors/variable_replacement.py` around lines 162 - 183,
The fetch_dynamic_variable_value function must validate the target URL against
the shared webhook/webhook-safety helper before calling pyrequests.post and must
disable redirects; call the existing URL-safety helper (e.g.,
is_safe_webhook_url or validate_webhook_url) at the top of
fetch_dynamic_variable_value and raise LegacyExecutorError if the URL is
disallowed, then invoke pyrequests.post(..., allow_redirects=False) so redirects
are blocked; add any necessary import for the shared helper and keep the
existing error handling around pyrequests.post and LegacyExecutorError intact.
Auto-fixed 48 lint errors across 56 files: import ordering, block statements, unused variable prefixing, and formatting issues. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
frontend/src/components/custom-tools/prompt-card/PromptOutput.jsx (1)
433-446: ⚠️ Potential issue | 🟡 Minor
Scope progress messages per profile before rendering them here.
progressMsg is computed once per prompt card in frontend/src/components/custom-tools/prompt-card/PromptCard.jsx:71-93 without any profile_id filter, but this branch passes that same object into every profile-specific DisplayPromptResult. If two LLM profiles are running at the same time, multiple loading panes can show the latest message from the wrong profile. Please key the progress state by {promptId, profileId} or avoid rendering the shared message in the multi-profile path.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/src/components/custom-tools/prompt-card/PromptOutput.jsx` around lines 433 - 446, The shared progressMsg is computed per prompt card and passed into every DisplayPromptResult, causing cross-profile progress updates; update the progress state to be keyed by both promptId and profileId (or derive a profile-scoped message) and pass the profile-scoped message into DisplayPromptResult instead of the global progressMsg; locate the progress computation in PromptCard.jsx (lines computing progressMsg) and change its state shape to use a compound key like {promptId, profileId} or maintain a per-profile map, then update the prop passed from PromptOutput.jsx (where DisplayPromptResult is rendered) to use the profile-scoped value so each profile's loading pane shows only its own messages.
frontend/src/components/custom-tools/prompt-card/PromptCard.jsx (1)
71-93: ⚠️ Potential issue | 🟠 Major
Missing dependencies in useEffect may cause stale closure bugs.
The effect references promptDetailsState, promptKey, and details but only [messages] is in the dependency array. When any of these values change, the effect won't re-run and will filter messages using outdated values, potentially displaying progress for the wrong prompt or tool.
🐛 Proposed fix to add missing dependencies
- }, [messages]);
+ }, [messages, promptDetailsState?.prompt_id, promptKey, details?.tool_id]);
Alternatively, if you need the full objects for other reasons:
- }, [messages]);
+ }, [messages, promptDetailsState, promptKey, details]);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/src/components/custom-tools/prompt-card/PromptCard.jsx` around lines 71 - 93, The useEffect that computes msg from messages captures promptDetailsState, promptKey, and details but only lists [messages] as dependencies, which can cause stale closures; update the dependency array for the effect that calls setProgressMsg to include promptDetailsState, promptKey, and details (so it becomes [messages, promptDetailsState, promptKey, details]) or refactor the filter logic into a memoized callback (e.g., using useCallback/useMemo) referenced by the effect to ensure it re-runs whenever those values change.
🧹 Nitpick comments (3)
frontend/src/components/input-output/configure-ds/ConfigureDs.jsx (1)
210-212: Keep analytics non-blocking, but don’t make failures invisible.
These empty catch blocks preserve the user flow, but they also discard the only signal that PostHog instrumentation is broken. A tiny shared helper that logs at debug level or reports to monitoring would keep this non-fatal without swallowing it completely.
Also applies to: 293-295, 313-315
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/src/components/input-output/configure-ds/ConfigureDs.jsx` around lines 210 - 212, Replace the empty catch blocks around PostHog calls by invoking a small shared helper (e.g., logNonBlockingError(error, context)) that logs the error at debug level and optionally reports it to monitoring; implement logNonBlockingError in this module (or a shared utils file) to use the app logger if available or console.debug and to attach a minimal context (source: "PostHog", component: "ConfigureDs", and the operation name), then call logNonBlockingError(err, {operation: "setCustomEvent"}) inside the catch blocks that currently swallow errors for posthog/analytics calls (referencing the posthog.* calls in ConfigureDs.jsx).
frontend/src/components/deployments/create-api-deployment-from-prompt-studio/CreateApiDeploymentFromPromptStudio.jsx (1)
323-325: Remove redundant try-catch; PostHog error handling should occur at the hook level.
The hook setPostHogCustomEvent already wraps posthog.capture() in a try-catch that silently ignores failures (see usePostHogEvents.js:83-88). The try-catch here is redundant—errors are already swallowed upstream with no logging. If improved observability into PostHog failures is needed, fix it at the source: either add logging to the hook itself or use an existing frontend error-reporting path, then remove this duplicate catch block.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/src/components/deployments/create-api-deployment-from-prompt-studio/CreateApiDeploymentFromPromptStudio.jsx` around lines 323 - 325, The catch block that swallows errors around the call to setPostHogCustomEvent is redundant because setPostHogCustomEvent already handles and ignores PostHog failures; remove the surrounding try-catch in CreateApiDeploymentFromPromptStudio (the block catching _err) and rely on the hook's internal handling, or if you want observability instead, add logging/error-reporting inside usePostHogEvents (around posthog.capture in setPostHogCustomEvent) and then remove this outer catch to avoid duplicate suppression of errors.
frontend/src/components/custom-tools/prompt-card/PromptCard.jsx (1)
262-264: Consider logging silently swallowed exceptions for debuggability.
The intent to not interrupt the main flow for telemetry failures is valid, and the _err convention correctly signals the unused parameter. However, completely swallowing exceptions can make debugging difficult if PostHog integration starts failing silently.
💡 Optional: Add debug-level logging
  } catch (_err) {
-   // If an error occurs while setting custom posthog event, ignore it and continue
+   // PostHog event failures are non-critical; log for debugging but don't interrupt flow
+   console.debug("PostHog event failed:", _err);
  }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/src/components/custom-tools/prompt-card/PromptCard.jsx` around lines 262 - 264, The catch block in the PromptCard component is silently swallowing PostHog errors (catch (_err) { ... }), which makes failures hard to debug; update that catch to log the error at a debug/trace level (e.g., console.debug or the app logger) including context like "PromptCard: posthog event failed" and the caught error (_err) so telemetry failures are non-fatal but visible during debugging; keep the handler non-throwing so main flow continues.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@frontend/src/components/deployments/create-api-deployment-from-prompt-studio/CreateApiDeploymentFromPromptStudio.jsx`:
- Around line 98-100: When the early return for missing toolDetails?.tool_id is
hit, clear the tool-specific state so stale values don't persist: call the state
setters (e.g. setToolFunctionName('') and setToolSchema(null) — and any other
related setters like setToolInputs([]) or setSelectedTool(null) if present)
immediately before returning from the component logic in
CreateApiDeploymentFromPromptStudio.jsx so step 2 and the tool-creation path use
a fresh, empty state.
In `@frontend/src/components/input-output/configure-ds/ConfigureDs.jsx`:
- Around line 120-125: The effect watching selectedSourceId currently returns
early when metadata is falsy, leaving formData populated with the previous
source's values; update the useEffect (the effect that references
selectedSourceId, metadata, and setFormData) so that when metadata is
null/undefined you explicitly reset the form state (e.g., call setFormData with
an empty/default object) instead of returning early, otherwise keep the existing
behavior of setting form data from metadata when present.
In `@frontend/src/hooks/usePromptRun.js`:
- Line 18: The hook usePromptOutput currently only exposes
generatePromptOutputKey but the socket consumer in usePromptStudioSocket still
imports and calls updatePromptOutputState for fetch_response and
single_pass_extraction completions; restore and export updatePromptOutputState
from usePromptOutput (preserving its existing signature and behavior) so
usePromptStudioSocket's handlers continue to work, or alternatively update
usePromptStudioSocket to stop calling updatePromptOutputState in the same
PR—make sure updatePromptOutputState (the function name) is present and returned
by usePromptOutput so socket handlers do not receive undefined.
- Around line 54-67: The timeout cleanup can clear a newer run's status because
it only keys by promptId+statusKey; modify the logic so the timeout is tied to
the specific execution (runId) or is cancelable when the socket response
arrives: when scheduling the setTimeout (using SOCKET_TIMEOUT_MS) include/runId
in the generated key (generateApiRunStatusId) or store the returned timer id in
a map keyed by runId in usePromptRunStatusStore, and on the socket event handler
clearTimeout for that runId and remove the stored timer before updating status;
update calls that currently call removePromptStatus(promptId, statusKey) and
setAlertDetails to instead operate only for the matching runId to avoid removing
newer runs.
---
Outside diff comments:
In `@frontend/src/components/custom-tools/prompt-card/PromptCard.jsx`:
- Around line 71-93: The useEffect that computes msg from messages captures
promptDetailsState, promptKey, and details but only lists [messages] as
dependencies, which can cause stale closures; update the dependency array for
the effect that calls setProgressMsg to include promptDetailsState, promptKey,
and details (so it becomes [messages, promptDetailsState, promptKey, details])
or refactor the filter logic into a memoized callback (e.g., using
useCallback/useMemo) referenced by the effect to ensure it re-runs whenever
those values change.
In `@frontend/src/components/custom-tools/prompt-card/PromptOutput.jsx`:
- Around line 433-446: The shared progressMsg is computed per prompt card and
passed into every DisplayPromptResult, causing cross-profile progress updates;
update the progress state to be keyed by both promptId and profileId (or derive
a profile-scoped message) and pass the profile-scoped message into
DisplayPromptResult instead of the global progressMsg; locate the progress
computation in PromptCard.jsx (lines computing progressMsg) and change its state
shape to use a compound key like {promptId, profileId} or maintain a per-profile
map, then update the prop passed from PromptOutput.jsx (where
DisplayPromptResult is rendered) to use the profile-scoped value so each
profile's loading pane shows only its own messages.
---
Nitpick comments:
In `@frontend/src/components/custom-tools/prompt-card/PromptCard.jsx`:
- Around line 262-264: The catch block in the PromptCard component is silently
swallowing PostHog errors (catch (_err) { ... }), which makes failures hard to
debug; update that catch to log the error at a debug/trace level (e.g.,
console.debug or the app logger) including context like "PromptCard: posthog
event failed" and the caught error (_err) so telemetry failures are non-fatal
but visible during debugging; keep the handler non-throwing so main flow
continues.
In
`@frontend/src/components/deployments/create-api-deployment-from-prompt-studio/CreateApiDeploymentFromPromptStudio.jsx`:
- Around line 323-325: The catch block that swallows errors around the call to
setPostHogCustomEvent is redundant because setPostHogCustomEvent already handles
and ignores PostHog failures; remove the surrounding try-catch in
CreateApiDeploymentFromPromptStudio (the block catching _err) and rely on the
hook's internal handling, or if you want observability instead, add
logging/error-reporting inside usePostHogEvents (around posthog.capture in
setPostHogCustomEvent) and then remove this outer catch to avoid duplicate
suppression of errors.
In `@frontend/src/components/input-output/configure-ds/ConfigureDs.jsx`:
- Around line 210-212: Replace the empty catch blocks around PostHog calls by
invoking a small shared helper (e.g., logNonBlockingError(error, context)) that
logs the error at debug level and optionally reports it to monitoring; implement
logNonBlockingError in this module (or a shared utils file) to use the app
logger if available or console.debug and to attach a minimal context (source:
"PostHog", component: "ConfigureDs", and the operation name), then call
logNonBlockingError(err, {operation: "setCustomEvent"}) inside the catch blocks
that currently swallow errors for posthog/analytics calls (referencing the
posthog.* calls in ConfigureDs.jsx).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: a2662be5-cfa0-4cca-8f86-68063a8d6d60
📒 Files selected for processing (56)
- frontend/src/App.jsx
- frontend/src/components/agency/agency/Agency.jsx
- frontend/src/components/agency/configure-connector-modal/ConfigureConnectorModal.jsx
- frontend/src/components/agency/markdown-renderer/MarkdownRenderer.jsx
- frontend/src/components/common/PromptStudioModal.jsx
- frontend/src/components/custom-tools/add-llm-profile/AddLlmProfile.jsx
- frontend/src/components/custom-tools/combined-output/CombinedOutput.jsx
- frontend/src/components/custom-tools/custom-data-settings/CustomDataSettings.jsx
- frontend/src/components/custom-tools/document-parser/DocumentParser.jsx
- frontend/src/components/custom-tools/header/Header.jsx
- frontend/src/components/custom-tools/import-tool/ImportTool.jsx
- frontend/src/components/custom-tools/list-of-tools/ListOfTools.jsx
- frontend/src/components/custom-tools/manage-llm-profiles/ManageLlmProfiles.jsx
- frontend/src/components/custom-tools/notes-card/NotesCard.jsx
- frontend/src/components/custom-tools/output-analyzer/OutputAnalyzer.jsx
- frontend/src/components/custom-tools/output-analyzer/OutputAnalyzerCard.jsx
- frontend/src/components/custom-tools/prompt-card/DisplayPromptResult.jsx
- frontend/src/components/custom-tools/prompt-card/OutputForIndex.jsx
- frontend/src/components/custom-tools/prompt-card/PromptCard.jsx
- frontend/src/components/custom-tools/prompt-card/PromptCardItems.jsx
- frontend/src/components/custom-tools/prompt-card/PromptOutput.jsx
- frontend/src/components/custom-tools/prompt-card/PromptRun.jsx
- frontend/src/components/custom-tools/prompts-reorder/DraggablePrompt.jsx
- frontend/src/components/custom-tools/prompts-reorder/PromptsReorder.jsx
- frontend/src/components/custom-tools/retrieval-strategy-modal/RetrievalStrategyModal.jsx
- frontend/src/components/custom-tools/tool-ide/ToolIde.jsx
- frontend/src/components/custom-tools/tools-main/ToolsMain.jsx
- frontend/src/components/custom-tools/tools-main/ToolsMainActionBtns.jsx
- frontend/src/components/deployments/create-api-deployment-from-prompt-studio/CreateApiDeploymentFromPromptStudio.jsx
- frontend/src/components/helpers/auth/RequireAuth.js
- frontend/src/components/helpers/auth/RequireGuest.js
- frontend/src/components/helpers/socket-messages/SocketMessages.js
- frontend/src/components/input-output/add-source/AddSource.jsx
- frontend/src/components/input-output/configure-ds/ConfigureDs.jsx
- frontend/src/components/input-output/data-source-card/DataSourceCard.jsx
- frontend/src/components/input-output/list-of-sources/ListOfSources.jsx
- frontend/src/components/input-output/manage-files/ManageFiles.jsx
- frontend/src/components/logs-and-notifications/DisplayLogsAndNotifications.jsx
- frontend/src/components/metrics-dashboard/RecentActivity.jsx
- frontend/src/components/pipelines-or-deployments/file-history-modal/FileHistoryModal.jsx
- frontend/src/components/set-org/SetOrg.jsx
- frontend/src/components/settings/default-triad/DefaultTriad.jsx
- frontend/src/components/settings/invite/InviteEditUser.jsx
- frontend/src/components/settings/platform/PlatformSettings.jsx
- frontend/src/components/settings/users/Users.jsx
- frontend/src/components/tool-settings/tool-settings/ToolSettings.jsx
- frontend/src/hooks/usePromptOutput.js
- frontend/src/hooks/usePromptRun.js
- frontend/src/hooks/usePromptStudioSocket.js
- frontend/src/hooks/useRequestUrl.js
- frontend/src/layouts/rjsf-form-layout/CustomObjectFieldTemplate.jsx
- frontend/src/store/alert-store.js
- frontend/src/store/prompt-run-queue-store.js
- frontend/src/store/prompt-studio-store.js
- frontend/src/store/retrieval-strategies-store.js
- frontend/src/store/workflow-store.js
✅ Files skipped from review due to trivial changes (23)
- frontend/src/components/common/PromptStudioModal.jsx
- frontend/src/hooks/usePromptOutput.js
- frontend/src/components/logs-and-notifications/DisplayLogsAndNotifications.jsx
- frontend/src/components/input-output/list-of-sources/ListOfSources.jsx
- frontend/src/components/custom-tools/output-analyzer/OutputAnalyzerCard.jsx
- frontend/src/store/prompt-run-queue-store.js
- frontend/src/store/alert-store.js
- frontend/src/components/custom-tools/prompt-card/OutputForIndex.jsx
- frontend/src/components/custom-tools/retrieval-strategy-modal/RetrievalStrategyModal.jsx
- frontend/src/components/input-output/manage-files/ManageFiles.jsx
- frontend/src/components/custom-tools/custom-data-settings/CustomDataSettings.jsx
- frontend/src/components/input-output/add-source/AddSource.jsx
- frontend/src/components/agency/configure-connector-modal/ConfigureConnectorModal.jsx
- frontend/src/components/metrics-dashboard/RecentActivity.jsx
- frontend/src/components/custom-tools/document-parser/DocumentParser.jsx
- frontend/src/components/custom-tools/prompts-reorder/PromptsReorder.jsx
- frontend/src/components/helpers/auth/RequireGuest.js
- frontend/src/components/custom-tools/output-analyzer/OutputAnalyzer.jsx
- frontend/src/layouts/rjsf-form-layout/CustomObjectFieldTemplate.jsx
- frontend/src/components/agency/markdown-renderer/MarkdownRenderer.jsx
- frontend/src/components/custom-tools/notes-card/NotesCard.jsx
- frontend/src/store/prompt-studio-store.js
- frontend/src/store/workflow-store.js
🚧 Files skipped from review as they are similar to previous changes (3)
- frontend/src/components/helpers/socket-messages/SocketMessages.js
- frontend/src/components/custom-tools/tool-ide/ToolIde.jsx
- frontend/src/hooks/usePromptStudioSocket.js
if (!toolDetails?.tool_id) {
  return;
}
There was a problem hiding this comment.
Reset tool-specific state before returning here.
If the modal is reopened for a tool without tool_id, this exits with the previous toolFunctionName and toolSchema still in state. That can show stale settings in step 2 and let Line 379 create the new tool instance with the last tool's function name instead of the current selection.
Proposed fix
if (!toolDetails?.tool_id) {
+ setToolFunctionName(null);
+ setToolSchema(null);
+ setIsSchemaLoading(false);
return;
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@frontend/src/components/deployments/create-api-deployment-from-prompt-studio/CreateApiDeploymentFromPromptStudio.jsx`
around lines 98 - 100, When the early return for missing toolDetails?.tool_id is
hit, clear the tool-specific state so stale values don't persist: call the state
setters (e.g. setToolFunctionName('') and setToolSchema(null) — and any other
related setters like setToolInputs([]) or setSelectedTool(null) if present)
immediately before returning from the component logic in
CreateApiDeploymentFromPromptStudio.jsx so step 2 and the tool-creation path use
a fresh, empty state.
useEffect(() => {
-  if (!metadata) return;
+  if (!metadata) {
+    return;
+  }
  setFormData(metadata);
}, [selectedSourceId, metadata, setFormData]);
There was a problem hiding this comment.
Reset formData when the new source has no metadata.
The early return on Line 121 means a selectedSourceId change with no metadata keeps the previous source's values in the form. That lets stale connector/adapter settings be submitted against the newly selected source.
Proposed fix
useEffect(() => {
if (!metadata) {
+ setFormData({});
return;
}
setFormData(metadata);
}, [selectedSourceId, metadata, setFormData]);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@frontend/src/components/input-output/configure-ds/ConfigureDs.jsx` around
lines 120 - 125, The effect watching selectedSourceId currently returns early
when metadata is falsy, leaving formData populated with the previous source's
values; update the useEffect (the effect that references selectedSourceId,
metadata, and setFormData) so that when metadata is null/undefined you
explicitly reset the form state (e.g., call setFormData with an empty/default
object) instead of returning early, otherwise keep the existing behavior of
setting form data from metadata when present.
const { pushPromptRunApi, freeActiveApi } = usePromptRunQueueStore();
- const { generatePromptOutputKey, updatePromptOutputState } =
-   usePromptOutput();
+ const { generatePromptOutputKey } = usePromptOutput();
There was a problem hiding this comment.
Restore updatePromptOutputState until the socket consumer is migrated.
frontend/src/hooks/usePromptStudioSocket.js:26-58 still does const { updatePromptOutputState } = usePromptOutput(); and calls it for fetch_response / single_pass_extraction completions. If this PR makes usePromptOutput() expose only generatePromptOutputKey, that handler will get undefined and throw before results are written or RUNNING status is cleared. Either keep updatePromptOutputState in the hook API or update the socket hook in the same change.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@frontend/src/hooks/usePromptRun.js` at line 18, The hook usePromptOutput
currently only exposes generatePromptOutputKey but the socket consumer in
usePromptStudioSocket still imports and calls updatePromptOutputState for
fetch_response and single_pass_extraction completions; restore and export
updatePromptOutputState from usePromptOutput (preserving its existing signature
and behavior) so usePromptStudioSocket's handlers continue to work, or
alternatively update usePromptStudioSocket to stop calling
updatePromptOutputState in the same PR—make sure updatePromptOutputState (the
function name) is present and returned by usePromptOutput so socket handlers do
not receive undefined.
// Timeout safety net: clear stale status if socket event never arrives.
setTimeout(() => {
  const statusKey = generateApiRunStatusId(docId, profileId);
  const current = usePromptRunStatusStore.getState().promptRunStatus;
  if (
    current?.[promptId]?.[statusKey] === PROMPT_RUN_API_STATUSES.RUNNING
  ) {
    removePromptStatus(promptId, statusKey);
    setAlertDetails({
      type: "warning",
      content: "Prompt execution timed out. Please try again.",
    });
  }
}, SOCKET_TIMEOUT_MS);
There was a problem hiding this comment.
Key the timeout cleanup to the specific execution.
Line 55 schedules a timer that only checks promptId + statusKey. If the user reruns the same prompt/doc/profile before that 5-minute timer fires, the stale timer can remove the newer run's RUNNING state and show a false timeout. Please tie the timeout to runId (or cancel the timer when the socket result arrives) before clearing status.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@frontend/src/hooks/usePromptRun.js` around lines 54 - 67, The timeout cleanup
can clear a newer run's status because it only keys by promptId+statusKey;
modify the logic so the timeout is tied to the specific execution (runId) or is
cancelable when the socket response arrives: when scheduling the setTimeout
(using SOCKET_TIMEOUT_MS) include/runId in the generated key
(generateApiRunStatusId) or store the returned timer id in a map keyed by runId
in usePromptRunStatusStore, and on the socket event handler clearTimeout for
that runId and remove the stored timer before updating status; update calls that
currently call removePromptStatus(promptId, statusKey) and setAlertDetails to
instead operate only for the matching runId to avoid removing newer runs.
…Workflows Resolves vite build warning about SharePermission.jsx being both dynamically and statically imported across the codebase. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: harini-venkataraman <115449948+harini-venkataraman@users.noreply.github.com>
- Remove unnecessary try-catch around PostHog event calls - Flip negated condition in PromptOutput.handleTable for clarity Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
frontend/src/components/custom-tools/prompt-card/PromptCard.jsx (1)
71-93: ⚠️ Potential issue | 🟡 Minor
Missing useEffect dependencies and overly broad message matching.
Two concerns here:
1. The tool_id match (line 79) is very broad—any message for the entire tool will match, potentially causing progress messages from unrelated prompts to appear in this prompt card.
2. The useEffect uses promptDetailsState?.prompt_id, promptKey, and details?.tool_id but the dependency array only includes [messages]. This can cause stale closure issues where the effect captures outdated values.
Suggested fix
- }, [messages]);
+ }, [messages, promptDetailsState?.prompt_id, promptKey, details?.tool_id]);
Please also verify that the tool_id matching logic is intentional—consider if it should only match when no prompt_id or prompt_key is present in the message.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/src/components/custom-tools/prompt-card/PromptCard.jsx` around lines 71 - 93, The effect in useEffect that finds a matching message uses overly broad tool-level matching and has an incomplete dependency array; update the matching logic in the effect (the .find call) so tool-level matching (item?.component?.tool_id === details?.tool_id) is only considered when the message does not include a prompt_id or prompt_key (i.e., prefer prompt_id or prompt_key matches first, fallback to tool_id only when both prompt identifiers are absent), and expand the dependency array to include messages, promptDetailsState?.prompt_id, promptKey, and details?.tool_id so the effect re-runs when any of those values change; keep the setProgressMsg usage unchanged but ensure you reference the current values inside the effect.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 29a872fa-c10a-4838-a8bd-6c253b1c4ae3
📒 Files selected for processing (4)
- frontend/src/components/custom-tools/prompt-card/PromptCard.jsx
- frontend/src/components/custom-tools/prompt-card/PromptOutput.jsx
- frontend/src/components/deployments/create-api-deployment-from-prompt-studio/CreateApiDeploymentFromPromptStudio.jsx
- frontend/src/components/input-output/configure-ds/ConfigureDs.jsx
🚧 Files skipped from review as they are similar to previous changes (1)
- frontend/src/components/input-output/configure-ds/ConfigureDs.jsx
```diff
 const handleRun = (promptRunType, promptId, profileId, documentId) => {
-  try {
-    setPostHogCustomEvent("ps_prompt_run", {
-      info: "Click on 'Run Prompt' button (Multi Pass)",
-    });
-  } catch (err) {
-    // If an error occurs while setting custom posthog event, ignore it and continue
-  }
+  setPostHogCustomEvent("ps_prompt_run", {
+    info: "Click on 'Run Prompt' button (Multi Pass)",
+  });
```
Analytics error could block prompt execution.
The try/catch was removed from the PostHog event call. If setPostHogCustomEvent throws (network failure, library error, etc.), the exception will bubble up and prevent validateInputs and handlePromptRunRequest from executing. Analytics should be fire-and-forget.
Suggested fix:

```diff
 const handleRun = (promptRunType, promptId, profileId, documentId) => {
+  try {
     setPostHogCustomEvent("ps_prompt_run", {
       info: "Click on 'Run Prompt' button (Multi Pass)",
     });
+  } catch {
+    // Analytics should not block core functionality
+  }
   const validateInputs = () => {
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@frontend/src/components/custom-tools/prompt-card/PromptCard.jsx` around lines
257 - 261, In handleRun, make the PostHog call fire-and-forget: wrap the
setPostHogCustomEvent(...) call in a try/catch (or call it in a non-blocking
promise) so any thrown errors are swallowed/logged locally and do not propagate;
after guarding the analytics call, continue to call validateInputs(...) and
handlePromptRunRequest(...) normally. Ensure you reference the
setPostHogCustomEvent() invocation inside handleRun and do not change the
subsequent validateInputs and handlePromptRunRequest call flow.
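The guarded call the reviewer asks for can be factored into a small helper so analytics can never block the run flow; `safeTrack` is a hypothetical name and `trackEvent` stands in for `setPostHogCustomEvent`:

```javascript
// Fire-and-forget analytics: any error from the tracker is swallowed so
// that validateInputs/handlePromptRunRequest always run afterwards.
function safeTrack(trackEvent, name, payload) {
  try {
    trackEvent(name, payload);
  } catch {
    // Analytics must not block core functionality
  }
}
```

Calling `safeTrack(setPostHogCustomEvent, "ps_prompt_run", {...})` at the top of `handleRun` keeps the subsequent call flow untouched even when the tracker throws.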
Frontend Lint Report (Biome): ✅ All checks passed! No linting or formatting issues found.

Test Results Summary

Runner Tests - Full Report
Greptile Summary

This PR introduces a major architectural shift for Prompt Studio IDE execution: replacing synchronous, blocking Django HTTP calls with a fire-and-forget Celery dispatch model backed by a new pluggable executor framework.

Key findings from the review:
Confidence Score: 2/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant FE as Frontend (PromptRun.jsx)
    participant WS as Socket.IO
    participant DJ as Django View (views.py)
    participant CB as Callback Worker (tasks.py)
    participant EX as Executor Worker (tasks.py)
    participant LLM as LLM / VectorDB
    FE->>DJ: POST /fetch_response/{tool_id}<br/>(fire-and-forget)
    DJ->>DJ: build_fetch_response_payload()<br/>(ORM loads, extract, index)
    DJ->>EX: dispatch_with_callback()<br/>celery_executor_legacy queue
    DJ-->>FE: HTTP 202 {task_id, run_id, status:"accepted"}
    Note over FE: Sets 5-min timeout safety net<br/>Spinner stays on
    EX->>LLM: LegacyExecutor._handle_answer_prompt()<br/>RetrievalService → LLM call
    LLM-->>EX: Extracted answer
    EX-->>CB: Celery link callback<br/>ide_prompt_complete(result_dict, cb_kwargs)<br/>prompt_studio_callback queue
    CB->>CB: OutputManagerHelper.handle_prompt_output_update()<br/>(ORM write)
    CB->>WS: _emit_websocket_event("prompt_studio_result")
    WS-->>FE: Socket.IO event {status:"completed", operation, result}
    FE->>FE: usePromptStudioSocket.onResult()<br/>updatePromptOutputState() → clears spinner
    Note over EX,CB: On infrastructure failure (ConnectionError etc.)<br/>link_error fires ide_prompt_error instead
    EX--xCB: ide_prompt_error(failed_task_id, cb_kwargs)
    CB->>WS: _emit_websocket_event("prompt_studio_result")<br/>{status:"failed", error}
    WS-->>FE: Socket.IO error event
    FE->>FE: handleFailed() → clears spinner + shows error alert
```
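The link/link_error flow in the diagram above can be modeled without Celery. This pure-Python sketch only mirrors the control flow (a success result flows to the success callback, an exception routes the task id to the error callback); all names are stand-ins for the real dispatcher, not the PR's actual API:

```python
import uuid

def dispatch_with_callback(task_body, context, on_success, on_error, task_id=None):
    """Model of the fire-and-forget dispatch: Celery's link/link_error
    callbacks are approximated by direct function calls."""
    task_id = task_id or str(uuid.uuid4())  # pre-generated so callbacks can reference it
    try:
        result = task_body(context)  # executor worker runs the payload
    except Exception as exc:  # maps to the link_error path (ide_prompt_error)
        on_error(task_id, error=str(exc))
        return task_id
    on_success(result)  # maps to the link path (ide_prompt_complete)
    return task_id
```

In the real system the callbacks are Celery `signature(...)` objects routed to the `prompt_studio_callback` queue, and the task body runs on a separate worker; the ordering guarantees are the same.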
Prompt To Fix All With AI

This is a comment left during a code review.
Path: backend/prompt_studio/prompt_studio_core_v2/views.py
Line: 364-422
Comment:
**Async path is unconditional — feature flag gating is absent**
The PR description explicitly states that all three endpoints (`index_document`, `fetch_response`, `single_pass_extraction`) are gated behind the `async_prompt_execution` feature flag with the old sync path preserved as a fallback. However, the actual implementation contains no feature flag check anywhere in these view methods. All three endpoints now **always** return HTTP 202 and dispatch to Celery workers, making this a hard breaking change for all users regardless of the flag.
The PR description also says:
> "When flag is OFF (default), all 3 endpoints use the old sync path returning HTTP 200. No behavior change for existing users."
This behavior is not implemented. Any deployment of this PR will immediately switch all Prompt Studio execution to the async path. If the `worker-executor-v2` or `worker-prompt-studio-callback` services are not running, every prompt execution will silently hang forever waiting for socket events that never arrive.
The conditional dispatch pattern would look like:
```python
from utils.feature_flags import check_feature_flag # or however flags are checked
if check_feature_flag("async_prompt_execution"):
context, cb_kwargs = PromptStudioHelper.build_index_payload(...)
# ... dispatch with callback, return 202
else:
# legacy sync path
unique_id = PromptStudioHelper.index_document(...)
return Response({"message": "Document indexed successfully."}, status=status.HTTP_200_OK)
```
The same conditional must be applied to `fetch_response` and `single_pass_extraction`. Also note that `usePromptStudioSocket.js` on the frontend is unconditionally mounted without any feature flag guard either, which reinforces that the gating mechanism described in the PR is incomplete.
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py
Line: 322-324
Comment:
**Dead code — `if not tool:` guard is unreachable**
`CustomTool.objects.get(pk=tool_id)` is called two lines earlier (line 312). Django's `.get()` raises `CustomTool.DoesNotExist` if no matching row exists, and returns a truthy model instance on success. The `if not tool:` check therefore can never be true — either the call raises before reaching this line, or `tool` is a valid model object that is always truthy. Additionally, `ProfileManager.get_default_llm_profile(tool)` is called on line 321 *before* the guard, so even if `.get()` somehow returned a falsy value, the guard would be too late.
This check should be moved to *before* the ORM call, or (more idiomatically) handled by catching `CustomTool.DoesNotExist` in the caller.
```suggestion
default_profile = ProfileManager.get_default_llm_profile(tool)
PromptStudioHelper.validate_adapter_status(default_profile)
```
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py
Line: 527-532
Comment:
**Dead code — `if not profile_manager:` guard fires after it is already dereferenced**
`validate_adapter_status(profile_manager)` (line 527) and `validate_profile_manager_owner_access(profile_manager)` (line 528) are both called *before* the `if not profile_manager:` guard on line 531. If `profile_manager` is `None`, both validation helpers will dereference it and raise an `AttributeError` before the guard is reached — making the explicit `raise DefaultProfileError()` dead code.
The same pattern is repeated in `build_single_pass_payload` at line 1428. Move the null check to immediately after `profile_manager` is resolved:
```python
profile_manager = prompt.profile_manager
if profile_manager_id:
profile_manager = ProfileManagerHelper.get_profile_manager(
profile_manager_id=profile_manager_id
)
if not profile_manager: # ← guard BEFORE any dereference
raise DefaultProfileError()
monitor_llm, challenge_llm = PromptStudioHelper._resolve_llm_ids(tool)
PromptStudioHelper.validate_adapter_status(profile_manager)
PromptStudioHelper.validate_profile_manager_owner_access(profile_manager)
```
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: backend/prompt_studio/prompt_studio_core_v2/views.py
Line: 401-402
Comment:
**Redundant inline imports of `uuid`**
`uuid` is already imported at module level (line 3 of this file). The inline `import uuid as _uuid` inside the function body is unnecessary and appears in all three view methods (`index_document` at line 401, `fetch_response` at line 482, `single_pass_extraction` at line 573). Use the existing top-level import directly.
```suggestion
executor_task_id = str(uuid.uuid4())
```
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: backend/backend/worker_celery.py
Line: 70-74
Comment:
**`settings.DB_USER` is not URL-encoded in the result backend URL**
`quote_plus` is correctly applied to `DB_PASSWORD`, but `DB_USER` is interpolated raw. If the database username contains any URL-special characters (e.g. `@`, `:`, `/`), the resulting connection string would be malformed and the Celery result backend would fail to connect. Apply the same `quote_plus` encoding to `settings.DB_USER` for consistency and correctness, just as is done for `settings.DB_PASSWORD`.
How can I resolve this? If you propose a fix, please make it concise.

Last reviewed commit: 4200ac1
```diff
@@ -380,104 +384,249 @@ def index_document(self, request: HttpRequest, pk: Any = None) -> Response:
     document_id: str = serializer.validated_data.get(ToolStudioPromptKeys.DOCUMENT_ID)
     document: DocumentManager = DocumentManager.objects.get(pk=document_id)
     file_name: str = document.document_name
     # Generate a run_id
     run_id = CommonUtils.generate_uuid()

-    unique_id = PromptStudioHelper.index_document(
+    context, cb_kwargs = PromptStudioHelper.build_index_payload(
         tool_id=str(tool.tool_id),
         file_name=file_name,
         org_id=UserSessionUtils.get_organization_id(request),
         user_id=tool.created_by.user_id,
         document_id=document_id,
         run_id=run_id,
     )
-    if unique_id:
-        return Response(
-            {"message": "Document indexed successfully."},
-            status=status.HTTP_200_OK,
-        )
-    else:
-        logger.error("Error occured while indexing. Unique ID is not valid.")
-        raise IndexingAPIError()
+
+    dispatcher = PromptStudioHelper._get_dispatcher()
+
+    # Pre-generate task ID so callbacks can reference it
+    import uuid as _uuid
+
+    executor_task_id = str(_uuid.uuid4())
+    cb_kwargs["executor_task_id"] = executor_task_id
+
+    task = dispatcher.dispatch_with_callback(
+        context,
+        on_success=signature(
+            "ide_index_complete",
+            kwargs={"callback_kwargs": cb_kwargs},
+            queue="prompt_studio_callback",
+        ),
+        on_error=signature(
+            "ide_index_error",
+            kwargs={"callback_kwargs": cb_kwargs},
+            queue="prompt_studio_callback",
+        ),
+        task_id=executor_task_id,
+    )
+    return Response(
+        {"task_id": task.id, "run_id": run_id, "status": "accepted"},
+        status=status.HTTP_202_ACCEPTED,
```
Async path is unconditional — feature flag gating is absent
The PR description explicitly states that all three endpoints (index_document, fetch_response, single_pass_extraction) are gated behind the async_prompt_execution feature flag with the old sync path preserved as a fallback. However, the actual implementation contains no feature flag check anywhere in these view methods. All three endpoints now always return HTTP 202 and dispatch to Celery workers, making this a hard breaking change for all users regardless of the flag.
The PR description also says:
"When flag is OFF (default), all 3 endpoints use the old sync path returning HTTP 200. No behavior change for existing users."
This behavior is not implemented. Any deployment of this PR will immediately switch all Prompt Studio execution to the async path. If the worker-executor-v2 or worker-prompt-studio-callback services are not running, every prompt execution will silently hang forever waiting for socket events that never arrive.
The conditional dispatch pattern would look like:
```python
from utils.feature_flags import check_feature_flag  # or however flags are checked
if check_feature_flag("async_prompt_execution"):
    context, cb_kwargs = PromptStudioHelper.build_index_payload(...)
    # ... dispatch with callback, return 202
else:
    # legacy sync path
    unique_id = PromptStudioHelper.index_document(...)
    return Response({"message": "Document indexed successfully."}, status=status.HTTP_200_OK)
```

The same conditional must be applied to `fetch_response` and `single_pass_extraction`. Also note that `usePromptStudioSocket.js` on the frontend is unconditionally mounted without any feature flag guard either, which reinforces that the gating mechanism described in the PR is incomplete.
```python
default_profile = ProfileManager.get_default_llm_profile(tool)
if not tool:
    raise ToolNotValid()
```
Dead code — if not tool: guard is unreachable
CustomTool.objects.get(pk=tool_id) is called two lines earlier (line 312). Django's .get() raises CustomTool.DoesNotExist if no matching row exists, and returns a truthy model instance on success. The if not tool: check therefore can never be true — either the call raises before reaching this line, or tool is a valid model object that is always truthy. Additionally, ProfileManager.get_default_llm_profile(tool) is called on line 321 before the guard, so even if .get() somehow returned a falsy value, the guard would be too late.
This check should be moved to before the ORM call, or (more idiomatically) handled by catching CustomTool.DoesNotExist in the caller.
```diff
 default_profile = ProfileManager.get_default_llm_profile(tool)
-if not tool:
-    raise ToolNotValid()
+PromptStudioHelper.validate_adapter_status(default_profile)
```
```python
PromptStudioHelper.validate_adapter_status(profile_manager)
PromptStudioHelper.validate_profile_manager_owner_access(profile_manager)

if not profile_manager:
    raise DefaultProfileError()
```
Dead code — if not profile_manager: guard fires after it is already dereferenced
validate_adapter_status(profile_manager) (line 527) and validate_profile_manager_owner_access(profile_manager) (line 528) are both called before the if not profile_manager: guard on line 531. If profile_manager is None, both validation helpers will dereference it and raise an AttributeError before the guard is reached — making the explicit raise DefaultProfileError() dead code.
The same pattern is repeated in build_single_pass_payload at line 1428. Move the null check to immediately after profile_manager is resolved:
```python
profile_manager = prompt.profile_manager
if profile_manager_id:
    profile_manager = ProfileManagerHelper.get_profile_manager(
        profile_manager_id=profile_manager_id
    )
if not profile_manager:  # ← guard BEFORE any dereference
    raise DefaultProfileError()
monitor_llm, challenge_llm = PromptStudioHelper._resolve_llm_ids(tool)
PromptStudioHelper.validate_adapter_status(profile_manager)
PromptStudioHelper.validate_profile_manager_owner_access(profile_manager)
```

```python
import uuid as _uuid
```
Redundant inline imports of uuid
uuid is already imported at module level (line 3 of this file). The inline import uuid as _uuid inside the function body is unnecessary and appears in all three view methods (index_document at line 401, fetch_response at line 482, single_pass_extraction at line 573). Use the existing top-level import directly.
```diff
-import uuid as _uuid
+executor_task_id = str(uuid.uuid4())
```
```python
result_backend = (
    f"db+postgresql://{settings.DB_USER}:"
    f"{quote_plus(settings.DB_PASSWORD)}"
    f"@{settings.DB_HOST}:{settings.DB_PORT}/"
    f"{settings.CELERY_BACKEND_DB_NAME}"
```
settings.DB_USER is not URL-encoded in the result backend URL
quote_plus is correctly applied to DB_PASSWORD, but DB_USER is interpolated raw. If the database username contains any URL-special characters (e.g. @, :, /), the resulting connection string would be malformed and the Celery result backend would fail to connect. Apply the same quote_plus encoding to settings.DB_USER for consistency and correctness, just as is done for settings.DB_PASSWORD.
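The fix reduces to encoding both credential components; here is a small sketch with illustrative values rather than the project's real `settings` module:

```python
# Both user and password are passed through quote_plus so URL-special
# characters (@ : /) cannot corrupt the DSN.
from urllib.parse import quote_plus

def build_result_backend(user: str, password: str, host: str, port: int, db_name: str) -> str:
    return (
        f"db+postgresql://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{db_name}"
    )
```

A username like `svc@acme` becomes `svc%40acme` in the connection string instead of prematurely terminating the credentials section.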
Greptile Summary

This PR introduces a pluggable async executor framework that replaces blocking Django HTTP calls to the prompt-service with a fire-and-forget Celery dispatch chain.

Key issues found during review:
Confidence Score: 2/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant FE as Frontend (PromptRun.jsx)
    participant DJV as Django View (views.py)
    participant PSH as PromptStudioHelper
    participant DISP as ExecutionDispatcher
    participant RMQ as RabbitMQ
    participant EW as Executor Worker (execute_extraction)
    participant LE as LegacyExecutor
    participant CBW as Callback Worker (prompt_studio_callback)
    participant SIO as Socket.IO
    participant FEH as Frontend (usePromptStudioSocket)
    FE->>DJV: POST /fetch_response/{tool_id}
    DJV->>PSH: build_fetch_response_payload() [ORM + extract + index - blocking]
    PSH-->>DJV: (ExecutionContext, cb_kwargs)
    DJV->>DISP: dispatch_with_callback(context, on_success=ide_prompt_complete, on_error=ide_prompt_error)
    DISP->>RMQ: send_task("execute_extraction", queue="celery_executor_legacy")
    DJV-->>FE: HTTP 202 {task_id, run_id, status:"accepted"}
    RMQ->>EW: execute_extraction(context_dict)
    EW->>LE: LegacyExecutor.execute(context)
    LE-->>EW: ExecutionResult
    EW-->>RMQ: result.to_dict() [Celery link callback triggered]
    RMQ->>CBW: ide_prompt_complete(result_dict, callback_kwargs)
    CBW->>CBW: OutputManagerHelper.handle_prompt_output_update() [ORM write]
    CBW->>SIO: _emit_websocket_event(room=log_events_id, event="prompt_studio_result")
    SIO-->>FEH: "prompt_studio_result" {status:"completed", operation, result}
    FEH->>FEH: handleCompleted() → updatePromptOutputState() + clearResultStatuses()
```
Prompt To Fix All With AI

This is a comment left during a code review.
Path: backend/prompt_studio/prompt_studio_core_v2/views.py
Line: 364-595
Comment:
**Missing feature flag gate on async endpoints**
The PR description states that all three IDE endpoints (`index_document`, `fetch_response`, `single_pass_extraction`) are gated behind the `async_prompt_execution` Flipt feature flag, with the old synchronous path preserved as a fallback when the flag is `OFF`. However, none of the three view methods contain any feature flag check — they unconditionally invoke the async/Celery path and return HTTP 202.
This means the breaking architectural change (fire-and-forget + Socket.IO result delivery) is deployed for **all users** regardless of the feature flag, directly contradicting the safe-rollout strategy described in the PR. When `async_prompt_execution=false`, users would still receive HTTP 202 with no result, because the old synchronous code path is never reached.
The sync fallback (e.g. delegating to the old `run_index_document` / `run_fetch_response` / `run_single_pass_extraction` Celery tasks or the direct helper methods) should be invoked when the flag is off.
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: backend/prompt_studio/prompt_studio_core_v2/views.py
Line: 597-629
Comment:
**`task_status` lacks task-ownership verification (IDOR risk)**
The endpoint looks up `task_id` directly in the Celery result backend without verifying that the task belongs to the tool identified by `pk`. A user who has legitimate access to any Prompt Studio tool can supply an arbitrary `task_id` from a different tool/user's execution and retrieve that execution's `result` (the full `ExecutionResult` dict, which may contain extracted document data).
For example:
```
GET /prompt-studio/<my_tool_pk>/task-status/<other_users_task_id>
```
The permission check only validates access to `pk` (via `IsOwnerOrSharedUserOrSharedToOrg`), not whether `task_id` was produced by operations on that tool.
Consider either (a) storing a `(tool_id, task_id)` mapping server-side and validating the lookup, or (b) returning only the task's `status` from this endpoint (omitting the full `result` payload, since the real result is already delivered via Socket.IO).
How can I resolve this? If you propose a fix, please make it concise.
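Option (a) above can be sketched with an owner map recorded at dispatch time; the dict is an in-memory stand-in for a DB or cache table, and `backend_lookup` is a placeholder for the Celery result-backend query:

```python
# Maps task_id -> tool_id at dispatch time so status lookups can be
# scoped to the tool the caller actually has access to.
_task_owner: dict[str, str] = {}

def register_task(task_id: str, tool_id: str) -> None:
    _task_owner[task_id] = tool_id

def get_task_status(task_id: str, tool_id: str, backend_lookup):
    # Reject lookups for tasks dispatched by a different tool (IDOR guard).
    if _task_owner.get(task_id) != tool_id:
        raise PermissionError("task does not belong to this tool")
    return backend_lookup(task_id)
```

The view would call `register_task` right after `dispatch_with_callback` and pass the `pk` from the URL into `get_task_status`, so an arbitrary `task_id` from another tool can never leak its result.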
---
This is a comment left during a code review.
Path: backend/prompt_studio/prompt_studio_core_v2/views.py
Line: 401-403
Comment:
**Redundant `import uuid as _uuid` inside method bodies**
`uuid` is already imported at the module level (line 2). The three identical inner imports (`import uuid as _uuid` in `index_document`, `fetch_response`, and `single_pass_extraction`) are redundant. Simply use the already-imported `uuid.uuid4()`.
```suggestion
executor_task_id = str(uuid.uuid4())
```
Last reviewed commit: 4200ac1
@@ -380,104 +384,249 @@ def index_document(self, request: HttpRequest, pk: Any = None) -> Response:
         document_id: str = serializer.validated_data.get(ToolStudioPromptKeys.DOCUMENT_ID)
         document: DocumentManager = DocumentManager.objects.get(pk=document_id)
         file_name: str = document.document_name
         # Generate a run_id
         run_id = CommonUtils.generate_uuid()

-        unique_id = PromptStudioHelper.index_document(
+        context, cb_kwargs = PromptStudioHelper.build_index_payload(
             tool_id=str(tool.tool_id),
             file_name=file_name,
             org_id=UserSessionUtils.get_organization_id(request),
             user_id=tool.created_by.user_id,
             document_id=document_id,
             run_id=run_id,
         )
-        if unique_id:
-            return Response(
-                {"message": "Document indexed successfully."},
-                status=status.HTTP_200_OK,
-            )
-        else:
-            logger.error("Error occured while indexing. Unique ID is not valid.")
-            raise IndexingAPIError()
+        dispatcher = PromptStudioHelper._get_dispatcher()
+
+        # Pre-generate task ID so callbacks can reference it
+        import uuid as _uuid
+
+        executor_task_id = str(_uuid.uuid4())
+        cb_kwargs["executor_task_id"] = executor_task_id
+
+        task = dispatcher.dispatch_with_callback(
+            context,
+            on_success=signature(
+                "ide_index_complete",
+                kwargs={"callback_kwargs": cb_kwargs},
+                queue="prompt_studio_callback",
+            ),
+            on_error=signature(
+                "ide_index_error",
+                kwargs={"callback_kwargs": cb_kwargs},
+                queue="prompt_studio_callback",
+            ),
+            task_id=executor_task_id,
+        )
+        return Response(
+            {"task_id": task.id, "run_id": run_id, "status": "accepted"},
+            status=status.HTTP_202_ACCEPTED,
+        )

     @action(detail=True, methods=["post"])
     def fetch_response(self, request: HttpRequest, pk: Any = None) -> Response:
         """API Entry point method to fetch response to prompt.

-        Args:
-            request (HttpRequest): _description_
+        Builds the full execution payload (ORM work), then fires a
+        single executor task with Celery link/link_error callbacks.

-        Raises:
-            FilenameMissingError: _description_
+        Args:
+            request (HttpRequest)

         Returns:
             Response
         """
         custom_tool = self.get_object()
         tool_id: str = str(custom_tool.tool_id)
         document_id: str = request.data.get(ToolStudioPromptKeys.DOCUMENT_ID)
-        id: str = request.data.get(ToolStudioPromptKeys.ID)
+        prompt_id: str = request.data.get(ToolStudioPromptKeys.ID)
         run_id: str = request.data.get(ToolStudioPromptKeys.RUN_ID)
-        profile_manager: str = request.data.get(ToolStudioPromptKeys.PROFILE_MANAGER_ID)
+        profile_manager_id: str = request.data.get(
+            ToolStudioPromptKeys.PROFILE_MANAGER_ID
+        )
         if not run_id:
             # Generate a run_id
             run_id = CommonUtils.generate_uuid()

-        # Check output count before prompt run for HubSpot notification
-        # Filter through tool FK to scope by organization (PromptStudioOutputManager
-        # lacks DefaultOrganizationManagerMixin)
-        output_count_before = PromptStudioOutputManager.objects.filter(
-            tool_id__in=CustomTool.objects.values_list("tool_id", flat=True)
-        ).count()
+        org_id = UserSessionUtils.get_organization_id(request)
+        user_id = custom_tool.created_by.user_id

-        response: dict[str, Any] = PromptStudioHelper.prompt_responder(
-            id=id,
-            tool_id=tool_id,
-            org_id=UserSessionUtils.get_organization_id(request),
-            user_id=custom_tool.created_by.user_id,
+        # Resolve prompt
+        prompt = ToolStudioPrompt.objects.get(pk=prompt_id)
+
+        # Build file path
+        doc_path = PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory(
+            org_id,
+            is_create=False,
+            user_id=user_id,
+            tool_id=str(custom_tool.tool_id),
+        )
+        document: DocumentManager = DocumentManager.objects.get(pk=document_id)
+        doc_path = str(Path(doc_path) / document.document_name)
+
+        context, cb_kwargs = PromptStudioHelper.build_fetch_response_payload(
+            tool=custom_tool,
+            doc_path=doc_path,
+            doc_name=document.document_name,
+            prompt=prompt,
+            org_id=org_id,
+            user_id=user_id,
             document_id=document_id,
             run_id=run_id,
-            profile_manager_id=profile_manager,
+            profile_manager_id=profile_manager_id,
         )

-        # Notify HubSpot about first prompt run
-        notify_hubspot_event(
-            user=request.user,
-            event_name="PROMPT_RUN",
-            is_first_for_org=output_count_before == 0,
-            action_label="prompt run",
+        # If document is being indexed, return pending status
+        if context is None:
+            return Response(cb_kwargs, status=status.HTTP_200_OK)
+
+        dispatcher = PromptStudioHelper._get_dispatcher()
+
+        import uuid as _uuid
+
+        executor_task_id = str(_uuid.uuid4())
+        cb_kwargs["executor_task_id"] = executor_task_id
+
+        task = dispatcher.dispatch_with_callback(
+            context,
+            on_success=signature(
+                "ide_prompt_complete",
+                kwargs={"callback_kwargs": cb_kwargs},
+                queue="prompt_studio_callback",
+            ),
+            on_error=signature(
+                "ide_prompt_error",
+                kwargs={"callback_kwargs": cb_kwargs},
+                queue="prompt_studio_callback",
+            ),
+            task_id=executor_task_id,
+        )
+        return Response(
+            {"task_id": task.id, "run_id": run_id, "status": "accepted"},
+            status=status.HTTP_202_ACCEPTED,
+        )

-        return Response(response, status=status.HTTP_200_OK)

     @action(detail=True, methods=["post"])
     def single_pass_extraction(self, request: HttpRequest, pk: uuid) -> Response:
-        """API Entry point method to fetch response to prompt.
+        """API Entry point method for single pass extraction.
+
+        Builds the full execution payload (ORM work), then fires a
+        single executor task with Celery link/link_error callbacks.

         Args:
-            request (HttpRequest): _description_
-            pk (Any): Primary key of the CustomTool
+            request (HttpRequest)
+            pk: Primary key of the CustomTool

         Returns:
             Response
         """
         # TODO: Handle fetch_response and single_pass_
         # extraction using common function
         custom_tool = self.get_object()
         tool_id: str = str(custom_tool.tool_id)
         document_id: str = request.data.get(ToolStudioPromptKeys.DOCUMENT_ID)
         run_id: str = request.data.get(ToolStudioPromptKeys.RUN_ID)
         if not run_id:
             # Generate a run_id
             run_id = CommonUtils.generate_uuid()
-        response: dict[str, Any] = PromptStudioHelper.prompt_responder(
-            tool_id=tool_id,
-            org_id=UserSessionUtils.get_organization_id(request),
-            user_id=custom_tool.created_by.user_id,

+        org_id = UserSessionUtils.get_organization_id(request)
+        user_id = custom_tool.created_by.user_id
+
+        # Build file path
+        doc_path = PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory(
+            org_id,
+            is_create=False,
+            user_id=user_id,
+            tool_id=str(custom_tool.tool_id),
+        )
+        document: DocumentManager = DocumentManager.objects.get(pk=document_id)
+        doc_path = str(Path(doc_path) / document.document_name)
+
+        # Fetch prompts eligible for single-pass extraction.
+        # Mirrors the filtering in _execute_prompts_in_single_pass:
+        # only active, non-NOTES, non-TABLE/RECORD prompts.
+        prompts = list(
+            ToolStudioPrompt.objects.filter(tool_id=custom_tool.tool_id).order_by(
+                "sequence_number"
+            )
+        )
+        prompts = [
+            p
+            for p in prompts
+            if p.prompt_type != ToolStudioPromptKeys.NOTES
+            and p.active
+            and p.enforce_type != ToolStudioPromptKeys.TABLE
+            and p.enforce_type != ToolStudioPromptKeys.RECORD
+        ]
+        if not prompts:
+            return Response(
+                {"error": "No active prompts found for single pass extraction."},
+                status=status.HTTP_400_BAD_REQUEST,
+            )
+
+        context, cb_kwargs = PromptStudioHelper.build_single_pass_payload(
+            tool=custom_tool,
+            doc_path=doc_path,
+            doc_name=document.document_name,
+            prompts=prompts,
+            org_id=org_id,
             document_id=document_id,
             run_id=run_id,
         )
-        return Response(response, status=status.HTTP_200_OK)

+        dispatcher = PromptStudioHelper._get_dispatcher()
+
+        import uuid as _uuid
+
+        executor_task_id = str(_uuid.uuid4())
+        cb_kwargs["executor_task_id"] = executor_task_id
+
+        task = dispatcher.dispatch_with_callback(
+            context,
+            on_success=signature(
+                "ide_prompt_complete",
+                kwargs={"callback_kwargs": cb_kwargs},
+                queue="prompt_studio_callback",
+            ),
+            on_error=signature(
+                "ide_prompt_error",
+                kwargs={"callback_kwargs": cb_kwargs},
+                queue="prompt_studio_callback",
+            ),
+            task_id=executor_task_id,
+        )
+        return Response(
+            {"task_id": task.id, "run_id": run_id, "status": "accepted"},
+            status=status.HTTP_202_ACCEPTED,
+        )
**Missing feature flag gate on async endpoints**
The PR description states that all three IDE endpoints (`index_document`, `fetch_response`, `single_pass_extraction`) are gated behind the `async_prompt_execution` Flipt feature flag, with the old synchronous path preserved as a fallback when the flag is `OFF`. However, none of the three view methods contain any feature flag check — they unconditionally invoke the async/Celery path and return HTTP 202.
This means the breaking architectural change (fire-and-forget + Socket.IO result delivery) is deployed for **all users** regardless of the feature flag, directly contradicting the safe-rollout strategy described in the PR. When `async_prompt_execution=false`, users would still receive HTTP 202 with no result, because the old synchronous code path is never reached.
The sync fallback (e.g. delegating to the old `run_index_document` / `run_fetch_response` / `run_single_pass_extraction` Celery tasks or the direct helper methods) should be invoked when the flag is off.
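A minimal sketch of the gating shape this comment asks for, with the flag value and both execution paths injected as plain callables — the real view would consult Flipt for `async_prompt_execution` and call the existing sync helper or the dispatcher, so every name below is illustrative:

```python
def run_prompt(flag_enabled: bool, sync_runner, async_dispatcher) -> dict:
    """Route a prompt run through the async path only when the flag is on."""
    if not flag_enabled:
        # Old synchronous path: block and return the result with HTTP 200.
        return {"status_code": 200, "body": sync_runner()}
    # New fire-and-forget path: dispatch and return HTTP 202 with a task id;
    # the actual result is delivered later via Socket.IO.
    task_id = async_dispatcher()
    return {"status_code": 202, "body": {"task_id": task_id, "status": "accepted"}}
```

With the flag off, callers keep getting the old 200-with-result contract; with it on, they get the 202-accepted contract described in the PR.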
    @action(detail=True, methods=["get"])
    def task_status(
        self, request: HttpRequest, pk: Any = None, task_id: str = None
    ) -> Response:
        """Poll the status of an async Prompt Studio task.

        Task IDs now point to executor worker tasks dispatched via the
        worker-v2 Celery app. Both apps share the same PostgreSQL
        result backend, so we use the worker app to look up results.

        Args:
            request (HttpRequest)
            pk: Primary key of the CustomTool (for permission check)
            task_id: Celery task ID returned by the 202 response

        Returns:
            Response with {task_id, status} and optionally result or error
        """
        from celery.result import AsyncResult

        from backend.worker_celery import get_worker_celery_app

        result = AsyncResult(task_id, app=get_worker_celery_app())
        if not result.ready():
            return Response({"task_id": task_id, "status": "processing"})
        if result.successful():
            return Response(
                {"task_id": task_id, "status": "completed", "result": result.result}
            )
        return Response(
            {"task_id": task_id, "status": "failed", "error": str(result.result)},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
**`task_status` lacks task-ownership verification (IDOR risk)**
The endpoint looks up `task_id` directly in the Celery result backend without verifying that the task belongs to the tool identified by `pk`. A user who has legitimate access to any Prompt Studio tool can supply an arbitrary `task_id` from a different tool/user's execution and retrieve that execution's `result` (the full `ExecutionResult` dict, which may contain extracted document data).
For example:
    GET /prompt-studio/<my_tool_pk>/task-status/<other_users_task_id>
The permission check only validates access to `pk` (via `IsOwnerOrSharedUserOrSharedToOrg`), not whether `task_id` was produced by operations on that tool.
Consider either (a) storing a `(tool_id, task_id)` mapping server-side and validating the lookup, or (b) returning only the task's `status` from this endpoint (omitting the full `result` payload, since the real result is already delivered via Socket.IO).
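Option (a) can be sketched with an in-memory dict standing in for a shared store such as Django's cache (which a multi-process deployment would need); both helper names here are hypothetical:

```python
# In-memory stand-in for a shared store (real code would use a cross-process
# cache with a TTL). Keys are task IDs, values are the owning tool IDs.
_task_owner: dict[str, str] = {}

def record_task(tool_id: str, task_id: str) -> None:
    """Call at dispatch time, right after dispatch_with_callback() returns."""
    _task_owner[task_id] = tool_id

def authorize_status_lookup(tool_id: str, task_id: str) -> bool:
    """Call in task_status before touching the result backend; unknown or
    foreign task IDs are rejected rather than looked up."""
    return _task_owner.get(task_id) == tool_id
```

The status endpoint would then return 404 (not 403, to avoid confirming the task exists) whenever `authorize_status_lookup` is false.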
        import uuid as _uuid

        executor_task_id = str(_uuid.uuid4())
**Redundant `import uuid as _uuid` inside method bodies**
`uuid` is already imported at the module level (line 2). The three identical inner imports (`import uuid as _uuid` in `index_document`, `fetch_response`, and `single_pass_extraction`) are redundant. Simply use the already-imported `uuid.uuid4()`.
Suggested change:
-        import uuid as _uuid
-        executor_task_id = str(_uuid.uuid4())
+        executor_task_id = str(uuid.uuid4())
        profile_manager = prompt.profile_manager
        if profile_manager_id:
            profile_manager = ProfileManagerHelper.get_profile_manager(
                profile_manager_id=profile_manager_id
            )

        monitor_llm, challenge_llm = PromptStudioHelper._resolve_llm_ids(tool)

        PromptStudioHelper.validate_adapter_status(profile_manager)
        PromptStudioHelper.validate_profile_manager_owner_access(profile_manager)

        if not profile_manager:
            raise DefaultProfileError()
**Null guard after the variable is already dereferenced**
`validate_adapter_status(profile_manager)` and `validate_profile_manager_owner_access(profile_manager)` are both called **before** the `if not profile_manager` guard. If `profile_manager` is `None` (e.g. when `prompt.profile_manager` is unset and no `profile_manager_id` is passed), those helper calls will raise an `AttributeError` inside them, not the intended `DefaultProfileError`. The guard at line 531–532 is effectively dead code for the `None` case.
The null check should be moved to immediately after `profile_manager` is resolved:

    profile_manager = prompt.profile_manager
    if profile_manager_id:
        profile_manager = ProfileManagerHelper.get_profile_manager(
            profile_manager_id=profile_manager_id
        )
    if not profile_manager:
        raise DefaultProfileError()
    # Only then call validators
    PromptStudioHelper.validate_adapter_status(profile_manager)
    PromptStudioHelper.validate_profile_manager_owner_access(profile_manager)
        default_profile = ProfileManager.get_default_llm_profile(tool)

        challenge_llm_instance: AdapterInstance | None = tool.challenge_llm
        challenge_llm: str | None = None
        if challenge_llm_instance:
            challenge_llm = str(challenge_llm_instance.id)
        else:
            challenge_llm = str(default_profile.llm.id)

        PromptStudioHelper.validate_adapter_status(default_profile)
        PromptStudioHelper.validate_profile_manager_owner_access(default_profile)
        default_profile.chunk_size = 0

        if not default_profile:
            raise DefaultProfileError()
**Null guard on `default_profile` comes after it is already used**
`default_profile.chunk_size = 0` mutates the object **before** the `if not default_profile: raise DefaultProfileError()` check. If `ProfileManager.get_default_llm_profile(tool)` returns `None`, the assignment at line 744 would raise `AttributeError` rather than the intended `DefaultProfileError`. The guard is dead code for the `None` case.
Move the null check to immediately after `default_profile` is assigned (before the validators and the `chunk_size` assignment):

    default_profile = ProfileManager.get_default_llm_profile(tool)
    if not default_profile:
        raise DefaultProfileError()
    PromptStudioHelper.validate_adapter_status(default_profile)
    PromptStudioHelper.validate_profile_manager_owner_access(default_profile)
    default_profile.chunk_size = 0
    app.conf.update(
        result_backend=result_backend,
        task_queues=[Queue("executor")],
        task_serializer="json",
        accept_content=["json"],
        result_serializer="json",
        result_extended=True,
    )
**Configured queue name `"executor"` doesn't match the actual dispatch queue**
`get_worker_celery_app()` registers `task_queues=[Queue("executor")]`, but `ExecutionDispatcher._get_queue()` (in `sdk1/execution/dispatcher.py`) constructs the actual queue name as `celery_executor_{executor_name}` — for the legacy executor this becomes `"celery_executor_legacy"`.
The queue declared on the app (`"executor"`) never matches the queue used by `send_task`, so this `task_queues` setting has no practical effect. While `send_task` with an explicit `queue` parameter bypasses queue routing and the task is delivered correctly, the misconfigured `task_queues` setting means any queue-routing policies (e.g. prefetch limits, fair scheduling) configured on `"executor"` will not apply.
Either align the queue name to `"celery_executor_legacy"` (or the appropriate prefix), or remove the stale `task_queues` declaration from this app's config if it is intentionally unused.
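A small sketch of the alignment option, assuming the `celery_executor_{executor_name}` naming convention quoted above; `build_queue_names` is a hypothetical helper, and the real config would wrap each name in a kombu `Queue` before assigning it to `app.conf.task_queues`:

```python
def executor_queue_name(executor_name: str) -> str:
    # Mirrors the ExecutionDispatcher._get_queue() convention described
    # in the comment above: celery_executor_{executor_name}.
    return f"celery_executor_{executor_name}"

def build_queue_names(executor_names: list[str]) -> list[str]:
    # One queue per registered executor, instead of the stale "executor".
    # Real code: task_queues=[Queue(n) for n in build_queue_names(...)]
    return [executor_queue_name(name) for name in executor_names]
```

Deriving both the declaration and the dispatch target from the same function keeps them from drifting apart again.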

What

Introduces a pluggable executor system that replaces Docker-container-based tool execution with Celery worker tasks, and migrates the Prompt Studio IDE to an async execution model using Socket.IO for result delivery. Gated behind the `async_prompt_execution` feature flag for safe rollout.

Why

The existing architecture has several limitations:

How

Backend (65 files)

- `index_document`, `fetch_response`, `single_pass_extraction` now return HTTP 202 (accepted) with a `task_id` instead of blocking. Gated by the `async_prompt_execution` feature flag — old sync path preserved as fallback
- `backend/prompt_studio/prompt_studio_core_v2/tasks.py`: `ide_index_complete`, `ide_prompt_complete`, `ide_prompt_error` etc. run on the `prompt_studio_callback` queue, perform ORM writes via `OutputManagerHelper`, and emit `prompt_studio_result` Socket.IO events
- `backend/backend/worker_celery.py`: a second Celery app instance that coexists with Django's Celery app, configured to route tasks to executor workers
- `prompt_studio_helper.py` rewrite: removed `PromptTool` HTTP calls entirely. New `build_index_payload()`, `build_fetch_response_payload()`, `build_single_pass_payload()` methods construct `ExecutionContext` objects with all ORM data pre-loaded
- Removed `backend/backend/workers/`, `file_execution_tasks.py`, `celery_task.py` (old in-process workers)

Workers (70 files, ~19,500 new lines)

- `workers/executor/`: new `WorkerType.EXECUTOR` Celery worker with `LegacyExecutor` handling all operations: `extract`, `index`, `answer_prompt`, `single_pass_extraction`, `summarize`, `agentic_extraction`, `structure_pipeline`
- `BaseExecutor` → `ExecutorRegistry` (class-decorator self-registration) → `ExecutionOrchestrator` → `ExecutionDispatcher` (Celery `send_task`)
- `ExecutorToolShim`: lightweight stand-in for `BaseTool` that satisfies SDK1 adapter interfaces without Docker context
- `workers/file_processing/structure_tool_task.py`: Celery-native replacement for Docker-based `StructureTool.run()` with profile overrides, smart table detection, and output file management

SDK1 (22 files)

- `unstract/sdk1/src/unstract/sdk1/execution/`: `ExecutionContext`, `ExecutionResult` (serializable DTOs for Celery JSON transport), `ExecutionDispatcher` (`dispatch()` + `dispatch_with_callback()`), `BaseExecutor`, `ExecutorRegistry`

Frontend (275 files)

- `usePromptStudioSocket` hook listens for `prompt_studio_result` Socket.IO events. `usePromptRun` rewritten from polling to fire-and-forget. `PromptRun.jsx` conditionally renders async or sync path based on feature flag

Docker / Infrastructure

- New services: `worker-executor-v2`, `worker-prompt-studio-callback`, `worker-metrics`
- `workers-v2` services moved from opt-in (`profiles: [workers-v2]`) to default

Architecture Change
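At the heart of the change is a serializable DTO that crosses the Celery boundary as JSON. A toy `ExecutionContext` illustrates the round-trip contract (this dataclass is an illustrative subset, not the real SDK1 type, which carries far more fields):

```python
from dataclasses import dataclass, asdict

@dataclass
class ExecutionContext:
    # Illustrative subset of fields; the real DTO is much richer.
    operation: str
    tool_id: str
    run_id: str

    def to_dict(self) -> dict:
        # JSON-safe payload handed to Celery's send_task
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict) -> "ExecutionContext":
        # Reconstructed on the executor-worker side
        return cls(**data)
```

The invariant reviewers should check in the real code is that `from_dict(to_dict(ctx))` reproduces `ctx` exactly for every field.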
Can this PR break any existing features? If yes, please list possible items. If no, please explain why.

Yes, potential breaking changes — mitigated by feature flag:

- Prompt Studio IDE async path — gated by the `async_prompt_execution` feature flag. When the flag is OFF (default), all 3 endpoints (`index_document`, `fetch_response`, `single_pass_extraction`) use the old sync path returning HTTP 200. No behavior change for existing users.

Review Guidelines
This PR touches 441 files across backend, frontend, workers, and SDK1. Below is a structured review path to navigate it efficiently.

Code Structure Overview

Recommended Review Order

Review in dependency order — each layer builds on the previous:

1. `execution/context.py`, `result.py`, `dispatcher.py`, `registry.py` — are `to_dict()`/`from_dict()` round-trips correct? Is the `Operation` enum complete? Queue naming (`celery_executor_{name}`).
2. `executor/tasks.py`, `executor/worker.py` — `execute_extraction`: retry policy, error handling, log correlation.
3. `executors/legacy_executor.py` (focus on `_OPERATION_MAP` + `execute()`)
4. `answer_prompt.py`, `index.py`, `retrieval.py` — do `executor_params` match what `build_*_payload()` sends? Lazy import pattern (`_get_prompt_deps()`, `_get_indexing_deps()`).
5. `views.py` lines 351–583 — `dispatch_with_callback` usage with correct callback task names and queue.
6. `prompt_studio_helper.py` (`build_index_payload`, `build_fetch_response_payload`, `build_single_pass_payload`) — `executor_params`? Key compatibility with executor handlers.
7. `tasks.py` (callback tasks) — `ide_prompt_complete`: ORM writes via `OutputManagerHelper`. Socket.IO emission shape. Error callback cleanup. State store setup/teardown.
8. `usePromptRun.js`, `usePromptStudioSocket.js`, `PromptRun.jsx` — `_emit_result()`. Timeout handling. Status cleanup on failure.
9. `docker/docker-compose.yaml` — `worker-executor-v2`, `worker-prompt-studio-callback`. Removed old workers. Queue bindings.
10. `workers/tests/test_sanity_phase*.py`

Data Flow (End-to-End)
Known Code Duplication

- `views.py` — 3 view actions follow the same `build_payload → get_dispatcher → dispatch_with_callback → return 202` shape
- `tasks.py` — callback tasks `ide_index_complete` and `ide_prompt_complete` follow the same structure: extract kwargs → setup state → check result → ORM work → emit → cleanup
- `tasks.py` — legacy tasks `run_index_document`, `run_fetch_response`, `run_single_pass_extraction` kept alongside new callback tasks

Files Safe to Skim

- `workers/tests/` — 24 test files, ~10,000 lines. Well-structured but high volume. Focus on `test_sanity_phase2.py` (full Celery chain) and `test_sanity_phase4.py` (IDE payload compatibility) as representative examples.
- `workers/executor/executors/retrievers/` — 7 retriever implementations. All follow the same pattern; reviewing one (`simple.py`) covers the pattern.
- `architecture-*.md`, `phase*.md` — reference material, not code.

Relevant Docs

- `architecture-executor-system.md`, `architecture-flow-diagram.md`, `architecture-sequence-diagrams.md` in repo root
- `architecture-migration-phases.md`
- `rollout-plan.md`

Related Issues or PRs
Dependencies Versions / Env Variables

New env variables:

- `FLIPT_SERVICE_AVAILABLE` (default: `false`)

Notes on Testing

- `cd workers && uv run pytest -v` — 490+ tests (444 in `workers/tests/` + extras)
- `cd unstract/sdk1 && uv run pytest -v` — 146+ tests
- `cd backend && python -m pytest prompt_studio/prompt_studio_core_v2/test_tasks.py -v`
- Manual: with the flag enabled (`async_prompt_execution=true`), trigger prompt runs in IDE, verify Socket.IO events deliver results via Network → WS → Messages tab
N/A (primarily backend/worker architecture change; frontend UX unchanged when feature flag is off)
Checklist
I have read and understood the Contribution Guidelines.