Skip to content

Commit

Permalink
metadata-ingestion:lintFix
Browse files Browse the repository at this point in the history
  • Loading branch information
llance committed Jan 16, 2025
1 parent ddc9c00 commit 738c1a7
Showing 1 changed file with 27 additions and 10 deletions.
37 changes: 27 additions & 10 deletions metadata-ingestion/src/datahub/ingestion/source/superset.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
make_data_platform_urn,
make_dataset_urn,
make_dataset_urn_with_platform_instance,
make_domain_urn, make_term_urn,
make_domain_urn,
make_term_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import add_domain_to_entity_wu
Expand All @@ -37,7 +38,7 @@
)
from datahub.ingestion.api.source import MetadataWorkUnitProcessor
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.ingestion.source.sql.sql_types import resolve_sql_type
from datahub.ingestion.source.sql.sqlalchemy_uri_mapper import (
get_platform_from_sqlalchemy_uri,
Expand All @@ -51,8 +52,12 @@
StatefulIngestionConfigBase,
StatefulIngestionSourceBase,
)
from datahub.metadata._schema_classes import AuditStampClass, GlossaryTermsClass, GlossaryTermAssociationClass, \
GlossaryTermInfoClass
from datahub.metadata._schema_classes import (
AuditStampClass,
GlossaryTermAssociationClass,
GlossaryTermInfoClass,
GlossaryTermsClass,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import (
AuditStamp,
ChangeAuditStamps,
Expand Down Expand Up @@ -589,7 +594,11 @@ def gen_dataset_urn(self, datahub_dataset_name: str) -> str:

def check_if_term_exists(self, term_urn):
graph = DataHubGraph(

Check warning on line 596 in metadata-ingestion/src/datahub/ingestion/source/superset.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/superset.py#L596

Added line #L596 was not covered by tests
DatahubClientConfig(server=self.sink_config.get("server", ""), token=self.sink_config.get("token", "")))
DatahubClientConfig(
server=self.sink_config.get("server", ""),
token=self.sink_config.get("token", ""),
)
)
# Query multiple aspects from entity
result = graph.get_entity_semityped(

Check warning on line 603 in metadata-ingestion/src/datahub/ingestion/source/superset.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/superset.py#L603

Added line #L603 was not covered by tests
entity_urn=term_urn,
Expand All @@ -600,7 +609,9 @@ def check_if_term_exists(self, term_urn):
return True
return False

Check warning on line 610 in metadata-ingestion/src/datahub/ingestion/source/superset.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/superset.py#L608-L610

Added lines #L608 - L610 were not covered by tests

def parse_glossary_terms_from_metrics(self, metrics, last_modified) -> GlossaryTermsClass:
def parse_glossary_terms_from_metrics(
self, metrics, last_modified
) -> GlossaryTermsClass:
glossary_term_urns = []
for metric in metrics:
expression = metric.get("expression", "")
Expand All @@ -626,8 +637,10 @@ def parse_glossary_terms_from_metrics(self, metrics, last_modified) -> GlossaryT
)

# Create rest emitter
rest_emitter = DatahubRestEmitter(gms_server=self.sink_config.get("server", ""),
token=self.sink_config.get("token", ""))
rest_emitter = DatahubRestEmitter(

Check warning on line 640 in metadata-ingestion/src/datahub/ingestion/source/superset.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/superset.py#L640

Added line #L640 was not covered by tests
gms_server=self.sink_config.get("server", ""),
token=self.sink_config.get("token", ""),
)
rest_emitter.emit(event)
logger.info(f"Created Glossary term {term_urn}")
glossary_term_urns.append(GlossaryTermAssociationClass(urn=term_urn))

Check warning on line 646 in metadata-ingestion/src/datahub/ingestion/source/superset.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/superset.py#L644-L646

Added lines #L644 - L646 were not covered by tests
Expand All @@ -649,7 +662,9 @@ def construct_dataset_from_dataset_data(
) -> DatasetSnapshot:
dataset_response = self.get_dataset_info(dataset_data.get("id"))
dataset = SupersetDataset(**dataset_response["result"])
datasource_urn = self.get_datasource_urn_from_id(dataset_response, self.platform)
datasource_urn = self.get_datasource_urn_from_id(
dataset_response, self.platform
)
now = datetime.now().strftime("%I:%M%p on %B %d, %Y")
modified_ts = int(
dp.parse(dataset_data.get("changed_on") or now).timestamp() * 1000
Expand Down Expand Up @@ -679,7 +694,9 @@ def construct_dataset_from_dataset_data(
response_result = dataset_response.get("result", {})

if self._is_certified_metric(response_result):
glossary_terms = self.parse_glossary_terms_from_metrics(response_result.get("metrics", {}), last_modified)
glossary_terms = self.parse_glossary_terms_from_metrics(

Check warning on line 697 in metadata-ingestion/src/datahub/ingestion/source/superset.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/superset.py#L697

Added line #L697 was not covered by tests
response_result.get("metrics", {}), last_modified
)
aspects_items.append(glossary_terms)

Check warning on line 700 in metadata-ingestion/src/datahub/ingestion/source/superset.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/superset.py#L700

Added line #L700 was not covered by tests

dataset_snapshot = DatasetSnapshot(
Expand Down

0 comments on commit 738c1a7

Please sign in to comment.