Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions workers/executor/executors/legacy_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,10 +619,12 @@ def _failure(child_result: ExecutionResult) -> ExecutionResult:
)
step = 1

extraction_metrics: dict = {}
try:
# ---- Step 1: Extract ----
if not skip_extraction:
step += 1
extraction_start = time.monotonic()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clock-source consistency: extraction times the step with time.monotonic(), while the sibling indexing metric uses wall-clock datetime.datetime.now() (L1029). time.monotonic() is the correct choice for duration measurement (immune to NTP / system-clock adjustments), so this code is right — but the two producers of "time_taken(s)" now use different clocks. Worth a follow-up to align the indexing path onto monotonic() too.

extract_ctx = ExecutionContext(
executor_name=context.executor_name,
operation=Operation.EXTRACT.value,
Expand All @@ -640,6 +642,9 @@ def _failure(child_result: ExecutionResult) -> ExecutionResult:
return _failure(extract_result)
_absorb(extract_result)
extracted_text = extract_result.data.get(IKeys.EXTRACTED_TEXT, "")
extraction_metrics = {
"extraction": {"time_taken(s)": time.monotonic() - extraction_start}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maintainability (distinct from the key-collision note on L647): this hardcodes the literal "extraction", but PromptServiceConstants.EXTRACTION = "extraction" already exists and is imported as PSKeys in this file — use PSKeys.EXTRACTION so the metric key and the constant can't drift. Separately, "time_taken(s)" is duplicated here and at L1031 (index_metrics); consider promoting it to a shared constant.

Minor readability: for parity with the indexing path (elapsed = ...; index_metrics[output_name] = {"indexing": {"time_taken(s)": elapsed}}, L1029-1031), capture the duration in a named local first:

extraction_time = time.monotonic() - extraction_start
extraction_metrics = {PSKeys.EXTRACTION: {"time_taken(s)": extraction_time}}

}
Comment on lines +645 to +647

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Avoid top-level metrics key collision with user prompt names

"extraction" is currently injected as a normal top-level metrics key (Line [645]), but top-level metrics keys are also used for output/prompt names. If a user defines an output named "extraction", the merge at Line [804] will silently co-mingle unrelated metrics under the same key.

Use a reserved namespace for pipeline-level metrics (e.g., metrics["_pipeline"]["extraction"]) or enforce/reserve "extraction" as a disallowed output name before merge.

Also applies to: 804-811

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@workers/executor/executors/legacy_executor.py` around lines 645 - 647, The
current extraction_metrics dict uses a top-level key "extraction" which can
collide with user-defined output/prompt names when merged later; update this to
use a reserved pipeline namespace (e.g., set extraction_metrics = {"_pipeline":
{"extraction": {"time_taken(s)": ...}}}) and then merge into the main metrics
map so pipeline-level metrics live under metrics["_pipeline"]; adjust the merge
logic where metrics and extraction_metrics are combined (the existing merge
around the metrics variable at lines ~804-811) to preserve the "_pipeline"
namespace, or alternatively add a pre-merge check to disallow user outputs named
"extraction" if you prefer the blocking approach. Ensure you update references
to extraction_metrics and the merge operation accordingly.


# ---- Step 2: Summarize (if enabled) ----
if is_summarization:
Expand Down Expand Up @@ -700,6 +705,7 @@ def _failure(child_result: ExecutionResult) -> ExecutionResult:
source_file_name=source_file_name,
extracted_text=extracted_text,
index_metrics=index_metrics,
extraction_metrics=extraction_metrics,
)

output_map = structured_output.get(PSKeys.OUTPUT, {}) or {}
Expand Down Expand Up @@ -787,17 +793,21 @@ def _finalize_pipeline_result(
source_file_name: str,
extracted_text: str,
index_metrics: dict,
extraction_metrics: dict | None = None,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new extraction_metrics param isn't reflected in the docstring. Its shape differs from index_metrics in a load-bearing way: extraction_metrics is a top-level {"extraction": {"time_taken(s)": float}} (a sibling of per-output keys), whereas index_metrics is keyed per output {output_name: {"indexing": {...}}}. A short Args: note documenting both shapes (and that this is None/empty when extraction is skipped) would stop a future maintainer from incorrectly nesting/merging them. Consider typing it as dict[str, dict] | None or a small TypedDict rather than bare dict.

) -> None:
"""Populate metadata/metrics in structured_output after pipeline completion."""
if "metadata" not in structured_output:
structured_output["metadata"] = {}
structured_output["metadata"]["file_name"] = source_file_name
if extracted_text:
structured_output["metadata"]["extracted_text"] = extracted_text
if index_metrics:
new_metrics = self._merge_pipeline_metrics(
index_metrics or {}, extraction_metrics or {}
)
if new_metrics:
existing_metrics = structured_output.get("metrics", {})
structured_output["metrics"] = self._merge_pipeline_metrics(
existing_metrics, index_metrics
existing_metrics, new_metrics
)

def _run_pipeline_summarize(
Expand Down
Loading