Skip to content

Commit

Permalink
Merge pull request #4 from fsspec/recursive_rm
Browse files Browse the repository at this point in the history
fixed recursive rm
  • Loading branch information
d70-t authored Apr 13, 2022
2 parents b106523 + d225e07 commit 529ceda
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 5 deletions.
23 changes: 20 additions & 3 deletions swiftspec/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from urllib.parse import urlparse

import aiohttp
from fsspec.asyn import AsyncFileSystem, sync, sync_wrapper
from fsspec.asyn import AsyncFileSystem, _run_coros_in_chunks, sync, sync_wrapper
from fsspec.exceptions import FSTimeoutError
from fsspec.spec import AbstractBufferedFile
from fsspec.utils import tokenize
Expand Down Expand Up @@ -260,14 +260,31 @@ async def _pipe_file(self, path, data, chunksize=50 * 2 ** 20, **kwargs):
async with session.put(url, data=data, headers=headers) as res:
res.raise_for_status()

async def _rm_file(self, path, missing_is_ok=False, **kwargs):
    """Delete a single object.

    Parameters
    ----------
    path : str
        Location of the object to delete. It must reference an object;
        anything else is rejected.
    missing_is_ok : bool
        When true, a 404 response from the server is ignored instead of
        being reported as an error.
    **kwargs
        Accepted for interface compatibility; not used here.

    Raises
    ------
    NotImplementedError
        If ``path`` does not reference an object.
    """
    ref = SWIFTRef(path)
    if not ref.object:
        raise NotImplementedError("currently rm is only implemented for objects")
    headers = self.headers_for_url(ref.http_url)
    session = await self.set_session()
    async with session.delete(ref.http_url, headers=headers) as res:
        # The 404 check has to come before any status-based raising,
        # otherwise a tolerated "missing" response would still fail.
        if missing_is_ok and res.status == 404:
            return
        # NOTE(review): helper is defined outside this view; presumably it
        # maps a 404 to FileNotFoundError and raises for other error
        # statuses -- confirm against its definition.
        self._raise_not_found_for_status(res, ref)

async def _rm(self, path, recursive=False, batch_size=None, **kwargs):
    """Delete one or more paths, optionally recursing below them.

    ``path`` is expanded with ``_expand_path`` and every resulting entry is
    handed to ``_rm_file``; the deletions are run in chunks of
    ``batch_size`` (falling back to ``self.batch_size`` when not given).
    Extra keyword arguments are forwarded to ``_rm_file``.
    """
    # TODO: implement on_error
    if not batch_size:
        batch_size = self.batch_size
    targets = await self._expand_path(path, recursive=recursive)
    # In recursive mode missing entries are tolerated: each deletion is
    # issued with missing_is_ok=True.
    deletions = [
        self._rm_file(target, missing_is_ok=recursive, **kwargs)
        for target in targets
    ]
    return await _run_coros_in_chunks(
        deletions, batch_size=batch_size, nofiles=True
    )

def rmdir(self, path):
    """Always fail: there is no empty directory that could be removed.

    Raises
    ------
    OSError
        Unconditionally, regardless of ``path``.
    """
    reason = "empty directories can't exist on SWIFT, this method can't succeed"
    raise OSError(reason)

