Commit
Updated code with new package versions, changed condition in workflow
andrptrc authored Jan 23, 2024
1 parent 289eda6 commit 38667ab
Showing 3 changed files with 66 additions and 66 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/workflow.yml
@@ -1,5 +1,5 @@
# Documentation: https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsuses
-name: github_worflow
+name: github_workflow
run-name: GitHub Workflow

env:
@@ -47,11 +47,11 @@ env:
# Logging level
PROD_LOG_LEVEL: ${{ vars.PROD_LOG_LEVEL }}
# Kube configuration
-  PROD_KUBE_CONFIG: ${{ secrets.DEV_KUBE_CONFIG }}
+  PROD_KUBE_CONFIG: ${{ secrets.PROD_KUBE_CONFIG }}

# Allow one concurrent deployment
concurrency:
-  group: github_worflow
+  group: github_workflow
cancel-in-progress: true

on:
@@ -123,7 +123,7 @@ jobs:
release:
needs: test
runs-on: ubuntu-latest
-    if: ${{ vars.RUN_CICD == 'true' && success() && (vars.DEPLOY_DEV == 'true' || vars.DEPLOY_PROD == 'true') }}
+    if: ${{ vars.RUN_CICD == 'true' && success() && (github.ref == 'refs/heads/main' || github.ref == 'refs/heads/prod') && (vars.DEPLOY_DEV == 'true' || vars.DEPLOY_PROD == 'true') }}
steps:
- name: Clone repository
uses: actions/checkout@v3
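
The reworked if: condition also gates the release job on the triggering branch: in GitHub Actions, github.ref carries the fully qualified ref of the push (e.g. refs/heads/main), so a release now only runs from main or prod, on top of the existing RUN_CICD and DEPLOY_* variable checks. A minimal sketch of the same gating pattern in isolation (the job body and variables are illustrative, not this repository's full release job):

jobs:
  release:
    runs-on: ubuntu-latest
    # Release only when CI/CD is enabled and the push targets main or prod
    if: ${{ vars.RUN_CICD == 'true' && (github.ref == 'refs/heads/main' || github.ref == 'refs/heads/prod') }}
    steps:
      - run: echo "Releasing from ${{ github.ref }}"
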
11 changes: 6 additions & 5 deletions model-serving/requirements-all.txt
@@ -19,9 +19,9 @@ contourpy==1.1.0
coverage==7.2.7
cycler==0.11.0
dnspython==2.3.0
-email-validator==1.3.0
+email-validator==2.1.0.post1
exceptiongroup==1.1.1
-fastapi==0.87.0
+fastapi==0.108.0
flake8==5.0.4
flatbuffers==1.12
fonttools==4.40.0
@@ -60,7 +60,8 @@ protobuf==3.19.6
pyasn1==0.5.0
pyasn1-modules==0.3.0
pycodestyle==2.9.1
-pydantic==1.10.9
+pydantic==2.5.3
+pydantic-settings==2.1.0
pyflakes==2.5.0
pyparsing==3.1.0
pytest==7.2.0
@@ -80,7 +81,7 @@ scipy==1.11.0
setuptools==67.6.1
six==1.16.0
sniffio==1.3.0
-starlette==0.21.0
+starlette==0.29.0
tensorboard==2.9.1
tensorboard-data-server==0.6.1
tensorboard-plugin-wit==1.8.1
@@ -90,7 +91,7 @@ tensorflow-io-gcs-filesystem==0.32.0
termcolor==2.3.0
threadpoolctl==3.1.0
tomli==2.0.1
-typing_extensions==4.6.3
+typing_extensions==4.9.0
urllib3==1.26.16
uvicorn==0.19.0
Werkzeug==2.3.6
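
Context for the pydantic 1.10 to 2.5 bump: pydantic v2 treats underscore-prefixed attributes as private, excluding them from validation and serialization automatically, which is what lets main.py below replace Field(exclude=True) with plain _model and _logger attributes; BaseSettings also moved into the new pydantic-settings package. A minimal sketch of both patterns, assuming pydantic>=2 and pydantic-settings are installed (the Example* names are illustrative, not from this repository):

from pydantic import BaseModel
from pydantic_settings import BaseSettings  # BaseSettings lives here since pydantic v2


class ExampleSettings(BaseSettings):
    # Reads LOG_LEVEL from the environment, with a default
    log_level: str = "info"


class ExampleService(BaseModel):
    name: str
    # Underscore attributes are private in pydantic v2: not validated, never serialized,
    # so Field(exclude=True) is no longer needed for runtime-only objects like an ML model
    _model: object = None


service = ExampleService(name="demo")
service._model = "loaded model placeholder"  # assignable at runtime
print(service.model_dump())  # {'name': 'demo'} - the private attribute is excluded
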
113 changes: 56 additions & 57 deletions model-serving/src/main.py
@@ -5,9 +5,8 @@
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse
from common_code.config import get_settings
-from pydantic import Field
from common_code.http_client import HttpClient
-from common_code.logger.logger import get_logger
+from common_code.logger.logger import get_logger, Logger
from common_code.service.controller import router as service_router
from common_code.service.service import ServiceService
from common_code.storage.service import StorageService
@@ -18,6 +17,7 @@
from common_code.service.enums import ServiceStatus
from common_code.common.enums import FieldDescriptionType, ExecutionUnitTagName, ExecutionUnitTagAcronym
from common_code.common.models import FieldDescription, ExecutionUnitTag
+from contextlib import asynccontextmanager

# Imports required by the service's model
import tensorflow as tf
@@ -35,8 +35,8 @@ class MyService(Service):
"""

# Any additional fields must be excluded for Pydantic to work
-    model: object = Field(exclude=True)
-    logger: object = Field(exclude=True)
+    _model: object
+    _logger: Logger

def __init__(self):
super().__init__(
@@ -64,16 +64,16 @@ def __init__(self):
],
has_ai=True
)
-        self.model = tf.keras.models.load_model(os.path.join(os.path.dirname(__file__), "..", "ae_model.h5"))
-        self.logger = get_logger(settings)
+        self._model = tf.keras.models.load_model(os.path.join(os.path.dirname(__file__), "..", "ae_model.h5"))
+        self._logger = get_logger(settings)

def process(self, data):
raw = str(data["text"].data)[2:-1]
raw = raw.replace('\\t', ',').replace('\\n', '\n').replace('\\r', '\n')
X_test = pd.read_csv(io.StringIO(raw), dtype={"value": np.float64})

# Use the model to reconstruct the original time series data
-        reconstructed_X = self.model.predict(X_test)
+        reconstructed_X = self._model.predict(X_test)

# Calculate the reconstruction error for each point in the time series
reconstruction_error = np.square(X_test - reconstructed_X).mean(axis=1)
@@ -97,6 +97,54 @@ def process(self, data):
}


+service_service: ServiceService | None = None
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    # Manual instances because startup events don't support Dependency Injection
+    # https://github.com/tiangolo/fastapi/issues/2057
+    # https://github.com/tiangolo/fastapi/issues/425
+
+    # Global variable
+    global service_service
+
+    # Startup
+    logger = get_logger(settings)
+    http_client = HttpClient()
+    storage_service = StorageService(logger)
+    my_service = MyService()
+    tasks_service = TasksService(logger, settings, http_client, storage_service)
+    service_service = ServiceService(logger, settings, http_client, tasks_service)
+
+    tasks_service.set_service(my_service)
+
+    # Start the tasks service
+    tasks_service.start()
+
+    async def announce():
+        retries = settings.engine_announce_retries
+        for engine_url in settings.engine_urls:
+            announced = False
+            while not announced and retries > 0:
+                announced = await service_service.announce_service(my_service, engine_url)
+                retries -= 1
+                if not announced:
+                    time.sleep(settings.engine_announce_retry_delay)
+            if retries == 0:
+                logger.warning(f"Aborting service announcement after "
+                               f"{settings.engine_announce_retries} retries")
+
+    # Announce the service to its engine
+    asyncio.ensure_future(announce())
+
+    yield
+
+    # Shutdown
+    for engine_url in settings.engine_urls:
+        await service_service.graceful_shutdown(my_service, engine_url)
+
+
api_description = """
Anomaly detection of a time series with an autoencoder.
"""
@@ -106,6 +154,7 @@ def process(self, data):

# Define the FastAPI application with information
app = FastAPI(
+    lifespan=lifespan,
title="Autoencoder Anomaly Detection",
description=api_description,
version="1.0.0",
@@ -141,53 +190,3 @@
@app.get("/", include_in_schema=False)
async def root():
return RedirectResponse("/docs", status_code=301)


-service_service: ServiceService | None = None
-
-
-@app.on_event("startup")
-async def startup_event():
-    # Manual instances because startup events don't support Dependency Injection
-    # https://github.com/tiangolo/fastapi/issues/2057
-    # https://github.com/tiangolo/fastapi/issues/425
-
-    # Global variable
-    global service_service
-
-    logger = get_logger(settings)
-    http_client = HttpClient()
-    storage_service = StorageService(logger)
-    my_service = MyService()
-    tasks_service = TasksService(logger, settings, http_client, storage_service)
-    service_service = ServiceService(logger, settings, http_client, tasks_service)
-
-    tasks_service.set_service(my_service)
-
-    # Start the tasks service
-    tasks_service.start()
-
-    async def announce():
-        retries = settings.engine_announce_retries
-        for engine_url in settings.engine_urls:
-            announced = False
-            while not announced and retries > 0:
-                announced = await service_service.announce_service(my_service, engine_url)
-                retries -= 1
-                if not announced:
-                    time.sleep(settings.engine_announce_retry_delay)
-            if retries == 0:
-                logger.warning(f"Aborting service announcement after "
-                               f"{settings.engine_announce_retries} retries")
-
-    # Announce the service to its engine
-    asyncio.ensure_future(announce())
-
-
-@app.on_event("shutdown")
-async def shutdown_event():
-    # Global variable
-    global service_service
-    my_service = MyService()
-    for engine_url in settings.engine_urls:
-        await service_service.graceful_shutdown(my_service, engine_url)
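
The main.py rewrite migrates startup and shutdown logic from the deprecated @app.on_event hooks to FastAPI's lifespan context manager, which the fastapi 0.87 to 0.108 bump enables (lifespan support arrived in 0.93). One practical gain visible in the diff: startup and shutdown now share one scope, so the old shutdown_event's stray MyService() re-instantiation disappears. A minimal sketch of the pattern in isolation (the app.state.resource name is illustrative, not this service's code):

from contextlib import asynccontextmanager

from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: everything before the yield runs once, before requests are served
    app.state.resource = {"ready": True}
    yield
    # Shutdown: everything after the yield runs once, when the application stops
    app.state.resource = None


app = FastAPI(lifespan=lifespan)


@app.get("/")
async def root():
    return app.state.resource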
