From 0279f63937b5bc0189491896339cc46922aec611 Mon Sep 17 00:00:00 2001 From: Guen Prawiroatmodjo Date: Tue, 19 Nov 2024 10:10:16 -0800 Subject: [PATCH 1/2] add custom_user_agent when connecting to MotherDuck --- src/bytewax/duckdb/__init__.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/bytewax/duckdb/__init__.py b/src/bytewax/duckdb/__init__.py index de7ce76..b01d1cb 100644 --- a/src/bytewax/duckdb/__init__.py +++ b/src/bytewax/duckdb/__init__.py @@ -71,6 +71,7 @@ import os import sys from typing import List, Optional +from urllib.parse import parse_qsl, urlparse if "BYTEWAX_LICENSE" not in os.environ: msg = ( @@ -89,6 +90,8 @@ from bytewax.operators import V from bytewax.outputs import FixedPartitionedSink, StatefulSinkPartition +MOTHERDUCK_SCHEME = "md" + class DuckDBSinkPartition(StatefulSinkPartition[V, None]): """Stateful sink partition for writing data to either local DuckDB or MotherDuck.""" @@ -111,7 +114,16 @@ def __init__( resume_state (None): Unused, as this sink does not perform recovery. """ self.table_name = table_name - self.conn = md_duckdb.connect(db_path) + parsed_db_path = urlparse(db_path) + path = parsed_db_path.path + config = dict(parse_qsl(parsed_db_path.query)) + + if parsed_db_path.scheme == MOTHERDUCK_SCHEME: + path = f"md:{parsed_db_path.path}" + if "custom_user_agent" not in config: + config["custom_user_agent"] = "bytewax" + + self.conn = md_duckdb.connect(path, config=config) # Only create the table if specified and if it doesn't already exist if create_table_sql: From 21e86105c26b9e0b6c3db1fc864d75a67176fbd1 Mon Sep 17 00:00:00 2001 From: Guen Prawiroatmodjo Date: Tue, 19 Nov 2024 10:14:38 -0800 Subject: [PATCH 2/2] use MOTHERDUCK_SCHEME --- src/bytewax/duckdb/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bytewax/duckdb/__init__.py b/src/bytewax/duckdb/__init__.py index b01d1cb..ab137fe 100644 --- a/src/bytewax/duckdb/__init__.py +++ b/src/bytewax/duckdb/__init__.py @@ -119,7 +119,7 @@ def __init__( config = dict(parse_qsl(parsed_db_path.query)) if parsed_db_path.scheme == MOTHERDUCK_SCHEME: - path = f"md:{parsed_db_path.path}" + path = f"{MOTHERDUCK_SCHEME}:{parsed_db_path.path}" if "custom_user_agent" not in config: config["custom_user_agent"] = "bytewax"