diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml new file mode 100644 index 00000000..ffc4708f --- /dev/null +++ b/.github/workflows/integration.yml @@ -0,0 +1,100 @@ +name: CI Integration + +on: + push: + branches: [ master ] + pull_request: + branches: [ master ] + +jobs: + setup: + name: test | setup dependencies + runs-on: ubuntu-20.04 + env: + MIX_ENV: test + strategy: + matrix: + pair: + - elixir: 1.16 + otp: 26.1 + + steps: + - name: Cancel previous runs + uses: styfle/cancel-workflow-action@0.9.0 + with: + access_token: ${{ github.token }} + - name: Checkout Github repo + uses: actions/checkout@v2 + - name: Setup elixir & erlang environment + uses: erlef/setup-beam@v1 + with: + elixir-version: ${{matrix.pair.elixir}} # Define the elixir version [required] + otp-version: ${{matrix.pair.otp}} # Define the OTP version [required] + + - name: Retrieve Mix Dependencies Cache + uses: actions/cache@v2 + id: mix-cache # id to use in retrieve action + with: + path: deps + key: ${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-mix-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }} + + - name: Retrieve Mix Dependencies Compilation Cache + uses: actions/cache@v2 + id: mix-deps-compile-cache # id to use in retrieve action + with: + path: _build + key: ${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-mix-deps-compile-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }} + - name: Install Mix Dependencies + run: | + mix local.rebar --force + mix local.hex --force + mix deps.get + + - name: Compile Mix Dependencies + run: mix deps.compile + + test: + name: runner / Test + needs: [setup] + + runs-on: ubuntu-20.04 + env: + MIX_ENV: test + + strategy: + fail-fast: false + matrix: + pair: + - elixir: 1.16 + otp: 26.1 + + steps: + - uses: actions/checkout@v2 + - name: Setup elixir & erlang environment + uses: erlef/setup-beam@v1 + with: + elixir-version: ${{matrix.pair.elixir}} # Define the elixir version [required] + otp-version: ${{matrix.pair.otp}} # Define the OTP version [required] + + - name: Retrieve Mix Dependencies Cache + uses: actions/cache@v2 + id: mix-cache # id to use in retrieve action + with: + path: deps + key: ${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-mix-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }} + + - name: Retrieve Mix Dependencies Compilation Cache + uses: actions/cache@v2 + id: mix-deps-compile-cache # id to use in retrieve action + with: + path: _build + key: ${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-mix-deps-compile-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }} + + - name: Docker-compose up + run: ./scripts/docker_up.sh + + - name: Docker ps + run: docker ps -a + + - name: Run Tests + run: ./scripts/ci_tests.sh \ No newline at end of file diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0a8719cc..13f1eadf 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -105,4 +105,4 @@ jobs: run: docker ps -a - name: Run Tests - run: ./scripts/ci_tests.sh \ No newline at end of file + run: mix test \ No newline at end of file diff --git a/lib/kafka_ex/new/client.ex b/lib/kafka_ex/new/client.ex index 3fce96e8..da9528fb 100644 --- a/lib/kafka_ex/new/client.ex +++ b/lib/kafka_ex/new/client.ex @@ -163,6 +163,16 @@ defmodule KafkaEx.New.Client do end end + # Backward compatibility, to be deleted once we delete legacy code + def handle_call({:offset, topic, partition, timestamp}, _from, state) do + partition_data = %{partition_num: partition, timestamp: timestamp} + + case list_offset_request({topic, [partition_data]}, [], state) do + {:error, error} -> {:reply, {:error, error}, state} + {result, updated_state} -> {:reply, result, updated_state} + end + end + def handle_call({:describe_groups, [consumer_group_name], opts}, _from, state) do if KafkaEx.valid_consumer_group?(consumer_group_name) do case describe_group_request(consumer_group_name, opts, state) do diff --git a/lib/kafka_ex/new/client_compatibility.ex b/lib/kafka_ex/new/client_compatibility.ex index d296f3f4..d359977f 100644 --- a/lib/kafka_ex/new/client_compatibility.ex +++ b/lib/kafka_ex/new/client_compatibility.ex @@ -37,20 +37,13 @@ defmodule KafkaEx.New.ClientCompatibility do # note we also try to create the topic if it does not exist state = ensure_topics_metadata(state, [produce_request.topic], true) - produce_request = - default_partitioner().assign_partition( - produce_request, - Adapter.metadata_response(state.cluster_metadata) - ) + metadata_response = Adapter.metadata_response(state.cluster_metadata) + produce_request = default_partitioner().assign_partition(produce_request, metadata_response) {request, topic, partition} = Adapter.produce_request(produce_request) - {response, updated_state} = - kayrock_network_request( - request, - NodeSelector.topic_partition(topic, partition), - state - ) + node_selector = NodeSelector.topic_partition(topic, partition) + {response, updated_state} = kayrock_network_request(request, node_selector, state) response = case response do @@ -256,18 +249,10 @@ defmodule KafkaEx.New.ClientCompatibility do raise KafkaEx.ConsumerGroupRequiredError, offset_fetch end - {request, consumer_group} = - Adapter.offset_fetch_request( - offset_fetch, - state.consumer_group_for_auto_commit - ) + {request, consumer_group} = Adapter.offset_fetch_request(offset_fetch, state.consumer_group_for_auto_commit) - {response, updated_state} = - kayrock_network_request( - request, - NodeSelector.consumer_group(consumer_group), - state - ) + node_selector = NodeSelector.consumer_group(consumer_group) + {response, updated_state} = kayrock_network_request(request, node_selector, state) response = case response do diff --git a/lib/kafka_ex/protocol/offset.ex b/lib/kafka_ex/protocol/offset.ex index 62b0bf38..d75aa161 100644 --- a/lib/kafka_ex/protocol/offset.ex +++ b/lib/kafka_ex/protocol/offset.ex @@ -27,10 +27,9 @@ defmodule KafkaEx.Protocol.Offset do defstruct topic: nil, partition_offsets: [] @type t :: %Response{topic: binary, partition_offsets: list} - def extract_offset([%__MODULE__{partition_offsets: [%{offset: [offset]}]}]), - do: offset - + def extract_offset([%__MODULE__{partition_offsets: [%{offset: [offset]}]}]), do: offset def extract_offset([%__MODULE__{partition_offsets: [%{offset: []}]}]), do: 0 + def extract_offset([%{partition_offsets: [%{offset: offset}]}]) when is_integer(offset), do: offset end @spec create_request(integer, binary, binary, integer, term) :: iolist diff --git a/test/kafka_ex/new/structs/offset/partition_offset_test.exs b/test/kafka_ex/new/structs/offset/partition_offset_test.exs new file mode 100644 index 00000000..b720d0bf --- /dev/null +++ b/test/kafka_ex/new/structs/offset/partition_offset_test.exs @@ -0,0 +1,27 @@ +defmodule KafkaEx.New.Structs.Offset.PartitionOffsetTest do + use ExUnit.Case, async: true + + alias KafkaEx.New.Structs.Offset.PartitionOffset + + describe "build/1" do + test "returns struct with missing timestamp" do + result = PartitionOffset.build(%{partition: 1, offset: 2}) + + assert result == %PartitionOffset{ + partition: 1, + offset: 2, + timestamp: -1 + } + end + + test "returns struct with timestamp" do + result = PartitionOffset.build(%{partition: 1, offset: 2, timestamp: 123}) + + assert result == %PartitionOffset{ + partition: 1, + offset: 2, + timestamp: 123 + } + end + end +end diff --git a/test/kafka_ex/new/structs/offset_test.exs b/test/kafka_ex/new/structs/offset_test.exs new file mode 100644 index 00000000..fe222f44 --- /dev/null +++ b/test/kafka_ex/new/structs/offset_test.exs @@ -0,0 +1,40 @@ +defmodule KafkaEx.New.Structs.OffsetTest do + use ExUnit.Case, async: true + + alias KafkaEx.New.Structs.Offset + + describe "from_list_offset/2" do + test "creates offset with v0 partition responses" do + result = Offset.from_list_offset("test-topic", [%{offset: 1, partition: 2}]) + + assert result == %Offset{ + topic: "test-topic", + partition_offsets: [ + %Offset.PartitionOffset{offset: 1, partition: 2, timestamp: -1} + ] + } + end + + test "creates offset with v1 partition responses" do + result = Offset.from_list_offset("test-topic", [%{offset: 1, partition: 2}]) + + assert result == %Offset{ + topic: "test-topic", + partition_offsets: [ + %Offset.PartitionOffset{offset: 1, partition: 2, timestamp: -1} + ] + } + end + + test "creates offset with v2 partition responses" do + result = Offset.from_list_offset("test-topic", [%{offset: 1, partition: 2, timestamp: 3}]) + + assert result == %Offset{ + topic: "test-topic", + partition_offsets: [ + %Offset.PartitionOffset{offset: 1, partition: 2, timestamp: 3} + ] + } + end + end +end