Skip to content

Commit

Permalink
Updated Prefect demo so it will work with local lakeFS Enterprise (#206)
Browse files Browse the repository at this point in the history
Co-authored-by: Your Name <[email protected]>
  • Loading branch information
kesarwam and Your Name authored Jun 21, 2024
1 parent 5a002df commit 75a08ad
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@
"prefect_ui_endpoint_variable_creation_request = requests.post(prefectAPIEndPoint + '/variables', json={\"name\": \"prefect_ui_endpoint\", \"value\": prefectUIEndPoint})\n",
"\n",
"if lakefsEndPoint.startswith('http://host.docker.internal'):\n",
- " lakefsUIEndPoint = 'http://127.0.0.1:8000'\n",
+ " lakefsUIEndPoint = lakefsEndPoint.replace('host.docker.internal','127.0.0.1')\n",
"elif lakefsEndPoint.startswith('http://lakefs'):\n",
" lakefsUIEndPoint = 'http://127.0.0.1:58000'\n",
"else:\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def load(total_order_value: float, new_branch: str):

# [START of lakeFS Code]
branch = lakefs.repository(variables.get('repo')).branch(new_branch)
- w = branch.object("total_order_value.txt").writer(metadata={'using': 'python_wrapper', 'source':'Sales system'})
+ w = branch.object("total_order_value.txt").writer(pre_sign=False, metadata={'using': 'python_wrapper', 'source':'Sales system'})
w.write(f"Total order value is: {total_order_value:.2f}")
w.close()
# [END of lakeFS Code]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def load_file(new_branch: str):
branch_name = new_branch + '_etl_load'
branch = lakefs.repository(variables.get("repo")).branch(branch_name)
obj = branch.object(path=variables.get("file_name"))
- with open(variables.get("file_path")+'/'+variables.get("file_name"), mode='rb') as reader, obj.writer(mode='wb') as writer:
+ with open(variables.get("file_path")+'/'+variables.get("file_name"), mode='rb') as reader, obj.writer(mode='wb', pre_sign=False) as writer:
writer.write(reader.read())

my_logger = get_run_logger()
Expand Down
Expand Up @@ -252,51 +252,51 @@ def merge_etl_load_branch(new_branch: str):
def etl_task1(new_branch: str):
repo = lakefs.repository(variables.get("repo"))
branch = repo.branch(new_branch+'_etl_task1')
- content = branch.object(path=variables.get("file_name")).reader(mode='r').read()
+ content = branch.object(path=variables.get("file_name")).reader(mode='r', pre_sign=False).read()
if len(content.split(",")) < 1:
raise Exception("Column _c0 not found in schema")
else:
- return branch.object(path=variables.get("new_path") + '_c0/' + variables.get("file_name")).upload(mode='wb', data=content)
+ return branch.object(path=variables.get("new_path") + '_c0/' + variables.get("file_name")).upload(mode='wb', data=content, pre_sign=False)

@task(name='ETL Task2_1')
def etl_task2_1(new_branch: str):
repo = lakefs.repository(variables.get("repo"))
branch = repo.branch(new_branch+'_etl_task2')
- content = branch.object(path=variables.get("file_name")).reader(mode='r').read()
+ content = branch.object(path=variables.get("file_name")).reader(mode='r', pre_sign=False).read()
if len(content.split(",")) < 2:
raise Exception("Column _c1 not found in schema struct<_c0:string>")
else:
- return branch.object(path=variables.get("new_path") + '_c1/' + variables.get("file_name")).upload(mode='wb', data=content)
+ return branch.object(path=variables.get("new_path") + '_c1/' + variables.get("file_name")).upload(mode='wb', data=content, pre_sign=False)

@task(name='ETL Task2_2')
def etl_task2_2(new_branch: str):
repo = lakefs.repository(variables.get("repo"))
branch = repo.branch(new_branch+'_etl_task2')
- content = branch.object(path=variables.get("file_name")).reader(mode='r').read()
+ content = branch.object(path=variables.get("file_name")).reader(mode='r', pre_sign=False).read()
if len(content.split(",")) < 3:
raise Exception("Column _c2 not found in schema struct<_c0:string,_c1:string>")
else:
- return branch.object(path=variables.get("new_path") + '_c2/' + variables.get("file_name")).upload(mode='wb', data=content)
+ return branch.object(path=variables.get("new_path") + '_c2/' + variables.get("file_name")).upload(mode='wb', data=content, pre_sign=False)

@task(name='ETL Task2_3')
def etl_task2_3(new_branch: str):
repo = lakefs.repository(variables.get("repo"))
branch = repo.branch(new_branch+'_etl_task2')
- content = branch.object(path=variables.get("file_name")).reader(mode='r').read()
+ content = branch.object(path=variables.get("file_name")).reader(mode='r', pre_sign=False).read()
if len(content.split(",")) < 4:
raise Exception("Column _c3 not found in schema struct<_c0:string,_c1:string,_c2:string>")
else:
- return branch.object(path=variables.get("new_path") + '_c3/' + variables.get("file_name")).upload(mode='wb', data=content)
+ return branch.object(path=variables.get("new_path") + '_c3/' + variables.get("file_name")).upload(mode='wb', data=content, pre_sign=False)

@task(name='ETL Task3')
def etl_task3(new_branch: str):
repo = lakefs.repository(variables.get("repo"))
branch = repo.branch(new_branch+'_etl_task3')
- content = branch.object(path=variables.get("file_name")).reader(mode='r').read()
+ content = branch.object(path=variables.get("file_name")).reader(mode='r', pre_sign=False).read()
if len(content.split(",")) < 5:
raise Exception("Column _c4 not found in schema struct<_c0:string,_c1:string,_c2:string,_c3:string>")
else:
- return branch.object(path=variables.get("new_path") + '_c4/' + variables.get("file_name")).upload(mode='wb', data=content)
+ return branch.object(path=variables.get("new_path") + '_c4/' + variables.get("file_name")).upload(mode='wb', data=content, pre_sign=False)

@flow(name='lakeFS New DAG', flow_run_name=generate_flow_run_name)
def lakefs_new_dag():
Expand Down

0 comments on commit 75a08ad

Please sign in to comment.