Skip to content

Commit

Permalink
refactor construction of pydantic model
Browse files Browse the repository at this point in the history
  • Loading branch information
dphuang2 committed Nov 3, 2023
1 parent bd5819d commit 650ac3e
Showing 1 changed file with 36 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import typing_extensions
import aiohttp
import urllib3
{{#if prstv2}}
from pydantic import BaseModel
from pydantic import BaseModel, RootModel, ValidationError
{{/if}}
from urllib3._collections import HTTPHeaderDict
from urllib.parse import urlparse, quote
Expand Down Expand Up @@ -66,32 +66,47 @@ class RequestField(RequestFieldBase):


{{#if prstv2}}
T = typing.TypeVar('T', bound=BaseModel)
T = typing.TypeVar('T')


def construct_model_instance(model: typing.Type[T], data: dict) -> T:
def construct_model_instance(model: typing.Type[T], data: typing.Any) -> T:
"""
Recursively construct an instance of a Pydantic model along with its nested models.
"""
for field_name, field_type in model.__annotations__.items():
if field_name in data:
if typing_extensions.get_origin(field_type) is list:
list_item_type = typing_extensions.get_args(field_type)[0]
if issubclass(list_item_type, BaseModel):
data[field_name] = [construct_model_instance(list_item_type, item) for item in data[field_name]]
elif issubclass(field_type, BaseModel):
data[field_name] = construct_model_instance(field_type, data[field_name])

return model.model_construct(**data)


def construct_model_list(model: typing.Type[typing.List[T]], data_list: typing.List[dict]) -> typing.List[T]:
"""
Construct a list of Pydantic model instances from a list of dictionaries.
"""
# Extract the inner model type from Type[List[T]]
inner_model = typing_extensions.get_args(model)[0]
return [construct_model_instance(inner_model, data) for data in data_list]
# if model is Union,
if typing_extensions.get_origin(model) is typing.Union:
closest = []
# iterate over all union types and determine which one is closest to the data
for union_type in model.__args__:
matches = isinstance(data, union_type)
if matches:
# found match, return using construct_model_instance
return construct_model_instance(union_type, data)
# if no match, just use the first union_type
return construct_model_instance(model.__args__[0], data)
# if model is scalar value like str, number, etc., use RootModel to construct
elif isinstance(model, type):
model = RootModel[model]
# try to coerce value to model type
try:
return model(data).root
except ValidationError as e:
pass
# if not possible, give up
return model.model_construct(data).root
# if model is list, iterate over list and recursively call
elif issubclass(model, list):
item_model = typing_extensions.get_args(model)[0]
return [construct_model_instance(item_model, item) for item in data]
# if model is BaseModel, iterate over fields and recursively call
elif issubclass(model, BaseModel):
new_data = {}
for field_name, field_type in model.__annotations__.items():
if field_name in data:
new_data[field_name] = construct_model_instance(field_type, data[field_name])
return model.model_construct(**data)
raise ApiTypeError(f"Unable to construct model instance of type {model}")


class Dictionary(BaseModel):
Expand Down

0 comments on commit 650ac3e

Please sign in to comment.