flytekit.core.task
Directory
Classes
Class | Description |
---|---|
`Echo` | Base class for all tasks with a Python-native interface. |
`ReferenceTask` | A reference task: the body of the function passed in through the constructor is never used, only its signature. |
`TaskPlugins` | The TaskPlugins factory for task types that derive from PythonFunctionTask. |
Methods
Method | Description |
---|---|
`decorate_function()` | Decorates the task with additional functionality if necessary. |
`eager()` | Eager workflow decorator. |
`reference_task()` | A reference task is a pointer to a task that already exists on your Flyte installation. |
`task()` | This is the core decorator to use for any task type in flytekit. |
Variables
Property | Type | Description |
---|---|---|
`FLYTE_ENABLE_VSCODE_KEY` | `str` | |
`FuncOut` | `TypeVar` | |
`P` | `ParamSpec` | |
`T` | `TypeVar` | |
Methods
decorate_function()
def decorate_function(
fn: Callable[P, Any],
) -> Callable[P, Any]
Decorates the task with additional functionality if necessary. Returns a decorated Python function.
Parameter | Type |
---|---|
`fn` | `Callable[P, Any]` |
eager()
def eager(
_fn,
args,
kwargs,
) -> Union[EagerAsyncPythonFunctionTask, partial]
Eager workflow decorator.
This type of task executes all Flyte entities within it eagerly, meaning that all Python constructs can be used inside an `@eager`-decorated function. This is because eager workflows use a `flytekit.remote.remote.FlyteRemote` object to kick off executions when a Flyte entity needs to produce a value. Think of it this way: each call to a Flyte entity is like a stack frame that runs as its own execution with its own Flyte URL. Results (or the error) are fetched when the execution is finished.
For example:

```python
from flytekit import task, eager


@task
def add_one(x: int) -> int:
    return x + 1


@task
def double(x: int) -> int:
    return x * 2


@eager
async def eager_workflow(x: int) -> int:
    out = add_one(x=x)
    return double(x=out)


# run locally with asyncio
if __name__ == "__main__":
    import asyncio

    result = asyncio.run(eager_workflow(x=1))
    print(f"Result: {result}")  # "Result: 4"
```
Unlike dynamic workflows (`flytekit.dynamic`), eager workflows are not compiled into a workflow spec but use Python's `async` capabilities to execute Flyte entities.

Eager workflows only support `@task`, `@workflow`, and `@eager` entities. Conditionals are not supported; use a plain Python `if` statement instead.
A `client_secret_group` and a `client_secret_key` are needed to authenticate via `flytekit.remote.remote.FlyteRemote` using `client_credentials` authentication, which is configured via `flytekit.configuration.PlatformConfig`.
```python
from flytekit import eager
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote


# add_one and double are the tasks defined in the previous example.
@eager(
    remote=FlyteRemote(config=Config.auto(config_file="config.yaml")),
    client_secret_group="my_client_secret_group",
    client_secret_key="my_client_secret_key",
)
async def eager_workflow(x: int) -> int:
    out = await add_one(x=x)
    return await double(x=out)
```

Here `config.yaml` is a flytectl-compatible config file. For more details, see the [flytectl configuration documentation](https://docs.flyte.org/en/latest/flytectl/overview.html#configuration).

When using a sandbox cluster started with `flytectl demo start`, however, the `client_secret_group` and `client_secret_key` are not needed:

```python
@eager
async def eager_workflow(x: int) -> int:
    ...
```
Parameter | Type |
---|---|
`_fn` | |
`args` | `*args` |
`kwargs` | `**kwargs` |
reference_task()
def reference_task(
project: str,
domain: str,
name: str,
version: str,
) -> Callable[[Callable[..., Any]], ReferenceTask]
A reference task is a pointer to a task that already exists on your Flyte installation. This object will not initiate a network call to Admin, which is why the user is asked to provide the expected interface. If at registration time the interface provided causes an issue with compilation, an error will be returned.
For an example, see `ref_t1` in `tests/flytekit/unit/core/test_references.py` in the flytekit repository.
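Because a reference task never runs the function body, the stub only needs accurate type annotations. The following is a minimal standard-library sketch of that idea, not flytekit's implementation: the helper `extract_interface` and the stub `join_strings` are hypothetical names introduced for illustration.

```python
from typing import Any, Callable, Dict, List, Tuple, get_type_hints


def extract_interface(fn: Callable[..., Any]) -> Tuple[Dict[str, Any], Any]:
    """Return (input types, output type) read from annotations; fn is never called."""
    hints = get_type_hints(fn)
    output = hints.pop("return", None)
    return hints, output


# Stub that mirrors the interface of a task that already exists elsewhere.
# The body is a placeholder because only the signature matters.
def join_strings(a: List[str]) -> str:
    ...


inputs, output = extract_interface(join_strings)
print(inputs, output)
```

If the annotations on the stub drift from the interface of the registered task, the mismatch only surfaces later, which is why the text above asks the user to provide the expected interface carefully.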
Parameter | Type |
---|---|
`project` | `str` |
`domain` | `str` |
`name` | `str` |
`version` | `str` |
task()
def task(
_task_function: Optional[Callable[P, FuncOut]],
task_config: Optional[T],
cache: Union[bool, Cache],
retries: int,
interruptible: Optional[bool],
deprecated: str,
timeout: Union[datetime.timedelta, int],
container_image: Optional[Union[str, ImageSpec]],
environment: Optional[Dict[str, str]],
requests: Optional[Resources],
limits: Optional[Resources],
secret_requests: Optional[List[Secret]],
execution_mode: PythonFunctionTask.ExecutionBehavior,
node_dependency_hints: Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]],
task_resolver: Optional[TaskResolverMixin],
docs: Optional[Documentation],
disable_deck: Optional[bool],
enable_deck: Optional[bool],
deck_fields: Optional[Tuple[DeckField, ...]],
pod_template: Optional['PodTemplate'],
pod_template_name: Optional[str],
accelerator: Optional[BaseAccelerator],
pickle_untyped: bool,
shared_memory: Optional[Union[L[True], str]],
resources: Optional[Resources],
kwargs,
) -> Union[Callable[P, FuncOut], Callable[[Callable[P, FuncOut]], PythonFunctionTask[T]], PythonFunctionTask[T]]
This is the core decorator to use for any task type in flytekit.
Tasks are the building blocks of Flyte. They represent user code. Tasks have the following properties:
- Versioned (usually tied to the git revision SHA1)
- Strong interfaces (specified inputs and outputs)
- Declarative
- Independently executable
- Unit testable

For a simple Python task:

```python
import typing
from flytekit import task

@task
def my_task(x: int, y: typing.Dict[str, str]) -> str:
    ...
```

For specific task types:

```python
from flytekitplugins.spark import Spark  # requires the flytekitplugins-spark package

@task(task_config=Spark(), retries=3)
def my_task(x: int, y: typing.Dict[str, str]) -> str:
    ...
```

See the cookbook task examples (`cookbook:tasks`) for additional information.
Parameter | Type |
---|---|
`_task_function` | `Optional[Callable[P, FuncOut]]` |
`task_config` | `Optional[T]` |
`cache` | `Union[bool, Cache]` |
`retries` | `int` |
`interruptible` | `Optional[bool]` |
`deprecated` | `str` |
`timeout` | `Union[datetime.timedelta, int]` |
`container_image` | `Optional[Union[str, ImageSpec]]` |
`environment` | `Optional[Dict[str, str]]` |
`requests` | `Optional[Resources]` |
`limits` | `Optional[Resources]` |
`secret_requests` | `Optional[List[Secret]]` |
`execution_mode` | `PythonFunctionTask.ExecutionBehavior` |
`node_dependency_hints` | `Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]]` |
`task_resolver` | `Optional[TaskResolverMixin]` |
`docs` | `Optional[Documentation]` |
`disable_deck` | `Optional[bool]` |
`enable_deck` | `Optional[bool]` |
`deck_fields` | `Optional[Tuple[DeckField, ...]]` |
`pod_template` | `Optional['PodTemplate']` |
`pod_template_name` | `Optional[str]` |
`accelerator` | `Optional[BaseAccelerator]` |
`pickle_untyped` | `bool` |
`shared_memory` | `Optional[Union[L[True], str]]` |
`resources` | `Optional[Resources]` |
`kwargs` | `**kwargs` |
flytekit.core.task.Echo
Base class for all tasks with a Python-native interface. Use it directly for task types that do not have a Python function to execute. Otherwise, refer to `flytekit.PythonFunctionTask`.
class Echo(
name: str,
inputs: Optional[Dict[str, Type]],
kwargs,
)
A task that simply echoes the inputs back to the user. The task's input and output interfaces are the same.

FlytePropeller uses the echo plugin to handle this task and does not create a pod for it; it simply passes the inputs to the outputs. See <https://github.com/flyteorg/flyte/blob/master/flyteplugins/go/tasks/plugins/testing/echo.go>.

Note: Make sure to enable the echo plugin in the propeller config to use this task.

```yaml
task-plugins:
  enabled-plugins:
    - echo
```
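The pass-through behavior described above can be pictured with a small standard-library sketch. This is an illustration only and does not use flytekit: `EchoSketch` is a hypothetical class, and the input validation it performs is an assumption added to make the shared interface visible.

```python
from typing import Any, Dict, Type


class EchoSketch:
    """Hypothetical stand-in: outputs share the inputs' names and types."""

    def __init__(self, name: str, inputs: Dict[str, Type]) -> None:
        self.name = name
        self.inputs = dict(inputs)
        self.outputs = dict(inputs)  # same interface on both sides

    def execute(self, **kwargs: Any) -> Dict[str, Any]:
        # Reject values that do not fit the declared interface.
        for key, value in kwargs.items():
            if key not in self.inputs:
                raise KeyError(f"unexpected input: {key}")
            if not isinstance(value, self.inputs[key]):
                raise TypeError(f"{key} must be {self.inputs[key].__name__}")
        return dict(kwargs)  # pass the inputs through unchanged


echo = EchoSketch(name="echo", inputs={"a": int, "b": str})
result = echo.execute(a=1, b="hi")
print(result)
```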
Parameter | Type |
---|---|
`name` | `str` |
`inputs` | `Optional[Dict[str, Type]]` |
`kwargs` | `**kwargs` |
Methods
Method | Description |
---|---|
`compile()` | Generates a node that encapsulates this task in a workflow definition. |
`construct_node_metadata()` | Used when constructing the node that encapsulates this task as part of a broader workflow definition. |
`dispatch_execute()` | This method translates Flyte's type-system-based input values and invokes the actual call to the executor. |
`execute()` | This method will be invoked to execute the task. |
`find_lhs()` | |
`get_config()` | Returns the task config as a serializable dictionary. |
`get_container()` | Returns the container definition (if any) that is used to run the task on hosted Flyte. |
`get_custom()` | Return additional plugin-specific custom data (if any) as a serializable dictionary. |
`get_extended_resources()` | Returns the extended resources to allocate to the task on hosted Flyte. |
`get_input_types()` | Returns the names and Python types as a dictionary for the inputs of this task. |
`get_k8s_pod()` | Returns the Kubernetes pod definition (if any) that is used to run the task on hosted Flyte. |
`get_sql()` | Returns the Sql definition (if any) that is used to run the task on hosted Flyte. |
`get_type_for_input_var()` | Returns the Python type for an input variable by name. |
`get_type_for_output_var()` | Returns the Python type for the specified output variable by name. |
`local_execute()` | This function is used only in the local execution path and is responsible for calling dispatch execute. |
`local_execution_mode()` | |
`post_execute()` | Post execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs. |
`pre_execute()` | This is the method that will be invoked directly before executing the task method and before all the inputs are converted. |
`sandbox_execute()` | Call dispatch_execute, in the context of a local sandbox execution. |
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
) -> typing.Union[typing.Tuple[flytekit.core.promise.Promise], flytekit.core.promise.Promise, flytekit.core.promise.VoidPromise, NoneType]
Generates a node that encapsulates this task in a workflow definition.
Parameter | Type |
---|---|
`ctx` | `flytekit.core.context_manager.FlyteContext` |
`args` | `*args` |
`kwargs` | `**kwargs` |
construct_node_metadata()
def construct_node_metadata()
Used when constructing the node that encapsulates this task as part of a broader workflow definition.
dispatch_execute()
def dispatch_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
) -> typing.Union[flytekit.models.literals.LiteralMap, flytekit.models.dynamic_job.DynamicJobSpec, typing.Coroutine]
This method translates Flyte's type-system-based input values and invokes the actual call to the executor. This method is also invoked during runtime.
- `VoidPromise` is returned when the task itself declares no outputs.
- `LiteralMap` is returned when the task declares one or more outputs. Individual outputs may be `None`.
- `DynamicJobSpec` is returned when a dynamic workflow is executed.

Parameter | Type |
---|---|
`ctx` | `flytekit.core.context_manager.FlyteContext` |
`input_literal_map` | `flytekit.models.literals.LiteralMap` |
execute()
def execute(
kwargs,
) -> Any
This method will be invoked to execute the task.
Parameter | Type |
---|---|
`kwargs` | `**kwargs` |
find_lhs()
def find_lhs()
get_config()
def get_config(
settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[typing.Dict[str, str]]
Returns the task config as a serializable dictionary. This task config consists of metadata about the custom defined for this task.
Parameter | Type |
---|---|
`settings` | `flytekit.configuration.SerializationSettings` |
get_container()
def get_container(
settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flytekit.models.task.Container]
Returns the container definition (if any) that is used to run the task on hosted Flyte.
Parameter | Type |
---|---|
`settings` | `flytekit.configuration.SerializationSettings` |
get_custom()
def get_custom(
settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[typing.Dict[str, typing.Any]]
Return additional plugin-specific custom data (if any) as a serializable dictionary.
Parameter | Type |
---|---|
`settings` | `flytekit.configuration.SerializationSettings` |
get_extended_resources()
def get_extended_resources(
settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flyteidl.core.tasks_pb2.ExtendedResources]
Returns the extended resources to allocate to the task on hosted Flyte.
Parameter | Type |
---|---|
`settings` | `flytekit.configuration.SerializationSettings` |
get_input_types()
def get_input_types()
Returns the names and python types as a dictionary for the inputs of this task.
get_k8s_pod()
def get_k8s_pod(
settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flytekit.models.task.K8sPod]
Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.
Parameter | Type |
---|---|
`settings` | `flytekit.configuration.SerializationSettings` |
get_sql()
def get_sql(
settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flytekit.models.task.Sql]
Returns the Sql definition (if any) that is used to run the task on hosted Flyte.
Parameter | Type |
---|---|
`settings` | `flytekit.configuration.SerializationSettings` |
get_type_for_input_var()
def get_type_for_input_var(
k: str,
v: typing.Any,
) -> typing.Type[typing.Any]
Returns the python type for an input variable by name.
Parameter | Type |
---|---|
`k` | `str` |
`v` | `typing.Any` |
get_type_for_output_var()
def get_type_for_output_var(
k: str,
v: typing.Any,
) -> typing.Type[typing.Any]
Returns the python type for the specified output variable by name.
Parameter | Type |
---|---|
`k` | `str` |
`v` | `typing.Any` |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
) -> typing.Union[typing.Tuple[flytekit.core.promise.Promise], flytekit.core.promise.Promise, flytekit.core.promise.VoidPromise, typing.Coroutine, NoneType]
This function is used only in the local execution path and is responsible for calling dispatch execute. Use this function when calling a task with native values (or Promises containing Flyte literals derived from Python native values).
Parameter | Type |
---|---|
`ctx` | `flytekit.core.context_manager.FlyteContext` |
`kwargs` | `**kwargs` |
local_execution_mode()
def local_execution_mode()
post_execute()
def post_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
rval: typing.Any,
) -> typing.Any
Post execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs to match the intended task's outputs. If not overridden, this function is a no-op.

Parameter | Type |
---|---|
`user_params` | `typing.Optional[flytekit.core.context_manager.ExecutionParameters]` |
`rval` | `typing.Any` |
pre_execute()
def pre_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
) -> typing.Optional[flytekit.core.context_manager.ExecutionParameters]
This is the method that will be invoked directly before executing the task method and before all the inputs are converted. One particular case where this is useful is if the context is to be modified for the user process to get some user-space parameters. This also ensures that things like SparkSession are already correctly set up before the type transformers are called.

This should return either the same context or the mutated context.

Parameter | Type |
---|---|
`user_params` | `typing.Optional[flytekit.core.context_manager.ExecutionParameters]` |
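The way `pre_execute()`, `execute()`, and `post_execute()` fit together can be sketched as a plain-Python hook pattern. This is a simplified illustration under stated assumptions, not flytekit's code: `HookedTask`, `Greeter`, and the `run` driver are hypothetical, the context is modeled as a plain dictionary, and input type conversion is omitted.

```python
from typing import Any, Dict, Optional

Params = Optional[Dict[str, Any]]


class HookedTask:
    """Hypothetical task with default hooks that subclasses may override."""

    def pre_execute(self, user_params: Params) -> Params:
        return user_params  # default: hand back the same context

    def execute(self, **kwargs: Any) -> Any:
        raise NotImplementedError

    def post_execute(self, user_params: Params, rval: Any) -> Any:
        return rval  # default: no-op

    def run(self, user_params: Params, **kwargs: Any) -> Any:
        new_params = self.pre_execute(user_params)  # 1. prepare the context
        rval = self.execute(**kwargs)               # 2. run the task body
        return self.post_execute(new_params, rval)  # 3. clean up or adjust


class Greeter(HookedTask):
    def pre_execute(self, user_params: Params) -> Params:
        # Return a mutated copy of the context instead of editing it in place.
        return {**(user_params or {}), "prefix": "Hello"}

    def execute(self, **kwargs: Any) -> Any:
        return kwargs["name"]

    def post_execute(self, user_params: Params, rval: Any) -> Any:
        return f"{user_params['prefix']}, {rval}!"


greeting = Greeter().run(None, name="Flyte")
print(greeting)  # Hello, Flyte!
```

Keeping the defaults as pass-throughs means a subclass overrides only the hook it needs, which matches the "no-op unless overridden" behavior described for `post_execute()`.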
sandbox_execute()
def sandbox_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
) -> flytekit.models.literals.LiteralMap
Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.
Parameter | Type |
---|---|
`ctx` | `flytekit.core.context_manager.FlyteContext` |
`input_literal_map` | `flytekit.models.literals.LiteralMap` |
Properties
Property | Type | Description |
---|---|---|
`deck_fields` | | If not empty, this task will output a deck HTML file for the specified decks. |
`disable_deck` | | If true, this task will not output a deck HTML file. |
`docs` | | |
`enable_deck` | | If true, this task will output a deck HTML file. |
`environment` | | Any environment variables that are supplied during the execution of the task. |
`instantiated_in` | | |
`interface` | | |
`lhs` | | |
`location` | | |
`metadata` | | |
`name` | | |
`python_interface` | | Returns this task's Python interface. |
`security_context` | | |
`task_config` | | Returns the user-specified task config, which is used for plugin-specific handling of the task. |
`task_type` | | |
`task_type_version` | | |
flytekit.core.task.ReferenceTask
This is a reference task: the body of the function passed in through the constructor is never used, only its signature. The signature should also match the signature of the task you're referencing, as stored by Flyte Admin; if it does not, workflows using this task will break upon compilation.
class ReferenceTask(
project: str,
domain: str,
name: str,
version: str,
inputs: Dict[str, type],
outputs: Dict[str, Type],
)
Parameter | Type |
---|---|
`project` | `str` |
`domain` | `str` |
`name` | `str` |
`version` | `str` |
`inputs` | `Dict[str, type]` |
`outputs` | `Dict[str, Type]` |
Methods
Method | Description |
---|---|
`compile()` | |
`construct_node_metadata()` | |
`dispatch_execute()` | This method translates Flyte's type-system-based input values and invokes the actual call to the executor. |
`execute()` | |
`find_lhs()` | |
`get_config()` | Returns the task config as a serializable dictionary. |
`get_container()` | Returns the container definition (if any) that is used to run the task on hosted Flyte. |
`get_custom()` | Return additional plugin-specific custom data (if any) as a serializable dictionary. |
`get_extended_resources()` | Returns the extended resources to allocate to the task on hosted Flyte. |
`get_input_types()` | Returns the names and Python types as a dictionary for the inputs of this task. |
`get_k8s_pod()` | Returns the Kubernetes pod definition (if any) that is used to run the task on hosted Flyte. |
`get_sql()` | Returns the Sql definition (if any) that is used to run the task on hosted Flyte. |
`get_type_for_input_var()` | Returns the Python type for an input variable by name. |
`get_type_for_output_var()` | Returns the Python type for the specified output variable by name. |
`local_execute()` | Please see the local_execute comments in the main task. |
`local_execution_mode()` | |
`post_execute()` | Post execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs. |
`pre_execute()` | This is the method that will be invoked directly before executing the task method and before all the inputs are converted. |
`sandbox_execute()` | Call dispatch_execute, in the context of a local sandbox execution. |
`unwrap_literal_map_and_execute()` | Please see the implementation of the dispatch_execute function in the real task. |
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
)
Parameter | Type |
---|---|
`ctx` | `flytekit.core.context_manager.FlyteContext` |
`args` | `*args` |
`kwargs` | `**kwargs` |
construct_node_metadata()
def construct_node_metadata()
dispatch_execute()
def dispatch_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
) -> typing.Union[flytekit.models.literals.LiteralMap, flytekit.models.dynamic_job.DynamicJobSpec, typing.Coroutine]
This method translates Flyte's type-system-based input values and invokes the actual call to the executor. This method is also invoked during runtime.
- `VoidPromise` is returned when the task itself declares no outputs.
- `LiteralMap` is returned when the task declares one or more outputs. Individual outputs may be `None`.
- `DynamicJobSpec` is returned when a dynamic workflow is executed.

Parameter | Type |
---|---|
`ctx` | `flytekit.core.context_manager.FlyteContext` |
`input_literal_map` | `flytekit.models.literals.LiteralMap` |
execute()
def execute(
kwargs,
) -> typing.Any
Parameter | Type |
---|---|
`kwargs` | `**kwargs` |
find_lhs()
def find_lhs()
get_config()
def get_config(
settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[typing.Dict[str, str]]
Returns the task config as a serializable dictionary. This task config consists of metadata about the custom defined for this task.
Parameter | Type |
---|---|
`settings` | `flytekit.configuration.SerializationSettings` |
get_container()
def get_container(
settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flytekit.models.task.Container]
Returns the container definition (if any) that is used to run the task on hosted Flyte.
Parameter | Type |
---|---|
`settings` | `flytekit.configuration.SerializationSettings` |
get_custom()
def get_custom(
settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[typing.Dict[str, typing.Any]]
Return additional plugin-specific custom data (if any) as a serializable dictionary.
Parameter | Type |
---|---|
`settings` | `flytekit.configuration.SerializationSettings` |
get_extended_resources()
def get_extended_resources(
settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flyteidl.core.tasks_pb2.ExtendedResources]
Returns the extended resources to allocate to the task on hosted Flyte.
Parameter | Type |
---|---|
`settings` | `flytekit.configuration.SerializationSettings` |
get_input_types()
def get_input_types()
Returns the names and python types as a dictionary for the inputs of this task.
get_k8s_pod()
def get_k8s_pod(
settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flytekit.models.task.K8sPod]
Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.
Parameter | Type |
---|---|
`settings` | `flytekit.configuration.SerializationSettings` |
get_sql()
def get_sql(
settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flytekit.models.task.Sql]
Returns the Sql definition (if any) that is used to run the task on hosted Flyte.
Parameter | Type |
---|---|
`settings` | `flytekit.configuration.SerializationSettings` |
get_type_for_input_var()
def get_type_for_input_var(
k: str,
v: typing.Any,
) -> typing.Type[typing.Any]
Returns the python type for an input variable by name.
Parameter | Type |
---|---|
`k` | `str` |
`v` | `typing.Any` |
get_type_for_output_var()
def get_type_for_output_var(
k: str,
v: typing.Any,
) -> typing.Type[typing.Any]
Returns the python type for the specified output variable by name.
Parameter | Type |
---|---|
`k` | `str` |
`v` | `typing.Any` |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
) -> typing.Union[typing.Tuple[flytekit.core.promise.Promise], flytekit.core.promise.Promise, flytekit.core.promise.VoidPromise, NoneType]
Please see the local_execute comments in the main task.
Parameter | Type |
---|---|
`ctx` | `flytekit.core.context_manager.FlyteContext` |
`kwargs` | `**kwargs` |
local_execution_mode()
def local_execution_mode()
post_execute()
def post_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
rval: typing.Any,
) -> typing.Any
Post execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs to match the intended task's outputs. If not overridden, this function is a no-op.

Parameter | Type |
---|---|
`user_params` | `typing.Optional[flytekit.core.context_manager.ExecutionParameters]` |
`rval` | `typing.Any` |
pre_execute()
def pre_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
) -> typing.Optional[flytekit.core.context_manager.ExecutionParameters]
This is the method that will be invoked directly before executing the task method and before all the inputs are converted. One particular case where this is useful is if the context is to be modified for the user process to get some user-space parameters. This also ensures that things like SparkSession are already correctly set up before the type transformers are called.

This should return either the same context or the mutated context.

Parameter | Type |
---|---|
`user_params` | `typing.Optional[flytekit.core.context_manager.ExecutionParameters]` |
sandbox_execute()
def sandbox_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
) -> flytekit.models.literals.LiteralMap
Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.
Parameter | Type |
---|---|
`ctx` | `flytekit.core.context_manager.FlyteContext` |
`input_literal_map` | `flytekit.models.literals.LiteralMap` |
unwrap_literal_map_and_execute()
def unwrap_literal_map_and_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
) -> flytekit.models.literals.LiteralMap
Please see the implementation of the dispatch_execute function in the real task.
Parameter | Type |
---|---|
`ctx` | `flytekit.core.context_manager.FlyteContext` |
`input_literal_map` | `flytekit.models.literals.LiteralMap` |
Properties
Property | Type | Description |
---|---|---|
`deck_fields` | | If not empty, this task will output a deck HTML file for the specified decks. |
`disable_deck` | | If true, this task will not output a deck HTML file. |
`docs` | | |
`enable_deck` | | If true, this task will output a deck HTML file. |
`environment` | | Any environment variables that are supplied during the execution of the task. |
`id` | | |
`instantiated_in` | | |
`interface` | | |
`lhs` | | |
`location` | | |
`metadata` | | |
`name` | | |
`python_interface` | | |
`reference` | | |
`security_context` | | |
`task_config` | | Returns the user-specified task config, which is used for plugin-specific handling of the task. |
`task_type` | | |
`task_type_version` | | |
flytekit.core.task.TaskPlugins
This is the TaskPlugins factory for task types that derive from PythonFunctionTask. Every task that the user wishes to use should be available in this factory. Usage:

```python
TaskPlugins.register_pythontask_plugin(config_object_type, plugin_object_type)
# config_object_type is any class that will be passed to the plugin_object as task_config
# plugin_object_type is a derivative of PythonFunctionTask
```
Examples of available task plugins include query-based plugins such as `flytekitplugins.athena.task.AthenaTask` and `flytekitplugins.hive.task.HiveTask`, Kubeflow operators like `plugins.kfpytorch.flytekitplugins.kfpytorch.task.PyTorchFunctionTask` and `plugins.kftensorflow.flytekitplugins.kftensorflow.task.TensorflowFunctionTask`, and generic plugins like `flytekitplugins.pod.task.PodFunctionTask`, which doesn't integrate with third-party tools or services.

The `task_config` is different for every task plugin type. Users fill it out when they define a task to specify plugin-specific behavior and features. For example, with a query-type task plugin, the config might store information about which database to query.

The `plugin_object_type` can be used to customize execution behavior and task serialization properties in tandem with the `task_config`.
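The factory idea, a lookup from the type of the config object to the task class that handles it, can be sketched in plain Python. This is a minimal illustration and not flytekit's implementation: `PluginRegistry`, `BaseTask`, `QueryConfig`, and `QueryTask` are hypothetical names, and rejecting a conflicting re-registration is an assumption of this sketch.

```python
from typing import Dict, Type


class BaseTask:
    """Stand-in for the default task class used when nothing is registered."""


class PluginRegistry:
    """Hypothetical registry keyed by the type of the task_config object."""

    _plugins: Dict[type, Type[BaseTask]] = {}

    @classmethod
    def register(cls, config_type: type, plugin: Type[BaseTask]) -> None:
        existing = cls._plugins.get(config_type)
        if existing is not None and existing is not plugin:
            raise TypeError(f"A plugin for {config_type.__name__} is already registered")
        cls._plugins[config_type] = plugin

    @classmethod
    def find(cls, config_type: type) -> Type[BaseTask]:
        # Fall back to the base task class when no plugin matches.
        return cls._plugins.get(config_type, BaseTask)


class QueryConfig:
    """Hypothetical config carrying plugin-specific settings."""

    def __init__(self, database: str) -> None:
        self.database = database


class QueryTask(BaseTask):
    """Hypothetical plugin task selected whenever a QueryConfig is supplied."""


PluginRegistry.register(QueryConfig, QueryTask)
print(PluginRegistry.find(QueryConfig).__name__)  # QueryTask
print(PluginRegistry.find(dict).__name__)         # BaseTask
```

Keying the registry on the config type lets a decorator pick the task class from nothing more than `type(task_config)`, with the base class as the fallback.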
Methods
Method | Description |
---|---|
`find_pythontask_plugin()` | Returns a PluginObjectType if found or returns the base PythonFunctionTask. |
`register_pythontask_plugin()` | Use this method to register a new plugin into Flytekit. |
find_pythontask_plugin()
def find_pythontask_plugin(
plugin_config_type: type,
) -> Type[PythonFunctionTask]
Returns a PluginObjectType if found or returns the base PythonFunctionTask.
Parameter | Type |
---|---|
`plugin_config_type` | `type` |
register_pythontask_plugin()
def register_pythontask_plugin(
plugin_config_type: type,
plugin: Type[PythonFunctionTask],
)
Use this method to register a new plugin into Flytekit. Usage:

```python
TaskPlugins.register_pythontask_plugin(config_object_type, plugin_object_type)
# config_object_type is any class that will be passed to the plugin_object as task_config
# plugin_object_type is a derivative of PythonFunctionTask
```
Parameter | Type |
---|---|
`plugin_config_type` | `type` |
`plugin` | `Type[PythonFunctionTask]` |