flytekit.remote.remote
This module provides the FlyteRemote
object, which is the end-user’s main starting point for interacting
with a Flyte backend in an interactive and programmatic way. Think of this experience as kind of like the web UI,
but in Python object form.
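A minimal sketch of that experience, assuming a reachable Flyte backend; the endpoint, project, domain, workflow name, and version below are placeholders:

```python
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

# Connect to a Flyte backend (the endpoint is illustrative).
remote = FlyteRemote(
    config=Config.for_endpoint(endpoint="flyte.example.com"),
    default_project="flytesnacks",
    default_domain="development",
)

# Fetch a registered workflow and run it, waiting for completion.
wf = remote.fetch_workflow(name="my.workflows.wf", version="v1")
execution = remote.execute(wf, inputs={"a": 5}, wait=True)
print(execution.outputs)
```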
Directory
Classes
| Class | Description |
|---|---|
| ArrayNodeMapTask | Base class for all tasks with a Python native interface. |
| Artifact | An Artifact is effectively just a metadata layer on top of data that exists in Flyte. |
| BranchNode | A special node that alters the flow of the workflow graph based on conditions evaluated at runtime. |
| ClusterAssignment | None. |
| Config | This is the parent configuration object and holds all the underlying configuration object types. |
| ConfigFile | Loads configuration from a given location. |
| CopyFileDetection | An enumeration of file-copy detection modes. |
| CoreNode | Holds all the things necessary to make an SdkNode, but one isn't made until things like the ID (which comes from the registration step) are known. |
| DataConfig | Any data-storage-specific configuration. |
| Domain | Domains are fixed and unique at the global level, and provide an abstraction to isolate resources and feature configuration for different deployment environments. |
| ExecutionClusterLabel | Label value to determine where the execution will be run. |
| ExecutionMetadata | None. |
| ExecutionSpec | None. |
| FastPackageOptions | FastPackageOptions is used to set configuration options when packaging files. |
| FastSerializationSettings | This object holds information about settings necessary to serialize an object so that it can be fast-registered. |
| FileAccessProvider | This is the class that is available through the FlyteContext and can be used for persisting data to the remote durable store. |
| FlyteContext | This is an internal-facing context object that most users will not have to deal with. |
| FlyteContextManager | FlyteContextManager manages the execution context within Flytekit. |
| FlyteExecutionSpan | None. |
| FlyteLaunchPlan | A class encapsulating a remote Flyte launch plan. |
| FlyteNode | A class encapsulating a remote Flyte node. |
| FlyteNodeExecution | A class encapsulating a node execution being run on a Flyte remote backend. |
| FlyteRemote | Main entrypoint for programmatically accessing a Flyte remote backend. |
| FlyteTask | A class encapsulating a remote Flyte task. |
| FlyteTaskExecution | A class encapsulating a task execution being run on a Flyte remote backend. |
| FlyteTaskNode | A class encapsulating a task that a Flyte node needs to execute. |
| FlyteWorkflow | A class encapsulating a remote Flyte workflow. |
| FlyteWorkflowExecution | A class encapsulating a workflow execution being run on a Flyte remote backend. |
| Identifier | None. |
| ImageConfig | Holds the image configuration; we recommend constructing it via ImageConfig.auto(). |
| ImageSpec | This class is used to specify the Docker image that will be used to run the task. |
| LaunchPlan | Launch plans are one of the core constructs of Flyte. |
| LaunchPlanState | None. |
| LazyEntity | Fetches the entity when the entity is called or when the entity is retrieved. |
| Literal | None. |
| LiteralMap | None. |
| LiteralsResolver | LiteralsResolver is a helper class meant primarily for use with the FlyteRemote experience, or any other situation where you need to resolve Literals into Python native values. |
| NamedEntityIdentifier | None. |
| Node | None. |
| NodeExecutionGetDataResponse | Currently, node, task, and workflow execution all have the same get-data response. |
| NodeMetadata | None. |
| NotificationList | None. |
| Options | Options that can be configured for a launch plan during registration or overridden during an execution. |
| OrderedDict | Dictionary that remembers insertion order. |
| PickledEntity | Represents the structure of the pickled object stored in the pickle file. |
| PickledEntityMetadata | Metadata for a pickled entity containing version information. |
| Progress | Renders auto-updating progress bar(s). |
| Project | None. |
| PythonAutoContainerTask | A Python AutoContainer task should be used as the base for all extensions that want the user's code to be in the container. |
| PythonFunctionTask | A Python Function task should be used as the base for all extensions that have a Python function. |
| PythonFunctionWorkflow | Please read :std:ref:`flyte:divedeep-workflows` first for a high-level understanding of what workflows are in Flyte. |
| PythonTask | Base class for all tasks with a Python native interface. |
| ReferenceEntity | None. |
| ReferenceLaunchPlan | A reference launch plan serves as a pointer to a launch plan that already exists on your Flyte installation. |
| ReferenceSpec | None. |
| ReferenceTask | A reference task; the body of the function passed in through the constructor is never used, only its signature. |
| ReferenceWorkflow | A reference workflow is a pointer to a workflow that already exists on your Flyte installation. |
| RemoteEntity | Helper class that provides a standard way to create an ABC using inheritance. |
| ResolvedIdentifiers | None. |
| ResourceType | None. |
| SerializationSettings | These settings are provided while serializing a workflow and task, before registration. |
| Signal | A ProtocolMessage. |
| SignalIdentifier | None. |
| SignalListRequest | A ProtocolMessage. |
| SignalSetRequest | A ProtocolMessage. |
| Sort | None. |
| SynchronousFlyteClient | A low-level client that users can use to make direct gRPC service calls to the control plane. |
| TextColumn | A column containing text. |
| TimeElapsedColumn | Renders time elapsed. |
| TypeEngine | Core extensible TypeEngine of Flytekit. |
| TypedInterface | None. |
| WorkflowBase | None. |
| WorkflowExecutionGetDataResponse | Currently, node, task, and workflow execution all have the same get-data response. |
| WorkflowExecutionIdentifier | None. |
| WorkflowFailurePolicy | Defines the behavior for a workflow execution in the case of an observed node execution failure. |
| datetime | datetime(year, month, day[, hour[, minute[, second[, microsecond[, tzinfo]]]]]). |
| timedelta | Difference between two datetime values. |
Errors
| Error | Description |
|---|---|
| FlyteAssertion | A user exception raised when a flytekit assertion or precondition fails. |
| FlyteEntityAlreadyExistsException | Raised when registering an entity that already exists on the backend. |
| FlyteEntityNotExistException | Raised when a referenced entity does not exist on the backend. |
flytekit.remote.remote.ArrayNodeMapTask
Base class for all tasks with a Python native interface. This should be used directly for task types that do
not have a Python function to be executed. Otherwise, refer to :py:class:`flytekit.PythonFunctionTask`.
def ArrayNodeMapTask(
python_function_task: typing.Union[flytekit.core.python_function_task.PythonFunctionTask, flytekit.core.python_function_task.PythonInstanceTask, functools.partial],
concurrency: typing.Optional[int],
min_successes: typing.Optional[int],
min_success_ratio: typing.Optional[float],
bound_inputs: typing.Optional[typing.Set[str]],
bound_inputs_values: typing.Optional[typing.Dict[str, typing.Any]],
kwargs,
):
Parameter |
Type |
python_function_task |
typing.Union[flytekit.core.python_function_task.PythonFunctionTask, flytekit.core.python_function_task.PythonInstanceTask, functools.partial] |
concurrency |
typing.Optional[int] |
min_successes |
typing.Optional[int] |
min_success_ratio |
typing.Optional[float] |
bound_inputs |
typing.Optional[typing.Set[str]] |
bound_inputs_values |
typing.Optional[typing.Dict[str, typing.Any]] |
kwargs |
**kwargs |
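ArrayNodeMapTask is usually not constructed by hand; a map task is typically created with flytekit's map_task helper, which wraps a @task-decorated function. A minimal sketch (the task and workflow here are illustrative):

```python
from typing import List

from flytekit import map_task, task, workflow


@task
def double(x: int) -> int:
    return x * 2


@workflow
def wf(xs: List[int]) -> List[int]:
    # map_task wraps `double` in an ArrayNodeMapTask and fans out over xs;
    # concurrency bounds how many sub-tasks run at once.
    return map_task(double, concurrency=4)(x=xs)
```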
Methods
| Method | Description |
|---|---|
| compile() | Generates a node that encapsulates this task in a workflow definition. |
| construct_node_metadata() | Returns metadata for the parent ArrayNode, not the sub-node getting mapped over. |
| dispatch_execute() | Translates Flyte's type-system-based input values and invokes the actual call to the executor. |
| execute() | This method will be invoked to execute the task. |
| find_lhs() | None. |
| get_command() | TODO: add bound variables to the resolver. |
| get_config() | Returns the task config as a serializable dictionary. |
| get_container() | Returns the container definition (if any) that is used to run the task on hosted Flyte. |
| get_custom() | Returns additional plugin-specific custom data (if any) as a serializable dictionary. |
| get_extended_resources() | Returns the extended resources to allocate to the task on hosted Flyte. |
| get_input_types() | Returns the names and Python types, as a dictionary, for the inputs of this task. |
| get_k8s_pod() | Returns the Kubernetes pod definition (if any) that is used to run the task on hosted Flyte. |
| get_sql() | Returns the SQL definition (if any) that is used to run the task on hosted Flyte. |
| get_type_for_input_var() | Returns the Python type for an input variable by name. |
| get_type_for_output_var() | Overridden from flytekit's base Task; see below. |
| local_execute() | Used only in the local execution path; responsible for calling dispatch_execute. |
| local_execution_mode() | None. |
| post_execute() | Called after the execution has completed; can be used to clean up or alter the outputs. |
| pre_execute() | Invoked directly before executing the task method and before the inputs are converted. |
| prepare_target() | Alters the underlying run_task command for map task execution and then resets it. |
| sandbox_execute() | Calls dispatch_execute in the context of a local sandbox execution. |
| set_command_prefix() | None. |
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
):
Generates a node that encapsulates this task in a workflow definition.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
args |
*args |
kwargs |
**kwargs |
construct_node_metadata()
def construct_node_metadata()
This returns metadata for the parent ArrayNode, not the sub-node getting mapped over.
dispatch_execute()
def dispatch_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
This method translates Flyte's type-system-based input values and invokes the actual call to the executor.
This method is also invoked during runtime.
- VoidPromise is returned when the task itself declares no outputs.
- LiteralMap is returned when the task declares one or more outputs. Individual outputs may be None.
- DynamicJobSpec is returned when a dynamic workflow is executed.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
execute()
This method will be invoked to execute the task.
Parameter |
Type |
kwargs |
**kwargs |
find_lhs()
get_command()
def get_command(
settings: flytekit.configuration.SerializationSettings,
):
TODO ADD bound variables to the resolver. Maybe we need a different resolver?
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_config()
def get_config(
settings: flytekit.configuration.SerializationSettings,
):
Returns the task config as a serializable dictionary. This task config consists of metadata about the custom
config defined for this task.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_container()
def get_container(
settings: flytekit.configuration.SerializationSettings,
):
Returns the container definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_custom()
def get_custom(
settings: flytekit.configuration.SerializationSettings,
):
Return additional plugin-specific custom data (if any) as a serializable dictionary.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_extended_resources()
def get_extended_resources(
settings: flytekit.configuration.SerializationSettings,
):
Returns the extended resources to allocate to the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_input_types()
Returns the names and Python types, as a dictionary, for the inputs of this task.
get_k8s_pod()
def get_k8s_pod(
settings: flytekit.configuration.SerializationSettings,
):
Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_sql()
def get_sql(
settings: flytekit.configuration.SerializationSettings,
):
Returns the Sql definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_type_for_input_var()
def get_type_for_input_var(
k: str,
v: typing.Any,
):
Returns the python type for an input variable by name.
Parameter |
Type |
k |
str |
v |
typing.Any |
get_type_for_output_var()
def get_type_for_output_var(
k: str,
v: typing.Any,
):
We override this method from flytekit.core.base_task.Task because the dispatch_execute method uses this
interface to construct outputs. Each instance of a container_array task will, however, produce outputs
according to the underlying run_task interface, and the array plugin handler will create a collection
from these individual outputs as the final output value.
Parameter |
Type |
k |
str |
v |
typing.Any |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
This function is used only in the local execution path and is responsible for calling dispatch execute.
Use this function when calling a task with native values (or Promises containing Flyte literals derived from
Python native values).
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
post_execute()
def post_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
rval: typing.Any,
):
Post execute is called after the execution has completed, with the user_params, and can be used to clean up
or alter the outputs to match the intended task's outputs. If not overridden, this function is a no-op.
Parameter |
Type |
user_params |
typing.Optional[flytekit.core.context_manager.ExecutionParameters] |
rval |
typing.Any |
pre_execute()
def pre_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
):
This is the method that will be invoked directly before executing the task method and before all the inputs
are converted. One particular case where this is useful is if the context is to be modified for the user process
to get some user-space parameters. This also ensures that things like SparkSession are already correctly
set up before the type transformers are called.
This should return either the same context or the mutated context.
Parameter |
Type |
user_params |
typing.Optional[flytekit.core.context_manager.ExecutionParameters] |
prepare_target()
Alters the underlying run_task command to modify it for map task execution and then resets it after.
sandbox_execute()
def sandbox_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
set_command_prefix()
def set_command_prefix(
cmd: typing.Optional[typing.List[str]],
):
Parameter |
Type |
cmd |
typing.Optional[typing.List[str]] |
Properties
bound_inputs, concurrency, deck_fields, disable_deck, docs, enable_deck, environment, execution_mode, instantiated_in, interface, is_original_sub_node_interface, lhs, location, metadata, min_success_ratio, min_successes, name, python_function_task, python_interface, security_context, task_config, task_type, task_type_version
flytekit.remote.remote.Artifact
An Artifact is effectively just a metadata layer on top of data that exists in Flyte. Most data of interest
will be the output of tasks and workflows. The other category is user uploads.
This Python class has limited purpose, as a way for users to specify that tasks/workflows create Artifacts
and the manner (i.e. name, partitions) in which they are created.
Control creation parameters at task/workflow execution time:

@task
def t1() -> Annotated[nn.Module, Artifact(name="my.artifact.name")]:
    ...
def Artifact(
project: Optional[str],
domain: Optional[str],
name: Optional[str],
version: Optional[str],
time_partitioned: bool,
time_partition: Optional[TimePartition],
time_partition_granularity: Optional[Granularity],
partition_keys: Optional[typing.List[str]],
partitions: Optional[Union[Partitions, typing.Dict[str, str]]],
):
Parameter |
Type |
project |
Optional[str] |
domain |
Optional[str] |
name |
Optional[str] |
version |
Optional[str] |
time_partitioned |
bool |
time_partition |
Optional[TimePartition] |
time_partition_granularity |
Optional[Granularity] |
partition_keys |
Optional[typing.List[str]] |
partitions |
Optional[Union[Partitions, typing.Dict[str, str]]] |
Methods
| Method | Description |
|---|---|
| create_from() | Allows users to declare partition values dynamically from the body of a task. |
| embed_as_query() | Should only be called in the context of a Trigger. |
| query() | None. |
| to_id_idl() | Converts this object to the IDL representation. |
create_from()
def create_from(
o: O,
card: Optional[SerializableToString],
args: `*args`,
kwargs,
):
This function allows users to declare partition values dynamically from the body of a task. Note that you’ll
still need to annotate your task function output with the relevant Artifact object. Below, one of the partition
values is bound to an input, and the other is set at runtime. Note that since tasks are not run at compile
time, flytekit cannot check that you’ve bound all the partition values. It’s up to you to ensure that you’ve
done so.
Pricing = Artifact(name="pricing", partition_keys=["region"])
EstError = Artifact(name="estimation_error", partition_keys=["dataset"], time_partitioned=True)

@task
def t1() -> Tuple[Annotated[pd.DataFrame, Pricing], Annotated[float, EstError]]:
    df = get_pricing_results()
    dt = get_time()
    return Pricing.create_from(df, region="dubai"), EstError.create_from(msq_error, dataset="train", time_partition=dt)

You can mix and match with the input syntax as well.

@task
def my_task() -> Annotated[pd.DataFrame, RideCountData(region=Inputs.region)]:
    ...
    return RideCountData.create_from(df, time_partition=datetime.datetime.now())
Parameter |
Type |
o |
O |
card |
Optional[SerializableToString] |
args |
*args |
kwargs |
**kwargs |
embed_as_query()
def embed_as_query(
partition: Optional[str],
bind_to_time_partition: Optional[bool],
expr: Optional[str],
op: Optional[Op],
):
This should only be called in the context of a Trigger. The type of query this returns is different from the
query() function. This type of query is used to reference the triggering artifact, rather than running a query.
Parameter |
Type |
partition |
Optional[str] |
bind_to_time_partition |
Optional[bool] |
expr |
Optional[str] |
op |
Optional[Op] |
query()
def query(
project: Optional[str],
domain: Optional[str],
time_partition: Optional[Union[datetime.datetime, TimePartition, art_id.InputBindingData]],
partitions: Optional[Union[typing.Dict[str, str], Partitions]],
kwargs,
):
Parameter |
Type |
project |
Optional[str] |
domain |
Optional[str] |
time_partition |
Optional[Union[datetime.datetime, TimePartition, art_id.InputBindingData]] |
partitions |
Optional[Union[typing.Dict[str, str], Partitions]] |
kwargs |
**kwargs |
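For instance, a query can be used to reference the latest artifact matching given partition values. A sketch, assuming an Artifact declared with a region partition key (the names are illustrative):

```python
from flytekit.core.artifact import Artifact

Pricing = Artifact(name="pricing", partition_keys=["region"])

# Resolves to the latest "pricing" artifact whose region partition is "dubai";
# typically used as a default input to a launch plan or trigger.
pricing_query = Pricing.query(region="dubai")
```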
to_id_idl()
Converts this object to the IDL representation.
This lives here instead of in the translator because it's part of the interface: a relatively simple proto object
that's exposed to the user.
Properties
concrete_artifact_id, partitions, time_partition
flytekit.remote.remote.BranchNode
def BranchNode(
if_else: flytekit.models.core.workflow.IfElseBlock,
):
BranchNode is a special node that alters the flow of the workflow graph. It allows the control flow to branch at
runtime based on a series of conditions that get evaluated on various parameters (e.g. inputs, primitives).
Parameter |
Type |
if_else |
flytekit.models.core.workflow.IfElseBlock |
Methods
from_flyte_idl()
def from_flyte_idl(
pb2_objct,
):
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
if_else, is_empty
flytekit.remote.remote.ClusterAssignment
def ClusterAssignment(
cluster_pool,
):
Parameter |
Type |
cluster_pool |
|
Methods
from_flyte_idl()
def from_flyte_idl(
p,
):
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
cluster_pool, is_empty
flytekit.remote.remote.Config
This the parent configuration object and holds all the underlying configuration object types. An instance of
this object holds all the config necessary to
- Interactive session with Flyte backend
- Some parts are required for Serialization, for example Platform Config is not required
- Runtime of a task
def Config(
platform: PlatformConfig,
secrets: SecretsConfig,
stats: StatsConfig,
data_config: DataConfig,
local_sandbox_path: str,
):
Parameter |
Type |
platform |
PlatformConfig |
secrets |
SecretsConfig |
stats |
StatsConfig |
data_config |
DataConfig |
local_sandbox_path |
str |
Methods
| Method | Description |
|---|---|
| auto() | Automatically constructs the Config object. |
| for_endpoint() | Creates an automatic config for the given endpoint, using the config file or environment variables for defaults. |
| for_sandbox() | Constructs a new Config object specifically to connect to :std:ref:`deployment-deployment-sandbox`. |
| with_params() | None. |
auto()
def auto(
config_file: typing.Union[str, ConfigFile, None],
):
Automatically constructs the Config object. The order of precedence is as follows:
- first, try to find any env vars that match the config vars specified in the FLYTE_CONFIG format;
- if not found in the environment, values are read from the config file;
- if not found in the file, the default values are used.
Parameter |
Type |
config_file |
typing.Union[str, ConfigFile, None] |
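A sketch of the common construction paths (the endpoint and file path are placeholders):

```python
from flytekit.configuration import Config

cfg = Config.auto()                                        # env vars, then config file, then defaults
cfg_from_file = Config.auto(config_file="~/.flyte/config.yaml")
sandbox_cfg = Config.for_sandbox()                         # local sandbox defaults
prod_cfg = Config.for_endpoint(endpoint="flyte.example.com")
```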
for_endpoint()
def for_endpoint(
endpoint: str,
insecure: bool,
data_config: typing.Optional[DataConfig],
config_file: typing.Union[str, ConfigFile],
):
Creates an automatic config for the given endpoint and uses the config_file or environment variables for defaults.
Refer to Config.auto() to understand the default bootstrap behavior.
data_config can be used to configure how data is downloaded or uploaded to a specific blob store like S3, GCS, etc.
But for permissions to a specific backend, just follow the cloud provider's recommendations. If using fsspec,
refer to the fsspec documentation.
Parameter |
Type |
endpoint |
str |
insecure |
bool |
data_config |
typing.Optional[DataConfig] |
config_file |
typing.Union[str, ConfigFile] |
for_sandbox()
Constructs a new Config object specifically to connect to :std:ref:`deployment-deployment-sandbox`.
If you are using a hosted sandbox-like environment, you may need to use port-forwarding or ingress URLs.
Returns: Config
with_params()
def with_params(
platform: PlatformConfig,
secrets: SecretsConfig,
stats: StatsConfig,
data_config: DataConfig,
local_sandbox_path: str,
):
Parameter |
Type |
platform |
PlatformConfig |
secrets |
SecretsConfig |
stats |
StatsConfig |
data_config |
DataConfig |
local_sandbox_path |
str |
flytekit.remote.remote.ConfigFile
def ConfigFile(
location: str,
):
Load the config from this location
Parameter |
Type |
location |
str |
Methods
| Method | Description |
|---|---|
| get() | None. |
get()
def get(
c: typing.Union[LegacyConfigEntry, YamlConfigEntry],
):
Parameter |
Type |
c |
typing.Union[LegacyConfigEntry, YamlConfigEntry] |
Properties
legacy_config, yaml_config
flytekit.remote.remote.CopyFileDetection
Create a collection of name/value pairs.
Example enumeration:

class Color(Enum):
    RED = 1
    BLUE = 2
    GREEN = 3

Access them by:

Color.RED      # <Color.RED: 1>
Color(1)       # <Color.RED: 1>
Color['RED']   # <Color.RED: 1>

Enumerations can be iterated over, and know how many members they have:

len(Color)     # 3
list(Color)    # [<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.
flytekit.remote.remote.CoreNode
This class will hold all the things necessary to make an SdkNode, but we won't make one until we know things like
the ID, which comes from the registration step.
def CoreNode(
id: str,
metadata: _workflow_model.NodeMetadata,
bindings: List[_literal_models.Binding],
upstream_nodes: List[Node],
flyte_entity: Any,
):
Parameter |
Type |
id |
str |
metadata |
_workflow_model.NodeMetadata |
bindings |
List[_literal_models.Binding] |
upstream_nodes |
List[Node] |
flyte_entity |
Any |
Methods
runs_before()
def runs_before(
other: Node,
):
This is typically something we shouldn't do. This modifies an attribute of the other instance rather than
self. But it's done only because we wanted this English function to be the same as the shift operator.
That is, calling node_1.runs_before(node_2) and node_1 >> node_2 are the same. The shift operator going the
other direction is not implemented, to further avoid confusion. Right shift was picked rather than left shift
because that's what most users are familiar with.
Parameter |
Type |
other |
Node |
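A short sketch of the equivalence described above, using flytekit's create_node helper (the tasks are illustrative):

```python
from flytekit import task, workflow
from flytekit.core.node_creation import create_node


@task
def t1():
    ...


@task
def t2():
    ...


@workflow
def wf():
    n1 = create_node(t1)
    n2 = create_node(t2)
    # Equivalent ways to say "n1 must run before n2":
    n1.runs_before(n2)
    # n1 >> n2
```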
with_overrides()
def with_overrides(
node_name: Optional[str],
aliases: Optional[Dict[str, str]],
requests: Optional[Resources],
limits: Optional[Resources],
timeout: Optional[Union[int, datetime.timedelta, object]],
retries: Optional[int],
interruptible: Optional[bool],
name: Optional[str],
task_config: Optional[Any],
container_image: Optional[str],
accelerator: Optional[BaseAccelerator],
cache: Optional[bool],
cache_version: Optional[str],
cache_serialize: Optional[bool],
shared_memory: Optional[Union[L[True], str]],
pod_template: Optional[PodTemplate],
resources: Optional[Resources],
args,
kwargs,
):
Parameter |
Type |
node_name |
Optional[str] |
aliases |
Optional[Dict[str, str]] |
requests |
Optional[Resources] |
limits |
Optional[Resources] |
timeout |
Optional[Union[int, datetime.timedelta, object]] |
retries |
Optional[int] |
interruptible |
Optional[bool] |
name |
Optional[str] |
task_config |
Optional[Any] |
container_image |
Optional[str] |
accelerator |
Optional[BaseAccelerator] |
cache |
Optional[bool] |
cache_version |
Optional[str] |
cache_serialize |
Optional[bool] |
shared_memory |
Optional[Union[L[True], str]] |
pod_template |
Optional[PodTemplate] |
resources |
Optional[Resources] |
args |
*args |
kwargs |
**kwargs |
Properties
bindings, flyte_entity, id, metadata, name, outputs, run_entity, upstream_nodes
flytekit.remote.remote.DataConfig
Any data-storage-specific configuration. Please do not use this to store secrets; in the S3 case, the access key
id and secret are stored here only because they are used in the Flyte sandbox environment.
All DataPersistence plugins are passed the full DataConfig, and each plugin should correctly use the right config.
def DataConfig(
s3: S3Config,
gcs: GCSConfig,
azure: AzureBlobStorageConfig,
generic: GenericPersistenceConfig,
):
Parameter |
Type |
s3 |
S3Config |
gcs |
GCSConfig |
azure |
AzureBlobStorageConfig |
generic |
GenericPersistenceConfig |
Methods
| Method | Description |
|---|---|
| auto() | None. |
auto()
def auto(
config_file: typing.Union[str, ConfigFile],
):
Parameter |
Type |
config_file |
typing.Union[str, ConfigFile] |
flytekit.remote.remote.Domain
Domains are fixed and unique at the global level, and provide an abstraction to isolate resources and feature configuration for different deployment environments.
Methods
from_flyte_idl()
def from_flyte_idl(
pb2_object,
):
Parameter |
Type |
pb2_object |
|
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
id, is_empty, name
flytekit.remote.remote.ExecutionClusterLabel
def ExecutionClusterLabel(
value,
):
Label value to determine where the execution will be run
Methods
from_flyte_idl()
def from_flyte_idl(
pb2_object,
):
Parameter |
Type |
pb2_object |
|
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
is_empty, value
flytekit.remote.remote.ExecutionMetadata
def ExecutionMetadata(
mode: int,
principal: str,
nesting: int,
scheduled_at: Optional[datetime.datetime],
parent_node_execution: Optional[_identifier.NodeExecutionIdentifier],
reference_execution: Optional[_identifier.WorkflowExecutionIdentifier],
system_metadata: Optional[SystemMetadata],
):
Parameter |
Type |
mode |
int |
principal |
str |
nesting |
int |
scheduled_at |
Optional[datetime.datetime] |
parent_node_execution |
Optional[_identifier.NodeExecutionIdentifier] |
reference_execution |
Optional[_identifier.WorkflowExecutionIdentifier] |
system_metadata |
Optional[SystemMetadata] |
Methods
from_flyte_idl()
def from_flyte_idl(
pb2_object,
):
Parameter |
Type |
pb2_object |
|
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
is_empty, mode, nesting, parent_node_execution, principal, reference_execution, scheduled_at, system_metadata
flytekit.remote.remote.ExecutionSpec
def ExecutionSpec(
launch_plan,
metadata,
notifications,
disable_all,
labels,
annotations,
auth_role,
raw_output_data_config,
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
overwrite_cache: Optional[bool],
interruptible: Optional[bool],
envs: Optional[_common_models.Envs],
tags: Optional[typing.List[str]],
cluster_assignment: Optional[ClusterAssignment],
execution_cluster_label: Optional[ExecutionClusterLabel],
):
Parameter |
Type |
launch_plan |
|
metadata |
|
notifications |
|
disable_all |
|
labels |
|
annotations |
|
auth_role |
|
raw_output_data_config |
|
max_parallelism |
Optional[int] |
security_context |
Optional[security.SecurityContext] |
overwrite_cache |
Optional[bool] |
interruptible |
Optional[bool] |
envs |
Optional[_common_models.Envs] |
tags |
Optional[typing.List[str]] |
cluster_assignment |
Optional[ClusterAssignment] |
execution_cluster_label |
Optional[ExecutionClusterLabel] |
Methods
from_flyte_idl()
def from_flyte_idl(
p,
):
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
annotations, auth_role, cluster_assignment, disable_all, envs, execution_cluster_label, interruptible, is_empty, labels, launch_plan, max_parallelism, metadata, notifications, overwrite_cache, raw_output_data_config, security_context, tags
flytekit.remote.remote.FastPackageOptions
FastPackageOptions is used to set configuration options when packaging files.
def FastPackageOptions(
ignores: list[Ignore],
keep_default_ignores: bool,
copy_style: Optional[CopyFileDetection],
show_files: bool,
):
Parameter |
Type |
ignores |
list[Ignore] |
keep_default_ignores |
bool |
copy_style |
Optional[CopyFileDetection] |
show_files |
bool |
flytekit.remote.remote.FastSerializationSettings
This object holds information about settings necessary to serialize an object so that it can be fast-registered.
def FastSerializationSettings(
enabled: bool,
destination_dir: Optional[str],
distribution_location: Optional[str],
):
Parameter |
Type |
enabled |
bool |
destination_dir |
Optional[str] |
distribution_location |
Optional[str] |
Methods
from_dict()
def from_dict(
kvs: typing.Union[dict, list, str, int, float, bool, NoneType],
infer_missing,
):
Parameter |
Type |
kvs |
typing.Union[dict, list, str, int, float, bool, NoneType] |
infer_missing |
|
from_json()
def from_json(
s: typing.Union[str, bytes, bytearray],
parse_float,
parse_int,
parse_constant,
infer_missing,
kw,
):
Parameter |
Type |
s |
typing.Union[str, bytes, bytearray] |
parse_float |
|
parse_int |
|
parse_constant |
|
infer_missing |
|
kw |
|
schema()
def schema(
infer_missing: bool,
only,
exclude,
many: bool,
context,
load_only,
dump_only,
partial: bool,
unknown,
):
Parameter |
Type |
infer_missing |
bool |
only |
|
exclude |
|
many |
bool |
context |
|
load_only |
|
dump_only |
|
partial |
bool |
unknown |
|
to_dict()
def to_dict(
encode_json,
):
Parameter |
Type |
encode_json |
|
to_json()
def to_json(
skipkeys: bool,
ensure_ascii: bool,
check_circular: bool,
allow_nan: bool,
indent: typing.Union[int, str, NoneType],
separators: typing.Tuple[str, str],
default: typing.Callable,
sort_keys: bool,
kw,
):
Parameter |
Type |
skipkeys |
bool |
ensure_ascii |
bool |
check_circular |
bool |
allow_nan |
bool |
indent |
typing.Union[int, str, NoneType] |
separators |
typing.Tuple[str, str] |
default |
typing.Callable |
sort_keys |
bool |
kw |
|
flytekit.remote.remote.FileAccessProvider
This is the class that is available through the FlyteContext and can be used for persisting data to the remote
durable store.
def FileAccessProvider(
local_sandbox_dir: typing.Union[str, os.PathLike],
raw_output_prefix: str,
data_config: typing.Optional[flytekit.configuration.DataConfig],
execution_metadata: typing.Optional[dict],
):
Parameter |
Type |
local_sandbox_dir |
typing.Union[str, os.PathLike] |
raw_output_prefix |
str |
data_config |
typing.Optional[flytekit.configuration.DataConfig] |
execution_metadata |
typing.Optional[dict] |
Methods
async_get_data()
def async_get_data(
remote_path: str,
local_path: str,
is_multipart: bool,
kwargs,
):
Parameter |
Type |
remote_path |
str |
local_path |
str |
is_multipart |
bool |
kwargs |
**kwargs |
async_put_data()
def async_put_data(
local_path: typing.Union[str, os.PathLike],
remote_path: str,
is_multipart: bool,
kwargs,
):
The implication here is that we're always going to put data to the remote location, so we use the remote
filesystem variant to ensure we don't use the true local proxy if the remote path is a file:// URL.
Parameter |
Type |
local_path |
typing.Union[str, os.PathLike] |
remote_path |
str |
is_multipart |
bool |
kwargs |
**kwargs |
async_put_raw_data()
def async_put_raw_data(
lpath: typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO],
upload_prefix: typing.Optional[str],
file_name: typing.Optional[str],
read_chunk_size_bytes: int,
encoding: str,
skip_raw_data_prefix: bool,
kwargs,
):
This is a more flexible version of put that accepts a file-like object or a string path.
Writes to the raw output prefix only. If you want to write to another fs use put_data or get the fsspec
file system directly.
FYI: Currently the raw output prefix set by propeller is already unique per retry and looks like
s3://my-s3-bucket/data/o4/feda4e266c748463a97d-n0-0
If lpath is a folder, then recursive will be set.
If lpath is a streamable, then it can only be a single file.
Writes to:
{raw output prefix}/{upload_prefix}/{file_name}
Parameter |
Type |
lpath |
typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO] |
upload_prefix |
typing.Optional[str] |
file_name |
typing.Optional[str] |
read_chunk_size_bytes |
int |
encoding |
str |
skip_raw_data_prefix |
bool |
kwargs |
**kwargs |
download()
def download(
remote_path: str,
local_path: str,
kwargs,
):
Downloads from remote to local
Parameter |
Type |
remote_path |
str |
local_path |
str |
kwargs |
**kwargs |
download_directory()
def download_directory(
remote_path: str,
local_path: str,
kwargs,
):
Downloads directory from given remote to local path
Parameter |
Type |
remote_path |
str |
local_path |
str |
kwargs |
**kwargs |
exists()
def exists(
path: str,
):
generate_new_custom_path()
def generate_new_custom_path(
fs: typing.Optional[fsspec.spec.AbstractFileSystem],
alt: typing.Optional[str],
stem: typing.Optional[str],
):
Generates a new path with the raw output prefix and a random string appended to it.
Optionally, you can provide an alternate prefix and a stem. If stem is provided, it
will be appended to the path instead of a random string. If alt is provided, it will
replace the first part of the output prefix, e.g. the S3 or GCS bucket.
If you want to write to a non-random prefix in a non-default S3 bucket, this can be
called with alt="my-alt-bucket" and stem="my-stem" to generate a path like
s3://my-alt-bucket/default-prefix-part/my-stem
Parameter |
Type |
fs |
typing.Optional[fsspec.spec.AbstractFileSystem] |
alt |
typing.Optional[str] |
stem |
typing.Optional[str] |
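A sketch of the alt/stem combination described above, assuming this runs inside a Flyte task so a FileAccessProvider is available on the current context; the bucket and stem names are illustrative:

```python
from flytekit.core.context_manager import FlyteContextManager

fa = FlyteContextManager.current_context().file_access
# Write to a fixed stem in an alternate bucket instead of a random suffix.
path = fa.generate_new_custom_path(alt="my-alt-bucket", stem="my-stem")
# e.g. s3://my-alt-bucket/default-prefix-part/my-stem
```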
get()
def get(
from_path: str,
to_path: str,
recursive: bool,
kwargs,
):
Parameter |
Type |
from_path |
str |
to_path |
str |
recursive |
bool |
kwargs |
**kwargs |
get_async_filesystem_for_path()
def get_async_filesystem_for_path(
path: str,
anonymous: bool,
kwargs,
):
Parameter |
Type |
path |
str |
anonymous |
bool |
kwargs |
**kwargs |
get_data()
def get_data(
remote_path: str,
local_path: str,
is_multipart: bool,
kwargs,
):
Parameter |
Type |
remote_path |
str |
local_path |
str |
is_multipart |
bool |
kwargs |
**kwargs |
get_file_tail()
def get_file_tail(
file_path_or_file_name: str,
):
Parameter |
Type |
file_path_or_file_name |
str |
get_filesystem()
def get_filesystem(
protocol: typing.Optional[str],
anonymous: bool,
path: typing.Optional[str],
kwargs,
):
Parameter |
Type |
protocol |
typing.Optional[str] |
anonymous |
bool |
path |
typing.Optional[str] |
kwargs |
**kwargs |
get_filesystem_for_path()
def get_filesystem_for_path(
path: str,
anonymous: bool,
kwargs,
):
Parameter |
Type |
path |
str |
anonymous |
bool |
kwargs |
**kwargs |
get_random_local_directory()
def get_random_local_directory()
get_random_local_path()
def get_random_local_path(
file_path_or_file_name: typing.Optional[str],
):
Use file_path_or_file_name when you want a random directory but want to preserve the leaf file name.
Parameter |
Type |
file_path_or_file_name |
typing.Optional[str] |
get_random_remote_directory()
def get_random_remote_directory()
get_random_remote_path()
def get_random_remote_path(
file_path_or_file_name: typing.Optional[str],
):
Parameter |
Type |
file_path_or_file_name |
typing.Optional[str] |
get_random_string()
is_remote()
def is_remote(
path: typing.Union[str, os.PathLike],
):
Deprecated. Let’s find a replacement
Parameter |
Type |
path |
typing.Union[str, os.PathLike] |
join()
def join(
args: `*args`,
unstrip: bool,
fs: typing.Optional[fsspec.spec.AbstractFileSystem],
):
Parameter |
Type |
args |
*args |
unstrip |
bool |
fs |
typing.Optional[fsspec.spec.AbstractFileSystem] |
put_data()
def put_data(
local_path: typing.Union[str, os.PathLike],
remote_path: str,
is_multipart: bool,
kwargs,
):
The implication here is that we're always going to put data to the remote location, so we use the remote
filesystem variant to ensure we don't use the true local proxy if the remote path is a file:// URL.
Parameter |
Type |
local_path |
typing.Union[str, os.PathLike] |
remote_path |
str |
is_multipart |
bool |
kwargs |
**kwargs |
put_raw_data()
def put_raw_data(
lpath: typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO],
upload_prefix: typing.Optional[str],
file_name: typing.Optional[str],
read_chunk_size_bytes: int,
encoding: str,
skip_raw_data_prefix: bool,
kwargs,
):
This is a more flexible version of put that accepts a file-like object or a string path.
Writes to the raw output prefix only. If you want to write to another fs use put_data or get the fsspec
file system directly.
FYI: Currently the raw output prefix set by propeller is already unique per retry and looks like
s3://my-s3-bucket/data/o4/feda4e266c748463a97d-n0-0
If lpath is a folder, then recursive will be set.
If lpath is a streamable, then it can only be a single file.
Writes to:
{raw output prefix}/{upload_prefix}/{file_name}
Parameter |
Type |
lpath |
typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO] |
upload_prefix |
typing.Optional[str] |
file_name |
typing.Optional[str] |
read_chunk_size_bytes |
int |
encoding |
str |
skip_raw_data_prefix |
bool |
kwargs |
**kwargs |
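A sketch of uploading raw bytes from inside a task; the prefix and file name are illustrative:

```python
from flytekit.core.context_manager import FlyteContextManager

fa = FlyteContextManager.current_context().file_access
# Uploads to {raw output prefix}/stats/summary.txt and returns the remote URI.
uri = fa.put_raw_data(b"mean=0.42", upload_prefix="stats", file_name="summary.txt")
print(uri)
```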
recursive_paths()
def recursive_paths(
f: str,
t: str,
):
Parameter |
Type |
f |
str |
t |
str |
sep()
def sep(
file_system: typing.Optional[fsspec.spec.AbstractFileSystem],
):
Parameter |
Type |
file_system |
typing.Optional[fsspec.spec.AbstractFileSystem] |
strip_file_header()
def strip_file_header(
path: str,
trim_trailing_sep: bool,
):
Drops the file:// prefix from the path, if present.
Parameter |
Type |
path |
str |
trim_trailing_sep |
bool |
upload()
def upload(
file_path: str,
to_path: str,
kwargs,
):
Parameter |
Type |
file_path |
str |
to_path |
str |
kwargs |
**kwargs |
upload_directory()
def upload_directory(
local_path: str,
remote_path: str,
kwargs,
):
Parameter |
Type |
local_path |
str |
remote_path |
str |
kwargs |
**kwargs |
Properties
data_config, local_access, local_sandbox_dir, raw_output_fs, raw_output_prefix
flytekit.remote.remote.FlyteAssertion
A user exception raised when a flytekit assertion or precondition fails.
def FlyteAssertion(
args,
timestamp: typing.Optional[float],
):
Parameter |
Type |
args |
*args |
timestamp |
typing.Optional[float] |
Properties
timestamp
flytekit.remote.remote.FlyteContext
This is an internal-facing context object, that most users will not have to deal with. It’s essentially a globally
available grab bag of settings and objects that allows flytekit to do things like convert complex types, run and
compile workflows, serialize Flyte entities, etc.
Even though this object has a current_context function on it, it should not be called directly. Please use the
:py:class:`flytekit.FlyteContextManager` object instead.
Please do not confuse this object with the :py:class:flytekit.ExecutionParameters
object.
def FlyteContext(
file_access: FileAccessProvider,
level: int,
flyte_client: Optional['friendly_client.SynchronousFlyteClient'],
compilation_state: Optional[CompilationState],
execution_state: Optional[ExecutionState],
serialization_settings: Optional[SerializationSettings],
in_a_condition: bool,
origin_stackframe: Optional[traceback.FrameSummary],
output_metadata_tracker: Optional[OutputMetadataTracker],
worker_queue: Optional[Controller],
):
Parameter |
Type |
file_access |
FileAccessProvider |
level |
int |
flyte_client |
Optional['friendly_client.SynchronousFlyteClient'] |
compilation_state |
Optional[CompilationState] |
execution_state |
Optional[ExecutionState] |
serialization_settings |
Optional[SerializationSettings] |
in_a_condition |
bool |
origin_stackframe |
Optional[traceback.FrameSummary] |
output_metadata_tracker |
Optional[OutputMetadataTracker] |
worker_queue |
Optional[Controller] |
Methods
current_context()
This method exists only to maintain backwards compatibility. Please use
FlyteContextManager.current_context()
instead.
Users of flytekit should be wary not to confuse the object returned from this function
with :py:func:flytekit.current_context
enter_conditional_section()
def enter_conditional_section()
get_deck()
Returns the deck that was created as part of the last execution.
The return value depends on the execution environment. In a notebook, the return value is compatible with
IPython.display and should be rendered in the notebook.
.. code-block:: python

    with flytekit.new_context() as ctx:
        my_task(...)
    ctx.get_deck()

OR if you wish to explicitly display

.. code-block:: python

    from IPython import display
    display(ctx.get_deck())
get_origin_stackframe_repr()
def get_origin_stackframe_repr()
new_builder()
new_compilation_state()
def new_compilation_state(
prefix: str,
):
Creates and returns a default compilation state. For most of the code, this should be the entrypoint
of compilation; otherwise, the code should always use with_compilation_state.
Parameter |
Type |
prefix |
str |
new_execution_state()
def new_execution_state(
working_dir: Optional[os.PathLike],
):
Creates and returns a new default execution state. This should be used at the entrypoint of execution;
in all other cases it is preferable to use with_execution_state.
Parameter |
Type |
working_dir |
Optional[os.PathLike] |
set_stackframe()
def set_stackframe(
s: traceback.FrameSummary,
):
Parameter |
Type |
s |
traceback.FrameSummary |
with_client()
def with_client(
c: SynchronousFlyteClient,
):
Parameter |
Type |
c |
SynchronousFlyteClient |
with_compilation_state()
def with_compilation_state(
c: CompilationState,
):
Parameter |
Type |
c |
CompilationState |
with_execution_state()
def with_execution_state(
es: ExecutionState,
):
Parameter |
Type |
es |
ExecutionState |
with_file_access()
def with_file_access(
fa: FileAccessProvider,
):
Parameter |
Type |
fa |
FileAccessProvider |
with_new_compilation_state()
def with_new_compilation_state()
with_output_metadata_tracker()
def with_output_metadata_tracker(
t: OutputMetadataTracker,
):
Parameter |
Type |
t |
OutputMetadataTracker |
with_serialization_settings()
def with_serialization_settings(
ss: SerializationSettings,
):
Parameter |
Type |
ss |
SerializationSettings |
with_worker_queue()
def with_worker_queue(
wq: Controller,
):
Parameter |
Type |
wq |
Controller |
Properties
user_space_params
flytekit.remote.remote.FlyteContextManager
FlyteContextManager manages the execution context within Flytekit. It holds global state of either compilation
or execution. It is not thread-safe and can currently only be run as a single-threaded application.
Contexts within Flytekit are useful to manage compilation state and execution state. Refer to CompilationState
and ExecutionState for more information. FlyteContextManager provides a singleton stack to manage these contexts.
Typical usage is:
.. code-block:: python

    FlyteContextManager.initialize()
    with FlyteContextManager.with_context(o) as ctx:
        pass

If required (though not recommended), you can use FlyteContextManager.push_context(), but a corresponding
FlyteContextManager.pop_context() should then be called.
Methods
add_signal_handler()
def add_signal_handler(
handler: typing.Callable[[int, FrameType], typing.Any],
):
Parameter |
Type |
handler |
typing.Callable[[int, FrameType], typing.Any] |
current_context()
get_origin_stackframe()
def get_origin_stackframe(
limit,
):
initialize()
Re-initializes the context and erases the entire context
pop_context()
push_context()
def push_context(
ctx: FlyteContext,
f: Optional[traceback.FrameSummary],
):
Parameter |
Type |
ctx |
FlyteContext |
f |
Optional[traceback.FrameSummary] |
size()
with_context()
def with_context(
b: FlyteContext.Builder,
):
Parameter |
Type |
b |
FlyteContext.Builder |
flytekit.remote.remote.FlyteEntityAlreadyExistsException
Raised when attempting to register or create an entity that already exists on the Flyte backend.
def FlyteEntityAlreadyExistsException(
args,
timestamp: typing.Optional[float],
):
Parameter |
Type |
args |
*args |
timestamp |
typing.Optional[float] |
Properties
timestamp
flytekit.remote.remote.FlyteEntityNotExistException
Raised when a referenced entity does not exist on the Flyte backend.
def FlyteEntityNotExistException(
args,
timestamp: typing.Optional[float],
):
Parameter |
Type |
args |
*args |
timestamp |
typing.Optional[float] |
Properties
timestamp
flytekit.remote.remote.FlyteExecutionSpan
def FlyteExecutionSpan(
span: flyteidl.core.metrics_pb2.Span,
):
Parameter |
Type |
span |
flyteidl.core.metrics_pb2.Span |
Methods
dump()
explain()
from_flyte_idl()
def from_flyte_idl(
pb,
):
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
is_empty
flytekit.remote.remote.FlyteLaunchPlan
A class encapsulating a remote Flyte launch plan.
def FlyteLaunchPlan(
id,
args,
kwargs,
):
Parameter |
Type |
id |
|
args |
*args |
kwargs |
**kwargs |
Methods
compile()
def compile(
ctx: FlyteContext,
args,
kwargs,
):
Parameter |
Type |
ctx |
FlyteContext |
args |
*args |
kwargs |
**kwargs |
construct_node_metadata()
def construct_node_metadata()
Used when constructing the node that encapsulates this task as part of a broader workflow definition.
execute()
Parameter |
Type |
kwargs |
**kwargs |
from_flyte_idl()
def from_flyte_idl(
pb2,
):
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
promote_from_model()
def promote_from_model(
id: id_models.Identifier,
model: _launch_plan_models.LaunchPlanSpec,
):
Parameter |
Type |
id |
id_models.Identifier |
model |
_launch_plan_models.LaunchPlanSpec |
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
annotations, auth_role, default_inputs, entity_metadata, entity_type_text, fixed_inputs, flyte_workflow, id, interface, is_empty, is_scheduled, labels, max_parallelism, name, overwrite_cache, python_interface, raw_output_data_config, resource_type, security_context, workflow_id
flytekit.remote.remote.FlyteNode
A class encapsulating a remote Flyte node.
def FlyteNode(
id,
upstream_nodes,
bindings,
metadata,
task_node: Optional[FlyteTaskNode],
workflow_node: Optional[FlyteWorkflowNode],
branch_node: Optional[FlyteBranchNode],
gate_node: Optional[FlyteGateNode],
array_node: Optional[FlyteArrayNode],
):
Parameter |
Type |
id |
|
upstream_nodes |
|
bindings |
|
metadata |
|
task_node |
Optional[FlyteTaskNode] |
workflow_node |
Optional[FlyteWorkflowNode] |
branch_node |
Optional[FlyteBranchNode] |
gate_node |
Optional[FlyteGateNode] |
array_node |
Optional[FlyteArrayNode] |
Methods
from_flyte_idl()
def from_flyte_idl(
pb2_object,
):
Parameter |
Type |
pb2_object |
|
promote_from_model()
def promote_from_model(
model: _workflow_model.Node,
sub_workflows: Optional[Dict[id_models.Identifier, _workflow_model.WorkflowTemplate]],
node_launch_plans: Optional[Dict[id_models.Identifier, _launch_plan_model.LaunchPlanSpec]],
tasks: Dict[id_models.Identifier, FlyteTask],
converted_sub_workflows: Dict[id_models.Identifier, FlyteWorkflow],
):
Parameter |
Type |
model |
_workflow_model.Node |
sub_workflows |
Optional[Dict[id_models.Identifier, _workflow_model.WorkflowTemplate]] |
node_launch_plans |
Optional[Dict[id_models.Identifier, _launch_plan_model.LaunchPlanSpec]] |
tasks |
Dict[id_models.Identifier, FlyteTask] |
converted_sub_workflows |
Dict[id_models.Identifier, FlyteWorkflow] |
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
array_node, branch_node, flyte_entity, gate_node, id, inputs, is_empty, metadata, output_aliases, target, task_node, upstream_node_ids, upstream_nodes, workflow_node
flytekit.remote.remote.FlyteNodeExecution
A class encapsulating a node execution being run on a Flyte remote backend.
def FlyteNodeExecution(
args,
kwargs,
):
Parameter |
Type |
args |
*args |
kwargs |
**kwargs |
Methods
from_flyte_idl()
def from_flyte_idl(
p: flyteidl.admin.node_execution_pb2.NodeExecution,
):
Parameter |
Type |
p |
flyteidl.admin.node_execution_pb2.NodeExecution |
promote_from_model()
def promote_from_model(
base_model: node_execution_models.NodeExecution,
):
Parameter |
Type |
base_model |
node_execution_models.NodeExecution |
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
closure, error, executions, id, input_uri, inputs, interface, is_done, is_empty, metadata, outputs, subworkflow_node_executions, task_executions, workflow_executions
flytekit.remote.remote.FlyteRemote
Main entrypoint for programmatically accessing a Flyte remote backend.
The term ‘remote’ is synonymous with ‘backend’ or ‘deployment’ and refers to a hosted instance of the
Flyte platform, which comes with a Flyte Admin server on some known URI.
def FlyteRemote(
config: Config,
default_project: typing.Optional[str],
default_domain: typing.Optional[str],
data_upload_location: str,
interactive_mode_enabled: typing.Optional[bool],
kwargs,
):
Initialize a FlyteRemote object.
kwargs: all arguments that can be passed to create the SynchronousFlyteClient. These are usually gRPC
parameters, if you want to customize credentials, SSL handling, etc.
Parameter |
Type |
config |
Config |
default_project |
typing.Optional[str] |
default_domain |
typing.Optional[str] |
data_upload_location |
str |
interactive_mode_enabled |
typing.Optional[bool] |
kwargs |
**kwargs |
Methods
| Method | Description |
|---|---|
| activate_launchplan() | Given a launch plan, activate it; all previous versions are deactivated. |
| approve() | |
| auto() | None. |
| download() | Download the data to the specified location. |
| execute() | Execute a task, workflow, or launch plan, either something that's been declared locally or a fetched entity. |
| execute_local_launch_plan() | Execute a locally defined LaunchPlan. |
| execute_local_task() | Execute a @task-decorated function or TaskTemplate task. |
| execute_local_workflow() | Execute an @workflow-decorated function. |
| execute_reference_launch_plan() | Execute a ReferenceLaunchPlan. |
| execute_reference_task() | Execute a ReferenceTask. |
| execute_reference_workflow() | Execute a ReferenceWorkflow. |
| execute_remote_task_lp() | Execute a FlyteTask or FlyteLaunchPlan. |
| execute_remote_wf() | Execute a FlyteWorkflow. |
| fast_package() | Packages the given paths into an installable zip and returns the md5_bytes and the URL of the uploaded location. |
| fast_register_workflow() | Use this method to register a workflow with zip mode. |
| fetch_active_launchplan() | Returns the active version of the launch plan if it exists, or None. |
| fetch_execution() | Fetch a workflow execution entity from Flyte Admin. |
| fetch_launch_plan() | Fetch a launch plan entity from Flyte Admin. |
| fetch_task() | Fetch a task entity from Flyte Admin. |
| fetch_task_lazy() | Similar to fetch_task, except that it returns a LazyEntity, which fetches the task lazily. |
| fetch_workflow() | Fetch a workflow entity from Flyte Admin. |
| fetch_workflow_lazy() | Similar to fetch_workflow, except that it returns a LazyEntity, which fetches the workflow lazily. |
| find_launch_plan() | None. |
| find_launch_plan_for_node() | None. |
| for_endpoint() | None. |
| for_sandbox() | None. |
| generate_console_http_domain() | Generates the domain where the console is hosted. |
| generate_console_url() | Generate a Flyte Console URL for the given Flyte remote endpoint. |
| get() | General function that works with Flyte tiny urls. |
| get_domains() | Lists registered domains from Flyte Admin. |
| get_execution_metrics() | Get the metrics for a given execution. |
| get_extra_headers_for_protocol() | None. |
| launch_backfill() | Creates and launches a backfill workflow for the given launch plan. |
| list_projects() | Lists registered projects from Flyte Admin. |
| list_signals() | |
| list_tasks_by_version() | None. |
| raw_register() | Raw register method; can be used to register control plane entities. |
| recent_executions() | None. |
| register_launch_plan() | Register a given launch plan, possibly applying overrides from the provided options. |
| register_script() | Use this method to register a workflow via script mode. |
| register_task() | Register a qualified task (PythonTask) with Remote. |
| register_workflow() | Use this method to register a workflow. |
| reject() | |
| remote_context() | Context manager with remote-specific configuration. |
| set_input() | |
| set_signal() | |
| sync() | This function was previously a singledispatchmethod. |
| sync_execution() | Sync a FlyteWorkflowExecution object with its corresponding remote state. |
| sync_node_execution() | Get data backing a node execution. |
| sync_task_execution() | Sync a FlyteTaskExecution object with its corresponding remote state. |
| terminate() | Terminate a workflow execution. |
| upload_file() | Uses the remote's client to hash and then upload the file using Admin's data proxy service. |
| wait() | Wait for an execution to finish. |
activate_launchplan()
def activate_launchplan(
ident: Identifier,
):
Given a launch plan, activate it; all previous versions are deactivated.
Parameter |
Type |
ident |
Identifier |
approve()
def approve(
signal_id: str,
execution_name: str,
project: str,
domain: str,
):
Parameter |
Type |
signal_id |
str |
execution_name |
str |
project |
str |
domain |
str |
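A sketch of approving a gate signal on a running execution, assuming remote is a FlyteRemote instance; the signal and execution names are illustrative:

```python
remote.approve(
    signal_id="approval-gate",
    execution_name="f123abc",
    project="flytesnacks",
    domain="development",
)
```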
auto()
def auto(
config_file: typing.Union[str, ConfigFile],
default_project: typing.Optional[str],
default_domain: typing.Optional[str],
data_upload_location: str,
interactive_mode_enabled: bool,
kwargs,
):
Parameter |
Type |
config_file |
typing.Union[str, ConfigFile] |
default_project |
typing.Optional[str] |
default_domain |
typing.Optional[str] |
data_upload_location |
str |
interactive_mode_enabled |
bool |
kwargs |
**kwargs |
download()
def download(
data: typing.Union[LiteralsResolver, Literal, LiteralMap],
download_to: str,
recursive: bool,
):
Download the data to the specified location. If the data is a LiteralsResolver or LiteralMap and recursive is
specified, then all file-like objects (e.g. FlyteFile/FlyteDirectory (blob), StructuredDataset, etc.) will be
recursively downloaded.
Note that it will use your session's credentials to access the remote location. For the sandbox, this should be
automatically configured, assuming you are running the sandbox locally. For other environments, you will need to
configure your credentials appropriately.
Parameter |
Type |
data |
typing.Union[LiteralsResolver, Literal, LiteralMap] |
download_to |
str |
recursive |
bool |
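As a hedged sketch (the execution name and local path are purely illustrative, and remote is assumed to be a FlyteRemote whose execution has completed):

    execution = remote.fetch_execution(project="flytesnacks", domain="development", name="f8a3b2c1d4e5f6a7b8c9")  # hypothetical execution name
    execution = remote.sync_execution(execution)  # populate inputs/outputs before downloading
    remote.download(execution.outputs, download_to="/tmp/outputs", recursive=True)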
execute()
def execute(
entity: typing.Union[FlyteTask, FlyteLaunchPlan, FlyteWorkflow, PythonTask, WorkflowBase, LaunchPlan, ReferenceEntity],
inputs: typing.Dict[str, typing.Any],
project: str,
domain: str,
name: str,
version: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
image_config: typing.Optional[ImageConfig],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
serialization_settings: typing.Optional[SerializationSettings],
):
Execute a task, workflow, or launchplan, either something that’s been declared locally, or a fetched entity.
This method supports:
- Flyte{Task, Workflow, LaunchPlan} remote module objects.
- @task-decorated functions and TaskTemplate tasks.
- @workflow-decorated functions.
- LaunchPlan objects.
For local entities, this code will attempt to find the entity first, and if missing, will compile and register
the object.
Not all arguments are relevant in all circumstances. For example, there’s no reason to use the serialization
settings for entities that have already been registered on Admin.
Parameter |
Type |
entity |
typing.Union[FlyteTask, FlyteLaunchPlan, FlyteWorkflow, PythonTask, WorkflowBase, LaunchPlan, ReferenceEntity] |
inputs |
typing.Dict[str, typing.Any] |
project |
str |
domain |
str |
name |
str |
version |
str |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
image_config |
typing.Optional[ImageConfig] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
serialization_settings |
typing.Optional[SerializationSettings] |
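As an illustration (endpoint, project, domain, and entity names are all hypothetical), executing a fetched task and waiting for the result might look like:

    from flytekit.configuration import Config
    from flytekit.remote import FlyteRemote

    remote = FlyteRemote(
        config=Config.for_endpoint("flyte.example.org"),  # illustrative endpoint
        default_project="flytesnacks",
        default_domain="development",
    )
    task = remote.fetch_task(project="flytesnacks", domain="development", name="my_module.my_task", version="v1")
    execution = remote.execute(task, inputs={"x": 3}, wait=True)
    print(execution.outputs)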
execute_local_launch_plan()
def execute_local_launch_plan(
entity: LaunchPlan,
inputs: typing.Dict[str, typing.Any],
version: str,
project: typing.Optional[str],
domain: typing.Optional[str],
name: typing.Optional[str],
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
serialization_settings: typing.Optional[SerializationSettings],
):
Execute a locally defined LaunchPlan.
Parameter |
Type |
entity |
LaunchPlan |
inputs |
typing.Dict[str, typing.Any] |
version |
str |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
name |
typing.Optional[str] |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
serialization_settings |
typing.Optional[SerializationSettings] |
execute_local_task()
def execute_local_task(
entity: PythonTask,
inputs: typing.Dict[str, typing.Any],
project: str,
domain: str,
name: str,
version: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
image_config: typing.Optional[ImageConfig],
wait: bool,
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
options: typing.Optional[Options],
serialization_settings: typing.Optional[SerializationSettings],
):
Execute a @task-decorated function or TaskTemplate task.
Parameter |
Type |
entity |
PythonTask |
inputs |
typing.Dict[str, typing.Any] |
project |
str |
domain |
str |
name |
str |
version |
str |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
image_config |
typing.Optional[ImageConfig] |
wait |
bool |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
options |
typing.Optional[Options] |
serialization_settings |
typing.Optional[SerializationSettings] |
execute_local_workflow()
def execute_local_workflow(
entity: WorkflowBase,
inputs: typing.Dict[str, typing.Any],
project: str,
domain: str,
name: str,
version: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
image_config: typing.Optional[ImageConfig],
options: typing.Optional[Options],
wait: bool,
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
serialization_settings: typing.Optional[SerializationSettings],
):
Execute an @workflow-decorated function.
Parameter |
Type |
entity |
WorkflowBase |
inputs |
typing.Dict[str, typing.Any] |
project |
str |
domain |
str |
name |
str |
version |
str |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
image_config |
typing.Optional[ImageConfig] |
options |
typing.Optional[Options] |
wait |
bool |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
serialization_settings |
typing.Optional[SerializationSettings] |
execute_reference_launch_plan()
def execute_reference_launch_plan(
entity: ReferenceLaunchPlan,
inputs: typing.Dict[str, typing.Any],
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
):
Execute a ReferenceLaunchPlan.
Parameter |
Type |
entity |
ReferenceLaunchPlan |
inputs |
typing.Dict[str, typing.Any] |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
execute_reference_task()
def execute_reference_task(
entity: ReferenceTask,
inputs: typing.Dict[str, typing.Any],
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
):
Execute a ReferenceTask.
Parameter |
Type |
entity |
ReferenceTask |
inputs |
typing.Dict[str, typing.Any] |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
execute_reference_workflow()
def execute_reference_workflow(
entity: ReferenceWorkflow,
inputs: typing.Dict[str, typing.Any],
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
):
Execute a ReferenceWorkflow.
Parameter |
Type |
entity |
ReferenceWorkflow |
inputs |
typing.Dict[str, typing.Any] |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
execute_remote_task_lp()
def execute_remote_task_lp(
entity: typing.Union[FlyteTask, FlyteLaunchPlan],
inputs: typing.Dict[str, typing.Any],
project: str,
domain: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
):
Execute a FlyteTask or FlyteLaunchPlan.
NOTE: the name and version arguments are currently not used and are only there for consistency in the function signature
Parameter |
Type |
entity |
typing.Union[FlyteTask, FlyteLaunchPlan] |
inputs |
typing.Dict[str, typing.Any] |
project |
str |
domain |
str |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
execute_remote_wf()
def execute_remote_wf(
entity: FlyteWorkflow,
inputs: typing.Dict[str, typing.Any],
project: str,
domain: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
):
Execute a FlyteWorkflow.
NOTE: the name and version arguments are currently not used and are only there for consistency in the function signature
Parameter |
Type |
entity |
FlyteWorkflow |
inputs |
typing.Dict[str, typing.Any] |
project |
str |
domain |
str |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
fast_package()
def fast_package(
root: os.PathLike,
deref_symlinks: bool,
output: str,
options: typing.Optional[FastPackageOptions],
):
Packages the given paths into an installable zip and returns the md5_bytes and the URL of the uploaded location
Parameter |
Type |
root |
os.PathLike |
deref_symlinks |
bool |
output |
str |
options |
typing.Optional[FastPackageOptions] |
fast_register_workflow()
def fast_register_workflow(
entity: WorkflowBase,
serialization_settings: typing.Optional[SerializationSettings],
version: typing.Optional[str],
default_launch_plan: typing.Optional[bool],
options: typing.Optional[Options],
fast_package_options: typing.Optional[FastPackageOptions],
):
Use this method to register a workflow with zip mode.
Parameter |
Type |
entity |
WorkflowBase |
serialization_settings |
typing.Optional[SerializationSettings] |
version |
typing.Optional[str] |
default_launch_plan |
typing.Optional[bool] |
options |
typing.Optional[Options] |
fast_package_options |
typing.Optional[FastPackageOptions] |
fetch_active_launchplan()
def fetch_active_launchplan(
project: str,
domain: str,
name: str,
):
Returns the active version of the launch plan if it exists or returns None
Parameter |
Type |
project |
str |
domain |
str |
name |
str |
fetch_execution()
def fetch_execution(
project: str,
domain: str,
name: str,
):
Fetch a workflow execution entity from flyte admin.
Parameter |
Type |
project |
str |
domain |
str |
name |
str |
fetch_launch_plan()
def fetch_launch_plan(
project: str,
domain: str,
name: str,
version: str,
):
Fetch a launchplan entity from flyte admin.
Parameter |
Type |
project |
str |
domain |
str |
name |
str |
version |
str |
fetch_task()
def fetch_task(
project: str,
domain: str,
name: str,
version: str,
):
Fetch a task entity from flyte admin.
Parameter |
Type |
project |
str |
domain |
str |
name |
str |
version |
str |
fetch_task_lazy()
def fetch_task_lazy(
project: str,
domain: str,
name: str,
version: str,
):
Similar to fetch_task, just that it returns a LazyEntity, which will fetch the task lazily.
Parameter |
Type |
project |
str |
domain |
str |
name |
str |
version |
str |
fetch_workflow()
def fetch_workflow(
project: str,
domain: str,
name: str,
version: str,
):
Fetch a workflow entity from flyte admin.
Parameter |
Type |
project |
str |
domain |
str |
name |
str |
version |
str |
fetch_workflow_lazy()
def fetch_workflow_lazy(
project: str,
domain: str,
name: str,
version: str,
):
Similar to fetch_workflow, just that it returns a LazyEntity, which will fetch the workflow lazily.
Parameter |
Type |
project |
str |
domain |
str |
name |
str |
version |
str |
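A small sketch of the lazy variant (names are illustrative); no call to Admin happens until the entity is first used:

    lazy_wf = remote.fetch_workflow_lazy(project="flytesnacks", domain="development", name="my_module.wf", version="v1")
    # Accessing .entity (or calling the entity) triggers the actual fetch.
    wf = lazy_wf.entity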
find_launch_plan()
def find_launch_plan(
lp_ref: id_models,
node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec],
):
Parameter |
Type |
lp_ref |
id_models |
node_launch_plans |
Dict[id_models, launch_plan_models.LaunchPlanSpec] |
find_launch_plan_for_node()
def find_launch_plan_for_node(
node: Node,
node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec],
):
Parameter |
Type |
node |
Node |
node_launch_plans |
Dict[id_models, launch_plan_models.LaunchPlanSpec] |
for_endpoint()
def for_endpoint(
endpoint: str,
insecure: bool,
data_config: typing.Optional[DataConfig],
config_file: typing.Union[str, ConfigFile],
default_project: typing.Optional[str],
default_domain: typing.Optional[str],
data_upload_location: str,
interactive_mode_enabled: bool,
kwargs,
):
Parameter |
Type |
endpoint |
str |
insecure |
bool |
data_config |
typing.Optional[DataConfig] |
config_file |
typing.Union[str, ConfigFile] |
default_project |
typing.Optional[str] |
default_domain |
typing.Optional[str] |
data_upload_location |
str |
interactive_mode_enabled |
bool |
kwargs |
**kwargs |
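A minimal sketch of constructing a client this way (the endpoint and defaults are illustrative):

    from flytekit.remote import FlyteRemote

    remote = FlyteRemote.for_endpoint(
        "flyte.example.org",  # hypothetical Flyte Admin endpoint
        insecure=False,
        default_project="flytesnacks",
        default_domain="development",
    )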
for_sandbox()
def for_sandbox(
default_project: typing.Optional[str],
default_domain: typing.Optional[str],
data_upload_location: str,
interactive_mode_enabled: bool,
kwargs,
):
Parameter |
Type |
default_project |
typing.Optional[str] |
default_domain |
typing.Optional[str] |
data_upload_location |
str |
interactive_mode_enabled |
bool |
kwargs |
**kwargs |
generate_console_http_domain()
def generate_console_http_domain()
This should generate the domain where the console is hosted.
generate_console_url()
def generate_console_url(
entity: typing.Union[FlyteWorkflowExecution, FlyteNodeExecution, FlyteTaskExecution, FlyteWorkflow, FlyteTask, WorkflowExecutionIdentifier, Identifier, FlyteLaunchPlan],
):
Generate a Flyteconsole URL for the given Flyte remote endpoint.
This will automatically determine whether the argument is an execution or an entity and adjust the URL accordingly
Parameter |
Type |
entity |
typing.Union[FlyteWorkflowExecution, FlyteNodeExecution, FlyteTaskExecution, FlyteWorkflow, FlyteTask, WorkflowExecutionIdentifier, Identifier, FlyteLaunchPlan] |
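For example (assuming execution is a FlyteWorkflowExecution obtained from this remote):

    url = remote.generate_console_url(execution)
    print(f"Inspect the run at: {url}")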
get()
def get(
flyte_uri: typing.Optional[str],
):
General function that works with Flyte tiny URLs. This can return outputs (in the form of a LiteralsResolver, or
individual Literals for singular requests), HTML if passed a deck link, or bytes containing HTML
if IPython is not available locally.
Parameter |
Type |
flyte_uri |
typing.Optional[str] |
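A hedged sketch; the flyte:// URI below is purely illustrative of the tiny-URL form this method accepts:

    outputs = remote.get("flyte://v1/flytesnacks/development/f8a3b2c1d4e5f6a7b8c9/o")  # hypothetical outputs URI
    print(outputs)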
get_domains()
Lists registered domains from flyte admin.
Returns: typing.List[flytekit.models.domain.Domain]
get_execution_metrics()
def get_execution_metrics(
id: WorkflowExecutionIdentifier,
depth: int,
):
Get the metrics for a given execution.
Parameter |
Type |
id |
WorkflowExecutionIdentifier |
depth |
int |
get_extra_headers_for_protocol()
def get_extra_headers_for_protocol(
native_url,
):
Parameter |
Type |
native_url |
|
launch_backfill()
def launch_backfill(
project: str,
domain: str,
from_date: datetime,
to_date: datetime,
launchplan: str,
launchplan_version: str,
execution_name: str,
version: str,
dry_run: bool,
execute: bool,
parallel: bool,
failure_policy: typing.Optional[WorkflowFailurePolicy],
overwrite_cache: typing.Optional[bool],
):
Creates and launches a backfill workflow for the given launchplan. If launchplan_version is not specified,
then the latest version of the launchplan is retrieved.
The from_date is exclusive and the to_date is inclusive; the backfill runs for all scheduled instances in
between: (from_date - exclusive, to_date - inclusive).
If dry_run is specified, the workflow is created and returned without being registered or executed.
If execute is False, the workflow is created and registered but not executed.
Otherwise, the workflow is created, registered, and executed.
The parallel flag can be used to generate a workflow in which all launchplan invocations can run in parallel.
By default, the backfill is run sequentially.
Parameter |
Type |
project |
str |
domain |
str |
from_date |
datetime |
to_date |
datetime |
launchplan |
str |
launchplan_version |
str |
execution_name |
str |
version |
str |
dry_run |
bool |
execute |
bool |
parallel |
bool |
failure_policy |
typing.Optional[WorkflowFailurePolicy] |
overwrite_cache |
typing.Optional[bool] |
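A sketch of a sequential backfill over ten days (all names are illustrative, and the remaining arguments are assumed to take their defaults):

    from datetime import datetime

    execution = remote.launch_backfill(
        project="flytesnacks",
        domain="development",
        from_date=datetime(2024, 1, 1),   # exclusive
        to_date=datetime(2024, 1, 10),    # inclusive
        launchplan="my_scheduled_lp",     # hypothetical launchplan name
        execute=True,
        parallel=False,
    )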
list_projects()
def list_projects(
limit: typing.Optional[int],
filters: typing.Optional[typing.List[filter_models.Filter]],
sort_by: typing.Optional[admin_common_models.Sort],
):
Lists registered projects from flyte admin.
Parameter |
Type |
limit |
typing.Optional[int] |
filters |
typing.Optional[typing.List[filter_models.Filter]] |
sort_by |
typing.Optional[admin_common_models.Sort] |
list_signals()
def list_signals(
execution_name: str,
project: typing.Optional[str],
domain: typing.Optional[str],
limit: int,
filters: typing.Optional[typing.List[filter_models.Filter]],
):
Parameter |
Type |
execution_name |
str |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
limit |
int |
filters |
typing.Optional[typing.List[filter_models.Filter]] |
list_tasks_by_version()
def list_tasks_by_version(
version: str,
project: typing.Optional[str],
domain: typing.Optional[str],
limit: typing.Optional[int],
):
Parameter |
Type |
version |
str |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
limit |
typing.Optional[int] |
raw_register()
def raw_register(
cp_entity: FlyteControlPlaneEntity,
settings: SerializationSettings,
version: str,
create_default_launchplan: bool,
options: Options,
og_entity: FlyteLocalEntity,
):
Raw register method that can be used to register control plane entities. If you have a Flyte entity like a
WorkflowBase, Task, or LaunchPlan, use the other register methods instead; this one should be used only if you have already serialized entities
Parameter |
Type |
cp_entity |
FlyteControlPlaneEntity |
settings |
SerializationSettings |
version |
str |
create_default_launchplan |
bool |
options |
Options |
og_entity |
FlyteLocalEntity |
recent_executions()
def recent_executions(
project: typing.Optional[str],
domain: typing.Optional[str],
limit: typing.Optional[int],
filters: typing.Optional[typing.List[filter_models.Filter]],
):
Parameter |
Type |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
limit |
typing.Optional[int] |
filters |
typing.Optional[typing.List[filter_models.Filter]] |
register_launch_plan()
def register_launch_plan(
entity: LaunchPlan,
version: typing.Optional[str],
project: typing.Optional[str],
domain: typing.Optional[str],
options: typing.Optional[Options],
serialization_settings: typing.Optional[SerializationSettings],
):
Register a given launchplan, possibly applying overrides from the provided options. If the underlying workflow
is not already registered, it, along with any underlying entities, will also be registered. If the underlying
workflow does exist (with the given project/domain/version), then only the launchplan will be registered.
Parameter |
Type |
entity |
LaunchPlan |
version |
typing.Optional[str] |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
options |
typing.Optional[Options] |
serialization_settings |
typing.Optional[SerializationSettings] |
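For example, assuming wf is a @workflow-decorated function defined locally (the launch plan name and version are illustrative):

    from flytekit import LaunchPlan

    lp = LaunchPlan.get_or_create(workflow=wf, name="nightly_lp")  # hypothetical name
    remote.register_launch_plan(lp, version="v1", project="flytesnacks", domain="development")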
register_script()
def register_script(
entity: typing.Union[WorkflowBase, PythonTask, LaunchPlan],
image_config: typing.Optional[ImageConfig],
version: typing.Optional[str],
project: typing.Optional[str],
domain: typing.Optional[str],
destination_dir: str,
copy_all: bool,
default_launch_plan: bool,
options: typing.Optional[Options],
source_path: typing.Optional[str],
module_name: typing.Optional[str],
envs: typing.Optional[typing.Dict[str, str]],
fast_package_options: typing.Optional[FastPackageOptions],
):
Use this method to register a workflow via script mode.
Parameter |
Type |
entity |
typing.Union[WorkflowBase, PythonTask, LaunchPlan] |
image_config |
typing.Optional[ImageConfig] |
version |
typing.Optional[str] |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
destination_dir |
str |
copy_all |
bool |
default_launch_plan |
bool |
options |
typing.Optional[Options] |
source_path |
typing.Optional[str] |
module_name |
typing.Optional[str] |
envs |
typing.Optional[typing.Dict[str, str]] |
fast_package_options |
typing.Optional[FastPackageOptions] |
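A hedged sketch of script-mode registration (paths and module names are illustrative):

    flyte_wf = remote.register_script(
        wf,                       # a @workflow-decorated function
        source_path=".",          # root of the project to package
        module_name="my_module",  # module in which wf is defined
        version="v1",
    )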
register_task()
def register_task(
entity: PythonTask,
serialization_settings: typing.Optional[SerializationSettings],
version: typing.Optional[str],
):
Register a qualified task (PythonTask) with Remote.
For any conflicting parameters, the method arguments are regarded as overrides
Parameter |
Type |
entity |
PythonTask |
serialization_settings |
typing.Optional[SerializationSettings] |
version |
typing.Optional[str] |
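For instance, registering with explicit serialization settings (the image name and task are hypothetical):

    from flytekit.configuration import ImageConfig, SerializationSettings

    settings = SerializationSettings(image_config=ImageConfig.auto(img_name="ghcr.io/example/my-image:v1"))
    flyte_task = remote.register_task(my_task, serialization_settings=settings, version="v1")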
register_workflow()
def register_workflow(
entity: WorkflowBase,
serialization_settings: typing.Optional[SerializationSettings],
version: typing.Optional[str],
default_launch_plan: typing.Optional[bool],
options: typing.Optional[Options],
):
Use this method to register a workflow.
Parameter |
Type |
entity |
WorkflowBase |
serialization_settings |
typing.Optional[SerializationSettings] |
version |
typing.Optional[str] |
default_launch_plan |
typing.Optional[bool] |
options |
typing.Optional[Options] |
reject()
def reject(
signal_id: str,
execution_name: str,
project: str,
domain: str,
):
Parameter |
Type |
signal_id |
str |
execution_name |
str |
project |
str |
domain |
str |
remote_context()
Context manager with remote-specific configuration.
set_input()
def set_input(
signal_id: str,
execution_name: str,
value: typing.Union[literal_models.Literal, typing.Any],
project,
domain,
python_type,
literal_type,
):
Parameter |
Type |
signal_id |
str |
execution_name |
str |
value |
typing.Union[literal_models.Literal, typing.Any] |
project |
|
domain |
|
python_type |
|
literal_type |
|
set_signal()
def set_signal(
signal_id: str,
execution_name: str,
value: typing.Union[literal_models.Literal, typing.Any],
project: typing.Optional[str],
domain: typing.Optional[str],
python_type: typing.Optional[typing.Type],
literal_type: typing.Optional[type_models.LiteralType],
):
Parameter |
Type |
signal_id |
str |
execution_name |
str |
value |
typing.Union[literal_models.Literal, typing.Any] |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
python_type |
typing.Optional[typing.Type] |
literal_type |
typing.Optional[type_models.LiteralType] |
sync()
def sync(
execution: FlyteWorkflowExecution,
entity_definition: typing.Union[FlyteWorkflow, FlyteTask],
sync_nodes: bool,
):
This function was previously a singledispatchmethod. We've removed that, but this function remains
so that existing callers don't break.
Parameter |
Type |
execution |
FlyteWorkflowExecution |
entity_definition |
typing.Union[FlyteWorkflow, FlyteTask] |
sync_nodes |
bool |
sync_execution()
def sync_execution(
execution: FlyteWorkflowExecution,
entity_definition: typing.Union[FlyteWorkflow, FlyteTask],
sync_nodes: bool,
):
Sync a FlyteWorkflowExecution object with its corresponding remote state.
Parameter |
Type |
execution |
FlyteWorkflowExecution |
entity_definition |
typing.Union[FlyteWorkflow, FlyteTask] |
sync_nodes |
bool |
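For example, refreshing a previously fetched execution along with its node-level details (the execution name is hypothetical):

    execution = remote.fetch_execution(project="flytesnacks", domain="development", name="f8a3b2c1d4e5f6a7b8c9")
    execution = remote.sync_execution(execution, sync_nodes=True)
    print(execution.is_done)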
sync_node_execution()
def sync_node_execution(
execution: FlyteNodeExecution,
node_mapping: typing.Dict[str, FlyteNode],
):
Get data backing a node execution. These FlyteNodeExecution objects should’ve come from Admin with the model
fields already populated correctly. For purposes of the remote experience, we’d like to supplement the object
with some additional fields:
- inputs/outputs
- task/workflow executions, and/or underlying node executions in the case of parent nodes
- TypedInterface (remote wrapper type)
A node can have several different types of executions behind it. That is, the node could’ve run (perhaps
multiple times because of retries):
- A task
- A static subworkflow
- A dynamic subworkflow (which in turn may have run additional tasks, subwfs, and/or launch plans)
- A launch plan
The data model is complicated, so ascertaining which of these happened is a bit tricky. That logic is
encapsulated in this function.
Parameter |
Type |
execution |
FlyteNodeExecution |
node_mapping |
typing.Dict[str, FlyteNode] |
sync_task_execution()
def sync_task_execution(
execution: FlyteTaskExecution,
entity_interface: typing.Optional[TypedInterface],
):
Sync a FlyteTaskExecution object with its corresponding remote state.
Parameter |
Type |
execution |
FlyteTaskExecution |
entity_interface |
typing.Optional[TypedInterface] |
terminate()
def terminate(
execution: FlyteWorkflowExecution,
cause: str,
):
Terminate a workflow execution.
Parameter |
Type |
execution |
FlyteWorkflowExecution |
cause |
str |
upload_file()
def upload_file(
to_upload: pathlib.Path,
project: typing.Optional[str],
domain: typing.Optional[str],
filename_root: typing.Optional[str],
):
Uses the remote's client to hash the file and then upload it using Admin's data proxy service.
Parameter |
Type |
to_upload |
pathlib.Path |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
filename_root |
typing.Optional[str] |
wait()
def wait(
execution: FlyteWorkflowExecution,
timeout: typing.Optional[typing.Union[timedelta, int]],
poll_interval: typing.Optional[typing.Union[timedelta, int]],
sync_nodes: bool,
):
Wait for an execution to finish.
Parameter |
Type |
execution |
FlyteWorkflowExecution |
timeout |
typing.Optional[typing.Union[timedelta, int]] |
poll_interval |
typing.Optional[typing.Union[timedelta, int]] |
sync_nodes |
bool |
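For example, blocking for at most ten minutes while polling every five seconds (task and inputs are the hypothetical ones from the execute() example above):

    from datetime import timedelta

    execution = remote.execute(task, inputs={"x": 3})
    execution = remote.wait(execution, timeout=timedelta(minutes=10), poll_interval=timedelta(seconds=5))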
Properties
Property |
Type |
Description |
client |
|
|
config |
|
|
context |
|
|
default_domain |
|
|
default_project |
|
|
file_access |
|
|
interactive_mode_enabled |
|
|
flytekit.remote.remote.FlyteTask
A class encapsulating a remote Flyte task.
def FlyteTask(
id,
type,
metadata,
interface,
custom,
container,
task_type_version: int,
security_context,
config,
k8s_pod,
sql,
extended_resources,
should_register: bool,
):
Parameter |
Type |
id |
|
type |
|
metadata |
|
interface |
|
custom |
|
container |
|
task_type_version |
int |
security_context |
|
config |
|
k8s_pod |
|
sql |
|
extended_resources |
|
should_register |
bool |
Methods
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
):
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
args |
*args |
kwargs |
**kwargs |
def construct_node_metadata()
Used when constructing the node that encapsulates this task as part of a broader workflow definition.
execute()
Parameter |
Type |
kwargs |
**kwargs |
from_flyte_idl()
def from_flyte_idl(
pb2_object,
):
Parameter |
Type |
pb2_object |
|
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
promote_from_model()
def promote_from_model(
base_model: _task_model.TaskTemplate,
):
Parameter |
Type |
base_model |
_task_model.TaskTemplate |
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
Property |
Type |
Description |
config |
|
|
container |
|
|
custom |
|
|
docs |
|
|
entity_type_text |
|
|
extended_resources |
|
|
id |
|
|
interface |
|
|
is_empty |
|
|
k8s_pod |
|
|
metadata |
|
|
name |
|
|
python_interface |
|
|
resource_type |
|
|
security_context |
|
|
should_register |
|
|
sql |
|
|
task_type_version |
|
|
template |
|
|
type |
|
|
flytekit.remote.remote.FlyteTaskExecution
A class encapsulating a task execution being run on a Flyte remote backend.
def FlyteTaskExecution(
args,
kwargs,
):
Parameter |
Type |
args |
*args |
kwargs |
**kwargs |
Methods
from_flyte_idl()
def from_flyte_idl(
proto,
):
promote_from_model()
def promote_from_model(
base_model: admin_task_execution_models.TaskExecution,
):
Parameter |
Type |
base_model |
admin_task_execution_models.TaskExecution |
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
Property |
Type |
Description |
closure |
|
|
error |
|
|
id |
|
|
input_uri |
|
|
inputs |
|
|
is_done |
|
|
is_empty |
|
|
is_parent |
|
|
outputs |
|
|
task |
|
|
flytekit.remote.remote.FlyteTaskNode
A class encapsulating a task that a Flyte node needs to execute.
def FlyteTaskNode(
flyte_task: FlyteTask,
):
Refers to the task that the node is to execute.
This is currently a oneof in protobuf, but there's only one option available today.
This code should be updated when more options become available.
Parameter |
Type |
flyte_task |
FlyteTask |
Methods
from_flyte_idl()
def from_flyte_idl(
pb2_object,
):
Parameter |
Type |
pb2_object |
|
promote_from_model()
def promote_from_model(
task: FlyteTask,
):
Takes the IDL wrapper for a TaskNode and returns the hydrated flytekit object for it, built from the
given FlyteTask control plane object.
Parameter |
Type |
task |
FlyteTask |
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
Property |
Type |
Description |
flyte_task |
|
|
is_empty |
|
|
overrides |
|
|
reference_id |
|
|
flytekit.remote.remote.FlyteValueException
Inappropriate argument value (of correct type).
def FlyteValueException(
received_value,
error_message,
):
Parameter |
Type |
received_value |
|
error_message |
|
Properties
Property |
Type |
Description |
timestamp |
|
|
flytekit.remote.remote.FlyteWorkflow
A class encapsulating a remote Flyte workflow.
def FlyteWorkflow(
id: id_models.Identifier,
nodes: List[FlyteNode],
interface,
output_bindings,
metadata,
metadata_defaults,
subworkflows: Optional[List[FlyteWorkflow]],
tasks: Optional[List[FlyteTask]],
launch_plans: Optional[Dict[id_models.Identifier, launch_plan_models.LaunchPlanSpec]],
compiled_closure: Optional[compiler_models.CompiledWorkflowClosure],
should_register: bool,
):
Parameter |
Type |
id |
id_models.Identifier |
nodes |
List[FlyteNode] |
interface |
|
output_bindings |
|
metadata |
|
metadata_defaults |
|
subworkflows |
Optional[List[FlyteWorkflow]] |
tasks |
Optional[List[FlyteTask]] |
launch_plans |
Optional[Dict[id_models.Identifier, launch_plan_models.LaunchPlanSpec]] |
compiled_closure |
Optional[compiler_models.CompiledWorkflowClosure] |
should_register |
bool |
Methods
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
):
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
args |
*args |
kwargs |
**kwargs |
def construct_node_metadata()
Used when constructing the node that encapsulates this task as part of a broader workflow definition.
execute()
Parameter |
Type |
kwargs |
**kwargs |
from_flyte_idl()
def from_flyte_idl(
pb2_object,
):
Parameter |
Type |
pb2_object |
|
get_non_system_nodes()
def get_non_system_nodes(
nodes: List[_workflow_models.Node],
):
Parameter |
Type |
nodes |
List[_workflow_models.Node] |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
promote_from_closure()
def promote_from_closure(
closure: compiler_models.CompiledWorkflowClosure,
node_launch_plans: Optional[Dict[id_models, launch_plan_models.LaunchPlanSpec]],
):
Extracts out the relevant portions of a FlyteWorkflow from a closure from the control plane.
Parameter |
Type |
closure |
compiler_models.CompiledWorkflowClosure |
node_launch_plans |
Optional[Dict[id_models, launch_plan_models.LaunchPlanSpec]] |
promote_from_model()
def promote_from_model(
base_model: _workflow_models.WorkflowTemplate,
sub_workflows: Optional[Dict[Identifier, _workflow_models.WorkflowTemplate]],
tasks: Optional[Dict[Identifier, FlyteTask]],
node_launch_plans: Optional[Dict[Identifier, launch_plan_models.LaunchPlanSpec]],
):
Parameter |
Type |
base_model |
_workflow_models.WorkflowTemplate |
sub_workflows |
Optional[Dict[Identifier, _workflow_models.WorkflowTemplate]] |
tasks |
Optional[Dict[Identifier, FlyteTask]] |
node_launch_plans |
Optional[Dict[Identifier, launch_plan_models.LaunchPlanSpec]] |
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
Property |
Type |
Description |
docs |
|
|
entity_type_text |
|
|
failure_node |
|
|
flyte_nodes |
|
|
flyte_sub_workflows |
|
|
flyte_tasks |
|
|
id |
|
|
interface |
|
|
is_empty |
|
|
metadata |
|
|
metadata_defaults |
|
|
name |
|
|
nodes |
|
|
outputs |
|
|
python_interface |
|
|
resource_type |
|
|
should_register |
|
|
sub_workflows |
|
|
template |
|
|
flytekit.remote.remote.FlyteWorkflowExecution
A class encapsulating a workflow execution being run on a Flyte remote backend.
def FlyteWorkflowExecution(
type_hints: Optional[Dict[str, typing.Type]],
remote: Optional['FlyteRemote'],
args,
kwargs,
):
Parameter |
Type |
type_hints |
Optional[Dict[str, typing.Type]] |
remote |
Optional['FlyteRemote'] |
args |
*args |
kwargs |
**kwargs |
Methods
from_flyte_idl()
def from_flyte_idl(
pb,
):
promote_from_model()
def promote_from_model(
base_model: execution_models.Execution,
remote: Optional['FlyteRemote'],
type_hints: Optional[Dict[str, typing.Type]],
):
Parameter |
Type |
base_model |
execution_models.Execution |
remote |
Optional['FlyteRemote'] |
type_hints |
Optional[Dict[str, typing.Type]] |
serialize_to_string()
def serialize_to_string()
short_string()
sync()
def sync(
sync_nodes: bool,
):
Syncs the state of the current execution and returns a new object with the updated state.
Parameter |
Type |
sync_nodes |
bool |
to_flyte_idl()
verbose_string()
wait()
def wait(
timeout: Optional[Union[timedelta, int]],
poll_interval: Optional[Union[timedelta, int]],
sync_nodes: bool,
):
Wait for the execution to complete. This is a blocking call.
Parameter |
Type |
timeout |
Optional[Union[timedelta, int]] |
poll_interval |
Optional[Union[timedelta, int]] |
sync_nodes |
bool |
Properties
Property |
Type |
Description |
closure |
|
|
error |
|
|
execution_url |
|
|
flyte_workflow |
|
|
id |
|
|
inputs |
|
|
is_done |
|
|
is_empty |
|
|
is_successful |
|
|
node_executions |
|
|
outputs |
|
|
spec |
|
|
flytekit.remote.remote.Identifier
def Identifier(
resource_type,
project,
domain,
name,
version,
):
Parameter |
Type |
resource_type |
|
project |
|
domain |
|
name |
|
version |
|
Methods
from_flyte_idl()
def from_flyte_idl(
p,
):
resource_type_name()
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
Property |
Type |
Description |
domain |
|
|
is_empty |
|
|
name |
|
|
project |
|
|
resource_type |
|
|
version |
|
|
flytekit.remote.remote.ImageConfig
We recommend using ImageConfig.auto(img_name=None) to create an ImageConfig.
For example, ImageConfig.auto(img_name="ghcr.io/flyteorg/flytecookbook:v1.0.0") will create an ImageConfig.
ImageConfig holds available images which can be used at registration time. A default image can be specified
along with optional additional images. Each image in the config must have a unique name.
Attributes:
default_image (Optional[Image]): The default image to be used as a container for task serialization.
images (List[Image]): Optional, additional images which can be used in task container definitions.
def ImageConfig(
default_image: Optional[Image],
images: Optional[List[Image]],
):
Parameter |
Type |
default_image |
Optional[Image] |
images |
Optional[List[Image]] |
Methods
auto()
def auto(
config_file: typing.Union[str, ConfigFile, None],
img_name: Optional[str],
):
Reads from a config file or from img_name.
Note that this function does not take into account the flytekit default images (see the Dockerfiles at the
base of this repo). To pick those up, see the auto_default_image function.
Parameter |
Type |
config_file |
typing.Union[str, ConfigFile, None] |
img_name |
Optional[str] |
auto_default_image()
create_from()
def create_from(
default_image: Optional[Image],
other_images: typing.Optional[typing.List[Image]],
):
Parameter |
Type |
default_image |
Optional[Image] |
other_images |
typing.Optional[typing.List[Image]] |
find_image()
Return an image, by name, if it exists.
from_dict()
def from_dict(
kvs: typing.Union[dict, list, str, int, float, bool, NoneType],
infer_missing,
):
Parameter |
Type |
kvs |
typing.Union[dict, list, str, int, float, bool, NoneType] |
infer_missing |
|
from_images()
def from_images(
default_image: str,
m: typing.Optional[typing.Dict[str, str]],
):
Allows you to programmatically create an ImageConfig. Usually only the default_image is required, unless
your workflow uses multiple images:

    ImageConfig.from_images(
        "ghcr.io/flyteorg/flytecookbook:v1.0.0",
        {
            "spark": "ghcr.io/flyteorg/myspark:…",
            "other": "…",
        }
    )
Parameter |
Type |
default_image |
str |
m |
typing.Optional[typing.Dict[str, str]] |
from_json()
def from_json(
s: typing.Union[str, bytes, bytearray],
parse_float,
parse_int,
parse_constant,
infer_missing,
kw,
):
Parameter |
Type |
s |
typing.Union[str, bytes, bytearray] |
parse_float |
|
parse_int |
|
parse_constant |
|
infer_missing |
|
kw |
|
schema()
def schema(
infer_missing: bool,
only,
exclude,
many: bool,
context,
load_only,
dump_only,
partial: bool,
unknown,
):
Parameter |
Type |
infer_missing |
bool |
only |
|
exclude |
|
many |
bool |
context |
|
load_only |
|
dump_only |
|
partial |
bool |
unknown |
|
to_dict()
def to_dict(
encode_json,
):
Parameter |
Type |
encode_json |
|
to_json()
def to_json(
skipkeys: bool,
ensure_ascii: bool,
check_circular: bool,
allow_nan: bool,
indent: typing.Union[int, str, NoneType],
separators: typing.Tuple[str, str],
default: typing.Callable,
sort_keys: bool,
kw,
):
Parameter |
Type |
skipkeys |
bool |
ensure_ascii |
bool |
check_circular |
bool |
allow_nan |
bool |
indent |
typing.Union[int, str, NoneType] |
separators |
typing.Tuple[str, str] |
default |
typing.Callable |
sort_keys |
bool |
kw |
|
validate_image()
def validate_image(
_: typing.Any,
param: str,
values: tuple,
):
Validates the image to match the standard format. Also validates that only one default image is provided.
A default image is one that is specified as default=<image_uri> or just <image_uri>. All other images
should be provided with a name, in the format name=<image_uri>.
This method can be used with the CLI.
Parameter |
Type |
_ |
typing.Any |
param |
str |
values |
tuple |
flytekit.remote.remote.ImageSpec
This class is used to specify the docker image that will be used to run the task.
def ImageSpec(
name: str,
python_version: str,
builder: typing.Optional[str],
source_root: typing.Optional[str],
env: typing.Optional[typing.Dict[str, str]],
registry: typing.Optional[str],
packages: typing.Optional[typing.List[str]],
conda_packages: typing.Optional[typing.List[str]],
conda_channels: typing.Optional[typing.List[str]],
requirements: typing.Optional[str],
apt_packages: typing.Optional[typing.List[str]],
cuda: typing.Optional[str],
cudnn: typing.Optional[str],
base_image: typing.Union[str, ForwardRef('ImageSpec'), NoneType],
platform: str,
pip_index: typing.Optional[str],
pip_extra_index_url: typing.Optional[typing.List[str]],
pip_secret_mounts: typing.Optional[typing.List[typing.Tuple[str, str]]],
pip_extra_args: typing.Optional[str],
registry_config: typing.Optional[str],
entrypoint: typing.Optional[typing.List[str]],
commands: typing.Optional[typing.List[str]],
tag_format: typing.Optional[str],
source_copy_mode: typing.Optional[flytekit.constants.CopyFileDetection],
copy: typing.Optional[typing.List[str]],
python_exec: typing.Optional[str],
):
Parameter |
Type |
name |
str |
python_version |
str |
builder |
typing.Optional[str] |
source_root |
typing.Optional[str] |
env |
typing.Optional[typing.Dict[str, str]] |
registry |
typing.Optional[str] |
packages |
typing.Optional[typing.List[str]] |
conda_packages |
typing.Optional[typing.List[str]] |
conda_channels |
typing.Optional[typing.List[str]] |
requirements |
typing.Optional[str] |
apt_packages |
typing.Optional[typing.List[str]] |
cuda |
typing.Optional[str] |
cudnn |
typing.Optional[str] |
base_image |
typing.Union[str, ForwardRef('ImageSpec'), NoneType] |
platform |
str |
pip_index |
typing.Optional[str] |
pip_extra_index_url |
typing.Optional[typing.List[str]] |
pip_secret_mounts |
typing.Optional[typing.List[typing.Tuple[str, str]]] |
pip_extra_args |
typing.Optional[str] |
registry_config |
typing.Optional[str] |
entrypoint |
typing.Optional[typing.List[str]] |
commands |
typing.Optional[typing.List[str]] |
tag_format |
typing.Optional[str] |
source_copy_mode |
typing.Optional[flytekit.constants.CopyFileDetection] |
copy |
typing.Optional[typing.List[str]] |
python_exec |
typing.Optional[str] |
Methods
Method |
Description |
exist() |
Check if the image exists in the registry |
force_push() |
Builder that returns a new image spec with force push enabled |
from_env() |
Create ImageSpec with the environment’s Python version and packages pinned to the ones in the environment |
image_name() |
Full image name with tag |
is_container() |
Check if the current container image in the pod is built from current image spec |
with_apt_packages() |
Builder that returns a new image spec with an additional list of apt packages that will be executed during the building process |
with_commands() |
Builder that returns a new image spec with an additional list of commands that will be executed during the building process |
with_copy() |
Builder that returns a new image spec with the source files copied to the destination directory |
with_packages() |
Builder that returns a new image spec with additional python packages that will be installed during the building process |
exist()
Check if the image exists in the registry.
Returns True if the image exists in the registry, False otherwise.
Returns None if the check failed due to permission issues or other reasons.
force_push()
Builder that returns a new image spec with force push enabled.
from_env()
def from_env(
pinned_packages: typing.Optional[typing.List[str]],
kwargs,
):
Create ImageSpec with the environment’s Python version and packages pinned to the ones in the environment.
Parameter |
Type |
pinned_packages |
typing.Optional[typing.List[str]] |
kwargs |
**kwargs |
image_name()
Full image name with tag.
is_container()
Check if the current container image in the pod is built from current image spec.
:return: True if the current container image in the pod is built from current image spec, False otherwise.
with_apt_packages()
def with_apt_packages(
apt_packages: typing.Union[str, typing.List[str]],
):
Builder that returns a new image spec with an additional list of apt packages that will be executed during the building process.
Parameter |
Type |
apt_packages |
typing.Union[str, typing.List[str]] |
with_commands()
def with_commands(
commands: typing.Union[str, typing.List[str]],
):
Builder that returns a new image spec with an additional list of commands that will be executed during the building process.
Parameter |
Type |
commands |
typing.Union[str, typing.List[str]] |
with_copy()
def with_copy(
src: typing.Union[str, typing.List[str]],
):
Builder that returns a new image spec with the source files copied to the destination directory.
Parameter |
Type |
src |
typing.Union[str, typing.List[str]] |
with_packages()
def with_packages(
packages: typing.Union[str, typing.List[str]],
):
Builder that returns a new image spec with additional python packages that will be installed during the building process.
Parameter |
Type |
packages |
typing.Union[str, typing.List[str]] |
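A small sketch combining the builders above (the registry and package names are illustrative):

    spec = ImageSpec(name="my-image", registry="ghcr.io/example", packages=["pandas"])
    # Each builder returns a new ImageSpec, so calls can be chained.
    spec = spec.with_packages("scikit-learn").with_apt_packages(["git"])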
Properties
Property |
Type |
Description |
tag |
|
|
flytekit.remote.remote.LaunchPlan
Launch Plans are one of the core constructs of Flyte. Please take a look at the discussion of core concepts
(divedeep-launchplans) in the Flyte documentation if you are unfamiliar with them.
Every workflow is registered with a default launch plan, which is just a launch plan with none of the additional
attributes set - no default values, fixed values, schedules, etc. Assuming you have the following workflow:

    @workflow
    def wf(a: int, c: str) -> str:
        ...

Create the default launch plan with:

    LaunchPlan.get_or_create(workflow=my_wf)

If you specify additional parameters, you'll also have to give the launch plan a unique name. Default and
fixed inputs can be expressed as Python native values (see the fixed_and_default example in
tests/flytekit/unit/core/test_launch_plan.py).
Additionally, a launch plan can be configured to run on a schedule and emit notifications; please see the
relevant Schedule and Notification objects as well (see the schedule example in the same test file).
To configure the remaining parameters, you'll need to import the relevant model objects:

    from flytekit.models.common import Annotations, AuthRole, Labels, RawOutputDataConfig

Then use as shown in the auth_role example in tests/flytekit/unit/core/test_launch_plan.py.
def LaunchPlan(
name: str,
workflow: _annotated_workflow.WorkflowBase,
parameters: _interface_models.ParameterMap,
fixed_inputs: _literal_models.LiteralMap,
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
):
Parameter |
Type |
name |
str |
workflow |
_annotated_workflow.WorkflowBase |
parameters |
_interface_models.ParameterMap |
fixed_inputs |
_literal_models.LiteralMap |
schedule |
Optional[_schedule_model.Schedule] |
notifications |
Optional[List[_common_models.Notification]] |
labels |
Optional[_common_models.Labels] |
annotations |
Optional[_common_models.Annotations] |
raw_output_data_config |
Optional[_common_models.RawOutputDataConfig] |
max_parallelism |
Optional[int] |
security_context |
Optional[security.SecurityContext] |
trigger |
Optional[LaunchPlanTriggerBase] |
overwrite_cache |
Optional[bool] |
auto_activate |
bool |
Methods
clone_with()
def clone_with(
name: str,
parameters: Optional[_interface_models.ParameterMap],
fixed_inputs: Optional[_literal_models.LiteralMap],
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
):
Parameter |
Type |
name |
str |
parameters |
Optional[_interface_models.ParameterMap] |
fixed_inputs |
Optional[_literal_models.LiteralMap] |
schedule |
Optional[_schedule_model.Schedule] |
notifications |
Optional[List[_common_models.Notification]] |
labels |
Optional[_common_models.Labels] |
annotations |
Optional[_common_models.Annotations] |
raw_output_data_config |
Optional[_common_models.RawOutputDataConfig] |
max_parallelism |
Optional[int] |
security_context |
Optional[security.SecurityContext] |
trigger |
Optional[LaunchPlanTriggerBase] |
overwrite_cache |
Optional[bool] |
auto_activate |
bool |
def construct_node_metadata()
create()
def create(
name: str,
workflow: _annotated_workflow.WorkflowBase,
default_inputs: Optional[Dict[str, Any]],
fixed_inputs: Optional[Dict[str, Any]],
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
auth_role: Optional[_common_models.AuthRole],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
):
Parameter |
Type |
name |
str |
workflow |
_annotated_workflow.WorkflowBase |
default_inputs |
Optional[Dict[str, Any]] |
fixed_inputs |
Optional[Dict[str, Any]] |
schedule |
Optional[_schedule_model.Schedule] |
notifications |
Optional[List[_common_models.Notification]] |
labels |
Optional[_common_models.Labels] |
annotations |
Optional[_common_models.Annotations] |
raw_output_data_config |
Optional[_common_models.RawOutputDataConfig] |
max_parallelism |
Optional[int] |
security_context |
Optional[security.SecurityContext] |
auth_role |
Optional[_common_models.AuthRole] |
trigger |
Optional[LaunchPlanTriggerBase] |
overwrite_cache |
Optional[bool] |
auto_activate |
bool |
get_default_launch_plan()
def get_default_launch_plan(
ctx: FlyteContext,
workflow: _annotated_workflow.WorkflowBase,
):
Users should probably call the get_or_create function defined below instead. A default launch plan is the one
that will just pick up whatever default values are defined in the workflow function signature (if any) and
use the default auth information supplied during serialization, with no notifications or schedules.
Parameter |
Type |
ctx |
FlyteContext |
workflow |
_annotated_workflow.WorkflowBase |
get_or_create()
def get_or_create(
workflow: _annotated_workflow.WorkflowBase,
name: Optional[str],
default_inputs: Optional[Dict[str, Any]],
fixed_inputs: Optional[Dict[str, Any]],
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
auth_role: Optional[_common_models.AuthRole],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
):
This function offers a friendlier interface for creating launch plans. If the name for the launch plan is not
supplied, this assumes you are looking for the default launch plan for the workflow. If it is specified, it
will be used. If creating the default launch plan, none of the other arguments may be specified.
The resulting launch plan is also cached; if this function is called again with the same name, the
cached version is returned.
Parameter |
Type |
workflow |
_annotated_workflow.WorkflowBase |
name |
Optional[str] |
default_inputs |
Optional[Dict[str, Any]] |
fixed_inputs |
Optional[Dict[str, Any]] |
schedule |
Optional[_schedule_model.Schedule] |
notifications |
Optional[List[_common_models.Notification]] |
labels |
Optional[_common_models.Labels] |
annotations |
Optional[_common_models.Annotations] |
raw_output_data_config |
Optional[_common_models.RawOutputDataConfig] |
max_parallelism |
Optional[int] |
security_context |
Optional[security.SecurityContext] |
auth_role |
Optional[_common_models.AuthRole] |
trigger |
Optional[LaunchPlanTriggerBase] |
overwrite_cache |
Optional[bool] |
auto_activate |
bool |
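Continuing the wf(a: int, c: str) example from the class docstring above, a hedged sketch:

    # Default launch plan: no extra arguments may be supplied.
    default_lp = LaunchPlan.get_or_create(workflow=wf)

    # Named launch plan with default and fixed inputs.
    nightly = LaunchPlan.get_or_create(
        workflow=wf,
        name="nightly",          # hypothetical name
        default_inputs={"a": 3},
        fixed_inputs={"c": "hello"},
    )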
Properties
Property |
Type |
Description |
annotations |
|
|
fixed_inputs |
|
|
interface |
|
|
labels |
|
|
max_parallelism |
|
|
name |
|
|
notifications |
|
|
overwrite_cache |
|
|
parameters |
|
|
python_interface |
|
|
raw_output_data_config |
|
|
saved_inputs |
|
|
schedule |
|
|
security_context |
|
|
should_auto_activate |
|
|
trigger |
|
|
workflow |
|
|
flytekit.remote.remote.LaunchPlanState
Methods
enum_to_string()
def enum_to_string(
val,
):
flytekit.remote.remote.LazyEntity
Fetches the entity when the entity is called or when the entity is retrieved.
The entity is derived from RemoteEntity so that it behaves exactly like the mimicked entity.
def LazyEntity(
name: str,
getter: typing.Callable[[], ~T],
args,
kwargs,
):
Parameter |
Type |
name |
str |
getter |
typing.Callable[[], ~T] |
args |
*args |
kwargs |
**kwargs |
Methods
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
):
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
args |
*args |
kwargs |
**kwargs |
def construct_node_metadata()
Used when constructing the node that encapsulates this task as part of a broader workflow definition.
entity_fetched()
execute()
Parameter |
Type |
kwargs |
**kwargs |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
Properties
Property |
Type |
Description |
entity |
|
|
id |
|
|
name |
|
|
python_interface |
|
|
flytekit.remote.remote.Literal
def Literal(
scalar: typing.Optional[flytekit.models.literals.Scalar],
collection: typing.Optional[flytekit.models.literals.LiteralCollection],
map: typing.Optional[flytekit.models.literals.LiteralMap],
hash: typing.Optional[str],
metadata: typing.Optional[typing.Dict[str, str]],
offloaded_metadata: typing.Optional[flytekit.models.literals.LiteralOffloadedMetadata],
):
This IDL message represents a literal value in the Flyte ecosystem.
Parameter |
Type |
scalar |
typing.Optional[flytekit.models.literals.Scalar] |
collection |
typing.Optional[flytekit.models.literals.LiteralCollection] |
map |
typing.Optional[flytekit.models.literals.LiteralMap] |
hash |
typing.Optional[str] |
metadata |
typing.Optional[typing.Dict[str, str]] |
offloaded_metadata |
typing.Optional[flytekit.models.literals.LiteralOffloadedMetadata] |
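A minimal sketch of constructing a literal by hand from the model classes; in everyday use the TypeEngine builds these from Python values for you:
from flytekit.models.literals import Literal, LiteralMap, Primitive, Scalar
# An integer wrapped in the Primitive -> Scalar -> Literal hierarchy.
lit = Literal(scalar=Scalar(primitive=Primitive(integer=3)))
# Literals travel between tasks grouped by name in a LiteralMap.
lm = LiteralMap(literals={"x": lit})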
Methods
from_flyte_idl()
def from_flyte_idl(
pb2_object: flyteidl.core.literals_pb2.Literal,
):
Parameter |
Type |
pb2_object |
flyteidl.core.literals_pb2.Literal |
serialize_to_string()
def serialize_to_string()
set_metadata()
def set_metadata(
metadata: typing.Dict[str, str],
):
Note: This is a mutation on the literal.
Parameter |
Type |
metadata |
typing.Dict[str, str] |
short_string()
to_flyte_idl()
verbose_string()
Properties
Property |
Type |
Description |
collection |
|
|
hash |
|
|
is_empty |
|
|
map |
|
|
metadata |
|
|
offloaded_metadata |
|
|
scalar |
|
|
value |
|
|
flytekit.remote.remote.LiteralMap
def LiteralMap(
literals,
):
Methods
from_flyte_idl()
def from_flyte_idl(
pb2_object,
):
Parameter |
Type |
pb2_object |
|
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
Property |
Type |
Description |
is_empty |
|
|
literals |
|
|
flytekit.remote.remote.LiteralsResolver
LiteralsResolver is a helper class meant primarily for use with the FlyteRemote experience or any other situation
where you might be working with LiteralMaps. This object allows the caller to specify the Python type that should
correspond to an element of the map.
def LiteralsResolver(
literals: typing.Dict[str, Literal],
variable_map: Optional[Dict[str, _interface_models.Variable]],
ctx: Optional[FlyteContext],
):
Parameter |
Type |
literals |
typing.Dict[str, Literal] |
variable_map |
Optional[Dict[str, _interface_models.Variable]] |
ctx |
Optional[FlyteContext] |
Methods
as_python_native()
def as_python_native(
python_interface: Interface,
):
This should return the native Python representation, compatible with unpacking.
This function relies on Python interface outputs being ordered correctly.
Parameter |
Type |
python_interface |
Interface |
clear()
D.clear() -> None. Remove all items from D.
copy()
fromkeys()
def fromkeys(
iterable,
value,
):
Parameter |
Type |
iterable |
|
value |
|
get()
def get(
attr: str,
as_type: Optional[typing.Type],
):
This will get the attr
value from the Literal map, and invoke the TypeEngine to convert it into a Python
native value. A Python type can optionally be supplied. If successful, the native value will be cached and
future calls will return the cached value instead.
Parameter |
Type |
attr |
str |
as_type |
Optional[typing.Type] |
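A short sketch, assuming execution is a completed FlyteWorkflowExecution whose outputs property is a LiteralsResolver:
outputs = execution.outputs
# Convert the literal under "my_output" to a Python native value,
# hinting the Python type to use for the conversion.
val = outputs.get("my_output", as_type=int)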
get_literal()
def get_literal(
key: str,
):
items()
D.items() -> a set-like object providing a view on D’s items
keys()
D.keys() -> a set-like object providing a view on D’s keys
pop()
def pop(
key,
default,
):
D.pop(k[,d]) -> v, remove specified key and return the corresponding value.
If key is not found, d is returned if given, otherwise KeyError is raised.
Parameter |
Type |
key |
|
default |
|
popitem()
D.popitem() -> (k, v), remove and return some (key, value) pair
as a 2-tuple; but raise KeyError if D is empty.
setdefault()
def setdefault(
key,
default,
):
D.setdefault(k[,d]) -> D.get(k,d), also set D[k]=d if k not in D
Parameter |
Type |
key |
|
default |
|
update()
def update(
other,
kwds,
):
D.update([E, ]**F) -> None. Update D from mapping/iterable E and F.
If E present and has a .keys() method, does: for k in E.keys(): D[k] = E[k]
If E present and lacks .keys() method, does: for (k, v) in E: D[k] = v
In either case, this is followed by: for k, v in F.items(): D[k] = v
Parameter |
Type |
other |
|
kwds |
|
update_type_hints()
def update_type_hints(
type_hints: typing.Dict[str, typing.Type],
):
Parameter |
Type |
type_hints |
typing.Dict[str, typing.Type] |
values()
D.values() -> an object providing a view on D’s values
Properties
Property |
Type |
Description |
literals |
|
|
native_values |
|
|
variable_map |
|
|
flytekit.remote.remote.NamedEntityIdentifier
def NamedEntityIdentifier(
project,
domain,
name,
):
Parameter |
Type |
project |
|
domain |
|
name |
|
Methods
from_flyte_idl()
def from_flyte_idl(
idl_object,
):
Parameter |
Type |
idl_object |
|
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
Stores object to a Flyte-IDL defined protobuf.
verbose_string()
Properties
Property |
Type |
Description |
domain |
|
|
is_empty |
|
|
name |
|
|
project |
|
|
flytekit.remote.remote.Node
def Node(
id,
metadata,
inputs,
upstream_node_ids,
output_aliases,
task_node,
workflow_node,
branch_node,
gate_node: typing.Optional[flytekit.models.core.workflow.GateNode],
array_node: typing.Optional[flytekit.models.core.workflow.ArrayNode],
):
A Workflow graph Node. One unit of execution in the graph. Each node can be linked to a Task,
a Workflow or a branch node. One of the nodes must be specified.
Parameter |
Type |
id |
|
metadata |
|
inputs |
|
upstream_node_ids |
|
output_aliases |
|
task_node |
|
workflow_node |
|
branch_node |
|
gate_node |
typing.Optional[flytekit.models.core.workflow.GateNode] |
array_node |
typing.Optional[flytekit.models.core.workflow.ArrayNode] |
Methods
from_flyte_idl()
def from_flyte_idl(
pb2_object,
):
Parameter |
Type |
pb2_object |
|
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
Property |
Type |
Description |
array_node |
|
|
branch_node |
|
|
gate_node |
|
|
id |
|
|
inputs |
|
|
is_empty |
|
|
metadata |
|
|
output_aliases |
|
|
target |
|
|
task_node |
|
|
upstream_node_ids |
|
|
workflow_node |
|
|
flytekit.remote.remote.NodeExecutionGetDataResponse
Currently, node, task, and workflow execution all have the same get data response. So we’ll create this common
superclass to reduce code duplication until things diverge in the future.
def NodeExecutionGetDataResponse(
args,
dynamic_workflow: typing.Optional[DynamicWorkflowNodeMetadata],
kwargs,
):
Parameter |
Type |
args |
*args |
dynamic_workflow |
typing.Optional[DynamicWorkflowNodeMetadata] |
kwargs |
**kwargs |
Methods
from_flyte_idl()
def from_flyte_idl(
pb2_object,
):
Parameter |
Type |
pb2_object |
|
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
Property |
Type |
Description |
dynamic_workflow |
|
|
full_inputs |
|
|
full_outputs |
|
|
inputs |
|
|
is_empty |
|
|
outputs |
|
|
flytekit.remote.remote.NodeMetadata
def NodeMetadata(
name,
timeout,
retries,
interruptible: typing.Optional[bool],
cacheable: typing.Optional[bool],
cache_version: typing.Optional[str],
cache_serializable: typing.Optional[bool],
):
Defines extra information about the Node.
Parameter |
Type |
name |
|
timeout |
|
retries |
|
interruptible |
typing.Optional[bool] |
cacheable |
typing.Optional[bool] |
cache_version |
typing.Optional[str] |
cache_serializable |
typing.Optional[bool] |
Methods
from_flyte_idl()
def from_flyte_idl(
pb2_object,
):
Parameter |
Type |
pb2_object |
|
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
Property |
Type |
Description |
cache_serializable |
|
|
cache_version |
|
|
cacheable |
|
|
interruptible |
|
|
is_empty |
|
|
name |
|
|
retries |
|
|
timeout |
|
|
flytekit.remote.remote.NotificationList
def NotificationList(
notifications,
):
Parameter |
Type |
notifications |
|
Methods
from_flyte_idl()
def from_flyte_idl(
pb2_object,
):
Parameter |
Type |
pb2_object |
|
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
Property |
Type |
Description |
is_empty |
|
|
notifications |
|
|
flytekit.remote.remote.Options
These are options that can be configured for a launchplan during registration or overridden during an execution.
For instance two people may want to run the same workflow but have the offloaded data stored in two different
buckets. Or you may want labels or annotations to be different. This object is used when launching an execution
in a Flyte backend, and also when registering launch plans.
def Options(
labels: typing.Optional[flytekit.models.common.Labels],
annotations: typing.Optional[flytekit.models.common.Annotations],
raw_output_data_config: typing.Optional[flytekit.models.common.RawOutputDataConfig],
security_context: typing.Optional[flytekit.models.security.SecurityContext],
max_parallelism: typing.Optional[int],
notifications: typing.Optional[typing.List[flytekit.models.common.Notification]],
disable_notifications: typing.Optional[bool],
overwrite_cache: typing.Optional[bool],
):
Parameter |
Type |
labels |
typing.Optional[flytekit.models.common.Labels] |
annotations |
typing.Optional[flytekit.models.common.Annotations] |
raw_output_data_config |
typing.Optional[flytekit.models.common.RawOutputDataConfig] |
security_context |
typing.Optional[flytekit.models.security.SecurityContext] |
max_parallelism |
typing.Optional[int] |
notifications |
typing.Optional[typing.List[flytekit.models.common.Notification]] |
disable_notifications |
typing.Optional[bool] |
overwrite_cache |
typing.Optional[bool] |
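A minimal sketch of overriding a couple of these options, e.g. for a single execution; the import path and bucket URI are indicative:
from flytekit.core.options import Options
from flytekit.models.common import Labels, RawOutputDataConfig
options = Options(
    labels=Labels({"team": "ml"}),
    raw_output_data_config=RawOutputDataConfig("s3://my-bucket/offloaded"),
    max_parallelism=10,
)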
Methods
default_from()
def default_from(
k8s_service_account: typing.Optional[str],
raw_data_prefix: typing.Optional[str],
):
Parameter |
Type |
k8s_service_account |
typing.Optional[str] |
raw_data_prefix |
typing.Optional[str] |
flytekit.remote.remote.OrderedDict
Dictionary that remembers insertion order.
flytekit.remote.remote.PickledEntity
Represents the structure of the pickled object stored in the .pkl file for interactive mode.
Attributes:
metadata: Metadata about the pickled entities including Python version
entities: Dictionary mapping entity names to their PythonAutoContainerTask instances
def PickledEntity(
metadata: PickledEntityMetadata,
entities: Dict[str, PythonAutoContainerTask],
):
Parameter |
Type |
metadata |
PickledEntityMetadata |
entities |
Dict[str, PythonAutoContainerTask] |
flytekit.remote.remote.PickledEntityMetadata
Metadata for a pickled entity containing version information.
Attributes:
python_version: The Python version string (e.g. "3.12.0") used to create the pickle
def PickledEntityMetadata(
python_version: str,
):
Parameter |
Type |
python_version |
str |
flytekit.remote.remote.Progress
Renders one or more auto-updating progress bars.
def Progress(
columns: typing.Union[str, rich.progress.ProgressColumn],
console: typing.Optional[rich.console.Console],
auto_refresh: bool,
refresh_per_second: float,
speed_estimate_period: float,
transient: bool,
redirect_stdout: bool,
redirect_stderr: bool,
get_time: typing.Optional[typing.Callable[[], float]],
disable: bool,
expand: bool,
):
Parameter |
Type |
columns |
typing.Union[str, rich.progress.ProgressColumn] |
console |
typing.Optional[rich.console.Console] |
auto_refresh |
bool |
refresh_per_second |
float |
speed_estimate_period |
float |
transient |
bool |
redirect_stdout |
bool |
redirect_stderr |
bool |
get_time |
typing.Optional[typing.Callable[[], float]] |
disable |
bool |
expand |
bool |
Methods
add_task()
def add_task(
description: str,
start: bool,
total: typing.Optional[float],
completed: int,
visible: bool,
fields: typing.Any,
):
Add a new 'task' to the Progress display.
Parameter |
Type |
description |
str |
start |
bool |
total |
typing.Optional[float] |
completed |
int |
visible |
bool |
fields |
typing.Any |
advance()
def advance(
task_id: rich.progress.TaskID,
advance: float,
):
Advance task by a number of steps.
Parameter |
Type |
task_id |
rich.progress.TaskID |
advance |
float |
get_default_columns()
def get_default_columns()
Get the default columns used for a new Progress instance:
- a text column for the description (TextColumn)
- the bar itself (BarColumn)
- a text column showing completion percentage (TextColumn)
- an estimated-time-remaining column (TimeRemainingColumn)
If the Progress instance is created without passing a columns argument,
the default columns defined here will be used.
You can also create a Progress instance using custom columns before
and/or after the defaults, as in this example:
progress = Progress(
SpinnerColumn(),
*Progress.get_default_columns(),
"Elapsed:",
TimeElapsedColumn(),
)
This code shows the creation of a Progress display, containing
a spinner to the left, the default columns, and a labeled elapsed
time column.
get_renderable()
Get a renderable for the progress display.
get_renderables()
Get a number of renderables for the progress display.
make_tasks_table()
def make_tasks_table(
tasks: typing.Iterable[rich.progress.Task],
):
Get a table to render the Progress display.
Parameter |
Type |
tasks |
typing.Iterable[rich.progress.Task] |
open()
def open(
file: typing.Union[str, ForwardRef('PathLike[str]'), bytes],
mode: typing.Union[typing.Literal['rb'], typing.Literal['rt'], typing.Literal['r']],
buffering: int,
encoding: typing.Optional[str],
errors: typing.Optional[str],
newline: typing.Optional[str],
total: typing.Optional[int],
task_id: typing.Optional[rich.progress.TaskID],
description: str,
):
Track progress while reading from a binary file.
Parameter |
Type |
file |
typing.Union[str, ForwardRef('PathLike[str]'), bytes] |
mode |
typing.Union[typing.Literal['rb'], typing.Literal['rt'], typing.Literal['r']] |
buffering |
int |
encoding |
typing.Optional[str] |
errors |
typing.Optional[str] |
newline |
typing.Optional[str] |
total |
typing.Optional[int] |
task_id |
typing.Optional[rich.progress.TaskID] |
description |
str |
refresh()
Refresh (render) the progress information.
remove_task()
def remove_task(
task_id: rich.progress.TaskID,
):
Delete a task if it exists.
Parameter |
Type |
task_id |
rich.progress.TaskID |
reset()
def reset(
task_id: rich.progress.TaskID,
start: bool,
total: typing.Optional[float],
completed: int,
visible: typing.Optional[bool],
description: typing.Optional[str],
fields: typing.Any,
):
Reset a task so completed is 0 and the clock is reset.
Parameter |
Type |
task_id |
rich.progress.TaskID |
start |
bool |
total |
typing.Optional[float] |
completed |
int |
visible |
typing.Optional[bool] |
description |
typing.Optional[str] |
fields |
typing.Any |
start()
Start the progress display.
start_task()
def start_task(
task_id: rich.progress.TaskID,
):
Start a task.
Starts a task (used when calculating elapsed time). You may need to call this manually
if you called add_task with start=False.
Parameter |
Type |
task_id |
rich.progress.TaskID |
stop()
Stop the progress display.
stop_task()
def stop_task(
task_id: rich.progress.TaskID,
):
Stop a task.
This will freeze the elapsed time on the task.
Parameter |
Type |
task_id |
rich.progress.TaskID |
track()
def track(
sequence: typing.Union[typing.Iterable[~ProgressType], typing.Sequence[~ProgressType]],
total: typing.Optional[float],
completed: int,
task_id: typing.Optional[rich.progress.TaskID],
description: str,
update_period: float,
):
Track progress by iterating over a sequence.
Parameter |
Type |
sequence |
typing.Union[typing.Iterable[~ProgressType], typing.Sequence[~ProgressType]] |
total |
typing.Optional[float] |
completed |
int |
task_id |
typing.Optional[rich.progress.TaskID] |
description |
str |
update_period |
float |
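A short usage sketch (this is rich's Progress, re-exported by this module):
from rich.progress import Progress
with Progress() as progress:
    for item in progress.track(range(100), description="Working..."):
        pass  # do one unit of work per item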
update()
def update(
task_id: rich.progress.TaskID,
total: typing.Optional[float],
completed: typing.Optional[float],
advance: typing.Optional[float],
description: typing.Optional[str],
visible: typing.Optional[bool],
refresh: bool,
fields: typing.Any,
):
Update information associated with a task.
Parameter |
Type |
task_id |
rich.progress.TaskID |
total |
typing.Optional[float] |
completed |
typing.Optional[float] |
advance |
typing.Optional[float] |
description |
typing.Optional[str] |
visible |
typing.Optional[bool] |
refresh |
bool |
fields |
typing.Any |
wrap_file()
def wrap_file(
file: typing.BinaryIO,
total: typing.Optional[int],
task_id: typing.Optional[rich.progress.TaskID],
description: str,
):
Track progress while reading from a binary file.
Parameter |
Type |
file |
typing.BinaryIO |
total |
typing.Optional[int] |
task_id |
typing.Optional[rich.progress.TaskID] |
description |
str |
Properties
Property |
Type |
Description |
console |
|
|
finished |
|
|
task_ids |
|
|
tasks |
|
|
flytekit.remote.remote.Project
def Project(
id,
name,
description,
state,
):
A project represents a logical grouping used to organize entities (tasks, workflows, executions) in the Flyte
platform.
Parameter |
Type |
id |
|
name |
|
description |
|
state |
|
Methods
active_project()
def active_project(
id,
):
archived_project()
def archived_project(
id,
):
from_flyte_idl()
def from_flyte_idl(
pb2_object,
):
Parameter |
Type |
pb2_object |
|
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
Property |
Type |
Description |
description |
|
|
id |
|
|
is_empty |
|
|
name |
|
|
state |
|
|
flytekit.remote.remote.PythonAutoContainerTask
A Python AutoContainer task should be used as the base for all extensions that want the user’s code to be in the
container and the container information to be automatically captured.
This base will auto-configure the image and image version to be used for all its derivatives.
If you are looking to extend, you might prefer to use PythonFunctionTask or PythonInstanceTask.
def PythonAutoContainerTask(
name: str,
task_config: T,
task_type,
container_image: Optional[Union[str, ImageSpec]],
requests: Optional[Resources],
limits: Optional[Resources],
environment: Optional[Dict[str, str]],
task_resolver: Optional[TaskResolverMixin],
secret_requests: Optional[List[Secret]],
pod_template: Optional[PodTemplate],
pod_template_name: Optional[str],
accelerator: Optional[BaseAccelerator],
shared_memory: Optional[Union[L[True], str]],
resources: Optional[Resources],
kwargs,
):
Parameter |
Type |
name |
str |
task_config |
T |
task_type |
|
container_image |
Optional[Union[str, ImageSpec]] |
requests |
Optional[Resources] |
limits |
Optional[Resources] |
environment |
Optional[Dict[str, str]] |
task_resolver |
Optional[TaskResolverMixin] |
secret_requests |
Optional[List[Secret]] |
pod_template |
Optional[PodTemplate] |
pod_template_name |
Optional[str] |
accelerator |
Optional[BaseAccelerator] |
shared_memory |
Optional[Union[L[True], str]] |
resources |
Optional[Resources] |
kwargs |
**kwargs |
Methods
Method |
Description |
compile() |
Generates a node that encapsulates this task in a workflow definition |
construct_node_metadata() |
Used when constructing the node that encapsulates this task as part of a broader workflow definition |
dispatch_execute() |
This method translates Flyte’s Type system based input values and invokes the actual call to the executor |
execute() |
This method will be invoked to execute the task |
find_lhs() |
None |
get_command() |
Returns the command which should be used in the container definition for the serialized version of this task |
get_config() |
Returns the task config as a serializable dictionary |
get_container() |
Returns the container definition (if any) that is used to run the task on hosted Flyte |
get_custom() |
Return additional plugin-specific custom data (if any) as a serializable dictionary |
get_default_command() |
Returns the default pyflyte-execute command used to run this on hosted Flyte platforms |
get_extended_resources() |
Returns the extended resources to allocate to the task on hosted Flyte |
get_image() |
Update image spec based on fast registration usage, and return string representing the image |
get_input_types() |
Returns the names and python types as a dictionary for the inputs of this task |
get_k8s_pod() |
Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte |
get_sql() |
Returns the Sql definition (if any) that is used to run the task on hosted Flyte |
get_type_for_input_var() |
Returns the python type for an input variable by name |
get_type_for_output_var() |
Returns the python type for the specified output variable by name |
local_execute() |
This function is used only in the local execution path and is responsible for calling dispatch execute |
local_execution_mode() |
None |
post_execute() |
Post execute is called after the execution has completed, with the user_params and can be used to clean-up, |
pre_execute() |
This is the method that will be invoked directly before executing the task method and before all the inputs |
reset_command_fn() |
Resets the command which should be used in the container definition of this task to the default arguments |
sandbox_execute() |
Call dispatch_execute, in the context of a local sandbox execution |
set_command_fn() |
By default, the task will run on the Flyte platform using the pyflyte-execute command |
set_resolver() |
By default, flytekit uses the DefaultTaskResolver to resolve the task |
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
):
Generates a node that encapsulates this task in a workflow definition.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
args |
*args |
kwargs |
**kwargs |
construct_node_metadata()
def construct_node_metadata()
Used when constructing the node that encapsulates this task as part of a broader workflow definition.
dispatch_execute()
def dispatch_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
This method translates Flyte’s type-system-based input values and invokes the actual call to the executor.
This method is also invoked during runtime.
- VoidPromise is returned when the task itself declares no outputs.
- LiteralMap is returned when the task declares one or more outputs; individual outputs may be None.
- DynamicJobSpec is returned when a dynamic workflow is executed.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
execute()
This method will be invoked to execute the task.
Parameter |
Type |
kwargs |
**kwargs |
find_lhs()
get_command()
def get_command(
settings: SerializationSettings,
):
Returns the command which should be used in the container definition for the serialized version of this task
registered on a hosted Flyte platform.
Parameter |
Type |
settings |
SerializationSettings |
get_config()
def get_config(
settings: SerializationSettings,
):
Returns the task config as a serializable dictionary. This task config consists of metadata about the custom
config defined for this task.
Parameter |
Type |
settings |
SerializationSettings |
get_container()
def get_container(
settings: SerializationSettings,
):
Returns the container definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
SerializationSettings |
get_custom()
def get_custom(
settings: flytekit.configuration.SerializationSettings,
):
Return additional plugin-specific custom data (if any) as a serializable dictionary.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_default_command()
def get_default_command(
settings: SerializationSettings,
):
Returns the default pyflyte-execute command used to run this on hosted Flyte platforms.
Parameter |
Type |
settings |
SerializationSettings |
get_extended_resources()
def get_extended_resources(
settings: SerializationSettings,
):
Returns the extended resources to allocate to the task on hosted Flyte.
Parameter |
Type |
settings |
SerializationSettings |
get_image()
def get_image(
settings: SerializationSettings,
):
Update the image spec based on fast registration usage, and return a string representing the image.
Parameter |
Type |
settings |
SerializationSettings |
get_input_types()
def get_input_types()
Returns the names and python types as a dictionary for the inputs of this task.
get_k8s_pod()
def get_k8s_pod(
settings: SerializationSettings,
):
Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
SerializationSettings |
get_sql()
def get_sql(
settings: flytekit.configuration.SerializationSettings,
):
Returns the Sql definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_type_for_input_var()
def get_type_for_input_var(
k: str,
v: typing.Any,
):
Returns the python type for an input variable by name.
Parameter |
Type |
k |
str |
v |
typing.Any |
get_type_for_output_var()
def get_type_for_output_var(
k: str,
v: typing.Any,
):
Returns the python type for the specified output variable by name.
Parameter |
Type |
k |
str |
v |
typing.Any |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
This function is used only in the local execution path and is responsible for calling dispatch execute.
Use this function when calling a task with native values (or Promises containing Flyte literals derived from
Python native values).
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
post_execute()
def post_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
rval: typing.Any,
):
Post execute is called after the execution has completed, with the user_params, and can be used to clean up
or alter the outputs to match the intended task’s outputs. If not overridden, this function is a no-op.
Parameter |
Type |
user_params |
typing.Optional[flytekit.core.context_manager.ExecutionParameters] |
rval |
typing.Any |
pre_execute()
def pre_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
):
This is the method that will be invoked directly before executing the task method and before all the inputs
are converted. One particular case where this is useful is if the context is to be modified for the user process
to get some user-space parameters. This also ensures that things like SparkSession are already correctly
set up before the type transformers are called.
This should return either the same context or the mutated context.
Parameter |
Type |
user_params |
typing.Optional[flytekit.core.context_manager.ExecutionParameters] |
reset_command_fn()
Resets the command which should be used in the container definition of this task to the default arguments.
This is useful when the command line is overridden at serialization time.
sandbox_execute()
def sandbox_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
set_command_fn()
def set_command_fn(
get_command_fn: Optional[Callable[[SerializationSettings], List[str]]],
):
By default, the task will run on the Flyte platform using the pyflyte-execute command.
However, it can be useful to update the command with which the task is serialized for specific cases like
running map tasks ("pyflyte-map-execute") or for fast-executed tasks.
Parameter |
Type |
get_command_fn |
Optional[Callable[[SerializationSettings], List[str]]] |
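A hedged sketch of swapping the serialized command; it reuses the documented get_default_command so only the entrypoint changes, and my_task stands in for an existing task instance:
def custom_command(settings):
    cmd = my_task.get_default_command(settings)
    cmd[0] = "pyflyte-map-execute"  # illustrative entrypoint swap
    return cmd
my_task.set_command_fn(custom_command)
# reset_command_fn() restores the default afterwards.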
set_resolver()
def set_resolver(
resolver: TaskResolverMixin,
):
By default, flytekit uses the DefaultTaskResolver to resolve the task. This method allows the user to set a custom
task resolver. It can be useful to override the task resolver for specific cases, like running tasks in a Jupyter notebook.
Parameter |
Type |
resolver |
TaskResolverMixin |
Properties
Property |
Type |
Description |
container_image |
|
|
deck_fields |
|
|
disable_deck |
|
|
docs |
|
|
enable_deck |
|
|
environment |
|
|
instantiated_in |
|
|
interface |
|
|
lhs |
|
|
location |
|
|
metadata |
|
|
name |
|
|
python_interface |
|
|
resources |
|
|
security_context |
|
|
task_config |
|
|
task_resolver |
|
|
task_type |
|
|
task_type_version |
|
|
flytekit.remote.remote.PythonFunctionTask
A Python function task should be used as the base for all extensions that have a python function. It will
automatically detect the interface of the python function and, when serialized on the hosted Flyte platform, handle
writing the execution command to execute the function.
It is advised that this task be used via the @task decorator, as follows:
@task
def my_func(a: int) -> str:
…
In the above code, the name of the function, the module, and the interface (inputs = int and outputs = str) will be
auto-detected.
def PythonFunctionTask(
task_config: T,
task_function: Callable,
task_type,
ignore_input_vars: Optional[List[str]],
execution_mode: ExecutionBehavior,
task_resolver: Optional[TaskResolverMixin],
node_dependency_hints: Optional[Iterable[Union['PythonFunctionTask', '_annotated_launch_plan.LaunchPlan', WorkflowBase]]],
pickle_untyped: bool,
kwargs,
):
Parameter |
Type |
task_config |
T |
task_function |
Callable |
task_type |
|
ignore_input_vars |
Optional[List[str]] |
execution_mode |
ExecutionBehavior |
task_resolver |
Optional[TaskResolverMixin] |
node_dependency_hints |
Optional[Iterable[Union['PythonFunctionTask', '_annotated_launch_plan.LaunchPlan', WorkflowBase]]] |
pickle_untyped |
bool |
kwargs |
**kwargs |
Methods
Method |
Description |
compile() |
Generates a node that encapsulates this task in a workflow definition |
compile_into_workflow() |
In the case of dynamic workflows, this function will produce a workflow definition at execution time which will |
construct_node_metadata() |
Used when constructing the node that encapsulates this task as part of a broader workflow definition |
dispatch_execute() |
This method translates Flyte’s Type system based input values and invokes the actual call to the executor |
dynamic_execute() |
By the time this function is invoked, the local_execute function should have unwrapped the Promises and Flyte |
execute() |
This method will be invoked to execute the task |
find_lhs() |
None |
get_command() |
Returns the command which should be used in the container definition for the serialized version of this task |
get_config() |
Returns the task config as a serializable dictionary |
get_container() |
Returns the container definition (if any) that is used to run the task on hosted Flyte |
get_custom() |
Return additional plugin-specific custom data (if any) as a serializable dictionary |
get_default_command() |
Returns the default pyflyte-execute command used to run this on hosted Flyte platforms |
get_extended_resources() |
Returns the extended resources to allocate to the task on hosted Flyte |
get_image() |
Update image spec based on fast registration usage, and return string representing the image |
get_input_types() |
Returns the names and python types as a dictionary for the inputs of this task |
get_k8s_pod() |
Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte |
get_sql() |
Returns the Sql definition (if any) that is used to run the task on hosted Flyte |
get_type_for_input_var() |
Returns the python type for an input variable by name |
get_type_for_output_var() |
Returns the python type for the specified output variable by name |
local_execute() |
This function is used only in the local execution path and is responsible for calling dispatch execute |
local_execution_mode() |
None |
post_execute() |
Post execute is called after the execution has completed, with the user_params and can be used to clean-up, |
pre_execute() |
This is the method that will be invoked directly before executing the task method and before all the inputs |
reset_command_fn() |
Resets the command which should be used in the container definition of this task to the default arguments |
sandbox_execute() |
Call dispatch_execute, in the context of a local sandbox execution |
set_command_fn() |
By default, the task will run on the Flyte platform using the pyflyte-execute command |
set_resolver() |
By default, flytekit uses the DefaultTaskResolver to resolve the task |
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
):
Generates a node that encapsulates this task in a workflow definition.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
args |
*args |
kwargs |
**kwargs |
compile_into_workflow()
def compile_into_workflow(
ctx: FlyteContext,
task_function: Callable,
kwargs,
):
In the case of dynamic workflows, this function will produce a workflow definition at execution time which will
then proceed to be executed.
Parameter |
Type |
ctx |
FlyteContext |
task_function |
Callable |
kwargs |
**kwargs |
construct_node_metadata()
def construct_node_metadata()
Used when constructing the node that encapsulates this task as part of a broader workflow definition.
dispatch_execute()
def dispatch_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
This method translates Flyte’s type-system-based input values and invokes the actual call to the executor.
This method is also invoked during runtime.
- VoidPromise is returned when the task itself declares no outputs.
- LiteralMap is returned when the task declares one or more outputs; individual outputs may be None.
- DynamicJobSpec is returned when a dynamic workflow is executed.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
dynamic_execute()
def dynamic_execute(
task_function: Callable,
kwargs,
):
By the time this function is invoked, the local_execute function should have unwrapped the Promises and Flyte
literal wrappers so that the kwargs we are working with here are now Python native literal values. This
function is also expected to return Python native literal values.
Since the user code within a dynamic task constitutes a workflow, we have to first compile that workflow and
then execute it.
When running for real in production, the task would stop after the compilation step, and then create a file
representing that newly generated workflow, instead of executing it.
Parameter |
Type |
task_function |
Callable |
kwargs |
**kwargs |
execute()
This method will be invoked to execute the task. If you do decide to override this method, you must also
handle dynamic tasks, or you will no longer be able to use the task as a dynamic task generator.
Parameter |
Type |
kwargs |
**kwargs |
find_lhs()
get_command()
def get_command(
settings: SerializationSettings,
):
Returns the command which should be used in the container definition for the serialized version of this task
registered on a hosted Flyte platform.
Parameter |
Type |
settings |
SerializationSettings |
get_config()
def get_config(
settings: SerializationSettings,
):
Returns the task config as a serializable dictionary. This task config consists of metadata about the custom
config defined for this task.
Parameter |
Type |
settings |
SerializationSettings |
get_container()
def get_container(
settings: SerializationSettings,
):
Returns the container definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
SerializationSettings |
get_custom()
def get_custom(
settings: flytekit.configuration.SerializationSettings,
):
Return additional plugin-specific custom data (if any) as a serializable dictionary.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_default_command()
def get_default_command(
settings: SerializationSettings,
):
Returns the default pyflyte-execute command used to run this on hosted Flyte platforms.
Parameter |
Type |
settings |
SerializationSettings |
get_extended_resources()
def get_extended_resources(
settings: SerializationSettings,
):
Returns the extended resources to allocate to the task on hosted Flyte.
Parameter |
Type |
settings |
SerializationSettings |
get_image()
def get_image(
settings: SerializationSettings,
):
Update the image spec based on fast registration usage, and return a string representing the image.
Parameter |
Type |
settings |
SerializationSettings |
get_input_types()
def get_input_types()
Returns the names and python types as a dictionary for the inputs of this task.
get_k8s_pod()
def get_k8s_pod(
settings: SerializationSettings,
):
Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
SerializationSettings |
get_sql()
def get_sql(
settings: flytekit.configuration.SerializationSettings,
):
Returns the Sql definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_type_for_input_var()
def get_type_for_input_var(
k: str,
v: typing.Any,
):
Returns the python type for an input variable by name.
Parameter |
Type |
k |
str |
v |
typing.Any |
get_type_for_output_var()
def get_type_for_output_var(
k: str,
v: typing.Any,
):
Returns the python type for the specified output variable by name.
Parameter |
Type |
k |
str |
v |
typing.Any |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
This function is used only in the local execution path and is responsible for calling dispatch execute.
Use this function when calling a task with native values (or Promises containing Flyte literals derived from
Python native values).
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
post_execute()
def post_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
rval: typing.Any,
):
Post execute is called after the execution has completed, with the user_params, and can be used to clean up
or alter the outputs to match the intended task’s outputs. If not overridden, this function is a no-op.
Parameter |
Type |
user_params |
typing.Optional[flytekit.core.context_manager.ExecutionParameters] |
rval |
typing.Any |
pre_execute()
def pre_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
):
This is the method that will be invoked directly before executing the task method and before all the inputs
are converted. One particular case where this is useful is if the context is to be modified for the user process
to get some user-space parameters. This also ensures that things like SparkSession are already correctly
set up before the type transformers are called.
This should return either the same context or the mutated context.
Parameter |
Type |
user_params |
typing.Optional[flytekit.core.context_manager.ExecutionParameters] |
reset_command_fn()
Resets the command which should be used in the container definition of this task to the default arguments.
This is useful when the command line is overridden at serialization time.
sandbox_execute()
def sandbox_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
set_command_fn()
def set_command_fn(
get_command_fn: Optional[Callable[[SerializationSettings], List[str]]],
):
By default, the task will run on the Flyte platform using the pyflyte-execute command.
However, it can be useful to update the command with which the task is serialized for specific cases like
running map tasks ("pyflyte-map-execute") or for fast-executed tasks.
Parameter |
Type |
get_command_fn |
Optional[Callable[[SerializationSettings], List[str]]] |
set_resolver()
def set_resolver(
resolver: TaskResolverMixin,
):
By default, flytekit uses the DefaultTaskResolver to resolve the task. This method allows the user to set a custom
task resolver. It can be useful to override the task resolver for specific cases, like running tasks in a Jupyter notebook.
Parameter |
Type |
resolver |
TaskResolverMixin |
Properties
Property |
Type |
Description |
container_image |
|
|
deck_fields |
|
|
disable_deck |
|
|
docs |
|
|
enable_deck |
|
|
environment |
|
|
execution_mode |
|
|
instantiated_in |
|
|
interface |
|
|
lhs |
|
|
location |
|
|
metadata |
|
|
name |
|
|
node_dependency_hints |
|
|
python_interface |
|
|
resources |
|
|
security_context |
|
|
task_config |
|
|
task_function |
|
|
task_resolver |
|
|
task_type |
|
|
task_type_version |
|
|
flytekit.remote.remote.PythonFunctionWorkflow
Please read the Flyte documentation on workflows first for a high-level understanding of what workflows are in Flyte.
This Python object represents a workflow defined by a function and decorated with the
@workflow (flytekit.workflow) decorator. Please see the notes on that object for additional information.
def PythonFunctionWorkflow(
workflow_function: Callable,
metadata: WorkflowMetadata,
default_metadata: WorkflowMetadataDefaults,
docstring: Optional[Docstring],
on_failure: Optional[Union[WorkflowBase, Task]],
docs: Optional[Documentation],
pickle_untyped: bool,
default_options: Optional[Options],
):
Parameter |
Type |
workflow_function |
Callable |
metadata |
WorkflowMetadata |
default_metadata |
WorkflowMetadataDefaults |
docstring |
Optional[Docstring] |
on_failure |
Optional[Union[WorkflowBase, Task]] |
docs |
Optional[Documentation] |
pickle_untyped |
bool |
default_options |
Optional[Options] |
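For orientation, a PythonFunctionWorkflow is what the @workflow decorator produces; a minimal sketch:
from flytekit import task, workflow
@task
def greet(name: str) -> str:
    return f"hello, {name}"
@workflow
def wf(name: str = "world") -> str:  # wf is a PythonFunctionWorkflow
    return greet(name=name)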
Methods
add()
def add(
t: flytekit.core.python_auto_container.PythonAutoContainerTask,
):
Parameter |
Type |
t |
flytekit.core.python_auto_container.PythonAutoContainerTask |
compile()
Supply static Python native values in the kwargs if you want them to be used in the compilation. This mimics
a ‘closure’ in the traditional sense of the word.
Parameter |
Type |
kwargs |
**kwargs |
construct_node_metadata()
def construct_node_metadata()
execute()
This function is here only to try to streamline the pattern between workflows and tasks. Since tasks
call execute from dispatch_execute which is in local_execute, workflows should also call an execute inside
local_execute. This makes mocking cleaner.
Parameter |
Type |
kwargs |
**kwargs |
find_lhs()
get_all_tasks()
Future-proof method that makes it easy to access all tasks. (Not required today, as we auto-register them.)
load_task()
def load_task(
loader_args: typing.List[str],
):
Given the set of identifier keys, this should return one Python Task or raise an error if not found.
Parameter |
Type |
loader_args |
typing.List[str] |
loader_args()
def loader_args(
settings: flytekit.configuration.SerializationSettings,
t: flytekit.core.python_auto_container.PythonAutoContainerTask,
):
This is responsible for turning an instance of a task into args that the load_task function can reconstitute.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
t |
flytekit.core.python_auto_container.PythonAutoContainerTask |
local_execute()
def local_execute(
ctx: FlyteContext,
kwargs,
):
Parameter |
Type |
ctx |
FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
task_name()
def task_name(
t: PythonAutoContainerTask,
):
Overridable function that can optionally return a custom name for a given task.
Parameter |
Type |
t |
PythonAutoContainerTask |
Properties
Property |
Type |
Description |
default_options |
|
|
docs |
|
|
failure_node |
|
|
function |
|
|
instantiated_in |
|
|
interface |
|
|
lhs |
|
|
location |
|
|
name |
|
|
nodes |
|
|
on_failure |
|
|
output_bindings |
|
|
python_interface |
|
|
short_name |
|
|
workflow_metadata |
|
|
workflow_metadata_defaults |
|
|
flytekit.remote.remote.PythonTask
Base class for all tasks with a Python native interface. This should be used directly for task types that do
not have a python function to be executed. Otherwise, refer to flytekit.PythonFunctionTask.
def PythonTask(
task_type: str,
name: str,
task_config: typing.Optional[~T],
interface: typing.Optional[flytekit.core.interface.Interface],
environment: typing.Optional[typing.Dict[str, str]],
disable_deck: typing.Optional[bool],
enable_deck: typing.Optional[bool],
deck_fields: typing.Optional[typing.Tuple[flytekit.deck.deck.DeckField, ...]],
kwargs,
):
Parameter |
Type |
task_type |
str |
name |
str |
task_config |
typing.Optional[~T] |
interface |
typing.Optional[flytekit.core.interface.Interface] |
environment |
typing.Optional[typing.Dict[str, str]] |
disable_deck |
typing.Optional[bool] |
enable_deck |
typing.Optional[bool] |
deck_fields |
typing.Optional[typing.Tuple[flytekit.deck.deck.DeckField, ...]] |
kwargs |
**kwargs |
Methods
Method |
Description |
compile() |
Generates a node that encapsulates this task in a workflow definition |
construct_node_metadata() |
Used when constructing the node that encapsulates this task as part of a broader workflow definition |
dispatch_execute() |
This method translates Flyte’s Type system based input values and invokes the actual call to the executor |
execute() |
This method will be invoked to execute the task |
find_lhs() |
None |
get_config() |
Returns the task config as a serializable dictionary |
get_container() |
Returns the container definition (if any) that is used to run the task on hosted Flyte |
get_custom() |
Return additional plugin-specific custom data (if any) as a serializable dictionary |
get_extended_resources() |
Returns the extended resources to allocate to the task on hosted Flyte |
get_input_types() |
Returns the names and python types as a dictionary for the inputs of this task |
get_k8s_pod() |
Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte |
get_sql() |
Returns the Sql definition (if any) that is used to run the task on hosted Flyte |
get_type_for_input_var() |
Returns the python type for an input variable by name |
get_type_for_output_var() |
Returns the python type for the specified output variable by name |
local_execute() |
This function is used only in the local execution path and is responsible for calling dispatch execute |
local_execution_mode() |
None |
post_execute() |
Post execute is called after the execution has completed, with the user_params and can be used to clean-up, |
pre_execute() |
This is the method that will be invoked directly before executing the task method and before all the inputs |
sandbox_execute() |
Call dispatch_execute, in the context of a local sandbox execution |
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
):
Generates a node that encapsulates this task in a workflow definition.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
args |
*args |
kwargs |
**kwargs |
construct_node_metadata()
def construct_node_metadata()
Used when constructing the node that encapsulates this task as part of a broader workflow definition.
dispatch_execute()
def dispatch_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
This method translates Flyte’s type-system-based input values and invokes the actual call to the executor.
This method is also invoked during runtime.
- VoidPromise is returned when the task itself declares no outputs.
- LiteralMap is returned when the task declares one or more outputs; individual outputs may be None.
- DynamicJobSpec is returned when a dynamic workflow is executed.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
execute()
This method will be invoked to execute the task.
Parameter |
Type |
kwargs |
**kwargs |
find_lhs()
get_config()
def get_config(
settings: flytekit.configuration.SerializationSettings,
):
Returns the task config as a serializable dictionary. This task config consists of metadata about the custom
config defined for this task.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_container()
def get_container(
settings: flytekit.configuration.SerializationSettings,
):
Returns the container definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_custom()
def get_custom(
settings: flytekit.configuration.SerializationSettings,
):
Return additional plugin-specific custom data (if any) as a serializable dictionary.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_extended_resources()
def get_extended_resources(
settings: flytekit.configuration.SerializationSettings,
):
Returns the extended resources to allocate to the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_input_types()
def get_input_types()
Returns the names and python types as a dictionary for the inputs of this task.
get_k8s_pod()
def get_k8s_pod(
settings: flytekit.configuration.SerializationSettings,
):
Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_sql()
def get_sql(
settings: flytekit.configuration.SerializationSettings,
):
Returns the Sql definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_type_for_input_var()
def get_type_for_input_var(
k: str,
v: typing.Any,
):
Returns the python type for an input variable by name.
Parameter |
Type |
k |
str |
v |
typing.Any |
get_type_for_output_var()
def get_type_for_output_var(
k: str,
v: typing.Any,
):
Returns the python type for the specified output variable by name.
Parameter |
Type |
k |
str |
v |
typing.Any |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
This function is used only in the local execution path and is responsible for calling dispatch execute.
Use this function when calling a task with native values (or Promises containing Flyte literals derived from
Python native values).
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
post_execute()
def post_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
rval: typing.Any,
):
Post execute is called after the execution has completed, with the user_params, and can be used to clean up
or alter the outputs to match the intended task’s outputs. If not overridden, this function is a no-op.
Parameter |
Type |
user_params |
typing.Optional[flytekit.core.context_manager.ExecutionParameters] |
rval |
typing.Any |
pre_execute()
def pre_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
):
This is the method that will be invoked directly before executing the task method and before all the inputs
are converted. One particular case where this is useful is if the context is to be modified for the user process
to get some user-space parameters. This also ensures that things like SparkSession are already correctly
set up before the type transformers are called.
This should return either the same context or the mutated context.
Parameter |
Type |
user_params |
typing.Optional[flytekit.core.context_manager.ExecutionParameters] |
sandbox_execute()
def sandbox_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
Properties
Property |
Type |
Description |
deck_fields |
|
|
disable_deck |
|
|
docs |
|
|
enable_deck |
|
|
environment |
|
|
instantiated_in |
|
|
interface |
|
|
lhs |
|
|
location |
|
|
metadata |
|
|
name |
|
|
python_interface |
|
|
security_context |
|
|
task_config |
|
|
task_type |
|
|
task_type_version |
|
|
flytekit.remote.remote.ReferenceEntity
def ReferenceEntity(
reference: typing.Union[flytekit.core.reference_entity.WorkflowReference, flytekit.core.reference_entity.TaskReference, flytekit.core.reference_entity.LaunchPlanReference],
inputs: typing.Dict[str, typing.Type],
outputs: typing.Dict[str, typing.Type],
):
Parameter |
Type |
reference |
typing.Union[flytekit.core.reference_entity.WorkflowReference, flytekit.core.reference_entity.TaskReference, flytekit.core.reference_entity.LaunchPlanReference] |
inputs |
typing.Dict[str, typing.Type] |
outputs |
typing.Dict[str, typing.Type] |
Methods
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
):
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
args |
*args |
kwargs |
**kwargs |
construct_node_metadata()
def construct_node_metadata()
execute()
Parameter |
Type |
kwargs |
**kwargs |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
Please see the local_execute comments in the main task.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
unwrap_literal_map_and_execute()
def unwrap_literal_map_and_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
Please see the implementation of the dispatch_execute function in the real task.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
Properties
Property |
Type |
Description |
id |
|
|
interface |
|
|
name |
|
|
python_interface |
|
|
reference |
|
|
flytekit.remote.remote.ReferenceLaunchPlan
A reference launch plan serves as a pointer to a Launch Plan that already exists on your Flyte installation. This
object will not initiate a network call to Admin, which is why the user is asked to provide the expected interface.
If at registration time the interface provided causes an issue with compilation, an error will be returned.
def ReferenceLaunchPlan(
project: str,
domain: str,
name: str,
version: str,
inputs: Dict[str, Type],
outputs: Dict[str, Type],
):
Parameter |
Type |
project |
str |
domain |
str |
name |
str |
version |
str |
inputs |
Dict[str, Type] |
outputs |
Dict[str, Type] |
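A minimal sketch; the identifiers are illustrative and must match a launch plan already registered on the backend, and the import path is indicative:
from flytekit.core.launch_plan import ReferenceLaunchPlan
ref_lp = ReferenceLaunchPlan(
    project="flytesnacks",
    domain="development",
    name="my_module.my_lp",
    version="v1",
    inputs={"a": int},
    outputs={"o": str},
)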
Methods
clone_with()
def clone_with(
name: str,
parameters: Optional[_interface_models.ParameterMap],
fixed_inputs: Optional[_literal_models.LiteralMap],
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
):
Parameter |
Type |
name |
str |
parameters |
Optional[_interface_models.ParameterMap] |
fixed_inputs |
Optional[_literal_models.LiteralMap] |
schedule |
Optional[_schedule_model.Schedule] |
notifications |
Optional[List[_common_models.Notification]] |
labels |
Optional[_common_models.Labels] |
annotations |
Optional[_common_models.Annotations] |
raw_output_data_config |
Optional[_common_models.RawOutputDataConfig] |
max_parallelism |
Optional[int] |
security_context |
Optional[security.SecurityContext] |
trigger |
Optional[LaunchPlanTriggerBase] |
overwrite_cache |
Optional[bool] |
auto_activate |
bool |
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
):
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
args |
*args |
kwargs |
**kwargs |
construct_node_metadata()
def construct_node_metadata()
create()
def create(
name: str,
workflow: _annotated_workflow.WorkflowBase,
default_inputs: Optional[Dict[str, Any]],
fixed_inputs: Optional[Dict[str, Any]],
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
auth_role: Optional[_common_models.AuthRole],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
):
Parameter |
Type |
name |
str |
workflow |
_annotated_workflow.WorkflowBase |
default_inputs |
Optional[Dict[str, Any]] |
fixed_inputs |
Optional[Dict[str, Any]] |
schedule |
Optional[_schedule_model.Schedule] |
notifications |
Optional[List[_common_models.Notification]] |
labels |
Optional[_common_models.Labels] |
annotations |
Optional[_common_models.Annotations] |
raw_output_data_config |
Optional[_common_models.RawOutputDataConfig] |
max_parallelism |
Optional[int] |
security_context |
Optional[security.SecurityContext] |
auth_role |
Optional[_common_models.AuthRole] |
trigger |
Optional[LaunchPlanTriggerBase] |
overwrite_cache |
Optional[bool] |
auto_activate |
bool |
execute()
Parameter |
Type |
kwargs |
**kwargs |
get_default_launch_plan()
def get_default_launch_plan(
ctx: FlyteContext,
workflow: _annotated_workflow.WorkflowBase,
):
Users should probably call the get_or_create function defined below instead. A default launch plan is the one
that will just pick up whatever default values are defined in the workflow function signature (if any) and
use the default auth information supplied during serialization, with no notifications or schedules.
Parameter |
Type |
ctx |
FlyteContext |
workflow |
_annotated_workflow.WorkflowBase |
get_or_create()
def get_or_create(
workflow: _annotated_workflow.WorkflowBase,
name: Optional[str],
default_inputs: Optional[Dict[str, Any]],
fixed_inputs: Optional[Dict[str, Any]],
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
auth_role: Optional[_common_models.AuthRole],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
):
This function offers a friendlier interface for creating launch plans. If the name for the launch plan is not
supplied, this assumes you are looking for the default launch plan for the workflow. If it is specified, it
will be used. If creating the default launch plan, none of the other arguments may be specified.
The resulting launch plan is also cached; if this function is called again with the same name, the
cached version is returned.
Parameter |
Type |
workflow |
_annotated_workflow.WorkflowBase |
name |
Optional[str] |
default_inputs |
Optional[Dict[str, Any]] |
fixed_inputs |
Optional[Dict[str, Any]] |
schedule |
Optional[_schedule_model.Schedule] |
notifications |
Optional[List[_common_models.Notification]] |
labels |
Optional[_common_models.Labels] |
annotations |
Optional[_common_models.Annotations] |
raw_output_data_config |
Optional[_common_models.RawOutputDataConfig] |
max_parallelism |
Optional[int] |
security_context |
Optional[security.SecurityContext] |
auth_role |
Optional[_common_models.AuthRole] |
trigger |
Optional[LaunchPlanTriggerBase] |
overwrite_cache |
Optional[bool] |
auto_activate |
bool |
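A minimal sketch of both modes described above, assuming a trivial workflow defined with flytekit’s @workflow decorator:
from flytekit import LaunchPlan, task, workflow

@task
def double(x: int) -> int:
    return x * 2

@workflow
def wf(x: int = 1) -> int:
    return double(x=x)

# Default launch plan: no name given, so no other arguments may be supplied.
default_lp = LaunchPlan.get_or_create(workflow=wf)

# Named launch plan with a fixed input; a second call with the same name
# returns the cached object.
fixed_lp = LaunchPlan.get_or_create(workflow=wf, name="wf_fixed", fixed_inputs={"x": 3})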
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
Please see the local_execute comments in the main task.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
unwrap_literal_map_and_execute()
def unwrap_literal_map_and_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
Please see the implementation of the dispatch_execute function in the real task.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
Properties
Property |
Type |
Description |
annotations |
|
|
fixed_inputs |
|
|
id |
|
|
interface |
|
|
labels |
|
|
max_parallelism |
|
|
name |
|
|
notifications |
|
|
overwrite_cache |
|
|
parameters |
|
|
python_interface |
|
|
raw_output_data_config |
|
|
reference |
|
|
saved_inputs |
|
|
schedule |
|
|
security_context |
|
|
should_auto_activate |
|
|
trigger |
|
|
workflow |
|
|
flytekit.remote.remote.ReferenceSpec
def ReferenceSpec(
template: flytekit.core.reference_entity.ReferenceTemplate,
):
Parameter |
Type |
template |
flytekit.core.reference_entity.ReferenceTemplate |
Properties
Property |
Type |
Description |
template |
|
|
flytekit.remote.remote.ReferenceTask
This is a reference task: the body of the function passed in through the constructor will never be used; only the
signature of the function will be. The signature should match the signature of the task you’re referencing,
as stored by Flyte Admin; if it does not, workflows using this task will break upon compilation.
def ReferenceTask(
project: str,
domain: str,
name: str,
version: str,
inputs: Dict[str, type],
outputs: Dict[str, Type],
):
Parameter |
Type |
project |
str |
domain |
str |
name |
str |
version |
str |
inputs |
Dict[str, type] |
outputs |
Dict[str, Type] |
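A minimal sketch using the reference_task decorator, which constructs this class; the identifiers are hypothetical, and the body is intentionally empty since only the signature is used.
from typing import List

from flytekit import reference_task

@reference_task(
    project="flytesnacks",       # hypothetical project
    domain="development",        # hypothetical domain
    name="my_module.normalize",  # hypothetical task name
    version="v1",                # hypothetical version
)
def normalize(values: List[float]) -> List[float]:
    # Never executed; the signature must match what Flyte Admin has stored.
    ...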
Methods
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
):
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
args |
*args |
kwargs |
**kwargs |
def construct_node_metadata()
dispatch_execute()
def dispatch_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
This method translates Flyte’s type-system-based input values and invokes the actual call to the executor.
This method is also invoked during runtime.
VoidPromise
is returned when the task itself declares no outputs.
LiteralMap
is returned when the task declares one or more outputs in its signature. Individual outputs
may be None.
DynamicJobSpec
is returned when a dynamic workflow is executed.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
execute()
Parameter |
Type |
kwargs |
**kwargs |
find_lhs()
get_config()
def get_config(
settings: flytekit.configuration.SerializationSettings,
):
Returns the task config as a serializable dictionary. This task config consists of metadata about the custom
config defined for this task.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_container()
def get_container(
settings: flytekit.configuration.SerializationSettings,
):
Returns the container definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_custom()
def get_custom(
settings: flytekit.configuration.SerializationSettings,
):
Return additional plugin-specific custom data (if any) as a serializable dictionary.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_extended_resources()
def get_extended_resources(
settings: flytekit.configuration.SerializationSettings,
):
Returns the extended resources to allocate to the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_input_types()
def get_input_types()
Returns the names and python types as a dictionary for the inputs of this task.
get_k8s_pod()
def get_k8s_pod(
settings: flytekit.configuration.SerializationSettings,
):
Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_sql()
def get_sql(
settings: flytekit.configuration.SerializationSettings,
):
Returns the Sql definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_type_for_input_var()
def get_type_for_input_var(
k: str,
v: typing.Any,
):
Returns the python type for an input variable by name.
Parameter |
Type |
k |
str |
v |
typing.Any |
get_type_for_output_var()
def get_type_for_output_var(
k: str,
v: typing.Any,
):
Returns the python type for the specified output variable by name.
Parameter |
Type |
k |
str |
v |
typing.Any |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
Please see the local_execute comments in the main task.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
post_execute()
def post_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
rval: typing.Any,
):
Post execute is called after the execution has completed, with the user_params, and can be used to clean up
or alter the outputs to match the intended task’s outputs. If not overridden, this function is a no-op.
Parameter |
Type |
user_params |
typing.Optional[flytekit.core.context_manager.ExecutionParameters] |
rval |
typing.Any |
pre_execute()
def pre_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
):
This is the method that will be invoked directly before executing the task method and before all the inputs
are converted. One particular case where this is useful is if the context is to be modified for the user process
to get some user space parameters. This also ensures that things like SparkSession are already correctly
set up before the type transformers are called.
This should return either the same context or the mutated context.
Parameter |
Type |
user_params |
typing.Optional[flytekit.core.context_manager.ExecutionParameters] |
sandbox_execute()
def sandbox_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
unwrap_literal_map_and_execute()
def unwrap_literal_map_and_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
Please see the implementation of the dispatch_execute function in the real task.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
Properties
Property |
Type |
Description |
deck_fields |
|
|
disable_deck |
|
|
docs |
|
|
enable_deck |
|
|
environment |
|
|
id |
|
|
instantiated_in |
|
|
interface |
|
|
lhs |
|
|
location |
|
|
metadata |
|
|
name |
|
|
python_interface |
|
|
reference |
|
|
security_context |
|
|
task_config |
|
|
task_type |
|
|
task_type_version |
|
|
flytekit.remote.remote.ReferenceWorkflow
A reference workflow is a pointer to a workflow that already exists on your Flyte installation. This
object will not initiate a network call to Admin, which is why the user is asked to provide the expected interface.
If at registration time the interface provided causes an issue with compilation, an error will be returned.
def ReferenceWorkflow(
project: str,
domain: str,
name: str,
version: str,
inputs: Dict[str, Type],
outputs: Dict[str, Type],
):
Parameter |
Type |
project |
str |
domain |
str |
name |
str |
version |
str |
inputs |
Dict[str, Type] |
outputs |
Dict[str, Type] |
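Similarly, a sketch with the reference_workflow decorator (identifiers again hypothetical):
from flytekit import reference_workflow

@reference_workflow(
    project="flytesnacks",
    domain="development",
    name="my_module.my_wf",
    version="v1",
)
def my_wf(a: int) -> str:
    # Body unused; the interface must match the registered workflow.
    ...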
Methods
add()
def add(
t: flytekit.core.python_auto_container.PythonAutoContainerTask,
):
Parameter |
Type |
t |
flytekit.core.python_auto_container.PythonAutoContainerTask |
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
):
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
args |
*args |
kwargs |
**kwargs |
def construct_node_metadata()
execute()
Parameter |
Type |
kwargs |
**kwargs |
find_lhs()
get_all_tasks()
Future-proof method that makes it easy to access all tasks (not required today, as we auto-register them).
load_task()
def load_task(
loader_args: typing.List[str],
):
Given the set of identifier keys, this should return one Python task, or raise an error if not found.
Parameter |
Type |
loader_args |
typing.List[str] |
loader_args()
def loader_args(
settings: flytekit.configuration.SerializationSettings,
t: flytekit.core.python_auto_container.PythonAutoContainerTask,
):
This is responsible for turning an instance of a task into args that the load_task function can reconstitute.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
t |
flytekit.core.python_auto_container.PythonAutoContainerTask |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
Please see the local_execute comments in the main task.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
task_name()
def task_name(
t: PythonAutoContainerTask,
):
Overridable function that can optionally return a custom name for a given task.
Parameter |
Type |
t |
PythonAutoContainerTask |
unwrap_literal_map_and_execute()
def unwrap_literal_map_and_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
Please see the implementation of the dispatch_execute function in the real task.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
Properties
Property |
Type |
Description |
default_options |
|
|
docs |
|
|
failure_node |
|
|
function |
|
|
id |
|
|
instantiated_in |
|
|
interface |
|
|
lhs |
|
|
location |
|
|
name |
|
|
nodes |
|
|
on_failure |
|
|
output_bindings |
|
|
python_interface |
|
|
reference |
|
|
short_name |
|
|
workflow_metadata |
|
|
workflow_metadata_defaults |
|
|
flytekit.remote.remote.RegistrationSkipped
RegistrationSkipped error is raised when trying to register an entity that is not registrable.
flytekit.remote.remote.RemoteEntity
Helper class that provides a standard way to create an ABC using
inheritance.
def RemoteEntity(
args,
kwargs,
):
Parameter |
Type |
args |
*args |
kwargs |
**kwargs |
Methods
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
):
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
args |
*args |
kwargs |
**kwargs |
def construct_node_metadata()
Used when constructing the node that encapsulates this task as part of a broader workflow definition.
execute()
Parameter |
Type |
kwargs |
**kwargs |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
Properties
Property |
Type |
Description |
id |
|
|
name |
|
|
python_interface |
|
|
flytekit.remote.remote.ResolvedIdentifiers
def ResolvedIdentifiers(
project: str,
domain: str,
name: str,
version: str,
):
Parameter |
Type |
project |
str |
domain |
str |
name |
str |
version |
str |
flytekit.remote.remote.ResourceType
flytekit.remote.remote.SerializationSettings
These settings are provided while serializing a workflow and task, before registration. This is required to get
runtime information at serialization time, as well as some defaults.
Attributes:
project (str): The project (if any) under which to register entities.
domain (str): The domain (if any) under which to register entities.
version (str): The version (if any) with which to register entities.
image_config (ImageConfig): The image config used to define task container images.
env (Optional[Dict[str, str]]): Environment variables injected into task container definitions.
flytekit_virtualenv_root (Optional[str]): During out-of-container serialization, the absolute path of the flytekit
virtualenv at serialization time won’t match the in-container value at execution time. This optional value
is used to provide the in-container virtualenv path.
python_interpreter (Optional[str]): The python executable to use. This is used for spark tasks in out-of-container
execution.
entrypoint_settings (Optional[EntrypointSettings]): Information about the command, path, and version of the
entrypoint program.
fast_serialization_settings (Optional[FastSerializationSettings]): If the code is being serialized so that it
can be fast-registered (and thus omit building a Docker image), this object contains additional parameters
for serialization.
source_root (Optional[str]): The root directory of the source code.
def SerializationSettings(
image_config: ImageConfig,
project: typing.Optional[str],
domain: typing.Optional[str],
version: typing.Optional[str],
env: Optional[Dict[str, str]],
git_repo: Optional[str],
python_interpreter: str,
flytekit_virtualenv_root: Optional[str],
fast_serialization_settings: Optional[FastSerializationSettings],
source_root: Optional[str],
):
Parameter |
Type |
image_config |
ImageConfig |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
version |
typing.Optional[str] |
env |
Optional[Dict[str, str]] |
git_repo |
Optional[str] |
python_interpreter |
str |
flytekit_virtualenv_root |
Optional[str] |
fast_serialization_settings |
Optional[FastSerializationSettings] |
source_root |
Optional[str] |
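A minimal sketch of constructing settings for registration; the image name and project values are placeholders, and ImageConfig.auto with an explicit img_name is assumed as one convenient way to build the image config.
from flytekit.configuration import ImageConfig, SerializationSettings

settings = SerializationSettings(
    image_config=ImageConfig.auto(img_name="ghcr.io/example/flyte:latest"),  # placeholder image
    project="flytesnacks",
    domain="development",
    version="v1",
    env={"MY_FLAG": "1"},
)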
Methods
default_entrypoint_settings()
def default_entrypoint_settings(
interpreter_path: str,
):
Assumes the entrypoint is installed in a virtual environment where the given interpreter resides.
Parameter |
Type |
interpreter_path |
str |
for_image()
def for_image(
image: str,
version: str,
project: str,
domain: str,
python_interpreter_path: str,
):
Parameter |
Type |
image |
str |
version |
str |
project |
str |
domain |
str |
python_interpreter_path |
str |
from_dict()
def from_dict(
kvs: typing.Union[dict, list, str, int, float, bool, NoneType],
infer_missing,
):
Parameter |
Type |
kvs |
typing.Union[dict, list, str, int, float, bool, NoneType] |
infer_missing |
|
from_json()
def from_json(
s: typing.Union[str, bytes, bytearray],
parse_float,
parse_int,
parse_constant,
infer_missing,
kw,
):
Parameter |
Type |
s |
typing.Union[str, bytes, bytearray] |
parse_float |
|
parse_int |
|
parse_constant |
|
infer_missing |
|
kw |
|
from_transport()
def from_transport(
s: str,
):
new_builder()
Creates a SerializationSettings.Builder
that copies the existing serialization settings parameters and
allows for customization.
schema()
def schema(
infer_missing: bool,
only,
exclude,
many: bool,
context,
load_only,
dump_only,
partial: bool,
unknown,
):
Parameter |
Type |
infer_missing |
bool |
only |
|
exclude |
|
many |
bool |
context |
|
load_only |
|
dump_only |
|
partial |
bool |
unknown |
|
should_fast_serialize()
def should_fast_serialize()
Whether or not the serialization settings specify that entities should be serialized for fast registration.
to_dict()
def to_dict(
encode_json,
):
Parameter |
Type |
encode_json |
|
to_json()
def to_json(
skipkeys: bool,
ensure_ascii: bool,
check_circular: bool,
allow_nan: bool,
indent: typing.Union[int, str, NoneType],
separators: typing.Tuple[str, str],
default: typing.Callable,
sort_keys: bool,
kw,
):
Parameter |
Type |
skipkeys |
bool |
ensure_ascii |
bool |
check_circular |
bool |
allow_nan |
bool |
indent |
typing.Union[int, str, NoneType] |
separators |
typing.Tuple[str, str] |
default |
typing.Callable |
sort_keys |
bool |
kw |
|
venv_root_from_interpreter()
def venv_root_from_interpreter(
interpreter_path: str,
):
Computes the path of the virtual environment root, based on the passed-in python interpreter path.
For example: /opt/venv/bin/python3 -> /opt/venv
Parameter |
Type |
interpreter_path |
str |
with_serialized_context()
def with_serialized_context()
Use this method to create a new SerializationSettings that has an environment variable set with the SerializedContext.
This is useful for transporting the SerializedContext to serialized and registered tasks.
The setting will be available in the env
field with the key SERIALIZED_CONTEXT_ENV_VAR.
Returns: A newly constructed SerializationSettings, or self, if it already has the serialized context set.
Properties
Property |
Type |
Description |
entrypoint_settings |
|
|
serialized_context |
|
|
flytekit.remote.remote.Signal
A ProtocolMessage
flytekit.remote.remote.SignalIdentifier
def SignalIdentifier(
signal_id: str,
execution_id: flytekit.models.core.identifier.WorkflowExecutionIdentifier,
):
Parameter |
Type |
signal_id |
str |
execution_id |
flytekit.models.core.identifier.WorkflowExecutionIdentifier |
Methods
from_flyte_idl()
def from_flyte_idl(
proto: flyteidl.core.identifier_pb2.SignalIdentifier,
):
Parameter |
Type |
proto |
flyteidl.core.identifier_pb2.SignalIdentifier |
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
Property |
Type |
Description |
execution_id |
|
|
is_empty |
|
|
signal_id |
|
|
flytekit.remote.remote.SignalListRequest
A ProtocolMessage
flytekit.remote.remote.SignalSetRequest
A ProtocolMessage
flytekit.remote.remote.Sort
def Sort(
key,
direction,
):
Parameter |
Type |
key |
|
direction |
|
Methods
from_flyte_idl()
def from_flyte_idl(
pb2_object,
):
Parameter |
Type |
pb2_object |
|
from_python_std()
def from_python_std(
text,
):
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
Property |
Type |
Description |
direction |
|
|
is_empty |
|
|
key |
|
|
flytekit.remote.remote.SynchronousFlyteClient
This is a low-level client that users can use to make direct gRPC service calls to the control plane. See the
service spec (idl:protos/docs/service/index) for details. This is a more user-friendly interface than the
raw client (flytekit.clients.raw.RawSynchronousFlyteClient), so users should try to use this class
first. Create a client with:
SynchronousFlyteClient(PlatformConfig(endpoint="your.domain:port", insecure=True))
insecure should be True if your flyteadmin deployment doesn’t have SSL enabled.
def SynchronousFlyteClient(
cfg: PlatformConfig,
kwargs,
):
Initializes a gRPC channel to the given Flyte Admin service.
Parameter |
Type |
cfg |
PlatformConfig |
kwargs |
**kwargs |
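A minimal sketch of constructing the client from a PlatformConfig, per the signature above; the endpoint is a placeholder, and the paginated call is assumed to return a (results, token) pair as described for the paginated methods below.
from flytekit.clients.friendly import SynchronousFlyteClient
from flytekit.configuration import PlatformConfig

client = SynchronousFlyteClient(
    PlatformConfig(endpoint="your.domain:port", insecure=True)  # insecure only without SSL
)
projects, _token = client.list_projects_paginated(limit=10)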
Methods
create_download_link()
def create_download_link(
create_download_link_request: _dataproxy_pb2.CreateDownloadLinkRequest,
):
Parameter |
Type |
create_download_link_request |
_dataproxy_pb2.CreateDownloadLinkRequest |
create_download_location()
def create_download_location(
create_download_location_request: _dataproxy_pb2.CreateDownloadLocationRequest,
):
Parameter |
Type |
create_download_location_request |
_dataproxy_pb2.CreateDownloadLocationRequest |
create_execution()
def create_execution(
project,
domain,
name,
execution_spec,
inputs,
):
This will create an execution for the given execution spec.
Parameter |
Type |
project |
|
domain |
|
name |
|
execution_spec |
|
inputs |
|
create_launch_plan()
def create_launch_plan(
launch_plan_identifer,
launch_plan_spec,
):
This will create a launch plan definition in the Admin database. Once successful, the launch plan object can be
retrieved via the client or viewed via the UI or command-line interfaces.
Note:
Overwrites are not supported so any request for a given project, domain, name, and version that exists in
the database must match the existing definition exactly. This also means that as long as the request
remains identical, calling this method multiple times will result in success.
Parameter |
Type |
launch_plan_identifer |
|
launch_plan_spec |
|
create_task()
def create_task(
task_identifer,
task_spec,
):
This will create a task definition in the Admin database. Once successful, the task object can be
retrieved via the client or viewed via the UI or command-line interfaces.
Note:
Overwrites are not supported so any request for a given project, domain, name, and version that exists in
the database must match the existing definition exactly. Furthermore, as long as the request
remains identical, calling this method multiple times will result in success.
Parameter |
Type |
task_identifer |
|
task_spec |
|
create_upload_location()
def create_upload_location(
create_upload_location_request: _dataproxy_pb2.CreateUploadLocationRequest,
):
Get a signed url to be used during fast registration.
Parameter |
Type |
create_upload_location_request |
_dataproxy_pb2.CreateUploadLocationRequest |
create_workflow()
def create_workflow(
workflow_identifier,
workflow_spec,
):
This will create a workflow definition in the Admin database. Once successful, the workflow object can be
retrieved via the client or viewed via the UI or command-line interfaces.
Note:
Overwrites are not supported so any request for a given project, domain, name, and version that exists in
the database must match the existing definition exactly. Furthermore, as long as the request
remains identical, calling this method multiple times will result in success.
Parameter |
Type |
workflow_identifier |
|
workflow_spec |
|
get_active_launch_plan()
def get_active_launch_plan(
identifier,
):
Retrieves the active launch plan entity given a named entity identifier (project, domain, name). Raises an
error if no active launch plan exists.
Parameter |
Type |
identifier |
|
get_control_plane_version()
def get_control_plane_version()
Retrieve the Control Plane version from Flyteadmin.
This method calls Flyteadmin’s GetVersion API to obtain the current version information of the control plane.
The retrieved version can be used to enable or disable specific features based on the Flyteadmin version.
Returns:
str: The version string of the control plane.
get_data()
def get_data(
flyte_uri: str,
):
Parameter |
Type |
flyte_uri |
str |
get_domains()
This returns a list of domains.
get_download_artifact_signed_url()
def get_download_artifact_signed_url(
node_id: str,
project: str,
domain: str,
name: str,
artifact_type: google.protobuf.internal.enum_type_wrapper.EnumTypeWrapper,
expires_in: datetime.timedelta,
):
Get a signed url for an artifact.
Parameter |
Type |
node_id |
str |
project |
str |
domain |
str |
name |
str |
artifact_type |
google.protobuf.internal.enum_type_wrapper.EnumTypeWrapper |
expires_in |
datetime.timedelta |
get_download_signed_url()
def get_download_signed_url(
native_url: str,
expires_in: datetime.timedelta,
):
Parameter |
Type |
native_url |
str |
expires_in |
datetime.timedelta |
get_execution()
def get_execution(
id,
):
get_execution_data()
def get_execution_data(
id,
):
Returns signed URLs to LiteralMap blobs for an execution’s inputs and outputs (when available).
get_execution_metrics()
def get_execution_metrics(
id,
depth,
):
Returns metrics partitioning and categorizing the workflow execution time-series.
get_node_execution()
def get_node_execution(
node_execution_identifier,
):
Parameter |
Type |
node_execution_identifier |
|
get_node_execution_data()
def get_node_execution_data(
node_execution_identifier,
):
Returns signed URLs to LiteralMap blobs for a node execution’s inputs and outputs (when available).
Parameter |
Type |
node_execution_identifier |
|
get_project_domain_attributes()
def get_project_domain_attributes(
project,
domain,
resource_type,
):
Fetches the custom attributes set for a project and domain combination.
Parameter |
Type |
project |
|
domain |
|
resource_type |
|
get_task_execution()
def get_task_execution(
id,
):
get_task_execution_data()
def get_task_execution_data(
task_execution_identifier,
):
Returns signed URLs to LiteralMap blobs for a task execution’s inputs and outputs (when available).
Parameter |
Type |
task_execution_identifier |
|
get_upload_signed_url()
def get_upload_signed_url(
project: str,
domain: str,
content_md5: typing.Optional[bytes],
filename: typing.Optional[str],
expires_in: typing.Optional[datetime.timedelta],
filename_root: typing.Optional[str],
add_content_md5_metadata: bool,
):
Get a signed url to be used during fast registration.
Parameter |
Type |
project |
str |
domain |
str |
content_md5 |
typing.Optional[bytes] |
filename |
typing.Optional[str] |
expires_in |
typing.Optional[datetime.timedelta] |
filename_root |
typing.Optional[str] |
add_content_md5_metadata |
bool |
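For illustration, a hedged sketch of requesting an upload URL valid for 30 minutes; the filename is a placeholder and the response is assumed to carry a signed_url field, as in the data proxy protos.
from datetime import timedelta

resp = client.get_upload_signed_url(
    project="flytesnacks",
    domain="development",
    content_md5=None,                 # optionally, the md5 digest of the payload
    filename="fast-register.tar.gz",  # placeholder filename
    expires_in=timedelta(minutes=30),
)
print(resp.signed_url)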
get_workflow_attributes()
def get_workflow_attributes(
project,
domain,
workflow,
resource_type,
):
Fetches the custom attributes set for a project, domain, and workflow combination.
Parameter |
Type |
project |
|
domain |
|
workflow |
|
resource_type |
|
list_active_launch_plans_paginated()
def list_active_launch_plans_paginated(
project,
domain,
limit,
token,
sort_by,
):
This returns a page of currently active launch plan meta-information for launch plans in a given project and
domain.
Note:
This is a paginated API. Use the token field in the request to specify a page offset token.
The user of the API is responsible for providing this token.
Note:
If entries are added to the database between requests for different pages, it is possible to receive
entries on the second page that also appeared on the first.
Parameter |
Type |
project |
|
domain |
|
limit |
|
token |
|
sort_by |
|
list_executions_paginated()
def list_executions_paginated(
project,
domain,
limit,
token,
filters,
sort_by,
):
This returns a page of executions in a given project and domain.
Note:
This is a paginated API. Use the token field in the request to specify a page offset token.
The user of the API is responsible for providing this token.
Note:
If entries are added to the database between requests for different pages, it is possible to receive
entries on the second page that also appeared on the first.
Parameter |
Type |
project |
|
domain |
|
limit |
|
token |
|
filters |
|
sort_by |
|
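Because the caller is responsible for the page token, here is a hedged sketch of draining all pages, assuming the (results, token) return shape used by this client’s paginated methods:
executions = []
token = ""
while True:
    page, token = client.list_executions_paginated(
        project="flytesnacks", domain="development", limit=100, token=token
    )
    executions.extend(page)
    if not token:  # an empty token indicates there are no further pages
        break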
list_launch_plan_ids_paginated()
def list_launch_plan_ids_paginated(
project,
domain,
limit,
token,
sort_by,
):
This returns a page of identifiers for the launch plans for a given project and domain. Filters can also be
specified.
Note:
This is a paginated API. Use the token field in the request to specify a page offset token.
The user of the API is responsible for providing this token.
Note:
If entries are added to the database between requests for different pages, it is possible to receive
entries on the second page that also appeared on the first.
Parameter |
Type |
project |
|
domain |
|
limit |
|
token |
|
sort_by |
|
list_launch_plans_paginated()
def list_launch_plans_paginated(
identifier,
limit,
token,
filters,
sort_by,
):
This returns a page of launch plan meta-information for launch plans in a given project and domain. Optionally,
specifying a name will limit the results to only workflows with that name in the given project and domain.
Note:
This is a paginated API. Use the token field in the request to specify a page offset token.
The user of the API is responsible for providing this token.
Note:
If entries are added to the database between requests for different pages, it is possible to receive
entries on the second page that also appeared on the first.
Parameter |
Type |
identifier |
|
limit |
|
token |
|
filters |
|
sort_by |
|
list_matchable_attributes()
def list_matchable_attributes(
resource_type,
):
Fetches all custom attributes for a resource type.
Parameter |
Type |
resource_type |
|
list_node_executions()
def list_node_executions(
workflow_execution_identifier,
limit: int,
token: typing.Optional[str],
filters: typing.List[flytekit.models.filters.Filter],
sort_by: flytekit.models.admin.common.Sort,
unique_parent_id: str,
):
Get node executions associated with a given workflow execution.
Parameter |
Type |
workflow_execution_identifier |
|
limit |
int |
token |
typing.Optional[str] |
filters |
typing.List[flytekit.models.filters.Filter] |
sort_by |
flytekit.models.admin.common.Sort |
unique_parent_id |
str |
list_node_executions_for_task_paginated()
def list_node_executions_for_task_paginated(
task_execution_identifier,
limit,
token,
filters,
sort_by,
):
This returns nodes spawned by a specific task execution. This is generally from things like dynamic tasks.
Parameter |
Type |
task_execution_identifier |
|
limit |
|
token |
|
filters |
|
sort_by |
|
list_node_executions_paginated()
def list_node_executions_paginated(
node_execution_list_request,
):
Parameter |
Type |
node_execution_list_request |
|
list_projects()
def list_projects(
project_list_request: typing.Optional[ProjectListRequest],
):
This will return a list of the projects registered with the Flyte Admin Service.
Parameter |
Type |
project_list_request |
typing.Optional[ProjectListRequest] |
list_projects_paginated()
def list_projects_paginated(
limit,
token,
filters,
sort_by,
):
This returns a page of projects.
Note:
This is a paginated API. Use the token field in the request to specify a page offset token.
The user of the API is responsible for providing this token.
Note:
If entries are added to the database between requests for different pages, it is possible to receive
entries on the second page that also appeared on the first.
Parameter |
Type |
limit |
|
token |
|
filters |
|
sort_by |
|
list_signals()
def list_signals(
signal_list_request: SignalListRequest,
):
This lists signals.
Parameter |
Type |
signal_list_request |
SignalListRequest |
list_task_executions_paginated()
def list_task_executions_paginated(
node_execution_identifier,
limit,
token,
filters,
sort_by,
):
Parameter |
Type |
node_execution_identifier |
|
limit |
|
token |
|
filters |
|
sort_by |
|
list_task_ids_paginated()
def list_task_ids_paginated(
project,
domain,
limit,
token,
sort_by,
):
This returns a page of identifiers for the tasks for a given project and domain. Filters can also be
specified.
Note:
This is a paginated API. Use the token field in the request to specify a page offset token.
The user of the API is responsible for providing this token.
Note:
If entries are added to the database between requests for different pages, it is possible to receive
entries on the second page that also appeared on the first.
Parameter |
Type |
project |
|
domain |
|
limit |
|
token |
|
sort_by |
|
list_tasks_paginated()
def list_tasks_paginated(
identifier,
limit,
token,
filters,
sort_by,
):
This returns a page of task metadata for tasks in a given project and domain. Optionally,
specifying a name will limit the results to only tasks with that name in the given project and domain.
Note:
This is a paginated API. Use the token field in the request to specify a page offset token.
The user of the API is responsible for providing this token.
Note:
If entries are added to the database between requests for different pages, it is possible to receive
entries on the second page that also appeared on the first.
Parameter |
Type |
identifier |
|
limit |
|
token |
|
filters |
|
sort_by |
|
list_workflow_ids_paginated()
def list_workflow_ids_paginated(
project,
domain,
limit,
token,
sort_by,
):
This returns a page of identifiers for the workflows for a given project and domain. Filters can also be
specified.
Note:
This is a paginated API. Use the token field in the request to specify a page offset token.
The user of the API is responsible for providing this token.
Note:
If entries are added to the database between requests for different pages, it is possible to receive
entries on the second page that also appeared on the first.
Parameter |
Type |
project |
|
domain |
|
limit |
|
token |
|
sort_by |
|
list_workflows_paginated()
def list_workflows_paginated(
identifier,
limit,
token,
filters,
sort_by,
):
This returns a page of workflow meta-information for workflows in a given project and domain. Optionally,
specifying a name will limit the results to only workflows with that name in the given project and domain.
Note:
This is a paginated API. Use the token field in the request to specify a page offset token.
The user of the API is responsible for providing this token.
Note:
If entries are added to the database between requests for different pages, it is possible to receive
entries on the second page that also appeared on the first.
Parameter |
Type |
identifier |
|
limit |
|
token |
|
filters |
|
sort_by |
|
recover_execution()
def recover_execution(
id,
name: str,
):
Recreates a previously-run workflow execution that will only start executing from the last known failure point.
Parameter |
Type |
id |
|
name |
str |
register_project()
def register_project(
project,
):
Registers a project.
relaunch_execution()
def relaunch_execution(
id,
name,
):
set_signal()
def set_signal(
signal_set_request: SignalSetRequest,
):
This sets a signal.
Parameter |
Type |
signal_set_request |
SignalSetRequest |
terminate_execution()
def terminate_execution(
id,
cause,
):
update_launch_plan()
def update_launch_plan(
id,
state,
):
Updates a launch plan. Currently, this can only be used to update a given launch plan’s state (ACTIVE vs.
INACTIVE) for schedules. If a launch plan with a given project, domain, and name is set to ACTIVE,
then any other launch plan with the same project, domain, and name that was set to ACTIVE will be switched to
INACTIVE in one transaction.
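A hedged sketch of activating a launch plan by identifier; lp_id is a hypothetical launch plan Identifier obtained elsewhere, and LaunchPlanState is assumed from flytekit.models.launch_plan.
from flytekit.models.launch_plan import LaunchPlanState

client.update_launch_plan(id=lp_id, state=LaunchPlanState.ACTIVE)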
update_named_entity()
def update_named_entity(
resource_type,
id,
metadata,
):
Updates the metadata associated with a named entity. A named entity designates a resource, e.g. a workflow,
task, or launch plan, specified by {project, domain, name} across all versions of the resource.
Parameter |
Type |
resource_type |
|
id |
|
metadata |
|
update_project()
def update_project(
project,
):
Update an existing project specified by id.
update_project_domain_attributes()
def update_project_domain_attributes(
project,
domain,
matching_attributes,
):
Sets custom attributes for a project and domain combination.
Parameter |
Type |
project |
|
domain |
|
matching_attributes |
|
update_workflow_attributes()
def update_workflow_attributes(
project,
domain,
workflow,
matching_attributes,
):
Sets custom attributes for a project, domain, and workflow combination.
Parameter |
Type |
project |
|
domain |
|
workflow |
|
matching_attributes |
|
with_root_certificate()
def with_root_certificate(
cfg: PlatformConfig,
root_cert_file: str,
):
Parameter |
Type |
cfg |
PlatformConfig |
root_cert_file |
str |
Properties
Property |
Type |
Description |
raw |
|
|
url |
|
|
flytekit.remote.remote.TextColumn
A column containing text.
def TextColumn(
text_format: str,
style: typing.Union[str, ForwardRef('Style')],
justify: typing.Literal['default', 'left', 'center', 'right', 'full'],
markup: bool,
highlighter: typing.Optional[rich.highlighter.Highlighter],
table_column: typing.Optional[rich.table.Column],
):
Parameter |
Type |
text_format |
str |
style |
typing.Union[str, ForwardRef('Style')] |
justify |
typing.Literal['default', 'left', 'center', 'right', 'full'] |
markup |
bool |
highlighter |
typing.Optional[rich.highlighter.Highlighter] |
table_column |
typing.Optional[rich.table.Column] |
Methods
get_table_column()
Get a table column, used to build tasks table.
render()
def render(
task: Task,
):
Should return a renderable object.
flytekit.remote.remote.TimeElapsedColumn
Renders time elapsed.
def TimeElapsedColumn(
table_column: typing.Optional[rich.table.Column],
):
Parameter |
Type |
table_column |
typing.Optional[rich.table.Column] |
Methods
get_table_column()
Get a table column, used to build tasks table.
render()
def render(
task: Task,
):
Show time elapsed.
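These two column classes come from the rich progress API re-exported here; a minimal sketch of combining them in a progress display:
from rich.progress import Progress, TextColumn, TimeElapsedColumn

with Progress(TextColumn("{task.description}"), TimeElapsedColumn()) as progress:
    tid = progress.add_task("waiting for execution", total=None)
    # ... poll the remote execution here, then:
    progress.update(tid, description="execution complete")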
flytekit.remote.remote.TypeEngine
Core extensible TypeEngine of Flytekit. This should be used to extend the capabilities of Flytekit’s type system.
Users can implement their own TypeTransformers and register them with the TypeEngine. This allows special handling
of user objects.
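A minimal sketch of the core round trip, using methods documented below:
from flytekit.core.context_manager import FlyteContextManager
from flytekit.core.type_engine import TypeEngine

ctx = FlyteContextManager.current_context()
lt = TypeEngine.to_literal_type(int)             # python type  -> LiteralType
lit = TypeEngine.to_literal(ctx, 42, int, lt)    # python value -> Literal
val = TypeEngine.to_python_value(ctx, lit, int)  # Literal      -> 42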
Methods
async_to_literal()
def async_to_literal(
ctx: FlyteContext,
python_val: typing.Any,
python_type: Type[T],
expected: LiteralType,
):
Converts a python value of a given type and expected LiteralType
into a resolved Literal
value.
Parameter |
Type |
ctx |
FlyteContext |
python_val |
typing.Any |
python_type |
Type[T] |
expected |
LiteralType |
async_to_python_value()
def async_to_python_value(
ctx: FlyteContext,
lv: Literal,
expected_python_type: Type,
):
Parameter |
Type |
ctx |
FlyteContext |
lv |
Literal |
expected_python_type |
Type |
calculate_hash()
def calculate_hash(
python_val: typing.Any,
python_type: Type[T],
):
Parameter |
Type |
python_val |
typing.Any |
python_type |
Type[T] |
dict_to_literal_map()
def dict_to_literal_map(
ctx: FlyteContext,
d: typing.Dict[str, typing.Any],
type_hints: Optional[typing.Dict[str, type]],
):
Parameter |
Type |
ctx |
FlyteContext |
d |
typing.Dict[str, typing.Any] |
type_hints |
Optional[typing.Dict[str, type]] |
dict_to_literal_map_pb()
def dict_to_literal_map_pb(
ctx: FlyteContext,
d: typing.Dict[str, typing.Any],
type_hints: Optional[typing.Dict[str, type]],
):
Parameter |
Type |
ctx |
FlyteContext |
d |
typing.Dict[str, typing.Any] |
type_hints |
Optional[typing.Dict[str, type]] |
get_available_transformers()
def get_available_transformers()
Returns all python types for which transformers are available.
get_transformer()
def get_transformer(
python_type: Type,
):
Implements a recursive search for the transformer.
Parameter |
Type |
python_type |
Type |
guess_python_type()
def guess_python_type(
flyte_type: LiteralType,
):
Transforms a flyte-specific LiteralType
to a regular python type.
Parameter |
Type |
flyte_type |
LiteralType |
guess_python_types()
def guess_python_types(
flyte_variable_dict: typing.Dict[str, _interface_models.Variable],
):
Transforms a dictionary of flyte-specific Variable
objects to a dictionary of regular python types.
Parameter |
Type |
flyte_variable_dict |
typing.Dict[str, _interface_models.Variable] |
lazy_import_transformers()
def lazy_import_transformers()
Only load the transformers if needed.
literal_map_to_kwargs()
def literal_map_to_kwargs(
ctx: FlyteContext,
lm: LiteralMap,
python_types: typing.Optional[typing.Dict[str, type]],
literal_types: typing.Optional[typing.Dict[str, _interface_models.Variable]],
):
Parameter |
Type |
ctx |
FlyteContext |
lm |
LiteralMap |
python_types |
typing.Optional[typing.Dict[str, type]] |
literal_types |
typing.Optional[typing.Dict[str, _interface_models.Variable]] |
named_tuple_to_variable_map()
def named_tuple_to_variable_map(
t: typing.NamedTuple,
):
Converts a python-native NamedTuple
to a flyte-specific VariableMap of named literals.
Parameter |
Type |
t |
typing.NamedTuple |
register()
def register(
transformer: TypeTransformer,
additional_types: Optional[typing.List[Type]],
):
This should be used for all types that respond with the right type annotation when you use the type(...) function.
Parameter |
Type |
transformer |
TypeTransformer |
additional_types |
Optional[typing.List[Type]] |
register_additional_type()
def register_additional_type(
transformer: TypeTransformer[T],
additional_type: Type[T],
override,
):
Parameter |
Type |
transformer |
TypeTransformer[T] |
additional_type |
Type[T] |
override |
|
register_restricted_type()
def register_restricted_type(
name: str,
type: Type[T],
):
Parameter |
Type |
name |
str |
type |
Type[T] |
to_html()
def to_html(
ctx: FlyteContext,
python_val: typing.Any,
expected_python_type: Type[typing.Any],
):
Parameter |
Type |
ctx |
FlyteContext |
python_val |
typing.Any |
expected_python_type |
Type[typing.Any] |
to_literal()
def to_literal(
ctx: FlyteContext,
python_val: typing.Any,
python_type: Type[T],
expected: LiteralType,
):
The current dance exists because we allow users to call this synchronous to_literal function from an async
function, and allow this to_literal function to then invoke yet another async function,
namely an async transformer.
Parameter |
Type |
ctx |
FlyteContext |
python_val |
typing.Any |
python_type |
Type[T] |
expected |
LiteralType |
to_literal_checks()
def to_literal_checks(
python_val: typing.Any,
python_type: Type[T],
expected: LiteralType,
):
Parameter |
Type |
python_val |
typing.Any |
python_type |
Type[T] |
expected |
LiteralType |
to_literal_type()
def to_literal_type(
python_type: Type[T],
):
Converts a python type into a flyte-specific LiteralType.
Parameter |
Type |
python_type |
Type[T] |
to_python_value()
def to_python_value(
ctx: FlyteContext,
lv: Literal,
expected_python_type: Type,
):
Converts a Literal value with an expected python type into a python value.
Parameter |
Type |
ctx |
FlyteContext |
lv |
Literal |
expected_python_type |
Type |
unwrap_offloaded_literal()
def unwrap_offloaded_literal(
ctx: FlyteContext,
lv: Literal,
):
Parameter |
Type |
ctx |
FlyteContext |
lv |
Literal |
flytekit.remote.remote.TypedInterface
def TypedInterface(
inputs,
outputs,
):
Please note that this model is slightly incorrect, but is more user-friendly. The underlying inputs and
outputs are represented directly as Python dicts, rather than going through the additional VariableMap layer.
Parameter |
Type |
inputs |
|
outputs |
|
Methods
from_flyte_idl()
def from_flyte_idl(
proto: flyteidl.core.interface_pb2.TypedInterface,
):
Parameter |
Type |
proto |
flyteidl.core.interface_pb2.TypedInterface |
promote_from_model()
def promote_from_model(
model,
):
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
transform_interface_to_list()
def transform_interface_to_list(
bound_inputs: typing.Set[str],
excluded_inputs: typing.Set[str],
):
Takes a single task interface and interpolates it to an array interface, to allow performing distributed
python map-like functions.
Parameter |
Type |
bound_inputs |
typing.Set[str] |
excluded_inputs |
typing.Set[str] |
verbose_string()
Properties
Property |
Type |
Description |
inputs |
|
|
is_empty |
|
|
outputs |
|
|
flytekit.remote.remote.WorkflowBase
def WorkflowBase(
name: str,
workflow_metadata: WorkflowMetadata,
workflow_metadata_defaults: WorkflowMetadataDefaults,
python_interface: Interface,
on_failure: Optional[Union[WorkflowBase, Task]],
docs: Optional[Documentation],
default_options: Optional[Options],
kwargs,
):
Parameter |
Type |
name |
str |
workflow_metadata |
WorkflowMetadata |
workflow_metadata_defaults |
WorkflowMetadataDefaults |
python_interface |
Interface |
on_failure |
Optional[Union[WorkflowBase, Task]] |
docs |
Optional[Documentation] |
default_options |
Optional[Options] |
kwargs |
**kwargs |
Methods
compile()
Parameter |
Type |
kwargs |
**kwargs |
def construct_node_metadata()
execute()
Parameter |
Type |
kwargs |
**kwargs |
local_execute()
def local_execute(
ctx: FlyteContext,
kwargs,
):
Parameter |
Type |
ctx |
FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
Properties
Property |
Type |
Description |
default_options |
|
|
docs |
|
|
failure_node |
|
|
interface |
|
|
name |
|
|
nodes |
|
|
on_failure |
|
|
output_bindings |
|
|
python_interface |
|
|
short_name |
|
|
workflow_metadata |
|
|
workflow_metadata_defaults |
|
|
flytekit.remote.remote.WorkflowExecutionGetDataResponse
Currently, node, task, and workflow execution all have the same get data response. So we’ll create this common
superclass to reduce code duplication until things diverge in the future.
def WorkflowExecutionGetDataResponse(
inputs,
outputs,
full_inputs,
full_outputs,
):
Parameter |
Type |
inputs |
|
outputs |
|
full_inputs |
|
full_outputs |
|
Methods
from_flyte_idl()
def from_flyte_idl(
pb2_object,
):
Parameter |
Type |
pb2_object |
|
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
Property |
Type |
Description |
full_inputs |
|
|
full_outputs |
|
|
inputs |
|
|
is_empty |
|
|
outputs |
|
|
flytekit.remote.remote.WorkflowExecutionIdentifier
def WorkflowExecutionIdentifier(
project,
domain,
name,
):
Parameter |
Type |
project |
|
domain |
|
name |
|
Methods
from_flyte_idl()
def from_flyte_idl(
p,
):
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
Property |
Type |
Description |
domain |
|
|
is_empty |
|
|
name |
|
|
project |
|
|
flytekit.remote.remote.WorkflowFailurePolicy
Defines the behavior for a workflow execution in the case of an observed node execution failure. By default, a
workflow execution will immediately enter a failed state if a component node fails.
flytekit.remote.remote.datetime
datetime(year, month, day[, hour[, minute[, second[, microsecond[, tzinfo]]]]])
The year, month and day arguments are required. tzinfo may be None, or an
instance of a tzinfo subclass. The remaining arguments may be ints.
flytekit.remote.remote.timedelta
Difference between two datetime values.
timedelta(days=0, seconds=0, microseconds=0, milliseconds=0, minutes=0, hours=0, weeks=0)
All arguments are optional and default to 0.
Arguments may be integers or floats, and may be positive or negative.