Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to Asphalt v5.0 #388

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,3 @@ $RECYCLE.BIN/
.jupyter_ystore.db
.jupyter_ystore.db-journal
fps_cli_args.toml

# pixi environments
.pixi
37 changes: 36 additions & 1 deletion jupyverse_api/jupyverse_api/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Dict
from typing import Any, Dict

from anyio import Event
from pydantic import BaseModel

from .app import App
Expand Down Expand Up @@ -41,3 +42,37 @@ def mount(self, path: str, *args, **kwargs) -> None:

def add_middleware(self, middleware, *args, **kwargs) -> None:
self._app.add_middleware(middleware, *args, **kwargs)


class ResourceLock:
"""ResourceLock ensures that accesses cannot be done concurrently on the same resource.
"""
_locks: Dict[Any, Event]

def __init__(self):
self._locks = {}

def __call__(self, idx: Any):
return _ResourceLock(idx, self._locks)


class _ResourceLock:
_idx: Any
_locks: Dict[Any, Event]
_lock: Event

def __init__(self, idx: Any, locks: Dict[Any, Event]):
self._idx = idx
self._locks = locks

async def __aenter__(self):
while True:
if self._idx in self._locks:
await self._locks[self._idx].wait()
else:
break
self._locks[self._idx] = self._lock = Event()

async def __aexit__(self, exc_type, exc_value, exc_tb):
self._lock.set()
del self._locks[self._idx]
4 changes: 1 addition & 3 deletions jupyverse_api/jupyverse_api/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import List, Tuple

import rich_click as click
from asphalt.core.cli import run
from asphalt.core._cli import run

