0.1.170

union

Directory

Classes

Class Description
ActorEnvironment ActorEnvironment class.
Artifact This is a wrapper around the Flytekit Artifact class.
Cache Cache configuration for a task.
CachePolicy Base class for protocol classes.
ContainerTask This is an intermediate class that represents Flyte Tasks that run a container at execution time.
Deck Deck enable users to get customizable and default visibility into their tasks.
FlyteDirectory
FlyteFile
ImageSpec This class is used to specify the docker image that will be used to run the task.
LaunchPlan Launch Plans are one of the core constructs of Flyte.
PodTemplate Custom PodTemplate specification for a Task.
Resources This class is used to specify both resource requests and resource limits.
Secret See :std:ref:cookbook:secrets for usage examples.
StructuredDataset This is the user facing StructuredDataset class.
UnionRemote Main entrypoint for programmatically accessing a Flyte remote backend.
VersionParameters Parameters used for version hash generation.

Methods

Method Description
actor_cache() Cache function between actor executions.
current_context() Use this method to get a handle of specific parameters available in a flyte task.
map() Use to map over tasks, actors, launch plans, reference tasks and launch plans, and remote tasks and.
map_task() Wrapper that creates a map task utilizing either the existing ArrayNodeMapTask.
task() This is the core decorator to use for any task type in flytekit.
workflow() This decorator declares a function to be a Flyte workflow.

Methods

actor_cache()

def actor_cache(
    f,
)

Cache function between actor executions.

Parameter Type
f

current_context()

def current_context()

Use this method to get a handle of specific parameters available in a flyte task.

Usage

.. code-block:: python

flytekit.current_context().logging.info(...)

Available params are documented in :py:class:flytekit.core.context_manager.ExecutionParams. There are some special params, that should be available

map()

def map(
    target: typing.Union[flytekit.core.launch_plan.LaunchPlan, flytekit.core.python_function_task.PythonFunctionTask, ForwardRef('FlyteLaunchPlan')],
    bound_inputs: typing.Optional[typing.Dict[str, typing.Any]],
    concurrency: typing.Optional[int],
    min_successes: typing.Optional[int],
    min_success_ratio: float,
    kwargs,
)

Use to map over tasks, actors, launch plans, reference tasks and launch plans, and remote tasks and launch plans.

Parameter Type
target typing.Union[flytekit.core.launch_plan.LaunchPlan, flytekit.core.python_function_task.PythonFunctionTask, ForwardRef('FlyteLaunchPlan')]
bound_inputs typing.Optional[typing.Dict[str, typing.Any]]
concurrency typing.Optional[int]
min_successes typing.Optional[int]
min_success_ratio float
kwargs **kwargs

map_task()

def map_task(
    target: typing.Union[flytekit.core.launch_plan.LaunchPlan, flytekit.core.python_function_task.PythonFunctionTask, ForwardRef('FlyteLaunchPlan')],
    concurrency: typing.Optional[int],
    min_successes: typing.Optional[int],
    min_success_ratio: float,
    kwargs,
)

Wrapper that creates a map task utilizing either the existing ArrayNodeMapTask or the drop in replacement ArrayNode implementation

Parameter Type
target typing.Union[flytekit.core.launch_plan.LaunchPlan, flytekit.core.python_function_task.PythonFunctionTask, ForwardRef('FlyteLaunchPlan')]
concurrency typing.Optional[int]
min_successes typing.Optional[int]
min_success_ratio float
kwargs **kwargs

task()

def task(
    _task_function: Optional[Callable[P, FuncOut]],
    task_config: Optional[T],
    cache: Union[bool, Cache],
    retries: int,
    interruptible: Optional[bool],
    deprecated: str,
    timeout: Union[datetime.timedelta, int],
    container_image: Optional[Union[str, ImageSpec]],
    environment: Optional[Dict[str, str]],
    requests: Optional[Resources],
    limits: Optional[Resources],
    secret_requests: Optional[List[Secret]],
    execution_mode: PythonFunctionTask.ExecutionBehavior,
    node_dependency_hints: Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]],
    task_resolver: Optional[TaskResolverMixin],
    docs: Optional[Documentation],
    disable_deck: Optional[bool],
    enable_deck: Optional[bool],
    deck_fields: Optional[Tuple[DeckField, ...]],
    pod_template: Optional['PodTemplate'],
    pod_template_name: Optional[str],
    accelerator: Optional[BaseAccelerator],
    pickle_untyped: bool,
    shared_memory: Optional[Union[L[True], str]],
    resources: Optional[Resources],
    kwargs,
) -> Union[Callable[P, FuncOut], Callable[[Callable[P, FuncOut]], PythonFunctionTask[T]], PythonFunctionTask[T]]

This is the core decorator to use for any task type in flytekit.

Tasks are the building blocks of Flyte. They represent users code. Tasks have the following properties

  • Versioned (usually tied to the git revision SHA1)
  • Strong interfaces (specified inputs and outputs)
  • Declarative
  • Independently executable
  • Unit testable

For a simple python task,

.. code-block:: python

@task
def my_task(x: int, y: typing.Dict[str, str]) -> str:
    ...

For specific task types

.. code-block:: python

@task(task_config=Spark(), retries=3)
def my_task(x: int, y: typing.Dict[str, str]) -> str:
    ...

Please see some cookbook :std:ref:task examples <cookbook:tasks> for additional information.

Parameter Type
_task_function Optional[Callable[P, FuncOut]]
task_config Optional[T]
cache Union[bool, Cache]
retries int
interruptible Optional[bool]
deprecated str
timeout Union[datetime.timedelta, int]
container_image Optional[Union[str, ImageSpec]]
environment Optional[Dict[str, str]]
requests Optional[Resources]
limits Optional[Resources]
secret_requests Optional[List[Secret]]
execution_mode PythonFunctionTask.ExecutionBehavior
node_dependency_hints Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]]
task_resolver Optional[TaskResolverMixin]
docs Optional[Documentation]
disable_deck Optional[bool]
enable_deck Optional[bool]
deck_fields Optional[Tuple[DeckField, ...]]
pod_template Optional['PodTemplate']
pod_template_name Optional[str]
accelerator Optional[BaseAccelerator]
pickle_untyped bool
shared_memory Optional[Union[L[True], str]]
resources Optional[Resources]
kwargs **kwargs

workflow()

def workflow(
    _workflow_function: Optional[Callable[P, FuncOut]],
    failure_policy: Optional[WorkflowFailurePolicy],
    interruptible: bool,
    on_failure: Optional[Union[WorkflowBase, Task]],
    docs: Optional[Documentation],
    pickle_untyped: bool,
    default_options: Optional[Options],
) -> Union[Callable[P, FuncOut], Callable[[Callable[P, FuncOut]], PythonFunctionWorkflow], PythonFunctionWorkflow]

This decorator declares a function to be a Flyte workflow. Workflows are declarative entities that construct a DAG of tasks using the data flow between tasks.

Unlike a task, the function body of a workflow is evaluated at serialization-time (aka compile-time). This is because while we can determine the entire structure of a task by looking at the function’s signature, workflows need to run through the function itself because the body of the function is what expresses the workflow structure. It’s also important to note that, local execution notwithstanding, it is not evaluated again when the workflow runs on Flyte. That is, workflows should not call non-Flyte entities since they are only run once (again, this is with respect to the platform, local runs notwithstanding).

Example:

.. literalinclude:: ../../../tests/flytekit/unit/core/test_workflows.py :pyobject: my_wf_example

Again, users should keep in mind that even though the body of the function looks like regular Python, it is actually not. When flytekit scans the workflow function, the objects being passed around between the tasks are not your typical Python values. So even though you may have a task t1() -> int, when a = t1() is called, a will not be an integer so if you try to range(a) you’ll get an error.

Please see the :ref:user guide <cookbook:workflow> for more usage examples.

Parameter Type
_workflow_function Optional[Callable[P, FuncOut]]
failure_policy Optional[WorkflowFailurePolicy]
interruptible bool
on_failure Optional[Union[WorkflowBase, Task]]
docs Optional[Documentation]
pickle_untyped bool
default_options Optional[Options]

union.ActorEnvironment

ActorEnvironment class.

class ActorEnvironment(
    name: str,
    container_image: Optional[Union[str, ImageSpec]],
    replica_count: int,
    ttl_seconds: Optional[int],
    environment: Optional[Dict[str, str]],
    requests: Optional[Resources],
    limits: Optional[Resources],
    accelerator: Optional[BaseAccelerator],
    secret_requests: Optional[List[Secret]],
    pod_template: Optional[PodTemplate],
    interruptible: bool,
)
Parameter Type
name str
container_image Optional[Union[str, ImageSpec]]
replica_count int
ttl_seconds Optional[int]
environment Optional[Dict[str, str]]
requests Optional[Resources]
limits Optional[Resources]
accelerator Optional[BaseAccelerator]
secret_requests Optional[List[Secret]]
pod_template Optional[PodTemplate]
interruptible bool

Properties

Property Type Description
task
version

union.Artifact

This is a wrapper around the Flytekit Artifact class.

This Python class has two purposes - as a Python representation of a materialized Artifact, and as a way for users to specify that tasks/workflows create Artifacts and the manner in which they are created.

