1.15.4.dev2+g3e3ce2426

flytekit.remote.remote

This module provides the FlyteRemote object, which is the end-user’s main starting point for interacting with a Flyte backend in an interactive and programmatic way. This of this experience as kind of like the web UI but in Python object form.

Directory

Classes

Class Description
ArrayNodeMapTask Base Class for all Tasks with a Python native Interface.
Artifact An Artifact is effectively just a metadata layer on top of data that exists in Flyte.
BranchNode None.
ClusterAssignment None.
Config This the parent configuration object and holds all the underlying configuration object types.
ConfigFile None.
CopyFileDetection Create a collection of name/value pairs.
CoreNode This class will hold all the things necessary to make an SdkNode but we won’t make one until we know things like.
DataConfig Any data storage specific configuration.
Domain Domains are fixed and unique at the global level, and provide an abstraction to isolate resources and feature configuration for different deployment environments.
ExecutionClusterLabel None.
ExecutionMetadata None.
ExecutionSpec None.
FastPackageOptions FastPackageOptions is used to set configuration options when packaging files.
FastSerializationSettings This object hold information about settings necessary to serialize an object so that it can be fast-registered.
FileAccessProvider This is the class that is available through the FlyteContext and can be used for persisting data to the remote.
FlyteContext This is an internal-facing context object, that most users will not have to deal with.
FlyteContextManager FlyteContextManager manages the execution context within Flytekit.
FlyteExecutionSpan None.
FlyteLaunchPlan A class encapsulating a remote Flyte launch plan.
FlyteNode A class encapsulating a remote Flyte node.
FlyteNodeExecution A class encapsulating a node execution being run on a Flyte remote backend.
FlyteRemote Main entrypoint for programmatically accessing a Flyte remote backend.
FlyteTask A class encapsulating a remote Flyte task.
FlyteTaskExecution A class encapsulating a task execution being run on a Flyte remote backend.
FlyteTaskNode A class encapsulating a task that a Flyte node needs to execute.
FlyteWorkflow A class encapsulating a remote Flyte workflow.
FlyteWorkflowExecution A class encapsulating a workflow execution being run on a Flyte remote backend.
Identifier None.
ImageConfig We recommend you to use ImageConfig.
ImageSpec This class is used to specify the docker image that will be used to run the task.
LaunchPlan Launch Plans are one of the core constructs of Flyte.
LaunchPlanState None.
LazyEntity Fetches the entity when the entity is called or when the entity is retrieved.
Literal None.
LiteralMap None.
LiteralsResolver LiteralsResolver is a helper class meant primarily for use with the FlyteRemote experience or any other situation.
NamedEntityIdentifier None.
Node None.
NodeExecutionGetDataResponse Currently, node, task, and workflow execution all have the same get data response.
NodeMetadata None.
NotificationList None.
Options These are options that can be configured for a launchplan during registration or overridden during an execution.
OrderedDict Dictionary that remembers insertion order.
PickledEntity Represents the structure of the pickled object stored in the .
PickledEntityMetadata Metadata for a pickled entity containing version information.
Progress Renders an auto-updating progress bar(s).
Project None.
PythonAutoContainerTask A Python AutoContainer task should be used as the base for all extensions that want the user’s code to be in the.
PythonFunctionTask A Python Function task should be used as the base for all extensions that have a python function.
PythonFunctionWorkflow Please read :std:ref:flyte:divedeep-workflows first for a high-level understanding of what workflows are in Flyte.
PythonTask Base Class for all Tasks with a Python native Interface.
ReferenceEntity None.
ReferenceLaunchPlan A reference launch plan serves as a pointer to a Launch Plan that already exists on your Flyte installation.
ReferenceSpec None.
ReferenceTask This is a reference task, the body of the function passed in through the constructor will never be used, only the.
ReferenceWorkflow A reference workflow is a pointer to a workflow that already exists on your Flyte installation.
RemoteEntity Helper class that provides a standard way to create an ABC using.
ResolvedIdentifiers None.
ResourceType None.
SerializationSettings These settings are provided while serializing a workflow and task, before registration.
Signal A ProtocolMessage.
SignalIdentifier None.
SignalListRequest A ProtocolMessage.
SignalSetRequest A ProtocolMessage.
Sort None.
SynchronousFlyteClient This is a low-level client that users can use to make direct gRPC service calls to the control plane.
TextColumn A column containing text.
TimeElapsedColumn Renders time elapsed.
TypeEngine Core Extensible TypeEngine of Flytekit.
TypedInterface None.
WorkflowBase None.
WorkflowExecutionGetDataResponse Currently, node, task, and workflow execution all have the same get data response.
WorkflowExecutionIdentifier None.
WorkflowFailurePolicy Defines the behavior for a workflow execution in the case of an observed node execution failure.
datetime datetime(year, month, day[, hour[, minute[, second[, microsecond[,tzinfo]]]]]).
timedelta Difference between two datetime values.

Errors

flytekit.remote.remote.ArrayNodeMapTask

Base Class for all Tasks with a Python native Interface. This should be directly used for task types, that do not have a python function to be executed. Otherwise refer to :py:class:flytekit.PythonFunctionTask.

def ArrayNodeMapTask(
    python_function_task: typing.Union[flytekit.core.python_function_task.PythonFunctionTask, flytekit.core.python_function_task.PythonInstanceTask, functools.partial],
    concurrency: typing.Optional[int],
    min_successes: typing.Optional[int],
    min_success_ratio: typing.Optional[float],
    bound_inputs: typing.Optional[typing.Set[str]],
    bound_inputs_values: typing.Optional[typing.Dict[str, typing.Any]],
    kwargs,
):
Parameter Type
python_function_task typing.Union[flytekit.core.python_function_task.PythonFunctionTask, flytekit.core.python_function_task.PythonInstanceTask, functools.partial]
concurrency typing.Optional[int]
min_successes typing.Optional[int]
min_success_ratio typing.Optional[float]
bound_inputs typing.Optional[typing.Set[str]]
bound_inputs_values typing.Optional[typing.Dict[str, typing.Any]]
kwargs **kwargs

Methods

Method Description
compile() Generates a node that encapsulates this task in a workflow definition
construct_node_metadata() This returns metadata for the parent ArrayNode, not the sub-node getting mapped over
dispatch_execute() This method translates Flyte’s Type system based input values and invokes the actual call to the executor
execute() This method will be invoked to execute the task
find_lhs() None
get_command() TODO ADD bound variables to the resolver
get_config() Returns the task config as a serializable dictionary
get_container() Returns the container definition (if any) that is used to run the task on hosted Flyte
get_custom() Return additional plugin-specific custom data (if any) as a serializable dictionary
get_extended_resources() Returns the extended resources to allocate to the task on hosted Flyte
get_input_types() Returns the names and python types as a dictionary for the inputs of this task
get_k8s_pod() Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte
get_sql() Returns the Sql definition (if any) that is used to run the task on hosted Flyte
get_type_for_input_var() Returns the python type for an input variable by name
get_type_for_output_var() We override this method from flytekit
local_execute() This function is used only in the local execution path and is responsible for calling dispatch execute
local_execution_mode() None
post_execute() Post execute is called after the execution has completed, with the user_params and can be used to clean-up,
pre_execute() This is the method that will be invoked directly before executing the task method and before all the inputs
prepare_target() Alters the underlying run_task command to modify it for map task execution and then resets it after
sandbox_execute() Call dispatch_execute, in the context of a local sandbox execution
set_command_prefix() None

compile()

def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    args,
    kwargs,
):

Generates a node that encapsulates this task in a workflow definition.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
args *args
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

This returns metadata for the parent ArrayNode, not the sub-node getting mapped over

dispatch_execute()

def dispatch_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

This method translates Flyte’s Type system based input values and invokes the actual call to the executor This method is also invoked during runtime.

  • VoidPromise is returned in the case when the task itself declares no outputs.
  • Literal Map is returned when the task returns either one more outputs in the declaration. Individual outputs may be none
  • DynamicJobSpec is returned when a dynamic workflow is executed
Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

execute()

def execute(
    kwargs,
):

This method will be invoked to execute the task.

Parameter Type
kwargs **kwargs

find_lhs()

def find_lhs()

get_command()

def get_command(
    settings: flytekit.configuration.SerializationSettings,
):

TODO ADD bound variables to the resolver. Maybe we need a different resolver?

Parameter Type
settings flytekit.configuration.SerializationSettings

get_config()

def get_config(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the task config as a serializable dictionary. This task config consists of metadata about the custom defined for this task.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_container()

def get_container(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the container definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_custom()

def get_custom(
    settings: flytekit.configuration.SerializationSettings,
):

Return additional plugin-specific custom data (if any) as a serializable dictionary.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_extended_resources()

def get_extended_resources(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the extended resources to allocate to the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_input_types()

def get_input_types()

Returns the names and python types as a dictionary for the inputs of this task.

get_k8s_pod()

def get_k8s_pod(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_sql()

def get_sql(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the Sql definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_type_for_input_var()

def get_type_for_input_var(
    k: str,
    v: typing.Any,
):

Returns the python type for an input variable by name.

Parameter Type
k str
v typing.Any

get_type_for_output_var()

def get_type_for_output_var(
    k: str,
    v: typing.Any,
):

We override this method from flytekit.core.base_task Task because the dispatch_execute method uses this interface to construct outputs. Each instance of an container_array task will however produce outputs according to the underlying run_task interface and the array plugin handler will actually create a collection from these individual outputs as the final output value.

Parameter Type
k str
v typing.Any

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
):

This function is used only in the local execution path and is responsible for calling dispatch execute. Use this function when calling a task with native values (or Promises containing Flyte literals derived from Python native values).

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

post_execute()

def post_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
    rval: typing.Any,
):

Post execute is called after the execution has completed, with the user_params and can be used to clean-up, or alter the outputs to match the intended tasks outputs. If not overridden, then this function is a No-op

Parameter Type
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters]
rval typing.Any

pre_execute()

def pre_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
):

This is the method that will be invoked directly before executing the task method and before all the inputs are converted. One particular case where this is useful is if the context is to be modified for the user process to get some user space parameters. This also ensures that things like SparkSession are already correctly setup before the type transformers are called

This should return either the same context of the mutated context

Parameter Type
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters]

prepare_target()

def prepare_target()

Alters the underlying run_task command to modify it for map task execution and then resets it after.

sandbox_execute()

def sandbox_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

set_command_prefix()

def set_command_prefix(
    cmd: typing.Optional[typing.List[str]],
):
Parameter Type
cmd typing.Optional[typing.List[str]]

Properties

Property Type Description
bound_inputs
concurrency
deck_fields
disable_deck
docs
enable_deck
environment
execution_mode
instantiated_in
interface
is_original_sub_node_interface
lhs
location
metadata
min_success_ratio
min_successes
name
python_function_task
python_interface
security_context
task_config
task_type
task_type_version

flytekit.remote.remote.Artifact

An Artifact is effectively just a metadata layer on top of data that exists in Flyte. Most data of interest will be the output of tasks and workflows. The other category is user uploads.

This Python class has limited purpose, as a way for users to specify that tasks/workflows create Artifacts and the manner (i.e. name, partitions) in which they are created.

Control creation parameters at task/workflow execution time ::

@task def t1() -> Annotated[nn.Module, Artifact(name=“my.artifact.name”)]: …

def Artifact(
    project: Optional[str],
    domain: Optional[str],
    name: Optional[str],
    version: Optional[str],
    time_partitioned: bool,
    time_partition: Optional[TimePartition],
    time_partition_granularity: Optional[Granularity],
    partition_keys: Optional[typing.List[str]],
    partitions: Optional[Union[Partitions, typing.Dict[str, str]]],
):
Parameter Type
project Optional[str]
domain Optional[str]
name Optional[str]
version Optional[str]
time_partitioned bool
time_partition Optional[TimePartition]
time_partition_granularity Optional[Granularity]
partition_keys Optional[typing.List[str]]
partitions Optional[Union[Partitions, typing.Dict[str, str]]]

Methods

Method Description
create_from() This function allows users to declare partition values dynamically from the body of a task
embed_as_query() This should only be called in the context of a Trigger
query() None
to_id_idl() Converts this object to the IDL representation

create_from()

def create_from(
    o: O,
    card: Optional[SerializableToString],
    args: `*args`,
    kwargs,
):

This function allows users to declare partition values dynamically from the body of a task. Note that you’ll still need to annotate your task function output with the relevant Artifact object. Below, one of the partition values is bound to an input, and the other is set at runtime. Note that since tasks are not run at compile time, flytekit cannot check that you’ve bound all the partition values. It’s up to you to ensure that you’ve done so.

Pricing = Artifact(name=“pricing”, partition_keys=[“region”]) EstError = Artifact(name=“estimation_error”, partition_keys=[“dataset”], time_partitioned=True)

@task def t1() -> Annotated[pd.DataFrame, Pricing], Annotated[float, EstError]: df = get_pricing_results() dt = get_time() return Pricing.create_from(df, region=“dubai”), EstError.create_from(msq_error, dataset=“train”, time_partition=dt)

You can mix and match with the input syntax as well.

@task def my_task() -> Annotated[pd.DataFrame, RideCountData(region=Inputs.region)]: … return RideCountData.create_from(df, time_partition=datetime.datetime.now())

Parameter Type
o O
card Optional[SerializableToString]
args *args
kwargs **kwargs

embed_as_query()

def embed_as_query(
    partition: Optional[str],
    bind_to_time_partition: Optional[bool],
    expr: Optional[str],
    op: Optional[Op],
):

This should only be called in the context of a Trigger. The type of query this returns is different from the query() function. This type of query is used to reference the triggering artifact, rather than running a query.

Parameter Type
partition Optional[str]
bind_to_time_partition Optional[bool]
expr Optional[str]
op Optional[Op]

query()

def query(
    project: Optional[str],
    domain: Optional[str],
    time_partition: Optional[Union[datetime.datetime, TimePartition, art_id.InputBindingData]],
    partitions: Optional[Union[typing.Dict[str, str], Partitions]],
    kwargs,
):
Parameter Type
project Optional[str]
domain Optional[str]
time_partition Optional[Union[datetime.datetime, TimePartition, art_id.InputBindingData]]
partitions Optional[Union[typing.Dict[str, str], Partitions]]
kwargs **kwargs

to_id_idl()

def to_id_idl()

Converts this object to the IDL representation. This is here instead of translator because it’s in the interface, a relatively simple proto object that’s exposed to the user.

Properties

Property Type Description
concrete_artifact_id
partitions
time_partition

flytekit.remote.remote.BranchNode

def BranchNode(
    if_else: flytekit.models.core.workflow.IfElseBlock,
):

BranchNode is a special node that alter the flow of the workflow graph. It allows the control flow to branch at runtime based on a series of conditions that get evaluated on various parameters (e.g. inputs, primitives).

Parameter Type
if_else flytekit.models.core.workflow.IfElseBlock

Methods

Method Description
from_flyte_idl() None
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    pb2_objct,
):
Parameter Type
pb2_objct

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
if_else
is_empty

flytekit.remote.remote.ClusterAssignment

def ClusterAssignment(
    cluster_pool,
):
Parameter Type
cluster_pool

Methods

Method Description
from_flyte_idl()
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    p,
):
Parameter Type
p

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
cluster_pool
is_empty

flytekit.remote.remote.Config

This the parent configuration object and holds all the underlying configuration object types. An instance of this object holds all the config necessary to

  1. Interactive session with Flyte backend
  2. Some parts are required for Serialization, for example Platform Config is not required
  3. Runtime of a task
def Config(
    platform: PlatformConfig,
    secrets: SecretsConfig,
    stats: StatsConfig,
    data_config: DataConfig,
    local_sandbox_path: str,
):
Parameter Type
platform PlatformConfig
secrets SecretsConfig
stats StatsConfig
data_config DataConfig
local_sandbox_path str

Methods

Method Description
auto() Automatically constructs the Config Object
for_endpoint() Creates an automatic config for the given endpoint and uses the config_file or environment variable for default
for_sandbox() Constructs a new Config object specifically to connect to :std:ref:deployment-deployment-sandbox
with_params() None

auto()

def auto(
    config_file: typing.Union[str, ConfigFile, None],
):

Automatically constructs the Config Object. The order of precedence is as follows

  1. first try to find any env vars that match the config vars specified in the FLYTE_CONFIG format.
  2. If not found in environment then values ar read from the config file
  3. If not found in the file, then the default values are used.
Parameter Type
config_file typing.Union[str, ConfigFile, None]

for_endpoint()

def for_endpoint(
    endpoint: str,
    insecure: bool,
    data_config: typing.Optional[DataConfig],
    config_file: typing.Union[str, ConfigFile],
):

Creates an automatic config for the given endpoint and uses the config_file or environment variable for default. Refer to Config.auto() to understand the default bootstrap behavior.

data_config can be used to configure how data is downloaded or uploaded to a specific Blob storage like S3 / GCS etc. But, for permissions to a specific backend just use Cloud providers reqcommendation. If using fsspec, then refer to fsspec documentation

Parameter Type
endpoint str
insecure bool
data_config typing.Optional[DataConfig]
config_file typing.Union[str, ConfigFile]

for_sandbox()

def for_sandbox()

Constructs a new Config object specifically to connect to :std:ref:deployment-deployment-sandbox. If you are using a hosted Sandbox like environment, then you may need to use port-forward or ingress urls :return: Config

with_params()

def with_params(
    platform: PlatformConfig,
    secrets: SecretsConfig,
    stats: StatsConfig,
    data_config: DataConfig,
    local_sandbox_path: str,
):
Parameter Type
platform PlatformConfig
secrets SecretsConfig
stats StatsConfig
data_config DataConfig
local_sandbox_path str

flytekit.remote.remote.ConfigFile

def ConfigFile(
    location: str,
):

Load the config from this location

Parameter Type
location str

Methods

Method Description
get() None

get()

def get(
    c: typing.Union[LegacyConfigEntry, YamlConfigEntry],
):
Parameter Type
c typing.Union[LegacyConfigEntry, YamlConfigEntry]

Properties

Property Type Description
legacy_config
yaml_config

flytekit.remote.remote.CopyFileDetection

Create a collection of name/value pairs.

Example enumeration:

class Color(Enum): … RED = 1 … BLUE = 2 … GREEN = 3

Access them by:

  • attribute access:

Color.RED <Color.RED: 1>

  • value lookup:

Color(1) <Color.RED: 1>

  • name lookup:

Color[‘RED’] <Color.RED: 1>

Enumerations can be iterated over, and know how many members they have:

len(Color) 3

list(Color) [<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

flytekit.remote.remote.CoreNode

This class will hold all the things necessary to make an SdkNode but we won’t make one until we know things like ID, which from the registration step

def CoreNode(
    id: str,
    metadata: _workflow_model.NodeMetadata,
    bindings: List[_literal_models.Binding],
    upstream_nodes: List[Node],
    flyte_entity: Any,
):
Parameter Type
id str
metadata _workflow_model.NodeMetadata
bindings List[_literal_models.Binding]
upstream_nodes List[Node]
flyte_entity Any

Methods

Method Description
runs_before() This is typically something we shouldn’t do
with_overrides() None

runs_before()

def runs_before(
    other: Node,
):

This is typically something we shouldn’t do. This modifies an attribute of the other instance rather than self. But it’s done so only because we wanted this English function to be the same as the shift function. That is, calling node_1.runs_before(node_2) and node_1 » node_2 are the same. The shift operator going the other direction is not implemented to further avoid confusion. Right shift was picked rather than left shift because that’s what most users are familiar with.

Parameter Type
other Node

with_overrides()

def with_overrides(
    node_name: Optional[str],
    aliases: Optional[Dict[str, str]],
    requests: Optional[Resources],
    limits: Optional[Resources],
    timeout: Optional[Union[int, datetime.timedelta, object]],
    retries: Optional[int],
    interruptible: Optional[bool],
    name: Optional[str],
    task_config: Optional[Any],
    container_image: Optional[str],
    accelerator: Optional[BaseAccelerator],
    cache: Optional[bool],
    cache_version: Optional[str],
    cache_serialize: Optional[bool],
    shared_memory: Optional[Union[L[True], str]],
    pod_template: Optional[PodTemplate],
    resources: Optional[Resources],
    args,
    kwargs,
):
Parameter Type
node_name Optional[str]
aliases Optional[Dict[str, str]]
requests Optional[Resources]
limits Optional[Resources]
timeout Optional[Union[int, datetime.timedelta, object]]
retries Optional[int]
interruptible Optional[bool]
name Optional[str]
task_config Optional[Any]
container_image Optional[str]
accelerator Optional[BaseAccelerator]
cache Optional[bool]
cache_version Optional[str]
cache_serialize Optional[bool]
shared_memory Optional[Union[L[True], str]]
pod_template Optional[PodTemplate]
resources Optional[Resources]
args *args
kwargs **kwargs

Properties

Property Type Description
bindings
flyte_entity
id
metadata
name
outputs
run_entity
upstream_nodes

flytekit.remote.remote.DataConfig

Any data storage specific configuration. Please do not use this to store secrets, in S3 case, as it is used in Flyte sandbox environment we store the access key id and secret. All DataPersistence plugins are passed all DataConfig and the plugin should correctly use the right config

def DataConfig(
    s3: S3Config,
    gcs: GCSConfig,
    azure: AzureBlobStorageConfig,
    generic: GenericPersistenceConfig,
):
Parameter Type
s3 S3Config
gcs GCSConfig
azure AzureBlobStorageConfig
generic GenericPersistenceConfig

Methods

Method Description
auto() None

auto()

def auto(
    config_file: typing.Union[str, ConfigFile],
):
Parameter Type
config_file typing.Union[str, ConfigFile]

flytekit.remote.remote.Domain

Domains are fixed and unique at the global level, and provide an abstraction to isolate resources and feature configuration for different deployment environments.

def Domain(
    id,
    name,
):
Parameter Type
id
name

Methods

Method Description
from_flyte_idl()
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    pb2_object,
):
Parameter Type
pb2_object

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
id
is_empty
name

flytekit.remote.remote.ExecutionClusterLabel

def ExecutionClusterLabel(
    value,
):

Label value to determine where the execution will be run

Parameter Type
value

Methods

Method Description
from_flyte_idl()
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    pb2_object,
):
Parameter Type
pb2_object

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
is_empty
value

flytekit.remote.remote.ExecutionMetadata

def ExecutionMetadata(
    mode: int,
    principal: str,
    nesting: int,
    scheduled_at: Optional[datetime.datetime],
    parent_node_execution: Optional[_identifier.NodeExecutionIdentifier],
    reference_execution: Optional[_identifier.WorkflowExecutionIdentifier],
    system_metadata: Optional[SystemMetadata],
):
Parameter Type
mode int
principal str
nesting int
scheduled_at Optional[datetime.datetime]
parent_node_execution Optional[_identifier.NodeExecutionIdentifier]
reference_execution Optional[_identifier.WorkflowExecutionIdentifier]
system_metadata Optional[SystemMetadata]

Methods

Method Description
from_flyte_idl()
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    pb2_object,
):
Parameter Type
pb2_object

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
is_empty
mode
nesting
parent_node_execution
principal
reference_execution
scheduled_at
system_metadata

flytekit.remote.remote.ExecutionSpec

def ExecutionSpec(
    launch_plan,
    metadata,
    notifications,
    disable_all,
    labels,
    annotations,
    auth_role,
    raw_output_data_config,
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    overwrite_cache: Optional[bool],
    interruptible: Optional[bool],
    envs: Optional[_common_models.Envs],
    tags: Optional[typing.List[str]],
    cluster_assignment: Optional[ClusterAssignment],
    execution_cluster_label: Optional[ExecutionClusterLabel],
):
Parameter Type
launch_plan
metadata
notifications
disable_all
labels
annotations
auth_role
raw_output_data_config
max_parallelism Optional[int]
security_context Optional[security.SecurityContext]
overwrite_cache Optional[bool]
interruptible Optional[bool]
envs Optional[_common_models.Envs]
tags Optional[typing.List[str]]
cluster_assignment Optional[ClusterAssignment]
execution_cluster_label Optional[ExecutionClusterLabel]

