KafkaNIO is a library to interact with Apache Kafka from the Swift Server ecosystem.
It's built on top of SwiftNIO and implements the Kafka binary protocol.
This project is still in a very early state and not ready for production use, or even to interact with production clusters.
To install KafkaNIO, add the package as a dependency in your Package.swift.
dependencies: [
    .package(url: "https://github.com/tbartelmess/kafka-nio.git", .upToNextMinor(from: "0.0.1"))
]
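For context, a complete manifest might look roughly like the sketch below. The package and target name `MyKafkaApp` is hypothetical, and the product name `KafkaNIO` and package identity `kafka-nio` are assumptions inferred from the `import KafkaNIO` statement and the repository URL; check the library's own Package.swift if resolution fails.

```swift
// swift-tools-version:5.5
import PackageDescription

let package = Package(
    name: "MyKafkaApp", // hypothetical package name
    dependencies: [
        .package(url: "https://github.com/tbartelmess/kafka-nio.git", .upToNextMinor(from: "0.0.1"))
    ],
    targets: [
        .executableTarget(
            name: "MyKafkaApp", // hypothetical target name
            dependencies: [
                // Assumption: the library product is named "KafkaNIO",
                // and the package identity is derived from the URL.
                .product(name: "KafkaNIO", package: "kafka-nio")
            ]
        )
    ]
)
```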
The Consumer API allows fetching records from a Kafka broker. The main consumer API is poll(), which returns an EventLoopFuture that is fulfilled when all brokers that hold partitions for the subscribed topics have responded.
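Because the result arrives as an EventLoopFuture, it can be handled with callbacks instead of blocking the calling thread. The sketch below uses only SwiftNIO; `fakePoll` is a hypothetical stand-in for the consumer's poll call, and its `[String]` result type is an assumption made purely for illustration, not the type KafkaNIO returns.

```swift
import NIO

// Hypothetical stand-in for the consumer's poll call. The real result type
// is not assumed here; plain strings are used for illustration only.
func fakePoll(on eventLoop: EventLoop) -> EventLoopFuture<[String]> {
    return eventLoop.makeSucceededFuture(["record-1", "record-2"])
}

let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let future = fakePoll(on: group.next())

// React once the future is fulfilled instead of blocking on it.
future.whenSuccess { records in
    print("received \(records.count) records")
}
future.whenFailure { error in
    print("poll failed: \(error)")
}

// Only for this standalone sketch: wait so the process does not exit
// before the callbacks have run, then shut the group down.
_ = try future.wait()
try group.syncShutdownGracefully()
```

In a long-running service the callbacks (or `flatMap` chains) would usually replace the final `wait()`, which is best kept out of code that runs on an event loop thread.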
To connect to a Kafka cluster, at least one broker address is required; the rest of the cluster state is discovered from this broker. You can provide a list of "bootstrap servers": when a broker is not available, the client attempts to bootstrap using the next server in the list.
import NIO
import KafkaNIO
// Create a socket address of a Kafka Server
let bootstrapServer = try SocketAddress.makeAddressResolvingHost("my-kafka-host", port: 9092)
// Create an EventLoopGroup for the consumer
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
let consumer = try Consumer.connect(configuration: .init(bootstrapServers: [bootstrapServer],
                                                         subscribedTopics: ["my-topic"],
                                                         groupID: "my-group-id",
                                                         sessionTimeout: 10000,
                                                         rebalanceTimeout: 5000),
                                    eventLoopGroup: eventLoopGroup).wait()
// Call setup; this bootstraps the cluster and sets up the subscription
try consumer.setup().wait()
// Poll for records
try consumer.poll().wait()
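The bootstrap-server fallback described above (try the next server in the list when one is unavailable) can be pictured with a small, library-independent sketch. Everything here is hypothetical and not part of KafkaNIO: `bootstrap`, `BootstrapError`, and the `connect` closure only illustrate the idea, and servers are represented as plain strings.

```swift
// Hypothetical error type for this sketch; not part of KafkaNIO.
struct BootstrapError: Error {
    let attempted: [String]
}

/// Tries each bootstrap server in order and returns the result of the first
/// successful attempt. `connect` is a caller-supplied stand-in for whatever
/// actually opens the connection.
func bootstrap<Connection>(
    servers: [String],
    connect: (String) throws -> Connection
) throws -> Connection {
    for server in servers {
        do {
            return try connect(server)
        } catch {
            // This server is unavailable; move on to the next one.
            continue
        }
    }
    // Every server in the list failed.
    throw BootstrapError(attempted: servers)
}

// Usage: the first host fails, so the second one is used.
struct Unreachable: Error {}
let connected = try bootstrap(servers: ["kafka-1:9092", "kafka-2:9092"]) { server -> String in
    if server == "kafka-1:9092" { throw Unreachable() }
    return server
}
print(connected) // prints "kafka-2:9092"
```

Listing more than one bootstrap server therefore only affects the initial connection; once any broker answers, the rest of the cluster is discovered from it.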
KIP | Status |
---|---|
KIP-394: Require member.id for initial join group request | Supported. |
All Kafka versions starting at 0.8 are supported. Some features are not available on some versions of Kafka.
The integration test suite runs tests on the following versions:
- 0.10.2.2
- 0.11.0.3
- 1.1.1
- 2.4.0
- 2.5.0