Use one as input to workflow (only workflow for now) df_artifact = Artifact.get(“flyte://a1”) remote.execute(wf, inputs={“a”: df_artifact})

Note that Python fields will be missing when retrieved from the service.

class Artifact(
    args,
    project: Optional[str],
    domain: Optional[str],
    name: Optional[str],
    version: Optional[str],
    time_partitioned: bool,
    time_partition: Optional[TimePartition],
    time_partition_granularity: Optional[Granularity],
    partition_keys: Optional[typing.List[str]],
    partitions: Optional[Union[Partitions, typing.Dict[str, str]]],
    python_val: Optional[typing.Any],
    python_type: Optional[typing.Type],
    literal: Optional[Literal],
    literal_type: Optional[LiteralType],
    short_description: Optional[str],
    source: Optional[artifacts_pb2.ArtifactSource],
    card: Optional[Card],
    kwargs,
)
Parameter Type
args *args
project Optional[str]
domain Optional[str]
name Optional[str]
version Optional[str]
time_partitioned bool
time_partition Optional[TimePartition]
time_partition_granularity Optional[Granularity]
partition_keys Optional[typing.List[str]]
partitions Optional[Union[Partitions, typing.Dict[str, str]]]
python_val Optional[typing.Any]
python_type Optional[typing.Type]
literal Optional[Literal]
literal_type Optional[LiteralType]
short_description Optional[str]
source Optional[artifacts_pb2.ArtifactSource]
card Optional[Card]
kwargs **kwargs

Methods

Method Description
create_from() This function allows users to declare partition values dynamically from the body of a task.
embed_as_query() This should only be called in the context of a Trigger.
from_flyte_idl() Converts the IDL representation to this object.
get() This function is supposed to mimic the get() behavior inputs/outputs as returned by FlyteRemote for an.
initialize() Use this for when you have a Python value you want to get an Artifact object out of.
metadata()
query()
set_resolver()
set_source()
to_create_request()
to_id_idl() Converts this object to the IDL representation.

create_from()

def create_from(
    o: O,
    card: Optional[SerializableToString],
    args: `*args`,
    kwargs,
) -> O

This function allows users to declare partition values dynamically from the body of a task. Note that you’ll still need to annotate your task function output with the relevant Artifact object. Below, one of the partition values is bound to an input, and the other is set at runtime. Note that since tasks are not run at compile time, flytekit cannot check that you’ve bound all the partition values. It’s up to you to ensure that you’ve done so.

Pricing = Artifact(name="pricing", partition_keys=["region"])
EstError = Artifact(name="estimation_error", partition_keys=["dataset"], time_partitioned=True)

@task
def t1() -> Annotated[pd.DataFrame, Pricing], Annotated[float, EstError]:
    df = get_pricing_results()
    dt = get_time()
    return Pricing.create_from(df, region="dubai"),             EstError.create_from(msq_error, dataset="train", time_partition=dt)

You can mix and match with the input syntax as well.

@task
def my_task() -> Annotated[pd.DataFrame, RideCountData(region=Inputs.region)]:
    ...
    return RideCountData.create_from(df, time_partition=datetime.datetime.now())
Parameter Type
o O
card Optional[SerializableToString]
args *args
kwargs **kwargs

embed_as_query()

def embed_as_query(
    partition: Optional[str],
    bind_to_time_partition: Optional[bool],
    expr: Optional[str],
    op: Optional[Op],
) -> art_id.ArtifactQuery

This should only be called in the context of a Trigger. The type of query this returns is different from the query() function. This type of query is used to reference the triggering artifact, rather than running a query.

Parameter Type
partition Optional[str]
bind_to_time_partition Optional[bool]
expr Optional[str]
op Optional[Op]

from_flyte_idl()

def from_flyte_idl(
    pb2: artifacts_pb2.Artifact,
) -> Artifact

Converts the IDL representation to this object.

Parameter Type
pb2 artifacts_pb2.Artifact

get()

def get(
    as_type: Optional[typing.Type],
) -> Optional[typing.Any]

This function is supposed to mimic the get() behavior inputs/outputs as returned by FlyteRemote for an execution, leveraging the LiteralsResolver (and underneath that the TypeEngine) to turn the literal into a Python value.

Parameter Type
as_type Optional[typing.Type]

initialize()

def initialize(
    python_val: typing.Any,
    python_type: typing.Type,
    name: Optional[str],
    literal_type: Optional[LiteralType],
    version: Optional[str],
    tags: Optional[typing.List[str]],
) -> Artifact

Use this for when you have a Python value you want to get an Artifact object out of.

This function readies an Artifact for creation, it doesn’t actually create it just yet since this is a network-less call. You will need to persist it with a FlyteRemote instance: remote.create_artifact(Artifact.initialize(…))

Artifact.initialize("/path/to/file", tags={“tag1”: “val1”}) Artifact.initialize("/path/to/parquet", type=pd.DataFrame, tags=[“0.1.0”])

What’s set here is everything that isn’t set by the server. What is set by the server?

  • name, version, if not set by user.
  • uri Set by remote
  • project, domain
Parameter Type
python_val typing.Any
python_type typing.Type
name Optional[str]
literal_type Optional[LiteralType]
version Optional[str]
tags Optional[typing.List[str]]

metadata()

def metadata()

query()

def query(
    project: Optional[str],
    domain: Optional[str],
    time_partition: Optional[Union[datetime.datetime, TimePartition, art_id.InputBindingData]],
    partitions: Optional[Union[typing.Dict[str, str], Partitions]],
    kwargs,
) -> ArtifactQuery
Parameter Type
project Optional[str]
domain Optional[str]
time_partition Optional[Union[datetime.datetime, TimePartition, art_id.InputBindingData]]
partitions Optional[Union[typing.Dict[str, str], Partitions]]
kwargs **kwargs

set_resolver()

def set_resolver(
    resolver: LiteralsResolver,
)
Parameter Type
resolver LiteralsResolver

set_source()

def set_source(
    source: artifacts_pb2.ArtifactSource,
)
Parameter Type
source artifacts_pb2.ArtifactSource

to_create_request()

def to_create_request(
    a: Artifact,
) -> artifacts_pb2.CreateArtifactRequest
Parameter Type
a Artifact

to_id_idl()

def to_id_idl()

Converts this object to the IDL representation. This is here instead of translator because it’s in the interface, a relatively simple proto object that’s exposed to the user.

Properties

Property Type Description
concrete_artifact_id
partitions
time_partition

union.Cache

Cache configuration for a task.

class Cache(
    version: typing.Optional[str],
    serialize: bool,
    ignored_inputs: typing.Union[typing.Tuple[str, ...], str],
    salt: str,
    policies: typing.Union[typing.List[flytekit.core.cache.CachePolicy], flytekit.core.cache.CachePolicy, NoneType],
)
Parameter Type
version typing.Optional[str]
serialize bool
ignored_inputs typing.Union[typing.Tuple[str, ...], str]
salt str
policies typing.Union[typing.List[flytekit.core.cache.CachePolicy], flytekit.core.cache.CachePolicy, NoneType]

Methods

Method Description
get_ignored_inputs()
get_version()

get_ignored_inputs()

def get_ignored_inputs()

get_version()

def get_version(
    params: flytekit.core.cache.VersionParameters,
) -> str
Parameter Type
params flytekit.core.cache.VersionParameters

union.CachePolicy

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

class GenProto[T](Protocol):
    def meth(self) -> T:
        ...
class CachePolicy(
    args,
    kwargs,
)
Parameter Type
args *args
kwargs **kwargs

Methods

Method Description
get_version()

get_version()

def get_version(
    salt: str,
    params: flytekit.core.cache.VersionParameters,
) -> str
Parameter Type
salt str
params flytekit.core.cache.VersionParameters

union.ContainerTask

This is an intermediate class that represents Flyte Tasks that run a container at execution time. This is the vast majority of tasks - the typical @task decorated tasks for instance all run a container. An example of something that doesn’t run a container would be something like the Athena SQL task.

class ContainerTask(
    name: str,
    image: typing.Union[str, flytekit.image_spec.image_spec.ImageSpec],
    command: typing.List[str],
    inputs: typing.Optional[typing.OrderedDict[str, typing.Type]],
    metadata: typing.Optional[flytekit.core.base_task.TaskMetadata],
    arguments: typing.Optional[typing.List[str]],
    outputs: typing.Optional[typing.Dict[str, typing.Type]],
    requests: typing.Optional[flytekit.core.resources.Resources],
    limits: typing.Optional[flytekit.core.resources.Resources],
    input_data_dir: typing.Optional[str],
    output_data_dir: typing.Optional[str],
    metadata_format: <enum 'MetadataFormat'>,
    io_strategy: typing.Optional[flytekit.core.container_task.ContainerTask.IOStrategy],
    secret_requests: typing.Optional[typing.List[flytekit.models.security.Secret]],
    pod_template: typing.Optional[ForwardRef('PodTemplate')],
    pod_template_name: typing.Optional[str],
    local_logs: bool,
    resources: typing.Optional[flytekit.core.resources.Resources],
    kwargs,
)
Parameter Type
name str
image typing.Union[str, flytekit.image_spec.image_spec.ImageSpec]
command typing.List[str]
inputs typing.Optional[typing.OrderedDict[str, typing.Type]]
metadata typing.Optional[flytekit.core.base_task.TaskMetadata]
arguments typing.Optional[typing.List[str]]
outputs typing.Optional[typing.Dict[str, typing.Type]]
requests typing.Optional[flytekit.core.resources.Resources]
limits typing.Optional[flytekit.core.resources.Resources]
input_data_dir typing.Optional[str]
output_data_dir typing.Optional[str]
metadata_format <enum 'MetadataFormat'>
io_strategy typing.Optional[flytekit.core.container_task.ContainerTask.IOStrategy]
secret_requests typing.Optional[typing.List[flytekit.models.security.Secret]]
pod_template typing.Optional[ForwardRef('PodTemplate')]
pod_template_name typing.Optional[str]
local_logs bool
resources typing.Optional[flytekit.core.resources.Resources]
kwargs **kwargs

Methods

Method Description
compile() Generates a node that encapsulates this task in a workflow definition.
construct_node_metadata() Used when constructing the node that encapsulates this task as part of a broader workflow definition.
dispatch_execute() This method translates Flyte’s Type system based input values and invokes the actual call to the executor.
execute() This method will be invoked to execute the task.
find_lhs()
get_config() Returns the task config as a serializable dictionary.
get_container() Returns the container definition (if any) that is used to run the task on hosted Flyte.
get_custom() Return additional plugin-specific custom data (if any) as a serializable dictionary.
get_extended_resources() Returns the extended resources to allocate to the task on hosted Flyte.
get_input_types() Returns the names and python types as a dictionary for the inputs of this task.
get_k8s_pod() Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.
get_sql() Returns the Sql definition (if any) that is used to run the task on hosted Flyte.
get_type_for_input_var() Returns the python type for an input variable by name.
get_type_for_output_var() Returns the python type for the specified output variable by name.
local_execute() This function is used only in the local execution path and is responsible for calling dispatch execute.
local_execution_mode()
post_execute() Post execute is called after the execution has completed, with the user_params and can be used to clean-up,.
pre_execute() This is the method that will be invoked directly before executing the task method and before all the inputs.
sandbox_execute() Call dispatch_execute, in the context of a local sandbox execution.

compile()

def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    args,
    kwargs,
) -> typing.Union[typing.Tuple[flytekit.core.promise.Promise], flytekit.core.promise.Promise, flytekit.core.promise.VoidPromise, NoneType]

Generates a node that encapsulates this task in a workflow definition.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
args *args
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

Used when constructing the node that encapsulates this task as part of a broader workflow definition.

dispatch_execute()

def dispatch_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
) -> typing.Union[flytekit.models.literals.LiteralMap, flytekit.models.dynamic_job.DynamicJobSpec, typing.Coroutine]

This method translates Flyte’s Type system based input values and invokes the actual call to the executor This method is also invoked during runtime.

  • VoidPromise is returned in the case when the task itself declares no outputs.
  • Literal Map is returned when the task returns either one more outputs in the declaration. Individual outputs may be none
  • DynamicJobSpec is returned when a dynamic workflow is executed
Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

execute()

def execute(
    kwargs,
) -> flytekit.models.literals.LiteralMap

This method will be invoked to execute the task.

Parameter Type
kwargs **kwargs

find_lhs()

def find_lhs()

get_config()

def get_config(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[typing.Dict[str, str]]

Returns the task config as a serializable dictionary. This task config consists of metadata about the custom defined for this task.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_container()

def get_container(
    settings: flytekit.configuration.SerializationSettings,
) -> flytekit.models.task.Container

Returns the container definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_custom()

def get_custom(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[typing.Dict[str, typing.Any]]

Return additional plugin-specific custom data (if any) as a serializable dictionary.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_extended_resources()

def get_extended_resources(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flyteidl.core.tasks_pb2.ExtendedResources]

Returns the extended resources to allocate to the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_input_types()

def get_input_types()

Returns the names and python types as a dictionary for the inputs of this task.

get_k8s_pod()

def get_k8s_pod(
    settings: flytekit.configuration.SerializationSettings,
) -> flytekit.models.task.K8sPod

Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_sql()

def get_sql(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flytekit.models.task.Sql]

Returns the Sql definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_type_for_input_var()

def get_type_for_input_var(
    k: str,
    v: typing.Any,
) -> typing.Type[typing.Any]

Returns the python type for an input variable by name.

Parameter Type
k str
v typing.Any

get_type_for_output_var()

def get_type_for_output_var(
    k: str,
    v: typing.Any,
) -> typing.Type[typing.Any]

Returns the python type for the specified output variable by name.

Parameter Type
k str
v typing.Any

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
) -> typing.Union[typing.Tuple[flytekit.core.promise.Promise], flytekit.core.promise.Promise, flytekit.core.promise.VoidPromise, typing.Coroutine, NoneType]

This function is used only in the local execution path and is responsible for calling dispatch execute. Use this function when calling a task with native values (or Promises containing Flyte literals derived from Python native values).

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

post_execute()

def post_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
    rval: typing.Any,
) -> typing.Any