Methods

Method Description
from_flyte_idl()
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    p,
):
Parameter Type
p

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
annotations
auth_role
cluster_assignment
disable_all
envs
execution_cluster_label
interruptible
is_empty
labels
launch_plan
max_parallelism
metadata
notifications
overwrite_cache
raw_output_data_config
security_context
tags

flytekit.remote.remote.FastPackageOptions

FastPackageOptions is used to set configuration options when packaging files.

def FastPackageOptions(
    ignores: list[Ignore],
    keep_default_ignores: bool,
    copy_style: Optional[CopyFileDetection],
    show_files: bool,
):
Parameter Type
ignores list[Ignore]
keep_default_ignores bool
copy_style Optional[CopyFileDetection]
show_files bool

flytekit.remote.remote.FastSerializationSettings

This object hold information about settings necessary to serialize an object so that it can be fast-registered.

def FastSerializationSettings(
    enabled: bool,
    destination_dir: Optional[str],
    distribution_location: Optional[str],
):
Parameter Type
enabled bool
destination_dir Optional[str]
distribution_location Optional[str]

Methods

Method Description
from_dict() None
from_json() None
schema() None
to_dict() None
to_json() None

from_dict()

def from_dict(
    kvs: typing.Union[dict, list, str, int, float, bool, NoneType],
    infer_missing,
):
Parameter Type
kvs typing.Union[dict, list, str, int, float, bool, NoneType]
infer_missing

from_json()

def from_json(
    s: typing.Union[str, bytes, bytearray],
    parse_float,
    parse_int,
    parse_constant,
    infer_missing,
    kw,
):
Parameter Type
s typing.Union[str, bytes, bytearray]
parse_float
parse_int
parse_constant
infer_missing
kw

schema()

def schema(
    infer_missing: bool,
    only,
    exclude,
    many: bool,
    context,
    load_only,
    dump_only,
    partial: bool,
    unknown,
):
Parameter Type
infer_missing bool
only
exclude
many bool
context
load_only
dump_only
partial bool
unknown

to_dict()

def to_dict(
    encode_json,
):
Parameter Type
encode_json

to_json()

def to_json(
    skipkeys: bool,
    ensure_ascii: bool,
    check_circular: bool,
    allow_nan: bool,
    indent: typing.Union[int, str, NoneType],
    separators: typing.Tuple[str, str],
    default: typing.Callable,
    sort_keys: bool,
    kw,
):
Parameter Type
skipkeys bool
ensure_ascii bool
check_circular bool
allow_nan bool
indent typing.Union[int, str, NoneType]
separators typing.Tuple[str, str]
default typing.Callable
sort_keys bool
kw

flytekit.remote.remote.FileAccessProvider

This is the class that is available through the FlyteContext and can be used for persisting data to the remote durable store.

def FileAccessProvider(
    local_sandbox_dir: typing.Union[str, os.PathLike],
    raw_output_prefix: str,
    data_config: typing.Optional[flytekit.configuration.DataConfig],
    execution_metadata: typing.Optional[dict],
):
Parameter Type
local_sandbox_dir typing.Union[str, os.PathLike]
raw_output_prefix str
data_config typing.Optional[flytekit.configuration.DataConfig]
execution_metadata typing.Optional[dict]

Methods

Method Description
async_get_data()
async_put_data() The implication here is that we’re always going to put data to the remote location, so we
async_put_raw_data() This is a more flexible version of put that accepts a file-like object or a string path
download() Downloads from remote to local
download_directory() Downloads directory from given remote to local path
exists() None
generate_new_custom_path() Generates a new path with the raw output prefix and a random string appended to it
get() None
get_async_filesystem_for_path() None
get_data()
get_file_tail() None
get_filesystem() None
get_filesystem_for_path() None
get_random_local_directory() None
get_random_local_path() Use file_path_or_file_name, when you want a random directory, but want to preserve the leaf file name
get_random_remote_directory() None
get_random_remote_path() None
get_random_string() None
is_remote() Deprecated
join() None
put_data() The implication here is that we’re always going to put data to the remote location, so we
put_raw_data() This is a more flexible version of put that accepts a file-like object or a string path
recursive_paths() None
sep() None
strip_file_header() Drops file:// if it exists from the file
upload()
upload_directory()

async_get_data()

def async_get_data(
    remote_path: str,
    local_path: str,
    is_multipart: bool,
    kwargs,
):
Parameter Type
remote_path str
local_path str
is_multipart bool
kwargs **kwargs

async_put_data()

def async_put_data(
    local_path: typing.Union[str, os.PathLike],
    remote_path: str,
    is_multipart: bool,
    kwargs,
):

The implication here is that we’re always going to put data to the remote location, so we .remote to ensure we don’t use the true local proxy if the remote path is a file://

Parameter Type
local_path typing.Union[str, os.PathLike]
remote_path str
is_multipart bool
kwargs **kwargs

async_put_raw_data()

def async_put_raw_data(
    lpath: typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO],
    upload_prefix: typing.Optional[str],
    file_name: typing.Optional[str],
    read_chunk_size_bytes: int,
    encoding: str,
    skip_raw_data_prefix: bool,
    kwargs,
):

This is a more flexible version of put that accepts a file-like object or a string path. Writes to the raw output prefix only. If you want to write to another fs use put_data or get the fsspec file system directly. FYI: Currently the raw output prefix set by propeller is already unique per retry and looks like s3://my-s3-bucket/data/o4/feda4e266c748463a97d-n0-0

If lpath is a folder, then recursive will be set. If lpath is a streamable, then it can only be a single file.

Writes to: {raw output prefix}/{upload_prefix}/{file_name}

Parameter Type
lpath typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO]
upload_prefix typing.Optional[str]
file_name typing.Optional[str]
read_chunk_size_bytes int
encoding str
skip_raw_data_prefix bool
kwargs **kwargs

download()

def download(
    remote_path: str,
    local_path: str,
    kwargs,
):

Downloads from remote to local

Parameter Type
remote_path str
local_path str
kwargs **kwargs

download_directory()

def download_directory(
    remote_path: str,
    local_path: str,
    kwargs,
):

Downloads directory from given remote to local path

Parameter Type
remote_path str
local_path str
kwargs **kwargs

exists()

def exists(
    path: str,
):
Parameter Type
path str

generate_new_custom_path()

def generate_new_custom_path(
    fs: typing.Optional[fsspec.spec.AbstractFileSystem],
    alt: typing.Optional[str],
    stem: typing.Optional[str],
):

Generates a new path with the raw output prefix and a random string appended to it. Optionally, you can provide an alternate prefix and a stem. If stem is provided, it will be appended to the path instead of a random string. If alt is provided, it will replace the first part of the output prefix, e.g. the S3 or GCS bucket.

If wanting to write to a non-random prefix in a non-default S3 bucket, this can be called with alt=“my-alt-bucket” and stem=“my-stem” to generate a path like s3://my-alt-bucket/default-prefix-part/my-stem

Parameter Type
fs typing.Optional[fsspec.spec.AbstractFileSystem]
alt typing.Optional[str]
stem typing.Optional[str]

get()

def get(
    from_path: str,
    to_path: str,
    recursive: bool,
    kwargs,
):
Parameter Type
from_path str
to_path str
recursive bool
kwargs **kwargs

get_async_filesystem_for_path()

def get_async_filesystem_for_path(
    path: str,
    anonymous: bool,
    kwargs,
):
Parameter Type
path str
anonymous bool
kwargs **kwargs

get_data()

def get_data(
    remote_path: str,
    local_path: str,
    is_multipart: bool,
    kwargs,
):
Parameter Type
remote_path str
local_path str
is_multipart bool
kwargs **kwargs

get_file_tail()

def get_file_tail(
    file_path_or_file_name: str,
):
Parameter Type
file_path_or_file_name str

get_filesystem()

def get_filesystem(
    protocol: typing.Optional[str],
    anonymous: bool,
    path: typing.Optional[str],
    kwargs,
):
Parameter Type
protocol typing.Optional[str]
anonymous bool
path typing.Optional[str]
kwargs **kwargs

get_filesystem_for_path()

def get_filesystem_for_path(
    path: str,
    anonymous: bool,
    kwargs,
):
Parameter Type
path str
anonymous bool
kwargs **kwargs

get_random_local_directory()

def get_random_local_directory()

get_random_local_path()

def get_random_local_path(
    file_path_or_file_name: typing.Optional[str],
):

Use file_path_or_file_name, when you want a random directory, but want to preserve the leaf file name

Parameter Type
file_path_or_file_name typing.Optional[str]

get_random_remote_directory()

def get_random_remote_directory()

get_random_remote_path()

def get_random_remote_path(
    file_path_or_file_name: typing.Optional[str],
):
Parameter Type
file_path_or_file_name typing.Optional[str]

get_random_string()

def get_random_string()

is_remote()

def is_remote(
    path: typing.Union[str, os.PathLike],
):

Deprecated. Let’s find a replacement

Parameter Type
path typing.Union[str, os.PathLike]

join()

def join(
    args: `*args`,
    unstrip: bool,
    fs: typing.Optional[fsspec.spec.AbstractFileSystem],
):
Parameter Type
args *args
unstrip bool
fs typing.Optional[fsspec.spec.AbstractFileSystem]

put_data()

def put_data(
    local_path: typing.Union[str, os.PathLike],
    remote_path: str,
    is_multipart: bool,
    kwargs,
):

The implication here is that we’re always going to put data to the remote location, so we .remote to ensure we don’t use the true local proxy if the remote path is a file://

Parameter Type
local_path typing.Union[str, os.PathLike]
remote_path str
is_multipart bool
kwargs **kwargs

put_raw_data()

def put_raw_data(
    lpath: typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO],
    upload_prefix: typing.Optional[str],
    file_name: typing.Optional[str],
    read_chunk_size_bytes: int,
    encoding: str,
    skip_raw_data_prefix: bool,
    kwargs,
):

This is a more flexible version of put that accepts a file-like object or a string path. Writes to the raw output prefix only. If you want to write to another fs use put_data or get the fsspec file system directly. FYI: Currently the raw output prefix set by propeller is already unique per retry and looks like s3://my-s3-bucket/data/o4/feda4e266c748463a97d-n0-0

If lpath is a folder, then recursive will be set. If lpath is a streamable, then it can only be a single file.

Writes to: {raw output prefix}/{upload_prefix}/{file_name}

Parameter Type
lpath typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO]
upload_prefix typing.Optional[str]
file_name typing.Optional[str]
read_chunk_size_bytes int
encoding str
skip_raw_data_prefix bool
kwargs **kwargs

recursive_paths()

def recursive_paths(
    f: str,
    t: str,
):
Parameter Type
f str
t str

sep()

def sep(
    file_system: typing.Optional[fsspec.spec.AbstractFileSystem],
):
Parameter Type
file_system typing.Optional[fsspec.spec.AbstractFileSystem]

strip_file_header()

def strip_file_header(
    path: str,
    trim_trailing_sep: bool,
):

Drops file:// if it exists from the file

Parameter Type
path str
trim_trailing_sep bool

upload()

def upload(
    file_path: str,
    to_path: str,
    kwargs,
):
Parameter Type
file_path str
to_path str
kwargs **kwargs

upload_directory()

def upload_directory(
    local_path: str,
    remote_path: str,
    kwargs,
):
Parameter Type
local_path str
remote_path str
kwargs **kwargs

Properties

Property Type Description
data_config
local_access
local_sandbox_dir
raw_output_fs
raw_output_prefix

flytekit.remote.remote.FlyteAssertion

Assertion failed.

def FlyteAssertion(
    args,
    timestamp: typing.Optional[float],
):
Parameter Type
args *args
timestamp typing.Optional[float]

Properties

Property Type Description
timestamp

flytekit.remote.remote.FlyteContext

This is an internal-facing context object, that most users will not have to deal with. It’s essentially a globally available grab bag of settings and objects that allows flytekit to do things like convert complex types, run and compile workflows, serialize Flyte entities, etc.

Even though this object as a current_context function on it, it should not be called directly. Please use the :py:class:flytekit.FlyteContextManager object instead.

Please do not confuse this object with the :py:class:flytekit.ExecutionParameters object.

def FlyteContext(
    file_access: FileAccessProvider,
    level: int,
    flyte_client: Optional['friendly_client.SynchronousFlyteClient'],
    compilation_state: Optional[CompilationState],
    execution_state: Optional[ExecutionState],
    serialization_settings: Optional[SerializationSettings],
    in_a_condition: bool,
    origin_stackframe: Optional[traceback.FrameSummary],
    output_metadata_tracker: Optional[OutputMetadataTracker],
    worker_queue: Optional[Controller],
):
Parameter Type
file_access FileAccessProvider
level int
flyte_client Optional['friendly_client.SynchronousFlyteClient']
compilation_state Optional[CompilationState]
execution_state Optional[ExecutionState]
serialization_settings Optional[SerializationSettings]
in_a_condition bool
origin_stackframe Optional[traceback.FrameSummary]
output_metadata_tracker Optional[OutputMetadataTracker]
worker_queue Optional[Controller]

Methods

Method Description
current_context() This method exists only to maintain backwards compatibility
enter_conditional_section() None
get_deck() Returns the deck that was created as part of the last execution
get_origin_stackframe_repr() None
new_builder() None
new_compilation_state() Creates and returns a default compilation state
new_execution_state() Creates and returns a new default execution state
set_stackframe() None
with_client() None
with_compilation_state() None
with_execution_state() None
with_file_access() None
with_new_compilation_state() None
with_output_metadata_tracker() None
with_serialization_settings() None
with_worker_queue() None

current_context()

def current_context()

This method exists only to maintain backwards compatibility. Please use FlyteContextManager.current_context() instead.

Users of flytekit should be wary not to confuse the object returned from this function with :py:func:flytekit.current_context

enter_conditional_section()

def enter_conditional_section()

get_deck()

def get_deck()

Returns the deck that was created as part of the last execution.

The return value depends on the execution environment. In a notebook, the return value is compatible with IPython.display and should be rendered in the notebook.

.. code-block:: python

with flytekit.new_context() as ctx: my_task(…) ctx.get_deck()

OR if you wish to explicitly display

.. code-block:: python

from IPython import display display(ctx.get_deck())

get_origin_stackframe_repr()

def get_origin_stackframe_repr()

new_builder()

def new_builder()

new_compilation_state()

def new_compilation_state(
    prefix: str,
):

Creates and returns a default compilation state. For most of the code this should be the entrypoint of compilation, otherwise the code should always uses - with_compilation_state

Parameter Type
prefix str

new_execution_state()

def new_execution_state(
    working_dir: Optional[os.PathLike],
):

Creates and returns a new default execution state. This should be used at the entrypoint of execution, in all other cases it is preferable to use with_execution_state

Parameter Type
working_dir Optional[os.PathLike]

set_stackframe()

def set_stackframe(
    s: traceback.FrameSummary,
):
Parameter Type
s traceback.FrameSummary

with_client()

def with_client(
    c: SynchronousFlyteClient,
):
Parameter Type
c SynchronousFlyteClient

with_compilation_state()

def with_compilation_state(
    c: CompilationState,
):
Parameter Type
c CompilationState

with_execution_state()

def with_execution_state(
    es: ExecutionState,
):
Parameter Type
es ExecutionState

with_file_access()

def with_file_access(
    fa: FileAccessProvider,
):
Parameter Type
fa FileAccessProvider

with_new_compilation_state()

def with_new_compilation_state()

with_output_metadata_tracker()

def with_output_metadata_tracker(
    t: OutputMetadataTracker,
):
Parameter Type
t OutputMetadataTracker

with_serialization_settings()

def with_serialization_settings(
    ss: SerializationSettings,
):
Parameter Type
ss SerializationSettings

with_worker_queue()

def with_worker_queue(
    wq: Controller,
):
Parameter Type
wq Controller

Properties

Property Type Description
user_space_params

flytekit.remote.remote.FlyteContextManager

FlyteContextManager manages the execution context within Flytekit. It holds global state of either compilation or Execution. It is not thread-safe and can only be run as a single threaded application currently. Context’s within Flytekit is useful to manage compilation state and execution state. Refer to CompilationState and ExecutionState for more information. FlyteContextManager provides a singleton stack to manage these contexts.

Typical usage is

.. code-block:: python

FlyteContextManager.initialize() with FlyteContextManager.with_context(o) as ctx: pass

If required - not recommended you can use

FlyteContextManager.push_context()

but correspondingly a pop_context should be called

FlyteContextManager.pop_context()

Methods

Method Description
add_signal_handler() None
current_context() None
get_origin_stackframe() None
initialize() Re-initializes the context and erases the entire context
pop_context() None
push_context() None
size() None
with_context() None

add_signal_handler()

def add_signal_handler(
    handler: typing.Callable[[int, FrameType], typing.Any],
):
Parameter Type
handler typing.Callable[[int, FrameType], typing.Any]

current_context()

def current_context()

get_origin_stackframe()

def get_origin_stackframe(
    limit,
):
Parameter Type
limit

initialize()

def initialize()

Re-initializes the context and erases the entire context

pop_context()

def pop_context()

push_context()

def push_context(
    ctx: FlyteContext,
    f: Optional[traceback.FrameSummary],
):
Parameter Type
ctx FlyteContext
f Optional[traceback.FrameSummary]

size()

def size()

with_context()

def with_context(
    b: FlyteContext.Builder,
):
Parameter Type
b FlyteContext.Builder

flytekit.remote.remote.FlyteEntityAlreadyExistsException

Assertion failed.

def FlyteEntityAlreadyExistsException(
    args,
    timestamp: typing.Optional[float],
):
Parameter Type
args *args
timestamp typing.Optional[float]

Properties

Property Type Description
timestamp

flytekit.remote.remote.FlyteEntityNotExistException

Assertion failed.

def FlyteEntityNotExistException(
    args,
    timestamp: typing.Optional[float],
):
Parameter Type
args *args
timestamp typing.Optional[float]

Properties

Property Type Description
timestamp

flytekit.remote.remote.FlyteExecutionSpan

def FlyteExecutionSpan(
    span: flyteidl.core.metrics_pb2.Span,
):
Parameter Type
span flyteidl.core.metrics_pb2.Span

Methods

Method Description
dump() None
explain() None
from_flyte_idl() None
serialize_to_string() None
short_string()
to_flyte_idl() None
verbose_string()

dump()

def dump()

explain()

def explain()

from_flyte_idl()

def from_flyte_idl(
    pb,
):
Parameter Type
pb

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
is_empty

flytekit.remote.remote.FlyteLaunchPlan

A class encapsulating a remote Flyte launch plan.

def FlyteLaunchPlan(
    id,
    args,
    kwargs,
):
Parameter Type
id
args *args
kwargs **kwargs

Methods

Method Description
compile() None
construct_node_metadata() Used when constructing the node that encapsulates this task as part of a broader workflow definition
execute() None
from_flyte_idl()
local_execute() None
local_execution_mode() None
promote_from_model() None
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

compile()

def compile(
    ctx: FlyteContext,
    args,
    kwargs,
):
Parameter Type
ctx FlyteContext
args *args
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

Used when constructing the node that encapsulates this task as part of a broader workflow definition.

execute()

def execute(
    kwargs,
):
Parameter Type
kwargs **kwargs

from_flyte_idl()

def from_flyte_idl(
    pb2,
):
Parameter Type
pb2

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
):
Parameter Type
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

promote_from_model()

def promote_from_model(
    id: id_models.Identifier,
    model: _launch_plan_models.LaunchPlanSpec,
):
Parameter Type
id id_models.Identifier
model _launch_plan_models.LaunchPlanSpec

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
annotations
auth_role
default_inputs
entity_metadata
entity_type_text
fixed_inputs
flyte_workflow
id
interface
is_empty
is_scheduled
labels
max_parallelism
name
overwrite_cache
python_interface
raw_output_data_config
resource_type
security_context
workflow_id

flytekit.remote.remote.FlyteNode

A class encapsulating a remote Flyte node.

def FlyteNode(
    id,
    upstream_nodes,
    bindings,
    metadata,
    task_node: Optional[FlyteTaskNode],
    workflow_node: Optional[FlyteWorkflowNode],
    branch_node: Optional[FlyteBranchNode],
    gate_node: Optional[FlyteGateNode],
    array_node: Optional[FlyteArrayNode],
):
Parameter Type
id
upstream_nodes
bindings
metadata
task_node Optional[FlyteTaskNode]
workflow_node Optional[FlyteWorkflowNode]
branch_node Optional[FlyteBranchNode]
gate_node Optional[FlyteGateNode]
array_node Optional[FlyteArrayNode]

Methods

Method Description
from_flyte_idl()
promote_from_model() None
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    pb2_object,
):
Parameter Type
pb2_object

promote_from_model()

def promote_from_model(
    model: _workflow_model.Node,
    sub_workflows: Optional[Dict[id_models.Identifier, _workflow_model.WorkflowTemplate]],
    node_launch_plans: Optional[Dict[id_models.Identifier, _launch_plan_model.LaunchPlanSpec]],
    tasks: Dict[id_models.Identifier, FlyteTask],
    converted_sub_workflows: Dict[id_models.Identifier, FlyteWorkflow],
):
Parameter Type
model _workflow_model.Node
sub_workflows Optional[Dict[id_models.Identifier, _workflow_model.WorkflowTemplate]]
node_launch_plans Optional[Dict[id_models.Identifier, _launch_plan_model.LaunchPlanSpec]]
tasks Dict[id_models.Identifier, FlyteTask]
converted_sub_workflows Dict[id_models.Identifier, FlyteWorkflow]

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
array_node
branch_node
flyte_entity
gate_node
id
inputs
is_empty
metadata
output_aliases
target
task_node
upstream_node_ids
upstream_nodes
workflow_node

flytekit.remote.remote.FlyteNodeExecution

A class encapsulating a node execution being run on a Flyte remote backend.

def FlyteNodeExecution(
    args,
    kwargs,
):
Parameter Type
args *args
kwargs **kwargs

Methods

Method Description
from_flyte_idl() None
promote_from_model() None
serialize_to_string() None
short_string()
to_flyte_idl() None
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    p: flyteidl.admin.node_execution_pb2.NodeExecution,
):
Parameter Type
p flyteidl.admin.node_execution_pb2.NodeExecution

promote_from_model()

def promote_from_model(
    base_model: node_execution_models.NodeExecution,
):
Parameter Type
base_model node_execution_models.NodeExecution

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
closure
error
executions
id
input_uri
inputs
interface
is_done
is_empty
metadata
outputs
subworkflow_node_executions
task_executions
workflow_executions

flytekit.remote.remote.FlyteRemote

Main entrypoint for programmatically accessing a Flyte remote backend.

The term ‘remote’ is synonymous with ‘backend’ or ‘deployment’ and refers to a hosted instance of the Flyte platform, which comes with a Flyte Admin server on some known URI.

def FlyteRemote(
    config: Config,
    default_project: typing.Optional[str],
    default_domain: typing.Optional[str],
    data_upload_location: str,
    interactive_mode_enabled: typing.Optional[bool],
    kwargs,
):

Initialize a FlyteRemote object.

:type kwargs: All arguments that can be passed to create the SynchronousFlyteClient. These are usually grpc parameters, if you want to customize credentials, ssl handling etc.

Parameter Type
config Config
default_project typing.Optional[str]
default_domain typing.Optional[str]
data_upload_location str
interactive_mode_enabled typing.Optional[bool]
kwargs **kwargs

Methods

