Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/unstract/clone/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -921,3 +921,45 @@ def list_agentic_registries(
params["agentic_project"] = agentic_project
result = self._request("GET", "agentic-studio-registry/", params=params)
return result if isinstance(result, list) else (result or {}).get("results", [])

def list_agentic_documents(self, project_id: str) -> list[dict[str, Any]]:
"""List a project's uploaded documents. Rows carry ``id`` and
``original_filename``. Agentic docs are a store of their own, distinct
from Prompt Studio ``prompt-document`` rows.
"""
result = self._request(
"GET", "agentic/documents/", params={"project_id": project_id}
)
return result if isinstance(result, list) else (result or {}).get("results", [])

def download_agentic_document(self, document_id: str) -> bytes:
"""Download an agentic document's original bytes.

The ``file`` route serves the raw binary (not a JSON envelope), so this
bypasses the JSON-decoding request path.
"""
url = self._url(f"agentic/documents/{document_id}/file/")
logger.debug("GET %s", url)
resp = self._session.get(url, timeout=self.timeout, verify=self.verify)
if not 200 <= resp.status_code < 300:
raise PlatformAPIError(
f"GET agentic/documents/{document_id}/file/ "
f"returned {resp.status_code}",
status_code=resp.status_code,
body=resp.text[:2000],
)
return resp.content

def upload_agentic_document(
self, project_id: str, file_name: str, data: bytes, mime_type: str
) -> dict[str, Any]:
"""Upload a document into a target agentic project.

Creates the ``AgenticDocument`` row; extraction/summary stay a UI step,
as with Prompt Studio uploads. Callers pre-check filenames to avoid
duplicates on re-runs.
"""
files = {"file": (file_name, data, mime_type)}
return self._request(
"POST", f"agentic/projects/{project_id}/documents/upload/", files=files
)
109 changes: 107 additions & 2 deletions src/unstract/clone/phases/agentic_studio.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from __future__ import annotations

import logging
import mimetypes
import threading
from typing import Any

Expand Down Expand Up @@ -170,6 +171,7 @@ def _clone_project(
self._replicate_share(src, name, tgt_project_id, result, lock)
self._clone_prompt_versions(name, src_project_id, tgt_project_id, result, lock)
self._clone_schemas(name, src_project_id, tgt_project_id, result, lock)
self._clone_documents(name, src_project_id, tgt_project_id, result, lock)
self._republish_registry(name, src_project_id, tgt_project_id, result, lock)

def _build_project_payload(
Expand Down Expand Up @@ -422,6 +424,103 @@ def _clone_schemas(
with lock:
result.created += 1

# ----- documents -----

def _clone_documents(
self,
name: str,
src_project_id: str,
tgt_project_id: str,
result: PhaseResult,
lock: threading.Lock,
) -> None:
try:
src_docs = self.ctx.source.list_agentic_documents(src_project_id)
except Exception as e:
logger.exception("agentic '%s': document listing failed: %s", name, e)
with lock:
result.failed += 1
result.errors.append(f"agentic {name} list documents: {e}")
return
if not src_docs:
return

try:
tgt_docs = self.ctx.target.list_agentic_documents(tgt_project_id)
except Exception as e:
logger.warning(
"agentic '%s': target document listing failed "
"(re-run may duplicate): %s",
name,
e,
)
tgt_docs = []
target_names = {d.get("original_filename") for d in tgt_docs}

for src in src_docs:
file_name = src.get("original_filename")
src_doc_id = src.get("id")
if not file_name or not src_doc_id:
continue
if file_name in target_names:
with lock:
result.skipped += 1
logger.info(
"agentic '%s': document '%s' already on target — skipping",
name,
file_name,
)
continue
self._clone_one_document(
name, tgt_project_id, src_doc_id, file_name, result, lock
)
Comment thread
greptile-apps[bot] marked this conversation as resolved.

def _clone_one_document(
self,
name: str,
tgt_project_id: str,
src_doc_id: str,
file_name: str,
result: PhaseResult,
lock: threading.Lock,
) -> None:
try:
raw = self.ctx.source.download_agentic_document(src_doc_id)
except Exception as e:
logger.exception(
"agentic '%s': document '%s' download failed: %s", name, file_name, e
)
with lock:
result.failed += 1
result.errors.append(f"agentic {name} download {file_name}: {e}")
return

if len(raw) > self.ctx.options.max_file_size:
with lock:
result.skipped += 1
result.warnings.append(
f"agentic {name}: document {file_name} exceeds size cap — "
"upload it manually on target"
)
return

mime = mimetypes.guess_type(file_name)[0] or "application/pdf"
try:
self.ctx.target.upload_agentic_document(
tgt_project_id, file_name, raw, mime
)
except Exception as e:
logger.exception(
"agentic '%s': document '%s' upload failed: %s", name, file_name, e
)
with lock:
result.failed += 1
result.errors.append(f"agentic {name} upload {file_name}: {e}")
return
with lock:
result.created += 1
logger.info("agentic '%s': uploaded document '%s'", name, file_name)

# ----- registry -----

def _republish_registry(
Expand Down Expand Up @@ -507,8 +606,9 @@ def _plan_children(
result: PhaseResult,
lock: threading.Lock,
) -> None:
"""Dry-run: count source prompt versions + schemas as planned and
record planned prompt-version ids so downstream resolves don't miss.
"""Dry-run: count source prompt versions + schemas + documents as
planned and record planned prompt-version ids so downstream resolves
don't miss.
"""
try:
src_versions = self.ctx.source.list_agentic_prompt_versions(
Expand All @@ -522,11 +622,16 @@ def _plan_children(
)
except Exception:
src_schemas = []
try:
src_docs = self.ctx.source.list_agentic_documents(src_project_id)
except Exception:
src_docs = []
with lock:
for v in src_versions:
self.ctx.remap.record_planned("agentic_prompt_version", v["id"])
result.created += 1
result.created += len(src_schemas)
result.created += len(src_docs)

# ----- settings -----

Expand Down
9 changes: 7 additions & 2 deletions src/unstract/clone/phases/tool_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,17 @@ def _clone_workflow_tools(
src_ti_id = src_ti["id"]
src_tool_id = src_ti["tool_id"]

# tool_id is a registry id: Prompt Studio tools register under
# prompt_studio_registry, exported agentic projects under
# agentic_studio_registry. Resolve against both.
with lock:
tgt_tool_id = self.ctx.remap.resolve("prompt_studio_registry", src_tool_id)
tgt_tool_id = self.ctx.remap.resolve(
"prompt_studio_registry", src_tool_id
) or self.ctx.remap.resolve("agentic_studio_registry", src_tool_id)
if not tgt_tool_id:
logger.warning(
"skipping tool_instance %s — no registry remap for tool_id %s "
"(custom tool likely unpublished on source)",
"(tool likely unpublished on source)",
src_ti_id,
src_tool_id,
)
Expand Down
66 changes: 66 additions & 0 deletions tests/clone/test_agentic_studio_phase.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ def __init__(
settings=None,
registries=None,
users=None,
documents=None,
document_blobs=None,
):
self.projects = list(projects or [])
self.users = list(users or [])
Expand All @@ -50,13 +52,18 @@ def __init__(
self.settings = list(settings or [])
# project_id -> list of registry rows
self.registries = {k: list(v) for k, v in (registries or {}).items()}
# project_id -> list of document rows
self.documents = {k: list(v) for k, v in (documents or {}).items()}
# document_id -> raw bytes
self.document_blobs = dict(document_blobs or {})

self.created_projects: list[dict] = []
self.created_versions: list[dict] = []
self.created_schemas: list[dict] = []
self.created_settings: list[dict] = []
self.updated_settings: list[tuple[str, dict]] = []
self.exported_projects: list[str] = []
self.uploaded_documents: list[tuple[str, str, bytes]] = []
self._next_id = 1

def _mint(self, prefix: str) -> str:
Expand Down Expand Up @@ -141,6 +148,20 @@ def export_agentic_project(self, project_id, *, force=True):
def list_agentic_registries(self, *, agentic_project=None):
return list(self.registries.get(agentic_project, []))

# ----- documents -----

def list_agentic_documents(self, project_id):
return list(self.documents.get(project_id, []))

def download_agentic_document(self, document_id):
return self.document_blobs.get(document_id, b"%PDF-fake")

def upload_agentic_document(self, project_id, file_name, data, mime_type):
self.uploaded_documents.append((project_id, file_name, data))
row = {"id": self._mint("tgt-doc"), "original_filename": file_name}
self.documents.setdefault(project_id, []).append(row)
return {"data": [{"document_name": file_name}]}


def _ctx(source, target, *, remap=None, **opt_overrides):
return CloneContext(
Expand Down Expand Up @@ -507,3 +528,48 @@ def test_source_group_share_replicated_via_remap():
assert len(tgt.shared_projects) == 1
_, payload = tgt.shared_projects[0]
assert payload["shared_groups"] == [70]


def test_documents_cloned_skipping_those_already_on_target():
src = FakeClient(
projects=[_src_project("src-p", "Receipts")],
documents={
"src-p": [
{"id": "d-new", "original_filename": "new.pdf"},
{"id": "d-dup", "original_filename": "dup.pdf"},
]
},
document_blobs={"d-new": b"%PDF-new", "d-dup": b"%PDF-dup"},
)
tgt = FakeClient()
# Adopt an existing target project so the dup doc can be pre-seeded on it.
tgt.projects.append({"id": "tgt-proj-0001", "name": "Receipts"})
tgt.documents["tgt-proj-0001"] = [{"id": "t-dup", "original_filename": "dup.pdf"}]
ctx = _ctx(src, tgt)

result = AgenticStudioPhase(ctx).run(CloneReport())

assert result.failed == 0
# 'new.pdf' uploaded, 'dup.pdf' skipped (already present).
assert tgt.uploaded_documents == [("tgt-proj-0001", "new.pdf", b"%PDF-new")]
assert result.skipped == 1


def test_dry_run_counts_documents_without_uploading():
src = FakeClient(
projects=[_src_project("src-p", "Receipts")],
documents={
"src-p": [
{"id": "d1", "original_filename": "a.pdf"},
{"id": "d2", "original_filename": "b.pdf"},
]
},
)
tgt = FakeClient()
ctx = _ctx(src, tgt, dry_run=True)

result = AgenticStudioPhase(ctx).run(CloneReport())

assert tgt.uploaded_documents == []
# 1 project + 2 documents predicted as creates.
assert result.created == 3
18 changes: 18 additions & 0 deletions tests/clone/test_tool_instance_phase.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,24 @@ def test_skip_when_registry_remap_missing():
assert tgt.create_calls == []


def test_resolves_tool_id_via_agentic_registry():
# An exported agentic project registers under agentic_studio_registry,
# not prompt_studio_registry; its tool_instance must still resolve.
src = FakeClient()
src.instances[SRC_WF] = [_src_ti("src-ti-1", SRC_WF, SRC_REG, {})]
tgt = FakeClient()
remap = RemapTable()
remap.record("workflow", SRC_WF, TGT_WF)
remap.record("agentic_studio_registry", SRC_REG, TGT_REG)
ctx = _ctx(src, tgt, remap=remap)

result = ToolInstancePhase(ctx).run(CloneReport())

assert result.created == 1
assert result.skipped == 0
assert tgt.create_calls[0]["tool_id"] == TGT_REG


def test_adopt_existing_target_instance_and_repatch_metadata():
src = FakeClient()
src_meta = {"llm": "My OpenAI"}
Expand Down
Loading