flytekit.types.structured
Flytekit StructuredDataset
.. currentmodule:: flytekit.types.structured
.. autosummary::
:template: custom.rst
:toctree: generated/
StructuredDataset
StructuredDatasetDecoder
StructuredDatasetEncoder
Directory
Classes
Errors
flytekit.types.structured.ArrowRenderer
Render an Arrow dataframe as an HTML table.
Methods
to_html()
def to_html(
df: pyarrow.Table,
):
Parameter |
Type |
df |
pyarrow.Table |
flytekit.types.structured.DuplicateHandlerError
Inappropriate argument value (of correct type).
flytekit.types.structured.StructuredDataset
This is the user-facing StructuredDataset class. Do not confuse it with the literals.StructuredDataset
class, which is just a model: a Python class representation of the protobuf.
def StructuredDataset(
dataframe: typing.Optional[typing.Any],
uri: typing.Optional[str],
metadata: typing.Optional[literals.StructuredDatasetMetadata],
kwargs,
):
Parameter |
Type |
dataframe |
typing.Optional[typing.Any] |
uri |
typing.Optional[str] |
metadata |
typing.Optional[literals.StructuredDatasetMetadata] |
kwargs |
**kwargs |
Methods
all()
column_names()
columns()
deserialize_structured_dataset()
def deserialize_structured_dataset(
args,
kwargs,
):
Parameter |
Type |
args |
*args |
kwargs |
**kwargs |
from_dict()
def from_dict(
d,
dialect,
):
from_json()
def from_json(
data: typing.Union[str, bytes, bytearray],
decoder: collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]],
from_dict_kwargs: typing.Any,
):
Parameter |
Type |
data |
typing.Union[str, bytes, bytearray] |
decoder |
collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]] |
from_dict_kwargs |
typing.Any |
iter()
open()
def open(
dataframe_type: Type[DF],
):
Parameter |
Type |
dataframe_type |
Type[DF] |
serialize_structured_dataset()
def serialize_structured_dataset(
args,
kwargs,
):
Parameter |
Type |
args |
*args |
kwargs |
**kwargs |
set_literal()
def set_literal(
ctx: FlyteContext,
expected: LiteralType,
):
A public wrapper method to set the StructuredDataset Literal.
This method provides external access to the internal _set_literal method.
Parameter |
Type |
ctx |
FlyteContext |
expected |
LiteralType |
to_dict()
to_json()
def to_json(
encoder: collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]],
to_dict_kwargs: typing.Any,
):
Parameter |
Type |
encoder |
collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]] |
to_dict_kwargs |
typing.Any |
Properties
Property |
Type |
Description |
dataframe |
|
|
literal |
|
|
metadata |
|
|
flytekit.types.structured.StructuredDatasetDecoder
Helper class that provides a standard way to create an ABC using
inheritance.
def StructuredDatasetDecoder(
python_type: Type[DF],
protocol: Optional[str],
supported_format: Optional[str],
additional_protocols: Optional[List[str]],
):
Extend this abstract class, implement the decode function, and register your concrete class with the
StructuredDatasetTransformerEngine class in order for the core flytekit type engine to handle
dataframe libraries. This is the decoder interface, meaning it is used when there is a Flyte Literal value,
and we have to get a Python value out of it. For the other way, see the StructuredDatasetEncoder.
Parameter |
Type |
python_type |
Type[DF] |
protocol |
Optional[str] |
supported_format |
Optional[str] |
additional_protocols |
Optional[List[str]] |
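The extend-and-implement pattern described above can be sketched without flytekit. This is a standalone model, not the library's class: `SketchDecoder`, `CsvRowsDecoder`, and `demo` are hypothetical names, and the simplified `decode(uri, columns)` signature is an assumption that stands in for the real `decode(ctx, flyte_value, current_task_metadata)`.

```python
import csv
import os
import tempfile
from abc import ABC, abstractmethod
from typing import Dict, List, Optional


class SketchDecoder(ABC):
    """Hypothetical stand-in for the decoder interface; not flytekit's class."""

    def __init__(self, python_type: type, protocol: Optional[str] = None,
                 supported_format: Optional[str] = None) -> None:
        self.python_type = python_type
        self.protocol = protocol
        self.supported_format = supported_format

    @abstractmethod
    def decode(self, uri: str, columns: List[str]):
        """Read the data stored at `uri` and return a `python_type` instance."""


class CsvRowsDecoder(SketchDecoder):
    """Decodes a local CSV file into a list of row dictionaries."""

    def __init__(self) -> None:
        super().__init__(python_type=list, protocol="file", supported_format="csv")

    def decode(self, uri: str, columns: List[str]) -> List[Dict[str, str]]:
        with open(uri, newline="") as fh:
            rows = list(csv.DictReader(fh))
        if columns:
            # Keep only the columns the caller asked for.
            rows = [{name: row[name] for name in columns} for row in rows]
        return rows


def demo() -> List[Dict[str, str]]:
    # Write a small CSV to a temporary file, then decode only col_b.
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "data.csv")
        with open(path, "w", newline="") as fh:
            fh.write("col_a,col_b\n1,2.5\n3,4.5\n")
        return CsvRowsDecoder().decode(path, ["col_b"])


print(demo())
```

A real decoder would read its location and column information from the Flyte Literal and the task metadata rather than from plain arguments.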
Methods
Method |
Description |
decode() |
This is code that will be called by the dataset transformer engine to ultimately translate from a Flyte Literal value into a Python instance. |
decode()
def decode(
ctx: FlyteContext,
flyte_value: literals.StructuredDataset,
current_task_metadata: StructuredDatasetMetadata,
):
This is code that will be called by the dataset transformer engine to ultimately translate from a Flyte Literal
value into a Python instance.
Parameter |
Type |
ctx |
FlyteContext |
flyte_value |
literals.StructuredDataset |
current_task_metadata |
StructuredDatasetMetadata |
Properties
Property |
Type |
Description |
protocol |
|
|
python_type |
|
|
supported_format |
|
|
flytekit.types.structured.StructuredDatasetEncoder
Helper class that provides a standard way to create an ABC using
inheritance.
def StructuredDatasetEncoder(
python_type: Type[T],
protocol: Optional[str],
supported_format: Optional[str],
):
Extend this abstract class, implement the encode function, and register your concrete class with the
StructuredDatasetTransformerEngine class in order for the core flytekit type engine to handle
dataframe libraries. This is the encoding interface, meaning it is used when there is a Python value that the
flytekit type engine is trying to convert into a Flyte Literal. For the other way, see
the StructuredDatasetDecoder.
Parameter |
Type |
python_type |
Type[T] |
protocol |
Optional[str] |
supported_format |
Optional[str] |
Methods
Method |
Description |
encode() |
Even if the user code returns a plain dataframe instance, the dataset transformer engine will wrap the incoming dataframe with defaults set for that dataframe type. |
encode()
def encode(
ctx: FlyteContext,
structured_dataset: StructuredDataset,
structured_dataset_type: StructuredDatasetType,
):
Even if the user code returns a plain dataframe instance, the dataset transformer engine will wrap the
incoming dataframe with defaults set for that dataframe
type. This simplifies this function’s interface as a lot of data that could be specified by the user using
the
Parameter |
Type |
ctx |
FlyteContext |
structured_dataset |
StructuredDataset |
structured_dataset_type |
StructuredDatasetType |
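The wrapping behavior described for encode() means an encoder only ever has to handle the wrapper type. The sketch below models that idea in plain Python; `SketchDataset`, `ensure_wrapped`, and `encode_sketch` are hypothetical names, and passing the default format as an argument is an assumption made for illustration, not how flytekit resolves its defaults.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class SketchDataset:
    """Hypothetical stand-in for the user-facing StructuredDataset."""
    dataframe: Optional[Any] = None
    uri: Optional[str] = None
    file_format: Optional[str] = None


def ensure_wrapped(value: Any, default_format: str) -> SketchDataset:
    # A value that is already wrapped keeps the settings the user chose.
    if isinstance(value, SketchDataset):
        return value
    # A plain dataframe-like value is wrapped with the defaults for its type.
    return SketchDataset(dataframe=value, file_format=default_format)


def encode_sketch(dataset: SketchDataset) -> str:
    # The encoder receives the wrapper in both cases, never the bare value.
    return f"{type(dataset.dataframe).__name__} as {dataset.file_format}"


plain = [{"col_a": 1}]
print(encode_sketch(ensure_wrapped(plain, default_format="csv")))
explicit = SketchDataset(dataframe=plain, file_format="json")
print(encode_sketch(ensure_wrapped(explicit, default_format="csv")))
```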
Properties
Property |
Type |
Description |
protocol |
|
|
python_type |
|
|
supported_format |
|
|
flytekit.types.structured.StructuredDatasetTransformerEngine
Think of this transformer as a higher-level meta transformer that is used for all the dataframe types.
If you are bringing a custom dataframe type, or any dataframe type, to flytekit, you should register
with this transformer instead of registering with the main type engine.
def StructuredDatasetTransformerEngine()
Methods
assert_type()
def assert_type(
t: Type[StructuredDataset],
v: typing.Any,
):
Parameter |
Type |
t |
Type[StructuredDataset] |
v |
typing.Any |
async_to_literal()
def async_to_literal(
ctx: FlyteContext,
python_val: Union[StructuredDataset, typing.Any],
python_type: Union[Type[StructuredDataset], Type],
expected: LiteralType,
):
Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type.
Implementers should refrain from using type(python_val) and instead rely on the passed-in python_type. If these
do not match (or are not allowed), the Transformer implementer should raise an AssertionError that clearly states
what the mismatch was.
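A minimal sketch of that rule, in plain Python and independent of flytekit: the handler is selected from the declared `python_type`, and a mismatch raises an `AssertionError` that names both types. `to_literal_sketch` and the `encoders` mapping are hypothetical, and the returned string merely stands in for a Flyte Literal.

```python
from typing import Any, Callable, Dict


def to_literal_sketch(python_val: Any, python_type: type,
                      encoders: Dict[type, Callable[[Any], str]]) -> str:
    # Dispatch on the declared type, not on type(python_val).
    if python_type not in encoders:
        raise AssertionError(
            f"No encoder registered for declared type {python_type.__name__}"
        )
    if not isinstance(python_val, python_type):
        # State clearly what the mismatch was.
        raise AssertionError(
            f"Declared type is {python_type.__name__} but the value is a "
            f"{type(python_val).__name__}"
        )
    return encoders[python_type](python_val)


encoders = {list: lambda rows: f"{len(rows)} rows"}
print(to_literal_sketch([1, 2], list, encoders))

try:
    to_literal_sketch("ab", list, encoders)
except AssertionError as err:
    print("rejected:", err)
```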
Parameter |
Type |
ctx |
FlyteContext |
python_val |
Union[StructuredDataset, typing.Any] |
python_type |
Union[Type[StructuredDataset], Type] |
expected |
LiteralType |
async_to_python_value()
def async_to_python_value(
ctx: FlyteContext,
lv: Literal,
expected_python_type: Type[T] | StructuredDataset,
):
The only tricky part of converting a Literal (say, the output of an earlier task) to a Python value at
the start of a task execution is the column subsetting behavior. For example, given
    def t1() -> Annotated[StructuredDataset, kwtypes(col_a=int, col_b=float)]: ...
    def t2(in_a: Annotated[StructuredDataset, kwtypes(col_b=float)]): ...
where t2(in_a=t1()), when t2 calls in_a.open(pd.DataFrame).all(), it should get a DataFrame
with only one column.
+-----------------------------+-----------------------------------------+--------------------------------------+
|                             | StructuredDatasetType of the incoming Literal                                  |
+-----------------------------+-----------------------------------------+--------------------------------------+
| StructuredDatasetType       | Has columns defined                     | [] columns or None                   |
| of currently running task   |                                         |                                      |
+=============================+=========================================+======================================+
| Has columns                 | The StructuredDatasetType passed to the decoder will have the columns          |
| defined                     | as defined by the type annotation of the currently running task.               |
|                             |                                                                                |
|                             | Decoders should then subset the incoming data to the columns requested.        |
|                             |                                                                                |
+-----------------------------+-----------------------------------------+--------------------------------------+
| [] columns or None          | StructuredDatasetType passed to decoder | StructuredDatasetType passed to the  |
|                             | will have the columns from the incoming | decoder will have an empty list of   |
|                             | Literal. This is the scenario where     | columns.                             |
|                             | the Literal returned by the running     |                                      |
|                             | task will have more information than    |                                      |
|                             | the running task’s signature.           |                                      |
+-----------------------------+-----------------------------------------+--------------------------------------+
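The decision table above reduces to a small function. The sketch below is a plain-Python model of it; `columns_passed_to_decoder` is a hypothetical helper, not a flytekit function, and columns are represented as simple lists of names rather than StructuredDatasetType objects.

```python
from typing import List, Optional


def columns_passed_to_decoder(
    task_columns: Optional[List[str]],
    literal_columns: Optional[List[str]],
) -> List[str]:
    """Return the column names the decoder would be handed."""
    if task_columns:
        # The running task declares columns: its annotation wins, and the
        # decoder is expected to subset the incoming data accordingly.
        return list(task_columns)
    if literal_columns:
        # The task declares nothing, so the incoming Literal's columns are used.
        return list(literal_columns)
    # Neither side declares columns.
    return []


# t1 produces col_a and col_b; t2 asks only for col_b.
print(columns_passed_to_decoder(["col_b"], ["col_a", "col_b"]))
# The running task declares no columns, the Literal does.
print(columns_passed_to_decoder(None, ["col_a", "col_b"]))
# No column information on either side.
print(columns_passed_to_decoder([], None))
```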
Parameter |
Type |
ctx |
FlyteContext |
lv |
Literal |
expected_python_type |
Type[T] | StructuredDataset |
dict_to_structured_dataset()
def dict_to_structured_dataset(
dict_obj: typing.Dict[str, str],
expected_python_type: Type[T] | StructuredDataset,
):
Parameter |
Type |
dict_obj |
typing.Dict[str, str] |
expected_python_type |
Type[T] | StructuredDataset |
encode()
def encode(
ctx: FlyteContext,
sd: StructuredDataset,
df_type: Type,
protocol: str,
format: str,
structured_literal_type: StructuredDatasetType,
):
Parameter |
Type |
ctx |
FlyteContext |
sd |
StructuredDataset |
df_type |
Type |
protocol |
str |
format |
str |
structured_literal_type |
StructuredDatasetType |
from_binary_idl()
def from_binary_idl(
binary_idl_object: Binary,
expected_python_type: Type[T] | StructuredDataset,
):
If the input is from flytekit, the life cycle is as follows:
Life Cycle:
binary IDL                            -> resolved binary        -> bytes                  -> expected Python object
(flytekit customized serialization)      (propeller processing)    (flytekit binary IDL)     (flytekit customized deserialization)
Example Code:
    @dataclass
    class DC:
        sd: StructuredDataset

    @workflow
    def wf(dc: DC):
        t_sd(dc.sd)
Note:
- Deserialization works the same way as for a structured dataset placed in a dataclass, which is deserialized through mashumaro’s API.
Related PR:
Parameter |
Type |
binary_idl_object |
Binary |
expected_python_type |
Type[T] | StructuredDataset |
from_generic_idl()
def from_generic_idl(
generic: Struct,
expected_python_type: Type[T] | StructuredDataset,
):
If the input is from Flyte Console, the life cycle is as follows:
Life Cycle:
json str             -> protobuf struct  -> resolved protobuf struct  -> expected Python object
(console user input)    (console output)    (propeller)                  (flytekit customized deserialization)
Example Code:
    @dataclass
    class DC:
        sd: StructuredDataset

    @workflow
    def wf(dc: DC):
        t_sd(dc.sd)
Note:
- Deserialization works the same way as for a structured dataset placed in a dataclass, which is deserialized through mashumaro’s API.
Related PR:
Parameter |
Type |
generic |
Struct |
expected_python_type |
Type[T] | StructuredDataset |
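The console life cycle described for from_generic_idl() can be approximated in plain Python to show its overall shape. This sketch skips the protobuf struct and propeller steps entirely and goes straight from a JSON string to a dataclass. `SketchSD`, `dc_from_console_json`, the `uri` and `file_format` field names, and the bucket path are all assumptions made for illustration.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchSD:
    """Hypothetical stand-in for StructuredDataset (field names assumed)."""
    uri: Optional[str] = None
    file_format: Optional[str] = None


@dataclass
class DC:
    sd: SketchSD


def dc_from_console_json(raw: str) -> DC:
    payload = json.loads(raw)                # json str -> plain dict
    return DC(sd=SketchSD(**payload["sd"]))  # dict -> expected Python object


# Hypothetical console input for a workflow that takes `dc: DC`.
raw = '{"sd": {"uri": "s3://my-bucket/data.parquet", "file_format": "parquet"}}'
dc = dc_from_console_json(raw)
print(dc.sd.uri, dc.sd.file_format)
```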
get_decoder()
def get_decoder(
df_type: Type,
protocol: str,
format: str,
):
Parameter |
Type |
df_type |
Type |
protocol |
str |
format |
str |
get_encoder()
def get_encoder(
df_type: Type,
protocol: str,
format: str,
):
Parameter |
Type |
df_type |
Type |
protocol |
str |
format |
str |
get_literal_type()
def get_literal_type(
t: typing.Union[Type[StructuredDataset], typing.Any],
):
Provide a concrete implementation so that writers of custom dataframe handlers don’t have to, since there’s
nothing special about the literal type. Any dataframe type will always be associated with the structured dataset type.
The other aspects of it (columns, external schema type, etc.) can be read from the associated metadata.
Parameter |
Type |
t |
typing.Union[Type[StructuredDataset], typing.Any] |
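One way to picture "read from associated metadata" is to split an `Annotated` type into its base type and its column mapping using only the standard `typing` helpers. The sketch below is an illustration under assumptions: `SketchDataset` and `split_annotation` are hypothetical, and an `OrderedDict` of column names to types stands in for what `kwtypes(...)` attaches to the annotation.

```python
from collections import OrderedDict
from typing import Annotated, Any, Tuple, get_args, get_origin


class SketchDataset:
    """Hypothetical stand-in for StructuredDataset."""


def split_annotation(t: Any) -> Tuple[Any, OrderedDict]:
    """Return (base type, column mapping); the mapping is empty if absent."""
    columns: OrderedDict = OrderedDict()
    if get_origin(t) is Annotated:
        base, *extras = get_args(t)
        for extra in extras:
            # Column information travels as annotation metadata.
            if isinstance(extra, OrderedDict):
                columns = extra
        return base, columns
    return t, columns


annotated = Annotated[SketchDataset, OrderedDict(col_a=int, col_b=float)]
base, cols = split_annotation(annotated)
print(base.__name__, list(cols))

# An un-annotated type has the same base type and no column information.
base_plain, cols_plain = split_annotation(SketchDataset)
print(base_plain.__name__, list(cols_plain))
```

In both cases the base type is the same, which mirrors the point that the literal type itself carries nothing dataframe-specific.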
guess_python_type()
def guess_python_type(
literal_type: LiteralType,
):
Converts the Flyte LiteralType to a python object type.
Parameter |
Type |
literal_type |
LiteralType |
isinstance_generic()
def isinstance_generic(
obj,
generic_alias,
):
Parameter |
Type |
obj |
|
generic_alias |
|
iter_as()
def iter_as(
ctx: FlyteContext,
sd: literals.StructuredDataset,
df_type: Type[DF],
updated_metadata: StructuredDatasetMetadata,
):
Parameter |
Type |
ctx |
FlyteContext |
sd |
literals.StructuredDataset |
df_type |
Type[DF] |
updated_metadata |
StructuredDatasetMetadata |
open_as()
def open_as(
ctx: FlyteContext,
sd: literals.StructuredDataset,
df_type: Type[DF],
updated_metadata: StructuredDatasetMetadata,
):
Parameter |
Type |
ctx |
FlyteContext |
sd |
literals.StructuredDataset |
df_type |
Type[DF] |
updated_metadata |
StructuredDatasetMetadata |
register()
def register(
h: Handlers,
default_for_type: bool,
override: bool,
default_format_for_type: bool,
default_storage_for_type: bool,
):
Call this with any Encoder or Decoder to register it with the flytekit type system. If your handler does not
specify a protocol (e.g. s3, gs, etc.) field, then
Parameter |
Type |
h |
Handlers |
default_for_type |
bool |
override |
bool |
default_format_for_type |
bool |
default_storage_for_type |
bool |
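The interplay between register(), the `override` flag, get_encoder()/get_decoder(), and DuplicateHandlerError can be modelled with a small registry keyed by dataframe type, protocol, and format. This is a sketch under assumptions, not flytekit's implementation: `HandlerRegistry` and `DuplicateHandlerSketchError` are hypothetical, the protocol is always given explicitly, and the default-selection flags are left out.

```python
from typing import Any, Dict, Tuple


class DuplicateHandlerSketchError(ValueError):
    """Hypothetical analogue of DuplicateHandlerError."""


class HandlerRegistry:
    def __init__(self) -> None:
        self._handlers: Dict[Tuple[type, str, str], Any] = {}

    def register(self, df_type: type, protocol: str, fmt: str,
                 handler: Any, override: bool = False) -> None:
        key = (df_type, protocol, fmt)
        if key in self._handlers and not override:
            raise DuplicateHandlerSketchError(
                f"A handler for {df_type.__name__}/{protocol}/{fmt} already exists"
            )
        self._handlers[key] = handler

    def get(self, df_type: type, protocol: str, fmt: str) -> Any:
        try:
            return self._handlers[(df_type, protocol, fmt)]
        except KeyError:
            raise ValueError(
                f"No handler for {df_type.__name__}/{protocol}/{fmt}"
            ) from None


registry = HandlerRegistry()
registry.register(list, "s3", "csv", "first handler")
try:
    registry.register(list, "s3", "csv", "second handler")
except DuplicateHandlerSketchError as err:
    print("rejected:", err)

# With override=True the existing entry is replaced.
registry.register(list, "s3", "csv", "second handler", override=True)
print(registry.get(list, "s3", "csv"))
```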
register_for_protocol()
def register_for_protocol(
h: Handlers,
protocol: str,
default_for_type: bool,
override: bool,
default_format_for_type: bool,
default_storage_for_type: bool,
):
See the main register function instead.
Parameter |
Type |
h |
Handlers |
protocol |
str |
default_for_type |
bool |
override |
bool |
default_format_for_type |
bool |
default_storage_for_type |
bool |
register_renderer()
def register_renderer(
python_type: Type,
renderer: Renderable,
):
Parameter |
Type |
python_type |
Type |
renderer |
Renderable |
to_html()
def to_html(
ctx: FlyteContext,
python_val: typing.Any,
expected_python_type: Type[T],
):
Converts any Python value (dataframe, int, float) to an HTML string, which is wrapped in an HTML div.
Parameter |
Type |
ctx |
FlyteContext |
python_val |
typing.Any |
expected_python_type |
Type[T] |
to_literal()
def to_literal(
ctx: FlyteContext,
python_val: typing.Any,
python_type: Type[T],
expected: LiteralType,
):
Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type.
Implementers should refrain from using type(python_val) and instead rely on the passed-in python_type. If these
do not match (or are not allowed), the Transformer implementer should raise an AssertionError that clearly states
what the mismatch was.
Parameter |
Type |
ctx |
FlyteContext |
python_val |
typing.Any |
python_type |
Type[T] |
expected |
LiteralType |
to_python_value()
def to_python_value(
ctx: FlyteContext,
lv: Literal,
expected_python_type: Type[T],
):
Converts the given Literal to a value of the expected Python type. If the conversion cannot be done, an AssertionError should be raised.
Parameter |
Type |
ctx |
FlyteContext |
lv |
Literal |
expected_python_type |
Type[T] |
Properties
Property |
Type |
Description |
is_async |
|
|
name |
|
|
python_type |
|
|
type_assertions_enabled |
|
|
flytekit.types.structured.TopFrameRenderer
Render a DataFrame as an HTML table.
def TopFrameRenderer(
max_rows: int,
max_cols: int,
):
Parameter |
Type |
max_rows |
int |
max_cols |
int |
Methods
to_html()
def to_html(
df: pandas.DataFrame,
):
Parameter |
Type |
df |
pandas.DataFrame |