Skip to content

Commit

Permalink
commit
Browse files Browse the repository at this point in the history
  • Loading branch information
HadhemiDD committed Dec 17, 2024
1 parent 3ac67d4 commit d5f4706
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 24 deletions.
32 changes: 25 additions & 7 deletions duckdb/datadog_checks/duckdb/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
# Licensed under a 3-clause BSD style license (see LICENSE)
from contextlib import closing, contextmanager
from copy import deepcopy
import json
import re
from typing import Any, AnyStr, Iterable, Iterator, Sequence # noqa: F401

import duckdb
Expand Down Expand Up @@ -39,7 +41,7 @@ def __init__(self, name, init_config, instances):
self._execute_query_raw,
queries=manager_queries,
tags=self.tags,
error_handler=self._query_errors,
error_handler=self._executor_error_handler,
)
self.check_initializations.append(self.initialize_config)
self.check_initializations.append(self._query_manager.compile_queries)
Expand All @@ -59,16 +61,21 @@ def check(self, _):
def _execute_query_raw(self, query):
# type: (AnyStr) -> Iterable[Sequence]
with closing(self._connection.cursor()) as cursor:

query = query.format(self.db_name)
if len(cursor.fetchall()) < 1: # this is returning a -1
curs = cursor.execute(query)
if len(cursor.execute(query).fetchall()) < 1: # this was returning a -1 with rowcount
self._query_errors += 1
self.log.warning('Failed to fetch records from query: `%s`.', query)
return None
for row in cursor.fetchall():
for row in cursor.execute(query).fetchall():
# To find the field name from the query
pattern = r"(?i)\bname\s*=\s*'([^']+)'"
query_name = re.search(pattern, query).group(1)
try:
yield self._queries_processor(row, query)
yield self._queries_processor(row, query_name)
except Exception as e:
self.log.debug('Unable to process row returned from query "%s", skipping row %s. %s', query, row, e)
self.log.debug('Unable to process row returned from query "%s", skipping row %s. %s', query_name, row, e)
yield row

def _queries_processor(self, row, query_name):
Expand All @@ -79,13 +86,16 @@ def _queries_processor(self, row, query_name):
if query_name == 'version':
self.submit_version(row)
return unprocessed_row

self.log.debug('Row processor returned: %s. \nFrom query: "%s"', unprocessed_row, query_name)
return unprocessed_row

@contextmanager
def connect(self):
conn = None
try:
# Try to establish the connection
conn = duckdb.connect(self.db_name)
conn = duckdb.connect(self.db_name, read_only=True)
self.log.info('Connected to DuckDB database.')
yield conn
except Exception as e:
Expand All @@ -99,7 +109,15 @@ def connect(self):
conn.close()

def initialize_config(self):
self._connect_params = self.db_name
self._connect_params = json.dumps(
{'db_name': self.db_name,})
global_tags = [
'db_name:{}'.format(self.instance.get('db_name')),
]
if self.tags is not None:
global_tags.extend(self.tags)
self._tags = global_tags
self._query_manager.tags = self._tags

def submit_health_checks(self):
# Check for connectivity
Expand Down
11 changes: 6 additions & 5 deletions duckdb/datadog_checks/duckdb/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
}

DUCKDDB_WAL = {
'name': 'duckdb_wal',
'name': 'wal_autocheckpoint',
'query': " SELECT CAST(SUBSTR(value, 1, LENGTH(value) - 3) AS INTEGER) * "
"CASE "
" WHEN RIGHT(value, 3) = 'KiB' THEN 1024 "
Expand All @@ -31,10 +31,11 @@
'columns': [{'name': 'wal_autocheckpoint', 'type': 'gauge'}],
}

DUCKDDB_WAL_2 = {
'name': 'duckdb_worker_threads',
'query': " select value from duckdb_settings() where name = 'worker_threads';",

DUCKDDB_THREADS = {
'name': 'worker_threads',
'query': "select value from duckdb_settings() where name = 'worker_threads';",
'columns': [{'name': 'worker_threads', 'type': 'gauge'}],
}

DEFAULT_QUERIES = [DUCKDDB_WAL_2]
DEFAULT_QUERIES = [DUCKDDB_THREADS, DUCKDDB_WAL, DUCKDB_VERSION]
1 change: 1 addition & 0 deletions duckdb/metadata.csv
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
metric_name,metric_type,interval,unit_name,per_unit_name,description,orientation,integration,short_name,curated_metric,sample_tags
duckdb.worker_threads,gauge,,,,The number of total threads used by the system,0,duckdb,,
8 changes: 6 additions & 2 deletions duckdb/tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@
from datadog_checks.dev import get_here

HERE = get_here()
DB_NAME = 'data/sample.db'
DB_NAME = 'data/sample_1.db'

DB = os.path.join(HERE, DB_NAME)

DEFAULT_INSTANCE = {'db_name': DB}

METRICS_MAP = ['duckdb.worker_threads']
METRICS_MAP = [
'duckdb.worker_threads',
'duckdb.wal_autocheckpoint',
'duckdb.wal_autocheckpoin',
]
2 changes: 0 additions & 2 deletions duckdb/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@ def connection_db():

@pytest.fixture(scope='session')
def dd_environment():
# yield {"db_name": ":memory:"}
yield common.DEFAULT_INSTANCE


@pytest.fixture
def instance():
# return deepcopy({"db_name": ":memory:"})
return deepcopy(common.DEFAULT_INSTANCE)
12 changes: 4 additions & 8 deletions duckdb/tests/test_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@

def test_check(dd_run_check, aggregator, instance):
# type: (Callable[[AgentCheck, bool], None], AggregatorStub, Dict[str, Any]) -> None
instance = {
'db_name': '/Users/hadhemi.samaali/go/src/github.com/DataDog/integrations-core/duckdb/tests/data/sample_1.db'
}
instance = common.DEFAULT_INSTANCE
check = DuckdbCheck('duckdb', {}, [instance])
dd_run_check(check)

Expand All @@ -26,11 +24,9 @@ def test_check(dd_run_check, aggregator, instance):
# aggregator.assert_metrics_using_metadata(get_metadata_metrics())


def test_emits_critical_service_check_when_service_is_down(dd_run_check, aggregator, instance):
def test_database_connection(dd_run_check, aggregator, instance):
# type: (Callable[[AgentCheck, bool], None], AggregatorStub, Dict[str, Any]) -> None
instance = {
'db_name': '/Users/hadhemi.samaali/go/src/github.com/DataDog/integrations-core/duckdb/tests/data/sample.db'
}
instance = common.DEFAULT_INSTANCE
check = DuckdbCheck('duckdb', {}, [instance])
dd_run_check(check)
aggregator.assert_service_check('duckdb.can_connect', DuckdbCheck.CRITICAL)
aggregator.assert_service_check('duckdb.can_connect', DuckdbCheck.OK)

0 comments on commit d5f4706

Please sign in to comment.