flytekit.core.workflow
Directory
Classes
Class | Description |
---|---|
ImperativeWorkflow |
An imperative workflow is a programmatic analogue to the typical @workflow function-based workflow and is better suited to programmatic applications. |
PythonFunctionWorkflow |
Please read the Flyte workflows documentation first for a high-level understanding of what workflows are in Flyte. |
ReferenceWorkflow |
A reference workflow is a pointer to a workflow that already exists on your Flyte installation. |
WorkflowBase |
|
WorkflowMetadata |
|
WorkflowMetadataDefaults |
This class is similarly named to the one above. |
Methods
Method | Description |
---|---|
construct_input_promises() |
|
get_promise() |
This is a helper function that will turn a binding into a Promise object, using a lookup map. |
get_promise_map() |
Local execution of imperatively defined workflows is done node by node. |
reference_workflow() |
A reference workflow is a pointer to a workflow that already exists on your Flyte installation. |
workflow() |
This decorator declares a function to be a Flyte workflow. |
Variables
Property | Type | Description |
---|---|---|
FuncOut |
TypeVar |
|
P |
ParamSpec |
|
T |
TypeVar |
Methods
construct_input_promises()
def construct_input_promises(
inputs: List[str],
) -> Dict[str, Promise]
Parameter | Type |
---|---|
inputs |
List[str] |
get_promise()
def get_promise(
binding_data: _literal_models.BindingData,
outputs_cache: Dict[Node, Dict[str, Promise]],
) -> Promise
This is a helper function that will turn a binding into a Promise object, using a lookup map. Please see get_promise_map for the rest of the details.
Parameter | Type |
---|---|
binding_data |
_literal_models.BindingData |
outputs_cache |
Dict[Node, Dict[str, Promise]] |
get_promise_map()
def get_promise_map(
bindings: List[_literal_models.Binding],
outputs_cache: Dict[Node, Dict[str, Promise]],
) -> Dict[str, Promise]
Local execution of imperatively defined workflows is done node by node. This function will fill in the node’s entity’s input arguments, which are specified using the bindings list, and a map of nodes to its outputs. Basically this takes the place of propeller in resolving bindings, pulling in outputs from previously completed nodes and filling in the necessary inputs.
Parameter | Type |
---|---|
bindings |
List[_literal_models.Binding] |
outputs_cache |
Dict[Node, Dict[str, Promise]] |
reference_workflow()
def reference_workflow(
project: str,
domain: str,
name: str,
version: str,
) -> Callable[[Callable[..., Any]], ReferenceWorkflow]
A reference workflow is a pointer to a workflow that already exists on your Flyte installation. This object will not initiate a network call to Admin, which is why the user is asked to provide the expected interface. If at registration time the interface provided causes an issue with compilation, an error will be returned.
Example:
@reference_workflow(project="proj", domain="development", name="wf_name", version="abc")
def ref_wf1(a: int) -> typing.Tuple[str, str]:
...
return "hello", "world"
Parameter | Type |
---|---|
project |
str |
domain |
str |
name |
str |
version |
str |
workflow()
def workflow(
_workflow_function: Optional[Callable[P, FuncOut]],
failure_policy: Optional[WorkflowFailurePolicy],
interruptible: bool,
on_failure: Optional[Union[WorkflowBase, Task]],
docs: Optional[Documentation],
pickle_untyped: bool,
default_options: Optional[Options],
) -> Union[Callable[P, FuncOut], Callable[[Callable[P, FuncOut]], PythonFunctionWorkflow], PythonFunctionWorkflow]
This decorator declares a function to be a Flyte workflow. Workflows are declarative entities that construct a DAG of tasks using the data flow between tasks.
Unlike a task, the function body of a workflow is evaluated at serialization-time (aka compile-time). This is because while we can determine the entire structure of a task by looking at the function’s signature, workflows need to run through the function itself because the body of the function is what expresses the workflow structure. It’s also important to note that, local execution notwithstanding, it is not evaluated again when the workflow runs on Flyte. That is, workflows should not call non-Flyte entities since they are only run once (again, this is with respect to the platform, local runs notwithstanding).
Example:
import os
import sys
import typing
from collections import OrderedDict
from unittest.mock import patch
import pytest
from typing_extensions import Annotated # type: ignore
import flytekit.configuration
from flytekit import FlyteContextManager, StructuredDataset, kwtypes
from flytekit.configuration import Image, ImageConfig
from flytekit.core import context_manager
from flytekit.core.condition import conditional
from flytekit.core.task import task
from flytekit.core.workflow import WorkflowFailurePolicy, WorkflowMetadata, WorkflowMetadataDefaults, workflow
from flytekit.exceptions.user import FlyteValidationException, FlyteValueException, FlyteMissingReturnValueException
from flytekit.tools.translator import get_serializable
from flytekit.types.error.error import FlyteError
default_img = Image(name="default", fqn="test", tag="tag")
serialization_settings = flytekit.configuration.SerializationSettings(
project="project",
domain="domain",
version="version",
env=None,
image_config=ImageConfig(default_image=default_img, images=[default_img]),
)
def test_metadata_values():
with pytest.raises(FlyteValidationException):
WorkflowMetadata(on_failure=0)
wm = WorkflowMetadata(on_failure=WorkflowFailurePolicy.FAIL_IMMEDIATELY)
assert wm.on_failure == WorkflowFailurePolicy.FAIL_IMMEDIATELY
def test_default_metadata_values():
with pytest.raises(FlyteValidationException):
WorkflowMetadataDefaults(3)
wm = WorkflowMetadataDefaults(interruptible=False)
assert wm.interruptible is False
def test_workflow_values():
@task
def t1(a: int) -> typing.NamedTuple("OutputsBC", [("t1_int_output", int), ("c", str)]):
a = a + 2
return a, "world-" + str(a)
@workflow(interruptible=True, failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def wf(a: int) -> typing.Tuple[str, str]:
x, y = t1(a=a)
_, v = t1(a=x)
return y, v
wf_spec = get_serializable(OrderedDict(), serialization_settings, wf)
assert wf_spec.template.metadata_defaults.interruptible
assert wf_spec.template.metadata.on_failure == 1
def test_default_values():
@task
def t() -> bool:
return True
@task
def f() -> bool:
return False
@workflow
def wf(a: bool = True) -> bool:
return conditional("bool").if_(a.is_true()).then(t()).else_().then(f()) # type: ignore
assert wf() is True
assert wf(a=False) is False
def test_list_output_wf():
@task
def t1(a: int) -> int:
a = a + 5
return a
@workflow
def list_output_wf() -> typing.List[int]:
v = []
for i in range(2):
v.append(t1(a=i))
return v
x = list_output_wf()
assert x == [5, 6]
def test_sub_wf_single_named_tuple():
nt = typing.NamedTuple("SingleNamedOutput", [("named1", int)])
@task
def t1(a: int) -> nt:
a = a + 2
return nt(a)
@workflow
def subwf(a: int) -> nt:
return t1(a=a)
@workflow
def wf(b: int) -> nt:
out = subwf(a=b)
return t1(a=out.named1)
x = wf(b=3)
assert x == (7,)
def test_sub_wf_multi_named_tuple():
nt = typing.NamedTuple("Multi", [("named1", int), ("named2", int)])
@task
def t1(a: int) -> nt:
a = a + 2
return nt(a, a)
@workflow
def subwf(a: int) -> nt:
return t1(a=a)
@workflow
def wf(b: int) -> nt:
out = subwf(a=b)
return t1(a=out.named1)
x = wf(b=3)
assert x == (7, 7)
def test_sub_wf_varying_types():
@task
def t1l(
a: typing.List[typing.Dict[str, typing.List[int]]],
b: typing.Dict[str, typing.List[int]],
c: typing.Union[typing.List[typing.Dict[str, typing.List[int]]], typing.Dict[str, typing.List[int]], int],
d: int,
) -> str:
xx = ",".join([f"{k}:{v}" for d in a for k, v in d.items()])
yy = ",".join([f"{k}: {i}" for k, v in b.items() for i in v])
if isinstance(c, list):
zz = ",".join([f"{k}:{v}" for d in c for k, v in d.items()])
elif isinstance(c, dict):
zz = ",".join([f"{k}: {i}" for k, v in c.items() for i in v])
else:
zz = str(c)
return f"First: {xx} Second: {yy} Third: {zz} Int: {d}"
@task
def get_int() -> int:
return 1
@workflow
def subwf(
a: typing.List[typing.Dict[str, typing.List[int]]],
b: typing.Dict[str, typing.List[int]],
c: typing.Union[typing.List[typing.Dict[str, typing.List[int]]], typing.Dict[str, typing.List[int]]],
d: int,
) -> str:
return t1l(a=a, b=b, c=c, d=d)
@workflow
def wf() -> str:
ds = [
{"first_map_a": [42], "first_map_b": [get_int(), 2]},
{
"second_map_c": [33],
"second_map_d": [9, 99],
},
]
ll = {
"ll_1": [get_int(), get_int(), get_int()],
"ll_2": [4, 5, 6],
}
out = subwf(a=ds, b=ll, c=ds, d=get_int())
return out
wf.compile()
x = wf()
expected = (
"First: first_map_a:[42],first_map_b:[1, 2],second_map_c:[33],second_map_d:[9, 99] "
"Second: ll_1: 1,ll_1: 1,ll_1: 1,ll_2: 4,ll_2: 5,ll_2: 6 "
"Third: first_map_a:[42],first_map_b:[1, 2],second_map_c:[33],second_map_d:[9, 99] "
"Int: 1"
)
assert x == expected
wf_spec = get_serializable(OrderedDict(), serialization_settings, wf)
assert set(wf_spec.template.nodes[5].upstream_node_ids) == {"n2", "n1", "n0", "n4", "n3"}
@workflow
def wf() -> str:
ds = [
{"first_map_a": [42], "first_map_b": [get_int(), 2]},
{
"second_map_c": [33],
"second_map_d": [9, 99],
},
]
ll = {
"ll_1": [get_int(), get_int(), get_int()],
"ll_2": [4, 5, 6],
}
out = subwf(a=ds, b=ll, c=ll, d=get_int())
return out
x = wf()
expected = (
"First: first_map_a:[42],first_map_b:[1, 2],second_map_c:[33],second_map_d:[9, 99] "
"Second: ll_1: 1,ll_1: 1,ll_1: 1,ll_2: 4,ll_2: 5,ll_2: 6 "
"Third: ll_1: 1,ll_1: 1,ll_1: 1,ll_2: 4,ll_2: 5,ll_2: 6 "
"Int: 1"
)
assert x == expected
def test_unexpected_outputs():
@task
def t1(a: int) -> int:
a = a + 5
return a
@workflow
def no_outputs_wf():
return t1(a=3)
# Should raise an exception because the workflow returns something when it shouldn't
with pytest.raises(FlyteValueException):
no_outputs_wf()
@pytest.mark.skipif(sys.version_info < (3, 10, 10), reason="inspect module does not work correctly with Python <3.10.10. https://github.com/python/cpython/issues/102647#issuecomment-1466868212")
def test_missing_return_value():
@task
def t1(a: int) -> int:
a = a + 5
return a
# Should raise an exception because it doesn't return something when it should
with pytest.raises(FlyteMissingReturnValueException):
@workflow
def one_output_wf() -> int: # type: ignore
t1(a=3)
one_output_wf()
def test_custom_wrapper():
def our_task(
_task_function: typing.Optional[typing.Callable] = None,
**kwargs,
):
def wrapped(_func: typing.Callable):
return task(_task_function=_func)
if _task_function:
return wrapped(_task_function)
else:
return wrapped
@our_task(
foo={
"bar1": lambda x: print(x),
"bar2": lambda x: print(x),
},
)
def missing_func_body() -> str:
return "foo"
def test_wf_no_output():
@task
def t1(a: int) -> int:
a = a + 5
return a
@workflow
def no_outputs_wf():
t1(a=3)
assert no_outputs_wf() is None
def test_wf_nested_comp(exec_prefix):
@task
def t1(a: int) -> int:
a = a + 5
return a
@workflow
def outer() -> typing.Tuple[int, int]:
# You should not do this. This is just here for testing.
@workflow
def wf2() -> int:
return t1(a=5)
return t1(a=3), wf2()
assert (8, 10) == outer()
entity_mapping = OrderedDict()
model_wf = get_serializable(entity_mapping, serialization_settings, outer)
assert len(model_wf.template.interface.outputs) == 2
assert len(model_wf.template.nodes) == 2
assert model_wf.template.nodes[1].workflow_node is not None
sub_wf = model_wf.sub_workflows[0]
assert len(sub_wf.nodes) == 1
assert sub_wf.nodes[0].id == "n0"
assert sub_wf.nodes[0].task_node.reference_id.name == f"{exec_prefix}tests.flytekit.unit.core.test_workflows.t1"
@task
def add_5(a: int) -> int:
a = a + 5
return a
@workflow
def simple_wf() -> int:
return add_5(a=1)
@workflow
def my_wf_example(a: int) -> typing.Tuple[int, int]:
'''example
Workflows can have inputs and return outputs of simple or complex types.
'''
x = add_5(a=a)
# You can use outputs of a previous task as inputs to other nodes.
z = add_5(a=x)
# You can call other workflows from within this workflow
d = simple_wf()
# You can add conditions that can run on primitive types and execute different branches
e = conditional("bool").if_(a == 5).then(add_5(a=d)).else_().then(add_5(a=z))
# Outputs of the workflow have to be outputs returned by prior nodes.
# No outputs and single or multiple outputs are supported
return x, e
def test_workflow_lhs():
assert my_wf_example._lhs == "my_wf_example"
def test_all_node_types():
assert my_wf_example(a=1) == (6, 16)
entity_mapping = OrderedDict()
model_wf = get_serializable(entity_mapping, serialization_settings, my_wf_example)
assert len(model_wf.template.interface.outputs) == 2
assert len(model_wf.template.nodes) == 4
assert model_wf.template.nodes[2].workflow_node is not None
sub_wf = model_wf.sub_workflows[0]
assert len(sub_wf.nodes) == 1
assert sub_wf.nodes[0].id == "n0"
assert sub_wf.nodes[0].task_node.reference_id.name == "tests.flytekit.unit.core.test_workflows.add_5"
def test_wf_docstring():
model_wf = get_serializable(OrderedDict(), serialization_settings, my_wf_example)
assert len(model_wf.template.interface.outputs) == 2
assert model_wf.template.interface.outputs["o0"].description == "outputs"
assert model_wf.template.interface.outputs["o1"].description == "outputs"
assert len(model_wf.template.interface.inputs) == 1
assert model_wf.template.interface.inputs["a"].description == "input a"
@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.")
def test_structured_dataset_wf():
import pandas as pd
from pandas.testing import assert_frame_equal
from flytekit.types.schema import FlyteSchema
superset_cols = kwtypes(Name=str, Age=int, Height=int)
subset_cols = kwtypes(Name=str)
superset_df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22], "Height": [160, 178]})
subset_df = pd.DataFrame({"Name": ["Tom", "Joseph"]})
@task
def t1() -> Annotated[pd.DataFrame, superset_cols]:
return superset_df
@task
def t2(df: Annotated[pd.DataFrame, subset_cols]) -> Annotated[pd.DataFrame, subset_cols]:
return df
@task
def t3(df: FlyteSchema[superset_cols]) -> FlyteSchema[superset_cols]:
return df
@task
def t4() -> FlyteSchema[superset_cols]:
return superset_df
@task
def t5(sd: Annotated[StructuredDataset, subset_cols]) -> Annotated[pd.DataFrame, subset_cols]:
return sd.open(pd.DataFrame).all()
@workflow
def sd_wf() -> Annotated[pd.DataFrame, subset_cols]:
# StructuredDataset -> StructuredDataset
df = t1()
return t2(df=df)
@workflow
def sd_to_schema_wf() -> pd.DataFrame:
# StructuredDataset -> schema
df = t1()
return t3(df=df)
@workflow
def schema_to_sd_wf() -> typing.Tuple[pd.DataFrame, pd.DataFrame]:
# schema -> StructuredDataset
df = t4()
return t2(df=df), t5(sd=df) # type: ignore
assert_frame_equal(sd_wf(), subset_df)
assert_frame_equal(sd_to_schema_wf(), superset_df)
assert_frame_equal(schema_to_sd_wf()[0], subset_df)
assert_frame_equal(schema_to_sd_wf()[1], subset_df)
@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.")
def test_compile_wf_at_compile_time():
import pandas as pd
from flytekit.types.schema import FlyteSchema
superset_cols = kwtypes(Name=str, Age=int, Height=int)
superset_df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22], "Height": [160, 178]})
ctx = FlyteContextManager.current_context()
with FlyteContextManager.with_context(
ctx.with_execution_state(
ctx.new_execution_state().with_params(mode=context_manager.ExecutionState.Mode.TASK_EXECUTION)
)
):
@task
def t4() -> FlyteSchema[superset_cols]:
return superset_df
@workflow
def wf():
t4()
assert ctx.compilation_state is None
@pytest.mark.parametrize(
"error_message", [
"Fail!",
None,
"",
("big", "boom!")
]
)
@patch("builtins.print")
def test_failure_node_local_execution(mock_print, error_message, exec_prefix):
@task
def clean_up(name: str, err: typing.Optional[FlyteError] = None):
print(f"Deleting cluster {name} due to {err}")
print("This is err:", str(err))
@task
def create_cluster(name: str):
print(f"Creating cluster: {name}")
@task
def delete_cluster(name: str, err: typing.Optional[FlyteError] = None):
print(f"Deleting cluster {name}")
print(err)
@task
def t1(a: int, b: str):
print(f"{a} {b}")
raise ValueError(error_message)
@workflow(on_failure=clean_up)
def wf(name: str = "flyteorg"):
c = create_cluster(name=name)
t = t1(a=1, b="2")
d = delete_cluster(name=name)
c >> t >> d
with pytest.raises(ValueError):
wf()
# Adjusted the error message to match the one in the failure
expected_error_message = str(
FlyteError(message=f"Error encountered while executing '{exec_prefix}tests.flytekit.unit.core.test_workflows.t1':\n  {error_message}", failed_node_id="fn0")
)
assert mock_print.call_count > 0
mock_print.assert_any_call("Creating cluster: flyteorg")
mock_print.assert_any_call("1 2")
mock_print.assert_any_call(f"Deleting cluster flyteorg due to {expected_error_message}")
mock_print.assert_any_call("This is err:", expected_error_message)
Again, users should keep in mind that even though the body of the function looks like regular Python, it is actually not. When flytekit scans the workflow function, the objects being passed around between the tasks are not your typical Python values. So even though you may have a task t1() -> int, when a = t1() is called, a will not be an integer, and if you try range(a) you'll get an error.
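For instance, a minimal sketch of this behavior, using a hypothetical task t1 that returns an int:
from flytekit import task, workflow

@task
def t1() -> int:
    return 3

@workflow
def wf() -> int:
    a = t1()    # at compile time, 'a' is a Promise, not an int
    # range(a)  # uncommenting this would fail: the body is evaluated before any task runs
    return a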
Please see the user guide for more usage examples.
Parameter | Type |
---|---|
_workflow_function |
Optional[Callable[P, FuncOut]] |
failure_policy |
Optional[WorkflowFailurePolicy] |
interruptible |
bool |
on_failure |
Optional[Union[WorkflowBase, Task]] |
docs |
Optional[Documentation] |
pickle_untyped |
bool |
default_options |
Optional[Options] |
flytekit.core.workflow.ImperativeWorkflow
An imperative workflow is a programmatic analogue to the typical @workflow
function-based workflow and is
better suited to programmatic applications.
Assuming you have some tasks like so
@task
def t1(a: str) -> str:
return a + " world"
@task
def t2():
print("side effect")
You could create a workflow imperatively like so
# Create the workflow with a name. This needs to be unique within the project and takes the place of the function
# name that's used for regular decorated function-based workflows.
wb = Workflow(name="my_workflow")
# Adds a top level input to the workflow. This is like an input to a workflow function.
wb.add_workflow_input("in1", str)
# Call your tasks.
node = wb.add_entity(t1, a=wb.inputs["in1"])
wb.add_entity(t2)
# This is analogous to a return statement
wb.add_workflow_output("from_n0t1", node.outputs["o0"])
This workflow would be identical on the back-end to
nt = typing.NamedTuple("wf_output", [("from_n0t1", str)])
@workflow
def my_workflow(in1: str) -> nt:
x = t1(a=in1)
t2()
return nt(x)
Note that the only reason we need the NamedTuple
is so we can name the output the same thing as in the
imperative example. The imperative paradigm makes the naming of workflow outputs easier, but this isn’t a big
deal in function-workflows because names tend to not be necessary.
class ImperativeWorkflow(
name: str,
failure_policy: Optional[WorkflowFailurePolicy],
interruptible: bool,
)
Parameter | Type |
---|---|
name |
str |
failure_policy |
Optional[WorkflowFailurePolicy] |
interruptible |
bool |
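A minimal sketch of constructing the class directly with an explicit failure policy; the name and policy values are placeholders:
from flytekit.core.workflow import ImperativeWorkflow, WorkflowFailurePolicy

wb = ImperativeWorkflow(
    name="my_workflow",
    failure_policy=WorkflowFailurePolicy.FAIL_IMMEDIATELY,
    interruptible=False,
)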
Methods
Method | Description |
---|---|
add_entity() |
Anytime you add an entity, all the inputs to the entity must be bound. |
add_launch_plan() |
|
add_on_failure_handler() |
This is a special function that mimics the add_entity function, but this is only used to add the failure node. |
add_subwf() |
|
add_task() |
|
add_workflow_input() |
Adds an input to the workflow. |
add_workflow_output() |
Add an output with the given name from the given node output. |
compile() |
|
construct_node_metadata() |
|
create_conditional() |
|
execute() |
Called by local_execute. |
local_execute() |
|
local_execution_mode() |
|
ready() |
This function returns whether or not the workflow is in a ready state, which means it has at least one node and all workflow inputs are bound. |
add_entity()
def add_entity(
entity: Union[PythonTask, _annotated_launch_plan.LaunchPlan, WorkflowBase],
kwargs,
) -> Node
Anytime you add an entity, all the inputs to the entity must be bound.
Parameter | Type |
---|---|
entity |
Union[PythonTask, _annotated_launch_plan.LaunchPlan, WorkflowBase] |
kwargs |
**kwargs |
add_launch_plan()
def add_launch_plan(
launch_plan: _annotated_launch_plan.LaunchPlan,
kwargs,
) -> Node
Parameter | Type |
---|---|
launch_plan |
_annotated_launch_plan.LaunchPlan |
kwargs |
**kwargs |
add_on_failure_handler()
def add_on_failure_handler(
entity,
)
This is a special function that mimics the add_entity function, but this is only used to add the failure node. Failure nodes are special because we don’t want them to be part of the main workflow.
Parameter | Type |
---|---|
entity |
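A hedged sketch, reusing the create_cluster and clean_up tasks from the on_failure test earlier on this page, and assuming the failure entity accepts the workflow's inputs plus an optional err: FlyteError:
wb = ImperativeWorkflow(name="cluster_workflow")
wb.add_workflow_input("name", str)
wb.add_entity(create_cluster, name=wb.inputs["name"])
# Registered as the failure node; it is kept out of the main workflow DAG.
wb.add_on_failure_handler(clean_up)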
add_subwf()
def add_subwf(
sub_wf: WorkflowBase,
kwargs,
) -> Node
Parameter | Type |
---|---|
sub_wf |
WorkflowBase |
kwargs |
**kwargs |
add_task()
def add_task(
task: PythonTask,
kwargs,
) -> Node
Parameter | Type |
---|---|
task |
PythonTask |
kwargs |
**kwargs |
add_workflow_input()
def add_workflow_input(
input_name: str,
python_type: Type,
) -> Promise
Adds an input to the workflow.
Parameter | Type |
---|---|
input_name |
str |
python_type |
Type |
add_workflow_output()
def add_workflow_output(
output_name: str,
p: Union[Promise, List[Promise], Dict[str, Promise]],
python_type: Optional[Type],
)
Add an output with the given name from the given node output.
Parameter | Type |
---|---|
output_name |
str |
p |
Union[Promise, List[Promise], Dict[str, Promise]] |
python_type |
Optional[Type] |
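When the output aggregates more than one promise, the Python type cannot be inferred from a single node output, so it is passed explicitly. A hedged sketch, assuming node_a and node_b were previously added with add_entity and each produce a str output named o0:
import typing

wb.add_workflow_output(
    "all_results",
    [node_a.outputs["o0"], node_b.outputs["o0"]],
    python_type=typing.List[str],
)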
compile()
def compile(
kwargs,
)
Parameter | Type |
---|---|
kwargs |
**kwargs |
construct_node_metadata()
def construct_node_metadata()
create_conditional()
def create_conditional(
name: str,
) -> ConditionalSection
Parameter | Type |
---|---|
name |
str |
execute()
def execute(
kwargs,
)
Called by local_execute. This function is how local execution for imperative workflows runs. Because when an entity is added using the add_entity function, all inputs to that entity should’ve been already declared, we can just iterate through the nodes in order and we shouldn’t run into any dependency issues. That is, we force the user to declare entities already in a topological sort. To keep track of outputs, we create a map to start things off, filled in only with the workflow inputs (if any). As things are run, their outputs are stored in this map. After all nodes are run, we fill in workflow level outputs the same way as any other previous node.
Parameter | Type |
---|---|
kwargs |
**kwargs |
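Locally, the imperative workflow from the class example above can simply be called; execute() then runs the nodes in the order they were added and resolves each node's inputs from the output map:
result = wb(in1="hello")  # runs t1 and t2 in insertion order
print(result)             # "hello world", given the t1 task from the example above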
local_execute()
def local_execute(
ctx: FlyteContext,
kwargs,
) -> Union[Tuple[Promise], Promise, VoidPromise, None]
Parameter | Type |
---|---|
ctx |
FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
ready()
def ready()
This function returns whether or not the workflow is in a ready state, which means
- Has at least one node
- All workflow inputs are bound
These conditions assume that all nodes and workflow i/o changes were done with the functions above, which do additional checking.
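A sketch of how readiness evolves as a workflow is built, assuming the t1 task from the class example above:
wb = ImperativeWorkflow(name="my_workflow")
assert not wb.ready()                 # no nodes yet
in1 = wb.add_workflow_input("in1", str)
wb.add_entity(t1, a=in1)
assert wb.ready()                     # at least one node, and "in1" is bound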
Properties
Property | Type | Description |
---|---|---|
compilation_state |
Compilation is done a bit at a time, one task or other entity call at a time. This is why this workflow class has to keep track of its own compilation state. |
|
default_options |
||
docs |
||
failure_node |
||
inputs |
This holds the input promises to the workflow. The nodes in these Promise objects should always point to the global start node. |
|
interface |
||
name |
||
nodes |
||
on_failure |
||
output_bindings |
||
python_interface |
||
short_name |
||
workflow_metadata |
||
workflow_metadata_defaults |
flytekit.core.workflow.PythonFunctionWorkflow
Please read the Flyte workflows documentation first for a high-level understanding of what workflows are in Flyte.
This Python object represents a workflow defined by a function and decorated with the @workflow decorator. Please see the notes on the workflow() decorator above for additional information.
class PythonFunctionWorkflow(
workflow_function: Callable,
metadata: WorkflowMetadata,
default_metadata: WorkflowMetadataDefaults,
docstring: Optional[Docstring],
on_failure: Optional[Union[WorkflowBase, Task]],
docs: Optional[Documentation],
pickle_untyped: bool,
default_options: Optional[Options],
)
Parameter | Type |
---|---|
workflow_function |
Callable |
metadata |
WorkflowMetadata |
default_metadata |
WorkflowMetadataDefaults |
docstring |
Optional[Docstring] |
on_failure |
Optional[Union[WorkflowBase, Task]] |
docs |
Optional[Documentation] |
pickle_untyped |
bool |
default_options |
Optional[Options] |
Methods
Method | Description |
---|---|
add() |
|
compile() |
Supply static Python native values in the kwargs if you want them to be used in the compilation. |
construct_node_metadata() |
|
execute() |
This function is here only to try to streamline the pattern between workflows and tasks. |
find_lhs() |
|
get_all_tasks() |
Future-proof method. |
load_task() |
Given the set of identifier keys, should return one Python Task or raise an error if not found. |
loader_args() |
This is responsible for turning an instance of a task into args that the load_task function can reconstitute. |
local_execute() |
|
local_execution_mode() |
|
task_name() |
Overridable function that can optionally return a custom name for a given task. |
add()
def add(
t: flytekit.core.python_auto_container.PythonAutoContainerTask,
)
Parameter | Type |
---|---|
t |
flytekit.core.python_auto_container.PythonAutoContainerTask |
compile()
def compile(
kwargs,
)
Supply static Python native values in the kwargs if you want them to be used in the compilation. This mimics a ‘closure’ in the traditional sense of the word.
Parameter | Type |
---|---|
kwargs |
**kwargs |
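As in the tests earlier on this page, compilation can be triggered explicitly before serializing or running the workflow locally (wf and serialization_settings as defined in that snippet):
wf.compile()
wf_spec = get_serializable(OrderedDict(), serialization_settings, wf)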
construct_node_metadata()
def construct_node_metadata()
execute()
def execute(
kwargs,
)
This function is here only to try to streamline the pattern between workflows and tasks. Since tasks call execute from dispatch_execute which is in local_execute, workflows should also call an execute inside local_execute. This makes mocking cleaner.
Parameter | Type |
---|---|
kwargs |
**kwargs |
find_lhs()
def find_lhs()
get_all_tasks()
def get_all_tasks()
Future-proof method. Just making it easy to access all tasks (not required today, as we auto-register them).
load_task()
def load_task(
loader_args: typing.List[str],
) -> flytekit.core.python_auto_container.PythonAutoContainerTask
Given the set of identifier keys, should return one Python Task or raise an error if not found
Parameter | Type |
---|---|
loader_args |
typing.List[str] |
loader_args()
def loader_args(
settings: flytekit.configuration.SerializationSettings,
t: flytekit.core.python_auto_container.PythonAutoContainerTask,
) -> typing.List[str]
This is responsible for turning an instance of a task into args that the load_task function can reconstitute.
Parameter | Type |
---|---|
settings |
flytekit.configuration.SerializationSettings |
t |
flytekit.core.python_auto_container.PythonAutoContainerTask |
local_execute()
def local_execute(
ctx: FlyteContext,
kwargs,
) -> Union[Tuple[Promise], Promise, VoidPromise, None]
Parameter | Type |
---|---|
ctx |
FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
task_name()
def task_name(
t: PythonAutoContainerTask,
) -> str
Overridable function that can optionally return a custom name for a given task
Parameter | Type |
---|---|
t |
PythonAutoContainerTask |
Properties
Property | Type | Description |
---|---|---|
default_options |
||
docs |
||
failure_node |
||
function |
||
instantiated_in |
||
interface |
||
lhs |
||
location |
||
name |
||
nodes |
||
on_failure |
||
output_bindings |
||
python_interface |
||
short_name |
||
workflow_metadata |
||
workflow_metadata_defaults |
flytekit.core.workflow.ReferenceWorkflow
A reference workflow is a pointer to a workflow that already exists on your Flyte installation. This object will not initiate a network call to Admin, which is why the user is asked to provide the expected interface. If at registration time the interface provided causes an issue with compilation, an error will be returned.
class ReferenceWorkflow(
project: str,
domain: str,
name: str,
version: str,
inputs: Dict[str, Type],
outputs: Dict[str, Type],
)
Parameter | Type |
---|---|
project |
str |
domain |
str |
name |
str |
version |
str |
inputs |
Dict[str, Type] |
outputs |
Dict[str, Type] |
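A hedged sketch of constructing one directly (the @reference_workflow decorator shown earlier is the more common entry point); all identifier values are placeholders:
from flytekit.core.workflow import ReferenceWorkflow

ref_wf = ReferenceWorkflow(
    project="proj",
    domain="development",
    name="wf_name",
    version="abc",
    inputs={"a": int},
    outputs={"o0": str},
)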
Methods
Method | Description |
---|---|
add() |
|
compile() |
|
construct_node_metadata() |
|
execute() |
|
find_lhs() |
|
get_all_tasks() |
Future-proof method. |
load_task() |
Given the set of identifier keys, should return one Python Task or raise an error if not found. |
loader_args() |
This is responsible for turning an instance of a task into args that the load_task function can reconstitute. |
local_execute() |
Please see the local_execute comments in the main task. |
local_execution_mode() |
|
task_name() |
Overridable function that can optionally return a custom name for a given task. |
unwrap_literal_map_and_execute() |
Please see the implementation of the dispatch_execute function in the real task. |
add()
def add(
t: flytekit.core.python_auto_container.PythonAutoContainerTask,
)
Parameter | Type |
---|---|
t |
flytekit.core.python_auto_container.PythonAutoContainerTask |
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
)
Parameter | Type |
---|---|
ctx |
flytekit.core.context_manager.FlyteContext |
args |
*args |
kwargs |
**kwargs |
construct_node_metadata()
def construct_node_metadata()
execute()
def execute(
kwargs,
) -> typing.Any
Parameter | Type |
---|---|
kwargs |
**kwargs |
find_lhs()
def find_lhs()
get_all_tasks()
def get_all_tasks()
Future-proof method. Just making it easy to access all tasks (not required today, as we auto-register them).
load_task()
def load_task(
loader_args: typing.List[str],
) -> flytekit.core.python_auto_container.PythonAutoContainerTask
Given the set of identifier keys, should return one Python Task or raise an error if not found
Parameter | Type |
---|---|
loader_args |
typing.List[str] |
loader_args()
def loader_args(
settings: flytekit.configuration.SerializationSettings,
t: flytekit.core.python_auto_container.PythonAutoContainerTask,
) -> typing.List[str]
This is responsible for turning an instance of a task into args that the load_task function can reconstitute.
Parameter | Type |
---|---|
settings |
flytekit.configuration.SerializationSettings |
t |
flytekit.core.python_auto_container.PythonAutoContainerTask |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
) -> typing.Union[typing.Tuple[flytekit.core.promise.Promise], flytekit.core.promise.Promise, flytekit.core.promise.VoidPromise, NoneType]
Please see the local_execute comments in the main task.
Parameter | Type |
---|---|
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
task_name()
def task_name(
t: PythonAutoContainerTask,
) -> str
Overridable function that can optionally return a custom name for a given task
Parameter | Type |
---|---|
t |
PythonAutoContainerTask |
unwrap_literal_map_and_execute()
def unwrap_literal_map_and_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
) -> flytekit.models.literals.LiteralMap
Please see the implementation of the dispatch_execute function in the real task.
Parameter | Type |
---|---|
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
Properties
Property | Type | Description |
---|---|---|
default_options |
||
docs |
||
failure_node |
||
function |
||
id |
||
instantiated_in |
||
interface |
||
lhs |
||
location |
||
name |
||
nodes |
||
on_failure |
||
output_bindings |
||
python_interface |
||
reference |
||
short_name |
||
workflow_metadata |
||
workflow_metadata_defaults |
flytekit.core.workflow.WorkflowBase
class WorkflowBase(
name: str,
workflow_metadata: WorkflowMetadata,
workflow_metadata_defaults: WorkflowMetadataDefaults,
python_interface: Interface,
on_failure: Optional[Union[WorkflowBase, Task]],
docs: Optional[Documentation],
default_options: Optional[Options],
kwargs,
)
Parameter | Type |
---|---|
name |
str |
workflow_metadata |
WorkflowMetadata |
workflow_metadata_defaults |
WorkflowMetadataDefaults |
python_interface |
Interface |
on_failure |
Optional[Union[WorkflowBase, Task]] |
docs |
Optional[Documentation] |
default_options |
Optional[Options] |
kwargs |
**kwargs |
Methods
Method | Description |
---|---|
compile() |
|
construct_node_metadata() |
|
execute() |
|
local_execute() |
|
local_execution_mode() |
compile()
def compile(
kwargs,
)
Parameter | Type |
---|---|
kwargs |
**kwargs |
construct_node_metadata()
def construct_node_metadata()
execute()
def execute(
kwargs,
)
Parameter | Type |
---|---|
kwargs |
**kwargs |
local_execute()
def local_execute(
ctx: FlyteContext,
kwargs,
) -> Union[Tuple[Promise], Promise, VoidPromise, None]
Parameter | Type |
---|---|
ctx |
FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
Properties
Property | Type | Description |
---|---|---|
default_options |
||
docs |
||
failure_node |
||
interface |
||
name |
||
nodes |
||
on_failure |
||
output_bindings |
||
python_interface |
||
short_name |
||
workflow_metadata |
||
workflow_metadata_defaults |
flytekit.core.workflow.WorkflowMetadata
class WorkflowMetadata(
on_failure: WorkflowFailurePolicy,
)
Parameter | Type |
---|---|
on_failure |
WorkflowFailurePolicy |
Methods
Method | Description |
---|---|
to_flyte_model() |
to_flyte_model()
def to_flyte_model()
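As in the tests earlier on this page, construction validates the failure policy, and to_flyte_model() produces the IDL representation:
wm = WorkflowMetadata(on_failure=WorkflowFailurePolicy.FAIL_IMMEDIATELY)
assert wm.on_failure == WorkflowFailurePolicy.FAIL_IMMEDIATELY
wm_model = wm.to_flyte_model()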
flytekit.core.workflow.WorkflowMetadataDefaults
This class is similarly named to the one above. Please see the IDL for more information but essentially, this WorkflowMetadataDefaults class represents the defaults that are handed down to a workflow’s tasks, whereas WorkflowMetadata represents metadata about the workflow itself.
class WorkflowMetadataDefaults(
interruptible: bool,
)
Parameter | Type |
---|---|
interruptible |
bool |
Methods
Method | Description |
---|---|
to_flyte_model() |
to_flyte_model()
def to_flyte_model()
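Mirroring the tests earlier on this page:
wmd = WorkflowMetadataDefaults(interruptible=False)
assert wmd.interruptible is False
wmd_model = wmd.to_flyte_model()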