Skip to content

Commit

Permalink
Merge pull request #583 from zmstone/master
Browse files Browse the repository at this point in the history
Test with Kafka 3.6
  • Loading branch information
zmstone authored Jun 18, 2024
2 parents adda7c3 + d5e92b4 commit 40cbc91
Show file tree
Hide file tree
Showing 27 changed files with 265 additions and 220 deletions.
11 changes: 6 additions & 5 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ on:
branches:
- master
env:
OTP_VERSION: "24.1"
REBAR_VERSION: "3.17.0"
OTP_VERSION: "26"
REBAR_VERSION: "3.20.0"

jobs:
lint:
Expand Down Expand Up @@ -44,8 +44,8 @@ jobs:
strategy:
fail-fast: false
matrix:
otp: ["24.1", "23.3.4.7", "22.3.4.21"]
kafka: ["2.4", "1.1", "0.11"]
otp: ["26"]
kafka: ["0.9", "0.10", "0.11", "2.8", "1.1", "3.6"]
steps:
- name: Checkout
uses: actions/checkout@v2
Expand All @@ -69,7 +69,8 @@ jobs:
run: |
export KAFKA_VERSION=${{ matrix.kafka }}
echo "Running Kafka ${KAFKA_VERSION}"
scripts/setup-test-env.sh && rebar3 do ct,eunit
make test-env
make t
- name: Store test logs
uses: actions/upload-artifact@v1
if: always()
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ relx
docker/
TAGS
.vscode/
test/data/ssl/*.pem
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
KAFKA_VERSION ?= 3.6
export KAFKA_VERSION
all: compile

compile:
Expand All @@ -8,6 +10,10 @@ lint:

test-env:
@./scripts/setup-test-env.sh
@mkdir -p ./test/data/ssl
@docker cp kafka-1:/localhost-ca-crt.pem ./test/data/ssl/ca.pem
@docker cp kafka-1:/localhost-client-key.pem ./test/data/ssl/client-key.pem
@docker cp kafka-1:/localhost-client-crt.pem ./test/data/ssl/client-crt.pem

ut:
@rebar3 eunit -v --cover_export_name ut-$(KAFKA_VERSION)
Expand Down
8 changes: 4 additions & 4 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
{relx, [{release, {brod, "i"}, % release the interactive shell as brod-i
[brod, jsone, docopt]},
{include_erts, true},
{overlay, [{copy, "scripts/brod", "bin"},
{copy, "{{lib_dirs}}/crc32cer/priv/crc32cer*.so", "bin"},
{copy, "{{lib_dirs}}/snappyer/priv/snappyer.so", "bin"}
{overlay, [{copy, "scripts/brod", "bin/"},
{copy, "{{lib_dirs}}/crc32cer/priv/crc32cer*.so", "bin/"},
{copy, "{{lib_dirs}}/snappyer/priv/snappyer.so", "bin/"}
]}
]}]},
{test, [
Expand All @@ -28,7 +28,7 @@
, {jsone, "1.7.0"}
, {meck, "0.9.2"}
, {proper, "1.4.0"}
, {snabbkaffe, "1.0.1"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {branch, "1.0.8"}}}
]},
{erl_opts, [warnings_as_errors, {d, build_brod_cli}]}
]}
Expand Down
6 changes: 3 additions & 3 deletions scripts/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ version: "2"

services:
zookeeper:
image: "zmstone/kafka:${KAFKA_VERSION}"
image: "zmstone/kafka:${KAFKA_IMAGE_VERSION}"
container_name: zookeeper
command: run zookeeper
network_mode: host
kafka_1:
depends_on:
- zookeeper
image: "zmstone/kafka:${KAFKA_VERSION}"
image: "zmstone/kafka:${KAFKA_IMAGE_VERSION}"
container_name: "kafka-1"
network_mode: host
environment:
Expand All @@ -23,7 +23,7 @@ services:
kafka_2:
depends_on:
- zookeeper
image: "zmstone/kafka:${KAFKA_VERSION}"
image: "zmstone/kafka:${KAFKA_IMAGE_VERSION}"
container_name: "kafka-2"
network_mode: host
environment:
Expand Down
77 changes: 56 additions & 21 deletions scripts/setup-test-env.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
#!/bin/bash -eu

if [ -n "${DEBUG:-}" ]; then
set -x
fi

docker ps > /dev/null || {
echo "You must be a member of docker group to run this script"
exit 1
Expand All @@ -18,46 +22,77 @@ function docker_compose {
fi
}

VERSION=${KAFKA_VERSION:-2.4}
if [ -z $VERSION ]; then VERSION=$1; fi
KAFKA_VERSION=${KAFKA_VERSION:-3.6}
if [ -z $KAFKA_VERSION ]; then KAFKA_VERSION=$1; fi

case $VERSION in
case $KAFKA_VERSION in
0.9*)
KAFKA_VERSION="0.9";;
0.10*)
VERSION="0.10";;
KAFKA_VERSION="0.10";;
0.11*)
VERSION="0.11";;
KAFKA_VERSION="0.11";;
1.*)
VERSION="1.1";;
KAFKA_VERSION="1.1";;
2.*)
VERSION="2.4";;
KAFKA_VERSION="2.8";;
3.*)
KAFKA_VERSION="3.6";;
*)
VERSION="2.4";;
KAFKA_VERSION="3.6";;
esac

echo "Using KAFKA_VERSION=$VERSION"
export KAFKA_VERSION=$VERSION
export KAFKA_IMAGE_VERSION="1.1-${KAFKA_VERSION}"
echo "env KAFKA_IMAGE_VERSION=$KAFKA_IMAGE_VERSION"

TD="$(cd "$(dirname "$0")" && pwd)"

docker_compose -f $TD/docker-compose.yml down || true
docker_compose -f $TD/docker-compose.yml up -d

if [[ "$KAFKA_VERSION" == 2* ]] || [[ "$KAFKA_VERSION" == 3* ]]; then
MAYBE_ZOOKEEPER="--bootstrap-server localhost:9092"
else
MAYBE_ZOOKEEPER="--zookeeper localhost:2181"
fi

n=0
while [ "$(docker exec kafka-1 bash -c '/opt/kafka/bin/kafka-topics.sh --zookeeper localhost --list')" != '' ]; do
if [ $n -gt 4 ]; then
echo "timeout waiting for kakfa_1"
exit 1
TOPIC_LIST_CMD="/opt/kafka/bin/kafka-topics.sh $MAYBE_ZOOKEEPER --list"
MAX_WAIT_SEC=10

function wait_for_kafka {
local which_kafka="$1"
local n=0
local port=':9092'
local topic_list listener
if [ "$which_kafka" = 'kafka-2' ]; then
port=':9192'
fi
n=$(( n + 1 ))
sleep 1
done
while true; do
listener="$(netstat -tnlp 2>&1 | grep $port || true)"
if [ "$listener" != '' ]; then
topic_list="$(docker exec $which_kafka $TOPIC_LIST_CMD 2>&1)"
if [ "${topic_list-}" = '' ]; then
break
fi
fi
if [ $n -gt $MAX_WAIT_SEC ]; then
echo "timeout waiting for kafka-1"
echo "last print: ${topic_list:-}"
exit 1
fi
n=$(( n + 1 ))
sleep 1
done
}

wait_for_kafka kafka-1
wait_for_kafka kafka-2

function create_topic {
TOPIC_NAME="$1"
PARTITIONS="${2:-1}"
REPLICAS="${3:-1}"
CMD="/opt/kafka/bin/kafka-topics.sh --zookeeper localhost --create --partitions $PARTITIONS --replication-factor $REPLICAS --topic $TOPIC_NAME --config min.insync.replicas=1"
CMD="/opt/kafka/bin/kafka-topics.sh $MAYBE_ZOOKEEPER --create --partitions $PARTITIONS --replication-factor $REPLICAS --topic $TOPIC_NAME --config min.insync.replicas=1"
docker exec kafka-1 bash -c "$CMD"
}

Expand All @@ -80,7 +115,7 @@ create_topic "brod_compression_SUITE"
create_topic "lz4-test"
create_topic "test-topic"

if [[ "$KAFKA_VERSION" = 2* ]]; then
if [[ "$KAFKA_VERSION" = 2* ]] || [[ "$KAFKA_VERSION" = 3* ]]; then
MAYBE_NEW_CONSUMER=""
else
MAYBE_NEW_CONSUMER="--new-consumer"
Expand All @@ -90,5 +125,5 @@ docker exec kafka-1 /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server l

# for kafka 0.11 or later, add sasl-scram test credentials
if [[ "$KAFKA_VERSION" != 0.9* ]] && [[ "$KAFKA_VERSION" != 0.10* ]]; then
docker exec kafka-1 /opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=ecila],SCRAM-SHA-512=[password=ecila]' --entity-type users --entity-name alice
docker exec kafka-1 /opt/kafka/bin/kafka-configs.sh $MAYBE_ZOOKEEPER --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=ecila],SCRAM-SHA-512=[password=ecila]' --entity-type users --entity-name alice
fi
35 changes: 25 additions & 10 deletions src/brod_consumer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ handle_fetch_response(#kpro_rsp{ref = Ref1},
} = State) when Ref1 =/= Ref2 ->
%% Not expected response, discard
{noreply, State};
handle_fetch_response(#kpro_rsp{ref = Ref} = Rsp,
handle_fetch_response(#kpro_rsp{ref = Ref, vsn = Vsn} = Rsp,
#state{ topic = Topic
, partition = Partition
, last_req_ref = Ref
Expand All @@ -472,7 +472,7 @@ handle_fetch_response(#kpro_rsp{ref = Ref} = Rsp,
{ok, #{ header := Header
, batches := Batches
}} ->
handle_batches(Header, Batches, State);
handle_batches(Header, Batches, State, Vsn);
{error, ErrorCode} ->
Error = #kafka_fetch_error{ topic = Topic
, partition = Partition
Expand All @@ -481,7 +481,7 @@ handle_fetch_response(#kpro_rsp{ref = Ref} = Rsp,
handle_fetch_error(Error, State)
end.

handle_batches(?undef, [], #state{} = State0) ->
handle_batches(?undef, [], #state{} = State0, _Vsn) ->
%% It is only possible to end up here in a incremental
%% fetch session, empty fetch response implies no
%% new messages to fetch, and no changes in partition
Expand All @@ -491,25 +491,38 @@ handle_batches(?undef, [], #state{} = State0) ->
State = maybe_delay_fetch_request(State0),
{noreply, State};
handle_batches(_Header, ?incomplete_batch(Size),
#state{max_bytes = MaxBytes} = State0) ->
#state{max_bytes = MaxBytes} = State0, _Vsn) ->
%% max_bytes is too small to fetch ONE complete batch
true = Size > MaxBytes, %% assert
State1 = State0#state{max_bytes = Size},
State = maybe_send_fetch_request(State1),
{noreply, State};
handle_batches(Header, [], #state{begin_offset = BeginOffset} = State0) ->
handle_batches(Header, [], #state{begin_offset = BeginOffset} = State0, Vsn) ->
StableOffset = brod_utils:get_stable_offset(Header),
State =
case BeginOffset < StableOffset of
true ->
%% There are chances that kafka may return empty message set
true when Vsn > 0 ->
%% There are chances that Kafka may return empty message set
%% when messages are deleted from a compacted topic.
%% Since there is no way to know how big the 'hole' is
%% we can only bump begin_offset with +1 and try again.
?BROD_LOG_WARNING("~s-~p empty_batch_detected_at_offset=~p, "
"fetch_api_vsn=~p, skip_to_offset=~p",
[State0#state.topic,
State0#state.partition,
BeginOffset,
BeginOffset + 1
]),
State1 = State0#state{begin_offset = BeginOffset + 1},
maybe_send_fetch_request(State1);
true ->
%% Fetch API v0 (Kafka 0.9 and 0.10) seems to have a race condition:
%% Kafka returns empty batch even if BeginOffset is lower than high-watermark
%% if fetch request is sent in a tight loop
%% Retry seems to resolve the issue
maybe_delay_fetch_request(State0);
false ->
%% we have either reached the end of a partition
%% We have either reached the end of a partition
%% or trying to read uncommitted messages
%% try to poll again (maybe after a delay)
maybe_delay_fetch_request(State0)
Expand All @@ -521,7 +534,7 @@ handle_batches(Header, Batches,
, begin_offset = BeginOffset
, topic = Topic
, partition = Partition
} = State0) ->
} = State0, _Vsn) ->
StableOffset = brod_utils:get_stable_offset(Header),
{NewBeginOffset, Messages} =
brod_utils:flatten_batches(BeginOffset, Header, Batches),
Expand Down Expand Up @@ -721,14 +734,16 @@ send_fetch_request(#state{ begin_offset = BeginOffset
} = State) ->
(is_integer(BeginOffset) andalso BeginOffset >= 0) orelse
erlang:error({bad_begin_offset, BeginOffset}),
%% MaxBytes=0 will make no progress when it's Kafka 0.9
MaxBytes = max(12, State#state.max_bytes),
Request =
brod_kafka_request:fetch(Connection,
State#state.topic,
State#state.partition,
State#state.begin_offset,
State#state.max_wait_time,
State#state.min_bytes,
State#state.max_bytes,
MaxBytes,
State#state.isolation_level),
case kpro:request_async(Connection, Request) of
ok ->
Expand Down
21 changes: 17 additions & 4 deletions src/brod_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -453,12 +453,25 @@ fetch(Conn, ReqFun, Offset, MaxBytes) ->
fetch(Conn, ReqFun, Offset, Size);
{ok, #{header := Header, batches := Batches}} ->
StableOffset = get_stable_offset(Header),
{NewBeginOffset, Msgs} = flatten_batches(Offset, Header, Batches),
{NewBeginOffset0, Msgs} = flatten_batches(Offset, Header, Batches),
case Offset < StableOffset andalso Msgs =:= [] of
true ->
%% Not reached the latest stable offset yet,
%% but received an empty batch-set (all messages are dropped).
%% try again with new begin-offset
NewBeginOffset =
case NewBeginOffset0 > Offset of
true ->
%% Not reached the latest stable offset yet,
%% but resulted in an empty batch-set,
%% i.e. all messages are dropped due to they are before
%% the last fetch Offset.
%% try again with new begin-offset.
NewBeginOffset0;
false when NewBeginOffset0 =:= Offset ->
%% There are chances that Kafka may return empty message set
%% when messages are deleted from a compacted topic.
%% Since there is no way to know how big the 'hole' is
%% we can only bump begin_offset with +1 and try again.
NewBeginOffset0 + 1
end,
fetch(Conn, ReqFun, NewBeginOffset, MaxBytes);
false ->
{ok, {StableOffset, Msgs}}
Expand Down
9 changes: 5 additions & 4 deletions test/brod_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@
suite() -> [{timetrap, {minutes, 5}}].

init_per_suite(Config) ->
case os:getenv("KAFKA_VERSION") of
"0.9" -> {skip,
"The given Kafka test image does not have support for these apis"};
_ -> Config
case kafka_test_helper:kafka_version() of
{0, 9} ->
{skip, "no_topic_manaegment_apis"};
_ ->
Config
end.

end_per_suite(_Config) ->
Expand Down
8 changes: 7 additions & 1 deletion test/brod_cli_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,13 @@ get_kafka_version() ->
{list_to_integer(Major), list_to_integer(Minor)}
end.

run(Args) ->
run(Args0) ->
Args = case kafka_test_helper:kafka_version() of
{0, Minor} when Minor < 11 ->
Args0 ++ ["--no-api-vsn-query"];
_ ->
Args0
end,
_ = cmd(Args),
ok.

Expand Down
5 changes: 1 addition & 4 deletions test/brod_compression_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,7 @@ start_client(Hosts, ClientId) ->
brod:start_client(Hosts, ClientId, Config).

client_config() ->
case os:getenv("KAFKA_VERSION") of
"0.9" ++ _ -> [{query_api_versions, false}];
_ -> []
end.
kafka_test_helper:client_config().

%%%_* Emacs ====================================================================
%%% Local Variables:
Expand Down
Loading

0 comments on commit 40cbc91

Please sign in to comment.