flytekit.extend
==================
Extending Flytekit
This package contains things that are useful when extending Flytekit. It exports the following names:
get_serializable
context_manager
IgnoreOutputs
ExecutionState
Image
ImageConfig
Interface
Promise
TaskPlugins
DictTransformer
T
TypeEngine
TypeTransformer
PythonCustomizedContainerTask
ExecutableTemplateShimTask
ShimTaskExecutor
Directory
Classes
Class |
Description |
ClassStorageTaskResolver |
Stores tasks inside a class variable. |
DictTransformer |
Transformer that transforms a univariate dictionary Dict[str, T] to a Literal Map, or an untyped dictionary to a Binary Scalar Literal with a Struct Literal Type. |
ExecutableTemplateShimTask |
The canonical @task decorated Python function task is pretty simple to reason about. |
ExecutionState |
This is the context that is active when executing a task or a local workflow. |
FileAccessProvider |
This is the class that is available through the FlyteContext and can be used for persisting data to the remote. |
Image |
Image is a structured wrapper for task container images used in object serialization. |
ImageConfig |
We recommend using ImageConfig.auto(img_name=None) to create an ImageConfig. |
Interface |
A Python native interface object, like inspect.signature but simpler. |
Promise |
This object is a wrapper and exists for three main reasons. |
PythonCustomizedContainerTask |
Please take a look at the comments for flytekit.extend.ExecutableTemplateShimTask as well. |
PythonTask |
Base class for all tasks with a Python native Interface. |
SQLTask |
Base task types for all SQL tasks. |
SecretsManager |
Provides secrets resolution logic at runtime. |
SerializationSettings |
These settings are provided while serializing a workflow and task, before registration. |
ShimTaskExecutor |
Please see the notes for the metaclass above first. |
TaskPlugins |
This is the TaskPlugins factory for task types that are derivative of PythonFunctionTask. |
TaskResolverMixin |
Flytekit tasks interact with the Flyte platform very, very broadly in two steps. |
TypeEngine |
Core Extensible TypeEngine of Flytekit. |
TypeTransformer |
Base transformer type that should be implemented for every python native type that can be handled by flytekit. |
Errors
flytekit.extend.ClassStorageTaskResolver
Stores tasks inside a class variable. The class must be inherited from at the point of usage because the task
loading process basically relies on the same sequence of things happening.
def ClassStorageTaskResolver(
args,
kwargs,
):
Parameter |
Type |
args |
*args |
kwargs |
**kwargs |
Methods
Method |
Description |
add() |
None |
find_lhs() |
None |
get_all_tasks() |
Future proof method |
load_task() |
Given the set of identifier keys, should return one Python Task or raise an error if not found |
loader_args() |
This is responsible for turning an instance of a task into args that the load_task function can reconstitute |
name() |
None |
task_name() |
Overridable function that can optionally return a custom name for a given task |
add()
def add(
t: flytekit.core.python_auto_container.PythonAutoContainerTask,
):
Parameter |
Type |
t |
flytekit.core.python_auto_container.PythonAutoContainerTask |
find_lhs()
get_all_tasks()
Future proof method. Just making it easy to access all tasks (Not required today as we auto register them)
load_task()
def load_task(
loader_args: typing.List[str],
):
Given the set of identifier keys, should return one Python Task or raise an error if not found
Parameter |
Type |
loader_args |
typing.List[str] |
loader_args()
def loader_args(
settings: flytekit.configuration.SerializationSettings,
t: flytekit.core.python_auto_container.PythonAutoContainerTask,
):
This is responsible for turning an instance of a task into args that the load_task function can reconstitute.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
t |
flytekit.core.python_auto_container.PythonAutoContainerTask |
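The round trip between loader_args() and load_task() can be pictured with a small standalone sketch. This is not flytekit's implementation: ToyTask, ToyClassStorageResolver and MyResolver are hypothetical names, and identifying a task by its position in the list is an assumption made for illustration. It does show why the same sequence of add() calls has to happen at load time: the stored key is only meaningful if tasks are added in the same order.

```python
from typing import ClassVar, List


class ToyTask:
    """Hypothetical stand-in for a flytekit task object."""

    def __init__(self, name: str) -> None:
        self.name = name


class ToyClassStorageResolver:
    """Keeps tasks in a class variable so they can be found again by position."""

    _tasks: ClassVar[List[ToyTask]] = []

    def __init_subclass__(cls, **kwargs) -> None:
        super().__init_subclass__(**kwargs)
        cls._tasks = []  # each subclass stores its own tasks

    @classmethod
    def add(cls, t: ToyTask) -> None:
        cls._tasks.append(t)

    @classmethod
    def loader_args(cls, t: ToyTask) -> List[str]:
        # Turn a task instance into string keys that load_task can understand.
        return [str(cls._tasks.index(t))]

    @classmethod
    def load_task(cls, loader_args: List[str]) -> ToyTask:
        # Reconstitute the task from the keys, or raise if it is unknown.
        if len(loader_args) != 1:
            raise ValueError(f"expected one key, got {loader_args}")
        idx = int(loader_args[0])
        if not 0 <= idx < len(cls._tasks):
            raise LookupError(f"no task stored at index {idx}")
        return cls._tasks[idx]


class MyResolver(ToyClassStorageResolver):
    pass


first, second = ToyTask("first"), ToyTask("second")
MyResolver.add(first)
MyResolver.add(second)
keys = MyResolver.loader_args(second)
restored = MyResolver.load_task(keys)
```

Because the keys are plain strings, they can be passed on a command line and turned back into the task object later.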
name()
task_name()
def task_name(
t: flytekit.core.base_task.Task,
):
Overridable function that can optionally return a custom name for a given task
Parameter |
Type |
t |
flytekit.core.base_task.Task |
Properties
Property |
Type |
Description |
instantiated_in |
|
|
lhs |
|
|
location |
|
|
flytekit.extend.DictTransformer
Transformer that transforms a univariate dictionary Dict[str, T] to a Literal Map, or
transforms an untyped dictionary to a Binary Scalar Literal with a Struct Literal Type.
Methods
assert_type()
def assert_type(
t: Type[T],
v: T,
):
Parameter |
Type |
t |
Type[T] |
v |
T |
async_to_literal()
def async_to_literal(
ctx: FlyteContext,
python_val: typing.Any,
python_type: Type[dict],
expected: LiteralType,
):
Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type.
Implementers should refrain from using type(python_val) and instead rely on the passed-in python_type. If these
do not match (or are not allowed), the Transformer implementer should raise an AssertionError that clearly states
the mismatch.
Parameter |
Type |
ctx |
FlyteContext |
python_val |
typing.Any |
python_type |
Type[dict] |
expected |
LiteralType |
async_to_python_value()
def async_to_python_value(
ctx: FlyteContext,
lv: Literal,
expected_python_type: Type[dict],
):
Converts the given Literal to a Python Type. If the conversion cannot be done, an AssertionError should be raised.
Parameter |
Type |
ctx |
FlyteContext |
lv |
Literal |
expected_python_type |
Type[dict] |
dict_to_binary_literal()
def dict_to_binary_literal(
ctx: FlyteContext,
v: dict,
python_type: Type[dict],
allow_pickle: bool,
):
Converts a Python dictionary to a Flyte-specific Literal
using MessagePack encoding.
Falls back to Pickle if encoding fails and allow_pickle
is True.
Parameter |
Type |
ctx |
FlyteContext |
v |
dict |
python_type |
Type[dict] |
allow_pickle |
bool |
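The "encode first, fall back to Pickle only when allowed" behaviour described above can be sketched with the standard library alone. In this sketch json stands in for MessagePack so that no third-party package is needed, and encode_dict is a hypothetical helper name, not a flytekit function. The returned tag is also an illustrative device for showing which path was taken.

```python
import json
import pickle
from typing import Tuple


def encode_dict(v: dict, allow_pickle: bool) -> Tuple[str, bytes]:
    """Encode a dict, falling back to pickle only when explicitly allowed.

    json is a stand-in for MessagePack here so the sketch stays stdlib-only.
    """
    try:
        return "json", json.dumps(v).encode("utf-8")
    except TypeError:
        # The primary encoder cannot represent the value (for example, a set).
        if not allow_pickle:
            raise
        return "pickle", pickle.dumps(v)


plain_tag, plain_bytes = encode_dict({"a": 1}, allow_pickle=False)
fallback_tag, fallback_bytes = encode_dict({"a": {1, 2}}, allow_pickle=True)
```

Keeping the fallback behind an explicit flag means a value that cannot be encoded fails loudly unless the caller has opted in to Pickle.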
dict_to_generic_literal()
def dict_to_generic_literal(
ctx: FlyteContext,
v: dict,
python_type: Type[dict],
allow_pickle: bool,
):
Deprecated since flytekit 1.14.0 and will be removed in the future.
Creates a Flyte-specific Literal value from a native Python dictionary.
Parameter |
Type |
ctx |
FlyteContext |
v |
dict |
python_type |
Type[dict] |
allow_pickle |
bool |
extract_types()
def extract_types(
t: Optional[Type[dict]],
):
Parameter |
Type |
t |
Optional[Type[dict]] |
from_binary_idl()
def from_binary_idl(
binary_idl_object: Binary,
expected_python_type: Type[T],
):
This function primarily handles deserialization for untyped dicts, dataclasses, Pydantic BaseModels, and attribute access.
For untyped dict, dataclass, and Pydantic BaseModel:
Life Cycle (untyped dict as example):
python val -> msgpack bytes -> binary literal scalar -> msgpack bytes -> python val
Here to_literal covers the steps up to the binary literal scalar, and from_binary_idl covers the steps after it.
For attribute access:
Life Cycle:
python val -> msgpack bytes -> binary literal scalar -> resolved golang value -> binary literal scalar -> msgpack bytes -> python val
Here to_literal produces the first binary literal scalar, propeller attribute access resolves it to a Golang value and back to a binary literal scalar, and from_binary_idl performs the final deserialization.
Parameter |
Type |
binary_idl_object |
Binary |
expected_python_type |
Type[T] |
from_generic_idl()
def from_generic_idl(
generic: Struct,
expected_python_type: Type[T],
):
TODO: Support all Flyte Types.
This is for dataclass attribute access from input created from the Flyte Console.
Note:
- This can be removed in the future when the Flyte Console supports generating Binary IDL Scalar as input.
Parameter |
Type |
generic |
Struct |
expected_python_type |
Type[T] |
get_literal_type()
def get_literal_type(
t: Type[dict],
):
Transforms a native python dictionary to a flyte-specific LiteralType
Parameter |
Type |
t |
Type[dict] |
guess_python_type()
def guess_python_type(
literal_type: LiteralType,
):
Converts the Flyte LiteralType to a python object type.
Parameter |
Type |
literal_type |
LiteralType |
is_pickle()
def is_pickle(
python_type: Type[dict],
):
Parameter |
Type |
python_type |
Type[dict] |
isinstance_generic()
def isinstance_generic(
obj,
generic_alias,
):
Parameter |
Type |
obj |
|
generic_alias |
|
to_html()
def to_html(
ctx: FlyteContext,
python_val: T,
expected_python_type: Type[T],
):
Converts any Python value (dataframe, int, float) to an HTML string, which is wrapped in an HTML div.
Parameter |
Type |
ctx |
FlyteContext |
python_val |
T |
expected_python_type |
Type[T] |
to_literal()
def to_literal(
ctx: FlyteContext,
python_val: typing.Any,
python_type: Type[T],
expected: LiteralType,
):
Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type.
Implementers should refrain from using type(python_val) and instead rely on the passed-in python_type. If these
do not match (or are not allowed), the Transformer implementer should raise an AssertionError that clearly states
the mismatch.
Parameter |
Type |
ctx |
FlyteContext |
python_val |
typing.Any |
python_type |
Type[T] |
expected |
LiteralType |
to_python_value()
def to_python_value(
ctx: FlyteContext,
lv: Literal,
expected_python_type: Type[T],
):
Converts the given Literal to a Python Type. If the conversion cannot be done, an AssertionError should be raised.
Parameter |
Type |
ctx |
FlyteContext |
lv |
Literal |
expected_python_type |
Type[T] |
Properties
Property |
Type |
Description |
is_async |
|
|
name |
|
|
python_type |
|
|
type_assertions_enabled |
|
|
flytekit.extend.ExecutableTemplateShimTask
The canonical @task decorated Python function task is pretty simple to reason about. At execution time (either
locally or on a Flyte cluster), the function runs.
This class, along with the ShimTaskExecutor class below, represents another execution pattern. This pattern
has two components:
- The TaskTemplate, or something like it, such as a FlyteTask.
- An executor, which can use information from the task template (including the custom field).
Basically at execution time (both locally and on a Flyte cluster), the task template is given to the executor,
which is responsible for computing and returning the results.
.. note::
    The interface at execution time will have to be derived from the Flyte IDL interface, which means it may be lossy.
    This is because when a task is serialized from Python into the TaskTemplate, some information is lost because
    Flyte IDL can’t keep track of every single Python type (or Java type if writing in the Java flytekit).
This class also implements the dispatch_execute and execute functions to make it look like a PythonTask
that the entrypoint.py can execute, even though this class doesn’t inherit from PythonTask.
def ExecutableTemplateShimTask(
tt: _task_model.TaskTemplate,
executor_type: Type[ShimTaskExecutor],
args,
kwargs,
):
Parameter |
Type |
tt |
_task_model.TaskTemplate |
executor_type |
Type[ShimTaskExecutor] |
args |
*args |
kwargs |
**kwargs |
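The template-plus-executor pattern described above can be illustrated with a toy model. Everything in this sketch (ToyTemplate, ToyExecutor, MultiplyExecutor, ToyShimTask and the "factor" key) is hypothetical and only mirrors the shape of the pattern; it is not how flytekit implements these classes.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Type


@dataclass
class ToyTemplate:
    """Hypothetical stand-in for a serialized task template."""

    name: str
    custom: Dict[str, Any] = field(default_factory=dict)


class ToyExecutor:
    """Base executor: receives the template and the inputs, returns the result."""

    def execute_from_model(self, tt: ToyTemplate, **kwargs: Any) -> Any:
        raise NotImplementedError


class MultiplyExecutor(ToyExecutor):
    def execute_from_model(self, tt: ToyTemplate, **kwargs: Any) -> Any:
        # All behaviour is driven by data stored in the template's custom field.
        return kwargs["x"] * tt.custom["factor"]


class ToyShimTask:
    """Holds a template and an executor type; delegates execution to the executor."""

    def __init__(self, tt: ToyTemplate, executor_type: Type[ToyExecutor]) -> None:
        self.task_template = tt
        self.executor_type = executor_type
        self.executor = executor_type()

    def execute(self, **kwargs: Any) -> Any:
        # Rather than running here, send everything to the executor.
        return self.executor.execute_from_model(self.task_template, **kwargs)


shim = ToyShimTask(ToyTemplate("triple", {"factor": 3}), MultiplyExecutor)
result = shim.execute(x=5)
```

The task object itself carries no user function. Its behaviour comes entirely from the executor and from the data stored in the template.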
Methods
Method |
Description |
dispatch_execute() |
This function is largely similar to the base PythonTask, with the exception that we have to infer the Python interface before executing |
execute() |
Rather than running here, send everything to the executor |
post_execute() |
This function is a stub, just here to keep dispatch_execute compatibility between this class and PythonTask |
pre_execute() |
This function is a stub, just here to keep dispatch_execute compatibility between this class and PythonTask |
dispatch_execute()
def dispatch_execute(
ctx: FlyteContext,
input_literal_map: _literal_models.LiteralMap,
):
This function is largely similar to the base PythonTask, with the exception that we have to infer the Python
interface before executing. Also, we refer to self.task_template rather than just self, similar to task
classes that derive from the base PythonTask.
Parameter |
Type |
ctx |
FlyteContext |
input_literal_map |
_literal_models.LiteralMap |
execute()
Rather than running here, send everything to the executor.
Parameter |
Type |
kwargs |
**kwargs |
post_execute()
def post_execute(
_: Optional[ExecutionParameters],
rval: Any,
):
This function is a stub, just here to keep dispatch_execute compatibility between this class and PythonTask.
Parameter |
Type |
_ |
Optional[ExecutionParameters] |
rval |
Any |
pre_execute()
def pre_execute(
user_params: Optional[ExecutionParameters],
):
This function is a stub, just here to keep dispatch_execute compatibility between this class and PythonTask.
Parameter |
Type |
user_params |
Optional[ExecutionParameters] |
Properties
Property |
Type |
Description |
executor |
|
|
executor_type |
|
|
name |
|
|
task_template |
|
|
flytekit.extend.ExecutionState
This is the context that is active when executing a task or a local workflow. This carries the necessary state to
execute.
Some required things during execution deal with temporary directories, ExecutionParameters that are passed to the
user etc.
Attributes:
mode (ExecutionState.Mode): Defines the context in which the task is executed (local, hosted, etc.).
working_dir (os.PathLike): Specifies the remote, external directory where inputs, outputs, and other protobufs are uploaded.
engine_dir (os.PathLike):
branch_eval_mode (Optional[BranchEvalMode]): Used to determine whether a branch node should execute.
user_space_params (Optional[ExecutionParameters]): Provides run-time, user-centric context such as a statsd handler, a logging handler, the current execution id, and a working directory.
def ExecutionState(
working_dir: Union[os.PathLike, str],
mode: Optional[ExecutionState.Mode],
engine_dir: Optional[Union[os.PathLike, str]],
branch_eval_mode: Optional[BranchEvalMode],
user_space_params: Optional[ExecutionParameters],
):
Parameter |
Type |
working_dir |
Union[os.PathLike, str] |
mode |
Optional[ExecutionState.Mode] |
engine_dir |
Optional[Union[os.PathLike, str]] |
branch_eval_mode |
Optional[BranchEvalMode] |
user_space_params |
Optional[ExecutionParameters] |
Methods
Method |
Description |
branch_complete() |
Indicates that we are within a conditional / ifelse block and the active branch is not done |
is_local_execution() |
None |
take_branch() |
Indicates that we are within an if-else block and the current branch has evaluated to true |
with_params() |
Produces a copy of the current execution state and overrides the copy’s parameters with passed parameter values |
branch_complete()
Indicates that we are within a conditional / ifelse block and the active branch is not done.
Defaults to SKIPPED.
is_local_execution()
take_branch()
Indicates that we are within an if-else block and the current branch has evaluated to true.
Useful only in local execution mode
with_params()
def with_params(
working_dir: Optional[os.PathLike],
mode: Optional[Mode],
engine_dir: Optional[os.PathLike],
branch_eval_mode: Optional[BranchEvalMode],
user_space_params: Optional[ExecutionParameters],
):
Produces a copy of the current execution state and overrides the copy’s parameters with passed parameter values.
Parameter |
Type |
working_dir |
Optional[os.PathLike] |
mode |
Optional[Mode] |
engine_dir |
Optional[os.PathLike] |
branch_eval_mode |
Optional[BranchEvalMode] |
user_space_params |
Optional[ExecutionParameters] |
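The copy-and-override behaviour of with_params() resembles dataclasses.replace. The sketch below uses a hypothetical ToyState class with a reduced set of fields, and it assumes that passing None means "keep the current value". That assumption is for illustration only and is not confirmed by the description above.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class ToyState:
    working_dir: str
    mode: Optional[str] = None
    engine_dir: Optional[str] = None

    def with_params(
        self,
        working_dir: Optional[str] = None,
        mode: Optional[str] = None,
        engine_dir: Optional[str] = None,
    ) -> "ToyState":
        # Assumption: only values that were actually passed override the copy's fields.
        candidates = {"working_dir": working_dir, "mode": mode, "engine_dir": engine_dir}
        overrides = {k: v for k, v in candidates.items() if v is not None}
        return replace(self, **overrides)


base = ToyState(working_dir="/tmp/work", mode="LOCAL_TASK_EXECUTION")
changed = base.with_params(mode="TASK_EXECUTION")
```

The original object is left untouched, so one state can be shared while derived states are handed to nested scopes.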
flytekit.extend.FileAccessProvider
This is the class that is available through the FlyteContext and can be used for persisting data to the remote
durable store.
def FileAccessProvider(
local_sandbox_dir: typing.Union[str, os.PathLike],
raw_output_prefix: str,
data_config: typing.Optional[flytekit.configuration.DataConfig],
execution_metadata: typing.Optional[dict],
):
Parameter |
Type |
local_sandbox_dir |
typing.Union[str, os.PathLike] |
raw_output_prefix |
str |
data_config |
typing.Optional[flytekit.configuration.DataConfig] |
execution_metadata |
typing.Optional[dict] |
Methods
async_get_data()
def async_get_data(
remote_path: str,
local_path: str,
is_multipart: bool,
kwargs,
):
Parameter |
Type |
remote_path |
str |
local_path |
str |
is_multipart |
bool |
kwargs |
**kwargs |
async_put_data()
def async_put_data(
local_path: typing.Union[str, os.PathLike],
remote_path: str,
is_multipart: bool,
kwargs,
):
The implication here is that we’re always going to put data to the remote location, so we use .remote to ensure
we don’t use the true local proxy if the remote path is a file:// path.
Parameter |
Type |
local_path |
typing.Union[str, os.PathLike] |
remote_path |
str |
is_multipart |
bool |
kwargs |
**kwargs |
async_put_raw_data()
def async_put_raw_data(
lpath: typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO],
upload_prefix: typing.Optional[str],
file_name: typing.Optional[str],
read_chunk_size_bytes: int,
encoding: str,
skip_raw_data_prefix: bool,
kwargs,
):
This is a more flexible version of put that accepts a file-like object or a string path.
Writes to the raw output prefix only. If you want to write to another fs use put_data or get the fsspec
file system directly.
FYI: Currently the raw output prefix set by propeller is already unique per retry and looks like
s3://my-s3-bucket/data/o4/feda4e266c748463a97d-n0-0
If lpath is a folder, then recursive will be set.
If lpath is a streamable, then it can only be a single file.
Writes to:
{raw output prefix}/{upload_prefix}/{file_name}
Parameter |
Type |
lpath |
typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO] |
upload_prefix |
typing.Optional[str] |
file_name |
typing.Optional[str] |
read_chunk_size_bytes |
int |
encoding |
str |
skip_raw_data_prefix |
bool |
kwargs |
**kwargs |
download()
def download(
remote_path: str,
local_path: str,
kwargs,
):
Downloads from remote to local
Parameter |
Type |
remote_path |
str |
local_path |
str |
kwargs |
**kwargs |
download_directory()
def download_directory(
remote_path: str,
local_path: str,
kwargs,
):
Downloads directory from given remote to local path
Parameter |
Type |
remote_path |
str |
local_path |
str |
kwargs |
**kwargs |
exists()
def exists(
path: str,
):
generate_new_custom_path()
def generate_new_custom_path(
fs: typing.Optional[fsspec.spec.AbstractFileSystem],
alt: typing.Optional[str],
stem: typing.Optional[str],
):
Generates a new path with the raw output prefix and a random string appended to it.
Optionally, you can provide an alternate prefix and a stem. If stem is provided, it
will be appended to the path instead of a random string. If alt is provided, it will
replace the first part of the output prefix, e.g. the S3 or GCS bucket.
If wanting to write to a non-random prefix in a non-default S3 bucket, this can be
called with alt="my-alt-bucket" and stem="my-stem" to generate a path like
s3://my-alt-bucket/default-prefix-part/my-stem
Parameter |
Type |
fs |
typing.Optional[fsspec.spec.AbstractFileSystem] |
alt |
typing.Optional[str] |
stem |
typing.Optional[str] |
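The path rules described for generate_new_custom_path() can be sketched as a pure function. new_custom_path is a hypothetical helper, not the flytekit method. It only handles URL-style prefixes such as s3://bucket/prefix, and the use of uuid for the random part is an assumption.

```python
import uuid
from typing import Optional
from urllib.parse import urlsplit


def new_custom_path(raw_output_prefix: str, alt: Optional[str] = None, stem: Optional[str] = None) -> str:
    """Build <scheme>://<bucket>/<prefix path>/<stem or random string>."""
    parts = urlsplit(raw_output_prefix)
    bucket = alt if alt is not None else parts.netloc  # alt replaces the bucket
    leaf = stem if stem is not None else uuid.uuid4().hex  # stem replaces the random part
    middle = parts.path.strip("/")
    segments = [bucket] + ([middle] if middle else []) + [leaf]
    return f"{parts.scheme}://" + "/".join(segments)


fixed = new_custom_path("s3://my-s3-bucket/default-prefix-part", alt="my-alt-bucket", stem="my-stem")
random_one = new_custom_path("s3://my-s3-bucket/default-prefix-part")
```

With both alt and stem supplied the result is fully deterministic, which matches the example path given in the description.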
get()
def get(
from_path: str,
to_path: str,
recursive: bool,
kwargs,
):
Parameter |
Type |
from_path |
str |
to_path |
str |
recursive |
bool |
kwargs |
**kwargs |
get_async_filesystem_for_path()
def get_async_filesystem_for_path(
path: str,
anonymous: bool,
kwargs,
):
Parameter |
Type |
path |
str |
anonymous |
bool |
kwargs |
**kwargs |
get_data()
def get_data(
remote_path: str,
local_path: str,
is_multipart: bool,
kwargs,
):
Parameter |
Type |
remote_path |
str |
local_path |
str |
is_multipart |
bool |
kwargs |
**kwargs |
get_file_tail()
def get_file_tail(
file_path_or_file_name: str,
):
Parameter |
Type |
file_path_or_file_name |
str |
get_filesystem()
def get_filesystem(
protocol: typing.Optional[str],
anonymous: bool,
path: typing.Optional[str],
kwargs,
):
Parameter |
Type |
protocol |
typing.Optional[str] |
anonymous |
bool |
path |
typing.Optional[str] |
kwargs |
**kwargs |
get_filesystem_for_path()
def get_filesystem_for_path(
path: str,
anonymous: bool,
kwargs,
):
Parameter |
Type |
path |
str |
anonymous |
bool |
kwargs |
**kwargs |
get_random_local_directory()
def get_random_local_directory()
get_random_local_path()
def get_random_local_path(
file_path_or_file_name: typing.Optional[str],
):
Use file_path_or_file_name when you want a random directory but want to preserve the leaf file name.
Parameter |
Type |
file_path_or_file_name |
typing.Optional[str] |
get_random_remote_directory()
def get_random_remote_directory()
get_random_remote_path()
def get_random_remote_path(
file_path_or_file_name: typing.Optional[str],
):
Parameter |
Type |
file_path_or_file_name |
typing.Optional[str] |
get_random_string()
is_remote()
def is_remote(
path: typing.Union[str, os.PathLike],
):
Deprecated. Let’s find a replacement
Parameter |
Type |
path |
typing.Union[str, os.PathLike] |
join()
def join(
args,
unstrip: bool,
fs: typing.Optional[fsspec.spec.AbstractFileSystem],
):
Parameter |
Type |
args |
*args |
unstrip |
bool |
fs |
typing.Optional[fsspec.spec.AbstractFileSystem] |
put_data()
def put_data(
local_path: typing.Union[str, os.PathLike],
remote_path: str,
is_multipart: bool,
kwargs,
):
The implication here is that we’re always going to put data to the remote location, so we use .remote to ensure
we don’t use the true local proxy if the remote path is a file:// path.
Parameter |
Type |
local_path |
typing.Union[str, os.PathLike] |
remote_path |
str |
is_multipart |
bool |
kwargs |
**kwargs |
put_raw_data()
def put_raw_data(
lpath: typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO],
upload_prefix: typing.Optional[str],
file_name: typing.Optional[str],
read_chunk_size_bytes: int,
encoding: str,
skip_raw_data_prefix: bool,
kwargs,
):
This is a more flexible version of put that accepts a file-like object or a string path.
Writes to the raw output prefix only. If you want to write to another fs use put_data or get the fsspec
file system directly.
FYI: Currently the raw output prefix set by propeller is already unique per retry and looks like
s3://my-s3-bucket/data/o4/feda4e266c748463a97d-n0-0
If lpath is a folder, then recursive will be set.
If lpath is a streamable, then it can only be a single file.
Writes to:
{raw output prefix}/{upload_prefix}/{file_name}
Parameter |
Type |
lpath |
typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO] |
upload_prefix |
typing.Optional[str] |
file_name |
typing.Optional[str] |
read_chunk_size_bytes |
int |
encoding |
str |
skip_raw_data_prefix |
bool |
kwargs |
**kwargs |
recursive_paths()
def recursive_paths(
f: str,
t: str,
):
Parameter |
Type |
f |
str |
t |
str |
sep()
def sep(
file_system: typing.Optional[fsspec.spec.AbstractFileSystem],
):
Parameter |
Type |
file_system |
typing.Optional[fsspec.spec.AbstractFileSystem] |
strip_file_header()
def strip_file_header(
path: str,
trim_trailing_sep: bool,
):
Drops file:// if it exists from the file
Parameter |
Type |
path |
str |
trim_trailing_sep |
bool |
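A minimal sketch of the header-stripping behaviour follows. strip_file_prefix is a hypothetical name, and the assumption is that trim_trailing_sep removes trailing path separators from the result.

```python
import os


def strip_file_prefix(path: str, trim_trailing_sep: bool = False) -> str:
    """Remove a leading file:// and, optionally, any trailing path separators."""
    prefix = "file://"
    if path.startswith(prefix):
        path = path[len(prefix):]
    if trim_trailing_sep:
        path = path.rstrip(os.sep)
    return path


local = strip_file_prefix("file:///tmp/data")
untouched = strip_file_prefix("s3://bucket/key")
```

Paths with other schemes pass through unchanged, so the helper is safe to apply before deciding whether a path is local.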
upload()
def upload(
file_path: str,
to_path: str,
kwargs,
):
Parameter |
Type |
file_path |
str |
to_path |
str |
kwargs |
**kwargs |
upload_directory()
def upload_directory(
local_path: str,
remote_path: str,
kwargs,
):
Parameter |
Type |
local_path |
str |
remote_path |
str |
kwargs |
**kwargs |
Properties
Property |
Type |
Description |
data_config |
|
|
local_access |
|
|
local_sandbox_dir |
|
|
raw_output_fs |
|
|
raw_output_prefix |
|
|
flytekit.extend.IgnoreOutputs
This exception should be used to indicate that the outputs generated by the current task execution can be safely ignored.
This is useful in case of distributed training or peer-to-peer parallel algorithms.
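As an illustration of the distributed-training case, the sketch below lets only one worker report a result while the others signal that their outputs should be ignored. IgnoreOutputsSketch is a local stand-in for the real exception, and train, run_worker and the rank-0 convention are hypothetical.

```python
from typing import Optional


class IgnoreOutputsSketch(Exception):
    """Hypothetical stand-in for flytekit.extend.IgnoreOutputs."""


def train(rank: int) -> str:
    # Every worker does its share of the work...
    model = f"model-trained-with-rank-{rank}"
    if rank != 0:
        # ...but only rank 0 reports a result; the others ask for their outputs to be ignored.
        raise IgnoreOutputsSketch(f"rank {rank} produces no outputs")
    return model


def run_worker(rank: int) -> Optional[str]:
    """Toy entrypoint: treats the exception as 'no outputs' rather than as a failure."""
    try:
        return train(rank)
    except IgnoreOutputsSketch:
        return None


outputs = [run_worker(rank) for rank in range(3)]
```

The point of a dedicated exception type is that the caller can tell "nothing to record" apart from a genuine error.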
flytekit.extend.Image
Image is a structured wrapper for task container images used in object serialization.
Attributes:
name (str): A user-provided name to identify this image.
fqn (str): Fully qualified image name. This consists of
#. a registry location
#. a username
#. a repository name
For example: hostname/username/reponame
tag (str): Optional tag used to specify which version of an image to pull
digest (str): Optional digest used to specify which version of an image to pull
def Image(
name: str,
fqn: str,
tag: Optional[str],
digest: Optional[str],
):
Parameter |
Type |
name |
str |
fqn |
str |
tag |
Optional[str] |
digest |
Optional[str] |
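How fqn, tag and digest might combine into a full image reference can be sketched as follows. ToyImage is a hypothetical class, and the precedence of digest over tag is an assumption based on common container reference conventions, not something the description above states.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ToyImage:
    name: str
    fqn: str
    tag: Optional[str] = None
    digest: Optional[str] = None

    @property
    def full(self) -> str:
        # Assumption: a digest, when present, takes precedence over a tag.
        if self.digest:
            return f"{self.fqn}@{self.digest}"
        if self.tag:
            return f"{self.fqn}:{self.tag}"
        return self.fqn


tagged = ToyImage(name="default", fqn="hostname/username/reponame", tag="v1.0.0")
pinned = ToyImage(name="pinned", fqn="hostname/username/reponame", digest="sha256:abc123")
```

Here name is only a lookup key for the image within a configuration; it does not appear in the reference that is pulled.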
Methods
from_dict()
def from_dict(
kvs: typing.Union[dict, list, str, int, float, bool, NoneType],
infer_missing,
):
Parameter |
Type |
kvs |
typing.Union[dict, list, str, int, float, bool, NoneType] |
infer_missing |
|
from_json()
def from_json(
s: typing.Union[str, bytes, bytearray],
parse_float,
parse_int,
parse_constant,
infer_missing,
kw,
):
Parameter |
Type |
s |
typing.Union[str, bytes, bytearray] |
parse_float |
|
parse_int |
|
parse_constant |
|
infer_missing |
|
kw |
|
look_up_image_info()
def look_up_image_info(
name: str,
image_identifier: str,
allow_no_tag_or_digest: bool,
):
Creates an Image object from an image identifier string or a path to an ImageSpec yaml file.
This function is used when registering tasks/workflows with Admin. When using
the canonical Python-based development cycle, the version that is used to
register workflows and tasks with Admin should be the version of the image
itself, which should ideally be something unique like the git revision SHA1 of
the latest commit.
Parameter |
Type |
name |
str |
image_identifier |
str |
allow_no_tag_or_digest |
bool |
schema()
def schema(
infer_missing: bool,
only,
exclude,
many: bool,
context,
load_only,
dump_only,
partial: bool,
unknown,
):
Parameter |
Type |
infer_missing |
bool |
only |
|
exclude |
|
many |
bool |
context |
|
load_only |
|
dump_only |
|
partial |
bool |
unknown |
|
to_dict()
def to_dict(
encode_json,
):
Parameter |
Type |
encode_json |
|
to_json()
def to_json(
skipkeys: bool,
ensure_ascii: bool,
check_circular: bool,
allow_nan: bool,
indent: typing.Union[int, str, NoneType],
separators: typing.Tuple[str, str],
default: typing.Callable,
sort_keys: bool,
kw,
):
Parameter |
Type |
skipkeys |
bool |
ensure_ascii |
bool |
check_circular |
bool |
allow_nan |
bool |
indent |
typing.Union[int, str, NoneType] |
separators |
typing.Tuple[str, str] |
default |
typing.Callable |
sort_keys |
bool |
kw |
|
Properties
Property |
Type |
Description |
full |
|
|
version |
|
|
flytekit.extend.ImageConfig
We recommend using ImageConfig.auto(img_name=None) to create an ImageConfig.
For example, ImageConfig.auto(img_name="ghcr.io/flyteorg/flytecookbook:v1.0.0") will create an ImageConfig.
ImageConfig holds available images which can be used at registration time. A default image can be specified
along with optional additional images. Each image in the config must have a unique name.
Attributes:
default_image (Optional[Image]): The default image to be used as a container for task serialization.
images (List[Image]): Optional, additional images which can be used in task container definitions.
def ImageConfig(
default_image: Optional[Image],
images: Optional[List[Image]],
):
Parameter |
Type |
default_image |
Optional[Image] |
images |
Optional[List[Image]] |
Methods
auto()
def auto(
config_file: typing.Union[str, ConfigFile, None],
img_name: Optional[str],
):
Reads from config file or from img_name
Note that this function does not take into account the flytekit default images (see the Dockerfiles at the
base of this repo). To pick those up, see the auto_default_image function.
Parameter |
Type |
config_file |
typing.Union[str, ConfigFile, None] |
img_name |
Optional[str] |
auto_default_image()
create_from()
def create_from(
default_image: Optional[Image],
other_images: typing.Optional[typing.List[Image]],
):
Parameter |
Type |
default_image |
Optional[Image] |
other_images |
typing.Optional[typing.List[Image]] |
find_image()
Return an image, by name, if it exists.
from_dict()
def from_dict(
kvs: typing.Union[dict, list, str, int, float, bool, NoneType],
infer_missing,
):
Parameter |
Type |
kvs |
typing.Union[dict, list, str, int, float, bool, NoneType] |
infer_missing |
|
from_images()
def from_images(
default_image: str,
m: typing.Optional[typing.Dict[str, str]],
):
Allows you to programmatically create an ImageConfig. Usually only the default_image is required, unless
your workflow uses multiple images.
.. code:: python

    ImageConfig.from_images(
        "ghcr.io/flyteorg/flytecookbook:v1.0.0",
        {
            "spark": "ghcr.io/flyteorg/myspark:…",
            "other": "…",
        }
    )
Parameter |
Type |
default_image |
str |
m |
typing.Optional[typing.Dict[str, str]] |
from_json()
def from_json(
s: typing.Union[str, bytes, bytearray],
parse_float,
parse_int,
parse_constant,
infer_missing,
kw,
):
Parameter |
Type |
s |
typing.Union[str, bytes, bytearray] |
parse_float |
|
parse_int |
|
parse_constant |
|
infer_missing |
|
kw |
|
schema()
def schema(
infer_missing: bool,
only,
exclude,
many: bool,
context,
load_only,
dump_only,
partial: bool,
unknown,
):
Parameter |
Type |
infer_missing |
bool |
only |
|
exclude |
|
many |
bool |
context |
|
load_only |
|
dump_only |
|
partial |
bool |
unknown |
|
to_dict()
def to_dict(
encode_json,
):
Parameter |
Type |
encode_json |
|
to_json()
def to_json(
skipkeys: bool,
ensure_ascii: bool,
check_circular: bool,
allow_nan: bool,
indent: typing.Union[int, str, NoneType],
separators: typing.Tuple[str, str],
default: typing.Callable,
sort_keys: bool,
kw,
):
Parameter |
Type |
skipkeys |
bool |
ensure_ascii |
bool |
check_circular |
bool |
allow_nan |
bool |
indent |
typing.Union[int, str, NoneType] |
separators |
typing.Tuple[str, str] |
default |
typing.Callable |
sort_keys |
bool |
kw |
|
validate_image()
def validate_image(
_: typing.Any,
param: str,
values: tuple,
):
Validates the image to match the standard format. Also validates that only one default image
is provided. A default image is one that is specified as default=<image_uri> or just <image_uri>. All
other images should be provided with a name, in the format name=<image_uri>.
This method can be used with the CLI.
Parameter |
Type |
_ |
typing.Any |
param |
str |
values |
tuple |
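The value format described for validate_image() can be sketched as a small parser. split_image_values is a hypothetical helper, not the flytekit callback. It only demonstrates the default=<image_uri>, bare <image_uri> and name=<image_uri> forms and the single-default rule.

```python
from typing import Dict, Iterable, Optional, Tuple


def split_image_values(values: Iterable[str]) -> Tuple[Optional[str], Dict[str, str]]:
    """Split CLI-style image values into (default image, named images)."""
    default_image: Optional[str] = None
    named: Dict[str, str] = {}
    for value in values:
        name, sep, uri = value.partition("=")
        if not sep:
            # A bare <image_uri> is treated as the default image.
            name, uri = "default", value
        if not name or not uri:
            raise ValueError(f"malformed image value {value!r}")
        if name == "default":
            if default_image is not None:
                raise ValueError("only one default image may be provided")
            default_image = uri
        else:
            named[name] = uri
    return default_image, named


default_uri, others = split_image_values(
    ("ghcr.io/org/app:v1", "spark=ghcr.io/org/spark:v2")
)
```

Supplying both a bare URI and a default=<image_uri> value raises an error in this sketch, since that would define two defaults.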
flytekit.extend.Interface
A Python native interface object, like inspect.signature but simpler.
def Interface(
inputs: Union[Optional[Dict[str, Type]], Optional[Dict[str, Tuple[Type, Any]]]],
outputs: Union[Optional[Dict[str, Type]], Optional[Dict[str, Optional[Type]]]],
output_tuple_name: Optional[str],
docstring: Optional[Docstring],
):
Parameter |
Type |
inputs |
Union[Optional[Dict[str, Type]], Optional[Dict[str, Tuple[Type, Any]]]] |
outputs |
Union[Optional[Dict[str, Type]], Optional[Dict[str, Optional[Type]]]] |
output_tuple_name |
Optional[str] |
docstring |
Optional[Docstring] |
Methods
Method |
Description |
remove_inputs() |
This method is useful in removing some variables from the Flyte backend inputs specification, as these are implicit local-only inputs or will be supplied by the library at runtime |
with_inputs() |
Use this to add additional inputs to the interface |
with_outputs() |
This method allows adding extra outputs that are expected from a task specification |
remove_inputs()
def remove_inputs(
vars: Optional[List[str]],
):
This method is useful in removing some variables from the Flyte backend inputs specification, as these are
implicit local-only inputs or will be supplied by the library at runtime (for example, a Spark session).
It creates a new instance of the interface with the requested variables removed.
Parameter |
Type |
vars |
Optional[List[str]] |
with_inputs()
def with_inputs(
extra_inputs: Dict[str, Type],
):
Use this to add additional inputs to the interface. This is useful for adding implicit inputs that
are added without the user requesting them.
Parameter |
Type |
extra_inputs |
Dict[str, Type] |
with_outputs()
def with_outputs(
extra_outputs: Dict[str, Type],
):
This method allows adding extra outputs that are expected from a task specification.
Parameter |
Type |
extra_outputs |
Dict[str, Type] |
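The copy-returning style of these methods can be shown with a simplified model. ToyInterface is hypothetical and tracks only input names and types. Rejecting a clashing input name is an assumption of this sketch, not documented behaviour.

```python
from typing import Dict, List, Optional


class ToyInterface:
    """Hypothetical, simplified interface: just named input types."""

    def __init__(self, inputs: Optional[Dict[str, type]] = None) -> None:
        self.inputs: Dict[str, type] = dict(inputs or {})

    def with_inputs(self, extra_inputs: Dict[str, type]) -> "ToyInterface":
        # Assumption of this sketch: refuse to silently overwrite an existing input.
        clash = set(extra_inputs) & set(self.inputs)
        if clash:
            raise ValueError(f"inputs already declared: {sorted(clash)}")
        return ToyInterface({**self.inputs, **extra_inputs})

    def remove_inputs(self, names: Optional[List[str]]) -> "ToyInterface":
        # Always return a new instance; the original is never modified.
        to_drop = set(names or [])
        return ToyInterface({k: v for k, v in self.inputs.items() if k not in to_drop})


user_facing = ToyInterface({"x": int})
with_session = user_facing.with_inputs({"spark_session": object})
backend_view = with_session.remove_inputs(["spark_session"])
```

A plugin could use the wider interface locally, where the implicit input is injected, and the narrower one when describing the task to the backend.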
Properties
Property |
Type |
Description |
default_inputs_as_kwargs |
|
|
docstring |
|
|
inputs |
|
|
inputs_with_defaults |
|
|
output_names |
|
|
output_tuple |
|
|
output_tuple_name |
|
|
outputs |
|
|
flytekit.extend.Promise
This object is a wrapper and exists for three main reasons. Let’s assume we’re dealing with a task like:
@task
def t1() -> (int, str): …
1. Handling the duality between compilation and local execution: when the task function is run in a local execution
mode inside a workflow function, a Python integer and string are produced. When the task is being compiled as
part of the workflow, the task call creates a Node instead, and the task returns two Promise objects that
point to that Node.
2. One needs to be able to call:
x = t1().with_overrides(…)
If the task returns an integer or an (int, str) tuple like t1 above, calling with_overrides on the
result would throw an error. This Promise object adds that.
3. Assorted handling for conditionals.
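The second reason can be illustrated with a small stdlib-only sketch: a plain Python value has no with_overrides method, so a wrapper object is needed to carry one. SketchPromise below is a hypothetical stand-in, not flytekit's Promise, and the way it stores overrides in a dictionary is an assumption made purely for illustration.

```python
from typing import Any, Dict


class SketchPromise:
    """Hypothetical wrapper; mimics the idea only, not flytekit's Promise."""

    def __init__(self, var: str, val: Any) -> None:
        self.var = var  # name of the output this wrapper stands for
        self.val = val  # the wrapped value (or, when compiling, a node reference)
        self.overrides: Dict[str, Any] = {}

    def with_overrides(self, **overrides: Any) -> "SketchPromise":
        # Record the requested overrides and return self so calls can chain.
        self.overrides.update(overrides)
        return self


raw = 5                                    # what a bare task function returns
wrapped = SketchPromise("o0", raw).with_overrides(retries=3)

print(hasattr(raw, "with_overrides"))      # the bare int cannot be overridden
print(wrapped.overrides)                   # the wrapper remembers the request
```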
def Promise(
var: str,
val: Union[NodeOutput, _literals_models.Literal],
type: typing.Optional[_type_models.LiteralType],
):
Parameter |
Type |
var |
str |
val |
Union[NodeOutput, _literals_models.Literal] |
type |
typing.Optional[_type_models.LiteralType] |
Methods
deepcopy()
eval()
is_()
is_false()
is_none()
is_true()
with_overrides()
def with_overrides(
node_name: Optional[str],
aliases: Optional[Dict[str, str]],
requests: Optional[Resources],
limits: Optional[Resources],
timeout: Optional[Union[int, datetime.timedelta, object]],
retries: Optional[int],
interruptible: Optional[bool],
name: Optional[str],
task_config: Optional[Any],
container_image: Optional[str],
accelerator: Optional[BaseAccelerator],
cache: Optional[bool],
cache_version: Optional[str],
cache_serialize: Optional[bool],
args,
kwargs,
):
Parameter |
Type |
node_name |
Optional[str] |
aliases |
Optional[Dict[str, str]] |
requests |
Optional[Resources] |
limits |
Optional[Resources] |
timeout |
Optional[Union[int, datetime.timedelta, object]] |
retries |
Optional[int] |
interruptible |
Optional[bool] |
name |
Optional[str] |
task_config |
Optional[Any] |
container_image |
Optional[str] |
accelerator |
Optional[BaseAccelerator] |
cache |
Optional[bool] |
cache_version |
Optional[str] |
cache_serialize |
Optional[bool] |
args |
*args |
kwargs |
**kwargs |
with_var()
def with_var(
new_var: str,
):
Parameter |
Type |
new_var |
str |
Properties
Property |
Type |
Description |
attr_path |
|
|
is_ready |
|
|
ref |
|
|
val |
|
|
var |
|
|
flytekit.extend.PythonCustomizedContainerTask
Please take a look at the comments for flytekit.extend.ExecutableTemplateShimTask as well. This class
should be subclassed and a custom Executor provided as a default to this parent class constructor
when building a new external-container flytekit-only plugin.
This class provides authors of new task types the basic scaffolding to create task-template based tasks. In order
to write such a task, authors need to
- subclass the ShimTaskExecutor class and override the execute_from_model function. This function is
where all the business logic should go. Keep in mind though that you, the plugin author, will not have access
to anything that’s not serialized within the TaskTemplate, which is why you’ll also need to
- subclass this class, and override the get_custom function to include all the information the executor
will need to run.
- Also pass the executor you created as the executor_type argument of this class’s constructor.
Keep in mind that the total size of the TaskTemplate still needs to be small, since these will be accessed
frequently by the Flyte engine.
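The three steps above can be sketched without flytekit at all. Everything in the block below (SketchTemplate, SketchExecutor, GreetingExecutor, SketchTask) is a hypothetical stand-in written with the standard library; it only shows the shape of the pattern, in which the executor sees nothing except what get_custom placed into the serialized template.

```python
from dataclasses import dataclass
from typing import Any, Dict, Type


@dataclass
class SketchTemplate:
    """Stand-in for a serialized task template: only plain data survives."""

    custom: Dict[str, Any]


class SketchExecutor:
    """Step 1: the executor is where the business logic lives."""

    def execute_from_model(self, tt: SketchTemplate, **kwargs: Any) -> Any:
        raise NotImplementedError


class GreetingExecutor(SketchExecutor):
    def execute_from_model(self, tt: SketchTemplate, **kwargs: Any) -> str:
        # Only tt.custom and the call inputs are available here.
        return f"{tt.custom['greeting']}, {kwargs['name']}!"


class SketchTask:
    def __init__(self, greeting: str, executor_type: Type[SketchExecutor]) -> None:
        self._greeting = greeting
        self._executor_type = executor_type  # step 3: executor passed to the constructor

    def get_custom(self) -> Dict[str, Any]:
        # Step 2: put everything the executor needs into serializable data.
        return {"greeting": self._greeting}

    def execute(self, **kwargs: Any) -> Any:
        # Rather than running here, build the template and defer to the executor.
        template = SketchTemplate(custom=self.get_custom())
        return self._executor_type().execute_from_model(template, **kwargs)


result = SketchTask("Hello", GreetingExecutor).execute(name="Flyte")
```

Keeping get_custom small matters for the same reason given above: whatever it returns travels inside the template.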
def PythonCustomizedContainerTask(
name: str,
task_config: TC,
container_image: str,
executor_type: Type[ShimTaskExecutor],
task_resolver: Optional[TaskTemplateResolver],
task_type,
requests: Optional[Resources],
limits: Optional[Resources],
environment: Optional[Dict[str, str]],
secret_requests: Optional[List[Secret]],
kwargs,
):
Parameter |
Type |
name |
str |
task_config |
TC |
container_image |
str |
executor_type |
Type[ShimTaskExecutor] |
task_resolver |
Optional[TaskTemplateResolver] |
task_type |
|
requests |
Optional[Resources] |
limits |
Optional[Resources] |
environment |
Optional[Dict[str, str]] |
secret_requests |
Optional[List[Secret]] |
kwargs |
**kwargs |
Methods
Method |
Description |
compile() |
Generates a node that encapsulates this task in a workflow definition |
construct_node_metadata() |
Used when constructing the node that encapsulates this task as part of a broader workflow definition |
dispatch_execute() |
This function is largely similar to the base PythonTask, with the exception that we have to infer the Python |
execute() |
Rather than running here, send everything to the executor |
find_lhs() |
None |
get_command() |
None |
get_config() |
Returns the task config as a serializable dictionary |
get_container() |
Returns the container definition (if any) that is used to run the task on hosted Flyte |
get_custom() |
Return additional plugin-specific custom data (if any) as a serializable dictionary |
get_extended_resources() |
Returns the extended resources to allocate to the task on hosted Flyte |
get_image() |
None |
get_input_types() |
Returns the names and python types as a dictionary for the inputs of this task |
get_k8s_pod() |
Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte |
get_sql() |
Returns the Sql definition (if any) that is used to run the task on hosted Flyte |
get_type_for_input_var() |
Returns the python type for an input variable by name |
get_type_for_output_var() |
Returns the python type for the specified output variable by name |
local_execute() |
This function is used only in the local execution path and is responsible for calling dispatch execute |
local_execution_mode() |
None |
post_execute() |
This function is a stub, just here to keep dispatch_execute compatibility between this class and PythonTask |
pre_execute() |
This function is a stub, just here to keep dispatch_execute compatibility between this class and PythonTask |
sandbox_execute() |
Call dispatch_execute, in the context of a local sandbox execution |
serialize_to_model() |
None |
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
):
Generates a node that encapsulates this task in a workflow definition.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
args |
*args |
kwargs |
**kwargs |
construct_node_metadata()
def construct_node_metadata()
Used when constructing the node that encapsulates this task as part of a broader workflow definition.
dispatch_execute()
def dispatch_execute(
ctx: FlyteContext,
input_literal_map: _literal_models.LiteralMap,
):
This function is largely similar to the base PythonTask, with the exception that we have to infer the Python
interface before executing. Also, we refer to self.task_template rather than just self, similar to task
classes that derive from the base PythonTask.
Parameter |
Type |
ctx |
FlyteContext |
input_literal_map |
_literal_models.LiteralMap |
execute()
Rather than running here, send everything to the executor.
Parameter |
Type |
kwargs |
**kwargs |
find_lhs()
get_command()
def get_command(
settings: SerializationSettings,
):
Parameter |
Type |
settings |
SerializationSettings |
get_config()
def get_config(
settings: SerializationSettings,
):
Returns the task config as a serializable dictionary. This task config consists of metadata about the custom
defined for this task.
Parameter |
Type |
settings |
SerializationSettings |
get_container()
def get_container(
settings: SerializationSettings,
):
Returns the container definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
SerializationSettings |
get_custom()
def get_custom(
settings: SerializationSettings,
):
Return additional plugin-specific custom data (if any) as a serializable dictionary.
Parameter |
Type |
settings |
SerializationSettings |
get_extended_resources()
def get_extended_resources(
settings: flytekit.configuration.SerializationSettings,
):
Returns the extended resources to allocate to the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_image()
def get_image(
settings: SerializationSettings,
):
Parameter |
Type |
settings |
SerializationSettings |
get_input_types()
Returns the names and python types as a dictionary for the inputs of this task.
get_k8s_pod()
def get_k8s_pod(
settings: flytekit.configuration.SerializationSettings,
):
Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_sql()
def get_sql(
settings: flytekit.configuration.SerializationSettings,
):
Returns the Sql definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_type_for_input_var()
def get_type_for_input_var(
k: str,
v: typing.Any,
):
Returns the python type for an input variable by name.
Parameter |
Type |
k |
str |
v |
typing.Any |
get_type_for_output_var()
def get_type_for_output_var(
k: str,
v: typing.Any,
):
Returns the python type for the specified output variable by name.
Parameter |
Type |
k |
str |
v |
typing.Any |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
This function is used only in the local execution path and is responsible for calling dispatch execute.
Use this function when calling a task with native values (or Promises containing Flyte literals derived from
Python native values).
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
post_execute()
def post_execute(
_: Optional[ExecutionParameters],
rval: Any,
):
This function is a stub, just here to keep dispatch_execute compatibility between this class and PythonTask.
Parameter |
Type |
_ |
Optional[ExecutionParameters] |
rval |
Any |
pre_execute()
def pre_execute(
user_params: Optional[ExecutionParameters],
):
This function is a stub, just here to keep dispatch_execute compatibility between this class and PythonTask.
Parameter |
Type |
user_params |
Optional[ExecutionParameters] |
sandbox_execute()
def sandbox_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
serialize_to_model()
def serialize_to_model(
settings: SerializationSettings,
):
Parameter |
Type |
settings |
SerializationSettings |
Properties
Property |
Type |
Description |
container_image |
|
|
deck_fields |
|
|
disable_deck |
|
|
docs |
|
|
enable_deck |
|
|
environment |
|
|
executor |
|
|
executor_type |
|
|
instantiated_in |
|
|
interface |
|
|
lhs |
|
|
location |
|
|
metadata |
|
|
name |
|
|
python_interface |
|
|
resources |
|
|
security_context |
|
|
task_config |
|
|
task_resolver |
|
|
task_template |
|
|
task_type |
|
|
task_type_version |
|
|
flytekit.extend.PythonTask
Base Class for all Tasks with a Python native Interface. This should be directly used for task types that do
not have a python function to be executed. Otherwise refer to flytekit.PythonFunctionTask.
def PythonTask(
task_type: str,
name: str,
task_config: typing.Optional[~T],
interface: typing.Optional[flytekit.core.interface.Interface],
environment: typing.Optional[typing.Dict[str, str]],
disable_deck: typing.Optional[bool],
enable_deck: typing.Optional[bool],
deck_fields: typing.Optional[typing.Tuple[flytekit.deck.deck.DeckField, ...]],
kwargs,
):
Parameter |
Type |
task_type |
str |
name |
str |
task_config |
typing.Optional[~T] |
interface |
typing.Optional[flytekit.core.interface.Interface] |
environment |
typing.Optional[typing.Dict[str, str]] |
disable_deck |
typing.Optional[bool] |
enable_deck |
typing.Optional[bool] |
deck_fields |
typing.Optional[typing.Tuple[flytekit.deck.deck.DeckField, ...]] |
kwargs |
**kwargs |
Methods
Method |
Description |
compile() |
Generates a node that encapsulates this task in a workflow definition |
construct_node_metadata() |
Used when constructing the node that encapsulates this task as part of a broader workflow definition |
dispatch_execute() |
This method translates Flyte’s Type system based input values and invokes the actual call to the executor |
execute() |
This method will be invoked to execute the task |
find_lhs() |
None |
get_config() |
Returns the task config as a serializable dictionary |
get_container() |
Returns the container definition (if any) that is used to run the task on hosted Flyte |
get_custom() |
Return additional plugin-specific custom data (if any) as a serializable dictionary |
get_extended_resources() |
Returns the extended resources to allocate to the task on hosted Flyte |
get_input_types() |
Returns the names and python types as a dictionary for the inputs of this task |
get_k8s_pod() |
Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte |
get_sql() |
Returns the Sql definition (if any) that is used to run the task on hosted Flyte |
get_type_for_input_var() |
Returns the python type for an input variable by name |
get_type_for_output_var() |
Returns the python type for the specified output variable by name |
local_execute() |
This function is used only in the local execution path and is responsible for calling dispatch execute |
local_execution_mode() |
None |
post_execute() |
Post execute is called after the execution has completed, with the user_params and can be used to clean-up, |
pre_execute() |
This is the method that will be invoked directly before executing the task method and before all the inputs |
sandbox_execute() |
Call dispatch_execute, in the context of a local sandbox execution |
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
):
Generates a node that encapsulates this task in a workflow definition.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
args |
*args |
kwargs |
**kwargs |
construct_node_metadata()
def construct_node_metadata()
Used when constructing the node that encapsulates this task as part of a broader workflow definition.
dispatch_execute()
def dispatch_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
This method translates Flyte’s Type system based input values and invokes the actual call to the executor.
This method is also invoked during runtime.
- VoidPromise is returned in the case when the task itself declares no outputs.
- Literal Map is returned when the task returns either one or more outputs in the declaration. Individual outputs
may be none.
- DynamicJobSpec is returned when a dynamic workflow is executed.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
execute()
This method will be invoked to execute the task.
Parameter |
Type |
kwargs |
**kwargs |
find_lhs()
get_config()
def get_config(
settings: flytekit.configuration.SerializationSettings,
):
Returns the task config as a serializable dictionary. This task config consists of metadata about the custom
defined for this task.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_container()
def get_container(
settings: flytekit.configuration.SerializationSettings,
):
Returns the container definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_custom()
def get_custom(
settings: flytekit.configuration.SerializationSettings,
):
Return additional plugin-specific custom data (if any) as a serializable dictionary.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_extended_resources()
def get_extended_resources(
settings: flytekit.configuration.SerializationSettings,
):
Returns the extended resources to allocate to the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_input_types()
Returns the names and python types as a dictionary for the inputs of this task.
get_k8s_pod()
def get_k8s_pod(
settings: flytekit.configuration.SerializationSettings,
):
Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_sql()
def get_sql(
settings: flytekit.configuration.SerializationSettings,
):
Returns the Sql definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_type_for_input_var()
def get_type_for_input_var(
k: str,
v: typing.Any,
):
Returns the python type for an input variable by name.
Parameter |
Type |
k |
str |
v |
typing.Any |
get_type_for_output_var()
def get_type_for_output_var(
k: str,
v: typing.Any,
):
Returns the python type for the specified output variable by name.
Parameter |
Type |
k |
str |
v |
typing.Any |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
This function is used only in the local execution path and is responsible for calling dispatch execute.
Use this function when calling a task with native values (or Promises containing Flyte literals derived from
Python native values).
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
post_execute()
def post_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
rval: typing.Any,
):
Post execute is called after the execution has completed, with the user_params, and can be used to clean up
or alter the outputs to match the intended task’s outputs. If not overridden, this function is a no-op.
Parameter |
Type |
user_params |
typing.Optional[flytekit.core.context_manager.ExecutionParameters] |
rval |
typing.Any |
pre_execute()
def pre_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
):
This is the method that will be invoked directly before executing the task method and before all the inputs
are converted. One particular case where this is useful is if the context is to be modified for the user process
to get some user space parameters. This also ensures that things like SparkSession are already correctly
set up before the type transformers are called.
This should return either the same context or the mutated context.
Parameter |
Type |
user_params |
typing.Optional[flytekit.core.context_manager.ExecutionParameters] |
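The ordering described for pre_execute and post_execute can be pictured as a small template-method sketch. The block below is stdlib-only; SketchTask and its dispatch method are hypothetical names, and the int conversion merely stands in for whatever the type transformers do. It is an illustration of the call order stated above, not the flytekit implementation.

```python
from typing import Any, Dict, List, Optional


class SketchTask:
    """Hypothetical task base that records the order in which hooks run."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def pre_execute(self, user_params: Optional[dict]) -> Optional[dict]:
        self.calls.append("pre_execute")
        return user_params  # the same context, or a mutated one

    def execute(self, **kwargs: Any) -> Any:
        self.calls.append("execute")
        return kwargs["x"] + 1

    def post_execute(self, user_params: Optional[dict], rval: Any) -> Any:
        self.calls.append("post_execute")
        return rval  # a no-op unless a subclass overrides it

    def dispatch(self, user_params: Optional[dict], raw_inputs: Dict[str, str]) -> Any:
        params = self.pre_execute(user_params)  # runs before inputs are converted
        self.calls.append("convert_inputs")
        native = {k: int(v) for k, v in raw_inputs.items()}
        rval = self.execute(**native)
        return self.post_execute(params, rval)  # runs after execution completes


task = SketchTask()
output = task.dispatch({"run": "local"}, {"x": "41"})
```

A subclass that needs, say, a session object ready before conversion would create it in pre_execute and release it in post_execute.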
sandbox_execute()
def sandbox_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
Properties
Property |
Type |
Description |
deck_fields |
|
|
disable_deck |
|
|
docs |
|
|
enable_deck |
|
|
environment |
|
|
instantiated_in |
|
|
interface |
|
|
lhs |
|
|
location |
|
|
metadata |
|
|
name |
|
|
python_interface |
|
|
security_context |
|
|
task_config |
|
|
task_type |
|
|
task_type_version |
|
|
flytekit.extend.SQLTask
Base task types for all SQL tasks. See flytekit.extras.sqlite3.task.SQLite3Task
and flytekitplugins.athena.task.AthenaTask
for examples of how to use it as a base class.
def SQLTask(
name: str,
query_template: str,
task_config: typing.Optional[~T],
task_type,
inputs: typing.Optional[typing.Dict[str, typing.Tuple[typing.Type, typing.Any]]],
metadata: typing.Optional[flytekit.core.base_task.TaskMetadata],
outputs: typing.Optional[typing.Dict[str, typing.Type]],
kwargs,
):
This SQLTask should mostly just be used as a base class for other SQL task types and should not be used
directly. See flytekit.extras.sqlite3.task.SQLite3Task.
Parameter |
Type |
name |
str |
query_template |
str |
task_config |
typing.Optional[~T] |
task_type |
|
inputs |
typing.Optional[typing.Dict[str, typing.Tuple[typing.Type, typing.Any]]] |
metadata |
typing.Optional[flytekit.core.base_task.TaskMetadata] |
outputs |
typing.Optional[typing.Dict[str, typing.Type]] |
kwargs |
**kwargs |
Methods
Method |
Description |
compile() |
Generates a node that encapsulates this task in a workflow definition |
construct_node_metadata() |
Used when constructing the node that encapsulates this task as part of a broader workflow definition |
dispatch_execute() |
This method translates Flyte’s Type system based input values and invokes the actual call to the executor |
execute() |
This method will be invoked to execute the task |
find_lhs() |
None |
get_config() |
Returns the task config as a serializable dictionary |
get_container() |
Returns the container definition (if any) that is used to run the task on hosted Flyte |
get_custom() |
Return additional plugin-specific custom data (if any) as a serializable dictionary |
get_extended_resources() |
Returns the extended resources to allocate to the task on hosted Flyte |
get_input_types() |
Returns the names and python types as a dictionary for the inputs of this task |
get_k8s_pod() |
Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte |
get_query() |
None |
get_sql() |
Returns the Sql definition (if any) that is used to run the task on hosted Flyte |
get_type_for_input_var() |
Returns the python type for an input variable by name |
get_type_for_output_var() |
Returns the python type for the specified output variable by name |
interpolate_query() |
This function will fill in the query template with the provided kwargs and return the interpolated query |
local_execute() |
This function is used only in the local execution path and is responsible for calling dispatch execute |
local_execution_mode() |
None |
post_execute() |
Post execute is called after the execution has completed, with the user_params and can be used to clean-up, |
pre_execute() |
This is the method that will be invoked directly before executing the task method and before all the inputs |
sandbox_execute() |
Call dispatch_execute, in the context of a local sandbox execution |
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
):
Generates a node that encapsulates this task in a workflow definition.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
args |
*args |
kwargs |
**kwargs |
construct_node_metadata()
def construct_node_metadata()
Used when constructing the node that encapsulates this task as part of a broader workflow definition.
dispatch_execute()
def dispatch_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
This method translates Flyte’s Type system based input values and invokes the actual call to the executor.
This method is also invoked during runtime.
- VoidPromise is returned in the case when the task itself declares no outputs.
- Literal Map is returned when the task returns either one or more outputs in the declaration. Individual outputs
may be none.
- DynamicJobSpec is returned when a dynamic workflow is executed.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
execute()
This method will be invoked to execute the task.
Parameter |
Type |
kwargs |
**kwargs |
find_lhs()
get_config()
def get_config(
settings: flytekit.configuration.SerializationSettings,
):
Returns the task config as a serializable dictionary. This task config consists of metadata about the custom
defined for this task.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_container()
def get_container(
settings: flytekit.configuration.SerializationSettings,
):
Returns the container definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_custom()
def get_custom(
settings: flytekit.configuration.SerializationSettings,
):
Return additional plugin-specific custom data (if any) as a serializable dictionary.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_extended_resources()
def get_extended_resources(
settings: flytekit.configuration.SerializationSettings,
):
Returns the extended resources to allocate to the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_input_types()
Returns the names and python types as a dictionary for the inputs of this task.
get_k8s_pod()
def get_k8s_pod(
settings: flytekit.configuration.SerializationSettings,
):
Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_query()
def get_query(
kwargs,
):
Parameter |
Type |
kwargs |
**kwargs |
get_sql()
def get_sql(
settings: flytekit.configuration.SerializationSettings,
):
Returns the Sql definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_type_for_input_var()
def get_type_for_input_var(
k: str,
v: typing.Any,
):
Returns the python type for an input variable by name.
Parameter |
Type |
k |
str |
v |
typing.Any |
get_type_for_output_var()
def get_type_for_output_var(
k: str,
v: typing.Any,
):
Returns the python type for the specified output variable by name.
Parameter |
Type |
k |
str |
v |
typing.Any |
interpolate_query()
def interpolate_query(
query_template,
kwargs,
):
This function will fill in the query template with the provided kwargs and return the interpolated query.
Please note that when SQL tasks run in Flyte, this step is done by the task executor.
Parameter |
Type |
query_template |
|
kwargs |
**kwargs |
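To make the idea of filling a query template concrete, here is a minimal stdlib-only sketch. It assumes placeholders of the form {{ .inputs.name }}; that syntax, the function name interpolate_query_sketch, and the KeyError on a missing value are assumptions for illustration and may differ from what the library does.

```python
import re
from typing import Any

# Assumed placeholder syntax: {{ .inputs.<name> }}, with optional inner spaces.
_INPUT_PATTERN = re.compile(r"\{\{\s*\.inputs\.(\w+)\s*\}\}")


def interpolate_query_sketch(query_template: str, **kwargs: Any) -> str:
    """Replace each placeholder with the string form of the matching kwarg."""

    def substitute(match: "re.Match[str]") -> str:
        name = match.group(1)
        if name not in kwargs:
            raise KeyError(f"no value supplied for input '{name}'")
        return str(kwargs[name])

    return _INPUT_PATTERN.sub(substitute, query_template)


query = interpolate_query_sketch(
    "SELECT * FROM events WHERE day = '{{ .inputs.day }}' LIMIT {{.inputs.limit}}",
    day="2024-01-01",
    limit=10,
)
```

This is plain text substitution, not parameter binding, so the sketch offers no protection against malformed or hostile input values.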
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
This function is used only in the local execution path and is responsible for calling dispatch execute.
Use this function when calling a task with native values (or Promises containing Flyte literals derived from
Python native values).
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
post_execute()
def post_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
rval: typing.Any,
):
Post execute is called after the execution has completed, with the user_params, and can be used to clean up
or alter the outputs to match the intended task’s outputs. If not overridden, this function is a no-op.
Parameter |
Type |
user_params |
typing.Optional[flytekit.core.context_manager.ExecutionParameters] |
rval |
typing.Any |
pre_execute()
def pre_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
):
This is the method that will be invoked directly before executing the task method and before all the inputs
are converted. One particular case where this is useful is if the context is to be modified for the user process
to get some user space parameters. This also ensures that things like SparkSession are already correctly
set up before the type transformers are called.
This should return either the same context or the mutated context.
Parameter |
Type |
user_params |
typing.Optional[flytekit.core.context_manager.ExecutionParameters] |
sandbox_execute()
def sandbox_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
Properties
Property |
Type |
Description |
deck_fields |
|
|
disable_deck |
|
|
docs |
|
|
enable_deck |
|
|
environment |
|
|
instantiated_in |
|
|
interface |
|
|
lhs |
|
|
location |
|
|
metadata |
|
|
name |
|
|
python_interface |
|
|
query_template |
|
|
security_context |
|
|
task_config |
|
|
task_type |
|
|
task_type_version |
|
|
flytekit.extend.SecretsManager
This provides secrets resolution logic at runtime.
The resolution order is:
- Try the environment variable first. The variable name starts with configuration.SECRETS_ENV_PREFIX and is
all upper case.
- If it is not set, try the file whose lower-case name matches
configuration.SECRETS_DEFAULT_DIR/<group>/configuration.SECRETS_FILE_PREFIX<key>
All configuration values can always be overridden by injecting an environment variable.
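The resolution order above can be sketched with the standard library alone. In the block below, resolve_secret is a hypothetical helper, ENV_PREFIX and FILE_PREFIX are placeholder values standing in for the configuration settings, and the exact way group and key are joined into the variable name is an assumption.

```python
import os
import tempfile

# Placeholder values standing in for the configuration settings (assumed).
ENV_PREFIX = "_FSEC_"
FILE_PREFIX = ""


def resolve_secret(group: str, key: str, default_dir: str, encode_mode: str = "r"):
    # 1. Environment variable: prefix + group + key, all upper case.
    env_name = f"{ENV_PREFIX}{group}_{key}".upper()
    value = os.environ.get(env_name)
    if value is not None:
        return value
    # 2. File: <default_dir>/<group>/<file_prefix><key>, group and key lower case.
    path = os.path.join(default_dir, group.lower(), f"{FILE_PREFIX}{key.lower()}")
    if os.path.exists(path):
        with open(path, encode_mode) as handle:
            return handle.read().strip()
    raise ValueError(f"secret {group}/{key} not found in env or at {path}")


with tempfile.TemporaryDirectory() as secrets_dir:
    os.environ.pop("_FSEC_DB_PASSWORD", None)
    os.makedirs(os.path.join(secrets_dir, "db"))
    with open(os.path.join(secrets_dir, "db", "password"), "w") as handle:
        handle.write("from-file\n")
    from_file = resolve_secret("db", "password", secrets_dir)

    os.environ["_FSEC_DB_PASSWORD"] = "from-env"  # env var now takes precedence
    from_env = resolve_secret("db", "password", secrets_dir)
    del os.environ["_FSEC_DB_PASSWORD"]
```

Checking the environment first lets a single variable override a mounted secret file during local testing.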
def SecretsManager(
secrets_cfg: typing.Optional[SecretsConfig],
):
Parameter |
Type |
secrets_cfg |
typing.Optional[SecretsConfig] |
Methods
Method |
Description |
get() |
Retrieves a secret using the resolution order -> Env followed by file |
get_secrets_env_var() |
Returns a string that matches the ENV Variable to look for the secrets |
get_secrets_file() |
Returns a path that matches the file to look for the secrets |
get()
def get(
group: Optional[str],
key: Optional[str],
group_version: Optional[str],
encode_mode: str,
):
Retrieves a secret using the resolution order: environment variable first, then file. Raises a ValueError if the
secret is not found.
The encode_mode parameter defines the mode used to open files: either “r” to read text or “rb” to read binary.
Parameter |
Type |
group |
Optional[str] |
key |
Optional[str] |
group_version |
Optional[str] |
encode_mode |
str |
get_secrets_env_var()
def get_secrets_env_var(
group: Optional[str],
key: Optional[str],
group_version: Optional[str],
):
Returns a string that matches the ENV Variable to look for the secrets
Parameter |
Type |
group |
Optional[str] |
key |
Optional[str] |
group_version |
Optional[str] |
get_secrets_file()
def get_secrets_file(
group: Optional[str],
key: Optional[str],
group_version: Optional[str],
):
Returns a path that matches the file to look for the secrets
Parameter |
Type |
group |
Optional[str] |
key |
Optional[str] |
group_version |
Optional[str] |
flytekit.extend.SerializationSettings
These settings are provided while serializing a workflow and task, before registration. This is required to get
runtime information at serialization time, as well as some defaults.
Attributes:
project (str): The project (if any) under which to register entities.
domain (str): The domain (if any) under which to register entities.
version (str): The version (if any) under which to register entities.
image_config (ImageConfig): The image config used to define task container images.
env (Optional[Dict[str, str]]): Environment variables injected into task container definitions.
flytekit_virtualenv_root (Optional[str]): During out of container serialization, the absolute path of the flytekit
virtualenv at serialization time won’t match the in-container value at execution time. This optional value
is used to provide the in-container virtualenv path.
python_interpreter (Optional[str]): The python executable to use. This is used for spark tasks in out of
container execution.
entrypoint_settings (Optional[EntrypointSettings]): Information about the command, path and version of the
entrypoint program.
fast_serialization_settings (Optional[FastSerializationSettings]): If the code is being serialized so that it
can be fast registered (and thus omit building a Docker image) this object contains additional parameters
for serialization.
source_root (Optional[str]): The root directory of the source code.
def SerializationSettings(
image_config: ImageConfig,
project: typing.Optional[str],
domain: typing.Optional[str],
version: typing.Optional[str],
env: Optional[Dict[str, str]],
git_repo: Optional[str],
python_interpreter: str,
flytekit_virtualenv_root: Optional[str],
fast_serialization_settings: Optional[FastSerializationSettings],
source_root: Optional[str],
):
Parameter |
Type |
image_config |
ImageConfig |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
version |
typing.Optional[str] |
env |
Optional[Dict[str, str]] |
git_repo |
Optional[str] |
python_interpreter |
str |
flytekit_virtualenv_root |
Optional[str] |
fast_serialization_settings |
Optional[FastSerializationSettings] |
source_root |
Optional[str] |
Methods
default_entrypoint_settings()
def default_entrypoint_settings(
interpreter_path: str,
):
Assumes the entrypoint is installed in a virtual-environment where the interpreter is
Parameter |
Type |
interpreter_path |
str |
for_image()
def for_image(
image: str,
version: str,
project: str,
domain: str,
python_interpreter_path: str,
):
Parameter |
Type |
image |
str |
version |
str |
project |
str |
domain |
str |
python_interpreter_path |
str |
from_dict()
def from_dict(
kvs: typing.Union[dict, list, str, int, float, bool, NoneType],
infer_missing,
):
Parameter |
Type |
kvs |
typing.Union[dict, list, str, int, float, bool, NoneType] |
infer_missing |
|
from_json()
def from_json(
s: typing.Union[str, bytes, bytearray],
parse_float,
parse_int,
parse_constant,
infer_missing,
kw,
):
Parameter |
Type |
s |
typing.Union[str, bytes, bytearray] |
parse_float |
|
parse_int |
|
parse_constant |
|
infer_missing |
|
kw |
|
from_transport()
def from_transport(
s: str,
):
new_builder()
Creates a SerializationSettings.Builder that copies the existing serialization settings parameters and
allows for customization.
schema()
def schema(
infer_missing: bool,
only,
exclude,
many: bool,
context,
load_only,
dump_only,
partial: bool,
unknown,
):
Parameter |
Type |
infer_missing |
bool |
only |
|
exclude |
|
many |
bool |
context |
|
load_only |
|
dump_only |
|
partial |
bool |
unknown |
|
should_fast_serialize()
def should_fast_serialize()
Whether or not the serialization settings specify that entities should be serialized for fast registration.
to_dict()
def to_dict(
encode_json,
):
Parameter |
Type |
encode_json |
|
to_json()
def to_json(
skipkeys: bool,
ensure_ascii: bool,
check_circular: bool,
allow_nan: bool,
indent: typing.Union[int, str, NoneType],
separators: typing.Tuple[str, str],
default: typing.Callable,
sort_keys: bool,
kw,
):
Parameter |
Type |
skipkeys |
bool |
ensure_ascii |
bool |
check_circular |
bool |
allow_nan |
bool |
indent |
typing.Union[int, str, NoneType] |
separators |
typing.Tuple[str, str] |
default |
typing.Callable |
sort_keys |
bool |
kw |
|
venv_root_from_interpreter()
def venv_root_from_interpreter(
interpreter_path: str,
):
Computes the path of the virtual environment root based on the passed-in Python interpreter path,
for example /opt/venv/bin/python3 -> /opt/venv.
Parameter |
Type |
interpreter_path |
str |
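The path computation described for venv_root_from_interpreter() can be illustrated with the standard library alone. This is a minimal sketch, not flytekit's implementation: the helper name sketch_venv_root is hypothetical, and it assumes the interpreter lives at <venv>/bin/<executable> on a POSIX-style path.

```python
from pathlib import PurePosixPath


def sketch_venv_root(interpreter_path: str) -> str:
    """Return the assumed virtualenv root for an interpreter at <venv>/bin/<exe>."""
    # Drop the executable name, then the bin directory that contains it.
    return str(PurePosixPath(interpreter_path).parent.parent)


root = sketch_venv_root("/opt/venv/bin/python3")
print(root)
```

With the example path from the description, the sketch yields /opt/venv.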
with_serialized_context()
def with_serialized_context()
Use this method to create a new SerializationSettings that has an environment variable set with the SerializedContext.
This is useful for transporting the SerializedContext to serialized and registered tasks.
The setting will be available in the env field under the key SERIALIZED_CONTEXT_ENV_VAR.
:return: A newly constructed SerializationSettings, or self if it already contains the serialized context.
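The "new object, or self if already set" behavior described above can be sketched with a small stand-in class. Everything here is an assumption made for illustration: the Settings class, the EXAMPLE_SERIALIZED_CONTEXT key, and the gzip-plus-base64 JSON encoding are hypothetical and are not taken from flytekit, which defines its own SERIALIZED_CONTEXT_ENV_VAR and transport format.

```python
import base64
import gzip
import json
from dataclasses import dataclass, field, replace
from typing import Dict

# Hypothetical key used only in this sketch.
CONTEXT_KEY = "EXAMPLE_SERIALIZED_CONTEXT"


@dataclass(frozen=True)
class Settings:
    """Stand-in for SerializationSettings with just enough fields to show the idea."""

    project: str
    version: str
    env: Dict[str, str] = field(default_factory=dict)

    def serialized_context(self) -> str:
        # Assumed encoding: JSON, gzip-compressed, then base64 so it fits in an env var.
        payload = json.dumps({"project": self.project, "version": self.version})
        return base64.b64encode(gzip.compress(payload.encode("utf-8"))).decode("ascii")

    def with_serialized_context(self) -> "Settings":
        # Idempotent: return self when the context is already present in env.
        if CONTEXT_KEY in self.env:
            return self
        return replace(self, env={**self.env, CONTEXT_KEY: self.serialized_context()})

    @classmethod
    def from_transport(cls, s: str) -> "Settings":
        # Reverse the assumed encoding to rebuild the settings inside the container.
        data = json.loads(gzip.decompress(base64.b64decode(s)).decode("utf-8"))
        return cls(**data)


original = Settings(project="demo", version="v1")
shipped = original.with_serialized_context()
restored = Settings.from_transport(shipped.env[CONTEXT_KEY])
```

The original object is left untouched, a second call on shipped returns the same object, and the restored settings compare equal to the original.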
Properties
Property |
Type |
Description |
entrypoint_settings |
|
|
serialized_context |
|
|
flytekit.extend.ShimTaskExecutor
Please see the notes for the metaclass above first.
This functionality has two use-cases currently,
- Keep track of naming for non-function PythonAutoContainerTasks. That is, things like the
:py:class:`flytekit.extras.sqlite3.task.SQLite3Task` task.
- Task resolvers, because task resolvers are instances of
:py:class:`flytekit.core.python_auto_container.TaskResolverMixin` classes, not the classes themselves, which
means we need to look on the left hand side of them to see how to find them at task execution time.
def ShimTaskExecutor(
args,
kwargs,
):
Parameter |
Type |
args |
*args |
kwargs |
**kwargs |
Methods
Method |
Description |
execute_from_model() |
This function must be overridden and is where all the business logic for running a task should live |
find_lhs() |
None |
execute_from_model()
def execute_from_model(
tt: _task_model.TaskTemplate,
kwargs,
):
This function must be overridden and is where all the business logic for running a task should live. Keep in
mind that you’re only working with the TaskTemplate. You won’t have access to any information in the task
that wasn’t serialized into the template.
Parameter |
Type |
tt |
_task_model.TaskTemplate |
kwargs |
**kwargs |
find_lhs()
Properties
Property |
Type |
Description |
instantiated_in |
|
|
lhs |
|
|
location |
|
|
flytekit.extend.TaskPlugins
This is the TaskPlugins factory for task types that are derivative of PythonFunctionTask.
Every task that the user wishes to use should be available in this factory.
Usage
.. code-block:: python

    TaskPlugins.register_pythontask_plugin(config_object_type, plugin_object_type)
    # config_object_type is any class that will be passed to the plugin_object as task_config
    # plugin_object_type is a derivative of PythonFunctionTask
Examples of available task plugins include different query-based plugins such as
:py:class:`flytekitplugins.athena.task.AthenaTask` and :py:class:`flytekitplugins.hive.task.HiveTask`, Kubeflow
operators like :py:class:`plugins.kfpytorch.flytekitplugins.kfpytorch.task.PyTorchFunctionTask` and
:py:class:`plugins.kftensorflow.flytekitplugins.kftensorflow.task.TensorflowFunctionTask`, and generic plugins like
:py:class:`flytekitplugins.pod.task.PodFunctionTask`, which doesn’t integrate with third-party tools or services.
The task_config is different for every task plugin type. This is filled out by users when they define a task to
specify plugin-specific behavior and features. For example, with a query type task plugin, the config might store
information related to which database to query.
The plugin_object_type can be used to customize execution behavior and task serialization properties in tandem
with the task_config.
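The relationship between a config type and its plugin type can be sketched as a small registry. This is a simplified stand-in, not the TaskPlugins implementation: BaseFunctionTask, PluginRegistry, QueryConfig, QueryTask, and make_task are hypothetical names, and the sketch only mirrors the documented behavior that a lookup returns the registered plugin or falls back to the base task type.

```python
from typing import Any, Callable, Dict, Type


class BaseFunctionTask:
    """Stand-in for PythonFunctionTask in this sketch."""

    def __init__(self, task_config: Any, fn: Callable):
        self.task_config = task_config
        self.fn = fn


class PluginRegistry:
    """Simplified stand-in for a factory keyed by the type of task_config."""

    _plugins: Dict[type, Type[BaseFunctionTask]] = {}

    @classmethod
    def register(cls, config_type: type, plugin: Type[BaseFunctionTask]) -> None:
        cls._plugins[config_type] = plugin

    @classmethod
    def find(cls, config_type: type) -> Type[BaseFunctionTask]:
        # Fall back to the base task type when no plugin is registered.
        return cls._plugins.get(config_type, BaseFunctionTask)


class QueryConfig:
    """Hypothetical plugin config: which database a query task should use."""

    def __init__(self, database: str):
        self.database = database


class QueryTask(BaseFunctionTask):
    """Hypothetical plugin task selected whenever task_config is a QueryConfig."""


PluginRegistry.register(QueryConfig, QueryTask)


def make_task(task_config: Any, fn: Callable) -> BaseFunctionTask:
    # The type of the config object decides which task class gets instantiated.
    plugin = PluginRegistry.find(type(task_config))
    return plugin(task_config, fn)


query_task = make_task(QueryConfig("analytics"), lambda: None)
plain_task = make_task(None, lambda: None)
```

Here query_task is a QueryTask because its config type was registered, while plain_task falls back to the base class.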
Methods
find_pythontask_plugin()
def find_pythontask_plugin(
plugin_config_type: type,
):
Returns the plugin type registered for plugin_config_type if found; otherwise returns the base PythonFunctionTask.
Parameter |
Type |
plugin_config_type |
type |
register_pythontask_plugin()
def register_pythontask_plugin(
plugin_config_type: type,
plugin: Type[PythonFunctionTask],
):
Use this method to register a new plugin into Flytekit. Usage:

.. code-block:: python

    TaskPlugins.register_pythontask_plugin(config_object_type, plugin_object_type)
    # config_object_type is any class that will be passed to the plugin_object as task_config
    # plugin_object_type is a derivative of PythonFunctionTask
Parameter |
Type |
plugin_config_type |
type |
plugin |
Type[PythonFunctionTask] |
flytekit.extend.TaskResolverMixin
Broadly speaking, Flytekit tasks interact with the Flyte platform in two steps. They need to be uploaded to Admin,
and then they are run by the user upon request (either as a single task execution or as part of a workflow). In any
case, at execution time, for most tasks (that is, those that generate a container target) the container image
containing the task needs to be spun up again, at which point the container needs to know which task it’s supposed
to run and how to rehydrate the task object.
For example, the serialization of a simple task ::

    # in repo_root/workflows/example.py
    @task
    def t1(...) -> ...: ...

might result in a container with arguments like ::

    pyflyte-execute --inputs s3://path/inputs.pb --output-prefix s3://outputs/location --raw-output-data-prefix /tmp/data --resolver flytekit.core.python_auto_container.default_task_resolver -- task-module repo_root.workflows.example task-name t1
At serialization time, the container created for the task will start out automatically with the pyflyte-execute
bit, along with the requisite input/output args and the offloaded data prefix. Appended to that will be two things:
#. the location of the task’s task resolver, followed by two dashes, followed by
#. the arguments provided by calling the loader_args function below.
The default_task_resolver declared below knows the following:
- When loader_args is called on a task, it looks up the module the task is in and the name of the task (the
key of the task in the module, either the function name, or the variable it was assigned to).
- When load_task is called, it interprets the first part of the command as the module to call
importlib.import_module on, and then looks for a key t1.
This is just the default behavior. Users should feel free to implement their own resolvers.
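The round trip between loader_args and load_task described above can be sketched with importlib alone. This is an illustration under stated assumptions, not flytekit's default_task_resolver: the class name ModuleAttributeResolver is hypothetical, loader_args here omits the settings argument that the real method takes, and json.dumps stands in for a task because it is an importable module-level object.

```python
import importlib
import json
from typing import Any, List


class ModuleAttributeResolver:
    """Hypothetical resolver that locates an object by module name and attribute name."""

    def loader_args(self, obj: Any) -> List[str]:
        # Record where the object lives so a fresh process can find it again.
        return ["task-module", obj.__module__, "task-name", obj.__name__]

    def load_task(self, loader_args: List[str]) -> Any:
        # Expect exactly the four strings produced by loader_args above.
        _, module_name, _, attr_name = loader_args
        module = importlib.import_module(module_name)
        # getattr raises AttributeError when the key is missing from the module.
        return getattr(module, attr_name)


resolver = ModuleAttributeResolver()
args = resolver.loader_args(json.dumps)
rehydrated = resolver.load_task(args)
print(args)
```

The strings in args play the role of the trailing container arguments after the two dashes, and load_task turns them back into the same object at execution time.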
Methods
Method |
Description |
get_all_tasks() |
Future proof method |
load_task() |
Given the set of identifier keys, should return one Python Task or raise an error if not found |
loader_args() |
Return a list of strings that can help identify the parameter Task |
name() |
None |
task_name() |
Overridable function that can optionally return a custom name for a given task |
get_all_tasks()
Future proof method. Just making it easy to access all tasks (Not required today as we auto register them)
load_task()
def load_task(
loader_args: typing.List[str],
):
Given the set of identifier keys, should return one Python Task or raise an error if not found
Parameter |
Type |
loader_args |
typing.List[str] |
loader_args()
def loader_args(
settings: flytekit.configuration.SerializationSettings,
t: flytekit.core.base_task.Task,
):
Return a list of strings that can help identify the parameter Task
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
t |
flytekit.core.base_task.Task |
name()
task_name()
def task_name(
t: flytekit.core.base_task.Task,
):
Overridable function that can optionally return a custom name for a given task
Parameter |
Type |
t |
flytekit.core.base_task.Task |
Properties
Property |
Type |
Description |
location |
|
|
flytekit.extend.TypeEngine
Core extensible TypeEngine of Flytekit. This should be used to extend the capabilities of Flytekit’s type system.
Users can implement their own TypeTransformers and register them with the TypeEngine. This allows special handling
of user objects.
Methods
async_to_literal()
def async_to_literal(
ctx: FlyteContext,
python_val: typing.Any,
python_type: Type[T],
expected: LiteralType,
):
Converts a python value of a given type and expected LiteralType into a resolved Literal value.
Parameter |
Type |
ctx |
FlyteContext |
python_val |
typing.Any |
python_type |
Type[T] |
expected |
LiteralType |
async_to_python_value()
def async_to_python_value(
ctx: FlyteContext,
lv: Literal,
expected_python_type: Type,
):
Parameter |
Type |
ctx |
FlyteContext |
lv |
Literal |
expected_python_type |
Type |
calculate_hash()
def calculate_hash(
python_val: typing.Any,
python_type: Type[T],
):
Parameter |
Type |
python_val |
typing.Any |
python_type |
Type[T] |
dict_to_literal_map()
def dict_to_literal_map(
ctx: FlyteContext,
d: typing.Dict[str, typing.Any],
type_hints: Optional[typing.Dict[str, type]],
):
Parameter |
Type |
ctx |
FlyteContext |
d |
typing.Dict[str, typing.Any] |
type_hints |
Optional[typing.Dict[str, type]] |
dict_to_literal_map_pb()
def dict_to_literal_map_pb(
ctx: FlyteContext,
d: typing.Dict[str, typing.Any],
type_hints: Optional[typing.Dict[str, type]],
):
Parameter |
Type |
ctx |
FlyteContext |
d |
typing.Dict[str, typing.Any] |
type_hints |
Optional[typing.Dict[str, type]] |
get_available_transformers()
def get_available_transformers()
Returns all python types for which transformers are available
get_transformer()
def get_transformer(
python_type: Type,
):
Implements a recursive search for the transformer.
Parameter |
Type |
python_type |
Type |
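To make registration and lookup concrete, here is a deliberately simplified registry. It is a sketch under assumptions, not the TypeEngine: the Transformer and Registry classes are hypothetical, and the recursive search mentioned above is replaced by a plain walk over the class's method resolution order, which ignores generics and annotated types.

```python
from typing import Dict, List, Optional


class Transformer:
    """Hypothetical transformer record: a name plus the Python type it handles."""

    def __init__(self, name: str, python_type: type):
        self.name = name
        self.python_type = python_type


class Registry:
    """Simplified stand-in for a type-to-transformer registry."""

    def __init__(self) -> None:
        self._by_type: Dict[type, Transformer] = {}

    def register(self, transformer: Transformer, additional_types: Optional[List[type]] = None) -> None:
        for t in [transformer.python_type, *(additional_types or [])]:
            if t in self._by_type:
                raise ValueError(f"A transformer is already registered for {t!r}")
            self._by_type[t] = transformer

    def get_transformer(self, python_type: type) -> Transformer:
        # Try the type itself first, then each base class in MRO order.
        for base in python_type.__mro__:
            if base in self._by_type:
                return self._by_type[base]
        raise ValueError(f"No transformer registered for {python_type!r}")


class Money:
    pass


class Salary(Money):
    pass


registry = Registry()
money_transformer = Transformer("money", Money)
registry.register(money_transformer)
found = registry.get_transformer(Salary)
```

Salary has no transformer of its own, so the lookup falls through to the one registered for its base class Money.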
guess_python_type()
def guess_python_type(
flyte_type: LiteralType,
):
Transforms a flyte-specific LiteralType to a regular python type.
Parameter |
Type |
flyte_type |
LiteralType |
guess_python_types()
def guess_python_types(
flyte_variable_dict: typing.Dict[str, _interface_models.Variable],
):
Transforms a dictionary of flyte-specific Variable objects to a dictionary of regular python types.
Parameter |
Type |
flyte_variable_dict |
typing.Dict[str, _interface_models.Variable] |
lazy_import_transformers()
def lazy_import_transformers()
Only load the transformers if needed.
literal_map_to_kwargs()
def literal_map_to_kwargs(
ctx: FlyteContext,
lm: LiteralMap,
python_types: typing.Optional[typing.Dict[str, type]],
literal_types: typing.Optional[typing.Dict[str, _interface_models.Variable]],
):
Parameter |
Type |
ctx |
FlyteContext |
lm |
LiteralMap |
python_types |
typing.Optional[typing.Dict[str, type]] |
literal_types |
typing.Optional[typing.Dict[str, _interface_models.Variable]] |
named_tuple_to_variable_map()
def named_tuple_to_variable_map(
t: typing.NamedTuple,
):
Converts a python-native NamedTuple to a flyte-specific VariableMap of named literals.
Parameter |
Type |
t |
typing.NamedTuple |
register()
def register(
transformer: TypeTransformer,
additional_types: Optional[typing.List[Type]],
):
This should be used for all types that respond with the right type annotation when you use the type(…) function.
Parameter |
Type |
transformer |
TypeTransformer |
additional_types |
Optional[typing.List[Type]] |
register_additional_type()
def register_additional_type(
transformer: TypeTransformer[T],
additional_type: Type[T],
override,
):
Parameter |
Type |
transformer |
TypeTransformer[T] |
additional_type |
Type[T] |
override |
|
register_restricted_type()
def register_restricted_type(
name: str,
type: Type[T],
):
Parameter |
Type |
name |
str |
type |
Type[T] |
to_html()
def to_html(
ctx: FlyteContext,
python_val: typing.Any,
expected_python_type: Type[typing.Any],
):
Parameter |
Type |
ctx |
FlyteContext |
python_val |
typing.Any |
expected_python_type |
Type[typing.Any] |
to_literal()
def to_literal(
ctx: FlyteContext,
python_val: typing.Any,
python_type: Type[T],
expected: LiteralType,
):
The current indirection exists because users may call this synchronous to_literal function from an async
function, and this to_literal function may in turn invoke yet another async function, namely an async
transformer.
Parameter |
Type |
ctx |
FlyteContext |
python_val |
typing.Any |
python_type |
Type[T] |
expected |
LiteralType |
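The situation described for to_literal(), a synchronous function called from async code that must itself run a coroutine, can be sketched with asyncio and a background thread. This shows one common way to bridge the two and is not necessarily how flytekit does it: LoopRunner, fake_async_to_literal, and sync_to_literal are hypothetical names, and the returned dict is a stand-in for a Literal.

```python
import asyncio
import threading
from typing import Any, Coroutine


class LoopRunner:
    """Runs coroutines on a dedicated event loop in a background thread."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro: Coroutine) -> Any:
        # Safe to call even when the calling thread already has a running loop.
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

    def close(self) -> None:
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def fake_async_to_literal(value: Any) -> dict:
    # Stand-in for an async transformer doing real I/O.
    await asyncio.sleep(0)
    return {"scalar": value}


runner = LoopRunner()


def sync_to_literal(value: Any) -> dict:
    # Synchronous facade over the async implementation.
    return runner.run(fake_async_to_literal(value))


async def user_code() -> dict:
    # Calling asyncio.run() here would raise RuntimeError because a loop is already running.
    return sync_to_literal(42)


result = asyncio.run(user_code())
runner.close()
```

The trade-off in this sketch is that sync_to_literal blocks the caller's event loop until the background loop finishes the coroutine.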
to_literal_checks()
def to_literal_checks(
python_val: typing.Any,
python_type: Type[T],
expected: LiteralType,
):
Parameter |
Type |
python_val |
typing.Any |
python_type |
Type[T] |
expected |
LiteralType |
to_literal_type()
def to_literal_type(
python_type: Type[T],
):
Converts a python type into a flyte specific LiteralType
Parameter |
Type |
python_type |
Type[T] |
to_python_value()
def to_python_value(
ctx: FlyteContext,
lv: Literal,
expected_python_type: Type,
):
Converts a Literal value with an expected python type into a python value.
Parameter |
Type |
ctx |
FlyteContext |
lv |
Literal |
expected_python_type |
Type |
unwrap_offloaded_literal()
def unwrap_offloaded_literal(
ctx: FlyteContext,
lv: Literal,
):
Parameter |
Type |
ctx |
FlyteContext |
lv |
Literal |
flytekit.extend.TypeTransformer
Base transformer type that should be implemented for every python native type that can be handled by flytekit
def TypeTransformer(
name: str,
t: Type[T],
enable_type_assertions: bool,
):
Parameter |
Type |
name |
str |
t |
Type[T] |
enable_type_assertions |
bool |
Methods
assert_type()
def assert_type(
t: Type[T],
v: T,
):
Parameter |
Type |
t |
Type[T] |
v |
T |
from_binary_idl()
def from_binary_idl(
binary_idl_object: Binary,
expected_python_type: Type[T],
):
This function primarily handles deserialization for untyped dicts, dataclasses, Pydantic BaseModels, and attribute access.
For untyped dict, dataclass, and pydantic basemodel:
Life Cycle (Untyped Dict as example):
python val -> msgpack bytes -> binary literal scalar -> msgpack bytes -> python val
(to_literal) (from_binary_idl)
For attribute access:
Life Cycle:
python val -> msgpack bytes -> binary literal scalar -> resolved golang value -> binary literal scalar -> msgpack bytes -> python val
(to_literal) (propeller attribute access) (from_binary_idl)
Parameter |
Type |
binary_idl_object |
Binary |
expected_python_type |
Type[T] |
from_generic_idl()
def from_generic_idl(
generic: Struct,
expected_python_type: Type[T],
):
TODO: Support all Flyte Types.
This is for dataclass attribute access from input created from the Flyte Console.
Note:
- This can be removed in the future when the Flyte Console supports generating Binary IDL Scalar as input.
Parameter |
Type |
generic |
Struct |
expected_python_type |
Type[T] |
get_literal_type()
def get_literal_type(
t: Type[T],
):
Converts the python type to a Flyte LiteralType
guess_python_type()
def guess_python_type(
literal_type: LiteralType,
):
Converts the Flyte LiteralType to a python object type.
Parameter |
Type |
literal_type |
LiteralType |
isinstance_generic()
def isinstance_generic(
obj,
generic_alias,
):
Parameter |
Type |
obj |
|
generic_alias |
|
to_html()
def to_html(
ctx: FlyteContext,
python_val: T,
expected_python_type: Type[T],
):
Converts any python val (dataframe, int, float) to an HTML string, which will be wrapped in an HTML div.
Parameter |
Type |
ctx |
FlyteContext |
python_val |
T |
expected_python_type |
Type[T] |
to_literal()
def to_literal(
ctx: FlyteContext,
python_val: T,
python_type: Type[T],
expected: LiteralType,
):
Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type.
Implementers should refrain from using type(python_val) and instead rely on the passed-in python_type. If these
do not match (or are not allowed), the Transformer implementer should raise an AssertionError that clearly states
what the mismatch was.
Parameter |
Type |
ctx |
FlyteContext |
python_val |
T |
python_type |
Type[T] |
expected |
LiteralType |
to_python_value()
def to_python_value(
ctx: FlyteContext,
lv: Literal,
expected_python_type: Type[T],
):
Converts the given Literal to a Python value of the expected type. If the conversion cannot be done, an AssertionError should be raised.
Parameter |
Type |
ctx |
FlyteContext |
lv |
Literal |
expected_python_type |
Type[T] |
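The contract described for to_literal() and to_python_value() can be sketched without flytekit installed. All names here are hypothetical stand-ins: FakeLiteral replaces the real Literal, CelsiusTransformer does not subclass TypeTransformer, and the ctx and expected parameters of the real signatures are omitted for brevity.

```python
from dataclasses import dataclass
from typing import Any, Type


@dataclass
class FakeLiteral:
    """Stand-in for a Flyte Literal that carries a single string scalar."""

    string_value: str


class Celsius:
    def __init__(self, degrees: float):
        self.degrees = degrees


class CelsiusTransformer:
    """Hypothetical transformer that round-trips Celsius through FakeLiteral."""

    def to_literal(self, python_val: Any, python_type: Type) -> FakeLiteral:
        # Check against the declared python_type rather than type(python_val).
        if python_type is not Celsius or not isinstance(python_val, python_type):
            raise AssertionError(
                f"Declared type {python_type!r} does not match value {python_val!r}"
            )
        return FakeLiteral(string_value=repr(python_val.degrees))

    def to_python_value(self, lv: FakeLiteral, expected_python_type: Type) -> Celsius:
        if expected_python_type is not Celsius:
            raise AssertionError(f"Cannot convert to {expected_python_type!r}")
        return Celsius(float(lv.string_value))


transformer = CelsiusTransformer()
literal = transformer.to_literal(Celsius(21.5), Celsius)
restored = transformer.to_python_value(literal, Celsius)

try:
    transformer.to_literal("hot", Celsius)
    mismatch_detected = False
except AssertionError:
    mismatch_detected = True
```

A value that does not match its declared type is rejected with an AssertionError that names both sides of the mismatch, which is the behavior the text asks implementers to provide.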
Properties
Property |
Type |
Description |
is_async |
|
|
name |
|
|
python_type |
|
|
type_assertions_enabled |
|
|