Hi, below is a Flow that returns the expected results when the input is a single number, i.e. a working example where the input is NOT a list. It returns 3 mapped items.
Replies: 4 comments 2 replies
Hi @gryBox, the above doesn't run clean for me. Rather than guess what you want and where the typo is, can you fix your provided code to run cleanly? After that, which input(s) to what task(s) do you want to change from a scalar value to a mapped value?

from prefect import Flow, task, apply_map, Parameter, unmapped
import random


@task
def list_maker(input_num):
    print(input_num)
    lst = [random.randint(1, 10) for i in range(input_num)]
    print(lst)
    return lst


@task
def expensive_func(num_from_random_lst, num_from_input):
    return num_from_random_lst * num_from_input


with Flow("map the map") as flow:
    input_num_lst = Parameter("input_num_lst", default=3)
    random_list = list_maker(input_num)
    expensive_func = expensive_func.map(random_list, unmapped(input_num_lst))

flow.run()

$ python test.py
Traceback (most recent call last):
  File "test.py", line 21, in <module>
    random_list = list_maker(input_num)
NameError: name 'input_num' is not defined
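For reference, the scalar case can be sketched in plain Python, without Prefect. This is only a guess at the intent: it assumes the argument to `list_maker` was meant to be the parameter `input_num_lst` (the name that is actually defined), and it models `unmapped(...)` as passing the same value to every mapped call. `run_scalar_flow` is a hypothetical helper, not part of Prefect.

```python
import random


def list_maker(input_num):
    # Build a list of `input_num` random integers in [1, 10].
    return [random.randint(1, 10) for _ in range(input_num)]


def expensive_func(num_from_random_lst, num_from_input):
    return num_from_random_lst * num_from_input


def run_scalar_flow(input_num_lst=3):
    # Hypothetical stand-in for the flow: iterate over the random list while
    # holding the scalar input constant (the role `unmapped` plays in Prefect).
    random_list = list_maker(input_num_lst)
    return [expensive_func(n, input_num_lst) for n in random_list]


results = run_scalar_flow(3)
print(results)  # one product per item in the random list
```

With a scalar input of 3 this yields three products, which matches the "3 mapped items" described in the original question.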
@jcrist Using your flow produces strange results, both visually and as output. Between this roadblock and another issue I've opened, "Multiple flows cannot be used with the same case statement", I think it's time for me to re-architect the flow so I don't run into these contortions.
Sorry, I'm left guessing at what you actually want here. Here's another attempt:

from prefect import Flow, task, apply_map, Parameter, unmapped, flatten
import random


@task
def list_maker(input_num):
    # Pair each random number with the input it was generated for.
    return [(random.randint(1, 10), input_num) for i in range(input_num)]


@task
def expensive_func(input):
    # Each mapped call receives one (random number, input number) pair.
    num_from_random_lst, num_from_input = input
    return num_from_random_lst * num_from_input


with Flow("map the map") as flow:
    input_num_lst = Parameter("input_num_lst", default=[2, 3])
    # One list_maker run per input; the result is a list of lists.
    random_list = list_maker.map(input_num_lst)
    # Flatten the nested lists so expensive_func runs once per pair.
    output = expensive_func.map(flatten(random_list))

state = flow.run()
print(state.result[output].result)

$ python test.py
[2020-09-10 17:40:04] INFO - prefect.FlowRunner | Beginning Flow run for 'map the map'
[2020-09-10 17:40:04] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2020-09-10 17:40:04] DEBUG - prefect.FlowRunner | Flow 'map the map': Handling state change from Scheduled to Running
[2020-09-10 17:40:04] INFO - prefect.TaskRunner | Task 'input_num_lst': Starting task run...
[2020-09-10 17:40:04] DEBUG - prefect.TaskRunner | Task 'input_num_lst': Handling state change from Pending to Running
[2020-09-10 17:40:04] DEBUG - prefect.TaskRunner | Task 'input_num_lst': Calling task.run() method...
[2020-09-10 17:40:04] DEBUG - prefect.TaskRunner | Task 'input_num_lst': Handling state change from Running to Success
[2020-09-10 17:40:04] INFO - prefect.TaskRunner | Task 'input_num_lst': finished task run for task with final state: 'Success'
[2020-09-10 17:40:04] INFO - prefect.TaskRunner | Task 'list_maker': Starting task run...
[2020-09-10 17:40:04] DEBUG - prefect.TaskRunner | Task 'list_maker': Handling state change from Pending to Mapped
[2020-09-10 17:40:04] INFO - prefect.TaskRunner | Task 'list_maker': finished task run for task with final state: 'Mapped'
[2020-09-10 17:40:04] INFO - prefect.TaskRunner | Task 'list_maker[0]': Starting task run...
[2020-09-10 17:40:04] DEBUG - prefect.TaskRunner | Task 'list_maker[0]': Handling state change from Pending to Running
[2020-09-10 17:40:04] DEBUG - prefect.TaskRunner | Task 'list_maker[0]': Calling task.run() method...
[2020-09-10 17:40:04] DEBUG - prefect.TaskRunner | Task 'list_maker[0]': Handling state change from Running to Success
[2020-09-10 17:40:04] INFO - prefect.TaskRunner | Task 'list_maker[0]': finished task run for task with final state: 'Success'
[2020-09-10 17:40:04] INFO - prefect.TaskRunner | Task 'list_maker[1]': Starting task run...
[2020-09-10 17:40:04] DEBUG - prefect.TaskRunner | Task 'list_maker[1]': Handling state change from Pending to Running
[2020-09-10 17:40:04] DEBUG - prefect.TaskRunner | Task 'list_maker[1]': Calling task.run() method...
[2020-09-10 17:40:04] DEBUG - prefect.TaskRunner | Task 'list_maker[1]': Handling state change from Running to Success
[2020-09-10 17:40:04] INFO - prefect.TaskRunner | Task 'list_maker[1]': finished task run for task with final state: 'Success'
[2020-09-10 17:40:04] INFO - prefect.TaskRunner | Task 'expensive_func': Starting task run...
[2020-09-10 17:40:04] DEBUG - prefect.TaskRunner | Task 'expensive_func': Handling state change from Pending to Mapped
[2020-09-10 17:40:04] INFO - prefect.TaskRunner | Task 'expensive_func': finished task run for task with final state: 'Mapped'
[2020-09-10 17:40:04] INFO - prefect.TaskRunner | Task 'expensive_func[0]': Starting task run...
[2020-09-10 17:40:04] DEBUG - prefect.TaskRunner | Task 'expensive_func[0]': Handling state change from Pending to Running
[2020-09-10 17:40:04] DEBUG - prefect.TaskRunner | Task 'expensive_func[0]': Calling task.run() method...
[2020-09-10 17:40:04] DEBUG - prefect.TaskRunner | Task 'expensive_func[0]': Handling state change from Running to Success
[2020-09-10 17:40:04] INFO - prefect.TaskRunner | Task 'expensive_func[0]': finished task run for task with final state: 'Success'
[2020-09-10 17:40:04] INFO - prefect.TaskRunner | Task 'expensive_func[1]': Starting task run...
[2020-09-10 17:40:04] DEBUG - prefect.TaskRunner | Task 'expensive_func[1]': Handling state change from Pending to Running
[2020-09-10 17:40:04] DEBUG - prefect.TaskRunner | Task 'expensive_func[1]': Calling task.run() method...
[2020-09-10 17:40:04] DEBUG - prefect.TaskRunner | Task 'expensive_func[1]': Handling state change from Running to Success
[2020-09-10 17:40:04] INFO - prefect.TaskRunner | Task 'expensive_func[1]': finished task run for task with final state: 'Success'
[2020-09-10 17:40:04] INFO - prefect.TaskRunner | Task 'expensive_func[2]': Starting task run...
[2020-09-10 17:40:04] DEBUG - prefect.TaskRunner | Task 'expensive_func[2]': Handling state change from Pending to Running
[2020-09-10 17:40:04] DEBUG - prefect.TaskRunner | Task 'expensive_func[2]': Calling task.run() method...
[2020-09-10 17:40:04] DEBUG - prefect.TaskRunner | Task 'expensive_func[2]': Handling state change from Running to Success
[2020-09-10 17:40:04] INFO - prefect.TaskRunner | Task 'expensive_func[2]': finished task run for task with final state: 'Success'
[2020-09-10 17:40:04] INFO - prefect.TaskRunner | Task 'expensive_func[3]': Starting task run...
[2020-09-10 17:40:04] DEBUG - prefect.TaskRunner | Task 'expensive_func[3]': Handling state change from Pending to Running
[2020-09-10 17:40:04] DEBUG - prefect.TaskRunner | Task 'expensive_func[3]': Calling task.run() method...
[2020-09-10 17:40:04] DEBUG - prefect.TaskRunner | Task 'expensive_func[3]': Handling state change from Running to Success
[2020-09-10 17:40:04] INFO - prefect.TaskRunner | Task 'expensive_func[3]': finished task run for task with final state: 'Success'
[2020-09-10 17:40:04] INFO - prefect.TaskRunner | Task 'expensive_func[4]': Starting task run...
[2020-09-10 17:40:04] DEBUG - prefect.TaskRunner | Task 'expensive_func[4]': Handling state change from Pending to Running
[2020-09-10 17:40:04] DEBUG - prefect.TaskRunner | Task 'expensive_func[4]': Calling task.run() method...
[2020-09-10 17:40:04] DEBUG - prefect.TaskRunner | Task 'expensive_func[4]': Handling state change from Running to Success
[2020-09-10 17:40:04] INFO - prefect.TaskRunner | Task 'expensive_func[4]': finished task run for task with final state: 'Success'
[2020-09-10 17:40:04] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2020-09-10 17:40:04] DEBUG - prefect.FlowRunner | Flow 'map the map': Handling state change from Running to Success
[20, 16, 24, 3, 24]
It gets called 5 times (2 + 3), and
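The call count follows from the shape of the data: each mapped `list_maker` run returns a list whose length equals its input, and flattening concatenates those lists before the second map. A plain-Python sketch of the same data flow is below. It does not use Prefect at all; `flatten_once` is a hypothetical helper standing in for Prefect's `flatten`, and the list comprehensions stand in for `.map`.

```python
import random
from itertools import chain


def list_maker(input_num):
    # Pair each random number with the input it was generated for.
    return [(random.randint(1, 10), input_num) for _ in range(input_num)]


def expensive_func(pair):
    num_from_random_lst, num_from_input = pair
    return num_from_random_lst * num_from_input


def flatten_once(nested):
    # Hypothetical helper: concatenate a list of lists by one level.
    return list(chain.from_iterable(nested))


input_num_lst = [2, 3]
nested = [list_maker(n) for n in input_num_lst]  # lengths 2 and 3
pairs = flatten_once(nested)                     # 2 + 3 = 5 pairs
output = [expensive_func(p) for p in pairs]      # one call per pair
print(len(output))  # 5
```

The first two results are multiples of 2 and the last three are multiples of 3, which is consistent with the `[20, 16, 24, 3, 24]` printed in the run above.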
@jcrist Exactly the output I was looking for.
Again, I appreciate your help.