The Kafka Standalone Consumer reads messages from Kafka, processes them, and indexes them in ElasticSearch.
- Kafka has a topic named, say, `Topic_1`.
- Let's say `Topic_1` has 5 partitions.
- Now, there is a need to read the messages from Kafka, process them and index them into ElasticSearch.
- In order to do that, create 5 config files and start 5 instances of this Standalone Consumer, tying each config file to its own Consumer instance.
- Now, we will have 5 Consumer Standalone Daemons running, each listening to and processing messages from one partition of `Topic_1` in Kafka.
- When a new partition (say, a 6th partition) is added to the same `Topic_1`, start a new Consumer Daemon instance pointing to the new (6th) partition, as sketched below.
- This way, there is a clear way of subscribing to and processing messages from multiple partitions across multiple topics using this Standalone Consumer.
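For illustration, the per-partition setup described above could look like the following, using the `consumer.sh` script from the steps below. The config file names are placeholders following the `<consumerGroupName><topicName><PartitionNum>.properties` convention, and partitions are assumed to be numbered from 0:

```
cd $CONSUMER_HOME/scripts

# One consumer instance per partition of Topic_1 (partitions 0..4).
for p in 0 1 2 3 4; do
  ./consumer.sh -p start -c $CONSUMER_HOME/config/myGroupTopic_1${p}.properties
done

# Later, when a 6th partition (partition 5) is added to Topic_1,
# start one more instance pointing at it.
./consumer.sh -p start -c $CONSUMER_HOME/config/myGroupTopic_15.properties
```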
1. Download the code. Let's say `$CONSUMER_HOME` contains the code.

2. From `$CONSUMER_HOME`, build the Maven project. This step creates the JAR file of the Consumer and the dependency files in the `$CONSUMER_HOME/bin` directory:

   ```
   mvn clean package
   ```
3. Create a config file for the Consumer instance and provide all necessary properties, using the existing config file `$CONSUMER_HOME/config/kafkaESConsumer.properties` as a template:

   ```
   cp $CONSUMER_HOME/config/kafkaESConsumer.properties $CONSUMER_HOME/config/<consumerGroupName><topicName><PartitionNum>.properties
   vi $CONSUMER_HOME/config/<consumerGroupName><topicName><PartitionNum>.properties
   ```

   Edit the file and provide the correct config details. The details of and guidance for each property are given in the property file itself.

   It is IMPORTANT to specify ONE UNIQUE LOG PROPERTY FILE (using the property below) FOR EACH CONSUMER INSTANCE in its respective config file, so that each consumer instance logs to a separate log file:

   ```
   # Log property file for the consumer instance. One instance of the consumer should have one log file.
   logPropertyFile=log4j<consumerGroupName><topicName><PartitionNum>.properties
   ```
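   As an illustration, a filled-in config for one instance might contain entries like the following. The values are placeholders; the property names (`consumerGroupName`, `topic`, `partition`, `messageHandlerClass`, `logPropertyFile`) are the ones referenced elsewhere on this page, and the full documented list lives in `kafkaESConsumer.properties`:

   ```
   consumerGroupName=myGroup
   topic=Topic_1
   partition=0
   messageHandlerClass=org.elasticsearch.kafka.consumer.messageHandlers.RawMessageStringHandler
   logPropertyFile=log4jmyGroupTopic_10.properties
   ```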
4. Start the Consumer as follows:

   ```
   cd $CONSUMER_HOME/scripts
   ./consumer.sh -p start -c $CONSUMER_HOME/config/<consumerGroupName><topicName><PartitionNum>.properties

   # '-p' - can take start | stop | restart
   # '-c' - the config file for the consumer instance, with path
   #        (e.g. '$CONSUMER_HOME/config/<consumerGroupName><topicName><PartitionNum>.properties')
   ```
5. Confirm the successful start of the Consumer by looking into the process logs.

   The log file below contains INFO output from starting, restarting and stopping the Consumer instance:

   ```
   # $consumerGroupName, $topic, $partition - properties as defined in the consumer instance's
   # config file (i.e. '<consumerGroupName><topicName><PartitionNum>.properties' in this example)
   vi $CONSUMER_HOME/processLogs/<$consumerGroupName>_<$topic>_<$partition>.out
   ```

   The log file below contains ERROR output from starting, restarting and stopping the Consumer instance:

   ```
   vi $CONSUMER_HOME/processLogs/<$consumerGroupName>_<$topic>_<$partition>.err
   ```
6. Monitor the processing in the log file configured via the `logPropertyFile` property in the Consumer's respective config file (i.e., in this example, `$CONSUMER_HOME/config/<consumerGroupName><topicName><PartitionNum>.properties`).
7. To stop the Consumer instance:

   ```
   cd $CONSUMER_HOME/scripts
   ./consumer.sh -p stop -c $CONSUMER_HOME/config/<consumerGroupName><topicName><PartitionNum>.properties
   ```

8. To restart the Consumer instance:

   ```
   cd $CONSUMER_HOME/scripts
   ./consumer.sh -p restart -c $CONSUMER_HOME/config/<consumerGroupName><topicName><PartitionNum>.properties
   ```
The details of each config property can be seen in the template config file, `$CONSUMER_HOME/config/kafkaESConsumer.properties`, which documents each property.
- If the messages in your Kafka topic are to be handled as raw `UTF-8` text, you can use the message handler class `org.elasticsearch.kafka.consumer.messageHandlers.RawMessageStringHandler`.
- You can code your own message handler class by extending the abstract class `org.elasticsearch.kafka.consumer.MessageHandler` and implementing the methods `transformMessage()` and `prepareForPostToElasticSearch()`.
- Also, if the messages in your Kafka topic are to be handled as raw `UTF-8` text and you just want to change the way the raw message is transformed (into your desired format), then extend the `org.elasticsearch.kafka.consumer.messageHandlers.RawMessageStringHandler` class and override/implement the `transformMessage()` method alone. An example can be found in `org.elasticsearch.kafka.consumer.messageHandlers.AccessLogMessageHandler`.
- Usually, it is effective to index the message into ElasticSearch in JSON format. This can be done using a mapper class and transforming the message from Kafka by overriding/implementing the `transformMessage()` method; see the sketch after this list. An example can be found in `org.elasticsearch.kafka.consumer.messageHandlers.AccessLogMessageHandler`.
- Do remember to set the newly created message handler class in the `messageHandlerClass` config property of the consumer instance.
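As a rough sketch, a custom handler that wraps each raw message in a small JSON document might look like the class below. The `transformMessage()` signature used here (a `String` in, a `String` out) and the class name `MyJsonMessageHandler` are assumptions made for this example only; check `MessageHandler` and `RawMessageStringHandler` in the source tree for the actual method contract.

```java
package org.elasticsearch.kafka.consumer.messageHandlers;

// A minimal sketch, not project source: the transformMessage() signature is
// assumed (String in, String out); verify it against the abstract
// MessageHandler class before using.
public class MyJsonMessageHandler extends RawMessageStringHandler {

    @Override
    public String transformMessage(String rawMessage) {
        // Escape the characters that are significant inside a JSON string value.
        String escaped = rawMessage
                .replace("\\", "\\\\")
                .replace("\"", "\\\"")
                .replace("\n", "\\n");
        // Index each Kafka message as a one-field JSON document.
        return "{\"message\":\"" + escaped + "\"}";
    }
}
```

To use it, point the `messageHandlerClass` property of the consumer instance at `org.elasticsearch.kafka.consumer.messageHandlers.MyJsonMessageHandler` (a hypothetical class name used only in this sketch).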
kafka-elasticsearch-standalone-consumer
Licensed under the Apache License, Version 2.0 (the "License"); you may
not use this file except in compliance with the License. You may obtain
a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.