Post execute is called after the execution has completed, with the user_params and can be used to clean-up, or alter the outputs to match the intended tasks outputs. If not overridden, then this function is a No-op

Parameter Type
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters]
rval typing.Any

pre_execute()

def pre_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
) -> typing.Optional[flytekit.core.context_manager.ExecutionParameters]

This is the method that will be invoked directly before executing the task method and before all the inputs are converted. One particular case where this is useful is if the context is to be modified for the user process to get some user space parameters. This also ensures that things like SparkSession are already correctly setup before the type transformers are called

This should return either the same context of the mutated context

Parameter Type
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters]

sandbox_execute()

def sandbox_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
) -> flytekit.models.literals.LiteralMap

Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

Properties

Property Type Description
deck_fields
If not empty, this task will output deck html file for the specified decks
disable_deck
If true, this task will not output deck html file
docs
enable_deck
If true, this task will output deck html file
environment
Any environment variables that supplied during the execution of the task.
instantiated_in
interface
lhs
location
metadata
name
python_interface
Returns this task’s python interface.
resources
security_context
task_config
Returns the user-specified task config which is used for plugin-specific handling of the task.
task_type
task_type_version

union.Deck

Deck enable users to get customizable and default visibility into their tasks.

Deck contains a list of renderers (FrameRenderer, MarkdownRenderer) that can generate a html file. For example, FrameRenderer can render a DataFrame as an HTML table, MarkdownRenderer can convert Markdown string to HTML

Flyte context saves a list of deck objects, and we use renderers in those decks to render the data and create an HTML file when those tasks are executed

Each task has a least three decks (input, output, default). Input/output decks are used to render tasks’ input/output data, and the default deck is used to render line plots, scatter plots or Markdown text. In addition, users can create new decks to render their data with custom renderers.

.. code-block:: python

iris_df = px.data.iris()

@task()
def t1() -> str:
    md_text = '#Hello Flyte##Hello Flyte###Hello Flyte'
    m = MarkdownRenderer()
    s = BoxRenderer("sepal_length")
    deck = flytekit.Deck("demo", s.to_html(iris_df))
    deck.append(m.to_html(md_text))
    default_deck = flytekit.current_context().default_deck
    default_deck.append(m.to_html(md_text))
    return md_text


# Use Annotated to override default renderer
@task()
def t2() -> Annotated[pd.DataFrame, TopFrameRenderer(10)]:
    return iris_df
class Deck(
    name: str,
    html: typing.Optional[str],
    auto_add_to_deck: bool,
)
Parameter Type
name str
html typing.Optional[str]
auto_add_to_deck bool

Methods

Method Description
append()
publish()

append()

def append(
    html: str,
) -> Deck
Parameter Type
html str

publish()

def publish()

Properties

Property Type Description
html
name

union.FlyteDirectory

class FlyteDirectory(
    path: typing.Union[str, os.PathLike],
    downloader: typing.Optional[typing.Callable],
    remote_directory: typing.Optional[typing.Union[os.PathLike, str, typing.Literal[False]]],
)
Parameter Type
path typing.Union[str, os.PathLike]
downloader typing.Optional[typing.Callable]
remote_directory typing.Optional[typing.Union[os.PathLike, str, typing.Literal[False]]]

Methods

Method Description
crawl() Crawl returns a generator of all files prefixed by any sub-folders under the given “FlyteDirectory”.
deserialize_flyte_dir()
download()
extension()
from_dict()
from_json()
from_source() Create a new FlyteDirectory object with the remote source set to the input.
listdir() This function will list all files and folders in the given directory, but without downloading the contents.
new() Create a new FlyteDirectory object in current Flyte working directory.
new_dir() This will create a new folder under the current folder.
new_file() This will create a new file under the current folder.
new_remote() Create a new FlyteDirectory object using the currently configured default remote in the context (i.
schema()
serialize_flyte_dir()
to_dict()
to_json()

crawl()

def crawl(
    maxdepth: typing.Optional[int],
    topdown: bool,
    kwargs,
) -> Generator[Tuple[typing.Union[str, os.PathLike[Any]], typing.Dict[Any, Any]], None, None]

Crawl returns a generator of all files prefixed by any sub-folders under the given “FlyteDirectory”. if details=True is passed, then it will return a dictionary as specified by fsspec.

Example:

>>> list(fd.crawl())
[("/base", "file1"), ("/base", "dir1/file1"), ("/base", "dir2/file1"), ("/base", "dir1/dir/file1")]

>>> list(x.crawl(detail=True))
[('/tmp/test', {'my-dir/ab.py': {'name': '/tmp/test/my-dir/ab.py', 'size': 0, 'type': 'file',
 'created': 1677720780.2318847, 'islink': False, 'mode': 33188, 'uid': 501, 'gid': 0,
  'mtime': 1677720780.2317934, 'ino': 1694329, 'nlink': 1}})]
Parameter Type
maxdepth typing.Optional[int]
topdown bool
kwargs **kwargs

deserialize_flyte_dir()

def deserialize_flyte_dir(
    args,
    kwargs,
)
Parameter Type
args *args
kwargs **kwargs

download()

def download()

extension()

def extension()

from_dict()

def from_dict(
    kvs: typing.Union[dict, list, str, int, float, bool, NoneType],
    infer_missing,
) -> ~A
Parameter Type
kvs typing.Union[dict, list, str, int, float, bool, NoneType]
infer_missing

from_json()

def from_json(
    s: typing.Union[str, bytes, bytearray],
    parse_float,
    parse_int,
    parse_constant,
    infer_missing,
    kw,
) -> ~A
Parameter Type
s typing.Union[str, bytes, bytearray]
parse_float
parse_int
parse_constant
infer_missing
kw

from_source()

def from_source(
    source: str | os.PathLike,
) -> FlyteDirectory

Create a new FlyteDirectory object with the remote source set to the input

Parameter Type
source str | os.PathLike

listdir()

def listdir(
    directory: FlyteDirectory,
) -> typing.List[typing.Union[FlyteDirectory, FlyteFile]]

This function will list all files and folders in the given directory, but without downloading the contents. In addition, it will return a list of FlyteFile and FlyteDirectory objects that have ability to lazily download the contents of the file/folder. For example:

.. code-block:: python

entity = FlyteDirectory.listdir(directory)
for e in entity:
    print("s3 object:", e.remote_source)
    # s3 object: s3://test-flytedir/file1.txt
    # s3 object: s3://test-flytedir/file2.txt
    # s3 object: s3://test-flytedir/sub_dir

open(entity[0], "r")  # This will download the file to the local disk.
open(entity[0], "r")  # flytekit will read data from the local disk if you open it again.
Parameter Type
directory FlyteDirectory

new()

def new(
    dirname: str | os.PathLike,
) -> FlyteDirectory

Create a new FlyteDirectory object in current Flyte working directory.

Parameter Type
dirname str | os.PathLike

new_dir()

def new_dir(
    name: typing.Optional[str],
) -> FlyteDirectory

This will create a new folder under the current folder. If given a name, it will use the name given, otherwise it’ll pick a random string. Collisions are not checked.

Parameter Type
name typing.Optional[str]

new_file()

def new_file(
    name: typing.Optional[str],
) -> FlyteFile

This will create a new file under the current folder. If given a name, it will use the name given, otherwise it’ll pick a random string. Collisions are not checked.

Parameter Type
name typing.Optional[str]

new_remote()

def new_remote(
    stem: typing.Optional[str],
    alt: typing.Optional[str],
) -> FlyteDirectory

Create a new FlyteDirectory object using the currently configured default remote in the context (i.e. the raw_output_prefix configured in the current FileAccessProvider object in the context). This is used if you explicitly have a folder somewhere that you want to create files under. If you want to write a whole folder, you can let your task return a FlyteDirectory object, and let flytekit handle the uploading.

Parameter Type
stem typing.Optional[str]
alt typing.Optional[str]

schema()

def schema(
    infer_missing: bool,
    only,
    exclude,
    many: bool,
    context,
    load_only,
    dump_only,
    partial: bool,
    unknown,
) -> SchemaType[A]
Parameter Type
infer_missing bool
only
exclude
many bool
context
load_only
dump_only
partial bool
unknown

serialize_flyte_dir()

def serialize_flyte_dir(
    args,
    kwargs,
)
Parameter Type
args *args
kwargs **kwargs

to_dict()

def to_dict(
    encode_json,
) -> typing.Dict[str, typing.Union[dict, list, str, int, float, bool, NoneType]]
Parameter Type
encode_json

to_json()

def to_json(
    skipkeys: bool,
    ensure_ascii: bool,
    check_circular: bool,
    allow_nan: bool,
    indent: typing.Union[int, str, NoneType],
    separators: typing.Tuple[str, str],
    default: typing.Callable,
    sort_keys: bool,
    kw,
) -> str
Parameter Type
skipkeys bool
ensure_ascii bool
check_circular bool
allow_nan bool
indent typing.Union[int, str, NoneType]
separators typing.Tuple[str, str]
default typing.Callable
sort_keys bool
kw

Properties

Property Type Description
downloaded
remote_directory
remote_source
If this is an input to a task, and the original path is s3://something, flytekit will download the
directory for the user. In case the user wants access to the original path, it will be here.
sep

union.FlyteFile

class FlyteFile(
    path: typing.Union[str, os.PathLike],
    downloader: typing.Callable,
    remote_path: typing.Optional[typing.Union[os.PathLike, str, bool]],
    metadata: typing.Optional[dict[str, str]],
)

FlyteFile’s init method.

Parameter Type
path typing.Union[str, os.PathLike]
downloader typing.Callable
remote_path typing.Optional[typing.Union[os.PathLike, str, bool]]
metadata typing.Optional[dict[str, str]]

Methods

Method Description
deserialize_flyte_file()
download()
extension()
from_dict()
from_json()
from_source() Create a new FlyteFile object with the remote source set to the input.
new() Create a new FlyteFile object in the current Flyte working directory.
new_remote_file() Create a new FlyteFile object with a remote path.
open() Returns a streaming File handle.
serialize_flyte_file()
to_dict()
to_json()

deserialize_flyte_file()

def deserialize_flyte_file(
    args,
    kwargs,
)
Parameter Type
args *args
kwargs **kwargs

download()

def download()

extension()

def extension()

from_dict()

def from_dict(
    d,
    dialect,
)
Parameter Type
d
dialect

from_json()

def from_json(
    data: typing.Union[str, bytes, bytearray],
    decoder: collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]],
    from_dict_kwargs: typing.Any,
) -> ~T
Parameter Type
data typing.Union[str, bytes, bytearray]
decoder collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]]
from_dict_kwargs typing.Any

