Retries and timeouts
Retry types
Union.ai allows you to automatically retry failing tasks. This section explains the configuration and application of retries.
Errors causing task failure are categorized into two main types, influencing the retry logic differently:
- SYSTEM: These errors arise from infrastructure-related failures, such as hardware malfunctions or network issues. They are typically transient and can often be resolved with a retry.
- USER: These errors are due to issues in the user-defined code, like a value error or a logic mistake, which usually require code modifications to resolve.
Configuring retries
Retries in Union.ai are configurable to address both USER and SYSTEM errors, allowing for tailored fault tolerance strategies:
USER errors can be handled by setting the retries attribute in the task decorator to define how many times a task should retry.
This is straightforward and directly controlled in the task definition:
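import random
from typing import List

from flytekit import task
from flytekit.exceptions.user import FlyteRecoverableException

@task(retries=3)
def compute_mean(data: List[float]) -> float:
    # Simulate a transient failure roughly 5% of the time.
    if random.random() < 0.05:
        raise FlyteRecoverableException("Something bad happened 🔥")
    return sum(data) / len(data)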
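To make the idea of separate USER and SYSTEM retry budgets concrete, here is a minimal plain-Python sketch. It is not how Union.ai schedules retries internally. The names RecoverableUserError, InfrastructureError, and run_with_budgets are hypothetical stand-ins invented for this illustration, and the sketch assumes that a retry count means the number of additional attempts allowed after the first one.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")


class RecoverableUserError(Exception):
    """Hypothetical stand-in for a USER error that is worth retrying."""


class InfrastructureError(Exception):
    """Hypothetical stand-in for a SYSTEM error, such as a lost node."""


def run_with_budgets(fn: Callable[[], T], user_retries: int, system_retries: int) -> T:
    """Call fn until it succeeds or one of the two retry budgets runs out."""
    user_left, system_left = user_retries, system_retries
    while True:
        try:
            return fn()
        except RecoverableUserError:
            if user_left == 0:
                raise  # USER budget spent: surface the error to the caller
            user_left -= 1
        except InfrastructureError:
            if system_left == 0:
                raise  # SYSTEM budget spent: surface the error to the caller
            system_left -= 1


# Usage: a flaky callable that fails with two USER errors and one SYSTEM
# error before finally succeeding.
pending_failures: List[Exception] = [
    RecoverableUserError("bad value"),
    InfrastructureError("node lost"),
    RecoverableUserError("bad value"),
]
attempt_log: List[str] = []


def flaky_mean() -> float:
    if pending_failures:
        error = pending_failures.pop(0)
        attempt_log.append(type(error).__name__)
        raise error
    attempt_log.append("ok")
    return sum([1.0, 2.0, 3.0]) / 3


result = run_with_budgets(flaky_mean, user_retries=3, system_retries=1)
print(result, attempt_log)
```

In this sketch each error type draws down only its own counter, so a transient infrastructure failure does not use up the retries reserved for recoverable problems in user code.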
Retrying interruptible tasks
Tasks marked as interruptible can be preempted and retried without counting against the USER error budget. This is useful for tasks running on preemptible compute resources like spot instances.
Retrying map tasks
For map tasks, the interruptible behavior aligns with that of regular tasks. The retries field in the task decorator is not needed to handle SYSTEM errors, as these are managed by the platform’s configuration. The USER budget, by contrast, is set by defining retries in the task decorator.
See Map tasks.
Timeouts
To protect against zombie tasks that hang due to system-level issues, you can supply the timeout argument to the task decorator to make sure that problematic tasks adhere to a maximum runtime.
In this example, we make sure that the task is terminated after it has been running for more than one hour.
from datetime import timedelta
from typing import List
from flytekit import task

@task(timeout=timedelta(hours=1))
def compute_mean(data: List[float]) -> float:
    return sum(data) / len(data)
Notice that the timeout argument takes a built-in Python timedelta object.
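When a timeout is combined with retries, it can help to estimate how long a task might run in the worst case. The following plain-Python sketch shows how timedelta values behave and computes a rough upper bound. The helper worst_case_runtime is hypothetical and not part of flytekit. It assumes the timeout applies to each attempt separately, and it ignores queueing time and any SYSTEM retries.

```python
from datetime import timedelta


def worst_case_runtime(timeout: timedelta, retries: int) -> timedelta:
    """Rough upper bound on running time, assuming a per-attempt timeout."""
    if retries < 0:
        raise ValueError("retries must be non-negative")
    # One initial attempt plus `retries` further attempts, each capped by the timeout.
    return timeout * (retries + 1)


one_hour = timedelta(hours=1)

# timedelta normalizes its arguments, so these two values are equal.
same_duration = timedelta(minutes=90) == timedelta(hours=1, minutes=30)

print(one_hour.total_seconds())                 # 3600.0
print(same_duration)                            # True
print(worst_case_runtime(one_hour, retries=3))  # 4:00:00
```

Under these assumptions, a task with a one-hour timeout and three retries could spend up to four hours running before it finally fails.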