Samhita Alla
Daniel Sola

Cut OpenAI Batch Pipeline Costs by Over 50% with Zero Container Overhead

The OpenAI real-time client is ideal when you need immediate responses and price is not a concern, though costs can be significant depending on the number of tokens. Instant results are often unnecessary, however, especially for tasks like summarizing data or running model evaluations. By aggregating these tasks into batches and processing them asynchronously, developers can significantly reduce operational costs without compromising functionality. That’s where batch processing with the new OpenAI Batch API comes in.

However, implementing a batch processing system comes with its own set of challenges: coordinating data ingestion, managing processing queues, and orchestrating result delivery all require robust infrastructure. Union makes this simple by helping you set up an end-to-end automated pipeline that fetches data regularly, sends it for batch processing when needed, and notifies you when it’s done. Plus, thanks to the OpenAI Batch agent, you can forget about container hassles for batch polling loops.

Union simplifies the implementation and operation of this end-to-end automated pipeline while also delivering significant cost efficiency. Here’s how the costs break down:

  1. OpenAI real-time client: 100% baseline cost
  2. OpenAI Batch client: 50% of baseline
  3. Flyte OpenAI Batch agent: 50% of baseline plus zero container overhead for batch status retrieval

With Union, you can integrate pre- and post-processing of data into a single pipeline, transforming potential infrastructure bloat into pure, bottom-line savings. The result is a lean, resource-optimized pipeline that maximizes your cost savings while simplifying your infrastructure and operations.

OpenAI Batch agent

The OpenAI Batch API endpoint allows users to submit requests for asynchronous batch processing. It returns results within 24 hours, and models are offered at a 50% discount. The Batch API is useful when you don’t require immediate results, want to save costs, and need higher rate limits.

The Flyte OpenAI Batch agent is built on top of the Batch API. It automates batch creation and real-time status polling. You can provide either a JSONL file or a JSON iterator (streaming your JSONs), and the agent handles uploading to OpenAI, creating the batch, and downloading the output and error files. Furthermore, with the agent, zero costs are incurred while polling the batch status.
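For reference, each request in a batch input file is a single JSON object on its own line, following OpenAI’s documented Batch API format (the model name and prompt below are illustrative):

import json

# Each JSONL line is one request; `custom_id` lets you match responses back
# to requests, and `body` is a standard chat completions payload.
requests = [
    {
        "custom_id": "request-1",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": "gpt-4o",
            "messages": [{"role": "user", "content": "Summarize this report: ..."}],
        },
    },
]

with open("batch_input.jsonl", "w") as f:
    for request in requests:
        f.write(json.dumps(request) + "\n")

When you hand the agent a JSON iterator instead of a file, it writes this JSONL file for you before uploading.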

Union lets you integrate the agent into your workflow to automate the entire process associated with preparing the data, creating a batch, and sending the outputs to downstream tasks for further processing. Since Union is a perfect fit for both data and ML pipelines, the pre- and post-processing stages can be easily handled in a single, centralized pipeline.

Image moderation system

With the ever-increasing volume of user-generated content on the internet, content moderation has become a necessity. From social media platforms to e-commerce sites, filtering out inappropriate or offensive content is essential to maintaining a safe environment.

AI has significantly improved the content moderation process, making it more efficient and easier to implement.

Suppose you want to build an image moderation system that:

  1. Fetches images from a cloud blob storage every x hours,
  2. Triggers a downstream workflow to send requests to a GPT-4 model for image moderation, and
  3. Triggers another workflow to read the output and error files and send an email notification.

Union enables you to implement this seamlessly as follows:

  1. Define a launch plan with a cron schedule to fetch images every x hours.
  2. Use artifacts and reactive workflows to trigger the downstream workflow.
  3. Send JSON requests to GPT-4 using the Flyte OpenAI Batch agent.
  4. Wait for the batch processing to complete.
  5. Send an email notification announcing the completion status.

Let’s take a look at how we can implement this in code. 

First, let’s look at one way to organize this simple project. 

image-moderation
├── requirements.txt
└── src
    ├── __init__.py
    ├── batch_wf.py
    ├── make_requests.py
    ├── fetch_images.py
    └── fetch_images_wf.py

This example uses a key Union design pattern called reactive workflow. Reactive workflows use artifacts, the versioned upstream workflow outputs, to trigger other downstream workflows (check out our blog post on reactive workflows here). Using reactive workflows, we can easily separate our upstream workflow in `fetch_images_wf.py` and our downstream workflow in `batch_wf.py` into different Python modules. 

Image collection workflow

We will need our image collection workflow to produce an artifact containing a set of images that require further processing. Artifacts are first-class entities of workflow outputs, allowing us to easily attach metadata through partitions, consume artifacts as input in other workflows using queries, and track the lineage of workflow outputs as they are passed through different workflows. We want our image collection workflow to create an `Artifact` if new images are present in our blob store. The `Artifact` will let us trigger a downstream workflow to create requests for our OpenAI Batch agent.
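As a quick sketch of the query mechanism (the trigger-based wiring this post actually uses appears later), a downstream workflow could consume the latest `ImageDir` version as a default input:

from flytekit import workflow
from flytekit.core.artifact import Artifact
from flytekit.types.file import FlyteFile

ImageDir = Artifact(name="image_directory")

# `ImageDir.query()` resolves to the latest artifact version at execution
# time, so the workflow picks up the newest images without explicit wiring.
@workflow
def consume_images_wf(img_dir: list[FlyteFile] = ImageDir.query()):
    ...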

To start, we must fetch the images in our S3 bucket. We can use the existing Boto agent, which is part of the SageMaker integration with Flyte. Using the `list_objects_v2` method, we can get the names of the images and the times they were added to the bucket (a maximum of 1,000 objects can be retrieved per call). This will help us filter out old images that we don’t need to process.

from flytekit import kwtypes
from flytekitplugins.awssagemaker_inference import BotoConfig, BotoTask

list_files_config = BotoConfig(
    service="s3",
    method="list_objects_v2",
    config={
        "Bucket": "{inputs.bucket}",
        "Prefix": "{inputs.directory}",
    },
    region="us-east-2",
)

list_images_task = BotoTask(
    name="list_images",
    task_config=list_files_config,
    inputs=kwtypes(bucket=str, directory=str),
)

Then, we structure a workflow like so, where `fetch_images` checks the result of the Boto agent for new images, and a `conditional` is used to emit an `Artifact` we call `ImageDir` if there are new images in the bucket since the last time the workflow was run.

from datetime import datetime
from typing import Annotated, Tuple

from flytekit import conditional, task, workflow
from flytekit.core.artifact import Artifact
from flytekit.types.file import FlyteFile

ImageDir = Artifact(name="image_directory")

@task
def fetch_images(
    images_response: dict, kickoff_time: datetime
) -> Tuple[int, list[FlyteFile]]:
    # Define a cutoff time based on schedule to filter out older images.
    ...
    return n_images, images

@task
def emit_artifact(img_dir: list[FlyteFile]) -> Annotated[list[FlyteFile], ImageDir]:
    return ImageDir.create_from(img_dir)

...

@workflow
def fetch_images_wf(...):
    images_response = list_images_task(...)
    n_images, img_dir = fetch_images(images_response=images_response, ...)
    (
        conditional("check_images")
        .if_(n_images != 0)
        .then(emit_artifact(img_dir=img_dir))
        .else_()
        .then(dont_emit_artifact())
    )

Taking a closer look, we use `list[FlyteFile]` to pass around the images we want to process. 
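For illustration, the elided filtering logic inside `fetch_images` could look roughly like this (a sketch; the bucket name is a placeholder, and `Contents` and `LastModified` come from the standard `list_objects_v2` response):

from datetime import datetime, timedelta
from typing import Tuple

from flytekit import task
from flytekit.types.file import FlyteFile

HOUR_CYCLE = 6

@task
def fetch_images(
    images_response: dict, kickoff_time: datetime
) -> Tuple[int, list[FlyteFile]]:
    # Keep only objects uploaded since the previous scheduled run.
    cutoff = kickoff_time - timedelta(hours=HOUR_CYCLE)
    images = [
        FlyteFile(path=f"s3://my-image-bucket/{obj['Key']}")  # placeholder bucket
        for obj in images_response.get("Contents", [])
        # Note: depending on serialization, `LastModified` may arrive as a
        # string and need parsing into a datetime first.
        if obj["LastModified"] >= cutoff
    ]
    return len(images), images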

To round off our upstream `fetch_images_wf` workflow, we create a `LaunchPlan` to execute our workflow at a desired schedule using `CronSchedule`.

from flytekit import LaunchPlan, CronSchedule

HOUR_CYCLE = 6

fetch_images_lp = LaunchPlan.get_or_create(
    ...
    schedule=CronSchedule(
        schedule=f"0 */{HOUR_CYCLE} * * *",
    ),
)

Batch request workflow

Our second workflow creates and sends requests to the OpenAI API. This is downstream of our image collection workflow and contains two steps: creating JSON Lines requests for the Flyte OpenAI Batch agent, and calling the agent.

import base64
from typing import Iterator

from flytekit import task
from flytekit.types.file import FlyteFile
from flytekit.types.iterator import JSON

def encode_image_to_base64(...):
    ...
    return encoded_string

@task
def create_request(img_dir: list[FlyteFile]) -> Iterator[JSON]:
    for i, file in enumerate(img_dir):
        ...
        # Format a request dictionary called `completion_request` with the following prompt:
        # prompt: "Answer the following yes or no question with either 'Yes.' or 'No.' followed by a description of why. Does this image have explicit content?"
        batch_request = {
            "custom_id": f"request-{i}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": completion_request,
        }
        yield batch_request

This task simply loops through the images we need to process and yields requests to send to the OpenAI Batch API.

Next, we define a trigger on the `ImageDir` artifact that automatically executes the workflow for OpenAI batch processing. The batch workflow accepts `jsonl_in` as a runtime parameter, which we provide by sending the JSON generator returned from the `create_request` task.

from flytekit import Email, LaunchPlan, Secret, WorkflowExecutionPhase, workflow
from flytekitplugins.openai import BatchResult, create_batch
from unionai.artifacts import OnArtifact

on_image_dir = OnArtifact(trigger_on=ImageDir)

file_batch = create_batch(
    name="image-moderation",
    openai_organization="unionai",
    secret=Secret(...),
)

@workflow
def batch_wf(...) -> BatchResult:
    json_generator = create_request(...)
    return file_batch(jsonl_in=json_generator)

batch_request_lp = LaunchPlan.get_or_create(
    name="batch_request_lp",
    workflow=batch_wf,
    trigger=on_image_dir,
    notifications=[
        Email(
            phases=[WorkflowExecutionPhase.SUCCEEDED],
            recipients_email=[...],
        )
    ],
)

It is also worth noting the use of `notifications` in the `LaunchPlan`. These few lines allow us to send email or Slack messages upon the successful completion of the downstream workflow. This is very handy because an OpenAI batch may take quite a while to return completion results; we don’t want to constantly check the status of our request.
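For instance, here is a minimal sketch of a Slack notification that also covers failures (flytekit’s `Slack` notification is delivered through a channel’s email address; the address below is a placeholder):

from flytekit import Slack, WorkflowExecutionPhase

slack_notification = Slack(
    phases=[
        WorkflowExecutionPhase.SUCCEEDED,
        WorkflowExecutionPhase.FAILED,
    ],
    recipients_email=["image-moderation-alerts@mycompany.slack.com"],  # placeholder
)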

Navigating to the execution link, we can see that the execution took 53 minutes to complete and returned S3 URIs for the successful and failed batch results.

The OpenAI Batch agent adds JSON objects to a JSONL file, uploads it to OpenAI, runs the batch query, waits for completion, and then downloads the output and error files.
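A downstream task can then parse these files; here’s a minimal sketch, assuming `BatchResult` exposes the downloaded files as `output_file` and `error_file` attributes and that each output line follows the Batch API’s response format:

import json

from flytekit import task
from flytekitplugins.openai import BatchResult

@task
def summarize_results(result: BatchResult) -> list[str]:
    # Each output line is a JSON object keyed by `custom_id`, with the
    # model's reply nested under response.body.choices.
    replies = []
    if result.output_file is not None:
        with open(result.output_file, "r") as f:
            for line in f:
                record = json.loads(line)
                replies.append(
                    record["response"]["body"]["choices"][0]["message"]["content"]
                )
    return replies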

Here are some example responses returned by the OpenAI Batch API:

No. The image features a close-up view of a green leaf with water droplets, which does not contain any explicit content.

Yes. The image contains explicit content as it depicts drug use, showing a syringe and a spoon with substances likely associated with illegal drug consumption. The representation of this activity is explicit as it clearly conveys a scenario involving drug abuse, which could be graphic or disturbing to some viewers.

Leveraging the long-running, stateless, and locally testable Boto and OpenAI Batch agents, we were able to make requests to external resources in a simple yet cost-effective way. These processes were decoupled and automated through launch plan schedules, artifacts, and reactive workflows, all while maintaining a clear record of data lineage. The end-to-end example is available on GitHub.

Build efficient batch inference pipelines

With the addition of the OpenAI Batch agent into Union, you can build end-to-end batch inference pipelines with pre- and post-processing included. This integration also helps you save costs in two key ways:

  1. Models are offered at a 50% discount.
  2. There are no additional costs incurred when polling the batch status using Union.

If this interests you, check out our documentation. The agent will soon be available on Union serverless! Contact the Union team if you're interested in implementing batch inference solutions.
