EDA and Feature Engineering in a Jupyter Notebook and Modeling in a Flyte Task
Once you have a Union account, install union:
pip install union
Export the following environment variable to build and push images to your own container registry:
# replace with your registry name
export IMAGE_SPEC_REGISTRY="<your-container-registry>"
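This registry is what flytekit's ImageSpec uses when it builds and pushes task images. As a rough sketch (illustrative only: the image name, package list, and task below are placeholders, and the tutorial script may wire this up differently), an ImageSpec that picks up the variable could look like:

import os

from flytekit import ImageSpec, task

# Illustrative only: build and push the task image to your own registry.
custom_image = ImageSpec(
    name="eda-tutorial",  # placeholder image name
    packages=["pandas", "scikit-learn"],  # placeholder package list
    registry=os.environ.get("IMAGE_SPEC_REGISTRY"),
)

@task(container_image=custom_image)
def my_task() -> str:  # placeholder task
    return "hello"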
Then run the following commands to execute the workflow:
git clone https://github.com/unionai/unionai-examples
cd unionai-examples
union run --remote tutorials/sentiment_classifier/sentiment_classifier.py main --model distilbert-base-uncased
The source code for this tutorial is available in the unionai-examples repository on GitHub.
First, let’s import the libraries we will use in this example.
import pathlib
from dataclasses import dataclass
import numpy as np
import pandas as pd
from dataclasses_json import dataclass_json
from flytekit import Resources, kwtypes, task, workflow
from flytekitplugins.papermill import NotebookTask
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.preprocessing import RobustScaler
We define a dataclass to store the hyperparameters of the Gradient Boosting Regressor.
@dataclass_json
@dataclass
class Hyperparameters(object):
    n_estimators: int = 150
    max_depth: int = 3
    max_features: str = "sqrt"
    min_samples_split: int = 4
    random_state: int = 2
    nfolds: int = 10
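As a quick aside (not part of the workflow), the dataclass_json decorator gives the class JSON (de)serialization, which helps Flyte serialize the hyperparameters as a workflow input:

hp = Hyperparameters(n_estimators=200, nfolds=5)
print(hp.to_json())  # JSON string with all six fields
print(Hyperparameters.from_json(hp.to_json()))  # round-trips back into the dataclass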
We define a NotebookTask to run the Jupyter notebook. The notebook returns dummified_data and dataset as outputs: dummified_data is used in this example, and dataset is used in the upcoming example.
nb = NotebookTask(
    name="eda-feature-eng-nb",
    notebook_path=str(pathlib.Path(__file__).parent.absolute() / "supermarket_regression_1.ipynb"),
    outputs=kwtypes(dummified_data=pd.DataFrame, dataset=str),
    requests=Resources(mem="500Mi"),
)
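For the NotebookTask to capture dummified_data and dataset, the notebook itself has to record them. The usual flytekitplugins-papermill convention is a final cell tagged outputs that calls record_outputs; a sketch of what that cell might look like (the variable names dummified_df and dataset_path are assumptions, not taken from the actual notebook):

# Final cell of supermarket_regression_1.ipynb, tagged "outputs"
from flytekitplugins.papermill import record_outputs

record_outputs(dummified_data=dummified_df, dataset=dataset_path)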
Next, we define a cross_validate function and a modeling task to compute the mean absolute error (MAE) of the Gradient Boosting Regressor on the data.
def cross_validate(model, nfolds, feats, targets):
    # neg_mean_absolute_error returns negated errors, so flip the sign to get the MAE.
    score = -1 * (cross_val_score(model, feats, targets, cv=nfolds, scoring="neg_mean_absolute_error"))
    return np.mean(score)
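The sign flip is needed because scikit-learn's cross_val_score follows a "higher is better" convention, so neg_mean_absolute_error returns negated errors. A tiny illustration on toy data (unrelated to the tutorial):

from sklearn.dummy import DummyRegressor
from sklearn.model_selection import cross_val_score

# Scores come back non-positive; negating them recovers the MAE per fold.
scores = cross_val_score(
    DummyRegressor(strategy="mean"),
    [[0.0], [1.0], [2.0], [3.0]],
    [0.0, 1.0, 2.0, 3.0],
    cv=2,
    scoring="neg_mean_absolute_error",
)
print(-scores)

With the sign restored, cross_validate reports a positive MAE, which the modeling task below returns.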
@task
def modeling(
    dataset: pd.DataFrame,
    hyperparams: Hyperparameters,
) -> float:
    # Separate the target from the features.
    y_target = dataset["Product_Supermarket_Sales"].tolist()
    dataset.drop(["Product_Supermarket_Sales"], axis=1, inplace=True)

    X_train, X_test, y_train, _ = train_test_split(dataset, y_target, test_size=0.3)

    # Scale the features with a scaler that is robust to outliers.
    scaler = RobustScaler()
    scaler.fit(X_train)

    X_train = scaler.transform(X_train)
    X_test = scaler.transform(X_test)

    gb_model = GradientBoostingRegressor(
        n_estimators=hyperparams.n_estimators,
        max_depth=hyperparams.max_depth,
        max_features=hyperparams.max_features,
        min_samples_split=hyperparams.min_samples_split,
        random_state=hyperparams.random_state,
    )

    return cross_validate(gb_model, hyperparams.nfolds, X_train, y_train)
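Because Flyte tasks remain plain Python callables when executed locally, you can smoke-test modeling without running the notebook. The snippet below uses made-up feature columns (feature_a, feature_b) purely for illustration; the real input is the dummified supermarket data produced by the notebook:

rng = np.random.default_rng(seed=42)
toy = pd.DataFrame({
    "feature_a": rng.normal(size=200),
    "feature_b": rng.normal(size=200),
})
toy["Product_Supermarket_Sales"] = 3.0 * toy["feature_a"] - toy["feature_b"] + rng.normal(size=200)

# Runs the task body directly and prints the cross-validated MAE.
print(modeling(dataset=toy, hyperparams=Hyperparameters(nfolds=5)))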
We define a workflow to run the notebook and the modeling task.
@workflow
def notebook_wf(hyperparams: Hyperparameters = Hyperparameters()) -> float:
    output = nb()
    mae_score = modeling(dataset=output.dummified_data, hyperparams=hyperparams)
    return mae_score
We can now run the notebook and the modeling task locally.
if __name__ == "__main__":
    print(notebook_wf())
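Since hyperparams is a regular workflow input, the defaults can also be overridden at call time, for example (arbitrary values):

# Run the workflow with a deeper model and more trees.
print(notebook_wf(hyperparams=Hyperparameters(n_estimators=300, max_depth=4)))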