Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
bfb2c4d
UN-3534 [FEAT] PG Queue Phase 8a — queue-transport routing gate + sca…
muhammad-ali-e Jun 11, 2026
85c36ff
UN-3534 [DOCS] Standardize on "PG Queue" naming; drop PGMQ branding
muhammad-ali-e Jun 11, 2026
2b6714f
UN-3537 [FEAT] PG Queue Phase 9a — extension-free queue schema + SKIP…
muhammad-ali-e Jun 11, 2026
8087677
UN-3538 [FEAT] PG Queue Phase 9b — enqueue PG-routed tasks to Postgre…
muhammad-ali-e Jun 11, 2026
12631c1
UN-3539 [FEAT] PG Queue Phase 9c — consumer poll loop (claim → run → …
muhammad-ali-e Jun 12, 2026
b58ee1e
UN-3541 [FEAT] Wire PG-queue consumer into run-worker.sh + bootstrap …
muhammad-ali-e Jun 12, 2026
9d7124f
UN-3544 [FEAT] PG-queue consumer liveness endpoint (poll-loop heartbe…
muhammad-ali-e Jun 12, 2026
0955df9
UN-3546 [FEAT] Priority-ordered PG-queue dequeue + concurrency-safe c…
muhammad-ali-e Jun 12, 2026
7c7617b
UN-3548 [FEAT] PgBarrier — Postgres fan-in barrier (3rd WORKER_BARRIE…
muhammad-ali-e Jun 15, 2026
52a28fd
UN-3553 [FEAT] PG Queue 9d slice 1 — leader-election lease (orchestra…
muhammad-ali-e Jun 15, 2026
649e06b
UN-3554 [FEAT] PG Queue 9d slice 2 — reaper process + barrier-orphan …
muhammad-ali-e Jun 16, 2026
35c735f
UN-3555 [FEAT] PG Queue 9d slice-2 followup — run-worker.sh reaper ty…
muhammad-ali-e Jun 16, 2026
f312388
UN-3556 [FEAT] PG Queue 9d — reaper liveness probe (heartbeat + is_le…
muhammad-ali-e Jun 16, 2026
473603a
UN-3560 [FEAT] PG Queue 9 — run-worker.sh `-L celery` log alias (#2066)
muhammad-ali-e Jun 17, 2026
d6cead2
UN-3559 [FEAT] PG Queue 9e PR 1 — execution transport seam (inert) (#…
muhammad-ali-e Jun 17, 2026
59716f5
UN-3561 [FEAT] PG Queue 9e PR 2a — live-PG-pipeline inert foundation …
muhammad-ali-e Jun 17, 2026
5f2d18d
UN-3562 [FEAT] PG Queue 9e PR 2b — per-batch idempotency primitive (p…
muhammad-ali-e Jun 17, 2026
b3f25d9
UN-3563 [FEAT] PG Queue 9e PR 2c — live PG fan-out/barrier/callback (…
muhammad-ali-e Jun 17, 2026
c872b34
UN-3564 [FEAT] PG Queue 9e — reaper-recovery: mark stranded execution…
muhammad-ali-e Jun 17, 2026
c2e6a6f
UN-3570 [FEAT] PG Queue 9e PR 3 — Flipt canary wiring for transport r…
muhammad-ali-e Jun 18, 2026
76f39d2
UN-3574 [FEAT] PG Queue 9e 2d — orchestrator (async_execute_bin) on P…
muhammad-ali-e Jun 18, 2026
5f4cdf0
UN-3566 [FEAT] PG Queue 9f — multi-queue consumer + named run-worker.…
muhammad-ali-e Jun 18, 2026
b2ff1b5
UN-3576 [FEAT] PG Queue 9g — docker-compose PG consumer + reaper serv…
muhammad-ali-e Jun 18, 2026
87f9ede
UN-3581 [FEAT] PG Queue 9h-a — pg_periodic_schedule mirror table + du…
muhammad-ali-e Jun 18, 2026
e202ae0
UN-3596 [FEAT] PG Queue 9h-b — PG scheduler tick in the reaper/orches…
muhammad-ali-e Jun 18, 2026
7d775ce
UN-3597 [FEAT] PG Queue 9h-c — schedule ownership ramp control + back…
muhammad-ali-e Jun 19, 2026
ca73756
UN-3602 [FIX] PG Queue — restore API-deployment timeout sync-wait on …
muhammad-ali-e Jun 19, 2026
fd38637
UN-3603 [GATED-FEAT] PG Queue — blocking executor RPC on Postgres + u…
muhammad-ali-e Jun 19, 2026
5a7d3d1
UN-3605 [GATED-FEAT] PG Queue — structure_tool executor RPC over Post…
muhammad-ali-e Jun 20, 2026
ab80a49
UN-3606 [FIX] PG Queue — prefork the consumer so file batches + execu…
muhammad-ali-e Jun 20, 2026
46c53d1
UN-3608 [FEAT] PG Queue 9i — executor async/callback path on PG (self…
muhammad-ali-e Jun 21, 2026
1c7723b
UN-3610 [FEAT] PG Queue — reaper retention sweep (pg_task_result + pg…
muhammad-ali-e Jun 22, 2026
20c6e0b
UN-3607 [REFACTOR] retire the executor_rpc mirror — shared dispatcher…
muhammad-ali-e Jun 22, 2026
ea7a97b
UN-3445 [TOOL] pg_benchmark — PG-vs-Celery execution benchmark harness
muhammad-ali-e Jun 22, 2026
bc33c8c
UN-3445 [TOOL] pg_benchmark — parse real API-deployment response + ex…
muhammad-ali-e Jun 22, 2026
d9b2e95
UN-3445 [TOOL] pg_benchmark — mock OpenAI server for zero-cost load t…
muhammad-ali-e Jun 22, 2026
556e27e
UN-3445 [TOOL] pg_benchmark — address SonarCloud findings (reflection…
muhammad-ali-e Jun 23, 2026
4fee363
UN-3616 [FEAT] PG Queue — route API-triggered pipeline trigger throug…
muhammad-ali-e Jun 23, 2026
edc78bb
Commit uv.lock changes
muhammad-ali-e Jun 24, 2026
d5b4a2f
UN-3618 [REFACTOR] PG Queue — gate solely on the Flipt flag (+ UN-361…
muhammad-ali-e Jun 24, 2026
ebe2bec
Merge branch 'main' into feat/UN-3445-pg-queue-integration
muhammad-ali-e Jun 26, 2026
d7b27ce
UN-3445 [FIX] workflow_v2 — merge migration after main backmerge
muhammad-ali-e Jun 26, 2026
1633552
UN-3624 [FIX] PG queue — schema-qualify worker SQL for PgBouncer txn …
muhammad-ali-e Jun 27, 2026
2159ffd
UN-3651 [FIX] PG barrier enqueue — reconnect-retry on stale cached co…
muhammad-ali-e Jun 29, 2026
5d07155
UN-3652 [FIX] PG orchestration failure — reconcile file counters + su…
muhammad-ali-e Jun 29, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/backend/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,8 @@ def filter(self, record):
# For the organization model
"account_v2",
"account_usage",
# PG Queue — extension-free bespoke queue (cross-org infra, shared schema)
"pg_queue",
# Django apps should go below this line
"django.contrib.admin",
"django.contrib.auth",
Expand Down
Empty file added backend/pg_queue/__init__.py
Empty file.
7 changes: 7 additions & 0 deletions backend/pg_queue/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from django.apps import AppConfig


class PgQueueConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "pg_queue"
verbose_name = "PG Queue"
113 changes: 113 additions & 0 deletions backend/pg_queue/executor_rpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""Executor-RPC for the PG path — backend (Django) transport adapter.

The gate + reply_key/timeout orchestration + routing live ONCE in
``unstract.workflow_execution.executor_rpc`` (shared with the workers). This module
is the thin Django half: a :class:`DjangoQueueTransport` that enqueues via the ORM
(``enqueue_task``) and polls ``PgTaskResult``, plus the :func:`get_executor_dispatcher`
factory that wires them together.

Zero-regression: with the ``pg_queue_enabled`` Flipt flag off the routing dispatcher
delegates every mode to the unchanged Celery ``ExecutionDispatcher`` and no
``pg_task_result`` row is created.
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from django.db import close_old_connections

from pg_queue.models import PgTaskResult
from pg_queue.producer import enqueue_task
from unstract.core.polling import poll_for_row
from unstract.sdk1.execution.dispatcher import ExecutionDispatcher
from unstract.workflow_execution.executor_rpc import (
EXECUTE_TASK,
ExecResultRow,
PgExecutionDispatcher,
QueueTransport,
RoutingExecutionDispatcher,
resolve_pg_transport,
)

if TYPE_CHECKING:
from unstract.core.data_models import ContinuationSpec
from unstract.sdk1.execution.context import ExecutionContext

# Re-exported so existing ``from pg_queue.executor_rpc import …`` imports keep working.
__all__ = [
"DjangoQueueTransport",
"PgExecutionDispatcher",
"RoutingExecutionDispatcher",
"get_executor_dispatcher",
"resolve_executor_transport",
]


def resolve_executor_transport(context: ExecutionContext) -> bool:
"""True → route this executor dispatch over PG; False → Celery (default).

The single ``pg_queue_enabled`` Flipt flag (fail-closed).
"""
return resolve_pg_transport(context)


class DjangoQueueTransport(QueueTransport):
""":class:`QueueTransport` over the Django ORM (the backend half).

Inherits the Protocol so a type-checker verifies this implementation against the
seam independently of the ``PgExecutionDispatcher(...)`` construction site.
"""

def enqueue(
self,
*,
queue: str,
context: ExecutionContext,
org_id: str,
reply_key: str | None = None,
on_success: ContinuationSpec | None = None,
on_error: ContinuationSpec | None = None,
task_id: str | None = None,
) -> None:
enqueue_task(
task_name=EXECUTE_TASK,
queue=queue,
args=[context.to_dict()],
org_id=org_id,
reply_key=reply_key,
on_success=on_success,
on_error=on_error,
task_id=task_id,
)

def wait_for_result(self, reply_key: str, timeout: float) -> ExecResultRow | None:
"""Poll ``pg_task_result`` until the row appears or *timeout* elapses.

Uses the shared :func:`poll_for_row` backoff skeleton, releasing the DB
connection between polls (``close_old_connections``) so a long-running RPC
does not pin a backend connection and exhaust the pool. Each poll is its own
autocommit query, so a row committed by the executor consumer becomes visible
— **dispatch must NOT be called inside an open transaction**
(``transaction.atomic`` / ``ATOMIC_REQUESTS`` would pin one snapshot and never
see the new row).
"""

def _fetch() -> ExecResultRow | None:
row = PgTaskResult.objects.filter(pk=reply_key).first()
if row is None:
return None
return ExecResultRow(status=row.status, result=row.result, error=row.error)

return poll_for_row(_fetch, timeout, between_polls=close_old_connections)


def get_executor_dispatcher(
celery_app: object | None = None,
) -> RoutingExecutionDispatcher:
"""Factory: the gate-routed executor dispatcher (PG when enabled, else Celery)."""
return RoutingExecutionDispatcher(
celery=ExecutionDispatcher(celery_app=celery_app),
pg=PgExecutionDispatcher(DjangoQueueTransport()),
resolve=resolve_executor_transport,
)
12 changes: 12 additions & 0 deletions backend/pg_queue/flags.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""Single source of truth for the PG-queue rollout flag key.

One Flipt flag gates the whole PG-queue feature — execution
(``workflow_v2/transport.py``), scheduler (``scheduler/ownership.py``), and
executor (``pg_queue/executor_rpc.py``) all read this one key. Kept in a neutral
leaf module so the three resolvers import a single constant instead of
duplicating the literal (a grep on ``PG_QUEUE_FLAG_KEY`` finds every use), making
"one flag" a structural guarantee. This flag is the **sole** rollout control —
fail-closed to Celery on a blind/unreachable Flipt or any error.
"""

PG_QUEUE_FLAG_KEY = "pg_queue_enabled"
Empty file.
Empty file.
148 changes: 148 additions & 0 deletions backend/pg_queue/management/commands/reconcile_pg_schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
"""Backfill the pg_periodic_schedule mirror + reconcile Beat/PG schedule
ownership (Phase 9, ②c).

Run this:
- **once** after deploying the mirror, to backfill rows for schedules created
before the mirror existed (the dual-write only covers schedules touched since);
- **after each Flipt ramp change** to ``pg_scheduler_enabled``, to apply the new
percentage — flipping ``pg_owned`` and the matching Beat ``PeriodicTask`` for
every schedule (the create/update path only reconciles the schedule it edits).

It is idempotent and safe to run anytime: with the rollout off it leaves every
schedule on Beat. Could later be driven periodically (e.g. by the orchestrator);
kept a command here so the ramp stays an explicit, auditable ops action.
"""

import json
from typing import Any

from django.core.management.base import BaseCommand, CommandError
from django_celery_beat.models import CrontabSchedule, PeriodicTask
from scheduler.ownership import reconcile_ownership_for, resolve_schedule_owner
from scheduler.tasks import mirror_periodic_schedule_upsert

from pg_queue.models import PgPeriodicSchedule

# Only the pipeline-trigger PeriodicTasks are scheduled pipelines (other periodic
# tasks — metrics, audit — are not mirrored).
_PIPELINE_TASK_PATH = "scheduler.tasks.execute_pipeline_task"


def _cron_from_crontab(crontab: CrontabSchedule | None) -> str:
"""Reconstruct the 5-field cron string from a CrontabSchedule row."""
if crontab is None:
return ""
return (
f"{crontab.minute} {crontab.hour} {crontab.day_of_month} "
f"{crontab.month_of_year} {crontab.day_of_week}"
)


class Command(BaseCommand):
help = (
"Backfill pg_periodic_schedule mirrors for pre-existing schedules and "
"reconcile Beat/PG ownership against the current pg_scheduler_enabled "
"rollout. Idempotent; with the rollout off, leaves everything on Beat."
)

def add_arguments(self, parser: Any) -> None:
parser.add_argument(
"--dry-run",
action="store_true",
help="Report what would change without writing.",
)

def handle(self, *args: Any, **options: Any) -> None:
dry_run = options["dry_run"]
backfilled = self._backfill_mirrors(dry_run)
reconciled, pg_owned, failed = self._reconcile_all(dry_run)

prefix = "[dry-run] " if dry_run else ""
summary = (
f"{prefix}backfilled={backfilled} reconciled={reconciled} "
f"pg_owned={pg_owned} failed={failed}"
)
if failed:
# Surface failures where the operator looks (and to automation).
self.stderr.write(self.style.ERROR(summary))
raise CommandError(f"{failed} schedule(s) failed to reconcile")
self.stdout.write(self.style.SUCCESS(summary))

def _mirror_fields_from_args(self, pt: Any, pipeline_id: str) -> dict | None:
"""Extract the mirror fields from PeriodicTask.args, or None (logged) for a
malformed/non-array row — a bad row must not abort the whole command.
"""
try:
# json.JSONDecodeError is a ValueError subclass, so one except covers
# both the parse error and the non-array guard below.
task_args = json.loads(pt.args or "[]")
if not isinstance(task_args, list):
raise ValueError(f"expected JSON array, got {type(task_args).__name__}")
except ValueError as exc:
self.stderr.write(
self.style.ERROR(
f"skipping pipeline {pipeline_id}: bad PeriodicTask.args ({exc})"
)
)
return None
return {
"workflow_id": task_args[0] if len(task_args) > 0 else None,
"organization_id": (task_args[1] if len(task_args) > 1 else "") or "",
# args[6] is the synthetic "Pipeline job-<id>" label; the real name
# self-heals via the dual-write on the next schedule edit.
"pipeline_name": task_args[6] if len(task_args) > 6 else "",
}

def _backfill_mirrors(self, dry_run: bool) -> int:
"""Create a mirror row for every pipeline-trigger PeriodicTask lacking one."""
# Pre-fetch the already-mirrored ids in one query (avoid an EXISTS per row).
mirrored = {
str(pk)
for pk in PgPeriodicSchedule.objects.values_list("pipeline_id", flat=True)
}
backfilled = 0
for pt in PeriodicTask.objects.filter(task=_PIPELINE_TASK_PATH):
pipeline_id = pt.name # = str(pipeline.pk)
if pipeline_id in mirrored:
continue
fields = self._mirror_fields_from_args(pt, pipeline_id)
if fields is None:
continue
self.stdout.write(
f"backfill mirror for pipeline {pipeline_id} (enabled={pt.enabled})"
)
if not dry_run:
mirror_periodic_schedule_upsert(
pipeline_id=pipeline_id,
cron_string=_cron_from_crontab(pt.crontab),
enabled=pt.enabled,
**fields,
)
backfilled += 1
return backfilled

def _reconcile_all(self, dry_run: bool) -> tuple[int, int, int]:
"""Reconcile ownership for every mirror row against the current rollout.
Returns (reconciled, pg_owned, failed).
"""
reconciled = pg_owned = failed = 0
for row in PgPeriodicSchedule.objects.all():
if dry_run:
# Preview only — read the would-be owner (no DB write) so an
# operator can see how many a ramp change would hand to PG.
reconciled += 1
if resolve_schedule_owner(str(row.pipeline_id), row.organization_id):
pg_owned += 1
continue
# mirror.enabled tracks pipeline.active (dual-write); use it as the
# 'active' input so a paused schedule isn't re-enabled by reconcile.
result = reconcile_ownership_for(
str(row.pipeline_id), row.organization_id, active=row.enabled
)
if result is None: # transaction failed (already logged)
failed += 1
continue
reconciled += 1
if result:
pg_owned += 1
return reconciled, pg_owned, failed
34 changes: 34 additions & 0 deletions backend/pg_queue/migrations/0001_initial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Generated by Django 4.2.1 on 2026-06-11 14:42

import django.utils.timezone
from django.db import migrations, models


class Migration(migrations.Migration):
initial = True

dependencies = []

operations = [
migrations.CreateModel(
name="PgQueueMessage",
fields=[
("msg_id", models.BigAutoField(primary_key=True, serialize=False)),
("queue_name", models.TextField()),
("message", models.JSONField()),
("org_id", models.TextField(blank=True, default="")),
("enqueued_at", models.DateTimeField(default=django.utils.timezone.now)),
("vt", models.DateTimeField(default=django.utils.timezone.now)),
("read_ct", models.IntegerField(default=0)),
],
options={
"db_table": "pg_queue_message",
"indexes": [
models.Index(
fields=["queue_name", "vt", "msg_id"],
name="pg_queue_message_dequeue_idx",
)
],
},
),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Generated by Django 4.2.1 on 2026-06-12 11:47

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("pg_queue", "0001_initial"),
]

operations = [
migrations.RemoveIndex(
model_name="pgqueuemessage",
name="pg_queue_message_dequeue_idx",
),
migrations.AddField(
model_name="pgqueuemessage",
name="priority",
field=models.SmallIntegerField(default=5),
),
migrations.AddIndex(
model_name="pgqueuemessage",
index=models.Index(
models.F("queue_name"),
models.OrderBy(models.F("priority"), descending=True),
models.F("msg_id"),
name="pg_queue_message_dequeue_idx",
),
),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Generated by Django 4.2.1 on 2026-06-12 14:05

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("pg_queue", "0002_remove_pgqueuemessage_pg_queue_message_dequeue_idx_and_more"),
]

operations = [
# `check=` is correct for the pinned Django (4.2). It's deprecated in
# 5.1 in favour of `condition=` and removed in 6.0 — but fresh installs
# always replay this under the Django we ship, so leave it as-is.
# When the pin is bumped to >= 6.0, squash these migrations (or do the
# behaviour-preserving `check=` -> `condition=` edit) as part of that
# upgrade so a from-scratch migrate still runs.
migrations.AddConstraint(
model_name="pgqueuemessage",
constraint=models.CheckConstraint(
check=models.Q(("priority__gte", 1), ("priority__lte", 10)),
name="pg_queue_message_priority_range",
),
),
]
Loading