
flytekit.extend

Extending Flytekit

.. currentmodule:: flytekit.extend

This package contains things that are useful when extending Flytekit.

.. autosummary::
   :nosignatures:
   :template: custom.rst
   :toctree: generated/

   get_serializable
   context_manager
   IgnoreOutputs
   ExecutionState
   Image
   ImageConfig
   Interface
   Promise
   TaskPlugins
   DictTransformer
   T
   TypeEngine
   TypeTransformer
   PythonCustomizedContainerTask
   ExecutableTemplateShimTask
   ShimTaskExecutor

Directory

Classes

Class Description
ClassStorageTaskResolver Stores tasks inside a class variable.
DictTransformer Transformer that transforms a univariate dictionary Dict[str, T] to a Literal Map, or an untyped dictionary to a Binary Scalar Literal.
ExecutableTemplateShimTask The canonical @task decorated Python function task is pretty simple to reason about.
ExecutionState This is the context that is active when executing a task or a local workflow.
FileAccessProvider This is the class that is available through the FlyteContext and can be used for persisting data to the remote durable store.
Image Image is a structured wrapper for task container images used in object serialization.
ImageConfig Holds available images which can be used at registration time; ImageConfig.auto() is the recommended way to create one.
Interface A Python native interface object, like inspect.signature but simpler.
Promise This object is a wrapper and exists for three main reasons.
PythonCustomizedContainerTask Provides the scaffolding for creating task-template based tasks; see the comments for ExecutableTemplateShimTask as well.
PythonTask Base Class for all Tasks with a Python native Interface.
SQLTask Base task types for all SQL tasks.
SecretsManager This provides a secrets resolution logic at runtime.
SerializationSettings These settings are provided while serializing a workflow and task, before registration.
ShimTaskExecutor Please see the notes for the metaclass above first.
TaskPlugins This is the TaskPlugins factory for task types that are derivative of PythonFunctionTask.
TaskResolverMixin Flytekit tasks interact with the Flyte platform very, very broadly in two steps.
TypeEngine Core Extensible TypeEngine of Flytekit.
TypeTransformer Base transformer type that should be implemented for every python native type that can be handled by flytekit.

flytekit.extend.ClassStorageTaskResolver

Stores tasks inside a class variable. The class must be inherited from at the point of usage because the task loading process basically relies on the same sequence of things happening.
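
A minimal, hedged sketch of how this resolver is typically wired up (the subclass and task below are hypothetical; add() follows the signature documented under Methods):

.. code:: python

    from flytekit import task
    from flytekit.extend import ClassStorageTaskResolver

    class MyResolver(ClassStorageTaskResolver):
        # Subclassed at the point of usage so that task loading can re-import
        # this exact class and replay the same registration sequence.
        pass

    resolver = MyResolver()

    @task
    def t1(x: int) -> int:
        return x + 1

    # Register the task; loader_args()/load_task() can then round-trip it.
    resolver.add(t1)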

def ClassStorageTaskResolver(
    args,
    kwargs,
):
Parameter Type
args *args
kwargs **kwargs

Methods

Method Description
add() None
find_lhs() None
get_all_tasks() Future proof method
load_task() Given the set of identifier keys, should return one Python Task or raise an error if not found
loader_args() This is responsible for turning an instance of a task into args that the load_task function can reconstitute
name() None
task_name() Overridable function that can optionally return a custom name for a given task

add()

def add(
    t: flytekit.core.python_auto_container.PythonAutoContainerTask,
):
Parameter Type
t flytekit.core.python_auto_container.PythonAutoContainerTask

find_lhs()

def find_lhs()

get_all_tasks()

def get_all_tasks()

Future-proof method that makes it easy to access all tasks (not required today, as we auto-register them).

load_task()

def load_task(
    loader_args: typing.List[str],
):

Given the set of identifier keys, should return one Python Task or raise an error if not found

Parameter Type
loader_args typing.List[str]

loader_args()

def loader_args(
    settings: flytekit.configuration.SerializationSettings,
    t: flytekit.core.python_auto_container.PythonAutoContainerTask,
):

This is responsible for turning an instance of a task into args that the load_task function can reconstitute.

Parameter Type
settings flytekit.configuration.SerializationSettings
t flytekit.core.python_auto_container.PythonAutoContainerTask

name()

def name()

task_name()

def task_name(
    t: flytekit.core.base_task.Task,
):

Overridable function that can optionally return a custom name for a given task

Parameter Type
t flytekit.core.base_task.Task

Properties

Property Type Description
instantiated_in
lhs
location

flytekit.extend.DictTransformer

Transformer that transforms a univariate dictionary Dict[str, T] to a Literal Map, or transforms an untyped dictionary to a Binary Scalar Literal with a Struct Literal Type.
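
For orientation, a small sketch of the round trip this transformer backs, driven through the TypeEngine entry points listed above (the dictionary values are arbitrary, and this assumes a default local FlyteContext):

.. code:: python

    from typing import Dict

    from flytekit.core.context_manager import FlyteContextManager
    from flytekit.extend import TypeEngine

    ctx = FlyteContextManager.current_context()

    # Dict[str, int] maps to a LiteralType describing a map of integers.
    lt = TypeEngine.to_literal_type(Dict[str, int])

    # Python dict -> Flyte literal -> Python dict again.
    lit = TypeEngine.to_literal(ctx, {"a": 1, "b": 2}, Dict[str, int], lt)
    back = TypeEngine.to_python_value(ctx, lit, Dict[str, int])
    assert back == {"a": 1, "b": 2}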

def DictTransformer()

Methods

Method Description
assert_type() None
async_to_literal() Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type
async_to_python_value() Converts the given Literal to a Python Type
dict_to_binary_literal() Converts a Python dictionary to a Flyte-specific Literal using MessagePack encoding
dict_to_generic_literal() Deprecated since flytekit 1.14.0; creates a flyte-specific Literal value from a native python dictionary
extract_types() None
from_binary_idl() This function primarily handles deserialization for untyped dicts, dataclasses, Pydantic BaseModels, and attribute access
from_generic_idl() TODO: Support all Flyte Types
get_literal_type() Transforms a native python dictionary to a flyte-specific LiteralType
guess_python_type() Converts the Flyte LiteralType to a python object type
is_pickle() None
isinstance_generic() None
to_html() Converts any python val (dataframe, int, float) to an HTML string, wrapped in an HTML div
to_literal() Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type
to_python_value() Converts the given Literal to a Python Type

assert_type()

def assert_type(
    t: Type[T],
    v: T,
):
Parameter Type
t Type[T]
v T

async_to_literal()

def async_to_literal(
    ctx: FlyteContext,
    python_val: typing.Any,
    python_type: Type[dict],
    expected: LiteralType,
):

Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type. Implementers should refrain from using type(python_val) and should instead rely on the passed-in python_type. If these do not match (or are not allowed), the transformer implementer should raise an AssertionError that clearly states the mismatch.

Parameter Type
ctx FlyteContext
python_val typing.Any
python_type Type[dict]
expected LiteralType

async_to_python_value()

def async_to_python_value(
    ctx: FlyteContext,
    lv: Literal,
    expected_python_type: Type[dict],
):

Converts the given Literal to a Python Type. If the conversion cannot be done, an AssertionError should be raised.

Parameter Type
ctx FlyteContext
lv Literal
expected_python_type Type[dict]

dict_to_binary_literal()

def dict_to_binary_literal(
    ctx: FlyteContext,
    v: dict,
    python_type: Type[dict],
    allow_pickle: bool,
):

Converts a Python dictionary to a Flyte-specific Literal using MessagePack encoding. Falls back to Pickle if encoding fails and allow_pickle is True.

Parameter Type
ctx FlyteContext
v dict
python_type Type[dict]
allow_pickle bool

dict_to_generic_literal()

def dict_to_generic_literal(
    ctx: FlyteContext,
    v: dict,
    python_type: Type[dict],
    allow_pickle: bool,
):

Deprecated since flytekit 1.14.0 and will be removed in the future. Creates a flyte-specific Literal value from a native python dictionary.

Parameter Type
ctx FlyteContext
v dict
python_type Type[dict]
allow_pickle bool

extract_types()

def extract_types(
    t: Optional[Type[dict]],
):
Parameter Type
t Optional[Type[dict]]

from_binary_idl()

def from_binary_idl(
    binary_idl_object: Binary,
    expected_python_type: Type[T],
):

This function primarily handles deserialization for untyped dicts, dataclasses, Pydantic BaseModels, and attribute access.

For untyped dicts, dataclasses, and Pydantic BaseModels, the life cycle (using an untyped dict as an example) is: python val -> msgpack bytes -> binary literal scalar (to_literal), then binary literal scalar -> msgpack bytes -> python val (from_binary_idl).

For attribute access, the life cycle is: python val -> msgpack bytes -> binary literal scalar (to_literal), then binary literal scalar -> resolved golang value -> binary literal scalar (propeller attribute access), then binary literal scalar -> msgpack bytes -> python val (from_binary_idl).

Parameter Type
binary_idl_object Binary
expected_python_type Type[T]

from_generic_idl()

def from_generic_idl(
    generic: Struct,
    expected_python_type: Type[T],
):

TODO: Support all Flyte Types. This is for dataclass attribute access from input created from the Flyte Console.

Note:

  • This can be removed in the future when the Flyte Console supports generating Binary IDL Scalars as input.
Parameter Type
generic Struct
expected_python_type Type[T]

get_literal_type()

def get_literal_type(
    t: Type[dict],
):

Transforms a native python dictionary to a flyte-specific LiteralType

Parameter Type
t Type[dict]

guess_python_type()

def guess_python_type(
    literal_type: LiteralType,
):

Converts the Flyte LiteralType to a python object type.

Parameter Type
literal_type LiteralType

is_pickle()

def is_pickle(
    python_type: Type[dict],
):
Parameter Type
python_type Type[dict]

isinstance_generic()

def isinstance_generic(
    obj,
    generic_alias,
):
Parameter Type
obj
generic_alias

to_html()

def to_html(
    ctx: FlyteContext,
    python_val: T,
    expected_python_type: Type[T],
):

Converts any python val (dataframe, int, float) to an HTML string, which will be wrapped in an HTML div.

Parameter Type
ctx FlyteContext
python_val T
expected_python_type Type[T]

to_literal()

def to_literal(
    ctx: FlyteContext,
    python_val: typing.Any,
    python_type: Type[T],
    expected: LiteralType,
):

Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type. Implementers should refrain from using type(python_val) and should instead rely on the passed-in python_type. If these do not match (or are not allowed), the transformer implementer should raise an AssertionError that clearly states the mismatch.

Parameter Type
ctx FlyteContext
python_val typing.Any
python_type Type[T]
expected LiteralType

to_python_value()

def to_python_value(
    ctx: FlyteContext,
    lv: Literal,
    expected_python_type: Type[T],
):

Converts the given Literal to a Python Type. If the conversion cannot be done, an AssertionError should be raised.

Parameter Type
ctx FlyteContext
lv Literal
expected_python_type Type[T]

Properties

Property Type Description
is_async
name
python_type
type_assertions_enabled

flytekit.extend.ExecutableTemplateShimTask

The canonical @task decorated Python function task is pretty simple to reason about. At execution time (either locally or on a Flyte cluster), the function runs.

This class, along with the ShimTaskExecutor class below, represents another execution pattern. This pattern has two components:

  • The TaskTemplate, or something like it, such as a FlyteTask.
  • An executor, which can use information from the task template (including the custom field)

Basically at execution time (both locally and on a Flyte cluster), the task template is given to the executor, which is responsible for computing and returning the results.

.. note::

The interface at execution time will have to be derived from the Flyte IDL interface, which means it may be lossy. This is because when a task is serialized from Python into the TaskTemplate, some information is lost, since Flyte IDL can’t keep track of every single Python type (or Java type if writing in the Java flytekit).

This class also implements the dispatch_execute and execute functions to make it look like a PythonTask that the entrypoint.py can execute, even though this class doesn’t inherit from PythonTask.

def ExecutableTemplateShimTask(
    tt: _task_model.TaskTemplate,
    executor_type: Type[ShimTaskExecutor],
    args,
    kwargs,
):
Parameter Type
tt _task_model.TaskTemplate
executor_type Type[ShimTaskExecutor]
args *args
kwargs **kwargs

Methods

Method Description
dispatch_execute() This function is largely similar to the base PythonTask, with the exception that we have to infer the Python interface before executing
execute() Rather than running here, send everything to the executor
post_execute() This function is a stub, just here to keep dispatch_execute compatibility between this class and PythonTask
pre_execute() This function is a stub, just here to keep dispatch_execute compatibility between this class and PythonTask

dispatch_execute()

def dispatch_execute(
    ctx: FlyteContext,
    input_literal_map: _literal_models.LiteralMap,
):

This function is largely similar to the base PythonTask, with the exception that we have to infer the Python interface before executing. Also, we refer to self.task_template rather than just self, similar to task classes that derive from the base PythonTask.

Parameter Type
ctx FlyteContext
input_literal_map _literal_models.LiteralMap

execute()

def execute(
    kwargs,
):

Rather than running here, send everything to the executor.

Parameter Type
kwargs **kwargs

post_execute()

def post_execute(
    _: Optional[ExecutionParameters],
    rval: Any,
):

This function is a stub, just here to keep dispatch_execute compatibility between this class and PythonTask.

Parameter Type
_ Optional[ExecutionParameters]
rval Any

pre_execute()

def pre_execute(
    user_params: Optional[ExecutionParameters],
):

This function is a stub, just here to keep dispatch_execute compatibility between this class and PythonTask.

Parameter Type
user_params Optional[ExecutionParameters]

Properties

Property Type Description
executor
executor_type
name
task_template

flytekit.extend.ExecutionState

This is the context that is active when executing a task or a local workflow. This carries the necessary state to execute. Some required things during execution deal with temporary directories, ExecutionParameters that are passed to the user etc.

Attributes:

  • mode (ExecutionState.Mode): Defines the context in which the task is executed (local, hosted, etc.).
  • working_dir (os.PathLike): Specifies the remote, external directory where inputs, outputs and other protobufs are uploaded.
  • engine_dir (os.PathLike):
  • branch_eval_mode (Optional[BranchEvalMode]): Used to determine whether a branch node should execute.
  • user_space_params (Optional[ExecutionParameters]): Provides run-time, user-centric context such as a statsd handler, a logging handler, the current execution id and a working directory.
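
As a hedged illustration, the active ExecutionState can be inspected from the current FlyteContext, for example to detect local execution (attribute names follow the list above):

.. code:: python

    from flytekit.core.context_manager import FlyteContextManager

    ctx = FlyteContextManager.current_context()
    exec_state = ctx.execution_state

    # Outside of an execution this may be None; during one, the mode and
    # working directory describe where and how the task is running.
    if exec_state is not None and exec_state.is_local_execution():
        print("running locally, working dir:", exec_state.working_dir)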

def ExecutionState(
    working_dir: Union[os.PathLike, str],
    mode: Optional[ExecutionState.Mode],
    engine_dir: Optional[Union[os.PathLike, str]],
    branch_eval_mode: Optional[BranchEvalMode],
    user_space_params: Optional[ExecutionParameters],
):
Parameter Type
working_dir Union[os.PathLike, str]
mode Optional[ExecutionState.Mode]
engine_dir Optional[Union[os.PathLike, str]]
branch_eval_mode Optional[BranchEvalMode]
user_space_params Optional[ExecutionParameters]

Methods

Method Description
branch_complete() Indicates that we are within a conditional / ifelse block and the active branch is not done
is_local_execution() None
take_branch() Indicates that we are within an if-else block and the current branch has evaluated to true
with_params() Produces a copy of the current execution state and overrides the copy’s parameters with passed parameter values

branch_complete()

def branch_complete()

Indicates that we are within a conditional / ifelse block and the active branch is not done. Defaults to SKIPPED.

is_local_execution()

def is_local_execution()

take_branch()

def take_branch()

Indicates that we are within an if-else block and the current branch has evaluated to true. Useful only in local execution mode

with_params()

def with_params(
    working_dir: Optional[os.PathLike],
    mode: Optional[Mode],
    engine_dir: Optional[os.PathLike],
    branch_eval_mode: Optional[BranchEvalMode],
    user_space_params: Optional[ExecutionParameters],
):

Produces a copy of the current execution state and overrides the copy’s parameters with passed parameter values.

Parameter Type
working_dir Optional[os.PathLike]
mode Optional[Mode]
engine_dir Optional[os.PathLike]
branch_eval_mode Optional[BranchEvalMode]
user_space_params Optional[ExecutionParameters]

flytekit.extend.FileAccessProvider

This is the class that is available through the FlyteContext and can be used for persisting data to the remote durable store.
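
A minimal sketch of constructing a provider and persisting a single file; the sandbox directory and bucket are placeholders, and inside task code you would normally reuse the provider already attached to the current FlyteContext (ctx.file_access) rather than building your own:

.. code:: python

    from flytekit.core.data_persistence import FileAccessProvider

    fap = FileAccessProvider(
        local_sandbox_dir="/tmp/flyte-sandbox",          # placeholder scratch space
        raw_output_prefix="s3://my-bucket/raw-outputs",  # placeholder remote prefix
    )

    # Upload a local file under the raw output prefix (see put_raw_data below).
    remote_path = fap.put_raw_data("/tmp/report.csv", file_name="report.csv")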

def FileAccessProvider(
    local_sandbox_dir: typing.Union[str, os.PathLike],
    raw_output_prefix: str,
    data_config: typing.Optional[flytekit.configuration.DataConfig],
    execution_metadata: typing.Optional[dict],
):
Parameter Type
local_sandbox_dir typing.Union[str, os.PathLike]
raw_output_prefix str
data_config typing.Optional[flytekit.configuration.DataConfig]
execution_metadata typing.Optional[dict]

Methods

Method Description
async_get_data()
async_put_data() The implication here is that we’re always going to put data to the remote location
async_put_raw_data() This is a more flexible version of put that accepts a file-like object or a string path
download() Downloads from remote to local
download_directory() Downloads directory from given remote to local path
exists() None
generate_new_custom_path() Generates a new path with the raw output prefix and a random string appended to it
get() None
get_async_filesystem_for_path() None
get_data()
get_file_tail() None
get_filesystem() None
get_filesystem_for_path() None
get_random_local_directory() None
get_random_local_path() Use file_path_or_file_name, when you want a random directory, but want to preserve the leaf file name
get_random_remote_directory() None
get_random_remote_path() None
get_random_string() None
is_remote() Deprecated
join() None
put_data() The implication here is that we’re always going to put data to the remote location
put_raw_data() This is a more flexible version of put that accepts a file-like object or a string path
recursive_paths() None
sep() None
strip_file_header() Drops file:// if it exists from the file
upload()
upload_directory()

async_get_data()

def async_get_data(
    remote_path: str,
    local_path: str,
    is_multipart: bool,
    kwargs,
):
Parameter Type
remote_path str
local_path str
is_multipart bool
kwargs **kwargs

async_put_data()

def async_put_data(
    local_path: typing.Union[str, os.PathLike],
    remote_path: str,
    is_multipart: bool,
    kwargs,
):

The implication here is that we’re always going to put data to the remote location, so we treat the destination as remote to ensure we don’t use the true local proxy if the remote path is a file://

Parameter Type
local_path typing.Union[str, os.PathLike]
remote_path str
is_multipart bool
kwargs **kwargs

async_put_raw_data()

def async_put_raw_data(
    lpath: typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO],
    upload_prefix: typing.Optional[str],
    file_name: typing.Optional[str],
    read_chunk_size_bytes: int,
    encoding: str,
    skip_raw_data_prefix: bool,
    kwargs,
):

This is a more flexible version of put that accepts a file-like object or a string path. Writes to the raw output prefix only. If you want to write to another fs use put_data or get the fsspec file system directly. FYI: Currently the raw output prefix set by propeller is already unique per retry and looks like s3://my-s3-bucket/data/o4/feda4e266c748463a97d-n0-0

If lpath is a folder, then recursive will be set. If lpath is a streamable, then it can only be a single file.

Writes to: {raw output prefix}/{upload_prefix}/{file_name}

Parameter Type
lpath typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO]
upload_prefix typing.Optional[str]
file_name typing.Optional[str]
read_chunk_size_bytes int
encoding str
skip_raw_data_prefix bool
kwargs **kwargs

download()

def download(
    remote_path: str,
    local_path: str,
    kwargs,
):

Downloads from remote to local

Parameter Type
remote_path str
local_path str
kwargs **kwargs

download_directory()

def download_directory(
    remote_path: str,
    local_path: str,
    kwargs,
):

Downloads directory from given remote to local path

Parameter Type
remote_path str
local_path str
kwargs **kwargs

exists()

def exists(
    path: str,
):
Parameter Type
path str

generate_new_custom_path()

def generate_new_custom_path(
    fs: typing.Optional[fsspec.spec.AbstractFileSystem],
    alt: typing.Optional[str],
    stem: typing.Optional[str],
):

Generates a new path with the raw output prefix and a random string appended to it. Optionally, you can provide an alternate prefix and a stem. If stem is provided, it will be appended to the path instead of a random string. If alt is provided, it will replace the first part of the output prefix, e.g. the S3 or GCS bucket.

If wanting to write to a non-random prefix in a non-default S3 bucket, this can be called with alt="my-alt-bucket" and stem="my-stem" to generate a path like s3://my-alt-bucket/default-prefix-part/my-stem

Parameter Type
fs typing.Optional[fsspec.spec.AbstractFileSystem]
alt typing.Optional[str]
stem typing.Optional[str]

get()

def get(
    from_path: str,
    to_path: str,
    recursive: bool,
    kwargs,
):
Parameter Type
from_path str
to_path str
recursive bool
kwargs **kwargs

get_async_filesystem_for_path()

def get_async_filesystem_for_path(
    path: str,
    anonymous: bool,
    kwargs,
):
Parameter Type
path str
anonymous bool
kwargs **kwargs

get_data()

def get_data(
    remote_path: str,
    local_path: str,
    is_multipart: bool,
    kwargs,
):
Parameter Type
remote_path str
local_path str
is_multipart bool
kwargs **kwargs

get_file_tail()

def get_file_tail(
    file_path_or_file_name: str,
):
Parameter Type
file_path_or_file_name str

get_filesystem()

def get_filesystem(
    protocol: typing.Optional[str],
    anonymous: bool,
    path: typing.Optional[str],
    kwargs,
):
Parameter Type
protocol typing.Optional[str]
anonymous bool
path typing.Optional[str]
kwargs **kwargs

get_filesystem_for_path()

def get_filesystem_for_path(
    path: str,
    anonymous: bool,
    kwargs,
):
Parameter Type
path str
anonymous bool
kwargs **kwargs

get_random_local_directory()

def get_random_local_directory()

get_random_local_path()

def get_random_local_path(
    file_path_or_file_name: typing.Optional[str],
):

Use file_path_or_file_name, when you want a random directory, but want to preserve the leaf file name

Parameter Type
file_path_or_file_name typing.Optional[str]

get_random_remote_directory()

def get_random_remote_directory()

get_random_remote_path()

def get_random_remote_path(
    file_path_or_file_name: typing.Optional[str],
):
Parameter Type
file_path_or_file_name typing.Optional[str]

get_random_string()

def get_random_string()

is_remote()

def is_remote(
    path: typing.Union[str, os.PathLike],
):

Deprecated. Let’s find a replacement

Parameter Type
path typing.Union[str, os.PathLike]

join()

def join(
    args,
    unstrip: bool,
    fs: typing.Optional[fsspec.spec.AbstractFileSystem],
):
Parameter Type
args *args
unstrip bool
fs typing.Optional[fsspec.spec.AbstractFileSystem]

put_data()

def put_data(
    local_path: typing.Union[str, os.PathLike],
    remote_path: str,
    is_multipart: bool,
    kwargs,
):

The implication here is that we’re always going to put data to the remote location, so we treat the destination as remote to ensure we don’t use the true local proxy if the remote path is a file://

Parameter Type
local_path typing.Union[str, os.PathLike]
remote_path str
is_multipart bool
kwargs **kwargs

put_raw_data()

def put_raw_data(
    lpath: typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO],
    upload_prefix: typing.Optional[str],
    file_name: typing.Optional[str],
    read_chunk_size_bytes: int,
    encoding: str,
    skip_raw_data_prefix: bool,
    kwargs,
):

This is a more flexible version of put that accepts a file-like object or a string path. Writes to the raw output prefix only. If you want to write to another fs use put_data or get the fsspec file system directly. FYI: Currently the raw output prefix set by propeller is already unique per retry and looks like s3://my-s3-bucket/data/o4/feda4e266c748463a97d-n0-0

If lpath is a folder, then recursive will be set. If lpath is a streamable, then it can only be a single file.

Writes to: {raw output prefix}/{upload_prefix}/{file_name}

Parameter Type
lpath typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO]
upload_prefix typing.Optional[str]
file_name typing.Optional[str]
read_chunk_size_bytes int
encoding str
skip_raw_data_prefix bool
kwargs **kwargs

recursive_paths()

def recursive_paths(
    f: str,
    t: str,
):
Parameter Type
f str
t str

sep()

def sep(
    file_system: typing.Optional[fsspec.spec.AbstractFileSystem],
):
Parameter Type
file_system typing.Optional[fsspec.spec.AbstractFileSystem]

strip_file_header()

def strip_file_header(
    path: str,
    trim_trailing_sep: bool,
):

Drops file:// if it exists from the file

Parameter Type
path str
trim_trailing_sep bool

upload()

def upload(
    file_path: str,
    to_path: str,
    kwargs,
):
Parameter Type
file_path str
to_path str
kwargs **kwargs

upload_directory()

def upload_directory(
    local_path: str,
    remote_path: str,
    kwargs,
):
Parameter Type
local_path str
remote_path str
kwargs **kwargs

Properties

Property Type Description
data_config
local_access
local_sandbox_dir
raw_output_fs
raw_output_prefix

flytekit.extend.IgnoreOutputs

This exception should be used to indicate that the outputs generated by this can be safely ignored. This is useful in case of distributed training or peer-to-peer parallel algorithms.
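
A hedged sketch of the typical pattern (the task and its rank handling are illustrative, not taken from the flytekit docs):

.. code:: python

    from flytekit import task
    from flytekit.extend import IgnoreOutputs

    @task
    def train_shard(rank: int) -> str:
        # Illustrative distributed-training worker: only rank 0 reports outputs.
        if rank != 0:
            raise IgnoreOutputs("non-zero ranks do not produce outputs")
        return "checkpoint-id"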

flytekit.extend.Image

Image is a structured wrapper for task container images used in object serialization.

Attributes:

  • name (str): A user-provided name to identify this image.
  • fqn (str): Fully qualified image name, consisting of a registry location, a username, and a repository name. For example: hostname/username/reponame.
  • tag (str): Optional tag used to specify which version of an image to pull.
  • digest (str): Optional digest used to specify which version of an image to pull.
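
For example (the registry and tag below are placeholders), constructing an Image and reading back its full reference:

.. code:: python

    from flytekit.extend import Image

    img = Image(name="default", fqn="ghcr.io/myorg/myimage", tag="v1.0.0")
    # The `full` property combines fqn and tag/digest into a pullable reference,
    # e.g. "ghcr.io/myorg/myimage:v1.0.0".
    print(img.full)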

def Image(
    name: str,
    fqn: str,
    tag: Optional[str],
    digest: Optional[str],
):
Parameter Type
name str
fqn str
tag Optional[str]
digest Optional[str]

Methods

Method Description
from_dict() None
from_json() None
look_up_image_info() Creates an Image object from an image identifier string or a path to an ImageSpec yaml file
schema() None
to_dict() None
to_json() None

from_dict()

def from_dict(
    kvs: typing.Union[dict, list, str, int, float, bool, NoneType],
    infer_missing,
):
Parameter Type
kvs typing.Union[dict, list, str, int, float, bool, NoneType]
infer_missing

from_json()

def from_json(
    s: typing.Union[str, bytes, bytearray],
    parse_float,
    parse_int,
    parse_constant,
    infer_missing,
    kw,
):
Parameter Type
s typing.Union[str, bytes, bytearray]
parse_float
parse_int
parse_constant
infer_missing
kw

look_up_image_info()

def look_up_image_info(
    name: str,
    image_identifier: str,
    allow_no_tag_or_digest: bool,
):

Creates an Image object from an image identifier string or a path to an ImageSpec yaml file.

This function is used when registering tasks/workflows with Admin. When using the canonical Python-based development cycle, the version that is used to register workflows and tasks with Admin should be the version of the image itself, which should ideally be something unique like the git revision SHA1 of the latest commit.

Parameter Type
name str
image_identifier str
allow_no_tag_or_digest bool

schema()

def schema(
    infer_missing: bool,
    only,
    exclude,
    many: bool,
    context,
    load_only,
    dump_only,
    partial: bool,
    unknown,
):
Parameter Type
infer_missing bool
only
exclude
many bool
context
load_only
dump_only
partial bool
unknown

to_dict()

def to_dict(
    encode_json,
):
Parameter Type
encode_json

to_json()

def to_json(
    skipkeys: bool,
    ensure_ascii: bool,
    check_circular: bool,
    allow_nan: bool,
    indent: typing.Union[int, str, NoneType],
    separators: typing.Tuple[str, str],
    default: typing.Callable,
    sort_keys: bool,
    kw,
):
Parameter Type
skipkeys bool
ensure_ascii bool
check_circular bool
allow_nan bool
indent typing.Union[int, str, NoneType]
separators typing.Tuple[str, str]
default typing.Callable
sort_keys bool
kw

Properties

Property Type Description
full
version

flytekit.extend.ImageConfig

We recommend using ImageConfig.auto(img_name=None) to create an ImageConfig. For example, ImageConfig.auto(img_name="ghcr.io/flyteorg/flytecookbook:v1.0.0") will create an ImageConfig.

ImageConfig holds available images which can be used at registration time. A default image can be specified along with optional additional images. Each image in the config must have a unique name.

Attributes:

  • default_image (Optional[Image]): The default image to be used as a container for task serialization.
  • images (List[Image]): Optional, additional images which can be used in task container definitions.
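
A short sketch of the recommended flow (the project, domain, and version values are placeholders):

.. code:: python

    from flytekit.extend import ImageConfig, SerializationSettings

    ic = ImageConfig.auto(img_name="ghcr.io/flyteorg/flytecookbook:v1.0.0")
    settings = SerializationSettings(
        project="flytesnacks",
        domain="development",
        version="v1",
        image_config=ic,
    )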

def ImageConfig(
    default_image: Optional[Image],
    images: Optional[List[Image]],
):
Parameter Type
default_image Optional[Image]
images Optional[List[Image]]

Methods

Method Description
auto() Reads from config file or from img_name
auto_default_image() None
create_from() None
find_image() Return an image, by name, if it exists
from_dict() None
from_images() Allows you to programmatically create an ImageConfig
from_json() None
schema() None
to_dict() None
to_json() None
validate_image() Validates the image to match the standard format

auto()

def auto(
    config_file: typing.Union[str, ConfigFile, None],
    img_name: Optional[str],
):

Reads from a config file or from img_name. Note that this function does not take into account the flytekit default images (see the Dockerfiles at the base of this repo). To pick those up, see the auto_default_image function.

Parameter Type
config_file typing.Union[str, ConfigFile, None]
img_name Optional[str]

auto_default_image()

def auto_default_image()

create_from()

def create_from(
    default_image: Optional[Image],
    other_images: typing.Optional[typing.List[Image]],
):
Parameter Type
default_image Optional[Image]
other_images typing.Optional[typing.List[Image]]

find_image()

def find_image(
    name,
):

Return an image, by name, if it exists.

Parameter Type
name

from_dict()

def from_dict(
    kvs: typing.Union[dict, list, str, int, float, bool, NoneType],
    infer_missing,
):
Parameter Type
kvs typing.Union[dict, list, str, int, float, bool, NoneType]
infer_missing

from_images()

def from_images(
    default_image: str,
    m: typing.Optional[typing.Dict[str, str]],
):

Allows you to programmatically create an ImageConfig. Usually only the default_image is required, unless your workflow uses multiple images

.. code:: python

    ImageConfig.from_images(
        "ghcr.io/flyteorg/flytecookbook:v1.0.0",
        {
            "spark": "ghcr.io/flyteorg/myspark:...",
            "other": "...",
        },
    )

Parameter Type
default_image str
m typing.Optional[typing.Dict[str, str]]

from_json()

def from_json(
    s: typing.Union[str, bytes, bytearray],
    parse_float,
    parse_int,
    parse_constant,
    infer_missing,
    kw,
):
Parameter Type
s typing.Union[str, bytes, bytearray]
parse_float
parse_int
parse_constant
infer_missing
kw

schema()

def schema(
    infer_missing: bool,
    only,
    exclude,
    many: bool,
    context,
    load_only,
    dump_only,
    partial: bool,
    unknown,
):
Parameter Type
infer_missing bool
only
exclude
many bool
context
load_only
dump_only
partial bool
unknown

to_dict()

def to_dict(
    encode_json,
):
Parameter Type
encode_json

to_json()

def to_json(
    skipkeys: bool,
    ensure_ascii: bool,
    check_circular: bool,
    allow_nan: bool,
    indent: typing.Union[int, str, NoneType],
    separators: typing.Tuple[str, str],
    default: typing.Callable,
    sort_keys: bool,
    kw,
):
Parameter Type
skipkeys bool
ensure_ascii bool
check_circular bool
allow_nan bool
indent typing.Union[int, str, NoneType]
separators typing.Tuple[str, str]
default typing.Callable
sort_keys bool
kw

validate_image()

def validate_image(
    _: typing.Any,
    param: str,
    values: tuple,
):

Validates the image to match the standard format. Also validates that only one default image is provided. A default image is one that is specified as default=<image_uri> or just <image_uri>. All other images should be provided with a name, in the format name=<image_uri>. This method can be used with the CLI.

Parameter Type
_ typing.Any
param str
values tuple

flytekit.extend.Interface

A Python native interface object, like inspect.signature but simpler.
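
A hedged sketch of composing an Interface with the helpers documented below (the input names are invented for illustration):

.. code:: python

    from flytekit.extend import Interface

    iface = Interface(inputs={"x": int, "spark_session": object}, outputs={"o0": int})

    # Strip an implicit, runtime-supplied input before it reaches the backend spec.
    iface = iface.remove_inputs(["spark_session"])

    # Add an implicit input injected by a plugin without the user asking for it.
    iface = iface.with_inputs({"run_id": str})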

def Interface(
    inputs: Union[Optional[Dict[str, Type]], Optional[Dict[str, Tuple[Type, Any]]]],
    outputs: Union[Optional[Dict[str, Type]], Optional[Dict[str, Optional[Type]]]],
    output_tuple_name: Optional[str],
    docstring: Optional[Docstring],
):
Parameter Type
inputs Union[Optional[Dict[str, Type]], Optional[Dict[str, Tuple[Type, Any]]]]
outputs Union[Optional[Dict[str, Type]], Optional[Dict[str, Optional[Type]]]]
output_tuple_name Optional[str]
docstring Optional[Docstring]

Methods

Method Description
remove_inputs() This method is useful in removing some variables from the Flyte backend inputs specification
with_inputs() Use this to add additional inputs to the interface
with_outputs() This method allows adding extra outputs that are expected from a task specification

remove_inputs()

def remove_inputs(
    vars: Optional[List[str]],
):

This method is useful in removing some variables from the Flyte backend inputs specification, as these are implicit, local-only inputs or will be supplied by the library at runtime (for example, a spark-session). It creates a new instance of the interface with the requested variables removed.

Parameter Type
vars Optional[List[str]]

with_inputs()

def with_inputs(
    extra_inputs: Dict[str, Type],
):

Use this to add additional inputs to the interface. This is useful for adding additional implicit inputs that are added without the user requesting them.

Parameter Type
extra_inputs Dict[str, Type]

with_outputs()

def with_outputs(
    extra_outputs: Dict[str, Type],
):

This method allows adding extra outputs that are expected from a task specification.

Parameter Type
extra_outputs Dict[str, Type]

Properties

Property Type Description
default_inputs_as_kwargs
docstring
inputs
inputs_with_defaults
output_names
output_tuple
output_tuple_name
outputs

flytekit.extend.Promise

This object is a wrapper and exists for three main reasons. Let’s assume we’re dealing with a task like:

.. code:: python

    @task
    def t1() -> (int, str): ...

1. Handling the duality between compilation and local execution: when the task function is run in a local execution mode inside a workflow function, a Python integer and string are produced. When the task is being compiled as part of the workflow, the task call creates a Node instead, and the task returns two Promise objects that point to that Node.

2. One needs to be able to call x = t1().with_overrides(...). If the task returned a raw integer or an (int, str) tuple like t1 above, calling with_overrides on the result would throw an error. This Promise object adds that capability.

3. Assorted handling for conditionals.
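
As a hedged sketch of point 2 in context (the task body and resource values are arbitrary):

.. code:: python

    from flytekit import Resources, task, workflow

    @task
    def t1() -> int:
        return 3

    @workflow
    def wf() -> int:
        # At compile time t1() yields a Promise, so node-level overrides can be attached.
        return t1().with_overrides(requests=Resources(cpu="1", mem="400Mi"), retries=2)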

def Promise(
    var: str,
    val: Union[NodeOutput, _literals_models.Literal],
    type: typing.Optional[_type_models.LiteralType],
):
Parameter Type
var str
val Union[NodeOutput, _literals_models.Literal]
type typing.Optional[_type_models.LiteralType]

Methods

Method Description
deepcopy() None
eval() None
is_() None
is_false() None
is_none() None
is_true() None
with_overrides() None
with_var() None

deepcopy()

def deepcopy()

eval()

def eval()

is_()

def is_(
    v: bool,
):
Parameter Type
v bool

is_false()

def is_false()

is_none()

def is_none()

is_true()

def is_true()

with_overrides()

def with_overrides(
    node_name: Optional[str],
    aliases: Optional[Dict[str, str]],
    requests: Optional[Resources],
    limits: Optional[Resources],
    timeout: Optional[Union[int, datetime.timedelta, object]],
    retries: Optional[int],
    interruptible: Optional[bool],
    name: Optional[str],
    task_config: Optional[Any],
    container_image: Optional[str],
    accelerator: Optional[BaseAccelerator],
    cache: Optional[bool],
    cache_version: Optional[str],
    cache_serialize: Optional[bool],
    args,
    kwargs,
):
Parameter Type
node_name Optional[str]
aliases Optional[Dict[str, str]]
requests Optional[Resources]
limits Optional[Resources]
timeout Optional[Union[int, datetime.timedelta, object]]
retries Optional[int]
interruptible Optional[bool]
name Optional[str]
task_config Optional[Any]
container_image Optional[str]
accelerator Optional[BaseAccelerator]
cache Optional[bool]
cache_version Optional[str]
cache_serialize Optional[bool]
args *args
kwargs **kwargs

with_var()

def with_var(
    new_var: str,
):
Parameter Type
new_var str

Properties

Property Type Description
attr_path
is_ready
ref
val
var

flytekit.extend.PythonCustomizedContainerTask

Please take a look at the comments for :py:class:`flytekit.extend.ExecutableTemplateShimTask` as well. This class should be subclassed and a custom Executor provided as a default to this parent class constructor when building a new external-container flytekit-only plugin.

This class provides authors of new task types the basic scaffolding to create task-template based tasks. In order to write such a task, authors need to

  • subclass the ShimTaskExecutor class and override the execute_from_model function. This function is where all the business logic should go. Keep in mind though that you, the plugin author, will not have access to anything that’s not serialized within the TaskTemplate which is why you’ll also need to
  • subclass this class, and override the get_custom function to include all the information the executor will need to run.
  • Also pass the executor you created as the executor_type argument of this class’s constructor.

Keep in mind that the total size of the TaskTemplate still needs to be small, since these will be accessed frequently by the Flyte engine.
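
A hedged, illustrative sketch of the three steps above (the class names, container image, and exact constructor keywords are assumptions; see flytekit.extras.sqlite3.task.SQLite3Task for a real implementation):

.. code:: python

    from typing import Any, Dict

    from flytekit.extend import Interface, PythonCustomizedContainerTask, ShimTaskExecutor
    from flytekit.models import task as task_models


    class EchoExecutor(ShimTaskExecutor):
        # Step 1: all business logic lives here; only the TaskTemplate is available.
        def execute_from_model(self, tt: task_models.TaskTemplate, **kwargs) -> Any:
            return tt.custom["greeting"].format(**kwargs)


    class EchoTask(PythonCustomizedContainerTask):
        # Step 3: the executor type is handed to the parent class constructor.
        def __init__(self, name: str, greeting: str, **kwargs):
            super().__init__(
                name=name,
                task_config=None,
                container_image="ghcr.io/example/echo:latest",  # placeholder image
                executor_type=EchoExecutor,
                task_type="echo",
                interface=Interface(inputs={"who": str}, outputs={"o0": str}),
                **kwargs,
            )
            self._greeting = greeting

        # Step 2: everything the executor needs must be serialized into the template.
        def get_custom(self, settings) -> Dict[str, Any]:
            return {"greeting": self._greeting}

An EchoTask("greet", greeting="Hello, {who}!") instance could then be called like any other task; at execution time only the serialized template and the executor are needed.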

def PythonCustomizedContainerTask(
    name: str,
    task_config: TC,
    container_image: str,
    executor_type: Type[ShimTaskExecutor],
    task_resolver: Optional[TaskTemplateResolver],
    task_type,
    requests: Optional[Resources],
    limits: Optional[Resources],
    environment: Optional[Dict[str, str]],
    secret_requests: Optional[List[Secret]],
    kwargs,
):
Parameter Type
name str
task_config TC
container_image str
executor_type Type[ShimTaskExecutor]
task_resolver Optional[TaskTemplateResolver]
task_type
requests Optional[Resources]
limits Optional[Resources]
environment Optional[Dict[str, str]]
secret_requests Optional[List[Secret]]
kwargs **kwargs

Methods

Method Description
compile() Generates a node that encapsulates this task in a workflow definition
construct_node_metadata() Used when constructing the node that encapsulates this task as part of a broader workflow definition
dispatch_execute() This function is largely similar to the base PythonTask, with the exception that we have to infer the Python interface before executing
execute() Rather than running here, send everything to the executor
find_lhs() None
get_command() None
get_config() Returns the task config as a serializable dictionary
get_container() Returns the container definition (if any) that is used to run the task on hosted Flyte
get_custom() Return additional plugin-specific custom data (if any) as a serializable dictionary
get_extended_resources() Returns the extended resources to allocate to the task on hosted Flyte
get_image() None
get_input_types() Returns the names and python types as a dictionary for the inputs of this task
get_k8s_pod() Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte
get_sql() Returns the Sql definition (if any) that is used to run the task on hosted Flyte
get_type_for_input_var() Returns the python type for an input variable by name
get_type_for_output_var() Returns the python type for the specified output variable by name
local_execute() This function is used only in the local execution path and is responsible for calling dispatch execute
local_execution_mode() None
post_execute() This function is a stub, just here to keep dispatch_execute compatibility between this class and PythonTask
pre_execute() This function is a stub, just here to keep dispatch_execute compatibility between this class and PythonTask
sandbox_execute() Call dispatch_execute, in the context of a local sandbox execution
serialize_to_model() None

compile()

def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    args,
    kwargs,
):

Generates a node that encapsulates this task in a workflow definition.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
args *args
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

Used when constructing the node that encapsulates this task as part of a broader workflow definition.

dispatch_execute()

def dispatch_execute(
    ctx: FlyteContext,
    input_literal_map: _literal_models.LiteralMap,
):

This function is largely similar to the base PythonTask, with the exception that we have to infer the Python interface before executing. Also, we refer to self.task_template rather than just self, similar to task classes that derive from the base PythonTask.

Parameter Type
ctx FlyteContext
input_literal_map _literal_models.LiteralMap

execute()

def execute(
    kwargs,
):

Rather than running here, send everything to the executor.

Parameter Type
kwargs **kwargs

find_lhs()

def find_lhs()

get_command()

def get_command(
    settings: SerializationSettings,
):
Parameter Type
settings SerializationSettings

get_config()

def get_config(
    settings: SerializationSettings,
):

Returns the task config as a serializable dictionary. This task config consists of metadata about the custom defined for this task.

Parameter Type
settings SerializationSettings

get_container()

def get_container(
    settings: SerializationSettings,
):

Returns the container definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings SerializationSettings

get_custom()

def get_custom(
    settings: SerializationSettings,
):

Return additional plugin-specific custom data (if any) as a serializable dictionary.

Parameter Type
settings SerializationSettings

get_extended_resources()

def get_extended_resources(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the extended resources to allocate to the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_image()

def get_image(
    settings: SerializationSettings,
):
Parameter Type
settings SerializationSettings

get_input_types()

def get_input_types()

Returns the names and python types as a dictionary for the inputs of this task.

get_k8s_pod()

def get_k8s_pod(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_sql()

def get_sql(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the Sql definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_type_for_input_var()

def get_type_for_input_var(
    k: str,
    v: typing.Any,
):

Returns the python type for an input variable by name.

Parameter Type
k str
v typing.Any

get_type_for_output_var()

def get_type_for_output_var(
    k: str,
    v: typing.Any,
):

Returns the python type for the specified output variable by name.

Parameter Type
k str
v typing.Any

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
):

This function is used only in the local execution path and is responsible for calling dispatch execute. Use this function when calling a task with native values (or Promises containing Flyte literals derived from Python native values).

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

post_execute()

def post_execute(
    _: Optional[ExecutionParameters],
    rval: Any,
):

This function is a stub, just here to keep dispatch_execute compatibility between this class and PythonTask.

Parameter Type
_ Optional[ExecutionParameters]
rval Any

pre_execute()

def pre_execute(
    user_params: Optional[ExecutionParameters],
):

This function is a stub, just here to keep dispatch_execute compatibility between this class and PythonTask.

Parameter Type
user_params Optional[ExecutionParameters]

sandbox_execute()

def sandbox_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

serialize_to_model()

def serialize_to_model(
    settings: SerializationSettings,
):
Parameter Type
settings SerializationSettings

Properties

Property Type Description
container_image
deck_fields
disable_deck
docs
enable_deck
environment
executor
executor_type
instantiated_in
interface
lhs
location
metadata
name
python_interface
resources
security_context
task_config
task_resolver
task_template
task_type
task_type_version

flytekit.extend.PythonTask

Base Class for all Tasks with a Python native Interface. This should be used directly for task types that do not have a python function to be executed. Otherwise refer to :py:class:`flytekit.PythonFunctionTask`.
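
As a hedged sketch of what "a task type without a python function" can look like (everything here is illustrative):

.. code:: python

    from typing import Any

    from flytekit.extend import Interface, PythonTask


    class ConstantTask(PythonTask):
        """Toy task type with no user-supplied function: it always returns one value."""

        def __init__(self, name: str, value: int, **kwargs):
            super().__init__(
                task_type="constant-task",
                name=name,
                task_config=None,
                interface=Interface(inputs={}, outputs={"o0": int}),
                **kwargs,
            )
            self._value = value

        def execute(self, **kwargs) -> Any:
            return self._value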

def PythonTask(
    task_type: str,
    name: str,
    task_config: typing.Optional[~T],
    interface: typing.Optional[flytekit.core.interface.Interface],
    environment: typing.Optional[typing.Dict[str, str]],
    disable_deck: typing.Optional[bool],
    enable_deck: typing.Optional[bool],
    deck_fields: typing.Optional[typing.Tuple[flytekit.deck.deck.DeckField, ...]],
    kwargs,
):
Parameter Type
task_type str
name str
task_config typing.Optional[~T]
interface typing.Optional[flytekit.core.interface.Interface]
environment typing.Optional[typing.Dict[str, str]]
disable_deck typing.Optional[bool]
enable_deck typing.Optional[bool]
deck_fields typing.Optional[typing.Tuple[flytekit.deck.deck.DeckField, ...]]
kwargs **kwargs

Methods

Method Description
compile() Generates a node that encapsulates this task in a workflow definition
construct_node_metadata() Used when constructing the node that encapsulates this task as part of a broader workflow definition
dispatch_execute() This method translates Flyte’s Type system based input values and invokes the actual call to the executor
execute() This method will be invoked to execute the task
find_lhs() None
get_config() Returns the task config as a serializable dictionary
get_container() Returns the container definition (if any) that is used to run the task on hosted Flyte
get_custom() Return additional plugin-specific custom data (if any) as a serializable dictionary
get_extended_resources() Returns the extended resources to allocate to the task on hosted Flyte
get_input_types() Returns the names and python types as a dictionary for the inputs of this task
get_k8s_pod() Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte
get_sql() Returns the Sql definition (if any) that is used to run the task on hosted Flyte
get_type_for_input_var() Returns the python type for an input variable by name
get_type_for_output_var() Returns the python type for the specified output variable by name
local_execute() This function is used only in the local execution path and is responsible for calling dispatch execute
local_execution_mode() None
post_execute() Post execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs
pre_execute() This is the method that will be invoked directly before executing the task method and before all the inputs are converted
sandbox_execute() Call dispatch_execute, in the context of a local sandbox execution

compile()

def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    args,
    kwargs,
):

Generates a node that encapsulates this task in a workflow definition.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
args *args
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

Used when constructing the node that encapsulates this task as part of a broader workflow definition.

dispatch_execute()

def dispatch_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

This method translates Flyte’s type-system-based input values and invokes the actual call to the executor. This method is also invoked during runtime.

  • VoidPromise is returned when the task itself declares no outputs.
  • A LiteralMap is returned when the task declares one or more outputs; individual outputs may be None.
  • DynamicJobSpec is returned when a dynamic workflow is executed.
Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

execute()

def execute(
    kwargs,
):

This method will be invoked to execute the task.

Parameter Type
kwargs **kwargs

find_lhs()

def find_lhs()

get_config()

def get_config(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the task config as a serializable dictionary. This task config consists of metadata about the custom defined for this task.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_container()

def get_container(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the container definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_custom()

def get_custom(
    settings: flytekit.configuration.SerializationSettings,
):

Return additional plugin-specific custom data (if any) as a serializable dictionary.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_extended_resources()

def get_extended_resources(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the extended resources to allocate to the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_input_types()

def get_input_types()

Returns the names and python types as a dictionary for the inputs of this task.

get_k8s_pod()

def get_k8s_pod(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_sql()

def get_sql(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the Sql definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_type_for_input_var()

def get_type_for_input_var(
    k: str,
    v: typing.Any,
):

Returns the python type for an input variable by name.

Parameter Type
k str
v typing.Any

get_type_for_output_var()

def get_type_for_output_var(
    k: str,
    v: typing.Any,
):

Returns the python type for the specified output variable by name.

Parameter Type
k str
v typing.Any

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
):

This function is used only in the local execution path and is responsible for calling dispatch execute. Use this function when calling a task with native values (or Promises containing Flyte literals derived from Python native values).

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

post_execute()

def post_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
    rval: typing.Any,
):

Post execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs to match the intended task’s outputs. If not overridden, this function is a no-op.

Parameter Type
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters]
rval typing.Any

pre_execute()

def pre_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
):

This is the method that will be invoked directly before executing the task method and before all the inputs are converted. One particular case where this is useful is if the context is to be modified for the user process to get some user space parameters. This also ensures that things like SparkSession are already correctly set up before the type transformers are called.

This should return either the same context or the mutated context.

Parameter Type
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters]

sandbox_execute()

def sandbox_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

Properties

Property Type Description
deck_fields
disable_deck
docs
enable_deck
environment
instantiated_in
interface
lhs
location
metadata
name
python_interface
security_context
task_config
task_type
task_type_version

flytekit.extend.SQLTask

Base task types for all SQL tasks. See :py:class:`flytekit.extras.sqlite3.task.SQLite3Task` and :py:class:`flytekitplugins.athena.task.AthenaTask` for examples of how to use it as a base class.

.. autoclass:: flytekit.extras.sqlite3.task.SQLite3Task
   :noindex:
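
For orientation, a hedged sketch of a concrete subclass in use (the dataset URI is a placeholder; consult the SQLite3Task documentation for the authoritative example):

.. code:: python

    from flytekit import kwtypes
    from flytekit.extras.sqlite3.task import SQLite3Config, SQLite3Task
    from flytekit.types.schema import FlyteSchema

    tracks_query = SQLite3Task(
        name="sqlite3.tracks",
        query_template="SELECT * FROM tracks LIMIT {{ .inputs.limit }}",
        inputs=kwtypes(limit=int),
        output_schema_type=FlyteSchema,
        task_config=SQLite3Config(
            uri="https://example.com/chinook.zip",  # placeholder dataset archive
            compressed=True,
        ),
    )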

def SQLTask(
    name: str,
    query_template: str,
    task_config: typing.Optional[~T],
    task_type,
    inputs: typing.Optional[typing.Dict[str, typing.Tuple[typing.Type, typing.Any]]],
    metadata: typing.Optional[flytekit.core.base_task.TaskMetadata],
    outputs: typing.Optional[typing.Dict[str, typing.Type]],
    kwargs,
):

This SQLTask should mostly just be used as a base class for other SQL task types and should not be used directly. See :py:class:`flytekit.extras.sqlite3.task.SQLite3Task`.

Parameter Type
name str
query_template str
task_config typing.Optional[~T]
task_type
inputs typing.Optional[typing.Dict[str, typing.Tuple[typing.Type, typing.Any]]]
metadata typing.Optional[flytekit.core.base_task.TaskMetadata]
outputs typing.Optional[typing.Dict[str, typing.Type]]
kwargs **kwargs

Methods

Method Description
compile() Generates a node that encapsulates this task in a workflow definition
construct_node_metadata() Used when constructing the node that encapsulates this task as part of a broader workflow definition
dispatch_execute() This method translates Flyte’s Type system based input values and invokes the actual call to the executor
execute() This method will be invoked to execute the task
find_lhs() None
get_config() Returns the task config as a serializable dictionary
get_container() Returns the container definition (if any) that is used to run the task on hosted Flyte
get_custom() Return additional plugin-specific custom data (if any) as a serializable dictionary
get_extended_resources() Returns the extended resources to allocate to the task on hosted Flyte
get_input_types() Returns the names and python types as a dictionary for the inputs of this task
get_k8s_pod() Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte
get_query() None
get_sql() Returns the Sql definition (if any) that is used to run the task on hosted Flyte
get_type_for_input_var() Returns the python type for an input variable by name
get_type_for_output_var() Returns the python type for the specified output variable by name
interpolate_query() This function will fill in the query template with the provided kwargs and return the interpolated query
local_execute() This function is used only in the local execution path and is responsible for calling dispatch execute
local_execution_mode() None
post_execute() Post execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs
pre_execute() This is the method that will be invoked directly before executing the task method and before all the inputs are converted
sandbox_execute() Call dispatch_execute, in the context of a local sandbox execution

compile()

def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    args,
    kwargs,
):

Generates a node that encapsulates this task in a workflow definition.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
args *args
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

Used when constructing the node that encapsulates this task as part of a broader workflow definition.

dispatch_execute()

def dispatch_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

This method translates Flyte’s Type system based input values and invokes the actual call to the executor. This method is also invoked during runtime.

  • VoidPromise is returned in the case when the task itself declares no outputs.
  • Literal Map is returned when the task declares one or more outputs. Individual outputs may be None
  • DynamicJobSpec is returned when a dynamic workflow is executed
Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

execute()

def execute(
    kwargs,
):

This method will be invoked to execute the task.

Parameter Type
kwargs **kwargs

find_lhs()

def find_lhs()

get_config()

def get_config(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the task config as a serializable dictionary. This task config consists of metadata about the custom configuration defined for this task.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_container()

def get_container(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the container definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_custom()

def get_custom(
    settings: flytekit.configuration.SerializationSettings,
):

Return additional plugin-specific custom data (if any) as a serializable dictionary.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_extended_resources()

def get_extended_resources(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the extended resources to allocate to the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_input_types()

def get_input_types()

Returns the names and python types as a dictionary for the inputs of this task.

get_k8s_pod()

def get_k8s_pod(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_query()

def get_query(
    kwargs,
):
Parameter Type
kwargs **kwargs

get_sql()

def get_sql(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the Sql definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_type_for_input_var()

def get_type_for_input_var(
    k: str,
    v: typing.Any,
):

Returns the python type for an input variable by name.

Parameter Type
k str
v typing.Any

get_type_for_output_var()

def get_type_for_output_var(
    k: str,
    v: typing.Any,
):

Returns the python type for the specified output variable by name.

Parameter Type
k str
v typing.Any

interpolate_query()

def interpolate_query(
    query_template,
    kwargs,
):

This function will fill in the query template with the provided kwargs and return the interpolated query. Please note that when SQL tasks run in Flyte, this step is done by the task executor.

Parameter Type
query_template
kwargs **kwargs
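
A minimal sketch of interpolation, assuming the {{ .inputs.<name> }} placeholder syntax used by flytekit SQL tasks and that interpolate_query can be invoked directly on the class; at runtime this step is normally performed by the task executor.

.. code-block:: python

from flytekit.extend import SQLTask

query_template = "SELECT * FROM tracks LIMIT {{ .inputs.limit }}"

# kwargs supply values for the named placeholders in the template.
interpolated = SQLTask.interpolate_query(query_template, limit=100)
# expected result: "SELECT * FROM tracks LIMIT 100"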

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
):

This function is used only in the local execution path and is responsible for calling dispatch execute. Use this function when calling a task with native values (or Promises containing Flyte literals derived from Python native values).

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

post_execute()

def post_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
    rval: typing.Any,
):

Post execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs to match the intended task's outputs. If not overridden, this function is a no-op.

Parameter Type
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters]
rval typing.Any

pre_execute()

def pre_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
):

This is the method that will be invoked directly before executing the task method and before all the inputs are converted. One particular case where this is useful is if the context is to be modified for the user process to get some user space parameters. This also ensures that things like SparkSession are already correctly set up before the type transformers are called.

This should return either the same context or the mutated context.

Parameter Type
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters]

sandbox_execute()

def sandbox_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

Properties

Property Type Description
deck_fields
disable_deck
docs
enable_deck
environment
instantiated_in
interface
lhs
location
metadata
name
python_interface
query_template
security_context
task_config
task_type
task_type_version

flytekit.extend.SecretsManager

This provides secrets resolution logic at runtime. The resolution order is:

  • Try the environment variable first. The variable name is prefixed with configuration.SECRETS_ENV_PREFIX and is upper-cased.
  • If not found, try the file whose lower-cased name matches configuration.SECRETS_DEFAULT_DIR/<group>/configuration.SECRETS_FILE_PREFIX<key>.

All configuration values can always be overridden by injecting an environment variable.

def SecretsManager(
    secrets_cfg: typing.Optional[SecretsConfig],
):
Parameter Type
secrets_cfg typing.Optional[SecretsConfig]

Methods

Method Description
get() Retrieves a secret using the resolution order -> Env followed by file
get_secrets_env_var() Returns a string that matches the ENV Variable to look for the secrets
get_secrets_file() Returns a path that matches the file to look for the secrets

get()

def get(
    group: Optional[str],
    key: Optional[str],
    group_version: Optional[str],
    encode_mode: str,
):

Retrieves a secret using the resolution order: environment variable first, then file. Raises a ValueError if the secret is not found. The encode_mode parameter defines the mode used to open the secret file; it can be either “r” to read a text file or “rb” to read a binary file.

Parameter Type
group Optional[str]
key Optional[str]
group_version Optional[str]
encode_mode str
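
A minimal usage sketch inside a task body; the group and key names are hypothetical and must correspond to a secret actually mounted for the execution.

.. code-block:: python

import flytekit

# Inside a @task-decorated function; "user-secrets"/"api-key" are hypothetical names.
api_key = flytekit.current_context().secrets.get(group="user-secrets", key="api-key")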

get_secrets_env_var()

def get_secrets_env_var(
    group: Optional[str],
    key: Optional[str],
    group_version: Optional[str],
):

Returns the environment variable name to look up for the secret.

Parameter Type
group Optional[str]
key Optional[str]
group_version Optional[str]

get_secrets_file()

def get_secrets_file(
    group: Optional[str],
    key: Optional[str],
    group_version: Optional[str],
):

Returns the file path to look up for the secret.

Parameter Type
group Optional[str]
key Optional[str]
group_version Optional[str]

flytekit.extend.SerializationSettings

These settings are provided while serializing a workflow and task, before registration. This is required to get runtime information at serialization time, as well as some defaults.

Attributes:

  • project (str): The project (if any) under which to register entities.
  • domain (str): The domain (if any) under which to register entities.
  • version (str): The version (if any) with which to register entities.
  • image_config (ImageConfig): The image config used to define task container images.
  • env (Optional[Dict[str, str]]): Environment variables injected into task container definitions.
  • flytekit_virtualenv_root (Optional[str]): During out-of-container serialization, the absolute path of the flytekit virtualenv at serialization time won’t match the in-container value at execution time. This optional value is used to provide the in-container virtualenv path.
  • python_interpreter (Optional[str]): The python executable to use. This is used for spark tasks in out-of-container execution.
  • entrypoint_settings (Optional[EntrypointSettings]): Information about the command, path and version of the entrypoint program.
  • fast_serialization_settings (Optional[FastSerializationSettings]): If the code is being serialized so that it can be fast registered (and thus omit building a Docker image), this object contains additional parameters for serialization.
  • source_root (Optional[str]): The root directory of the source code.

def SerializationSettings(
    image_config: ImageConfig,
    project: typing.Optional[str],
    domain: typing.Optional[str],
    version: typing.Optional[str],
    env: Optional[Dict[str, str]],
    git_repo: Optional[str],
    python_interpreter: str,
    flytekit_virtualenv_root: Optional[str],
    fast_serialization_settings: Optional[FastSerializationSettings],
    source_root: Optional[str],
):
Parameter Type
image_config ImageConfig
project typing.Optional[str]
domain typing.Optional[str]
version typing.Optional[str]
env Optional[Dict[str, str]]
git_repo Optional[str]
python_interpreter str
flytekit_virtualenv_root Optional[str]
fast_serialization_settings Optional[FastSerializationSettings]
source_root Optional[str]
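
A minimal construction sketch; the project, domain, version, and env values are illustrative, and ImageConfig.auto_default_image() is assumed to be available in this flytekit version.

.. code-block:: python

from flytekit.configuration import ImageConfig, SerializationSettings

settings = SerializationSettings(
    project="flytesnacks",
    domain="development",
    version="v1",
    image_config=ImageConfig.auto_default_image(),  # default task container image
    env={"MY_FLAG": "1"},
)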

Methods

Method Description
default_entrypoint_settings() Assumes the entrypoint is installed in the virtual environment that contains the given interpreter
for_image() None
from_dict() None
from_json() None
from_transport() None
new_builder() Creates a SerializationSettings.Builder that copies the existing serialization settings parameters and allows for customization
schema() None
should_fast_serialize() Whether or not the serialization settings specify that entities should be serialized for fast registration
to_dict() None
to_json() None
venv_root_from_interpreter() Computes the path of the virtual environment root, based on the passed in python interpreter path
with_serialized_context() Use this method to create a new SerializationSettings that has an environment variable set with the SerializedContext

default_entrypoint_settings()

def default_entrypoint_settings(
    interpreter_path: str,
):

Assumes the entrypoint is installed in the virtual environment that contains the given interpreter.

Parameter Type
interpreter_path str

for_image()

def for_image(
    image: str,
    version: str,
    project: str,
    domain: str,
    python_interpreter_path: str,
):
Parameter Type
image str
version str
project str
domain str
python_interpreter_path str

from_dict()

def from_dict(
    kvs: typing.Union[dict, list, str, int, float, bool, NoneType],
    infer_missing,
):
Parameter Type
kvs typing.Union[dict, list, str, int, float, bool, NoneType]
infer_missing

from_json()

def from_json(
    s: typing.Union[str, bytes, bytearray],
    parse_float,
    parse_int,
    parse_constant,
    infer_missing,
    kw,
):
Parameter Type
s typing.Union[str, bytes, bytearray]
parse_float
parse_int
parse_constant
infer_missing
kw

from_transport()

def from_transport(
    s: str,
):
Parameter Type
s str

new_builder()

def new_builder()

Creates a SerializationSettings.Builder that copies the existing serialization settings parameters and allows for customization.

schema()

def schema(
    infer_missing: bool,
    only,
    exclude,
    many: bool,
    context,
    load_only,
    dump_only,
    partial: bool,
    unknown,
):
Parameter Type
infer_missing bool
only
exclude
many bool
context
load_only
dump_only
partial bool
unknown

should_fast_serialize()

def should_fast_serialize()

Whether or not the serialization settings specify that entities should be serialized for fast registration.

to_dict()

def to_dict(
    encode_json,
):
Parameter Type
encode_json

to_json()

def to_json(
    skipkeys: bool,
    ensure_ascii: bool,
    check_circular: bool,
    allow_nan: bool,
    indent: typing.Union[int, str, NoneType],
    separators: typing.Tuple[str, str],
    default: typing.Callable,
    sort_keys: bool,
    kw,
):
Parameter Type
skipkeys bool
ensure_ascii bool
check_circular bool
allow_nan bool
indent typing.Union[int, str, NoneType]
separators typing.Tuple[str, str]
default typing.Callable
sort_keys bool
kw

venv_root_from_interpreter()

def venv_root_from_interpreter(
    interpreter_path: str,
):

Computes the path of the virtual environment root based on the passed-in python interpreter path. For example, /opt/venv/bin/python3 -> /opt/venv.

Parameter Type
interpreter_path str
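
Based on the example above, a usage sketch (assuming the method can be called statically):

.. code-block:: python

from flytekit.configuration import SerializationSettings

assert SerializationSettings.venv_root_from_interpreter("/opt/venv/bin/python3") == "/opt/venv"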

with_serialized_context()

def with_serialized_context()

Use this method to create a new SerializationSettings that has an environment variable set with the serialized context. This is useful for transporting the serialized context to serialized and registered tasks. The setting will be available in the env field under the key SERIALIZED_CONTEXT_ENV_VAR. Returns a newly constructed SerializationSettings, or self if it already carries the serialized context.

Properties

Property Type Description
entrypoint_settings
serialized_context

flytekit.extend.ShimTaskExecutor

Please see the notes for the metaclass above first.

This functionality has two use-cases currently,

  • Keep track of naming for non-function PythonAutoContainerTasks. That is, things like the :py:class:flytekit.extras.sqlite3.task.SQLite3Task task.
  • Task resolvers, because task resolvers are instances of :py:class:flytekit.core.python_auto_container.TaskResolverMixin classes, not the classes themselves, which means we need to look on the left hand side of them to see how to find them at task execution time.

def ShimTaskExecutor(
    args,
    kwargs,
):
Parameter Type
args *args
kwargs **kwargs

Methods

Method Description
execute_from_model() This function must be overridden and is where all the business logic for running a task should live
find_lhs() None

execute_from_model()

def execute_from_model(
    tt: _task_model.TaskTemplate,
    kwargs,
):

This function must be overridden and is where all the business logic for running a task should live. Keep in mind that you’re only working with the TaskTemplate. You won’t have access to any information in the task that wasn’t serialized into the template.

Parameter Type
tt _task_model.TaskTemplate
kwargs **kwargs
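
A minimal sketch of an executor subclass; the "query" entry in the template’s custom field is hypothetical and depends on what the companion task serialized into the template.

.. code-block:: python

from typing import Any

from flytekit.extend import ShimTaskExecutor
from flytekit.models import task as task_models


class MyQueryExecutor(ShimTaskExecutor):
    def execute_from_model(self, tt: task_models.TaskTemplate, **kwargs) -> Any:
        # Only information serialized into the template is available here.
        query = tt.custom["query"]  # hypothetical custom field
        print(f"Would run query: {query}")
        return None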

find_lhs()

def find_lhs()

Properties

Property Type Description
instantiated_in
lhs
location

flytekit.extend.TaskPlugins

This is the TaskPlugins factory for task types that are derivative of PythonFunctionTask. Every task that the user wishes to use should be available in this factory. Usage

.. code-block:: python

TaskPlugins.register_pythontask_plugin(config_object_type, plugin_object_type)

config_object_type is any class that will be passed to the plugin_object as task_config

Plugin_object_type is a derivative of PythonFunctionTask

Examples of available task plugins include different query-based plugins such as :py:class:flytekitplugins.athena.task.AthenaTask and :py:class:flytekitplugins.hive.task.HiveTask, kubeflow operators like :py:class:plugins.kfpytorch.flytekitplugins.kfpytorch.task.PyTorchFunctionTask and :py:class:plugins.kftensorflow.flytekitplugins.kftensorflow.task.TensorflowFunctionTask, and generic plugins like :py:class:flytekitplugins.pod.task.PodFunctionTask which doesn’t integrate with third party tools or services.

The task_config is different for every task plugin type. This is filled out by users when they define a task to specify plugin-specific behavior and features. For example, with a query type task plugin, the config might store information related to which database to query.

The plugin_object_type can be used to customize execution behavior and task serialization properties in tandem with the task_config.
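
A minimal registration sketch; MyConfig and MyFunctionTask are hypothetical names used only to illustrate the pattern.

.. code-block:: python

from dataclasses import dataclass
from typing import Any, Callable

from flytekit.core.python_function_task import PythonFunctionTask
from flytekit.extend import TaskPlugins


@dataclass
class MyConfig:
    endpoint: str = ""


class MyFunctionTask(PythonFunctionTask[MyConfig]):
    def __init__(self, task_config: MyConfig, task_function: Callable, **kwargs: Any):
        super().__init__(
            task_config=task_config,
            task_function=task_function,
            task_type="my-task",  # placeholder task type name
            **kwargs,
        )


# After registration, @task(task_config=MyConfig(...)) produces a MyFunctionTask.
TaskPlugins.register_pythontask_plugin(MyConfig, MyFunctionTask)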

Methods

Method Description
find_pythontask_plugin() Returns a PluginObjectType if found or returns the base PythonFunctionTask
register_pythontask_plugin() Use this method to register a new plugin into Flytekit

find_pythontask_plugin()

def find_pythontask_plugin(
    plugin_config_type: type,
):

Returns a PluginObjectType if found or returns the base PythonFunctionTask

Parameter Type
plugin_config_type type

register_pythontask_plugin()

def register_pythontask_plugin(
    plugin_config_type: type,
    plugin: Type[PythonFunctionTask],
):

Use this method to register a new plugin into Flytekit. Usage ::

.. code-block:: python

TaskPlugins.register_pythontask_plugin(config_object_type, plugin_object_type)

config_object_type is any class that will be passed to the plugin_object as task_config

Plugin_object_type is a derivative of PythonFunctionTask

Parameter Type
plugin_config_type type
plugin Type[PythonFunctionTask]

flytekit.extend.TaskResolverMixin

Flytekit tasks interact with the Flyte platform very, very broadly in two steps. They need to be uploaded to Admin, and then they are run by the user upon request (either as a single task execution or as part of a workflow). In any case, at execution time, for most tasks (that is those that generate a container target) the container image containing the task needs to be spun up again at which point the container needs to know which task it’s supposed to run and how to rehydrate the task object.

For example, the serialization of a simple task ::

in repo_root/workflows/example.py

@task
def t1(…) -> …: …

might result in a container with arguments like ::

pyflyte-execute --inputs s3://path/inputs.pb --output-prefix s3://outputs/location --raw-output-data-prefix /tmp/data --resolver flytekit.core.python_auto_container.default_task_resolver -- task-module repo_root.workflows.example task-name t1

At serialization time, the container created for the task will start out automatically with the pyflyte-execute bit, along with the requisite input/output args and the offloaded data prefix. Appended to that will be two things,

  1. the location of the task’s task resolver, followed by two dashes, followed by
  2. the arguments provided by calling the loader_args function below.

The default_task_resolver declared below knows that

  • When loader_args is called on a task, to look up the module the task is in, and the name of the task (the key of the task in the module, either the function name, or the variable it was assigned to).
  • When load_task is called, it interprets the first part of the command as the module to call importlib.import_module on, and then looks for a key t1.

This is just the default behavior. Users should feel free to implement their own resolvers.
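
A rough sketch of a custom resolver that keeps its own registry of tasks; the class name and behavior are illustrative only, and error handling is omitted. Because tasks register themselves when their defining module is imported, loading the resolver instance at execution time is enough to make those tasks findable.

.. code-block:: python

from typing import List

from flytekit.configuration import SerializationSettings
from flytekit.core.base_task import Task
from flytekit.core.tracker import TrackedInstance
from flytekit.extend import TaskResolverMixin


class RegistryTaskResolver(TrackedInstance, TaskResolverMixin):
    """Hypothetical resolver storing tasks added to it at import time."""

    def __init__(self):
        super().__init__()
        self._tasks = {}

    def add(self, task: Task) -> Task:
        self._tasks[task.name] = task
        return task

    def name(self) -> str:
        return "RegistryTaskResolver"

    def loader_args(self, settings: SerializationSettings, task: Task) -> List[str]:
        return [task.name]

    def load_task(self, loader_args: List[str]) -> Task:
        return self._tasks[loader_args[0]]

    def get_all_tasks(self) -> List[Task]:
        return list(self._tasks.values())


# The instance (not the class) is what the --resolver argument points to.
registry_resolver = RegistryTaskResolver()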

Methods

Method Description
get_all_tasks() Future proof method
load_task() Given the set of identifier keys, should return one Python Task or raise an error if not found
loader_args() Return a list of strings that can help identify the parameter Task
name() None
task_name() Overridable function that can optionally return a custom name for a given task

get_all_tasks()

def get_all_tasks()

Future proof method. Just making it easy to access all tasks (Not required today as we auto register them)

load_task()

def load_task(
    loader_args: typing.List[str],
):

Given the set of identifier keys, should return one Python Task or raise an error if not found

Parameter Type
loader_args typing.List[str]

loader_args()

def loader_args(
    settings: flytekit.configuration.SerializationSettings,
    t: flytekit.core.base_task.Task,
):

Return a list of strings that can help identify the parameter Task

Parameter Type
settings flytekit.configuration.SerializationSettings
t flytekit.core.base_task.Task

name()

def name()

task_name()

def task_name(
    t: flytekit.core.base_task.Task,
):

Overridable function that can optionally return a custom name for a given task

Parameter Type
t flytekit.core.base_task.Task

Properties

Property Type Description
location

flytekit.extend.TypeEngine

Core Extensible TypeEngine of Flytekit. This should be used to extend the capabilities of Flytekit’s type system. Users can implement their own TypeTransformers and register them with the TypeEngine. This will allow special handling of user objects.
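
A minimal round-trip sketch using the core conversion entry points:

.. code-block:: python

from flytekit.core.context_manager import FlyteContextManager
from flytekit.extend import TypeEngine

ctx = FlyteContextManager.current_context()

lt = TypeEngine.to_literal_type(int)                    # python type  -> LiteralType
lit = TypeEngine.to_literal(ctx, 42, int, lt)           # python value -> Literal
assert TypeEngine.to_python_value(ctx, lit, int) == 42  # Literal      -> python value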

Methods

Method Description
async_to_literal() Converts a python value of a given type and expected LiteralType into a resolved Literal value
async_to_python_value() None
calculate_hash() None
dict_to_literal_map() None
dict_to_literal_map_pb() None
get_available_transformers() Returns all python types for which transformers are available
get_transformer() Implements a recursive search for the transformer
guess_python_type() Transforms a flyte-specific LiteralType to a regular python value
guess_python_types() Transforms a dictionary of flyte-specific Variable objects to a dictionary of regular python values
lazy_import_transformers() Only load the transformers if needed
literal_map_to_kwargs() None
named_tuple_to_variable_map() Converts a python-native NamedTuple to a flyte-specific VariableMap of named literals
register() This should be used for all types that respond with the right type annotation when you use type(
register_additional_type() None
register_restricted_type() None
to_html() None
to_literal() The current dance is because we are allowing users to call from an async function, this synchronous
to_literal_checks() None
to_literal_type() Converts a python type into a flyte specific LiteralType
to_python_value() Converts a Literal value with an expected python type into a python value
unwrap_offloaded_literal() None

async_to_literal()

def async_to_literal(
    ctx: FlyteContext,
    python_val: typing.Any,
    python_type: Type[T],
    expected: LiteralType,
):

Converts a python value of a given type and expected LiteralType into a resolved Literal value.

Parameter Type
ctx FlyteContext
python_val typing.Any
python_type Type[T]
expected LiteralType

async_to_python_value()

def async_to_python_value(
    ctx: FlyteContext,
    lv: Literal,
    expected_python_type: Type,
):
Parameter Type
ctx FlyteContext
lv Literal
expected_python_type Type

calculate_hash()

def calculate_hash(
    python_val: typing.Any,
    python_type: Type[T],
):
Parameter Type
python_val typing.Any
python_type Type[T]

dict_to_literal_map()

def dict_to_literal_map(
    ctx: FlyteContext,
    d: typing.Dict[str, typing.Any],
    type_hints: Optional[typing.Dict[str, type]],
):
Parameter Type
ctx FlyteContext
d typing.Dict[str, typing.Any]
type_hints Optional[typing.Dict[str, type]]

dict_to_literal_map_pb()

def dict_to_literal_map_pb(
    ctx: FlyteContext,
    d: typing.Dict[str, typing.Any],
    type_hints: Optional[typing.Dict[str, type]],
):
Parameter Type
ctx FlyteContext
d typing.Dict[str, typing.Any]
type_hints Optional[typing.Dict[str, type]]

get_available_transformers()

def get_available_transformers()

Returns all python types for which transformers are available

get_transformer()

def get_transformer(
    python_type: Type,
):

Implements a recursive search for the transformer.

Parameter Type
python_type Type

guess_python_type()

def guess_python_type(
    flyte_type: LiteralType,
):

Transforms a flyte-specific LiteralType to a regular python value.

Parameter Type
flyte_type LiteralType

guess_python_types()

def guess_python_types(
    flyte_variable_dict: typing.Dict[str, _interface_models.Variable],
):

Transforms a dictionary of flyte-specific Variable objects to a dictionary of regular python values.

Parameter Type
flyte_variable_dict typing.Dict[str, _interface_models.Variable]

lazy_import_transformers()

def lazy_import_transformers()

Only load the transformers if needed.

literal_map_to_kwargs()

def literal_map_to_kwargs(
    ctx: FlyteContext,
    lm: LiteralMap,
    python_types: typing.Optional[typing.Dict[str, type]],
    literal_types: typing.Optional[typing.Dict[str, _interface_models.Variable]],
):
Parameter Type
ctx FlyteContext
lm LiteralMap
python_types typing.Optional[typing.Dict[str, type]]
literal_types typing.Optional[typing.Dict[str, _interface_models.Variable]]

named_tuple_to_variable_map()

def named_tuple_to_variable_map(
    t: typing.NamedTuple,
):

Converts a python-native NamedTuple to a flyte-specific VariableMap of named literals.

Parameter Type
t typing.NamedTuple

register()

def register(
    transformer: TypeTransformer,
    additional_types: Optional[typing.List[Type]],
):

This should be used for all types that respond with the right type annotation when you use the type(…) function.

Parameter Type
transformer TypeTransformer
additional_types Optional[typing.List[Type]]

register_additional_type()

def register_additional_type(
    transformer: TypeTransformer[T],
    additional_type: Type[T],
    override,
):
Parameter Type
transformer TypeTransformer[T]
additional_type Type[T]
override

register_restricted_type()

def register_restricted_type(
    name: str,
    type: Type[T],
):
Parameter Type
name str
type Type[T]

to_html()

def to_html(
    ctx: FlyteContext,
    python_val: typing.Any,
    expected_python_type: Type[typing.Any],
):
Parameter Type
ctx FlyteContext
python_val typing.Any
expected_python_type Type[typing.Any]

to_literal()

def to_literal(
    ctx: FlyteContext,
    python_val: typing.Any,
    python_type: Type[T],
    expected: LiteralType,
):

The current dance is because we allow users to call this synchronous to_literal function from an async function, and allow this to_literal function to then invoke yet another async function, namely an async transformer.

Parameter Type
ctx FlyteContext
python_val typing.Any
python_type Type[T]
expected LiteralType

to_literal_checks()

def to_literal_checks(
    python_val: typing.Any,
    python_type: Type[T],
    expected: LiteralType,
):
Parameter Type
python_val typing.Any
python_type Type[T]
expected LiteralType

to_literal_type()

def to_literal_type(
    python_type: Type[T],
):

Converts a python type into a flyte specific LiteralType

Parameter Type
python_type Type[T]

to_python_value()

def to_python_value(
    ctx: FlyteContext,
    lv: Literal,
    expected_python_type: Type,
):

Converts a Literal value with an expected python type into a python value.

Parameter Type
ctx FlyteContext
lv Literal
expected_python_type Type

unwrap_offloaded_literal()

def unwrap_offloaded_literal(
    ctx: FlyteContext,
    lv: Literal,
):
Parameter Type
ctx FlyteContext
lv Literal

flytekit.extend.TypeTransformer

Base transformer type that should be implemented for every python native type that can be handled by flytekit

def TypeTransformer(
    name: str,
    t: Type[T],
    enable_type_assertions: bool,
):
Parameter Type
name str
t Type[T]
enable_type_assertions bool
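
A minimal sketch of a custom transformer for a hypothetical MyPoint type, stored as a string literal "x,y" (error handling omitted); registering it with the TypeEngine makes MyPoint usable as a task input/output type.

.. code-block:: python

from typing import Type

from flytekit.core.context_manager import FlyteContext
from flytekit.extend import TypeEngine, TypeTransformer
from flytekit.models.literals import Literal, Primitive, Scalar
from flytekit.models.types import LiteralType, SimpleType


class MyPoint:
    def __init__(self, x: float, y: float):
        self.x = x
        self.y = y


class MyPointTransformer(TypeTransformer[MyPoint]):
    def __init__(self):
        super().__init__(name="my-point", t=MyPoint)

    def get_literal_type(self, t: Type[MyPoint]) -> LiteralType:
        # MyPoint is represented on the Flyte side as a plain string.
        return LiteralType(simple=SimpleType.STRING)

    def to_literal(
        self, ctx: FlyteContext, python_val: MyPoint, python_type: Type[MyPoint], expected: LiteralType
    ) -> Literal:
        return Literal(scalar=Scalar(primitive=Primitive(string_value=f"{python_val.x},{python_val.y}")))

    def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[MyPoint]) -> MyPoint:
        x, y = lv.scalar.primitive.string_value.split(",")
        return MyPoint(float(x), float(y))


TypeEngine.register(MyPointTransformer())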

Methods

Method Description
assert_type() None
from_binary_idl() This function primarily handles deserialization for untyped dicts, dataclasses, Pydantic BaseModels, and attribute access
from_generic_idl() TODO: Support all Flyte Types
get_literal_type() Converts the python type to a Flyte LiteralType
guess_python_type() Converts the Flyte LiteralType to a python object type
isinstance_generic() None
to_html() Converts any python val (dataframe, int, float) to a html string, and it will be wrapped in the HTML div
to_literal() Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type
to_python_value() Converts the given Literal to a Python Type

assert_type()

def assert_type(
    t: Type[T],
    v: T,
):
Parameter Type
t Type[T]
v T

from_binary_idl()

def from_binary_idl(
    binary_idl_object: Binary,
    expected_python_type: Type[T],
):

This function primarily handles deserialization for untyped dicts, dataclasses, Pydantic BaseModels, and attribute access.

For an untyped dict, dataclass, or pydantic BaseModel, the life cycle (untyped dict as an example) is: python val -> msgpack bytes -> binary literal scalar (via to_literal), then binary literal scalar -> msgpack bytes -> python val (via from_binary_idl).

For attribute access, the life cycle is: python val -> msgpack bytes -> binary literal scalar (via to_literal), then binary literal scalar -> resolved golang value -> binary literal scalar (via propeller attribute access), then binary literal scalar -> msgpack bytes -> python val (via from_binary_idl).

Parameter Type
binary_idl_object Binary
expected_python_type Type[T]

from_generic_idl()

def from_generic_idl(
    generic: Struct,
    expected_python_type: Type[T],
):

TODO: Support all Flyte Types. This is for dataclass attribute access from inputs created in the Flyte Console.

Note:

  • This can be removed in the future once the Flyte Console supports generating Binary IDL Scalars as input.
Parameter Type
generic Struct
expected_python_type Type[T]

get_literal_type()

def get_literal_type(
    t: Type[T],
):

Converts the python type to a Flyte LiteralType

Parameter Type
t Type[T]

guess_python_type()

def guess_python_type(
    literal_type: LiteralType,
):

Converts the Flyte LiteralType to a python object type.

Parameter Type
literal_type LiteralType

isinstance_generic()

def isinstance_generic(
    obj,
    generic_alias,
):
Parameter Type
obj
generic_alias

to_html()

def to_html(
    ctx: FlyteContext,
    python_val: T,
    expected_python_type: Type[T],
):

Converts any python value (dataframe, int, float) to an HTML string, which will be wrapped in an HTML div.

Parameter Type
ctx FlyteContext
python_val T
expected_python_type Type[T]

to_literal()

def to_literal(
    ctx: FlyteContext,
    python_val: T,
    python_type: Type[T],
    expected: LiteralType,
):

Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type. Implementers should refrain from using type(python_val) and should instead rely on the passed-in python_type. If these do not match (or are not allowed), the transformer implementer should raise an AssertionError, clearly stating the mismatch.

Parameter Type
ctx FlyteContext
python_val T
python_type Type[T]
expected LiteralType

to_python_value()

def to_python_value(
    ctx: FlyteContext,
    lv: Literal,
    expected_python_type: Type[T],
):

Converts the given Literal to a Python value of the expected Python type. If the conversion cannot be done, an AssertionError should be raised.

Parameter Type
ctx FlyteContext
lv Literal
expected_python_type Type[T]

Properties

Property Type Description
is_async
name
python_type
type_assertions_enabled