from_source()

def from_source(
    source: str | os.PathLike,
) -> FlyteFile

Create a new FlyteFile object with the remote source set to the input

Parameter Type
source str | os.PathLike

new()

def new(
    filename: str | os.PathLike,
) -> FlyteFile

Create a new FlyteFile object in the current Flyte working directory

Parameter Type
filename str | os.PathLike

new_remote_file()

def new_remote_file(
    name: typing.Optional[str],
    alt: typing.Optional[str],
) -> FlyteFile

Create a new FlyteFile object with a remote path.

Parameter Type
name typing.Optional[str]
alt typing.Optional[str]

open()

def open(
    mode: str,
    cache_type: typing.Optional[str],
    cache_options: typing.Optional[typing.Dict[str, typing.Any]],
)

Returns a streaming File handle

.. code-block:: python

@task
def copy_file(ff: FlyteFile) -> FlyteFile:
    new_file = FlyteFile.new_remote_file()
    with ff.open("rb", cache_type="readahead") as r:
        with new_file.open("wb") as w:
            w.write(r.read())
    return new_file
Parameter Type
mode str
cache_type typing.Optional[str]
cache_options typing.Optional[typing.Dict[str, typing.Any]]

serialize_flyte_file()

def serialize_flyte_file(
    args,
    kwargs,
)
Parameter Type
args *args
kwargs **kwargs

to_dict()

def to_dict()

to_json()

def to_json(
    encoder: collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]],
    to_dict_kwargs: typing.Any,
) -> typing.Union[str, bytes, bytearray]
Parameter Type
encoder collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]]
to_dict_kwargs typing.Any

Properties

Property Type Description
downloaded
remote_path
remote_source
If this is an input to a task, and the original path is an s3 bucket, Flytekit downloads the
file for the user. In case the user wants access to the original path, it will be here.

union.ImageSpec

This class is used to specify the docker image that will be used to run the task.

class ImageSpec(
    name: str,
    python_version: str,
    builder: typing.Optional[str],
    source_root: typing.Optional[str],
    env: typing.Optional[typing.Dict[str, str]],
    registry: typing.Optional[str],
    packages: typing.Optional[typing.List[str]],
    conda_packages: typing.Optional[typing.List[str]],
    conda_channels: typing.Optional[typing.List[str]],
    requirements: typing.Optional[str],
    apt_packages: typing.Optional[typing.List[str]],
    cuda: typing.Optional[str],
    cudnn: typing.Optional[str],
    base_image: typing.Union[str, ForwardRef('ImageSpec'), NoneType],
    platform: str,
    pip_index: typing.Optional[str],
    pip_extra_index_url: typing.Optional[typing.List[str]],
    pip_secret_mounts: typing.Optional[typing.List[typing.Tuple[str, str]]],
    pip_extra_args: typing.Optional[str],
    registry_config: typing.Optional[str],
    entrypoint: typing.Optional[typing.List[str]],
    commands: typing.Optional[typing.List[str]],
    tag_format: typing.Optional[str],
    source_copy_mode: typing.Optional[flytekit.constants.CopyFileDetection],
    copy: typing.Optional[typing.List[str]],
    python_exec: typing.Optional[str],
)
Parameter Type
name str
python_version str
builder typing.Optional[str]
source_root typing.Optional[str]
env typing.Optional[typing.Dict[str, str]]
registry typing.Optional[str]
packages typing.Optional[typing.List[str]]
conda_packages typing.Optional[typing.List[str]]
conda_channels typing.Optional[typing.List[str]]
requirements typing.Optional[str]
apt_packages typing.Optional[typing.List[str]]
cuda typing.Optional[str]
cudnn typing.Optional[str]
base_image typing.Union[str, ForwardRef('ImageSpec'), NoneType]
platform str
pip_index typing.Optional[str]
pip_extra_index_url typing.Optional[typing.List[str]]
pip_secret_mounts typing.Optional[typing.List[typing.Tuple[str, str]]]
pip_extra_args typing.Optional[str]
registry_config typing.Optional[str]
entrypoint typing.Optional[typing.List[str]]
commands typing.Optional[typing.List[str]]
tag_format typing.Optional[str]
source_copy_mode typing.Optional[flytekit.constants.CopyFileDetection]
copy typing.Optional[typing.List[str]]
python_exec typing.Optional[str]

Methods

Method Description
exist() Check if the image exists in the registry.
force_push() Builder that returns a new image spec with force push enabled.
from_env() Create ImageSpec with the environment’s Python version and packages pinned to the ones in the environment.
image_name() Full image name with tag.
is_container() Check if the current container image in the pod is built from current image spec.
with_apt_packages() Builder that returns a new image spec with an additional list of apt packages that will be executed during the building process.
with_commands() Builder that returns a new image spec with an additional list of commands that will be executed during the building process.
with_copy() Builder that returns a new image spec with the source files copied to the destination directory.
with_packages() Builder that returns a new image speck with additional python packages that will be installed during the building process.

exist()

def exist()

Check if the image exists in the registry. Return True if the image exists in the registry, False otherwise. Return None if failed to check if the image exists due to the permission issue or other reasons.

force_push()

def force_push()

Builder that returns a new image spec with force push enabled.

from_env()

def from_env(
    pinned_packages: typing.Optional[typing.List[str]],
    kwargs,
) -> ImageSpec

Create ImageSpec with the environment’s Python version and packages pinned to the ones in the environment.

Parameter Type
pinned_packages typing.Optional[typing.List[str]]
kwargs **kwargs

image_name()

def image_name()

Full image name with tag.

is_container()

def is_container()

Check if the current container image in the pod is built from current image spec. :return: True if the current container image in the pod is built from current image spec, False otherwise.

with_apt_packages()

def with_apt_packages(
    apt_packages: typing.Union[str, typing.List[str]],
) -> ImageSpec

Builder that returns a new image spec with an additional list of apt packages that will be executed during the building process.

Parameter Type
apt_packages typing.Union[str, typing.List[str]]

with_commands()

def with_commands(
    commands: typing.Union[str, typing.List[str]],
) -> ImageSpec

Builder that returns a new image spec with an additional list of commands that will be executed during the building process.

Parameter Type
commands typing.Union[str, typing.List[str]]

with_copy()

def with_copy(
    src: typing.Union[str, typing.List[str]],
) -> ImageSpec

Builder that returns a new image spec with the source files copied to the destination directory.

Parameter Type
src typing.Union[str, typing.List[str]]

with_packages()

def with_packages(
    packages: typing.Union[str, typing.List[str]],
) -> ImageSpec

Builder that returns a new image speck with additional python packages that will be installed during the building process.

Parameter Type
packages typing.Union[str, typing.List[str]]

Properties

Property Type Description
tag
Calculate a hash from the image spec. The hash will be the tag of the image.
We will also read the content of the requirement file and the source root to calculate the hash.
Therefore, it will generate different hash if new dependencies are added or the source code is changed.
Keep in mind the fields source_root and copy may be changed by update_image_spec_copy_handling, so when
you call this property in relation to that function matter will change the output.

union.LaunchPlan

Launch Plans are one of the core constructs of Flyte. Please take a look at the discussion in the :std:ref:core concepts <flyte:divedeep-launchplans> if you are unfamiliar with them.

Every workflow is registered with a default launch plan, which is just a launch plan with none of the additional attributes set - no default values, fixed values, schedules, etc. Assuming you have the following workflow

.. code-block:: python

@workflow
def wf(a: int, c: str) -> str:
    ...

Create the default launch plan with

.. code-block:: python

LaunchPlan.get_or_create(workflow=my_wf)

If you specify additional parameters, you’ll also have to give the launch plan a unique name. Default and fixed inputs can be expressed as Python native values like so:

.. literalinclude:: ../../../tests/flytekit/unit/core/test_launch_plan.py :start-after: # fixed_and_default_start :end-before: # fixed_and_default_end :language: python :dedent: 4

Additionally, a launch plan can be configured to run on a schedule and emit notifications.

Please see the relevant Schedule and Notification objects as well.

To configure the remaining parameters, you’ll need to import the relevant model objects as well.

.. literalinclude:: ../../../tests/flytekit/unit/core/test_launch_plan.py :start-after: # schedule_start :end-before: # schedule_end :language: python :dedent: 4

.. code-block:: python

from flytekit.models.common import Annotations, AuthRole, Labels, RawOutputDataConfig

Then use as follows

.. literalinclude:: ../../../tests/flytekit/unit/core/test_launch_plan.py :start-after: # auth_role_start :end-before: # auth_role_end :language: python :dedent: 4

class LaunchPlan(
    name: str,
    workflow: _annotated_workflow.WorkflowBase,
    parameters: _interface_models.ParameterMap,
    fixed_inputs: _literal_models.LiteralMap,
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
)
Parameter Type
name str
workflow _annotated_workflow.WorkflowBase
parameters _interface_models.ParameterMap
fixed_inputs _literal_models.LiteralMap
schedule Optional[_schedule_model.Schedule]
notifications Optional[List[_common_models.Notification]]
labels Optional[_common_models.Labels]
annotations Optional[_common_models.Annotations]
raw_output_data_config Optional[_common_models.RawOutputDataConfig]
max_parallelism Optional[int]
security_context Optional[security.SecurityContext]
trigger Optional[LaunchPlanTriggerBase]
overwrite_cache Optional[bool]
auto_activate bool

Methods

Method Description
clone_with()
construct_node_metadata()
create()
get_default_launch_plan() Users should probably call the get_or_create function defined below instead.
get_or_create() This function offers a friendlier interface for creating launch plans.

clone_with()

def clone_with(
    name: str,
    parameters: Optional[_interface_models.ParameterMap],
    fixed_inputs: Optional[_literal_models.LiteralMap],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
) -> LaunchPlan
Parameter Type
name str
parameters Optional[_interface_models.ParameterMap]
fixed_inputs Optional[_literal_models.LiteralMap]
schedule Optional[_schedule_model.Schedule]
notifications Optional[List[_common_models.Notification]]
labels Optional[_common_models.Labels]
annotations Optional[_common_models.Annotations]
raw_output_data_config Optional[_common_models.RawOutputDataConfig]
max_parallelism Optional[int]
security_context Optional[security.SecurityContext]
trigger Optional[LaunchPlanTriggerBase]
overwrite_cache Optional[bool]
auto_activate bool

construct_node_metadata()

def construct_node_metadata()

create()

def create(
    name: str,
    workflow: _annotated_workflow.WorkflowBase,
    default_inputs: Optional[Dict[str, Any]],
    fixed_inputs: Optional[Dict[str, Any]],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    auth_role: Optional[_common_models.AuthRole],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
) -> LaunchPlan
Parameter Type
name str
workflow _annotated_workflow.WorkflowBase
default_inputs Optional[Dict[str, Any]]
fixed_inputs Optional[Dict[str, Any]]
schedule Optional[_schedule_model.Schedule]
notifications Optional[List[_common_models.Notification]]
labels Optional[_common_models.Labels]
annotations Optional[_common_models.Annotations]
raw_output_data_config Optional[_common_models.RawOutputDataConfig]
max_parallelism Optional[int]
security_context Optional[security.SecurityContext]
auth_role Optional[_common_models.AuthRole]
trigger Optional[LaunchPlanTriggerBase]
overwrite_cache Optional[bool]
auto_activate bool

