Skip to content

Commit

Permalink
fix YAML generation
Browse files Browse the repository at this point in the history
  • Loading branch information
pmbrull committed Oct 16, 2024
1 parent f1806bb commit 354444c
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 47 deletions.
81 changes: 53 additions & 28 deletions openmetadata-webserver-cli/webserver/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,27 @@
import logging
from typing import Optional

from metadata.generated.schema.entity.automations.testServiceConnection import (
TestServiceConnectionRequest,
)
from metadata.generated.schema.entity.automations.testServiceConnection import \
TestServiceConnectionRequest
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
AuthProvider,
)
from metadata.generated.schema.entity.services.connections.serviceConnection import (
ServiceConnection,
)
from metadata.generated.schema.entity.services.connections.testConnectionResult import (
TestConnectionResult,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
AuthProvider, OpenMetadataConnection)
from metadata.generated.schema.entity.services.connections.serviceConnection import \
ServiceConnection
from metadata.generated.schema.entity.services.connections.testConnectionResult import \
TestConnectionResult
from metadata.generated.schema.metadataIngestion.workflow import (
LogLevels, OpenMetadataWorkflowConfig, Sink)
from metadata.generated.schema.metadataIngestion.workflow import \
Source as WorkflowSource
from metadata.generated.schema.metadataIngestion.workflow import (
SourceConfig, WorkflowConfig)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import \
OpenMetadataJWTClientConfig
from metadata.ingestion.models.custom_pydantic import CustomSecretStr
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
from metadata.ingestion.source.connections import (get_connection,
get_test_connection_fn)
from metadata.utils.singleton import Singleton
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
OpenMetadataWorkflowConfig,
)

from webserver.models import OMetaServerModel

Expand All @@ -55,7 +52,10 @@ class LocalIngestionServer(metaclass=Singleton):
def __init__(self):
self._metadata: Optional[OpenMetadata] = None
self._service_connection: Optional[ServiceConnection] = None
self._source_config: Optional[WorkflowSource] = None
self._source_config: Optional[SourceConfig] = None
self._service_name: Optional[str] = None
self._service_type: Optional[str] = None
self._logger_level: LogLevels = LogLevels.INFO

@property
def metadata(self) -> OpenMetadata:
Expand All @@ -70,20 +70,27 @@ def service_connection(self) -> ServiceConnection:
return self._service_connection

@property
def source_config(self) -> WorkflowSource:
def source_config(self) -> SourceConfig:
if not self._source_config:
raise MissingStateException("Source config")
return self._source_config

@source_config.setter
def source_config(self, source_config: dict):
self._source_config = WorkflowSource.model_validate(source_config)
def set_source_config(self, raw_source_config: dict):
raw_source_config.pop("name")
raw_source_config.pop("displayName")

if raw_source_config.pop("enableDebugLog"):
self._logger_level = LogLevels.DEBUG

source_config = {"config": raw_source_config}
self._source_config = SourceConfig.model_validate(source_config)

@service_connection.setter
def service_connection(self, service_connection: dict):
def set_service_connection(self, raw_service_connection: dict):
self._service_name = raw_service_connection["name"]
self._service_type = raw_service_connection["serviceType"]

self._service_connection = ServiceConnection.model_validate(
service_connection.get("connection")
raw_service_connection.get("connection")
)

def init_ometa(self, ometa_server: OMetaServerModel):
Expand Down Expand Up @@ -115,3 +122,21 @@ def test_connection(

def build_workflow(self) -> OpenMetadataWorkflowConfig:
"""Build the workflow"""
# TODO: dynamic build from pipelineType
return OpenMetadataWorkflowConfig(
source=WorkflowSource(
type=self._service_type.lower(),
serviceName=self._service_name,
serviceConnection=self.service_connection,
sourceConfig=self.source_config,
),
sink=Sink(
type="metadata-rest",
config={},
),
workflowConfig=WorkflowConfig(
loggerLevel=self._logger_level,
openMetadataServerConfig=self.metadata.config,
),
ingestionPipelineFQN=None,
)
33 changes: 14 additions & 19 deletions openmetadata-webserver-cli/webserver/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,14 @@
"""
Local webserver for generating hybrid yamls
"""
from pathlib import Path

from flask import Flask, request, jsonify, send_from_directory, send_file
from flask_cors import CORS
import io
import os
from fastapi.encoders import jsonable_encoder

from metadata.generated.schema.entity.automations.testServiceConnection import (
TestServiceConnectionRequest,
)
import yaml
from fastapi.encoders import jsonable_encoder
from flask import jsonify, request, send_file, send_from_directory
from metadata.generated.schema.entity.automations.testServiceConnection import \
TestServiceConnectionRequest

from webserver import app
from webserver.models import OMetaServerModel
Expand All @@ -30,16 +28,16 @@
@app.route("/sourceConfig", methods=["POST"])
def save_source_config():
"""Route to save the service connection configuration"""
# TODO: Remove name, displayName and enableDebugLog from here to cast properly
LocalIngestionServer().source_config = request.json
# TODO: Send the full Ingestion Pipeline here
LocalIngestionServer().set_source_config(request.json)

print(request.json)
return jsonify(success=True)


@app.route("/serviceConnection", methods=["POST"])
def save_service_connection():
LocalIngestionServer().service_connection = request.json
LocalIngestionServer().set_service_connection(request.json)

print(request.json)
return jsonify(success=True)
Expand Down Expand Up @@ -84,21 +82,18 @@ def _test_connection():
def download_yaml():
try:
# Send the file as an attachment
return send_file("dummy.yaml", as_attachment=True)
# TODO: FIXME
return send_file(io.StringIO("..."), as_attachment=True)
except Exception as e:
return f"Error loading text file: {e}"


@app.route("/api/yaml", methods=["GET"])
def yaml():
def send_yaml():
"""Return an OpenMetadataWorkflowConfig"""
try:
# Open the text file and read its contents
with open("dummy.yaml", "r") as file:
content = file.read()

# Return the text file content
return content
workflow = LocalIngestionServer().build_workflow()
return yaml.dump(yaml.safe_load(workflow.model_dump_json()))

except Exception as e:
return f"Error loading text file: {e}"

0 comments on commit 354444c

Please sign in to comment.