diff --git a/01_standalone_examples/prefect-integration/prefect/Common/CommonSetup.ipynb b/01_standalone_examples/prefect-integration/prefect/Common/CommonSetup.ipynb
index ecc9ee2f5..461117e22 100644
--- a/01_standalone_examples/prefect-integration/prefect/Common/CommonSetup.ipynb
+++ b/01_standalone_examples/prefect-integration/prefect/Common/CommonSetup.ipynb
@@ -183,7 +183,7 @@
     "prefect_ui_endpoint_variable_creation_request = requests.post(prefectAPIEndPoint + '/variables', json={\"name\": \"prefect_ui_endpoint\", \"value\": prefectUIEndPoint})\n",
     "\n",
     "if lakefsEndPoint.startswith('http://host.docker.internal'):\n",
-    "    lakefsUIEndPoint = 'http://127.0.0.1:8000'\n",
+    "    lakefsUIEndPoint = lakefsEndPoint.replace('host.docker.internal','127.0.0.1')\n",
     "elif lakefsEndPoint.startswith('http://lakefs'):\n",
     "    lakefsUIEndPoint = 'http://127.0.0.1:58000'\n",
     "else:\n",
diff --git a/01_standalone_examples/prefect-integration/prefect/Existing_DAG/lakefs_tutorial_taskflow_api_etl.py b/01_standalone_examples/prefect-integration/prefect/Existing_DAG/lakefs_tutorial_taskflow_api_etl.py
index 71c07fbdd..779c20a48 100644
--- a/01_standalone_examples/prefect-integration/prefect/Existing_DAG/lakefs_tutorial_taskflow_api_etl.py
+++ b/01_standalone_examples/prefect-integration/prefect/Existing_DAG/lakefs_tutorial_taskflow_api_etl.py
@@ -50,7 +50,7 @@ def load(total_order_value: float, new_branch: str):
 
     # [START of lakeFS Code]
     branch = lakefs.repository(variables.get('repo')).branch(new_branch)
-    w = branch.object("total_order_value.txt").writer(metadata={'using': 'python_wrapper', 'source':'Sales system'})
+    w = branch.object("total_order_value.txt").writer(pre_sign=False, metadata={'using': 'python_wrapper', 'source':'Sales system'})
     w.write(f"Total order value is: {total_order_value:.2f}")
     w.close()
     # [END of lakeFS Code]
diff --git a/01_standalone_examples/prefect-integration/prefect/New_DAG/lakefs_new_dag.py b/01_standalone_examples/prefect-integration/prefect/New_DAG/lakefs_new_dag.py
index f36572c0f..2f4857473 100644
--- a/01_standalone_examples/prefect-integration/prefect/New_DAG/lakefs_new_dag.py
+++ b/01_standalone_examples/prefect-integration/prefect/New_DAG/lakefs_new_dag.py
@@ -86,7 +86,7 @@ def load_file(new_branch: str):
     branch_name = new_branch + '_etl_load'
     branch = lakefs.repository(variables.get("repo")).branch(branch_name)
     obj = branch.object(path=variables.get("file_name"))
-    with open(variables.get("file_path")+'/'+variables.get("file_name"), mode='rb') as reader, obj.writer(mode='wb') as writer:
+    with open(variables.get("file_path")+'/'+variables.get("file_name"), mode='rb') as reader, obj.writer(mode='wb', pre_sign=False) as writer:
         writer.write(reader.read())
 
     my_logger = get_run_logger()
@@ -252,51 +252,51 @@ def merge_etl_load_branch(new_branch: str):
 def etl_task1(new_branch: str):
     repo = lakefs.repository(variables.get("repo"))
     branch = repo.branch(new_branch+'_etl_task1')
-    content = branch.object(path=variables.get("file_name")).reader(mode='r').read()
+    content = branch.object(path=variables.get("file_name")).reader(mode='r', pre_sign=False).read()
     if len(content.split(",")) < 1:
         raise Exception("Column _c0 not found in schema")
     else:
-        return branch.object(path=variables.get("new_path") + '_c0/' + variables.get("file_name")).upload(mode='wb', data=content)
+        return branch.object(path=variables.get("new_path") + '_c0/' + variables.get("file_name")).upload(mode='wb', data=content, pre_sign=False)
 
 @task(name='ETL Task2_1')
 def etl_task2_1(new_branch: str):
     repo = lakefs.repository(variables.get("repo"))
     branch = repo.branch(new_branch+'_etl_task2')
-    content = branch.object(path=variables.get("file_name")).reader(mode='r').read()
+    content = branch.object(path=variables.get("file_name")).reader(mode='r', pre_sign=False).read()
     if len(content.split(",")) < 2:
         raise Exception("Column _c1 not found in schema struct<_c0:string>")
     else:
-        return branch.object(path=variables.get("new_path") + '_c1/' + variables.get("file_name")).upload(mode='wb', data=content)
+        return branch.object(path=variables.get("new_path") + '_c1/' + variables.get("file_name")).upload(mode='wb', data=content, pre_sign=False)
 
 @task(name='ETL Task2_2')
 def etl_task2_2(new_branch: str):
     repo = lakefs.repository(variables.get("repo"))
     branch = repo.branch(new_branch+'_etl_task2')
-    content = branch.object(path=variables.get("file_name")).reader(mode='r').read()
+    content = branch.object(path=variables.get("file_name")).reader(mode='r', pre_sign=False).read()
     if len(content.split(",")) < 3:
         raise Exception("Column _c2 not found in schema struct<_c0:string,_c1:string>")
     else:
-        return branch.object(path=variables.get("new_path") + '_c2/' + variables.get("file_name")).upload(mode='wb', data=content)
+        return branch.object(path=variables.get("new_path") + '_c2/' + variables.get("file_name")).upload(mode='wb', data=content, pre_sign=False)
 
 @task(name='ETL Task2_3')
 def etl_task2_3(new_branch: str):
     repo = lakefs.repository(variables.get("repo"))
     branch = repo.branch(new_branch+'_etl_task2')
-    content = branch.object(path=variables.get("file_name")).reader(mode='r').read()
+    content = branch.object(path=variables.get("file_name")).reader(mode='r', pre_sign=False).read()
     if len(content.split(",")) < 4:
         raise Exception("Column _c3 not found in schema struct<_c0:string,_c1:string,_c2:string>")
     else:
-        return branch.object(path=variables.get("new_path") + '_c3/' + variables.get("file_name")).upload(mode='wb', data=content)
+        return branch.object(path=variables.get("new_path") + '_c3/' + variables.get("file_name")).upload(mode='wb', data=content, pre_sign=False)
 
 @task(name='ETL Task3')
 def etl_task3(new_branch: str):
     repo = lakefs.repository(variables.get("repo"))
     branch = repo.branch(new_branch+'_etl_task3')
-    content = branch.object(path=variables.get("file_name")).reader(mode='r').read()
+    content = branch.object(path=variables.get("file_name")).reader(mode='r', pre_sign=False).read()
     if len(content.split(",")) < 5:
         raise Exception("Column _c4 not found in schema struct<_c0:string,_c1:string,_c2:string,_c3:string>")
     else:
-        return branch.object(path=variables.get("new_path") + '_c4/' + variables.get("file_name")).upload(mode='wb', data=content)
+        return branch.object(path=variables.get("new_path") + '_c4/' + variables.get("file_name")).upload(mode='wb', data=content, pre_sign=False)
 
 @flow(name='lakeFS New DAG', flow_run_name=generate_flow_run_name)
 def lakefs_new_dag():
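Note: the recurring change in this diff is passing pre_sign=False to every lakeFS object reader/writer/upload call, so object data is streamed through the lakeFS server instead of via pre-signed object-store URLs. A minimal sketch of that pattern, using the same SDK calls as above with hypothetical repository, branch, and object names:

import lakefs

# Hypothetical repository/branch/path names, for illustration only.
branch = lakefs.repository("example-repo").branch("example-branch")

# Write via the lakeFS server rather than a pre-signed storage URL.
with branch.object("data/example.txt").writer(mode='wb', pre_sign=False) as writer:
    writer.write(b"hello lakeFS")

# Read the object back, again without pre-signing.
content = branch.object("data/example.txt").reader(mode='r', pre_sign=False).read()

# One-shot upload follows the same pattern.
branch.object("data/copy.txt").upload(mode='wb', data=content, pre_sign=False)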