get_default_launch_plan()

def get_default_launch_plan(
    ctx: FlyteContext,
    workflow: _annotated_workflow.WorkflowBase,
) -> LaunchPlan

Users should probably call the get_or_create function defined below instead. A default launch plan is the one that will just pick up whatever default values are defined in the workflow function signature (if any) and use the default auth information supplied during serialization, with no notifications or schedules.

Parameter Type
ctx FlyteContext
workflow _annotated_workflow.WorkflowBase

get_or_create()

def get_or_create(
    workflow: _annotated_workflow.WorkflowBase,
    name: Optional[str],
    default_inputs: Optional[Dict[str, Any]],
    fixed_inputs: Optional[Dict[str, Any]],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    auth_role: Optional[_common_models.AuthRole],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
) -> LaunchPlan

This function offers a friendlier interface for creating launch plans. If the name for the launch plan is not supplied, this assumes you are looking for the default launch plan for the workflow. If it is specified, it will be used. If creating the default launch plan, none of the other arguments may be specified.

The resulting launch plan is also cached and if called again with the same name, the cached version is returned

Parameter Type
workflow _annotated_workflow.WorkflowBase
name Optional[str]
default_inputs Optional[Dict[str, Any]]
fixed_inputs Optional[Dict[str, Any]]
schedule Optional[_schedule_model.Schedule]
notifications Optional[List[_common_models.Notification]]
labels Optional[_common_models.Labels]
annotations Optional[_common_models.Annotations]
raw_output_data_config Optional[_common_models.RawOutputDataConfig]
max_parallelism Optional[int]
security_context Optional[security.SecurityContext]
auth_role Optional[_common_models.AuthRole]
trigger Optional[LaunchPlanTriggerBase]
overwrite_cache Optional[bool]
auto_activate bool

Properties

Property Type Description
annotations
fixed_inputs
interface
labels
max_parallelism
name
notifications
overwrite_cache
parameters
python_interface
raw_output_data_config
saved_inputs
schedule
security_context
should_auto_activate
trigger
workflow

union.PodTemplate

Custom PodTemplate specification for a Task.

class PodTemplate(
    pod_spec: typing.Optional[ForwardRef('V1PodSpec')],
    primary_container_name: str,
    labels: typing.Optional[typing.Dict[str, str]],
    annotations: typing.Optional[typing.Dict[str, str]],
)
Parameter Type
pod_spec typing.Optional[ForwardRef('V1PodSpec')]
primary_container_name str
labels typing.Optional[typing.Dict[str, str]]
annotations typing.Optional[typing.Dict[str, str]]

union.Resources

This class is used to specify both resource requests and resource limits.

.. code-block:: python

Resources(cpu="1", mem="2048")  # This is 1 CPU and 2 KB of memory
Resources(cpu="100m", mem="2Gi")  # This is 1/10th of a CPU and 2 gigabytes of memory
Resources(cpu=0.5, mem=1024) # This is 500m CPU and 1 KB of memory

# For Kubernetes-based tasks, pods use ephemeral local storage for scratch space, caching, and for logs.
# This allocates 1Gi of such local storage.
Resources(ephemeral_storage="1Gi")

When used together with @task(resources=), you a specific the request and limits with one object. When the value is set to a tuple or list, the first value is the request and the second value is the limit. If the value is a single value, then both the requests and limit is set to that value. For example, the Resource(cpu=("1", "2"), mem=1024) will set the cpu request to 1, cpu limit to 2, mem limit and request to 1024.

.. note::

Persistent storage is not currently supported on the Flyte backend.

Please see the :std:ref:User Guide <cookbook:customizing task resources> for detailed examples. Also refer to the K8s conventions. <https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-units-in-kubernetes>__

class Resources(
    cpu: typing.Union[str, int, float, list, tuple, NoneType],
    mem: typing.Union[str, int, list, tuple, NoneType],
    gpu: typing.Union[str, int, list, tuple, NoneType],
    ephemeral_storage: typing.Union[str, int, NoneType],
)
Parameter Type
cpu typing.Union[str, int, float, list, tuple, NoneType]
mem typing.Union[str, int, list, tuple, NoneType]
gpu typing.Union[str, int, list, tuple, NoneType]
ephemeral_storage typing.Union[str, int, NoneType]

Methods

Method Description
from_dict()
from_json()
to_dict()
to_json()

from_dict()

def from_dict(
    d,
    dialect,
)
Parameter Type
d
dialect

from_json()

def from_json(
    data: typing.Union[str, bytes, bytearray],
    decoder: collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]],
    from_dict_kwargs: typing.Any,
) -> ~T
Parameter Type
data typing.Union[str, bytes, bytearray]
decoder collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]]
from_dict_kwargs typing.Any

to_dict()

def to_dict()

to_json()

def to_json(
    encoder: collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]],
    to_dict_kwargs: typing.Any,
) -> typing.Union[str, bytes, bytearray]
Parameter Type
encoder collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]]
to_dict_kwargs typing.Any

union.Secret

See :std:ref:cookbook:secrets for usage examples.

class Secret(
    group: typing.Optional[str],
    key: typing.Optional[str],
    group_version: typing.Optional[str],
    mount_requirement: <enum 'MountType'>,
    env_var: typing.Optional[str],
)
Parameter Type
group typing.Optional[str]
key typing.Optional[str]
group_version typing.Optional[str]
mount_requirement <enum 'MountType'>
env_var typing.Optional[str]

Methods

Method Description
from_flyte_idl()
serialize_to_string()
short_string() :rtype: Text.
to_flyte_idl()
verbose_string() :rtype: Text.

from_flyte_idl()

def from_flyte_idl(
    pb2_object: flyteidl.core.security_pb2.Secret,
) -> Secret
Parameter Type
pb2_object flyteidl.core.security_pb2.Secret

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

:rtype: Text

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

:rtype: Text

Properties

Property Type Description
is_empty

union.StructuredDataset

This is the user facing StructuredDataset class. Please don’t confuse it with the literals.StructuredDataset class (that is just a model, a Python class representation of the protobuf).

class StructuredDataset(
    dataframe: typing.Optional[typing.Any],
    uri: typing.Optional[str],
    metadata: typing.Optional[literals.StructuredDatasetMetadata],
    kwargs,
)
Parameter Type
dataframe typing.Optional[typing.Any]
uri typing.Optional[str]
metadata typing.Optional[literals.StructuredDatasetMetadata]
kwargs **kwargs

Methods

Method Description
all()
column_names()
columns()
deserialize_structured_dataset()
from_dict()
from_json()
iter()
open()
serialize_structured_dataset()
set_literal() A public wrapper method to set the StructuredDataset Literal.
to_dict()
to_json()

all()

def all()

column_names()

def column_names()

columns()

def columns()

deserialize_structured_dataset()

def deserialize_structured_dataset(
    args,
    kwargs,
)
Parameter Type
args *args
kwargs **kwargs

from_dict()

def from_dict(
    d,
    dialect,
)
Parameter Type
d
dialect

from_json()

def from_json(
    data: typing.Union[str, bytes, bytearray],
    decoder: collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]],
    from_dict_kwargs: typing.Any,
) -> ~T
Parameter Type
data typing.Union[str, bytes, bytearray]
decoder collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]]
from_dict_kwargs typing.Any

iter()

def iter()

open()

def open(
    dataframe_type: Type[DF],
)
Parameter Type
dataframe_type Type[DF]

serialize_structured_dataset()

def serialize_structured_dataset(
    args,
    kwargs,
)
Parameter Type
args *args
kwargs **kwargs

set_literal()

def set_literal(
    ctx: FlyteContext,
    expected: LiteralType,
)

A public wrapper method to set the StructuredDataset Literal.

This method provides external access to the internal _set_literal method.

Parameter Type
ctx FlyteContext
expected LiteralType

to_dict()

def to_dict()

to_json()

def to_json(
    encoder: collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]],
    to_dict_kwargs: typing.Any,
) -> typing.Union[str, bytes, bytearray]
Parameter Type
encoder collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]]
to_dict_kwargs typing.Any

Properties

Property Type Description
dataframe
literal
metadata

union.UnionRemote

Main entrypoint for programmatically accessing a Flyte remote backend.

The term ‘remote’ is synonymous with ‘backend’ or ‘deployment’ and refers to a hosted instance of the Flyte platform, which comes with a Flyte Admin server on some known URI.

class UnionRemote(
    config: typing.Optional[Union[Config, str]],
    default_project: typing.Optional[str],
    default_domain: typing.Optional[str],
    data_upload_location: str,
    interactive_mode_enabled: typing.Optional[bool],
    kwargs,
)

Initialize a FlyteRemote object.

:type kwargs: All arguments that can be passed to create the SynchronousFlyteClient. These are usually grpc parameters, if you want to customize credentials, ssl handling etc.

Parameter Type
config typing.Optional[Union[Config, str]]
default_project typing.Optional[str]
default_domain typing.Optional[str]
data_upload_location str
interactive_mode_enabled typing.Optional[bool]
kwargs **kwargs

Methods

Method Description
activate_launchplan() Given a launchplan, activate it, all previous versions are deactivated.
approve() .
async_channel()
auto()
create_artifact() Create an artifact in FlyteAdmin.
deploy_app() Deploy an application.
download() Download the data to the specified location.
execute() Execute a task, workflow, or launchplan, either something that’s been declared locally, or a fetched entity.
execute_local_launch_plan() Execute a locally defined LaunchPlan.
execute_local_task() Execute a @task-decorated function or TaskTemplate task.
execute_local_workflow() Execute an @workflow decorated function.
execute_reference_launch_plan() Execute a ReferenceLaunchPlan.
execute_reference_task() Execute a ReferenceTask.
execute_reference_workflow() Execute a ReferenceWorkflow.
execute_remote_task_lp() Execute a FlyteTask, or FlyteLaunchplan.
execute_remote_wf() Execute a FlyteWorkflow.
fast_package() Packages the given paths into an installable zip and returns the md5_bytes and the URL of the uploaded location.
fast_register_workflow() Use this method to register a workflow with zip mode.
fetch_active_launchplan() Returns the active version of the launch plan if it exists or returns None.
fetch_execution() Fetch a workflow execution entity from flyte admin.
fetch_launch_plan() Fetch a launchplan entity from flyte admin.
fetch_task() Fetch a task entity from flyte admin.
fetch_task_lazy() Similar to fetch_task, just that it returns a LazyEntity, which will fetch the workflow lazily.
fetch_workflow() Fetch a workflow entity from flyte admin.
fetch_workflow_lazy() Similar to fetch_workflow, just that it returns a LazyEntity, which will fetch the workflow lazily.
find_launch_plan()
find_launch_plan_for_node()
for_endpoint()
for_sandbox()
from_api_key() Call this if you want to directly instantiate a UnionRemote from an API key.
generate_console_http_domain() This should generate the domain where console is hosted.
generate_console_url() Generate a UnionAI console URL for the given Flyte remote endpoint.
get() General function that works with flyte tiny urls.
get_artifact() Get the specified artifact.
get_domains() Lists registered domains from flyte admin.
get_execution_metrics() Get the metrics for a given execution.
get_extra_headers_for_protocol()
launch_backfill() Creates and launches a backfill workflow for the given launchplan.
list_projects() Lists registered projects from flyte admin.
list_signals() .
list_tasks_by_version()
raw_register() Raw register method, can be used to register control plane entities.
recent_executions()
register_launch_plan() Register a given launchplan, possibly applying overrides from the provided options.
register_script() Use this method to register a workflow via script mode.
register_task() Register a qualified task (PythonTask) with Remote.
register_workflow() Use this method to register a workflow.
reject() .
remote_context() Context manager with remote-specific configuration.
search_artifacts()
set_input() .
set_signal() .
stop_app() Stop an application.
stream_execution_events() Stream execution events from the given tenant.
sync() This function was previously a singledispatchmethod.
sync_execution() Sync a FlyteWorkflowExecution object with its corresponding remote state.
sync_node_execution() Get data backing a node execution.
sync_task_execution() Sync a FlyteTaskExecution object with its corresponding remote state.
terminate() Terminate a workflow execution.
upload_file() Function will use remote’s client to hash and then upload the file using Admin’s data proxy service.
wait() Wait for an execution to finish.

