Kafka function reference

The kdb+/Kafka interface is a thin wrapper for kdb+ around the librdkafka C API for Apache Kafka.

.kfk. Kafka interface

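System information

  • Metadata Information about configuration of brokers and topics
  • ThreadCount The number of threads in use by librdkafka
  • Version Version of librdkafka (integer)
  • VersionSym Version of librdkafka (symbol)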

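Topics

  • Topic Create a topic on which messages can be sent
  • TopicDel Delete a currently defined topic
  • TopicName Returns the name of a topic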

Callback modifications

  • errcbreg Register an error callback associated with a specific client
  • throttlecbreg Register a throttle callback associated with a specific client
  • statcb Statistics callback
  • logcb Logger callback
  • drcb Delivery report callback (publisher only)
  • offsetcb Offset commit callback for use with consumer groups (subscriber only)
  • consumetopic Called for each message received (subscriber only)

Clients (Consumer or Producer)

  • ClientDel Close consumer and destroy Kafka handle to client
  • ClientName Kafka handle name
  • ClientMemberId Client's broker assigned member ID
  • Consumer Create a consumer according to defined configuration
  • Producer Create a producer according to defined configuration
  • SetLoggerLevel Set the maximum logging level for a client

Offsets

  • CommitOffsets Commit offsets on broker for provided partition list
  • PositionOffsets Current offsets for topics and partitions
  • CommittedOffsets Retrieve committed offsets for topics and partitions
  • queryWatermark Query broker for low (oldest/beginning) and high (newest/end) offsets for partition.

Publishing

  • BatchPub Publish a batch of data to a defined topic
  • Pub Publish a message to a defined topic
  • PubWithHeaders Publish a message to a defined topic with a header
  • OutQLen Current out queue length
  • Flush Wait until all outstanding produce requests, et al., are completed

Subscriptions

  • Dynamic assigned subscriptions
    • Sub Subscribe to a defined topic
    • Subscribe Subscribe from a consumer to a topic with a specified callback
    • Subscription Most recent topic subscription
  • Manual assigned partitions/offsets
    • Assign Create a new assignment from which data will be consumed
    • AssignOffsets Assignment of partitions to consume
    • AssignAdd Add new assignments to the current assignment
    • AssignDel Remove topic partition assignments from the current assignments
    • Assignment Return the current assignment
  • General Subscription Functions
    • Unsub Unsubscribe from a topic

Polling

System information

Metadata

Information about configuration of brokers and topics

.kfk.Metadata id

Where id is a consumer or producer ID, returns a dictionary populated with the following info:

  • orig_broker_id (int): Broker originating this metadata
  • orig_broker_name (symbol): Name of originating broker
  • brokers (list of dictionaries): Info on current brokers
  • topics (list of dictionaries): Info on current topics

When using a consumer, the metadata response information may trigger a re-join if any subscribed topics have changed partition count or existence state.

Example:

q)show producer_meta:.kfk.Metadata producer
orig_broker_id  | 0i
orig_broker_name| `localhost:9092/0
brokers         | ,`id`host`port!(0i;`localhost;9092i)
topics          | (`topic`err`partitions!(`test3;`Success;,`id`err`leader`rep..
q)producer_meta`topics
topic              err     partitions                                        ..
-----------------------------------------------------------------------------..
test               Success ,`id`err`leader`replicas`isrs!(0i;`Success;0i;,0i;..
__consumer_offsets Success (`id`err`leader`replicas`isrs!(0i;`Success;0i;,0i;..

ThreadCount

The number of threads in use by librdkafka

.kfk.ThreadCount[]

Returns the number of threads currently in use by librdkafka.

q).kfk.ThreadCount[]
5i

Version

Version of librdkafka (integer)

.kfk.Version

Returns the librdkafka version (integer) used within the interface. It can be parsed as follows, according to the librdkafka version format (hex MM.mm.rr.xx):

q).kfk.Version
33751295i
q)"." sv 2 cut -8$"0123456789abcdef" 16 vs .kfk.Version
" 2.03.00.ff"
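The same integer can also be split into numeric fields. The sketch below assumes the librdkafka hex layout MM.mm.rr.xx (major, minor, revision, pre-release id, with 0xff marking a final release). decodeVersion and versionString are hypothetical helper names, not part of the .kfk namespace.

```q
/ decodeVersion and versionString are hypothetical helpers, not part of .kfk.
/ 0x0 vs gives the big-endian bytes of the integer; the last four hold the version.
decodeVersion:{[v] `major`minor`revision`prerelease!(-4#"j"$0x0 vs v)}

/ Dotted string built from the first three fields
versionString:{[v] "." sv string 3#value decodeVersion v}

decodeVersion 33751295i   / major 2, minor 3, revision 0, prerelease 255
versionString 33751295i   / "2.3.0"
```

With the interface loaded, the argument would be .kfk.Version rather than a literal.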

VersionSym

Version of librdkafka (symbol)

.kfk.VersionSym[]

Returns the librdkafka version (symbol) used within the interface.

q).kfk.VersionSym[]
`1.1.0

Topics

Topic

Create a topic on which messages can be sent

.kfk.Topic[id;topic;cfg]

Where

  • id is a consumer or producer ID
  • topic is a name to be assigned to the topic (symbol)
  • cfg is a user-defined topic configuration (dictionary) for adding/overloading configuration. Both keys and values must be of symbol, string, char or symbol list type. Default: ()!()

returns the topic ID (integer).

q)consumer:.kfk.Consumer[kfk_cfg]
q).kfk.Topic[consumer;`test;()!()]
0i
q).kfk.Topic[consumer;`test1;()!()]
1i

👉 edenhill/librdkafka/CONFIGURATION.md for a list of cfg options. Note the column indicating whether a property is used only by a consumer or a producer rather than by a topic.

TopicDel

Delete a currently defined topic

.kfk.TopicDel topic

Where topic is a topic ID, deletes the topic and returns a null.

q).kfk.Topic[0i;`test;()!()]
0i
q).kfk.TopicDel[0i]
q)/ topic now no longer available for deletion
q).kfk.TopicDel[0i]
'unknown topic

TopicName

Returns the name of a topic

.kfk.TopicName tpcid

Where tpcid is a topic ID, returns its name as a symbol.

q).kfk.Topic[0i;`test;()!()]
0i
q).kfk.Topic[0i;`test1;()!()]
1i
q).kfk.TopicName[0i]
`test
q).kfk.TopicName[1i]
`test1

Callback Modifications

errcbreg

Register an error callback associated with a specific client

.kfk.errcbreg[clid;callback]

Where

  • clid is a client ID (integer)
  • callback is a ternary function

sets callback to be triggered by errors associated with the client, augments the dictionary .kfk.errclient mapping client ID to callback, and returns a null.

The arguments of callback are:

  • cid: ID of client for which this is called (integer)
  • err_int: error status code in Kafka (integer)
  • reason: error message (string)

q)// Assignment prior to registration of new callback
q)// this is the default behavior invoked
q).kfk.errclient
 |{[cid;err_int;reason]}
q)// Attempt to create a consumer which will fail
q).kfk.Consumer[`metadata.broker.list`group.id!`foobar`0]
0i

q)// Update the default behavior to show the output
q).kfk.errclient[`]:{[cid;err_int;reason]show(cid;err_int;reason);}

q)// Attempt to create another failing consumer
q).kfk.Consumer[`metadata.broker.list`group.id!`foobar`0]
1i
q)1i
-193i
"foobar:9092/bootstrap: Failed to resolve 'foobar:9092': nodename nor servnam..
1i
-187i
"1/1 brokers are down"

q)// Start a new q session and register an error callback for cid 0
q).kfk.errcbreg[0i;{[cid;err_int;reason] show err_int;}]
q)// Attempt to create a consumer that will fail
q).kfk.Consumer[`metadata.broker.list`group.id!`foobar`0]
0i
q)-193i
-187i

throttlecbreg

Register a throttle callback associated with a specific client

.kfk.throttlecbreg[clid;callback]

Where

  • clid is a client ID (integer)
  • callback is a quaternary function

sets callback to be triggered on throttling associated with the client, augments the dictionary .kfk.throttleclient mapping client ID to callback, and returns a null.

The arguments of callback are:

  • cid: ID (integer) of client for which this is called
  • bname: broker name (string)
  • bid: broker ID (integer)
  • throttle_time: accepted throttle time in milliseconds (integer)

q)// Assignment prior to registration of new callback
q)// this is the default behavior invoked
q).kfk.throttleclient
 |{[cid;bname;bid;throttle_time]}

q)// Update the default behavior to show the output
q).kfk.throttleclient[`]:{[cid;bname;bid;throttle_time]show(cid;bid);}

q)// Add a throttle client associated specifically with client 0
q).kfk.throttlecbreg[0i;{[cid;bname;bid;throttle_time]show(cid;throttle_time);}]

q)// Display the updated throttle callback logic
q).kfk.throttleclient
 |{[cid;bname;bid;throttle_time]show(cid;bid);}
0|{[cid;bname;bid;throttle_time]show(cid;throttle_time);}

statcb

Statistics callback

.kfk.statcb[j]

Where j is a JSON string (see .j.k for parsing JSON in q). For more information on the format of the JSON, see https://github.com/confluentinc/librdkafka/wiki/Statistics.

Override this variable to provide a new definition that suits your needs. Default is to use .kfk.stats to record the last 100 stats.

The statistics callback is triggered every statistics.interval.ms.
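As a rough sketch of such an override, the following parses each payload with .j.k and appends two values to a table. statLog and recordStat are hypothetical names, and the JSON fields name and msg_cnt are assumed to be present at the top level of the payload; check the statistics documentation linked above before relying on them.

```q
/ statLog and recordStat are hypothetical names.
/ The JSON fields name and msg_cnt are assumptions; see the statistics documentation.
statLog:([]time:`timestamp$();name:`symbol$();msgcnt:`long$())

/ Parse the JSON string and append one row; returns null
recordStat:{[j] d:.j.k j; `statLog insert (.z.p;`$d`name;`long$d`msg_cnt);}

/ Override the callback variable
.kfk.statcb:recordStat
```

Keeping a fixed table schema, rather than storing whole parsed dictionaries, is one way to keep the log small and easy to query.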

logcb

Logger callback (logs from librdkafka)

.kfk.logcb[level;fac;buf]

Where

  • level is log level (integer) as discussed in SetLoggerLevel
  • fac is the message type (string)
  • buf is the message detail (string)

It should be set to return a null.

Override this variable to provide a new definition that suits your needs. Default is to print to stdout.
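One possible override, shown only as a sketch, prints a log line when its level is at or below a chosen threshold. logLevels and filteredLog are hypothetical names; the level numbers follow the syslog list given under SetLoggerLevel.

```q
/ logLevels and filteredLog are hypothetical names; levels follow the list under SetLoggerLevel.
logLevels:`emergency`alert`critical`error`warning`notice`info`debug!til 8

/ Print a line only when its level is at or below maxLevel; returns null
filteredLog:{[maxLevel;level;fac;buf] if[level<=maxLevel; -1 string[.z.p]," [",fac,"] ",buf];}

/ Project on the threshold to obtain the ternary function the callback expects
.kfk.logcb:filteredLog logLevels`warning
```

Projecting on the first argument keeps the threshold configurable without changing the three-argument shape of the callback.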

drcb

Delivery report callback

.kfk.drcb[cid;msg]

Where

  • cid is the ID (integer) of client for which this is called
  • msg is the message details (as dictionary)

It should be set to return a null. For publishing messages only (i.e. a client that is a producer).

Override this variable to provide a new definition that suits your needs. There is no default action.

The callback is called when a message is successfully produced or if librdkafka encountered a permanent failure. Delivery errors occur when the retry count is exceeded, when the message.timeout.ms timeout is exceeded or there is a permanent error like RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART.
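A delivery-report handler might record the outcome of each message, as in the sketch below. It assumes that the msg dictionary has the same keys as the message dictionary shown under consumetopic and that mtype is the null symbol on success; neither is stated in this reference. drLog and onDelivery are hypothetical names.

```q
/ drLog and onDelivery are hypothetical names.
/ Assumption: msg carries topic and mtype keys, with mtype null on success.
drLog:([]time:`timestamp$();client:`int$();topic:`symbol$();ok:`boolean$())

/ Append one row per delivery report; returns null
onDelivery:{[cid;msg] `drLog insert (.z.p;cid;msg`topic;null msg`mtype);}

/ Override the callback variable
.kfk.drcb:onDelivery
```

Rows with ok set to 0b can then be inspected or retried by the application.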

offsetcb

Offset commit callback for use with consumer groups

.kfk.offsetcb[cid;err;offsets]

Where

  • cid is the ID (integer) of client for which this is called
  • err is the error details (string), e.g. "Success" if OK
  • offsets is a list of dictionaries, one per commit, each with the following entries
    • topic (symbol)
    • partition (int)
    • offset (long)
    • metadata (string)

It should be set to return a null. For subscribers only.

Override this variable to provide a new definition that suits your needs. There is no default action.

The results of automatic or manual offset commits will be scheduled for this callback. If no partitions had valid offsets to commit this callback will be called with err == RD_KAFKA_RESP_ERR__NO_OFFSET which is not to be considered an error.

The offsets list contains per-partition information:

  • offset: committed offset (attempted)
  • err: commit error

consumetopic

Main unary function called on consumption of data, for both the default and per-topic callbacks. It is called for each message received.

.kfk.consumetopic msg

Where msg is the content of a message received (as dictionary) from any calls to the subscription on the topic.

Override this variable to provide a new definition that suits your needs. There is no default action.

For example, setting the default callback to print each message

.kfk.consumetopic[`]:{[msg]show msg;};

prints each message as follows:

mtype    | `
topic    | `test1
client   | 0i
partition| 0i
offset   | 1803
msgtime  | 2024.04.23D16:17:52.403000000
data     | 0x323032342e30342e32334431363a31373a35322e333938373134303030
key      | `byte$()
headers  | (`symbol$())!`symbol$()
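The data field arrives as a byte vector, so a callback will often cast it back to characters before use. The sketch below stores decoded payloads in a table; received and onMsg are hypothetical names, and the message keys are those shown in the output above.

```q
/ received and onMsg are hypothetical names.
received:([]topic:`symbol$();partition:`int$();offset:`long$();payload:())

/ Cast the byte payload to a string and append one row; returns null
onMsg:{[msg] `received insert (msg`topic;msg`partition;msg`offset;enlist "c"$msg`data);}

/ With the interface loaded, install it as the default callback:
/ .kfk.consumetopic[`]:onMsg
```

For the message printed above, the decoded payload is the string "2024.04.23D16:17:52.398714000".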

Clients

The following functions relate to the creation of consumers and producers and their manipulation/interrogation.

ClientDel

Close a consumer and destroy the associated Kafka handle to client

.kfk.ClientDel clid

Where clid is the client to be deleted (integer), returns null on successful deletion. If client unknown, signals 'unknown client.

This call will block until the consumer has revoked its assignment, committed offsets to broker, and left the consumer group (if applicable). The maximum blocking time is roughly limited to session.timeout.ms.

/Client exists
q).kfk.ClientName 0i
`rdkafka#consumer-1
q).kfk.ClientDel 0i
q).kfk.ClientName 0i
'unknown client
/Client can no longer be deleted
q).kfk.ClientDel 0i
'unknown client

ClientMemberId

Returns this client's broker-assigned group member id

.kfk.ClientMemberId clid

Where clid is a client ID (integer), returns the member ID (symbol) assigned to the client, or signals unknown client.

⚠️ Consumer processes only

This function should be called only on a consumer process. This is an external limitation.

q).kfk.ClientMemberId 0i
`rdkafka-881f3ee6-369b-488a-b6b2-c404d45ebc7c
q).kfk.ClientMemberId 1i
'unknown client

ClientName

Kafka handle name

.kfk.ClientName clid

Where clid is a client ID (integer), returns assigned client name (symbol) or signals unknown client.

q).kfk.ClientName 0i
`rdkafka#producer-1
/Client removed
q).kfk.ClientName 1i
'unknown client

Consumer

Create a consumer according to user-defined configuration

.kfk.Consumer cfg

Where cfg is a dictionary for adding/overloading configuration. Both keys and values must be of symbol, string, char or symbol list type. Returns the ID of the consumer as an integer.

q)kfk_cfg
metadata.broker.list  | localhost:9092
group.id              | 0
queue.buffering.max.ms| 1
fetch.wait.max.ms     | 10
statistics.interval.ms| 10000
q).kfk.Consumer kfk_cfg
0i

👉 edenhill/librdkafka/CONFIGURATION.md for a list of cfg options (note: column indicating whether consumer/producer/both).

Producer

Create a producer according to user-defined configuration

.kfk.Producer cfg

Where cfg is a dictionary for adding/overloading configuration. Both keys and values must be of symbol, string, char or symbol list type. Returns the ID of the producer as an integer.

q)kfk_cfg
metadata.broker.list  | localhost:9092
statistics.interval.ms| 10000
queue.buffering.max.ms| 1
fetch.wait.max.ms     | 10
q).kfk.Producer kfk_cfg
0i

👉 edenhill/librdkafka/CONFIGURATION.md for a list of cfg options (note: column indicating whether consumer/producer/both).

SetLoggerLevel

Specifies the maximum logging level emitted by internal Kafka logging and debugging

.kfk.SetLoggerLevel[clid;level]

Where

  • clid is the client ID (int)
  • level is the syslog severity level (int/long/short). Log levels are
    • 0 (emergency)
    • 1 (alert)
    • 2 (critical)
    • 3 (error)
    • 4 (warning)
    • 5 (notice)
    • 6 (info)
    • 7 (debug)

returns a null on successful application of function.

If the "debug" configuration property is set the log level is automatically adjusted to 7 (debug).

q)show client
0i
q).kfk.SetLoggerLevel[client;7]

Offset functionality

The following functions relate to use of offsets within the API to ensure records are read correctly from the broker.

:note: Multiple topic offset assignment

As of v1.4.0 offset functionality can now handle calls associated with multiple topics without overwriting previous definitions. To apply the functionality this must be called for each topic.

CommitOffsets

Commit offsets on broker for provided partitions and offsets

.kfk.CommitOffsets[clid;topic;part_offsets;block_commit]

Where

  • clid is a consumer client ID (int)
  • topic is a topic (symbol)
  • part_offsets is a dictionary of partitions (ints) and offsets (longs). Each offset should be the offset where consumption will resume, i.e. the last processed offset + 1
  • block_commit is whether commit will block until offset commit is complete (boolean)

returns a null on successful commit of offsets.
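The "last processed offset + 1" rule can be applied with a qSQL aggregation, as in this sketch. The table data is a hypothetical stand-in for a table of consumed messages, and the commented call assumes a consumer client and topic `test.

```q
/ data is a hypothetical table of consumed messages.
data:([]partition:0 0 1 1i;offset:10 11 4 5)

/ Next offset to consume per partition: last processed offset + 1
toCommit:exec 1+max offset by partition from data   / 0 1i!12 6

/ With a consumer client and the interface loaded:
/ .kfk.CommitOffsets[client;`test;toCommit;1b]
```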

PositionOffsets

Current offsets for particular topics and partitions

.kfk.PositionOffsets[clid;topic;part_offsets]

Where

  • clid is the consumer ID (int)
  • topic is the topic (symbol)
  • part_offsets is a list of partitions (int, short, or long), or a dictionary of partitions (int) and offsets (long)

returns a table containing the current offset and partition for the topic of interest. The offset field of each requested partition will be set to the offset of the last consumed message + 1, or .kfk.OFFSET.INVALID (-1001) if there was no previous message.

In this context the last consumed message is the offset consumed by the current librdkafka instance and, in case of rebalancing, not necessarily the last message fetched from the partition.

q)client:.kfk.Consumer kfk_cfg
q)TOPIC:`test
q)show seen:exec last offset by partition from data
0|0
// dictionary input
q).kfk.PositionOffsets[client;TOPIC;seen]
topic partition offset metadata
-------------------------------
test  0         26482  ""
// int list input
q).kfk.PositionOffsets[client;TOPIC;0 1i]
topic partition offset metadata
-------------------------------
test  0         26482  ""
test  1         -1001  ""
// long list input
q).kfk.PositionOffsets[client;TOPIC;0 1 2]
topic partition offset metadata
-------------------------------
test  0         26482  ""
test  1         -1001  ""
test  2         -1001  ""

CommittedOffsets

Retrieve the last-committed offset for a topic on a particular partition

.kfk.CommittedOffsets[clid;topic;part_offsets]

Where

  • clid is a consumer ID (integer)
  • topic is a topic (symbol)
  • part_offsets is a list of partitions (int, short, or long), or a dictionary of partitions (int) and offsets (long)

returns a table containing the offset for a particular partition for a topic.

Committed offsets will be returned according to the isolation.level configuration property. If set to read_committed (default), only stable offsets for fully committed transactions will be returned, while read_uncommitted may return offsets for not yet committed transactions.

Offset will be set to .kfk.OFFSET.INVALID (-1001) if there was no stored offset for that partition.

q)client:.kfk.Consumer[kfk_cfg];
q)TOPIC:`test
q)show seen:exec last offset by partition from data;
0|0
// dictionary input
q).kfk.CommittedOffsets[client;TOPIC;seen]
topic partition offset metadata
-------------------------------
test  0         26481  ""
// integer list input
q).kfk.CommittedOffsets[client;TOPIC;0 1i]
topic partition offset metadata
-------------------------------
test  0         26481  ""
test  1         -1001  ""
// long list input
q).kfk.CommittedOffsets[client;TOPIC;0 1]
topic partition offset metadata
-------------------------------
test  0         26481  ""
test  1         -1001  ""

queryWatermark

Query broker for low (oldest/beginning) and high (newest/end) offsets for partition

.kfk.queryWatermark[clid;topic;partition;timeout]

Where

  • clid is a consumer ID (integer)
  • topic is a topic (symbol)
  • partition is the partition (long)
  • timeout is the timeout in milliseconds (long)

returns a two-element list of longs: the low and high watermark offsets.

q).kfk.queryWatermark[client;`test1;0;1000]

Publishing functionality

BatchPub

Publish a batch of messages to a defined topic

.kfk.BatchPub[tpcid;partid;data;keys]

Where

  • tpcid is the topic (previously created) to be published on (integer)
  • partid is the target partition/s (integer atom or list). If partition is .kfk.PARTITION_UA the configured partitioner will be run for each message (slower), otherwise the messages will be enqueued to the specified partition directly (faster).
  • data is a mixed list payload containing either bytes or strings
  • keys is an empty string for auto key on all messages or a key per message as a mixed list of bytes or strings

returns an integer list denoting the status for each message (zero indicating success).

q)batchMsg :("test message 1";"test message 2")
q)batchKeys:("Key 1";"Key 2")

q)// Send two messages to any partition using default key
q).kfk.BatchPub[;.kfk.PARTITION_UA;batchMsg;""]each(topic1;topic2)
0 0
0 0

q)// Send 2 messages to partition 0 for each topic using default key
q).kfk.BatchPub[;0i;batchMsg;""]each(topic1;topic2)
0 0
0 0

q)// Send 2 messages to separate partitions (0 and 1) using the supplied keys
q).kfk.BatchPub[;0 1i;batchMsg;batchKeys]each(topic1;topic2)
0 0
0 0

Pub

Publish a message to a defined topic

.kfk.Pub[tpcid;partid;data;keys]

Where

  • tpcid is the topic to be published on (integer)
  • partid is the target partition (integer). Can be .kfk.PARTITION_UA (unassigned) for automatic partitioning using the topic's partitioner function, or a fixed partition (0..N)
  • data is the payload to be published (string)
  • keys is the message key (string) to be passed with the message to the partition

returns a null on successful publication.

This is an asynchronous non-blocking API. See drcb (delivery report callback) on how to set up a callback to be called once the delivery status (success or failure) is known.

Since producing is asynchronous, you should call Flush before you destroy the producer. Otherwise, any outstanding messages will be silently discarded.

When temporary errors occur, librdkafka automatically retries to produce the messages. Retries are triggered after retry.backoff.ms and when the leader broker for the given partition is available. Otherwise, librdkafka falls back to polling the topic metadata to monitor when a new leader is elected (see the topic.metadata.refresh.fast.interval.ms and topic.metadata.refresh.interval.ms configurations) and then performs a retry. A delivery error will occur if the message could not be produced within message.timeout.ms.

q)producer:.kfk.Producer kfk_cfg
q)test_topic:.kfk.Topic[producer;`test;()!()]
/ partition set as -1i denotes an unassigned partition
q).kfk.Pub[test_topic;-1i;string .z.p;""]
q).kfk.Pub[test_topic;-1i;string .z.p;"test_key"]

PubWithHeaders

Publish a message to a defined topic, with an associated header

.kfk.PubWithHeaders[clid;tpcid;partid;data;keys;hdrs]

Where

  • clid is a target client ID (integer)
  • tpcid is the topic to be published on (integer)
  • partid is the target partition (integer)
  • data is the payload to be published (string)
  • keys is the message key (string) to be passed with the message to the partition
  • hdrs is a dictionary mapping a header name (symbol) to a byte array or string

returns a null on successful publication; errors if version conditions not met.

See Pub for further details of how messages are published.

// Create an appropriate producer
producer:.kfk.Producer kfk_cfg

// Create a topic
test_topic:.kfk.Topic[producer;`test;()!()]

// Define the target partition as unassigned
part:-1i

// Define an appropriate payload
payload:string .z.p

// Define the headers to be added
hdrs:`header1`header2!("test1";"test2")

// Publish a message to client #0 with a header but no key
.kfk.PubWithHeaders[0i;test_topic;part;payload;"";hdrs]

// Publish a message to client #1 with headers and a key
.kfk.PubWithHeaders[1i;test_topic;part;payload;"test_key";hdrs]

Support for functionality

This functionality is only available for versions of librdkafka ≥ 0.11.4
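A guard on the version integer is one way to check this requirement before publishing with headers. The sketch assumes the librdkafka version integer can be compared numerically, with 0x000b04ff encoding the final release of 0.11.4; headersSupported is a hypothetical helper name.

```q
/ headersSupported is a hypothetical helper, not part of .kfk.
/ Assumption: 0x000b04ff encodes the final release of librdkafka 0.11.4.
headersSupported:{[v] v>=0x0 sv 0x000b04ff}

headersSupported 33751295i   / 1b: version 2.3.0
headersSupported 721151i     / 0b: version 0.11.0

/ With the interface loaded:
/ headersSupported .kfk.Version
```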

OutQLen

Current number of messages that are queued for publishing

.kfk.OutQLen prid

Where prid is the ID (integer) of the producer whose queue is to be checked, returns the number of messages in the queue (int).

The out queue length is the sum of

  • number of messages waiting to be sent to, or acknowledged by, the broker
  • number of delivery reports waiting to be served
  • number of callbacks waiting to be served
  • number of events waiting to be served by background queue

An application should wait for the return value of this function to reach zero before terminating to make sure outstanding messages, requests (such as offset commits), callbacks and events are fully processed. See Flush.

q).kfk.OutQLen producer
5i

Flush

Wait until all outstanding produce requests, et al., are completed

.kfk.Flush[prid;timeout]

Where

  • prid is the producer ID (integer)
  • timeout is the timeout in milliseconds (short/integer/long)

returns a null if all events completed.

This should typically be done prior to destroying a producer instance to make sure all queued and in-flight produce requests are completed before terminating. This function may trigger callbacks.

The linger.ms time will be ignored for the duration of the call, queued messages will be sent to the broker as soon as possible.

Relates to OutQLen.
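Putting Flush, OutQLen and ClientDel together, a producer shutdown could look like the sketch below. warnQueued and shutdownProducer are hypothetical helpers built only from functions documented in this reference; how Flush behaves when the timeout expires is not covered here, so the queue length is checked afterwards.

```q
/ warnQueued and shutdownProducer are hypothetical helpers.
/ Warn on stderr if anything is still queued; pass the count through
warnQueued:{[n] if[n>0; -2 "warning: ",string[n]," item(s) still queued"]; n}

/ Flush, report what is left in the out queue, then delete the client
shutdownProducer:{[prid;timeout] .kfk.Flush[prid;timeout]; n:warnQueued .kfk.OutQLen prid; .kfk.ClientDel prid; n}

/ With a producer and the interface loaded:
/ shutdownProducer[producer;5000]
```

Returning the remaining queue length lets the caller decide whether lost messages matter.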

Subscriptions

⚠️ Mixing Manual and Dynamic Assignments

It isn't possible to mix manual partition assignment (i.e. using Assign) with dynamic partition assignment through topic subscription.

Dynamic

Used to subscribe to the topics of interest and let Kafka dynamically assign a fair share of the partitions for those topics based on the active consumers in the group.

Membership in a consumer group is maintained dynamically: if a process fails, the partitions assigned to it will be reassigned to other consumers in the same group.

The subscribe method is not incremental: you must include the full list of topics that you want to consume from.

:note: Rebalance

A rebalance operation occurs if any one of the following events occurs:

  • The number of partitions changes for any of the subscribed topics
  • A subscribed topic is created or deleted
  • An existing member of the consumer group shuts down or fails
  • A new member is added to the consumer group

Group rebalances will cause partition offsets to be reset (e.g. application of the auto.offset.reset setting).

Sub

High level subscription from a consumer process to a topic

.kfk.Sub[clid;topic;partid]

Where

  • clid is the client ID (integer)
  • topic is the topic/s being subscribed to (symbol atom – or list, for v1.6+)
  • partid is the target partition (enlisted integer) (UNUSED)

returns a null on successful execution. Received messages are then consumed via a call to the registered consumetopic function.

The full topic list is retrieved every topic.metadata.refresh.interval.ms to pick up new or deleted topics that match the subscription. If there is any change to the matched topics the consumer will immediately rejoin the group with the updated set of subscribed topics.

This is an asynchronous method which returns immediately: background threads will (re)join the group, wait for group rebalance, assign() the assigned partitions, and then start fetching messages. This cycle may take up to session.timeout.ms * 2 or more to complete.

The msg callback (defaults to consumetopic) can return a consumer error UNKNOWN_TOPIC_OR_PART for non-existent topics, and TOPIC_AUTHORIZATION_FAILED for unauthorized topics e.g.

mtype                 topic client partition offset msgtime                       data                                                                        key      headers                 rcvtime                      
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
UNKNOWN_TOPIC_OR_PART test1 0      0         -1001                                "Subscribed topic not available: test1: Broker: Unknown topic or partition" `byte$() (`symbol$())!`symbol$() 2024.04.23D14:04:03.584293000

Subscribing in advance

Subscriptions can be made to topics that do not currently exist.

Multiple subscriptions

As of v1.4.0 multiple calls to .kfk.Sub for a given client will allow for consumption from multiple topics rather than overwriting the subscribed topic, although each addition will cause a rebalance.

⚠️ Partition ID

The parameter partid is a legacy argument to the function and with recent versions of librdkafka does not have any effect on the subscription. On subscription Kafka handles organisation of consumers based on the active members of a group.id to efficiently distribute consumption amongst the group.

q)client:.kfk.Consumer[kfk_cfg]
q).kfk.PARTITION_UA // subscription defined to be to an unassigned partition
-1i
// List of topics to be subscribed to
q).kfk.Sub[client;(topic1;topic2);enlist .kfk.PARTITION_UA]

Subscribe

Subscribe from a consumer to a topic with a specified callback

.kfk.Subscribe[clid;topic;partid;callback]

Where

  • clid is the client ID (integer)
  • topic is the topic/s being subscribed to (symbol atom – or list, for v1.6+)
  • partid is the target partition (enlisted integer) (UNUSED)
  • callback is a callback function defined related to the subscribed topic. This function should take as input a single parameter, the content of a message received from any calls to the subscription on the topic.

returns a null on successful execution. As per Sub but augments .kfk.consumetopic with a new callback function for the consumer.

⚠️ Partition ID

The parameter partid is a legacy argument to the function and with recent versions of librdkafka does not have any effect on the subscription. On subscription Kafka handles organization of consumers based on the active members of a group.id to efficiently distribute consumption among the group.

q)// create a client with a user created config kfk_cfg
q)client:.kfk.Consumer kfk_cfg
q)// Subscription consumes from any available partition
q)part:.kfk.PARTITION_UA 
q)// List of topics to be subscribed to
q)topicname:`test
q)// Display consumer callbacks prior to new subscription
q).kfk.consumetopic
     | {[msg]}
q).kfk.Subscribe[client;topicname;enlist part;{[msg]show msg;}]
q)// Display consumer callbacks following invocation of Subscribe
q).kfk.consumetopic
    | {[msg]}
test| {[msg]show msg;}

Consume callbacks

Topic-specific callbacks were added in v1.5.0. A call to .kfk.Subscribe augments the dictionary .kfk.consumetopic, which maps topic name to callback function. From v1.5.0, each call to .kfk.consumecb checks for a custom callback: if an appropriate key is found, the associated callback is invoked. The default callback can be changed by reassigning .kfk.consumetopic[`].
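The lookup described above can be pictured with a small stand-alone sketch. Here cbs stands in for .kfk.consumetopic and dispatch is a hypothetical illustration of the check; it is not the interface's actual code, and falling back to the default entry when no key matches is an assumption.

```q
/ cbs stands in for .kfk.consumetopic; dispatch is illustrative only.
cbs:(enlist`)!enlist{[msg]`default}
cbs[`test]:{[msg]`custom}

/ Use the topic's own callback when one is registered, otherwise the default under `
dispatch:{[msg] t:msg`topic; cbs[$[t in key cbs;t;`]] msg}

dispatch `topic`data!(`test;"a")    / `custom
dispatch `topic`data!(`other;"b")   / `default
```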

Subscription

Most-recent subscription to a topic

.kfk.Subscription clid

Where clid is the client ID (integer) which the subscription is being requested for, returns a table with the topic, partition, offset and metadata of the most recent subscription.

q)client:.kfk.Consumer kfk_cfg
q).kfk.Sub[client;`test2;enlist -1i]
q).kfk.Subscription client
topic partition offset metadata
-------------------------------
test2 -1        -1001  ""

Manual

Permits the user to be in full control of consumption of messages from their chosen topic, partition and offset.

Dynamic partition assignment and consumer group coordination will be disabled on use.

The assign methods are not incremental: you must include the full list of topics that you want to consume from.

Manual partition assignment does not use group coordination, so consumer failures will not cause assigned partitions to be rebalanced. Each consumer acts independently even if it shares a groupId with another consumer. To avoid offset commit conflicts, you should usually ensure that the groupId is unique for each consumer instance.

Assign

Create a new assignment from which to consume data; remove previous assignments.

.kfk.Assign[clid;tpc_part]

Where

  • clid is an integer denoting the client ID to which the assignment is to be applied
  • tpc_part is a dictionary mapping topic names (symbol) to partitions (long), or from v1.6 onwards a symbol!dictionary mapping of topic to partitions/offsets (each a dictionary mapping integer partition to long offset location)

returns a null on successful execution. Received messages are then consumed via a call to the registered consumetopic function.

// subscribe to test (partition 0) and test1 (partition 1)
.kfk.Assign[cid;`test`test1!0 1]

// subscribe to test1 (partition 0 from offset 10) and test2 
// (partition 0 from offset 10)
.kfk.Assign[client;`test1`test2!(((1#0i)!1#10);((1#0i)!1#10))]

AssignOffsets

Assign partitions to be consumed.

.kfk.AssignOffsets[clid;topic;part_offsets]

Where

  • clid is the consumer ID (integer)
  • topic is the topic (symbol)
  • part_offsets is a dictionary of partitions and where to start consuming them

returns a null on successful execution. Received messages are then consumed via a call to the registered consumetopic function.

If previous assignments for a different topic/partition have already been communicated to the Kafka infrastructure, these assignments will be reapplied.

From v1.6, .kfk.Assign is preferred for assigning multiple offsets/topics.

q).kfk.OFFSET.END   // start consumption at end of partition
-1
q).kfk.OFFSET.BEGINNING // start consumption at start of partition
-2
q).kfk.AssignOffsets[client;TOPIC;(1#0i)!1#.kfk.OFFSET.END]

Last-committed offset

In the examples in this section, an offset of .kfk.OFFSET.INVALID (-1001) is a special value. It indicates that the offset could not be determined, and that the consumer will read from the last-committed offset once one becomes available.

AssignAdd

Add additional topic-partition pairs to the current assignment.

.kfk.AssignAdd[clid;tpc_part]

Where

  • clid is the client ID (integer)
  • tpc_part is a dictionary mapping topic names (symbol) to partitions (long), to be added to the current assignment

returns a null on successful execution. If any of the topic-partition pairs are already assigned, it displays those pairs and signals an error.

From v1.6, .kfk.Assign is preferred for assigning multiple offsets/topics.

If previous assignments have already been communicated to the Kafka infrastructure, these assignments will be reapplied.

q)// Create a new assignment
q).kfk.Assign[cid;`test1`test2!0 0]

q)// Retrieve the current assignment
q).kfk.Assignment[cid]
topic partition offset metadata
-------------------------------
test1 0         -1001  ""      
test2 0         -1001  ""      

q)// Add new assignments to the current assignment
q).kfk.AssignAdd[cid;`test`test1!1 1]

q)// Retrieve the current assignment
q).kfk.Assignment[cid]
topic partition offset metadata
-------------------------------
test  1         -1001  ""      
test1 0         -1001  ""      
test1 1         -1001  ""      
test2 0         -1001  ""      

q)// Attempt to assign an already assigned topic partition pair
q).kfk.AssignAdd[cid;`test`test1!1 1]
`test  1
`test1 1
'The above topic-partition pairs already exist, please modify dictionary
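
To avoid the error shown above, a caller could filter the proposed dictionary against the current assignment before adding to it. The following is a minimal sketch under stated assumptions: `newPairs` is a hypothetical helper (not part of `.kfk`), and the mock table `cur` only imitates the column names printed by `.kfk.Assignment` in these examples; the real column types may differ, which is why partitions are cast to long before comparison.

```q
// Hypothetical helper: keep only the topic-partition pairs of tp that are
// not already present in the assignment table cur
newPairs:{[cur;tp]
  t:key tp; p:"j"$value tp;
  ct:cur`topic; cp:"j"$cur`partition;
  // for each proposed pair, flag it if any current row has the same topic and partition
  dup:{[ct;cp;t;p] any (ct=t) and cp=p}[ct;cp]'[t;p];
  (t where not dup)!p where not dup}

// mock of the table shape shown above (assumed, for illustration only)
cur:([]topic:`test1`test2;partition:0 0i;offset:-1001 -1001;metadata:("";""))

// test1 partition 0 is already assigned, so only test partition 1 remains
add:newPairs[cur;`test`test1!1 0]
```

With the interface loaded, the same idea would be applied as `newPairs[.kfk.Assignment cid;tp]` before calling `.kfk.AssignAdd`.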

AssignDel

Delete a set of topic-partition pairs from the current assignment.

.kfk.AssignDel[clid;tpc_part]

Where

  • clid is a client ID (integer)
  • tpc_part is a dictionary mapping topic names (symbol) to partitions (long)

removes the topic-partition pairs and returns a null. If any of the topic-partition pairs are not currently assigned, it displays those pairs and signals an error.

From v1.6, .kfk.Assign is preferred for assigning multiple offsets/topics.

If previous assignments have already been communicated to the Kafka infrastructure, the remaining assignments will be reapplied.

q)// Create a new assignment
q).kfk.Assign[cid;`test`test`test1`test1!0 1 0 1]

q)// Retrieve the current assignment
q).kfk.Assignment[cid]
topic partition offset metadata
-------------------------------
test  0         -1001  ""
test  1         -1001  ""
test1 0         -1001  ""
test1 1         -1001  ""

q)// Delete assignments from the current assignment
q).kfk.AssignDel[cid;`test`test1!1 1]

q)// Retrieve the current assignment
q).kfk.Assignment[cid]
topic partition offset metadata
-------------------------------
test  0         -1001  ""
test1 0         -1001  ""

q)// Attempt to delete an unassigned topic-partition pair
q).kfk.AssignDel[cid;`test`test1!1 1]
`test  1
`test1 1
'The above topic-partition pairs cannot be deleted as they are not assigned

Assignment

Retrieve the current assignment for a specified client

.kfk.Assignment clid

Where clid is a client ID, returns a list of dictionaries describing the current assignment for it.

q)// Attempt to retrieve assignment without a current assignment
q).kfk.Assignment cid 
topic partition offset metadata
-------------------------------

q)// Create a new assignment
q).kfk.Assign[cid;`test`test1!0 1]

q)// Retrieve the new current assignment
q).kfk.Assignment[cid]
topic partition offset metadata
-------------------------------
test  0         -1001  ""
test1 1         -1001  ""

General Subscription Functions

Unsub

Unsubscribe from all topics associated with a client, regardless of whether they were created via manual or dynamically assigned subscriptions

.kfk.Unsub clid

Where clid is a client ID (integer), unsubscribes it from all topics and returns a null; signals an error if client is unknown.

q).kfk.Unsub[0i]
q).kfk.Unsub[1i]
'unknown client

Polling

MaxMsgsPerPoll

Set the maximum number of messages per poll

.kfk.MaxMsgsPerPoll max_messages

Where max_messages is the maximum number of messages (integer) per poll, sets the global limit and returns it.

q).kfk.MaxMsgsPerPoll 100
100

Upper limit set by .kfk.MaxMsgsPerPoll vs max_messages in .kfk.Poll

The argument max_messages passed to .kfk.Poll takes precedence over the global limit set by .kfk.MaxMsgsPerPoll. The global limit is used only when the max_messages passed to .kfk.Poll is 0.
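
The rule can be restated as a one-line function. This is only an illustration of the precedence described here, not the interface's implementation; `effLimit` and its parameter names are hypothetical.

```q
// Hypothetical restatement of the rule: lim is the global limit set with
// .kfk.MaxMsgsPerPoll, n is the max_messages argument passed to .kfk.Poll
effLimit:{[lim;n] $[0=n;lim;n]}

effLimit[100;0]    // 100: argument is 0, so the global limit applies
effLimit[100;25]   // 25: a non-zero argument takes precedence
```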

Poll

Manually poll the messages from the message feed

.kfk.Poll[cid;timeout;max_messages]

Where

  • cid is a client ID (integer)
  • timeout is max time in ms to block the process (long)
  • max_messages is the max number of messages to be polled (long)

returns the number of messages polled within the allotted time.

q).kfk.Poll[0i;5;100]
0
q).kfk.Poll[0i;100;100]
10
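
Because the return value is a message count, it can drive a simple loop that keeps polling until a poll returns no messages. The following is a minimal sketch: `drain` is a hypothetical helper, the timeout of 100 ms and batch size of 100 are arbitrary choices, and the poll function is passed in as a parameter so that the loop does not itself depend on a running broker.

```q
// Hypothetical helper: call poll repeatedly until it reports 0 messages,
// returning the total number of messages polled
//   poll: a function with the same arguments as .kfk.Poll
//   cid:  client ID
drain:{[poll;cid]
  n:0;
  while[0<k:poll[cid;100;100]; n+:k];
  n}
```

With the interface loaded and a consumer created, this would be invoked as `drain[.kfk.Poll;cid]`.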