flytekit.core.gate
Directory
Classes
| Class | Description |
| --- | --- |
| ExecutionState | This is the context that is active when executing a task or a local workflow. |
| FlyteContext | This is an internal-facing context object, that most users will not have to deal with. |
| FlyteContextManager | FlyteContextManager manages the execution context within Flytekit. |
| Gate | A node type that waits for user input before proceeding with a workflow. |
| LiteralType | None. |
| Promise | This object is a wrapper and exists for three main reasons. |
| Scalar | None. |
| TypeEngine | Core Extensible TypeEngine of Flytekit. |
| VoidPromise | This object is returned for tasks that do not return any outputs (declared interface is empty). |
Errors
| Exception | Description |
| --- | --- |
| FlyteDisapprovalException | Assertion failed. |
flytekit.core.gate.ExecutionState
This is the context that is active when executing a task or a local workflow. It carries the state needed to execute, such as temporary directories and the ExecutionParameters that are passed to the user.
Attributes:
mode (ExecutionState.Mode): Defines the context in which the task is executed (local, hosted, etc.).
working_dir (os.PathLike): Specifies the remote, external directory where inputs, outputs and other protobufs are uploaded.
engine_dir (os.PathLike):
branch_eval_mode (Optional[BranchEvalMode]): Used to determine whether a branch node should execute.
user_space_params (Optional[ExecutionParameters]): Provides run-time, user-centric context such as a statsd handler, a logging handler, the current execution id and a working directory.
def ExecutionState(
working_dir: Union[os.PathLike, str],
mode: Optional[ExecutionState.Mode],
engine_dir: Optional[Union[os.PathLike, str]],
branch_eval_mode: Optional[BranchEvalMode],
user_space_params: Optional[ExecutionParameters],
):
| Parameter | Type |
| --- | --- |
| working_dir | Union[os.PathLike, str] |
| mode | Optional[ExecutionState.Mode] |
| engine_dir | Optional[Union[os.PathLike, str]] |
| branch_eval_mode | Optional[BranchEvalMode] |
| user_space_params | Optional[ExecutionParameters] |
Methods
| Method | Description |
| --- | --- |
| branch_complete() | Indicates that we are within a conditional / ifelse block and the active branch is not done. |
| is_local_execution() | None. |
| take_branch() | Indicates that we are within an if-else block and the current branch has evaluated to true. |
| with_params() | Produces a copy of the current execution state and overrides the copy’s parameters with passed parameter values. |
branch_complete()
Indicates that we are within a conditional / ifelse block and the active branch is not done.
Defaults to SKIPPED.
is_local_execution()
take_branch()
Indicates that we are within an if-else block and the current branch has evaluated to true.
Useful only in local execution mode.
with_params()
def with_params(
working_dir: Optional[os.PathLike],
mode: Optional[Mode],
engine_dir: Optional[os.PathLike],
branch_eval_mode: Optional[BranchEvalMode],
user_space_params: Optional[ExecutionParameters],
):
Produces a copy of the current execution state and overrides the copy’s parameters with passed parameter values.
| Parameter | Type |
| --- | --- |
| working_dir | Optional[os.PathLike] |
| mode | Optional[Mode] |
| engine_dir | Optional[os.PathLike] |
| branch_eval_mode | Optional[BranchEvalMode] |
| user_space_params | Optional[ExecutionParameters] |
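The copy-and-override behavior described for with_params() can be illustrated with a small standalone sketch. SketchState below is a hypothetical stand-in, not flytekit's ExecutionState, and treating None as "keep the current value" is an assumption made for the illustration.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class SketchState:
    """Hypothetical stand-in for an execution state; not flytekit's class."""

    working_dir: str
    mode: Optional[str] = None
    engine_dir: Optional[str] = None

    def with_params(
        self,
        working_dir: Optional[str] = None,
        mode: Optional[str] = None,
        engine_dir: Optional[str] = None,
    ) -> "SketchState":
        # Assumption: passing None means "keep the current value".
        candidates = {
            "working_dir": working_dir,
            "mode": mode,
            "engine_dir": engine_dir,
        }
        overrides = {k: v for k, v in candidates.items() if v is not None}
        # dataclasses.replace builds a new instance; the original is untouched.
        return replace(self, **overrides)


base = SketchState(working_dir="/tmp/run", mode="local")
copy = base.with_params(mode="hosted")
```

Here base keeps its original mode while copy carries the override, which is the property that makes it safe to derive a modified state without disturbing the one currently in use.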
flytekit.core.gate.FlyteContext
This is an internal-facing context object that most users will not have to deal with. It is essentially a globally available grab bag of settings and objects that allows flytekit to do things like convert complex types, run and compile workflows, serialize Flyte entities, etc.
Even though this object has a current_context function on it, it should not be called directly. Please use the flytekit.FlyteContextManager object instead.
Please do not confuse this object with the flytekit.ExecutionParameters object.
def FlyteContext(
file_access: FileAccessProvider,
level: int,
flyte_client: Optional['friendly_client.SynchronousFlyteClient'],
compilation_state: Optional[CompilationState],
execution_state: Optional[ExecutionState],
serialization_settings: Optional[SerializationSettings],
in_a_condition: bool,
origin_stackframe: Optional[traceback.FrameSummary],
output_metadata_tracker: Optional[OutputMetadataTracker],
worker_queue: Optional[Controller],
):
| Parameter | Type |
| --- | --- |
| file_access | FileAccessProvider |
| level | int |
| flyte_client | Optional['friendly_client.SynchronousFlyteClient'] |
| compilation_state | Optional[CompilationState] |
| execution_state | Optional[ExecutionState] |
| serialization_settings | Optional[SerializationSettings] |
| in_a_condition | bool |
| origin_stackframe | Optional[traceback.FrameSummary] |
| output_metadata_tracker | Optional[OutputMetadataTracker] |
| worker_queue | Optional[Controller] |
Methods
current_context()
This method exists only to maintain backwards compatibility. Please use FlyteContextManager.current_context() instead.
Users of flytekit should be careful not to confuse the object returned from this function with the one returned by flytekit.current_context.
enter_conditional_section()
def enter_conditional_section()
get_deck()
Returns the deck that was created as part of the last execution.
The return value depends on the execution environment. In a notebook, the return value is compatible with IPython.display and should be rendered in the notebook.
.. code-block:: python

    import flytekit

    with flytekit.new_context() as ctx:
        my_task(…)
    ctx.get_deck()

Or, if you wish to display the deck explicitly:

.. code-block:: python

    from IPython import display
    display(ctx.get_deck())
get_origin_stackframe_repr()
def get_origin_stackframe_repr()
new_builder()
new_compilation_state()
def new_compilation_state(
prefix: str,
):
Creates and returns a default compilation state. For most of the code this should be the entrypoint of compilation; otherwise the code should always use with_compilation_state.
| Parameter | Type |
| --- | --- |
| prefix | str |
new_execution_state()
def new_execution_state(
working_dir: Optional[os.PathLike],
):
Creates and returns a new default execution state. This should be used at the entrypoint of execution; in all other cases it is preferable to use with_execution_state.
| Parameter | Type |
| --- | --- |
| working_dir | Optional[os.PathLike] |
set_stackframe()
def set_stackframe(
s: traceback.FrameSummary,
):
| Parameter | Type |
| --- | --- |
| s | traceback.FrameSummary |
with_client()
def with_client(
c: SynchronousFlyteClient,
):
| Parameter | Type |
| --- | --- |
| c | SynchronousFlyteClient |
with_compilation_state()
def with_compilation_state(
c: CompilationState,
):
| Parameter | Type |
| --- | --- |
| c | CompilationState |
with_execution_state()
def with_execution_state(
es: ExecutionState,
):
| Parameter | Type |
| --- | --- |
| es | ExecutionState |
with_file_access()
def with_file_access(
fa: FileAccessProvider,
):
| Parameter | Type |
| --- | --- |
| fa | FileAccessProvider |
with_new_compilation_state()
def with_new_compilation_state()
with_output_metadata_tracker()
def with_output_metadata_tracker(
t: OutputMetadataTracker,
):
| Parameter | Type |
| --- | --- |
| t | OutputMetadataTracker |
with_serialization_settings()
def with_serialization_settings(
ss: SerializationSettings,
):
| Parameter | Type |
| --- | --- |
| ss | SerializationSettings |
with_worker_queue()
def with_worker_queue(
wq: Controller,
):
| Parameter | Type |
| --- | --- |
| wq | Controller |
Properties
| Property | Type | Description |
| --- | --- | --- |
| user_space_params | | |
flytekit.core.gate.FlyteContextManager
FlyteContextManager manages the execution context within Flytekit. It holds the global state of either compilation or execution. It is not thread-safe and can currently only be run as a single-threaded application.
Contexts within Flytekit are useful for managing compilation state and execution state. Refer to CompilationState and ExecutionState for more information. FlyteContextManager provides a singleton stack to manage these contexts.
Typical usage is
.. code-block:: python

    from flytekit import FlyteContextManager

    FlyteContextManager.initialize()
    # o is a FlyteContext.Builder
    with FlyteContextManager.with_context(o) as ctx:
        pass

If required (though not recommended), you can use FlyteContextManager.push_context(), but a matching FlyteContextManager.pop_context() must then be called.
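To make the "singleton stack" idea concrete, here is a minimal standalone sketch of a class-level stack with a context-manager entry point. SketchContextManager and its plain-dict contexts are hypothetical; this is not flytekit's implementation, only an illustration of why the with-statement form is preferred over manual push and pop.

```python
from contextlib import contextmanager
from typing import Iterator, List


class SketchContextManager:
    """Hypothetical singleton-style stack of contexts (plain dicts here)."""

    _stack: List[dict] = []

    @classmethod
    def initialize(cls) -> None:
        # Erase everything and start again from a single root context.
        cls._stack = [{"level": 0}]

    @classmethod
    def current_context(cls) -> dict:
        return cls._stack[-1]

    @classmethod
    def push_context(cls, ctx: dict) -> dict:
        cls._stack.append(ctx)
        return ctx

    @classmethod
    def pop_context(cls) -> dict:
        return cls._stack.pop()

    @classmethod
    def size(cls) -> int:
        return len(cls._stack)

    @classmethod
    @contextmanager
    def with_context(cls, ctx: dict) -> Iterator[dict]:
        # Pairing push and pop in try/finally guarantees the pop happens,
        # which a hand-written push/pop pair can easily forget.
        cls.push_context(ctx)
        try:
            yield ctx
        finally:
            cls.pop_context()


SketchContextManager.initialize()
with SketchContextManager.with_context({"level": 1}) as ctx:
    inside = SketchContextManager.size()
after = SketchContextManager.size()
```

Because the stack lives on the class, every caller sees the same state, which is also why a structure like this is not safe to share across threads.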
Methods
add_signal_handler()
def add_signal_handler(
handler: typing.Callable[[int, FrameType], typing.Any],
):
| Parameter | Type |
| --- | --- |
| handler | typing.Callable[[int, FrameType], typing.Any] |
current_context()
get_origin_stackframe()
def get_origin_stackframe(
limit,
):
initialize()
Re-initializes the context and erases the entire context
pop_context()
push_context()
def push_context(
ctx: FlyteContext,
f: Optional[traceback.FrameSummary],
):
| Parameter | Type |
| --- | --- |
| ctx | FlyteContext |
| f | Optional[traceback.FrameSummary] |
size()
with_context()
def with_context(
b: FlyteContext.Builder,
):
| Parameter | Type |
| --- | --- |
| b | FlyteContext.Builder |
flytekit.core.gate.FlyteDisapprovalException
Assertion failed.
def FlyteDisapprovalException(
args,
timestamp: typing.Optional[float],
):
| Parameter | Type |
| --- | --- |
| args | *args |
| timestamp | typing.Optional[float] |
Properties
| Property | Type | Description |
| --- | --- | --- |
| timestamp | | |
flytekit.core.gate.Gate
A node type that waits for user input before proceeding with a workflow.
A gate is a type of node that behaves like a task, but instead of running code, it either needs to wait
for user input to proceed or wait for a timer to complete running.
def Gate(
name: str,
input_type: typing.Optional[typing.Type],
upstream_item: typing.Optional[typing.Any],
sleep_duration: typing.Optional[datetime.timedelta],
timeout: typing.Optional[datetime.timedelta],
):
| Parameter | Type |
| --- | --- |
| name | str |
| input_type | typing.Optional[typing.Type] |
| upstream_item | typing.Optional[typing.Any] |
| sleep_duration | typing.Optional[datetime.timedelta] |
| timeout | typing.Optional[datetime.timedelta] |
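The two behaviors described above (wait for a timer, or wait for user input) can be sketched as a plain function. run_gate_locally and its ask_user callback are hypothetical names invented for this illustration; the sketch does not use or reproduce flytekit's Gate.

```python
import datetime
import time
from typing import Any, Callable, Optional


def run_gate_locally(
    name: str,
    sleep_duration: Optional[datetime.timedelta] = None,
    ask_user: Optional[Callable[[str], Any]] = None,
) -> Any:
    """Hypothetical local stand-in for a gate node; not flytekit's Gate."""
    if sleep_duration is not None:
        # Timer-style gate: there is no code to run, only a wait.
        time.sleep(sleep_duration.total_seconds())
        return None
    if ask_user is None:
        raise ValueError(f"gate {name!r} needs a sleep duration or an input source")
    # Input-style gate: block until the caller-supplied function returns.
    return ask_user(name)


slept = run_gate_locally("pause", sleep_duration=datetime.timedelta(milliseconds=10))
answer = run_gate_locally("review", ask_user=lambda gate_name: f"approved:{gate_name}")
```

Passing the input source as a callback keeps the sketch testable; an interactive version could pass a function that prompts on the terminal instead.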
Methods
construct_node_metadata()
def construct_node_metadata()
local_execute()
def local_execute(
ctx: FlyteContext,
kwargs,
):
| Parameter | Type |
| --- | --- |
| ctx | FlyteContext |
| kwargs | **kwargs |
local_execution_mode()
def local_execution_mode()
Properties
| Property | Type | Description |
| --- | --- | --- |
| input_type | | |
| literal_type | | |
| name | | |
| python_interface | | |
| sleep_duration | | |
flytekit.core.gate.LiteralType
def LiteralType(
simple,
schema,
collection_type,
map_value_type,
blob,
enum_type,
union_type,
structured_dataset_type,
metadata,
structure,
annotation,
):
This is a oneof message, only one of the kwargs may be set, representing one of the Flyte types.
| Parameter | Type |
| --- | --- |
| simple | |
| schema | |
| collection_type | |
| map_value_type | |
| blob | |
| enum_type | |
| union_type | |
| structured_dataset_type | |
| metadata | |
| structure | |
| annotation | |
Methods
from_flyte_idl()
def from_flyte_idl(
proto,
):
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
| Property | Type | Description |
| --- | --- | --- |
| annotation | | |
| blob | | |
| collection_type | | |
| enum_type | | |
| is_empty | | |
| map_value_type | | |
| metadata | | |
| schema | | |
| simple | | |
| structure | | |
| structured_dataset_type | | |
| union_type | | |
flytekit.core.gate.Promise
This object is a wrapper and exists for three main reasons. Let’s assume we’re dealing with a task like::

    @task
    def t1() -> (int, str): …

1. Handling the duality between compilation and local execution: when the task function is run in a local execution mode inside a workflow function, a Python integer and string are produced. When the task is being compiled as part of the workflow, the task call creates a Node instead, and the task returns two Promise objects that point to that Node.
2. One needs to be able to call::

    x = t1().with_overrides(…)

   If the task returns an integer or an (int, str) tuple like t1 above, calling with_overrides on the result would throw an error. This Promise object adds that.
3. Assorted handling for conditionals.
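The second reason can be seen in a few lines of plain Python. SketchPromise below is a hypothetical wrapper written for this illustration, not flytekit's Promise; it only shows why a wrapper is needed for a chained with_overrides call to work at all.

```python
from typing import Any, Dict


class SketchPromise:
    """Hypothetical wrapper around a task output; not flytekit's Promise."""

    def __init__(self, var: str, val: Any) -> None:
        self.var = var
        self.val = val
        self.overrides: Dict[str, Any] = {}

    def with_overrides(self, **kwargs: Any) -> "SketchPromise":
        # Record the overrides and return self so calls can be chained.
        self.overrides.update(kwargs)
        return self


# A bare int has no with_overrides method, so calling it would fail.
raw_has_method = hasattr(5, "with_overrides")

# Wrapping the value gives the call somewhere to land.
p = SketchPromise("o0", 5).with_overrides(retries=3)
```

The wrapped value is still reachable through p.val, so the wrapper adds behavior without losing the underlying result.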
def Promise(
var: str,
val: Union[NodeOutput, _literals_models.Literal],
type: typing.Optional[_type_models.LiteralType],
):
| Parameter | Type |
| --- | --- |
| var | str |
| val | Union[NodeOutput, _literals_models.Literal] |
| type | typing.Optional[_type_models.LiteralType] |
Methods
deepcopy()
eval()
is_()
is_false()
is_none()
is_true()
with_overrides()
def with_overrides(
node_name: Optional[str],
aliases: Optional[Dict[str, str]],
requests: Optional[Resources],
limits: Optional[Resources],
timeout: Optional[Union[int, datetime.timedelta, object]],
retries: Optional[int],
interruptible: Optional[bool],
name: Optional[str],
task_config: Optional[Any],
container_image: Optional[str],
accelerator: Optional[BaseAccelerator],
cache: Optional[bool],
cache_version: Optional[str],
cache_serialize: Optional[bool],
args,
kwargs,
):
| Parameter | Type |
| --- | --- |
| node_name | Optional[str] |
| aliases | Optional[Dict[str, str]] |
| requests | Optional[Resources] |
| limits | Optional[Resources] |
| timeout | Optional[Union[int, datetime.timedelta, object]] |
| retries | Optional[int] |
| interruptible | Optional[bool] |
| name | Optional[str] |
| task_config | Optional[Any] |
| container_image | Optional[str] |
| accelerator | Optional[BaseAccelerator] |
| cache | Optional[bool] |
| cache_version | Optional[str] |
| cache_serialize | Optional[bool] |
| args | *args |
| kwargs | **kwargs |
with_var()
def with_var(
new_var: str,
):
| Parameter | Type |
| --- | --- |
| new_var | str |
Properties
| Property | Type | Description |
| --- | --- | --- |
| attr_path | | |
| is_ready | | |
| ref | | |
| val | | |
| var | | |
flytekit.core.gate.Scalar
def Scalar(
primitive: typing.Optional[flytekit.models.literals.Primitive],
blob: typing.Optional[flytekit.models.literals.Blob],
binary: typing.Optional[flytekit.models.literals.Binary],
schema: typing.Optional[flytekit.models.literals.Schema],
union: typing.Optional[flytekit.models.literals.Union],
none_type: typing.Optional[flytekit.models.literals.Void],
error: typing.Optional[flytekit.models.types.Error],
generic: typing.Optional[google.protobuf.struct_pb2.Struct],
structured_dataset: typing.Optional[flytekit.models.literals.StructuredDataset],
):
Scalar wrapper around Flyte types. Only one can be specified.
| Parameter | Type |
| --- | --- |
| primitive | typing.Optional[flytekit.models.literals.Primitive] |
| blob | typing.Optional[flytekit.models.literals.Blob] |
| binary | typing.Optional[flytekit.models.literals.Binary] |
| schema | typing.Optional[flytekit.models.literals.Schema] |
| union | typing.Optional[flytekit.models.literals.Union] |
| none_type | typing.Optional[flytekit.models.literals.Void] |
| error | typing.Optional[flytekit.models.types.Error] |
| generic | typing.Optional[google.protobuf.struct_pb2.Struct] |
| structured_dataset | typing.Optional[flytekit.models.literals.StructuredDataset] |
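The "only one can be specified" rule is the usual oneof constraint. The sketch below shows one way such a constraint could be enforced; SketchScalar is hypothetical, uses only three of the fields for brevity, and the constructor-time check is an assumption for illustration rather than a statement about what the real Scalar does.

```python
from typing import Any, Optional


class SketchScalar:
    """Hypothetical oneof holder; not flytekit's Scalar."""

    def __init__(
        self,
        primitive: Optional[Any] = None,
        blob: Optional[Any] = None,
        none_type: Optional[Any] = None,
    ) -> None:
        fields = {"primitive": primitive, "blob": blob, "none_type": none_type}
        set_fields = [name for name, value in fields.items() if value is not None]
        # Assumption: reject construction when more than one field is set.
        if len(set_fields) > 1:
            raise ValueError(f"only one field may be set, got {set_fields}")
        self._fields = fields

    @property
    def value(self) -> Any:
        # Return whichever field was set, or None if the scalar is empty.
        for value in self._fields.values():
            if value is not None:
                return value
        return None


s = SketchScalar(primitive=42)
try:
    SketchScalar(primitive=1, blob=b"x")
    rejected = False
except ValueError:
    rejected = True
```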
Methods
from_flyte_idl()
def from_flyte_idl(
pb2_object,
):
| Parameter | Type |
| --- | --- |
| pb2_object | |
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
| Property | Type | Description |
| --- | --- | --- |
| binary | | |
| blob | | |
| error | | |
| generic | | |
| is_empty | | |
| none_type | | |
| primitive | | |
| schema | | |
| structured_dataset | | |
| union | | |
| value | | |
flytekit.core.gate.TypeEngine
Core Extensible TypeEngine of Flytekit. This should be used to extend the capabilities of Flytekit's type system. Users can implement their own TypeTransformers and register them with the TypeEngine. This allows special handling of user objects.
Methods
async_to_literal()
def async_to_literal(
ctx: FlyteContext,
python_val: typing.Any,
python_type: Type[T],
expected: LiteralType,
):
Converts a python value of a given type and expected LiteralType into a resolved Literal value.
| Parameter | Type |
| --- | --- |
| ctx | FlyteContext |
| python_val | typing.Any |
| python_type | Type[T] |
| expected | LiteralType |
async_to_python_value()
def async_to_python_value(
ctx: FlyteContext,
lv: Literal,
expected_python_type: Type,
):
| Parameter | Type |
| --- | --- |
| ctx | FlyteContext |
| lv | Literal |
| expected_python_type | Type |
calculate_hash()
def calculate_hash(
python_val: typing.Any,
python_type: Type[T],
):
| Parameter | Type |
| --- | --- |
| python_val | typing.Any |
| python_type | Type[T] |
dict_to_literal_map()
def dict_to_literal_map(
ctx: FlyteContext,
d: typing.Dict[str, typing.Any],
type_hints: Optional[typing.Dict[str, type]],
):
| Parameter | Type |
| --- | --- |
| ctx | FlyteContext |
| d | typing.Dict[str, typing.Any] |
| type_hints | Optional[typing.Dict[str, type]] |
dict_to_literal_map_pb()
def dict_to_literal_map_pb(
ctx: FlyteContext,
d: typing.Dict[str, typing.Any],
type_hints: Optional[typing.Dict[str, type]],
):
| Parameter | Type |
| --- | --- |
| ctx | FlyteContext |
| d | typing.Dict[str, typing.Any] |
| type_hints | Optional[typing.Dict[str, type]] |
get_available_transformers()
def get_available_transformers()
Returns all python types for which transformers are available.
get_transformer()
def get_transformer(
python_type: Type,
):
Implements a recursive search for the transformer.
| Parameter | Type |
| --- | --- |
| python_type | Type |
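As an illustration of what a registry with a recursive lookup can look like, the sketch below registers converters by Python type and falls back to base classes when there is no exact match. SketchRegistry is hypothetical, and the order in which it searches base classes is an assumption; it does not describe the actual lookup rules of TypeEngine.

```python
from typing import Callable, Dict


class SketchRegistry:
    """Hypothetical registry mapping Python types to converter functions."""

    def __init__(self) -> None:
        self._registry: Dict[type, Callable[[object], str]] = {}

    def register(self, python_type: type, transformer: Callable[[object], str]) -> None:
        if python_type in self._registry:
            raise ValueError(f"{python_type.__name__} is already registered")
        self._registry[python_type] = transformer

    def get_transformer(self, python_type: type) -> Callable[[object], str]:
        # Exact match first.
        if python_type in self._registry:
            return self._registry[python_type]
        # Otherwise recurse into the direct base classes, in declaration order.
        for base in python_type.__bases__:
            try:
                return self.get_transformer(base)
            except KeyError:
                continue
        raise KeyError(f"no transformer for {python_type.__name__}")


class Animal:
    pass


class Dog(Animal):
    pass


registry = SketchRegistry()
registry.register(Animal, lambda v: f"animal:{type(v).__name__}")
found = registry.get_transformer(Dog)
```

Dog has no transformer of its own, so the lookup recurses to Animal and returns the converter registered there.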
guess_python_type()
def guess_python_type(
flyte_type: LiteralType,
):
Transforms a flyte-specific LiteralType to a regular python type.
| Parameter | Type |
| --- | --- |
| flyte_type | LiteralType |
guess_python_types()
def guess_python_types(
flyte_variable_dict: typing.Dict[str, _interface_models.Variable],
):
Transforms a dictionary of flyte-specific Variable objects to a dictionary of regular python types.
| Parameter | Type |
| --- | --- |
| flyte_variable_dict | typing.Dict[str, _interface_models.Variable] |
lazy_import_transformers()
def lazy_import_transformers()
Only load the transformers if needed.
literal_map_to_kwargs()
def literal_map_to_kwargs(
ctx: FlyteContext,
lm: LiteralMap,
python_types: typing.Optional[typing.Dict[str, type]],
literal_types: typing.Optional[typing.Dict[str, _interface_models.Variable]],
):
| Parameter | Type |
| --- | --- |
| ctx | FlyteContext |
| lm | LiteralMap |
| python_types | typing.Optional[typing.Dict[str, type]] |
| literal_types | typing.Optional[typing.Dict[str, _interface_models.Variable]] |
named_tuple_to_variable_map()
def named_tuple_to_variable_map(
t: typing.NamedTuple,
):
Converts a python-native NamedTuple to a flyte-specific VariableMap of named literals.
| Parameter | Type |
| --- | --- |
| t | typing.NamedTuple |
register()
def register(
transformer: TypeTransformer,
additional_types: Optional[typing.List[Type]],
):
This should be used for all types that respond with the right type annotation when you use the type(…) function.
| Parameter | Type |
| --- | --- |
| transformer | TypeTransformer |
| additional_types | Optional[typing.List[Type]] |
register_additional_type()
def register_additional_type(
transformer: TypeTransformer[T],
additional_type: Type[T],
override,
):
| Parameter | Type |
| --- | --- |
| transformer | TypeTransformer[T] |
| additional_type | Type[T] |
| override | |
register_restricted_type()
def register_restricted_type(
name: str,
type: Type[T],
):
| Parameter | Type |
| --- | --- |
| name | str |
| type | Type[T] |
to_html()
def to_html(
ctx: FlyteContext,
python_val: typing.Any,
expected_python_type: Type[typing.Any],
):
| Parameter | Type |
| --- | --- |
| ctx | FlyteContext |
| python_val | typing.Any |
| expected_python_type | Type[typing.Any] |
to_literal()
def to_literal(
ctx: FlyteContext,
python_val: typing.Any,
python_type: Type[T],
expected: LiteralType,
):
The current dance exists because users are allowed to call this synchronous to_literal function from an async function, and to_literal is in turn allowed to invoke yet another async function, namely an async transformer.
| Parameter | Type |
| --- | --- |
| ctx | FlyteContext |
| python_val | typing.Any |
| python_type | Type[T] |
| expected | LiteralType |
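The situation described for to_literal (sync code called from async code, which then needs to run more async code) is a general asyncio problem. The sketch below shows one common way to handle it using only the standard library; run_sync, async_convert and convert are hypothetical names, and this is not how flytekit implements it.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Coroutine


def run_sync(coro: Coroutine[Any, Any, Any]) -> Any:
    """Run a coroutine to completion from synchronous code."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run is safe.
        return asyncio.run(coro)
    # A loop is already running here and asyncio.run would raise, so run the
    # coroutine on its own loop in a worker thread and block for the result.
    # Note that this blocks the calling event loop while waiting.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()


async def async_convert(value: int) -> str:
    await asyncio.sleep(0)
    return f"literal:{value}"


def convert(value: int) -> str:
    # Synchronous facade over the async implementation.
    return run_sync(async_convert(value))


async def caller() -> str:
    # Calling the sync facade from async code still works.
    return convert(7)


from_sync = convert(1)
from_async = asyncio.run(caller())
```

The worker thread is what lets the nested case succeed: the inner coroutine gets a fresh event loop instead of trying to re-enter the one that is already running.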
to_literal_checks()
def to_literal_checks(
python_val: typing.Any,
python_type: Type[T],
expected: LiteralType,
):
| Parameter | Type |
| --- | --- |
| python_val | typing.Any |
| python_type | Type[T] |
| expected | LiteralType |
to_literal_type()
def to_literal_type(
python_type: Type[T],
):
Converts a python type into a flyte-specific LiteralType.
| Parameter | Type |
| --- | --- |
| python_type | Type[T] |
to_python_value()
def to_python_value(
ctx: FlyteContext,
lv: Literal,
expected_python_type: Type,
):
Converts a Literal value with an expected python type into a python value.
| Parameter | Type |
| --- | --- |
| ctx | FlyteContext |
| lv | Literal |
| expected_python_type | Type |
unwrap_offloaded_literal()
def unwrap_offloaded_literal(
ctx: FlyteContext,
lv: Literal,
):
| Parameter | Type |
| --- | --- |
| ctx | FlyteContext |
| lv | Literal |
flytekit.core.gate.VoidPromise
This object is returned for tasks that do not return any outputs (declared interface is empty).
VoidPromise cannot be interacted with and does not allow comparisons or any other operations.
def VoidPromise(
task_name: str,
ref: Optional[NodeOutput],
):
| Parameter | Type |
| --- | --- |
| task_name | str |
| ref | Optional[NodeOutput] |
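A placeholder that "does not allow comparisons or any operations" can be sketched by routing the relevant special methods to a single function that raises. SketchVoidPromise is hypothetical, and the choice of AssertionError is an assumption; the text above does not say which exception the real class raises.

```python
class SketchVoidPromise:
    """Hypothetical placeholder for a task with no outputs."""

    def __init__(self, task_name: str) -> None:
        self.task_name = task_name

    def _refuse(self, *args: object, **kwargs: object) -> None:
        # Assumption: AssertionError is used here only for illustration.
        raise AssertionError(
            f"task {self.task_name} returns nothing; its result cannot be used"
        )

    # Route comparisons and truthiness through the same refusal.
    __eq__ = _refuse
    __lt__ = _refuse
    __gt__ = _refuse
    __bool__ = _refuse
    # Defining __eq__ drops the default __hash__; make that explicit.
    __hash__ = None


vp = SketchVoidPromise("cleanup")
try:
    vp == 1
    blocked = False
except AssertionError:
    blocked = True
```

Failing loudly on use catches the mistake of treating the result of an output-less task as a value, rather than letting a silent None flow through the workflow.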
Methods
runs_before()
def runs_before(
args,
kwargs,
):
This is a placeholder and should do nothing. It is only here to enable local execution of workflows
where a task returns nothing.
| Parameter | Type |
| --- | --- |
| args | *args |
| kwargs | **kwargs |
with_overrides()
def with_overrides(
args,
kwargs,
):
| Parameter | Type |
| --- | --- |
| args | *args |
| kwargs | **kwargs |
Properties
| Property | Type | Description |
| --- | --- | --- |
| ref | | |
| task_name | | |