FlyteFile and FlyteDirectory

FlyteFile

Files are one of the most fundamental entities that Python users work with, and they are fully supported by Flyte. In the IDL, they are known as Blob literals, which are backed by the blob type.

Let’s assume our mission here is pretty simple: download a few CSV file links, read them with the Python built-in csv.DictReader class, normalize some pre-specified columns, and output the normalized columns to another CSV file.

To clone and run the example code on this page, see the Flytesnacks repo.

First, import the libraries:

import csv
from collections import defaultdict
from pathlib import Path
from typing import List

import flytekit as fl

Define a task that accepts a FlyteFile as input, along with a list of column names and a list of column names to normalize, and outputs a CSV file containing only the normalized columns. For this example, we use z-score normalization, which involves mean-centering and standard-deviation scaling.

The FlyteFile type can be parameterized with a format string, which is inserted into the format field of the underlying Blob type (for example, “jpeg” is the format in FlyteFile[typing.TypeVar("jpeg")]). The format is entirely optional, and if not specified, defaults to "". Predefined aliases for commonly used Flyte file formats are also available; see the Typed aliases section below.
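
As a quick illustration, here is a minimal sketch of a format-parameterized FlyteFile in a task signature; the task and the "jpeg" format are assumptions for illustration only:

import typing

import flytekit as fl

# The TypeVar string only sets the format field of the Blob type;
# the file contents are not validated against it.
@fl.task
def get_image_path(img: fl.FlyteFile[typing.TypeVar("jpeg")]) -> str:
    return img.path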

@fl.task
def normalize_columns(
    csv_url: fl.FlyteFile,
    column_names: List[str],
    columns_to_normalize: List[str],
    output_location: str,
) -> fl.FlyteFile:
    # read the data from the raw csv file
    parsed_data = defaultdict(list)
    with open(csv_url, newline="\n") as input_file:
        reader = csv.DictReader(input_file, fieldnames=column_names)
        next(reader)  # Skip header
        for row in reader:
            for column in columns_to_normalize:
                parsed_data[column].append(float(row[column].strip()))

    # normalize the data
    normalized_data = defaultdict(list)
    for colname, values in parsed_data.items():
        mean = sum(values) / len(values)
        std = (sum([(x - mean) ** 2 for x in values]) / len(values)) ** 0.5
        normalized_data[colname] = [(x - mean) / std for x in values]

    # write to local path
    out_path = str(Path(fl.current_context().working_directory) / f"normalized-{Path(csv_url.path).stem}.csv")
    with open(out_path, mode="w") as output_file:
        writer = csv.DictWriter(output_file, fieldnames=columns_to_normalize)
        writer.writeheader()
        for row in zip(*normalized_data.values()):
            writer.writerow({k: row[i] for i, k in enumerate(columns_to_normalize)})

    if output_location:
        return fl.FlyteFile(path=str(out_path), remote_path=output_location)
    else:
        return fl.FlyteFile(path=str(out_path))

When the CSV file’s URL is sent to the task, the system translates it into a FlyteFile object (but doesn’t download it). Calling the download() method triggers the download, and the path attribute lets you open the file.

If the output_location argument is specified, it will be passed to the remote_path argument of FlyteFile, which will use that path as the storage location instead of a random location (Flyte’s object store).

When this task finishes, the system returns the FlyteFile instance, uploads the file to the location, and creates a blob literal pointing to it.

Lastly, define a workflow. The normalize_csv_file workflow has an output_location argument which is passed to the output_location input of the task. If it’s not an empty string, the task attempts to upload its file to that location.

@fl.workflow
def normalize_csv_file(
    csv_url: fl.FlyteFile,
    column_names: List[str],
    columns_to_normalize: List[str],
    output_location: str = "",
) -> fl.FlyteFile:
    return normalize_columns(
        csv_url=csv_url,
        column_names=column_names,
        columns_to_normalize=columns_to_normalize,
        output_location=output_location,
    )

You can run the workflow locally as follows:

if __name__ == "__main__":
    default_files = [
        (
            "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/examples/data_types_and_io/test_data/biostats.csv",
            ["Name", "Sex", "Age", "Heights (in)", "Weight (lbs)"],
            ["Age"],
        ),
        (
            "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/examples/data_types_and_io/test_data/faithful.csv",
            ["Index", "Eruption length (mins)", "Eruption wait (mins)"],
            ["Eruption length (mins)"],
        ),
    ]
    print(f"Running {__file__} main...")
    for index, (csv_url, column_names, columns_to_normalize) in enumerate(default_files):
        normalized_columns = normalize_csv_file(
            csv_url=csv_url,
            column_names=column_names,
            columns_to_normalize=columns_to_normalize,
        )
        print(f"Running normalize_csv_file workflow on {csv_url}: " f"{normalized_columns}")

You can enable type validation if you have the python-magic package installed.

Mac OS:

$ brew install libmagic

Linux:

$ sudo apt-get install libmagic1

Currently, type validation is only supported on the Mac OS and Linux platforms.

Streaming support

Flyte 1.5 introduced support for streaming FlyteFile types via the fsspec library. This integration enables efficient, on-demand access to remote files, eliminating the need for fully downloading them to local storage.

This feature is marked as experimental. We’d love feedback on the API!

Here is a simple example of removing some rows from a CSV file and writing the result to a new file:

import pandas as pd

@fl.task()
def remove_some_rows(ff: fl.FlyteFile) -> fl.FlyteFile:
    """
    Remove the rows where the value of the City column is 'Seattle'.
    This is an example with streaming support.
    """
    new_file = fl.FlyteFile.new_remote_file("data_without_seattle.csv")
    with ff.open("r") as r:
        with new_file.open("w") as w:
            df = pd.read_csv(r)
            df = df[df["City"] != "Seattle"]
            df.to_csv(w, index=False)
    return new_file

FlyteDirectory

In addition to files, folders are another fundamental operating system primitive. Flyte supports folders in the form of multi-part blobs.

To clone and run the example code on this page, see the Flytesnacks repo.

To begin, import the libraries:

import csv
import urllib.request
from collections import defaultdict
from pathlib import Path
from typing import List

import flytekit as fl

Building on the example in the FlyteFile section above, let’s continue with normalizing the columns in CSV files.

The following task downloads a list of URLs pointing to CSV files and returns the folder path in a FlyteDirectory object.

@fl.task
def download_files(csv_urls: List[str]) -> fl.FlyteDirectory:
    working_dir = fl.current_context().working_directory
    local_dir = Path(working_dir) / "csv_files"
    local_dir.mkdir(exist_ok=True)

    # get the number of digits needed to preserve the order of files in the local directory
    zfill_len = len(str(len(csv_urls)))
    for idx, remote_location in enumerate(csv_urls):
        # prefix the file name with the index location of the file in the original csv_urls list
        local_file = Path(local_dir) / f"{str(idx).zfill(zfill_len)}_{Path(remote_location).name}"
        urllib.request.urlretrieve(remote_location, local_file)
    return fl.FlyteDirectory(path=str(local_dir))

You can annotate a FlyteDirectory when you want to download or upload the contents of the directory in batches. For example,

from typing import Annotated

from flytekit import BatchSize

@fl.task
def t1(directory: Annotated[fl.FlyteDirectory, BatchSize(10)]) -> Annotated[fl.FlyteDirectory, BatchSize(100)]:
    ...
    return fl.FlyteDirectory(...)

Flytekit efficiently downloads files from the specified input directory in 10-file chunks, loading each chunk into memory before writing it to the local disk, and repeating the process for subsequent sets of 10 files. Similarly, for outputs, Flytekit uploads the resulting directory in chunks of 100.

We define a helper function to normalize the columns in-place.

This is a plain Python function that will be called in a subsequent Flyte task. This example demonstrates how Flyte tasks are simply entrypoints of execution, which can themselves call other functions and routines that are written in pure Python.

def normalize_columns(
    local_csv_file: str,
    column_names: List[str],
    columns_to_normalize: List[str],
):
    # read the data from the raw csv file
    parsed_data = defaultdict(list)
    with open(local_csv_file, newline="\n") as input_file:
        reader = csv.DictReader(input_file, fieldnames=column_names)
        next(reader)  # Skip the header row
        for row in reader:
            for column in columns_to_normalize:
                parsed_data[column].append(float(row[column].strip()))

    # normalize the data
    normalized_data = defaultdict(list)
    for colname, values in parsed_data.items():
        mean = sum(values) / len(values)
        std = (sum([(x - mean) ** 2 for x in values]) / len(values)) ** 0.5
        normalized_data[colname] = [(x - mean) / std for x in values]

    # overwrite the csv file with the normalized columns
    with open(local_csv_file, mode="w") as output_file:
        writer = csv.DictWriter(output_file, fieldnames=columns_to_normalize)
        writer.writeheader()
        for row in zip(*normalized_data.values()):
            writer.writerow({k: row[i] for i, k in enumerate(columns_to_normalize)})

We then define a task that accepts the previously downloaded folder, along with some metadata about the column names of each file in the directory and the column names that we want to normalize.

@fl.task
def normalize_all_files(
    csv_files_dir: fl.FlyteDirectory,
    columns_metadata: List[List[str]],
    columns_to_normalize_metadata: List[List[str]],
) -> fl.FlyteDirectory:
    for local_csv_file, column_names, columns_to_normalize in zip(
        # make sure we sort the files in the directory to preserve the original order of the csv urls
        list(sorted(Path(csv_files_dir).iterdir())),
        columns_metadata,
        columns_to_normalize_metadata,
    ):
        normalize_columns(local_csv_file, column_names, columns_to_normalize)
    return fl.FlyteDirectory(path=csv_files_dir.path)

Compose all the above tasks into a workflow. This workflow accepts a list of URL strings pointing to a remote location containing a CSV file, a list of column names associated with each CSV file, and a list of columns that we want to normalize.

@fl.workflow
def download_and_normalize_csv_files(
    csv_urls: List[str],
    columns_metadata: List[List[str]],
    columns_to_normalize_metadata: List[List[str]],
) -> fl.FlyteDirectory:
    directory = download_files(csv_urls=csv_urls)
    return normalize_all_files(
        csv_files_dir=directory,
        columns_metadata=columns_metadata,
        columns_to_normalize_metadata=columns_to_normalize_metadata,
    )

You can run the workflow locally as follows:

if __name__ == "__main__":
    csv_urls = [
        "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/examples/data_types_and_io/test_data/biostats.csv",
        "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/examples/data_types_and_io/test_data/faithful.csv",
    ]
    columns_metadata = [
        ["Name", "Sex", "Age", "Heights (in)", "Weight (lbs)"],
        ["Index", "Eruption length (mins)", "Eruption wait (mins)"],
    ]
    columns_to_normalize_metadata = [
        ["Age"],
        ["Eruption length (mins)"],
    ]

    print(f"Running {__file__} main...")
    directory = download_and_normalize_csv_files(
        csv_urls=csv_urls,
        columns_metadata=columns_metadata,
        columns_to_normalize_metadata=columns_to_normalize_metadata,
    )
    print(f"Running download_and_normalize_csv_files on {csv_urls}: " f"{directory}")

Changing the data upload location

Upload location

With Flyte Serverless, the remote location to which FlyteFile and FlyteDirectory upload container-local files is always a randomly generated (universally unique) location in Flyte’s internal object store. It cannot be changed.

With Flyte BYOC, the upload location is configurable.

By default, Flyte uploads local files or directories to the default raw data store (Flyte’s dedicated internal object store). However, you can change the upload location by setting the raw data prefix to your own bucket or specifying the remote_path for a FlyteFile or FlyteDirectory.

Setting up your own object store bucket

For details on how to set up your own object store bucket, consult the documentation for your cloud provider.

Changing the raw data prefix

If you would like files or directories to be uploaded to your own bucket, you can specify the AWS, GCS, or Azure bucket in the raw data prefix parameter. This setting can be applied at the workflow level on registration, or per execution on the command line or in the UI.

Flyte will create a directory with a unique, random name in your bucket for each FlyteFile or FlyteDirectory data write to guarantee that you never overwrite your data.
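
For instance, here is a sketch of overriding the upload location for a single execution from the command line; the bucket is a placeholder, csv_example.py is a hypothetical file containing the workflow above, and the exact flag may vary with your flytekit version:

$ pyflyte run --remote \
      --raw-output-data-prefix s3://my-bucket/my-prefix \
      csv_example.py normalize_csv_file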

Specifying remote_path for a FlyteFile or FlyteDirectory

If you specify the remote_path when initializing your FlyteFile (or FlyteDirectory), the underlying data is written to that precise location with no randomization.

Using remote_path will overwrite data

If you set remote_path to a static string, subsequent runs of the same task will overwrite the file. If you want to use a dynamically generated path, you will have to generate it yourself.
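
For example, here is a minimal sketch of generating a dynamic path yourself; the bucket and key are placeholders:

from datetime import datetime, timezone
from pathlib import Path

import flytekit as fl

@fl.task
def export_report() -> fl.FlyteFile:
    local_path = Path(fl.current_context().working_directory) / "report.csv"
    local_path.write_text("col\n1\n")
    # Timestamp the remote path so repeated runs don't overwrite each other
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    return fl.FlyteFile(
        path=str(local_path),
        remote_path=f"s3://my-bucket/reports/report-{stamp}.csv",
    )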

Remote examples

Remote file example

In the example above, we started with a local file. To preserve that file across the task boundary, Flyte uploaded it to the Flyte object store before passing it to the next task.

You can also start with a remote file, simply by initializing the FlyteFile object with a URI pointing to a remote source. For example:

@fl.task
def task_1() -> fl.FlyteFile:
    remote_path = "https://people.sc.fsu.edu/~jburkardt/data/csv/biostats.csv"
    return fl.FlyteFile(path=remote_path)

In this case, no uploading is needed because the source file is already in a remote location. When the object is passed out of the task, it is converted into a Blob with the remote path as the URI. After the FlyteFile is passed to the next task, you can call FlyteFile.open() on it, just as before.

If you don’t intend to pass the FlyteFile to the next task, but rather want to open the contents of the remote file within the task, you can use from_source:

import json

@fl.task
def load_json():
    uri = "gs://my-bucket/my-directory/example.json"
    my_json = fl.FlyteFile.from_source(uri)

    # Load the JSON file into a dictionary and print it
    with open(my_json, "r") as json_file:
        data = json.load(json_file)
    print(data)

When initializing a FlyteFile with a remote file location, all URI schemes supported by fsspec are supported, including http and https (web), gs (Google Cloud Storage), s3 (AWS S3), and abfs and abfss (Azure Blob Filesystem).

Remote directory example

Below is an equivalent remote example for FlyteDirectory. The process of passing the FlyteDirectory between tasks is essentially identical to the FlyteFile example above.

@fl.task
def task1() -> fl.FlyteDirectory:
    p = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    return fl.FlyteDirectory(p)

@fl.task
def task2(fd: fl.FlyteDirectory):
    # Get a list of the directory contents and display the first CSV file
    files = fl.FlyteDirectory.listdir(fd)
    with open(files[0], mode="r") as f:
        d = f.read()
    print(f"The first csv is: \n{d}")

@fl.workflow
def workflow():
    fd = task1()
    task2(fd=fd)

Streaming

In the above examples, we showed how to access the contents of FlyteFile by calling FlyteFile.open(). The object returned by FlyteFile.open() is a stream. In the above examples, the files were small, so a simple read() was used. But for large files, you can iterate through the contents of the stream:

@fl.task
def task_1() -> fl.FlyteFile:
    remote_path = "https://sample-videos.com/csv/Sample-Spreadsheet-100000-rows.csv"
    return fl.FlyteFile(path=remote_path)

@fl.task
def task_2(ff: fl.FlyteFile):
    with ff.open(mode="r") as f:
        for row in f:
            do_something(row)

Downloading

Alternatively, you can download the contents of a FlyteFile object to a local file in the task container. There are two ways to do this: implicitly and explicitly.

Implicit downloading

The source file of a FlyteFile object is downloaded to the local container file system automatically whenever a function is called that takes the FlyteFile object and then calls FlyteFile’s __fspath__() method.

FlyteFile implements the os.PathLike interface and therefore the __fspath__() method. FlyteFile’s implementation of __fspath__() performs a download of the source file to the local container storage and returns the path to that local file. This enables many common file-related operations in Python to be performed on the FlyteFile object.

The most prominent example of such an operation is calling Python’s built-in open() method with a FlyteFile:

@fl.task
def task_2(ff: fl.FlyteFile):
    with open(ff, mode="r") as f:
        file_contents = f.read()

open() vs ff.open()

Note the difference between

ff.open(mode="r")

and

open(ff, mode="r")

The former calls the FlyteFile.open() method and returns a stream without downloading the file. The latter calls the built-in Python function open(), downloads the specified FlyteFile to the local container file system, and returns a handle to that file.

Many other Python file operations (essentially, any that accept an os.PathLike object) can also be performed on a FlyteFile object and result in an automatic download.
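
As a sketch of one such operation (the task is hypothetical), copying a FlyteFile with shutil triggers an implicit download, because shutil.copy() accepts os.PathLike arguments:

import shutil

import flytekit as fl

@fl.task
def copy_file(ff: fl.FlyteFile):
    working_dir = fl.current_context().working_directory
    # shutil.copy() calls __fspath__() on ff, which downloads the
    # source file to local storage before the copy is performed.
    local_copy = shutil.copy(ff, working_dir)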

See Downloading with FlyteFile and FlyteDirectory for more information.

Explicit downloading

You can also explicitly download a FlyteFile to the local container file system by calling FlyteFile.download():

@fl.task
def task_2(ff: fl.FlyteFile):
    local_path = ff.download()

This method is typically used when you want to download the file without immediately reading it.
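
FlyteDirectory has a download() method as well; a minimal sketch (the task is hypothetical):

from pathlib import Path

import flytekit as fl

@fl.task
def count_files(fd: fl.FlyteDirectory) -> int:
    # download() materializes the entire directory locally and
    # returns the local path
    local_dir = fd.download()
    return len(list(Path(local_dir).iterdir()))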

Typed aliases

The Flytekit SDK defines some aliases of FlyteFile with specific type annotations. FlyteFile has the following aliases for specific file types:

  • HDF5EncodedFile
  • HTMLPage
  • JoblibSerializedFile
  • JPEGImageFile
  • PDFFile
  • PNGImageFile
  • PythonPickledFile
  • PythonNotebook
  • SVGImageFile

Similarly, FlyteDirectory has the following aliases:

  • TensorboardLogs
  • TFRecordsDirectory

These aliases can optionally be used when handling a file or directory of the specified type, although the object itself will still be a FlyteFile or FlyteDirectory. The aliased versions of the classes are syntactic markers that enforce agreement between type annotations in the signatures of task functions, but they do not perform any checks on the actual contents of the file.
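
For instance, here is a minimal sketch using one of the FlyteFile aliases; the task itself is hypothetical:

import flytekit as fl
from flytekit.types.file import JPEGImageFile

@fl.task
def passthrough(image: JPEGImageFile) -> JPEGImageFile:
    # The alias only enforces agreement between task signatures; at
    # runtime `image` is an ordinary FlyteFile and its contents are
    # not checked.
    local_path = image.download()
    return JPEGImageFile(local_path)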