diff --git a/roundup/cgi/client.py b/roundup/cgi/client.py index 739f5e7b..71fb404c 100644 --- a/roundup/cgi/client.py +++ b/roundup/cgi/client.py @@ -442,11 +442,11 @@ def __init__(self, instance, request, env, form=None, translator=None): # will never arrive. # If not defined, set CONTENT_LENGTH to 0 so it doesn't # hang reading the data. - if self.env['REQUEST_METHOD'] in ['OPTIONS', 'DELETE', 'PATCH']: - if 'CONTENT_LENGTH' not in self.env: - self.env['CONTENT_LENGTH'] = 0 - logger.debug("Setting CONTENT_LENGTH to 0 for method: %s", - self.env['REQUEST_METHOD']) + if self.env['REQUEST_METHOD'] in ['OPTIONS', 'DELETE', 'PATCH'] \ + and 'CONTENT_LENGTH' not in self.env: + self.env['CONTENT_LENGTH'] = 0 + logger.debug("Setting CONTENT_LENGTH to 0 for method: %s", + self.env['REQUEST_METHOD']) # cgi.FieldStorage must save all data as # binary/bytes. Subclass BinaryFieldStorage does this. @@ -496,8 +496,7 @@ def __init__(self, instance, request, env, form=None, translator=None): def _gen_nonce(self): """ generate a unique nonce """ - n = b2s(base64.b32encode(random_.token_bytes(40))) - return n + return b2s(base64.b32encode(random_.token_bytes(40))) def setTranslator(self, translator=None): """Replace the translation engine @@ -694,7 +693,8 @@ def handle_rest(self): if (self.is_cors_preflight()): self.handle_preflight() return - elif not self.db.security.hasPermission('Rest Access', self.userid): + + if not self.db.security.hasPermission('Rest Access', self.userid): output = s2b('{ "error": { "status": 403, "msg": "Forbidden." } }') self.reject_request(output, message_type="application/json", @@ -811,14 +811,11 @@ def inner_main(self): self.form_wins = True self._error_message = msg.args - if csrf_ok: - # csrf checks pass. Run actions etc. - # possibly handle a form submit action (may change - # self.classname and self.template, and may also - # append error/ok_messages) - html = self.handle_action() - else: - html = None + # If csrf checks pass. Run actions etc. + # handle_action() may handle a form submit action. + # It can change self.classname and self.template, + # and may also append error/ok_messages. + html = self.handle_action() if csrf_ok else None if html: self.write_html(html) @@ -883,7 +880,7 @@ def inner_main(self): self.response_code = http_.client.UNAUTHORIZED realm = self.instance.config.TRACKER_NAME self.setHeader("WWW-Authenticate", - "Basic realm=\"%s\"" % realm) + 'Basic realm="%s"' % realm) else: self.response_code = http_.client.FORBIDDEN self.renderFrontPage(str(message)) @@ -1041,10 +1038,7 @@ def determine_charset(self): def _decode_charref(matchobj): num = matchobj.group(1) - if num[0].lower() == 'x': - uc = int(num[1:], 16) - else: - uc = int(num) + uc = int(num[1:], 16) if num[0].lower() == 'x' else int(num) return uchr(uc) for field_name in self.form: @@ -1235,7 +1229,7 @@ def determine_user(self, is_api=False): # will be used later to override the get_roles method # having it defined as truthy allows it to be used. - override_get_roles = lambda self: iter_roles( # noqa: E731 + override_get_roles = lambda self: iter_roles( # noqa: ARG005 ','.join(token['roles'])) # if user was not set by http authorization, try session lookup @@ -1291,6 +1285,8 @@ def check_anonymous_access(self): raise SeriousError( self._('broken form: multiple @action values submitted')) elif action != '': + # '' is value when no action parameter was found so run + # this to extract action string value when action found. action = action.value.lower() if action in ('login', 'register'): return @@ -1302,8 +1298,8 @@ def check_anonymous_access(self): return # otherwise for everything else - if self.user == 'anonymous': - if not self.db.security.hasPermission('Web Access', self.userid): + if self.user == 'anonymous' and \ + not self.db.security.hasPermission('Web Access', self.userid): raise Unauthorised(self._("Anonymous users are not " "allowed to use the web interface")) @@ -1326,10 +1322,7 @@ def is_origin_header_ok(self, api=False, credentials=False): try: origin = self.env['HTTP_ORIGIN'] except KeyError: - if self.env['REQUEST_METHOD'] == 'GET': - return True - else: - return False + return self.env['REQUEST_METHOD'] == 'GET' # note base https://host/... ends host with with a /, # so add it to origin. @@ -1478,7 +1471,7 @@ def handle_csrf(self, api=False): "ORIGIN", "REFERER", "X-FORWARDED-HOST", - "HOST" + "HOST", ] header_pass = 0 # count of passing header checks @@ -1581,21 +1574,20 @@ def handle_csrf(self, api=False): raise UsageError(self._("Unable to verify sufficient headers")) enforce = config['WEB_CSRF_ENFORCE_HEADER_X-REQUESTED-WITH'] - if api: - if enforce in ['required', 'yes']: - # if we get here we have usually passed at least one - # header check. We check for presence of this custom - # header for xmlrpc/rest calls only. - # E.G. X-Requested-With: XMLHttpRequest - # Note we do not use CSRF nonces for xmlrpc/rest requests. - # - # see: https://www.owasp.org/index.php/Cross-Site_Request_Forgery_(CSRF)_Prevention_Cheat_Sheet#Protecting_REST_Services:_Use_of_Custom_Request_Headers - if 'HTTP_X_REQUESTED_WITH' not in self.env: - logger.error(self._( - ''"csrf X-REQUESTED-WITH xmlrpc required header " - ''"check failed for user%s."), - current_user) - raise UsageError(self._("Required Header Missing")) + if api and enforce in ['required', 'yes']: + # if we get here we have usually passed at least one + # header check. We check for presence of this custom + # header for xmlrpc/rest calls only. + # E.G. X-Requested-With: XMLHttpRequest + # Note we do not use CSRF nonces for xmlrpc/rest requests. + # + # see: https://www.owasp.org/index.php/Cross-Site_Request_Forgery_(CSRF)_Prevention_Cheat_Sheet#Protecting_REST_Services:_Use_of_Custom_Request_Headers + if 'HTTP_X_REQUESTED_WITH' not in self.env: + logger.error(self._( + ''"csrf X-REQUESTED-WITH xmlrpc required header " + ''"check failed for user%s."), + current_user) + raise UsageError(self._("Required Header Missing")) # Expire old csrf tokens now so we don't use them. These will # be committed after the otks.destroy below. Note that the @@ -1703,17 +1695,16 @@ def opendb(self, username): if not hasattr(self, 'db'): self.db = self.instance.open(username) self.db.tx_Source = "web" - else: - if self.instance.optimize: + elif self.instance.optimize: self.db.setCurrentUser(username) self.db.tx_Source = "web" - else: - self.db.close() - self.db = self.instance.open(username) - self.db.tx_Source = "web" - # The old session API refers to the closed database; - # we can no longer use it. - self.session_api = Session(self) + else: + self.db.close() + self.db = self.instance.open(username) + self.db.tx_Source = "web" + # The old session API refers to the closed database; + # we can no longer use it. + self.session_api = Session(self) # match designator in URL stripping leading 0's. So: # https://issues.roundup-tracker.org/issue002551190 is the same as @@ -1793,7 +1784,7 @@ def determine_context(self, dre=dre_url): else: self.template = '' return - elif path[0] in ('_file', '@@file'): + if path[0] in ('_file', '@@file'): raise SendStaticFile(os.path.join(*path[1:])) else: self.classname = path[0] @@ -1973,10 +1964,7 @@ def serve_static_file(self, file): file = str(file) mime_type = mimetypes.guess_type(file)[0] if not mime_type: - if file.endswith('.css'): - mime_type = 'text/css' - else: - mime_type = 'text/plain' + mime_type = 'text/css' if file.endswith('.css') else 'text/plain' # get filename: given a/b/c.js extract c.js fn = file.rpartition("/")[2] @@ -2079,12 +2067,9 @@ def selectTemplate(self, name, view): if (view and view.find('|') != -1): # we have alternate templates, parse them apart. (oktmpl, errortmpl) = view.split("|", 2) - if self._error_message: - # we have an error, use errortmpl - view = errortmpl - else: - # no error message recorded, use oktmpl - view = oktmpl + + # Choose the right template + view = errortmpl if self._error_message else oktmpl loader = self.instance.templates @@ -2135,7 +2120,7 @@ def renderContext(self): # catch errors so we can handle PT rendering errors more nicely args = { 'ok_message': self._ok_message, - 'error_message': self._error_message + 'error_message': self._error_message, } pt = self.instance.templates.load(tplname) # let the template render figure stuff out @@ -2189,8 +2174,7 @@ def renderContext(self): # than the one we tried to generate above. if sys.version_info[0] > 2: raise exc_info[0](exc_info[1]).with_traceback(exc_info[2]) - else: - exec('raise exc_info[0], exc_info[1], exc_info[2]') # nosec + exec('raise exc_info[0], exc_info[1], exc_info[2]') # nosec def renderError(self, error, response_code=400, use_template=True): self.response_code = response_code @@ -2222,7 +2206,7 @@ def renderError(self, error, response_code=400, use_template=True): args = { 'ok_message': self._ok_message, - 'error_message': self._error_message + 'error_message': self._error_message, } try: @@ -2573,7 +2557,7 @@ def handle_range_header(self, length, etag): # If the condition doesn't match the entity tag, then we # must send the client the entire file. if if_range != etag: - return + return None # The grammar for the Range header value is: # # ranges-specifier = byte-ranges-specifier @@ -2761,13 +2745,10 @@ def write_file(self, filename): self._socket_op(self.request.sendfile, filename, offset, length) return # Fallback to the "write" operation. - f = open(filename, 'rb') - try: + with open(filename, 'rb') as f: if offset: f.seek(offset) content = f.read(length) - finally: - f.close() self.write(content) def setHeader(self, header, value):