activate_launchplan()

def activate_launchplan(
    ident: Identifier,
)

Given a launchplan, activate it, all previous versions are deactivated.

Parameter Type
ident Identifier

approve()

def approve(
    signal_id: str,
    execution_name: str,
    project: str,
    domain: str,
)
Parameter Type
signal_id str
execution_name str
project str
domain str

async_channel()

def async_channel()

auto()

def auto(
    config_file: typing.Union[str, ConfigFile],
    default_project: typing.Optional[str],
    default_domain: typing.Optional[str],
    data_upload_location: str,
    interactive_mode_enabled: bool,
    kwargs,
) -> 'FlyteRemote'
Parameter Type
config_file typing.Union[str, ConfigFile]
default_project typing.Optional[str]
default_domain typing.Optional[str]
data_upload_location str
interactive_mode_enabled bool
kwargs **kwargs

create_artifact()

def create_artifact(
    artifact: Artifact,
) -> n: The artifact as persisted in the service.

Create an artifact in FlyteAdmin.

Parameter Type
artifact Artifact

deploy_app()

def deploy_app(
    app: App,
    project: Optional[str],
    domain: Optional[str],
) -> n: The App IDL for the deployed application.

Deploy an application.

Parameter Type
app App
project Optional[str]
domain Optional[str]

download()

def download(
    data: typing.Union[LiteralsResolver, Literal, LiteralMap],
    download_to: str,
    recursive: bool,
)

Download the data to the specified location. If the data is a LiteralsResolver, LiteralMap and if recursive is specified, then all file like objects will be recursively downloaded (e.g. FlyteFile/Dir (blob), StructuredDataset etc).

Note: That it will use your sessions credentials to access the remote location. For sandbox, this should be automatically configured, assuming you are running sandbox locally. For other environments, you will need to configure your credentials appropriately.

Parameter Type
data typing.Union[LiteralsResolver, Literal, LiteralMap]
download_to str
recursive bool

execute()

def execute(
    entity: typing.Union[FlyteTask, FlyteLaunchPlan, FlyteWorkflow, PythonTask, WorkflowBase, LaunchPlan, ReferenceEntity],
    inputs: typing.Dict[str, typing.Any],
    project: str,
    domain: str,
    name: str,
    version: str,
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    image_config: typing.Optional[ImageConfig],
    options: typing.Optional[Options],
    wait: bool,
    type_hints: typing.Optional[typing.Dict[str, typing.Type]],
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
    serialization_settings: typing.Optional[SerializationSettings],
) -> FlyteWorkflowExecution

Execute a task, workflow, or launchplan, either something that’s been declared locally, or a fetched entity.

This method supports:

  • Flyte{Task, Workflow, LaunchPlan} remote module objects.
  • @task-decorated functions and TaskTemplate tasks.
  • @workflow-decorated functions.
  • LaunchPlan objects.

For local entities, this code will attempt to find the entity first, and if missing, will compile and register the object.

Not all arguments are relevant in all circumstances. For example, there’s no reason to use the serialization settings for entities that have already been registered on Admin.

Parameter Type
entity typing.Union[FlyteTask, FlyteLaunchPlan, FlyteWorkflow, PythonTask, WorkflowBase, LaunchPlan, ReferenceEntity]
inputs typing.Dict[str, typing.Any]
project str
domain str
name str
version str
execution_name typing.Optional[str]
execution_name_prefix typing.Optional[str]
image_config typing.Optional[ImageConfig]
options typing.Optional[Options]
wait bool
type_hints typing.Optional[typing.Dict[str, typing.Type]]
overwrite_cache typing.Optional[bool]
interruptible typing.Optional[bool]
envs typing.Optional[typing.Dict[str, str]]
tags typing.Optional[typing.List[str]]
cluster_pool typing.Optional[str]
execution_cluster_label typing.Optional[str]
serialization_settings typing.Optional[SerializationSettings]

execute_local_launch_plan()

def execute_local_launch_plan(
    entity: LaunchPlan,
    inputs: typing.Dict[str, typing.Any],
    version: str,
    project: typing.Optional[str],
    domain: typing.Optional[str],
    name: typing.Optional[str],
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    options: typing.Optional[Options],
    wait: bool,
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
    serialization_settings: typing.Optional[SerializationSettings],
) -> n: FlyteWorkflowExecution object

Execute a locally defined LaunchPlan.

Parameter Type
entity LaunchPlan
inputs typing.Dict[str, typing.Any]
version str
project typing.Optional[str]
domain typing.Optional[str]
name typing.Optional[str]
execution_name typing.Optional[str]
execution_name_prefix typing.Optional[str]
options typing.Optional[Options]
wait bool
overwrite_cache typing.Optional[bool]
interruptible typing.Optional[bool]
envs typing.Optional[typing.Dict[str, str]]
tags typing.Optional[typing.List[str]]
cluster_pool typing.Optional[str]
execution_cluster_label typing.Optional[str]
serialization_settings typing.Optional[SerializationSettings]

execute_local_task()

def execute_local_task(
    entity: PythonTask,
    inputs: typing.Dict[str, typing.Any],
    project: str,
    domain: str,
    name: str,
    version: str,
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    image_config: typing.Optional[ImageConfig],
    wait: bool,
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
    options: typing.Optional[Options],
    serialization_settings: typing.Optional[SerializationSettings],
) -> n: FlyteWorkflowExecution object.

Execute a @task-decorated function or TaskTemplate task.

Parameter Type
entity PythonTask
inputs typing.Dict[str, typing.Any]
project str
domain str
name str
version str
execution_name typing.Optional[str]
execution_name_prefix typing.Optional[str]
image_config typing.Optional[ImageConfig]
wait bool
overwrite_cache typing.Optional[bool]
interruptible typing.Optional[bool]
envs typing.Optional[typing.Dict[str, str]]
tags typing.Optional[typing.List[str]]
cluster_pool typing.Optional[str]
execution_cluster_label typing.Optional[str]
options typing.Optional[Options]
serialization_settings typing.Optional[SerializationSettings]

execute_local_workflow()

def execute_local_workflow(
    entity: WorkflowBase,
    inputs: typing.Dict[str, typing.Any],
    project: str,
    domain: str,
    name: str,
    version: str,
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    image_config: typing.Optional[ImageConfig],
    options: typing.Optional[Options],
    wait: bool,
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
    serialization_settings: typing.Optional[SerializationSettings],
) -> n: FlyteWorkflowExecution object

Execute an @workflow decorated function.

Parameter Type
entity WorkflowBase
inputs typing.Dict[str, typing.Any]
project str
domain str
name str
version str
execution_name typing.Optional[str]
execution_name_prefix typing.Optional[str]
image_config typing.Optional[ImageConfig]
options typing.Optional[Options]
wait bool
overwrite_cache typing.Optional[bool]
interruptible typing.Optional[bool]
envs typing.Optional[typing.Dict[str, str]]
tags typing.Optional[typing.List[str]]
cluster_pool typing.Optional[str]
execution_cluster_label typing.Optional[str]
serialization_settings typing.Optional[SerializationSettings]

execute_reference_launch_plan()

def execute_reference_launch_plan(
    entity: ReferenceLaunchPlan,
    inputs: typing.Dict[str, typing.Any],
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    options: typing.Optional[Options],
    wait: bool,
    type_hints: typing.Optional[typing.Dict[str, typing.Type]],
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
) -> FlyteWorkflowExecution

Execute a ReferenceLaunchPlan.

Parameter Type
entity ReferenceLaunchPlan
inputs typing.Dict[str, typing.Any]
execution_name typing.Optional[str]
execution_name_prefix typing.Optional[str]
options typing.Optional[Options]
wait bool
type_hints typing.Optional[typing.Dict[str, typing.Type]]
overwrite_cache typing.Optional[bool]
interruptible typing.Optional[bool]
envs typing.Optional[typing.Dict[str, str]]
tags typing.Optional[typing.List[str]]
cluster_pool typing.Optional[str]
execution_cluster_label typing.Optional[str]

execute_reference_task()

def execute_reference_task(
    entity: ReferenceTask,
    inputs: typing.Dict[str, typing.Any],
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    options: typing.Optional[Options],
    wait: bool,
    type_hints: typing.Optional[typing.Dict[str, typing.Type]],
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
) -> FlyteWorkflowExecution

Execute a ReferenceTask.

Parameter Type
entity ReferenceTask
inputs typing.Dict[str, typing.Any]
execution_name typing.Optional[str]
execution_name_prefix typing.Optional[str]
options typing.Optional[Options]
wait bool
type_hints typing.Optional[typing.Dict[str, typing.Type]]
overwrite_cache typing.Optional[bool]
interruptible typing.Optional[bool]
envs typing.Optional[typing.Dict[str, str]]
tags typing.Optional[typing.List[str]]
cluster_pool typing.Optional[str]
execution_cluster_label typing.Optional[str]

execute_reference_workflow()

def execute_reference_workflow(
    entity: ReferenceWorkflow,
    inputs: typing.Dict[str, typing.Any],
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    options: typing.Optional[Options],
    wait: bool,
    type_hints: typing.Optional[typing.Dict[str, typing.Type]],
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
) -> FlyteWorkflowExecution

Execute a ReferenceWorkflow.

Parameter Type
entity ReferenceWorkflow
inputs typing.Dict[str, typing.Any]
execution_name typing.Optional[str]
execution_name_prefix typing.Optional[str]
options typing.Optional[Options]
wait bool
type_hints typing.Optional[typing.Dict[str, typing.Type]]
overwrite_cache typing.Optional[bool]
interruptible typing.Optional[bool]
envs typing.Optional[typing.Dict[str, str]]
tags typing.Optional[typing.List[str]]
cluster_pool typing.Optional[str]
execution_cluster_label typing.Optional[str]

execute_remote_task_lp()

def execute_remote_task_lp(
    entity: typing.Union[FlyteTask, FlyteLaunchPlan],
    inputs: typing.Dict[str, typing.Any],
    project: str,
    domain: str,
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    options: typing.Optional[Options],
    wait: bool,
    type_hints: typing.Optional[typing.Dict[str, typing.Type]],
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
) -> FlyteWorkflowExecution

Execute a FlyteTask, or FlyteLaunchplan.

