Skip to content

Commit

Permalink
Merge pull request #487 from Argonus/kayrock-describe-groups
Browse files Browse the repository at this point in the history
[Kayrock] Add describe consumer groups endpoint as kayrock
  • Loading branch information
Argonus authored Mar 11, 2024
2 parents fe1fb30 + de40fb4 commit 0f58f22
Show file tree
Hide file tree
Showing 47 changed files with 1,374 additions and 289 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ on:

jobs:
setup:
name: test / setup
name: test | setup dependencies
runs-on: ubuntu-20.04
env:
MIX_ENV: test
Expand Down
94 changes: 94 additions & 0 deletions docker-compose-arm.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
version: '3.9'

services:
zookeeper:
image: confluentinc/cp-zookeeper:7.0.4.arm64
restart: unless-stopped
ports:
- '32181:32181'
environment:
ZOOKEEPER_CLIENT_PORT: 32181
ZOOKEEPER_TICK_TIME: 2000
healthcheck:
test: "zookeeper-shell 127.0.01:32181 ls /"
interval: 10s
timeout: 10s
retries: 5

kafka-1:
image: confluentinc/cp-kafka:7.0.4.arm64
ports:
- '9092:9092'
depends_on:
zookeeper:
condition: service_healthy
env_file: docker-compose-kafka.env
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
healthcheck:
test: kafka-topics --bootstrap-server kafka-1:29092 --list
interval: 30s
timeout: 10s
retries: 4
volumes:
- ./ssl/kafka.server.keystore.jks:/etc/kafka/secrets/kafka.server.keystore.jks:ro,z
- ./ssl/kafka.server.truststore.jks:/etc/kafka/secrets/kafka.server.truststore.jks:ro,z

kafka-2:
image: confluentinc/cp-kafka:7.0.4.arm64
ports:
- '9093:9093'
depends_on:
zookeeper:
condition: service_healthy
env_file: docker-compose-kafka.env
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29093,EXTERNAL://localhost:9093
healthcheck:
test: kafka-topics --bootstrap-server kafka-2:29093 --list
interval: 30s
timeout: 10s
retries: 4
volumes:
- ./ssl/kafka.server.keystore.jks:/etc/kafka/secrets/kafka.server.keystore.jks:ro,z
- ./ssl/kafka.server.truststore.jks:/etc/kafka/secrets/kafka.server.truststore.jks:ro,z

kafka-3:
image: confluentinc/cp-kafka:7.0.4.arm64
ports:
- '9094:9094'
depends_on:
zookeeper:
condition: service_healthy
env_file: docker-compose-kafka.env
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:29094,EXTERNAL://localhost:9094
healthcheck:
test: kafka-topics --bootstrap-server kafka-2:29093 --list
interval: 30s
timeout: 10s
retries: 4
volumes:
- ./ssl/kafka.server.keystore.jks:/etc/kafka/secrets/kafka.server.keystore.jks:ro,z
- ./ssl/kafka.server.truststore.jks:/etc/kafka/secrets/kafka.server.truststore.jks:ro,z

kafka_setup:
image: confluentinc/cp-kafka:7.0.4.arm64
depends_on:
zookeeper:
condition: service_healthy
kafka-1:
condition: service_healthy
kafka-2:
condition: service_healthy
kafka-3:
condition: service_healthy
command: "bash -c 'kafka-topics --zookeeper zookeeper:32181 --create --if-not-exists --partitions 4 --replication-factor 2 --topic consumer_group_implementation_test && \
kafka-topics --zookeeper zookeeper:32181 --create --if-not-exists --partitions 4 --replication-factor 2 --topic test0p8p0 && \
kafka-topics --zookeeper zookeeper:32181 --list'"
16 changes: 16 additions & 0 deletions lib/kafka_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,22 @@ defmodule KafkaEx do
Server.call(worker, :consumer_group)
end

@doc """
Sends a request to describe a group identified by its name.
We support only one consumer group per request for now, as we don't
group requests by group coordinator.
This is a new client implementation, and is not compatible with the old clients
"""
@spec describe_group(binary, Keyword.t()) :: {:ok, any} | {:error, any}
def describe_group(consumer_group_name, opts \\ []) do
worker_name = Keyword.get(opts, :worker_name, Config.default_worker())

