0.1.dev2184+g1e0cbe7.d20250401

flytekit.core.task

Directory

Classes

Class Description
Echo Base Class for all Tasks with a Python native Interface.
ReferenceTask This is a reference task. The body of the function passed in through the constructor will never be used; only the signature of the function will be.
TaskPlugins This is the TaskPlugins factory for task types that are derivative of PythonFunctionTask.

Methods

Method Description
decorate_function() Decorates the task with additional functionality if necessary.
eager() Eager workflow decorator.
reference_task() A reference task is a pointer to a task that already exists on your Flyte installation.
task() This is the core decorator to use for any task type in flytekit.

Variables

Property Type Description
FLYTE_ENABLE_VSCODE_KEY str
FuncOut TypeVar
P ParamSpec
T TypeVar

Methods

decorate_function()

def decorate_function(
    fn: Callable[P, Any],
)

Decorates the task with additional functionality if necessary. Returns a decorated Python function.

Parameter Type
fn Callable[P, Any]

eager()

def eager(
    _fn,
    args,
    kwargs,
) -> Union[EagerAsyncPythonFunctionTask, partial]

Eager workflow decorator.

This type of task executes all Flyte entities within it eagerly, meaning that all Python constructs can be used inside an @eager-decorated function. This works because eager workflows use a flytekit.remote.remote.FlyteRemote object to kick off executions whenever a Flyte entity needs to produce a value. Think of it this way: for every Flyte entity that is called, the stack frame is an execution with its own Flyte URL. Results (or the error) are fetched when the execution is finished.

For example:


    from flytekit import task, eager

    @task
    def add_one(x: int) -> int:
        return x + 1

    @task
    def double(x: int) -> int:
        return x * 2

    @eager
    async def eager_workflow(x: int) -> int:
        out = add_one(x=x)
        return double(x=out)

    # run locally with asyncio
    if __name__ == "__main__":
        import asyncio

        result = asyncio.run(eager_workflow(x=1))
        print(f"Result: {result}")  # "Result: 4"

Unlike dynamic workflows (flytekit.dynamic), eager workflows are not compiled into a workflow spec; instead, they use Python’s async capabilities to execute Flyte entities.

Eager workflows only support @task, @workflow, and @eager entities. Conditionals are not supported; use a plain Python if statement instead.

A client_secret_group and a client_secret_key are needed to authenticate via flytekit.remote.remote.FlyteRemote using client_credentials authentication, which is configured via flytekit.configuration.PlatformConfig.


    from flytekit.remote import FlyteRemote
    from flytekit.configuration import Config

    @eager(
        remote=FlyteRemote(config=Config.auto(config_file="config.yaml")),
        client_secret_group="my_client_secret_group",
        client_secret_key="my_client_secret_key",
    )
    async def eager_workflow(x: int) -> int:
        # add_one and double are the tasks defined in the first example
        out = add_one(x=x)
        return double(x=out)

Here config.yaml is a flytectl-compatible config file. For more details, see https://docs.flyte.org/en/latest/flytectl/overview.html#configuration.

When using a sandbox cluster started with flytectl demo start, however, the client_secret_group and client_secret_key are not needed:

    @eager
    async def eager_workflow(x: int) -> int:
        ...

Parameter Type
_fn
args *args
kwargs **kwargs
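
The point about using a plain Python if statement can be illustrated without a Flyte cluster. The following is a minimal sketch using only asyncio: the coroutines add_one and double are stand-ins for Flyte entities, and eager_like is a hypothetical name. It does not use flytekit and is not how @eager is implemented; it only shows that once each call yields a concrete value, ordinary Python branching works on it.

```python
import asyncio


async def add_one(x: int) -> int:
    # Stand-in for a Flyte entity; a real eager workflow would dispatch
    # this call as its own execution and fetch the result.
    await asyncio.sleep(0)
    return x + 1


async def double(x: int) -> int:
    # Second stand-in entity.
    await asyncio.sleep(0)
    return x * 2


async def eager_like(x: int) -> int:
    out = await add_one(x)
    # `out` is a concrete int here, so a plain `if` can branch on it.
    if out % 2 == 0:
        return await double(out)
    return out


if __name__ == "__main__":
    print(asyncio.run(eager_like(1)))
```

Because the intermediate value is materialized before the branch, no special conditional construct is needed in this style.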

reference_task()

def reference_task(
    project: str,
    domain: str,
    name: str,
    version: str,
) -> Callable[[Callable[..., Any]], ReferenceTask]

A reference task is a pointer to a task that already exists on your Flyte installation. This object will not initiate a network call to Admin, which is why the user is asked to provide the expected interface. If at registration time the interface provided causes an issue with compilation, an error will be returned.

For an example, see ref_t1 in tests/flytekit/unit/core/test_references.py.

Parameter Type
project str
domain str
name str
version str
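
To make the idea of "a pointer plus an expected interface" concrete, here is a small stdlib-only sketch. ToyReference and toy_reference_task are hypothetical names introduced for illustration, and the project, domain, name, and version values are placeholders. This is not flytekit's implementation; it only shows a decorator that records the identifier and the declared signature while never running the function body.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, get_type_hints


@dataclass(frozen=True)
class ToyReference:
    # Identifier of the already-registered task (placeholder values below).
    project: str
    domain: str
    name: str
    version: str
    # Expected interface, taken from the decorated function's annotations.
    inputs: Dict[str, Any]
    output: Any


def toy_reference_task(project: str, domain: str, name: str, version: str) -> Callable:
    def wrapper(fn: Callable[..., Any]) -> ToyReference:
        hints = get_type_hints(fn)
        output = hints.pop("return", None)
        # Only the signature is kept; the function body is discarded.
        return ToyReference(project, domain, name, version, hints, output)

    return wrapper


@toy_reference_task(
    project="my_project",
    domain="development",
    name="my.module.join_strings",
    version="abc123",
)
def ref_t1(a: list, b: str) -> str:
    ...  # never executed
```

The resulting object carries no code of its own, which mirrors why the user has to supply the interface by hand.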

task()

def task(
    _task_function: Optional[Callable[P, FuncOut]],
    task_config: Optional[T],
    cache: Union[bool, Cache],
    retries: int,
    interruptible: Optional[bool],
    deprecated: str,
    timeout: Union[datetime.timedelta, int],
    container_image: Optional[Union[str, ImageSpec]],
    environment: Optional[Dict[str, str]],
    requests: Optional[Resources],
    limits: Optional[Resources],
    secret_requests: Optional[List[Secret]],
    execution_mode: PythonFunctionTask.ExecutionBehavior,
    node_dependency_hints: Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]],
    task_resolver: Optional[TaskResolverMixin],
    docs: Optional[Documentation],
    disable_deck: Optional[bool],
    enable_deck: Optional[bool],
    deck_fields: Optional[Tuple[DeckField, ...]],
    pod_template: Optional['PodTemplate'],
    pod_template_name: Optional[str],
    accelerator: Optional[BaseAccelerator],
    pickle_untyped: bool,
    shared_memory: Optional[Union[L[True], str]],
    resources: Optional[Resources],
    kwargs,
) -> Union[Callable[P, FuncOut], Callable[[Callable[P, FuncOut]], PythonFunctionTask[T]], PythonFunctionTask[T]]

This is the core decorator to use for any task type in flytekit.

Tasks are the building blocks of Flyte. They represent user code. Tasks have the following properties:

  • Versioned (usually tied to the git revision SHA1)
  • Strong interfaces (specified inputs and outputs)
  • Declarative
  • Independently executable
  • Unit testable

For a simple Python task:


    import typing

    from flytekit import task

    @task
    def my_task(x: int, y: typing.Dict[str, str]) -> str:
        ...

For specific task types:


    from flytekitplugins.spark import Spark  # provided by the Spark plugin package

    @task(task_config=Spark(), retries=3)
    def my_task(x: int, y: typing.Dict[str, str]) -> str:
        ...

See the cookbook task examples (cookbook:tasks) for additional information.

Parameter Type
_task_function Optional[Callable[P, FuncOut]]
task_config Optional[T]
cache Union[bool, Cache]
retries int
interruptible Optional[bool]
deprecated str
timeout Union[datetime.timedelta, int]
container_image Optional[Union[str, ImageSpec]]
environment Optional[Dict[str, str]]
requests Optional[Resources]
limits Optional[Resources]
secret_requests Optional[List[Secret]]
execution_mode PythonFunctionTask.ExecutionBehavior
node_dependency_hints Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]]
task_resolver Optional[TaskResolverMixin]
docs Optional[Documentation]
disable_deck Optional[bool]
enable_deck Optional[bool]
deck_fields Optional[Tuple[DeckField, ...]]
pod_template Optional['PodTemplate']
pod_template_name Optional[str]
accelerator Optional[BaseAccelerator]
pickle_untyped bool
shared_memory Optional[Union[L[True], str]]
resources Optional[Resources]
kwargs **kwargs
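
The signature above accepts an optional _task_function and returns either a wrapped function or a decorator, which is what allows both @task and @task(...) forms. The sketch below shows that general pattern with stdlib Python only. toy_task is a hypothetical name, and the retries attribute is attached purely for illustration; this is not flytekit's code.

```python
import functools
from typing import Any, Callable, Optional


def toy_task(_fn: Optional[Callable[..., Any]] = None, *, retries: int = 0):
    def wrap(fn: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(fn)
        def inner(*args: Any, **kwargs: Any) -> Any:
            return fn(*args, **kwargs)

        # Record the option on the wrapper so callers can inspect it.
        inner.retries = retries
        return inner

    if _fn is not None:
        # Bare form: @toy_task
        return wrap(_fn)
    # Parameterized form: @toy_task(retries=3)
    return wrap


@toy_task
def plain(x: int) -> int:
    return x + 1


@toy_task(retries=3)
def configured(x: int) -> int:
    return x * 2
```

Making every option keyword-only keeps the bare and parameterized forms unambiguous, since the only positional argument can be the decorated function.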

flytekit.core.task.Echo

Base Class for all Tasks with a Python native Interface. Use this directly for task types that do not have a Python function to be executed. Otherwise, refer to flytekit.PythonFunctionTask.

class Echo(
    name: str,
    inputs: Optional[Dict[str, Type]],
    kwargs,
)

A task that simply echoes the inputs back to the user. The task’s input and output interfaces are the same.

FlytePropeller uses the echo plugin to handle this task and does not create a pod for it; it simply passes the inputs to the outputs. See https://github.com/flyteorg/flyte/blob/master/flyteplugins/go/tasks/plugins/testing/echo.go

Note: Make sure to enable the echo plugin in the propeller config to use this task.

    task-plugins:
      enabled-plugins:
        - echo

Parameter Type
name str
inputs Optional[Dict[str, Type]]
kwargs **kwargs

Methods

Method Description
compile() Generates a node that encapsulates this task in a workflow definition.
construct_node_metadata() Used when constructing the node that encapsulates this task as part of a broader workflow definition.
dispatch_execute() This method translates Flyte’s Type system based input values and invokes the actual call to the executor.
execute() This method will be invoked to execute the task.
find_lhs()
get_config() Returns the task config as a serializable dictionary.
get_container() Returns the container definition (if any) that is used to run the task on hosted Flyte.
get_custom() Return additional plugin-specific custom data (if any) as a serializable dictionary.
get_extended_resources() Returns the extended resources to allocate to the task on hosted Flyte.
get_input_types() Returns the names and python types as a dictionary for the inputs of this task.
get_k8s_pod() Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.
get_sql() Returns the Sql definition (if any) that is used to run the task on hosted Flyte.
get_type_for_input_var() Returns the python type for an input variable by name.
get_type_for_output_var() Returns the python type for the specified output variable by name.
local_execute() This function is used only in the local execution path and is responsible for calling dispatch execute.
local_execution_mode()
post_execute() Post execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs.
pre_execute() This is the method that will be invoked directly before executing the task method and before all the inputs are converted.
sandbox_execute() Call dispatch_execute, in the context of a local sandbox execution.

compile()

def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    args,
    kwargs,
) -> typing.Union[typing.Tuple[flytekit.core.promise.Promise], flytekit.core.promise.Promise, flytekit.core.promise.VoidPromise, NoneType]

Generates a node that encapsulates this task in a workflow definition.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
args *args
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

Used when constructing the node that encapsulates this task as part of a broader workflow definition.

dispatch_execute()

def dispatch_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
) -> typing.Union[flytekit.models.literals.LiteralMap, flytekit.models.dynamic_job.DynamicJobSpec, typing.Coroutine]

This method translates Flyte’s Type system based input values and invokes the actual call to the executor. This method is also invoked during runtime.

  • VoidPromise is returned when the task itself declares no outputs.
  • LiteralMap is returned when the task declares one or more outputs. Individual outputs may be None.
  • DynamicJobSpec is returned when a dynamic workflow is executed.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

execute()

def execute(
    kwargs,
) -> Any

This method will be invoked to execute the task.

Parameter Type
kwargs **kwargs

find_lhs()

def find_lhs()

get_config()

def get_config(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[typing.Dict[str, str]]

Returns the task config as a serializable dictionary. This task config consists of metadata about the custom data defined for this task.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_container()

def get_container(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flytekit.models.task.Container]

Returns the container definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_custom()

def get_custom(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[typing.Dict[str, typing.Any]]

Return additional plugin-specific custom data (if any) as a serializable dictionary.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_extended_resources()

def get_extended_resources(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flyteidl.core.tasks_pb2.ExtendedResources]

Returns the extended resources to allocate to the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_input_types()

def get_input_types()

Returns the names and python types as a dictionary for the inputs of this task.

get_k8s_pod()

def get_k8s_pod(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flytekit.models.task.K8sPod]

Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_sql()

def get_sql(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flytekit.models.task.Sql]

Returns the Sql definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_type_for_input_var()

def get_type_for_input_var(
    k: str,
    v: typing.Any,
) -> typing.Type[typing.Any]

Returns the python type for an input variable by name.

Parameter Type
k str
v typing.Any

get_type_for_output_var()

def get_type_for_output_var(
    k: str,
    v: typing.Any,
) -> typing.Type[typing.Any]

Returns the python type for the specified output variable by name.

Parameter Type
k str
v typing.Any

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
) -> typing.Union[typing.Tuple[flytekit.core.promise.Promise], flytekit.core.promise.Promise, flytekit.core.promise.VoidPromise, typing.Coroutine, NoneType]

This function is used only in the local execution path and is responsible for calling dispatch execute. Use this function when calling a task with native values (or Promises containing Flyte literals derived from Python native values).

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

post_execute()

def post_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
    rval: typing.Any,
) -> typing.Any

Post execute is called after the execution has completed, with the user_params, and can be used to clean up or to alter the outputs to match the intended task’s outputs. If not overridden, this function is a no-op.

Parameter Type
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters]
rval typing.Any

pre_execute()

def pre_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
) -> typing.Optional[flytekit.core.context_manager.ExecutionParameters]

This is the method that will be invoked directly before executing the task method and before all the inputs are converted. One particular case where this is useful is if the context is to be modified for the user process to get some user space parameters. This also ensures that things like SparkSession are already correctly set up before the type transformers are called.

This should return either the same context or the mutated context.

Parameter Type
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters]
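
The relationship between pre_execute(), execute(), and post_execute() can be sketched as a small template-method class. ToyTask, Greeter, and the run() driver are hypothetical names, and the call order in run() is an assumption based on the descriptions above (in flytekit the driver is dispatch_execute(), which also converts inputs and outputs). It is a stdlib-only illustration, not flytekit's implementation.

```python
from typing import Any, Dict, List, Optional


class ToyTask:
    def pre_execute(self, user_params: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
        # Default: return the same context unchanged.
        return user_params

    def execute(self, **kwargs: Any) -> Any:
        raise NotImplementedError

    def post_execute(self, user_params: Optional[Dict[str, Any]], rval: Any) -> Any:
        # Default: no-op, return the value as is.
        return rval

    def run(self, user_params: Optional[Dict[str, Any]], **kwargs: Any) -> Any:
        # Assumed order: pre_execute, then execute, then post_execute.
        params = self.pre_execute(user_params)
        rval = self.execute(**kwargs)
        return self.post_execute(params, rval)


class Greeter(ToyTask):
    def __init__(self) -> None:
        self.calls: List[str] = []
        self._params: Dict[str, Any] = {}

    def pre_execute(self, user_params):
        self.calls.append("pre")
        # Return a mutated copy of the context.
        self._params = dict(user_params or {})
        self._params["greeting"] = "hello"
        return self._params

    def execute(self, name: str) -> str:
        self.calls.append("execute")
        return f"{self._params['greeting']} {name}"

    def post_execute(self, user_params, rval):
        self.calls.append("post")
        # Alter the output after execution.
        return rval.upper()
```

A subclass that overrides neither hook behaves as if the hooks were absent, which matches the no-op default described for post_execute().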

sandbox_execute()

def sandbox_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
) -> flytekit.models.literals.LiteralMap

Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

Properties

Property Type Description
deck_fields If not empty, this task will output a deck HTML file for the specified decks.
disable_deck If true, this task will not output a deck HTML file.
docs
enable_deck If true, this task will output a deck HTML file.
environment Any environment variables that are supplied during the execution of the task.
instantiated_in
interface
lhs
location
metadata
name
python_interface Returns this task’s python interface.
security_context
task_config Returns the user-specified task config, which is used for plugin-specific handling of the task.
task_type
task_type_version

flytekit.core.task.ReferenceTask

This is a reference task. The body of the function passed in through the constructor will never be used; only the signature of the function will be. The signature should also match the signature of the task you’re referencing, as stored by Flyte Admin. If it does not, workflows using this will break upon compilation.

class ReferenceTask(
    project: str,
    domain: str,
    name: str,
    version: str,
    inputs: Dict[str, type],
    outputs: Dict[str, Type],
)

Parameter Type
project str
domain str
name str
version str
inputs Dict[str, type]
outputs Dict[str, Type]

Methods

Method Description
compile()
construct_node_metadata()
dispatch_execute() This method translates Flyte’s Type system based input values and invokes the actual call to the executor.
execute()
find_lhs()
get_config() Returns the task config as a serializable dictionary.
get_container() Returns the container definition (if any) that is used to run the task on hosted Flyte.
get_custom() Return additional plugin-specific custom data (if any) as a serializable dictionary.
get_extended_resources() Returns the extended resources to allocate to the task on hosted Flyte.
get_input_types() Returns the names and python types as a dictionary for the inputs of this task.
get_k8s_pod() Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.
get_sql() Returns the Sql definition (if any) that is used to run the task on hosted Flyte.
get_type_for_input_var() Returns the python type for an input variable by name.
get_type_for_output_var() Returns the python type for the specified output variable by name.
local_execute() Please see the local_execute comments in the main task.
local_execution_mode()
post_execute() Post execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs.
pre_execute() This is the method that will be invoked directly before executing the task method and before all the inputs are converted.
sandbox_execute() Call dispatch_execute, in the context of a local sandbox execution.
unwrap_literal_map_and_execute() Please see the implementation of the dispatch_execute function in the real task.

compile()

def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    args,
    kwargs,
)

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
args *args
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

dispatch_execute()

def dispatch_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
) -> typing.Union[flytekit.models.literals.LiteralMap, flytekit.models.dynamic_job.DynamicJobSpec, typing.Coroutine]

This method translates Flyte’s Type system based input values and invokes the actual call to the executor. This method is also invoked during runtime.

  • VoidPromise is returned when the task itself declares no outputs.
  • LiteralMap is returned when the task declares one or more outputs. Individual outputs may be None.
  • DynamicJobSpec is returned when a dynamic workflow is executed.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

execute()

def execute(
    kwargs,
) -> typing.Any

Parameter Type
kwargs **kwargs

find_lhs()

def find_lhs()

get_config()

def get_config(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[typing.Dict[str, str]]

Returns the task config as a serializable dictionary. This task config consists of metadata about the custom data defined for this task.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_container()

def get_container(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flytekit.models.task.Container]

Returns the container definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_custom()

def get_custom(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[typing.Dict[str, typing.Any]]

Return additional plugin-specific custom data (if any) as a serializable dictionary.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_extended_resources()

def get_extended_resources(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flyteidl.core.tasks_pb2.ExtendedResources]

Returns the extended resources to allocate to the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_input_types()

def get_input_types()

Returns the names and python types as a dictionary for the inputs of this task.

get_k8s_pod()

def get_k8s_pod(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flytekit.models.task.K8sPod]

Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_sql()

def get_sql(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flytekit.models.task.Sql]

Returns the Sql definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_type_for_input_var()

def get_type_for_input_var(
    k: str,
    v: typing.Any,
) -> typing.Type[typing.Any]

Returns the python type for an input variable by name.

Parameter Type
k str
v typing.Any

get_type_for_output_var()

def get_type_for_output_var(
    k: str,
    v: typing.Any,
) -> typing.Type[typing.Any]

Returns the python type for the specified output variable by name.

Parameter Type
k str
v typing.Any

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
) -> typing.Union[typing.Tuple[flytekit.core.promise.Promise], flytekit.core.promise.Promise, flytekit.core.promise.VoidPromise, NoneType]

Please see the local_execute comments in the main task.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

post_execute()

def post_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
    rval: typing.Any,
) -> typing.Any

Post execute is called after the execution has completed, with the user_params, and can be used to clean up or to alter the outputs to match the intended task’s outputs. If not overridden, this function is a no-op.

Parameter Type
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters]
rval typing.Any

pre_execute()

def pre_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
) -> typing.Optional[flytekit.core.context_manager.ExecutionParameters]

This is the method that will be invoked directly before executing the task method and before all the inputs are converted. One particular case where this is useful is if the context is to be modified for the user process to get some user space parameters. This also ensures that things like SparkSession are already correctly set up before the type transformers are called.

This should return either the same context or the mutated context.

Parameter Type
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters]

sandbox_execute()

def sandbox_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
) -> flytekit.models.literals.LiteralMap

Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

unwrap_literal_map_and_execute()

def unwrap_literal_map_and_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
) -> flytekit.models.literals.LiteralMap

Please see the implementation of the dispatch_execute function in the real task.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

Properties

Property Type Description
deck_fields If not empty, this task will output a deck HTML file for the specified decks.
disable_deck If true, this task will not output a deck HTML file.
docs
enable_deck If true, this task will output a deck HTML file.
environment Any environment variables that are supplied during the execution of the task.
id
instantiated_in
interface
lhs
location
metadata
name
python_interface
reference
security_context
task_config Returns the user-specified task config, which is used for plugin-specific handling of the task.
task_type
task_type_version

flytekit.core.task.TaskPlugins

This is the TaskPlugins factory for task types that are derivative of PythonFunctionTask. Every task that the user wishes to use should be available in this factory. Usage:


    TaskPlugins.register_pythontask_plugin(config_object_type, plugin_object_type)
    # config_object_type is any class that will be passed to the plugin_object as task_config
    # plugin_object_type is a derivative of PythonFunctionTask

Examples of available task plugins include query-based plugins such as flytekitplugins.athena.task.AthenaTask and flytekitplugins.hive.task.HiveTask, Kubeflow operators like plugins.kfpytorch.flytekitplugins.kfpytorch.task.PyTorchFunctionTask and plugins.kftensorflow.flytekitplugins.kftensorflow.task.TensorflowFunctionTask, and generic plugins like flytekitplugins.pod.task.PodFunctionTask, which doesn’t integrate with third-party tools or services.

The task_config is different for every task plugin type. This is filled out by users when they define a task to specify plugin-specific behavior and features. For example, with a query type task plugin, the config might store information related to which database to query.

The plugin_object_type can be used to customize execution behavior and task serialization properties in tandem with the task_config.

Methods

Method Description
find_pythontask_plugin() Returns a PluginObjectType if found or returns the base PythonFunctionTask.
register_pythontask_plugin() Use this method to register a new plugin into Flytekit.

find_pythontask_plugin()

def find_pythontask_plugin(
    plugin_config_type: type,
) -> Type[PythonFunctionTask]

Returns a PluginObjectType if found, or returns the base PythonFunctionTask.

Parameter Type
plugin_config_type type

register_pythontask_plugin()

def register_pythontask_plugin(
    plugin_config_type: type,
    plugin: Type[PythonFunctionTask],
)

Use this method to register a new plugin into Flytekit. Usage:


    TaskPlugins.register_pythontask_plugin(config_object_type, plugin_object_type)
    # config_object_type is any class that will be passed to the plugin_object as task_config
    # plugin_object_type is a derivative of PythonFunctionTask

Parameter Type
plugin_config_type type
plugin Type[PythonFunctionTask]
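
The register/find pair described above follows a familiar registry pattern: a mapping from config type to plugin class, with a fallback to the base class. Below is a minimal stdlib-only sketch of that pattern. BaseFunctionTask, ToyTaskPlugins, QueryConfig, and QueryTask are hypothetical stand-ins, and duplicate-registration handling is deliberately not modeled, so this should not be read as flytekit's implementation.

```python
from typing import Dict, Type


class BaseFunctionTask:
    """Stand-in for PythonFunctionTask."""


class ToyTaskPlugins:
    # Maps a config type to the plugin class that handles it.
    _registry: Dict[type, Type[BaseFunctionTask]] = {}

    @classmethod
    def register(cls, config_type: type, plugin: Type[BaseFunctionTask]) -> None:
        cls._registry[config_type] = plugin

    @classmethod
    def find(cls, config_type: type) -> Type[BaseFunctionTask]:
        # Fall back to the base task class when nothing is registered.
        return cls._registry.get(config_type, BaseFunctionTask)


class QueryConfig:
    """Hypothetical task_config, e.g. which database to query."""

    def __init__(self, database: str) -> None:
        self.database = database


class QueryTask(BaseFunctionTask):
    """Hypothetical plugin that would customize execution for QueryConfig."""


ToyTaskPlugins.register(QueryConfig, QueryTask)
```

Keying the lookup on the type of task_config is what lets a single decorator pick the right plugin class from the config object a user passes in.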