diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 1ab8ffd0..209da873 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -7,8 +7,8 @@ on: branches: - master env: - OTP_VERSION: "24.1" - REBAR_VERSION: "3.17.0" + OTP_VERSION: "26" + REBAR_VERSION: "3.20.0" jobs: lint: @@ -44,8 +44,8 @@ jobs: strategy: fail-fast: false matrix: - otp: ["24.1", "23.3.4.7", "22.3.4.21"] - kafka: ["2.4", "1.1", "0.11"] + otp: ["26"] + kafka: ["0.9", "0.10", "0.11", "2.8", "1.1", "3.6"] steps: - name: Checkout uses: actions/checkout@v2 @@ -69,7 +69,8 @@ jobs: run: | export KAFKA_VERSION=${{ matrix.kafka }} echo "Running Kafka ${KAFKA_VERSION}" - scripts/setup-test-env.sh && rebar3 do ct,eunit + make test-env + make t - name: Store test logs uses: actions/upload-artifact@v1 if: always() diff --git a/.gitignore b/.gitignore index 68fece43..0e07c25f 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,4 @@ relx docker/ TAGS .vscode/ +test/data/ssl/*.pem diff --git a/Makefile b/Makefile index 426e39a2..7b7d6db9 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,5 @@ +KAFKA_VERSION ?= 3.6 +export KAFKA_VERSION all: compile compile: @@ -8,6 +10,10 @@ lint: test-env: @./scripts/setup-test-env.sh + @mkdir -p ./test/data/ssl + @docker cp kafka-1:/localhost-ca-crt.pem ./test/data/ssl/ca.pem + @docker cp kafka-1:/localhost-client-key.pem ./test/data/ssl/client-key.pem + @docker cp kafka-1:/localhost-client-crt.pem ./test/data/ssl/client-crt.pem ut: @rebar3 eunit -v --cover_export_name ut-$(KAFKA_VERSION) diff --git a/rebar.config b/rebar.config index 4637d1e4..d6a68551 100644 --- a/rebar.config +++ b/rebar.config @@ -17,9 +17,9 @@ {relx, [{release, {brod, "i"}, % release the interactive shell as brod-i [brod, jsone, docopt]}, {include_erts, true}, - {overlay, [{copy, "scripts/brod", "bin"}, - {copy, "{{lib_dirs}}/crc32cer/priv/crc32cer*.so", "bin"}, - {copy, "{{lib_dirs}}/snappyer/priv/snappyer.so", "bin"} + {overlay, [{copy, "scripts/brod", "bin/"}, + {copy, "{{lib_dirs}}/crc32cer/priv/crc32cer*.so", "bin/"}, + {copy, "{{lib_dirs}}/snappyer/priv/snappyer.so", "bin/"} ]} ]}]}, {test, [ @@ -28,7 +28,7 @@ , {jsone, "1.7.0"} , {meck, "0.9.2"} , {proper, "1.4.0"} - , {snabbkaffe, "1.0.1"} + , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {branch, "1.0.8"}}} ]}, {erl_opts, [warnings_as_errors, {d, build_brod_cli}]} ]} diff --git a/scripts/docker-compose.yml b/scripts/docker-compose.yml index 34d28561..45839f99 100644 --- a/scripts/docker-compose.yml +++ b/scripts/docker-compose.yml @@ -2,14 +2,14 @@ version: "2" services: zookeeper: - image: "zmstone/kafka:${KAFKA_VERSION}" + image: "zmstone/kafka:${KAFKA_IMAGE_VERSION}" container_name: zookeeper command: run zookeeper network_mode: host kafka_1: depends_on: - zookeeper - image: "zmstone/kafka:${KAFKA_VERSION}" + image: "zmstone/kafka:${KAFKA_IMAGE_VERSION}" container_name: "kafka-1" network_mode: host environment: @@ -23,7 +23,7 @@ services: kafka_2: depends_on: - zookeeper - image: "zmstone/kafka:${KAFKA_VERSION}" + image: "zmstone/kafka:${KAFKA_IMAGE_VERSION}" container_name: "kafka-2" network_mode: host environment: diff --git a/scripts/setup-test-env.sh b/scripts/setup-test-env.sh index b877ef2f..25264fd1 100755 --- a/scripts/setup-test-env.sh +++ b/scripts/setup-test-env.sh @@ -1,5 +1,9 @@ #!/bin/bash -eu +if [ -n "${DEBUG:-}" ]; then + set -x +fi + docker ps > /dev/null || { echo "You must be a member of docker group to run this script" exit 1 @@ -18,46 +22,77 @@ function docker_compose { fi } -VERSION=${KAFKA_VERSION:-2.4} -if [ -z $VERSION ]; then VERSION=$1; fi +KAFKA_VERSION=${KAFKA_VERSION:-3.6} +if [ -z $KAFKA_VERSION ]; then KAFKA_VERSION=$1; fi -case $VERSION in +case $KAFKA_VERSION in + 0.9*) + KAFKA_VERSION="0.9";; 0.10*) - VERSION="0.10";; + KAFKA_VERSION="0.10";; 0.11*) - VERSION="0.11";; + KAFKA_VERSION="0.11";; 1.*) - VERSION="1.1";; + KAFKA_VERSION="1.1";; 2.*) - VERSION="2.4";; + KAFKA_VERSION="2.8";; + 3.*) + KAFKA_VERSION="3.6";; *) - VERSION="2.4";; + KAFKA_VERSION="3.6";; esac -echo "Using KAFKA_VERSION=$VERSION" -export KAFKA_VERSION=$VERSION +export KAFKA_IMAGE_VERSION="1.1-${KAFKA_VERSION}" +echo "env KAFKA_IMAGE_VERSION=$KAFKA_IMAGE_VERSION" TD="$(cd "$(dirname "$0")" && pwd)" docker_compose -f $TD/docker-compose.yml down || true docker_compose -f $TD/docker-compose.yml up -d +if [[ "$KAFKA_VERSION" == 2* ]] || [[ "$KAFKA_VERSION" == 3* ]]; then + MAYBE_ZOOKEEPER="--bootstrap-server localhost:9092" +else + MAYBE_ZOOKEEPER="--zookeeper localhost:2181" +fi -n=0 -while [ "$(docker exec kafka-1 bash -c '/opt/kafka/bin/kafka-topics.sh --zookeeper localhost --list')" != '' ]; do - if [ $n -gt 4 ]; then - echo "timeout waiting for kakfa_1" - exit 1 +TOPIC_LIST_CMD="/opt/kafka/bin/kafka-topics.sh $MAYBE_ZOOKEEPER --list" +MAX_WAIT_SEC=10 + +function wait_for_kafka { + local which_kafka="$1" + local n=0 + local port=':9092' + local topic_list listener + if [ "$which_kafka" = 'kafka-2' ]; then + port=':9192' fi - n=$(( n + 1 )) - sleep 1 -done + while true; do + listener="$(netstat -tnlp 2>&1 | grep $port || true)" + if [ "$listener" != '' ]; then + topic_list="$(docker exec $which_kafka $TOPIC_LIST_CMD 2>&1)" + if [ "${topic_list-}" = '' ]; then + break + fi + fi + if [ $n -gt $MAX_WAIT_SEC ]; then + echo "timeout waiting for kafka-1" + echo "last print: ${topic_list:-}" + exit 1 + fi + n=$(( n + 1 )) + sleep 1 + done +} + +wait_for_kafka kafka-1 +wait_for_kafka kafka-2 function create_topic { TOPIC_NAME="$1" PARTITIONS="${2:-1}" REPLICAS="${3:-1}" - CMD="/opt/kafka/bin/kafka-topics.sh --zookeeper localhost --create --partitions $PARTITIONS --replication-factor $REPLICAS --topic $TOPIC_NAME --config min.insync.replicas=1" + CMD="/opt/kafka/bin/kafka-topics.sh $MAYBE_ZOOKEEPER --create --partitions $PARTITIONS --replication-factor $REPLICAS --topic $TOPIC_NAME --config min.insync.replicas=1" docker exec kafka-1 bash -c "$CMD" } @@ -80,7 +115,7 @@ create_topic "brod_compression_SUITE" create_topic "lz4-test" create_topic "test-topic" -if [[ "$KAFKA_VERSION" = 2* ]]; then +if [[ "$KAFKA_VERSION" = 2* ]] || [[ "$KAFKA_VERSION" = 3* ]]; then MAYBE_NEW_CONSUMER="" else MAYBE_NEW_CONSUMER="--new-consumer" @@ -90,5 +125,5 @@ docker exec kafka-1 /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server l # for kafka 0.11 or later, add sasl-scram test credentials if [[ "$KAFKA_VERSION" != 0.9* ]] && [[ "$KAFKA_VERSION" != 0.10* ]]; then - docker exec kafka-1 /opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=ecila],SCRAM-SHA-512=[password=ecila]' --entity-type users --entity-name alice + docker exec kafka-1 /opt/kafka/bin/kafka-configs.sh $MAYBE_ZOOKEEPER --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=ecila],SCRAM-SHA-512=[password=ecila]' --entity-type users --entity-name alice fi diff --git a/src/brod_consumer.erl b/src/brod_consumer.erl index 37cb1756..a4c39d5c 100644 --- a/src/brod_consumer.erl +++ b/src/brod_consumer.erl @@ -462,7 +462,7 @@ handle_fetch_response(#kpro_rsp{ref = Ref1}, } = State) when Ref1 =/= Ref2 -> %% Not expected response, discard {noreply, State}; -handle_fetch_response(#kpro_rsp{ref = Ref} = Rsp, +handle_fetch_response(#kpro_rsp{ref = Ref, vsn = Vsn} = Rsp, #state{ topic = Topic , partition = Partition , last_req_ref = Ref @@ -472,7 +472,7 @@ handle_fetch_response(#kpro_rsp{ref = Ref} = Rsp, {ok, #{ header := Header , batches := Batches }} -> - handle_batches(Header, Batches, State); + handle_batches(Header, Batches, State, Vsn); {error, ErrorCode} -> Error = #kafka_fetch_error{ topic = Topic , partition = Partition @@ -481,7 +481,7 @@ handle_fetch_response(#kpro_rsp{ref = Ref} = Rsp, handle_fetch_error(Error, State) end. -handle_batches(?undef, [], #state{} = State0) -> +handle_batches(?undef, [], #state{} = State0, _Vsn) -> %% It is only possible to end up here in a incremental %% fetch session, empty fetch response implies no %% new messages to fetch, and no changes in partition @@ -491,25 +491,38 @@ handle_batches(?undef, [], #state{} = State0) -> State = maybe_delay_fetch_request(State0), {noreply, State}; handle_batches(_Header, ?incomplete_batch(Size), - #state{max_bytes = MaxBytes} = State0) -> + #state{max_bytes = MaxBytes} = State0, _Vsn) -> %% max_bytes is too small to fetch ONE complete batch true = Size > MaxBytes, %% assert State1 = State0#state{max_bytes = Size}, State = maybe_send_fetch_request(State1), {noreply, State}; -handle_batches(Header, [], #state{begin_offset = BeginOffset} = State0) -> +handle_batches(Header, [], #state{begin_offset = BeginOffset} = State0, Vsn) -> StableOffset = brod_utils:get_stable_offset(Header), State = case BeginOffset < StableOffset of - true -> - %% There are chances that kafka may return empty message set + true when Vsn > 0 -> + %% There are chances that Kafka may return empty message set %% when messages are deleted from a compacted topic. %% Since there is no way to know how big the 'hole' is %% we can only bump begin_offset with +1 and try again. + ?BROD_LOG_WARNING("~s-~p empty_batch_detected_at_offset=~p, " + "fetch_api_vsn=~p, skip_to_offset=~p", + [State0#state.topic, + State0#state.partition, + BeginOffset, + BeginOffset + 1 + ]), State1 = State0#state{begin_offset = BeginOffset + 1}, maybe_send_fetch_request(State1); + true -> + %% Fetch API v0 (Kafka 0.9 and 0.10) seems to have a race condition: + %% Kafka returns empty batch even if BeginOffset is lower than high-watermark + %% if fetch request is sent in a tight loop + %% Retry seems to resolve the issue + maybe_delay_fetch_request(State0); false -> - %% we have either reached the end of a partition + %% We have either reached the end of a partition %% or trying to read uncommitted messages %% try to poll again (maybe after a delay) maybe_delay_fetch_request(State0) @@ -521,7 +534,7 @@ handle_batches(Header, Batches, , begin_offset = BeginOffset , topic = Topic , partition = Partition - } = State0) -> + } = State0, _Vsn) -> StableOffset = brod_utils:get_stable_offset(Header), {NewBeginOffset, Messages} = brod_utils:flatten_batches(BeginOffset, Header, Batches), @@ -721,6 +734,8 @@ send_fetch_request(#state{ begin_offset = BeginOffset } = State) -> (is_integer(BeginOffset) andalso BeginOffset >= 0) orelse erlang:error({bad_begin_offset, BeginOffset}), + %% MaxBytes=0 will make no progress when it's Kafka 0.9 + MaxBytes = max(12, State#state.max_bytes), Request = brod_kafka_request:fetch(Connection, State#state.topic, @@ -728,7 +743,7 @@ send_fetch_request(#state{ begin_offset = BeginOffset State#state.begin_offset, State#state.max_wait_time, State#state.min_bytes, - State#state.max_bytes, + MaxBytes, State#state.isolation_level), case kpro:request_async(Connection, Request) of ok -> diff --git a/src/brod_utils.erl b/src/brod_utils.erl index 61113519..f7732a1a 100644 --- a/src/brod_utils.erl +++ b/src/brod_utils.erl @@ -453,12 +453,25 @@ fetch(Conn, ReqFun, Offset, MaxBytes) -> fetch(Conn, ReqFun, Offset, Size); {ok, #{header := Header, batches := Batches}} -> StableOffset = get_stable_offset(Header), - {NewBeginOffset, Msgs} = flatten_batches(Offset, Header, Batches), + {NewBeginOffset0, Msgs} = flatten_batches(Offset, Header, Batches), case Offset < StableOffset andalso Msgs =:= [] of true -> - %% Not reached the latest stable offset yet, - %% but received an empty batch-set (all messages are dropped). - %% try again with new begin-offset + NewBeginOffset = + case NewBeginOffset0 > Offset of + true -> + %% Not reached the latest stable offset yet, + %% but resulted in an empty batch-set, + %% i.e. all messages are dropped due to they are before + %% the last fetch Offset. + %% try again with new begin-offset. + NewBeginOffset0; + false when NewBeginOffset0 =:= Offset -> + %% There are chances that Kafka may return empty message set + %% when messages are deleted from a compacted topic. + %% Since there is no way to know how big the 'hole' is + %% we can only bump begin_offset with +1 and try again. + NewBeginOffset0 + 1 + end, fetch(Conn, ReqFun, NewBeginOffset, MaxBytes); false -> {ok, {StableOffset, Msgs}} diff --git a/test/brod_SUITE.erl b/test/brod_SUITE.erl index 0ea476be..a7f0bb51 100644 --- a/test/brod_SUITE.erl +++ b/test/brod_SUITE.erl @@ -41,10 +41,11 @@ suite() -> [{timetrap, {minutes, 5}}]. init_per_suite(Config) -> - case os:getenv("KAFKA_VERSION") of - "0.9" -> {skip, - "The given Kafka test image does not have support for these apis"}; - _ -> Config + case kafka_test_helper:kafka_version() of + {0, 9} -> + {skip, "no_topic_manaegment_apis"}; + _ -> + Config end. end_per_suite(_Config) -> diff --git a/test/brod_cli_tests.erl b/test/brod_cli_tests.erl index 4d382fb2..014c9e1f 100644 --- a/test/brod_cli_tests.erl +++ b/test/brod_cli_tests.erl @@ -180,7 +180,13 @@ get_kafka_version() -> {list_to_integer(Major), list_to_integer(Minor)} end. -run(Args) -> +run(Args0) -> + Args = case kafka_test_helper:kafka_version() of + {0, Minor} when Minor < 11 -> + Args0 ++ ["--no-api-vsn-query"]; + _ -> + Args0 + end, _ = cmd(Args), ok. diff --git a/test/brod_compression_SUITE.erl b/test/brod_compression_SUITE.erl index ac8b9122..b51540b9 100644 --- a/test/brod_compression_SUITE.erl +++ b/test/brod_compression_SUITE.erl @@ -201,10 +201,7 @@ start_client(Hosts, ClientId) -> brod:start_client(Hosts, ClientId, Config). client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end. + kafka_test_helper:client_config(). %%%_* Emacs ==================================================================== %%% Local Variables: diff --git a/test/brod_consumer_SUITE.erl b/test/brod_consumer_SUITE.erl index 4c60a4a0..93774d1a 100644 --- a/test/brod_consumer_SUITE.erl +++ b/test/brod_consumer_SUITE.erl @@ -76,6 +76,17 @@ end end()). +-define(RETRY(EXPR, Timeout), + retry( + fun() -> + case EXPR of + {ok, R} -> + {ok, R}; + {error, _} -> + false + end + end, Timeout)). + %%%_* ct callbacks ============================================================= suite() -> [{timetrap, {seconds, 30}}]. @@ -104,11 +115,7 @@ init_per_testcase(Case, Config0) -> end, ClientConfig1 = proplists:get_value(client_config, Config, []), brod:stop_client(Client), - ClientConfig = - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end ++ ClientConfig0 ++ ClientConfig1, + ClientConfig = kafka_test_helper:client_config() ++ ClientConfig0 ++ ClientConfig1, ok = brod:start_client(?HOSTS, Client, ClientConfig), ok = brod:start_producer(Client, Topic, ProducerConfig), try ?MODULE:Case(standalone_consumer) of @@ -140,12 +147,21 @@ end_per_testcase(Case, Config) -> ok end. -all() -> [F || {F, _A} <- module_info(exports), +all() -> + Cases = [F || {F, _A} <- module_info(exports), case atom_to_list(F) of "t_" ++ _ -> true; _ -> false - end]. - + end], + Filter = fun(Case) -> + try + ?MODULE:Case(kafka_version_match) + catch + _:_ -> + true + end + end, + lists:filter(Filter, Cases). %%%_* Test functions =========================================================== @@ -153,14 +169,11 @@ all() -> [F || {F, _A} <- module_info(exports), %% messages fetched back should only contain the committed message %% i.e. aborted messages (testing with isolation_level=read_committed) %% should be dropped, control messages (transaction abort) should be dropped +t_drop_aborted(kafka_version_match) -> + has_txn(); t_drop_aborted(Config) when is_list(Config) -> - case has_txn() of - true -> - test_drop_aborted(Config, true), - test_drop_aborted(Config, false); - false -> - ok - end. + test_drop_aborted(Config, true), + test_drop_aborted(Config, false). %% When QueryApiVsn is set to false, %% brod will use lowest supported API version. @@ -173,7 +186,7 @@ test_drop_aborted(Config, QueryApiVsn) -> fun(CommitOrAbort) -> TxnId = make_transactional_id(), {ok, Conn} = connect_txn_coordinator(TxnId, ?config(client_config)), - {ok, TxnCtx} = kpro:txn_init_ctx(Conn, TxnId), + {ok, TxnCtx} = ?RETRY(kpro:txn_init_ctx(Conn, TxnId), 10), ok = kpro:txn_send_partitions(TxnCtx, [{Topic, Partition}]), Key = bin([atom_to_list(CommitOrAbort), "-", make_unique_key()]), Vsn = 3, %% lowest API version which supports transactional produce @@ -217,11 +230,10 @@ test_drop_aborted(Config, QueryApiVsn) -> ], Msgs) end. +t_wait_for_unstable_offsets(kafka_version_match) -> + has_txn(); t_wait_for_unstable_offsets(Config) when is_list(Config) -> - case has_txn() of - true -> t_wait_for_unstable_offsets({run, Config}); - false -> ok - end; + t_wait_for_unstable_offsets({run, Config}); t_wait_for_unstable_offsets({run, Config}) -> Client = ?config(client), Topic = ?TOPIC, @@ -230,7 +242,7 @@ t_wait_for_unstable_offsets({run, Config}) -> {ok, Conn} = connect_txn_coordinator(TxnId, ?config(client_config)), %% ensure we have enough time to test before expire TxnOpts = #{txn_timeout => timer:seconds(30)}, - {ok, TxnCtx} = kpro:txn_init_ctx(Conn, TxnId, TxnOpts), + {ok, TxnCtx} = ?RETRY(kpro:txn_init_ctx(Conn, TxnId, TxnOpts), 10), ok = kpro:txn_send_partitions(TxnCtx, [{Topic, Partition}]), %% Send one message in this transaction, return the offset in kafka ProduceFun = @@ -278,13 +290,10 @@ t_wait_for_unstable_offsets({run, Config}) -> %% Produce large(-ish) transactional batches, then abort them all %% try fetch from offsets in the middle of large batches, %% expect no delivery of any aborted batches. +t_fetch_aborted_from_the_middle(kafka_version_match) -> + has_txn(); t_fetch_aborted_from_the_middle(Config) when is_list(Config) -> - case has_txn() of - true -> - test_fetch_aborted_from_the_middle(Config); - false -> - ok - end. + test_fetch_aborted_from_the_middle(Config). test_fetch_aborted_from_the_middle(Config) when is_list(Config) -> Client = ?config(client), @@ -292,7 +301,7 @@ test_fetch_aborted_from_the_middle(Config) when is_list(Config) -> Partition = 0, TxnId = make_transactional_id(), {ok, Conn} = connect_txn_coordinator(TxnId, ?config(client_config)), - {ok, TxnCtx} = kpro:txn_init_ctx(Conn, TxnId), + {ok, TxnCtx} = ?RETRY(kpro:txn_init_ctx(Conn, TxnId), 10), ok = kpro:txn_send_partitions(TxnCtx, [{Topic, Partition}]), %% make a large-ish message MkMsg = fun(Key) -> @@ -413,6 +422,12 @@ t_fold(Config) when is_list(Config) -> 0, ErrorFoldF, #{})), ok. +%% This test case does not work with Kafka 0.9, not sure aobut 0.10 and 0.11 +%% since all 0.x versions are old enough, we only try to verify this against +%% 1.x or newer +t_direct_fetch_with_small_max_bytes(kafka_version_match) -> + {Major, _Minor} = kafka_test_helper:kafka_version(), + Major > 1; t_direct_fetch_with_small_max_bytes(Config) when is_list(Config) -> Client = ?config(client), Topic = ?TOPIC, @@ -428,6 +443,11 @@ t_direct_fetch_with_small_max_bytes(Config) when is_list(Config) -> ?assertEqual(Key, Msg#kafka_message.key), ok. +%% Starting from version 3, Kafka no longer returns incomplete batch +%% for Fetch request v0, cannot test max_bytes expansion anymore. +t_direct_fetch_expand_max_bytes(kafka_version_match) -> + {Major, _Minor} = kafka_test_helper:kafka_version(), + Major < 3; t_direct_fetch_expand_max_bytes({init, Config}) when is_list(Config) -> %% kafka returns empty message set when it's 0.9 %% or when fetch request sent was version 0 @@ -441,7 +461,7 @@ t_direct_fetch_expand_max_bytes(Config) when is_list(Config) -> Value = crypto:strong_rand_bytes(100), ok = brod:produce_sync(Client, ?TOPIC, Partition, Key, Value), {ok, Offset} = brod:resolve_offset(?HOSTS, Topic, Partition, - ?OFFSET_LATEST, ?config(client_config)), + ?OFFSET_LATEST, ?config(client_config)), {ok, {_, [Msg]}} = brod:fetch({?HOSTS, ?config(client_config)}, Topic, Partition, Offset - 1, #{max_bytes => 13}), @@ -450,6 +470,9 @@ t_direct_fetch_expand_max_bytes(Config) when is_list(Config) -> %% @doc Consumer should be smart enough to try greater max_bytes %% when it's not great enough to fetch one single message +t_consumer_max_bytes_too_small(kafka_version_match) -> + {Major, _Minor} = kafka_test_helper:kafka_version(), + Major < 3; t_consumer_max_bytes_too_small({init, Config}) -> meck:new(brod_kafka_request, [passthrough, no_passthrough_cover, no_history]), %% kafka returns empty message set when it's 0.9 @@ -464,9 +487,8 @@ t_consumer_max_bytes_too_small(Config) -> brod:unsubscribe(Client, ?TOPIC, Partition), Key = make_unique_key(), ValueBytes = 2000, - MaxBytes1 = 8, %% too small for even the header - MaxBytes2 = 12, %% too small but message size is fetched - MaxBytes3 = size(Key) + ValueBytes, + MaxBytes1 = 12, %% too small for even the header + MaxBytes2 = size(Key) + ValueBytes, Tester = self(), F = fun(Conn, Topic, Partition1, BeginOffset, MaxWait, MinBytes, MaxBytes, IsolationLevel) -> @@ -482,8 +504,7 @@ t_consumer_max_bytes_too_small(Config) -> brod:subscribe(Client, self(), ?TOPIC, Partition, Options), ok = brod:produce_sync(Client, ?TOPIC, Partition, Key, Value), ok = wait_for_max_bytes_sequence([{'=', MaxBytes1}, - {'=', MaxBytes2}, - {'>', MaxBytes3}], + {'>', MaxBytes2}], _TriedCount = 0), ?WAIT_ONLY( {ConsumerPid, #kafka_message_set{messages = Messages}}, @@ -843,7 +864,9 @@ wait_for_max_bytes_sequence([{Compare, MaxBytes} | Rest] = Waiting, Cnt) -> wait_for_max_bytes_sequence(Waiting, Cnt + 1); _ -> ct:fail("unexpected ~p, expecting ~p", [Bytes, {Compare, MaxBytes}]) - end + end; + Other -> + error(Other) after 3000 -> ct:fail("timeout", []) @@ -875,13 +898,6 @@ connect_txn_coordinator(TxnId, Config, RetriesLeft, _LastError) -> connect_txn_coordinator(TxnId, Config, RetriesLeft - 1, Reason) end. -has_txn() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> false; - "0.10" ++ _ -> false; - _ -> true - end. - consumer_config() -> [{max_wait_time, 1000}, {sleep_timeout, 10}]. retry(_F, 0) -> error(timeout); @@ -901,6 +917,14 @@ wait_for_consumer_connection(Consumer, OldConn) -> end, retry(F, 5). +has_txn() -> + case kafka_test_helper:kafka_version() of + {0, Minor} when Minor < 11 -> + false; + _ -> + true + end. + %%%_* Emacs ==================================================================== %%% Local Variables: %%% allout-layout: t diff --git a/test/brod_demo_group_subscriber_koc.erl b/test/brod_demo_group_subscriber_koc.erl index 49407ea6..5680c780 100644 --- a/test/brod_demo_group_subscriber_koc.erl +++ b/test/brod_demo_group_subscriber_koc.erl @@ -218,10 +218,7 @@ os_time_utc_str() -> lists:flatten(S). client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end. + kafka_test_helper:client_config(). %%%_* Emacs ==================================================================== %%% Local Variables: diff --git a/test/brod_demo_group_subscriber_loc.erl b/test/brod_demo_group_subscriber_loc.erl index 39f5d543..b9f7539e 100644 --- a/test/brod_demo_group_subscriber_loc.erl +++ b/test/brod_demo_group_subscriber_loc.erl @@ -235,10 +235,7 @@ os_time_utc_str() -> lists:flatten(S). client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end. + kafka_test_helper:client_config(). %%%_* Emacs ==================================================================== %%% Local Variables: diff --git a/test/brod_demo_topic_subscriber.erl b/test/brod_demo_topic_subscriber.erl index 77de8500..e49d5b73 100644 --- a/test/brod_demo_topic_subscriber.erl +++ b/test/brod_demo_topic_subscriber.erl @@ -177,10 +177,7 @@ os_time_utc_str() -> lists:flatten(S). client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end. + kafka_test_helper:client_config(). commit_dir(Topic) -> filename:join(["/tmp", Topic]). diff --git a/test/brod_group_coordinator_SUITE.erl b/test/brod_group_coordinator_SUITE.erl index ace01079..01a3e47f 100644 --- a/test/brod_group_coordinator_SUITE.erl +++ b/test/brod_group_coordinator_SUITE.erl @@ -73,10 +73,7 @@ common_end_per_testcase(_Case, Config) when is_list(Config) -> ok = application:stop(brod). client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end. + kafka_test_helper:client_config(). %%%_* Group coordinator callbacks ============================================== diff --git a/test/brod_group_subscriber_SUITE.erl b/test/brod_group_subscriber_SUITE.erl index d55dea72..7ad69d14 100644 --- a/test/brod_group_subscriber_SUITE.erl +++ b/test/brod_group_subscriber_SUITE.erl @@ -550,6 +550,7 @@ t_async_commit({init, Config}) -> integer_to_list(rand:uniform(1000000000))), [{group_id, GroupId} | Config]; t_async_commit(Config) when is_list(Config) -> + GroupId = ?config(group_id), Behavior = ?config(behavior), Topic = ?topic, Partition = 0, @@ -562,7 +563,7 @@ t_async_commit(Config) when is_list(Config) -> , {partition_restart_delay_seconds, 1} , {begin_offset, Offset} ], - {ok, SubscriberPid} = start_subscriber(?group_id, Config, [Topic], + {ok, SubscriberPid} = start_subscriber(GroupId, Config, [Topic], GroupConfig, ConsumerConfig, InitArgs), SubscriberPid diff --git a/test/brod_offset_txn_SUITE.erl b/test/brod_offset_txn_SUITE.erl index 5565a8a4..aee57168 100644 --- a/test/brod_offset_txn_SUITE.erl +++ b/test/brod_offset_txn_SUITE.erl @@ -36,8 +36,13 @@ suite() -> [{timetrap, {seconds, 30}}]. init_per_suite(Config) -> - {ok, _} = application:ensure_all_started(brod), - Config. + case kafka_test_helper:kafka_version() of + {0, Minor} when Minor < 11 -> + {skip, "no_transaction"}; + _ -> + {ok, _} = application:ensure_all_started(brod), + Config + end. end_per_suite(_Config) -> ok. @@ -72,17 +77,14 @@ end_per_testcase(_Case, Config) -> end, Config. -all() -> [F || {F, _A} <- module_info(exports), +all() -> + [F || {F, _A} <- module_info(exports), case atom_to_list(F) of "t_" ++ _ -> true; _ -> false end]. -client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end. +client_config() -> kafka_test_helper:client_config(). init(GroupId, #{ client := Client diff --git a/test/brod_producer_SUITE.erl b/test/brod_producer_SUITE.erl index 558340f4..40004da3 100644 --- a/test/brod_producer_SUITE.erl +++ b/test/brod_producer_SUITE.erl @@ -452,10 +452,7 @@ t_configure_produce_api_vsn(Config) when is_list(Config) -> %%%_* Help functions =========================================================== client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end. + kafka_test_helper:client_config(). %% os:timestamp should be unique enough for testing make_unique_kv() -> diff --git a/test/brod_topic_subscriber_SUITE.erl b/test/brod_topic_subscriber_SUITE.erl index 49f72e91..db61189d 100644 --- a/test/brod_topic_subscriber_SUITE.erl +++ b/test/brod_topic_subscriber_SUITE.erl @@ -113,7 +113,7 @@ handle_info({ack_offset, Partition, Offset} = Msg, #state{ counter = Counter , worker_id = Ref } = State0) -> %% Participate in state continuity checks - ?tp(topic_subscriber_seen_info, + ?tp(topic_subscriber_seen_info, #{ partition => Partition , offset => Offset , msg => Msg @@ -211,7 +211,7 @@ t_consumer_ack_via_message_passing(Config) when is_list(Config) -> Partition = 0, SendFun = fun(I) -> - produce({?topic, Partition}, <>) + produce({?topic, Partition}, integer_to_binary(I)) end, ?check_trace( %% Run stage: @@ -225,12 +225,12 @@ t_consumer_ack_via_message_passing(Config) when is_list(Config) -> {ok, SubscriberPid} = brod:start_link_topic_subscriber(?CLIENT_ID, ?topic, ConsumerConfig, ?MODULE, InitArgs), - {ok, _} = wait_message(<<1>>), + {ok, _} = wait_message(<<"1">>), %% ack_offset allows consumer to proceed to message 2 SubscriberPid ! {ack_offset, 0, Offset0}, - {ok, _} = wait_message(<<2>>), + {ok, _} = wait_message(<<"2">>), ok = brod_topic_subscriber:stop(SubscriberPid), - _Expected = [<<1>>, <<2>>] + _Expected = [<<"1">>, <<"2">>] end, %% Check stage: fun(Expected, Trace) -> diff --git a/test/brod_txn_SUITE.erl b/test/brod_txn_SUITE.erl index b5fb9294..8892695c 100644 --- a/test/brod_txn_SUITE.erl +++ b/test/brod_txn_SUITE.erl @@ -29,8 +29,13 @@ suite() -> [{timetrap, {seconds, 30}}]. init_per_suite(Config) -> - {ok, _} = application:ensure_all_started(brod), - Config. + case kafka_test_helper:kafka_version() of + {0, Minor} when Minor < 11 -> + {skip, "no_transaction"}; + _ -> + {ok, _} = application:ensure_all_started(brod), + Config + end. end_per_suite(_Config) -> ok. @@ -80,10 +85,7 @@ all() -> [F || {F, _A} <- module_info(exports), end]. client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end. + []. subscriber_loop(TesterPid) -> receive diff --git a/test/brod_txn_processor_SUITE.erl b/test/brod_txn_processor_SUITE.erl index cc2d5e3d..29ee0e3c 100644 --- a/test/brod_txn_processor_SUITE.erl +++ b/test/brod_txn_processor_SUITE.erl @@ -35,8 +35,13 @@ suite() -> [{timetrap, {seconds, 30}}]. init_per_suite(Config) -> - {ok, _} = application:ensure_all_started(brod), - Config. + case kafka_test_helper:kafka_version() of + {0, Minor} when Minor < 11 -> + {skip, "no_transaction"}; + _ -> + {ok, _} = application:ensure_all_started(brod), + Config + end. end_per_suite(_Config) -> ok. @@ -65,11 +70,7 @@ all() -> [F || {F, _A} <- module_info(exports), _ -> false end]. -client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end. +client_config() -> kafka_test_helper:client_config(). rand() -> iolist_to_binary([base64:encode(crypto:strong_rand_bytes(8))]). diff --git a/test/data/ssl/README.md b/test/data/ssl/README.md new file mode 100644 index 00000000..70880474 --- /dev/null +++ b/test/data/ssl/README.md @@ -0,0 +1,3 @@ +This dir holds files for TLS/SSL tests. +The files are copied from Kafka docker image in the `make test-env` step. +See how the docker image is built here: https://github.com/zmstone/docker-kafka diff --git a/test/data/ssl/ca.pem b/test/data/ssl/ca.pem deleted file mode 100644 index 614aa1ef..00000000 --- a/test/data/ssl/ca.pem +++ /dev/null @@ -1,22 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIDkzCCAnugAwIBAgIJAPjeRT8z4mElMA0GCSqGSIb3DQEBCwUAMGAxCzAJBgNV -BAYTAlNFMRIwEAYDVQQIDAlTdG9ja2hvbG0xEjAQBgNVBAcMCVN0b2NraG9sbTEN -MAsGA1UECgwEYnJvZDENMAsGA1UECwwEdGVzdDELMAkGA1UEAwwCKi4wHhcNMTYx -MTA0MTYxNDM2WhcNMjYxMTAyMTYxNDM2WjBgMQswCQYDVQQGEwJTRTESMBAGA1UE -CAwJU3RvY2tob2xtMRIwEAYDVQQHDAlTdG9ja2hvbG0xDTALBgNVBAoMBGJyb2Qx -DTALBgNVBAsMBHRlc3QxCzAJBgNVBAMMAiouMIIBIjANBgkqhkiG9w0BAQEFAAOC -AQ8AMIIBCgKCAQEAyIbBpX2DvhIbcXx1uho3Vm+hOLXrZJwNgVL3yDx/anGPvD2a -ZkUjdrJNh8jy5ZFA7jBQGLYIyMQYY8UMyAPIQbCsi0wvFhcWfv+/VTSOfgcK04D+ -QQRni8lkWI66oBcM02Wtwo3K5W7KWJ+LOAaV5hmSvLhcyIsSQC6MRBGRGJ89Oyza -7s1FrCY0HCa6BicY48sLTHTT8MScK5kOMO5KqMK8rY/dLRYynhC2K8/stzqN27HI -MoktDEzzCAfRaNfXE8o1NekJcpFLQNi9/nab7vcbWo/QmUCCF0Ny5BGWEx+GpEp9 -HjVM5KYAYlDqpMm3wttMs7dtU9lEXZk69uCejwIDAQABo1AwTjAdBgNVHQ4EFgQU -I1wMy5ObzZNi7qh3W9VSYKJRctYwHwYDVR0jBBgwFoAUI1wMy5ObzZNi7qh3W9VS -YKJRctYwDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAGmyomSHF4sZh -yNIdDmeUp+nOFE6dXuK1vo8PlKrf+tyajNdic5ZMCF9BzbKSxFnejwHA2fzBlu6P -27kmmMPWuAhvcyxNciLZ/gGCn5gMPutKaweuaD6G93jkIngdXtbz+c6icpwsO9cK -Z0mdVuesJnmLQYLn9pHDzGUGYPFZpHVXwQzyAVw4m9T+aqKwwe/0dL1Z/8b/iuwN -K0S4/c7gLH8rB1jQisHomgHano43TzJq8ZFX7wF1E2tnHDdGk+uEZr5C7VPRgrF8 -/DhGGJnw3AoQgD5g1YqFGA5pA0AXr4RF27Y7bKYnzvbktOkfcNhw/4P2rKXWWs1Q -x2xsU3VaTQ== ------END CERTIFICATE----- diff --git a/test/data/ssl/client-crt.pem b/test/data/ssl/client-crt.pem deleted file mode 100644 index 45df2707..00000000 --- a/test/data/ssl/client-crt.pem +++ /dev/null @@ -1,20 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIDPDCCAiQCCQDU7hj/emVKfDANBgkqhkiG9w0BAQsFADBgMQswCQYDVQQGEwJT -RTESMBAGA1UECAwJU3RvY2tob2xtMRIwEAYDVQQHDAlTdG9ja2hvbG0xDTALBgNV -BAoMBGJyb2QxDTALBgNVBAsMBHRlc3QxCzAJBgNVBAMMAiouMB4XDTE2MTEwNDE2 -MTQzNloXDTI2MTEwMjE2MTQzNlowYDELMAkGA1UEBhMCU0UxEjAQBgNVBAgMCVN0 -b2NraG9sbTESMBAGA1UEBwwJU3RvY2tob2xtMQ0wCwYDVQQKDARicm9kMQ0wCwYD -VQQLDAR0ZXN0MQswCQYDVQQDDAIqLjCCASIwDQYJKoZIhvcNAQEBBQADggEPADCC -AQoCggEBAKtN+UzF8g9JRhKXn/rnm4jCr26+HpboDQYxG1HSCwwWskdOMK1b/8w4 -ipNzpoV16teRW5AVdq5Z6DDzBE5X43rrJZ9+x6pd25mVyktmwAIxEYscLtxN1UoL -a5EF13D8UPWCyzylThhUwi67bHvbLeWzAKoccKqdV/5ZNjFnqt9Q9seFOxyXNcFE -/qfUQTfkcL4rei2dgkFPFOXbF2rKRgMaiseyVAJP0G8AcsCkQvaYnkQrJ8nAZBtI -vZmq2og9PW7Z8rEbm9TVLnLNtEE5Lx2S1SQS9QPccYJDAyQJLCOw2ikGQPgtDfbs -KILEp+MChTWgEeb/LBlN/qa+zDraDm0CAwEAATANBgkqhkiG9w0BAQsFAAOCAQEA -EdsizFjP+hWSa5A0UFRIETvAztpTd+pjWWVv3DwCCRc2aMys+GYnR5fkHtnwKr7u -diZ8SSMZQFhlxA9MRNe8++wKeKeCzqrwIV1+mQcGqrJLl6sxW5TcMs/bRy5BPwZJ -RGlcz6HdLY8UBZzY2Qy2A4VecqwNe07Vg+7Yebui4w09pt5045S/q33/arb/LKP+ -1CbCjNyF3QC0aww+YgML+PgjnNtqbO/85qV424/dMX+aNAotQ/zBdEfEXyFaCoAE -yCHA99FnhHsQ9gwv9vhMLAX+yiBIEoh3e18EtmZdsvsTpDd1KI4nrh44TJfEY65+ -fNeAXYygkzsN14bbk9PgMw== ------END CERTIFICATE----- diff --git a/test/data/ssl/client-key.pem b/test/data/ssl/client-key.pem deleted file mode 100644 index 641967ab..00000000 --- a/test/data/ssl/client-key.pem +++ /dev/null @@ -1,28 +0,0 @@ ------BEGIN PRIVATE KEY----- -MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCrTflMxfIPSUYS -l5/655uIwq9uvh6W6A0GMRtR0gsMFrJHTjCtW//MOIqTc6aFderXkVuQFXauWegw -8wROV+N66yWffseqXduZlcpLZsACMRGLHC7cTdVKC2uRBddw/FD1gss8pU4YVMIu -u2x72y3lswCqHHCqnVf+WTYxZ6rfUPbHhTsclzXBRP6n1EE35HC+K3otnYJBTxTl -2xdqykYDGorHslQCT9BvAHLApEL2mJ5EKyfJwGQbSL2ZqtqIPT1u2fKxG5vU1S5y -zbRBOS8dktUkEvUD3HGCQwMkCSwjsNopBkD4LQ327CiCxKfjAoU1oBHm/ywZTf6m -vsw62g5tAgMBAAECggEADFm50Jww4INC5xJBeYB7STfoGA7i+7RNRBYERzjijQOR -5OwxPD52yc2FyC29Yr/mp5YWSOQTQ2y9/dF3jQJvJyyO8NneIV1U+NTA2gDVdRL+ -lc35Xu7JouYB4lnOd5npaFn+tyef4scxnNbscl2SCI6ITLtyMAraDj92VceInUMF -28srCTMdjbhVLpeq80qdeDVnyzlmua1W8pjR1lNXY2IECS9gTp6+JLiMQ0FJlC9V -r+U5iAoqLCNh+QpdM+2Z8kbkKA5PqsWcAhx+dTTkbRPp59r7Qd2xtxde5PGlm6zp -cqXgbWaXCMlbL5C7eOyPfLty3+KPrR6LGW6jGEqioQKBgQDcK2LGx/1PE2Y27p+O -RImN5SYERiRnYen7bm1CBAoH1J5LDxWWcf8Bz8/y4bNvEZJVosvPDRoNilI4RTYD -JiJw/qXio6FG78yIzvCK0WLIPgq6stufdbd/+UsNrDbGTuhk/qti8TSckEEgrUWg -U0NgEc/zyIMQK/4mZSgqeUpuxQKBgQDHLsxRT3Ile4sT2arxv0/KzSmnEL9hCAa9 -Cf+N0mWPrt6rzJUTD0+FBToXGP3k4auKETRb3XHvSHCwwl6Bi+NTPpVYqBT53jEv -fSb6bMjSlZyja+miVh/7TI2on7keus19XtZyks7PKoHa+i4w61zy5jbBdBC/kU1y -8O3HXF4biQKBgQCI6/5o6vTQmZrmrK3TtzHoacqju89l79GoyPrvpD1ss0CiI0Zk -oo5ZXRjQzqZde4sK8MxY/qfmJdCOKBS4Dp46sVMOyH5C9Fy59CBJ5H/PUi4v/41v -9LBiyPFxFlmWKHqEXJDPXnw+pcOrA7caRs3O0CUIUfmYNBPBYwWArJ+qlQKBgFpO -25BaJvTbqNkdLaZiCTl3/9ShgUPrMbLwH5AbvrSAorDeFxEHNhSnpAjo6eSmdPIq -jsTACHJnM8DQv6yY0j7h9zC1NJ19omtXoR6VyA/CibyGpu1VgzabJPc5Q+Os6pJX -N3/HFEFVkn7IQ70mWYQ/4L+hch6JMMZWeliTho+RAoGADcqzTMLtp7kRH8LQcq1n -oCE2FYJPvpd8PWlMCZ0ewSk6CbIgLvwJ+Hw0040m11HlCG7xVNdJV0rAU68Z7txm -pYIXL3D9MlJWCWMjZ7k11fuN1EtPLYYhMgS7FhADdUfFhnRGDkF2LnbvZIh3UtN6 -H5khVwyCU9LwQoxKfTmuxnY= ------END PRIVATE KEY----- diff --git a/test/kafka_test_helper.erl b/test/kafka_test_helper.erl index 56ac9b4f..fd6f0ce3 100644 --- a/test/kafka_test_helper.erl +++ b/test/kafka_test_helper.erl @@ -17,6 +17,7 @@ , client_config/0 , bootstrap_hosts/0 , kill_process/2 + , kafka_version/0 ]). -include("brod_test_macros.hrl"). @@ -82,9 +83,30 @@ produce_payloads(Topic, Partition, Config) -> {LastOffset, Payloads}. client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] + case kafka_version() of + {0, 9} -> [{query_api_versions, false}]; + _ -> [] + end. + +maybe_zookeeper() -> + case kafka_version() of + {3, _} -> + %% Kafka 2.2 started supporting --bootstap-server, but 2.x still supports --zookeeper + %% Starting from 3.0, --zookeeper is no longer supported, must use --bootstrap-server + "--bootstrap-server localhost:9092"; + _ -> + "--zookeeper localhost:2181" + end. + +kafka_version() -> + VsnStr = os:getenv("KAFKA_VERSION"), + case VsnStr =:= "" orelse VsnStr =:= false of + true -> + ct:pal("KAFKA_VERSION is not set, defaulting to 3.6", []), + {3, 6}; + false -> + [Major, Minor | _] = string:tokens(VsnStr, "."), + {list_to_integer(Major), list_to_integer(Minor)} end. prepare_topic(Topic) when is_binary(Topic) -> @@ -97,12 +119,12 @@ prepare_topic({Topic, NumPartitions, NumReplicas}) -> ok = brod:start_producer(?TEST_CLIENT_ID, Topic, _ProducerConfig = []). delete_topic(Name) -> - Delete = "/opt/kafka/bin/kafka-topics.sh --zookeeper localhost " + Delete = "/opt/kafka/bin/kafka-topics.sh " ++ maybe_zookeeper() ++ " --delete --topic ~s", exec_in_kafka_container(Delete, [Name]). create_topic(Name, NumPartitions, NumReplicas) -> - Create = "/opt/kafka/bin/kafka-topics.sh --zookeeper localhost " + Create = "/opt/kafka/bin/kafka-topics.sh " ++ maybe_zookeeper() ++ " --create --partitions ~p --replication-factor ~p" " --topic ~s --config min.insync.replicas=1", exec_in_kafka_container(Create, [NumPartitions, NumReplicas, Name]).