Skip to content

Commit

Permalink
RFC: proposal using Postgres LISTEN/NOTIFY
Browse files Browse the repository at this point in the history
  • Loading branch information
jonas-brr committed Apr 4, 2024
1 parent 914bc16 commit ac6a8b4
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 13 deletions.
100 changes: 95 additions & 5 deletions backend/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import threading
from contextlib import asynccontextmanager
from typing import List, Optional

import psycopg2
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy import select
from sqlalchemy.orm import Session, subqueryload

from .config import settings
from .database import SessionLocal
from .enum import FilterEnum
from .dynamic_routes import create_dynamic_router
from .models import Schema, AttributeDefinition, AttrType
from .enums import FilterEnum
from .general_routes import router
from .models import Schema, AttributeDefinition, AttrType


def load_schemas(db: Session) -> List[models.Schema]:
Expand Down Expand Up @@ -40,14 +43,101 @@ def load_dynamic_routes(db: Session, app: FastAPI):
create_dynamic_router(schema=schema, app=app)


"""
POSTGRES LISTEN/NOTIFY
SQL script to create triggers
==============================
-- Create a trigger function to handle insert and update events
CREATE OR REPLACE FUNCTION notify_changes()
RETURNS TRIGGER AS $$
BEGIN
-- Notify listening clients about the change
NOTIFY schema_changes;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Create a trigger for insert events
CREATE TRIGGER schema_insert_trigger
AFTER INSERT ON schemas
FOR EACH ROW
EXECUTE FUNCTION notify_changes();
-- Create a trigger for update events
CREATE TRIGGER schema_update_trigger
AFTER UPDATE ON schemas
FOR EACH ROW
EXECUTE FUNCTION notify_changes();
==============================

This comment has been minimized.

Copy link
@der-gabe

der-gabe Apr 5, 2024

Member

So sending the actual notification would be triggered by a custom function and some triggers on the Postgres side, rather than some Python code?

I guess that works, but if we go this route, then I think I'd like to have some code under version control that ensures that that function actually exists...

The way you wrote that SQL script it should be safe to run it on every startup, right?

Also, I think we're missing a third trigger for delete events...

This comment has been minimized.

Copy link
@jonas-brr

jonas-brr Apr 8, 2024

Author Contributor

yes, the triggers trigger the function on the DB side, tho it's there only to send the notify, so an extra listening python code (thread/coroutine) needs to complement this.

Yes I believe with the CREATE OR REPLACE it probably should be fine to run this on each startup. This could be living 100% as an SQL string in some python module (+1 for the VCS)

...ah and yes delete trigger would be nice as well 👍

This comment has been minimized.

Copy link
@der-gabe

der-gabe Apr 8, 2024

Member

[…] an extra listening python code (thread/coroutine) needs to complement this.

Of course, that's what the Watcher class below is for.

In fact, that's the first thing I saw, which is why I kind of expected the notifier code to be in Python, as well.

But I see that it's not necessary and doing it on the DB side is probably even more reliable.

I do wonder whether pgbouncer could cause us any problems, as we rely on that quite a bit... 🤔 I guess the only way to know is to try it out.

This comment has been minimized.

Copy link
@jonas-brr

jonas-brr Apr 8, 2024

Author Contributor

I was thinking about pg bouncer as well. But all the "messaging" stuff is happening inside the database and the client is listening for it using an ordinary connection (ordinary sql statement). So at worst we might solve some connection timeouts 🤷

"""


