Skip to content

Commit

Permalink
Merge pull request #549 from thefunctionalgarden/feature/tx
Browse files Browse the repository at this point in the history
feature: support for transactions
  • Loading branch information
zmstone authored Mar 6, 2024
2 parents 9781a71 + 351a8db commit 6dd0150
Show file tree
Hide file tree
Showing 8 changed files with 1,453 additions and 2 deletions.
23 changes: 21 additions & 2 deletions scripts/setup-test-env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,19 @@ docker ps > /dev/null || {
exit 1
}

function docker_compose {
if command -v docker-compose ; then
docker-compose $@
else
docker compose version &> /dev/null
if [ $? -eq 0 ]; then
docker compose $@
else
exit "couldn't find docker compose, needed for testing"
fi
fi
}

VERSION=${KAFKA_VERSION:-1.1}
if [ -z $VERSION ]; then VERSION=$1; fi

Expand All @@ -26,8 +39,9 @@ export KAFKA_VERSION=$VERSION

TD="$(cd "$(dirname "$0")" && pwd)"

docker-compose -f $TD/docker-compose.yml down || true
docker-compose -f $TD/docker-compose.yml up -d
docker_compose -f $TD/docker-compose.yml down || true
docker_compose -f $TD/docker-compose.yml up -d


n=0
while [ "$(docker exec kafka-1 bash -c '/opt/kafka/bin/kafka-topics.sh --zookeeper localhost --list')" != '' ]; do
Expand Down Expand Up @@ -57,6 +71,11 @@ create_topic "brod-group-coordinator-1" 3 2
create_topic "brod-demo-topic-subscriber" 3 2
create_topic "brod-demo-group-subscriber-koc" 3 2
create_topic "brod-demo-group-subscriber-loc" 3 2
create_topic "brod_txn_SUITE_1" 3 2
create_topic "brod_txn_SUITE_2" 3 2
create_topic "brod_txn_subscriber_input" 3 2
create_topic "brod_txn_subscriber_output_1" 3 2
create_topic "brod_txn_subscriber_output_2" 3 2
create_topic "brod_compression_SUITE"
create_topic "lz4-test"
create_topic "test-topic"
Expand Down
70 changes: 70 additions & 0 deletions src/brod.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@
, sync_produce_request_offset/2
]).

%% Transactions API
-export([ transaction/3
, txn_do/3
, txn_produce/5
, txn_produce/4
, txn_add_offsets/3
, commit/1
, abort/1
]).

%% Simple Consumer API
-export([ consume_ack/2
, consume_ack/4
Expand Down Expand Up @@ -155,6 +165,7 @@
, message_set/0
, offset/0
, offset_time/0
, offsets_to_commit/0
, partition/0
, partition_assignment/0
, partition_fun/0
Expand All @@ -168,6 +179,9 @@
, topic/0
, topic_partition/0
, value/0
, transactional_id/0
, transaction/0
, transaction_config/0
]).

-include("brod_int.hrl").
Expand All @@ -183,6 +197,7 @@
-type partition() :: kpro:partition().
-type topic_partition() :: {topic(), partition()}.
-type offset() :: kpro:offset(). %% Physical offset (an integer)
-type offsets_to_commit() :: kpro:offsets_to_commit().
-type key() :: undefined %% no key, transformed to <<>>
| binary().
-type value() :: undefined %% no value, transformed to <<>>
Expand All @@ -193,6 +208,12 @@
| kpro:msg_input() %% one magic v2 message
| kpro:batch_input(). %% maybe nested batch

-type transactional_id() :: brod_transaction:transactional_id().
-type transaction() :: brod_transaction:transaction().
-type transaction_config() :: brod_transaction:transaction_config().
-type txn_function() :: brod_transaction_processor:process_function().
-type txn_do_options() :: brod_transaction_processor:do_options().

-type msg_input() :: kpro:msg_input().
-type batch_input() :: [msg_input()].

Expand Down Expand Up @@ -1347,6 +1368,55 @@ fetch_committed_offsets(Client, GroupId) ->
main(X) -> brod_cli:main(X).
-endif.

