diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 00000000..14d5dd27 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,11 @@ +version: 2 +updates: + - package-ecosystem: "maven" + directory: "/" + schedule: + interval: "daily" + allow: + - dependency-name: "com.solace.*" + - dependency-name: "com.solacesystems:*" + - dependency-name: "org.springframework.*" + - dependency-name: "org.apache.logging.log4j:*" diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index 0116a45a..b7bdb0d7 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -1,34 +1,66 @@ # This workflow will build and test a Java project with Maven -name: build +name: Test on: pull_request: push: + workflow_dispatch: jobs: - build: + dupe_check: + name: Check for Duplicate Workflow Run + runs-on: ubuntu-latest + outputs: + should_skip: ${{ steps.skip_check.outputs.should_skip }} + steps: + - id: skip_check + uses: fkirc/skip-duplicate-actions@v3.4.0 + with: + concurrent_skipping: same_content + do_not_skip: '["pull_request", "workflow_dispatch", "schedule"]' + build: + name : Build & Test + needs: + - dupe_check + if: needs.dupe_check.outputs.should_skip != 'true' runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 + - name: Cache local Maven repository + uses: actions/cache@v2 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-test-${{ hashFiles('**/pom.xml') }} + restore-keys: | + ${{ runner.os }}-maven-test- - name: Setup JDK 1.8 - uses: actions/setup-java@v1 + uses: actions/setup-java@v2 with: - java-version: 1.8 - - name: Build and run Unit Tests + distribution: zulu + java-version: 8 + - name: Build and run Tests run: mvn clean verify - - name: Build and run Integration Tests - if: ${{ github.event_name != 'pull_request' }} - run: mvn clean verify -Pit - env: - SOLACE_JAVA_MSG_VPN: ${{ secrets.SOLACE_JAVA_MSG_VPN }} - SOLACE_JAVA_CLIENT_USERNAME: ${{ secrets.SOLACE_JAVA_CLIENT_USERNAME }} - SOLACE_JAVA_CLIENT_PASSWORD: ${{ secrets.SOLACE_JAVA_CLIENT_PASSWORD }} - SOLACE_JAVA_HOST: ${{ secrets.SOLACE_JAVA_HOST }} - TEST_SOLACE_MGMT_HOST: ${{ secrets.TEST_SOLACE_MGMT_HOST }} - TEST_SOLACE_MGMT_USERNAME: ${{ secrets.TEST_SOLACE_MGMT_USERNAME }} - TEST_SOLACE_MGMT_PASSWORD: ${{ secrets.TEST_SOLACE_MGMT_PASSWORD }} - - name: Verify Integration Test Support was not changed - run: ./solace-integration-test-support/scripts/validate_submodule_not_changed.sh + - name: Upload Test Artifacts + if: always() + uses: actions/upload-artifact@v2 + with: + name: Test Results + path: | + **/target/failsafe-reports/*.xml + **/target/surefire-reports/*.xml + - name: Publish Unit Test Results + if: (success() || failure()) && (github.actor != 'dependabot[bot]' || (github.event_name == 'push' && !contains(github.ref, 'dependabot'))) + uses: EnricoMi/publish-unit-test-result-action@v1 + continue-on-error: true + with: + check_name: Unit Test Results + comment_mode: create new + fail_on: nothing + hide_comments: orphaned commits + files: | + **/target/failsafe-reports/*.xml + !**/target/failsafe-reports/failsafe-summary.xml + **/target/surefire-reports/*.xml diff --git a/README.md b/README.md index 646481cf..7fd94343 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ Please consult the [Spring Cloud Compatibility Table](./solace-spring-cloud-bom/ ### Solace Spring Cloud Projects These are the projects contained within this repository: -* [Solace Spring Cloud Stream Starter](./solace-spring-cloud-starters/solace-spring-cloud-stream-starter) +* [Spring Cloud Stream Binder for Solace PubSub+](./solace-spring-cloud-starters/solace-spring-cloud-stream-starter) * [Solace Spring Cloud Connector](./solace-spring-cloud-connector) ## Building Locally @@ -71,6 +71,32 @@ If it shouldn't be inherited by the BOM, it doesn't go here. - solace-spring-cloud-parent This POM defines common properties and dependencies for the Solace Spring Cloud projects. +### Running the Tests + +Run the following command to run all the unit and integration tests: + +```shell +mvn clean verify +``` + +#### Run Tests With An External Broker + +By default, the tests requires for Docker to be installed on the host machine so that they can auto-provision a PubSub+ broker. Otherwise, the following environment variables can be set to direct the tests to use an external broker: + +``` +SOLACE_JAVA_HOST=tcp://localhost:55555 +SOLACE_JAVA_CLIENT_USERNAME=default +SOLACE_JAVA_CLIENT_PASSWORD=default +SOLACE_JAVA_MSG_VPN=default +TEST_SOLACE_MGMT_HOST=http://localhost:8080 +TEST_SOLACE_MGMT_USERNAME=admin +TEST_SOLACE_MGMT_PASSWORD=admin +``` + +#### Parallel Test Execution + +Parallel test execution is enabled by default. Add the `-Djunit.jupiter.execution.parallel.enabled=false` option to your command to disable parallel test execution. + ## Release Process 1. Update `solace-spring-boot-bom` to latest released version @@ -108,7 +134,7 @@ For more information about Spring Boot Auto-Configuration and Starters try these - [GitHub Tutorial - Master Spring Boot Auto-Configuration](//github.com/snicoll-demos/spring-boot-master-auto-configuration) For more information about Cloud Foundry and the Solace PubSub+ service these resources: -- [Solace PubSub+ for Pivotal Cloud Foundry](http://docs.pivotal.io/solace-messaging/) +- [Solace PubSub+ for VMware Tanzu](http://docs.pivotal.io/solace-messaging/) - [Cloud Foundry Documentation](http://docs.cloudfoundry.org/) - For an introduction to Cloud Foundry: https://www.cloudfoundry.org/ diff --git a/pom.xml b/pom.xml index a109fe45..d6708fb1 100644 --- a/pom.xml +++ b/pom.xml @@ -5,12 +5,12 @@ com.solace.spring.boot solace-spring-boot-bom - 1.1.1 + 1.2.1 com.solace.spring.cloud solace-spring-cloud-build - 2.2.2-SNAPSHOT + 2.3.0-SNAPSHOT pom Solace Spring Cloud Build @@ -21,7 +21,7 @@ SolaceProducts - 2020.0.1 + 2021.0.1 2.2.13.RELEASE @@ -29,16 +29,16 @@ - 1.1.1 + 1.2.1 - 2.4.3 + 2.6.4 4.3.6-SNAPSHOT - 3.2.2-SNAPSHOT + 3.3.0-SNAPSHOT - 0.2.0 + 0.9.0 false true @@ -113,7 +113,7 @@ ossrh - https://oss.sonatype.org/content/repositories/snapshots + https://s01.oss.sonatype.org/content/repositories/snapshots @@ -133,7 +133,7 @@ true ossrh - https://oss.sonatype.org/ + https://s01.oss.sonatype.org/ true diff --git a/solace-integration-test-support b/solace-integration-test-support index f17ca7f3..54fde40f 160000 --- a/solace-integration-test-support +++ b/solace-integration-test-support @@ -1 +1 @@ -Subproject commit f17ca7f379f97167402491b3d1f33bccebf0a05a +Subproject commit 54fde40fda262bda918219a07d94baf4c6e94662 diff --git a/solace-spring-cloud-bom/README.md b/solace-spring-cloud-bom/README.md index 7254751a..2eb05743 100644 --- a/solace-spring-cloud-bom/README.md +++ b/solace-spring-cloud-bom/README.md @@ -21,6 +21,7 @@ Consult the table below to determine which version of the BOM you need to use: |Hoxton.SR1 |1.0.0 | 2.2.x | |Hoxton.SR6 |1.1.0 | 2.3.x | |2020.0.1 |2.0.0, 2.1.0, 2.2.0, 2.2.1 | 2.4.x | +|2021.0.1 |2.3.0 | 2.6.x | ## Including the BOM @@ -33,7 +34,7 @@ In addition to showing how to include the BOM, the following snippets also shows com.solace.spring.cloud solace-spring-cloud-bom - 2.2.1 + 2.3.0 pom import @@ -61,7 +62,7 @@ apply plugin: 'io.spring.dependency-management' dependencyManagement { imports { - mavenBom "com.solace.spring.cloud:solace-spring-cloud-bom:2.2.1" + mavenBom "com.solace.spring.cloud:solace-spring-cloud-bom:2.3.0" } } @@ -73,7 +74,7 @@ dependencies { ### Using it with Gradle 5 ```groovy dependencies { - implementation(platform("com.solace.spring.cloud:solace-spring-cloud-bom:2.2.1")) + implementation(platform("com.solace.spring.cloud:solace-spring-cloud-bom:2.3.0")) implementation("com.solace.spring.cloud:spring-cloud-starter-stream-solace") } ``` diff --git a/solace-spring-cloud-bom/pom.xml b/solace-spring-cloud-bom/pom.xml index 7e3b9379..1e873b0b 100644 --- a/solace-spring-cloud-bom/pom.xml +++ b/solace-spring-cloud-bom/pom.xml @@ -5,13 +5,13 @@ com.solace.spring.cloud solace-spring-cloud-build - 2.2.2-SNAPSHOT + 2.3.0-SNAPSHOT ../pom.xml solace-spring-cloud-bom pom - 2.2.2-SNAPSHOT + 2.3.0-SNAPSHOT Solace Spring Cloud BOM BOM for Solace Spring Cloud diff --git a/solace-spring-cloud-connector/README.md b/solace-spring-cloud-connector/README.md index aee71722..531b1790 100644 --- a/solace-spring-cloud-connector/README.md +++ b/solace-spring-cloud-connector/README.md @@ -52,9 +52,9 @@ session.connect(); ## Spring Applications -The Spring Cloud Auto-Configure Java, JMS and JNDI tutorials in the [Solace PubSub+ with Pivotal Cloud Foundry Getting Started Samples](https://dev.solace.com/samples/solace-samples-cloudfoundry-java/) provide easy integration into Spring applications. +The Spring Cloud Auto-Configure Java, JMS and JNDI tutorials in the [Solace PubSub+ with VMware Tanzu Getting Started Samples](https://tutorials.solace.dev/tanzu) provide easy integration into Spring applications. -Above example for the Solace PubSub+ API for Java (JCSMP) would be further simplified as follows: here Spring creates a SpringJCSMPFactory with all the properties set and all that is required is to autowire this into your application. Check out the [tutorial](https://dev.solace.com/samples/solace-samples-cloudfoundry-java/spring-cloud-autoconf-java/) for further details. +Above example for the Solace PubSub+ API for Java (JCSMP) would be further simplified as follows: here Spring creates a SpringJCSMPFactory with all the properties set and all that is required is to autowire this into your application. Check out the [tutorial](https://tutorials.solace.dev/tanzu/spring-cloud-autoconf-java/) for further details. ```java @Autowired @@ -76,7 +76,7 @@ Include version 4.0.0 or later to use Spring Boot release 2.x ``` // Solace Cloud -compile("com.solace.cloud.cloudfoundry:solace-spring-cloud-connector:4.3.5") +compile("com.solace.cloud.cloudfoundry:solace-spring-cloud-connector:4.3.6") ``` ### Using it with Maven @@ -86,14 +86,14 @@ compile("com.solace.cloud.cloudfoundry:solace-spring-cloud-connector:4.3.5") com.solace.cloud.cloudfoundry solace-spring-cloud-connector - 4.3.5 + 4.3.6 ``` ## Resources For more information about Cloud Foundry and the Solace PubSub+ service these resources: -- [Solace PubSub+ for Pivotal Cloud Foundry](http://docs.pivotal.io/solace-messaging/) +- [Solace PubSub+ for VMware Tanzu](http://docs.pivotal.io/solace-messaging/) - [Cloud Foundry Documentation](http://docs.cloudfoundry.org/) - For an introduction to Cloud Foundry: https://www.cloudfoundry.org/ diff --git a/solace-spring-cloud-connector/pom.xml b/solace-spring-cloud-connector/pom.xml index a1706e0d..fb41c0a1 100644 --- a/solace-spring-cloud-connector/pom.xml +++ b/solace-spring-cloud-connector/pom.xml @@ -21,7 +21,7 @@ com.solace.spring.cloud solace-spring-cloud-parent - 2.2.2-SNAPSHOT + 2.3.0-SNAPSHOT ../solace-spring-cloud-parent/pom.xml @@ -38,7 +38,7 @@ com.solace.cloud.core solace-services-info - 0.4.3 + 0.4.4 diff --git a/solace-spring-cloud-parent/pom.xml b/solace-spring-cloud-parent/pom.xml index 606905e0..a617203c 100644 --- a/solace-spring-cloud-parent/pom.xml +++ b/solace-spring-cloud-parent/pom.xml @@ -5,7 +5,7 @@ com.solace.spring.cloud solace-spring-cloud-build - 2.2.2-SNAPSHOT + 2.3.0-SNAPSHOT ../pom.xml @@ -23,8 +23,8 @@ 1.8 1.8 - 10.12.0 - 10.12.0 + 10.13.1 + 10.13.1 @@ -36,7 +36,7 @@ org.apache.logging.log4j log4j-bom - 2.16.0 + 2.17.2 pom import @@ -90,6 +90,18 @@ + + maven-surefire-plugin + 2.22.2 + + + + junit.jupiter.execution.parallel.enabled=true + + + + + org.apache.maven.plugins maven-failsafe-plugin @@ -101,6 +113,13 @@ integration-test verify + + + + junit.jupiter.execution.parallel.enabled=true + + + diff --git a/solace-spring-cloud-starters/solace-spring-cloud-stream-starter/README.adoc b/solace-spring-cloud-starters/solace-spring-cloud-stream-starter/README.adoc index 399f8ab1..19fe508c 100644 --- a/solace-spring-cloud-starters/solace-spring-cloud-stream-starter/README.adoc +++ b/solace-spring-cloud-starters/solace-spring-cloud-stream-starter/README.adoc @@ -1,8 +1,9 @@ = Spring Cloud Stream Binder for Solace PubSub+ -:revnumber: 3.2.1 +:revnumber: 3.3.0 :toc: preamble +:toclevels: 3 :icons: font -:scst-version: 3.1.1 +:scst-version: 3.2.2 // Github-Specific Settings ifdef::env-github[] @@ -19,21 +20,26 @@ An implementation of Spring's Cloud Stream Binder for integrating with Solace Pu The Solace implementation of the Spring Cloud Stream Binder maps the following concepts from Spring to Solace: -* Destinations to topic subscriptions (Source apps always send messages to a topic) +* Destinations to topics/subscriptions +** Producer bindings always sends messages to topics * Consumer groups to durable queues +** A consumer group's queue is subscribed to its destination subscription (default) +** Consumer bindings always receives messages from queues * Anonymous consumer groups to temporary queues (When no group is specified; used for SCS Publish-Subscribe Model) -And internally, each consumer group queue is subscribed to at least their destination topic. So a typical message flow would then appear as follows: +In Solace, the above setup is called topic-to-queue mapping. So a typical message flow would then appear as follows: . Producer bindings publish messages to their destination topics -. Consumer group queues receive the messages published to their destination topic -. Consumers of a particular consumer group consume messages from their group in a round-robin fashion (by default) +. Each consumer groups' queue receives the messages published to their destination topic +. The PubSub+ broker distributes messages in a round-robin fashion to each consumer binding for a particular consumer group ++ +NOTE: Round-robin distribution only occurs if the consumer group's queue is configured for non-exclusive access. If the queue has exclusive access, then only one consumer will receive messages. -Note that partitioning is not yet supported by this version of the binder. +NOTE: Partitioning is not yet supported by this version of the binder. -Note that since the Binder always consumes from queues it is currently required that Assured Delivery be enabled on the Solace PubSub+ Message VPN being used (Assured Delivery is automatically enabled if using Solace Cloud.) +IMPORTANT: Since consumer bindings always consumes from queues it is required that Assured Delivery is enabled on the Solace PubSub+ Message VPN being used (Assured Delivery is automatically enabled if using Solace Cloud). Additionally, the client username's client profile must be allowed to send and receive guaranteed messages. -Also, it will be assumed that you have a basic understanding of the Spring Cloud Stream project. If not, then please refer to https://docs.spring.io/spring-cloud-stream/docs/{scst-version}/reference/html/[Spring's documentation]. For the sake of brevity, this document will solely focus on discussing components unique to Solace. +For the sake of brevity, it will be assumed that you have a basic understanding of the Spring Cloud Stream project. If not, then please refer to https://docs.spring.io/spring-cloud-stream/docs/{scst-version}/reference/html/[Spring's documentation]. This document will solely focus on discussing components unique to Solace. == Spring Cloud Stream Binder @@ -120,7 +126,7 @@ spring: solace-broker: type: solace environment: - solace: + solace: # <1> java: host: tcp://localhost:55555 msgVpn: default @@ -128,9 +134,12 @@ spring: clientPassword: default connectRetries: -1 reconnectRetries: -1 +# apiProperties: +# ssl_trust_store: +# ssl_trust_store_password: +# ssl_validate_certificate: true ---- - -Notice that the latter half of this configuration actually originates from the https://github.com/SolaceProducts/solace-spring-boot/tree/master/solace-spring-boot-starters/solace-java-spring-boot-starter#updating-your-application-properties[JCSMP Spring Boot Auto-Configuration project]. +<1> The latter half of this configuration where the Solace session is configured actually originates from the https://github.com/SolaceProducts/solace-spring-boot/tree/master/solace-spring-boot-starters/solace-java-spring-boot-starter#updating-your-application-properties[JCSMP Spring Boot Auto-Configuration project]. See <> for more info. == Configuration Options @@ -138,11 +147,22 @@ Notice that the latter half of this configuration actually originates from the h Configuration of the Solace Spring Cloud Stream Binder is done through https://docs.spring.io/spring-boot/docs/current/reference/html/boot-features-external-config.html[Spring Boot's externalized configuration]. This is where users can control the binder's configuration options as well as the Solace Java API properties. -=== Inherited Configuration Options +For general binder configuration options and properties, refer to the https://docs.spring.io/spring-cloud-stream/docs/{scst-version}/reference/html/spring-cloud-stream.html#_configuration_options[Spring Cloud Stream Reference Documentation]. -As for auto-configuration-related options required for auto-connecting to Solace message brokers, refer to the https://github.com/SolaceProducts/solace-spring-boot/tree/master/solace-spring-boot-starters/solace-java-spring-boot-starter#configure-the-application-to-use-your-solace-pubsub-service-credentials[JCSMP Spring Boot Auto-Configuration documentation]. +==== Solace Session Properties -For general binder configuration options and properties, refer to the https://docs.spring.io/spring-cloud-stream/docs/{scst-version}/reference/html/spring-cloud-stream.html#_configuration_options[Spring Cloud Stream Reference Documentation]. +The binder's Solace session is configurable using properties prefixed by `solace.java` or `spring.cloud.stream.binders..environment.solace.java`. + +IMPORTANT: This binder leverages the JCSMP Spring Boot Auto-Configuration project to configure its session. See the https://github.com/SolaceProducts/solace-spring-boot/tree/master/solace-spring-boot-starters/solace-java-spring-boot-starter#configure-the-application-to-use-your-solace-pubsub-service-credentials[JCSMP Spring Boot Auto-Configuration documentation] for more info on how to configure these properties. + +See <> for a simple example of how to configure a session for this binder. + +[TIP] +==== +Additional session properties not available under the usual `solace.java` prefix can be set using `solace.java.apiProperties.`, where `` is the name of a https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/com/solacesystems/jcsmp/JCSMPProperties.html[JCSMPProperties constant] (e.g. `ssl_trust_store`). + +See https://github.com/SolaceProducts/solace-spring-boot/tree/master/solace-spring-boot-starters/solace-java-spring-boot-starter#updating-your-application-properties[JCSMP Spring Boot Auto-Configuration documentation] for more info about `solace.java.apiProperties`. +==== ==== Solace Consumer Properties @@ -156,28 +176,51 @@ Whether to provision durable queues for non-anonymous consumer groups. This shou Default: `true` + See: <> +addDestinationAsSubscriptionToQueue:: +Whether to add the Destination as a subscription to queue during provisioning. ++ +Default: `true` + provisionSubscriptionsToDurableQueue:: Whether to add topic subscriptions to durable queues for non-anonymous consumer groups. This should only be set to `false` if you have externally pre-added the required topic subscriptions (the destination topic should be added at minimum) on the consumer group's queue on the message broker. This property also applies to topics added by the `queueAdditionalSubscriptions` property. + Default: `true` ++ +WARNING: **Deprecated:** Since version 3.3.0, this property is deprecated in favor of `addDestinationAsSubscriptionToQueue`. + +queueNameExpression:: +A SpEL expression for creating the consumer group’s queue name. ++ +Default: `"(properties.solace.queueNamePrefix?.trim()?.length() > 0 ? properties.solace.queueNamePrefix.trim() + '/' : '') + (properties.solace.useFamiliarityInQueueName ? (isAnonymous ? 'an' : 'wk') + '/' : '') + (isAnonymous ? group?.trim() + '/' : (properties.solace.useGroupNameInQueueName ? group?.trim() + '/' : '')) + (properties.solace.useDestinationEncodingInQueueName ? 'plain' + '/' : '') + destination.trim().replaceAll('[*>]', '_')"` + +See: <> ++ +WARNING: Modifying this can cause naming conflicts between the queue names of consumer groups. ++ +WARNING: While the default SpEL expression will consistently return a value adhering to <>, directly using the SpEL expression string is not supported. The default value for this config option is subject to change without notice. queueNamePrefix:: Naming prefix for all queues. + Default: `"scst"` + See: <> ++ +WARNING: **Deprecated:** Since version 3.3.0, this property is deprecated in favor of `queueNameExpression` and `errorQueueNameExpression`. Prefixes can be specified directly in these SpEL expressions. useFamiliarityInQueueName:: When set to `true`, the familiarity modifier, `wk`/`an`, is included in the generated queue name. + Default: `true` + See: <> ++ +WARNING: **Deprecated:** Since version 3.3.0, this property is deprecated in favor of `queueNameExpression` and `errorQueueNameExpression`. The familiarity modifier can be removed from queue names by removing it directly from these SpEL expressions. useDestinationEncodingInQueueName:: When set to `true`, the destination encoding (`plain`), is included in the generated queue name. + Default: `true` + See: <> ++ +WARNING: **Deprecated:** Since version 3.3.0, this property is deprecated in favor of `queueNameExpression` and `errorQueueNameExpression`. The destination encoding can be removed from queue names by removing it directly from these SpEL expressions. useGroupNameInQueueName:: Whether to include the `group` name in the queue name for non-anonymous consumer groups. @@ -186,6 +229,8 @@ Default: `true` + See: <> + IMPORTANT: If set to `false`, all consumers of the same `destination` which also have this set to `false` will consume from the same queue regardless of their configured `group` names. ++ +WARNING: **Deprecated:** Since version 3.3.0, this property is deprecated in favor of `queueNameExpression`. The group name can be removed from the consumer group's queue name by removing it directly from this SpEL expression. queueAccessType:: Access type for the consumer group queue. @@ -224,13 +269,14 @@ Default: `null` queueAdditionalSubscriptions:: An array of additional topic subscriptions to be applied on the consumer group queue. + -These subscriptions may also contain wildcards. + -The `prefix` property is not applied on these subscriptions. +These subscriptions may also contain wildcards. + -Default: `String[0]` +Default: `String[0]` + +See: <> for more info on how this binder uses topic-to-queue mapping to implement Spring Cloud Streams consumer groups. polledConsumerWaitTimeInMillis:: -Maximum wait time for polled consumers to receive a message from their consumer group queue. +Maximum wait time for polled consumers to receive a message from their consumer group queue. + +Only applicable when `batchMode` is `false`. + Default: `100` @@ -239,6 +285,18 @@ The maximum time to wait for all unacknowledged messages to be acknowledged befo + Default: `10000` +batchMaxSize:: +The maximum number of messages per batch. + +Only applicable when `batchMode` is `true`. ++ +Default: `255` + +batchTimeout:: +The maximum wait time in milliseconds to receive a batch of messages. If this timeout is reached, then the messages that have already been received will be used to create the batch. A value of `0` means wait forever. + +Only applicable when `batchMode` is `true`. ++ +Default: `5000` + autoBindErrorQueue:: Whether to automatically create a durable error queue to which messages will be republished when message processing failures are encountered. Only applies once all internal retries have been exhausted. + @@ -252,11 +310,23 @@ Whether to provision durable queues for error queues when `autoBindErrorQueue` i Default: `true` + See: <> +errorQueueNameExpression:: +A SpEL expression for creating the error queue’s name. ++ +Default: `"(properties.solace.queueNamePrefix?.trim()?.length() > 0 ? properties.solace.queueNamePrefix.trim() + '/' : '') + 'error/' + (properties.solace.useFamiliarityInQueueName ? (isAnonymous ? 'an' : 'wk') + '/' : '') + (isAnonymous ? group?.trim() + '/' : (properties.solace.useGroupNameInErrorQueueName ? group?.trim() + '/' : '')) + (properties.solace.useDestinationEncodingInQueueName ? 'plain' + '/' : '') + destination.trim().replaceAll('[*>]', '_')"` + +See: <> ++ +WARNING: Modifying this can cause naming conflicts between the error queue names. ++ +WARNING: While the default SpEL expression will consistently return a value adhering to <>, directly using the SpEL expression string is not supported. The default value for this config option is subject to change without notice. + errorQueueNameOverride:: A custom error queue name. + Default: `null` + See: <> ++ +WARNING: **Deprecated:** Since version 3.3.0, this property is deprecated in favor of `errorQueueNameExpression`. useGroupNameInErrorQueueName:: Whether to include the `group` name in the error queue name for non-anonymous consumer groups. @@ -265,6 +335,8 @@ Default: `true` + See: <> + IMPORTANT: If set to `false`, all consumers of the same `destination` which also have this set to `false` will republish failed messages to the same error queue regardless of their configured `group` names. ++ +WARNING: **Deprecated:** Since version 3.3.0, this property is deprecated in favor of `errorQueueNameExpression`. The group name can be removed from the error queue name by removing it directly from this SpEL expression. errorQueueMaxDeliveryAttempts:: Maximum number of attempts to send a failed message to the error queue. When all delivery attempts have been exhausted, the failed message will be requeued. @@ -340,28 +412,63 @@ Whether to provision durable queues for non-anonymous consumer groups. This shou Default: `true` + See: <> +addDestinationAsSubscriptionToQueue:: +Whether to add the Destination as a subscription to queue during provisioning. ++ +Default: `true` + provisionSubscriptionsToDurableQueue:: Whether to add topic subscriptions to durable queues for non-anonymous consumer groups. This should only be set to `false` if you have externally pre-added the required topic subscriptions (the destination topic should be added at minimum) on the consumer group's queue on the message broker. This property also applies to topics added by the `queueAdditionalSubscriptions` property. + Default: `true` ++ +WARNING: **Deprecated:** Since version 3.3.0, this property is deprecated in favor of `addDestinationAsSubscriptionToQueue`. + +queueNameExpression:: +A SpEL expression for creating the consumer group’s queue name. ++ +Default: `"(properties.solace.queueNamePrefix?.trim()?.length() > 0 ? properties.solace.queueNamePrefix.trim() + '/' : '') + (properties.solace.useFamiliarityInQueueName ? (isAnonymous ? 'an' : 'wk') + '/' : '') + group?.trim() + '/' + (properties.solace.useDestinationEncodingInQueueName ? 'plain' + '/' : '') + destination.trim().replaceAll('[*>]', '_')"` + +See: <> ++ +WARNING: Modifying this can cause naming conflicts between the queue names of consumer groups. ++ +WARNING: While the default SpEL expression will consistently return a value adhering to <>, directly using the SpEL expression string is not supported. The default value for this config option is subject to change without notice. + +queueNameExpressionsForRequiredGroups:: +A mapping of required consumer groups to queue name SpEL expressions. ++ +By default, queueNameExpression will be used to generate a required group’s queue name if it isn’t specified within this configuration option. ++ +Default: `Empty Map` + +See: <> ++ +WARNING: Modifying this can cause naming conflicts between the queue names of consumer groups. ++ +WARNING: While the default SpEL expression will consistently return a value adhering to <>, directly using the SpEL expression string is not supported. The default value for this config option is subject to change without notice. queueNamePrefix:: Naming prefix for all queues. + Default: `"scst"` + See: <> ++ +WARNING: **Deprecated:** Since version 3.3.0, this property is deprecated in favor of `queueNameExpression` and `queueNameExpressionsForRequiredGroups`. Prefixes can be specified directly in these SpEL expressions. useFamiliarityInQueueName:: When set to `true`, the familiarity modifier, `wk`/`an`, is included in the generated queue name. + Default: `true` + See: <> ++ +WARNING: **Deprecated:** Since version 3.3.0, this property is deprecated in favor of `queueNameExpression` and `queueNameExpressionsForRequiredGroups`. The familiarity modifier can be removed from queue names by removing it directly from these SpEL expressions. useDestinationEncodingInQueueName:: When set to `true`, the destination encoding (`plain`), is included in the generated queue name. + Default: `true` + See: <> ++ +WARNING: **Deprecated:** Since version 3.3.0, this property is deprecated in favor of `queueNameExpression` and `queueNameExpressionsForRequiredGroups`. The destination encoding can be removed from queue names by removing it directly from these SpEL expressions. queueAccessType:: Access type for the required consumer group queue. @@ -400,10 +507,10 @@ Default: `null` queueAdditionalSubscriptions:: A mapping of required consumer groups to arrays of additional topic subscriptions to be applied on each consumer group's queue. + -These subscriptions may also contain wildcards. + -The `prefix` property is not applied on these subscriptions. +These subscriptions may also contain wildcards. + -Default: Empty `Map<String,String[]>` +Default: Empty `Map<String,String[]>` + +See: <> for more info on how this binder uses topic-to-queue mapping to implement Spring Cloud Streams consumer groups. === Solace Message Headers @@ -455,6 +562,13 @@ This is the `JMSType` header field if publishing/consuming to/from JMS. | Read/Write | The correlation ID. +| solace_deliveryCount +| Integer +| Read +| The number of times the message has been delivered. + +Note that, while the Delivery Count feature is in controlled availability, `Enable Client Delivery Count` must be enabled on the queue and consumer bindings may need to be restarted after `Enable Client Delivery Count` is turned on. + | solace_destination | Destination | Read @@ -480,6 +594,11 @@ This is the `JMSType` header field if publishing/consuming to/from JMS. | Read/Write | The HTTP content encoding header value from interaction with an HTTP client. +| solace_isReply +| Boolean +| Read/Write +| Indicates whether this message is a reply. + | solace_priority | Integer | Read/Write @@ -550,6 +669,14 @@ TIP: Use link:../../solace-spring-cloud-stream-binder/solace-spring-cloud-stream | Default Value | Description +| solace_scst_batchedHeaders +| List> +| Read +| +| Only applicable when `batchMode` is `true`. + +The consolidated list of message headers for a batch of messages where the headers for each payload element is in this list’s corresponding index. + | solace_scst_confirmCorrelation | CorrelationData | Write @@ -562,6 +689,12 @@ TIP: Use link:../../solace-spring-cloud-stream-binder/solace-spring-cloud-stream | 1 | A static number set by the publisher to indicate the Spring Cloud Stream Solace message version. +| solace_scst_nullPayload +| Boolean +| Read +| +| Present and true to indicate when the PubSub+ message payload was null. + | solace_scst_serializedPayload | Boolean | Internal Binder Use Only @@ -581,6 +714,64 @@ TIP: Use link:../../solace-spring-cloud-stream-binder/solace-spring-cloud-stream | The encoding algorithm used to encode the headers indicated by `solace_scst_serializedHeaders`. |=== +== Native Payload Types + +Below are the payload types natively supported by this binder (before/after https://docs.spring.io/spring-cloud-stream/docs/{scst-version}/reference/html/spring-cloud-stream.html#content-type-management[Content Type Negotiation]): + +[cols="1m,1,3", options="header"] +|=== +| Payload Type | PubSub+ Message Type | Notes + +| byte[] +| Binary Message +| Basic PubSub+ payload type. + +| String +| Text Message +| Basic PubSub+ payload type. + +| SDTStream +| Stream Message +| Basic PubSub+ payload type. + +| SDTMap +| Map Message +| Basic PubSub+ payload type. + +| String +| XML-Content Message +| Basic PubSub+ payload type. + +Only available for consumption. + +| Serializable +| Bytes Message +| This is not a basic payload type supported by the PubSub+ broker, but is one defined and coordinated by this binder. + +**Publishing:** + +When a `Serializable` payload which doesn't satisfy any of the basic PubSub+ payload types is given to the binder to publish, the binder will serialize this payload to a `byte[]` and set the user property, `solace_scst_serializedPayload`, to `true`. + +**Consuming:** + +When the binder consumes a binary message which has the `solace_scst_serializedPayload` user property set to `true`, the binder will deserialize the binary attachment. +|=== + +[TIP] +==== +Typically, the Spring Cloud Stream framework will convert a published payload into a `byte[]` before giving it to the binder. In which case, this binder will publish a binary message. + +If this occurs, but you wish to publish other message types, then one option is to set `useNativeEncoding=true` on your producer (https://docs.spring.io/spring-cloud-stream/docs/{scst-version}/reference/html/spring-cloud-stream.html#_producer_properties[but read the caveats carefully before enabling this feature]), and have your message handler return a payload of one of this binder's supported native payload types; e.g. return `Message` to publish a stream message. + +See https://docs.spring.io/spring-cloud-stream/docs/{scst-version}/reference/html/spring-cloud-stream.html#content-type-management[Content Type Negotiation] for more info on how Spring Cloud Streams converts payloads and other options to control message conversion. +==== + +=== Empty Payload VS Null Payload + +Spring messages can't contain null payloads, however, message handlers can differentiate between null payloads and empty payloads by looking at the `solace_scst_nullPayload` header. The binder adds the `solace_scst_nullPayload` header when a Solace message with null payload is consumed from the wire. When that is the case, the binder sets the Spring message's payload to a null equivalent payload. Null equivalent payloads are one of the following: empty `byte[]`, empty `String`, empty `SDTMap`, or empty `SDTStream`. + +NOTE: Applications can't differentiate between null payloads and empty payloads when consuming binary messages or XML-content messages from the wire. This is because Solace always converts empty payloads to null payloads when those message types are published. + == Generated Queue Name Syntax By default, generated consumer group queue names have the following form: @@ -588,22 +779,43 @@ By default, generated consumer group queue names have the following form: ---- //// ---- - prefix:: -A static prefix as indicated by the `queueNamePrefix` configuration option. +A static prefix as indicated by the `queueNamePrefix` (deprecated) configuration option. familiarity-modifier:: -Indicates the durability of the consumer group (`wk` for well-known or `an` for anonymous). Can be enabled/disabled with the `useFamiliarityInQueueName` config option. +Indicates the durability of the consumer group (`wk` for well-known or `an` for anonymous). Can be enabled/disabled with the `useFamiliarityInQueueName` (deprecated) config option. group:: -The consumer `group` name. Can be enabled/disabled for consumers with the `useGroupNameInQueueName` consumer config option. +The consumer `group` name. Can be enabled/disabled for consumers with the `useGroupNameInQueueName` (deprecated) consumer config option. destination-encoding:: -Indicates the encoding scheme used to encode the destination in the queue name (currently only `plain` is supported). Can be enabled/disabled with the `useDestinationEncodingInQueueName` config option. +Indicates the encoding scheme used to encode the destination in the queue name (currently only `plain` is supported). Can be enabled/disabled with the `useDestinationEncodingInQueueName` (deprecated) config option. encoded-destination:: The encoded `destination` as per ``. +The `queueNameExpression` property's default SpEL expression conforms to the above format, however, users can provide any valid SpEL expression in order to generate custom queue names. Valid expressions evaluate against the following context: +[cols="1m,1", options="header"] +|=== +| Context Variable +| Description + +| destination +| The binding’s destination name. + +| group +| The binding’s consumer group name. + +| isAnonymous +| Indicates whether the consumer is an anonymous consumer group + +| properties.solace +| The configured Solace binding properties. + +| properties.spring +| The configured Spring binding properties. +|=== + === Generated Error Queue Name Syntax By default, generated error queue names have the following form: @@ -615,9 +827,9 @@ By default, generated error queue names have the following form: The definitions of each segment of the error queue matches that from <>, with the following exceptions: group:: -The consumer `group` name. Can be enabled/disabled with the `useGroupNameInErrorQueueName` consumer config option. +The consumer `group` name. Can be enabled/disabled with the `useGroupNameInErrorQueueName` (deprecated) consumer config option. -The error queue name can also be manually overridden with the `errorQueueNameOverride` consumer config option. +The `errorQueueNameExpression` property's default SpEL expression conforms to the above format. Users can provide any valid SpEL expression in order to generate custom error queue names using the same evaluation context as described in <>. == Consumer Concurrency @@ -630,6 +842,48 @@ Though note that there are few limitations: . `concurrency` > 1 is ignored for polled consumers. . Setting `provisionDurableQueue` to `false` disables endpoint configuration validation. Meaning that point 1 cannot be validated. In this scenario, it is the developer's responsibility to ensure that point 1 is followed. +== Batch Consumers + +https://docs.spring.io/spring-cloud-stream/docs/{scst-version}/reference/html/spring-cloud-stream.html#_batch_consumers[Batch consumers] can be enabled by setting `spring.cloud.stream.bindings..consumer.batch-mode` to `true`. In which case, batched messages may be consumed as follows: + +[source,java] +---- +@Bean +Consumer>> input() { + return batchMsg -> { // <1> + List batchedPayloads = batchMsg.getPayload(); + List> batchedHeaders = (List>) batchMsg.getHeaders().get(SolaceBinderHeaders.BATCHED_HEADERS); // <2> + + for (int i = 0; i < batchedPayloads.size(); i++) { + Payload payload = batchedPayloads.get(i); + Map headers = batchedHeaders.get(i); + // Process inidividual message payload and its headers + } + }; +} +---- +<1> A batch of messages is really just a single Spring `Message` whose payload is a list of individual message payloads. +<2> The `solace_scst_batchedHeaders` message header contains the consolidated list of message headers for each of the individual messages in the batch. + +IMPORTANT: Message batches are non-transacted. A batch that this binder creates is fundamentally a collection of individual messages and must not be treated as a single consistent unit. + +[TIP] +==== +If the Spring Cloud Stream framework fails to convert the batch message, consider setting one of the following consumer config options: + +* An explicit https://docs.spring.io/spring-cloud-stream/docs/{scst-version}/reference/html/spring-cloud-stream.html#_common_binding_properties[`contentType`]. +** e.g. `application/octet-stream` for `byte[]` messages. +* https://docs.spring.io/spring-cloud-stream/docs/{scst-version}/reference/html/spring-cloud-stream.html#_consumer_properties[`useNativeDecoding=true`] if the message handler is just consuming raw payload types. +** e.g. if PubSub+ delivers a binary message and the consumer message handler accepts `Message>`. +** https://docs.spring.io/spring-cloud-stream/docs/{scst-version}/reference/html/spring-cloud-stream.html#_consumer_properties[Read the caveats carefully before enabling this feature] + +See https://docs.spring.io/spring-cloud-stream/docs/{scst-version}/reference/html/spring-cloud-stream.html#content-type-management[Content Type Negotiation] for more info on how Spring Cloud Streams converts payloads and other options to control message conversion. + +See <> for more info regarding this binder's natively supported payload types. +==== + +To create a batch of messages, the binder will consume messages from the PubSub+ broker until either a maximum batch size or timeout has been achieved. After which, the binder will compose the batch message and send it to the consumer handler for processing. Both these batching parameters can be configured using the `batchMaxSize` and `batchTimeout` consumer config options. + == Manual Message Acknowledgment Message handlers can disable auto-acknowledgement and manually invoke the acknowledgement callback as follows: @@ -693,31 +947,25 @@ If asynchronously acknowledging messages, then if these messages aren’t acknow This property can be configured for dynamically created queues by using https://docs.solace.com/Configuring-and-Managing/Configuring-Endpoint-Templates.htm#Configur[queue templates]. However note that as per https://docs.solace.com/PubSub-Basics/Endpoints.htm#Which[our documentation], anonymous consumer group queues (i.e. temporary queues) will not match a queue template’s name filter. Only the queue template defined in the client profile’s "Copy Settings From Queue Template" setting will apply to those. ==== -== Message Target Destination +== Dynamic Producer Destinations Spring Cloud Stream has a reserved message header called `scst_targetDestination` (retrievable via `BinderHeaders.TARGET_DESTINATION`), which allows for messages to be redirected from their bindings' configured destination to the target destination specified by this header. -For this binder's implementation of this header, the target destination defines the _exact_ Solace topic to which a message will be sent. i.e. No post-processing is done for this header (e.g. `prefix` is not applied). - -If you want to apply a destination post-processing step – lets say the `prefix` for example, you will need to directly apply that to the header itself: +For this binder's implementation of this header, the target destination defines the _exact_ Solace topic to which a message will be sent. i.e. No post-processing is done for this header. [source,java] ---- public class MyMessageBuilder { - @Value("${spring.cloud.stream.solace.bindings..producer.prefix}") // <1> - String prefix; - public Message buildMeAMessage() { return MessageBuilder.withPayload("payload") - .setHeader(BinderHeaders.TARGET_DESTINATION, prefix + "new-target-destination") // <2> + .setHeader(BinderHeaders.TARGET_DESTINATION, "some-dynamic-destination") // <1> .build(); } } ---- -<1> Retrieve your binding's configured prefix. -<2> Apply the prefix to the target destination header. +<1> This message will be sent to the `some-dynamic-destination` topic, ignoring the producer's configured destination -Also, this header is cleared by the message's producer before it is sent off to the message broker. So you should attach the target destination to your message payload if you want to get that information on the consumer-side. +NOTE: This header is cleared by the message's producer before it is sent off to the message broker. So you should attach the target destination to your message payload if you want to get that information on the consumer-side. == Failed Consumer Message Error Handling @@ -758,6 +1006,12 @@ Error Queues should not be used with anonymous consumer groups. Since the names of anonymous consumer groups, and in turn the name of their would-be Error Queues, are randomly generated at runtime, it would provide little value to create bindings to these Error Queues because of their unpredictable naming and temporary existence. Also, your environment will be polluted with orphaned Error Queues whenever these consumers rebind. ==== +== Consumer Bindings Pause/Resume + +The Solace binder supports pausing and resuming consumer bindings. See link:https://docs.spring.io/spring-cloud-stream/docs/{scst-version}/reference/html/spring-cloud-stream.html#binding_visualization_control[Spring Cloud Stream documentation] to learn how to pause and resume consumer bindings. + +NOTE: There is no guarantee that the effect of pausing a binding will be instantaneous: messages already in-flight or being processed by the binder may still be delivered after the call to pause returns. + == Failed Producer Message Error Handling By default, asynchronous producer errors aren't handled by the framework. Producer error channels can be enabled using the link:https://docs.spring.io/spring-cloud-stream/docs/{scst-version}/reference/html/spring-cloud-stream.html#_producer_properties[`errorChannelEnabled` producer config option]. @@ -794,6 +1048,25 @@ public void send(String payload, long timeout, TimeUnit unit) { } } ---- +== Solace Binder Health Indicator +Solace binders can report health statuses via the https://docs.spring.io/spring-cloud-stream/docs/{scst-version}/reference/html/spring-cloud-stream.html#_health_indicator[Spring Boot Actuator health endpoint]. To enable this feature, add Spring Boot Actuator to the classpath. To manually disable this feature, set `management.health.binders.enabled=false`. + +[cols="1,3", options="header"] +|=== +| Health Status +| Description + +| UP +| Status indicating that the binder is functioning as expected. + +| RECONNECTING +| Status indicating that the binder is actively trying to reconnect to the message broker. + +This is a custom health status. It isn't included in the health severity order list (`management.endpoint.health.status.order`) and returns the default HTTP status code of `200`. To customize these, see https://docs.spring.io/spring-boot/docs/current/reference/html/actuator.html#actuator.endpoints.health.writing-custom-health-indicators[Writing Custom HealthIndicators]. + +| DOWN +| Status indicating that the binder has suffered an unexpected failure. For instance, the binder may have exhausted all reconnection attempts. User intervention is likely required. +|=== == Resources diff --git a/solace-spring-cloud-starters/solace-spring-cloud-stream-starter/pom.xml b/solace-spring-cloud-starters/solace-spring-cloud-stream-starter/pom.xml index f79f6121..95161570 100644 --- a/solace-spring-cloud-starters/solace-spring-cloud-stream-starter/pom.xml +++ b/solace-spring-cloud-starters/solace-spring-cloud-stream-starter/pom.xml @@ -5,12 +5,12 @@ com.solace.spring.cloud solace-spring-cloud-parent - 2.2.2-SNAPSHOT + 2.3.0-SNAPSHOT ../../solace-spring-cloud-parent/pom.xml spring-cloud-starter-stream-solace - 3.2.2-SNAPSHOT + 3.3.0-SNAPSHOT jar diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/pom.xml b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/pom.xml index d05bd3f7..5cf9bf3c 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/pom.xml +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/pom.xml @@ -5,12 +5,12 @@ com.solace.spring.cloud solace-spring-cloud-parent - 2.2.2-SNAPSHOT + 2.3.0-SNAPSHOT ../../solace-spring-cloud-parent/pom.xml spring-cloud-stream-binder-solace-core - 3.2.2-SNAPSHOT + 3.3.0-SNAPSHOT jar Solace Spring Cloud Stream Binder Core @@ -29,11 +29,6 @@ solace-java-spring-boot-starter - - org.springframework.boot - spring-boot-starter-cloud-connectors - - org.springframework.boot spring-boot @@ -45,15 +40,21 @@ - org.junit.vintage - junit-vintage-engine - 5.7.1 + org.springframework.boot + spring-boot-configuration-processor + true + + + + org.junit-pioneer + junit-pioneer + 1.5.0 test com.solace.test.integration - solace-semp-v2-client + pubsubplus-junit-jupiter ${solace.integration.test.support.version} test @@ -91,17 +92,12 @@ - - - it - - - - org.apache.maven.plugins - maven-failsafe-plugin - - - - - + + + + org.apache.maven.plugins + maven-failsafe-plugin + + + diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/inbound/BasicInboundXMLMessageListener.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/inbound/BasicInboundXMLMessageListener.java index 9ae25db1..a5a9c609 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/inbound/BasicInboundXMLMessageListener.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/inbound/BasicInboundXMLMessageListener.java @@ -1,9 +1,9 @@ package com.solace.spring.cloud.stream.binder.inbound; +import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties; import com.solace.spring.cloud.stream.binder.util.FlowReceiverContainer; -import com.solace.spring.cloud.stream.binder.util.JCSMPAcknowledgementCallbackFactory; +import com.solace.spring.cloud.stream.binder.inbound.acknowledge.JCSMPAcknowledgementCallbackFactory; import com.solace.spring.cloud.stream.binder.util.SolaceAcknowledgmentException; -import com.solacesystems.jcsmp.BytesXMLMessage; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.cloud.stream.provisioning.ConsumerDestination; @@ -16,6 +16,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.function.Supplier; public class BasicInboundXMLMessageListener extends InboundXMLMessageListener { private final BiFunction, RuntimeException, Boolean> errorHandlerFunction; @@ -23,35 +24,41 @@ public class BasicInboundXMLMessageListener extends InboundXMLMessageListener { private static final Log logger = LogFactory.getLog(BasicInboundXMLMessageListener.class); BasicInboundXMLMessageListener(FlowReceiverContainer flowReceiverContainer, - ConsumerDestination consumerDestination, - Consumer> messageConsumer, - JCSMPAcknowledgementCallbackFactory ackCallbackFactory, - BiFunction, RuntimeException, Boolean> errorHandlerFunction, - @Nullable AtomicBoolean remoteStopFlag, - ThreadLocal attributesHolder, - boolean needHolderAndAttributes) { - super(flowReceiverContainer, consumerDestination, messageConsumer, ackCallbackFactory, remoteStopFlag, - attributesHolder, needHolderAndAttributes, needHolderAndAttributes); + ConsumerDestination consumerDestination, + SolaceConsumerProperties consumerProperties, + @Nullable BatchCollector batchCollector, + Consumer> messageConsumer, + JCSMPAcknowledgementCallbackFactory ackCallbackFactory, + BiFunction, RuntimeException, Boolean> errorHandlerFunction, + @Nullable AtomicBoolean remoteStopFlag, + ThreadLocal attributesHolder, + boolean needHolderAndAttributes) { + super(flowReceiverContainer, consumerDestination, consumerProperties, batchCollector, messageConsumer, + ackCallbackFactory, remoteStopFlag, attributesHolder, needHolderAndAttributes, needHolderAndAttributes); this.errorHandlerFunction = errorHandlerFunction; } - void handleMessage(BytesXMLMessage bytesXMLMessage, AcknowledgmentCallback acknowledgmentCallback) throws SolaceAcknowledgmentException { + @Override + void handleMessage(Supplier> messageSupplier, Consumer> sendToConsumerHandler, + AcknowledgmentCallback acknowledgmentCallback, boolean isBatched) + throws SolaceAcknowledgmentException { Message message; try { - message = createMessage(bytesXMLMessage, acknowledgmentCallback); + message = messageSupplier.get(); } catch (RuntimeException e) { boolean processedByErrorHandler = errorHandlerFunction != null && errorHandlerFunction.apply(null, e); if (processedByErrorHandler) { AckUtils.autoAck(acknowledgmentCallback); } else { - logger.warn(String.format("Failed to map XMLMessage %s to a Spring Message and no error channel " + - "was configured. Message will be rejected.", bytesXMLMessage.getMessageId()), e); + logger.warn(String.format("Failed to map %s to a Spring Message and no error channel " + + "was configured. Message will be rejected.", isBatched ? "a batch of XMLMessages" : + "an XMLMessage"), e); AckUtils.reject(acknowledgmentCallback); } return; } - sendToConsumer(message, bytesXMLMessage); + sendToConsumerHandler.accept(message); AckUtils.autoAck(acknowledgmentCallback); } } diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/inbound/BatchCollector.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/inbound/BatchCollector.java new file mode 100644 index 00000000..d5be8a18 --- /dev/null +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/inbound/BatchCollector.java @@ -0,0 +1,131 @@ +package com.solace.spring.cloud.stream.binder.inbound; + +import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties; +import com.solace.spring.cloud.stream.binder.util.MessageContainer; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.UUID; + +/** + * Collector which batches message. + * Message batches can be retrieved from this collector only when batching requirements have been met. + */ +public class BatchCollector { + private final SolaceConsumerProperties consumerProperties; + private final List batchedMessages; + private long timeSentLastBatch = System.currentTimeMillis(); + private UUID currentFlowReceiverReferenceId; + + private static final Log logger = LogFactory.getLog(BatchCollector.class); + + public BatchCollector(SolaceConsumerProperties consumerProperties) { + this.consumerProperties = consumerProperties; + this.batchedMessages = new ArrayList<>(consumerProperties.getBatchMaxSize()); + } + + /** + * Add message to batch + * @param messageContainer message container + */ + public void addToBatch(MessageContainer messageContainer) { + if (messageContainer == null) { + return; + } + + batchedMessages.add(messageContainer); + UUID flowReceiverReferenceId = messageContainer.getFlowReceiverReferenceId(); + if (currentFlowReceiverReferenceId != null && !currentFlowReceiverReferenceId.equals(flowReceiverReferenceId)) { + if (logger.isTraceEnabled()) { + logger.trace(String.format("Added a message to batch, but its flow receiver reference ID was %s, " + + "expected %s. Pruning stale messages from batch...", + flowReceiverReferenceId, currentFlowReceiverReferenceId)); + } + pruneStaleMessages(); + } + currentFlowReceiverReferenceId = flowReceiverReferenceId; + } + + /** + * Checks if batch is eligible to be collected. + * @return true if available (batch may be empty). + */ + public boolean isBatchAvailable() { + return isBatchAvailableInternal() && (!pruneStaleMessages() || isBatchAvailableInternal()); + } + + public boolean isBatchAvailableInternal() { + if (batchedMessages.size() < consumerProperties.getBatchMaxSize()) { + long batchTimeDiff = System.currentTimeMillis() - timeSentLastBatch; + if (consumerProperties.getBatchTimeout() == 0 || batchTimeDiff < consumerProperties.getBatchTimeout()) { + if (logger.isTraceEnabled()) { + logger.trace(String.format("Collecting batch... Size: %s, Time since last batch: %s ms", + batchedMessages.size(), batchTimeDiff)); + } + return false; + } else if (logger.isTraceEnabled()) { + logger.trace(String.format( + "Batch timeout reached