Skip to content

Commit

Permalink
Add YDrive
Browse files Browse the repository at this point in the history
  • Loading branch information
davidbrochart committed Dec 18, 2023
1 parent 190a70e commit 4ee2d3a
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 10 deletions.
10 changes: 7 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,12 @@ jobs:
python3 -m pip install --upgrade pip
python3 -m pip install hatch
- name: Create jupyterlab-auth dev environment
run: hatch env create dev.jupyterlab-auth
- name: Create jupyterlab-auth and jupyterlab-noauth dev environments
run: |
hatch env create dev.jupyterlab-auth
hatch env create dev.jupyterlab-noauth
- name: Run tests
run: hatch run dev.jupyterlab-auth:test
run: |
hatch run dev.jupyterlab-noauth:pytest plugins/yjs/tests -v --color=yes
hatch run dev.jupyterlab-auth:test
3 changes: 2 additions & 1 deletion jupyverse_api/jupyverse_api/contents/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Union

from fastapi import APIRouter, Depends, Request, Response

Expand All @@ -15,6 +15,7 @@
class FileIdManager(ABC):
stop_watching_files: asyncio.Event
stopped_watching_files: asyncio.Event
Change: Any

@abstractmethod
async def get_path(self, file_id: str) -> str:
Expand Down
24 changes: 18 additions & 6 deletions plugins/contents/fps_contents/fileid.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ class FileIdManager(metaclass=Singleton):
initialized: asyncio.Event
watchers: Dict[str, List[Watcher]]
lock: asyncio.Lock
Change = Change

def __init__(self, db_path: str = ".fileid.db"):
def __init__(self, db_path: str = ".fileid.db", root_dir: str = "."):
self.db_path = db_path
self.root_dir = Path(root_dir)
self.initialized = asyncio.Event()
self.watchers = {}
self.watch_files_task = asyncio.create_task(self.watch_files())
Expand Down Expand Up @@ -78,6 +80,7 @@ async def index(self, path: str) -> Optional[str]:
return idx

