flytekit.core.task
Directory
Classes
| Class | Description |
|---|---|
| Echo | A task that simply echoes its inputs back as outputs. |
| ReferenceTask | This is a reference task: the body of the function passed in through the constructor will never be used, only the signature of the function will be. |
| TaskPlugins | This is the TaskPlugins factory for task types that are derivative of PythonFunctionTask. |
Methods
| Method | Description |
|---|---|
| decorate_function() | Decorates the task with additional functionality if necessary. |
| eager() | Eager workflow decorator. |
| reference_task() | A reference task is a pointer to a task that already exists on your Flyte installation. |
| task() | This is the core decorator to use for any task type in flytekit. |
Variables
| Variable | Type | Description |
|---|---|---|
| FLYTE_ENABLE_VSCODE_KEY | str | |
| FuncOut | TypeVar | |
| P | ParamSpec | |
| T | TypeVar | |
Methods
decorate_function()

```python
def decorate_function(
    fn: Callable[P, Any],
) -> Callable[P, Any]
```

Decorates the task with additional functionality if necessary.

| Parameter | Type | Description |
|---|---|---|
| fn | Callable[P, Any] | Python function to decorate. Returns a decorated Python function. |
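
The description only says the function is decorated "if necessary". As an assumption not confirmed by this page, the FLYTE_ENABLE_VSCODE_KEY variable listed above suggests that the extra functionality is switched on by an environment variable. The sketch below shows that env-gated decorator pattern; ENABLE_DEBUG_KEY, debug_wrapper, and decorate_function_sketch are hypothetical names, not flytekit APIs.

```python
import functools
import os
from typing import Any, Callable

# Hypothetical environment variable name used only for this sketch.
ENABLE_DEBUG_KEY = "MY_ENABLE_DEBUG"


def debug_wrapper(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical extra functionality: count calls, then delegate to fn."""

    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        wrapper.calls += 1  # record that the extra layer ran
        return fn(*args, **kwargs)

    wrapper.calls = 0
    return wrapper


def decorate_function_sketch(fn: Callable[..., Any]) -> Callable[..., Any]:
    # Only add the extra layer when the environment asks for it; otherwise return fn untouched.
    if os.environ.get(ENABLE_DEBUG_KEY):
        return debug_wrapper(fn)
    return fn
```

Returning the original function unchanged when the flag is unset keeps the common path free of any wrapper overhead.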
eager()

```python
def eager(
    _fn,
    *args,
    **kwargs,
) -> Union[EagerAsyncPythonFunctionTask, partial]
```

Eager workflow decorator.

This type of task executes all Flyte entities within it eagerly, which means that any Python construct can be used inside an @eager-decorated function. This works because eager workflows use a flytekit.remote.remote.FlyteRemote object to kick off an execution whenever a Flyte entity needs to produce a value. In other words, every Flyte entity that is called inside the function runs as its own execution, with its own Flyte URL, and its results (or error) are fetched when that execution finishes.
For example:

```python
from flytekit import task, eager


@task
def add_one(x: int) -> int:
    return x + 1


@task
def double(x: int) -> int:
    return x * 2


@eager
async def eager_workflow(x: int) -> int:
    out = add_one(x=x)
    return double(x=out)


# run locally with asyncio
if __name__ == "__main__":
    import asyncio

    result = asyncio.run(eager_workflow(x=1))
    print(f"Result: {result}")  # "Result: 4"
```

Unlike dynamic workflows, eager workflows are not compiled into a workflow spec; instead, they use Python's async capabilities to execute Flyte entities.

Eager workflows only support @task, @workflow, and @eager entities. Conditionals are not supported; use a plain Python if statement instead.

A client_secret_group and client_secret_key are needed to authenticate via flytekit.remote.remote.FlyteRemote using client_credentials authentication, which is configured via flytekit.configuration.PlatformConfig.

```python
from flytekit import eager
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote


@eager(
    remote=FlyteRemote(config=Config.auto(config_file="config.yaml")),
    client_secret_group="my_client_secret_group",
    client_secret_key="my_client_secret_key",
)
async def eager_workflow(x: int) -> int:
    # add_one and double are the tasks defined in the previous example
    out = await add_one(x=x)
    return await double(x=out)
```

Here, `config.yaml` is a flytectl-compatible config file.
For more details, see [`here`](https://docs.flyte.org/en/latest/flytectl/overview.html#configuration).

When using a sandbox cluster started with `flytectl demo start`, however, the `client_secret_group` and `client_secret_key` are not needed:

```python
@eager
async def eager_workflow(x: int) -> int:
    ...
```

| Parameter | Type | Description |
|---|---|---|
| _fn | | |
| args | *args | |
| kwargs | **kwargs | |
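
Because each Flyte entity call inside an eager workflow resolves to a concrete value once its execution finishes, ordinary Python control flow can act on the results. The flytekit-free asyncio sketch below models that idea. run_as_execution is a hypothetical stand-in for launching and awaiting a remote execution (it simply calls the function locally), and eager_like_workflow is not a real Flyte workflow.

```python
import asyncio
from typing import Any, Callable


async def run_as_execution(fn: Callable[..., Any], **kwargs: Any) -> Any:
    """Hypothetical stand-in for launching a remote execution and awaiting its result."""
    await asyncio.sleep(0)  # yield to the event loop, as if polling a remote execution
    return fn(**kwargs)


def add_one(x: int) -> int:
    return x + 1


def double(x: int) -> int:
    return x * 2


async def eager_like_workflow(x: int) -> int:
    out = await run_as_execution(add_one, x=x)
    # `out` is a concrete int here, so a plain Python `if` works (no conditional construct).
    if out > 10:
        return out
    return await run_as_execution(double, x=out)


if __name__ == "__main__":
    print(asyncio.run(eager_like_workflow(x=1)))  # 4
```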
reference_task()

```python
def reference_task(
    project: str,
    domain: str,
    name: str,
    version: str,
) -> Callable[[Callable[..., Any]], ReferenceTask]
```

A reference task is a pointer to a task that already exists on your Flyte installation. This object does not initiate a network call to Admin, which is why the user is asked to provide the expected interface. If, at registration time, the provided interface causes a compilation issue, an error will be returned.
Example:

```python
import typing

from flytekit import reference_task


@reference_task(
    project="flytesnacks",
    domain="development",
    name="recipes.aaa.simple.join_strings",
    version="553018f39e519bdb2597b652639c30ce16b99c79",
)
def ref_t1(a: typing.List[str]) -> str:
    '''
    The function body acts as a convenient skeleton to make it intuitive to call/reference this task from workflows.
    The interface of the task must match that of the remote task. Otherwise, remote compilation of the workflow will
    fail.
    '''
    return "hello"
```

| Parameter | Type | Description |
|---|---|---|
| project | str | |
| domain | str | |
| name | str | |
| version | str | |
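
Since only the signature of a reference task's skeleton function matters, the expected interface can be read from its type hints without ever running the body. The sketch below shows one way to do that with the standard library; interface_from_signature is a hypothetical helper, and the output name o0 is an assumption modeled on flytekit's default naming for a single unnamed output.

```python
import inspect
import typing


def interface_from_signature(fn: typing.Callable[..., typing.Any]):
    """Derive (inputs, outputs) type dicts from a skeleton function; its body is never called."""
    hints = typing.get_type_hints(fn)
    inputs = {name: hints[name] for name in inspect.signature(fn).parameters}
    # get_type_hints reports `-> None` (or a missing annotation here) as NoneType: no outputs.
    ret = hints.get("return", type(None))
    # Assumption: a single unnamed output is called "o0".
    outputs = {} if ret is type(None) else {"o0": ret}
    return inputs, outputs


def ref_t1(a: typing.List[str]) -> str:
    return "hello"  # never executed when used as a reference


inputs, outputs = interface_from_signature(ref_t1)
```

The resulting dictionaries have the same shape as the inputs and outputs arguments of the ReferenceTask constructor.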
task()

```python
def task(
    _task_function: Optional[Callable[P, FuncOut]],
    task_config: Optional[T],
    cache: Union[bool, Cache],
    retries: int,
    interruptible: Optional[bool],
    deprecated: str,
    timeout: Union[datetime.timedelta, int],
    container_image: Optional[Union[str, ImageSpec]],
    environment: Optional[Dict[str, str]],
    requests: Optional[Resources],
    limits: Optional[Resources],
    secret_requests: Optional[List[Secret]],
    execution_mode: PythonFunctionTask.ExecutionBehavior,
    node_dependency_hints: Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]],
    task_resolver: Optional[TaskResolverMixin],
    docs: Optional[Documentation],
    disable_deck: Optional[bool],
    enable_deck: Optional[bool],
    deck_fields: Optional[Tuple[DeckField, ...]],
    pod_template: Optional['PodTemplate'],
    pod_template_name: Optional[str],
    accelerator: Optional[BaseAccelerator],
    pickle_untyped: bool,
    shared_memory: Optional[Union[L[True], str]],
    resources: Optional[Resources],
    labels: Optional[dict[str, str]],
    annotations: Optional[dict[str, str]],
    **kwargs,
) -> Union[Callable[P, FuncOut], Callable[[Callable[P, FuncOut]], PythonFunctionTask[T]], PythonFunctionTask[T]]
```

This is the core decorator to use for any task type in flytekit.
Tasks are the building blocks of Flyte. They represent user code. Tasks have the following properties:

- Versioned (usually tied to the git revision SHA1)
- Strong interfaces (specified inputs and outputs)
- Declarative
- Independently executable
- Unit testable

For a simple Python task:

```python
import typing

from flytekit import task


@task
def my_task(x: int, y: typing.Dict[str, str]) -> str:
    ...
```

For specific task types:

```python
import typing

from flytekit import task
from flytekitplugins.spark import Spark


@task(task_config=Spark(), retries=3)
def my_task(x: int, y: typing.Dict[str, str]) -> str:
    ...
```

Please see the cookbook task examples for additional information.
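
The task() signature accepts an optional _task_function and returns either a task or a decorator, which is what allows both the bare @task form and the @task(...) form shown above. The following sketch shows that dual-use decorator pattern in plain Python; mini_task and SketchTask are hypothetical names and omit everything flytekit actually does with the function (interfaces, serialization, plugins, and so on).

```python
import functools
from typing import Any, Callable, Optional


class SketchTask:
    """Hypothetical stand-in for a task object that wraps a Python function."""

    def __init__(self, fn: Callable[..., Any], retries: int = 0) -> None:
        self.fn = fn
        self.retries = retries
        functools.update_wrapper(self, fn)  # copy __name__, __doc__, etc. onto the instance

    def __call__(self, **kwargs: Any) -> Any:
        # Flyte tasks are typically called with keyword arguments.
        return self.fn(**kwargs)


def mini_task(_task_function: Optional[Callable[..., Any]] = None, *, retries: int = 0):
    def wrapper(fn: Callable[..., Any]) -> SketchTask:
        return SketchTask(fn, retries=retries)

    if _task_function is not None:
        # Bare @mini_task: the decorated function arrives directly.
        return wrapper(_task_function)
    # @mini_task(...): return a decorator that receives the function next.
    return wrapper


@mini_task
def add_one(x: int) -> int:
    return x + 1


@mini_task(retries=3)
def double(x: int) -> int:
    return x * 2
```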

| Parameter | Type | Description |
|---|---|---|
| _task_function | Optional[Callable[P, FuncOut]] | This argument is implicitly passed and represents the decorated function. |
| task_config | Optional[T] | Configuration for a specific task type. Please refer to the plugins documentation for the right object to use. |
| cache | Union[bool, Cache] | Boolean or Cache object that indicates how caching is configured. The following parameters are deprecated in favor of Cache: cache_serialize, a boolean that indicates whether identical (i.e., same inputs) instances of this task should be executed serially when caching is enabled, so that given multiple concurrent executions over identical inputs, only a single instance executes and the rest wait to reuse the cached results (it does nothing unless the cache parameter is also set); cache_version, the cache version to use: changes to the task signature automatically trigger a cache miss, but you can also update this field manually to force one, and you should bump it whenever the function body or business logic has changed but the signature has not; cache_ignore_input_vars, input variables that should not be included when calculating the hash for the cache. |
| retries | int | Number of times to retry this task during a workflow execution. |
| interruptible | Optional[bool] | Boolean that indicates that this task can be interrupted and/or scheduled on nodes with lower QoS guarantees. This reduces the cost per execution, at the price of performance penalties due to potential interruptions. Requires additional Flyte platform-level configuration. If no value is provided, the task inherits this attribute from its workflow, as follows: (1) no value set at the task or workflow level: not interruptible; (2) task has interruptible=True, workflow has no value set: interruptible; (3) workflow has interruptible=True, task has no value set: interruptible; (4) workflow has interruptible=False, task has interruptible=True: interruptible; (5) workflow has interruptible=True, task has interruptible=False: not interruptible. |
| deprecated | str | A warning message for a deprecated task. An absent or empty string indicates that the task is active and not deprecated. |
| timeout | Union[datetime.timedelta, int] | The maximum amount of time for which one execution of this task may run. The execution is terminated if its runtime exceeds the given timeout (approximately). |
| container_image | Optional[Union[str, ImageSpec]] | By default, the configured FLYTE_INTERNAL_IMAGE is used for every task. This directive can be used to provide an alternate image for a specific task. This is useful when images bloat because of various dependencies and a dependency is only required by this task or a set of tasks that differ from the default. See the container_image example after this table. |
| environment | Optional[Dict[str, str]] | Environment variables that should be added to this task's execution. |
| requests | Optional[Resources] | Compute resource requests for your task. For Pod-plugin tasks, these values apply only to the primary container. |
| limits | Optional[Resources] | Compute resource limits for your task. For Pod-plugin tasks, these values apply only to the primary container. For more information, please see {{< py_class_ref flytekit.Resources >}}. |
| secret_requests | Optional[List[Secret]] | Keys that identify the secrets supplied at runtime. Ideally the secret keys should also be semi-descriptive. The key values will be available at runtime if the backend is configured to provide secrets and if secrets are available in the configured secrets store. Possible secret stores include Vault, Confidant, Kube secrets, AWS KMS, etc. Refer to {{< py_class_ref Secret >}} to understand how to specify the request for a secret; it may change based on the backend provider. Note: during local execution, the secrets are pulled from local environment variables with the format {GROUP}_{GROUP_VERSION}_{KEY}, where all the characters are capitalized and the prefix is not used. |
| execution_mode | PythonFunctionTask.ExecutionBehavior | This is mainly for internal use and is filled in automatically; please ignore it. |
| node_dependency_hints | Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]] | A list of tasks, launch plans, or workflows that this task depends on. This is only for dynamic tasks/workflows, where Flyte cannot automatically determine the dependencies prior to runtime. Even on dynamic tasks this is optional, but in some scenarios it makes registering the workflow easier, because it allows registration to be done the same way as for static tasks/workflows. For example, this is useful for running launch plans dynamically, because launch plans must be registered on flyteadmin before they can be run; tasks and workflows do not have this requirement. See the node_dependency_hints example after this table. |
| task_resolver | Optional[TaskResolverMixin] | Provide a custom task resolver. |
| docs | Optional[Documentation] | Documentation about this task. |
| disable_deck | Optional[bool] | If true, this task will not output a deck HTML file. |
| enable_deck | Optional[bool] | If true, this task will output a deck HTML file. |
| deck_fields | Optional[Tuple[DeckField, ...]] | If specified and enable_deck is True, this task will output a deck HTML file with the fields specified in the tuple. |
| pod_template | Optional['PodTemplate'] | Custom PodTemplate for this task. |
| pod_template_name | Optional[str] | The name of an existing PodTemplate resource that will be used in this task. |
| accelerator | Optional[BaseAccelerator] | The accelerator to use for this task. |
| pickle_untyped | bool | Boolean that indicates whether the task allows unspecified data types. |
| shared_memory | Optional[Union[L[True], str]] | If True, shared memory is attached to the container, with a size equal to the allocated memory. If a string, the shared memory is set to the size it specifies. |
| resources | Optional[Resources] | Specify both the request and the limit. When a value is set to a tuple or list, the first value is the request and the second value is the limit. If the value is a single value, then both the request and the limit are set to that value. For example, Resources(cpu=("1", "2"), mem="1Gi") sets the cpu request to 1, the cpu limit to 2, and both the mem request and limit to 1Gi. |
| labels | Optional[dict[str, str]] | Labels to be applied to the task resource. |
| annotations | Optional[dict[str, str]] | Annotations to be applied to the task resource. |
| kwargs | **kwargs | |

Example for container_image:

```python
from flytekit import task


# Use the default image name (fqn) and change its tag to tag-{{.images.default.tag}},
# i.e. the default image's tag with a prefix. This assumes that an image such as
# flytecookbook:tag-gitsha is published along with the default flytecookbook:gitsha.
@task(container_image="{{.images.default.fqn}}:tag-{{.images.default.tag}}")
def foo():
    ...


# Refer to the image configuration to configure fqns for images other than the default.
# In this case, an image named xyz is looked up.
@task(container_image="{{.images.xyz.fqn}}:{{.images.default.tag}}")
def foo2():
    ...
```

Example for node_dependency_hints:

```python
from flytekit import LaunchPlan, dynamic, workflow


@workflow
def workflow0():
    ...


launchplan0 = LaunchPlan.get_or_create(workflow0)


# Specify node_dependency_hints so that launchplan0 will be registered on flyteadmin,
# despite this being a dynamic task.
@dynamic(node_dependency_hints=[launchplan0])
def launch_dynamically():
    # To run a sub-launch plan, it must have previously been registered on flyteadmin.
    for _ in range(10):
        launchplan0()
```
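
Several parameter descriptions above encode small rules: how interruptible is inherited from the workflow, how a (request, limit) pair in resources is split, how a secret maps to an environment variable name during local execution, and which types timeout accepts. The flytekit-free sketch below restates those rules as plain Python so they can be checked quickly. All helper names are hypothetical; skipping unset parts of the secret name and treating an int timeout as seconds are assumptions, not behavior documented on this page.

```python
import datetime
from typing import Any, Optional, Tuple, Union

# Illustrative helpers only; none of these are flytekit APIs.


def resolve_interruptible(task_value: Optional[bool], workflow_value: Optional[bool]) -> bool:
    """Apply the inheritance rules listed for the interruptible parameter."""
    if task_value is not None:  # an explicit task-level value always wins
        return task_value
    if workflow_value is not None:  # otherwise inherit from the workflow
        return workflow_value
    return False  # nothing set at either level: not interruptible


def split_request_limit(value: Any) -> Tuple[Any, Any]:
    """A tuple/list is (request, limit); a single value is used for both."""
    if isinstance(value, (tuple, list)):
        request, limit = value
        return request, limit
    return value, value


def local_secret_env_var(group: str, key: str, group_version: Optional[str] = None) -> str:
    """Build the {GROUP}_{GROUP_VERSION}_{KEY} name used during local execution.

    Assumption: parts that are not set are skipped rather than left empty.
    """
    parts = [part for part in (group, group_version, key) if part]
    return "_".join(parts).upper()


def normalize_timeout(timeout: Union[datetime.timedelta, int]) -> datetime.timedelta:
    """Assumption: an int timeout is interpreted as a number of seconds."""
    if isinstance(timeout, int):
        return datetime.timedelta(seconds=timeout)
    return timeout


# Resources(cpu=("1", "2"), mem="1Gi") from the table, expressed with the helper:
cpu_request, cpu_limit = split_request_limit(("1", "2"))
mem_request, mem_limit = split_request_limit("1Gi")
```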
flytekit.core.task.Echo
Base Class for all Tasks with a Python native Interface. This should be used directly for task types that do not have a Python function to execute. Otherwise, refer to flytekit.PythonFunctionTask.

```python
class Echo(
    name: str,
    inputs: Optional[Dict[str, Type]],
    **kwargs,
)
```

A task that simply echoes the inputs back to the user. The task's input and output interfaces are the same.

FlytePropeller uses the echo plugin to handle this task, and it won't create a pod for this task; it simply passes the inputs to the outputs. https://github.com/flyteorg/flyte/blob/master/flyteplugins/go/tasks/plugins/testing/echo.go

Note: Make sure to enable the echo plugin in the propeller config to use this task.

```yaml
task-plugins:
  enabled-plugins:
    - echo
```

| Parameter | Type | Description |
|---|---|---|
| name | str | The name of the task. |
| inputs | Optional[Dict[str, Type]] | Name and type of inputs, specified as a dictionary, e.g. {"a": int, "b": str}. |
| kwargs | **kwargs | All other args required by the parent type, PythonTask. |
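
To make the echo contract concrete, here is a pure-Python model of it: the output interface is a copy of the input interface, and values pass through unchanged. It is only an illustration; the real behavior lives in FlytePropeller's Go echo plugin linked above, echo_interface and echo_execute are hypothetical names, and the isinstance check is an extra assumption added for clarity.

```python
from typing import Any, Dict, Type


def echo_interface(inputs: Dict[str, Type]) -> Dict[str, Dict[str, Type]]:
    # Echo's outputs mirror its inputs: same names, same types.
    return {"inputs": dict(inputs), "outputs": dict(inputs)}


def echo_execute(declared: Dict[str, Type], **values: Any) -> Dict[str, Any]:
    # Check each value against its declared input type, then pass everything through unchanged.
    for name, expected in declared.items():
        if not isinstance(values[name], expected):
            raise TypeError(f"{name} should be {expected.__name__}")
    return dict(values)


iface = echo_interface({"a": int, "b": str})
```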
Methods
| Method | Description |
|---|---|
| compile() | Generates a node that encapsulates this task in a workflow definition. |
| construct_node_metadata() | Used when constructing the node that encapsulates this task as part of a broader workflow definition. |
| dispatch_execute() | This method translates Flyte's type-system-based input values and invokes the actual call to the executor. |
| execute() | This method will be invoked to execute the task. |
| find_lhs() | |
| get_config() | Returns the task config as a serializable dictionary. |
| get_container() | Returns the container definition (if any) that is used to run the task on hosted Flyte. |
| get_custom() | Return additional plugin-specific custom data (if any) as a serializable dictionary. |
| get_extended_resources() | Returns the extended resources to allocate to the task on hosted Flyte. |
| get_input_types() | Returns the names and Python types as a dictionary for the inputs of this task. |
| get_k8s_pod() | Returns the Kubernetes pod definition (if any) that is used to run the task on hosted Flyte. |
| get_sql() | Returns the Sql definition (if any) that is used to run the task on hosted Flyte. |
| get_type_for_input_var() | Returns the Python type for an input variable by name. |
| get_type_for_output_var() | Returns the Python type for the specified output variable by name. |
| local_execute() | This function is used only in the local execution path and is responsible for calling dispatch execute. |
| local_execution_mode() | |
| post_execute() | Called after the execution has completed, with the user_params; can be used to clean up or alter the outputs to match the intended task outputs. |
| pre_execute() | Invoked directly before executing the task method and before all the inputs are converted. |
| sandbox_execute() | Call dispatch_execute, in the context of a local sandbox execution. |
compile()

```python
def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    *args,
    **kwargs,
) -> typing.Union[typing.Tuple[flytekit.core.promise.Promise], flytekit.core.promise.Promise, flytekit.core.promise.VoidPromise, NoneType]
```

Generates a node that encapsulates this task in a workflow definition.

| Parameter | Type | Description |
|---|---|---|
| ctx | flytekit.core.context_manager.FlyteContext | |
| args | *args | |
| kwargs | **kwargs | |
construct_node_metadata()

```python
def construct_node_metadata()
```

Used when constructing the node that encapsulates this task as part of a broader workflow definition.
dispatch_execute()

```python
def dispatch_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
) -> typing.Union[flytekit.models.literals.LiteralMap, flytekit.models.dynamic_job.DynamicJobSpec, typing.Coroutine]
```

This method translates Flyte's type-system-based input values and invokes the actual call to the executor. This method is also invoked during runtime. It returns:

- VoidPromise when the task itself declares no outputs.
- LiteralMap when the task declares one or more outputs. Individual outputs may be None.
- DynamicJobSpec when a dynamic workflow is executed.

| Parameter | Type | Description |
|---|---|---|
| ctx | flytekit.core.context_manager.FlyteContext | |
| input_literal_map | flytekit.models.literals.LiteralMap | |
execute()

```python
def execute(
    **kwargs,
) -> Any
```

This method will be invoked to execute the task.

| Parameter | Type | Description |
|---|---|---|
| kwargs | **kwargs | |
find_lhs()

```python
def find_lhs()
```

get_config()

```python
def get_config(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[typing.Dict[str, str]]
```

Returns the task config as a serializable dictionary. This task config consists of metadata about the custom defined for this task.

| Parameter | Type | Description |
|---|---|---|
| settings | flytekit.configuration.SerializationSettings | |
get_container()

```python
def get_container(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flytekit.models.task.Container]
```

Returns the container definition (if any) that is used to run the task on hosted Flyte.

| Parameter | Type | Description |
|---|---|---|
| settings | flytekit.configuration.SerializationSettings | |
get_custom()

```python
def get_custom(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[typing.Dict[str, typing.Any]]
```

Return additional plugin-specific custom data (if any) as a serializable dictionary.

| Parameter | Type | Description |
|---|---|---|
| settings | flytekit.configuration.SerializationSettings | |
get_extended_resources()

```python
def get_extended_resources(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flyteidl.core.tasks_pb2.ExtendedResources]
```

Returns the extended resources to allocate to the task on hosted Flyte.

| Parameter | Type | Description |
|---|---|---|
| settings | flytekit.configuration.SerializationSettings | |
get_input_types()

```python
def get_input_types()
```

Returns the names and Python types as a dictionary for the inputs of this task.
get_k8s_pod()

```python
def get_k8s_pod(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flytekit.models.task.K8sPod]
```

Returns the Kubernetes pod definition (if any) that is used to run the task on hosted Flyte.

| Parameter | Type | Description |
|---|---|---|
| settings | flytekit.configuration.SerializationSettings | |
get_sql()

```python
def get_sql(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flytekit.models.task.Sql]
```

Returns the Sql definition (if any) that is used to run the task on hosted Flyte.

| Parameter | Type | Description |
|---|---|---|
| settings | flytekit.configuration.SerializationSettings | |
get_type_for_input_var()

```python
def get_type_for_input_var(
    k: str,
    v: typing.Any,
) -> typing.Type[typing.Any]
```

Returns the Python type for an input variable by name.

| Parameter | Type | Description |
|---|---|---|
| k | str | |
| v | typing.Any | |
get_type_for_output_var()

```python
def get_type_for_output_var(
    k: str,
    v: typing.Any,
) -> typing.Type[typing.Any]
```

Returns the Python type for the specified output variable by name.

| Parameter | Type | Description |
|---|---|---|
| k | str | |
| v | typing.Any | |
local_execute()

```python
def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    **kwargs,
) -> typing.Union[typing.Tuple[flytekit.core.promise.Promise], flytekit.core.promise.Promise, flytekit.core.promise.VoidPromise, typing.Coroutine, NoneType]
```

This function is used only in the local execution path and is responsible for calling dispatch execute. Use this function when calling a task with native values (or Promises containing Flyte literals derived from Python native values).

| Parameter | Type | Description |
|---|---|---|
| ctx | flytekit.core.context_manager.FlyteContext | |
| kwargs | **kwargs | |
local_execution_mode()

```python
def local_execution_mode()
```

post_execute()

```python
def post_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
    rval: typing.Any,
) -> typing.Any
```

post_execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs to match the intended task outputs. If not overridden, this function is a no-op.

| Parameter | Type | Description |
|---|---|---|
| user_params | typing.Optional[flytekit.core.context_manager.ExecutionParameters] | The modified user params as created during the pre_execute step. |
| rval | typing.Any | |
pre_execute()

```python
def pre_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
) -> typing.Optional[flytekit.core.context_manager.ExecutionParameters]
```

This is the method that will be invoked directly before executing the task method and before all the inputs are converted. One case where this is useful is when the context needs to be modified so that the user process gets some user-space parameters. This also ensures that things like a SparkSession are correctly set up before the type transformers are called.

This should return either the same context or the mutated context.

| Parameter | Type | Description |
|---|---|---|
| user_params | typing.Optional[flytekit.core.context_manager.ExecutionParameters] | |
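
The pre_execute and post_execute hooks form a template-method lifecycle around execute. The flytekit-free sketch below models that order; LifecycleTask, SessionTask, and run() are hypothetical names, and a plain dict stands in for the ExecutionParameters object that the real hooks receive.

```python
from typing import Any, Dict, Optional

Params = Optional[Dict[str, Any]]


class LifecycleTask:
    """Hypothetical, simplified model of the pre_execute -> execute -> post_execute order."""

    def pre_execute(self, user_params: Params) -> Params:
        # Default: return the same params; subclasses may return a mutated copy.
        return user_params

    def execute(self, **kwargs: Any) -> Any:
        raise NotImplementedError

    def post_execute(self, user_params: Params, rval: Any) -> Any:
        # Default: a no-op that returns the outputs unchanged.
        return rval

    def run(self, user_params: Params, **kwargs: Any) -> Any:
        params = self.pre_execute(user_params)  # runs before the task body
        rval = self.execute(**kwargs)
        return self.post_execute(params, rval)  # sees the params returned by pre_execute


class SessionTask(LifecycleTask):
    def pre_execute(self, user_params: Params) -> Params:
        # Set up shared state (think: a session object) before the task body runs.
        params = dict(user_params or {})
        params["session"] = "ready"
        return params

    def execute(self, x: int) -> int:
        return x + 1

    def post_execute(self, user_params: Params, rval: Any) -> Any:
        # Alter the outputs using state created in pre_execute.
        return {"session": user_params["session"], "result": rval}
```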
sandbox_execute()

```python
def sandbox_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
) -> flytekit.models.literals.LiteralMap
```

Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.

| Parameter | Type | Description |
|---|---|---|
| ctx | flytekit.core.context_manager.FlyteContext | |
| input_literal_map | flytekit.models.literals.LiteralMap | |
Properties
| Property | Type | Description |
|---|---|---|
| deck_fields | | If not empty, this task will output a deck HTML file for the specified decks. |
| disable_deck | | If true, this task will not output a deck HTML file. |
| docs | | |
| enable_deck | | If true, this task will output a deck HTML file. |
| environment | | Any environment variables that are supplied during the execution of the task. |
| instantiated_in | | |
| interface | | |
| lhs | | |
| location | | |
| metadata | | |
| name | | |
| python_interface | | Returns this task's Python interface. |
| security_context | | |
| task_config | | Returns the user-specified task config, which is used for plugin-specific handling of the task. |
| task_type | | |
| task_type_version | | |
flytekit.core.task.ReferenceTask
This is a reference task: the body of the function passed in through the constructor will never be used, only its signature. The signature should also match the signature of the task you're referencing, as stored by Flyte Admin; otherwise, workflows using this task will break upon compilation.

```python
class ReferenceTask(
    project: str,
    domain: str,
    name: str,
    version: str,
    inputs: Dict[str, type],
    outputs: Dict[str, Type],
)
```

| Parameter | Type | Description |
|---|---|---|
| project | str | |
| domain | str | |
| name | str | A unique name for the task instantiation. This is unique for every instance of the task. |
| version | str | |
| inputs | Dict[str, type] | |
| outputs | Dict[str, Type] | |
Methods
| Method | Description |
|---|---|
| compile() | |
| construct_node_metadata() | |
| dispatch_execute() | This method translates Flyte's type-system-based input values and invokes the actual call to the executor. |
| execute() | |
| find_lhs() | |
| get_config() | Returns the task config as a serializable dictionary. |
| get_container() | Returns the container definition (if any) that is used to run the task on hosted Flyte. |
| get_custom() | Return additional plugin-specific custom data (if any) as a serializable dictionary. |
| get_extended_resources() | Returns the extended resources to allocate to the task on hosted Flyte. |
| get_input_types() | Returns the names and Python types as a dictionary for the inputs of this task. |
| get_k8s_pod() | Returns the Kubernetes pod definition (if any) that is used to run the task on hosted Flyte. |
| get_sql() | Returns the Sql definition (if any) that is used to run the task on hosted Flyte. |
| get_type_for_input_var() | Returns the Python type for an input variable by name. |
| get_type_for_output_var() | Returns the Python type for the specified output variable by name. |
| local_execute() | Please see the local_execute comments in the main task. |
| local_execution_mode() | |
| post_execute() | Called after the execution has completed, with the user_params; can be used to clean up or alter the outputs to match the intended task outputs. |
| pre_execute() | Invoked directly before executing the task method and before all the inputs are converted. |
| sandbox_execute() | Call dispatch_execute, in the context of a local sandbox execution. |
| unwrap_literal_map_and_execute() | Please see the implementation of the dispatch_execute function in the real task. |
compile()

```python
def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    *args,
    **kwargs,
)
```

| Parameter | Type | Description |
|---|---|---|
| ctx | flytekit.core.context_manager.FlyteContext | |
| args | *args | |
| kwargs | **kwargs | |
construct_node_metadata()

```python
def construct_node_metadata()
```

dispatch_execute()

```python
def dispatch_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
) -> typing.Union[flytekit.models.literals.LiteralMap, flytekit.models.dynamic_job.DynamicJobSpec, typing.Coroutine]
```

This method translates Flyte's type-system-based input values and invokes the actual call to the executor. This method is also invoked during runtime. It returns:

- VoidPromise when the task itself declares no outputs.
- LiteralMap when the task declares one or more outputs. Individual outputs may be None.
- DynamicJobSpec when a dynamic workflow is executed.

| Parameter | Type | Description |
|---|---|---|
| ctx | flytekit.core.context_manager.FlyteContext | |
| input_literal_map | flytekit.models.literals.LiteralMap | |
execute()

```python
def execute(
    **kwargs,
) -> typing.Any
```

| Parameter | Type | Description |
|---|---|---|
| kwargs | **kwargs | |
find_lhs()

```python
def find_lhs()
```

get_config()

```python
def get_config(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[typing.Dict[str, str]]
```

Returns the task config as a serializable dictionary. This task config consists of metadata about the custom defined for this task.

| Parameter | Type | Description |
|---|---|---|
| settings | flytekit.configuration.SerializationSettings | |
get_container()

```python
def get_container(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flytekit.models.task.Container]
```

Returns the container definition (if any) that is used to run the task on hosted Flyte.

| Parameter | Type | Description |
|---|---|---|
| settings | flytekit.configuration.SerializationSettings | |
get_custom()

```python
def get_custom(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[typing.Dict[str, typing.Any]]
```

Return additional plugin-specific custom data (if any) as a serializable dictionary.

| Parameter | Type | Description |
|---|---|---|
| settings | flytekit.configuration.SerializationSettings | |
get_extended_resources()

```python
def get_extended_resources(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flyteidl.core.tasks_pb2.ExtendedResources]
```

Returns the extended resources to allocate to the task on hosted Flyte.

| Parameter | Type | Description |
|---|---|---|
| settings | flytekit.configuration.SerializationSettings | |
get_input_types()

```python
def get_input_types()
```

Returns the names and Python types as a dictionary for the inputs of this task.
get_k8s_pod()

```python
def get_k8s_pod(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flytekit.models.task.K8sPod]
```

Returns the Kubernetes pod definition (if any) that is used to run the task on hosted Flyte.

| Parameter | Type | Description |
|---|---|---|
| settings | flytekit.configuration.SerializationSettings | |
get_sql()

```python
def get_sql(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flytekit.models.task.Sql]
```

Returns the Sql definition (if any) that is used to run the task on hosted Flyte.

| Parameter | Type | Description |
|---|---|---|
| settings | flytekit.configuration.SerializationSettings | |
get_type_for_input_var()

```python
def get_type_for_input_var(
    k: str,
    v: typing.Any,
) -> typing.Type[typing.Any]
```

Returns the Python type for an input variable by name.

| Parameter | Type | Description |
|---|---|---|
| k | str | |
| v | typing.Any | |
get_type_for_output_var()

```python
def get_type_for_output_var(
    k: str,
    v: typing.Any,
) -> typing.Type[typing.Any]
```

Returns the Python type for the specified output variable by name.

| Parameter | Type | Description |
|---|---|---|
| k | str | |
| v | typing.Any | |
local_execute()

```python
def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    **kwargs,
) -> typing.Union[typing.Tuple[flytekit.core.promise.Promise], flytekit.core.promise.Promise, flytekit.core.promise.VoidPromise, NoneType]
```

Please see the local_execute comments in the main task.

| Parameter | Type | Description |
|---|---|---|
| ctx | flytekit.core.context_manager.FlyteContext | |
| kwargs | **kwargs | |
local_execution_mode()

```python
def local_execution_mode()
```

post_execute()

```python
def post_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
    rval: typing.Any,
) -> typing.Any
```

post_execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs to match the intended task outputs. If not overridden, this function is a no-op.

| Parameter | Type | Description |
|---|---|---|
| user_params | typing.Optional[flytekit.core.context_manager.ExecutionParameters] | The modified user params as created during the pre_execute step. |
| rval | typing.Any | |
pre_execute()
def pre_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
) -> typing.Optional[flytekit.core.context_manager.ExecutionParameters]
This method is invoked directly before the task method is executed and before any inputs are converted. It is useful when the context must be modified so that the user process receives some user-space parameters. It also ensures that objects such as a SparkSession are set up correctly before the type transformers are called.
This should return either the same context or the mutated context.
| Parameter | Type | Description |
|---|---|---|
| user_params | typing.Optional[flytekit.core.context_manager.ExecutionParameters] | |
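The ordering described for pre_execute and post_execute can be pictured with the simplified model below. It is a pure-Python sketch, assuming the hooks wrap a single call of the task function; `ToyTask`, `convert_inputs`, `run`, and the plain dict standing in for ExecutionParameters are all hypothetical and do not reproduce flytekit's actual dispatch logic.

```python
from typing import Any, Callable, Dict, List, Optional

Params = Dict[str, Any]  # hypothetical stand-in for ExecutionParameters


class ToyTask:
    """Hypothetical task showing where pre_execute and post_execute run."""

    def __init__(self, fn: Callable[..., Any]):
        self.fn = fn
        self.calls: List[str] = []

    def pre_execute(self, user_params: Optional[Params]) -> Optional[Params]:
        # Runs before input conversion; returns the same or a mutated context.
        self.calls.append("pre_execute")
        return {**(user_params or {}), "session": "ready"}

    def convert_inputs(self, raw: Dict[str, str]) -> Dict[str, int]:
        # Stand-in for the type transformers, which can rely on pre_execute state.
        self.calls.append("convert_inputs")
        return {k: int(v) for k, v in raw.items()}

    def post_execute(self, user_params: Optional[Params], rval: Any) -> Any:
        # Runs after the function returns; may clean up or reshape the outputs.
        self.calls.append("post_execute")
        return {"result": rval, "session": user_params["session"]}

    def run(self, raw_inputs: Dict[str, str]) -> Any:
        params = self.pre_execute({})
        kwargs = self.convert_inputs(raw_inputs)
        self.calls.append("execute")
        rval = self.fn(**kwargs)
        return self.post_execute(params, rval)


t = ToyTask(lambda x: x * 2)
print(t.run({"x": "21"}))  # {'result': 42, 'session': 'ready'}
print(t.calls)             # ['pre_execute', 'convert_inputs', 'execute', 'post_execute']
```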
sandbox_execute()
def sandbox_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
) -> flytekit.models.literals.LiteralMap
Calls dispatch_execute in the context of a local sandbox execution. Not invoked during runtime.
| Parameter | Type | Description |
|---|---|---|
| ctx | flytekit.core.context_manager.FlyteContext | |
| input_literal_map | flytekit.models.literals.LiteralMap | |
unwrap_literal_map_and_execute()
def unwrap_literal_map_and_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
) -> flytekit.models.literals.LiteralMap
Please see the implementation of the dispatch_execute function in the real task.
| Parameter | Type | Description |
|---|---|---|
| ctx | flytekit.core.context_manager.FlyteContext | |
| input_literal_map | flytekit.models.literals.LiteralMap | |
Properties
| Property | Type | Description |
|---|---|---|
| deck_fields | | If not empty, this task will output a deck HTML file for the specified decks. |
| disable_deck | | If true, this task will not output a deck HTML file. |
| docs | | |
| enable_deck | | If true, this task will output a deck HTML file. |
| environment | | Any environment variables supplied during the execution of the task. |
| id | | |
| instantiated_in | | |
| interface | | |
| lhs | | |
| location | | |
| metadata | | |
| name | | |
| python_interface | | |
| reference | | |
| security_context | | |
| task_config | | Returns the user-specified task config, which is used for plugin-specific handling of the task. |
| task_type | | |
| task_type_version | | |
flytekit.core.task.TaskPlugins
This is the TaskPlugins factory for task types that are derivatives of PythonFunctionTask. Every task type that the user wishes to use should be available in this factory. Usage:
TaskPlugins.register_pythontask_plugin(config_object_type, plugin_object_type)
# config_object_type is any class that will be passed to the plugin_object as task_config
# plugin_object_type is a derivative of PythonFunctionTask
Examples of available task plugins include query-based plugins such as flytekitplugins.athena.task.AthenaTask and flytekitplugins.hive.task.HiveTask, Kubeflow operators like plugins.kfpytorch.flytekitplugins.kfpytorch.task.PyTorchFunctionTask and plugins.kftensorflow.flytekitplugins.kftensorflow.task.TensorflowFunctionTask, and generic plugins like flytekitplugins.pod.task.PodFunctionTask, which doesn’t integrate with third-party tools or services.
The task_config is different for every task plugin type. Users fill it out when they define a task, to specify plugin-specific behavior and features. For example, with a query-type task plugin, the config might store information about which database to query.
The plugin_object_type can be used to customize execution behavior and task serialization properties in tandem
with the task_config.
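As a rough illustration of a config object and a plugin class working in tandem, the sketch below models a query-style plugin in plain Python. `QueryConfig`, `BaseFunctionTask`, `QueryFunctionTask`, and `serialize_custom` are hypothetical names invented for this example; real plugins subclass flytekit's PythonFunctionTask, whose hooks differ from these.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


@dataclass
class QueryConfig:
    """Hypothetical task_config a user fills out when defining a query task."""
    database: str
    timeout_s: int = 60


class BaseFunctionTask:
    """Hypothetical stand-in for a function-backed task with no plugin behavior."""

    def __init__(self, fn: Callable[..., Any], task_config: Optional[Any] = None):
        self.fn = fn
        self.task_config = task_config

    def serialize_custom(self) -> Dict[str, Any]:
        # Plugin-specific serialization fields; the base task has none.
        return {}

    def execute(self, **kwargs: Any) -> Any:
        return self.fn(**kwargs)


class QueryFunctionTask(BaseFunctionTask):
    """Hypothetical plugin: reads task_config to shape serialization and execution."""

    def serialize_custom(self) -> Dict[str, Any]:
        cfg: QueryConfig = self.task_config
        return {"database": cfg.database, "timeout_s": cfg.timeout_s}

    def execute(self, **kwargs: Any) -> Any:
        # Build the query with the user function, then tag it with the target database.
        query = super().execute(**kwargs)
        return f"[{self.task_config.database}] {query}"


def count_rows(table: str) -> str:
    return f"SELECT COUNT(*) FROM {table}"


t = QueryFunctionTask(count_rows, task_config=QueryConfig(database="analytics"))
print(t.serialize_custom())       # {'database': 'analytics', 'timeout_s': 60}
print(t.execute(table="events"))  # [analytics] SELECT COUNT(*) FROM events
```

The user only supplies data (the config); everything about how that data changes serialization and execution lives in the plugin class.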
Methods
| Method | Description |
|---|---|
| find_pythontask_plugin() | Returns a PluginObjectType if found or returns the base PythonFunctionTask. |
| register_pythontask_plugin() | Use this method to register a new plugin into Flytekit. |
find_pythontask_plugin()
def find_pythontask_plugin(
plugin_config_type: type,
) -> Type[PythonFunctionTask]
Returns a PluginObjectType if found; otherwise returns the base PythonFunctionTask.
| Parameter | Type | Description |
|---|---|---|
| plugin_config_type | type | |
register_pythontask_plugin()
def register_pythontask_plugin(
plugin_config_type: type,
plugin: Type[PythonFunctionTask],
)
Use this method to register a new plugin into Flytekit. Usage:
TaskPlugins.register_pythontask_plugin(config_object_type, plugin_object_type)
# config_object_type is any class that will be passed to the plugin_object as task_config
# plugin_object_type is a derivative of PythonFunctionTask
| Parameter | Type | Description |
|---|---|---|
| plugin_config_type | type | |
| plugin | Type[PythonFunctionTask] | |
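The register/find pair behaves like a lookup table keyed by config type. Below is a minimal sketch of such a registry, assuming a class-level dict, harmless re-registration of an identical pair, and a `TypeError` on conflicting registrations. These are design choices of the sketch, not a statement of flytekit's exact implementation, and `ToyTaskPlugins`, `PythonFunctionTaskStub`, `PodConfigStub`, and `PodTaskStub` are hypothetical.

```python
from typing import Dict, Type


class PythonFunctionTaskStub:
    """Hypothetical stand-in for the base PythonFunctionTask."""


class PodConfigStub:
    """Hypothetical config type a user would pass as task_config."""


class PodTaskStub(PythonFunctionTaskStub):
    """Hypothetical plugin class that handles PodConfigStub configs."""


class ToyTaskPlugins:
    _PLUGINS: Dict[type, Type[PythonFunctionTaskStub]] = {}

    @classmethod
    def register_pythontask_plugin(
        cls, plugin_config_type: type, plugin: Type[PythonFunctionTaskStub]
    ) -> None:
        existing = cls._PLUGINS.get(plugin_config_type)
        if existing is plugin:
            return  # re-registering the same pair is a no-op
        if existing is not None:
            raise TypeError(f"{plugin_config_type} is already bound to {existing}")
        cls._PLUGINS[plugin_config_type] = plugin

    @classmethod
    def find_pythontask_plugin(cls, plugin_config_type: type) -> Type[PythonFunctionTaskStub]:
        # Unknown config types fall back to the base task class.
        return cls._PLUGINS.get(plugin_config_type, PythonFunctionTaskStub)


ToyTaskPlugins.register_pythontask_plugin(PodConfigStub, PodTaskStub)
print(ToyTaskPlugins.find_pythontask_plugin(PodConfigStub).__name__)  # PodTaskStub
print(ToyTaskPlugins.find_pythontask_plugin(dict).__name__)           # PythonFunctionTaskStub
```

Because the key is the config's type, a decorator in this style only needs `type(task_config)` to decide which class to instantiate, and a task with no recognized config still gets the base class.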