1.15.4.dev2+g3e3ce2426

flytekit.types.structured.structured_dataset

Directory

Classes

Class Description
ABC Helper class that provides a standard way to create an ABC using inheritance.
Annotated Add context-specific metadata to a type.
AsyncTypeTransformer Base transformer type that should be implemented for every python native type that can be handled by flytekit.
Binary None.
DataClassJSONMixin None.
FlyteContext This is an internal-facing context object, that most users will not have to deal with.
FlyteContextManager FlyteContextManager manages the execution context within Flytekit.
Generic Abstract base class for generic types.
Literal None.
LiteralType None.
Renderable Base class for protocol classes.
Scalar None.
SchemaType None.
SerializableType None.
Struct A ProtocolMessage.
StructuredDataset This is the user facing StructuredDataset class.
StructuredDatasetDecoder Helper class that provides a standard way to create an ABC using inheritance.
StructuredDatasetEncoder Helper class that provides a standard way to create an ABC using inheritance.
StructuredDatasetFormat str(object='') -> str.
StructuredDatasetMetadata None.
StructuredDatasetTransformerEngine Think of this transformer as a higher-level meta transformer that is used for all the dataframe types.
StructuredDatasetType None.
TypeEngine Core Extensible TypeEngine of Flytekit.

Errors

Exception Description
DuplicateHandlerError Inappropriate argument value (of correct type).

flytekit.types.structured.structured_dataset.ABC

Helper class that provides a standard way to create an ABC using inheritance.

flytekit.types.structured.structured_dataset.Annotated

Add context-specific metadata to a type.

Example: Annotated[int, runtime_check.Unsigned] indicates to the hypothetical runtime_check module that this type is an unsigned int. Every other consumer of this type can ignore this metadata and treat this type as int.

The first argument to Annotated must be a valid type.

Details:

  • It’s an error to call Annotated with fewer than two arguments.
  • Access the metadata via the __metadata__ attribute::

assert Annotated[int, '$'].__metadata__ == ('$',)

  • Nested Annotated types are flattened::

assert Annotated[Annotated[T, Ann1, Ann2], Ann3] == Annotated[T, Ann1, Ann2, Ann3]

  • Instantiating an annotated type is equivalent to instantiating the underlying type::

assert Annotated[C, Ann1](5) == C(5)

  • Annotated can be used as a generic type alias::

type Optimized[T] = Annotated[T, runtime.Optimize()]
# type checker will treat Optimized[int]
# as equivalent to Annotated[int, runtime.Optimize()]

type OptimizedList[T] = Annotated[list[T], runtime.Optimize()]
# type checker will treat OptimizedList[int]
# as equivalent to Annotated[list[int], runtime.Optimize()]

  • Annotated cannot be used with an unpacked TypeVarTuple::

type Variadic[*Ts] = Annotated[*Ts, Ann1] # NOT valid

This would be equivalent to::

Annotated[T1, T2, T3, …, Ann1]

where T1, T2 etc. are TypeVars, which would be invalid, because only one type should be passed to Annotated.
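
The metadata and flattening rules above can be checked with the standard library alone. The following is a minimal sketch; the metadata strings `"unsigned"` and `"non-zero"` and the function `scale` are illustrative names that do not appear in flytekit.

```python
from typing import Annotated, get_args, get_type_hints

# A type alias carrying one piece of metadata (the string is an arbitrary example).
UnsignedInt = Annotated[int, "unsigned"]

# Metadata is exposed through the __metadata__ attribute as a tuple.
metadata = UnsignedInt.__metadata__

# Nesting Annotated flattens into a single Annotated with all metadata in order.
nested = Annotated[UnsignedInt, "non-zero"]


def scale(x: UnsignedInt) -> int:
    """Consumers that do not care about the metadata simply see an int."""
    return x * 2


# By default, get_type_hints strips the metadata; include_extras=True keeps it.
plain_hints = get_type_hints(scale)
rich_hints = get_type_hints(scale, include_extras=True)
```

Tools that understand the metadata read it via `include_extras=True`, while every other consumer treats the parameter as a plain `int`.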

flytekit.types.structured.structured_dataset.AsyncTypeTransformer

Base transformer type that should be implemented for every python native type that can be handled by flytekit

def AsyncTypeTransformer(
    name: str,
    t: Type[T],
    enable_type_assertions: bool,
):
Parameter Type
name str
t Type[T]
enable_type_assertions bool

Methods

Method Description
assert_type() None
async_to_literal() Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type
async_to_python_value() Converts the given Literal to a Python Type
from_binary_idl() This function primarily handles deserialization for untyped dicts, dataclasses, Pydantic BaseModels, and attribute access
from_generic_idl() TODO: Support all Flyte Types
get_literal_type() Converts the python type to a Flyte LiteralType
guess_python_type() Converts the Flyte LiteralType to a python object type
isinstance_generic() None
to_html() Converts any python val (dataframe, int, float) to an HTML string, which is wrapped in an HTML div
to_literal() Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type
to_python_value() Converts the given Literal to a Python Type

assert_type()

def assert_type(
    t: Type[T],
    v: T,
):
Parameter Type
t Type[T]
v T

async_to_literal()

def async_to_literal(
    ctx: FlyteContext,
    python_val: T,
    python_type: Type[T],
    expected: LiteralType,
):

Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type. Implementers should refrain from using type(python_val) and instead rely on the passed-in python_type. If these do not match (or are not allowed), the Transformer implementer should raise an AssertionError that clearly states what the mismatch was.

Parameter Type
ctx FlyteContext
python_val T
python_type Type[T]
expected LiteralType
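
The guidance above (convert according to the declared python_type, and raise an AssertionError on a mismatch) can be sketched without flytekit. Everything in this block is hypothetical: `to_literal_sketch` is a stand-in function, and it returns a plain dict rather than a Flyte Literal.

```python
from typing import Any, Dict, Type


def to_literal_sketch(python_val: Any, python_type: Type) -> Dict[str, Any]:
    """Hypothetical stand-in for a transformer's conversion step.

    Returns a plain dict instead of a Flyte Literal so the example runs
    without flytekit installed.
    """
    if not isinstance(python_val, python_type):
        # State clearly what was expected and what was received.
        raise AssertionError(
            f"Expected a value of type {python_type.__name__}, "
            f"got {type(python_val).__name__}: {python_val!r}"
        )
    # Convert using the declared type, not type(python_val).
    return {"type": python_type.__name__, "value": python_type(python_val)}


# True is an instance of int, so the declared type decides the encoding.
encoded = to_literal_sketch(True, int)

try:
    to_literal_sketch("3", int)
    mismatch_message = ""
except AssertionError as exc:
    mismatch_message = str(exc)
```

Here `type(True)` is `bool`, but the value is encoded as an `int` because that is the declared type.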

async_to_python_value()

def async_to_python_value(
    ctx: FlyteContext,
    lv: Literal,
    expected_python_type: Type[T],
):

Converts the given Literal to a Python Type. If the conversion cannot be done an AssertionError should be raised

Parameter Type
ctx FlyteContext
lv Literal
expected_python_type Type[T]

from_binary_idl()

def from_binary_idl(
    binary_idl_object: Binary,
    expected_python_type: Type[T],
):

This function primarily handles deserialization for untyped dicts, dataclasses, Pydantic BaseModels, and attribute access.

For untyped dict, dataclass, and pydantic basemodel, the life cycle (untyped dict as example) is:

python val -> msgpack bytes -> binary literal scalar -> msgpack bytes -> python val

The first half is performed by to_literal and the second half by from_binary_idl.

For attribute access, the life cycle is:

python val -> msgpack bytes -> binary literal scalar -> resolved golang value -> binary literal scalar -> msgpack bytes -> python val

The first part is performed by to_literal, the resolution step by propeller attribute access, and the last part by from_binary_idl.

Parameter Type
binary_idl_object Binary
expected_python_type Type[T]
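
The round trip described above can be illustrated with a small sketch. This is not the flytekit implementation: it uses the standard-library `json` module as a stand-in for msgpack so that it runs without extra dependencies, and `BinarySketch`, `to_binary_sketch`, and `from_binary_sketch` are hypothetical names.

```python
import json
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class BinarySketch:
    """Hypothetical stand-in for a binary literal scalar: raw bytes plus a tag."""

    value: bytes
    tag: str


def to_binary_sketch(python_val: Any) -> BinarySketch:
    # python val -> serialized bytes -> binary scalar (the to_literal half).
    # json is used here only as a stand-in for msgpack.
    payload = json.dumps(python_val, sort_keys=True).encode("utf-8")
    return BinarySketch(value=payload, tag="json")


def from_binary_sketch(binary: BinarySketch) -> Any:
    # binary scalar -> serialized bytes -> python val (the from_binary_idl half).
    if binary.tag != "json":
        raise TypeError(f"Unsupported tag: {binary.tag!r}")
    return json.loads(binary.value.decode("utf-8"))


original = {"a": 1, "b": [1.5, "x"], "c": {"nested": True}}
scalar = to_binary_sketch(original)
round_tripped = from_binary_sketch(scalar)
```

The tag lets the receiving side decide how to decode the bytes, which is why an unknown tag is rejected rather than guessed at.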

from_generic_idl()

def from_generic_idl(
    generic: Struct,
    expected_python_type: Type[T],
):

TODO: Support all Flyte Types. This is for dataclass attribute access from input created from the Flyte Console.

Note:

  • This can be removed in the future when the Flyte Console supports generating a Binary IDL Scalar as input.
Parameter Type
generic Struct
expected_python_type Type[T]

get_literal_type()

def get_literal_type(
    t: Type[T],
):

Converts the python type to a Flyte LiteralType

Parameter Type
t Type[T]

guess_python_type()

def guess_python_type(
    literal_type: LiteralType,
):

Converts the Flyte LiteralType to a python object type.

Parameter Type
literal_type LiteralType

isinstance_generic()

def isinstance_generic(
    obj,
    generic_alias,
):
Parameter Type
obj
generic_alias

to_html()

def to_html(
    ctx: FlyteContext,
    python_val: T,
    expected_python_type: Type[T],
):

Converts any python val (dataframe, int, float) to an HTML string, which is wrapped in an HTML div.

Parameter Type
ctx FlyteContext
python_val T
expected_python_type Type[T]

to_literal()

def to_literal(
    ctx: FlyteContext,
    python_val: typing.Any,
    python_type: Type[T],
    expected: LiteralType,
):

Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type. Implementers should refrain from using type(python_val) and instead rely on the passed-in python_type. If these do not match (or are not allowed), the Transformer implementer should raise an AssertionError that clearly states what the mismatch was.

Parameter Type
ctx FlyteContext
python_val typing.Any
python_type Type[T]
expected LiteralType

to_python_value()

def to_python_value(
    ctx: FlyteContext,
    lv: Literal,
    expected_python_type: Type[T],
):

Converts the given Literal to a Python Type. If the conversion cannot be done an AssertionError should be raised

Parameter Type
ctx FlyteContext
lv Literal
expected_python_type Type[T]

Properties

Property Type Description
is_async
name
python_type
type_assertions_enabled

flytekit.types.structured.structured_dataset.Binary

def Binary(
    value,
    tag,
):
Parameter Type
value
tag

Methods

Method Description
from_flyte_idl()
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    pb2_object,
):
Parameter Type
pb2_object

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
is_empty
tag
value

flytekit.types.structured.structured_dataset.DataClassJSONMixin

Methods

Method Description
from_dict() None
from_json() None
to_dict() None
to_json() None

from_dict()

def from_dict(
    d,
    dialect,
):
Parameter Type
d
dialect

from_json()

def from_json(
    data: typing.Union[str, bytes, bytearray],
    decoder: collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]],
    from_dict_kwargs: typing.Any,
):
Parameter Type
data typing.Union[str, bytes, bytearray]
decoder collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]]
from_dict_kwargs typing.Any

to_dict()

def to_dict()

to_json()

def to_json(
    encoder: collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]],
    to_dict_kwargs: typing.Any,
):
Parameter Type
encoder collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]]
to_dict_kwargs typing.Any

flytekit.types.structured.structured_dataset.DuplicateHandlerError

Inappropriate argument value (of correct type).

flytekit.types.structured.structured_dataset.FlyteContext

This is an internal-facing context object, that most users will not have to deal with. It’s essentially a globally available grab bag of settings and objects that allows flytekit to do things like convert complex types, run and compile workflows, serialize Flyte entities, etc.

Even though this object has a current_context function on it, it should not be called directly. Please use the flytekit.FlyteContextManager object instead.

Please do not confuse this object with the flytekit.ExecutionParameters object.

def FlyteContext(
    file_access: FileAccessProvider,
    level: int,
    flyte_client: Optional['friendly_client.SynchronousFlyteClient'],
    compilation_state: Optional[CompilationState],
    execution_state: Optional[ExecutionState],
    serialization_settings: Optional[SerializationSettings],
    in_a_condition: bool,
    origin_stackframe: Optional[traceback.FrameSummary],
    output_metadata_tracker: Optional[OutputMetadataTracker],
    worker_queue: Optional[Controller],
):
Parameter Type
file_access FileAccessProvider
level int
flyte_client Optional['friendly_client.SynchronousFlyteClient']
compilation_state Optional[CompilationState]
execution_state Optional[ExecutionState]
serialization_settings Optional[SerializationSettings]
in_a_condition bool
origin_stackframe Optional[traceback.FrameSummary]
output_metadata_tracker Optional[OutputMetadataTracker]
worker_queue Optional[Controller]

Methods

Method Description
current_context() This method exists only to maintain backwards compatibility
enter_conditional_section() None
get_deck() Returns the deck that was created as part of the last execution
get_origin_stackframe_repr() None
new_builder() None
new_compilation_state() Creates and returns a default compilation state
new_execution_state() Creates and returns a new default execution state
set_stackframe() None
with_client() None
with_compilation_state() None
with_execution_state() None
with_file_access() None
with_new_compilation_state() None
with_output_metadata_tracker() None
with_serialization_settings() None
with_worker_queue() None

current_context()

def current_context()

This method exists only to maintain backwards compatibility. Please use FlyteContextManager.current_context() instead.

Users of flytekit should be careful not to confuse the object returned from this function with flytekit.current_context.

enter_conditional_section()

def enter_conditional_section()

get_deck()

def get_deck()

Returns the deck that was created as part of the last execution.

The return value depends on the execution environment. In a notebook, the return value is compatible with IPython.display and should be rendered in the notebook.

.. code-block:: python

    with flytekit.new_context() as ctx:
        my_task(...)
    ctx.get_deck()

OR if you wish to explicitly display

.. code-block:: python

    from IPython.display import display
    display(ctx.get_deck())

get_origin_stackframe_repr()

def get_origin_stackframe_repr()

new_builder()

def new_builder()

new_compilation_state()

def new_compilation_state(
    prefix: str,
):

Creates and returns a default compilation state. For most of the code this should be the entrypoint of compilation; otherwise the code should always use with_compilation_state.

Parameter Type
prefix str

new_execution_state()

def new_execution_state(
    working_dir: Optional[os.PathLike],
):

Creates and returns a new default execution state. This should be used at the entrypoint of execution; in all other cases it is preferable to use with_execution_state.

Parameter Type
working_dir Optional[os.PathLike]

set_stackframe()

def set_stackframe(
    s: traceback.FrameSummary,
):
Parameter Type
s traceback.FrameSummary

with_client()

def with_client(
    c: SynchronousFlyteClient,
):
Parameter Type
c SynchronousFlyteClient

with_compilation_state()

def with_compilation_state(
    c: CompilationState,
):
Parameter Type
c CompilationState

with_execution_state()

def with_execution_state(
    es: ExecutionState,
):
Parameter Type
es ExecutionState

with_file_access()

def with_file_access(
    fa: FileAccessProvider,
):
Parameter Type
fa FileAccessProvider

with_new_compilation_state()

def with_new_compilation_state()

with_output_metadata_tracker()

def with_output_metadata_tracker(
    t: OutputMetadataTracker,
):
Parameter Type
t OutputMetadataTracker

with_serialization_settings()

def with_serialization_settings(
    ss: SerializationSettings,
):
Parameter Type
ss SerializationSettings

with_worker_queue()

def with_worker_queue(
    wq: Controller,
):
Parameter Type
wq Controller

Properties

Property Type Description
user_space_params

flytekit.types.structured.structured_dataset.FlyteContextManager

FlyteContextManager manages the execution context within Flytekit. It holds the global state of either compilation or execution. It is not thread-safe and can currently only be run as a single-threaded application. Contexts within Flytekit are useful for managing compilation state and execution state. Refer to CompilationState and ExecutionState for more information. FlyteContextManager provides a singleton stack to manage these contexts.

Typical usage is

.. code-block:: python

    FlyteContextManager.initialize()
    with FlyteContextManager.with_context(o) as ctx:
        pass

If required (not recommended), you can use

FlyteContextManager.push_context()

but a corresponding pop_context should then be called

FlyteContextManager.pop_context()
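
The singleton-stack behaviour described above can be modelled in a few lines. This is a simplified sketch, not flytekit code: `ContextStackSketch` is a hypothetical class, and plain strings stand in for context objects. It shows why `with_context` is preferred over manual push and pop: the pop is guaranteed even if the body raises.

```python
from contextlib import contextmanager
from typing import Any, Iterator, List


class ContextStackSketch:
    """Hypothetical, simplified model of a singleton context stack (not thread-safe)."""

    _stack: List[Any] = []

    @classmethod
    def initialize(cls) -> None:
        # Erase everything and start again from a single root context.
        cls._stack = ["root"]

    @classmethod
    def current_context(cls) -> Any:
        return cls._stack[-1]

    @classmethod
    def push_context(cls, ctx: Any) -> Any:
        cls._stack.append(ctx)
        return ctx

    @classmethod
    def pop_context(cls) -> Any:
        return cls._stack.pop()

    @classmethod
    def size(cls) -> int:
        return len(cls._stack)

    @classmethod
    @contextmanager
    def with_context(cls, ctx: Any) -> Iterator[Any]:
        # Push on entry and always pop on exit, even if the body raises.
        cls.push_context(ctx)
        try:
            yield ctx
        finally:
            cls.pop_context()


ContextStackSketch.initialize()
with ContextStackSketch.with_context("compilation") as ctx:
    inside = ContextStackSketch.current_context()
after = ContextStackSketch.current_context()
```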

Methods

Method Description
add_signal_handler() None
current_context() None
get_origin_stackframe() None
initialize() Re-initializes the context and erases the entire context
pop_context() None
push_context() None
size() None
with_context() None

add_signal_handler()

def add_signal_handler(
    handler: typing.Callable[[int, FrameType], typing.Any],
):
Parameter Type
handler typing.Callable[[int, FrameType], typing.Any]

current_context()

def current_context()

get_origin_stackframe()

def get_origin_stackframe(
    limit,
):
Parameter Type
limit

initialize()

def initialize()

Re-initializes the context and erases the entire context

pop_context()

def pop_context()

push_context()

def push_context(
    ctx: FlyteContext,
    f: Optional[traceback.FrameSummary],
):
Parameter Type
ctx FlyteContext
f Optional[traceback.FrameSummary]

size()

def size()

with_context()

def with_context(
    b: FlyteContext.Builder,
):
Parameter Type
b FlyteContext.Builder

flytekit.types.structured.structured_dataset.Generic

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class’s name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default
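
For Python versions older than 3.12, the same idea requires inheriting from Generic explicitly. The following is a minimal sketch of that older spelling; `SimpleMapping` is an illustrative class name, not part of flytekit.

```python
from typing import Dict, Generic, TypeVar

KT = TypeVar("KT")
VT = TypeVar("VT")


class SimpleMapping(Generic[KT, VT]):
    """Explicitly generic class, as required before Python 3.12."""

    def __init__(self) -> None:
        self._data: Dict[KT, VT] = {}

    def __setitem__(self, key: KT, value: VT) -> None:
        self._data[key] = value

    def __getitem__(self, key: KT) -> VT:
        return self._data[key]


def lookup_name(mapping: SimpleMapping[KT, VT], key: KT, default: VT) -> VT:
    # Fall back to the default when the key is missing.
    try:
        return mapping[key]
    except KeyError:
        return default


names: SimpleMapping[int, str] = SimpleMapping()
names[1] = "alice"
found = lookup_name(names, 1, "unknown")
missing = lookup_name(names, 2, "unknown")
```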

flytekit.types.structured.structured_dataset.Literal

def Literal(
    scalar: typing.Optional[flytekit.models.literals.Scalar],
    collection: typing.Optional[flytekit.models.literals.LiteralCollection],
    map: typing.Optional[flytekit.models.literals.LiteralMap],
    hash: typing.Optional[str],
    metadata: typing.Optional[typing.Dict[str, str]],
    offloaded_metadata: typing.Optional[flytekit.models.literals.LiteralOffloadedMetadata],
):

This IDL message represents a literal value in the Flyte ecosystem.

Parameter Type
scalar typing.Optional[flytekit.models.literals.Scalar]
collection typing.Optional[flytekit.models.literals.LiteralCollection]
map typing.Optional[flytekit.models.literals.LiteralMap]
hash typing.Optional[str]
metadata typing.Optional[typing.Dict[str, str]]
offloaded_metadata typing.Optional[flytekit.models.literals.LiteralOffloadedMetadata]

Methods

Method Description
from_flyte_idl()
serialize_to_string() None
set_metadata() Note: This is a mutation on the literal
short_string()
to_flyte_idl()
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    pb2_object: flyteidl.core.literals_pb2.Literal,
):
Parameter Type
pb2_object flyteidl.core.literals_pb2.Literal

serialize_to_string()

def serialize_to_string()

set_metadata()

def set_metadata(
    metadata: typing.Dict[str, str],
):

Note: This is a mutation on the literal

Parameter Type
metadata typing.Dict[str, str]

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
collection
hash
is_empty
map
metadata
offloaded_metadata
scalar
value

flytekit.types.structured.structured_dataset.LiteralType

def LiteralType(
    simple,
    schema,
    collection_type,
    map_value_type,
    blob,
    enum_type,
    union_type,
    structured_dataset_type,
    metadata,
    structure,
    annotation,
):

This is a oneof message, only one of the kwargs may be set, representing one of the Flyte types.

Parameter Type
simple
schema
collection_type
map_value_type
blob
enum_type
union_type
structured_dataset_type
metadata
structure
annotation

Methods

Method Description
from_flyte_idl()
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    proto,
):
Parameter Type
proto

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
annotation
blob
collection_type
enum_type
is_empty
map_value_type
metadata
schema
simple
structure
structured_dataset_type
union_type

flytekit.types.structured.structured_dataset.Renderable

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int: return x.meth()

func(C()) # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

class GenProto[T](Protocol):
    def meth(self) -> T:
        ...

def Renderable(
    args,
    kwargs,
):
Parameter Type
args *args
kwargs **kwargs

Methods

Method Description
to_html() Convert an object (markdown, pandas.dataframe) to HTML and return the HTML as a unicode string

to_html()

def to_html(
    python_value: typing.Any,
):

Convert an object (markdown, pandas.dataframe) to HTML and return the HTML as a unicode string.

Returns: An HTML document as a string.

Parameter Type
python_value typing.Any
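
Because Renderable is a protocol, any object with a matching to_html method satisfies it structurally, without inheriting from it. The sketch below illustrates this with the standard library only; `RenderableSketch` and `PreformattedRenderer` are hypothetical names, and marking the protocol with `runtime_checkable` is an assumption made for the demonstration.

```python
import html
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class RenderableSketch(Protocol):
    """Hypothetical protocol mirroring the to_html(python_value) shape."""

    def to_html(self, python_value: Any) -> str:
        ...


class PreformattedRenderer:
    """Does not inherit from the protocol; it only provides a matching method."""

    def to_html(self, python_value: Any) -> str:
        # Escape the text so it is safe to embed in an HTML document.
        return f"<pre>{html.escape(str(python_value))}</pre>"


renderer = PreformattedRenderer()
rendered = renderer.to_html("a < b")
matches_protocol = isinstance(renderer, RenderableSketch)
```

The runtime check only verifies that a `to_html` attribute exists; it does not validate the signature, in line with the note on runtime protocols above.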

flytekit.types.structured.structured_dataset.Scalar

def Scalar(
    primitive: typing.Optional[flytekit.models.literals.Primitive],
    blob: typing.Optional[flytekit.models.literals.Blob],
    binary: typing.Optional[flytekit.models.literals.Binary],
    schema: typing.Optional[flytekit.models.literals.Schema],
    union: typing.Optional[flytekit.models.literals.Union],
    none_type: typing.Optional[flytekit.models.literals.Void],
    error: typing.Optional[flytekit.models.types.Error],
    generic: typing.Optional[google.protobuf.struct_pb2.Struct],
    structured_dataset: typing.Optional[flytekit.models.literals.StructuredDataset],
):

Scalar wrapper around Flyte types. Only one can be specified.

Parameter Type
primitive typing.Optional[flytekit.models.literals.Primitive]
blob typing.Optional[flytekit.models.literals.Blob]
binary typing.Optional[flytekit.models.literals.Binary]
schema typing.Optional[flytekit.models.literals.Schema]
union typing.Optional[flytekit.models.literals.Union]
none_type typing.Optional[flytekit.models.literals.Void]
error typing.Optional[flytekit.models.types.Error]
generic typing.Optional[google.protobuf.struct_pb2.Struct]
structured_dataset typing.Optional[flytekit.models.literals.StructuredDataset]
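
The "only one can be specified" rule is a oneof constraint. A minimal sketch of such a wrapper follows; `ScalarSketch` is hypothetical, carries only three simplified fields, and enforces the rule at construction time, which is an assumption made for illustration rather than a statement about the real class.

```python
from dataclasses import dataclass, fields
from typing import Any, Optional


@dataclass(frozen=True)
class ScalarSketch:
    """Hypothetical oneof-style wrapper: at most one field may be set."""

    primitive: Optional[int] = None
    blob: Optional[bytes] = None
    binary: Optional[bytes] = None

    def __post_init__(self) -> None:
        set_fields = [f.name for f in fields(self) if getattr(self, f.name) is not None]
        if len(set_fields) > 1:
            raise ValueError(f"Only one field may be set, got: {set_fields}")

    @property
    def value(self) -> Any:
        # Return whichever field is set, or None if the scalar is empty.
        for f in fields(self):
            candidate = getattr(self, f.name)
            if candidate is not None:
                return candidate
        return None


single = ScalarSketch(primitive=7)

try:
    ScalarSketch(primitive=7, blob=b"x")
    conflict_detected = False
except ValueError:
    conflict_detected = True
```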

Methods

Method Description
from_flyte_idl()
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    pb2_object,
):
Parameter Type
pb2_object

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
binary
blob
error
generic
is_empty
none_type
primitive
schema
structured_dataset
union
value

flytekit.types.structured.structured_dataset.SchemaType

def SchemaType(
    columns,
):
Parameter Type
columns

Methods

Method Description
from_flyte_idl()
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    proto,
):
Parameter Type
proto

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
columns
is_empty

flytekit.types.structured.structured_dataset.SerializableType

flytekit.types.structured.structured_dataset.Struct

A ProtocolMessage

Methods

Method Description
get_or_create_list() Returns a list for this key, creating if it didn’t exist already
get_or_create_struct() Returns a struct for this key, creating if it didn’t exist already
items() None
keys() None
update() None
values() None

get_or_create_list()

def get_or_create_list(
    key,
):

Returns a list for this key, creating if it didn’t exist already.

Parameter Type
key

get_or_create_struct()

def get_or_create_struct(
    key,
):

Returns a struct for this key, creating if it didn’t exist already.

Parameter Type
key

items()

def items()

keys()

def keys()

update()

def update(
    dictionary,
):
Parameter Type
dictionary

values()

def values()

flytekit.types.structured.structured_dataset.StructuredDataset

This is the user facing StructuredDataset class. Please don’t confuse it with the literals.StructuredDataset class (that is just a model, a Python class representation of the protobuf).

def StructuredDataset(
    dataframe: typing.Optional[typing.Any],
    uri: typing.Optional[str],
    metadata: typing.Optional[literals.StructuredDatasetMetadata],
    kwargs,
):
Parameter Type
dataframe typing.Optional[typing.Any]
uri typing.Optional[str]
metadata typing.Optional[literals.StructuredDatasetMetadata]
kwargs **kwargs

Methods

Method Description
all() None
column_names() None
columns() None
deserialize_structured_dataset() None
from_dict() None
from_json() None
iter() None
open() None
serialize_structured_dataset() None
set_literal() A public wrapper method to set the StructuredDataset Literal
to_dict() None
to_json() None

all()

def all()

column_names()

def column_names()

columns()

def columns()

deserialize_structured_dataset()

def deserialize_structured_dataset(
    args,
    kwargs,
):
Parameter Type
args *args
kwargs **kwargs

from_dict()

def from_dict(
    d,
    dialect,
):
Parameter Type
d
dialect

from_json()

def from_json(
    data: typing.Union[str, bytes, bytearray],
    decoder: collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]],
    from_dict_kwargs: typing.Any,
):
Parameter Type
data typing.Union[str, bytes, bytearray]
decoder collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]]
from_dict_kwargs typing.Any

iter()

def iter()

open()

def open(
    dataframe_type: Type[DF],
):
Parameter Type
dataframe_type Type[DF]

serialize_structured_dataset()

def serialize_structured_dataset(
    args,
    kwargs,
):
Parameter Type
args *args
kwargs **kwargs

set_literal()

def set_literal(
    ctx: FlyteContext,
    expected: LiteralType,
):

A public wrapper method to set the StructuredDataset Literal.

This method provides external access to the internal _set_literal method.

Parameter Type
ctx FlyteContext
expected LiteralType

to_dict()

def to_dict()

to_json()

def to_json(
    encoder: collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]],
    to_dict_kwargs: typing.Any,
):
Parameter Type
encoder collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]]
to_dict_kwargs typing.Any

Properties

Property Type Description
dataframe
literal
metadata

flytekit.types.structured.structured_dataset.StructuredDatasetDecoder

Helper class that provides a standard way to create an ABC using inheritance.

def StructuredDatasetDecoder(
    python_type: Type[DF],
    protocol: Optional[str],
    supported_format: Optional[str],
    additional_protocols: Optional[List[str]],
):

Extend this abstract class, implement the decode function, and register your concrete class with the StructuredDatasetTransformerEngine class in order for the core flytekit type engine to handle dataframe libraries. This is the decoder interface, meaning it is used when there is a Flyte Literal value, and we have to get a Python value out of it. For the other way, see the StructuredDatasetEncoder

Parameter Type
python_type Type[DF]
protocol Optional[str]
supported_format Optional[str]
additional_protocols Optional[List[str]]

Methods

Method Description
decode() This is code that will be called by the dataset transformer engine to ultimately translate from a Flyte Literal value into a Python instance

decode()

def decode(
    ctx: FlyteContext,
    flyte_value: literals.StructuredDataset,
    current_task_metadata: StructuredDatasetMetadata,
):

This is code that will be called by the dataset transformer engine to ultimately translate from a Flyte Literal value into a Python instance.

Parameter Type
ctx FlyteContext
flyte_value literals.StructuredDataset
current_task_metadata StructuredDatasetMetadata

Properties

Property Type Description
protocol
python_type
supported_format

flytekit.types.structured.structured_dataset.StructuredDatasetEncoder

Helper class that provides a standard way to create an ABC using inheritance.

def StructuredDatasetEncoder(
    python_type: Type[T],
    protocol: Optional[str],
    supported_format: Optional[str],
):

Extend this abstract class, implement the encode function, and register your concrete class with the StructuredDatasetTransformerEngine class in order for the core flytekit type engine to handle dataframe libraries. This is the encoding interface, meaning it is used when there is a Python value that the flytekit type engine is trying to convert into a Flyte Literal. For the other way, see the StructuredDatasetDecoder.

Parameter Type
python_type Type[T]
protocol Optional[str]
supported_format Optional[str]

Methods

Method Description
encode() Even if the user code returns a plain dataframe instance, the dataset transformer engine will wrap the incoming dataframe with defaults set for that dataframe type

encode()

def encode(
    ctx: FlyteContext,
    structured_dataset: StructuredDataset,
    structured_dataset_type: StructuredDatasetType,
):

Even if the user code returns a plain dataframe instance, the dataset transformer engine will wrap the incoming dataframe with defaults set for that dataframe type. This simplifies this function’s interface as a lot of data that could be specified by the user using the

TODO: Do we need to add a flag to indicate if it was wrapped by the transformer or by the user?

Parameter Type
ctx FlyteContext
structured_dataset StructuredDataset
structured_dataset_type StructuredDatasetType

Properties

Property Type Description
protocol
python_type
supported_format
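
The extend, implement, and register workflow described for encoders and decoders can be sketched with plain Python. None of the names below are flytekit APIs: `EncoderSketch`, `ListToCsvEncoder`, `register`, `get_encoder`, and `DuplicateHandlerSketchError` are hypothetical, and the registry key of (python type, protocol, format) is an assumption based on the constructor parameters listed above.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Tuple, Type


class DuplicateHandlerSketchError(ValueError):
    """Hypothetical analogue of a duplicate-handler error."""


class EncoderSketch(ABC):
    """Hypothetical abstract encoder keyed by python type, protocol and format."""

    def __init__(self, python_type: Type, protocol: str, supported_format: str) -> None:
        self.python_type = python_type
        self.protocol = protocol
        self.supported_format = supported_format

    @abstractmethod
    def encode(self, dataframe: Any) -> bytes:
        """Turn a Python value into serialized bytes."""


class ListToCsvEncoder(EncoderSketch):
    """Concrete encoder treating a list of rows as a tiny dataframe."""

    def __init__(self) -> None:
        super().__init__(list, "file", "csv")

    def encode(self, dataframe: Any) -> bytes:
        lines = (",".join(str(cell) for cell in row) for row in dataframe)
        return "\n".join(lines).encode("utf-8")


_REGISTRY: Dict[Tuple[Type, str, str], EncoderSketch] = {}


def register(handler: EncoderSketch, override: bool = False) -> None:
    key = (handler.python_type, handler.protocol, handler.supported_format)
    if key in _REGISTRY and not override:
        raise DuplicateHandlerSketchError(f"Already registered: {key}")
    _REGISTRY[key] = handler


def get_encoder(python_type: Type, protocol: str, supported_format: str) -> EncoderSketch:
    return _REGISTRY[(python_type, protocol, supported_format)]


register(ListToCsvEncoder())
encoded = get_encoder(list, "file", "csv").encode([[1, 2], [3, 4]])
```

Keeping the abstract base separate from the registry means a new dataframe library only has to supply one subclass and one `register` call.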

flytekit.types.structured.structured_dataset.StructuredDatasetFormat

str(object='') -> str
str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.

flytekit.types.structured.structured_dataset.StructuredDatasetMetadata

def StructuredDatasetMetadata(
    structured_dataset_type: typing.Optional[flytekit.models.types.StructuredDatasetType],
):
Parameter Type
structured_dataset_type typing.Optional[flytekit.models.types.StructuredDatasetType]

Methods

Method Description
from_flyte_idl() None
serialize_to_string() None
short_string()
to_flyte_idl() None
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    pb2_object: flyteidl.core.literals_pb2.StructuredDatasetMetadata,
):
Parameter Type
pb2_object flyteidl.core.literals_pb2.StructuredDatasetMetadata

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
is_empty
structured_dataset_type

flytekit.types.structured.structured_dataset.StructuredDatasetTransformerEngine

Think of this transformer as a higher-level meta transformer that is used for all the dataframe types. If you are bringing a custom data frame type, or any data frame type, to flytekit, you should register with this transformer instead of registering with the main type engine.

def StructuredDatasetTransformerEngine()

Methods

Method Description
assert_type() None
async_to_literal() Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type
async_to_python_value() The only tricky thing with converting a Literal (say the output of an earlier task), to a Python value at the start of a task execution, is the column subsetting behavior
dict_to_structured_dataset() None
encode() None
from_binary_idl() If the input is from flytekit, the Life Cycle will be as follows:
from_generic_idl() If the input is from Flyte Console, the Life Cycle will be as follows:
get_decoder() None
get_encoder() None
get_literal_type() Provide a concrete implementation so that writers of custom dataframe handlers since there’s nothing that
guess_python_type() Converts the Flyte LiteralType to a python object type
isinstance_generic() None
iter_as() None
open_as()
register() Call this with any Encoder or Decoder to register it with the flytekit type system
register_for_protocol() See the main register function instead
register_renderer() None
to_html() Converts any python val (dataframe, int, float) to an HTML string, which is wrapped in an HTML div
to_literal() Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type
to_python_value() Converts the given Literal to a Python Type

assert_type()

def assert_type(
    t: Type[StructuredDataset],
    v: typing.Any,
):
Parameter Type
t Type[StructuredDataset]
v typing.Any

async_to_literal()

def async_to_literal(
    ctx: FlyteContext,
    python_val: Union[StructuredDataset, typing.Any],
    python_type: Union[Type[StructuredDataset], Type],
    expected: LiteralType,
):

Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type. Implementers should not use type(python_val); rely on the passed-in python_type instead. If the two do not match (or the combination is not allowed), the transformer implementer should raise an AssertionError that clearly states the mismatch.

Parameter Type
ctx FlyteContext
python_val Union[StructuredDataset, typing.Any]
python_type Union[Type[StructuredDataset], Type]
expected LiteralType
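
The rule above (dispatch on the declared python_type, raise AssertionError on a mismatch) can be sketched as follows. This is a minimal illustration, not flytekit code: ToyIntTransformer and the dictionary it returns are hypothetical stand-ins for a real transformer and a real Literal.

```python
from typing import Any, Type


class ToyIntTransformer:
    """Hypothetical stand-in for a type transformer; not a flytekit class."""

    def to_literal(self, python_val: Any, python_type: Type) -> dict:
        # Dispatch on the declared python_type, not on type(python_val).
        if python_type is not int:
            raise AssertionError(f"expected declared type int, got {python_type!r}")
        # bool is a subclass of int, so reject it explicitly.
        if not isinstance(python_val, int) or isinstance(python_val, bool):
            raise AssertionError(
                f"value {python_val!r} of type {type(python_val).__name__} "
                "does not match the declared type int"
            )
        # A plain dict stands in for a Flyte Literal in this sketch.
        return {"scalar": {"integer": python_val}}
```

The error message names both the value and the declared type, so the caller can see the mismatch without a debugger.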

async_to_python_value()

def async_to_python_value(
    ctx: FlyteContext,
    lv: Literal,
    expected_python_type: Type[T] | StructuredDataset,
):

The only tricky thing with converting a Literal (say the output of an earlier task), to a Python value at the start of a task execution, is the column subsetting behavior. For example, if you have,

def t1() -> Annotated[StructuredDataset, kwtypes(col_a=int, col_b=float)]: …
def t2(in_a: Annotated[StructuredDataset, kwtypes(col_b=float)]): …

where t2(in_a=t1()), when t2 does in_a.open(pd.DataFrame).all(), it should get a DataFrame with only one column.

The StructuredDatasetType passed to the decoder depends on the StructuredDatasetType of the currently running task and on the StructuredDatasetType of the incoming Literal:

  • Running task has columns defined (incoming Literal has columns defined, or has [] columns or None): the StructuredDatasetType passed to the decoder will have the columns as defined by the type annotation of the currently running task. Decoders should then subset the incoming data to the columns requested.
  • Running task has [] columns or None, incoming Literal has columns defined: the StructuredDatasetType passed to the decoder will have the columns from the incoming Literal. This is the scenario where the Literal returned by the running task will have more information than the running task’s signature.
  • Running task has [] columns or None, incoming Literal has [] columns or None: the StructuredDatasetType passed to the decoder will have an empty list of columns.

Parameter Type
ctx FlyteContext
lv Literal
expected_python_type Type[T] | StructuredDataset
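
The column subsetting rules described for async_to_python_value() reduce to a small decision function. The sketch below models columns as plain lists of names; columns_for_decoder is a hypothetical helper written for illustration and is not part of flytekit.

```python
from typing import List, Optional


def columns_for_decoder(
    task_columns: Optional[List[str]],
    literal_columns: Optional[List[str]],
) -> List[str]:
    """Pick the columns that would be handed to the decoder."""
    # Columns declared by the currently running task take precedence.
    if task_columns:
        return list(task_columns)
    # Otherwise fall back to the columns carried by the incoming Literal.
    if literal_columns:
        return list(literal_columns)
    # Neither side declares columns: the decoder sees an empty list.
    return []
```

With the t1/t2 example above, the task declares only col_b while the Literal carries col_a and col_b, so this helper returns only col_b.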

dict_to_structured_dataset()

def dict_to_structured_dataset(
    dict_obj: typing.Dict[str, str],
    expected_python_type: Type[T] | StructuredDataset,
):
Parameter Type
dict_obj typing.Dict[str, str]
expected_python_type Type[T] | StructuredDataset

encode()

def encode(
    ctx: FlyteContext,
    sd: StructuredDataset,
    df_type: Type,
    protocol: str,
    format: str,
    structured_literal_type: StructuredDatasetType,
):
Parameter Type
ctx FlyteContext
sd StructuredDataset
df_type Type
protocol str
format str
structured_literal_type StructuredDatasetType

from_binary_idl()

def from_binary_idl(
    binary_idl_object: Binary,
    expected_python_type: Type[T] | StructuredDataset,
):

If the input is from flytekit, the Life Cycle will be as follows:

Life Cycle: binary IDL (flytekit customized serialization) -> resolved binary (propeller processing) -> bytes (flytekit binary IDL) -> expected Python object (flytekit customized deserialization)

Example Code:

@dataclass
class DC:
    sd: StructuredDataset

@workflow
def wf(dc: DC):
    t_sd(dc.sd)

Note:

  • Deserialization works the same way as when a structured dataset is placed in a dataclass: the value is deserialized through mashumaro’s API.

Related PR:

Parameter Type
binary_idl_object Binary
expected_python_type Type[T] | StructuredDataset

from_generic_idl()

def from_generic_idl(
    generic: Struct,
    expected_python_type: Type[T] | StructuredDataset,
):

If the input is from Flyte Console, the Life Cycle will be as follows:

Life Cycle: json str (console user input) -> protobuf struct (console output) -> resolved protobuf struct (propeller) -> expected Python object (flytekit customized deserialization)

Example Code:

@dataclass
class DC:
    sd: StructuredDataset

@workflow
def wf(dc: DC):
    t_sd(dc.sd)

Note:

  • Deserialization works the same way as when a structured dataset is placed in a dataclass: the value is deserialized through mashumaro’s API.

Related PR:

Parameter Type
generic Struct
expected_python_type Type[T] | StructuredDataset
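
The shape of the life cycle for from_generic_idl() (JSON text, then a generic key/value structure, then a typed Python object) can be illustrated with the standard library alone. This is only a simplified model: ToyDataset, its fields, and from_json_str are hypothetical, a plain dict stands in for the protobuf struct, and the real path goes through propeller and mashumaro.

```python
import json
from dataclasses import dataclass


@dataclass
class ToyDataset:
    """Hypothetical stand-in for StructuredDataset; holds only a URI and a format."""

    uri: str
    file_format: str = ""


def from_json_str(payload: str) -> ToyDataset:
    # json str -> plain dict (the dict stands in for the protobuf struct).
    fields = json.loads(payload)
    # generic structure -> expected Python object.
    return ToyDataset(uri=fields["uri"], file_format=fields.get("file_format", ""))
```

A missing optional field falls back to its default, which mirrors how a partially filled console input would still produce a usable object in this toy model.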

get_decoder()

def get_decoder(
    df_type: Type,
    protocol: str,
    format: str,
):
Parameter Type
df_type Type
protocol str
format str

get_encoder()

def get_encoder(
    df_type: Type,
    protocol: str,
    format: str,
):
Parameter Type
df_type Type
protocol str
format str

get_literal_type()

def get_literal_type(
    t: typing.Union[Type[StructuredDataset], typing.Any],
):

Provides a concrete implementation so that writers of custom dataframe handlers do not need to supply one, since there is nothing special about the literal type. Any dataframe type will always be associated with the structured dataset type. Its other aspects (columns, external schema type, etc.) can be read from the associated metadata.

Parameter Type
t typing.Union[Type[StructuredDataset], typing.Any]
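
Reading column information from the metadata attached to an Annotated type can be done with the standard typing helpers. The sketch below is an illustration under assumptions: ToyFrame and split_annotation are hypothetical names, and the column metadata is modeled as a dict of column name to type.

```python
from collections import OrderedDict
from typing import Annotated, get_args, get_origin


class ToyFrame:
    """Hypothetical dataframe type used only for illustration."""


def split_annotation(t):
    """Return (base_type, columns) for a plain or Annotated type."""
    if get_origin(t) is Annotated:
        # get_args yields the base type followed by the metadata items.
        base, *metadata = get_args(t)
        for item in metadata:
            if isinstance(item, dict):
                return base, dict(item)
        return base, {}
    # A bare type carries no column metadata.
    return t, {}
```

Whether or not columns are present, the base type is the same, which is why the literal type itself needs no per-dataframe customization.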

guess_python_type()

def guess_python_type(
    literal_type: LiteralType,
):

Converts the Flyte LiteralType to a python object type.

Parameter Type
literal_type LiteralType

isinstance_generic()

def isinstance_generic(
    obj,
    generic_alias,
):
Parameter Type
obj
generic_alias

iter_as()

def iter_as(
    ctx: FlyteContext,
    sd: literals.StructuredDataset,
    df_type: Type[DF],
    updated_metadata: StructuredDatasetMetadata,
):
Parameter Type
ctx FlyteContext
sd literals.StructuredDataset
df_type Type[DF]
updated_metadata StructuredDatasetMetadata

open_as()

def open_as(
    ctx: FlyteContext,
    sd: literals.StructuredDataset,
    df_type: Type[DF],
    updated_metadata: StructuredDatasetMetadata,
):
Parameter Type
ctx FlyteContext
sd literals.StructuredDataset
df_type Type[DF]
updated_metadata StructuredDatasetMetadata

register()

def register(
    h: Handlers,
    default_for_type: bool,
    override: bool,
    default_format_for_type: bool,
    default_storage_for_type: bool,
):

Call this with any Encoder or Decoder to register it with the flytekit type system. If your handler does not specify a protocol (e.g. s3, gs, etc.) field, then

Parameter Type
h Handlers
default_for_type bool
override bool
default_format_for_type bool
default_storage_for_type bool

register_for_protocol()

def register_for_protocol(
    h: Handlers,
    protocol: str,
    default_for_type: bool,
    override: bool,
    default_format_for_type: bool,
    default_storage_for_type: bool,
):

See the main register function instead.

Parameter Type
h Handlers
protocol str
default_for_type bool
override bool
default_format_for_type bool
default_storage_for_type bool

register_renderer()

def register_renderer(
    python_type: Type,
    renderer: Renderable,
):
Parameter Type
python_type Type
renderer Renderable

to_html()

def to_html(
    ctx: FlyteContext,
    python_val: typing.Any,
    expected_python_type: Type[T],
):

Converts any Python value (dataframe, int, float) to an HTML string, which is wrapped in an HTML div.

Parameter Type
ctx FlyteContext
python_val typing.Any
expected_python_type Type[T]
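
As a rough illustration of "wrapped in an HTML div", the helper below escapes the string form of a value and wraps it. wrap_in_div is a hypothetical function; how flytekit actually renders a given type depends on the renderer registered for it.

```python
import html


def wrap_in_div(python_val) -> str:
    """Render any value as escaped text inside a div element."""
    # Escape first so that characters such as '<' cannot break the markup.
    return f"<div>{html.escape(str(python_val))}</div>"
```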

to_literal()

def to_literal(
    ctx: FlyteContext,
    python_val: typing.Any,
    python_type: Type[T],
    expected: LiteralType,
):

Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type. Implementers should not use type(python_val); rely on the passed-in python_type instead. If the two do not match (or the combination is not allowed), the transformer implementer should raise an AssertionError that clearly states the mismatch.

Parameter Type
ctx FlyteContext
python_val typing.Any
python_type Type[T]
expected LiteralType

to_python_value()

def to_python_value(
    ctx: FlyteContext,
    lv: Literal,
    expected_python_type: Type[T],
):

Converts the given Literal to a Python Type. If the conversion cannot be done, an AssertionError should be raised.

Parameter Type
ctx FlyteContext
lv Literal
expected_python_type Type[T]

Properties

Property Type Description
is_async
name
python_type
type_assertions_enabled

flytekit.types.structured.structured_dataset.StructuredDatasetType

def StructuredDatasetType(
    columns: typing.List[flytekit.models.types.StructuredDatasetType.DatasetColumn],
    format: str,
    external_schema_type: str,
    external_schema_bytes: bytes,
):
Parameter Type
columns typing.List[flytekit.models.types.StructuredDatasetType.DatasetColumn]
format str
external_schema_type str
external_schema_bytes bytes

Methods

Method Description
from_flyte_idl() None
serialize_to_string() None
short_string()
to_flyte_idl() None
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    proto: flyteidl.core.types_pb2.StructuredDatasetType,
):
Parameter Type
proto flyteidl.core.types_pb2.StructuredDatasetType

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
columns
external_schema_bytes
external_schema_type
format
is_empty

flytekit.types.structured.structured_dataset.TypeEngine

Core Extensible TypeEngine of Flytekit. This should be used to extend the capabilities of Flytekit’s type system. Users can implement their own TypeTransformers and register them with the TypeEngine, which allows special handling of user objects.

Methods

Method Description
async_to_literal() Converts a python value of a given type and expected LiteralType into a resolved Literal value
async_to_python_value() None
calculate_hash() None
dict_to_literal_map() None
dict_to_literal_map_pb() None
get_available_transformers() Returns all python types for which transformers are available
get_transformer() Implements a recursive search for the transformer
guess_python_type() Transforms a flyte-specific LiteralType to a regular python value
guess_python_types() Transforms a dictionary of flyte-specific Variable objects to a dictionary of regular python values
lazy_import_transformers() Only load the transformers if needed
literal_map_to_kwargs() None
named_tuple_to_variable_map() Converts a python-native NamedTuple to a flyte-specific VariableMap of named literals
register() This should be used for all types that respond with the right type annotation when you use the type(…) function
register_additional_type() None
register_restricted_type() None
to_html() None
to_literal() Synchronous entry point that may be called from an async function and may itself invoke an async transformer
to_literal_checks() None
to_literal_type() Converts a python type into a flyte specific LiteralType
to_python_value() Converts a Literal value with an expected python type into a python value
unwrap_offloaded_literal() None

async_to_literal()

def async_to_literal(
    ctx: FlyteContext,
    python_val: typing.Any,
    python_type: Type[T],
    expected: LiteralType,
):

Converts a python value of a given type and expected LiteralType into a resolved Literal value.

Parameter Type
ctx FlyteContext
python_val typing.Any
python_type Type[T]
expected LiteralType

async_to_python_value()

def async_to_python_value(
    ctx: FlyteContext,
    lv: Literal,
    expected_python_type: Type,
):
Parameter Type
ctx FlyteContext
lv Literal
expected_python_type Type

calculate_hash()

def calculate_hash(
    python_val: typing.Any,
    python_type: Type[T],
):
Parameter Type
python_val typing.Any
python_type Type[T]

dict_to_literal_map()

def dict_to_literal_map(
    ctx: FlyteContext,
    d: typing.Dict[str, typing.Any],
    type_hints: Optional[typing.Dict[str, type]],
):
Parameter Type
ctx FlyteContext
d typing.Dict[str, typing.Any]
type_hints Optional[typing.Dict[str, type]]

dict_to_literal_map_pb()

def dict_to_literal_map_pb(
    ctx: FlyteContext,
    d: typing.Dict[str, typing.Any],
    type_hints: Optional[typing.Dict[str, type]],
):
Parameter Type
ctx FlyteContext
d typing.Dict[str, typing.Any]
type_hints Optional[typing.Dict[str, type]]

get_available_transformers()

def get_available_transformers()

Returns all Python types for which transformers are available.

get_transformer()

def get_transformer(
    python_type: Type,
):

Implements a recursive search for the transformer.

Parameter Type
python_type Type
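
One way to picture a recursive transformer search is a registry lookup that falls back to base classes when the exact type is not registered. The sketch below is an assumption about the general technique, not a description of flytekit's actual lookup order; _REGISTRY, register_toy, and find_transformer are hypothetical names, and transformers are represented by plain strings.

```python
from typing import Dict

# Hypothetical registry mapping a Python type to a transformer name.
_REGISTRY: Dict[type, str] = {}


def register_toy(python_type: type, transformer_name: str) -> None:
    _REGISTRY[python_type] = transformer_name


def find_transformer(python_type: type) -> str:
    """Look up the exact type first, then recurse into its base classes."""
    if python_type in _REGISTRY:
        return _REGISTRY[python_type]
    for base in python_type.__bases__:
        try:
            return find_transformer(base)
        except LookupError:
            continue  # Try the next base class.
    raise LookupError(f"no transformer registered for {python_type!r}")
```

A subclass with no registration of its own resolves to the transformer registered for its nearest registered ancestor.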

guess_python_type()

def guess_python_type(
    flyte_type: LiteralType,
):

Transforms a flyte-specific LiteralType to a regular python value.

Parameter Type
flyte_type LiteralType

guess_python_types()

def guess_python_types(
    flyte_variable_dict: typing.Dict[str, _interface_models.Variable],
):

Transforms a dictionary of flyte-specific Variable objects to a dictionary of regular python values.

Parameter Type
flyte_variable_dict typing.Dict[str, _interface_models.Variable]

lazy_import_transformers()

def lazy_import_transformers()

Only load the transformers if needed.

literal_map_to_kwargs()

def literal_map_to_kwargs(
    ctx: FlyteContext,
    lm: LiteralMap,
    python_types: typing.Optional[typing.Dict[str, type]],
    literal_types: typing.Optional[typing.Dict[str, _interface_models.Variable]],
):
Parameter Type
ctx FlyteContext
lm LiteralMap
python_types typing.Optional[typing.Dict[str, type]]
literal_types typing.Optional[typing.Dict[str, _interface_models.Variable]]

named_tuple_to_variable_map()

def named_tuple_to_variable_map(
    t: typing.NamedTuple,
):

Converts a python-native NamedTuple to a flyte-specific VariableMap of named literals.

Parameter Type
t typing.NamedTuple
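
The first step of such a conversion is reading the field names and annotated types off the NamedTuple. The sketch below shows only that step with the standard library; Outputs and named_tuple_fields are hypothetical names, and the result is a plain dict rather than a Flyte VariableMap.

```python
from typing import Dict, NamedTuple, get_type_hints


class Outputs(NamedTuple):
    """Hypothetical task output type used only for illustration."""

    count: int
    label: str


def named_tuple_fields(t) -> Dict[str, type]:
    """Map each NamedTuple field name to its annotated type, in field order."""
    hints = get_type_hints(t)
    return {name: hints[name] for name in t._fields}
```

In a full conversion, each annotated type would then be turned into a literal type, for example through to_literal_type().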

register()

def register(
    transformer: TypeTransformer,
    additional_types: Optional[typing.List[Type]],
):

This should be used for all types that respond with the right type annotation when you use the type(…) function.

Parameter Type
transformer TypeTransformer
additional_types Optional[typing.List[Type]]

register_additional_type()

def register_additional_type(
    transformer: TypeTransformer[T],
    additional_type: Type[T],
    override,
):
Parameter Type
transformer TypeTransformer[T]
additional_type Type[T]
override

register_restricted_type()

def register_restricted_type(
    name: str,
    type: Type[T],
):
Parameter Type
name str
type Type[T]

to_html()

def to_html(
    ctx: FlyteContext,
    python_val: typing.Any,
    expected_python_type: Type[typing.Any],
):
Parameter Type
ctx FlyteContext
python_val typing.Any
expected_python_type Type[typing.Any]

to_literal()

def to_literal(
    ctx: FlyteContext,
    python_val: typing.Any,
    python_type: Type[T],
    expected: LiteralType,
):

This synchronous to_literal function is structured the way it is because users may call it from an async function, and it may in turn need to invoke yet another async function, namely an async transformer.

Parameter Type
ctx FlyteContext
python_val typing.Any
python_type Type[T]
expected LiteralType
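
The situation described above, a synchronous function that must run a coroutine even when its caller is already inside an event loop, can be sketched with asyncio and a helper thread. This is one possible approach under stated assumptions, not flytekit's implementation: toy_async_to_literal, run_sync, and toy_to_literal are hypothetical, and a dict stands in for a Literal.

```python
import asyncio
import threading


async def toy_async_to_literal(value):
    """Hypothetical async transformer; a dict stands in for a Literal."""
    await asyncio.sleep(0)
    return {"scalar": value}


def run_sync(coro_fn, *args):
    """Run a coroutine function to completion from synchronous code."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run is safe.
        return asyncio.run(coro_fn(*args))
    # A loop is already running here; run the coroutine on a fresh loop
    # in a separate thread and block until it finishes.
    result = {}

    def worker():
        result["value"] = asyncio.run(coro_fn(*args))

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    return result["value"]


def toy_to_literal(value):
    """Synchronous entry point that delegates to the async transformer."""
    return run_sync(toy_async_to_literal, value)


async def async_caller():
    # Calls the synchronous function from inside a running event loop.
    return toy_to_literal(42)
```

The thread-based branch blocks the calling event loop while it waits, and this sketch does not propagate exceptions raised in the worker; a production version would need to handle both.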

to_literal_checks()

def to_literal_checks(
    python_val: typing.Any,
    python_type: Type[T],
    expected: LiteralType,
):
Parameter Type
python_val typing.Any
python_type Type[T]
expected LiteralType

to_literal_type()

def to_literal_type(
    python_type: Type[T],
):

Converts a Python type into a Flyte-specific LiteralType.

Parameter Type
python_type Type[T]

to_python_value()

def to_python_value(
    ctx: FlyteContext,
    lv: Literal,
    expected_python_type: Type,
):

Converts a Literal value with an expected python type into a python value.

Parameter Type
ctx FlyteContext
lv Literal
expected_python_type Type

unwrap_offloaded_literal()

def unwrap_offloaded_literal(
    ctx: FlyteContext,
    lv: Literal,
):
Parameter Type
ctx FlyteContext
lv Literal

flytekit.types.structured.structured_dataset.TypeTransformerFailedError

Inappropriate argument type.