From 38667ab8f5c11d81f5b5f621c0d281daaac1f6f5 Mon Sep 17 00:00:00 2001
From: Andrea Petrucci
Date: Tue, 23 Jan 2024 15:03:17 +0100
Subject: [PATCH] Updated code with new package versions, changed condition in
 workflow (#2)

---
 .github/workflows/workflow.yml     |   8 +-
 model-serving/requirements-all.txt |  11 +--
 model-serving/src/main.py          | 113 ++++++++++++++----------
 3 files changed, 66 insertions(+), 66 deletions(-)

diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml
index 903b86b..ed0d2c9 100644
--- a/.github/workflows/workflow.yml
+++ b/.github/workflows/workflow.yml
@@ -1,5 +1,5 @@
 # Documentation: https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsuses
-name: github_worflow
+name: github_workflow
 run-name: GitHub Workflow
 
 env:
@@ -47,11 +47,11 @@ env:
   # Logging level
   PROD_LOG_LEVEL: ${{ vars.PROD_LOG_LEVEL }}
   # Kube configuration
-  PROD_KUBE_CONFIG: ${{ secrets.DEV_KUBE_CONFIG }}
+  PROD_KUBE_CONFIG: ${{ secrets.PROD_KUBE_CONFIG }}
 
 # Allow one concurrent deployment
 concurrency:
-  group: github_worflow
+  group: github_workflow
   cancel-in-progress: true
 
 on:
@@ -123,7 +123,7 @@ jobs:
   release:
     needs: test
     runs-on: ubuntu-latest
-    if: ${{ vars.RUN_CICD == 'true' && success() && (vars.DEPLOY_DEV == 'true' || vars.DEPLOY_PROD == 'true') }}
+    if: ${{ vars.RUN_CICD == 'true' && success() && (github.ref == 'refs/heads/main' || github.ref == 'refs/heads/prod') && (vars.DEPLOY_DEV == 'true' || vars.DEPLOY_PROD == 'true') }}
     steps:
       - name: Clone repository
         uses: actions/checkout@v3
diff --git a/model-serving/requirements-all.txt b/model-serving/requirements-all.txt
index 0f5a00a..7900ce7 100644
--- a/model-serving/requirements-all.txt
+++ b/model-serving/requirements-all.txt
@@ -19,9 +19,9 @@ contourpy==1.1.0
 coverage==7.2.7
 cycler==0.11.0
 dnspython==2.3.0
-email-validator==1.3.0
+email-validator==2.1.0.post1
 exceptiongroup==1.1.1
-fastapi==0.87.0
+fastapi==0.108.0
 flake8==5.0.4
 flatbuffers==1.12
 fonttools==4.40.0
@@ -60,7 +60,8 @@ protobuf==3.19.6
 pyasn1==0.5.0
 pyasn1-modules==0.3.0
 pycodestyle==2.9.1
-pydantic==1.10.9
+pydantic==2.5.3
+pydantic-settings==2.1.0
 pyflakes==2.5.0
 pyparsing==3.1.0
 pytest==7.2.0
@@ -80,7 +81,7 @@ scipy==1.11.0
 setuptools==67.6.1
 six==1.16.0
 sniffio==1.3.0
-starlette==0.21.0
+starlette==0.29.0
 tensorboard==2.9.1
 tensorboard-data-server==0.6.1
 tensorboard-plugin-wit==1.8.1
@@ -90,7 +91,7 @@ tensorflow-io-gcs-filesystem==0.32.0
 termcolor==2.3.0
 threadpoolctl==3.1.0
 tomli==2.0.1
-typing_extensions==4.6.3
+typing_extensions==4.9.0
 urllib3==1.26.16
 uvicorn==0.19.0
 Werkzeug==2.3.6
diff --git a/model-serving/src/main.py b/model-serving/src/main.py
index 75bd0d6..3b1fadc 100644
--- a/model-serving/src/main.py
+++ b/model-serving/src/main.py
@@ -5,9 +5,8 @@
 from fastapi.middleware.cors import CORSMiddleware
 from fastapi.responses import RedirectResponse
 from common_code.config import get_settings
-from pydantic import Field
 from common_code.http_client import HttpClient
-from common_code.logger.logger import get_logger
+from common_code.logger.logger import get_logger, Logger
 from common_code.service.controller import router as service_router
 from common_code.service.service import ServiceService
 from common_code.storage.service import StorageService
@@ -18,6 +17,7 @@ from common_code.service.enums import ServiceStatus
 from common_code.common.enums import FieldDescriptionType, ExecutionUnitTagName, ExecutionUnitTagAcronym
 from common_code.common.models import FieldDescription, ExecutionUnitTag
+from contextlib import asynccontextmanager
 
 # Imports required by the service's model
 import tensorflow as tf
 
@@ -35,8 +35,8 @@ class MyService(Service):
     """
 
     # Any additional fields must be excluded for Pydantic to work
-    model: object = Field(exclude=True)
-    logger: object = Field(exclude=True)
+    _model: object
+    _logger: Logger
 
     def __init__(self):
         super().__init__(
@@ -64,8 +64,8 @@ def __init__(self):
             ],
             has_ai=True
         )
-        self.model = tf.keras.models.load_model(os.path.join(os.path.dirname(__file__), "..", "ae_model.h5"))
-        self.logger = get_logger(settings)
+        self._model = tf.keras.models.load_model(os.path.join(os.path.dirname(__file__), "..", "ae_model.h5"))
+        self._logger = get_logger(settings)
 
     def process(self, data):
         raw = str(data["text"].data)[2:-1]
@@ -73,7 +73,7 @@ def process(self, data):
         X_test = pd.read_csv(io.StringIO(raw), dtype={"value": np.float64})
 
         # Use the model to reconstruct the original time series data
-        reconstructed_X = self.model.predict(X_test)
+        reconstructed_X = self._model.predict(X_test)
 
         # Calculate the reconstruction error for each point in the time series
         reconstruction_error = np.square(X_test - reconstructed_X).mean(axis=1)
@@ -97,6 +97,54 @@ def process(self, data):
         }
 
 
+service_service: ServiceService | None = None
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    # Manual instances because startup events don't support Dependency Injection
+    # https://github.com/tiangolo/fastapi/issues/2057
+    # https://github.com/tiangolo/fastapi/issues/425
+
+    # Global variable
+    global service_service
+
+    # Startup
+    logger = get_logger(settings)
+    http_client = HttpClient()
+    storage_service = StorageService(logger)
+    my_service = MyService()
+    tasks_service = TasksService(logger, settings, http_client, storage_service)
+    service_service = ServiceService(logger, settings, http_client, tasks_service)
+
+    tasks_service.set_service(my_service)
+
+    # Start the tasks service
+    tasks_service.start()
+
+    async def announce():
+        retries = settings.engine_announce_retries
+        for engine_url in settings.engine_urls:
+            announced = False
+            while not announced and retries > 0:
+                announced = await service_service.announce_service(my_service, engine_url)
+                retries -= 1
+                if not announced:
+                    await asyncio.sleep(settings.engine_announce_retry_delay)
+            if retries == 0:
+                logger.warning(f"Aborting service announcement after "
+                               f"{settings.engine_announce_retries} retries")
+
+    # Announce the service to its engine
+    asyncio.ensure_future(announce())
+
+    yield
+
+    # Shutdown
+    for engine_url in settings.engine_urls:
+        await service_service.graceful_shutdown(my_service, engine_url)
+
+
 api_description = """
 Anomaly detection of a time series with an autoencoder.
 """
@@ -106,6 +154,7 @@ def process(self, data):
 
 # Define the FastAPI application with information
 app = FastAPI(
+    lifespan=lifespan,
    title="Autoencoder Anomaly Detection",
    description=api_description,
    version="1.0.0",
@@ -141,53 +190,3 @@ def process(self, data):
 @app.get("/", include_in_schema=False)
 async def root():
     return RedirectResponse("/docs", status_code=301)
-
-
-service_service: ServiceService | None = None
-
-
-@app.on_event("startup")
-async def startup_event():
-    # Manual instances because startup events doesn't support Dependency Injection
-    # https://github.com/tiangolo/fastapi/issues/2057
-    # https://github.com/tiangolo/fastapi/issues/425
-
-    # Global variable
-    global service_service
-
-    logger = get_logger(settings)
-    http_client = HttpClient()
-    storage_service = StorageService(logger)
-    my_service = MyService()
-    tasks_service = TasksService(logger, settings, http_client, storage_service)
-    service_service = ServiceService(logger, settings, http_client, tasks_service)
-
-    tasks_service.set_service(my_service)
-
-    # Start the tasks service
-    tasks_service.start()
-
-    async def announce():
-        retries = settings.engine_announce_retries
-        for engine_url in settings.engine_urls:
-            announced = False
-            while not announced and retries > 0:
-                announced = await service_service.announce_service(my_service, engine_url)
-                retries -= 1
-                if not announced:
-                    time.sleep(settings.engine_announce_retry_delay)
-            if retries == 0:
-                logger.warning(f"Aborting service announcement after "
-                               f"{settings.engine_announce_retries} retries")
-
-    # Announce the service to its engine
-    asyncio.ensure_future(announce())
-
-
-@app.on_event("shutdown")
-async def shutdown_event():
-    # Global variable
-    global service_service
-    my_service = MyService()
-    for engine_url in settings.engine_urls:
-        await service_service.graceful_shutdown(my_service, engine_url)