Skip to content

Commit

Permalink
Add docker to tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Argonus committed Feb 25, 2024
1 parent a64ac78 commit f4877ec
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 59 deletions.
5 changes: 0 additions & 5 deletions .credo.exs
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,7 @@
{Credo.Check.Consistency.MultiAliasImportRequireUse, false},

# Deprecated checks (these will be deleted after a grace period)
#
{Credo.Check.Readability.Specs, false},
{Credo.Check.Warning.NameRedeclarationByAssignment, false},
{Credo.Check.Warning.NameRedeclarationByCase, false},
{Credo.Check.Warning.NameRedeclarationByDef, false},
{Credo.Check.Warning.NameRedeclarationByFn, false},

# Custom checks can be created using `mix credo.gen.check`.
#
Expand Down
6 changes: 6 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,5 +100,11 @@ jobs:
path: _build
key: ${{ runner.os }}-${{ matrix.otp }}-${{ matrix.elixir }}-${{ hashFiles('mix.lock') }}

- name: Docker-compose up
run: ./scripts/docker_up.sh

- name: Docker ps
run: docker ps -a

- name: Run tests
run: ./scripts/ci_tests.sh
34 changes: 16 additions & 18 deletions lib/kafka_ex/api_versions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,31 @@ defmodule KafkaEx.ApiVersions do
end)
end

def find_api_version([:unsupported], _, {min_implemented_version, _}), do: {:ok, min_implemented_version}

def find_api_version(
api_versions_map,
message_type,
{min_implemented_version, max_implemented_version}
) do
if api_versions_map == [:unsupported] do
{:ok, min_implemented_version}
else
case KafkaEx.Protocol.api_key(message_type) do
nil ->
:unknown_message_for_client
case KafkaEx.Protocol.api_key(message_type) do
nil ->
:unknown_message_for_client

api_key ->
case api_versions_map[api_key] do
%{min_version: min} when min > max_implemented_version ->
:no_version_supported
api_key ->
case api_versions_map[api_key] do
%{min_version: min} when min > max_implemented_version ->
:no_version_supported

%{max_version: max} when max < min_implemented_version ->
:no_version_supported
%{max_version: max} when max < min_implemented_version ->
:no_version_supported

%{max_version: max} ->
{:ok, Enum.min([max_implemented_version, max])}
%{max_version: max} ->
{:ok, Enum.min([max_implemented_version, max])}

_ ->
:unknown_message_for_server
end
end
_ ->
:unknown_message_for_server
end
end
end
end
15 changes: 2 additions & 13 deletions lib/kafka_ex/default_partitioner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@ defmodule KafkaEx.DefaultPartitioner do
alias KafkaEx.Utils.Murmur, as: Murmur
require Logger

@spec assign_partition(
request :: %ProduceRequest{},
metadata :: %MetadataResponse{}
) :: %ProduceRequest{}
@spec assign_partition(request :: ProduceRequest.t(), metadata :: MetadataResponse.t()) ::
ProduceRequest.t()
def assign_partition(%ProduceRequest{partition: partition} = request, _)
when is_number(partition) do
request
Expand All @@ -39,10 +37,6 @@ defmodule KafkaEx.DefaultPartitioner do
end
end

@spec assign_partition_randomly(
request :: %ProduceRequest{},
metadata :: %MetadataResponse{}
) :: %ProduceRequest{}
defp assign_partition_randomly(
%ProduceRequest{topic: topic} = request,
metadata
Expand All @@ -56,11 +50,6 @@ defmodule KafkaEx.DefaultPartitioner do
%{request | partition: partition_id}
end

@spec assign_partition_with_key(
request :: %ProduceRequest{},
metadata :: %MetadataResponse{},
key :: binary
) :: %ProduceRequest{}
defp assign_partition_with_key(
%ProduceRequest{topic: topic} = request,
metadata,
Expand Down
20 changes: 3 additions & 17 deletions lib/kafka_ex/legacy_partitioner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@ defmodule KafkaEx.LegacyPartitioner do
alias KafkaEx.Utils.Murmur, as: Murmur
require Logger

@spec assign_partition(
request :: %ProduceRequest{},
metadata :: %MetadataResponse{}
) :: %ProduceRequest{}
@spec assign_partition(request :: ProduceRequest.t(), metadata :: MetadataResponse.t()) ::
ProduceRequest.t()
def assign_partition(%ProduceRequest{partition: partition} = request, _)
when is_number(partition) do
request
Expand All @@ -40,14 +38,7 @@ defmodule KafkaEx.LegacyPartitioner do
end
end

@spec assign_partition_randomly(
request :: %ProduceRequest{},
metadata :: %MetadataResponse{}
) :: %ProduceRequest{}
defp assign_partition_randomly(
%ProduceRequest{topic: topic} = request,
metadata
) do
defp assign_partition_randomly(%ProduceRequest{topic: topic} = request, metadata) do
partition_id =
case MetadataResponse.partitions_for_topic(metadata, topic) do
[] -> 0
Expand All @@ -57,11 +48,6 @@ defmodule KafkaEx.LegacyPartitioner do
%{request | partition: partition_id}
end

@spec assign_partition_with_key(
request :: %ProduceRequest{},
metadata :: %MetadataResponse{},
key :: binary
) :: %ProduceRequest{}
defp assign_partition_with_key(
%ProduceRequest{topic: topic} = request,
metadata,
Expand Down
9 changes: 3 additions & 6 deletions lib/kafka_ex/partitioner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@ defmodule KafkaEx.Partitioner do
alias KafkaEx.Protocol.Produce.Request, as: ProduceRequest
alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse

@callback assign_partition(
request :: %ProduceRequest{},
metadata :: %MetadataResponse{}
) :: %ProduceRequest{}

@callback assign_partition(request :: ProduceRequest.t(), metadata :: MetadataResponse.t()) ::
ProduceRequest.t()
defmacro __using__(_) do
quote location: :keep do
@behaviour KafkaEx.Partitioner
Expand All @@ -24,7 +21,7 @@ defmodule KafkaEx.Partitioner do
either `{:ok, nil}` if no key was found, `{:ok, key}` when key was found,
or `{:error, atom}` when error happens while looking for the key.
"""
@spec get_key(request :: %ProduceRequest{}) ::
@spec get_key(request :: ProduceRequest.t()) ::
{:ok, nil | binary} | {:error, atom}
def get_key(%ProduceRequest{messages: messages}) when length(messages) > 0 do
case unique_keys(messages) do
Expand Down

0 comments on commit f4877ec

Please sign in to comment.