flytekit.core.launch_plan
Directory
Classes
Class | Description |
---|---|
Any | Special type indicating an unconstrained type. |
FlyteContext | This is an internal-facing context object that most users will not have to deal with. |
FlyteContextManager | FlyteContextManager manages the execution context within Flytekit. |
FlyteEntities | This is a global object that tracks the tasks and workflows declared within a VM during the registration process. |
Interface | A Python native interface object, like inspect.signature, but simpler. |
LaunchPlan | Launch Plans are one of the core constructs of Flyte. |
LaunchPlanReference | A reference object containing metadata that points to a remote launch plan. |
LaunchPlanTriggerBase | Base class for protocol classes. |
ReferenceEntity | None. |
ReferenceLaunchPlan | A reference launch plan serves as a pointer to a Launch Plan that already exists on your Flyte installation. |
flytekit.core.launch_plan.Any
Special type indicating an unconstrained type.
- Any is compatible with every type.
- Any is assumed to have all methods.
- All values are assumed to be instances of Any.
Note that all the above statements are true from the point of view of static type checkers. At runtime, Any should not be used with instance checks.
flytekit.core.launch_plan.FlyteContext
This is an internal-facing context object that most users will not have to deal with. It’s essentially a globally available grab bag of settings and objects that allows flytekit to do things like convert complex types, run and compile workflows, serialize Flyte entities, etc.
Even though this object has a current_context function on it, it should not be called directly. Please use the :py:class:`flytekit.FlyteContextManager` object instead.
Please do not confuse this object with the :py:class:`flytekit.ExecutionParameters` object.
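A minimal sketch of how the active context is typically obtained, via the manager rather than this class directly:

.. code-block:: python

    from flytekit.core.context_manager import FlyteContextManager

    ctx = FlyteContextManager.current_context()
    # Internal state such as ctx.compilation_state, ctx.execution_state and
    # ctx.serialization_settings is available here.
    # Note: this is not the same object as flytekit.current_context(), which
    # returns the user-facing ExecutionParameters.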
def FlyteContext(
file_access: FileAccessProvider,
level: int,
flyte_client: Optional['friendly_client.SynchronousFlyteClient'],
compilation_state: Optional[CompilationState],
execution_state: Optional[ExecutionState],
serialization_settings: Optional[SerializationSettings],
in_a_condition: bool,
origin_stackframe: Optional[traceback.FrameSummary],
output_metadata_tracker: Optional[OutputMetadataTracker],
worker_queue: Optional[Controller],
):
Parameter | Type |
---|---|
file_access | FileAccessProvider |
level | int |
flyte_client | Optional['friendly_client.SynchronousFlyteClient'] |
compilation_state | Optional[CompilationState] |
execution_state | Optional[ExecutionState] |
serialization_settings | Optional[SerializationSettings] |
in_a_condition | bool |
origin_stackframe | Optional[traceback.FrameSummary] |
output_metadata_tracker | Optional[OutputMetadataTracker] |
worker_queue | Optional[Controller] |
Methods
Method | Description |
---|---|
current_context() | This method exists only to maintain backwards compatibility. |
enter_conditional_section() | None |
get_deck() | Returns the deck that was created as part of the last execution. |
get_origin_stackframe_repr() | None |
new_builder() | None |
new_compilation_state() | Creates and returns a default compilation state. |
new_execution_state() | Creates and returns a new default execution state. |
set_stackframe() | None |
with_client() | None |
with_compilation_state() | None |
with_execution_state() | None |
with_file_access() | None |
with_new_compilation_state() | None |
with_output_metadata_tracker() | None |
with_serialization_settings() | None |
with_worker_queue() | None |
current_context()
def current_context()
This method exists only to maintain backwards compatibility. Please use FlyteContextManager.current_context() instead.
Users of flytekit should be wary not to confuse the object returned from this function with :py:func:`flytekit.current_context`.
enter_conditional_section()
def enter_conditional_section()
get_deck()
def get_deck()
Returns the deck that was created as part of the last execution.
The return value depends on the execution environment. In a notebook, the return value is compatible with IPython.display and should be rendered in the notebook.
.. code-block:: python

    with flytekit.new_context() as ctx:
        my_task(...)
    ctx.get_deck()

Or, if you wish to display it explicitly:

.. code-block:: python

    from IPython import display
    display(ctx.get_deck())
get_origin_stackframe_repr()
def get_origin_stackframe_repr()
new_builder()
def new_builder()
new_compilation_state()
def new_compilation_state(
prefix: str,
):
Creates and returns a default compilation state. For most of the code this should be the entrypoint of compilation; otherwise the code should always use with_compilation_state.
Parameter | Type |
---|---|
prefix | str |
new_execution_state()
def new_execution_state(
working_dir: Optional[os.PathLike],
):
Creates and returns a new default execution state. This should be used at the entrypoint of execution; in all other cases it is preferable to use with_execution_state.
Parameter | Type |
---|---|
working_dir | Optional[os.PathLike] |
set_stackframe()
def set_stackframe(
s: traceback.FrameSummary,
):
Parameter | Type |
---|---|
s | traceback.FrameSummary |
with_client()
def with_client(
c: SynchronousFlyteClient,
):
Parameter | Type |
---|---|
c | SynchronousFlyteClient |
with_compilation_state()
def with_compilation_state(
c: CompilationState,
):
Parameter | Type |
---|---|
c | CompilationState |
with_execution_state()
def with_execution_state(
es: ExecutionState,
):
Parameter | Type |
---|---|
es | ExecutionState |
with_file_access()
def with_file_access(
fa: FileAccessProvider,
):
Parameter | Type |
---|---|
fa | FileAccessProvider |
with_new_compilation_state()
def with_new_compilation_state()
with_output_metadata_tracker()
def with_output_metadata_tracker(
t: OutputMetadataTracker,
):
Parameter | Type |
---|---|
t | OutputMetadataTracker |
with_serialization_settings()
def with_serialization_settings(
ss: SerializationSettings,
):
Parameter | Type |
---|---|
ss | SerializationSettings |
with_worker_queue()
def with_worker_queue(
wq: Controller,
):
Parameter | Type |
---|---|
wq | Controller |
Properties
Property | Type | Description |
---|---|---|
user_space_params | ||
flytekit.core.launch_plan.FlyteContextManager
FlyteContextManager manages the execution context within Flytekit. It holds global state for either compilation or execution. It is not thread-safe and can currently only be used in a single-threaded application.
Contexts within Flytekit are useful for managing compilation state and execution state. Refer to CompilationState and ExecutionState for more information. FlyteContextManager provides a singleton stack to manage these contexts.
Typical usage is

.. code-block:: python

    FlyteContextManager.initialize()
    with FlyteContextManager.with_context(o) as ctx:
        pass

If required (though not recommended), you can use FlyteContextManager.push_context(), but a corresponding FlyteContextManager.pop_context() should then be called.
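A sketch of that discouraged manual pattern, pairing push and pop explicitly (this assumes the builder returned by new_builder() is finalized with build(); with_context is the preferred form):

.. code-block:: python

    from flytekit.core.context_manager import FlyteContextManager

    ctx = FlyteContextManager.current_context()
    FlyteContextManager.push_context(ctx.new_builder().build())
    try:
        ...  # work that needs the pushed context
    finally:
        FlyteContextManager.pop_context()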
Methods
Method | Description |
---|---|
add_signal_handler() | None |
current_context() | None |
get_origin_stackframe() | None |
initialize() | Re-initializes the context and erases the entire context. |
pop_context() | None |
push_context() | None |
size() | None |
with_context() | None |
add_signal_handler()
def add_signal_handler(
handler: typing.Callable[[int, FrameType], typing.Any],
):
Parameter | Type |
---|---|
handler | typing.Callable[[int, FrameType], typing.Any] |
current_context()
def current_context()
get_origin_stackframe()
def get_origin_stackframe(
limit,
):
Parameter | Type |
---|---|
limit |
initialize()
def initialize()
Re-initializes the context and erases the entire context
pop_context()
def pop_context()
push_context()
def push_context(
ctx: FlyteContext,
f: Optional[traceback.FrameSummary],
):
Parameter | Type |
---|---|
ctx | FlyteContext |
f | Optional[traceback.FrameSummary] |
size()
def size()
with_context()
def with_context(
b: FlyteContext.Builder,
):
Parameter | Type |
---|---|
b | FlyteContext.Builder |
flytekit.core.launch_plan.FlyteEntities
This is a global object that tracks the various tasks and workflows that are declared within a VM during the registration process.
flytekit.core.launch_plan.Interface
A Python native interface object, like inspect.signature but simpler.
def Interface(
inputs: Union[Optional[Dict[str, Type]], Optional[Dict[str, Tuple[Type, Any]]]],
outputs: Union[Optional[Dict[str, Type]], Optional[Dict[str, Optional[Type]]]],
output_tuple_name: Optional[str],
docstring: Optional[Docstring],
):
Parameter | Type |
---|---|
inputs | Union[Optional[Dict[str, Type]], Optional[Dict[str, Tuple[Type, Any]]]] |
outputs | Union[Optional[Dict[str, Type]], Optional[Dict[str, Optional[Type]]]] |
output_tuple_name | Optional[str] |
docstring | Optional[Docstring] |
Methods
Method | Description |
---|---|
remove_inputs() | This method is useful in removing some variables from the Flyte backend inputs specification. |
with_inputs() | Use this to add additional inputs to the interface. |
with_outputs() | This method allows addition of extra outputs that are expected from a task specification. |
remove_inputs()
def remove_inputs(
vars: Optional[List[str]],
):
This method is useful in removing some variables from the Flyte backend inputs specification, as these are implicit, local-only inputs or will be supplied by the library at runtime (for example, spark-session). It creates a new instance of the interface with the requested variables removed.
Parameter | Type |
---|---|
vars | Optional[List[str]] |
with_inputs()
def with_inputs(
extra_inputs: Dict[str, Type],
):
Use this to add additional inputs to the interface. This is useful for adding implicit inputs that are added without the user requesting them.
Parameter | Type |
---|---|
extra_inputs | Dict[str, Type] |
with_outputs()
def with_outputs(
extra_outputs: Dict[str, Type],
):
This method allows addition of extra outputs that are expected from a task specification.
Parameter | Type |
---|---|
extra_outputs | Dict[str, Type] |
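A short illustrative sketch tying the three methods above together; the interface contents are made up for the example:

.. code-block:: python

    from flytekit.core.interface import Interface

    iface = Interface(inputs={"a": int, "b": str}, outputs={"o": str})
    iface = iface.with_inputs({"session": str})     # add an implicit input
    iface = iface.with_outputs({"debug_log": str})  # expect an extra output
    iface = iface.remove_inputs(["b"])              # drop a local-only input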
Properties
Property | Type | Description |
---|---|---|
default_inputs_as_kwargs | ||
docstring | ||
inputs | ||
inputs_with_defaults | ||
output_names | ||
output_tuple | ||
output_tuple_name | ||
outputs |
flytekit.core.launch_plan.LaunchPlan
Launch Plans are one of the core constructs of Flyte. Please take a look at the discussion in the :std:ref:`core concepts <flyte:divedeep-launchplans>` if you are unfamiliar with them.
Every workflow is registered with a default launch plan, which is just a launch plan with none of the additional attributes set - no default values, fixed values, schedules, etc. Assuming you have the following workflow
.. code-block:: python

    @workflow
    def wf(a: int, c: str) -> str:
        ...

Create the default launch plan with
.. code-block:: python

    LaunchPlan.get_or_create(workflow=wf)

If you specify additional parameters, you’ll also have to give the launch plan a unique name. Default and fixed inputs can be expressed as Python native values like so:
.. literalinclude:: ../../../tests/flytekit/unit/core/test_launch_plan.py
   :start-after: # fixed_and_default_start
   :end-before: # fixed_and_default_end
   :language: python
   :dedent: 4
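If the referenced test snippet is not at hand, a minimal sketch of the same idea, using the wf workflow defined above (the name and values are illustrative):

.. code-block:: python

    from flytekit import LaunchPlan

    named_lp = LaunchPlan.get_or_create(
        workflow=wf,
        name="wf_with_defaults",
        default_inputs={"a": 3},      # can still be overridden at execution time
        fixed_inputs={"c": "hello"},  # cannot be overridden
    )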
Additionally, a launch plan can be configured to run on a schedule and emit notifications.
Please see the relevant Schedule and Notification objects as well.
To configure the remaining parameters, you’ll need to import the relevant model objects as well.
.. literalinclude:: ../../../tests/flytekit/unit/core/test_launch_plan.py
   :start-after: # schedule_start
   :end-before: # schedule_end
   :language: python
   :dedent: 4
.. code-block:: python

    from flytekit.models.common import Annotations, AuthRole, Labels, RawOutputDataConfig

Then use as follows
.. literalinclude:: ../../../tests/flytekit/unit/core/test_launch_plan.py
   :start-after: # auth_role_start
   :end-before: # auth_role_end
   :language: python
   :dedent: 4
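For illustration, a hedged sketch of attaching a schedule and a notification, assuming the top-level flytekit exports CronSchedule and Email; the cron expression and recipients are placeholders:

.. code-block:: python

    from flytekit import CronSchedule, Email, LaunchPlan
    from flytekit.models.core.execution import WorkflowExecutionPhase

    scheduled_lp = LaunchPlan.get_or_create(
        workflow=wf,
        name="wf_scheduled",
        schedule=CronSchedule(schedule="*/30 * * * *"),
        notifications=[
            Email(
                phases=[WorkflowExecutionPhase.FAILED],
                recipients_email=["alerts@example.com"],
            ),
        ],
    )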
def LaunchPlan(
name: str,
workflow: _annotated_workflow.WorkflowBase,
parameters: _interface_models.ParameterMap,
fixed_inputs: _literal_models.LiteralMap,
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
):
Parameter | Type |
---|---|
name | str |
workflow | _annotated_workflow.WorkflowBase |
parameters | _interface_models.ParameterMap |
fixed_inputs | _literal_models.LiteralMap |
schedule | Optional[_schedule_model.Schedule] |
notifications | Optional[List[_common_models.Notification]] |
labels | Optional[_common_models.Labels] |
annotations | Optional[_common_models.Annotations] |
raw_output_data_config | Optional[_common_models.RawOutputDataConfig] |
max_parallelism | Optional[int] |
security_context | Optional[security.SecurityContext] |
trigger | Optional[LaunchPlanTriggerBase] |
overwrite_cache | Optional[bool] |
auto_activate | bool |
Methods
Method | Description |
---|---|
clone_with() | None |
construct_node_metadata() | None |
create() | None |
get_default_launch_plan() | Users should probably call the get_or_create function defined below instead. |
get_or_create() | This function offers a friendlier interface for creating launch plans. |
clone_with()
def clone_with(
name: str,
parameters: Optional[_interface_models.ParameterMap],
fixed_inputs: Optional[_literal_models.LiteralMap],
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
):
Parameter | Type |
---|---|
name | str |
parameters | Optional[_interface_models.ParameterMap] |
fixed_inputs | Optional[_literal_models.LiteralMap] |
schedule | Optional[_schedule_model.Schedule] |
notifications | Optional[List[_common_models.Notification]] |
labels | Optional[_common_models.Labels] |
annotations | Optional[_common_models.Annotations] |
raw_output_data_config | Optional[_common_models.RawOutputDataConfig] |
max_parallelism | Optional[int] |
security_context | Optional[security.SecurityContext] |
trigger | Optional[LaunchPlanTriggerBase] |
overwrite_cache | Optional[bool] |
auto_activate | bool |
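A minimal usage sketch, assuming lp is an existing LaunchPlan instance (for example from get_or_create); fields not passed presumably carry over from the original:

.. code-block:: python

    # `lp` is assumed to be an existing LaunchPlan.
    lp_copy = lp.clone_with(name="wf_lp_copy")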
construct_node_metadata()
def construct_node_metadata()
create()
def create(
name: str,
workflow: _annotated_workflow.WorkflowBase,
default_inputs: Optional[Dict[str, Any]],
fixed_inputs: Optional[Dict[str, Any]],
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
auth_role: Optional[_common_models.AuthRole],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
):
Parameter | Type |
---|---|
name | str |
workflow | _annotated_workflow.WorkflowBase |
default_inputs | Optional[Dict[str, Any]] |
fixed_inputs | Optional[Dict[str, Any]] |
schedule | Optional[_schedule_model.Schedule] |
notifications | Optional[List[_common_models.Notification]] |
labels | Optional[_common_models.Labels] |
annotations | Optional[_common_models.Annotations] |
raw_output_data_config | Optional[_common_models.RawOutputDataConfig] |
max_parallelism | Optional[int] |
security_context | Optional[security.SecurityContext] |
auth_role | Optional[_common_models.AuthRole] |
trigger | Optional[LaunchPlanTriggerBase] |
overwrite_cache | Optional[bool] |
auto_activate | bool |
get_default_launch_plan()
def get_default_launch_plan(
ctx: FlyteContext,
workflow: _annotated_workflow.WorkflowBase,
):
Users should probably call the get_or_create function defined below instead. A default launch plan is the one that will just pick up whatever default values are defined in the workflow function signature (if any) and use the default auth information supplied during serialization, with no notifications or schedules.
Parameter | Type |
---|---|
ctx | FlyteContext |
workflow | _annotated_workflow.WorkflowBase |
get_or_create()
def get_or_create(
workflow: _annotated_workflow.WorkflowBase,
name: Optional[str],
default_inputs: Optional[Dict[str, Any]],
fixed_inputs: Optional[Dict[str, Any]],
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
auth_role: Optional[_common_models.AuthRole],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
):
This function offers a friendlier interface for creating launch plans. If the name for the launch plan is not supplied, this assumes you are looking for the default launch plan for the workflow. If it is specified, it will be used. If creating the default launch plan, none of the other arguments may be specified.
The resulting launch plan is also cached, and if called again with the same name, the cached version is returned.
Parameter | Type |
---|---|
workflow | _annotated_workflow.WorkflowBase |
name | Optional[str] |
default_inputs | Optional[Dict[str, Any]] |
fixed_inputs | Optional[Dict[str, Any]] |
schedule | Optional[_schedule_model.Schedule] |
notifications | Optional[List[_common_models.Notification]] |
labels | Optional[_common_models.Labels] |
annotations | Optional[_common_models.Annotations] |
raw_output_data_config | Optional[_common_models.RawOutputDataConfig] |
max_parallelism | Optional[int] |
security_context | Optional[security.SecurityContext] |
auth_role | Optional[_common_models.AuthRole] |
trigger | Optional[LaunchPlanTriggerBase] |
overwrite_cache | Optional[bool] |
auto_activate | bool |
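A short sketch of the two modes described above, default versus named, and of the caching behaviour; wf stands for a previously defined @workflow:

.. code-block:: python

    from flytekit import LaunchPlan

    # No name: the workflow's default launch plan.
    default_lp = LaunchPlan.get_or_create(workflow=wf)

    # Named: repeated calls with the same name return the cached launch plan.
    lp_a = LaunchPlan.get_or_create(workflow=wf, name="my_lp")
    lp_b = LaunchPlan.get_or_create(workflow=wf, name="my_lp")
    assert lp_a is lp_b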
Properties
Property | Type | Description |
---|---|---|
annotations | ||
fixed_inputs | ||
interface | ||
labels | ||
max_parallelism | ||
name | ||
notifications | ||
overwrite_cache | ||
parameters | ||
python_interface | ||
raw_output_data_config | ||
saved_inputs | ||
schedule | ||
security_context | ||
should_auto_activate | ||
trigger | ||
workflow |
flytekit.core.launch_plan.LaunchPlanReference
A reference object containing metadata that points to a remote launch plan.
def LaunchPlanReference(
project: str,
domain: str,
name: str,
version: str,
):
Parameter | Type |
---|---|
project | str |
domain | str |
name | str |
version | str |
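A construction sketch with placeholder identifiers; they must point at a launch plan that already exists remotely:

.. code-block:: python

    from flytekit.core.launch_plan import LaunchPlanReference

    ref = LaunchPlanReference(
        project="flytesnacks",
        domain="development",
        name="my_launch_plan",
        version="v1",
    )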
Properties
Property | Type | Description |
---|---|---|
id | ||
resource_type |
flytekit.core.launch_plan.LaunchPlanTriggerBase
Base class for protocol classes.
Protocol classes are defined as::

    class Proto(Protocol):
        def meth(self) -> int:
            ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).
For example::

    class C:
        def meth(self) -> int:
            return 0

    def func(x: Proto) -> int:
        return x.meth()

    func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic; they are defined as::

    class GenProto(Protocol[T]):
        def meth(self) -> T:
            ...
def LaunchPlanTriggerBase(
args,
kwargs,
):
Parameter | Type |
---|---|
args | *args |
kwargs | **kwargs |
Methods
Method | Description |
---|---|
to_flyte_idl() | None |
to_flyte_idl()
def to_flyte_idl(
args,
kwargs,
):
Parameter | Type |
---|---|
args | *args |
kwargs | **kwargs |
flytekit.core.launch_plan.ReferenceEntity
def ReferenceEntity(
reference: typing.Union[flytekit.core.reference_entity.WorkflowReference, flytekit.core.reference_entity.TaskReference, flytekit.core.reference_entity.LaunchPlanReference],
inputs: typing.Dict[str, typing.Type],
outputs: typing.Dict[str, typing.Type],
):
Parameter | Type |
---|---|
reference | typing.Union[flytekit.core.reference_entity.WorkflowReference, flytekit.core.reference_entity.TaskReference, flytekit.core.reference_entity.LaunchPlanReference] |
inputs | typing.Dict[str, typing.Type] |
outputs | typing.Dict[str, typing.Type] |
Methods
Method | Description |
---|---|
compile() | None |
construct_node_metadata() | None |
execute() | None |
local_execute() | Please see the local_execute comments in the main task. |
local_execution_mode() | None |
unwrap_literal_map_and_execute() | Please see the implementation of the dispatch_execute function in the real task. |
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
):
Parameter | Type |
---|---|
ctx | flytekit.core.context_manager.FlyteContext |
args | *args |
kwargs | **kwargs |
construct_node_metadata()
def construct_node_metadata()
execute()
def execute(
kwargs,
):
Parameter | Type |
---|---|
kwargs | **kwargs |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
Please see the local_execute comments in the main task.
Parameter | Type |
---|---|
ctx | flytekit.core.context_manager.FlyteContext |
kwargs | **kwargs |
local_execution_mode()
def local_execution_mode()
unwrap_literal_map_and_execute()
def unwrap_literal_map_and_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
Please see the implementation of the dispatch_execute function in the real task.
Parameter | Type |
---|---|
ctx | flytekit.core.context_manager.FlyteContext |
input_literal_map | flytekit.models.literals.LiteralMap |
Properties
Property | Type | Description |
---|---|---|
id | ||
interface | ||
name | ||
python_interface | ||
reference |
flytekit.core.launch_plan.ReferenceLaunchPlan
A reference launch plan serves as a pointer to a Launch Plan that already exists on your Flyte installation. This object will not initiate a network call to Admin, which is why the user is asked to provide the expected interface. If at registration time the interface provided causes an issue with compilation, an error will be returned.
def ReferenceLaunchPlan(
project: str,
domain: str,
name: str,
version: str,
inputs: Dict[str, Type],
outputs: Dict[str, Type],
):
Parameter | Type |
---|---|
project | str |
domain | str |
name | str |
version | str |
inputs | Dict[str, Type] |
outputs | Dict[str, Type] |
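A construction sketch; the project/domain/name/version are placeholders, and the declared interface must match the remote launch plan since no network call is made to verify it:

.. code-block:: python

    from flytekit.core.launch_plan import ReferenceLaunchPlan

    ref_lp = ReferenceLaunchPlan(
        project="flytesnacks",
        domain="development",
        name="my_launch_plan",
        version="v1",
        inputs={"a": int, "c": str},
        outputs={"o0": str},
    )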
Methods
Method | Description |
---|---|
clone_with() | None |
compile() | None |
construct_node_metadata() | None |
create() | None |
execute() | None |
get_default_launch_plan() | Users should probably call the get_or_create function defined below instead. |
get_or_create() | This function offers a friendlier interface for creating launch plans. |
local_execute() | Please see the local_execute comments in the main task. |
local_execution_mode() | None |
unwrap_literal_map_and_execute() | Please see the implementation of the dispatch_execute function in the real task. |
clone_with()
def clone_with(
name: str,
parameters: Optional[_interface_models.ParameterMap],
fixed_inputs: Optional[_literal_models.LiteralMap],
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
):
Parameter | Type |
---|---|
name | str |
parameters | Optional[_interface_models.ParameterMap] |
fixed_inputs | Optional[_literal_models.LiteralMap] |
schedule | Optional[_schedule_model.Schedule] |
notifications | Optional[List[_common_models.Notification]] |
labels | Optional[_common_models.Labels] |
annotations | Optional[_common_models.Annotations] |
raw_output_data_config | Optional[_common_models.RawOutputDataConfig] |
max_parallelism | Optional[int] |
security_context | Optional[security.SecurityContext] |
trigger | Optional[LaunchPlanTriggerBase] |
overwrite_cache | Optional[bool] |
auto_activate | bool |
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
):
Parameter | Type |
---|---|
ctx | flytekit.core.context_manager.FlyteContext |
args | *args |
kwargs | **kwargs |
construct_node_metadata()
def construct_node_metadata()
create()
def create(
name: str,
workflow: _annotated_workflow.WorkflowBase,
default_inputs: Optional[Dict[str, Any]],
fixed_inputs: Optional[Dict[str, Any]],
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
auth_role: Optional[_common_models.AuthRole],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
):
Parameter | Type |
---|---|
name | str |
workflow | _annotated_workflow.WorkflowBase |
default_inputs | Optional[Dict[str, Any]] |
fixed_inputs | Optional[Dict[str, Any]] |
schedule | Optional[_schedule_model.Schedule] |
notifications | Optional[List[_common_models.Notification]] |
labels | Optional[_common_models.Labels] |
annotations | Optional[_common_models.Annotations] |
raw_output_data_config | Optional[_common_models.RawOutputDataConfig] |
max_parallelism | Optional[int] |
security_context | Optional[security.SecurityContext] |
auth_role | Optional[_common_models.AuthRole] |
trigger | Optional[LaunchPlanTriggerBase] |
overwrite_cache | Optional[bool] |
auto_activate | bool |
execute()
def execute(
kwargs,
):
Parameter | Type |
---|---|
kwargs | **kwargs |
get_default_launch_plan()
def get_default_launch_plan(
ctx: FlyteContext,
workflow: _annotated_workflow.WorkflowBase,
):
Users should probably call the get_or_create function defined below instead. A default launch plan is the one that will just pick up whatever default values are defined in the workflow function signature (if any) and use the default auth information supplied during serialization, with no notifications or schedules.
Parameter | Type |
---|---|
ctx | FlyteContext |
workflow | _annotated_workflow.WorkflowBase |
get_or_create()
def get_or_create(
workflow: _annotated_workflow.WorkflowBase,
name: Optional[str],
default_inputs: Optional[Dict[str, Any]],
fixed_inputs: Optional[Dict[str, Any]],
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
auth_role: Optional[_common_models.AuthRole],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
):
This function offers a friendlier interface for creating launch plans. If the name for the launch plan is not supplied, this assumes you are looking for the default launch plan for the workflow. If it is specified, it will be used. If creating the default launch plan, none of the other arguments may be specified.
The resulting launch plan is also cached, and if called again with the same name, the cached version is returned.
Parameter | Type |
---|---|
workflow | _annotated_workflow.WorkflowBase |
name | Optional[str] |
default_inputs | Optional[Dict[str, Any]] |
fixed_inputs | Optional[Dict[str, Any]] |
schedule | Optional[_schedule_model.Schedule] |
notifications | Optional[List[_common_models.Notification]] |
labels | Optional[_common_models.Labels] |
annotations | Optional[_common_models.Annotations] |
raw_output_data_config | Optional[_common_models.RawOutputDataConfig] |
max_parallelism | Optional[int] |
security_context | Optional[security.SecurityContext] |
auth_role | Optional[_common_models.AuthRole] |
trigger | Optional[LaunchPlanTriggerBase] |
overwrite_cache | Optional[bool] |
auto_activate | bool |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
Please see the local_execute comments in the main task.
Parameter | Type |
---|---|
ctx | flytekit.core.context_manager.FlyteContext |
kwargs | **kwargs |
local_execution_mode()
def local_execution_mode()
unwrap_literal_map_and_execute()
def unwrap_literal_map_and_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
Please see the implementation of the dispatch_execute function in the real task.
Parameter | Type |
---|---|
ctx | flytekit.core.context_manager.FlyteContext |
input_literal_map | flytekit.models.literals.LiteralMap |
Properties
Property | Type | Description |
---|---|---|
annotations | ||
fixed_inputs | ||
id | ||
interface | ||
labels | ||
max_parallelism | ||
name | ||
notifications | ||
overwrite_cache | ||
parameters | ||
python_interface | ||
raw_output_data_config | ||
reference | ||
saved_inputs | ||
schedule | ||
security_context | ||
should_auto_activate | ||
trigger | ||
workflow |