NOTE: the name and version arguments are currently not used and only there consistency in the function signature

Parameter Type
entity typing.Union[FlyteTask, FlyteLaunchPlan]
inputs typing.Dict[str, typing.Any]
project str
domain str
execution_name typing.Optional[str]
execution_name_prefix typing.Optional[str]
options typing.Optional[Options]
wait bool
type_hints typing.Optional[typing.Dict[str, typing.Type]]
overwrite_cache typing.Optional[bool]
interruptible typing.Optional[bool]
envs typing.Optional[typing.Dict[str, str]]
tags typing.Optional[typing.List[str]]
cluster_pool typing.Optional[str]
execution_cluster_label typing.Optional[str]

execute_remote_wf()

def execute_remote_wf(
    entity: FlyteWorkflow,
    inputs: typing.Dict[str, typing.Any],
    project: str,
    domain: str,
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    options: typing.Optional[Options],
    wait: bool,
    type_hints: typing.Optional[typing.Dict[str, typing.Type]],
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
) -> FlyteWorkflowExecution

Execute a FlyteWorkflow.

NOTE: the name and version arguments are currently not used and only there consistency in the function signature

Parameter Type
entity FlyteWorkflow
inputs typing.Dict[str, typing.Any]
project str
domain str
execution_name typing.Optional[str]
execution_name_prefix typing.Optional[str]
options typing.Optional[Options]
wait bool
type_hints typing.Optional[typing.Dict[str, typing.Type]]
overwrite_cache typing.Optional[bool]
interruptible typing.Optional[bool]
envs typing.Optional[typing.Dict[str, str]]
tags typing.Optional[typing.List[str]]
cluster_pool typing.Optional[str]
execution_cluster_label typing.Optional[str]

fast_package()

def fast_package(
    root: os.PathLike,
    deref_symlinks: bool,
    output: str,
    options: typing.Optional[FastPackageOptions],
) -> n: md5_bytes, url

Packages the given paths into an installable zip and returns the md5_bytes and the URL of the uploaded location

Parameter Type
root os.PathLike
deref_symlinks bool
output str
options typing.Optional[FastPackageOptions]

fast_register_workflow()

def fast_register_workflow(
    entity: WorkflowBase,
    serialization_settings: typing.Optional[SerializationSettings],
    version: typing.Optional[str],
    default_launch_plan: typing.Optional[bool],
    options: typing.Optional[Options],
    fast_package_options: typing.Optional[FastPackageOptions],
) -> n:

Use this method to register a workflow with zip mode.

Parameter Type
entity WorkflowBase
serialization_settings typing.Optional[SerializationSettings]
version typing.Optional[str]
default_launch_plan typing.Optional[bool]
options typing.Optional[Options]
fast_package_options typing.Optional[FastPackageOptions]

fetch_active_launchplan()

def fetch_active_launchplan(
    project: str,
    domain: str,
    name: str,
) -> typing.Optional[FlyteLaunchPlan]

Returns the active version of the launch plan if it exists or returns None

Parameter Type
project str
domain str
name str

fetch_execution()

def fetch_execution(
    project: str,
    domain: str,
    name: str,
) -> FlyteWorkflowExecution

Fetch a workflow execution entity from flyte admin.

Parameter Type
project str
domain str
name str

fetch_launch_plan()

def fetch_launch_plan(
    project: str,
    domain: str,
    name: str,
    version: str,
) -> FlyteLaunchPlan

Fetch a launchplan entity from flyte admin.

Parameter Type
project str
domain str
name str
version str

fetch_task()

def fetch_task(
    project: str,
    domain: str,
    name: str,
    version: str,
) -> FlyteTask

Fetch a task entity from flyte admin.

Parameter Type
project str
domain str
name str
version str

fetch_task_lazy()

def fetch_task_lazy(
    project: str,
    domain: str,
    name: str,
    version: str,
) -> LazyEntity

Similar to fetch_task, just that it returns a LazyEntity, which will fetch the workflow lazily.

Parameter Type
project str
domain str
name str
version str

fetch_workflow()

def fetch_workflow(
    project: str,
    domain: str,
    name: str,
    version: str,
) -> FlyteWorkflow

Fetch a workflow entity from flyte admin.

Parameter Type
project str
domain str
name str
version str

fetch_workflow_lazy()

def fetch_workflow_lazy(
    project: str,
    domain: str,
    name: str,
    version: str,
) -> LazyEntity[FlyteWorkflow]

Similar to fetch_workflow, just that it returns a LazyEntity, which will fetch the workflow lazily.

Parameter Type
project str
domain str
name str
version str

find_launch_plan()

def find_launch_plan(
    lp_ref: id_models,
    node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec],
)
Parameter Type
lp_ref id_models
node_launch_plans Dict[id_models, launch_plan_models.LaunchPlanSpec]

find_launch_plan_for_node()

def find_launch_plan_for_node(
    node: Node,
    node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec],
)
Parameter Type
node Node
node_launch_plans Dict[id_models, launch_plan_models.LaunchPlanSpec]

for_endpoint()

def for_endpoint(
    endpoint: str,
    insecure: bool,
    data_config: typing.Optional[DataConfig],
    config_file: typing.Union[str, ConfigFile],
    default_project: typing.Optional[str],
    default_domain: typing.Optional[str],
    data_upload_location: str,
    interactive_mode_enabled: bool,
    kwargs,
) -> 'FlyteRemote'
Parameter Type
endpoint str
insecure bool
data_config typing.Optional[DataConfig]
config_file typing.Union[str, ConfigFile]
default_project typing.Optional[str]
default_domain typing.Optional[str]
data_upload_location str
interactive_mode_enabled bool
kwargs **kwargs

for_sandbox()

def for_sandbox(
    default_project: typing.Optional[str],
    default_domain: typing.Optional[str],
    data_upload_location: str,
    interactive_mode_enabled: bool,
    kwargs,
) -> 'FlyteRemote'
Parameter Type
default_project typing.Optional[str]
default_domain typing.Optional[str]
data_upload_location str
interactive_mode_enabled bool
kwargs **kwargs

from_api_key()

def from_api_key(
    api_key: str,
    default_project: typing.Optional[str],
    default_domain: typing.Optional[str],
    data_upload_location: str,
    kwargs,
) -> 'UnionRemote'

Call this if you want to directly instantiate a UnionRemote from an API key

Parameter Type
api_key str
default_project typing.Optional[str]
default_domain typing.Optional[str]
data_upload_location str
kwargs **kwargs

generate_console_http_domain()

def generate_console_http_domain()

This should generate the domain where console is hosted.

:return:

generate_console_url()

def generate_console_url(
    entity: typing.Union[FlyteWorkflowExecution, FlyteNodeExecution, FlyteTaskExecution, FlyteWorkflow, FlyteTask, FlyteLaunchPlan, Artifact],
)

Generate a UnionAI console URL for the given Flyte remote endpoint. It will also handle Union AI specific entities like Artifacts.

This will automatically determine if this is an execution or an entity and change the type automatically.

Parameter Type
entity typing.Union[FlyteWorkflowExecution, FlyteNodeExecution, FlyteTaskExecution, FlyteWorkflow, FlyteTask, FlyteLaunchPlan, Artifact]

get()

def get(
    uri: typing.Optional[str],
) -> typing.Optional[typing.Union[LiteralsResolver, Literal, bytes]]

General function that works with flyte tiny urls. This can return outputs (in the form of LiteralsResolver, or individual Literals for singular requests), or HTML if passed a deck link, or bytes containing HTML, if ipython is not available locally.

Parameter Type
uri typing.Optional[str]

get_artifact()

def get_artifact(
    uri: typing.Optional[str],
    artifact_key: typing.Optional[art_id.ArtifactKey],
    artifact_id: typing.Optional[art_id.ArtifactID],
    query: typing.Optional[typing.Union[art_id.ArtifactQuery, ArtifactQuery]],
    get_details: bool,
) -> n: The artifact as persisted in the service.

Get the specified artifact.

Parameter Type
uri typing.Optional[str]
artifact_key typing.Optional[art_id.ArtifactKey]
artifact_id typing.Optional[art_id.ArtifactID]
query typing.Optional[typing.Union[art_id.ArtifactQuery, ArtifactQuery]]
get_details bool

get_domains()

def get_domains()

Lists registered domains from flyte admin.

:returns: typing.List[flytekit.models.domain.Domain]

get_execution_metrics()

def get_execution_metrics(
    id: WorkflowExecutionIdentifier,
    depth: int,
) -> FlyteExecutionSpan

Get the metrics for a given execution.

Parameter Type
id WorkflowExecutionIdentifier
depth int

get_extra_headers_for_protocol()

def get_extra_headers_for_protocol(
    native_url,
)
Parameter Type
native_url

launch_backfill()

def launch_backfill(
    project: str,
    domain: str,
    from_date: datetime,
    to_date: datetime,
    launchplan: str,
    launchplan_version: str,
    execution_name: str,
    version: str,
    dry_run: bool,
    execute: bool,
    parallel: bool,
    failure_policy: typing.Optional[WorkflowFailurePolicy],
    overwrite_cache: typing.Optional[bool],
) -> n: In case of dry-run, return WorkflowBase, else if no_execute return FlyteWorkflow else in the default

Creates and launches a backfill workflow for the given launchplan. If launchplan version is not specified, then the latest launchplan is retrieved. The from_date is exclusive and end_date is inclusive and backfill run for all instances in between. :: -> (start_date - exclusive, end_date inclusive)

If dry_run is specified, the workflow is created and returned. If execute==False is specified then the workflow is created and registered. In the last case, the workflow is created, registered and executed.

The parallel flag can be used to generate a workflow where all launchplans can be run in parallel. Default is that execute backfill is run sequentially

Parameter Type
project str
domain str
from_date datetime
to_date datetime
launchplan str
launchplan_version str
execution_name str
version str
dry_run bool
execute bool
parallel bool
failure_policy typing.Optional[WorkflowFailurePolicy]
overwrite_cache typing.Optional[bool]

list_projects()

def list_projects(
    limit: typing.Optional[int],
    filters: typing.Optional[typing.List[filter_models.Filter]],
    sort_by: typing.Optional[admin_common_models.Sort],
) -> typing.List[Project]

Lists registered projects from flyte admin.

Parameter Type
limit typing.Optional[int]
filters typing.Optional[typing.List[filter_models.Filter]]
sort_by typing.Optional[admin_common_models.Sort]

list_signals()

def list_signals(
    execution_name: str,
    project: typing.Optional[str],
    domain: typing.Optional[str],
    limit: int,
    filters: typing.Optional[typing.List[filter_models.Filter]],
) -> typing.List[Signal]
Parameter Type
execution_name str
project typing.Optional[str]
domain typing.Optional[str]
limit int
filters typing.Optional[typing.List[filter_models.Filter]]

list_tasks_by_version()

def list_tasks_by_version(
    version: str,
    project: typing.Optional[str],
    domain: typing.Optional[str],
    limit: typing.Optional[int],
) -> typing.List[FlyteTask]
Parameter Type
version str
project typing.Optional[str]
domain typing.Optional[str]
limit typing.Optional[int]

raw_register()

def raw_register(
    cp_entity: FlyteControlPlaneEntity,
    settings: SerializationSettings,
    version: str,
    create_default_launchplan: bool,
    options: Options,
    og_entity: FlyteLocalEntity,
) -> n: Identifier of the created entity