case Server.call(worker_name, {:describe_groups, [consumer_group_name]}) do
{:ok, [group]} -> {:ok, group}
{:error, error} -> {:error, error}
end
end

@doc """
Sends a request to join a consumer group.
"""
Expand Down
72 changes: 72 additions & 0 deletions lib/kafka_ex/new/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ defmodule KafkaEx.New.Client do
alias KafkaEx.Config
alias KafkaEx.NetworkClient

alias KafkaEx.New.Client.RequestBuilder
alias KafkaEx.New.Client.ResponseParser
alias KafkaEx.New.Structs.Broker
alias KafkaEx.New.Structs.ClusterMetadata
alias KafkaEx.New.Structs.NodeSelector
Expand Down Expand Up @@ -163,6 +165,16 @@ defmodule KafkaEx.New.Client do
{:reply, {:ok, topic_metadata}, updated_state}
end

def handle_call({:describe_groups, [consumer_group_name]}, _from, state) do
if KafkaEx.valid_consumer_group?(consumer_group_name) do
{response, updated_state} = describe_group_request(consumer_group_name, state)

{:reply, response, updated_state}
else
{:reply, {:error, :invalid_consumer_group}, state}
end
end

def handle_call({:kayrock_request, request, node_selector}, _from, state) do
{response, updated_state} = kayrock_network_request(request, node_selector, state)

Expand Down Expand Up @@ -245,6 +257,66 @@ defmodule KafkaEx.New.Client do
end
end

defp describe_group_request(consumer_group_name, state) do
node_selector = NodeSelector.consumer_group(consumer_group_name)

[consumer_group_name]
|> RequestBuilder.describe_groups_request(state)
|> handle_describe_group_request(node_selector, state)
end

defp handle_describe_group_request(
_,
_,
_,
retry_count \\ @retry_count,
_last_error \\ nil
)

defp handle_describe_group_request(_, _, state, 0, last_error) do
{{:error, last_error}, state}
end

defp handle_describe_group_request(
request,
node_selector,
state,
retry_count,
_last_error
) do
case kayrock_network_request(request, node_selector, state) do
{{:ok, response}, state_out} ->
case ResponseParser.describe_groups_response(response) do
{:ok, consumer_groups} ->
{{:ok, consumer_groups}, state_out}

{:error, [error | _]} ->
Logger.warn(
"Unable to fetch consumer group metadata for #{inspect(request.group_ids)}"
)

handle_describe_group_request(
request,
node_selector,
state,
retry_count - 1,
error
)
end

{_, _state_out} ->
Logger.warn("Unable to fetch consumer group metadata for #{inspect(request.group_ids)}")

handle_describe_group_request(
request,
node_selector,
state,
retry_count - 1,
:unknown
)
end
end

defp maybe_connect_broker(broker, state) do
case Broker.connected?(broker) do
true ->
Expand Down
34 changes: 34 additions & 0 deletions lib/kafka_ex/new/client/request_builder.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
defmodule KafkaEx.New.Client.RequestBuilder do
@moduledoc """
This module is used to build request for KafkaEx.New.Client.
It's main decision point which protocol to use for building request and what
is required version.
"""
@protocol Application.compile_env(
:kafka_ex,
:protocol,
KafkaEx.New.Protocols.KayrockProtocol
)

@default_api_version %{
describe_groups: 1
}

alias KafkaEx.New.Client.State

@doc """
Builds request for Describe Groups API
"""
@spec describe_groups_request([binary], State.t()) :: term
def describe_groups_request(group_names, state) do
api_version = get_api_version(state, :describe_groups)

@protocol.build_request(:describe_groups, api_version, group_names: group_names)
end

# -----------------------------------------------------------------------------
defp get_api_version(state, request_type) do
default = Map.fetch!(@default_api_version, request_type)
State.max_supported_api_version(state, request_type, default)
end
end
22 changes: 22 additions & 0 deletions lib/kafka_ex/new/client/response_parser.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
defmodule KafkaEx.New.Client.ResponseParser do
@moduledoc """
This module is used to parse response from KafkaEx.New.Client.
It's main decision point which protocol to use for parsing response
"""
alias KafkaEx.New.Structs.ConsumerGroup