async def watch_files(self):
self.root_dir = await self.root_dir.resolve()
async with self.lock:
async with aiosqlite.connect(self.db_path) as db:
await db.execute("DROP TABLE IF EXISTS fileids")
Expand All @@ -90,7 +93,7 @@ async def watch_files(self):
# index files
async with self.lock:
async with aiosqlite.connect(self.db_path) as db:
async for path in Path().rglob("*"):
async for path in self.root_dir.rglob("*"):
idx = uuid4().hex
mtime = (await path.stat()).st_mtime
await db.execute(
Expand All @@ -99,14 +102,16 @@ async def watch_files(self):
await db.commit()
self.initialized.set()

async for changes in awatch(".", stop_event=self.stop_watching_files):
async for changes in awatch(self.root_dir, stop_event=self.stop_watching_files):
async with self.lock:
async with aiosqlite.connect(self.db_path) as db:
deleted_paths = set()
added_paths = set()
for change, changed_path in changes:
# get relative path
changed_path = Path(changed_path).relative_to(await Path().absolute())
changed_path = Path(changed_path).relative_to(
await self.root_dir.absolute()
)
changed_path_str = str(changed_path)

if change == Change.deleted:
Expand Down Expand Up @@ -156,9 +161,16 @@ async def watch_files(self):
for change in changes:
changed_path = change[1]
# get relative path
relative_changed_path = str(Path(changed_path).relative_to(await Path().absolute()))
relative_changed_path = Path(changed_path).relative_to(
await self.root_dir.absolute()
)
relative_change = (change[0], relative_changed_path)
for watcher in self.watchers.get(relative_changed_path, []):
all_watchers = []
for path, watchers in self.watchers.items():
p = Path(path)
if p == relative_changed_path or p in relative_changed_path.parents:
all_watchers += watchers
for watcher in all_watchers:
watcher.notify(relative_change)

self.stopped_watching_files.set()
Expand Down
140 changes: 140 additions & 0 deletions plugins/yjs/fps_yjs/ydocs/ydrive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
from __future__ import annotations

from contextlib import AsyncExitStack
from functools import partial
from pathlib import Path
from typing import Any, Callable

from anyio import create_task_group
from anyio.abc import TaskGroup
from pycrdt import Doc, Map, MapEvent

from jupyverse_api.auth import User
from jupyverse_api.contents import Contents

from .ybasedoc import YBaseDoc


class YDrive(YBaseDoc):
_starting: bool
_task_group: TaskGroup | None

def __init__(
self,
contents: Contents,
ydoc: Doc | None = None,
root_dir: Path | str | None = None,
):
super().__init__(ydoc)
self._root_dir = Path() if root_dir is None else Path(root_dir)
self._ydoc["content"] = self._ycontent = self._new_dir_content()
self._ycontent.observe_deep(self._callback)
self._user = User()
self._starting = False
self._task_group = None
self._contents = contents
self._watcher = contents.file_id_manager.watch(".")

async def __aenter__(self) -> YDrive:
if self._task_group is not None:
raise RuntimeError("YDrive already running")

async with AsyncExitStack() as exit_stack:
tg = create_task_group()
self._task_group = await exit_stack.enter_async_context(tg)
self._exit_stack = exit_stack.pop_all()

assert self._task_group is not None
self._task_group.start_soon(self._process_file_changes)

return self

async def _process_file_changes(self):
async for change in self._watcher:
change_, path = change
if change_ == self._contents.file_id_manager.Change.deleted:
parent_content = self._get(path.parent)
del parent_content["content"][path.name]

async def __aexit__(self, exc_type, exc_value, exc_tb):
if self._task_group is None:
raise RuntimeError("YDrive not running")

self._task_group.cancel_scope.cancel()
self._task_group = None
return await self._exit_stack.__aexit__(exc_type, exc_value, exc_tb)

def _callback(self, events):
for event in events:
if isinstance(event, MapEvent):
current = self._ycontent
for path in event.path:
current = current[path]
for key, val in event.keys.items():
if val.get("action") == "delete":
path = "/".join(event.path[1::2] + [key])
self._task_group.start_soon(self._contents.delete_content, path, self._user)

@property
def version(self) -> str:
return "1.0.0"

def _new_dir_content(self) -> Map:
return Map({"is_dir": True, "content": None})

def _new_file_content(self, size: int) -> Map:
return Map({"is_dir": False, "size": size})

def _get_directory_content(self, path: Path) -> Map:
res = {}
for entry in (self._root_dir / path).iterdir():
if entry.is_dir():
res[entry.name] = self._new_dir_content()
else:
stat = entry.stat()
res[entry.name] = self._new_file_content(
size=stat.st_size,
)
return Map(res)

def _maybe_populate_dir(self, path: Path, content: Map):
if content["content"] is None:
content["content"] = self._get_directory_content(path)

def _get(self, path: Path | str | None = None) -> Map:
path = Path() if path is None else Path(path)
current_content = self._ycontent
self._maybe_populate_dir(path, self._ycontent)
cwd = Path()
last_idx = len(path.parts) - 1
for idx, part in enumerate(path.parts):
try:
current_content = current_content["content"][part]
except KeyError:
raise FileNotFoundError(f'No entry "{part}" in "{cwd}".')
if current_content["is_dir"]:
cwd /= part
self._maybe_populate_dir(cwd, current_content)
elif idx < last_idx:
raise RuntimeError(f'Entry "{part}" in "{cwd}" is not a directory.')
return current_content

def get(self, path: Path | str | None = None) -> dict:
return dict(self._get(path))

def delete(self, path: Path | str):
path = Path(path) if isinstance(path, str) else path
if not path.parts:
raise RuntimeError("Cannot delete root directory")
parent_content = self._get(path.parent)
del parent_content["content"][path.name]

def set(self, value) -> None:
raise RuntimeError("Cannot set a YDrive")

def observe(self, callback: Callable[[str, Any], None]) -> None:
self.unobserve()
self._subscriptions[self._ystate] = self._ystate.observe(partial(callback, "state"))
self._subscriptions[self._ycontent] = self._ycontent.observe_deep(
partial(callback, "content")
)
8 changes: 8 additions & 0 deletions plugins/yjs/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,18 @@ description = "An FPS plugin for the Yjs API"
keywords = [ "jupyter", "server", "fastapi", "plugins" ]
requires-python = ">=3.8"
dependencies = [
"anyio >=3.6.2,<5",
"pycrdt >=0.7.2,<0.8.0",
"jupyverse-api >=0.1.2,<1",
]
dynamic = [ "version",]

[project.optional-dependencies]
test = [
"pytest",
"fps-contents",
]

[[project.authors]]
name = "Jupyter Development Team"
email = "[email protected]"
Expand Down
6 changes: 6 additions & 0 deletions plugins/yjs/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import pytest


@pytest.fixture
def anyio_backend():
return "asyncio"
14 changes: 14 additions & 0 deletions plugins/yjs/tests/fake_contents.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from anyio import create_memory_object_stream
from anyio.streams.stapled import StapledObjectStream
from fps_contents.fileid import FileIdManager


class Contents:
def __init__(self, db_path, root_dir):
send_stream, recv_stream = create_memory_object_stream[str]()
self.event_stream = StapledObjectStream(send_stream, recv_stream)
self.file_id_manager = FileIdManager(db_path=db_path, root_dir=root_dir)
self.watcher = self.file_id_manager.watch(".")

async def delete_content(self, path, user):
await self.event_stream.send(f"delete {path}")
75 changes: 75 additions & 0 deletions plugins/yjs/tests/test_ydocs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import tempfile
from pathlib import Path

import pytest
from anyio import sleep
from fake_contents import Contents
from fps_yjs.ydocs.ydrive import YDrive


@pytest.mark.anyio
async def test_ydrive():
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_dir = Path(tmp_dir)
(tmp_dir / "file0").write_text(" " * 1)
(tmp_dir / "file1").write_text(" " * 2)
(tmp_dir / "dir0").mkdir()
(tmp_dir / "dir0" / "file2").write_text(" " * 3)
(tmp_dir / "dir1").mkdir()
(tmp_dir / "dir1" / "dir2").mkdir()
(tmp_dir / "dir1" / "dir2" / "file3").write_text(" " * 4)
(tmp_dir / "dir1" / "dir2" / "file4").write_text(" " * 5)

contents = Contents(db_path=str(tmp_dir / ".fileid.db"), root_dir=str(tmp_dir))

async with YDrive(contents=contents, root_dir=tmp_dir) as ydrive:

with pytest.raises(FileNotFoundError):
ydrive.get("doesnt_exist")

root_dir = ydrive.get()
assert len(root_dir["content"]) == 4
assert "file0" in root_dir["content"]
assert "file1" in root_dir["content"]
assert "dir0" in root_dir["content"]
assert "dir1" in root_dir["content"]

dir0 = ydrive.get("dir0")
assert len(dir0["content"]) == 1
assert "file2" in dir0["content"]

dir1 = ydrive.get("dir1")
assert len(dir1["content"]) == 1
assert "dir2" in dir1["content"]

dir2 = ydrive.get("dir1/dir2")
assert len(dir2["content"]) == 2
assert "file3" in dir2["content"]
assert "file4" in dir2["content"]
assert dict(dir1["content"]["dir2"]["content"]["file3"]) == {"is_dir": False, "size": 4}

# the fake contents actually doesn't delete files
path = "file0"
ydrive.delete(path)
assert await contents.event_stream.receive() == f"delete {path}"
path = "dir1/dir2/file3"
ydrive.delete(path)
assert await contents.event_stream.receive() == f"delete {path}"

await contents.file_id_manager.initialized.wait()
await sleep(1)
assert "file1" in root_dir["content"]
(tmp_dir / "file1").unlink()
for _ in range(100): # wait a total of 10s
await sleep(0.1)
if "file1" not in root_dir["content"]:
break
assert "file1" not in root_dir["content"]

assert "file4" in dir2["content"]
(tmp_dir / "dir1" / "dir2" / "file4").unlink()
for _ in range(100): # wait a total of 10s
await sleep(0.1)
if "file4" not in dir2["content"]:
break
assert "file4" not in dir2["content"]

0 comments on commit 4ee2d3a

Please sign in to comment.