Skip to content

Commit

Permalink
Merge pull request #173 from olcf/ngin_interpreter_refresh
Browse files Browse the repository at this point in the history
interpreter reload
  • Loading branch information
Noah Ginsburg authored May 6, 2020
2 parents f3b6597 + 68b3694 commit abda04b
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 205 deletions.
170 changes: 32 additions & 138 deletions libpkpass/commands/command.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
"""This module is a generic for all pkpass commands"""
import sys
import getpass
import json
import os
import yaml
import libpkpass.util as util
from sys import stdout
from getpass import getpass
from os import getcwd, path, sep, remove, rename
from libpkpass.commands.arguments import ARGUMENTS as arguments
from libpkpass.password import PasswordEntry
from libpkpass.identities import IdentityDB
from libpkpass.crypto import print_card_info
from libpkpass.errors import NullRecipientError, CliArgumentError, FileOpenError, GroupDefinitionError,\
PasswordIOError, JsonArgumentError, NotThePasswordOwnerError, ConfigParseError
from libpkpass.errors import NullRecipientError, CliArgumentError, GroupDefinitionError,\
PasswordIOError, NotThePasswordOwnerError
from libpkpass.identities import IdentityDB
from libpkpass.password import PasswordEntry
from libpkpass.util import collect_args, color_prepare

##########################################################################
class Command():
Expand All @@ -29,27 +27,7 @@ def __init__(self, cli, iddb=None):
##################################################################
self.cli = cli
#default certpath to none because connect string is allowed
self.args = {
'ignore_decrypt': False,
'identity': getpass.getuser(),
'cabundle': './certs/ca-bundle',
'keypath': './private',
'pwstore': './passwords',
'time': 10,
'card_slot': None,
'certpath': None,
'escrow_users': None,
'min_escrow': None,
'no_cache': False,
'noverify': None,
'noescrow': False,
'overwrite': False,
'recovery': False,
'rules': 'default',
'theme_map': None,
'color': True,
'verbosity': 0,
}
self.args = {}
self.recipient_list = []
self.escrow_and_recipient_list = []
self.iddbcached = iddb is not None
Expand All @@ -72,67 +50,13 @@ def run(self, parsedargs):
self._run_command_setup(parsedargs)
self._run_command_execution()

##################################################################
def _convert_strings_to_list(self, argname):
""" convert argparsed strings to lists for an argument """
##################################################################
if argname in self.args:
self.args[argname] = self.args[argname].split(",") if self.args[argname] else []
self.args[argname] = [arg.strip() for arg in self.args[argname] if arg.strip()]
elif isinstance(argname, str):
argname = argname.split(",") if argname else []
return [arg.strip() for arg in argname if arg.strip()]
return None

##################################################################
def _handle_boolean_args(self, argname):
##################################################################
if isinstance(self.args[argname], str):
return self.args[argname].upper() == 'TRUE'
return self.args[argname]

##################################################################
def _handle_filepath_args(self):
"""Does filepath expansion for config args"""
##################################################################
file_path_args = ['cabundle', 'pwstore', 'certpath', 'keypath']
for arg in file_path_args:
if arg in self.args and self.args[arg]:
self.args[arg] = os.path.expanduser(self.args[arg])
if 'connect' in self.args and self.args['connect'] and \
'base_directory' in self.args['connect'] and self.args['connect']['base_directory']:
self.args['connect']['base_directory'] = os.path.expanduser(self.args['connect']['base_directory'])

##################################################################
def _run_command_setup(self, parsedargs):
""" Passes the argparse Namespace object of parsed arguments """
##################################################################

# Build a dict out of the argparse args Namespace object and a dict from any
# configuration files and merge the two with cli taking priority
cli_args = vars(parsedargs)

config_args = self._get_config_args(cli_args['config'], cli_args)
self.args.update(config_args)

connectmap = self._parse_json_arguments('connect')
self._handle_filepath_args()

fles = ['cabundle', 'pwstore']
for key, value in cli_args.items():
if value is not None or key not in self.args:
self.args[key] = value
if key in fles and not os.path.exists(self.args[key]):
raise FileOpenError(self.args[key], "No such file or directory")

