From ea2a524a533e528aa1d32a01959a288b6e5cd89a Mon Sep 17 00:00:00 2001 From: rhatgadkar-goog Date: Thu, 9 Jan 2025 09:54:06 -0800 Subject: [PATCH] feat: support native asyncpg connection pools (#409) This changed is analogous to CloudSQL's change: https://github.com/GoogleCloudPlatform/cloud-sql-python-connector/pull/1182 --- README.md | 91 ++++++++++++++++++++--- setup.py | 2 +- tests/system/test_asyncpg_connection.py | 96 ++++++++++++++++++++++++- 3 files changed, 174 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index c1656027..46c8714d 100644 --- a/README.md +++ b/README.md @@ -251,6 +251,42 @@ currently supports the following asyncio database drivers: [asyncio]: https://docs.python.org/3/library/asyncio.html +#### Asyncpg Connection Pool + +```python +import asyncpg +from google.cloud.alloydb.connector import AsyncConnector + +async def main(): + # initialize Connector object for connections to AlloyDB + connector = AsyncConnector() + + # creation function to generate asyncpg connections as the 'connect' arg + async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection: + return await connector.connect( + instance_connection_name, + "asyncpg", + user="my-user", + password="my-password", + db="my-db", + ) + + # initialize connection pool + pool = await asyncpg.create_pool( + "projects//locations//clusters//instances/", + connect=getconn, + ) + + # acquire connection and query AlloyDB database + async with pool.acquire() as conn: + res = await conn.fetch("SELECT NOW()") + + # close Connector + await connector.close() +``` + +#### SQLAlchemy Async Engine + ```python import asyncpg @@ -260,7 +296,7 @@ from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine from google.cloud.alloydb.connector import AsyncConnector async def init_connection_pool(connector: AsyncConnector) -> AsyncEngine: - # initialize Connector object for connections to AlloyDB + # creation function to generate asyncpg connections as 'async_creator' arg async def getconn() -> asyncpg.Connection: conn: asyncpg.Connection = await connector.connect( "projects//locations//clusters//instances/", @@ -311,6 +347,39 @@ visit the [official documentation][asyncpg-docs]. The `AsyncConnector` also may be used as an async context manager, removing the need for explicit calls to `connector.close()` to cleanup resources. +#### Asyncpg Connection Pool + +```python +import asyncpg +from google.cloud.alloydb.connector import AsyncConnector + +async def main(): + # initialize AsyncConnector object for connections to AlloyDB + async with AsyncConnector() as connector: + + # creation function to generate asyncpg connections as the 'connect' arg + async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection: + return await connector.connect( + instance_connection_name, + "asyncpg", + user="my-user", + password="my-password", + db="my-db", + ) + + # create connection pool + pool = await asyncpg.create_pool( + "projects//locations//clusters//instances/", + connect=getconn, + ) + + # acquire connection and query AlloyDB database + async with pool.acquire() as conn: + res = await conn.fetch("SELECT NOW()") +``` + +#### SQLAlchemy Async Engine + ```python import asyncio import asyncpg @@ -321,17 +390,17 @@ from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine from google.cloud.alloydb.connector import AsyncConnector async def init_connection_pool(connector: AsyncConnector) -> AsyncEngine: - # initialize Connector object for connections to AlloyDB + # creation function to generate asyncpg connections as 'async_creator' arg async def getconn() -> asyncpg.Connection: - conn: asyncpg.Connection = await connector.connect( - "projects//locations//clusters//instances/", - "asyncpg", - user="my-user", - password="my-password", - db="my-db-name" - # ... additional database driver args - ) - return conn + conn: asyncpg.Connection = await connector.connect( + "projects//locations//clusters//instances/", + "asyncpg", + user="my-user", + password="my-password", + db="my-db-name" + # ... additional database driver args + ) + return conn # The AlloyDB Python Connector can be used along with SQLAlchemy using the # 'async_creator' argument to 'create_async_engine' diff --git a/setup.py b/setup.py index b3db3e2a..604ff141 100644 --- a/setup.py +++ b/setup.py @@ -77,7 +77,7 @@ install_requires=dependencies, extras_require={ "pg8000": ["pg8000>=1.31.1"], - "asyncpg": ["asyncpg>=0.29.0"], + "asyncpg": ["asyncpg>=0.30.0"], }, python_requires=">=3.9", include_package_data=True, diff --git a/tests/system/test_asyncpg_connection.py b/tests/system/test_asyncpg_connection.py index 756d15b8..9cefd2ba 100644 --- a/tests/system/test_asyncpg_connection.py +++ b/tests/system/test_asyncpg_connection.py @@ -13,7 +13,7 @@ # limitations under the License. import os -from typing import Tuple +from typing import Any, Tuple # [START alloydb_sqlalchemy_connect_async_connector] import asyncpg @@ -88,7 +88,65 @@ async def getconn() -> asyncpg.Connection: # [END alloydb_sqlalchemy_connect_async_connector] -async def test_connection_with_asyncpg() -> None: +async def create_asyncpg_pool( + instance_connection_name: str, + user: str, + password: str, + db: str, + refresh_strategy: str = "background", +) -> tuple[asyncpg.Pool, AsyncConnector]: + """Creates a native asyncpg connection pool for an AlloyDB instance and + returns the pool and the connector. Callers are responsible for closing the + pool and the connector. + + A sample invocation looks like: + + pool, connector = await create_asyncpg_pool( + inst_conn_name, + user, + password, + db, + ) + async with pool.acquire() as conn: + hello = await conn.fetch("SELECT 'Hello World!'") + # do something with query result + await connector.close() + + Args: + instance_connection_name (str): + The instance connection name specifies the instance relative to the + project and region. For example: "my-project:my-region:my-instance" + user (str): + The database user name, e.g., postgres + password (str): + The database user's password, e.g., secret-password + db (str): + The name of the database, e.g., mydb + refresh_strategy (Optional[str]): + Refresh strategy for the Cloud SQL Connector. Can be one of "lazy" + or "background". For serverless environments use "lazy" to avoid + errors resulting from CPU being throttled. + """ + connector = AsyncConnector(refresh_strategy=refresh_strategy) + + async def getconn( + instance_connection_name: str, **kwargs: Any + ) -> asyncpg.Connection: + conn: asyncpg.Connection = await connector.connect( + instance_connection_name, + "asyncpg", + user=user, + password=password, + db=db, + ) + return conn + + # create native asyncpg pool (requires asyncpg version >=0.30.0) + pool = await asyncpg.create_pool(instance_connection_name, connect=getconn) + return pool, connector + + +async def test_sqlalchemy_connection_with_asyncpg() -> None: """Basic test to get time from database.""" inst_uri = os.environ["ALLOYDB_INSTANCE_URI"] user = os.environ["ALLOYDB_USER"] @@ -104,7 +162,7 @@ async def test_connection_with_asyncpg() -> None: await connector.close() -async def test_lazy_connection_with_asyncpg() -> None: +async def test_lazy_sqlalchemy_connection_with_asyncpg() -> None: """Basic test to get time from database.""" inst_uri = os.environ["ALLOYDB_INSTANCE_URI"] user = os.environ["ALLOYDB_USER"] @@ -120,3 +178,35 @@ async def test_lazy_connection_with_asyncpg() -> None: assert res[0] == 1 await connector.close() + + +async def test_connection_with_asyncpg() -> None: + """Basic test to get time from database.""" + inst_uri = os.environ["ALLOYDB_INSTANCE_URI"] + user = os.environ["ALLOYDB_USER"] + password = os.environ["ALLOYDB_PASS"] + db = os.environ["ALLOYDB_DB"] + + pool, connector = await create_asyncpg_pool(inst_uri, user, password, db) + + async with pool.acquire() as conn: + res = await conn.fetch("SELECT 1") + assert res[0][0] == 1 + + await connector.close() + + +async def test_lazy_connection_with_asyncpg() -> None: + """Basic test to get time from database.""" + inst_uri = os.environ["ALLOYDB_INSTANCE_URI"] + user = os.environ["ALLOYDB_USER"] + password = os.environ["ALLOYDB_PASS"] + db = os.environ["ALLOYDB_DB"] + + pool, connector = await create_asyncpg_pool(inst_uri, user, password, db, "lazy") + + async with pool.acquire() as conn: + res = await conn.fetch("SELECT 1") + assert res[0][0] == 1 + + await connector.close()