Mapping over launch plans
You can map over launch plans the same way you can map over tasks to execute workflows in parallel across a series of inputs.
You can either map over a LaunchPlan object defined in one of your Python modules or a reference launch plan that points to a previously registered launch plan.
Launch plan defined in your code
Here we define a workflow called interest_workflow that we want to parallelize, along with a launch plan called interest_workflow_lp, in a file we’ll call map_interest_wf.py.
We then write a separate workflow, map_interest_wf, that uses a map to parallelize interest_workflow over a list of inputs.

import union


# Task to calculate monthly interest payment on a loan
@union.task
def calculate_interest(principal: int, rate: float, time: int) -> float:
    return (principal * rate * time) / 12


# Workflow using the calculate_interest task
@union.workflow
def interest_workflow(principal: int, rate: float, time: int) -> float:
    return calculate_interest(principal=principal, rate=rate, time=time)


# Create LaunchPlan for interest_workflow
lp = union.LaunchPlan.get_or_create(
    workflow=interest_workflow,
    name="interest_workflow_lp",
)


# Mapping over the launch plan to calculate interest for multiple loans
@union.workflow
def map_interest_wf() -> list[float]:
    principal = [1000, 5000, 10000]
    rate = [0.05, 0.04, 0.03]  # Different interest rates for each loan
    time = [12, 24, 36]  # Loan periods in months
    return union.map(lp)(principal=principal, rate=rate, time=time)


# Mapping over the launch plan to calculate interest for multiple loans while fixing an input
@union.workflow
def map_interest_fixed_principal_wf() -> list[float]:
    rate = [0.05, 0.04, 0.03]  # Different interest rates for each loan
    time = [12, 24, 36]  # Loan periods in months
    # Note: principal is set to 1000 for all the calculations
    return union.map(lp, bound_inputs={"principal": 1000})(rate=rate, time=time)

You can run the map_interest_wf workflow locally:
$ union run map_interest_wf.py map_interest_wf
You can also run the map_interest_wf workflow remotely on Union.ai:
$ union run --remote map_interest_wf.py map_interest_wf
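To make the mapping semantics concrete, here is a plain-Python sketch of what the two workflows above compute. It is only an analogy and not how Union executes a mapped launch plan: `interest` and `map_over` are hypothetical stand-ins for `interest_workflow` and `union.map`, everything runs in a single process, and the equal-length check is this sketch's own choice.

```python
from functools import partial


def interest(principal: int, rate: float, time: int) -> float:
    # Same arithmetic as calculate_interest in the example above.
    return (principal * rate * time) / 12


def map_over(fn, bound_inputs=None, **list_inputs):
    """Hypothetical stand-in for a map: call fn once per index.

    bound_inputs are fixed for every call; each keyword in list_inputs
    supplies one value per call, paired up by position.
    """
    bound = partial(fn, **(bound_inputs or {}))
    lengths = {len(values) for values in list_inputs.values()}
    if len(lengths) != 1:
        raise ValueError("all mapped inputs must have the same length")
    names = list(list_inputs)
    return [
        bound(**dict(zip(names, values)))
        for values in zip(*list_inputs.values())
    ]


# Analogue of map_interest_wf: every input varies per loan.
all_mapped = map_over(
    interest,
    principal=[1000, 5000, 10000],
    rate=[0.05, 0.04, 0.03],
    time=[12, 24, 36],
)

# Analogue of map_interest_fixed_principal_wf: principal is bound to 1000.
fixed_principal = map_over(
    interest,
    bound_inputs={"principal": 1000},
    rate=[0.05, 0.04, 0.03],
    time=[12, 24, 36],
)
```

Each list position corresponds to one loan, so three inputs per list yield three results. Binding `principal` removes it from the mapped inputs, which is why the second call passes only `rate` and `time`.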
Previously registered launch plan
To demonstrate the ability to map over previously registered launch plans, in this example, we map over the simple_wf launch plan from the basic workflow example in the Flytesnacks repository.
Recall that when a workflow is registered, an associated launch plan is created automatically. One of these launch plans will be leveraged in this example, though custom launch plans can also be used.
- Clone the Flytesnacks repository:

  $ git clone git@github.com:flyteorg/flytesnacks.git

- Navigate to the basics directory:

  $ cd flytesnacks/examples/basics

- Register the simple_wf workflow:

  $ union register --project flytesnacks --domain development --version v1 basics/workflow.py

  Note that the simple_wf workflow is defined as follows:

  @union.workflow
  def simple_wf(x: list[int], y: list[int]) -> float:
      slope_value = slope(x=x, y=y)
      intercept_value = intercept(x=x, y=y, slope=slope_value)
      return intercept_value

- Create a file called map_simple_wf.py and copy the following code into it:

  import union
  from flytekit import reference_launch_plan


  @reference_launch_plan(
      project="flytesnacks",
      domain="development",
      name="basics.workflow.simple_wf",
      version="v1",
  )
  def simple_wf_lp(x: list[int], y: list[int]) -> float:
      pass


  @union.workflow
  def map_simple_wf() -> list[float]:
      x = [[-3, 0, 3], [-8, 2, 4], [7, 3, 1]]
      y = [[7, 4, -2], [-2, 4, 7], [3, 6, 4]]
      return union.map(simple_wf_lp)(x=x, y=y)

  Note that the reference launch plan has an interface that corresponds exactly to that of the registered simple_wf workflow we wish to map over.

- Register the map_simple_wf workflow. Reference launch plans cannot be run locally, so we will register the map_simple_wf workflow to Union.ai and run it remotely:

  $ union register map_simple_wf.py

- In the Union.ai UI, run the map_simple_wf workflow.
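The steps above note that the reference launch plan's interface must correspond exactly to that of the registered simple_wf workflow. As a rough local sanity check, one could compare the two signatures with the standard library before registering. This is only a sketch on plain functions: `same_interface` and `wrong_lp` are hypothetical names, the stubs stand in for the decorated entities, and nothing here is part of the Union or Flytekit API.

```python
import inspect


def simple_wf(x: list[int], y: list[int]) -> float:
    """Stand-in with the same signature as the registered workflow."""


def simple_wf_lp(x: list[int], y: list[int]) -> float:
    """Stand-in for the reference launch plan stub."""


def wrong_lp(x: list[int], y: list[float]) -> float:
    """Deliberately mismatched stub: y has a different element type."""


def same_interface(a, b) -> bool:
    # Compare parameter names, parameter annotations, and return annotation.
    sig_a, sig_b = inspect.signature(a), inspect.signature(b)
    params_a = [(p.name, p.annotation) for p in sig_a.parameters.values()]
    params_b = [(p.name, p.annotation) for p in sig_b.parameters.values()]
    return (
        params_a == params_b
        and sig_a.return_annotation == sig_b.return_annotation
    )


matches = same_interface(simple_wf, simple_wf_lp)
mismatch = same_interface(simple_wf, wrong_lp)
```

Here `matches` is true and `mismatch` is false, because `wrong_lp` annotates `y` as `list[float]` rather than `list[int]`.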