# json args
self.args['color'] = self._handle_boolean_args('color')
self._convert_strings_to_list('groups')
self._convert_strings_to_list('users')
self._convert_strings_to_list('escrow_users')
self.args = collect_args(parsedargs)
self._validate_combinatorial_args()
self._validate_args()

verify_on_load = self.args['subparser_name'] in ['listrecipients', 'import', 'interpreter']

# Build the list of recipients that this command will act on
Expand All @@ -144,10 +68,11 @@ def _run_command_setup(self, parsedargs):
self.identities.load_certs_from_directory(
self.args['certpath'],
verify_on_load=verify_on_load,
connectmap=connectmap,
connectmap=self.args['connect'],
nocache=self.args['no_cache']
)
self.identities.load_keys_from_directory(self.args['keypath'])
if self.args['keypath']:
self.identities.load_keys_from_directory(self.args['keypath'])
self._validate_identities()

if 'pwname' in self.args and self.args['pwname']:
Expand All @@ -160,25 +85,25 @@ def _run_command_setup(self, parsedargs):
self.args['verbosity'],
self.args['color'],
self.args['theme_map'])
self.passphrase = getpass.getpass("Enter Pin/Passphrase: ")
self.passphrase = getpass("Enter Pin/Passphrase: ")

####################################################################
def _resolve_directory_path(self):
"""This handles how a user inputs the pwname, this tries to be smart
good luck everybody else"""
####################################################################
pwd = os.getcwd()
pwd_pwname = os.path.normpath(os.path.join(pwd, self.args['pwname']))
pwd = getcwd()
pwd_pwname = path.normpath(path.join(pwd, self.args['pwname']))
if self.args['pwstore'] in pwd_pwname:
self.args['pwname'] = pwd_pwname.replace(self.args['pwstore'] + os.sep, '')
self.args['pwname'] = pwd_pwname.replace(self.args['pwstore'] + sep, '')

##################################################################
def safety_check(self):
""" This provides a sanity check that you are the owner of a password."""
##################################################################
try:
password = PasswordEntry()
password.read_password_data(os.path.join(self.args['pwstore'], self.args['pwname']))
password.read_password_data(path.join(self.args['pwstore'], self.args['pwname']))
return (self.args['identity'] in password['recipients'].keys(), password['metadata']['creator'])
except PasswordIOError:
return (True, None)
Expand All @@ -188,7 +113,7 @@ def update_pass(self, pass_value):
"""Fully updated a password record"""
##################################################################
pass_entry = PasswordEntry()
pass_entry.read_password_data(os.path.join(self.args['pwstore'], self.args['pwname']))
pass_entry.read_password_data(path.join(self.args['pwstore'], self.args['pwname']))
swap_pass = PasswordEntry()
swap_pass.add_recipients(secret=pass_value,
distributor=self.args['identity'],
Expand All @@ -199,7 +124,7 @@ def update_pass(self, pass_value):
pwstore=self.args['pwstore']
)
pass_entry['recipients'][self.args['identity']] = swap_pass['recipients'][self.args['identity']]
pass_entry.write_password_data(os.path.join(self.args['pwstore'], self.args['pwname']),
pass_entry.write_password_data(path.join(self.args['pwstore'], self.args['pwname']),
overwrite=self.args['overwrite'])

##################################################################
Expand Down Expand Up @@ -230,7 +155,7 @@ def create_pass(self, password1, description, authorizer, recipient_list=None):
pwstore=self.args['pwstore']
)

