From 018c72378ac69fa825b5b2ee23db3db85ea8b028 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?David=20B=C3=B6ttcher?=
Date: Tue, 28 Feb 2023 14:34:21 +0100
Subject: [PATCH 01/27] Add initial setup and test connection to ADX with dummy

---
 README.md | 4 +
 config.yaml | 3 +
 simulator/main.py | 17 ++--
 simulator/modules/__init__.py | 3 +
 simulator/modules/azure_data_explorer.py | 104 +++++++++++++++++++++++
 simulator/modules/influxdb.py | 2 +-
 simulator/modules/population.csv | 3 +
 7 files changed, 128 insertions(+), 8 deletions(-)
 create mode 100644 simulator/modules/azure_data_explorer.py
 create mode 100644 simulator/modules/population.csv

diff --git a/README.md b/README.md
index 2f553d1..916fdb0 100644
--- a/README.md
+++ b/README.md
@@ -312,6 +312,10 @@ kubectl apply -f dbinstall/elastic-deployment.yaml
 ```
 
+### Azure Data Explorer
+
+Although there is an emulator for ADX that can run locally, it is not recommended to use this emulator for any kind of benchmark test, since its performance profile is very different. Furthermore, doing so is even prohibited by the license terms.
+
 ## Remarks on the databases
 
 This is a collection of problems / observations collected over the course of the tests.
 
diff --git a/config.yaml b/config.yaml
index 3dcb377..19fb043 100644
--- a/config.yaml
+++ b/config.yaml
@@ -56,4 +56,7 @@ targets:
   elasticsearch:
     module: elasticsearch
     endpoint: https://elastic-es-http.default.svc.cluster.local:9200
+  azure_data_explorer:
+    module: azure_data_explorer
+    connection_string: "TODO"
 namespace: default
diff --git a/simulator/main.py b/simulator/main.py
index 73a8432..034d3bf 100644
--- a/simulator/main.py
+++ b/simulator/main.py
@@ -1,10 +1,13 @@
 import os
 
-instance_type = os.environ.get("INSTANCE_TYPE")
+import modules.azure_data_explorer as ade
+ade.test()
 
-if instance_type == "worker":
-    import worker
-    worker.run()
-else:
-    import collector
-    collector.run()
+# instance_type = os.environ.get("INSTANCE_TYPE")
+#
+# if instance_type == "worker":
+# import worker
+# worker.run()
+# else:
+# import collector
+# collector.run()
diff --git a/simulator/modules/__init__.py b/simulator/modules/__init__.py
index cad5df9..fa9a996 100644
--- a/simulator/modules/__init__.py
+++ b/simulator/modules/__init__.py
@@ -20,5 +20,8 @@ def select_module():
     elif mod == "elasticsearch":
         from . import elasticsearch
         return elasticsearch
+    elif mod == "azure_data_explorer":
+        from . 
import azure_data_explorer + return azure_data_explorer else: raise Exception(f"Unknown module: {mod}") diff --git a/simulator/modules/azure_data_explorer.py b/simulator/modules/azure_data_explorer.py new file mode 100644 index 0000000..ddcf03f --- /dev/null +++ b/simulator/modules/azure_data_explorer.py @@ -0,0 +1,104 @@ +from azure.kusto.data import KustoClient, KustoConnectionStringBuilder +from azure.kusto.data.exceptions import KustoServiceError +from azure.kusto.data.helpers import dataframe_from_result_table +from azure.kusto.data import DataFormat +from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, ReportLevel, \ + ReportMethod + +# from .config import config TODO: uncomment + +def test(): + + KUSTO_URI = "https://kvc7rq22b9ye5d4a4fgyas.northeurope.kusto.windows.net" + KUSTO_INGEST_URI = "https://ingest-kvc7rq22b9ye5d4a4fgyas.northeurope.kusto.windows.net" + KUSTO_DATABASE = "ConnTest" + + KCSB_INGEST = KustoConnectionStringBuilder.with_interactive_login(KUSTO_INGEST_URI) + KCSB_DATA = KustoConnectionStringBuilder.with_interactive_login(KUSTO_URI) + + DESTINATION_TABLE = "PopulationDataNew" + DESTINATION_TABLE_COLUMN_MAPPING = "PopulationDataNew_CSV_Mapping" + + # CONTAINER = "samplefiles" + # ACCOUNT_NAME = "kustosamples" + # SAS_TOKEN = "" + # FILE_PATH = "PopulationDataNew.csv" + # FILE_SIZE = 64158321 # in bytes + # + # BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \ + # CONTAINER + "/" + FILE_PATH + SAS_TOKEN + + KUSTO_CLIENT = KustoClient(KCSB_DATA) + CREATE_TABLE_COMMAND = ".create table PopulationDataNew (State: string, Population: int)" + RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND) + # dataframe_from_result_table(RESPONSE.primary_results[0]) + + CREATE_MAPPING_COMMAND = """.create table PopulationDataNew ingestion csv mapping 'PopulationDataNew_CSV_Mapping' '[{"Name":"State","datatype":"string","Ordinal":0}, {"Name":"Population","datatype":"int","Ordinal":1}]'""" + RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND) + # dataframe_from_result_table(RESPONSE.primary_results[0]) + + INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST) + # All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties + # INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV, ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'}) + # FILE_SIZE is the raw size of the data in bytes + # BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE) + # INGESTION_CLIENT.ingest_from_blob( + # BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES) + + ingestion_props = IngestionProperties( + database="ConnTest", + table="PopulationDataNew", + data_format=DataFormat.CSV, + # in case status update for success are also required (remember to import ReportLevel from azure.kusto.ingest) + # report_level=ReportLevel.FailuresAndSuccesses, + # in case a mapping is required (remember to import IngestionMappingKind from azure.kusto.data.data_format) + # ingestion_mapping_reference="{json_mapping_that_already_exists_on_table}", + # ingestion_mapping_kind= IngestionMappingKind.JSON, + ) + + import os + + cwd = os.getcwd() # Get the current working directory (cwd) + files = os.listdir(cwd) # Get all the files in that directory + print("Files in %r: %s" % (cwd, files)) + + + file_descriptor = 
FileDescriptor("modules/population.csv", 48) # 3333 is the raw size of the data in bytes. + result = INGESTION_CLIENT.ingest_from_file(file_descriptor, ingestion_props) + print(repr(result)) + + print('Done queuing up ingestion with Azure Data Explorer') + + +def _db(): + # cluster = Cluster() + pass + + +def init(): + pass + + +def prefill_events(events): + _insert_events(events, True, 1000) + + +def insert_events(events): + batch_mode = config.get("batch_mode", False) + batch_size = config.get("batch_size", 1000) + _insert_events(events, batch_mode, batch_size) + + +def _insert_events(events, batch_mode, batch_size, use_values_lists=False): + print("Connecting to database", flush=True) + use_multiple_tables = config["use_multiple_tables"] + if use_multiple_tables: + table_names = ["events0", "events1", "events2", "events3"] + else: + table_names = ["events"] + + print("Inserting events", flush=True) + + +def queries(): + db = _db() diff --git a/simulator/modules/influxdb.py b/simulator/modules/influxdb.py index f07ec21..c1369f8 100644 --- a/simulator/modules/influxdb.py +++ b/simulator/modules/influxdb.py @@ -8,7 +8,7 @@ BUCKET_NAME = "dbtest" CURRENT_YEAR = date.today().year -NEXT_YEAR = (date.today()+ timedelta(days=366)).year +NEXT_YEAR = (date.today() + timedelta(days=366)).year def _db(): diff --git a/simulator/modules/population.csv b/simulator/modules/population.csv new file mode 100644 index 0000000..3cbbebc --- /dev/null +++ b/simulator/modules/population.csv @@ -0,0 +1,3 @@ +TEXAS, 500000 +INDIANA, 400000 +WASHINGTON, 300000 \ No newline at end of file From 3a708df7e3bb4268d13ed639f03396d7bd60e1f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20B=C3=B6ttcher?= Date: Tue, 28 Feb 2023 16:07:33 +0100 Subject: [PATCH 02/27] Small clean up --- README.md | 4 +-- simulator/modules/azure_data_explorer.py | 32 ++---------------------- simulator/requirements.txt | 2 ++ 3 files changed, 6 insertions(+), 32 deletions(-) diff --git a/README.md b/README.md index 916fdb0..7552c06 100644 --- a/README.md +++ b/README.md @@ -136,9 +136,9 @@ To run the test use `python run.py insert`. You can use the following options: * `--workers`: Set of worker counts to try, default is `1,4,8,12,16` meaning the test will try with 1 concurrent worker, then with 4, then 8, then 12 and finally 16 * `--runs`: How often should the test be repeated for each worker count, default is `3` * `--primary-key`: Defines how the primary key should be generated, see below for choices. Defaults to `db` -* `--tables`: To simulate how the databases behave if inserts are done to several tables this option can be changed from `single` to `multiple` to have the test write into four instead of just one table +* `--tables`: To simulate how the databases behave if inserts are done to several tables. This option can be changed from `single` to `multiple` to have the test write into four instead of just one table * `--num-inserts`: The number of inserts each worker should do, by default 10000 to get a quick result. Increase this to see how the databases behave under constant load. Also increase the timout option accordingly -* `--timeout`: How long should the script wait for the insert test to complete in seconds. Default is `0`. Increase accordingly if you increase the number of inserts or disable by stting to `0` +* `--timeout`: How long should the script wait for the insert test to complete in seconds. Default is `0`. 
Increase accordingly if you increase the number of inserts or disable by setting to `0` * `--batch`: Switch to batch mode (for postgres this means manual commits, for arangodb using the [batch api](https://docs.python-arango.com/en/main/batch.html)). Specify the number of inserts per transaction/batch * `--extra-option`: Extra options to supply to the test scripts, can be used multiple times. Currently only used for ArangoDB (see below) * `--clean` / `--no-clean`: By default the simulator will clean and recreate tables to always have the same basis for the runs. Can be disabled diff --git a/simulator/modules/azure_data_explorer.py b/simulator/modules/azure_data_explorer.py index ddcf03f..27e4a1c 100644 --- a/simulator/modules/azure_data_explorer.py +++ b/simulator/modules/azure_data_explorer.py @@ -19,50 +19,22 @@ def test(): DESTINATION_TABLE = "PopulationDataNew" DESTINATION_TABLE_COLUMN_MAPPING = "PopulationDataNew_CSV_Mapping" - # CONTAINER = "samplefiles" - # ACCOUNT_NAME = "kustosamples" - # SAS_TOKEN = "" - # FILE_PATH = "PopulationDataNew.csv" - # FILE_SIZE = 64158321 # in bytes - # - # BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \ - # CONTAINER + "/" + FILE_PATH + SAS_TOKEN - KUSTO_CLIENT = KustoClient(KCSB_DATA) CREATE_TABLE_COMMAND = ".create table PopulationDataNew (State: string, Population: int)" RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND) # dataframe_from_result_table(RESPONSE.primary_results[0]) CREATE_MAPPING_COMMAND = """.create table PopulationDataNew ingestion csv mapping 'PopulationDataNew_CSV_Mapping' '[{"Name":"State","datatype":"string","Ordinal":0}, {"Name":"Population","datatype":"int","Ordinal":1}]'""" - RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND) - # dataframe_from_result_table(RESPONSE.primary_results[0]) + KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND) INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST) - # All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties - # INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV, ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'}) - # FILE_SIZE is the raw size of the data in bytes - # BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE) - # INGESTION_CLIENT.ingest_from_blob( - # BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES) ingestion_props = IngestionProperties( database="ConnTest", table="PopulationDataNew", - data_format=DataFormat.CSV, - # in case status update for success are also required (remember to import ReportLevel from azure.kusto.ingest) - # report_level=ReportLevel.FailuresAndSuccesses, - # in case a mapping is required (remember to import IngestionMappingKind from azure.kusto.data.data_format) - # ingestion_mapping_reference="{json_mapping_that_already_exists_on_table}", - # ingestion_mapping_kind= IngestionMappingKind.JSON, + data_format=DataFormat.CSV ) - import os - - cwd = os.getcwd() # Get the current working directory (cwd) - files = os.listdir(cwd) # Get all the files in that directory - print("Files in %r: %s" % (cwd, files)) - - file_descriptor = FileDescriptor("modules/population.csv", 48) # 3333 is the raw size of the data in bytes. 
result = INGESTION_CLIENT.ingest_from_file(file_descriptor, ingestion_props) print(repr(result)) diff --git a/simulator/requirements.txt b/simulator/requirements.txt index fa5cc64..85b6c75 100644 --- a/simulator/requirements.txt +++ b/simulator/requirements.txt @@ -5,3 +5,5 @@ python-arango==7.1.0 cassandra-driver==3.25.0 influxdb-client==1.21.0 elasticsearch==7.15.1 +azure-kusto-data==4.1.2 +azure-kusto-ingest==4.1.2 \ No newline at end of file From 5e414365c3865f5403647308c23ab7c7d41d93c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20B=C3=B6ttcher?= Date: Wed, 1 Mar 2023 08:37:50 +0100 Subject: [PATCH 03/27] Change ingestion from file to dataframe and start with init() --- simulator/modules/azure_data_explorer.py | 82 +++++++++++++----------- simulator/requirements.txt | 3 +- 2 files changed, 48 insertions(+), 37 deletions(-) diff --git a/simulator/modules/azure_data_explorer.py b/simulator/modules/azure_data_explorer.py index 27e4a1c..329b1cc 100644 --- a/simulator/modules/azure_data_explorer.py +++ b/simulator/modules/azure_data_explorer.py @@ -5,59 +5,69 @@ from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, ReportLevel, \ ReportMethod -# from .config import config TODO: uncomment +import pandas as pd -def test(): - - KUSTO_URI = "https://kvc7rq22b9ye5d4a4fgyas.northeurope.kusto.windows.net" - KUSTO_INGEST_URI = "https://ingest-kvc7rq22b9ye5d4a4fgyas.northeurope.kusto.windows.net" - KUSTO_DATABASE = "ConnTest" - - KCSB_INGEST = KustoConnectionStringBuilder.with_interactive_login(KUSTO_INGEST_URI) - KCSB_DATA = KustoConnectionStringBuilder.with_interactive_login(KUSTO_URI) - - DESTINATION_TABLE = "PopulationDataNew" - DESTINATION_TABLE_COLUMN_MAPPING = "PopulationDataNew_CSV_Mapping" +# from .config import config - KUSTO_CLIENT = KustoClient(KCSB_DATA) - CREATE_TABLE_COMMAND = ".create table PopulationDataNew (State: string, Population: int)" - RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND) - # dataframe_from_result_table(RESPONSE.primary_results[0]) +KUSTO_URI = "https://kvc7rq22b9ye5d4a4fgyas.northeurope.kusto.windows.net" +KUSTO_INGEST_URI = "https://ingest-kvc7rq22b9ye5d4a4fgyas.northeurope.kusto.windows.net" +KUSTO_DATABASE = "ConnTest" - CREATE_MAPPING_COMMAND = """.create table PopulationDataNew ingestion csv mapping 'PopulationDataNew_CSV_Mapping' '[{"Name":"State","datatype":"string","Ordinal":0}, {"Name":"Population","datatype":"int","Ordinal":1}]'""" - KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND) +DESTINATION_TABLE = "PopulationDataNew" +DESTINATION_TABLE_COLUMN_MAPPING = "PopulationDataNew_CSV_Mapping" - INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST) - ingestion_props = IngestionProperties( - database="ConnTest", - table="PopulationDataNew", - data_format=DataFormat.CSV - ) +def test(): + kcsb_ingest = KustoConnectionStringBuilder.with_interactive_login(KUSTO_INGEST_URI) + kcsb_data = KustoConnectionStringBuilder.with_interactive_login(KUSTO_URI) + + kusto_client = KustoClient(kcsb_data) + create_table_command = f".create table {DESTINATION_TABLE} (State: string, Population: int)" + kusto_client.execute_mgmt(KUSTO_DATABASE, create_table_command) + + create_mapping_command = """.create table PopulationDataNew ingestion csv mapping 'PopulationDataNew_CSV_Mapping' '[{"Name":"State","datatype":"string","Ordinal":0}, {"Name":"Population","datatype":"int","Ordinal":1}]'""" + kusto_client.execute_mgmt(KUSTO_DATABASE, create_mapping_command) + + ingestion_client = 
QueuedIngestClient(kcsb_ingest) + ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV, ignore_first_record=True) + # file_descriptor = FileDescriptor("modules/population.csv", 48) + # ingestion_client.ingest_from_file(file_descriptor, ingestion_props) + mapping = {'State': ['Texas', 'New York', 'Arizona'], 'Population': [300, 400, 500]} + dataframe = pd.DataFrame(data=mapping) + ingestion_client.ingest_from_dataframe(dataframe, ingestion_props) + print('Done queuing up ingestion with Azure Data Explorer') - file_descriptor = FileDescriptor("modules/population.csv", 48) # 3333 is the raw size of the data in bytes. - result = INGESTION_CLIENT.ingest_from_file(file_descriptor, ingestion_props) - print(repr(result)) +def _conn(): + return KustoConnectionStringBuilder.with_interactive_login(KUSTO_INGEST_URI) - print('Done queuing up ingestion with Azure Data Explorer') +def init(): + kcsb_data = KustoConnectionStringBuilder.with_interactive_login(KUSTO_URI) + kusto_client = KustoClient(kcsb_data) + if config["use_multiple_tables"]: + table_names = ["events0", "events1", "events2", "events3"] + else: + table_names = ["events"] -def _db(): - # cluster = Cluster() - pass + if config["clean_database"]: + for table_name in ["events0", "events1", "events2", "events3", "events"]: + delete_table_command = f".drop table {table_name}" + kusto_client.execute_mgmt(KUSTO_DATABASE, delete_table_command) + for table_name in table_names: + create_table_command = f".create table {table_name} (timestamp: long, device_id: string, sequence_number: long, temperature: real)" + kusto_client.execute_mgmt(KUSTO_DATABASE, create_table_command) -def init(): - pass + create_mapping_command = f""".create table {table_name} ingestion csv mapping '{table_name}_CSV_Mapping' '[{"Name":"timestamp","datatype":"long","Ordinal":0}, {"Name":"device_id","datatype":"string","Ordinal":1}, {"Name":"sequence_number","datatype":"long","Ordinal":2}, {"Name":"temperature","datatype":"real","Ordinal":3}]'""" + kusto_client.execute_mgmt(KUSTO_DATABASE, create_mapping_command) def prefill_events(events): - _insert_events(events, True, 1000) - + _insert_events(events, True, 1_000) def insert_events(events): batch_mode = config.get("batch_mode", False) - batch_size = config.get("batch_size", 1000) + batch_size = config.get("batch_size", 1_000) _insert_events(events, batch_mode, batch_size) diff --git a/simulator/requirements.txt b/simulator/requirements.txt index 85b6c75..20feaca 100644 --- a/simulator/requirements.txt +++ b/simulator/requirements.txt @@ -6,4 +6,5 @@ cassandra-driver==3.25.0 influxdb-client==1.21.0 elasticsearch==7.15.1 azure-kusto-data==4.1.2 -azure-kusto-ingest==4.1.2 \ No newline at end of file +azure-kusto-ingest==4.1.2 +pandas==1.5.3 \ No newline at end of file From b3954ebf492a95d975313fc5afb15a4264239eb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20B=C3=B6ttcher?= Date: Thu, 2 Mar 2023 00:42:19 +0100 Subject: [PATCH 04/27] Make Dockerfile and config runnable for adx --- deployment/templates/worker.yaml | 3 +- deployment/values.yaml | 6 +-- simulator/Dockerfile | 8 ++-- simulator/main.py | 17 +++----- simulator/modules/azure_data_explorer.py | 54 +++++++++++++++++++++--- simulator/modules/config.py | 3 +- simulator/requirements.txt | 2 +- 7 files changed, 66 insertions(+), 27 deletions(-) diff --git a/deployment/templates/worker.yaml b/deployment/templates/worker.yaml index fad7450..2950b4c 100644 --- a/deployment/templates/worker.yaml +++ 
b/deployment/templates/worker.yaml @@ -39,6 +39,5 @@ spec: fieldPath: metadata.name restartPolicy: Never terminationGracePeriodSeconds: 2 - nodeSelector: -{{ .Values.nodeSelector | toYaml | indent 8 }} + nodeSelector: {{ .Values.nodeSelector | toYaml | indent 8 }} backoffLimit: 0 diff --git a/deployment/values.yaml b/deployment/values.yaml index 16f49a7..9bf78c2 100644 --- a/deployment/values.yaml +++ b/deployment/values.yaml @@ -1,11 +1,11 @@ --- namespace: default -target_module: postgres -run_config: null +target_module: azure_data_explorer +run_config: ewoidGFzayI6ICJpbnNlcnQiLAoidXNlX211bHRpcGxlX3RhYmxlcyI6ICJmYWxzZSIsCiJjbGVhbl9kYXRhYmFzZSI6ICJ0cnVlIgp9 #ewoidGFzayI6ICJpbnNlcnQiLAoidXNlX211bHRpcGxlX3RhYmxlcyI6ICJmYWxzZSIKfQ== #eyJ0YXNrIjogImluc2VydCJ9 workers: 1 image: - name: maibornwolff/database-comparison + name: davidboettcher/database-comparison tag: latest nodeSelector: {} diff --git a/simulator/Dockerfile b/simulator/Dockerfile index 379e5f1..98c84d7 100644 --- a/simulator/Dockerfile +++ b/simulator/Dockerfile @@ -2,8 +2,8 @@ FROM python:3.8-alpine as base FROM base as builder -RUN mkdir /install -RUN apk update && apk add postgresql-dev gcc python3-dev musl-dev +RUN mkdir /install +RUN apk update && apk add postgresql-dev gcc python3-dev musl-dev libc-dev make git libffi-dev openssl-dev libxml2-dev libxslt-dev automake g++ WORKDIR /install COPY requirements.txt /requirements.txt RUN pip install --prefix=/install -r /requirements.txt @@ -12,8 +12,10 @@ FROM base COPY --from=builder /install /usr/local COPY requirements.txt / + RUN pip install -r requirements.txt -RUN apk --no-cache add libpq + +RUN apk --no-cache add libpq libstdc++ ADD . /simulator WORKDIR /simulator CMD ["python", "main.py"] diff --git a/simulator/main.py b/simulator/main.py index 034d3bf..73a8432 100644 --- a/simulator/main.py +++ b/simulator/main.py @@ -1,13 +1,10 @@ import os -import modules.azure_data_explorer as ade -ade.test() +instance_type = os.environ.get("INSTANCE_TYPE") -# instance_type = os.environ.get("INSTANCE_TYPE") -# -# if instance_type == "worker": -# import worker -# worker.run() -# else: -# import collector -# collector.run() +if instance_type == "worker": + import worker + worker.run() +else: + import collector + collector.run() diff --git a/simulator/modules/azure_data_explorer.py b/simulator/modules/azure_data_explorer.py index 329b1cc..da0e928 100644 --- a/simulator/modules/azure_data_explorer.py +++ b/simulator/modules/azure_data_explorer.py @@ -7,7 +7,7 @@ import pandas as pd -# from .config import config +from .config import config KUSTO_URI = "https://kvc7rq22b9ye5d4a4fgyas.northeurope.kusto.windows.net" KUSTO_INGEST_URI = "https://ingest-kvc7rq22b9ye5d4a4fgyas.northeurope.kusto.windows.net" @@ -17,6 +17,7 @@ DESTINATION_TABLE_COLUMN_MAPPING = "PopulationDataNew_CSV_Mapping" + def test(): kcsb_ingest = KustoConnectionStringBuilder.with_interactive_login(KUSTO_INGEST_URI) kcsb_data = KustoConnectionStringBuilder.with_interactive_login(KUSTO_URI) @@ -29,16 +30,18 @@ def test(): kusto_client.execute_mgmt(KUSTO_DATABASE, create_mapping_command) ingestion_client = QueuedIngestClient(kcsb_ingest) - ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV, ignore_first_record=True) - # file_descriptor = FileDescriptor("modules/population.csv", 48) - # ingestion_client.ingest_from_file(file_descriptor, ingestion_props) + ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, 
data_format=DataFormat.CSV, + ignore_first_record=True) mapping = {'State': ['Texas', 'New York', 'Arizona'], 'Population': [300, 400, 500]} dataframe = pd.DataFrame(data=mapping) ingestion_client.ingest_from_dataframe(dataframe, ingestion_props) print('Done queuing up ingestion with Azure Data Explorer') -def _conn(): - return KustoConnectionStringBuilder.with_interactive_login(KUSTO_INGEST_URI) + +def _ingestion_client(): + kcsb_ingest = KustoConnectionStringBuilder.with_interactive_login(KUSTO_INGEST_URI) + return QueuedIngestClient(kcsb_ingest) + def init(): kcsb_data = KustoConnectionStringBuilder.with_interactive_login(KUSTO_URI) @@ -65,13 +68,46 @@ def init(): def prefill_events(events): _insert_events(events, True, 1_000) + def insert_events(events): batch_mode = config.get("batch_mode", False) batch_size = config.get("batch_size", 1_000) _insert_events(events, batch_mode, batch_size) -def _insert_events(events, batch_mode, batch_size, use_values_lists=False): +def _batch_insert(events, batch_size, table_names): + count = 0 + timestamps = [] + device_ids = [] + sequence_numbers = [] + temperatures = [] + for idx, event in enumerate(events): + table = table_names[idx % len(table_names)] + timestamps.append(event.timestamp) + device_ids.append(event.device_id) + sequence_numbers.append(event.sequence_number) + temperatures.append(event.temperature) + count += 1 + if count >= batch_size: + _ingest(table, timestamps, device_ids, sequence_numbers, temperatures) + if count > 0: + _ingest(table, timestamps, device_ids, sequence_numbers, temperatures) + +def _ingest(table, timestamps, device_ids, sequence_numbers, temperatures): + ingestion_client = _ingestion_client() + ingestion_data = {'timestamp': timestamps, 'device_id': device_ids, 'sequence_number': sequence_numbers, + 'temperature': temperatures} + dataframe = pd.DataFrame(data=ingestion_data) + ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=table, data_format=DataFormat.CSV, + ignore_first_record=True) + ingestion_client.ingest_from_dataframe(dataframe, ingestion_props) + + +def _stream_insert(events, table_names): + pass + + +def _insert_events(events, batch_mode, batch_size): print("Connecting to database", flush=True) use_multiple_tables = config["use_multiple_tables"] if use_multiple_tables: @@ -80,6 +116,10 @@ def _insert_events(events, batch_mode, batch_size, use_values_lists=False): table_names = ["events"] print("Inserting events", flush=True) + if batch_mode: + _batch_insert(events, batch_size, table_names) + else: + _stream_insert(events, table_names) def queries(): diff --git a/simulator/modules/config.py b/simulator/modules/config.py index 26d151c..edf5d1f 100644 --- a/simulator/modules/config.py +++ b/simulator/modules/config.py @@ -2,5 +2,6 @@ import json import os - +print(os.getenv("RUN_CONFIG")) +print(base64.b64decode(os.getenv("RUN_CONFIG"))) config = json.loads(base64.b64decode(os.getenv("RUN_CONFIG"))) diff --git a/simulator/requirements.txt b/simulator/requirements.txt index 20feaca..174ea20 100644 --- a/simulator/requirements.txt +++ b/simulator/requirements.txt @@ -1,4 +1,4 @@ -Flask==1.1.2 +Flask==2.2.2 dataclasses-json==0.5.2 psycopg2-binary==2.8.6 python-arango==7.1.0 From 576e970e7044bb0544aafff8ad7a7df42669d3ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20B=C3=B6ttcher?= Date: Wed, 8 Mar 2023 14:27:40 +0100 Subject: [PATCH 05/27] Add remaining azure data explorer logic. 
Stream-Functionality does not work --- config.yaml | 4 +- deployment/templates/collector.yaml | 4 + deployment/templates/worker.yaml | 3 + deployment/values.yaml | 2 +- simulator/modules/azure_data_explorer.py | 190 +++++++++++++++++------ simulator/modules/config.py | 3 + 6 files changed, 157 insertions(+), 49 deletions(-) diff --git a/config.yaml b/config.yaml index 19fb043..b34e7c5 100644 --- a/config.yaml +++ b/config.yaml @@ -58,5 +58,7 @@ targets: endpoint: https://elastic-es-http.default.svc.cluster.local:9200 azure_data_explorer: module: azure_data_explorer - connection_string: "TODO" + kusto_uri: https://adxcompare.westeurope.kusto.windows.net + kusto_ingest_uri: https://ingest-adxcompare.westeurope.kusto.windows.net + kusto_db: ConnTest namespace: default diff --git a/deployment/templates/collector.yaml b/deployment/templates/collector.yaml index 3283e3b..9e39de2 100644 --- a/deployment/templates/collector.yaml +++ b/deployment/templates/collector.yaml @@ -26,6 +26,10 @@ spec: value: "{{ .Values.target_module }}" - name: RUN_CONFIG value: "{{ .Values.run_config }}" + envFrom: + - secretRef: + name: adx-secret + terminationGracePeriodSeconds: 2 nodeSelector: {{ .Values.nodeSelector | toYaml | indent 8 }} diff --git a/deployment/templates/worker.yaml b/deployment/templates/worker.yaml index 2950b4c..9c26f61 100644 --- a/deployment/templates/worker.yaml +++ b/deployment/templates/worker.yaml @@ -37,6 +37,9 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + envFrom: + - secretRef: + name: adx-secret restartPolicy: Never terminationGracePeriodSeconds: 2 nodeSelector: {{ .Values.nodeSelector | toYaml | indent 8 }} diff --git a/deployment/values.yaml b/deployment/values.yaml index 9bf78c2..7133898 100644 --- a/deployment/values.yaml +++ b/deployment/values.yaml @@ -1,7 +1,7 @@ --- namespace: default target_module: azure_data_explorer -run_config: ewoidGFzayI6ICJpbnNlcnQiLAoidXNlX211bHRpcGxlX3RhYmxlcyI6ICJmYWxzZSIsCiJjbGVhbl9kYXRhYmFzZSI6ICJ0cnVlIgp9 #ewoidGFzayI6ICJpbnNlcnQiLAoidXNlX211bHRpcGxlX3RhYmxlcyI6ICJmYWxzZSIKfQ== #eyJ0YXNrIjogImluc2VydCJ9 +run_config: ewoidGFzayI6ICJpbnNlcnQiLAoidXNlX211bHRpcGxlX3RhYmxlcyI6ICJmYWxzZSIsCiJjbGVhbl9kYXRhYmFzZSI6ICJ0cnVlIiwKImt1c3RvX3VyaSI6ICJodHRwczovL2FkeGNvbXBhcmUud2VzdGV1cm9wZS5rdXN0by53aW5kb3dzLm5ldCIsCiJrdXN0b19pbmdlc3RfdXJpIjogImh0dHBzOi8vaW5nZXN0LWFkeGNvbXBhcmUud2VzdGV1cm9wZS5rdXN0by53aW5kb3dzLm5ldCIsCiJrdXN0b19kYiI6ICJDb25uVGVzdCIKfQ== # ewoidGFzayI6ICJpbnNlcnQiLAoidXNlX211bHRpcGxlX3RhYmxlcyI6ICJmYWxzZSIsCiJjbGVhbl9kYXRhYmFzZSI6ICJ0cnVlIgp9 workers: 1 image: diff --git a/simulator/modules/azure_data_explorer.py b/simulator/modules/azure_data_explorer.py index da0e928..28cde97 100644 --- a/simulator/modules/azure_data_explorer.py +++ b/simulator/modules/azure_data_explorer.py @@ -1,52 +1,94 @@ +import io +import itertools +import json +import time + from azure.kusto.data import KustoClient, KustoConnectionStringBuilder -from azure.kusto.data.exceptions import KustoServiceError -from azure.kusto.data.helpers import dataframe_from_result_table +from azure.kusto.data.exceptions import KustoApiError from azure.kusto.data import DataFormat -from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, ReportLevel, \ - ReportMethod +from azure.kusto.ingest import QueuedIngestClient, IngestionProperties import pandas as pd +import os from .config import config -KUSTO_URI = "https://kvc7rq22b9ye5d4a4fgyas.northeurope.kusto.windows.net" -KUSTO_INGEST_URI = 
"https://ingest-kvc7rq22b9ye5d4a4fgyas.northeurope.kusto.windows.net" -KUSTO_DATABASE = "ConnTest" - -DESTINATION_TABLE = "PopulationDataNew" -DESTINATION_TABLE_COLUMN_MAPPING = "PopulationDataNew_CSV_Mapping" - - - -def test(): - kcsb_ingest = KustoConnectionStringBuilder.with_interactive_login(KUSTO_INGEST_URI) - kcsb_data = KustoConnectionStringBuilder.with_interactive_login(KUSTO_URI) - - kusto_client = KustoClient(kcsb_data) - create_table_command = f".create table {DESTINATION_TABLE} (State: string, Population: int)" - kusto_client.execute_mgmt(KUSTO_DATABASE, create_table_command) - - create_mapping_command = """.create table PopulationDataNew ingestion csv mapping 'PopulationDataNew_CSV_Mapping' '[{"Name":"State","datatype":"string","Ordinal":0}, {"Name":"Population","datatype":"int","Ordinal":1}]'""" - kusto_client.execute_mgmt(KUSTO_DATABASE, create_mapping_command) - - ingestion_client = QueuedIngestClient(kcsb_ingest) - ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV, - ignore_first_record=True) - mapping = {'State': ['Texas', 'New York', 'Arizona'], 'Population': [300, 400, 500]} - dataframe = pd.DataFrame(data=mapping) - ingestion_client.ingest_from_dataframe(dataframe, ingestion_props) - print('Done queuing up ingestion with Azure Data Explorer') +AAD_APP_ID = os.getenv("adx_aad_app_id") +APP_KEY = os.getenv("adx_app_key") +AUTHORITY_ID = os.getenv("adx_authority_id") + +print(AAD_APP_ID) +print(APP_KEY) +print(AUTHORITY_ID) + +KUSTO_URI = config["kusto_uri"] +KUSTO_INGEST_URI = config["kusto_ingest_uri"] +KUSTO_DATABASE = config["kusto_db"] + + +# DESTINATION_TABLE = "PopulationDataNew" +# DESTINATION_TABLE_COLUMN_MAPPING = "PopulationDataNew_CSV_Mapping" + + +# def test(): +# #interactive +# # kcsb_ingest = KustoConnectionStringBuilder.with_interactive_login(KUSTO_INGEST_URI) +# # kcsb_data = KustoConnectionStringBuilder.with_interactive_login(KUSTO_URI) +# #secure +# kcsb_ingest = KustoConnectionStringBuilder.with_aad_application_key_authentication(KUSTO_INGEST_URI, AAD_APP_ID, APP_KEY, AUTHORITY_ID) +# kcsb_data = KustoConnectionStringBuilder.with_aad_application_key_authentication(KUSTO_URI, AAD_APP_ID, APP_KEY, AUTHORITY_ID) +# +# kusto_client = KustoClient(kcsb_data) +# create_table_command = f".create table {DESTINATION_TABLE} (State: string, Population: int)" +# kusto_client.execute_mgmt("ConnTest", create_table_command) +# +# create_mapping_command = """.create table PopulationDataNew ingestion csv mapping 'PopulationDataNew_CSV_Mapping' '[{"Name":"State","datatype":"string","Ordinal":0}, {"Name":"Population","datatype":"int","Ordinal":1}]'""" +# kusto_client.execute_mgmt("ConnTest", create_mapping_command) +# +# ingestion_client = QueuedIngestClient(kcsb_ingest) +# ingestion_props = IngestionProperties(database="ConnTest", table=DESTINATION_TABLE, data_format=DataFormat.CSV, +# ignore_first_record=True) +# mapping = {'State': ['Texas', 'New York', 'Arizona'], 'Population': [300, 400, 500]} +# dataframe = pd.DataFrame(data=mapping) +# ingestion_result = ingestion_client.ingest_from_dataframe(dataframe, ingestion_props) +# print('Done queuing up ingestion with Azure Data Explorer') +# print(f"Ingestion_result: {ingestion_result}") +# import time +# time.sleep(50) +# sample_query = "PopulationDataNew | summarize max(Population), min(Population), avg(Population) by State" +# sample_response = kusto_client.execute("ConnTest", sample_query) +# for row in sample_response.primary_results[0]: +# # printing 
specific columns by index +# print("value at 0 {}".format(row[0])) +# print("\n") +# # printing specific columns by name +# print("EventType:{}".format(row["State"])) +# print("EventType:{}".format(row["max_Population"])) +# print("EventType:{}".format(row["min_Population"])) +# print("EventType:{}".format(row["avg_Population"])) +# try: +# delete_table_command = f".drop table PopulationDataNew2" +# kusto_client.execute_mgmt(KUSTO_DATABASE, delete_table_command) +# except KustoApiError: +# print("Could not delete table. Table was probably not found") +# delete_table_command = f".drop table PopulationDataNew" +# kusto_client.execute_mgmt(KUSTO_DATABASE, delete_table_command) def _ingestion_client(): - kcsb_ingest = KustoConnectionStringBuilder.with_interactive_login(KUSTO_INGEST_URI) + kcsb_ingest = KustoConnectionStringBuilder.with_aad_application_key_authentication(KUSTO_INGEST_URI, AAD_APP_ID, + APP_KEY, AUTHORITY_ID) return QueuedIngestClient(kcsb_ingest) -def init(): - kcsb_data = KustoConnectionStringBuilder.with_interactive_login(KUSTO_URI) - kusto_client = KustoClient(kcsb_data) +def _kusto_client(): + kcsb_data = KustoConnectionStringBuilder.with_aad_application_key_authentication(KUSTO_URI, AAD_APP_ID, APP_KEY, + AUTHORITY_ID) + return KustoClient(kcsb_data) + +def init(): + kusto_client = _kusto_client() if config["use_multiple_tables"]: table_names = ["events0", "events1", "events2", "events3"] else: @@ -54,14 +96,20 @@ def init(): if config["clean_database"]: for table_name in ["events0", "events1", "events2", "events3", "events"]: - delete_table_command = f".drop table {table_name}" - kusto_client.execute_mgmt(KUSTO_DATABASE, delete_table_command) - + try: + delete_table_command = f".drop table {table_name}" + kusto_client.execute_mgmt(KUSTO_DATABASE, delete_table_command) + except KustoApiError: + print(f"Could not delete table. 
Table '{table_name}' was probably not found") for table_name in table_names: create_table_command = f".create table {table_name} (timestamp: long, device_id: string, sequence_number: long, temperature: real)" kusto_client.execute_mgmt(KUSTO_DATABASE, create_table_command) - create_mapping_command = f""".create table {table_name} ingestion csv mapping '{table_name}_CSV_Mapping' '[{"Name":"timestamp","datatype":"long","Ordinal":0}, {"Name":"device_id","datatype":"string","Ordinal":1}, {"Name":"sequence_number","datatype":"long","Ordinal":2}, {"Name":"temperature","datatype":"real","Ordinal":3}]'""" + print(f"Enable streaming for {table_name}") + enable_streaming_command = f".alter table {table_name} policy streamingingestion enable" + kusto_client.execute_mgmt(KUSTO_DATABASE, enable_streaming_command) + + create_mapping_command = f""".create table {table_name} ingestion csv mapping '{table_name}_CSV_Mapping' '[{{"Name":"timestamp","datatype":"long","Ordinal":0}}, {{"Name":"device_id","datatype":"string","Ordinal":1}}, {{"Name":"sequence_number","datatype":"long","Ordinal":2}}, {{"Name":"temperature","datatype":"real","Ordinal":3}}]'""" kusto_client.execute_mgmt(KUSTO_DATABASE, create_mapping_command) @@ -82,16 +130,41 @@ def _batch_insert(events, batch_size, table_names): sequence_numbers = [] temperatures = [] for idx, event in enumerate(events): - table = table_names[idx % len(table_names)] timestamps.append(event.timestamp) device_ids.append(event.device_id) sequence_numbers.append(event.sequence_number) temperatures.append(event.temperature) count += 1 if count >= batch_size: + table = table_names[int(idx / batch_size) % len(table_names)] + print(f"Insert {count} entries into {table}") _ingest(table, timestamps, device_ids, sequence_numbers, temperatures) + timestamps.clear() + device_ids.clear() + sequence_numbers.clear() + temperatures.clear() + count = 0 if count > 0: - _ingest(table, timestamps, device_ids, sequence_numbers, temperatures) + print(f"Insert {count} entries into {table_names[0]}") + _ingest(table_names[0], timestamps, device_ids, sequence_numbers, temperatures) + + +def _stream_insert(events, table_names): + number_of_tables = len(table_names) + number_of_inserts = int(config["num_inserts"]) + inserts_per_table = number_of_inserts // number_of_tables + for table in table_names: + buffered_io = io.BytesIO() + for event in itertools.islice(events, inserts_per_table): + buffered_io.write(json.dumps(event.__dict__).encode('utf-8')) + # (buffered_io.write(json.dumps(event.__dict__).encode('utf-8')) for event in itertools.islice(events, inserts_per_table)) + print(f"Ingest {inserts_per_table} into {table}") + print(f"Bytes: {buffered_io.getbuffer().nbytes}") + ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=table, + ignore_first_record=False) + result = _ingestion_client().ingest_from_stream(buffered_io, ingestion_props) + print(result) + def _ingest(table, timestamps, device_ids, sequence_numbers, temperatures): ingestion_client = _ingestion_client() @@ -100,11 +173,8 @@ def _ingest(table, timestamps, device_ids, sequence_numbers, temperatures): dataframe = pd.DataFrame(data=ingestion_data) ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=table, data_format=DataFormat.CSV, ignore_first_record=True) - ingestion_client.ingest_from_dataframe(dataframe, ingestion_props) - - -def _stream_insert(events, table_names): - pass + result = ingestion_client.ingest_from_dataframe(dataframe, ingestion_props) + print(result) def 
_insert_events(events, batch_mode, batch_size): @@ -122,5 +192,31 @@ def _insert_events(events, batch_mode, batch_size): _stream_insert(events, table_names) +_queries = { + "count-events": "events | count", + "temperature-min-max": "events| summarize max(temperature), min(temperature)", + "temperature-stats": "events| summarize max(temperature), avg(temperature), min(temperature)", + "temperature-stats-per-device": "events | summarize max(temperature), avg(temperature), min(temperature) by device_id", + "newest-per-device": "events | partition by device_id (top 1 by timestamp desc | project device_id, temperature)", +} + + def queries(): - db = _db() + if "queries" in config: + included = config["queries"].split(",") + for key in list(_queries.keys()): + if key not in included: + del _queries[key] + print(_queries) + query_times = dict([(name, []) for name in _queries.keys()]) + for _ in range(0, int(config["runs"])): + for name, query in _queries.items(): + print(f"Executing query {name}", flush=True) + start = time.time() + result = _kusto_client().execute(KUSTO_DATABASE, query) + print(result) + duration = time.time() - start + print(f"Finished query. Duration: {duration}", flush=True) + query_times[name].append(duration) + + return query_times diff --git a/simulator/modules/config.py b/simulator/modules/config.py index edf5d1f..0c4db16 100644 --- a/simulator/modules/config.py +++ b/simulator/modules/config.py @@ -5,3 +5,6 @@ print(os.getenv("RUN_CONFIG")) print(base64.b64decode(os.getenv("RUN_CONFIG"))) config = json.loads(base64.b64decode(os.getenv("RUN_CONFIG"))) + +config = json.loads(base64.b64decode("ewoidGFzayI6ICJpbnNlcnQiLAoicXVlcmllcyI6ImNvdW50LWV2ZW50cyx0ZW1wZXJhdHVyZS1taW4tbWF4LHRlbXBlcmF0dXJlLXN0YXRzLHRlbXBlcmF0dXJlLXN0YXRzLXBlci1kZXZpY2UsbmV3ZXN0LXBlci1kZXZpY2UiLAoicnVucyI6ICIxIiwKIm51bV9pbnNlcnRzIjogIjEwODAiLAoidXNlX211bHRpcGxlX3RhYmxlcyI6IGZhbHNlLAoiY2xlYW5fZGF0YWJhc2UiOiB0cnVlLAoicHJlZmlsbCI6ICIwIiwKImt1c3RvX3VyaSI6ICJodHRwczovL2FkeGNvbXBhcmUud2VzdGV1cm9wZS5rdXN0by53aW5kb3dzLm5ldCIsCiJrdXN0b19pbmdlc3RfdXJpIjogImh0dHBzOi8vaW5nZXN0LWFkeGNvbXBhcmUud2VzdGV1cm9wZS5rdXN0by53aW5kb3dzLm5ldCIsCiJrdXN0b19kYiI6ICJDb25uVGVzdCIKfQ==")) +print(config) \ No newline at end of file From 3cca2dc55167085df303ee2482fc0b1140d1cbf4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20B=C3=B6ttcher?= Date: Tue, 14 Mar 2023 23:44:22 +0100 Subject: [PATCH 06/27] Improve error handling and add first main.tf --- config.yaml | 2 +- dbinstall/main.tf | 36 +++++ deployment/values.yaml | 2 +- simulator/main.py | 1 + simulator/modules/azure_data_explorer.py | 160 ++++++++++++----------- simulator/modules/config.py | 2 - 6 files changed, 125 insertions(+), 78 deletions(-) create mode 100644 dbinstall/main.tf diff --git a/config.yaml b/config.yaml index b34e7c5..1e08559 100644 --- a/config.yaml +++ b/config.yaml @@ -60,5 +60,5 @@ targets: module: azure_data_explorer kusto_uri: https://adxcompare.westeurope.kusto.windows.net kusto_ingest_uri: https://ingest-adxcompare.westeurope.kusto.windows.net - kusto_db: ConnTest + kusto_db: SampleDB namespace: default diff --git a/dbinstall/main.tf b/dbinstall/main.tf new file mode 100644 index 0000000..7935fd1 --- /dev/null +++ b/dbinstall/main.tf @@ -0,0 +1,36 @@ +terraform { + required_providers { + azurerm = { + source = "hashicorp/azurerm" + version = "~> 3.0.2" + } + } + + required_version = ">= 1.1.0" +} + +provider "azurerm" { + features {} +} + + +resource "azurerm_resource_group" “rg-compare” { + name = "db-performance-comparison" + location = "West Europe" +} 
+ +resource "azurerm_kusto_cluster" "example" { + name = "adxcompare" + location = azurerm_resource_group.rg-compare.location + resource_group_name = azurerm_resource_group.rg-compare.name + + sku { + name = "Standard_D13_v2" + capacity = 2 + } + + tags = { + cost center = "IOT", + environment = "iot-lab" + } +} \ No newline at end of file diff --git a/deployment/values.yaml b/deployment/values.yaml index 7133898..9b7ebb8 100644 --- a/deployment/values.yaml +++ b/deployment/values.yaml @@ -1,7 +1,7 @@ --- namespace: default target_module: azure_data_explorer -run_config: ewoidGFzayI6ICJpbnNlcnQiLAoidXNlX211bHRpcGxlX3RhYmxlcyI6ICJmYWxzZSIsCiJjbGVhbl9kYXRhYmFzZSI6ICJ0cnVlIiwKImt1c3RvX3VyaSI6ICJodHRwczovL2FkeGNvbXBhcmUud2VzdGV1cm9wZS5rdXN0by53aW5kb3dzLm5ldCIsCiJrdXN0b19pbmdlc3RfdXJpIjogImh0dHBzOi8vaW5nZXN0LWFkeGNvbXBhcmUud2VzdGV1cm9wZS5rdXN0by53aW5kb3dzLm5ldCIsCiJrdXN0b19kYiI6ICJDb25uVGVzdCIKfQ== # ewoidGFzayI6ICJpbnNlcnQiLAoidXNlX211bHRpcGxlX3RhYmxlcyI6ICJmYWxzZSIsCiJjbGVhbl9kYXRhYmFzZSI6ICJ0cnVlIgp9 +run_config: ewoidGFzayI6ICJpbnNlcnQiLAoidXNlX211bHRpcGxlX3RhYmxlcyI6IHRydWUsCiJjbGVhbl9kYXRhYmFzZSI6IGZhbHNlLAoiYmF0Y2hfbW9kZSI6IHRydWUsCiJwcmVmaWxsIjogMTAwMDAsCiJydW5zIjogMSwKIm51bV9pbnNlcnRzIjogMTAwMDAsCiJrdXN0b191cmkiOiAiaHR0cHM6Ly9hZHhjb21wYXJlLndlc3RldXJvcGUua3VzdG8ud2luZG93cy5uZXQiLAoia3VzdG9faW5nZXN0X3VyaSI6ICJodHRwczovL2luZ2VzdC1hZHhjb21wYXJlLndlc3RldXJvcGUua3VzdG8ud2luZG93cy5uZXQiLAoia3VzdG9fZGIiOiAiU2FtcGxlREIiCn0= workers: 1 image: diff --git a/simulator/main.py b/simulator/main.py index 73a8432..7925d6d 100644 --- a/simulator/main.py +++ b/simulator/main.py @@ -1,5 +1,6 @@ import os +print("v1") instance_type = os.environ.get("INSTANCE_TYPE") if instance_type == "worker": diff --git a/simulator/modules/azure_data_explorer.py b/simulator/modules/azure_data_explorer.py index 28cde97..f9fd5e8 100644 --- a/simulator/modules/azure_data_explorer.py +++ b/simulator/modules/azure_data_explorer.py @@ -10,7 +10,6 @@ import pandas as pd import os - from .config import config AAD_APP_ID = os.getenv("adx_aad_app_id") @@ -25,34 +24,33 @@ KUSTO_INGEST_URI = config["kusto_ingest_uri"] KUSTO_DATABASE = config["kusto_db"] +print(KUSTO_URI) +print(KUSTO_INGEST_URI) +print(KUSTO_DATABASE) -# DESTINATION_TABLE = "PopulationDataNew" -# DESTINATION_TABLE_COLUMN_MAPPING = "PopulationDataNew_CSV_Mapping" +def test(): + # #interactive + # # kcsb_ingest = KustoConnectionStringBuilder.with_interactive_login(KUSTO_INGEST_URI) + # # kcsb_data = KustoConnectionStringBuilder.with_interactive_login(KUSTO_URI) + # #secure + kcsb_ingest = KustoConnectionStringBuilder.with_aad_application_key_authentication("https://ingest-adxcompare.westeurope.kusto.windows.net", AAD_APP_ID, APP_KEY, AUTHORITY_ID) + kcsb_data = KustoConnectionStringBuilder.with_aad_application_key_authentication("https://adxcompare.westeurope.kusto.windows.net", AAD_APP_ID, APP_KEY, AUTHORITY_ID) + kusto_client = KustoClient(kcsb_data) + create_table_command = f".create table PopulationTable (State: string, Population: int)" + kusto_client.execute_mgmt("SampleDB", create_table_command) -# def test(): -# #interactive -# # kcsb_ingest = KustoConnectionStringBuilder.with_interactive_login(KUSTO_INGEST_URI) -# # kcsb_data = KustoConnectionStringBuilder.with_interactive_login(KUSTO_URI) -# #secure -# kcsb_ingest = KustoConnectionStringBuilder.with_aad_application_key_authentication(KUSTO_INGEST_URI, AAD_APP_ID, APP_KEY, AUTHORITY_ID) -# kcsb_data = KustoConnectionStringBuilder.with_aad_application_key_authentication(KUSTO_URI, AAD_APP_ID, APP_KEY, 
AUTHORITY_ID) -# -# kusto_client = KustoClient(kcsb_data) -# create_table_command = f".create table {DESTINATION_TABLE} (State: string, Population: int)" -# kusto_client.execute_mgmt("ConnTest", create_table_command) -# -# create_mapping_command = """.create table PopulationDataNew ingestion csv mapping 'PopulationDataNew_CSV_Mapping' '[{"Name":"State","datatype":"string","Ordinal":0}, {"Name":"Population","datatype":"int","Ordinal":1}]'""" -# kusto_client.execute_mgmt("ConnTest", create_mapping_command) -# -# ingestion_client = QueuedIngestClient(kcsb_ingest) -# ingestion_props = IngestionProperties(database="ConnTest", table=DESTINATION_TABLE, data_format=DataFormat.CSV, -# ignore_first_record=True) -# mapping = {'State': ['Texas', 'New York', 'Arizona'], 'Population': [300, 400, 500]} -# dataframe = pd.DataFrame(data=mapping) -# ingestion_result = ingestion_client.ingest_from_dataframe(dataframe, ingestion_props) -# print('Done queuing up ingestion with Azure Data Explorer') -# print(f"Ingestion_result: {ingestion_result}") + create_mapping_command = """.create table PopulationTable ingestion csv mapping 'PopulationDataNew_CSV_Mapping' '[{"Name":"State","datatype":"string","Ordinal":0}, {"Name":"Population","datatype":"int","Ordinal":1}]'""" + kusto_client.execute_mgmt("SampleDB", create_mapping_command) + + ingestion_client = QueuedIngestClient(kcsb_ingest) + ingestion_props = IngestionProperties(database="SampleDB", table="PopulationTable", data_format=DataFormat.CSV, + ignore_first_record=True) + mapping = {'State': ['Texas', 'New York', 'Arizona'], 'Population': [300, 400, 500]} + dataframe = pd.DataFrame(data=mapping) + ingestion_result = ingestion_client.ingest_from_dataframe(dataframe, ingestion_props) + print('Done queuing up ingestion with Azure Data Explorer') + print(f"Ingestion_result: {ingestion_result}") # import time # time.sleep(50) # sample_query = "PopulationDataNew | summarize max(Population), min(Population), avg(Population) by State" @@ -75,42 +73,54 @@ # kusto_client.execute_mgmt(KUSTO_DATABASE, delete_table_command) + + +# +# +# DESTINATION_TABLE = "PopulationDataNew" +# DESTINATION_TABLE_COLUMN_MAPPING = "PopulationDataNew_CSV_Mapping" + + + def _ingestion_client(): - kcsb_ingest = KustoConnectionStringBuilder.with_aad_application_key_authentication(KUSTO_INGEST_URI, AAD_APP_ID, - APP_KEY, AUTHORITY_ID) + kcsb_ingest = KustoConnectionStringBuilder.with_aad_application_key_authentication(KUSTO_INGEST_URI, AAD_APP_ID, APP_KEY, AUTHORITY_ID) return QueuedIngestClient(kcsb_ingest) def _kusto_client(): - kcsb_data = KustoConnectionStringBuilder.with_aad_application_key_authentication(KUSTO_URI, AAD_APP_ID, APP_KEY, - AUTHORITY_ID) + kcsb_data = KustoConnectionStringBuilder.with_aad_application_key_authentication(KUSTO_URI, AAD_APP_ID, APP_KEY, AUTHORITY_ID) return KustoClient(kcsb_data) def init(): - kusto_client = _kusto_client() if config["use_multiple_tables"]: table_names = ["events0", "events1", "events2", "events3"] else: table_names = ["events"] - - if config["clean_database"]: - for table_name in ["events0", "events1", "events2", "events3", "events"]: - try: - delete_table_command = f".drop table {table_name}" - kusto_client.execute_mgmt(KUSTO_DATABASE, delete_table_command) - except KustoApiError: - print(f"Could not delete table. 
Table '{table_name}' was probably not found") - for table_name in table_names: - create_table_command = f".create table {table_name} (timestamp: long, device_id: string, sequence_number: long, temperature: real)" - kusto_client.execute_mgmt(KUSTO_DATABASE, create_table_command) - - print(f"Enable streaming for {table_name}") - enable_streaming_command = f".alter table {table_name} policy streamingingestion enable" - kusto_client.execute_mgmt(KUSTO_DATABASE, enable_streaming_command) - - create_mapping_command = f""".create table {table_name} ingestion csv mapping '{table_name}_CSV_Mapping' '[{{"Name":"timestamp","datatype":"long","Ordinal":0}}, {{"Name":"device_id","datatype":"string","Ordinal":1}}, {{"Name":"sequence_number","datatype":"long","Ordinal":2}}, {{"Name":"temperature","datatype":"real","Ordinal":3}}]'""" - kusto_client.execute_mgmt(KUSTO_DATABASE, create_mapping_command) + with _kusto_client() as kusto_client: + response = kusto_client.execute(KUSTO_DATABASE, f""".show tables | where DatabaseName == "{KUSTO_DATABASE}" """) + existing_tables = [row[0] for row in response.primary_results[0]] + print(f"Following tables already exist: {existing_tables}") + if config["clean_database"]: + for table_name in existing_tables: + try: + print(f"Delete table {table_name}") + delete_table_command = f".drop table {table_name}" + kusto_client.execute_mgmt(KUSTO_DATABASE, delete_table_command) + except KustoApiError as error: + print(f"Could not delete table, due to:\n {error}") + table_names = [table_name for table_name in table_names if table_name not in existing_tables] + for table_name in table_names: + print(f"Create table {table_name}") + create_table_command = f".create table {table_name} (timestamp: long, device_id: string, sequence_number: long, temperature: real)" + kusto_client.execute_mgmt(KUSTO_DATABASE, create_table_command) + + # print(f"Enable streaming for {table_name}") + # enable_streaming_command = f".alter table {table_name} policy streamingingestion enable" + # kusto_client.execute_mgmt(KUSTO_DATABASE, enable_streaming_command) + + create_mapping_command = f""".create table {table_name} ingestion csv mapping '{table_name}_CSV_Mapping' '[{{"Name":"timestamp","datatype":"long","Ordinal":0}}, {{"Name":"device_id","datatype":"string","Ordinal":1}}, {{"Name":"sequence_number","datatype":"long","Ordinal":2}}, {{"Name":"temperature","datatype":"real","Ordinal":3}}]'""" + kusto_client.execute_mgmt(KUSTO_DATABASE, create_mapping_command) def prefill_events(events): @@ -154,27 +164,28 @@ def _stream_insert(events, table_names): number_of_inserts = int(config["num_inserts"]) inserts_per_table = number_of_inserts // number_of_tables for table in table_names: - buffered_io = io.BytesIO() - for event in itertools.islice(events, inserts_per_table): - buffered_io.write(json.dumps(event.__dict__).encode('utf-8')) - # (buffered_io.write(json.dumps(event.__dict__).encode('utf-8')) for event in itertools.islice(events, inserts_per_table)) - print(f"Ingest {inserts_per_table} into {table}") - print(f"Bytes: {buffered_io.getbuffer().nbytes}") - ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=table, - ignore_first_record=False) - result = _ingestion_client().ingest_from_stream(buffered_io, ingestion_props) - print(result) + with _ingestion_client() as ingestion_client: + buffered_io = io.BytesIO() + for event in itertools.islice(events, inserts_per_table): + buffered_io.write(json.dumps(event.__dict__).encode('utf-8')) + # 
(buffered_io.write(json.dumps(event.__dict__).encode('utf-8')) for event in itertools.islice(events, inserts_per_table)) + print(f"Ingest {inserts_per_table} into {table}") + print(f"Bytes: {buffered_io.getbuffer().nbytes}") + ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=table, + ignore_first_record=False) + result = ingestion_client.ingest_from_stream(buffered_io, ingestion_props) + print(result) def _ingest(table, timestamps, device_ids, sequence_numbers, temperatures): - ingestion_client = _ingestion_client() - ingestion_data = {'timestamp': timestamps, 'device_id': device_ids, 'sequence_number': sequence_numbers, - 'temperature': temperatures} - dataframe = pd.DataFrame(data=ingestion_data) - ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=table, data_format=DataFormat.CSV, - ignore_first_record=True) - result = ingestion_client.ingest_from_dataframe(dataframe, ingestion_props) - print(result) + with _ingestion_client() as ingestion_client: + ingestion_data = {'timestamp': timestamps, 'device_id': device_ids, 'sequence_number': sequence_numbers, + 'temperature': temperatures} + dataframe = pd.DataFrame(data=ingestion_data) + ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=table, data_format=DataFormat.CSV, + ignore_first_record=True) + result = ingestion_client.ingest_from_dataframe(dataframe, ingestion_props) + print(result) def _insert_events(events, batch_mode, batch_size): @@ -211,12 +222,13 @@ def queries(): query_times = dict([(name, []) for name in _queries.keys()]) for _ in range(0, int(config["runs"])): for name, query in _queries.items(): - print(f"Executing query {name}", flush=True) - start = time.time() - result = _kusto_client().execute(KUSTO_DATABASE, query) - print(result) - duration = time.time() - start - print(f"Finished query. Duration: {duration}", flush=True) - query_times[name].append(duration) + with _kusto_client() as kusto_client: + print(f"Executing query {name}", flush=True) + start = time.time() + result = kusto_client.execute(KUSTO_DATABASE, query) + print(result) + duration = time.time() - start + print(f"Finished query. 
Duration: {duration}", flush=True) + query_times[name].append(duration) return query_times diff --git a/simulator/modules/config.py b/simulator/modules/config.py index 0c4db16..ff66817 100644 --- a/simulator/modules/config.py +++ b/simulator/modules/config.py @@ -5,6 +5,4 @@ print(os.getenv("RUN_CONFIG")) print(base64.b64decode(os.getenv("RUN_CONFIG"))) config = json.loads(base64.b64decode(os.getenv("RUN_CONFIG"))) - -config = json.loads(base64.b64decode("ewoidGFzayI6ICJpbnNlcnQiLAoicXVlcmllcyI6ImNvdW50LWV2ZW50cyx0ZW1wZXJhdHVyZS1taW4tbWF4LHRlbXBlcmF0dXJlLXN0YXRzLHRlbXBlcmF0dXJlLXN0YXRzLXBlci1kZXZpY2UsbmV3ZXN0LXBlci1kZXZpY2UiLAoicnVucyI6ICIxIiwKIm51bV9pbnNlcnRzIjogIjEwODAiLAoidXNlX211bHRpcGxlX3RhYmxlcyI6IGZhbHNlLAoiY2xlYW5fZGF0YWJhc2UiOiB0cnVlLAoicHJlZmlsbCI6ICIwIiwKImt1c3RvX3VyaSI6ICJodHRwczovL2FkeGNvbXBhcmUud2VzdGV1cm9wZS5rdXN0by53aW5kb3dzLm5ldCIsCiJrdXN0b19pbmdlc3RfdXJpIjogImh0dHBzOi8vaW5nZXN0LWFkeGNvbXBhcmUud2VzdGV1cm9wZS5rdXN0by53aW5kb3dzLm5ldCIsCiJrdXN0b19kYiI6ICJDb25uVGVzdCIKfQ==")) print(config) \ No newline at end of file From 684fed62cc344f2b09afcb6fc9fa8bf36d46cb59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20B=C3=B6ttcher?= Date: Sat, 18 Mar 2023 10:23:58 +0100 Subject: [PATCH 07/27] Improve terraform --- dbinstall/azure_data_explorer/main.tf | 85 +++++++++++++++++++++++++++ dbinstall/main.tf | 36 ------------ 2 files changed, 85 insertions(+), 36 deletions(-) create mode 100644 dbinstall/azure_data_explorer/main.tf delete mode 100644 dbinstall/main.tf diff --git a/dbinstall/azure_data_explorer/main.tf b/dbinstall/azure_data_explorer/main.tf new file mode 100644 index 0000000..6f28d2d --- /dev/null +++ b/dbinstall/azure_data_explorer/main.tf @@ -0,0 +1,85 @@ +terraform { + required_providers { + azurerm = { + source = "hashicorp/azurerm" + version = "~> 3.0.2" + } + } + + required_version = ">= 1.1.0" +} + +provider "azurerm" { + features {} +} + + +resource "azurerm_resource_group" "rg-compare" { + name = "db-performance-comparison2" + location = "West Europe" + + tags = { + "cost center" = "IOT", + environment = "iot-lab" + } +} + +resource "azurerm_kusto_cluster" "adxcompare" { + name = "adxcompare" + location = azurerm_resource_group.rg-compare.location + resource_group_name = azurerm_resource_group.rg-compare.name + streaming_ingestion_enabled = true + + sku { + name = "Dev(No SLA)_Standard_E2a_v4" + capacity = 1 + } + + tags = { + "cost center" = "IOT", + environment = "iot-lab" + } +} + +resource "azurerm_kusto_database" "sample-db" { + name = "SampleDB" + resource_group_name = azurerm_resource_group.rg-compare.name + location = azurerm_resource_group.rg-compare.location + cluster_name = azurerm_kusto_cluster.adxcompare.name + + hot_cache_period = "P1D" + soft_delete_period = "P1D" +} + +data "azurerm_client_config" "current" { +} + + +data "azuread_service_principal" "service-principle" { + display_name = "mw_iot_ADX-DB-Comparison" +} + +/* +output "tenant_id" { + value = data.azurerm_client_config.current.tenant_id +} + +output "principal_id" { + value = data.azuread_service_principal.service-principle.id +} + +output "principal_id2" { + value = data.azurerm_client_config.current.client_id +} +*/ +resource "azurerm_kusto_database_principal_assignment" "ad-permission" { + name = "AD-Permission" + resource_group_name = azurerm_resource_group.rg-compare.name + cluster_name = azurerm_kusto_cluster.adxcompare.name + database_name = azurerm_kusto_database.sample-db.name + + tenant_id = data.azurerm_client_config.current.tenant_id + principal_id = 
data.azuread_service_principal.service-principle.id + principal_type = "App" + role = "Admin" +} \ No newline at end of file diff --git a/dbinstall/main.tf b/dbinstall/main.tf deleted file mode 100644 index 7935fd1..0000000 --- a/dbinstall/main.tf +++ /dev/null @@ -1,36 +0,0 @@ -terraform { - required_providers { - azurerm = { - source = "hashicorp/azurerm" - version = "~> 3.0.2" - } - } - - required_version = ">= 1.1.0" -} - -provider "azurerm" { - features {} -} - - -resource "azurerm_resource_group" “rg-compare” { - name = "db-performance-comparison" - location = "West Europe" -} - -resource "azurerm_kusto_cluster" "example" { - name = "adxcompare" - location = azurerm_resource_group.rg-compare.location - resource_group_name = azurerm_resource_group.rg-compare.name - - sku { - name = "Standard_D13_v2" - capacity = 2 - } - - tags = { - cost center = "IOT", - environment = "iot-lab" - } -} \ No newline at end of file From 28ac758a0dd42d4adecf83f4a756786d4aeeac8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20B=C3=B6ttcher?= Date: Sat, 18 Mar 2023 15:35:28 +0100 Subject: [PATCH 08/27] Improve stream ingestion (still not working) --- deployment/values.yaml | 2 +- simulator/modules/azure_data_explorer.py | 37 +++++++++++++++--------- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/deployment/values.yaml b/deployment/values.yaml index 9b7ebb8..d27cd95 100644 --- a/deployment/values.yaml +++ b/deployment/values.yaml @@ -1,7 +1,7 @@ --- namespace: default target_module: azure_data_explorer -run_config: ewoidGFzayI6ICJpbnNlcnQiLAoidXNlX211bHRpcGxlX3RhYmxlcyI6IHRydWUsCiJjbGVhbl9kYXRhYmFzZSI6IGZhbHNlLAoiYmF0Y2hfbW9kZSI6IHRydWUsCiJwcmVmaWxsIjogMTAwMDAsCiJydW5zIjogMSwKIm51bV9pbnNlcnRzIjogMTAwMDAsCiJrdXN0b191cmkiOiAiaHR0cHM6Ly9hZHhjb21wYXJlLndlc3RldXJvcGUua3VzdG8ud2luZG93cy5uZXQiLAoia3VzdG9faW5nZXN0X3VyaSI6ICJodHRwczovL2luZ2VzdC1hZHhjb21wYXJlLndlc3RldXJvcGUua3VzdG8ud2luZG93cy5uZXQiLAoia3VzdG9fZGIiOiAiU2FtcGxlREIiCn0= +run_config: ewoidGFzayI6ICJpbnNlcnQiLAoidXNlX211bHRpcGxlX3RhYmxlcyI6IHRydWUsCiJjbGVhbl9kYXRhYmFzZSI6IHRydWUsCiJiYXRjaF9tb2RlIjogZmFsc2UsCiJwcmVmaWxsIjogMCwKInJ1bnMiOiAxLAoibnVtX2luc2VydHMiOiAxMDAwMCwKImt1c3RvX3VyaSI6ICJodHRwczovL2FkeGNvbXBhcmUud2VzdGV1cm9wZS5rdXN0by53aW5kb3dzLm5ldCIsCiJrdXN0b19pbmdlc3RfdXJpIjogImh0dHBzOi8vaW5nZXN0LWFkeGNvbXBhcmUud2VzdGV1cm9wZS5rdXN0by53aW5kb3dzLm5ldCIsCiJrdXN0b19kYiI6ICJTYW1wbGVEQiIKfQ== workers: 1 image: diff --git a/simulator/modules/azure_data_explorer.py b/simulator/modules/azure_data_explorer.py index f9fd5e8..5941045 100644 --- a/simulator/modules/azure_data_explorer.py +++ b/simulator/modules/azure_data_explorer.py @@ -1,12 +1,13 @@ import io import itertools import json +import pickle import time from azure.kusto.data import KustoClient, KustoConnectionStringBuilder from azure.kusto.data.exceptions import KustoApiError from azure.kusto.data import DataFormat -from azure.kusto.ingest import QueuedIngestClient, IngestionProperties +from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, StreamDescriptor import pandas as pd import os @@ -109,15 +110,17 @@ def init(): kusto_client.execute_mgmt(KUSTO_DATABASE, delete_table_command) except KustoApiError as error: print(f"Could not delete table, due to:\n {error}") - table_names = [table_name for table_name in table_names if table_name not in existing_tables] + else: + table_names = [table_name for table_name in table_names if table_name not in existing_tables] for table_name in table_names: print(f"Create table {table_name}") create_table_command = f".create table 
{table_name} (timestamp: long, device_id: string, sequence_number: long, temperature: real)" kusto_client.execute_mgmt(KUSTO_DATABASE, create_table_command) - # print(f"Enable streaming for {table_name}") - # enable_streaming_command = f".alter table {table_name} policy streamingingestion enable" - # kusto_client.execute_mgmt(KUSTO_DATABASE, enable_streaming_command) + if not config.get("batch_mode", True): + print(f"Enable streaming for {table_name}") + enable_streaming_command = f".alter table {table_name} policy streamingingestion enable" + kusto_client.execute_mgmt(KUSTO_DATABASE, enable_streaming_command) create_mapping_command = f""".create table {table_name} ingestion csv mapping '{table_name}_CSV_Mapping' '[{{"Name":"timestamp","datatype":"long","Ordinal":0}}, {{"Name":"device_id","datatype":"string","Ordinal":1}}, {{"Name":"sequence_number","datatype":"long","Ordinal":2}}, {{"Name":"temperature","datatype":"real","Ordinal":3}}]'""" kusto_client.execute_mgmt(KUSTO_DATABASE, create_mapping_command) @@ -165,16 +168,24 @@ def _stream_insert(events, table_names): inserts_per_table = number_of_inserts // number_of_tables for table in table_names: with _ingestion_client() as ingestion_client: - buffered_io = io.BytesIO() - for event in itertools.islice(events, inserts_per_table): - buffered_io.write(json.dumps(event.__dict__).encode('utf-8')) - # (buffered_io.write(json.dumps(event.__dict__).encode('utf-8')) for event in itertools.islice(events, inserts_per_table)) + events_partition = list(itertools.islice(events, inserts_per_table)) + byte_sequence = pickle.dumps(events_partition) + bytes_stream = io.BytesIO(byte_sequence) + stream_descriptor = StreamDescriptor(bytes_stream) print(f"Ingest {inserts_per_table} into {table}") - print(f"Bytes: {buffered_io.getbuffer().nbytes}") - ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=table, - ignore_first_record=False) - result = ingestion_client.ingest_from_stream(buffered_io, ingestion_props) + ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=table, data_format=DataFormat.CSV) + result = ingestion_client.ingest_from_stream(stream_descriptor, ingestion_props) print(result) + # buffered_io = io.BytesIO() + # for event in itertools.islice(events, inserts_per_table): + # buffered_io.write(json.dumps(event.__dict__).encode('utf-8')) + # # (buffered_io.write(json.dumps(event.__dict__).encode('utf-8')) for event in itertools.islice(events, inserts_per_table)) + # print(f"Ingest {inserts_per_table} into {table}") + # print(f"Bytes: {buffered_io.getbuffer().nbytes}") + # ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=table, + # ignore_first_record=False) + # result = ingestion_client.ingest_from_stream(buffered_io, ingestion_props) + # print(result) def _ingest(table, timestamps, device_ids, sequence_numbers, temperatures): From ac13e7ad03cbfb2b93079e5b8d2d51e492824f57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20B=C3=B6ttcher?= Date: Mon, 20 Mar 2023 00:16:23 +0100 Subject: [PATCH 09/27] Improve stream ingestion (still not very efficient) --- simulator/modules/azure_data_explorer.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/simulator/modules/azure_data_explorer.py b/simulator/modules/azure_data_explorer.py index 5941045..f0eff6c 100644 --- a/simulator/modules/azure_data_explorer.py +++ b/simulator/modules/azure_data_explorer.py @@ -121,6 +121,7 @@ def init(): print(f"Enable streaming for {table_name}") enable_streaming_command = f".alter table 
{table_name} policy streamingingestion enable" kusto_client.execute_mgmt(KUSTO_DATABASE, enable_streaming_command) + # Manuel check: .show table policy streamingingestion create_mapping_command = f""".create table {table_name} ingestion csv mapping '{table_name}_CSV_Mapping' '[{{"Name":"timestamp","datatype":"long","Ordinal":0}}, {{"Name":"device_id","datatype":"string","Ordinal":1}}, {{"Name":"sequence_number","datatype":"long","Ordinal":2}}, {{"Name":"temperature","datatype":"real","Ordinal":3}}]'""" kusto_client.execute_mgmt(KUSTO_DATABASE, create_mapping_command) @@ -169,13 +170,13 @@ def _stream_insert(events, table_names): for table in table_names: with _ingestion_client() as ingestion_client: events_partition = list(itertools.islice(events, inserts_per_table)) - byte_sequence = pickle.dumps(events_partition) - bytes_stream = io.BytesIO(byte_sequence) - stream_descriptor = StreamDescriptor(bytes_stream) print(f"Ingest {inserts_per_table} into {table}") - ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=table, data_format=DataFormat.CSV) - result = ingestion_client.ingest_from_stream(stream_descriptor, ingestion_props) - print(result) + for event in events_partition: + byte_sequence = pickle.dumps(event) + bytes_stream = io.BytesIO(byte_sequence) + stream_descriptor = StreamDescriptor(bytes_stream) + ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=table, data_format=DataFormat.CSV) + ingestion_client.ingest_from_stream(stream_descriptor, ingestion_props) # buffered_io = io.BytesIO() # for event in itertools.islice(events, inserts_per_table): # buffered_io.write(json.dumps(event.__dict__).encode('utf-8')) From ab6e36818d7d8dda857b982c59a99cd119f2f19f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20B=C3=B6ttcher?= Date: Tue, 21 Mar 2023 16:44:41 +0100 Subject: [PATCH 10/27] Make a single event ingestable by stream --- simulator/modules/azure_data_explorer.py | 39 +++++++++++++++--------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/simulator/modules/azure_data_explorer.py b/simulator/modules/azure_data_explorer.py index f0eff6c..cc39a36 100644 --- a/simulator/modules/azure_data_explorer.py +++ b/simulator/modules/azure_data_explorer.py @@ -1,16 +1,14 @@ import io import itertools -import json -import pickle +import os import time +import pandas as pd +from azure.kusto.data import DataFormat from azure.kusto.data import KustoClient, KustoConnectionStringBuilder from azure.kusto.data.exceptions import KustoApiError -from azure.kusto.data import DataFormat from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, StreamDescriptor -import pandas as pd -import os from .config import config AAD_APP_ID = os.getenv("adx_aad_app_id") @@ -167,16 +165,29 @@ def _stream_insert(events, table_names): number_of_tables = len(table_names) number_of_inserts = int(config["num_inserts"]) inserts_per_table = number_of_inserts // number_of_tables - for table in table_names: - with _ingestion_client() as ingestion_client: + print("Stream ingestion", flush=True) + with _ingestion_client() as ingestion_client: + for table in table_names: + print(f"Ingest {inserts_per_table} into {table}", flush=True) events_partition = list(itertools.islice(events, inserts_per_table)) - print(f"Ingest {inserts_per_table} into {table}") - for event in events_partition: - byte_sequence = pickle.dumps(event) - bytes_stream = io.BytesIO(byte_sequence) - stream_descriptor = StreamDescriptor(bytes_stream) - ingestion_props = 
IngestionProperties(database=KUSTO_DATABASE, table=table, data_format=DataFormat.CSV) - ingestion_client.ingest_from_stream(stream_descriptor, ingestion_props) + event = events_partition[0] + byte_seq = event.to_json() + bytes_array = byte_seq.encode("utf-8") + byte_stream = io.BytesIO(bytes_array) + stream_descriptor = StreamDescriptor(byte_stream) + ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=table, data_format=DataFormat.JSON) + result = ingestion_client.ingest_from_stream(stream_descriptor, ingestion_props) + print(result) + + # for dix, event in enumerate(events_partition): + # print(f"Event {dix}", flush=True) + # bytes_sequence = pickle.dumps(event) + # bytes_stream = io.BytesIO(bytes_sequence) + # stream_descriptor = StreamDescriptor(bytes_stream) + # print(StreamDescriptor.get_instance(stream_descriptor).stream) + # ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=table, data_format=DataFormat.CSV) + # ingestion_client.ingest_from_stream(stream_descriptor, ingestion_props) + # buffered_io = io.BytesIO() # for event in itertools.islice(events, inserts_per_table): # buffered_io.write(json.dumps(event.__dict__).encode('utf-8')) From 586e34dd48ce3115eb709494f5c705ad2e4c24e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20B=C3=B6ttcher?= Date: Tue, 21 Mar 2023 23:47:38 +0100 Subject: [PATCH 11/27] Fix ingestion via stream --- simulator/modules/azure_data_explorer.py | 36 ++++++++++++++++-------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/simulator/modules/azure_data_explorer.py b/simulator/modules/azure_data_explorer.py index cc39a36..2b3520d 100644 --- a/simulator/modules/azure_data_explorer.py +++ b/simulator/modules/azure_data_explorer.py @@ -27,13 +27,16 @@ print(KUSTO_INGEST_URI) print(KUSTO_DATABASE) + def test(): # #interactive # # kcsb_ingest = KustoConnectionStringBuilder.with_interactive_login(KUSTO_INGEST_URI) # # kcsb_data = KustoConnectionStringBuilder.with_interactive_login(KUSTO_URI) # #secure - kcsb_ingest = KustoConnectionStringBuilder.with_aad_application_key_authentication("https://ingest-adxcompare.westeurope.kusto.windows.net", AAD_APP_ID, APP_KEY, AUTHORITY_ID) - kcsb_data = KustoConnectionStringBuilder.with_aad_application_key_authentication("https://adxcompare.westeurope.kusto.windows.net", AAD_APP_ID, APP_KEY, AUTHORITY_ID) + kcsb_ingest = KustoConnectionStringBuilder.with_aad_application_key_authentication( + "https://ingest-adxcompare.westeurope.kusto.windows.net", AAD_APP_ID, APP_KEY, AUTHORITY_ID) + kcsb_data = KustoConnectionStringBuilder.with_aad_application_key_authentication( + "https://adxcompare.westeurope.kusto.windows.net", AAD_APP_ID, APP_KEY, AUTHORITY_ID) kusto_client = KustoClient(kcsb_data) create_table_command = f".create table PopulationTable (State: string, Population: int)" @@ -50,6 +53,8 @@ def test(): ingestion_result = ingestion_client.ingest_from_dataframe(dataframe, ingestion_props) print('Done queuing up ingestion with Azure Data Explorer') print(f"Ingestion_result: {ingestion_result}") + + # import time # time.sleep(50) # sample_query = "PopulationDataNew | summarize max(Population), min(Population), avg(Population) by State" @@ -72,22 +77,21 @@ def test(): # kusto_client.execute_mgmt(KUSTO_DATABASE, delete_table_command) - - # # # DESTINATION_TABLE = "PopulationDataNew" # DESTINATION_TABLE_COLUMN_MAPPING = "PopulationDataNew_CSV_Mapping" - def _ingestion_client(): - kcsb_ingest = KustoConnectionStringBuilder.with_aad_application_key_authentication(KUSTO_INGEST_URI, 
AAD_APP_ID, APP_KEY, AUTHORITY_ID) + kcsb_ingest = KustoConnectionStringBuilder.with_aad_application_key_authentication(KUSTO_INGEST_URI, AAD_APP_ID, + APP_KEY, AUTHORITY_ID) return QueuedIngestClient(kcsb_ingest) def _kusto_client(): - kcsb_data = KustoConnectionStringBuilder.with_aad_application_key_authentication(KUSTO_URI, AAD_APP_ID, APP_KEY, AUTHORITY_ID) + kcsb_data = KustoConnectionStringBuilder.with_aad_application_key_authentication(KUSTO_URI, AAD_APP_ID, APP_KEY, + AUTHORITY_ID) return KustoClient(kcsb_data) @@ -170,12 +174,22 @@ def _stream_insert(events, table_names): for table in table_names: print(f"Ingest {inserts_per_table} into {table}", flush=True) events_partition = list(itertools.islice(events, inserts_per_table)) - event = events_partition[0] - byte_seq = event.to_json() - bytes_array = byte_seq.encode("utf-8") + json_string = "" + for event in events_partition: + json_string = json_string + event.to_json() + "\n" + print(json_string) + bytes_array = json_string.encode("utf-8") + byte_stream = io.BytesIO(bytes_array) + # for event in events_partition: + # json_string = event.to_json() + "\n" + # print(json_string) + # bytes_array = json_string.encode("utf-8") + # byte_stream.write(bytes_array) + byte_stream.flush() stream_descriptor = StreamDescriptor(byte_stream) - ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=table, data_format=DataFormat.JSON) + ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=table, + data_format=DataFormat.SINGLEJSON) result = ingestion_client.ingest_from_stream(stream_descriptor, ingestion_props) print(result) From 48e9982ecf43ca4ac3e345f59d30e5ba493213d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20B=C3=B6ttcher?= Date: Tue, 21 Mar 2023 23:52:01 +0100 Subject: [PATCH 12/27] Cleanup code --- simulator/modules/azure_data_explorer.py | 93 ++---------------------- 1 file changed, 6 insertions(+), 87 deletions(-) diff --git a/simulator/modules/azure_data_explorer.py b/simulator/modules/azure_data_explorer.py index 2b3520d..1322d63 100644 --- a/simulator/modules/azure_data_explorer.py +++ b/simulator/modules/azure_data_explorer.py @@ -15,72 +15,17 @@ APP_KEY = os.getenv("adx_app_key") AUTHORITY_ID = os.getenv("adx_authority_id") -print(AAD_APP_ID) -print(APP_KEY) -print(AUTHORITY_ID) +print(f"AAD_APP_ID: {AAD_APP_ID}") +print(f"APP_KEY: {APP_KEY}") +print(f"AUTHORITY_ID: {AUTHORITY_ID}") KUSTO_URI = config["kusto_uri"] KUSTO_INGEST_URI = config["kusto_ingest_uri"] KUSTO_DATABASE = config["kusto_db"] -print(KUSTO_URI) -print(KUSTO_INGEST_URI) -print(KUSTO_DATABASE) - - -def test(): - # #interactive - # # kcsb_ingest = KustoConnectionStringBuilder.with_interactive_login(KUSTO_INGEST_URI) - # # kcsb_data = KustoConnectionStringBuilder.with_interactive_login(KUSTO_URI) - # #secure - kcsb_ingest = KustoConnectionStringBuilder.with_aad_application_key_authentication( - "https://ingest-adxcompare.westeurope.kusto.windows.net", AAD_APP_ID, APP_KEY, AUTHORITY_ID) - kcsb_data = KustoConnectionStringBuilder.with_aad_application_key_authentication( - "https://adxcompare.westeurope.kusto.windows.net", AAD_APP_ID, APP_KEY, AUTHORITY_ID) - - kusto_client = KustoClient(kcsb_data) - create_table_command = f".create table PopulationTable (State: string, Population: int)" - kusto_client.execute_mgmt("SampleDB", create_table_command) - - create_mapping_command = """.create table PopulationTable ingestion csv mapping 'PopulationDataNew_CSV_Mapping' '[{"Name":"State","datatype":"string","Ordinal":0}, 
{"Name":"Population","datatype":"int","Ordinal":1}]'""" - kusto_client.execute_mgmt("SampleDB", create_mapping_command) - - ingestion_client = QueuedIngestClient(kcsb_ingest) - ingestion_props = IngestionProperties(database="SampleDB", table="PopulationTable", data_format=DataFormat.CSV, - ignore_first_record=True) - mapping = {'State': ['Texas', 'New York', 'Arizona'], 'Population': [300, 400, 500]} - dataframe = pd.DataFrame(data=mapping) - ingestion_result = ingestion_client.ingest_from_dataframe(dataframe, ingestion_props) - print('Done queuing up ingestion with Azure Data Explorer') - print(f"Ingestion_result: {ingestion_result}") - - -# import time -# time.sleep(50) -# sample_query = "PopulationDataNew | summarize max(Population), min(Population), avg(Population) by State" -# sample_response = kusto_client.execute("ConnTest", sample_query) -# for row in sample_response.primary_results[0]: -# # printing specific columns by index -# print("value at 0 {}".format(row[0])) -# print("\n") -# # printing specific columns by name -# print("EventType:{}".format(row["State"])) -# print("EventType:{}".format(row["max_Population"])) -# print("EventType:{}".format(row["min_Population"])) -# print("EventType:{}".format(row["avg_Population"])) -# try: -# delete_table_command = f".drop table PopulationDataNew2" -# kusto_client.execute_mgmt(KUSTO_DATABASE, delete_table_command) -# except KustoApiError: -# print("Could not delete table. Table was probably not found") -# delete_table_command = f".drop table PopulationDataNew" -# kusto_client.execute_mgmt(KUSTO_DATABASE, delete_table_command) - - -# -# -# DESTINATION_TABLE = "PopulationDataNew" -# DESTINATION_TABLE_COLUMN_MAPPING = "PopulationDataNew_CSV_Mapping" +print(f"KUSTO_URI: {KUSTO_URI}") +print(f"KUSTO_INGEST_URI: {KUSTO_INGEST_URI}") +print(f"KUSTO_DATABASE: {KUSTO_DATABASE}") def _ingestion_client(): @@ -179,13 +124,7 @@ def _stream_insert(events, table_names): json_string = json_string + event.to_json() + "\n" print(json_string) bytes_array = json_string.encode("utf-8") - byte_stream = io.BytesIO(bytes_array) - # for event in events_partition: - # json_string = event.to_json() + "\n" - # print(json_string) - # bytes_array = json_string.encode("utf-8") - # byte_stream.write(bytes_array) byte_stream.flush() stream_descriptor = StreamDescriptor(byte_stream) ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=table, @@ -193,26 +132,6 @@ def _stream_insert(events, table_names): result = ingestion_client.ingest_from_stream(stream_descriptor, ingestion_props) print(result) - # for dix, event in enumerate(events_partition): - # print(f"Event {dix}", flush=True) - # bytes_sequence = pickle.dumps(event) - # bytes_stream = io.BytesIO(bytes_sequence) - # stream_descriptor = StreamDescriptor(bytes_stream) - # print(StreamDescriptor.get_instance(stream_descriptor).stream) - # ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=table, data_format=DataFormat.CSV) - # ingestion_client.ingest_from_stream(stream_descriptor, ingestion_props) - - # buffered_io = io.BytesIO() - # for event in itertools.islice(events, inserts_per_table): - # buffered_io.write(json.dumps(event.__dict__).encode('utf-8')) - # # (buffered_io.write(json.dumps(event.__dict__).encode('utf-8')) for event in itertools.islice(events, inserts_per_table)) - # print(f"Ingest {inserts_per_table} into {table}") - # print(f"Bytes: {buffered_io.getbuffer().nbytes}") - # ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=table, - # 
ignore_first_record=False) - # result = ingestion_client.ingest_from_stream(buffered_io, ingestion_props) - # print(result) - def _ingest(table, timestamps, device_ids, sequence_numbers, temperatures): with _ingestion_client() as ingestion_client: From f71c48f86a9c05398a64f4d6772312779fb7471b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20B=C3=B6ttcher?= Date: Wed, 22 Mar 2023 00:07:18 +0100 Subject: [PATCH 13/27] Refactor code --- simulator/modules/azure_data_explorer.py | 112 +++++++++++++++-------- 1 file changed, 76 insertions(+), 36 deletions(-) diff --git a/simulator/modules/azure_data_explorer.py b/simulator/modules/azure_data_explorer.py index 1322d63..a0eb30c 100644 --- a/simulator/modules/azure_data_explorer.py +++ b/simulator/modules/azure_data_explorer.py @@ -46,32 +46,55 @@ def init(): else: table_names = ["events"] with _kusto_client() as kusto_client: - response = kusto_client.execute(KUSTO_DATABASE, f""".show tables | where DatabaseName == "{KUSTO_DATABASE}" """) - existing_tables = [row[0] for row in response.primary_results[0]] - print(f"Following tables already exist: {existing_tables}") + existing_tables = _get_existing_tables(kusto_client) if config["clean_database"]: - for table_name in existing_tables: - try: - print(f"Delete table {table_name}") - delete_table_command = f".drop table {table_name}" - kusto_client.execute_mgmt(KUSTO_DATABASE, delete_table_command) - except KustoApiError as error: - print(f"Could not delete table, due to:\n {error}") + _clean_database(existing_tables, kusto_client) else: - table_names = [table_name for table_name in table_names if table_name not in existing_tables] + table_names = _get_tables_requiring_creation(existing_tables, table_names) for table_name in table_names: - print(f"Create table {table_name}") - create_table_command = f".create table {table_name} (timestamp: long, device_id: string, sequence_number: long, temperature: real)" - kusto_client.execute_mgmt(KUSTO_DATABASE, create_table_command) + _create_table(kusto_client, table_name) + _handle_stream_ingestion(kusto_client, table_name) + _create_ingestion_mapping(kusto_client, table_name) - if not config.get("batch_mode", True): - print(f"Enable streaming for {table_name}") - enable_streaming_command = f".alter table {table_name} policy streamingingestion enable" - kusto_client.execute_mgmt(KUSTO_DATABASE, enable_streaming_command) - # Manuel check: .show table policy streamingingestion - create_mapping_command = f""".create table {table_name} ingestion csv mapping '{table_name}_CSV_Mapping' '[{{"Name":"timestamp","datatype":"long","Ordinal":0}}, {{"Name":"device_id","datatype":"string","Ordinal":1}}, {{"Name":"sequence_number","datatype":"long","Ordinal":2}}, {{"Name":"temperature","datatype":"real","Ordinal":3}}]'""" - kusto_client.execute_mgmt(KUSTO_DATABASE, create_mapping_command) +def _get_tables_requiring_creation(existing_tables, table_names): + return [table_name for table_name in table_names if table_name not in existing_tables] + + +def _clean_database(existing_tables, kusto_client): + for table_name in existing_tables: + try: + print(f"Delete table {table_name}") + delete_table_command = f".drop table {table_name}" + kusto_client.execute_mgmt(KUSTO_DATABASE, delete_table_command) + except KustoApiError as error: + print(f"Could not delete table, due to:\n {error}") + + +def _get_existing_tables(kusto_client): + response = kusto_client.execute(KUSTO_DATABASE, f""".show tables | where DatabaseName == "{KUSTO_DATABASE}" """) + existing_tables = [row[0] for row in 
response.primary_results[0]] + print(f"Following tables already exist: {existing_tables}") + return existing_tables + + +def _create_ingestion_mapping(kusto_client, table_name): + create_mapping_command = f""".create table {table_name} ingestion csv mapping '{table_name}_CSV_Mapping' '[{{"Name":"timestamp","datatype":"long","Ordinal":0}}, {{"Name":"device_id","datatype":"string","Ordinal":1}}, {{"Name":"sequence_number","datatype":"long","Ordinal":2}}, {{"Name":"temperature","datatype":"real","Ordinal":3}}]'""" + kusto_client.execute_mgmt(KUSTO_DATABASE, create_mapping_command) + + +def _create_table(kusto_client, table_name): + print(f"Create table {table_name}") + create_table_command = f".create table {table_name} (timestamp: long, device_id: string, sequence_number: long, temperature: real)" + kusto_client.execute_mgmt(KUSTO_DATABASE, create_table_command) + + +def _handle_stream_ingestion(kusto_client, table_name): + if not config.get("batch_mode", True): + print(f"Enable streaming for {table_name}") + enable_streaming_command = f".alter table {table_name} policy streamingingestion enable" + kusto_client.execute_mgmt(KUSTO_DATABASE, enable_streaming_command) + # Manuel check: .show table policy streamingingestion def prefill_events(events): @@ -97,7 +120,7 @@ def _batch_insert(events, batch_size, table_names): temperatures.append(event.temperature) count += 1 if count >= batch_size: - table = table_names[int(idx / batch_size) % len(table_names)] + table = _determine_table_for_ingestion(batch_size, idx, table_names) print(f"Insert {count} entries into {table}") _ingest(table, timestamps, device_ids, sequence_numbers, temperatures) timestamps.clear() @@ -110,6 +133,10 @@ def _batch_insert(events, batch_size, table_names): _ingest(table_names[0], timestamps, device_ids, sequence_numbers, temperatures) +def _determine_table_for_ingestion(batch_size, idx, table_names): + return table_names[int(idx / batch_size) % len(table_names)] + + def _stream_insert(events, table_names): number_of_tables = len(table_names) number_of_inserts = int(config["num_inserts"]) @@ -117,20 +144,33 @@ def _stream_insert(events, table_names): print("Stream ingestion", flush=True) with _ingestion_client() as ingestion_client: for table in table_names: - print(f"Ingest {inserts_per_table} into {table}", flush=True) - events_partition = list(itertools.islice(events, inserts_per_table)) - json_string = "" - for event in events_partition: - json_string = json_string + event.to_json() + "\n" - print(json_string) - bytes_array = json_string.encode("utf-8") - byte_stream = io.BytesIO(bytes_array) - byte_stream.flush() - stream_descriptor = StreamDescriptor(byte_stream) - ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=table, - data_format=DataFormat.SINGLEJSON) - result = ingestion_client.ingest_from_stream(stream_descriptor, ingestion_props) - print(result) + _ingest_by_stream(events, ingestion_client, inserts_per_table, table) + + +def _ingest_by_stream(events, ingestion_client, inserts_per_table, table): + print(f"Ingest {inserts_per_table} into {table}", flush=True) + events_partition = list(itertools.islice(events, inserts_per_table)) + json_string = _to_json(events_partition) + stream_descriptor = _create_stream_descriptor(json_string) + ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=table, + data_format=DataFormat.SINGLEJSON) + result = ingestion_client.ingest_from_stream(stream_descriptor, ingestion_props) + print(result) + + +def _create_stream_descriptor(json_string): + 
bytes_array = json_string.encode("utf-8") + byte_stream = io.BytesIO(bytes_array) + byte_stream.flush() + stream_descriptor = StreamDescriptor(byte_stream) + return stream_descriptor + + +def _to_json(events_partition): + json_string = "" + for event in events_partition: + json_string = json_string + event.to_json() + "\n" + return json_string def _ingest(table, timestamps, device_ids, sequence_numbers, temperatures): From b03e7039b9741ac01a8bcd3dd0afcb4f09d224d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20B=C3=B6ttcher?= Date: Wed, 22 Mar 2023 16:54:11 +0100 Subject: [PATCH 14/27] Refactor code --- simulator/modules/azure_data_explorer.py | 91 +++++++++++++----------- 1 file changed, 48 insertions(+), 43 deletions(-) diff --git a/simulator/modules/azure_data_explorer.py b/simulator/modules/azure_data_explorer.py index a0eb30c..bf6547f 100644 --- a/simulator/modules/azure_data_explorer.py +++ b/simulator/modules/azure_data_explorer.py @@ -28,18 +28,6 @@ print(f"KUSTO_DATABASE: {KUSTO_DATABASE}") -def _ingestion_client(): - kcsb_ingest = KustoConnectionStringBuilder.with_aad_application_key_authentication(KUSTO_INGEST_URI, AAD_APP_ID, - APP_KEY, AUTHORITY_ID) - return QueuedIngestClient(kcsb_ingest) - - -def _kusto_client(): - kcsb_data = KustoConnectionStringBuilder.with_aad_application_key_authentication(KUSTO_URI, AAD_APP_ID, APP_KEY, - AUTHORITY_ID) - return KustoClient(kcsb_data) - - def init(): if config["use_multiple_tables"]: table_names = ["events0", "events1", "events2", "events3"] @@ -57,6 +45,44 @@ def init(): _create_ingestion_mapping(kusto_client, table_name) +def prefill_events(events): + _insert_events(events, True, 1_000) + + +def insert_events(events): + batch_mode = config.get("batch_mode", False) + batch_size = config.get("batch_size", 1_000) + _insert_events(events, batch_mode, batch_size) + + +_queries = { + "count-events": "events | count", + "temperature-min-max": "events| summarize max(temperature), min(temperature)", + "temperature-stats": "events| summarize max(temperature), avg(temperature), min(temperature)", + "temperature-stats-per-device": "events | summarize max(temperature), avg(temperature), min(temperature) by device_id", + "newest-per-device": "events | partition by device_id (top 1 by timestamp desc | project device_id, temperature)", +} + + +def queries(): + _filter_queries_to_execute() + query_times = dict([(name, []) for name in _queries.keys()]) + for _ in range(0, int(config["runs"])): + for name, query in _queries.items(): + _execute_query(name, query, query_times) + return query_times + + +def _ingestion_client(): + kcsb_ingest = KustoConnectionStringBuilder.with_aad_application_key_authentication(KUSTO_INGEST_URI, AAD_APP_ID, APP_KEY, AUTHORITY_ID) + return QueuedIngestClient(kcsb_ingest) + + +def _kusto_client(): + kcsb_data = KustoConnectionStringBuilder.with_aad_application_key_authentication(KUSTO_URI, AAD_APP_ID, APP_KEY, AUTHORITY_ID) + return KustoClient(kcsb_data) + + def _get_tables_requiring_creation(existing_tables, table_names): return [table_name for table_name in table_names if table_name not in existing_tables] @@ -97,16 +123,6 @@ def _handle_stream_ingestion(kusto_client, table_name): # Manuel check: .show table policy streamingingestion -def prefill_events(events): - _insert_events(events, True, 1_000) - - -def insert_events(events): - batch_mode = config.get("batch_mode", False) - batch_size = config.get("batch_size", 1_000) - _insert_events(events, batch_mode, batch_size) - - def _batch_insert(events, batch_size, 
table_names): count = 0 timestamps = [] @@ -199,32 +215,21 @@ def _insert_events(events, batch_mode, batch_size): _stream_insert(events, table_names) -_queries = { - "count-events": "events | count", - "temperature-min-max": "events| summarize max(temperature), min(temperature)", - "temperature-stats": "events| summarize max(temperature), avg(temperature), min(temperature)", - "temperature-stats-per-device": "events | summarize max(temperature), avg(temperature), min(temperature) by device_id", - "newest-per-device": "events | partition by device_id (top 1 by timestamp desc | project device_id, temperature)", -} +def _execute_query(name, query, query_times): + with _kusto_client() as kusto_client: + print(f"Executing query {name}", flush=True) + start = time.time() + result = kusto_client.execute(KUSTO_DATABASE, query) + print(result) + duration = time.time() - start + print(f"Finished query. Duration: {duration}", flush=True) + query_times[name].append(duration) -def queries(): +def _filter_queries_to_execute(): if "queries" in config: included = config["queries"].split(",") for key in list(_queries.keys()): if key not in included: del _queries[key] print(_queries) - query_times = dict([(name, []) for name in _queries.keys()]) - for _ in range(0, int(config["runs"])): - for name, query in _queries.items(): - with _kusto_client() as kusto_client: - print(f"Executing query {name}", flush=True) - start = time.time() - result = kusto_client.execute(KUSTO_DATABASE, query) - print(result) - duration = time.time() - start - print(f"Finished query. Duration: {duration}", flush=True) - query_times[name].append(duration) - - return query_times From c0562d7a7dec4b4e14ee4dc7c92448436400004c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20B=C3=B6ttcher?= Date: Tue, 28 Mar 2023 11:54:21 +0200 Subject: [PATCH 15/27] Clean up --- README.md | 30 +++++++++++++++++++++++++++ dbinstall/azure_data_explorer/main.tf | 13 ------------ deployment/values.yaml | 2 +- 3 files changed, 31 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 7552c06..0b4bc88 100644 --- a/README.md +++ b/README.md @@ -314,8 +314,38 @@ kubectl apply -f dbinstall/elastic-deployment.yaml ### Azure Data Explorer +Azure Data Explorer (ADX) is a fully managed, high-performance, big data analytics platform. Azure Data Explorer can take all this varied data, and then ingest, process, and store it. You can use Azure Data Explorer for near real-time queries and advanced analytics. + +To deploy an ADX-Cluster change the Service Principle (AAD) to your own Service Principle inside dbinstall/azure_data_explorer/main.tf. +``` +data "azuread_service_principal" "service-principle" { + display_name = "mw_iot_ADX-DB-Comparison" +} +``` + +Additionally adjust the following fields depending on your performance needs: + +``` +resource "azurerm_kusto_cluster" "adxcompare" { + ... + + sku { + name = "Dev(No SLA)_Standard_E2a_v4" + capacity = 1 + } +``` + +adjust the terraform-file to your needs and then run: +```bash +az login +terraform apply +``` + + + Besides having an emulator for ADX, capable of running locally, it is not recommended to use this emulator for any kind of benchmark tests, since the performance profile is very different. Furthermore, it is even prohibited by license terms. + ## Remarks on the databases This is a collection of problems / observations collected over the course of the tests. 
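
Editor's note on the stream-ingestion payload built in simulator/modules/azure_data_explorer.py above: `_to_json` concatenates one JSON object per event (via `event.to_json()`) separated by newlines, and `_create_stream_descriptor` wraps the UTF-8 bytes in a `StreamDescriptor` for `ingest_from_stream`. The simulator's real `Event` class is not part of these patches, so the sketch below uses a hypothetical stand-in whose fields follow the `.create table` schema (timestamp: long, device_id: string, sequence_number: long, temperature: real); it only illustrates the shape of the byte stream, not the actual class.

```python
import io
import json
from dataclasses import dataclass


@dataclass
class Event:
    # Hypothetical stand-in for the simulator's Event type; field names follow
    # the events table schema created by _create_table().
    timestamp: int
    device_id: str
    sequence_number: int
    temperature: float

    def to_json(self) -> str:
        # One JSON object per event; _to_json() joins these with "\n".
        return json.dumps(self.__dict__)


events = [
    Event(1679000000000, "device-1", 1, 21.5),
    Event(1679000001000, "device-2", 1, 21.7),
]

# Same shape of payload that _create_stream_descriptor() wraps in a StreamDescriptor.
ndjson = "".join(event.to_json() + "\n" for event in events)
byte_stream = io.BytesIO(ndjson.encode("utf-8"))

print(ndjson)
print(f"payload size in bytes: {byte_stream.getbuffer().nbytes}")
```
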
diff --git a/dbinstall/azure_data_explorer/main.tf b/dbinstall/azure_data_explorer/main.tf index 6f28d2d..ba98f11 100644 --- a/dbinstall/azure_data_explorer/main.tf +++ b/dbinstall/azure_data_explorer/main.tf @@ -59,19 +59,6 @@ data "azuread_service_principal" "service-principle" { display_name = "mw_iot_ADX-DB-Comparison" } -/* -output "tenant_id" { - value = data.azurerm_client_config.current.tenant_id -} - -output "principal_id" { - value = data.azuread_service_principal.service-principle.id -} - -output "principal_id2" { - value = data.azurerm_client_config.current.client_id -} -*/ resource "azurerm_kusto_database_principal_assignment" "ad-permission" { name = "AD-Permission" resource_group_name = azurerm_resource_group.rg-compare.name diff --git a/deployment/values.yaml b/deployment/values.yaml index d27cd95..6328185 100644 --- a/deployment/values.yaml +++ b/deployment/values.yaml @@ -1,7 +1,7 @@ --- namespace: default target_module: azure_data_explorer -run_config: ewoidGFzayI6ICJpbnNlcnQiLAoidXNlX211bHRpcGxlX3RhYmxlcyI6IHRydWUsCiJjbGVhbl9kYXRhYmFzZSI6IHRydWUsCiJiYXRjaF9tb2RlIjogZmFsc2UsCiJwcmVmaWxsIjogMCwKInJ1bnMiOiAxLAoibnVtX2luc2VydHMiOiAxMDAwMCwKImt1c3RvX3VyaSI6ICJodHRwczovL2FkeGNvbXBhcmUud2VzdGV1cm9wZS5rdXN0by53aW5kb3dzLm5ldCIsCiJrdXN0b19pbmdlc3RfdXJpIjogImh0dHBzOi8vaW5nZXN0LWFkeGNvbXBhcmUud2VzdGV1cm9wZS5rdXN0by53aW5kb3dzLm5ldCIsCiJrdXN0b19kYiI6ICJTYW1wbGVEQiIKfQ== +run_config: ewoidGFzayI6ICJxdWVyeSIsCiJ1c2VfbXVsdGlwbGVfdGFibGVzIjogZmFsc2UsCiJjbGVhbl9kYXRhYmFzZSI6IHRydWUsCiJiYXRjaF9tb2RlIjogdHJ1ZSwKInByZWZpbGwiOiAwLAoicnVucyI6IDMsCiJudW1faW5zZXJ0cyI6IDEwMDAwLAoia3VzdG9fdXJpIjogImh0dHBzOi8vYWR4Y29tcGFyZS53ZXN0ZXVyb3BlLmt1c3RvLndpbmRvd3MubmV0IiwKImt1c3RvX2luZ2VzdF91cmkiOiAiaHR0cHM6Ly9pbmdlc3QtYWR4Y29tcGFyZS53ZXN0ZXVyb3BlLmt1c3RvLndpbmRvd3MubmV0IiwKImt1c3RvX2RiIjogIlNhbXBsZURCIgp9 workers: 1 image: From 8ec6824354055edeb909b4e14bcb3b3fba5879ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20B=C3=B6ttcher?= Date: Wed, 29 Mar 2023 00:01:49 +0200 Subject: [PATCH 16/27] Final Clean up --- README.md | 2 +- dbinstall/azure_data_explorer/main.tf | 7 ++++--- deployment/values.yaml | 4 ++-- simulator/main.py | 1 - simulator/modules/azure_data_explorer.py | 11 +---------- simulator/modules/config.py | 3 --- simulator/modules/population.csv | 3 --- 7 files changed, 8 insertions(+), 23 deletions(-) delete mode 100644 simulator/modules/population.csv diff --git a/README.md b/README.md index 0b4bc88..429bed4 100644 --- a/README.md +++ b/README.md @@ -335,7 +335,7 @@ resource "azurerm_kusto_cluster" "adxcompare" { } ``` -adjust the terraform-file to your needs and then run: +Finally, run: ```bash az login terraform apply diff --git a/dbinstall/azure_data_explorer/main.tf b/dbinstall/azure_data_explorer/main.tf index ba98f11..d0a1d18 100644 --- a/dbinstall/azure_data_explorer/main.tf +++ b/dbinstall/azure_data_explorer/main.tf @@ -15,7 +15,7 @@ provider "azurerm" { resource "azurerm_resource_group" "rg-compare" { - name = "db-performance-comparison2" + name = "db-performance-comparison" location = "West Europe" tags = { @@ -30,6 +30,7 @@ resource "azurerm_kusto_cluster" "adxcompare" { resource_group_name = azurerm_resource_group.rg-compare.name streaming_ingestion_enabled = true + # Change depending on your performance needs sku { name = "Dev(No SLA)_Standard_E2a_v4" capacity = 1 @@ -54,7 +55,7 @@ resource "azurerm_kusto_database" "sample-db" { data "azurerm_client_config" "current" { } - +# Change to your own service principal data "azuread_service_principal" "service-principle" { display_name 
= "mw_iot_ADX-DB-Comparison" } @@ -69,4 +70,4 @@ resource "azurerm_kusto_database_principal_assignment" "ad-permission" { principal_id = data.azuread_service_principal.service-principle.id principal_type = "App" role = "Admin" -} \ No newline at end of file +} diff --git a/deployment/values.yaml b/deployment/values.yaml index 6328185..13b2b35 100644 --- a/deployment/values.yaml +++ b/deployment/values.yaml @@ -1,7 +1,7 @@ --- namespace: default -target_module: azure_data_explorer -run_config: ewoidGFzayI6ICJxdWVyeSIsCiJ1c2VfbXVsdGlwbGVfdGFibGVzIjogZmFsc2UsCiJjbGVhbl9kYXRhYmFzZSI6IHRydWUsCiJiYXRjaF9tb2RlIjogdHJ1ZSwKInByZWZpbGwiOiAwLAoicnVucyI6IDMsCiJudW1faW5zZXJ0cyI6IDEwMDAwLAoia3VzdG9fdXJpIjogImh0dHBzOi8vYWR4Y29tcGFyZS53ZXN0ZXVyb3BlLmt1c3RvLndpbmRvd3MubmV0IiwKImt1c3RvX2luZ2VzdF91cmkiOiAiaHR0cHM6Ly9pbmdlc3QtYWR4Y29tcGFyZS53ZXN0ZXVyb3BlLmt1c3RvLndpbmRvd3MubmV0IiwKImt1c3RvX2RiIjogIlNhbXBsZURCIgp9 +target_module: postgres +run_config: workers: 1 image: diff --git a/simulator/main.py b/simulator/main.py index 7925d6d..73a8432 100644 --- a/simulator/main.py +++ b/simulator/main.py @@ -1,6 +1,5 @@ import os -print("v1") instance_type = os.environ.get("INSTANCE_TYPE") if instance_type == "worker": diff --git a/simulator/modules/azure_data_explorer.py b/simulator/modules/azure_data_explorer.py index bf6547f..58ab066 100644 --- a/simulator/modules/azure_data_explorer.py +++ b/simulator/modules/azure_data_explorer.py @@ -15,18 +15,10 @@ APP_KEY = os.getenv("adx_app_key") AUTHORITY_ID = os.getenv("adx_authority_id") -print(f"AAD_APP_ID: {AAD_APP_ID}") -print(f"APP_KEY: {APP_KEY}") -print(f"AUTHORITY_ID: {AUTHORITY_ID}") - KUSTO_URI = config["kusto_uri"] KUSTO_INGEST_URI = config["kusto_ingest_uri"] KUSTO_DATABASE = config["kusto_db"] -print(f"KUSTO_URI: {KUSTO_URI}") -print(f"KUSTO_INGEST_URI: {KUSTO_INGEST_URI}") -print(f"KUSTO_DATABASE: {KUSTO_DATABASE}") - def init(): if config["use_multiple_tables"]: @@ -219,8 +211,7 @@ def _execute_query(name, query, query_times): with _kusto_client() as kusto_client: print(f"Executing query {name}", flush=True) start = time.time() - result = kusto_client.execute(KUSTO_DATABASE, query) - print(result) + kusto_client.execute(KUSTO_DATABASE, query) duration = time.time() - start print(f"Finished query. 
Duration: {duration}", flush=True) query_times[name].append(duration) diff --git a/simulator/modules/config.py b/simulator/modules/config.py index ff66817..c3a7f20 100644 --- a/simulator/modules/config.py +++ b/simulator/modules/config.py @@ -2,7 +2,4 @@ import json import os -print(os.getenv("RUN_CONFIG")) -print(base64.b64decode(os.getenv("RUN_CONFIG"))) config = json.loads(base64.b64decode(os.getenv("RUN_CONFIG"))) -print(config) \ No newline at end of file diff --git a/simulator/modules/population.csv b/simulator/modules/population.csv deleted file mode 100644 index 3cbbebc..0000000 --- a/simulator/modules/population.csv +++ /dev/null @@ -1,3 +0,0 @@ -TEXAS, 500000 -INDIANA, 400000 -WASHINGTON, 300000 \ No newline at end of file From 81d1404f695bd6dfad7e7c59827aec392761eae5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20B=C3=B6ttcher?= Date: Wed, 29 Mar 2023 07:48:26 +0200 Subject: [PATCH 17/27] Final Clean up, Part 2 --- deployment/templates/worker.yaml | 3 ++- deployment/values.yaml | 2 +- simulator/modules/config.py | 1 + 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/deployment/templates/worker.yaml b/deployment/templates/worker.yaml index 9c26f61..302d596 100644 --- a/deployment/templates/worker.yaml +++ b/deployment/templates/worker.yaml @@ -42,5 +42,6 @@ spec: name: adx-secret restartPolicy: Never terminationGracePeriodSeconds: 2 - nodeSelector: {{ .Values.nodeSelector | toYaml | indent 8 }} + nodeSelector: +{{ .Values.nodeSelector | toYaml | indent 8 }} backoffLimit: 0 diff --git a/deployment/values.yaml b/deployment/values.yaml index 13b2b35..542a400 100644 --- a/deployment/values.yaml +++ b/deployment/values.yaml @@ -1,7 +1,7 @@ --- namespace: default target_module: postgres -run_config: +run_config: null workers: 1 image: diff --git a/simulator/modules/config.py b/simulator/modules/config.py index c3a7f20..26d151c 100644 --- a/simulator/modules/config.py +++ b/simulator/modules/config.py @@ -2,4 +2,5 @@ import json import os + config = json.loads(base64.b64decode(os.getenv("RUN_CONFIG"))) From abecb5ef1d1278ee103c36f82daba9499ce3e84e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20B=C3=B6ttcher?= Date: Wed, 29 Mar 2023 07:51:01 +0200 Subject: [PATCH 18/27] Final Clean up, Part 3 --- simulator/Dockerfile | 2 -- 1 file changed, 2 deletions(-) diff --git a/simulator/Dockerfile b/simulator/Dockerfile index 98c84d7..c40173e 100644 --- a/simulator/Dockerfile +++ b/simulator/Dockerfile @@ -12,9 +12,7 @@ FROM base COPY --from=builder /install /usr/local COPY requirements.txt / - RUN pip install -r requirements.txt - RUN apk --no-cache add libpq libstdc++ ADD . /simulator WORKDIR /simulator From 8f26e9a004b3310296b547efed4394b5053eba75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20B=C3=B6ttcher?= Date: Thu, 30 Mar 2023 22:38:42 +0200 Subject: [PATCH 19/27] Improve readme --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 429bed4..2423118 100644 --- a/README.md +++ b/README.md @@ -341,6 +341,8 @@ az login terraform apply ``` +To enable streaming set the `batch` parameter of the config to `false`. This will not apply to existing tables. +Streaming ingestion on cluster level is enabled by default by terraform cluster config. Besides having an emulator for ADX, capable of running locally, it is not recommended to use this emulator for any kind of benchmark tests, since the performance profile is very different. Furthermore, it is even prohibited by license terms. 
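
Editor's note on the run configuration: after the clean-up patches above, `run_config` in deployment/values.yaml is empty and simulator/modules/config.py expects the `RUN_CONFIG` environment variable to contain base64-encoded JSON. The sketch below shows one way to produce such a value; the key set mirrors what the azure_data_explorer module reads (note that the key is `batch_mode` — this is what the README's "`batch` parameter" refers to, and `false` switches the module to streaming ingestion), while the cluster URIs and database name are placeholders rather than the benchmark's real endpoints.

```python
import base64
import json

run_config = {
    "task": "insert",
    "use_multiple_tables": False,
    "clean_database": True,
    "batch_mode": False,  # False -> streaming ingestion policy is enabled per table
    "prefill": 0,
    "runs": 1,
    "num_inserts": 10000,
    "kusto_uri": "https://<cluster>.<region>.kusto.windows.net",
    "kusto_ingest_uri": "https://ingest-<cluster>.<region>.kusto.windows.net",
    "kusto_db": "SampleDB",
}

# config.py does the reverse: json.loads(base64.b64decode(os.getenv("RUN_CONFIG")))
encoded = base64.b64encode(json.dumps(run_config).encode("utf-8")).decode("ascii")
print(encoded)  # paste into run_config in deployment/values.yaml
```
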
From 1b05a32f237b9ce1229575cf843720056b7b37cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20B=C3=B6ttcher?= Date: Thu, 30 Mar 2023 22:41:42 +0200 Subject: [PATCH 20/27] Change back to mw-docker image --- deployment/values.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deployment/values.yaml b/deployment/values.yaml index 542a400..16f49a7 100644 --- a/deployment/values.yaml +++ b/deployment/values.yaml @@ -5,7 +5,7 @@ run_config: null workers: 1 image: - name: davidboettcher/database-comparison + name: maibornwolff/database-comparison tag: latest nodeSelector: {} From 3b7d1ffb1e5e173205c261f494b639c4940fa9db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20B=C3=B6ttcher?= Date: Sat, 1 Apr 2023 12:09:05 +0200 Subject: [PATCH 21/27] Document results for storage optimized --- README.md | 58 +++++++++++++++--------- simulator/modules/azure_data_explorer.py | 2 +- 2 files changed, 38 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index 2423118..d0dd11e 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ This readme contains three sections: ## Benchmark Using the test setup from this repository we ran tests against PostgreSQL, CockroachDB, YugabyteDB, ArangoDB, Cassandra and InfluxDB with (aside from PostgreSQL and InfluxDB) 3 nodes and 16 parallel workers that insert data (more concurrent workers might speed up the process but we didn't test that as this test was only to establish a baseline). All tests were done on a k3s cluster consisting of 8 m5ad.2xlarge (8vCPU, 32GB memory) and 3 i3.8xlarge (32vCPU, 244GB memory) EC2 instances on AWS. The database pods were run on the i3 instances with a resource limit of 8 cores and 10 GB memory per node, the client pods of the benchmark on the m5ad instances. In the text below all mentions of nodes refer to database nodes (pods) and not VMs or kubernetes nodes. +For Azure Data Explorer the test setup adapted to use: AKS with Standard_D8_v5 on 3 Nodes. All tests were run on an empty database. @@ -36,18 +37,22 @@ For ArangoDB batch mode is implemented using the document batch API (`/_api/docu The table below shows the best results for the databases for a 3 node cluster and a resource limit of 8 cores and 10 GB memory per node. The exceptions are PostgreSQL, InfluxDB and TimescaleDB which were launched as only a single instance. Influx provides a clustered variant only with their Enterprise product and for TimescaleDB there is no official and automated way to create a cluster with a distributed hypertable. All tests were run with the newest available version of the databases at the time of testing and using the opensource or free versions. Inserts were done with 16 parallel workers, and each test was run 3 times with the result being the average of these runs. For each run the inserts per second was calculated as the number of inserts divided by the sumed up duration of the workers. 
-| Database (Version tested) | Inserts/s | Insert mode | Primary-key mode | -|---------------------------------------------|------------|--------------------------------------|------------------| -| PostgreSQL | 428000 | copy, size 1000 | sql | -| CockroachDB (22.1.3) | 91000 | values lists, size 1000 | db | -| YugabyteDB YSQL (2.15.0) | 295000 | copy, size 1000 | sql | -| YugabyteDB YCQL (2.15.0) | 288000 | batch, size 1000 | - | -| ArrangoDB | 137000 | batch, size 1000 | db | -| Cassandra sync inserts | 389000 | batch, size 1000, max_sync_calls 1 | - | -| Cassandra async inserts | 410000 | batch, size 1000, max_sync_calls 120 | - | -| InfluxDB | 460000 | batch, size 1000 | - | -| TimescaleDB | 600000 | copy, size 1000 | - | -| Elasticsearch | 170000 | batch, size 10000 | db | +| Database (Version tested) | Inserts/s | Insert mode | Primary-key mode | +|-----------------------------------------|-----------|--------------------------------------|------------------| +| PostgreSQL | 428000 | copy, size 1000 | sql | +| CockroachDB (22.1.3) | 91000 | values lists, size 1000 | db | +| YugabyteDB YSQL (2.15.0) | 295000 | copy, size 1000 | sql | +| YugabyteDB YCQL (2.15.0) | 288000 | batch, size 1000 | - | +| ArrangoDB | 137000 | batch, size 1000 | db | +| Cassandra sync inserts | 389000 | batch, size 1000, max_sync_calls 1 | - | +| Cassandra async inserts | 410000 | batch, size 1000, max_sync_calls 120 | - | +| InfluxDB | 460000 | batch, size 1000 | - | +| TimescaleDB | 600000 | copy, size 1000 | - | +| Elasticsearch | 170000 | batch, size 10000 | db | +| Azure Data Explorer (Storage optimized) | 36000 | batch, size 1000 | - | +| Azure Data Explorer (Storage optimized) | 30000 | stream, size 1000 | - | +| Azure Data Explorer (Compute optimized) | ? | batch, size 1000 | - | +| Azure Data Explorer (Compute optimized) | ? | stream, size 1000 | - | You can find additional results from older runs in [old-results.md](old-results.md) but be aware that comparing them with the current ones is not always possible due to different conditions during the runs. @@ -64,16 +69,22 @@ Although the results of our benchmarks show a drastic improvement, batching in m For TimescaleDB the insert performance depends a lot on the number and size of chunks that are written to. In a fill-level test with 50 million inserts per step where in each step the timestamps started again (so the same chunks were written to as in the last step) performance degraded rapidly. But in the more realistic case of ever increasing timestamps (so new chunks being added) performance stayed relatively constant. +Azure Data Explorer offers compute and storage optimized sku types, as well as batch and stream ingestion. For storage Optimized it was decided to test against Standard_L8s_v2, for compute optimized against ??. +Notable about Azure Data Explorer is the fact, that inserts are not immediatly written to the database, instead their queued. The larger the batch/stream the longer it takes until the queue +starts to work it off. Once started it can keep the pace. For example a couple of rows take 20 - 30 seconds to appear in the database, 1000 rows 5 - 6 minutes, but 12.5 million rows require also only 5 - 6 minutes. 
+ ### Query performance -| Database / Query | count-events | temperature-min-max | temperature-stats | temperature-stats-per-device | newest-per-device | -|------------------|--------------|---------------------|-------------------|------------------------------|-------------------| -| PostgreSQL | 39 | 0.01 | 66 | 119 | 92 | -| CockroachDB | 123 | 153 | 153 | 153 | 150 | -| InfluxDB | 10 | 48 | 70 | 71 | 0.1 | -| TimescaleDB | 30 | 0.17 | 34 | 42 | 38 | -| Elasticsearch | 0.04 | 0.03 | 5.3 | 11 | 13 | -| Yugabyte (YSQL) | 160 | 0.03 | 220 | 1700 | failure | +| Database / Query | count-events | temperature-min-max | temperature-stats | temperature-stats-per-device | newest-per-device | +|-----------------------------------------|--------------|---------------------|-------------------|------------------------------|-------------------| +| PostgreSQL | 39 | 0.01 | 66 | 119 | 92 | +| CockroachDB | 123 | 153 | 153 | 153 | 150 | +| InfluxDB | 10 | 48 | 70 | 71 | 0.1 | +| TimescaleDB | 30 | 0.17 | 34 | 42 | 38 | +| Elasticsearch | 0.04 | 0.03 | 5.3 | 11 | 13 | +| Yugabyte (YSQL) | 160 | 0.03 | 220 | 1700 | failure | +| Azure Data Explorer (Storage optimized( | 0.33 | 0.76 | 1.0 | 3.2 | 8.6 | +| Azure Data Explorer (Storage optimized( | ?? | ?? | ?? | ?? | ?? | The table gives the average query duration in seconds @@ -335,12 +346,17 @@ resource "azurerm_kusto_cluster" "adxcompare" { } ``` -Finally, run: +To apply the infrastructure, run: ```bash az login terraform apply ``` +Finally, create the kubernetes secret: + +````bash +kubectl create secret generic adx-secret --from-literal=adx_aad_app_id= --from-literal=adx_app_key= --from=adx_authority_id= -n default +```` To enable streaming set the `batch` parameter of the config to `false`. This will not apply to existing tables. Streaming ingestion on cluster level is enabled by default by terraform cluster config. diff --git a/simulator/modules/azure_data_explorer.py b/simulator/modules/azure_data_explorer.py index 58ab066..bfc3ee3 100644 --- a/simulator/modules/azure_data_explorer.py +++ b/simulator/modules/azure_data_explorer.py @@ -59,7 +59,7 @@ def insert_events(events): def queries(): _filter_queries_to_execute() query_times = dict([(name, []) for name in _queries.keys()]) - for _ in range(0, int(config["runs"])): + for _ in range(int(config["runs"])): for name, query in _queries.items(): _execute_query(name, query, query_times) return query_times From 684a17a02fb8c6aa5d4b14a8f5e6dae7ce18041a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20B=C3=B6ttcher?= Date: Sat, 8 Apr 2023 17:44:41 +0200 Subject: [PATCH 22/27] Document results for compute optimized --- README.md | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index d0dd11e..0108021 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,8 @@ For generating primary keys for the events we have two modes: Calculating it on For the insert testcase we use single inserts (for PostgreSQL with autocommit) to simulate an ingest where each message needs to be persisted as soon as possible so no batching of messages is possible. Depending on the architecture and implementation buffering and batching of messages is possible. To see what effect this has on the performance we have implemented a batch mode for the test. For ArangoDB batch mode is implemented using the document batch API (`/_api/document`), the older generic batch API (`/_api/batch`) will be deprecated and produces worse performance so we did not use it. 
For PostgreSQL we implemented batch mode by doing a manual commit every x inserts and using COPY instead of INSERT. Another way to implement batch inserts is to use values lists (one insert statement with a list of values tuples) but this is not as fast as COPY. This mode can however be activated by passing `--extra-option use_values_lists=true`. +Azure Data Explorer offers compute and storage optimized sku types, as well as batch and stream ingestion. For storage optimized it was decided to test against Standard_L8s_v2, for compute optimized against Standard_E8a_v4 and in both cases a sku capacity of 2 was used. + ### Insert performance The table below shows the best results for the databases for a 3 node cluster and a resource limit of 8 cores and 10 GB memory per node. The exceptions are PostgreSQL, InfluxDB and TimescaleDB which were launched as only a single instance. Influx provides a clustered variant only with their Enterprise product and for TimescaleDB there is no official and automated way to create a cluster with a distributed hypertable. All tests were run with the newest available version of the databases at the time of testing and using the opensource or free versions. @@ -51,8 +53,8 @@ Inserts were done with 16 parallel workers, and each test was run 3 times with t | Elasticsearch | 170000 | batch, size 10000 | db | | Azure Data Explorer (Storage optimized) | 36000 | batch, size 1000 | - | | Azure Data Explorer (Storage optimized) | 30000 | stream, size 1000 | - | -| Azure Data Explorer (Compute optimized) | ? | batch, size 1000 | - | -| Azure Data Explorer (Compute optimized) | ? | stream, size 1000 | - | +| Azure Data Explorer (Compute optimized) | 38000 | batch, size 1000 | - | +| Azure Data Explorer (Compute optimized) | 53000 | stream, size 1000 | - | You can find additional results from older runs in [old-results.md](old-results.md) but be aware that comparing them with the current ones is not always possible due to different conditions during the runs. @@ -69,8 +71,7 @@ Although the results of our benchmarks show a drastic improvement, batching in m For TimescaleDB the insert performance depends a lot on the number and size of chunks that are written to. In a fill-level test with 50 million inserts per step where in each step the timestamps started again (so the same chunks were written to as in the last step) performance degraded rapidly. But in the more realistic case of ever increasing timestamps (so new chunks being added) performance stayed relatively constant. -Azure Data Explorer offers compute and storage optimized sku types, as well as batch and stream ingestion. For storage Optimized it was decided to test against Standard_L8s_v2, for compute optimized against ??. -Notable about Azure Data Explorer is the fact, that inserts are not immediatly written to the database, instead their queued. The larger the batch/stream the longer it takes until the queue +Notable about Azure Data Explorer is the fact, that inserts are not immediately written to the database, instead their queued. The larger the batch/stream the longer it takes until the queue starts to work it off. Once started it can keep the pace. For example a couple of rows take 20 - 30 seconds to appear in the database, 1000 rows 5 - 6 minutes, but 12.5 million rows require also only 5 - 6 minutes. ### Query performance @@ -83,8 +84,8 @@ starts to work it off. Once started it can keep the pace. 
For example a couple o | TimescaleDB | 30 | 0.17 | 34 | 42 | 38 | | Elasticsearch | 0.04 | 0.03 | 5.3 | 11 | 13 | | Yugabyte (YSQL) | 160 | 0.03 | 220 | 1700 | failure | -| Azure Data Explorer (Storage optimized( | 0.33 | 0.76 | 1.0 | 3.2 | 8.6 | -| Azure Data Explorer (Storage optimized( | ?? | ?? | ?? | ?? | ?? | +| Azure Data Explorer (Storage optimized) | 0.33 | 0.76 | 1.0 | 3.2 | 8.6 | +| Azure Data Explorer (Compute optimized) | 0.29 | 0.55 | 0.69 | 2.2 | failure | The table gives the average query duration in seconds @@ -112,6 +113,10 @@ For TimescaleDB query performance is also very dependent on the number and size Elasticsearch seems to cache query results, as such running the queries several times will yield millisecond response times for all queries. The times noted in the table above are against a freshly started elasticsearch cluster. +In case of the newest-per-device query (compute optimized) Azure Data Explorer did not succeed but terminated with "partition operator exceed amount of maximum partitions allowed (64)." +When a query is run against Azure Data Explorer, the query engine tries to optimize it by breaking it down into smaller, parallelizable tasks that can be executed across multiple partitions. +If the query requires more partitions than the maximum allowed limit, it will fail with the error message above. + For all databases there seems to be a rough linear correlation between query times and database size. So when running the tests with only 50 million rows the query times were about 10 times as fast. ## Running the tests @@ -327,7 +332,7 @@ kubectl apply -f dbinstall/elastic-deployment.yaml Azure Data Explorer (ADX) is a fully managed, high-performance, big data analytics platform. Azure Data Explorer can take all this varied data, and then ingest, process, and store it. You can use Azure Data Explorer for near real-time queries and advanced analytics. -To deploy an ADX-Cluster change the Service Principle (AAD) to your own Service Principle inside dbinstall/azure_data_explorer/main.tf. +To deploy an ADX-Cluster change the Service Principle (AAD) to your own Service Principle inside dbinstall/azure_data_explorer/main.tf: ``` data "azuread_service_principal" "service-principle" { display_name = "mw_iot_ADX-DB-Comparison" From f4200624cccbfc37fb2eefcdad294da5f684ed37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20B=C3=B6ttcher?= Date: Sat, 8 Apr 2023 17:49:40 +0200 Subject: [PATCH 23/27] Fix command in README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 0108021..94cea39 100644 --- a/README.md +++ b/README.md @@ -360,7 +360,7 @@ terraform apply Finally, create the kubernetes secret: ````bash -kubectl create secret generic adx-secret --from-literal=adx_aad_app_id= --from-literal=adx_app_key= --from=adx_authority_id= -n default +kubectl create secret generic adx-secret --from-literal=adx_aad_app_id= --from-literal=adx_app_key= --from-literal=adx_authority_id= -n default ```` To enable streaming set the `batch` parameter of the config to `false`. This will not apply to existing tables. Streaming ingestion on cluster level is enabled by default by terraform cluster config. 
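The three values stored in the `adx-secret` above have to reach the simulator pod so that the Azure Data Explorer module can authenticate against the cluster. The snippet below is a minimal sketch of that wiring, assuming the secret keys are exposed as the environment variables `ADX_AAD_APP_ID`, `ADX_APP_KEY` and `ADX_AUTHORITY_ID` and that the cluster URIs come from the module's `config.yaml` entries; the environment variable names and the helper itself are illustrative, not the repository's actual code.

```python
import os

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.ingest import QueuedIngestClient

# Assumed environment variable names, mirroring the keys of the adx-secret.
APP_ID = os.environ["ADX_AAD_APP_ID"]
APP_KEY = os.environ["ADX_APP_KEY"]
AUTHORITY_ID = os.environ["ADX_AUTHORITY_ID"]


def build_clients(kusto_uri: str, kusto_ingest_uri: str):
    """Return a query/management client and a queued ingest client for the cluster."""
    kcsb_data = KustoConnectionStringBuilder.with_aad_application_key_authentication(
        kusto_uri, APP_ID, APP_KEY, AUTHORITY_ID)
    kcsb_ingest = KustoConnectionStringBuilder.with_aad_application_key_authentication(
        kusto_ingest_uri, APP_ID, APP_KEY, AUTHORITY_ID)
    return KustoClient(kcsb_data), QueuedIngestClient(kcsb_ingest)
```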
From 92c851a2340e3443c3dc44f2ae78ebda38c478b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20B=C3=B6ttcher?= Date: Sat, 8 Apr 2023 18:17:50 +0200 Subject: [PATCH 24/27] Small refactoring --- simulator/modules/azure_data_explorer.py | 23 +++++++---------------- 1 file changed, 7 insertions(+), 16 deletions(-) diff --git a/simulator/modules/azure_data_explorer.py b/simulator/modules/azure_data_explorer.py index bfc3ee3..cc10078 100644 --- a/simulator/modules/azure_data_explorer.py +++ b/simulator/modules/azure_data_explorer.py @@ -19,21 +19,19 @@ KUSTO_INGEST_URI = config["kusto_ingest_uri"] KUSTO_DATABASE = config["kusto_db"] +TABLE_NAMES = ["events0", "events1", "events2", "events3"] if config["use_multiple_tables"] else ["events"] + def init(): - if config["use_multiple_tables"]: - table_names = ["events0", "events1", "events2", "events3"] - else: - table_names = ["events"] with _kusto_client() as kusto_client: existing_tables = _get_existing_tables(kusto_client) if config["clean_database"]: _clean_database(existing_tables, kusto_client) else: - table_names = _get_tables_requiring_creation(existing_tables, table_names) + table_names = _get_tables_requiring_creation(existing_tables, TABLE_NAMES) for table_name in table_names: _create_table(kusto_client, table_name) - _handle_stream_ingestion(kusto_client, table_name) + _configure_stream_ingestion(kusto_client, table_name) _create_ingestion_mapping(kusto_client, table_name) @@ -107,7 +105,7 @@ def _create_table(kusto_client, table_name): kusto_client.execute_mgmt(KUSTO_DATABASE, create_table_command) -def _handle_stream_ingestion(kusto_client, table_name): +def _configure_stream_ingestion(kusto_client, table_name): if not config.get("batch_mode", True): print(f"Enable streaming for {table_name}") enable_streaming_command = f".alter table {table_name} policy streamingingestion enable" @@ -193,18 +191,11 @@ def _ingest(table, timestamps, device_ids, sequence_numbers, temperatures): def _insert_events(events, batch_mode, batch_size): - print("Connecting to database", flush=True) - use_multiple_tables = config["use_multiple_tables"] - if use_multiple_tables: - table_names = ["events0", "events1", "events2", "events3"] - else: - table_names = ["events"] - print("Inserting events", flush=True) if batch_mode: - _batch_insert(events, batch_size, table_names) + _batch_insert(events, batch_size, TABLE_NAMES) else: - _stream_insert(events, table_names) + _stream_insert(events, TABLE_NAMES) def _execute_query(name, query, query_times): From cdb32e2b1fae406398561ca82e71f7a34c23d767 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20B=C3=B6ttcher?= Date: Sat, 8 Apr 2023 19:35:31 +0200 Subject: [PATCH 25/27] Small refactoring --- simulator/modules/azure_data_explorer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/simulator/modules/azure_data_explorer.py b/simulator/modules/azure_data_explorer.py index cc10078..df280af 100644 --- a/simulator/modules/azure_data_explorer.py +++ b/simulator/modules/azure_data_explorer.py @@ -20,6 +20,7 @@ KUSTO_DATABASE = config["kusto_db"] TABLE_NAMES = ["events0", "events1", "events2", "events3"] if config["use_multiple_tables"] else ["events"] +EVENT_TABLE_MAPPING = """'[{"Name":"timestamp","datatype":"long","Ordinal":0}, {"Name":"device_id","datatype":"string","Ordinal":1}, {"Name":"sequence_number","datatype":"long","Ordinal":2}, {"Name":"temperature","datatype":"real","Ordinal":3}]'""" def init(): @@ -95,7 +96,7 @@ def _get_existing_tables(kusto_client): def 
_create_ingestion_mapping(kusto_client, table_name): - create_mapping_command = f""".create table {table_name} ingestion csv mapping '{table_name}_CSV_Mapping' '[{{"Name":"timestamp","datatype":"long","Ordinal":0}}, {{"Name":"device_id","datatype":"string","Ordinal":1}}, {{"Name":"sequence_number","datatype":"long","Ordinal":2}}, {{"Name":"temperature","datatype":"real","Ordinal":3}}]'""" + create_mapping_command = f""".create table {table_name} ingestion csv mapping '{table_name}_CSV_Mapping' {EVENT_TABLE_MAPPING}""" kusto_client.execute_mgmt(KUSTO_DATABASE, create_mapping_command) From f5895409377ee6f275f743d2033010cca3a21154 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20B=C3=B6ttcher?= Date: Sat, 8 Apr 2023 19:40:34 +0200 Subject: [PATCH 26/27] Improve README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 94cea39..ba0105b 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ This readme contains three sections: ## Benchmark Using the test setup from this repository we ran tests against PostgreSQL, CockroachDB, YugabyteDB, ArangoDB, Cassandra and InfluxDB with (aside from PostgreSQL and InfluxDB) 3 nodes and 16 parallel workers that insert data (more concurrent workers might speed up the process but we didn't test that as this test was only to establish a baseline). All tests were done on a k3s cluster consisting of 8 m5ad.2xlarge (8vCPU, 32GB memory) and 3 i3.8xlarge (32vCPU, 244GB memory) EC2 instances on AWS. The database pods were run on the i3 instances with a resource limit of 8 cores and 10 GB memory per node, the client pods of the benchmark on the m5ad instances. In the text below all mentions of nodes refer to database nodes (pods) and not VMs or kubernetes nodes. -For Azure Data Explorer the test setup adapted to use: AKS with Standard_D8_v5 on 3 Nodes. +For Azure Data Explorer the test setup was adapted to the following: AKS with Standard_D8_v5 on 3 Nodes. All tests were run on an empty database. 
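The body of the `_ingest` helper is not part of these patches. As an illustration only (not the repository's implementation), a CSV-based version could serialize one batch per table in memory and hand it to the queued ingest endpoint, relying on the `<table>_CSV_Mapping` that `init()` registers; the function name and argument handling below are assumptions.

```python
import io

from azure.kusto.data.data_format import DataFormat
from azure.kusto.ingest import IngestionProperties, QueuedIngestClient


def ingest_csv_batch(ingest_client: QueuedIngestClient, database: str, table: str,
                     timestamps, device_ids, sequence_numbers, temperatures):
    """Sketch: serialize one batch as CSV and queue it for ingestion into the given table."""
    # Column order must match the ordinals of the CSV ingestion mapping:
    # timestamp, device_id, sequence_number, temperature.
    lines = (f"{ts},{dev},{seq},{temp}"
             for ts, dev, seq, temp in zip(timestamps, device_ids, sequence_numbers, temperatures))
    csv_stream = io.BytesIO("\n".join(lines).encode("utf-8"))
    props = IngestionProperties(
        database=database,
        table=table,
        data_format=DataFormat.CSV,
        ingestion_mapping_reference=f"{table}_CSV_Mapping",  # mapping created during init()
    )
    ingest_client.ingest_from_stream(csv_stream, ingestion_properties=props)
```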
From 0488cddfe228aff46bdac26d6404846060592ec8 Mon Sep 17 00:00:00 2001 From: David Boettcher <32432917+DavidOno@users.noreply.github.com> Date: Tue, 11 Apr 2023 09:32:16 +0200 Subject: [PATCH 27/27] Improve README Add detail about queuing to insert table --- README.md | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index ba0105b..319fce4 100644 --- a/README.md +++ b/README.md @@ -51,10 +51,12 @@ Inserts were done with 16 parallel workers, and each test was run 3 times with t | InfluxDB | 460000 | batch, size 1000 | - | | TimescaleDB | 600000 | copy, size 1000 | - | | Elasticsearch | 170000 | batch, size 10000 | db | -| Azure Data Explorer (Storage optimized) | 36000 | batch, size 1000 | - | -| Azure Data Explorer (Storage optimized) | 30000 | stream, size 1000 | - | -| Azure Data Explorer (Compute optimized) | 38000 | batch, size 1000 | - | -| Azure Data Explorer (Compute optimized) | 53000 | stream, size 1000 | - | +| Azure Data Explorer (Storage optimized) | 36000* | batch, size 1000 | - | +| Azure Data Explorer (Storage optimized) | 30000* | stream, size 1000 | - | +| Azure Data Explorer (Compute optimized) | 38000* | batch, size 1000 | - | +| Azure Data Explorer (Compute optimized) | 53000* | stream, size 1000 | - | + +*Inserts are not written into database immediately, but only queued You can find additional results from older runs in [old-results.md](old-results.md) but be aware that comparing them with the current ones is not always possible due to different conditions during the runs.
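Because Azure Data Explorer only queues the inserts, a benchmark run has to wait until the queued data is actually visible before starting the query phase or computing throughput. One way to do that — a sketch under the assumption that polling the row count is acceptable, not necessarily how the simulator handles it — is shown below.

```python
import time

from azure.kusto.data import KustoClient


def wait_until_ingested(kusto_client: KustoClient, database: str, table: str,
                        expected_rows: int, timeout_s: int = 900, poll_s: int = 10) -> int:
    """Poll the table's row count until all queued events are visible or the timeout expires."""
    seen = 0
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        response = kusto_client.execute(database, f"{table} | count")
        seen = response.primary_results[0][0]["Count"]
        if seen >= expected_rows:
            return seen
        time.sleep(poll_s)
    raise TimeoutError(f"only {seen} of {expected_rows} rows visible in {table} after {timeout_s}s")
```

Given the observed queue behaviour (20 - 30 seconds for a handful of rows, 5 - 6 minutes for large batches), a poll interval of a few seconds is sufficient and the timeout should be sized to the largest run.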