Event Envelope (Redis Streams)
ELI5
Every event is a parcel: the box has a tracking number (id), what’s inside printed on the side (type + version), the warehouse it shipped from (source), the master order it belongs to (correlation_id) and the parent parcel that triggered this one (causation_id). The contents themselves are sealed bytes (payload).
Technical Deep Dive
libs/event-bus/pkg/events/event.go defines the canonical envelope; libs/event-bus/pkg/redis/{publisher,subscriber}.go ships the Redis Streams transport.
Envelope Class Diagram
classDiagram
    class Event {
        +string ID
        +string Type
        +string Source
        +time.Time Timestamp
        +string Version
        +string CorrelationID
        +string CausationID
        +json.RawMessage Payload
        +map~string,string~ Metadata
    }
    class Publisher {
        +Publish(e *Event) error
        +PublishBatch(events []*Event) error
    }
    class Subscriber {
        +SubscriberConfig Cfg
        +Subscribe(pattern string, h Handler) error
        +Start() error
        +Stop() error
    }
    class SubscriberConfig {
        +string StreamName
        +string ConsumerGroup
        +string ConsumerName
        +int BatchSize
        +Duration BlockTimeout
        +int MaxRetries
        +Duration RetryBackoff
        +string DLQStream
        +bool DLQEnabled
    }
    Publisher ..> Event
    Subscriber ..> Event
    Subscriber --> SubscriberConfig
Publish/Consume Flow
sequenceDiagram
    autonumber
    participant P as Publisher (service A)
    participant Stream as Redis Stream "events"
    participant S as Subscriber (service B, group "events")
    P->>Stream: XADD events * envelope_json (MAXLEN ~ approx)
    S->>Stream: XREADGROUP GROUP events <consumer> BLOCK 5s COUNT N
    Stream-->>S: pending messages
    S->>S: pattern match "user.*", dispatch handler
    alt handler ok
        S->>Stream: XACK events events <id>
    else handler error within MaxRetries
        S->>Stream: leave un-ACKed → redeliver
    else handler error past MaxRetries
        S->>Stream: XADD <DLQStream> * envelope (if DLQEnabled)
        S->>Stream: XACK to drop from PEL
    end
Type Naming Convention
{domain}.{aggregate}.{action} — the dot-segmented form matches the wildcard pattern syntax in Subscriber.Subscribe("user.*", handler). Version is semantic (“1.0”, “1.1”); breaking payload changes bump major and typically rename the type.
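To make the wildcard matching concrete, here is a minimal sketch of a segment-wise matcher. The exact semantics of the real Subscriber are not documented here, so this assumes that "*" matches exactly one dot-separated segment, except in the final position, where it matches one or more remaining segments. The function name `matchType` is hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// matchType reports whether eventType matches pattern.
// Assumed semantics (not confirmed by the source): "*" matches exactly one
// segment, except as the last pattern segment, where it matches the rest.
func matchType(pattern, eventType string) bool {
	p := strings.Split(pattern, ".")
	t := strings.Split(eventType, ".")
	for i, seg := range p {
		if i >= len(t) {
			return false // pattern is longer than the type
		}
		if seg == "*" {
			if i == len(p)-1 {
				return true // trailing wildcard swallows the remaining segments
			}
			continue
		}
		if seg != t[i] {
			return false
		}
	}
	// No trailing wildcard: the segment counts must agree exactly.
	return len(p) == len(t)
}

func main() {
	for _, typ := range []string{"user.registered", "user.profile.updated", "gamification.xp.earned"} {
		fmt.Printf("%-24s matches user.*: %v\n", typ, matchType("user.*", typ))
	}
}
```

Matching on whole segments rather than raw string prefixes avoids a pattern such as "user.*" accidentally catching a type from a domain like "userland".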
Correlation vs Causation
- CorrelationID: the same value across every event in a single business transaction (e.g. one user signup spawning user.registered → gamification.xp.earned → notification.sent). Used for distributed tracing.
- CausationID: the ID of the immediate parent event. Forms a tree (not a flat list) of “this event was caused by that one.” Useful for replaying a fan-out while preserving structure.
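The tree that CausationID forms can be rebuilt from a flat batch of events. The sketch below is one way to do that, assuming the root event of a transaction carries an empty CausationID and that the data contains no cycles. The pared-down `Event` type and the `childrenByCause` and `printTree` helpers are hypothetical and exist only for this example.

```go
package main

import (
	"fmt"
	"strings"
)

// Event is a pared-down stand-in for the envelope with only the fields used here.
type Event struct {
	ID, Type, CorrelationID, CausationID string
}

// childrenByCause indexes the events of one correlation by CausationID, so
// index[parentID] lists the events that parent directly triggered.
// Assumption: a root event has an empty CausationID.
func childrenByCause(events []Event, correlationID string) map[string][]Event {
	index := make(map[string][]Event)
	for _, e := range events {
		if e.CorrelationID != correlationID {
			continue // belongs to another business transaction
		}
		index[e.CausationID] = append(index[e.CausationID], e)
	}
	return index
}

// printTree walks the index depth-first, indenting each level.
// It assumes the causation links are acyclic.
func printTree(index map[string][]Event, parentID string, depth int) {
	for _, e := range index[parentID] {
		fmt.Printf("%s%s (%s)\n", strings.Repeat("  ", depth), e.Type, e.ID)
		printTree(index, e.ID, depth+1)
	}
}

func main() {
	events := []Event{
		{ID: "e1", Type: "user.registered", CorrelationID: "signup-42"},
		{ID: "e2", Type: "gamification.xp.earned", CorrelationID: "signup-42", CausationID: "e1"},
		{ID: "e3", Type: "notification.sent", CorrelationID: "signup-42", CausationID: "e2"},
		{ID: "e9", Type: "user.registered", CorrelationID: "signup-43"},
	}
	printTree(childrenByCause(events, "signup-42"), "", 0)
}
```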
Payload Discipline
Payload is json.RawMessage — the bus is type-agnostic. Each event type has its own concrete Go struct (in the publisher’s package); subscribers json.Unmarshal into the matching struct after pattern matching on Type. There is no schema registry; the catalog (docs/docs/architecture/EVENT_CATALOG.md) is the contract.
DLQ State
The DLQStream and DLQEnabled fields exist on SubscriberConfig and the retry comment at subscriber.go:209 indicates the loop is stubbed today — events that fail past MaxRetries are logged but not yet diverted. Treat this as scaffolding, not production behaviour.
Key Terms
- Stream → Redis key holding the ordered event log (XADD/XREADGROUP); default name "events".
- Consumer group → a per-group read cursor over the stream; each entry is delivered to one consumer in the group, which ACKs it, and the other consumers in that group do not receive it.
- PEL → Pending Entries List, Redis’s per-group “delivered but not yet ACKed” tracker.
- Wildcard pattern → matched on Type after delivery, e.g. "user.*", not via Redis itself.
- Approximate trim → MAXLEN ~ N keeps the stream roughly bounded at low cost, because Redis evicts only whole internal nodes rather than trimming to exactly N entries.
Q&A
Q: A subscriber’s handler panics. Will the event be retried?
A: Yes. The entry was never ACKed, so it remains in the PEL. XREADGROUP with > will not redeliver it, but the consumer’s idle reclaim (XCLAIM after IdleTimeout) or a pending fetch (XREADGROUP with ID 0) will. Once retries exceed MaxRetries, the event is meant to land on DLQStream when DLQEnabled is set, but that path is still stubbed (see DLQ State), so today the failure is only logged.
Q: Why is the consumer group default named the same as the stream (events)?
A: Convention only. Multiple groups consume the same stream independently; naming the default group after the stream keeps single-consumer setups simple. Production should namespace by service (gamification-events, notifications-events).
Q: Two events have the same CorrelationID but different CausationIDs. Is that valid?
A: Yes. They belong to the same business transaction but were triggered by different parent events; siblings fanned out from a single parent would share a CausationID as well. The correlation groups them; the causation tells you which event directly triggered each one.
Examples
Publishing a gamification.xp.earned from gamification-service after handling a knowledge.question.posted:
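parent := incoming
out := &events.Event{
	ID:            uuid.NewString(),
	Type:          "gamification.xp.earned",
	Source:        "gamification-service",
	Timestamp:     time.Now().UTC(),
	Version:       "1.0",
	CorrelationID: parent.CorrelationID, // same business transaction as the parent
	CausationID:   parent.ID,            // the parent event directly triggered this one
	Payload:       mustMarshal(XPEarnedV1{UserID: uid, Amount: 5, Source: parent.Type}),
}
if err := publisher.Publish(out); err != nil {
	return err // assumes the enclosing handler returns an error
}
Neighbors on the Map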
- EventEnvelope Wire Wrapper: publishing a new domain event proto
- ProtocolMessage Envelope: adding a new wire message type
- NATS Event Bridge: subscribing a Choco service to STRATT lifecycle events