%% @doc Start a new transaction, `TxId' will be the id of the transaction
%% @equiv brod_transaction:start_link/3
-spec transaction(client(), transactional_id(), transaction_config()) -> {ok, transaction()}.
transaction(Client, TxnId, Config) ->
brod_transaction:new(Client, TxnId, Config).

%% @doc Execute the function in the context of a fetch-produce cycle
%% with access to an open transaction.
%% @see brod_transaction_processor:do/3
-spec txn_do(txn_function(), client(), txn_do_options()) -> {ok, pid()}
| {error, any()}.
txn_do(ProcessFun, Client, Options) ->
brod_transaction_processor:do(ProcessFun, Client, Options).

%% @doc Produce the message (key and value) to the indicated topic-partition
%% synchronously.
%% @see brod_transaction:produce/5
-spec txn_produce(transaction(), topic(), partition(), key(), value()) ->
{ok, offset()} | {error, any()}.
txn_produce(Transaction, Topic, Partition, Key, Value) ->
brod_transaction:produce(Transaction, Topic, Partition, Key, Value).

%% @doc Produce the batch of messages to the indicated topic-partition
%% synchronously.
%% @see brod_transaction:produce/5
-spec txn_produce(transaction(), topic(), partition(), batch_input()) ->
{ok, offset()} | {error, any()}.
txn_produce(Transaction, Topic, Partition, Batch) ->
brod_transaction:produce(Transaction, Topic, Partition, Batch).

%% @doc Add the offset consumed by a group to the transaction.
%% @see brod_transaction:add_offsets/3
-spec txn_add_offsets(transaction(), group_id(), offsets_to_commit()) ->
ok | {error, any()}.
txn_add_offsets(Transaction, ConsumerGroup, Offsets) ->
brod_transaction:add_offsets(Transaction, ConsumerGroup, Offsets).

%% @doc Commit the transaction
%% @see brod_transaction:commit/1
-spec commit(transaction()) -> ok | {error, any()}.
commit(Transaction) ->
brod_transaction:commit(Transaction).

%% @doc Abort the transaction
%% @see brod_transaction:abort/1
-spec abort(transaction()) -> ok | {error, any()}.
abort(Transaction) ->
brod_transaction:abort(Transaction).

%%%_* Emacs ====================================================================
%%% Local Variables:
%%% allout-layout: t
Expand Down
26 changes: 26 additions & 0 deletions src/brod_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
-export([ get_consumer/3
, get_connection/3
, get_group_coordinator/2
, get_transactional_coordinator/2
, get_leader_connection/3
, get_metadata/2
, get_metadata_safe/2
Expand Down Expand Up @@ -92,6 +93,7 @@
-type partition() :: brod:partition().
-type config() :: proplists:proplist().
-type group_id() :: brod:group_id().
-type transactional_id() :: brod:transactional_id().

-type partition_worker_key() :: ?PRODUCER_KEY(topic(), partition())
| ?CONSUMER_KEY(topic(), partition()).
Expand Down Expand Up @@ -281,6 +283,14 @@ get_partitions_count_safe(Client, Topic) ->
get_group_coordinator(Client, GroupId) ->
safe_gen_call(Client, {get_group_coordinator, GroupId}, infinity).

%% @doc Get broker endpoint and connection config for
%% connecting a transactional coordinator.
-spec get_transactional_coordinator(client(), transactional_id()) ->
{ok, {endpoint(), brod:conn_config()}} | {error, any()}.
get_transactional_coordinator(Client, TransactionId) ->
safe_gen_call(Client, {get_transactional_coordinator, TransactionId}, infinity).


%% @doc Register self() as a partition producer.
%%
%% The pid is registered in an ETS table, then the callers
Expand Down Expand Up @@ -384,6 +394,9 @@ handle_call({get_connection, Host, Port}, _From, State) ->
handle_call({get_group_coordinator, GroupId}, _From, State) ->
{Result, NewState} = do_get_group_coordinator(State, GroupId),
{reply, Result, NewState};
handle_call({get_transactional_coordinator, TransactionId}, _From, State) ->
{Result, NewState} = do_get_transactional_coordinator(State, TransactionId),
{reply, Result, NewState};
handle_call({start_producer, TopicName, ProducerConfig}, _From, State) ->
{Reply, NewState} = do_start_producer(TopicName, ProducerConfig, State),
{reply, Reply, NewState};
Expand Down Expand Up @@ -650,6 +663,19 @@ do_get_group_coordinator(State0, GroupId) ->
{{error, Reason}, State}
end.

-spec do_get_transactional_coordinator(state(), transactional_id()) ->
{Result, state()} when Result :: {ok, connection()} | {error, any()}.
do_get_transactional_coordinator(State0, TransactionId) ->
State = ensure_metadata_connection(State0),
MetaConn = get_metadata_connection(State),
Timeout = timeout(State),
case kpro:discover_coordinator(MetaConn, txn, TransactionId, Timeout) of
{ok, Endpoint} ->
{{ok, {Endpoint, conn_config(State)}}, State};
{error, Reason} ->
{{error, Reason}, State}
end.

timeout(#state{config = Config}) ->
timeout(Config);
timeout(Config) ->
Expand Down
Loading

0 comments on commit 6dd0150

Please sign in to comment.