From 87b6c250f6e2ef1e66b139c35a6834daafe08263 Mon Sep 17 00:00:00 2001 From: cching95 <73163191+cching95@users.noreply.github.com> Date: Tue, 19 Mar 2024 11:07:13 +0000 Subject: [PATCH] Mirico Transformer TagName Column Updates (#708) * updated mirico transformer tagname column Signed-off-by: Chloe Ching * update casing in mirico transformer Signed-off-by: Chloe Ching --------- Signed-off-by: Chloe Ching --- .../_pipeline_utils/mirico_field_mappings.py | 4 +- .../transformers/spark/mirico_json_to_pcdm.py | 50 +++++++++++++++++-- .../spark/test_mirico_json_to_pcdm.py | 24 ++++----- 3 files changed, 57 insertions(+), 21 deletions(-) diff --git a/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/mirico_field_mappings.py b/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/mirico_field_mappings.py index a7f9a6134..56fc8625f 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/mirico_field_mappings.py +++ b/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/mirico_field_mappings.py @@ -1,8 +1,8 @@ MIRICO_FIELD_MAPPINGS = { 0: {"TagName": "timeStamp", "ValueType": "string"}, - 1: {"TagName": "gasTypeId", "ValueType": "integer"}, + 1: {"TagName": "gasTypeId", "ValueType": "float"}, 2: {"TagName": "pathLengthMeters", "ValueType": "float"}, - 3: {"TagName": "quality", "ValueType": "integer"}, + 3: {"TagName": "quality", "ValueType": "float"}, 4: {"TagName": "windBearingDegreesTo", "ValueType": "float"}, 5: {"TagName": "windSpeedMetersPerSecond", "ValueType": "float"}, 6: {"TagName": "pressureMillibar", "ValueType": "float"}, diff --git a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_pcdm.py b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_pcdm.py index 883955de5..79cbfafd0 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_pcdm.py +++ b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_pcdm.py @@ -122,10 +122,12 @@ def transform(self) -> DataFrame: .withColumn("Value", map_values("body")) .select( map_from_arrays("TagName", "Value").alias("x"), - to_timestamp(col("x.timeStamp")).alias("EventTime"), - col("x.siteName").alias("SiteName"), + col("x.timeStamp").alias("EventTime"), + col("x.siteName").alias("siteName"), + col("x.gasType").alias("gasType"), + col("x.retroName").alias("retroName"), ) - .select("EventTime", "SiteName", posexplode("x")) + .select("EventTime", "siteName", "gasType", "retroName", posexplode("x")) .withColumn( "ValueType", udf(lambda row: mapping[row]["ValueType"])(col("pos")) ) @@ -140,14 +142,52 @@ def transform(self) -> DataFrame: *[ upper(lit(self.tagname_field)), concat_ws( - "_", *[upper(col("SiteName")), upper(col("key"))] + "_", + *[ + upper(col("siteName")), + upper(col("retroName")), + when( + upper(col("key")) == "GASPPM", + concat_ws( + "_", + *[upper(col("key")), upper(col("gasType"))] + ), + ).otherwise(upper(col("key"))), + ] ), ] ), ).otherwise( - concat_ws("_", *[upper(col("SiteName")), upper(col("key"))]) + concat_ws( + "_", + *[ + upper(col("siteName")), + upper(col("retroName")), + when( + upper(col("key")) == "GASPPM", + concat_ws( + "_", *[upper(col("key")), upper(col("gasType"))] + ), + ).otherwise(upper(col("key"))), + ] + ) ), ) + .filter( + ~col("key").isin( + "timeStamp", + "gasType", + "retroLongitude", + "retroLatitude", + "retroAltitude", + "sensorLongitude", + "sensorLatitude", + "sensorAltitude", + "siteName", + "siteKey", + "retroName", + ) + ) ) return df.select( "EventTime", "TagName", "Status", "Value", "ValueType", "ChangeType" diff --git a/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_mirico_json_to_pcdm.py b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_mirico_json_to_pcdm.py index 6798b00f2..6ff46ffbb 100644 --- a/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_mirico_json_to_pcdm.py +++ b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_mirico_json_to_pcdm.py @@ -31,18 +31,14 @@ _package_version_meets_minimum, ) -EVENTTIME = datetime.fromisoformat("2023-11-03T16:21:16") - def test_mirico_json_to_pcdm(spark_session: SparkSession): - mirico_json_data = ( - '{"timeStamp": "2023-11-03T16:21:16", "siteName": "test_site_name"}' - ) + mirico_json_data = '{"timeStamp": "2023-11-03T16:21:16", "siteName": "test_site_name", "gasTypeId": 3, "quality": 10}' mirico_df: DataFrame = spark_session.createDataFrame([{"body": mirico_json_data}]) expected_schema = StructType( [ - StructField("EventTime", TimestampType(), True), + StructField("EventTime", StringType(), True), StructField("TagName", StringType(), False), StructField("Status", StringType(), False), StructField("Value", StringType(), True), @@ -53,19 +49,19 @@ def test_mirico_json_to_pcdm(spark_session: SparkSession): expected_data = [ { - "EventTime": EVENTTIME, - "TagName": "TEST_SITE_NAME_TIMESTAMP", + "EventTime": "2023-11-03T16:21:16", + "TagName": "TEST_SITE_NAME_GASTYPEID", "Status": "Good", - "Value": "2023-11-03T16:21:16", - "ValueType": "string", + "Value": 3, + "ValueType": "float", "ChangeType": "insert", }, { - "EventTime": EVENTTIME, - "TagName": "TEST_SITE_NAME_SITENAME", + "EventTime": "2023-11-03T16:21:16", + "TagName": "TEST_SITE_NAME_QUALITY", "Status": "Good", - "Value": "test_site_name", - "ValueType": "integer", + "Value": 10, + "ValueType": "float", "ChangeType": "insert", }, ]