Skip to content

Commit

Permalink
Merge pull request #580 from salemove/consumer_handle_info_callback_p…
Browse files Browse the repository at this point in the history
…roposal

Proposal: Forward unhandled messages to optional subscriber handle_info callback
  • Loading branch information
zmstone authored Jun 9, 2024
2 parents c34fa87 + f32caaf commit aba0511
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 4 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

- 3.19.0
- Forward unhandled messages in topic/group consumer processes to handle_info/2 callbacks
in order to support arbitrary message passing [PR#580](https://github.com/kafka4beam/brod/pull/580)

- 3.18.0
- Add transactional APIs. [PR#549](https://github.com/kafka4beam/brod/pull/549)
- Fix unnecessary group coordinator restart due to `hb_timeout` exception. [PR#578](https://github.com/kafka4beam/brod/pull/578)
Expand Down
11 changes: 10 additions & 1 deletion src/brod_group_subscriber_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
-include("brod_int.hrl").

%% brod_topic_subscriber callbacks
-export([init/2, handle_message/3, terminate/2]).
-export([init/2, handle_message/3, handle_info/2, terminate/2]).

-type start_options() ::
#{ group_id := brod:group_id()
Expand Down Expand Up @@ -91,6 +91,15 @@ handle_message(_Partition, Msg, State) ->
{ok, NewState}
end.

handle_info(Info, #state{cb_module = CbModule , cb_state = CbState} = State) ->
%% Any unhandled messages are forwarded to the callback module to
%% support arbitrary message-passing.
%% Only the {noreply, State} return value is supported.
case brod_utils:optional_callback(CbModule, handle_info, [Info, CbState], {noreply, CbState}) of
{noreply, NewCbState} ->
{noreply, State#state{cb_state = NewCbState}}
end.

terminate(Reason, #state{cb_module = CbModule, cb_state = State}) ->
brod_utils:optional_callback(CbModule, terminate, [Reason, State], ok).

Expand Down
17 changes: 14 additions & 3 deletions src/brod_topic_subscriber.erl
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,12 @@
%% This callback is called before stopping the subscriber
-callback terminate(_Reason, cb_state()) -> _.

-optional_callbacks([terminate/2]).
%% This callback is called when the subscriber receives a message unrelated to
%% the subscription.
%% The callback must return `{noreply, NewCallbackState}'.
-callback handle_info(_Msg, cb_state()) -> {noreply, cb_state()}.

-optional_callbacks([terminate/2, handle_info/2]).

%%%_* Types and macros =========================================================

Expand Down Expand Up @@ -357,8 +362,14 @@ handle_info({'DOWN', _Mref, process, Pid, Reason},
%% not a consumer pid
{noreply, State}
end;
handle_info(_Info, State) ->
{noreply, State}.
handle_info(Info, #state{cb_module = CbModule, cb_state = CbState} = State) ->
%% Any unhandled messages are forwarded to the callback module to
%% support arbitrary message-passing.
%% Only the {noreply, State} return value is supported.
case brod_utils:optional_callback(CbModule, handle_info, [Info, CbState], {noreply, CbState}) of
{noreply, NewCbState} ->
{noreply, State#state{cb_state = NewCbState}}
end.

%% @private
handle_call(Call, _From, State) ->
Expand Down
55 changes: 55 additions & 0 deletions test/brod_topic_subscriber_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
-export([ init/2
, terminate/2
, handle_message/3
, handle_info/2
]).

%% Test cases
Expand All @@ -40,6 +41,7 @@
, t_callback_crash/1
, t_begin_offset/1
, t_cb_fun/1
, t_consumer_ack_via_message_passing/1
]).

-include("brod_test_setup.hrl").
Expand Down Expand Up @@ -107,6 +109,21 @@ handle_message(Partition, Message, #state{ is_async_ack = IsAsyncAck
false -> {ok, ack, State}
end.

handle_info({ack_offset, Partition, Offset} = Msg, #state{ counter = Counter
, worker_id = Ref
} = State0) ->
%% Participate in state continuity checks
?tp(topic_subscriber_seen_info,
#{ partition => Partition
, offset => Offset
, msg => Msg
, state => Counter
, worker_id => Ref
}),
State = State0#state{counter = Counter + 1},
ok = brod_topic_subscriber:ack(self(), Partition, Offset),
{noreply, State}.

terminate(Reason, #state{worker_id = Ref, counter = Counter}) ->
?tp(topic_subscriber_terminate,
#{ worker_id => Ref
Expand Down Expand Up @@ -184,6 +201,44 @@ t_async_acks(Config) when is_list(Config) ->
check_init_terminate(Trace)
end).

t_consumer_ack_via_message_passing(Config) when is_list(Config) ->
%% Process messages one by one with no prefetch
ConsumerConfig = [ {prefetch_count, 0}
, {prefetch_bytes, 0}
, {sleep_timeout, 0}
, {max_bytes, 0}
],
Partition = 0,
SendFun =
fun(I) ->
produce({?topic, Partition}, <<I>>)
end,
?check_trace(
%% Run stage:
begin
O0 = SendFun(0),
%% Send two messages
Offset0 = SendFun(1),
_Offset1 = SendFun(2),
InitArgs = {_IsAsyncAck = true,
_ConsumerOffsets = [{0, O0}]},
{ok, SubscriberPid} =
brod:start_link_topic_subscriber(?CLIENT_ID, ?topic, ConsumerConfig,
?MODULE, InitArgs),
{ok, _} = wait_message(<<1>>),
%% ack_offset allows consumer to proceed to message 2
SubscriberPid ! {ack_offset, 0, Offset0},
{ok, _} = wait_message(<<2>>),
ok = brod_topic_subscriber:stop(SubscriberPid),
_Expected = [<<1>>, <<2>>]
end,
%% Check stage:
fun(Expected, Trace) ->
check_received_messages(Expected, Trace),
check_state_continuity(Trace),
check_init_terminate(Trace)
end).

t_begin_offset(Config) when is_list(Config) ->
ConsumerConfig = [ {prefetch_count, 100}
, {prefetch_bytes, 0} %% as discard
Expand Down

0 comments on commit aba0511

Please sign in to comment.