Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
393 changes: 393 additions & 0 deletions src/unstract/clone/client.py

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions src/unstract/clone/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,20 @@ class CloneContext:
# touches them once per endpoint, never per resource).
share_cache: dict[str, Any] = field(default_factory=dict)
share_cache_lock: threading.Lock = field(default_factory=threading.Lock)
# Capability-probe memo: (id(client), feature_path) -> present?. Each
# (deployment, feature) pair is probed once, so cloud-phase gating costs
# one GET per pair rather than one per lookup.
probe_cache: dict[tuple[int, str], bool] = field(default_factory=dict)

def feature_present(self, client: "PlatformClient", path: str) -> bool:
    """Report whether the feature behind ``path`` is installed on ``client``.

    ``path`` is the feature's list endpoint. The answer is stored in
    ``probe_cache`` under ``(id(client), path)``, so ``client.probe`` is
    only called when no answer has been recorded for that key yet.

    The cache is a plain dict without a lock: probing runs in the
    single-threaded orchestrator loop, before any parallel_map fan-out.
    """
    memo_key = (id(client), path)
    known = self.probe_cache.get(memo_key)
    if known is None:
        # Nothing recorded for this (client, path) pair: probe and remember.
        known = client.probe(path)
        self.probe_cache[memo_key] = known
    return known
48 changes: 48 additions & 0 deletions src/unstract/clone/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
from unstract.clone.exceptions import CloneError
from unstract.clone.phases import (
AdapterPhase,
AgenticStudioPhase,
APIDeploymentPhase,
ConnectorPhase,
CustomToolPhase,
FilesPhase,
GroupPhase,
LookupsPhase,
ManualReviewPhase,
PipelinePhase,
TagPhase,
ToolInstancePhase,
Expand All @@ -46,18 +49,58 @@
# Registry of clone phases as (phase name, phase class) pairs.
# NOTE(review): the per-entry notes below describe what each cloud-only phase
# depends on, which suggests list order is execution order — confirm against
# the loop in ``clone``.
PHASES: list[tuple[str, type[Phase]]] = [
("group", GroupPhase),
("adapter", AdapterPhase),
# Cloud-only. Standalone (own project + registry), but holds foreign keys
# to four adapters, so it is listed after "adapter".
# Probe-gated: auto-skips on OSS deployments via ``probe_path``.
("agentic_studio", AgenticStudioPhase),
("connector", ConnectorPhase),
("tag", TagPhase),
("custom_tool", CustomToolPhase),
("files", FilesPhase),
# Cloud-only. Consumes custom_tool's prompt + adapter remaps, so it is
# listed after "custom_tool".
# Probe-gated: auto-skips on OSS deployments via ``probe_path``.
("lookups", LookupsPhase),
("workflow", WorkflowPhase),
# Cloud-only. Holds a foreign key to the workflow (RuleEngine /
# HITLSettings bind to it), so it is listed after "workflow".
# Probe-gated: auto-skips on OSS deployments via ``probe_path``.
("manual_review", ManualReviewPhase),
("tool_instance", ToolInstancePhase),
("workflow_endpoint", WorkflowEndpointPhase),
("pipeline", PipelinePhase),
("api_deployment", APIDeploymentPhase),
]


def _cloud_phase_runnable(
    ctx: CloneContext, report: CloneReport, name: str, probe_path: str
) -> bool:
    """Return True when cloud-only phase ``name`` should run for this pair.

    The source deployment is probed first; the target is probed only when
    the source has the feature. Outcomes:

    * feature absent on source: debug log only, returns False;
    * probe raises: warning logged and appended to ``report.warnings``,
      returns False (a probe failure must not abort an otherwise-fine run);
    * feature on source but not on target: warning logged and appended to
      ``report.warnings``, returns False;
    * feature on both: returns True.

    This function never raises for probe errors.
    """
    skip_reason = None
    try:
        if ctx.feature_present(ctx.source, probe_path):
            on_target = ctx.feature_present(ctx.target, probe_path)
        else:
            # OSS source: behave exactly as if this phase didn't exist.
            logger.debug("Phase '%s' skipped: feature absent on source", name)
            return False
    except Exception as exc:
        # Unexpected status / transport error: handled like target-absent.
        skip_reason = f"Phase '{name}' skipped: capability probe failed ({exc})"
    else:
        if not on_target:
            skip_reason = (
                f"Phase '{name}' skipped: feature present on source but not on "
                "target deployment"
            )

    if skip_reason is None:
        return True

    # Both warn-and-skip outcomes are surfaced in the log and in the report.
    logger.warning(skip_reason)
    report.warnings.append(skip_reason)
    return False


def clone(
source: OrgEndpoint,
target: OrgEndpoint,
Expand Down Expand Up @@ -93,6 +136,11 @@ def clone(
report.skipped_phases.append(name)
logger.info("Phase '%s' skipped (excluded)", name)
continue
probe_path = getattr(phase_cls, "probe_path", None)
if probe_path is not None and not _cloud_phase_runnable(
ctx, report, name, probe_path
):
continue
logger.info("=== Phase: %s ===", name)
phase_started = time.perf_counter()
try:
Expand Down
6 changes: 6 additions & 0 deletions src/unstract/clone/phases/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@
"""

from unstract.clone.phases.adapter import AdapterPhase
from unstract.clone.phases.agentic_studio import AgenticStudioPhase
from unstract.clone.phases.api_deployment import APIDeploymentPhase
from unstract.clone.phases.base import Phase
from unstract.clone.phases.connector import ConnectorPhase
from unstract.clone.phases.custom_tool import CustomToolPhase
from unstract.clone.phases.files import FilesPhase
from unstract.clone.phases.group import GroupPhase
from unstract.clone.phases.lookups import LookupsPhase
from unstract.clone.phases.manual_review import ManualReviewPhase
from unstract.clone.phases.pipeline import PipelinePhase
from unstract.clone.phases.tag import TagPhase
from unstract.clone.phases.tool_instance import ToolInstancePhase
Expand All @@ -23,10 +26,13 @@
__all__ = [
"APIDeploymentPhase",
"AdapterPhase",
"AgenticStudioPhase",
"ConnectorPhase",
"CustomToolPhase",
"FilesPhase",
"GroupPhase",
"LookupsPhase",
"ManualReviewPhase",
"Phase",
"PipelinePhase",
"TagPhase",
Expand Down
Loading
Loading