Skip to content

Commit

Permalink
write a line into test.file
Browse files Browse the repository at this point in the history
  • Loading branch information
huifer committed Aug 5, 2023
1 parent 17a6251 commit c6a8b69
Showing 1 changed file with 75 additions and 0 deletions.
75 changes: 75 additions & 0 deletions docs/spring/cs48e655f2-336a-11ee-b7f8-acde48001122.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.huifer.springboot.kafka.consumer.bean;

import com.huifer.springboot.kafka.consumer.service.KafkaConsumerMessageListener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
* <p>Title : KafkaConsumerConfig </p>
* <p>Description : </p>
*
* @author huifer
* @date 2019-06-19
*/
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

@Value("${kafka.consumer.bootstrap-servers}")
private String bootstrap_servers;
@Value("${kafka.consumer.group.id}")
private String group_id;
@Value("${kafka.consumer.enable.auto.commit}")
private String enable_auto_commit;
@Value("${kafka.consumer.auto.commit.interval.ms}")
private String auto_commit_interval_ms;
@Value("${kafka.consumer.key.deserializer}")
private String key_deserializer;
@Value("${kafka.consumer.value.deserializer}")
private String value_deserializer;
@Value("${kafka.consumer.auto.offset.reset}")
private String reset;

public Map config() {
Map<String, Object> conf = new HashMap<>();
conf.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
conf.put(ConsumerConfig.GROUP_ID_CONFIG, group_id);
conf.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enable_auto_commit);
conf.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, auto_commit_interval_ms);
conf.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, key_deserializer);
conf.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, value_deserializer);
conf.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, reset);
return conf;
}


public ConsumerFactory<Object, Object> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(config());
}

@Bean
public KafkaConsumerMessageListener listener() {
return new KafkaConsumerMessageListener();
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setPollTimeout(1500);
return factory;
}


}

0 comments on commit c6a8b69

Please sign in to comment.