password.write_password_data(os.path.join(self.args['pwstore'], self.args['pwname']),
password.write_password_data(path.join(self.args['pwstore'], self.args['pwname']),
overwrite=self.args['overwrite'])

##################################################################
Expand All @@ -250,20 +175,20 @@ def create_or_update_pass(self, password1, description, authorizer, recipient_li
def delete_pass(self):
"""This deletes a password that the user has created, useful for testing"""
##################################################################
filepath = os.path.join(self.args['pwstore'], self.args['pwname'])
filepath = path.join(self.args['pwstore'], self.args['pwname'])
try:
os.remove(filepath)
remove(filepath)
except OSError:
raise PasswordIOError("Password '%s' not found" % self.args['pwname'])

##################################################################
def rename_pass(self):
"""This renames a password that the user has created"""
##################################################################
oldpath = os.path.join(self.args['pwstore'], self.args['pwname'])
newpath = os.path.join(self.args['pwstore'], self.args['rename'])
oldpath = path.join(self.args['pwstore'], self.args['pwname'])
newpath = path.join(self.args['pwstore'], self.args['rename'])
try:
os.rename(oldpath, newpath)
rename(oldpath, newpath)
password = PasswordEntry()
password.read_password_data(newpath)
password['metadata']['name'] = self.args['rename']
Expand Down Expand Up @@ -307,23 +232,6 @@ def _parse_group_membership(self):
except KeyError as err:
raise GroupDefinitionError(str(err))

##################################################################
def _get_config_args(self, config, cli_args):
"""Return the configuration from the config file"""
##################################################################
try:
with open(config, 'r') as fname:
config_args = yaml.safe_load(fname)
if config_args is None:
config_args = {}
return config_args
except IOError:
if cli_args['verbosity'] != -1:
print("INFO: No .pkpassrc file found")
return {}
except yaml.parser.ParserError:
raise ConfigParseError("Parsing error with config file, please check syntax")

##################################################################
def _validate_args(self):
##################################################################
Expand All @@ -341,7 +249,7 @@ def _validate_combinatorial_args(self):
# we want a multi-dim of lists, this way if more combinations come up
# that would be required in a 1 or more capacity, we just add
# a list to this list
args_list = [['certpath', 'connect'], ['certpath', 'keypath']]
args_list = [['certpath', 'connect']]
for arg_set in args_list:
valid = False
for arg in arg_set:
Expand All @@ -352,19 +260,6 @@ def _validate_combinatorial_args(self):
raise CliArgumentError(
"'%s' or '%s' is required" % tuple(arg_set))

##################################################################
def _parse_json_arguments(self, argument):
""" Parses the json.loads arguments as dictionaries to use"""
##################################################################
try:
if argument in self.args and self.args[argument]:
if isinstance(self.args[argument], dict):
return self.args[argument]
return json.loads(self.args[argument])
return None
except ValueError as err:
raise JsonArgumentError(argument, err)

##################################################################
def _validate_identities(self, swap_list=None):
"""Ensure identities meet criteria for processing"""
Expand Down Expand Up @@ -395,10 +290,9 @@ def _print_debug(self):
def color_print(self, string, color_type):
"""Handle the color printing for objects"""
##################################################################
return util.color_prepare(string,
color_type,
self.args['color'],
self.args['theme_map'])
return color_prepare(string, color_type,
self.args['color'],
self.args['theme_map'])

##################################################################
def progress_bar(self, value, endvalue, bar_length=50):
Expand All @@ -407,5 +301,5 @@ def progress_bar(self, value, endvalue, bar_length=50):
percent = float(value) / endvalue
arrow = '-' * int(round(percent * bar_length)-1) + '>'
spaces = ' ' * (bar_length - len(arrow))
sys.stdout.write("\rPercent: [{0}] {1}%".format(arrow + spaces, int(round(percent * 100))))
sys.stdout.flush()
stdout.write("\rPercent: [{0}] {1}%".format(arrow + spaces, int(round(percent * 100))))
stdout.flush()
3 changes: 2 additions & 1 deletion libpkpass/commands/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import exrex
from libpkpass.commands.command import Command
from libpkpass.errors import CliArgumentError, NotThePasswordOwnerError, RulesMapError
from libpkpass.util import parse_json_arguments

####################################################################
class Generate(Command):
Expand Down Expand Up @@ -45,7 +46,7 @@ def _generate_pass(self):
#######################################################################
def _validate_args(self):
#######################################################################
self.args['rules_map'] = self._parse_json_arguments('rules_map')
self.args['rules_map'] = parse_json_arguments(self.args, 'rules_map')
for argument in ['pwname', 'keypath', 'rules_map']:
if argument not in self.args or self.args[argument] is None:
raise CliArgumentError(
Expand Down
Loading

0 comments on commit abda04b

Please sign in to comment.