Defining launch plans
You can define a launch plan with the LaunchPlan
class.
This is a simple example of defining a launch plan:
import union
@union.workflow
def my_workflow(a: int, b: str) -> str:
return f"Result: {a} and {b}"
# Create a default launch plan
default_lp = @union.LaunchPlan.get_or_create(workflow=my_workflow)
# Create a named launch plan
named_lp = @union.LaunchPlan.get_or_create(
workflow=my_workflow,
name="my_custom_launch_plan"
)
Default and Fixed Inputs
Default inputs can be overridden at execution time, while fixed inputs cannot be changed.
import union
# Launch plan with default inputs
lp_with_defaults = union.LaunchPlan.get_or_create(
workflow=my_workflow,
name="with_defaults",
default_inputs={"a": 42, "b": "default_value"}
)
# Launch plan with fixed inputs
lp_with_fixed = union.LaunchPlan.get_or_create(
workflow=my_workflow,
name="with_fixed",
fixed_inputs={"a": 100} # 'a' will always be 100, only 'b' can be specified
)
# Combining default and fixed inputs
lp_combined = union.LaunchPlan.get_or_create(
workflow=my_workflow,
name="combined_inputs",
default_inputs={"b": "default_string"},
fixed_inputs={"a": 200}
)
Scheduled Execution
import union
from datetime import timedelta
from flytekit.core.schedule import CronSchedule, FixedRate
# Using a cron schedule (runs at 10:00 AM UTC every Monday)
cron_lp = union.LaunchPlan.get_or_create(
workflow=my_workflow,
name="weekly_monday",
default_inputs={"a": 1, "b": "weekly"},
schedule=CronSchedule(
schedule="0 10 * * 1", # Cron expression: minute hour day-of-month month day-of-week
kickoff_time_input_arg=None
)
)
# Using a fixed rate schedule (runs every 6 hours)
fixed_rate_lp = union.LaunchPlan.get_or_create(
workflow=my_workflow,
name="every_six_hours",
default_inputs={"a": 1, "b": "periodic"},
schedule=FixedRate(
duration=timedelta(hours=6)
)
)
Labels and Annotations
Labels and annotations help with organization and can be used for filtering or adding metadata.
import union
from flytekit.models.common import Labels, Annotations
# Adding labels and annotations
lp_with_metadata = union.LaunchPlan.get_or_create(
workflow=my_workflow,
name="with_metadata",
default_inputs={"a": 1, "b": "metadata"},
labels=Labels({"team": "data-science", "env": "staging"}),
annotations=Annotations({"description": "Launch plan for testing", "owner": "jane.doe"})
)
Execution Parameters
import union
# Setting max parallelism to limit concurrent task execution
lp_with_parallelism = union.LaunchPlan.get_or_create(
workflow=my_workflow,
name="with_parallelism",
default_inputs={"a": 1, "b": "parallel"},
max_parallelism=10 # Only 10 task nodes can run concurrently
)
# Disable caching for this launch plan's executions
lp_no_cache = union.LaunchPlan.get_or_create(
workflow=my_workflow,
name="no_cache",
default_inputs={"a": 1, "b": "fresh"},
overwrite_cache=True # Always execute fresh, ignoring cached results
)
# Auto-activate on registration
lp_auto_activate = union.LaunchPlan.get_or_create(
workflow=my_workflow,
name="auto_active",
default_inputs={"a": 1, "b": "active"},
auto_activate=True # Launch plan will be active immediately after registration
)
Security and Authentication
We can also override the auth role (either an iam role or a kubernetes service account) used to execute a launch plan.
import union
from flytekit.models.common import AuthRole
from flytekit import SecurityContext
# Setting auth role for the launch plan
lp_with_auth = union.LaunchPlan.get_or_create(
workflow=my_workflow,
name="with_auth",
default_inputs={"a": 1, "b": "secure"},
auth_role=AuthRole(
assumable_iam_role="arn:aws:iam::12345678:role/my-execution-role"
)
)
# Setting security context
lp_with_security = union.LaunchPlan.get_or_create(
workflow=my_workflow,
name="with_security",
default_inputs={"a": 1, "b": "context"},
security_context=SecurityContext(
run_as=SecurityContext.K8sServiceAccount(name="my-service-account")
)
)
Raw Output Data Configuration
from flytekit.models.common import RawOutputDataConfig
# Configure where large outputs should be stored
lp_with_output_config = LaunchPlan.get_or_create(
workflow=my_workflow,
name="with_output_config",
default_inputs={"a": 1, "b": "output"},
raw_output_data_config=RawOutputDataConfig(
output_location_prefix="s3://my-bucket/workflow-outputs/"
)
)
Putting It All Together
A pretty comprehensive example follows below. This custom launch plan has d
comprehensive_lp = LaunchPlan.get_or_create(
workflow=my_workflow,
name="comprehensive_example",
default_inputs={"b": "configurable"},
fixed_inputs={"a": 42},
schedule=CronSchedule(schedule="0 9 * * *"), # Daily at 9 AM UTC
notifications=[
Notification(
phases=["SUCCEEDED", "FAILED"],
email=EmailNotification(recipients_email=["[email protected]"])
)
],
labels=Labels({"env": "production", "team": "data"}),
annotations=Annotations({"description": "Daily data processing"}),
max_parallelism=20,
overwrite_cache=False,
auto_activate=True,
auth_role=AuthRole(assumable_iam_role="arn:aws:iam::12345678:role/workflow-role"),
raw_output_data_config=RawOutputDataConfig(
output_location_prefix="s3://results-bucket/daily-run/"
)
)
These examples demonstrate the flexibility of Launch Plans in Flyte, allowing you to customize execution parameters, inputs, schedules, and more to suit your workflow requirements.