Skip to content

Commit

Permalink
[Kayrock] Add default protocol for describe groups response
Browse files Browse the repository at this point in the history
  • Loading branch information
Argonus committed Oct 7, 2023
1 parent c59ab0f commit 19aa7f6
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 11 deletions.
15 changes: 15 additions & 0 deletions lib/kafka_ex/new/protocols/describe_groups.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
defmodule KafkaEx.New.Protocols.DescribeGroups do
@moduledoc """
This module handles Describe Groups request & response handling & parsing
"""

defprotocol Response do
@moduledoc """
This protocol is used to parse Describe Groups response
"""
alias KafkaEx.New.Structs.ConsumerGroup

@spec parse_response(t()) :: {:ok, [ConsumerGroup.t()]} | {:error, term}
def parse_response(response)
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
defimpl KafkaEx.New.Protocols.DescribeGroups.Response,
for: [Kayrock.DescribeGroups.V0.Response, Kayrock.DescribeGroups.V1.Response] do
def parse_response(%{groups: groups}) do
case Enum.filter(groups, &(&1.error_code != 0)) do
[] ->
groups = Enum.map(groups, &build_consumer_group/1)
{:ok, groups}

errors ->
error_list =
Enum.map(errors, fn %{group_id: group_id, error_code: error_code} ->
{group_id, Kayrock.ErrorCode.code_to_atom!(error_code)}
end)

{:error, error_list}
end
end

defp build_consumer_group(kayrock_group) do
KafkaEx.New.Structs.ConsumerGroup.from_describe_group_response(
kayrock_group
)
end
end
13 changes: 7 additions & 6 deletions lib/kafka_ex/new/structs/consumer_group.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule KafkaEx.New.Structs.ConsumerGroup do
Encapsulates what we know about consumer group
"""

alias __MODULE__.Member
alias KafkaEx.New.Structs.ConsumerGroup.Member

@type t :: %__MODULE__{
group_id: binary,
Expand All @@ -17,15 +17,16 @@ defmodule KafkaEx.New.Structs.ConsumerGroup do

@spec from_describe_group_response(map) :: __MODULE__.t()
def from_describe_group_response(describe_group) do
%KafkaEx.New.ConsumerGroup{
%__MODULE__{
group_id: describe_group.group_id,
state: describe_group.state,
protocol_type: describe_group.protocol_type,
protocol: describe_group.protocol,
members:
Enum.map(describe_group.members, fn group_member ->
Member.from_describe_group_response(group_member)
end)
members: Enum.map(describe_group.members, &build_consumer_group_member/1)
}
end

defp build_consumer_group_member(group_member) do
Member.from_describe_group_response(group_member)
end
end
4 changes: 2 additions & 2 deletions lib/kafka_ex/new/structs/consumer_group/member.ex
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
defmodule KafkaEx.New.ConsumerGroup.Member do
defmodule KafkaEx.New.Structs.ConsumerGroup.Member do
@moduledoc """
Encapsulates what we know about a consumer group member
"""
alias KafkaEx.New.ConsumerGroup.Member.MemberAssignment
alias KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment

@type t :: %__MODULE__{
member_id: binary,
Expand Down
4 changes: 2 additions & 2 deletions lib/kafka_ex/new/structs/consumer_group/member_assignment.ex
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
defmodule KafkaEx.New.ConsumerGroup.Member.MemberAssignment do
defmodule KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment do
@moduledoc """
Encapsulates what we know about a consumer group.
The current assignment provided by the group leader (will only be present if the group is stable).
"""

alias KafkaEx.New.ConsumerGroup.Member.MemberAssignment.PartitionAssignment
alias KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment.PartitionAssignment

@type t :: %__MODULE__{
version: non_neg_integer,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule KafkaEx.New.ConsumerGroup.Member.MemberAssignment.PartitionAssignment do
defmodule KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment.PartitionAssignment do
@moduledoc """
Encapsulates what we know about a consumer group member partition assignment.
Will only be present if the group is stable and is assigned to given topic.
Expand Down
123 changes: 123 additions & 0 deletions test/kafka_ex/new/protocols/kayrock/describe_groups/response_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
defmodule KafkaEx.New.Protocols.DescribeGroups.ResponseTest do
use ExUnit.Case, async: true

alias KafkaEx.New.Protocols.DescribeGroups

alias Kayrock.DescribeGroups.V0
alias Kayrock.DescribeGroups.V1