class Watcher(threading.Thread):
    """Background thread listening for Postgres NOTIFY events on the
    ``schema_changes`` channel (emitted by the DB-side triggers documented
    above) and refreshing the app's dynamic routes on each notification.
    """

    # Channel name; must match the channel used by the notify_changes()
    # trigger function on the database side.
    CHANNEL = 'schema_changes'

    def __init__(self, app_reference):
        """
        :param app_reference: the FastAPI application whose dynamic routes
            are reloaded whenever a schema change notification arrives.
        """
        # daemon=True: this thread loops forever and is never joined, so a
        # non-daemon thread would prevent clean interpreter shutdown.
        super().__init__(daemon=True)
        self.app_reference = app_reference

    def run(self):
        # Local import: the stdlib ``select`` module would otherwise clash
        # with the module-level ``from sqlalchemy import select``.
        import select

        conn = psycopg2.connect(
            dbname=settings.pg_db,
            user=settings.pg_user,
            password=settings.pg_password,
            host=settings.pg_host,
            port=settings.pg_port,
        )

        cur = conn.cursor()

        # Execute the LISTEN query
        cur.execute(f"LISTEN {self.CHANNEL};")
        conn.commit()

        print(f"Listening for notifications on channel '{self.CHANNEL}'...")
        try:
            while True:
                # Block until the connection's socket becomes readable
                # instead of spinning on conn.poll(), which busy-waits and
                # burns CPU. The timeout keeps the loop from blocking
                # indefinitely on an idle connection.
                if not select.select([conn], [], [], 5.0)[0]:
                    continue  # timed out; nothing to read
                conn.poll()
                while conn.notifies:
                    notify = conn.notifies.pop(0)
                    #
                    # On schema update/create, refresh the dynamic routes
                    # and clear the openapi cache -> new one will be
                    # regenerated during the next [GET /docs]
                    #
                    print(f"Received notification: {notify.payload}")
                    with SessionLocal() as db:
                        load_dynamic_routes(db=db, app=self.app_reference)
                    self.app_reference.openapi_schema = None
        finally:
            # Close cursor and connection
            cur.close()
            conn.close()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: launch the schema-change watcher thread
    before app startup, so the service can still run as a single ASGI app.
    """
    watcher = Watcher(app)
    watcher.start()
    yield


def create_app(session: Optional[Session] = None) -> FastAPI:
app = FastAPI(description=generate_api_description())
app = FastAPI(description=generate_api_description(), lifespan=lifespan)
origins = ['*']
app.add_middleware(CORSMiddleware,
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=['*'],
allow_headers=['*'])
allow_headers=['*']
)

if session:
load_dynamic_routes(db=session, app=app)
Expand Down
2 changes: 1 addition & 1 deletion backend/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from sqlalchemy.sql.selectable import CompoundSelect

from .config import DEFAULT_PARAMS
from .enum import FilterEnum
from .enums import FilterEnum
from .models import (
AttrType,
Attribute,
Expand Down
7 changes: 4 additions & 3 deletions backend/dynamic_routes.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import logging
from typing import Optional, Union
from dataclasses import make_dataclass

from fastapi import APIRouter, Depends, HTTPException, status, Query, Response
from fastapi.applications import FastAPI
from fastapi.openapi.utils import get_openapi
from fastapi_pagination import Page, Params
from sqlalchemy.exc import DataError
from sqlalchemy.orm.session import Session
Expand All @@ -11,7 +13,7 @@
from .auth.enum import PermissionType
from .auth.models import User
from .database import get_db
from .enum import FilterEnum, ModelVariant
from .enums import FilterEnum, ModelVariant
from .models import AttrType, Schema, Entity
from .schemas.auth import RequirePermission
from .schemas.entity import EntityModelFactory, EntityBaseSchema
Expand Down Expand Up @@ -357,7 +359,7 @@ def create_dynamic_router(schema: Schema, app: FastAPI, old_slug: str = None):
route_update_entity(router=router, schema=schema)
route_delete_entity(router=router, schema=schema)

router_routes = [(r.path, r.methods) for r in router.routes]
router_routes = [(f"/entity{r.path}", r.methods) for r in router.routes]

This comment has been minimized.

Copy link
@der-gabe

der-gabe Apr 8, 2024

Member

This is needed, because app.include_router is called with prefix='/entity' below, right?

This comment has been minimized.

Copy link
@jonas-brr

jonas-brr Apr 8, 2024

Author Contributor

yes it is a 'sub' router with a prefix which gets merged into the path once included into the app router

routes_to_remove = []
for route in app.routes:
if (route.path, route.methods) in router_routes:
Expand All @@ -368,4 +370,3 @@ def create_dynamic_router(schema: Schema, app: FastAPI, old_slug: str = None):
app.routes.remove(route)

app.include_router(router, prefix='/entity')
app.openapi_schema = None
File renamed without changes.
2 changes: 1 addition & 1 deletion backend/general_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from .config import settings, VERSION
from . import crud, schemas, exceptions
from .database import get_db
from .enum import FilterEnum
from .enums import FilterEnum
from .models import Schema, AttrType
from .dynamic_routes import create_dynamic_router
from .auth import authenticate_user, authenticated_user, authorized_user, create_access_token,\
Expand Down
2 changes: 1 addition & 1 deletion backend/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from .base_models import Value, Mapping
from .database import Base
from .enum import FilterEnum
from .enums import FilterEnum
from .utils import make_aware_datetime


Expand Down
2 changes: 1 addition & 1 deletion backend/schemas/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from pydantic import BaseModel, validator, Field, create_model

from ..enum import ModelVariant
from ..enums import ModelVariant
from ..models import Schema, AttributeDefinition
from .validators import validate_slug

Expand Down
2 changes: 1 addition & 1 deletion backend/traceability/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from .. import crud
from ..auth.models import User
from ..config import DEFAULT_PARAMS
from ..enum import ModelVariant
from ..enums import ModelVariant
from ..exceptions import MissingChangeException, MissingEntityCreateRequestException, \
AttributeNotDefinedException, MissingEntityUpdateRequestException, NoOpChangeException, \
MissingEntityDeleteRequestException, MissingChangeRequestException, \
Expand Down

0 comments on commit ac6a8b4

Please sign in to comment.