Merge branch 'master' into python-311-v2
mayurinehate authored Dec 27, 2024
2 parents 10174f4 + 29e4528 commit e691850
Showing 45 changed files with 740 additions and 440 deletions.
50 changes: 36 additions & 14 deletions .github/workflows/docker-unified.yml
@@ -1011,18 +1011,39 @@ jobs:
needs: setup
outputs:
matrix: ${{ steps.set-matrix.outputs.matrix }}
cypress_batch_count: ${{ steps.set-batch-count.outputs.cypress_batch_count }}
python_batch_count: ${{ steps.set-batch-count.outputs.python_batch_count }}
steps:
- id: set-batch-count
# Tests are split into a configured number of batches for parallelization. These counts may need to be
# increased as newly added tests push the duration up to the point where an additional parallel batch helps.
# python_batch_count is used to split the pytest tests in the smoke test (batches of actual test functions).
# cypress_batch_count is used to split the collection of cypress test specs into batches.
run: |
echo "cypress_batch_count=11" >> "$GITHUB_OUTPUT"
echo "python_batch_count=5" >> "$GITHUB_OUTPUT"
- id: set-matrix
# For m python batches and n cypress batches, we need a test matrix of python x m + cypress x n.
# While GitHub Actions matrix generation can handle these two parts individually, there isn't a way to use the
# two generated matrices for the same job. So, produce the combined matrix with scripting and use the include
# directive to add it to the test matrix.
run: |
if [ '${{ needs.setup.outputs.frontend_only }}' == 'true' ]; then
echo 'matrix=["cypress_suite1","cypress_rest"]' >> "$GITHUB_OUTPUT"
elif [ '${{ needs.setup.outputs.ingestion_only }}' == 'true' ]; then
echo 'matrix=["no_cypress_suite0","no_cypress_suite1"]' >> "$GITHUB_OUTPUT"
elif [[ '${{ needs.setup.outputs.backend_change }}' == 'true' || '${{ needs.setup.outputs.smoke_test_change }}' == 'true' ]]; then
echo 'matrix=["no_cypress_suite0","no_cypress_suite1","cypress_suite1","cypress_rest"]' >> "$GITHUB_OUTPUT"
else
echo 'matrix=[]' >> "$GITHUB_OUTPUT"
python_batch_count=${{ steps.set-batch-count.outputs.python_batch_count }}
python_matrix=$(printf "{\"test_strategy\":\"pytests\",\"batch\":\"0\",\"batch_count\":\"$python_batch_count\"}"; for ((i=1;i<python_batch_count;i++)); do printf ",{\"test_strategy\":\"pytests\", \"batch_count\":\"$python_batch_count\",\"batch\":\"%d\"}" $i; done)
cypress_batch_count=${{ steps.set-batch-count.outputs.cypress_batch_count }}
cypress_matrix=$(printf "{\"test_strategy\":\"cypress\",\"batch\":\"0\",\"batch_count\":\"$cypress_batch_count\"}"; for ((i=1;i<cypress_batch_count;i++)); do printf ",{\"test_strategy\":\"cypress\", \"batch_count\":\"$cypress_batch_count\",\"batch\":\"%d\"}" $i; done)
includes=''
if [[ "${{ needs.setup.outputs.frontend_only }}" == 'true' ]]; then
includes=$cypress_matrix
elif [ "${{ needs.setup.outputs.ingestion_only }}" == 'true' ]; then
includes=$python_matrix
elif [[ "${{ needs.setup.outputs.backend_change }}" == 'true' || "${{ needs.setup.outputs.smoke_test_change }}" == 'true' ]]; then
includes="$python_matrix,$cypress_matrix"
fi
echo "matrix={\"include\":[$includes] }" >> "$GITHUB_OUTPUT"
smoke_test:
name: Run Smoke Tests
@@ -1043,8 +1064,7 @@ jobs:
]
strategy:
fail-fast: false
matrix:
test_strategy: ${{ fromJson(needs.smoke_test_matrix.outputs.matrix) }}
matrix: ${{ fromJson(needs.smoke_test_matrix.outputs.matrix) }}
if: ${{ always() && !failure() && !cancelled() && needs.smoke_test_matrix.outputs.matrix != '[]' }}
steps:
- name: Free up disk space
@@ -1220,6 +1240,8 @@ jobs:
CYPRESS_RECORD_KEY: ${{ secrets.CYPRESS_RECORD_KEY }}
CLEANUP_DATA: "false"
TEST_STRATEGY: ${{ matrix.test_strategy }}
BATCH_COUNT: ${{ matrix.batch_count }}
BATCH_NUMBER: ${{ matrix.batch }}
run: |
echo "$DATAHUB_VERSION"
./gradlew --stop
@@ -1230,25 +1252,25 @@
if: failure()
run: |
docker ps -a
TEST_STRATEGY="-${{ matrix.test_strategy }}"
TEST_STRATEGY="-${{ matrix.test_strategy }}-${{ matrix.batch }}"
source .github/scripts/docker_logs.sh
- name: Upload logs
uses: actions/upload-artifact@v3
if: failure()
with:
name: docker-logs-${{ matrix.test_strategy }}
name: docker-logs-${{ matrix.test_strategy }}-${{ matrix.batch }}
path: "docker_logs/*.log"
retention-days: 5
- name: Upload screenshots
uses: actions/upload-artifact@v3
if: failure()
with:
name: cypress-snapshots-${{ matrix.test_strategy }}
name: cypress-snapshots-${{ matrix.test_strategy }}-${{ matrix.batch }}
path: smoke-test/tests/cypress/cypress/screenshots/
- uses: actions/upload-artifact@v3
if: always()
with:
name: Test Results (smoke tests) ${{ matrix.test_strategy }}
name: Test Results (smoke tests) ${{ matrix.test_strategy }} ${{ matrix.batch }}
path: |
**/build/reports/tests/test/**
**/build/test-results/test/**
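Each smoke-test run receives BATCH_COUNT and BATCH_NUMBER and is expected to execute only its share of the tests. The splitting itself happens inside the smoke-test harness and is not shown in this diff; the sketch below illustrates one deterministic way such a split could work (the hashing scheme and function name are assumptions, not the actual smoke-test code):

import os
import zlib
from typing import List


def select_batch(all_tests: List[str]) -> List[str]:
    # Deterministically assign each test to a batch by hashing its identifier,
    # so every batch runs a disjoint, stable subset of the collection.
    batch_count = int(os.environ.get("BATCH_COUNT", "1"))
    batch_number = int(os.environ.get("BATCH_NUMBER", "0"))
    return [
        test for test in all_tests
        if zlib.crc32(test.encode("utf-8")) % batch_count == batch_number
    ]


if __name__ == "__main__":
    # Hypothetical test identifiers, for illustration only.
    tests = [f"tests/test_module_{i}.py::test_case" for i in range(10)]
    print(select_batch(tests))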
2 changes: 1 addition & 1 deletion build.gradle
@@ -35,7 +35,7 @@ buildscript {
ext.pegasusVersion = '29.57.0'
ext.mavenVersion = '3.6.3'
ext.versionGradle = '8.11.1'
ext.springVersion = '6.1.13'
ext.springVersion = '6.1.14'
ext.springBootVersion = '3.2.9'
ext.springKafkaVersion = '3.1.6'
ext.openTelemetryVersion = '1.18.0'
3 changes: 2 additions & 1 deletion docs/how/updating-datahub.md
@@ -17,7 +17,7 @@
This file documents any backwards-incompatible changes in DataHub and assists people when migrating to a new version.

## Next

- #12191 - Configs `include_view_lineage` and `include_view_column_lineage` are removed from the Snowflake ingestion source. View and external table DDL lineage will always be ingested when definitions are available.
- #11560 - The PowerBI ingestion source configuration option `include_workspace_name_in_dataset_urn` determines whether the workspace name is included in the PowerBI dataset's URN.<br/> PowerBI allows semantic models and their tables to have identical names across workspaces, so multi-workspace ingestion can overwrite a semantic model unless the workspace name is part of the URN.<br/>
Entity urn with `include_workspace_name_in_dataset_urn: false`

@@ -42,6 +42,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- #12077: `Kafka` source no longer ingests schemas from schema registry as separate entities by default, set `ingest_schemas_as_entities` to `true` to ingest them
- OpenAPI Update: PIT Keep Alive parameter added to scroll. NOTE: This parameter requires the `pointInTimeCreationEnabled` feature flag to be enabled and the `elasticSearch.implementation` configuration to be `elasticsearch`. This feature is not supported for OpenSearch at this time and the parameter will not be respected without both of these set.
- OpenAPI Update 2: Previously there was an incorrectly marked parameter named `sort` on the generic list entities endpoint for v3. This parameter is deprecated and only supports a single string value while the documentation indicates it supports a list of strings. This documentation error has been fixed and the correct field, `sortCriteria`, is now documented which supports a list of strings.
- #12223: For dbt Cloud ingestion, the "View in dbt" link will point at the "Explore" page in the dbt Cloud UI. You can revert to the old behavior of linking to the dbt Cloud IDE by setting `external_url_mode: ide`.

### Breaking Changes

14 changes: 12 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
@@ -52,6 +52,7 @@
platform_name,
support_status,
)
from datahub.ingestion.api.report import EntityFilterReport
from datahub.ingestion.api.source import MetadataWorkUnitProcessor
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws import s3_util
@@ -115,7 +116,6 @@

logger = logging.getLogger(__name__)


DEFAULT_PLATFORM = "glue"
VALID_PLATFORMS = [DEFAULT_PLATFORM, "athena"]

@@ -220,6 +220,7 @@ def platform_validator(cls, v: str) -> str:
class GlueSourceReport(StaleEntityRemovalSourceReport):
tables_scanned = 0
filtered: List[str] = dataclass_field(default_factory=list)
databases: EntityFilterReport = EntityFilterReport.field(type="database")

num_job_script_location_missing: int = 0
num_job_script_location_invalid: int = 0
@@ -668,6 +669,7 @@ def get_datajob_wu(self, node: Dict[str, Any], job_name: str) -> MetadataWorkUni
return MetadataWorkUnit(id=f'{job_name}-{node["Id"]}', mce=mce)

def get_all_databases(self) -> Iterable[Mapping[str, Any]]:
logger.debug("Getting all databases")
# see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/paginator/GetDatabases.html
paginator = self.glue_client.get_paginator("get_databases")

@@ -684,10 +686,18 @@ def get_all_databases(self) -> Iterable[Mapping[str, Any]]:
pattern += "[?!TargetDatabase]"

for database in paginator_response.search(pattern):
if self.source_config.database_pattern.allowed(database["Name"]):
if (not self.source_config.database_pattern.allowed(database["Name"])) or (
self.source_config.catalog_id
and database.get("CatalogId")
and database.get("CatalogId") != self.source_config.catalog_id
):
self.report.databases.dropped(database["Name"])
else:
self.report.databases.processed(database["Name"])
yield database

def get_tables_from_database(self, database: Mapping[str, Any]) -> Iterable[Dict]:
logger.debug(f"Getting tables from database {database['Name']}")
# see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/paginator/GetTables.html
paginator = self.glue_client.get_paginator("get_tables")
database_name = database["Name"]
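The updated get_all_databases now drops a database either when its name fails database_pattern or when a catalog_id is configured and the database reports a different CatalogId, recording the outcome on the new databases EntityFilterReport. A standalone sketch of that predicate (using a plain regex in place of DataHub's AllowDenyPattern, so the names and types here are illustrative):

import re
from typing import Mapping, Optional


def is_database_allowed(
    database: Mapping[str, str],
    allow_pattern: str,
    configured_catalog_id: Optional[str],
) -> bool:
    # Mirror of the filter in get_all_databases: the name must match the allow
    # pattern, and if a catalog_id is configured, a database that reports a
    # different CatalogId is dropped.
    if not re.fullmatch(allow_pattern, database["Name"]):
        return False
    if (
        configured_catalog_id
        and database.get("CatalogId")
        and database["CatalogId"] != configured_catalog_id
    ):
        return False
    return True


print(is_database_allowed({"Name": "sales", "CatalogId": "123"}, r"sales.*", "123"))  # True
print(is_database_allowed({"Name": "sales", "CatalogId": "999"}, r"sales.*", "123"))  # False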
13 changes: 10 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py
@@ -1,7 +1,7 @@
import logging
from datetime import datetime
from json import JSONDecodeError
from typing import Dict, List, Optional, Tuple
from typing import Dict, List, Literal, Optional, Tuple
from urllib.parse import urlparse

import dateutil.parser
Expand Down Expand Up @@ -62,6 +62,11 @@ class DBTCloudConfig(DBTCommonConfig):
description="The ID of the run to ingest metadata from. If not specified, we'll default to the latest run.",
)

external_url_mode: Literal["explore", "ide"] = Field(
default="explore",
description='Where should the "View in dbt" link point to - either the "Explore" UI or the dbt Cloud IDE',
)

@root_validator(pre=True)
def set_metadata_endpoint(cls, values: dict) -> dict:
if values.get("access_url") and not values.get("metadata_endpoint"):
Expand Down Expand Up @@ -527,5 +532,7 @@ def _parse_into_dbt_column(
)

def get_external_url(self, node: DBTNode) -> Optional[str]:
# TODO: Once dbt Cloud supports deep linking to specific files, we can use that.
return f"{self.config.access_url}/develop/{self.config.account_id}/projects/{self.config.project_id}"
if self.config.external_url_mode == "explore":
return f"{self.config.access_url}/explore/{self.config.account_id}/projects/{self.config.project_id}/environments/production/details/{node.dbt_name}"
else:
return f"{self.config.access_url}/develop/{self.config.account_id}/projects/{self.config.project_id}"
23 changes: 18 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
@@ -34,6 +34,7 @@
SoftDeletedEntitiesCleanupConfig,
SoftDeletedEntitiesReport,
)
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport

logger = logging.getLogger(__name__)

@@ -86,6 +87,7 @@ class DataHubGcSourceReport(
DataProcessCleanupReport,
SoftDeletedEntitiesReport,
DatahubExecutionRequestCleanupReport,
IngestionStageReport,
):
expired_tokens_revoked: int = 0

@@ -139,31 +141,40 @@ def get_workunits_internal(
) -> Iterable[MetadataWorkUnit]:
if self.config.cleanup_expired_tokens:
try:
self.report.report_ingestion_stage_start("Expired Token Cleanup")
self.revoke_expired_tokens()
except Exception as e:
self.report.failure("While trying to cleanup expired token ", exc=e)
if self.config.truncate_indices:
try:
self.report.report_ingestion_stage_start("Truncate Indices")
self.truncate_indices()
except Exception as e:
self.report.failure("While trying to truncate indices ", exc=e)
if self.config.soft_deleted_entities_cleanup.enabled:
try:
self.report.report_ingestion_stage_start(
"Soft Deleted Entities Cleanup"
)
self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities()
except Exception as e:
self.report.failure(
"While trying to cleanup soft deleted entities ", exc=e
)
if self.config.execution_request_cleanup.enabled:
try:
self.execution_request_cleanup.run()
except Exception as e:
self.report.failure("While trying to cleanup execution request ", exc=e)
if self.config.dataprocess_cleanup.enabled:
try:
self.report.report_ingestion_stage_start("Data Process Cleanup")
yield from self.dataprocess_cleanup.get_workunits_internal()
except Exception as e:
self.report.failure("While trying to cleanup data process ", exc=e)
if self.config.execution_request_cleanup.enabled:
try:
self.report.report_ingestion_stage_start("Execution request Cleanup")
self.execution_request_cleanup.run()
except Exception as e:
self.report.failure("While trying to cleanup execution request ", exc=e)
# Otherwise last stage's duration does not get calculated.
self.report.report_ingestion_stage_start("End")
yield from []

def truncate_indices(self) -> None:
@@ -281,6 +292,8 @@ def revoke_expired_tokens(self) -> None:
list_access_tokens = expired_tokens_res.get("listAccessTokens", {})
tokens = list_access_tokens.get("tokens", [])
total = list_access_tokens.get("total", 0)
if tokens == []:
break
for token in tokens:
self.report.expired_tokens_revoked += 1
token_id = token["id"]
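The cleanup stages are wrapped in report_ingestion_stage_start calls, and a final synthetic "End" stage is started so the previous stage's duration gets closed out. A minimal sketch of that timing pattern (an illustration of the idea, not DataHub's actual IngestionStageReport):

import time
from typing import Dict, Optional


class StageTimer:
    """Records how long each named stage ran; starting a stage ends the previous one."""

    def __init__(self) -> None:
        self.durations: Dict[str, float] = {}
        self._current: Optional[str] = None
        self._started_at: float = 0.0

    def start_stage(self, name: str) -> None:
        # Close out the previous stage before opening the next one -- this is why
        # a trailing "End" stage is started after the last real stage above.
        if self._current is not None:
            self.durations[self._current] = time.monotonic() - self._started_at
        self._current = name
        self._started_at = time.monotonic()


timer = StageTimer()
timer.start_stage("Expired Token Cleanup")
time.sleep(0.01)
timer.start_stage("Soft Deleted Entities Cleanup")
time.sleep(0.01)
timer.start_stage("End")  # without this, the previous stage would never be recorded
print(timer.durations)

Starting a sentinel stage at the end is simpler than requiring every stage to explicitly stop itself, at the cost of one empty entry in the report.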