A .NET library to create message consumers.
Ziggurat provides features that solve common problems when handling messages:
- Idempotency
- Middleware: lets you add middlewares to consumers for logging, validation, and any other cross-cutting concern.
The library uses the decorator pattern to execute a middleware pipeline when calling the consumer services. This makes it possible to extend the service code with new functionality.
The idempotency middleware wraps the service and ensures that each message is processed only once by tracking message processing in the database.
Also, it's possible to add custom middlewares to the pipeline.
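To make the decorator idea concrete, here is a minimal, self-contained sketch of a middleware pipeline wrapping a consumer. The types `Handler<T>`, `IMiddleware<T>`, `LabelMiddleware`, `Pipeline`, and `Demo` are hypothetical stand-ins written for this illustration; they are not Ziggurat's actual types, and the library's internals may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-ins for illustration only; not Ziggurat's API.
public delegate Task Handler<T>(T message);

public interface IMiddleware<T>
{
    Task InvokeAsync(T message, Handler<T> next);
}

// Records when it runs relative to the next step in the pipeline.
public sealed class LabelMiddleware : IMiddleware<string>
{
    private readonly string _label;
    private readonly List<string> _log;

    public LabelMiddleware(string label, List<string> log)
    {
        _label = label;
        _log = log;
    }

    public async Task InvokeAsync(string message, Handler<string> next)
    {
        _log.Add($"{_label}: before");
        await next(message);
        _log.Add($"{_label}: after");
    }
}

public static class Pipeline
{
    // Wraps the consumer from the inside out, so the first registered
    // middleware ends up outermost and therefore runs first.
    public static Handler<T> Build<T>(Handler<T> consumer, IEnumerable<IMiddleware<T>> middlewares)
    {
        Handler<T> pipeline = consumer;
        foreach (var middleware in middlewares.Reverse())
        {
            var next = pipeline;
            pipeline = message => middleware.InvokeAsync(message, next);
        }
        return pipeline;
    }
}

public static class Demo
{
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        Handler<string> consumer = message =>
        {
            log.Add($"consumer: {message}");
            return Task.CompletedTask;
        };
        var pipeline = Pipeline.Build<string>(consumer, new IMiddleware<string>[]
        {
            new LabelMiddleware("first", log),
            new LabelMiddleware("second", log),
        });
        await pipeline("hello");
        return log;
    }
}
```

In this sketch, `Demo.RunAsync()` yields the entries `first: before`, `second: before`, `consumer: hello`, `second: after`, `first: after`, which shows each middleware decorating everything registered after it.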
Ziggurat supports:
- Storage:
  - MS SQL Server
  - MongoDB
- Messaging Library:
  - CAP
Packages:
- Ziggurat
- Ziggurat.CapAdapter
- Ziggurat.SqlServer
- Ziggurat.MongoDB
Ziggurat works with middlewares. Registering a middleware adds functionality to the message consumer. Note that multiple middlewares can be registered for the same consumer; they are executed in the order of registration.
Ziggurat integrates with the application's Entity Framework Core setup to track processed messages and ensure that each message is processed only once. EF Core migrations are used to create the message tracking table with the correct constraints. If your project does not use migrations, the table must be created manually.
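The idea behind message tracking can be sketched without a database. The example below is an in-memory illustration only: `InMemoryMessageTracker` and `IdempotentConsumer` are hypothetical names, and keying the record on the pair (`MessageId`, `MessageGroup`) is an assumption about what the table constraint enforces. The real library persists the tracking record through EF Core together with the business changes.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for the message tracking table.
public sealed class InMemoryMessageTracker
{
    private readonly HashSet<(string MessageId, string MessageGroup)> _processed =
        new HashSet<(string MessageId, string MessageGroup)>();

    // Returns false when the pair was already recorded, mimicking a
    // unique-constraint violation (assumed key: MessageId + MessageGroup).
    public bool TryTrack(string messageId, string messageGroup) =>
        _processed.Add((messageId, messageGroup));

    // Removes a record, mimicking a transaction rollback.
    public void Untrack(string messageId, string messageGroup) =>
        _processed.Remove((messageId, messageGroup));
}

public static class IdempotentConsumer
{
    // Runs the handler only the first time a given message is seen.
    // Returns true if the handler ran, false for a duplicate delivery.
    public static async Task<bool> ProcessOnceAsync(
        InMemoryMessageTracker tracker,
        string messageId,
        string messageGroup,
        Func<Task> handler)
    {
        if (!tracker.TryTrack(messageId, messageGroup))
        {
            return false; // duplicate delivery: skip the handler
        }

        try
        {
            await handler();
            return true;
        }
        catch
        {
            // Undo the tracking record so that a retry can run the handler again.
            tracker.Untrack(messageId, messageGroup);
            throw;
        }
    }
}
```

Calling `ProcessOnceAsync` twice with the same identifiers runs the handler once; the second call returns `false`. A database-backed version gets the same effect atomically, because the tracking row and the business data are saved in one transaction.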
To use Ziggurat, create a message type and a consumer service type:
public class MyMessage : IMessage
{
    public string Content { get; set; }
    public string MessageId { get; set; }
    public string MessageGroup { get; set; }
}

public class MyMessageConsumerService : IConsumerService<MyMessage>
{
    private readonly MyDbContext _context;

    public MyMessageConsumerService(MyDbContext context)
    {
        _context = context;
    }

    public async Task ProcessMessageAsync(MyMessage message)
    {
        // Change the application business objects tracked by EF Core
        _context.SomeEntity.Add(x);
        await _context.SaveChangesAsync();
    }
}
Ziggurat.SqlServer ensures that processed messages are tracked by the EF Core `DbContext`. Calling `SaveChangesAsync` saves both the changes made to the business objects and the processed-message record to the database.
The message type must implement the `IMessage` interface.
The consumers must also be registered in the dependency injection configuration. In addition, the CAP filter that enriches the message with the required information must be added.
services
    .AddConsumerService<MyMessage, MyMessageConsumerService>(
        options =>
        {
            options.UseEntityFrameworkIdempotency<MyMessage, MyDbContext>();
        });

services
    .AddCap(x => ...)
    .AddSubscribeFilter<BootstrapFilter>();
Finally, the message tracking `DbSet` must be added to the `DbContext`:
public class MyDbContext : DbContext
{
    public DbSet<MessageTracking> Messages { get; set; }
    ...
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.MapMessageTracker();
    }
}
Using Ziggurat with MongoDB differs slightly from SQL Server. The dependency injection registration must call the `UseMongoDbIdempotency` method:
builder.Services.AddConsumerService<MyMessage, MyMessageConsumerService>(
    options => options.UseMongoDbIdempotency("databaseName"));
To keep the consumer operation atomic, use the `StartIdempotentTransaction` method:
public class MyMessageConsumerService : IConsumerService<MyMessage>
{
    private readonly IMongoClient _client;

    public MyMessageConsumerService(IMongoClient client)
    {
        _client = client;
    }

    public async Task ProcessMessageAsync(MyMessage message)
    {
        using var session = _client.StartIdempotentTransaction(message);

        // Save the business object
        var collection = _client.GetDatabase("databaseName").GetCollection<SomeEntity>("someEntity");
        await collection.InsertOneAsync(session, x);

        // The transaction must be committed
        await session.CommitTransactionAsync();
    }
}
Since version 8.0.0, Ziggurat includes a built-in middleware that logs message processing. Enable it by calling the `UseLoggingMiddleware` method:
services
    .AddConsumerService<MyMessage, MyMessageConsumerService>(
        options =>
        {
            options.UseLoggingMiddleware<MyMessage>();
        });
Custom middleware can be created for the consumers by implementing `IConsumerMiddleware<TMessage>`:
public class MyMiddleware<TMessage> : IConsumerMiddleware<TMessage>
    where TMessage : IMessage
{
    public async Task OnExecutingAsync(TMessage message, ConsumerServiceDelegate<TMessage> next)
    {
        // Do something before
        await next(message);
        // Do something after
    }
}
The middleware must also be registered in the dependency injection configuration:
services
    .AddConsumerService<MyMessage, MyMessageConsumerService>(
        options =>
        {
            options.Use<MyMiddleware<MyMessage>>();
        });
You can look at the samples folder to see more examples of usage.
The library can clean up old message tracking records. A background service can be added with the `AddZigguratCleaner` extension method:
services.AddZigguratCleaner(options =>
{
    options.CleaningInterval = TimeSpan.FromMinutes(15);
    options.ExpireAfterInDays = 7;
    options.BatchSize = 100_000; // Only works with SQL Server
});
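As a rough illustration of what expiry with a batch size means, the sketch below removes old records from an in-memory list in batches. The `TrackedMessage` record, its `ProcessedAtUtc` field, and the `TrackingCleaner` class are hypothetical names chosen for this example; how Ziggurat actually stores timestamps and deletes rows is not shown here and may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical tracking record; the field names are assumptions.
public sealed record TrackedMessage(string Id, DateTime ProcessedAtUtc);

public static class TrackingCleaner
{
    // Removes records older than expireAfterInDays, at most batchSize per pass,
    // and returns the total number of removed records.
    public static int RemoveExpired(
        List<TrackedMessage> store,
        DateTime nowUtc,
        int expireAfterInDays,
        int batchSize)
    {
        var cutoff = nowUtc.AddDays(-expireAfterInDays);
        var total = 0;

        while (true)
        {
            // Select the next batch of expired records.
            var batch = store
                .Where(m => m.ProcessedAtUtc < cutoff)
                .Take(batchSize)
                .ToList();

            if (batch.Count == 0)
            {
                return total;
            }

            foreach (var message in batch)
            {
                store.Remove(message);
            }

            total += batch.Count;
        }
    }
}
```

Deleting in bounded batches keeps each pass short, which for a real database would limit lock duration and transaction size.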
To run the tests, start the required containers and then run the test suite:
docker compose up -d mongoclustersetup sqlserver
dotnet test