Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Docs improvements #610

Merged
merged 2 commits into from
Dec 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion guides/examples/elixir/Consumer.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ defmodule BrodSample.PartitionSubscriber do
@impl true
def init({topic, partition}) do
# start the consumer(s)
# if you have more than one partition, do it somewhere else once for all paritions
# if you have more than one partition, do it somewhere else once for all partitions
# (e.g. in the parent process)
:ok = :brod.start_consumer(:kafka_client, topic, begin_offset: :latest)

Expand Down
18 changes: 9 additions & 9 deletions src/brod_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ get_producer(Client, Topic, Partition) ->
Error
end.

%% @doc Get consumer of the given topic-parition.
%% @doc Get consumer of the given topic-partition.
-spec get_consumer(client(), topic(), partition()) ->
{ok, pid()} | {error, get_consumer_error()}.
get_consumer(Client, Topic, Partition) ->
Expand Down Expand Up @@ -255,7 +255,7 @@ get_metadata(Client, Topic) ->
-spec get_metadata_safe(client(), topic()) ->
{ok, kpro:struct()} | {error, any()}.
get_metadata_safe(Client, Topic) ->
safe_gen_call(Client, {get_metadata, {_FetchMetdataForTopic = all, Topic}}, infinity).
safe_gen_call(Client, {get_metadata, {_FetchMetadataForTopic = all, Topic}}, infinity).

%% @doc Get number of partitions for a given topic.
-spec get_partitions_count(client(), topic()) -> {ok, pos_integer()} | {error, any()}.
Expand Down Expand Up @@ -610,11 +610,11 @@ do_get_metadata({all, Topic}, State) ->
do_get_metadata(Topic, State) when not is_tuple(Topic) ->
do_get_metadata(Topic, Topic, State).

do_get_metadata(FetchMetdataFor, Topic,
do_get_metadata(FetchMetadataFor, Topic,
#state{ client_id = ClientId
, workers_tab = Ets
} = State0) ->
Topics = case FetchMetdataFor of
Topics = case FetchMetadataFor of
all -> all; %% in case no topic is given, get all
_ -> [Topic]
end,
Expand Down Expand Up @@ -865,21 +865,21 @@ do_get_partitions_count(Client, Ets, Topic, #{allow_topic_auto_creation := Allow
get_metadata_safe(Client, Topic)
end,
%% the metadata is returned, no need to look up the cache again
%% find the topic's parition count from the metadata directly
%% find the topic's partition count from the metadata directly
find_partition_count_in_metadata(MetadataResponse, Topic)
end.

find_partition_count_in_metadata({ok, Meta}, Topic) ->
TopicMetadataArrary = kf(topics, Meta),
find_partition_count_in_topic_metadata_array(TopicMetadataArrary, Topic);
TopicMetadataArray = kf(topics, Meta),
find_partition_count_in_topic_metadata_array(TopicMetadataArray, Topic);
find_partition_count_in_metadata({error, Reason}, _) ->
{error, Reason}.

find_partition_count_in_topic_metadata_array(TopicMetadataArrary, Topic) ->
find_partition_count_in_topic_metadata_array(TopicMetadataArray, Topic) ->
FilterF = fun(#{name := N}) when N =:= Topic -> true;
(_) -> false
end,
case lists:filter(FilterF, TopicMetadataArrary) of
case lists:filter(FilterF, TopicMetadataArray) of
[TopicMetadata] ->
get_partitions_count_in_metadata(TopicMetadata);
[] ->
Expand Down
2 changes: 1 addition & 1 deletion src/brod_group_subscriber_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@
%% brod_group_coordinator:start_link/6} Optional</li>
%%
%% <li>`consumer_config': For partition consumer,
%% {@link brod_topic_subscriber:start_link/6}. Optional
%% {@link brod_consumer:start_link/5}. Optional
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it should be brod_topic_subscriber:start_link/7 ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sure that consumer_config option accepts config that is passed to brod_consumer:start_link/5, but here in brod_group_consumer_v2 a link leads to brod_topic_subscriber where other irrelevant options are described.

Please, compare brod_consumer config:
https://hexdocs.pm/brod/brod_consumer.html#start_link/5
WIth brod_topic_subscriber config:
https://hexdocs.pm/brod/brod_topic_subscriber.html#start_link/1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consumer_group is used here:

start_worker(Client, Topic, MessageType, Partition, ConsumerConfig,
StartOptions) ->
{ok, Pid} = brod_topic_subscriber:start_link( Client
, Topic
, [Partition]
, ConsumerConfig
, MessageType
, brod_group_subscriber_worker
, StartOptions
),
{ok, Pid}.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I am trying to say is that the link in the description of consumer_config option does not lead to a description of said options, which confused me a bit.

consumer_config options are eventually passed to brod_consumer:start_link/5 and I believe there should be a link to this function, because they are described in it's documentation.

But currently there is a link to brod_topic_subscriber:start_link/1 where docs describe only config of brod_topic_subscriber. consumer_config is one of the options in brod_topic_subscriber's config, but even there a link leads to brod_topic_subscriber itself. I'm going to fix this too in this PR.

In addition, in other places like in brod_group_subscriber description of ConsumerConfig option has a link to brod_consumer docs as I'm suggesting.

%% </li>
%%
%% <li>`message_type': The type of message that is going to be handled
Expand Down
8 changes: 4 additions & 4 deletions src/brod_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@
-include("brod_int.hrl").

%% default number of messages in buffer before block callers
-define(DEFAULT_PARITION_BUFFER_LIMIT, 512).
-define(DEFAULT_PARTITION_BUFFER_LIMIT, 512).
%% default number of message sets sent on wire before block waiting for acks
-define(DEFAULT_PARITION_ONWIRE_LIMIT, 1).
-define(DEFAULT_PARTITION_ONWIRE_LIMIT, 1).
%% by default, send max 1 MB of data in one batch (message set)
-define(DEFAULT_MAX_BATCH_SIZE, 1048576).
%% by default, require acks from all ISR
Expand Down Expand Up @@ -288,8 +288,8 @@ stop(Pid) -> ok = gen_server:call(Pid, stop).
%% @private
init({ClientPid, Topic, Partition, Config}) ->
erlang:process_flag(trap_exit, true),
BufferLimit = ?config(partition_buffer_limit, ?DEFAULT_PARITION_BUFFER_LIMIT),
OnWireLimit = ?config(partition_onwire_limit, ?DEFAULT_PARITION_ONWIRE_LIMIT),
BufferLimit = ?config(partition_buffer_limit, ?DEFAULT_PARTITION_BUFFER_LIMIT),
OnWireLimit = ?config(partition_onwire_limit, ?DEFAULT_PARTITION_ONWIRE_LIMIT),
MaxBatchSize = ?config(max_batch_size, ?DEFAULT_MAX_BATCH_SIZE),
MaxRetries = ?config(max_retries, ?DEFAULT_MAX_RETRIES),
RetryBackoffMs = ?config(retry_backoff_ms, ?DEFAULT_RETRY_BACKOFF_MS),
Expand Down
2 changes: 1 addition & 1 deletion src/brod_topic_subscriber.erl
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ start_link(Client, Topic, Partitions, ConsumerConfig,
%% functions implemented for message processing. Mandatory</li>
%%
%% <li>`consumer_config': For partition consumer, {@link
%% brod_topic_subscriber:start_link/6}. Optional, defaults to `[]'
%% brod_consumer:start_link/5}. Optional, defaults to `[]'
%% </li>
%%
%% <li>`message_type': The type of message that is going to be handled
Expand Down