
Broken or too many connection to database when workers > 1 #3313

Open
simonpicard opened this issue Sep 11, 2024 · 1 comment

Comments

@simonpicard

Hi team, many thanks for the great work.

I am using Luigi to schedule a workflow in which some tasks can run in parallel.
I would therefore like to use multiple workers to parallelise the computation and finish faster.
However, I am experiencing issues with the connection to my database when multiple workers are enabled, whereas there are no issues with a single worker.
For reference, I am using SQLAlchemy 2.0.25 with a PostgreSQL database.
The typical error is (psycopg2.OperationalError) SSL SYSCALL error: EOF detected, which, according to ref, usually means the connection to the database was closed unexpectedly.
As I understood that enabling multiple workers leads to forking new processes, I read that I must dispose of the database connection pool, as per the SQLAlchemy documentation.
Hence, I used a Luigi event callback to dispose of the engine at the start of each task:

    @luigi.Task.event_handler(luigi.Event.START)  # type: ignore
    def on_start(self) -> None:  # pragma: no cover
        # drop the pooled connections inherited from the parent process
        # without closing the parent's own sockets
        engine.dispose(close=False)

Unfortunately, this approach leads to a (psycopg2.OperationalError) FATAL: sorry, too many clients already error.

As I understand it, I should dispose of the database engine a single time when each worker process is created, not at the start of every task.
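
To illustrate what I mean, here is a minimal sketch of the kind of guard I have in mind (the dispose_engine_once_per_process helper is hypothetical, and it assumes engine is the module-level SQLAlchemy engine inherited by each forked worker):

    import os

    # Hypothetical guard: remember which process the pool was last reset in,
    # so the inherited connections are dropped once per forked worker process
    # rather than once per task.
    _engine_pid = os.getpid()

    def dispose_engine_once_per_process() -> None:
        global _engine_pid
        if os.getpid() != _engine_pid:
            # close=False drops this process's view of the inherited pool
            # without closing the parent's sockets
            engine.dispose(close=False)
            _engine_pid = os.getpid()

The on_start handler above could then call this helper instead of disposing unconditionally.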

Given the above context, my questions are:

  • Can someone share guidance on how to enable multiple workers with SQLAlchemy and PostgreSQL?
  • Is it possible to avoid creating a dedicated engine/database connection per worker, so as to avoid the too many clients error?
  • Is there a way to execute a custom script when a worker is created?

Many thanks for the help.

Similar question but for Django: #2782

simonpicard changed the title from “Broken or too many connection to database when n_workers > 1” to “Broken or too many connection to database when workers > 1” on Sep 11, 2024
@simonpicard (Author)

Hi team, following up on my investigation: I increased the maximum number of connections on my PostgreSQL database, and the too many clients error no longer happens, but the following one arises instead: sqlalchemy.exc.DatabaseError: (psycopg2.DatabaseError) error with status PGRES_TUPLES_OK and no message from the libpq

The full traceback is the following:

2024-09-17 14:44:58 Traceback (most recent call last):
2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
2024-09-17 14:44:58     self.dialect.do_execute(
2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 941, in do_execute
2024-09-17 14:44:58     cursor.execute(statement, parameters)
2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/newrelic/hooks/database_psycopg2.py", line 61, in execute
2024-09-17 14:44:58     return super(CursorWrapper, self).execute(sql, parameters, *args, **kwargs)
2024-09-17 14:44:58            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/newrelic/hooks/database_dbapi2.py", line 42, in execute
2024-09-17 14:44:58     return self.__wrapped__.execute(sql, parameters, *args, **kwargs)
2024-09-17 14:44:58            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-09-17 14:44:58 psycopg2.DatabaseError: error with status PGRES_TUPLES_OK and no message from the libpq
2024-09-17 14:44:58 
2024-09-17 14:44:58 The above exception was the direct cause of the following exception:
2024-09-17 14:44:58 
2024-09-17 14:44:58 Traceback (most recent call last):
2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/luigi/worker.py", line 185, in run
2024-09-17 14:44:58     missing = [dep.task_id for dep in self.task.deps() if not self.check_complete(dep)]
2024-09-17 14:44:58                                       ^^^^^^^^^^^^^^^^
2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/luigi/task.py", line 676, in deps
2024-09-17 14:44:58     return flatten(self._requires())
2024-09-17 14:44:58                    ^^^^^^^^^^^^^^^^
2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/luigi/task.py", line 648, in _requires
2024-09-17 14:44:58     return flatten(self.requires())  # base impl
2024-09-17 14:44:58            ^^^^^^^^^^^^^^^^^^^^^^^^
2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/luigi/task.py", line 987, in flatten
2024-09-17 14:44:58     for result in iterator:
2024-09-17 14:44:58   File "/code/src/tasks/utils/custom_tasks.py", line 149, in requires
2024-09-17 14:44:58     image_task = ImageTask.get_by_uuid(self.image_task_uuid)
2024-09-17 14:44:58                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-09-17 14:44:58   File "/code/src/server/models/base_model.py", line 57, in get_by_uuid
2024-09-17 14:44:58     result = db.execute(select(cls).filter(cls.uuid == record_uuid))
2024-09-17 14:44:58              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2362, in execute
2024-09-17 14:44:58     return self._execute_internal(
2024-09-17 14:44:58            ^^^^^^^^^^^^^^^^^^^^^^^
2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2247, in _execute_internal
2024-09-17 14:44:58     result: Result[Any] = compile_state_cls.orm_execute_statement(
2024-09-17 14:44:58                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/context.py", line 293, in orm_execute_statement
2024-09-17 14:44:58     result = conn.execute(
2024-09-17 14:44:58              ^^^^^^^^^^^^^
2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1418, in execute
2024-09-17 14:44:58     return meth(
2024-09-17 14:44:58            ^^^^^
2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 515, in _execute_on_connection
2024-09-17 14:44:58     return connection._execute_clauseelement(
2024-09-17 14:44:58            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1640, in _execute_clauseelement
2024-09-17 14:44:58     ret = self._execute_context(
2024-09-17 14:44:58           ^^^^^^^^^^^^^^^^^^^^^^
2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context
2024-09-17 14:44:58     return self._exec_single_context(
2024-09-17 14:44:58            ^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1986, in _exec_single_context
2024-09-17 14:44:58     self._handle_dbapi_exception(
2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2355, in _handle_dbapi_exception
2024-09-17 14:44:58     raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
2024-09-17 14:44:58     self.dialect.do_execute(
2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 941, in do_execute
2024-09-17 14:44:58     cursor.execute(statement, parameters)
2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/newrelic/hooks/database_psycopg2.py", line 61, in execute
2024-09-17 14:44:58     return super(CursorWrapper, self).execute(sql, parameters, *args, **kwargs)
2024-09-17 14:44:58            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/newrelic/hooks/database_dbapi2.py", line 42, in execute
2024-09-17 14:44:58     return self.__wrapped__.execute(sql, parameters, *args, **kwargs)
2024-09-17 14:44:58            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-09-17 14:44:58 sqlalchemy.exc.DatabaseError: (psycopg2.DatabaseError) error with status PGRES_TUPLES_OK and no message from the libpq

As you can see, the error happens in a custom requires function that makes a call to the database.
Hence, I tried to hook into the dependency discovery event as follows:

    @luigi.Task.event_handler(luigi.Event.DEPENDENCY_DISCOVERED)  # type: ignore
    def prepare_db_conn(self, *args: Any, **kwargs: Any) -> None:  # pragma: no cover
        # same disposal, triggered when a dependency is discovered
        engine.dispose(close=False)

It did not work; the same error happened in the same place.
Note that I kept the on-start engine disposal as well.

Hence, it would be really helpful to have a proper way to run that engine disposal at worker startup, rather than through the task events. Is there any way to achieve this?
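
In the meantime, the lazy alternative I am looking at is the multiprocessing pattern from the SQLAlchemy pooling documentation, which tags each pooled connection with the pid that created it and invalidates it when a forked worker tries to check it out. A minimal sketch, assuming a single module-level engine:

    import os

    from sqlalchemy import event, exc

    @event.listens_for(engine, "connect")
    def connect(dbapi_connection, connection_record):
        # remember which process created this DBAPI connection
        connection_record.info["pid"] = os.getpid()

    @event.listens_for(engine, "checkout")
    def checkout(dbapi_connection, connection_record, connection_proxy):
        pid = os.getpid()
        if connection_record.info["pid"] != pid:
            # connection was inherited across a fork: invalidate it so the
            # pool transparently opens a fresh one in this process
            connection_record.dbapi_connection = connection_proxy.dbapi_connection = None
            raise exc.DisconnectionError(
                "Connection record belongs to pid %s, attempting to check out in pid %s"
                % (connection_record.info["pid"], pid)
            )

Even with that, a hook at worker startup would still feel cleaner than relying on pool events or task events.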

Many thanks.
