- Prerequisites:
- Docker Desktop is installed and running.
- Rust toolchain is installed (rustup, cargo).
- Create your local env file:
cp .env.example .env
- Start services:
docker compose up --build -d
- Verify service health:
Invoke-WebRequest -Uri http://localhost:3003/health -UseBasicParsing
- Run the full postgresflow test suite (with strict warnings):
docker compose up -d db
$env:TEST_DATABASE_URL="postgres://postgres:postgres@localhost:5433/postgresflow_test"
$env:DATABASE_URL=$env:TEST_DATABASE_URL
$env:RUST_TEST_THREADS="1"
$env:RUSTFLAGS="-D warnings"
.\scripts\tests\ci.ps1
- Stop services when done:
docker compose down
- Architecture: docs/ARCHITECTURE.md
- API reference: docs/API.md
- Operations runbook: docs/OPERATIONS.md
- Benchmarking methodology: docs/BENCHMARKING.md
- Contributor workflow: CONTRIBUTING.md
- Release process: docs/RELEASE.md
- Copy env template:
- Windows PowerShell: cp .env.example .env
- Start:
docker compose up --build
- Seed demo jobs (no psql needed):
docker compose exec pgflow ./pgflowctl demo
- View the admin UI / API at http://localhost:3003.
Endpoints:
- GET /jobs
- POST /jobs
- /jobs/:id/timeline
- /jobs/:id/explain
- /jobs/:id/replay
- /dlq
- /ingest/decisions
- /metrics (JSON)
- /metrics/prom (Prometheus text)
- /health
Disable the admin API by setting PGFLOW_ADMIN_ADDR=off in .env.
Set PGFLOW_API_TOKEN to require x-api-key: <token> (or Authorization: Bearer <token>) on admin API endpoints.
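To call a protected endpoint from Rust without extra crates, the sketch below builds a raw HTTP/1.1 GET request and adds the x-api-key header only when a token is present. It assumes the admin API listens on localhost:3003, as in the examples in this README; the helper names `admin_get_request` and `fetch` are hypothetical, and for anything beyond a quick check an HTTP client crate is the better choice.

```rust
use std::io::{Read, Write};
use std::net::TcpStream;

/// Builds a raw HTTP/1.1 GET request for an admin API path.
/// Adds `x-api-key` only when a token is supplied (i.e. PGFLOW_API_TOKEN is set).
fn admin_get_request(host: &str, path: &str, api_token: Option<&str>) -> String {
    let mut req = format!("GET {path} HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n");
    if let Some(token) = api_token {
        req.push_str(&format!("x-api-key: {token}\r\n"));
    }
    // An empty line ends the header section; GET requests carry no body.
    req.push_str("\r\n");
    req
}

/// Sends the request and returns the raw response (status line, headers, body).
fn fetch(addr: &str, request: &str) -> std::io::Result<String> {
    let mut stream = TcpStream::connect(addr)?;
    stream.write_all(request.as_bytes())?;
    let mut response = String::new();
    // `Connection: close` makes the server close the stream after responding.
    stream.read_to_string(&mut response)?;
    Ok(response)
}

fn main() {
    // Unset variable means no auth header is sent.
    let token = std::env::var("PGFLOW_API_TOKEN").ok();
    let request = admin_get_request("localhost:3003", "/dlq", token.as_deref());
    match fetch("localhost:3003", &request) {
        Ok(response) => println!("{response}"),
        Err(err) => eprintln!("request failed: {err}"),
    }
}
```

The same header can be sent from the shell with `curl -H "x-api-key: <token>"`.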
- DLQ view:
curl http://localhost:3003/dlq
- Metrics (last 60s window):
curl http://localhost:3003/metrics
- Timeline for a failed job:
curl http://localhost:3003/jobs/JOB_ID/timeline
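The /metrics endpoint reports a last-60-seconds window. How pgflow computes it is not described here (it may well aggregate in SQL); the sketch below only illustrates the sliding-window idea with a hypothetical in-memory `WindowCounter` keyed by Unix-second timestamps.

```rust
use std::collections::VecDeque;

/// Counts events that happened within the last `window_secs` seconds.
struct WindowCounter {
    window_secs: u64,
    events: VecDeque<u64>, // event timestamps in Unix seconds, oldest first
}

impl WindowCounter {
    fn new(window_secs: u64) -> Self {
        Self { window_secs, events: VecDeque::new() }
    }

    /// Records one event (e.g., a job completion) at time `now`.
    fn record(&mut self, now: u64) {
        self.events.push_back(now);
        self.evict(now);
    }

    /// Number of events in the half-open window (now - window_secs, now].
    fn count(&mut self, now: u64) -> usize {
        self.evict(now);
        self.events.len()
    }

    /// Drops events that have aged out of the window.
    fn evict(&mut self, now: u64) {
        while let Some(&oldest) = self.events.front() {
            if now.saturating_sub(oldest) >= self.window_secs {
                self.events.pop_front();
            } else {
                break;
            }
        }
    }
}

fn main() {
    let mut completed = WindowCounter::new(60);
    for t in [0, 30, 59] {
        completed.record(t);
    }
    println!("completed in last 60s at t=90: {}", completed.count(90));
}
```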
- Fast compile and test-build checks (no database needed):
cargo check -p postgresflow
cargo test -p postgresflow --no-run
Expected result: both commands finish successfully. Warnings are acceptable; errors are not.
- Targeted API behavior test tied to the timeline endpoints in crates/postgresflow/src/api/mod.rs:
$env:TEST_DATABASE_URL="postgres://postgres:postgres@localhost:5433/postgresflow_test"
cargo test -p postgresflow --test timeline -- --nocapture
Expected result: test timeline_shows_attempt_story ... ok.
If you see an error saying TEST_DATABASE_URL is missing, set the variable and rerun.
- Full integration suite:
$env:TEST_DATABASE_URL="postgres://postgres:postgres@localhost:5433/postgresflow_test"
$env:DATABASE_URL=$env:TEST_DATABASE_URL
$env:RUST_TEST_THREADS="1"
$env:RUSTFLAGS="-D warnings"
cargo test -p postgresflow -- --nocapture
Equivalent helper script:
$env:TEST_DATABASE_URL="postgres://postgres:postgres@localhost:5433/postgresflow_test"
$env:DATABASE_URL=$env:TEST_DATABASE_URL
.\scripts\tests\ci.ps1
- Manual API smoke test (add the x-api-key header if PGFLOW_API_TOKEN is set):
Invoke-RestMethod -Method Post -Uri http://localhost:3003/jobs -ContentType "application/json" -Body '{"queue":"default","job_type":"email_send","payload_json":{"user_id":123}}'
Invoke-RestMethod -Uri "http://localhost:3003/jobs?limit=5"
Invoke-RestMethod -Uri "http://localhost:3003/metrics"
Invoke-RestMethod -Uri "http://localhost:3003/dlq"
Expected result: enqueue returns a job_id, and the list, metrics, and DLQ endpoints return JSON without any auth header (when PGFLOW_API_TOKEN is unset).
- Teardown (if you started infra for tests):
docker compose down
This project was tested locally using Docker Compose and the admin API. The example evidence below shows a successful enqueue, the retry/DLQ path, the timeline, metrics, and guardrails. Outputs will differ per run.
Environment:
- OS: Windows
- Start:
docker compose up --build
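Commands and sample outputs ($body, $failBody, and $bigBody hold JSON request bodies; $failBody uses job_type fail_me and $bigBody exceeds PGFLOW_MAX_PAYLOAD_BYTES):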
Enqueue success:
Invoke-RestMethod -Method Post -Uri http://localhost:3003/jobs -ContentType "application/json" -Body $body
# { "job_id": "JOB_ID_SUCCESS" }
Enqueue failure (DLQ path):
Invoke-RestMethod -Method Post -Uri http://localhost:3003/jobs -ContentType "application/json" -Body $failBody
# { "job_id": "JOB_ID_FAIL" }
Timeline (success):
Invoke-RestMethod -Uri "http://localhost:3003/jobs/JOB_ID_SUCCESS/timeline"
# status: succeeded
Timeline (fail + retries):
Invoke-RestMethod -Uri "http://localhost:3003/jobs/JOB_ID_FAIL/timeline"
# status: queued -> retries -> dlq after max_attempts
DLQ:
Invoke-RestMethod -Uri "http://localhost:3003/dlq"
# items: [ ... job_type=fail_me, status=dlq, dlq_reason_code=MAX_ATTEMPTS_EXCEEDED ... ]
Metrics (JSON + Prom):
Invoke-RestMethod -Uri "http://localhost:3003/metrics"
Invoke-WebRequest -Uri "http://localhost:3003/metrics/prom" -UseBasicParsing
Guardrail (payload too large):
Invoke-RestMethod -Method Post -Uri http://localhost:3003/jobs -ContentType "application/json" -Body $bigBody
# HTTP 413 PAYLOAD_TOO_LARGE
Invoke-RestMethod -Uri "http://localhost:3003/ingest/decisions"
# reason_code: PAYLOAD_TOO_LARGE
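The fail + retries timeline and the DLQ entry above follow a simple rule: each failed attempt puts the job back in the queue until max_attempts is reached, after which it moves to the DLQ with MAX_ATTEMPTS_EXCEEDED. The sketch below models only that rule in memory; `JobState` and its methods are hypothetical, and retry delays/backoff (not described in this README) are left out.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Queued,
    Running,
    Succeeded,
    Dlq,
}

/// In-memory model of one job's lifecycle; not pgflow's actual types.
struct JobState {
    status: Status,
    attempts: u32,
    max_attempts: u32,
    dlq_reason_code: Option<&'static str>,
    history: Vec<Status>, // every transition, like a timeline
}

impl JobState {
    fn new(max_attempts: u32) -> Self {
        Self {
            status: Status::Queued,
            attempts: 0,
            max_attempts,
            dlq_reason_code: None,
            history: vec![Status::Queued],
        }
    }

    fn start_attempt(&mut self) {
        self.attempts += 1;
        self.transition(Status::Running);
    }

    fn record_success(&mut self) {
        self.transition(Status::Succeeded);
    }

    /// Requeues the job, or dead-letters it once attempts are exhausted.
    fn record_failure(&mut self) {
        if self.attempts >= self.max_attempts {
            self.dlq_reason_code = Some("MAX_ATTEMPTS_EXCEEDED");
            self.transition(Status::Dlq);
        } else {
            self.transition(Status::Queued);
        }
    }

    fn transition(&mut self, next: Status) {
        self.status = next;
        self.history.push(next);
    }
}

fn main() {
    // A job that always fails, like the fail_me job in the evidence above.
    let mut job = JobState::new(3);
    while job.status != Status::Dlq {
        job.start_attempt();
        job.record_failure();
    }
    println!("{:?} after {} attempts: {:?}", job.status, job.attempts, job.history);

    let mut ok = JobState::new(3);
    ok.start_attempt();
    ok.record_success();
    println!("{:?}", ok.history);
}
```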
PowerShell:
.\scripts\metrics\watch.ps1
Optional parameters:
-IntervalSeconds 1
-Queue default
-BaseUrl http://localhost:3003
docker compose up -d db
$env:TEST_DATABASE_URL="postgres://postgres:postgres@localhost:5433/postgresflow_test"
$env:DATABASE_URL=$env:TEST_DATABASE_URL
$env:RUST_TEST_THREADS="1"
$env:RUSTFLAGS="-D warnings"
.\scripts\tests\ci.ps1
If you see "TEST_DATABASE_URL missing" or "DATABASE_URL must be set", set both environment variables as shown above.
Load test script (Docker):
bash scripts/load/run.sh WORKERS JOBS
Example:
PGFLOW_BENCH_SECONDS=30 bash scripts/load/run.sh 2 50000
PGFLOW_BENCH_SECONDS=30 bash scripts/load/run.sh 8 100000
Outputs:
- run-local succeeded delta for the selected dataset
- dataset and global row counts
- approximate jobs/sec
- a machine-readable summary line:
BENCH_RESULT workers=... jobs_per_sec=... global_rows=...
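Because the summary line is machine-readable, it can be collected across runs. The sketch below parses any `BENCH_RESULT key=value ...` line into a map, so it keeps whatever fields the script emits beyond the three shown; the function name `parse_bench_result` is hypothetical, and the program simply reads the script's output from stdin.

```rust
use std::collections::HashMap;
use std::io::{self, BufRead};

/// Parses a `BENCH_RESULT key=value ...` line into a map.
/// Returns None for other lines or if any token lacks `=`.
fn parse_bench_result(line: &str) -> Option<HashMap<String, String>> {
    let rest = line.trim().strip_prefix("BENCH_RESULT")?;
    let mut fields = HashMap::new();
    for token in rest.split_whitespace() {
        let (key, value) = token.split_once('=')?;
        fields.insert(key.to_string(), value.to_string());
    }
    Some(fields)
}

fn main() {
    // Pipe the load script's output into this program to extract summary lines.
    for line in io::stdin().lock().lines().map_while(Result::ok) {
        if let Some(fields) = parse_bench_result(&line) {
            let workers = fields.get("workers").map(String::as_str).unwrap_or("?");
            let jobs_per_sec: Option<f64> = fields.get("jobs_per_sec").and_then(|v| v.parse().ok());
            println!("workers={workers} jobs_per_sec={jobs_per_sec:?}");
        }
    }
}
```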
Useful benchmark env vars:
- PGFLOW_DATASET_ID to pin a dataset/partition for repeated runs
- PGFLOW_BENCH_SECONDS to change the sample window
- PGFLOW_MAX_ATTEMPTS and PGFLOW_LOAD_JOB_TYPE to shape the workload
Script notes:
- auto-waits for the jobs.dataset_id migration
- creates the dataset partition via ensure_jobs_dataset_partition(...) when available
- recreates worker profile containers so code/env changes are applied on each run
Outputs approximate jobs/sec and basic container stats. Note: total workers = 1 (pgflow) + the number of worker-profile containers. On Windows, run from Git Bash or WSL.
For repeatable benchmark methodology and reporting format, see docs/BENCHMARKING.md.
- Dataset partitioning on jobs.dataset_id (hourly/default bucket strategy supported).
- Batch dequeue/lease path with dequeue index tuning for queue scans.
- Batch attempt lifecycle writes (start_attempts_batch) and batch success acks.
- Archive/prune maintenance hooks for long-term table/index health.
- Optional API key auth on the admin API via PGFLOW_API_TOKEN.
POST /jobs
Example:
curl -X POST http://localhost:3003/jobs \
  -H "Content-Type: application/json" \
  -d '{"queue":"default","job_type":"email_send","payload_json":{"user_id":123},"priority":0,"max_attempts":25}'
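When a producer builds the POST /jobs body by hand, string fields need JSON escaping. This std-only sketch (hypothetical helpers `json_escape` and `enqueue_body`) reproduces the body from the curl example above. It assumes `payload_json` is already valid JSON and treats priority and max_attempts as optional, as the shorter smoke-test body suggests; a real producer would more likely use serde_json.

```rust
/// Escapes a string for embedding inside a JSON string literal.
fn json_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            // Other control characters use the \u00XX form.
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Builds the JSON body for POST /jobs. `payload_json` must already be valid JSON.
fn enqueue_body(
    queue: &str,
    job_type: &str,
    payload_json: &str,
    priority: Option<i32>,
    max_attempts: Option<u32>,
) -> String {
    let mut body = format!(
        r#"{{"queue":"{}","job_type":"{}","payload_json":{}"#,
        json_escape(queue),
        json_escape(job_type),
        payload_json
    );
    if let Some(p) = priority {
        body.push_str(&format!(r#","priority":{p}"#));
    }
    if let Some(m) = max_attempts {
        body.push_str(&format!(r#","max_attempts":{m}"#));
    }
    body.push('}');
    body
}

fn main() {
    let body = enqueue_body("default", "email_send", r#"{"user_id":123}"#, Some(0), Some(25));
    // Pass this string to curl -d or Invoke-RestMethod -Body.
    println!("{body}");
}
```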
Use JobsRepo::enqueue_* plus EnqueueGuard in your producer service to enforce limits
and write ingest_decisions for rejected payloads/rates.
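The guardrail rules can be pictured with a small in-memory guard. This is not the real `EnqueueGuard`, whose API is not shown in this README: `SketchGuard`, the fixed minute-aligned window, and the `RateLimited` variant are assumptions, while the payload check mirrors the PAYLOAD_TOO_LARGE rejection in the evidence above. The `decisions` vector stands in for rows written to ingest_decisions.

```rust
#[derive(Debug, PartialEq)]
enum Rejection {
    PayloadTooLarge, // reported by the admin API as HTTP 413 PAYLOAD_TOO_LARGE
    RateLimited,     // hypothetical name; the real reason code may differ
}

/// Hypothetical guard: max payload size plus a fixed per-minute enqueue budget.
struct SketchGuard {
    max_payload_bytes: usize,
    max_per_minute: u32,
    window_start: u64, // Unix seconds at the start of the current minute
    enqueued_in_window: u32,
    decisions: Vec<(u64, Rejection)>, // stand-in for ingest_decisions rows
}

impl SketchGuard {
    fn new(max_payload_bytes: usize, max_per_minute: u32) -> Self {
        Self { max_payload_bytes, max_per_minute, window_start: 0, enqueued_in_window: 0, decisions: Vec::new() }
    }

    /// Admits the enqueue or returns why it was rejected, recording every rejection.
    fn check(&mut self, payload: &[u8], now: u64) -> Result<(), Rejection> {
        if payload.len() > self.max_payload_bytes {
            self.decisions.push((now, Rejection::PayloadTooLarge));
            return Err(Rejection::PayloadTooLarge);
        }
        if now >= self.window_start + 60 {
            // Start a new minute-aligned window.
            self.window_start = now - now % 60;
            self.enqueued_in_window = 0;
        }
        if self.enqueued_in_window >= self.max_per_minute {
            self.decisions.push((now, Rejection::RateLimited));
            return Err(Rejection::RateLimited);
        }
        self.enqueued_in_window += 1;
        Ok(())
    }
}

fn main() {
    let mut guard = SketchGuard::new(1024, 2);
    println!("{:?}", guard.check(br#"{"user_id":123}"#, 0));
    println!("{:?}", guard.check(br#"{"user_id":124}"#, 1));
    println!("{:?}", guard.check(br#"{"user_id":125}"#, 2)); // third in the same minute
    println!("{:?}", guard.check(&[b'x'; 2048], 3));
    println!("recorded decisions: {:?}", guard.decisions);
}
```

A fixed window is the simplest choice; it allows short bursts at window boundaries, which a sliding window or token bucket would smooth out.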
Replace the simulated work in crates/worker/src/main.rs with real job handlers.
Pattern:
- lease job
- start attempt
- run handler based on job_type
- call runner.on_success(...) or runner.on_failure(...)
Handlers should return meaningful error codes (e.g., TIMEOUT, BAD_PAYLOAD, UNKNOWN_JOB_TYPE). Handlers can be registered with per-handler concurrency limits and timeouts in crates/worker/src/handlers.rs.
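The "run handler based on job_type" step is essentially a match on the job type that maps every problem to a stable error code. The sketch below shows that shape with hypothetical `Job`, `Outcome`, and `run_handler` types; the real worker's types and the runner's on_success/on_failure signatures are not shown in this README, so the sketch only prints where those calls would go. The `resize_image` job type is made up to trigger UNKNOWN_JOB_TYPE.

```rust
/// Minimal job shape for the sketch; the real worker's types will differ.
struct Job {
    id: u64,
    job_type: String,
    payload_json: String,
}

#[derive(Debug, PartialEq)]
enum Outcome {
    Success,
    Failure { error_code: &'static str, error_message: String },
}

/// Dispatches on job_type and maps problems to stable error codes.
fn run_handler(job: &Job) -> Outcome {
    match job.job_type.as_str() {
        "email_send" => {
            // Placeholder validation; a real handler would deserialize the payload.
            if job.payload_json.contains("\"user_id\"") {
                Outcome::Success
            } else {
                Outcome::Failure {
                    error_code: "BAD_PAYLOAD",
                    error_message: "payload has no user_id".to_string(),
                }
            }
        }
        other => Outcome::Failure {
            error_code: "UNKNOWN_JOB_TYPE",
            error_message: format!("no handler registered for job_type {other}"),
        },
    }
}

fn main() {
    let jobs = [
        Job { id: 1, job_type: "email_send".into(), payload_json: r#"{"user_id":123}"#.into() },
        Job { id: 2, job_type: "email_send".into(), payload_json: "{}".into() },
        Job { id: 3, job_type: "resize_image".into(), payload_json: "{}".into() },
    ];
    for job in &jobs {
        // In the real worker this is where runner.on_success / runner.on_failure is called.
        match run_handler(job) {
            Outcome::Success => println!("job {} -> success", job.id),
            Outcome::Failure { error_code, error_message } => {
                println!("job {} -> failure {error_code}: {error_message}", job.id)
            }
        }
    }
}
```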
Start extra workers (admin stays on pgflow):
docker compose --profile worker up -d --scale worker=4
- Bind admin to localhost or a private network in production.
- Run the admin API behind a reverse proxy with TLS if exposed beyond localhost.
- PGFLOW_MAX_PAYLOAD_BYTES and PGFLOW_MAX_ENQUEUE_PER_MINUTE for guardrails.
- PGFLOW_LEASE_SECONDS for lock timeouts.
- PGFLOW_DEQUEUE_BATCH_SIZE for batch leasing per poll.
- PGFLOW_REAP_INTERVAL_MS to control orphan-lease reap cadence.
- PGFLOW_VERBOSE_JOB_LOGS to enable/disable per-job hot-path logs.
- PGFLOW_DB_MAX_CONNECTIONS and PGFLOW_DB_ACQUIRE_TIMEOUT_SECS for pool sizing.
- PGFLOW_DISABLE_SYNC_COMMIT and PGFLOW_DISABLE_JIT for DB session tuning.
- ARCHIVE_SUCCEEDED_AFTER_DAYS and PRUNE_HISTORY_AFTER_DAYS for retention.
- PGFLOW_API_TOKEN to require an x-api-key / bearer token on admin endpoints.
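A typical way to read numeric settings like these is "parse if set, otherwise fall back". The sketch below takes the lookup as a function so it can be tested without touching the real environment. The `Settings` struct, the helper names, and every default value are placeholders chosen for illustration, not pgflow's actual defaults.

```rust
use std::str::FromStr;

/// Reads `key` via `lookup` and parses it, falling back to `default`
/// when the variable is unset or does not parse.
fn setting<T: FromStr>(lookup: &dyn Fn(&str) -> Option<String>, key: &str, default: T) -> T {
    lookup(key).and_then(|raw| raw.trim().parse().ok()).unwrap_or(default)
}

#[derive(Debug)]
struct Settings {
    max_payload_bytes: usize,
    max_enqueue_per_minute: u32,
    lease_seconds: u64,
    dequeue_batch_size: u32,
    db_max_connections: u32,
}

fn load(lookup: &dyn Fn(&str) -> Option<String>) -> Settings {
    // Defaults below are placeholders for this sketch, not pgflow's real defaults.
    Settings {
        max_payload_bytes: setting(lookup, "PGFLOW_MAX_PAYLOAD_BYTES", 256 * 1024),
        max_enqueue_per_minute: setting(lookup, "PGFLOW_MAX_ENQUEUE_PER_MINUTE", 6_000),
        lease_seconds: setting(lookup, "PGFLOW_LEASE_SECONDS", 30),
        dequeue_batch_size: setting(lookup, "PGFLOW_DEQUEUE_BATCH_SIZE", 10),
        db_max_connections: setting(lookup, "PGFLOW_DB_MAX_CONNECTIONS", 10),
    }
}

fn main() {
    let from_env = |key: &str| std::env::var(key).ok();
    println!("{:#?}", load(&from_env));
}
```

Silently falling back on a malformed value keeps the sketch short; a stricter loader might refuse to start instead, so a typo in .env does not go unnoticed.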
- Configure enqueue guardrails: PGFLOW_MAX_PAYLOAD_BYTES, PGFLOW_MAX_ENQUEUE_PER_MINUTE.
- Tune retry policy and max_attempts per job type.
- Move admin API behind TLS if exposed beyond localhost.
- Monitor /metrics/prom and DB health (connections, WAL, disk).
- Every attempt is recorded with error_code, error_message, worker_id, and latency.
- Every retry or DLQ transition preserves lineage in job history.
- Leasing is exclusive per job (no two workers hold the same job at once).
- Execution is at-least-once; handlers must be idempotent.
- DLQ is explicit and queryable with a reason code.
- Policy decisions (throttles) are persisted for inspection.
- Replay creates a new job with a pointer to the original.
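Exclusive leasing and orphan-lease reaping (PGFLOW_LEASE_SECONDS, PGFLOW_REAP_INTERVAL_MS) can be modeled as below. In pgflow the guarantee comes from transactional leasing in Postgres, not from an in-memory map; a common Postgres technique for this is `SELECT ... FOR UPDATE SKIP LOCKED`, though whether pgflow uses it is not stated here. `Leases` and its methods are hypothetical. The model also shows why execution is at-least-once: once a lease expires, the job can be granted to another worker even if the first one is still running.

```rust
use std::collections::HashMap;

/// In-memory model of exclusive, time-limited leases.
struct Leases {
    lease_seconds: u64,                // cf. PGFLOW_LEASE_SECONDS
    held: HashMap<u64, (String, u64)>, // job_id -> (worker_id, expires_at)
}

impl Leases {
    fn new(lease_seconds: u64) -> Self {
        Self { lease_seconds, held: HashMap::new() }
    }

    /// Grants the lease unless another unexpired lease exists for this job.
    fn try_lease(&mut self, job_id: u64, worker_id: &str, now: u64) -> bool {
        if let Some((_, expires_at)) = self.held.get(&job_id) {
            if *expires_at > now {
                return false;
            }
        }
        self.held.insert(job_id, (worker_id.to_string(), now + self.lease_seconds));
        true
    }

    /// Called when the holder finishes (success or failure).
    fn release(&mut self, job_id: u64) {
        self.held.remove(&job_id);
    }

    /// Removes leases that expired without being released (e.g., a crashed worker),
    /// returning the job ids that become eligible for leasing again.
    fn reap_expired(&mut self, now: u64) -> Vec<u64> {
        let mut expired: Vec<u64> = self
            .held
            .iter()
            .filter(|(_, lease)| lease.1 <= now)
            .map(|(job_id, _)| *job_id)
            .collect();
        expired.sort_unstable();
        for job_id in &expired {
            self.held.remove(job_id);
        }
        expired
    }
}

fn main() {
    let mut leases = Leases::new(30);
    println!("w1 leases job 1: {}", leases.try_lease(1, "w1", 0));
    println!("w2 leases job 1 at t=10: {}", leases.try_lease(1, "w2", 10));
    println!("reaped at t=30: {:?}", leases.reap_expired(30));
}
```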
- One datastore (Postgres) for both durability and coordination.
- Strong consistency and transactional leasing.
- Low ops overhead: no extra brokers, simple local setup.
- Throughput is bounded by a single Postgres primary.
- Heavy write volume competes with application workloads.
- Very large payloads or extremely high enqueue rates require guardrails.
- Long-running jobs reduce effective concurrency unless tuned.
- Exactly-once execution is not guaranteed; design handlers accordingly.
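Because delivery is at-least-once, a handler may see the same job twice, for example after a lease expires mid-run. One common pattern is an idempotency key checked before the side effect and recorded after it. The sketch below is in-memory and hypothetical (`IdempotentSender`, the key format); in a real deployment the key record and the side effect should be committed together (for example in one Postgres transaction when the side effect is a database write), otherwise a crash between the two steps can still produce a duplicate. The choice of key matters too: a replay creates a new job, so keying by job id lets a replay repeat the side effect, while a business key (such as user and email template) does not.

```rust
use std::collections::HashSet;

/// Hypothetical handler wrapper that skips side effects it has already performed.
struct IdempotentSender {
    completed_keys: HashSet<String>, // in production this would live in Postgres
    emails_sent: u32,
}

impl IdempotentSender {
    fn new() -> Self {
        Self { completed_keys: HashSet::new(), emails_sent: 0 }
    }

    /// Returns true if the side effect ran, false if this was a duplicate delivery.
    fn handle(&mut self, idempotency_key: &str) -> bool {
        if self.completed_keys.contains(idempotency_key) {
            return false;
        }
        self.emails_sent += 1; // the side effect (e.g., sending an email)
        // Record completion only after the side effect succeeded.
        self.completed_keys.insert(idempotency_key.to_string());
        true
    }
}

fn main() {
    let mut sender = IdempotentSender::new();
    // The same job delivered twice, e.g. after a lease expired mid-run.
    println!("first delivery ran: {}", sender.handle("welcome:user:123"));
    println!("second delivery ran: {}", sender.handle("welcome:user:123"));
    println!("emails sent: {}", sender.emails_sent);
}
```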
- Horizontal workers (stateless).
- Multiple queues with per-queue policies.
- Read-heavy admin and metrics endpoints.
- Index health on jobs/job_attempts for high volumes.
- Connection pool sizing in workers and admin API.
- Disk I/O and WAL growth under sustained throughput.
- Vacuum/autovacuum tuning for large job tables.
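Pool sizing is easy to get wrong when scaling workers, because each process opens its own pool. Assuming every process (the pgflow service plus each `--scale worker=N` replica) uses PGFLOW_DB_MAX_CONNECTIONS, the worst-case connection count is (1 + N) × pool size, and that total must stay below Postgres `max_connections` minus whatever you reserve for admin sessions, migrations, and other clients. The helper names below are hypothetical.

```rust
/// Worst-case Postgres connections if every process opens its full pool.
/// Assumes one pool per process: the pgflow service plus each worker replica.
fn worst_case_connections(worker_replicas: u32, pool_size: u32) -> u32 {
    (1 + worker_replicas) * pool_size
}

/// Checks the total against the server's max_connections minus a reserve
/// (superuser slots, psql sessions, migrations, other applications).
fn fits(worker_replicas: u32, pool_size: u32, max_connections: u32, reserved: u32) -> bool {
    worst_case_connections(worker_replicas, pool_size) <= max_connections.saturating_sub(reserved)
}

fn main() {
    // Example: `--scale worker=4` with a pool size of 10 per process.
    let total = worst_case_connections(4, 10);
    println!(
        "worst case: {total} connections; fits in max_connections=100 with 10 reserved: {}",
        fits(4, 10, 100, 10)
    );
}
```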