Method Description
activate_launchplan() Given a launchplan, activate it, all previous versions are deactivated
approve()
auto() None
download() Download the data to the specified location
execute() Execute a task, workflow, or launchplan, either something that’s been declared locally, or a fetched entity
execute_local_launch_plan() Execute a locally defined LaunchPlan
execute_local_task() Execute a @task-decorated function or TaskTemplate task
execute_local_workflow() Execute an @workflow decorated function
execute_reference_launch_plan() Execute a ReferenceLaunchPlan
execute_reference_task() Execute a ReferenceTask
execute_reference_workflow() Execute a ReferenceWorkflow
execute_remote_task_lp() Execute a FlyteTask, or FlyteLaunchplan
execute_remote_wf() Execute a FlyteWorkflow
fast_package() Packages the given paths into an installable zip and returns the md5_bytes and the URL of the uploaded location
fast_register_workflow() Use this method to register a workflow with zip mode
fetch_active_launchplan() Returns the active version of the launch plan if it exists or returns None
fetch_execution() Fetch a workflow execution entity from flyte admin
fetch_launch_plan() Fetch a launchplan entity from flyte admin
fetch_task() Fetch a task entity from flyte admin
fetch_task_lazy() Similar to fetch_task, just that it returns a LazyEntity, which will fetch the workflow lazily
fetch_workflow() Fetch a workflow entity from flyte admin
fetch_workflow_lazy() Similar to fetch_workflow, just that it returns a LazyEntity, which will fetch the workflow lazily
find_launch_plan() None
find_launch_plan_for_node() None
for_endpoint() None
for_sandbox() None
generate_console_http_domain() This should generate the domain where console is hosted
generate_console_url() Generate a Flyteconsole URL for the given Flyte remote endpoint
get() General function that works with flyte tiny urls
get_domains() Lists registered domains from flyte admin
get_execution_metrics() Get the metrics for a given execution
get_extra_headers_for_protocol() None
launch_backfill() Creates and launches a backfill workflow for the given launchplan
list_projects() Lists registered projects from flyte admin
list_signals()
list_tasks_by_version() None
raw_register() Raw register method, can be used to register control plane entities
recent_executions() None
register_launch_plan() Register a given launchplan, possibly applying overrides from the provided options
register_script() Use this method to register a workflow via script mode
register_task() Register a qualified task (PythonTask) with Remote
register_workflow() Use this method to register a workflow
reject()
remote_context() Context manager with remote-specific configuration
set_input()
set_signal()
sync() This function was previously a singledispatchmethod
sync_execution() Sync a FlyteWorkflowExecution object with its corresponding remote state
sync_node_execution() Get data backing a node execution
sync_task_execution() Sync a FlyteTaskExecution object with its corresponding remote state
terminate() Terminate a workflow execution
upload_file() Function will use remote’s client to hash and then upload the file using Admin’s data proxy service
wait() Wait for an execution to finish

activate_launchplan()

def activate_launchplan(
    ident: Identifier,
):

Given a launchplan, activate it, all previous versions are deactivated.

Parameter Type
ident Identifier

approve()

def approve(
    signal_id: str,
    execution_name: str,
    project: str,
    domain: str,
):
Parameter Type
signal_id str
execution_name str
project str
domain str

auto()

def auto(
    config_file: typing.Union[str, ConfigFile],
    default_project: typing.Optional[str],
    default_domain: typing.Optional[str],
    data_upload_location: str,
    interactive_mode_enabled: bool,
    kwargs,
):
Parameter Type
config_file typing.Union[str, ConfigFile]
default_project typing.Optional[str]
default_domain typing.Optional[str]
data_upload_location str
interactive_mode_enabled bool
kwargs **kwargs

download()

def download(
    data: typing.Union[LiteralsResolver, Literal, LiteralMap],
    download_to: str,
    recursive: bool,
):

Download the data to the specified location. If the data is a LiteralsResolver, LiteralMap and if recursive is specified, then all file like objects will be recursively downloaded (e.g. FlyteFile/Dir (blob), StructuredDataset etc).

Note: That it will use your sessions credentials to access the remote location. For sandbox, this should be automatically configured, assuming you are running sandbox locally. For other environments, you will need to configure your credentials appropriately.

Parameter Type
data typing.Union[LiteralsResolver, Literal, LiteralMap]
download_to str
recursive bool

execute()

def execute(
    entity: typing.Union[FlyteTask, FlyteLaunchPlan, FlyteWorkflow, PythonTask, WorkflowBase, LaunchPlan, ReferenceEntity],
    inputs: typing.Dict[str, typing.Any],
    project: str,
    domain: str,
    name: str,
    version: str,
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    image_config: typing.Optional[ImageConfig],
    options: typing.Optional[Options],
    wait: bool,
    type_hints: typing.Optional[typing.Dict[str, typing.Type]],
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
    serialization_settings: typing.Optional[SerializationSettings],
):

Execute a task, workflow, or launchplan, either something that’s been declared locally, or a fetched entity.

This method supports:

  • Flyte{Task, Workflow, LaunchPlan} remote module objects.
  • @task-decorated functions and TaskTemplate tasks.
  • @workflow-decorated functions.
  • LaunchPlan objects.

For local entities, this code will attempt to find the entity first, and if missing, will compile and register the object.

Not all arguments are relevant in all circumstances. For example, there’s no reason to use the serialization settings for entities that have already been registered on Admin.

Parameter Type
entity typing.Union[FlyteTask, FlyteLaunchPlan, FlyteWorkflow, PythonTask, WorkflowBase, LaunchPlan, ReferenceEntity]
inputs typing.Dict[str, typing.Any]
project str
domain str
name str
version str
execution_name typing.Optional[str]
execution_name_prefix typing.Optional[str]
image_config typing.Optional[ImageConfig]
options typing.Optional[Options]
wait bool
type_hints typing.Optional[typing.Dict[str, typing.Type]]
overwrite_cache typing.Optional[bool]
interruptible typing.Optional[bool]
envs typing.Optional[typing.Dict[str, str]]
tags typing.Optional[typing.List[str]]
cluster_pool typing.Optional[str]
execution_cluster_label typing.Optional[str]
serialization_settings typing.Optional[SerializationSettings]

execute_local_launch_plan()

def execute_local_launch_plan(
    entity: LaunchPlan,
    inputs: typing.Dict[str, typing.Any],
    version: str,
    project: typing.Optional[str],
    domain: typing.Optional[str],
    name: typing.Optional[str],
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    options: typing.Optional[Options],
    wait: bool,
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
    serialization_settings: typing.Optional[SerializationSettings],
):

Execute a locally defined LaunchPlan.

Parameter Type
entity LaunchPlan
inputs typing.Dict[str, typing.Any]
version str
project typing.Optional[str]
domain typing.Optional[str]
name typing.Optional[str]
execution_name typing.Optional[str]
execution_name_prefix typing.Optional[str]
options typing.Optional[Options]
wait bool
overwrite_cache typing.Optional[bool]
interruptible typing.Optional[bool]
envs typing.Optional[typing.Dict[str, str]]
tags typing.Optional[typing.List[str]]
cluster_pool typing.Optional[str]
execution_cluster_label typing.Optional[str]
serialization_settings typing.Optional[SerializationSettings]

execute_local_task()

def execute_local_task(
    entity: PythonTask,
    inputs: typing.Dict[str, typing.Any],
    project: str,
    domain: str,
    name: str,
    version: str,
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    image_config: typing.Optional[ImageConfig],
    wait: bool,
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
    options: typing.Optional[Options],
    serialization_settings: typing.Optional[SerializationSettings],
):

Execute a @task-decorated function or TaskTemplate task.

Parameter Type
entity PythonTask
inputs typing.Dict[str, typing.Any]
project str
domain str
name str
version str
execution_name typing.Optional[str]
execution_name_prefix typing.Optional[str]
image_config typing.Optional[ImageConfig]
wait bool
overwrite_cache typing.Optional[bool]
interruptible typing.Optional[bool]
envs typing.Optional[typing.Dict[str, str]]
tags typing.Optional[typing.List[str]]
cluster_pool typing.Optional[str]
execution_cluster_label typing.Optional[str]
options typing.Optional[Options]
serialization_settings typing.Optional[SerializationSettings]

execute_local_workflow()

def execute_local_workflow(
    entity: WorkflowBase,
    inputs: typing.Dict[str, typing.Any],
    project: str,
    domain: str,
    name: str,
    version: str,
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    image_config: typing.Optional[ImageConfig],
    options: typing.Optional[Options],
    wait: bool,
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
    serialization_settings: typing.Optional[SerializationSettings],
):

Execute an @workflow decorated function.

Parameter Type
entity WorkflowBase
inputs typing.Dict[str, typing.Any]
project str
domain str
name str
version str
execution_name typing.Optional[str]
execution_name_prefix typing.Optional[str]
image_config typing.Optional[ImageConfig]
options typing.Optional[Options]
wait bool
overwrite_cache typing.Optional[bool]
interruptible typing.Optional[bool]
envs typing.Optional[typing.Dict[str, str]]
tags typing.Optional[typing.List[str]]
cluster_pool typing.Optional[str]
execution_cluster_label typing.Optional[str]
serialization_settings typing.Optional[SerializationSettings]

execute_reference_launch_plan()

def execute_reference_launch_plan(
    entity: ReferenceLaunchPlan,
    inputs: typing.Dict[str, typing.Any],
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    options: typing.Optional[Options],
    wait: bool,
    type_hints: typing.Optional[typing.Dict[str, typing.Type]],
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
):

Execute a ReferenceLaunchPlan.

Parameter Type
entity ReferenceLaunchPlan
inputs typing.Dict[str, typing.Any]
execution_name typing.Optional[str]
execution_name_prefix typing.Optional[str]
options typing.Optional[Options]
wait bool
type_hints typing.Optional[typing.Dict[str, typing.Type]]
overwrite_cache typing.Optional[bool]
interruptible typing.Optional[bool]
envs typing.Optional[typing.Dict[str, str]]
tags typing.Optional[typing.List[str]]
cluster_pool typing.Optional[str]
execution_cluster_label typing.Optional[str]

execute_reference_task()

def execute_reference_task(
    entity: ReferenceTask,
    inputs: typing.Dict[str, typing.Any],
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    options: typing.Optional[Options],
    wait: bool,
    type_hints: typing.Optional[typing.Dict[str, typing.Type]],
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
):

Execute a ReferenceTask.

Parameter Type
entity ReferenceTask
inputs typing.Dict[str, typing.Any]
execution_name typing.Optional[str]
execution_name_prefix typing.Optional[str]
options typing.Optional[Options]
wait bool
type_hints typing.Optional[typing.Dict[str, typing.Type]]
overwrite_cache typing.Optional[bool]
interruptible typing.Optional[bool]
envs typing.Optional[typing.Dict[str, str]]
tags typing.Optional[typing.List[str]]
cluster_pool typing.Optional[str]
execution_cluster_label typing.Optional[str]

execute_reference_workflow()

def execute_reference_workflow(
    entity: ReferenceWorkflow,
    inputs: typing.Dict[str, typing.Any],
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    options: typing.Optional[Options],
    wait: bool,
    type_hints: typing.Optional[typing.Dict[str, typing.Type]],
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
):

Execute a ReferenceWorkflow.

Parameter Type
entity ReferenceWorkflow
inputs typing.Dict[str, typing.Any]
execution_name typing.Optional[str]
execution_name_prefix typing.Optional[str]
options typing.Optional[Options]
wait bool
type_hints typing.Optional[typing.Dict[str, typing.Type]]
overwrite_cache typing.Optional[bool]
interruptible typing.Optional[bool]
envs typing.Optional[typing.Dict[str, str]]
tags typing.Optional[typing.List[str]]
cluster_pool typing.Optional[str]
execution_cluster_label typing.Optional[str]

execute_remote_task_lp()

def execute_remote_task_lp(
    entity: typing.Union[FlyteTask, FlyteLaunchPlan],
    inputs: typing.Dict[str, typing.Any],
    project: str,
    domain: str,
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    options: typing.Optional[Options],
    wait: bool,
    type_hints: typing.Optional[typing.Dict[str, typing.Type]],
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
):

Execute a FlyteTask, or FlyteLaunchplan.

NOTE: the name and version arguments are currently not used and only there consistency in the function signature

Parameter Type
entity typing.Union[FlyteTask, FlyteLaunchPlan]
inputs typing.Dict[str, typing.Any]
project str
domain str
execution_name typing.Optional[str]
execution_name_prefix typing.Optional[str]
options typing.Optional[Options]
wait bool
type_hints typing.Optional[typing.Dict[str, typing.Type]]
overwrite_cache typing.Optional[bool]
interruptible typing.Optional[bool]
envs typing.Optional[typing.Dict[str, str]]
tags typing.Optional[typing.List[str]]
cluster_pool typing.Optional[str]
execution_cluster_label typing.Optional[str]

execute_remote_wf()

def execute_remote_wf(
    entity: FlyteWorkflow,
    inputs: typing.Dict[str, typing.Any],
    project: str,
    domain: str,
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    options: typing.Optional[Options],
    wait: bool,
    type_hints: typing.Optional[typing.Dict[str, typing.Type]],
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
):

Execute a FlyteWorkflow.

NOTE: the name and version arguments are currently not used and only there consistency in the function signature

Parameter Type
entity FlyteWorkflow
inputs typing.Dict[str, typing.Any]
project str
domain str
execution_name typing.Optional[str]
execution_name_prefix typing.Optional[str]
options typing.Optional[Options]
wait bool
type_hints typing.Optional[typing.Dict[str, typing.Type]]
overwrite_cache typing.Optional[bool]
interruptible typing.Optional[bool]
envs typing.Optional[typing.Dict[str, str]]
tags typing.Optional[typing.List[str]]
cluster_pool typing.Optional[str]
execution_cluster_label typing.Optional[str]

fast_package()

def fast_package(
    root: os.PathLike,
    deref_symlinks: bool,
    output: str,
    options: typing.Optional[FastPackageOptions],
):

Packages the given paths into an installable zip and returns the md5_bytes and the URL of the uploaded location

Parameter Type
root os.PathLike
deref_symlinks bool
output str
options typing.Optional[FastPackageOptions]

fast_register_workflow()

def fast_register_workflow(
    entity: WorkflowBase,
    serialization_settings: typing.Optional[SerializationSettings],
    version: typing.Optional[str],
    default_launch_plan: typing.Optional[bool],
    options: typing.Optional[Options],
    fast_package_options: typing.Optional[FastPackageOptions],
):

Use this method to register a workflow with zip mode.

Parameter Type
entity WorkflowBase
serialization_settings typing.Optional[SerializationSettings]
version typing.Optional[str]
default_launch_plan typing.Optional[bool]
options typing.Optional[Options]
fast_package_options typing.Optional[FastPackageOptions]

fetch_active_launchplan()

def fetch_active_launchplan(
    project: str,
    domain: str,
    name: str,
):

Returns the active version of the launch plan if it exists or returns None

Parameter Type
project str
domain str
name str

fetch_execution()

def fetch_execution(
    project: str,
    domain: str,
    name: str,
):

Fetch a workflow execution entity from flyte admin.

Parameter Type
project str
domain str
name str

fetch_launch_plan()

def fetch_launch_plan(
    project: str,
    domain: str,
    name: str,
    version: str,
):

Fetch a launchplan entity from flyte admin.

Parameter Type
project str
domain str
name str
version str

fetch_task()

def fetch_task(
    project: str,
    domain: str,
    name: str,
    version: str,
):

Fetch a task entity from flyte admin.

Parameter Type
project str
domain str
name str
version str

fetch_task_lazy()

def fetch_task_lazy(
    project: str,
    domain: str,
    name: str,
    version: str,
):

Similar to fetch_task, just that it returns a LazyEntity, which will fetch the workflow lazily.

Parameter Type
project str
domain str
name str
version str

fetch_workflow()

def fetch_workflow(
    project: str,
    domain: str,
    name: str,
    version: str,
):

Fetch a workflow entity from flyte admin.

Parameter Type
project str
domain str
name str
version str

fetch_workflow_lazy()

def fetch_workflow_lazy(
    project: str,
    domain: str,
    name: str,
    version: str,
):

Similar to fetch_workflow, just that it returns a LazyEntity, which will fetch the workflow lazily.

Parameter Type
project str
domain str
name str
version str

find_launch_plan()

def find_launch_plan(
    lp_ref: id_models,
    node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec],
):
Parameter Type
lp_ref id_models
node_launch_plans Dict[id_models, launch_plan_models.LaunchPlanSpec]

find_launch_plan_for_node()

def find_launch_plan_for_node(
    node: Node,
    node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec],
):
Parameter Type
node Node
node_launch_plans Dict[id_models, launch_plan_models.LaunchPlanSpec]

for_endpoint()

def for_endpoint(
    endpoint: str,
    insecure: bool,
    data_config: typing.Optional[DataConfig],
    config_file: typing.Union[str, ConfigFile],
    default_project: typing.Optional[str],
    default_domain: typing.Optional[str],
    data_upload_location: str,
    interactive_mode_enabled: bool,
    kwargs,
):
Parameter Type
endpoint str
insecure bool
data_config typing.Optional[DataConfig]
config_file typing.Union[str, ConfigFile]
default_project typing.Optional[str]
default_domain typing.Optional[str]
data_upload_location str
interactive_mode_enabled bool
kwargs **kwargs

for_sandbox()

def for_sandbox(
    default_project: typing.Optional[str],
    default_domain: typing.Optional[str],
    data_upload_location: str,
    interactive_mode_enabled: bool,
    kwargs,
):
Parameter Type
default_project typing.Optional[str]
default_domain typing.Optional[str]
data_upload_location str
interactive_mode_enabled bool
kwargs **kwargs

generate_console_http_domain()

def generate_console_http_domain()

This should generate the domain where console is hosted.

:return:

generate_console_url()

def generate_console_url(
    entity: typing.Union[FlyteWorkflowExecution, FlyteNodeExecution, FlyteTaskExecution, FlyteWorkflow, FlyteTask, WorkflowExecutionIdentifier, Identifier, FlyteLaunchPlan],
):

Generate a Flyteconsole URL for the given Flyte remote endpoint. This will automatically determine if this is an execution or an entity and change the type automatically

Parameter Type
entity typing.Union[FlyteWorkflowExecution, FlyteNodeExecution, FlyteTaskExecution, FlyteWorkflow, FlyteTask, WorkflowExecutionIdentifier, Identifier, FlyteLaunchPlan]

get()

def get(
    flyte_uri: typing.Optional[str],
):

General function that works with flyte tiny urls. This can return outputs (in the form of LiteralsResolver, or individual Literals for singular requests), or HTML if passed a deck link, or bytes containing HTML, if ipython is not available locally.

Parameter Type
flyte_uri typing.Optional[str]

get_domains()

def get_domains()

Lists registered domains from flyte admin.

:returns: typing.List[flytekit.models.domain.Domain]

get_execution_metrics()

def get_execution_metrics(
    id: WorkflowExecutionIdentifier,
    depth: int,
):

Get the metrics for a given execution.

Parameter Type
id WorkflowExecutionIdentifier
depth int

get_extra_headers_for_protocol()

def get_extra_headers_for_protocol(
    native_url,
):
Parameter Type
native_url

launch_backfill()

def launch_backfill(
    project: str,
    domain: str,
    from_date: datetime,
    to_date: datetime,
    launchplan: str,
    launchplan_version: str,
    execution_name: str,
    version: str,
    dry_run: bool,
    execute: bool,
    parallel: bool,
    failure_policy: typing.Optional[WorkflowFailurePolicy],
    overwrite_cache: typing.Optional[bool],
):

Creates and launches a backfill workflow for the given launchplan. If launchplan version is not specified, then the latest launchplan is retrieved. The from_date is exclusive and end_date is inclusive and backfill run for all instances in between. :: -> (start_date - exclusive, end_date inclusive)

If dry_run is specified, the workflow is created and returned. If execute==False is specified then the workflow is created and registered. In the last case, the workflow is created, registered and executed.

The parallel flag can be used to generate a workflow where all launchplans can be run in parallel. Default is that execute backfill is run sequentially

Parameter Type
project str
domain str
from_date datetime
to_date datetime
launchplan str
launchplan_version str
execution_name str
version str
dry_run bool
execute bool
parallel bool
failure_policy typing.Optional[WorkflowFailurePolicy]
overwrite_cache typing.Optional[bool]

list_projects()

def list_projects(
    limit: typing.Optional[int],
    filters: typing.Optional[typing.List[filter_models.Filter]],
    sort_by: typing.Optional[admin_common_models.Sort],
):

Lists registered projects from flyte admin.

Parameter Type
limit typing.Optional[int]
filters typing.Optional[typing.List[filter_models.Filter]]
sort_by typing.Optional[admin_common_models.Sort]

list_signals()

def list_signals(
    execution_name: str,
    project: typing.Optional[str],
    domain: typing.Optional[str],
    limit: int,
    filters: typing.Optional[typing.List[filter_models.Filter]],
):
Parameter Type
execution_name str
project typing.Optional[str]
domain typing.Optional[str]
limit int
filters typing.Optional[typing.List[filter_models.Filter]]

list_tasks_by_version()

def list_tasks_by_version(
    version: str,
    project: typing.Optional[str],
    domain: typing.Optional[str],
    limit: typing.Optional[int],
):
Parameter Type
version str
project typing.Optional[str]
domain typing.Optional[str]
limit typing.Optional[int]

raw_register()

def raw_register(
    cp_entity: FlyteControlPlaneEntity,
    settings: SerializationSettings,
    version: str,
    create_default_launchplan: bool,
    options: Options,
    og_entity: FlyteLocalEntity,
):

Raw register method, can be used to register control plane entities. Usually if you have a Flyte Entity like a WorkflowBase, Task, LaunchPlan then use other methods. This should be used only if you have already serialized entities

Parameter Type
cp_entity FlyteControlPlaneEntity
settings SerializationSettings
version str
create_default_launchplan bool
options Options
og_entity FlyteLocalEntity

recent_executions()

def recent_executions(
    project: typing.Optional[str],
    domain: typing.Optional[str],
    limit: typing.Optional[int],
    filters: typing.Optional[typing.List[filter_models.Filter]],
):
Parameter Type
project typing.Optional[str]
domain typing.Optional[str]
limit typing.Optional[int]
filters typing.Optional[typing.List[filter_models.Filter]]

register_launch_plan()

def register_launch_plan(
    entity: LaunchPlan,
    version: typing.Optional[str],
    project: typing.Optional[str],
    domain: typing.Optional[str],
    options: typing.Optional[Options],
    serialization_settings: typing.Optional[SerializationSettings],
):

Register a given launchplan, possibly applying overrides from the provided options. If the underlying workflow is not already registered, it, along with any underlying entities, will also be registered. If the underlying workflow does exist (with the given project/domain/version), then only the launchplan will be registered.

Parameter Type
entity LaunchPlan
version typing.Optional[str]
project typing.Optional[str]
domain typing.Optional[str]
options typing.Optional[Options]
serialization_settings typing.Optional[SerializationSettings]

register_script()

def register_script(
    entity: typing.Union[WorkflowBase, PythonTask, LaunchPlan],
    image_config: typing.Optional[ImageConfig],
    version: typing.Optional[str],
    project: typing.Optional[str],
    domain: typing.Optional[str],
    destination_dir: str,
    copy_all: bool,
    default_launch_plan: bool,
    options: typing.Optional[Options],
    source_path: typing.Optional[str],
    module_name: typing.Optional[str],
    envs: typing.Optional[typing.Dict[str, str]],
    fast_package_options: typing.Optional[FastPackageOptions],
):

Use this method to register a workflow via script mode.

Parameter Type
entity typing.Union[WorkflowBase, PythonTask, LaunchPlan]
image_config typing.Optional[ImageConfig]
version typing.Optional[str]
project typing.Optional[str]
domain typing.Optional[str]
destination_dir str
copy_all bool
default_launch_plan bool
options typing.Optional[Options]
source_path typing.Optional[str]
module_name typing.Optional[str]
envs typing.Optional[typing.Dict[str, str]]
fast_package_options typing.Optional[FastPackageOptions]

register_task()

def register_task(
    entity: PythonTask,
    serialization_settings: typing.Optional[SerializationSettings],
    version: typing.Optional[str],
):

Register a qualified task (PythonTask) with Remote For any conflicting parameters method arguments are regarded as overrides

Parameter Type
entity PythonTask
serialization_settings typing.Optional[SerializationSettings]
version typing.Optional[str]

register_workflow()

def register_workflow(
    entity: WorkflowBase,
    serialization_settings: typing.Optional[SerializationSettings],
    version: typing.Optional[str],
    default_launch_plan: typing.Optional[bool],
    options: typing.Optional[Options],
):

Use this method to register a workflow.

