Athena Lineage and Profiler #18302

Open
guilhermehuther opened this issue Oct 16, 2024 · 0 comments

Affected module
Ingestion Framework

Describe the bug

  • First: the lineage

When I run my lineage task in Airflow, I don't get any error, but no data shows up in OpenMetadata (OM). While tracking down the problem, I printed data from the lines shown commented out below and found that a large portion of the SQL never reaches "inspection". In the end I commented out those lines and the ingestion worked. I understand that this is not the expected behavior, hence this issue.

file: /home/airflow/.local/lib/python3.11/site-packages/metadata/ingestion/source/database/athena/lineage.py
[screenshot of the commented-out lines in lineage.py]

  • Second and most important: the profiling

In the logs I could see that the problems are related to the pydantic models. I suspect that some logic has changed, but not in the profiling logic. More specifically, the error comes from this file: /home/airflow/.local/lib/python3.11/site-packages/metadata/ingestion/source/database/database_service.py

Running in debug mode, I got these logs:

[2024-10-16 14:13:42,802] GET executed in 0.01s
[2024-10-16 14:13:42,806] OpenMetadata client running with Server version [1.5.6] and Client version [1.5.6.0]
[2024-10-16 14:13:42,808] GET executed in 0.0s
[2024-10-16 14:13:42,845] GET executed in 0.04s
[2024-10-16 14:13:43,084] GET executed in 0.03s
[2024-10-16 14:13:46,059] Test connection results:
[2024-10-16 14:13:46,059] failed=[] success=["'CheckAccess': Pass", "'GetSchemas': Pass", "'GetTables': Pass", "'GetViews': Pass"] warning=[]
[2024-10-16 14:13:46,060] Getting AWS client for service [lakeformation]
[2024-10-16 14:13:46,127] Source type:athena,<class 'metadata.ingestion.source.database.athena.metadata.AthenaSource'> configured
[2024-10-16 14:13:46,127] Source type:athena,<class 'metadata.ingestion.source.database.athena.metadata.AthenaSource'>  prepared
[2024-10-16 14:13:46,733] Sink type:metadata-rest, <class 'metadata.ingestion.sink.metadata_rest.MetadataRestSink'> configured
[2024-10-16 14:13:46,734] Processing node producer='get_services' stages=[NodeStage(type_=<class 'metadata.generated.schema.entity.services.databaseService.DatabaseService'>, processor='yield_create_request_database_service', nullable=False, must_return=True, overwrite=False, consumer=None, context='database_service', store_all_in_context=False, clear_context=False, store_fqn=False, cache_entities=True, use_cache=False)] children=['database'] post_process=['yield_view_lineage', 'yield_procedure_lineage_and_queries', 'yield_external_table_lineage'] threads=False
[2024-10-16 14:13:46,734] Processing stage: type_=<class 'metadata.generated.schema.entity.services.databaseService.DatabaseService'> processor='yield_create_request_database_service' nullable=False must_return=True overwrite=False consumer=None context='database_service' store_all_in_context=False clear_context=False store_fqn=False cache_entities=True use_cache=False
[2024-10-16 14:13:46,764] Source.GET executed in 0.03s
[2024-10-16 14:13:46,796] Source.GET executed in 0.03s
[2024-10-16 14:13:46,797] Processing node producer='get_database_names' stages=[NodeStage(type_=<class 'metadata.ingestion.models.ometa_classification.OMetaTagAndClassification'>, processor='yield_database_tag_details', nullable=True, must_return=False, overwrite=True, consumer=None, context='tags', store_all_in_context=True, clear_context=False, store_fqn=False, cache_entities=False, use_cache=False), NodeStage(type_=<class 'metadata.generated.schema.entity.data.database.Database'>, processor='yield_database', nullable=False, must_return=False, overwrite=True, consumer=['database_service'], context='database', store_all_in_context=False, clear_context=False, store_fqn=False, cache_entities=True, use_cache=True)] children=['databaseSchema'] post_process=None threads=False
[2024-10-16 14:13:46,797] Processing stage: type_=<class 'metadata.ingestion.models.ometa_classification.OMetaTagAndClassification'> processor='yield_database_tag_details' nullable=True must_return=False overwrite=True consumer=None context='tags' store_all_in_context=True clear_context=False store_fqn=False cache_entities=False use_cache=False
[2024-10-16 14:13:46,797] Processing stage: type_=<class 'metadata.generated.schema.entity.data.database.Database'> processor='yield_database' nullable=False must_return=False overwrite=True consumer=['database_service'] context='database' store_all_in_context=False clear_context=False store_fqn=False cache_entities=True use_cache=True
[2024-10-16 14:13:46,797] Source.yield_database executed in 0.0s
[2024-10-16 14:13:46,797] No changes detected for Database 'athena_source.default'
[2024-10-16 14:13:46,797] Source.yield_database executed in 0.0s
[2024-10-16 14:13:46,839] Source.GET executed in 0.04s
[2024-10-16 14:13:46,840] Processing node producer='get_database_schema_names' stages=[NodeStage(type_=<class 'metadata.ingestion.models.ometa_classification.OMetaTagAndClassification'>, processor='yield_database_schema_tag_details', nullable=True, must_return=False, overwrite=True, consumer=None, context='tags', store_all_in_context=True, clear_context=False, store_fqn=False, cache_entities=False, use_cache=False), NodeStage(type_=<class 'metadata.generated.schema.entity.data.databaseSchema.DatabaseSchema'>, processor='yield_database_schema', nullable=False, must_return=False, overwrite=True, consumer=['database_service', 'database'], context='database_schema', store_all_in_context=False, clear_context=False, store_fqn=False, cache_entities=True, use_cache=True)] children=['table', 'stored_procedure'] post_process=['mark_tables_as_deleted', 'mark_stored_procedures_as_deleted'] threads=True
[2024-10-16 14:13:46,976] Processing stage: type_=<class 'metadata.ingestion.models.ometa_classification.OMetaTagAndClassification'> processor='yield_database_schema_tag_details' nullable=True must_return=False overwrite=True consumer=None context='tags' store_all_in_context=True clear_context=False store_fqn=False cache_entities=False use_cache=False
[2024-10-16 14:13:46,976] Processing stage: type_=<class 'metadata.ingestion.models.ometa_classification.OMetaTagAndClassification'> processor='yield_database_schema_tag_details' nullable=True must_return=False overwrite=True consumer=None context='tags' store_all_in_context=True clear_context=False store_fqn=False cache_entities=False use_cache=False
[2024-10-16 14:13:46,976] Processing stage: type_=<class 'metadata.ingestion.models.ometa_classification.OMetaTagAndClassification'> processor='yield_database_schema_tag_details' nullable=True must_return=False overwrite=True consumer=None context='tags' store_all_in_context=True clear_context=False store_fqn=False cache_entities=False use_cache=False
[2024-10-16 14:13:46,977] Processing stage: type_=<class 'metadata.ingestion.models.ometa_classification.OMetaTagAndClassification'> processor='yield_database_schema_tag_details' nullable=True must_return=False overwrite=True consumer=None context='tags' store_all_in_context=True clear_context=False store_fqn=False cache_entities=False use_cache=False
[2024-10-16 14:13:46,977] Object type defined in `def _iter()` /home/airflow/.local/lib/python3.11/site-packages/metadata/ingestion/api/topology_runner.py is not an Either: ['DatabaseServiceProfilerPipeline' object has no attribute 'includeTags']
[2024-10-16 14:13:46,979] Object type defined in `def _iter()` /home/airflow/.local/lib/python3.11/site-packages/metadata/ingestion/api/topology_runner.py is not an Either: ['DatabaseServiceProfilerPipeline' object has no attribute 'includeTags']
[2024-10-16 14:13:46,979] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/metadata/ingestion/api/step.py", line 214, in run
    for result in self._iter():
  File "/home/airflow/.local/lib/python3.11/site-packages/metadata/ingestion/api/topology_runner.py", line 320, in _iter
    yield from self.process_nodes(get_topology_root(self.topology))
  File "/home/airflow/.local/lib/python3.11/site-packages/metadata/ingestion/api/topology_runner.py", line 175, in process_nodes
    yield from self._process_node(node)
  File "/home/airflow/.local/lib/python3.11/site-packages/metadata/ingestion/api/topology_runner.py", line 138, in _process_node
    yield from self.process_nodes(child_nodes)
  File "/home/airflow/.local/lib/python3.11/site-packages/metadata/ingestion/api/topology_runner.py", line 175, in process_nodes
    yield from self._process_node(node)
  File "/home/airflow/.local/lib/python3.11/site-packages/metadata/ingestion/api/topology_runner.py", line 138, in _process_node
    yield from self.process_nodes(child_nodes)
  File "/home/airflow/.local/lib/python3.11/site-packages/metadata/ingestion/api/topology_runner.py", line 173, in process_nodes
    yield from self._multithread_process_node(node, self.context.threads)
  File "/home/airflow/.local/lib/python3.11/site-packages/metadata/ingestion/api/topology_runner.py", line 116, in _multithread_process_node
    future.result()
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/metadata/ingestion/api/topology_runner.py", line 194, in _multithread_process_entity
    for stage_result in self._process_stage(
  File "/home/airflow/.local/lib/python3.11/site-packages/metadata/ingestion/api/topology_runner.py", line 234, in _process_stage
    for entity_request in stage_fn(node_entity) or []:
  File "/home/airflow/.local/lib/python3.11/site-packages/metadata/ingestion/source/database/database_service.py", line 333, in yield_database_schema_tag_details
    if self.source_config.includeTags:
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/pydantic/main.py", line 828, in __getattr__
    raise AttributeError(f'{type(self).__name__!r} object has no attribute {item!r}')
AttributeError: 'DatabaseServiceProfilerPipeline' object has no attribute 'includeTags'
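
To illustrate what the traceback above seems to show, here is a minimal sketch of the failure mode. It assumes only what the last frames say: a profiler config object is read with `source_config.includeTags`, and that object has no such field. The classes `MetadataPipelineConfig` and `ProfilerPipelineConfig` and both helper functions are hypothetical stand-ins written with plain dataclasses; they are not the real OpenMetadata or pydantic classes, and the guarded variant is only one possible way to tolerate the missing field, not a confirmed fix.

```python
# Hypothetical stand-ins; NOT the real OpenMetadata classes.
from dataclasses import dataclass


@dataclass
class MetadataPipelineConfig:
    """Stand-in for a metadata ingestion config that defines includeTags."""
    includeTags: bool = True


@dataclass
class ProfilerPipelineConfig:
    """Stand-in for a profiler config that has no includeTags field."""
    profileSample: float = 100.0


def should_yield_tags_strict(source_config) -> bool:
    # Same access pattern as the failing line in the traceback:
    # direct attribute access, which raises if the field is missing.
    return bool(source_config.includeTags)


def should_yield_tags_guarded(source_config) -> bool:
    # Defensive variant (an assumption, not the project's code):
    # a missing field is treated as "tags disabled".
    return bool(getattr(source_config, "includeTags", False))


metadata_cfg = MetadataPipelineConfig()
profiler_cfg = ProfilerPipelineConfig()

print(should_yield_tags_strict(metadata_cfg))
try:
    should_yield_tags_strict(profiler_cfg)
except AttributeError as exc:
    print(f"strict access failed: {exc}")
print(should_yield_tags_guarded(profiler_cfg))
```

If this reading is right, any code path shared between the metadata and profiler workflows that reads `includeTags` directly would fail in the same way when it is handed the profiler config.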

To Reproduce
Run the Athena lineage and profiling ingestion following the docs: https://docs.open-metadata.org/latest/connectors/database/athena/yaml

Expected behavior
Athena lineage and profiling data show up in the UI.

Version:

  • OS:
  • Python version: 3.11.9
  • Airflow version: apache/airflow:2.9.3-python3.11
  • OpenMetadata version: 1.5.6
  • OpenMetadata Ingestion package version: 1.5.6

Additional context
All of this is running on AWS EKS, but I'm not sure whether that is relevant.
