FlyteFile and FlyteDirectory
With Union.ai Serverless, the remote location to which FlyteFile and FlyteDirectory upload container-local files is always a randomly generated (universally unique) location in Union.ai’s internal object store. It cannot be changed.
With Union.ai BYOC, the upload location is configurable. See FlyteFile and FlyteDirectory > Changing the data upload location.
Remote examples
Remote file example
In the example above, we started with a local file. To preserve that file across the task boundary, Union.ai uploaded it to the Union.ai object store before passing it to the next task.
You can also start with a remote file by initializing the FlyteFile object with a URI pointing to a remote source. For example:
import union

@union.task
def task_1() -> union.FlyteFile:
    remote_path = "https://people.sc.fsu.edu/~jburkardt/data/csv/biostats.csv"
    return union.FlyteFile(path=remote_path)
In this case, no uploading is needed because the source file is already in a remote location.
When the object is passed out of the task, it is converted into a Blob with the remote path as the URI.
After the FlyteFile is passed to the next task, you can call FlyteFile.open() on it, just as before.
If you don’t intend to pass the FlyteFile to the next task, and instead want to open the contents of the remote file within the same task, you can use from_source.
import json

import union

@union.task
def load_json():
    uri = "gs://my-bucket/my-directory/example.json"
    my_json = union.FlyteFile.from_source(uri)
    # Load the JSON file into a dictionary and print it
    with open(my_json, "r") as json_file:
        data = json.load(json_file)
        print(data)
When initializing a FlyteFile with a remote file location, all URI schemes supported by fsspec are supported, including http and https (Web), gs (Google Cloud Storage), s3 (AWS S3), and abfs and abfss (Azure Blob Filesystem).
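As a small illustration of the scheme list above, the sketch below uses only the Python standard library to read the scheme from a URI and map it to the storage backend named in the text. The helper name describe_uri and the SCHEME_LABELS table are hypothetical and are not part of the Union SDK or fsspec; the SDK resolves schemes internally.

```python
from urllib.parse import urlsplit

# Hypothetical lookup table built from the schemes listed in the text above.
SCHEME_LABELS = {
    "http": "Web",
    "https": "Web",
    "gs": "Google Cloud Storage",
    "s3": "AWS S3",
    "abfs": "Azure Blob Filesystem",
    "abfss": "Azure Blob Filesystem",
}


def describe_uri(uri: str) -> str:
    """Return a human-readable label for the storage backend a URI points to."""
    scheme = urlsplit(uri).scheme
    if not scheme:
        # No scheme at all, e.g. "/tmp/data.csv": treat it as a local path.
        return "local path"
    return SCHEME_LABELS.get(scheme, f"other scheme: {scheme}")


remote_label = describe_uri("gs://my-bucket/my-directory/example.json")
local_label = describe_uri("/tmp/data.csv")
```

A check like this can be handy for logging where a task's inputs come from, but it is not required before constructing a FlyteFile.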
Remote directory example
Below is an equivalent remote example for FlyteDirectory. The process of passing the FlyteDirectory between tasks is essentially identical to the FlyteFile example above.
import union

@union.task
def task1() -> union.FlyteDirectory:
    p = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    return union.FlyteDirectory(p)

@union.task
def task2(fd: union.FlyteDirectory):
    # Get a list of the directory contents and display the first csv
    files = union.FlyteDirectory.listdir(fd)
    with open(files[0], mode="r") as f:
        d = f.read()
    print(f"The first csv is: \n{d}")

@union.workflow
def workflow():
    fd = task1()
    task2(fd=fd)
Streaming
In the above examples, we showed how to access the contents of a FlyteFile by calling FlyteFile.open().
The object returned by FlyteFile.open() is a stream. In the above examples, the files were small, so a simple read() was used.
For large files, however, you can iterate through the contents of the stream:
import union

@union.task
def task_1() -> union.FlyteFile:
    remote_path = "https://sample-videos.com/csv/Sample-Spreadsheet-100000-rows.csv"
    return union.FlyteFile(path=remote_path)

@union.task
def task_2(ff: union.FlyteFile):
    with ff.open(mode="r") as f:
        for row in f:
            do_something(row)  # placeholder for your own per-row processing
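To make the row-by-row pattern concrete, here is a minimal standard-library sketch of processing a CSV stream without loading it all into memory. It assumes the stream behaves like a text-mode file object that yields lines; io.StringIO stands in for the object returned by ff.open(mode="r"), and the function name count_rows_over and the sample data are hypothetical.

```python
import csv
import io


def count_rows_over(stream, column: int, threshold: float) -> int:
    """Count data rows whose value in `column` exceeds `threshold`.

    Only one row is held in memory at a time, so the approach also
    works for streams far larger than available RAM.
    """
    reader = csv.reader(stream)
    next(reader, None)  # skip the header row, if there is one
    count = 0
    for row in reader:
        if float(row[column]) > threshold:
            count += 1
    return count


# Stand-in for a remote file stream (assumption: text mode, line-iterable).
stream = io.StringIO("name,height\nAlex,74\nBert,68\nCarl,70\n")
n = count_rows_over(stream, column=1, threshold=69)
```

Inside a task, the same function could be given the stream from the with block in place of the StringIO stand-in.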
Downloading
Alternatively, you can download the contents of a FlyteFile object to a local file in the task container.
There are two ways to do this: implicitly and explicitly.
Implicit downloading
The source file of a FlyteFile object is downloaded to the local container file system automatically whenever the object is passed to a function that calls its __fspath__() method.
FlyteFile implements the os.PathLike interface and therefore the __fspath__() method.
FlyteFile’s implementation of __fspath__() downloads the source file to local container storage and returns the path to that local file.
This enables many common file-related operations in Python to be performed on the FlyteFile object.
The most prominent example of such an operation is calling Python’s built-in open() function with a FlyteFile:
@union.task
def task_2(ff: union.FlyteFile):
    with open(ff, mode="r") as f:
        file_contents = f.read()
Note the difference between ff.open(mode="r") and open(ff, mode="r").
The former calls the FlyteFile.open() method and returns a stream that can be iterated over, without downloading the file.
The latter calls Python’s built-in open() function, which downloads the specified FlyteFile to the local container file system and returns a handle to that local file.
Many other Python file operations (essentially, any that accept an os.PathLike object) can also be performed on a FlyteFile object and result in an automatic download.
See Downloading with FlyteFile and FlyteDirectory for more information.
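The os.PathLike mechanism described in this section can be illustrated with a toy class that uses only the standard library. The LazyFile class below is hypothetical and is not the SDK's FlyteFile: it pretends its content lives remotely and writes it to a temporary local file the first time __fspath__() is called. Reusing the local copy on later calls is a choice made for this sketch, not a statement about how FlyteFile behaves.

```python
import os
import tempfile


class LazyFile:
    """Hypothetical stand-in for a remote file handle (not the SDK's FlyteFile)."""

    def __init__(self, content: str):
        self._content = content  # pretend this lives in remote storage
        self._local_path = None
        self.downloads = 0  # how many times a "download" was performed

    def download(self) -> str:
        # Materialize the content as a local file and return its path.
        if self._local_path is None:
            fd, path = tempfile.mkstemp(suffix=".csv")
            with os.fdopen(fd, "w") as f:
                f.write(self._content)
            self._local_path = path
            self.downloads += 1
        return self._local_path

    def __fspath__(self) -> str:
        # Called by open(), os.path.*, shutil.*, and anything else that
        # accepts an os.PathLike object.
        return self.download()


lf = LazyFile("a,b\n1,2\n")

# The built-in open() calls __fspath__(), which triggers the "download".
with open(lf, mode="r") as f:
    text = f.read()

# Other path-accepting functions work the same way.
exists_locally = os.path.exists(lf)
```

Because the download happens inside __fspath__(), the caller never has to request it explicitly, which is the behavior this section describes for FlyteFile.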
Explicit downloading
You can also explicitly download a FlyteFile to the local container file system by calling FlyteFile.download():
@union.task
def task_2(ff: union.FlyteFile):
    local_path = ff.download()
This method is typically used when you want to download the file without immediately reading it.
Typed aliases
The Union SDK defines some aliases of FlyteFile with specific type annotations.
Specifically, FlyteFile has the following aliases for specific file types:
HDF5EncodedFile
HTMLPage
JoblibSerializedFile
JPEGImageFile
PDFFile
PNGImageFile
PythonPickledFile
PythonNotebook
SVGImageFile
Similarly, FlyteDirectory has the following aliases:
TensorboardLogs
TFRecordsDirectory
These aliases can optionally be used when handling a file or directory of the specified type, although the object itself will still be a FlyteFile or FlyteDirectory.
The aliased versions of the classes are syntactic markers that enforce agreement between type annotations in the signatures of task functions, but they do not perform any checks on the actual contents of the file.
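The idea of an annotation-only marker can be modeled in plain Python. The sketch below is a toy model built on typing.Annotated. It is not how the Union SDK defines its aliases, and the names File and annotations_agree are hypothetical. It shows the two properties described above: the returned object is still the base class, and only the annotations are compared, never the file contents.

```python
from typing import Annotated, get_type_hints


class File:
    """Hypothetical stand-in for FlyteFile: it only wraps a path."""

    def __init__(self, path: str):
        self.path = path


# Toy aliases: the same class, tagged with a format marker.
PDFFile = Annotated[File, "pdf"]
PNGImageFile = Annotated[File, "png"]


def make_report() -> PDFFile:
    # Nothing here verifies that the file really contains a PDF.
    return File("report.pdf")


def read_report(f: PDFFile) -> str:
    return f.path


def show_image(f: PNGImageFile) -> str:
    return f.path


def annotations_agree(producer, consumer, param: str) -> bool:
    """Check that the producer's return annotation matches the consumer's input."""
    out = get_type_hints(producer, include_extras=True)["return"]
    inp = get_type_hints(consumer, include_extras=True)[param]
    return out == inp


pdf_to_pdf = annotations_agree(make_report, read_report, "f")
pdf_to_png = annotations_agree(make_report, show_image, "f")
```

In this model, wiring make_report into show_image is flagged as a mismatch purely from the signatures, while the contents of report.pdf are never inspected.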