From 34b3acab34a87484426610de60f5ed79333d1feb Mon Sep 17 00:00:00 2001 From: Ratul Hasan <34002411+RaSan147@users.noreply.github.com> Date: Tue, 5 Mar 2024 00:53:48 +0600 Subject: [PATCH] Improved code quality, added POST security check See changelog.MD on v0.9.4 --- CHANGELOG.MD | 22 +++- VERSION | 2 +- dev_src/clone.py | 2 - dev_src/local_server_pyrobox.py | 217 +++++++++++++++++--------------- dev_src/pyroboxCore.py | 44 ++++++- run_setup.py | 4 +- setup.cfg | 2 +- src/pyroboxCore.py | 42 ++++++- src/server.py | 217 +++++++++++++++++--------------- 9 files changed, 332 insertions(+), 220 deletions(-) diff --git a/CHANGELOG.MD b/CHANGELOG.MD index f7077c2..0d54ebc 100644 --- a/CHANGELOG.MD +++ b/CHANGELOG.MD @@ -1,3 +1,19 @@ +# Version 0.9.4 + ## Client-side Changes: + * Nothing to notice + + ## Server-side Changes: + * Improved code quality + * Fixed zip, when empty folder has \ in name + * Added security check for POST requests to prevent directory traversal + + ## Fixes: + * Fixed zip, when empty folder has \ in name + + ## TODO: + * add zip in the dynamic island (maybe use session storage to store zip data) + + # Version 0.9.3 ## Client-side Changes: * Fixed zip downloader as a PWA @@ -9,11 +25,11 @@ ## Fixes: * Fixed zip downloader page - * Fixed unwanted URL-encoding in Top-nav - * Video player page now use innerHTML instead of innerText (error message was wrongly shown) + * Fixed unwanted URL encoding in Top-nav + * Video player page now uses innerHTML instead of innerText (error message was wrongly shown) ## TODO: - * add zip in dynamic island (maybe use session storage to store zip data) + * add zip in the dynamic island (maybe use session storage to store zip data) diff --git a/VERSION b/VERSION index 965065d..a602fc9 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.9.3 +0.9.4 diff --git a/dev_src/clone.py b/dev_src/clone.py index d3d78f9..2cd494e 100644 --- a/dev_src/clone.py +++ b/dev_src/clone.py @@ -3,8 +3,6 @@ from queue import Queue from datetime import datetime, timezone -#print(datetime.datetime.now()) - import requests os.umask(0) # make sure permissions are set correctly diff --git a/dev_src/local_server_pyrobox.py b/dev_src/local_server_pyrobox.py index 62c23f7..2c025dc 100644 --- a/dev_src/local_server_pyrobox.py +++ b/dev_src/local_server_pyrobox.py @@ -245,7 +245,9 @@ def get_page_type(self: SH, *args, **kwargs): - path = kwargs.get('path', '') + os_path = kwargs.get('path') + url_path = kwargs.get('url_path') + result= "unknown" @@ -264,13 +266,14 @@ def get_page_type(self: SH, *args, **kwargs): elif self.query("czip"): result = "zip" - elif path == "/favicon.ico": + elif url_path == "/favicon.ico": result = "favicon" - elif os.path.isdir(path): + + elif os.path.isdir(os_path): for index in "index.html", "index.htm": - index = os.path.join(path, index) + index = os.path.join(os_path, index) if os.path.exists(index): result = "html" break @@ -281,7 +284,7 @@ def get_page_type(self: SH, *args, **kwargs): else: result = "dir" - elif os.path.isfile(path): + elif os.path.isfile(os_path): result = "file" return self.return_txt(result, cookie=cookie) @@ -502,16 +505,15 @@ def get_size(self: SH, *args, **kwargs): url_path = kwargs.get('url_path', '') + os_path = self.translate_path(url_path) - xpath = self.translate_path(url_path) - - stat = get_stat(xpath) + stat = get_stat(os_path) if not stat: return self.send_json({"status": 0}, cookie=cookie) - if os.path.isfile(xpath): + if os.path.isfile(os_path): size = stat.st_size else: - size = get_dir_size(xpath) + size = get_dir_size(os_path) humanbyte = humanbytes(size) fmbyte = fmbytes(size) @@ -531,16 +533,15 @@ def get_size_n_count(self: SH, *args, **kwargs): url_path = kwargs.get('url_path', '') + os_path = self.translate_path(url_path) - xpath = self.translate_path(url_path) - - stat = get_stat(xpath) + stat = get_stat(os_path) if not stat: return self.send_json({"status": 0}, cookie=cookie) - if os.path.isfile(xpath): + if os.path.isfile(os_path): count, size = 1, stat.st_size else: - count, size = get_tree_count_n_size(xpath) + count, size = get_tree_count_n_size(os_path) humanbyte = humanbytes(size) fmbyte = fmbytes(size) @@ -569,9 +570,8 @@ def get_zip_id(self: SH, *args, **kwargs): return self.return_txt("ERROR: ZIP FEATURE IS UNAVAILABLE !", HTTPStatus.INTERNAL_SERVER_ERROR, cookie=cookie) - path = kwargs.get('path', '') - os_path = self.translate_path(path) - spathsplit = kwargs.get('spathsplit', '') + os_path = kwargs.get('path') + spathsplit = kwargs.get('spathsplit') filename = spathsplit[-2] + ".zip" zid = None @@ -618,6 +618,11 @@ def create_zip(self: SH, *args, **kwargs): return self.return_txt("ERROR: ZIP FEATURE IS UNAVAILABLE !", HTTPStatus.INTERNAL_SERVER_ERROR, cookie=cookie) url_path = kwargs.get('url_path', '') + os_path = self.translate_path(url_path) + + # if not dir or not exists + if not os.path.isdir(os_path): + return self.send_error(HTTPStatus.NOT_FOUND, "Directory not found", cookie=cookie) # dir_size = get_dir_size(path, limit=6*1024*1024*1024) @@ -658,9 +663,8 @@ def get_zip(self: SH, *args, **kwargs): - path = kwargs.get('path', '') - os_path = self.translate_path(path) - spathsplit = kwargs.get('spathsplit', '') + os_path = kwargs.get('path') + spathsplit = kwargs.get('spathsplit') query = self.query msg = False @@ -734,13 +738,14 @@ def send_video_data(self: SH, *args, **kwargs): if not user: # guest or not will be handled in Authentication return self.send_text(pt.login_page(), HTTPStatus.UNAUTHORIZED, cookie=cookie) - path = kwargs.get('path', '') - url_path = kwargs.get('url_path', '') + os_path = kwargs.get('path') + url_path = kwargs.get('url_path') + vid_source = url_path - content_type = self.guess_type(path) + content_type = self.guess_type(os_path) if not content_type.startswith('video/'): self.send_error(HTTPStatus.NOT_FOUND, "THIS IS NOT A VIDEO FILE", cookie=cookie) @@ -776,11 +781,11 @@ def send_video_page(self: SH, *args, **kwargs): return self.send_text(pt.login_page(), HTTPStatus.UNAUTHORIZED, cookie=cookie) - path = kwargs.get('path', '') - url_path = kwargs.get('url_path', '') + os_path = kwargs.get('path') + url_path = kwargs.get('url_path') - vid_source = url_path - content_type = self.guess_type(path) + # vid_source = url_path + content_type = self.guess_type(os_path) if not content_type.startswith('video/'): self.send_error(HTTPStatus.NOT_FOUND, "THIS IS NOT A VIDEO FILE", cookie=cookie) @@ -819,8 +824,8 @@ def send_video_page(self: SH, *args, **kwargs): # return None -# path = kwargs.get('path', '') -# spathsplit = kwargs.get('spathsplit', '') +# path = kwargs.get('path') +# spathsplit = kwargs.get('spathsplit') # path = CoreConfig.ASSETS_dir + "/".join(spathsplit[2:]) # # print("USING ASSETS", path) @@ -927,7 +932,7 @@ def get_folder_data(self: SH, *args, **kwargs): }, cookie=cookie) - path = kwargs.get('path', '') + os_path = kwargs.get('path') if not user.VIEW: return self.send_json({ @@ -938,9 +943,11 @@ def get_folder_data(self: SH, *args, **kwargs): }, cookie=cookie) - is_dir = None try: - is_dir = os.path.isdir(path) + if not os.path.isdir(os_path): + return self.send_json({"status": 0, + "warning": "Folder not found"}, cookie=cookie) + except Exception as e: err = traceback.format_exc() return self.send_json({ @@ -950,11 +957,7 @@ def get_folder_data(self: SH, *args, **kwargs): }) - if is_dir is None: - return self.send_json({"status": 0, - "warning": "Folder not found"}, cookie=cookie) - - data = list_directory(self, path, user, cookie=cookie) + data = list_directory(self, os_path, user, cookie=cookie) if data: return self.send_json(data, cookie=cookie) @@ -973,9 +976,9 @@ def default_get(self: SH, filename=None, *args, **kwargs): return self.redirect("?login") - path = kwargs.get('path', '') + os_path = kwargs.get('path') - if os.path.isdir(path): + if os.path.isdir(os_path): parts = urllib.parse.urlsplit(self.path) if not parts.path.endswith('/'): # redirect browser - doing basically what apache does @@ -988,12 +991,12 @@ def default_get(self: SH, filename=None, *args, **kwargs): self.end_headers() return None for index in "index.html", "index.htm": - index = os.path.join(path, index) + index = os.path.join(os_path, index) if os.path.exists(index): - path = index + os_path = index break else: - return list_directory_html(self, path, user, cookie=cookie) + return list_directory_html(self, os_path, user, cookie=cookie) # check for trailing "/" which should return 404. See Issue17324 # The test for this was added in test_httpserver.py @@ -1001,7 +1004,7 @@ def default_get(self: SH, filename=None, *args, **kwargs): # See discussion on python-dev and Issue34711 regarding # parsing and rejection of filenames with a trailing slash - if path.endswith("/"): + if os_path.endswith("/"): self.send_error(HTTPStatus.NOT_FOUND, "File not found", cookie=cookie) return None @@ -1012,9 +1015,9 @@ def default_get(self: SH, filename=None, *args, **kwargs): if (not user.DOWNLOAD) or user.NOPERMISSION: return self.send_error(HTTPStatus.SERVICE_UNAVAILABLE, "Download is disabled", cookie=cookie) - if not os.path.exists(path): + if not os.path.exists(os_path): return self.send_error(HTTPStatus.NOT_FOUND, "File not found", cookie=cookie) - return self.return_file(path, filename, cookie=cookie) + return self.return_file(os_path, filename, cookie=cookie) @@ -1125,7 +1128,7 @@ def upload(self: SH, *args, **kwargs): return self.send_txt("Upload not allowed", HTTPStatus.SERVICE_UNAVAILABLE, cookie=cookie) - path = kwargs.get('path') + os_path = kwargs.get('path') url_path = kwargs.get('url_path') @@ -1161,14 +1164,16 @@ def upload(self: SH, *args, **kwargs): return self.send_error(HTTPStatus.BAD_REQUEST, "Can't find out file name...", cookie=cookie) - path = self.translate_path(self.path) rltv_path = posixpath.join(url_path, fn) + + if not self.path_safety_check(fn, rltv_path): + return self.send_txt("Invalid Path: " + rltv_path, HTTPStatus.BAD_REQUEST, cookie=cookie) - temp_fn = os.path.join(path, ".LStemp-"+fn +'.tmp') + temp_fn = os.path.join(os_path, ".LStemp-"+fn +'.tmp') CoreConfig.temp_file.add(temp_fn) - fn = os.path.join(path, fn) + os_f_path = os.path.join(os_path, fn) @@ -1195,12 +1200,12 @@ def upload(self: SH, *args, **kwargs): preline = line - while (not user.MODIFY) and os.path.isfile(fn): + while (not user.MODIFY) and os.path.isfile(os_f_path): n = 1 - name, ext = os.path.splitext(fn) + name, ext = os.path.splitext(os_f_path) fn = f"{name}({n}){ext}" n += 1 - os.replace(temp_fn, fn) + os.replace(temp_fn, os_f_path) @@ -1237,7 +1242,6 @@ def del_2_recycle(self: SH, *args, **kwargs): if user.NOPERMISSION or (not user.DELETE): return self.send_json({"head": "Failed", "body": "You have no permission to delete."}, cookie=cookie) - path = kwargs.get('path') url_path = kwargs.get('url_path') @@ -1255,16 +1259,20 @@ def del_2_recycle(self: SH, *args, **kwargs): # File link to move to recycle bin filename = form.get_multi_field(verify_name='name', decode=T)[1].strip() - path = self.get_rel_path(filename) - xpath = self.translate_path(posixpath.join(url_path, filename)) + rel_path = self.get_rel_path(filename) + + if not self.path_safety_check(filename, rel_path): + return self.send_json({"head": "Failed", "body": "Invalid Path: " + rel_path}, cookie=cookie) + + os_f_path = self.translate_path(posixpath.join(url_path, filename)) - self.log_warning(f'<-send2trash-> {xpath} by {[uid]}') + self.log_warning(f'<-send2trash-> {os_f_path} by {[uid]}') head = "Failed" try: if CoreConfig.OS == 'Android': raise InterruptedError - send2trash(xpath) + send2trash(os_f_path) msg = "Successfully Moved To Recycle bin"+ post.refresh head = "Success" except TrashPermissionError: @@ -1273,7 +1281,7 @@ def del_2_recycle(self: SH, *args, **kwargs): msg = "Recycling unavailable! Try deleting permanently..." except Exception as e: traceback.print_exc() - msg = "" + path + " " + e.__class__.__name__ + msg = "" + rel_path + " " + e.__class__.__name__ return self.send_json({"head": head, "body": msg}, cookie=cookie) @@ -1293,7 +1301,6 @@ def del_permanently(self: SH, *args, **kwargs): if user.NOPERMISSION or (not user.DELETE): return self.send_json({"head": "Failed", "body": "Recycling unavailable! Try deleting permanently..."}, cookie=cookie) - path = kwargs.get('path') url_path = kwargs.get('url_path') @@ -1307,23 +1314,27 @@ def del_permanently(self: SH, *args, **kwargs): # File link to move to recycle bin filename = form.get_multi_field(verify_name='name', decode=T)[1].strip() - path = self.get_rel_path(filename) - xpath = self.translate_path(posixpath.join(url_path, filename)) + rel_path = self.get_rel_path(filename) + + if not self.path_safety_check(filename, rel_path): + return self.send_json({"head": "Failed", "body": "Invalid Path: " + rel_path}, cookie=cookie) - self.log_warning(f'Perm. DELETED {xpath} by {[uid]}') + os_f_path = self.translate_path(posixpath.join(url_path, filename)) + + self.log_warning(f'Perm. DELETED {os_f_path} by {[uid]}') try: - if os.path.isfile(xpath): os.remove(xpath) - else: shutil.rmtree(xpath) + if os.path.isfile(os_f_path): os.remove(os_f_path) + else: shutil.rmtree(os_f_path, ignore_errors=True) - return self.send_json({"head": "Success", "body": "PERMANENTLY DELETED " + path + post.refresh}, cookie=cookie) + return self.send_json({"head": "Success", "body": "PERMANENTLY DELETED " + rel_path + post.refresh}, cookie=cookie) except Exception as e: traceback.print_exc() - return self.send_json({"head": "Failed", "body": "" + path + "" + e.__class__.__name__}, cookie=cookie) + return self.send_json({"head": "Failed", "body": "" + rel_path + "" + e.__class__.__name__}, cookie=cookie) @@ -1342,7 +1353,6 @@ def rename_content(self: SH, *args, **kwargs): return self.send_json({"head": "Failed", "body": "Renaming is disabled."}, cookie=cookie) - path = kwargs.get('path') url_path = kwargs.get('url_path') @@ -1355,26 +1365,27 @@ def rename_content(self: SH, *args, **kwargs): # File link to move to recycle bin + filename = form.get_multi_field(verify_name='name', decode=T)[1].strip() - new_name = form.get_multi_field(verify_name='data', decode=T)[1].strip() - path = self.get_rel_path(filename) + rel_path = self.get_rel_path(filename) + new_rel_path = self.get_rel_path(new_name) + if not self.path_safety_check(filename, new_name, rel_path, new_rel_path): + return self.send_json({"head": "Failed", "body": "Invalid Path: " + rel_path}, cookie=cookie) - xpath = self.translate_path(posixpath.join(url_path, filename)) + os_f_path = self.translate_path(posixpath.join(url_path, filename)) + os_new_f_path = self.translate_path(posixpath.join(url_path, new_name)) - - new_path = self.translate_path(posixpath.join(url_path, new_name)) - - self.log_warning(f'Renamed "{xpath}" to "{new_path}" by {[uid]}') + self.log_warning(f'Renamed "{os_f_path}" to "{os_new_f_path}" by {[uid]}') try: - os.rename(xpath, new_path) + os.rename(os_f_path, os_new_f_path) return self.send_json({"head": "Renamed Successfully", "body": post.refresh}, cookie=cookie) except Exception as e: - return self.send_json({"head": "Failed", "body": "" + path + "
" + e.__class__.__name__ + " : " + str(e) }, cookie=cookie) + return self.send_json({"head": "Failed", "body": "" + rel_path + "
" + e.__class__.__name__ + " : " + self.get_web_path(str(e), -1) }, cookie=cookie) @@ -1392,7 +1403,7 @@ def get_info(self: SH, *args, **kwargs): if user.NOPERMISSION: return self.send_json({"head": "Failed", "body": "You have no permission to view."}, cookie=cookie) - path = kwargs.get('path') + os_path = kwargs.get('path') url_path = kwargs.get('url_path') script = None @@ -1405,30 +1416,31 @@ def get_info(self: SH, *args, **kwargs): form = post.form - - - # File link to move to check info + filename = form.get_multi_field(verify_name='name', decode=T)[1].strip() - path = self.get_rel_path(filename) # the relative path of the file or folder + rel_path = self.get_rel_path(filename) + + if not self.path_safety_check(filename, rel_path): + return self.send_json({"head": "Failed", "body": "Invalid Path: " + rel_path}, cookie=cookie) - xpath = self.translate_path(posixpath.join(url_path, filename)) # the absolute path of the file or folder + os_f_path = self.translate_path(posixpath.join(url_path, filename)) - self.log_warning(f'Info Checked "{xpath}" by: {[uid]}') + self.log_warning(f'Info Checked "{os_f_path}" by: {[uid]}') - if not os.path.exists(xpath): + if not os.path.exists(os_f_path): return self.send_json({"head":"Failed", "body":"File/Folder Not Found"}, cookie=cookie) - file_stat = get_stat(xpath) + file_stat = get_stat(os_f_path) if not file_stat: return self.send_json({"head":"Failed", "body":"Permission Denied"}, cookie=cookie) data = [] data.append(["Name", urllib.parse.unquote(filename, errors= 'surrogatepass')]) - if os.path.isfile(xpath): + if os.path.isfile(os_f_path): data.append(["Type","File"]) if "." in filename: data.append(["Extension", filename.rpartition(".")[2]]) @@ -1445,7 +1457,7 @@ def get_info(self: SH, *args, **kwargs): data.append(["Total Size", 'Please Wait']) script = ''' - tools.fetch_json(tools.full_path("''' + path + '''?size_n_count")).then(resp => { + tools.fetch_json(tools.full_path("''' + rel_path + '''?size_n_count")).then(resp => { // console.log(resp); if (resp.status) { size = resp.humanbyte; @@ -1460,7 +1472,7 @@ def get_info(self: SH, *args, **kwargs): }); ''' - data.append(["Path", path]) + data.append(["Path", rel_path]) def get_dt(time): return datetime.datetime.fromtimestamp(time) @@ -1513,7 +1525,7 @@ def new_folder(self: SH, *args, **kwargs): return self.send_json({"head": "Failed", "body": "Permission denied."}, cookie=cookie) - path = kwargs.get('path') + os_path = kwargs.get('path') url_path = kwargs.get('url_path') post = DPD(self) @@ -1524,28 +1536,27 @@ def new_folder(self: SH, *args, **kwargs): filename = form.get_multi_field(verify_name='name', decode=T)[1].strip() - path = self.get_rel_path(filename) + rel_path = self.get_rel_path(filename) - xpath = filename - if xpath.startswith(('../', '..\\', '/../', '\\..\\')) or '/../' in xpath or '\\..\\' in xpath or xpath.endswith(('/..', '\\..')): - return self.send_json({"head": "Failed", "body": "Invalid Path: " + path}, cookie=cookie) + if not self.path_safety_check(filename, rel_path): + return self.send_json({"head": "Failed", "body": "Invalid Path: " + rel_path}, cookie=cookie) - xpath = self.translate_path(posixpath.join(url_path, filename)) + os_f_path = self.translate_path(posixpath.join(url_path, filename)) - self.log_warning(f'New Folder Created "{xpath}" by: {[uid]}') + self.log_warning(f'New Folder Created "{os_f_path}" by: {[uid]}') try: - if os.path.exists(xpath): - return self.send_json({"head": "Failed", "body": "Folder Already Exists: " + path}, cookie=cookie) - if os.path.isfile(xpath): - return self.send_json({"head": "Failed", "body": "File Already Exists: " + path}, cookie=cookie) - os.makedirs(xpath) - return self.send_json({"head": "Success", "body": "New Folder Created: " + path +post.refresh}, cookie=cookie) + if os.path.exists(os_f_path): + return self.send_json({"head": "Failed", "body": "Folder Already Exists: " + rel_path}, cookie=cookie) + if os.path.isfile(os_f_path): + return self.send_json({"head": "Failed", "body": "File Already Exists: " + rel_path}, cookie=cookie) + os.makedirs(os_f_path) + return self.send_json({"head": "Success", "body": "New Folder Created: " + rel_path +post.refresh}, cookie=cookie) except Exception as e: self.log_error(traceback.format_exc()) - return self.send_json({"head": "Failed", "body": f"{ path }
{ e.__class__.__name__ }"}, cookie=cookie) + return self.send_json({"head": "Failed", "body": f"{ rel_path }
{ e.__class__.__name__ }"}, cookie=cookie) diff --git a/dev_src/pyroboxCore.py b/dev_src/pyroboxCore.py index 3ea5df9..0b83254 100644 --- a/dev_src/pyroboxCore.py +++ b/dev_src/pyroboxCore.py @@ -30,7 +30,7 @@ import atexit import os -__version__ = "0.9.3" +__version__ = "0.9.4" enc = "utf-8" __all__ = [ "HTTPServer", "ThreadingHTTPServer", "BaseHTTPRequestHandler", @@ -391,7 +391,7 @@ def URL_MANAGER(url: str): returns a tuple of (`path`, `query_dict`, `fragment`)\n `url` = `'/store?page=10&limit=15&price=ASC#dskjfhs'`\n - `path` = `'/store'`\n + `path` = `'/store'` or `/`\n `query_dict` = `{'page': ['10'], 'limit': ['15'], 'price': ['ASC']}`\n `fragment` = `dskjfhs`\n """ @@ -399,11 +399,18 @@ def URL_MANAGER(url: str): # url = '/store?page=10&limit=15&price#dskjfhs' parse_result = urllib.parse.urlparse(url) + path = parse_result.path + if path == '': + path = '/' + dict_result = Callable_dict(urllib.parse.parse_qs( parse_result.query, keep_blank_values=True)) - return (parse_result.path, dict_result, parse_result.fragment) + return (path, dict_result, parse_result.fragment) +if __name__ == "__main__": + print(URL_MANAGER('https://www.google.com')) + print(URL_MANAGER('https://www.google.com/store?page=10&limit=15&price=ASC#dskjfhs')) class HTTPServer(socketserver.TCPServer): @@ -1431,8 +1438,37 @@ def get_displaypath(self, url_path): return displaypath def get_rel_path(self, filename): - """Return the relative path to the file, FOR OS.""" + """Return the relative path to the file, FOR WEB.""" return urllib.parse.unquote(posixpath.join(self.url_path, filename), errors='surrogatepass') + + def get_web_path(self, path:str, times=1): + """replace current directory with /""" + return path.replace(self.directory, "/", times) + + def path_safety_check(self, paths:Union[str, list], *more_paths:Union[str, list]): + """check if path is safe + paths: list of paths to check""" + if isinstance(paths, str): + paths = [paths] + + if more_paths: + for path in more_paths: + if isinstance(path, str): + paths.append(path) + elif isinstance(path, (list, tuple, set)): + paths += more_paths + else: + raise TypeError(f"Invalid type {type(path)} for path") + + + for path in paths: + if path.startswith(('../', '..\\', '/../', '\\..\\')) or '/../' in path or '\\..\\' in path or path.endswith(('/..', '\\..')): + return False + + return True + + + def translate_path(self, path): """Translate a /-separated PATH to the local filename syntax. diff --git a/run_setup.py b/run_setup.py index d531b11..8204264 100644 --- a/run_setup.py +++ b/run_setup.py @@ -10,7 +10,7 @@ os.system(f"pip uninstall pyrobox -y") os.system(f"pip install ./dist/pyrobox-{version}.tar.gz") -os.system("pyrobox 45454") +# os.system("pyrobox 45454") # post to pypi -# os.system("twine upload dist/*") \ No newline at end of file +os.system("twine upload dist/*") \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index ddc0baa..f067bbe 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = pyrobox -version = 0.9.3 +version = 0.9.4 author = Rasan author_email= wwwqweasd147@gmail.com description = Personal DropBox for Private Network diff --git a/src/pyroboxCore.py b/src/pyroboxCore.py index 2a1b02e..4896a86 100644 --- a/src/pyroboxCore.py +++ b/src/pyroboxCore.py @@ -394,7 +394,7 @@ def URL_MANAGER(url: str): returns a tuple of (`path`, `query_dict`, `fragment`)\n `url` = `'/store?page=10&limit=15&price=ASC#dskjfhs'`\n - `path` = `'/store'`\n + `path` = `'/store'` or `/`\n `query_dict` = `{'page': ['10'], 'limit': ['15'], 'price': ['ASC']}`\n `fragment` = `dskjfhs`\n """ @@ -402,11 +402,18 @@ def URL_MANAGER(url: str): # url = '/store?page=10&limit=15&price#dskjfhs' parse_result = urllib.parse.urlparse(url) + path = parse_result.path + if path == '': + path = '/' + dict_result = Callable_dict(urllib.parse.parse_qs( parse_result.query, keep_blank_values=True)) - return (parse_result.path, dict_result, parse_result.fragment) + return (path, dict_result, parse_result.fragment) +if __name__ == "__main__": + print(URL_MANAGER('https://www.google.com')) + print(URL_MANAGER('https://www.google.com/store?page=10&limit=15&price=ASC#dskjfhs')) class HTTPServer(socketserver.TCPServer): @@ -1434,8 +1441,37 @@ def get_displaypath(self, url_path): return displaypath def get_rel_path(self, filename): - """Return the relative path to the file, FOR OS.""" + """Return the relative path to the file, FOR WEB.""" return urllib.parse.unquote(posixpath.join(self.url_path, filename), errors='surrogatepass') + + def get_web_path(self, path:str, times=1): + """replace current directory with /""" + return path.replace(self.directory, "/", times) + + def path_safety_check(self, paths:Union[str, list], *more_paths:Union[str, list]): + """check if path is safe + paths: list of paths to check""" + if isinstance(paths, str): + paths = [paths] + + if more_paths: + for path in more_paths: + if isinstance(path, str): + paths.append(path) + elif isinstance(path, (list, tuple, set)): + paths += more_paths + else: + raise TypeError(f"Invalid type {type(path)} for path") + + + for path in paths: + if path.startswith(('../', '..\\', '/../', '\\..\\')) or '/../' in path or '\\..\\' in path or path.endswith(('/..', '\\..')): + return False + + return True + + + def translate_path(self, path): """Translate a /-separated PATH to the local filename syntax. diff --git a/src/server.py b/src/server.py index f32e1b5..2bdc433 100644 --- a/src/server.py +++ b/src/server.py @@ -245,7 +245,9 @@ def get_page_type(self: SH, *args, **kwargs): - path = kwargs.get('path', '') + os_path = kwargs.get('path') + url_path = kwargs.get('url_path') + result= "unknown" @@ -264,13 +266,14 @@ def get_page_type(self: SH, *args, **kwargs): elif self.query("czip"): result = "zip" - elif path == "/favicon.ico": + elif url_path == "/favicon.ico": result = "favicon" - elif os.path.isdir(path): + + elif os.path.isdir(os_path): for index in "index.html", "index.htm": - index = os.path.join(path, index) + index = os.path.join(os_path, index) if os.path.exists(index): result = "html" break @@ -281,7 +284,7 @@ def get_page_type(self: SH, *args, **kwargs): else: result = "dir" - elif os.path.isfile(path): + elif os.path.isfile(os_path): result = "file" return self.return_txt(result, cookie=cookie) @@ -502,16 +505,15 @@ def get_size(self: SH, *args, **kwargs): url_path = kwargs.get('url_path', '') + os_path = self.translate_path(url_path) - xpath = self.translate_path(url_path) - - stat = get_stat(xpath) + stat = get_stat(os_path) if not stat: return self.send_json({"status": 0}, cookie=cookie) - if os.path.isfile(xpath): + if os.path.isfile(os_path): size = stat.st_size else: - size = get_dir_size(xpath) + size = get_dir_size(os_path) humanbyte = humanbytes(size) fmbyte = fmbytes(size) @@ -531,16 +533,15 @@ def get_size_n_count(self: SH, *args, **kwargs): url_path = kwargs.get('url_path', '') + os_path = self.translate_path(url_path) - xpath = self.translate_path(url_path) - - stat = get_stat(xpath) + stat = get_stat(os_path) if not stat: return self.send_json({"status": 0}, cookie=cookie) - if os.path.isfile(xpath): + if os.path.isfile(os_path): count, size = 1, stat.st_size else: - count, size = get_tree_count_n_size(xpath) + count, size = get_tree_count_n_size(os_path) humanbyte = humanbytes(size) fmbyte = fmbytes(size) @@ -569,9 +570,8 @@ def get_zip_id(self: SH, *args, **kwargs): return self.return_txt("ERROR: ZIP FEATURE IS UNAVAILABLE !", HTTPStatus.INTERNAL_SERVER_ERROR, cookie=cookie) - path = kwargs.get('path', '') - os_path = self.translate_path(path) - spathsplit = kwargs.get('spathsplit', '') + os_path = kwargs.get('path') + spathsplit = kwargs.get('spathsplit') filename = spathsplit[-2] + ".zip" zid = None @@ -618,6 +618,11 @@ def create_zip(self: SH, *args, **kwargs): return self.return_txt("ERROR: ZIP FEATURE IS UNAVAILABLE !", HTTPStatus.INTERNAL_SERVER_ERROR, cookie=cookie) url_path = kwargs.get('url_path', '') + os_path = self.translate_path(url_path) + + # if not dir or not exists + if not os.path.isdir(os_path): + return self.send_error(HTTPStatus.NOT_FOUND, "Directory not found", cookie=cookie) # dir_size = get_dir_size(path, limit=6*1024*1024*1024) @@ -658,9 +663,8 @@ def get_zip(self: SH, *args, **kwargs): - path = kwargs.get('path', '') - os_path = self.translate_path(path) - spathsplit = kwargs.get('spathsplit', '') + os_path = kwargs.get('path') + spathsplit = kwargs.get('spathsplit') query = self.query msg = False @@ -734,13 +738,14 @@ def send_video_data(self: SH, *args, **kwargs): if not user: # guest or not will be handled in Authentication return self.send_text(pt.login_page(), HTTPStatus.UNAUTHORIZED, cookie=cookie) - path = kwargs.get('path', '') - url_path = kwargs.get('url_path', '') + os_path = kwargs.get('path') + url_path = kwargs.get('url_path') + vid_source = url_path - content_type = self.guess_type(path) + content_type = self.guess_type(os_path) if not content_type.startswith('video/'): self.send_error(HTTPStatus.NOT_FOUND, "THIS IS NOT A VIDEO FILE", cookie=cookie) @@ -776,11 +781,11 @@ def send_video_page(self: SH, *args, **kwargs): return self.send_text(pt.login_page(), HTTPStatus.UNAUTHORIZED, cookie=cookie) - path = kwargs.get('path', '') - url_path = kwargs.get('url_path', '') + os_path = kwargs.get('path') + url_path = kwargs.get('url_path') - vid_source = url_path - content_type = self.guess_type(path) + # vid_source = url_path + content_type = self.guess_type(os_path) if not content_type.startswith('video/'): self.send_error(HTTPStatus.NOT_FOUND, "THIS IS NOT A VIDEO FILE", cookie=cookie) @@ -819,8 +824,8 @@ def send_video_page(self: SH, *args, **kwargs): # return None -# path = kwargs.get('path', '') -# spathsplit = kwargs.get('spathsplit', '') +# path = kwargs.get('path') +# spathsplit = kwargs.get('spathsplit') # path = CoreConfig.ASSETS_dir + "/".join(spathsplit[2:]) # # print("USING ASSETS", path) @@ -927,7 +932,7 @@ def get_folder_data(self: SH, *args, **kwargs): }, cookie=cookie) - path = kwargs.get('path', '') + os_path = kwargs.get('path') if not user.VIEW: return self.send_json({ @@ -938,9 +943,11 @@ def get_folder_data(self: SH, *args, **kwargs): }, cookie=cookie) - is_dir = None try: - is_dir = os.path.isdir(path) + if not os.path.isdir(os_path): + return self.send_json({"status": 0, + "warning": "Folder not found"}, cookie=cookie) + except Exception as e: err = traceback.format_exc() return self.send_json({ @@ -950,11 +957,7 @@ def get_folder_data(self: SH, *args, **kwargs): }) - if is_dir is None: - return self.send_json({"status": 0, - "warning": "Folder not found"}, cookie=cookie) - - data = list_directory(self, path, user, cookie=cookie) + data = list_directory(self, os_path, user, cookie=cookie) if data: return self.send_json(data, cookie=cookie) @@ -973,9 +976,9 @@ def default_get(self: SH, filename=None, *args, **kwargs): return self.redirect("?login") - path = kwargs.get('path', '') + os_path = kwargs.get('path') - if os.path.isdir(path): + if os.path.isdir(os_path): parts = urllib.parse.urlsplit(self.path) if not parts.path.endswith('/'): # redirect browser - doing basically what apache does @@ -988,12 +991,12 @@ def default_get(self: SH, filename=None, *args, **kwargs): self.end_headers() return None for index in "index.html", "index.htm": - index = os.path.join(path, index) + index = os.path.join(os_path, index) if os.path.exists(index): - path = index + os_path = index break else: - return list_directory_html(self, path, user, cookie=cookie) + return list_directory_html(self, os_path, user, cookie=cookie) # check for trailing "/" which should return 404. See Issue17324 # The test for this was added in test_httpserver.py @@ -1001,7 +1004,7 @@ def default_get(self: SH, filename=None, *args, **kwargs): # See discussion on python-dev and Issue34711 regarding # parsing and rejection of filenames with a trailing slash - if path.endswith("/"): + if os_path.endswith("/"): self.send_error(HTTPStatus.NOT_FOUND, "File not found", cookie=cookie) return None @@ -1012,9 +1015,9 @@ def default_get(self: SH, filename=None, *args, **kwargs): if (not user.DOWNLOAD) or user.NOPERMISSION: return self.send_error(HTTPStatus.SERVICE_UNAVAILABLE, "Download is disabled", cookie=cookie) - if not os.path.exists(path): + if not os.path.exists(os_path): return self.send_error(HTTPStatus.NOT_FOUND, "File not found", cookie=cookie) - return self.return_file(path, filename, cookie=cookie) + return self.return_file(os_path, filename, cookie=cookie) @@ -1125,7 +1128,7 @@ def upload(self: SH, *args, **kwargs): return self.send_txt("Upload not allowed", HTTPStatus.SERVICE_UNAVAILABLE, cookie=cookie) - path = kwargs.get('path') + os_path = kwargs.get('path') url_path = kwargs.get('url_path') @@ -1161,14 +1164,16 @@ def upload(self: SH, *args, **kwargs): return self.send_error(HTTPStatus.BAD_REQUEST, "Can't find out file name...", cookie=cookie) - path = self.translate_path(self.path) rltv_path = posixpath.join(url_path, fn) + + if not self.path_safety_check(fn, rltv_path): + return self.send_txt("Invalid Path: " + rltv_path, HTTPStatus.BAD_REQUEST, cookie=cookie) - temp_fn = os.path.join(path, ".LStemp-"+fn +'.tmp') + temp_fn = os.path.join(os_path, ".LStemp-"+fn +'.tmp') CoreConfig.temp_file.add(temp_fn) - fn = os.path.join(path, fn) + os_f_path = os.path.join(os_path, fn) @@ -1195,12 +1200,12 @@ def upload(self: SH, *args, **kwargs): preline = line - while (not user.MODIFY) and os.path.isfile(fn): + while (not user.MODIFY) and os.path.isfile(os_f_path): n = 1 - name, ext = os.path.splitext(fn) + name, ext = os.path.splitext(os_f_path) fn = f"{name}({n}){ext}" n += 1 - os.replace(temp_fn, fn) + os.replace(temp_fn, os_f_path) @@ -1237,7 +1242,6 @@ def del_2_recycle(self: SH, *args, **kwargs): if user.NOPERMISSION or (not user.DELETE): return self.send_json({"head": "Failed", "body": "You have no permission to delete."}, cookie=cookie) - path = kwargs.get('path') url_path = kwargs.get('url_path') @@ -1255,16 +1259,20 @@ def del_2_recycle(self: SH, *args, **kwargs): # File link to move to recycle bin filename = form.get_multi_field(verify_name='name', decode=T)[1].strip() - path = self.get_rel_path(filename) - xpath = self.translate_path(posixpath.join(url_path, filename)) + rel_path = self.get_rel_path(filename) + + if not self.path_safety_check(filename, rel_path): + return self.send_json({"head": "Failed", "body": "Invalid Path: " + rel_path}, cookie=cookie) + + os_f_path = self.translate_path(posixpath.join(url_path, filename)) - self.log_warning(f'<-send2trash-> {xpath} by {[uid]}') + self.log_warning(f'<-send2trash-> {os_f_path} by {[uid]}') head = "Failed" try: if CoreConfig.OS == 'Android': raise InterruptedError - send2trash(xpath) + send2trash(os_f_path) msg = "Successfully Moved To Recycle bin"+ post.refresh head = "Success" except TrashPermissionError: @@ -1273,7 +1281,7 @@ def del_2_recycle(self: SH, *args, **kwargs): msg = "Recycling unavailable! Try deleting permanently..." except Exception as e: traceback.print_exc() - msg = "" + path + " " + e.__class__.__name__ + msg = "" + rel_path + " " + e.__class__.__name__ return self.send_json({"head": head, "body": msg}, cookie=cookie) @@ -1293,7 +1301,6 @@ def del_permanently(self: SH, *args, **kwargs): if user.NOPERMISSION or (not user.DELETE): return self.send_json({"head": "Failed", "body": "Recycling unavailable! Try deleting permanently..."}, cookie=cookie) - path = kwargs.get('path') url_path = kwargs.get('url_path') @@ -1307,23 +1314,27 @@ def del_permanently(self: SH, *args, **kwargs): # File link to move to recycle bin filename = form.get_multi_field(verify_name='name', decode=T)[1].strip() - path = self.get_rel_path(filename) - xpath = self.translate_path(posixpath.join(url_path, filename)) + rel_path = self.get_rel_path(filename) + + if not self.path_safety_check(filename, rel_path): + return self.send_json({"head": "Failed", "body": "Invalid Path: " + rel_path}, cookie=cookie) - self.log_warning(f'Perm. DELETED {xpath} by {[uid]}') + os_f_path = self.translate_path(posixpath.join(url_path, filename)) + + self.log_warning(f'Perm. DELETED {os_f_path} by {[uid]}') try: - if os.path.isfile(xpath): os.remove(xpath) - else: shutil.rmtree(xpath) + if os.path.isfile(os_f_path): os.remove(os_f_path) + else: shutil.rmtree(os_f_path, ignore_errors=True) - return self.send_json({"head": "Success", "body": "PERMANENTLY DELETED " + path + post.refresh}, cookie=cookie) + return self.send_json({"head": "Success", "body": "PERMANENTLY DELETED " + rel_path + post.refresh}, cookie=cookie) except Exception as e: traceback.print_exc() - return self.send_json({"head": "Failed", "body": "" + path + "" + e.__class__.__name__}, cookie=cookie) + return self.send_json({"head": "Failed", "body": "" + rel_path + "" + e.__class__.__name__}, cookie=cookie) @@ -1342,7 +1353,6 @@ def rename_content(self: SH, *args, **kwargs): return self.send_json({"head": "Failed", "body": "Renaming is disabled."}, cookie=cookie) - path = kwargs.get('path') url_path = kwargs.get('url_path') @@ -1355,26 +1365,31 @@ def rename_content(self: SH, *args, **kwargs): # File link to move to recycle bin + filename = form.get_multi_field(verify_name='name', decode=T)[1].strip() - new_name = form.get_multi_field(verify_name='data', decode=T)[1].strip() - path = self.get_rel_path(filename) + rel_path = self.get_rel_path(filename) + new_rel_path = self.get_rel_path(new_name) + if not self.path_safety_check(filename, new_name, rel_path, new_rel_path): + return self.send_json({"head": "Failed", "body": "Invalid Path: " + rel_path}, cookie=cookie) - xpath = self.translate_path(posixpath.join(url_path, filename)) + os_f_path = self.translate_path(posixpath.join(url_path, filename)) + os_new_f_path = self.translate_path(posixpath.join(url_path, new_name)) + + filename = form.get_multi_field(verify_name='name', decode=T)[1].strip() - new_path = self.translate_path(posixpath.join(url_path, new_name)) - self.log_warning(f'Renamed "{xpath}" to "{new_path}" by {[uid]}') + self.log_warning(f'Renamed "{os_f_path}" to "{os_new_f_path}" by {[uid]}') try: - os.rename(xpath, new_path) + os.rename(os_f_path, os_new_f_path) return self.send_json({"head": "Renamed Successfully", "body": post.refresh}, cookie=cookie) except Exception as e: - return self.send_json({"head": "Failed", "body": "" + path + "
" + e.__class__.__name__ + " : " + str(e) }, cookie=cookie) + return self.send_json({"head": "Failed", "body": "" + rel_path + "
" + e.__class__.__name__ + " : " + self.get_web_path(str(e), -1) }, cookie=cookie) @@ -1392,7 +1407,7 @@ def get_info(self: SH, *args, **kwargs): if user.NOPERMISSION: return self.send_json({"head": "Failed", "body": "You have no permission to view."}, cookie=cookie) - path = kwargs.get('path') + os_path = kwargs.get('path') url_path = kwargs.get('url_path') script = None @@ -1405,30 +1420,31 @@ def get_info(self: SH, *args, **kwargs): form = post.form - - - # File link to move to check info + filename = form.get_multi_field(verify_name='name', decode=T)[1].strip() - path = self.get_rel_path(filename) # the relative path of the file or folder + rel_path = self.get_rel_path(filename) + + if not self.path_safety_check(filename, rel_path): + return self.send_json({"head": "Failed", "body": "Invalid Path: " + rel_path}, cookie=cookie) - xpath = self.translate_path(posixpath.join(url_path, filename)) # the absolute path of the file or folder + os_f_path = self.translate_path(posixpath.join(url_path, filename)) - self.log_warning(f'Info Checked "{xpath}" by: {[uid]}') + self.log_warning(f'Info Checked "{os_f_path}" by: {[uid]}') - if not os.path.exists(xpath): + if not os.path.exists(os_f_path): return self.send_json({"head":"Failed", "body":"File/Folder Not Found"}, cookie=cookie) - file_stat = get_stat(xpath) + file_stat = get_stat(os_f_path) if not file_stat: return self.send_json({"head":"Failed", "body":"Permission Denied"}, cookie=cookie) data = [] data.append(["Name", urllib.parse.unquote(filename, errors= 'surrogatepass')]) - if os.path.isfile(xpath): + if os.path.isfile(os_f_path): data.append(["Type","File"]) if "." in filename: data.append(["Extension", filename.rpartition(".")[2]]) @@ -1445,7 +1461,7 @@ def get_info(self: SH, *args, **kwargs): data.append(["Total Size", 'Please Wait']) script = ''' - tools.fetch_json(tools.full_path("''' + path + '''?size_n_count")).then(resp => { + tools.fetch_json(tools.full_path("''' + rel_path + '''?size_n_count")).then(resp => { // console.log(resp); if (resp.status) { size = resp.humanbyte; @@ -1460,7 +1476,7 @@ def get_info(self: SH, *args, **kwargs): }); ''' - data.append(["Path", path]) + data.append(["Path", rel_path]) def get_dt(time): return datetime.datetime.fromtimestamp(time) @@ -1513,7 +1529,7 @@ def new_folder(self: SH, *args, **kwargs): return self.send_json({"head": "Failed", "body": "Permission denied."}, cookie=cookie) - path = kwargs.get('path') + os_path = kwargs.get('path') url_path = kwargs.get('url_path') post = DPD(self) @@ -1524,28 +1540,27 @@ def new_folder(self: SH, *args, **kwargs): filename = form.get_multi_field(verify_name='name', decode=T)[1].strip() - path = self.get_rel_path(filename) + rel_path = self.get_rel_path(filename) - xpath = filename - if xpath.startswith(('../', '..\\', '/../', '\\..\\')) or '/../' in xpath or '\\..\\' in xpath or xpath.endswith(('/..', '\\..')): - return self.send_json({"head": "Failed", "body": "Invalid Path: " + path}, cookie=cookie) + if not self.path_safety_check(filename, rel_path): + return self.send_json({"head": "Failed", "body": "Invalid Path: " + rel_path}, cookie=cookie) - xpath = self.translate_path(posixpath.join(url_path, filename)) + os_f_path = self.translate_path(posixpath.join(url_path, filename)) - self.log_warning(f'New Folder Created "{xpath}" by: {[uid]}') + self.log_warning(f'New Folder Created "{os_f_path}" by: {[uid]}') try: - if os.path.exists(xpath): - return self.send_json({"head": "Failed", "body": "Folder Already Exists: " + path}, cookie=cookie) - if os.path.isfile(xpath): - return self.send_json({"head": "Failed", "body": "File Already Exists: " + path}, cookie=cookie) - os.makedirs(xpath) - return self.send_json({"head": "Success", "body": "New Folder Created: " + path +post.refresh}, cookie=cookie) + if os.path.exists(os_f_path): + return self.send_json({"head": "Failed", "body": "Folder Already Exists: " + rel_path}, cookie=cookie) + if os.path.isfile(os_f_path): + return self.send_json({"head": "Failed", "body": "File Already Exists: " + rel_path}, cookie=cookie) + os.makedirs(os_f_path) + return self.send_json({"head": "Success", "body": "New Folder Created: " + rel_path +post.refresh}, cookie=cookie) except Exception as e: self.log_error(traceback.format_exc()) - return self.send_json({"head": "Failed", "body": f"{ path }
{ e.__class__.__name__ }"}, cookie=cookie) + return self.send_json({"head": "Failed", "body": f"{ rel_path }
{ e.__class__.__name__ }"}, cookie=cookie)