Parameter Type
entity WorkflowBase
serialization_settings typing.Optional[SerializationSettings]
version typing.Optional[str]
default_launch_plan typing.Optional[bool]
options typing.Optional[Options]

reject()

def reject(
    signal_id: str,
    execution_name: str,
    project: str,
    domain: str,
):
Parameter Type
signal_id str
execution_name str
project str
domain str

remote_context()

def remote_context()

Context manager with remote-specific configuration.

set_input()

def set_input(
    signal_id: str,
    execution_name: str,
    value: typing.Union[literal_models.Literal, typing.Any],
    project,
    domain,
    python_type,
    literal_type,
):
Parameter Type
signal_id str
execution_name str
value typing.Union[literal_models.Literal, typing.Any]
project
domain
python_type
literal_type

set_signal()

def set_signal(
    signal_id: str,
    execution_name: str,
    value: typing.Union[literal_models.Literal, typing.Any],
    project: typing.Optional[str],
    domain: typing.Optional[str],
    python_type: typing.Optional[typing.Type],
    literal_type: typing.Optional[type_models.LiteralType],
):
Parameter Type
signal_id str
execution_name str
value typing.Union[literal_models.Literal, typing.Any]
project typing.Optional[str]
domain typing.Optional[str]
python_type typing.Optional[typing.Type]
literal_type typing.Optional[type_models.LiteralType]

sync()

def sync(
    execution: FlyteWorkflowExecution,
    entity_definition: typing.Union[FlyteWorkflow, FlyteTask],
    sync_nodes: bool,
):

This function was previously a singledispatchmethod. We’ve removed that but this function remains so that we don’t break people.

Parameter Type
execution FlyteWorkflowExecution
entity_definition typing.Union[FlyteWorkflow, FlyteTask]
sync_nodes bool

sync_execution()

def sync_execution(
    execution: FlyteWorkflowExecution,
    entity_definition: typing.Union[FlyteWorkflow, FlyteTask],
    sync_nodes: bool,
):

Sync a FlyteWorkflowExecution object with its corresponding remote state.

Parameter Type
execution FlyteWorkflowExecution
entity_definition typing.Union[FlyteWorkflow, FlyteTask]
sync_nodes bool

sync_node_execution()

def sync_node_execution(
    execution: FlyteNodeExecution,
    node_mapping: typing.Dict[str, FlyteNode],
):

Get data backing a node execution. These FlyteNodeExecution objects should’ve come from Admin with the model fields already populated correctly. For purposes of the remote experience, we’d like to supplement the object with some additional fields:

  • inputs/outputs
  • task/workflow executions, and/or underlying node executions in the case of parent nodes
  • TypedInterface (remote wrapper type)

A node can have several different types of executions behind it. That is, the node could’ve run (perhaps multiple times because of retries):

  • A task
  • A static subworkflow
  • A dynamic subworkflow (which in turn may have run additional tasks, subwfs, and/or launch plans)
  • A launch plan

The data model is complicated, so ascertaining which of these happened is a bit tricky. That logic is encapsulated in this function.

Parameter Type
execution FlyteNodeExecution
node_mapping typing.Dict[str, FlyteNode]

sync_task_execution()

def sync_task_execution(
    execution: FlyteTaskExecution,
    entity_interface: typing.Optional[TypedInterface],
):

Sync a FlyteTaskExecution object with its corresponding remote state.

Parameter Type
execution FlyteTaskExecution
entity_interface typing.Optional[TypedInterface]

terminate()

def terminate(
    execution: FlyteWorkflowExecution,
    cause: str,
):

Terminate a workflow execution.

Parameter Type
execution FlyteWorkflowExecution
cause str

upload_file()

def upload_file(
    to_upload: pathlib.Path,
    project: typing.Optional[str],
    domain: typing.Optional[str],
    filename_root: typing.Optional[str],
):

Function will use remote’s client to hash and then upload the file using Admin’s data proxy service.

Parameter Type
to_upload pathlib.Path
project typing.Optional[str]
domain typing.Optional[str]
filename_root typing.Optional[str]

wait()

def wait(
    execution: FlyteWorkflowExecution,
    timeout: typing.Optional[typing.Union[timedelta, int]],
    poll_interval: typing.Optional[typing.Union[timedelta, int]],
    sync_nodes: bool,
):

Wait for an execution to finish.

Parameter Type
execution FlyteWorkflowExecution
timeout typing.Optional[typing.Union[timedelta, int]]
poll_interval typing.Optional[typing.Union[timedelta, int]]
sync_nodes bool

Properties

Property Type Description
client
config
context
default_domain
default_project
file_access
interactive_mode_enabled

flytekit.remote.remote.FlyteTask

A class encapsulating a remote Flyte task.

def FlyteTask(
    id,
    type,
    metadata,
    interface,
    custom,
    container,
    task_type_version: int,
    security_context,
    config,
    k8s_pod,
    sql,
    extended_resources,
    should_register: bool,
):
Parameter Type
id
type
metadata
interface
custom
container
task_type_version int
security_context
config
k8s_pod
sql
extended_resources
should_register bool

Methods

Method Description
compile() None
construct_node_metadata() Used when constructing the node that encapsulates this task as part of a broader workflow definition
execute() None
from_flyte_idl()
local_execute() None
local_execution_mode() None
promote_from_model() None
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

compile()

def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    args,
    kwargs,
):
Parameter Type
ctx flytekit.core.context_manager.FlyteContext
args *args
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

Used when constructing the node that encapsulates this task as part of a broader workflow definition.

execute()

def execute(
    kwargs,
):
Parameter Type
kwargs **kwargs

from_flyte_idl()

def from_flyte_idl(
    pb2_object,
):
Parameter Type
pb2_object

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
):
Parameter Type
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

promote_from_model()

def promote_from_model(
    base_model: _task_model.TaskTemplate,
):
Parameter Type
base_model _task_model.TaskTemplate

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
config
container
custom
docs
entity_type_text
extended_resources
id
interface
is_empty
k8s_pod
metadata
name
python_interface
resource_type
security_context
should_register
sql
task_type_version
template
type

flytekit.remote.remote.FlyteTaskExecution

A class encapsulating a task execution being run on a Flyte remote backend.

def FlyteTaskExecution(
    args,
    kwargs,
):
Parameter Type
args *args
kwargs **kwargs

Methods

Method Description
from_flyte_idl()
promote_from_model() None
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    proto,
):
Parameter Type
proto

promote_from_model()

def promote_from_model(
    base_model: admin_task_execution_models.TaskExecution,
):
Parameter Type
base_model admin_task_execution_models.TaskExecution

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
closure
error
id
input_uri
inputs
is_done
is_empty
is_parent
outputs
task

flytekit.remote.remote.FlyteTaskNode

A class encapsulating a task that a Flyte node needs to execute.

def FlyteTaskNode(
    flyte_task: FlyteTask,
):

Refers to the task that the Node is to execute. This is currently a oneof in protobuf, but there’s only one option currently. This code should be updated when more options are available.

Parameter Type
flyte_task FlyteTask

Methods

Method Description
from_flyte_idl()
promote_from_model() Takes the idl wrapper for a TaskNode,
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    pb2_object,
):
Parameter Type
pb2_object

promote_from_model()

def promote_from_model(
    task: FlyteTask,
):

Takes the idl wrapper for a TaskNode, and returns the hydrated Flytekit object for it by fetching it with the FlyteTask control plane.

Parameter Type
task FlyteTask

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
flyte_task
is_empty
overrides
reference_id

flytekit.remote.remote.FlyteValueException

Inappropriate argument value (of correct type).

def FlyteValueException(
    received_value,
    error_message,
):
Parameter Type
received_value
error_message

Properties

Property Type Description
timestamp

flytekit.remote.remote.FlyteWorkflow

A class encapsulating a remote Flyte workflow.

def FlyteWorkflow(
    id: id_models.Identifier,
    nodes: List[FlyteNode],
    interface,
    output_bindings,
    metadata,
    metadata_defaults,
    subworkflows: Optional[List[FlyteWorkflow]],
    tasks: Optional[List[FlyteTask]],
    launch_plans: Optional[Dict[id_models.Identifier, launch_plan_models.LaunchPlanSpec]],
    compiled_closure: Optional[compiler_models.CompiledWorkflowClosure],
    should_register: bool,
):
Parameter Type
id id_models.Identifier
nodes List[FlyteNode]
interface
output_bindings
metadata
metadata_defaults
subworkflows Optional[List[FlyteWorkflow]]
tasks Optional[List[FlyteTask]]
launch_plans Optional[Dict[id_models.Identifier, launch_plan_models.LaunchPlanSpec]]
compiled_closure Optional[compiler_models.CompiledWorkflowClosure]
should_register bool

Methods

Method Description
compile() None
construct_node_metadata() Used when constructing the node that encapsulates this task as part of a broader workflow definition
execute() None
from_flyte_idl()
get_non_system_nodes() None
local_execute() None
local_execution_mode() None
promote_from_closure() Extracts out the relevant portions of a FlyteWorkflow from a closure from the control plane
promote_from_model() None
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

compile()

def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    args,
    kwargs,
):
Parameter Type
ctx flytekit.core.context_manager.FlyteContext
args *args
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

Used when constructing the node that encapsulates this task as part of a broader workflow definition.

execute()

def execute(
    kwargs,
):
Parameter Type
kwargs **kwargs

from_flyte_idl()

def from_flyte_idl(
    pb2_object,
):
Parameter Type
pb2_object

get_non_system_nodes()

def get_non_system_nodes(
    nodes: List[_workflow_models.Node],
):
Parameter Type
nodes List[_workflow_models.Node]

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
):
Parameter Type
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

promote_from_closure()

def promote_from_closure(
    closure: compiler_models.CompiledWorkflowClosure,
    node_launch_plans: Optional[Dict[id_models, launch_plan_models.LaunchPlanSpec]],
):

Extracts out the relevant portions of a FlyteWorkflow from a closure from the control plane.

Parameter Type
closure compiler_models.CompiledWorkflowClosure
node_launch_plans Optional[Dict[id_models, launch_plan_models.LaunchPlanSpec]]

promote_from_model()

def promote_from_model(
    base_model: _workflow_models.WorkflowTemplate,
    sub_workflows: Optional[Dict[Identifier, _workflow_models.WorkflowTemplate]],
    tasks: Optional[Dict[Identifier, FlyteTask]],
    node_launch_plans: Optional[Dict[Identifier, launch_plan_models.LaunchPlanSpec]],
):
Parameter Type
base_model _workflow_models.WorkflowTemplate
sub_workflows Optional[Dict[Identifier, _workflow_models.WorkflowTemplate]]
tasks Optional[Dict[Identifier, FlyteTask]]
node_launch_plans Optional[Dict[Identifier, launch_plan_models.LaunchPlanSpec]]

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
docs
entity_type_text
failure_node
flyte_nodes
flyte_sub_workflows
flyte_tasks
id
interface
is_empty
metadata
metadata_defaults
name
nodes
outputs
python_interface
resource_type
should_register
sub_workflows
template

flytekit.remote.remote.FlyteWorkflowExecution

A class encapsulating a workflow execution being run on a Flyte remote backend.

def FlyteWorkflowExecution(
    type_hints: Optional[Dict[str, typing.Type]],
    remote: Optional['FlyteRemote'],
    args,
    kwargs,
):
Parameter Type
type_hints Optional[Dict[str, typing.Type]]
remote Optional['FlyteRemote']
args *args
kwargs **kwargs

Methods

Method Description
from_flyte_idl()
promote_from_model() None
serialize_to_string() None
short_string()
sync() Sync the state of the current execution and returns a new object with the updated state
to_flyte_idl()
verbose_string()
wait() Wait for the execution to complete

from_flyte_idl()

def from_flyte_idl(
    pb,
):
Parameter Type
pb

promote_from_model()

def promote_from_model(
    base_model: execution_models.Execution,
    remote: Optional['FlyteRemote'],
    type_hints: Optional[Dict[str, typing.Type]],
):
Parameter Type
base_model execution_models.Execution
remote Optional['FlyteRemote']
type_hints Optional[Dict[str, typing.Type]]

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

sync()

def sync(
    sync_nodes: bool,
):

Sync the state of the current execution and returns a new object with the updated state.

Parameter Type
sync_nodes bool

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

wait()

def wait(
    timeout: Optional[Union[timedelta, int]],
    poll_interval: Optional[Union[timedelta, int]],
    sync_nodes: bool,
):

Wait for the execution to complete. This is a blocking call.

Parameter Type
timeout Optional[Union[timedelta, int]]
poll_interval Optional[Union[timedelta, int]]
sync_nodes bool

Properties

Property Type Description
closure
error
execution_url
flyte_workflow
id
inputs
is_done
is_empty
is_successful
node_executions
outputs
spec

flytekit.remote.remote.Identifier

def Identifier(
    resource_type,
    project,
    domain,
    name,
    version,
):
Parameter Type
resource_type
project
domain
name
version

Methods

Method Description
from_flyte_idl()
resource_type_name() None
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    p,
):
Parameter Type
p

resource_type_name()

def resource_type_name()

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
domain
is_empty
name
project
resource_type
version

flytekit.remote.remote.ImageConfig

We recommend you to use ImageConfig.auto(img_name=None) to create an ImageConfig. For example, ImageConfig.auto(img_name=““ghcr.io/flyteorg/flytecookbook:v1.0.0"”) will create an ImageConfig.

ImageConfig holds available images which can be used at registration time. A default image can be specified along with optional additional images. Each image in the config must have a unique name.

Attributes: default_image (Optional[Image]): The default image to be used as a container for task serialization. images (List[Image]): Optional, additional images which can be used in task container definitions.

def ImageConfig(
    default_image: Optional[Image],
    images: Optional[List[Image]],
):
Parameter Type
default_image Optional[Image]
images Optional[List[Image]]

Methods

Method Description
auto() Reads from config file or from img_name
auto_default_image() None
create_from() None
find_image() Return an image, by name, if it exists
from_dict() None
from_images() Allows you to programmatically create an ImageConfig
from_json() None
schema() None
to_dict() None
to_json() None
validate_image() Validates the image to match the standard format

auto()

def auto(
    config_file: typing.Union[str, ConfigFile, None],
    img_name: Optional[str],
):

Reads from config file or from img_name Note that this function does not take into account the flytekit default images (see the Dockerfiles at the base of this repo). To pick those up, see the auto_default_image function..

Parameter Type
config_file typing.Union[str, ConfigFile, None]
img_name Optional[str]

auto_default_image()

def auto_default_image()

create_from()

def create_from(
    default_image: Optional[Image],
    other_images: typing.Optional[typing.List[Image]],
):
Parameter Type
default_image Optional[Image]
other_images typing.Optional[typing.List[Image]]

find_image()

def find_image(
    name,
):

Return an image, by name, if it exists.

Parameter Type
name

from_dict()

def from_dict(
    kvs: typing.Union[dict, list, str, int, float, bool, NoneType],
    infer_missing,
):
Parameter Type
kvs typing.Union[dict, list, str, int, float, bool, NoneType]
infer_missing

from_images()

def from_images(
    default_image: str,
    m: typing.Optional[typing.Dict[str, str]],
):

Allows you to programmatically create an ImageConfig. Usually only the default_image is required, unless your workflow uses multiple images

.. code:: python

ImageConfig.from_dict( “ghcr.io/flyteorg/flytecookbook:v1.0.0”, { “spark”: “ghcr.io/flyteorg/myspark:…”, “other”: “…”, } )

urn:

Parameter Type
default_image str
m typing.Optional[typing.Dict[str, str]]

from_json()

def from_json(
    s: typing.Union[str, bytes, bytearray],
    parse_float,
    parse_int,
    parse_constant,
    infer_missing,
    kw,
):
Parameter Type
s typing.Union[str, bytes, bytearray]
parse_float
parse_int
parse_constant
infer_missing
kw

schema()

def schema(
    infer_missing: bool,
    only,
    exclude,
    many: bool,
    context,
    load_only,
    dump_only,
    partial: bool,
    unknown,
):
Parameter Type
infer_missing bool
only
exclude
many bool
context
load_only
dump_only
partial bool
unknown

to_dict()

def to_dict(
    encode_json,
):
Parameter Type
encode_json

to_json()

def to_json(
    skipkeys: bool,
    ensure_ascii: bool,
    check_circular: bool,
    allow_nan: bool,
    indent: typing.Union[int, str, NoneType],
    separators: typing.Tuple[str, str],
    default: typing.Callable,
    sort_keys: bool,
    kw,
):
Parameter Type
skipkeys bool
ensure_ascii bool
check_circular bool
allow_nan bool
indent typing.Union[int, str, NoneType]
separators typing.Tuple[str, str]
default typing.Callable
sort_keys bool
kw

validate_image()

def validate_image(
    _: typing.Any,
    param: str,
    values: tuple,
):

Validates the image to match the standard format. Also validates that only one default image is provided. a default image, is one that is specified as default=<image_uri> or just <image_uri>. All other images should be provided with a name, in the format name=<image_uri> This method can be used with the CLI

Parameter Type
_ typing.Any
param str
values tuple

flytekit.remote.remote.ImageSpec

This class is used to specify the docker image that will be used to run the task.

def ImageSpec(
    name: str,
    python_version: str,
    builder: typing.Optional[str],
    source_root: typing.Optional[str],
    env: typing.Optional[typing.Dict[str, str]],
    registry: typing.Optional[str],
    packages: typing.Optional[typing.List[str]],
    conda_packages: typing.Optional[typing.List[str]],
    conda_channels: typing.Optional[typing.List[str]],
    requirements: typing.Optional[str],
    apt_packages: typing.Optional[typing.List[str]],
    cuda: typing.Optional[str],
    cudnn: typing.Optional[str],
    base_image: typing.Union[str, ForwardRef('ImageSpec'), NoneType],
    platform: str,
    pip_index: typing.Optional[str],
    pip_extra_index_url: typing.Optional[typing.List[str]],
    pip_secret_mounts: typing.Optional[typing.List[typing.Tuple[str, str]]],
    pip_extra_args: typing.Optional[str],
    registry_config: typing.Optional[str],
    entrypoint: typing.Optional[typing.List[str]],
    commands: typing.Optional[typing.List[str]],
    tag_format: typing.Optional[str],
    source_copy_mode: typing.Optional[flytekit.constants.CopyFileDetection],
    copy: typing.Optional[typing.List[str]],
    python_exec: typing.Optional[str],
):
Parameter Type
name str
python_version str
builder typing.Optional[str]
source_root typing.Optional[str]
env typing.Optional[typing.Dict[str, str]]
registry typing.Optional[str]
packages typing.Optional[typing.List[str]]
conda_packages typing.Optional[typing.List[str]]
conda_channels typing.Optional[typing.List[str]]
requirements typing.Optional[str]
apt_packages typing.Optional[typing.List[str]]
cuda typing.Optional[str]
cudnn typing.Optional[str]
base_image typing.Union[str, ForwardRef('ImageSpec'), NoneType]
platform str
pip_index typing.Optional[str]
pip_extra_index_url typing.Optional[typing.List[str]]
pip_secret_mounts typing.Optional[typing.List[typing.Tuple[str, str]]]
pip_extra_args typing.Optional[str]
registry_config typing.Optional[str]
entrypoint typing.Optional[typing.List[str]]
commands typing.Optional[typing.List[str]]
tag_format typing.Optional[str]
source_copy_mode typing.Optional[flytekit.constants.CopyFileDetection]
copy typing.Optional[typing.List[str]]
python_exec typing.Optional[str]

Methods

Method Description
exist() Check if the image exists in the registry
force_push() Builder that returns a new image spec with force push enabled
from_env() Create ImageSpec with the environment’s Python version and packages pinned to the ones in the environment
image_name() Full image name with tag
is_container() Check if the current container image in the pod is built from current image spec
with_apt_packages() Builder that returns a new image spec with an additional list of apt packages that will be executed during the building process
with_commands() Builder that returns a new image spec with an additional list of commands that will be executed during the building process
with_copy() Builder that returns a new image spec with the source files copied to the destination directory
with_packages() Builder that returns a new image speck with additional python packages that will be installed during the building process

exist()

def exist()

Check if the image exists in the registry. Return True if the image exists in the registry, False otherwise. Return None if failed to check if the image exists due to the permission issue or other reasons.

force_push()

def force_push()

Builder that returns a new image spec with force push enabled.

from_env()

def from_env(
    pinned_packages: typing.Optional[typing.List[str]],
    kwargs,
):

Create ImageSpec with the environment’s Python version and packages pinned to the ones in the environment.

Parameter Type
pinned_packages typing.Optional[typing.List[str]]
kwargs **kwargs

image_name()

def image_name()

Full image name with tag.

is_container()

def is_container()

Check if the current container image in the pod is built from current image spec. :return: True if the current container image in the pod is built from current image spec, False otherwise.

with_apt_packages()

def with_apt_packages(
    apt_packages: typing.Union[str, typing.List[str]],
):

Builder that returns a new image spec with an additional list of apt packages that will be executed during the building process.

Parameter Type
apt_packages typing.Union[str, typing.List[str]]

with_commands()

def with_commands(
    commands: typing.Union[str, typing.List[str]],
):

Builder that returns a new image spec with an additional list of commands that will be executed during the building process.

Parameter Type
commands typing.Union[str, typing.List[str]]

with_copy()

def with_copy(
    src: typing.Union[str, typing.List[str]],
):

Builder that returns a new image spec with the source files copied to the destination directory.

Parameter Type
src typing.Union[str, typing.List[str]]

with_packages()

def with_packages(
    packages: typing.Union[str, typing.List[str]],
):

Builder that returns a new image speck with additional python packages that will be installed during the building process.

Parameter Type
packages typing.Union[str, typing.List[str]]

Properties

Property Type Description
tag

flytekit.remote.remote.LaunchPlan

Launch Plans are one of the core constructs of Flyte. Please take a look at the discussion in the :std:ref:core concepts <flyte:divedeep-launchplans> if you are unfamiliar with them.

Every workflow is registered with a default launch plan, which is just a launch plan with none of the additional attributes set - no default values, fixed values, schedules, etc. Assuming you have the following workflow

.. code-block:: python

@workflow def wf(a: int, c: str) -> str: …

Create the default launch plan with

.. code-block:: python

LaunchPlan.get_or_create(workflow=my_wf)

If you specify additional parameters, you’ll also have to give the launch plan a unique name. Default and fixed inputs can be expressed as Python native values like so:

.. literalinclude:: ../../../tests/flytekit/unit/core/test_launch_plan.py :start-after: # fixed_and_default_start :end-before: # fixed_and_default_end :language: python :dedent: 4

Additionally, a launch plan can be configured to run on a schedule and emit notifications.

Please see the relevant Schedule and Notification objects as well.

To configure the remaining parameters, you’ll need to import the relevant model objects as well.

.. literalinclude:: ../../../tests/flytekit/unit/core/test_launch_plan.py :start-after: # schedule_start :end-before: # schedule_end :language: python :dedent: 4

.. code-block:: python

from flytekit.models.common import Annotations, AuthRole, Labels, RawOutputDataConfig

Then use as follows

.. literalinclude:: ../../../tests/flytekit/unit/core/test_launch_plan.py :start-after: # auth_role_start :end-before: # auth_role_end :language: python :dedent: 4

def LaunchPlan(
    name: str,
    workflow: _annotated_workflow.WorkflowBase,
    parameters: _interface_models.ParameterMap,
    fixed_inputs: _literal_models.LiteralMap,
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
):
Parameter Type
name str
workflow _annotated_workflow.WorkflowBase
parameters _interface_models.ParameterMap
fixed_inputs _literal_models.LiteralMap
schedule Optional[_schedule_model.Schedule]
notifications Optional[List[_common_models.Notification]]
labels Optional[_common_models.Labels]
annotations Optional[_common_models.Annotations]
raw_output_data_config Optional[_common_models.RawOutputDataConfig]
max_parallelism Optional[int]
security_context Optional[security.SecurityContext]
trigger Optional[LaunchPlanTriggerBase]
overwrite_cache Optional[bool]
auto_activate bool

Methods

Method Description
clone_with() None
construct_node_metadata() None
create() None
get_default_launch_plan() Users should probably call the get_or_create function defined below instead
get_or_create() This function offers a friendlier interface for creating launch plans

clone_with()

def clone_with(
    name: str,
    parameters: Optional[_interface_models.ParameterMap],
    fixed_inputs: Optional[_literal_models.LiteralMap],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
):
Parameter Type
name str
parameters Optional[_interface_models.ParameterMap]
fixed_inputs Optional[_literal_models.LiteralMap]
schedule Optional[_schedule_model.Schedule]
notifications Optional[List[_common_models.Notification]]
labels Optional[_common_models.Labels]
annotations Optional[_common_models.Annotations]
raw_output_data_config Optional[_common_models.RawOutputDataConfig]
max_parallelism Optional[int]
security_context Optional[security.SecurityContext]
trigger Optional[LaunchPlanTriggerBase]
overwrite_cache Optional[bool]
auto_activate bool

construct_node_metadata()

def construct_node_metadata()

create()

def create(
    name: str,
    workflow: _annotated_workflow.WorkflowBase,
    default_inputs: Optional[Dict[str, Any]],
    fixed_inputs: Optional[Dict[str, Any]],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    auth_role: Optional[_common_models.AuthRole],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
):
Parameter Type
name str
workflow _annotated_workflow.WorkflowBase
default_inputs Optional[Dict[str, Any]]
fixed_inputs Optional[Dict[str, Any]]
schedule Optional[_schedule_model.Schedule]
notifications Optional[List[_common_models.Notification]]
labels Optional[_common_models.Labels]
annotations Optional[_common_models.Annotations]
raw_output_data_config Optional[_common_models.RawOutputDataConfig]
max_parallelism Optional[int]
security_context Optional[security.SecurityContext]
auth_role Optional[_common_models.AuthRole]
trigger Optional[LaunchPlanTriggerBase]
overwrite_cache Optional[bool]
auto_activate bool

get_default_launch_plan()

def get_default_launch_plan(
    ctx: FlyteContext,
    workflow: _annotated_workflow.WorkflowBase,
):

Users should probably call the get_or_create function defined below instead. A default launch plan is the one that will just pick up whatever default values are defined in the workflow function signature (if any) and use the default auth information supplied during serialization, with no notifications or schedules.

Parameter Type
ctx FlyteContext
workflow _annotated_workflow.WorkflowBase

get_or_create()

def get_or_create(
    workflow: _annotated_workflow.WorkflowBase,
    name: Optional[str],
    default_inputs: Optional[Dict[str, Any]],
    fixed_inputs: Optional[Dict[str, Any]],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    auth_role: Optional[_common_models.AuthRole],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
):

This function offers a friendlier interface for creating launch plans. If the name for the launch plan is not supplied, this assumes you are looking for the default launch plan for the workflow. If it is specified, it will be used. If creating the default launch plan, none of the other arguments may be specified.

The resulting launch plan is also cached and if called again with the same name, the cached version is returned

Parameter Type
workflow _annotated_workflow.WorkflowBase
name Optional[str]
default_inputs Optional[Dict[str, Any]]
fixed_inputs Optional[Dict[str, Any]]
schedule Optional[_schedule_model.Schedule]
notifications Optional[List[_common_models.Notification]]
labels Optional[_common_models.Labels]
annotations Optional[_common_models.Annotations]
raw_output_data_config Optional[_common_models.RawOutputDataConfig]
max_parallelism Optional[int]
security_context Optional[security.SecurityContext]
auth_role Optional[_common_models.AuthRole]
trigger Optional[LaunchPlanTriggerBase]
overwrite_cache Optional[bool]
auto_activate bool

Properties

Property Type Description
annotations
fixed_inputs
interface
labels
max_parallelism
name
notifications
overwrite_cache
parameters
python_interface
raw_output_data_config
saved_inputs
schedule
security_context
should_auto_activate
trigger
workflow

flytekit.remote.remote.LaunchPlanState

Methods

Method Description
enum_to_string()

enum_to_string()

def enum_to_string(
    val,
):
Parameter Type
val

flytekit.remote.remote.LazyEntity

Fetches the entity when the entity is called or when the entity is retrieved. The entity is derived from RemoteEntity so that it behaves exactly like the mimicked entity.

def LazyEntity(
    name: str,
    getter: typing.Callable[[], ~T],
    args,
    kwargs,
):
Parameter Type
name str
getter typing.Callable[[], ~T]
args *args
kwargs **kwargs

Methods

Method Description
compile() None
construct_node_metadata() Used when constructing the node that encapsulates this task as part of a broader workflow definition
entity_fetched() None
execute() None
local_execute() None
local_execution_mode() None

compile()

def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    args,
    kwargs,
):
Parameter Type
ctx flytekit.core.context_manager.FlyteContext
args *args
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

