Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-107: Make Splitter Function as Flux-based #128

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open

Conversation

artembilan
Copy link
Collaborator

Fixes: #107

When we have a composition like this:

spring.cloud.function.definition = fileSupplier|splitterFunction

Then final "function" signature is like this Supplier<Flux<Message<List<Message<?>>>>>. And that is exactly what we don't expected from the splitter in the end of the composition. While Spring Cloud Stream supports de-batching, it works for a List output only if function is bound by itself. In case of composition we got just a Supplier.

  • Rework SplitterFunctionConfiguration for splitterFunction from Function<Message<?>, List<Message<?>>> to Function<Flux<Message<?>>, Flux<Message<?>>> signature to support every possible simple and composed bindings in Spring Cloud Stream
  • Rework SplitterFunctionApplicationTests for new expected Function<Flux<Message<?>>, Flux<Message<?>>> signature
  • Rework zip-split-rabbit-binder sample to not use a flattenFunction workaround and fully rely on whatever is new for the splitterFunction
  • Fix ZipSplitRabbitBinderApplicationTests moving the @RabbitListener into a @TestConfiguration. Apparently in a new Spring Boot version the test class is registered as a bean much later than normal application context startup. Therefore, even if the @RabbitListener parsed and registered properly, the RabbitAdmin bean has been already started to see our extra bean definition for the @QueueBinding

Changing signature for the splitterFunction to reactive types would make it working even with a Supplier composition.

Fixes: #107

When we have a composition like this:

```
spring.cloud.function.definition = fileSupplier|splitterFunction
```

Then final "function" signature is like this `Supplier<Flux<Message<List<Message<?>>>>>`.
And that is exactly what we don't expected from the splitter in the end of the composition.
While Spring Cloud Stream supports de-batching, it works for a `List` output only if function is bound by itself.
In case of composition we got just a `Supplier`.

* Rework `SplitterFunctionConfiguration` for `splitterFunction` from `Function<Message<?>, List<Message<?>>>`
to `Function<Flux<Message<?>>, Flux<Message<?>>>` signature to support every possible simple and composed bindings
in Spring Cloud Stream
* Rework `SplitterFunctionApplicationTests` for new expected `Function<Flux<Message<?>>, Flux<Message<?>>>` signature
* Rework `zip-split-rabbit-binder` sample to not use a `flattenFunction` workaround
and fully rely on whatever is new for the `splitterFunction`
* Fix `ZipSplitRabbitBinderApplicationTests` moving the `@RabbitListener` into a `@TestConfiguration`.
Apparently in a new Spring Boot version the test class is registered as a bean much later than normal application context startup.
Therefore, even if the `@RabbitListener` parsed and registered properly, the `RabbitAdmin` bean
has been already started to see our extra bean definition for the `@QueueBinding`

Changing signature for the splitterFunction to reactive types would make it working even with a Supplier composition.
@artembilan artembilan requested a review from onobc December 26, 2024 20:29
The `IntegrationReactiveUtils.messageSourceToFlux()` provides convenient API to represent a `MessageSource`
as a `Flux` to poll this source.
The API has an error handling logic and delay when no data emitted by the source
@artembilan
Copy link
Collaborator Author

@onobc ,

gentle ping.
Thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Revise splitterFunction signatunre into Function<Flux<Message<?>, Flux<Message<?>>>
1 participant