Skip to content

Commit

Permalink
provide consumer example
Browse files Browse the repository at this point in the history
issue #73
  • Loading branch information
rsoika committed Apr 18, 2019
1 parent c9d45e1 commit 034e7cb
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 25 deletions.
8 changes: 4 additions & 4 deletions imixs-adapters-kafka/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ services:
- "8787:8787"

# Imixs Admin Client
imixsadmin:
image: imixs/imixs-admin
ports:
- "8888:8080"
# imixsadmin:
# image: imixs/imixs-admin
# ports:
# - "8888:8080"


#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,17 @@
@ConcurrencyManagement(ConcurrencyManagementType.BEAN)
public class ConsumerService implements Serializable {

public static String KAFKA_BROKERS = "localhost:9092";
public static Integer MESSAGE_COUNT = 1000;
public static String CLIENT_ID = "client1";


public static String TOPIC_NAME = "1.0.1"; // just an example
public static String TOPIC_NAME = "IN-1.0.1"; // just an example
public static String GROUP_ID_CONFIG = "consumerGroup1";
public static Integer MAX_NO_MESSAGE_FOUND_COUNT = 100;
public static String OFFSET_RESET_LATEST = "latest";
public static String OFFSET_RESET_EARLIER = "earliest";
public static Integer MAX_POLL_RECORDS = 1;

private static final long serialVersionUID = 1L;
@SuppressWarnings("unused")
private static Logger logger = Logger.getLogger(ConsumerService.class.getName());


Consumer<Long, String> consumer;

/**
Expand Down Expand Up @@ -77,26 +71,23 @@ public class ConsumerService implements Serializable {
void init() {

Properties props = new Properties();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
ConfigService.getEnv(ConfigService.ENV_KAFKA_BROKERS, "kafka:9092"));
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);

props.put(ProducerConfig.CLIENT_ID_CONFIG, ConfigService.getEnv(ConfigService.ENV_KAFKA_CLIENTID, "Imixs-Workflow-1"));


props.put(ProducerConfig.CLIENT_ID_CONFIG,
ConfigService.getEnv(ConfigService.ENV_KAFKA_CLIENTID, "Imixs-Workflow-1"));
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET_EARLIER);

consumer = new KafkaConsumer<>(props);



// here we need to subsribe the topics!
// logger.info("...register topic: " + TOPIC_NAME);
// consumer.subscribe(Collections.singletonList(TOPIC_NAME));
// runConsumer();
logger.info("...register topic: " + TOPIC_NAME);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
runConsumer();
}

void runConsumer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class TestProducer {

@Before
public void setup() {
Properties props = new Properties();
Properties props = new Properties();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094");

Expand All @@ -58,10 +58,10 @@ public void teardown() {
/**
* test the fieldlist of the first line of the file
*/
@Test
@Test
public void testSendMessages() {

ProducerRecord<Long, String> record = new ProducerRecord<Long, String>("1.0.1", "some data test...");
ProducerRecord<Long, String> record = new ProducerRecord<Long, String>("IN-1.0.1", "some data test Hey!...");

try {
RecordMetadata metadata = producer.send(record).get();
Expand Down

0 comments on commit 034e7cb

Please sign in to comment.