Used when constructing the node that encapsulates this task as part of a broader workflow definition.

entity_fetched()

def entity_fetched()

execute()

def execute(
    kwargs,
):
Parameter Type
kwargs **kwargs

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
):
Parameter Type
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

Properties

Property Type Description
entity
id
name
python_interface

flytekit.remote.remote.Literal

def Literal(
    scalar: typing.Optional[flytekit.models.literals.Scalar],
    collection: typing.Optional[flytekit.models.literals.LiteralCollection],
    map: typing.Optional[flytekit.models.literals.LiteralMap],
    hash: typing.Optional[str],
    metadata: typing.Optional[typing.Dict[str, str]],
    offloaded_metadata: typing.Optional[flytekit.models.literals.LiteralOffloadedMetadata],
):

This IDL message represents a literal value in the Flyte ecosystem.

Parameter Type
scalar typing.Optional[flytekit.models.literals.Scalar]
collection typing.Optional[flytekit.models.literals.LiteralCollection]
map typing.Optional[flytekit.models.literals.LiteralMap]
hash typing.Optional[str]
metadata typing.Optional[typing.Dict[str, str]]
offloaded_metadata typing.Optional[flytekit.models.literals.LiteralOffloadedMetadata]

Methods

Method Description
from_flyte_idl()
serialize_to_string() None
set_metadata() Note: This is a mutation on the literal
short_string()
to_flyte_idl()
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    pb2_object: flyteidl.core.literals_pb2.Literal,
):
Parameter Type
pb2_object flyteidl.core.literals_pb2.Literal

serialize_to_string()

def serialize_to_string()

set_metadata()

def set_metadata(
    metadata: typing.Dict[str, str],
):

Note: This is a mutation on the literal

Parameter Type
metadata typing.Dict[str, str]

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
collection
hash
is_empty
map
metadata
offloaded_metadata
scalar
value

flytekit.remote.remote.LiteralMap

def LiteralMap(
    literals,
):
Parameter Type
literals

Methods

Method Description
from_flyte_idl()
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    pb2_object,
):
Parameter Type
pb2_object

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
is_empty
literals

flytekit.remote.remote.LiteralsResolver

LiteralsResolver is a helper class meant primarily for use with the FlyteRemote experience or any other situation where you might be working with LiteralMaps. This object allows the caller to specify the Python type that should correspond to an element of the map.

def LiteralsResolver(
    literals: typing.Dict[str, Literal],
    variable_map: Optional[Dict[str, _interface_models.Variable]],
    ctx: Optional[FlyteContext],
):
Parameter Type
literals typing.Dict[str, Literal]
variable_map Optional[Dict[str, _interface_models.Variable]]
ctx Optional[FlyteContext]

Methods

Method Description
as_python_native() This should return the native Python representation, compatible with unpacking
clear() D
copy() None
fromkeys() None
get() This will get the attr value from the Literal map, and invoke the TypeEngine to convert it into a Python
get_literal() None
items() D
keys() D
pop() D
popitem() D
setdefault() D
update() D
update_type_hints() None
values() D

as_python_native()

def as_python_native(
    python_interface: Interface,
):

This should return the native Python representation, compatible with unpacking. This function relies on Python interface outputs being ordered correctly.

Parameter Type
python_interface Interface

clear()

def clear()

D.clear() -> None. Remove all items from D.

copy()

def copy()

fromkeys()

def fromkeys(
    iterable,
    value,
):
Parameter Type
iterable
value

get()

def get(
    attr: str,
    as_type: Optional[typing.Type],
):

This will get the attr value from the Literal map, and invoke the TypeEngine to convert it into a Python native value. A Python type can optionally be supplied. If successful, the native value will be cached and future calls will return the cached value instead.

Parameter Type
attr str
as_type Optional[typing.Type]

get_literal()

def get_literal(
    key: str,
):
Parameter Type
key str

items()

def items()

D.items() -> a set-like object providing a view on D’s items

keys()

def keys()

D.keys() -> a set-like object providing a view on D’s keys

pop()

def pop(
    key,
    default,
):

D.pop(k[,d]) -> v, remove specified key and return the corresponding value. If key is not found, d is returned if given, otherwise KeyError is raised.

Parameter Type
key
default

popitem()

def popitem()

D.popitem() -> (k, v), remove and return some (key, value) pair as a 2-tuple; but raise KeyError if D is empty.

setdefault()

def setdefault(
    key,
    default,
):

D.setdefault(k[,d]) -> D.get(k,d), also set D[k]=d if k not in D

Parameter Type
key
default

update()

def update(
    other,
    kwds,
):

D.update([E, ]**F) -> None. Update D from mapping/iterable E and F. If E present and has a .keys() method, does: for k in E.keys(): D[k] = E[k] If E present and lacks .keys() method, does: for (k, v) in E: D[k] = v In either case, this is followed by: for k, v in F.items(): D[k] = v

Parameter Type
other
kwds

update_type_hints()

def update_type_hints(
    type_hints: typing.Dict[str, typing.Type],
):
Parameter Type
type_hints typing.Dict[str, typing.Type]

values()

def values()

D.values() -> an object providing a view on D’s values

Properties

Property Type Description
literals
native_values
variable_map

flytekit.remote.remote.NamedEntityIdentifier

def NamedEntityIdentifier(
    project,
    domain,
    name,
):
Parameter Type
project
domain
name

Methods

Method Description
from_flyte_idl()
serialize_to_string() None
short_string()
to_flyte_idl() Stores object to a Flyte-IDL defined protobuf
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    idl_object,
):
Parameter Type
idl_object

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

Stores object to a Flyte-IDL defined protobuf.

verbose_string()

def verbose_string()

Properties

Property Type Description
domain
is_empty
name
project

flytekit.remote.remote.Node

def Node(
    id,
    metadata,
    inputs,
    upstream_node_ids,
    output_aliases,
    task_node,
    workflow_node,
    branch_node,
    gate_node: typing.Optional[flytekit.models.core.workflow.GateNode],
    array_node: typing.Optional[flytekit.models.core.workflow.ArrayNode],
):

A Workflow graph Node. One unit of execution in the graph. Each node can be linked to a Task, a Workflow or a branch node. One of the nodes must be specified.

Parameter Type
id
metadata
inputs
upstream_node_ids
output_aliases
task_node
workflow_node
branch_node
gate_node typing.Optional[flytekit.models.core.workflow.GateNode]
array_node typing.Optional[flytekit.models.core.workflow.ArrayNode]

Methods

Method Description
from_flyte_idl()
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    pb2_object,
):
Parameter Type
pb2_object

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
array_node
branch_node
gate_node
id
inputs
is_empty
metadata
output_aliases
target
task_node
upstream_node_ids
workflow_node

flytekit.remote.remote.NodeExecutionGetDataResponse

Currently, node, task, and workflow execution all have the same get data response. So we’ll create this common superclass to reduce code duplication until things diverge in the future.

def NodeExecutionGetDataResponse(
    args,
    dynamic_workflow: typing.Optional[DynamicWorkflowNodeMetadata],
    kwargs,
):
Parameter Type
args *args
dynamic_workflow typing.Optional[DynamicWorkflowNodeMetadata]
kwargs **kwargs

Methods

Method Description
from_flyte_idl()
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    pb2_object,
):
Parameter Type
pb2_object

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
dynamic_workflow
full_inputs
full_outputs
inputs
is_empty
outputs

flytekit.remote.remote.NodeMetadata

def NodeMetadata(
    name,
    timeout,
    retries,
    interruptible: typing.Optional[bool],
    cacheable: typing.Optional[bool],
    cache_version: typing.Optional[str],
    cache_serializable: typing.Optional[bool],
):

Defines extra information about the Node.

Parameter Type
name
timeout
retries
interruptible typing.Optional[bool]
cacheable typing.Optional[bool]
cache_version typing.Optional[str]
cache_serializable typing.Optional[bool]

Methods

Method Description
from_flyte_idl() None
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    pb2_object,
):
Parameter Type
pb2_object

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
cache_serializable
cache_version
cacheable
interruptible
is_empty
name
retries
timeout

flytekit.remote.remote.NotificationList

def NotificationList(
    notifications,
):
Parameter Type
notifications

Methods

Method Description
from_flyte_idl()
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    pb2_object,
):
Parameter Type
pb2_object

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
is_empty
notifications

flytekit.remote.remote.Options

These are options that can be configured for a launchplan during registration or overridden during an execution. For instance two people may want to run the same workflow but have the offloaded data stored in two different buckets. Or you may want labels or annotations to be different. This object is used when launching an execution in a Flyte backend, and also when registering launch plans.

def Options(
    labels: typing.Optional[flytekit.models.common.Labels],
    annotations: typing.Optional[flytekit.models.common.Annotations],
    raw_output_data_config: typing.Optional[flytekit.models.common.RawOutputDataConfig],
    security_context: typing.Optional[flytekit.models.security.SecurityContext],
    max_parallelism: typing.Optional[int],
    notifications: typing.Optional[typing.List[flytekit.models.common.Notification]],
    disable_notifications: typing.Optional[bool],
    overwrite_cache: typing.Optional[bool],
):
Parameter Type
labels typing.Optional[flytekit.models.common.Labels]
annotations typing.Optional[flytekit.models.common.Annotations]
raw_output_data_config typing.Optional[flytekit.models.common.RawOutputDataConfig]
security_context typing.Optional[flytekit.models.security.SecurityContext]
max_parallelism typing.Optional[int]
notifications typing.Optional[typing.List[flytekit.models.common.Notification]]
disable_notifications typing.Optional[bool]
overwrite_cache typing.Optional[bool]

Methods

Method Description
default_from() None

default_from()

def default_from(
    k8s_service_account: typing.Optional[str],
    raw_data_prefix: typing.Optional[str],
):
Parameter Type
k8s_service_account typing.Optional[str]
raw_data_prefix typing.Optional[str]

flytekit.remote.remote.OrderedDict

Dictionary that remembers insertion order

flytekit.remote.remote.PickledEntity

Represents the structure of the pickled object stored in the .pkl file for interactive mode.

Attributes: metadata: Metadata about the pickled entities including Python version entities: Dictionary mapping entity names to their PythonAutoContainerTask instances

def PickledEntity(
    metadata: PickledEntityMetadata,
    entities: Dict[str, PythonAutoContainerTask],
):
Parameter Type
metadata PickledEntityMetadata
entities Dict[str, PythonAutoContainerTask]

flytekit.remote.remote.PickledEntityMetadata

Metadata for a pickled entity containing version information.

Attributes: python_version: The Python version string (e.g. “3.12.0”) used to create the pickle

def PickledEntityMetadata(
    python_version: str,
):
Parameter Type
python_version str

flytekit.remote.remote.Progress

Renders an auto-updating progress bar(s).

def Progress(
    columns: typing.Union[str, rich.progress.ProgressColumn],
    console: typing.Optional[rich.console.Console],
    auto_refresh: bool,
    refresh_per_second: float,
    speed_estimate_period: float,
    transient: bool,
    redirect_stdout: bool,
    redirect_stderr: bool,
    get_time: typing.Optional[typing.Callable[[], float]],
    disable: bool,
    expand: bool,
):
Parameter Type
columns typing.Union[str, rich.progress.ProgressColumn]
console typing.Optional[rich.console.Console]
auto_refresh bool
refresh_per_second float
speed_estimate_period float
transient bool
redirect_stdout bool
redirect_stderr bool
get_time typing.Optional[typing.Callable[[], float]]
disable bool
expand bool

Methods

Method Description
add_task() Add a new ’task’ to the Progress display
advance() Advance task by a number of steps
get_default_columns() Get the default columns used for a new Progress instance:
get_renderable() Get a renderable for the progress display
get_renderables() Get a number of renderables for the progress display
make_tasks_table() Get a table to render the Progress display
open() Track progress while reading from a binary file
refresh() Refresh (render) the progress information
remove_task() Delete a task if it exists
reset() Reset a task so completed is 0 and the clock is reset
start() Start the progress display
start_task() Start a task
stop() Stop the progress display
stop_task() Stop a task
track() Track progress by iterating over a sequence
update() Update information associated with a task
wrap_file() Track progress file reading from a binary file

add_task()

def add_task(
    description: str,
    start: bool,
    total: typing.Optional[float],
    completed: int,
    visible: bool,
    fields: typing.Any,
):

Add a new ’task’ to the Progress display.

Parameter Type
description str
start bool
total typing.Optional[float]
completed int
visible bool
fields typing.Any

advance()

def advance(
    task_id: rich.progress.TaskID,
    advance: float,
):

Advance task by a number of steps.

Parameter Type
task_id rich.progress.TaskID
advance float

get_default_columns()

def get_default_columns()

Get the default columns used for a new Progress instance:

  • a text column for the description (TextColumn)
  • the bar itself (BarColumn)
  • a text column showing completion percentage (TextColumn)
  • an estimated-time-remaining column (TimeRemainingColumn) If the Progress instance is created without passing a columns argument, the default columns defined here will be used.

You can also create a Progress instance using custom columns before and/or after the defaults, as in this example:

progress = Progress( SpinnerColumn(), *Progress.get_default_columns(), “Elapsed:”, TimeElapsedColumn(), )

This code shows the creation of a Progress display, containing a spinner to the left, the default columns, and a labeled elapsed time column.

get_renderable()

def get_renderable()

Get a renderable for the progress display.

get_renderables()

def get_renderables()

Get a number of renderables for the progress display.

make_tasks_table()

def make_tasks_table(
    tasks: typing.Iterable[rich.progress.Task],
):

Get a table to render the Progress display.

Parameter Type
tasks typing.Iterable[rich.progress.Task]

open()

def open(
    file: typing.Union[str, ForwardRef('PathLike[str]'), bytes],
    mode: typing.Union[typing.Literal['rb'], typing.Literal['rt'], typing.Literal['r']],
    buffering: int,
    encoding: typing.Optional[str],
    errors: typing.Optional[str],
    newline: typing.Optional[str],
    total: typing.Optional[int],
    task_id: typing.Optional[rich.progress.TaskID],
    description: str,
):

Track progress while reading from a binary file.

Parameter Type
file typing.Union[str, ForwardRef('PathLike[str]'), bytes]
mode typing.Union[typing.Literal['rb'], typing.Literal['rt'], typing.Literal['r']]
buffering int
encoding typing.Optional[str]
errors typing.Optional[str]
newline typing.Optional[str]
total typing.Optional[int]
task_id typing.Optional[rich.progress.TaskID]
description str

refresh()

def refresh()

Refresh (render) the progress information.

remove_task()

def remove_task(
    task_id: rich.progress.TaskID,
):

Delete a task if it exists.

Parameter Type
task_id rich.progress.TaskID

reset()

def reset(
    task_id: rich.progress.TaskID,
    start: bool,
    total: typing.Optional[float],
    completed: int,
    visible: typing.Optional[bool],
    description: typing.Optional[str],
    fields: typing.Any,
):

Reset a task so completed is 0 and the clock is reset.

Parameter Type
task_id rich.progress.TaskID
start bool
total typing.Optional[float]
completed int
visible typing.Optional[bool]
description typing.Optional[str]
fields typing.Any

start()

def start()

Start the progress display.

start_task()

def start_task(
    task_id: rich.progress.TaskID,
):

Start a task.

Starts a task (used when calculating elapsed time). You may need to call this manually, if you called add_task with start=False.

Parameter Type
task_id rich.progress.TaskID

stop()

def stop()

Stop the progress display.

stop_task()

def stop_task(
    task_id: rich.progress.TaskID,
):

Stop a task.

This will freeze the elapsed time on the task.

Parameter Type
task_id rich.progress.TaskID

track()

def track(
    sequence: typing.Union[typing.Iterable[~ProgressType], typing.Sequence[~ProgressType]],
    total: typing.Optional[float],
    completed: int,
    task_id: typing.Optional[rich.progress.TaskID],
    description: str,
    update_period: float,
):

Track progress by iterating over a sequence.

Parameter Type
sequence typing.Union[typing.Iterable[~ProgressType], typing.Sequence[~ProgressType]]
total typing.Optional[float]
completed int
task_id typing.Optional[rich.progress.TaskID]
description str
update_period float

update()

def update(
    task_id: rich.progress.TaskID,
    total: typing.Optional[float],
    completed: typing.Optional[float],
    advance: typing.Optional[float],
    description: typing.Optional[str],
    visible: typing.Optional[bool],
    refresh: bool,
    fields: typing.Any,
):

Update information associated with a task.

Parameter Type
task_id rich.progress.TaskID
total typing.Optional[float]
completed typing.Optional[float]
advance typing.Optional[float]
description typing.Optional[str]
visible typing.Optional[bool]
refresh bool
fields typing.Any

wrap_file()

def wrap_file(
    file: typing.BinaryIO,
    total: typing.Optional[int],
    task_id: typing.Optional[rich.progress.TaskID],
    description: str,
):

Track progress file reading from a binary file.

Parameter Type
file typing.BinaryIO
total typing.Optional[int]
task_id typing.Optional[rich.progress.TaskID]
description str

Properties

Property Type Description
console
finished
task_ids
tasks

flytekit.remote.remote.Project

def Project(
    id,
    name,
    description,
    state,
):

A project represents a logical grouping used to organize entities (tasks, workflows, executions) in the Flyte platform.

Parameter Type
id
name
description
state

Methods

Method Description
active_project() None
archived_project() None
from_flyte_idl()
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

active_project()

def active_project(
    id,
):
Parameter Type
id

archived_project()

def archived_project(
    id,
):
Parameter Type
id

from_flyte_idl()

def from_flyte_idl(
    pb2_object,
):
Parameter Type
pb2_object

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
description
id
is_empty
name
state

flytekit.remote.remote.PythonAutoContainerTask

A Python AutoContainer task should be used as the base for all extensions that want the user’s code to be in the container and the container information to be automatically captured. This base will auto configure the image and image version to be used for all its derivatives.

If you are looking to extend, you might prefer to use PythonFunctionTask or PythonInstanceTask

def PythonAutoContainerTask(
    name: str,
    task_config: T,
    task_type,
    container_image: Optional[Union[str, ImageSpec]],
    requests: Optional[Resources],
    limits: Optional[Resources],
    environment: Optional[Dict[str, str]],
    task_resolver: Optional[TaskResolverMixin],
    secret_requests: Optional[List[Secret]],
    pod_template: Optional[PodTemplate],
    pod_template_name: Optional[str],
    accelerator: Optional[BaseAccelerator],
    shared_memory: Optional[Union[L[True], str]],
    resources: Optional[Resources],
    kwargs,
):
Parameter Type
name str
task_config T
task_type
container_image Optional[Union[str, ImageSpec]]
requests Optional[Resources]
limits Optional[Resources]
environment Optional[Dict[str, str]]
task_resolver Optional[TaskResolverMixin]
secret_requests Optional[List[Secret]]
pod_template Optional[PodTemplate]
pod_template_name Optional[str]
accelerator Optional[BaseAccelerator]
shared_memory Optional[Union[L[True], str]]
resources Optional[Resources]
kwargs **kwargs

Methods

