Niels Bantilan

Structured Dataset

Tabular data is ubiquitous today, and nowhere more so than in data engineering and data science tasks, where inputs and outputs are typically two-dimensional arrays or higher-dimensional hypercubes.

As with most type systems, Python has primitives, container types like maps and tuples, and support for user-defined structures. However, while there’s a rich variety of dataframe classes (pandas, Spark, Pandera, etc.), there’s no native Python type that represents a dataframe in the abstract. This is the gap that the StructuredDataset type is meant to fill. It is for this same reason that flytekit ships with the FlyteFile and FlyteDirectory types. StructuredDataset was introduced into Flyte in Q4 2021, and over the past year we’ve refined its implementation and squashed a few bugs.

Readers who have been with us on our journey for a while may be wondering why this type feels similar to another type, FlyteSchema. FlyteSchema was designed for exactly the same reason, but that initial implementation was lacking in a couple of respects:

  • The column type support was limited. Container types like lists and maps could not be expressed.
  • Extensibility wasn’t straightforward. In particular, knowing how to choose a specific plugin to handle a specific input or output schema was confusing and didn’t generalize well to situations like a task returning two dataframes that needed to be written to two different storage locations.

Readers may also be wondering what purpose such a type serves at all. Indeed, when Flyte started, dataframes were represented simply as a directory of files. With an explicit type, though, and the increased transparency it brings, Flyte is able to:

  • Eliminate boilerplate code users would otherwise need to write to serialize/deserialize from file objects into dataframe instances,
  • Eliminate additional inputs/outputs that convey metadata around the format of the tabular data held in those files; not everything fits in the uri and format that the FlyteDirectory object provides,
  • Add flexibility around how dataframe files are loaded; the base FlyteDirectory transformer is pretty limited,
  • Offer a range of dataframe-specific functionality: enforce compatibility of different schemas (not only at compile time, but also at runtime, since type information is carried along in the literal), store third-party schema definitions, and potentially, in the future, render sample data, provide summary statistics, etc.

When designing the new type, we wanted to keep the experience as close to pure Python and existing flytekit as possible. To that end, users can take advantage of this new type merely by declaring:

@task
def generate_pandas() -> pd.DataFrame:
    return my_pd_df

Flytekit will detect the pandas dataframe return signature and convert the interface for the task to the new StructuredDataset type. In this simplest form, however, the user is not able to set the additional dataframe information alluded to above:

  • Serialized byte format
  • Storage driver and location
  • Column type information
  • Additional 3rd party schema information

This is by design: we wanted the default case to suffice for the majority of use cases, and to require as few keystrokes or changes to existing code as possible. Specifying these is simple, however, and relies on Python’s typing.Annotated, which is designed expressly to supplement types with arbitrary metadata.

Adding column information and the desired serialization format would look like this:

from collections import OrderedDict
from typing import Annotated

my_cols = OrderedDict([("Name", str), ("Age", int)])

@task
def generate_pandas() -> Annotated[pd.DataFrame, my_cols, "avro"]: ...
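
As a rough illustration of how a library can read this kind of metadata back out of a signature, here is a minimal sketch that uses only the standard library. FakeDataFrame and describe_return are hypothetical names standing in for pd.DataFrame and for flytekit's own inspection logic; this is not flytekit's actual implementation.

```python
from collections import OrderedDict
from typing import Annotated, get_args, get_origin, get_type_hints


class FakeDataFrame:
    """Hypothetical stand-in for pd.DataFrame so the sketch has no dependencies."""


my_cols = OrderedDict([("Name", str), ("Age", int)])


def generate() -> Annotated[FakeDataFrame, my_cols, "avro"]:
    return FakeDataFrame()


def describe_return(fn):
    """Split an Annotated return type into (python type, columns, format)."""
    hint = get_type_hints(fn, include_extras=True)["return"]
    if get_origin(hint) is not Annotated:
        # Plain annotation: no extra metadata was supplied.
        return hint, None, None
    base, *extras = get_args(hint)
    # Assumption for this sketch: a dict carries columns, a str carries the format.
    columns = next((e for e in extras if isinstance(e, dict)), None)
    fmt = next((e for e in extras if isinstance(e, str)), None)
    return base, columns, fmt


base, columns, fmt = describe_return(generate)
```

The underlying Python type stays intact, so code that ignores the extra metadata still sees an ordinary return annotation.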

To understand how these are used, and to understand how to specify storage location information, let’s first discuss how the Structured Dataset plugin environment works.

Two things need to happen with any dataframe instance when interacting with Flyte:

  • Serialization/deserialization of the Python instance to/from bytes (in the format specified above).
  • Transmission/retrieval of those bytes to/from somewhere.
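
To make the two responsibilities concrete, here is a toy sketch of a handler pair that serializes rows to CSV and stores them on local disk. Everything in it is hypothetical (ToyLiteral, ToyCsvEncoder, ToyCsvDecoder, and the use of plain dictionaries instead of a dataframe); it illustrates the division of labor only, not flytekit's real interfaces.

```python
import csv
import io
import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass
class ToyLiteral:
    """Hypothetical stand-in for a Structured Dataset literal: a pointer plus a format."""
    uri: str
    fmt: str


class ToyCsvEncoder:
    def encode(self, rows, directory):
        # Step 1: serialize the in-memory rows to bytes.
        buf = io.StringIO()
        writer = csv.DictWriter(buf, fieldnames=list(rows[0]))
        writer.writeheader()
        writer.writerows(rows)
        payload = buf.getvalue().encode("utf-8")
        # Step 2: transmit the bytes to storage (a local file in this sketch).
        path = Path(directory) / "00000"
        path.write_bytes(payload)
        return ToyLiteral(uri=str(path), fmt="csv")


class ToyCsvDecoder:
    def decode(self, literal):
        # Step 1: retrieve the bytes from storage.
        payload = Path(literal.uri).read_bytes()
        # Step 2: deserialize them back into in-memory rows.
        return list(csv.DictReader(io.StringIO(payload.decode("utf-8"))))


rows = [{"Name": "Ada", "Age": "36"}, {"Name": "Alan", "Age": "41"}]
with tempfile.TemporaryDirectory() as tmp:
    literal = ToyCsvEncoder().encode(rows, tmp)
    round_tripped = ToyCsvDecoder().decode(literal)
```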

Each Structured Dataset plugin (called Encoder or Decoder in the code below) will need to perform both of these steps. Flytekit decides which of the loaded plugins to invoke based on three attributes:

  1. The byte format
  2. The storage location
  3. The Python type in the task or workflow signature.

These three keys uniquely identify which encoder (used when converting a dataframe in Python memory to a Flyte value, e.g. when a task finishes and returns a dataframe) or decoder (used when hydrating a dataframe in memory from a Flyte value, e.g. when a task starts and has a dataframe input) to invoke.

However, it is awkward to require users to use typing.Annotated on every signature. Therefore, flytekit has a default byte format for every Python dataframe type registered with flytekit. The storage backend is also simple: by default, the data will be written to the same place that all other pointer types (FlyteFile, FlyteDirectory, etc.) are written to. This is controlled by the raw output data prefix option in Flyte, which is configurable on multiple levels. That is to say, in the simple default case

@task
def generate_pandas() -> pd.DataFrame: ...

when this task returns, flytekit will:

  1. Look up the default format for pandas dataframes,
  2. Look up the default storage location based on the raw output data prefix setting,
  3. Use these two settings to select an Encoder and invoke it.

From here, the Encoder is responsible for returning a Flyte Structured Dataset literal representing the dataframe. The encoder is, of course, also expected to properly serialize and store the dataframe.
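
The default lookup described above can be sketched roughly as follows. The names DEFAULT_FORMATS and resolve_handler_key, the "parquet" default, and the example bucket are all assumptions made for illustration; flytekit maintains its own tables and logic for this.

```python
from urllib.parse import urlparse


class FakeDataFrame:
    """Hypothetical stand-in for pd.DataFrame."""


# Assumed per-type default formats, for illustration only.
DEFAULT_FORMATS = {FakeDataFrame: "parquet"}


def resolve_handler_key(python_type, raw_output_prefix, fmt=None):
    """Return the (type, storage protocol, format) triple used to pick an encoder."""
    # 1. Fall back to the type's default format when none was requested.
    fmt = fmt or DEFAULT_FORMATS[python_type]
    # 2. Derive the storage protocol from the output prefix; no scheme means local disk.
    protocol = urlparse(raw_output_prefix).scheme or "file"
    # 3. Together these values identify the encoder to invoke.
    return python_type, protocol, fmt


key = resolve_handler_key(FakeDataFrame, "s3://my-bucket/raw-data")
```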

What about the more complex case? How would a task return, say, two dataframes where:

  • the first needs to be written to BigQuery and serialized by one of its libraries,
  • the second needs to be serialized to CSV and written to a specific location in GCS, different from the generic pointer-data bucket?

To trigger non-default behavior, users will have to use a new object.

import typing

from flytekit import StructuredDataset, task

@task
def t1() -> typing.Tuple[StructuredDataset, StructuredDataset]:
    ...
    return (
        StructuredDataset(df1, uri="bq://project:flyte.table"),
        StructuredDataset(df2, uri="gs://auxiliary-bucket/data"),
    )

Basically, if you want the default behavior (which is itself configurable based on which plugins are loaded), you can work just with your current raw dataframe classes. If you want to customize the Flyte interaction behavior, you’ll need to wrap your dataframe in a StructuredDataset wrapper object.

Note also that no format was specified in either of the StructuredDataset constructors, or in the signature. So how did the BigQuery encoder get invoked? This is because the stock BigQuery encoder is loaded into flytekit with an empty format. The flytekit StructuredDatasetTransformerEngine interprets that to mean that it is a generic encoder (or decoder) and can work across formats, if a more specific format is not found.
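
The fallback to a generic handler can be pictured with a small registry sketch. ToyRegistry and the string placeholders used in place of real encoder objects are hypothetical; the actual lookup inside StructuredDatasetTransformerEngine may differ in its details.

```python
GENERIC = ""  # an empty format marks a handler as format-agnostic


class ToyRegistry:
    def __init__(self):
        self._encoders = {}

    def register(self, python_type, protocol, fmt, encoder):
        self._encoders[(python_type, protocol, fmt)] = encoder

    def find(self, python_type, protocol, fmt):
        # Prefer an exact match on all three keys.
        exact = self._encoders.get((python_type, protocol, fmt))
        if exact is not None:
            return exact
        # Otherwise fall back to a generic handler for the same type and protocol.
        generic = self._encoders.get((python_type, protocol, GENERIC))
        if generic is None:
            raise LookupError(f"no encoder for {python_type.__name__}/{protocol}/{fmt!r}")
        return generic


class FakeDataFrame:
    """Hypothetical stand-in for pd.DataFrame."""


registry = ToyRegistry()
registry.register(FakeDataFrame, "bq", GENERIC, "bigquery-encoder")
registry.register(FakeDataFrame, "gs", "parquet", "gcs-parquet-encoder")
registry.register(FakeDataFrame, "gs", GENERIC, "gcs-generic-encoder")
```

In this sketch a request for ("bq", "parquet") resolves to the generic BigQuery handler, while a request for ("gs", "parquet") prefers the more specific Parquet handler over the generic one.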

This flexibility also comes into play when it comes to column information. Type-checking is obviously one of the benefits of using a typed system like Flyte. For structured datasets however, type compatibility also depends on column and format information.

Some StructuredDatasetType B is compatible with another StructuredDatasetType A if and only if:

  • B.columns ⊆ A.columns; or A.columns is empty or B.columns is empty, and
  • A.format == B.format; or A.format is empty or B.format is empty

That is, to take advantage of column-level type checking, users must specify column information on both ends.
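
Expressed as code, the two rules might look like the sketch below. ToyDatasetType is a hypothetical simplification of StructuredDatasetType that stores columns as a name-to-type mapping, and the sketch assumes both rules must hold together; it is not the checker Flyte itself runs.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class ToyDatasetType:
    """Hypothetical stand-in for StructuredDatasetType: column name -> type, plus format."""
    columns: dict = field(default_factory=dict)
    fmt: str = ""


def is_compatible(b, a):
    """Return True if type b is compatible with type a under the two rules above."""
    columns_ok = (
        not a.columns
        or not b.columns
        # Every column b asks for must exist in a with the same type.
        or all(a.columns.get(name) == typ for name, typ in b.columns.items())
    )
    format_ok = not a.fmt or not b.fmt or a.fmt == b.fmt
    return columns_ok and format_ok


upstream = ToyDatasetType({"Name": str, "Age": int, "City": str}, "parquet")
subset = ToyDatasetType({"Name": str, "Age": int})
wrong_type = ToyDatasetType({"Age": str})
wrong_format = ToyDatasetType(fmt="csv")
unspecified = ToyDatasetType()
```

Note how the unspecified type is compatible with anything, which is why column-level checking only takes effect when both sides declare their columns.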

With the context covered above, here is a quick demo of how one might build a custom decoder for pandas dataframes that leverages AWS’s S3 Select capability. Note that S3 Select’s capabilities are somewhat limited; this code is for demonstration purposes only and should not be used in production.

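from io import StringIO

import boto3
import pandas as pd
from flytekit import FlyteContext
from flytekit.extras.persistence.s3_awscli import S3Persistence  # assumed location; varies by flytekit version
from flytekit.models import literals
from flytekit.models.literals import StructuredDatasetMetadata
from flytekit.types.structured.structured_dataset import (
    PARQUET,
    StructuredDatasetDecoder,
    StructuredDatasetTransformerEngine,
)

class ExperimentalNaiveS3SelectDecoder(StructuredDatasetDecoder):
    def __init__(self):
        super().__init__(pd.DataFrame, "s3", PARQUET)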

As mentioned earlier, flytekit selects a handler based on three attributes: the Python class, the storage engine, and the byte format. These three are reflected in the constructor. Note that there is some built-in convenience here that we glossed over: if the middle argument, here "s3", is not specified, then the handler is assumed to work for all the storage backends that the stock flytekit data persistence layer handles, and it will be registered once for each of them.

Next comes the custom logic. Since this is a decoder, we’ll need to override the decode() function. The important bit to note here is the final argument to the method: current_task_metadata. This contains the type information of the task we’re currently operating on. Suppose that a prior task generated a dataframe with a hundred columns, and the current task needs only ten of them. Assuming this downstream task has been properly annotated, those ten columns will be in this argument.

    def decode(
        self,
        ctx: FlyteContext,
        flyte_value: literals.StructuredDataset,
        current_task_metadata: StructuredDatasetMetadata,
    ) -> pd.DataFrame:
        # Select only the columns declared on the current task, or everything if none were declared.
        if current_task_metadata.structured_dataset_type and current_task_metadata.structured_dataset_type.columns:
            columns = [c.name for c in current_task_metadata.structured_dataset_type.columns]
            sql_cols = ",".join(columns)
        else:
            sql_cols = "*"
        # The default encoder (i.e. the one that wrote the output of the
        # previous task) adds this file name.
        s3_file = flyte_value.uri + "/00000"
        sql = f"select {sql_cols} from s3object"

        bucket, prefix = S3Persistence._split_s3_path_to_bucket_and_key(s3_file)  # noqa
        s3 = boto3.client("s3")
        r = s3.select_object_content(
            Bucket=bucket,
            Key=prefix,
            ExpressionType="SQL",
            Expression=sql,
            InputSerialization={"Parquet": {}},
            OutputSerialization={"CSV": {}},
        )

        # A Records event may carry a partial record, so concatenate the raw
        # payloads as-is rather than joining them with newlines.
        chunks = []
        for event in r["Payload"]:
            if "Records" in event:
                chunks.append(event["Records"]["Payload"])
        csv_text = b"".join(chunks).decode("utf-8")
        # The CSV output carries no header row, so columns are numbered positionally.
        df = pd.read_csv(StringIO(csv_text), header=None)
        print(f"Selected df {df}")
        return df

With the column information, we can pass the requested columns as SQL to the boto3 S3 client’s select_object_content API. Again, this code is meant merely to motivate and illustrate what is possible with custom structured dataset handlers, rather than to serve as a model of production-ready code. One caveat worth highlighting, for instance, is that the boto3 library doesn’t play well with the fsspec pip dependency we often recommend.
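
The two pure-Python pieces of that decoder, building the SQL expression and reassembling the streamed records, can be tried without any AWS access. The sketch below uses hand-written events that only mimic the shape of the select_object_content response; build_select_sql, rows_from_events, and fake_events are hypothetical names introduced for illustration.

```python
import csv
import io


def build_select_sql(columns):
    """Build the S3 Select expression for the requested columns (all columns if none)."""
    sql_cols = ",".join(columns) if columns else "*"
    return f"select {sql_cols} from s3object"


def rows_from_events(events):
    """Reassemble CSV rows from an S3 Select style event stream."""
    payloads = [event["Records"]["Payload"] for event in events if "Records" in event]
    # Chunk boundaries need not align with row boundaries, so concatenate the
    # raw bytes first and only then decode and parse them.
    text = b"".join(payloads).decode("utf-8")
    return list(csv.reader(io.StringIO(text)))


# Hand-written events; the second row is deliberately split across two chunks.
fake_events = [
    {"Records": {"Payload": b"Ada,36\nAl"}},
    {"Stats": {}},
    {"Records": {"Payload": b"an,41\n"}},
    {"End": {}},
]
rows = rows_from_events(fake_events)
sql = build_select_sql(["Name", "Age"])
```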

To get this custom decoder picked up, we’ll also need to register it with the StructuredDatasetTransformerEngine. Think of this class as being like any other transformer in the flytekit Type Engine, except that it is also a registry in its own right for these encoders and decoders.

StructuredDatasetTransformerEngine.register(ExperimentalNaiveS3SelectDecoder(), override=True)

See more of the sample code on this branch. We hope you’ve enjoyed reading this post. There are, of course, additional features, design elements, and information relevant to those of you who wish to write custom handlers for this type. For more, please see the user guide and check out some of the existing extensions. As always, you can find us on Slack and at flyte.org.