Raw register method, can be used to register control plane entities. Usually if you have a Flyte Entity like a WorkflowBase, Task, LaunchPlan then use other methods. This should be used only if you have already serialized entities

Parameter Type
cp_entity FlyteControlPlaneEntity
settings SerializationSettings
version str
create_default_launchplan bool
options Options
og_entity FlyteLocalEntity

recent_executions()

def recent_executions(
    project: typing.Optional[str],
    domain: typing.Optional[str],
    limit: typing.Optional[int],
    filters: typing.Optional[typing.List[filter_models.Filter]],
) -> typing.List[FlyteWorkflowExecution]
Parameter Type
project typing.Optional[str]
domain typing.Optional[str]
limit typing.Optional[int]
filters typing.Optional[typing.List[filter_models.Filter]]

register_launch_plan()

def register_launch_plan(
    entity: LaunchPlan,
    version: typing.Optional[str],
    project: typing.Optional[str],
    domain: typing.Optional[str],
    options: typing.Optional[Options],
    serialization_settings: typing.Optional[SerializationSettings],
) -> FlyteLaunchPlan

Register a given launchplan, possibly applying overrides from the provided options. If the underlying workflow is not already registered, it, along with any underlying entities, will also be registered. If the underlying workflow does exist (with the given project/domain/version), then only the launchplan will be registered.

Parameter Type
entity LaunchPlan
version typing.Optional[str]
project typing.Optional[str]
domain typing.Optional[str]
options typing.Optional[Options]
serialization_settings typing.Optional[SerializationSettings]

register_script()

def register_script(
    entity: typing.Union[WorkflowBase, PythonTask, LaunchPlan],
    image_config: typing.Optional[ImageConfig],
    version: typing.Optional[str],
    project: typing.Optional[str],
    domain: typing.Optional[str],
    destination_dir: str,
    copy_all: bool,
    default_launch_plan: bool,
    options: typing.Optional[Options],
    source_path: typing.Optional[str],
    module_name: typing.Optional[str],
    envs: typing.Optional[typing.Dict[str, str]],
    fast_package_options: typing.Optional[FastPackageOptions],
) -> n:

Use this method to register a workflow via script mode.

Parameter Type
entity typing.Union[WorkflowBase, PythonTask, LaunchPlan]
image_config typing.Optional[ImageConfig]
version typing.Optional[str]
project typing.Optional[str]
domain typing.Optional[str]
destination_dir str
copy_all bool
default_launch_plan bool
options typing.Optional[Options]
source_path typing.Optional[str]
module_name typing.Optional[str]
envs typing.Optional[typing.Dict[str, str]]
fast_package_options typing.Optional[FastPackageOptions]

register_task()

def register_task(
    entity: PythonTask,
    serialization_settings: typing.Optional[SerializationSettings],
    version: typing.Optional[str],
) -> n:

Register a qualified task (PythonTask) with Remote For any conflicting parameters method arguments are regarded as overrides

Parameter Type
entity PythonTask
serialization_settings typing.Optional[SerializationSettings]
version typing.Optional[str]

register_workflow()

def register_workflow(
    entity: WorkflowBase,
    serialization_settings: typing.Optional[SerializationSettings],
    version: typing.Optional[str],
    default_launch_plan: typing.Optional[bool],
    options: typing.Optional[Options],
) -> n:

Use this method to register a workflow.

Parameter Type
entity WorkflowBase
serialization_settings typing.Optional[SerializationSettings]
version typing.Optional[str]
default_launch_plan typing.Optional[bool]
options typing.Optional[Options]

reject()

def reject(
    signal_id: str,
    execution_name: str,
    project: str,
    domain: str,
)
Parameter Type
signal_id str
execution_name str
project str
domain str

remote_context()

def remote_context()

Context manager with remote-specific configuration.

search_artifacts()

def search_artifacts(
    project: typing.Optional[str],
    domain: typing.Optional[str],
    name: typing.Optional[str],
    artifact_key: typing.Optional[art_id.ArtifactKey],
    query: typing.Optional[ArtifactQuery],
    partitions: typing.Optional[Union[Partitions, typing.Dict[str, str]]],
    time_partition: typing.Optional[Union[datetime.datetime, TimePartition]],
    group_by_key: bool,
    limit: int,
) -> typing.List[Artifact]
Parameter Type
project typing.Optional[str]
domain typing.Optional[str]
name typing.Optional[str]
artifact_key typing.Optional[art_id.ArtifactKey]
query typing.Optional[ArtifactQuery]
partitions typing.Optional[Union[Partitions, typing.Dict[str, str]]]
time_partition typing.Optional[Union[datetime.datetime, TimePartition]]
group_by_key bool
limit int

set_input()

def set_input(
    signal_id: str,
    execution_name: str,
    value: typing.Union[literal_models.Literal, typing.Any],
    project,
    domain,
    python_type,
    literal_type,
)
Parameter Type
signal_id str
execution_name str
value typing.Union[literal_models.Literal, typing.Any]
project
domain
python_type
literal_type

set_signal()

def set_signal(
    signal_id: str,
    execution_name: str,
    value: typing.Union[literal_models.Literal, typing.Any],
    project: typing.Optional[str],
    domain: typing.Optional[str],
    python_type: typing.Optional[typing.Type],
    literal_type: typing.Optional[type_models.LiteralType],
)
Parameter Type
signal_id str
execution_name str
value typing.Union[literal_models.Literal, typing.Any]
project typing.Optional[str]
domain typing.Optional[str]
python_type typing.Optional[typing.Type]
literal_type typing.Optional[type_models.LiteralType]

stop_app()

def stop_app(
    name: str,
    project: Optional[str],
    domain: Optional[str],
) -> n: The App IDL for the stopped application.

Stop an application.

Parameter Type
name str
project Optional[str]
domain Optional[str]

stream_execution_events()

def stream_execution_events(
    event_count: Optional[int],
    include_workflow_executions: bool,
    include_task_executions: bool,
    include_node_executions: bool,
) -> AsyncGenerator[Union[CloudEventWorkflowExecution, CloudEventNodeExecution, CloudEventTaskExecution], None]

Stream execution events from the given tenant. This is a generator that yields events as they are received.

Events are guaranteed to be delivered at least once, and clients must implement handling for potentially out-of-order event processing. Events will be retransmitted until acknowledged, with acknowledgment occurring automatically upon normal return from the caller. Note: if an exception is raised during event processing, the acknowledgment will not occur, and the event will be redelivered in a subsequent transmission.

Parameter Type
event_count Optional[int]
include_workflow_executions bool
include_task_executions bool
include_node_executions bool

sync()

def sync(
    execution: FlyteWorkflowExecution,
    entity_definition: typing.Union[FlyteWorkflow, FlyteTask],
    sync_nodes: bool,
) -> n: Returns the same execution object, but with additional information pulled in.

This function was previously a singledispatchmethod. We’ve removed that but this function remains so that we don’t break people.

Parameter Type
execution FlyteWorkflowExecution
entity_definition typing.Union[FlyteWorkflow, FlyteTask]
sync_nodes bool

sync_execution()

def sync_execution(
    execution: FlyteWorkflowExecution,
    entity_definition: typing.Union[FlyteWorkflow, FlyteTask],
    sync_nodes: bool,
) -> FlyteWorkflowExecution

Sync a FlyteWorkflowExecution object with its corresponding remote state.

Parameter Type
execution FlyteWorkflowExecution
entity_definition typing.Union[FlyteWorkflow, FlyteTask]
sync_nodes bool

sync_node_execution()

def sync_node_execution(
    execution: FlyteNodeExecution,
    node_mapping: typing.Dict[str, FlyteNode],
) -> FlyteNodeExecution

Get data backing a node execution. These FlyteNodeExecution objects should’ve come from Admin with the model fields already populated correctly. For purposes of the remote experience, we’d like to supplement the object with some additional fields:

  • inputs/outputs
  • task/workflow executions, and/or underlying node executions in the case of parent nodes
  • TypedInterface (remote wrapper type)

A node can have several different types of executions behind it. That is, the node could’ve run (perhaps multiple times because of retries):

  • A task
  • A static subworkflow
  • A dynamic subworkflow (which in turn may have run additional tasks, subwfs, and/or launch plans)
  • A launch plan

The data model is complicated, so ascertaining which of these happened is a bit tricky. That logic is encapsulated in this function.

Parameter Type
execution FlyteNodeExecution
node_mapping typing.Dict[str, FlyteNode]

sync_task_execution()

def sync_task_execution(
    execution: FlyteTaskExecution,
    entity_interface: typing.Optional[TypedInterface],
) -> FlyteTaskExecution

Sync a FlyteTaskExecution object with its corresponding remote state.

Parameter Type
execution FlyteTaskExecution
entity_interface typing.Optional[TypedInterface]

terminate()

def terminate(
    execution: FlyteWorkflowExecution,
    cause: str,
)

Terminate a workflow execution.

Parameter Type
execution FlyteWorkflowExecution
cause str

upload_file()

def upload_file(
    to_upload: pathlib.Path,
    project: typing.Optional[str],
    domain: typing.Optional[str],
    filename_root: typing.Optional[str],
) -> n: The uploaded location.

Function will use remote’s client to hash and then upload the file using Admin’s data proxy service.

Parameter Type
to_upload pathlib.Path
project typing.Optional[str]
domain typing.Optional[str]
filename_root typing.Optional[str]

wait()

def wait(
    execution: FlyteWorkflowExecution,
    timeout: typing.Optional[typing.Union[timedelta, int]],
    poll_interval: typing.Optional[typing.Union[timedelta, int]],
    sync_nodes: bool,
) -> FlyteWorkflowExecution

Wait for an execution to finish.

Parameter Type
execution FlyteWorkflowExecution
timeout typing.Optional[typing.Union[timedelta, int]]
poll_interval typing.Optional[typing.Union[timedelta, int]]
sync_nodes bool

Properties

Property Type Description
apps_service_client
artifacts_client
client
Return a SynchronousFlyteClient for additional operations.
config
Image config.
context
default_domain
Default project to use when fetching or executing flyte entities.
default_project
Default project to use when fetching or executing flyte entities.
file_access
File access provider to use for offloading non-literal inputs/outputs.
hooks_async_client
hooks_sync_client
images_client
interactive_mode_enabled
If set to True, the FlyteRemote will pickle the task/workflow.
secret_client
sync_channel
Return channel from client. This channel already has the org passed in dynamically by the interceptor.
users_client

union.VersionParameters

Parameters used for version hash generation.

param func: The function to generate a version for. This is an optional parameter and can be any callable that matches the specified parameter and return types. :type func: Optional[Callable[P, FuncOut]]

class VersionParameters(
    func: typing.Callable[~P, ~FuncOut],
    container_image: typing.Union[str, flytekit.image_spec.image_spec.ImageSpec, NoneType],
    pod_template: typing.Optional[flytekit.core.pod_template.PodTemplate],
    pod_template_name: typing.Optional[str],
)
Parameter Type
func typing.Callable[~P, ~FuncOut]
container_image typing.Union[str, flytekit.image_spec.image_spec.ImageSpec, NoneType]
pod_template typing.Optional[flytekit.core.pod_template.PodTemplate]
pod_template_name typing.Optional[str]