Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INSPIRATION ONLY] - pseudo-implement asynchronous request-reply design [DO NOT MERGE] #1490

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,14 @@ jobs:
env:
SYNAPSE_ACCESS_TOKEN: ${{ secrets.SYNAPSE_ACCESS_TOKEN }}
SERVICE_ACCOUNT_CREDS: ${{ secrets.SERVICE_ACCOUNT_CREDS }}
# run: >
# source .venv/bin/activate;
# pytest --durations=0 --cov-report=term --cov-report=html:htmlcov --cov-report=xml:coverage.xml --cov=schematic/
# -m "table_operations" -n auto --log-level=DEBUG -v
run: >
source .venv/bin/activate;
pytest --durations=0 --cov-report=term --cov-report=html:htmlcov --cov-report=xml:coverage.xml --cov=schematic/
-m "not (rule_benchmark or table_operations)" --reruns 2 -n auto
-m "table_operations" --reruns 2


- name: Upload pytest test results
Expand Down
76 changes: 53 additions & 23 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,37 +1,67 @@
version: "3.9"
services:
# run schematic APIs in a docker container without uWSGI

schematic:
build:
dockerfile: Dockerfile
container_name: schematic
entrypoint: python /usr/src/app/run_api.py
build:
context: ../schematic
dockerfile: schematic_api/Dockerfile
container_name: schematic # Name of the container
ports:
- "3001:3001"
- "3001:3001" # Port mapping
volumes:
- .:/schematic
working_dir: /schematic
- .:/schematic # Volume mapping (bind mount)
working_dir: /schematic # Working directory inside the container
environment:
APP_HOST: "0.0.0.0"
APP_PORT: "3001"
SCHEMATIC_CONFIG: /schematic/config.yml
SCHEMATIC_CONFIG_CONTENT: "${SCHEMATIC_CONFIG_CONTENT}"
GE_HOME: /usr/src/app/great_expectations/
# run schematic APIs in a docker container with uWSGI and nginx
schematic-aws:
SCHEMATIC_CONFIG_CONTENT: ${SCHEMATIC_CONFIG_CONTENT}
SERVICE_ACCOUNT_CREDS: ${SERVICE_ACCOUNT_CREDS}
CELERY_BROKER_URL: redis://redis:6379/0 # URL for Celery to connect to Redis
command: python run_api.py # Command to run inside the container
# restart: unless-stopped # Optional: Automatically restart the container unless it's stopped manually
networks:
- schematic
# depends_on:
# - redis
healthcheck:
test: ["CMD-SHELL", "curl -f http://localhost:3001/v1/ui || exit 1"]
interval: 10s
retries: 5
start_period: 30s
timeout: 10s

redis:
image: redis:latest # Redis image
container_name: redis # Name of the Redis container
ports:
- "6379:6379" # Expose Redis port
# restart: unless-stopped
networks:
- schematic

celery:
build:
context: ../schematic
dockerfile: schematic_api/Dockerfile
container_name: schematic-api-aws
image: sagebionetworks/schematic-aws-api:latest
restart: always
env_file:
- .env
container_name: celery # Name of the container
environment:
GE_HOME: /usr/src/app/great_expectations/
SCHEMATIC_CONFIG_CONTENT: ${SCHEMATIC_CONFIG_CONTENT}
SERVICE_ACCOUNT_CREDS: ${SERVICE_ACCOUNT_CREDS}
CELERY_BROKER_URL: redis://redis:6379/0 # URL for Celery to connect to Redis
command: celery -A schematic_api.api.celery worker --loglevel=info # Command to run Celery worker
depends_on:
redis: # Ensure Redis starts before Celery
condition: service_started
schematic:
condition: service_healthy

# restart: unless-stopped
networks:
- schematic
ports:
- "${USE_LISTEN_PORT}:80"
- "443:443"

networks:
schematic:
name: schematic
name: schematic



9 changes: 8 additions & 1 deletion env.example
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,11 @@ SERVER_PROTOCOL=http://
SERVER_DOMAIN=localhost
# port on the host machine
USE_LISTEN_PORT=81
SERVICE_ACCOUNT_CREDS='Provide service account creds'
SERVICE_ACCOUNT_CREDS='Provide service account creds'

