-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Upgrade azure-eventhub to the new Event Hub SDK #39796
Conversation
This pull request does not have a backport label.
To fixup this pull request, you need to add the backport labels for the needed
|
Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane) |
Pinging @elastic/obs-ds-hosted-services (Team:obs-ds-hosted-services) |
This pull request doesn't have a |
88abdd1
to
9afb38a
Compare
@@ -93,58 +99,6 @@ func TestProcessEvents(t *testing.T) { | |||
assert.Equal(t, message, single) | |||
} | |||
|
|||
func TestParseMultipleRecords(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why you removed those? Because you think that v1 is not going to be used anymore?
And you have the decoder_test for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I moved the message parsing to the decoder to share it with the v1 and v2 processors. The decoder has its own tests based on the original v1 processor tests.
This pull request is now in conflicts. Could you fix it? 🙏
|
consumerGroup string) error { | ||
|
||
// v2 checkpoint information path | ||
// mbranca-general.servicebus.windows.net/sdh4552/$Default/checkpoint/0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Delete this line?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Uhm, I need to rephrase this comment to make it more useful.
|
||
size, err := cln.DownloadBuffer(ctx, buff[:], nil) | ||
if err != nil { | ||
return fmt.Errorf("failed to download checkpoint v1 information for partition %s: %w", partitionID, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this error be printed? I think you have to introduce a log.Error here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the migrationAssistant
fails to migrate a partition, the run()
function should print all the wrapped errors.
I will add a test to double-check it happens.
|
||
offset, err := strconv.ParseInt(checkpointV1.Checkpoint.Offset, 10, 64) | ||
if err != nil { | ||
return fmt.Errorf("failed to parse offset: %w", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll test this as well.
Alongside the partition ID, users can optional send event with a partition key. Add an (optional) partition key to the event hub metadata.
The new migrate_checkpoint config option controls if the input v2 should perform a migration check on start. If migrate_checkpoint is true, the input checks and performs the migration (if v1 info exists) on the very first v2 run. If migrate_checkpoint is false, the input will skip the migration assistant and will not perform any checks or migration.
Expand processor options by adding a new `start_position` configuration. Possible values for `start_position` are: - "earliest" to start from the beginning of the event hub retention period. - "latest" to start from new events. The input uses the 'start_position' option when checkpoint information from the storage account container is unavailable (on the input's first start).
Makes the receive configuration settings available for customization on the input settings. The current default values (receive_timeout: 5s, receive_count: 100) are probably fine, but it is better to make these options available to users.
Co-authored-by: Andrew Gizas <[email protected]> Co-authored-by: subham sarkar <[email protected]>
Also update the option description as well adding the default value.
It's better to check the private data in the event has the expected type.
52b3fb8
to
feb52b7
Compare
Adding more details to the message logged on successful store: - sequence_number - offset - enqueued_time
The teardown() function is responsible to release all the resources allocated in the setup() function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Echoing Fae's comment:
Great work and I'm happy to see this quality of code going in :-)
I agree. The code is well-structured and idiomatically written, accompanied by clear and informative comments. LGTM — approving!
Restructure the `azure-eventhub` input, rebranding the current version as processor v1. Add a brand new processor v2, allowing users to select which version to use in the config: - processor v1: uses the [legacy](https://github.com/azure/azure-event-hubs-go) Event Hub SDK (default processor, at least for 8.15) - processor v2: uses the [modern](https://github.com/azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/) Event Hub SDK Why are we introducing a processor v2? - processor v1 uses deprecated libraries - [github.com/Azure/azure-event-hubs-go](http://github.com/Azure/azure-event-hubs-go) (legacy) - [github.com/Azure/azure-storage-blob-go](http://github.com/Azure/azure-storage-blob-go) (legacy, [retiring](https://azure.microsoft.com/en-gb/updates/retirement-notice-the-legacy-azure-storage-go-client-libraries-will-be-retired-on-13-september-2024/) on Sep 2024) - processor v1 does not support publishing acks (mostly due to lack of hooks; the legacy SDK is a black box) --------- Co-authored-by: Tiago Queiroz <[email protected]> Co-authored-by: Andrew Gizas <[email protected]> Co-authored-by: subham sarkar <[email protected]> (cherry picked from commit b95a8a0) # Conflicts: # go.mod # go.sum
…DK (#40455) * Upgrade azure-eventhub to the new Event Hub SDK (#39796) Restructure the `azure-eventhub` input, rebranding the current version as processor v1. Add a brand new processor v2, allowing users to select which version to use in the config: - processor v1: uses the [legacy](https://github.com/azure/azure-event-hubs-go) Event Hub SDK (default processor, at least for 8.15) - processor v2: uses the [modern](https://github.com/azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/) Event Hub SDK Why are we introducing a processor v2? - processor v1 uses deprecated libraries - [github.com/Azure/azure-event-hubs-go](http://github.com/Azure/azure-event-hubs-go) (legacy) - [github.com/Azure/azure-storage-blob-go](http://github.com/Azure/azure-storage-blob-go) (legacy, [retiring](https://azure.microsoft.com/en-gb/updates/retirement-notice-the-legacy-azure-storage-go-client-libraries-will-be-retired-on-13-september-2024/) on Sep 2024) - processor v1 does not support publishing acks (mostly due to lack of hooks; the legacy SDK is a black box) --------- Co-authored-by: Tiago Queiroz <[email protected]> Co-authored-by: Andrew Gizas <[email protected]> Co-authored-by: subham sarkar <[email protected]>
Proposed commit message
Restructure the
azure-eventhub
input, rebranding the current version as processor v1. Add a brand new processor v2, allowing users to select which version to use in the config:Why are we introducing a processor v2?
Notes for reviewers
Overview
To help with the review, here is an overview of the main flow of the processor v2-based input.
New features
Replace the legacy SDK with the new modern and supported SDK
The new SDK is more flexible and allows us to implement new features and configuration options.
Add support for publishing ACKs
Now, the processor v2 updates the sequence number only when the events have been successfully delivered to Elasticsearch.
Add a migration assistant to migrate checkpoint v1 information to the v2 format
On the first start of the processor v2, the migration assistant (enabled by default) checks if checkpoint v1 information exists from processor v1 and migrates them to the v2 format.
See "Scenario 001: Migration" at
x-pack/filebeat/input/azureeventhub/README.md
for more details.New configuration options
There are new configuration options for v2:
storage_account_connection_string
(required) to authenticate with the storage account container.migrate_checkpoint
(optional, default:yes
) controls if the processor v2 should check and migrate checkpoint v1 information on start.processor_version
(optional, default:v1
) which processor version to use.processor_update_interval
(optional, default:10s
) time interval between checking if new partitions are available.processor_start_position
(optional, default:earliest
) controls if the processor should start from the beginning earliest or the latest event in the event hub retention period.partition_receive_timeout
(optional, default:5s
)partition_receive_count
(optional, default:100
)Checklist
CHANGELOG.next.asciidoc
orCHANGELOG-developer.next.asciidoc
.Disruptive User Impact
How to test this PR locally
See "Test Scenarios" section in the
x-pack/filebeat/input/azureeventhub/README.md
file.Related issues
Use cases
Screenshots
Logs
Author's Checklist