From 03bf9084ab3e8f79781323b898d2c844fa9c9dbb Mon Sep 17 00:00:00 2001 From: Matvei Karpov Date: Mon, 25 Nov 2024 09:09:20 +0300 Subject: [PATCH 1/2] Docs improvements --- guides/examples/elixir/Consumer.md | 2 +- src/brod_client.erl | 18 +++++++++--------- src/brod_group_subscriber_v2.erl | 2 +- src/brod_producer.erl | 8 ++++---- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/guides/examples/elixir/Consumer.md b/guides/examples/elixir/Consumer.md index cd020fcd..e513f8b3 100644 --- a/guides/examples/elixir/Consumer.md +++ b/guides/examples/elixir/Consumer.md @@ -108,7 +108,7 @@ defmodule BrodSample.PartitionSubscriber do @impl true def init({topic, partition}) do # start the consumer(s) - # if you have more than one partition, do it somewhere else once for all paritions + # if you have more than one partition, do it somewhere else once for all partitions # (e.g. in the parent process) :ok = :brod.start_consumer(:kafka_client, topic, begin_offset: :latest) diff --git a/src/brod_client.erl b/src/brod_client.erl index b708578c..8ebe455d 100644 --- a/src/brod_client.erl +++ b/src/brod_client.erl @@ -168,7 +168,7 @@ get_producer(Client, Topic, Partition) -> Error end. -%% @doc Get consumer of the given topic-parition. +%% @doc Get consumer of the given topic-partition. -spec get_consumer(client(), topic(), partition()) -> {ok, pid()} | {error, get_consumer_error()}. get_consumer(Client, Topic, Partition) -> @@ -255,7 +255,7 @@ get_metadata(Client, Topic) -> -spec get_metadata_safe(client(), topic()) -> {ok, kpro:struct()} | {error, any()}. get_metadata_safe(Client, Topic) -> - safe_gen_call(Client, {get_metadata, {_FetchMetdataForTopic = all, Topic}}, infinity). + safe_gen_call(Client, {get_metadata, {_FetchMetadataForTopic = all, Topic}}, infinity). %% @doc Get number of partitions for a given topic. -spec get_partitions_count(client(), topic()) -> {ok, pos_integer()} | {error, any()}. @@ -610,11 +610,11 @@ do_get_metadata({all, Topic}, State) -> do_get_metadata(Topic, State) when not is_tuple(Topic) -> do_get_metadata(Topic, Topic, State). -do_get_metadata(FetchMetdataFor, Topic, +do_get_metadata(FetchMetadataFor, Topic, #state{ client_id = ClientId , workers_tab = Ets } = State0) -> - Topics = case FetchMetdataFor of + Topics = case FetchMetadataFor of all -> all; %% in case no topic is given, get all _ -> [Topic] end, @@ -865,21 +865,21 @@ do_get_partitions_count(Client, Ets, Topic, #{allow_topic_auto_creation := Allow get_metadata_safe(Client, Topic) end, %% the metadata is returned, no need to look up the cache again - %% find the topic's parition count from the metadata directly + %% find the topic's partition count from the metadata directly find_partition_count_in_metadata(MetadataResponse, Topic) end. find_partition_count_in_metadata({ok, Meta}, Topic) -> - TopicMetadataArrary = kf(topics, Meta), - find_partition_count_in_topic_metadata_array(TopicMetadataArrary, Topic); + TopicMetadataArray = kf(topics, Meta), + find_partition_count_in_topic_metadata_array(TopicMetadataArray, Topic); find_partition_count_in_metadata({error, Reason}, _) -> {error, Reason}. -find_partition_count_in_topic_metadata_array(TopicMetadataArrary, Topic) -> +find_partition_count_in_topic_metadata_array(TopicMetadataArray, Topic) -> FilterF = fun(#{name := N}) when N =:= Topic -> true; (_) -> false end, - case lists:filter(FilterF, TopicMetadataArrary) of + case lists:filter(FilterF, TopicMetadataArray) of [TopicMetadata] -> get_partitions_count_in_metadata(TopicMetadata); [] -> diff --git a/src/brod_group_subscriber_v2.erl b/src/brod_group_subscriber_v2.erl index 47ece4b0..e4eb4537 100644 --- a/src/brod_group_subscriber_v2.erl +++ b/src/brod_group_subscriber_v2.erl @@ -159,7 +159,7 @@ %% brod_group_coordinator:start_link/6} Optional %% %%
  • `consumer_config': For partition consumer, -%% {@link brod_topic_subscriber:start_link/6}. Optional +%% {@link brod_consumer:start_link/5}. Optional %%
  • %% %%
  • `message_type': The type of message that is going to be handled diff --git a/src/brod_producer.erl b/src/brod_producer.erl index 1cd55583..c6c389a1 100644 --- a/src/brod_producer.erl +++ b/src/brod_producer.erl @@ -54,9 +54,9 @@ -include("brod_int.hrl"). %% default number of messages in buffer before block callers --define(DEFAULT_PARITION_BUFFER_LIMIT, 512). +-define(DEFAULT_PARTITION_BUFFER_LIMIT, 512). %% default number of message sets sent on wire before block waiting for acks --define(DEFAULT_PARITION_ONWIRE_LIMIT, 1). +-define(DEFAULT_PARTITION_ONWIRE_LIMIT, 1). %% by default, send max 1 MB of data in one batch (message set) -define(DEFAULT_MAX_BATCH_SIZE, 1048576). %% by default, require acks from all ISR @@ -288,8 +288,8 @@ stop(Pid) -> ok = gen_server:call(Pid, stop). %% @private init({ClientPid, Topic, Partition, Config}) -> erlang:process_flag(trap_exit, true), - BufferLimit = ?config(partition_buffer_limit, ?DEFAULT_PARITION_BUFFER_LIMIT), - OnWireLimit = ?config(partition_onwire_limit, ?DEFAULT_PARITION_ONWIRE_LIMIT), + BufferLimit = ?config(partition_buffer_limit, ?DEFAULT_PARTITION_BUFFER_LIMIT), + OnWireLimit = ?config(partition_onwire_limit, ?DEFAULT_PARTITION_ONWIRE_LIMIT), MaxBatchSize = ?config(max_batch_size, ?DEFAULT_MAX_BATCH_SIZE), MaxRetries = ?config(max_retries, ?DEFAULT_MAX_RETRIES), RetryBackoffMs = ?config(retry_backoff_ms, ?DEFAULT_RETRY_BACKOFF_MS), From 831da7492cc1a01abf3395d0810817435006512e Mon Sep 17 00:00:00 2001 From: Matvey Karpov Date: Sun, 1 Dec 2024 23:20:14 +0300 Subject: [PATCH 2/2] FIX: Link to consumer config in brod_topic_subscriber --- src/brod_topic_subscriber.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/brod_topic_subscriber.erl b/src/brod_topic_subscriber.erl index 9b8add98..dfea8aa3 100644 --- a/src/brod_topic_subscriber.erl +++ b/src/brod_topic_subscriber.erl @@ -234,7 +234,7 @@ start_link(Client, Topic, Partitions, ConsumerConfig, %% functions implemented for message processing. Mandatory
  • %% %%
  • `consumer_config': For partition consumer, {@link -%% brod_topic_subscriber:start_link/6}. Optional, defaults to `[]' +%% brod_consumer:start_link/5}. Optional, defaults to `[]' %%
  • %% %%
  • `message_type': The type of message that is going to be handled