Skip to content

Commit

Permalink
feat/oracle-db-connector-with-refactoring-db-connectors (#606)
Browse files Browse the repository at this point in the history
* initial oracle db connector

* adding pdm lock

* code refactoring for db connectors + oracle db changes

* oracle db tested

* refactoring constant in db_utils

* check bigquery

* tested bigquery

* small refactoring

* tests all db connectors completed

* adding documentation

* adding documentation

* adding documentation

* small doc change

* doc on id

* PR comments

* PR comments

* Commit pdm.lock changes

* removing onboarding

* removing plugins

* small change

* change v1 changes for db connector to v2

* Commit pdm.lock changes

* PR comment

* Commit pdm.lock changes

* logger error message

* PR comments

---------

Co-authored-by: kirtimanmishrazipstack <[email protected]>
Co-authored-by: Hari John Kuriakose <[email protected]>
  • Loading branch information
3 people authored Dec 10, 2024
1 parent 1d8bfcb commit fabc6bd
Show file tree
Hide file tree
Showing 24 changed files with 3,088 additions and 528 deletions.
16 changes: 0 additions & 16 deletions backend/pdm.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 0 additions & 18 deletions backend/workflow_manager/endpoint/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ class TableColumns:

class DBConnectionClass:
    """DB connection class-name constants.

    Values match the ``__class__.__name__`` of the corresponding connector
    classes and are compared against ``conn_cls.__class__.__name__`` to
    select database-specific behavior.
    """

    SNOWFLAKE = "SnowflakeDB"
    BIGQUERY = "BigQuery"
    MSSQL = "MSSQL"


class Snowflake:
Expand Down Expand Up @@ -94,22 +92,6 @@ class ApiDeploymentResultStatus:
FAILED = "Failed"


class BigQuery:
    """Constants for Google BigQuery table handling.

    In BigQuery a table name must be fully qualified as
    ``{db}.{schema}.{table}``; an error is raised if any of these parts is
    not set. When the name is split on ".", the resulting list must
    therefore have exactly 3 elements.
    """

    # Required number of parts in a fully qualified table name.
    TABLE_NAME_SIZE = 3
    # BigQuery date/time column types — presumably these values need
    # special formatting when building queries; verify against callers.
    COLUMN_TYPES = [
        "DATE",
        "DATETIME",
        "TIME",
        "TIMESTAMP",
    ]


class QueueResultStatus:
    """Status labels reported for queue processing results."""

    SUCCESS = "Success"
    FAILED = "Failed"
145 changes: 32 additions & 113 deletions backend/workflow_manager/endpoint/database_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,8 @@
from typing import Any, Optional

from utils.constants import Common
from workflow_manager.endpoint.constants import (
BigQuery,
DBConnectionClass,
TableColumns,
)
from workflow_manager.endpoint.db_connector_helper import DBConnectorQueryHelper
from workflow_manager.endpoint.exceptions import (
BigQueryTableNotFound,
UnstractDBException,
)
from workflow_manager.endpoint.constants import DBConnectionClass, TableColumns
from workflow_manager.endpoint.exceptions import UnstractDBException
from workflow_manager.workflow.enums import AgentName, ColumnModes

from unstract.connectors.databases import connectors as db_connectors
Expand Down Expand Up @@ -69,94 +61,41 @@ def get_sql_values_for_query(
# Default to Other SQL DBs
# TODO: Handle numeric types with no quotes
sql_values[column] = f"{values[column]}"
if column_types.get("id"):
# If table has a column 'id', unstract inserts a unique value to it
# Oracle db has column 'ID' instead of 'id'
if any(key in column_types for key in ["id", "ID"]):
uuid_id = str(uuid.uuid4())
sql_values["id"] = f"{uuid_id}"
return sql_values

@staticmethod
def get_column_types_util(columns_with_types: Any) -> dict[str, str]:
"""Converts db results columns_with_types to dict.
Args:
columns_with_types (Any): _description_
Returns:
dict[str, str]: _description_
"""
column_types: dict[str, str] = {}
for column_name, data_type in columns_with_types:
column_types[column_name] = data_type
return column_types

@staticmethod
def get_column_types(
cls_name: Any,
conn_cls: Any,
table_name: str,
connector_id: str,
connector_settings: dict[str, Any],
) -> Any:
"""Get db column name and types.
"""Function to return connector db column and types by calling
connector table information schema.
Args:
cls (Any): _description_
table_name (str): _description_
connector_id (str): _description_
connector_settings (dict[str, Any]): _description_
conn_cls (Any): DB Connection class
table_name (str): DB table-name
Raises:
ValueError: _description_
e: _description_
UnstractDBException: _description_
Returns:
Any: _description_
Any: db column name and db column types of corresponding table
"""
column_types: dict[str, str] = {}
try:
if cls_name == DBConnectionClass.SNOWFLAKE:
query = f"describe table {table_name}"
results = DatabaseUtils.execute_and_fetch_data(
connector_id=connector_id,
connector_settings=connector_settings,
query=query,
)
for column in results:
column_types[column[0].lower()] = column[1].split("(")[0]
elif cls_name == DBConnectionClass.BIGQUERY:
bigquery_table_name = str.lower(table_name).split(".")
if len(bigquery_table_name) != BigQuery.TABLE_NAME_SIZE:
raise BigQueryTableNotFound()
database = bigquery_table_name[0]
schema = bigquery_table_name[1]
table = bigquery_table_name[2]
query = (
"SELECT column_name, data_type FROM "
f"{database}.{schema}.INFORMATION_SCHEMA.COLUMNS WHERE "
f"table_name = '{table}'"
)
results = DatabaseUtils.execute_and_fetch_data(
connector_id=connector_id,
connector_settings=connector_settings,
query=query,
)
column_types = DatabaseUtils.get_column_types_util(results)
else:
table_name = str.lower(table_name)
query = (
"SELECT column_name, data_type FROM "
"information_schema.columns WHERE "
f"table_name = '{table_name}'"
)
results = DatabaseUtils.execute_and_fetch_data(
connector_id=connector_id,
connector_settings=connector_settings,
query=query,
)
column_types = DatabaseUtils.get_column_types_util(results)
return conn_cls.get_information_schema(table_name=table_name)
except ConnectorError as e:
raise UnstractDBException(detail=e.message) from e
except Exception as e:
logger.error(f"Error getting column types for {table_name}: {str(e)}")
raise e
return column_types
logger.error(
f"Error getting db-column-name and db-column-type "
f"for {table_name}: {str(e)}"
)
raise

@staticmethod
def get_columns_and_values(
Expand Down Expand Up @@ -224,18 +163,15 @@ def get_columns_and_values(

@staticmethod
def get_sql_query_data(
cls_name: str,
connector_id: str,
connector_settings: dict[str, Any],
conn_cls: Any,
table_name: str,
values: dict[str, Any],
) -> dict[str, Any]:
"""Generate SQL columns and values for an insert query based on the
provided values and table schema.
Args:
connector_id: The connector id of the connector provided
connector_settings: Connector settings provided by user
connector_cls: DB connection class
table_name (str): The name of the target table for the insert query.
values (dict[str, Any]): A dictionary containing column-value pairs
for the insert query.
Expand All @@ -251,11 +187,9 @@ def get_sql_query_data(
- For other SQL databases, it uses default SQL generation
based on column types.
"""
cls_name = conn_cls.__class__.__name__
column_types: dict[str, str] = DatabaseUtils.get_column_types(
cls_name=cls_name,
table_name=table_name,
connector_id=connector_id,
connector_settings=connector_settings,
conn_cls=conn_cls, table_name=table_name
)
sql_columns_and_values = DatabaseUtils.get_sql_values_for_query(
values=values,
Expand All @@ -270,7 +204,7 @@ def execute_write_query(
engine: Any,
table_name: str,
sql_keys: list[str],
sql_values: Any,
sql_values: list[str],
) -> None:
"""Execute Insert Query.
Expand All @@ -285,15 +219,9 @@ def execute_write_query(
So we need to use INSERT INTO ... SELECT ... syntax
- sql values can contain data with single quote. It needs to
"""
cls_name = db_class.__class__.__name__
sql = DBConnectorQueryHelper.build_sql_insert_query(
cls_name=cls_name, table_name=table_name, sql_keys=sql_keys
)
logger.debug(f"inserting into table {table_name} with: {sql} query")
sql = db_class.get_sql_insert_query(table_name=table_name, sql_keys=sql_keys)

sql_values = DBConnectorQueryHelper.prepare_sql_values(
cls_name=cls_name, sql_values=sql_values, sql_keys=sql_keys
)
logger.debug(f"inserting into table {table_name} with: {sql} query")
logger.debug(f"sql_values: {sql_values}")

try:
Expand All @@ -316,17 +244,6 @@ def get_db_class(
connector_class: UnstractDB = connector(connector_settings)
return connector_class

@staticmethod
def execute_and_fetch_data(
    connector_id: str, connector_settings: dict[str, Any], query: str
) -> Any:
    """Execute *query* via the connector registered under *connector_id*.

    Args:
        connector_id (str): key identifying the connector in the
            ``db_connectors`` registry.
        connector_settings (dict[str, Any]): user-provided settings used
            to instantiate the connector.
        query (str): SQL statement to execute.

    Returns:
        Any: whatever the connector's ``execute()`` returns.

    Raises:
        UnstractDBException: wraps any ``ConnectorError`` raised by the
            underlying connector, preserving its message.
    """
    connector_cls = db_connectors[connector_id][Common.METADATA][Common.CONNECTOR]
    db_instance: UnstractDB = connector_cls(connector_settings)
    try:
        return db_instance.execute(query=query)
    except ConnectorError as err:
        raise UnstractDBException(detail=err.message) from err

@staticmethod
def create_table_if_not_exists(
db_class: UnstractDB,
Expand All @@ -344,12 +261,14 @@ def create_table_if_not_exists(
Raises:
e: _description_
"""
sql = DBConnectorQueryHelper.create_table_query(
conn_cls=db_class, table=table_name, database_entry=database_entry
sql = db_class.create_table_query(
table=table_name, database_entry=database_entry
)
logger.debug(f"creating table {table_name} with: {sql} query")
try:
db_class.execute_query(engine=engine, sql_query=sql, sql_values=None)
db_class.execute_query(
engine=engine, sql_query=sql, sql_values=None, table_name=table_name
)
except UnstractDBConnectorException as e:
raise UnstractDBException(detail=e.detail) from e
logger.debug(f"successfully created table {table_name} with: {sql} query")
77 changes: 0 additions & 77 deletions backend/workflow_manager/endpoint/db_connector_helper.py

This file was deleted.

5 changes: 1 addition & 4 deletions backend/workflow_manager/endpoint/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,8 @@ def insert_into_db(self, input_file_path: str) -> None:
table_name=table_name,
database_entry=values,
)
cls_name = db_class.__class__.__name__
sql_columns_and_values = DatabaseUtils.get_sql_query_data(
cls_name=cls_name,
connector_id=connector_instance.connector_id,
connector_settings=connector_settings,
conn_cls=db_class,
table_name=table_name,
values=values,
)
Expand Down
Loading

0 comments on commit fabc6bd

Please sign in to comment.