diff --git a/CHANGELOG.md b/CHANGELOG.md index 1feefd57..820e4874 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +- 3.19.0 + - Forward unhandled messages in topic/group consumer processes to handle_info/2 callbacks + in order to support arbitrary message passing [PR#580](https://github.com/kafka4beam/brod/pull/580) + - 3.18.0 - Add transactional APIs. [PR#549](https://github.com/kafka4beam/brod/pull/549) - Fix unnecessary group coordinator restart due to `hb_timeout` exception. [PR#578](https://github.com/kafka4beam/brod/pull/578) diff --git a/src/brod_group_subscriber_worker.erl b/src/brod_group_subscriber_worker.erl index 4f0c40ae..7dd69f9f 100644 --- a/src/brod_group_subscriber_worker.erl +++ b/src/brod_group_subscriber_worker.erl @@ -22,7 +22,7 @@ -include("brod_int.hrl"). %% brod_topic_subscriber callbacks --export([init/2, handle_message/3, terminate/2]). +-export([init/2, handle_message/3, handle_info/2, terminate/2]). -type start_options() :: #{ group_id := brod:group_id() @@ -91,6 +91,15 @@ handle_message(_Partition, Msg, State) -> {ok, NewState} end. +handle_info(Info, #state{cb_module = CbModule , cb_state = CbState} = State) -> + %% Any unhandled messages are forwarded to the callback module to + %% support arbitrary message-passing. + %% Only the {noreply, State} return value is supported. + case brod_utils:optional_callback(CbModule, handle_info, [Info, CbState], {noreply, CbState}) of + {noreply, NewCbState} -> + {noreply, State#state{cb_state = NewCbState}} + end. + terminate(Reason, #state{cb_module = CbModule, cb_state = State}) -> brod_utils:optional_callback(CbModule, terminate, [Reason, State], ok). diff --git a/src/brod_topic_subscriber.erl b/src/brod_topic_subscriber.erl index 08f6512d..9b8add98 100644 --- a/src/brod_topic_subscriber.erl +++ b/src/brod_topic_subscriber.erl @@ -108,7 +108,12 @@ %% This callback is called before stopping the subscriber -callback terminate(_Reason, cb_state()) -> _. --optional_callbacks([terminate/2]). +%% This callback is called when the subscriber receives a message unrelated to +%% the subscription. +%% The callback must return `{noreply, NewCallbackState}'. +-callback handle_info(_Msg, cb_state()) -> {noreply, cb_state()}. + +-optional_callbacks([terminate/2, handle_info/2]). %%%_* Types and macros ========================================================= @@ -357,8 +362,14 @@ handle_info({'DOWN', _Mref, process, Pid, Reason}, %% not a consumer pid {noreply, State} end; -handle_info(_Info, State) -> - {noreply, State}. +handle_info(Info, #state{cb_module = CbModule, cb_state = CbState} = State) -> + %% Any unhandled messages are forwarded to the callback module to + %% support arbitrary message-passing. + %% Only the {noreply, State} return value is supported. + case brod_utils:optional_callback(CbModule, handle_info, [Info, CbState], {noreply, CbState}) of + {noreply, NewCbState} -> + {noreply, State#state{cb_state = NewCbState}} + end. %% @private handle_call(Call, _From, State) -> diff --git a/test/brod_topic_subscriber_SUITE.erl b/test/brod_topic_subscriber_SUITE.erl index 9c022a9d..49f72e91 100644 --- a/test/brod_topic_subscriber_SUITE.erl +++ b/test/brod_topic_subscriber_SUITE.erl @@ -30,6 +30,7 @@ -export([ init/2 , terminate/2 , handle_message/3 + , handle_info/2 ]). %% Test cases @@ -40,6 +41,7 @@ , t_callback_crash/1 , t_begin_offset/1 , t_cb_fun/1 + , t_consumer_ack_via_message_passing/1 ]). -include("brod_test_setup.hrl"). @@ -107,6 +109,21 @@ handle_message(Partition, Message, #state{ is_async_ack = IsAsyncAck false -> {ok, ack, State} end. +handle_info({ack_offset, Partition, Offset} = Msg, #state{ counter = Counter + , worker_id = Ref + } = State0) -> + %% Participate in state continuity checks + ?tp(topic_subscriber_seen_info, + #{ partition => Partition + , offset => Offset + , msg => Msg + , state => Counter + , worker_id => Ref + }), + State = State0#state{counter = Counter + 1}, + ok = brod_topic_subscriber:ack(self(), Partition, Offset), + {noreply, State}. + terminate(Reason, #state{worker_id = Ref, counter = Counter}) -> ?tp(topic_subscriber_terminate, #{ worker_id => Ref @@ -184,6 +201,44 @@ t_async_acks(Config) when is_list(Config) -> check_init_terminate(Trace) end). +t_consumer_ack_via_message_passing(Config) when is_list(Config) -> + %% Process messages one by one with no prefetch + ConsumerConfig = [ {prefetch_count, 0} + , {prefetch_bytes, 0} + , {sleep_timeout, 0} + , {max_bytes, 0} + ], + Partition = 0, + SendFun = + fun(I) -> + produce({?topic, Partition}, <>) + end, + ?check_trace( + %% Run stage: + begin + O0 = SendFun(0), + %% Send two messages + Offset0 = SendFun(1), + _Offset1 = SendFun(2), + InitArgs = {_IsAsyncAck = true, + _ConsumerOffsets = [{0, O0}]}, + {ok, SubscriberPid} = + brod:start_link_topic_subscriber(?CLIENT_ID, ?topic, ConsumerConfig, + ?MODULE, InitArgs), + {ok, _} = wait_message(<<1>>), + %% ack_offset allows consumer to proceed to message 2 + SubscriberPid ! {ack_offset, 0, Offset0}, + {ok, _} = wait_message(<<2>>), + ok = brod_topic_subscriber:stop(SubscriberPid), + _Expected = [<<1>>, <<2>>] + end, + %% Check stage: + fun(Expected, Trace) -> + check_received_messages(Expected, Trace), + check_state_continuity(Trace), + check_init_terminate(Trace) + end). + t_begin_offset(Config) when is_list(Config) -> ConsumerConfig = [ {prefetch_count, 100} , {prefetch_bytes, 0} %% as discard