# Integration testing variables (Optional)
# TRACING_EXPORT_FORMAT=otlp
# LOGGING_EXPORT_FORMAT=otlp
# TRACING_SERVICE_NAME=unique-name-testing
# LOGGING_SERVICE_NAME=unique-name-testing
# LOGGING_INSTANCE_NAME=unique-name-testing
2,065 changes: 1,206 additions & 859 deletions poetry.lock

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,18 @@ pydantic = "^1.10.4"
connexion = {extras = ["swagger-ui"], version = "^2.8.0", optional = true}
Flask = {version = "2.1.3", optional = true}
Flask-Cors = {version = "^3.0.10", optional = true}
uWSGI = {version = "^2.0.21", optional = true}
# uWSGI = {version = "2.0.23", optional = true}
Jinja2 = {version = ">2.11.3", optional = true}
asyncio = "^3.4.3"
pytest-asyncio = "^0.23.7"
jaeger-client = {version = "^4.8.0", optional = true}
flask-opentracing = {version="^2.0.0", optional = true}
opentelemetry-exporter-otlp-proto-grpc = {version="^1.0.0", optional = true}
celery = {extras = ["redis"], version = "^5.4.0"}

[tool.poetry.extras]
api = ["connexion", "Flask", "Flask-Cors", "Jinja2", "pyopenssl", "jaeger-client", "flask-opentracing"]
aws = ["uWSGI"]
api = ["connexion", "Flask", "Flask-Cors", "Jinja2", "pyopenssl", "jaeger-client", "flask-opentracing", "opentelemetry-exporter-otlp-proto-grpc"]
# aws = ["uWSGI"]


[tool.poetry.group.dev.dependencies]
Expand Down
4 changes: 3 additions & 1 deletion pytest.ini
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
[pytest]
python_files = test_*.py
asyncio_mode = auto
asyncio_mode = auto
log_cli = True
log_cli_level = INFO
3 changes: 2 additions & 1 deletion run_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
port = int(port)

# Launch app
app = create_app()
app,_ = create_app()

#TO DO: add a flag --debug to control debug parameter
app.run(host=host, port=port, debug=False)
5 changes: 5 additions & 0 deletions schematic/models/validate_attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@
parse_str_series_to_list,
rule_in_rule_list,
)
from opentelemetry import trace

logger = logging.getLogger(__name__)

tracer = trace.get_tracer("Schematic")

MessageLevelType = Literal["warning", "error"]
ScopeTypes = Literal["set", "value"]

Expand Down Expand Up @@ -1769,6 +1772,7 @@ def _get_rule_scope(self, val_rule: str) -> ScopeTypes:
scope = val_rule.lower().split(" ")[2]
return scope

