-
-
Notifications
You must be signed in to change notification settings - Fork 62
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
411 additions
and
332 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,27 +1,43 @@ | ||
"""OpenAPI spec validator shortcuts module.""" | ||
import warnings | ||
from typing import Any | ||
from typing import Hashable | ||
from typing import Mapping | ||
from typing import Optional | ||
from typing import Type | ||
|
||
from jsonschema_spec.handlers import all_urls_handler | ||
from jsonschema_spec.typing import Schema | ||
|
||
from openapi_spec_validator.validation import openapi_spec_validator_proxy | ||
from openapi_spec_validator.validation.protocols import SupportsValidation | ||
from openapi_spec_validator.validation.validators import SpecValidator | ||
|
||
|
||
def validate_spec( | ||
spec: Mapping[Hashable, Any], | ||
spec: Schema, | ||
base_uri: str = "", | ||
validator: SupportsValidation = openapi_spec_validator_proxy, | ||
validator: Optional[SupportsValidation] = None, | ||
cls: Optional[Type[SpecValidator]] = None, | ||
spec_url: Optional[str] = None, | ||
) -> None: | ||
return validator.validate(spec, base_uri=base_uri, spec_url=spec_url) | ||
if validator is not None: | ||
warnings.warn( | ||
"validator parameter is deprecated. Use cls instead.", | ||
DeprecationWarning, | ||
) | ||
return validator.validate(spec, base_uri=base_uri, spec_url=spec_url) | ||
if cls is None: | ||
# TODO: implement | ||
cls = OpenAPIV31SpecValidator | ||
validator = cls(spec) | ||
return validator.validate() | ||
|
||
|
||
def validate_spec_url( | ||
spec_url: str, | ||
validator: SupportsValidation = openapi_spec_validator_proxy, | ||
validator: Optional[SupportsValidation] = None, | ||
cls: Optional[Type[SpecValidator]] = None, | ||
) -> None: | ||
spec = all_urls_handler(spec_url) | ||
return validator.validate(spec, base_uri=spec_url) | ||
return validate_spec(spec, base_uri=spec_url) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,265 @@ | ||
import string | ||
from typing import Any | ||
from typing import Iterator | ||
from typing import Optional | ||
|
||
from jsonschema.exceptions import ValidationError | ||
from jsonschema_spec.paths import SchemaPath | ||
|
||
|
||
class KeywordValidator: | ||
def __init__(self, validator): | ||
self.validator = validator | ||
|
||
|
||
class ValueValidator(KeywordValidator): | ||
def __call__(self, schema: SchemaPath, value: Any) -> Iterator[ValidationError]: | ||
with schema.resolve() as resolved: | ||
validator = self.validator.value_validator_cls( | ||
resolved.contents, | ||
_resolver=resolved.resolver, | ||
format_checker=self.validator.value_validator_format_checker, | ||
) | ||
yield from validator.iter_errors(value) | ||
|
||
|
||
class SchemaValidator(KeywordValidator): | ||
def __call__(self, schema: SchemaPath, require_properties: bool = True) -> Iterator[ValidationError]: | ||
if not hasattr(schema.content(), "__getitem__"): | ||
return | ||
|
||
assert self.validator.schema_ids_registry is not None | ||
schema_id = id(schema.content()) | ||
if schema_id in self.validator.schema_ids_registry: | ||
return | ||
self.validator.schema_ids_registry.append(schema_id) | ||
|
||
nested_properties = [] | ||
if "allOf" in schema: | ||
all_of = schema / "allOf" | ||
for inner_schema in all_of: | ||
yield from SchemaValidator(self.validator)( | ||
inner_schema, | ||
require_properties=False, | ||
) | ||
if "properties" not in inner_schema: | ||
continue | ||
inner_schema_props = inner_schema / "properties" | ||
inner_schema_props_keys = inner_schema_props.keys() | ||
nested_properties += list(inner_schema_props_keys) | ||
|
||
if "anyOf" in schema: | ||
any_of = schema / "anyOf" | ||
for inner_schema in any_of: | ||
yield from SchemaValidator(self.validator)( | ||
inner_schema, | ||
require_properties=False, | ||
) | ||
|
||
if "oneOf" in schema: | ||
one_of = schema / "oneOf" | ||
for inner_schema in one_of: | ||
yield from SchemaValidator(self.validator)( | ||
inner_schema, | ||
require_properties=False, | ||
) | ||
|
||
if "not" in schema: | ||
not_schema = schema / "not" | ||
yield from SchemaValidator(self.validator)( | ||
not_schema, | ||
require_properties=False, | ||
) | ||
|
||
if "items" in schema: | ||
array_schema = schema / "items" | ||
yield from SchemaValidator(self.validator)( | ||
array_schema, | ||
require_properties=False, | ||
) | ||
|
||
if "properties" in schema: | ||
props = schema / "properties" | ||
for _, prop_schema in props.items(): | ||
yield from SchemaValidator(self.validator)( | ||
prop_schema, | ||
require_properties=False, | ||
) | ||
|
||
required = schema.getkey("required", []) | ||
properties = schema.get("properties", {}).keys() | ||
if "allOf" in schema: | ||
extra_properties = list( | ||
set(required) - set(properties) - set(nested_properties) | ||
) | ||
else: | ||
extra_properties = list(set(required) - set(properties)) | ||
|
||
if extra_properties and require_properties: | ||
yield ExtraParametersError( | ||
f"Required list has not defined properties: {extra_properties}" | ||
) | ||
|
||
if "default" in schema: | ||
default = schema["default"] | ||
nullable = schema.get("nullable", False) | ||
if default is not None or nullable is not True: | ||
yield from ValueValidator(self.validator)(schema, default) | ||
|
||
|
||
class SchemasValidator(KeywordValidator): | ||
def __call__(self, schemas: SchemaPath) -> Iterator[ValidationError]: | ||
for _, schema in schemas.items(): | ||
yield from SchemaValidator(self.validator)(schema) | ||
|
||
|
||
class ParameterValidator(KeywordValidator): | ||
def __call__(self, parameter: SchemaPath) -> Iterator[ValidationError]: | ||
if "schema" in parameter: | ||
schema = parameter / "schema" | ||
yield from SchemaValidator(self.validator)(schema) | ||
|
||
if "default" in parameter: | ||
# only possible in swagger 2.0 | ||
default = parameter.getkey("default") | ||
if default is not None: | ||
yield from ValueValidator(self.validator)(parameter, default) | ||
|
||
|
||
class ParametersValidator(KeywordValidator): | ||
def __call__(self, parameters: SchemaPath) -> Iterator[ValidationError]: | ||
seen = set() | ||
for parameter in parameters: | ||
yield from ParameterValidator(self.validator)(parameter) | ||
|
||
key = (parameter["name"], parameter["in"]) | ||
if key in seen: | ||
yield ParameterDuplicateError( | ||
f"Duplicate parameter `{parameter['name']}`" | ||
) | ||
seen.add(key) | ||
|
||
class MediaTypeValidator(KeywordValidator): | ||
def __call__(self, mimetype: str, media_type: SchemaPath) -> Iterator[ValidationError]: | ||
if "schema" in media_type: | ||
schema = media_type / "schema" | ||
yield from SchemaValidator(self.validator)(schema) | ||
|
||
|
||
class ContentValidator(KeywordValidator): | ||
def __call__(self, content: SchemaPath) -> Iterator[ValidationError]: | ||
for mimetype, media_type in content.items(): | ||
yield from MediaTypeValidator(self.validator)(mimetype, media_type) | ||
|
||
|
||
class ResponseValidator(KeywordValidator): | ||
def __call__(self, response_code: str, response: SchemaPath) -> Iterator[ValidationError]: | ||
# openapi 2 | ||
if "schema" in response: | ||
schema = response / "schema" | ||
yield from SchemaValidator(self.validator)(schema) | ||
# openapi 3 | ||
if "content" in response: | ||
content = response / "content" | ||
yield from ContentValidator(self.validator)(content) | ||
|
||
|
||
class ResponsesValidator(KeywordValidator): | ||
def __call__(self, responses: SchemaPath) -> Iterator[ValidationError]: | ||
for response_code, response in responses.items(): | ||
yield from ResponseValidator(self.validator)(response_code, response) | ||
|
||
|
||
class OperationValidator(KeywordValidator): | ||
def __call__( | ||
self, | ||
url: str, | ||
name: str, | ||
operation: SchemaPath, | ||
path_parameters: Optional[SchemaPath], | ||
) -> Iterator[ValidationError]: | ||
assert self.validator.operation_ids_registry is not None | ||
|
||
operation_id = operation.getkey("operationId") | ||
if ( | ||
operation_id is not None | ||
and operation_id in self.validator.operation_ids_registry | ||
): | ||
yield DuplicateOperationIDError( | ||
f"Operation ID '{operation_id}' for '{name}' in '{url}' is not unique" | ||
) | ||
self.validator.operation_ids_registry.append(operation_id) | ||
|
||
if "responses" in operation: | ||
responses = operation / "responses" | ||
yield from ResponsesValidator(self.validator)(responses) | ||
|
||
names = [] | ||
|
||
parameters = None | ||
if "parameters" in operation: | ||
parameters = operation / "parameters" | ||
yield from ParametersValidator(self.validator)(parameters) | ||
names += list(self._get_path_param_names(parameters)) | ||
|
||
if path_parameters is not None: | ||
names += list(self._get_path_param_names(path_parameters)) | ||
|
||
all_params = list(set(names)) | ||
|
||
for path in self._get_path_params_from_url(url): | ||
if path not in all_params: | ||
yield UnresolvableParameterError( | ||
"Path parameter '{}' for '{}' operation in '{}' " | ||
"was not resolved".format(path, name, url) | ||
) | ||
return | ||
|
||
def _get_path_param_names(self, params: SchemaPath) -> Iterator[str]: | ||
for param in params: | ||
if param["in"] == "path": | ||
yield param["name"] | ||
|
||
def _get_path_params_from_url(self, url: str) -> Iterator[str]: | ||
formatter = string.Formatter() | ||
path_params = [item[1] for item in formatter.parse(url)] | ||
return filter(None, path_params) | ||
|
||
|
||
class PathValidator(KeywordValidator): | ||
OPERATIONS = [ | ||
"get", | ||
"put", | ||
"post", | ||
"delete", | ||
"options", | ||
"head", | ||
"patch", | ||
"trace", | ||
] | ||
|
||
def __call__(self, url: str, path_item: SchemaPath) -> Iterator[ValidationError]: | ||
parameters = None | ||
if "parameters" in path_item: | ||
parameters = path_item / "parameters" | ||
yield from ParametersValidator(self.validator)(parameters) | ||
|
||
for field_name, operation in path_item.items(): | ||
if field_name not in self.OPERATIONS: | ||
continue | ||
|
||
yield from OperationValidator(self.validator)( | ||
url, field_name, operation, parameters | ||
) | ||
|
||
|
||
class PathsValidator(KeywordValidator): | ||
def __call__(self, paths: SchemaPath) -> Iterator[ValidationError]: | ||
for url, path_item in paths.items(): | ||
yield from PathValidator(self.validator)(url, path_item) | ||
|
||
|
||
class ComponentsValidator(KeywordValidator): | ||
def __call__(self, components: SchemaPath) -> Iterator[ValidationError]: | ||
schemas = components.get("schemas", {}) | ||
yield from SchemasValidator(self.validator)(schemas) |
Oops, something went wrong.