diff --git a/scripts/setup-test-env.sh b/scripts/setup-test-env.sh
index a67cb4cb..b39fe63e 100755
--- a/scripts/setup-test-env.sh
+++ b/scripts/setup-test-env.sh
@@ -5,6 +5,19 @@ docker ps > /dev/null || {
   exit 1
 }
 
+# Run Docker Compose through whichever front end is installed: the
+# standalone `docker-compose` binary or the `docker compose` CLI plugin.
+function docker_compose {
+  if command -v docker-compose > /dev/null 2>&1; then
+    docker-compose "$@"
+  elif docker compose version > /dev/null 2>&1; then
+    docker compose "$@"
+  else
+    echo "couldn't find docker compose, needed for testing" >&2
+    exit 1
+  fi
+}
+
 VERSION=${KAFKA_VERSION:-1.1}
 if [ -z $VERSION ]; then VERSION=$1; fi
 
@@ -26,8 +39,9 @@ export KAFKA_VERSION=$VERSION
 
 TD="$(cd "$(dirname "$0")" && pwd)"
 
-docker-compose -f $TD/docker-compose.yml down || true
-docker-compose -f $TD/docker-compose.yml up -d
+docker_compose -f $TD/docker-compose.yml down || true
+docker_compose -f $TD/docker-compose.yml up -d
+
 
 n=0
 while [ "$(docker exec kafka-1 bash -c '/opt/kafka/bin/kafka-topics.sh --zookeeper localhost --list')" != '' ]; do
@@ -57,6 +71,11 @@ create_topic "brod-group-coordinator-1" 3 2
 create_topic "brod-demo-topic-subscriber" 3 2
 create_topic "brod-demo-group-subscriber-koc" 3 2
 create_topic "brod-demo-group-subscriber-loc" 3 2
+create_topic "brod_txn_SUITE_1" 3 2
+create_topic "brod_txn_SUITE_2" 3 2
+create_topic "brod_txn_subscriber_input" 3 2
+create_topic "brod_txn_subscriber_output_1" 3 2
+create_topic "brod_txn_subscriber_output_2" 3 2
 create_topic "brod_compression_SUITE"
 create_topic "lz4-test"
 create_topic "test-topic"
diff --git a/src/brod.erl b/src/brod.erl
index 8b16d16d..ee866a5c 100644
--- a/src/brod.erl
+++ b/src/brod.erl
@@ -58,6 +58,16 @@
         , sync_produce_request_offset/2
         ]).
 
+%% Transactions API
+-export([ transaction/3
+        , txn_do/3
+        , txn_produce/5
+        , txn_produce/4
+        , txn_add_offsets/3
+        , commit/1
+        , abort/1
+        ]).
+
 %% Simple Consumer API
 -export([ consume_ack/2
         , consume_ack/4
@@ -155,6 +165,7 @@
              , message_set/0
              , offset/0
              , offset_time/0
+             , offsets_to_commit/0
              , partition/0
              , partition_assignment/0
              , partition_fun/0
@@ -168,6 +179,9 @@
              , topic/0
              , topic_partition/0
              , value/0
+             , transactional_id/0
+             , transaction/0
+             , transaction_config/0
              ]).
 
 -include("brod_int.hrl").
@@ -183,6 +197,7 @@
 -type partition() :: kpro:partition().
 -type topic_partition() :: {topic(), partition()}.
 -type offset() :: kpro:offset(). %% Physical offset (an integer)
+-type offsets_to_commit() :: kpro:offsets_to_commit().
 -type key() :: undefined %% no key, transformed to <<>>
              | binary().
 -type value() :: undefined %% no value, transformed to <<>>
@@ -193,6 +208,12 @@
                | kpro:msg_input() %% one magic v2 message
                | kpro:batch_input(). %% maybe nested batch
 
+-type transactional_id() :: brod_transaction:transactional_id().
+-type transaction() :: brod_transaction:transaction().
+-type transaction_config() :: brod_transaction:transaction_config().
+-type txn_function() :: brod_transaction_processor:process_function().
+-type txn_do_options() :: brod_transaction_processor:do_options().
+
 -type msg_input() :: kpro:msg_input().
 -type batch_input() :: [msg_input()].
 
@@ -1347,6 +1368,55 @@ fetch_committed_offsets(Client, GroupId) ->
 main(X) -> brod_cli:main(X).
 -endif.
 
