Worker Pool Job Consumer
ELI5
The worker pool is a hotel concierge desk after hours: a printer (Redis Stream) spits request slips into a basket; N concierges each take a slip, run the errand (send an email, aggregate stats, clean stale rooms), and tear the slip once done. Untorn slips stay on file and can be handed out again.
Technical Deep Dive
services/worker-pool has no HTTP listener; it is a long-running consumer of the Redis Stream jobs. It is intentionally separate from the event-bus stream events: the bus carries cross-service notifications, while the job stream carries “do this work for me, please” requests.
Architecture
```mermaid
sequenceDiagram
    autonumber
    participant Caller as any service
    participant Stream as Redis Stream "jobs"
    participant W1 as worker (consumer "worker")
    participant Mongo
    Caller->>Stream: XADD jobs * type=SendEmail payload=...
    loop block 5s
        W1->>Stream: XREADGROUP GROUP worker-pool worker BLOCK 5s COUNT 1
        Stream-->>W1: {id, type, payload}
        alt SendEmail
            W1->>W1: deliver via SMTP (stub)
        else AggregateStats
            W1->>Mongo: read raw → write rollup
        else Cleanup
            W1->>Mongo: delete docs older than CleanupPayload.OlderThan
        end
        W1->>Stream: XACK jobs worker-pool <id>
    end
```
Stream Configuration
services/worker-pool/internal/queue/queue.go:13-17:
- Stream: jobs
- Consumer group: worker-pool
- Consumer name: worker
- Block timeout: 5 s on XREADGROUP
Start(ctx, concurrency, handler) spawns concurrency goroutines that all read from the same group under the same consumer name. Redis tracks pending entries per consumer name, so goroutines sharing one name share a single pending list and cannot be told apart when entries are inspected or reclaimed. Give each goroutine a distinct consumer name before scaling concurrency in production.
Job Envelope
queue.go:20-24:
```go
type Job struct {
	ID      string
	Type    string
	Payload json.RawMessage
}
```
The worker dispatch in internal/jobs/jobs.go:46-115 is a switch on Type, with one payload struct per type:
| Type | Payload struct | Fields |
|---|---|---|
| SendEmail | SendEmailPayload | To, Subject, Body |
| AggregateStats | AggregateStatsPayload | MetricType, StartTime, EndTime |
| Cleanup | CleanupPayload | Target (sessions / logs / temp_files), OlderThan (days) |
All three handlers are stubs today: dispatch and ACK work, but the actual side effects are placeholders.
MongoDB Database
The worker connects to the tektree database (jobs.go:33). Job-specific collections are not yet defined.
Bus vs Job Stream
| Concern | events stream (bus) | jobs stream (worker pool) |
|---|---|---|
| Purpose | Pub-sub notification | Work queue |
| Multiple consumers per item | Yes (multiple groups) | No (one worker-pool group) |
| Wildcard subscribe | Yes (pattern on Type) | No (switch on Type) |
| Default consumer group | events | worker-pool |
| Versioning | Semver on envelope | None today |
A service that needs both (“publish for everyone and schedule a side effect”) should publish an event AND XADD a job. Do not conflate them.
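The "multiple consumers per item" row comes down to how consumer groups deliver entries: every group sees every entry, while consumers inside one group split them. The toy model below has no Redis in it, and the group names audit and notifier are illustrative only.

```go
package main

import "fmt"

// stream is a toy append-only log. Each group remembers how far it has read,
// like a consumer group's last-delivered ID in Redis.
type stream struct {
	entries  []string
	lastRead map[string]int // group -> number of entries delivered so far
}

func newStream() *stream { return &stream{lastRead: map[string]int{}} }

func (s *stream) add(id string) { s.entries = append(s.entries, id) }

// readNew delivers the group's next undelivered entry, like XREADGROUP ... >.
// The position belongs to the group, so whichever consumer asks advances it.
func (s *stream) readNew(group string) (string, bool) {
	i := s.lastRead[group]
	if i >= len(s.entries) {
		return "", false
	}
	s.lastRead[group] = i + 1
	return s.entries[i], true
}

func main() {
	s := newStream()
	s.add("1-0")
	s.add("2-0")

	// Pub-sub (bus style): two groups each receive both entries.
	for _, g := range []string{"audit", "notifier"} {
		var got []string
		for e, ok := s.readNew(g); ok; e, ok = s.readNew(g) {
			got = append(got, e)
		}
		fmt.Println(g, got) // audit [1-0 2-0], then notifier [1-0 2-0]
	}

	// Work queue (worker-pool style): two consumers in one group split them.
	a, _ := s.readNew("worker-pool") // taken by worker-1
	b, _ := s.readNew("worker-pool") // taken by worker-2
	fmt.Println("worker-1:", a, "worker-2:", b) // worker-1: 1-0 worker-2: 2-0
}
```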
Key Terms
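- Job → {ID, Type, Payload}; the payload is opaque, like the bus envelope.
- Consumer group → worker-pool; records the last entry delivered to the group (so new reads resume after it) and keeps a Pending Entries List (PEL) of delivered but un-ACKed entries.
- XACK → mandatory after handler success; without it the entry stays in the PEL. It is not handed out again by new XREADGROUP ... > reads; it only comes back when a consumer re-reads its pending history (ID 0) or claims it with XCLAIM / XAUTOCLAIM.
- Block timeout → 5 s; XREADGROUP long-polls for up to 5 s and returns empty if nothing arrives, after which the loop polls again.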
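The ACK rule is easiest to see with the pending list modelled explicitly. This in-memory sketch is not a Redis client; it mimics documented XREADGROUP/XACK semantics: a > read moves an entry into the PEL, XACK removes it, and an un-ACKed entry only resurfaces through a pending-history read (ID 0).

```go
package main

import (
	"fmt"
	"sort"
)

// group models one consumer group: a read cursor plus a Pending Entries List.
type group struct {
	entries []string          // stream entry IDs, in order
	next    int               // index of the next never-delivered entry
	pending map[string]string // entry ID -> consumer that holds it
}

// readNew mimics XREADGROUP ... >: deliver one new entry and mark it pending.
func (g *group) readNew(consumer string) (string, bool) {
	if g.next >= len(g.entries) {
		return "", false
	}
	id := g.entries[g.next]
	g.next++
	g.pending[id] = consumer
	return id, true
}

// readPending mimics XREADGROUP ... 0: return this consumer's un-ACKed entries.
func (g *group) readPending(consumer string) []string {
	var ids []string
	for id, c := range g.pending {
		if c == consumer {
			ids = append(ids, id)
		}
	}
	sort.Strings(ids) // map order is random; string sort is fine for these short IDs
	return ids
}

// ack mimics XACK: drop the entry from the PEL.
func (g *group) ack(id string) { delete(g.pending, id) }

func main() {
	g := &group{entries: []string{"1-0", "2-0", "3-0"}, pending: map[string]string{}}
	first, _ := g.readNew("worker-1") // handler succeeds
	g.ack(first)
	g.readNew("worker-1")             // 2-0: handler panics, so no XACK
	third, _ := g.readNew("worker-1") // ">" moves on to 3-0, not back to 2-0
	g.ack(third)
	fmt.Println(len(g.pending), g.readPending("worker-1")) // 1 [2-0]
}
```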
Q&A
Q: A backlog of 50k entries appears on jobs. What’s the first thing to check?
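A: Check the Pending Entries List size first (XPENDING jobs worker-pool). High pending means workers are reading but not ACKing, usually because a handler panics or returns an error. Low pending with a growing group lag (the lag field of XINFO GROUPS jobs, Redis 7.0+) means throughput is below the ingest rate: raise concurrency or shard the stream. XLEN alone is misleading, because ACKed entries stay in the stream until it is trimmed (XTRIM, or MAXLEN on XADD).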
Q: Two workers run with the same consumer name worker. What happens?
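A: Redis treats them as one logical consumer: both goroutines share a single pending list, so you cannot tell which one holds an entry, and a goroutine that re-reads pending history (ID 0) can pick up entries its sibling is still processing. Use distinct names (worker-1, worker-2, …) when running concurrent goroutines.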
Q: How is a failed job retried?
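A: There is no explicit retry today. A missed ACK leaves the entry in the group's PEL, and Redis does not push it to later XREADGROUP ... > calls; it is retried only if a consumer re-reads its pending entries (ID 0) or claims them with XCLAIM / XAUTOCLAIM, so check whether queue.go does either. If it does, nothing caps the attempts, so a poison job can loop indefinitely; if it does not, failed jobs simply pile up as pending. Adding a max-attempts cap plus a dead-letter queue (DLQ) is the highest-impact next step.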
Examples
Enqueueing a welcome email after user.registered:
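```go
// Assumes ctx, rdb (*redis.Client) and the new user u are in scope,
// inside a function that returns error.
payload, err := json.Marshal(SendEmailPayload{To: u.Email, Subject: "Welcome", Body: "..."})
if err != nil {
	return fmt.Errorf("marshal SendEmail payload: %w", err)
}
// XADD jobs * type SendEmail payload <json> id <uuid>
if err := rdb.XAdd(ctx, &redis.XAddArgs{
	Stream: "jobs",
	Values: map[string]any{"type": "SendEmail", "payload": payload, "id": uuid.NewString()},
}).Err(); err != nil {
	return fmt.Errorf("enqueue SendEmail: %w", err)
}
```
Neighbors on the map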
- End-to-End Chain Execution Request Flow tracing a chain execution through the entire system
- Prompt-DAG Scheduler designing a graph.json for a new repo
- Dependency DAG & Blast Radius estimating the impact of changing a shared rule