[DENG-4256] Search for shredder targets using table lineage (#6289)
BenWu authored Oct 3, 2024
1 parent 80a4927 commit d741403
Showing 6 changed files with 445 additions and 10 deletions.
20 changes: 20 additions & 0 deletions dags.yaml
@@ -1810,3 +1810,23 @@ bqetl_dynamic_dau:
tags:
- repo/bigquery-etl
- impact/tier_2

bqetl_shredder_monitoring:
default_args:
depends_on_past: false
email:
- [email protected]
email_on_failure: true
email_on_retry: False
end_date: null
owner: [email protected]
retries: 2
retry_delay: 30m
start_date: '2024-10-01'
description: '[EXPERIMENTAL] Monitoring queries for shredder operation'
repo: bigquery-etl
schedule_interval: 0 12 * * *
tags:
- repo/bigquery-etl
- impact/tier_3
- triage/no_triage
1 change: 1 addition & 0 deletions requirements.in
@@ -12,6 +12,7 @@ gitpython==3.1.43
google-auth>=2.30.0 # To try to fix "Compute Engine Metadata server call to universe/universe_domain returned 404" errors.
google-cloud-bigquery==3.25.0
google-cloud-bigquery-storage[fastavro]==2.24.0
google-cloud-datacatalog-lineage==0.3.8
google-cloud-storage==2.18.2
Jinja2==3.1.3
jsonschema==4.23.0
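The new google-cloud-datacatalog-lineage dependency backs the lineage lookups in the shredder-targets query. A minimal sketch of how it resolves a table's upstream sources, assuming application-default credentials and a placeholder table name (the actual traversal lives in shredder_targets_v1/query.py, shown in the integration report below):

from google.cloud import datacatalog_lineage_v1 as datacatalog_lineage

client = datacatalog_lineage.LineageClient()

# Placeholder table; the real query iterates over every table with a
# client_id column found via INFORMATION_SCHEMA.
table = "moz-fx-data-shared-prod.telemetry_derived.some_table_v1"

links = client.search_links(
    request={
        "parent": "projects/moz-fx-data-shared-prod/locations/us",
        # search for links where this table is the target, i.e. its upstream sources
        "target": datacatalog_lineage.EntityReference(
            fully_qualified_name=f"bigquery:{table}"
        ),
    }
)
for link in links:
    # fully_qualified_name looks like "bigquery:project.dataset.table"
    print(link.source.fully_qualified_name.split(":")[-1])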
25 changes: 15 additions & 10 deletions requirements.txt
@@ -514,6 +514,7 @@ google-api-core[grpc]==2.17.1 \
# google-cloud-bigquery
# google-cloud-bigquery-storage
# google-cloud-core
# google-cloud-datacatalog-lineage
# google-cloud-storage
google-auth==2.35.0 \
--hash=sha256:25df55f327ef021de8be50bad0dfd4a916ad0de96da86cd05661c9297723ad3f \
@@ -525,6 +526,7 @@ google-auth==2.35.0 \
# google-auth-oauthlib
# google-cloud-bigquery
# google-cloud-core
# google-cloud-datacatalog-lineage
# google-cloud-storage
google-auth-oauthlib==0.8.0 \
--hash=sha256:40cc612a13c3336d5433e94e2adb42a0c88f6feb6c55769e44500fc70043a576 \
@@ -544,6 +546,10 @@ google-cloud-core==2.3.2 \
# via
# google-cloud-bigquery
# google-cloud-storage
google-cloud-datacatalog-lineage==0.3.8 \
--hash=sha256:8a17b11ba647fda33b36a8680658ce684d75c3d633c67e626001bbb19d8f3ca6 \
--hash=sha256:e22151121cbe7bfb07ae8c8e28319b7a03cc8d17a3407fa9ace0db8581701cfa
# via -r requirements.in
google-cloud-storage==2.18.2 \
--hash=sha256:97a4d45c368b7d401ed48c4fdfe86e1e1cb96401c9e199e419d289e2c0370166 \
--hash=sha256:aaf7acd70cdad9f274d29332673fcab98708d0e1f4dceb5a5356aaef06af4d99
@@ -742,9 +748,7 @@ jaraco-classes==3.4.0 \
jeepney==0.8.0 \
--hash=sha256:5efe48d255973902f6badc3ce55e2aa6c5c3b3bc642059ef3a91247bcfcc5806 \
--hash=sha256:c0a454ad016ca575060802ee4d590dd912e35c122fa04e70306de3d076cce755
-    # via
-    #   keyring
-    #   secretstorage
+    # via secretstorage
jinja2==3.1.3 \
--hash=sha256:7d6d50dd97d52cbc355597bd845fabfbac3f551e1f99619e39a35ce8c370b5fa \
--hash=sha256:ac8bd6544d4bb2c9792bf3a159e80bba8fda7f07e81bc3aed565432d5925ba90
@@ -1199,10 +1203,12 @@ pre-commit==3.8.0 \
--hash=sha256:8bb6494d4a20423842e198980c9ecf9f96607a07ea29549e180eef9ae80fe7af \
--hash=sha256:9a90a53bf82fdd8778d58085faf8d83df56e40dfe18f45b19446e26bf1b3a63f
# via -r requirements.in
-proto-plus==1.22.2 \
-    --hash=sha256:0e8cda3d5a634d9895b75c573c9352c16486cb75deb0e078b5fda34db4243165 \
-    --hash=sha256:de34e52d6c9c6fcd704192f09767cb561bb4ee64e70eede20b0834d841f0be4d
-    # via google-cloud-bigquery-storage
+proto-plus==1.24.0 \
+    --hash=sha256:30b72a5ecafe4406b0d339db35b56c4059064e69227b8c3bda7462397f966445 \
+    --hash=sha256:402576830425e5f6ce4c2a6702400ac79897dab0b4343821aa5188b0fab81a12
+    # via
+    #   google-cloud-bigquery-storage
+    #   google-cloud-datacatalog-lineage
protobuf==4.21.12 \
--hash=sha256:1f22ac0ca65bb70a876060d96d914dae09ac98d114294f77584b0d2644fa9c30 \
--hash=sha256:237216c3326d46808a9f7c26fd1bd4b20015fb6867dc5d263a493ef9a539293b \
@@ -1223,6 +1229,7 @@ protobuf==4.21.12 \
# gcloud
# google-api-core
# google-cloud-bigquery-storage
# google-cloud-datacatalog-lineage
# googleapis-common-protos
# grpcio-status
# proto-plus
@@ -1970,9 +1977,7 @@ s3transfer==0.10.2 \
secretstorage==3.3.3 \
--hash=sha256:2403533ef369eca6d2ba81718576c5e0f564d5cca1b58f73a8b23e7d4eeebd77 \
--hash=sha256:f356e6628222568e3af06f2eba8df495efa13b3b63081dafd4f7d9a7b7bc9f99
-    # via
-    #   bigeye-sdk
-    #   keyring
+    # via bigeye-sdk
shellingham==1.5.4 \
--hash=sha256:7ecfff8f2fd72616f7481040475a65b2bf8af90a56c89140852d1120324e8686 \
--hash=sha256:8dbca0739d487e5bd35ab3ca4b36e11c4078f3a234bfce294b0a0291363404de
17 changes: 17 additions & 0 deletions sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/metadata.yaml
@@ -0,0 +1,17 @@
friendly_name: Shredder Targets
description: |-
Daily list of shredder deletion targets comparing the configured targets with
the lineage of found id tables in bigquery.
owners:
- [email protected]
labels:
incremental: true
owner1: benwu
scheduling:
dag_name: bqetl_shredder_monitoring
arguments: ["--output-table", "moz-fx-data-shared-prod.telemetry_derived.shredder_targets_v1", "--run-date", "{{ ds }}"]
bigquery:
time_partitioning:
type: day
field: run_date
require_partition_filter: true
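
The scheduling arguments above map directly onto query.py's command-line options. A hypothetical manual run, with the "{{ ds }}" template rendered to a concrete date and the bigquery-etl repo root as the working directory, might look like this; Airflow performs the equivalent via GKEPodOperator, as the generated DAG in the integration report below shows:

import subprocess

# Hypothetical invocation of the scheduled query outside of Airflow.
subprocess.run(
    [
        "python",
        "sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/query.py",
        "--output-table",
        "moz-fx-data-shared-prod.telemetry_derived.shredder_targets_v1",
        "--run-date",
        "2024-10-01",
    ],
    check=True,
)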

2 comments on commit d741403

@dataops-ci-bot

Integration report for "[DENG-4256] Search for shredder targets using table lineage (#6289)"

sql.diff

Only in /tmp/workspace/generated-sql/dags/: bqetl_shredder_monitoring.py
diff -bur --no-dereference --new-file /tmp/workspace/main-generated-sql/dags/bqetl_shredder_monitoring.py /tmp/workspace/generated-sql/dags/bqetl_shredder_monitoring.py
--- /tmp/workspace/main-generated-sql/dags/bqetl_shredder_monitoring.py	1970-01-01 00:00:00.000000000 +0000
+++ /tmp/workspace/generated-sql/dags/bqetl_shredder_monitoring.py	2024-10-03 17:42:59.000000000 +0000
@@ -0,0 +1,70 @@
+# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
+
+from airflow import DAG
+from airflow.sensors.external_task import ExternalTaskMarker
+from airflow.sensors.external_task import ExternalTaskSensor
+from airflow.utils.task_group import TaskGroup
+import datetime
+from operators.gcp_container_operator import GKEPodOperator
+from utils.constants import ALLOWED_STATES, FAILED_STATES
+from utils.gcp import bigquery_etl_query, bigquery_dq_check
+from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
+
+docs = """
+### bqetl_shredder_monitoring
+
+Built from bigquery-etl repo, [`dags/bqetl_shredder_monitoring.py`](https://github.com/mozilla/bigquery-etl/blob/generated-sql/dags/bqetl_shredder_monitoring.py)
+
+#### Description
+
+[EXPERIMENTAL] Monitoring queries for shredder operation
+#### Owner
+
+[email protected]
+
+#### Tags
+
+* impact/tier_3
+* repo/bigquery-etl
+* triage/no_triage
+"""
+
+
+default_args = {
+    "owner": "[email protected]",
+    "start_date": datetime.datetime(2024, 10, 1, 0, 0),
+    "end_date": None,
+    "email": ["[email protected]"],
+    "depends_on_past": False,
+    "retry_delay": datetime.timedelta(seconds=1800),
+    "email_on_failure": True,
+    "email_on_retry": False,
+    "retries": 2,
+}
+
+tags = ["impact/tier_3", "repo/bigquery-etl", "triage/no_triage"]
+
+with DAG(
+    "bqetl_shredder_monitoring",
+    default_args=default_args,
+    schedule_interval="0 12 * * *",
+    doc_md=docs,
+    tags=tags,
+) as dag:
+
+    monitoring_derived__shredder_targets__v1 = GKEPodOperator(
+        task_id="monitoring_derived__shredder_targets__v1",
+        arguments=[
+            "python",
+            "sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/query.py",
+        ]
+        + [
+            "--output-table",
+            "moz-fx-data-shared-prod.telemetry_derived.shredder_targets_v1",
+            "--run-date",
+            "{{ ds }}",
+        ],
+        image="gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest",
+        owner="[email protected]",
+        email=["[email protected]"],
+    )
Only in /tmp/workspace/generated-sql/sql/moz-fx-data-shared-prod/monitoring_derived: shredder_targets_v1
diff -bur --no-dereference --new-file /tmp/workspace/main-generated-sql/sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/metadata.yaml /tmp/workspace/generated-sql/sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/metadata.yaml
--- /tmp/workspace/main-generated-sql/sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/metadata.yaml	1970-01-01 00:00:00.000000000 +0000
+++ /tmp/workspace/generated-sql/sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/metadata.yaml	2024-10-03 17:38:20.000000000 +0000
@@ -0,0 +1,30 @@
+friendly_name: Shredder Targets
+description: |-
+  Daily list of shredder deletion targets comparing the configured targets with
+  the lineage of found id tables in bigquery.
+owners:
+- [email protected]
+labels:
+  incremental: true
+  owner1: bewu
+  dag: bqetl_shredder_monitoring
+scheduling:
+  dag_name: bqetl_shredder_monitoring
+  arguments:
+  - --output-table
+  - moz-fx-data-shared-prod.telemetry_derived.shredder_targets_v1
+  - --run-date
+  - '{{ ds }}'
+bigquery:
+  time_partitioning:
+    type: day
+    field: run_date
+    require_partition_filter: true
+    expiration_days: null
+  range_partitioning: null
+  clustering: null
+workgroup_access:
+- role: roles/bigquery.dataViewer
+  members:
+  - workgroup:mozilla-confidential
+references: {}
diff -bur --no-dereference --new-file /tmp/workspace/main-generated-sql/sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/query.py /tmp/workspace/generated-sql/sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/query.py
--- /tmp/workspace/main-generated-sql/sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/query.py	1970-01-01 00:00:00.000000000 +0000
+++ /tmp/workspace/generated-sql/sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/query.py	2024-10-03 17:36:12.000000000 +0000
@@ -0,0 +1,350 @@
+#!/usr/bin/env python3
+
+"""Search for tables with client id columns."""
+import datetime
+from collections import defaultdict
+from multiprocessing.pool import ThreadPool
+from pathlib import Path
+from typing import Any, Dict, Iterable, List, Set
+
+import click
+from google.cloud import bigquery
+from google.cloud import datacatalog_lineage_v1 as datacatalog_lineage
+from google.cloud.bigquery import TableReference
+
+from bigquery_etl.schema import Schema
+from bigquery_etl.shredder.config import (
+    CLIENT_ID,
+    DELETE_TARGETS,
+    GLEAN_CLIENT_ID,
+    SHARED_PROD,
+    DeleteSource,
+    find_glean_targets,
+    get_glean_channel_to_app_name_mapping,
+)
+
+FIND_TABLES_QUERY_TEMPLATE = """
+WITH no_client_id_tables AS (
+  SELECT
+    table_catalog,
+    table_schema,
+    table_name,
+  FROM
+    `{project}.region-us.INFORMATION_SCHEMA.TABLE_OPTIONS`
+  WHERE
+    option_name = 'labels'
+    AND option_value LIKE '%(\"include_client_id\", \"false\")%'
+)
+
+SELECT
+  table_catalog,
+  table_schema,
+  table_name,
+FROM
+  `{project}.region-us.INFORMATION_SCHEMA.TABLES`
+LEFT JOIN
+  no_client_id_tables
+USING
+  (table_catalog, table_schema, table_name)
+WHERE
+  -- find tables with columns ending with client_id
+  (
+    (
+      ddl LIKE '%client_id STRING%' AND no_client_id_tables.table_name IS NULL
+    )
+    OR (
+      -- glean tables may have an all null client_info.client_id column but may have a secondary client id
+      ddl LIKE '%client_id STRING%client_id STRING%' AND no_client_id_tables.table_name IS NOT NULL
+    )
+  )
+  AND table_type = 'BASE TABLE'  -- exclude views
+  AND (
+    table_schema LIKE '%_derived'
+    or table_schema LIKE '%_stable'
+  )
+  -- TODO: can't get lineage for most opmon tables, need to figure this out separately
+  AND table_schema != "operational_monitoring_derived"
+  AND table_schema != "backfills_staging_derived"
+  AND table_name != 'deletion_request_v1'
+  AND table_name != 'deletion_request_v4'
+"""
+
+
+def find_client_id_tables(project: str) -> List[str]:
+    """Return a list of tables that have columns ending with 'client_id'."""
+    client = bigquery.Client()
+    row_results = client.query_and_wait(
+        query=FIND_TABLES_QUERY_TEMPLATE.format(project=project)
+    )
+
+    return [f"{project}.{row.table_schema}.{row.table_name}" for row in row_results]
+
+
+def get_upstream_stable_tables(id_tables: List[str]) -> Dict[str, Set[str]]:
+    """Build map of tables to upstream stable tables using GCP data catalog lineage.
+
+    Note that the data catalog only uses the information from the last 30 days of query jobs.
+    """
+    client = datacatalog_lineage.LineageClient()
+
+    upstream_stable_tables = defaultdict(set)
+
+    def traverse_upstream(base_table: str):
+        """Recursively traverse lineage to find stable tables."""
+        table_ref = TableReference.from_string(base_table)
+
+        if table_ref.dataset_id.endswith("_stable"):  # stable tables are terminal nodes
+            upstream_stable_tables[base_table] = {base_table}
+        elif base_table not in upstream_stable_tables:
+            upstream_links_result = client.search_links(
+                request={
+                    "parent": "projects/moz-fx-data-shared-prod/locations/us",
+                    "target": datacatalog_lineage.EntityReference(
+                        fully_qualified_name=f"bigquery:{base_table}"
+                    ),
+                }
+            )
+            # recursively add upstream tables
+            for upstream_link in upstream_links_result:
+                # remove "bigquery:" and "sharded:" prefixes
+                parent_table = upstream_link.source.fully_qualified_name.split(":")[-1]
+
+                upstream_stable_tables[base_table] = upstream_stable_tables[
+                    base_table
+                ].union(traverse_upstream(parent_table))
+
+        return upstream_stable_tables[base_table]
+
+    upstream_stable_table_map = {}
+
+    print("Upstream stable tables:")
+    for table_name in id_tables:
+        upstream_stable_table_map[table_name] = set(traverse_upstream(table_name))
+        print(f"{table_name} upstream: {upstream_stable_table_map[table_name]}")
+
+    return upstream_stable_table_map
+
+
+def get_associated_deletions(
+    upstream_stable_tables: Dict[str, Set[str]]
+) -> Dict[str, Set[DeleteSource]]:
+    """Get a list of associated deletion requests tables per table based on the stable tables."""
+    # deletion targets for stable tables defined in the shredder config
+    known_stable_table_sources: Dict[str, Set[DeleteSource]] = {
+        f"{target.project}.{target.dataset_id}.{target.table_id}": (
+            set(src) if isinstance(src, Iterable) else {src}
+        )
+        for target, src in DELETE_TARGETS.items()
+        if target.dataset_id.endswith("_stable")
+    }
+
+    table_to_deletions: Dict[str, Set[DeleteSource]] = {}
+
+    datasets_with_additional_deletion_requests = {}
+
+    for base_table in upstream_stable_tables:
+        if base_table.endswith(".additional_deletion_requests_v1"):
+            dataset_name = TableReference.from_string(base_table).dataset_id
+            datasets_with_additional_deletion_requests[dataset_name] = (
+                f"{dataset_name}.additional_deletion_requests_v1"
+            )
+            datasets_with_additional_deletion_requests[
+                dataset_name.replace("_derived", "_stable")
+            ] = f"{dataset_name}.additional_deletion_requests_v1"
+
+    glean_channel_names = get_glean_channel_to_app_name_mapping()
+
+    for table_name, stable_tables in upstream_stable_tables.items():
+        deletion_tables: Set[DeleteSource] = set()
+
+        for stable_table in stable_tables:
+            if stable_table in known_stable_table_sources:
+                table_to_deletions[stable_table] = known_stable_table_sources[
+                    stable_table
+                ]
+            elif stable_table not in table_to_deletions:
+                stable_table_ref = TableReference.from_string(stable_table)
+
+                # glean table
+                if (
+                    stable_table_ref.dataset_id[: -len("_stable")]
+                    in glean_channel_names
+                ):
+                    table_to_deletions[stable_table] = {
+                        DeleteSource(
+                            table=f"{stable_table_ref.dataset_id}.deletion_request_v1",
+                            field=GLEAN_CLIENT_ID,
+                            project=SHARED_PROD,
+                        )
+                    }
+                    if (
+                        stable_table_ref.dataset_id
+                        in datasets_with_additional_deletion_requests
+                    ):
+                        table_to_deletions[stable_table].add(
+                            DeleteSource(
+                                table=datasets_with_additional_deletion_requests[
+                                    stable_table_ref.dataset_id
+                                ],
+                                field=CLIENT_ID,
+                                project=SHARED_PROD,
+                            )
+                        )
+                # unknown legacy telemetry or non-glean structured
+                else:
+                    table_to_deletions[stable_table] = set()
+
+            deletion_tables = deletion_tables.union(table_to_deletions[stable_table])
+
+        table_to_deletions[table_name] = deletion_tables
+
+    return {
+        table: deletions
+        for table, deletions in table_to_deletions.items()
+        if table in upstream_stable_tables
+    }
+
+
+def delete_source_to_dict(source: DeleteSource):
+    """Convert a DeleteSource to a dict, removing the condition field."""
+    d = source.__dict__.copy()
+    d.pop("conditions")
+    return d
+
+
+def get_missing_deletions(
+    associated_deletions: Dict[str, Set[DeleteSource]]
+) -> List[Dict[str, Any]]:
+    """Get list of all tables with the currently configured deletion sources and the sources based on lineage."""
+    # get the generated glean deletion list
+    with ThreadPool(processes=12) as pool:
+        bigquery_client = bigquery.Client()
+        glean_delete_targets = find_glean_targets(pool, client=bigquery_client)
+
+    glean_channel_names = get_glean_channel_to_app_name_mapping()
+    glean_app_name_to_channels = defaultdict(list)
+    for channel, app_name in glean_channel_names.items():
+        glean_app_name_to_channels[app_name].append(channel)
+
+    table_deletions = []
+
+    for target, sources in (*glean_delete_targets.items(), *DELETE_TARGETS.items()):
+        target_table = f"{target.project}.{target.table}"
+
+        # expand per-app deletion request views into per-channel tables
+        unnested_sources = set()
+        if isinstance(sources, Iterable):
+            for source in sources:
+                if (
+                    source.dataset_id in glean_app_name_to_channels
+                    and source.table_id == "deletion_request"
+                ):
+                    for channel in glean_app_name_to_channels[source.dataset_id]:
+                        unnested_sources.add(
+                            DeleteSource(
+                                table=f"{channel}_stable.deletion_request_v1",
+                                field=source.field,
+                                project=source.project,
+                                conditions=source.conditions,
+                            )
+                        )
+                else:
+                    unnested_sources.add(source)
+        else:
+            unnested_sources.add(sources)
+
+        # tables not in associated_deletions likely use another id column, e.g. user_id
+        detected_deletions = associated_deletions.pop(target_table, set())
+
+        table_deletions.append(
+            {
+                "project_id": target.project,
+                "dataset_id": target.dataset_id,
+                "table_id": target.table_id,
+                "current_sources": [
+                    delete_source_to_dict(source) for source in unnested_sources
+                ],
+                "detected_sources": [
+                    delete_source_to_dict(detected) for detected in detected_deletions
+                ],
+                "matching_sources": set(unnested_sources) == detected_deletions,
+            }
+        )
+
+    # delete target not in shredder config
+    for table_name, srcs in associated_deletions.items():
+        project, dataset, table = table_name.split(".")
+        table_deletions.append(
+            {
+                "project_id": project,
+                "dataset_id": dataset,
+                "table_id": table,
+                "current_sources": [],
+                "detected_sources": [
+                    delete_source_to_dict(detected) for detected in detected_deletions
+                ],
+                "matching_sources": False,
+            }
+        )
+
+    return table_deletions
+
+
+def write_to_bigquery(
+    run_date: datetime.datetime,
+    target_table: TableReference,
+    deletions: List[Dict[str, Any]],
+):
+    client = bigquery.Client()
+
+    result = client.load_table_from_json(
+        json_rows=[
+            {"run_date": run_date.date().isoformat(), **deletion}
+            for deletion in deletions
+        ],
+        destination=f"{str(target_table)}${run_date.date().strftime('%Y%m%d')}",
+        job_config=bigquery.LoadJobConfig(
+            write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
+            schema=Schema.from_schema_file(
+                Path(__file__).parent / "schema.yaml"
+            ).to_bigquery_schema(),
+            time_partitioning=bigquery.TimePartitioning(field="run_date"),
+        ),
+    ).result()
+
+    print(f"Wrote {result.output_rows} rows to {result.destination}")
+
+
+@click.command
+@click.option(
+    "--run-date", type=click.DateTime(), help="The date to write in the output."
+)
+@click.option(
+    "--output-table",
+    type=TableReference.from_string,
+    metavar="PROJECT.DATASET.TABLE",
+    help="Table to write results to in the form of PROJECT.DATASET.TABLE.",
+)
+@click.option(
+    "--project-id",
+    default=SHARED_PROD,
+    help="BigQuery project to search for client id tables.",
+)
+def main(run_date, output_table, project_id):
+    """Find tables in the given project that could be added to shredder."""
+    # TODO: handle other id columns
+    client_id_tables = find_client_id_tables(project_id)
+
+    print(f"Found {len(client_id_tables)} client id tables.")
+
+    upstream_stable_tables = get_upstream_stable_tables(client_id_tables)
+
+    associated_deletions = get_associated_deletions(upstream_stable_tables)
+
+    table_deletions = get_missing_deletions(associated_deletions)
+
+    write_to_bigquery(run_date, output_table, table_deletions)
+
+
+if __name__ == "__main__":
+    main()
diff -bur --no-dereference --new-file /tmp/workspace/main-generated-sql/sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/schema.yaml /tmp/workspace/generated-sql/sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/schema.yaml
--- /tmp/workspace/main-generated-sql/sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/schema.yaml	1970-01-01 00:00:00.000000000 +0000
+++ /tmp/workspace/generated-sql/sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/schema.yaml	2024-10-03 17:36:12.000000000 +0000
@@ -0,0 +1,42 @@
+fields:
+- name: run_date
+  type: DATE
+  mode: NULLABLE
+- name: project_id
+  type: STRING
+  mode: NULLABLE
+- name: dataset_id
+  type: STRING
+  mode: NULLABLE
+- name: table_id
+  type: STRING
+  mode: NULLABLE
+- name: current_sources
+  type: RECORD
+  mode: REPEATED
+  fields:
+  - name: table
+    type: STRING
+    mode: NULLABLE
+  - name: field
+    type: STRING
+    mode: NULLABLE
+  - name: project
+    type: STRING
+    mode: NULLABLE
+- name: detected_sources
+  type: RECORD
+  mode: REPEATED
+  fields:
+  - name: table
+    type: STRING
+    mode: NULLABLE
+  - name: field
+    type: STRING
+    mode: NULLABLE
+  - name: project
+    type: STRING
+    mode: NULLABLE
+- name: matching_sources
+  type: BOOLEAN
+  mode: NULLABLE

Link to full diff
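
The schema above suggests a straightforward way to consume the monitoring output: filter on matching_sources to surface tables whose configured shredder sources disagree with what lineage detected. A sketch, assuming the destination passed via --output-table and an example run date:

from google.cloud import bigquery

client = bigquery.Client()

# Tables whose configured deletion sources differ from the lineage-detected ones.
query = """
SELECT project_id, dataset_id, table_id, current_sources, detected_sources
FROM `moz-fx-data-shared-prod.telemetry_derived.shredder_targets_v1`
WHERE run_date = @run_date
  AND NOT matching_sources
"""
job = client.query(
    query,
    job_config=bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("run_date", "DATE", "2024-10-01")
        ]
    ),
)
for row in job.result():
    print(f"{row.project_id}.{row.dataset_id}.{row.table_id}")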

@dataops-ci-bot

Integration report for "[DENG-4256] Search for shredder targets using table lineage (#6289)"

sql.diff

No content detected.
