flytekit.remote
=====================
Remote Access
.. currentmodule:: flytekit.remote
This module provides utilities for performing operations on tasks, workflows, launchplans, and executions, for example,
the following code fetches and executes a workflow:
.. code-block:: python
# create a remote object from flyte config and environment variables
FlyteRemote(config=Config.auto())
FlyteRemote(config=Config.auto(config_file=....))
FlyteRemote(config=Config(....))
# Or if you need to specify a custom cert chain
# (options and compression are also respected keyword arguments)
FlyteRemote(private_key=your_private_key_bytes, root_certificates=..., certificate_chain=...)
# fetch a workflow from the flyte backend
remote = FlyteRemote(...)
flyte_workflow = remote.fetch_workflow(name="my_workflow", version="v1")
# execute the workflow, wait=True will return the execution object after it's completed
workflow_execution = remote.execute(flyte_workflow, inputs={"a": 1, "b": 10}, wait=True)
# inspect the execution's outputs
print(workflow_execution.outputs)
.. _remote-entrypoint:
Entrypoint
.. autosummary::
:template: custom.rst
:toctree: generated/
:nosignatures:
~remote.FlyteRemote
~remote.Options
.. _remote-flyte-entities:
Entities
.. autosummary::
:template: custom.rst
:toctree: generated/
:nosignatures:
~entities.FlyteTask
~entities.FlyteWorkflow
~entities.FlyteLaunchPlan
.. _remote-flyte-entity-components:
Entity Components
.. autosummary::
:template: custom.rst
:toctree: generated/
:nosignatures:
~entities.FlyteNode
~entities.FlyteTaskNode
~entities.FlyteWorkflowNode
.. _remote-flyte-execution-objects:
Execution Objects
.. autosummary::
:template: custom.rst
:toctree: generated/
:nosignatures:
~executions.FlyteWorkflowExecution
~executions.FlyteTaskExecution
~executions.FlyteNodeExecution
Directory
Classes
Class |
Description |
FlyteBranchNode |
None. |
FlyteLaunchPlan |
A class encapsulating a remote Flyte launch plan. |
FlyteNode |
A class encapsulating a remote Flyte node. |
FlyteNodeExecution |
A class encapsulating a node execution being run on a Flyte remote backend. |
FlyteRemote |
Main entrypoint for programmatically accessing a Flyte remote backend. |
FlyteTask |
A class encapsulating a remote Flyte task. |
FlyteTaskExecution |
A class encapsulating a task execution being run on a Flyte remote backend. |
FlyteTaskNode |
A class encapsulating a task that a Flyte node needs to execute. |
FlyteWorkflow |
A class encapsulating a remote Flyte workflow. |
FlyteWorkflowExecution |
A class encapsulating a workflow execution being run on a Flyte remote backend. |
FlyteWorkflowNode |
A class encapsulating a workflow that a Flyte node needs to execute. |
flytekit.remote.FlyteBranchNode
def FlyteBranchNode(
if_else: _workflow_model.IfElseBlock,
):
BranchNode is a special node that alter the flow of the workflow graph. It allows the control flow to branch at
runtime based on a series of conditions that get evaluated on various parameters (e.g. inputs, primitives).
Parameter |
Type |
if_else |
_workflow_model.IfElseBlock |
Methods
from_flyte_idl()
def from_flyte_idl(
pb2_objct,
):
def promote_from_model(
base_model: _workflow_model.BranchNode,
sub_workflows: Dict[id_models.Identifier, _workflow_model.WorkflowTemplate],
node_launch_plans: Dict[id_models.Identifier, _launch_plan_model.LaunchPlanSpec],
tasks: Dict[id_models.Identifier, FlyteTask],
converted_sub_workflows: Dict[id_models.Identifier, FlyteWorkflow],
):
Parameter |
Type |
base_model |
_workflow_model.BranchNode |
sub_workflows |
Dict[id_models.Identifier, _workflow_model.WorkflowTemplate] |
node_launch_plans |
Dict[id_models.Identifier, _launch_plan_model.LaunchPlanSpec] |
tasks |
Dict[id_models.Identifier, FlyteTask] |
converted_sub_workflows |
Dict[id_models.Identifier, FlyteWorkflow] |
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
Property |
Type |
Description |
if_else |
|
|
is_empty |
|
|
flytekit.remote.FlyteLaunchPlan
A class encapsulating a remote Flyte launch plan.
def FlyteLaunchPlan(
id,
args,
kwargs,
):
Parameter |
Type |
id |
|
args |
*args |
kwargs |
**kwargs |
Methods
compile()
def compile(
ctx: FlyteContext,
args,
kwargs,
):
Parameter |
Type |
ctx |
FlyteContext |
args |
*args |
kwargs |
**kwargs |
def construct_node_metadata()
Used when constructing the node that encapsulates this task as part of a broader workflow definition.
execute()
Parameter |
Type |
kwargs |
**kwargs |
from_flyte_idl()
def from_flyte_idl(
pb2,
):
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
def promote_from_model(
id: id_models.Identifier,
model: _launch_plan_models.LaunchPlanSpec,
):
Parameter |
Type |
id |
id_models.Identifier |
model |
_launch_plan_models.LaunchPlanSpec |
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
Property |
Type |
Description |
annotations |
|
|
auth_role |
|
|
default_inputs |
|
|
entity_metadata |
|
|
entity_type_text |
|
|
fixed_inputs |
|
|
flyte_workflow |
|
|
id |
|
|
interface |
|
|
is_empty |
|
|
is_scheduled |
|
|
labels |
|
|
max_parallelism |
|
|
name |
|
|
overwrite_cache |
|
|
python_interface |
|
|
raw_output_data_config |
|
|
resource_type |
|
|
security_context |
|
|
workflow_id |
|
|
flytekit.remote.FlyteNode
A class encapsulating a remote Flyte node.
def FlyteNode(
id,
upstream_nodes,
bindings,
metadata,
task_node: Optional[FlyteTaskNode],
workflow_node: Optional[FlyteWorkflowNode],
branch_node: Optional[FlyteBranchNode],
gate_node: Optional[FlyteGateNode],
array_node: Optional[FlyteArrayNode],
):
Parameter |
Type |
id |
|
upstream_nodes |
|
bindings |
|
metadata |
|
task_node |
Optional[FlyteTaskNode] |
workflow_node |
Optional[FlyteWorkflowNode] |
branch_node |
Optional[FlyteBranchNode] |
gate_node |
Optional[FlyteGateNode] |
array_node |
Optional[FlyteArrayNode] |
Methods
from_flyte_idl()
def from_flyte_idl(
pb2_object,
):
Parameter |
Type |
pb2_object |
|
def promote_from_model(
model: _workflow_model.Node,
sub_workflows: Optional[Dict[id_models.Identifier, _workflow_model.WorkflowTemplate]],
node_launch_plans: Optional[Dict[id_models.Identifier, _launch_plan_model.LaunchPlanSpec]],
tasks: Dict[id_models.Identifier, FlyteTask],
converted_sub_workflows: Dict[id_models.Identifier, FlyteWorkflow],
):
Parameter |
Type |
model |
_workflow_model.Node |
sub_workflows |
Optional[Dict[id_models.Identifier, _workflow_model.WorkflowTemplate]] |
node_launch_plans |
Optional[Dict[id_models.Identifier, _launch_plan_model.LaunchPlanSpec]] |
tasks |
Dict[id_models.Identifier, FlyteTask] |
converted_sub_workflows |
Dict[id_models.Identifier, FlyteWorkflow] |
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
Property |
Type |
Description |
array_node |
|
|
branch_node |
|
|
flyte_entity |
|
|
gate_node |
|
|
id |
|
|
inputs |
|
|
is_empty |
|
|
metadata |
|
|
output_aliases |
|
|
target |
|
|
task_node |
|
|
upstream_node_ids |
|
|
upstream_nodes |
|
|
workflow_node |
|
|
flytekit.remote.FlyteNodeExecution
A class encapsulating a node execution being run on a Flyte remote backend.
def FlyteNodeExecution(
args,
kwargs,
):
Parameter |
Type |
args |
*args |
kwargs |
**kwargs |
Methods
from_flyte_idl()
def from_flyte_idl(
p: flyteidl.admin.node_execution_pb2.NodeExecution,
):
Parameter |
Type |
p |
flyteidl.admin.node_execution_pb2.NodeExecution |
def promote_from_model(
base_model: node_execution_models.NodeExecution,
):
Parameter |
Type |
base_model |
node_execution_models.NodeExecution |
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
Property |
Type |
Description |
closure |
|
|
error |
|
|
executions |
|
|
id |
|
|
input_uri |
|
|
inputs |
|
|
interface |
|
|
is_done |
|
|
is_empty |
|
|
metadata |
|
|
outputs |
|
|
subworkflow_node_executions |
|
|
task_executions |
|
|
workflow_executions |
|
|
flytekit.remote.FlyteRemote
Main entrypoint for programmatically accessing a Flyte remote backend.
The term ‘remote’ is synonymous with ‘backend’ or ‘deployment’ and refers to a hosted instance of the
Flyte platform, which comes with a Flyte Admin server on some known URI.
def FlyteRemote(
config: Config,
default_project: typing.Optional[str],
default_domain: typing.Optional[str],
data_upload_location: str,
interactive_mode_enabled: typing.Optional[bool],
kwargs,
):
Initialize a FlyteRemote object.
:type kwargs: All arguments that can be passed to create the SynchronousFlyteClient. These are usually grpc
parameters, if you want to customize credentials, ssl handling etc.
Parameter |
Type |
config |
Config |
default_project |
typing.Optional[str] |
default_domain |
typing.Optional[str] |
data_upload_location |
str |
interactive_mode_enabled |
typing.Optional[bool] |
kwargs |
**kwargs |
Methods
Method |
Description |
activate_launchplan() |
Given a launchplan, activate it, all previous versions are deactivated |
approve() |
|
auto() |
None |
download() |
Download the data to the specified location |
execute() |
Execute a task, workflow, or launchplan, either something that’s been declared locally, or a fetched entity |
execute_local_launch_plan() |
Execute a locally defined LaunchPlan |
execute_local_task() |
Execute a @task-decorated function or TaskTemplate task |
execute_local_workflow() |
Execute an @workflow decorated function |
execute_reference_launch_plan() |
Execute a ReferenceLaunchPlan |
execute_reference_task() |
Execute a ReferenceTask |
execute_reference_workflow() |
Execute a ReferenceWorkflow |
execute_remote_task_lp() |
Execute a FlyteTask, or FlyteLaunchplan |
execute_remote_wf() |
Execute a FlyteWorkflow |
fast_package() |
Packages the given paths into an installable zip and returns the md5_bytes and the URL of the uploaded location |
fast_register_workflow() |
Use this method to register a workflow with zip mode |
fetch_active_launchplan() |
Returns the active version of the launch plan if it exists or returns None |
fetch_execution() |
Fetch a workflow execution entity from flyte admin |
fetch_launch_plan() |
Fetch a launchplan entity from flyte admin |
fetch_task() |
Fetch a task entity from flyte admin |
fetch_task_lazy() |
Similar to fetch_task, just that it returns a LazyEntity, which will fetch the workflow lazily |
fetch_workflow() |
Fetch a workflow entity from flyte admin |
fetch_workflow_lazy() |
Similar to fetch_workflow, just that it returns a LazyEntity, which will fetch the workflow lazily |
find_launch_plan() |
None |
find_launch_plan_for_node() |
None |
for_endpoint() |
None |
for_sandbox() |
None |
generate_console_http_domain() |
This should generate the domain where console is hosted |
generate_console_url() |
Generate a Flyteconsole URL for the given Flyte remote endpoint |
get() |
General function that works with flyte tiny urls |
get_domains() |
Lists registered domains from flyte admin |
get_execution_metrics() |
Get the metrics for a given execution |
get_extra_headers_for_protocol() |
None |
launch_backfill() |
Creates and launches a backfill workflow for the given launchplan |
list_projects() |
Lists registered projects from flyte admin |
list_signals() |
|
list_tasks_by_version() |
None |
raw_register() |
Raw register method, can be used to register control plane entities |
recent_executions() |
None |
register_launch_plan() |
Register a given launchplan, possibly applying overrides from the provided options |
register_script() |
Use this method to register a workflow via script mode |
register_task() |
Register a qualified task (PythonTask) with Remote |
register_workflow() |
Use this method to register a workflow |
reject() |
|
remote_context() |
Context manager with remote-specific configuration |
set_input() |
|
set_signal() |
|
sync() |
This function was previously a singledispatchmethod |
sync_execution() |
Sync a FlyteWorkflowExecution object with its corresponding remote state |
sync_node_execution() |
Get data backing a node execution |
sync_task_execution() |
Sync a FlyteTaskExecution object with its corresponding remote state |
terminate() |
Terminate a workflow execution |
upload_file() |
Function will use remote’s client to hash and then upload the file using Admin’s data proxy service |
wait() |
Wait for an execution to finish |
activate_launchplan()
def activate_launchplan(
ident: Identifier,
):
Given a launchplan, activate it, all previous versions are deactivated.
Parameter |
Type |
ident |
Identifier |
approve()
def approve(
signal_id: str,
execution_name: str,
project: str,
domain: str,
):
Parameter |
Type |
signal_id |
str |
execution_name |
str |
project |
str |
domain |
str |
auto()
def auto(
config_file: typing.Union[str, ConfigFile],
default_project: typing.Optional[str],
default_domain: typing.Optional[str],
data_upload_location: str,
interactive_mode_enabled: bool,
kwargs,
):
Parameter |
Type |
config_file |
typing.Union[str, ConfigFile] |
default_project |
typing.Optional[str] |
default_domain |
typing.Optional[str] |
data_upload_location |
str |
interactive_mode_enabled |
bool |
kwargs |
**kwargs |
download()
def download(
data: typing.Union[LiteralsResolver, Literal, LiteralMap],
download_to: str,
recursive: bool,
):
Download the data to the specified location. If the data is a LiteralsResolver, LiteralMap and if recursive is
specified, then all file like objects will be recursively downloaded (e.g. FlyteFile/Dir (blob),
StructuredDataset etc).
Note: That it will use your sessions credentials to access the remote location. For sandbox, this should be
automatically configured, assuming you are running sandbox locally. For other environments, you will need to
configure your credentials appropriately.
Parameter |
Type |
data |
typing.Union[LiteralsResolver, Literal, LiteralMap] |
download_to |
str |
recursive |
bool |
execute()
def execute(
entity: typing.Union[FlyteTask, FlyteLaunchPlan, FlyteWorkflow, PythonTask, WorkflowBase, LaunchPlan, ReferenceEntity],
inputs: typing.Dict[str, typing.Any],
project: str,
domain: str,
name: str,
version: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
image_config: typing.Optional[ImageConfig],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
serialization_settings: typing.Optional[SerializationSettings],
):
Execute a task, workflow, or launchplan, either something that’s been declared locally, or a fetched entity.
This method supports:
Flyte{Task, Workflow, LaunchPlan}
remote module objects.
@task
-decorated functions and TaskTemplate
tasks.
@workflow
-decorated functions.
LaunchPlan
objects.
For local entities, this code will attempt to find the entity first, and if missing, will compile and register
the object.
Not all arguments are relevant in all circumstances. For example, there’s no reason to use the serialization
settings for entities that have already been registered on Admin.
Parameter |
Type |
entity |
typing.Union[FlyteTask, FlyteLaunchPlan, FlyteWorkflow, PythonTask, WorkflowBase, LaunchPlan, ReferenceEntity] |
inputs |
typing.Dict[str, typing.Any] |
project |
str |
domain |
str |
name |
str |
version |
str |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
image_config |
typing.Optional[ImageConfig] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
serialization_settings |
typing.Optional[SerializationSettings] |
execute_local_launch_plan()
def execute_local_launch_plan(
entity: LaunchPlan,
inputs: typing.Dict[str, typing.Any],
version: str,
project: typing.Optional[str],
domain: typing.Optional[str],
name: typing.Optional[str],
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
serialization_settings: typing.Optional[SerializationSettings],
):
Execute a locally defined LaunchPlan
.
Parameter |
Type |
entity |
LaunchPlan |
inputs |
typing.Dict[str, typing.Any] |
version |
str |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
name |
typing.Optional[str] |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
serialization_settings |
typing.Optional[SerializationSettings] |
execute_local_task()
def execute_local_task(
entity: PythonTask,
inputs: typing.Dict[str, typing.Any],
project: str,
domain: str,
name: str,
version: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
image_config: typing.Optional[ImageConfig],
wait: bool,
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
options: typing.Optional[Options],
serialization_settings: typing.Optional[SerializationSettings],
):
Execute a @task-decorated function or TaskTemplate task.
Parameter |
Type |
entity |
PythonTask |
inputs |
typing.Dict[str, typing.Any] |
project |
str |
domain |
str |
name |
str |
version |
str |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
image_config |
typing.Optional[ImageConfig] |
wait |
bool |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
options |
typing.Optional[Options] |
serialization_settings |
typing.Optional[SerializationSettings] |
execute_local_workflow()
def execute_local_workflow(
entity: WorkflowBase,
inputs: typing.Dict[str, typing.Any],
project: str,
domain: str,
name: str,
version: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
image_config: typing.Optional[ImageConfig],
options: typing.Optional[Options],
wait: bool,
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
serialization_settings: typing.Optional[SerializationSettings],
):
Execute an @workflow decorated function.
Parameter |
Type |
entity |
WorkflowBase |
inputs |
typing.Dict[str, typing.Any] |
project |
str |
domain |
str |
name |
str |
version |
str |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
image_config |
typing.Optional[ImageConfig] |
options |
typing.Optional[Options] |
wait |
bool |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
serialization_settings |
typing.Optional[SerializationSettings] |
execute_reference_launch_plan()
def execute_reference_launch_plan(
entity: ReferenceLaunchPlan,
inputs: typing.Dict[str, typing.Any],
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
):
Execute a ReferenceLaunchPlan.
Parameter |
Type |
entity |
ReferenceLaunchPlan |
inputs |
typing.Dict[str, typing.Any] |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
execute_reference_task()
def execute_reference_task(
entity: ReferenceTask,
inputs: typing.Dict[str, typing.Any],
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
):
Execute a ReferenceTask.
Parameter |
Type |
entity |
ReferenceTask |
inputs |
typing.Dict[str, typing.Any] |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
execute_reference_workflow()
def execute_reference_workflow(
entity: ReferenceWorkflow,
inputs: typing.Dict[str, typing.Any],
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
):
Execute a ReferenceWorkflow.
Parameter |
Type |
entity |
ReferenceWorkflow |
inputs |
typing.Dict[str, typing.Any] |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
execute_remote_task_lp()
def execute_remote_task_lp(
entity: typing.Union[FlyteTask, FlyteLaunchPlan],
inputs: typing.Dict[str, typing.Any],
project: str,
domain: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
):
Execute a FlyteTask, or FlyteLaunchplan.
NOTE: the name and version arguments are currently not used and only there consistency in the function signature
Parameter |
Type |
entity |
typing.Union[FlyteTask, FlyteLaunchPlan] |
inputs |
typing.Dict[str, typing.Any] |
project |
str |
domain |
str |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
execute_remote_wf()
def execute_remote_wf(
entity: FlyteWorkflow,
inputs: typing.Dict[str, typing.Any],
project: str,
domain: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
):
Execute a FlyteWorkflow.
NOTE: the name and version arguments are currently not used and only there consistency in the function signature
Parameter |
Type |
entity |
FlyteWorkflow |
inputs |
typing.Dict[str, typing.Any] |
project |
str |
domain |
str |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
fast_package()
def fast_package(
root: os.PathLike,
deref_symlinks: bool,
output: str,
options: typing.Optional[FastPackageOptions],
):
Packages the given paths into an installable zip and returns the md5_bytes and the URL of the uploaded location
Parameter |
Type |
root |
os.PathLike |
deref_symlinks |
bool |
output |
str |
options |
typing.Optional[FastPackageOptions] |
fast_register_workflow()
def fast_register_workflow(
entity: WorkflowBase,
serialization_settings: typing.Optional[SerializationSettings],
version: typing.Optional[str],
default_launch_plan: typing.Optional[bool],
options: typing.Optional[Options],
fast_package_options: typing.Optional[FastPackageOptions],
):
Use this method to register a workflow with zip mode.
Parameter |
Type |
entity |
WorkflowBase |
serialization_settings |
typing.Optional[SerializationSettings] |
version |
typing.Optional[str] |
default_launch_plan |
typing.Optional[bool] |
options |
typing.Optional[Options] |
fast_package_options |
typing.Optional[FastPackageOptions] |
fetch_active_launchplan()
def fetch_active_launchplan(
project: str,
domain: str,
name: str,
):
Returns the active version of the launch plan if it exists or returns None
Parameter |
Type |
project |
str |
domain |
str |
name |
str |
fetch_execution()
def fetch_execution(
project: str,
domain: str,
name: str,
):
Fetch a workflow execution entity from flyte admin.
Parameter |
Type |
project |
str |
domain |
str |
name |
str |
fetch_launch_plan()
def fetch_launch_plan(
project: str,
domain: str,
name: str,
version: str,
):
Fetch a launchplan entity from flyte admin.
Parameter |
Type |
project |
str |
domain |
str |
name |
str |
version |
str |
fetch_task()
def fetch_task(
project: str,
domain: str,
name: str,
version: str,
):
Fetch a task entity from flyte admin.
Parameter |
Type |
project |
str |
domain |
str |
name |
str |
version |
str |
fetch_task_lazy()
def fetch_task_lazy(
project: str,
domain: str,
name: str,
version: str,
):
Similar to fetch_task, just that it returns a LazyEntity, which will fetch the workflow lazily.
Parameter |
Type |
project |
str |
domain |
str |
name |
str |
version |
str |
fetch_workflow()
def fetch_workflow(
project: str,
domain: str,
name: str,
version: str,
):
Fetch a workflow entity from flyte admin.
Parameter |
Type |
project |
str |
domain |
str |
name |
str |
version |
str |
fetch_workflow_lazy()
def fetch_workflow_lazy(
project: str,
domain: str,
name: str,
version: str,
):
Similar to fetch_workflow, just that it returns a LazyEntity, which will fetch the workflow lazily.
Parameter |
Type |
project |
str |
domain |
str |
name |
str |
version |
str |
find_launch_plan()
def find_launch_plan(
lp_ref: id_models,
node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec],
):
Parameter |
Type |
lp_ref |
id_models |
node_launch_plans |
Dict[id_models, launch_plan_models.LaunchPlanSpec] |
find_launch_plan_for_node()
def find_launch_plan_for_node(
node: Node,
node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec],
):
Parameter |
Type |
node |
Node |
node_launch_plans |
Dict[id_models, launch_plan_models.LaunchPlanSpec] |
for_endpoint()
def for_endpoint(
endpoint: str,
insecure: bool,
data_config: typing.Optional[DataConfig],
config_file: typing.Union[str, ConfigFile],
default_project: typing.Optional[str],
default_domain: typing.Optional[str],
data_upload_location: str,
interactive_mode_enabled: bool,
kwargs,
):
Parameter |
Type |
endpoint |
str |
insecure |
bool |
data_config |
typing.Optional[DataConfig] |
config_file |
typing.Union[str, ConfigFile] |
default_project |
typing.Optional[str] |
default_domain |
typing.Optional[str] |
data_upload_location |
str |
interactive_mode_enabled |
bool |
kwargs |
**kwargs |
for_sandbox()
def for_sandbox(
default_project: typing.Optional[str],
default_domain: typing.Optional[str],
data_upload_location: str,
interactive_mode_enabled: bool,
kwargs,
):
Parameter |
Type |
default_project |
typing.Optional[str] |
default_domain |
typing.Optional[str] |
data_upload_location |
str |
interactive_mode_enabled |
bool |
kwargs |
**kwargs |
generate_console_http_domain()
def generate_console_http_domain()
This should generate the domain where console is hosted.
:return:
generate_console_url()
def generate_console_url(
entity: typing.Union[FlyteWorkflowExecution, FlyteNodeExecution, FlyteTaskExecution, FlyteWorkflow, FlyteTask, WorkflowExecutionIdentifier, Identifier, FlyteLaunchPlan],
):
Generate a Flyteconsole URL for the given Flyte remote endpoint.
This will automatically determine if this is an execution or an entity and change the type automatically
Parameter |
Type |
entity |
typing.Union[FlyteWorkflowExecution, FlyteNodeExecution, FlyteTaskExecution, FlyteWorkflow, FlyteTask, WorkflowExecutionIdentifier, Identifier, FlyteLaunchPlan] |
get()
def get(
flyte_uri: typing.Optional[str],
):
General function that works with flyte tiny urls. This can return outputs (in the form of LiteralsResolver, or
individual Literals for singular requests), or HTML if passed a deck link, or bytes containing HTML,
if ipython is not available locally.
Parameter |
Type |
flyte_uri |
typing.Optional[str] |
get_domains()
Lists registered domains from flyte admin.
:returns: typing.List[flytekit.models.domain.Domain]
get_execution_metrics()
def get_execution_metrics(
id: WorkflowExecutionIdentifier,
depth: int,
):
Get the metrics for a given execution.
Parameter |
Type |
id |
WorkflowExecutionIdentifier |
depth |
int |
def get_extra_headers_for_protocol(
native_url,
):
Parameter |
Type |
native_url |
|
launch_backfill()
def launch_backfill(
project: str,
domain: str,
from_date: datetime,
to_date: datetime,
launchplan: str,
launchplan_version: str,
execution_name: str,
version: str,
dry_run: bool,
execute: bool,
parallel: bool,
failure_policy: typing.Optional[WorkflowFailurePolicy],
overwrite_cache: typing.Optional[bool],
):
Creates and launches a backfill workflow for the given launchplan. If launchplan version is not specified,
then the latest launchplan is retrieved.
The from_date is exclusive and end_date is inclusive and backfill run for all instances in between. ::
-> (start_date - exclusive, end_date inclusive)
If dry_run is specified, the workflow is created and returned.
If execute==False is specified then the workflow is created and registered.
In the last case, the workflow is created, registered and executed.
The parallel
flag can be used to generate a workflow where all launchplans can be run in parallel. Default
is that execute backfill is run sequentially
Parameter |
Type |
project |
str |
domain |
str |
from_date |
datetime |
to_date |
datetime |
launchplan |
str |
launchplan_version |
str |
execution_name |
str |
version |
str |
dry_run |
bool |
execute |
bool |
parallel |
bool |
failure_policy |
typing.Optional[WorkflowFailurePolicy] |
overwrite_cache |
typing.Optional[bool] |
list_projects()
def list_projects(
limit: typing.Optional[int],
filters: typing.Optional[typing.List[filter_models.Filter]],
sort_by: typing.Optional[admin_common_models.Sort],
):
Lists registered projects from flyte admin.
Parameter |
Type |
limit |
typing.Optional[int] |
filters |
typing.Optional[typing.List[filter_models.Filter]] |
sort_by |
typing.Optional[admin_common_models.Sort] |
list_signals()
def list_signals(
execution_name: str,
project: typing.Optional[str],
domain: typing.Optional[str],
limit: int,
filters: typing.Optional[typing.List[filter_models.Filter]],
):
Parameter |
Type |
execution_name |
str |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
limit |
int |
filters |
typing.Optional[typing.List[filter_models.Filter]] |
list_tasks_by_version()
def list_tasks_by_version(
version: str,
project: typing.Optional[str],
domain: typing.Optional[str],
limit: typing.Optional[int],
):
Parameter |
Type |
version |
str |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
limit |
typing.Optional[int] |
raw_register()
def raw_register(
cp_entity: FlyteControlPlaneEntity,
settings: SerializationSettings,
version: str,
create_default_launchplan: bool,
options: Options,
og_entity: FlyteLocalEntity,
):
Raw register method, can be used to register control plane entities. Usually if you have a Flyte Entity like a
WorkflowBase, Task, LaunchPlan then use other methods. This should be used only if you have already serialized entities
Parameter |
Type |
cp_entity |
FlyteControlPlaneEntity |
settings |
SerializationSettings |
version |
str |
create_default_launchplan |
bool |
options |
Options |
og_entity |
FlyteLocalEntity |
recent_executions()
def recent_executions(
project: typing.Optional[str],
domain: typing.Optional[str],
limit: typing.Optional[int],
filters: typing.Optional[typing.List[filter_models.Filter]],
):
Parameter |
Type |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
limit |
typing.Optional[int] |
filters |
typing.Optional[typing.List[filter_models.Filter]] |
register_launch_plan()
def register_launch_plan(
entity: LaunchPlan,
version: typing.Optional[str],
project: typing.Optional[str],
domain: typing.Optional[str],
options: typing.Optional[Options],
serialization_settings: typing.Optional[SerializationSettings],
):
Register a given launchplan, possibly applying overrides from the provided options. If the underlying workflow
is not already registered, it, along with any underlying entities, will also be registered. If the underlying
workflow does exist (with the given project/domain/version), then only the launchplan will be registered.
Parameter |
Type |
entity |
LaunchPlan |
version |
typing.Optional[str] |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
options |
typing.Optional[Options] |
serialization_settings |
typing.Optional[SerializationSettings] |
register_script()
def register_script(
entity: typing.Union[WorkflowBase, PythonTask, LaunchPlan],
image_config: typing.Optional[ImageConfig],
version: typing.Optional[str],
project: typing.Optional[str],
domain: typing.Optional[str],
destination_dir: str,
copy_all: bool,
default_launch_plan: bool,
options: typing.Optional[Options],
source_path: typing.Optional[str],
module_name: typing.Optional[str],
envs: typing.Optional[typing.Dict[str, str]],
fast_package_options: typing.Optional[FastPackageOptions],
):
Use this method to register a workflow via script mode.
Parameter |
Type |
entity |
typing.Union[WorkflowBase, PythonTask, LaunchPlan] |
image_config |
typing.Optional[ImageConfig] |
version |
typing.Optional[str] |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
destination_dir |
str |
copy_all |
bool |
default_launch_plan |
bool |
options |
typing.Optional[Options] |
source_path |
typing.Optional[str] |
module_name |
typing.Optional[str] |
envs |
typing.Optional[typing.Dict[str, str]] |
fast_package_options |
typing.Optional[FastPackageOptions] |
register_task()
def register_task(
entity: PythonTask,
serialization_settings: typing.Optional[SerializationSettings],
version: typing.Optional[str],
):
Register a qualified task (PythonTask) with Remote
For any conflicting parameters method arguments are regarded as overrides
Parameter |
Type |
entity |
PythonTask |
serialization_settings |
typing.Optional[SerializationSettings] |
version |
typing.Optional[str] |
register_workflow()
def register_workflow(
entity: WorkflowBase,
serialization_settings: typing.Optional[SerializationSettings],
version: typing.Optional[str],
default_launch_plan: typing.Optional[bool],
options: typing.Optional[Options],
):
Use this method to register a workflow.
Parameter |
Type |
entity |
WorkflowBase |
serialization_settings |
typing.Optional[SerializationSettings] |
version |
typing.Optional[str] |
default_launch_plan |
typing.Optional[bool] |
options |
typing.Optional[Options] |
reject()
def reject(
signal_id: str,
execution_name: str,
project: str,
domain: str,
):
Parameter |
Type |
signal_id |
str |
execution_name |
str |
project |
str |
domain |
str |
remote_context()
Context manager with remote-specific configuration.
def set_input(
signal_id: str,
execution_name: str,
value: typing.Union[literal_models.Literal, typing.Any],
project,
domain,
python_type,
literal_type,
):
Parameter |
Type |
signal_id |
str |
execution_name |
str |
value |
typing.Union[literal_models.Literal, typing.Any] |
project |
|
domain |
|
python_type |
|
literal_type |
|
set_signal()
def set_signal(
signal_id: str,
execution_name: str,
value: typing.Union[literal_models.Literal, typing.Any],
project: typing.Optional[str],
domain: typing.Optional[str],
python_type: typing.Optional[typing.Type],
literal_type: typing.Optional[type_models.LiteralType],
):
Parameter |
Type |
signal_id |
str |
execution_name |
str |
value |
typing.Union[literal_models.Literal, typing.Any] |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
python_type |
typing.Optional[typing.Type] |
literal_type |
typing.Optional[type_models.LiteralType] |
sync()
def sync(
execution: FlyteWorkflowExecution,
entity_definition: typing.Union[FlyteWorkflow, FlyteTask],
sync_nodes: bool,
):
This function was previously a singledispatchmethod. We’ve removed that but this function remains
so that we don’t break people.
Parameter |
Type |
execution |
FlyteWorkflowExecution |
entity_definition |
typing.Union[FlyteWorkflow, FlyteTask] |
sync_nodes |
bool |
sync_execution()
def sync_execution(
execution: FlyteWorkflowExecution,
entity_definition: typing.Union[FlyteWorkflow, FlyteTask],
sync_nodes: bool,
):
Sync a FlyteWorkflowExecution object with its corresponding remote state.
Parameter |
Type |
execution |
FlyteWorkflowExecution |
entity_definition |
typing.Union[FlyteWorkflow, FlyteTask] |
sync_nodes |
bool |
sync_node_execution()
def sync_node_execution(
execution: FlyteNodeExecution,
node_mapping: typing.Dict[str, FlyteNode],
):
Get data backing a node execution. These FlyteNodeExecution objects should’ve come from Admin with the model
fields already populated correctly. For purposes of the remote experience, we’d like to supplement the object
with some additional fields:
- inputs/outputs
- task/workflow executions, and/or underlying node executions in the case of parent nodes
- TypedInterface (remote wrapper type)
A node can have several different types of executions behind it. That is, the node could’ve run (perhaps
multiple times because of retries):
- A task
- A static subworkflow
- A dynamic subworkflow (which in turn may have run additional tasks, subwfs, and/or launch plans)
- A launch plan
The data model is complicated, so ascertaining which of these happened is a bit tricky. That logic is
encapsulated in this function.
Parameter |
Type |
execution |
FlyteNodeExecution |
node_mapping |
typing.Dict[str, FlyteNode] |
sync_task_execution()
def sync_task_execution(
execution: FlyteTaskExecution,
entity_interface: typing.Optional[TypedInterface],
):
Sync a FlyteTaskExecution object with its corresponding remote state.
Parameter |
Type |
execution |
FlyteTaskExecution |
entity_interface |
typing.Optional[TypedInterface] |
terminate()
def terminate(
execution: FlyteWorkflowExecution,
cause: str,
):
Terminate a workflow execution.
Parameter |
Type |
execution |
FlyteWorkflowExecution |
cause |
str |
upload_file()
def upload_file(
to_upload: pathlib.Path,
project: typing.Optional[str],
domain: typing.Optional[str],
filename_root: typing.Optional[str],
):
Function will use remote’s client to hash and then upload the file using Admin’s data proxy service.
Parameter |
Type |
to_upload |
pathlib.Path |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
filename_root |
typing.Optional[str] |
wait()
def wait(
execution: FlyteWorkflowExecution,
timeout: typing.Optional[typing.Union[timedelta, int]],
poll_interval: typing.Optional[typing.Union[timedelta, int]],
sync_nodes: bool,
):
Wait for an execution to finish.
Parameter |
Type |
execution |
FlyteWorkflowExecution |
timeout |
typing.Optional[typing.Union[timedelta, int]] |
poll_interval |
typing.Optional[typing.Union[timedelta, int]] |
sync_nodes |
bool |
Properties
Property |
Type |
Description |
client |
|
|
config |
|
|
context |
|
|
default_domain |
|
|
default_project |
|
|
file_access |
|
|
interactive_mode_enabled |
|
|
flytekit.remote.FlyteTask
A class encapsulating a remote Flyte task.
def FlyteTask(
id,
type,
metadata,
interface,
custom,
container,
task_type_version: int,
security_context,
config,
k8s_pod,
sql,
extended_resources,
should_register: bool,
):
Parameter |
Type |
id |
|
type |
|
metadata |
|
interface |
|
custom |
|
container |
|
task_type_version |
int |
security_context |
|
config |
|
k8s_pod |
|
sql |
|
extended_resources |
|
should_register |
bool |
Methods
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
):
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
args |
*args |
kwargs |
**kwargs |
def construct_node_metadata()
Used when constructing the node that encapsulates this task as part of a broader workflow definition.
execute()
Parameter |
Type |
kwargs |
**kwargs |
from_flyte_idl()
def from_flyte_idl(
pb2_object,
):
Parameter |
Type |
pb2_object |
|
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
def promote_from_model(
base_model: _task_model.TaskTemplate,
):
Parameter |
Type |
base_model |
_task_model.TaskTemplate |
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
Property |
Type |
Description |
config |
|
|
container |
|
|
custom |
|
|
docs |
|
|
entity_type_text |
|
|
extended_resources |
|
|
id |
|
|
interface |
|
|
is_empty |
|
|
k8s_pod |
|
|
metadata |
|
|
name |
|
|
python_interface |
|
|
resource_type |
|
|
security_context |
|
|
should_register |
|
|
sql |
|
|
task_type_version |
|
|
template |
|
|
type |
|
|
flytekit.remote.FlyteTaskExecution
A class encapsulating a task execution being run on a Flyte remote backend.
def FlyteTaskExecution(
args,
kwargs,
):
Parameter |
Type |
args |
*args |
kwargs |
**kwargs |
Methods
from_flyte_idl()
def from_flyte_idl(
proto,
):
def promote_from_model(
base_model: admin_task_execution_models.TaskExecution,
):
Parameter |
Type |
base_model |
admin_task_execution_models.TaskExecution |
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
Property |
Type |
Description |
closure |
|
|
error |
|
|
id |
|
|
input_uri |
|
|
inputs |
|
|
is_done |
|
|
is_empty |
|
|
is_parent |
|
|
outputs |
|
|
task |
|
|
flytekit.remote.FlyteTaskNode
A class encapsulating a task that a Flyte node needs to execute.
def FlyteTaskNode(
flyte_task: FlyteTask,
):
Refers to the task that the Node is to execute.
This is currently a oneof in protobuf, but there’s only one option currently.
This code should be updated when more options are available.
Parameter |
Type |
flyte_task |
FlyteTask |
Methods
from_flyte_idl()
def from_flyte_idl(
pb2_object,
):
Parameter |
Type |
pb2_object |
|
def promote_from_model(
task: FlyteTask,
):
Takes the idl wrapper for a TaskNode,
and returns the hydrated Flytekit object for it by fetching it with the FlyteTask control plane.
Parameter |
Type |
task |
FlyteTask |
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
Property |
Type |
Description |
flyte_task |
|
|
is_empty |
|
|
overrides |
|
|
reference_id |
|
|
flytekit.remote.FlyteWorkflow
A class encapsulating a remote Flyte workflow.
def FlyteWorkflow(
id: id_models.Identifier,
nodes: List[FlyteNode],
interface,
output_bindings,
metadata,
metadata_defaults,
subworkflows: Optional[List[FlyteWorkflow]],
tasks: Optional[List[FlyteTask]],
launch_plans: Optional[Dict[id_models.Identifier, launch_plan_models.LaunchPlanSpec]],
compiled_closure: Optional[compiler_models.CompiledWorkflowClosure],
should_register: bool,
):
Parameter |
Type |
id |
id_models.Identifier |
nodes |
List[FlyteNode] |
interface |
|
output_bindings |
|
metadata |
|
metadata_defaults |
|
subworkflows |
Optional[List[FlyteWorkflow]] |
tasks |
Optional[List[FlyteTask]] |
launch_plans |
Optional[Dict[id_models.Identifier, launch_plan_models.LaunchPlanSpec]] |
compiled_closure |
Optional[compiler_models.CompiledWorkflowClosure] |
should_register |
bool |
Methods
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
):
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
args |
*args |
kwargs |
**kwargs |
def construct_node_metadata()
Used when constructing the node that encapsulates this task as part of a broader workflow definition.
execute()
Parameter |
Type |
kwargs |
**kwargs |
from_flyte_idl()
def from_flyte_idl(
pb2_object,
):
Parameter |
Type |
pb2_object |
|
get_non_system_nodes()
def get_non_system_nodes(
nodes: List[_workflow_models.Node],
):
Parameter |
Type |
nodes |
List[_workflow_models.Node] |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
def promote_from_closure(
closure: compiler_models.CompiledWorkflowClosure,
node_launch_plans: Optional[Dict[id_models, launch_plan_models.LaunchPlanSpec]],
):
Extracts out the relevant portions of a FlyteWorkflow from a closure from the control plane.
Parameter |
Type |
closure |
compiler_models.CompiledWorkflowClosure |
node_launch_plans |
Optional[Dict[id_models, launch_plan_models.LaunchPlanSpec]] |
def promote_from_model(
base_model: _workflow_models.WorkflowTemplate,
sub_workflows: Optional[Dict[Identifier, _workflow_models.WorkflowTemplate]],
tasks: Optional[Dict[Identifier, FlyteTask]],
node_launch_plans: Optional[Dict[Identifier, launch_plan_models.LaunchPlanSpec]],
):
Parameter |
Type |
base_model |
_workflow_models.WorkflowTemplate |
sub_workflows |
Optional[Dict[Identifier, _workflow_models.WorkflowTemplate]] |
tasks |
Optional[Dict[Identifier, FlyteTask]] |
node_launch_plans |
Optional[Dict[Identifier, launch_plan_models.LaunchPlanSpec]] |
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
Property |
Type |
Description |
docs |
|
|
entity_type_text |
|
|
failure_node |
|
|
flyte_nodes |
|
|
flyte_sub_workflows |
|
|
flyte_tasks |
|
|
id |
|
|
interface |
|
|
is_empty |
|
|
metadata |
|
|
metadata_defaults |
|
|
name |
|
|
nodes |
|
|
outputs |
|
|
python_interface |
|
|
resource_type |
|
|
should_register |
|
|
sub_workflows |
|
|
template |
|
|
flytekit.remote.FlyteWorkflowExecution
A class encapsulating a workflow execution being run on a Flyte remote backend.
def FlyteWorkflowExecution(
type_hints: Optional[Dict[str, typing.Type]],
remote: Optional['FlyteRemote'],
args,
kwargs,
):
Parameter |
Type |
type_hints |
Optional[Dict[str, typing.Type]] |
remote |
Optional['FlyteRemote'] |
args |
*args |
kwargs |
**kwargs |
Methods
from_flyte_idl()
def from_flyte_idl(
pb,
):
def promote_from_model(
base_model: execution_models.Execution,
remote: Optional['FlyteRemote'],
type_hints: Optional[Dict[str, typing.Type]],
):
Parameter |
Type |
base_model |
execution_models.Execution |
remote |
Optional['FlyteRemote'] |
type_hints |
Optional[Dict[str, typing.Type]] |
serialize_to_string()
def serialize_to_string()
short_string()
sync()
def sync(
sync_nodes: bool,
):
Sync the state of the current execution and returns a new object with the updated state.
Parameter |
Type |
sync_nodes |
bool |
to_flyte_idl()
verbose_string()
wait()
def wait(
timeout: Optional[Union[timedelta, int]],
poll_interval: Optional[Union[timedelta, int]],
sync_nodes: bool,
):
Wait for the execution to complete. This is a blocking call.
Parameter |
Type |
timeout |
Optional[Union[timedelta, int]] |
poll_interval |
Optional[Union[timedelta, int]] |
sync_nodes |
bool |
Properties
Property |
Type |
Description |
closure |
|
|
error |
|
|
execution_url |
|
|
flyte_workflow |
|
|
id |
|
|
inputs |
|
|
is_done |
|
|
is_empty |
|
|
is_successful |
|
|
node_executions |
|
|
outputs |
|
|
spec |
|
|
flytekit.remote.FlyteWorkflowNode
A class encapsulating a workflow that a Flyte node needs to execute.
def FlyteWorkflowNode(
flyte_workflow: FlyteWorkflow,
flyte_launch_plan: FlyteLaunchPlan,
):
Refers to a the workflow the node is to execute. One of the references must be supplied.
Parameter |
Type |
flyte_workflow |
FlyteWorkflow |
flyte_launch_plan |
FlyteLaunchPlan |
Methods
from_flyte_idl()
def from_flyte_idl(
pb2_object,
):
Parameter |
Type |
pb2_object |
|
def promote_from_model(
base_model: _workflow_model.WorkflowNode,
sub_workflows: Dict[id_models.Identifier, _workflow_model.WorkflowTemplate],
node_launch_plans: Dict[id_models.Identifier, _launch_plan_model.LaunchPlanSpec],
tasks: Dict[Identifier, FlyteTask],
converted_sub_workflows: Dict[id_models.Identifier, FlyteWorkflow],
):
Parameter |
Type |
base_model |
_workflow_model.WorkflowNode |
sub_workflows |
Dict[id_models.Identifier, _workflow_model.WorkflowTemplate] |
node_launch_plans |
Dict[id_models.Identifier, _launch_plan_model.LaunchPlanSpec] |
tasks |
Dict[Identifier, FlyteTask] |
converted_sub_workflows |
Dict[id_models.Identifier, FlyteWorkflow] |
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
Property |
Type |
Description |
flyte_launch_plan |
|
|
flyte_workflow |
|
|
is_empty |
|
|
launchplan_ref |
|
|
reference |
|
|
sub_workflow_ref |
|
|