@tracer.start_as_current_span("ValidateAttribute::_run_validation_across_target_manifests")
def _run_validation_across_target_manifests(
self,
project_scope: Optional[list[str]],
Expand Down Expand Up @@ -1933,6 +1937,7 @@ def _run_validation_across_target_manifests(
validation_store = (missing_values, duplicated_values, repeat_values)
return (start_time, validation_store)

@tracer.start_as_current_span("ValidateAttribute::cross_validation")
def cross_validation(
self,
val_rule: str,
Expand Down
31 changes: 15 additions & 16 deletions schematic/store/synapse.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ def getPaginatedRestResults(self, currentUserId: str) -> Dict[str, str]:

return all_results

@tracer.start_as_current_span("SynapseStorage::getStorageProjects")
def getStorageProjects(self, project_scope: List = None) -> list[tuple[str, str]]:
"""Gets all storage projects the current user has access to, within the scope of the 'storageFileview' attribute.

Expand Down Expand Up @@ -1748,8 +1749,10 @@ def _add_id_columns_to_manifest(

def _generate_table_name(self, manifest):
"""Helper function to generate a table name for upload to synapse.

Args:
Manifest loaded as a pd.Dataframe

Returns:
table_name (str): Name of the table to load
component_name (str): Name of the manifest component (if applicable)
Expand Down Expand Up @@ -2197,9 +2200,9 @@ def associateMetadataWithFiles(
# Upload manifest to synapse based on user input (manifest_record_type)
if manifest_record_type == "file_only":
manifest_synapse_file_id = self.upload_manifest_as_csv(
dmge,
manifest,
metadataManifestPath,
dmge=dmge,
manifest=manifest,
metadataManifestPath=metadataManifestPath,
datasetId=datasetId,
restrict=restrict_manifest,
hideBlanks=hideBlanks,
Expand All @@ -2210,9 +2213,9 @@ def associateMetadataWithFiles(
)
elif manifest_record_type == "table_and_file":
manifest_synapse_file_id = self.upload_manifest_as_table(
dmge,
manifest,
metadataManifestPath,
dmge=dmge,
manifest=manifest,
metadataManifestPath=metadataManifestPath,
datasetId=datasetId,
table_name=table_name,
component_name=component_name,
Expand All @@ -2226,9 +2229,9 @@ def associateMetadataWithFiles(
)
elif manifest_record_type == "file_and_entities":
manifest_synapse_file_id = self.upload_manifest_as_csv(
dmge,
manifest,
metadataManifestPath,
dmge=dmge,
manifest=manifest,
metadataManifestPath=metadataManifestPath,
datasetId=datasetId,
restrict=restrict_manifest,
hideBlanks=hideBlanks,
Expand All @@ -2239,9 +2242,9 @@ def associateMetadataWithFiles(
)
elif manifest_record_type == "table_file_and_entities":
manifest_synapse_file_id = self.upload_manifest_combo(
dmge,
manifest,
metadataManifestPath,
dmge=dmge,
manifest=manifest,
metadataManifestPath=metadataManifestPath,
datasetId=datasetId,
table_name=table_name,
component_name=component_name,
Expand Down Expand Up @@ -2453,7 +2456,6 @@ def getDatasetProject(self, datasetId: str) -> str:

# re-query if no datasets found
if dataset_row.empty:
sleep(5)
self.query_fileview()
# Subset main file view
dataset_index = self.storageFileviewTable["id"] == datasetId
Expand Down Expand Up @@ -2705,9 +2707,6 @@ def replaceTable(
current_table.addColumn(col)
self.synStore.syn.store(current_table, isRestricted=self.restrict)

# wait for synapse store to finish
sleep(1)

# build schema and table from columns and store with necessary restrictions
schema = Schema(
name=self.tableName, columns=cols, parent=datasetParentProject
Expand Down
42 changes: 38 additions & 4 deletions schematic_api/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import connexion
from typing import Tuple

from celery import Celery
import traceback
from synapseclient.core.exceptions import (
SynapseAuthenticationError,
)
from schematic.exceptions import AccessCredentialsError


from schematic import CONFIG
from jaeger_client import Config
from flask_opentracing import FlaskTracer
Expand All @@ -32,15 +34,27 @@ def create_app():

# get the underlying Flask app instance
app = connexionapp.app

app.logger.error("service: " + app.name)
# path to config.yml file saved as a Flask config variable
default_config = os.path.abspath(os.path.join(__file__, "../../../config.yml"))
schematic_config = os.environ.get("SCHEMATIC_CONFIG", default_config)
schematic_config_content = os.environ.get("SCHEMATIC_CONFIG_CONTENT")

app.config["SCHEMATIC_CONFIG"] = schematic_config
app.config["SCHEMATIC_CONFIG_CONTENT"] = schematic_config_content

app.config['CELERY_BROKER_URL'] = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0')
app.config['CELERY_RESULT_BACKEND'] = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0')
# TODO need to attach celery as an extension to the flask app...
celery = Celery(
app.name,
broker=app.config['CELERY_BROKER_URL']
)
celery.conf.update(
broker_url=app.config['CELERY_BROKER_URL'],
result_backend=app.config['CELERY_BROKER_URL']
)
celery.autodiscover_tasks(['schematic_api.api.tasks'])

# handle exceptions in schematic when an exception gets raised
@app.errorhandler(Exception)
def handle_exception(e: Exception) -> Tuple[str, int]:
Expand All @@ -66,10 +80,30 @@ def handle_synapse_access_error(e: Exception) -> Tuple[str, int]:
"""handle synapse access error"""
return str(e), 403

return app
return app, celery


app, celery = create_app()

# def make_celery(app):
# """
# Create a new Celery object and tie the Celery config to the app's config.
# Wrap all tasks in the context of the Flask application.
# """

# class ContextTask(celery.Task):
# """
# Make Celery tasks work with Flask app context.
# """
# abstract = True

# def __call__(self, *args, **kwargs):
# with app.app_context():
# return super(ContextTask, self).__call__(*args, **kwargs)

app = create_app()
# celery.Task = ContextTask
# return celery
# celery = make_celery(app)

flask_tracer = FlaskTracer(
jaeger_tracer, True, app, ["url", "url_rule", "environ.HTTP_X_REAL_IP", "path"]
Expand Down
7 changes: 7 additions & 0 deletions schematic_api/api/__main__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
import os
from schematic_api.api import app
from celery import Celery


def main():
# Get app configuration
host = os.environ.get("APP_HOST", "0.0.0.0")
port = os.environ.get("APP_PORT", "3001")
port = int(port)
# TODO: use env variable instead later
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

# Launch app
app.run(host=host, port=port, debug=False)
Expand Down
Loading