Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-107: Make Splitter Function as Flux-based #128

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
import org.springframework.integration.file.splitter.FileSplitter;
import org.springframework.integration.splitter.AbstractMessageSplitter;
Expand All @@ -51,7 +52,7 @@
public class SplitterFunctionConfiguration {

@Bean
public Function<Message<?>, List<Message<?>>> splitterFunction(
public Function<Flux<Message<?>>, Flux<Message<?>>> splitterFunction(
@Qualifier("expressionSplitter") Optional<AbstractMessageSplitter> expressionSplitter,
@Qualifier("fileSplitter") Optional<AbstractMessageSplitter> fileSplitter,
@Qualifier("defaultSplitter") Optional<AbstractMessageSplitter> defaultSplitter,
Expand All @@ -60,13 +61,13 @@ public Function<Message<?>, List<Message<?>>> splitterFunction(
AbstractMessageSplitter messageSplitter = expressionSplitter.or(() -> fileSplitter)
.or(() -> defaultSplitter)
.get();

messageSplitter.setApplySequence(splitterFunctionProperties.isApplySequence());
ThreadLocalFluxSinkMessageChannel outputChannel = new ThreadLocalFluxSinkMessageChannel();
FluxMessageChannel inputChannel = new FluxMessageChannel();
inputChannel.subscribe(messageSplitter);
FluxMessageChannel outputChannel = new FluxMessageChannel();
messageSplitter.setOutputChannel(outputChannel);
return (message) -> {
messageSplitter.handleMessage(message);
return outputChannel.publisherThreadLocal.get();
};
return (messageFlux) -> Flux.from(outputChannel).doOnRequest((l) -> inputChannel.subscribeTo(messageFlux));
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@

package org.springframework.cloud.fn.splitter;

import java.util.List;
import java.time.Duration;
import java.util.function.Function;

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
Expand All @@ -28,19 +30,18 @@
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.annotation.DirtiesContext;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest(properties = "splitter.expression=payload.split(',')")
@DirtiesContext
public class SplitterFunctionApplicationTests {

@Autowired
Function<Message<?>, List<Message<?>>> splitter;
Function<Flux<Message<?>>, Flux<Message<?>>> splitter;

@Test
public void testExpressionSplitter() {
List<Message<?>> messageList = this.splitter.apply(new GenericMessage<>("hello,world"));
assertThat(messageList).extracting((m) -> m.getPayload().toString()).contains("hello", "world");
Flux<Message<?>> messageFlux = this.splitter.apply(Flux.just(new GenericMessage<>("hello,world")));
Flux<String> payloads = messageFlux.map(Message::getPayload).map(Object::toString);
StepVerifier.create(payloads).expectNext("hello", "world").thenCancel().verify(Duration.ofSeconds(30));
}

@SpringBootApplication
Expand Down
5 changes: 1 addition & 4 deletions samples/zip-split-rabbit-binder/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ The second one is for `UnZipTransformer`, which we use for a custom function to
The `splitterFunction` is used in a `FileSplitter` mode to read lines from unzipped entries and emit each of them as an individual message.
Essentially, we are splitting twice: zip entries, and content of each file.

The composition is like this: `fileSupplier|unzipFunction|splitterFunction|flattenFunction`.
(The `flattenFunction` will be explained latter).
The composition is like this: `fileSupplier|unzipFunction|splitterFunction.
The result of this composition is a `Supplier<Flux<Mesage<?>>>` and we bind it into a RabbitMQ `unzipped_data_exchange` using Spring Cloud Stream.

For `fileSupplier` we provide these configuration properties:
Expand Down Expand Up @@ -49,8 +48,6 @@ Which is a trigger for that function to use a `FileSplitter` for zip entries to
The custom `ZipSplitRabbitBinderApplication.unzipFunction()` (might be a candidate for the future Functions Catalog version) uses `Flux` API to unzip polled files via `UnZipTransformer` and then `flatMapIterable()` for zip entries.
Then those entries are fed into a `splitterFunction` for `FileSplitter` mode.

The mention `ZipSplitRabbitBinderApplication.flattenFunction()` is needed for now here since `splitterFucntion` produces a `List<Message>` which cannot be https://docs.spring.io/spring-cloud-stream/reference/spring-cloud-stream/producing-and-consuming-messages.html#batch-producers[de-batched] by Spring Cloud Stream since our final product of the composition is, essentially, `Supplier<Flux<Message<?>>>`.

To run the application from main `ZipSplitRabbitBinderApplication` class (`./gradlew bootRun`), the RabbitMQ broker must be supplied on the target environment.

The test environment for this sample uses `org.springframework.boot:spring-boot-testcontainers` and `org.testcontainers:rabbitmq` to run RabbitMQ in Docker container and wire it properly into Spring Boot auto-configuration.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.example;

import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

Expand Down Expand Up @@ -34,10 +33,4 @@ Function<Flux<Message<File>>, Flux<File>> unzipFunction(UnZipTransformer unZipTr
.flatMapIterable(Map::values);
}

// TODO until 'splitterFunction' is fixed this way: https://github.com/spring-cloud/spring-functions-catalog/issues/107
@Bean
Function<Flux<Message<List<Message<?>>>>, Flux<Message<?>>> flattenFunction() {
return messageFlux -> messageFlux.map(Message::getPayload).flatMapIterable(Function.identity());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ spring:

cloud:
function:
definition: fileSupplier|unzipFunction|splitterFunction|flattenFunction
definition: fileSupplier|unzipFunction|splitterFunction

stream:
bindings:
fileSupplier|unzipFunction|splitterFunction|flattenFunction-out-0:
fileSupplier|unzipFunction|splitterFunction-out-0:
destination: unzipped_data_exchange

file:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.test.annotation.DirtiesContext;

Expand Down Expand Up @@ -47,11 +48,16 @@ void zippedFilesAreSplittedToRabbitBinding() throws InterruptedException {
}
}

@RabbitListener(bindings = @QueueBinding(value = @Queue,
exchange = @Exchange(value = "unzipped_data_exchange", type = ExchangeTypes.TOPIC), key = "#"))
void receiveDataFromSplittedZips(String payload) {
LOG.info("A line from zip entry: " + payload);
DATA_SINK.offer(payload);
@TestConfiguration
static class RabbitListenerTestConfiguration {

@RabbitListener(bindings = @QueueBinding(value = @Queue,
exchange = @Exchange(value = "unzipped_data_exchange", type = ExchangeTypes.TOPIC), key = "#"))
void receiveDataFromSplittedZips(String payload) {
LOG.info("A line from zip entry: " + payload);
DATA_SINK.offer(payload);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package org.springframework.cloud.fn.supplier.jdbc;

import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

Expand All @@ -30,10 +29,10 @@
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.fn.common.config.ComponentCustomizer;
import org.springframework.cloud.fn.splitter.SplitterFunctionConfiguration;
import org.springframework.cloud.function.context.PollableBean;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;

Expand All @@ -57,7 +56,7 @@ public JdbcSupplierConfiguration(JdbcSupplierProperties properties, DataSource d
}

@Bean
public MessageSource<Object> jdbcMessageSource(
public JdbcPollingChannelAdapter jdbcMessageSource(
@Nullable ComponentCustomizer<JdbcPollingChannelAdapter> jdbcPollingChannelAdapterCustomizer) {

JdbcPollingChannelAdapter jdbcPollingChannelAdapter = new JdbcPollingChannelAdapter(this.dataSource,
Expand All @@ -71,21 +70,11 @@ public MessageSource<Object> jdbcMessageSource(
}

@Bean(name = "jdbcSupplier")
@PollableBean
@ConditionalOnProperty(prefix = "jdbc.supplier", name = "split", matchIfMissing = true)
public Supplier<Flux<Message<?>>> splittedSupplier(MessageSource<Object> jdbcMessageSource,
Function<Message<?>, List<Message<?>>> splitterFunction) {
public Supplier<Flux<Message<?>>> splittedSupplier(JdbcPollingChannelAdapter jdbcMessageSource,
Function<Flux<Message<Object>>, Flux<Message<?>>> splitterFunction) {

return () -> {
Message<?> received = jdbcMessageSource.receive();
if (received != null) {
// multiple Message<Map<String, Object>>
return Flux.fromIterable(splitterFunction.apply(received));
}
else {
return Flux.empty();
}
};
return () -> IntegrationReactiveUtils.messageSourceToFlux(jdbcMessageSource).transform(splitterFunction);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2024 the original author or authors.
* Copyright 2020-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,27 +26,27 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.messaging.Message;
import org.springframework.test.annotation.DirtiesContext;

import static org.assertj.core.api.Assertions.assertThat;

/**
* @author Soby Chacko
* @author Artem Bilan
*/
@SpringBootTest(properties = "jdbc.supplier.query=select id, name from test order by id")
@DirtiesContext
public class DefaultJdbcSupplierTests {

@Autowired
Supplier<Flux<Message<?>>> jdbcSupplier;

@Autowired
JdbcTemplate jdbcTemplate;

@Test
@SuppressWarnings("rawtypes")
void testExtraction() {
final Flux<Message<?>> messageFlux = jdbcSupplier.get();
StepVerifier stepVerifier = StepVerifier.create(messageFlux)
StepVerifier.create(messageFlux)
.assertNext((message) -> assertThat(message)
.satisfies((msg) -> assertThat(msg).extracting(Message::getPayload).matches((o) -> {
Map map = (Map) o;
Expand All @@ -63,8 +63,7 @@ void testExtraction() {
return map.get("ID").equals(3L) && map.get("NAME").equals("John");
})))
.thenCancel()
.verifyLater();
stepVerifier.verify();
.verify();
}

@SpringBootApplication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package org.springframework.cloud.fn.supplier.mongo;

import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

Expand All @@ -28,12 +27,12 @@
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.fn.common.config.ComponentCustomizer;
import org.springframework.cloud.fn.splitter.SplitterFunctionConfiguration;
import org.springframework.cloud.function.context.PollableBean;
import org.springframework.context.annotation.Bean;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.mongodb.inbound.MongoDbMessageSource;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;

Expand All @@ -59,21 +58,11 @@ public MongodbSupplierConfiguration(MongodbSupplierProperties properties, MongoT
}

@Bean(name = "mongodbSupplier")
@PollableBean
@ConditionalOnProperty(prefix = "mongodb", name = "split", matchIfMissing = true)
public Supplier<Flux<Message<?>>> splittedSupplier(MongoDbMessageSource mongoDbSource,
Function<Message<?>, List<Message<?>>> splitterFunction) {

return () -> {
Message<?> received = mongoDbSource.receive();
if (received != null) {
// multiple Message<Map<String, Object>>
return Flux.fromIterable(splitterFunction.apply(received));
}
else {
return Flux.empty();
}
};
Function<Flux<Message<Object>>, Flux<Message<?>>> splitterFunction) {

return () -> IntegrationReactiveUtils.messageSourceToFlux(mongoDbSource).transform(splitterFunction);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2024 the original author or authors.
* Copyright 2019-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -76,8 +76,6 @@ void testMongodbSupplier() {
(message) -> assertThat(toMap(message)).contains(entry("greeting", "hola"), entry("name", "bar")))
.thenCancel()
.verify();

assertThat(this.mongodbSupplier.get().collectList().block()).isEmpty();
}

@SuppressWarnings("unchecked")
Expand Down
Loading