Skip to content

Commit

Permalink
Merge branch 'master' into platform-events-business-attribute
Browse files Browse the repository at this point in the history
  • Loading branch information
deepgarg-visa authored Dec 27, 2024
2 parents 16a9900 + d7de7eb commit e8a054c
Show file tree
Hide file tree
Showing 26 changed files with 457 additions and 311 deletions.
50 changes: 36 additions & 14 deletions .github/workflows/docker-unified.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1011,18 +1011,39 @@ jobs:
needs: setup
outputs:
matrix: ${{ steps.set-matrix.outputs.matrix }}
cypress_batch_count: ${{ steps.set-batch-count.outputs.cypress_batch_count }}
python_batch_count: ${{ steps.set-batch-count.outputs.python_batch_count }}
steps:
- id: set-batch-count
# Tests are split simply to ensure the configured number of batches for parallelization. This may need some
# increase as a new tests added increase the duration where an additional parallel batch helps.
# python_batch_count is used to split pytests in the smoke-test (batches of actual test functions)
# cypress_batch_count is used to split the collection of cypress test specs into batches.
run: |
echo "cypress_batch_count=11" >> "$GITHUB_OUTPUT"
echo "python_batch_count=5" >> "$GITHUB_OUTPUT"
- id: set-matrix
# For m batches for python and n batches for cypress, we need a test matrix of python x m + cypress x n.
# while the github action matrix generation can handle these two parts individually, there isnt a way to use the
# two generated matrices for the same job. So, produce that matrix with scripting and use the include directive
# to add it to the test matrix.
run: |
if [ '${{ needs.setup.outputs.frontend_only }}' == 'true' ]; then
echo 'matrix=["cypress_suite1","cypress_rest"]' >> "$GITHUB_OUTPUT"
elif [ '${{ needs.setup.outputs.ingestion_only }}' == 'true' ]; then
echo 'matrix=["no_cypress_suite0","no_cypress_suite1"]' >> "$GITHUB_OUTPUT"
elif [[ '${{ needs.setup.outputs.backend_change }}' == 'true' || '${{ needs.setup.outputs.smoke_test_change }}' == 'true' ]]; then
echo 'matrix=["no_cypress_suite0","no_cypress_suite1","cypress_suite1","cypress_rest"]' >> "$GITHUB_OUTPUT"
else
echo 'matrix=[]' >> "$GITHUB_OUTPUT"
python_batch_count=${{ steps.set-batch-count.outputs.python_batch_count }}
python_matrix=$(printf "{\"test_strategy\":\"pytests\",\"batch\":\"0\",\"batch_count\":\"$python_batch_count\"}"; for ((i=1;i<python_batch_count;i++)); do printf ",{\"test_strategy\":\"pytests\", \"batch_count\":\"$python_batch_count\",\"batch\":\"%d\"}" $i; done)
cypress_batch_count=${{ steps.set-batch-count.outputs.cypress_batch_count }}
cypress_matrix=$(printf "{\"test_strategy\":\"cypress\",\"batch\":\"0\",\"batch_count\":\"$cypress_batch_count\"}"; for ((i=1;i<cypress_batch_count;i++)); do printf ",{\"test_strategy\":\"cypress\", \"batch_count\":\"$cypress_batch_count\",\"batch\":\"%d\"}" $i; done)
includes=''
if [[ "${{ needs.setup.outputs.frontend_only }}" == 'true' ]]; then
includes=$cypress_matrix
elif [ "${{ needs.setup.outputs.ingestion_only }}" == 'true' ]; then
includes=$python_matrix
elif [[ "${{ needs.setup.outputs.backend_change }}" == 'true' || "${{ needs.setup.outputs.smoke_test_change }}" == 'true' ]]; then
includes="$python_matrix,$cypress_matrix"
fi
echo "matrix={\"include\":[$includes] }" >> "$GITHUB_OUTPUT"
smoke_test:
name: Run Smoke Tests
Expand All @@ -1043,8 +1064,7 @@ jobs:
]
strategy:
fail-fast: false
matrix:
test_strategy: ${{ fromJson(needs.smoke_test_matrix.outputs.matrix) }}
matrix: ${{ fromJson(needs.smoke_test_matrix.outputs.matrix) }}
if: ${{ always() && !failure() && !cancelled() && needs.smoke_test_matrix.outputs.matrix != '[]' }}
steps:
- name: Free up disk space
Expand Down Expand Up @@ -1220,6 +1240,8 @@ jobs:
CYPRESS_RECORD_KEY: ${{ secrets.CYPRESS_RECORD_KEY }}
CLEANUP_DATA: "false"
TEST_STRATEGY: ${{ matrix.test_strategy }}
BATCH_COUNT: ${{ matrix.batch_count }}
BATCH_NUMBER: ${{ matrix.batch }}
run: |
echo "$DATAHUB_VERSION"
./gradlew --stop
Expand All @@ -1230,25 +1252,25 @@ jobs:
if: failure()
run: |
docker ps -a
TEST_STRATEGY="-${{ matrix.test_strategy }}"
TEST_STRATEGY="-${{ matrix.test_strategy }}-${{ matrix.batch }}"
source .github/scripts/docker_logs.sh
- name: Upload logs
uses: actions/upload-artifact@v3
if: failure()
with:
name: docker-logs-${{ matrix.test_strategy }}
name: docker-logs-${{ matrix.test_strategy }}-${{ matrix.batch }}
path: "docker_logs/*.log"
retention-days: 5
- name: Upload screenshots
uses: actions/upload-artifact@v3
if: failure()
with:
name: cypress-snapshots-${{ matrix.test_strategy }}
name: cypress-snapshots-${{ matrix.test_strategy }}-${{ matrix.batch }}
path: smoke-test/tests/cypress/cypress/screenshots/
- uses: actions/upload-artifact@v3
if: always()
with:
name: Test Results (smoke tests) ${{ matrix.test_strategy }}
name: Test Results (smoke tests) ${{ matrix.test_strategy }} ${{ matrix.batch }}
path: |
**/build/reports/tests/test/**
**/build/test-results/test/**
Expand Down
23 changes: 0 additions & 23 deletions .github/workflows/qodana-scan.yml

This file was deleted.

2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ buildscript {
ext.pegasusVersion = '29.57.0'
ext.mavenVersion = '3.6.3'
ext.versionGradle = '8.11.1'
ext.springVersion = '6.1.13'
ext.springVersion = '6.1.14'
ext.springBootVersion = '3.2.9'
ext.springKafkaVersion = '3.1.6'
ext.openTelemetryVersion = '1.18.0'
Expand Down
1 change: 1 addition & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- #12077: `Kafka` source no longer ingests schemas from schema registry as separate entities by default, set `ingest_schemas_as_entities` to `true` to ingest them
- OpenAPI Update: PIT Keep Alive parameter added to scroll. NOTE: This parameter requires the `pointInTimeCreationEnabled` feature flag to be enabled and the `elasticSearch.implementation` configuration to be `elasticsearch`. This feature is not supported for OpenSearch at this time and the parameter will not be respected without both of these set.
- OpenAPI Update 2: Previously there was an incorrectly marked parameter named `sort` on the generic list entities endpoint for v3. This parameter is deprecated and only supports a single string value while the documentation indicates it supports a list of strings. This documentation error has been fixed and the correct field, `sortCriteria`, is now documented which supports a list of strings.
- #12223: For dbt Cloud ingestion, the "View in dbt" link will point at the "Explore" page in the dbt Cloud UI. You can revert to the old behavior of linking to the dbt Cloud IDE by setting `external_url_mode: ide".

### Breaking Changes

Expand Down
13 changes: 10 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from datetime import datetime
from json import JSONDecodeError
from typing import Dict, List, Optional, Tuple
from typing import Dict, List, Literal, Optional, Tuple
from urllib.parse import urlparse

import dateutil.parser
Expand Down Expand Up @@ -62,6 +62,11 @@ class DBTCloudConfig(DBTCommonConfig):
description="The ID of the run to ingest metadata from. If not specified, we'll default to the latest run.",
)

external_url_mode: Literal["explore", "ide"] = Field(
default="explore",
description='Where should the "View in dbt" link point to - either the "Explore" UI or the dbt Cloud IDE',
)

@root_validator(pre=True)
def set_metadata_endpoint(cls, values: dict) -> dict:
if values.get("access_url") and not values.get("metadata_endpoint"):
Expand Down Expand Up @@ -527,5 +532,7 @@ def _parse_into_dbt_column(
)

def get_external_url(self, node: DBTNode) -> Optional[str]:
# TODO: Once dbt Cloud supports deep linking to specific files, we can use that.
return f"{self.config.access_url}/develop/{self.config.account_id}/projects/{self.config.project_id}"
if self.config.external_url_mode == "explore":
return f"{self.config.access_url}/explore/{self.config.account_id}/projects/{self.config.project_id}/environments/production/details/{node.dbt_name}"
else:
return f"{self.config.access_url}/develop/{self.config.account_id}/projects/{self.config.project_id}"
23 changes: 18 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
SoftDeletedEntitiesCleanupConfig,
SoftDeletedEntitiesReport,
)
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -86,6 +87,7 @@ class DataHubGcSourceReport(
DataProcessCleanupReport,
SoftDeletedEntitiesReport,
DatahubExecutionRequestCleanupReport,
IngestionStageReport,
):
expired_tokens_revoked: int = 0

Expand Down Expand Up @@ -139,31 +141,40 @@ def get_workunits_internal(
) -> Iterable[MetadataWorkUnit]:
if self.config.cleanup_expired_tokens:
try:
self.report.report_ingestion_stage_start("Expired Token Cleanup")
self.revoke_expired_tokens()
except Exception as e:
self.report.failure("While trying to cleanup expired token ", exc=e)
if self.config.truncate_indices:
try:
self.report.report_ingestion_stage_start("Truncate Indices")
self.truncate_indices()
except Exception as e:
self.report.failure("While trying to truncate indices ", exc=e)
if self.config.soft_deleted_entities_cleanup.enabled:
try:
self.report.report_ingestion_stage_start(
"Soft Deleted Entities Cleanup"
)
self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities()
except Exception as e:
self.report.failure(
"While trying to cleanup soft deleted entities ", exc=e
)
if self.config.execution_request_cleanup.enabled:
try:
self.execution_request_cleanup.run()
except Exception as e:
self.report.failure("While trying to cleanup execution request ", exc=e)
if self.config.dataprocess_cleanup.enabled:
try:
self.report.report_ingestion_stage_start("Data Process Cleanup")
yield from self.dataprocess_cleanup.get_workunits_internal()
except Exception as e:
self.report.failure("While trying to cleanup data process ", exc=e)
if self.config.execution_request_cleanup.enabled:
try:
self.report.report_ingestion_stage_start("Execution request Cleanup")
self.execution_request_cleanup.run()
except Exception as e:
self.report.failure("While trying to cleanup execution request ", exc=e)
# Otherwise last stage's duration does not get calculated.
self.report.report_ingestion_stage_start("End")
yield from []

def truncate_indices(self) -> None:
Expand Down Expand Up @@ -281,6 +292,8 @@ def revoke_expired_tokens(self) -> None:
list_access_tokens = expired_tokens_res.get("listAccessTokens", {})
tokens = list_access_tokens.get("tokens", [])
total = list_access_tokens.get("total", 0)
if tokens == []:
break
for token in tokens:
self.report.expired_tokens_revoked += 1
token_id = token["id"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import logging
import time
from typing import Any, Dict, Iterator, Optional
Expand Down Expand Up @@ -42,16 +43,28 @@ class DatahubExecutionRequestCleanupConfig(ConfigModel):
description="Global switch for this cleanup task",
)

runtime_limit_seconds: int = Field(
default=3600,
description="Maximum runtime in seconds for the cleanup task",
)

max_read_errors: int = Field(
default=10,
description="Maximum number of read errors before aborting",
)

def keep_history_max_milliseconds(self):
return self.keep_history_max_days * 24 * 3600 * 1000


class DatahubExecutionRequestCleanupReport(SourceReport):
execution_request_cleanup_records_read: int = 0
execution_request_cleanup_records_preserved: int = 0
execution_request_cleanup_records_deleted: int = 0
execution_request_cleanup_read_errors: int = 0
execution_request_cleanup_delete_errors: int = 0
ergc_records_read: int = 0
ergc_records_preserved: int = 0
ergc_records_deleted: int = 0
ergc_read_errors: int = 0
ergc_delete_errors: int = 0
ergc_start_time: Optional[datetime.datetime] = None
ergc_end_time: Optional[datetime.datetime] = None


class CleanupRecord(BaseModel):
Expand Down Expand Up @@ -124,6 +137,13 @@ def _scroll_execution_requests(
params.update(overrides)

while True:
if self._reached_runtime_limit():
break
if self.report.ergc_read_errors >= self.config.max_read_errors:
self.report.failure(
f"ergc({self.instance_id}): too many read errors, aborting."
)
break
try:
url = f"{self.graph.config.server}/openapi/v2/entity/{DATAHUB_EXECUTION_REQUEST_ENTITY_NAME}"
response = self.graph._session.get(url, headers=headers, params=params)
Expand All @@ -141,7 +161,7 @@ def _scroll_execution_requests(
logger.error(
f"ergc({self.instance_id}): failed to fetch next batch of execution requests: {e}"
)
self.report.execution_request_cleanup_read_errors += 1
self.report.ergc_read_errors += 1

def _scroll_garbage_records(self):
state: Dict[str, Dict] = {}
Expand All @@ -150,7 +170,7 @@ def _scroll_garbage_records(self):
running_guard_timeout = now_ms - 30 * 24 * 3600 * 1000

for entry in self._scroll_execution_requests():
self.report.execution_request_cleanup_records_read += 1
self.report.ergc_records_read += 1
key = entry.ingestion_source

# Always delete corrupted records
Expand All @@ -171,15 +191,15 @@ def _scroll_garbage_records(self):

# Do not delete if number of requests is below minimum
if state[key]["count"] < self.config.keep_history_min_count:
self.report.execution_request_cleanup_records_preserved += 1
self.report.ergc_records_preserved += 1
continue

# Do not delete if number of requests do not exceed allowed maximum,
# or the cutoff date.
if (state[key]["count"] < self.config.keep_history_max_count) and (
entry.requested_at > state[key]["cutoffTimestamp"]
):
self.report.execution_request_cleanup_records_preserved += 1
self.report.ergc_records_preserved += 1
continue

# Do not delete if status is RUNNING or PENDING and created within last month. If the record is >month old and it did not
Expand All @@ -188,7 +208,7 @@ def _scroll_garbage_records(self):
"RUNNING",
"PENDING",
]:
self.report.execution_request_cleanup_records_preserved += 1
self.report.ergc_records_preserved += 1
continue

# Otherwise delete current record
Expand All @@ -200,7 +220,7 @@ def _scroll_garbage_records(self):
f"record timestamp: {entry.requested_at}."
)
)
self.report.execution_request_cleanup_records_deleted += 1
self.report.ergc_records_deleted += 1
yield entry

def _delete_entry(self, entry: CleanupRecord) -> None:
Expand All @@ -210,17 +230,31 @@ def _delete_entry(self, entry: CleanupRecord) -> None:
)
self.graph.delete_entity(entry.urn, True)
except Exception as e:
self.report.execution_request_cleanup_delete_errors += 1
self.report.ergc_delete_errors += 1
logger.error(
f"ergc({self.instance_id}): failed to delete ExecutionRequest {entry.request_id}: {e}"
)

def _reached_runtime_limit(self) -> bool:
if (
self.config.runtime_limit_seconds
and self.report.ergc_start_time
and (
datetime.datetime.now() - self.report.ergc_start_time
>= datetime.timedelta(seconds=self.config.runtime_limit_seconds)
)
):
logger.info(f"ergc({self.instance_id}): max runtime reached.")
return True
return False

def run(self) -> None:
if not self.config.enabled:
logger.info(
f"ergc({self.instance_id}): ExecutionRequest cleaner is disabled."
)
return
self.report.ergc_start_time = datetime.datetime.now()

logger.info(
(
Expand All @@ -232,8 +266,11 @@ def run(self) -> None:
)

for entry in self._scroll_garbage_records():
if self._reached_runtime_limit():
break
self._delete_entry(entry)

self.report.ergc_end_time = datetime.datetime.now()
logger.info(
f"ergc({self.instance_id}): Finished cleanup of ExecutionRequest records."
)
Loading

0 comments on commit e8a054c

Please sign in to comment.