CRUMB a card from devarno-cloud

Event Envelope (Redis Streams)

tektree intermediate 5 min read

ELI5

Every event is a parcel: the box has a tracking number (id), what’s inside printed on the side (type + version), the warehouse it shipped from (source), the master order it belongs to (correlation_id) and the parent parcel that triggered this one (causation_id). The contents themselves are sealed bytes (payload).

Technical Deep Dive

libs/event-bus/pkg/events/event.go defines the canonical envelope; libs/event-bus/pkg/redis/{publisher,subscriber}.go ships the Redis Streams transport.

Envelope Class Diagram

classDiagram
class Event {
+string ID
+string Type
+string Source
+time.Time Timestamp
+string Version
+string CorrelationID
+string CausationID
+json.RawMessage Payload
+map~string,string~ Metadata
}
class Publisher {
+Publish(e *Event) error
+PublishBatch(events []*Event) error
}
class Subscriber {
+SubscriberConfig Cfg
+Subscribe(pattern string, h Handler) error
+Start() error
+Stop() error
}
class SubscriberConfig {
+string StreamName
+string ConsumerGroup
+string ConsumerName
+int BatchSize
+Duration BlockTimeout
+int MaxRetries
+Duration RetryBackoff
+string DLQStream
+bool DLQEnabled
}
Publisher ..> Event
Subscriber ..> Event
Subscriber --> SubscriberConfig
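Read as Go, the Event box in the diagram corresponds roughly to the struct below. This is a sketch reconstructed from the diagram, not a copy of event.go: the JSON tag names, the omitempty choices and the roundTrip helper are assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Event mirrors the fields in the class diagram. The JSON tags are
// assumptions for this sketch; event.go is the source of truth.
type Event struct {
	ID            string            `json:"id"`
	Type          string            `json:"type"`
	Source        string            `json:"source"`
	Timestamp     time.Time         `json:"timestamp"`
	Version       string            `json:"version"`
	CorrelationID string            `json:"correlation_id"`
	CausationID   string            `json:"causation_id,omitempty"`
	Payload       json.RawMessage   `json:"payload"`
	Metadata      map[string]string `json:"metadata,omitempty"`
}

// roundTrip (hypothetical helper) encodes an event to JSON and decodes it
// back, which is roughly what happens between publish and handler dispatch.
func roundTrip(e Event) (Event, error) {
	raw, err := json.Marshal(e)
	if err != nil {
		return Event{}, err
	}
	var out Event
	err = json.Unmarshal(raw, &out)
	return out, err
}

func main() {
	e := Event{
		ID:            "evt-1",
		Type:          "user.registered",
		Source:        "user-service",
		Timestamp:     time.Now().UTC(),
		Version:       "1.0",
		CorrelationID: "corr-1",
		Payload:       json.RawMessage(`{"user_id":"u1"}`),
	}
	got, err := roundTrip(e)
	if err != nil {
		panic(err)
	}
	fmt.Println(got.Type, string(got.Payload))
}
```

Because Payload is a json.RawMessage, the envelope can be decoded without knowing the payload's concrete type; the bytes pass through untouched.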

Publish/Consume Flow

sequenceDiagram
autonumber
participant P as Publisher (service A)
participant Stream as Redis Stream "events"
participant S as Subscriber (service B, group "events")
P->>Stream: XADD events MAXLEN ~ N * envelope_json
S->>Stream: XREADGROUP GROUP events <consumer> COUNT N BLOCK 5000 STREAMS events >
Stream-->>S: batch of new entries (now in the group's PEL)
S->>S: pattern match "user.*", dispatch handler
alt handler ok
S->>Stream: XACK events events <id>
else handler error within MaxRetries
S->>Stream: leave un-ACKed → redeliver
else handler error past MaxRetries
S->>Stream: XADD <DLQStream> * envelope (if DLQEnabled, planned)
S->>Stream: XACK to drop from PEL
end
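The three branches of the alt block can be sketched as a pure decision function. This is an illustration of the flow in the diagram, not the code in subscriber.go: the names Action, decide and errHandler are hypothetical, and it assumes attempts counts deliveries including the current one, with redelivery allowed while attempts <= maxRetries.

```go
package main

import (
	"errors"
	"fmt"
)

// Action describes what the subscriber does with an entry once the
// handler has returned. The type and its values are hypothetical.
type Action string

const (
	Ack          Action = "XACK"
	LeavePending Action = "leave in PEL for redelivery"
	DeadLetter   Action = "XADD to DLQStream, then XACK"
	AckAndDrop   Action = "XACK without a DLQ copy"
)

// errHandler stands in for any error returned by a handler.
var errHandler = errors.New("handler failed")

// decide maps a handler result onto the branches of the sequence diagram.
// Assumption: attempts includes the delivery that just failed.
func decide(handlerErr error, attempts, maxRetries int, dlqEnabled bool) Action {
	switch {
	case handlerErr == nil:
		return Ack
	case attempts <= maxRetries:
		return LeavePending
	case dlqEnabled:
		return DeadLetter
	default:
		return AckAndDrop
	}
}

func main() {
	fmt.Println(decide(nil, 1, 3, true))
	fmt.Println(decide(errHandler, 2, 3, true))
	fmt.Println(decide(errHandler, 4, 3, true))
	fmt.Println(decide(errHandler, 4, 3, false))
}
```

Keeping the decision separate from the Redis calls makes the retry policy testable without a running Redis instance.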

Type Naming Convention

{domain}.{aggregate}.{action}: the dot-segmented form matches the wildcard pattern syntax in Subscriber.Subscribe("user.*", handler). Version is a major.minor string (“1.0”, “1.1”); a breaking payload change bumps the major number and typically also renames the type.
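A dot-segmented matcher for such patterns might look like the sketch below. The exact wildcard semantics of the subscriber are not documented here, so this assumes that "*" matches one segment and that a trailing "*" matches all remaining segments; matchType is a hypothetical name.

```go
package main

import (
	"fmt"
	"strings"
)

// matchType reports whether eventType matches pattern, comparing
// dot-separated segments. Assumed semantics: "*" matches any single
// segment, and a trailing "*" matches one or more remaining segments.
func matchType(pattern, eventType string) bool {
	p := strings.Split(pattern, ".")
	t := strings.Split(eventType, ".")
	for i, seg := range p {
		if seg == "*" && i == len(p)-1 {
			// Trailing wildcard: at least one segment must be left.
			return len(t) > i
		}
		if i >= len(t) {
			return false
		}
		if seg != "*" && seg != t[i] {
			return false
		}
	}
	// No trailing wildcard: segment counts must agree.
	return len(p) == len(t)
}

func main() {
	fmt.Println(matchType("user.*", "user.registered"))
	fmt.Println(matchType("user.*", "gamification.xp.earned"))
	fmt.Println(matchType("gamification.*.earned", "gamification.xp.earned"))
}
```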

Correlation vs Causation

  • CorrelationID — the same value across every event in a single business transaction (e.g. one user signup spawning user.registered → gamification.xp.earned → notification.sent). Used for distributed tracing.
  • CausationID — the immediate parent event id. Forms a tree (not a flat list) of “this event was caused by that one.” Useful for replaying a fan-out while preserving structure.
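To make the tree shape concrete, the sketch below indexes events by CausationID and walks them depth-first. It assumes a root event carries an empty CausationID and that the causation links contain no cycles; the trimmed Event type and the helper names are hypothetical.

```go
package main

import "fmt"

// Event is trimmed to the fields needed for this sketch.
type Event struct {
	ID, Type, CorrelationID, CausationID string
}

// childrenByCause groups events under the ID of the event that caused them.
// Root events (assumed to have an empty CausationID) end up under "".
func childrenByCause(events []Event) map[string][]Event {
	idx := make(map[string][]Event)
	for _, e := range events {
		idx[e.CausationID] = append(idx[e.CausationID], e)
	}
	return idx
}

// walk visits the subtree below parentID depth-first, keeping the input
// order among siblings. It assumes the causation links form a tree.
func walk(idx map[string][]Event, parentID string, depth int, visit func(Event, int)) {
	for _, e := range idx[parentID] {
		visit(e, depth)
		walk(idx, e.ID, depth+1, visit)
	}
}

func main() {
	events := []Event{
		{ID: "e1", Type: "user.registered", CorrelationID: "c1"},
		{ID: "e2", Type: "gamification.xp.earned", CorrelationID: "c1", CausationID: "e1"},
		{ID: "e3", Type: "notification.sent", CorrelationID: "c1", CausationID: "e2"},
	}
	walk(childrenByCause(events), "", 0, func(e Event, depth int) {
		fmt.Printf("%*s%s\n", depth*2, "", e.Type)
	})
}
```

All three events share one CorrelationID, while each CausationID points one level up, which is what lets a replay preserve the fan-out structure.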

Payload Discipline

Payload is json.RawMessage — the bus is type-agnostic. Each event type has its own concrete Go struct (in the publisher’s package); subscribers json.Unmarshal into the matching struct after pattern matching on Type. There is no schema registry; the catalog (docs/docs/architecture/EVENT_CATALOG.md) is the contract.
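On the subscriber side, decoding could look like the following sketch. XPEarnedV1 reuses the field names from the publishing example, but its JSON tags, the string type of UserID and the handleXPEarned function are assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event is trimmed to the fields this sketch needs.
type Event struct {
	Type    string
	Version string
	Payload json.RawMessage
}

// XPEarnedV1 uses the field names from the publishing example.
// The JSON tags and the UserID type are assumptions.
type XPEarnedV1 struct {
	UserID string `json:"user_id"`
	Amount int    `json:"amount"`
	Source string `json:"source"`
}

// handleXPEarned (hypothetical) checks Type first, then decodes the raw
// payload into the struct that the catalog documents for that type.
func handleXPEarned(e Event) (XPEarnedV1, error) {
	var p XPEarnedV1
	if e.Type != "gamification.xp.earned" {
		return p, fmt.Errorf("unexpected event type %q", e.Type)
	}
	if err := json.Unmarshal(e.Payload, &p); err != nil {
		return p, fmt.Errorf("decode %s v%s payload: %w", e.Type, e.Version, err)
	}
	return p, nil
}

func main() {
	e := Event{
		Type:    "gamification.xp.earned",
		Version: "1.0",
		Payload: json.RawMessage(`{"user_id":"u1","amount":5,"source":"knowledge.question.posted"}`),
	}
	p, err := handleXPEarned(e)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", p)
}
```

With no schema registry, a decode error here is the first place a catalog mismatch becomes visible, so it is worth including the type and version in the error.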

DLQ State

The DLQStream and DLQEnabled fields exist on SubscriberConfig, but the retry comment at subscriber.go:209 indicates the loop is stubbed today: events that fail past MaxRetries are logged but not yet diverted to the DLQ stream. Treat the DLQ as scaffolding, not production behaviour.

Key Terms

  • Stream → Redis key holding the ordered event log (XADD/XREADGROUP); default name "events".
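  • Consumer group → a named cursor over the stream with its own delivery state; each entry is delivered to one consumer in the group, which ACKs it, so the other consumers in that group never see it.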
  • PEL → Pending Entries List, Redis’s per-group “delivered but not yet ACKed” tracker.
  • Wildcard pattern → matched on Type after delivery, e.g. "user.*", not via Redis itself.
  • Approximate trim → MAXLEN ~ N keeps the stream bounded cheaply: Redis only evicts whole internal nodes, so the length can briefly exceed N.

Q&A

Q: A subscriber’s handler panics. Will the event be retried? A: Yes, provided the entry was never ACKed: it stays in the PEL. XREADGROUP with > only returns never-delivered entries, so redelivery has to come from the consumer’s idle-handler reclaim (XCLAIM after IdleTimeout) or a pending fetch (XREADGROUP with ID 0). Once the DLQ path is production-complete, an event that exceeds MaxRetries with DLQEnabled set will land on DLQStream; today it is only logged.
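One way to make a panicking handler take the same path as a failing one is to recover inside a wrapper, as sketched below. The Handler signature and the recoverHandler name are assumptions; the real subscriber may already do something similar, or may let the process crash.

```go
package main

import "fmt"

// Event is trimmed to the fields this sketch needs.
type Event struct{ ID, Type string }

// Handler is an assumed handler signature for this sketch.
type Handler func(e *Event) error

// recoverHandler wraps h so that a panic is turned into an ordinary error.
// The caller then skips the XACK, and the entry stays in the PEL.
func recoverHandler(h Handler) Handler {
	return func(e *Event) (err error) {
		defer func() {
			if r := recover(); r != nil {
				err = fmt.Errorf("handler panicked on event %s: %v", e.ID, r)
			}
		}()
		return h(e)
	}
}

func main() {
	safe := recoverHandler(func(e *Event) error {
		panic("boom")
	})
	err := safe(&Event{ID: "evt-1", Type: "user.registered"})
	fmt.Println(err)
}
```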

Q: Why is the consumer group default named the same as the stream (events)? A: Convention only. Multiple groups consume the same stream independently; naming the default group after the stream keeps single-consumer setups simple. Production should namespace by service (gamification-events, notifications-events).
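A per-service configuration along those lines might be built as in the sketch below. The struct fields follow the class diagram; configFor, the numeric values and the "events-dlq" name are illustrative assumptions, not the library's defaults.

```go
package main

import (
	"fmt"
	"time"
)

// SubscriberConfig follows the fields shown in the class diagram.
type SubscriberConfig struct {
	StreamName    string
	ConsumerGroup string
	ConsumerName  string
	BatchSize     int
	BlockTimeout  time.Duration
	MaxRetries    int
	RetryBackoff  time.Duration
	DLQStream     string
	DLQEnabled    bool
}

// configFor (hypothetical) namespaces the consumer group by service so
// that each service keeps its own cursor over the shared stream.
func configFor(servicePrefix, consumerName string) SubscriberConfig {
	return SubscriberConfig{
		StreamName:    "events",
		ConsumerGroup: servicePrefix + "-events",
		ConsumerName:  consumerName,
		BatchSize:     10,              // illustrative value
		BlockTimeout:  5 * time.Second, // matches the 5s block in the flow diagram
		MaxRetries:    3,               // illustrative value
		RetryBackoff:  time.Second,     // illustrative value
		DLQStream:     "events-dlq",    // hypothetical stream name
		DLQEnabled:    false,           // the DLQ path is described as stubbed
	}
}

func main() {
	cfg := configFor("gamification", "worker-1")
	fmt.Println(cfg.StreamName, cfg.ConsumerGroup, cfg.ConsumerName)
}
```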

Q: Two events have the same CorrelationID but different CausationIDs. Is that valid? A: Yes. They belong to the same business transaction but were triggered by different parent events, for example at different depths of the chain. The correlation groups them; the causation tells you which event directly triggered each one. Two events fanned out from the same parent would share both IDs.

Examples

Publishing a gamification.xp.earned from gamification-service after handling a knowledge.question.posted:

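parent := incoming // the knowledge.question.posted event being handled
out := &events.Event{
	ID:            uuid.NewString(),
	Type:          "gamification.xp.earned",
	Source:        "gamification-service",
	Timestamp:     time.Now().UTC(),
	Version:       "1.0",
	CorrelationID: parent.CorrelationID, // same business transaction
	CausationID:   parent.ID,            // direct parent event
	Payload:       mustMarshal(XPEarnedV1{UserID: uid, Amount: 5, Source: parent.Type}),
}
// Publish returns an error; propagate it rather than dropping it.
if err := publisher.Publish(out); err != nil {
	return fmt.Errorf("publish %s: %w", out.Type, err)
}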
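The example calls mustMarshal without showing it. A minimal version could look like the sketch below; the helper body and the JSON tags on XPEarnedV1 are assumptions, and panicking is only reasonable here because the input is a known struct that cannot fail to encode.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// XPEarnedV1 uses the field names from the example above.
// The JSON tags and the UserID type are assumptions.
type XPEarnedV1 struct {
	UserID string `json:"user_id"`
	Amount int    `json:"amount"`
	Source string `json:"source"`
}

// mustMarshal (assumed implementation) encodes v as JSON and panics on
// failure, which keeps the struct literal in the example on one line.
func mustMarshal(v interface{}) json.RawMessage {
	b, err := json.Marshal(v)
	if err != nil {
		panic(fmt.Sprintf("marshal %T: %v", v, err))
	}
	return b
}

func main() {
	raw := mustMarshal(XPEarnedV1{UserID: "u1", Amount: 5, Source: "knowledge.question.posted"})
	fmt.Println(string(raw))
}
```

For payloads built from untrusted or dynamic data, returning the error instead of panicking would be the safer choice.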
