Async CSV Ingestion Job Pipeline
ELI5
You drop a stack of forms at the front desk; the desk hands you a numbered ticket and the clerk takes the stack to a back room. While the clerk processes, you call the desk every few seconds quoting your ticket. When the clerk finishes, the desk shows your forms as completed (or failed with a list of issues).
Technical Deep Dive
Endpoints (Engine :8001)
| Endpoint | Purpose |
|---|---|
| POST /jobs/ingest | Accept a CSV/Excel upload, return 202 + job_id |
| GET /jobs/{job_id} | Poll status + progress |
| GET /jobs | List jobs (workspace-scoped) |
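From a client's point of view, these endpoints combine into a submit-then-poll loop. The sketch below shows one way to write the polling half. `poll_job`, `fetch_status`, the interval, and the timeout are all hypothetical names and values; `fetch_status` stands in for whatever HTTP call issues GET /jobs/{job_id} and returns the decoded JSON body.

```python
import time
from typing import Callable, Dict

# Terminal states taken from the job state machine; anything else means "keep polling".
TERMINAL_STATES = {"completed", "failed"}


def poll_job(
    fetch_status: Callable[[str], Dict],
    job_id: str,
    interval_s: float = 2.0,
    timeout_s: float = 300.0,
) -> Dict:
    """Call fetch_status(job_id) until the job is terminal or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while True:
        body = fetch_status(job_id)
        if body["status"] in TERMINAL_STATES:
            return body
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job {job_id} still {body['status']} after {timeout_s}s")
        time.sleep(interval_s)
```

Passing the fetcher in as a callable keeps the loop independent of any particular HTTP client and makes it easy to exercise with canned responses.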
POST /jobs/ingest:
- Validate file extension (.csv, .xlsx).
- Generate job_id (UUID).
- Read body bytes into Job.content.
- JobManager.create(...) registers a Job with status=pending.
- Schedule job_manager.process_job(job_id) on FastAPI BackgroundTasks.
- Return 202 Accepted with {"job_id": ..., "status": "pending"}.
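The steps above can be sketched without the web framework, which makes the ordering easy to see. This is an illustration under stated assumptions, not the engine's handler: `handle_ingest`, the plain-dict `jobs` registry, and the `schedule` callable (standing in for scheduling on FastAPI BackgroundTasks) are hypothetical, and the 400 status is one possible choice of the 4xx rejection.

```python
import uuid
from pathlib import PurePath
from typing import Callable, Dict, Tuple

ALLOWED_EXTENSIONS = {".csv", ".xlsx"}


def handle_ingest(
    filename: str,
    body: bytes,
    jobs: Dict[str, dict],
    schedule: Callable[[str], None],
) -> Tuple[int, dict]:
    """Return (status_code, response_body) for an upload."""
    # Validate the extension first, so a rejected file never creates a job.
    ext = PurePath(filename).suffix.lower()
    if ext not in ALLOWED_EXTENSIONS:
        return 400, {"detail": f"unsupported file type: {ext or 'none'}"}

    # Generate the job id and register the job as pending with its raw bytes.
    job_id = str(uuid.uuid4())
    jobs[job_id] = {"content": body, "status": "pending"}

    # Hand processing off so the response is not blocked on the parse.
    schedule(job_id)

    # 202 Accepted: the work was queued, not finished.
    return 202, {"job_id": job_id, "status": "pending"}
```

Returning 202 rather than 200 signals that the upload was accepted for processing and that the client should follow up with the job_id.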
Job State Machine
    stateDiagram-v2
        [*] --> pending
        pending --> running : background task starts
        running --> completed : all rows processed, no fatal errors
        running --> failed : exception or fatal validation error
        completed --> [*]
        failed --> [*]

The four states come straight from engine/src/engine/ingestion/jobs.py:JobStatus (pending, running, completed, failed). There is no cancelled state; killing the engine while a job runs leaves it stuck in running until restart drops it.
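The state machine is small enough to express as a lookup table. The sketch below is not the contents of jobs.py: only the four state values come from the source, while the enum member names, `ALLOWED_TRANSITIONS`, and `transition` are hypothetical.

```python
from enum import Enum


class JobStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


# Each state maps to the states it may move to; terminal states map to nothing.
ALLOWED_TRANSITIONS = {
    JobStatus.PENDING: {JobStatus.RUNNING},
    JobStatus.RUNNING: {JobStatus.COMPLETED, JobStatus.FAILED},
    JobStatus.COMPLETED: set(),
    JobStatus.FAILED: set(),
}


def transition(current: JobStatus, target: JobStatus) -> JobStatus:
    """Return target if the move is legal, otherwise raise ValueError."""
    if target not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current.value} -> {target.value}")
    return target
```

Keeping the legal moves in one table means a future state (a cancelled state, for instance) would be added in a single place.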
Progress Tracking
The Job dataclass carries:
| Field | Use |
|---|---|
| total_rows | Set after CSV parse |
| processed_rows | Incremented per row |
| progress_percentage | Computed (processed/total), zero when total_rows==0 |
| issues: List[JobIssueRecord] | Per-row validations: severity, issue_type, message, row_number |
| created_at, started_at, completed_at | Timestamps for SLA reporting |
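A minimal sketch of how these fields might fit together follows. The field names come from the table; the types, the defaults, the `job_id` field, and the scaling of progress_percentage to the 0-100 range are assumptions rather than the actual definition in jobs.py.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional


@dataclass
class JobIssueRecord:
    severity: str
    issue_type: str
    message: str
    row_number: int


@dataclass
class Job:
    job_id: str
    total_rows: int = 0
    processed_rows: int = 0
    issues: List[JobIssueRecord] = field(default_factory=list)
    created_at: Optional[datetime] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

    @property
    def progress_percentage(self) -> float:
        # Guard the division: before the CSV is parsed, total_rows is still 0.
        if self.total_rows == 0:
            return 0.0
        return 100.0 * self.processed_rows / self.total_rows
```

Deriving the percentage as a property, instead of storing it, means it cannot drift out of sync with processed_rows.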
Webhook on Completion
When process_job finishes, the engine fires a webhook to POST :8000/webhooks/... so the MCP server can refresh caches, write audit log entries, and notify the client UI without making the client poll forever.
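One way to build and send such a notification with only the standard library is sketched below. The payload fields, both function names, and the best-effort error handling are assumptions; the source does not specify the webhook body, and the full URL is left to the caller.

```python
import json
import urllib.request


def build_completion_webhook(url: str, job_id: str, status: str) -> urllib.request.Request:
    """Build (but do not send) a JSON POST announcing that a job finished."""
    payload = json.dumps({"job_id": job_id, "status": status}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def fire_webhook(req: urllib.request.Request, timeout_s: float = 5.0) -> bool:
    """Send the request; report failure instead of raising."""
    try:
        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
            return 200 <= resp.status < 300
    except OSError:  # covers URLError, HTTPError, and socket timeouts
        return False
```

Treating delivery as best-effort is a design choice in this sketch: a webhook failure is reported to the caller but never changes the job's own outcome.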
Sequence
    sequenceDiagram
        participant W as Web client
        participant E as Engine /jobs
        participant JM as JobManager
        participant BG as Background Task
        participant M as MCP webhooks
        W->>E: POST /jobs/ingest (CSV)
        E->>JM: create(job_id)
        E->>BG: schedule process_job(job_id)
        E-->>W: 202 {job_id, status: pending}
        loop Poll
            W->>E: GET /jobs/{job_id}
            E-->>W: {status, progress, issues}
        end
        BG->>JM: status=running
        BG->>BG: load_csv → validate → write
        BG->>JM: status=completed
        BG->>M: POST /webhooks/job-complete

Optional Rust Backend
engine/src/engine/rust_backend.py exposes a Rust core for the validation hot path. When unavailable the pipeline falls back to pure Python. /health reports rust_backend: healthy | fallback | unavailable.
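The fallback is commonly implemented as a guarded import. The sketch below shows the pattern only: the module name `engine_rust_core`, the `validate_row` function, and the toy Python validator are hypothetical, and the sketch does not model the separate fallback state, whose exact meaning the source does not define.

```python
import importlib
from typing import Callable, List, Tuple


def validate_row_py(row: dict) -> List[str]:
    """Pure-Python stand-in validator: flag empty cells."""
    return [f"empty value in column {k!r}" for k, v in row.items() if v in ("", None)]


def load_validator(module_name: str = "engine_rust_core") -> Tuple[Callable, str]:
    """Return (validator, health_state), preferring the compiled module."""
    try:
        core = importlib.import_module(module_name)
    except ImportError:
        # Extension not built or not installed: keep working in pure Python.
        return validate_row_py, "unavailable"
    return core.validate_row, "healthy"
```

Resolving the validator once at startup lets the per-row hot path call a single function without re-checking which backend is present.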
Key Terms
- JobManager → in-memory registry of jobs with thread-safe state transitions.
- JobIssueRecord → severity-tagged validation finding tied to a row_number.
- BackgroundTasks → FastAPI primitive used to run process_job after the response is flushed.
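To make "thread-safe state transitions" concrete, here is a minimal registry guarded by a single lock. It is an illustration rather than the real JobManager: the method names other than `create`, the string statuses, and the single-lock design are assumptions.

```python
import threading
from typing import Dict

# Legal moves; completed and failed are terminal and so have no entry.
_ALLOWED = {"pending": {"running"}, "running": {"completed", "failed"}}


class JobManager:
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._status: Dict[str, str] = {}

    def create(self, job_id: str) -> None:
        with self._lock:
            if job_id in self._status:
                raise KeyError(f"job {job_id} already exists")
            self._status[job_id] = "pending"

    def set_status(self, job_id: str, new_status: str) -> None:
        # Check and write happen under one lock, so two threads cannot
        # both observe "pending" and both move the job to "running".
        with self._lock:
            current = self._status[job_id]
            if new_status not in _ALLOWED.get(current, set()):
                raise ValueError(f"illegal transition {current} -> {new_status}")
            self._status[job_id] = new_status

    def get_status(self, job_id: str) -> str:
        with self._lock:
            return self._status[job_id]
```

Because the registry is a plain dict in process memory, it also illustrates why a restart loses every job it holds.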
Q&A
Q: Where do jobs live across restarts?
A: Nowhere — JobManager is in-memory. A restart loses pending and running jobs. Completed-job results are durable only to the extent that rows already landed in requirements and audit_logs.
Q: How does the MCP server know a job finished?
A: The engine posts a webhook to POST :8000/webhooks/.... Without that, the MCP would only learn via subsequent reads of requirements.
Q: What if the file isn’t .csv or .xlsx?
A: create_ingestion_job rejects with a 4xx before any Job is created — no row ever appears in JobManager.
Examples
A 50,000-row CSV uploaded with the Rust backend healthy completes in seconds; the same file with rust_backend: unavailable runs in pure Python and may take minutes. The client sees identical state transitions; only progress_percentage advances faster on the Rust path.