Method Description
compile() Generates a node that encapsulates this task in a workflow definition
construct_node_metadata() Used when constructing the node that encapsulates this task as part of a broader workflow definition
dispatch_execute() This method translates Flyte’s Type system based input values and invokes the actual call to the executor
execute() This method will be invoked to execute the task
find_lhs() None
get_command() Returns the command which should be used in the container definition for the serialized version of this task
get_config() Returns the task config as a serializable dictionary
get_container() Returns the container definition (if any) that is used to run the task on hosted Flyte
get_custom() Return additional plugin-specific custom data (if any) as a serializable dictionary
get_default_command() Returns the default pyflyte-execute command used to run this on hosted Flyte platforms
get_extended_resources() Returns the extended resources to allocate to the task on hosted Flyte
get_image() Update image spec based on fast registration usage, and return string representing the image
get_input_types() Returns the names and python types as a dictionary for the inputs of this task
get_k8s_pod() Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte
get_sql() Returns the Sql definition (if any) that is used to run the task on hosted Flyte
get_type_for_input_var() Returns the python type for an input variable by name
get_type_for_output_var() Returns the python type for the specified output variable by name
local_execute() This function is used only in the local execution path and is responsible for calling dispatch execute
local_execution_mode() None
post_execute() Post execute is called after the execution has completed, with the user_params and can be used to clean-up,
pre_execute() This is the method that will be invoked directly before executing the task method and before all the inputs
reset_command_fn() Resets the command which should be used in the container definition of this task to the default arguments
sandbox_execute() Call dispatch_execute, in the context of a local sandbox execution
set_command_fn() By default, the task will run on the Flyte platform using the pyflyte-execute command
set_resolver() By default, flytekit uses the DefaultTaskResolver to resolve the task

compile()

def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    args,
    kwargs,
):

Generates a node that encapsulates this task in a workflow definition.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
args *args
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

Used when constructing the node that encapsulates this task as part of a broader workflow definition.

dispatch_execute()

def dispatch_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

This method translates Flyte’s Type system based input values and invokes the actual call to the executor This method is also invoked during runtime.

  • VoidPromise is returned in the case when the task itself declares no outputs.
  • Literal Map is returned when the task returns either one more outputs in the declaration. Individual outputs may be none
  • DynamicJobSpec is returned when a dynamic workflow is executed
Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

execute()

def execute(
    kwargs,
):

This method will be invoked to execute the task.

Parameter Type
kwargs **kwargs

find_lhs()

def find_lhs()

get_command()

def get_command(
    settings: SerializationSettings,
):

Returns the command which should be used in the container definition for the serialized version of this task registered on a hosted Flyte platform.

Parameter Type
settings SerializationSettings

get_config()

def get_config(
    settings: SerializationSettings,
):

Returns the task config as a serializable dictionary. This task config consists of metadata about the custom defined for this task.

Parameter Type
settings SerializationSettings

get_container()

def get_container(
    settings: SerializationSettings,
):

Returns the container definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings SerializationSettings

get_custom()

def get_custom(
    settings: flytekit.configuration.SerializationSettings,
):

Return additional plugin-specific custom data (if any) as a serializable dictionary.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_default_command()

def get_default_command(
    settings: SerializationSettings,
):

Returns the default pyflyte-execute command used to run this on hosted Flyte platforms.

Parameter Type
settings SerializationSettings

get_extended_resources()

def get_extended_resources(
    settings: SerializationSettings,
):

Returns the extended resources to allocate to the task on hosted Flyte.

Parameter Type
settings SerializationSettings

get_image()

def get_image(
    settings: SerializationSettings,
):

Update image spec based on fast registration usage, and return string representing the image

Parameter Type
settings SerializationSettings

get_input_types()

def get_input_types()

Returns the names and python types as a dictionary for the inputs of this task.

get_k8s_pod()

def get_k8s_pod(
    settings: SerializationSettings,
):

Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings SerializationSettings

get_sql()

def get_sql(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the Sql definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_type_for_input_var()

def get_type_for_input_var(
    k: str,
    v: typing.Any,
):

Returns the python type for an input variable by name.

Parameter Type
k str
v typing.Any

get_type_for_output_var()

def get_type_for_output_var(
    k: str,
    v: typing.Any,
):

Returns the python type for the specified output variable by name.

Parameter Type
k str
v typing.Any

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
):

This function is used only in the local execution path and is responsible for calling dispatch execute. Use this function when calling a task with native values (or Promises containing Flyte literals derived from Python native values).

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

post_execute()

def post_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
    rval: typing.Any,
):

Post execute is called after the execution has completed, with the user_params and can be used to clean-up, or alter the outputs to match the intended tasks outputs. If not overridden, then this function is a No-op

Parameter Type
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters]
rval typing.Any

pre_execute()

def pre_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
):

This is the method that will be invoked directly before executing the task method and before all the inputs are converted. One particular case where this is useful is if the context is to be modified for the user process to get some user space parameters. This also ensures that things like SparkSession are already correctly setup before the type transformers are called

This should return either the same context of the mutated context

Parameter Type
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters]

reset_command_fn()

def reset_command_fn()

Resets the command which should be used in the container definition of this task to the default arguments. This is useful when the command line is overridden at serialization time.

sandbox_execute()

def sandbox_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

set_command_fn()

def set_command_fn(
    get_command_fn: Optional[Callable[[SerializationSettings], List[str]]],
):

By default, the task will run on the Flyte platform using the pyflyte-execute command. However, it can be useful to update the command with which the task is serialized for specific cases like running map tasks (“pyflyte-map-execute”) or for fast-executed tasks.

Parameter Type
get_command_fn Optional[Callable[[SerializationSettings], List[str]]]

set_resolver()

def set_resolver(
    resolver: TaskResolverMixin,
):

By default, flytekit uses the DefaultTaskResolver to resolve the task. This method allows the user to set a custom task resolver. It can be useful to override the task resolver for specific cases like running tasks in the jupyter notebook.

Parameter Type
resolver TaskResolverMixin

Properties

Property Type Description
container_image
deck_fields
disable_deck
docs
enable_deck
environment
instantiated_in
interface
lhs
location
metadata
name
python_interface
resources
security_context
task_config
task_resolver
task_type
task_type_version

flytekit.remote.remote.PythonFunctionTask

A Python Function task should be used as the base for all extensions that have a python function. It will automatically detect interface of the python function and when serialized on the hosted Flyte platform handles the writing execution command to execute the function

It is advised this task is used using the @task decorator as follows

.. code-block: python

@task def my_func(a: int) -> str: …

In the above code, the name of the function, the module, and the interface (inputs = int and outputs = str) will be auto detected.

def PythonFunctionTask(
    task_config: T,
    task_function: Callable,
    task_type,
    ignore_input_vars: Optional[List[str]],
    execution_mode: ExecutionBehavior,
    task_resolver: Optional[TaskResolverMixin],
    node_dependency_hints: Optional[Iterable[Union['PythonFunctionTask', '_annotated_launch_plan.LaunchPlan', WorkflowBase]]],
    pickle_untyped: bool,
    kwargs,
):
Parameter Type
task_config T
task_function Callable
task_type
ignore_input_vars Optional[List[str]]
execution_mode ExecutionBehavior
task_resolver Optional[TaskResolverMixin]
node_dependency_hints Optional[Iterable[Union['PythonFunctionTask', '_annotated_launch_plan.LaunchPlan', WorkflowBase]]]
pickle_untyped bool
kwargs **kwargs

Methods

Method Description
compile() Generates a node that encapsulates this task in a workflow definition
compile_into_workflow() In the case of dynamic workflows, this function will produce a workflow definition at execution time which will
construct_node_metadata() Used when constructing the node that encapsulates this task as part of a broader workflow definition
dispatch_execute() This method translates Flyte’s Type system based input values and invokes the actual call to the executor
dynamic_execute() By the time this function is invoked, the local_execute function should have unwrapped the Promises and Flyte
execute() This method will be invoked to execute the task
find_lhs() None
get_command() Returns the command which should be used in the container definition for the serialized version of this task
get_config() Returns the task config as a serializable dictionary
get_container() Returns the container definition (if any) that is used to run the task on hosted Flyte
get_custom() Return additional plugin-specific custom data (if any) as a serializable dictionary
get_default_command() Returns the default pyflyte-execute command used to run this on hosted Flyte platforms
get_extended_resources() Returns the extended resources to allocate to the task on hosted Flyte
get_image() Update image spec based on fast registration usage, and return string representing the image
get_input_types() Returns the names and python types as a dictionary for the inputs of this task
get_k8s_pod() Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte
get_sql() Returns the Sql definition (if any) that is used to run the task on hosted Flyte
get_type_for_input_var() Returns the python type for an input variable by name
get_type_for_output_var() Returns the python type for the specified output variable by name
local_execute() This function is used only in the local execution path and is responsible for calling dispatch execute
local_execution_mode() None
post_execute() Post execute is called after the execution has completed, with the user_params and can be used to clean-up,
pre_execute() This is the method that will be invoked directly before executing the task method and before all the inputs
reset_command_fn() Resets the command which should be used in the container definition of this task to the default arguments
sandbox_execute() Call dispatch_execute, in the context of a local sandbox execution
set_command_fn() By default, the task will run on the Flyte platform using the pyflyte-execute command
set_resolver() By default, flytekit uses the DefaultTaskResolver to resolve the task

compile()

def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    args,
    kwargs,
):

Generates a node that encapsulates this task in a workflow definition.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
args *args
kwargs **kwargs

compile_into_workflow()

def compile_into_workflow(
    ctx: FlyteContext,
    task_function: Callable,
    kwargs,
):

In the case of dynamic workflows, this function will produce a workflow definition at execution time which will then proceed to be executed.

Parameter Type
ctx FlyteContext
task_function Callable
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

Used when constructing the node that encapsulates this task as part of a broader workflow definition.

dispatch_execute()

def dispatch_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

This method translates Flyte’s Type system based input values and invokes the actual call to the executor This method is also invoked during runtime.

  • VoidPromise is returned in the case when the task itself declares no outputs.
  • Literal Map is returned when the task returns either one more outputs in the declaration. Individual outputs may be none
  • DynamicJobSpec is returned when a dynamic workflow is executed
Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

dynamic_execute()

def dynamic_execute(
    task_function: Callable,
    kwargs,
):

By the time this function is invoked, the local_execute function should have unwrapped the Promises and Flyte literal wrappers so that the kwargs we are working with here are now Python native literal values. This function is also expected to return Python native literal values.

Since the user code within a dynamic task constitute a workflow, we have to first compile the workflow, and then execute that workflow.

When running for real in production, the task would stop after the compilation step, and then create a file representing that newly generated workflow, instead of executing it.

Parameter Type
task_function Callable
kwargs **kwargs

execute()

def execute(
    kwargs,
):

This method will be invoked to execute the task. If you do decide to override this method you must also handle dynamic tasks or you will no longer be able to use the task as a dynamic task generator.

Parameter Type
kwargs **kwargs

find_lhs()

def find_lhs()

get_command()

def get_command(
    settings: SerializationSettings,
):

Returns the command which should be used in the container definition for the serialized version of this task registered on a hosted Flyte platform.

Parameter Type
settings SerializationSettings

get_config()

def get_config(
    settings: SerializationSettings,
):

Returns the task config as a serializable dictionary. This task config consists of metadata about the custom defined for this task.

Parameter Type
settings SerializationSettings

get_container()

def get_container(
    settings: SerializationSettings,
):

Returns the container definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings SerializationSettings

get_custom()

def get_custom(
    settings: flytekit.configuration.SerializationSettings,
):

Return additional plugin-specific custom data (if any) as a serializable dictionary.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_default_command()

def get_default_command(
    settings: SerializationSettings,
):

Returns the default pyflyte-execute command used to run this on hosted Flyte platforms.

Parameter Type
settings SerializationSettings

get_extended_resources()

def get_extended_resources(
    settings: SerializationSettings,
):

Returns the extended resources to allocate to the task on hosted Flyte.

Parameter Type
settings SerializationSettings

get_image()

def get_image(
    settings: SerializationSettings,
):

Update image spec based on fast registration usage, and return string representing the image

Parameter Type
settings SerializationSettings

get_input_types()

def get_input_types()

Returns the names and python types as a dictionary for the inputs of this task.

get_k8s_pod()

def get_k8s_pod(
    settings: SerializationSettings,
):

Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings SerializationSettings

get_sql()

def get_sql(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the Sql definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_type_for_input_var()

def get_type_for_input_var(
    k: str,
    v: typing.Any,
):

Returns the python type for an input variable by name.

Parameter Type
k str
v typing.Any

get_type_for_output_var()

def get_type_for_output_var(
    k: str,
    v: typing.Any,
):

Returns the python type for the specified output variable by name.

Parameter Type
k str
v typing.Any

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
):

This function is used only in the local execution path and is responsible for calling dispatch execute. Use this function when calling a task with native values (or Promises containing Flyte literals derived from Python native values).

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

post_execute()

def post_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
    rval: typing.Any,
):

Post execute is called after the execution has completed, with the user_params and can be used to clean-up, or alter the outputs to match the intended tasks outputs. If not overridden, then this function is a No-op

Parameter Type
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters]
rval typing.Any

pre_execute()

def pre_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
):

This is the method that will be invoked directly before executing the task method and before all the inputs are converted. One particular case where this is useful is if the context is to be modified for the user process to get some user space parameters. This also ensures that things like SparkSession are already correctly setup before the type transformers are called

This should return either the same context of the mutated context

Parameter Type
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters]

reset_command_fn()

def reset_command_fn()

Resets the command which should be used in the container definition of this task to the default arguments. This is useful when the command line is overridden at serialization time.

sandbox_execute()

def sandbox_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

set_command_fn()

def set_command_fn(
    get_command_fn: Optional[Callable[[SerializationSettings], List[str]]],
):

By default, the task will run on the Flyte platform using the pyflyte-execute command. However, it can be useful to update the command with which the task is serialized for specific cases like running map tasks (“pyflyte-map-execute”) or for fast-executed tasks.

Parameter Type
get_command_fn Optional[Callable[[SerializationSettings], List[str]]]

set_resolver()

def set_resolver(
    resolver: TaskResolverMixin,
):

By default, flytekit uses the DefaultTaskResolver to resolve the task. This method allows the user to set a custom task resolver. It can be useful to override the task resolver for specific cases like running tasks in the jupyter notebook.

Parameter Type
resolver TaskResolverMixin

Properties

Property Type Description
container_image
deck_fields
disable_deck
docs
enable_deck
environment
execution_mode
instantiated_in
interface
lhs
location
metadata
name
node_dependency_hints
python_interface
resources
security_context
task_config
task_function
task_resolver
task_type
task_type_version

flytekit.remote.remote.PythonFunctionWorkflow

Please read :std:ref:flyte:divedeep-workflows first for a high-level understanding of what workflows are in Flyte. This Python object represents a workflow defined by a function and decorated with the :py:func:@workflow <flytekit.workflow> decorator. Please see notes on that object for additional information.

def PythonFunctionWorkflow(
    workflow_function: Callable,
    metadata: WorkflowMetadata,
    default_metadata: WorkflowMetadataDefaults,
    docstring: Optional[Docstring],
    on_failure: Optional[Union[WorkflowBase, Task]],
    docs: Optional[Documentation],
    pickle_untyped: bool,
    default_options: Optional[Options],
):
Parameter Type
workflow_function Callable
metadata WorkflowMetadata
default_metadata WorkflowMetadataDefaults
docstring Optional[Docstring]
on_failure Optional[Union[WorkflowBase, Task]]
docs Optional[Documentation]
pickle_untyped bool
default_options Optional[Options]

Methods

Method Description
add() None
compile() Supply static Python native values in the kwargs if you want them to be used in the compilation
construct_node_metadata() None
execute() This function is here only to try to streamline the pattern between workflows and tasks
find_lhs() None
get_all_tasks() Future proof method
load_task() Given the set of identifier keys, should return one Python Task or raise an error if not found
loader_args() This is responsible for turning an instance of a task into args that the load_task function can reconstitute
local_execute() None
local_execution_mode() None
task_name() Overridable function that can optionally return a custom name for a given task

add()

def add(
    t: flytekit.core.python_auto_container.PythonAutoContainerTask,
):
Parameter Type
t flytekit.core.python_auto_container.PythonAutoContainerTask

compile()

def compile(
    kwargs,
):

Supply static Python native values in the kwargs if you want them to be used in the compilation. This mimics a ‘closure’ in the traditional sense of the word.

Parameter Type
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

execute()

def execute(
    kwargs,
):

This function is here only to try to streamline the pattern between workflows and tasks. Since tasks call execute from dispatch_execute which is in local_execute, workflows should also call an execute inside local_execute. This makes mocking cleaner.

Parameter Type
kwargs **kwargs

find_lhs()

def find_lhs()

get_all_tasks()

def get_all_tasks()

Future proof method. Just making it easy to access all tasks (Not required today as we auto register them)

load_task()

def load_task(
    loader_args: typing.List[str],
):

Given the set of identifier keys, should return one Python Task or raise an error if not found

Parameter Type
loader_args typing.List[str]

loader_args()

def loader_args(
    settings: flytekit.configuration.SerializationSettings,
    t: flytekit.core.python_auto_container.PythonAutoContainerTask,
):

This is responsible for turning an instance of a task into args that the load_task function can reconstitute.

Parameter Type
settings flytekit.configuration.SerializationSettings
t flytekit.core.python_auto_container.PythonAutoContainerTask

local_execute()

def local_execute(
    ctx: FlyteContext,
    kwargs,
):
Parameter Type
ctx FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

task_name()

def task_name(
    t: PythonAutoContainerTask,
):

Overridable function that can optionally return a custom name for a given task

Parameter Type
t PythonAutoContainerTask

Properties

Property Type Description
default_options
docs
failure_node
function
instantiated_in
interface
lhs
location
name
nodes
on_failure
output_bindings
python_interface
short_name
workflow_metadata
workflow_metadata_defaults

flytekit.remote.remote.PythonTask

Base Class for all Tasks with a Python native Interface. This should be directly used for task types, that do not have a python function to be executed. Otherwise refer to :py:class:flytekit.PythonFunctionTask.

def PythonTask(
    task_type: str,
    name: str,
    task_config: typing.Optional[~T],
    interface: typing.Optional[flytekit.core.interface.Interface],
    environment: typing.Optional[typing.Dict[str, str]],
    disable_deck: typing.Optional[bool],
    enable_deck: typing.Optional[bool],
    deck_fields: typing.Optional[typing.Tuple[flytekit.deck.deck.DeckField, ...]],
    kwargs,
):
Parameter Type
task_type str
name str
task_config typing.Optional[~T]
interface typing.Optional[flytekit.core.interface.Interface]
environment typing.Optional[typing.Dict[str, str]]
disable_deck typing.Optional[bool]
enable_deck typing.Optional[bool]
deck_fields typing.Optional[typing.Tuple[flytekit.deck.deck.DeckField, ...]]
kwargs **kwargs

Methods

Method Description
compile() Generates a node that encapsulates this task in a workflow definition
construct_node_metadata() Used when constructing the node that encapsulates this task as part of a broader workflow definition
dispatch_execute() This method translates Flyte’s Type system based input values and invokes the actual call to the executor
execute() This method will be invoked to execute the task
find_lhs() None
get_config() Returns the task config as a serializable dictionary
get_container() Returns the container definition (if any) that is used to run the task on hosted Flyte
get_custom() Return additional plugin-specific custom data (if any) as a serializable dictionary
get_extended_resources() Returns the extended resources to allocate to the task on hosted Flyte
get_input_types() Returns the names and python types as a dictionary for the inputs of this task
get_k8s_pod() Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte
get_sql() Returns the Sql definition (if any) that is used to run the task on hosted Flyte
get_type_for_input_var() Returns the python type for an input variable by name
get_type_for_output_var() Returns the python type for the specified output variable by name
local_execute() This function is used only in the local execution path and is responsible for calling dispatch execute
local_execution_mode() None
post_execute() Post execute is called after the execution has completed, with the user_params and can be used to clean-up,
pre_execute() This is the method that will be invoked directly before executing the task method and before all the inputs
sandbox_execute() Call dispatch_execute, in the context of a local sandbox execution

compile()

def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    args,
    kwargs,
):

Generates a node that encapsulates this task in a workflow definition.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
args *args
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

Used when constructing the node that encapsulates this task as part of a broader workflow definition.

dispatch_execute()

def dispatch_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

This method translates Flyte’s Type system based input values and invokes the actual call to the executor This method is also invoked during runtime.

  • VoidPromise is returned in the case when the task itself declares no outputs.
  • Literal Map is returned when the task returns either one more outputs in the declaration. Individual outputs may be none
  • DynamicJobSpec is returned when a dynamic workflow is executed
Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

execute()

def execute(
    kwargs,
):

This method will be invoked to execute the task.

Parameter Type
kwargs **kwargs

find_lhs()

def find_lhs()

get_config()

def get_config(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the task config as a serializable dictionary. This task config consists of metadata about the custom defined for this task.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_container()

def get_container(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the container definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_custom()

def get_custom(
    settings: flytekit.configuration.SerializationSettings,
):

Return additional plugin-specific custom data (if any) as a serializable dictionary.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_extended_resources()

def get_extended_resources(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the extended resources to allocate to the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_input_types()

def get_input_types()

Returns the names and python types as a dictionary for the inputs of this task.

get_k8s_pod()

def get_k8s_pod(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_sql()

def get_sql(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the Sql definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_type_for_input_var()

def get_type_for_input_var(
    k: str,
    v: typing.Any,
):

Returns the python type for an input variable by name.

Parameter Type
k str
v typing.Any

get_type_for_output_var()

def get_type_for_output_var(
    k: str,
    v: typing.Any,
):

Returns the python type for the specified output variable by name.

Parameter Type
k str
v typing.Any

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
):

This function is used only in the local execution path and is responsible for calling dispatch execute. Use this function when calling a task with native values (or Promises containing Flyte literals derived from Python native values).

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

post_execute()

def post_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
    rval: typing.Any,
):

Post execute is called after the execution has completed, with the user_params and can be used to clean-up, or alter the outputs to match the intended tasks outputs. If not overridden, then this function is a No-op

Parameter Type
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters]
rval typing.Any

pre_execute()

def pre_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
):

This is the method that will be invoked directly before executing the task method and before all the inputs are converted. One particular case where this is useful is if the context is to be modified for the user process to get some user space parameters. This also ensures that things like SparkSession are already correctly setup before the type transformers are called

This should return either the same context of the mutated context

Parameter Type
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters]

sandbox_execute()

def sandbox_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

Properties

Property Type Description
deck_fields
disable_deck
docs
enable_deck
environment
instantiated_in
interface
lhs
location
metadata
name
python_interface
security_context
task_config
task_type
task_type_version

flytekit.remote.remote.ReferenceEntity

def ReferenceEntity(
    reference: typing.Union[flytekit.core.reference_entity.WorkflowReference, flytekit.core.reference_entity.TaskReference, flytekit.core.reference_entity.LaunchPlanReference],
    inputs: typing.Dict[str, typing.Type],
    outputs: typing.Dict[str, typing.Type],
):
Parameter Type
reference typing.Union[flytekit.core.reference_entity.WorkflowReference, flytekit.core.reference_entity.TaskReference, flytekit.core.reference_entity.LaunchPlanReference]
inputs typing.Dict[str, typing.Type]
outputs typing.Dict[str, typing.Type]

Methods

Method Description
compile() None
construct_node_metadata() None
execute() None
local_execute() Please see the local_execute comments in the main task
local_execution_mode() None
unwrap_literal_map_and_execute() Please see the implementation of the dispatch_execute function in the real task

compile()

def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    args,
    kwargs,
):
Parameter Type
ctx flytekit.core.context_manager.FlyteContext
args *args
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

execute()

def execute(
    kwargs,
):
Parameter Type
kwargs **kwargs

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
):

Please see the local_execute comments in the main task.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

unwrap_literal_map_and_execute()

def unwrap_literal_map_and_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

Please see the implementation of the dispatch_execute function in the real task.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

Properties

Property Type Description
id
interface
name
python_interface
reference

flytekit.remote.remote.ReferenceLaunchPlan

A reference launch plan serves as a pointer to a Launch Plan that already exists on your Flyte installation. This object will not initiate a network call to Admin, which is why the user is asked to provide the expected interface. If at registration time the interface provided causes an issue with compilation, an error will be returned.

