Skip to content

Commit

Permalink
fix(v4): avoid spawning goroutine on publish audit log (#83)
Browse files Browse the repository at this point in the history
  • Loading branch information
medreza authored Sep 25, 2024
1 parent 39c93a0 commit d9f6213
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 40 deletions.
17 changes: 5 additions & 12 deletions eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,18 +487,16 @@ type AuditLogBuilder struct {
content map[string]interface{} `description:"optional"`
diff *AuditLogDiff `description:"optional, if diff is not nil, please make sure diff.Before and diff.Before are both not nil"`

key string
errorCallback PublishErrorCallbackFunc
ctx context.Context
version int
key string
ctx context.Context
version int
}

// NewAuditLogBuilder create new AuditLogBuilder instance
func NewAuditLogBuilder() *AuditLogBuilder {
return &AuditLogBuilder{
version: defaultVersion,
ctx: context.Background(),
errorCallback: nil,
version: defaultVersion,
ctx: context.Background(),
}
}

Expand Down Expand Up @@ -588,11 +586,6 @@ func (auditLogBuilder *AuditLogBuilder) Diff(diff *AuditLogDiff) *AuditLogBuilde
return auditLogBuilder
}

func (auditLogBuilder *AuditLogBuilder) ErrorCallback(errCallback PublishErrorCallbackFunc) *AuditLogBuilder {
auditLogBuilder.errorCallback = errCallback
return auditLogBuilder
}

func (auditLogBuilder *AuditLogBuilder) Key(key string) *AuditLogBuilder {
auditLogBuilder.key = key
return auditLogBuilder
Expand Down
38 changes: 10 additions & 28 deletions kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,38 +452,20 @@ func (client *KafkaClient) PublishAuditLog(auditLogBuilder *AuditLogBuilder) err
if err != nil {
return err
}
return client.publishAndRetryFailure(context.Background(), topic, "", message, auditLogBuilder.errorCallback)
}
return nil
}

// publishAndRetryFailure will publish message to kafka, if it fails, will retry at most 3 times.
// If the message finally failed to publish, will call the error callback function to process this failure.
func (client *KafkaClient) publishAndRetryFailure(context context.Context, topic, eventName string, message *kafka.Message, failureCallback PublishErrorCallbackFunc) error {

config := client.configMap
topic = constructTopic(client.prefix, topic)

go func() {
err := backoff.RetryNotify(func() error {
return client.publishEvent(topic, eventName, config, message, true)
}, backoff.WithMaxRetries(newPublishBackoff(), maxBackOffCount),
func(err error, _ time.Duration) {
logrus.WithField("topic", topic).
Warn("retrying publish message: ", err)
})
config := client.configMap
topic = constructTopic(client.prefix, topic)
err = client.publishEvent(topic, "", config, message, false)
if err != nil {
logrus.WithField("topic", topic).
Error("retrying publish message failed: ", err)

if failureCallback != nil {
failureCallback(message.Value, err)
}
return
logrus.
WithField("Topic Name", topic).
Error("unable to publish event: ", err)
return nil
}

logrus.WithField("topic", topic).
Debug("successfully publish message")
}()
Debug("successfully publish audit log message")
}

return nil
}
Expand Down

0 comments on commit d9f6213

Please sign in to comment.