BDD test added for compression in SHA extractor
Jakub Drobena committed Feb 13, 2024
1 parent b95f541 commit d0dc8ef
Showing 8 changed files with 194 additions and 7 deletions.
72 changes: 72 additions & 0 deletions config/insights_sha_extractor_compressed.yaml
@@ -0,0 +1,72 @@
plugins:
  packages:
    - insights.specs.default
    - pythonjsonlogger
    - pythonjsonlogger.jsonlogger
  configs:
    - name: ccx_ocp_core.config.telemeter.TelemeterServiceConfig
      enabled: false
service:
  extract_timeout:
  extract_tmp_dir:
  format: insights.formats._json.JsonFormat
  target_components: []
  consumer:
    name: ccx_messaging.consumers.kafka_consumer.KafkaConsumer
    kwargs:
      incoming_topic: platform.upload.announce
      dead_letter_queue_topic: dead.letter.queue
      platform_service: testareno
      processing_timeout_s: 0
      group.id: ${CDP_GROUP_ID:insights_sha_extractor_app}
      bootstrap.servers: kafka:9092
      security.protocol: PLAINTEXT
      max.poll.interval.ms: 30000
      heartbeat.interval.ms: 50000
      session.timeout.ms: 10000
      auto.offset.reset: earliest
  downloader:
    name: ccx_messaging.downloaders.http_downloader.HTTPDownloader
    kwargs:
      max_archive_size: 100 MiB
      allow_unsafe_links: true
  engine:
    name: ccx_messaging.engines.sha_extractor_engine.SHAExtractorEngine
    kwargs:
      extract_timeout: 10
  publisher:
    name: ccx_messaging.publishers.workloads_info_publisher.WorkloadInfoPublisher
    kwargs:
      outgoing_topic: archive-results
      bootstrap.servers: kafka:9092
      compression: gzip

logging:
  version: 1
  disable_existing_loggers: false
  handlers:
    default:
      level: DEBUG
      class: logging.StreamHandler
      stream: ext://sys.stdout
      formatter: json
  formatters:
    brief:
      format: "%(message)s"
    json:
      (): "pythonjsonlogger.jsonlogger.JsonFormatter"
      format: "%(filename)s %(lineno)d %(process)d %(levelname)s %(asctime)s %(name)s %(message)s"
    cloudwatch:
      format: "%(filename)s %(levelname)s %(asctime)s %(name)s %(hostname)s %(mac_address)s %(message)s"
  root:
    handlers:
      - default
  loggers:
    insights_messaging:
      level: DEBUG
    ccx_messaging:
      level: DEBUG
    insights:
      level: WARNING
    insights_sha_extractor:
      level: DEBUG
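
The only functional change relative to the base config/insights_sha_extractor.yaml is the `compression: gzip` kwarg on the publisher. A minimal sketch of what honoring that kwarg could look like, assuming (as the BDD steps below imply) the publisher gzips the payload itself before producing, rather than relying on Kafka's transparent compression.type; broker, topic, and payload are illustrative, not the actual ccx_messaging implementation:

import gzip
import json

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="kafka:9092")

# Application-level compression: the consumer receives raw gzip bytes.
# Kafka's built-in compression.type would instead be undone transparently
# by the client, so a manual gzip.decompress() check could not detect it.
payload = json.dumps({"workload_info": {}}).encode("utf-8")
producer.send("archive-results", gzip.compress(payload))
producer.flush()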
1 change: 1 addition & 0 deletions docs/feature_list.md
@@ -79,6 +79,7 @@ nav_order: 2
## SHA Extractor

* [SHA Extractor](https://github.com/RedHatInsights/insights-behavioral-spec/blob/main/features/SHA_Extractor/sha_extractor.feature)
* [SHA Extractor Compressed](https://github.com/RedHatInsights/insights-behavioral-spec/blob/main/features/SHA_Extractor/sha_extractor_compressed.feature)


## Smart Proxy
7 changes: 7 additions & 0 deletions docs/scenarios_list.md
@@ -326,6 +326,13 @@ nav_order: 3
* Check if SHA extractor is able to consume messages from Kafka and then download tarball
* Check if SHA extractor is able to consume messages from Kafka, download tarball, and take SHA images
* Check if SHA extractor is able to finish the processing of SHA images

## [`SHA_Extractor/sha_extractor_compressed.feature`](https://github.com/RedHatInsights/insights-behavioral-spec/blob/main/features/SHA_Extractor/sha_extractor_compressed.feature)

* Check that SHA extractor service has all the information and interfaces it needs to work properly
* Check if SHA extractor is able to consume messages from Kafka
* Check if SHA extractor is able to consume messages from Kafka and then download tarball
* Check if SHA extractor is able to consume messages from Kafka, download tarball, and take SHA images
* Check if SHA extractor compression works properly

## [`smart-proxy/smoketests.feature`](https://github.com/RedHatInsights/insights-behavioral-spec/blob/main/features/smart-proxy/smoketests.feature)

4 changes: 3 additions & 1 deletion features/SHA_Extractor/sha_extractor.feature
@@ -5,7 +5,7 @@ Feature: SHA Extractor


  Background:
    Given Kafka broker is started on host and port specified in configuration
    Given Kafka broker is started on host and port specified in configuration "no-compression"
    And Kafka topic specified in configuration variable "incoming_topic" is created
    And Kafka topic specified in configuration variable "dead_letter_queue_topic" is created
    And Kafka topic specified in configuration variable "outgoing_topic" is created
@@ -83,3 +83,5 @@ Feature: SHA Extractor
    And SHA extractor should download tarball from given URL attribute
    When the file "config/workload_info.json" is found
    Then the content of this file needs to be sent into topic "archive_results"
    When compression is disabled
    Then Published message should not be compressed
37 changes: 37 additions & 0 deletions features/SHA_Extractor/sha_extractor_compressed.feature
@@ -0,0 +1,37 @@
@sha_extractor


Feature: SHA Extractor Compressed


  Background:
    Given Kafka broker is started on host and port specified in configuration "compressed"
    And Kafka topic specified in configuration variable "incoming_topic" is created
    And Kafka topic specified in configuration variable "dead_letter_queue_topic" is created
    And Kafka topic specified in configuration variable "outgoing_topic" is created


  Scenario: Check that SHA extractor service has all the information and interfaces it needs to work properly
    Given SHA extractor service is not started
    When SHA extractor service is started in group "check_info"
    Then SHA extractor service does not exit with an error code
    And SHA extractor service should be registered to topic "incoming_topic"

  Scenario: Check if SHA extractor compression works properly
    Given SHA extractor service is started with compression
    When S3 and Kafka are populated with an archive with workload_info
    Then SHA extractor should consume message about this event
    And this message should contain following attributes
      | Attribute    | Description                | Type         |
      | account      | account ID                 | unsigned int |
      | principal    | principal ID               | unsigned int |
      | size         | tarball size               | unsigned int |
      | url          | URL to S3                  | string       |
      | b64_identity | identity encoded by BASE64 | string       |
      | timestamp    | timestamp of event         | string       |
    Then SHA extractor retrieve the "url" attribute from the message
    And SHA extractor should download tarball from given URL attribute
    When the file "config/workload_info.json" is found
    Then the content of this file needs to be sent into topic "archive_results"
    When compression is enabled
    Then Published message has to be compressed
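
A side note on the attribute table above: behave exposes such a Gherkin table to the step implementation as context.table. A hypothetical sketch of how a step could iterate it (the step text mirrors the feature; context.last_message is an assumed attribute for illustration, not the repository's actual implementation):

from behave import then


@then("this message should contain following attributes")
def check_message_attributes(context):
    # context.table holds the Gherkin table; column names come from its header row.
    for row in context.table:
        attribute = row["Attribute"]
        assert attribute in context.last_message, f"missing attribute: {attribute}"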
14 changes: 14 additions & 0 deletions features/src/kafka_util.py
@@ -19,6 +19,7 @@
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError, UnknownTopicOrPartitionError


class SendEventException(Exception):
@@ -79,9 +80,22 @@ def send_event(bootstrap, topic, payload, headers=None, partition=None, timestam

def consume_event(bootstrap, topic, group_id=None):
    """Consume events in the given topic."""
    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap,
        group_id=group_id,
    )
    consumer.subscribe(topics=topic)
    return consumer.poll()

def consume_one_message_from_topic(bootstrap, topic):
    """Consume one message from the given topic."""
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap,
        auto_offset_reset="earliest",
    )
    # Blocks until a message arrives, since no consumer_timeout_ms is set.
    return next(consumer)
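
A hypothetical call site for the new helper, with broker address and topic as placeholders:

import gzip

# Fetch the first available message and manually gunzip its value; this call
# blocks until a message arrives because no consumer timeout is configured.
record = consume_one_message_from_topic("kafka:9092", "archive-results")
workload_info = gzip.decompress(record.value)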
63 changes: 58 additions & 5 deletions features/steps/insights_sha_extractor.py
@@ -21,6 +21,7 @@
import gzip

from behave import given, then, when
from kafka.cluster import ClusterMetadata
from src import kafka_util


@given("SHA extractor service is not started")
@@ -29,13 +30,18 @@ def sha_extractor_not_started(context):
    assert not hasattr(context, "sha_extractor")


@given("Kafka broker is started on host and port specified in configuration")
def kafka_broker_running(context):
@given('Kafka broker is started on host and port specified in configuration "{compression_var}"')
def kafka_broker_running(context, compression_var):
"""Check if Kafka broker is running on specified address."""
config = None
with open("config/insights_sha_extractor.yaml", "r") as file:
config = yaml.safe_load(file)
context.sha_config = config
if compression_var != "compressed":
with open("config/insights_sha_extractor.yaml", "r") as file:
config = yaml.safe_load(file)
context.sha_config = config
else:
with open("config/insights_sha_extractor_compressed.yaml", "r") as file:
config = yaml.safe_load(file)
context.sha_config = config
hostname = config["service"]["consumer"]["kwargs"]["bootstrap.servers"]
context.hostname = hostname
context.kafka_hostname = hostname.split(":")[0]
@@ -203,3 +209,50 @@ def message_in_buffer(message, buffer):
            break

    return found

@given("SHA extractor service is started with compresion")
def start_sha_extractor_compressed(context, group_id=None):
"""Start SHA Extractor service."""
if group_id:
os.environ["CDP_GROUP_ID"] = group_id

sha_extractor = subprocess.Popen(
["ccx-messaging", "config/insights_sha_extractor_compressed.yaml"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
encoding="utf-8",
env=os.environ.copy(),
)
assert sha_extractor is not None, "Process was not created"
context.add_cleanup(sha_extractor.terminate)
context.sha_extractor = sha_extractor

@when("compresion is enabled")
@then("Published message have to be compressed")
def compressed_archive_sent_to_topic(context):
"""Check that sha extractor did not process any event."""
decoded = None
error= None
consumed_message = kafka_util.consume_one_message_from_topic(context.kafka_hostname, context.outgoing_topic)
try:
decoded = gzip.decompress(consumed_message.value)
except Exception as err:
error=err
assert decoded is not None and error is None



@when("compresion is disabled")
@then("Published message should not be compressed")
def no_compressed_archive_sent_to_topic(context):
"""Check that sha extractor did not process any event."""
decoded = None
error= None
consumed_message = kafka_util.consume_one_message_from_topic(context.kafka_hostname, context.outgoing_topic)
try:
decoded = gzip.decompress(consumed_message.value)
except Exception as err:
error=err
assert decoded is None and error is not None
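
Both steps hinge on gzip.decompress() failing loudly when handed non-gzip bytes; a self-contained illustration of that contract (the payload is illustrative):

import gzip

plain = b'{"images": []}'
packed = gzip.compress(plain)

assert gzip.decompress(packed) == plain  # round-trip succeeds on gzip data

try:
    gzip.decompress(plain)  # plain JSON lacks the gzip magic number
except gzip.BadGzipFile:  # OSError subclass, raised since Python 3.8
    pass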

3 changes: 2 additions & 1 deletion test_list/insights_sha_extractor.txt
@@ -1 +1,2 @@
../features/SHA_Extractor/sha_extractor.feature
../features/SHA_Extractor/sha_extractor_compressed.feature
