0.1.dev2184+g1e0cbe7.d20250401
flytekit.core.condition
Methods
conditional()
def conditional(
name: str,
) -> ConditionalSection
Use a conditional section to control the flow of a workflow. Conditional sections can only be used inside a workflow
context; outside of a workflow they raise an assertion error.
The conditional method returns a new conditional section, which lets you build if-else clauses that behave like a
ternary operator. It is called ternary-like because it returns the output of the selected branch, so in effect it is
a functional-style condition.
Example usage of a condition. Note the nesting and the assignment to a left-hand-side variable:
v = (
    conditional("fractions")
    .if_((my_input > 0.1) & (my_input < 1.0))
    .then(
        conditional("inner_fractions")
        .if_(my_input < 0.5)
        .then(double(n=my_input))
        .elif_((my_input > 0.5) & (my_input < 0.7))
        .then(square(n=my_input))
        .else_()
        .fail("Only <0.7 allowed")
    )
    .elif_((my_input > 1.0) & (my_input < 10.0))
    .then(square(n=my_input))
    .else_()
    .then(double(n=my_input))
)
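For readers less used to the fluent style, the nested conditional above can be read as ordinary Python branching. The sketch below is only an illustration of which branch supplies the value: the bodies of double and square are assumptions (the source shows only their calls), fractions is a hypothetical wrapper name, and ValueError stands in for the failure raised by fail.

```python
def double(n: float) -> float:
    # Assumed task body; the source only shows the call double(n=...).
    return 2 * n


def square(n: float) -> float:
    # Assumed task body; the source only shows the call square(n=...).
    return n * n


def fractions(my_input: float) -> float:
    """Plain-Python reading of the nested conditional example."""
    if 0.1 < my_input < 1.0:
        # Inner conditional ("inner_fractions").
        if my_input < 0.5:
            return double(n=my_input)
        elif 0.5 < my_input < 0.7:
            return square(n=my_input)
        else:
            # Mirrors .else_().fail(...): this branch produces no value.
            raise ValueError("Only <0.7 allowed")
    elif 1.0 < my_input < 10.0:
        return square(n=my_input)
    else:
        return double(n=my_input)


print(fractions(0.25), fractions(2.0), fractions(20.0))
```

The real conditional builds a branch node from comparison expressions rather than evaluating Python booleans, so this sketch shows the selection logic only, not how the workflow is compiled.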
create_branch_node_promise_var()
def create_branch_node_promise_var(
node_id: str,
var: str,
) -> str
Generates a globally (workflow-level) unique id for a variable.
When building bindings for the branch node, the inputs to the conditions (e.g. (x==5)) need to have variable names
(e.g. x). Because it is currently infeasible to get the assigned name (e.g. x), we resort to using the referenced
node's output name (e.g. o0, my_out, etc.). To avoid naming collisions (for example, when the conditions reference
outputs with the same name from two different nodes), we build a variable name composed of the referenced node
name + '.' + the referenced output name. Ideally we would use something like python-varname
(https://github.com/pwwang/python-varname) to retrieve the assigned variable name (e.g. x). However, because of
https://github.com/pwwang/python-varname/issues/28, this is not currently supported for all AST node types.
Returns the generated unique id of the variable.
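The naming scheme described above (node name + '.' + output name) can be sketched in a few lines. The helper name branch_var_name and the node ids n0 and n1 are hypothetical and chosen only for illustration; this is not presented as the library's own code.

```python
def branch_var_name(node_id: str, var: str) -> str:
    """Join the referenced node id and output name with a dot (hypothetical helper)."""
    return f"{node_id}.{var}"


# Two different nodes that both expose an output called "o0" ...
first = branch_var_name("n0", "o0")
second = branch_var_name("n1", "o0")

# ... still get distinct variable names for the branch node bindings.
print(first, second)
```

Prefixing with the node id is what keeps identically named outputs from colliding inside a single workflow.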
| Parameter | Type |
|-|-|
| node_id | str |
| var | str |
merge_promises()
def merge_promises(
*args,
) -> typing.List[Promise]
| Parameter | Type |
|-|-|
| args | *args |
to_branch_node()
def to_branch_node(
name: str,
cs: ConditionalSection,
) -> Tuple[BranchNode, typing.List[Promise]]
| Parameter | Type |
|-|-|
| name | str |
| cs | ConditionalSection |
to_case_block()
def to_case_block(
c: Case,
) -> Tuple[Union[_core_wf.IfBlock], typing.List[Promise]]
to_ifelse_block()
def to_ifelse_block(
node_id: str,
cs: ConditionalSection,
) -> Tuple[_core_wf.IfElseBlock, typing.List[Binding]]
| Parameter | Type |
|-|-|
| node_id | str |
| cs | ConditionalSection |
transform_to_boolexpr()
def transform_to_boolexpr(
expr: Union[ComparisonExpression, ConjunctionExpression],
) -> Tuple[_core_cond.BooleanExpression, typing.List[Promise]]
| Parameter | Type |
|-|-|
| expr | Union[ComparisonExpression, ConjunctionExpression] |
transform_to_comp_expr()
def transform_to_comp_expr(
expr: ComparisonExpression,
) -> Tuple[_core_cond.ComparisonExpression, typing.List[Promise]]
| Parameter | Type |
|-|-|
| expr | ComparisonExpression |
transform_to_conj_expr()
def transform_to_conj_expr(
expr: ConjunctionExpression,
) -> Tuple[_core_cond.ConjunctionExpression, typing.List[Promise]]
| Parameter | Type |
|-|-|
| expr | ConjunctionExpression |
transform_to_operand()
def transform_to_operand(
v: Union[Promise, Literal],
) -> Tuple[_core_cond.Operand, Optional[Promise]]
| Parameter | Type |
|-|-|
| v | Union[Promise, Literal] |
flytekit.core.condition.BranchNode
class BranchNode(
name: str,
ifelse_block: _core_wf.IfElseBlock,
)
| Parameter | Type |
|-|-|
| name | str |
| ifelse_block | _core_wf.IfElseBlock |
Properties
| Property | Type | Description |
|-|-|-|
| name | | |
flytekit.core.condition.Case
class Case(
cs: ConditionalSection,
expr: Optional[Union[ComparisonExpression, ConjunctionExpression]],
stmt: str,
)
| Parameter | Type |
|-|-|
| cs | ConditionalSection |
| expr | Optional[Union[ComparisonExpression, ConjunctionExpression]] |
| stmt | str |
Methods
fail()
def fail(
err: str,
) -> Promise
then()
def then(
p: Union[Promise, Tuple[Promise]],
) -> Optional[Union[Condition, Promise, Tuple[Promise], VoidPromise]]
| Parameter | Type |
|-|-|
| p | Union[Promise, Tuple[Promise]] |
Properties
| Property | Type | Description |
|-|-|-|
| err | | |
| expr | | |
| output_node | | |
| output_promise | | |
flytekit.core.condition.Condition
class Condition(
cs: ConditionalSection,
)
| Parameter | Type |
|-|-|
| cs | ConditionalSection |
Methods
elif_()
def elif_(
expr: Union[ComparisonExpression, ConjunctionExpression],
) -> Case
| Parameter | Type |
|-|-|
| expr | Union[ComparisonExpression, ConjunctionExpression] |
else_()
flytekit.core.condition.ConditionalSection
ConditionalSection is used to denote a condition within a Workflow. This default conditional section only works
for Compilation mode. It is advised to derive the class and re-implement the start_branch and end_branch methods
to override the compilation behavior.
Conditions can only be used within a workflow context.
Usage:
v = conditional("fractions").if_((my_input > 0.1) & (my_input < 1.0)).then(...)...
class ConditionalSection(
name: str,
)
Methods
| Method | Description |
|-|-|
| compute_output_vars() | Computes and returns the minimum set of outputs for this conditional block, based on all the cases that have been registered. |
| end_branch() | This should be invoked after every branch has been visited. |
| if_() | |
| start_branch() | At the start of an execution of every branch this method should be called. |
compute_output_vars()
def compute_output_vars()
Computes and returns the minimum set of outputs for this conditional block, based on all the cases that have been
registered.
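One plausible reading of "minimum set of outputs" is the set of output names that every registered case produces, so that whichever branch runs can supply the result. The sketch below illustrates that reading with plain lists of names. The helper common_output_vars and its input shape are assumptions for illustration, not the library's implementation.

```python
from typing import List


def common_output_vars(cases: List[List[str]]) -> List[str]:
    """Return the output names shared by every case, in the order of the first case."""
    if not cases:
        return []
    shared = set(cases[0])
    for names in cases[1:]:
        # Keep only the names that this case also produces.
        shared &= set(names)
    return [name for name in cases[0] if name in shared]


# Hypothetical output names for three cases of one conditional block.
cases = [["o0", "o1"], ["o0"], ["o1", "o0"]]
print(common_output_vars(cases))
```

Under this reading, an output that only some branches produce is dropped, because a downstream consumer could not rely on it being present.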
end_branch()
This should be invoked after every branch has been visited.
When this is not a local workflow execution, check whether this is the last case.
If so, return the promise; otherwise, return the condition.
if_()
def if_(
expr: Union[ComparisonExpression, ConjunctionExpression],
) -> Case
| Parameter | Type |
|-|-|
| expr | Union[ComparisonExpression, ConjunctionExpression] |
start_branch()
def start_branch(
c: Case,
last_case: bool,
) -> Case
At the start of an execution of every branch this method should be called.
| Parameter | Type |
|-|-|
| c | Case |
| last_case | bool |
Properties
| Property | Type | Description |
|-|-|-|
| cases | | |
| name | | |
flytekit.core.condition.LocalExecutedConditionalSection
ConditionalSection is used to denote a condition within a Workflow. This default conditional section only works
for Compilation mode. It is advised to derive the class and re-implement the start_branch and end_branch methods
to override the compilation behavior.
Conditions can only be used within a workflow context.
Usage:
v = conditional("fractions").if_((my_input > 0.1) & (my_input < 1.0)).then(...)...
class LocalExecutedConditionalSection(
name: str,
)
Methods
| Method | Description |
|-|-|
| compute_output_vars() | Computes and returns the minimum set of outputs for this conditional block, based on all the cases that have been registered. |
| end_branch() | This should be invoked after every branch has been visited. |
| if_() | |
| start_branch() | At the start of an execution of every branch this method should be called. |
compute_output_vars()
def compute_output_vars()
Computes and returns the minimum set of outputs for this conditional block, based on all the cases that have been
registered.
end_branch()
This should be invoked after every branch has been visited.
In a local workflow execution, first mark the branch as complete, then check whether this is the last case.
If it is the last case, return the output from the selected case. A case should always be selected (see
start_branch).
If it is not the last case, return the condition so that further chaining can be done.
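The chaining behaviour described above can be modelled with a small toy in plain Python. ToySection and ToyCase are hypothetical stand-ins written for this illustration; they take ready-made booleans and values instead of expressions and promises, fold the Condition object into the section, and are not the library's classes.

```python
class ToyCase:
    """One branch: whether its condition held, and the value passed to then()."""

    def __init__(self, section: "ToySection", holds: bool):
        self.section = section
        self.holds = holds
        self.value = None

    def then(self, value):
        self.value = value
        return self.section.end_branch()


class ToySection:
    """Toy model of a locally executed conditional section."""

    def __init__(self, name: str):
        self.name = name
        self.selected = None    # first case whose condition held
        self.last_case = False  # set by start_branch for the else branch

    def start_branch(self, case: ToyCase, last_case: bool = False) -> ToyCase:
        self.last_case = last_case
        if self.selected is None and case.holds:
            self.selected = case
        return case

    def end_branch(self):
        if not self.last_case:
            return self  # not finished: allow further elif_/else_ chaining
        if self.selected is None:
            raise AssertionError("a case should always be selected")
        return self.selected.value

    def if_(self, holds: bool) -> ToyCase:
        return self.start_branch(ToyCase(self, holds))

    def elif_(self, holds: bool) -> ToyCase:
        return self.start_branch(ToyCase(self, holds))

    def else_(self) -> ToyCase:
        return self.start_branch(ToyCase(self, True), last_case=True)


section = ToySection("demo")
partial = section.if_(False).then("first")
result = partial.elif_(True).then("second").else_().then("fallback")
print(partial is section, result)
```

Until the else branch closes the chain, then() hands back the section itself. Only the last case returns the value of whichever branch was selected first.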
if_()
def if_(
expr: Union[ComparisonExpression, ConjunctionExpression],
) -> Case
| Parameter | Type |
|-|-|
| expr | Union[ComparisonExpression, ConjunctionExpression] |
start_branch()
def start_branch(
c: Case,
last_case: bool,
) -> Case
At the start of an execution of every branch this method should be called.
| Parameter | Type |
|-|-|
| c | Case |
| last_case | bool |
Properties
| Property | Type | Description |
|-|-|-|
| cases | | |
| name | | |
flytekit.core.condition.SkippedConditionalSection
This ConditionalSection is used for nested conditionals, when the branch has been evaluated to false.
This ensures that the branch is not evaluated and thus the local tasks are not executed.
class SkippedConditionalSection(
name: str,
)
Methods
| Method | Description |
|-|-|
| compute_output_vars() | Computes and returns the minimum set of outputs for this conditional block, based on all the cases that have been registered. |
| end_branch() | This should be invoked after every branch has been visited. |
| if_() | |
| start_branch() | At the start of an execution of every branch this method should be called. |
compute_output_vars()
def compute_output_vars()
Computes and returns the minimum set of outputs for this conditional block, based on all the cases that have been
registered.
end_branch()
This should be invoked after every branch has been visited.
if_()
def if_(
expr: Union[ComparisonExpression, ConjunctionExpression],
) -> Case
| Parameter | Type |
|-|-|
| expr | Union[ComparisonExpression, ConjunctionExpression] |
start_branch()
def start_branch(
c: Case,
last_case: bool,
) -> Case
At the start of an execution of every branch this method should be called.
| Parameter | Type |
|-|-|
| c | Case |
| last_case | bool |
Properties
| Property | Type | Description |
|-|-|-|
| cases | | |
| name | | |