flytekit.types.schema.types
Directory
Classes
| Class | Description |
|---|---|
| FlyteSchema | |
| FlyteSchemaTransformer | Base transformer type that should be implemented for every Python native type that can be handled by flytekit. |
| LocalIOSchemaReader | Base SchemaReader to handle any readers (that can manage their own IO or otherwise). |
| LocalIOSchemaWriter | Simplified base SchemaWriter for writers that write to a local path. |
| SchemaEngine | This is the core Engine that handles all schema sub-systems. |
| SchemaFormat | Represents the schema storage format (at rest). |
| SchemaHandler | |
| SchemaOpenMode | The mode in which a schema is opened: read or write. |
| SchemaReader | Base SchemaReader to handle any readers (that can manage their own IO or otherwise). |
| SchemaWriter | Base class for schema writers. |
Methods
| Method | Description |
|---|---|
| generate_ordered_files() | |
Variables
| Property | Type | Description |
|---|---|---|
| MESSAGEPACK | str | |
| T | TypeVar | |
Methods
generate_ordered_files()
def generate_ordered_files(
directory: os.PathLike,
n: int,
) -> typing.Generator[str, None, None]

| Parameter | Type | Description |
|---|---|---|
| directory | os.PathLike | |
| n | int | |
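The reference gives no description for this function. As a rough illustration only, the sketch below shows one way a generator with this signature could behave, assuming it yields `n` paths under `directory` whose zero-padded names sort in the order they were produced. The five-digit padding and the name `generate_ordered_files_sketch` are assumptions made for this example, not documented behavior.

```python
import os
import typing


def generate_ordered_files_sketch(
    directory: typing.Union[str, os.PathLike], n: int
) -> typing.Generator[str, None, None]:
    """Yield n paths under `directory` with zero-padded, sortable names.

    Assumption: five-digit padding is used so that lexical order matches
    numeric order; the real function may name files differently.
    """
    for i in range(n):
        yield os.path.join(directory, f"{i:05}")


paths = list(generate_ordered_files_sketch("out", 3))
print(paths)
```

Zero-padding is one simple way to keep a directory listing in write order without storing a separate index.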
flytekit.types.schema.types.FlyteSchema
class FlyteSchema(
local_path: typing.Optional[str],
remote_path: typing.Optional[str],
supported_mode: SchemaOpenMode,
downloader: typing.Optional[typing.Callable],
)

| Parameter | Type | Description |
|---|---|---|
| local_path | typing.Optional[str] | |
| remote_path | typing.Optional[str] | |
| supported_mode | SchemaOpenMode | |
| downloader | typing.Optional[typing.Callable] | |
Methods
| Method | Description |
|---|---|
| as_readonly() | |
| column_names() | |
| columns() | |
| deserialize_flyte_schema() | |
| format() | |
| from_dict() | |
| from_json() | |
| open() | Returns a reader or writer depending on the mode of the object when created. |
| serialize_flyte_schema() | |
| to_dict() | |
| to_json() | |
as_readonly()
def as_readonly()
column_names()
def column_names()
columns()
def columns()
deserialize_flyte_schema()
def deserialize_flyte_schema(
info,
) -> FlyteSchema

| Parameter | Type | Description |
|---|---|---|
| info | | |

format()
def format()
from_dict()
def from_dict(
d,
dialect,
)

| Parameter | Type | Description |
|---|---|---|
| d | | |
| dialect | | |
from_json()
def from_json(
data: typing.Union[str, bytes, bytearray],
decoder: collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]],
from_dict_kwargs: typing.Any,
) -> ~T

| Parameter | Type | Description |
|---|---|---|
| data | typing.Union[str, bytes, bytearray] | |
| decoder | collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]] | |
| from_dict_kwargs | typing.Any | |
open()
def open(
dataframe_fmt: typing.Optional[type],
override_mode: typing.Optional[SchemaOpenMode],
) -> typing.Union[SchemaReader, SchemaWriter]

Returns a reader or writer depending on the mode of the object when created. This mode can be overridden, but only when the override is permitted. For example, if the object was created in read mode, a write-mode override is not allowed; if the object was created in write mode, opening it for reading is allowed.

| Parameter | Type | Description |
|---|---|---|
| dataframe_fmt | typing.Optional[type] | Type of the dataframe, for example pandas.DataFrame. |
| override_mode | typing.Optional[SchemaOpenMode] | Overrides the default mode with SchemaOpenMode.READ or SchemaOpenMode.WRITE. For example, if you have written to a schema and want to re-open it for reading, pass this parameter. A read-only schema object cannot be opened in write mode. |
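The override rule described for `open()` can be sketched with plain Python. This is a minimal illustration, not flytekit's implementation: `OpenMode` and `resolve_open_mode` are hypothetical names, the member values are assumed, and raising `AssertionError` is an assumption about how a rejected override might be reported.

```python
from enum import Enum
from typing import Optional


class OpenMode(Enum):
    """Hypothetical stand-in for SchemaOpenMode; member values are assumed."""

    READ = "r"
    WRITE = "w"


def resolve_open_mode(supported: OpenMode, override: Optional[OpenMode] = None) -> OpenMode:
    """Return the mode to open with, rejecting a read -> write override."""
    if override is None:
        return supported
    if supported is OpenMode.READ and override is OpenMode.WRITE:
        # Assumption: the rejection is signalled with an AssertionError.
        raise AssertionError("a read-only schema cannot be opened in write mode")
    return override


# A schema created for writing may be re-opened for reading.
print(resolve_open_mode(OpenMode.WRITE, OpenMode.READ))

# A schema created for reading cannot be switched to writing.
try:
    resolve_open_mode(OpenMode.READ, OpenMode.WRITE)
except AssertionError as err:
    print(err)
```

The asymmetry reflects the text above: data that has been written can safely be read back, while a read-only object gives no guarantee that its location is writable.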
serialize_flyte_schema()
def serialize_flyte_schema()
to_dict()
def to_dict()
to_json()
def to_json(
encoder: collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]],
to_dict_kwargs: typing.Any,
) -> typing.Union[str, bytes, bytearray]

| Parameter | Type | Description |
|---|---|---|
| encoder | collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]] | |
| to_dict_kwargs | typing.Any | |
Properties
| Property | Type | Description |
|---|---|---|
| local_path | | |
| supported_mode | | |
flytekit.types.schema.types.FlyteSchemaTransformer
Base transformer type that should be implemented for every Python native type that can be handled by flytekit.
class FlyteSchemaTransformer()
Methods
| Method | Description |
|---|---|
| assert_type() | |
| async_to_literal() | Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type. |
| async_to_python_value() | Converts the given Literal to a Python Type. |
| dict_to_flyte_schema() | |
| from_binary_idl() | Converts a binary IDL object, as produced when the input comes from flytekit, into the expected Python object. |
| from_generic_idl() | Converts a protobuf struct, as produced when the input comes from Flyte Console, into the expected Python object. |
| get_literal_type() | Converts the Python type to a Flyte LiteralType. |
| guess_python_type() | Converts the Flyte LiteralType to a Python object type. |
| isinstance_generic() | |
| to_html() | Converts any Python value (dataframe, int, float) to an HTML string, which is wrapped in an HTML div. |
| to_literal() | Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type. |
| to_python_value() | Converts the given Literal to a Python Type. |
assert_type()
def assert_type(
t: Type[FlyteSchema],
v: typing.Any,
)

| Parameter | Type | Description |
|---|---|---|
| t | Type[FlyteSchema] | |
| v | typing.Any | |
async_to_literal()
def async_to_literal(
ctx: FlyteContext,
python_val: FlyteSchema,
python_type: Type[FlyteSchema],
expected: LiteralType,
) -> Literal

Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type. Implementers should refrain from using type(python_val) and instead rely on the passed-in python_type. If these do not match (or are not allowed), the transformer implementer should raise an AssertionError that clearly states the mismatch.

| Parameter | Type | Description |
|---|---|---|
| ctx | FlyteContext | A FlyteContext, useful for accessing the filesystem and other attributes. |
| python_val | FlyteSchema | The actual value to be transformed. |
| python_type | Type[FlyteSchema] | The assumed type of the value (this matches the declared type on the function). |
| expected | LiteralType | Expected Literal Type. |
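The guidance above (decide from the declared `python_type` rather than `type(python_val)`, and raise an `AssertionError` that names the mismatch) can be illustrated with a toy transformer. Everything here is hypothetical: `ToyLiteral` and `ToyIntTransformer` are stand-ins invented for this sketch, and the `ctx` and `expected` parameters are omitted to keep it self-contained.

```python
from typing import Any, Type


class ToyLiteral:
    """Hypothetical stand-in for a Flyte Literal; it only wraps a value."""

    def __init__(self, value: Any) -> None:
        self.value = value


class ToyIntTransformer:
    """Hypothetical transformer that only supports the declared type `int`."""

    def to_literal(self, python_val: Any, python_type: Type) -> ToyLiteral:
        # Decide based on the declared python_type, not on type(python_val).
        if python_type is not int:
            raise AssertionError(
                f"declared type {python_type.__name__} is not supported, expected int"
            )
        # Verify the value against the declared type and describe any mismatch.
        if not isinstance(python_val, python_type):
            raise AssertionError(
                f"value {python_val!r} does not match declared type {python_type.__name__}"
            )
        return ToyLiteral(python_val)


lit = ToyIntTransformer().to_literal(3, int)
print(lit.value)
```

Keying the logic on the declared type keeps the conversion predictable when the runtime value is a subclass or a compatible wrapper of the declared type.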
async_to_python_value()
def async_to_python_value(
ctx: FlyteContext,
lv: Literal,
expected_python_type: Type[FlyteSchema],
) -> FlyteSchema

Converts the given Literal to a Python Type. If the conversion cannot be done, an AssertionError should be raised.

| Parameter | Type | Description |
|---|---|---|
| ctx | FlyteContext | FlyteContext |
| lv | Literal | The received literal value. |
| expected_python_type | Type[FlyteSchema] | Expected native Python type that should be returned. |
dict_to_flyte_schema()
def dict_to_flyte_schema(
dict_obj: typing.Dict[str, str],
expected_python_type: Type[FlyteSchema],
) -> FlyteSchema

| Parameter | Type | Description |
|---|---|---|
| dict_obj | typing.Dict[str, str] | |
| expected_python_type | Type[FlyteSchema] | |
from_binary_idl()
def from_binary_idl(
binary_idl_object: Binary,
expected_python_type: Type[FlyteSchema],
) -> FlyteSchema

If the input is from flytekit, the life cycle is as follows:

Life Cycle: binary IDL (flytekit customized serialization) -> resolved binary (propeller processing) -> bytes (flytekit binary IDL) -> expected Python object (flytekit customized deserialization)

Example Code:
from dataclasses import dataclass
from flytekit import workflow
from flytekit.types.schema import FlyteSchema

@dataclass
class DC:
    fs: FlyteSchema

@workflow
def wf(dc: DC):
    t_fs(dc.fs)  # t_fs is a task that accepts a FlyteSchema, defined elsewhere

Note:
- The deserialization is the same as when a FlyteSchema is placed in a dataclass: it is deserialized through mashumaro’s API.

Related PR:
- Title: Override Dataclass Serialization/Deserialization Behavior for FlyteTypes via Mashumaro
- Link: https://github.com/flyteorg/flytekit/pull/2554

| Parameter | Type | Description |
|---|---|---|
| binary_idl_object | Binary | |
| expected_python_type | Type[FlyteSchema] | |
from_generic_idl()
def from_generic_idl(
generic: Struct,
expected_python_type: Type[FlyteSchema],
) -> FlyteSchema

If the input is from Flyte Console, the life cycle is as follows:

Life Cycle: json str (console user input) -> protobuf struct (console output) -> resolved protobuf struct (propeller) -> expected Python object (flytekit customized deserialization)

Example Code:
from dataclasses import dataclass
from flytekit import workflow
from flytekit.types.schema import FlyteSchema

@dataclass
class DC:
    fs: FlyteSchema

@workflow
def wf(dc: DC):
    t_fs(dc.fs)  # t_fs is a task that accepts a FlyteSchema, defined elsewhere

Note:
- The deserialization is the same as when a FlyteSchema is placed in a dataclass: it is deserialized through mashumaro’s API.

Related PR:
- Title: Override Dataclass Serialization/Deserialization Behavior for FlyteTypes via Mashumaro
- Link: https://github.com/flyteorg/flytekit/pull/2554

| Parameter | Type | Description |
|---|---|---|
| generic | Struct | |
| expected_python_type | Type[FlyteSchema] | |
get_literal_type()
def get_literal_type(
t: Type[FlyteSchema],
) -> LiteralType

Converts the Python type to a Flyte LiteralType.

| Parameter | Type | Description |
|---|---|---|
| t | Type[FlyteSchema] | |
guess_python_type()
def guess_python_type(
literal_type: LiteralType,
) -> Type[FlyteSchema]

Converts the Flyte LiteralType to a Python object type.

| Parameter | Type | Description |
|---|---|---|
| literal_type | LiteralType | |
isinstance_generic()
def isinstance_generic(
obj,
generic_alias,
)

| Parameter | Type | Description |
|---|---|---|
| obj | | |
| generic_alias | | |
to_html()
def to_html(
ctx: FlyteContext,
python_val: T,
expected_python_type: Type[T],
) -> str

Converts any Python value (dataframe, int, float) to an HTML string, which is wrapped in an HTML div.

| Parameter | Type | Description |
|---|---|---|
| ctx | FlyteContext | |
| python_val | T | |
| expected_python_type | Type[T] | |
to_literal()
def to_literal(
ctx: FlyteContext,
python_val: typing.Any,
python_type: Type[T],
expected: LiteralType,
) -> Literal

Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type. Implementers should refrain from using type(python_val) and instead rely on the passed-in python_type. If these do not match (or are not allowed), the transformer implementer should raise an AssertionError that clearly states the mismatch.

| Parameter | Type | Description |
|---|---|---|
| ctx | FlyteContext | A FlyteContext, useful for accessing the filesystem and other attributes. |
| python_val | typing.Any | The actual value to be transformed. |
| python_type | Type[T] | The assumed type of the value (this matches the declared type on the function). |
| expected | LiteralType | Expected Literal Type. |
to_python_value()
def to_python_value(
ctx: FlyteContext,
lv: Literal,
expected_python_type: Type[T],
) -> Optional[T]

Converts the given Literal to a Python Type. If the conversion cannot be done, an AssertionError should be raised.

| Parameter | Type | Description |
|---|---|---|
| ctx | FlyteContext | FlyteContext |
| lv | Literal | The received literal value. |
| expected_python_type | Type[T] | Expected native Python type that should be returned. |
Properties
| Property | Type | Description |
|---|---|---|
| is_async | | |
| name | | |
| python_type | | This returns the Python type. |
| type_assertions_enabled | | Indicates if the transformer wants type assertions to be enabled at the core type engine layer. |
flytekit.types.schema.types.LocalIOSchemaReader
Base SchemaReader to handle any readers (that can manage their own IO or otherwise). Use the simplified base LocalIOSchemaReader for non-distributed dataframes.
class LocalIOSchemaReader(
from_path: str,
cols: typing.Optional[typing.Dict[str, type]],
fmt: SchemaFormat,
)

| Parameter | Type | Description |
|---|---|---|
| from_path | str | |
| cols | typing.Optional[typing.Dict[str, type]] | |
| fmt | SchemaFormat | |
Methods
| Method | Description |
|---|---|
| all() | |
| iter() | |

all()
def all(
**kwargs,
) -> T

| Parameter | Type | Description |
|---|---|---|
| kwargs | **kwargs | |

iter()
def iter(
**kwargs,
) -> typing.Generator[T, None, None]

| Parameter | Type | Description |
|---|---|---|
| kwargs | **kwargs | |
Properties
| Property | Type | Description |
|---|---|---|
| column_names | | |
| from_path | | |
flytekit.types.schema.types.LocalIOSchemaWriter
Simplified base SchemaWriter for writing dataframes to a local path.
class LocalIOSchemaWriter(
to_local_path: str,
cols: typing.Optional[typing.Dict[str, type]],
fmt: SchemaFormat,
)

| Parameter | Type | Description |
|---|---|---|
| to_local_path | str | |
| cols | typing.Optional[typing.Dict[str, type]] | |
| fmt | SchemaFormat | |
Methods
| Method | Description |
|---|---|
| write() | |

write()
def write(
*dfs,
**kwargs,
)

| Parameter | Type | Description |
|---|---|---|
| dfs | | |
| kwargs | **kwargs | |
Properties
| Property | Type | Description |
|---|---|---|
| column_names | | |
| to_path | | |
flytekit.types.schema.types.SchemaEngine
This is the core Engine that handles all schema sub-systems. All schema types need to be registered with this engine to allow direct support for that type in FlyteSchema. Examples of types that could be supported are Pandas.DataFrame, Spark.DataFrame, Vaex.DataFrame, etc.
Methods
| Method | Description |
|---|---|
| get_handler() | |
| register_handler() | Register a new handler that can create a SchemaReader and SchemaWriter for the expected type. |
get_handler()
def get_handler(
t: Type,
) -> SchemaHandler

| Parameter | Type | Description |
|---|---|---|
| t | Type | |
register_handler()
def register_handler(
h: SchemaHandler,
)

Register a new handler that can create a SchemaReader and SchemaWriter for the expected type.

| Parameter | Type | Description |
|---|---|---|
| h | SchemaHandler | |
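The register-then-look-up pattern described for SchemaEngine can be sketched as a small type-keyed registry. This is an illustration under stated assumptions, not flytekit's code: `ToyHandler` and `ToyEngine` are hypothetical stand-ins that borrow the field and method names listed on this page, and the error behavior for duplicate or missing types is assumed.

```python
from dataclasses import dataclass
from typing import Dict, Type


@dataclass
class ToyHandler:
    """Hypothetical stand-in for SchemaHandler, using the same field names."""

    name: str
    object_type: Type
    reader: Type
    writer: Type
    handles_remote_io: bool


class ToyEngine:
    """Hypothetical registry keyed by the dataframe type a handler supports."""

    _handlers: Dict[type, ToyHandler] = {}

    @classmethod
    def register_handler(cls, h: ToyHandler) -> None:
        # Assumption: registering the same object_type twice is an error.
        if h.object_type in cls._handlers:
            raise ValueError(f"a handler for {h.object_type} is already registered")
        cls._handlers[h.object_type] = h

    @classmethod
    def get_handler(cls, t: Type) -> ToyHandler:
        # Assumption: an unregistered type raises instead of returning None.
        if t not in cls._handlers:
            raise ValueError(f"no handler registered for {t}")
        return cls._handlers[t]


class ListReader:
    """Placeholder reader class for the example."""


class ListWriter:
    """Placeholder writer class for the example."""


ToyEngine.register_handler(ToyHandler("list", list, ListReader, ListWriter, False))
print(ToyEngine.get_handler(list).name)
```

Keying the registry on the dataframe type lets a caller that only knows the requested type (for example, the `dataframe_fmt` passed to `open()`) find the matching reader and writer classes.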
flytekit.types.schema.types.SchemaFormat
Represents the schema storage format (at rest). Currently, only parquet is supported.
flytekit.types.schema.types.SchemaHandler
class SchemaHandler(
name: str,
object_type: Type,
reader: Type[SchemaReader],
writer: Type[SchemaWriter],
handles_remote_io: bool,
)

| Parameter | Type | Description |
|---|---|---|
| name | str | |
| object_type | Type | |
| reader | Type[SchemaReader] | |
| writer | Type[SchemaWriter] | |
| handles_remote_io | bool | |
flytekit.types.schema.types.SchemaOpenMode
Enumeration of the modes in which a schema can be opened: SchemaOpenMode.READ and SchemaOpenMode.WRITE.
flytekit.types.schema.types.SchemaReader
Base SchemaReader to handle any readers (that can manage their own IO or otherwise). Use the simplified base LocalIOSchemaReader for non-distributed dataframes.
class SchemaReader(
from_path: str,
cols: typing.Optional[typing.Dict[str, type]],
fmt: SchemaFormat,
)

| Parameter | Type | Description |
|---|---|---|
| from_path | str | |
| cols | typing.Optional[typing.Dict[str, type]] | |
| fmt | SchemaFormat | |
Methods
| Method | Description |
|---|---|
| all() | |
| iter() | |

all()
def all(
**kwargs,
) -> T

| Parameter | Type | Description |
|---|---|---|
| kwargs | **kwargs | |

iter()
def iter(
**kwargs,
) -> typing.Generator[T, None, None]

| Parameter | Type | Description |
|---|---|---|
| kwargs | **kwargs | |
Properties
| Property | Type | Description |
|---|---|---|
| column_names | | |
| from_path | | |
flytekit.types.schema.types.SchemaWriter
Base SchemaWriter for writing dataframes to a path. The class is generic over the dataframe type T.
class SchemaWriter(
to_path: str,
cols: typing.Optional[typing.Dict[str, type]],
fmt: SchemaFormat,
)

| Parameter | Type | Description |
|---|---|---|
| to_path | str | |
| cols | typing.Optional[typing.Dict[str, type]] | |
| fmt | SchemaFormat | |
Methods
| Method | Description |
|---|---|
| write() | |

write()
def write(
*dfs,
**kwargs,
)

| Parameter | Type | Description |
|---|---|---|
| dfs | | |
| kwargs | **kwargs | |
Properties
| Property | Type | Description |
|---|---|---|
| column_names | | |
| to_path | | |