def ReferenceLaunchPlan(
    project: str,
    domain: str,
    name: str,
    version: str,
    inputs: Dict[str, Type],
    outputs: Dict[str, Type],
):
Parameter Type
project str
domain str
name str
version str
inputs Dict[str, Type]
outputs Dict[str, Type]

Methods

Method Description
clone_with() None
compile() None
construct_node_metadata() None
create() None
execute() None
get_default_launch_plan() Users should probably call the get_or_create function defined below instead
get_or_create() This function offers a friendlier interface for creating launch plans
local_execute() Please see the local_execute comments in the main task
local_execution_mode() None
unwrap_literal_map_and_execute() Please see the implementation of the dispatch_execute function in the real task

clone_with()

def clone_with(
    name: str,
    parameters: Optional[_interface_models.ParameterMap],
    fixed_inputs: Optional[_literal_models.LiteralMap],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
):
Parameter Type
name str
parameters Optional[_interface_models.ParameterMap]
fixed_inputs Optional[_literal_models.LiteralMap]
schedule Optional[_schedule_model.Schedule]
notifications Optional[List[_common_models.Notification]]
labels Optional[_common_models.Labels]
annotations Optional[_common_models.Annotations]
raw_output_data_config Optional[_common_models.RawOutputDataConfig]
max_parallelism Optional[int]
security_context Optional[security.SecurityContext]
trigger Optional[LaunchPlanTriggerBase]
overwrite_cache Optional[bool]
auto_activate bool

compile()

def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    args,
    kwargs,
):
Parameter Type
ctx flytekit.core.context_manager.FlyteContext
args *args
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

create()

def create(
    name: str,
    workflow: _annotated_workflow.WorkflowBase,
    default_inputs: Optional[Dict[str, Any]],
    fixed_inputs: Optional[Dict[str, Any]],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    auth_role: Optional[_common_models.AuthRole],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
):
Parameter Type
name str
workflow _annotated_workflow.WorkflowBase
default_inputs Optional[Dict[str, Any]]
fixed_inputs Optional[Dict[str, Any]]
schedule Optional[_schedule_model.Schedule]
notifications Optional[List[_common_models.Notification]]
labels Optional[_common_models.Labels]
annotations Optional[_common_models.Annotations]
raw_output_data_config Optional[_common_models.RawOutputDataConfig]
max_parallelism Optional[int]
security_context Optional[security.SecurityContext]
auth_role Optional[_common_models.AuthRole]
trigger Optional[LaunchPlanTriggerBase]
overwrite_cache Optional[bool]
auto_activate bool

execute()

def execute(
    kwargs,
):
Parameter Type
kwargs **kwargs

get_default_launch_plan()

def get_default_launch_plan(
    ctx: FlyteContext,
    workflow: _annotated_workflow.WorkflowBase,
):

Users should probably call the get_or_create function defined below instead. A default launch plan is the one that will just pick up whatever default values are defined in the workflow function signature (if any) and use the default auth information supplied during serialization, with no notifications or schedules.

Parameter Type
ctx FlyteContext
workflow _annotated_workflow.WorkflowBase

get_or_create()

def get_or_create(
    workflow: _annotated_workflow.WorkflowBase,
    name: Optional[str],
    default_inputs: Optional[Dict[str, Any]],
    fixed_inputs: Optional[Dict[str, Any]],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    auth_role: Optional[_common_models.AuthRole],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
):

This function offers a friendlier interface for creating launch plans. If the name for the launch plan is not supplied, this assumes you are looking for the default launch plan for the workflow. If it is specified, it will be used. If creating the default launch plan, none of the other arguments may be specified.

The resulting launch plan is also cached and if called again with the same name, the cached version is returned

Parameter Type
workflow _annotated_workflow.WorkflowBase
name Optional[str]
default_inputs Optional[Dict[str, Any]]
fixed_inputs Optional[Dict[str, Any]]
schedule Optional[_schedule_model.Schedule]
notifications Optional[List[_common_models.Notification]]
labels Optional[_common_models.Labels]
annotations Optional[_common_models.Annotations]
raw_output_data_config Optional[_common_models.RawOutputDataConfig]
max_parallelism Optional[int]
security_context Optional[security.SecurityContext]
auth_role Optional[_common_models.AuthRole]
trigger Optional[LaunchPlanTriggerBase]
overwrite_cache Optional[bool]
auto_activate bool

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
):

Please see the local_execute comments in the main task.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

unwrap_literal_map_and_execute()

def unwrap_literal_map_and_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

Please see the implementation of the dispatch_execute function in the real task.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

Properties

Property Type Description
annotations
fixed_inputs
id
interface
labels
max_parallelism
name
notifications
overwrite_cache
parameters
python_interface
raw_output_data_config
reference
saved_inputs
schedule
security_context
should_auto_activate
trigger
workflow

flytekit.remote.remote.ReferenceSpec

def ReferenceSpec(
    template: flytekit.core.reference_entity.ReferenceTemplate,
):
Parameter Type
template flytekit.core.reference_entity.ReferenceTemplate

Properties

Property Type Description
template

flytekit.remote.remote.ReferenceTask

This is a reference task, the body of the function passed in through the constructor will never be used, only the signature of the function will be. The signature should also match the signature of the task you’re referencing, as stored by Flyte Admin, if not, workflows using this will break upon compilation.

def ReferenceTask(
    project: str,
    domain: str,
    name: str,
    version: str,
    inputs: Dict[str, type],
    outputs: Dict[str, Type],
):
Parameter Type
project str
domain str
name str
version str
inputs Dict[str, type]
outputs Dict[str, Type]

Methods

Method Description
compile() None
construct_node_metadata() None
dispatch_execute() This method translates Flyte’s Type system based input values and invokes the actual call to the executor
execute() None
find_lhs() None
get_config() Returns the task config as a serializable dictionary
get_container() Returns the container definition (if any) that is used to run the task on hosted Flyte
get_custom() Return additional plugin-specific custom data (if any) as a serializable dictionary
get_extended_resources() Returns the extended resources to allocate to the task on hosted Flyte
get_input_types() Returns the names and python types as a dictionary for the inputs of this task
get_k8s_pod() Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte
get_sql() Returns the Sql definition (if any) that is used to run the task on hosted Flyte
get_type_for_input_var() Returns the python type for an input variable by name
get_type_for_output_var() Returns the python type for the specified output variable by name
local_execute() Please see the local_execute comments in the main task
local_execution_mode() None
post_execute() Post execute is called after the execution has completed, with the user_params and can be used to clean-up,
pre_execute() This is the method that will be invoked directly before executing the task method and before all the inputs
sandbox_execute() Call dispatch_execute, in the context of a local sandbox execution
unwrap_literal_map_and_execute() Please see the implementation of the dispatch_execute function in the real task

compile()

def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    args,
    kwargs,
):
Parameter Type
ctx flytekit.core.context_manager.FlyteContext
args *args
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

dispatch_execute()

def dispatch_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

This method translates Flyte’s Type system based input values and invokes the actual call to the executor This method is also invoked during runtime.

  • VoidPromise is returned in the case when the task itself declares no outputs.
  • Literal Map is returned when the task returns either one more outputs in the declaration. Individual outputs may be none
  • DynamicJobSpec is returned when a dynamic workflow is executed
Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

execute()

def execute(
    kwargs,
):
Parameter Type
kwargs **kwargs

find_lhs()

def find_lhs()

get_config()

def get_config(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the task config as a serializable dictionary. This task config consists of metadata about the custom defined for this task.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_container()

def get_container(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the container definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_custom()

def get_custom(
    settings: flytekit.configuration.SerializationSettings,
):

Return additional plugin-specific custom data (if any) as a serializable dictionary.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_extended_resources()

def get_extended_resources(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the extended resources to allocate to the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_input_types()

def get_input_types()

Returns the names and python types as a dictionary for the inputs of this task.

get_k8s_pod()

def get_k8s_pod(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_sql()

def get_sql(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the Sql definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_type_for_input_var()

def get_type_for_input_var(
    k: str,
    v: typing.Any,
):

Returns the python type for an input variable by name.

Parameter Type
k str
v typing.Any

get_type_for_output_var()

def get_type_for_output_var(
    k: str,
    v: typing.Any,
):

Returns the python type for the specified output variable by name.

Parameter Type
k str
v typing.Any

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
):

Please see the local_execute comments in the main task.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

post_execute()

def post_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
    rval: typing.Any,
):

Post execute is called after the execution has completed, with the user_params and can be used to clean-up, or alter the outputs to match the intended tasks outputs. If not overridden, then this function is a No-op

Parameter Type
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters]
rval typing.Any

pre_execute()

def pre_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
):

This is the method that will be invoked directly before executing the task method and before all the inputs are converted. One particular case where this is useful is if the context is to be modified for the user process to get some user space parameters. This also ensures that things like SparkSession are already correctly setup before the type transformers are called

This should return either the same context of the mutated context

Parameter Type
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters]

sandbox_execute()

def sandbox_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

unwrap_literal_map_and_execute()

def unwrap_literal_map_and_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

Please see the implementation of the dispatch_execute function in the real task.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

Properties

Property Type Description
deck_fields
disable_deck
docs
enable_deck
environment
id
instantiated_in
interface
lhs
location
metadata
name
python_interface
reference
security_context
task_config
task_type
task_type_version

flytekit.remote.remote.ReferenceWorkflow

A reference workflow is a pointer to a workflow that already exists on your Flyte installation. This object will not initiate a network call to Admin, which is why the user is asked to provide the expected interface. If at registration time the interface provided causes an issue with compilation, an error will be returned.

def ReferenceWorkflow(
    project: str,
    domain: str,
    name: str,
    version: str,
    inputs: Dict[str, Type],
    outputs: Dict[str, Type],
):
Parameter Type
project str
domain str
name str
version str
inputs Dict[str, Type]
outputs Dict[str, Type]

Methods

Method Description
add() None
compile() None
construct_node_metadata() None
execute() None
find_lhs() None
get_all_tasks() Future proof method
load_task() Given the set of identifier keys, should return one Python Task or raise an error if not found
loader_args() This is responsible for turning an instance of a task into args that the load_task function can reconstitute
local_execute() Please see the local_execute comments in the main task
local_execution_mode() None
task_name() Overridable function that can optionally return a custom name for a given task
unwrap_literal_map_and_execute() Please see the implementation of the dispatch_execute function in the real task

add()

def add(
    t: flytekit.core.python_auto_container.PythonAutoContainerTask,
):
Parameter Type
t flytekit.core.python_auto_container.PythonAutoContainerTask

compile()

def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    args,
    kwargs,
):
Parameter Type
ctx flytekit.core.context_manager.FlyteContext
args *args
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

execute()

def execute(
    kwargs,
):
Parameter Type
kwargs **kwargs

find_lhs()

def find_lhs()

get_all_tasks()

def get_all_tasks()

Future proof method. Just making it easy to access all tasks (Not required today as we auto register them)

load_task()

def load_task(
    loader_args: typing.List[str],
):

Given the set of identifier keys, should return one Python Task or raise an error if not found

Parameter Type
loader_args typing.List[str]

loader_args()

def loader_args(
    settings: flytekit.configuration.SerializationSettings,
    t: flytekit.core.python_auto_container.PythonAutoContainerTask,
):

This is responsible for turning an instance of a task into args that the load_task function can reconstitute.

Parameter Type
settings flytekit.configuration.SerializationSettings
t flytekit.core.python_auto_container.PythonAutoContainerTask

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
):

Please see the local_execute comments in the main task.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

task_name()

def task_name(
    t: PythonAutoContainerTask,
):

Overridable function that can optionally return a custom name for a given task

Parameter Type
t PythonAutoContainerTask

unwrap_literal_map_and_execute()

def unwrap_literal_map_and_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

Please see the implementation of the dispatch_execute function in the real task.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

Properties

Property Type Description
default_options
docs
failure_node
function
id
instantiated_in
interface
lhs
location
name
nodes
on_failure
output_bindings
python_interface
reference
short_name
workflow_metadata
workflow_metadata_defaults

flytekit.remote.remote.RegistrationSkipped

RegistrationSkipped error is raised when trying to register an entity that is not registrable.

flytekit.remote.remote.RemoteEntity

Helper class that provides a standard way to create an ABC using inheritance.

def RemoteEntity(
    args,
    kwargs,
):
Parameter Type
args *args
kwargs **kwargs

Methods

Method Description
compile() None
construct_node_metadata() Used when constructing the node that encapsulates this task as part of a broader workflow definition
execute() None
local_execute() None
local_execution_mode() None

compile()

def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    args,
    kwargs,
):
Parameter Type
ctx flytekit.core.context_manager.FlyteContext
args *args
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

Used when constructing the node that encapsulates this task as part of a broader workflow definition.

execute()

def execute(
    kwargs,
):
Parameter Type
kwargs **kwargs

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
):
Parameter Type
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

Properties

Property Type Description
id
name
python_interface

flytekit.remote.remote.ResolvedIdentifiers

def ResolvedIdentifiers(
    project: str,
    domain: str,
    name: str,
    version: str,
):
Parameter Type
project str
domain str
name str
version str

flytekit.remote.remote.ResourceType

flytekit.remote.remote.SerializationSettings

These settings are provided while serializing a workflow and task, before registration. This is required to get runtime information at serialization time, as well as some defaults.

Attributes: project (str): The project (if any) with which to register entities under. domain (str): The domain (if any) with which to register entities under. version (str): The version (if any) with which to register entities under. image_config (ImageConfig): The image config used to define task container images. env (Optional[Dict[str, str]]): Environment variables injected into task container definitions. flytekit_virtualenv_root (Optional[str]): During out of container serialize the absolute path of the flytekit virtualenv at serialization time won’t match the in-container value at execution time. This optional value is used to provide the in-container virtualenv path python_interpreter (Optional[str]): The python executable to use. This is used for spark tasks in out of container execution. entrypoint_settings (Optional[EntrypointSettings]): Information about the command, path and version of the entrypoint program. fast_serialization_settings (Optional[FastSerializationSettings]): If the code is being serialized so that it can be fast registered (and thus omit building a Docker image) this object contains additional parameters for serialization. source_root (Optional[str]): The root directory of the source code.

def SerializationSettings(
    image_config: ImageConfig,
    project: typing.Optional[str],
    domain: typing.Optional[str],
    version: typing.Optional[str],
    env: Optional[Dict[str, str]],
    git_repo: Optional[str],
    python_interpreter: str,
    flytekit_virtualenv_root: Optional[str],
    fast_serialization_settings: Optional[FastSerializationSettings],
    source_root: Optional[str],
):
Parameter Type
image_config ImageConfig
project typing.Optional[str]
domain typing.Optional[str]
version typing.Optional[str]
env Optional[Dict[str, str]]
git_repo Optional[str]
python_interpreter str
flytekit_virtualenv_root Optional[str]
fast_serialization_settings Optional[FastSerializationSettings]
source_root Optional[str]

Methods

Method Description
default_entrypoint_settings() Assumes the entrypoint is installed in a virtual-environment where the interpreter is
for_image() None
from_dict() None
from_json() None
from_transport() None
new_builder() Creates a ``SerializationSettings
schema() None
should_fast_serialize() Whether or not the serialization settings specify that entities should be serialized for fast registration
to_dict() None
to_json() None
venv_root_from_interpreter() Computes the path of the virtual environment root, based on the passed in python interpreter path
with_serialized_context() Use this method to create a new SerializationSettings that has an environment variable set with the SerializedContext

default_entrypoint_settings()

def default_entrypoint_settings(
    interpreter_path: str,
):

Assumes the entrypoint is installed in a virtual-environment where the interpreter is

Parameter Type
interpreter_path str

for_image()

def for_image(
    image: str,
    version: str,
    project: str,
    domain: str,
    python_interpreter_path: str,
):
Parameter Type
image str
version str
project str
domain str
python_interpreter_path str

from_dict()

def from_dict(
    kvs: typing.Union[dict, list, str, int, float, bool, NoneType],
    infer_missing,
):
Parameter Type
kvs typing.Union[dict, list, str, int, float, bool, NoneType]
infer_missing

from_json()

def from_json(
    s: typing.Union[str, bytes, bytearray],
    parse_float,
    parse_int,
    parse_constant,
    infer_missing,
    kw,
):
Parameter Type
s typing.Union[str, bytes, bytearray]
parse_float
parse_int
parse_constant
infer_missing
kw

from_transport()

def from_transport(
    s: str,
):
Parameter Type
s str

new_builder()

def new_builder()

Creates a SerializationSettings.Builder that copies the existing serialization settings parameters and allows for customization.

schema()

def schema(
    infer_missing: bool,
    only,
    exclude,
    many: bool,
    context,
    load_only,
    dump_only,
    partial: bool,
    unknown,
):
Parameter Type
infer_missing bool
only
exclude
many bool
context
load_only
dump_only
partial bool
unknown

should_fast_serialize()

def should_fast_serialize()

Whether or not the serialization settings specify that entities should be serialized for fast registration.

to_dict()

def to_dict(
    encode_json,
):
Parameter Type
encode_json

to_json()

def to_json(
    skipkeys: bool,
    ensure_ascii: bool,
    check_circular: bool,
    allow_nan: bool,
    indent: typing.Union[int, str, NoneType],
    separators: typing.Tuple[str, str],
    default: typing.Callable,
    sort_keys: bool,
    kw,
):
Parameter Type
skipkeys bool
ensure_ascii bool
check_circular bool
allow_nan bool
indent typing.Union[int, str, NoneType]
separators typing.Tuple[str, str]
default typing.Callable
sort_keys bool
kw

venv_root_from_interpreter()

def venv_root_from_interpreter(
    interpreter_path: str,
):

Computes the path of the virtual environment root, based on the passed in python interpreter path for example /opt/venv/bin/python3 -> /opt/venv

Parameter Type
interpreter_path str

with_serialized_context()

def with_serialized_context()

Use this method to create a new SerializationSettings that has an environment variable set with the SerializedContext This is useful in transporting SerializedContext to serialized and registered tasks. The setting will be available in the env field with the key SERIALIZED_CONTEXT_ENV_VAR :return: A newly constructed SerializationSettings, or self, if it already has the serializationSettings

Properties

Property Type Description
entrypoint_settings
serialized_context

flytekit.remote.remote.Signal

A ProtocolMessage

flytekit.remote.remote.SignalIdentifier

def SignalIdentifier(
    signal_id: str,
    execution_id: flytekit.models.core.identifier.WorkflowExecutionIdentifier,
):
Parameter Type
signal_id str
execution_id flytekit.models.core.identifier.WorkflowExecutionIdentifier

Methods

Method Description
from_flyte_idl() None
serialize_to_string() None
short_string()
to_flyte_idl() None
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    proto: flyteidl.core.identifier_pb2.SignalIdentifier,
):
Parameter Type
proto flyteidl.core.identifier_pb2.SignalIdentifier

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
execution_id
is_empty
signal_id

flytekit.remote.remote.SignalListRequest

A ProtocolMessage

flytekit.remote.remote.SignalSetRequest

A ProtocolMessage

flytekit.remote.remote.Sort

def Sort(
    key,
    direction,
):
Parameter Type
key
direction

Methods

Method Description
from_flyte_idl()
from_python_std()
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    pb2_object,
):
Parameter Type
pb2_object

from_python_std()

def from_python_std(
    text,
):
Parameter Type
text

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
direction
is_empty
key

flytekit.remote.remote.SynchronousFlyteClient

This is a low-level client that users can use to make direct gRPC service calls to the control plane. See the :std:doc:service spec <idl:protos/docs/service/index>. This is more user-friendly interface than the :py:class:raw client <flytekit.clients.raw.RawSynchronousFlyteClient> so users should try to use this class first. Create a client by ::

SynchronousFlyteClient(“your.domain:port”, insecure=True)

insecure should be True if your flyteadmin deployment doesn’t have SSL enabled

def SynchronousFlyteClient(
    cfg: PlatformConfig,
    kwargs,
):

Initializes a gRPC channel to the given Flyte Admin service.

Parameter Type
cfg PlatformConfig
kwargs **kwargs

Methods

Method Description
create_download_link() None
create_download_location() None
create_execution() This will create an execution for the given execution spec
create_launch_plan() This will create a launch plan definition in the Admin database
create_task() This will create a task definition in the Admin database
create_upload_location() Get a signed url to be used during fast registration
create_workflow() This will create a workflow definition in the Admin database
get_active_launch_plan() Retrieves the active launch plan entity given a named entity identifier (project, domain, name)
get_control_plane_version() Retrieve the Control Plane version from Flyteadmin
get_data() None
get_domains() This returns a list of domains
get_download_artifact_signed_url() Get a signed url for an artifact
get_download_signed_url() None
get_execution()
get_execution_data() Returns signed URLs to LiteralMap blobs for an execution’s inputs and outputs (when available)
get_execution_metrics() Returns metrics partitioning and categorizing the workflow execution time-series
get_node_execution()
get_node_execution_data() Returns signed URLs to LiteralMap blobs for a node execution’s inputs and outputs (when available)
get_project_domain_attributes() Fetches the custom attributes set for a project and domain combination
get_task_execution()
get_task_execution_data() Returns signed URLs to LiteralMap blobs for a node execution’s inputs and outputs (when available)
get_upload_signed_url() Get a signed url to be used during fast registration
get_workflow_attributes() Fetches the custom attributes set for a project, domain, and workflow combination
list_active_launch_plans_paginated() This returns a page of currently active launch plan meta-information for launch plans in a given project and
list_executions_paginated() This returns a page of executions in a given project and domain
list_launch_plan_ids_paginated() This returns a page of identifiers for the launch plans for a given project and domain
list_launch_plans_paginated() This returns a page of launch plan meta-information for launch plans in a given project and domain
list_matchable_attributes() Fetches all custom attributes for a resource type
list_node_executions() Get node executions associated with a given workflow execution
list_node_executions_for_task_paginated() This returns nodes spawned by a specific task execution
list_node_executions_paginated()
list_projects() This will return a list of the projects registered with the Flyte Admin Service
list_projects_paginated() This returns a page of projects
list_signals() This lists signals
list_task_executions_paginated()
list_task_ids_paginated() This returns a page of identifiers for the tasks for a given project and domain
list_tasks_paginated() This returns a page of task metadata for tasks in a given project and domain
list_workflow_ids_paginated() This returns a page of identifiers for the workflows for a given project and domain
list_workflows_paginated() This returns a page of workflow meta-information for workflows in a given project and domain
recover_execution() Recreates a previously-run workflow execution that will only start executing from the last known failure point
register_project() Registers a project
relaunch_execution()
set_signal() This sets a signal
terminate_execution()
update_launch_plan() Updates a launch plan
update_named_entity() Updates the metadata associated with a named entity
update_project() Update an existing project specified by id
update_project_domain_attributes() Sets custom attributes for a project and domain combination
update_workflow_attributes() Sets custom attributes for a project, domain, and workflow combination
with_root_certificate() None
def create_download_link(
    create_download_link_request: _dataproxy_pb2.CreateDownloadLinkRequest,
):
Parameter Type
create_download_link_request _dataproxy_pb2.CreateDownloadLinkRequest

create_download_location()

def create_download_location(
    create_download_location_request: _dataproxy_pb2.CreateDownloadLocationRequest,
):
Parameter Type
create_download_location_request _dataproxy_pb2.CreateDownloadLocationRequest

create_execution()

def create_execution(
    project,
    domain,
    name,
    execution_spec,
    inputs,
):

This will create an execution for the given execution spec.

Parameter Type
project
domain
name
execution_spec
inputs

create_launch_plan()

def create_launch_plan(
    launch_plan_identifer,
    launch_plan_spec,
):

This will create a launch plan definition in the Admin database. Once successful, the launch plan object can be retrieved via the client or viewed via the UI or command-line interfaces.

.. note ::

Overwrites are not supported so any request for a given project, domain, name, and version that exists in the database must match the existing definition exactly. This also means that as long as the request remains identical, calling this method multiple times will result in success.

Parameter Type
launch_plan_identifer
launch_plan_spec

create_task()

def create_task(
    task_identifer,
    task_spec,
):

This will create a task definition in the Admin database. Once successful, the task object can be retrieved via the client or viewed via the UI or command-line interfaces.

.. note ::

