Skip to content

Commit

Permalink
Add receiverQueueSize to pulsar (#3589)
Browse files Browse the repository at this point in the history
Signed-off-by: yaron2 <[email protected]>
  • Loading branch information
yaron2 authored Nov 4, 2024
1 parent 8f5b880 commit b969bbf
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 1 deletion.
1 change: 1 addition & 0 deletions pubsub/pulsar/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type pulsarMetadata struct {
PrivateKey string `mapstructure:"privateKey"`
Keys string `mapstructure:"keys"`
MaxConcurrentHandlers uint `mapstructure:"maxConcurrentHandlers"`
ReceiverQueueSize int `mapstructure:"receiverQueueSize"`

Token string `mapstructure:"token"`
oauth2.ClientCredentialsMetadata `mapstructure:",squash"`
Expand Down
9 changes: 8 additions & 1 deletion pubsub/pulsar/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,11 @@ metadata:
description: |
Sets the maximum number of concurrent messages sent to the application. Default is 100.
default: '"100"'
example: '"100"'
example: '"100"'
- name: receiverQueueSize
type: number
description: |
Sets the size of the consumer receive queue.
Controls how many messages can be accumulated by the consumer before it is explicitly called to read messages by Dapr.
default: '"1000"'
example: '"1000"'
4 changes: 4 additions & 0 deletions pubsub/pulsar/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ const (
defaultRedeliveryDelay = 30 * time.Second
// defaultConcurrency controls the number of concurrent messages sent to the app.
defaultConcurrency = 100
// defaultReceiverQueueSize controls the number of messages the pulsar sdk pulls before dapr explicitly consumes the messages.
defaultReceiverQueueSize = 1000

subscribeTypeKey = "subscribeType"

Expand Down Expand Up @@ -125,6 +127,7 @@ func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) {
BatchingMaxSize: defaultMaxBatchSize,
RedeliveryDelay: defaultRedeliveryDelay,
MaxConcurrentHandlers: defaultConcurrency,
ReceiverQueueSize: defaultReceiverQueueSize,
}

if err := kitmd.DecodeMetadata(meta.Properties, &m); err != nil {
Expand Down Expand Up @@ -403,6 +406,7 @@ func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han
Type: getSubscribeType(req.Metadata),
MessageChannel: channel,
NackRedeliveryDelay: p.metadata.RedeliveryDelay,
ReceiverQueueSize: p.metadata.ReceiverQueueSize,
}

if p.useConsumerEncryption() {
Expand Down

0 comments on commit b969bbf

Please sign in to comment.