BLASTX Example

Once you have a Union account, install union:

pip install union

Export the following environment variable to build and push images to your own container registry:

# replace with your registry name
export IMAGE_SPEC_REGISTRY="<your-container-registry>"

Then clone the examples repository and run the workflow on Union:

git clone https://github.com/unionai/unionai-examples
cd unionai-examples
union run --remote <path-to-blastx-example>.py blast_wf

The source code for this tutorial can be found on GitHub.

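This example uses BLASTX to search a local protein database with a nucleotide query sequence. BLASTX translates the query in all six reading frames and compares each translation against the protein database.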
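To illustrate what "translating a nucleotide query" means, here is a minimal, stdlib-only sketch of six-frame translation using the standard genetic code (NCBI translation table 1, which is BLASTX's default query genetic code). The helpers translate and six_frame_translation are hypothetical names for illustration only; BLAST+ performs this step internally and this is not its implementation.

```python
from itertools import product

# Standard genetic code: codons enumerated in TCAG order (first base varies slowest)
BASES = "TCAG"
AMINO_ACIDS = "FFLLSSSSYY**CC*WLLLLPPPPHHQQRRRRIIIMTTTTNNKKSSRRVVVVAAAADDEEGGGG"
CODON_TABLE = {"".join(codon): aa for codon, aa in zip(product(BASES, repeat=3), AMINO_ACIDS)}
COMPLEMENT = str.maketrans("ACGT", "TGCA")


def translate(seq: str) -> str:
    # Translate complete codons only; codons with ambiguous bases (e.g. N) become "X"
    return "".join(CODON_TABLE.get(seq[i:i + 3], "X") for i in range(0, len(seq) - 2, 3))


def six_frame_translation(seq: str) -> dict[str, str]:
    # Frames +1..+3 read the sequence as given; -1..-3 read its reverse complement
    seq = seq.upper()
    rev_comp = seq.translate(COMPLEMENT)[::-1]
    frames = {}
    for offset in range(3):
        frames[f"+{offset + 1}"] = translate(seq[offset:])
        frames[f"-{offset + 1}"] = translate(rev_comp[offset:])
    return frames


print(six_frame_translation("ATGGCCTAA"))
```

Stop codons appear as "*"; a real search aligns each of the six translations against the database proteins.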

Import the necessary libraries.

from pathlib import Path
from typing import NamedTuple

import matplotlib.pyplot as plt
import pandas as pd
import requests
from flytekit import conditional, kwtypes, task, workflow
from flytekit.extras.tasks.shell import OutputLocation, ShellTask
from flytekit.types.file import FlyteFile, PNGImageFile

Download the input data from the datasets branch of the flytesnacks repository on GitHub.

When running on the demo cluster, make sure the data is included in the Docker image by uncommenting the copy-data command in the Dockerfile.

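def download_dataset():
    # create the local directory that blast_wf reads from (datadir="kitasatospora")
    Path("kitasatospora").mkdir(exist_ok=True)
    # list the contents of blast/kitasatospora on the "datasets" branch of flytesnacks
    r = requests.get("https://api.github.com/repos/flyteorg/flytesnacks/contents/blast/kitasatospora?ref=datasets")
    r.raise_for_status()
    for each_file in r.json():
        download_url = each_file["download_url"]
        if download_url is None:  # subdirectories have no download URL
            continue
        file_name = Path("kitasatospora") / Path(download_url).name
        # skip files that were downloaded by a previous run
        if not file_name.exists():
            r_file = requests.get(download_url)
            r_file.raise_for_status()
            file_name.write_bytes(r_file.content)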
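The download logic depends on the shape of the GitHub contents API response: a JSON array with one object per directory entry, where file entries carry a download_url and subdirectory entries have a null download_url. The sketch below isolates the "what still needs downloading" decision so it can be checked without network access. The helper files_to_download and the example.com URL are hypothetical and only illustrate the selection logic.

```python
from pathlib import Path


def files_to_download(entries: list[dict], datadir: str) -> list[tuple[str, Path]]:
    """Return (download_url, local_path) pairs for files not yet present in datadir."""
    pending = []
    for entry in entries:
        url = entry.get("download_url")
        # only plain files have a download URL; directories report None
        if entry.get("type") != "file" or not url:
            continue
        local_path = Path(datadir) / Path(url).name
        # files already on disk are skipped, as in download_dataset
        if not local_path.exists():
            pending.append((url, local_path))
    return pending


# Hypothetical, trimmed-down API response used only for illustration
example_entries = [
    {"type": "file", "download_url": "https://example.com/blast/kitasatospora/kitasatospora_proteins.faa"},
    {"type": "dir", "download_url": None},
]
print(files_to_download(example_entries, "kitasatospora"))
```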

A ShellTask runs a shell script as a Flyte task. Here, a ShellTask builds and executes the BLASTX command. The script first creates the output directory, then defines shell variables holding the paths to the input query sequence file, the database to search against, and the BLAST output file, and finally runs blastx. Both standard output (stdout) and standard error (stderr) are appended to the file declared as the stdout output (stdout.txt). Placeholders such as {inputs.query} and {outputs.stdout} are replaced with the task's input values and output locations, respectively.
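The placeholder syntax behaves much like Python's str.format with attribute lookups, so the substitution can be previewed outside Flyte. The sketch below renders a trimmed copy of the script with the workflow's default inputs; the render helper is hypothetical and is not flytekit's implementation, and at run time flytekit resolves output locations to its own paths rather than the bare "stdout.txt" used here.

```python
from types import SimpleNamespace

# Trimmed copy of the ShellTask script; {inputs.x} and {outputs.x} are format fields
SCRIPT = """
mkdir -p {inputs.outdir}
query={inputs.datadir}/{inputs.query}
db={inputs.datadir}/{inputs.db}
blastout={inputs.outdir}/{inputs.blast_output}
blastx -out $blastout -outfmt 6 -query $query -db $db >> {outputs.stdout} 2>&1
"""


def render(script: str, inputs: dict, outputs: dict) -> str:
    # str.format resolves "{inputs.outdir}" as getattr(inputs, "outdir")
    return script.format(inputs=SimpleNamespace(**inputs), outputs=SimpleNamespace(**outputs))


rendered = render(
    SCRIPT,
    inputs={
        "datadir": "kitasatospora",
        "query": "k_sp_CB01950_penicillin.fasta",
        "outdir": "output",
        "blast_output": "AMK19_00175_blastx_kitasatospora.tab",
        "db": "kitasatospora_proteins.faa",
    },
    outputs={"stdout": "stdout.txt"},
)
print(rendered)
```

Shell variables such as $query are left untouched because they contain no braces.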

blastx_on_shell = ShellTask(
    name="blastx",
    debug=True,
    script="""
    mkdir -p {inputs.outdir}

    query={inputs.datadir}/{inputs.query}
    db={inputs.datadir}/{inputs.db}
    blastout={inputs.outdir}/{inputs.blast_output}

    blastx -out $blastout -outfmt 6 -query $query -db $db >> {outputs.stdout} 2>&1
    """,
    inputs=kwtypes(datadir=str, query=str, outdir=str, blast_output=str, db=str),
    output_locs=[
        OutputLocation(var="stdout", var_type=FlyteFile, location="stdout.txt"),
        OutputLocation(
            var="blastout",
            var_type=FlyteFile,
            location="{inputs.outdir}/{inputs.blast_output}",
        ),
    ],
)

The -outfmt 6 option tells BLASTX to write tabular output: a tab-separated plain-text file with one line per alignment, which is convenient for automated processing. If the command succeeds, BLASTX prints nothing, so the captured stdout/stderr file should be empty.
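Each line of the tabular output has twelve fields, in the same order as the column names assigned in the blastx_output task. As a pandas-free sketch of that parsing step, the following reads outfmt 6 text with the csv module and converts the numeric columns. The function parse_outfmt6 is a hypothetical helper, and the example line is made up for illustration; it is not output from this dataset.

```python
import csv
import io

# Column names used by the blastx_output task, in outfmt 6 field order
HEADERS = [
    "query", "subject", "pc_identity", "aln_length", "mismatches", "gaps_opened",
    "query_start", "query_end", "subject_start", "subject_end", "e_value", "bitscore",
]
INT_COLUMNS = {"aln_length", "mismatches", "gaps_opened",
               "query_start", "query_end", "subject_start", "subject_end"}
FLOAT_COLUMNS = {"pc_identity", "e_value", "bitscore"}


def parse_outfmt6(text: str) -> list[dict]:
    rows = []
    # QUOTE_NONE: treat quote characters in sequence IDs literally
    for fields in csv.reader(io.StringIO(text), delimiter="\t", quoting=csv.QUOTE_NONE):
        if not fields:  # skip blank lines
            continue
        if len(fields) != len(HEADERS):
            raise ValueError(f"expected {len(HEADERS)} columns, got {len(fields)}")
        row = {}
        for name, value in zip(HEADERS, fields):
            if name in INT_COLUMNS:
                row[name] = int(value)
            elif name in FLOAT_COLUMNS:
                row[name] = float(value)
            else:
                row[name] = value
        rows.append(row)
    return rows


# Hypothetical example line (not real BLASTX output)
EXAMPLE = "query_1\tprotein_A\t85.83\t120\t17\t0\t1\t360\t5\t124\t3.2e-50\t180\n"
print(parse_outfmt6(EXAMPLE))
```

For BLASTX, query coordinates are nucleotide positions while subject coordinates are amino-acid positions, which is why a 120-residue alignment spans 360 query bases in the example.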

Next, define a task that loads the BLASTX results file (blastout) into a pandas DataFrame, assigns column names, and plots E-value against percent identity. The task returns the first five rows of the DataFrame together with the plot.

BLASTXOutput = NamedTuple("blastx_output", [("result", pd.DataFrame), ("plot", PNGImageFile)])


@task
def blastx_output(blastout: FlyteFile) -> BLASTXOutput:
    # read BLASTX output
    result = pd.read_csv(blastout, sep="\t", header=None)

    # define column headers
    headers = [
        "query",
        "subject",
        "pc_identity",
        "aln_length",
        "mismatches",
        "gaps_opened",
        "query_start",
        "query_end",
        "subject_start",
        "subject_end",
        "e_value",
        "bitscore",
    ]

    # assign headers
    result.columns = headers

    # create a scatterplot
    result.plot.scatter("pc_identity", "e_value")
    plt.title("E value vs %identity")
    plot = "plot.png"
    plt.savefig(plot)

    return BLASTXOutput(result=result.head(), plot=plot)

Next, define a task that checks whether the BLASTX run succeeded by verifying that the captured standard output and standard error are empty.

@task
def is_blastx_success(stdout: FlyteFile) -> bool:
    # BLASTX writes nothing to stdout/stderr on success, so an empty capture file means success
    with open(stdout) as f:
        return not f.read()

Finally, create a workflow that chains the tasks together. A conditional checks whether the BLASTX command succeeded: if it did, the workflow loads and plots the results; otherwise, it fails with the message "BLASTX failed".

@workflow
def blast_wf(
    datadir: str = "kitasatospora",
    outdir: str = "output",
    query: str = "k_sp_CB01950_penicillin.fasta",
    db: str = "kitasatospora_proteins.faa",
    blast_output: str = "AMK19_00175_blastx_kitasatospora.tab",
) -> BLASTXOutput:
    stdout, blastout = blastx_on_shell(datadir=datadir, outdir=outdir, query=query, db=db, blast_output=blast_output)
    result = is_blastx_success(stdout=stdout)
    final_result, plot = (
        conditional("blastx_output")
        .if_(result.is_true())
        .then(blastx_output(blastout=blastout))
        .else_()
        .fail("BLASTX failed")
    )
    return BLASTXOutput(result=final_result, plot=plot)
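Inside a workflow, task calls return promises that are only resolved when the workflow executes, so a plain Python if statement cannot branch on them; conditional defers the decision to execution time. The branch it encodes is roughly equivalent to the plain-Python sketch below. The names blastx_succeeded, run_branch, and BlastxFailed are hypothetical and only illustrate the control flow, not how Flyte evaluates it.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


class BlastxFailed(RuntimeError):
    """Hypothetical stand-in for the failure raised by .fail("BLASTX failed")."""


def blastx_succeeded(captured_output: str) -> bool:
    # BLASTX prints nothing on success, so any captured stdout/stderr text signals a problem
    return captured_output == ""


def run_branch(captured_output: str, load_results: Callable[[], T]) -> T:
    # mirrors .if_(result.is_true()).then(...).else_().fail(...)
    if blastx_succeeded(captured_output):
        return load_results()
    raise BlastxFailed("BLASTX failed")


print(run_branch("", lambda: "results loaded"))
```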

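Run the workflow locally. The dataset is downloaded first because the workflow reads its inputs from the local kitasatospora directory, and the blastx executable from BLAST+ must be installed and available on your PATH.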

if __name__ == "__main__":
    print("Downloading dataset...")
    download_dataset()
    print("Running BLASTX...")
    print(f"BLASTX result: {blast_wf()}")
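A local run fails inside the ShellTask if blastx is missing or the query file is not where the workflow expects it, and the error then only shows up in the captured stdout file. A small pre-flight check, sketched below under the assumption that the default workflow inputs are used, can report these problems up front. The helper missing_prerequisites is hypothetical; it does not check the database, because a BLAST database is typically stored as several index files that blastx locates from the database name itself.

```python
import shutil
from pathlib import Path


def missing_prerequisites(
    datadir: str = "kitasatospora",
    query: str = "k_sp_CB01950_penicillin.fasta",
) -> list[str]:
    problems = []
    # the ShellTask invokes blastx directly, so it must be resolvable on PATH
    if shutil.which("blastx") is None:
        problems.append("blastx not found on PATH; install the BLAST+ command-line tools")
    # blast_wf reads the query sequence from <datadir>/<query>
    query_path = Path(datadir) / query
    if not query_path.is_file():
        problems.append(f"query file not found: {query_path}")
    return problems


if __name__ == "__main__":
    for problem in missing_prerequisites():
        print(problem)
```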