EDA and Feature Engineering in Jupyter Notebook and Modeling in a Flyte Task
Once you have a Union account, install union
:
pip install union
Export the following environment variable to build and push images to your own container registry:
# replace with your registry name
export IMAGE_SPEC_REGISTRY="<your-container-registry>"
Then run the following commands to run the workflow:
$ git clone https://github.com/unionai/unionai-examples
$ cd unionai-examples
$ union run --remote <path/to/file.py> <workflow_name> <params>
The source code for this tutorial can be found here.
First, let’s import the libraries we will use in this example.
import pathlib
from dataclasses import dataclass
import numpy as np
import pandas as pd
from dataclasses_json import dataclass_json
from flytekit import Resources, kwtypes, task, workflow
from flytekitplugins.papermill import NotebookTask
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.preprocessing import RobustScaler
We define a dataclass
to store the hyperparameters of the Gradient Boosting Regressor.
@dataclass_json
@dataclass
class Hyperparameters(object):
n_estimators: int = 150
max_depth: int = 3
max_features: str = "sqrt"
min_samples_split: int = 4
random_state: int = 2
nfolds: int = 10
We define a NotebookTask
to run the
Jupyter notebook.
This notebook returns dummified_data
and dataset
as the outputs.
dummified_data
is used in this example, and dataset
is used in the upcoming example.
nb = NotebookTask(
name="eda-feature-eng-nb",
notebook_path=str(pathlib.Path(__file__).parent.absolute() / "supermarket_regression_1.ipynb"),
outputs=kwtypes(dummified_data=pd.DataFrame, dataset=str),
requests=Resources(mem="500Mi"),
)
Next, we define a cross_validate
function and a modeling
task to compute the MAE score of the data against
the Gradient Boosting Regressor.
def cross_validate(model, nfolds, feats, targets):
score = -1 * (cross_val_score(model, feats, targets, cv=nfolds, scoring="neg_mean_absolute_error"))
return np.mean(score)
@task
def modeling(
dataset: pd.DataFrame,
hyperparams: Hyperparameters,
) -> float:
y_target = dataset["Product_Supermarket_Sales"].tolist()
dataset.drop(["Product_Supermarket_Sales"], axis=1, inplace=True)
X_train, X_test, y_train, _ = train_test_split(dataset, y_target, test_size=0.3)
scaler = RobustScaler()
scaler.fit(X_train)
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)
gb_model = GradientBoostingRegressor(
n_estimators=hyperparams.n_estimators,
max_depth=hyperparams.max_depth,
max_features=hyperparams.max_features,
min_samples_split=hyperparams.min_samples_split,
random_state=hyperparams.random_state,
)
return cross_validate(gb_model, hyperparams.nfolds, X_train, y_train)
We define a workflow
to run the notebook and the modeling
task.
@workflow
def notebook_wf(hyperparams: Hyperparameters = Hyperparameters()) -> float:
output = nb()
mae_score = modeling(dataset=output.dummified_data, hyperparams=hyperparams)
return mae_score
We can now run the notebook and the modeling task locally.
if __name__ == "__main__":
print(notebook_wf())