Skip to content

Commit

Permalink
Proof of concept with iRODS session cleanup.
Browse files Browse the repository at this point in the history
  • Loading branch information
lwesterhof committed Jul 20, 2023
1 parent d29ea4b commit d452aa0
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 42 deletions.
40 changes: 32 additions & 8 deletions connman.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

__copyright__ = 'Copyright (c) 2021-2022, Utrecht University'
__copyright__ = 'Copyright (c) 2021-2023, Utrecht University'
__license__ = 'GPLv3, see LICENSE'

import threading
Expand All @@ -9,7 +9,8 @@

from irods.session import iRODSSession

TTL = 60 * 30
TTL = 60 * 30 # Time to live (TTL) for Flask sessions.
IRODS_TTL = 60 # Time to live (TTL) for iRODS sessions.


class Session(object):
Expand All @@ -19,18 +20,19 @@ def __init__(self, sid: int, irods: iRODSSession) -> None:
:param sid: Flask session identifier
:param irods: iRODS session
"""
self.sid: int = sid
self.irods: iRODSSession = irods
self.time: float = time.time()
self.lock = threading.Lock()
self.sid: int = sid # Flask session identifier
self.time: float = time.time() # Flask session start time
self.irods: iRODSSession = irods # iRODS session
self.irods_time: float = time.time() # iRODS session start time
self.lock: threading.Lock = threading.Lock()

def __del__(self) -> None:
self.irods.cleanup()
print(f"[gc/logout]: Dropped iRODS session of session {self.sid}")
print(f"[gc/logout]: Cleanup session {self.sid}")


sessions: Dict[int, Session] = dict() # Custom session dict instead of Flask session (cannot pickle iRODS session)
lock = threading.Lock()
lock: threading.Lock = threading.Lock()


def gc() -> None:
Expand All @@ -39,8 +41,16 @@ def gc() -> None:
with lock:
t = time.time()
global sessions

# Remove sessions that exceed the Flask session TTL.
sessions = {k: v for k, v in sessions.items() if t - v.time < TTL or v.lock.locked()}

# Cleanup iRODS sessions that exceed the iRODS session TTL.
for _, s in sessions.items():
if t - s.irods_time > IRODS_TTL and not s.lock.locked():
s.irods.cleanup()
s.irods_time = time.time()

time.sleep(1)


Expand Down Expand Up @@ -69,6 +79,7 @@ def add(sid: int, irods: iRODSSession) -> None:
s: Session = Session(sid, irods)
sessions[sid] = s
s.time = time.time()
s.irods_time = time.time()
s.lock.acquire()
print(f"[login]: Successfully connected to iRODS for session {sid}'")

Expand All @@ -82,6 +93,7 @@ def release(sid: int) -> None:
if sid in sessions:
s: Session = sessions[sid]
s.time = time.time()
s.irods_time = time.time()
s.lock.release()


Expand All @@ -93,3 +105,15 @@ def clean(sid: int) -> None:
global sessions
if sid in sessions:
del sessions[sid]


def extend(sid: int) -> None:
    """Extend session TTLs.

    Resets both the Flask session timestamp and the iRODS session timestamp
    of the session registered under ``sid`` to the current time. Unknown
    session identifiers are ignored.

    :param sid: Flask session identifier
    """
    # Single lookup on purpose: gc() rebinds the module-level `sessions` dict,
    # so a separate `sid in sessions` check followed by `sessions[sid]` could
    # raise KeyError when the session is dropped in between.
    s = sessions.get(sid)
    if s is None:
        return

    # Use one timestamp so both TTLs are extended from the same moment.
    now = time.time()
    s.time = now
    s.irods_time = now
12 changes: 6 additions & 6 deletions deposit/deposit.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
#!/usr/bin/env python3

__copyright__ = 'Copyright (c) 2021-2022, Utrecht University'
__copyright__ = 'Copyright (c) 2021-2023, Utrecht University'
__license__ = 'GPLv3, see LICENSE'

import io
from typing import Iterator

from flask import abort, Blueprint, g, redirect, render_template, request, Response, stream_with_context, url_for
from flask import abort, Blueprint, g, redirect, render_template, request, Response, session, stream_with_context, url_for
from irods.exception import CAT_NO_ACCESS_PERMISSION

import api
import connman

deposit_bp = Blueprint('deposit_bp', __name__,
template_folder='templates',
Expand Down Expand Up @@ -56,17 +57,16 @@ def data() -> Response:
def download() -> Response:
path = '/' + g.irods.zone + '/home' + request.args.get('filepath')
filename = path.rsplit('/', 1)[1]
session = g.irods

READ_BUFFER_SIZE = 1024 * io.DEFAULT_BUFFER_SIZE

def read_file_chunks(path: str) -> Iterator[bytes]:
obj = session.data_objects.get(path)
obj = g.irods.data_objects.get(path)
try:
with obj.open('r') as fd:
while True:
buf = fd.read(READ_BUFFER_SIZE)
if buf:
connman.extend(session.sid)
yield buf
else:
break
Expand All @@ -75,7 +75,7 @@ def read_file_chunks(path: str) -> Iterator[bytes]:
except Exception:
abort(500)

if session.data_objects.exists(path):
if g.irods.data_objects.exists(path):
return Response(
stream_with_context(read_file_chunks(path)),
headers={
Expand Down
25 changes: 11 additions & 14 deletions research/research.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@

from flask import (
abort, Blueprint, current_app as app, g, jsonify, make_response,
render_template, request, Response, stream_with_context
render_template, request, Response, session, stream_with_context
)
from irods.exception import CAT_NO_ACCESS_PERMISSION
from irods.message import iRODSMessage
from werkzeug.utils import secure_filename

import api
import connman
from util import log_error

research_bp = Blueprint('research_bp', __name__,
Expand All @@ -41,17 +42,16 @@ def index() -> Response:
def download() -> Response:
path = '/' + g.irods.zone + '/home' + request.args.get('filepath')
filename = path.rsplit('/', 1)[1]
session = g.irods

READ_BUFFER_SIZE = 1024 * io.DEFAULT_BUFFER_SIZE

def read_file_chunks(path: str) -> Iterator[bytes]:
obj = session.data_objects.get(path)
obj = g.irods.data_objects.get(path)
try:
with obj.open('r') as fd:
while True:
buf = fd.read(READ_BUFFER_SIZE)
if buf:
connman.extend(session.sid)
yield buf
else:
break
Expand All @@ -60,7 +60,7 @@ def read_file_chunks(path: str) -> Iterator[bytes]:
except Exception:
abort(500)

if session.data_objects.exists(path):
if g.irods.data_objects.exists(path):
return Response(
stream_with_context(read_file_chunks(path)),
headers={
Expand All @@ -80,9 +80,8 @@ def build_object_path(path: str, relative_path: str, filename: str) -> str:
if relative_path:
base_dir = os.path.join("/" + g.irods.zone, 'home', path, relative_path)
# Ensure upload collection exists.
session = g.irods
if not session.collections.exists(base_dir):
session.collections.create(base_dir)
if not g.irods.collections.exists(base_dir):
g.irods.collections.create(base_dir)
else:
base_dir = os.path.join("/" + g.irods.zone, 'home', path)

Expand Down Expand Up @@ -115,7 +114,6 @@ def upload_get() -> Response:
response.headers["Content-Type"] = "application/json"
return response

session = g.irods
object_path = build_object_path(filepath, flow_relative_path, flow_filename)

# Partial file name for chunked uploads.
Expand All @@ -128,7 +126,7 @@ def upload_get() -> Response:
return response

try:
obj = session.data_objects.get(object_path)
obj = g.irods.data_objects.get(object_path)

if obj.replicas[0].size > int(flow_chunk_size * (flow_chunk_number - 1)):
# Chunk already exists.
Expand Down Expand Up @@ -170,7 +168,6 @@ def upload_post() -> Response:
response.headers["Content-Type"] = "application/json"
return response

session = g.irods
object_path = build_object_path(filepath, flow_relative_path, flow_filename)

# Partial file name for chunked uploads.
Expand All @@ -183,7 +180,7 @@ def upload_post() -> Response:

# Write chunk data.
try:
with session.data_objects.open(object_path, 'a', rescName=app.config.get('IRODS_DEFAULT_RESC')) as obj_desc:
with g.irods.data_objects.open(object_path, 'a', rescName=app.config.get('IRODS_DEFAULT_RESC')) as obj_desc:
obj_desc.seek(int(flow_chunk_size * (flow_chunk_number - 1)))
obj_desc.write(encode_unicode_content)
except Exception:
Expand All @@ -202,11 +199,11 @@ def upload_post() -> Response:
final_object_path = build_object_path(filepath, flow_relative_path, flow_filename)
try:
# overwriting doesn't work using the move command, therefore unlink the previous file first
session.data_objects.unlink(final_object_path, force=True)
g.irods.data_objects.unlink(final_object_path, force=True)
except Exception:
# Most likely no previous file was present, which is not an error.
pass
session.data_objects.move(object_path, final_object_path)
g.irods.data_objects.move(object_path, final_object_path)

response = make_response(jsonify({"message": "Chunk upload succeeded"}), 200)
response.headers["Content-Type"] = "application/json"
Expand Down
34 changes: 20 additions & 14 deletions vault/vault.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
#!/usr/bin/env python3

__copyright__ = 'Copyright (c) 2021-2022, Utrecht University'
__copyright__ = 'Copyright (c) 2021-2023, Utrecht University'
__license__ = 'GPLv3, see LICENSE'

import io
from typing import Iterator
from uuid import UUID

from flask import abort, Blueprint, g, render_template, request, Response, stream_with_context
from flask import abort, Blueprint, g, render_template, request, Response, session, stream_with_context
from irods.exception import CAT_NO_ACCESS_PERMISSION

import api
import connman

vault_bp = Blueprint('vault_bp', __name__,
template_folder='templates',
Expand All @@ -34,21 +36,25 @@ def index() -> Response:
def download() -> Response:
path = '/' + g.irods.zone + '/home' + request.args.get('filepath')
filename = path.rsplit('/', 1)[1]
session = g.irods

READ_BUFFER_SIZE = 1024 * io.DEFAULT_BUFFER_SIZE

def read_file_chunks(path: str) -> Iterator[bytes]:
obj = session.data_objects.get(path)
with obj.open('r') as fd:
while True:
buf = fd.read(READ_BUFFER_SIZE)
if buf:
yield buf
else:
break

if session.data_objects.exists(path):
obj = g.irods.data_objects.get(path)
try:
with obj.open('r') as fd:
while True:
buf = fd.read(READ_BUFFER_SIZE)
if buf:
connman.extend(session.sid)
yield buf
else:
break
except CAT_NO_ACCESS_PERMISSION:
abort(403)
except Exception:
abort(500)

if g.irods.data_objects.exists(path):
return Response(
stream_with_context(read_file_chunks(path)),
headers={
Expand Down

0 comments on commit d452aa0

Please sign in to comment.