Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix folding over transactions #589

Merged
merged 1 commit into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 56 additions & 18 deletions src/brod_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
, epoch_ms/0
, fetch/4
, fetch/5
, fetch_one_batch/4
, fold/8
, fetch_committed_offsets/3
, fetch_committed_offsets/4
Expand Down Expand Up @@ -66,6 +67,8 @@
%% Builds a fetch request from a begin offset and a max-bytes limit
%% (do_fetch/4 calls it as ReqFun(Offset, MaxBytes)).
-type req_fun() :: fun((offset(), kpro:count()) -> kpro:req()).
%% Fetches from the given offset; on success yields the stable offset and the
%% fetched messages, which is the shape returned by fetch/4.
-type fetch_fun() :: fun((offset()) -> {ok, {offset(), [brod:message()]}} |
{error, any()}).
%% Like fetch_fun(), but the success tuple has three elements: the stable
%% offset, the offset to start the next fetch from, and the fetched messages
%% (the shape returned by fetch_one_batch/4).
-type fetch_fun2() :: fun((offset()) -> {ok, {offset(), offset(), [brod:message()]}} |
{error, any()}).
-type connection() :: kpro:connection().
-type conn_config() :: brod:conn_config().
-type topic() :: brod:topic().
Expand Down Expand Up @@ -331,7 +334,7 @@ fold(Client, Topic, Partition, Offset, Opts,
?BROD_FOLD_RET(Acc, Offset, {error, Reason})
end;
fold(Conn, Topic, Partition, Offset, Opts, Acc, Fun, Limits) ->
Fetch = make_fetch_fun(Conn, Topic, Partition, Opts),
Fetch = make_fetch_fun2(Conn, Topic, Partition, Opts),
Infinity = 1 bsl 64,
EndOffset = maps:get(reach_offset, Limits, Infinity),
CountLimit = maps:get(message_count, Limits, Infinity),
Expand All @@ -346,13 +349,21 @@ fold(Conn, Topic, Partition, Offset, Opts, Acc, Fun, Limits) ->
%% Build a fetch closure over fetch/4: given a begin offset it yields the
%% stable offset together with the fetched messages.
-spec make_fetch_fun(pid(), topic(), partition(), brod:fetch_opts()) ->
        fetch_fun().
make_fetch_fun(Conn, Topic, Partition, FetchOpts) ->
  Fetcher = fun fetch/4,
  make_fetch_fun(Conn, Topic, Partition, FetchOpts, Fetcher).

%% Build a fetch closure over fetch_one_batch/4: given a begin offset it
%% yields the stable offset, the next fetch offset and the fetched messages.
-spec make_fetch_fun2(pid(), topic(), partition(), brod:fetch_opts()) ->
        fetch_fun2().
make_fetch_fun2(Conn, Topic, Partition, FetchOpts) ->
  Fetcher = fun fetch_one_batch/4,
  make_fetch_fun(Conn, Topic, Partition, FetchOpts, Fetcher).

make_fetch_fun(Conn, Topic, Partition, FetchOpts, FetchFun) ->
WaitTime = maps:get(max_wait_time, FetchOpts, 1000),
MinBytes = maps:get(min_bytes, FetchOpts, 1),
MaxBytes = maps:get(max_bytes, FetchOpts, 1 bsl 20),
IsolationLevel = maps:get(isolation_level, FetchOpts, ?kpro_read_committed),
ReqFun = make_req_fun(Conn, Topic, Partition, WaitTime,
MinBytes, IsolationLevel),
fun(Offset) -> ?MODULE:fetch(Conn, ReqFun, Offset, MaxBytes) end.
fun(Offset) -> FetchFun(Conn, ReqFun, Offset, MaxBytes) end.

-spec make_part_fun(brod:partitioner()) -> brod:partition_fun().
make_part_fun(random) ->
Expand Down Expand Up @@ -445,12 +456,37 @@ do_fetch_committed_offsets(Conn, GroupId, Topics) when is_pid(Conn) ->
-spec fetch(connection(), req_fun(), offset(), kpro:count()) ->
{ok, {offset(), [brod:message()]}} | {error, any()}.
fetch(Conn, ReqFun, Offset, MaxBytes) ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean something like this:

fetch(Conn, ReqFun, Offset, MaxBytes, AllowEmptyBatch) ->
  case do_fetch(Conn, ReqFun, Offset, MaxBytes) of
    {ok, {StableOffset, NextOffset, []}} when not AllowEmptyBatch andalso NextOffset < StableOffset ->
      fetch(Conn, ReqFun, NextOffset, MaxBytes, AllowEmptyBatch);
    Other ->
      Other
end.

fetch(Conn, ReqFun, Offset, MaxBytes) ->
  case do_fetch(Conn, ReqFun, Offset, MaxBytes) of
    {ok, {StableOffset, _NextOffset, Msgs}} ->
      {ok, {StableOffset, Msgs}}; %% for backward compatibility
    Other ->
      Other
end.

so the caller will not have to deal with empty batches

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify, there already is brod_utils:fetch/5 that is also exported:

fetch(Hosts, Topic, Partition, Offset, Opts) when is_list(Hosts) ->
fetch({Hosts, ConnCfg}, Topic, Partition, Offset, Opts) ->
fetch(Client, Topic, Partition, Offset, Opts) when is_atom(Client) ->
fetch(Conn, Topic, Partition, Offset, Opts) ->

Under the hood, it uses make_fetch_fun, which uses fetch/4.

Is it okay to add a new fetch/5 with quite a different signature? I want to be sure, as I don't write erlang daily. Also, do you think AllowEmptyBatch should be replaced with a list of options instead of a boolean flag? I'm okay with doing it either way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, sorry for missing that.
Maybe fetch_one_batch?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zmstone I got distracted, but finally addressed your comments

case do_fetch(Conn, ReqFun, Offset, MaxBytes) of
{ok, {StableOffset, _NextOffset, Msgs}} ->
{ok, {StableOffset, Msgs}}; %% for backward compatibility
Other ->
Other
end.

%% @doc Fetch one batch of messages starting at the given offset.
%% If the given MaxBytes is not enough to fetch a single message, it is
%% expanded so that exactly one message can be fetched.
%% Unlike fetch/4, which may return an empty batch even though there can be
%% more messages in the topic, this function keeps fetching from the next
%% offset and only returns an empty batch once the stable offset is reached.
%% A successful result holds the stable offset, the offset to continue
%% fetching from, and the messages.
-spec fetch_one_batch(connection(), req_fun(), offset(), kpro:count()) ->
        {ok, {offset(), offset(), [brod:message()]}} | {error, any()}.
fetch_one_batch(Conn, ReqFun, Offset, MaxBytes) ->
  Result = do_fetch(Conn, ReqFun, Offset, MaxBytes),
  case Result of
    {ok, {StableOffset, ResumeOffset, []}} when ResumeOffset < StableOffset ->
      %% empty batch, but the stable offset is still ahead: fetch again
      fetch_one_batch(Conn, ReqFun, ResumeOffset, MaxBytes);
    _ ->
      %% non-empty batch, end of stable data, or an error: hand it back as is
      Result
  end.

-spec do_fetch(connection(), req_fun(), offset(), kpro:count()) ->
{ok, {offset(), offset(), [brod:message()]}} | {error, any()}.
do_fetch(Conn, ReqFun, Offset, MaxBytes) ->
Request = ReqFun(Offset, MaxBytes),
case request_sync(Conn, Request, infinity) of
{ok, #{error_code := ErrorCode}} when ?IS_ERROR(ErrorCode) ->
{error, ErrorCode};
{ok, #{batches := ?incomplete_batch(Size)}} ->
fetch(Conn, ReqFun, Offset, Size);
do_fetch(Conn, ReqFun, Offset, Size);
{ok, #{header := Header, batches := Batches}} ->
StableOffset = get_stable_offset(Header),
{NewBeginOffset0, Msgs} = flatten_batches(Offset, Header, Batches),
Expand All @@ -472,9 +508,9 @@ fetch(Conn, ReqFun, Offset, MaxBytes) ->
%% we can only bump begin_offset with +1 and try again.
NewBeginOffset0 + 1
end,
fetch(Conn, ReqFun, NewBeginOffset, MaxBytes);
do_fetch(Conn, ReqFun, NewBeginOffset, MaxBytes);
false ->
{ok, {StableOffset, Msgs}}
{ok, {StableOffset, NewBeginOffset0, Msgs}}
end;
{error, Reason} ->
{error, Reason}
Expand Down Expand Up @@ -636,32 +672,34 @@ do_fold(Spawn, {Pid, Mref}, Offset, Acc, Fun, End, Count) ->

handle_fetch_rsp(_Spawn, {error, Reason}, Offset, Acc, _Fun, _, _) ->
?BROD_FOLD_RET(Acc, Offset, {fetch_failure, Reason});
handle_fetch_rsp(_Spawn, {ok, {StableOffset, []}}, Offset, Acc, _Fun,
handle_fetch_rsp(_Spawn, {ok, {StableOffset, _NextFetchOffset, []}}, Offset, Acc, _Fun,
_End, _Count) when Offset >= StableOffset ->
?BROD_FOLD_RET(Acc, Offset, reached_end_of_partition);
handle_fetch_rsp(Spawn, {ok, {_StableOffset, Msgs}}, Offset, Acc, Fun,
handle_fetch_rsp(Spawn, {ok, {_StableOffset, NextFetchOffset, Msgs}}, Offset, Acc, Fun,
End, Count) ->
#kafka_message{offset = LastOffset} = lists:last(Msgs),
%% start fetching the next batch if not stopping at current
Fetcher = case LastOffset < End andalso length(Msgs) < Count of
true -> Spawn(LastOffset + 1);
Fetcher = case NextFetchOffset =< End andalso length(Msgs) < Count of
true -> Spawn(NextFetchOffset);
false -> undefined
end,
do_acc(Spawn, Fetcher, Offset, Acc, Fun, Msgs, End, Count).
do_acc(Spawn, Fetcher, NextFetchOffset, Offset, Acc, Fun, Msgs, End, Count).

do_acc(_Spawn, Fetcher, Offset, Acc, _Fun, _, _End, 0) ->
do_acc(_Spawn, Fetcher, _NextFetchOffset, Offset, Acc, _Fun, _, _End, 0) ->
undefined = Fetcher, %% assert
?BROD_FOLD_RET(Acc, Offset, reached_message_count_limit);
do_acc(_Spawn, Fetcher, Offset, Acc, _Fun, _, End, _Count) when Offset > End ->
do_acc(_Spawn, Fetcher, _NextFetchOffset, Offset, Acc, _Fun, _, End, _Count) when Offset > End ->
undefined = Fetcher, %% assert
?BROD_FOLD_RET(Acc, Offset, reached_target_offset);
do_acc(Spawn, Fetcher, Offset, Acc, Fun, [], End, Count) ->
do_fold(Spawn, Fetcher, Offset, Acc, Fun, End, Count);
do_acc(Spawn, Fetcher, Offset, Acc, Fun, [Msg | Rest], End, Count) ->
do_acc(_Spawn, Fetcher, NextFetchOffset, _Offset, Acc, _Fun, [], End, _Count)
when NextFetchOffset > End ->
undefined = Fetcher, %% assert
?BROD_FOLD_RET(Acc, NextFetchOffset, reached_target_offset);
do_acc(Spawn, Fetcher, NextFetchOffset, _Offset, Acc, Fun, [], End, Count) ->
do_fold(Spawn, Fetcher, NextFetchOffset, Acc, Fun, End, Count);
do_acc(Spawn, Fetcher, NextFetchOffset, Offset, Acc, Fun, [Msg | Rest], End, Count) ->
try Fun(Msg, Acc) of
{ok, NewAcc} ->
NextOffset = Msg#kafka_message.offset + 1,
do_acc(Spawn, Fetcher, NextOffset, NewAcc, Fun, Rest, End, Count - 1);
do_acc(Spawn, Fetcher, NextFetchOffset, NextOffset, NewAcc, Fun, Rest, End, Count - 1);
{error, Reason} ->
ok = kill_fetcher(Fetcher),
?BROD_FOLD_RET(Acc, Offset, Reason)
Expand Down
21 changes: 21 additions & 0 deletions test/brod_consumer_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
, t_fetch_aborted_from_the_middle/1
, t_direct_fetch/1
, t_fold/1
, t_fold_transactions/1
, t_direct_fetch_with_small_max_bytes/1
, t_direct_fetch_expand_max_bytes/1
, t_resolve_offset/1
Expand Down Expand Up @@ -422,6 +423,26 @@ t_fold(Config) when is_list(Config) ->
0, ErrorFoldF, #{})),
ok.

%% Produce a two-message batch inside a committed transaction, then fold over
%% the partition from the produced offset with a tiny max_bytes. The fold is
%% expected to collect both messages and stop at the end of the partition.
t_fold_transactions(kafka_version_match) ->
  has_txn();
t_fold_transactions(Config) when is_list(Config) ->
  Client = ?config(client),
  Topic = ?TOPIC,
  Partition = 0,
  Msgs = [#{value => <<"one">>}, #{value => <<"two">>}],
  {ok, Txn} = brod:transaction(Client, <<"some_transaction">>, []),
  {ok, BaseOffset} = brod:txn_produce(Txn, Topic, Partition, Msgs),
  ok = brod:commit(Txn),
  %% accumulate message values, newest first
  Collect = fun(#kafka_message{value = V}, Acc) -> {ok, [V | Acc]} end,
  FetchOpts = #{max_bytes => 1},
  %% NOTE(review): the extra +1 presumably accounts for the transaction's
  %% commit marker occupying one offset -- confirm
  ExpectedOffset = BaseOffset + length(Msgs) + 1,
  ?assertMatch({[_, _], ExpectedOffset, reached_end_of_partition},
               brod:fold(Client, Topic, Partition, BaseOffset, FetchOpts,
                         [], Collect, #{})),
  ok.

%% This test case does not work with Kafka 0.9, not sure about 0.10 and 0.11
%% since all 0.x versions are old enough, we only try to verify this against
%% 1.x or newer
Expand Down
Loading