+%% @doc Start a new transaction, `TxnId' will be the id of the transaction
+%% @see brod_transaction:new/3
+-spec transaction(client(), transactional_id(), transaction_config()) -> {ok, transaction()}.
+transaction(Client, TxnId, Config) ->
+  brod_transaction:new(Client, TxnId, Config).
+
+%% @doc Execute the function in the context of a fetch-produce cycle
+%% with access to an open transaction.
+%% @see brod_transaction_processor:do/3
+-spec txn_do(txn_function(), client(), txn_do_options()) -> {ok, pid()}
+                                                          | {error, any()}.
+txn_do(ProcessFun, Client, Options) ->
+  brod_transaction_processor:do(ProcessFun, Client, Options).
+
+%% @doc Produce the message (key and value) to the indicated topic-partition
+%% synchronously.
+%% @see brod_transaction:produce/5
+-spec txn_produce(transaction(), topic(), partition(), key(), value()) ->
+        {ok, offset()} | {error, any()}.
+txn_produce(Transaction, Topic, Partition, Key, Value) ->
+  brod_transaction:produce(Transaction, Topic, Partition, Key, Value).
+
+%% @doc Produce the batch of messages to the indicated topic-partition
+%% synchronously.
+%% @see brod_transaction:produce/4
+-spec txn_produce(transaction(), topic(), partition(), batch_input()) ->
+        {ok, offset()} | {error, any()}.
+txn_produce(Transaction, Topic, Partition, Batch) ->
+  brod_transaction:produce(Transaction, Topic, Partition, Batch).
+
+%% @doc Add the offsets consumed by a group to the transaction.
+%% @see brod_transaction:add_offsets/3
+-spec txn_add_offsets(transaction(), group_id(), offsets_to_commit()) ->
+        ok | {error, any()}.
+txn_add_offsets(Transaction, ConsumerGroup, Offsets) ->
+  brod_transaction:add_offsets(Transaction, ConsumerGroup, Offsets).
+
+%% @doc Commit the transaction; the transaction process stops afterwards.
+%% @see brod_transaction:commit/1
+-spec commit(transaction()) -> ok | {error, any()}.
+commit(Transaction) ->
+  brod_transaction:commit(Transaction).
+
+%% @doc Abort the transaction; the transaction process stops afterwards.
+%% @see brod_transaction:abort/1
+-spec abort(transaction()) -> ok | {error, any()}.
+abort(Transaction) ->
+  brod_transaction:abort(Transaction).
+
 %%%_* Emacs ====================================================================
 %%% Local Variables:
 %%% allout-layout: t
diff --git a/src/brod_client.erl b/src/brod_client.erl
index 18914751..66f52268 100644
--- a/src/brod_client.erl
+++ b/src/brod_client.erl
@@ -28,6 +28,7 @@
 -export([ get_consumer/3
         , get_connection/3
         , get_group_coordinator/2
+        , get_transactional_coordinator/2
         , get_leader_connection/3
         , get_metadata/2
         , get_metadata_safe/2
@@ -92,6 +93,7 @@
 -type partition() :: brod:partition().
 -type config() :: proplists:proplist().
-type group_id() :: brod:group_id().
+-type transactional_id() :: brod:transactional_id().
 
 -type partition_worker_key() :: ?PRODUCER_KEY(topic(), partition())
                               | ?CONSUMER_KEY(topic(), partition()).
@@ -281,6 +283,14 @@ get_partitions_count_safe(Client, Topic) ->
 get_group_coordinator(Client, GroupId) ->
   safe_gen_call(Client, {get_group_coordinator, GroupId}, infinity).
 
+%% @doc Get broker endpoint and connection config for
+%% connecting a transactional coordinator.
+-spec get_transactional_coordinator(client(), transactional_id()) ->
+        {ok, {endpoint(), brod:conn_config()}} | {error, any()}.
+get_transactional_coordinator(Client, TransactionId) ->
+  safe_gen_call(Client, {get_transactional_coordinator, TransactionId}, infinity).
+
+
 %% @doc Register self() as a partition producer.
 %%
 %% The pid is registered in an ETS table, then the callers
@@ -384,6 +394,9 @@ handle_call({get_connection, Host, Port}, _From, State) ->
 handle_call({get_group_coordinator, GroupId}, _From, State) ->
   {Result, NewState} = do_get_group_coordinator(State, GroupId),
   {reply, Result, NewState};
+handle_call({get_transactional_coordinator, TransactionId}, _From, State) ->
+  {Result, NewState} = do_get_transactional_coordinator(State, TransactionId),
+  {reply, Result, NewState};
 handle_call({start_producer, TopicName, ProducerConfig}, _From, State) ->
   {Reply, NewState} = do_start_producer(TopicName, ProducerConfig, State),
   {reply, Reply, NewState};
@@ -650,6 +663,21 @@ do_get_group_coordinator(State0, GroupId) ->
       {{error, Reason}, State}
   end.
 
+%% Discover the transaction coordinator; the result carries the broker
+%% endpoint and the connection config, not an established connection.
+-spec do_get_transactional_coordinator(state(), transactional_id()) ->
+        {Result, state()} when Result :: {ok, {endpoint(), brod:conn_config()}} | {error, any()}.
+do_get_transactional_coordinator(State0, TransactionId) ->
+  State = ensure_metadata_connection(State0), %% may update the state
+  MetaConn = get_metadata_connection(State),
+  Timeout = timeout(State),
+  case kpro:discover_coordinator(MetaConn, txn, TransactionId, Timeout) of
+    {ok, Endpoint} ->
+      {{ok, {Endpoint, conn_config(State)}}, State}; %% endpoint + config
+    {error, Reason} ->
+      {{error, Reason}, State} %% the updated state is kept on failure too
+  end.
+
 timeout(#state{config = Config}) ->
   timeout(Config);
 timeout(Config) ->
diff --git a/src/brod_transaction.erl b/src/brod_transaction.erl
new file mode 100644
index 00000000..11c1ad81
--- /dev/null
+++ b/src/brod_transaction.erl
@@ -0,0 +1,407 @@
+%%%
+%%%   Copyright (c) 2023 @axs-mvd and contributors
+%%%
+%%%   Licensed under the Apache License, Version 2.0 (the "License");
+%%%   you may not use this file except in compliance with the License.
+%%%   You may obtain a copy of the License at
+%%%
+%%%       http://www.apache.org/licenses/LICENSE-2.0
+%%%
+%%%   Unless required by applicable law or agreed to in writing, software
+%%%   distributed under the License is distributed on an "AS IS" BASIS,
+%%%   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%%%   See the License for the specific language governing permissions and
+%%%   limitations under the License.
+%%%
+
+%% @doc A `brod_transaction' is a process that orchestrates a set of
+%% producers to store messages within a transaction; it also supports
+%% committing offsets in the same transaction.
+%%
+%% Simple produce sample:
+%%
+%% ```
+%% {ok, Tx} = brod_transaction:new(Client, TxId, []),
+%% lists:foreach(fun(Partition) ->
+%%                   Key = rand(), Value = rand(),
+%%                   {ok, _Offset} =
+%%                       brod_transaction:produce(Tx,
+%%                                                Topic,
+%%                                                Partition,
+%%                                                Key,
+%%                                                Value)
+%%               end, Partitions),
+%% ok = brod_transaction:commit(Tx).
+%% '''
+%%
+%% handle callback of a group subscriber using offset commit within a
+%% transaction:
+%%
+%% ```
+%% handle_message(Topic,
+%%                Partition,
+%%                #kafka_message{ offset = Offset
+%%                              , key = Key
+%%                              , value = Value},
+%%                #{ client := Client
+%%                 , group_id := GroupId} = State) ->
+%%   {ok, Tx} = brod_transaction:new(Client, ?TX_ID, []),
+%%   {ok, _ProducedOffset} = brod_transaction:produce(Tx, ?TOPIC_OUTPUT, Partition, Key, Value),
+%%   ok = brod_transaction:add_offsets(Tx, GroupId, #{{Topic, Partition} => Offset}),
+%%   ok = brod_transaction:commit(Tx),
+%%
+%%   {ok, ack_no_commit, State}.
+%% '''
+%%
+
+-module(brod_transaction).
+-behaviour(gen_server).
+
+% public API
+-export([ produce/5
+        , produce/4
+        , add_offsets/3
+        , commit/1
+        , abort/1
+        , stop/1
+        , new/3
+        , start_link/3
+        ]).
+
+% gen_server callbacks
+-export([ init/1
+        , handle_cast/2
+        , handle_call/3
+        , terminate/2
+        ]).
+
+% type exports
+-export_type([ batch_input/0
+             , call_ref/0
+             , client/0
+             , client_id/0
+             , transaction_config/0
+             , group_id/0
+             , key/0
+             , offset/0
+             , offsets_to_commit/0
+             , partition/0
+             , topic/0
+             , transaction/0
+             , transactional_id/0
+             , txn_ctx/0
+             , value/0
+             ]).
+
+%%==============================================================================
+%% Type declarations
+%%==============================================================================
+
+-type call_ref() :: brod:call_ref().
+-type client() :: client_id() | pid().
+-type client_id() :: atom(). %% registered name of a brod client
+-type transaction_config() :: [ {timeout, non_neg_integer()}
+                              | {backoff_step, non_neg_integer()}
+                              | {max_retries, non_neg_integer()}
+                              ]. %% every option is optional, see init/1
+-type group_id() :: kpro:group_id().
+-type key() :: brod:key().
+-type offset() :: kpro:offset().
+-type offsets_to_commit() :: kpro:offsets_to_commit().
+-type partition() :: kpro:partition().
+-type topic() :: kpro:topic().
+-type transaction() :: pid().
+-type transactional_id() :: kpro:transactional_id().
+-type txn_ctx() :: kpro:txn_ctx().
+-type value() :: brod:value().
+-type batch_input() :: kpro:batch_input().
+
+-record(state,
+        { client_pid :: client()
+        , context :: txn_ctx()
+        , timeout :: pos_integer() %% produce request timeout, millis
+        , sequences :: map() %% {Topic, Partition} => next first sequence
+        , sent_partitions :: map() %% {Topic, Partition} => true once added
+        , max_retries :: pos_integer()
+        , backoff_step :: pos_integer()
+        }).
+
+-type state() :: #state{}.
+
+%%==============================================================================
+%% API functions
+%%==============================================================================
+
+%% @doc Start a transaction process; same as `start_link/3'.
+%% @see start_link/3
+-spec new(client(), transactional_id(), transaction_config()) ->
+        {ok, transaction()} | {error, any()}.
+new(Client, TxId, Config) ->
+  start_link(Client, TxId, Config).
+
+%% @doc Start a new transaction, `TxId' will be the id of the transaction.
+%% `Config' is a proplist, all values are optional:
+%% `timeout': produce request timeout in millis (default 1000);
+%% `backoff_step': sleep 2^Attempt * backoff_step millis between attempts;
+%% `max_retries': maximum number of attempts per operation (default 5).
+-spec start_link(client(), transactional_id(), transaction_config()) ->
+        {ok, pid()} | {error, any()}.
+start_link(Client, TxId, Config) ->
+  gen_server:start_link(?MODULE, {Client, TxId, Config}, []).
+
+%% @doc Produce the message (key and value) to the indicated topic-partition
+%% synchronously.
+-spec produce(transaction(), topic(), partition(), key(), value()) ->
+        {ok, offset()} | {error, any()}.
+produce(Transaction, Topic, Partition, Key, Value) ->
+  gen_server:call(Transaction, {produce, Topic, Partition, Key, Value}).
+
+%% @doc Synchronous variant of produce/5 that takes a whole batch of
+%% messages for one topic-partition.
+-spec produce(transaction(), topic(), partition(), batch_input()) ->
+        {ok, offset()} | {error, any()}.
+produce(Transaction, Topic, Partition, Batch) ->
+  gen_server:call(Transaction, {produce, Topic, Partition, Batch}).
+
+%% @doc Add the offsets consumed by `ConsumerGroup' to the transaction.
+-spec add_offsets(transaction(), group_id(), offsets_to_commit()) -> ok | {error, any()}.
+add_offsets(Transaction, ConsumerGroup, Offsets) ->
+  gen_server:call(Transaction, {add_offsets, ConsumerGroup, Offsets}).
+
+%% @doc Commit the transaction; the gen_server stops after replying.
+-spec commit(transaction()) -> ok | {error, any()}.
+commit(Transaction) ->
+  gen_server:call(Transaction, commit).
+
+%% @doc Abort the transaction; the gen_server stops after replying.
+-spec abort(transaction()) -> ok | {error, any()}.
+abort(Transaction) ->
+  gen_server:call(Transaction, abort).
+
+%% @doc Stop the transaction process.
+-spec stop(transaction()) -> ok | {error, any()}.
+stop(Transaction) ->
+  gen_server:call(Transaction, terminate).
+
+%%==============================================================================
+%% gen_server callbacks
+%%==============================================================================
+
+init({Client, TxId, PropListConfig}) ->
+  %% NOTE(review): presumably traps exits so terminate/2 runs on owner exit
+  erlang:process_flag(trap_exit, true),
+  ClientPid = pid(Client),
+  Defaults = #{ max_retries  => 5
+              , backoff_step => 100
+              , timeout      => 1000
+              },
+  Overlay = fun({Key, Val}, Acc) -> maps:put(Key, Val, Acc) end,
+  Config = lists:foldl(Overlay, Defaults, PropListConfig),
+  #{ max_retries  := MaxRetries
+   , backoff_step := BackOffStep
+   , timeout      := Timeout
+   } = Config,
+
+  {ok, CTX} = make_txn_context(ClientPid, TxId, Config),
+  InitialState = #state{ client_pid      = ClientPid
+                       , context         = CTX
+                       , max_retries     = MaxRetries
+                       , backoff_step    = BackOffStep
+                       , timeout         = Timeout
+                       , sequences       = #{}
+                       , sent_partitions = #{}
+                       },
+  {ok, InitialState}.
+handle_call({add_offsets, ConsumerGroup, Offsets}, _From,
+            #state{ client_pid = Client
+                  , context = CTX
+                  , max_retries = MaxRetries
+                  , backoff_step = BackOffStep
+                  } = State) ->
+  Resp = do_add_offsets(Client, CTX, ConsumerGroup, Offsets,
+                        #{ max_retries => MaxRetries
+                         , backoff_step => BackOffStep
+                         }),
+  {reply, Resp, State}; %% the state is not changed by adding offsets
+handle_call({produce, Topic, Partition, Key, Value}, _From,
+            #state{} = OldState) ->
+  case do_produce(Topic, Partition, Key, Value, OldState) of
+    {ok, {Offset, State}} -> {reply, {ok, Offset}, State};
+    {error, Reason} -> {reply, {error, Reason}, OldState} %% keep old state
+  end;
+handle_call({produce, Topic, Partition, Batch}, _From,
+            #state{} = OldState) ->
+  case do_batch_produce(Topic, Partition, Batch, OldState) of
+    {ok, {Offset, State}} -> {reply, {ok, Offset}, State};
+    {error, Reason} -> {reply, {error, Reason}, OldState} %% keep old state
+  end;
+handle_call(commit, _From, #state{context = CTX} = State) ->
+  {stop, normal, kpro:txn_commit(CTX), State}; %% reply with result, then stop
+handle_call(terminate, _From, State) -> %% request sent by stop/1
+  {stop, normal, ok, #state{} = State};
+handle_call(abort, _From,
+            #state{context = CTX} = State) ->
+  {stop, normal, kpro:txn_abort(CTX), State}; %% reply with result, then stop
+handle_call({abort, Timeout}, _From, %% NOTE(review): no API here sends this
+            #state{context = CTX} = State) ->
+  {stop, normal,
+   kpro:txn_abort(CTX, #{timeout => Timeout}),
+   State};
+handle_call(stop, _From, #state{} = State) -> %% NOTE(review): stop/1 sends 'terminate'
+  {stop, normal, ok, State};
+handle_call(Call, _From, #state{} = State) ->
+  {reply, {error, {unsupported_call, Call}}, State}.
+handle_cast(_Cast, #state{} = State) -> %% casts are ignored
+  {noreply, State}.
+terminate(_Reason, #state{context = CTX}) -> %% NOTE(review): also runs after a commit - confirm the extra abort is harmless
+  kpro:txn_abort(CTX).
+
+%%==============================================================================
+%% Internal functions
+%%==============================================================================
+
+%% Open the transaction context, retrying as configured.
+make_txn_context(Client, TxId, #{ max_retries := MaxRetries
+                                , backoff_step := BackOffStep
+                                }) ->
+  InitCtx = fun() -> make_txn_context_internal(Client, TxId) end,
+  persistent_call(InitCtx, MaxRetries, BackOffStep).
+
+%% One attempt: find the txn coordinator, connect, init the kpro context.
+make_txn_context_internal(Client, TxId) ->
+  case brod_client:get_transactional_coordinator(Client, TxId) of
+    {error, _} = CoordError -> CoordError;
+    {ok, {{Host, Port}, _ConnConfig}} ->
+      case brod_client:get_connection(Client, Host, Port) of
+        {error, _} = ConnError -> ConnError;
+        {ok, Conn}             -> kpro:txn_init_ctx(Conn, TxId)
+      end
+  end.
+
+%% Register the group's offsets with the transaction, retrying as
+%% configured.
+do_add_offsets(Client, CTX, ConsumerGroup, Offsets,
+               #{ max_retries := MaxRetries
+                , backoff_step := BackOffStep
+                }) ->
+  AddOffsets =
+    fun() -> do_add_offsets_internal(Client, CTX, ConsumerGroup, Offsets) end,
+  persistent_call(AddOffsets, MaxRetries, BackOffStep).
+
+do_add_offsets_internal(Client, CTX, ConsumerGroup, Offsets) -> %% one attempt
+  case brod_client:get_group_coordinator(Client, ConsumerGroup) of
+    {error, _} = CoordError -> CoordError;
+    {ok, {{Host, Port}, _ConnConfig}} ->
+      case brod_client:get_connection(Client, Host, Port) of
+        {error, _} = ConnError -> ConnError;
+        {ok, Conn} -> send_cg_and_offset(Conn, CTX, ConsumerGroup, Offsets)
+      end
+  end.
+
+send_cg_and_offset(GroupCoordConn, CTX, ConsumerGroup, Offsets) ->
+  %% Kafka must first be told that this group's offsets are going to be
+  %% committed as part of the transaction; only then can they be sent.
+  case kpro:txn_send_cg(CTX, ConsumerGroup) of
+    {error, _} = Error -> Error;
+    ok -> kpro:txn_offset_commit(GroupCoordConn, ConsumerGroup, CTX, Offsets)
+  end.
+
+-spec do_produce(topic(), partition(), key(), value(), state()) ->
+        {error, string()} | {ok, {offset(), state()}}.
+do_produce(Topic, Partition, Key, Value, State) ->
+  %% a single message is produced as a batch of one, stamped with now
+  Message = #{key => Key, value => Value, ts => kpro_lib:now_ts()},
+  do_batch_produce(Topic, Partition, [Message], State).
+
+-spec do_batch_produce(topic(), partition(), batch_input(), state()) ->
+        {error, string()} | {ok, {offset(), state()}}.
+do_batch_produce(Topic, Partition, Batch, #state{ max_retries = MaxRetries
+                                                , backoff_step = BackOffStep
+                                                } = State) ->
+  persistent_call(fun() -> %% every attempt starts from the same State
+                      do_batch_produce_internal(Topic, Partition,
+                                                Batch, State)
+                  end, MaxRetries, BackOffStep).
+
+do_batch_produce_internal(Topic, Partition, Batch,
+                          #state{ client_pid = ClientPid
+                                , timeout = Timeout
+                                , context = CTX
+                                , sequences = Sequences
+                                , sent_partitions = OldSentPartitions
+                                } = State) ->
+
+  case conn_and_vsn(ClientPid, Topic, Partition) of
+    {ok, {Connection, Vsn}} ->
+      FirstSequence = maps:get({Topic, Partition}, Sequences, 0), %% 0 on first use
+
+      ProduceReq = kpro_req_lib:produce(Vsn, Topic, Partition,
+                                        Batch,
+                                        #{ txn_ctx => CTX
+                                         , first_sequence => FirstSequence
+                                         }),
+
+      SentPartitions = %% add the partition to the txn once, before producing
+        case maps:get({Topic, Partition}, OldSentPartitions, not_found) of
+          not_found ->
+            ok = kpro:txn_send_partitions(CTX, [{Topic, Partition}]),
+            maps:put({Topic, Partition}, true, OldSentPartitions);
+          _ -> OldSentPartitions
+        end,
+
+      case send_req(Connection, ProduceReq, Timeout) of
+        {ok, Offset} ->
+          {ok, {Offset, State#state{ sent_partitions = SentPartitions
+                                   , sequences = maps:put({Topic, Partition},
+                                                          FirstSequence + length(Batch),
+                                                          Sequences)
+                                   }}};
+        {error, Reason} -> {error, Reason} %% NOTE(review): SentPartitions is dropped here, so a retry registers the partition again
+      end;
+    {error, Reason} -> {error, Reason}
+  end.
+
+send_req(Connection, ProduceReq, Timeout) -> %% send and parse the response
+  case kpro:request_sync(Connection, ProduceReq, Timeout) of
+    {ok, Rsp} -> brod_utils:parse_rsp(Rsp);
+    {error, Reason} -> {error, Reason}
+  end.
+
+conn_and_vsn(ClientPid, Topic, Partition) -> %% leader connection + produce API version
+  case brod_client:get_leader_connection(ClientPid, Topic, Partition) of
+    {ok, Connection} ->
+      case kpro:get_api_versions(Connection) of
+        {ok, #{ produce := {_, Vsn} %% second element of the version pair
+              , fetch := {_, _} %% NOTE(review): only matched, never used
+              }} -> {ok, {Connection, Vsn}};
+        {error, Reason} -> {error, Reason}
+      end;
+    {error, Reason} -> {error, Reason}
+  end.
+
+-spec pid(client()) -> pid().
+pid(Client) when is_pid(Client)  -> Client;
+pid(Client) when is_atom(Client) -> whereis(Client). %% registered name
+
+backoff(Attempt, BackOffStep) -> %% sleeps 2^Attempt * BackOffStep millis
+  timer:sleep(trunc(BackOffStep * math:pow(2, Attempt))).
+
+persistent_call(Fun, MaxRetries, BackOffStep) -> %% attempts count from 0
+  persistent_call(Fun, 0, MaxRetries, BackOffStep).
+
+persistent_call(Fun, Attempt, MaxRetries, BackOffStep) ->
+  case Fun() of
+    {error, _} when Attempt + 1 < MaxRetries ->
+      backoff(Attempt, BackOffStep),
+      persistent_call(Fun, Attempt + 1, MaxRetries, BackOffStep);
+    {error, _} = Failure -> Failure; %% attempts exhausted
+    {ok, _} = Success    -> Success;
+    ok                   -> ok
+  end.
+
+%%%_* Emacs ====================================================================
+%%% Local Variables:
+%%% allout-layout: t
+%%% erlang-indent-level: 2
+%%% End:
diff --git a/src/brod_transaction_processor.erl b/src/brod_transaction_processor.erl
new file mode 100644
index 00000000..e586d8f2
--- /dev/null
+++ b/src/brod_transaction_processor.erl
@@ -0,0 +1,214 @@
+%%%
+%%%   Copyright (c) 2023 @axs-mvd and contributors
+%%%
+%%%   Licensed under the Apache License, Version 2.0 (the "License");
+%%%   you may not use this file except in compliance with the License.
+%%%   You may obtain a copy of the License at
+%%%
+%%%       http://www.apache.org/licenses/LICENSE-2.0
+%%%
+%%%   Unless required by applicable law or agreed to in writing, software
+%%%   distributed under the License is distributed on an "AS IS" BASIS,
+%%%   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%%%   See the License for the specific language governing permissions and
+%%%   limitations under the License.
+%%%
+
+%% @doc `brod_transaction_processor' allows the execution of a function in the context
+%% of a transaction. It abstracts the usage of a group subscriber reading and writing
+%% using a transaction in each fetch cycle.
+%% For example, the following snippets are equivalent
+%%
+%%-------------------------------------------------
+%%
+%%function_that_does_something(Messages, ...) ->
+%%  write_some_messages_into_some_topic(Messages, ...),
+%%  write_some_other_messages_into_yet_another_topic(Messages, ...).
+%%
+%%handle_message(Topic, Partition, Messages, State) ->
+%%  {ok, Tx} = brod:transaction(...), % opens a transaction
+%%  function_that_does_something(Messages, ...), % adds the writes to the transaction
+%%  ok = brod:txn_add_offsets(...), % add offsets to the transaction
+%%  ok = brod:commit(Tx), % commit
+%%  {ok, ack_no_commit, State}.
+%%
+%%-------------------------------------------------
+%%
+%%brod_transaction_processor:do(
+%%  fun(Transaction, Messages) ->
+%%    write_some_messages_into_some_topic(Messages, ...),
+%%    write_some_other_messages_into_yet_another_topic(Messages, ...)
+%%  end,
+%%  ...)
+%%
+%%-------------------------------------------------
+%%
+-module(brod_transaction_processor).
+
+-include("brod.hrl").
+
+%public API
+-export([do/3]).
+
+% group subscriber callbacks
+-export([ init/2
+        , handle_message/4
+        , get_committed_offsets/3]).
+
+% type exports
+-export_type([ do_options/0
+             , process_function/0]).
+
+%%==============================================================================
+%% Type declarations
+%%==============================================================================
+
+-type client() :: client_id() | pid().
+-type client_id() :: atom().
+-type do_options() :: #{ group_config => proplists:proplist()
+                       , consumer_config => proplists:proplist()
+                       , transaction_config => proplists:proplist()
+                       , group_id => binary() %% required by do/3
+                       , topics => [binary()]}. %% required by do/3
+-type message_set() :: #kafka_message_set{}.
+-type transaction() :: brod_transaction:transaction().
+
+
+-type process_function() :: fun((transaction(), message_set()) -> ok
+                                                               | {error, any()}).
+
+%% @doc Executes the ProcessFunction within the context of a transaction.
+%% Options is a map that can include
+%% `group_config' as the configuration for the group subscriber.
+%% `consumer_config' as the configuration for the consumers.
+%% `transaction_config' as the config of every transaction that is opened.
+%% `group_id' as the subscriber group id (required).
+%% `topics' topics to fetch from (required).
+%%
+%% FizzBuzz sample:
+%%
+%% fizz_buzz(N) when (N rem 15) == 0 -> "FizzBuzz";
+%% fizz_buzz(N) when (N rem 3) == 0 -> "Fizz";
+%% fizz_buzz(N) when (N rem 5) == 0 -> "Buzz";
+%% fizz_buzz(N) -> N.
+%%
+%% brod_transaction_processor:do(
+%%     fun(Transaction, #kafka_message_set{ topic = _Topic
+%%                                        , partition = Partition
+%%                                        , messages = Messages} = _MessageSet) ->
+%%         FizzBuzzed =
+%%         lists:map(fun(#kafka_message{ key = Key
+%%                                     , value = Value}) ->
+%%                       #{ key => Key
+%%                        , value => fizz_buzz(Value)}
+%%                   end, Messages),
+%%
+%%         brod:txn_produce(Transaction,
+%%                          ?OUTPUT_TOPIC,
+%%                          Partition,
+%%                          FizzBuzzed),
+%%
+%%         ok
+%%     end, Client, #{ topics => [?INPUT_TOPIC]
+%%                   , group_id => ?PROCESSOR_GROUP_ID}).
+%%
+-spec do(process_function(), client(), do_options()) -> {ok, pid()}
+                                                       | {error, any()}.
+do(ProcessFun, Client, Opts) ->
+
+  Defaults = #{ group_config => [{offset_commit_policy, consumer_managed}]
+              %% note that if you change the group_config you must include
+              %% the above option, as it enables our fetcher to manage
+              %% the offsets itself
+              , consumer_config => []},
+
+  #{ group_id := GroupId
+   , topics := Topics
+   , group_config := GroupConfig
+   , consumer_config := ConsumerConfig} = maps:merge(Defaults, Opts),
+
+  InitState = maps:merge(maps:with([transaction_config], Opts),
+                         #{client => Client, process_function => ProcessFun}),
+
+  brod:start_link_group_subscriber(Client,
+                                   GroupId,
+                                   Topics,
+                                   GroupConfig,
+                                   ConsumerConfig,
+                                   message_set,
+                                   ?MODULE,
+                                   InitState).
+
+%%==============================================================================
+%% group subscriber callbacks
+%%==============================================================================
+%% Group subscriber callback: build the callback state. `tx_id' defaults to
+%% a freshly generated id and `transaction_config' to `[]' when `Opts'
+%% does not carry them.
+init(GroupId, #{ client := Client
+               , process_function := ProcessFun
+               } = Opts) ->
+  Defaults = #{tx_id => make_transactional_id(), transaction_config => []},
+  #{tx_id := TxId, transaction_config := Config} = maps:merge(Defaults, Opts),
+  State = #{ group_id           => GroupId
+           , client             => Client
+           , process_function   => ProcessFun
+           , tx_id              => TxId
+           , transaction_config => Config
+           },
+  {ok, State}.
+
+%% Group subscriber callback: run `ProcessFun' on the fetched message set
+%% inside a new transaction, add the consumed offset to it and commit.
+handle_message(Topic,
+               Partition,
+               #kafka_message_set{topic = Topic, partition = Partition} = MsgSet,
+               #{ client             := Client
+                , group_id           := GroupId
+                , process_function   := ProcessFun
+                , tx_id              := TxId
+                , transaction_config := TxnConfig
+                } = State) ->
+  {ok, Txn} = brod:transaction(Client, TxId, TxnConfig),
+  ok = ProcessFun(Txn, MsgSet),
+  Consumed = offsets_to_commit(MsgSet),
+  ok = brod:txn_add_offsets(Txn, GroupId, Consumed),
+  ok = brod:commit(Txn),
+  %% the offset was added to the transaction above, hence ack_no_commit
+  {ok, ack_no_commit, State}.
+
+%% Group subscriber callback: committed offsets of `GroupId', restricted
+%% to the topic-partitions in `TPs'.
+get_committed_offsets(GroupId, TPs, #{client := Client} = State) ->
+  {ok, Offsets} = brod:fetch_committed_offsets(Client, GroupId),
+  IsWanted = fun({TP, _Offset}) -> lists:member(TP, TPs) end,
+  PerTopic =
+    fun(#{name := Topic, partitions := Partitions}) ->
+        [{{Topic, Partition}, COffset}
+         || #{ partition_index := Partition
+             , committed_offset := COffset} <- Partitions]
+    end,
+  TPOs = lists:filter(IsWanted, lists:flatmap(PerTopic, Offsets)),
+  {ok, TPOs, State}.
+
+%%==============================================================================
+%% Internal functions
+%%==============================================================================
+
+make_transactional_id() ->
+  RandomSuffix = base64:encode(crypto:strong_rand_bytes(8)),
+  iolist_to_binary([atom_to_list(?MODULE), "-txn-", RandomSuffix]).
+
+
+offsets_to_commit(#kafka_message_set{ topic = Topic
+                                    , partition = Partition
+                                    , messages = Messages
+                                    }) ->
+  #kafka_message{offset = Offset} = lists:last(Messages), %% NOTE(review): fails on an empty message list - confirm it cannot occur
+  #{{Topic, Partition} => Offset}. %% offset of the last message in the set
+
+%%%_* Emacs ====================================================================
+%%% Local Variables:
+%%% allout-layout: t
+%%% erlang-indent-level: 2
+%%% End:
diff --git a/test/brod_offset_txn_SUITE.erl b/test/brod_offset_txn_SUITE.erl
new file mode 100644
index 00000000..5565a8a4
--- /dev/null
+++ b/test/brod_offset_txn_SUITE.erl
@@ -0,0 +1,249 @@
+-module(brod_offset_txn_SUITE).
+
+-export([ init_per_suite/1
+        , end_per_suite/1
+        , init_per_testcase/2
+        , end_per_testcase/2
+        , all/0
+        , suite/0
+        ]).
+
+-export([ t_simple_test/1
+        , t_no_commit_test/1
+        ]).
+
+-export([ init/2
+        , handle_message/4
+        , get_committed_offsets/3
+        ]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("stdlib/include/assert.hrl").
+
+-include("include/brod.hrl").
+
+-define(HOSTS, [{"localhost", 9092}]).
+-define(TOPIC_OUTPUT_1, <<"brod_txn_subscriber_output_1">>).
+-define(TOPIC_OUTPUT_2, <<"brod_txn_subscriber_output_2">>).
+-define(TOPIC_INPUT, <<"brod_txn_subscriber_input">>).
+-define(CLIENT_ID, client_consumer_group).
+-define(GROUP_ID, <<"group_id_for_testing">>).
+-define(TIMEOUT, 4000). %% millis, used by the receive loops below
+-define(config(Name), proplists:get_value(Name, Config)).
+
+%%%_* ct callbacks =============================================================
+
+suite() -> [{timetrap, {seconds, 30}}].
+
+init_per_suite(Config) ->
+  {ok, _} = application:ensure_all_started(brod),
+  Config.
+
+end_per_suite(_Config) -> ok.
+
+init_per_testcase(Case, Config) ->
+  try ?MODULE:Case({'init', Config}) %% a case may provide its own init clause
+  catch error : function_clause -> %% it does not: use the default client setup
+    init_client(Case, Config)
+  end.
+
+init_client(Case, Config) ->
+  Client = Case, %% the client is registered under the test case name
+  brod:stop_client(Client), %% result ignored
+  ClientConfig = client_config(),
+  ok = brod:start_client(?HOSTS, Client, ClientConfig),
+
+  [ {client, Client}
+  , {client_config, ClientConfig} | Config].
+
+end_per_testcase(_Case, Config) ->
+  Subscriber = ?config(subscriber),
+  is_pid(Subscriber) andalso unlink(Subscriber), %% unlink before killing
+  is_pid(Subscriber) andalso exit(Subscriber, kill),
+  Pid = whereis(?config(client)),
+  try
+    Ref = erlang:monitor(process, Pid),
+    brod:stop_client(?config(client)),
+    receive %% wait until the client process is really gone
+      {'DOWN', Ref, process, Pid, _} -> ok
+    end
+  catch _ : _ -> %% best-effort cleanup, failures are ignored
+    ok
+  end,
+  Config.
+
+all() -> [F || {F, _A} <- module_info(exports), %% exported functions named t_*
+                  case atom_to_list(F) of
+                    "t_" ++ _ -> true;
+                    _ -> false
+                  end].
+
+client_config() ->
+  case os:getenv("KAFKA_VERSION") of
+    "0.9" ++ _ -> [{query_api_versions, false}];
+    _ -> []
+  end.
+
+init(GroupId, %% group subscriber callback: builds the callback state
+     #{ client := Client
+      , observer := OPid}) ->
+  {ok, #{ client => Client
+        , observer => OPid
+        , group_id => GroupId}}.
+
+handle_message(Topic, %% group subscriber callback, one message per call
+               Partition,
+               #kafka_message{ offset = Offset
+                             , key = Key
+                             , value = Value},
+               #{ client := Client
+                , group_id := GroupId
+                , observer := ObserverPid} = State) ->
+
+  {ok, Tx} = brod:transaction(Client, <<"some_transaction">>, []),
+  {ok, _} = brod:txn_produce(Tx, ?TOPIC_OUTPUT_1, Partition, Key, Value),
+  {ok, _} = brod:txn_produce(Tx, ?TOPIC_OUTPUT_2, Partition, Key, Value),
+  ok = brod:txn_add_offsets(Tx, GroupId, #{{Topic, Partition} => Offset}),
+
+  case Value of
+    <<"no_commit">> -> %% payload sent by send_no_commit_message/0
+      ok = brod:abort(Tx);
+    _ ->
+      ok = brod:commit(Tx)
+  end,
+
+  ObserverPid ! {offset, Offset}, %% tell the test process which offset was handled
+
+  {ok, ack_no_commit, State}.
+
+%% Group subscriber callback: committed offsets of `GroupId' for the
+%% topic-partitions in `TPs', in the order brod:fetch_committed_offsets/2
+%% returned them.
+get_committed_offsets(GroupId, TPs, #{client := Client} = State) ->
+  {ok, Offsets} = brod:fetch_committed_offsets(Client, GroupId),
+  ToTPO = fun(Topic) ->
+              fun(#{ committed_offset := COffset
+                   , partition_index  := Partition}) ->
+                  {{Topic, Partition}, COffset}
+              end
+          end,
+  PerTopic = fun(#{name := Topic, partitions := Partitions}) ->
+                 lists:map(ToTPO(Topic), Partitions)
+             end,
+  Wanted = fun({TP, _Offset}) -> lists:member(TP, TPs) end,
+  {ok, lists:filter(Wanted, lists:flatmap(PerTopic, Offsets)), State}.
+
+get_offset() -> %% committed offset of input partition 0, 0 when none
+  timer:sleep(100),
+  TP = {?TOPIC_INPUT, 0},
+  {ok, Committed, _State} =
+    get_committed_offsets(?GROUP_ID, [TP], #{client => ?CLIENT_ID}),
+  case Committed of
+    [{TP, Offset}] -> Offset;
+    []             -> 0
+  end.
+
+send_no_commit_message() -> %% this payload makes handle_message/4 abort
+  send_message(rand(), <<"no_commit">>).
+
+send_simple_message() -> %% any other payload is committed
+  send_message(rand(), <<"simple">>).
+
+send_message(Key, Value) -> %% returns the offset of the produced message
+  brod:start_producer(?CLIENT_ID, ?TOPIC_INPUT, []),
+  {ok, Offset} = brod:produce_sync_offset(?CLIENT_ID, ?TOPIC_INPUT, 0, Key, Value),
+  Offset.
+
+%% Start a group subscriber on the input topic with the `consumer_managed'
+%% offset commit policy; this module provides the callbacks.
+start_subscriber() ->
+  GroupConfig = [{offset_commit_policy, consumer_managed}],
+  ConsumerConfig = [ {prefetch_count, 3}
+                   , {sleep_timeout, 0}
+                   , {max_wait_time, 100}
+                   , {partition_restart_delay_seconds, 1}
+                   , {begin_offset, 0}
+                   ],
+  InitState = #{client => ?CLIENT_ID, observer => self()},
+  brod:start_link_group_subscriber(?CLIENT_ID,
+                                   ?GROUP_ID,
+                                   [?TOPIC_INPUT],
+                                   GroupConfig,
+                                   ConsumerConfig,
+                                   message,
+                                   ?MODULE,
+                                   InitState).
+
+wait_to_last() -> %% drain the mailbox until it stays empty for ?TIMEOUT
+  receive
+    _Any -> wait_to_last()
+  after ?TIMEOUT -> done
+  end.
+
+wait_for_offset(ExpectedOffset) ->
+  receive
+    {offset, Seen} when Seen == ExpectedOffset ->
+      done;
+    {offset, _Other} -> %% not the one we wait for, keep waiting
+      wait_for_offset(ExpectedOffset)
+  after ?TIMEOUT -> timeout
+  end.
+
+rand() -> base64:encode(crypto:strong_rand_bytes(8)).
+
+%% A "simple" message must be processed and committed: the committed input
+%% offset equals the offset of the sent message, and each output topic
+%% holds exactly one committed message from its previous end.
+t_simple_test(Config) when is_list(Config) ->
+  ok = brod:start_client(?HOSTS, ?CLIENT_ID, []),
+  {ok, Subscriber} = start_subscriber(),
+  done = wait_to_last(),
+  OffsetBefore = get_offset(),
+  {ok, Out1Start} = brod:resolve_offset(?HOSTS, ?TOPIC_OUTPUT_1, 0),
+  {ok, Out2Start} = brod:resolve_offset(?HOSTS, ?TOPIC_OUTPUT_2, 0),
+  SentOffset = send_simple_message(),
+  done = wait_for_offset(SentOffset),
+  OffsetAfter = get_offset(),
+  ?assertMatch(SentOffset, OffsetAfter),
+  ?assertMatch(true, OffsetBefore =< OffsetAfter),
+
+  ok = brod_group_subscriber:stop(Subscriber),
+  ?assertMatch(false, is_process_alive(Subscriber)),
+
+  %% only committed transactional messages may be visible
+  FetchOpts = #{isolation_level => read_committed},
+  {ok, {_, Out1Msgs}} =
+    brod:fetch(?CLIENT_ID, ?TOPIC_OUTPUT_1, 0, Out1Start, FetchOpts),
+  ?assertMatch(1, length(Out1Msgs)),
+  {ok, {_, Out2Msgs}} =
+    brod:fetch(?CLIENT_ID, ?TOPIC_OUTPUT_2, 0, Out2Start, FetchOpts),
+  ?assertMatch(1, length(Out2Msgs)),
+  ok.
+
+%% A "no_commit" message is processed but its transaction is aborted: the
+%% committed input offset does not pass the sent message, and no committed
+%% message shows up on either output topic.
+t_no_commit_test(Config) when is_list(Config) ->
+  ok = brod:start_client(?HOSTS, ?CLIENT_ID, []),
+  {ok, Subscriber} = start_subscriber(),
+  wait_to_last(),
+  {ok, Out1Start} = brod:resolve_offset(?HOSTS, ?TOPIC_OUTPUT_1, 0),
+  {ok, Out2Start} = brod:resolve_offset(?HOSTS, ?TOPIC_OUTPUT_2, 0),
+  SentOffset = send_no_commit_message(),
+  done = wait_for_offset(SentOffset),
+  OffsetAfter = get_offset(),
+  ?assertMatch(true, SentOffset >= OffsetAfter),
+
+  ok = brod_group_subscriber:stop(Subscriber),
+  false = is_process_alive(Subscriber),
+
+  %% aborted messages are hidden from read_committed fetches
+  FetchOpts = #{isolation_level => read_committed},
+  {ok, {_, Out1Msgs}} =
+    brod:fetch(?CLIENT_ID, ?TOPIC_OUTPUT_1, 0, Out1Start, FetchOpts),
+  ?assertMatch(0, length(Out1Msgs)),
+  {ok, {_, Out2Msgs}} =
+    brod:fetch(?CLIENT_ID, ?TOPIC_OUTPUT_2, 0, Out2Start, FetchOpts),
+  ?assertMatch(0, length(Out2Msgs)),
+  ok.
diff --git a/test/brod_txn_SUITE.erl b/test/brod_txn_SUITE.erl
new file mode 100644
index 00000000..b5fb9294
--- /dev/null
+++ b/test/brod_txn_SUITE.erl
@@ -0,0 +1,225 @@
+-module(brod_txn_SUITE).
+%% Test framework
+-export([ init_per_suite/1
+        , end_per_suite/1
+        , init_per_testcase/2
+        , end_per_testcase/2
+        , all/0
+        , suite/0
+        ]).
+
+-export([ t_multiple_writes_transaction/1
+        , t_simple_transaction/1
+        , t_abort_transaction/1
+        , t_batch_transaction/1
+        ]).
+
+-include_lib("stdlib/include/assert.hrl").
+
+-include("include/brod.hrl").
+
+-define(HOSTS, [{"localhost", 9092}]).
+-define(TOPIC_1, list_to_binary(atom_to_list(?MODULE)++"_1")).
+-define(TOPIC_2, list_to_binary(atom_to_list(?MODULE)++"_2")).
+%% NOTE: this is longer than the 30 s timetrap declared in suite/0, so the
+%% timetrap ends a test case before a receive using ?TIMEOUT can expire.
+-define(TIMEOUT, 280000).
+%% Looks Name up in the variable `Config' of the enclosing function.
+-define(config(Name), proplists:get_value(Name, Config)).
+
+%%%_* ct callbacks =============================================================
+
+%% Every test case is limited to 30 seconds.
+suite() -> [{timetrap, {seconds, 30}}].
+
+%% Starts brod and its dependencies once for the whole suite.
+init_per_suite(Config) ->
+  {ok, _} = application:ensure_all_started(brod),
+  Config.
+
+end_per_suite(_Config) -> ok.
+
+%% Lets a test case supply its own setup through a `Case({init, Config})'
+%% clause; a function_clause error from that call falls back to
+%% init_client/2.
+init_per_testcase(Case, Config) ->
+  try ?MODULE:Case({'init', Config})
+  catch error : function_clause ->
+    init_client(Case, Config)
+  end.
+
+%% Default setup: restarts the brod client named after the test case, then
+%% spawns a linked subscriber that forwards consumed messages to the calling
+%% process and subscribes it to partition 0 of both test topics.
+%% Adds `client', `client_config', `subscriber' and `topics' to Config.
+init_client(Case, Config) ->
+  Client = Case,
+  brod:stop_client(Client),
+  ClientConfig = client_config(),
+  ok = brod:start_client(?HOSTS, Client, ClientConfig),
+  TesterPid = self(),
+  Subscriber = spawn_link(fun() -> subscriber_loop(TesterPid) end),
+  Topics = [?TOPIC_1, ?TOPIC_2],
+  lists:foreach(fun(Topic) ->
+                    ok = brod:start_consumer(Client, Topic, []),
+                    brod:subscribe(Client, Subscriber, Topic, 0, [])
+                end, Topics),
+
+  %% `subscriber' must be recorded here: end_per_testcase/2 reads it to
+  %% unlink and kill the subscriber process, which would otherwise leak
+  [{client, Client},
+   {client_config, ClientConfig},
+   {subscriber, Subscriber},
+   {topics, Topics} | Config].
+
+%% Per-testcase teardown: kills the subscriber recorded in Config (if there
+%% is one), stops the test case's client and waits until the client process
+%% is down. Any failure while stopping is ignored. Returns Config.
+end_per_testcase(_Case, Config) ->
+  case ?config(subscriber) of
+    SubscriberPid when is_pid(SubscriberPid) ->
+      unlink(SubscriberPid),
+      exit(SubscriberPid, kill);
+    _NoSubscriber ->
+      ok
+  end,
+  ClientId = ?config(client),
+  ClientPid = whereis(ClientId),
+  try
+    MRef = erlang:monitor(process, ClientPid),
+    brod:stop_client(ClientId),
+    receive
+      {'DOWN', MRef, process, ClientPid, _Reason} -> ok
+    end
+  catch
+    _Class : _Error -> ok
+  end,
+  Config.
+
+%% Every exported function whose name starts with "t_" is a test case.
+all() ->
+  IsTestCase = fun({Name, _Arity}) ->
+                   lists:prefix("t_", atom_to_list(Name))
+               end,
+  [Name || {Name, _Arity} <- lists:filter(IsTestCase, module_info(exports))].
+
+%% Client config derived from the KAFKA_VERSION environment variable:
+%% API version queries are switched off for versions starting with "0.9".
+client_config() ->
+  KafkaVersion = os:getenv("KAFKA_VERSION"),
+  case is_list(KafkaVersion) andalso lists:prefix("0.9", KafkaVersion) of
+    true  -> [{query_api_versions, false}];
+    false -> []
+  end.
+
+%% Loop of the test subscriber: each message of a received message set is
+%% forwarded to TesterPid as `{Partition, Key, Value}' and then acked to the
+%% consumer. Anything that is not a 2-tuple fails the test.
+subscriber_loop(TesterPid) ->
+  receive
+    {ConsumerPid, KMS} ->
+      #kafka_message_set{ messages = Messages
+                        , partition = Partition} = KMS,
+      ForwardAndAck =
+        fun(#kafka_message{offset = Offset, key = K, value = V}) ->
+            TesterPid ! {Partition, K, V},
+            ok = brod:consume_ack(ConsumerPid, Offset)
+        end,
+      lists:foreach(ForwardAndAck, Messages),
+      subscriber_loop(TesterPid);
+    Msg ->
+      ct:fail("unexpected message received by test subscriber.\n~p", [Msg])
+  end.
+
+%% receive_messages(none): returns `ok' when no forwarded message arrives
+%% within one second, `{unexpected_message, M}' otherwise.
+%% receive_messages(Set): consumes forwarded messages until every element of
+%% Set has been seen (`ok'). Returns `{unexpected_message, M}' for a message
+%% outside the set and `{still_waiting_for, Rest}' after ?TIMEOUT ms of
+%% silence.
+receive_messages(none) ->
+  receive
+    {_Partition, _K, _V} = M -> {unexpected_message, M}
+  after 1000 -> ok
+  end;
+receive_messages(ExpectedMessages) ->
+  case sets:is_empty(ExpectedMessages) of
+    true -> ok;
+    _ ->
+      receive
+        {_Partition, _K, _V} = M ->
+          IsExpected = sets:is_element(M, ExpectedMessages),
+          if
+            IsExpected ->
+              receive_messages(sets:del_element(M, ExpectedMessages));
+            true ->
+              {unexpected_message, M}
+          end
+      after ?TIMEOUT ->
+        {still_waiting_for, ExpectedMessages}
+      end
+  end.
+
+%% Random base64-encoded binary built from 8 random bytes.
+rand() ->
+  Bytes = crypto:strong_rand_bytes(8),
+  base64:encode(Bytes).
+
+%% One message per topic in a single transaction: nothing reaches the test
+%% process before commit, exactly the produced messages arrive after it,
+%% and the transaction process is gone once committed.
+t_simple_transaction(Config) when is_list(Config) ->
+  {ok, Tx} = brod:transaction(?config(client), <<"transaction-id">>, []),
+  ?assertMatch(true, is_process_alive(Tx)),
+
+  ProduceOne =
+    fun(Topic) ->
+        Partition = 0,
+        Key = rand(),
+        Value = rand(),
+        {ok, _Offset} = brod:txn_produce(Tx, Topic, Partition, Key, Value),
+        {Partition, Key, Value}
+    end,
+  Results = lists:map(ProduceOne, ?config(topics)),
+
+  ?assertMatch(ok, receive_messages(none)),
+  ?assertMatch(ok, brod:commit(Tx)),
+  ?assertMatch(false, is_process_alive(Tx)),
+  ?assertMatch(ok, receive_messages(sets:from_list(Results))),
+  ?assertMatch(ok, receive_messages(none)),
+  ok.
+
+%% Same as the simple case, but each topic gets one batch of ten messages
+%% through brod:txn_produce/4.
+t_batch_transaction(Config) when is_list(Config) ->
+  {ok, Tx} = brod:transaction(?config(client), <<"transaction-id">>, []),
+  ?assertMatch(true, is_process_alive(Tx)),
+
+  NewMessage =
+    fun(_N) ->
+        #{key => rand()
+        , value => rand()
+        , ts => kpro_lib:now_ts()}
+    end,
+  ProduceBatch =
+    fun(Topic) ->
+        Batch = lists:map(NewMessage, lists:seq(1, 10)),
+        Partition = 0,
+        {ok, _Offset} = brod:txn_produce(Tx, Topic, Partition, Batch),
+        lists:map(fun(#{key := Key
+                      , value := Value}) ->
+                      {Partition, Key, Value}
+                  end, Batch)
+    end,
+  Results = lists:flatten(lists:map(ProduceBatch, ?config(topics))),
+
+  ?assertMatch(ok, receive_messages(none)),
+  ?assertMatch(ok, brod:commit(Tx)),
+  ?assertMatch(false, is_process_alive(Tx)),
+  ?assertMatch(ok, receive_messages(sets:from_list(Results))),
+  ?assertMatch(ok, receive_messages(none)),
+  ok.
+
+%% Messages produced in a transaction that is aborted must never reach the
+%% test process; the transaction process is gone once aborted.
+t_abort_transaction(Config) when is_list(Config) ->
+  {ok, Tx} = brod:transaction(?config(client), <<"transaction-id">>, []),
+  ?assertMatch(true, is_process_alive(Tx)),
+
+  ProduceOne =
+    fun(Topic) ->
+        Partition = 0,
+        Key = rand(),
+        Value = rand(),
+        {ok, _Offset} = brod:txn_produce(Tx, Topic, Partition, Key, Value),
+        {Partition, Key, Value}
+    end,
+  _ = lists:map(ProduceOne, ?config(topics)),
+
+  ?assertMatch(ok, receive_messages(none)),
+  ?assertMatch(ok, brod:abort(Tx)),
+  ?assertMatch(false, is_process_alive(Tx)),
+  ?assertMatch(ok, receive_messages(none)),
+  ok.
+%% Two rounds of writes in one transaction; all must appear only after commit.
+t_multiple_writes_transaction(Config) when is_list(Config) ->
+  %% open the transaction on the client set up for this test case
+  {ok, Tx} = brod:transaction(?config(client), <<"transaction-id">>, []),
+  ?assertMatch(true, is_process_alive(Tx)),
+  %% first round: one random key/value per topic, partition 0
+  FirstWave = lists:map(fun(Topic) ->
+                            Partition = 0,
+                            Key = rand(),
+                            Value = rand(),
+                            {ok, _Offset} = brod:txn_produce(Tx, Topic, Partition, Key, Value),
+                            {Partition, Key, Value}
+                        end, ?config(topics)),
+  %% second round: same topics again, still inside the same transaction
+  SecondWave = lists:map(fun(Topic) ->
+                            Partition = 0,
+                            Key = rand(),
+                            Value = rand(),
+                            {ok, _Offset} = brod:txn_produce(Tx, Topic, Partition, Key, Value),
+                            {Partition, Key, Value}
+                        end, ?config(topics)),
+  %% everything produced in both rounds is expected after the commit
+  Results = lists:append(FirstWave, SecondWave),
+  %% nothing before commit, exactly Results after it, nothing more afterwards
+  ?assertMatch(ok, receive_messages(none)),
+  ?assertMatch(ok, brod:commit(Tx)),
+  ?assertMatch(false, is_process_alive(Tx)), %% transaction process has ended
+  ?assertMatch(ok, receive_messages(sets:from_list(Results))),
+  ?assertMatch(ok, receive_messages(none)),
+  ok.
+
diff --git a/test/brod_txn_processor_SUITE.erl b/test/brod_txn_processor_SUITE.erl
new file mode 100644
index 00000000..cc2d5e3d
--- /dev/null
+++ b/test/brod_txn_processor_SUITE.erl
@@ -0,0 +1,241 @@
+-module(brod_txn_processor_SUITE).
+%% Common Test callbacks
+-export([ init_per_suite/1
+        , end_per_suite/1
+        , init_per_testcase/2
+        , end_per_testcase/2
+        , all/0
+        , suite/0
+        ]).
+%% Test cases
+-export([ t_simple_test/1
+        , t_broken_test/1
+        ]).
+%% Callbacks of the group subscriber started by start_fetchers/2
+-export([ init/2
+        , handle_message/4
+        ]).
+
+-include_lib("stdlib/include/assert.hrl").
+
+-include("include/brod.hrl").
+
+-define(HOSTS, [{"localhost", 9092}]).
+%% Topic the processors read from, and the two topics they write to
+-define(INPUT_TOPIC, <<"brod_txn_subscriber_input">>).
+-define(OUTPUT_TOPIC_1, <<"brod_txn_subscriber_output_1">>).
+-define(OUTPUT_TOPIC_2, <<"brod_txn_subscriber_output_2">>).
+-define(GROUP_ID, <<"group_id_for_testing">>).
+-define(PROCESSOR_GROUP_ID, <<"processor_group_id_for_testing">>). %% used by start_broken_processor/1
+-define(TIMEOUT, 10000). %% milliseconds; used as receive timeout in receive_messages/1
+-define(config(Name), proplists:get_value(Name, Config)). %% reads the local variable Config
+
+%%%_* ct callbacks =============================================================
+%% Every test case is limited to 30 seconds.
+suite() -> [{timetrap, {seconds, 30}}].
+
+%% Starts brod and its dependencies once for the whole suite.
+init_per_suite(Config) ->
+  {ok, _Started} = application:ensure_all_started(brod),
+  Config.
+
+end_per_suite(_Config) ->
+  ok.
+
+%% Lets a test case supply its own setup through a `Case({init, Config})'
+%% clause; a function_clause error from that call falls back to
+%% init_client/2.
+init_per_testcase(Case, Config) ->
+  try
+    ?MODULE:Case({'init', Config})
+  catch
+    error : function_clause -> init_client(Case, Config)
+  end.
+
+%% Default setup: stops any client registered under the test case name,
+%% starts a fresh one against ?HOSTS and prepends `client' and
+%% `client_config' to the CT config.
+init_client(Case, Config) ->
+  ClientId = Case,
+  ClientCfg = client_config(),
+  %% the result of stopping a possibly absent client is ignored
+  brod:stop_client(ClientId),
+  ok = brod:start_client(?HOSTS, ClientId, ClientCfg),
+  [{client, ClientId}, {client_config, ClientCfg} | Config].
+
+%% Stops the client of the finished test case.
+end_per_testcase(_Case, Config) ->
+  ClientId = ?config(client),
+  brod:stop_client(ClientId),
+  Config.
+
+%% Every exported function whose name starts with "t_" is a test case.
+all() ->
+  IsTestCase = fun({Name, _Arity}) ->
+                   lists:prefix("t_", atom_to_list(Name))
+               end,
+  [Name || {Name, _Arity} <- lists:filter(IsTestCase, module_info(exports))].
+
+%% Client config derived from the KAFKA_VERSION environment variable:
+%% API version queries are switched off for versions starting with "0.9".
+client_config() ->
+  KafkaVersion = os:getenv("KAFKA_VERSION"),
+  case is_list(KafkaVersion) andalso lists:prefix("0.9", KafkaVersion) of
+    true  -> [{query_api_versions, false}];
+    false -> []
+  end.
+
+%% Random base64-encoded binary built from 8 random bytes.
+rand() ->
+  Encoded = base64:encode(crypto:strong_rand_bytes(8)),
+  iolist_to_binary([Encoded]).
+
+%% Produces ten messages with random keys and values to partition 0 of
+%% ?INPUT_TOPIC and returns the keys in production order.
+produce_messages(Client) ->
+  ok = brod:start_producer(Client, ?INPUT_TOPIC, []),
+  ProduceOne =
+    fun(_N) ->
+        Key = rand(),
+        Value = rand(),
+        {ok, _} = brod:produce_sync_offset(Client, ?INPUT_TOPIC, 0, Key, Value),
+        Key
+    end,
+  lists:map(ProduceOne, lists:seq(1, 10)).
+
+%% Consumes `{Topic, Key}' notifications until every element of
+%% ExpectedMessages has been seen, ignoring 'EXIT' messages. Returns `ok',
+%% or `{error, timeout}' when nothing arrives for ?TIMEOUT milliseconds.
+receive_messages(ExpectedMessages) ->
+  case sets:is_empty(ExpectedMessages) of
+    true ->
+      ok;
+    false ->
+      receive
+        {'EXIT', _From, _Reason} ->
+          receive_messages(ExpectedMessages);
+        {_Topic, _Key} = Seen ->
+          Remaining = sets:del_element(Seen, ExpectedMessages),
+          receive_messages(Remaining)
+      after ?TIMEOUT ->
+        {error, timeout}
+      end
+  end.
+
+%% With a working processor, every key produced to the input topic must be
+%% reported by the fetchers for both output topics.
+t_simple_test(Config) ->
+  Client = ?config(client),
+  {ok, Fetcher} = start_fetchers(self(), Client),
+  {ok, Processor} = start_processor(Client),
+
+  ?assertMatch(true, is_process_alive(Fetcher)),
+  ?assertMatch(true, is_process_alive(Processor)),
+
+  Keys = produce_messages(Client),
+  PerKey = fun(Key) ->
+               [{?OUTPUT_TOPIC_1, Key},
+                {?OUTPUT_TOPIC_2, Key}]
+           end,
+  ExpectedMessages = sets:from_list(lists:flatmap(PerKey, Keys)),
+
+  ?assertMatch(ok, receive_messages(ExpectedMessages)),
+
+  ?assertMatch(ok, gen_server:stop(Fetcher)),
+  ?assertMatch(ok, gen_server:stop(Processor)),
+  ok.
+
+%% With a processor whose callback always crashes, the expected output is
+%% never fully reported, so receive_messages/1 must time out.
+t_broken_test(Config) ->
+  Client = ?config(client),
+  {ok, Fetcher} = start_fetchers(self(), Client),
+  %% exits are trapped while the broken processor is started and fed
+  process_flag(trap_exit, true),
+  {ok, Processor} = start_broken_processor(Client),
+
+  ?assertMatch(true, is_process_alive(Fetcher)),
+
+  Keys = produce_messages(Client),
+  PerKey = fun(Key) ->
+               [{?OUTPUT_TOPIC_1, Key},
+                {?OUTPUT_TOPIC_2, Key}]
+           end,
+  ExpectedMessages = sets:from_list(lists:flatmap(PerKey, Keys)),
+  process_flag(trap_exit, false),
+
+  ?assertMatch({error, timeout}, receive_messages(ExpectedMessages)),
+
+  ?assertMatch(ok, gen_server:stop(Fetcher)),
+  ?assertMatch(ok, gen_server:stop(Processor)),
+  ok.
+
+%% Starts a processor on ?INPUT_TOPIC (group ?PROCESSOR_GROUP_ID) whose
+%% callback copies a message to both output topics and then fails on
+%% purpose: `false = is_process_alive(self())' can never match.
+start_broken_processor(Client) ->
+  CopyThenCrash =
+    fun(Transaction, #kafka_message_set{ topic = _Topic
+                                       , partition = Partition
+                                       , messages = Messages
+                                       } = _MessageSet) ->
+        lists:foreach(
+          fun(#kafka_message{ key = Key
+                            , value = Value
+                            }) ->
+              Batch = [#{ key => Key
+                        , value => Value
+                        }],
+              brod:txn_produce(Transaction, ?OUTPUT_TOPIC_1, Partition, Batch),
+              brod:txn_produce(Transaction, ?OUTPUT_TOPIC_2, Partition, Batch),
+              %% this should break a few things .)
+              false = is_process_alive(self())
+          end, Messages),
+        ok
+    end,
+  Options = #{ topics => [?INPUT_TOPIC]
+             , group_id => ?PROCESSOR_GROUP_ID},
+  brod:txn_do(CopyThenCrash, Client, Options).
+
+%% Starts a processor on ?INPUT_TOPIC (group ?GROUP_ID) whose callback
+%% copies every message of a set, through the given transaction, to the same
+%% partition of both output topics.
+start_processor(Client) ->
+  CopyToOutputs =
+    fun(Transaction, #kafka_message_set{ topic = _Topic
+                                       , partition = Partition
+                                       , messages = Messages
+                                       } = _MessageSet) ->
+        lists:foreach(
+          fun(#kafka_message{ key = Key
+                            , value = Value}) ->
+              Batch = [#{ key => Key
+                        , value => Value
+                        }],
+              brod:txn_produce(Transaction, ?OUTPUT_TOPIC_1, Partition, Batch),
+              brod:txn_produce(Transaction, ?OUTPUT_TOPIC_2, Partition, Batch)
+          end, Messages),
+        ok
+    end,
+  Options = #{ topics => [?INPUT_TOPIC]
+             , group_id => ?GROUP_ID},
+  brod:txn_do(CopyToOutputs, Client, Options).
+
+
+%% Starts a linked group subscriber on both output topics that reads with
+%% isolation level read_committed and reports to ObserverPid through this
+%% module's init/2 and handle_message/4 callbacks.
+start_fetchers(ObserverPid, Client) ->
+  Topics = [?OUTPUT_TOPIC_1, ?OUTPUT_TOPIC_2],
+  GroupConfig = [],
+  ConsumerConfig = [{isolation_level, read_committed}],
+  InitArg = #{ client => Client
+             , observer_pid => ObserverPid},
+  brod:start_link_group_subscriber(Client,
+                                   ?GROUP_ID,
+                                   Topics,
+                                   GroupConfig,
+                                   ConsumerConfig,
+                                   message_set,
+                                   ?MODULE,
+                                   InitArg).
+
+
+%========== group subscriber callbacks
+%% Builds the callback state from the group id and the init argument map.
+init(GroupId, #{ client := Client
+               , observer_pid := ObserverPid}) ->
+  State = #{ group_id => GroupId
+           , client => Client
+           , observer_pid => ObserverPid},
+  {ok, State}.
+
+%% Reports the key of every message in the set to the observer as
+%% `{Topic, Key}' and acknowledges the set.
+handle_message(Topic,
+               Partition,
+               #kafka_message_set{ topic = Topic
+                                 , partition = Partition
+                                 , messages = Messages
+                                 },
+               #{observer_pid := ObserverPid} = State) ->
+  Notify = fun(#kafka_message{key = Key}) ->
+               ObserverPid ! {Topic, Key}
+           end,
+  lists:foreach(Notify, Messages),
+  {ok, ack, State}.