1.15.4.dev2+g3e3ce2426

flytekit.core.launch_plan

Directory

Classes

Class Description
Any Special type indicating an unconstrained type.
FlyteContext This is an internal-facing context object, that most users will not have to deal with.
FlyteContextManager FlyteContextManager manages the execution context within Flytekit.
FlyteEntities This is a global object that tracks various tasks and workflows that are declared within a VM during the registration process.
Interface A Python native interface object, like inspect.signature but simpler.
LaunchPlan Launch Plans are one of the core constructs of Flyte.
LaunchPlanReference A reference object containing metadata that points to a remote launch plan.
LaunchPlanTriggerBase Base class for protocol classes.
ReferenceEntity None.
ReferenceLaunchPlan A reference launch plan serves as a pointer to a Launch Plan that already exists on your Flyte installation.

flytekit.core.launch_plan.Any

Special type indicating an unconstrained type.

  • Any is compatible with every type.
  • Any is assumed to have all methods.
  • All values are assumed to be instances of Any.

Note that all the above statements are true from the point of view of static type checkers. At runtime, Any should not be used with instance checks.
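The difference between static and runtime behaviour can be seen in a short sketch that uses only the standard typing module. The helper names describe and runtime_check_fails are made up for illustration and are not part of flytekit.

```python
from typing import Any


def describe(value: Any) -> str:
    """Accept any value; a static checker allows every operation on `value`."""
    return f"{type(value).__name__}: {value!r}"


def runtime_check_fails(value: object) -> bool:
    """Return True if isinstance(value, Any) raises TypeError at runtime."""
    try:
        isinstance(value, Any)
    except TypeError:
        return True
    return False


print(describe(3))
print(describe("three"))
print(runtime_check_fails(3))
```

The annotation only informs static type checkers; the runtime instance check is rejected rather than answered.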

flytekit.core.launch_plan.FlyteContext

This is an internal-facing context object, that most users will not have to deal with. It’s essentially a globally available grab bag of settings and objects that allows flytekit to do things like convert complex types, run and compile workflows, serialize Flyte entities, etc.

Even though this object has a current_context function on it, it should not be called directly. Please use the flytekit.FlyteContextManager object instead.

Please do not confuse this object with the flytekit.ExecutionParameters object.

def FlyteContext(
    file_access: FileAccessProvider,
    level: int,
    flyte_client: Optional['friendly_client.SynchronousFlyteClient'],
    compilation_state: Optional[CompilationState],
    execution_state: Optional[ExecutionState],
    serialization_settings: Optional[SerializationSettings],
    in_a_condition: bool,
    origin_stackframe: Optional[traceback.FrameSummary],
    output_metadata_tracker: Optional[OutputMetadataTracker],
    worker_queue: Optional[Controller],
):
Parameter Type
file_access FileAccessProvider
level int
flyte_client Optional['friendly_client.SynchronousFlyteClient']
compilation_state Optional[CompilationState]
execution_state Optional[ExecutionState]
serialization_settings Optional[SerializationSettings]
in_a_condition bool
origin_stackframe Optional[traceback.FrameSummary]
output_metadata_tracker Optional[OutputMetadataTracker]
worker_queue Optional[Controller]

Methods

Method Description
current_context() This method exists only to maintain backwards compatibility
enter_conditional_section() None
get_deck() Returns the deck that was created as part of the last execution
get_origin_stackframe_repr() None
new_builder() None
new_compilation_state() Creates and returns a default compilation state
new_execution_state() Creates and returns a new default execution state
set_stackframe() None
with_client() None
with_compilation_state() None
with_execution_state() None
with_file_access() None
with_new_compilation_state() None
with_output_metadata_tracker() None
with_serialization_settings() None
with_worker_queue() None

current_context()

def current_context()

This method exists only to maintain backwards compatibility. Please use FlyteContextManager.current_context() instead.

Users of flytekit should take care not to confuse the object returned from this function with flytekit.current_context().

enter_conditional_section()

def enter_conditional_section()

get_deck()

def get_deck()

Returns the deck that was created as part of the last execution.

The return value depends on the execution environment. In a notebook, the return value is compatible with IPython.display and should be rendered in the notebook.

.. code-block:: python

    with flytekit.new_context() as ctx:
        my_task(…)
    ctx.get_deck()

OR if you wish to explicitly display

.. code-block:: python

    from IPython import display
    display(ctx.get_deck())

get_origin_stackframe_repr()

def get_origin_stackframe_repr()

new_builder()

def new_builder()

new_compilation_state()

def new_compilation_state(
    prefix: str,
):

Creates and returns a default compilation state. For most of the code this should be the entrypoint of compilation; otherwise the code should always use with_compilation_state.

Parameter Type
prefix str

new_execution_state()

def new_execution_state(
    working_dir: Optional[os.PathLike],
):

Creates and returns a new default execution state. This should be used at the entrypoint of execution; in all other cases it is preferable to use with_execution_state.

Parameter Type
working_dir Optional[os.PathLike]

set_stackframe()

def set_stackframe(
    s: traceback.FrameSummary,
):
Parameter Type
s traceback.FrameSummary

with_client()

def with_client(
    c: SynchronousFlyteClient,
):
Parameter Type
c SynchronousFlyteClient

with_compilation_state()

def with_compilation_state(
    c: CompilationState,
):
Parameter Type
c CompilationState

with_execution_state()

def with_execution_state(
    es: ExecutionState,
):
Parameter Type
es ExecutionState

with_file_access()

def with_file_access(
    fa: FileAccessProvider,
):
Parameter Type
fa FileAccessProvider

with_new_compilation_state()

def with_new_compilation_state()

with_output_metadata_tracker()

def with_output_metadata_tracker(
    t: OutputMetadataTracker,
):
Parameter Type
t OutputMetadataTracker

with_serialization_settings()

def with_serialization_settings(
    ss: SerializationSettings,
):
Parameter Type
ss SerializationSettings

with_worker_queue()

def with_worker_queue(
    wq: Controller,
):
Parameter Type
wq Controller

Properties

Property Type Description
user_space_params

flytekit.core.launch_plan.FlyteContextManager

FlyteContextManager manages the execution context within Flytekit. It holds the global state of either compilation or execution. It is not thread-safe and can currently only be run as a single-threaded application. Contexts within Flytekit are useful for managing compilation state and execution state. Refer to CompilationState and ExecutionState for more information. FlyteContextManager provides a singleton stack to manage these contexts.

Typical usage is

.. code-block:: python

    FlyteContextManager.initialize()
    with FlyteContextManager.with_context(o) as ctx:
        pass

    # If required - not recommended you can use
    FlyteContextManager.push_context()
    # but correspondingly a pop_context should be called
    FlyteContextManager.pop_context()
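To illustrate why with_context is preferred over a manual push and pop, here is a minimal toy model of a singleton context stack. ToyContextStack is a hypothetical stand-in written for this sketch; it stores plain strings and does not reproduce flytekit's actual implementation.

```python
from contextlib import contextmanager
from typing import Iterator, List


class ToyContextStack:
    """Hypothetical stand-in for a singleton context stack (not flytekit's class)."""

    _stack: List[str] = []

    @classmethod
    def initialize(cls) -> None:
        # Erase everything and start again from a single root context.
        cls._stack = ["root"]

    @classmethod
    def push_context(cls, ctx: str) -> str:
        cls._stack.append(ctx)
        return ctx

    @classmethod
    def pop_context(cls) -> str:
        return cls._stack.pop()

    @classmethod
    def current_context(cls) -> str:
        return cls._stack[-1]

    @classmethod
    def size(cls) -> int:
        return len(cls._stack)

    @classmethod
    @contextmanager
    def with_context(cls, ctx: str) -> Iterator[str]:
        # Pair every push with a pop, even if the body raises.
        cls.push_context(ctx)
        try:
            yield ctx
        finally:
            cls.pop_context()


ToyContextStack.initialize()
with ToyContextStack.with_context("compilation") as ctx:
    inside = ToyContextStack.current_context()
after = ToyContextStack.current_context()
print(inside, after, ToyContextStack.size())
```

Because the pop happens in a finally clause, the stack returns to its previous depth when the with block exits, which is the guarantee a bare push_context call does not give.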

Methods

Method Description
add_signal_handler() None
current_context() None
get_origin_stackframe() None
initialize() Re-initializes the context and erases the entire context
pop_context() None
push_context() None
size() None
with_context() None

add_signal_handler()

def add_signal_handler(
    handler: typing.Callable[[int, FrameType], typing.Any],
):
Parameter Type
handler typing.Callable[[int, FrameType], typing.Any]

current_context()

def current_context()

get_origin_stackframe()

def get_origin_stackframe(
    limit,
):
Parameter Type
limit

initialize()

def initialize()

Re-initializes the context and erases the entire context.

pop_context()

def pop_context()

push_context()

def push_context(
    ctx: FlyteContext,
    f: Optional[traceback.FrameSummary],
):
Parameter Type
ctx FlyteContext
f Optional[traceback.FrameSummary]

size()

def size()

with_context()

def with_context(
    b: FlyteContext.Builder,
):
Parameter Type
b FlyteContext.Builder

flytekit.core.launch_plan.FlyteEntities

This is a global object that tracks various tasks and workflows that are declared within a VM during the registration process.

flytekit.core.launch_plan.Interface

A Python native interface object, like inspect.signature but simpler.

def Interface(
    inputs: Union[Optional[Dict[str, Type]], Optional[Dict[str, Tuple[Type, Any]]]],
    outputs: Union[Optional[Dict[str, Type]], Optional[Dict[str, Optional[Type]]]],
    output_tuple_name: Optional[str],
    docstring: Optional[Docstring],
):
Parameter Type
inputs Union[Optional[Dict[str, Type]], Optional[Dict[str, Tuple[Type, Any]]]]
outputs Union[Optional[Dict[str, Type]], Optional[Dict[str, Optional[Type]]]]
output_tuple_name Optional[str]
docstring Optional[Docstring]

Methods

Method Description
remove_inputs() This method is useful in removing some variables from the Flyte backend inputs specification, as these are implicit local only inputs or will be supplied by the library at runtime
with_inputs() Use this to add additional inputs to the interface
with_outputs() This method allows the addition of extra outputs that are expected from a task specification

remove_inputs()

def remove_inputs(
    vars: Optional[List[str]],
):

This method is useful for removing variables from the Flyte backend inputs specification when they are implicit, local-only inputs or will be supplied by the library at runtime (for example, a Spark session). It creates a new instance of the interface with the requested variables removed.

Parameter Type
vars Optional[List[str]]
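The copy-on-modify behaviour described for remove_inputs(), and the merging described for with_inputs(), can be sketched with a small frozen dataclass. ToyInterface is a hypothetical simplification invented for this example; the real Interface class carries more state (defaults, docstring, output tuple name) that is ignored here.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Type


@dataclass(frozen=True)
class ToyInterface:
    """Hypothetical, simplified interface: input and output names mapped to types."""

    inputs: Dict[str, Type] = field(default_factory=dict)
    outputs: Dict[str, Type] = field(default_factory=dict)

    def remove_inputs(self, vars: Optional[List[str]]) -> "ToyInterface":
        # Build a new instance; the original is left untouched.
        if not vars:
            return self
        kept = {k: v for k, v in self.inputs.items() if k not in vars}
        return ToyInterface(inputs=kept, outputs=dict(self.outputs))

    def with_inputs(self, extra_inputs: Dict[str, Type]) -> "ToyInterface":
        # Merge the extra inputs into a copy of the existing ones.
        merged = {**self.inputs, **extra_inputs}
        return ToyInterface(inputs=merged, outputs=dict(self.outputs))


original = ToyInterface(inputs={"a": int, "spark_session": object}, outputs={"o0": str})
backend_view = original.remove_inputs(["spark_session"])
extended = backend_view.with_inputs({"seed": int})

print(sorted(original.inputs))
print(sorted(backend_view.inputs))
print(sorted(extended.inputs))
```

Returning a new object each time means the interface a user declared stays intact while the library derives the variant it sends to the backend.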

with_inputs()

def with_inputs(
    extra_inputs: Dict[str, Type],
):

Use this to add additional inputs to the interface. This is useful for adding implicit inputs that are added without the user requesting them.

Parameter Type
extra_inputs Dict[str, Type]

with_outputs()

def with_outputs(
    extra_outputs: Dict[str, Type],
):

This method allows the addition of extra outputs that are expected from a task specification.

Parameter Type
extra_outputs Dict[str, Type]

Properties

Property Type Description
default_inputs_as_kwargs
docstring
inputs
inputs_with_defaults
output_names
output_tuple
output_tuple_name
outputs

flytekit.core.launch_plan.LaunchPlan

Launch Plans are one of the core constructs of Flyte. Please take a look at the discussion of launch plans in the core concepts documentation if you are unfamiliar with them.

Every workflow is registered with a default launch plan, which is just a launch plan with none of the additional attributes set - no default values, fixed values, schedules, etc. Assuming you have the following workflow

.. code-block:: python

    @workflow
    def wf(a: int, c: str) -> str:
        …

Create the default launch plan with

.. code-block:: python

    LaunchPlan.get_or_create(workflow=wf)

If you specify additional parameters, you’ll also have to give the launch plan a unique name. Default and fixed inputs can be expressed as Python native values like so:

.. literalinclude:: ../../../tests/flytekit/unit/core/test_launch_plan.py
   :start-after: # fixed_and_default_start
   :end-before: # fixed_and_default_end
   :language: python
   :dedent: 4
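As a rough illustration of how default and fixed inputs differ, the sketch below models the resolution rule with plain dictionaries. It assumes the commonly described Flyte semantics that a default can be overridden when an execution is launched while a fixed input cannot; resolve_inputs and its parameter names are hypothetical and are not a flytekit API.

```python
from typing import Any, Dict, Optional


def resolve_inputs(
    default_inputs: Dict[str, Any],
    fixed_inputs: Dict[str, Any],
    launch_time_inputs: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Toy resolution rule: defaults may be overridden, fixed values may not."""
    launch_time_inputs = launch_time_inputs or {}
    clash = set(launch_time_inputs) & set(fixed_inputs)
    if clash:
        raise ValueError(f"cannot override fixed inputs: {sorted(clash)}")
    # Later dicts win: launch-time values replace defaults, fixed values always apply.
    return {**default_inputs, **launch_time_inputs, **fixed_inputs}


# For a workflow wf(a: int, c: str): `a` gets a default, `c` is fixed.
resolved = resolve_inputs({"a": 3}, {"c": "4"}, {"a": 10})
print(resolved)
```

Both dictionaries hold Python native values, matching the statement above that default and fixed inputs can be expressed that way.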

Additionally, a launch plan can be configured to run on a schedule and emit notifications.

Please see the relevant Schedule and Notification objects as well.

To configure the remaining parameters, you’ll need to import the relevant model objects as well.

.. literalinclude:: ../../../tests/flytekit/unit/core/test_launch_plan.py
   :start-after: # schedule_start
   :end-before: # schedule_end
   :language: python
   :dedent: 4

.. code-block:: python

    from flytekit.models.common import Annotations, AuthRole, Labels, RawOutputDataConfig

Then use as follows

.. literalinclude:: ../../../tests/flytekit/unit/core/test_launch_plan.py
   :start-after: # auth_role_start
   :end-before: # auth_role_end
   :language: python
   :dedent: 4

def LaunchPlan(
    name: str,
    workflow: _annotated_workflow.WorkflowBase,
    parameters: _interface_models.ParameterMap,
    fixed_inputs: _literal_models.LiteralMap,
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
):
Parameter Type
name str
workflow _annotated_workflow.WorkflowBase
parameters _interface_models.ParameterMap
fixed_inputs _literal_models.LiteralMap
schedule Optional[_schedule_model.Schedule]
notifications Optional[List[_common_models.Notification]]
labels Optional[_common_models.Labels]
annotations Optional[_common_models.Annotations]
raw_output_data_config Optional[_common_models.RawOutputDataConfig]
max_parallelism Optional[int]
security_context Optional[security.SecurityContext]
trigger Optional[LaunchPlanTriggerBase]
overwrite_cache Optional[bool]
auto_activate bool

Methods

Method Description
clone_with() None
construct_node_metadata() None
create() None
get_default_launch_plan() Users should probably call the get_or_create function defined below instead
get_or_create() This function offers a friendlier interface for creating launch plans

clone_with()

def clone_with(
    name: str,
    parameters: Optional[_interface_models.ParameterMap],
    fixed_inputs: Optional[_literal_models.LiteralMap],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
):
Parameter Type
name str
parameters Optional[_interface_models.ParameterMap]
fixed_inputs Optional[_literal_models.LiteralMap]
schedule Optional[_schedule_model.Schedule]
notifications Optional[List[_common_models.Notification]]
labels Optional[_common_models.Labels]
annotations Optional[_common_models.Annotations]
raw_output_data_config Optional[_common_models.RawOutputDataConfig]
max_parallelism Optional[int]
security_context Optional[security.SecurityContext]
trigger Optional[LaunchPlanTriggerBase]
overwrite_cache Optional[bool]
auto_activate bool

construct_node_metadata()

def construct_node_metadata()

create()

def create(
    name: str,
    workflow: _annotated_workflow.WorkflowBase,
    default_inputs: Optional[Dict[str, Any]],
    fixed_inputs: Optional[Dict[str, Any]],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    auth_role: Optional[_common_models.AuthRole],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
):
Parameter Type
name str
workflow _annotated_workflow.WorkflowBase
default_inputs Optional[Dict[str, Any]]
fixed_inputs Optional[Dict[str, Any]]
schedule Optional[_schedule_model.Schedule]
notifications Optional[List[_common_models.Notification]]
labels Optional[_common_models.Labels]
annotations Optional[_common_models.Annotations]
raw_output_data_config Optional[_common_models.RawOutputDataConfig]
max_parallelism Optional[int]
security_context Optional[security.SecurityContext]
auth_role Optional[_common_models.AuthRole]
trigger Optional[LaunchPlanTriggerBase]
overwrite_cache Optional[bool]
auto_activate bool

get_default_launch_plan()

def get_default_launch_plan(
    ctx: FlyteContext,
    workflow: _annotated_workflow.WorkflowBase,
):

Users should probably call the get_or_create function defined below instead. A default launch plan is the one that will just pick up whatever default values are defined in the workflow function signature (if any) and use the default auth information supplied during serialization, with no notifications or schedules.

Parameter Type
ctx FlyteContext
workflow _annotated_workflow.WorkflowBase
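The idea of picking up default values from the workflow function signature can be shown with the standard inspect module. This is only a sketch of the concept; defaults_from_signature is a made-up helper, and the plain function wf stands in for a real workflow.

```python
import inspect
from typing import Any, Callable, Dict


def defaults_from_signature(fn: Callable[..., Any]) -> Dict[str, Any]:
    """Collect parameters that declare a default value in the function signature."""
    return {
        name: param.default
        for name, param in inspect.signature(fn).parameters.items()
        if param.default is not inspect.Parameter.empty
    }


def wf(a: int, c: str = "hello") -> str:  # stand-in for a workflow function
    return f"{a}-{c}"


print(defaults_from_signature(wf))
```

Parameters without a default, such as a here, stay required and must be supplied when the launch plan is executed.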

get_or_create()

def get_or_create(
    workflow: _annotated_workflow.WorkflowBase,
    name: Optional[str],
    default_inputs: Optional[Dict[str, Any]],
    fixed_inputs: Optional[Dict[str, Any]],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    auth_role: Optional[_common_models.AuthRole],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
):

This function offers a friendlier interface for creating launch plans. If the name for the launch plan is not supplied, this assumes you are looking for the default launch plan for the workflow. If it is specified, it will be used. If creating the default launch plan, none of the other arguments may be specified.

The resulting launch plan is also cached; if get_or_create is called again with the same name, the cached version is returned.

Parameter Type
workflow _annotated_workflow.WorkflowBase
name Optional[str]
default_inputs Optional[Dict[str, Any]]
fixed_inputs Optional[Dict[str, Any]]
schedule Optional[_schedule_model.Schedule]
notifications Optional[List[_common_models.Notification]]
labels Optional[_common_models.Labels]
annotations Optional[_common_models.Annotations]
raw_output_data_config Optional[_common_models.RawOutputDataConfig]
max_parallelism Optional[int]
security_context Optional[security.SecurityContext]
auth_role Optional[_common_models.AuthRole]
trigger Optional[LaunchPlanTriggerBase]
overwrite_cache Optional[bool]
auto_activate bool
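The naming and caching rules described for get_or_create() can be sketched as a small name-keyed cache. Everything here is a toy model: launch plans are plain dicts, the cache is a module-level dictionary, and keying the default plan by the workflow name is an assumption made for the example. The sketch also ignores what should happen when a cached name is requested again with different options.

```python
from typing import Any, Dict, Optional

_CACHE: Dict[str, Dict[str, Any]] = {}


def get_or_create(workflow_name: str, name: Optional[str] = None, **options: Any) -> Dict[str, Any]:
    """Toy lookup mirroring the rules above; launch plans are plain dicts here."""
    if name is None:
        if options:
            raise ValueError("the default launch plan accepts no extra arguments")
        name = workflow_name  # assumption: the default plan is keyed by the workflow name
    if name in _CACHE:
        return _CACHE[name]  # same name -> cached object
    plan = {"name": name, "workflow": workflow_name, **options}
    _CACHE[name] = plan
    return plan


default_lp = get_or_create("my_module.wf")
named_lp = get_or_create("my_module.wf", name="nightly", fixed_inputs={"c": "4"})
print(get_or_create("my_module.wf", name="nightly") is named_lp)
print(default_lp["name"])
```

Requesting the default plan together with extra options raises an error in this sketch, matching the rule that none of the other arguments may be specified in that case.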

Properties

Property Type Description
annotations
fixed_inputs
interface
labels
max_parallelism
name
notifications
overwrite_cache
parameters
python_interface
raw_output_data_config
saved_inputs
schedule
security_context
should_auto_activate
trigger
workflow

flytekit.core.launch_plan.LaunchPlanReference

A reference object containing metadata that points to a remote launch plan.

def LaunchPlanReference(
    project: str,
    domain: str,
    name: str,
    version: str,
):
Parameter Type
project str
domain str
name str
version str

Properties

Property Type Description
id
resource_type

flytekit.core.launch_plan.LaunchPlanTriggerBase

Base class for protocol classes.

Protocol classes are defined as::

    class Proto(Protocol):
        def meth(self) -> int:
            …

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example::

    class C:
        def meth(self) -> int:
            return 0

    def func(x: Proto) -> int:
        return x.meth()

    func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

    class GenProto[T](Protocol):
        def meth(self) -> T:
            …
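A runnable sketch of the runtime_checkable behaviour mentioned above follows. SupportsToFlyteIdl, EveryMinute, and NotATrigger are hypothetical names invented for this example, and returning a dict from to_flyte_idl is an assumption chosen to keep the code free of dependencies.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class SupportsToFlyteIdl(Protocol):
    """Hypothetical protocol: anything exposing a to_flyte_idl() method."""

    def to_flyte_idl(self) -> dict: ...


class EveryMinute:
    # No inheritance from the protocol; matching the method name is enough.
    def to_flyte_idl(self) -> dict:
        return {"kind": "cron", "expression": "* * * * *"}


class NotATrigger:
    pass


print(isinstance(EveryMinute(), SupportsToFlyteIdl))
print(isinstance(NotATrigger(), SupportsToFlyteIdl))
```

The runtime check looks only for the presence of the method, so a class whose to_flyte_idl had a different signature would still pass it.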

def LaunchPlanTriggerBase(
    args,
    kwargs,
):
Parameter Type
args *args
kwargs **kwargs

Methods

Method Description
to_flyte_idl() None

to_flyte_idl()

def to_flyte_idl(
    args,
    kwargs,
):
Parameter Type
args *args
kwargs **kwargs

flytekit.core.launch_plan.ReferenceEntity

def ReferenceEntity(
    reference: typing.Union[flytekit.core.reference_entity.WorkflowReference, flytekit.core.reference_entity.TaskReference, flytekit.core.reference_entity.LaunchPlanReference],
    inputs: typing.Dict[str, typing.Type],
    outputs: typing.Dict[str, typing.Type],
):
Parameter Type
reference typing.Union[flytekit.core.reference_entity.WorkflowReference, flytekit.core.reference_entity.TaskReference, flytekit.core.reference_entity.LaunchPlanReference]
inputs typing.Dict[str, typing.Type]
outputs typing.Dict[str, typing.Type]

Methods

Method Description
compile() None
construct_node_metadata() None
execute() None
local_execute() Please see the local_execute comments in the main task
local_execution_mode() None
unwrap_literal_map_and_execute() Please see the implementation of the dispatch_execute function in the real task

compile()

def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    args,
    kwargs,
):
Parameter Type
ctx flytekit.core.context_manager.FlyteContext
args *args
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

execute()

def execute(
    kwargs,
):
Parameter Type
kwargs **kwargs

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
):

Please see the local_execute comments in the main task.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

unwrap_literal_map_and_execute()

def unwrap_literal_map_and_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

Please see the implementation of the dispatch_execute function in the real task.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

Properties

Property Type Description
id
interface
name
python_interface
reference

flytekit.core.launch_plan.ReferenceLaunchPlan

A reference launch plan serves as a pointer to a Launch Plan that already exists on your Flyte installation. This object will not initiate a network call to Admin, which is why the user is asked to provide the expected interface. If at registration time the interface provided causes an issue with compilation, an error will be returned.

def ReferenceLaunchPlan(
    project: str,
    domain: str,
    name: str,
    version: str,
    inputs: Dict[str, Type],
    outputs: Dict[str, Type],
):
Parameter Type
project str
domain str
name str
version str
inputs Dict[str, Type]
outputs Dict[str, Type]

Methods

Method Description
clone_with() None
compile() None
construct_node_metadata() None
create() None
execute() None
get_default_launch_plan() Users should probably call the get_or_create function defined below instead
get_or_create() This function offers a friendlier interface for creating launch plans
local_execute() Please see the local_execute comments in the main task
local_execution_mode() None
unwrap_literal_map_and_execute() Please see the implementation of the dispatch_execute function in the real task

clone_with()

def clone_with(
    name: str,
    parameters: Optional[_interface_models.ParameterMap],
    fixed_inputs: Optional[_literal_models.LiteralMap],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
):
Parameter Type
name str
parameters Optional[_interface_models.ParameterMap]
fixed_inputs Optional[_literal_models.LiteralMap]
schedule Optional[_schedule_model.Schedule]
notifications Optional[List[_common_models.Notification]]
labels Optional[_common_models.Labels]
annotations Optional[_common_models.Annotations]
raw_output_data_config Optional[_common_models.RawOutputDataConfig]
max_parallelism Optional[int]
security_context Optional[security.SecurityContext]
trigger Optional[LaunchPlanTriggerBase]
overwrite_cache Optional[bool]
auto_activate bool

compile()

def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    args,
    kwargs,
):
Parameter Type
ctx flytekit.core.context_manager.FlyteContext
args *args
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

create()

def create(
    name: str,
    workflow: _annotated_workflow.WorkflowBase,
    default_inputs: Optional[Dict[str, Any]],
    fixed_inputs: Optional[Dict[str, Any]],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    auth_role: Optional[_common_models.AuthRole],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
):
Parameter Type
name str
workflow _annotated_workflow.WorkflowBase
default_inputs Optional[Dict[str, Any]]
fixed_inputs Optional[Dict[str, Any]]
schedule Optional[_schedule_model.Schedule]
notifications Optional[List[_common_models.Notification]]
labels Optional[_common_models.Labels]
annotations Optional[_common_models.Annotations]
raw_output_data_config Optional[_common_models.RawOutputDataConfig]
max_parallelism Optional[int]
security_context Optional[security.SecurityContext]
auth_role Optional[_common_models.AuthRole]
trigger Optional[LaunchPlanTriggerBase]
overwrite_cache Optional[bool]
auto_activate bool

execute()

def execute(
    kwargs,
):
Parameter Type
kwargs **kwargs

get_default_launch_plan()

def get_default_launch_plan(
    ctx: FlyteContext,
    workflow: _annotated_workflow.WorkflowBase,
):

Users should probably call the get_or_create function defined below instead. A default launch plan is the one that will just pick up whatever default values are defined in the workflow function signature (if any) and use the default auth information supplied during serialization, with no notifications or schedules.

Parameter Type
ctx FlyteContext
workflow _annotated_workflow.WorkflowBase

get_or_create()

def get_or_create(
    workflow: _annotated_workflow.WorkflowBase,
    name: Optional[str],
    default_inputs: Optional[Dict[str, Any]],
    fixed_inputs: Optional[Dict[str, Any]],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    auth_role: Optional[_common_models.AuthRole],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
):

This function offers a friendlier interface for creating launch plans. If the name for the launch plan is not supplied, this assumes you are looking for the default launch plan for the workflow. If it is specified, it will be used. If creating the default launch plan, none of the other arguments may be specified.

The resulting launch plan is also cached; if get_or_create is called again with the same name, the cached version is returned.

Parameter Type
workflow _annotated_workflow.WorkflowBase
name Optional[str]
default_inputs Optional[Dict[str, Any]]
fixed_inputs Optional[Dict[str, Any]]
schedule Optional[_schedule_model.Schedule]
notifications Optional[List[_common_models.Notification]]
labels Optional[_common_models.Labels]
annotations Optional[_common_models.Annotations]
raw_output_data_config Optional[_common_models.RawOutputDataConfig]
max_parallelism Optional[int]
security_context Optional[security.SecurityContext]
auth_role Optional[_common_models.AuthRole]
trigger Optional[LaunchPlanTriggerBase]
overwrite_cache Optional[bool]
auto_activate bool

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
):

Please see the local_execute comments in the main task.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

unwrap_literal_map_and_execute()

def unwrap_literal_map_and_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

Please see the implementation of the dispatch_execute function in the real task.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

Properties

Property Type Description
annotations
fixed_inputs
id
interface
labels
max_parallelism
name
notifications
overwrite_cache
parameters
python_interface
raw_output_data_config
reference
saved_inputs
schedule
security_context
should_auto_activate
trigger
workflow