union
Directory
Classes
Class | Description |
---|---|
ActorEnvironment | ActorEnvironment class. |
Artifact | This is a wrapper around the Flytekit Artifact class. |
Cache | Cache configuration for a task. |
CachePolicy | Base class for protocol classes. |
ContainerTask | This is an intermediate class that represents Flyte Tasks that run a container at execution time. |
Deck | Deck enables users to get customizable and default visibility into their tasks. |
FlyteDirectory | |
FlyteFile | |
ImageSpec | This class is used to specify the docker image that will be used to run the task. |
LaunchPlan | Launch Plans are one of the core constructs of Flyte. |
PodTemplate | Custom PodTemplate specification for a Task. |
Resources | This class is used to specify both resource requests and resource limits. |
Secret | See the cookbook secrets section for usage examples. |
StructuredDataset | This is the user facing StructuredDataset class. |
UnionRemote | Main entrypoint for programmatically accessing a Flyte remote backend. |
VersionParameters | Parameters used for version hash generation. |
Methods
Method | Description |
---|---|
actor_cache() | Cache function between actor executions. |
current_context() | Use this method to get a handle of specific parameters available in a flyte task. |
map() | Use to map over tasks, actors, launch plans, reference tasks and launch plans, and remote tasks and launch plans. |
map_task() | Wrapper that creates a map task utilizing either the existing ArrayNodeMapTask or the drop-in replacement ArrayNode implementation. |
task() | This is the core decorator to use for any task type in flytekit. |
workflow() | This decorator declares a function to be a Flyte workflow. |
Methods
actor_cache()
def actor_cache(
f,
)
Cache function between actor executions.
Parameter | Type |
---|---|
f | |
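For illustration, a minimal sketch of caching an expensive initialization between executions on the same actor; the helper function and file path are hypothetical:

.. code-block:: python

   from union import ActorEnvironment, actor_cache

   actor = ActorEnvironment(name="my-actor", replica_count=1)

   @actor_cache
   def load_table(path: str) -> dict:
       # Expensive setup; runs once per actor process and is reused
       # by subsequent executions on the same actor.
       with open(path) as f:
           return {line.strip(): True for line in f}

   @actor.task
   def lookup(word: str) -> bool:
       table = load_table("/data/words.txt")  # hypothetical path
       return word in table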
current_context()
def current_context()
Use this method to get a handle of specific parameters available in a flyte task.
Usage
.. code-block:: python
flytekit.current_context().logging.info(...)
Available params are documented in flytekit.core.context_manager.ExecutionParams. There are also some special params that should be available.
map()
def map(
target: typing.Union[flytekit.core.launch_plan.LaunchPlan, flytekit.core.python_function_task.PythonFunctionTask, ForwardRef('FlyteLaunchPlan')],
bound_inputs: typing.Optional[typing.Dict[str, typing.Any]],
concurrency: typing.Optional[int],
min_successes: typing.Optional[int],
min_success_ratio: float,
kwargs,
)
Use to map over tasks, actors, launch plans, reference tasks and launch plans, and remote tasks and launch plans.
Parameter | Type |
---|---|
target | typing.Union[flytekit.core.launch_plan.LaunchPlan, flytekit.core.python_function_task.PythonFunctionTask, ForwardRef('FlyteLaunchPlan')] |
bound_inputs | typing.Optional[typing.Dict[str, typing.Any]] |
concurrency | typing.Optional[int] |
min_successes | typing.Optional[int] |
min_success_ratio | float |
kwargs | **kwargs |
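A hedged sketch of mapping a task over list inputs inside a workflow, assuming the same calling convention as flytekit's map_task; the square task is illustrative:

.. code-block:: python

   import union

   @union.task
   def square(x: int) -> int:
       return x * x

   @union.workflow
   def wf(xs: list[int]) -> list[int]:
       # Map square over xs, running at most 10 copies at once and
       # tolerating up to 10% failed elements.
       return union.map(square, concurrency=10, min_success_ratio=0.9)(x=xs)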
map_task()
def map_task(
target: typing.Union[flytekit.core.launch_plan.LaunchPlan, flytekit.core.python_function_task.PythonFunctionTask, ForwardRef('FlyteLaunchPlan')],
concurrency: typing.Optional[int],
min_successes: typing.Optional[int],
min_success_ratio: float,
kwargs,
)
Wrapper that creates a map task utilizing either the existing ArrayNodeMapTask or the drop-in replacement ArrayNode implementation.
Parameter | Type |
---|---|
target | typing.Union[flytekit.core.launch_plan.LaunchPlan, flytekit.core.python_function_task.PythonFunctionTask, ForwardRef('FlyteLaunchPlan')] |
concurrency | typing.Optional[int] |
min_successes | typing.Optional[int] |
min_success_ratio | float |
kwargs | **kwargs |
task()
def task(
_task_function: Optional[Callable[P, FuncOut]],
task_config: Optional[T],
cache: Union[bool, Cache],
retries: int,
interruptible: Optional[bool],
deprecated: str,
timeout: Union[datetime.timedelta, int],
container_image: Optional[Union[str, ImageSpec]],
environment: Optional[Dict[str, str]],
requests: Optional[Resources],
limits: Optional[Resources],
secret_requests: Optional[List[Secret]],
execution_mode: PythonFunctionTask.ExecutionBehavior,
node_dependency_hints: Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]],
task_resolver: Optional[TaskResolverMixin],
docs: Optional[Documentation],
disable_deck: Optional[bool],
enable_deck: Optional[bool],
deck_fields: Optional[Tuple[DeckField, ...]],
pod_template: Optional['PodTemplate'],
pod_template_name: Optional[str],
accelerator: Optional[BaseAccelerator],
pickle_untyped: bool,
shared_memory: Optional[Union[L[True], str]],
resources: Optional[Resources],
kwargs,
) -> Union[Callable[P, FuncOut], Callable[[Callable[P, FuncOut]], PythonFunctionTask[T]], PythonFunctionTask[T]]
This is the core decorator to use for any task type in flytekit.
Tasks are the building blocks of Flyte. They represent users' code. Tasks have the following properties:
- Versioned (usually tied to the git revision SHA1)
- Strong interfaces (specified inputs and outputs)
- Declarative
- Independently executable
- Unit testable
For a simple python task,
.. code-block:: python
   @task
   def my_task(x: int, y: typing.Dict[str, str]) -> str:
       ...
For specific task types
.. code-block:: python
@task(task_config=Spark(), retries=3)
def my_task(x: int, y: typing.Dict[str, str]) -> str:
...
Please see the cookbook task examples for additional information.
Parameter | Type |
---|---|
_task_function | Optional[Callable[P, FuncOut]] |
task_config | Optional[T] |
cache | Union[bool, Cache] |
retries | int |
interruptible | Optional[bool] |
deprecated | str |
timeout | Union[datetime.timedelta, int] |
container_image | Optional[Union[str, ImageSpec]] |
environment | Optional[Dict[str, str]] |
requests | Optional[Resources] |
limits | Optional[Resources] |
secret_requests | Optional[List[Secret]] |
execution_mode | PythonFunctionTask.ExecutionBehavior |
node_dependency_hints | Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]] |
task_resolver | Optional[TaskResolverMixin] |
docs | Optional[Documentation] |
disable_deck | Optional[bool] |
enable_deck | Optional[bool] |
deck_fields | Optional[Tuple[DeckField, ...]] |
pod_template | Optional['PodTemplate'] |
pod_template_name | Optional[str] |
accelerator | Optional[BaseAccelerator] |
pickle_untyped | bool |
shared_memory | Optional[Union[L[True], str]] |
resources | Optional[Resources] |
kwargs | **kwargs |
workflow()
def workflow(
_workflow_function: Optional[Callable[P, FuncOut]],
failure_policy: Optional[WorkflowFailurePolicy],
interruptible: bool,
on_failure: Optional[Union[WorkflowBase, Task]],
docs: Optional[Documentation],
pickle_untyped: bool,
default_options: Optional[Options],
) -> Union[Callable[P, FuncOut], Callable[[Callable[P, FuncOut]], PythonFunctionWorkflow], PythonFunctionWorkflow]
This decorator declares a function to be a Flyte workflow. Workflows are declarative entities that construct a DAG of tasks using the data flow between tasks.
Unlike a task, the function body of a workflow is evaluated at serialization-time (aka compile-time). This is because while we can determine the entire structure of a task by looking at the function’s signature, workflows need to run through the function itself because the body of the function is what expresses the workflow structure. It’s also important to note that, local execution notwithstanding, it is not evaluated again when the workflow runs on Flyte. That is, workflows should not call non-Flyte entities since they are only run once (again, this is with respect to the platform, local runs notwithstanding).
Example:
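A minimal sketch of such a workflow; the task and names are illustrative:

.. code-block:: python

   @task
   def add_one(a: int) -> int:
       return a + 1

   @workflow
   def my_wf_example(a: int) -> int:
       # These calls only construct the DAG; add_one does not actually
       # run while the workflow function is being evaluated.
       b = add_one(a=a)
       return add_one(a=b)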
Again, users should keep in mind that even though the body of the function looks like regular Python, it is
actually not. When flytekit scans the workflow function, the objects being passed around between the tasks are not
your typical Python values. So even though you may have a task t1() -> int, when a = t1() is called, a will not be an integer, so if you try to range(a) you'll get an error.
Please see the user guide for more usage examples.
Parameter | Type |
---|---|
_workflow_function | Optional[Callable[P, FuncOut]] |
failure_policy | Optional[WorkflowFailurePolicy] |
interruptible | bool |
on_failure | Optional[Union[WorkflowBase, Task]] |
docs | Optional[Documentation] |
pickle_untyped | bool |
default_options | Optional[Options] |
union.ActorEnvironment
ActorEnvironment class.
class ActorEnvironment(
name: str,
container_image: Optional[Union[str, ImageSpec]],
replica_count: int,
ttl_seconds: Optional[int],
environment: Optional[Dict[str, str]],
requests: Optional[Resources],
limits: Optional[Resources],
accelerator: Optional[BaseAccelerator],
secret_requests: Optional[List[Secret]],
pod_template: Optional[PodTemplate],
interruptible: bool,
)
Parameter | Type |
---|---|
name | str |
container_image | Optional[Union[str, ImageSpec]] |
replica_count | int |
ttl_seconds | Optional[int] |
environment | Optional[Dict[str, str]] |
requests | Optional[Resources] |
limits | Optional[Resources] |
accelerator | Optional[BaseAccelerator] |
secret_requests | Optional[List[Secret]] |
pod_template | Optional[PodTemplate] |
interruptible | bool |
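A hedged sketch of declaring an actor environment and attaching a task to it via the task property; all values are illustrative:

.. code-block:: python

   from union import ActorEnvironment, Resources

   actor = ActorEnvironment(
       name="my-actor",
       replica_count=1,
       ttl_seconds=300,  # keep the actor alive 5 minutes between executions
       requests=Resources(cpu="1", mem="1Gi"),
   )

   @actor.task
   def say_hello(name: str) -> str:
       return f"hello {name}"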
Properties
Property | Type | Description |
---|---|---|
task | | |
version | | |
union.Artifact
This is a wrapper around the Flytekit Artifact class.
This Python class has two purposes - as a Python representation of a materialized Artifact, and as a way for users to specify that tasks/workflows create Artifacts and the manner in which they are created.
Use one as input to a workflow (only workflow for now):
.. code-block:: python
df_artifact = Artifact.get("flyte://a1")
remote.execute(wf, inputs={"a": df_artifact})
Note that Python fields will be missing when retrieved from the service.
class Artifact(
args,
project: Optional[str],
domain: Optional[str],
name: Optional[str],
version: Optional[str],
time_partitioned: bool,
time_partition: Optional[TimePartition],
time_partition_granularity: Optional[Granularity],
partition_keys: Optional[typing.List[str]],
partitions: Optional[Union[Partitions, typing.Dict[str, str]]],
python_val: Optional[typing.Any],
python_type: Optional[typing.Type],
literal: Optional[Literal],
literal_type: Optional[LiteralType],
short_description: Optional[str],
source: Optional[artifacts_pb2.ArtifactSource],
card: Optional[Card],
kwargs,
)
Parameter | Type |
---|---|
args | *args |
project | Optional[str] |
domain | Optional[str] |
name | Optional[str] |
version | Optional[str] |
time_partitioned | bool |
time_partition | Optional[TimePartition] |
time_partition_granularity | Optional[Granularity] |
partition_keys | Optional[typing.List[str]] |
partitions | Optional[Union[Partitions, typing.Dict[str, str]]] |
python_val | Optional[typing.Any] |
python_type | Optional[typing.Type] |
literal | Optional[Literal] |
literal_type | Optional[LiteralType] |
short_description | Optional[str] |
source | Optional[artifacts_pb2.ArtifactSource] |
card | Optional[Card] |
kwargs | **kwargs |
Methods
Method | Description |
---|---|
create_from() | This function allows users to declare partition values dynamically from the body of a task. |
embed_as_query() | This should only be called in the context of a Trigger. |
from_flyte_idl() | Converts the IDL representation to this object. |
get() | This function is supposed to mimic the get() behavior of inputs/outputs as returned by FlyteRemote for an execution. |
initialize() | Use this for when you have a Python value you want to get an Artifact object out of. |
metadata() | |
query() | |
set_resolver() | |
set_source() | |
to_create_request() | |
to_id_idl() | Converts this object to the IDL representation. |
create_from()
def create_from(
o: O,
card: Optional[SerializableToString],
args: `*args`,
kwargs,
) -> O
This function allows users to declare partition values dynamically from the body of a task. Note that you’ll still need to annotate your task function output with the relevant Artifact object. Below, one of the partition values is bound to an input, and the other is set at runtime. Note that since tasks are not run at compile time, flytekit cannot check that you’ve bound all the partition values. It’s up to you to ensure that you’ve done so.
.. code-block:: python

   Pricing = Artifact(name="pricing", partition_keys=["region"])
   EstError = Artifact(name="estimation_error", partition_keys=["dataset"], time_partitioned=True)

   @task
   def t1() -> Tuple[Annotated[pd.DataFrame, Pricing], Annotated[float, EstError]]:
       df = get_pricing_results()
       dt = get_time()
       return Pricing.create_from(df, region="dubai"), EstError.create_from(msq_error, dataset="train", time_partition=dt)
You can mix and match with the input syntax as well.
.. code-block:: python

   @task
   def my_task() -> Annotated[pd.DataFrame, RideCountData(region=Inputs.region)]:
       ...
       return RideCountData.create_from(df, time_partition=datetime.datetime.now())
Parameter | Type |
---|---|
o | O |
card | Optional[SerializableToString] |
args | *args |
kwargs | **kwargs |
embed_as_query()
def embed_as_query(
partition: Optional[str],
bind_to_time_partition: Optional[bool],
expr: Optional[str],
op: Optional[Op],
) -> art_id.ArtifactQuery
This should only be called in the context of a Trigger. The type of query this returns is different from the query() function. This type of query is used to reference the triggering artifact, rather than running a query.
Parameter | Type |
---|---|
partition | Optional[str] |
bind_to_time_partition | Optional[bool] |
expr | Optional[str] |
op | Optional[Op] |
from_flyte_idl()
def from_flyte_idl(
pb2: artifacts_pb2.Artifact,
) -> Artifact
Converts the IDL representation to this object.
Parameter | Type |
---|---|
pb2 | artifacts_pb2.Artifact |
get()
def get(
as_type: Optional[typing.Type],
) -> Optional[typing.Any]
This function is supposed to mimic the get() behavior of inputs/outputs as returned by FlyteRemote for an execution, leveraging the LiteralsResolver (and underneath that the TypeEngine) to turn the literal into a Python value.
Parameter | Type |
---|---|
as_type | Optional[typing.Type] |
initialize()
def initialize(
python_val: typing.Any,
python_type: typing.Type,
name: Optional[str],
literal_type: Optional[LiteralType],
version: Optional[str],
tags: Optional[typing.List[str]],
) -> Artifact
Use this for when you have a Python value you want to get an Artifact object out of.
This function readies an Artifact for creation, it doesn’t actually create it just yet since this is a network-less call. You will need to persist it with a FlyteRemote instance: remote.create_artifact(Artifact.initialize(…))
.. code-block:: python

   Artifact.initialize("/path/to/file", tags={"tag1": "val1"})
   Artifact.initialize("/path/to/parquet", type=pd.DataFrame, tags=["0.1.0"])
What's set here is everything that isn't set by the server. What is set by the server?
- name, version, if not set by user.
- uri (set by remote)
- project, domain
Parameter | Type |
---|---|
python_val | typing.Any |
python_type | typing.Type |
name | Optional[str] |
literal_type | Optional[LiteralType] |
version | Optional[str] |
tags | Optional[typing.List[str]] |
metadata()
def metadata()
query()
def query(
project: Optional[str],
domain: Optional[str],
time_partition: Optional[Union[datetime.datetime, TimePartition, art_id.InputBindingData]],
partitions: Optional[Union[typing.Dict[str, str], Partitions]],
kwargs,
) -> ArtifactQuery
Parameter | Type |
---|---|
project | Optional[str] |
domain | Optional[str] |
time_partition | Optional[Union[datetime.datetime, TimePartition, art_id.InputBindingData]] |
partitions | Optional[Union[typing.Dict[str, str], Partitions]] |
kwargs | **kwargs |
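For illustration, a hedged sketch of building a query from an artifact's partition keys and using it as a launch plan default input, assuming partition keys can be passed as keyword arguments as in the create_from example above; the consuming workflow is hypothetical:

.. code-block:: python

   Pricing = Artifact(name="pricing", partition_keys=["region"])

   # Resolve the latest pricing artifact for the dubai partition
   # when the launch plan runs.
   lp = LaunchPlan.get_or_create(
       workflow=consume_wf,  # hypothetical workflow taking a DataFrame input "df"
       name="consume_dubai",
       default_inputs={"df": Pricing.query(region="dubai")},
   )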
set_resolver()
def set_resolver(
resolver: LiteralsResolver,
)
Parameter | Type |
---|---|
resolver | LiteralsResolver |
set_source()
def set_source(
source: artifacts_pb2.ArtifactSource,
)
Parameter | Type |
---|---|
source | artifacts_pb2.ArtifactSource |
to_create_request()
def to_create_request(
a: Artifact,
) -> artifacts_pb2.CreateArtifactRequest
Parameter | Type |
---|---|
a | Artifact |
to_id_idl()
def to_id_idl()
Converts this object to the IDL representation. This is here instead of translator because it’s in the interface, a relatively simple proto object that’s exposed to the user.
Properties
Property | Type | Description |
---|---|---|
concrete_artifact_id | | |
partitions | | |
time_partition | | |
union.Cache
Cache configuration for a task.
class Cache(
version: typing.Optional[str],
serialize: bool,
ignored_inputs: typing.Union[typing.Tuple[str, ...], str],
salt: str,
policies: typing.Union[typing.List[flytekit.core.cache.CachePolicy], flytekit.core.cache.CachePolicy, NoneType],
)
Parameter | Type |
---|---|
version | typing.Optional[str] |
serialize | bool |
ignored_inputs | typing.Union[typing.Tuple[str, ...], str] |
salt | str |
policies | typing.Union[typing.List[flytekit.core.cache.CachePolicy], flytekit.core.cache.CachePolicy, NoneType] |
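A minimal sketch of attaching a cache configuration to a task; the version string and ignored input are illustrative:

.. code-block:: python

   from union import Cache, task

   @task(cache=Cache(version="1.0", ignored_inputs=("debug",)))
   def transform(x: int, debug: bool = False) -> int:
       # Results are cached per input set; changes to `debug`
       # do not invalidate the cache.
       return x * 2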
Methods
Method | Description |
---|---|
get_ignored_inputs() | |
get_version() | |
get_ignored_inputs()
def get_ignored_inputs()
get_version()
def get_version(
params: flytekit.core.cache.VersionParameters,
) -> str
Parameter | Type |
---|---|
params | flytekit.core.cache.VersionParameters |
union.CachePolicy
Base class for protocol classes.
Protocol classes are defined as::
   class Proto(Protocol):
       def meth(self) -> int:
           ...
Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).
For example::
   class C:
       def meth(self) -> int:
           return 0

   def func(x: Proto) -> int:
       return x.meth()

   func(C())  # Passes static type check
See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::
   class GenProto[T](Protocol):
       def meth(self) -> T:
           ...
class CachePolicy(
args,
kwargs,
)
Parameter | Type |
---|---|
args | *args |
kwargs | **kwargs |
Methods
Method | Description |
---|---|
get_version() | |
get_version()
def get_version(
salt: str,
params: flytekit.core.cache.VersionParameters,
) -> str
Parameter | Type |
---|---|
salt | str |
params | flytekit.core.cache.VersionParameters |
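Since CachePolicy is a protocol, any object with a matching get_version method satisfies it. A hedged sketch of a custom policy; the VersionParameters fields used here are assumptions:

.. code-block:: python

   from flytekit.core.cache import VersionParameters

   class SaltedFunctionPolicy:
       def get_version(self, salt: str, params: VersionParameters) -> str:
           # Derive a cache version from the salt and the task's function
           # name; `params.func` is assumed to carry the task function.
           return f"{salt}-{params.func.__name__}"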
union.ContainerTask
This is an intermediate class that represents Flyte Tasks that run a container at execution time. This is the vast majority of tasks - the typical @task decorated tasks, for instance, all run a container. An example of something that doesn't run a container would be something like the Athena SQL task.
class ContainerTask(
name: str,
image: typing.Union[str, flytekit.image_spec.image_spec.ImageSpec],
command: typing.List[str],
inputs: typing.Optional[typing.OrderedDict[str, typing.Type]],
metadata: typing.Optional[flytekit.core.base_task.TaskMetadata],
arguments: typing.Optional[typing.List[str]],
outputs: typing.Optional[typing.Dict[str, typing.Type]],
requests: typing.Optional[flytekit.core.resources.Resources],
limits: typing.Optional[flytekit.core.resources.Resources],
input_data_dir: typing.Optional[str],
output_data_dir: typing.Optional[str],
metadata_format: <enum 'MetadataFormat'>,
io_strategy: typing.Optional[flytekit.core.container_task.ContainerTask.IOStrategy],
secret_requests: typing.Optional[typing.List[flytekit.models.security.Secret]],
pod_template: typing.Optional[ForwardRef('PodTemplate')],
pod_template_name: typing.Optional[str],
local_logs: bool,
resources: typing.Optional[flytekit.core.resources.Resources],
kwargs,
)
Parameter | Type |
---|---|
name | str |
image | typing.Union[str, flytekit.image_spec.image_spec.ImageSpec] |
command | typing.List[str] |
inputs | typing.Optional[typing.OrderedDict[str, typing.Type]] |
metadata | typing.Optional[flytekit.core.base_task.TaskMetadata] |
arguments | typing.Optional[typing.List[str]] |
outputs | typing.Optional[typing.Dict[str, typing.Type]] |
requests | typing.Optional[flytekit.core.resources.Resources] |
limits | typing.Optional[flytekit.core.resources.Resources] |
input_data_dir | typing.Optional[str] |
output_data_dir | typing.Optional[str] |
metadata_format | <enum 'MetadataFormat'> |
io_strategy | typing.Optional[flytekit.core.container_task.ContainerTask.IOStrategy] |
secret_requests | typing.Optional[typing.List[flytekit.models.security.Secret]] |
pod_template | typing.Optional[ForwardRef('PodTemplate')] |
pod_template_name | typing.Optional[str] |
local_logs | bool |
resources | typing.Optional[flytekit.core.resources.Resources] |
kwargs | **kwargs |
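A sketch of a raw container task modeled on the flytekit documentation; the image and command are illustrative:

.. code-block:: python

   from flytekit import kwtypes
   from union import ContainerTask

   greeting_task = ContainerTask(
       name="echo_greeting",
       image="alpine:3.18",
       input_data_dir="/var/inputs",
       output_data_dir="/var/outputs",
       inputs=kwtypes(name=str),
       outputs=kwtypes(greeting=str),
       command=[
           "/bin/sh", "-c",
           # Inputs are templated into the command; the output is read
           # back from the output data directory.
           "echo 'Hello, {{.inputs.name}}!' | tee /var/outputs/greeting",
       ],
   )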
Methods
Method | Description |
---|---|
compile() | Generates a node that encapsulates this task in a workflow definition. |
construct_node_metadata() | Used when constructing the node that encapsulates this task as part of a broader workflow definition. |
dispatch_execute() | This method translates Flyte's Type system based input values and invokes the actual call to the executor. |
execute() | This method will be invoked to execute the task. |
find_lhs() | |
get_config() | Returns the task config as a serializable dictionary. |
get_container() | Returns the container definition (if any) that is used to run the task on hosted Flyte. |
get_custom() | Return additional plugin-specific custom data (if any) as a serializable dictionary. |
get_extended_resources() | Returns the extended resources to allocate to the task on hosted Flyte. |
get_input_types() | Returns the names and python types as a dictionary for the inputs of this task. |
get_k8s_pod() | Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte. |
get_sql() | Returns the Sql definition (if any) that is used to run the task on hosted Flyte. |
get_type_for_input_var() | Returns the python type for an input variable by name. |
get_type_for_output_var() | Returns the python type for the specified output variable by name. |
local_execute() | This function is used only in the local execution path and is responsible for calling dispatch execute. |
local_execution_mode() | |
post_execute() | Post execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs. |
pre_execute() | This is the method that will be invoked directly before executing the task method and before all the inputs are converted. |
sandbox_execute() | Call dispatch_execute, in the context of a local sandbox execution. |
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
) -> typing.Union[typing.Tuple[flytekit.core.promise.Promise], flytekit.core.promise.Promise, flytekit.core.promise.VoidPromise, NoneType]
Generates a node that encapsulates this task in a workflow definition.
Parameter | Type |
---|---|
ctx | flytekit.core.context_manager.FlyteContext |
args | *args |
kwargs | **kwargs |
construct_node_metadata()
def construct_node_metadata()
Used when constructing the node that encapsulates this task as part of a broader workflow definition.
dispatch_execute()
def dispatch_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
) -> typing.Union[flytekit.models.literals.LiteralMap, flytekit.models.dynamic_job.DynamicJobSpec, typing.Coroutine]
This method translates Flyte's Type system based input values and invokes the actual call to the executor. This method is also invoked during runtime.
- VoidPromise is returned when the task itself declares no outputs.
- LiteralMap is returned when the task returns one or more outputs in the declaration. Individual outputs may be None.
- DynamicJobSpec is returned when a dynamic workflow is executed.
Parameter | Type |
---|---|
ctx | flytekit.core.context_manager.FlyteContext |
input_literal_map | flytekit.models.literals.LiteralMap |
execute()
def execute(
kwargs,
) -> flytekit.models.literals.LiteralMap
This method will be invoked to execute the task.
Parameter | Type |
---|---|
kwargs | **kwargs |
find_lhs()
def find_lhs()
get_config()
def get_config(
settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[typing.Dict[str, str]]
Returns the task config as a serializable dictionary. This task config consists of metadata about the custom config defined for this task.
Parameter | Type |
---|---|
settings | flytekit.configuration.SerializationSettings |
get_container()
def get_container(
settings: flytekit.configuration.SerializationSettings,
) -> flytekit.models.task.Container
Returns the container definition (if any) that is used to run the task on hosted Flyte.
Parameter | Type |
---|---|
settings | flytekit.configuration.SerializationSettings |
get_custom()
def get_custom(
settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[typing.Dict[str, typing.Any]]
Return additional plugin-specific custom data (if any) as a serializable dictionary.
Parameter | Type |
---|---|
settings | flytekit.configuration.SerializationSettings |
get_extended_resources()
def get_extended_resources(
settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flyteidl.core.tasks_pb2.ExtendedResources]
Returns the extended resources to allocate to the task on hosted Flyte.
Parameter | Type |
---|---|
settings | flytekit.configuration.SerializationSettings |
get_input_types()
def get_input_types()
Returns the names and python types as a dictionary for the inputs of this task.
get_k8s_pod()
def get_k8s_pod(
settings: flytekit.configuration.SerializationSettings,
) -> flytekit.models.task.K8sPod
Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.
Parameter | Type |
---|---|
settings | flytekit.configuration.SerializationSettings |
get_sql()
def get_sql(
settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flytekit.models.task.Sql]
Returns the Sql definition (if any) that is used to run the task on hosted Flyte.
Parameter | Type |
---|---|
settings | flytekit.configuration.SerializationSettings |
get_type_for_input_var()
def get_type_for_input_var(
k: str,
v: typing.Any,
) -> typing.Type[typing.Any]
Returns the python type for an input variable by name.
Parameter | Type |
---|---|
k | str |
v | typing.Any |
get_type_for_output_var()
def get_type_for_output_var(
k: str,
v: typing.Any,
) -> typing.Type[typing.Any]
Returns the python type for the specified output variable by name.
Parameter | Type |
---|---|
k | str |
v | typing.Any |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
) -> typing.Union[typing.Tuple[flytekit.core.promise.Promise], flytekit.core.promise.Promise, flytekit.core.promise.VoidPromise, typing.Coroutine, NoneType]
This function is used only in the local execution path and is responsible for calling dispatch execute. Use this function when calling a task with native values (or Promises containing Flyte literals derived from Python native values).
Parameter | Type |
---|---|
ctx | flytekit.core.context_manager.FlyteContext |
kwargs | **kwargs |
local_execution_mode()
def local_execution_mode()
post_execute()
def post_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
rval: typing.Any,
) -> typing.Any
Post execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs to match the intended task's outputs. If not overridden, this function is a no-op.
Parameter | Type |
---|---|
user_params | typing.Optional[flytekit.core.context_manager.ExecutionParameters] |
rval | typing.Any |
pre_execute()
def pre_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
) -> typing.Optional[flytekit.core.context_manager.ExecutionParameters]
This is the method that will be invoked directly before executing the task method and before all the inputs are converted. One particular case where this is useful is if the context is to be modified for the user process to get some user-space parameters. This also ensures that things like SparkSession are already correctly set up before the type transformers are called.
This should return either the same context or the mutated context.
Parameter | Type |
---|---|
user_params | typing.Optional[flytekit.core.context_manager.ExecutionParameters] |
sandbox_execute()
def sandbox_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
) -> flytekit.models.literals.LiteralMap
Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.
Parameter | Type |
---|---|
ctx | flytekit.core.context_manager.FlyteContext |
input_literal_map | flytekit.models.literals.LiteralMap |
Properties
Property | Type | Description |
---|---|---|
deck_fields | | If not empty, this task will output a deck HTML file for the specified decks. |
disable_deck | | If true, this task will not output a deck HTML file. |
docs | | |
enable_deck | | If true, this task will output a deck HTML file. |
environment | | Any environment variables that are supplied during the execution of the task. |
instantiated_in | | |
interface | | |
lhs | | |
location | | |
metadata | | |
name | | |
python_interface | | Returns this task's python interface. |
resources | | |
security_context | | |
task_config | | Returns the user-specified task config which is used for plugin-specific handling of the task. |
task_type | | |
task_type_version | | |
union.Deck
Deck enables users to get customizable and default visibility into their tasks.
Deck contains a list of renderers (FrameRenderer, MarkdownRenderer) that can generate an HTML file. For example, FrameRenderer can render a DataFrame as an HTML table, and MarkdownRenderer can convert a Markdown string to HTML.
The Flyte context saves a list of deck objects, and we use the renderers in those decks to render the data and create an HTML file when those tasks are executed.
Each task has at least three decks (input, output, default). Input/output decks are used to render tasks' input/output data, and the default deck is used to render line plots, scatter plots or Markdown text. In addition, users can create new decks to render their data with custom renderers.
.. code-block:: python
   iris_df = px.data.iris()

   @task()
   def t1() -> str:
       md_text = "#Hello Flyte\n##Hello Flyte\n###Hello Flyte"
       m = MarkdownRenderer()
       s = BoxRenderer("sepal_length")
       deck = flytekit.Deck("demo", s.to_html(iris_df))
       deck.append(m.to_html(md_text))
       default_deck = flytekit.current_context().default_deck
       default_deck.append(m.to_html(md_text))
       return md_text

   # Use Annotated to override default renderer
   @task()
   def t2() -> Annotated[pd.DataFrame, TopFrameRenderer(10)]:
       return iris_df
class Deck(
name: str,
html: typing.Optional[str],
auto_add_to_deck: bool,
)
Parameter | Type |
---|---|
name | str |
html | typing.Optional[str] |
auto_add_to_deck | bool |
Methods
Method | Description |
---|---|
append() | |
publish() | |
append()
def append(
html: str,
) -> Deck
Parameter | Type |
---|---|
html | str |
publish()
def publish()
Properties
Property | Type | Description |
---|---|---|
html | | |
name | | |
union.FlyteDirectory
class FlyteDirectory(
path: typing.Union[str, os.PathLike],
downloader: typing.Optional[typing.Callable],
remote_directory: typing.Optional[typing.Union[os.PathLike, str, typing.Literal[False]]],
)
Parameter | Type |
---|---|
path | typing.Union[str, os.PathLike] |
downloader | typing.Optional[typing.Callable] |
remote_directory | typing.Optional[typing.Union[os.PathLike, str, typing.Literal[False]]] |
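A hedged sketch of producing a FlyteDirectory from a task; the file contents and names are illustrative:

.. code-block:: python

   import os
   import tempfile
   from union import FlyteDirectory, task

   @task
   def write_reports() -> FlyteDirectory:
       local_dir = tempfile.mkdtemp()
       with open(os.path.join(local_dir, "summary.txt"), "w") as f:
           f.write("ok")
       # flytekit uploads the directory contents when the task returns.
       return FlyteDirectory(path=local_dir)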
Methods
Method | Description |
---|---|
crawl() | Crawl returns a generator of all files prefixed by any sub-folders under the given FlyteDirectory. |
deserialize_flyte_dir() | |
download() | |
extension() | |
from_dict() | |
from_json() | |
from_source() | Create a new FlyteDirectory object with the remote source set to the input. |
listdir() | This function will list all files and folders in the given directory, but without downloading the contents. |
new() | Create a new FlyteDirectory object in the current Flyte working directory. |
new_dir() | This will create a new folder under the current folder. |
new_file() | This will create a new file under the current folder. |
new_remote() | Create a new FlyteDirectory object using the currently configured default remote in the context (i.e. the raw_output_prefix). |
schema() | |
serialize_flyte_dir() | |
to_dict() | |
to_json() | |
crawl()
def crawl(
maxdepth: typing.Optional[int],
topdown: bool,
kwargs,
) -> Generator[Tuple[typing.Union[str, os.PathLike[Any]], typing.Dict[Any, Any]], None, None]
Crawl returns a generator of all files prefixed by any sub-folders under the given FlyteDirectory. If detail=True is passed, then it will return a dictionary as specified by fsspec.
Example:
>>> list(fd.crawl())
[("/base", "file1"), ("/base", "dir1/file1"), ("/base", "dir2/file1"), ("/base", "dir1/dir/file1")]
>>> list(x.crawl(detail=True))
[('/tmp/test', {'my-dir/ab.py': {'name': '/tmp/test/my-dir/ab.py', 'size': 0, 'type': 'file',
'created': 1677720780.2318847, 'islink': False, 'mode': 33188, 'uid': 501, 'gid': 0,
'mtime': 1677720780.2317934, 'ino': 1694329, 'nlink': 1}})]
Parameter | Type |
---|---|
maxdepth | typing.Optional[int] |
topdown | bool |
kwargs | **kwargs |
deserialize_flyte_dir()
def deserialize_flyte_dir(
args,
kwargs,
)
Parameter | Type |
---|---|
args | *args |
kwargs | **kwargs |
download()
def download()
extension()
def extension()
from_dict()
def from_dict(
kvs: typing.Union[dict, list, str, int, float, bool, NoneType],
infer_missing,
) -> ~A
Parameter | Type |
---|---|
kvs | typing.Union[dict, list, str, int, float, bool, NoneType] |
infer_missing | |
from_json()
def from_json(
s: typing.Union[str, bytes, bytearray],
parse_float,
parse_int,
parse_constant,
infer_missing,
kw,
) -> ~A
Parameter | Type |
---|---|
s | typing.Union[str, bytes, bytearray] |
parse_float | |
parse_int | |
parse_constant | |
infer_missing | |
kw | |
from_source()
def from_source(
source: str | os.PathLike,
) -> FlyteDirectory
Create a new FlyteDirectory object with the remote source set to the input
Parameter | Type |
---|---|
source | str \| os.PathLike |
listdir()
def listdir(
directory: FlyteDirectory,
) -> typing.List[typing.Union[FlyteDirectory, FlyteFile]]
This function will list all files and folders in the given directory, but without downloading the contents. In addition, it will return a list of FlyteFile and FlyteDirectory objects that have ability to lazily download the contents of the file/folder. For example:
.. code-block:: python
   entity = FlyteDirectory.listdir(directory)
   for e in entity:
       print("s3 object:", e.remote_source)
       # s3 object: s3://test-flytedir/file1.txt
       # s3 object: s3://test-flytedir/file2.txt
       # s3 object: s3://test-flytedir/sub_dir

   open(entity[0], "r")  # This will download the file to the local disk.
   open(entity[0], "r")  # flytekit will read data from the local disk if you open it again.
Parameter | Type |
---|---|
directory | FlyteDirectory |
new()
def new(
dirname: str | os.PathLike,
) -> FlyteDirectory
Create a new FlyteDirectory object in current Flyte working directory.
Parameter | Type |
---|---|
dirname | str \| os.PathLike |
new_dir()
def new_dir(
name: typing.Optional[str],
) -> FlyteDirectory
This will create a new folder under the current folder. If given a name, it will use the name given, otherwise it’ll pick a random string. Collisions are not checked.
Parameter | Type |
---|---|
name | typing.Optional[str] |
new_file()
def new_file(
name: typing.Optional[str],
) -> FlyteFile
This will create a new file under the current folder. If given a name, it will use the name given, otherwise it’ll pick a random string. Collisions are not checked.
Parameter | Type |
---|---|
name | typing.Optional[str] |
new_remote()
def new_remote(
stem: typing.Optional[str],
alt: typing.Optional[str],
) -> FlyteDirectory
Create a new FlyteDirectory object using the currently configured default remote in the context (i.e. the raw_output_prefix configured in the current FileAccessProvider object in the context). This is used if you explicitly have a folder somewhere that you want to create files under. If you want to write a whole folder, you can let your task return a FlyteDirectory object, and let flytekit handle the uploading.
Parameter | Type |
---|---|
stem | typing.Optional[str] |
alt | typing.Optional[str] |
schema()
def schema(
infer_missing: bool,
only,
exclude,
many: bool,
context,
load_only,
dump_only,
partial: bool,
unknown,
) -> SchemaType[A]
Parameter | Type |
---|---|
infer_missing | bool |
only | |
exclude | |
many | bool |
context | |
load_only | |
dump_only | |
partial | bool |
unknown | |
serialize_flyte_dir()
def serialize_flyte_dir(
args,
kwargs,
)
Parameter | Type |
---|---|
args | *args |
kwargs | **kwargs |
to_dict()
def to_dict(
encode_json,
) -> typing.Dict[str, typing.Union[dict, list, str, int, float, bool, NoneType]]
Parameter | Type |
---|---|
encode_json | |
to_json()
def to_json(
skipkeys: bool,
ensure_ascii: bool,
check_circular: bool,
allow_nan: bool,
indent: typing.Union[int, str, NoneType],
separators: typing.Tuple[str, str],
default: typing.Callable,
sort_keys: bool,
kw,
) -> str
Parameter | Type |
---|---|
skipkeys | bool |
ensure_ascii | bool |
check_circular | bool |
allow_nan | bool |
indent | typing.Union[int, str, NoneType] |
separators | typing.Tuple[str, str] |
default | typing.Callable |
sort_keys | bool |
kw | |
Properties
Property | Type | Description |
---|---|---|
downloaded | | |
remote_directory | | |
remote_source | | If this is an input to a task, and the original path is s3://something, flytekit will download the directory for the user. In case the user wants access to the original path, it will be here. |
sep | | |
union.FlyteFile
class FlyteFile(
path: typing.Union[str, os.PathLike],
downloader: typing.Callable,
remote_path: typing.Optional[typing.Union[os.PathLike, str, bool]],
metadata: typing.Optional[dict[str, str]],
)
FlyteFile’s init method.
Parameter | Type |
---|---|
path | typing.Union[str, os.PathLike] |
downloader | typing.Callable |
remote_path | typing.Optional[typing.Union[os.PathLike, str, bool]] |
metadata | typing.Optional[dict[str, str]] |
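A hedged sketch of producing a FlyteFile from a task; the file contents are illustrative:

.. code-block:: python

   import tempfile
   from union import FlyteFile, task

   @task
   def write_greeting() -> FlyteFile:
       f = tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt")
       f.write("hello")
       f.close()
       # flytekit uploads the local file when the task returns.
       return FlyteFile(path=f.name)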
Methods
Method | Description |
---|---|
deserialize_flyte_file() | |
download() | |
extension() | |
from_dict() | |
from_json() | |
from_source() | Create a new FlyteFile object with the remote source set to the input. |
new() | Create a new FlyteFile object in the current Flyte working directory. |
new_remote_file() | Create a new FlyteFile object with a remote path. |
open() | Returns a streaming File handle. |
serialize_flyte_file() | |
to_dict() | |
to_json() | |
deserialize_flyte_file()
def deserialize_flyte_file(
args,
kwargs,
)
Parameter | Type |
---|---|
args | *args |
kwargs | **kwargs |
download()
def download()
extension()
def extension()
from_dict()
def from_dict(
d,
dialect,
)
Parameter | Type |
---|---|
d | |
dialect | |
from_json()
def from_json(
data: typing.Union[str, bytes, bytearray],
decoder: collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]],
from_dict_kwargs: typing.Any,
) -> ~T
Parameter | Type |
---|---|
data | typing.Union[str, bytes, bytearray] |
decoder | collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]] |
from_dict_kwargs | typing.Any |
from_source()
def from_source(
source: str | os.PathLike,
) -> FlyteFile
Create a new FlyteFile object with the remote source set to the input
Parameter | Type |
---|---|
source | str \| os.PathLike |
new()
def new(
filename: str | os.PathLike,
) -> FlyteFile
Create a new FlyteFile object in the current Flyte working directory
Parameter | Type |
---|---|
filename | str \| os.PathLike |
new_remote_file()
def new_remote_file(
name: typing.Optional[str],
alt: typing.Optional[str],
) -> FlyteFile
Create a new FlyteFile object with a remote path.
Parameter | Type |
---|---|
name | typing.Optional[str] |
alt | typing.Optional[str] |
open()
def open(
mode: str,
cache_type: typing.Optional[str],
cache_options: typing.Optional[typing.Dict[str, typing.Any]],
)
Returns a streaming File handle
.. code-block:: python
   @task
   def copy_file(ff: FlyteFile) -> FlyteFile:
       new_file = FlyteFile.new_remote_file()
       with ff.open("rb", cache_type="readahead") as r:
           with new_file.open("wb") as w:
               w.write(r.read())
       return new_file
Parameter | Type |
---|---|
mode | str |
cache_type | typing.Optional[str] |
cache_options | typing.Optional[typing.Dict[str, typing.Any]] |
serialize_flyte_file()
def serialize_flyte_file(
args,
kwargs,
)
Parameter | Type |
---|---|
args | *args |
kwargs | **kwargs |
to_dict()
def to_dict()
to_json()
def to_json(
encoder: collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]],
to_dict_kwargs: typing.Any,
) -> typing.Union[str, bytes, bytearray]
Parameter | Type |
---|---|
encoder | collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]] |
to_dict_kwargs | typing.Any |
Properties
Property | Type | Description |
---|---|---|
downloaded | | |
remote_path | | |
remote_source | | If this is an input to a task, and the original path is an s3 bucket, Flytekit downloads the file for the user. In case the user wants access to the original path, it will be here. |
union.ImageSpec
This class is used to specify the docker image that will be used to run the task.
class ImageSpec(
name: str,
python_version: str,
builder: typing.Optional[str],
source_root: typing.Optional[str],
env: typing.Optional[typing.Dict[str, str]],
registry: typing.Optional[str],
packages: typing.Optional[typing.List[str]],
conda_packages: typing.Optional[typing.List[str]],
conda_channels: typing.Optional[typing.List[str]],
requirements: typing.Optional[str],
apt_packages: typing.Optional[typing.List[str]],
cuda: typing.Optional[str],
cudnn: typing.Optional[str],
base_image: typing.Union[str, ForwardRef('ImageSpec'), NoneType],
platform: str,
pip_index: typing.Optional[str],
pip_extra_index_url: typing.Optional[typing.List[str]],
pip_secret_mounts: typing.Optional[typing.List[typing.Tuple[str, str]]],
pip_extra_args: typing.Optional[str],
registry_config: typing.Optional[str],
entrypoint: typing.Optional[typing.List[str]],
commands: typing.Optional[typing.List[str]],
tag_format: typing.Optional[str],
source_copy_mode: typing.Optional[flytekit.constants.CopyFileDetection],
copy: typing.Optional[typing.List[str]],
python_exec: typing.Optional[str],
)
Parameter | Type |
---|---|
name | str |
python_version | str |
builder | typing.Optional[str] |
source_root | typing.Optional[str] |
env | typing.Optional[typing.Dict[str, str]] |
registry | typing.Optional[str] |
packages | typing.Optional[typing.List[str]] |
conda_packages | typing.Optional[typing.List[str]] |
conda_channels | typing.Optional[typing.List[str]] |
requirements | typing.Optional[str] |
apt_packages | typing.Optional[typing.List[str]] |
cuda | typing.Optional[str] |
cudnn | typing.Optional[str] |
base_image | typing.Union[str, ForwardRef('ImageSpec'), NoneType] |
platform | str |
pip_index | typing.Optional[str] |
pip_extra_index_url | typing.Optional[typing.List[str]] |
pip_secret_mounts | typing.Optional[typing.List[typing.Tuple[str, str]]] |
pip_extra_args | typing.Optional[str] |
registry_config | typing.Optional[str] |
entrypoint | typing.Optional[typing.List[str]] |
commands | typing.Optional[typing.List[str]] |
tag_format | typing.Optional[str] |
source_copy_mode | typing.Optional[flytekit.constants.CopyFileDetection] |
copy | typing.Optional[typing.List[str]] |
python_exec | typing.Optional[str] |
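A hedged sketch of defining an image and using it for a task; the registry and package pins are illustrative:

.. code-block:: python

   from union import ImageSpec, task

   image = ImageSpec(
       name="my-image",
       registry="ghcr.io/my-org",  # illustrative registry
       python_version="3.11",
       packages=["pandas==2.2.0"],
       apt_packages=["git"],
   )

   @task(container_image=image)
   def load() -> int:
       import pandas as pd
       return len(pd.DataFrame({"a": [1, 2]}))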
Methods
Method | Description |
---|---|
exist() | Check if the image exists in the registry. |
force_push() | Builder that returns a new image spec with force push enabled. |
from_env() | Create ImageSpec with the environment's Python version and packages pinned to the ones in the environment. |
image_name() | Full image name with tag. |
is_container() | Check if the current container image in the pod is built from the current image spec. |
with_apt_packages() | Builder that returns a new image spec with an additional list of apt packages that will be installed during the build process. |
with_commands() | Builder that returns a new image spec with an additional list of commands that will be executed during the build process. |
with_copy() | Builder that returns a new image spec with the source files copied to the destination directory. |
with_packages() | Builder that returns a new image spec with additional python packages that will be installed during the build process. |
exist()
def exist()
Check if the image exists in the registry. Returns True if the image exists in the registry, False otherwise. Returns None if it failed to check whether the image exists due to a permission issue or other reasons.
force_push()
def force_push()
Builder that returns a new image spec with force push enabled.
from_env()
def from_env(
pinned_packages: typing.Optional[typing.List[str]],
kwargs,
) -> ImageSpec
Create ImageSpec with the environment’s Python version and packages pinned to the ones in the environment.
Parameter | Type |
---|---|
pinned_packages | typing.Optional[typing.List[str]] |
kwargs | **kwargs |
image_name()
def image_name()
Full image name with tag.
is_container()
def is_container()
Check if the current container image in the pod is built from the current image spec. Returns True if it is, False otherwise.
with_apt_packages()
def with_apt_packages(
apt_packages: typing.Union[str, typing.List[str]],
) -> ImageSpec
Builder that returns a new image spec with an additional list of apt packages that will be installed during the build process.
Parameter | Type |
---|---|
apt_packages | typing.Union[str, typing.List[str]] |
with_commands()
def with_commands(
commands: typing.Union[str, typing.List[str]],
) -> ImageSpec
Builder that returns a new image spec with an additional list of commands that will be executed during the building process.
Parameter | Type |
---|---|
commands | typing.Union[str, typing.List[str]] |
with_copy()
def with_copy(
src: typing.Union[str, typing.List[str]],
) -> ImageSpec
Builder that returns a new image spec with the source files copied to the destination directory.
Parameter | Type |
---|---|
src | typing.Union[str, typing.List[str]] |
with_packages()
def with_packages(
packages: typing.Union[str, typing.List[str]],
) -> ImageSpec
Builder that returns a new image spec with additional python packages that will be installed during the build process.
Parameter | Type |
---|---|
packages | typing.Union[str, typing.List[str]] |
Properties
Property | Type | Description |
---|---|---|
tag | | Calculate a hash from the image spec; the hash will be the tag of the image. The content of the requirements file and the source root are also read to calculate the hash, so a different hash is generated if new dependencies are added or the source code is changed. Keep in mind that the fields source_root and copy may be changed by update_image_spec_copy_handling, so the point at which you call this property relative to that function will change the output. |
union.LaunchPlan
Launch Plans are one of the core constructs of Flyte. Please take a look at the discussion in the core concepts documentation if you are unfamiliar with them.
Every workflow is registered with a default launch plan, which is just a launch plan with none of the additional attributes set - no default values, fixed values, schedules, etc. Assuming you have the following workflow:
.. code-block:: python

   @workflow
   def wf(a: int, c: str) -> str:
       ...
Create the default launch plan with
.. code-block:: python
LaunchPlan.get_or_create(workflow=wf)
If you specify additional parameters, you’ll also have to give the launch plan a unique name. Default and fixed inputs can be expressed as Python native values like so:
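A hedged sketch; the names and values are illustrative:

.. code-block:: python

   launch_plan = LaunchPlan.get_or_create(
       workflow=wf,
       name="wf_with_defaults",
       default_inputs={"a": 3},
       fixed_inputs={"c": "hello"},
   )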
Additionally, a launch plan can be configured to run on a schedule and emit notifications.
Please see the relevant Schedule and Notification objects as well.
To configure the remaining parameters, you’ll need to import the relevant model objects as well.
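A hedged sketch of a scheduled launch plan; the CronSchedule import follows flytekit, and the cron expression is illustrative:

.. code-block:: python

   from flytekit import CronSchedule

   scheduled_lp = LaunchPlan.get_or_create(
       workflow=wf,
       name="wf_every_5_min",
       schedule=CronSchedule(schedule="*/5 * * * *"),  # every 5 minutes
   )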
.. code-block:: python
from flytekit.models.common import Annotations, AuthRole, Labels, RawOutputDataConfig
Then use as follows
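A hedged sketch using those model objects; the label and bucket prefix are illustrative:

.. code-block:: python

   labeled_lp = LaunchPlan.get_or_create(
       workflow=wf,
       name="wf_with_labels",
       labels=Labels({"team": "pricing"}),
       raw_output_data_config=RawOutputDataConfig("s3://my-bucket/outputs"),
   )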
class LaunchPlan(
name: str,
workflow: _annotated_workflow.WorkflowBase,
parameters: _interface_models.ParameterMap,
fixed_inputs: _literal_models.LiteralMap,
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
)
Parameter | Type |
---|---|
name | str |
workflow | _annotated_workflow.WorkflowBase |
parameters | _interface_models.ParameterMap |
fixed_inputs | _literal_models.LiteralMap |
schedule | Optional[_schedule_model.Schedule] |
notifications | Optional[List[_common_models.Notification]] |
labels | Optional[_common_models.Labels] |
annotations | Optional[_common_models.Annotations] |
raw_output_data_config | Optional[_common_models.RawOutputDataConfig] |
max_parallelism | Optional[int] |
security_context | Optional[security.SecurityContext] |
trigger | Optional[LaunchPlanTriggerBase] |
overwrite_cache | Optional[bool] |
auto_activate | bool |
Methods
Method | Description |
---|---|
clone_with() | |
construct_node_metadata() | |
create() | |
get_default_launch_plan() | Users should probably call the get_or_create function defined below instead. |
get_or_create() | This function offers a friendlier interface for creating launch plans. |
clone_with()
def clone_with(
name: str,
parameters: Optional[_interface_models.ParameterMap],
fixed_inputs: Optional[_literal_models.LiteralMap],
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
) -> LaunchPlan
Parameter | Type |
---|---|
name | str |
parameters | Optional[_interface_models.ParameterMap] |
fixed_inputs | Optional[_literal_models.LiteralMap] |
schedule | Optional[_schedule_model.Schedule] |
notifications | Optional[List[_common_models.Notification]] |
labels | Optional[_common_models.Labels] |
annotations | Optional[_common_models.Annotations] |
raw_output_data_config | Optional[_common_models.RawOutputDataConfig] |
max_parallelism | Optional[int] |
security_context | Optional[security.SecurityContext] |
trigger | Optional[LaunchPlanTriggerBase] |
overwrite_cache | Optional[bool] |
auto_activate | bool |
construct_node_metadata()
def construct_node_metadata()
create()
def create(
name: str,
workflow: _annotated_workflow.WorkflowBase,
default_inputs: Optional[Dict[str, Any]],
fixed_inputs: Optional[Dict[str, Any]],
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
auth_role: Optional[_common_models.AuthRole],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
) -> LaunchPlan
Parameter | Type |
---|---|
name | str |
workflow | _annotated_workflow.WorkflowBase |
default_inputs | Optional[Dict[str, Any]] |
fixed_inputs | Optional[Dict[str, Any]] |
schedule | Optional[_schedule_model.Schedule] |
notifications | Optional[List[_common_models.Notification]] |
labels | Optional[_common_models.Labels] |
annotations | Optional[_common_models.Annotations] |
raw_output_data_config | Optional[_common_models.RawOutputDataConfig] |
max_parallelism | Optional[int] |
security_context | Optional[security.SecurityContext] |
auth_role | Optional[_common_models.AuthRole] |
trigger | Optional[LaunchPlanTriggerBase] |
overwrite_cache | Optional[bool] |
auto_activate | bool |
get_default_launch_plan()
def get_default_launch_plan(
ctx: FlyteContext,
workflow: _annotated_workflow.WorkflowBase,
) -> LaunchPlan
Users should probably call the get_or_create function defined below instead. A default launch plan is the one that will just pick up whatever default values are defined in the workflow function signature (if any) and use the default auth information supplied during serialization, with no notifications or schedules.
Parameter | Type |
---|---|
ctx | FlyteContext |
workflow | _annotated_workflow.WorkflowBase |
get_or_create()
def get_or_create(
workflow: _annotated_workflow.WorkflowBase,
name: Optional[str],
default_inputs: Optional[Dict[str, Any]],
fixed_inputs: Optional[Dict[str, Any]],
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
auth_role: Optional[_common_models.AuthRole],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
) -> LaunchPlan
This function offers a friendlier interface for creating launch plans. If the name for the launch plan is not supplied, this assumes you are looking for the default launch plan for the workflow. If it is specified, it will be used. If creating the default launch plan, none of the other arguments may be specified.
The resulting launch plan is also cached; if called again with the same name, the cached version is returned.
Parameter | Type |
---|---|
workflow | _annotated_workflow.WorkflowBase |
name | Optional[str] |
default_inputs | Optional[Dict[str, Any]] |
fixed_inputs | Optional[Dict[str, Any]] |
schedule | Optional[_schedule_model.Schedule] |
notifications | Optional[List[_common_models.Notification]] |
labels | Optional[_common_models.Labels] |
annotations | Optional[_common_models.Annotations] |
raw_output_data_config | Optional[_common_models.RawOutputDataConfig] |
max_parallelism | Optional[int] |
security_context | Optional[security.SecurityContext] |
auth_role | Optional[_common_models.AuthRole] |
trigger | Optional[LaunchPlanTriggerBase] |
overwrite_cache | Optional[bool] |
auto_activate | bool |
Properties
Property | Type | Description |
---|---|---|
annotations | | |
fixed_inputs | | |
interface | | |
labels | | |
max_parallelism | | |
name | | |
notifications | | |
overwrite_cache | | |
parameters | | |
python_interface | | |
raw_output_data_config | | |
saved_inputs | | |
schedule | | |
security_context | | |
should_auto_activate | | |
trigger | | |
workflow | | |
union.PodTemplate
Custom PodTemplate specification for a Task.
class PodTemplate(
pod_spec: typing.Optional[ForwardRef('V1PodSpec')],
primary_container_name: str,
labels: typing.Optional[typing.Dict[str, str]],
annotations: typing.Optional[typing.Dict[str, str]],
)
Parameter | Type |
---|---|
pod_spec | typing.Optional[ForwardRef('V1PodSpec')] |
primary_container_name | str |
labels | typing.Optional[typing.Dict[str, str]] |
annotations | typing.Optional[typing.Dict[str, str]] |
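A hedged sketch of a custom pod template passed to a task; the kubernetes client models are assumed to be installed, and the label is illustrative:

.. code-block:: python

   from kubernetes.client import V1Container, V1PodSpec
   from union import PodTemplate, task

   template = PodTemplate(
       primary_container_name="primary",
       labels={"owner": "data-team"},  # illustrative label
       pod_spec=V1PodSpec(
           containers=[V1Container(name="primary")],
       ),
   )

   @task(pod_template=template)
   def run() -> None:
       ...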
union.Resources
This class is used to specify both resource requests and resource limits.
.. code-block:: python
Resources(cpu="1", mem="2048") # This is 1 CPU and 2 KB of memory
Resources(cpu="100m", mem="2Gi") # This is 1/10th of a CPU and 2 gigabytes of memory
Resources(cpu=0.5, mem=1024) # This is 500m CPU and 1 KB of memory
# For Kubernetes-based tasks, pods use ephemeral local storage for scratch space, caching, and for logs.
# This allocates 1Gi of such local storage.
Resources(ephemeral_storage="1Gi")
When used together with @task(resources=...), you can specify the requests and limits with one object. When the value is set to a tuple or list, the first value is the request and the second value is the limit. If the value is a single value, then both the request and limit are set to that value. For example, Resources(cpu=("1", "2"), mem=1024) will set the cpu request to 1, the cpu limit to 2, and both the mem request and limit to 1024.
.. note::
Persistent storage is not currently supported on the Flyte backend.
Please see the :std:ref:User Guide <cookbook:customizing task resources> for detailed examples.
Also refer to the K8s conventions: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-units-in-kubernetes
class Resources(
cpu: typing.Union[str, int, float, list, tuple, NoneType],
mem: typing.Union[str, int, list, tuple, NoneType],
gpu: typing.Union[str, int, list, tuple, NoneType],
ephemeral_storage: typing.Union[str, int, NoneType],
)
Parameter | Type |
---|---|
cpu |
typing.Union[str, int, float, list, tuple, NoneType] |
mem |
typing.Union[str, int, list, tuple, NoneType] |
gpu |
typing.Union[str, int, list, tuple, NoneType] |
ephemeral_storage |
typing.Union[str, int, NoneType] |
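A minimal sketch of attaching resources to a task, using the tuple form described above to set distinct requests and limits:
.. code-block:: python

from union import Resources, task

# cpu request 1, cpu limit 2; mem and ephemeral storage request == limit.
@task(resources=Resources(cpu=("1", "2"), mem="2Gi", ephemeral_storage="1Gi"))
def heavy_task() -> None:
    ...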
Methods
Method | Description |
---|---|
from_dict() |
|
from_json() |
|
to_dict() |
|
to_json() |
from_dict()
def from_dict(
d,
dialect,
)
Parameter | Type |
---|---|
d |
|
dialect |
from_json()
def from_json(
data: typing.Union[str, bytes, bytearray],
decoder: collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]],
from_dict_kwargs: typing.Any,
) -> ~T
Parameter | Type |
---|---|
data |
typing.Union[str, bytes, bytearray] |
decoder |
collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]] |
from_dict_kwargs |
typing.Any |
to_dict()
def to_dict()
to_json()
def to_json(
encoder: collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]],
to_dict_kwargs: typing.Any,
) -> typing.Union[str, bytes, bytearray]
Parameter | Type |
---|---|
encoder |
collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]] |
to_dict_kwargs |
typing.Any |
union.Secret
See :std:ref:cookbook:secrets for usage examples.
class Secret(
group: typing.Optional[str],
key: typing.Optional[str],
group_version: typing.Optional[str],
mount_requirement: <enum 'MountType'>,
env_var: typing.Optional[str],
)
Parameter | Type |
---|---|
group |
typing.Optional[str] |
key |
typing.Optional[str] |
group_version |
typing.Optional[str] |
mount_requirement |
<enum 'MountType'> |
env_var |
typing.Optional[str] |
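A sketch of requesting a secret on a task and reading it at runtime (the group and key are placeholders and must exist in your secrets store):
.. code-block:: python

from union import Secret, current_context, task

@task(secret_requests=[Secret(group="db-creds", key="password")])
def use_secret() -> None:
    # Read the secret value made available to this task at runtime.
    password = current_context().secrets.get("db-creds", "password")
    print(len(password))  # use the secret without logging its value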
Methods
Method | Description |
---|---|
from_flyte_idl() |
|
serialize_to_string() |
|
short_string() |
Returns: Text. |
to_flyte_idl() |
|
verbose_string() |
Returns: Text. |
from_flyte_idl()
def from_flyte_idl(
pb2_object: flyteidl.core.security_pb2.Secret,
) -> Secret
Parameter | Type |
---|---|
pb2_object |
flyteidl.core.security_pb2.Secret |
serialize_to_string()
def serialize_to_string()
short_string()
def short_string()
Returns: Text
to_flyte_idl()
def to_flyte_idl()
verbose_string()
def verbose_string()
Returns: Text
Properties
Property | Type | Description |
---|---|---|
is_empty |
union.StructuredDataset
This is the user facing StructuredDataset class. Please don’t confuse it with the literals.StructuredDataset class (that is just a model, a Python class representation of the protobuf).
class StructuredDataset(
dataframe: typing.Optional[typing.Any],
uri: typing.Optional[str],
metadata: typing.Optional[literals.StructuredDatasetMetadata],
kwargs,
)
Parameter | Type |
---|---|
dataframe |
typing.Optional[typing.Any] |
uri |
typing.Optional[str] |
metadata |
typing.Optional[literals.StructuredDatasetMetadata] |
kwargs |
**kwargs |
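A sketch of producing and consuming a StructuredDataset with pandas (assumes pandas is installed):
.. code-block:: python

import pandas as pd
from union import StructuredDataset, task

@task
def make_dataset() -> StructuredDataset:
    df = pd.DataFrame({"name": ["Alice", "Bob"], "age": [30, 31]})
    return StructuredDataset(dataframe=df)

@task
def count_rows(sd: StructuredDataset) -> int:
    # open() binds the dataset to a dataframe type; all() materializes it.
    df = sd.open(pd.DataFrame).all()
    return len(df)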
Methods
Method | Description |
---|---|
all() |
|
column_names() |
|
columns() |
|
deserialize_structured_dataset() |
|
from_dict() |
|
from_json() |
|
iter() |
|
open() |
|
serialize_structured_dataset() |
|
set_literal() |
A public wrapper method to set the StructuredDataset Literal. |
to_dict() |
|
to_json() |
all()
def all()
column_names()
def column_names()
columns()
def columns()
deserialize_structured_dataset()
def deserialize_structured_dataset(
args,
kwargs,
)
Parameter | Type |
---|---|
args |
*args |
kwargs |
**kwargs |
from_dict()
def from_dict(
d,
dialect,
)
Parameter | Type |
---|---|
d |
|
dialect |
from_json()
def from_json(
data: typing.Union[str, bytes, bytearray],
decoder: collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]],
from_dict_kwargs: typing.Any,
) -> ~T
Parameter | Type |
---|---|
data |
typing.Union[str, bytes, bytearray] |
decoder |
collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]] |
from_dict_kwargs |
typing.Any |
iter()
def iter()
open()
def open(
dataframe_type: Type[DF],
)
Parameter | Type |
---|---|
dataframe_type |
Type[DF] |
serialize_structured_dataset()
def serialize_structured_dataset(
args,
kwargs,
)
Parameter | Type |
---|---|
args |
*args |
kwargs |
**kwargs |
set_literal()
def set_literal(
ctx: FlyteContext,
expected: LiteralType,
)
A public wrapper method to set the StructuredDataset Literal.
This method provides external access to the internal _set_literal method.
Parameter | Type |
---|---|
ctx |
FlyteContext |
expected |
LiteralType |
to_dict()
def to_dict()
to_json()
def to_json(
encoder: collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]],
to_dict_kwargs: typing.Any,
) -> typing.Union[str, bytes, bytearray]
Parameter | Type |
---|---|
encoder |
collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]] |
to_dict_kwargs |
typing.Any |
Properties
Property | Type | Description |
---|---|---|
dataframe |
||
literal |
||
metadata |
union.UnionRemote
Main entrypoint for programmatically accessing a Flyte remote backend.
The term ‘remote’ is synonymous with ‘backend’ or ‘deployment’ and refers to a hosted instance of the Flyte platform, which comes with a Flyte Admin server on some known URI.
class UnionRemote(
config: typing.Optional[Union[Config, str]],
default_project: typing.Optional[str],
default_domain: typing.Optional[str],
data_upload_location: str,
interactive_mode_enabled: typing.Optional[bool],
kwargs,
)
Initialize a FlyteRemote object.
kwargs: all arguments that can be passed when creating the SynchronousFlyteClient. These are usually gRPC parameters, useful for customizing credentials, SSL handling, etc.
Parameter | Type |
---|---|
config |
typing.Optional[Union[Config, str]] |
default_project |
typing.Optional[str] |
default_domain |
typing.Optional[str] |
data_upload_location |
str |
interactive_mode_enabled |
typing.Optional[bool] |
kwargs |
**kwargs |
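A sketch of constructing a remote and running a registered workflow (configuration is resolved from your local config file or environment; project, domain, workflow name, and version are placeholders):
.. code-block:: python

from union import UnionRemote

remote = UnionRemote(
    default_project="flytesnacks",
    default_domain="development",
)

wf = remote.fetch_workflow(name="my.workflows.wf", version="v1")
execution = remote.execute(wf, inputs={"x": 3}, wait=True)
print(execution.outputs)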
Methods
Method | Description |
---|---|
activate_launchplan() |
Given a launchplan, activate it, all previous versions are deactivated. |
approve() |
Approve a signal for the given execution. |
async_channel() |
|
auto() |
|
create_artifact() |
Create an artifact in FlyteAdmin. |
deploy_app() |
Deploy an application. |
download() |
Download the data to the specified location. |
execute() |
Execute a task, workflow, or launchplan, either something that’s been declared locally, or a fetched entity. |
execute_local_launch_plan() |
Execute a locally defined LaunchPlan . |
execute_local_task() |
Execute a @task-decorated function or TaskTemplate task. |
execute_local_workflow() |
Execute an @workflow decorated function. |
execute_reference_launch_plan() |
Execute a ReferenceLaunchPlan. |
execute_reference_task() |
Execute a ReferenceTask. |
execute_reference_workflow() |
Execute a ReferenceWorkflow. |
execute_remote_task_lp() |
Execute a FlyteTask, or FlyteLaunchplan. |
execute_remote_wf() |
Execute a FlyteWorkflow. |
fast_package() |
Packages the given paths into an installable zip and returns the md5_bytes and the URL of the uploaded location. |
fast_register_workflow() |
Use this method to register a workflow with zip mode. |
fetch_active_launchplan() |
Returns the active version of the launch plan if it exists or returns None. |
fetch_execution() |
Fetch a workflow execution entity from flyte admin. |
fetch_launch_plan() |
Fetch a launchplan entity from flyte admin. |
fetch_task() |
Fetch a task entity from flyte admin. |
fetch_task_lazy() |
Similar to fetch_task, just that it returns a LazyEntity, which will fetch the workflow lazily. |
fetch_workflow() |
Fetch a workflow entity from flyte admin. |
fetch_workflow_lazy() |
Similar to fetch_workflow, just that it returns a LazyEntity, which will fetch the workflow lazily. |
find_launch_plan() |
|
find_launch_plan_for_node() |
|
for_endpoint() |
|
for_sandbox() |
|
from_api_key() |
Call this if you want to directly instantiate a UnionRemote from an API key. |
generate_console_http_domain() |
This should generate the domain where console is hosted. |
generate_console_url() |
Generate a UnionAI console URL for the given Flyte remote endpoint. |
get() |
General function that works with flyte tiny urls. |
get_artifact() |
Get the specified artifact. |
get_domains() |
Lists registered domains from flyte admin. |
get_execution_metrics() |
Get the metrics for a given execution. |
get_extra_headers_for_protocol() |
|
launch_backfill() |
Creates and launches a backfill workflow for the given launchplan. |
list_projects() |
Lists registered projects from flyte admin. |
list_signals() |
List signals for the given execution. |
list_tasks_by_version() |
|
raw_register() |
Raw register method, can be used to register control plane entities. |
recent_executions() |
|
register_launch_plan() |
Register a given launchplan, possibly applying overrides from the provided options. |
register_script() |
Use this method to register a workflow via script mode. |
register_task() |
Register a qualified task (PythonTask) with Remote. |
register_workflow() |
Use this method to register a workflow. |
reject() |
Reject a signal for the given execution. |
remote_context() |
Context manager with remote-specific configuration. |
search_artifacts() |
|
set_input() |
Set the value of an input signal for the given execution. |
set_signal() |
Set a signal value for the given execution. |
stop_app() |
Stop an application. |
stream_execution_events() |
Stream execution events from the given tenant. |
sync() |
This function was previously a singledispatchmethod. |
sync_execution() |
Sync a FlyteWorkflowExecution object with its corresponding remote state. |
sync_node_execution() |
Get data backing a node execution. |
sync_task_execution() |
Sync a FlyteTaskExecution object with its corresponding remote state. |
terminate() |
Terminate a workflow execution. |
upload_file() |
Function will use remote’s client to hash and then upload the file using Admin’s data proxy service. |
wait() |
Wait for an execution to finish. |
activate_launchplan()
def activate_launchplan(
ident: Identifier,
)
Given a launch plan, activate it; all previous versions are deactivated.
Parameter | Type |
---|---|
ident |
Identifier |
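A sketch using the flytekit Identifier model (project, domain, name, and version are placeholders; remote is a UnionRemote as constructed above):
.. code-block:: python

from flytekit.models.core.identifier import Identifier, ResourceType

remote.activate_launchplan(
    Identifier(ResourceType.LAUNCH_PLAN, "flytesnacks", "development", "my_lp", "v1")
)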
approve()
def approve(
signal_id: str,
execution_name: str,
project: str,
domain: str,
)
Parameter | Type |
---|---|
signal_id |
str |
execution_name |
str |
project |
str |
domain |
str |
async_channel()
def async_channel()
auto()
def auto(
config_file: typing.Union[str, ConfigFile],
default_project: typing.Optional[str],
default_domain: typing.Optional[str],
data_upload_location: str,
interactive_mode_enabled: bool,
kwargs,
) -> 'FlyteRemote'
Parameter | Type |
---|---|
config_file |
typing.Union[str, ConfigFile] |
default_project |
typing.Optional[str] |
default_domain |
typing.Optional[str] |
data_upload_location |
str |
interactive_mode_enabled |
bool |
kwargs |
**kwargs |
create_artifact()
def create_artifact(
artifact: Artifact,
)
Create an artifact in FlyteAdmin. Returns the artifact as persisted in the service.
Parameter | Type |
---|---|
artifact |
Artifact |
deploy_app()
def deploy_app(
app: App,
project: Optional[str],
domain: Optional[str],
)
Deploy an application. Returns the App IDL for the deployed application.
Parameter | Type |
---|---|
app |
App |
project |
Optional[str] |
domain |
Optional[str] |
download()
def download(
data: typing.Union[LiteralsResolver, Literal, LiteralMap],
download_to: str,
recursive: bool,
)
Download the data to the specified location. If the data is a LiteralsResolver or LiteralMap and recursive is specified, then all file-like objects (e.g. FlyteFile/FlyteDirectory (blob), StructuredDataset, etc.) will be recursively downloaded.
Note: this will use your session's credentials to access the remote location. For sandbox, this should be automatically configured, assuming you are running the sandbox locally. For other environments, you will need to configure your credentials appropriately.
Parameter | Type |
---|---|
data |
typing.Union[LiteralsResolver, Literal, LiteralMap] |
download_to |
str |
recursive |
bool |
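For example (a sketch; remote is a UnionRemote as constructed above and the execution name is a placeholder):
.. code-block:: python

execution = remote.fetch_execution(
    project="flytesnacks", domain="development", name="abc123"
)
execution = remote.wait(execution)

# Recursively download all file-like outputs (FlyteFile/FlyteDirectory,
# StructuredDataset, ...) to a local directory.
remote.download(execution.outputs, download_to="/tmp/outputs", recursive=True)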
execute()
def execute(
entity: typing.Union[FlyteTask, FlyteLaunchPlan, FlyteWorkflow, PythonTask, WorkflowBase, LaunchPlan, ReferenceEntity],
inputs: typing.Dict[str, typing.Any],
project: str,
domain: str,
name: str,
version: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
image_config: typing.Optional[ImageConfig],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
serialization_settings: typing.Optional[SerializationSettings],
) -> FlyteWorkflowExecution
Execute a task, workflow, or launchplan, either something that’s been declared locally, or a fetched entity.
This method supports:
- Flyte{Task, Workflow, LaunchPlan} remote module objects.
- @task-decorated functions and TaskTemplate tasks.
- @workflow-decorated functions.
- LaunchPlan objects.
For local entities, this code will attempt to find the entity first, and if missing, will compile and register the object.
Not all arguments are relevant in all circumstances. For example, there’s no reason to use the serialization settings for entities that have already been registered on Admin.
Parameter | Type |
---|---|
entity |
typing.Union[FlyteTask, FlyteLaunchPlan, FlyteWorkflow, PythonTask, WorkflowBase, LaunchPlan, ReferenceEntity] |
inputs |
typing.Dict[str, typing.Any] |
project |
str |
domain |
str |
name |
str |
version |
str |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
image_config |
typing.Optional[ImageConfig] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
serialization_settings |
typing.Optional[SerializationSettings] |
execute_local_launch_plan()
def execute_local_launch_plan(
entity: LaunchPlan,
inputs: typing.Dict[str, typing.Any],
version: str,
project: typing.Optional[str],
domain: typing.Optional[str],
name: typing.Optional[str],
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
serialization_settings: typing.Optional[SerializationSettings],
) -> FlyteWorkflowExecution
Execute a locally defined LaunchPlan. Returns a FlyteWorkflowExecution object.
Parameter | Type |
---|---|
entity |
LaunchPlan |
inputs |
typing.Dict[str, typing.Any] |
version |
str |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
name |
typing.Optional[str] |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
serialization_settings |
typing.Optional[SerializationSettings] |
execute_local_task()
def execute_local_task(
entity: PythonTask,
inputs: typing.Dict[str, typing.Any],
project: str,
domain: str,
name: str,
version: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
image_config: typing.Optional[ImageConfig],
wait: bool,
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
options: typing.Optional[Options],
serialization_settings: typing.Optional[SerializationSettings],
) -> FlyteWorkflowExecution
Execute a @task-decorated function or TaskTemplate task. Returns a FlyteWorkflowExecution object.
Parameter | Type |
---|---|
entity |
PythonTask |
inputs |
typing.Dict[str, typing.Any] |
project |
str |
domain |
str |
name |
str |
version |
str |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
image_config |
typing.Optional[ImageConfig] |
wait |
bool |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
options |
typing.Optional[Options] |
serialization_settings |
typing.Optional[SerializationSettings] |
execute_local_workflow()
def execute_local_workflow(
entity: WorkflowBase,
inputs: typing.Dict[str, typing.Any],
project: str,
domain: str,
name: str,
version: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
image_config: typing.Optional[ImageConfig],
options: typing.Optional[Options],
wait: bool,
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
serialization_settings: typing.Optional[SerializationSettings],
) -> FlyteWorkflowExecution
Execute an @workflow-decorated function. Returns a FlyteWorkflowExecution object.
Parameter | Type |
---|---|
entity |
WorkflowBase |
inputs |
typing.Dict[str, typing.Any] |
project |
str |
domain |
str |
name |
str |
version |
str |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
image_config |
typing.Optional[ImageConfig] |
options |
typing.Optional[Options] |
wait |
bool |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
serialization_settings |
typing.Optional[SerializationSettings] |
execute_reference_launch_plan()
def execute_reference_launch_plan(
entity: ReferenceLaunchPlan,
inputs: typing.Dict[str, typing.Any],
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
) -> FlyteWorkflowExecution
Execute a ReferenceLaunchPlan.
Parameter | Type |
---|---|
entity |
ReferenceLaunchPlan |
inputs |
typing.Dict[str, typing.Any] |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
execute_reference_task()
def execute_reference_task(
entity: ReferenceTask,
inputs: typing.Dict[str, typing.Any],
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
) -> FlyteWorkflowExecution
Execute a ReferenceTask.
Parameter | Type |
---|---|
entity |
ReferenceTask |
inputs |
typing.Dict[str, typing.Any] |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
execute_reference_workflow()
def execute_reference_workflow(
entity: ReferenceWorkflow,
inputs: typing.Dict[str, typing.Any],
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
) -> FlyteWorkflowExecution
Execute a ReferenceWorkflow.
Parameter | Type |
---|---|
entity |
ReferenceWorkflow |
inputs |
typing.Dict[str, typing.Any] |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
execute_remote_task_lp()
def execute_remote_task_lp(
entity: typing.Union[FlyteTask, FlyteLaunchPlan],
inputs: typing.Dict[str, typing.Any],
project: str,
domain: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
) -> FlyteWorkflowExecution
Execute a FlyteTask, or FlyteLaunchplan.
NOTE: the name and version arguments are currently not used and are only there for consistency in the function signature.
Parameter | Type |
---|---|
entity |
typing.Union[FlyteTask, FlyteLaunchPlan] |
inputs |
typing.Dict[str, typing.Any] |
project |
str |
domain |
str |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
execute_remote_wf()
def execute_remote_wf(
entity: FlyteWorkflow,
inputs: typing.Dict[str, typing.Any],
project: str,
domain: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
) -> FlyteWorkflowExecution
Execute a FlyteWorkflow.
NOTE: the name and version arguments are currently not used and are only there for consistency in the function signature.
Parameter | Type |
---|---|
entity |
FlyteWorkflow |
inputs |
typing.Dict[str, typing.Any] |
project |
str |
domain |
str |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
fast_package()
def fast_package(
root: os.PathLike,
deref_symlinks: bool,
output: str,
options: typing.Optional[FastPackageOptions],
)
Packages the given paths into an installable zip and returns the md5_bytes and the URL of the uploaded location.
Parameter | Type |
---|---|
root |
os.PathLike |
deref_symlinks |
bool |
output |
str |
options |
typing.Optional[FastPackageOptions] |
fast_register_workflow()
def fast_register_workflow(
entity: WorkflowBase,
serialization_settings: typing.Optional[SerializationSettings],
version: typing.Optional[str],
default_launch_plan: typing.Optional[bool],
options: typing.Optional[Options],
fast_package_options: typing.Optional[FastPackageOptions],
)
Use this method to register a workflow with zip mode.
Parameter | Type |
---|---|
entity |
WorkflowBase |
serialization_settings |
typing.Optional[SerializationSettings] |
version |
typing.Optional[str] |
default_launch_plan |
typing.Optional[bool] |
options |
typing.Optional[Options] |
fast_package_options |
typing.Optional[FastPackageOptions] |
fetch_active_launchplan()
def fetch_active_launchplan(
project: str,
domain: str,
name: str,
) -> typing.Optional[FlyteLaunchPlan]
Returns the active version of the launch plan if it exists, or None.
Parameter | Type |
---|---|
project |
str |
domain |
str |
name |
str |
fetch_execution()
def fetch_execution(
project: str,
domain: str,
name: str,
) -> FlyteWorkflowExecution
Fetch a workflow execution entity from flyte admin.
Parameter | Type |
---|---|
project |
str |
domain |
str |
name |
str |
fetch_launch_plan()
def fetch_launch_plan(
project: str,
domain: str,
name: str,
version: str,
) -> FlyteLaunchPlan
Fetch a launchplan entity from flyte admin.
Parameter | Type |
---|---|
project |
str |
domain |
str |
name |
str |
version |
str |
fetch_task()
def fetch_task(
project: str,
domain: str,
name: str,
version: str,
) -> FlyteTask
Fetch a task entity from flyte admin.
Parameter | Type |
---|---|
project |
str |
domain |
str |
name |
str |
version |
str |
fetch_task_lazy()
def fetch_task_lazy(
project: str,
domain: str,
name: str,
version: str,
) -> LazyEntity
Similar to fetch_task, just that it returns a LazyEntity, which will fetch the task lazily.
Parameter | Type |
---|---|
project |
str |
domain |
str |
name |
str |
version |
str |
fetch_workflow()
def fetch_workflow(
project: str,
domain: str,
name: str,
version: str,
) -> FlyteWorkflow
Fetch a workflow entity from flyte admin.
Parameter | Type |
---|---|
project |
str |
domain |
str |
name |
str |
version |
str |
fetch_workflow_lazy()
def fetch_workflow_lazy(
project: str,
domain: str,
name: str,
version: str,
) -> LazyEntity[FlyteWorkflow]
Similar to fetch_workflow, just that it returns a LazyEntity, which will fetch the workflow lazily.
Parameter | Type |
---|---|
project |
str |
domain |
str |
name |
str |
version |
str |
find_launch_plan()
def find_launch_plan(
lp_ref: id_models,
node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec],
)
Parameter | Type |
---|---|
lp_ref |
id_models |
node_launch_plans |
Dict[id_models, launch_plan_models.LaunchPlanSpec] |
find_launch_plan_for_node()
def find_launch_plan_for_node(
node: Node,
node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec],
)
Parameter | Type |
---|---|
node |
Node |
node_launch_plans |
Dict[id_models, launch_plan_models.LaunchPlanSpec] |
for_endpoint()
def for_endpoint(
endpoint: str,
insecure: bool,
data_config: typing.Optional[DataConfig],
config_file: typing.Union[str, ConfigFile],
default_project: typing.Optional[str],
default_domain: typing.Optional[str],
data_upload_location: str,
interactive_mode_enabled: bool,
kwargs,
) -> 'FlyteRemote'
Parameter | Type |
---|---|
endpoint |
str |
insecure |
bool |
data_config |
typing.Optional[DataConfig] |
config_file |
typing.Union[str, ConfigFile] |
default_project |
typing.Optional[str] |
default_domain |
typing.Optional[str] |
data_upload_location |
str |
interactive_mode_enabled |
bool |
kwargs |
**kwargs |
for_sandbox()
def for_sandbox(
default_project: typing.Optional[str],
default_domain: typing.Optional[str],
data_upload_location: str,
interactive_mode_enabled: bool,
kwargs,
) -> 'FlyteRemote'
Parameter | Type |
---|---|
default_project |
typing.Optional[str] |
default_domain |
typing.Optional[str] |
data_upload_location |
str |
interactive_mode_enabled |
bool |
kwargs |
**kwargs |
from_api_key()
def from_api_key(
api_key: str,
default_project: typing.Optional[str],
default_domain: typing.Optional[str],
data_upload_location: str,
kwargs,
) -> 'UnionRemote'
Call this if you want to directly instantiate a UnionRemote from an API key
Parameter | Type |
---|---|
api_key |
str |
default_project |
typing.Optional[str] |
default_domain |
typing.Optional[str] |
data_upload_location |
str |
kwargs |
**kwargs |
generate_console_http_domain()
def generate_console_http_domain()
This should generate the domain where the console is hosted.
generate_console_url()
def generate_console_url(
entity: typing.Union[FlyteWorkflowExecution, FlyteNodeExecution, FlyteTaskExecution, FlyteWorkflow, FlyteTask, FlyteLaunchPlan, Artifact],
)
Generate a Union AI console URL for the given Flyte remote endpoint. It also handles Union AI specific entities like Artifacts.
This will automatically determine whether the given object is an execution or another entity and generate the appropriate URL.
Parameter | Type |
---|---|
entity |
typing.Union[FlyteWorkflowExecution, FlyteNodeExecution, FlyteTaskExecution, FlyteWorkflow, FlyteTask, FlyteLaunchPlan, Artifact] |
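For instance (a sketch; execution is a previously fetched or launched entity):
.. code-block:: python

url = remote.generate_console_url(execution)
print(f"Inspect the run at: {url}")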
get()
def get(
uri: typing.Optional[str],
) -> typing.Optional[typing.Union[LiteralsResolver, Literal, bytes]]
General function that works with flyte tiny urls. This can return outputs (in the form of a LiteralsResolver, or individual Literals for singular requests), HTML if passed a deck link, or bytes containing HTML if IPython is not available locally.
Parameter | Type |
---|---|
uri |
typing.Optional[str] |
get_artifact()
def get_artifact(
uri: typing.Optional[str],
artifact_key: typing.Optional[art_id.ArtifactKey],
artifact_id: typing.Optional[art_id.ArtifactID],
query: typing.Optional[typing.Union[art_id.ArtifactQuery, ArtifactQuery]],
get_details: bool,
) -> n: The artifact as persisted in the service.
Get the specified artifact.
Parameter | Type |
---|---|
uri |
typing.Optional[str] |
artifact_key |
typing.Optional[art_id.ArtifactKey] |
artifact_id |
typing.Optional[art_id.ArtifactID] |
query |
typing.Optional[typing.Union[art_id.ArtifactQuery, ArtifactQuery]] |
get_details |
bool |
get_domains()
def get_domains()
Lists registered domains from flyte admin.
Returns: typing.List[flytekit.models.domain.Domain]
get_execution_metrics()
def get_execution_metrics(
id: WorkflowExecutionIdentifier,
depth: int,
) -> FlyteExecutionSpan
Get the metrics for a given execution.
Parameter | Type |
---|---|
id |
WorkflowExecutionIdentifier |
depth |
int |
get_extra_headers_for_protocol()
def get_extra_headers_for_protocol(
native_url,
)
Parameter | Type |
---|---|
native_url |
launch_backfill()
def launch_backfill(
project: str,
domain: str,
from_date: datetime,
to_date: datetime,
launchplan: str,
launchplan_version: str,
execution_name: str,
version: str,
dry_run: bool,
execute: bool,
parallel: bool,
failure_policy: typing.Optional[WorkflowFailurePolicy],
overwrite_cache: typing.Optional[bool],
)
Creates and launches a backfill workflow for the given launch plan. If the launch plan version is not specified, the latest launch plan is retrieved. The from_date is exclusive and the to_date is inclusive; the backfill runs for all instances in between (start_date exclusive, end_date inclusive).
If dry_run is specified, the workflow is created and returned as a WorkflowBase. If execute is False, the workflow is created and registered and a FlyteWorkflow is returned. In the default case, the workflow is created, registered, and executed.
The parallel flag can be used to generate a workflow where all launch plans run in parallel; by default, the backfill is executed sequentially.
Parameter | Type |
---|---|
project |
str |
domain |
str |
from_date |
datetime |
to_date |
datetime |
launchplan |
str |
launchplan_version |
str |
execution_name |
str |
version |
str |
dry_run |
bool |
execute |
bool |
parallel |
bool |
failure_policy |
typing.Optional[WorkflowFailurePolicy] |
overwrite_cache |
typing.Optional[bool] |
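A sketch of backfilling a scheduled launch plan over a date range (project, domain, and launch plan name are placeholders; remote is a UnionRemote as constructed above):
.. code-block:: python

from datetime import datetime

remote.launch_backfill(
    project="flytesnacks",
    domain="development",
    from_date=datetime(2024, 1, 1),
    to_date=datetime(2024, 1, 10),
    launchplan="my_scheduled_lp",
    execute=True,    # register and execute the generated backfill workflow
    parallel=False,  # run the backfilled instances sequentially
)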
list_projects()
def list_projects(
limit: typing.Optional[int],
filters: typing.Optional[typing.List[filter_models.Filter]],
sort_by: typing.Optional[admin_common_models.Sort],
) -> typing.List[Project]
Lists registered projects from flyte admin.
Parameter | Type |
---|---|
limit |
typing.Optional[int] |
filters |
typing.Optional[typing.List[filter_models.Filter]] |
sort_by |
typing.Optional[admin_common_models.Sort] |
list_signals()
def list_signals(
execution_name: str,
project: typing.Optional[str],
domain: typing.Optional[str],
limit: int,
filters: typing.Optional[typing.List[filter_models.Filter]],
) -> typing.List[Signal]
Parameter | Type |
---|---|
execution_name |
str |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
limit |
int |
filters |
typing.Optional[typing.List[filter_models.Filter]] |
list_tasks_by_version()
def list_tasks_by_version(
version: str,
project: typing.Optional[str],
domain: typing.Optional[str],
limit: typing.Optional[int],
) -> typing.List[FlyteTask]
Parameter | Type |
---|---|
version |
str |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
limit |
typing.Optional[int] |
raw_register()
def raw_register(
cp_entity: FlyteControlPlaneEntity,
settings: SerializationSettings,
version: str,
create_default_launchplan: bool,
options: Options,
og_entity: FlyteLocalEntity,
)
Raw register method; can be used to register control plane entities. Usually, if you have a Flyte entity like a WorkflowBase, Task, or LaunchPlan, use the other methods. This should be used only if you have already serialized entities. Returns the Identifier of the created entity.
Parameter | Type |
---|---|
cp_entity |
FlyteControlPlaneEntity |
settings |
SerializationSettings |
version |
str |
create_default_launchplan |
bool |
options |
Options |
og_entity |
FlyteLocalEntity |
recent_executions()
def recent_executions(
project: typing.Optional[str],
domain: typing.Optional[str],
limit: typing.Optional[int],
filters: typing.Optional[typing.List[filter_models.Filter]],
) -> typing.List[FlyteWorkflowExecution]
Parameter | Type |
---|---|
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
limit |
typing.Optional[int] |
filters |
typing.Optional[typing.List[filter_models.Filter]] |
register_launch_plan()
def register_launch_plan(
entity: LaunchPlan,
version: typing.Optional[str],
project: typing.Optional[str],
domain: typing.Optional[str],
options: typing.Optional[Options],
serialization_settings: typing.Optional[SerializationSettings],
) -> FlyteLaunchPlan
Register a given launchplan, possibly applying overrides from the provided options. If the underlying workflow is not already registered, it, along with any underlying entities, will also be registered. If the underlying workflow does exist (with the given project/domain/version), then only the launchplan will be registered.
Parameter | Type |
---|---|
entity |
LaunchPlan |
version |
typing.Optional[str] |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
options |
typing.Optional[Options] |
serialization_settings |
typing.Optional[SerializationSettings] |
register_script()
def register_script(
entity: typing.Union[WorkflowBase, PythonTask, LaunchPlan],
image_config: typing.Optional[ImageConfig],
version: typing.Optional[str],
project: typing.Optional[str],
domain: typing.Optional[str],
destination_dir: str,
copy_all: bool,
default_launch_plan: bool,
options: typing.Optional[Options],
source_path: typing.Optional[str],
module_name: typing.Optional[str],
envs: typing.Optional[typing.Dict[str, str]],
fast_package_options: typing.Optional[FastPackageOptions],
)
Use this method to register a workflow via script mode.
Parameter | Type |
---|---|
entity |
typing.Union[WorkflowBase, PythonTask, LaunchPlan] |
image_config |
typing.Optional[ImageConfig] |
version |
typing.Optional[str] |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
destination_dir |
str |
copy_all |
bool |
default_launch_plan |
bool |
options |
typing.Optional[Options] |
source_path |
typing.Optional[str] |
module_name |
typing.Optional[str] |
envs |
typing.Optional[typing.Dict[str, str]] |
fast_package_options |
typing.Optional[FastPackageOptions] |
register_task()
def register_task(
entity: PythonTask,
serialization_settings: typing.Optional[SerializationSettings],
version: typing.Optional[str],
)
Register a qualified task (PythonTask) with Remote. For any conflicting parameters, method arguments are regarded as overrides.
Parameter | Type |
---|---|
entity |
PythonTask |
serialization_settings |
typing.Optional[SerializationSettings] |
version |
typing.Optional[str] |
register_workflow()
def register_workflow(
entity: WorkflowBase,
serialization_settings: typing.Optional[SerializationSettings],
version: typing.Optional[str],
default_launch_plan: typing.Optional[bool],
options: typing.Optional[Options],
)
Use this method to register a workflow.
Parameter | Type |
---|---|
entity |
WorkflowBase |
serialization_settings |
typing.Optional[SerializationSettings] |
version |
typing.Optional[str] |
default_launch_plan |
typing.Optional[bool] |
options |
typing.Optional[Options] |
reject()
def reject(
signal_id: str,
execution_name: str,
project: str,
domain: str,
)
Parameter | Type |
---|---|
signal_id |
str |
execution_name |
str |
project |
str |
domain |
str |
remote_context()
def remote_context()
Context manager with remote-specific configuration.
search_artifacts()
def search_artifacts(
project: typing.Optional[str],
domain: typing.Optional[str],
name: typing.Optional[str],
artifact_key: typing.Optional[art_id.ArtifactKey],
query: typing.Optional[ArtifactQuery],
partitions: typing.Optional[Union[Partitions, typing.Dict[str, str]]],
time_partition: typing.Optional[Union[datetime.datetime, TimePartition]],
group_by_key: bool,
limit: int,
) -> typing.List[Artifact]
Parameter | Type |
---|---|
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
name |
typing.Optional[str] |
artifact_key |
typing.Optional[art_id.ArtifactKey] |
query |
typing.Optional[ArtifactQuery] |
partitions |
typing.Optional[Union[Partitions, typing.Dict[str, str]]] |
time_partition |
typing.Optional[Union[datetime.datetime, TimePartition]] |
group_by_key |
bool |
limit |
int |
set_input()
def set_input(
signal_id: str,
execution_name: str,
value: typing.Union[literal_models.Literal, typing.Any],
project,
domain,
python_type,
literal_type,
)
Parameter | Type |
---|---|
signal_id |
str |
execution_name |
str |
value |
typing.Union[literal_models.Literal, typing.Any] |
project |
|
domain |
|
python_type |
|
literal_type |
set_signal()
def set_signal(
signal_id: str,
execution_name: str,
value: typing.Union[literal_models.Literal, typing.Any],
project: typing.Optional[str],
domain: typing.Optional[str],
python_type: typing.Optional[typing.Type],
literal_type: typing.Optional[type_models.LiteralType],
)
Parameter | Type |
---|---|
signal_id |
str |
execution_name |
str |
value |
typing.Union[literal_models.Literal, typing.Any] |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
python_type |
typing.Optional[typing.Type] |
literal_type |
typing.Optional[type_models.LiteralType] |
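For example (a sketch; the signal and execution names are placeholders), feeding a waiting gate node:
.. code-block:: python

# Satisfy a wait_for_input gate by supplying a value...
remote.set_signal("my-signal", "abc123", value=42, python_type=int)

# ...or approve a boolean approval gate.
remote.approve(signal_id="approval-gate", execution_name="abc123",
               project="flytesnacks", domain="development")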
stop_app()
def stop_app(
name: str,
project: Optional[str],
domain: Optional[str],
)
Stop an application. Returns the App IDL for the stopped application.
Parameter | Type |
---|---|
name |
str |
project |
Optional[str] |
domain |
Optional[str] |
stream_execution_events()
def stream_execution_events(
event_count: Optional[int],
include_workflow_executions: bool,
include_task_executions: bool,
include_node_executions: bool,
) -> AsyncGenerator[Union[CloudEventWorkflowExecution, CloudEventNodeExecution, CloudEventTaskExecution], None]
Stream execution events from the given tenant. This is a generator that yields events as they are received.
Events are guaranteed to be delivered at least once, and clients must implement handling for potentially out-of-order event processing. Events will be retransmitted until acknowledged, with acknowledgment occurring automatically upon normal return from the caller. Note: if an exception is raised during event processing, the acknowledgment will not occur, and the event will be redelivered in a subsequent transmission.
Parameter | Type |
---|---|
event_count |
Optional[int] |
include_workflow_executions |
bool |
include_task_executions |
bool |
include_node_executions |
bool |
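Since this returns an async generator, it is consumed with async for; a sketch (the event handling is illustrative):
.. code-block:: python

import asyncio

async def watch() -> None:
    # Stop after 10 events; returning normally acknowledges them.
    async for event in remote.stream_execution_events(event_count=10):
        print(type(event).__name__)

asyncio.run(watch())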
sync()
def sync(
execution: FlyteWorkflowExecution,
entity_definition: typing.Union[FlyteWorkflow, FlyteTask],
sync_nodes: bool,
) -> FlyteWorkflowExecution
This function was previously a singledispatchmethod; that has been removed, but the function remains so that existing callers don't break. Returns the same execution object, but with additional information pulled in.
Parameter | Type |
---|---|
execution |
FlyteWorkflowExecution |
entity_definition |
typing.Union[FlyteWorkflow, FlyteTask] |
sync_nodes |
bool |
sync_execution()
def sync_execution(
execution: FlyteWorkflowExecution,
entity_definition: typing.Union[FlyteWorkflow, FlyteTask],
sync_nodes: bool,
) -> FlyteWorkflowExecution
Sync a FlyteWorkflowExecution object with its corresponding remote state.
Parameter | Type |
---|---|
execution |
FlyteWorkflowExecution |
entity_definition |
typing.Union[FlyteWorkflow, FlyteTask] |
sync_nodes |
bool |
sync_node_execution()
def sync_node_execution(
execution: FlyteNodeExecution,
node_mapping: typing.Dict[str, FlyteNode],
) -> FlyteNodeExecution
Get data backing a node execution. These FlyteNodeExecution objects should’ve come from Admin with the model fields already populated correctly. For purposes of the remote experience, we’d like to supplement the object with some additional fields:
- inputs/outputs
- task/workflow executions, and/or underlying node executions in the case of parent nodes
- TypedInterface (remote wrapper type)
A node can have several different types of executions behind it. That is, the node could’ve run (perhaps multiple times because of retries):
- A task
- A static subworkflow
- A dynamic subworkflow (which in turn may have run additional tasks, subwfs, and/or launch plans)
- A launch plan
The data model is complicated, so ascertaining which of these happened is a bit tricky. That logic is encapsulated in this function.
Parameter | Type |
---|---|
execution |
FlyteNodeExecution |
node_mapping |
typing.Dict[str, FlyteNode] |
sync_task_execution()
def sync_task_execution(
execution: FlyteTaskExecution,
entity_interface: typing.Optional[TypedInterface],
) -> FlyteTaskExecution
Sync a FlyteTaskExecution object with its corresponding remote state.
Parameter | Type |
---|---|
execution |
FlyteTaskExecution |
entity_interface |
typing.Optional[TypedInterface] |
terminate()
def terminate(
execution: FlyteWorkflowExecution,
cause: str,
)
Terminate a workflow execution.
Parameter | Type |
---|---|
execution |
FlyteWorkflowExecution |
cause |
str |
upload_file()
def upload_file(
to_upload: pathlib.Path,
project: typing.Optional[str],
domain: typing.Optional[str],
filename_root: typing.Optional[str],
)
Uses the remote's client to hash and then upload the file using Admin's data proxy service. Returns the uploaded location.
Parameter | Type |
---|---|
to_upload |
pathlib.Path |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
filename_root |
typing.Optional[str] |
wait()
def wait(
execution: FlyteWorkflowExecution,
timeout: typing.Optional[typing.Union[timedelta, int]],
poll_interval: typing.Optional[typing.Union[timedelta, int]],
sync_nodes: bool,
) -> FlyteWorkflowExecution
Wait for an execution to finish.
Parameter | Type |
---|---|
execution |
FlyteWorkflowExecution |
timeout |
typing.Optional[typing.Union[timedelta, int]] |
poll_interval |
typing.Optional[typing.Union[timedelta, int]] |
sync_nodes |
bool |
Properties
Property | Type | Description |
---|---|---|
apps_service_client |
||
artifacts_client |
||
client |
Return a SynchronousFlyteClient for additional operations. |
|
config |
Image config. |
|
context |
||
default_domain |
Default domain to use when fetching or executing flyte entities. |
|
default_project |
Default project to use when fetching or executing flyte entities. |
|
file_access |
File access provider to use for offloading non-literal inputs/outputs. |
|
hooks_async_client |
||
hooks_sync_client |
||
images_client |
||
interactive_mode_enabled |
If set to True, the FlyteRemote will pickle the task/workflow. |
|
secret_client |
||
sync_channel |
Return channel from client. This channel already has the org passed in dynamically by the interceptor. |
|
users_client |
union.VersionParameters
Parameters used for version hash generation.
func: The function to generate a version for. This is an optional parameter and can be any callable that matches the specified parameter and return types (Optional[Callable[P, FuncOut]]).
class VersionParameters(
func: typing.Callable[~P, ~FuncOut],
container_image: typing.Union[str, flytekit.image_spec.image_spec.ImageSpec, NoneType],
pod_template: typing.Optional[flytekit.core.pod_template.PodTemplate],
pod_template_name: typing.Optional[str],
)
Parameter | Type |
---|---|
func |
typing.Callable[~P, ~FuncOut] |
container_image |
typing.Union[str, flytekit.image_spec.image_spec.ImageSpec, NoneType] |
pod_template |
typing.Optional[flytekit.core.pod_template.PodTemplate] |
pod_template_name |
typing.Optional[str] |
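A minimal sketch of constructing VersionParameters for a plain function (the function and image reference are placeholders; the resulting object feeds version hash generation):
.. code-block:: python

from union import VersionParameters

def my_fn(x: int) -> int:
    return x + 1

params = VersionParameters(func=my_fn, container_image="ghcr.io/org/image:latest")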