From 19aa7f679923b9f52c2341478bab2b9468159f93 Mon Sep 17 00:00:00 2001 From: Argonus Date: Sat, 7 Oct 2023 08:36:58 +0200 Subject: [PATCH] [Kayrock] Add default protocol for describe groups response --- lib/kafka_ex/new/protocols/describe_groups.ex | 15 +++ .../describe_groups/default_response_impl.ex | 24 ++++ lib/kafka_ex/new/structs/consumer_group.ex | 13 +- .../new/structs/consumer_group/member.ex | 4 +- .../consumer_group/member_assignment.ex | 4 +- .../consumer_group/partition_assignment.ex | 2 +- .../kayrock/describe_groups/response_test.exs | 123 ++++++++++++++++++ 7 files changed, 174 insertions(+), 11 deletions(-) create mode 100644 lib/kafka_ex/new/protocols/describe_groups.ex create mode 100644 lib/kafka_ex/new/protocols/kayrock/describe_groups/default_response_impl.ex create mode 100644 test/kafka_ex/new/protocols/kayrock/describe_groups/response_test.exs diff --git a/lib/kafka_ex/new/protocols/describe_groups.ex b/lib/kafka_ex/new/protocols/describe_groups.ex new file mode 100644 index 00000000..b0903374 --- /dev/null +++ b/lib/kafka_ex/new/protocols/describe_groups.ex @@ -0,0 +1,15 @@ +defmodule KafkaEx.New.Protocols.DescribeGroups do + @moduledoc """ + This module handles Describe Groups request & response handling & parsing + """ + + defprotocol Response do + @moduledoc """ + This protocol is used to parse Describe Groups response + """ + alias KafkaEx.New.Structs.ConsumerGroup + + @spec parse_response(t()) :: {:ok, [ConsumerGroup.t()]} | {:error, term} + def parse_response(response) + end +end diff --git a/lib/kafka_ex/new/protocols/kayrock/describe_groups/default_response_impl.ex b/lib/kafka_ex/new/protocols/kayrock/describe_groups/default_response_impl.ex new file mode 100644 index 00000000..8c7f26c6 --- /dev/null +++ b/lib/kafka_ex/new/protocols/kayrock/describe_groups/default_response_impl.ex @@ -0,0 +1,24 @@ +defimpl KafkaEx.New.Protocols.DescribeGroups.Response, + for: [Kayrock.DescribeGroups.V0.Response, Kayrock.DescribeGroups.V1.Response] do + def parse_response(%{groups: groups}) do + case Enum.filter(groups, &(&1.error_code != 0)) do + [] -> + groups = Enum.map(groups, &build_consumer_group/1) + {:ok, groups} + + errors -> + error_list = + Enum.map(errors, fn %{group_id: group_id, error_code: error_code} -> + {group_id, Kayrock.ErrorCode.code_to_atom!(error_code)} + end) + + {:error, error_list} + end + end + + defp build_consumer_group(kayrock_group) do + KafkaEx.New.Structs.ConsumerGroup.from_describe_group_response( + kayrock_group + ) + end +end diff --git a/lib/kafka_ex/new/structs/consumer_group.ex b/lib/kafka_ex/new/structs/consumer_group.ex index 397546c9..9a9b3462 100644 --- a/lib/kafka_ex/new/structs/consumer_group.ex +++ b/lib/kafka_ex/new/structs/consumer_group.ex @@ -3,7 +3,7 @@ defmodule KafkaEx.New.Structs.ConsumerGroup do Encapsulates what we know about consumer group """ - alias __MODULE__.Member + alias KafkaEx.New.Structs.ConsumerGroup.Member @type t :: %__MODULE__{ group_id: binary, @@ -17,15 +17,16 @@ defmodule KafkaEx.New.Structs.ConsumerGroup do @spec from_describe_group_response(map) :: __MODULE__.t() def from_describe_group_response(describe_group) do - %KafkaEx.New.ConsumerGroup{ + %__MODULE__{ group_id: describe_group.group_id, state: describe_group.state, protocol_type: describe_group.protocol_type, protocol: describe_group.protocol, - members: - Enum.map(describe_group.members, fn group_member -> - Member.from_describe_group_response(group_member) - end) + members: Enum.map(describe_group.members, &build_consumer_group_member/1) } end + + defp build_consumer_group_member(group_member) do + Member.from_describe_group_response(group_member) + end end diff --git a/lib/kafka_ex/new/structs/consumer_group/member.ex b/lib/kafka_ex/new/structs/consumer_group/member.ex index cf8cd177..179d640e 100644 --- a/lib/kafka_ex/new/structs/consumer_group/member.ex +++ b/lib/kafka_ex/new/structs/consumer_group/member.ex @@ -1,8 +1,8 @@ -defmodule KafkaEx.New.ConsumerGroup.Member do +defmodule KafkaEx.New.Structs.ConsumerGroup.Member do @moduledoc """ Encapsulates what we know about a consumer group member """ - alias KafkaEx.New.ConsumerGroup.Member.MemberAssignment + alias KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment @type t :: %__MODULE__{ member_id: binary, diff --git a/lib/kafka_ex/new/structs/consumer_group/member_assignment.ex b/lib/kafka_ex/new/structs/consumer_group/member_assignment.ex index 4740407d..676f506e 100644 --- a/lib/kafka_ex/new/structs/consumer_group/member_assignment.ex +++ b/lib/kafka_ex/new/structs/consumer_group/member_assignment.ex @@ -1,10 +1,10 @@ -defmodule KafkaEx.New.ConsumerGroup.Member.MemberAssignment do +defmodule KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment do @moduledoc """ Encapsulates what we know about a consumer group. The current assignment provided by the group leader (will only be present if the group is stable). """ - alias KafkaEx.New.ConsumerGroup.Member.MemberAssignment.PartitionAssignment + alias KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment.PartitionAssignment @type t :: %__MODULE__{ version: non_neg_integer, diff --git a/lib/kafka_ex/new/structs/consumer_group/partition_assignment.ex b/lib/kafka_ex/new/structs/consumer_group/partition_assignment.ex index 22f56802..2299d9b7 100644 --- a/lib/kafka_ex/new/structs/consumer_group/partition_assignment.ex +++ b/lib/kafka_ex/new/structs/consumer_group/partition_assignment.ex @@ -1,4 +1,4 @@ -defmodule KafkaEx.New.ConsumerGroup.Member.MemberAssignment.PartitionAssignment do +defmodule KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment.PartitionAssignment do @moduledoc """ Encapsulates what we know about a consumer group member partition assignment. Will only be present if the group is stable and is assigned to given topic. diff --git a/test/kafka_ex/new/protocols/kayrock/describe_groups/response_test.exs b/test/kafka_ex/new/protocols/kayrock/describe_groups/response_test.exs new file mode 100644 index 00000000..fc7e55ad --- /dev/null +++ b/test/kafka_ex/new/protocols/kayrock/describe_groups/response_test.exs @@ -0,0 +1,123 @@ +defmodule KafkaEx.New.Protocols.DescribeGroups.ResponseTest do + use ExUnit.Case, async: true + + alias KafkaEx.New.Protocols.DescribeGroups + + alias Kayrock.DescribeGroups.V0 + alias Kayrock.DescribeGroups.V1 + + describe "parse_response/1" do + @expected_group %KafkaEx.New.ConsumerGroup{ + group_id: "succeeded", + state: "stable", + protocol_type: "protocol_type", + protocol: "protocol", + members: [ + %KafkaEx.New.ConsumerGroup.Member{ + member_id: "member_id", + client_id: "client_id", + client_host: "client_host", + member_metadata: "member_metadata", + member_assignment: %KafkaEx.New.ConsumerGroup.Member.MemberAssignment{ + version: 0, + user_data: "user_data", + partition_assignments: [ + %KafkaEx.New.ConsumerGroup.Member.MemberAssignment.PartitionAssignment{ + topic: "test-topic", + partitions: [1, 2, 3] + } + ] + } + } + ] + } + + test "api version 0 - returns response if all groups succeeded" do + response = %V0.Response{ + groups: [ + %{ + group_id: "succeeded", + error_code: 0, + state: "stable", + protocol_type: "protocol_type", + protocol: "protocol", + members: [ + %{ + member_id: "member_id", + client_id: "client_id", + client_host: "client_host", + member_metadata: "member_metadata", + member_assignment: %{ + version: 0, + user_data: "user_data", + partition_assignments: [ + %{topic: "test-topic", partitions: [1, 2, 3]} + ] + } + } + ] + } + ] + } + + assert {:ok, [@expected_group]} == + DescribeGroups.Response.parse_response(response) + end + + test "api version 0 - returns error if any group failed" do + response = %V0.Response{ + groups: [ + %{group_id: "succeeded", error_code: 0}, + %{group_id: "failed", error_code: 1} + ] + } + + assert {:error, [{"failed", :offset_out_of_range}]} == + DescribeGroups.Response.parse_response(response) + end + + test "api version 1 - returns response if all groups succeeded" do + response = %V1.Response{ + groups: [ + %{ + group_id: "succeeded", + error_code: 0, + state: "stable", + protocol_type: "protocol_type", + protocol: "protocol", + members: [ + %{ + member_id: "member_id", + client_id: "client_id", + client_host: "client_host", + member_metadata: "member_metadata", + member_assignment: %{ + version: 0, + user_data: "user_data", + partition_assignments: [ + %{topic: "test-topic", partitions: [1, 2, 3]} + ] + } + } + ] + } + ] + } + + assert {:ok, [@expected_group]} == + DescribeGroups.Response.parse_response(response) + end + + test "api version 1 - returns error if any group failed" do + response = %V1.Response{ + groups: [ + %{group_id: "succeeded", error_code: 0}, + %{group_id: "failed", error_code: 1} + ] + } + + assert {:error, [{"failed", :offset_out_of_range}]} == + DescribeGroups.Response.parse_response(response) + end + end +end