diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 1ab8ffd0..209da873 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -7,8 +7,8 @@ on: branches: - master env: - OTP_VERSION: "24.1" - REBAR_VERSION: "3.17.0" + OTP_VERSION: "26" + REBAR_VERSION: "3.20.0" jobs: lint: @@ -44,8 +44,8 @@ jobs: strategy: fail-fast: false matrix: - otp: ["24.1", "23.3.4.7", "22.3.4.21"] - kafka: ["2.4", "1.1", "0.11"] + otp: ["26"] + kafka: ["0.9", "0.10", "0.11", "2.8", "1.1", "3.6"] steps: - name: Checkout uses: actions/checkout@v2 @@ -69,7 +69,8 @@ jobs: run: | export KAFKA_VERSION=${{ matrix.kafka }} echo "Running Kafka ${KAFKA_VERSION}" - scripts/setup-test-env.sh && rebar3 do ct,eunit + make test-env + make t - name: Store test logs uses: actions/upload-artifact@v1 if: always() diff --git a/.gitignore b/.gitignore index 68fece43..0e07c25f 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,4 @@ relx docker/ TAGS .vscode/ +test/data/ssl/*.pem diff --git a/Makefile b/Makefile index 426e39a2..7b7d6db9 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,5 @@ +KAFKA_VERSION ?= 3.6 +export KAFKA_VERSION all: compile compile: @@ -8,6 +10,10 @@ lint: test-env: @./scripts/setup-test-env.sh + @mkdir -p ./test/data/ssl + @docker cp kafka-1:/localhost-ca-crt.pem ./test/data/ssl/ca.pem + @docker cp kafka-1:/localhost-client-key.pem ./test/data/ssl/client-key.pem + @docker cp kafka-1:/localhost-client-crt.pem ./test/data/ssl/client-crt.pem ut: @rebar3 eunit -v --cover_export_name ut-$(KAFKA_VERSION) diff --git a/rebar.config b/rebar.config index 4637d1e4..5e2089b8 100644 --- a/rebar.config +++ b/rebar.config @@ -17,9 +17,9 @@ {relx, [{release, {brod, "i"}, % release the interactive shell as brod-i [brod, jsone, docopt]}, {include_erts, true}, - {overlay, [{copy, "scripts/brod", "bin"}, - {copy, "{{lib_dirs}}/crc32cer/priv/crc32cer*.so", "bin"}, - {copy, "{{lib_dirs}}/snappyer/priv/snappyer.so", "bin"} + {overlay, [{copy, "scripts/brod", "bin/"}, + {copy, "{{lib_dirs}}/crc32cer/priv/crc32cer*.so", "bin/"}, + {copy, "{{lib_dirs}}/snappyer/priv/snappyer.so", "bin/"} ]} ]}]}, {test, [ diff --git a/scripts/docker-compose.yml b/scripts/docker-compose.yml index 34d28561..45839f99 100644 --- a/scripts/docker-compose.yml +++ b/scripts/docker-compose.yml @@ -2,14 +2,14 @@ version: "2" services: zookeeper: - image: "zmstone/kafka:${KAFKA_VERSION}" + image: "zmstone/kafka:${KAFKA_IMAGE_VERSION}" container_name: zookeeper command: run zookeeper network_mode: host kafka_1: depends_on: - zookeeper - image: "zmstone/kafka:${KAFKA_VERSION}" + image: "zmstone/kafka:${KAFKA_IMAGE_VERSION}" container_name: "kafka-1" network_mode: host environment: @@ -23,7 +23,7 @@ services: kafka_2: depends_on: - zookeeper - image: "zmstone/kafka:${KAFKA_VERSION}" + image: "zmstone/kafka:${KAFKA_IMAGE_VERSION}" container_name: "kafka-2" network_mode: host environment: diff --git a/scripts/setup-test-env.sh b/scripts/setup-test-env.sh index b877ef2f..25264fd1 100755 --- a/scripts/setup-test-env.sh +++ b/scripts/setup-test-env.sh @@ -1,5 +1,9 @@ #!/bin/bash -eu +if [ -n "${DEBUG:-}" ]; then + set -x +fi + docker ps > /dev/null || { echo "You must be a member of docker group to run this script" exit 1 @@ -18,46 +22,77 @@ function docker_compose { fi } -VERSION=${KAFKA_VERSION:-2.4} -if [ -z $VERSION ]; then VERSION=$1; fi +KAFKA_VERSION=${KAFKA_VERSION:-3.6} +if [ -z $KAFKA_VERSION ]; then KAFKA_VERSION=$1; fi -case $VERSION in +case $KAFKA_VERSION in + 0.9*) + KAFKA_VERSION="0.9";; 0.10*) - VERSION="0.10";; + KAFKA_VERSION="0.10";; 0.11*) - VERSION="0.11";; + KAFKA_VERSION="0.11";; 1.*) - VERSION="1.1";; + KAFKA_VERSION="1.1";; 2.*) - VERSION="2.4";; + KAFKA_VERSION="2.8";; + 3.*) + KAFKA_VERSION="3.6";; *) - VERSION="2.4";; + KAFKA_VERSION="3.6";; esac -echo "Using KAFKA_VERSION=$VERSION" -export KAFKA_VERSION=$VERSION +export KAFKA_IMAGE_VERSION="1.1-${KAFKA_VERSION}" +echo "env KAFKA_IMAGE_VERSION=$KAFKA_IMAGE_VERSION" TD="$(cd "$(dirname "$0")" && pwd)" docker_compose -f $TD/docker-compose.yml down || true docker_compose -f $TD/docker-compose.yml up -d +if [[ "$KAFKA_VERSION" == 2* ]] || [[ "$KAFKA_VERSION" == 3* ]]; then + MAYBE_ZOOKEEPER="--bootstrap-server localhost:9092" +else + MAYBE_ZOOKEEPER="--zookeeper localhost:2181" +fi -n=0 -while [ "$(docker exec kafka-1 bash -c '/opt/kafka/bin/kafka-topics.sh --zookeeper localhost --list')" != '' ]; do - if [ $n -gt 4 ]; then - echo "timeout waiting for kakfa_1" - exit 1 +TOPIC_LIST_CMD="/opt/kafka/bin/kafka-topics.sh $MAYBE_ZOOKEEPER --list" +MAX_WAIT_SEC=10 + +function wait_for_kafka { + local which_kafka="$1" + local n=0 + local port=':9092' + local topic_list listener + if [ "$which_kafka" = 'kafka-2' ]; then + port=':9192' fi - n=$(( n + 1 )) - sleep 1 -done + while true; do + listener="$(netstat -tnlp 2>&1 | grep $port || true)" + if [ "$listener" != '' ]; then + topic_list="$(docker exec $which_kafka $TOPIC_LIST_CMD 2>&1)" + if [ "${topic_list-}" = '' ]; then + break + fi + fi + if [ $n -gt $MAX_WAIT_SEC ]; then + echo "timeout waiting for kafka-1" + echo "last print: ${topic_list:-}" + exit 1 + fi + n=$(( n + 1 )) + sleep 1 + done +} + +wait_for_kafka kafka-1 +wait_for_kafka kafka-2 function create_topic { TOPIC_NAME="$1" PARTITIONS="${2:-1}" REPLICAS="${3:-1}" - CMD="/opt/kafka/bin/kafka-topics.sh --zookeeper localhost --create --partitions $PARTITIONS --replication-factor $REPLICAS --topic $TOPIC_NAME --config min.insync.replicas=1" + CMD="/opt/kafka/bin/kafka-topics.sh $MAYBE_ZOOKEEPER --create --partitions $PARTITIONS --replication-factor $REPLICAS --topic $TOPIC_NAME --config min.insync.replicas=1" docker exec kafka-1 bash -c "$CMD" } @@ -80,7 +115,7 @@ create_topic "brod_compression_SUITE" create_topic "lz4-test" create_topic "test-topic" -if [[ "$KAFKA_VERSION" = 2* ]]; then +if [[ "$KAFKA_VERSION" = 2* ]] || [[ "$KAFKA_VERSION" = 3* ]]; then MAYBE_NEW_CONSUMER="" else MAYBE_NEW_CONSUMER="--new-consumer" @@ -90,5 +125,5 @@ docker exec kafka-1 /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server l # for kafka 0.11 or later, add sasl-scram test credentials if [[ "$KAFKA_VERSION" != 0.9* ]] && [[ "$KAFKA_VERSION" != 0.10* ]]; then - docker exec kafka-1 /opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=ecila],SCRAM-SHA-512=[password=ecila]' --entity-type users --entity-name alice + docker exec kafka-1 /opt/kafka/bin/kafka-configs.sh $MAYBE_ZOOKEEPER --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=ecila],SCRAM-SHA-512=[password=ecila]' --entity-type users --entity-name alice fi diff --git a/src/brod_consumer.erl b/src/brod_consumer.erl index 37cb1756..d210daff 100644 --- a/src/brod_consumer.erl +++ b/src/brod_consumer.erl @@ -502,7 +502,7 @@ handle_batches(Header, [], #state{begin_offset = BeginOffset} = State0) -> State = case BeginOffset < StableOffset of true -> - %% There are chances that kafka may return empty message set + %% There are chances that Kafka may return empty message set %% when messages are deleted from a compacted topic. %% Since there is no way to know how big the 'hole' is %% we can only bump begin_offset with +1 and try again. diff --git a/src/brod_utils.erl b/src/brod_utils.erl index 61113519..f7732a1a 100644 --- a/src/brod_utils.erl +++ b/src/brod_utils.erl @@ -453,12 +453,25 @@ fetch(Conn, ReqFun, Offset, MaxBytes) -> fetch(Conn, ReqFun, Offset, Size); {ok, #{header := Header, batches := Batches}} -> StableOffset = get_stable_offset(Header), - {NewBeginOffset, Msgs} = flatten_batches(Offset, Header, Batches), + {NewBeginOffset0, Msgs} = flatten_batches(Offset, Header, Batches), case Offset < StableOffset andalso Msgs =:= [] of true -> - %% Not reached the latest stable offset yet, - %% but received an empty batch-set (all messages are dropped). - %% try again with new begin-offset + NewBeginOffset = + case NewBeginOffset0 > Offset of + true -> + %% Not reached the latest stable offset yet, + %% but resulted in an empty batch-set, + %% i.e. all messages are dropped due to they are before + %% the last fetch Offset. + %% try again with new begin-offset. + NewBeginOffset0; + false when NewBeginOffset0 =:= Offset -> + %% There are chances that Kafka may return empty message set + %% when messages are deleted from a compacted topic. + %% Since there is no way to know how big the 'hole' is + %% we can only bump begin_offset with +1 and try again. + NewBeginOffset0 + 1 + end, fetch(Conn, ReqFun, NewBeginOffset, MaxBytes); false -> {ok, {StableOffset, Msgs}} diff --git a/test/brod_SUITE.erl b/test/brod_SUITE.erl index 0ea476be..a7f0bb51 100644 --- a/test/brod_SUITE.erl +++ b/test/brod_SUITE.erl @@ -41,10 +41,11 @@ suite() -> [{timetrap, {minutes, 5}}]. init_per_suite(Config) -> - case os:getenv("KAFKA_VERSION") of - "0.9" -> {skip, - "The given Kafka test image does not have support for these apis"}; - _ -> Config + case kafka_test_helper:kafka_version() of + {0, 9} -> + {skip, "no_topic_manaegment_apis"}; + _ -> + Config end. end_per_suite(_Config) -> diff --git a/test/brod_cli_tests.erl b/test/brod_cli_tests.erl index 4d382fb2..014c9e1f 100644 --- a/test/brod_cli_tests.erl +++ b/test/brod_cli_tests.erl @@ -180,7 +180,13 @@ get_kafka_version() -> {list_to_integer(Major), list_to_integer(Minor)} end. -run(Args) -> +run(Args0) -> + Args = case kafka_test_helper:kafka_version() of + {0, Minor} when Minor < 11 -> + Args0 ++ ["--no-api-vsn-query"]; + _ -> + Args0 + end, _ = cmd(Args), ok. diff --git a/test/brod_consumer_SUITE.erl b/test/brod_consumer_SUITE.erl index 4c60a4a0..4c825cae 100644 --- a/test/brod_consumer_SUITE.erl +++ b/test/brod_consumer_SUITE.erl @@ -76,6 +76,17 @@ end end()). +-define(RETRY(EXPR, Timeout), + retry( + fun() -> + case EXPR of + {ok, R} -> + {ok, R}; + {error, _} -> + false + end + end, Timeout)). + %%%_* ct callbacks ============================================================= suite() -> [{timetrap, {seconds, 30}}]. @@ -140,12 +151,21 @@ end_per_testcase(Case, Config) -> ok end. -all() -> [F || {F, _A} <- module_info(exports), +all() -> + Cases = [F || {F, _A} <- module_info(exports), case atom_to_list(F) of "t_" ++ _ -> true; _ -> false - end]. - + end], + Filter = fun(Case) -> + try + ?MODULE:Case(kafka_version_match) + catch + _:_ -> + true + end + end, + lists:filter(Filter, Cases). %%%_* Test functions =========================================================== @@ -153,14 +173,11 @@ all() -> [F || {F, _A} <- module_info(exports), %% messages fetched back should only contain the committed message %% i.e. aborted messages (testing with isolation_level=read_committed) %% should be dropped, control messages (transaction abort) should be dropped +t_drop_aborted(kafka_version_match) -> + has_txn(); t_drop_aborted(Config) when is_list(Config) -> - case has_txn() of - true -> - test_drop_aborted(Config, true), - test_drop_aborted(Config, false); - false -> - ok - end. + test_drop_aborted(Config, true), + test_drop_aborted(Config, false). %% When QueryApiVsn is set to false, %% brod will use lowest supported API version. @@ -173,7 +190,7 @@ test_drop_aborted(Config, QueryApiVsn) -> fun(CommitOrAbort) -> TxnId = make_transactional_id(), {ok, Conn} = connect_txn_coordinator(TxnId, ?config(client_config)), - {ok, TxnCtx} = kpro:txn_init_ctx(Conn, TxnId), + {ok, TxnCtx} = ?RETRY(kpro:txn_init_ctx(Conn, TxnId), 10), ok = kpro:txn_send_partitions(TxnCtx, [{Topic, Partition}]), Key = bin([atom_to_list(CommitOrAbort), "-", make_unique_key()]), Vsn = 3, %% lowest API version which supports transactional produce @@ -217,11 +234,10 @@ test_drop_aborted(Config, QueryApiVsn) -> ], Msgs) end. +t_wait_for_unstable_offsets(kafka_version_match) -> + has_txn(); t_wait_for_unstable_offsets(Config) when is_list(Config) -> - case has_txn() of - true -> t_wait_for_unstable_offsets({run, Config}); - false -> ok - end; + t_wait_for_unstable_offsets({run, Config}); t_wait_for_unstable_offsets({run, Config}) -> Client = ?config(client), Topic = ?TOPIC, @@ -230,7 +246,7 @@ t_wait_for_unstable_offsets({run, Config}) -> {ok, Conn} = connect_txn_coordinator(TxnId, ?config(client_config)), %% ensure we have enough time to test before expire TxnOpts = #{txn_timeout => timer:seconds(30)}, - {ok, TxnCtx} = kpro:txn_init_ctx(Conn, TxnId, TxnOpts), + {ok, TxnCtx} = ?RETRY(kpro:txn_init_ctx(Conn, TxnId, TxnOpts), 10), ok = kpro:txn_send_partitions(TxnCtx, [{Topic, Partition}]), %% Send one message in this transaction, return the offset in kafka ProduceFun = @@ -278,13 +294,10 @@ t_wait_for_unstable_offsets({run, Config}) -> %% Produce large(-ish) transactional batches, then abort them all %% try fetch from offsets in the middle of large batches, %% expect no delivery of any aborted batches. +t_fetch_aborted_from_the_middle(kafka_version_match) -> + has_txn(); t_fetch_aborted_from_the_middle(Config) when is_list(Config) -> - case has_txn() of - true -> - test_fetch_aborted_from_the_middle(Config); - false -> - ok - end. + test_fetch_aborted_from_the_middle(Config). test_fetch_aborted_from_the_middle(Config) when is_list(Config) -> Client = ?config(client), @@ -292,7 +305,7 @@ test_fetch_aborted_from_the_middle(Config) when is_list(Config) -> Partition = 0, TxnId = make_transactional_id(), {ok, Conn} = connect_txn_coordinator(TxnId, ?config(client_config)), - {ok, TxnCtx} = kpro:txn_init_ctx(Conn, TxnId), + {ok, TxnCtx} = ?RETRY(kpro:txn_init_ctx(Conn, TxnId), 10), ok = kpro:txn_send_partitions(TxnCtx, [{Topic, Partition}]), %% make a large-ish message MkMsg = fun(Key) -> @@ -413,6 +426,12 @@ t_fold(Config) when is_list(Config) -> 0, ErrorFoldF, #{})), ok. +%% This test case does not work with Kafka 0.9, not sure aobut 0.10 and 0.11 +%% since all 0.x versions are old enough, we only try to verify this against +%% 1.x or newer +t_direct_fetch_with_small_max_bytes(kafka_version_match) -> + {Major, _Minor} = kafka_test_helper:kafka_version(), + Major > 1; t_direct_fetch_with_small_max_bytes(Config) when is_list(Config) -> Client = ?config(client), Topic = ?TOPIC, @@ -428,6 +447,11 @@ t_direct_fetch_with_small_max_bytes(Config) when is_list(Config) -> ?assertEqual(Key, Msg#kafka_message.key), ok. +%% Starting from version 3, Kafka no longer returns incomplete batch +%% for Fetch request v0, cannot test max_bytes expansion anymore. +t_direct_fetch_expand_max_bytes(kafka_version_match) -> + {Major, _Minor} = kafka_test_helper:kafka_version(), + Major < 3; t_direct_fetch_expand_max_bytes({init, Config}) when is_list(Config) -> %% kafka returns empty message set when it's 0.9 %% or when fetch request sent was version 0 @@ -441,7 +465,7 @@ t_direct_fetch_expand_max_bytes(Config) when is_list(Config) -> Value = crypto:strong_rand_bytes(100), ok = brod:produce_sync(Client, ?TOPIC, Partition, Key, Value), {ok, Offset} = brod:resolve_offset(?HOSTS, Topic, Partition, - ?OFFSET_LATEST, ?config(client_config)), + ?OFFSET_LATEST, ?config(client_config)), {ok, {_, [Msg]}} = brod:fetch({?HOSTS, ?config(client_config)}, Topic, Partition, Offset - 1, #{max_bytes => 13}), @@ -450,6 +474,9 @@ t_direct_fetch_expand_max_bytes(Config) when is_list(Config) -> %% @doc Consumer should be smart enough to try greater max_bytes %% when it's not great enough to fetch one single message +t_consumer_max_bytes_too_small(kafka_version_match) -> + {Major, _Minor} = kafka_test_helper:kafka_version(), + Major < 3; t_consumer_max_bytes_too_small({init, Config}) -> meck:new(brod_kafka_request, [passthrough, no_passthrough_cover, no_history]), %% kafka returns empty message set when it's 0.9 @@ -843,7 +870,9 @@ wait_for_max_bytes_sequence([{Compare, MaxBytes} | Rest] = Waiting, Cnt) -> wait_for_max_bytes_sequence(Waiting, Cnt + 1); _ -> ct:fail("unexpected ~p, expecting ~p", [Bytes, {Compare, MaxBytes}]) - end + end; + Other -> + error(Other) after 3000 -> ct:fail("timeout", []) @@ -875,13 +904,6 @@ connect_txn_coordinator(TxnId, Config, RetriesLeft, _LastError) -> connect_txn_coordinator(TxnId, Config, RetriesLeft - 1, Reason) end. -has_txn() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> false; - "0.10" ++ _ -> false; - _ -> true - end. - consumer_config() -> [{max_wait_time, 1000}, {sleep_timeout, 10}]. retry(_F, 0) -> error(timeout); @@ -901,6 +923,14 @@ wait_for_consumer_connection(Consumer, OldConn) -> end, retry(F, 5). +has_txn() -> + case kafka_test_helper:kafka_version() of + {0, Minor} when Minor < 11 -> + false; + _ -> + true + end. + %%%_* Emacs ==================================================================== %%% Local Variables: %%% allout-layout: t diff --git a/test/brod_offset_txn_SUITE.erl b/test/brod_offset_txn_SUITE.erl index 5565a8a4..44309ebc 100644 --- a/test/brod_offset_txn_SUITE.erl +++ b/test/brod_offset_txn_SUITE.erl @@ -72,11 +72,19 @@ end_per_testcase(_Case, Config) -> end, Config. -all() -> [F || {F, _A} <- module_info(exports), +all() -> + Cases = [F || {F, _A} <- module_info(exports), case atom_to_list(F) of "t_" ++ _ -> true; _ -> false - end]. + end], + %% no transaction before 0.11 + case kafka_test_helper:kafka_version() of + {0, Minor} when Minor < 11 -> + []; + _ -> + Cases + end. client_config() -> case os:getenv("KAFKA_VERSION") of diff --git a/test/brod_txn_SUITE.erl b/test/brod_txn_SUITE.erl index b5fb9294..34867283 100644 --- a/test/brod_txn_SUITE.erl +++ b/test/brod_txn_SUITE.erl @@ -29,8 +29,13 @@ suite() -> [{timetrap, {seconds, 30}}]. init_per_suite(Config) -> - {ok, _} = application:ensure_all_started(brod), - Config. + case kafka_test_helper:kafka_version() of + {0, Minor} when Minor < 11 -> + {skip, "no_transaction"}; + _ -> + {ok, _} = application:ensure_all_started(brod), + Config + end. end_per_suite(_Config) -> ok. diff --git a/test/brod_txn_processor_SUITE.erl b/test/brod_txn_processor_SUITE.erl index cc2d5e3d..b4c36c19 100644 --- a/test/brod_txn_processor_SUITE.erl +++ b/test/brod_txn_processor_SUITE.erl @@ -35,8 +35,13 @@ suite() -> [{timetrap, {seconds, 30}}]. init_per_suite(Config) -> - {ok, _} = application:ensure_all_started(brod), - Config. + case kafka_test_helper:kafka_version() of + {0, Minor} when Minor < 11 -> + {skip, "no_transaction"}; + _ -> + {ok, _} = application:ensure_all_started(brod), + Config + end. end_per_suite(_Config) -> ok. diff --git a/test/data/ssl/README.md b/test/data/ssl/README.md new file mode 100644 index 00000000..70880474 --- /dev/null +++ b/test/data/ssl/README.md @@ -0,0 +1,3 @@ +This dir holds files for TLS/SSL tests. +The files are copied from Kafka docker image in the `make test-env` step. +See how the docker image is built here: https://github.com/zmstone/docker-kafka diff --git a/test/data/ssl/ca.pem b/test/data/ssl/ca.pem deleted file mode 100644 index 614aa1ef..00000000 --- a/test/data/ssl/ca.pem +++ /dev/null @@ -1,22 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIDkzCCAnugAwIBAgIJAPjeRT8z4mElMA0GCSqGSIb3DQEBCwUAMGAxCzAJBgNV -BAYTAlNFMRIwEAYDVQQIDAlTdG9ja2hvbG0xEjAQBgNVBAcMCVN0b2NraG9sbTEN -MAsGA1UECgwEYnJvZDENMAsGA1UECwwEdGVzdDELMAkGA1UEAwwCKi4wHhcNMTYx -MTA0MTYxNDM2WhcNMjYxMTAyMTYxNDM2WjBgMQswCQYDVQQGEwJTRTESMBAGA1UE -CAwJU3RvY2tob2xtMRIwEAYDVQQHDAlTdG9ja2hvbG0xDTALBgNVBAoMBGJyb2Qx -DTALBgNVBAsMBHRlc3QxCzAJBgNVBAMMAiouMIIBIjANBgkqhkiG9w0BAQEFAAOC -AQ8AMIIBCgKCAQEAyIbBpX2DvhIbcXx1uho3Vm+hOLXrZJwNgVL3yDx/anGPvD2a -ZkUjdrJNh8jy5ZFA7jBQGLYIyMQYY8UMyAPIQbCsi0wvFhcWfv+/VTSOfgcK04D+ -QQRni8lkWI66oBcM02Wtwo3K5W7KWJ+LOAaV5hmSvLhcyIsSQC6MRBGRGJ89Oyza -7s1FrCY0HCa6BicY48sLTHTT8MScK5kOMO5KqMK8rY/dLRYynhC2K8/stzqN27HI -MoktDEzzCAfRaNfXE8o1NekJcpFLQNi9/nab7vcbWo/QmUCCF0Ny5BGWEx+GpEp9 -HjVM5KYAYlDqpMm3wttMs7dtU9lEXZk69uCejwIDAQABo1AwTjAdBgNVHQ4EFgQU -I1wMy5ObzZNi7qh3W9VSYKJRctYwHwYDVR0jBBgwFoAUI1wMy5ObzZNi7qh3W9VS -YKJRctYwDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAGmyomSHF4sZh -yNIdDmeUp+nOFE6dXuK1vo8PlKrf+tyajNdic5ZMCF9BzbKSxFnejwHA2fzBlu6P -27kmmMPWuAhvcyxNciLZ/gGCn5gMPutKaweuaD6G93jkIngdXtbz+c6icpwsO9cK -Z0mdVuesJnmLQYLn9pHDzGUGYPFZpHVXwQzyAVw4m9T+aqKwwe/0dL1Z/8b/iuwN -K0S4/c7gLH8rB1jQisHomgHano43TzJq8ZFX7wF1E2tnHDdGk+uEZr5C7VPRgrF8 -/DhGGJnw3AoQgD5g1YqFGA5pA0AXr4RF27Y7bKYnzvbktOkfcNhw/4P2rKXWWs1Q -x2xsU3VaTQ== ------END CERTIFICATE----- diff --git a/test/data/ssl/client-crt.pem b/test/data/ssl/client-crt.pem deleted file mode 100644 index 45df2707..00000000 --- a/test/data/ssl/client-crt.pem +++ /dev/null @@ -1,20 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIDPDCCAiQCCQDU7hj/emVKfDANBgkqhkiG9w0BAQsFADBgMQswCQYDVQQGEwJT -RTESMBAGA1UECAwJU3RvY2tob2xtMRIwEAYDVQQHDAlTdG9ja2hvbG0xDTALBgNV -BAoMBGJyb2QxDTALBgNVBAsMBHRlc3QxCzAJBgNVBAMMAiouMB4XDTE2MTEwNDE2 -MTQzNloXDTI2MTEwMjE2MTQzNlowYDELMAkGA1UEBhMCU0UxEjAQBgNVBAgMCVN0 -b2NraG9sbTESMBAGA1UEBwwJU3RvY2tob2xtMQ0wCwYDVQQKDARicm9kMQ0wCwYD -VQQLDAR0ZXN0MQswCQYDVQQDDAIqLjCCASIwDQYJKoZIhvcNAQEBBQADggEPADCC -AQoCggEBAKtN+UzF8g9JRhKXn/rnm4jCr26+HpboDQYxG1HSCwwWskdOMK1b/8w4 -ipNzpoV16teRW5AVdq5Z6DDzBE5X43rrJZ9+x6pd25mVyktmwAIxEYscLtxN1UoL -a5EF13D8UPWCyzylThhUwi67bHvbLeWzAKoccKqdV/5ZNjFnqt9Q9seFOxyXNcFE -/qfUQTfkcL4rei2dgkFPFOXbF2rKRgMaiseyVAJP0G8AcsCkQvaYnkQrJ8nAZBtI -vZmq2og9PW7Z8rEbm9TVLnLNtEE5Lx2S1SQS9QPccYJDAyQJLCOw2ikGQPgtDfbs -KILEp+MChTWgEeb/LBlN/qa+zDraDm0CAwEAATANBgkqhkiG9w0BAQsFAAOCAQEA -EdsizFjP+hWSa5A0UFRIETvAztpTd+pjWWVv3DwCCRc2aMys+GYnR5fkHtnwKr7u -diZ8SSMZQFhlxA9MRNe8++wKeKeCzqrwIV1+mQcGqrJLl6sxW5TcMs/bRy5BPwZJ -RGlcz6HdLY8UBZzY2Qy2A4VecqwNe07Vg+7Yebui4w09pt5045S/q33/arb/LKP+ -1CbCjNyF3QC0aww+YgML+PgjnNtqbO/85qV424/dMX+aNAotQ/zBdEfEXyFaCoAE -yCHA99FnhHsQ9gwv9vhMLAX+yiBIEoh3e18EtmZdsvsTpDd1KI4nrh44TJfEY65+ -fNeAXYygkzsN14bbk9PgMw== ------END CERTIFICATE----- diff --git a/test/data/ssl/client-key.pem b/test/data/ssl/client-key.pem deleted file mode 100644 index 641967ab..00000000 --- a/test/data/ssl/client-key.pem +++ /dev/null @@ -1,28 +0,0 @@ ------BEGIN PRIVATE KEY----- -MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCrTflMxfIPSUYS -l5/655uIwq9uvh6W6A0GMRtR0gsMFrJHTjCtW//MOIqTc6aFderXkVuQFXauWegw -8wROV+N66yWffseqXduZlcpLZsACMRGLHC7cTdVKC2uRBddw/FD1gss8pU4YVMIu -u2x72y3lswCqHHCqnVf+WTYxZ6rfUPbHhTsclzXBRP6n1EE35HC+K3otnYJBTxTl -2xdqykYDGorHslQCT9BvAHLApEL2mJ5EKyfJwGQbSL2ZqtqIPT1u2fKxG5vU1S5y -zbRBOS8dktUkEvUD3HGCQwMkCSwjsNopBkD4LQ327CiCxKfjAoU1oBHm/ywZTf6m -vsw62g5tAgMBAAECggEADFm50Jww4INC5xJBeYB7STfoGA7i+7RNRBYERzjijQOR -5OwxPD52yc2FyC29Yr/mp5YWSOQTQ2y9/dF3jQJvJyyO8NneIV1U+NTA2gDVdRL+ -lc35Xu7JouYB4lnOd5npaFn+tyef4scxnNbscl2SCI6ITLtyMAraDj92VceInUMF -28srCTMdjbhVLpeq80qdeDVnyzlmua1W8pjR1lNXY2IECS9gTp6+JLiMQ0FJlC9V -r+U5iAoqLCNh+QpdM+2Z8kbkKA5PqsWcAhx+dTTkbRPp59r7Qd2xtxde5PGlm6zp -cqXgbWaXCMlbL5C7eOyPfLty3+KPrR6LGW6jGEqioQKBgQDcK2LGx/1PE2Y27p+O -RImN5SYERiRnYen7bm1CBAoH1J5LDxWWcf8Bz8/y4bNvEZJVosvPDRoNilI4RTYD -JiJw/qXio6FG78yIzvCK0WLIPgq6stufdbd/+UsNrDbGTuhk/qti8TSckEEgrUWg -U0NgEc/zyIMQK/4mZSgqeUpuxQKBgQDHLsxRT3Ile4sT2arxv0/KzSmnEL9hCAa9 -Cf+N0mWPrt6rzJUTD0+FBToXGP3k4auKETRb3XHvSHCwwl6Bi+NTPpVYqBT53jEv -fSb6bMjSlZyja+miVh/7TI2on7keus19XtZyks7PKoHa+i4w61zy5jbBdBC/kU1y -8O3HXF4biQKBgQCI6/5o6vTQmZrmrK3TtzHoacqju89l79GoyPrvpD1ss0CiI0Zk -oo5ZXRjQzqZde4sK8MxY/qfmJdCOKBS4Dp46sVMOyH5C9Fy59CBJ5H/PUi4v/41v -9LBiyPFxFlmWKHqEXJDPXnw+pcOrA7caRs3O0CUIUfmYNBPBYwWArJ+qlQKBgFpO -25BaJvTbqNkdLaZiCTl3/9ShgUPrMbLwH5AbvrSAorDeFxEHNhSnpAjo6eSmdPIq -jsTACHJnM8DQv6yY0j7h9zC1NJ19omtXoR6VyA/CibyGpu1VgzabJPc5Q+Os6pJX -N3/HFEFVkn7IQ70mWYQ/4L+hch6JMMZWeliTho+RAoGADcqzTMLtp7kRH8LQcq1n -oCE2FYJPvpd8PWlMCZ0ewSk6CbIgLvwJ+Hw0040m11HlCG7xVNdJV0rAU68Z7txm -pYIXL3D9MlJWCWMjZ7k11fuN1EtPLYYhMgS7FhADdUfFhnRGDkF2LnbvZIh3UtN6 -H5khVwyCU9LwQoxKfTmuxnY= ------END PRIVATE KEY----- diff --git a/test/kafka_test_helper.erl b/test/kafka_test_helper.erl index 56ac9b4f..693e0188 100644 --- a/test/kafka_test_helper.erl +++ b/test/kafka_test_helper.erl @@ -17,6 +17,7 @@ , client_config/0 , bootstrap_hosts/0 , kill_process/2 + , kafka_version/0 ]). -include("brod_test_macros.hrl"). @@ -87,6 +88,27 @@ client_config() -> _ -> [] end. +maybe_zookeeper() -> + case kafka_version() of + {3, _} -> + %% Kafka 2.2 started supporting --bootstap-server, but 2.x still supports --zookeeper + %% Starting from 3.0, --zookeeper is no longer supported, must use --bootstrap-server + "--bootstrap-server localhost:9092"; + _ -> + "--zookeeper localhost:2181" + end. + +kafka_version() -> + VsnStr = os:getenv("KAFKA_VERSION"), + case VsnStr =:= "" orelse VsnStr =:= false of + true -> + ct:pal("KAFKA_VERSION is not set, defaulting to 3.6", []), + {3, 6}; + false -> + [Major, Minor | _] = string:tokens(VsnStr, "."), + {list_to_integer(Major), list_to_integer(Minor)} + end. + prepare_topic(Topic) when is_binary(Topic) -> prepare_topic({Topic, 1}); prepare_topic({Topic, NumPartitions}) -> @@ -97,12 +119,12 @@ prepare_topic({Topic, NumPartitions, NumReplicas}) -> ok = brod:start_producer(?TEST_CLIENT_ID, Topic, _ProducerConfig = []). delete_topic(Name) -> - Delete = "/opt/kafka/bin/kafka-topics.sh --zookeeper localhost " + Delete = "/opt/kafka/bin/kafka-topics.sh " ++ maybe_zookeeper() ++ " --delete --topic ~s", exec_in_kafka_container(Delete, [Name]). create_topic(Name, NumPartitions, NumReplicas) -> - Create = "/opt/kafka/bin/kafka-topics.sh --zookeeper localhost " + Create = "/opt/kafka/bin/kafka-topics.sh " ++ maybe_zookeeper() ++ " --create --partitions ~p --replication-factor ~p" " --topic ~s --config min.insync.replicas=1", exec_in_kafka_container(Create, [NumPartitions, NumReplicas, Name]).