From 6ee6c859125da9d91f53f4502ab887ca8fd14937 Mon Sep 17 00:00:00 2001 From: zmstone Date: Sat, 21 Sep 2024 20:57:08 +0200 Subject: [PATCH] feat: optimize fetch latency Introduced `share_leader_conn` consumer config option. Set to `true' to consume fewer TCP connections towards Kafka, but may lead to higher fetch latency. This is because Kafka can only accumulate messages for the oldest fetch request, later requests behind it may get blocked until `max_wait_time' expires for the oldest one --- CHANGELOG.md | 5 ++ src/brod.erl | 1 + src/brod_client.erl | 11 +++- src/brod_consumer.erl | 104 +++++++++++++++++++++++++---------- test/brod_consumer_SUITE.erl | 51 ++++++++++++++--- 5 files changed, 134 insertions(+), 38 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index de7f7056..efe60210 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +- 4.2.0 + - Optimize consumer fetch latency. + Introduced the `share_leader_conn` consumer configuration option (default: `false`). + This setting allows users to opt for the previous behavior if preferred (set to `true`). + - 4.1.1 - Upgrade `kafka_protocol` from version 4.1.5 to 4.1.9. diff --git a/src/brod.erl b/src/brod.erl index ee866a5c..d7dbedfb 100644 --- a/src/brod.erl +++ b/src/brod.erl @@ -262,6 +262,7 @@ | {offset_reset_policy, brod_consumer:offset_reset_policy()} | {size_stat_window, non_neg_integer()} | {isolation_level, brod_consumer:isolation_level()} + | {share_leader_conn, boolean()} ]. %% Consumer configuration. %% diff --git a/src/brod_client.erl b/src/brod_client.erl index 8f4aad24..d52f1b38 100644 --- a/src/brod_client.erl +++ b/src/brod_client.erl @@ -1,5 +1,5 @@ -%%% %%% Copyright (c) 2015-2021 Klarna Bank AB (publ) +%%% Copyright (c) 2022-2024 kafka4beam contributors %%% %%% Licensed under the Apache License, Version 2.0 (the "License"); %%% you may not use this file except in compliance with the License.
@@ -30,6 +30,7 @@ , get_group_coordinator/2 , get_transactional_coordinator/2 , get_leader_connection/3 + , get_bootstrap/1 , get_metadata/2 , get_metadata_safe/2 , get_partitions_count/2 @@ -227,6 +228,10 @@ stop_consumer(Client, TopicName) -> get_leader_connection(Client, Topic, Partition) -> safe_gen_call(Client, {get_leader_connection, Topic, Partition}, infinity). +-spec get_bootstrap(client()) -> {ok, brod:bootstrap()} | {error, any()}. +get_bootstrap(Client) -> + safe_gen_call(Client, get_bootstrap, infinity). + %% @doc Get connection to a kafka broker. %% %% Return already established connection towards the broker, @@ -388,6 +393,10 @@ handle_call({stop_consumer, Topic}, _From, State) -> handle_call({get_leader_connection, Topic, Partition}, _From, State) -> {Result, NewState} = do_get_leader_connection(State, Topic, Partition), {reply, Result, NewState}; +handle_call(get_bootstrap, _From, State) -> + #state{bootstrap_endpoints = Endpoints} = State, + ConnConfig = conn_config(State), + {reply, {ok, {Endpoints, ConnConfig}}, State}; handle_call({get_connection, Host, Port}, _From, State) -> {Result, NewState} = maybe_connect(State, {Host, Port}), {reply, Result, NewState}; diff --git a/src/brod_consumer.erl b/src/brod_consumer.erl index b84e001f..a2647e69 100644 --- a/src/brod_consumer.erl +++ b/src/brod_consumer.erl @@ -1,4 +1,5 @@ %%% Copyright (c) 2014-2021 Klarna Bank AB (publ) +%%% Copyright (c) 2022-2024 kafka4beam contributors %%% %%% Licensed under the Apache License, Version 2.0 (the "License"); %%% you may not use this file except in compliance with the License. @@ -93,7 +94,10 @@ -type pending_acks() :: #pending_acks{}. -type isolation_level() :: kpro:isolation_level(). --record(state, { bootstrap :: pid() | brod:bootstrap() +-define(GET_FROM_CLIENT, get). +-define(IGNORE, ignore). 
+-record(state, { client_pid :: ?IGNORE | pid() + , bootstrap :: ?IGNORE | ?GET_FROM_CLIENT | brod:bootstrap() , connection :: ?undef | pid() , topic :: binary() , partition :: integer() @@ -136,6 +140,7 @@ -define(INIT_CONNECTION, init_connection). -define(DEFAULT_AVG_WINDOW, 5). -define(DEFAULT_ISOLATION_LEVEL, ?kpro_read_committed). +-define(DEFAULT_SHARE_LEADER_CONN, false). %%%_* APIs ===================================================================== %% @equiv start_link(ClientPid, Topic, Partition, Config, []) @@ -220,6 +225,16 @@ start_link(Bootstrap, Topic, Partition, Config) -> %% and `read_committed' to get only the records from committed %% transactions %% +%%
  • `share_leader_conn': (optional, default = `false') +%% +%% Whether or not to share the partition leader connection with +%% other producers or consumers. +%% Set to `true' to consume fewer TCP connections towards Kafka, +%% but may lead to higher fetch latency. This is because Kafka can +%% only accumulate messages for the oldest fetch request, later +%% requests behind it may get blocked until `max_wait_time' expires +%% for the oldest one
  • +%% +%% %% @end -spec start_link(pid() | brod:bootstrap(), @@ -286,7 +301,7 @@ get_connection(Pid) -> %%%_* gen_server callbacks ===================================================== -init({Bootstrap, Topic, Partition, Config}) -> +init({Bootstrap0, Topic, Partition, Config}) -> erlang:process_flag(trap_exit, true), Cfg = fun(Name, Default) -> proplists:get_value(Name, Config, Default) @@ -300,15 +315,33 @@ init({Bootstrap, Topic, Partition, Config}) -> BeginOffset = Cfg(begin_offset, ?DEFAULT_BEGIN_OFFSET), OffsetResetPolicy = Cfg(offset_reset_policy, ?DEFAULT_OFFSET_RESET_POLICY), IsolationLevel = Cfg(isolation_level, ?DEFAULT_ISOLATION_LEVEL), - - %% If bootstrap is a client pid, register self to the client - case is_shared_conn(Bootstrap) of + IsShareConn = Cfg(share_leader_conn, ?DEFAULT_SHARE_LEADER_CONN), + + %% resolve connection bootstrap args + {ClientPid, Bootstrap} = + case is_pid(Bootstrap0) of + true when IsShareConn -> + %% share leader connection with other producers/consumers + %% the connection is to be managed by brod_client + {Bootstrap0, ?IGNORE}; + true -> + %% not sharing leader connection with other producers/consumers + %% the bootstrap args will be resolved later when it's + %% time to establish a connection to partition leader + {Bootstrap0, ?GET_FROM_CLIENT}; + false -> + %% this consumer process is not started from `brod' APIs + %% maybe managed by other supervisors. + {?IGNORE, Bootstrap0} + end, + case is_pid(ClientPid) of true -> - ok = brod_client:register_consumer(Bootstrap, Topic, Partition); + ok = brod_client:register_consumer(ClientPid, Topic, Partition); false -> ok end, - {ok, #state{ bootstrap = Bootstrap + {ok, #state{ client_pid = ClientPid + , bootstrap = Bootstrap , topic = Topic , partition = Partition , begin_offset = BeginOffset @@ -418,20 +451,26 @@ handle_cast(Cast, State) -> {noreply, State}.
%% @private -terminate(Reason, #state{ bootstrap = Bootstrap +terminate(Reason, #state{ client_pid = ClientPid , topic = Topic , partition = Partition , connection = Connection + , connection_mref = Mref }) -> - IsShared = is_shared_conn(Bootstrap), IsNormal = brod_utils:is_normal_reason(Reason), %% deregister consumer if it's shared connection and normal shutdown - IsShared andalso IsNormal andalso - brod_client:deregister_consumer(Bootstrap, Topic, Partition), - %% close connection if it's working standalone - case not IsShared andalso is_pid(Connection) of - true -> kpro:close_connection(Connection); - false -> ok + case is_pid(ClientPid) andalso IsNormal of + true -> + brod_client:deregister_consumer(ClientPid, Topic, Partition); + false -> + ok + end, + %% close connection if it's owned by this consumer + case Mref =:= ?undef andalso is_pid(Connection) andalso is_process_alive(Connection) of + true -> + kpro:close_connection(Connection); + false -> + ok end, %% write a log if it's not a normal reason IsNormal orelse ?BROD_LOG_ERROR("Consumer ~s-~w terminate reason: ~p", @@ -858,17 +897,19 @@ safe_gen_call(Server, Call, Timeout) -> -spec maybe_init_connection(state()) -> {ok, state()} | {{error, any()}, state()}. 
maybe_init_connection( - #state{ bootstrap = Bootstrap + #state{ client_pid = ClientPid + , bootstrap = Bootstrap , topic = Topic , partition = Partition , connection = ?undef } = State0) -> %% Lookup, or maybe (re-)establish a connection to partition leader - case connect_leader(Bootstrap, Topic, Partition) of + {MonitorOrLink, Result} = connect_leader(ClientPid, Bootstrap, Topic, Partition), + case Result of {ok, Connection} -> - Mref = case is_shared_conn(Bootstrap) of - true -> erlang:monitor(process, Connection); - false -> ?undef %% linked + Mref = case MonitorOrLink of + monitor -> erlang:monitor(process, Connection); + linked -> ?undef end, %% Switching to a new connection %% the response for last_req_ref will be lost forever @@ -883,13 +924,23 @@ maybe_init_connection( maybe_init_connection(State) -> {ok, State}. -connect_leader(ClientPid, Topic, Partition) when is_pid(ClientPid) -> - brod_client:get_leader_connection(ClientPid, Topic, Partition); -connect_leader(Endpoints, Topic, Partition) when is_list(Endpoints) -> - connect_leader({Endpoints, []}, Topic, Partition); -connect_leader({Endpoints, ConnCfg}, Topic, Partition) -> +connect_leader(ClientPid, ?IGNORE, Topic, Partition) when is_pid(ClientPid) -> + {monitor, brod_client:get_leader_connection(ClientPid, Topic, Partition)}; +connect_leader(ClientPid, ?GET_FROM_CLIENT, Topic, Partition) when is_pid(ClientPid) -> + case brod_client:get_bootstrap(ClientPid) of + {ok, Bootstrap} -> + link_connect_leader(Bootstrap, Topic, Partition); + {error, Reason} -> + {linked, {error, Reason}} + end; +connect_leader(?IGNORE, Bootstrap, Topic, Partition) -> + link_connect_leader(Bootstrap, Topic, Partition). + +link_connect_leader(Endpoints, Topic, Partition) when is_list(Endpoints) -> + link_connect_leader({Endpoints, []}, Topic, Partition); +link_connect_leader({Endpoints, ConnCfg}, Topic, Partition) -> %% connection pid is linked to self() - kpro:connect_partition_leader(Endpoints, ConnCfg, Topic, Partition). 
+ {linked, kpro:connect_partition_leader(Endpoints, ConnCfg, Topic, Partition)}. %% Send a ?INIT_CONNECTION delayed loopback message to re-init. -spec maybe_send_init_connection(state()) -> ok. @@ -900,9 +951,6 @@ maybe_send_init_connection(#state{subscriber = Subscriber}) -> erlang:send_after(Timeout, self(), ?INIT_CONNECTION), ok. -%% In case 'bootstrap' is a client pid, connection is shared with other workers. -is_shared_conn(Bootstrap) -> is_pid(Bootstrap). - %%%_* Tests ==================================================================== -ifdef(TEST). diff --git a/test/brod_consumer_SUITE.erl b/test/brod_consumer_SUITE.erl index 77e602c0..f8e72262 100644 --- a/test/brod_consumer_SUITE.erl +++ b/test/brod_consumer_SUITE.erl @@ -37,7 +37,8 @@ , t_direct_fetch_expand_max_bytes/1 , t_resolve_offset/1 , t_consumer_max_bytes_too_small/1 - , t_consumer_connection_restart/1 + , t_consumer_connection_restart_0/1 + , t_consumer_connection_restart_1/1 , t_consumer_connection_restart_2/1 , t_consumer_resubscribe/1 , t_consumer_resubscribe_earliest/1 @@ -536,8 +537,20 @@ t_consumer_max_bytes_too_small(Config) -> end). %% @doc Consumer should auto recover from connection down, subscriber should not -%% notice a thing except for a few seconds of break in data streaming -t_consumer_connection_restart(Config) -> +%% notice a thing except for a few seconds of break in data streaming. +%% Covers the case when connection is shared with other partition leaders +t_consumer_connection_restart_0(Config) -> + ConsumerConfig = [{share_leader_conn, true} | consumer_config()], + consumer_connection_restart(Config, ConsumerConfig). + +%% @doc Consumer should auto recover from connection down, subscriber should not +%% notice a thing except for a few seconds of break in data streaming. 
+%% Covers the case when connection is NOT shared with other partition leaders +t_consumer_connection_restart_1(Config) -> + ConsumerConfig = [{share_leader_conn, false} | consumer_config()], + consumer_connection_restart(Config, ConsumerConfig). + +consumer_connection_restart(Config, ConsumerConfig) -> Client = ?config(client), Topic = ?TOPIC, Partition = 0, @@ -546,7 +559,7 @@ t_consumer_connection_restart(Config) -> , {prefetch_bytes, 0} , {min_bytes, 1} , {max_bytes, 12} %% ensure fetch exactly one message at a time - | consumer_config() + | ConsumerConfig ], {ok, ConsumerPid} = brod_consumer:start_link(whereis(Client), Topic, Partition, ConsumerCfg), @@ -586,11 +599,31 @@ t_consumer_connection_restart(Config) -> Nums2 = Receive(Nums1, 5000), ?assertError(timeout, Receive(Nums2, 100)), ?assertEqual(NumsCnt - 2, length(Nums2)), - ?assertEqual({ok, NewConnPid}, - brod_client:get_leader_connection(Client, Topic, Partition)), - ok = brod_consumer:stop(ConsumerPid), - ?assertNot(is_process_alive(ConsumerPid)), - ?assert(is_process_alive(NewConnPid)), %% managed by brod_client + case proplists:get_bool(share_leader_conn, ConsumerConfig) of + true -> + ?assertEqual({ok, NewConnPid}, + brod_client:get_leader_connection(Client, Topic, Partition)), + ok = brod_consumer:stop(ConsumerPid), + ?assertNot(is_process_alive(ConsumerPid)), + ?assert(is_process_alive(NewConnPid)); + false -> + %% assert normal shutdown + Ref1 = erlang:monitor(process, NewConnPid), + Ref2 = erlang:monitor(process, ConsumerPid), + %% assert connection linked to consumer + {links, Links} = process_info(ConsumerPid, links), + ?assert(lists:member(NewConnPid, Links)), + ok = brod_consumer:stop(ConsumerPid), + Wait = fun() -> + ?WAIT_ONLY({'DOWN', Ref, process, _, Reason}, + begin + ?assertEqual(normal, Reason), + ?assert(Ref =:= Ref1 orelse Ref =:= Ref2) + end) + end, + Wait(), + Wait() + end, ok. %% @doc same as t_consumer_connection_restart,