FlyteFile and FlyteDirectory

Upload location

With Union.ai Serverless, the remote location to which FlyteFile and FlyteDirectory upload container-local files is always a randomly generated (universally unique) location in Union.ai’s internal object store. It cannot be changed.

With Union.ai BYOC, the upload location is configurable. See FlyteFile and FlyteDirectory > Changing the data upload location.

Remote examples

Remote file example

In the example above, we started with a local file. To preserve that file across the task boundary, Union.ai uploaded it to the Union.ai object store before passing it to the next task.

You can also start with a remote file, simply by initializing the FlyteFile object with a URI pointing to a remote source. For example:

@union.task
def task_1() -> union.FlyteFile:
    remote_path = "https://people.sc.fsu.edu/~jburkardt/data/csv/biostats.csv"
    return union.FlyteFile(path=remote_path)

In this case, no uploading is needed because the source file is already in a remote location. When the object is passed out of the task, it is converted into a Blob with the remote path as the URI. After the FlyteFile is passed to the next task, you can call FlyteFile.open() on it, just as before.

If you don’t intend to pass the FlyteFile to the next task, and instead want to read the contents of the remote file within the task, you can use from_source:

import json

@union.task
def load_json():
    uri = "gs://my-bucket/my-directory/example.json"
    my_json = union.FlyteFile.from_source(uri)

    # Load the JSON file into a dictionary and print it
    with open(my_json, "r") as json_file:
        data = json.load(json_file)
    print(data)

When initializing a FlyteFile with a remote file location, all URI schemes supported by fsspec are supported, including http and https (web), gs (Google Cloud Storage), s3 (AWS S3), and abfs and abfss (Azure Blob Filesystem).
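
As a rough illustration of the schemes listed above, the following sketch classifies a path as remote or local by its URI scheme. The helper name is_remote_uri and the REMOTE_SCHEMES set are hypothetical and cover only the schemes named in this section, not everything fsspec supports. This is not how the Union SDK itself decides whether a path is remote.

```python
from urllib.parse import urlsplit

# Hypothetical set: only the schemes named in this section, not the full fsspec list.
REMOTE_SCHEMES = {"http", "https", "gs", "s3", "abfs", "abfss"}


def is_remote_uri(path: str) -> bool:
    """Return True if the path uses one of the remote schemes listed above."""
    return urlsplit(path).scheme in REMOTE_SCHEMES


remote = is_remote_uri("gs://my-bucket/my-directory/example.json")
local = is_remote_uri("/tmp/example.json")
```

A check like this can be handy in your own task code when a single parameter may carry either a local path or a remote URI.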

Remote directory example

Below is an equivalent remote example for FlyteDirectory. The process of passing the FlyteDirectory between tasks is essentially identical to the FlyteFile example above.

@union.task
def task1() -> union.FlyteDirectory:
    p = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    return union.FlyteDirectory(p)

@union.task
def task2(fd: union.FlyteDirectory):
    # Get a list of the directory contents and display the first csv
    files = union.FlyteDirectory.listdir(fd)
    with open(files[0], mode="r") as f:
        d = f.read()
    print(f"The first csv is: \n{d}")

@union.workflow
def workflow():
    fd = task1()
    task2(fd=fd)

Streaming

The examples above access the contents of a FlyteFile by calling FlyteFile.open(). The object returned by FlyteFile.open() is a stream. Because the files in those examples were small, a simple read() was sufficient. For large files, you can instead iterate through the contents of the stream:

@union.task
def task_1() -> union.FlyteFile:
    remote_path = "https://sample-videos.com/csv/Sample-Spreadsheet-100000-rows.csv"
    return union.FlyteFile(path=remote_path)

@union.task
def task_2(ff: union.FlyteFile):
    with ff.open(mode="r") as f:
        for row in f:
            do_something(row)
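
To make the row-by-row pattern concrete, here is a minimal sketch of what a do_something-style consumer might look like. The function count_rows_over_threshold and the sample data are hypothetical, and io.StringIO stands in for the stream that ff.open(mode="r") would return, on the assumption that the stream yields lines of text when iterated, as the loop above implies.

```python
import csv
import io


def count_rows_over_threshold(stream, column: str, threshold: float) -> int:
    """Count CSV rows whose value in `column` exceeds `threshold`.

    Rows are consumed one at a time, so the whole file is never held in memory.
    """
    reader = csv.DictReader(stream)  # accepts any iterable of text lines
    count = 0
    for row in reader:
        if float(row[column]) > threshold:
            count += 1
    return count


# Stand-in for the stream returned by ff.open(mode="r") (an assumption, see above).
fake_stream = io.StringIO("name,height\nAlex,74\nBert,68\nCarl,69\n")
tall = count_rows_over_threshold(fake_stream, "height", 68.5)
```

Inside a task, the same function could be handed the stream from ff.open() in place of fake_stream.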

Downloading

Alternatively, you can download the contents of a FlyteFile object to a local file in the task container. There are two ways to do this: implicitly and explicitly.

Implicit downloading

The source file of a FlyteFile object is downloaded to the local container file system automatically whenever a function is called that takes the FlyteFile object and then calls FlyteFile’s __fspath__() method.

FlyteFile implements the os.PathLike interface and therefore the __fspath__() method. FlyteFile’s implementation of __fspath__() performs a download of the source file to the local container storage and returns the path to that local file. This enables many common file-related operations in Python to be performed on the FlyteFile object.

The most prominent example of such an operation is calling Python’s built-in open() function with a FlyteFile:

@union.task
def task_2(ff: union.FlyteFile):
    with open(ff, mode="r") as f:
        file_contents = f.read()

open() vs ff.open()

Note the difference between

ff.open(mode="r")

and

open(ff, mode="r")

The former calls the FlyteFile.open() method and returns a stream that you can iterate over, without downloading the file. The latter calls the built-in Python function open(), which downloads the specified FlyteFile to the local container file system and returns a handle to that local file.

Many other Python file operations (essentially, any that accept an os.PathLike object) can also be performed on a FlyteFile object and result in an automatic download.
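
The os.PathLike mechanism behind implicit downloading can be illustrated with a small stand-in class. Everything in this sketch is hypothetical: LazyRemoteFile is not FlyteFile, the "download" just writes a string to a temporary file, and the URI is a placeholder. It only shows how a __fspath__() method lets built-in functions trigger a download on first use.

```python
import os
import tempfile


class LazyRemoteFile(os.PathLike):
    """Hypothetical stand-in for a FlyteFile-like object (not the real class)."""

    def __init__(self, remote_uri: str, payload: str):
        self.remote_uri = remote_uri
        self._payload = payload    # pretend this content lives in remote storage
        self._local_path = None    # set once the simulated download has happened
        self.download_count = 0

    def _download(self) -> str:
        # Simulate a download by writing the payload to a temporary local file.
        fd, path = tempfile.mkstemp(suffix=".txt")
        with os.fdopen(fd, "w") as f:
            f.write(self._payload)
        self.download_count += 1
        return path

    def __fspath__(self) -> str:
        # Called by open(), os.fspath(), os.path.exists(), and similar APIs.
        if self._local_path is None:
            self._local_path = self._download()
        return self._local_path


remote = LazyRemoteFile("s3://example-bucket/data.txt", "hello from remote\n")
downloads_before = remote.download_count

# The built-in open() calls __fspath__(), which triggers the simulated download.
with open(remote, mode="r") as f:
    contents = f.read()

downloads_after = remote.download_count
```

In this sketch the local path is cached after the first call, so later path-based operations on the same object reuse the downloaded copy. Whether and how the real FlyteFile caches is not covered here.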

See Downloading with FlyteFile and FlyteDirectory for more information.

Explicit downloading

You can also explicitly download a FlyteFile to the local container file system by calling FlyteFile.download():

@union.task
def task_2(ff: union.FlyteFile):
    local_path = ff.download()

This method is typically used when you want to download the file without immediately reading it.

Typed aliases

The Union SDK defines some aliases of FlyteFile with specific type annotations. Specifically, FlyteFile has the following aliases for specific file types:

  • HDF5EncodedFile
  • HTMLPage
  • JoblibSerializedFile
  • JPEGImageFile
  • PDFFile
  • PNGImageFile
  • PythonPickledFile
  • PythonNotebook
  • SVGImageFile

Similarly, FlyteDirectory has the following aliases:

  • TensorboardLogs
  • TFRecordsDirectory

These aliases can optionally be used when handling a file or directory of the specified type, although the object itself will still be a FlyteFile or FlyteDirectory. The aliased versions of the classes are syntactic markers that enforce agreement between type annotations in the signatures of task functions, but they do not perform any checks on the actual contents of the file.