ServiceBus Playground
Azure Service Bus queues and topics do not guarantee FIFO delivery by default. Out-of-order messages can occur in various situations, for example:
- Issue with the application: messages are not sent in the expected order because of a transient error or an intermittent issue within the application
- Issue with Azure: message delivery is delayed by an unexpected incident within Azure, or Service Bus is left in an inconsistent state by a failure in a single subsystem
- Issue with network
However, application-side message sequencing is usually desired, and the first step toward it is enforcing the sequence at the consumer level. In Ordering Messages in Azure Service Bus, Herald Gjura outlines a solution that ensures this delivery behavior. Messages are delivered through sessions, and within each session an out-of-order message is deferred until all prior messages in its defined sequence have been processed successfully.
The three main parts of this solution:
- Session-enabled composition subscription
- Sequential session execution using the message deferral feature
- Sequence configurations
(Publishers) <SessionName, SequenceName, SequenceNumber>
|
. |. . . . . . . . . . . . .
. |--> (Topic1) --> (Sub1) --> .
. | --> (Sub2_Fwd) ----. .
. | | SqlFilter
. |--> (Topic2) --> (Sub2_Fwd) ----| EXISTS(user.SessionName) AND EXISTS(user.TransactionId)
. | .
. .-----------------------------------| .
. | .
. | Action .
. | SET sys.SessionId = SessionName + TransactionId + InstanceId
. |-> (Topic3) --> (SessionEnabled_Sub1) --> (Subscriber(s))
. . |
. . . . . . . . . . . . . . | <SessionName, SequenceName, ActivityInfo>
Service Bus Instance |-----------------------..
| |
Processor1 Processor2
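The rule on the forwarding subscription in the diagram (a filter on the presence of SessionName and TransactionId, then an action that sets the session ID) can be mimicked in plain Python to make its effect concrete. This is only a sketch: it does not call Service Bus, the function name `forward_rule` and the dictionary layout are hypothetical, and it assumes InstanceId arrives as an application property and that the parts are joined with `_` as in the SessionId format used elsewhere in this document.

```python
from typing import Optional

# Properties tested by the SqlFilter in the diagram.
REQUIRED = ("SessionName", "TransactionId")


def forward_rule(properties: dict) -> Optional[dict]:
    """Mimic the forwarding rule: apply the filter, then the action.

    Returns a copy of the message properties with SessionId set, or None
    when the filter does not match (the message would not be forwarded).
    """
    # SqlFilter: EXISTS(user.SessionName) AND EXISTS(user.TransactionId)
    if not all(name in properties for name in REQUIRED):
        return None

    # Action: SET sys.SessionId = SessionName + TransactionId + InstanceId
    # Assumption: InstanceId is optional and simply skipped when absent.
    parts = [
        properties["SessionName"],
        properties["TransactionId"],
        properties.get("InstanceId"),
    ]
    forwarded = dict(properties)  # work on a copy, leave the input untouched
    forwarded["SessionId"] = "_".join(str(p) for p in parts if p is not None)
    return forwarded
```

For example, properties `{"SessionName": "Order", "TransactionId": "T1", "InstanceId": "I1"}` would be forwarded with SessionId `Order_T1_I1`, while a message without TransactionId would not be forwarded at all.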
- Publishing stays mostly unchanged, with one minor addition
  - session configuration: <SessionName, SequenceName, SequenceNumber>
- Existing subscriptions: no change, or can be disabled if desired
- Forward subscriptions: forward messages targeting the original subscriptions to the new session-enabled entity
  - SessionId: {SessionName}_{TransactionId}_{InstanceId}
- Session-enabled entity: receives the message, makes a copy, takes actions, and dispatches it to the destination subscription
- Subscribers process messages and close the session once completed
  - <SessionName, SequenceName, ActivityInfo>
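On the publisher side, the "minor addition" amounts to attaching the session configuration to each outgoing message. A minimal sketch of that tagging step follows, assuming the three values travel as application properties and that SequenceNumber is a 1-based position; `OutgoingMessage` and `tag_sequence` are hypothetical names, not part of any Service Bus SDK. TransactionId, which the forwarding filter also checks, is assumed to be set elsewhere.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class OutgoingMessage:
    """Stand-in for a message about to be published (hypothetical type)."""
    body: str
    application_properties: Dict[str, object] = field(default_factory=dict)


def tag_sequence(bodies: List[str], session_name: str,
                 sequence_name: str) -> List[OutgoingMessage]:
    """Attach <SessionName, SequenceName, SequenceNumber> to each body, in order."""
    return [
        OutgoingMessage(
            body=body,
            application_properties={
                "SessionName": session_name,
                "SequenceName": sequence_name,
                "SequenceNumber": number,  # assumption: 1-based position
            },
        )
        for number, body in enumerate(bodies, start=1)
    ]
```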
(Subscriber)
[Message Received]
|
[Read session state]
| No
<Is the next in sequence> --> [Update session state] --> [Defer message]-.
| Yes |
[Process message] |
| No |
<Success> --> [DLQ] --> ------------------------------------------------|
| Yes |
[Update session state] |
| Yes |
<Is the last in sequence> --> [Update session state] --> [Close session]-|
No | No |
[End processing] <-- <If the next found deferred> <-------------------------------------------|
^ | Yes |
[Close session] [Read and process the next] |
| | No |
[Update session state] <Success> --> [DLQ] --> [Update session state] --------------------|
| Yes | Yes No ^
|-------------- <-- <Is the last in sequence> --> --------|
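The subscriber flow above can be sketched as an in-memory simulation. This is not the Service Bus API: deferral is modeled by keeping deferred bodies in the session state (a real consumer would typically keep only what it needs to receive the deferred message again), and the names `SessionState` and `handle` are hypothetical. It also assumes that a dead-lettered message still advances the sequence so the session does not stall, a point the flowchart leaves open.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class SessionState:
    next_expected: int = 1                                   # next SequenceNumber to process
    deferred: Dict[int, str] = field(default_factory=dict)   # SequenceNumber -> body
    closed: bool = False


def handle(state: SessionState, number: int, body: str, last: int,
           process: Callable[[str], bool], dead_letters: List[int]) -> None:
    """Handle one received message; `last` is the final SequenceNumber."""
    if number != state.next_expected:
        # Not the next in sequence: update session state and defer.
        state.deferred[number] = body
        return

    while True:
        if not process(body):
            dead_letters.append(number)       # processing failed: DLQ
        state.next_expected = number + 1      # update session state
        if number == last:
            state.closed = True               # last in sequence: close session
            return
        if state.next_expected not in state.deferred:
            return                            # next not deferred yet: end processing
        # The next message was deferred earlier: read and process it now.
        number = state.next_expected
        body = state.deferred.pop(number)
```

Feeding messages 2, 3, 1 of a three-message sequence into `handle` defers the first two, then processes all three in order once message 1 arrives and closes the session.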
- Session over various entities: a common use case in which messages from different entities take part in one sequential execution
  - SessionId: {SessionName}_{TransactionId}_{InstanceId}
    - SessionName: name of the session in which messages should be executed in a predefined sequence
    - TransactionId: uniquely identifies a transaction
    - InstanceId: uniquely identifies an execution instance
  - state maintaining solutions
    - forward messages from the original entities to a dedicated entity where session state can be maintained and enforced within ASB. For example: SessionNameTopic/PartitionNameSubscription
      - SessionId: this may be constructed with ASB (action), Azure Functions, or a self-hosted application
      - adds at least two more operations per message: forwarding, metadata action
    - use a centralized database to maintain session state
      - PrimaryKeyValue/Id: SessionId
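For the centralized-database option above, a minimal sketch of a state store keyed by SessionId might look like the following. It uses SQLite from the Python standard library purely as a stand-in for whatever database is chosen; the class name, table name, and the JSON state layout are hypothetical.

```python
import json
import sqlite3
from typing import Optional


class SessionStateStore:
    """Keeps one state document per SessionId (the primary key)."""

    def __init__(self, path: str = ":memory:") -> None:
        self._db = sqlite3.connect(path)
        self._db.execute(
            "CREATE TABLE IF NOT EXISTS session_state ("
            "session_id TEXT PRIMARY KEY, state TEXT NOT NULL)"
        )

    def load(self, session_id: str) -> Optional[dict]:
        """Return the stored state, or None for an unknown session."""
        row = self._db.execute(
            "SELECT state FROM session_state WHERE session_id = ?",
            (session_id,),
        ).fetchone()
        return json.loads(row[0]) if row else None

    def save(self, session_id: str, state: dict) -> None:
        """Insert or overwrite the state for this session."""
        self._db.execute(
            "INSERT OR REPLACE INTO session_state (session_id, state) "
            "VALUES (?, ?)",
            (session_id, json.dumps(state)),
        )
        self._db.commit()
```

Unlike session state held inside ASB, a store like this is not tied to the session lock, so concurrent consumers would need their own coordination.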
- Scaling strategy and throughput baseline: a throughput baseline should be established
  - { consumerReplica, MaxConcurrentSessions, compositionRate } x { processingThroughput, exceptionRate, ASB pressure, consumer pressure }
    - consumerReplica: scale out -> consumer (down), ASB (probably up)
    - MaxConcurrentSessions: scale out (probably also up) -> consumer (up), ASB (probably up)
      - process sessions in parallel: maintain the sequence within each session while maximizing throughput for each client
    - bottleneck on the composite subscription if the composition rate is high
      - limit it to a reasonable number
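The idea of processing sessions in parallel while keeping the order inside each session can be illustrated with a small asyncio sketch. It is a simulation only: `run_sessions` and its `max_concurrent_sessions` parameter are hypothetical stand-ins for a consumer configured with MaxConcurrentSessions, and the sessions are plain in-memory lists rather than Service Bus sessions.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


async def run_sessions(
    sessions: Dict[str, List[object]],
    max_concurrent_sessions: int,
    handler: Callable[[str, object], Awaitable[None]],
) -> None:
    """Run sessions concurrently; messages within a session stay sequential."""
    gate = asyncio.Semaphore(max_concurrent_sessions)

    async def run_one(session_id: str, messages: List[object]) -> None:
        async with gate:  # at most max_concurrent_sessions sessions at once
            for message in messages:  # strictly in order within the session
                await handler(session_id, message)

    await asyncio.gather(
        *(run_one(sid, msgs) for sid, msgs in sessions.items())
    )
```

Raising `max_concurrent_sessions` increases per-client throughput without affecting the order within any single session, which matches the trade-off described above.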
- Graceful startup and shutdown: resources must be initialized and disposed of properly
  - session closure: session completion, session exception, session termination
  - client closure: dispose of consumer clients when shutting down
- Limitations
  - maximum sessions: no official information (probably restricted by storage)
  - maximum forwards: 4
  - session state size: one message equivalent = 1MB (Premium)
- Stale sessions
  - never-ending session sequence: incorrect configuration
  - bad transaction: manual termination
  - session error on closure: false positive - manual termination
- DLQs/ErrorLogs
  - source subscription (publisher failure)
    - when the destination is disabled or has reached its quota: evaluate how likely this is; if it happens, manual intervention is required
  - destination subscription (subscription failure)
    - message needs to be reprocessed
- Cost
  - extra operations for each session-enabled message: forward (1) + action (1)
  - extra space for session state
- Composition rate: this stems from the native forward behavior, which lacks partitioning.
  - Use a dedicated application or function for finer control over forwarding, i.e. add partitioning to the forwarding logic so that more subscriptions can be used (on a dedicated topic, or even a dedicated ASB). These subscriptions are functionally equivalent.
- Manual intervention to handle stale sessions
  - Session termination: retrieve the message from the session in question and manually close the session.
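The partitioned forwarding suggested under Composition rate could, for instance, pick the target subscription from a stable hash of the SessionId, so that all messages of one session land on the same subscription. The sketch below shows only that selection step; the function name `pick_partition` and the choice of SHA-256 are assumptions, and the forwarding itself (an application or function that republishes the message) is not shown.

```python
import hashlib


def pick_partition(session_id: str, partition_count: int) -> int:
    """Map a SessionId to one of `partition_count` equivalent subscriptions.

    A cryptographic digest is used instead of Python's built-in hash()
    because hash() of a str varies between processes, which would send
    one session to different partitions on different forwarder instances.
    """
    if partition_count < 1:
        raise ValueError("partition_count must be at least 1")
    digest = hashlib.sha256(session_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % partition_count
```

Changing `partition_count` remaps sessions, so with this simple scheme it would have to stay fixed while any session is still in flight.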