1.15.4.dev2+g3e3ce2426

flytekit.types.structured

Flytekit StructuredDataset

.. currentmodule:: flytekit.types.structured

.. autosummary::
   :template: custom.rst
   :toctree: generated/

   StructuredDataset
   StructuredDatasetDecoder
   StructuredDatasetEncoder

Directory

Classes

Class Description
ArrowRenderer Render an Arrow dataframe as an HTML table.
StructuredDataset This is the user facing StructuredDataset class.
StructuredDatasetDecoder Abstract base class for decoders that turn a Flyte StructuredDataset literal into a Python dataframe.
StructuredDatasetEncoder Abstract base class for encoders that turn a Python dataframe into a Flyte StructuredDataset literal.
StructuredDatasetTransformerEngine Think of this transformer as a higher-level meta transformer that is used for all the dataframe types.
TopFrameRenderer Render a DataFrame as an HTML table.

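Errors

Error Description
DuplicateHandlerError Raised when registering a handler for a (python_type, protocol, format) combination that already has one.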

flytekit.types.structured.ArrowRenderer

Render an Arrow dataframe as an HTML table.

Methods

Method Description
to_html() Render df, a pyarrow.Table, for display.

to_html()

def to_html(
    df: pyarrow.Table,
):
Parameter Type
df pyarrow.Table

flytekit.types.structured.DuplicateHandlerError

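A ValueError raised when an encoder or decoder is registered for a (python_type, protocol, format) combination that already has a handler and override is not set.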

flytekit.types.structured.StructuredDataset

This is the user facing StructuredDataset class. Please don’t confuse it with the literals.StructuredDataset class (that is just a model, a Python class representation of the protobuf).

def StructuredDataset(
    dataframe: typing.Optional[typing.Any] = None,
    uri: typing.Optional[str] = None,
    metadata: typing.Optional[literals.StructuredDatasetMetadata] = None,
    **kwargs,
):
Parameter Type
dataframe typing.Optional[typing.Any]
uri typing.Optional[str]
metadata typing.Optional[literals.StructuredDatasetMetadata]
kwargs **kwargs

Methods

Method Description
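all() Loads the whole dataset as the dataframe type set with open().
column_names() Returns the dataset's column names.
columns() Returns the dataset's columns as a mapping of column name to type.
deserialize_structured_dataset() No description.
from_dict() Builds an instance from a dict (mashumaro).
from_json() Builds an instance from a JSON string (mashumaro).
iter() Iterates over the dataset, yielding values of the dataframe type set with open().
open() Sets the dataframe type to load the data as and returns the dataset, so calls can be chained.
serialize_structured_dataset() No description.
set_literal() A public wrapper method to set the StructuredDataset Literal.
to_dict() Serializes the instance to a dict (mashumaro).
to_json() Serializes the instance to a JSON string (mashumaro).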

all()

def all()

column_names()

def column_names()

columns()

def columns()

deserialize_structured_dataset()

def deserialize_structured_dataset(
    *args,
    **kwargs,
):
Parameter Type
args *args
kwargs **kwargs

from_dict()

def from_dict(
    d,
    dialect,
):
Parameter Type
d
dialect

from_json()

def from_json(
    data: typing.Union[str, bytes, bytearray],
    decoder: collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]],
    **from_dict_kwargs: typing.Any,
):
Parameter Type
data typing.Union[str, bytes, bytearray]
decoder collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]]
from_dict_kwargs typing.Any

iter()

def iter()

open()

def open(
    dataframe_type: Type[DF],
):
Parameter Type
dataframe_type Type[DF]
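open(), all(), and iter() follow a fluent pattern: open() only records the dataframe type you want and returns the dataset, and all() or iter() does the actual decoding. The stdlib-only toy below illustrates that shape; ToyDataset, its converters mapping, and the chunk_size argument are hypothetical and not flytekit API (in flytekit the conversion goes through the transformer engine's open_as()/iter_as() and the registered decoders).

```python
from typing import Any, Callable, Dict, Iterator, List, Optional


class ToyDataset:
    """Hypothetical stand-in for StructuredDataset: holds raw rows and
    converts them only when all() or iter() is called."""

    def __init__(self, rows: List[dict], converters: Dict[type, Callable[[List[dict]], Any]]):
        self._rows = rows
        # Maps a target type (list, dict, ...) to a function that converts rows to it.
        self._converters = converters
        self._dataframe_type: Optional[type] = None

    def open(self, dataframe_type: type) -> "ToyDataset":
        # Remember the requested type and return self so calls can be chained.
        self._dataframe_type = dataframe_type
        return self

    def _converter(self) -> Callable[[List[dict]], Any]:
        if self._dataframe_type is None:
            raise ValueError("No dataframe type set. Use open() first.")
        return self._converters[self._dataframe_type]

    def all(self) -> Any:
        # Convert every row at once.
        return self._converter()(self._rows)

    def iter(self, chunk_size: int = 2) -> Iterator[Any]:
        # Convert and yield the rows chunk by chunk.
        convert = self._converter()
        for start in range(0, len(self._rows), chunk_size):
            yield convert(self._rows[start:start + chunk_size])


rows = [{"a": 1, "b": 2.0}, {"a": 3, "b": 4.0}, {"a": 5, "b": 6.0}]
converters = {
    list: lambda rs: [tuple(r.values()) for r in rs],
    dict: lambda rs: {k: [r[k] for r in rs] for k in rs[0]},
}
sd = ToyDataset(rows, converters)
print(sd.open(dict).all())  # {'a': [1, 3, 5], 'b': [2.0, 4.0, 6.0]}
for chunk in sd.open(list).iter():
    print(chunk)  # [(1, 2.0), (3, 4.0)] then [(5, 6.0)]
```

With flytekit the same chain reads sd.open(pd.DataFrame).all().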

serialize_structured_dataset()

def serialize_structured_dataset(
    *args,
    **kwargs,
):
Parameter Type
args *args
kwargs **kwargs

set_literal()

def set_literal(
    ctx: FlyteContext,
    expected: LiteralType,
):

A public wrapper method to set the StructuredDataset Literal.

This method provides external access to the internal _set_literal method.

Parameter Type
ctx FlyteContext
expected LiteralType

to_dict()

def to_dict()

to_json()

def to_json(
    encoder: collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]],
    **to_dict_kwargs: typing.Any,
):
Parameter Type
encoder collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]]
to_dict_kwargs typing.Any
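The from_dict()/from_json()/to_dict()/to_json() signatures above match mashumaro's DataClassJSONMixin: JSON is produced by passing to_dict() output through a pluggable encoder, and parsed by passing a decoder's output to from_dict(). The stdlib-only sketch below mimics that round trip on a hypothetical ToySD dataclass; the two fields uri and file_format are an assumption about what StructuredDataset serializes.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Callable, Optional, Union


@dataclass
class ToySD:
    # Hypothetical mirror of the fields assumed to be serialized.
    uri: Optional[str] = None
    file_format: Optional[str] = ""

    def to_dict(self) -> dict:
        return asdict(self)

    @classmethod
    def from_dict(cls, d: dict) -> "ToySD":
        return cls(**d)

    def to_json(self, encoder: Callable[[Any], str] = json.dumps) -> str:
        # Serialize via to_dict() and a pluggable encoder, as mashumaro does.
        return encoder(self.to_dict())

    @classmethod
    def from_json(cls, data: Union[str, bytes, bytearray],
                  decoder: Callable[[Union[str, bytes, bytearray]], dict] = json.loads) -> "ToySD":
        # Parse with the decoder, then rebuild via from_dict().
        return cls.from_dict(decoder(data))


sd = ToySD(uri="s3://my-bucket/data.parquet", file_format="parquet")
payload = sd.to_json()
print(payload)  # {"uri": "s3://my-bucket/data.parquet", "file_format": "parquet"}
assert ToySD.from_json(payload) == sd
```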

Properties

Property Type Description
dataframe
literal
metadata

flytekit.types.structured.StructuredDatasetDecoder

Helper class that provides a standard way to create an ABC using inheritance.

def StructuredDatasetDecoder(
    python_type: Type[DF],
    protocol: Optional[str],
    supported_format: Optional[str],
    additional_protocols: Optional[List[str]],
):

Extend this abstract class, implement the decode function, and register your concrete class with the StructuredDatasetTransformerEngine class so that the core flytekit type engine can handle your dataframe library. This is the decoder interface: it is used when there is a Flyte Literal value and a Python value has to be produced from it. For the other direction, see StructuredDatasetEncoder.

Parameter Type
python_type Type[DF]
protocol Optional[str]
supported_format Optional[str]
additional_protocols Optional[List[str]]
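The decoder contract boils down to fixing python_type, protocol, and supported_format at construction and implementing decode(). A stdlib-only sketch of that shape follows; ToyDecoder, CsvToListDecoder, and the simplified decode(stored) signature are hypothetical (the real decode() receives ctx, flyte_value, and current_task_metadata, and the class must be registered with StructuredDatasetTransformerEngine).

```python
import abc
import csv
import io
from typing import List, Optional, Type


class ToyDecoder(abc.ABC):
    """Hypothetical mirror of the decoder interface."""

    def __init__(self, python_type: Type, protocol: Optional[str] = None,
                 supported_format: Optional[str] = None):
        self.python_type = python_type
        self.protocol = protocol
        self.supported_format = supported_format

    @abc.abstractmethod
    def decode(self, stored: str):
        """Turn the stored representation into an instance of python_type."""


class CsvToListDecoder(ToyDecoder):
    def __init__(self) -> None:
        # protocol=None: this decoder does not care where the bytes came from.
        super().__init__(list, protocol=None, supported_format="csv")

    def decode(self, stored: str) -> List[dict]:
        # Each CSV row becomes a dict keyed by the header names (values stay strings).
        return list(csv.DictReader(io.StringIO(stored)))


decoder = CsvToListDecoder()
print(decoder.decode("a,b\n1,2\n3,4\n"))  # [{'a': '1', 'b': '2'}, {'a': '3', 'b': '4'}]
```

Because decode() is abstract, ToyDecoder(list) itself raises TypeError; only subclasses that implement decode() can be instantiated, which is the "ABC using inheritance" pattern the class description refers to.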

Methods

Method Description
decode() Called by the dataset transformer engine to translate a Flyte Literal value into a Python instance.

decode()

def decode(
    ctx: FlyteContext,
    flyte_value: literals.StructuredDataset,
    current_task_metadata: StructuredDatasetMetadata,
):

This is code that will be called by the dataset transformer engine to ultimately translate from a Flyte Literal value into a Python instance.

Parameter Type
ctx FlyteContext
flyte_value literals.StructuredDataset
current_task_metadata StructuredDatasetMetadata

Properties

Property Type Description
protocol
python_type
supported_format

flytekit.types.structured.StructuredDatasetEncoder

Helper class that provides a standard way to create an ABC using inheritance.

def StructuredDatasetEncoder(
    python_type: Type[T],
    protocol: Optional[str],
    supported_format: Optional[str],
):

Extend this abstract class, implement the encode function, and register your concrete class with the StructuredDatasetTransformerEngine class so that the core flytekit type engine can handle your dataframe library. This is the encoding interface: it is used when the flytekit type engine is converting a Python value into a Flyte Literal. For the other direction, see StructuredDatasetDecoder.

Parameter Type
python_type Type[T]
protocol Optional[str]
supported_format Optional[str]
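The encoder side mirrors the decoder: a handler fixed to one python_type, protocol, and format whose encode() persists the value and returns a literal describing where it went. The sketch below is stdlib-only; ToyEncoder, ListToCsvEncoder, ToyLiteral, the in-memory store, and the mem:// protocol are hypothetical stand-ins for flytekit's StructuredDataset literal and blob storage.

```python
import abc
import csv
import io
from dataclasses import dataclass
from typing import Dict, List, Optional, Type


@dataclass
class ToyLiteral:
    # Hypothetical stand-in for the literal an encoder produces:
    # where the data was written, and in which format.
    uri: str
    format: str


class ToyEncoder(abc.ABC):
    def __init__(self, python_type: Type, protocol: Optional[str] = None,
                 supported_format: Optional[str] = None):
        self.python_type = python_type
        self.protocol = protocol
        self.supported_format = supported_format

    @abc.abstractmethod
    def encode(self, value, uri: str) -> ToyLiteral:
        """Persist value and describe where it went."""


class ListToCsvEncoder(ToyEncoder):
    def __init__(self, store: Dict[str, str]):
        super().__init__(list, protocol="mem", supported_format="csv")
        self._store = store  # in-memory stand-in for blob storage

    def encode(self, value: List[dict], uri: str) -> ToyLiteral:
        # Write the rows as CSV (header taken from the first row's keys).
        buf = io.StringIO()
        writer = csv.DictWriter(buf, fieldnames=list(value[0]))
        writer.writeheader()
        writer.writerows(value)
        self._store[uri] = buf.getvalue()
        return ToyLiteral(uri=uri, format=self.supported_format)


store: Dict[str, str] = {}
lit = ListToCsvEncoder(store).encode([{"a": 1, "b": 2}], "mem://out/0.csv")
print(lit)  # ToyLiteral(uri='mem://out/0.csv', format='csv')
print(repr(store["mem://out/0.csv"]))  # 'a,b\r\n1,2\r\n'
```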

Methods

Method Description
encode() Even if the user code returns a plain dataframe instance, the dataset transformer engine will wrap the incoming dataframe with defaults set for that dataframe type.

encode()

def encode(
    ctx: FlyteContext,
    structured_dataset: StructuredDataset,
    structured_dataset_type: StructuredDatasetType,
):

Even if the user code returns a plain dataframe instance, the dataset transformer engine will wrap the incoming dataframe with defaults set for that dataframe type. This simplifies this function’s interface as a lot of data that could be specified by the user using the

TODO: Do we need to add a flag to indicate if it was wrapped by the transformer or by the user?

Parameter Type
ctx FlyteContext
structured_dataset StructuredDataset
structured_dataset_type StructuredDatasetType

Properties

Property Type Description
protocol
python_type
supported_format

flytekit.types.structured.StructuredDatasetTransformerEngine

Think of this transformer as a higher-level meta transformer that is used for all the dataframe types. If you are bringing a custom data frame type, or any data frame type, to flytekit, instead of registering with the main type engine, you should register with this transformer instead.

def StructuredDatasetTransformerEngine()

Methods

Method Description
assert_type() Checks that v is an acceptable value for t.
async_to_literal() Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type.
async_to_python_value() Converts a Literal (say, the output of an earlier task) to a Python value at the start of a task execution, applying column subsetting.
dict_to_structured_dataset() Builds a StructuredDataset from a dict.
encode() Encodes sd with the encoder registered for df_type, protocol, and format.
from_binary_idl() Deserializes a StructuredDataset that arrives as binary IDL from flytekit.
from_generic_idl() Deserializes a StructuredDataset that arrives as a protobuf Struct from Flyte Console.
get_decoder() Looks up the decoder registered for df_type, protocol, and format.
get_encoder() Looks up the encoder registered for df_type, protocol, and format.
get_literal_type() Returns the structured dataset literal type shared by all dataframe types.
guess_python_type() Converts the Flyte LiteralType to a python object type.
isinstance_generic() No description.
iter_as() Decodes sd into an iterator of df_type values.
open_as() Decodes sd into a single df_type value.
register() Call this with any Encoder or Decoder to register it with the flytekit type system.
register_for_protocol() See the main register function instead.
register_renderer() Registers the renderer that to_html() uses for python_type.
to_html() Converts any Python value (dataframe, int, float) to an HTML string wrapped in an HTML div.
to_literal() Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type.
to_python_value() Converts the given Literal to a Python type.

assert_type()

def assert_type(
    t: Type[StructuredDataset],
    v: typing.Any,
):
Parameter Type
t Type[StructuredDataset]
v typing.Any

async_to_literal()

def async_to_literal(
    ctx: FlyteContext,
    python_val: Union[StructuredDataset, typing.Any],
    python_type: Union[Type[StructuredDataset], Type],
    expected: LiteralType,
):

Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type. Implementers should not use type(python_val); rely on the passed-in python_type instead. If these do not match (or are not allowed), the transformer implementer should raise an AssertionError that clearly states the mismatch.

Parameter Type
ctx FlyteContext
python_val Union[StructuredDataset, typing.Any]
python_type Union[Type[StructuredDataset], Type]
expected LiteralType

async_to_python_value()

def async_to_python_value(
    ctx: FlyteContext,
    lv: Literal,
    expected_python_type: Type[T] | StructuredDataset,
):

The only tricky part of converting a Literal (say, the output of an earlier task) to a Python value at the start of a task execution is the column subsetting behavior. For example, given:

    def t1() -> Annotated[StructuredDataset, kwtypes(col_a=int, col_b=float)]: ...
    def t2(in_a: Annotated[StructuredDataset, kwtypes(col_b=float)]): ...

when a workflow calls t2(in_a=t1()), in_a.open(pd.DataFrame).all() inside t2 should return a DataFrame with only one column, col_b.

The StructuredDatasetType passed to the decoder depends on the columns declared by the currently running task and by the incoming Literal:

  • Running task has columns defined (whatever the incoming Literal declares): the StructuredDatasetType passed to the decoder has the columns as defined by the type annotation of the currently running task. Decoders should then subset the incoming data to the columns requested.
  • Running task has [] columns or None, incoming Literal has columns defined: the StructuredDatasetType passed to the decoder has the columns from the incoming Literal. This is the scenario where the Literal returned by the running task will have more information than the running task’s signature.
  • Both have [] columns or None: the StructuredDatasetType passed to the decoder has an empty list of columns.
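The column rules above can be sketched as two small functions: one picks the columns handed to the decoder, the other is the subsetting a decoder is expected to perform. This is a stdlib-only model with hypothetical names (decoder_columns, subset) and rows represented as lists of dicts; treating an empty column list as "keep every column" is an assumption about decoder behavior.

```python
from typing import Dict, List, Optional


def decoder_columns(task_cols: Optional[List[str]],
                    literal_cols: Optional[List[str]]) -> List[str]:
    """Columns handed to the decoder, following the three cases above."""
    if task_cols:  # the running task declares columns: they win
        return list(task_cols)
    if literal_cols:  # the task declares none: use the incoming literal's columns
        return list(literal_cols)
    return []  # neither side declares columns


def subset(rows: List[Dict[str, object]], cols: List[str]) -> List[Dict[str, object]]:
    # Assumption: an empty column list means "no subsetting", so every column is kept.
    if not cols:
        return rows
    return [{c: row[c] for c in cols} for row in rows]


incoming = [{"col_a": 1, "col_b": 0.5}, {"col_a": 2, "col_b": 1.5}]
cols = decoder_columns(["col_b"], ["col_a", "col_b"])  # t2 declares only col_b
print(subset(incoming, cols))  # [{'col_b': 0.5}, {'col_b': 1.5}]
```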

Parameter Type
ctx FlyteContext
lv Literal
expected_python_type Type[T] | StructuredDataset

dict_to_structured_dataset()

def dict_to_structured_dataset(
    dict_obj: typing.Dict[str, str],
    expected_python_type: Type[T] | StructuredDataset,
):
Parameter Type
dict_obj typing.Dict[str, str]
expected_python_type Type[T] | StructuredDataset

encode()

def encode(
    ctx: FlyteContext,
    sd: StructuredDataset,
    df_type: Type,
    protocol: str,
    format: str,
    structured_literal_type: StructuredDatasetType,
):
Parameter Type
ctx FlyteContext
sd StructuredDataset
df_type Type
protocol str
format str
structured_literal_type StructuredDatasetType

from_binary_idl()

def from_binary_idl(
    binary_idl_object: Binary,
    expected_python_type: Type[T] | StructuredDataset,
):

If the input is from flytekit, the Life Cycle will be as follows:

Life Cycle:
  binary IDL (flytekit customized serialization)
  -> resolved binary (propeller processing)
  -> bytes (flytekit binary IDL)
  -> expected Python object (flytekit customized deserialization)

Example Code:

    @dataclass
    class DC:
        sd: StructuredDataset

    @workflow
    def wf(dc: DC):
        t_sd(dc.sd)

Note:

  • Deserialization works the same way as for a structured dataset placed in a dataclass: it is handled by mashumaro’s API.

Related PR:

Parameter Type
binary_idl_object Binary
expected_python_type Type[T] | StructuredDataset

from_generic_idl()

def from_generic_idl(
    generic: Struct,
    expected_python_type: Type[T] | StructuredDataset,
):

If the input is from Flyte Console, the Life Cycle will be as follows:

Life Cycle:
  json str (console user input)
  -> protobuf struct (console output)
  -> resolved protobuf struct (propeller)
  -> expected Python object (flytekit customized deserialization)

Example Code:

    @dataclass
    class DC:
        sd: StructuredDataset

    @workflow
    def wf(dc: DC):
        t_sd(dc.sd)

Note:

  • Deserialization works the same way as for a structured dataset placed in a dataclass: it is handled by mashumaro’s API.

Related PR:

Parameter Type
generic Struct
expected_python_type Type[T] | StructuredDataset
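The Flyte Console life cycle can be modeled with the standard library: the JSON the user types is parsed into a generic key/value structure, and flytekit then rebuilds the expected dataclass from it. In this sketch a plain dict stands in for the protobuf Struct, and ToySD, DC, and dict_to_toy_sd are hypothetical; the uri and file_format keys are an assumption about what dict_to_structured_dataset() reads.

```python
import json
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class ToySD:
    # Hypothetical stand-in for StructuredDataset's serialized fields.
    uri: Optional[str] = None
    file_format: str = ""


@dataclass
class DC:
    sd: ToySD


def dict_to_toy_sd(d: Dict[str, str]) -> ToySD:
    # Rebuild the dataset reference from a plain dict, like dict_to_structured_dataset().
    return ToySD(uri=d["uri"], file_format=d.get("file_format", ""))


# 1. JSON string entered by the user in the console.
console_input = '{"sd": {"uri": "s3://bucket/df.parquet", "file_format": "parquet"}}'
# 2-3. After the console and propeller it is a generic struct; a dict plays that role here.
struct = json.loads(console_input)
# 4. Deserialize into the expected Python type.
dc = DC(sd=dict_to_toy_sd(struct["sd"]))
print(dc)  # DC(sd=ToySD(uri='s3://bucket/df.parquet', file_format='parquet'))
```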

get_decoder()

def get_decoder(
    df_type: Type,
    protocol: str,
    format: str,
):
Parameter Type
df_type Type
protocol str
format str

get_encoder()

def get_encoder(
    df_type: Type,
    protocol: str,
    format: str,
):
Parameter Type
df_type Type
protocol str
format str

get_literal_type()

def get_literal_type(
    t: typing.Union[Type[StructuredDataset], typing.Any],
):

Provides a concrete implementation so that writers of custom dataframe handlers do not have to, since there’s nothing special about the literal type. Any dataframe type will always be associated with the structured dataset type. Its other aspects (columns, external schema type, etc.) can be read from the associated metadata.

Parameter Type
t typing.Union[Type[StructuredDataset], typing.Any]

guess_python_type()

def guess_python_type(
    literal_type: LiteralType,
):

Converts the Flyte LiteralType to a python object type.

Parameter Type
literal_type LiteralType

isinstance_generic()

def isinstance_generic(
    obj,
    generic_alias,
):
Parameter Type
obj
generic_alias

iter_as()

def iter_as(
    ctx: FlyteContext,
    sd: literals.StructuredDataset,
    df_type: Type[DF],
    updated_metadata: StructuredDatasetMetadata,
):
Parameter Type
ctx FlyteContext
sd literals.StructuredDataset
df_type Type[DF]
updated_metadata StructuredDatasetMetadata

open_as()

def open_as(
    ctx: FlyteContext,
    sd: literals.StructuredDataset,
    df_type: Type[DF],
    updated_metadata: StructuredDatasetMetadata,
):
Parameter Type
ctx FlyteContext
sd literals.StructuredDataset
df_type Type[DF]
updated_metadata StructuredDatasetMetadata

register()

def register(
    h: Handlers,
    default_for_type: bool,
    override: bool,
    default_format_for_type: bool,
    default_storage_for_type: bool,
):

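Call this with any Encoder or Decoder to register it with the flytekit type system. If your handler does not specify a protocol (e.g. s3, gs, etc.) field, then it is registered for every storage protocol that flytekit’s data persistence layer supports. In that case default_for_type should not be set, because the protocol is determined by the raw output data prefix of the active context.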

Parameter Type
h Handlers
default_for_type bool
override bool
default_format_for_type bool
default_storage_for_type bool
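The registration rules (one handler per python_type, protocol, and format; protocol-less handlers fanned out to every known protocol; duplicates rejected unless override is set) can be sketched with a dict keyed by that triple. Everything here is a hypothetical stdlib model: ToyRegistry, ToyHandler, and KNOWN_PROTOCOLS are not flytekit names, and DuplicateHandlerError is a local stand-in for the flytekit error of the same name.

```python
from typing import Dict, Optional, Tuple


class DuplicateHandlerError(ValueError):
    """Local stand-in for flytekit's error of the same name."""


# Hypothetical list of storage protocols the persistence layer supports.
KNOWN_PROTOCOLS = ("file", "s3", "gs")


class ToyHandler:
    def __init__(self, python_type: type, protocol: Optional[str], supported_format: str):
        self.python_type = python_type
        self.protocol = protocol
        self.supported_format = supported_format


class ToyRegistry:
    def __init__(self) -> None:
        # (python_type, protocol, format) -> handler
        self.handlers: Dict[Tuple[type, str, str], ToyHandler] = {}

    def register(self, h: ToyHandler, override: bool = False) -> None:
        # A handler without a protocol is registered for every known protocol.
        protocols = KNOWN_PROTOCOLS if h.protocol is None else (h.protocol,)
        for protocol in protocols:
            self.register_for_protocol(h, protocol, override)

    def register_for_protocol(self, h: ToyHandler, protocol: str, override: bool = False) -> None:
        key = (h.python_type, protocol, h.supported_format)
        if key in self.handlers and not override:
            raise DuplicateHandlerError(f"Already registered a handler for {key}")
        self.handlers[key] = h

    def get(self, python_type: type, protocol: str, fmt: str) -> ToyHandler:
        return self.handlers[(python_type, protocol, fmt)]


reg = ToyRegistry()
reg.register(ToyHandler(list, None, "csv"))  # fans out to file, s3, gs
print(sorted(p for (_, p, _) in reg.handlers))  # ['file', 'gs', 's3']

rejected = False
try:
    reg.register(ToyHandler(list, "s3", "csv"))  # same (list, s3, csv) key
except DuplicateHandlerError:
    rejected = True
print("duplicate rejected:", rejected)  # duplicate rejected: True

reg.register(ToyHandler(list, "s3", "csv"), override=True)  # replaces the s3 entry
```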

register_for_protocol()

def register_for_protocol(
    h: Handlers,
    protocol: str,
    default_for_type: bool,
    override: bool,
    default_format_for_type: bool,
    default_storage_for_type: bool,
):

See the main register function instead.

Parameter Type
h Handlers
protocol str
default_for_type bool
override bool
default_format_for_type bool
default_storage_for_type bool

register_renderer()

def register_renderer(
    python_type: Type,
    renderer: Renderable,
):
Parameter Type
python_type Type
renderer Renderable

to_html()

def to_html(
    ctx: FlyteContext,
    python_val: typing.Any,
    expected_python_type: Type[T],
):

Converts any Python value (dataframe, int, float) to an HTML string, which is wrapped in an HTML div.

Parameter Type
ctx FlyteContext
python_val typing.Any
expected_python_type Type[T]
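to_html() relies on renderers registered per Python type through register_renderer(). A stdlib-only sketch of that lookup follows; ToyEngine and RowsRenderer are hypothetical, and raising NotImplementedError when no renderer is registered is an assumption of this sketch rather than documented behavior.

```python
import html
from typing import Any, Dict, List, Protocol


class Renderable(Protocol):
    def to_html(self, value: Any) -> str: ...


class RowsRenderer:
    """Hypothetical renderer for a list of dicts."""

    def to_html(self, rows: List[dict]) -> str:
        header = "".join(f"<th>{html.escape(str(k))}</th>" for k in rows[0])
        body = "".join(
            "<tr>" + "".join(f"<td>{html.escape(str(v))}</td>" for v in r.values()) + "</tr>"
            for r in rows
        )
        return f"<table><tr>{header}</tr>{body}</table>"


class ToyEngine:
    def __init__(self) -> None:
        self.renderers: Dict[type, Renderable] = {}

    def register_renderer(self, python_type: type, renderer: Renderable) -> None:
        self.renderers[python_type] = renderer

    def to_html(self, value: Any) -> str:
        # Dispatch on the exact runtime type of the value.
        renderer = self.renderers.get(type(value))
        if renderer is None:
            raise NotImplementedError(f"No renderer registered for {type(value)}")
        return renderer.to_html(value)


engine = ToyEngine()
engine.register_renderer(list, RowsRenderer())
print(engine.to_html([{"a": 1, "b": "<x>"}]))
# <table><tr><th>a</th><th>b</th></tr><tr><td>1</td><td>&lt;x&gt;</td></tr></table>

try:
    engine.to_html(42)
    missing_raised = False
except NotImplementedError:
    missing_raised = True
```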

to_literal()

def to_literal(
    ctx: FlyteContext,
    python_val: typing.Any,
    python_type: Type[T],
    expected: LiteralType,
):

Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type. Implementers should not use type(python_val); rely on the passed-in python_type instead. If these do not match (or are not allowed), the transformer implementer should raise an AssertionError that clearly states the mismatch.

Parameter Type
ctx FlyteContext
python_val typing.Any
python_type Type[T]
expected LiteralType
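The advice to rely on python_type rather than type(python_val) matters because the declared type can carry information the value does not, such as columns attached with Annotated. The stdlib-only sketch below shows that; to_literal_toy and declared_columns are hypothetical, and the plain dict used as Annotated metadata stands in for the column mapping built by kwtypes(...).

```python
from typing import Annotated, Any, Dict, get_args, get_origin


def declared_columns(python_type: Any) -> Dict[str, type]:
    """Read column metadata from the declared type; type(value) cannot supply it."""
    if get_origin(python_type) is Annotated:
        for extra in get_args(python_type)[1:]:
            if isinstance(extra, dict):
                return dict(extra)
    return {}


def to_literal_toy(python_val: Any, python_type: Any) -> Dict[str, Any]:
    # Strip Annotated to get the base type to check the value against.
    base = get_args(python_type)[0] if get_origin(python_type) is Annotated else python_type
    if not isinstance(python_val, base):
        # Per the contract above: fail with an AssertionError naming the mismatch.
        raise AssertionError(
            f"Value of type {type(python_val).__name__} does not match declared type {base.__name__}"
        )
    return {"columns": declared_columns(python_type), "value": python_val}


Rows = Annotated[list, {"col_a": int, "col_b": float}]
lit = to_literal_toy([(1, 0.5)], Rows)
print(lit["columns"])  # {'col_a': <class 'int'>, 'col_b': <class 'float'>}

try:
    to_literal_toy("not a list", Rows)
    mismatch_rejected = False
except AssertionError:
    mismatch_rejected = True
```

Here type(python_val) is just list; the column information exists only on the declared python_type.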

to_python_value()

def to_python_value(
    ctx: FlyteContext,
    lv: Literal,
    expected_python_type: Type[T],
):

Converts the given Literal to a Python type. If the conversion cannot be done, an AssertionError should be raised.

Parameter Type
ctx FlyteContext
lv Literal
expected_python_type Type[T]

Properties

Property Type Description
is_async
name
python_type
type_assertions_enabled

flytekit.types.structured.TopFrameRenderer

Render a DataFrame as an HTML table.

def TopFrameRenderer(
    max_rows: int,
    max_cols: int,
):
Parameter Type
max_rows int
max_cols int

Methods

Method Description
to_html() Render df, a pandas.DataFrame, as an HTML table limited by max_rows and max_cols.

to_html()

def to_html(
    df: pandas.DataFrame,
):
Parameter Type
df pandas.DataFrame
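max_rows and max_cols cap how much of the frame ends up in the HTML. A stdlib-only sketch of that idea, keeping the first max_rows rows and first max_cols columns of a column-oriented table, is below; top_frame_html is hypothetical, and the real renderer works on a pandas.DataFrame, whose truncated output may look different.

```python
import html
from typing import Dict, List


def top_frame_html(columns: Dict[str, List[object]], max_rows: int, max_cols: int) -> str:
    """Render at most max_rows rows and max_cols columns as an HTML table."""
    names = list(columns)[:max_cols]
    # Assumes every column has the same length.
    n_rows = min(max_rows, len(columns[names[0]])) if names else 0
    head = "".join(f"<th>{html.escape(name)}</th>" for name in names)
    body = "".join(
        "<tr>" + "".join(f"<td>{html.escape(str(columns[name][i]))}</td>" for name in names) + "</tr>"
        for i in range(n_rows)
    )
    return f"<table><tr>{head}</tr>{body}</table>"


table = {"a": [1, 2, 3], "b": [4, 5, 6], "c": [7, 8, 9]}
print(top_frame_html(table, max_rows=2, max_cols=2))
# <table><tr><th>a</th><th>b</th></tr><tr><td>1</td><td>4</td></tr><tr><td>2</td><td>5</td></tr></table>
```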