Skip to content

Commit

Permalink
chore(integrations): move openai,psycopg,pylibmc,pymemcache,pymongo,p…
Browse files Browse the repository at this point in the history
…ymysql to internal (#10186)

- Moves all integration internals in ddtrace/contrib/(integration name)/
to ddtrace/contrib/internal/(integration name)/ for openai, psycopg,
pylibmc, pymemcache, pymongo, and pymysql
- Ensures ddtrace/contrib/(integration name)/ and
ddtrace/contrib/(integration name)/ continue to expose the same
functions, classes, imports, and module level variables (via from
..internal.integration.module import * imports).
- Log a deprecation warning if internal modules in
ddtrace/contrib/(integration name)/ and ddtrace/contrib/(integration
name)/. Only patch and unpack methods should be exposed by these
packages.
- #9996

## Checklist
- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [x] Reviewer has checked that all the criteria below are met 
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

---------

Co-authored-by: Emmett Butler <[email protected]>
Co-authored-by: Munir Abdinur <[email protected]>
Co-authored-by: Munir Abdinur <[email protected]>
  • Loading branch information
4 people authored Aug 15, 2024
1 parent 111d434 commit f42fb50
Show file tree
Hide file tree
Showing 47 changed files with 2,233 additions and 3,565 deletions.
6 changes: 3 additions & 3 deletions ddtrace/contrib/internal/aiopg/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

from ddtrace import config
from ddtrace.contrib.aiopg.connection import AIOTracedConnection
from ddtrace.contrib.psycopg.connection import patch_conn as psycopg_patch_conn
from ddtrace.contrib.psycopg.extensions import _patch_extensions
from ddtrace.contrib.psycopg.extensions import _unpatch_extensions
from ddtrace.contrib.internal.psycopg.connection import patch_conn as psycopg_patch_conn
from ddtrace.contrib.internal.psycopg.extensions import _patch_extensions
from ddtrace.contrib.internal.psycopg.extensions import _unpatch_extensions
from ddtrace.internal.schema import schematize_service_name
from ddtrace.internal.utils.wrappers import unwrap as _u
from ddtrace.vendor import wrapt
Expand Down
8 changes: 4 additions & 4 deletions ddtrace/contrib/internal/django/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,13 @@ def patch_conn(django, conn):
try:
from psycopg.cursor import Cursor as psycopg_cursor_cls

from ddtrace.contrib.psycopg.cursor import Psycopg3TracedCursor
from ddtrace.contrib.internal.psycopg.cursor import Psycopg3TracedCursor
except ImportError:
Psycopg3TracedCursor = None
try:
from psycopg2._psycopg import cursor as psycopg_cursor_cls

from ddtrace.contrib.psycopg.cursor import Psycopg2TracedCursor
from ddtrace.contrib.internal.psycopg.cursor import Psycopg2TracedCursor
except ImportError:
psycopg_cursor_cls = None
Psycopg2TracedCursor = None
Expand Down Expand Up @@ -148,12 +148,12 @@ def cursor(django, pin, func, instance, args, kwargs):
try:
if cursor.cursor.__class__.__module__.startswith("psycopg2."):
# Import lazily to avoid importing psycopg2 if not already imported.
from ddtrace.contrib.psycopg.cursor import Psycopg2TracedCursor
from ddtrace.contrib.internal.psycopg.cursor import Psycopg2TracedCursor

traced_cursor_cls = Psycopg2TracedCursor
elif type(cursor.cursor).__name__ == "Psycopg3TracedCursor":
# Import lazily to avoid importing psycopg if not already imported.
from ddtrace.contrib.psycopg.cursor import Psycopg3TracedCursor
from ddtrace.contrib.internal.psycopg.cursor import Psycopg3TracedCursor

traced_cursor_cls = Psycopg3TracedCursor
except AttributeError:
Expand Down
66 changes: 66 additions & 0 deletions ddtrace/contrib/internal/psycopg/async_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from ddtrace import Pin
from ddtrace import config
from ddtrace.constants import SPAN_KIND
from ddtrace.constants import SPAN_MEASURED_KEY
from ddtrace.contrib import dbapi_async
from ddtrace.contrib.internal.psycopg.async_cursor import Psycopg3FetchTracedAsyncCursor
from ddtrace.contrib.internal.psycopg.async_cursor import Psycopg3TracedAsyncCursor
from ddtrace.contrib.internal.psycopg.connection import patch_conn
from ddtrace.contrib.trace_utils import ext_service
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanTypes
from ddtrace.ext import db
from ddtrace.internal.constants import COMPONENT


class Psycopg3TracedAsyncConnection(dbapi_async.TracedAsyncConnection):
def __init__(self, conn, pin=None, cursor_cls=None):
if not cursor_cls:
# Do not trace `fetch*` methods by default
cursor_cls = (
Psycopg3FetchTracedAsyncCursor if config.psycopg.trace_fetch_methods else Psycopg3TracedAsyncCursor
)

super(Psycopg3TracedAsyncConnection, self).__init__(conn, pin, config.psycopg, cursor_cls=cursor_cls)

async def execute(self, *args, **kwargs):
"""Execute a query and return a cursor to read its results."""
span_name = "{}.{}".format(self._self_datadog_name, "execute")

async def patched_execute(*args, **kwargs):
try:
cur = self.cursor()
if kwargs.get("binary", None):
cur.format = 1 # set to 1 for binary or 0 if not
return await cur.execute(*args, **kwargs)
except Exception as ex:
raise ex.with_traceback(None)

return await self._trace_method(patched_execute, span_name, {}, *args, **kwargs)


def patched_connect_async_factory(psycopg_module):
async def patched_connect_async(connect_func, _, args, kwargs):
traced_conn_cls = Psycopg3TracedAsyncConnection

pin = Pin.get_from(psycopg_module)

if not pin or not pin.enabled() or not pin._config.trace_connect:
conn = await connect_func(*args, **kwargs)
else:
with pin.tracer.trace(
"{}.{}".format(connect_func.__module__, connect_func.__name__),
service=ext_service(pin, pin._config),
span_type=SpanTypes.SQL,
) as span:
span.set_tag_str(SPAN_KIND, SpanKind.CLIENT)
span.set_tag_str(COMPONENT, pin._config.integration_name)
if span.get_tag(db.SYSTEM) is None:
span.set_tag_str(db.SYSTEM, pin._config.dbms_name)

span.set_tag(SPAN_MEASURED_KEY)
conn = await connect_func(*args, **kwargs)

return patch_conn(conn, pin=pin, traced_conn_cls=traced_conn_cls)

return patched_connect_async
11 changes: 11 additions & 0 deletions ddtrace/contrib/internal/psycopg/async_cursor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from ddtrace.contrib import dbapi_async
from ddtrace.contrib.internal.psycopg.cursor import Psycopg3TracedCursor


class Psycopg3TracedAsyncCursor(Psycopg3TracedCursor, dbapi_async.TracedAsyncCursor):
def __init__(self, cursor, pin, cfg, *args, **kwargs):
super(Psycopg3TracedAsyncCursor, self).__init__(cursor, pin, cfg)


class Psycopg3FetchTracedAsyncCursor(Psycopg3TracedAsyncCursor, dbapi_async.FetchTracedAsyncCursor):
"""Psycopg3FetchTracedAsyncCursor for psycopg"""
110 changes: 110 additions & 0 deletions ddtrace/contrib/internal/psycopg/connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
from ddtrace import Pin
from ddtrace import config
from ddtrace.constants import SPAN_KIND
from ddtrace.constants import SPAN_MEASURED_KEY
from ddtrace.contrib import dbapi
from ddtrace.contrib.internal.psycopg.cursor import Psycopg2FetchTracedCursor
from ddtrace.contrib.internal.psycopg.cursor import Psycopg2TracedCursor
from ddtrace.contrib.internal.psycopg.cursor import Psycopg3FetchTracedCursor
from ddtrace.contrib.internal.psycopg.cursor import Psycopg3TracedCursor
from ddtrace.contrib.internal.psycopg.extensions import _patch_extensions
from ddtrace.contrib.trace_utils import ext_service
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanTypes
from ddtrace.ext import db
from ddtrace.ext import net
from ddtrace.ext import sql
from ddtrace.internal.constants import COMPONENT


class Psycopg3TracedConnection(dbapi.TracedConnection):
def __init__(self, conn, pin=None, cursor_cls=None):
if not cursor_cls:
# Do not trace `fetch*` methods by default
cursor_cls = Psycopg3FetchTracedCursor if config.psycopg.trace_fetch_methods else Psycopg3TracedCursor

super(Psycopg3TracedConnection, self).__init__(conn, pin, config.psycopg, cursor_cls=cursor_cls)

def execute(self, *args, **kwargs):
"""Execute a query and return a cursor to read its results."""

def patched_execute(*args, **kwargs):
try:
cur = self.cursor()
if kwargs.get("binary", None):
cur.format = 1 # set to 1 for binary or 0 if not
return cur.execute(*args, **kwargs)
except Exception as ex:
raise ex.with_traceback(None)

return patched_execute(*args, **kwargs)


class Psycopg2TracedConnection(dbapi.TracedConnection):
"""TracedConnection wraps a Connection with tracing code."""

def __init__(self, conn, pin=None, cursor_cls=None):
if not cursor_cls:
# Do not trace `fetch*` methods by default
cursor_cls = Psycopg2FetchTracedCursor if config.psycopg.trace_fetch_methods else Psycopg2TracedCursor

super(Psycopg2TracedConnection, self).__init__(conn, pin, config.psycopg, cursor_cls=cursor_cls)


def patch_conn(conn, traced_conn_cls, pin=None):
"""Wrap will patch the instance so that its queries are traced."""
# ensure we've patched extensions (this is idempotent) in
# case we're only tracing some connections.
_config = None
if pin:
extensions_to_patch = pin._config.get("_extensions_to_patch", None)
_config = pin._config
if extensions_to_patch:
_patch_extensions(extensions_to_patch)

c = traced_conn_cls(conn)

# if the connection has an info attr, we are using psycopg3
if hasattr(conn, "dsn"):
dsn = sql.parse_pg_dsn(conn.dsn)
else:
dsn = sql.parse_pg_dsn(conn.info.dsn)

tags = {
net.TARGET_HOST: dsn.get("host"),
net.TARGET_PORT: dsn.get("port", 5432),
net.SERVER_ADDRESS: dsn.get("host"),
db.NAME: dsn.get("dbname"),
db.USER: dsn.get("user"),
"db.application": dsn.get("application_name"),
db.SYSTEM: "postgresql",
}
Pin(tags=tags, _config=_config).onto(c)
return c


def patched_connect_factory(psycopg_module):
def patched_connect(connect_func, _, args, kwargs):
traced_conn_cls = Psycopg3TracedConnection if psycopg_module.__name__ == "psycopg" else Psycopg2TracedConnection

pin = Pin.get_from(psycopg_module)

if not pin or not pin.enabled() or not pin._config.trace_connect:
conn = connect_func(*args, **kwargs)
else:
with pin.tracer.trace(
"{}.{}".format(connect_func.__module__, connect_func.__name__),
service=ext_service(pin, pin._config),
span_type=SpanTypes.SQL,
) as span:
span.set_tag_str(SPAN_KIND, SpanKind.CLIENT)
span.set_tag_str(COMPONENT, pin._config.integration_name)
if span.get_tag(db.SYSTEM) is None:
span.set_tag_str(db.SYSTEM, pin._config.dbms_name)

span.set_tag(SPAN_MEASURED_KEY)
conn = connect_func(*args, **kwargs)

return patch_conn(conn, pin=pin, traced_conn_cls=traced_conn_cls)

return patched_connect
28 changes: 28 additions & 0 deletions ddtrace/contrib/internal/psycopg/cursor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from ddtrace.contrib import dbapi


class Psycopg3TracedCursor(dbapi.TracedCursor):
"""TracedCursor for psycopg instances"""

def __init__(self, cursor, pin, cfg, *args, **kwargs):
super(Psycopg3TracedCursor, self).__init__(cursor, pin, cfg)

def _trace_method(self, method, name, resource, extra_tags, dbm_propagator, *args, **kwargs):
# treat Composable resource objects as strings
if resource.__class__.__name__ == "SQL" or resource.__class__.__name__ == "Composed":
resource = resource.as_string(self.__wrapped__)
return super(Psycopg3TracedCursor, self)._trace_method(
method, name, resource, extra_tags, dbm_propagator, *args, **kwargs
)


class Psycopg3FetchTracedCursor(Psycopg3TracedCursor, dbapi.FetchTracedCursor):
"""Psycopg3FetchTracedCursor for psycopg"""


class Psycopg2TracedCursor(Psycopg3TracedCursor):
"""TracedCursor for psycopg2"""


class Psycopg2FetchTracedCursor(Psycopg3FetchTracedCursor):
"""FetchTracedCursor for psycopg2"""
Loading

0 comments on commit f42fb50

Please sign in to comment.