@protocol Application.compile_env(
:kafka_ex,
:protocol,
KafkaEx.New.Protocols.KayrockProtocol
)

@doc """
Parses response for Describe Groups API
"""
@spec describe_groups_response(term) ::
{:ok, [ConsumerGroup.t()]} | {:error, term}
def describe_groups_response(response) do
@protocol.parse_response(:describe_groups, response)
end
end
17 changes: 16 additions & 1 deletion lib/kafka_ex/new/kafka_ex_api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ defmodule KafkaEx.New.KafkaExAPI do

alias KafkaEx.New.Client
alias KafkaEx.New.Structs.ClusterMetadata
alias KafkaEx.New.Structs.ConsumerGroup
alias KafkaEx.New.Structs.Topic
alias KafkaEx.New.Structs.NodeSelector

Expand Down Expand Up @@ -56,6 +57,20 @@ defmodule KafkaEx.New.KafkaExAPI do
end
end

@doc """
Sends a request to describe a group identified by its name.
We support only one consumer group per request for now, as we don't
group requests by group coordinator.
"""
@spec describe_group(client, Keyword.t()) ::
{:ok, ConsumerGroup.t()} | {:error, any}
def describe_group(client, consumer_group_name) do
case GenServer.call(client, {:describe_groups, [consumer_group_name]}) do
{:ok, [group]} -> {:ok, group}
{:error, error} -> {:error, error}
end
end

@doc """
Get topic metadata for the given topics
Expand All @@ -73,7 +88,7 @@ defmodule KafkaEx.New.KafkaExAPI do
Returns the cluster metadata from the given client
"""
@spec cluster_metadata(client) :: {:ok, ClusterMetadata.t()}
def(cluster_metadata(client)) do
def cluster_metadata(client) do
GenServer.call(client, :cluster_metadata)
end

Expand Down
25 changes: 25 additions & 0 deletions lib/kafka_ex/new/protocols/kayrock/describe_groups.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
defprotocol KafkaEx.New.Protocols.Kayrock.DescribeGroups do

Check warning on line 1 in lib/kafka_ex/new/protocols/kayrock/describe_groups.ex

View workflow job for this annotation

GitHub Actions / runner / Test (1.14, 24.3)

protocols must define at least one function, but none was defined
@moduledoc """
This module handles Describe Groups request & response parsing.
Request is built using Kayrock protocol, response is parsed to
native KafkaEx structs.
"""

defprotocol Request do
@moduledoc """
This protocol is used to build Describe Groups request
"""
@spec build_request(t(), [binary]) :: t()
def build_request(request, consumer_group_names)
end

defprotocol Response do
@moduledoc """
This protocol is used to parse Describe Groups response
"""
alias KafkaEx.New.Structs.ConsumerGroup

@spec parse_response(t()) :: {:ok, [ConsumerGroup.t()]} | {:error, term}
def parse_response(response)
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
defimpl KafkaEx.New.Protocols.Kayrock.DescribeGroups.Request,
for: [Kayrock.DescribeGroups.V0.Request, Kayrock.DescribeGroups.V1.Request] do
def build_request(request_template, consumer_group_names) do
Map.put(request_template, :group_ids, consumer_group_names)
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
defimpl KafkaEx.New.Protocols.Kayrock.DescribeGroups.Response,
for: [Kayrock.DescribeGroups.V0.Response, Kayrock.DescribeGroups.V1.Response] do
def parse_response(%{groups: groups}) do
case Enum.filter(groups, &(&1.error_code != 0)) do
[] ->
groups = Enum.map(groups, &build_consumer_group/1)
{:ok, groups}

errors ->
error_list =
Enum.map(errors, fn %{group_id: group_id, error_code: error_code} ->
{group_id, Kayrock.ErrorCode.code_to_atom!(error_code)}
end)

{:error, error_list}
end
end

defp build_consumer_group(kayrock_group) do
KafkaEx.New.Structs.ConsumerGroup.from_describe_group_response(kayrock_group)
end
end
Loading

0 comments on commit 0f58f22

Please sign in to comment.