Conditionals
Union elevates conditions to a first-class construct named conditional
, providing a powerful mechanism for selectively
executing branches in a workflow. Conditions leverage static or dynamic data generated by tasks or
received as workflow inputs. While conditions are highly performant in their evaluation,
it’s important to note that they are restricted to specific binary and logical operators
and are applicable only to primitive values.
To begin, import the necessary libraries.
import random
import union
from flytekit import conditional
from flytekit.core.task import Echo
Simple branch
In this example, we introduce two tasks, calculate_circle_circumference
and
calculate_circle_area
. The workflow dynamically chooses between these tasks based on whether the input
falls within the fraction range (0-1) or not.
@union.task
def calculate_circle_circumference(radius: float) -> float:
return 2 * 3.14 * radius # Task to calculate the circumference of a circle
@union.task
def calculate_circle_area(radius: float) -> float:
return 3.14 * radius * radius # Task to calculate the area of a circle
@union.workflow
def shape_properties(radius: float) -> float:
return (
conditional("shape_properties")
.if_((radius >= 0.1) & (radius < 1.0))
.then(calculate_circle_circumference(radius=radius))
.else_()
.then(calculate_circle_area(radius=radius))
)
if __name__ == "__main__":
radius_small = 0.5
print(f"Circumference of circle (radius={radius_small}): {shape_properties(radius=radius_small)}")
radius_large = 3.0
print(f"Area of circle (radius={radius_large}): {shape_properties(radius=radius_large)}")
Multiple branches
We establish an if
condition with multiple branches, which will result in a failure if none of the conditions is met.
It’s important to note that any conditional
statement in Flyte is expected to be complete,
meaning that all possible branches must be accounted for.
@union.workflow
def shape_properties_with_multiple_branches(radius: float) -> float:
return (
conditional("shape_properties_with_multiple_branches")
.if_((radius >= 0.1) & (radius < 1.0))
.then(calculate_circle_circumference(radius=radius))
.elif_((radius >= 1.0) & (radius <= 10.0))
.then(calculate_circle_area(radius=radius))
.else_()
.fail("The input must be within the range of 0 to 10.")
)
Take note of the usage of bitwise operators (&
). Due to Python’s PEP-335,
the logical and
, or
and not
operators cannot be overloaded.
Flytekit employs bitwise &
and |
as equivalents for logical and
and or
operators,
a convention also observed in other libraries.
Consuming the output of a conditional
Here, we write a task that consumes the output returned by a conditional
.
@union.workflow
def shape_properties_accept_conditional_output(radius: float) -> float:
result = (
conditional("shape_properties_accept_conditional_output")
.if_((radius >= 0.1) & (radius < 1.0))
.then(calculate_circle_circumference(radius=radius))
.elif_((radius >= 1.0) & (radius <= 10.0))
.then(calculate_circle_area(radius=radius))
.else_()
.fail("The input must exist between 0 and 10.")
)
return calculate_circle_area(radius=result)
if __name__ == "__main__":
radius_small = 0.5
print(
f"Circumference of circle (radius={radius_small}) x Area of circle (radius={calculate_circle_circumference(radius=radius_small)}): {shape_properties_accept_conditional_output(radius=radius_small)}"
)
Using the output of a previous task in a conditional
You can check if a boolean returned from the previous task is True
,
but unary operations are not supported directly. Instead, use the is_true
,
is_false
and is_none
methods on the result.
@union..task
def coin_toss(seed: int) -> bool:
"""
Mimic a condition to verify the successful execution of an operation
"""
r = random.Random(seed)
if r.random() < 0.5:
return True
return False
@union..task
def failed() -> int:
"""
Mimic a task that handles failure
"""
return -1
@union..task
def success() -> int:
"""
Mimic a task that handles success
"""
return 0
@union..workflow
def boolean_wf(seed: int = 5) -> int:
result = coin_toss(seed=seed)
return conditional("coin_toss").if_(result.is_true()).then(success()).else_().then(failed())
[!NOTE]
How do output values acquire these methods? In a workflow, direct access to outputs is not permitted. Inputs and outputs are automatically encapsulated in a special object known as
flytekit.extend.Promise
.
Using boolean workflow inputs in a conditional
You can directly pass a boolean to a workflow.
@union.workflow
def boolean_input_wf(boolean_input: bool) -> int:
return conditional("boolean_input_conditional").if_(boolean_input.is_true()).then(success()).else_().then(failed())
Observe that the passed boolean possesses a method called is_true
.
This boolean resides within the workflow context and is encapsulated in a specialized Flytekit object.
This special object enables it to exhibit additional behavior.
You can run the workflows locally as follows:
if __name__ == "__main__":
print("Running boolean_wf a few times...")
for index in range(0, 5):
print(f"The output generated by boolean_wf = {boolean_wf(seed=index)}")
print(
f"Boolean input: {True if index < 2 else False}; workflow output: {boolean_input_wf(boolean_input=True if index < 2 else False)}"
)
Nested conditionals
You can nest conditional sections arbitrarily inside other conditional sections.
However, these nested sections can only be in the then
part of a conditional
block.
@union.workflow
def nested_conditions(radius: float) -> float:
return (
conditional("nested_conditions")
.if_((radius >= 0.1) & (radius < 1.0))
.then(
conditional("inner_nested_conditions")
.if_(radius < 0.5)
.then(calculate_circle_circumference(radius=radius))
.elif_((radius >= 0.5) & (radius < 0.9))
.then(calculate_circle_area(radius=radius))
.else_()
.fail("0.9 is an outlier.")
)
.elif_((radius >= 1.0) & (radius <= 10.0))
.then(calculate_circle_area(radius=radius))
.else_()
.fail("The input must be within the range of 0 to 10.")
)
if __name__ == "__main__":
print(f"nested_conditions(0.4): {nested_conditions(radius=0.4)}")
Using the output of a task in a conditional
Let’s write a fun workflow that triggers the calculate_circle_circumference
task in the event of a “heads” outcome,
and alternatively, runs the calculate_circle_area
task in the event of a “tail” outcome.
@union.workflow
def consume_task_output(radius: float, seed: int = 5) -> float:
is_heads = coin_toss(seed=seed)
return (
conditional("double_or_square")
.if_(is_heads.is_true())
.then(calculate_circle_circumference(radius=radius))
.else_()
.then(calculate_circle_area(radius=radius))
)
You can run the workflow locally as follows:
if __name__ == "__main__":
default_seed_output = consume_task_output(radius=0.4)
print(
f"Executing consume_task_output(0.4) with default seed=5. Expected output: calculate_circle_area => {default_seed_output}"
)
custom_seed_output = consume_task_output(radius=0.4, seed=7)
print(
f"Executing consume_task_output(0.4, seed=7). Expected output: calculate_circle_circumference => {custom_seed_output}"
)
Running a noop task in a conditional
In some cases, you may want to skip the execution of a conditional workflow if a certain condition is not met.
You can achieve this by using the echo
task, which simply returns the input value.
To enable the echo plugin in the backend, add the plugin to Flyte’s configuration file.
task-plugins:
enabled-plugins:
- echo
echo = Echo(name="echo", inputs={"radius": float})
@union.workflow
def noop_in_conditional(radius: float, seed: int = 5) -> float:
is_heads = coin_toss(seed=seed)
return (
conditional("noop_in_conditional")
.if_(is_heads.is_true())
.then(calculate_circle_circumference(radius=radius))
.else_()
.then(echo(radius=radius))
)
Run the example on the Flyte cluster
To run the provided workflows on the Flyte cluster, use the following commands:
$ union run --remote \
https://raw.githubusercontent.com/flyteorg/flytesnacks/656e63d1c8dded3e9e7161c7af6425e9fcd43f56/examples/advanced_composition/advanced_composition/conditional.py \
shape_properties --radius 3.0
$ union run --remote \
https://raw.githubusercontent.com/flyteorg/flytesnacks/656e63d1c8dded3e9e7161c7af6425e9fcd43f56/examples/advanced_composition/advanced_composition/conditional.py \
shape_properties_with_multiple_branches --radius 11.0
$ union run --remote \
https://raw.githubusercontent.com/flyteorg/flytesnacks/656e63d1c8dded3e9e7161c7af6425e9fcd43f56/examples/advanced_composition/advanced_composition/conditional.py \
shape_properties_accept_conditional_output --radius 0.5
$ union run --remote \
https://raw.githubusercontent.com/flyteorg/flytesnacks/656e63d1c8dded3e9e7161c7af6425e9fcd43f56/examples/advanced_composition/advanced_composition/conditional.py \
boolean_wf
$ union run --remote \
https://raw.githubusercontent.com/flyteorg/flytesnacks/656e63d1c8dded3e9e7161c7af6425e9fcd43f56/examples/advanced_composition/advanced_composition/conditional.py \
boolean_input_wf --boolean_input
$ union run --remote \
https://raw.githubusercontent.com/flyteorg/flytesnacks/656e63d1c8dded3e9e7161c7af6425e9fcd43f56/examples/advanced_composition/advanced_composition/conditional.py \
nested_conditions --radius 0.7
$ union run --remote \
https://raw.githubusercontent.com/flyteorg/flytesnacks/656e63d1c8dded3e9e7161c7af6425e9fcd43f56/examples/advanced_composition/advanced_composition/conditional.py \
consume_task_output --radius 0.4 --seed 7
$ union run --remote \
https://raw.githubusercontent.com/flyteorg/flytesnacks/656e63d1c8dded3e9e7161c7af6425e9fcd43f56/examples/advanced_composition/advanced_composition/conditional.py \
noop_in_conditional --radius 0.4 --seed 5