def _open(
self,
Expand Down
67 changes: 65 additions & 2 deletions test/test_swiftfilesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,12 @@ async def _method(self, method, url, params=None, headers=None, data=None):
params = params or {}
headers = headers or {}
yield self.router(
"/" + path, method, store=self.store, headers=headers, data=data
"/" + path,
method,
store=self.store,
headers=headers,
params=params,
data=data,
)

def get(self, url, params=None, headers=None):
Expand Down Expand Up @@ -103,9 +108,10 @@ def create_mock_data():


class SWIFTHandler:
    """Base class for mock request handlers; it only stores request context.

    Parameters
    ----------
    store
        Backing data of the mock server (subclasses index it by account
        and container).
    headers
        Headers sent with the request.
    params
        Query parameters sent with the request (e.g. ``prefix`` and
        ``delimiter`` for container listings).
    data
        Request body.
    """

    def __init__(self, store, headers, params, data):
        self.store = store
        self.headers = headers
        self.params = params
        self.data = data


Expand All @@ -125,6 +131,7 @@ def get(self, account):

class ContainerHandler(SWIFTHandler):
def get(self, account, container):
prefix = self.params.get("prefix", "")
objects = [
{
"hash": md5(v).hexdigest(),
Expand All @@ -134,7 +141,21 @@ def get(self, account, container):
"content_type": "application/octet-stream",
}
for k, v in self.store[account][container].items()
if k.startswith(prefix)
]
if "delimiter" in self.params:
delimiter = self.params["delimiter"]
keep = len(prefix)
files = []
folders = set()
for o in objects:
rest = o["name"][keep:]
if delimiter in rest:
folders.add(prefix + rest.split(delimiter)[0])
else:
files.append(o)
objects = files + [{"subdir": k + delimiter} for k in folders]

return MockResponse(200, json.dumps(objects))


Expand Down Expand Up @@ -171,6 +192,8 @@ def put(self, account, container, obj):
def delete(self, account, container, obj):
    """Remove ``obj`` from the mock store.

    Returns a 204 response after deleting the object, or a 404 response
    (leaving the store untouched) when the object is not present.
    """
    if obj not in self.store[account][container]:
        return MockResponse(404, "not found")
    del self.store[account][container][obj]
    return MockResponse(204, "no content")


Expand Down Expand Up @@ -210,6 +233,15 @@ def test_ls_container(fs):
assert res[0]["size"] == len(b"Hello World")


def test_ls_object(fs):
    """Listing a prefix reports the common sub-prefix as one directory."""
    for leaf in ("bar", "baz"):
        fs.pipe("swift://server/a1/c1/foo/x/" + leaf, b"bar")
    listing = fs.ls("swift://server/a1/c1/foo")
    assert len(listing) == 1
    entry = listing[0]
    assert entry["name"] == "swift://server/a1/c1/foo/x"
    assert entry["type"] == "directory"


def test_cat(fs):
assert fs.cat("swift://server/a1/c1/hello") == b"Hello World"

Expand Down Expand Up @@ -248,6 +280,37 @@ def test_rm(fs):
assert "hello" not in fs._session.store["a1"]["c1"]


def test_rm_nonexistent_raises(fs):
    """A non-recursive rm of a missing object raises FileNotFoundError."""
    missing = "swift://server/a1/c1/doesnt_exist"
    with pytest.raises(FileNotFoundError):
        fs.rm(missing)


def test_rm_recursive(fs):
    """A recursive rm of a prefix deletes every object stored below it."""
    keys = ("foo/x/bar", "foo/x/baz")
    for key in keys:
        fs.pipe("swift://server/a1/c1/" + key, b"bar")
    # Make sure the objects really exist before removing them.
    for key in keys:
        assert fs._session.store["a1"]["c1"][key] == b"bar"
    fs.rm("swift://server/a1/c1/foo", recursive=True)
    for key in keys:
        assert key not in fs._session.store["a1"]["c1"]


def test_rmdir_raises(fs):
    """rmdir is expected to fail with OSError."""
    target = "swift://server/a1/c1/test"
    # Empty directories can't exist on SWIFT, so there is nothing to remove.
    with pytest.raises(OSError):
        fs.rmdir(target)


def test_expand_path(fs):
    """Recursive expansion yields the sub-directory and both objects in it."""
    base = "swift://server/a1/c1/foo/x"
    for leaf in ("bar", "baz"):
        fs.pipe(base + "/" + leaf, b"bar")
    expanded = fs.expand_path("swift://server/a1/c1/foo/", recursive=True)
    expected = {base, base + "/bar", base + "/baz"}
    assert set(expanded) == expected


def test_open_read(fs):
with fs.open("swift://server/a1/c1/hello", "r") as f:
assert f.read() == "Hello World"
Expand Down

0 comments on commit 529ceda

Please sign in to comment.