Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom Augspurger committed Jul 9, 2024
1 parent c2866ef commit ecb7f88
Show file tree
Hide file tree
Showing 5 changed files with 2 additions and 17 deletions.
5 changes: 0 additions & 5 deletions pctasks/run/pctasks/run/argo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,35 +130,30 @@ def submit_workflow(
executor_config: WorkflowExecutorConfig,
runner_image: str,
) -> Dict[str, Any]:
print("=== SUBMITTING WORKFLOW ===")
b64encoded_config = b64encode(executor_config.to_yaml().encode("utf-8")).decode(
"utf-8"
)

run_settings = executor_config.run_settings

workflow_path = get_workflow_path(run_id)
print(f"Writing workflow to {workflow_path}")
workflow_uri = BlobUri(
f"blob://{run_settings.blob_account_name}/"
f"{run_settings.task_io_blob_container}/"
f"{workflow_path}"
)

print(f"TaskIO Storage from BlobStorage: {workflow_uri}")
task_io_storage = BlobStorage.from_account_key(
f"blob://{workflow_uri.storage_account_name}/{workflow_uri.container_name}",
account_key=run_settings.blob_account_key,
account_url=run_settings.blob_account_url,
)

print(f"Writing task text to {workflow_path}")
task_io_storage.write_text(
workflow_path,
submit_msg.to_yaml(),
)

print(f"Generating blob SAS token for {workflow_path}")
user_delegation_key = get_user_delegation_key(run_settings.blob_account_url)
input_blob_sas_token = generate_blob_sas(
account_name=run_settings.blob_account_name,
Expand Down
6 changes: 0 additions & 6 deletions pctasks/run/pctasks/run/task/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ def write_task_run_msg(run_msg: TaskRunMessage, settings: RunSettings) -> BlobCo
input_blob_sas_token = generate_blob_sas(
account_name=settings.blob_account_name,
user_delegation_key=user_delegation_key,
# account_key=settings.blob_account_key,
container_name=settings.task_io_blob_container,
blob_name=task_input_path,
start=datetime.utcnow(),
Expand Down Expand Up @@ -239,7 +238,6 @@ def prepare_task(
log_blob_sas_token = generate_blob_sas(
account_name=settings.blob_account_name,
user_delegation_key=user_delegation_key,
# account_key=settings.blob_account_key,
container_name=settings.task_io_blob_container,
blob_name=task_status_path,
start=datetime.utcnow(),
Expand All @@ -264,7 +262,6 @@ def prepare_task(
log_blob_sas_token = generate_blob_sas(
account_name=settings.blob_account_name,
user_delegation_key=user_delegation_key,
# account_key=settings.blob_account_key,
container_name=settings.log_blob_container,
blob_name=log_path,
start=datetime.utcnow(),
Expand All @@ -288,7 +285,6 @@ def prepare_task(
output_blob_sas_token = generate_blob_sas(
account_name=settings.blob_account_name,
user_delegation_key=get_user_delegation_key(settings.blob_account_url),
# account_key=settings.blob_account_key,
container_name=settings.task_io_blob_container,
blob_name=output_path,
start=datetime.utcnow(),
Expand Down Expand Up @@ -317,7 +313,6 @@ def prepare_task(
code_blob_sas_token = generate_blob_sas(
account_name=settings.blob_account_name,
user_delegation_key=get_user_delegation_key(settings.blob_account_url),
# account_key=settings.blob_account_key,
container_name=settings.code_blob_container,
blob_name=code_path,
start=datetime.utcnow(),
Expand Down Expand Up @@ -346,7 +341,6 @@ def prepare_task(
user_delegation_key=get_user_delegation_key(
settings.blob_account_url
),
# account_key=settings.blob_account_key,
container_name=settings.code_blob_container,
blob_name=requirements_path,
start=datetime.utcnow(),
Expand Down
3 changes: 0 additions & 3 deletions pctasks/run/pctasks/run/workflow/argo.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ class ArgoWorkflowRunner(WorkflowRunner):
def submit_workflow(
self, submit_msg: WorkflowSubmitMessage
) -> WorkflowSubmitResult:
print("Argo worflow runner", flush=True)
argo_host = self.run_settings.argo_host
argo_token = self.run_settings.argo_token
runner_image = self.run_settings.workflow_runner_image
Expand All @@ -25,14 +24,12 @@ def submit_workflow(
)

try:
print("Submitting the workflow...")
submit_results = argo_client.submit_workflow(
submit_msg,
run_id=submit_msg.run_id,
executor_config=self.get_executor_config(),
runner_image=runner_image,
)
print(f"Submition results!: {submit_results}", flush=True)
except Exception as e:
return WorkflowSubmitResult(
run_id=submit_msg.run_id,
Expand Down
1 change: 0 additions & 1 deletion pctasks/server/pctasks/server/routes/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ async def submit_workflow(
trigger_event=submit_request.trigger_event,
)

logger.info(f"Submit message: {submit_msg}")
workflow_runner = get_workflow_runner()

async with AsyncWorkflowRunsContainer(WorkflowRunRecord) as workflow_runs:
Expand Down
4 changes: 2 additions & 2 deletions scripts/publish
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ if [ "${BASH_SOURCE[0]}" = "${0}" ]; then
else
publish pctasks-server
publish pctasks-run
# publish pctasks-ingest
# publish pctasks-task-base
publish pctasks-ingest
publish pctasks-task-base
fi

fi
Expand Down

0 comments on commit ecb7f88

Please sign in to comment.