A card from devarno-cloud

Worker Pool Job Consumer

tektree intermediate 5 min read

ELI5

The worker pool is a hotel concierge desk after hours: a printer (Redis Stream) spits request slips into a basket; N concierges each take a slip, do the errand (send email, aggregate stats, clean stale rooms), and tear the slip once done. A slip nobody tears stays in a “pending” tray until a concierge picks it up again.

Technical Deep Dive

services/worker-pool has no HTTP listener; it is a long-running consumer of the Redis Stream jobs. It is intentionally separate from the event bus's stream, events: the bus carries cross-service notifications, while the job stream carries “do this work for me, please” requests.

Architecture

```mermaid
sequenceDiagram
    autonumber
    participant Caller as any service
    participant Stream as Redis Stream "jobs"
    participant W1 as worker (consumer "worker")
    participant Mongo
    Caller->>Stream: XADD jobs * type=SendEmail payload=...
    loop poll (BLOCK 5 s)
        W1->>Stream: XREADGROUP GROUP worker-pool worker COUNT 1 BLOCK 5000 STREAMS jobs >
        Stream-->>W1: {id, type, payload}
        alt SendEmail
            W1->>W1: deliver via SMTP (stub)
        else AggregateStats
            W1->>Mongo: read raw → write rollup
        else Cleanup
            W1->>Mongo: delete docs older than CleanupPayload.OlderThan
        end
        W1->>Stream: XACK jobs worker-pool id
    end
```

Stream Configuration

services/worker-pool/internal/queue/queue.go:13-17:

  • Stream: jobs
  • Consumer group: worker-pool
  • Consumer name: worker
  • Block timeout: 5 s on XREADGROUP

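Start(ctx, concurrency, handler) spawns N goroutines (N = concurrency) that all read the same group under the same consumer name. Redis attributes every pending entry to a consumer name, so with a shared name XPENDING cannot tell which goroutine holds an entry, and XCLAIM / XAUTOCLAIM cannot target a single stalled goroutine. Give each goroutine a distinct consumer name before scaling concurrency in production.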

Job Envelope

queue.go:20-24:

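```go
import "encoding/json"

type Job struct {
	ID      string
	Type    string          // job kind; jobs.go switches on it
	Payload json.RawMessage // raw JSON, decoded per Type by the handler
}
```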

The worker dispatch in internal/jobs/jobs.go:46-115 is a switch on Type with a per-type payload struct:

| Type | Payload struct | Fields |
|---|---|---|
| SendEmail | SendEmailPayload | To, Subject, Body |
| AggregateStats | AggregateStatsPayload | MetricType, StartTime, EndTime |
| Cleanup | CleanupPayload | Target (sessions / logs / temp_files), OlderThan (days) |

All three handlers are stubs today: dispatch and ACK work, but the side effects themselves are placeholders.

MongoDB Database

The worker connects to the tektree database (jobs.go:33). Job-specific collections are not yet defined.

Bus vs Job Stream

| Concern | events stream (bus) | jobs stream (worker pool) |
|---|---|---|
| Purpose | Pub-sub notification | Work queue |
| Multiple consumers per item | Yes (multiple groups) | No (one worker-pool group) |
| Wildcard subscribe | Yes (pattern on Type) | No (switch on Type) |
| Default consumer group | events | worker-pool |
| Versioning | Semver on envelope | None today |

A service that needs both (“publish for everyone and schedule a side-effect”) should publish an event AND XADD a job. Do not conflate them.

Key Terms

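  • Job → {ID, Type, Payload}; opaque payload, like the bus envelope.
  • Consumer group → worker-pool; tracks the last entry delivered to the group (new reads resume after it) and holds the Pending Entries List of delivered but un-ACKed entries.
  • XACK → mandatory after handler success; without it the entry stays in the Pending Entries List until a consumer re-reads or claims it (XREADGROUP with ID 0, XCLAIM, XAUTOCLAIM).
  • Block timeout → 5 s; XREADGROUP returns empty after 5 s with no new entries, so the loop wakes up regularly (for example, to notice shutdown).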

Q&A

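Q: A backlog of 50k entries appears on jobs. What’s the first thing to check? A: The Pending Entries List size (XPENDING jobs worker-pool). High pending means workers are reading entries but not ACKing them, usually because a handler errors or panics before XACK. Low pending with a large consumer-group lag (the lag field of XINFO GROUPS jobs, Redis 7.0+) means throughput is below the ingest rate; raise concurrency or shard the stream. XLEN alone is misleading here, because XACK does not remove entries from the stream.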

Q: Two workers run with the same consumer name worker. What happens? A: Redis treats them as one logical consumer, so reclaim and pending tracking get confused. Use distinct names (worker-1, worker-2, …) when running concurrent goroutines.

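Q: How is a failed job retried? A: There is no explicit retry today. A missed ACK leaves the entry in the Pending Entries List, and Redis does not hand it to later XREADGROUP ... > reads; it is retried only if a consumer re-reads its own pending entries (XREADGROUP with ID 0) or claims them with XCLAIM / XAUTOCLAIM. Whatever reclaims it, there is no max-attempts cap, so a poison job can loop. Adding max-attempts + a DLQ is the highest-impact next step.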

Examples

Enqueueing a welcome email after user.registered:

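```go
// Inside the user.registered handler (assumed to return error).
payload, err := json.Marshal(SendEmailPayload{To: u.Email, Subject: "Welcome", Body: "..."})
if err != nil {
	return err
}
return rdb.XAdd(ctx, &redis.XAddArgs{
	Stream: "jobs",
	Values: map[string]any{"type": "SendEmail", "payload": payload, "id": uuid.NewString()},
}).Err()
```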
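On the consumer side, an entry written this way has to be turned back into a Job. The sketch below assumes Job.ID is the Redis entry ID (the value XACK needs) and ignores the producer's id field; whether queue.go uses the entry ID or that field is not stated here. jobFromEntry is a hypothetical name, and values stands for the entry's field map, which go-redis returns as a map[string]interface{} holding strings.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Job mirrors the envelope from queue.go:20-24.
type Job struct {
	ID      string
	Type    string
	Payload json.RawMessage
}

// jobFromEntry builds a Job from one stream entry. Assumption: the Redis
// entry ID becomes Job.ID, and the producer's "id" field is not used.
func jobFromEntry(entryID string, values map[string]any) (Job, error) {
	typ, ok := values["type"].(string)
	if !ok || typ == "" {
		return Job{}, fmt.Errorf("entry %s: missing type field", entryID)
	}
	raw, ok := values["payload"].(string)
	if !ok {
		return Job{}, fmt.Errorf("entry %s: missing payload field", entryID)
	}
	if !json.Valid([]byte(raw)) {
		return Job{}, fmt.Errorf("entry %s: payload is not valid JSON", entryID)
	}
	return Job{ID: entryID, Type: typ, Payload: json.RawMessage(raw)}, nil
}
```

Rejecting a malformed entry here, before dispatch, keeps the error close to its cause; whether such an entry should be ACKed and dead-lettered or left pending is the same policy question as the max-attempts cap.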
