diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 451d7276..1767e98d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -59,8 +59,12 @@ jobs: python3 -m pip install --upgrade pip python3 -m pip install hatch - - name: Create jupyterlab-auth dev environment - run: hatch env create dev.jupyterlab-auth + - name: Create jupyterlab-auth and jupyterlab-noauth dev environments + run: | + hatch env create dev.jupyterlab-auth + hatch env create dev.jupyterlab-noauth - name: Run tests - run: hatch run dev.jupyterlab-auth:test + run: | + hatch run dev.jupyterlab-noauth:pytest plugins/yjs/tests -v --color=yes + hatch run dev.jupyterlab-auth:test diff --git a/jupyverse_api/jupyverse_api/contents/__init__.py b/jupyverse_api/jupyverse_api/contents/__init__.py index 4296e29b..27a03b0e 100644 --- a/jupyverse_api/jupyverse_api/contents/__init__.py +++ b/jupyverse_api/jupyverse_api/contents/__init__.py @@ -1,20 +1,21 @@ import asyncio from abc import ABC, abstractmethod from pathlib import Path -from typing import Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Union -from fastapi import APIRouter, Depends, Request, Response +from fastapi import APIRouter, Depends, Request from jupyverse_api import Router from ..app import App from ..auth import Auth, User -from .models import Checkpoint, Content, SaveContent +from .models import Checkpoint, Content, CopyContent, CreateContent, RenameContent, SaveContent class FileIdManager(ABC): stop_watching_files: asyncio.Event stopped_watching_files: asyncio.Event + Change: Any @abstractmethod async def get_path(self, file_id: str) -> str: @@ -31,82 +32,7 @@ def unwatch(self, path: str, watcher): ... -class Contents(Router, ABC): - def __init__(self, app: App, auth: Auth): - super().__init__(app=app) - - router = APIRouter() - - @router.post( - "/api/contents/{path:path}/checkpoints", - status_code=201, - ) - async def create_checkpoint( - path, user: User = Depends(auth.current_user(permissions={"contents": ["write"]})) - ) -> Checkpoint: - return await self.create_checkpoint(path, user) - - @router.post( - "/api/contents{path:path}", - status_code=201, - ) - async def create_content( - path: Optional[str], - request: Request, - user: User = Depends(auth.current_user(permissions={"contents": ["write"]})), - ) -> Content: - return await self.create_content(path, request, user) - - @router.get("/api/contents") - async def get_root_content( - content: int, - user: User = Depends(auth.current_user(permissions={"contents": ["read"]})), - ) -> Content: - return await self.get_root_content(content, user) - - @router.get("/api/contents/{path:path}/checkpoints") - async def get_checkpoint( - path, user: User = Depends(auth.current_user(permissions={"contents": ["read"]})) - ) -> List[Checkpoint]: - return await self.get_checkpoint(path, user) - - @router.get("/api/contents/{path:path}") - async def get_content( - path: str, - content: int = 0, - user: User = Depends(auth.current_user(permissions={"contents": ["read"]})), - ) -> Content: - return await self.get_content(path, content, user) - - @router.put("/api/contents/{path:path}") - async def save_content( - path, - request: Request, - response: Response, - user: User = Depends(auth.current_user(permissions={"contents": ["write"]})), - ) -> Content: - return await self.save_content(path, request, response, user) - - @router.delete( - "/api/contents/{path:path}", - status_code=204, - ) - async def delete_content( - path, - user: User = Depends(auth.current_user(permissions={"contents": ["write"]})), - ): - return await self.delete_content(path, user) - - @router.patch("/api/contents/{path:path}") - async def rename_content( - path, - request: Request, - user: User = Depends(auth.current_user(permissions={"contents": ["write"]})), - ) -> Content: - return await self.rename_content(path, request, user) - - self.include_router(router) - +class Contents(ABC): @property @abstractmethod def file_id_manager(self) -> FileIdManager: @@ -134,11 +60,25 @@ async def create_checkpoint( async def create_content( self, path: Optional[str], - request: Request, + create_content: Union[CreateContent, CopyContent], user: User, ) -> Content: ... + @abstractmethod + async def create_file( + self, + path: str, + ) -> None: + ... + + @abstractmethod + async def create_directory( + self, + path: str, + ) -> None: + ... + @abstractmethod async def get_root_content( self, @@ -168,8 +108,7 @@ async def get_content( async def save_content( self, path, - request: Request, - response: Response, + content: SaveContent, user: User, ) -> Content: ... @@ -186,7 +125,93 @@ async def delete_content( async def rename_content( self, path, - request: Request, + rename_content: RenameContent, user: User, ) -> Content: ... + + +class HTTPContents(Router, ABC): + contents: Contents + + def __init__(self, app: App, auth: Auth): + super().__init__(app=app) + + router = APIRouter() + + @router.post( + "/api/contents/{path:path}/checkpoints", + status_code=201, + ) + async def create_checkpoint( + path, user: User = Depends(auth.current_user(permissions={"contents": ["write"]})) + ) -> Checkpoint: + return await self.contents.create_checkpoint(path, user) + + @router.post( + "/api/contents{path:path}", + status_code=201, + ) + async def create_content( + path: Optional[str], + request: Request, + user: User = Depends(auth.current_user(permissions={"contents": ["write"]})), + ) -> Content: + r = await request.json() + create_content: Union[CreateContent, CopyContent] + try: + create_content = CreateContent(**r) + except Exception: + create_content = CopyContent(**r) + return await self.contents.create_content(path, create_content, user) + + @router.get("/api/contents") + async def get_root_content( + content: int, + user: User = Depends(auth.current_user(permissions={"contents": ["read"]})), + ) -> Content: + return await self.contents.get_root_content(content, user) + + @router.get("/api/contents/{path:path}/checkpoints") + async def get_checkpoint( + path, user: User = Depends(auth.current_user(permissions={"contents": ["read"]})) + ) -> List[Checkpoint]: + return await self.contents.get_checkpoint(path, user) + + @router.get("/api/contents/{path:path}") + async def get_content( + path: str, + content: int = 0, + user: User = Depends(auth.current_user(permissions={"contents": ["read"]})), + ) -> Content: + return await self.contents.get_content(path, content, user) + + @router.put("/api/contents/{path:path}") + async def save_content( + path, + request: Request, + user: User = Depends(auth.current_user(permissions={"contents": ["write"]})), + ) -> Content: + content = SaveContent(**(await request.json())) + return await self.contents.save_content(path, content, user) + + @router.delete( + "/api/contents/{path:path}", + status_code=204, + ) + async def delete_content( + path, + user: User = Depends(auth.current_user(permissions={"contents": ["write"]})), + ): + return await self.contents.delete_content(path, user) + + @router.patch("/api/contents/{path:path}") + async def rename_content( + path, + request: Request, + user: User = Depends(auth.current_user(permissions={"contents": ["write"]})), + ) -> Content: + rename_content = RenameContent(**(await request.json())) + return await self.contents.rename_content(path, rename_content, user) + + self.include_router(router) diff --git a/jupyverse_api/jupyverse_api/contents/models.py b/jupyverse_api/jupyverse_api/contents/models.py index ab4b3161..1197dcb4 100644 --- a/jupyverse_api/jupyverse_api/contents/models.py +++ b/jupyverse_api/jupyverse_api/contents/models.py @@ -27,6 +27,10 @@ class CreateContent(BaseModel): type: str +class CopyContent(BaseModel): + copy_from: str + + class SaveContent(BaseModel): content: Optional[Union[str, Dict]] = None format: str diff --git a/plugins/contents/fps_contents/contents.py b/plugins/contents/fps_contents/contents.py new file mode 100644 index 00000000..ec9475fb --- /dev/null +++ b/plugins/contents/fps_contents/contents.py @@ -0,0 +1,339 @@ +import asyncio +import base64 +import json +import os +import shutil +from datetime import datetime +from http import HTTPStatus +from pathlib import Path +from typing import Dict, List, Optional, Union, cast + +import anyio +from fastapi import HTTPException, Response + +from jupyverse_api.auth import User +from jupyverse_api.contents import Contents +from jupyverse_api.contents.models import ( + Checkpoint, + Content, + CopyContent, + CreateContent, + RenameContent, + SaveContent, +) + +from .fileid import FileIdManager + + +class _Contents(Contents): + def __init__(self, db_path: str = ".fileid.db", root_dir: str = "."): + self.db_path = anyio.Path(db_path) + self.root_dir = anyio.Path(root_dir) + self._root_dir_task = asyncio.create_task(self._resolve_root_dir()) + + async def _resolve_root_dir(self): + self.root_dir = await self.root_dir.resolve() + + async def create_checkpoint( + self, + path, + user: User, + ): + await self._root_dir_task + src_path = self.root_dir / path + dst_path = self.root_dir / ".ipynb_checkpoints" / \ + f"{src_path.stem}-checkpoint{src_path.suffix}" + try: + await dst_path.parent.mkdir(exist_ok=True) + await anyio.to_thread.run_sync(shutil.copyfile, src_path, dst_path) + except Exception: + # FIXME: return error code? + return [] + mtime = await get_file_modification_time(dst_path) + return Checkpoint(**{"id": "checkpoint", "last_modified": mtime}) + + async def create_file( + self, + path: str, + ) -> None: + await self._root_dir_task + content_path = self.root_dir / path + await content_path.write_text("") + + async def create_directory( + self, + path: str, + ) -> None: + await self._root_dir_task + content_path = self.root_dir / path + await content_path.mkdir(parents=True, exist_ok=True) + + async def create_content( + self, + path: Optional[str], + create_content: Union[CreateContent, CopyContent], + user: User, + ): + await self._root_dir_task + if isinstance(create_content, CopyContent): + copy_from = self.root_dir / create_content.copy_from + assert path is not None + path_ = Path(path) + if path_.is_absolute(): + path_ = path_.relative_to(path_.root) + copy_to = self.root_dir / path_ / copy_from.name + available_path = await get_available_path(copy_to) + await anyio.to_thread.run_sync(shutil.copyfile, copy_from, available_path) + return await self.read_content(str(available_path.relative_to(self.root_dir)), False) + + assert create_content.path is not None + content_path = self.root_dir / create_content.path + if create_content.type == "notebook": + available_path = await get_available_path(content_path / "Untitled.ipynb") + await available_path.write_text( + json.dumps({"cells": [], "metadata": {}, "nbformat": 4, "nbformat_minor": 5}) + ) + src_path = available_path + dst_path = self.root_dir / ".ipynb_checkpoints" / \ + f"{src_path.stem}-checkpoint{src_path.suffix}" + try: + await dst_path.parent.mkdir(exist_ok=True) + await anyio.to_thread.run_sync(shutil.copyfile, src_path, dst_path) + except Exception: + # FIXME: return error code? + pass + elif create_content.type == "directory": + name = "Untitled Folder" + available_path = await get_available_path(content_path / name, sep=" ") + await available_path.mkdir(parents=True, exist_ok=True) + else: + assert create_content.ext is not None + available_path = await get_available_path( + content_path / ("untitled" + create_content.ext) + ) + await available_path.write_text("") + + return await self.read_content(str(available_path.relative_to(self.root_dir)), False) + + async def get_root_content( + self, + content: int, + user: User, + ): + return await self.read_content("", bool(content)) + + async def get_checkpoint( + self, + path: str, + user: User, + ): + await self._root_dir_task + src_path = self.root_dir / path + dst_path = self.root_dir / ".ipynb_checkpoints" / \ + f"{src_path.stem}-checkpoint{src_path.suffix}" + if not await dst_path.exists(): + return [] + mtime = await get_file_modification_time(dst_path) + return [Checkpoint(**{"id": "checkpoint", "last_modified": mtime})] + + async def get_content( + self, + path: str, + content: int, + user: User, + ): + return await self.read_content(path, bool(content)) + + async def save_content( + self, + path: str, + content: SaveContent, + user: User, + ): + try: + await self.write_content(content) + except Exception: + raise HTTPException(status_code=404, detail=f"Error saving {content.path}") + return await self.read_content(content.path, False) + + async def delete_content( + self, + path: str, + user: User, + ): + await self._root_dir_task + p = self.root_dir / path + if await p.exists(): + if await p.is_dir(): + await anyio.to_thread.run_sync(shutil.rmtree, p) + else: + await p.unlink() + return Response(status_code=HTTPStatus.NO_CONTENT.value) + + async def rename_content( + self, + path: str, + rename_content: RenameContent, + user: User, + ): + await self._root_dir_task + await (self.root_dir / path).rename(self.root_dir / rename_content.path) + return await self.read_content(rename_content.path, False) + + async def read_content( + self, path: Union[str, Path], get_content: bool, file_format: Optional[str] = None + ) -> Content: + await self._root_dir_task + apath = self.root_dir / path + content: Optional[Union[str, Dict, List[Dict]]] = None + if get_content: + if await apath.is_dir(): + content = [ + ( + await self.read_content( + Path(subpath.relative_to(self.root_dir)), + get_content=False, + ) + ).model_dump() + async for subpath in apath.iterdir() + if not subpath.name.startswith(".") + ] + elif await apath.is_file() or await apath.is_symlink(): + try: + content_bytes = await apath.read_bytes() + if file_format == "base64": + content = base64.b64encode(content_bytes).decode("ascii") + elif file_format == "json": + content = json.loads(content_bytes) + else: + content = content_bytes.decode() + except Exception: + raise HTTPException(status_code=404, detail="Item not found") + format: Optional[str] = None + if await apath.is_dir(): + size = None + type = "directory" + format = "json" + mimetype = None + elif await apath.is_file() or await apath.is_symlink(): + size = await get_file_size(apath) + if apath.suffix == ".ipynb": + type = "notebook" + format = None + mimetype = None + if content is not None: + nb: dict + if file_format == "json": + content = cast(Dict, content) + nb = content + else: + content = cast(str, content) + nb = json.loads(content) + for cell in nb["cells"]: + if "metadata" not in cell: + cell["metadata"] = {} + cell["metadata"].update({"trusted": False}) + if cell["cell_type"] == "code": + cell_source = cell["source"] + if not isinstance(cell_source, str): + cell["source"] = "".join(cell_source) + if file_format != "json": + content = json.dumps(nb) + elif apath.suffix == ".json": + type = "json" + format = "text" + mimetype = "application/json" + else: + type = "file" + format = None + mimetype = "text/plain" + else: + raise HTTPException(status_code=404, detail="Item not found") + + rel_path = apath.relative_to(self.root_dir) + return Content( + **{ + "name": rel_path.name, + "path": rel_path.as_posix(), + "last_modified": await get_file_modification_time(apath), + "created": await get_file_creation_time(apath), + "content": content, + "format": format, + "mimetype": mimetype, + "size": size, + "writable": await is_file_writable(apath), + "type": type, + } + ) + + async def write_content(self, content: Union[SaveContent, Dict]) -> None: + if not isinstance(content, SaveContent): + content = SaveContent(**content) + await self._root_dir_task + path = self.root_dir / content.path + if content.format == "base64": + content.content = cast(str, content.content) + content_bytes = content.content.encode("ascii") + await path.write_bytes(content_bytes) + else: + if content.format == "json": + dict_content = cast(Dict, content.content) + if content.type == "notebook": + # see https://github.com/jupyterlab/jupyterlab/issues/11005 + if ( + "metadata" in dict_content + and "orig_nbformat" in dict_content["metadata"] + ): + del dict_content["metadata"]["orig_nbformat"] + await path.write_text(json.dumps(dict_content, indent=2)) + else: + content.content = cast(str, content.content) + await path.write_text(content.content) + + @property + def file_id_manager(self): + return FileIdManager(db_path=str(self.db_path), root_dir=str(self.root_dir)) + + +async def get_available_path(path: anyio.Path, sep: str = "") -> anyio.Path: + directory = path.parent + name = Path(path.name) + i = None + while True: + if i is None: + i_str = "" + i = 1 + else: + i_str = str(i) + i += 1 + if i_str: + i_str = sep + i_str + available_path = directory / (name.stem + i_str + name.suffix) + if not await available_path.exists(): + return available_path + + +async def get_file_modification_time(path: anyio.Path): + if await path.exists(): + return datetime.utcfromtimestamp((await path.stat()).st_mtime).isoformat() + "Z" + + +async def get_file_creation_time(path: anyio.Path): + if await path.exists(): + return datetime.utcfromtimestamp((await path.stat()).st_ctime).isoformat() + "Z" + + +async def get_file_size(path: anyio.Path) -> Optional[int]: + if await path.exists(): + return (await path.stat()).st_size + raise HTTPException(status_code=404, detail="Item not found") + + +async def is_file_writable(path: anyio.Path) -> bool: + if await path.exists(): + if await path.is_dir(): + # FIXME + return True + else: + return os.access(path, os.W_OK) + return False diff --git a/plugins/contents/fps_contents/fileid.py b/plugins/contents/fps_contents/fileid.py index f489c59d..dbeb2672 100644 --- a/plugins/contents/fps_contents/fileid.py +++ b/plugins/contents/fps_contents/fileid.py @@ -35,9 +35,11 @@ class FileIdManager(metaclass=Singleton): initialized: asyncio.Event watchers: Dict[str, List[Watcher]] lock: asyncio.Lock + Change = Change - def __init__(self, db_path: str = ".fileid.db"): + def __init__(self, db_path: str = ".fileid.db", root_dir: str = "."): self.db_path = db_path + self.root_dir = Path(root_dir) self.initialized = asyncio.Event() self.watchers = {} self.watch_files_task = asyncio.create_task(self.watch_files()) @@ -78,6 +80,7 @@ async def index(self, path: str) -> Optional[str]: return idx async def watch_files(self): + self.root_dir = await self.root_dir.resolve() async with self.lock: async with aiosqlite.connect(self.db_path) as db: await db.execute("DROP TABLE IF EXISTS fileids") @@ -90,7 +93,7 @@ async def watch_files(self): # index files async with self.lock: async with aiosqlite.connect(self.db_path) as db: - async for path in Path().rglob("*"): + async for path in self.root_dir.rglob("*"): idx = uuid4().hex mtime = (await path.stat()).st_mtime await db.execute( @@ -99,14 +102,16 @@ async def watch_files(self): await db.commit() self.initialized.set() - async for changes in awatch(".", stop_event=self.stop_watching_files): + async for changes in awatch(self.root_dir, stop_event=self.stop_watching_files): async with self.lock: async with aiosqlite.connect(self.db_path) as db: deleted_paths = set() added_paths = set() for change, changed_path in changes: # get relative path - changed_path = Path(changed_path).relative_to(await Path().absolute()) + changed_path = Path(changed_path).relative_to( + await self.root_dir.absolute() + ) changed_path_str = str(changed_path) if change == Change.deleted: @@ -156,9 +161,16 @@ async def watch_files(self): for change in changes: changed_path = change[1] # get relative path - relative_changed_path = str(Path(changed_path).relative_to(await Path().absolute())) + relative_changed_path = Path(changed_path).relative_to( + await self.root_dir.absolute() + ) relative_change = (change[0], relative_changed_path) - for watcher in self.watchers.get(relative_changed_path, []): + all_watchers = [] + for path, watchers in self.watchers.items(): + p = Path(path) + if p == relative_changed_path or p in relative_changed_path.parents: + all_watchers += watchers + for watcher in all_watchers: watcher.notify(relative_change) self.stopped_watching_files.set() diff --git a/plugins/contents/fps_contents/main.py b/plugins/contents/fps_contents/main.py index 81f6fd18..01056bea 100644 --- a/plugins/contents/fps_contents/main.py +++ b/plugins/contents/fps_contents/main.py @@ -2,9 +2,9 @@ from jupyverse_api.app import App from jupyverse_api.auth import Auth -from jupyverse_api.contents import Contents +from jupyverse_api.contents import Contents, HTTPContents -from .routes import _Contents +from .routes import _HTTPContents class ContentsComponent(Component): @@ -15,5 +15,7 @@ async def start( app = await ctx.request_resource(App) auth = await ctx.request_resource(Auth) # type: ignore - contents = _Contents(app, auth) + http_contents = _HTTPContents(app, auth) + contents = http_contents.contents + ctx.add_resource(http_contents, types=HTTPContents) ctx.add_resource(contents, types=Contents) diff --git a/plugins/contents/fps_contents/routes.py b/plugins/contents/fps_contents/routes.py index 7f759b9e..48b0f9ec 100644 --- a/plugins/contents/fps_contents/routes.py +++ b/plugins/contents/fps_contents/routes.py @@ -1,292 +1,15 @@ -import base64 -import json -import os -import shutil -from datetime import datetime -from http import HTTPStatus -from pathlib import Path -from typing import Dict, List, Optional, Union, cast +import anyio -from anyio import open_file -from fastapi import HTTPException, Response -from starlette.requests import Request +from jupyverse_api.contents import HTTPContents -from jupyverse_api.auth import User -from jupyverse_api.contents import Contents -from jupyverse_api.contents.models import ( - Checkpoint, - Content, - CreateContent, - RenameContent, - SaveContent, -) +from .contents import _Contents -from .fileid import FileIdManager +class _HTTPContents(HTTPContents): + contents: _Contents + db_path: anyio.Path + root_dir: anyio.Path -class _Contents(Contents): - async def create_checkpoint( - self, - path, - user: User, - ): - src_path = Path(path) - dst_path = Path(".ipynb_checkpoints") / f"{src_path.stem}-checkpoint{src_path.suffix}" - try: - dst_path.parent.mkdir(exist_ok=True) - shutil.copyfile(src_path, dst_path) - except Exception: - # FIXME: return error code? - return [] - mtime = get_file_modification_time(dst_path) - return Checkpoint(**{"id": "checkpoint", "last_modified": mtime}) - - async def create_content( - self, - path: Optional[str], - request: Request, - user: User, - ): - create_content = CreateContent(**(await request.json())) - content_path = Path(create_content.path) - if create_content.type == "notebook": - available_path = get_available_path(content_path / "Untitled.ipynb") - async with await open_file(available_path, "w") as f: - await f.write( - json.dumps({"cells": [], "metadata": {}, "nbformat": 4, "nbformat_minor": 5}) - ) - src_path = available_path - dst_path = Path(".ipynb_checkpoints") / f"{src_path.stem}-checkpoint{src_path.suffix}" - try: - dst_path.parent.mkdir(exist_ok=True) - shutil.copyfile(src_path, dst_path) - except Exception: - # FIXME: return error code? - pass - elif create_content.type == "directory": - name = "Untitled Folder" - available_path = get_available_path(content_path / name, sep=" ") - available_path.mkdir(parents=True, exist_ok=True) - else: - assert create_content.ext is not None - available_path = get_available_path(content_path / ("untitled" + create_content.ext)) - open(available_path, "w").close() - - return await self.read_content(available_path, False) - - async def get_root_content( - self, - content: int, - user: User, - ): - return await self.read_content("", bool(content)) - - async def get_checkpoint( - self, - path, - user: User, - ): - src_path = Path(path) - dst_path = Path(".ipynb_checkpoints") / f"{src_path.stem}-checkpoint{src_path.suffix}" - if not dst_path.exists(): - return [] - mtime = get_file_modification_time(dst_path) - return [Checkpoint(**{"id": "checkpoint", "last_modified": mtime})] - - async def get_content( - self, - path: str, - content: int, - user: User, - ): - return await self.read_content(path, bool(content)) - - async def save_content( - self, - path, - request: Request, - response: Response, - user: User, - ): - content = SaveContent(**(await request.json())) - try: - await self.write_content(content) - except Exception: - raise HTTPException(status_code=404, detail=f"Error saving {content.path}") - return await self.read_content(content.path, False) - - async def delete_content( - self, - path, - user: User, - ): - p = Path(path) - if p.exists(): - if p.is_dir(): - shutil.rmtree(p) - else: - p.unlink() - return Response(status_code=HTTPStatus.NO_CONTENT.value) - - async def rename_content( - self, - path, - request: Request, - user: User, - ): - rename_content = RenameContent(**(await request.json())) - Path(path).rename(rename_content.path) - return await self.read_content(rename_content.path, False) - - async def read_content( - self, path: Union[str, Path], get_content: bool, file_format: Optional[str] = None - ) -> Content: - if isinstance(path, str): - path = Path(path) - content: Optional[Union[str, Dict, List[Dict]]] = None - if get_content: - if path.is_dir(): - content = [ - (await self.read_content(subpath, get_content=False)).model_dump() - for subpath in path.iterdir() - if not subpath.name.startswith(".") - ] - elif path.is_file() or path.is_symlink(): - try: - async with await open_file(path, mode="rb") as f: - content_bytes = await f.read() - if file_format == "base64": - content = base64.b64encode(content_bytes).decode("ascii") - elif file_format == "json": - content = json.loads(content_bytes) - else: - content = content_bytes.decode() - except Exception: - raise HTTPException(status_code=404, detail="Item not found") - format: Optional[str] = None - if path.is_dir(): - size = None - type = "directory" - format = "json" - mimetype = None - elif path.is_file() or path.is_symlink(): - size = get_file_size(path) - if path.suffix == ".ipynb": - type = "notebook" - format = None - mimetype = None - if content is not None: - nb: dict - if file_format == "json": - content = cast(Dict, content) - nb = content - else: - content = cast(str, content) - nb = json.loads(content) - for cell in nb["cells"]: - if "metadata" not in cell: - cell["metadata"] = {} - cell["metadata"].update({"trusted": False}) - if cell["cell_type"] == "code": - cell_source = cell["source"] - if not isinstance(cell_source, str): - cell["source"] = "".join(cell_source) - if file_format != "json": - content = json.dumps(nb) - elif path.suffix == ".json": - type = "json" - format = "text" - mimetype = "application/json" - else: - type = "file" - format = None - mimetype = "text/plain" - else: - raise HTTPException(status_code=404, detail="Item not found") - - return Content( - **{ - "name": path.name, - "path": path.as_posix(), - "last_modified": get_file_modification_time(path), - "created": get_file_creation_time(path), - "content": content, - "format": format, - "mimetype": mimetype, - "size": size, - "writable": is_file_writable(path), - "type": type, - } - ) - - async def write_content(self, content: Union[SaveContent, Dict]) -> None: - if not isinstance(content, SaveContent): - content = SaveContent(**content) - if content.format == "base64": - async with await open_file(content.path, "wb") as f: - content.content = cast(str, content.content) - content_bytes = content.content.encode("ascii") - await f.write(content_bytes) - else: - async with await open_file(content.path, "wt") as f: - if content.format == "json": - dict_content = cast(Dict, content.content) - if content.type == "notebook": - # see https://github.com/jupyterlab/jupyterlab/issues/11005 - if ( - "metadata" in dict_content - and "orig_nbformat" in dict_content["metadata"] - ): - del dict_content["metadata"]["orig_nbformat"] - await f.write(json.dumps(dict_content, indent=2)) - else: - content.content = cast(str, content.content) - await f.write(content.content) - - @property - def file_id_manager(self): - return FileIdManager() - - -def get_available_path(path: Path, sep: str = "") -> Path: - directory = path.parent - name = Path(path.name) - i = None - while True: - if i is None: - i_str = "" - i = 1 - else: - i_str = str(i) - i += 1 - if i_str: - i_str = sep + i_str - available_path = directory / (name.stem + i_str + name.suffix) - if not available_path.exists(): - return available_path - - -def get_file_modification_time(path: Path): - if path.exists(): - return datetime.utcfromtimestamp(path.stat().st_mtime).isoformat() + "Z" - - -def get_file_creation_time(path: Path): - if path.exists(): - return datetime.utcfromtimestamp(path.stat().st_ctime).isoformat() + "Z" - - -def get_file_size(path: Path) -> Optional[int]: - if path.exists(): - return path.stat().st_size - raise HTTPException(status_code=404, detail="Item not found") - - -def is_file_writable(path: Path) -> bool: - if path.exists(): - if path.is_dir(): - # FIXME - return True - else: - return os.access(path, os.W_OK) - return False + def __init__(self, *args, db_path: str = ".fileid.db", root_dir: str = ".", **kwargs): + self.contents = _Contents(db_path=db_path, root_dir=root_dir) + super().__init__(*args, **kwargs) diff --git a/plugins/yjs/fps_yjs/ydocs/ydrive.py b/plugins/yjs/fps_yjs/ydocs/ydrive.py new file mode 100644 index 00000000..8b283a4d --- /dev/null +++ b/plugins/yjs/fps_yjs/ydocs/ydrive.py @@ -0,0 +1,219 @@ +from __future__ import annotations + +from contextlib import AsyncExitStack +from functools import partial +from pathlib import Path +from typing import Any, Callable, Dict, List, cast + +from anyio import create_task_group +from anyio.abc import TaskGroup +from pycrdt import Doc, Map, MapEvent + +from jupyverse_api.auth import User +from jupyverse_api.contents import Content, Contents + +from .ybasedoc import YBaseDoc + + +class YDrive(YBaseDoc): + _starting: bool + _task_group: TaskGroup | None + + def __init__( + self, + contents: Contents, + ydoc: Doc | None = None, + root_dir: Path | str | None = None, + ): + super().__init__(ydoc) + self._root_dir = Path() if root_dir is None else Path(root_dir) + self._ydoc["content"] = self._ycontent = self._new_dir_content() + self._ycontent.observe_deep(self._callback) + self._user = User() + self._starting = False + self._task_group = None + self._contents = contents + self._watcher = contents.file_id_manager.watch(".") + + async def __aenter__(self) -> YDrive: + if self._task_group is not None: + raise RuntimeError("YDrive already running") + + async with AsyncExitStack() as exit_stack: + tg = create_task_group() + self._task_group = await exit_stack.enter_async_context(tg) + self._exit_stack = exit_stack.pop_all() + + assert self._task_group is not None + self._task_group.start_soon(self._process_file_changes) + + return self + + async def _process_file_changes(self): + async for change in self._watcher: + change_, path = change + if change_ == self._contents.file_id_manager.Change.deleted: + current_parent_content = await self._get(path.parent) + if path.name in current_parent_content["content"]: + del current_parent_content["content"][path.name] + elif change_ in ( + self._contents.file_id_manager.Change.added, + self._contents.file_id_manager.Change.modified, + ): + real_parent_content = await self._get_directory_content(path.parent) + # file change might be out of sync with current directory list + if path.name in real_parent_content.prelim: + current_parent_content = await self._get(path.parent) + current_parent_content[path.name] = real_parent_content.prelim[path.name] + + async def __aexit__(self, exc_type, exc_value, exc_tb): + if self._task_group is None: + raise RuntimeError("YDrive not running") + + self._task_group.cancel_scope.cancel() + self._task_group = None + return await self._exit_stack.__aexit__(exc_type, exc_value, exc_tb) + + def _callback(self, events): + for event in events: + if isinstance(event, MapEvent): + current = self._ycontent + for path in event.path: + current = current[path] + for key, val in event.keys.items(): + action = val.get("action") + if action == "delete": + path = "/".join(event.path[1::2] + [key]) + self._task_group.start_soon(self._try_delete_content, path) + elif action == "add": + path = "/".join(event.path[1::2] + [key]) + new_content = val["newValue"] + if new_content["is_dir"]: + self._task_group.start_soon(self._try_create_directory, path) + else: + self._task_group.start_soon(self._try_create_file, path) + + @property + def version(self) -> str: + return "1.0.0" + + def _new_dir_content(self) -> Map: + return Map({"is_dir": True, "content": None}) + + def _new_file_content(self, content: Content | None = None) -> Map: + if content is None: + return Map( + { + "is_dir": False, + "size": 0, + "last_modified": None, + "created": None, + "writable": True, + } + ) + return Map( + { + "is_dir": False, + "size": content.size, + "last_modified": content.last_modified, + "created": content.created, + "writable": content.writable, + } + ) + + async def _get_directory_content(self, path: Path) -> Map: + res = {} + content = await self._contents.read_content(self._root_dir / path, get_content=True) + if content.type == "directory": + directory_content = cast(List[Dict], content.content) + for entry in directory_content: + _content = Content(**entry) + if _content.type == "directory": + res[_content.name] = self._new_dir_content() + else: + res[_content.name] = self._new_file_content(_content) + return Map(res) + + async def _maybe_populate_dir(self, path: Path, content: Map): + if content["content"] is None: + content["content"] = await self._get_directory_content(path) + + async def _get(self, path: Path | str | None = None) -> Map: + path = Path() if path is None else Path(path) + current_content = self._ycontent + await self._maybe_populate_dir(path, self._ycontent) + cwd = Path() + last_idx = len(path.parts) - 1 + for idx, part in enumerate(path.parts): + try: + current_content = current_content["content"][part] + except KeyError: + raise FileNotFoundError(f'No entry "{part}" in "{cwd}".') + if current_content["is_dir"]: + cwd /= part + await self._maybe_populate_dir(cwd, current_content) + elif idx < last_idx: + raise RuntimeError(f'Entry "{part}" in "{cwd}" is not a directory.') + return current_content + + async def get(self, path: Path | str | None = None) -> dict: + return dict(await self._get(path)) + + async def _try_delete_content(self, path: str) -> None: + try: + await self._contents.delete_content(path, self._user) + except Exception: + # the operation didn't succeed, maybe revert the shared model change + await self._update(path) + + async def _try_create_directory(self, path: str) -> None: + try: + await self._contents.create_directory(path) + except Exception: + # the operation didn't succeed, maybe revert the shared model change + await self._update(path) + + async def _try_create_file(self, path: str) -> None: + try: + await self._contents.create_file(path) + except Exception: + # the operation didn't succeed, maybe revert the shared model change + await self._update(path) + + async def _update(self, path: Path | str) -> None: + path = Path(path) if isinstance(path, str) else path + real_parent_content = await self._get_directory_content(path.parent) + current_parent_content = await self._get(path.parent) + if path.name in real_parent_content.prelim: + if path.name not in current_parent_content: + current_parent_content[path.name] = real_parent_content.prelim[path.name] + else: + if path.name in current_parent_content: + del current_parent_content[path.name] + + async def delete(self, path: Path | str) -> None: + path = Path(path) if isinstance(path, str) else path + if not path.parts: + raise RuntimeError("Cannot delete root directory") + parent_content = await self._get(path.parent) + del parent_content["content"][path.name] + + async def create(self, path: Path | str, is_dir: bool): + path = Path(path) if isinstance(path, str) else path + if not path.parts: + raise RuntimeError("Cannot create root directory") + parent_content = await self._get(path.parent) + if is_dir: + parent_content["content"][path.name] = self._new_dir_content() + else: + parent_content["content"][path.name] = self._new_file_content() + + def set(self, value) -> None: + raise RuntimeError("Cannot set a YDrive") + + def observe(self, callback: Callable[[str, Any], None]) -> None: + self.unobserve() + self._subscriptions[self._ystate] = self._ystate.observe(partial(callback, "state")) + self._subscriptions[self._ycontent] = self._ycontent.observe_deep( + partial(callback, "content") + ) diff --git a/plugins/yjs/pyproject.toml b/plugins/yjs/pyproject.toml index 0895b4bd..f3acb78f 100644 --- a/plugins/yjs/pyproject.toml +++ b/plugins/yjs/pyproject.toml @@ -8,10 +8,18 @@ description = "An FPS plugin for the Yjs API" keywords = [ "jupyter", "server", "fastapi", "plugins" ] requires-python = ">=3.8" dependencies = [ + "anyio >=3.6.2,<5", "pycrdt >=0.8.2,<0.9.0", "jupyverse-api >=0.1.2,<1", ] dynamic = [ "version",] + +[project.optional-dependencies] +test = [ + "pytest", + "fps-contents", +] + [[project.authors]] name = "Jupyter Development Team" email = "jupyter@googlegroups.com" diff --git a/plugins/yjs/tests/conftest.py b/plugins/yjs/tests/conftest.py new file mode 100644 index 00000000..af7e4799 --- /dev/null +++ b/plugins/yjs/tests/conftest.py @@ -0,0 +1,6 @@ +import pytest + + +@pytest.fixture +def anyio_backend(): + return "asyncio" diff --git a/plugins/yjs/tests/fake_contents.py b/plugins/yjs/tests/fake_contents.py new file mode 100644 index 00000000..847756d3 --- /dev/null +++ b/plugins/yjs/tests/fake_contents.py @@ -0,0 +1,22 @@ +from pathlib import Path + +from fps_contents.fileid import FileIdManager + + +class Contents: + def __init__(self, db_path, root_dir): + self.file_id_manager = FileIdManager(db_path=db_path, root_dir=root_dir) + self.watcher = self.file_id_manager.watch(".") + self.root_dir = Path(root_dir) + + async def exists(self, path, user): + return (self.root_dir / path).exists() + + async def delete_content(self, path, user): + (self.root_dir / path).unlink() + + async def create_file(self, path): + (self.root_dir / path).write_text("") + + async def create_directory(self, path): + (self.root_dir / path).mkdir() diff --git a/plugins/yjs/tests/test_ydocs.py b/plugins/yjs/tests/test_ydocs.py new file mode 100644 index 00000000..0e08ef91 --- /dev/null +++ b/plugins/yjs/tests/test_ydocs.py @@ -0,0 +1,90 @@ +import tempfile +from pathlib import Path + +import pytest +from anyio import sleep + +#from fake_contents import Contents +from fastapi.exceptions import HTTPException +from fps_contents.contents import _Contents +from fps_yjs.ydocs.ydrive import YDrive + + +async def assert_with_timeout(timeout, func): + for _ in range(100): + await sleep(timeout / 100) + if func(): + break + assert func() + + +@pytest.mark.anyio +async def test_ydrive(): + with tempfile.TemporaryDirectory() as tmp_dir: + tmp_dir = Path(tmp_dir) + (tmp_dir / "file0").write_text(" " * 1) + (tmp_dir / "file1").write_text(" " * 2) + (tmp_dir / "dir0").mkdir() + (tmp_dir / "dir0" / "file2").write_text(" " * 3) + (tmp_dir / "dir1").mkdir() + (tmp_dir / "dir1" / "dir2").mkdir() + (tmp_dir / "dir1" / "dir2" / "file3").write_text(" " * 4) + (tmp_dir / "dir1" / "dir2" / "file4").write_text(" " * 5) + + contents = _Contents(db_path=str(tmp_dir / ".fileid.db"), root_dir=str(tmp_dir)) + + async with YDrive(contents=contents, root_dir=tmp_dir) as ydrive: + await sleep(0.1) + await contents.file_id_manager.initialized.wait() + + with pytest.raises(HTTPException) as exc_info: + await ydrive.get("doesnt_exist") + assert str(exc_info.value) == "404: Item not found" + + root_dir = await ydrive.get() + assert "file0" in root_dir["content"] + assert "file1" in root_dir["content"] + assert "dir0" in root_dir["content"] + assert "dir1" in root_dir["content"] + + dir0 = await ydrive.get("dir0") + assert len(dir0["content"]) == 1 + assert "file2" in dir0["content"] + + dir1 = await ydrive.get("dir1") + assert len(dir1["content"]) == 1 + assert "dir2" in dir1["content"] + + dir2 = await ydrive.get("dir1/dir2") + assert len(dir2["content"]) == 2 + assert "file3" in dir2["content"] + assert "file4" in dir2["content"] + content = dict(dir1["content"]["dir2"]["content"]["file3"]) + assert not content["is_dir"] + assert content["size"] == 4 + + # delete files + await ydrive.delete("file0") + await assert_with_timeout(10, lambda: "file0" not in root_dir["content"]) + assert not (tmp_dir / "file0").exists() + + await ydrive.delete("dir1/dir2/file3") + await assert_with_timeout(10, lambda: "file3" not in root_dir["content"]) + assert not (tmp_dir / "dir1" / "dir2" / "file3").exists() + + await sleep(1) + assert "file1" in root_dir["content"] + (tmp_dir / "file1").unlink() + await assert_with_timeout(10, lambda: "file1" not in root_dir["content"]) + + assert "file4" in dir2["content"] + (tmp_dir / "dir1" / "dir2" / "file4").unlink() + await assert_with_timeout(10, lambda: "file4" not in dir2["content"]) + + # create files + await ydrive.create("new_dir0", is_dir=True) + await assert_with_timeout(10, lambda: "new_dir0" in root_dir["content"]) + await ydrive.create("new_dir0/new_file0", is_dir=False) + await assert_with_timeout( + 10, lambda: "new_file0" in root_dir["content"]["new_dir0"]["content"] + ) diff --git a/pyproject.toml b/pyproject.toml index c86aacbc..adc17f83 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,7 +54,7 @@ test = [ "requests", "websockets", "ipykernel", - "ypywidgets >=0.6.4,<0.7.0", + "ypywidgets >=0.6.5,<0.7.0", "ypywidgets-textual >=0.2.2,<0.3.0", ] docs = [ "mkdocs", "mkdocs-material" ] diff --git a/tests/test_execute.py b/tests/test_execute.py index d423f1a1..9a3e431e 100644 --- a/tests/test_execute.py +++ b/tests/test_execute.py @@ -92,7 +92,8 @@ async def test_execute(auth_mode, unused_tcp_port): "type": "notebook", } ) - file_id = response.json()["fileId"] + r = response.json() + file_id = r["fileId"] document_id = f"json:notebook:{file_id}" ynb = ydocs["notebook"]() def callback(aevent, events, event):