flytekit.types.structured.structured_dataset
Directory
Classes
Errors
flytekit.types.structured.structured_dataset.ABC
Helper class that provides a standard way to create an ABC using
inheritance.
flytekit.types.structured.structured_dataset.Annotated
Add context-specific metadata to a type.
Example: Annotated[int, runtime_check.Unsigned] indicates to the
hypothetical runtime_check module that this type is an unsigned int.
Every other consumer of this type can ignore this metadata and treat
this type as int.
The first argument to Annotated must be a valid type.
Details:
- It’s an error to call
Annotated
with less than two arguments.
- Access the metadata via the
__metadata__
attribute::
assert Annotated[int, '$'].__metadata__ == ('$',)
- Nested Annotated types are flattened::
assert Annotated[Annotated[T, Ann1, Ann2], Ann3] == Annotated[T, Ann1, Ann2, Ann3]
- Instantiating an annotated type is equivalent to instantiating the
underlying type::
assert Annotated[C, Ann1](5) == C(5)
- Annotated can be used as a generic type alias::
type Optimized[T] = Annotated[T, runtime.Optimize()]
type checker will treat Optimized[int]
as equivalent to Annotated[int, runtime.Optimize()]
type OptimizedList[T] = Annotated[list[T], runtime.Optimize()]
type checker will treat OptimizedList[int]
as equivalent to Annotated[list[int], runtime.Optimize()]
- Annotated cannot be used with an unpacked TypeVarTuple::
type Variadic[*Ts] = Annotated[*Ts, Ann1] # NOT valid
This would be equivalent to::
Annotated[T1, T2, T3, …, Ann1]
where T1, T2 etc. are TypeVars, which would be invalid, because
only one type should be passed to Annotated.
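Within this module, Annotated is most often used to attach column metadata to a StructuredDataset type. A minimal sketch, assuming pandas is installed; the task name is illustrative only:
.. code-block:: python

    import pandas as pd
    from typing import Annotated
    from flytekit import kwtypes, task
    from flytekit.types.structured import StructuredDataset

    # The kwtypes(...) annotation is the metadata consumed by the structured
    # dataset transformer; other consumers of the type may ignore it.
    @task
    def make_df() -> Annotated[StructuredDataset, kwtypes(name=str, age=int)]:
        return StructuredDataset(dataframe=pd.DataFrame({"name": ["a"], "age": [1]}))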
flytekit.types.structured.structured_dataset.AsyncTypeTransformer
Base transformer type that should be implemented for every python native type that can be handled by flytekit.
def AsyncTypeTransformer(
name: str,
t: Type[T],
enable_type_assertions: bool,
):
Parameter | Type
name | str
t | Type[T]
enable_type_assertions | bool
Methods
assert_type()
def assert_type(
t: Type[T],
v: T,
):
Parameter |
Type |
t |
Type[T] |
v |
T |
async_to_literal()
def async_to_literal(
ctx: FlyteContext,
python_val: T,
python_type: Type[T],
expected: LiteralType,
):
Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type.
Implementers should refrain from using type(python_val); instead, rely on the passed-in python_type. If these
do not match (or are not allowed), the transformer implementer should raise an AssertionError that clearly states
the mismatch.
Parameter |
Type |
ctx |
FlyteContext |
python_val |
T |
python_type |
Type[T] |
expected |
LiteralType |
async_to_python_value()
def async_to_python_value(
ctx: FlyteContext,
lv: Literal,
expected_python_type: Type[T],
):
Converts the given Literal to a Python type. If the conversion cannot be done, an AssertionError should be raised.
Parameter |
Type |
ctx |
FlyteContext |
lv |
Literal |
expected_python_type |
Type[T] |
from_binary_idl()
def from_binary_idl(
binary_idl_object: Binary,
expected_python_type: Type[T],
):
This function primarily handles deserialization for untyped dicts, dataclasses, Pydantic BaseModels, and attribute access.
For untyped dicts, dataclasses, and Pydantic BaseModels, the life cycle (untyped dict as an example) is:
python val -> msgpack bytes -> binary literal scalar -> msgpack bytes -> python val
(to_literal covers the first half; from_binary_idl covers the second half)
For attribute access, the life cycle is:
python val -> msgpack bytes -> binary literal scalar -> resolved golang value -> binary literal scalar -> msgpack bytes -> python val
(to_literal produces the first binary literal scalar, propeller performs the attribute access, and from_binary_idl converts the result back to a python val)
Parameter |
Type |
binary_idl_object |
Binary |
expected_python_type |
Type[T] |
from_generic_idl()
def from_generic_idl(
generic: Struct,
expected_python_type: Type[T],
):
TODO: Support all Flyte Types.
This is for dataclass attribute access from input created from the Flyte Console.
Note:
- This can be removed in the future once the Flyte Console supports generating a Binary IDL Scalar as input.
Parameter |
Type |
generic |
Struct |
expected_python_type |
Type[T] |
get_literal_type()
def get_literal_type(
t: Type[T],
):
Converts the python type to a Flyte LiteralType
guess_python_type()
def guess_python_type(
literal_type: LiteralType,
):
Converts the Flyte LiteralType to a python object type.
Parameter |
Type |
literal_type |
LiteralType |
isinstance_generic()
def isinstance_generic(
obj,
generic_alias,
):
Parameter |
Type |
obj |
|
generic_alias |
|
to_html()
def to_html(
ctx: FlyteContext,
python_val: T,
expected_python_type: Type[T],
):
Converts any python val (dataframe, int, float) to an HTML string, which will be wrapped in an HTML div.
Parameter |
Type |
ctx |
FlyteContext |
python_val |
T |
expected_python_type |
Type[T] |
to_literal()
def to_literal(
ctx: FlyteContext,
python_val: typing.Any,
python_type: Type[T],
expected: LiteralType,
):
Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type.
Implementers should refrain from using type(python_val); instead, rely on the passed-in python_type. If these
do not match (or are not allowed), the transformer implementer should raise an AssertionError that clearly states
the mismatch.
Parameter |
Type |
ctx |
FlyteContext |
python_val |
typing.Any |
python_type |
Type[T] |
expected |
LiteralType |
to_python_value()
def to_python_value(
ctx: FlyteContext,
lv: Literal,
expected_python_type: Type[T],
):
Converts the given Literal to a Python type. If the conversion cannot be done, an AssertionError should be raised.
Parameter |
Type |
ctx |
FlyteContext |
lv |
Literal |
expected_python_type |
Type[T] |
Properties
is_async, name, python_type, type_assertions_enabled
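A hedged, minimal sketch of what a custom AsyncTypeTransformer subclass can look like, for a hypothetical MyPoint class stored as a string primitive; the class, the "x,y" encoding, and the names used are illustrative assumptions, not part of flytekit:
.. code-block:: python

    from typing import Type

    from flytekit import FlyteContext
    from flytekit.core.type_engine import AsyncTypeTransformer, TypeEngine
    from flytekit.models.literals import Literal, Primitive, Scalar
    from flytekit.models.types import LiteralType, SimpleType

    class MyPoint:
        def __init__(self, x: float, y: float):
            self.x, self.y = x, y

    class MyPointTransformer(AsyncTypeTransformer[MyPoint]):
        def __init__(self):
            super().__init__(name="MyPoint", t=MyPoint)

        def get_literal_type(self, t: Type[MyPoint]) -> LiteralType:
            # MyPoint is serialized as a plain string literal.
            return LiteralType(simple=SimpleType.STRING)

        async def async_to_literal(
            self, ctx: FlyteContext, python_val: MyPoint, python_type: Type[MyPoint], expected: LiteralType
        ) -> Literal:
            return Literal(scalar=Scalar(primitive=Primitive(string_value=f"{python_val.x},{python_val.y}")))

        async def async_to_python_value(
            self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[MyPoint]
        ) -> MyPoint:
            x, y = lv.scalar.primitive.string_value.split(",")
            return MyPoint(float(x), float(y))

    # Register with the main type engine. Dataframe types should instead be
    # registered with the StructuredDatasetTransformerEngine, described later on this page.
    TypeEngine.register(MyPointTransformer())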
flytekit.types.structured.structured_dataset.Binary
def Binary(
value,
tag,
):
Methods
from_flyte_idl()
def from_flyte_idl(
pb2_object,
):
Parameter |
Type |
pb2_object |
|
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
is_empty, tag, value
flytekit.types.structured.structured_dataset.DataClassJSONMixin
Methods
from_dict()
def from_dict(
d,
dialect,
):
from_json()
def from_json(
data: typing.Union[str, bytes, bytearray],
decoder: collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]],
from_dict_kwargs: typing.Any,
):
Parameter |
Type |
data |
typing.Union[str, bytes, bytearray] |
decoder |
collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]] |
from_dict_kwargs |
typing.Any |
to_dict()
to_json()
def to_json(
encoder: collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]],
to_dict_kwargs: typing.Any,
):
Parameter |
Type |
encoder |
collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]] |
to_dict_kwargs |
typing.Any |
flytekit.types.structured.structured_dataset.DuplicateHandlerError
Inappropriate argument value (of correct type).
flytekit.types.structured.structured_dataset.FlyteContext
This is an internal-facing context object that most users will not have to deal with. It's essentially a globally
available grab bag of settings and objects that allows flytekit to do things like convert complex types, run and
compile workflows, serialize Flyte entities, etc.
Even though this object has a current_context function on it, it should not be called directly. Please use the
flytekit.FlyteContextManager object instead.
Please do not confuse this object with the flytekit.ExecutionParameters object.
def FlyteContext(
file_access: FileAccessProvider,
level: int,
flyte_client: Optional['friendly_client.SynchronousFlyteClient'],
compilation_state: Optional[CompilationState],
execution_state: Optional[ExecutionState],
serialization_settings: Optional[SerializationSettings],
in_a_condition: bool,
origin_stackframe: Optional[traceback.FrameSummary],
output_metadata_tracker: Optional[OutputMetadataTracker],
worker_queue: Optional[Controller],
):
Parameter | Type
file_access | FileAccessProvider
level | int
flyte_client | Optional['friendly_client.SynchronousFlyteClient']
compilation_state | Optional[CompilationState]
execution_state | Optional[ExecutionState]
serialization_settings | Optional[SerializationSettings]
in_a_condition | bool
origin_stackframe | Optional[traceback.FrameSummary]
output_metadata_tracker | Optional[OutputMetadataTracker]
worker_queue | Optional[Controller]
Methods
current_context()
This method exists only to maintain backwards compatibility. Please use
FlyteContextManager.current_context() instead.
Users of flytekit should be careful not to confuse the object returned from this function
with flytekit.current_context.
enter_conditional_section()
def enter_conditional_section()
get_deck()
Returns the deck that was created as part of the last execution.
The return value depends on the execution environment. In a notebook, the return value is compatible with
IPython.display and should be rendered in the notebook.
.. code-block:: python
with flytekit.new_context() as ctx:
    my_task(...)
ctx.get_deck()
OR if you wish to explicitly display
.. code-block:: python
from IPython import display
display(ctx.get_deck())
get_origin_stackframe_repr()
def get_origin_stackframe_repr()
new_builder()
new_compilation_state()
def new_compilation_state(
prefix: str,
):
Creates and returns a default compilation state. For most code, this should be the entry point
of compilation; otherwise, with_compilation_state should always be used.
Parameter |
Type |
prefix |
str |
new_execution_state()
def new_execution_state(
working_dir: Optional[os.PathLike],
):
Creates and returns a new default execution state. This should be used at the entry point of execution;
in all other cases it is preferable to use with_execution_state.
Parameter |
Type |
working_dir |
Optional[os.PathLike] |
set_stackframe()
def set_stackframe(
s: traceback.FrameSummary,
):
Parameter |
Type |
s |
traceback.FrameSummary |
with_client()
def with_client(
c: SynchronousFlyteClient,
):
Parameter |
Type |
c |
SynchronousFlyteClient |
with_compilation_state()
def with_compilation_state(
c: CompilationState,
):
Parameter |
Type |
c |
CompilationState |
with_execution_state()
def with_execution_state(
es: ExecutionState,
):
Parameter |
Type |
es |
ExecutionState |
with_file_access()
def with_file_access(
fa: FileAccessProvider,
):
Parameter |
Type |
fa |
FileAccessProvider |
with_new_compilation_state()
def with_new_compilation_state()
def with_output_metadata_tracker(
t: OutputMetadataTracker,
):
Parameter |
Type |
t |
OutputMetadataTracker |
with_serialization_settings()
def with_serialization_settings(
ss: SerializationSettings,
):
Parameter |
Type |
ss |
SerializationSettings |
with_worker_queue()
def with_worker_queue(
wq: Controller,
):
Parameter |
Type |
wq |
Controller |
Properties
user_space_params
flytekit.types.structured.structured_dataset.FlyteContextManager
FlyteContextManager manages the execution context within Flytekit. It holds the global state of either compilation
or execution. It is not thread-safe and can currently only be run as a single-threaded application.
Contexts within Flytekit are useful for managing compilation state and execution state. Refer to CompilationState
and ExecutionState for more information. FlyteContextManager provides a singleton stack to manage these contexts.
Typical usage is
.. code-block:: python
FlyteContextManager.initialize()
with FlyteContextManager.with_context(o) as ctx:
    pass
If required (though not recommended), you can use FlyteContextManager.push_context(),
but a corresponding FlyteContextManager.pop_context() should then be called.
Methods
add_signal_handler()
def add_signal_handler(
handler: typing.Callable[[int, FrameType], typing.Any],
):
Parameter |
Type |
handler |
typing.Callable[[int, FrameType], typing.Any] |
current_context()
get_origin_stackframe()
def get_origin_stackframe(
limit,
):
initialize()
Re-initializes the context and erases the entire context
pop_context()
push_context()
def push_context(
ctx: FlyteContext,
f: Optional[traceback.FrameSummary],
):
Parameter |
Type |
ctx |
FlyteContext |
f |
Optional[traceback.FrameSummary] |
size()
with_context()
def with_context(
b: FlyteContext.Builder,
):
Parameter |
Type |
b |
FlyteContext.Builder |
flytekit.types.structured.structured_dataset.Generic
Abstract base class for generic types.
On Python 3.12 and newer, generic classes implicitly inherit from
Generic when they declare a parameter list after the class’s name::
class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
Etc.
On older versions of Python, however, generic classes have to
explicitly inherit from Generic.
After a class has been declared to be generic, it can then be used as
follows::
def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default
flytekit.types.structured.structured_dataset.Literal
def Literal(
scalar: typing.Optional[flytekit.models.literals.Scalar],
collection: typing.Optional[flytekit.models.literals.LiteralCollection],
map: typing.Optional[flytekit.models.literals.LiteralMap],
hash: typing.Optional[str],
metadata: typing.Optional[typing.Dict[str, str]],
offloaded_metadata: typing.Optional[flytekit.models.literals.LiteralOffloadedMetadata],
):
This IDL message represents a literal value in the Flyte ecosystem.
Parameter | Type
scalar | typing.Optional[flytekit.models.literals.Scalar]
collection | typing.Optional[flytekit.models.literals.LiteralCollection]
map | typing.Optional[flytekit.models.literals.LiteralMap]
hash | typing.Optional[str]
metadata | typing.Optional[typing.Dict[str, str]]
offloaded_metadata | typing.Optional[flytekit.models.literals.LiteralOffloadedMetadata]
Methods
from_flyte_idl()
def from_flyte_idl(
pb2_object: flyteidl.core.literals_pb2.Literal,
):
Parameter |
Type |
pb2_object |
flyteidl.core.literals_pb2.Literal |
serialize_to_string()
def serialize_to_string()
def set_metadata(
metadata: typing.Dict[str, str],
):
Note: This is a mutation on the literal
Parameter |
Type |
metadata |
typing.Dict[str, str] |
short_string()
to_flyte_idl()
verbose_string()
Properties
collection, hash, is_empty, map, metadata, offloaded_metadata, scalar, value
flytekit.types.structured.structured_dataset.LiteralType
def LiteralType(
simple,
schema,
collection_type,
map_value_type,
blob,
enum_type,
union_type,
structured_dataset_type,
metadata,
structure,
annotation,
):
This is a oneof message, only one of the kwargs may be set, representing one of the Flyte types.
Parameters: simple, schema, collection_type, map_value_type, blob, enum_type, union_type, structured_dataset_type, metadata, structure, annotation
Methods
from_flyte_idl()
def from_flyte_idl(
proto,
):
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
annotation, blob, collection_type, enum_type, is_empty, map_value_type, metadata, schema, simple, structure, structured_dataset_type, union_type
flytekit.types.structured.structured_dataset.Renderable
Base class for protocol classes.
Protocol classes are defined as::
class Proto(Protocol):
    def meth(self) -> int:
        ...
Such classes are primarily used with static type checkers that recognize
structural subtyping (static duck-typing).
For example::
class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check
See PEP 544 for details. Protocol classes decorated with
@typing.runtime_checkable act as simple-minded runtime protocols that check
only the presence of given attributes, ignoring their type signatures.
Protocol classes can be generic, they are defined as::
class GenProto[T](Protocol):
    def meth(self) -> T:
        ...
def Renderable(
args,
kwargs,
):
Parameter | Type
args | *args
kwargs | **kwargs
Methods
Method | Description
to_html() | Convert an object (markdown, pandas.DataFrame) to HTML
to_html()
def to_html(
python_value: typing.Any,
):
Convert an object (markdown, pandas.DataFrame) to HTML and return the HTML as a unicode string.
Returns: an HTML document as a string.
Parameter |
Type |
python_value |
typing.Any |
flytekit.types.structured.structured_dataset.Scalar
def Scalar(
primitive: typing.Optional[flytekit.models.literals.Primitive],
blob: typing.Optional[flytekit.models.literals.Blob],
binary: typing.Optional[flytekit.models.literals.Binary],
schema: typing.Optional[flytekit.models.literals.Schema],
union: typing.Optional[flytekit.models.literals.Union],
none_type: typing.Optional[flytekit.models.literals.Void],
error: typing.Optional[flytekit.models.types.Error],
generic: typing.Optional[google.protobuf.struct_pb2.Struct],
structured_dataset: typing.Optional[flytekit.models.literals.StructuredDataset],
):
Scalar wrapper around Flyte types. Only one can be specified.
Parameter | Type
primitive | typing.Optional[flytekit.models.literals.Primitive]
blob | typing.Optional[flytekit.models.literals.Blob]
binary | typing.Optional[flytekit.models.literals.Binary]
schema | typing.Optional[flytekit.models.literals.Schema]
union | typing.Optional[flytekit.models.literals.Union]
none_type | typing.Optional[flytekit.models.literals.Void]
error | typing.Optional[flytekit.models.types.Error]
generic | typing.Optional[google.protobuf.struct_pb2.Struct]
structured_dataset | typing.Optional[flytekit.models.literals.StructuredDataset]
Methods
from_flyte_idl()
def from_flyte_idl(
pb2_object,
):
Parameter |
Type |
pb2_object |
|
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
binary, blob, error, generic, is_empty, none_type, primitive, schema, structured_dataset, union, value
flytekit.types.structured.structured_dataset.SchemaType
def SchemaType(
columns,
):
Methods
from_flyte_idl()
def from_flyte_idl(
proto,
):
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
columns, is_empty
flytekit.types.structured.structured_dataset.SerializableType
flytekit.types.structured.structured_dataset.Struct
A ProtocolMessage
Methods
get_or_create_list()
def get_or_create_list(
key,
):
Returns a list for this key, creating if it didn’t exist already.
get_or_create_struct()
def get_or_create_struct(
key,
):
Returns a struct for this key, creating if it didn’t exist already.
items()
keys()
update()
def update(
dictionary,
):
Parameter |
Type |
dictionary |
|
values()
flytekit.types.structured.structured_dataset.StructuredDataset
This is the user facing StructuredDataset class. Please don’t confuse it with the literals.StructuredDataset
class (that is just a model, a Python class representation of the protobuf).
def StructuredDataset(
dataframe: typing.Optional[typing.Any],
uri: typing.Optional[str],
metadata: typing.Optional[literals.StructuredDatasetMetadata],
kwargs,
):
Parameter | Type
dataframe | typing.Optional[typing.Any]
uri | typing.Optional[str]
metadata | typing.Optional[literals.StructuredDatasetMetadata]
kwargs | **kwargs
Methods
all()
column_names()
columns()
deserialize_structured_dataset()
def deserialize_structured_dataset(
args,
kwargs,
):
Parameter |
Type |
args |
*args |
kwargs |
**kwargs |
from_dict()
def from_dict(
d,
dialect,
):
from_json()
def from_json(
data: typing.Union[str, bytes, bytearray],
decoder: collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]],
from_dict_kwargs: typing.Any,
):
Parameter |
Type |
data |
typing.Union[str, bytes, bytearray] |
decoder |
collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]] |
from_dict_kwargs |
typing.Any |
iter()
open()
def open(
dataframe_type: Type[DF],
):
Parameter | Type
dataframe_type | Type[DF]
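A hedged usage sketch: open() selects the dataframe type to materialize, and all() (or iter()) performs the actual conversion; the task name below is illustrative only.
.. code-block:: python

    import pandas as pd
    from flytekit import task
    from flytekit.types.structured import StructuredDataset

    @task
    def consume(sd: StructuredDataset) -> int:
        # Materialize the structured dataset as a pandas DataFrame.
        df = sd.open(pd.DataFrame).all()
        return len(df)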
serialize_structured_dataset()
def serialize_structured_dataset(
args,
kwargs,
):
Parameter |
Type |
args |
*args |
kwargs |
**kwargs |
set_literal()
def set_literal(
ctx: FlyteContext,
expected: LiteralType,
):
A public wrapper method to set the StructuredDataset Literal.
This method provides external access to the internal _set_literal method.
Parameter |
Type |
ctx |
FlyteContext |
expected |
LiteralType |
to_dict()
to_json()
def to_json(
encoder: collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]],
to_dict_kwargs: typing.Any,
):
Parameter |
Type |
encoder |
collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]] |
to_dict_kwargs |
typing.Any |
Properties
dataframe, literal, metadata
flytekit.types.structured.structured_dataset.StructuredDatasetDecoder
Helper class that provides a standard way to create an ABC using
inheritance.
def StructuredDatasetDecoder(
python_type: Type[DF],
protocol: Optional[str],
supported_format: Optional[str],
additional_protocols: Optional[List[str]],
):
Extend this abstract class, implement the decode function, and register your concrete class with the
StructuredDatasetTransformerEngine class in order for the core flytekit type engine to handle
dataframe libraries. This is the decoder interface, meaning it is used when there is a Flyte Literal value,
and we have to get a Python value out of it. For the other way, see the StructuredDatasetEncoder
Parameter | Type
python_type | Type[DF]
protocol | Optional[str]
supported_format | Optional[str]
additional_protocols | Optional[List[str]]
Methods
Method |
Description |
decode() |
This is code that will be called by the dataset transformer engine to ultimately translate from a Flyte Literal |
decode()
def decode(
ctx: FlyteContext,
flyte_value: literals.StructuredDataset,
current_task_metadata: StructuredDatasetMetadata,
):
This is code that will be called by the dataset transformer engine to ultimately translate from a Flyte Literal
value into a Python instance.
Parameter | Type
ctx | FlyteContext
flyte_value | literals.StructuredDataset
current_task_metadata | StructuredDatasetMetadata
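A hedged sketch of a decoder that reads parquet into pandas, assuming a multipart parquet layout at the literal's uri; flytekit ships its own pandas handlers, so this is illustrative only:
.. code-block:: python

    import pandas as pd

    from flytekit import FlyteContext
    from flytekit.models import literals
    from flytekit.types.structured.structured_dataset import (
        PARQUET,
        StructuredDatasetDecoder,
        StructuredDatasetMetadata,
    )

    class MyPandasParquetDecoder(StructuredDatasetDecoder):
        def __init__(self):
            # python_type, protocol (None means all supported protocols), supported_format
            super().__init__(pd.DataFrame, None, PARQUET)

        def decode(
            self,
            ctx: FlyteContext,
            flyte_value: literals.StructuredDataset,
            current_task_metadata: StructuredDatasetMetadata,
        ) -> pd.DataFrame:
            # Download the remote data locally, then read it with pandas,
            # subsetting to the columns requested by the running task (if any).
            local_dir = ctx.file_access.get_random_local_directory()
            ctx.file_access.get_data(flyte_value.uri, local_dir, is_multipart=True)
            columns = None
            sdt = current_task_metadata.structured_dataset_type
            if sdt and sdt.columns:
                columns = [c.name for c in sdt.columns]
            return pd.read_parquet(local_dir, columns=columns)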
Properties
protocol, python_type, supported_format
flytekit.types.structured.structured_dataset.StructuredDatasetEncoder
Helper class that provides a standard way to create an ABC using
inheritance.
def StructuredDatasetEncoder(
python_type: Type[T],
protocol: Optional[str],
supported_format: Optional[str],
):
Extend this abstract class, implement the encode function, and register your concrete class with the
StructuredDatasetTransformerEngine class in order for the core flytekit type engine to handle
dataframe libraries. This is the encoding interface, meaning it is used when there is a Python value that the
flytekit type engine is trying to convert into a Flyte Literal. For the other way, see
the StructuredDatasetDecoder.
Parameter | Type
python_type | Type[T]
protocol | Optional[str]
supported_format | Optional[str]
Methods
Method |
Description |
encode() |
Even if the user code returns a plain dataframe instance, the dataset transformer engine will wrap the |
encode()
def encode(
ctx: FlyteContext,
structured_dataset: StructuredDataset,
structured_dataset_type: StructuredDatasetType,
):
Even if the user code returns a plain dataframe instance, the dataset transformer engine will wrap the
incoming dataframe with defaults set for that dataframe
type. This simplifies this function’s interface as a lot of data that could be specified by the user using
the
Parameter | Type
ctx | FlyteContext
structured_dataset | StructuredDataset
structured_dataset_type | StructuredDatasetType
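A hedged sketch of a matching encoder that writes a pandas dataframe to parquet; the path layout and file name are assumptions, not the exact built-in implementation:
.. code-block:: python

    import os

    import pandas as pd

    from flytekit import FlyteContext
    from flytekit.models import literals
    from flytekit.models.types import StructuredDatasetType
    from flytekit.types.structured.structured_dataset import (
        PARQUET,
        StructuredDataset,
        StructuredDatasetEncoder,
    )

    class MyPandasParquetEncoder(StructuredDatasetEncoder):
        def __init__(self):
            super().__init__(pd.DataFrame, None, PARQUET)

        def encode(
            self,
            ctx: FlyteContext,
            structured_dataset: StructuredDataset,
            structured_dataset_type: StructuredDatasetType,
        ) -> literals.StructuredDataset:
            # Write the dataframe locally, upload it to a flytekit-managed remote
            # directory, and return the literal model pointing at that location.
            uri = structured_dataset.uri or ctx.file_access.get_random_remote_directory()
            local_dir = ctx.file_access.get_random_local_directory()
            structured_dataset.dataframe.to_parquet(os.path.join(local_dir, "00000.parquet"))
            ctx.file_access.put_data(local_dir, uri, is_multipart=True)
            return literals.StructuredDataset(
                uri=uri,
                metadata=literals.StructuredDatasetMetadata(structured_dataset_type=structured_dataset_type),
            )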
Properties
protocol, python_type, supported_format
str(object='') -> str
str(bytes_or_buffer[, encoding[, errors]]) -> str
Create a new string object from the given object. If encoding or
errors is specified, then the object must expose a data buffer
that will be decoded using the given encoding and error handler.
Otherwise, returns the result of object.__str__() (if defined)
or repr(object).
encoding defaults to sys.getdefaultencoding().
errors defaults to 'strict'.
flytekit.types.structured.structured_dataset.StructuredDatasetMetadata
def StructuredDatasetMetadata(
structured_dataset_type: typing.Optional[flytekit.models.types.StructuredDatasetType],
):
Parameter | Type
structured_dataset_type | typing.Optional[flytekit.models.types.StructuredDatasetType]
Methods
from_flyte_idl()
def from_flyte_idl(
pb2_object: flyteidl.core.literals_pb2.StructuredDatasetMetadata,
):
Parameter |
Type |
pb2_object |
flyteidl.core.literals_pb2.StructuredDatasetMetadata |
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
is_empty, structured_dataset_type
flytekit.types.structured.structured_dataset.StructuredDatasetTransformerEngine
Think of this transformer as a higher-level meta transformer that is used for all the dataframe types.
If you are bringing a custom data frame type, or any data frame type, to flytekit, instead of
registering with the main type engine, you should register with this transformer instead.
def StructuredDatasetTransformerEngine()
Methods
assert_type()
def assert_type(
t: Type[StructuredDataset],
v: typing.Any,
):
Parameter |
Type |
t |
Type[StructuredDataset] |
v |
typing.Any |
async_to_literal()
def async_to_literal(
ctx: FlyteContext,
python_val: Union[StructuredDataset, typing.Any],
python_type: Union[Type[StructuredDataset], Type],
expected: LiteralType,
):
Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type.
Implementers should refrain from using type(python_val); instead, rely on the passed-in python_type. If these
do not match (or are not allowed), the transformer implementer should raise an AssertionError that clearly states
the mismatch.
Parameter |
Type |
ctx |
FlyteContext |
python_val |
Union[StructuredDataset, typing.Any] |
python_type |
Union[Type[StructuredDataset], Type] |
expected |
LiteralType |
async_to_python_value()
def async_to_python_value(
ctx: FlyteContext,
lv: Literal,
expected_python_type: Type[T] | StructuredDataset,
):
The only tricky thing about converting a Literal (say, the output of an earlier task) to a Python value at
the start of a task execution is the column subsetting behavior. For example, if you have
def t1() -> Annotated[StructuredDataset, kwtypes(col_a=int, col_b=float)]: …
def t2(in_a: Annotated[StructuredDataset, kwtypes(col_b=float)]): …
where t2(in_a=t1()), when t2 does in_a.open(pd.DataFrame).all(), it should get a DataFrame
with only one column.
Column behavior passed to the decoder (rows: StructuredDatasetType of the currently running task; columns: StructuredDatasetType of the incoming Literal):
- Running task has columns defined (regardless of the incoming Literal): the StructuredDatasetType passed to the decoder will have the columns as defined by the type annotation of the currently running task. Decoders should then subset the incoming data to the columns requested.
- Running task has [] columns or None, incoming Literal has columns defined: the StructuredDatasetType passed to the decoder will have the columns from the incoming Literal. This is the scenario where the Literal returned by the running task has more information than the running task's signature.
- Running task has [] columns or None, incoming Literal has [] columns or None: the StructuredDatasetType passed to the decoder will have an empty list of columns.
Parameter | Type
ctx | FlyteContext
lv | Literal
expected_python_type | typing.Union[Type[T], StructuredDataset]
dict_to_structured_dataset()
def dict_to_structured_dataset(
dict_obj: typing.Dict[str, str],
expected_python_type: Type[T] | StructuredDataset,
):
Parameter | Type
dict_obj | typing.Dict[str, str]
expected_python_type | typing.Union[Type[T], StructuredDataset]
encode()
def encode(
ctx: FlyteContext,
sd: StructuredDataset,
df_type: Type,
protocol: str,
format: str,
structured_literal_type: StructuredDatasetType,
):
Parameter |
Type |
ctx |
FlyteContext |
sd |
StructuredDataset |
df_type |
Type |
protocol |
str |
format |
str |
structured_literal_type |
StructuredDatasetType |
from_binary_idl()
def from_binary_idl(
binary_idl_object: Binary,
expected_python_type: Type[T] | StructuredDataset,
):
If the input is from flytekit, the life cycle is:
binary IDL (flytekit customized serialization) -> resolved binary (propeller processing) -> bytes (flytekit binary IDL) -> expected Python object (flytekit customized deserialization)
Example Code:
@dataclass
class DC:
    sd: StructuredDataset

@workflow
def wf(dc: DC):
    t_sd(dc.sd)
Note:
- The deserialization is the same as putting a structured dataset in a dataclass, which is deserialized via mashumaro's API.
Related PR:
Parameter | Type
binary_idl_object | Binary
expected_python_type | typing.Union[Type[T], StructuredDataset]
from_generic_idl()
def from_generic_idl(
generic: Struct,
expected_python_type: Type[T] | StructuredDataset,
):
If the input is from the Flyte Console, the life cycle is:
json str (console user input) -> protobuf struct (console output) -> resolved protobuf struct (propeller) -> expected Python object (flytekit customized deserialization)
Example Code:
@dataclass
class DC:
    sd: StructuredDataset

@workflow
def wf(dc: DC):
    t_sd(dc.sd)
Note:
- The deserialization is the same as putting a structured dataset in a dataclass, which is deserialized via mashumaro's API.
Related PR:
Parameter | Type
generic | Struct
expected_python_type | typing.Union[Type[T], StructuredDataset]
get_decoder()
def get_decoder(
df_type: Type,
protocol: str,
format: str,
):
Parameter |
Type |
df_type |
Type |
protocol |
str |
format |
str |
get_encoder()
def get_encoder(
df_type: Type,
protocol: str,
format: str,
):
Parameter |
Type |
df_type |
Type |
protocol |
str |
format |
str |
get_literal_type()
def get_literal_type(
t: typing.Union[Type[StructuredDataset], typing.Any],
):
Provide a concrete implementation so that writers of custom dataframe handlers don't have to, since there's nothing
special about the literal type. Any dataframe type will always be associated with the structured dataset type.
The other aspects of it - columns, external schema type, etc. - can be read from the associated metadata.
Parameter |
Type |
t |
typing.Union[Type[StructuredDataset], typing.Any] |
guess_python_type()
def guess_python_type(
literal_type: LiteralType,
):
Converts the Flyte LiteralType to a python object type.
Parameter |
Type |
literal_type |
LiteralType |
isinstance_generic()
def isinstance_generic(
obj,
generic_alias,
):
Parameter |
Type |
obj |
|
generic_alias |
|
iter_as()
def iter_as(
ctx: FlyteContext,
sd: literals.StructuredDataset,
df_type: Type[DF],
updated_metadata: StructuredDatasetMetadata,
):
Parameter |
Type |
ctx |
FlyteContext |
sd |
literals.StructuredDataset |
df_type |
Type[DF] |
updated_metadata |
StructuredDatasetMetadata |
open_as()
def open_as(
ctx: FlyteContext,
sd: literals.StructuredDataset,
df_type: Type[DF],
updated_metadata: StructuredDatasetMetadata,
):
Parameter |
Type |
ctx |
FlyteContext |
sd |
literals.StructuredDataset |
df_type |
Type[DF] |
updated_metadata |
StructuredDatasetMetadata |
register()
def register(
h: Handlers,
default_for_type: bool,
override: bool,
default_format_for_type: bool,
default_storage_for_type: bool,
):
Call this with any Encoder or Decoder to register it with the flytekit type system. If your handler does not
specify a protocol (e.g. s3, gs, etc.), then it will be registered for all protocols that flytekit's data
persistence layer can handle.
Parameter | Type
h | Handlers
default_for_type | bool
override | bool
default_format_for_type | bool
default_storage_for_type | bool
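Continuing the hedged encoder/decoder sketches from earlier on this page, registration with the structured dataset engine (rather than the main TypeEngine) looks roughly like this:
.. code-block:: python

    from flytekit.types.structured.structured_dataset import StructuredDatasetTransformerEngine

    # MyPandasParquetEncoder / MyPandasParquetDecoder are the illustrative classes
    # sketched in the encoder and decoder sections above.
    StructuredDatasetTransformerEngine.register(MyPandasParquetEncoder(), default_for_type=False)
    StructuredDatasetTransformerEngine.register(MyPandasParquetDecoder(), default_for_type=False)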
register_for_protocol()
def register_for_protocol(
h: Handlers,
protocol: str,
default_for_type: bool,
override: bool,
default_format_for_type: bool,
default_storage_for_type: bool,
):
See the main register function instead.
Parameter |
Type |
h |
Handlers |
protocol |
str |
default_for_type |
bool |
override |
bool |
default_format_for_type |
bool |
default_storage_for_type |
bool |
register_renderer()
def register_renderer(
python_type: Type,
renderer: Renderable,
):
Parameter |
Type |
python_type |
Type |
renderer |
Renderable |
to_html()
def to_html(
ctx: FlyteContext,
python_val: typing.Any,
expected_python_type: Type[T],
):
Converts any python val (dataframe, int, float) to an HTML string, which will be wrapped in an HTML div.
Parameter |
Type |
ctx |
FlyteContext |
python_val |
typing.Any |
expected_python_type |
Type[T] |
to_literal()
def to_literal(
ctx: FlyteContext,
python_val: typing.Any,
python_type: Type[T],
expected: LiteralType,
):
Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type.
Implementers should refrain from using type(python_val); instead, rely on the passed-in python_type. If these
do not match (or are not allowed), the transformer implementer should raise an AssertionError that clearly states
the mismatch.
Parameter |
Type |
ctx |
FlyteContext |
python_val |
typing.Any |
python_type |
Type[T] |
expected |
LiteralType |
to_python_value()
def to_python_value(
ctx: FlyteContext,
lv: Literal,
expected_python_type: Type[T],
):
Converts the given Literal to a Python type. If the conversion cannot be done, an AssertionError should be raised.
Parameter |
Type |
ctx |
FlyteContext |
lv |
Literal |
expected_python_type |
Type[T] |
Properties
is_async, name, python_type, type_assertions_enabled
flytekit.types.structured.structured_dataset.StructuredDatasetType
def StructuredDatasetType(
columns: typing.List[flytekit.models.types.StructuredDatasetType.DatasetColumn],
format: str,
external_schema_type: str,
external_schema_bytes: bytes,
):
Parameter | Type
columns | typing.List[flytekit.models.types.StructuredDatasetType.DatasetColumn]
format | str
external_schema_type | str
external_schema_bytes | bytes
Methods
from_flyte_idl()
def from_flyte_idl(
proto: flyteidl.core.types_pb2.StructuredDatasetType,
):
Parameter |
Type |
proto |
flyteidl.core.types_pb2.StructuredDatasetType |
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
columns, external_schema_bytes, external_schema_type, format, is_empty
flytekit.types.structured.structured_dataset.TypeEngine
Core extensible TypeEngine of Flytekit. This should be used to extend the capabilities of flytekit's type system.
Users can implement their own TypeTransformers and register them with the TypeEngine. This allows special handling
of user objects.
Methods
async_to_literal()
def async_to_literal(
ctx: FlyteContext,
python_val: typing.Any,
python_type: Type[T],
expected: LiteralType,
):
Converts a python value of a given type and expected LiteralType
into a resolved Literal
value.
Parameter |
Type |
ctx |
FlyteContext |
python_val |
typing.Any |
python_type |
Type[T] |
expected |
LiteralType |
async_to_python_value()
def async_to_python_value(
ctx: FlyteContext,
lv: Literal,
expected_python_type: Type,
):
Parameter |
Type |
ctx |
FlyteContext |
lv |
Literal |
expected_python_type |
Type |
calculate_hash()
def calculate_hash(
python_val: typing.Any,
python_type: Type[T],
):
Parameter |
Type |
python_val |
typing.Any |
python_type |
Type[T] |
dict_to_literal_map()
def dict_to_literal_map(
ctx: FlyteContext,
d: typing.Dict[str, typing.Any],
type_hints: Optional[typing.Dict[str, type]],
):
Parameter |
Type |
ctx |
FlyteContext |
d |
typing.Dict[str, typing.Any] |
type_hints |
Optional[typing.Dict[str, type]] |
dict_to_literal_map_pb()
def dict_to_literal_map_pb(
ctx: FlyteContext,
d: typing.Dict[str, typing.Any],
type_hints: Optional[typing.Dict[str, type]],
):
Parameter |
Type |
ctx |
FlyteContext |
d |
typing.Dict[str, typing.Any] |
type_hints |
Optional[typing.Dict[str, type]] |
def get_available_transformers()
Returns all python types for which transformers are available
def get_transformer(
python_type: Type,
):
Implements a recursive search for the transformer.
Parameter |
Type |
python_type |
Type |
guess_python_type()
def guess_python_type(
flyte_type: LiteralType,
):
Transforms a flyte-specific LiteralType
to a regular python value.
Parameter |
Type |
flyte_type |
LiteralType |
guess_python_types()
def guess_python_types(
flyte_variable_dict: typing.Dict[str, _interface_models.Variable],
):
Transforms a dictionary of flyte-specific Variable
objects to a dictionary of regular python values.
Parameter |
Type |
flyte_variable_dict |
typing.Dict[str, _interface_models.Variable] |
def lazy_import_transformers()
Only load the transformers if needed.
literal_map_to_kwargs()
def literal_map_to_kwargs(
ctx: FlyteContext,
lm: LiteralMap,
python_types: typing.Optional[typing.Dict[str, type]],
literal_types: typing.Optional[typing.Dict[str, _interface_models.Variable]],
):
Parameter |
Type |
ctx |
FlyteContext |
lm |
LiteralMap |
python_types |
typing.Optional[typing.Dict[str, type]] |
literal_types |
typing.Optional[typing.Dict[str, _interface_models.Variable]] |
named_tuple_to_variable_map()
def named_tuple_to_variable_map(
t: typing.NamedTuple,
):
Converts a python-native NamedTuple
to a flyte-specific VariableMap of named literals.
Parameter |
Type |
t |
typing.NamedTuple |
register()
def register(
transformer: TypeTransformer,
additional_types: Optional[typing.List[Type]],
):
This should be used for all types that respond with the right type annotation when you use the type(...) function.
Parameter |
Type |
transformer |
TypeTransformer |
additional_types |
Optional[typing.List[Type]] |
register_additional_type()
def register_additional_type(
transformer: TypeTransformer[T],
additional_type: Type[T],
override,
):
Parameter |
Type |
transformer |
TypeTransformer[T] |
additional_type |
Type[T] |
override |
|
register_restricted_type()
def register_restricted_type(
name: str,
type: Type[T],
):
Parameter |
Type |
name |
str |
type |
Type[T] |
to_html()
def to_html(
ctx: FlyteContext,
python_val: typing.Any,
expected_python_type: Type[typing.Any],
):
Parameter |
Type |
ctx |
FlyteContext |
python_val |
typing.Any |
expected_python_type |
Type[typing.Any] |
to_literal()
def to_literal(
ctx: FlyteContext,
python_val: typing.Any,
python_type: Type[T],
expected: LiteralType,
):
The current dance exists because we allow users to call this synchronous to_literal function from an async
function, and allow this to_literal function to then invoke yet another async function, namely an async
transformer.
Parameter |
Type |
ctx |
FlyteContext |
python_val |
typing.Any |
python_type |
Type[T] |
expected |
LiteralType |
to_literal_checks()
def to_literal_checks(
python_val: typing.Any,
python_type: Type[T],
expected: LiteralType,
):
Parameter |
Type |
python_val |
typing.Any |
python_type |
Type[T] |
expected |
LiteralType |
to_literal_type()
def to_literal_type(
python_type: Type[T],
):
Converts a python type into a flyte specific LiteralType
Parameter |
Type |
python_type |
Type[T] |
to_python_value()
def to_python_value(
ctx: FlyteContext,
lv: Literal,
expected_python_type: Type,
):
Converts a Literal value with an expected python type into a python value.
Parameter |
Type |
ctx |
FlyteContext |
lv |
Literal |
expected_python_type |
Type |
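A hedged round-trip sketch using TypeEngine directly (normally flytekit performs these conversions for you during task execution):
.. code-block:: python

    from flytekit.core.context_manager import FlyteContextManager
    from flytekit.core.type_engine import TypeEngine

    ctx = FlyteContextManager.current_context()
    lt = TypeEngine.to_literal_type(int)                   # python type -> LiteralType
    lit = TypeEngine.to_literal(ctx, 3, int, lt)           # python value -> Literal
    assert TypeEngine.to_python_value(ctx, lit, int) == 3  # Literal -> python value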
unwrap_offloaded_literal()
def unwrap_offloaded_literal(
ctx: FlyteContext,
lv: Literal,
):
Parameter |
Type |
ctx |
FlyteContext |
lv |
Literal |
Inappropriate argument type.