feat: RND-109: Add EntityExtraction skill in Adala (#168)
Co-authored-by: nik <[email protected]>
niklub and nik authored Aug 13, 2024
1 parent 8cd9081 commit ea653ae
Showing 19 changed files with 4,599 additions and 4,598 deletions.
22 changes: 14 additions & 8 deletions adala/runtimes/_litellm.py
@@ -1,6 +1,6 @@
import asyncio
import logging
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Union, Type

import litellm
from litellm.exceptions import AuthenticationError
@@ -14,7 +14,8 @@
partial_str_format,
parse_template_to_pydantic_class,
)
-from pydantic import ConfigDict, field_validator
+from openai import NotFoundError
+from pydantic import ConfigDict, field_validator, BaseModel
from rich import print

from .base import AsyncRuntime, Runtime
@@ -131,6 +132,7 @@ def record_to_record(
extra_fields: Optional[Dict[str, str]] = None,
field_schema: Optional[Dict] = None,
instructions_first: bool = False,
response_model: Optional[Type[BaseModel]] = None,
) -> Dict[str, str]:
"""
Execute OpenAI request given record and templates for input, instructions and output.
@@ -143,16 +145,18 @@
extra_fields: Extra fields to be used in templates.
field_schema: Field schema to be used for parsing templates.
instructions_first: If True, instructions will be sent before input.
response_model: Pydantic model for response. If set, `output_template` and `field_schema` are ignored.
Returns:
Dict[str, str]: Output record.
"""

extra_fields = extra_fields or {}

-        response_model = parse_template_to_pydantic_class(
-            output_template, provided_field_schema=field_schema
-        )
+        if not response_model:
+            response_model = parse_template_to_pydantic_class(
+                output_template, provided_field_schema=field_schema
+            )
messages = get_messages(
input_template.format(**record, **extra_fields),
instructions_template,
@@ -263,12 +267,14 @@ async def batch_to_batch(
extra_fields: Optional[Dict[str, str]] = None,
field_schema: Optional[Dict] = None,
instructions_first: bool = True,
response_model: Optional[Type[BaseModel]] = None,
) -> InternalDataFrame:
"""Execute batch of requests with async calls to OpenAI API"""

-        response_model = parse_template_to_pydantic_class(
-            output_template, provided_field_schema=field_schema
-        )
+        if not response_model:
+            response_model = parse_template_to_pydantic_class(
+                output_template, provided_field_schema=field_schema
+            )

extra_fields = extra_fields or {}
user_prompts = batch.apply(
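The new `response_model` argument short-circuits template parsing: when a Pydantic class is supplied, `parse_template_to_pydantic_class` is skipped and `output_template`/`field_schema` are ignored. A minimal usage sketch (runtime construction and field names are illustrative assumptions, not part of this commit):

```python
# Minimal sketch (hypothetical setup): pass response_model directly so the
# runtime skips parse_template_to_pydantic_class entirely.
from pydantic import BaseModel, Field
from adala.runtimes import LiteLLMChatRuntime  # import path assumed

class Sentiment(BaseModel):
    sentiment: str = Field(description="One of: positive, negative, neutral")

runtime = LiteLLMChatRuntime(model="gpt-4o-mini")
result = runtime.record_to_record(
    record={"text": "I love this product!"},
    input_template="Classify the sentiment of: {text}",
    instructions_template="You are a sentiment classifier.",
    output_template="",        # ignored when response_model is set
    response_model=Sentiment,  # takes precedence over field_schema
)
print(result)  # e.g. {'sentiment': 'positive'}
```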
23 changes: 22 additions & 1 deletion adala/runtimes/base.py
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@
from tqdm import tqdm
from abc import ABC, abstractmethod
from pydantic import BaseModel, model_validator, Field
-from typing import List, Dict, Optional, Tuple, Any, Callable, ClassVar
+from typing import List, Dict, Optional, Tuple, Any, Callable, ClassVar, Type
from adala.utils.internal_data import InternalDataFrame, InternalSeries
from adala.utils.registry import BaseModelInRegistry
from pandarallel import pandarallel
@@ -21,6 +21,9 @@ class Runtime(BaseModelInRegistry):
batch_size (Optional[int]): The batch size to use for processing records. Defaults to None.
concurrency (Optional[int]): The number of parallel processes to use for processing records. Defaults to 1.
Note that when parallel processing is used, the memory footprint will be doubled compared to sequential processing.
response_model (Optional[Type[BaseModel]]): The response model to use for processing records. Defaults to None.
If set, the response will be generated according to this model and `output_template` and `field_schema` fields will be ignored.
Note: explicitly providing `response_model` will be the default behavior for all runtimes in the future.
"""

verbose: bool = False
@@ -48,6 +51,7 @@ def record_to_record(
extra_fields: Optional[Dict[str, Any]] = None,
field_schema: Optional[Dict] = None,
instructions_first: bool = True,
response_model: Optional[Type[BaseModel]] = None,
) -> Dict[str, str]:
"""
Processes a record.
@@ -61,6 +65,9 @@
field_schema (Optional[Dict]): Field JSON schema to use in the templates. Defaults to all fields are strings,
i.e. analogous to {"field_n": {"type": "string"}}.
instructions_first (bool): Whether to put instructions first. Defaults to True.
response_model (Optional[Type[BaseModel]]): The response model to use for processing records. Defaults to None.
If set, the response will be generated according to this model and `output_template` and `field_schema` fields will be ignored.
Note: explicitly providing `response_model` will be the default behavior for all runtimes in the future.
Returns:
Dict[str, str]: The processed record.
@@ -75,6 +82,7 @@ def batch_to_batch(
extra_fields: Optional[Dict[str, str]] = None,
field_schema: Optional[Dict] = None,
instructions_first: bool = True,
response_model: Optional[Type[BaseModel]] = None,
) -> InternalDataFrame:
"""
Processes a batch of records.
@@ -132,6 +140,7 @@ def batch_to_batch(
extra_fields=extra_fields,
field_schema=field_schema,
instructions_first=instructions_first,
response_model=response_model,
)
return output

@@ -145,6 +154,7 @@ def record_to_batch(
extra_fields: Optional[Dict[str, str]] = None,
field_schema: Optional[Dict] = None,
instructions_first: bool = True,
response_model: Optional[Type[BaseModel]] = None,
) -> InternalDataFrame:
"""
Processes a record and returns a batch.
@@ -159,6 +169,9 @@
field_schema (Optional[Dict]): Field JSON schema to use in the templates. Defaults to all fields are strings,
i.e. analogous to {"field_n": {"type": "string"}}.
instructions_first (bool): Whether to put instructions first. Defaults to True.
response_model (Optional[Type[BaseModel]]): The response model to use for processing records. Defaults to None.
If set, the response will be generated according to this model and `output_template` and `field_schema` fields will be ignored.
Note: explicitly providing `response_model` will be the default behavior for all runtimes in the future.
Returns:
InternalDataFrame: The processed batch.
@@ -172,6 +185,7 @@ def record_to_batch(
extra_fields=extra_fields,
field_schema=field_schema,
instructions_first=instructions_first,
response_model=response_model
)


@@ -188,6 +202,7 @@ async def record_to_record(
extra_fields: Optional[Dict[str, Any]] = None,
field_schema: Optional[Dict] = None,
instructions_first: bool = True,
response_model: Optional[Type[BaseModel]] = None,
) -> Dict[str, str]:
"""
Processes a record.
@@ -201,6 +216,9 @@
field_schema (Optional[Dict]): Field JSON schema to use in the templates. Defaults to all fields are strings,
i.e. analogous to {"field_n": {"type": "string"}}.
instructions_first (bool): Whether to put instructions first. Defaults to True.
response_model (Optional[Type[BaseModel]]): The response model to use for processing records. Defaults to None.
If set, the response will be generated according to this model and `output_template` and `field_schema` fields will be ignored.
Note: explicitly providing `response_model` will be the default behavior for all runtimes in the future.
Returns:
Dict[str, str]: The processed record.
@@ -216,6 +234,7 @@ async def batch_to_batch(
extra_fields: Optional[Dict[str, str]] = None,
field_schema: Optional[Dict] = None,
instructions_first: bool = True,
response_model: Optional[Type[BaseModel]] = None,
) -> InternalDataFrame:
"""
Processes a batch of records.
@@ -229,6 +248,7 @@
field_schema (Optional[Dict]): Field JSON schema to use in the templates. Defaults to all fields are strings,
i.e. analogous to {"field_n": {"type": "string"}}.
instructions_first (bool): Whether to put instructions first. Defaults to True.
response_model (Optional[Type[BaseModel]]): The response model to use for processing records. Defaults to None.
Returns:
InternalDataFrame: The processed batch.
@@ -243,5 +263,6 @@ async def batch_to_batch(
extra_fields=extra_fields,
field_schema=field_schema,
instructions_first=instructions_first,
response_model=response_model,
)
return output
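For orientation, the `field_schema`-to-model translation that `response_model` bypasses looks roughly like the sketch below (an illustrative correspondence only; the actual conversion is performed by `parse_template_to_pydantic_class`):

```python
# Illustrative sketch of the correspondence between an implicit field_schema
# and an explicit response model (assumed mapping, for exposition only).
from typing import Literal
from pydantic import BaseModel, Field

# Implicit style: a JSON-ish schema dict.
field_schema = {"label": {"type": "string", "enum": ["positive", "negative"]}}

# Explicit style: the Pydantic model that runtimes will expect by default in the future.
class LabelResponse(BaseModel):
    label: Literal["positive", "negative"] = Field(
        description="Predicted label for the input text."
    )
```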
23 changes: 22 additions & 1 deletion adala/skills/_base.py
@@ -1,5 +1,5 @@
from pydantic import BaseModel, Field, field_validator
-from typing import List, Optional, Any, Dict, Tuple, Union, ClassVar
+from typing import List, Optional, Any, Dict, Tuple, Union, ClassVar, Type
from abc import ABC, abstractmethod
from adala.utils.internal_data import (
InternalDataFrame,
@@ -31,6 +31,7 @@ class Skill(BaseModelInRegistry):
instructions_first (bool): Flag indicating if instructions should be executed before input. Defaults to True.
verbose (bool): Flag indicating if runtime outputs should be verbose. Defaults to False.
frozen (bool): Flag indicating if the skill is frozen. Defaults to False.
response_model (Optional[Type[BaseModel]]): Pydantic-based response model for the skill. If used, `output_template` and `field_schema` are ignored. Note that using `response_model` will become the default in the future.
type (ClassVar[str]): Type of the skill.
"""

@@ -44,6 +45,8 @@ class Skill(BaseModelInRegistry):
description="Instructs agent what to do with the input data. "
"Can use templating to refer to input fields.",
examples=["Label the input text with the following labels: {labels}"],
# TODO: instructions can be deprecated in favor of using `input_template` to specify the instructions
default=''
)
input_template: str = Field(
title="Input template",
@@ -56,6 +59,8 @@
description="Template for the output data. "
"Can use templating to refer to input parameters and perform data transformations",
examples=["Output: {output}", "{predictions}"],
# TODO: output_template can be deprecated in favor of using `response_model` to specify the output
default=''
)
description: Optional[str] = Field(
default="",
@@ -95,6 +100,12 @@ class Skill(BaseModelInRegistry):
examples=[True, False],
)

response_model: Optional[Type[BaseModel]] = Field(
default=None,
title="Response model",
description="Pydantic-based response model for the skill. If used, `output_template` and `field_schema` are ignored.",
)

def _get_extra_fields(self):
"""
Retrieves fields that are not categorized as system fields.
@@ -111,6 +122,12 @@ def _get_extra_fields(self):
"output_template",
"instructions",
"field_schema",
"extra_fields",
"instructions_first",
"verbose",
"frozen",
"response_model",
"type",
}
extra_fields = self.model_dump(exclude=system_fields)
return extra_fields
@@ -173,6 +190,7 @@ def apply(
field_schema=self.field_schema,
extra_fields=self._get_extra_fields(),
instructions_first=self.instructions_first,
response_model=self.response_model,
)

async def aapply(
Expand All @@ -199,6 +217,7 @@ async def aapply(
field_schema=self.field_schema,
extra_fields=self._get_extra_fields(),
instructions_first=self.instructions_first,
response_model=self.response_model
)

def improve(
Expand Down Expand Up @@ -411,6 +430,7 @@ def apply(
field_schema=self.field_schema,
extra_fields=self._get_extra_fields(),
instructions_first=self.instructions_first,
response_model=self.response_model
)

def improve(self, **kwargs):
Expand Down Expand Up @@ -479,6 +499,7 @@ def apply(
instructions_template=self.instructions,
extra_fields=extra_fields,
instructions_first=self.instructions_first,
response_model=self.response_model
)
outputs.append(InternalSeries(output))
output = InternalDataFrame(outputs)
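At the skill level, `response_model` now travels alongside (and takes precedence over) `output_template`. A hypothetical configuration sketch (skill and model names invented for illustration):

```python
# Hypothetical sketch (not part of this commit): a TransformSkill configured
# with an explicit response_model; output_template and field_schema are ignored.
from typing import List
from pydantic import BaseModel, Field
from adala.skills import TransformSkill  # import path assumed

class Summary(BaseModel):
    summary: str = Field(description="One-sentence summary of the input.")
    keywords: List[str] = Field(default_factory=list)

skill = TransformSkill(
    name="summarize",
    input_template="Summarize the following text:\n{text}",
    response_model=Summary,
)
```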
88 changes: 88 additions & 0 deletions adala/skills/collection/entity_extraction.py
@@ -0,0 +1,88 @@
import pandas as pd

from adala.runtimes import Runtime, AsyncRuntime
from adala.skills._base import TransformSkill
from pydantic import BaseModel, Field, model_validator
from typing import List, Type, Optional, Dict

from adala.utils.internal_data import InternalDataFrame


class EntityExtraction(TransformSkill):

    name: str = "entity_extraction"
    input_template: str = 'Extract entities from the input text.\n\nInput:\n"""\n{text}\n"""'
    labels: Optional[List[str]] = None
    output_template: str = 'Extracted entities: {entities}'

    @model_validator(mode="after")
    def maybe_add_labels(self):
        self.field_schema = {
            "entities": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "quote_string": {
                            "type": "string",
                            "description": "The text of the entity extracted from the input document."
                        }
                    }
                }
            }
        }
        if self.labels:
            self.field_schema["entities"]["items"]["properties"]["label"] = {
                "type": "string",
                "description": "The label assigned to the entity.",
                "enum": self.labels
            }
        return self

    def extract_indices(self, df):
        """
        Given an input dataframe with a "text" column and an "entities" column of the format
        ```
        [{"quote_string": "entity_1"}, {"quote_string": "entity_2"}, ...]
        ```
        extract the indices of the entities in the input text and add them to the "entities" column:
        ```
        [{"quote_string": "entity_1", "start": 0, "end": 5}, {"quote_string": "entity_2", "start": 10, "end": 15}, ...]
        ```
        """
        for i, row in df.iterrows():
            text = row["text"]
            entities = row["entities"]
            to_remove = []
            for entity in entities:
                # TODO: the current naive implementation assumes that the quote_string is unique in the text.
                # This can serve as a baseline for now; it can be improved to handle entity ambiguity
                # (for example, by requesting a "prefix" in the response model) as well as fuzzy pattern matching.
                start_idx = text.lower().find(entity["quote_string"].lower())
                if start_idx == -1:
                    # remove the entity if it is not found in the text
                    to_remove.append(entity)
                else:
                    entity["start"] = start_idx
                    entity["end"] = start_idx + len(entity["quote_string"])
            for entity in to_remove:
                entities.remove(entity)
        return df

    def apply(
        self,
        input: InternalDataFrame,
        runtime: Runtime,
    ) -> InternalDataFrame:
        output = super().apply(input, runtime)
        output = self.extract_indices(pd.concat([input, output], axis=1))
        return output

    async def aapply(
        self,
        input: InternalDataFrame,
        runtime: AsyncRuntime,
    ) -> InternalDataFrame:
        output = await super().aapply(input, runtime)
        output = self.extract_indices(pd.concat([input, output], axis=1))
        return output
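A usage sketch for the new skill (agent/runtime wiring omitted; the dataframe below exercises only `extract_indices`, which needs no LLM call, and the sample data is invented):

```python
# Sketch: EntityExtraction with an enum-constrained label, plus a direct
# demonstration of extract_indices adding character offsets.
import pandas as pd
from adala.skills.collection.entity_extraction import EntityExtraction

skill = EntityExtraction(labels=["PERSON", "ORG"])  # labels are optional

df = pd.DataFrame(
    {
        "text": ["Tim Cook runs Apple."],
        "entities": [[{"quote_string": "Tim Cook"}, {"quote_string": "Apple"}]],
    }
)
df = skill.extract_indices(df)
print(df.loc[0, "entities"])
# [{'quote_string': 'Tim Cook', 'start': 0, 'end': 8},
#  {'quote_string': 'Apple', 'start': 14, 'end': 19}]
```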
9 changes: 5 additions & 4 deletions adala/utils/pydantic_generator.py
@@ -1,4 +1,4 @@
-from typing import Any, Dict, List, Optional, Type, Union, Tuple
+from typing import Any, Dict, List, Optional, Type, Union, Tuple, Literal
from enum import Enum
from datetime import datetime
from pydantic import BaseModel, Field, create_model
@@ -118,9 +118,10 @@ def json_schema_to_pydantic_type(
else:
raise NotImplementedError(f"Unsupported JSON schema format: {format_}")
elif "enum" in json_schema:
-        return Enum(
-            enum_class_name, {item: item for item in json_schema["enum"]}, type=str
-        )
+        return Literal[*json_schema["enum"]]
+        # return Enum(
+        #     enum_class_name, {item: item for item in json_schema["enum"]}, type=str
+        # )
return str
elif type_ == "integer":
return int
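The switch from a dynamic `Enum` to `typing.Literal` means enum-constrained fields now validate to plain strings rather than `Enum` members; note that the starred subscript `Literal[*...]` requires Python 3.11+. A small sketch of the difference:

```python
# Sketch of the old Enum-based mapping vs. the new Literal-based one
# (Python 3.11+ is required for the starred subscript).
from enum import Enum
from typing import Literal
from pydantic import create_model

values = ["positive", "negative"]

# Old: a dynamic Enum; validated values come back as Enum members.
OldType = Enum("label", {v: v for v in values}, type=str)

# New: a Literal; validated values stay plain strings.
NewType = Literal[*values]

Model = create_model("Model", label=(NewType, ...))
print(Model(label="positive").label)  # 'positive' (a plain str)
```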