Overwrites are not supported so any request for a given project, domain, name, and version that exists in the database must match the existing definition exactly. Furthermore, as long as the request remains identical, calling this method multiple times will result in success.

Parameter Type
task_identifer
task_spec

create_upload_location()

def create_upload_location(
    create_upload_location_request: _dataproxy_pb2.CreateUploadLocationRequest,
):

Get a signed url to be used during fast registration

Parameter Type
create_upload_location_request _dataproxy_pb2.CreateUploadLocationRequest

create_workflow()

def create_workflow(
    workflow_identifier,
    workflow_spec,
):

This will create a workflow definition in the Admin database. Once successful, the workflow object can be retrieved via the client or viewed via the UI or command-line interfaces.

.. note ::

Overwrites are not supported so any request for a given project, domain, name, and version that exists in the database must match the existing definition exactly. Furthermore, as long as the request remains identical, calling this method multiple times will result in success.

Parameter Type
workflow_identifier
workflow_spec

get_active_launch_plan()

def get_active_launch_plan(
    identifier,
):

Retrieves the active launch plan entity given a named entity identifier (project, domain, name). Raises an error if no active launch plan exists.

Parameter Type
identifier

get_control_plane_version()

def get_control_plane_version()

Retrieve the Control Plane version from Flyteadmin.

This method calls Flyteadmin’s GetVersion API to obtain the current version information of the control plane. The retrieved version can be used to enable or disable specific features based on the Flyteadmin version.

Returns: str: The version string of the control plane.

get_data()

def get_data(
    flyte_uri: str,
):
Parameter Type
flyte_uri str

get_domains()

def get_domains()

This returns a list of domains.

get_download_artifact_signed_url()

def get_download_artifact_signed_url(
    node_id: str,
    project: str,
    domain: str,
    name: str,
    artifact_type: <google.protobuf.internal.enum_type_wrapper.EnumTypeWrapper object at 0x105ae79b0>,
    expires_in: datetime.timedelta,
):

Get a signed url for an artifact.

Parameter Type
node_id str
project str
domain str
name str
artifact_type <google.protobuf.internal.enum_type_wrapper.EnumTypeWrapper object at 0x105ae79b0>
expires_in datetime.timedelta

get_download_signed_url()

def get_download_signed_url(
    native_url: str,
    expires_in: datetime.timedelta,
):
Parameter Type
native_url str
expires_in datetime.timedelta

get_execution()

def get_execution(
    id,
):
Parameter Type
id

get_execution_data()

def get_execution_data(
    id,
):

Returns signed URLs to LiteralMap blobs for an execution’s inputs and outputs (when available).

Parameter Type
id

get_execution_metrics()

def get_execution_metrics(
    id,
    depth,
):

Returns metrics partitioning and categorizing the workflow execution time-series.

Parameter Type
id
depth

get_node_execution()

def get_node_execution(
    node_execution_identifier,
):
Parameter Type
node_execution_identifier

get_node_execution_data()

def get_node_execution_data(
    node_execution_identifier,
):

Returns signed URLs to LiteralMap blobs for a node execution’s inputs and outputs (when available).

Parameter Type
node_execution_identifier

get_project_domain_attributes()

def get_project_domain_attributes(
    project,
    domain,
    resource_type,
):

Fetches the custom attributes set for a project and domain combination.

Parameter Type
project
domain
resource_type

get_task_execution()

def get_task_execution(
    id,
):
Parameter Type
id

get_task_execution_data()

def get_task_execution_data(
    task_execution_identifier,
):

Returns signed URLs to LiteralMap blobs for a node execution’s inputs and outputs (when available).

Parameter Type
task_execution_identifier

get_upload_signed_url()

def get_upload_signed_url(
    project: str,
    domain: str,
    content_md5: typing.Optional[bytes],
    filename: typing.Optional[str],
    expires_in: typing.Optional[datetime.timedelta],
    filename_root: typing.Optional[str],
    add_content_md5_metadata: bool,
):

Get a signed url to be used during fast registration

Parameter Type
project str
domain str
content_md5 typing.Optional[bytes]
filename typing.Optional[str]
expires_in typing.Optional[datetime.timedelta]
filename_root typing.Optional[str]
add_content_md5_metadata bool

get_workflow_attributes()

def get_workflow_attributes(
    project,
    domain,
    workflow,
    resource_type,
):

Fetches the custom attributes set for a project, domain, and workflow combination.

Parameter Type
project
domain
workflow
resource_type

list_active_launch_plans_paginated()

def list_active_launch_plans_paginated(
    project,
    domain,
    limit,
    token,
    sort_by,
):

This returns a page of currently active launch plan meta-information for launch plans in a given project and domain.

.. note ::

This is a paginated API. Use the token field in the request to specify a page offset token. The user of the API is responsible for providing this token.

.. note ::

If entries are added to the database between requests for different pages, it is possible to receive entries on the second page that also appeared on the first.

Parameter Type
project
domain
limit
token
sort_by

list_executions_paginated()

def list_executions_paginated(
    project,
    domain,
    limit,
    token,
    filters,
    sort_by,
):

This returns a page of executions in a given project and domain.

.. note ::

This is a paginated API. Use the token field in the request to specify a page offset token. The user of the API is responsible for providing this token.

.. note ::

If entries are added to the database between requests for different pages, it is possible to receive entries on the second page that also appeared on the first.

Parameter Type
project
domain
limit
token
filters
sort_by

list_launch_plan_ids_paginated()

def list_launch_plan_ids_paginated(
    project,
    domain,
    limit,
    token,
    sort_by,
):

This returns a page of identifiers for the launch plans for a given project and domain. Filters can also be specified.

.. note ::

This is a paginated API. Use the token field in the request to specify a page offset token. The user of the API is responsible for providing this token.

.. note ::

If entries are added to the database between requests for different pages, it is possible to receive entries on the second page that also appeared on the first.

Parameter Type
project
domain
limit
token
sort_by

list_launch_plans_paginated()

def list_launch_plans_paginated(
    identifier,
    limit,
    token,
    filters,
    sort_by,
):

This returns a page of launch plan meta-information for launch plans in a given project and domain. Optionally, specifying a name will limit the results to only workflows with that name in the given project and domain.

.. note ::

This is a paginated API. Use the token field in the request to specify a page offset token. The user of the API is responsible for providing this token.

.. note ::

If entries are added to the database between requests for different pages, it is possible to receive entries on the second page that also appeared on the first.

Parameter Type
identifier
limit
token
filters
sort_by

list_matchable_attributes()

def list_matchable_attributes(
    resource_type,
):

Fetches all custom attributes for a resource type.

Parameter Type
resource_type

list_node_executions()

def list_node_executions(
    workflow_execution_identifier,
    limit: int,
    token: typing.Optional[str],
    filters: typing.List[flytekit.models.filters.Filter],
    sort_by: flytekit.models.admin.common.Sort,
    unique_parent_id: str,
):

Get node executions associated with a given workflow execution.

Parameter Type
workflow_execution_identifier
limit int
token typing.Optional[str]
filters typing.List[flytekit.models.filters.Filter]
sort_by flytekit.models.admin.common.Sort
unique_parent_id str

list_node_executions_for_task_paginated()

def list_node_executions_for_task_paginated(
    task_execution_identifier,
    limit,
    token,
    filters,
    sort_by,
):

This returns nodes spawned by a specific task execution. This is generally from things like dynamic tasks.

Parameter Type
task_execution_identifier
limit
token
filters
sort_by

list_node_executions_paginated()

def list_node_executions_paginated(
    node_execution_list_request,
):
Parameter Type
node_execution_list_request

list_projects()

def list_projects(
    project_list_request: typing.Optional[ProjectListRequest],
):

This will return a list of the projects registered with the Flyte Admin Service

Parameter Type
project_list_request typing.Optional[ProjectListRequest]

list_projects_paginated()

def list_projects_paginated(
    limit,
    token,
    filters,
    sort_by,
):

This returns a page of projects.

.. note ::

This is a paginated API. Use the token field in the request to specify a page offset token. The user of the API is responsible for providing this token.

.. note ::

If entries are added to the database between requests for different pages, it is possible to receive entries on the second page that also appeared on the first.

Parameter Type
limit
token
filters
sort_by

list_signals()

def list_signals(
    signal_list_request: SignalListRequest,
):

This lists signals

Parameter Type
signal_list_request SignalListRequest

list_task_executions_paginated()

def list_task_executions_paginated(
    node_execution_identifier,
    limit,
    token,
    filters,
    sort_by,
):
Parameter Type
node_execution_identifier
limit
token
filters
sort_by

list_task_ids_paginated()

def list_task_ids_paginated(
    project,
    domain,
    limit,
    token,
    sort_by,
):

This returns a page of identifiers for the tasks for a given project and domain. Filters can also be specified.

.. note ::

This is a paginated API. Use the token field in the request to specify a page offset token. The user of the API is responsible for providing this token.

.. note ::

If entries are added to the database between requests for different pages, it is possible to receive entries on the second page that also appeared on the first.

Parameter Type
project
domain
limit
token
sort_by

list_tasks_paginated()

def list_tasks_paginated(
    identifier,
    limit,
    token,
    filters,
    sort_by,
):

This returns a page of task metadata for tasks in a given project and domain. Optionally, specifying a name will limit the results to only tasks with that name in the given project and domain.

.. note ::

This is a paginated API. Use the token field in the request to specify a page offset token. The user of the API is responsible for providing this token.

.. note ::

If entries are added to the database between requests for different pages, it is possible to receive entries on the second page that also appeared on the first.

Parameter Type
identifier
limit
token
filters
sort_by

list_workflow_ids_paginated()

def list_workflow_ids_paginated(
    project,
    domain,
    limit,
    token,
    sort_by,
):

This returns a page of identifiers for the workflows for a given project and domain. Filters can also be specified.

.. note ::

This is a paginated API. Use the token field in the request to specify a page offset token. The user of the API is responsible for providing this token.

.. note ::

If entries are added to the database between requests for different pages, it is possible to receive entries on the second page that also appeared on the first.

Parameter Type
project
domain
limit
token
sort_by

list_workflows_paginated()

def list_workflows_paginated(
    identifier,
    limit,
    token,
    filters,
    sort_by,
):

This returns a page of workflow meta-information for workflows in a given project and domain. Optionally, specifying a name will limit the results to only workflows with that name in the given project and domain.

.. note ::

This is a paginated API. Use the token field in the request to specify a page offset token. The user of the API is responsible for providing this token.

.. note ::

If entries are added to the database between requests for different pages, it is possible to receive entries on the second page that also appeared on the first.

Parameter Type
identifier
limit
token
filters
sort_by

recover_execution()

def recover_execution(
    id,
    name: str,
):

Recreates a previously-run workflow execution that will only start executing from the last known failure point.

Parameter Type
id
name str

register_project()

def register_project(
    project,
):

Registers a project.

Parameter Type
project

relaunch_execution()

def relaunch_execution(
    id,
    name,
):
Parameter Type
id
name

set_signal()

def set_signal(
    signal_set_request: SignalSetRequest,
):

This sets a signal

Parameter Type
signal_set_request SignalSetRequest

terminate_execution()

def terminate_execution(
    id,
    cause,
):
Parameter Type
id
cause

update_launch_plan()

def update_launch_plan(
    id,
    state,
):

Updates a launch plan. Currently, this can only be used to update a given launch plan’s state (ACTIVE v. INACTIVE) for schedules. If a launch plan with a given project, domain, and name is set to ACTIVE, then any other launch plan with the same project, domain, and name that was set to ACTIVE will be switched to INACTIVE in one transaction.

Parameter Type
id
state

update_named_entity()

def update_named_entity(
    resource_type,
    id,
    metadata,
):

Updates the metadata associated with a named entity. A named entity is designated a resource, e.g. a workflow, task or launch plan specified by {project, domain, name} across all versions of the resource.

Parameter Type
resource_type
id
metadata

update_project()

def update_project(
    project,
):

Update an existing project specified by id.

Parameter Type
project

update_project_domain_attributes()

def update_project_domain_attributes(
    project,
    domain,
    matching_attributes,
):

Sets custom attributes for a project and domain combination.

Parameter Type
project
domain
matching_attributes

update_workflow_attributes()

def update_workflow_attributes(
    project,
    domain,
    workflow,
    matching_attributes,
):

Sets custom attributes for a project, domain, and workflow combination.

Parameter Type
project
domain
workflow
matching_attributes

with_root_certificate()

def with_root_certificate(
    cfg: PlatformConfig,
    root_cert_file: str,
):
Parameter Type
cfg PlatformConfig
root_cert_file str

Properties

Property Type Description
raw
url

flytekit.remote.remote.TextColumn

A column containing text.

def TextColumn(
    text_format: str,
    style: typing.Union[str, ForwardRef('Style')],
    justify: typing.Literal['default', 'left', 'center', 'right', 'full'],
    markup: bool,
    highlighter: typing.Optional[rich.highlighter.Highlighter],
    table_column: typing.Optional[rich.table.Column],
):
Parameter Type
text_format str
style typing.Union[str, ForwardRef('Style')]
justify typing.Literal['default', 'left', 'center', 'right', 'full']
markup bool
highlighter typing.Optional[rich.highlighter.Highlighter]
table_column typing.Optional[rich.table.Column]

Methods

Method Description
get_table_column() Get a table column, used to build tasks table
render() Should return a renderable object

get_table_column()

def get_table_column()

Get a table column, used to build tasks table.

render()

def render(
    task: Task,
):

Should return a renderable object.

Parameter Type
task Task

flytekit.remote.remote.TimeElapsedColumn

Renders time elapsed.

def TimeElapsedColumn(
    table_column: typing.Optional[rich.table.Column],
):
Parameter Type
table_column typing.Optional[rich.table.Column]

Methods

Method Description
get_table_column() Get a table column, used to build tasks table
render() Show time elapsed

get_table_column()

def get_table_column()

Get a table column, used to build tasks table.

render()

def render(
    task: Task,
):

Show time elapsed.

Parameter Type
task Task

flytekit.remote.remote.TypeEngine

Core Extensible TypeEngine of Flytekit. This should be used to extend the capabilities of FlyteKits type system. Users can implement their own TypeTransformers and register them with the TypeEngine. This will allow special handling of user objects

Methods

Method Description
async_to_literal() Converts a python value of a given type and expected LiteralType into a resolved Literal value
async_to_python_value() None
calculate_hash() None
dict_to_literal_map() None
dict_to_literal_map_pb() None
get_available_transformers() Returns all python types for which transformers are available
get_transformer() Implements a recursive search for the transformer
guess_python_type() Transforms a flyte-specific LiteralType to a regular python value
guess_python_types() Transforms a dictionary of flyte-specific Variable objects to a dictionary of regular python values
lazy_import_transformers() Only load the transformers if needed
literal_map_to_kwargs() None
named_tuple_to_variable_map() Converts a python-native NamedTuple to a flyte-specific VariableMap of named literals
register() This should be used for all types that respond with the right type annotation when you use type(
register_additional_type() None
register_restricted_type() None
to_html() None
to_literal() The current dance is because we are allowing users to call from an async function, this synchronous
to_literal_checks() None
to_literal_type() Converts a python type into a flyte specific LiteralType
to_python_value() Converts a Literal value with an expected python type into a python value
unwrap_offloaded_literal() None

async_to_literal()

def async_to_literal(
    ctx: FlyteContext,
    python_val: typing.Any,
    python_type: Type[T],
    expected: LiteralType,
):

Converts a python value of a given type and expected LiteralType into a resolved Literal value.

Parameter Type
ctx FlyteContext
python_val typing.Any
python_type Type[T]
expected LiteralType

async_to_python_value()

def async_to_python_value(
    ctx: FlyteContext,
    lv: Literal,
    expected_python_type: Type,
):
Parameter Type
ctx FlyteContext
lv Literal
expected_python_type Type

calculate_hash()

def calculate_hash(
    python_val: typing.Any,
    python_type: Type[T],
):
Parameter Type
python_val typing.Any
python_type Type[T]

dict_to_literal_map()

def dict_to_literal_map(
    ctx: FlyteContext,
    d: typing.Dict[str, typing.Any],
    type_hints: Optional[typing.Dict[str, type]],
):
Parameter Type
ctx FlyteContext
d typing.Dict[str, typing.Any]
type_hints Optional[typing.Dict[str, type]]

dict_to_literal_map_pb()

def dict_to_literal_map_pb(
    ctx: FlyteContext,
    d: typing.Dict[str, typing.Any],
    type_hints: Optional[typing.Dict[str, type]],
):
Parameter Type
ctx FlyteContext
d typing.Dict[str, typing.Any]
type_hints Optional[typing.Dict[str, type]]

get_available_transformers()

def get_available_transformers()

Returns all python types for which transformers are available

get_transformer()

def get_transformer(
    python_type: Type,
):

Implements a recursive search for the transformer.

Parameter Type
python_type Type

guess_python_type()

def guess_python_type(
    flyte_type: LiteralType,
):

Transforms a flyte-specific LiteralType to a regular python value.

Parameter Type
flyte_type LiteralType

guess_python_types()

def guess_python_types(
    flyte_variable_dict: typing.Dict[str, _interface_models.Variable],
):

Transforms a dictionary of flyte-specific Variable objects to a dictionary of regular python values.

Parameter Type
flyte_variable_dict typing.Dict[str, _interface_models.Variable]

lazy_import_transformers()

def lazy_import_transformers()

Only load the transformers if needed.

literal_map_to_kwargs()

def literal_map_to_kwargs(
    ctx: FlyteContext,
    lm: LiteralMap,
    python_types: typing.Optional[typing.Dict[str, type]],
    literal_types: typing.Optional[typing.Dict[str, _interface_models.Variable]],
):
Parameter Type
ctx FlyteContext
lm LiteralMap
python_types typing.Optional[typing.Dict[str, type]]
literal_types typing.Optional[typing.Dict[str, _interface_models.Variable]]

named_tuple_to_variable_map()

def named_tuple_to_variable_map(
    t: typing.NamedTuple,
):

Converts a python-native NamedTuple to a flyte-specific VariableMap of named literals.

Parameter Type
t typing.NamedTuple

register()

def register(
    transformer: TypeTransformer,
    additional_types: Optional[typing.List[Type]],
):

This should be used for all types that respond with the right type annotation when you use type(…) function

Parameter Type
transformer TypeTransformer
additional_types Optional[typing.List[Type]]

register_additional_type()

def register_additional_type(
    transformer: TypeTransformer[T],
    additional_type: Type[T],
    override,
):
Parameter Type
transformer TypeTransformer[T]
additional_type Type[T]
override

register_restricted_type()

def register_restricted_type(
    name: str,
    type: Type[T],
):
Parameter Type
name str
type Type[T]

to_html()

def to_html(
    ctx: FlyteContext,
    python_val: typing.Any,
    expected_python_type: Type[typing.Any],
):
Parameter Type
ctx FlyteContext
python_val typing.Any
expected_python_type Type[typing.Any]

to_literal()

def to_literal(
    ctx: FlyteContext,
    python_val: typing.Any,
    python_type: Type[T],
    expected: LiteralType,
):

The current dance is because we are allowing users to call from an async function, this synchronous to_literal function, and allowing this to_literal function, to then invoke yet another async function, namely an async transformer.

Parameter Type
ctx FlyteContext
python_val typing.Any
python_type Type[T]
expected LiteralType

to_literal_checks()

def to_literal_checks(
    python_val: typing.Any,
    python_type: Type[T],
    expected: LiteralType,
):
Parameter Type
python_val typing.Any
python_type Type[T]
expected LiteralType

to_literal_type()

def to_literal_type(
    python_type: Type[T],
):

Converts a python type into a flyte specific LiteralType

Parameter Type
python_type Type[T]

to_python_value()

def to_python_value(
    ctx: FlyteContext,
    lv: Literal,
    expected_python_type: Type,
):

Converts a Literal value with an expected python type into a python value.

Parameter Type
ctx FlyteContext
lv Literal
expected_python_type Type

unwrap_offloaded_literal()

def unwrap_offloaded_literal(
    ctx: FlyteContext,
    lv: Literal,
):
Parameter Type
ctx FlyteContext
lv Literal

flytekit.remote.remote.TypedInterface

def TypedInterface(
    inputs,
    outputs,
):

Please note that this model is slightly incorrect, but is more user-friendly. The underlying inputs and outputs are represented directly as Python dicts, rather than going through the additional VariableMap layer.

Parameter Type
inputs
outputs

Methods

Method Description
from_flyte_idl()
promote_from_model()
serialize_to_string() None
short_string()
to_flyte_idl() None
transform_interface_to_list() Takes a single task interface and interpolates it to an array interface - to allow performing distributed
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    proto: flyteidl.core.interface_pb2.TypedInterface,
):
Parameter Type
proto flyteidl.core.interface_pb2.TypedInterface

promote_from_model()

def promote_from_model(
    model,
):
Parameter Type
model

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

transform_interface_to_list()

def transform_interface_to_list(
    bound_inputs: typing.Set[str],
    excluded_inputs: typing.Set[str],
):

Takes a single task interface and interpolates it to an array interface - to allow performing distributed python map like functions

Parameter Type
bound_inputs typing.Set[str]
excluded_inputs typing.Set[str]

verbose_string()

def verbose_string()

Properties

Property Type Description
inputs
is_empty
outputs

flytekit.remote.remote.WorkflowBase

def WorkflowBase(
    name: str,
    workflow_metadata: WorkflowMetadata,
    workflow_metadata_defaults: WorkflowMetadataDefaults,
    python_interface: Interface,
    on_failure: Optional[Union[WorkflowBase, Task]],
    docs: Optional[Documentation],
    default_options: Optional[Options],
    kwargs,
):
Parameter Type
name str
workflow_metadata WorkflowMetadata
workflow_metadata_defaults WorkflowMetadataDefaults
python_interface Interface
on_failure Optional[Union[WorkflowBase, Task]]
docs Optional[Documentation]
default_options Optional[Options]
kwargs **kwargs

Methods

Method Description
compile() None
construct_node_metadata() None
execute() None
local_execute() None
local_execution_mode() None

compile()

def compile(
    kwargs,
):
Parameter Type
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

execute()

def execute(
    kwargs,
):
Parameter Type
kwargs **kwargs

local_execute()

def local_execute(
    ctx: FlyteContext,
    kwargs,
):
Parameter Type
ctx FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

Properties

Property Type Description
default_options
docs
failure_node
interface
name
nodes
on_failure
output_bindings
python_interface
short_name
workflow_metadata
workflow_metadata_defaults

flytekit.remote.remote.WorkflowExecutionGetDataResponse

Currently, node, task, and workflow execution all have the same get data response. So we’ll create this common superclass to reduce code duplication until things diverge in the future.

def WorkflowExecutionGetDataResponse(
    inputs,
    outputs,
    full_inputs,
    full_outputs,
):
Parameter Type
inputs
outputs
full_inputs
full_outputs

Methods

Method Description
from_flyte_idl()
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    pb2_object,
):
Parameter Type
pb2_object

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
full_inputs
full_outputs
inputs
is_empty
outputs

flytekit.remote.remote.WorkflowExecutionIdentifier

def WorkflowExecutionIdentifier(
    project,
    domain,
    name,
):
Parameter Type
project
domain
name

Methods

Method Description
from_flyte_idl()
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

from_flyte_idl()

def from_flyte_idl(
    p,
):
Parameter Type
p

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
domain
is_empty
name
project

flytekit.remote.remote.WorkflowFailurePolicy

Defines the behavior for a workflow execution in the case of an observed node execution failure. By default, a workflow execution will immediately enter a failed state if a component node fails.

flytekit.remote.remote.datetime

datetime(year, month, day[, hour[, minute[, second[, microsecond[,tzinfo]]]]])

The year, month and day arguments are required. tzinfo may be None, or an instance of a tzinfo subclass. The remaining arguments may be ints.

flytekit.remote.remote.timedelta

Difference between two datetime values.

timedelta(days=0, seconds=0, microseconds=0, milliseconds=0, minutes=0, hours=0, weeks=0)

All arguments are optional and default to 0. Arguments may be integers or floats, and may be positive or negative.