Skip to content

Commit

Permalink
Merge pull request #613 from mtvch/configure-unknown-topic-cache-ttl
Browse files Browse the repository at this point in the history
Added support for unknown_topic_cache_ttl option in brod_client
  • Loading branch information
zmstone authored Dec 14, 2024
2 parents e18151c + cf4b857 commit 40a6ef6
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 16 deletions.
6 changes: 5 additions & 1 deletion src/brod.erl
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,10 @@ start_client(BootstrapEndpoints, ClientId) ->
%% Producer configuration to use when auto_start_producers is true.
%% See {@link brod_producer:start_link/4} for details about producer config</li>
%%
%% <li>`unknown_topic_cache_ttl' (optional, default=120000)
%%
%% For how long unknown_topic error will be cached, in ms.</li>
%%
%% </ul>
%%
%% Connection options can be added to the same proplist. See
Expand Down Expand Up @@ -510,7 +514,7 @@ start_consumer(Client, TopicName, ConsumerConfig) ->
%% is not statically configured for them.
%% It is up to the callers how they want to distribute their data
%% (e.g. random, roundrobin or consistent-hashing) to the partitions.
%% NOTE: The partitions count is cached for 120 seconds.
%% NOTE: The partitions count is cached.
-spec get_partitions_count(client(), topic()) ->
{ok, pos_integer()} | {error, any()}.
get_partitions_count(Client, Topic) ->
Expand Down
42 changes: 27 additions & 15 deletions src/brod_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@

-define(DEFAULT_RECONNECT_COOL_DOWN_SECONDS, 1).
-define(DEFAULT_GET_METADATA_TIMEOUT_SECONDS, 5).
-define(DEFAULT_UNKNOWN_TOPIC_CACHE_TTL, 120000).

%% ClientId as ets table name.
-define(ETS(ClientId), ClientId).
Expand All @@ -85,8 +86,6 @@
-define(CONSUMER(Topic, Partition, Pid),
{?CONSUMER_KEY(Topic, Partition), Pid}).

-define(UNKNOWN_TOPIC_CACHE_EXPIRE_SECONDS, 120).

-type endpoint() :: brod:endpoint().
-type client() :: brod:client().
-type client_id() :: brod:client_id().
Expand Down Expand Up @@ -613,6 +612,7 @@ do_get_metadata(Topic, State) when not is_tuple(Topic) ->
do_get_metadata(FetchMetadataFor, Topic,
#state{ client_id = ClientId
, workers_tab = Ets
, config = Config
} = State0) ->
Topics = case FetchMetadataFor of
all -> all; %% in case no topic is given, get all
Expand All @@ -624,8 +624,11 @@ do_get_metadata(FetchMetadataFor, Topic,
case request_sync(State, Request) of
{ok, #kpro_rsp{api = metadata, msg = Metadata}} ->
TopicMetadataArray = kf(topics, Metadata),
ok = update_partitions_count_cache(Ets, TopicMetadataArray),
ok = maybe_cache_unknown_topic_partition(Ets, Topic, TopicMetadataArray),
UnknownTopicCacheTtl = config(unknown_topic_cache_ttl, Config,
?DEFAULT_UNKNOWN_TOPIC_CACHE_TTL),
ok = update_partitions_count_cache(Ets, TopicMetadataArray, UnknownTopicCacheTtl),
ok = maybe_cache_unknown_topic_partition(Ets, Topic, TopicMetadataArray,
UnknownTopicCacheTtl),
{{ok, Metadata}, State};
{error, Reason} ->
?BROD_LOG_ERROR("~p failed to fetch metadata for topics: ~p\n"
Expand Down Expand Up @@ -819,29 +822,30 @@ is_cooled_down(Ts, #state{config = Config}) ->

%% call this function after fetched metadata for all topics
%% to cache the not-found status of a given topic
maybe_cache_unknown_topic_partition(Ets, Topic, TopicMetadataArray) ->
maybe_cache_unknown_topic_partition(Ets, Topic, TopicMetadataArray, UnknownTopicCacheTtl) ->
case find_partition_count_in_topic_metadata_array(TopicMetadataArray, Topic) of
{error, unknown_topic_or_partition} = Err ->
_ = ets:insert(Ets, {?TOPIC_METADATA_KEY(Topic), Err, os:timestamp()}),
_ = ets:insert(Ets, {?TOPIC_METADATA_KEY(Topic), Err,
{expire, expire_ts(UnknownTopicCacheTtl)}}),
ok;
_ ->
%% do nothing when ok or any other error
ok
end.

-spec update_partitions_count_cache(ets:tab(), [kpro:struct()]) -> ok.
update_partitions_count_cache(_Ets, []) -> ok;
update_partitions_count_cache(Ets, [TopicMetadata | Rest]) ->
-spec update_partitions_count_cache(ets:tab(), [kpro:struct()], non_neg_integer()) -> ok.
update_partitions_count_cache(_Ets, [], _UnknownTopicCacheTtl) -> ok;
update_partitions_count_cache(Ets, [TopicMetadata | Rest], UnknownTopicCacheTtl) ->
Topic = kf(name, TopicMetadata),
case get_partitions_count_in_metadata(TopicMetadata) of
{ok, Cnt} ->
ets:insert(Ets, {?TOPIC_METADATA_KEY(Topic), Cnt, os:timestamp()});
ets:insert(Ets, {?TOPIC_METADATA_KEY(Topic), Cnt, {added, now_ts()}});
{error, ?unknown_topic_or_partition} = Err ->
ets:insert(Ets, {?TOPIC_METADATA_KEY(Topic), Err, os:timestamp()});
ets:insert(Ets, {?TOPIC_METADATA_KEY(Topic), Err, {expire, expire_ts(UnknownTopicCacheTtl)}});
{error, _Reason} ->
ok
end,
update_partitions_count_cache(Ets, Rest).
update_partitions_count_cache(Ets, Rest, UnknownTopicCacheTtl).

%% Get partition counter from cache.
%% If cache is not hit, send meta data request to retrieve.
Expand Down Expand Up @@ -893,9 +897,8 @@ lookup_partitions_count_cache(Ets, Topic) ->
try ets:lookup(Ets, ?TOPIC_METADATA_KEY(Topic)) of
[{_, Count, _Ts}] when is_integer(Count) ->
{ok, Count};
[{_, {error, Reason}, Ts}] ->
case timer:now_diff(os:timestamp(), Ts) =<
?UNKNOWN_TOPIC_CACHE_EXPIRE_SECONDS * 1000000 of
[{_, {error, Reason}, {expire, ExpireTs}}] when is_integer(ExpireTs) ->
case is_expired(ExpireTs) of
true -> {error, Reason};
false -> false
end;
Expand Down Expand Up @@ -987,6 +990,15 @@ safe_gen_call(Server, Call, Timeout) ->
-spec kf(kpro:field_name(), kpro:struct()) -> kpro:field_value().
kf(FieldName, Struct) -> kpro:find(FieldName, Struct).

-spec expire_ts(integer()) -> integer().
expire_ts(Ttl) -> now_ts() + Ttl.

-spec is_expired(integer()) -> boolean().
is_expired(ExpireTs) -> now_ts() - ExpireTs < 0.

-spec now_ts() -> integer().
now_ts() -> erlang:monotonic_time(millisecond).

%%%_* Emacs ====================================================================
%%% Local Variables:
%%% allout-layout: t
Expand Down
17 changes: 17 additions & 0 deletions test/brod_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
, t_sasl_plain_file_ssl/1
, t_sasl_callback/1
, t_magic_version/1
, t_get_partitions_count_configure_cache_ttl/1
, t_get_partitions_count_safe/1
, t_double_stop_consumer/1
]).
Expand Down Expand Up @@ -125,6 +126,22 @@ t_get_partitions_count_safe(Config) when is_list(Config) ->
?assertMatch({error, unknown_topic_or_partition}, Res2),
ok = brod:stop_client(Client).


t_get_partitions_count_configure_cache_ttl(Config) when is_list(Config) ->
Client = ?FUNCTION_NAME,
ClientConfig = [{unknown_topic_cache_ttl, 100}],
ok = start_client(?HOSTS, Client, ClientConfig),
Topic = <<"unknown-topic-001">>,
Res = brod:get_partitions_count_safe(Client, Topic),
?assertMatch({error, unknown_topic_or_partition}, Res),
Res2 = brod_client:lookup_partitions_count_cache(Client, Topic),
?assertMatch({error, unknown_topic_or_partition}, Res2),
timer:sleep(101),
Res3 = brod_client:lookup_partitions_count_cache(Client, Topic),
?assertMatch(false, Res3),
ok = brod:stop_client(Client).


t_skip_unreachable_endpoint(Config) when is_list(Config) ->
Client = t_skip_unreachable_endpoint,
ok = start_client([{"localhost", 8192} | ?HOSTS], Client),
Expand Down

0 comments on commit 40a6ef6

Please sign in to comment.