Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions backend/api_v2/deployment_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,17 @@ def execute_workflow(
f"API hub header caching failed for execution {execution_id}: {e}"
)

hash_values_of_files = SourceConnector.add_input_file_to_api_storage(
pipeline_id=pipeline_id,
workflow_id=workflow_id,
execution_id=execution_id,
file_objs=file_objs,
use_file_history=use_file_history,
)

try:
# Staging runs synchronously (before async dispatch). Keep it inside the
# try so a staging failure marks the execution ERROR instead of leaving
# the PENDING row created above stuck/running forever in the UI (UN-3647).
hash_values_of_files = SourceConnector.add_input_file_to_api_storage(
pipeline_id=pipeline_id,
workflow_id=workflow_id,
execution_id=execution_id,
file_objs=file_objs,
use_file_history=use_file_history,
)
result = WorkflowHelper.execute_workflow_async(
workflow_id=workflow_id,
pipeline_id=pipeline_id,
Expand Down Expand Up @@ -287,6 +289,13 @@ def execute_workflow(
if not include_metrics:
result.remove_result_metrics()
except Exception as error:
# Mark the execution ERROR so it doesn't appear stuck/PENDING in the UI.
# The async-dispatch path marks it internally (execute_workflow_async),
# but synchronous failures (e.g. staging) bypass that, so do it here.
WorkflowExecutionServiceHelper.update_execution_err(
str(execution_id), str(error)
)
Comment thread
athul-rs marked this conversation as resolved.
Outdated

# Release rate limit slot (workflow setup/dispatch failed, async job not started)
APIDeploymentRateLimiter.release_slot(api.organization, str(execution_id))

Expand Down
Empty file.
151 changes: 151 additions & 0 deletions backend/api_v2/tests/test_deployment_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
"""Regression test for UN-3647.

When an API-deployment run fails synchronously at the "Staging files in API
storage" step (``SourceConnector.add_input_file_to_api_storage``, before async
dispatch), the PENDING ``WorkflowExecution`` row created earlier must be marked
ERROR — otherwise the UI shows the run as stuck/running forever.

Before the fix, staging sat *outside* the try/except in
``DeploymentHelper.execute_workflow``, so a staging exception propagated out of
the method and the row stayed PENDING. The fix moves staging inside the try and
has the except branch call ``update_execution_err``.

Like ``usage_v2/tests/test_helper.py``, this test does not require a live Django
database (the backend test env has no ``pytest-django`` / no DB). It stubs the
module's cross-app imports in ``sys.modules`` *before* importing the helper, so
the real ``execute_workflow`` control flow is exercised with the collaborators
mocked. Runnable under pytest or directly: ``python3 test_deployment_helper.py``.
"""

from __future__ import annotations

import sys
import types
from pathlib import Path
from unittest.mock import MagicMock

# Ensure the backend dir (which holds the ``api_v2`` package) is importable when
# this file is run directly, not just under pytest's rootdir.
_BACKEND_DIR = Path(__file__).resolve().parents[2]
if str(_BACKEND_DIR) not in sys.path:
sys.path.insert(0, str(_BACKEND_DIR))


class _AutoModule(types.ModuleType):
"""Module whose attribute access lazily returns (and caches) a MagicMock."""

def __getattr__(self, name: str) -> MagicMock:
mock = MagicMock(name=f"{self.__name__}.{name}")
setattr(self, name, mock)
return mock


def _install_stub(dotted: str) -> None:
"""Register an ``_AutoModule`` for ``dotted`` and every parent prefix.

Existing entries (e.g. the real, empty ``api_v2`` package) are preserved so
the real ``api_v2.deployment_helper`` submodule can still be imported.
"""
parts = dotted.split(".")
for i in range(1, len(parts) + 1):
prefix = ".".join(parts[:i])
if prefix not in sys.modules:
sys.modules[prefix] = _AutoModule(prefix)


def _load_deployment_helper():
"""Stub cross-app imports, then import and return the real helper module."""
import api_v2 # real, empty package — keep it so the submodule resolves

assert not isinstance(api_v2, _AutoModule), "api_v2 must be the real package"

for dotted in (
"requests",
"configuration.config_registry",
"configuration.models",
"django.conf",
"django.core.files.uploadedfile",
"plugins.workflow_manager.workflow_v2.api_hub_usage_utils",
"rest_framework.request",
"rest_framework.serializers",
"rest_framework.utils.serializer_helpers",
"tags.models",
"usage_v2.helper",
"utils.constants",
"utils.local_context",
"workflow_manager.endpoint_v2.destination",
"workflow_manager.endpoint_v2.source",
"workflow_manager.workflow_v2.dto",
"workflow_manager.workflow_v2.enums",
"workflow_manager.workflow_v2.execution",
"workflow_manager.workflow_v2.models",
"workflow_manager.workflow_v2.workflow_helper",
"api_v2.api_key_validator",
"api_v2.dto",
"api_v2.exceptions",
"api_v2.key_helper",
"api_v2.models",
"api_v2.rate_limiter",
"api_v2.serializers",
"api_v2.utils",
):
_install_stub(dotted)

# ``class DeploymentHelper(BaseAPIKeyValidator)`` needs a real base class,
# not a MagicMock instance.
sys.modules["api_v2.api_key_validator"].BaseAPIKeyValidator = type(
"BaseAPIKeyValidator", (), {}
)

import api_v2.deployment_helper as helper

return helper


def _run_staging_failure() -> None:
"""A staging failure marks the execution ERROR instead of leaving it PENDING."""
helper = _load_deployment_helper()

# Known execution id so we can assert it is the one marked ERROR.
execution_row = MagicMock()
execution_row.id = "exec-123"
helper.WorkflowExecutionServiceHelper.create_workflow_execution.return_value = (
execution_row
)

# Simulate the synchronous staging failure (e.g. the Moody's S3/MinIO 403).
helper.SourceConnector.add_input_file_to_api_storage.side_effect = RuntimeError(
"boom"
)

api = MagicMock()
api.workflow.id = "wf-1"
api.id = "pipe-1"

# Must NOT raise — the failure should be handled, not propagated.
helper.DeploymentHelper.execute_workflow(
organization_name="org",
api=api,
file_objs=[],
timeout=-1,
)

# The PENDING row is marked ERROR with the surfaced reason.
helper.WorkflowExecutionServiceHelper.update_execution_err.assert_called_once_with(
"exec-123", "boom"
)
# And the slot/storage cleanup still runs.
helper.APIDeploymentRateLimiter.release_slot.assert_called_once()
helper.DestinationConnector.delete_api_storage_dir.assert_called_once()

# Async dispatch is never reached when staging fails.
helper.WorkflowHelper.execute_workflow_async.assert_not_called()


def test_staging_failure_marks_execution_error() -> None:
_run_staging_failure()


if __name__ == "__main__":
_run_staging_failure()
print("OK: staging failure marks execution ERROR (UN-3647)")
Loading