Skip to content

Feat/un 3445 pg queue integration#2108

Draft
muhammad-ali-e wants to merge 45 commits into
mainfrom
feat/UN-3445-pg-queue-integration
Draft

Feat/un 3445 pg queue integration#2108
muhammad-ali-e wants to merge 45 commits into
mainfrom
feat/UN-3445-pg-queue-integration

Conversation

@muhammad-ali-e

Copy link
Copy Markdown
Contributor

What

Why

How

Can this PR break any existing features. If yes, please list possible items. If no, please explain why. (PS: Admins do not merge the PR without this section filled)

Database Migrations

Env Config

Relevant Docs

Related Issues or PRs

Dependencies Versions

Notes on Testing

Screenshots

Checklist

I have read and understood the Contribution Guidelines.

muhammad-ali-e and others added 30 commits June 11, 2026 15:27
…ffold (#2033)

* UN-3534 [FEAT] PG Queue Phase 8a — queue-transport routing gate + scaffold

Add the Strangler-Fig routing seam that lets PG Queue (PGMQ) coexist with
Celery so task types can be migrated one at a time. Scaffold only: the PG
branch is a Celery-routing stub (no PG consumer exists yet), so this is
zero-behaviour-change by construction.

- queue_backend/routing.py: QueueBackend{CELERY,PG} + select_backend(task_name)
  reading the WORKER_PG_QUEUE_ENABLED_TASKS allow-list (default empty -> all
  Celery). Tolerant CSV parsing; never raises.
- dispatch(): consults select_backend(); PG-selected tasks are logged but
  still dispatched via Celery. The send_task call sits outside the PG branch
  so the wire is byte-identical regardless of the routing decision.
- queue_backend/pg_queue/: scaffold subpackage. PGMQ is a core transport
  substrate, so it lives in the seam beside dispatch/routing/barrier, not
  under the git-ignored plugins/ cloud overlay.
- sample.env: documents the flag (default-safe, OSS-friendly, no Flipt server).
- tests: 12 routing tests incl. the byte-identical-dispatch characterisation
  pinning the inert-scaffold invariant.

Barrier axis untouched (WORKER_BARRIER_BACKEND stays chord). Per-org routing
intentionally deferred to the rollout phase.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* UN-3534 [FEAT] Address Phase 8a review feedback

- test seam: drop stale WORKER_PG_QUEUE_ENABLED_ORGS reference — the org
  axis was removed, that flag never existed [must-fix]
- observability: routing log DEBUG -> INFO so a cutover survives a default
  log config; log-once per task name bounds volume. Log the configured
  allow-list once per process so a typo'd task name is eyeballable at boot
  even when it never matches a real dispatch [important]
- tests: pin the routing branch with caplog assertions (PG -> log fires,
  Celery -> no log, bounded to once) so the inert gate can't be silently
  deleted; assert allow-list logging too [important — closes test gap]
- QueueBackend: document the is-not-== discipline (StrEnum makes a typo'd
  "== cellery" a silent False) [suggestion]
- routing: drop dead _parse_allow_list(env_var) param, read the constant
  directly; one-pass strip; test imports the constant (single source of
  truth) [nits]
- pg_queue docstring: clarify plugins/ subdirs are git-ignored while the
  dir itself is tracked; soften volatile labs branch/section/filename
  references [nits]

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* UN-3534 [DOCS] Note migration-coherence constraint in routing gate

Document that the per-task allow-list may only split independent/leaf
tasks across substrates. The coupled execution pipeline (async_execute_bin
-> file processing -> callback, with the barrier fan-in) must run a single
execution entirely on one transport — its migration unit is the execution,
not the task. The next phase resolves transport once at kickoff and carries
it in ExecutionContext; select_backend then honours that carried marker
over the per-task env. Until then, only leaf tasks should be enabled here.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* UN-3534 [DOCS] Fix sample.env example to a leaf task (not pipeline)

async_execute_bin is the pipeline kickoff — exactly the task the coherence
note says must NOT be split per-task. Switch the example to a leaf task
(send_webhook_notification) and warn against listing coupled pipeline tasks
until ExecutionContext carries the transport choice.

Addresses Greptile review feedback.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
We don't use the pgmq project (github.com/pgmq/pgmq) — no extension, no
Python package, no copied SQL. The queue is a bespoke SKIP LOCKED schema
(see the extension-free decision on UN-3533). Rename the 5 prose spots
that called our substrate "PGMQ" to "PG Queue" so the code no longer reads
as if it depends on that project. PGMQ stays only as prior-art reference
in the decision record, not as the name of our substrate.

Docs-only; no code or behaviour change.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… LOCKED dequeue (#2036)

* UN-3537 [FEAT] PG Queue Phase 9a — extension-free queue schema + SKIP LOCKED dequeue

The storage + dequeue primitive the routing seam will route to. Inert:
nothing in dispatch() calls it yet (the PG branch still routes to Celery),
so zero behaviour change even on the integration branch.

Backend (schema only — SHARED_APPS, cross-org infra, shared schema):
- new pg_queue app; 0001 is 100% makemigrations-generated (pg_queue_message
  table + dequeue index). No CREATE EXTENSION, no DB-side function — plain
  Django. managed=True model doubles as a typed read handle.

Workers (the client; first direct-DB worker capability):
- queue_backend/pg_queue: send / read / delete + QueueMessage. read() runs
  one atomic UPDATE ... FOR UPDATE SKIP LOCKED ... RETURNING (visibility-
  timeout pattern): claim+commit, process outside the txn, delete on
  success; a crash lets vt expire and the row redelivers (at-least-once,
  no double-delivery, VACUUM-safe, PgBouncer txn-pooling compatible).
- connection.py reuses the backend DB_* env -> PgBouncer in cloud, direct
  in OSS (UN-3533 decision).
- psycopg2-binary==2.9.9 (matches backend), promoted to a direct dep.

Tests: 4 unit (mocked SQL shape) + 4 integration (real Postgres) proving
roundtrip, vt-hiding, vt-expiry redelivery, and no-double-delivery.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* UN-3537 [FIX] org_id empty-string default, not null=True (Django S6553)

A string-based field shouldn't have two "no data" values (NULL and "").
Use default="" for "no org" (leaf tasks) instead of null=True; regenerate
the generated 0001 accordingly. The client coerces None -> "" since the
column is now non-null.

Addresses SonarCloud S6553.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* UN-3537 [FIX] Address Phase 9a review feedback

Robustness + docstring-accuracy fixes on the PG-queue primitive:

- client: roll back on error and recover a dead connection (drop the
  cached conn on OperationalError/InterfaceError -> next call reconnects);
  add close() + context manager. One connection blip no longer wedges the
  9c consumer.
- client.read(): raise on non-positive vt_seconds/qty (vt<=0 is a silent
  double-delivery window).
- client.delete(): WARN when no row was removed (the at-least-once
  re-delivery signal was previously swallowed).
- QueueMessage: slots=True + doc that the payload is decoded JSONB (the
  dict stays mutable; frozen freezes the binding only).
- connection: parameterise create_pg_connection(env_prefix); wrap connect
  with a self-identifying error log (non-secret host/port/dbname/schema);
  drop the stale pg_queue_read reference; soften the DB_HOST default claim.
- models: fix stale docstrings describing a removed 0002 / DB-level
  defaults / pg_queue_read function.
- tests: narrow integration skip to OperationalError (don't mask
  ImportError/bugs/permission errors as a green skip); rollback before the
  teardown DELETE; integration conn delegates to
  create_pg_connection(env_prefix="TEST_DB_"); add unit coverage for
  create_pg_connection, read() validation, and error-rollback.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* UN-3537 [FIX] Address Phase 9a review round 2 + SonarCloud S2068

- perf: ORDER BY (vt, msg_id) so the (queue_name, vt, msg_id) index drives
  an indexed top-N instead of sorting the whole visible backlog on each
  read() — a real regression on a deep queue.
- recovery: a failed rollback now proves the connection is dead, so a
  poisoned connection is recycled regardless of which psycopg2 error
  subclass was raised (not only Operational/Interface); also checks
  conn.closed. Closes the "one blip wedges the consumer" gap more fully.
- docs: clarify the contract — at-least-once means a message CAN be
  processed more than once after vt-expiry; SKIP LOCKED only prevents
  concurrent double-claim. "(no double-delivery)" -> "(no concurrent
  double-claim)".
- tests: cover the recovery/ownership branches (owned conn recycled on
  OperationalError and on failed rollback; injected conn never closed;
  close() owned-vs-injected).
- security (SonarCloud S2068): the create_pg_connection test mapped a
  literal "PASSWORD": "p" -> use a runtime uuid token so it isn't flagged
  as a hard-coded credential. Resolves the Security Rating C gate failure.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
#2043)

* UN-3538 [FEAT] PG Queue Phase 9b — enqueue PG-routed tasks to Postgres

When a task is opted into WORKER_PG_QUEUE_ENABLED_TASKS, dispatch() now
serialises it as a TaskPayload and enqueues to pg_queue_message instead of
Celery, returning a PgDispatchHandle (.id = msg_id). A process-singleton
PgQueueClient is reused across dispatches. The Celery path is unchanged and
the default-empty flag routes everything to Celery — zero behaviour change.

- queue_backend/pg_queue/task_payload.py: TaskPayload TypedDict + to_payload()
  — the producer<->consumer wire contract. This is the *contents* of the
  pg_queue_message.message JSONB column, distinct from the backend's
  PgQueueMessage *row* model (envelope vs payload — they nest, not duplicate).
- dispatch(): PG branch enqueues + returns; cutover log (INFO, once per task);
  .warning docstring that an opt-in requires the 9c consumer running, else the
  task enqueues but never executes.
- sample.env: same warning on the flag.
- backend pg_queue model: note that `message` holds a TaskPayload.
- tests: TestDispatchRouting (PG->enqueue, Celery->send_task), TestCutoverLog,
  to_payload shape + real-PG integration (dispatch lands a decodable row).

Leaf-only (migration-coherence) — pipeline tasks stay on Celery until
execution-level routing (9e). INERT by default.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* UN-3538 [FIX] Address Phase 9b review feedback

- dispatch: per-thread PG client via threading.local — a libpq connection
  isn't safe for concurrent use across threads; correct under prefork AND
  a -P threads pool (gevent would need a pool — noted, out of scope).
- dispatch: log the routing *decision* BEFORE send() so a first-dispatch
  failure (DB down / unmigrated) doesn't suppress the one announcement;
  wrap the enqueue with a logger.exception breadcrumb (a raw psycopg2.Error
  or a json.dumps TypeError on a non-serialisable arg otherwise propagate
  with no "PG-routed dispatch" context). No Celery fallback retained.
- dispatch: hoist `pg_queue = queue or _DEFAULT_PG_QUEUE` (one source).
- fairness: share a FairnessPayload TypedDict; to_dict() -> FairnessPayload;
  TaskPayload.fairness uses it instead of a loose dict[str, Any].
- pg_queue/__init__ docstring: describe 9b state (no longer "inert, rides
  Celery"); sample.env: drop a duplicated routes-to-Celery sentence.
- tests: no-fallback-on-enqueue-failure (Critical gap) + log-ordering pin;
  lazy-init + reuse of the per-thread client; cutover-log assertions
  updated for the new message.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ack) (#2045)

* UN-3539 [FEAT] PG Queue Phase 9c — consumer poll loop (claim → run → ack)

PgQueueConsumer drains pg_queue_message and runs each claimed task
in-process: poll_once() claims a batch (SKIP LOCKED + vt via
PgQueueClient.read), runs it via current_app.tasks[name].apply(throw=True),
and acks by deleting on success. Task failure -> leave the row (vt expiry
redelivers, at-least-once); unknown task -> drop + error (no poison loop).
run() adds an empty-queue backoff loop + SIGTERM/SIGINT graceful stop;
main() is a `python -m` entrypoint (env-configured).

Completes the leaf-first end-to-end path: 8a route -> 9b enqueue -> 9a
store -> 9c consume+run. Validated live on the dev stack: a real
send_webhook_notification routed to PG was claimed by the consumer, POSTed
to Slack (HTTP 200), and acked (row removed).

Deployment note: the consumer PROCESS must bootstrap the worker app (import
the task modules) so current_app.tasks resolves the task — an
entrypoint/rollout concern, not consumer logic. Documented in the module
docstring; the rollout phase wires a consumer container that boots the app.

Tests: 5 unit (run+ack, fail->no-ack, unknown->drop, empty, graceful stop)
+ 1 real-PG integration (enqueue -> poll -> execute -> ack).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* UN-3539 [FIX] Dedup PG integration test fixtures into conftest (SonarCloud)

The connect-to-dev-DB + skip-if-unreachable/unmigrated block was copy-pasted
across test_pg_queue_client / test_dispatch_pg / test_pg_queue_consumer
(6.3% duplication on new code, over the 3% gate). Extracted into shared
pg_conn / pg_client fixtures + an integration_pg_conn() helper in
tests/conftest.py; the three files now use them. Behaviour unchanged.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* UN-3539 [FIX] Address Phase 9c review feedback

- fairness header rebuilt from the payload on the PG run path (was dropped)
  — a PG-routed run now mirrors the Celery dispatch contract.
- poison-message guard: surface read_ct (QueueMessage + dequeue RETURNING),
  drop a task that keeps failing past max_attempts (default 5, env
  WORKER_PG_QUEUE_CONSUMER_MAX_ATTEMPTS) with a loud ERROR carrying the
  payload, instead of redelivering forever.
- malformed payload (missing task_name) -> distinct "missing task_name"
  drop log, not a misleading "unknown task None".
- run() wraps poll_once() so a transient read/DB blip backs off and
  continues instead of tearing down the loop (the client self-recovers).
- ack: WARN when delete() finds no row (task exceeded vt -> possible
  double-run).
- __init__ validates positive tuning params; main() wires backoff_max +
  max_attempts via env (prefix helper); non-main-thread signal-install
  failure -> WARNING.
- tests: poison drop, missing task_name, fairness-header propagation,
  multi-message batch, ack-no-row warn, construction validation, backoff
  growth/reset, poll-error resilience; fixed the read mock for read_ct.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* UN-3539 [FIX] Address Greptile review on the 9c consumer

- [P1] batch shared-vt window: default batch_size 10 -> 1 so each message
  gets its own visibility window. The batch's vt is set atomically at claim
  but messages run sequentially, so with batch_size>1 the tail could exceed
  vt and be re-claimed mid-run (double-run). Batching stays opt-in; doc the
  vt > batch x worst-case-duration constraint.
- [P2] QueueMessage.read_ct: drop the misleading =0 default (a "never
  claimed" state the dequeue can't produce — read_ct is always >=1). 0 would
  silently bypass the poison guard; now required, all callers supply it.
- [P2] __init__: reject backoff_max < poll_interval (else min(poll*2, max)
  shrinks the backoff below poll_interval instead of growing).
- [P2] dedup the ack-miss warning: client.delete() now logs at DEBUG; the
  consumer keeps the contextual WARNING (it names the task).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…guard (#2047)

* UN-3541 [FEAT] Wire PG-queue consumer into run-worker.sh + bootstrap guard

Make the 9c PG-queue consumer (UN-3539) runnable via the normal worker
flow, safely. Split from #2045 to keep that PR focused on consumer logic.

- Launcher (pg_queue_consumer/__main__.py): set WORKER_TYPE to the source
  worker (default notification) BEFORE `import worker`, so the right tasks
  register. worker.py loads exactly one worker type's tasks; a bare import
  would load the general worker's and drop every notification as unknown.
- run-worker.sh: `pg-queue-consumer` type runs `python -m pg_queue_consumer`
  (not a celery command) from the workers root; queue via env.
- Startup guard: PgQueueConsumer.run() refuses to start on an empty task
  registry — fail loud instead of silently dropping every message.
- Drop the hard-coded 8086 health port (consumer runs no health server).

Integration fixes found during live dev-test (real send_webhook_notification
end-to-end → Slack HTTP 200):
- Opt-in status: consumer is not part of `all`; shown only when running.
- Log path: detach writes to an absolute $worker_dir/$type.log so -L/-C
  find it (also fixes the same latent bug for pluggable workers).
- PID discovery: get_worker_pids matches the `python -m` invocation, so
  --status / -k / -r work for the consumer.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* UN-3541 [FIX] Address PR #2047 review feedback

- run-worker.sh: verify liveness after a detached launch (kill -0 + tail);
  `set -e` doesn't apply to `&`, so a fork that died on startup was reported
  as "started" — acute for the health-port-less consumer (High).
- consumer.py: log the registered application task names at startup so a
  *wrong* (non-empty but mismatched) registry is diagnosable — the guard only
  catches an *empty* one (Medium observability).
- consumer.py: type _env() with a TypeVar (was `cast: type -> object`,
  erasing types at the typed __init__) and name the offending var on a bad
  cast instead of a context-free ValueError (Medium).
- tests: add the guard's positive / require_tasks=False bypass / built-in
  filter arms (only the failure arm was covered).
- get_worker_pids: warn on pgrep rc>1 (operational/regex error) instead of
  collapsing it to "not running" (Low).
- run-worker.sh: extract the repeated "pg_queue_consumer" literal into a
  readonly constant (SonarCloud S1192).
- Comment accuracy: build_celery_app configures but does not import tasks;
  `notification` is the worker that owns the leaf task, not the task itself;
  generalise the "every notification dropped" example.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* UN-3541 [FIX] Address Greptile review feedback

- __main__.py: move the WORKER_TYPE mutation + `import worker` bootstrap into
  a guarded _bootstrap_and_run() called only under `__name__ == "__main__"`,
  so an accidental import (test/IDE/type-checker walking __main__) no longer
  overwrites WORKER_TYPE or triggers a full worker-app bootstrap.
- run-worker.sh: list `pg-queue-consumer` in the usage/--help worker types,
  plus a note for its WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE / _QUEUE overrides.
- run-worker.sh: clarify the post-launch liveness check is a best-effort
  fast-fail for *immediate* (sub-second) crash-on-import/bad-config faults,
  not a connectivity check; kept general (an immediate crash can hit any
  worker) and noted the `all` subshells overlap the 1s with inter-launch sleep.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…at) (#2051)

* UN-3544 [FEAT] PG-queue consumer liveness endpoint (poll-loop heartbeat)

Give the consumer a /health HTTP endpoint for K8s liveness probing, like
every other worker — but keyed only on a poll-loop heartbeat.

- consumer.py: track _last_poll_monotonic (refreshed at the top of poll_once,
  so a loop wedged on a long task goes stale and is detectable — which
  pgrep-based --status and the launch-liveness check cannot see). Expose
  seconds_since_last_poll() / is_poll_stale(). main() starts a LivenessServer
  when WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT is set (opt-in), stops it on exit.
- LivenessServer: tiny stdlib HTTP server (/health, /healthz, /livez) → 200
  while fresh, 503 once stale. Deliberately lean: a liveness probe must report
  only "is this process making progress?", NOT broker/API reachability or
  resource pressure (those would crash-loop a healthy consumer on a blip).
  So it does NOT reuse the shared HealthChecker (which also bundles an
  api_connectivity check that is both wrong for liveness and currently broken
  — its `from .api_client_singleton` import points at a non-existent module;
  tracked separately).
- run-worker.sh: default port 8090 (outside the 8080-8089 core range, no
  collision), exported opt-in; documented in --help.
- tests: heartbeat fresh/stale + a real bind-and-GET 200->503 endpoint test.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* UN-3544 [FIX] Address PR #2051 review feedback

Important:
- Liveness bind failure (OSError/port-in-use) now degrades gracefully: log
  and continue probe-less instead of aborting the consumer before it polls.
  Verified live (2nd consumer on a taken :8090 keeps draining).
- run-worker.sh: 8090 collided with the first auto-discovered pluggable worker
  (8090 + count); pluggable discovery now starts at 8091, 8090 reserved.
- LivenessServer.stop() is defensive (can't raise into main()'s finally and
  mask the real run() exception) and warns if the thread outlives the join.
- Empty WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT now hits the clean opt-out
  (_env treats "" as unset) instead of int("") crashing at launch.

Suggestions:
- LivenessServer: guard double start(); reset state in stop(); wrap
  serve_forever so a thread crash is logged; route handler errors to the
  logger (log_message=pass was hiding log_error too); guard wfile.write
  against client disconnects; single clock read per request.
- Type _httpd/_thread as HTTPServer|None / Thread|None via TYPE_CHECKING
  (drops the Any import); restores static checking.
- run-worker.sh: status line shows the effective -p override, not the map default.
- Docstrings: phrase the helper trigger in terms of `port`; note 0.0.0.0 bind;
  document that a fast-failing loop stays healthy by design (liveness must not
  couple to backend reachability).
- tests: heartbeat-stamped-before-read (pins top-of-poll), /healthz + /livez
  aliases + unknown-path 404, double-start rejection.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* UN-3544 [FIX] Address SonarCloud issues

- consumer.py: use logger.exception() in the liveness-bind except block
  (preserves the traceback; S6679).
- test: lift the walrus assignment out of the PgQueueConsumer() argument
  list — plain `client = MagicMock()` first (clearer; S6328).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* UN-3544 [FIX] Address Greptile review feedback

- LivenessServer handler strips the query string before matching paths
  (self.path includes it), so /health?probe=k8s returns 200 not 404. Added
  a query-string case to the alias test.
- Document the stale-threshold trade-off prominently: the heartbeat is frozen
  during task execution, so WORKER_PG_QUEUE_CONSUMER_HEALTH_STALE_SECONDS is
  also an upper bound on single-task wall-clock (a longer task trips the probe
  → restart → redelivery). 60s suits the sub-second leaf; raise it above
  max(batch_size x worst_case_task_seconds, backoff_max) for longer tasks.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…laim (#2052)

* UN-3546 [FEAT] Priority-ordered PG-queue dequeue + concurrency-safe claim

Start enforcing the load-independent part of fairness — pipeline_priority
(L3) — directly in the single-table dequeue: higher priority is claimed
first, FIFO (msg_id) within a priority. The org-tier (L1) / workload (L2)
axes + burst_max admission stay deferred to the fair-admission orchestrator.

- Schema: add `priority` (smallint, default 5 = FairnessKey.DEFAULT_PRIORITY)
  to pg_queue_message; swap the dequeue index to (queue_name, priority DESC,
  msg_id) so the priority-ordered claim stays an indexed top-N.
- Enqueue: dispatch() writes priority from fairness.pipeline_priority; a bare
  dispatch (fairness=None) writes the neutral default.
- Dequeue: ORDER BY priority DESC, msg_id.

Also fixes a latent concurrency bug in the original dequeue (9a):
`UPDATE ... WHERE msg_id IN (SELECT ... FOR UPDATE SKIP LOCKED LIMIT n)` can
OVER-CLAIM under concurrent writers — EvalPlanQual re-evaluates the LIMIT
subquery when a row it tried to lock was concurrently touched, so one claim
can return more than n rows. Switched to the canonical PGMQ-safe shape: lock
candidates in a CTE, then `UPDATE ... FROM locked WHERE q.msg_id = locked.msg_id`,
which locks exactly n rows once. The trailing SELECT re-orders RETURNING (which
is otherwise unspecified) so batched claims come back in priority order too.

Tests: priority selection (one-at-a-time) + batch ordering against real
Postgres; send writes priority; dispatch wiring (fairness + neutral default);
read param order. Verified live end-to-end via dispatch()->claim-order (9>7>3a>3b>1).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* UN-3546 [FIX] Address PR #2052 review feedback

- Validate priority at the write boundary: client.send() raises ValueError on
  out-of-range (mirrors its vt_seconds/qty guards) — an out-of-range value
  would silently jump/sink the row in the priority DESC claim order.
- Add a DB CheckConstraint (priority 1..10) as the backstop no ORM/raw writer
  can bypass (migration 0003). check= (not condition=) — repo is on Django 4.2.
- Soften the "indexed top-N" comments (client.py + models.py): the dequeue is
  an index walk with vt<=now() as a per-row filter, NOT a guaranteed top-N —
  vt is not in the index, so in-flight (future-vt) high-priority rows are
  scanned past on each claim; the orchestrator's admission is the high-backlog
  answer. Update the module docstring to the CTE FROM-join shape; fix the
  fairness.DEFAULT_PRIORITY symbol reference; drop the duplicated param-order
  comment.
- Tests: send() range-guard (parametrized) + DB CheckConstraint backstop;
  concurrent-writer over-claim guard (two readers, no batch exceeds qty —
  the regression test for the EvalPlanQual fix); vt × priority (visible low
  beats invisible high); FIFO-within-band for multi-member batch bands.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* UN-3546 [FIX] Address Greptile review feedback

- Concurrency test: assert the drain worker terminated after join (a hung
  worker now fails the test instead of passing silently while conn_b.close()
  races its in-flight queries).
- Priority-bounds drift guard: backend models.py and workers fairness.py are
  separate codebases that can't import each other, so the DB constraint bounds
  (1/10) duplicate fairness.MIN/MAX_PRIORITY. Replaced the hardcoded "42" reject
  test with test_db_check_constraint_matches_fairness_bounds — raw-inserts at
  MIN/MAX (accepted) and MIN-1/MAX+1 (CheckViolation), pinning the DB constraint
  to the fairness range so a future widening that misses one side fails loudly.
  Documented the canonical source in the model comment.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* UN-3546 [DOCS] Note check=/condition= handling on the constraint migration

Breadcrumb for a future Django upgrade: `check=` is correct on the pinned
Django 4.2 (deprecated 5.1, removed 6.0); fresh installs always replay under
the shipped Django, so leave it. When the pin reaches >= 6.0, squash (or do the
behaviour-preserving check= -> condition= edit) so a from-scratch migrate runs.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…R_BACKEND) (#2053)

* UN-3548 [FEAT] PgBarrier — Postgres fan-in barrier (3rd WORKER_BARRIER_BACKEND)

Add a Postgres Barrier substrate selected by WORKER_BARRIER_BACKEND=pg (default
stays chord). Moves the fan-in aggregation ("wait for N header tasks, then fire
the callback with their results") onto a pg_barrier_state row — the same DB that
holds the PG queue, so an execution can coordinate without Redis/RabbitMQ. The
9e pipeline on-ramp primitive.

Mirrors RedisDecrBarrier 1:1 — same Barrier protocol, fairness plumbing,
Celery-dispatched header tasks with .link/.link_error, empty->None,
missing-execution_id->raise, mid-loop dispatch cleanup. Defaults-off, zero
behaviour change until the flag flips.

- Schema (backend/pg_queue): pg_barrier_state (execution_id PK, remaining,
  results jsonb, aborted, expires_at) + migration 0004.
- pg_barrier.py: PgBarrier + barrier_pg_decr_and_check / barrier_pg_abort.
  Atomic decrement is ONE statement (UPDATE ... SET remaining = remaining-1,
  results = results || jsonb_build_array(%s) ... RETURNING remaining, results,
  aborted) — row lock serialises concurrent decrements so exactly one sees 0;
  no Lua. Guards: reads aborted in the same statement (never fires partial),
  row-missing / negative-remaining clean up without firing, callback dispatched
  BEFORE row delete. Orphan bound via expires_at + opportunistic sweep in
  enqueue (periodic sweep is the backstop).
- __init__.py: BarrierBackend.PG -> PgBarrier() in get_barrier().

Tests: protocol shape, TTL env validation, enqueue (upsert/links/fairness/
stale-reset/expiry-sweep/mid-loop-cleanup), decr paths (pending/complete-fires/
aborted/negative/missing/unserialisable), abort (claim+delete/dedup), and a real
two-connection decrement-atomicity check (exactly one sees 0). Selector PG case.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* UN-3548 [FIX] Address PR #2053 review feedback

High:
- Abort is now ONE atomic statement: `WITH claimed AS (DELETE ... RETURNING) ...`
  — claim+teardown in a single transaction (no claimed-but-not-deleted window;
  a crash rolls back so a sibling retries). This makes the `aborted` column
  redundant — dropped it; the decrement's "row missing -> abandoned" branch now
  covers the failed-task case. The callback can only fire when remaining hits 0
  (all tasks succeeded), so a failed task (which deletes the row) can never let a
  partial-results fire.
- Dropped the per-enqueue global orphan sweep (unbounded DELETE on the hot path,
  deadlock-prone, shared the UPSERT txn). Reclaim is a future periodic sweep.
- A NUL byte survives json.dumps but jsonb rejects it -> catch the DataError and
  tear the barrier down (fail fast) instead of hanging to expiry.

Medium:
- Post-dispatch row delete is best-effort (logged, not raised) so a delete error
  can't mask the already-fired callback; documented the no-double-fire invariant
  (last decrement + max_retries=0).
- Added a DB CheckConstraint (expires_at > created_at) — the one writer-proof
  invariant; Meta comment warns off a `remaining >= 0` check (teardown needs
  negative). Softened the "periodic sweep" comments to future/not-yet-shipped.

Low:
- Extracted shared `barrier_ttl_seconds()` + `CallbackDescriptor` into barrier.py;
  both backends import them (redis keeps back-compat aliases). signature_kwargs
  dict instead of inline ** spread. Atomicity comment notes the per-transaction
  premise.

Tests: callback-dispatch-failure-preserves-row; decrement-after-abort-no-fire;
atomicity through barrier_pg_decr_and_check (two threads, exactly one fires);
list-result-as-single-element; NUL-byte teardown; DB-constraint; max_retries=0.
92 barrier tests pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* UN-3548 [DOCS] Drop stale aborted-column reference in PgBarrier docstring

The wire-model docstring's enqueue step still listed `aborted = false` as an
UPSERT column after the column was removed (abort now dedups via DELETE …
RETURNING / row existence). Remove it so a reader doesn't hunt for — or
re-add — a column that no longer exists.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…tor_lock) (#2056)

* UN-3553 [FEAT] PG Queue 9d slice 1 — leader-election lease (orchestrator_lock)

First slice of 9d (orchestrator/reaper): the singleton-guarantee primitive
the reaper loop and a future fair-admission gate hang off. Ships dark —
nothing acquires leadership yet, so merging changes no runtime behaviour.

Why now: 9d was skipped in the merged spine (9c -> liveness -> priority ->
PgBarrier) and is the safety net 9e needs — without a reaper, every
at-least-once hang / orphaned barrier bottoms out at the 6h TTL with no
recovery. The reaper must run as exactly one instance, so leader election is
the foundation.

Lease, not advisory lock: leadership is a TTL'd row UPDATE (take it if the
leader is free or its lease is stale), not pg_advisory_lock. Session-scoped
advisory locks don't survive the transaction-pooled PgBouncer the queue
connects through (UN-3533) — a plain UPDATE is one transaction, pooling-safe.
All time comparisons are server-side (now()), so candidate clock skew can't
split leadership.

- backend/pg_queue: PgOrchestratorLock single-row model (id PK, leader,
  acquired_at) + CheckConstraint(id=1); generated migration 0005 + a
  reversible RunPython seeding the one free row. Free = empty leader
  (follows the PgQueueMessage.org_id no-nullable-text convention).
- workers/queue_backend/pg_queue/leader_election.py: LeaderLease
  (try_acquire/renew/release), lease_seconds_from_env() (default 10s,
  loud-on-misconfig), default_worker_id(). Instance-owned self-recovering
  connection.
- tests: 20 real-PG tests. Load-bearing properties — concurrent try_acquire
  yields exactly one winner; renew returns False after a stale-lease
  takeover (the signal that stops a stalled leader). Plus lease-expiry
  takeover, release-frees-immediately, non-holder no-ops, env validation,
  single-row constraint.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3553 [FEAT] Address review: connection ownership, logging, recovery tests

Toolkit + SonarCloud review on #2056:

- [High] _owns_conn ownership guard: LeaderLease now mirrors PgQueueClient —
  an injected connection is never closed/swapped on a transient error (it
  would otherwise silently re-point an injected TEST_DB_/caller connection at
  a fresh DB_-env one). _get_conn only recreates an OWNED missing/closed conn.
- [Medium] Log the owned-connection discard in _cursor (worker_id + exc type)
  — a silent rebuild on the reaper singleton correlates with missed renews.
- [Medium] Test the recovery machinery: owned-conn recovered on
  OperationalError, owned-conn recovered when rollback fails, injected-conn
  never swapped. Plus two documented-invariant gaps — same-holder re-acquire
  on a fresh lease returns False, and release after a takeover is a no-op.
- [Low] release() branches on rowcount — only logs "released" when it really
  freed the lease; a no-op release logs debug (truthful post-mortems).
- [Low] Scope the lease_seconds<=0 guard to the explicit-arg branch (dead on
  the env path, which already rejects <=0).
- [Low] Document the exception-propagation contract (raise == "leadership
  unknown, stop acting"), relabel the durable Usage example.
- [Low] Migration: note the seed row is load-bearing (future reaper-bootstrap
  should self-heal with INSERT ... ON CONFLICT DO NOTHING).
- SonarCloud S117: rename the migration's get_model local to lock_model.

25 leader-election tests pass; makemigrations --check clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3553 [FEAT] Address Greptile: idempotent worker id + clearer renew log

- default_worker_id() is now cached (functools.cache) → idempotent per
  process, so a caller passing it inline in a retry/restart loop can't drift
  the worker id out from under renew()/release(). Lazy (first-call), so it's
  fixed after a fork rather than shared across children. Test asserts
  idempotency.
- renew()'s failure warning now reads "not the current leader (taken over by
  another candidate, or the lease was never held)" — accurate for the
  non-holder-renew case too, and fixes the "took over"->"taken over" grammar.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
…sweep (#2058)

* UN-3554 [FEAT] PG Queue 9d slice 2 — reaper process + barrier-orphan sweep

Builds on the leader-election lease (UN-3553): stands up the reaper process —
the leader-elected recovery loop — with its first recovery job, the
barrier-orphan sweep. Ships dark (launched explicitly, never in the default
worker set).

- reaper.py:
  - sweep_expired_barriers(conn): DELETE pg_barrier_state WHERE expires_at <
    now() RETURNING + loud per-orphan WARNING. The documented PgBarrier
    backstop — reclaims barriers whose header tasks never all completed; a
    late in-flight decrement then finds no row and abandons (existing
    semantics). Execution terminal-status recovery is 9e's job.
  - PgReaper: leader-elected loop. Each cycle renews (steps down to standby if
    renew() returns False), else tries to acquire; sweeps ONLY while leader.
    run() loops with graceful SIGTERM/SIGINT shutdown + lease release on exit.
    Guard: cycle interval must be shorter than the lease window, or the leader
    thrashes leadership between renews.
  - reaper_interval_from_env() (WORKER_PG_REAPER_INTERVAL_SECONDS, default 5s),
    main(), python -m queue_backend.pg_queue.reaper entrypoint.
- leader_election.py: expose lease_seconds property (reaper validates its
  cycle against it).
- test_pg_reaper.py: 15 tests — env/construction guards (interval < lease),
  leadership gating (sweeps only when leader, steps down on renew-fail,
  releases on stop), real-PG sweep (reclaims only expired, leaves fresh).

Out of scope: run-worker.sh wiring + liveness (followup, like 9c-followup);
pipeline recovery (counter reconstruction, per-stage re-enqueue) deferred to
9e where there's a real PG pipeline to test against.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3554 [FEAT] Address review: sweep rollback, conn recovery, tick contract

Toolkit + SonarCloud review on #2058:

- [CRITICAL] sweep_expired_barriers now rolls back on error before re-raising
  (conn is manual-commit; an un-rolled-back failure left it in an aborted-txn
  state, poisoning every later cycle → silent self-perpetuating stall). This
  also clears SonarCloud's C-reliability gate.
- [HIGH] On a failed sweep PgReaper discards its OWNED connection so the next
  tick reconnects — covers a poisoned/dead handle that `.closed` alone misses.
- [MEDIUM] renew() raising now sets _is_leader=False before propagating
  (honours the lease's "raise == stop acting" contract).
- [MEDIUM] release() failure on shutdown is logged (with the lease-window
  note) instead of silently suppressed.
- [MEDIUM] signal-handler ValueError is re-raised unless we're off the main
  thread (don't mislabel an unrelated ValueError).
- [MEDIUM/type-design] tick() returns a TickOutcome(was_leader, reclaimed)
  NamedTuple instead of an overloaded `-1` int sentinel; added an is_leader
  property; lease param typed against a new LeaderLeaseLike Protocol.
- [LOW] Reworded the sweep race comment, the step-down log (same-cycle
  re-acquire), and the run() self-recovery comment for accuracy.
- Tests: +8 — run() swallows a tick error; owned-conn recreated-when-closed;
  injected-conn never swapped; failed-sweep discards owned conn; sweep SQL
  contract (no DB); sweep rolls back on error; step-down-then-reacquire;
  renew-raising steps down. 23 total; drive paths via is_leader, no
  private-flag poking.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3554 [FIX] Use pytest.approx for float-equality asserts (SonarCloud S1244)

The two reaper-interval asserts compared float returns with ==; the values
are exactly representable so it was harmless, but pytest.approx is the correct
idiom and clears the S1244 reliability bugs.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3554 [FEAT] Address Greptile: manual-commit Layer-4 fixture, close owned conn

- The real-Postgres fixture (barrier_conn) was autocommit, which made
  sweep_expired_barriers' own commit() a no-op and its rollback unreachable —
  so Layer 4 tested a different mode than the production reaper
  (create_pg_connection is manual-commit). Switched the fixture to manual-commit
  and added explicit commits to the seed/read/cleanup helpers, so the real-DB
  tests now exercise the sweep's commit (and rollback) in production mode.
- run() now closes its OWNED sweep connection on shutdown (an injected one is
  the caller's). Harmless for the main() process but keeps PgReaper clean if
  ever embedded / test-driven.

23 tests pass; ruff clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
…pe + pg-queue set (#2059)

* UN-3555 [FEAT] PG Queue 9d slice-2 followup — run-worker.sh reaper type + pg-queue set

Wires the reaper (UN-3554) into run-worker.sh and adds a pg-queue set so the
whole PG-queue group launches in one shot. Mirrors the 9c -> 9c-followup split
(launcher wiring as its own slice). Liveness probe is a separate follow-on.

- workers/pg_queue_reaper/: thin entrypoint package (python -m pg_queue_reaper
  -> queue_backend.pg_queue.reaper.main). No worker-app bootstrap (the reaper
  runs no Celery tasks), unlike pg_queue_consumer; exists so the process has a
  stable name run-worker.sh can launch + pgrep-match.
- run-worker.sh:
  - reaper / pg-queue-reaper type — opt-in (NOT in `all`), launches
    `python -m pg_queue_reaper`, runs from workers root, --status/-k/-r match
    via the `-m` pgrep branch (now covers consumer + reaper). Lease/interval env
    documented in --help.
  - pg / pg-queue SET — run_pg_queue_set launches consumer + reaper together
    (always detached, like `all`). Restart (-r pg-queue) kills both members then
    relaunches. list_core_worker_dirs skips the set alias (no phantom status
    entry). Help documents the Celery `all` set and the PG `pg-queue` set as
    independent, runnable in parallel for a dual-transport (strangler-fig) setup.

Dev-tested live: `run-worker.sh reaper -d` acquires leadership + ticks and shows
RUNNING in --status; `run-worker.sh pg-queue` brings up consumer + reaper, both
RUNNING, reaper leader, no phantom set entry; bash -n clean; --help renders.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3555 [FEAT] Address review: set start-failure propagation, restart-kill guard

Toolkit review on #2059:

- [High] run_pg_queue_set swallowed member start-failures (backgrounded
  subshells' status was lost, banner+return 0 unconditional). Now each member
  runs in a FOREGROUND subshell so run_worker's own `return 1` on a
  crash-on-start is captured; the set returns non-zero if any member fails.
  (The reviewer's kill -0 on `$!` would false-fail — that PID is the launcher
  subshell, which exits the instant it backgrounds the nohup'd worker; the
  foreground-subshell return value is the correct signal and isolates `cd`.)
  Documented that the set always runs detached (ignores -d).
- [Medium] Dispatch now `|| exit 1` so a member start-failure reaches the
  script exit code — the only programmatic startup signal (reaper has no
  health port yet).
- [Medium] Set-restart aggregates kill_one_worker failures and aborts the
  relaunch if a member survives SIGKILL (avoids a duplicate consumer
  double-polling Postgres). Mirrors kill_workers' discipline.
- [Medium/minor] Startup banner prints `Queues: n/a` when empty (reaper).
- [Low] Reworded pg_queue_reaper/__main__ docstring: the launcher DOES export
  WORKER_TYPE for every worker; the accurate claim is the reaper neither reads
  nor mutates it (vs the consumer overwriting it before `import worker`).
- [Low] Added a smoke test that pg_queue_reaper.__main__ re-exports the real
  reaper main (guards the `python -m pg_queue_reaper` launch path against an
  ImportError regression).

Dev-tested: `run-worker.sh pg-queue` returns exit 0 with both members up;
24 reaper tests pass; bash -n + ruff clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3555 [FEAT] Address Greptile: set partial-start teardown + --logs set alias

- run_pg_queue_set: on a partial start-failure (one member up, the other
  crashed) tear the whole set down before returning 1 — kill both members so a
  restart-on-failure relaunch can't spawn a second instance over the survivor
  (the consumer would double-poll Postgres). All-or-nothing, mirroring the
  restart path's discipline.
- tail_logs: handle the pg/pg-queue set alias — `--logs pg-queue` now tails
  both member logs (pg_queue_consumer + pg_queue_reaper) instead of looking for
  a non-existent workers/pg-queue/pg-queue.log and printing a misleading
  "no log file" error. Mirrors how list_core_worker_dirs skips the set value.

Dev-tested: `--logs pg-queue` tails 2 files (consumer + reaper); bash -n clean;
24 reaper tests pass.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
…ader) (#2061)

* UN-3556 [FEAT] PG Queue 9d — reaper liveness probe (heartbeat + is_leader)

Closes the gap flagged in #2059's review: a reaper that crashes after startup
was invisible (opt-in skip in --status + no health port). Mirrors the
consumer's liveness (UN-3544).

- PgReaper heartbeat: _last_tick_monotonic stamped at the START of every tick
  (a standby tick counts as progress — liveness tracks the loop, not
  leadership) + seconds_since_last_tick() / is_tick_stale().
- ReaperLivenessServer: lean HTTP probe (mirrors the consumer's LivenessServer)
  — /health (also /healthz, /livez) returns 200 while the tick loop is fresh,
  503 when stale. Payload also surfaces is_leader (which pod holds the lease —
  useful for 9e debugging). The 200/503 verdict is PURELY the heartbeat, never
  leadership (a standby is healthy) or DB reachability (a blip must not
  crash-loop a fine process).
- main() wires it from WORKER_PG_REAPER_HEALTH_PORT (unset → no server, no
  stray port); staleness window from WORKER_PG_REAPER_HEALTH_STALE_SECONDS
  (default 30s, comfortably above the 5s tick interval). Bind failure degrades
  gracefully (logs, runs probe-less).
- run-worker.sh: reserve port 8086 for the reaper, export the health-port env
  in the reaper special-case, document the two new env vars in --help.
- Tests (+13, 37 total): heartbeat fresh/stale + tick refresh; liveness server
  200-fresh / 503-stale / is_leader reflected / 404 / double-start; health
  staleness env default+override+invalid; server-disabled-when-no-port.

Dev-tested live: `run-worker.sh reaper -d` → GET :8086/health →
{"status":"healthy","check":"pg_reaper_tick","is_leader":true,...}.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3556 [REFACTOR] Extract shared LivenessServer (SonarCloud duplication)

SonarCloud flagged the new ReaperLivenessServer as duplicating the consumer's
LivenessServer (~19 lines of HTTP-probe boilerplate, over the 3% new-code gate).

Extracted one generic LivenessServer into queue_backend/pg_queue/liveness.py —
parameterised by a freshness callable + the payload's check/age labels + an
optional extra-status callable (the reaper's is_leader). Both sides are now thin
subclasses that preserve their exact constructor signatures and wire payloads:
- consumer LivenessServer(consumer, port=, stale_after=) → check="pg_queue_poll",
  seconds_since_last_poll (unchanged on the wire; its tests pass untouched).
- ReaperLivenessServer(reaper, port=, stale_after=) → check="pg_reaper_tick",
  seconds_since_last_tick, is_leader.

The boilerplate now lives once → duplication cleared, consumer behaviour
preserved. reaper 37 tests + consumer liveness/health tests green; ruff clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3556 [FEAT] Address review: validate health port, type/guard liveness, tests

Toolkit review (10 findings; several already resolved by the dedup refactor
0d2b3f7 — the threading-alias, the query-strip comment, and the
extract-shared-server follow-up itself):

- [Medium] Port parse: extracted _reaper_health_port_from_env() — names the var
  on a bad value (no more context-free int('abc') crash) and range-checks
  0-65535 at parse time, so an out-of-range value can't escape the bind catch as
  OverflowError inside start(). main() uses it.
- [Medium] liveness.py: typed _httpd/_thread as HTTPServer|None / Thread|None via
  TYPE_CHECKING (was Any in the shared server) — restores the lifecycle invariant
  + type-checking on .shutdown()/.join()/etc.
- [Low] LivenessServer.__init__ re-validates stale_after > 0 (a direct caller
  could otherwise build an always-503 probe).
- [Low] bound_port docstring: clarified the port=0 / not-started case.
- [Low] _DEFAULT_HEALTH_STALE_SECONDS comment references the interval constant,
  not a hard-coded "5s".
- [Medium/Low test gaps] +9 tests: port-env helper (unset/empty/valid/non-int
  named/out-of-range); main() wiring (parsed port reaches the wiring + health
  stopped in finally; port=None when unset); _maybe_start_health_server OSError
  graceful-degrade → None + logger.exception; stale_after<=0 constructor guard.

reaper 46 + consumer liveness 10 green; ruff clean. The shared-server extraction
(flagged as a follow-up) was already done in 0d2b3f7.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3556 [FEAT] Address Greptile: protect core payload + per-process log label

- extra_status_fn could silently clobber core payload fields (status / check /
  age_key / stale_after_seconds) that a monitor reads. Now the handler builds
  extra fields first and overlays the core fields, so core ALWAYS wins — a
  future caller's extra dict can't corrupt the status a monitor parses.
  Test: an extra_status_fn returning {"status":"HACKED",...} leaves status
  "healthy" and check intact, while a non-reserved extra key is preserved.
- The dedup refactor moved the consumer's liveness warnings to the shared
  liveness logger with generic text, so log-based filtering keyed on the old
  "PG-queue consumer: ..." would miss them. Added a log_label param (default
  "pg-queue"); consumer passes "pg-queue consumer", reaper "pg-queue reaper", so
  the messages stay attributable to the source process.

57 tests green (reaper 47 + consumer liveness 10); ruff clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
* UN-3560 [FEAT] PG Queue 9 — run-worker.sh `-L celery` log alias

`./run-worker.sh -L pg-queue` already tails the PG-queue set's logs, but
there was no symmetric way to tail only the Celery set: `-L` (no arg) tails
EVERYTHING (Celery + PG-queue consumer/reaper), since list_core_worker_dirs
includes the PG worker dirs.

Add a `-L celery` log alias (mirror of `-L pg-queue`): tails every worker
log EXCEPT the PG-queue members (pg_queue_consumer, pg_queue_reaper). Logs-only
— the Celery set is still run via 'all'.

run-worker.sh only: CELERY_SET constant + a branch in tail_logs() + usage/examples.

Dev-tested with stub PG logs: -L = 11 files, -L celery = 9 (PG excluded),
-L pg-queue = 2 (PG only). bash -n clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3560 [FIX] -L celery review — complement wording + single-source PG members + dedup

Address PR #2066 toolkit review:

- Comment accuracy (P1, x2): reword 'mirrors PG_QUEUE_SET' / 'mirror of the
  pg-queue alias' — the celery set is the COMPLEMENT (all minus the two PG
  members), in a different branch, not a mirror. Drop the directional 'below'.
- Modeling (P2): add a single 'PG_QUEUE_MEMBERS' source of truth (readonly assoc
  array) so 'celery = all - pg-queue' stays correct by construction; the celery
  branch tests membership via it instead of hand-rolling consumer/reaper.
- Simplification (P1): collapse the near-duplicate 'all' and 'celery' tail_logs
  branches into one loop with a membership-guarded skip.

Deferred (P2, inherited): set-aware empty-result message for the shared zero-log
guard — spans all three set aliases, best done as one follow-up.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
…2062)

* UN-3559 [FEAT] PG Queue 9e PR 1 — execution transport seam (inert)

Establish the per-execution transport seam for the 9e coupled-pipeline
migration: the transport a workflow execution rides ("celery" | "pg_queue")
is resolved once at the creation chokepoint and carried in the task payload.
Inert — transport always resolves to "celery", so behaviour is unchanged.

Design (chosen = payload-carry, not a WorkflowExecution column):
workers/queue_backend/pg_queue/9e-design.md.

- core: WorkflowTransport enum + DEFAULT_WORKFLOW_TRANSPORT (shared vocabulary).
- backend: resolve_transport() hardwired to celery (signature shaped for PR 3's
  Flipt wiring); create-execution internal API returns "transport";
  execute_workflow_async adds it to the async_execute_bin kwargs.
- workers: scheduler threads transport from the create response into the
  dispatch kwargs; async_execute_bin_general / _execute_general_workflow carry
  it onto the live WorkflowContextData.
- tests: backend resolver + enum (5), worker WorkflowContextData carry/default
  (2), dispatch characterisation updated (+ backend-resolved-transport thread).

Out of scope: live PG routing + per-batch idempotency key (PR 2); Flipt canary
wiring (PR 3); rollout (ops).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3559 [FIX] 9e PR 1 review — fail-closed transport coercion + doc/test fixes

Address PR #2062 review (PR Review Toolkit findings):

- Validation/silent-failure: add normalize_transport() (core) — fail-closed
  coercion of any inbound transport to a known value (unknown/None -> celery +
  warn). Applied at the scheduler read boundary and in WorkflowContextData
  __post_init__, so a garbage payload value can't reach the PR 2 fan-out read.
- Comment accuracy: WorkflowContextData.transport comment now present-tense
  (carried in PR 1; fan-out read lands in PR 2).
- Dead-code hygiene: add 'transport' to EXECUTION_EXCLUDED_PARAMS so the legacy
  execute_bin -> create_workflow_execution path can't TypeError.
- Two-resolution-sites: documented the deliberate two-site design + the PR 3
  single-chokepoint requirement at execute_workflow_async.
- transport.py: drop the unused logger; leave a PR-3 fail-closed marker.
- Design doc: correct anchors (transport.py / internal_api_views / workflow_
  helper, not execution.py:126), class name (WorkflowContextData not
  WorkflowExecutionContext), scope claims (stage-1 only in PR 1), and remove the
  hard-coded Flipt version/date/live-state.
- Tests: normalize_transport (passthrough / invalid->celery / None / logging),
  WorkflowContextData invalid-transport coercion.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3559 [FIX] 9e PR 1 — resolve transport after execution_id guard (greptile P1)

Move the normalize_transport(...) extraction below the `if not execution_id`
guard in _execute_scheduled_workflow. Previously it ran before the guard, so an
error response with no execution_id logged a misleading `[exec:None]` context
and discarded the computed transport. Now transport is only resolved once
execution_id is known non-empty.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3559 [FIX] 9e PR 1 — SonarCloud: drop unused resolve_transport params + dict literal

Address SonarCloud code smells on PR #2062:

- python:S1172 (x3): resolve_transport() no longer declares the unused
  workflow_id/pipeline_id/organization_id params — the PR-1 seam is inert and
  needs no inputs. PR 3 reintroduces them (keyed for Flipt) when it wires the
  evaluation; the two call sites (internal_api_views view, execute_workflow_async)
  now call resolve_transport() with no args. Tests updated.
- Replace dict(...) constructor with a {...} literal in
  test_workflow_context_transport._make_context.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
…(dispatch backend override + barrier decrement core) (#2067)

* UN-3561 [FEAT] PG Queue 9e PR 2a — live-PG-pipeline inert foundation (dispatch backend override + barrier decrement core)

First slice of 9e PR 2 (the live PG execution pipeline), split 2a/2b/2c.
2a is the inert foundation: the two seams the live switch (2c) consumes,
each with zero behaviour change on the default path.

- dispatch(backend=...): per-call transport override. None (default, every
  call site today) keeps the env allow-list decision via select_backend —
  byte-identical. When set it wins over the allow-list, so 2c can route a
  whole execution's header/callback onto PG without opting their task names
  into WORKER_PG_QUEUE_ENABLED_TASKS (allow-list is for leaf tasks; the
  pipeline's migration unit is the execution).
- Extract _barrier_pg_decrement(...) plain core out of the @worker_task
  barrier_pg_decr_and_check (now a thin delegator). 2c calls the core in-body
  on the PG-consumed path (a PG-consumed task fires no Celery .link, so the
  decrement runs in-body — fire-and-forget self-chaining).

Inert by construction: default barrier backend is chord (Celery executions
never import the PgBarrier module), and no call site passes backend=. Net
behaviour change: NONE. Transport threading + live switch land in 2c;
per-batch idempotency in 2b.

Tests: dispatch backend-override (3) + decrement-core extraction (3, incl.
real-PG in-body decrement + verbatim delegation). pg_barrier/dispatch/
barrier/routing suites green; ruff clean; worker-app bootstrap clean under
WORKER_BARRIER_BACKEND=pg (both barrier tasks registered, get_barrier→PgBarrier).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3561 address review (muhammad-ali-e): loud in-transaction guard, resolve_backend helper, comment/test fixes

- pg_barrier: enforce the "own committed transaction" decrement contract
  loudly — _barrier_pg_decrement raises at entry if the shared connection is
  mid-transaction (was prose-only; the 2c in-body caller is the real risk).
  Safe for existing paths: Celery .link enters idle, tests use autocommit
  conns → always idle. Fix the inaccurate "one call = one _cursor() txn"
  parenthetical (the delete paths open a 2nd txn) and drop the rot-prone
  "(fire-and-forget self-chaining)" jargon.
- routing: extract resolve_backend(task_name, override) — the override-wins-
  else-allow-list precedence now lives in one self-documenting place (2c
  reuses it); dispatch() calls it instead of inlining the None-means-auto rule.
- dispatch: reword the backend= docstring — avoid the "payload" term collision
  (local to_payload var) and the stale routing.py cross-ref; point at the live
  carrier WorkflowContextData.transport + 9e-design.md.
- tests: +fairness-reaches-row on the backend= override path; +real-row wrapper
  test that catches core-param drift (the mocked delegation test couldn't);
  +open-transaction guard test. Kept the mocked test as the keyword-forwarding pin.

Deferred (LOW, reviewer-aligned): BarrierDecrementResult TypedDict union — lands
in 2c when the in-body caller actually branches on the status.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3561 address greptile (3× P2): docstring refs + routing-log suppression note

All doc/comment-only, no logic change. greptile gave 4/5 "safe to merge"; these
are the staleness its findings flagged, introduced by the resolve_backend
extraction in the prior review round:
- dispatch module docstring + routing "Scaffold posture" now name resolve_backend
  (wrapping select_backend) as the routing seam, not select_backend alone.
- Note at the _pg_routing_logged log-once site that it's keyed on task name only,
  so an override-then-allow-list cutover won't re-announce (benign: override =
  pipeline headers vs allow-list = leaf tasks, no overlap expected; the allow-list
  config is still announced by _log_allow_list_once).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
…g_batch_dedup + claim_batch / clear_execution_batches) (#2068)

* UN-3562 [FEAT] PG Queue 9e PR 2b — per-batch idempotency primitive (pg_batch_dedup + claim_batch / clear_execution_batches)

Second slice of 9e PR 2, after 2a (#2067). Inert idempotency primitive: the
durable per-batch dedup marker 2c wires into the at-least-once PG path.

Why: the PG queue is at-least-once, so process_file_batch can be redelivered
after a crash-before-ack → re-run batch + double-decrement the barrier
(non-idempotent, max_retries=0). Recon showed existing per-file protection is
only partial (Redis lock released after write; WFE COMPLETED skips tool re-exec
but not necessarily the destination write; FileHistory is cross-execution only),
so a durable per-batch gate is needed; per-file status stays the partial-crash
backstop.

- backend: PgBatchDedup model + migration 0006 — table pg_batch_dedup with a
  UniqueConstraint(execution_id, batch_index) (the ON CONFLICT target; its
  execution_id-leading index also serves the cleanup DELETE). Django-managed,
  extension-free, same posture as the sibling pg_queue models.
- workers (pg_barrier.py, reusing the barrier's _cursor() → one PG conn per
  worker child): claim_batch(execution_id, batch_index) -> bool (atomic
  INSERT ... ON CONFLICT DO NOTHING RETURNING; True=first/decrement,
  False=redelivery/skip) + clear_execution_batches(execution_id) -> int
  (barrier-teardown cleanup; reaper sweep is the backstop).

No call-site wiring — claim/clear + batch_index threading + transport switch
land in 2c. Inert: new table + two helpers, no callers. Net behaviour: NONE.

Tests: +8 real-PG (first-claim, redelivery-rejected, distinct-batch,
distinct-exec, concurrent-exactly-one-winner, clear-only-target,
clear-empty-zero, reclaim-after-clear). Migration applied to dev DB,
makemigrations --check clean, bootstrap clean under WORKER_BARRIER_BACKEND=pg.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3562 address review (muhammad-ali-e P1-P8): fix reaper-backstop docstring claim, batch_index constraint, race test, nits

- P1 (verified bug): the docstrings claimed the reaper's barrier-orphan sweep
  reclaims orphaned pg_batch_dedup markers — it doesn't. sweep_expired_barriers
  DELETEs only pg_barrier_state (no cascade), so orphaned markers leak today.
  Reworded both PgBatchDedup + clear_execution_batches docstrings to state the
  leak honestly + flag the dedup-orphan sweep as intended future work.
- P4: add CheckConstraint(batch_index >= 0) (writer-proof, mirrors
  PgQueueMessage.priority) + a test that the DB rejects a negative index.
  Regenerated migration 0006 to include it.
- P6: claim_batch docstring no longer says it decrements; defers the single
  decrement to the caller (the function only inserts the marker).
- P7: generalized "partial per-file protection" → "not fully idempotent on
  redelivery" so the rationale can't rot.
- P5: documented created_at as observability-only (future age-based sweep).
- P2: REQUIRE_PG_TESTS env → skip becomes fail, so the idempotency primitive
  can't ship untested-green in CI where PG is expected.
- P3: strengthened the race test — pre-build N=8 conns in the main thread,
  align claims with threading.Barrier, loop 5 trials (forces the contended
  ON CONFLICT path instead of a serial fast-path).
- P8: hoisted import os to module top.

35 pg_barrier + 9 dedup tests green; migration applied to dev DB,
makemigrations --check clean; ruff clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
…fire-and-forget) (#2069)

* UN-3563 [FEAT] PG Queue 9e PR 2c — live PG fan-out/barrier/callback (fire-and-forget)

Wires the coupled pipeline's fan-out → barrier → callback onto the PG queue for
a transport=="pg_queue" execution. Gated: resolve_transport() still returns
celery (PR3 Flipt flips it), so the whole PG branch is present-but-unreachable —
default path byte-identical. Orchestrator task (async_execute_bin) stays on
Celery (hybrid); routing it onto PG is a 2d follow-up.

- barrier.py: Barrier Protocol + CeleryChordBarrier/RedisDecrBarrier accept (and
  ignore) a `transport` param; CallbackDescriptor gains an optional `backend`.
- orchestration_utils._barrier_for_transport: pg_queue → fresh PgBarrier()
  (bypasses the WORKER_BARRIER_BACKEND singleton), else the singleton.
- pg_barrier.PgBarrier.enqueue(transport): pg_queue → fire-and-forget mode —
  _dispatch_header_pg sends each header via dispatch(backend=PG) with an injected
  _barrier_context {execution_id, batch_index, callback_descriptor}, no .link;
  descriptor marked backend=pg_queue; UPSERT block also clears pg_batch_dedup
  (greptile #2068 reuse-reset). _fire_barrier_callback self-chains the callback
  onto PG when backend==pg_queue. clear_execution_batches at finalise + abort.
  run_batch_with_barrier(): claim → work → in-body _barrier_pg_decrement;
  redelivery skips; exception → barrier_pg_abort.
- file_processing.process_file_batch(_barrier_context=None): core routes None →
  _run_batch_stages (celery chord path), else → run_batch_with_barrier.
- general/api fan-outs thread transport into create_chord_execution.

Tests: +8 PgBarrier fire-and-forget + 2 orchestration routing + 2 process_file_batch
routing. Each test file green alone; ruff clean. End-to-end forced-pg dev-test
pending before PR.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 fix SonarCloud S1172: drop unused task_instance from _run_batch_stages

The extracted _run_batch_stages never uses task_instance — its only purpose
(deriving celery_task_id) happens in _process_file_batch_core before the call.
Removed the param + updated both call sites. _process_file_batch_core keeps
task_instance (it reads .request.id). Routing test mocks with *a, unaffected.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 address review (muhammad-ali-e, 15): strand-on-failure hardening + typing/dedup/docs/tests

Decision (with reviewer): reaper-as-safety-net for the un-catchable strand
windows + fix what's catchable + document + gate on PR3.

Failure handling:
- [#69 Critical] run_batch_with_barrier wraps BOTH work + decrement in the abort:
  a decrement-side failure (guard / DB / last-batch callback dispatch) tears the
  barrier down in-body instead of stranding to expiry.
- [#79] extracted _abort_barrier_in_body — logs when the teardown itself fails
  (was silently suppressed under a misleading "torn down" message).
- [#74/#81] documented the two un-catchable strand windows (hard-crash-during-work,
  post-commit callback-dispatch-fail) as a HARD reaper dependency for PR3.
- [#86] finalise cleanup split into independent try/excepts with distinct logs.

Typing / clarity:
- [#1] BarrierContext(TypedDict) for _barrier_context (header fan-out, run_batch_with_barrier,
  process_file_batch).
- [#3] renamed CallbackDescriptor "backend" -> "transport" (WorkflowTransport value;
  avoids the QueueBackend "pg" collision).
- [#27] is_pg_transport() predicate in core; used in orchestration_utils + pg_barrier.
- [#20] extracted _dispatch_pg() — single home for cycle-avoiding local import + backend=PG.
- [#35] normalize_transport() at the general worker entry (parity w/ api/scheduler).
- [#94] log when a header has no queue option.
- [#9/#13] fixed born-stale comment + kwargs-not-args docstring.

Tests (+#37/#41): last-batch self-chains callback to PG + cleans up barrier/dedup;
decrement-failure aborts; PG-branch mid-loop dispatch-failure deletes row; header
args/queue/pre-existing-kwargs preservation. 137 barrier/dedup/routing tests green;
bootstrap clean under WORKER_BARRIER_BACKEND=pg.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 fix run_batch_with_barrier strand-window doc inconsistency (review)

The second "NOT catchable" bullet conflated two different things: it described
the in-body catchable abort ("the abort here removes the row") and a *software*
callback-dispatch failure — but that failure is already caught + torn down by
step 3's wrap (paragraph 1), so it doesn't belong under the un-catchable
heading, and on the PG path _fire_barrier_callback IS the enqueue so "committed
but before the enqueue" couldn't both hold.

Rewrote the bullet to the genuinely un-catchable window: a hard crash BETWEEN
the decrement committing (remaining→0) and the callback enqueue completing —
decrement committed (redelivery blocked by the marker), process gone before the
callback enqueues or any abort runs, row survives to expiry, reaper-only
recovery. Explicitly notes a software dispatch failure is the catchable case.
Keeps this list an accurate spec for the PR-3 reaper-recovery dependency.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 address greptile (#2069, 2): clear dedup on mid-loop PG failure + carry fairness on PG callback

Both in the gated PG path (greptile 4/5, safe to merge).

- Issue 1: PgBarrier.enqueue mid-loop dispatch-failure handler now also calls
  clear_execution_batches on the PG path. Earlier headers may have committed a
  claim_batch marker; with the barrier row deleted, their in-flight
  barrier_pg_abort is a no-op (already_aborted) and never reaches the clear
  inside it, so reclaim the markers directly here.
- Issue 2: the PG callback now carries the producer's fairness. Added
  _fairness_from_headers() to reconstruct the FairnessKey from the stored
  x-fairness-key headers and pass it to _dispatch_pg, so the callback rides the
  same org/priority as the Celery path (was always default priority).

Tests: +fairness-carried / +fairness-none-safe on _fire_barrier_callback;
extended the PG mid-loop test to assert an already-claimed marker is reclaimed.
75 barrier/dedup tests green; bootstrap clean under WORKER_BARRIER_BACKEND=pg.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 fix SonarCloud S3776: reduce PgBarrier.enqueue cognitive complexity (17→under 15)

Extracted the per-header dispatch loop into PgBarrier._dispatch_headers — the
deeply-nested for→try/except→if/else→if (PG-vs-celery branch + mid-loop failure
teardown + PG dedup-clear) was the complexity driver. enqueue now calls the
helper; behaviour identical. radon: enqueue C(11)→B(6); ruff C901 passes.
75 barrier/dedup tests green; ruff + ruff-format clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 fix greptile #2069: mid-loop dedup-clear test passed for the wrong reason

The pre-seeded claim_batch marker was wiped by enqueue's UPSERT block (the
reuse-reset DELETE) before the dispatch loop, so the mid-loop
clear_execution_batches deleted 0 rows — the count==0 assertion passed on the
UPSERT, not the guard under test. Now the first dispatch side-effect claims the
marker AFTER the UPSERT (simulating a fast PG consumer), so the mid-loop clear
is what removes it. Verified: with the clear disabled the marker orphans (count=1).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
…s ERROR (gates PR 3) (#2070)

* UN-3564 [FEAT] PG Queue 9e — reaper-recovery: mark stranded executions ERROR

The reaper only DELETEd expired pg_barrier_state rows — a stranded execution
vanished silently (stuck in EXECUTING). This makes it RECOVER them — the hard
dependency from PR 2c review for enabling the PG transport (un-catchable strand
windows otherwise bottom out at the ~6h barrier expiry).

Per expired barrier the leader (recover_expired_barriers), best-effort + per-exec:
- Marks the execution ERROR via the internal API (the path the normal callback
  uses — business state never goes direct-DB; the API is functional: execution_time,
  error truncation, attempts, events/notifications, multi-tenant boundary). Message
  distinguishes remaining>0 (work incomplete) vs remaining==0 (callback never fired).
  Reads status first and SKIPS if already terminal (a remaining==0 row can be a
  COMPLETED exec whose row-delete failed; update_execution has no terminal guard) or
  if the row has no org.
- Reclaims pg_batch_dedup + pg_barrier_state directly in PG (same boundary as the
  rest of queue_backend). Recover-THEN-delete: a failed mark leaves the row for the
  next sweep to retry (single-leader → no double-claim).

- backend: organization_id column on PgBarrierState + migration 0007 (reaper needs
  it for the org-scoped status API).
- workers: PgBarrier.enqueue stamps organization_id into the UPSERT; PgReaper holds
  a lazily-built InternalAPIClient; sweep_expired_barriers → recover_expired_barriers.

Tests: reaper suite reworked — real-PG recovery w/ fake API client (mark-ERROR
remaining>0/==0, terminal-skip no-overwrite, org-missing skip, API-failure leaves
row, dedup reclaim, tick-via-real-conn). 109 reaper/barrier/dedup tests green.
Dev-tested vs real InternalAPIClient+backend: PENDING stranded → ERROR + cleaned;
COMPLETED → NOT overwritten + cleaned (terminal-guard verified end-to-end).

Deferred: callback re-fire for remaining==0 (needs callback_descriptor on the row).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* UN-3564 fix SonarCloud S6741: avoid IndexError-prone update_calls[0] in reaper tests

Replaced `api.update_calls[0]` (which Sonar can't prove non-empty → IndexError
risk) with single-element unpacking `(call,) = api.update_calls` — removes the
index and additionally asserts exactly one status mark was made. Behaviour
identical; 52 reaper tests green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3564 address review (muhammad-ali-e, 11): fix read-failure→false-ERROR (Critical) + hardening

- [Critical] _execution_status now RAISES when the status read fails
  (get_workflow_execution returns success=False, never raises) — a transient
  blip no longer reads as "non-terminal" and flips a COMPLETED execution to
  ERROR; the caller's except retains the row for retry. + test.
- [Medium] use ExecutionStatus.is_completed (single source of truth) instead of
  a local _TERMINAL_STATUSES frozenset that could drift; dropped the fake
  in (_TERMINAL_STATUSES) parens.
- [Medium] remaining is NOT-NULL int → typed int, three-way branch
  (>0 work-incomplete / ==0 callback-never-fired / <0 already-torn-down) with
  accurate messages.
- [Medium] re-guard the barrier DELETE on `expires_at < now()` + only clear
  dedup when the barrier row was actually reclaimed — a same-id re-enqueue
  (UPSERT resets expires_at) is no longer torn down mid-recovery.
- [Medium] org-missing now LEAVES the row (doesn't erase the only recovery
  handle) + logs ERROR; PgBarrier.enqueue logs ERROR at write time if a barrier
  is enqueued without an org (should never happen).
- [Low] recover_expired_barriers emits an aggregate summary; escalates to
  logger.error when a non-empty sweep recovers nothing (systemic: API down).
- [Low] refreshed the stale "(future) periodic sweep" comment in pg_barrier.

Tests: _FakeApiClient models the real success/failure contract + records
file_execution; +read-failure-retains-row, +file_execution=False, +multi-row
isolation (one fails, others recover), +org-stamping x3 (stamp/default/UPSERT
refresh). 115 reaper/barrier/dedup tests green; ruff + ruff-format clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3564 fix SonarCloud S6741 (new instance): index-free get_calls access

A new occurrence introduced by the prior review-round test
(test_status_read_passes_file_execution_false used api.get_calls[0][2]). Replaced
with single-element unpacking — [(_exec_id, _org, file_execution)] = api.get_calls
— index-free and additionally pins exactly one status read. 55 reaper tests green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3564 address greptile #2070: re-arm race, status=None edge, aggregate skip-vs-fail

- [main] re-arm race: the ERROR mark now fires only after a re-check that the
  row is STILL expired (_still_expired) immediately before marking — so a same-id
  re-enqueue (expires_at reset to future) between the sweep SELECT and the mark
  can't get its freshly-running execution flagged ERROR. The DELETE stays guarded
  on expires_at < now() as the second line of defence. + test (re-arm via the
  status-read side-effect) asserting no mark + row left intact.
- status=None on a success=True read no longer falls through to mark ERROR —
  treated as indeterminate: skip + leave the row for the next sweep. + test.
- aggregate logging distinguishes genuine failures (exceptions) from benign skips
  (terminal / re-armed / no-status / no-org): the systemic "recovered NONE / API
  down" ERROR escalates only on real failures, not on all-skipped sweeps. + caplog
  test that an all-org-missing sweep doesn't escalate.

58 reaper + 51 barrier tests green; ruff + ruff-format clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3564 fix SonarCloud S125: drop code-like tuple from a test comment

The comment ended with a parenthesised tuple "(exec_id, org, file_execution)" —
a complete Python expression S125 reads as commented-out code. Reworded to prose
(and the stale `sweep_expired_barriers`→recover rename + a couple of inline
`foo()` prose mentions neutralised while here). Comment-only; tests unaffected.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
…esolution (#2071)

* UN-3570 [FEAT] PG Queue 9e PR 3 — Flipt canary wiring for transport resolution

Replace the hardwired Celery result in resolve_transport() with a real
decision: an env master-gate (PG_QUEUE_TRANSPORT_ENABLED, default off) that,
when on, consults the Flipt boolean flag pg_queue_execution_enabled. Fails
closed to Celery on any error.

- entity_id = execution_id (per-execution sticky %-rollout; resolved once,
  carried in the task payload so an in-flight execution never re-buckets)
- context carries org/workflow/pipeline ids (str-coerced — UUIDs in the gRPC
  map<string,string> context would be swallowed as False and silently force
  Celery) for segment rollouts
- both creation chokepoints wired: internal_api_views.create_workflow_execution
  (scheduler path) and workflow_helper.execute_workflow_async (API/manual/async)

Gated off by default — no behaviour change until the flag is enabled, which
requires PG consumers deployed first (deploy-ordering safety).

Tests: 15 cases in test_transport.py (gate off/on, flag true/false, fail-closed,
entity/context shape, UUID-coercion regression).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3570 address review: type honesty, fail-closed org/Flipt guards, observability

- Widen resolve_transport id params to `str | UUID` (callers pass UUIDs; the
  body already str-coerces) — resolves the SonarCloud type-mismatch and makes
  the contract honest.
- Fail closed to celery when organization_id is missing (str(None) must never
  reach the Flipt org segment) — the helper path reads it from StateStore which
  can be empty.
- Fail closed + loud warning when the gate is ON but FLIPT_SERVICE_AVAILABLE is
  not true, so a blind Flipt can't masquerade as a healthy 100%-celery canary.
- Log the resolved transport on the gate-ON path (deliberate celery vs pg_queue
  vs blind are now distinguishable); drop the inaccurate "import-time fault"
  clause from the fail-closed comment.
- Doc: organization_id is Organization.organization_id (X-Organization-ID), not
  the DB pk.
- sample.env: document that pg_queue needs all three (gate + FLIPT_SERVICE_AVAILABLE
  + flag).
- +2 tests (missing-org → celery, gate-on + Flipt-unavailable → celery).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3570 address greptile: organization_id None type + FLIPT_SERVICE_AVAILABLE parse parity

- organization_id annotation widened to `str | UUID | None` — the helper path
  passes UserContext.get_organization_identifier() which can be None at runtime;
  the existing `if not organization_id` guard handles it, so the type should
  admit None rather than mislead callers/static analysis.
- FLIPT_SERVICE_AVAILABLE check now parses exactly like FliptClient (`.lower()`,
  no `.strip()`) so the two can't disagree on a whitespaced value like " true"
  (which would otherwise skip the "Flipt blind" warning).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
…G + shared TaskPayload contract (#2072)

* UN-3574 [FEAT] PG Queue 9e 2d — orchestrator (async_execute_bin) on PG + shared TaskPayload

Route the orchestrator task onto the PG queue when transport==pg_queue, so a
pg_queue execution's orchestrate → fan-out → barrier → callback all run on PG
(was hybrid: orchestrator on Celery, only fan-out/callback on PG).

- Promote TaskPayload + FairnessPayload (the PG-message wire contract) to
  unstract.core; workers re-export them so existing imports keep working and the
  backend producer + worker consumer share one definition.
- Backend PG producer (pg_queue/producer.py): enqueue a TaskPayload row to
  pg_queue_message via the ORM, mirroring the workers' PgQueueClient.send. UUIDs
  in args/kwargs are JSON-coerced (the message JSONField has no Django encoder).
- Backend dispatch (execute_workflow_async): when transport==pg_queue, enqueue
  async_execute_bin to PG (general → "celery", api → "celery_api_deployments")
  instead of celery_app.send_task; task_id becomes "pg:<msg_id>".
- Scheduler dispatch: pass backend=QueueBackend.PG when is_pg_transport(transport)
  (dispatch() already had the per-call override from 2a).

Gated off by default — Celery path unchanged. Executor (tool run) + log workers
stay Celery (PR B). Tests: 5 producer unit tests; workers regression suite green
after the core move. Dev-tested end-to-end: a real API deployment ran
async_execute_bin on the orchestrator PG consumer (not Celery), then fan-out →
barrier → callback on PG to COMPLETED, clean teardown.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3574 address review: reduce complexity, consolidate fairness contract, fix dispatch fork

SonarCloud (cognitive complexity 18→<15) + review round on #2072:
- Extract WorkflowHelper._dispatch_orchestrator_task — the PG-vs-Celery fork
  moves out of execute_workflow_async (complexity down) and becomes unit-testable.
- HIGH: a `dispatched` flag — a post-dispatch bookkeeping failure no longer flips
  a running (already-enqueued) execution to ERROR; only pre-dispatch failures do.
- HIGH: task_id is now bare `str(msg_id)` (was `pg:{msg_id}`) — one format across
  entry paths, matching the worker PgDispatchHandle.id.
- Drop the dead Celery-only TimeoutError handler (manual poll never raised it;
  the generic handler covers any stray case).
- Consolidate the fairness contract in unstract.core: WorkloadType (StrEnum) +
  FAIRNESS_MIN/MAX/DEFAULT_PRIORITY + FairnessPayload.workload_type Literal;
  workers re-export, backend producer + workflow_helper reference them (no more
  hand-built "api"/"non_api" literals or triplicated [1,10] bounds).
- Producer: log+re-raise on enqueue failure (parity with worker _enqueue_pg);
  document the TaskPayload.queue field as diagnostic-only.

Tests: +dispatch-fork suite (PG vs Celery, bare id, two org sentinels),
+producer boundary/datetime/failure cases, +scheduler backend=PG override
assertions. 34 backend + 99 workers green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3574 address sonar: use logging.exception() in the dispatch error handlers

Both handlers in execute_workflow_async were `logger.error(..., exc_info=True)`
inside `except Exception` → switch to `logger.exception(...)` (idiomatic, and
exc_info is implicit). The post-dispatch branch drops the redundant `{error}`
from the message since the traceback is now attached.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3574 address greptile: drop redundant stack_info=True from logger.exception

logger.exception() already attaches the active exception's traceback; stack_info
additionally dumps the current call stack, producing a second overlapping stack
trace per error in a direct except handler. Drop it.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
…sh PG roles (#2073)

* UN-3566 [FEAT] PG Queue 9f — multi-queue consumer + named run-worker.sh PG roles

Make PG-queue workers first-class in run-worker.sh, runnable individually or as a
set, like the Celery workers — removing the per-process-env footgun.

- Multi-queue consumer: PgQueueConsumer takes queue_names: list[str] and polls
  them round-robin (per-queue read preserves the dequeue index's top-N; no queue
  starves another). WORKER_PG_QUEUE_CONSUMER_QUEUE is comma-parsed; a single
  value stays a one-element list (back-compat with the existing leaf consumer).
- run-worker.sh named roles (PG_CONSUMER_ROLES), each a registry-bound consumer
  with its source worker-type + queue list baked in (no manual env):
  pg-orchestrator-api / pg-orchestrator-general (async_execute_bin has distinct
  impls per registry → split), pg-fileproc / pg-callback (multi-queue, one
  process drains ETL+API). The role name rides in argv (python -m
  pg_queue_consumer <role>) so pgrep (-s/-k/-r) tells co-running roles apart.
- `./run-worker.sh -d pg` launches the 4 pipeline roles + reaper (mirrors `all`);
  each role also runs/kills/restarts/tails individually. resolve_log_file + the
  -L pg set + status/kill enumerate the roles.

Tests: round-robin aggregation, empty-queue-no-starve, empty-list reject,
comma-parse (incl single-value back-compat); existing consumer tests updated to
the list arg.

Dev-tested: `./run-worker.sh -d pg` → status shows 4 roles + reaper RUNNING; an
API execution drains orchestrator-api → fileproc (draining both file_processing
and api_file_processing) → callback → COMPLETED; `-r pg-fileproc` restarts only
that role, leaving pg-callback untouched.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3566 run-worker.sh: accept multiple worker types + tighten generic consumer pgrep

- Multiple positional worker types: `./run-worker.sh -d all pg` starts the Celery
  set and the PG set in one shot (loops WORKER_TYPES). Warns if >1 type is given
  without -d, since a non-detached set `wait`s and would block the rest.
- Tighten the generic pg_queue_consumer pgrep to end-anchored so --status/-k/-r
  for the generic consumer no longer also match (and aggregate/kill) the named
  role consumers, which run as `python -m pg_queue_consumer <role>` and have
  their own role-anchored pattern.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3566 address sonar: hoist PG role-name literals into constants

Define PG_ROLE_ORCH_API / _ORCH_GENERAL / _FILEPROC / _CALLBACK once and
reference them in PG_CONSUMER_ROLES, PG_QUEUE_MEMBERS and WORKERS, instead of
repeating each role-name string literal 4×.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3566 run-worker.sh: add 'celery' run alias for the Celery set (symmetric with 'pg')

`./run-worker.sh -d celery` now runs the Celery set (== `all`, excludes the PG
workers), so celery-only / pg-only / both read symmetrically:
  -d celery   (Celery only)
  -d pg       (PG only)
  -d celery pg | -d all pg   (both)

'celery' maps to "all" in WORKERS → dispatches to run_all_workers, is skipped by
list_core_worker_dirs (no phantom dir), and falls through to the restart-all
branch under -r. The pre-existing `-L celery` log-tail alias is unaffected.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3566 address review: -r pg kills roles, per-queue isolation, dedup, stale comments

Critical/High + cleanup from the #2073 review round:
- [Critical] `-r pg` (restart set) now kills the SAME members run_pg_queue_set
  launches (the 4 named roles + reaper), not just the generic consumer — was
  leaving the roles running, so a relaunch double-polled Postgres.
- [High] poll_once isolates each queue: a read/handle failure on one queue is
  logged and skipped so the others still run AND the work already done this cycle
  still counts (no false empty-queue backoff after a partial failure).
- [High] fixed the stale CELERY_SET comment that claimed "no 'celery' run alias".
- [Medium-High] run_pg_queue_set start-failure teardown aggregates kill_one_worker
  returns and surfaces a survivor (mirrors the restart path).
- [Medium] de-dup + copy queue_names (list(dict.fromkeys(...))) — a duplicate
  would double-read a queue; the copy stops a caller mutation bypassing validation.
- [Medium] _parse_queue_list warns when it drops empty entries (config typo).
- [Med/Low] comment fixes: only fileproc/callback are multi-queue; "4 roles +
  reaper" not "consumer + reaper"; "read once per cycle in list order" not
  round-robin. Cleanups: drop redundant export; cwd predicate uses PG_QUEUE_MEMBERS;
  spell out api_file_processing_callback in help.

Tests: +one-queue-failing-isolation, +batch_size qty per queue, +duplicate dedup.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
…ices (gated profile) (#2074)

* UN-3576 [FEAT] PG Queue 9g — docker-compose PG consumer + reaper services (gated profile)

Make the already-built PG-queue pipeline runnable under docker compose, gated
OFF by default, so it runs in containers (not just the host run-worker script).
Designed one-service-per-component for a mechanical K8s mapping later.

- run-worker-docker.sh (image ENTRYPOINT): recognise `pg-queue-consumer` and
  `pg-queue-reaper` launch types -> exec the dedicated Python module instead of
  building a Celery worker command. Dispatched before the Celery two-path; no
  Dockerfile change.
- docker-compose.yaml: 5 services behind the off-by-default `pg-queue` profile
  (4 consumers: orchestrator-api / orchestrator-general / fileproc / callback,
  plus a single-instance reaper). Orchestrators + reaper are broker-free;
  fileproc + callback keep rabbitmq (they still hand off to the Celery executor
  / notifications until those are migrated).
- sample.env: document the new env vars + profile; the backend transport gate
  stays off (ramping traffic is a later step).

Non-regression: default `docker compose up` starts zero PG services. Even with
them running, executions route to Celery until the gate + Flipt flag are set.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3576 address review: guard PG interpreter, warn on missing consumer config, reject typo'd PG commands, comment accuracy

- run-worker-docker.sh: ensure_pg_interpreter() fails loudly if the venv python
  is missing (avoids a silent restart:unless-stopped crash-loop); WARN when
  WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE / _QUEUE are unset (non-compose launches);
  add a `pg-*|*-reaper` catch arm so a typo'd PG command fails loudly instead of
  silently becoming a default Celery worker (log-consumer etc. still pass
  through — verified by routing test).
- docker-compose.yaml: soften "broker-free" wording (fail-closed-to-Celery on a
  missing transport); spell out the executor/notification migration steps
  instead of the ③/④ markers; use the exact `pg_orchestrator_lock` name.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3576 address SonarCloud: add explicit default (*) arm to PG-dispatch case

The PG-queue dispatch case relied on an implicit no-match fall-through to the
Celery logic below. SonarCloud flags a case without a default; add an explicit
`*)` no-op arm documenting the intentional fall-through. Behaviour unchanged
(verified by routing test: legit worker types still reach Celery, typo'd PG
commands still rejected).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3576 address greptile: scope PG near-miss catch to pg- prefix only

Narrow the safety-net catch arm from `pg-*|*-reaper` to `pg-*` so it can never
intercept a pluggable worker whose name ends in `-reaper` (e.g. a future
bulk-reaper / log-history-reaper) — those now fall through to the Celery path.
The `pg-` prefix is reserved for PG-queue components; the exact reaper aliases
are already matched above, and pg-prefixed typos (pg-reapr, pg-queue-reapr) are
still rejected loudly. Verified by routing test.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
…al-write (inert) (#2080)

* UN-3581 [FEAT] PG Queue 9h-a — pg_periodic_schedule mirror table + dual-write (inert)

First inert slice toward replacing Celery Beat with a PG-backed periodic
scheduler (to be folded into the leader-elected reaper/orchestrator loop). This
slice only mirrors each scheduled pipeline's cron definition into a new Postgres
table; nothing reads it yet, so behaviour is unchanged.

- backend/pg_queue/models.py: new PgPeriodicSchedule (pipeline_id PK, org_id,
  workflow_id, pipeline_name, cron_string, enabled; last_run_at/next_run_at left
  NULL — the scheduler tick owns all cron computation in the next slice). Index
  on (enabled, next_run_at) for the future "due schedules" query.
- migration 0008 (generated; makemigrations --check clean).
- backend/scheduler/tasks.py: dual-write the mirror from the four schedule
  choke-points (create/update, pause, resume, delete) that already manage the
  django_celery_beat PeriodicTask. Toggles are placed right after task.save() so
  a downstream pipeline-status failure can't desync the mirror. Every mirror
  write is best-effort (try/except + log) — a mirror failure can NEVER break the
  existing Beat scheduling path.
- tests: 6 DB-free unit tests (field extraction, enable/disable, delete,
  delete-when-PeriodicTask-missing, best-effort swallow).

Inert / non-regressive: nothing reads the table; the PeriodicTask + celery-beat
container are untouched.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3581 address review: mirror real pipeline_name from helper, harden toggles, tighten types

- [High] pipeline_name now sourced from the Pipeline object in
  SchedulerHelper._schedule_task_job (pipeline.pipeline_name), not task_args[6]
  — which carries the synthetic "Pipeline job-<id>" label, never the user name.
  The mirror upsert moves to the helper (clean named ids, no positional arg
  parsing); create_or_update_periodic_task is Beat-only again.
- [Med] _mirror_periodic_schedule_set_enabled now bumps updated_at explicitly
  (queryset .update() does not fire auto_now) and logs when it matches 0 rows
  (a pre-existing/unmirrored schedule — backfill lands in ②b).
- [Med] dropped the positional task_args[N] parsing entirely (the helper passes
  named fields), removing the silent-placeholder-on-contract-drift risk.
- [Low] tightened mirror helper param types (str / str | None) — the callers
  already pass stringified UUIDs.
- [Nit] index renamed pg_sched_due_idx -> pg_periodic_schedule_due_idx (sibling
  convention); migration 0008 regenerated (not yet merged).
- tests: cover the standalone upsert (+enabled=False, +failure-swallow), the
  helper wiring proving the real pipeline_name flows, disable/enable/delete
  failure-swallow + still-calls-update_pipeline, and the 0-row log. 11 tests.

cron_string validation: the existing 5-field split unpack in
create_or_update_periodic_task already gates the Beat path before the mirror
runs; full cron semantics are validated by croniter in ②b (the reader).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
…trator loop (per-schedule ownership) (#2081)

* UN-3596 [FEAT] PG Queue 9h-b — PG scheduler tick in the reaper/orchestrator loop (per-schedule ownership)

Adds the periodic-trigger half of the orchestrator: the leader-elected reaper
now also fires due, PG-owned schedules onto the PG queue — the Celery Beat
replacement — without Beat/RabbitMQ in the trigger path. Dark by default;
never double-fires.

- pg_scheduler.py (new): dispatch_due_schedules() scans pg_periodic_schedule for
  pg_owned + enabled + due rows, enqueues scheduler.tasks.execute_pipeline_task
  on the PG `scheduler` queue AND advances next_run_at in ONE transaction (a
  crash between can't re-fire). A NULL next_run_at records a baseline and does
  NOT fire (no burst when a schedule is handed over; matches Beat). A bad cron on
  one row is logged and skipped without blocking the others. croniter computes
  next-run; all time comparisons use the DB clock.
- pg_queue/models.py + migration 0009: pg_owned flag (default False = Beat owns
  it; the PG scheduler fires only owned rows) + due index (pg_owned, enabled,
  next_run_at). Default-false keeps the table inert until a schedule is handed
  over, and a schedule fires from exactly one side — never both.
- reaper.py: the leader tick runs the scheduler AFTER recovery (a scheduler error
  can't starve the recovery net).
- workers deps: add croniter (already a backend dep).
- run-worker.sh + docker-compose: pg-scheduler consumer role + service
  (profile-gated) that runs the fired execute_pipeline_task.

Out of scope (next slice ②c): the ramp control that flips pg_owned by percentage
+ disables the matching Beat PeriodicTask atomically (reusing the existing Flipt
mechanism), the one-time backfill, and retiring Beat.

Non-regression: pg_owned defaults False, so the reaper fires nothing until rows
are explicitly owned; recovery-only behaviour is unchanged. Tests: 10 scheduler
(real-PG) + 3 reaper-wiring; full reaper suite kept green via a scheduler stub.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3596 address review: per-row DB isolation, self-quiescing bad cron, typed rows, shared INSERT SQL, stronger tests

- [Critical] per-row fire (INSERT+UPDATE+commit) now wrapped in try/except →
  rollback + log + continue, so one bad row can't poison the connection or drop
  the rest of the batch (mirrors recover_expired_barriers). Baseline UPDATE too.
- [High] invalid cron now disables the row (enabled=FALSE) + logs once, instead
  of re-selecting it and emitting a traceback every ~5s tick forever.
- [Med] read step (SELECT now() + due scan) wraps rollback + re-raise so the
  conn isn't handed back in an aborted-txn state.
- [High] softened the "never double-fires" docstring + models.py comment: the
  guarantee is CONDITIONAL on the ②c ramp control disabling Beat; pre-ramp,
  safety rests on pg_owned defaulting to False.
- [Med] _build_trigger_payload -> TaskPayload; workflow_id/pipeline_id typed
  str | uuid.UUID (| None); _DueSchedule NamedTuple binds SELECT columns to
  names at one site (no silent misassign on a reorder).
- [Med] extracted INSERT_MESSAGE_SQL constant in client.py; send() and the
  scheduler share it (no verbatim SQL duplication).
- [Low] comment fixes: reaper tick (ordering not isolation), execute_pipeline_task
  blanks vs Beat populating execution_action, models.py drop "next slice".
- tests: fired == 1 (not >= 1, catches double-fire); next_run asserted at the
  cron's 09:00 match; baseline asserts == 0; +tz-aware next-run; +multi-row
  (fired == 2); +atomicity (advance UPDATE fails post-INSERT → enqueue rolls
  back, next_run unchanged); +reaper scheduler-error-discards-owned-conn. 75 green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3596 chore: drop accidentally-committed 9f-design.md (untracked scoping doc)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3596 address greptile P1: roll back if the bad-cron disable UPDATE fails

_quiesce_invalid_cron used contextlib.suppress around the cursor block, so if
the enabled=FALSE UPDATE raised, commit() was skipped and the connection was
left in an aborted-transaction state — poisoning the NEXT row's INSERT (caught
by the outer handler and mislogged as "failed to fire"). Wrap in try/except with
conn.rollback() on failure so the connection is always clean for the next row.
+test: a forced disable-UPDATE failure on a bad-cron row doesn't stop a
following healthy row from firing.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3596 chore: remove superseded 9f-design.md (impl merged in #2073)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
…fill (Beat→PG hand-over) (#2085)

* UN-3597 [FEAT] PG Queue 9h-c — schedule ownership ramp control + backfill (Beat→PG hand-over)

The ramp control that hands schedules from Celery Beat to the PG scheduler,
per-schedule and reversibly. Inert by default (Flipt at 0% / gate off → every
schedule stays on Beat), so deploying it changes nothing until ops ramps.

- scheduler/ownership.py: resolve_schedule_owner (master-gate PG_QUEUE_TRANSPORT_
  ENABLED → pg_scheduler_enabled Flipt flag, keyed on pipeline_id; fail-closed to
  Beat) + reconcile_ownership_for (ONE transaction: set pg_owned AND
  PeriodicTask.enabled = active AND NOT pg_owned). Doing both atomically is what
  makes "never double-fires" real — a pg_owned schedule always has its Beat
  PeriodicTask disabled. Separate Flipt flag from pg_queue_execution_enabled so
  scheduling and execution ramp independently. No new env.
- scheduler/helper.py: reconcile ownership after the mirror upsert on every
  schedule create/update (best-effort; a no-op while the rollout is off).
- pg_queue management command reconcile_pg_schedules: backfills mirror rows for
  pre-existing Beat schedules + reconciles all ownership against the current
  rollout (run once + after each Flipt ramp; idempotent, --dry-run).
- tests: 10 ownership (fail-closed matrix + the disable-Beat/pause invariants) +
  the helper-wiring assertion (21 backend tests green).

Dev-tested live on the real DB: hand-over (pg_owned + Beat disabled), rollback,
pause; backfill of real unmirrored schedules; and the cross-slice link — a
reconciled-to-PG schedule is fired by the ②b reaper. Full container e2e
(pg-scheduler consumer → real execution via the live Flipt flag) folded into the
ops ramp, alongside the execution canary.

Out of scope: stopping the celery-beat container / removing django_celery_beat
(ops, once ramped to 100%; final decommission).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3597 address review: fix resume double-fire, guard command, atomic-rollback proof, ramp robustness

- [HIGH] resume double-fire: enable_task (resume path, separate from reconcile)
  unconditionally re-enabled Beat — on a pg_owned schedule that meant Beat AND PG
  both firing. Now sets PeriodicTask.enabled = NOT pg_owned (reads the mirror).
- [CRITICAL] reconcile_pg_schedules: per-row json.loads guard (+ isinstance list)
  so one malformed PeriodicTask.args row logs+skips instead of aborting the whole
  command (and starving the reconcile step).
- [HIGH] command failure reporting: reconcile_ownership_for returns None on a
  swallowed DB failure; the command tallies failed, writes an ERROR summary, and
  raises CommandError so automation/ops notice (was: counted failures as success).
- [MED] rollback (PG→Beat) now also clears next_run_at, so a later re-hand-over
  re-enters the NULL baseline instead of bursting on a stale timestamp.
- [MED] reconcile_ownership_for: active is keyword-only (boolean-trap guard).
- [MED] docstring stale name reconcile_schedule_ownership -> reconcile_ownership_for.
- [LOW] flag_key passed by keyword; Flipt-failure log downgraded to
  warning(exc_info=True) (expected, runs every edit); single f-string; dry-run
  now previews would-be pg_owned via resolve_schedule_owner (read-only).
- tests: enable_task resume (both pg_owned states), next_run_at reset on rollback,
  None-on-failure, the full command (backfill skip / malformed-args / non-list /
  dry-run preview / CommandError), and a real-DB atomicity test (force the
  PeriodicTask update to fail → pg_owned rolls back). 29 backend tests green.

Deferred [LOW]: bool-vs-enum return (pg_owned maps directly to the BooleanField;
not a wire contract like WorkflowTransport) — see reply.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3597 address SonarCloud: reduce reconcile_pg_schedules complexity + drop redundant except

- Cognitive Complexity 29→<15: extracted handle() into _backfill_mirrors,
  _parse_task_args, and _reconcile_all; handle() is now a thin orchestrator.
- Dropped the redundant `json.JSONDecodeError` from the except tuple — it's a
  ValueError subclass, so `except ValueError` already covers both the parse
  error and the non-array guard.

Behaviour unchanged; 5 command tests + the rest green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3597 address SonarCloud: reduce _backfill_mirrors complexity 17→<15

Move the per-arg extraction (the three positional ternaries) out of the backfill
loop into _mirror_fields_from_args, which returns the mirror kwargs (or None on a
bad row). _backfill_mirrors now just loops + splats the fields, dropping the
nested ternaries that drove the cognitive complexity. Behaviour unchanged;
command tests green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3597 address greptile: race-safe resume, accurate ramp accounting, updated_at, N+1

- [P1] enable_task lost-update race: task.save() could clobber a concurrent
  reconcile_ownership_for that flipped pg_owned=True + PeriodicTask.enabled=False
  → both Beat and PG firing. Now reads pg_owned under select_for_update inside a
  transaction and writes only the enabled column via .update() (no stale full-row
  save). Preserves DoesNotExist on a bad name.
- [P2] reconcile_ownership_for: when there's no mirror row (updated==0) return
  False, not the raw resolved pg_owned — PG can't fire without a row, so the
  effective owner is Beat; returning True would inflate the ramp pg_owned count.
- [P2] reconcile_ownership_for now bumps updated_at in the .update() (queryset
  .update() doesn't fire auto_now), so an ownership flip advances the timestamp.
- [P2] reconcile_pg_schedules._backfill_mirrors N+1: pre-fetch all mirrored
  pipeline ids in one query into a set instead of an EXISTS per PeriodicTask.
- tests updated for all four (incl. select_for_update mocking, no-row→False,
  values_list prefetch). 29 backend tests green; handover + updated_at bump
  re-verified live.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3597 address greptile: add missing import pytest to ownership test

TestReconcileAtomicityRealDB calls pytest.skip() when the DB is unavailable, but
pytest was never imported — so a DB gap would raise NameError instead of skipping
(latent; local runs had a DB so the line was never hit). Add the import.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
…the PG path (#2086)

* UN-3602 [FIX] PG Queue — restore API-deployment timeout sync-wait on the PG path

On the PG transport, async_execute_bin dispatch returns the bigint
pg_queue_message msg_id as the handle. Writing it into WorkflowExecution.task_id
(a UUIDField) raised ValueError on save, which the broad post-dispatch handler
swallowed and returned EXECUTING — silently skipping the synchronous timeout
poll loop. Every PG-routed API deployment ignored `timeout` and returned
immediately.

- add nullable BigIntegerField queue_message_id to WorkflowExecution (migration
  0020; metadata-only AddField, no table rewrite, safe on large tables)
- store the PG msg_id in queue_message_id; task_id stays NULL on the PG path
- new WorkflowHelper._record_dispatch_handle routes the handle by transport,
  called inside its own try/except so post-dispatch bookkeeping can never skip
  the timeout wait again (the structural root cause)
- Celery path unchanged

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3602 [FIX] address review — defensive PG handle parse + regression test

Addresses PR #2086 review (toolkit + greptile):
- _record_dispatch_handle: @classmethod -> @staticmethod (never uses cls)
- parse the PG msg_id defensively (try/except TypeError/ValueError) so a
  malformed/future handle format logs a specific cause instead of being
  absorbed by the caller's generic post-dispatch guard
- update_execution_queue_message_id annotation int -> int | None (matches the
  None guard in the body; mirrors update_execution_task)
- add the missing regression test (test_execute_workflow_async_wait.py): a raise
  during handle recording must NOT skip the timeout poll loop
- add a non-numeric-PG-handle test; scope the routing-test docstring
- comment precision: the ValueError was raised inside update_execution_task on
  save (UUID coercion), not at the call site

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
…nify gating to a single flag (#2094)

* UN-3603 [GATED-FEAT] PG Queue — blocking executor RPC on Postgres + unify gating to a single flag

A parallel PG-native executor RPC for prompt-studio's blocking dispatch, gated by
the single pg_queue_enabled flag. The SDK ExecutionDispatcher and the Celery
executor worker are left completely untouched.

- pg_task_result store (migration 0010) + PgResultBackend (store / poll-wait;
  idempotent ON CONFLICT; poll-based, PgBouncer-safe — no LISTEN/NOTIFY)
- reply_key on the shared TaskPayload contract + backend producer
- consumer result-write hook: request-reply messages store result/error + ack
  after one attempt; fire-and-forget path unchanged (guarded by reply_key)
- backend executor_rpc: resolve_executor_transport gate + PgExecutionDispatcher
  (enqueue + poll, mirrors the SDK dispatch contract) + RoutingExecutionDispatcher;
  _get_dispatcher returns the routing dispatcher so all call sites stay unchanged
- worker-pg-executor role (run-worker.sh + docker-compose; broker-free)
- unify PG-queue gating to a SINGLE flag pg_queue_enabled: execution, scheduler,
  and executor all read one key (renamed from pg_queue_execution_enabled /
  pg_scheduler_enabled / pg_executor_enabled). One flip gates the whole feature;
  the PG_QUEUE_TRANSPORT_ENABLED env stays the master kill-switch.

Gated off by default. Dev-tested live: on->PG / off->Celery cycle (COMPLETED both
ways) for API + ETL; executor RPC round-trip through worker-pg-executor.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3603 [GATED-FEAT] address SonarCloud — unused param, logging.exception, no-NULL error

- PgExecutionDispatcher.dispatch: drop the unused `headers` param (PG carries
  fairness in the enqueue payload, not Celery headers; the routing dispatcher no
  longer forwards headers to the PG path)
- enqueue failure handler: logger.error(exc_info=True) -> logger.exception
- PgTaskResult.error: TextField(null=True) -> TextField(blank=True, default="")
  (no-NULL-text convention); store_result writes "" on completed; migration 0010
  regenerated; result-backend test updated

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3603 [GATED-FEAT] address review — request-reply robustness, single-flag constant, tests, doc accuracy

Toolkit review (UN-3603):
- Critical: guard the success-path result store in the consumer — a store failure
  now logs + acks (avoids re-running the executor = LLM spend) instead of
  vt-redelivering; pre-execution drop branches (malformed/poison/unknown) and the
  task-raised branch store a definitive failure reply via a new guarded
  _fail_reply so the caller fails fast instead of blocking to its timeout
- Med: shared PgTaskStatus StrEnum in unstract.core (writer + reader agree across
  the process boundary); guard ExecutionResult.from_dict so a malformed completed
  row can't break the never-raises contract; log timeout/failure branches
- High: release the DB connection between dispatch polls (close_old_connections)
  + document dispatch must not run inside a transaction
- Med: single source of truth for the flag key (pg_queue/flags.py PG_QUEUE_FLAG_KEY,
  imported by transport/ownership/executor_rpc; SCHEDULER_FLAG_KEY folded in)
- Med: result-backend logs the swallowed rollback failure
- doc/accuracy: soften the not-yet-wired retention-sweep claims; fix stale
  pg_executor_enabled -> pg_queue_enabled comments; tighten reply_key to NotRequired[str]
- tests: PgExecutionDispatcher.dispatch (8 branches, DB-free) + consumer reply_key
  store/ack + drop-branch + store-failure-still-acks cases

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3603 [GATED-FEAT] address SonarCloud — logging.exception + drop comment-as-code

- consumer.py: the two new request-reply error handlers use logger.exception()
  instead of logger.error(..., exc_info=True)
- test_executor_rpc.py: reword the `# timeout=None` inline comment (Sonar read it
  as commented-out code)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3603 [GATED-FEAT] address greptile — executor consumer VT/health tuning + guard env timeout parse

greptile's load-bearing finding: worker-pg-executor inherited the consumer's
30s/60s vt / health-stale defaults, so an LLM task exceeding 30s would be
re-claimed mid-run once the gate ramps — double execution + token double-spend,
two runs racing the same reply_key.
- docker-compose: set WORKER_PG_QUEUE_CONSUMER_VT_SECONDS=3660 and
  HEALTH_STALE_SECONDS=3720 on worker-pg-executor (above the executor's hard
  EXECUTOR_TASK_TIME_LIMIT=3600), both overridable via env
- executor_rpc.dispatch: guard the EXECUTOR_RESULT_TIMEOUT int parse so a
  misconfigured value can't raise out of dispatch() (never-raises) + test

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
…gres (+ register executors in PG consumer) (#2095)

* UN-3605 [GATED-FEAT] PG Queue — structure_tool executor RPC over Postgres (+ register executors in PG consumer)

Routes the in-workflow structure_tool executor dispatch onto the Postgres
executor RPC when the single pg_queue_enabled flag is on; Celery otherwise
(zero-regression by construction). Next slice after UN-3603 (prompt-studio
blocking path). Gated off by default.

- workers queue_backend/pg_queue/executor_rpc.py (new): the workers twin of
  the backend module on worker primitives — resolve_executor_transport
  (master PG_QUEUE_TRANSPORT_ENABLED env, then the single pg_queue_enabled
  Flipt flag, fail-closed to Celery); PgExecutionDispatcher (enqueue
  execute_extraction with a unique reply_key via PgQueueClient, poll
  PgResultBackend; never raises — timeout/failure -> ExecutionResult.failure);
  RoutingExecutionDispatcher (per-call gate routing; async/callback stay on
  Celery); get_executor_dispatcher factory.
- to_payload gains an optional reply_key (request-reply marker; fire-and-forget
  rows stay byte-identical).
- structure_tool_task swaps its dispatcher factory to get_executor_dispatcher;
  the 3 blocking dispatch call sites are unchanged.
- Fix (latent gap exposed by this slice): the PG executor consumer registered
  no executors ("No executor registered with name 'legacy'"). The
  @ExecutorRegistry.register side-effect import lived only in the Celery
  executor/worker.py, but the PG consumer bootstraps via executor/tasks.py.
  Import executor.executors from executor/tasks.py so any worker registering
  execute_extraction (Celery or PG) has the executors — also fixes the
  prompt-studio PG path from UN-3603.
- sample.env: document the worker-side PG_QUEUE_TRANSPORT_ENABLED gate; fix a
  stale pg_queue_execution_enabled -> pg_queue_enabled doc reference.
- Tests: workers executor_rpc unit suite (gate fail-closed matrix,
  zero-regression routing, never-raises dispatch); subprocess regression that
  importing executor.tasks alone registers the executors.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3605 [GATED-FEAT] Address PR review (toolkit + greptile)

Hardening + test-coverage from the review:
- dispatch: row.get("status") (never-raises must not depend on producer keys);
  accept-and-ignore headers param for substitutability with the SDK/Routing
  dispatch shapes; log the EXECUTOR_RESULT_TIMEOUT parse fallback instead of
  swallowing it; surface the parse cause in the malformed-result error.
- timeout branch: document the orphaned-task / retry-double-run risk (greptile);
  de-dup belongs at the file-execution layer (at-least-once + caller-timeout).
- _wait_for_result: note the connection pin is bounded by file_processing
  prefork concurrency (vs the backend twin's close_old_connections per poll).
- executor/tasks.py: reword the registration-import comment (it's executor/
  worker.py, the Celery entrypoint, that the PG consumer bootstrap skips).
- tests: org-less resolve bucketing (entity_id=run_id, no org in context);
  real _enqueue wiring (queue/org_id/payload via PgQueueClient.send); routing
  arg passthrough (gate-off forwards timeout+headers; gate-on drops headers);
  to_payload reply_key set/omitted; structure_tool factory-swap call site.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
…tor RPCs run in parallel (#2096)

* UN-3606 [FIX] PG Queue — prefork the consumer so file batches + executor RPCs run in parallel

The PG file-processing path ran files SERIALLY: worker-pg-fileproc was a single
batch=1 consumer, so multi-file ETL batches drained one at a time (Celery runs
them in parallel via prefork --concurrency). Same single-consumer bottleneck on
worker-pg-executor serialized the executor RPCs (the LLM extraction), so even
parallel file fan-out gained nothing end-to-end.

Fix: a prefork supervisor for the PG-queue consumer launcher — the PG analogue of
Celery --pool=prefork --concurrency=N.

- supervisor.py (new): when WORKER_PG_QUEUE_CONSUMER_CONCURRENCY > 1, fork N
  isolated consumer children (each does its own worker bootstrap, so no
  connections are inherited across the fork), monitor + re-fork dead children, and
  own a single fleet-liveness endpoint (503 when the oldest child stalls past the
  threshold). SKIP LOCKED distributes batches across children AND replicas; a
  single execution is still capped by MAX_PARALLEL_FILE_BATCHES; total live
  parallelism = concurrency x replicas (k8s HPA scales replicas).
- The consumer code is UNCHANGED — each child is the current single-threaded
  PgQueueConsumer.run(); concurrency is purely a launch concern. CONCURRENCY=1
  (default) keeps the byte-identical single-process path, so every other PG
  consumer is non-regressive.
- consumer.py: extract build_consumer_from_env()/consumer_env() so the children
  and main() build identical consumers.
- docker-compose: CONCURRENCY=4 on worker-pg-fileproc AND worker-pg-executor
  (mirrors WORKER_FILE_PROCESSING_CONCURRENCY). Also raise their VT_SECONDS to
  3660 — process_file_batch/execute_extraction block on the executor up to 3600s,
  so the prior vt=30 default would re-claim a long batch mid-run (latent
  double-run); health-stale sits just above vt.
- Tests: env-knob parse/clamp/validation, fleet-staleness calc, reap/restart
  matrix (dead -> re-fork, live -> left, shutdown -> not resurrected).

Process model matches Celery prefork (the cloud-trusted choice): full process
isolation, a crash is contained + re-forked, its in-flight message redelivers via
vt (at-least-once). Dev-tested end-to-end: 4 fileproc + 4 executor children, a
2-file ETL runs fully parallel (both files + both executor RPCs overlap on
separate children; ~25s vs ~42s serial), child crash auto-restarts, health green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3606 [FIX] Address PR review (toolkit + greptile) — supervisor hardening

Critical/High:
- Crash-loop no longer hides a wedged fleet from the probe: re-fork does NOT
  reseed the heartbeat (a never-polling slot ages), and a slot that dies
  immediately N times in a row forces freshness=inf → 503 (k8s restarts the pod).
- os.fork() wrapped: initial-fleet failure fails fast with an actionable message;
  reap-path failure logs + leaves the slot for the next tick (no uncaught crash /
  no SIGTERM storm against healthy children).
- Heartbeat publish loop guarded (try/except + log) so a transient error can't
  silently kill the thread and false-stale a healthy child.
- Children reset SIGTERM/SIGINT to SIG_DFL immediately after fork — a signal in
  the fork→run window no longer fires the parent's _on_term against stale pids.

Medium:
- _Fleet class owns pid/last_fork/heartbeat/crash-count/restart-schedule with a
  validated slot and consistent reap (replaces three loose slot-keyed dicts).
- Re-fork backoff is a scheduled not-before (non-blocking), re-checking stopping
  before each fork — no in-loop sleep serialising recovery, no child spawned into
  shutdown.
- Liveness bind: EADDRINUSE logged at error (vs transient), and
  liveness_probe_bound surfaced in the status payload.
- _join_children: per-child grace budget (one slow child can't starve the rest);
  _wait_for_exit helper extracted.

Low:
- real Callable annotations (drop noqas); HEALTH_PORT typed int|None; comment
  fixes (heartbeat writer reasoning, "JSON body differs" for the fleet probe);
  WORKER_TYPE selection deduped into pg_queue_consumer/_bootstrap.py.

Tests rewritten + expanded: _Fleet bookkeeping/crash-loop/freshness, reap
scheduling, restart-due gating, fork OSError + child hard-exit guard, _wait_for_exit
+ _join_children SIGKILL escalation, health no-port/bind-error. 32 tests.
Re-validated live: 4 children, fleet health (crash_looping/liveness_probe_bound
fields), crash → re-fork.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3606 [FIX] Address SonarCloud — reliability + maintainability nits

- test: avoid float equality (math.isinf for the crash-loop freshness; drop the
  0.0 literal) — clears the only New-Code reliability bug (Rating C → A).
- supervisor: clean the `# noqa: ANN201` suppression syntax (explanation moved to
  the docstring); narrow the child-failure catch BaseException → Exception (the
  realistic startup-failure surface; SystemExit/KeyboardInterrupt would exit
  anyway); logger.error(exc_info=True) → logger.exception() on the os.fork() guard.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3606 [FIX] Address greptile — snapshot crash dict in is_crash_looping

is_crash_looping() runs in the LivenessServer daemon thread (via freshness())
while the main thread mutates _consecutive_crashes in schedule_restart() — a bare
.values() iteration could raise "dictionary changed size during iteration".
Snapshot with tuple(...) (atomic under the GIL) before iterating.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
muhammad-ali-e and others added 8 commits June 21, 2026 13:01
…-chained continuations) (#2097)

* UN-3608 [FEAT] PG Queue 9i — executor async/callback path on PG (self-chained continuations)

Migrate the executor RPC's async/callback path off Celery onto the PG queue,
completing the executor-transport migration (the blocking path landed in 9h-c).

- dispatch_async / dispatch_with_callback now route PG-vs-Celery per call via the
  single pg_queue_enabled flag (was always Celery). Backend + workers.
- §5 fire-and-forget self-chaining: on_success/on_error Celery Signatures are
  translated to serialisable ContinuationSpecs carried in the payload; after the
  executor runs execute_extraction the consumer self-chains the matching
  continuation onto the callback queue (result prepended on success, dispatch
  task_id on error), acking regardless to avoid an LLM double-spend.
- New gated worker-pg-ide-callback consumer drains the ide_callback queue
  (Prompt Studio run/index/extract, lookups). Compose profile pg-queue.
- ContinuationSpec + on_success/on_error/task_id added to the shared TaskPayload
  wire contract (unstract.core).

Zero-regression: every new path is gated and fails closed to Celery; gate OFF is
byte-identical to the prior Celery behaviour. Call sites unchanged — they keep
passing Celery Signatures; the dispatcher translates only on the PG branch.

Tests: payload set/unset, signature->spec translation, consumer self-chain
success/error + enqueue-failure guard, routing gate (backend + workers).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3608 [REFACTOR] lift transport-agnostic dispatch helpers to unstract.core

Cuts the new-code duplication SonarCloud flagged on the backend/workers
executor_rpc.py mirror by moving the two genuinely transport-agnostic pieces
of the async/callback path into unstract.core (alongside ContinuationSpec):

- DispatchHandle (the .id-only AsyncResult duck-type)
- signature_to_continuation (Celery Signature -> ContinuationSpec)

Both mirrors now import them instead of redefining; the duplicated
TestSignatureToSpec is removed from both test suites in favour of one shared
test. The dispatch_async/dispatch_with_callback method bodies stay mirrored
(they genuinely differ by transport — Django ORM vs psycopg2); retiring that
residue is the separate shared-package follow-up (UN-3607).

No behaviour change: pure code movement, gate and contract unchanged.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3608 [FIX] address review — on_error on drop branches, error text, json-safety, guards

Toolkit review on #2097:
- Critical: chain on_error on the early-drop branches too (malformed / poison /
  unknown task), via a shared _fail_dispatch — a dispatch_with_callback failure
  always reaches its on_error callback, not only when the task body raised.
- Carry the real error text to on_error via callback_kwargs['error'] (PG has no
  Celery AsyncResult for the callback to recover it from); error callbacks prefer it.
- JSON-coerce the self-chained prepend (UUID/datetime in an executor result no
  longer makes client.send's plain json.dumps raise and silently drop the callback).
- Enforce the reply_key XOR on_success/on_error invariant in to_payload + enqueue_task.
- _json_safe the continuation specs in the backend producer.
- signature_to_continuation: guard task name + reject positional-arg callbacks.
- Restore ContinuationSpec/TaskPayload types at the consumer boundary; drop the
  over-broad except in _continuation_org; document the returned-failure->on_success
  parity; reword the ContinuationSpec doc and the compose "Dark" comment.

Tests for every fix. Backend 35, workers 76 green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3608 [FIX] review round 2 — on_success-only redelivery, success-enqueue fallback, ide-callback vt

- Correctness: broaden the failure guard to `reply_key or on_success or on_error`
  so an on_success-only callback dispatch that raises ACKs instead of falling
  through to vt-redelivery (which would re-run the executor / re-spend LLM tokens).
- Silent failure: _chain_continuation now returns whether it enqueued; on a
  success-path enqueue failure, fall back to on_error so the HTTP-202 caller gets
  a terminal event instead of hanging. Failure log carries run_id/task_id/org.
- ide_callback _get_task_error: `if explicit is not None` (don't discard an
  explicit "" error) — greptile.
- worker-pg-ide-callback: set VT_SECONDS=120 / HEALTH_STALE=180 (overridable) so a
  slow internal-API write can't be re-claimed mid-run and double-emit — greptile.

Tests: on_success-only-raise acks (no redelivery); success-enqueue-failure falls
back to on_error. Workers self-chain + ide_callback green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
…_batch_dedup orphans) (#2101)

* UN-3610 [FEAT] PG Queue — reaper retention sweep (pg_task_result + pg_batch_dedup orphans)

Wire the deferred retention sweep into the PG reaper, before the gate ramps:

- sweep_expired_results(conn) — DELETE FROM pg_task_result WHERE expires_at <= now().
  Every result row already carries expires_at = now()+3600s (written by the
  consumer's store_result) and the expires index exists; this is the only reader
  that keeps the table from growing unbounded with each executor RPC.
- sweep_orphan_dedup(conn, retention) — DELETE FROM pg_batch_dedup WHERE
  created_at <= now() - interval. Backstop for partial-failure executions whose
  per-batch markers neither clear on teardown nor get reclaimed by barrier
  recovery. Retention (default 24h) must exceed the longest execution.
- Both run in PgReaper.tick() under leadership, after recovery + schedules,
  cadence-gated by WORKER_PG_REAPER_SWEEP_SECONDS (default 300s) so the ~5s loop
  doesn't DELETE every cycle; own try/except discards the owned conn on error.
- Env knobs surfaced on the compose worker-pg-reaper service.

Workers-only: no migration, no writer change, no flag. Idempotent (DELETE WHERE),
dark until rows exist. Tests: env parsers, SQL contract, leader-only / standby /
cadence-gated / after-recovery / error-discards-conn (27 green). Dev-tested live:
the reaper deleted 9 expired pg_task_result + 1 orphaned pg_batch_dedup row.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3610 [FIX] address review — independent sweeps, actionable failures, rollback signal

Toolkit review on #2101:
- [High] Partial-sweep starvation: the two sweeps now run INDEPENDENTLY via
  _run_sweep — a persistent fault in one no longer skips (and then cadence-gates
  out) the other.
- [High] Sweep failures not actionable: _run_sweep logs each failure at the
  boundary with the table name + a consecutive-failure streak (distinct from the
  generic tick-failure log) and does NOT propagate (cleanup must not fail the tick).
- [Med] Surface a rollback that itself fails (don't suppress) via
  _rollback_after_sweep_failure, while still re-raising the original DELETE error.
- [Med] Reword the dedup-retention comment as operator-enforced (not code-coupled).
- [Low] Soften the result-retention coupling claim (holds by default); note the
  pg_batch_dedup.created_at seq-scan; de-ambiguate the compose comment.
- [type-design] _last_sweep_monotonic is now float | None ("never"), not a 0.0
  sentinel.

Tests (+8, 35 green): float-vs-int cast distinction, injected-knob constructor
guards, sweep_orphan_dedup rollback (parametrized) + rollback-itself-fails log,
one-sweep-failing-doesn't-starve-the-other, failing-sweep-still-advances-cadence,
log-emission boundary (counts only when rows deleted / silent otherwise).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3610 [FIX] greptile — accurate env-parse error + sweep-retry log wording

- _positive_duration_from_env: include the cause ("cannot be parsed: <exc>")
  instead of "is not a number" — an int knob given "1.5" IS a number, just not an
  integer, so the old message misled. Test asserts the message.
- _run_sweep failure log: "will retry after the next sweep interval" (not "next
  cycle") — the retry is gated by _sweep_interval (~300s), not the ~5s tick.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
… + injected transport (#2102)

* UN-3607 [REFACTOR] retire the executor_rpc mirror — shared dispatcher + injected transport

The backend and workers carried byte-for-byte mirrors of the executor-RPC dispatch
(gate + reply_key/timeout orchestration + routing); the only real difference is the
transport primitive (Django ORM vs psycopg2). Lift the shared logic into
unstract.workflow_execution.executor_rpc (which both already depend on; it can't live
in unstract.core because sdk1 imports core → circular) and inject the differing
primitive via a QueueTransport Protocol — composition, not inheritance.

- shared: PgExecutionDispatcher (concrete dispatch/async/with_callback + never-raises)
  calling transport.enqueue / wait_for_result; ExecResultRow (normalised result row);
  resolve_pg_transport (master-gate value supplied by caller, then Flipt);
  RoutingExecutionDispatcher (per-call PG-vs-Celery, injected celery/pg/resolve).
- backend adapter: DjangoQueueTransport (enqueue_task + PgTaskResult poll) + settings
  master-gate + factory.
- workers adapter: PgClientQueueTransport (PgQueueClient + PgResultBackend) + env
  master-gate + factory.

~600 duplicated lines collapse to one base + two ~40-line adapters → clears the
SonarCloud duplication gate permanently. Behaviour is byte-identical (gate, routing,
never-raises): zero regression. The dispatch contract + routing + gate matrix are
tested ONCE against a fake transport (workers suite); each side's tests shrink to its
adapter + factory wiring. Both call sites use the unchanged get_executor_dispatcher.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* [FIX] vite dev proxy — forward websocket upgrades to the backend (local dev only)

The Socket.IO log/result channel connects to /api/v1/socket with a websocket-only
transport, but the Vite dev proxy only forwarded /api HTTP (no `ws: true`), so the
upgrade never reached the backend and Prompt Studio results never streamed to the
UI in local dev. ws-proxying was dropped in the CRA→Vite migration (the stale
setupProxy.js comment is the leftover).

Dev-server only: `server.proxy` runs solely under `vite dev`; staging/prod serve a
built bundle behind nginx/Traefik (which already routes /api/v1/socket), so this has
no effect on any deployed environment. Rides the UN-3607 PR.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3607 [FIX] drop the unused headers param from the PG dispatcher (SonarCloud S1172)

The PG path carries org/routing in the enqueue payload, not Celery headers, and the
RoutingExecutionDispatcher strips fairness headers before delegating to the PG
dispatcher — so headers was accepted-but-ignored on dispatch/dispatch_async/
dispatch_with_callback. SonarCloud flagged it as unused (L269/L297). Removed from all
three for consistency; the RoutingExecutionDispatcher (the SDK-substitutable boundary)
keeps headers on its public methods and forwards them only to Celery. Backend 7 +
workers 36 green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3607 [FIX] address review — Protocol conformance, type the seam, de-dup the poll loop

Toolkit review on #2102 (all 14):
- Both transports now explicitly inherit QueueTransport so a type-checker verifies
  each implementation against the seam (not just at the construction site).
- CallbackSignature Protocol in unstract.core types signature_to_continuation +
  dispatch_with_callback params (was Any); _CeleryDispatcher Protocol types the
  router's celery dependency (was Any).
- Extracted the backend poll loop into a shared poll_for_row(fetch, timeout,
  between_polls=...) — the one duplicated logic that survived.
- dispatch gets a docstring; the fire-and-forget enqueues log-before-raise (+ a note
  that a raised enqueue IS the failure signal since on_error can't fire); a COMPLETED
  row with no result is a distinct error; reworded the "strips headers" comment.
- Workers master-gate warns on a non-true/false value (was a silent no-op).
- Lint (I001/D209); tests: on_error translation + default task_id, dispatch_async
  propagates, backend multi-iteration poll. Backend 8 + workers 38 green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3607 [FIX] greptile — unify the PG poll loop (move poll_for_row to unstract.core)

greptile P2: poll_for_row was backend-only — the workers PgResultBackend.wait_for_result
still had its own identical backoff loop, so the constants lived in two places. Since
poll_for_row is pure (no Django/psycopg/SDK deps), moved it to unstract.core.polling
where BOTH PG result pollers import it: the backend DjangoQueueTransport and the workers
PgResultBackend. The backoff now lives in exactly one place to tune.

- new unstract.core.polling.poll_for_row (the shared backoff skeleton)
- workflow_execution.executor_rpc: dropped poll_for_row (+ its time/TypeVar imports)
- backend adapter + PgResultBackend.wait_for_result both delegate to it
- behaviour-identical: backend 8 + workers 38 + result_backend 7 (real-PG) green

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
Dev-phase validation tooling (not product code; no PR ceremony) to prove
PG >= Celery before any Celery decommission. Self-contained top-level package,
no imports into backend/workers — non-regressive by construction.

S1 (measurement spine): `report` + `queue-depth` read the backend DB directly
and compare executions per transport. Transport classified post-hoc from the
persistent queue_message_id/task_id columns; headline signal is server-measured
execution_time + per-file parallelism (sum(file_times)/execution_time).

S2 (load generation): `run` drives N executions at concurrency C against a
running stack, polls each to terminal, and reports wall-clock / server / overhead
/ http latency per transport plus throughput. Transport observed per-execution so
PG and Celery batches bucket correctly even mid-rollout.

26 unit tests (pure stats + classification + probe timing with injected
clock/faked I/O). S1 dev-tested against the live dev DB.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…tra headers/form fields

Validated against a live API deployment: the execute response nests the
execution_id inside message.status_api as a ?execution_id= query param (not a
top-level field). Fix _extract_execution_id to handle it (+ status_api/message
wrappers), and add --header KEY:VALUE / --form KEY=VALUE so the harness can send
the subscription headers + tags/timeout fields a real deployment expects.

One real execution confirmed end-to-end: PG transport, COMPLETED, 24.08s server
time / 19.9s file — measurement path verified. test_trigger.py pins the response
shape + the header/form parsing.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…esting

Instant OpenAI-compatible mock LLM+embedding responder (stdlib http.server,
threaded, dependency-free). Point an OpenAI-compatible adapter's api_base at it
+ pair with Unstract's built-in noOpX2text / noOpVectorDb adapters → a full
execution runs the whole queue -> fan-out -> executor path at $0, so the
transport (not the LLM) is the measured variable.

  python -m pg_benchmark.mock_server --port 8901

Serves /v1/chat/completions, /v1/embeddings (batch), /v1/models, /health;
--content / --embedding-dim / --latency-ms knobs. 6 tests over real HTTP on an
ephemeral port. README documents the zero-cost load-test setup.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
… + float-eq + http)

- mock_server: respond with a fixed MOCK_MODEL instead of echoing the request's
  model field — do not reflect untrusted client input into the response body.
- tests: route float comparisons through a math.isclose _close() helper instead of
  ==/pytest.approx (S1244: no equality checks on floats).
- tests: https dummy URLs in fixtures (S5332 clear-text hotspot).

Dev tooling; 40 tests green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…h the transport flag (#2106)

* UN-3616 [FEAT] PG Queue — route API-triggered pipeline trigger through the transport flag

The API-triggered pipeline view dispatched scheduler.tasks.execute_pipeline_task
on Celery unconditionally — so when the flag was on, the trigger stayed on Celery
while the execution it spawns rode PG, breaking uniform flag control. The
SCHEDULED path already routes this task to PG (pg_scheduler.dispatch_due_schedules
→ scheduler queue, run by worker-pg-scheduler); only the API-trigger view bypassed
it.

Route the trigger through resolve_transport (same flag as everything else) via a
new dispatch_pipeline_trigger helper: PG → enqueue to the scheduler queue via
pg_queue.producer.enqueue_task with the exact arg shape pg_scheduler sends; else
Celery send_task (unchanged). Fail-closed — master gate off (prod default) →
Celery → byte-identical to before, zero regression.

Routing logic lives in the helper, not the view. 4 unit tests (PG/Celery routing,
pipeline-id entity bucketing, identical args on both paths). Dev-tested against
the live flag: gate ON → PG scheduler queue, gate OFF → Celery.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3616 [REVIEW] address PR feedback on the pipeline-trigger dispatcher

- Compare the WorkflowTransport enum member instead of its .value string, so a
  literal typo can't silently fall through to the Celery branch.
- Coerce org_id to str at args[1] for consistency with the str(pipeline_id) /
  org_id kwarg coercion.
- Add tests: PG enqueue failure propagates with NO Celery fallback (no
  double-dispatch); UUID pipeline_id is str-coerced in args while resolve_transport
  receives the raw UUID entity.
- Docstring: lead with current behavior + correct the 'byte-identical args' claim
  (PG JSON-normalizes via enqueue_task). Trim the duplicated view comment.

6 unit tests green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3616 [REVIEW] address 2 SonarCloud findings on the dispatcher

- S3516 (Blocker): dispatch_pipeline_trigger returned the same 'transport' var on
  both branches (invariant return the view ignored). Make it a None-returning
  side-effect (if/else, no return); tests assert on the dispatch (enqueue vs
  send_task) instead of the return.
- Typing (High): widen org_id / pipeline_id to 'str | UUID' — what resolve_transport
  accepts and what the UUID test passes; they're str-coerced into the task args.

6 unit tests green; ruff clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3616 [REVIEW] use shared is_pg_transport() helper (greptile)

Replace the hand-rolled 'transport == WorkflowTransport.PG_QUEUE' check with
unstract.core.data_models.is_pg_transport() — the single source for 'what counts
as PG transport' — so this dispatcher doesn't open a second comparison site,
consistent with workers/scheduler/tasks.py.

6 unit tests green; ruff clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 24, 2026

Copy link
Copy Markdown
Contributor

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: dab2ee79-e39e-40a6-9a70-40135f189798

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/UN-3445-pg-queue-integration

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

muhammad-ali-e and others added 7 commits June 24, 2026 04:22
…9 transient-connect retry) (#2109)

* UN-3618 [REFACTOR] PG Queue — single-flag gating (drop PG_QUEUE_TRANSPORT_ENABLED)

The pg_queue_enabled Flipt flag becomes the sole gate across all four
resolvers (execution, scheduler, executor backend+workers). Removes the
redundant env master-switch and the master_gate_enabled parameter from the
shared resolve_pg_transport. Fully fail-closed (flag off / Flipt blind /
Flipt error / no org -> Celery); verified by unit + real-stack dev-test.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3619 [FEAT] PG Queue — bounded retry on transient Postgres connect

create_pg_connection retries a transient psycopg2.OperationalError with
bounded exponential backoff (WORKER_PG_QUEUE_CONNECT_RETRIES / _BACKOFF,
defaults 3 / 0.5s). Connecting is side-effect-free so retry is safe; the
enqueue INSERT is intentionally NOT retried (double-dispatch risk).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3618 UN-3619 [REFACTOR] address PR #2109 review

- Flipt observability (UN-3618): check_feature_flag_status / _variant now log a
  genuine evaluation failure (warning + exc_info) instead of silently returning
  the disabled default — the now-sole gate's outages are visible at the decision
  layer. Resolver defense-in-depth guards kept.
- Update stale shared executor_rpc docstring (Flipt is the sole gate).
- Remove now-dead module logger in workers executor_rpc.
- Connect retry (UN-3619): clamp+warn out-of-range RETRIES (cap 10) / negative
  BACKOFF; drop the unreachable AssertionError sentinel via an inverted loop;
  document the backoff cap + that permanent misconfigs are retried-then-raised;
  document both knobs in sample.env.
- Tests: add backoff-cap, unset-default, invalid/negative-backoff, attempts-clamp,
  geometric-growth, and params-forwarded/stable coverage (13 connection tests).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3619 [FIX] pg_benchmark connection test — drop generator-throw lambda (SonarCloud)

Replace the (_ for _ in ()).throw(...) generator trick with a plain local
def connect that raises — matches the other tests' style and clears the
SonarCloud comprehension finding.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
Sync the long-lived PG-queue integration branch with main (32 commits) to heal
migration-graph drift: account_v2.0005 (referenced by the cloud app_deployment_v2.0002)
was missing on the branch, causing NodeNotFoundError on migrate. uv.lock regenerated
from the merged pyproject (kept main's deps + re-added our scheduler croniter); PG
single-flag gating + connect-retry intact; backend 35 + workers 50 PG tests pass;
migration graph builds clean (251 nodes, no dangling deps).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The backmerge brought workflow_v2.0021_alter_workflow_organization (main) alongside
our 0020_workflowexecution_queue_message_id; both fork from 0019, leaving two leaf
nodes → 'Conflicting migrations detected' on migrate. Add the standard no-op merge
migration 0022 depending on both leaves. detect_conflicts now clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…pooling

The PG-queue workers connect through PgBouncer's transaction-pool alias
(unstract_txn), where the `options=-c search_path=<schema>` startup parameter is
stripped (IGNORE_STARTUP_PARAMETERS=options). Bare table names then resolve to
the default (public) path -> `relation "pg_queue_message" does not exist`. This
works in OSS/local (direct Postgres, search_path honoured) but breaks in cloud.

Fix: name every queue table explicitly as `"<schema>".<table>` via a new
`qualified()` helper (schema read lazily from DB_SCHEMA, validated), so the SQL
is self-contained and resolves through transaction pooling without search_path —
the same way the Celery result backend reaches its public tables. Keeps the
unstract_txn transaction pool (no session-pooling / scale regression).

- new schema.py: qualified() + queue_schema() + QUEUE_TABLES registry (rejects
  unknown / typo'd table names — single source of truth)
- qualify the SQL in client, result_backend, reaper, leader_election,
  pg_scheduler, pg_barrier (24 sites)
- guard test fails the build on any bare unqualified FROM/INTO/UPDATE pg_<table>,
  so a future query cannot silently regress the cloud path

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…nnection (#2121)

* UN-3651 [FIX] PG barrier enqueue — reconnect-retry on stale cached connection

A transient `psycopg2.OperationalError: server closed the connection
unexpectedly` during the PgBarrier enqueue (the barrier UPSERT) aborted the
whole ETL execution -> status ERROR with 0 files. PgBarrier keeps a thread-local
cached connection reused across barrier ops; while idle between ops it can be
reaped server-side (PgBouncer server_idle_timeout / DB failover) and `_get_conn`
can't tell (conn.closed is client-side only), so the first statement after the
idle gap fails. `_cursor` recovered the dead conn but did not retry the op.

Fix: a one-shot reconnect-retry (`_run_idempotent_write`, attempts=2) scoped to
the IDEMPOTENT, pre-dispatch barrier write only — the UPSERT (ON CONFLICT DO
UPDATE -> same row/state) + the per-execution dedup reset. Re-running them after
an ambiguous commit is a no-op and no header has been dispatched yet, so a retry
can neither duplicate a row nor double-dispatch work.

Idempotency boundary preserved: the non-idempotent decrement (remaining - 1) and
claim_batch stay on the plain `_cursor` (recover-but-don't-retry) — a re-applied
decrement could fire the callback early.

Scope: workers only, under the pg_queue_enabled flag (PgBarrier runs only on the
PG transport; flag off -> CeleryChordBarrier, this code never runs -> zero
Celery-path regression).

Tests: +3 unit tests (retry-then-succeed / no-retry-on-non-connection-error /
reraise-after-exhaust); full PgBarrier suite 54 passed; ruff clean. Dev-tested
live (A/B against real Postgres via pg_terminate_backend): old path raises +
loses the write; new path reconnects, retries, and the write lands.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3651 [FIX] address PR #2121 review — observability, backoff, naming, tests

Review feedback (greptile + PR-review-toolkit) — all non-blocking; fix was
confirmed correct. Changes:

- Retry log now names the real error (type + message) and keeps the traceback
  (exc_info=True); reworded so it no longer asserts a connection drop the broad
  psycopg2 catch can't be sure of. (HIGH — observability)
- Add a small fixed backoff (0.5s) before the retry — widens the self-heal
  window on a brief failover and avoids immediately re-hammering a struggling DB
  on the server-side errors the broad OperationalError catch also covers. (MED)
- Rename _run_idempotent_write -> _run_idempotent_pre_dispatch_write and type the
  op as Callable[[PgCursor], None] so the idempotent/pre-dispatch contract is
  loud at the call site (it can't be type-enforced). (MED — type design)
- _BARRIER_WRITE_ATTEMPTS: Final = 2  # total attempts; doc/comment precision:
  decrement hazard reworded (premature/incomplete callback or strand, not
  "fire twice"), "same state" softened (timestamps refresh harmlessly), trimmed
  the redundant call-site comment to a one-line pointer. (LOW)
- Tests: add an end-to-end self-heal test through enqueue() against the real DB
  (asserts the row lands once + every header dispatched exactly once — pins the
  wiring a no-DB test can't); parametrize the retry test over OperationalError
  AND InterfaceError; assert no partial commit (commits==0), exactly one extra
  attempt (executes counters), and the on-retry warning via caplog. (HIGH + MED)

Full PgBarrier suite: 56 passed; ruff clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
…rface error to UI (#2122)

* UN-3652 [FIX] PG orchestration failure — reconcile file counters + surface error to UI

When a PG-routed orchestration fails (e.g. the barrier-enqueue blip, or any
failure after total_files is set), two UX gaps remained: the execution went ERROR
but its files showed as perpetually "in progress" (UI = total - successful -
failed, with successful/failed left NULL), and the failure reason never reached
the UI (only workflow_execution.error_message + worker logs).

Fix, gated behind is_pg_transport (Celery branch byte-identical):
- New pure helper WorkflowOrchestrationUtils.pg_failure_file_counts(transport,
  total_files): PG -> {total_files, successful_files: 0, failed_files: total}
  so a failed run reads "N failed" (in-progress = 0); non-PG -> {} (the gate).
  total_files is included because the backend update_status serializer rejects
  file aggregates without it ("total_files is required when file aggregates are
  provided"; also successful + failed <= total).
- general/tasks.py _execute_general_workflow + api-deployment/tasks.py
  _run_workflow_api failure handlers: on PG, pass the counts to
  update_workflow_execution_status (B) and emit the error to the UI/WS via the
  workflow logger (C). api resolves transport defensively (assigned inside try).

Tests: +14 helper/gating unit tests (incl. serializer-constraint guards);
related suites green; ruff clean.

Dev-tested live E2E on a rebuilt image (flag on, real benchmark API deployment,
barrier table renamed to force a non-retryable orchestration failure):
- B: failed PG run -> total=1, successful=0, failed=1 (UI in-progress = 0).
- C: "❌ Workflow orchestration failed: ..." published to the execution's WS log.
The E2E caught a real bug a unit test missed: the first cut omitted total_files
and the backend rejected the update with 400 — fixed and re-verified.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3652 [FIX] address PR #2122 review — guard status update, hoist transport, test handler

Review (greptile P1 + PR-review-toolkit). Changes:

- Extract the PG failure recording into one tested seam,
  WorkflowOrchestrationUtils.record_pg_orchestration_failure (replaces
  pg_failure_file_counts). It (C) surfaces the error to the UI logger (guarded)
  and (B) reconciles counters via update_status, with the update WRAPPED in
  try/except so a status-update failure (e.g. a 400 serializer error) is logged
  but never re-raised — it can no longer mask the original orchestration error or
  skip the caller's re-raise (greptile P1). Never raises.
- api-deployment: resolve `transport` ONCE up front from kwargs (above the try)
  and reference it directly, instead of `locals().get("transport", ...)` which
  fell back to celery for a genuine PG run that failed BEFORE the in-try
  assignment — re-introducing the exact "stuck in progress" symptom. Dropped the
  duplicate normalize_transport.
- Both handlers now branch: PG -> record_pg_orchestration_failure; else -> the
  original bare status update, so the Celery path stays byte-identical.
- general: the UI-logger call is now None-guarded inside the helper (was an
  unguarded workflow_logger.log_error).
- Tests: replace the helper-only dict tests with behaviour tests of the recorder
  (mocked api_client/logger): counter reconciliation + total_files, error
  surfacing, logger-hiccup-doesn't-block-update, update-failure-swallowed-not-
  raised, and a kwarg-seam guard binding the counts against
  InternalAPIClient.update_workflow_execution_status's signature.

Re-verified live E2E on the refactored handler (renamed pg_barrier_state to force
a non-retryable failure): PG run -> ERROR total=1/ok=0/fail=1 (in-progress 0) +
"❌ Workflow orchestration failed" published to the WS log channel.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
@sonarqubecloud

Copy link
Copy Markdown

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant