Skip to content

Commit

Permalink
Merge pull request #47 from CSCfi/feature/in-memory-temp-key
Browse files Browse the repository at this point in the history
Feature/in memory temp key
  • Loading branch information
teemukataja authored Dec 22, 2023
2 parents 6546461 + c27d0cd commit 2b9e896
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 96 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ dist/
*.pub
*.spec
sda_uploader.egg-info/

# development files
venv/
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
crypt4gh==1.6
paramiko==3.3.1
paramiko==3.4.0
pynacl==1.5.0
45 changes: 4 additions & 41 deletions sda_uploader/cli.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""SDA Uploader CLI."""

import sys
import secrets
import getpass
import argparse

Expand All @@ -10,7 +9,8 @@

from typing import Optional, Sequence, Tuple, Union

from crypt4gh.keys import c4gh, get_private_key, get_public_key
from crypt4gh.keys import get_private_key, get_public_key
from nacl.public import PrivateKey

from .sftp import _sftp_connection, _sftp_upload_file, _sftp_upload_directory, _sftp_client
from . import __version__
Expand All @@ -21,43 +21,13 @@ def mock_callback(password: str) -> str:
return password


def _remove_file(filepath: str) -> None:
"""Remove temp files."""
try:
Path(filepath).unlink()
print(f"Removed temp file {filepath}")
except FileNotFoundError:
print(f"Deletion of file {filepath} failed")
pass
except PermissionError:
print(f"No permission to delete {filepath}. Please do manual cleanup.")
pass
except Exception:
print(f"Unexpected {Exception}, {type(Exception)}")
pass


def generate_one_time_key() -> Tuple:
"""Generate one time Crypt4GH encryption key."""
random_password = secrets.token_hex(16)
private_key_file = f"{getpass.getuser()}_temporary_crypt4gh.key"
public_key_file = f"{getpass.getuser()}_temporary_crypt4gh.pub"
# Remove existing temp keys if they exist
_remove_file(private_key_file)
_remove_file(public_key_file)

c4gh.generate(private_key_file, public_key_file, passphrase=str.encode(random_password))
print("One-time use encryption key generated.")
return private_key_file, public_key_file, random_password


def load_encryption_keys(
private_key_file: Union[str, Path] = "",
private_key_password: Optional[str] = None,
public_key_file: Union[str, Path] = "",
) -> Tuple:
"""Load encryption keys."""
private_key = ""
private_key = b""
if private_key_file:
# If using user's own crypt4gh private key
try:
Expand All @@ -66,14 +36,7 @@ def load_encryption_keys(
sys.exit(f"Incorrect password for {private_key_file}")
else:
# If using generated one-time encryption key
temp_private_key, temp_public_key, temp_private_key_password = generate_one_time_key()
try:
private_key = get_private_key(temp_private_key, partial(mock_callback, temp_private_key_password))
_remove_file(temp_private_key)
_remove_file(temp_public_key)

except Exception:
sys.exit(f"Incorrect password for {private_key}. This is likely a bug.") # generated password, should not fail
private_key = bytes(PrivateKey.generate())
public_key = get_public_key(public_key_file)
return private_key, public_key

Expand Down
2 changes: 1 addition & 1 deletion sda_uploader/encrypt.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from pathlib import Path


def encrypt_file(file: Union[str, Path] = "", private_key_file: Union[str, Path] = "", recipient_public_key: Union[str, Path] = "") -> None:
def encrypt_file(file: Union[str, Path] = "", private_key_file: Union[bytes, Path] = b"", recipient_public_key: Union[str, Path] = "") -> None:
"""Encrypt a file with Crypt4GH."""
print(f"Encrypting {file} as {file}.c4gh")
original_file = open(file, "rb")
Expand Down
58 changes: 7 additions & 51 deletions sda_uploader/gui.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
import os
import sys
import json
import getpass
import secrets
import tkinter as tk
from typing import Dict, Tuple, Union
from typing import Dict, Union
import paramiko

from tkinter.simpledialog import askstring
Expand All @@ -17,7 +15,8 @@
from os import chmod
from stat import S_IRWXU

from crypt4gh.keys import c4gh, get_private_key, get_public_key
from crypt4gh.keys import get_public_key
from nacl.public import PrivateKey

from .sftp import _sftp_connection, _sftp_upload_file, _sftp_upload_directory, _sftp_client
from pathlib import Path
Expand Down Expand Up @@ -195,12 +194,7 @@ def open_file(self, action: str) -> None:
else:
print(f"Unknown action: {action}")

def _do_upload(self, private_key: str, password: str) -> None:
try:
private_key = get_private_key(private_key, partial(self.mock_callback, password))
except Exception:
print("Incorrect private key passphrase")
return
def _do_upload(self, private_key: bytes) -> None:
# Ask for RSA key password
sftp_password = self.passwords["sftp_key"]
while len(sftp_password) == 0:
Expand Down Expand Up @@ -245,50 +239,12 @@ def _do_upload(self, private_key: str, password: str) -> None:
def _start_process(self) -> None:
if self.their_key_value.get() and self.file_value.get() and self.sftp_username_value.get() and self.sftp_server_value.get():
# Generate random encryption key
temp_private_key, temp_public_key, temp_password = self._generate_one_time_key()
temp_private_key = bytes(PrivateKey.generate())
# Encrypt and upload
self._do_upload(temp_private_key, temp_password)
# Remove temp keys
self._remove_file(temp_private_key)
self._remove_file(temp_public_key)
self._do_upload(temp_private_key)
else:
print("All fields must be filled")

def _remove_file(self, filepath: str) -> None:
"""Remove temp files."""
try:
Path(filepath).unlink()
print(f"Removed temp file {filepath}")
except FileNotFoundError:
print(f"Deletion of file {filepath} failed")
pass
except PermissionError:
print(f"No permission to delete {filepath}. Please do manual cleanup.")
pass
except Exception:
print(f"Unexpected {Exception}, {type(Exception)}")
pass

def _generate_one_time_key(self) -> Tuple:
"""Generate one time Crypt4GH encryption key."""
random_password = secrets.token_hex(16)
private_key_file = f"{getpass.getuser()}_temporary_crypt4gh.key"
public_key_file = f"{getpass.getuser()}_temporary_crypt4gh.pub"
# Remove existing temp keys if they exist
self._remove_file(private_key_file)
self._remove_file(public_key_file)
try:
c4gh.generate(private_key_file, public_key_file, passphrase=str.encode(random_password))
print("One-time use encryption key generated")
except PermissionError:
print("A previous generated key exists under the name private_key_file already exists remove it and try again.")

return private_key_file, public_key_file, random_password

def mock_callback(self, password: str) -> str:
"""Mock callback to return password."""
return password

def write_config(self) -> None:
"""Save field values for re-runs."""
data = {
Expand Down Expand Up @@ -322,7 +278,7 @@ def sftp_upload(
self,
sftp: paramiko.SFTPClient,
target: Union[str, Path] = "",
private_key: Union[str, Path] = "",
private_key: Union[bytes, Path] = b"",
public_key: Union[str, Path] = "",
) -> None:
"""Upload file or directory."""
Expand Down
4 changes: 2 additions & 2 deletions sda_uploader/sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def _sftp_upload_file(
sftp: paramiko.SFTPClient,
source: Union[str, Path] = "",
destination: Union[str, Path] = "",
private_key: Union[str, Path] = "",
private_key: Union[bytes, Path] = b"",
public_key: Union[str, Path] = "",
) -> None:
"""Upload a single file."""
Expand All @@ -86,7 +86,7 @@ def _sftp_upload_file(
def _sftp_upload_directory(
sftp: paramiko.SFTPClient,
directory: Union[str, Path] = "",
private_key: Union[str, Path] = "",
private_key: Union[bytes, Path] = b"",
public_key: Union[str, Path] = "",
) -> None:
"""Upload directory."""
Expand Down

0 comments on commit 2b9e896

Please sign in to comment.