flytekit.core.promise
Directory
Classes
Class | Description |
---|---|
ComparisonExpression | ComparisonExpression refers to an expression of the form (lhs operator rhs), where lhs and rhs are operands. |
ConjunctionExpression | A Conjunction Expression is an expression of the form either (A and B) or (A or B). |
HasFlyteInterface | Base class for protocol classes. |
LocallyExecutable | Base class for protocol classes. |
NodeOutput | |
Promise | This object is a wrapper and exists for three main reasons. |
SupportsNodeCreation | Base class for protocol classes. |
VoidPromise | This object is returned for tasks that do not return any outputs (declared interface is empty). |
Methods
Method | Description |
---|---|
async_flyte_entity_call_handler() | This is a limited async version of the main call handler. |
binding_data_from_python_std() | |
binding_from_python_std() | |
create_and_link_node() | This method is used to generate a node with bindings within a flytekit workflow. |
create_and_link_node_from_remote() | This method is used to generate a node with bindings especially when using remote entities, like FlyteWorkflow, FlyteTask and FlyteLaunchplan. |
create_native_named_tuple() | Creates and returns a Named tuple with all variables that match the expected named outputs. |
create_task_output() | |
extract_obj_name() | Generates a shortened name, without the module information. |
flyte_entity_call_handler() | This function is the call handler for tasks, workflows, and launch plans (which redirect to the underlying workflow). |
get_primitive_val() | |
resolve_attr_path_in_dict() | |
resolve_attr_path_in_pb_struct() | Resolves the protobuf struct (e.g. dataclass) with attribute path. |
resolve_attr_path_in_promise() | Resolves the attribute path in a promise and returns a new promise with the resolved value. |
resolve_attr_path_recursively() | This function resolves the attribute path in a nested structure recursively. |
to_binding() | |
translate_inputs_to_literals() | Extracts Literals from a collection of either Python native values (which would be converted into Flyte literals) or Promises (the literals in which would just get extracted). |
translate_inputs_to_native() | |
Variables
Property | Type | Description |
---|---|---|
T | TypeVar | |
Methods
async_flyte_entity_call_handler()
def async_flyte_entity_call_handler(
    entity: SupportsNodeCreation,
    args,
    kwargs,
) -> Union[Tuple[Promise], Promise, VoidPromise, Tuple, None]
This is a limited async version of the main call handler.
Parameter | Type |
---|---|
entity | SupportsNodeCreation |
args | *args |
kwargs | **kwargs |
binding_data_from_python_std()
def binding_data_from_python_std(
    ctx: _flyte_context.FlyteContext,
    expected_literal_type: _type_models.LiteralType,
    t_value: Any,
    t_value_type: typing.Type[T],
    nodes: List[Node],
) -> _literals_models.BindingData
Parameter | Type |
---|---|
ctx | _flyte_context.FlyteContext |
expected_literal_type | _type_models.LiteralType |
t_value | Any |
t_value_type | typing.Type[T] |
nodes | List[Node] |
binding_from_python_std()
def binding_from_python_std(
    ctx: _flyte_context.FlyteContext,
    var_name: str,
    expected_literal_type: _type_models.LiteralType,
    t_value: Any,
    t_value_type: type,
) -> Tuple[_literals_models.Binding, List[Node]]
Parameter | Type |
---|---|
ctx | _flyte_context.FlyteContext |
var_name | str |
expected_literal_type | _type_models.LiteralType |
t_value | Any |
t_value_type | type |
create_and_link_node()
def create_and_link_node(
    ctx: FlyteContext,
    entity: SupportsNodeCreation,
    overridden_interface: Optional[Interface],
    add_node_to_compilation_state: bool,
    node_id: str,
    kwargs,
) -> Optional[Union[Tuple[Promise], Promise, VoidPromise]]
This method is used to generate a node with bindings within a flytekit workflow. This is useful for traversing the workflow with the regular Python interpreter and generating nodes and promises whenever an execution is encountered.
Parameter | Type |
---|---|
ctx | FlyteContext |
entity | SupportsNodeCreation |
overridden_interface | Optional[Interface] |
add_node_to_compilation_state | bool |
node_id | str |
kwargs | **kwargs |
create_and_link_node_from_remote()
def create_and_link_node_from_remote(
    ctx: FlyteContext,
    entity: HasFlyteInterface,
    overridden_interface: Optional[_interface_models.TypedInterface],
    add_node_to_compilation_state: bool,
    node_id: str,
    _inputs_not_allowed: Optional[Set[str]],
    _ignorable_inputs: Optional[Set[str]],
    kwargs,
) -> Optional[Union[Tuple[Promise], Promise, VoidPromise]]
This method is used to generate a node with bindings especially when using remote entities, like FlyteWorkflow, FlyteTask and FlyteLaunchplan.
This method is kept separate from the similarly named method create_and_link_node because remote entities have to be handled differently. The major difference arises from the fact that remote entities do not have a Python interface, so all comparisons need to happen using the Literals.
Parameter | Type |
---|---|
ctx | FlyteContext |
entity | HasFlyteInterface |
overridden_interface | Optional[_interface_models.TypedInterface] |
add_node_to_compilation_state | bool |
node_id | str |
_inputs_not_allowed | Optional[Set[str]] |
_ignorable_inputs | Optional[Set[str]] |
kwargs | **kwargs |
create_native_named_tuple()
def create_native_named_tuple(
    ctx: FlyteContext,
    promises: Union[Tuple[Promise], Promise, VoidPromise, None],
    entity_interface: Interface,
) -> Optional[Tuple]
Creates and returns a named tuple with all variables that match the expected named outputs. This makes it possible to run things locally and expect a more native behavior, i.e. to address elements of a named tuple by name.
Parameter | Type |
---|---|
ctx | FlyteContext |
promises | Union[Tuple[Promise], Promise, VoidPromise, None] |
entity_interface | Interface |
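To make "address elements of a named tuple by name" concrete, here is a minimal stand-alone sketch using only the standard library. `build_named_outputs` and the output names are hypothetical, chosen for illustration; this is not flytekit's implementation and does not involve Promise objects or a FlyteContext.

```python
from collections import namedtuple
from typing import Any, Dict, Tuple


def build_named_outputs(tuple_name: str, outputs: Dict[str, Any]) -> Tuple:
    """Pack output values into a named tuple keyed by output name.

    Hypothetical helper for illustration only.
    """
    # Field order follows the insertion order of the outputs dict.
    output_type = namedtuple(tuple_name, list(outputs.keys()))
    return output_type(**outputs)


result = build_named_outputs("TrainOutputs", {"accuracy": 0.93, "epochs": 5})
print(result.accuracy)  # 0.93, accessed by name
print(result[1])        # 5, positional access still works
```

The result behaves like an ordinary tuple, so existing positional unpacking keeps working while named access becomes available.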
create_task_output()
def create_task_output(
    promises: Optional[Union[List[Promise], Promise]],
    entity_interface: Optional[Interface],
) -> Optional[Union[Tuple[Promise], Promise]]
Parameter | Type |
---|---|
promises | Optional[Union[List[Promise], Promise]] |
entity_interface | Optional[Interface] |
extract_obj_name()
def extract_obj_name(
    name: str,
) -> str
Generates a shortened name, without the module information. Useful for node names etc. Only extracts the final object information, which is often separated by a dot (.) in the Python fully qualified notation.
Parameter | Type |
---|---|
name | str |
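The shortening described above can be sketched in a few lines. `shorten_name` is a hypothetical stand-in written for this example; it is not flytekit's implementation, and its handling of empty input is an assumption.

```python
def shorten_name(name: str) -> str:
    """Return the last dot-separated component of a qualified name.

    Hypothetical stand-in for illustration only.
    """
    if not name:
        # Assumption: empty input maps to an empty string.
        return ""
    # rsplit with maxsplit=1 splits only at the final dot.
    return name.rsplit(".", 1)[-1]


print(shorten_name("my_project.workflows.train_model"))  # train_model
print(shorten_name("train_model"))                       # train_model
```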
flyte_entity_call_handler()
def flyte_entity_call_handler(
    entity: SupportsNodeCreation,
    args,
    kwargs,
) -> Union[Tuple[Promise], Promise, VoidPromise, Tuple, None]
This function is the call handler for tasks, workflows, and launch plans (which redirect to the underlying workflow). The logic is the same for all three, but we did not want to create a base class, hence this separate method. When one of these entities is called, there are three things we may do:
1. Compilation Mode - this happens when the function is called as part of a workflow (potentially dynamic task?). Instead of running the user function, produce promise objects and create a node.
2. Workflow Execution Mode - when a workflow is being run locally. Even though workflows are functions and everything should be able to be passed through naturally, we’ll want to wrap output values of the function into objects, so that potential .with_cpu or other ancillary functions can be attached and do nothing. Subsequent tasks will have to know how to unwrap these. If by chance a non-Flyte task uses a task output as an input, things probably will fail pretty obviously.
3. Start a local execution - this means that we’re not already in a local workflow execution, which means that we should expect inputs to be native Python values and that we should return Python native values.
Parameter | Type |
---|---|
entity | SupportsNodeCreation |
args | *args |
kwargs | **kwargs |
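The three modes described above can be pictured with a small dispatch sketch. Everything here (`Mode`, `FakePromise`, `call_handler`) is a hypothetical stand-in built on the standard library; the real handler derives the mode from the Flyte context and works with flytekit's own Promise and Node types.

```python
from enum import Enum, auto
from typing import Any, Callable


class Mode(Enum):
    COMPILATION = auto()
    LOCAL_WORKFLOW_EXECUTION = auto()
    NOT_IN_WORKFLOW = auto()


class FakePromise:
    """Hypothetical wrapper: holds either a value or the name of a node."""

    def __init__(self, value: Any = None, node_name: str = ""):
        self.value = value
        self.node_name = node_name


def call_handler(mode: Mode, fn: Callable[..., Any], **kwargs: Any) -> Any:
    if mode is Mode.COMPILATION:
        # Do not run the user function; record a node and hand back a promise.
        return FakePromise(node_name=fn.__name__)
    if mode is Mode.LOCAL_WORKFLOW_EXECUTION:
        # Unwrap promise inputs, run the function, wrap the output again.
        native = {
            k: v.value if isinstance(v, FakePromise) else v
            for k, v in kwargs.items()
        }
        return FakePromise(value=fn(**native))
    # Not inside a workflow: native values in, native values out.
    return fn(**kwargs)


def double(x: int) -> int:
    return 2 * x


print(call_handler(Mode.NOT_IN_WORKFLOW, double, x=3))  # 6
```

In the local workflow mode the output stays wrapped, which is why downstream calls in this sketch unwrap `FakePromise` inputs before invoking the function.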
get_primitive_val()
def get_primitive_val(
    prim: Primitive,
) -> Any
Parameter | Type |
---|---|
prim | Primitive |
resolve_attr_path_in_dict()
def resolve_attr_path_in_dict(
    d: dict,
    attr_path: List[Union[str, int]],
) -> Any
Parameter | Type |
---|---|
d | dict |
attr_path | List[Union[str, int]] |
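The signature suggests walking a nested structure one path element at a time, with string elements used as dictionary keys and integer elements as list indices. The sketch below illustrates that idea only; `walk_attr_path` is a hypothetical helper, and its error behavior (a plain KeyError or IndexError on a bad path) is an assumption rather than flytekit's documented behavior.

```python
from typing import Any, List, Union


def walk_attr_path(d: dict, attr_path: List[Union[str, int]]) -> Any:
    """Follow attr_path through nested dicts and lists.

    Hypothetical helper for illustration only.
    """
    current: Any = d
    for step in attr_path:
        # str steps index into dicts, int steps index into lists.
        current = current[step]
    return current


nested = {"model": {"layers": [{"units": 64}, {"units": 10}]}}
print(walk_attr_path(nested, ["model", "layers", 1, "units"]))  # 10
```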
resolve_attr_path_in_pb_struct()
def resolve_attr_path_in_pb_struct(
    st: _struct.Struct,
    attr_path: List[Union[str, int]],
) -> Union[_struct.Struct, _struct.ListValue]
Resolves the protobuf struct (e.g. dataclass) with attribute path.
Note that the return type can be google.protobuf.struct_pb2.Struct or google.protobuf.struct_pb2.ListValue.
Parameter | Type |
---|---|
st | _struct.Struct |
attr_path | List[Union[str, int]] |
resolve_attr_path_in_promise()
def resolve_attr_path_in_promise(
    p: Promise,
) -> Promise
Resolves the attribute path in a promise and returns a new promise with the resolved value. This is for local execution only. In remote execution, the attribute path is resolved in flytepropeller.
Parameter | Type |
---|---|
p | Promise |
resolve_attr_path_recursively()
def resolve_attr_path_recursively(
    v: Any,
) -> Any
This function resolves the attribute path in a nested structure recursively.
Parameter | Type |
---|---|
v | Any |
to_binding()
def to_binding(
    p: Promise,
) -> _literals_models.Binding
Parameter | Type |
---|---|
p | Promise |
translate_inputs_to_literals()
def translate_inputs_to_literals(
    ctx: FlyteContext,
    incoming_values: Dict[str, Any],
    flyte_interface_types: Dict[str, _interface_models.Variable],
    native_types: Dict[str, type],
) -> Dict[str, _literals_models.Literal]
The point of this function is to extract Literals from a collection of either Python native values (which are converted into Flyte literals) or Promises (from which the literals are simply extracted).
When calling a task inside a workflow, a user might do something like this:
def my_wf(in1: int) -> int:
    a = task_1(in1=in1)
    b = task_2(in1=5, in2=a)
    return b
If this is the case, when task_2 is called in local workflow execution, we’ll need to translate the Python native literal 5 to a Flyte literal.
More interesting is this:
def my_wf(in1: int, in2: int) -> int:
    a = task_1(in1=in1)
    b = task_2(in1=5, in2=[a, in2])
    return b
Here, in task_2, during execution we’d have a list of Promises. We have to make sure to give task_2 a Flyte LiteralCollection (Flyte’s name for list), not a Python list of Flyte literals.
This helper function is used both when sorting out inputs to a task and when sorting out outputs of a function.
Parameter | Type |
---|---|
ctx | FlyteContext |
incoming_values | Dict[str, Any] |
flyte_interface_types | Dict[str, _interface_models.Variable] |
native_types | Dict[str, type] |
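The distinction between "a collection literal" and "a Python list of literals" can be shown with a toy model. The classes below (`FakeLiteral`, `FakeLiteralCollection`, `FakePromise`) and the `translate` function are hypothetical stand-ins; the real function relies on flytekit's type engine and the declared interface types, which this sketch ignores.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class FakeLiteral:
    value: Any


@dataclass
class FakeLiteralCollection:
    literals: list


@dataclass
class FakePromise:
    val: FakeLiteral  # a ready promise already carries a literal


def to_literal(v: Any) -> Any:
    if isinstance(v, FakePromise):
        # Promise: the literal is simply extracted.
        return v.val
    if isinstance(v, list):
        # List: convert each element, then wrap in ONE collection literal.
        return FakeLiteralCollection([to_literal(item) for item in v])
    # Native value: convert to a literal.
    return FakeLiteral(v)


def translate(incoming_values: Dict[str, Any]) -> Dict[str, Any]:
    return {name: to_literal(v) for name, v in incoming_values.items()}


a = FakePromise(FakeLiteral(7))
print(translate({"in1": 5, "in2": [a, 3]}))
```

The mixed list `[a, 3]` becomes a single `FakeLiteralCollection` holding two literals, mirroring the second workflow example above.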
translate_inputs_to_native()
def translate_inputs_to_native(
    ctx: FlyteContext,
    incoming_values: Dict[str, Any],
    flyte_interface_types: Dict[str, _interface_models.Variable],
) -> Dict[str, _literals_models.Literal]
Parameter | Type |
---|---|
ctx | FlyteContext |
incoming_values | Dict[str, Any] |
flyte_interface_types | Dict[str, _interface_models.Variable] |
flytekit.core.promise.ComparisonExpression
ComparisonExpression refers to an expression of the form (lhs operator rhs), where lhs and rhs are operands and operator can be any comparison operator such as <, >, <=, >=, ==, or !=.
class ComparisonExpression(
    lhs: Union['Promise', Any],
    op: ComparisonOps,
    rhs: Union['Promise', Any],
)
Parameter | Type |
---|---|
lhs | Union['Promise', Any] |
op | ComparisonOps |
rhs | Union['Promise', Any] |
Methods
Method | Description |
---|---|
eval() | |
eval()
def eval()
Properties
Property | Type | Description |
---|---|---|
lhs | | |
op | | |
rhs | | |
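As an illustration of the (lhs operator rhs) form, the following sketch evaluates a comparison over plain Python values. `Op` and `SimpleComparison` are hypothetical names for this example; flytekit's ComparisonExpression uses its own ComparisonOps type and accepts Promise operands, neither of which is modeled here.

```python
import operator
from enum import Enum
from typing import Any


class Op(Enum):
    EQ = "=="
    NE = "!="
    GT = ">"
    GE = ">="
    LT = "<"
    LE = "<="


# Map each operator to the matching function from the operator module.
_OPS = {
    Op.EQ: operator.eq,
    Op.NE: operator.ne,
    Op.GT: operator.gt,
    Op.GE: operator.ge,
    Op.LT: operator.lt,
    Op.LE: operator.le,
}


class SimpleComparison:
    """Hypothetical (lhs op rhs) expression over native values."""

    def __init__(self, lhs: Any, op: Op, rhs: Any):
        self.lhs = lhs
        self.op = op
        self.rhs = rhs

    def eval(self) -> bool:
        return _OPS[self.op](self.lhs, self.rhs)


print(SimpleComparison(3, Op.LT, 5).eval())  # True
print(SimpleComparison(3, Op.EQ, 5).eval())  # False
```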
flytekit.core.promise.ConjunctionExpression
A ConjunctionExpression is an expression of the form (A and B) or (A or B), where A and B are two expressions (comparisons or conjunctions) and "and" and "or" are logical truth operators.
A ConjunctionExpression evaluates to True or False depending on the logical operator and the truth values of the expressions A and B.
class ConjunctionExpression(
    lhs: Union[ComparisonExpression, 'ConjunctionExpression'],
    op: ConjunctionOps,
    rhs: Union[ComparisonExpression, 'ConjunctionExpression'],
)
Parameter | Type |
---|---|
lhs | Union[ComparisonExpression, 'ConjunctionExpression'] |
op | ConjunctionOps |
rhs | Union[ComparisonExpression, 'ConjunctionExpression'] |
Methods
Method | Description |
---|---|
eval() | |
eval()
def eval()
Properties
Property | Type | Description |
---|---|---|
lhs | | |
op | | |
rhs | | |
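Because both operands may themselves be conjunctions, such expressions form a tree that is evaluated recursively. The sketch below shows that structure with hypothetical classes (`Leaf`, `SimpleConjunction`) and string operators; flytekit uses its own ConjunctionOps type instead.

```python
from typing import Callable, Union


class Leaf:
    """Hypothetical leaf expression wrapping a zero-argument predicate."""

    def __init__(self, fn: Callable[[], bool]):
        self._fn = fn

    def eval(self) -> bool:
        return self._fn()


class SimpleConjunction:
    """Hypothetical (A and B) / (A or B) expression."""

    def __init__(
        self,
        lhs: Union[Leaf, "SimpleConjunction"],
        op: str,
        rhs: Union[Leaf, "SimpleConjunction"],
    ):
        if op not in ("and", "or"):
            raise ValueError(f"unsupported operator: {op}")
        self.lhs = lhs
        self.op = op
        self.rhs = rhs

    def eval(self) -> bool:
        # Each side is evaluated recursively, so nesting works naturally.
        if self.op == "and":
            return self.lhs.eval() and self.rhs.eval()
        return self.lhs.eval() or self.rhs.eval()


a = Leaf(lambda: 3 < 5)  # True
b = Leaf(lambda: 2 > 4)  # False
print(SimpleConjunction(a, "and", b).eval())  # False
print(SimpleConjunction(a, "or", b).eval())   # True
```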
flytekit.core.promise.HasFlyteInterface
Base class for protocol classes.
Protocol classes are defined as:
class Proto(Protocol):
    def meth(self) -> int:
        ...
Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).
For example:
class C:
    def meth(self) -> int:
        return 0
def func(x: Proto) -> int:
    return x.meth()
func(C())  # Passes static type check
See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic; they are defined as:
class GenProto[T](Protocol):
    def meth(self) -> T:
        ...
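The note about @typing.runtime_checkable checking only attribute presence can be demonstrated with the standard library alone. The class names below are made up for this example and are unrelated to flytekit's protocol classes.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class HasName(Protocol):
    def name(self) -> str:
        ...


class Task:
    def name(self) -> str:
        return "t1"


class Nameless:
    pass


class WrongSignature:
    # Same attribute name, but a different signature and return type.
    def name(self, extra: int) -> int:
        return extra


print(isinstance(Task(), HasName))            # True
print(isinstance(Nameless(), HasName))        # False
print(isinstance(WrongSignature(), HasName))  # True: signatures are ignored
```

The last line is the reason such checks are called simple-minded: a static type checker would reject `WrongSignature`, but the runtime check accepts it.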
class HasFlyteInterface(
    args,
    kwargs,
)
Parameter | Type |
---|---|
args | *args |
kwargs | **kwargs |
Methods
Method | Description |
---|---|
construct_node_metadata() | |
construct_node_metadata()
def construct_node_metadata()
Properties
Property | Type | Description |
---|---|---|
interface | | |
name | | |
flytekit.core.promise.LocallyExecutable
Base class for protocol classes.
class LocallyExecutable(
    args,
    kwargs,
)
Parameter | Type |
---|---|
args | *args |
kwargs | **kwargs |
Methods
Method | Description |
---|---|
local_execute() | |
local_execution_mode() | |
local_execute()
def local_execute(
    ctx: FlyteContext,
    kwargs,
) -> Union[Tuple[Promise], Promise, VoidPromise, None]
Parameter | Type |
---|---|
ctx | FlyteContext |
kwargs | **kwargs |
local_execution_mode()
def local_execution_mode()
flytekit.core.promise.NodeOutput
class NodeOutput(
    node: Node,
    var: str,
    attr_path: Optional[List[Union[str, int]]],
)
Parameter | Type |
---|---|
node | Node |
var | str |
attr_path | Optional[List[Union[str, int]]] |
Methods
Method | Description |
---|---|
deepcopy() | |
from_flyte_idl() | |
serialize_to_string() | |
short_string() | Return type: Text. |
to_flyte_idl() | Return type: flyteidl.core.types.OutputReference. |
verbose_string() | Return type: Text. |
with_attr() | |
deepcopy()
def deepcopy()
from_flyte_idl()
def from_flyte_idl(
    pb2_object,
) -> OutputReference
Parameter | Type |
---|---|
pb2_object | |
serialize_to_string()
def serialize_to_string()
short_string()
def short_string()
Return type: Text
to_flyte_idl()
def to_flyte_idl()
Return type: flyteidl.core.types.OutputReference
verbose_string()
def verbose_string()
Return type: Text
with_attr()
def with_attr(
    key,
) -> NodeOutput
Parameter | Type |
---|---|
key | |
Properties
Property | Type | Description |
---|---|---|
attr_path | List[Union[str, int]] | The attribute path the promise will be resolved with. |
is_empty | | |
node | | Return Node object. |
node_id | Text | Override the underlying node_id property to refer to the Node’s id. This is to make sure that overriding node IDs from with_overrides gets serialized correctly. |
var | Text | Variable name must refer to an output variable for the node. |
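One way to picture how attr_path and with_attr() relate is a reference object that accumulates keys as attributes or indices are accessed. The sketch below assumes that with_attr returns a new reference with the key appended and leaves the original untouched; that behavior, along with the `OutputRef` class itself, is an assumption made for illustration and is not taken from this page.

```python
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass(frozen=True)
class OutputRef:
    """Hypothetical reference to one output variable of a node."""

    node_id: str
    var: str
    attr_path: Tuple[Union[str, int], ...] = ()

    def with_attr(self, key: Union[str, int]) -> "OutputRef":
        # Assumption: return a new reference rather than mutating in place.
        return OutputRef(self.node_id, self.var, self.attr_path + (key,))


ref = OutputRef("n0", "o0")
deep = ref.with_attr("config").with_attr(0)
print(deep.attr_path)  # ('config', 0)
print(ref.attr_path)   # ()
```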
flytekit.core.promise.Promise
This object is a wrapper and exists for three main reasons. Let’s assume we’re dealing with a task like:
@task
def t1() -> (int, str): ...
1. Handling the duality between compilation and local execution. When the task function is run in local execution mode inside a workflow function, a Python integer and string are produced. When the task is being compiled as part of the workflow, the task call creates a Node instead, and the task returns two Promise objects that point to that Node.
2. One needs to be able to call:
x = t1().with_overrides(...)
If the task returned a plain integer or an (int, str) tuple like t1 above, calling with_overrides on the result would throw an error. This Promise object adds that method.
3. Assorted handling for conditionals.
class Promise(
    var: str,
    val: Union[NodeOutput, _literals_models.Literal],
    type: typing.Optional[_type_models.LiteralType],
)
Parameter | Type |
---|---|
var | str |
val | Union[NodeOutput, _literals_models.Literal] |
type | typing.Optional[_type_models.LiteralType] |
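The second reason given for the wrapper (plain return values have no with_overrides method) is easy to see in isolation. `WrappedOutput` below is a hypothetical class for this example only; it stores overrides in a dict and does not reflect how flytekit applies them to a node.

```python
from typing import Any, Dict


class WrappedOutput:
    """Hypothetical wrapper that lets overrides be chained onto a result."""

    def __init__(self, value: Any):
        self.value = value
        self.overrides: Dict[str, Any] = {}

    def with_overrides(self, **kwargs: Any) -> "WrappedOutput":
        self.overrides.update(kwargs)
        return self  # returning self allows chaining


plain = 42
print(hasattr(plain, "with_overrides"))  # False: an int cannot be overridden

wrapped = WrappedOutput(42).with_overrides(retries=3)
print(wrapped.value, wrapped.overrides)  # 42 {'retries': 3}
```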
Methods
Method | Description |
---|---|
deepcopy() | |
eval() | |
is_() | |
is_false() | |
is_none() | |
is_true() | |
with_overrides() | |
with_var() | |
deepcopy()
def deepcopy()
eval()
def eval()
is_()
def is_(
    v: bool,
) -> ComparisonExpression
Parameter | Type |
---|---|
v | bool |
is_false()
def is_false()
is_none()
def is_none()
is_true()
def is_true()
with_overrides()
def with_overrides(
    node_name: Optional[str],
    aliases: Optional[Dict[str, str]],
    requests: Optional[Resources],
    limits: Optional[Resources],
    timeout: Optional[Union[int, datetime.timedelta, object]],
    retries: Optional[int],
    interruptible: Optional[bool],
    name: Optional[str],
    task_config: Optional[Any],
    container_image: Optional[str],
    accelerator: Optional[BaseAccelerator],
    cache: Optional[bool],
    cache_version: Optional[str],
    cache_serialize: Optional[bool],
    args,
    kwargs,
)
Parameter | Type |
---|---|
node_name | Optional[str] |
aliases | Optional[Dict[str, str]] |
requests | Optional[Resources] |
limits | Optional[Resources] |
timeout | Optional[Union[int, datetime.timedelta, object]] |
retries | Optional[int] |
interruptible | Optional[bool] |
name | Optional[str] |
task_config | Optional[Any] |
container_image | Optional[str] |
accelerator | Optional[BaseAccelerator] |
cache | Optional[bool] |
cache_version | Optional[str] |
cache_serialize | Optional[bool] |
args | *args |
kwargs | **kwargs |
with_var()
def with_var(
    new_var: str,
) -> Promise
Parameter | Type |
---|---|
new_var | str |
Properties
Property | Type | Description |
---|---|---|
attr_path | List[Union[str, int]] | The attribute path the promise will be resolved with. |
is_ready | | Returns whether the Promise is READY (it is not a reference and the val is actually ready). Usage: p = Promise(…) … if p.is_ready: … else: … |
ref | | If the promise is NOT READY / incomplete, then it maps to the origin node that owns the promise. |
val | | If the promise is ready, then this holds the actual evaluated value in Flyte’s type system. |
var | | Name of the variable bound with this promise. |
flytekit.core.promise.SupportsNodeCreation
Base class for protocol classes.
class SupportsNodeCreation(
    args,
    kwargs,
)
Parameter | Type |
---|---|
args | *args |
kwargs | **kwargs |
Methods
Method | Description |
---|---|
construct_node_metadata() | |
construct_node_metadata()
def construct_node_metadata()
Properties
Property | Type | Description |
---|---|---|
name | | |
python_interface | | |
flytekit.core.promise.VoidPromise
This object is returned for tasks that do not return any outputs (declared interface is empty). VoidPromise cannot be interacted with and does not allow comparisons or any other operations.
class VoidPromise(
    task_name: str,
    ref: Optional[NodeOutput],
)
Parameter | Type |
---|---|
task_name | str |
ref | Optional[NodeOutput] |
Methods
Method | Description |
---|---|
runs_before() | This is a placeholder and should do nothing. |
with_overrides() | |
runs_before()
def runs_before(
    args,
    kwargs,
)
This is a placeholder and should do nothing. It is only here to enable local execution of workflows where a task returns nothing.
Parameter | Type |
---|---|
args | *args |
kwargs | **kwargs |
with_overrides()
def with_overrides(
    args,
    kwargs,
)
Parameter | Type |
---|---|
args | *args |
kwargs | **kwargs |
Properties
Property | Type | Description |
---|---|---|
ref | | |
task_name | | |