Revise splitterFunction signature into Function<Flux<Message<?>>, Flux<Message<?>>> #107

Open
artembilan opened this issue Dec 6, 2024 · 0 comments · May be fixed by #128
Labels
type: enhancement A general enhancement
Milestone

Comments

@artembilan
Collaborator

When we have a composition like this:

spring.cloud.function.definition = fileSupplier|splitterFunction

Then the final "function" signature is Supplier<Flux<Message<List<Message<?>>>>>.
And that is exactly what we don't expect from a splitter at the end of the composition.
While Spring Cloud Stream supports de-batching, it works for a List output only if the function is bound by itself.
In the case of composition we get just a Supplier.

Changing the signature of the splitterFunction to reactive types would make it work even with a Supplier composition.
The workaround is an extra function at the end of the composition, like this:

	@Bean
	Function<Flux<Message<List<Message<?>>>>, Flux<Message<?>>> flattenFunction() {
		return messageFlux -> messageFlux.map(Message::getPayload).flatMapIterable(Function.identity());
	}
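
With this workaround in place, the function definition would presumably gain a third stage. The fragment below is an assumption derived from the bean name `flattenFunction` above, not a quote from any sample's actual configuration:

```
spring.cloud.function.definition = fileSupplier|splitterFunction|flattenFunction
```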
@artembilan artembilan added the type: enhancement A general enhancement label Dec 6, 2024
@artembilan artembilan added this to the 5.1.0 milestone Dec 6, 2024
artembilan added a commit that referenced this issue Dec 26, 2024
Fixes: #107

When we have a composition like this:

```
spring.cloud.function.definition = fileSupplier|splitterFunction
```

Then the final "function" signature is `Supplier<Flux<Message<List<Message<?>>>>>`.
And that is exactly what we don't expect from a splitter at the end of the composition.
While Spring Cloud Stream supports de-batching, it works for a `List` output only if the function is bound by itself.
In the case of composition we get just a `Supplier`.

* Rework `SplitterFunctionConfiguration` to change `splitterFunction` from `Function<Message<?>, List<Message<?>>>`
to the `Function<Flux<Message<?>>, Flux<Message<?>>>` signature, so that it supports every possible simple and composed binding
in Spring Cloud Stream
* Rework `SplitterFunctionApplicationTests` for the new expected `Function<Flux<Message<?>>, Flux<Message<?>>>` signature
* Rework the `zip-split-rabbit-binder` sample to not use a `flattenFunction` workaround
and rely fully on the new behavior of the `splitterFunction`
* Fix `ZipSplitRabbitBinderApplicationTests` by moving the `@RabbitListener` into a `@TestConfiguration`.
Apparently, in a new Spring Boot version the test class is registered as a bean much later than normal application context startup.
Therefore, even if the `@RabbitListener` is parsed and registered properly, the `RabbitAdmin` bean
has already been started by then and does not see our extra bean definition for the `@QueueBinding`
Changing the signature of the `splitterFunction` to reactive types makes it work even with a `Supplier` composition.
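
The effect of the signature change can be illustrated with a minimal plain-Java analogy. This is a sketch, not the code from the commit: `java.util.stream.Stream` stands in for `Flux`, plain `String` values stand in for `Message<?>`, and every class and method name here (`SplitterSignatureSketch`, `listSplitter`, `streamSplitter`, and so on) is hypothetical.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SplitterSignatureSketch {

    // Stand-in for fileSupplier: emits a stream of "files" (comma-separated strings).
    static Supplier<Stream<String>> fileSupplier() {
        return () -> Stream.of("a,b", "c");
    }

    // Stand-in for the old signature: one item in, a List of parts out.
    static Function<String, List<String>> listSplitter() {
        return file -> List.of(file.split(","));
    }

    // Stand-in for the new signature: stream in, flattened stream out.
    static Function<Stream<String>, Stream<String>> streamSplitter() {
        return files -> files.flatMap(file -> Stream.of(file.split(",")));
    }

    // Composition with the old splitter: every emitted element is still a whole List.
    static List<List<String>> composedWithListSplitter() {
        return fileSupplier().get()
                .map(listSplitter())
                .collect(Collectors.toList());
    }

    // Composition with the new splitter: the parts are emitted individually.
    static List<String> composedWithStreamSplitter() {
        return streamSplitter()
                .apply(fileSupplier().get())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(composedWithListSplitter());
        System.out.println(composedWithStreamSplitter());
    }
}
```

In the first composition nothing downstream flattens the lists, which mirrors the `Supplier<Flux<Message<List<Message<?>>>>>` situation; in the second, flattening happens inside the splitter itself, so no extra `flattenFunction` step is needed.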
@artembilan artembilan linked a pull request Dec 26, 2024 that will close this issue