0.1.dev2192+g7c539c3.d20250403
flytekit.core.launch_plan
Directory
Classes
| Class | Description |
|-|-|
| LaunchPlan | Launch Plans are one of the core constructs of Flyte. |
| ReferenceLaunchPlan | A reference launch plan serves as a pointer to a Launch Plan that already exists on your Flyte installation. |
Methods
| Method | Description |
|-|-|
| reference_launch_plan() | A reference launch plan is a pointer to a launch plan that already exists on your Flyte installation. |
Methods
reference_launch_plan()
def reference_launch_plan(
    project: str,
    domain: str,
    name: str,
    version: str,
) -> Callable[[Callable[..., Any]], ReferenceLaunchPlan]
A reference launch plan is a pointer to a launch plan that already exists on your Flyte installation. This
object will not initiate a network call to Admin, which is why the user is asked to provide the expected interface
via the function definition.
If at registration time the interface provided causes an issue with compilation, an error will be returned.
| Parameter | Type |
|-|-|
| project | str |
| domain | str |
| name | str |
| version | str |
flytekit.core.launch_plan.LaunchPlan
Launch Plans are one of the core constructs of Flyte. Please take a look at the discussion of launch plans in the
core concepts documentation if you are unfamiliar with them.
Every workflow is registered with a default launch plan, which is just a launch plan with none of the additional
attributes set: no default values, fixed values, schedules, etc. Assuming you have the following workflow:
@workflow
def wf(a: int, c: str) -> str:
    ...
Create the default launch plan with:
LaunchPlan.get_or_create(workflow=wf)
If you specify additional parameters, you’ll also have to give the launch plan a unique name. Default and
fixed inputs can be expressed as Python native values.
Additionally, a launch plan can be configured to run on a schedule and emit notifications.
Please see the relevant Schedule and Notification objects as well.
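To configure the remaining parameters, you’ll need to import the relevant model objects as well.
from flytekit.models.common import Annotations, AuthRole, Labels, RawOutputDataConfig
Then pass instances of them to LaunchPlan.get_or_create through the matching keyword arguments
(annotations, auth_role, labels, raw_output_data_config).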
class LaunchPlan(
    name: str,
    workflow: _annotated_workflow.WorkflowBase,
    parameters: _interface_models.ParameterMap,
    fixed_inputs: _literal_models.LiteralMap,
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
)
| Parameter | Type |
|-|-|
| name | str |
| workflow | _annotated_workflow.WorkflowBase |
| parameters | _interface_models.ParameterMap |
| fixed_inputs | _literal_models.LiteralMap |
| schedule | Optional[_schedule_model.Schedule] |
| notifications | Optional[List[_common_models.Notification]] |
| labels | Optional[_common_models.Labels] |
| annotations | Optional[_common_models.Annotations] |
| raw_output_data_config | Optional[_common_models.RawOutputDataConfig] |
| max_parallelism | Optional[int] |
| security_context | Optional[security.SecurityContext] |
| trigger | Optional[LaunchPlanTriggerBase] |
| overwrite_cache | Optional[bool] |
| auto_activate | bool |
Methods
clone_with()
def clone_with(
    name: str,
    parameters: Optional[_interface_models.ParameterMap],
    fixed_inputs: Optional[_literal_models.LiteralMap],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
) -> LaunchPlan
| Parameter | Type |
|-|-|
| name | str |
| parameters | Optional[_interface_models.ParameterMap] |
| fixed_inputs | Optional[_literal_models.LiteralMap] |
| schedule | Optional[_schedule_model.Schedule] |
| notifications | Optional[List[_common_models.Notification]] |
| labels | Optional[_common_models.Labels] |
| annotations | Optional[_common_models.Annotations] |
| raw_output_data_config | Optional[_common_models.RawOutputDataConfig] |
| max_parallelism | Optional[int] |
| security_context | Optional[security.SecurityContext] |
| trigger | Optional[LaunchPlanTriggerBase] |
| overwrite_cache | Optional[bool] |
| auto_activate | bool |
construct_node_metadata()
def construct_node_metadata()
create()
def create(
    name: str,
    workflow: _annotated_workflow.WorkflowBase,
    default_inputs: Optional[Dict[str, Any]],
    fixed_inputs: Optional[Dict[str, Any]],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    auth_role: Optional[_common_models.AuthRole],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
) -> LaunchPlan
| Parameter | Type |
|-|-|
| name | str |
| workflow | _annotated_workflow.WorkflowBase |
| default_inputs | Optional[Dict[str, Any]] |
| fixed_inputs | Optional[Dict[str, Any]] |
| schedule | Optional[_schedule_model.Schedule] |
| notifications | Optional[List[_common_models.Notification]] |
| labels | Optional[_common_models.Labels] |
| annotations | Optional[_common_models.Annotations] |
| raw_output_data_config | Optional[_common_models.RawOutputDataConfig] |
| max_parallelism | Optional[int] |
| security_context | Optional[security.SecurityContext] |
| auth_role | Optional[_common_models.AuthRole] |
| trigger | Optional[LaunchPlanTriggerBase] |
| overwrite_cache | Optional[bool] |
| auto_activate | bool |
get_default_launch_plan()
def get_default_launch_plan(
    ctx: FlyteContext,
    workflow: _annotated_workflow.WorkflowBase,
) -> LaunchPlan
Users should probably call the get_or_create function defined below instead. A default launch plan is the one
that will just pick up whatever default values are defined in the workflow function signature (if any) and
use the default auth information supplied during serialization, with no notifications or schedules.
| Parameter | Type |
|-|-|
| ctx | FlyteContext |
| workflow | _annotated_workflow.WorkflowBase |
get_or_create()
def get_or_create(
    workflow: _annotated_workflow.WorkflowBase,
    name: Optional[str],
    default_inputs: Optional[Dict[str, Any]],
    fixed_inputs: Optional[Dict[str, Any]],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    auth_role: Optional[_common_models.AuthRole],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
) -> LaunchPlan
This function offers a friendlier interface for creating launch plans. If no name is supplied for the launch plan,
this assumes you are looking for the default launch plan for the workflow. If a name is specified, it
will be used. When creating the default launch plan, none of the other arguments may be specified.
The resulting launch plan is also cached: if get_or_create is called again with the same name, the
cached version is returned.
| Parameter | Type |
|-|-|
| workflow | _annotated_workflow.WorkflowBase |
| name | Optional[str] |
| default_inputs | Optional[Dict[str, Any]] |
| fixed_inputs | Optional[Dict[str, Any]] |
| schedule | Optional[_schedule_model.Schedule] |
| notifications | Optional[List[_common_models.Notification]] |
| labels | Optional[_common_models.Labels] |
| annotations | Optional[_common_models.Annotations] |
| raw_output_data_config | Optional[_common_models.RawOutputDataConfig] |
| max_parallelism | Optional[int] |
| security_context | Optional[security.SecurityContext] |
| auth_role | Optional[_common_models.AuthRole] |
| trigger | Optional[LaunchPlanTriggerBase] |
| overwrite_cache | Optional[bool] |
| auto_activate | bool |
Properties
| Property | Type | Description |
|-|-|-|
| annotations | | |
| fixed_inputs | | |
| interface | | |
| labels | | |
| max_parallelism | | |
| name | | |
| notifications | | |
| overwrite_cache | | |
| parameters | | |
| python_interface | | |
| raw_output_data_config | | |
| saved_inputs | | |
| schedule | | |
| security_context | | |
| should_auto_activate | | |
| trigger | | |
| workflow | | |
flytekit.core.launch_plan.ReferenceLaunchPlan
A reference launch plan serves as a pointer to a Launch Plan that already exists on your Flyte installation. This
object will not initiate a network call to Admin, which is why the user is asked to provide the expected interface.
If at registration time the interface provided causes an issue with compilation, an error will be returned.
class ReferenceLaunchPlan(
    project: str,
    domain: str,
    name: str,
    version: str,
    inputs: Dict[str, Type],
    outputs: Dict[str, Type],
)
| Parameter | Type |
|-|-|
| project | str |
| domain | str |
| name | str |
| version | str |
| inputs | Dict[str, Type] |
| outputs | Dict[str, Type] |
Methods
clone_with()
def clone_with(
    name: str,
    parameters: Optional[_interface_models.ParameterMap],
    fixed_inputs: Optional[_literal_models.LiteralMap],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
) -> LaunchPlan
| Parameter | Type |
|-|-|
| name | str |
| parameters | Optional[_interface_models.ParameterMap] |
| fixed_inputs | Optional[_literal_models.LiteralMap] |
| schedule | Optional[_schedule_model.Schedule] |
| notifications | Optional[List[_common_models.Notification]] |
| labels | Optional[_common_models.Labels] |
| annotations | Optional[_common_models.Annotations] |
| raw_output_data_config | Optional[_common_models.RawOutputDataConfig] |
| max_parallelism | Optional[int] |
| security_context | Optional[security.SecurityContext] |
| trigger | Optional[LaunchPlanTriggerBase] |
| overwrite_cache | Optional[bool] |
| auto_activate | bool |
compile()
def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    *args,
    **kwargs,
)
| Parameter | Type |
|-|-|
| ctx | flytekit.core.context_manager.FlyteContext |
| args | *args |
| kwargs | **kwargs |
construct_node_metadata()
def construct_node_metadata()
create()
def create(
    name: str,
    workflow: _annotated_workflow.WorkflowBase,
    default_inputs: Optional[Dict[str, Any]],
    fixed_inputs: Optional[Dict[str, Any]],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    auth_role: Optional[_common_models.AuthRole],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
) -> LaunchPlan
| Parameter | Type |
|-|-|
| name | str |
| workflow | _annotated_workflow.WorkflowBase |
| default_inputs | Optional[Dict[str, Any]] |
| fixed_inputs | Optional[Dict[str, Any]] |
| schedule | Optional[_schedule_model.Schedule] |
| notifications | Optional[List[_common_models.Notification]] |
| labels | Optional[_common_models.Labels] |
| annotations | Optional[_common_models.Annotations] |
| raw_output_data_config | Optional[_common_models.RawOutputDataConfig] |
| max_parallelism | Optional[int] |
| security_context | Optional[security.SecurityContext] |
| auth_role | Optional[_common_models.AuthRole] |
| trigger | Optional[LaunchPlanTriggerBase] |
| overwrite_cache | Optional[bool] |
| auto_activate | bool |
execute()
def execute(
    **kwargs,
) -> typing.Any
| Parameter | Type |
|-|-|
| kwargs | **kwargs |
get_default_launch_plan()
def get_default_launch_plan(
    ctx: FlyteContext,
    workflow: _annotated_workflow.WorkflowBase,
) -> LaunchPlan
Users should probably call the get_or_create function defined below instead. A default launch plan is the one
that will just pick up whatever default values are defined in the workflow function signature (if any) and
use the default auth information supplied during serialization, with no notifications or schedules.
| Parameter | Type |
|-|-|
| ctx | FlyteContext |
| workflow | _annotated_workflow.WorkflowBase |
get_or_create()
def get_or_create(
    workflow: _annotated_workflow.WorkflowBase,
    name: Optional[str],
    default_inputs: Optional[Dict[str, Any]],
    fixed_inputs: Optional[Dict[str, Any]],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    auth_role: Optional[_common_models.AuthRole],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
) -> LaunchPlan
This function offers a friendlier interface for creating launch plans. If no name is supplied for the launch plan,
this assumes you are looking for the default launch plan for the workflow. If a name is specified, it
will be used. When creating the default launch plan, none of the other arguments may be specified.
The resulting launch plan is also cached: if get_or_create is called again with the same name, the
cached version is returned.
| Parameter | Type |
|-|-|
| workflow | _annotated_workflow.WorkflowBase |
| name | Optional[str] |
| default_inputs | Optional[Dict[str, Any]] |
| fixed_inputs | Optional[Dict[str, Any]] |
| schedule | Optional[_schedule_model.Schedule] |
| notifications | Optional[List[_common_models.Notification]] |
| labels | Optional[_common_models.Labels] |
| annotations | Optional[_common_models.Annotations] |
| raw_output_data_config | Optional[_common_models.RawOutputDataConfig] |
| max_parallelism | Optional[int] |
| security_context | Optional[security.SecurityContext] |
| auth_role | Optional[_common_models.AuthRole] |
| trigger | Optional[LaunchPlanTriggerBase] |
| overwrite_cache | Optional[bool] |
| auto_activate | bool |
local_execute()
def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    **kwargs,
) -> typing.Union[typing.Tuple[flytekit.core.promise.Promise], flytekit.core.promise.Promise, flytekit.core.promise.VoidPromise, NoneType]
Please see the local_execute comments in the main task.
| Parameter | Type |
|-|-|
| ctx | flytekit.core.context_manager.FlyteContext |
| kwargs | **kwargs |
local_execution_mode()
def local_execution_mode()
unwrap_literal_map_and_execute()
def unwrap_literal_map_and_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
) -> flytekit.models.literals.LiteralMap
Please see the implementation of the dispatch_execute function in the real task.
| Parameter | Type |
|-|-|
| ctx | flytekit.core.context_manager.FlyteContext |
| input_literal_map | flytekit.models.literals.LiteralMap |
Properties
| Property | Type | Description |
|-|-|-|
| annotations | | |
| fixed_inputs | | |
| id | | |
| interface | | |
| labels | | |
| max_parallelism | | |
| name | | |
| notifications | | |
| overwrite_cache | | |
| parameters | | |
| python_interface | | |
| raw_output_data_config | | |
| reference | | |
| saved_inputs | | |
| schedule | | |
| security_context | | |
| should_auto_activate | | |
| trigger | | |
| workflow | | |