CRUMB a card from devarno-cloud

Async CSV Ingestion Job Pipeline

traceo intermediate 6 min read

ELI5

You drop a stack of forms at the front desk; the desk hands you a numbered ticket and the clerk takes the stack to a back room. While the clerk processes, you call the desk every few seconds quoting your ticket. When the clerk finishes, the desk shows your forms as completed (or failed with a list of issues).

Technical Deep Dive

Endpoints (Engine :8001)

Endpoint              Purpose
POST /jobs/ingest     Accept a CSV/Excel upload, return 202 + job_id
GET /jobs/{job_id}    Poll status + progress
GET /jobs             List jobs (workspace-scoped)

POST /jobs/ingest:

  1. Validate file extension (.csv, .xlsx).
  2. Generate job_id (UUID).
  3. Read body bytes into Job.content.
  4. JobManager.create(...) — registers a Job with status=pending.
  5. Schedule job_manager.process_job(job_id) on FastAPI BackgroundTasks.
  6. Return 202 Accepted with {"job_id": ..., "status": "pending"}.
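
The six steps above can be sketched without any web framework. This is a minimal illustration, not the engine's actual handler: accept_upload, the registry dict, and the schedule callback are hypothetical stand-ins (in the real service the scheduling step goes through FastAPI BackgroundTasks), and the concrete 400 status is an assumption since the card only says "4xx".

```python
import uuid
from dataclasses import dataclass
from typing import Callable, Dict, Tuple

ALLOWED_EXTENSIONS = (".csv", ".xlsx")


@dataclass
class Job:
    job_id: str
    filename: str
    content: bytes
    status: str = "pending"


def accept_upload(
    filename: str,
    body: bytes,
    registry: Dict[str, Job],
    schedule: Callable[[str], None],
) -> Tuple[int, dict]:
    """Return (http_status, json_body) for an ingest request."""
    # Step 1: reject unsupported extensions before any Job exists.
    # (400 is an assumed choice of 4xx code.)
    if not filename.lower().endswith(ALLOWED_EXTENSIONS):
        return 400, {"detail": f"unsupported file type: {filename}"}
    # Steps 2-4: mint a UUID, keep the raw bytes, register as pending.
    job_id = str(uuid.uuid4())
    registry[job_id] = Job(job_id=job_id, filename=filename, content=body)
    # Step 5: hand the id to whatever runs work after the response.
    schedule(job_id)
    # Step 6: acknowledge immediately; processing happens later.
    return 202, {"job_id": job_id, "status": "pending"}
```

Passing the scheduler in as a callback keeps the accept path free of processing logic, which is what lets the response return before any row is parsed.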

Job State Machine

stateDiagram-v2
[*] --> pending
pending --> running : background task starts
running --> completed : all rows processed, no fatal errors
running --> failed : exception or fatal validation error
completed --> [*]
failed --> [*]

The four states come straight from engine/src/engine/ingestion/jobs.py:JobStatus (pending, running, completed, failed). There is no cancelled state. If the engine is killed while a job runs, the job never leaves running, and because the registry is in-memory the restart discards it.
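
One way to encode the diagram is an enum plus a table of allowed edges. The member names and values mirror the four states listed above; the str-based Enum, the ALLOWED_TRANSITIONS table, and the transition helper are illustrative assumptions, not the contents of jobs.py.

```python
from enum import Enum


class JobStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


# Edges taken from the state diagram; terminal states have no successors.
ALLOWED_TRANSITIONS = {
    JobStatus.PENDING: {JobStatus.RUNNING},
    JobStatus.RUNNING: {JobStatus.COMPLETED, JobStatus.FAILED},
    JobStatus.COMPLETED: set(),
    JobStatus.FAILED: set(),
}


def transition(current: JobStatus, target: JobStatus) -> JobStatus:
    """Return target if the edge exists in the diagram, else raise."""
    if target not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"illegal transition: {current.value} -> {target.value}")
    return target
```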

Progress Tracking

The Job dataclass carries:

Field                                   Use
total_rows                              Set after CSV parse
processed_rows                          Incremented per row
progress_percentage                     Computed from processed_rows / total_rows; zero when total_rows == 0
issues: List[JobIssueRecord]            Per-row validations: severity, issue_type, message, row_number
created_at, started_at, completed_at    Timestamps for SLA reporting
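
A rough sketch of how these fields could sit on a dataclass, with the zero-row guard made explicit. Field names follow the table; the default values and the 0 to 100 scaling of progress_percentage are assumptions, since the card only gives the ratio.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional


@dataclass
class JobIssueRecord:
    severity: str
    issue_type: str
    message: str
    row_number: int


@dataclass
class Job:
    total_rows: int = 0
    processed_rows: int = 0
    issues: List[JobIssueRecord] = field(default_factory=list)
    created_at: Optional[datetime] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

    @property
    def progress_percentage(self) -> float:
        # Guard the division: before parsing finishes total_rows is 0.
        if self.total_rows == 0:
            return 0.0
        # Scaling to 0-100 is an assumption of this sketch.
        return 100.0 * self.processed_rows / self.total_rows
```

Computing the percentage on read, rather than storing it, means it cannot drift out of sync with processed_rows.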

Webhook on Completion

When process_job finishes, the engine fires a webhook to POST :8000/webhooks/... so the MCP server can refresh caches, write audit log entries, and notify the client UI without making the client poll forever.
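
As an illustration of the completion call, the sketch below builds and sends a JSON POST with the standard library. The payload fields (job_id, status), both function names, and the best-effort error handling are assumptions; the target URL is supplied by the caller because the card elides the exact path here.

```python
import json
import urllib.request


def build_completion_webhook(url: str, job_id: str, status: str) -> urllib.request.Request:
    """Build (but do not send) the POST announcing a finished job."""
    # Payload shape is hypothetical; the real webhook body is not documented here.
    payload = json.dumps({"job_id": job_id, "status": status}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def fire_webhook(request: urllib.request.Request, timeout: float = 5.0) -> bool:
    """Best-effort delivery: report failure instead of raising."""
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return 200 <= response.status < 300
    except OSError:
        # URLError and HTTPError are OSError subclasses; so are socket timeouts.
        return False
```

Swallowing delivery errors is a design choice of this sketch: a job that processed every row should arguably stay completed even if the notification is lost, since clients can still poll.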

Sequence

sequenceDiagram
participant W as Web client
participant E as Engine /jobs
participant JM as JobManager
participant BG as Background Task
participant M as MCP webhooks
W->>E: POST /jobs/ingest (CSV)
E->>JM: create(job_id)
E->>BG: schedule process_job(job_id)
E-->>W: 202 {job_id, status: pending}
loop Poll
W->>E: GET /jobs/{job_id}
E-->>W: {status, progress, issues}
end
BG->>JM: status=running
BG->>BG: load_csv → validate → write
BG->>JM: status=completed
BG->>M: POST /webhooks/job-complete
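
The poll loop in the diagram might look like this on the client side. It is a sketch under assumptions: poll_until_done, the two-second interval, and the poll cap are hypothetical, and fetch_status stands in for whatever performs GET /jobs/{job_id} and returns the decoded JSON.

```python
import time
from typing import Callable, Dict

TERMINAL = {"completed", "failed"}


def poll_until_done(
    fetch_status: Callable[[], Dict],
    interval_s: float = 2.0,
    max_polls: int = 150,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict:
    """Call fetch_status until the job reaches a terminal state."""
    for _ in range(max_polls):
        snapshot = fetch_status()
        if snapshot["status"] in TERMINAL:
            return snapshot
        sleep(interval_s)
    # A job lost to an engine restart never becomes terminal,
    # so the loop needs an upper bound.
    raise TimeoutError("job did not reach a terminal state in time")
```

The cap matters because there is no cancelled state: without it, a client polling a job that vanished from the in-memory registry would wait indefinitely.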

Optional Rust Backend

engine/src/engine/rust_backend.py exposes a Rust core for the validation hot path. When unavailable the pipeline falls back to pure Python. /health reports rust_backend: healthy | fallback | unavailable.
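
A common way to implement this kind of optional native path is a guarded import. The sketch below shows that pattern only; the module name passed in, the validate_rows attribute, the stand-in Python check, and the mapping onto the healthy and fallback labels are all assumptions rather than the contents of rust_backend.py.

```python
import importlib
from typing import Callable, List, Tuple

Validator = Callable[[List[dict]], List[str]]


def validate_rows_python(rows: List[dict]) -> List[str]:
    # Stand-in pure-Python check: flag rows containing an empty value.
    return [
        f"row {i}: empty field"
        for i, row in enumerate(rows, start=1)
        if any(value == "" for value in row.values())
    ]


def load_validator(module_name: str) -> Tuple[Validator, str]:
    """Prefer the compiled module; otherwise use the Python path."""
    try:
        core = importlib.import_module(module_name)
        return core.validate_rows, "healthy"
    except (ImportError, AttributeError):
        # Missing wheel or missing symbol: same interface, slower path.
        return validate_rows_python, "fallback"
```

Because both paths share one call signature, the rest of the pipeline does not need to know which one it received.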

Key Terms

  • JobManager → in-memory registry of jobs with thread-safe state transitions.
  • JobIssueRecord → severity-tagged validation finding tied to a row_number.
  • BackgroundTasks → FastAPI primitive used to run process_job after the response is flushed.
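
To make "in-memory registry with thread-safe state transitions" concrete, here is a minimal sketch that guards a dict with a lock. Only the create method name appears in this card; set_status, get_status, and the internal layout are hypothetical.

```python
import threading
from typing import Dict, Optional


class JobManager:
    """In-memory job registry; contents vanish when the process exits."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._status: Dict[str, str] = {}

    def create(self, job_id: str) -> None:
        with self._lock:
            self._status[job_id] = "pending"

    def set_status(self, job_id: str, status: str) -> None:
        # The lock keeps the existence check and the write atomic
        # with respect to other threads.
        with self._lock:
            if job_id not in self._status:
                raise KeyError(job_id)
            self._status[job_id] = status

    def get_status(self, job_id: str) -> Optional[str]:
        with self._lock:
            return self._status.get(job_id)
```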

Q&A

Q: Where do jobs live across restarts? A: Nowhere — JobManager is in-memory. A restart loses pending and running jobs. Completed-job results are durable only to the extent that rows already landed in requirements and audit_logs.

Q: How does the MCP server know a job finished? A: The engine posts a webhook to POST :8000/webhooks/.... Without that, the MCP would only learn via subsequent reads of requirements.

Q: What if the file isn’t .csv or .xlsx? A: create_ingestion_job rejects with a 4xx before any Job is created — no row ever appears in JobManager.

Examples

A 50,000-row CSV uploaded with the Rust backend healthy completes in seconds; the same file with rust_backend: unavailable runs in pure Python and may take minutes. The client sees identical state transitions either way; the only visible difference is that progress_percentage advances faster on the Rust path.
