Skip to content

Commit

Permalink
Make Elasticsearch consumer as auto-config
Browse files Browse the repository at this point in the history
* Fix all its Checkstyle violations
* Fix Checkstyle violations in the `spring-kafka-publisher`
  • Loading branch information
artembilan committed Jan 9, 2024
1 parent 0e7e010 commit 68e0990
Show file tree
Hide file tree
Showing 9 changed files with 162 additions and 196 deletions.
4 changes: 2 additions & 2 deletions consumer/spring-elasticsearch-consumer/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ A consumer that allows you to index document records into Elasticsearch.

## Beans for injection

You can import `ElasticsearchConsumerConfiguration` in the application and then inject the following bean.
The `ElasticsearchConsumerConfiguration` auto-configuration provides the following bean:

`Consumer<Message<?>> elasticsearchConsumer`

Expand All @@ -23,7 +23,7 @@ All configuration properties are prefixed with `elasticsearch.consumer`.
For more information on the various options available, please see link:src/main/java/org/springframework/cloud/fn/consumer/elasticsearch/ElasticsearchConsumerConfiguration.java[ElasticsearchConsumerProperties].

In addition to these options, the consumer makes use of Spring Boot autoconfiguration for Elasticsearch.
Therefore you need to use https://github.com/spring-projects/spring-boot/blob/master/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/elasticsearch/ElasticsearchRestClientProperties.java[these properties] for configuration Elasticsearch.
Therefore, you need to use https://github.com/spring-projects/spring-boot/blob/master/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/elasticsearch/ElasticsearchRestClientProperties.java[these properties] for configuration Elasticsearch.
See this https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#boot-features-elasticsearch[section] from Spring Boot docs.

## Examples
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,14 +39,15 @@

import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchClientAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aggregator.AbstractAggregatingMessageGroupProcessor;
import org.springframework.integration.aggregator.MessageCountReleaseStrategy;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.config.AggregatorFactoryBean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlowBuilder;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.store.MessageGroupStore;
Expand All @@ -57,14 +58,18 @@
import org.springframework.util.StringUtils;

