Batch agent example

# %% [markdown]
# (openai_batch_agent_example_usage)=
#
# # Batching Requests for Asynchronous Processing
#
# This example demonstrates how to send a batch of API requests to GPT models for asynchronous processing.
#
# Each request in the batch input must include the `custom_id`, `method`, `url`, and `body` fields.
# You can provide either a `JSONLFile` or `Iterator[JSON]`, and the agent handles the file upload to OpenAI,
# creation of the batch, and downloading of the output and error files.
#
# ## Using `Iterator`
#
# Here's how you can provide an `Iterator` as an input to the agent:
# %%
import os
from typing import Iterator

import union
from flytekit.types.file import JSONLFile
from flytekit.types.iterator import JSON
from flytekitplugins.openai import BatchResult, create_batch


def jsons() -> Iterator[JSON]:
    # Yield one request object per prompt; each becomes a line in the JSONL input.
    for x in [
        {
            "custom_id": "request-1",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-3.5-turbo",
                "messages": [
                    {"role": "system", "content": "You are a helpful assistant."},
                    {"role": "user", "content": "What is 2+2?"},
                ],
            },
        },
        {
            "custom_id": "request-2",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-3.5-turbo",
                "messages": [
                    {"role": "system", "content": "You are a helpful assistant."},
                    {"role": "user", "content": "Who won the world series in 2020?"},
                ],
            },
        },
    ]:
        yield x


iterator_batch = create_batch(
    name="gpt-3.5-turbo-iterator",
    openai_organization="your-org",
    secret=union.Secret(group="openai", key="api-key"),
)


@union.workflow
def json_iterator_wf(json_vals: Iterator[JSON] = jsons()) -> BatchResult:
    return iterator_batch(jsonl_in=json_vals)


# %% [markdown]
# The `create_batch` function returns an imperative workflow responsible for uploading the JSON data to OpenAI,
# creating a batch, polling the batch status until it completes, and downloading the
# output and error files. It also accepts a `config` parameter that lets you set the `metadata`, `endpoint`,
# and `completion_window` values for the batch; any value you omit falls back to its default.
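#
# A minimal sketch of overriding those values, assuming `config` accepts the keys as a plain dictionary (the `name` and `metadata` values here are illustrative):
# %%
configured_batch = create_batch(
    name="gpt-3.5-turbo-configured",
    openai_organization="your-org",
    secret=union.Secret(group="openai", key="api-key"),
    # Hypothetical overrides; "24h" is currently the only completion window
    # the OpenAI Batch API supports.
    config={
        "endpoint": "/v1/chat/completions",
        "completion_window": "24h",
        "metadata": {"project": "batch-demo"},
    },
)

# %% [markdown]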
#
# `BatchResult` is a dataclass that contains the paths to the output file and the error file.
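#
# For example, a downstream task can consume those paths directly (a sketch; the `output_file` and `error_file` field names are assumptions here):
# %%
@union.task
def summarize_result(result: BatchResult) -> str:
    # Both fields point to JSONL files; either may be empty if the batch
    # produced no output or no errors.
    return f"output: {result.output_file}, errors: {result.error_file}"


# %% [markdown]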
#
# ## Using `JSONLFile`
#
# The following code snippet demonstrates how to send a JSONL file to the `create_batch` function:
# %%
file_batch = create_batch(
    name="gpt-3.5-turbo-file",
    openai_organization="your-org",
    secret=union.Secret(group="openai", key="api-key"),
    is_json_iterator=False,
)


@union.workflow
def jsonl_wf(
    jsonl_file: JSONLFile = os.path.join(os.path.dirname(os.path.realpath(__file__)), "data.jsonl")
) -> BatchResult:
    return file_batch(jsonl_in=jsonl_file)
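

# %% [markdown]
# Each line of `data.jsonl` is one complete JSON request object. As a sketch, you can
# materialize the file from the same `jsons()` generator defined above (the
# `write_data_jsonl` helper is illustrative, not part of the plugin):
# %%
import json


def write_data_jsonl(path: str = "data.jsonl") -> None:
    # Serialize each request object as a single JSONL line.
    with open(path, "w") as f:
        for request in jsons():
            f.write(json.dumps(request) + "\n")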


# %% [markdown]
# The iterator **streams JSON objects to a JSONL file**, so the full payload never needs to be held in memory.
# If you have large batches of requests, or distinct JSON objects you want to run predictions on, we recommend using the iterator.
#
# You can find more information about the Batch API in the [OpenAI docs](https://help.openai.com/en/articles/9197833-batch-api-faq).