if sys.version_info < (3, 10):
from importlib_metadata import entry_points
Expand Down Expand Up @@ -66,8 +66,6 @@ def main(
set_list.append(f"component.allow_origin={allow_origin}")
config = get_config(disable)
run.callback(
unsafe=False,
loop=None,
set_=set_list,
service=None,
configfile=[config],
Expand Down
18 changes: 14 additions & 4 deletions jupyverse_api/jupyverse_api/contents/__init__.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
import asyncio
from __future__ import annotations

from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, List, Optional, Union

from fastapi import APIRouter, Depends, Request, Response

from jupyverse_api import Router
from jupyverse_api import ResourceLock, Router

from ..app import App
from ..auth import Auth, User
from .models import Checkpoint, Content, SaveContent


class FileIdManager(ABC):
stop_watching_files: asyncio.Event
stopped_watching_files: asyncio.Event
@abstractmethod
async def start(self) -> None:
...

@abstractmethod
async def stop(self) -> None:
...

@abstractmethod
async def get_path(self, file_id: str) -> str:
Expand All @@ -32,9 +38,13 @@ def unwatch(self, path: str, watcher):


class Contents(Router, ABC):
file_lock: ResourceLock

def __init__(self, app: App, auth: Auth):
super().__init__(app=app)

self.file_lock = ResourceLock()

router = APIRouter()

@router.post(
Expand Down
41 changes: 25 additions & 16 deletions jupyverse_api/jupyverse_api/main/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
import webbrowser
from typing import Any, Callable, Dict, Sequence, Tuple

from anyio import Event
from asgiref.typing import ASGI3Application
from asphalt.core import Component, Context
from asphalt.core import Component, add_resource, get_resource, start_service_task
from asphalt.web.fastapi import FastAPIComponent
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
Expand All @@ -22,20 +23,16 @@ def __init__(
super().__init__()
self.mount_path = mount_path

async def start(
self,
ctx: Context,
) -> None:
app = await ctx.request_resource(FastAPI)
async def start(self) -> None:
app = await get_resource(FastAPI, wait=True)

_app = App(app, mount_path=self.mount_path)
ctx.add_resource(_app)
add_resource(_app)


class JupyverseComponent(FastAPIComponent):
def __init__(
self,
components: dict[str, dict[str, Any] | None] | None = None,
*,
app: FastAPI | str | None = None,
host: str = "127.0.0.1",
Expand All @@ -56,7 +53,6 @@ def __init__(
}
middlewares = list(middlewares) + [middleware]
super().__init__(
components, # type: ignore
app=app,
host=host,
port=port,
Expand All @@ -67,22 +63,30 @@ def __init__(
self.port = port
self.open_browser = open_browser
self.query_params = query_params
self.lifespan = Lifespan()

async def start(
self,
ctx: Context,
) -> None:
async def prepare(self) -> None:
query_params = QueryParams(d={})
host = self.host
if not host.startswith("http"):
host = f"http://{host}"
host_url = Host(url=f"{host}:{self.port}/")
ctx.add_resource(query_params)
ctx.add_resource(host_url)
add_resource(query_params)
add_resource(host_url)
add_resource(self.lifespan)

await super().prepare()

await super().start(ctx)
async def start(self) -> None:
await super().start()

# at this point, the server has started
await start_service_task(
self.lifespan.shutdown_request.wait,
"Server lifespan notifier",
teardown_action=self.lifespan.shutdown_request.set,
)

if self.open_browser:
qp = query_params.d
if self.query_params:
Expand All @@ -97,3 +101,8 @@ class QueryParams(BaseModel):

class Host(BaseModel):
url: str


class Lifespan:
def __init__(self):
self.shutdown_request = Event()
6 changes: 4 additions & 2 deletions jupyverse_api/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ classifiers = [
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
]
Expand All @@ -28,8 +29,9 @@ dependencies = [
"pydantic >=2,<3",
"fastapi >=0.95.0,<1",
"rich-click >=1.6.1,<2",
"asphalt >=4.11.0,<5",
"asphalt-web[fastapi] >=1.1.0,<2",
"importlib_metadata >=3.6; python_version<'3.10'",
#"asphalt >=4.11.0,<5",
#"asphalt-web[fastapi] >=1.1.0,<2",
]
dynamic = ["version"]

Expand Down
57 changes: 57 additions & 0 deletions jupyverse_api/tests/test_resource_lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import pytest
from anyio import create_task_group, sleep

from jupyverse_api import ResourceLock

pytestmark = pytest.mark.anyio


async def do_op(operation, resource_lock, operations):
op, path = operation
async with resource_lock(path):
operations.append(operation + ["start"])
await sleep(0.1)
operations.append(operation + ["done"])


async def test_resource_lock():
resource_lock = ResourceLock()

# test concurrent accesses to the same resource
idx = "idx"
operations = []
async with create_task_group() as tg:
tg.start_soon(do_op, [0, idx], resource_lock, operations)
await sleep(0.01)
tg.start_soon(do_op, [1, idx], resource_lock, operations)

assert operations == [
[0, idx, "start"],
[0, idx, "done"],
[1, idx, "start"],
[1, idx, "done"],
]

# test concurrent accesses to different files
idx0 = "idx0"
idx1 = "idx1"
operations = []
async with create_task_group() as tg:
tg.start_soon(do_op, [0, idx0], resource_lock, operations)
await sleep(0.01)
tg.start_soon(do_op, [1, idx1], resource_lock, operations)
await sleep(0.01)
tg.start_soon(do_op, [2, idx0], resource_lock, operations)
await sleep(0.01)
tg.start_soon(do_op, [3, idx1], resource_lock, operations)

assert operations == [
[0, idx0, "start"],
[1, idx1, "start"],
[0, idx0, "done"],
[2, idx0, "start"],
[1, idx1, "done"],
[3, idx1, "start"],
[2, idx0, "done"],
[3, idx1, "done"],
]
19 changes: 8 additions & 11 deletions plugins/auth/fps_auth/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging

from asphalt.core import Component, Context
from asphalt.core import Component, add_resource, get_resource
from fastapi_users.exceptions import UserAlreadyExists

from jupyverse_api.app import App
Expand All @@ -18,17 +18,14 @@ class AuthComponent(Component):
def __init__(self, **kwargs):
self.auth_config = _AuthConfig(**kwargs)

async def start(
self,
ctx: Context,
) -> None:
ctx.add_resource(self.auth_config, types=AuthConfig)
async def start(self) -> None:
add_resource(self.auth_config, types=AuthConfig)

app = await ctx.request_resource(App)
frontend_config = await ctx.request_resource(FrontendConfig)
app = await get_resource(App, wait=True)
frontend_config = await get_resource(FrontendConfig, wait=True)

auth = auth_factory(app, self.auth_config, frontend_config)
ctx.add_resource(auth, types=Auth)
add_resource(auth, types=Auth)

await auth.db.create_db_and_tables()

Expand Down Expand Up @@ -59,8 +56,8 @@ async def start(
)

if self.auth_config.mode == "token":
query_params = await ctx.request_resource(QueryParams)
host = await ctx.request_resource(Host)
query_params = await get_resource(QueryParams, wait=True)
host = await get_resource(Host, wait=True)
query_params.d["token"] = self.auth_config.token

logger.info("")
Expand Down
13 changes: 5 additions & 8 deletions plugins/auth_fief/fps_auth_fief/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from asphalt.core import Component, Context
from asphalt.core import Component, add_resource, get_resource

from jupyverse_api.app import App
from jupyverse_api.auth import Auth, AuthConfig
Expand All @@ -11,13 +11,10 @@ class AuthFiefComponent(Component):
def __init__(self, **kwargs):
self.auth_fief_config = _AuthFiefConfig(**kwargs)

async def start(
self,
ctx: Context,
) -> None:
ctx.add_resource(self.auth_fief_config, types=AuthConfig)
async def start(self) -> None:
add_resource(self.auth_fief_config, types=AuthConfig)

app = await ctx.request_resource(App)
app = await get_resource(App, wait=True)

auth_fief = auth_factory(app, self.auth_fief_config)
ctx.add_resource(auth_fief, types=Auth)
add_resource(auth_fief, types=Auth)
Loading