Windshift is an event-stream framework on top of NATS Jetstream, that enforces strong typing using Protobuf with support for tracing using OpenTelemetry.
- 🌊 Stream management, declare streams and bind subjects to them, with configurable retention and limits
- 📄 Event data in Protobuf format, for strong typing and schema evolution
- 📤 Publish events to subjects, with idempotency and OpenTelemetry tracing
- 📥 Durable consumers with distributed processing
- 🕒 Ephemeral consumers for one off event processing
- 🔄 Automatic redelivery of failed events
- 🔔 Ability to extend processing time by pinging events
go get github.com/levelfourab/windshift-go
Creating a client:
natsClient, err := nats.Connect("localhost:4222")
eventsClient, err := windshift.NewEvents(natsClient)
Data in Windshift is represented using Protobuf. You will need to generate Go code from your Protobuf files to use the library.
Buf can be used to generate the necessary files, and provides linting and breaking change detection.
Streams in Windshift store events. They can be used to replay events, or to store events for a certain amount of time. Streams can be configured with retention policies, sources, and more.
Streams can receive events from multiple subjects, but each subject can only
be bound to one stream. For example, if you have a stream called orders
which
receives events from the orders.created
subject, you cannot create another
stream that also receives events from orders.created
.
See Streams in the NATS Jetstream documentation for more details.
Example:
stream, err := eventsClient.EnsureStream(
ctx,
"orders",
streams.WithSubjects("orders.>"),
)
It is possible to control the retention policy of the stream via things like
streams.MaxAge
, streams.MaxBytes
, and streams.MaxEvents
.
Example:
stream, err := eventsClient.EnsureStream(
ctx,
"orders",
streams.WithSubjects("orders.>"),
streams.MaxAge(30 * time.Days),
)
Events can be published if there is a stream that matches the subject of the event. The event will be stored in the stream and can be consumed by consumers subscribed to the stream.
Example:
eventsClient.Publish(ctx, &events.OutgoingEvent{
Subject: "orders.created",
Data: &ordersv1.OrderCreated{
ID: "123",
},
})
Features:
- Timestamps for when the event occurred can be specified with
timestamp
. - Idempotency keys can be specified using
idempotency_key
. If an event with the same idempotency key has already been published, the event will not be published again. The window for detecting duplicates can be configured via the stream. - Optimistic concurrency control can be used via
expected_last_id
. If the last event in the stream does not have the specified id, the event will not be published.
Consumers in Windshift are used to subscribe to events in streams. Consumers can be ephemeral or durable. Ephemeral consumers are automatically removed after they have been inactive for a certain amount of time.
To create a durable consumer give it a name:
consumer, err := events.EnsureConsumer(ctx, "orders", consumers.WithName("idOfConsumer"))
To create an ephemeral consumer omit the name:
consumer, err := events.EnsureConsumer(ctx, "orders")
Consumers can be configured with options. Options include:
WithSubjects
- a subset of subjects to subscribe to.WithProcessingTimeout
- the time to wait for an event to be acknowledged, rejected, or pinged before requeuing it.WithConsumeFrom
- the position in the stream to start consuming events from.
Events can be consumed by subscribing to a consumer. For durable consumers multiple subscriptions may be made, and events will be distributed between the subscriptions.
Example:
events, err := eventsClient.Subscribe(ctx, "orders", "idOfConsumer")
for event := range events {
// Context includes tracing data from the publishing of the event
ctx := event.Context()
// Unmarshal the event data to process it
data, err := event.UnmarshalNew()
// Acknowledge that event was processed (or reject it)
err := event.Ack(ctx)
if err != nil {
// Handle error
}
}
Subscriptions stay active as long as the context remains uncanceled and will reconnect to NATS if the connection is lost.
Events need to be acknowledge or rejected to indicate if we have successfully handled them or not. If an event is not acknowledged or rejected within the processing timeout for the consumer it will be redelivered.
To acknowledge an event call Ack
:
err := event.Ack(ctx)
To reject an event call Reject
:
// Reject and requeue the event for immediate redelivery
err := event.Reject(ctx)
// Reject and requeue the event for redelivery after a delay
err := event.Reject(ctx, events.WithRedeliveryDelay(5 * time.Minute))
// Reject permanently
err := event.Reject(ctx, events.Permanently())
// Reject with customizable delay
err := event.Reject(ctx, events.WithRedeliveryDecider(func(event events.Event) time.Duration {
return ... // Calculate delay based on delivery attempt, headers, data etc
}))
To extend the time available for processing an event, the event can be pinged.
err := event.Ping(ctx)
It also possible to control how the library attempts to retry acking or rejecting
an event by using either events.WithNoRetry
or events.WithBackoff
:
err := event.Ack(ctx, events.WithBackoff(
delays.StopAfterMaxTime(delays.Exponential(10*time.Millisecond, 2), 5*time.Second),
))