Skip to content

Commit

Permalink
Documented Kafka events
Browse files Browse the repository at this point in the history
  • Loading branch information
BEagle1984 committed Jan 4, 2020
1 parent 87c6bb5 commit c4cf0f4
Show file tree
Hide file tree
Showing 12 changed files with 147 additions and 35 deletions.
2 changes: 2 additions & 0 deletions docs/_data/navigation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ docs:
url: /docs/advanced/message-keys
- title: IInboundMessage
url: /docs/advanced/iinboundmessage
- title: Kafka Events
url: /docs/advanced/kafka-events
- title: Default Message Headers
url: /docs/advanced/headers
- title: Extras
Expand Down
5 changes: 5 additions & 0 deletions docs/_docs/0-introduction/003-releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ permalink: /docs/releases
toc: true
---

## [1.2.0](https://github.com/BEagle1984/silverback/releases/tag/1.2.0)

### What's new
* Some new events are published to the internal bus as a consequence to the Kafka events such as partitions assigned or revoked (see [Kafka Events]({{ site.baseurl }}/docs/advanced/kafka-events)) [[#34](https://github.com/BEagle1984/silverback/issues/34)]

## [1.1.0](https://github.com/BEagle1984/silverback/releases/tag/1.1.0)

### What's new
Expand Down
67 changes: 53 additions & 14 deletions docs/_docs/2-configuration/203-inbound.md
Original file line number Diff line number Diff line change
Expand Up @@ -228,14 +228,16 @@ public void Configure(BusConfigurator busConfigurator)
**Note:** The batch is consider a unit of work: it will be processed in the same DI scope, it will be atomically committed, the error policies will be applied to the batch as a whole and all messages will be acknowledged at once when the batch is successfully processed.
{: .notice--info}

Two additional events are published to the internal bus when batch processing:
Some additional events are published to the internal bus when batch processing:

Event | Description
:-- | :--
`BatchReadyEvent` | Fired when the batch has been filled and is ready to be processed. This event can be subscribed to perform some operations before the messages are processed or to implement all sorts of custom logics, having access to the entire batch.
`BatchStartedEvent` | Fired when the batch has been filled and just before the first message is published. This event can be subscribed to perform some operations before the messages are processed.
`BatchCompleteEvent` | Fired when all the messages in a batch have been published.
`BatchProcessedEvent` | Fired after all messages have been successfully processed. It can tipically be used to commit the transaction.
`BatchAbortedEvent` | Fired when an exception occured during the processing of the batch. It can tipically be used to rollback the transaction.

The usage should be similar to the following example.
The usage should be similar to the following examples.

```c#
public class InventoryService : ISubscriber
Expand All @@ -247,33 +249,70 @@ public class InventoryService : ISubscriber
_db = db;
}

public void OnBatchReady(BatchReadyEvent message)
public void OnBatchStarted(BatchStartedEvent message)
{
_logger.LogInformation(
$"Batch '{message.BatchId} ready " +
$"Processing batch '{message.BatchId} " +
$"({message.BatchSize} messages)");
}

public void OnMessageReceived(InventoryUpdateEvent @event)
{
// Process the event (but don't call SaveChanges)
}

public async Task OnBatchProcessed(BatchProcessedEvent message)
{
// Commit all changes in a single transaction
await _db.SaveChangesAsync();

_logger.LogInformation(
$"Successfully processed batch '{message.BatchId} " +
$"({message.BatchSize} messages)");
}

public void OnBatchAborted(BatchAbortedEvent message)
{
_logger.LogError(
$"An error occurred while processing batch '{message.BatchId} " +
$"({message.BatchSize} messages)");
}
}
```

...or...

```c#
public class InventoryService : ISubscriber
{
private DbContext _db;

public InventoryService(MyDbContext db)
{
_db = db;
}

public void OnBatchStarted(BatchStartedEvent message)
{
}

public async Task OnMessageReceived(
IEnumerable<InventoryUpdateEvent> events)
IReadOnlyCollection<InventoryUpdateEvent> events)
{
_logger.LogInformation(
$"Processing {events.Count} messages");

// Process all items
foreach (var event in events)
{
...
}

// Commit all changes in a single transaction
await _db.SaveChangesAsync();
}

void OnBatchProcessed(BatchProcessedEvent message)
{
_db.SaveChanges();

_logger.LogInformation(
$"Successfully processed batch '{message.BatchId} " +
$"({message.BatchSize} messages)");
$"Successfully processed {events.Count} messages");
}
}
```
Expand Down
2 changes: 1 addition & 1 deletion docs/_docs/3-advanced/303-kafka-key.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
title: Kafka Message Key (Partitioning)
permalink: /docs/advanced/kafka-message-key
toc: true
toc: false
---

Apache Kafka require a message key for different purposes, such as:
Expand Down
45 changes: 45 additions & 0 deletions docs/_docs/3-advanced/308-kafka-events.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
---
title: Kafka Events
permalink: /docs/advanced/kafka-events
toc: false
---

To let you catch the Kafka events (fired for example when the partions are assigned or revoked) they are mapped to some events that are published to the internal bus.

Event | Description
:-- | :--
`KafkaPartitionsAssignedEvent` | The event fired when a new consumer group partition assignment has been received by a consumer. Corresponding to each of this events there will be a `KafkaPartitionsRevokedEvent`.
`KafkaPartitionsRevokedEvent` | The event fired prior to a group partition assignment being revoked. Corresponding to each of this events there will be a `KafkaPartitionsAssignedEvent`.
`KafkaOffsetsCommittedEvent` | The event fired to report the result of the offset commits.
`KafkaErrorEvent` | The event fired when an error is reported by the `Confluent.Kafka.Consumer` (e.g. connection failures or all brokers down). Note that the system (either the Kafka client itself or Silverback) will try to automatically recover from all errors automatically, so these errors have to be considered purely informational.
`KafkaStatisticsEvent` | The event fired when statistics are received. Statistics are provided as a JSON formatted string as defined in the [librdkafka documentation](https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md). You can enable statistics and set the statistics interval using the `StatisticsIntervalMs` configuration parameter (disabled by default).

```c#
public class KafkaEventsSubscriber : ISubscriber
{
public void OnPartitionsAssigned(KafkaPartitionsAssignedEvent message)
{
...
}

public void OnPartitionsRevoked(KafkaPartitionsRevokedEvent message)
{
...
}

public void OnOffsetCommitted(KafkaOffsetsCommittedEvent message)
{
...
}

public void OnError(KafkaErrorEvent message)
{
...
}

public void OnStatisticsReceived(KafkaStatisticsEvent message)
{
...
}
}
```
10 changes: 10 additions & 0 deletions src/Silverback.Integration.Kafka/Messaging/Messages/IKafkaEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Copyright (c) 2019 Sergio Aquilini
// This code is licensed under MIT license (see LICENSE file for details)

namespace Silverback.Messaging.Messages
{
public interface IKafkaEvent : ISilverbackEvent
{

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Silverback.Messaging.Messages
/// Note that the system (either the Kafka client itself or Silverback) will try to automatically recover from
/// all errors automatically, so these errors have to be considered purely informational.
/// </summary>
public class KafkaErrorEvent : ISilverbackEvent
public class KafkaErrorEvent : IKafkaEvent
{
public KafkaErrorEvent(Confluent.Kafka.Error error)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace Silverback.Messaging.Messages
/// </item>
/// </list>
/// </remarks>
public class KafkaOffsetsCommittedEvent : ISilverbackEvent
public class KafkaOffsetsCommittedEvent : IKafkaEvent
{
public KafkaOffsetsCommittedEvent(Confluent.Kafka.CommittedOffsets committedOffsets)
: this(committedOffsets?.Offsets.ToList(), committedOffsets?.Error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace Silverback.Messaging.Messages
/// <remarks>
/// Corresponding to each of this events there will be a <see cref="KafkaPartitionsRevokedEvent" />.
/// </remarks>
public class KafkaPartitionsAssignedEvent : ISilverbackEvent
public class KafkaPartitionsAssignedEvent : IKafkaEvent
{
public KafkaPartitionsAssignedEvent(
IReadOnlyCollection<Confluent.Kafka.TopicPartition> partitions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
namespace Silverback.Messaging.Messages
{
/// <summary>
/// The event fired prior to a group partition assignment being revoked. The second parameter provides
/// The event fired prior to a group partition assignment being revoked.
/// </summary>
/// <remarks>
/// Corresponding to each of this events there will be a <see cref="KafkaPartitionsAssignedEvent" />.
/// </remarks>
public class KafkaPartitionsRevokedEvent : ISilverbackEvent
public class KafkaPartitionsRevokedEvent : IKafkaEvent
{
public KafkaPartitionsRevokedEvent(
IReadOnlyCollection<Confluent.Kafka.TopicPartitionOffset> partitions,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) 2019 Sergio Aquilini
// This code is licensed under MIT license (see LICENSE file for details)

namespace Silverback.Messaging.Messages
{
/// <summary>
/// The event fired when statistics are received. Statistics are provided as a JSON formatted string
/// as defined here: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md
/// </summary>
/// <remarks>
/// You can enable statistics and set the statistics interval using the <c>StatisticsIntervalMs</c>
/// configuration parameter (disabled by default).
/// </remarks>
public class KafkaStatisticsEvent : IMessage
{
public KafkaStatisticsEvent(string json)
{
Json = json;
}

/// <summary>
/// Gets the statistics JSON (see https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md)
/// </summary>
public string Json { get; }
}
}

This file was deleted.

0 comments on commit c4cf0f4

Please sign in to comment.