{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"id": "8711cbe4",
"metadata": {},
"source": [
"# How to Trigger the Feast Workflow using FlyteRemote\n",
"\n",
"The goal of this notebook is to train a simple [Gaussian Naive Bayes model using sklearn](https://scikit-learn.org/stable/modules/generated/sklearn.naive_bayes.GaussianNB.html) on a modified [Horse-Colic dataset from UCI](https://archive.ics.uci.edu/ml/datasets/Horse+Colic).\n",
"\n",
"The model classifies whether a horse's lesion is surgical or not.\n",
"\n",
"Let's get started!"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "d5f891e8",
"metadata": {},
"source": [
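"Set the AWS environment variables before importing Flytekit. The values below point both Flytekit and Feast at the MinIO object store running on the local demo cluster."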
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "75ff01b3",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"os.environ[\"FLYTE_AWS_ENDPOINT\"] = os.environ[\"FEAST_S3_ENDPOINT_URL\"] = \"http://localhost:30084/\"\n",
"os.environ[\"FLYTE_AWS_ACCESS_KEY_ID\"] = os.environ[\"AWS_ACCESS_KEY_ID\"] = \"minio\"\n",
"os.environ[\"FLYTE_AWS_SECRET_ACCESS_KEY\"] = os.environ[\"AWS_SECRET_ACCESS_KEY\"] = \"miniostorage\""
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "f79f28b4",
"metadata": {},
"source": [
"## 01. Register the code\n",
"\n",
"The actual workflow code is auto-documented and rendered with Sphinx [here](https://docs.flyte.org/projects/cookbook/en/latest/auto/case_studies/feature_engineering/feast_integration/index.html). We've used [Flytekit](https://docs.flyte.org/projects/flytekit/en/latest/) to express the pipeline in pure Python.\n",
"\n",
"You can use [FlyteConsole](https://github.com/flyteorg/flyteconsole) to launch, monitor, and introspect Flyte executions. Here, however, we use [flytekit.remote](https://docs.flyte.org/projects/flytekit/en/latest/design/control_plane.html) to interact with the Flyte backend."
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "a2330891",
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"from flytekit.remote import FlyteRemote\n",
"from flytekit.configuration import Config\n",
"\n",
"# The `for_sandbox` method instantiates a connection to the demo cluster.\n",
"remote = FlyteRemote(\n",
" config=Config.for_sandbox(),\n",
" default_project=\"flytesnacks\",\n",
" default_domain=\"development\"\n",
")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "d3d32822",
"metadata": {},
"source": [
"The ``register_script`` method can be used to register the workflow."
]
},
{
"cell_type": "code",
"execution_count": 25,
"id": "e5a60057",
"metadata": {},
"outputs": [],
"source": [
"from flytekit.configuration import ImageConfig\n",
"\n",
"from feast_workflow import feast_workflow\n",
"\n",
"wf = remote.register_script(\n",
" feast_workflow,\n",
" image_config=ImageConfig.from_images(\n",
" \"ghcr.io/flyteorg/flytecookbook:feast_integration-latest\"\n",
" ),\n",
" version=\"v2\",\n",
" source_path=\"../\",\n",
" module_name=\"feast_workflow\",\n",
")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "8fcea449",
"metadata": {},
"source": [
"## 02. Launch an execution"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "64d6295a",
"metadata": {},
"source": [
"FlyteRemote provides convenient methods to retrieve a version of the pipeline from the remote server.\n",
"\n",
"**NOTE**: You can fetch a specific version of the workflow and launch that instead, but here we simply fetch the latest."
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "d28014f8",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'v1'"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"lp = remote.fetch_launch_plan(name=\"feast_integration.feast_workflow.feast_workflow\")\n",
"lp.id.version"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "c71210a7",
"metadata": {},
"source": [
"The ``execute`` method runs a Flyte entity, which in our case is a launch plan."
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "c13770fc",
"metadata": {},
"outputs": [],
"source": [
"execution = remote.execute(\n",
" lp,\n",
" inputs={\"num_features_univariate\": 5},\n",
" wait=True\n",
")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "07bd9e37",
"metadata": {},
"source": [
"## 03. Sync an execution\n",
"\n",
"You can sync an execution to retrieve the workflow's outputs. Setting ``sync_nodes=True`` also retrieves the outputs of the intermediate nodes.\n",
"\n",
"**NOTE**: You can also fetch an existing execution, including one that has already started. If you launch an execution with a name that is already in use, Flyte respects the existing execution and does not start a new one."
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "a8bd9614",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Execution f218aba055ba34a3fb75 is in SUCCEEDED phase\n"
]
}
],
"source": [
"from flytekit.models.core.execution import WorkflowExecutionPhase\n",
"\n",
"synced_execution = remote.sync(execution, sync_nodes=True)\n",
"print(f\"Execution {synced_execution.id.name} is in {WorkflowExecutionPhase.enum_to_string(synced_execution.closure.phase)} phase\")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "65e5b181",
"metadata": {},
"source": [
"## 04. Retrieve the output\n",
"\n",
"Fetch the model and the model prediction."
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "ab24b1c0",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"/var/folders/6r/9pdkgpkd5nx1t34ndh1f_3q80000gn/T/flyteaqx6tlyu/control_plane_metadata/local_flytekit/e1a690494fe33da04a4dca7737096234/0c81c76dc3a029267a96f275431b5bc5.npy"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"model = synced_execution.outputs[\"o0\"]\n",
"prediction = synced_execution.outputs[\"o1\"]\n",
"prediction"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "af8277d3",
"metadata": {},
"source": [
"**NOTE**: The output model is a joblib-serialized file (`JoblibSerializedFile`) referenced by a local path; it can be downloaded and loaded from there."
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "8a841e22",
"metadata": {
"scrolled": true
},
"outputs": [
{
"data": {
"text/plain": [
"/var/folders/6r/9pdkgpkd5nx1t34ndh1f_3q80000gn/T/flyteaqx6tlyu/control_plane_metadata/local_flytekit/91246ef2160dde99a7512ab3aa9aa2ce/model.joblib.dat"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"model"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "91b3bcbc",
"metadata": {},
"source": [
"Fetch the ``repo_config``."
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "a21feeb6",
"metadata": {},
"outputs": [],
"source": [
"repo_config = synced_execution.node_executions[\"n0\"].outputs[\"o0\"]"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "7a535b4d",
"metadata": {},
"source": [
"## 05. Generate predictions\n",
"\n",
"Reuse the `predict` function from the workflow to generate predictions. Flytekit manages the I/O for you automatically."
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "dff58f63",
"metadata": {},
"source": [
"### Load features from the online feature store"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "c7a2c3c4",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'total protein': [70.0],\n",
" 'peripheral pulse': [3.0],\n",
" 'nasogastric reflux PH': [4.718545454545455],\n",
" 'surgical lesion': ['1'],\n",
" 'rectal temperature': [38.17717842323652],\n",
" 'nasogastric tube': ['1.751269035532995'],\n",
" 'Hospital Number': ['533738'],\n",
" 'packed cell volume': [43.0],\n",
" 'outcome': ['1'],\n",
" 'abdominal distension': [4.0]}"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# `os` was imported in the first cell; only the workflow helpers are needed here.\n",
"from feast_workflow import predict, retrieve_online\n",
"\n",
"inference_point = retrieve_online(\n",
" repo_config=repo_config,\n",
" online_store=synced_execution.node_executions[\"n4\"].outputs[\"o0\"],\n",
" data_point=533738,\n",
")\n",
"inference_point"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "9a49e572",
"metadata": {},
"source": [
"### Generate a prediction"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "e44c62e2",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"array(['2'], dtype='<U1')"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"predict(model_ser=model, features=inference_point)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.9.9 64-bit ('flytesnacks')",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.9"
},
"vscode": {
"interpreter": {
"hash": "93d1c4f33f306e18e1c08a771c972fe86afbedaedb2338666e30a98a5179caac"
}
}
},
"nbformat": 4,
"nbformat_minor": 5
}