describe "parse_response/1" do
@expected_group %KafkaEx.New.ConsumerGroup{
group_id: "succeeded",
state: "stable",
protocol_type: "protocol_type",
protocol: "protocol",
members: [
%KafkaEx.New.ConsumerGroup.Member{
member_id: "member_id",
client_id: "client_id",
client_host: "client_host",
member_metadata: "member_metadata",
member_assignment: %KafkaEx.New.ConsumerGroup.Member.MemberAssignment{
version: 0,
user_data: "user_data",
partition_assignments: [
%KafkaEx.New.ConsumerGroup.Member.MemberAssignment.PartitionAssignment{

Check failure on line 25 in test/kafka_ex/new/protocols/kayrock/describe_groups/response_test.exs

View workflow job for this annotation

GitHub Actions / runner / Test (1.11, 21.3)

** (CompileError) test/kafka_ex/new/protocols/kayrock/describe_groups/response_test.exs:25: KafkaEx.New.ConsumerGroup.Member.MemberAssignment.PartitionAssignment.__struct__/1 is undefined, cannot expand struct KafkaEx.New.ConsumerGroup.Member.MemberAssignment.PartitionAssignment. Make sure the struct name is correct. If the struct name exists and is correct but it still cannot be found, you likely have cyclic module usage in your code

Check failure on line 25 in test/kafka_ex/new/protocols/kayrock/describe_groups/response_test.exs

View workflow job for this annotation

GitHub Actions / runner / Test (1.14, 25.2)

** (CompileError) test/kafka_ex/new/protocols/kayrock/describe_groups/response_test.exs:25: KafkaEx.New.ConsumerGroup.Member.MemberAssignment.PartitionAssignment.__struct__/1 is undefined, cannot expand struct KafkaEx.New.ConsumerGroup.Member.MemberAssignment.PartitionAssignment. Make sure the struct name is correct. If the struct name exists and is correct but it still cannot be found, you likely have cyclic module usage in your code

Check failure on line 25 in test/kafka_ex/new/protocols/kayrock/describe_groups/response_test.exs

View workflow job for this annotation

GitHub Actions / runner / Test (1.13, 24.3)

** (CompileError) test/kafka_ex/new/protocols/kayrock/describe_groups/response_test.exs:25: KafkaEx.New.ConsumerGroup.Member.MemberAssignment.PartitionAssignment.__struct__/1 is undefined, cannot expand struct KafkaEx.New.ConsumerGroup.Member.MemberAssignment.PartitionAssignment. Make sure the struct name is correct. If the struct name exists and is correct but it still cannot be found, you likely have cyclic module usage in your code

Check failure on line 25 in test/kafka_ex/new/protocols/kayrock/describe_groups/response_test.exs

View workflow job for this annotation

GitHub Actions / runner / Test (1.8, 20.3)

** (CompileError) test/kafka_ex/new/protocols/kayrock/describe_groups/response_test.exs:25: KafkaEx.New.ConsumerGroup.Member.MemberAssignment.PartitionAssignment.__struct__/1 is undefined, cannot expand struct KafkaEx.New.ConsumerGroup.Member.MemberAssignment.PartitionAssignment
topic: "test-topic",
partitions: [1, 2, 3]
}
]
}
}
]
}

test "api version 0 - returns response if all groups succeeded" do
response = %V0.Response{
groups: [
%{
group_id: "succeeded",
error_code: 0,
state: "stable",
protocol_type: "protocol_type",
protocol: "protocol",
members: [
%{
member_id: "member_id",
client_id: "client_id",
client_host: "client_host",
member_metadata: "member_metadata",
member_assignment: %{
version: 0,
user_data: "user_data",
partition_assignments: [
%{topic: "test-topic", partitions: [1, 2, 3]}
]
}
}
]
}
]
}

assert {:ok, [@expected_group]} ==
DescribeGroups.Response.parse_response(response)
end

test "api version 0 - returns error if any group failed" do
response = %V0.Response{
groups: [
%{group_id: "succeeded", error_code: 0},
%{group_id: "failed", error_code: 1}
]
}

assert {:error, [{"failed", :offset_out_of_range}]} ==
DescribeGroups.Response.parse_response(response)
end

test "api version 1 - returns response if all groups succeeded" do
response = %V1.Response{
groups: [
%{
group_id: "succeeded",
error_code: 0,
state: "stable",
protocol_type: "protocol_type",
protocol: "protocol",
members: [
%{
member_id: "member_id",
client_id: "client_id",
client_host: "client_host",
member_metadata: "member_metadata",
member_assignment: %{
version: 0,
user_data: "user_data",
partition_assignments: [
%{topic: "test-topic", partitions: [1, 2, 3]}
]
}
}
]
}
]
}

assert {:ok, [@expected_group]} ==
DescribeGroups.Response.parse_response(response)
end

test "api version 1 - returns error if any group failed" do
response = %V1.Response{
groups: [
%{group_id: "succeeded", error_code: 0},
%{group_id: "failed", error_code: 1}
]
}

assert {:error, [{"failed", :offset_out_of_range}]} ==
DescribeGroups.Response.parse_response(response)
end
end
end

0 comments on commit 19aa7f6

Please sign in to comment.