{
 "cells": [
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "8711cbe4",
   "metadata": {},
   "source": [
    "# How to Trigger the Feast Workflow using FlyteRemote\n",
    "\n",
    "This notebook triggers a Flyte workflow that trains a simple [Gaussian Naive Bayes model using sklearn](https://scikit-learn.org/stable/modules/generated/sklearn.naive_bayes.GaussianNB.html) on a modified [Horse-Colic dataset from UCI](https://archive.ics.uci.edu/ml/datasets/Horse+Colic).\n",
    "\n",
    "The model classifies whether or not a horse's lesion is surgical.\n",
    "\n",
    "Let's get started!"
   ]
  },
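  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "3b9e1c7a",
   "metadata": {},
   "source": [
    "Gaussian Naive Bayes models each feature as normally distributed within each class and assumes the features are independent given the class. The workflow itself uses scikit-learn's implementation; the pure-Python sketch below only illustrates the technique and is not the workflow's code. The function names `fit_gaussian_nb` and `predict_gaussian_nb` and the toy data are hypothetical.\n",
    "\n",
    "```python\n",
    "import math\n",
    "from collections import defaultdict\n",
    "\n",
    "\n",
    "def fit_gaussian_nb(X, y, var_smoothing=1e-9):\n",
    "    # Group the training rows by class label.\n",
    "    by_class = defaultdict(list)\n",
    "    for row, label in zip(X, y):\n",
    "        by_class[label].append(row)\n",
    "\n",
    "    n_samples, n_features = len(X), len(X[0])\n",
    "    model = {}\n",
    "    for label, rows in by_class.items():\n",
    "        n = len(rows)\n",
    "        # Per-class feature means and variances. var_smoothing keeps every variance\n",
    "        # positive (scikit-learn scales this term by the largest feature variance;\n",
    "        # here it is added as-is).\n",
    "        means = [sum(r[j] for r in rows) / n for j in range(n_features)]\n",
    "        variances = [\n",
    "            sum((r[j] - means[j]) ** 2 for r in rows) / n + var_smoothing\n",
    "            for j in range(n_features)\n",
    "        ]\n",
    "        model[label] = (math.log(n / n_samples), means, variances)\n",
    "    return model\n",
    "\n",
    "\n",
    "def predict_gaussian_nb(model, row):\n",
    "    # Score each class by log prior + sum of per-feature Gaussian log densities.\n",
    "    def log_posterior(label):\n",
    "        log_prior, means, variances = model[label]\n",
    "        return log_prior + sum(\n",
    "            -0.5 * math.log(2 * math.pi * var) - (x - mu) ** 2 / (2 * var)\n",
    "            for x, mu, var in zip(row, means, variances)\n",
    "        )\n",
    "\n",
    "    # The class with the highest (unnormalized) log posterior wins.\n",
    "    return max(model, key=log_posterior)\n",
    "\n",
    "\n",
    "# Hypothetical toy data: two numeric features, string labels '1' and '2'.\n",
    "X = [[38.0, 40.0], [38.2, 42.0], [39.5, 60.0], [39.8, 65.0]]\n",
    "y = ['2', '2', '1', '1']\n",
    "toy_model = fit_gaussian_nb(X, y)\n",
    "print(predict_gaussian_nb(toy_model, [38.1, 41.0]))  # prints 2\n",
    "```\n",
    "\n",
    "Working in log space avoids numerical underflow when many small densities are multiplied together."
   ]
  },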
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "d5f891e8",
   "metadata": {},
   "source": [
    "Set the AWS environment variables before importing Flytekit so that both Flytekit and Feast can reach the demo cluster's object store."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "75ff01b3",
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "# Point Flytekit and Feast at the demo cluster's MinIO (S3-compatible) object store.\n",
    "os.environ[\"FLYTE_AWS_ENDPOINT\"] = os.environ[\"FEAST_S3_ENDPOINT_URL\"] = \"http://localhost:30084/\"\n",
    "os.environ[\"FLYTE_AWS_ACCESS_KEY_ID\"] = os.environ[\"AWS_ACCESS_KEY_ID\"] = \"minio\"\n",
    "os.environ[\"FLYTE_AWS_SECRET_ACCESS_KEY\"] = os.environ[\"AWS_SECRET_ACCESS_KEY\"] = \"miniostorage\""
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "f79f28b4",
   "metadata": {},
   "source": [
    "## 01. Register the code\n",
    "\n",
    "The workflow code is auto-documented and rendered with Sphinx [here](https://docs.flyte.org/projects/cookbook/en/latest/auto/case_studies/feature_engineering/feast_integration/index.html). It uses [Flytekit](https://docs.flyte.org/projects/flytekit/en/latest/) to express the pipeline in pure Python.\n",
    "\n",
    "You can use [FlyteConsole](https://github.com/flyteorg/flyteconsole) to launch, monitor, and introspect Flyte executions. Here, however, we use [flytekit.remote](https://docs.flyte.org/projects/flytekit/en/latest/design/control_plane.html) to interact with the Flyte backend programmatically."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "a2330891",
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "from flytekit.remote import FlyteRemote\n",
    "from flytekit.configuration import Config\n",
    "\n",
    "# The `for_sandbox` method instantiates a connection to the demo cluster.\n",
    "remote = FlyteRemote(\n",
    "    config=Config.for_sandbox(),\n",
    "    default_project=\"flytesnacks\",\n",
    "    default_domain=\"development\"\n",
    ")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "d3d32822",
   "metadata": {},
   "source": [
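    "The ``register_script`` method registers the workflow: it packages the code under ``source_path``, associates it with the given container image, and registers it under the specified ``version``."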
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "id": "e5a60057",
   "metadata": {},
   "outputs": [],
   "source": [
    "from flytekit.configuration import ImageConfig\n",
    "\n",
    "from feast_workflow import feast_workflow\n",
    "\n",
    "wf = remote.register_script(\n",
    "    feast_workflow,\n",
    "    image_config=ImageConfig.from_images(\n",
    "        \"ghcr.io/flyteorg/flytecookbook:feast_integration-latest\"\n",
    "    ),\n",
    "    version=\"v2\",\n",
    "    source_path=\"../\",\n",
    "    module_name=\"feast_workflow\",\n",
    ")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "8fcea449",
   "metadata": {},
   "source": [
    "## 02. Launch an execution"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "64d6295a",
   "metadata": {},
   "source": [
    "FlyteRemote provides convenient methods to retrieve versions of the pipeline from the remote server.\n",
    "\n",
    "**NOTE**: To fetch a specific version of the launch plan, pass ``version`` to ``fetch_launch_plan``. Omitting it, as below, retrieves the latest version."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "d28014f8",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'v1'"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "lp = remote.fetch_launch_plan(name=\"feast_integration.feast_workflow.feast_workflow\")\n",
    "lp.id.version"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "c71210a7",
   "metadata": {},
   "source": [
    "The ``execute`` method runs a Flyte entity, in this case a launch plan. Setting ``wait=True`` blocks until the execution completes."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "c13770fc",
   "metadata": {},
   "outputs": [],
   "source": [
    "execution = remote.execute(\n",
    "    lp,\n",
    "    inputs={\"num_features_univariate\": 5},\n",
    "    wait=True\n",
    ")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "07bd9e37",
   "metadata": {},
   "source": [
    "## 03. Sync an execution\n",
    "\n",
    "Syncing an execution retrieves the workflow's outputs. Setting ``sync_nodes=True`` also retrieves the outputs of the intermediate nodes, which the later steps use to access ``repo_config`` and the online store.\n",
    "\n",
    "**NOTE**: You can also fetch an existing execution, including one that is still running, with ``fetch_execution``. In addition, if you launch an execution with the ``execution_name`` of an existing one, Flyte returns that execution instead of starting a new one."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "a8bd9614",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Execution f218aba055ba34a3fb75 is in SUCCEEDED phase\n"
     ]
    }
   ],
   "source": [
    "from flytekit.models.core.execution import WorkflowExecutionPhase\n",
    "\n",
    "synced_execution = remote.sync(execution, sync_nodes=True)\n",
    "print(f\"Execution {synced_execution.id.name} is in {WorkflowExecutionPhase.enum_to_string(synced_execution.closure.phase)} phase\")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "65e5b181",
   "metadata": {},
   "source": [
    "## 04. Retrieve the output\n",
    "\n",
    "Fetch the model and the model prediction."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "ab24b1c0",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "/var/folders/6r/9pdkgpkd5nx1t34ndh1f_3q80000gn/T/flyteaqx6tlyu/control_plane_metadata/local_flytekit/e1a690494fe33da04a4dca7737096234/0c81c76dc3a029267a96f275431b5bc5.npy"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "model = synced_execution.outputs[\"o0\"]\n",
    "prediction = synced_execution.outputs[\"o1\"]\n",
    "prediction"
   ]
  },
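  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "5c2d8f41",
   "metadata": {},
   "source": [
    "The prediction output above is a path to a local `.npy` file. The minimal sketch below reads such a file with NumPy; because the real file only exists after the workflow has run, it writes a hypothetical stand-in array to a temporary file first. Applied to the notebook's output, and assuming `prediction` is a `FlyteFile` whose `download()` method returns the local path, the equivalent call would be `np.load(prediction.download())`.\n",
    "\n",
    "```python\n",
    "import os\n",
    "import tempfile\n",
    "\n",
    "import numpy as np\n",
    "\n",
    "# Hypothetical stand-in for the workflow's prediction file.\n",
    "stand_in = np.array(['2'])\n",
    "path = os.path.join(tempfile.mkdtemp(), 'prediction.npy')\n",
    "np.save(path, stand_in)\n",
    "\n",
    "# With the real output this would be np.load(prediction.download()),\n",
    "# assuming prediction is a FlyteFile.\n",
    "loaded = np.load(path)\n",
    "print(loaded)  # prints ['2']\n",
    "```\n",
    "\n",
    "String arrays such as this one are stored without pickling, so `np.load` can read them with its default `allow_pickle=False`."
   ]
  },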
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "af8277d3",
   "metadata": {},
   "source": [
    "**NOTE**: The output model is available locally as a ``JoblibSerializedFile``, which can be downloaded and loaded with ``joblib``."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "8a841e22",
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "/var/folders/6r/9pdkgpkd5nx1t34ndh1f_3q80000gn/T/flyteaqx6tlyu/control_plane_metadata/local_flytekit/91246ef2160dde99a7512ab3aa9aa2ce/model.joblib.dat"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "model"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "91b3bcbc",
   "metadata": {},
   "source": [
    "Fetch the ``repo_config``, which is the output of node ``n0``."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "a21feeb6",
   "metadata": {},
   "outputs": [],
   "source": [
    "repo_config = synced_execution.node_executions[\"n0\"].outputs[\"o0\"]"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "7a535b4d",
   "metadata": {},
   "source": [
    "## 05. Generate predictions\n",
    "\n",
    "Reuse the `predict` function from the workflow to generate predictions; Flytekit automatically manages the I/O for you."
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "dff58f63",
   "metadata": {},
   "source": [
    "### Load features from the online feature store"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "c7a2c3c4",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{'total protein': [70.0],\n",
       " 'peripheral pulse': [3.0],\n",
       " 'nasogastric reflux PH': [4.718545454545455],\n",
       " 'surgical lesion': ['1'],\n",
       " 'rectal temperature': [38.17717842323652],\n",
       " 'nasogastric tube': ['1.751269035532995'],\n",
       " 'Hospital Number': ['533738'],\n",
       " 'packed cell volume': [43.0],\n",
       " 'outcome': ['1'],\n",
       " 'abdominal distension': [4.0]}"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from feast_workflow import predict, retrieve_online\n",
    "\n",
    "# Look up the online features for one horse, identified by its hospital number.\n",
    "inference_point = retrieve_online(\n",
    "    repo_config=repo_config,\n",
    "    online_store=synced_execution.node_executions[\"n4\"].outputs[\"o0\"],\n",
    "    data_point=533738,\n",
    ")\n",
    "inference_point"
   ]
  },
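  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "9e4a6b27",
   "metadata": {},
   "source": [
    "The online features above include the target column, `surgical lesion`, stored as the string `'1'`, and the model predicts labels in the same string form. In the original UCI Horse Colic documentation this field is coded 1 = yes (surgical) and 2 = no. Assuming the modified dataset keeps that coding, a hypothetical helper such as `describe_lesion` (not part of `feast_workflow`) could turn predicted labels into readable text.\n",
    "\n",
    "```python\n",
    "# Assumption: the modified dataset keeps the UCI coding of 'surgical lesion'\n",
    "# (1 = yes, 2 = no). describe_lesion is a hypothetical helper.\n",
    "SURGICAL_LESION_LABELS = {'1': 'surgical', '2': 'not surgical'}\n",
    "\n",
    "\n",
    "def describe_lesion(labels):\n",
    "    # Accepts any iterable of labels, e.g. a NumPy array of strings;\n",
    "    # str() also normalises NumPy string scalars. Unknown codes map to 'unknown'.\n",
    "    return [SURGICAL_LESION_LABELS.get(str(label), 'unknown') for label in labels]\n",
    "\n",
    "\n",
    "print(describe_lesion(['2']))  # prints ['not surgical']\n",
    "```"
   ]
  },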
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "9a49e572",
   "metadata": {},
   "source": [
    "### Generate a prediction"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "id": "e44c62e2",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "array(['2'], dtype='<U1')"
      ]
     },
     "execution_count": 12,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "predict(model_ser=model, features=inference_point)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3.9.9 64-bit ('flytesnacks')",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.9"
  },
  "vscode": {
   "interpreter": {
    "hash": "93d1c4f33f306e18e1c08a771c972fe86afbedaedb2338666e30a98a5179caac"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}