Running a Spark Task
Once you have a Union account, install union:
pip install union
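Since the example below imports flytekitplugins.spark, also install the Spark plugin if you want to run the code locally (it pulls in pyspark):
pip install flytekitplugins-spark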
Export the following environment variable to build and push images to your own container registry:
# replace with your registry name
export IMAGE_SPEC_REGISTRY="<your-container-registry>"
Then clone the examples repository and run the workflow:
$ git clone https://github.com/unionai/unionai-examples
$ cd unionai-examples
$ union run --remote <path/to/file.py> <workflow_name> <params>
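For this tutorial the workflow is my_spark, so the last command takes the form below; keep the path placeholder pointed at wherever the example file lives in the cloned repo:
$ union run --remote <path/to/file.py> my_spark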
The source code for this tutorial can be found here.
import datetime
import random
from operator import add
import flytekit
from flytekit import ImageSpec, Resources, task, workflow
from flytekitplugins.spark import Spark
Create an ImageSpec to automate the retrieval of a prebuilt Spark image.
custom_image = ImageSpec(python_version="3.9", registry="ghcr.io/flyteorg", packages=["flytekitplugins-spark"])
Replace ghcr.io/flyteorg with a container registry you have permission to publish to. To push the image to the local registry in the demo cluster, set the registry to localhost:30000.
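For instance, a minimal sketch of the same ImageSpec pointed at the demo cluster's local registry (assuming you are running the local demo cluster):
custom_image = ImageSpec(
    python_version="3.9",
    registry="localhost:30000",  # local registry exposed by the demo cluster
    packages=["flytekitplugins-spark"],
)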
To create a Spark task, pass a flytekitplugins.spark.Spark config to the Flyte task. The spark_conf parameter accepts the configuration options commonly used when setting up a Spark cluster. If necessary, you can also provide hadoop_conf as an input (see the sketch after the task below).
@task(
    task_config=Spark(
        # This configuration is applied to the Spark cluster
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
            "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar",
        }
    ),
    limits=Resources(mem="2000M"),
    container_image=custom_image,
)
def hello_spark(partitions: int) -> float:
    print("Starting Spark with Partitions: {}".format(partitions))

    n = 1 * partitions
    sess = flytekit.current_context().spark_session
    count = sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    pi_val = 4.0 * count / n
    return pi_val
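As a sketch of the hadoop_conf option mentioned earlier: the Hadoop property shown below is illustrative, chosen only because this example already ships the GCS connector jar via spark.jars; substitute whatever Hadoop settings your job actually needs.
spark_with_hadoop = Spark(
    spark_conf={"spark.driver.memory": "1000M"},
    hadoop_conf={
        # Illustrative setting: route gs:// paths through the GCS connector filesystem
        "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    },
)
You would pass it to the task the same way as above, via task_config=spark_with_hadoop.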
The hello_spark task spins up a new Spark cluster. When executed locally, it sets up a single-node, client-only cluster; when executed remotely, it sizes the cluster according to the specified Spark configuration. For this example, we define the function that the map-reduce operation applies within the Spark cluster: each call samples a random point in the square [-1, 1] x [-1, 1] and returns 1 if the point falls inside the unit circle, so 4 * count / n approximates π.
def f(_):
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    return 1 if x**2 + y**2 <= 1 else 0
Additionally, we specify a standard Flyte task that won’t be executed on the Spark cluster.
@task(
    cache_version="2",
    container_image=custom_image,
)
def print_every_time(value_to_print: float, date_triggered: datetime.datetime) -> int:
    print("My printed value: {} @ {}".format(value_to_print, date_triggered))
    return 1
Finally, define a workflow that connects your tasks in a sequence. Remember, Spark and non-Spark tasks can be chained together as long as their parameter specifications match.
@workflow
def my_spark(triggered_date: datetime.datetime = datetime.datetime(2020, 9, 11)) -> float:
    """
    This workflow is used like any other workflow. Since the image is a property of the task,
    the workflow does not care about how the image is configured.
    """
    pi = hello_spark(partitions=1)
    print_every_time(value_to_print=pi, date_triggered=triggered_date)
    return pi
You can execute the workflow locally. Certain aspects of Spark, such as links to Hive metastores, may not work in local execution, but these limitations are inherent to Spark itself and are not introduced by Flyte.
if __name__ == "__main__":
    print(f"Running {__file__} main...")
    print(
        f"Running my_spark(triggered_date=datetime.datetime.now()) {my_spark(triggered_date=datetime.datetime.now())}"
    )
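With the guard above in place, running the file directly with Python executes the workflow locally, using the same path placeholder as before:
$ python <path/to/file.py>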