From b9d386d739377d445773be7ae3001aa44d0bef6a Mon Sep 17 00:00:00 2001 From: Xu Chen Date: Tue, 9 Jan 2024 18:58:21 +0800 Subject: [PATCH] fix python meta config parse logic (#388) Signed-off-by: chenxu Co-authored-by: chenxu --- python/lakesoul/metadata/native_client.py | 52 +++++++++++++------ .../{ => lakesoul/spark/testing}/run-tests.py | 0 python/metadata_poc.py | 26 ---------- python/parquet_writer.py | 32 ------------ 4 files changed, 37 insertions(+), 73 deletions(-) rename python/{ => lakesoul/spark/testing}/run-tests.py (100%) delete mode 100644 python/metadata_poc.py delete mode 100644 python/parquet_writer.py diff --git a/python/lakesoul/metadata/native_client.py b/python/lakesoul/metadata/native_client.py index 188df92a4..6571ff728 100644 --- a/python/lakesoul/metadata/native_client.py +++ b/python/lakesoul/metadata/native_client.py @@ -12,26 +12,53 @@ from .generated import entity_pb2 global config -config = None +config = "host=127.0.0.1 port=5432 user=lakesoul_test password=lakesoul_test" def reset_pg_conf(conf): global config config = " ".join(conf) +def parse_jdbc_url(url): + from urllib.parse import urlparse + if url.startswith('jdbc:'): + url = url[5:] + parsed_url = urlparse(url) + host, port = parsed_url.netloc.split(':') + return host, port, parsed_url.path[1:] def get_pg_conf_from_env(): import os conf = [] fields = 'host', 'port', 'dbname', 'user', 'password' - for field in fields: - key = 'LAKESOUL_METADATA_PG_%s' % field.upper() - value = os.environ.get(key) - if value is not None: - conf.append('%s=%s' % (field, value)) - if conf: + if os.environ.get("LAKESOUL_HOME") is not None: + import configparser + tmp_conf = configparser.ConfigParser() + with open(os.environ.get("LAKESOUL_HOME"), "r") as stream: + tmp_conf.read_string("[top]\n" + stream.read()) + tmp_conf = tmp_conf["top"] + host, port, dbname = parse_jdbc_url(tmp_conf['lakesoul.pg.url']) + conf.append('host=%s' % host) + conf.append('port=%s' % port) + conf.append('dbname=%s' % dbname) + conf.append('user=%s' % tmp_conf["lakesoul.pg.username"]) + conf.append('password=%s' % tmp_conf["lakesoul.pg.password"]) return conf - return None + env_vars = ['LAKESOUL_PG_URL', 'LAKESOUL_PG_USERNAME', 'LAKESOUL_PG_PASSWORD'] + for evar in env_vars: + if evar not in os.environ: + # or if you want to raise exception + print('Warning: Environment variable "%s" was not set and will use default meta db: ' + 'jdbc:postgresql://localhost:5432/lakesoul_test?stringtype=unspecified' % evar) + return None + host, port, dbname = parse_jdbc_url(os.environ.get('LAKESOUL_PG_URL')) + conf.append('host=%s' % host) + conf.append('port=%s' % port) + conf.append('dbname=%s' % dbname) + conf.append('user=%s' % os.environ.get('LAKESOUL_PG_USERNAME')) + conf.append('password=%s' % os.environ.get('LAKESOUL_PG_PASSWORD')) + + return conf class NativeMetadataClient: @@ -51,13 +78,8 @@ def callback(bool, msg): def target(): global config - if config is None: - conf = get_pg_conf_from_env() - if conf is None: - message = "set LAKESOUL_METADATA_PG_* environment variables or " - message += "call lakesoul.metadata.native_client.reset_pg_conf " - message += "to configure LakeSoul metadata" - raise RuntimeError(message) + conf = get_pg_conf_from_env() + if conf is not None: reset_pg_conf(conf) return lib.lakesoul_metadata_c.create_tokio_postgres_client(CFUNCTYPE(c_void_p, c_bool, c_char_p)(callback), config.encode("utf-8"), diff --git a/python/run-tests.py b/python/lakesoul/spark/testing/run-tests.py similarity index 100% rename from python/run-tests.py rename to python/lakesoul/spark/testing/run-tests.py diff --git a/python/metadata_poc.py b/python/metadata_poc.py deleted file mode 100644 index 4b27404ca..000000000 --- a/python/metadata_poc.py +++ /dev/null @@ -1,26 +0,0 @@ -# SPDX-FileCopyrightText: 2023 LakeSoul Contributors -# -# SPDX-License-Identifier: Apache-2.0 - -from lakesoul.metadata.db_manager import DBManager -from lakesoul.metadata.lib import reload_lib -from lakesoul.metadata.native_client import reset_pg_conf - -if __name__ == '__main__': - reset_pg_conf( - ["host=localhost", "port=5433", " dbname=test_lakesoul_meta", " user=yugabyte", "password=yugabyte"]) - - db_manager = DBManager() - table_name = "test_datatypes" - data_files = db_manager.get_data_files_by_table_name(table_name) - print(data_files) - data_files = db_manager.get_data_files_by_table_name(table_name) - print(data_files) - arrow_schema = db_manager.get_arrow_schema_by_table_name(table_name) - print(arrow_schema) - # data_files = db_manager.get_data_files_by_table_name("imdb") - # print(data_files) - # data_files = db_manager.get_data_files_by_table_name("imdb", partitions={"split": "train"}) - # print(data_files) - # arrow_schema = db_manager.get_arrow_schema_by_table_name("imdb") - # print(arrow_schema) diff --git a/python/parquet_writer.py b/python/parquet_writer.py deleted file mode 100644 index c3dfbade2..000000000 --- a/python/parquet_writer.py +++ /dev/null @@ -1,32 +0,0 @@ -# SPDX-FileCopyrightText: 2023 LakeSoul Contributors -# -# SPDX-License-Identifier: Apache-2.0 - -def write_file(): - """Original commands used to write 'large_file.parquet' - - This locally-written file was moved to S3 and GCS using `fs.put`. - """ - from dask.datasets import timeseries - - dtypes = {} - # dtypes.update({f"str{i}": str for i in range(15)}) - dtypes.update({f"int{i}": int for i in range(15)}) - - for seed in range(3): - df = timeseries( - start='2000-12-02', - end='2000-12-03', - freq='1800s', - partition_freq='1d', - dtypes=dtypes, - seed=seed, - ).reset_index(drop=True).compute() - - print(len(df)) - row_group_size = (len(df) // 5) # Want 10 row groups - - df.to_parquet(f"small_{seed}.parquet", engine="pyarrow", row_group_size=row_group_size) - -if __name__ == '__main__': - write_file() \ No newline at end of file