Commit 5e68bb3: Features files updated
Jakub Drobena committed Nov 15, 2023
1 parent 4696960 commit 5e68bb3
Showing 5 changed files with 417 additions and 0 deletions.
46 changes: 46 additions & 0 deletions parquet-factory/indexes.feature
@@ -0,0 +1,46 @@
@parquet_service

Feature: Ability to set the indexes in the generated tables correctly

Background: Initial state is ready
Given the system is in default state
And Kafka broker is available
And Kafka topic "incoming_features_topic" is empty and has 2 partitions
And Kafka topic "incoming_rules_topic" is empty and has 2 partitions
And S3 endpoint is set
And S3 port is set
And S3 access key is set
And S3 secret access key is set
And S3 bucket name is set to test
And S3 connection is established
And The S3 bucket is empty

Scenario: If a Parquet file already exists, the index of the new one should be 1
When I fill the topics with messages of the previous hour
| topic | partition | type | cluster |
| incoming_rules_topic | 0 | rules message | 33333333-3333-3333-3333-333333333333 |
| incoming_rules_topic | 1 | rules message | 44444444-4444-4444-4444-444444444444 |
And I fill the topics with messages of the current hour
| topic | partition | type | cluster |
| incoming_rules_topic | 0 | rules message | 77777777-7777-7777-7777-777777777777 |
| incoming_rules_topic | 1 | rules message | 88888888-8888-8888-8888-888888888888 |
When I run Parquet Factory with a timeout of "10" seconds
Then Parquet Factory should have finished
And I should see the following objects generated in S3
| File name |
| fleet_aggregations/cluster_info/hourly/date=2016-02-02/hour=05/cluster_info-0.parquet |
# Re-run and check that the index is 1. The topics must be emptied first so that PF doesn't find the previous messages from the current hour
Given Kafka topic "incoming_rules_topic" is empty and has 2 partitions
When I fill the topics with messages of the previous hour
| topic | partition | type | cluster |
| incoming_rules_topic | 0 | rules message | bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb |
| incoming_rules_topic | 1 | rules message | cccccccc-cccc-cccc-cccc-cccccccccccc |
And I fill the topics with messages of the current hour
| topic | partition | type | cluster |
| incoming_rules_topic | 0 | rules message | ffffffff-ffff-ffff-ffff-ffffffffffff |
| incoming_rules_topic | 1 | rules message | 00000000-0000-0000-0000-000000000000 |
When I run Parquet Factory with a timeout of "10" seconds
Then Parquet Factory should have finished
And I should see the following objects generated in S3
| File name |
| fleet_aggregations/cluster_info/hourly/date=2016-02-02/hour=05/cluster_info-1.parquet |
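The scenario above expects `cluster_info-0.parquet` on a clean bucket and `cluster_info-1.parquet` after the re-run. A minimal Python sketch of that indexing rule follows; the helper name and key format are assumptions for illustration, not Parquet Factory's actual code:

```python
import re

def next_index(existing_keys, table):
    """Return the numeric suffix for the next Parquet file of `table`.

    Hypothetical helper: scans existing S3 keys for `<table>-<n>.parquet`
    and returns max(n) + 1, or 0 when no such file exists yet.
    """
    pattern = re.compile(re.escape(table) + r"-(\d+)\.parquet$")
    found = [int(m.group(1)) for key in existing_keys if (m := pattern.search(key))]
    return max(found) + 1 if found else 0

# Bucket contents after the first run, as listed in the scenario:
first_run = ["fleet_aggregations/cluster_info/hourly/date=2016-02-02/hour=05/cluster_info-0.parquet"]
print(next_index(first_run, "cluster_info"))  # next file gets index 1
print(next_index([], "cluster_info"))         # empty bucket: index 0
```

This is why the scenario empties only the topics, not the bucket, between runs: the surviving `-0` object is what pushes the second run to index 1.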
146 changes: 146 additions & 0 deletions parquet-factory/kafka_messages.feature
@@ -0,0 +1,146 @@
@parquet_service

Feature: Ability to process the Kafka messages correctly

Background: Initial state is ready
Given the system is in default state
And Kafka broker is available
And Kafka topic "incoming_rules_topic" is empty and has 2 partitions
And S3 endpoint is set
And S3 port is set
And S3 access key is set
And S3 secret access key is set
And S3 bucket name is set to test
And S3 connection is established
And The S3 bucket is empty

Scenario: Parquet Factory should fail if it cannot read from Kafka
When I set the environment variable "PARQUET_FACTORY__KAFKA_RULES__ADDRESS" to "non-existent-url"
And I run Parquet Factory with a timeout of "10" seconds
Then Parquet Factory should have finished
And The logs should contain "Unable to create the Kafka consumer"
And The S3 bucket is empty

Scenario: Parquet Factory shouldn't finish if only messages from the previous hour arrived
When I fill the topics with messages of the previous hour
| topic | partition | type | cluster |
| incoming_rules_topic | 0 | rules message | 33333333-3333-3333-3333-333333333333 |
| incoming_rules_topic | 1 | rules message | 44444444-4444-4444-4444-444444444444 |
When I set the environment variable "PARQUET_FACTORY__KAFKA_RULES__CONSUMER_TIMEOUT" to "20"
And I run Parquet Factory with a timeout of "10" seconds
Then Parquet Factory shouldn't have finished
And The logs should contain
| topic | partition | offset | message |
| incoming_rules_topic | 0 | 0 | message processed |
| incoming_rules_topic | 1 | 0 | message processed |
And The logs shouldn't contain
| topic | partition | offset | message |
| incoming_rules_topic | 0 | 1 | FINISH |
| incoming_rules_topic | 1 | 1 | FINISH |
And The S3 bucket is empty

Scenario: Parquet Factory shouldn't finish if not all the topics and partitions are filled with current hour messages
When I fill the topics with messages of the previous hour
| topic | partition | type | cluster |
| incoming_rules_topic | 0 | rules message | 33333333-3333-3333-3333-333333333333 |
| incoming_rules_topic | 1 | rules message | 44444444-4444-4444-4444-444444444444 |
And I fill the topics with messages of the current hour
| topic | partition | type | cluster |
| incoming_rules_topic | 0 | rules message | 77777777-7777-7777-7777-777777777777 |
| incoming_rules_topic | 1 | rules message | 88888888-8888-8888-8888-888888888888 |
When I run Parquet Factory with a timeout of "10" seconds
Then Parquet Factory shouldn't have finished
And The logs should contain
| topic | partition | offset | message |
| incoming_rules_topic | 0 | 0 | message processed |
| incoming_rules_topic | 1 | 0 | message processed |
| incoming_rules_topic | 0 | 1 | FINISH |
| incoming_rules_topic | 1 | 1 | FINISH |
And The S3 bucket is empty

Scenario: Parquet Factory should finish if all the topics and partitions are filled with current hour messages
When I fill the topics with messages of the previous hour
| topic | partition | type | cluster |
| incoming_rules_topic | 0 | rules message | 33333333-3333-3333-3333-333333333333 |
| incoming_rules_topic | 1 | rules message | 44444444-4444-4444-4444-444444444444 |
And I fill the topics with messages of the current hour
| topic | partition | type | cluster |
| incoming_rules_topic | 0 | rules message | 77777777-7777-7777-7777-777777777777 |
| incoming_rules_topic | 1 | rules message | 88888888-8888-8888-8888-888888888888 |
When I run Parquet Factory with a timeout of "10" seconds
Then Parquet Factory should have finished
And The logs should contain
| topic | partition | offset | message |
| incoming_rules_topic | 0 | 0 | message processed |
| incoming_rules_topic | 1 | 0 | message processed |
| incoming_rules_topic | 0 | 1 | FINISH |
| incoming_rules_topic | 1 | 1 | FINISH |
And The S3 bucket is not empty
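The "should finish" / "shouldn't finish" pair of scenarios hinges on whether every topic partition has received a current-hour (FINISH) message. A hypothetical sketch of that completeness check (names and shapes assumed, not PF's code):

```python
def can_finish(finish_markers, topic_partitions):
    """True once every (topic, partition) has seen a current-hour message.

    `topic_partitions` maps topic name -> partition count; `finish_markers`
    is the set of (topic, partition) pairs whose FINISH message was logged.
    """
    expected = {(topic, p)
                for topic, count in topic_partitions.items()
                for p in range(count)}
    return expected <= finish_markers  # subset test: all partitions covered

topics = {"incoming_rules_topic": 2}
print(can_finish({("incoming_rules_topic", 0)}, topics))  # one partition missing: False
print(can_finish({("incoming_rules_topic", 0),
                  ("incoming_rules_topic", 1)}, topics))  # both covered: True
```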

Scenario: After aggregating the messages from the previous hour, the first messages from the current hour have to be processed first
When I fill the topics with messages of the previous hour
| topic | partition | type | cluster |
| incoming_rules_topic | 0 | rules message | 33333333-3333-3333-3333-333333333333 |
| incoming_rules_topic | 1 | rules message | 44444444-4444-4444-4444-444444444444 |
And I fill the topics with messages of the current hour
| topic | partition | type | cluster |
| incoming_rules_topic | 0 | rules message | 77777777-7777-7777-7777-777777777777 |
| incoming_rules_topic | 1 | rules message | 88888888-8888-8888-8888-888888888888 |
When I run Parquet Factory with a timeout of "10" seconds
Then Parquet Factory should have finished
And The logs should contain
| topic | partition | offset | message |
| incoming_rules_topic | 0 | 0 | message processed |
| incoming_rules_topic | 1 | 0 | message processed |
| incoming_rules_topic | 0 | 1 | FINISH |
| incoming_rules_topic | 1 | 1 | FINISH |
Then The S3 bucket is not empty
When I run Parquet Factory with a timeout of "10" seconds
Then Parquet Factory should have finished
And The logs should contain
| topic | partition | offset | message |
| incoming_rules_topic | 0 | 1 | FINISH |
| incoming_rules_topic | 1 | 1 | FINISH |

Scenario: Parquet Factory should finish if the limit of Kafka messages is exceeded, even if no messages from the current hour arrived
When I fill the topics with messages of the previous hour
| topic | partition | type | cluster |
| incoming_rules_topic | 0 | rules message | 33333333-3333-3333-3333-333333333333 |
| incoming_rules_topic | 1 | rules message | 44444444-4444-4444-4444-444444444444 |
| incoming_rules_topic | 0 | rules message | 77777777-7777-7777-7777-777777777777 |
| incoming_rules_topic | 1 | rules message | 88888888-8888-8888-8888-888888888888 |
| incoming_rules_topic | 0 | rules message | bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb |
| incoming_rules_topic | 1 | rules message | cccccccc-cccc-cccc-cccc-cccccccccccc |
And I set the environment variable "PARQUET_FACTORY__KAFKA_RULES__MAX_CONSUMED_RECORDS" to "1"
And I run Parquet Factory with a timeout of "10" seconds
Then Parquet Factory should have finished
Then The S3 bucket is not empty
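This scenario exercises the record cap: with `PARQUET_FACTORY__KAFKA_RULES__MAX_CONSUMED_RECORDS` set to "1", PF stops even though no current-hour marker ever arrives. A hedged sketch of a per-partition consume loop with both stop conditions (the message shape is an assumption):

```python
def consume_partition(messages, current_hour, max_records):
    """Consume prior-hour messages until a current-hour marker or the record cap.

    Illustrative only: a current-hour message acts as the FINISH marker,
    and max_records bounds how many records are read when no marker arrives.
    """
    consumed = []
    for msg in messages:
        if msg["hour"] >= current_hour:
            break  # FINISH marker: the previous hour is complete
        consumed.append(msg)
        if len(consumed) >= max_records:
            break  # cap reached: finish even without a current-hour message
    return consumed

# Six prior-hour messages, as in the scenario, but a cap of 1:
prior_only = [{"hour": 4, "offset": i} for i in range(6)]
print(len(consume_partition(prior_only, current_hour=5, max_records=1)))  # 1
```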

Scenario: Parquet Factory should not commit the messages from current hour if there are no prior messages
When I fill the topics with messages of the current hour
| topic | partition | type | cluster |
| incoming_rules_topic | 0 | rules message | 33333333-3333-3333-3333-333333333333 |
| incoming_rules_topic | 1 | rules message | 44444444-4444-4444-4444-444444444444 |
When I run Parquet Factory with a timeout of "10" seconds
Then Parquet Factory should have finished
And The logs should contain
| topic | partition | offset | message |
| incoming_rules_topic | 0 | 0 | FINISH |
| incoming_rules_topic | 1 | 0 | FINISH |
Then The S3 bucket is empty
# Re-run it to check that it starts with the same messages
When I run Parquet Factory with a timeout of "10" seconds
Then Parquet Factory should have finished
And The logs should contain
| topic | partition | offset | message |
| incoming_rules_topic | 0 | 0 | FINISH |
| incoming_rules_topic | 1 | 0 | FINISH |
Then The S3 bucket is empty
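The re-run in this scenario only sees the same messages again because the current-hour offsets were never committed. One way to express that rule, as a hypothetical sketch (field names assumed, not PF's implementation):

```python
def offsets_to_commit(processed, current_hour):
    """Pick the per-partition offsets PF may safely commit.

    Current-hour messages are only end-of-hour markers: if no earlier-hour
    message was aggregated, nothing is committed, so a re-run starts from
    the very same messages.
    """
    if not any(msg["hour"] < current_hour for msg in processed):
        return {}  # nothing aggregated: commit nothing
    commits = {}
    for msg in processed:
        part = msg["partition"]
        commits[part] = max(commits.get(part, -1), msg["offset"])
    return commits

# Only current-hour messages, as in the scenario:
only_current = [{"partition": 0, "offset": 0, "hour": 5},
                {"partition": 1, "offset": 0, "hour": 5}]
print(offsets_to_commit(only_current, current_hour=5))  # {} -> re-run starts over
```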

Scenario: Parquet Factory shouldn't send duplicate rows
When I fill the topics with messages of the previous hour
| topic | partition | type | cluster |
| incoming_rules_topic | 0 | rules message | 33333333-3333-3333-3333-333333333333 |
| incoming_rules_topic | 1 | rules message | 44444444-4444-4444-4444-444444444444 |
When I run Parquet Factory with a timeout of "10" seconds
Then The logs should contain "factory was about to duplicate a row, skipping"
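The duplicate-row guard this scenario asserts on can be sketched as a set-based check before each write; the function and row shape are illustrative, not Parquet Factory's actual code:

```python
def write_rows(rows):
    """Write rows to a table, skipping exact duplicates.

    Hypothetical sketch of the guard behind the log line
    "factory was about to duplicate a row, skipping".
    """
    seen, written, skipped = set(), [], 0
    for row in rows:
        key = tuple(sorted(row.items()))  # hashable identity of the row
        if key in seen:
            skipped += 1  # would duplicate a row: skip (and log)
            continue
        seen.add(key)
        written.append(row)
    return written, skipped

hits = [{"cluster": "33333333", "rule": "rule_a"},
        {"cluster": "33333333", "rule": "rule_a"},  # exact duplicate
        {"cluster": "44444444", "rule": "rule_a"}]
written, skipped = write_rows(hits)
print(len(written), skipped)  # 2 rows written, 1 skipped
```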
92 changes: 92 additions & 0 deletions parquet-factory/metrics.feature
@@ -0,0 +1,92 @@
@parquet_service

Feature: Ability to send metrics correctly

Background: Initial state is ready
Given the system is in default state
And Kafka broker is available
And Kafka topic "incoming_rules_topic" is empty and has 2 partitions
And S3 endpoint is set
And S3 port is set
And S3 access key is set
And S3 secret access key is set
And S3 bucket name is set to test
And S3 connection is established
And The S3 bucket is empty
And Pushgateway in "pushgateway:9091" is empty of metrics

Scenario: If the Pushgateway is not accessible, Parquet Factory should run successfully
When I fill the topics with messages of the current hour
| topic | partition | type | cluster |
| incoming_rules_topic | 0 | rules message | 33333333-3333-3333-3333-333333333333 |
| incoming_rules_topic | 1 | rules message | 44444444-4444-4444-4444-444444444444 |
And I set the environment variable "PARQUET_FACTORY__METRICS__GATEWAY_URL" to "non-existent-url"
And I run Parquet Factory with a timeout of "10" seconds
Then Parquet Factory should have finished
And The logs should contain "No files needed to be written"
And The logs should contain "Cannot push metrics"

Scenario: If the Pushgateway is accessible, Parquet Factory should run successfully and send the metrics to the Pushgateway
When I fill the topics with messages of the current hour
| topic | partition | type | cluster |
| incoming_rules_topic | 0 | rules message | 33333333-3333-3333-3333-333333333333 |
| incoming_rules_topic | 1 | rules message | 44444444-4444-4444-4444-444444444444 |
And I set the environment variable "PARQUET_FACTORY__METRICS__GATEWAY_URL" to "pushgateway:9091"
And I run Parquet Factory with a timeout of "10" seconds
And I store the metrics from "pushgateway:9091"
Then Parquet Factory should have finished
And The logs should contain "No files needed to be written"
And The logs should contain "Metrics pushed successfully."
# offset_marked is 2 because the offset -2 is always marked
And Metrics are
| metric | operation | value | label | label_value |
| error_count | equal to | 0 | | |
| state | equal to | 0 | | |
| offset_consummed | equal to | 0 | | |
| offset_marked | equal to | 2 | | |
| offset_processed | equal to | 0 | | |
And Metric "inserted_rows" is not registered
And Metric "files_generated" is not registered

Scenario: If the Pushgateway is accessible and I run Parquet Factory with messages from the previous hour, the "files_generated" and "inserted_rows" metrics should be 1 for all the tables
When I fill the topics with messages of the previous hour
| topic | partition | type | cluster |
| incoming_rules_topic | 0 | rules message | 33333333-3333-3333-3333-333333333333 |
| incoming_rules_topic | 1 | rules message | 44444444-4444-4444-4444-444444444444 |
And I fill the topics with messages of the current hour
| topic | partition | type | cluster |
| incoming_rules_topic | 0 | rules message | 33333333-3333-3333-3333-333333333333 |
| incoming_rules_topic | 1 | rules message | 44444444-4444-4444-4444-444444444444 |
And I set the environment variable "PARQUET_FACTORY__METRICS__GATEWAY_URL" to "pushgateway:9091"
And I run Parquet Factory with a timeout of "10" seconds
And I store the metrics from "pushgateway:9091"
Then Parquet Factory should have finished
And The logs should contain "\"rule_hits-0\" table was generated"
And The logs should contain "Metrics pushed successfully."
And Metrics are
| metric | operation | value | label | label_value |
| error_count | equal to | 0 | | |
| state | equal to | 0 | | |
| offset_consummed | equal to | 2 | | |
| offset_marked | equal to | 2 | | |
| offset_processed | equal to | 2 | | |
| inserted_rows | greater than | 1 | table | rule_hits |
| files_generated | equal to | 1 | table | rule_hits |

Scenario: If the Pushgateway is accessible and Parquet Factory errors, the "error_count" metric should increase
When I set the environment variable "PARQUET_FACTORY__KAFKA_RULES__ADDRESS" to "non-existent-url"
And I set the environment variable "PARQUET_FACTORY__METRICS__GATEWAY_URL" to "pushgateway:9091"
And I run Parquet Factory with a timeout of "10" seconds
And I store the metrics from "pushgateway:9091"
Then Parquet Factory should have finished
And The logs should contain "Unable to create the Kafka consumer"
And The logs should contain "Metrics pushed successfully."
And Metrics are
| metric | operation | value | label | label_value |
| error_count | equal to | 1 | | |
| state | equal to | 0 | | |
| offset_consummed | equal to | 0 | | |
| offset_marked | equal to | 0 | | |
| offset_processed | equal to | 0 | | |
And Metric "inserted_rows" is not registered
And Metric "files_generated" is not registered