EDA and Feature Engineering in Jupyter Notebook and Modeling in a Flyte Task

Once you have a Union account, install union:

pip install union

Export the following environment variable to build and push images to your own container registry:

# replace with your registry name
export IMAGE_SPEC_REGISTRY="<your-container-registry>"

Then run the following commands to run the workflow:

git clone https://github.com/unionai/unionai-examples
cd unionai-examples
union run --remote tutorials/sentiment_classifier/sentiment_classifier.py main --model distilbert-base-uncased

The source code for this tutorial can be found here {octicon}mark-github.

In this example, we will implement a simple pipeline that takes hyperparameters, does EDA, feature engineering (step 1: EDA and feature engineering in notebook), and measures the Gradient Boosting model’s performance using mean absolute error (MAE) (step 2: Modeling in a Flyte Task).

First, let’s import the libraries we will use in this example.

import pathlib
from dataclasses import dataclass

import numpy as np
import pandas as pd
from dataclasses_json import dataclass_json
from flytekit import Resources, kwtypes, task, workflow
from flytekitplugins.papermill import NotebookTask
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.preprocessing import RobustScaler

We define a dataclass to store the hyperparameters of the Gradient Boosting Regressor.

@dataclass_json
@dataclass
class Hyperparameters(object):
    n_estimators: int = 150
    max_depth: int = 3
    max_features: str = "sqrt"
    min_samples_split: int = 4
    random_state: int = 2
    nfolds: int = 10

We define a NotebookTask to run the Jupyter notebook. This notebook returns dummified_data and dataset as the outputs.

dummified_data is used in this example, and dataset is used in the upcoming example.

nb = NotebookTask(
    name="eda-feature-eng-nb",
    notebook_path=str(pathlib.Path(__file__).parent.absolute() / "supermarket_regression_1.ipynb"),
    outputs=kwtypes(dummified_data=pd.DataFrame, dataset=str),
    requests=Resources(mem="500Mi"),
)

Next, we define a cross_validate function and a modeling task to compute the MAE score of the data against the Gradient Boosting Regressor.

def cross_validate(model, nfolds, feats, targets):
    score = -1 * (cross_val_score(model, feats, targets, cv=nfolds, scoring="neg_mean_absolute_error"))
    return np.mean(score)


@task
def modeling(
    dataset: pd.DataFrame,
    hyperparams: Hyperparameters,
) -> float:
    y_target = dataset["Product_Supermarket_Sales"].tolist()
    dataset.drop(["Product_Supermarket_Sales"], axis=1, inplace=True)

    X_train, X_test, y_train, _ = train_test_split(dataset, y_target, test_size=0.3)

    scaler = RobustScaler()

    scaler.fit(X_train)

    X_train = scaler.transform(X_train)
    X_test = scaler.transform(X_test)

    gb_model = GradientBoostingRegressor(
        n_estimators=hyperparams.n_estimators,
        max_depth=hyperparams.max_depth,
        max_features=hyperparams.max_features,
        min_samples_split=hyperparams.min_samples_split,
        random_state=hyperparams.random_state,
    )

    return cross_validate(gb_model, hyperparams.nfolds, X_train, y_train)

We define a workflow to run the notebook and the modeling task.

@workflow
def notebook_wf(hyperparams: Hyperparameters = Hyperparameters()) -> float:
    output = nb()
    mae_score = modeling(dataset=output.dummified_data, hyperparams=hyperparams)
    return mae_score

We can now run the notebook and the modeling task locally.

if __name__ == "__main__":
    print(notebook_wf())