flytekit.remote.data
Directory
Classes
Class | Description |
---|---|
BlobType | This type represents offloaded data and is typically used for things like files. |
FileAccessProvider | This is the class that is available through the FlyteContext and can be used for persisting data to the remote durable store. |
Literal | This IDL message represents a literal value in the Flyte ecosystem. |
RichCallback | Base class and interface for callback mechanism. |
flytekit.remote.data.BlobType
This type represents offloaded data and is typically used for things like files.
def BlobType(
format,
dimensionality,
):
Parameter | Type |
---|---|
format | |
dimensionality | |
Methods
Method | Description |
---|---|
from_flyte_idl() | |
serialize_to_string() | |
short_string() | |
to_flyte_idl() | |
verbose_string() | |
from_flyte_idl()
def from_flyte_idl(
proto,
):
Parameter | Type |
---|---|
proto |
serialize_to_string()
def serialize_to_string()
short_string()
def short_string()
to_flyte_idl()
def to_flyte_idl()
verbose_string()
def verbose_string()
Properties
Property | Type | Description |
---|---|---|
dimensionality | ||
format | ||
is_empty |
flytekit.remote.data.FileAccessProvider
This is the class that is available through the FlyteContext and can be used for persisting data to the remote durable store.
def FileAccessProvider(
local_sandbox_dir: typing.Union[str, os.PathLike],
raw_output_prefix: str,
data_config: typing.Optional[flytekit.configuration.DataConfig],
execution_metadata: typing.Optional[dict],
):
Parameter | Type |
---|---|
local_sandbox_dir | typing.Union[str, os.PathLike] |
raw_output_prefix | str |
data_config | typing.Optional[flytekit.configuration.DataConfig] |
execution_metadata | typing.Optional[dict] |
Methods
Method | Description |
---|---|
async_get_data() | |
async_put_data() | Always puts data to the remote location, without using the true local proxy, even if the remote path is a file:// path. |
async_put_raw_data() | This is a more flexible version of put that accepts a file-like object or a string path. |
download() | Downloads from remote to local. |
download_directory() | Downloads a directory from the given remote path to a local path. |
exists() | |
generate_new_custom_path() | Generates a new path with the raw output prefix and a random string appended to it. |
get() | |
get_async_filesystem_for_path() | |
get_data() | |
get_file_tail() | |
get_filesystem() | |
get_filesystem_for_path() | |
get_random_local_directory() | |
get_random_local_path() | Pass file_path_or_file_name when you want a random directory but want to preserve the leaf file name. |
get_random_remote_directory() | |
get_random_remote_path() | |
get_random_string() | |
is_remote() | Deprecated. |
join() | |
put_data() | Always puts data to the remote location, without using the true local proxy, even if the remote path is a file:// path. |
put_raw_data() | This is a more flexible version of put that accepts a file-like object or a string path. |
recursive_paths() | |
sep() | |
strip_file_header() | Drops the file:// prefix from the path, if present. |
upload() | |
upload_directory() | |
async_get_data()
def async_get_data(
remote_path: str,
local_path: str,
is_multipart: bool,
**kwargs,
):
Parameter | Type |
---|---|
remote_path | str |
local_path | str |
is_multipart | bool |
kwargs | **kwargs |
async_put_data()
def async_put_data(
local_path: typing.Union[str, os.PathLike],
remote_path: str,
is_multipart: bool,
**kwargs,
):
The implication here is that data is always put to the remote location, so .remote is used to ensure that the true local proxy is not used if the remote path is a file:// path.
Parameter | Type |
---|---|
local_path | typing.Union[str, os.PathLike] |
remote_path | str |
is_multipart | bool |
kwargs | **kwargs |
async_put_raw_data()
def async_put_raw_data(
lpath: typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO],
upload_prefix: typing.Optional[str],
file_name: typing.Optional[str],
read_chunk_size_bytes: int,
encoding: str,
skip_raw_data_prefix: bool,
**kwargs,
):
This is a more flexible version of put that accepts a file-like object or a string path. It writes to the raw output prefix only; to write to another file system, use put_data or get the fsspec file system directly. Note that the raw output prefix set by Propeller is currently already unique per retry and looks like s3://my-s3-bucket/data/o4/feda4e266c748463a97d-n0-0
If lpath is a folder, recursive is set. If lpath is streamable, it can only be a single file.
Writes to: {raw output prefix}/{upload_prefix}/{file_name}
Parameter | Type |
---|---|
lpath | typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO] |
upload_prefix | typing.Optional[str] |
file_name | typing.Optional[str] |
read_chunk_size_bytes | int |
encoding | str |
skip_raw_data_prefix | bool |
kwargs | **kwargs |
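The destination layout and the lpath rules above can be sketched in plain Python. This is an illustration only, not flytekit's implementation: raw_output_destination and upload_mode are hypothetical helpers, the treatment of skip_raw_data_prefix (returning only {upload_prefix}/{file_name}) is an assumption inferred from the parameter name, and the sketch does not cover what happens when upload_prefix or file_name is None.

```python
import io
import os


def raw_output_destination(
    raw_output_prefix: str,
    upload_prefix: str,
    file_name: str,
    skip_raw_data_prefix: bool = False,
) -> str:
    """Hypothetical helper: compose {raw output prefix}/{upload_prefix}/{file_name}."""
    parts = [upload_prefix.strip("/"), file_name.strip("/")]
    if skip_raw_data_prefix:
        # Assumption: skipping the raw data prefix leaves only the relative part.
        return "/".join(parts)
    # rstrip avoids a doubled "/" when the prefix already ends with one.
    return "/".join([raw_output_prefix.rstrip("/"), *parts])


def upload_mode(lpath) -> str:
    """Hypothetical classifier mirroring the documented lpath rules."""
    if isinstance(lpath, (bytes, io.IOBase)):
        # Streamable inputs (bytes, BytesIO, StringIO, BufferedReader) are a single file.
        return "single-stream"
    if os.path.isdir(lpath):
        # A folder means the upload is done recursively.
        return "recursive"
    return "single-file"


prefix = "s3://my-s3-bucket/data/o4/feda4e266c748463a97d-n0-0"
print(raw_output_destination(prefix, "my-upload", "report.csv"))
print(upload_mode(io.BytesIO(b"payload")))
```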
download()
def download(
remote_path: str,
local_path: str,
**kwargs,
):
Downloads from remote to local.
Parameter | Type |
---|---|
remote_path | str |
local_path | str |
kwargs | **kwargs |
download_directory()
def download_directory(
remote_path: str,
local_path: str,
**kwargs,
):
Downloads a directory from the given remote path to a local path.
Parameter | Type |
---|---|
remote_path | str |
local_path | str |
kwargs | **kwargs |
exists()
def exists(
path: str,
):
Parameter | Type |
---|---|
path | str |
generate_new_custom_path()
def generate_new_custom_path(
fs: typing.Optional[fsspec.spec.AbstractFileSystem],
alt: typing.Optional[str],
stem: typing.Optional[str],
):
Generates a new path with the raw output prefix and a random string appended to it. Optionally, you can provide an alternate prefix and a stem. If stem is provided, it will be appended to the path instead of a random string. If alt is provided, it will replace the first part of the output prefix, e.g. the S3 or GCS bucket.
To write to a non-random prefix in a non-default S3 bucket, call this with alt="my-alt-bucket" and stem="my-stem" to generate a path like s3://my-alt-bucket/default-prefix-part/my-stem.
Parameter | Type |
---|---|
fs | typing.Optional[fsspec.spec.AbstractFileSystem] |
alt | typing.Optional[str] |
stem | typing.Optional[str] |
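A minimal sketch of the documented alt and stem behaviour, assuming that the "first part of the output prefix" is the URL netloc (the bucket) and that the random string is a hex UUID. custom_path_sketch is a hypothetical name, not part of flytekit, and the sketch ignores the fs argument entirely.

```python
import uuid
from typing import Optional
from urllib.parse import urlsplit, urlunsplit


def custom_path_sketch(
    raw_output_prefix: str,
    alt: Optional[str] = None,
    stem: Optional[str] = None,
) -> str:
    """Hypothetical re-creation of the documented behaviour, not flytekit's code.

    - alt replaces the first part of the prefix (assumed to be the bucket/netloc).
    - stem is appended instead of a random string.
    """
    parts = urlsplit(raw_output_prefix)
    bucket = alt if alt is not None else parts.netloc
    # Assumption: the random component is a 32-character hex string.
    leaf = stem if stem is not None else uuid.uuid4().hex
    path = parts.path.rstrip("/") + "/" + leaf
    return urlunsplit((parts.scheme, bucket, path, "", ""))


print(custom_path_sketch("s3://my-s3-bucket/default-prefix-part", alt="my-alt-bucket", stem="my-stem"))
```

With both alt and stem given, the sketch reproduces the example path from the description above.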
get()
def get(
from_path: str,
to_path: str,
recursive: bool,
**kwargs,
):
Parameter | Type |
---|---|
from_path | str |
to_path | str |
recursive | bool |
kwargs | **kwargs |
get_async_filesystem_for_path()
def get_async_filesystem_for_path(
path: str,
anonymous: bool,
**kwargs,
):
Parameter | Type |
---|---|
path | str |
anonymous | bool |
kwargs | **kwargs |
get_data()
def get_data(
remote_path: str,
local_path: str,
is_multipart: bool,
**kwargs,
):
Parameter | Type |
---|---|
remote_path | str |
local_path | str |
is_multipart | bool |
kwargs | **kwargs |
get_file_tail()
def get_file_tail(
file_path_or_file_name: str,
):
Parameter | Type |
---|---|
file_path_or_file_name | str |
get_filesystem()
def get_filesystem(
protocol: typing.Optional[str],
anonymous: bool,
path: typing.Optional[str],
**kwargs,
):
Parameter | Type |
---|---|
protocol | typing.Optional[str] |
anonymous | bool |
path | typing.Optional[str] |
kwargs | **kwargs |
get_filesystem_for_path()
def get_filesystem_for_path(
path: str,
anonymous: bool,
**kwargs,
):
Parameter | Type |
---|---|
path | str |
anonymous | bool |
kwargs | **kwargs |
get_random_local_directory()
def get_random_local_directory()
get_random_local_path()
def get_random_local_path(
file_path_or_file_name: typing.Optional[str],
):
Pass file_path_or_file_name when you want a random directory but want to preserve the leaf file name.
Parameter | Type |
---|---|
file_path_or_file_name | typing.Optional[str] |
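The effect of file_path_or_file_name can be sketched with the standard library. This assumes, based on the constructor's local_sandbox_dir parameter, that the random directory is created under the local sandbox directory; random_local_path_sketch is a hypothetical helper, not flytekit's implementation.

```python
import os
import uuid
from typing import Optional


def random_local_path_sketch(sandbox_dir: str, file_path_or_file_name: Optional[str] = None) -> str:
    """Hypothetical sketch: a random directory under sandbox_dir, optionally keeping the leaf name."""
    random_dir = os.path.join(sandbox_dir, uuid.uuid4().hex)
    if file_path_or_file_name:
        # Only the leaf is preserved, e.g. "/data/in/model.pt" -> "model.pt".
        leaf = os.path.basename(file_path_or_file_name)
        if leaf:
            return os.path.join(random_dir, leaf)
    return random_dir


# The leaf name "model.pt" survives; its parent directory is random.
print(random_local_path_sketch("/tmp/flyte-sandbox", "/data/in/model.pt"))
```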
get_random_remote_directory()
def get_random_remote_directory()
get_random_remote_path()
def get_random_remote_path(
file_path_or_file_name: typing.Optional[str],
):
Parameter | Type |
---|---|
file_path_or_file_name | typing.Optional[str] |
get_random_string()
def get_random_string()
is_remote()
def is_remote(
path: typing.Union[str, os.PathLike],
):
Deprecated; a replacement has not been designated yet.
Parameter | Type |
---|---|
path | typing.Union[str, os.PathLike] |
join()
def join(
*args,
unstrip: bool,
fs: typing.Optional[fsspec.spec.AbstractFileSystem],
):
Parameter | Type |
---|---|
args | *args |
unstrip | bool |
fs | typing.Optional[fsspec.spec.AbstractFileSystem] |
put_data()
def put_data(
local_path: typing.Union[str, os.PathLike],
remote_path: str,
is_multipart: bool,
**kwargs,
):
The implication here is that data is always put to the remote location, so .remote is used to ensure that the true local proxy is not used if the remote path is a file:// path.
Parameter | Type |
---|---|
local_path | typing.Union[str, os.PathLike] |
remote_path | str |
is_multipart | bool |
kwargs | **kwargs |
put_raw_data()
def put_raw_data(
lpath: typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO],
upload_prefix: typing.Optional[str],
file_name: typing.Optional[str],
read_chunk_size_bytes: int,
encoding: str,
skip_raw_data_prefix: bool,
**kwargs,
):
This is a more flexible version of put that accepts a file-like object or a string path. It writes to the raw output prefix only; to write to another file system, use put_data or get the fsspec file system directly. Note that the raw output prefix set by Propeller is currently already unique per retry and looks like s3://my-s3-bucket/data/o4/feda4e266c748463a97d-n0-0
If lpath is a folder, recursive is set. If lpath is streamable, it can only be a single file.
Writes to: {raw output prefix}/{upload_prefix}/{file_name}
Parameter | Type |
---|---|
lpath | typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO] |
upload_prefix | typing.Optional[str] |
file_name | typing.Optional[str] |
read_chunk_size_bytes | int |
encoding | str |
skip_raw_data_prefix | bool |
kwargs | **kwargs |
recursive_paths()
def recursive_paths(
f: str,
t: str,
):
Parameter | Type |
---|---|
f | str |
t | str |
sep()
def sep(
file_system: typing.Optional[fsspec.spec.AbstractFileSystem],
):
Parameter | Type |
---|---|
file_system | typing.Optional[fsspec.spec.AbstractFileSystem] |
strip_file_header()
def strip_file_header(
path: str,
trim_trailing_sep: bool,
):
Drops the file:// prefix from the path, if present.
Parameter | Type |
---|---|
path | str |
trim_trailing_sep | bool |
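A hedged sketch of what the description implies. strip_file_header_sketch is hypothetical, and the meaning of trim_trailing_sep (removing trailing "/" separators) is an assumption based on the parameter name, since the description does not define it.

```python
FILE_HEADER = "file://"


def strip_file_header_sketch(path: str, trim_trailing_sep: bool = False) -> str:
    """Hypothetical sketch: drop a leading "file://" and optionally trailing "/"."""
    if path.startswith(FILE_HEADER):
        path = path[len(FILE_HEADER):]
    if trim_trailing_sep and len(path) > 1:
        # Assumption: trailing separators are removed, but a bare "/" is kept.
        path = path.rstrip("/") or "/"
    return path


print(strip_file_header_sketch("file:///tmp/data/", trim_trailing_sep=True))
```

Paths with other schemes, such as s3://, pass through unchanged in this sketch.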
upload()
def upload(
file_path: str,
to_path: str,
**kwargs,
):
Parameter | Type |
---|---|
file_path | str |
to_path | str |
kwargs | **kwargs |
upload_directory()
def upload_directory(
local_path: str,
remote_path: str,
**kwargs,
):
Parameter | Type |
---|---|
local_path | str |
remote_path | str |
kwargs | **kwargs |
Properties
Property | Type | Description |
---|---|---|
data_config | ||
local_access | ||
local_sandbox_dir | ||
raw_output_fs | ||
raw_output_prefix |
flytekit.remote.data.Literal
def Literal(
scalar: typing.Optional[flytekit.models.literals.Scalar],
collection: typing.Optional[flytekit.models.literals.LiteralCollection],
map: typing.Optional[flytekit.models.literals.LiteralMap],
hash: typing.Optional[str],
metadata: typing.Optional[typing.Dict[str, str]],
offloaded_metadata: typing.Optional[flytekit.models.literals.LiteralOffloadedMetadata],
):
This IDL message represents a literal value in the Flyte ecosystem.
Parameter | Type |
---|---|
scalar | typing.Optional[flytekit.models.literals.Scalar] |
collection | typing.Optional[flytekit.models.literals.LiteralCollection] |
map | typing.Optional[flytekit.models.literals.LiteralMap] |
hash | typing.Optional[str] |
metadata | typing.Optional[typing.Dict[str, str]] |
offloaded_metadata | typing.Optional[flytekit.models.literals.LiteralOffloadedMetadata] |
Methods
Method | Description |
---|---|
from_flyte_idl() | |
serialize_to_string() | |
set_metadata() | Note: This is a mutation on the literal. |
short_string() | |
to_flyte_idl() | |
verbose_string() | |
from_flyte_idl()
def from_flyte_idl(
pb2_object: flyteidl.core.literals_pb2.Literal,
):
Parameter | Type |
---|---|
pb2_object | flyteidl.core.literals_pb2.Literal |
serialize_to_string()
def serialize_to_string()
set_metadata()
def set_metadata(
metadata: typing.Dict[str, str],
):
Note: This mutates the literal in place.
Parameter | Type |
---|---|
metadata | typing.Dict[str, str] |
short_string()
def short_string()
to_flyte_idl()
def to_flyte_idl()
verbose_string()
def verbose_string()
Properties
Property | Type | Description |
---|---|---|
collection | ||
hash | ||
is_empty | ||
map | ||
metadata | ||
offloaded_metadata | ||
scalar | ||
value |
flytekit.remote.data.RichCallback
Base class and interface for callback mechanism.
This class can be used directly for monitoring file transfers by providing callback=Callback(hooks=...) (see the hooks argument, below), or subclassed for more specialised behaviour.
Parameters
size: int (optional)
Nominal quantity for the value that corresponds to a complete transfer, e.g., total number of files or total number of bytes.
value: int (0)
Starting internal counter value.
hooks: dict or None
A dict of named functions to be called on each update. The signature of these must be f(size, value, **kwargs).
def RichCallback(
rich_kwargs: typing.Optional[typing.Dict],
**kwargs,
):
Parameter | Type |
---|---|
rich_kwargs | typing.Optional[typing.Dict] |
kwargs | **kwargs |
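The hooks contract described above (each hook is called as f(size, value, **kwargs), and set_size, absolute_update and relative_update all trigger call()) can be sketched with a small stand-in class. MiniCallback is hypothetical: it only mirrors that documented behaviour, is not the fsspec or flytekit implementation, and omits branching and rich_kwargs.

```python
from typing import Callable, Dict, Optional


class MiniCallback:
    """Hypothetical stand-in mirroring the documented callback contract."""

    def __init__(self, size: Optional[int] = None, value: int = 0,
                 hooks: Optional[Dict[str, Callable]] = None):
        self.size = size
        self.value = value
        self.hooks = hooks or {}

    def set_size(self, size: int) -> None:
        self.size = size
        self.call()  # documented: set_size triggers call()

    def absolute_update(self, value: int) -> None:
        self.value = value
        self.call()

    def relative_update(self, inc: int = 1) -> None:
        self.value += inc
        self.call()

    def call(self, hook_name: Optional[str] = None, **kwargs) -> None:
        """Run one named hook, or all hooks, with the current size and value."""
        if hook_name is not None:
            if hook_name in self.hooks:
                self.hooks[hook_name](self.size, self.value, **kwargs)
            return
        for hook in self.hooks.values():
            hook(self.size, self.value, **kwargs)


seen = []


def record(size, value, **kwargs):
    # Hook signature required by the docs: f(size, value, **kwargs).
    seen.append((size, value))


cb = MiniCallback(hooks={"record": record})
cb.set_size(3)
cb.relative_update(1)
cb.relative_update(2)
print(seen)  # [(3, 0), (3, 1), (3, 3)]
```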
Methods
Method | Description |
---|---|
absolute_update() | Set the internal value state |
as_callback() | Transform callback=… into a Callback instance |
branch() | Set callbacks for child transfers |
branch_coro() | Wrap a coroutine and pass a new child callback to it |
branched() | Return callback for child transfers |
call() | Execute hook(s) with current state |
close() | Close callback |
no_op() | |
relative_update() | Delta increment the internal counter |
set_size() | Set the internal maximum size attribute |
wrap() | Wrap an iterable to call relative_update on each iteration |
absolute_update()
def absolute_update(
value,
):
Set the internal value state
Triggers call()
Parameters
value: int
Parameter | Type |
---|---|
value |
as_callback()
def as_callback(
maybe_callback,
):
Transform callback=… into a Callback instance.
For the special value of None, return the global instance of NoOpCallback. This is an alternative to including callback=DEFAULT_CALLBACK directly in a method signature.
Parameter | Type |
---|---|
maybe_callback |
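The None-to-no-op convention lets transfer code call callback methods unconditionally instead of checking for None at every update. The sketch below uses hypothetical names (NoOpSketch, as_callback_sketch, copy_bytes) to show the pattern; it does not reproduce the real NoOpCallback or DEFAULT_CALLBACK.

```python
from typing import Optional


class NoOpSketch:
    """Hypothetical no-op callback: its methods accept arguments and do nothing."""

    def relative_update(self, inc: int = 1) -> None:
        pass

    def set_size(self, size: int) -> None:
        pass


# One shared instance, standing in for the global no-op callback the docs mention.
_NO_OP = NoOpSketch()


def as_callback_sketch(maybe_callback: Optional[object]):
    """Return the shared no-op instance for None, otherwise the callback unchanged."""
    return _NO_OP if maybe_callback is None else maybe_callback


def copy_bytes(data: bytes, callback=None) -> bytes:
    # Hypothetical transfer function: after normalisation, callback is never None.
    callback = as_callback_sketch(callback)
    callback.set_size(len(data))
    for _ in data:
        callback.relative_update(1)
    return bytes(data)


print(copy_bytes(b"abc"))
```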
branch()
def branch(
path_1,
path_2,
**kwargs,
):
Set callbacks for child transfers
If this callback is operating at a higher level, e.g., put, it may trigger transfers that can also be monitored. The passed kwargs are to be mutated to add callback=, if this class supports branching to children.
Parameters
path_1: str
Child’s source path
path_2: str
Child’s destination path
kwargs: dict
Arguments passed to the child method, e.g., put_file.
Parameter | Type |
---|---|
path_1 | |
path_2 | |
kwargs | **kwargs |
branch_coro()
def branch_coro(
fn,
):
Wraps a coroutine and passes a new child callback to it.
Parameter | Type |
---|---|
fn |
branched()
def branched(
path_1,
path_2,
**kwargs,
):
Return callback for child transfers
If this callback is operating at a higher level, e.g., put, it may trigger transfers that can also be monitored. This function returns a callback that has to be passed to the child method, e.g., put_file, as the callback= argument.
The implementation uses callback.branch for compatibility.
When implementing callbacks, it is recommended to override this function instead of branch and avoid calling super().branched(...).
Prefer using this function over branch.
Parameters
path_1: str
Child’s source path
path_2: str
Child’s destination path
**kwargs
Arbitrary keyword arguments
Returns
callback: Callback
A callback instance to be passed to the child method
Parameter | Type |
---|---|
path_1 | |
path_2 | |
kwargs | **kwargs |
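To illustrate the recommended pattern of overriding branched, the sketch below uses hypothetical stand-in classes: a parent callback whose branched returns a per-transfer child callback, and a fake put_file_sketch that receives that child as its callback= argument. None of these names come from fsspec or flytekit, and the classes do not subclass the real Callback.

```python
from typing import Dict, List, Tuple


class ChildProgress:
    """Hypothetical per-file callback handed to a child transfer."""

    def __init__(self, parent: "ParentProgress", key: Tuple[str, str]):
        self.parent = parent
        self.key = key
        self.value = 0

    def relative_update(self, inc: int = 1) -> None:
        self.value += inc
        # Report this child's running total back to the parent.
        self.parent.per_file[self.key] = self.value


class ParentProgress:
    """Hypothetical top-level callback, e.g. for a multi-file put."""

    def __init__(self) -> None:
        self.per_file: Dict[Tuple[str, str], int] = {}

    def branched(self, path_1: str, path_2: str, **kwargs) -> ChildProgress:
        # Following the docs' advice: implement branched() directly rather than
        # branch(), and do not call super().branched(...).
        return ChildProgress(self, (path_1, path_2))


def put_file_sketch(src: str, dst: str, data: bytes, callback: ChildProgress) -> None:
    # Stand-in for a child method such as put_file; reports progress per 4-byte chunk.
    for start in range(0, len(data), 4):
        callback.relative_update(len(data[start:start + 4]))


parent = ParentProgress()
files: List[Tuple[str, str, bytes]] = [
    ("a.txt", "s3://b/a.txt", b"0123456789"),
    ("b.txt", "s3://b/b.txt", b"xyz"),
]
for src, dst, data in files:
    put_file_sketch(src, dst, data, callback=parent.branched(src, dst))
print(parent.per_file)
```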
call()
def call(
hook_name,
**kwargs,
):
Execute hook(s) with current state
Each function is passed the internal size and current value.
Parameters
hook_name: str or None
If given, execute on this hook
kwargs
Passed on to (all) hook(s)
Parameter | Type |
---|---|
hook_name | |
kwargs | **kwargs |
close()
def close()
Close callback.
no_op()
def no_op(
_,
__,
):
Parameter | Type |
---|---|
_ | |
__ | |
relative_update()
def relative_update(
inc,
):
Delta increment the internal counter
Triggers call()
Parameters
inc: int
Parameter | Type |
---|---|
inc |
set_size()
def set_size(
size,
):
Set the internal maximum size attribute
Usually called if not initially set at instantiation. Note that this triggers a call().
Parameters
size: int
Parameter | Type |
---|---|
size |
wrap()
def wrap(
iterable,
):
Wrap an iterable to call relative_update on each iteration
Parameters
iterable: Iterable
The iterable that is being wrapped
Parameter | Type |
---|---|
iterable |
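A generator captures what wrap does. CountingCallback and wrap_sketch are hypothetical stand-ins that only model the documented behaviour of calling relative_update once per item; the real method is invoked on a callback instance as wrap(iterable).

```python
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


class CountingCallback:
    """Hypothetical minimal callback that only tracks its counter."""

    def __init__(self) -> None:
        self.value = 0

    def relative_update(self, inc: int = 1) -> None:
        self.value += inc


def wrap_sketch(callback: CountingCallback, iterable: Iterable[T]) -> Iterator[T]:
    """Yield each item unchanged, calling relative_update() once per iteration."""
    for item in iterable:
        callback.relative_update()
        yield item


cb = CountingCallback()
chunks = list(wrap_sketch(cb, [b"ab", b"cd", b"ef"]))
print(cb.value)  # 3
```

Because the sketch is a generator, progress advances only as the consumer actually pulls items.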