/**
* Auto-configuration for Elasticsearch.
*
* @author Soby Chacko
* @author Andrea Montemaggio
* @author Corneil du Plessis
*/
@Configuration
@AutoConfiguration(after = ElasticsearchClientAutoConfiguration.class)
@EnableConfigurationProperties(ElasticsearchConsumerProperties.class)
public class ElasticsearchConsumerConfiguration {

private static final Log LOGGER = LogFactory.getLog(ElasticsearchConsumerConfiguration.class);

/**
* Message header for the Index id.
*/
Expand All @@ -75,13 +80,12 @@ public class ElasticsearchConsumerConfiguration {
*/
public static final String INDEX_NAME_HEADER = "INDEX_NAME";

private static final Log logger = LogFactory.getLog(ElasticsearchConsumerConfiguration.class);

@Bean
FactoryBean<MessageHandler> aggregator(MessageGroupStore messageGroupStore,
ElasticsearchConsumerProperties consumerProperties) {

AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
aggregatorFactoryBean.setCorrelationStrategy(message -> "");
aggregatorFactoryBean.setCorrelationStrategy((message) -> "");
aggregatorFactoryBean.setReleaseStrategy(new MessageCountReleaseStrategy(consumerProperties.getBatchSize()));
if (consumerProperties.getGroupTimeout() >= 0) {
aggregatorFactoryBean
Expand Down Expand Up @@ -126,40 +130,43 @@ MessageGroupStore messageGroupStore() {
IntegrationFlow elasticsearchConsumerFlow(@Qualifier("aggregator") MessageHandler aggregator,
ElasticsearchConsumerProperties properties, @Qualifier("indexingHandler") MessageHandler indexingHandler) {

final IntegrationFlowBuilder builder = IntegrationFlow.from(MessageConsumer.class,
gateway -> gateway.beanName("elasticsearchConsumer"));
if (properties.getBatchSize() > 1) {
builder.handle(aggregator);
}
return builder.handle(indexingHandler).get();
return (flow) -> {
if (properties.getBatchSize() > 1) {
flow.handle(aggregator);
}
flow.handle(indexingHandler);
};
}

@Bean
public MessageHandler indexingHandler(ElasticsearchClient elasticsearchClient,
ElasticsearchConsumerProperties consumerProperties) {
return message -> {
if (message.getPayload() instanceof Iterable) {

return (message) -> {
if (message.getPayload() instanceof Iterable<?> iterable) {
BulkRequest.Builder builder = new BulkRequest.Builder();
StreamSupport.stream(((Iterable<?>) message.getPayload()).spliterator(), false)
StreamSupport.stream(iterable.spliterator(), false)
.filter(MessageWrapper.class::isInstance)
.map(itemPayload -> ((MessageWrapper) itemPayload).getMessage())
.map(m -> buildIndexRequest(m, consumerProperties))
.forEach(indexRequest -> builder
.operations(builder1 -> builder1.index(idx -> idx.index(indexRequest.index())
.map((itemPayload) -> ((MessageWrapper) itemPayload).message())
.map((m) -> buildIndexRequest(m, consumerProperties))
.forEach((indexRequest) -> builder
.operations((builder1) -> builder1.index((idx) -> idx.index(indexRequest.index())
.id(indexRequest.id())
.document(indexRequest.document()))));

index(elasticsearchClient, builder.build(), consumerProperties.isAsync());
}
else {
IndexRequest request = buildIndexRequest(message, consumerProperties);
IndexRequest<?> request = buildIndexRequest(message, consumerProperties);
index(elasticsearchClient, request, consumerProperties.isAsync());
}
};
}

private IndexRequest buildIndexRequest(Message<?> message, ElasticsearchConsumerProperties consumerProperties) {
IndexRequest.Builder requestBuilder = new IndexRequest.Builder();
private IndexRequest<Object> buildIndexRequest(Message<?> message,
ElasticsearchConsumerProperties consumerProperties) {

IndexRequest.Builder<Object> requestBuilder = new IndexRequest.Builder<>();

String index = consumerProperties.getIndex();
if (message.getHeaders().containsKey(INDEX_NAME_HEADER)) {
Expand Down Expand Up @@ -214,14 +221,14 @@ private void index(ElasticsearchClient elasticsearchClient, BulkRequest request,
BulkResponse bulkResponse = elasticsearchClient.bulk(request);
handleBulkResponse(bulkResponse);
}
catch (IOException e) {
catch (IOException ex) {
throw new IllegalStateException(
"Error occurred while performing bulk index operation: " + e.getMessage(), e);
"Error occurred while performing bulk index operation: " + ex.getMessage(), ex);
}
}
}

private void index(ElasticsearchClient elasticsearchClient, IndexRequest request, boolean isAsync) {
private void index(ElasticsearchClient elasticsearchClient, IndexRequest<?> request, boolean isAsync) {
if (isAsync) {
ElasticsearchAsyncClient elasticsearchAsyncClient = new ElasticsearchAsyncClient(
elasticsearchClient._transport());
Expand All @@ -240,35 +247,35 @@ private void index(ElasticsearchClient elasticsearchClient, IndexRequest request
IndexResponse response = elasticsearchClient.index(request);
handleResponse(response);
}
catch (IOException e) {
throw new IllegalStateException("Error occurred while indexing document: " + e.getMessage(), e);
catch (IOException ex) {
throw new IllegalStateException("Error occurred while indexing document: " + ex.getMessage(), ex);
}
}
}

private void handleBulkResponse(BulkResponse response) {
if (logger.isDebugEnabled() || response.errors()) {
if (LOGGER.isDebugEnabled() || response.errors()) {
for (BulkResponseItem itemResponse : response.items()) {
if (itemResponse.error() != null) {
if (logger.isDebugEnabled()) {
logger.debug("itemResponse.error=" + itemResponse.error());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("itemResponse.error=" + itemResponse.error());
}
logger.error(String.format("Index operation [id=%s, index=%s] failed: %s", itemResponse.id(),
LOGGER.error(String.format("Index operation [id=%s, index=%s] failed: %s", itemResponse.id(),
itemResponse.index(), itemResponse.error().toString()));
}
else {
var r = itemResponse.get();
if (r != null) {
if (logger.isDebugEnabled()) {
logger.debug("itemResponse:" + r);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("itemResponse:" + r);
}
logger.debug(String.format(
LOGGER.debug(String.format(
"Index operation [id=%s, index=%s] succeeded: document [id=%s, version=%s] was written on shard %s.",
itemResponse.id(), itemResponse.index(), r.source().get("id"),
r.source().get("version"), r.source().get("shardId")));
}
else {
logger.debug(String.format("Index operation [id=%s, index=%s] succeeded", itemResponse.id(),
LOGGER.debug(String.format("Index operation [id=%s, index=%s] succeeded", itemResponse.id(),
itemResponse.index()));
}
}
Expand All @@ -278,34 +285,25 @@ private void handleBulkResponse(BulkResponse response) {
if (response.errors()) {
String error = response.items()
.stream()
.map(bulkResponseItem -> bulkResponseItem.error() != null ? bulkResponseItem.error().toString() : "")
.reduce((errorCause, errorCause2) -> errorCause != null ? errorCause + " : " + errorCause2
: errorCause2)
.map((bulkResponseItem) -> (bulkResponseItem.error() != null) ? bulkResponseItem.error().toString()
: "")
.reduce((errorCause, errorCause2) -> errorCause + " : " + errorCause2)
.orElseGet(response::toString);
throw new IllegalStateException("Bulk indexing operation completed with failures: " + error);
}
}

private void handleResponse(IndexResponse response) {
logger.debug(String.format(
LOGGER.debug(String.format(
"Index operation [index=%s] succeeded: document [id=%s, version=%d] was written on shard %s.",
response.index(), response.id(), response.version(), response.shards().toString()));
}

static class MessageWrapper {

private final Message<?> message;

MessageWrapper(Message<?> message) {
this.message = message;
}

public Message<?> getMessage() {
return message;
}
record MessageWrapper(Message<?> message) {

}

@MessagingGateway(name = "elasticsearchConsumer", defaultRequestChannel = "elasticsearchConsumerFlow.input")
private interface MessageConsumer extends Consumer<Message<?>> {

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,8 @@
import org.springframework.expression.Expression;

/**
* The Elasticsearch consumer configuration properties.
*
* @author Soby Chacko
* @author Andrea Montemaggio
*/
Expand All @@ -28,13 +30,13 @@ public class ElasticsearchConsumerProperties {

/**
* The id of the document to index. If set, the INDEX_ID header value overrides this
* property on a per message basis.
* property on a per-message basis.
*/
Expression id;

/**
* Name of the index. If set, the INDEX_NAME header value overrides this property on a
* per message basis.
* per-message basis.
*/
String index;

Expand All @@ -51,7 +53,7 @@ public class ElasticsearchConsumerProperties {
long timeoutSeconds;

/**
* Indicates whether the indexing operation is async or not. By default indexing is
* Indicates whether the indexing operation is async or not. By default, indexing is
* done synchronously.
*/
boolean async;
Expand All @@ -70,55 +72,55 @@ public class ElasticsearchConsumerProperties {
long groupTimeout = -1L;

public Expression getId() {
return id;
return this.id;
}

public void setId(Expression id) {
this.id = id;
}

public String getIndex() {
return index;
return this.index;
}

public void setIndex(String index) {
this.index = index;
}

public String getRouting() {
return routing;
return this.routing;
}

public void setRouting(String routing) {
this.routing = routing;
}

public long getTimeoutSeconds() {
return timeoutSeconds;
return this.timeoutSeconds;
}

public void setTimeoutSeconds(long timeoutSeconds) {
this.timeoutSeconds = timeoutSeconds;
}

public boolean isAsync() {
return async;
return this.async;
}

public void setAsync(boolean async) {
this.async = async;
}

public int getBatchSize() {
return batchSize;
return this.batchSize;
}

public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}

public long getGroupTimeout() {
return groupTimeout;
return this.groupTimeout;
}

public void setGroupTimeout(long groupTimeout) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* The Elasticsearch consumer auto-configuration.
*/
package org.springframework.cloud.fn.consumer.elasticsearch;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.springframework.cloud.fn.consumer.elasticsearch.ElasticsearchConsumerConfiguration
Loading

0 comments on commit 68e0990

Please sign in to comment.