-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun_recipe.py
205 lines (142 loc) · 8 KB
/
run_recipe.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
from pprint import pprint
import argparse
from prefect import flow, get_run_logger
from prefect.runtime import flow_run
from sous_chef import RunPipeline, recipe_loader
import json
import os
from copy import copy
from prefect_aws import AwsCredentials
from datetime import date, timedelta, datetime
from email_flows import send_run_summary_email
def daterange(start_date, end_date):
for n in range(int((end_date - start_date).days)):
yield start_date + timedelta(n)
def generate_run_name_folder():
params = flow_run.parameters
name = params["recipe_directory"].split("sous-chef-recipes")[-1].replace("/", "-")
return name.strip("-")
def RunFilesystemRecipe(recipe_stream, recipe_location, test:bool):
logger = get_run_logger()
json_conf = recipe_loader.yaml_to_conf(recipe_stream)
if "name" not in json_conf:
name = recipe_location.replace("/", "-").replace("..", "").split(".")[0]
json_conf["name"] = name
logger.info(f"Loaded recipe at {recipe_location}, Running pipeline:")
run_data = {json_conf["name"] : RunPipeline(json_conf)}
return run_data
def RunTemplatedRecipe(recipe_str:str, mixin_str:str, recipe_location:str, test:bool, email_on_subflow:bool=False, email_to:list=[]):
logger = get_run_logger()
mixins = recipe_loader.load_mixins(mixin_str)
run_data = {}
for template_params in mixins:
json_conf = recipe_loader.t_yaml_to_conf(recipe_str, **template_params)
if "name" not in json_conf:
name = recipe_location.split(".")[0].split("/")[-1]+template_params["NAME"]
json_conf["name"] = name
logger.info(f"Loaded recipe at {recipe_location} with mixin {template_params['NAME']}, Running pipeline:")
run_data[json_conf["name"]] = RunPipeline(json_conf)
if email_on_subflow:
send_run_summary_email(run_data, email_to)
return run_data
#Run a query and recipe over a sequence of days
@flow(flow_run_name=generate_run_name_folder)
def IteratedRecipe(recipe_directory:str, start_date: str, end_date: str|None = None):
logger = get_run_logger()
recipe_location = recipe_directory+"recipe.yaml"
mixin_location = recipe_directory+"mixins.yaml"
recipe_stream = open(recipe_directory+"/recipe.yaml", "r").read()
mixin_stream = open(recipe_directory+"/mixins.yaml", "r").read()
run_data = RunIteratedRecipe(recipe_stream, recipe_location, mixin_stream, start_date, end_date)
#As above but loading content arbitrarily as strs instead of file locations.
def RunIteratedRecipe(recipe_str:str, recipe_location:str, mixin_str: str, start_date:str, end_date:str,
email_to:list=["[email protected]"]):
logger = get_run_logger()
mixins = recipe_loader.load_mixins(mixin_str)
if end_date is None:
end_date = datetime.today()
else:
end_date = datetime.strptime(end_date, "%Y-%m-%d")
start_date = datetime.strptime(start_date, "%Y-%m-%d")
run_data = {}
#Iterate over all the days in the daterange
for window_end in daterange(start_date, end_date):
date_run_data = {}
window_start = window_end - timedelta(days=1)
window_start = window_start.strftime("%Y-%m-%d")
window_end = window_end.strftime("%Y-%m-%d")
for template_params in mixins:
template_params = copy(template_params)
template_params["START_DATE"] = f"'{window_start}'"
template_params["END_DATE"] = f"'{window_end}'"
template_params["NAME"] += f"-{window_start}"
json_conf = recipe_loader.t_yaml_to_conf(recipe_str, **template_params)
if "name" not in json_conf:
name = recipe_location.split(".")[0].split("/")[-1]+template_params["NAME"]
json_conf["name"] = name
logger.info(f"Loaded recipe at {recipe_location} with mixin {template_params['NAME']}, Running pipeline:")
date_run_data[name] = RunPipeline(json_conf)
send_run_summary_email(date_run_data, email_to)
run_data[window_end] = date_run_data
return run_data
#Main flow entrypoint.
@flow(flow_run_name=generate_run_name_folder)
def RunRecipeDirectory(recipe_directory:str, email_to:list = ["[email protected]"], test:bool=False):
if "mixins.yaml" in os.listdir(recipe_directory):
recipe_stream = open(recipe_directory+"/recipe.yaml", "r").read()
mixin_stream = open(recipe_directory+"/mixins.yaml", "r")
run_data = RunTemplatedRecipe(recipe_stream, mixin_stream, recipe_directory, test)
else:
recipe_stream = open(recipe_directory+"/recipe.yaml", "r").read()
run_data = RunFilesystemRecipe(recipe_stream, recipe_directory, test)
send_run_summary_email(run_data, email_to)
@flow(flow_run_name=generate_run_name_folder)
def RunS3BucketRecipe(credentials_block_name: str, recipe_bucket:str, recipe_directory:str, email_to:list = ["[email protected]"],
test:bool=False, email_on_subflow:bool=False):
##Pull down recipe data from S3, then run that recipe in the local environment.
aws_credentials = AwsCredentials.load(credentials_block_name)
s3_client = aws_credentials.get_boto3_session().client("s3")
all_objects = s3_client.list_objects_v2(
Bucket=recipe_bucket
)
objects = [o["Key"] for o in all_objects["Contents"] if recipe_directory in o["Key"] and "." in o["Key"]]
order_content = {}
for component in objects:
final_name = component.split("/")[-1]
order_content[final_name] = s3_client.get_object(Bucket=recipe_bucket, Key=component)["Body"].read().decode('utf-8')
if any(["mixins.yaml" in o for o in objects]):
run_data = RunTemplatedRecipe(order_content["recipe.yaml"], order_content["mixins.yaml"], recipe_directory, test,
email_on_subflow=email_on_subflow, email_to=email_to)
else:
run_data = RunFilesystemRecipe(order_content["recipe.yaml"], recipe_directory, test)
send_run_summary_email(run_data, email_to)
###In progress....
@flow(flow_run_name=generate_run_name_folder)
def IteratedS3BucketRecipe(credentials_block_name:str, recipe_bucket:str, recipe_directory:str,
start_date:str, end_date:str):
aws_credentials = AwsCredentials.load(credentials_block_name)
s3_client = aws_credentials.get_boto3_session().client("s3")
all_objects = s3_client.list_objects_v2(
Bucket=recipe_bucket
)
objects = [o["Key"] for o in all_objects["Contents"] if recipe_directory in o["Key"] and ".yaml" in o["Key"]]
order_content = {}
for component in objects:
final_name = component.split("/")[-1]
order_content[final_name] = s3_client.get_object(Bucket=recipe_bucket, Key=component)["Body"].read().decode('utf-8')
RunIteratedRecipe(order_content["recipe.yaml"], recipe_directory, order_content["mixins.yaml"], start_date, end_date)
if __name__ == "__main__":
#These entrypoints are here for the sake of local development before deployment.
parser = argparse.ArgumentParser()
parser.add_argument("-d", "--recipe-directory", help="A directory with a recipe.yaml and perhaps a mixins.yaml file to generate runs from")
parser.add_argument("-s", "--start-date", help="Start date in YYYY-MM-DD to iterate the recipe query over. Triggers iterated recipe")
parser.add_argument("-e", "--end-date", help="End date in YYYY-MM-DD to iterate the recipe query over. Triggers iterated recipe. Defaults to today if none.")
parser.add_argument("-b", "--bucket", action='store_true')
parser.add_argument("-t", "--test", action='store_true', help="Validate recipe")
args = parser.parse_args()
if args.bucket:
RunS3BucketRecipe("aws-s3-credentials", "sous-chef-recipes", args.recipe_directory, test=args.test)
elif args.start_date is None:
RunRecipeDirectory(args.recipe_directory, test=args.test)
else:
IteratedRecipe(args.recipe_directory, args.start_date)