
Partial context updates #93

Open · wants to merge 310 commits into base: dev
Conversation


@pseusys pseusys commented Mar 13, 2023

Description

Context storages are now updated partially, instead of reading and writing all data at once.

Checklist

  • I have covered the code with tests
  • I have added comments to my code to help others understand it
  • I have updated the documentation to reflect the changes
  • I have performed a self-review of the changes
  • Consider extending UpdateScheme from BaseModel
  • Decide how we want to use the clear method

@pseusys pseusys self-assigned this Mar 13, 2023
@pseusys pseusys requested review from kudep and RLKRo April 7, 2023 01:43
@pseusys pseusys added the enhancement New feature or request label Apr 7, 2023
@pseusys pseusys marked this pull request as ready for review April 7, 2023 01:43
@kudep kudep marked this pull request as draft April 24, 2023 16:41
(Resolved review threads on dff/context_storages/database.py, dff/context_storages/update_scheme.py, and dff/context_storages/json.py.)

RLKRo commented Oct 24, 2024

@pseusys
I reverted get_context_ids and filters to process them in a separate PR (#399).


RLKRo commented Oct 24, 2024

Suggestion for performance analysis:

  1. Add logging: debug-level logs inside the db methods that record the statements used. Emit the logs both before and after statement execution, so that execution time can be measured from the difference between the two log timestamps. Also add logs inside the context dict.
  2. Then try the following: enable the logs, clear the db, add a context with 10000 turns to the db (message and misc dimensions both (10, 10)), run the pipeline on that context, and chat with it via CLI. This will collect the logs and confirm whether updating the context at that point actually takes 3 seconds (per the benchmark results).
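The paired before/after logging suggested above could be packaged as a small context manager. This is only a sketch; the `log_statement` helper and the logger name are hypothetical, not part of the chatsky codebase:

```python
import logging
import time
from contextlib import contextmanager

logger = logging.getLogger("chatsky.context_storages")


@contextmanager
def log_statement(statement: str):
    """Log a statement before and after execution, so the difference
    between the two log timestamps gives its duration (also recorded
    explicitly here for convenience)."""
    logger.debug("executing: %s", statement)
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        logger.debug("finished: %s (%.3f s)", statement, elapsed)


# Hypothetical usage inside a db method:
with log_statement("UPDATE contexts SET ..."):
    pass  # execute the statement here
```

Wrapping each statement this way keeps the two log events adjacent in the output, which makes the per-statement timing easy to extract with a simple log parser.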


RLKRo commented Oct 25, 2024

OSError: [Errno 24] Too many open files: '/root/.cache/pypoetry/virtualenvs/chatsky-KJesWijk-py3.10/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/__init__.py'

Traceback:

  File "/home/git_clones/chatsky/chatsky/utils/db_benchmark/benchmark.py", line 290, in _run
    self.db_factory.db(),
  File "/home/git_clones/chatsky/chatsky/utils/db_benchmark/benchmark.py", line 146, in db
    return getattr(module, self.factory)(self.uri)
  File "/home/git_clones/chatsky/chatsky/context_storages/database.py", line 195, in context_storage_factory
    return target_class(path, **kwargs)
  File "/home/git_clones/chatsky/chatsky/context_storages/sql.py", line 156, in __init__
  File "/root/.cache/pypoetry/virtualenvs/chatsky-KJesWijk-py3.10/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/engine.py", line 120, in create_async_engine
  File "<string>", line 2, in create_engine
  File "/root/.cache/pypoetry/virtualenvs/chatsky-KJesWijk-py3.10/lib/python3.10/site-packages/sqlalchemy/util/deprecations.py", line 281, in warned
  File "/root/.cache/pypoetry/virtualenvs/chatsky-KJesWijk-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/create.py", line 550, in create_engine
  File "/root/.cache/pypoetry/virtualenvs/chatsky-KJesWijk-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/url.py", line 758, in _get_entrypoint
  File "/root/.cache/pypoetry/virtualenvs/chatsky-KJesWijk-py3.10/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 365, in load
  File "/root/.cache/pypoetry/virtualenvs/chatsky-KJesWijk-py3.10/lib/python3.10/site-packages/sqlalchemy/dialects/__init__.py", line 47, in _auto_fn
  File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1006, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 688, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 879, in exec_module
  File "<frozen importlib._bootstrap_external>", line 1016, in get_code
  File "<frozen importlib._bootstrap_external>", line 1073, in get_data

Member:

Move this file to chatsky/core.


:param serializer: Serializer that will be used for serializing contexts.
"""

is_asynchronous = False
Member:

What is the point of this flag?
Even if you limit concurrent execution within the processing of a single context, you still have potentially multiple contexts being processed at the same time.
IMO it's better to use asyncio.lock inside the methods that require to read and write data within a single method call.


Comment on lines 25 to 29
_items: Dict[K, V] = PrivateAttr(default_factory=dict)
_hashes: Dict[K, int] = PrivateAttr(default_factory=dict)
_keys: Set[K] = PrivateAttr(default_factory=set)
_added: Set[K] = PrivateAttr(default_factory=set)
_removed: Set[K] = PrivateAttr(default_factory=set)
Member:

I don't like having this many private attributes.
Maybe split this into two classes (context dict and context dict db connector)?
The first would implement all dict methods and have a single private attribute (context dict db connector);
and the second would hold all the fields and implement some methods (load, get, set).
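The proposed split could look roughly like the sketch below. All names here (`ContextDictConnector`, its methods) are hypothetical, and the bookkeeping is simplified to the add/remove sets visible in the diff:

```python
from typing import Dict, Generic, Set, TypeVar

K = TypeVar("K")
V = TypeVar("V")


class ContextDictConnector(Generic[K, V]):
    """Holds all the bookkeeping state and (eventually) talks to the db."""

    def __init__(self) -> None:
        self.items: Dict[K, V] = {}
        self.added: Set[K] = set()
        self.removed: Set[K] = set()

    def set(self, key: K, value: V) -> None:
        self.items[key] = value
        self.added.add(key)
        self.removed.discard(key)

    def get(self, key: K) -> V:
        return self.items[key]

    def delete(self, key: K) -> None:
        del self.items[key]
        self.removed.add(key)
        self.added.discard(key)


class ContextDict(Generic[K, V]):
    """Implements the dict interface; its only private state is the connector."""

    def __init__(self) -> None:
        self._connector: ContextDictConnector[K, V] = ContextDictConnector()

    def __setitem__(self, key: K, value: V) -> None:
        self._connector.set(key, value)

    def __getitem__(self, key: K) -> V:
        return self._connector.get(key)

    def __delitem__(self, key: K) -> None:
        self._connector.delete(key)
```

This keeps the dict facade thin while the connector owns every field, so only one private attribute remains on the pydantic side.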

@pseusys left a comment:

Honestly, I dislike the logging. For now it just looks inconsistent: why do we have it for only a fraction of the code? I think we should either add it literally everywhere, or only keep it where it makes sense (e.g. when calling some external interfaces, like databases; but in that case we should also consider adding logging to message interfaces).

Collaborator Author:

Could anyone please confirm this file works fine, considering all the pydantic models added?

Member:

If there's no error, it works fine.
But yes, some of these rebuilds are no longer required:

  • SerializableStorage no longer references Context, so there's no need to rebuild it;
  • ContextDict does need to be rebuilt with DBContextStorage, because it is imported in a TYPE_CHECKING block during the ContextDict definition.

I would suggest trying to remove all the changes but

from chatsky.context_storages.database import DBContextStorage
from chatsky.core.ctx_dict import ContextDict

ContextDict.model_rebuild()

and see if that works.

Comment on lines +42 to +47
"""
class Turn(BaseModel):
label: Optional[NodeLabel2Type] = Field(default=None)
request: Optional[Message] = Field(default=None)
response: Optional[Message] = Field(default=None)
"""
Collaborator Author:

Do we need Turns after all? Should we add methods like last_turn or turn_id(int)?

Member:

Not as a class.
We could add a method that zips all the turn fields for convenience:

async def turns(self, slice) -> Iterable[Tuple[Label, Message, Message]]:
    return zip(*await asyncio.gather(
        self.labels.__getitem__(slice),
        self.requests.__getitem__(slice),
        self.responses.__getitem__(slice)
    ))

init_kwargs = {
"labels": {0: AbsoluteNodeLabel.model_validate(start_label)},
}
async def connected(
Collaborator Author:

I remember we wanted to get rid of the connected method. Do we still want to do that? If so, what should the preferred alternatives be?

Member:

I think I like the way it is now, but I need a bit more time to work with the new context to see if there are any issues.

_value_type: Optional[TypeAdapter[Type[V]]] = PrivateAttr(None)

@classmethod
async def new(cls, storage: DBContextStorage, id: str, field: str, value_type: Type[V]) -> "ContextDict":
Collaborator Author:

Once again, just like with the Context class: what's our final decision about ContextDict creation? Should we keep the two classmethods or switch to one?

Member:

My issues with the new creation methods stem from the fact that simply initializing a context with Context() will no longer produce a functional context; you would also need to set _value_type for all the context dicts:

ctx = Context()
ctx.labels._value_type = TypeAdapter(AbsoluteNodeLabel)
ctx.requests._value_type = TypeAdapter(Message)
ctx.responses._value_type = TypeAdapter(Message)

This is not convenient for testing and debugging.

I think the best solution is to add validators to Context that would prep its context dicts (by setting their value types) so that Context() is a functional context without db connection.
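The proposed validator approach could look roughly like this sketch. The models here are simplified stand-ins (value types are `int`/`str` instead of AbsoluteNodeLabel and Message, and `validate_value` is hypothetical), so this only illustrates the mechanism:

```python
from typing import Any, Optional

from pydantic import BaseModel, PrivateAttr, TypeAdapter, model_validator


class ContextDict(BaseModel):
    # Simplified stand-in for the real ContextDict.
    _value_type: Optional[TypeAdapter] = PrivateAttr(default=None)

    def validate_value(self, value: Any) -> Any:
        assert self._value_type is not None, "value type was never set"
        return self._value_type.validate_python(value)


class Context(BaseModel):
    labels: ContextDict = ContextDict()
    requests: ContextDict = ContextDict()
    responses: ContextDict = ContextDict()

    @model_validator(mode="after")
    def _set_value_types(self) -> "Context":
        # In the real code these would be AbsoluteNodeLabel and Message.
        self.labels._value_type = TypeAdapter(int)
        self.requests._value_type = TypeAdapter(str)
        self.responses._value_type = TypeAdapter(str)
        return self


# Context() is now functional without any manual _value_type assignments.
ctx = Context()
```

Because the validator runs on every construction path (including deserialization), no caller would ever see a context dict with an unset value type.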

await conn.run_sync(storage.table.drop, storage.engine)
async with storage.engine.begin() as conn:
for table in [storage.main_table, storage.turns_table]:
await conn.run_sync(table.drop, storage.engine)


async def delete_ydb(storage: YDBContextStorage):
Collaborator Author:

There is an issue with YDB: some coroutines remain running after the cleanup. Still, everything works fine (except for some disturbing log entries). Are we OK with that?

Member:

Could you give more info?
Does await storage.pool.retry_operation(callee) request the db cleanup but not wait until it completes?
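If the stray coroutines turn out to be driver-internal tasks that nobody awaits, one generic way to silence the "Task was destroyed but it is pending!" entries is to cancel and await everything still pending before the loop closes. This is a sketch of that idea, not YDB-specific code:

```python
import asyncio


async def shutdown_cleanup() -> None:
    """Cancel and await any stray tasks (e.g. driver-internal coroutines)
    before the event loop closes, so they don't produce
    'Task was destroyed but it is pending!' log entries."""
    pending = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for task in pending:
        task.cancel()
    # return_exceptions swallows the CancelledError raised by each task.
    await asyncio.gather(*pending, return_exceptions=True)
```

This is blunt (it cancels everything), so in the test suite it would belong at the very end of teardown, after the storage itself has been cleaned up.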

Comment on lines -157 to -167
Private methods
^^^^^^^^^^^^^^^

These methods should not be used outside of the internal workings.

* **set_last_response**
* **set_last_request**
* **add_request**
* **add_response**
* **add_label**

Collaborator Author:

There is some ambiguity about context field access. Most of the time, fields are accessed like this: ctx.labels[ctx.current_turn_id]. However, inside the context we call them like this: self.labels._items[self.labels.keys()[-1]]. We could also have added a property setter for that. I think we should define a single correct way and use it everywhere.

Member:

The difference between the two is that ctx.labels[ctx.current_turn_id] is a coroutine and I didn't want last_... kind of properties to become async.

That reminds me that for this workaround to work, we need to make the subscript value at least 1 (so that it always loads the last turn).

@RLKRo left a comment:

The first two comments are from my unfinished review.

self.engine = create_async_engine(self.full_path, pool_pre_ping=True)
self.dialect: str = self.engine.dialect.name
self._insert_limit = _get_write_limit(self.dialect)
Member:

This is not used.

self.main_table = Table(
f"{table_name_prefix}_{self._main_table_name}",
metadata,
Column(self._id_column_name, String(self._UUID_LENGTH), index=True, unique=True, nullable=False),
Member:

The context id doesn't have to be a UUID.
It could also be, for example, a Telegram username, so it is not limited to UUID length.
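One portable way to lift the UUID-length cap while keeping the column indexable would be a generous VARCHAR bound. The table and column names below are illustrative, not the actual chatsky schema:

```python
from sqlalchemy import Column, MetaData, String, Table

metadata = MetaData()

# A length-free String/Text is not portable for indexed columns
# (MySQL requires an explicit length there), so use a generous bound
# instead of capping the id at UUID length.
main_table = Table(
    "contexts_main",
    metadata,
    Column("id", String(255), index=True, unique=True, nullable=False),
)
```

255 comfortably covers UUIDs (36 characters) as well as usernames, while still fitting within MySQL's index key limits for common charsets.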


Labels: enhancement (New feature or request)
4 participants