1.15.4.dev2+g3e3ce2426

flytekit.extras.tasks.shell

Directory

Classes

Class Description
AttrDict Convert a dictionary to an attribute style lookup.
ExecutionParameters This is a run-time user-centric context object that is accessible to every @task method.
FlyteDirectory None.
FlyteFile None.
Interface A Python native interface object, like inspect.signature but simpler.
OutputLocation .
ProcessResult Stores a process return code, standard output and standard error.
PythonInstanceTask This class should be used as the base class for all Tasks that do not have a user defined function body, but have a platform defined execute method.
RawShellTask None.
ShellTask None.
TaskPlugins This is the TaskPlugins factory for task types that are derivative of PythonFunctionTask.

Errors

flytekit.extras.tasks.shell.AttrDict

Convert a dictionary to an attribute style lookup. Do not use this in regular code; it is intended only for namespacing inputs and outputs.

def AttrDict(
    args,
    kwargs,
):
Parameter Type
args *args
kwargs **kwargs
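
As a rough illustration of what "attribute style lookup" means, the sketch below defines a hypothetical `AttrDictSketch` class. It is not flytekit's implementation; it only shows the general idea of reading dictionary keys as attributes.

```python
class AttrDictSketch(dict):
    """Hypothetical stand-in: a dict whose keys can also be read as attributes."""

    def __getattr__(self, name):
        # __getattr__ runs only when normal attribute lookup fails,
        # so regular dict methods such as .items() keep working.
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name) from None


# Namespacing a set of inputs, as the description above suggests.
inputs = AttrDictSketch(infile="/tmp/in.txt", count=3)
print(inputs.infile, inputs["count"])
```

Both access styles read the same underlying dictionary entry, which is what makes this convenient for namespacing template variables.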

flytekit.extras.tasks.shell.ExecutionParameters

This is a run-time user-centric context object that is accessible to every @task method. It can be accessed using:

.. code-block:: python

    flytekit.current_context()

This object provides the following:

  • a statsd handler
  • a logging handler
  • the execution ID as an :py:class:`flytekit.models.core.identifier.WorkflowExecutionIdentifier` object
  • a working directory for the user to write arbitrary files to

Please do not confuse this object with the :py:class:`flytekit.FlyteContext` object.

def ExecutionParameters(
    execution_date,
    tmp_dir,
    stats,
    execution_id: typing.Optional[_identifier.WorkflowExecutionIdentifier],
    logging,
    raw_output_prefix,
    output_metadata_prefix,
    checkpoint,
    decks,
    task_id: typing.Optional[_identifier.Identifier],
    enable_deck: bool,
    kwargs,
):
Parameter Type
execution_date
tmp_dir
stats
execution_id typing.Optional[_identifier.WorkflowExecutionIdentifier]
logging
raw_output_prefix
output_metadata_prefix
checkpoint
decks
task_id typing.Optional[_identifier.Identifier]
enable_deck bool
kwargs **kwargs

Methods

Method Description
builder() None
get() Returns the task-specific context if present; otherwise raises an error
has_attr() None
new_builder() None
with_enable_deck() None
with_task_sandbox() None

builder()

def builder()

get()

def get(
    key: str,
):

Returns the task-specific context if present; otherwise raises an error. The returned context will match the key.

Parameter Type
key str

has_attr()

def has_attr(
    attr_name: str,
):
Parameter Type
attr_name str

new_builder()

def new_builder(
    current: Optional[ExecutionParameters],
):
Parameter Type
current Optional[ExecutionParameters]

with_enable_deck()

def with_enable_deck(
    enable_deck: bool,
):
Parameter Type
enable_deck bool

with_task_sandbox()

def with_task_sandbox()

Properties

Property Type Description
checkpoint
decks
default_deck
enable_deck
execution_date
execution_id
logging
output_metadata_prefix
raw_output_prefix
secrets
stats
task_id
timeline_deck
working_directory

flytekit.extras.tasks.shell.FlyteDirectory

def FlyteDirectory(
    path: typing.Union[str, os.PathLike],
    downloader: typing.Optional[typing.Callable],
    remote_directory: typing.Optional[typing.Union[os.PathLike, str, typing.Literal[False]]],
):
Parameter Type
path typing.Union[str, os.PathLike]
downloader typing.Optional[typing.Callable]
remote_directory typing.Optional[typing.Union[os.PathLike, str, typing.Literal[False]]]

Methods

Method Description
crawl() Crawl returns a generator of all files prefixed by any sub-folders under the given “FlyteDirectory”
deserialize_flyte_dir() None
download() None
extension() None
from_dict() None
from_json() None
from_source() Create a new FlyteDirectory object with the remote source set to the input
listdir() This function will list all files and folders in the given directory, but without downloading the contents
new() Create a new FlyteDirectory object in current Flyte working directory
new_dir() This will create a new folder under the current folder
new_file() This will create a new file under the current folder
new_remote() Create a new FlyteDirectory object using the currently configured default remote in the context
schema() None
serialize_flyte_dir() None
to_dict() None
to_json() None

crawl()

def crawl(
    maxdepth: typing.Optional[int],
    topdown: bool,
    kwargs,
):

Crawl returns a generator of all files prefixed by any sub-folders under the given “FlyteDirectory”. If detail=True is passed, then it will return a dictionary as specified by fsspec.

Example:

    list(fd.crawl())
    [("/base", "file1"), ("/base", "dir1/file1"), ("/base", "dir2/file1"), ("/base", "dir1/dir/file1")]

    list(x.crawl(detail=True))
    [('/tmp/test', {'my-dir/ab.py': {'name': '/tmp/test/my-dir/ab.py', 'size': 0, 'type': 'file', 'created': 1677720780.2318847, 'islink': False, 'mode': 33188, 'uid': 501, 'gid': 0, 'mtime': 1677720780.2317934, 'ino': 1694329, 'nlink': 1}})]

Parameter Type
maxdepth typing.Optional[int]
topdown bool
kwargs **kwargs
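
To make the shape of the `(base, relative path)` tuples in the example above concrete, here is a minimal local-only sketch built on `os.walk`. The helper `crawl_local` is hypothetical and is not how flytekit implements `crawl()`; it ignores `maxdepth`, `topdown`, `detail`, and remote storage entirely.

```python
import os
import tempfile
from typing import Iterator, Tuple


def crawl_local(base: str) -> Iterator[Tuple[str, str]]:
    """Yield (base, path relative to base) for every file under base."""
    for root, _dirs, files in os.walk(base):
        for name in files:
            full = os.path.join(root, name)
            yield base, os.path.relpath(full, base)


# Build a tiny directory tree and crawl it.
with tempfile.TemporaryDirectory() as base:
    os.makedirs(os.path.join(base, "dir1"))
    for rel in ("file1", os.path.join("dir1", "file1")):
        with open(os.path.join(base, rel), "w") as fh:
            fh.write("")
    found = sorted(rel for _, rel in crawl_local(base))

print(found)
```

Each yielded pair keeps the base separate from the relative path, so a caller can re-root the same relative layout under a different prefix.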

deserialize_flyte_dir()

def deserialize_flyte_dir(
    args,
    kwargs,
):
Parameter Type
args *args
kwargs **kwargs

download()

def download()

extension()

def extension()

from_dict()

def from_dict(
    kvs: typing.Union[dict, list, str, int, float, bool, NoneType],
    infer_missing,
):
Parameter Type
kvs typing.Union[dict, list, str, int, float, bool, NoneType]
infer_missing

from_json()

def from_json(
    s: typing.Union[str, bytes, bytearray],
    parse_float,
    parse_int,
    parse_constant,
    infer_missing,
    kw,
):
Parameter Type
s typing.Union[str, bytes, bytearray]
parse_float
parse_int
parse_constant
infer_missing
kw

from_source()

def from_source(
    source: str | os.PathLike,
):

Create a new FlyteDirectory object with the remote source set to the input

Parameter Type
source str | os.PathLike

listdir()

def listdir(
    directory: FlyteDirectory,
):

This function will list all files and folders in the given directory, but without downloading the contents. In addition, it will return a list of FlyteFile and FlyteDirectory objects that have ability to lazily download the contents of the file/folder. For example:

.. code-block:: python

    entity = FlyteDirectory.listdir(directory)
    for e in entity:
        print("s3 object:", e.remote_source)
        # s3 object: s3://test-flytedir/file1.txt
        # s3 object: s3://test-flytedir/file2.txt
        # s3 object: s3://test-flytedir/sub_dir

    open(entity[0], "r")  # This will download the file to the local disk.
    open(entity[0], "r")  # flytekit will read data from the local disk if you open it again.

Parameter Type
directory FlyteDirectory

new()

def new(
    dirname: str | os.PathLike,
):

Create a new FlyteDirectory object in current Flyte working directory.

Parameter Type
dirname str | os.PathLike

new_dir()

def new_dir(
    name: typing.Optional[str],
):

This will create a new folder under the current folder. If given a name, it will use the name given, otherwise it’ll pick a random string. Collisions are not checked.

Parameter Type
name typing.Optional[str]

new_file()

def new_file(
    name: typing.Optional[str],
):

This will create a new file under the current folder. If given a name, it will use the name given, otherwise it’ll pick a random string. Collisions are not checked.

Parameter Type
name typing.Optional[str]
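
The naming rule shared by new_dir() and new_file() (use the given name, otherwise pick a random string, with no collision check) can be sketched as follows. The helper `child_path` and the use of `uuid4` are assumptions made for illustration; the source does not say how flytekit generates its random string.

```python
import os
import uuid
from typing import Optional


def child_path(parent: str, name: Optional[str] = None) -> str:
    """Join parent with name, or with a random hex string when name is omitted."""
    # No existence check is made, mirroring "Collisions are not checked".
    return os.path.join(parent, name if name else uuid.uuid4().hex)


named = child_path("/data/run", "report.txt")
random_child = child_path("/data/run")
print(named, random_child)
```

Because nothing checks for an existing entry, passing an explicit name twice yields the same path twice; callers that need uniqueness have to guarantee it themselves.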

new_remote()

def new_remote(
    stem: typing.Optional[str],
    alt: typing.Optional[str],
):

Create a new FlyteDirectory object using the currently configured default remote in the context (i.e. the raw_output_prefix configured in the current FileAccessProvider object in the context). This is used if you explicitly have a folder somewhere that you want to create files under. If you want to write a whole folder, you can let your task return a FlyteDirectory object, and let flytekit handle the uploading.

Parameter Type
stem typing.Optional[str]
alt typing.Optional[str]

schema()

def schema(
    infer_missing: bool,
    only,
    exclude,
    many: bool,
    context,
    load_only,
    dump_only,
    partial: bool,
    unknown,
):
Parameter Type
infer_missing bool
only
exclude
many bool
context
load_only
dump_only
partial bool
unknown

serialize_flyte_dir()

def serialize_flyte_dir(
    args,
    kwargs,
):
Parameter Type
args *args
kwargs **kwargs

to_dict()

def to_dict(
    encode_json,
):
Parameter Type
encode_json

to_json()

def to_json(
    skipkeys: bool,
    ensure_ascii: bool,
    check_circular: bool,
    allow_nan: bool,
    indent: typing.Union[int, str, NoneType],
    separators: typing.Tuple[str, str],
    default: typing.Callable,
    sort_keys: bool,
    kw,
):
Parameter Type
skipkeys bool
ensure_ascii bool
check_circular bool
allow_nan bool
indent typing.Union[int, str, NoneType]
separators typing.Tuple[str, str]
default typing.Callable
sort_keys bool
kw

Properties

Property Type Description
downloaded
remote_directory
remote_source
sep

flytekit.extras.tasks.shell.FlyteFile

def FlyteFile(
    path: typing.Union[str, os.PathLike],
    downloader: typing.Callable,
    remote_path: typing.Optional[typing.Union[os.PathLike, str, bool]],
    metadata: typing.Optional[dict[str, str]],
):

FlyteFile’s init method.

Parameter Type
path typing.Union[str, os.PathLike]
downloader typing.Callable
remote_path typing.Optional[typing.Union[os.PathLike, str, bool]]
metadata typing.Optional[dict[str, str]]

Methods

Method Description
deserialize_flyte_file() None
download() None
extension() None
from_dict() None
from_json() None
from_source() Create a new FlyteFile object with the remote source set to the input
new() Create a new FlyteFile object in the current Flyte working directory
new_remote_file() Create a new FlyteFile object with a remote path
open() Returns a streaming File handle
serialize_flyte_file() None
to_dict() None
to_json() None

deserialize_flyte_file()

def deserialize_flyte_file(
    args,
    kwargs,
):
Parameter Type
args *args
kwargs **kwargs

download()

def download()

extension()

def extension()

from_dict()

def from_dict(
    d,
    dialect,
):
Parameter Type
d
dialect

from_json()

def from_json(
    data: typing.Union[str, bytes, bytearray],
    decoder: collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]],
    from_dict_kwargs: typing.Any,
):
Parameter Type
data typing.Union[str, bytes, bytearray]
decoder collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]]
from_dict_kwargs typing.Any

from_source()

def from_source(
    source: str | os.PathLike,
):

Create a new FlyteFile object with the remote source set to the input

Parameter Type
source str | os.PathLike

new()

def new(
    filename: str | os.PathLike,
):

Create a new FlyteFile object in the current Flyte working directory

Parameter Type
filename str | os.PathLike

new_remote_file()

def new_remote_file(
    name: typing.Optional[str],
    alt: typing.Optional[str],
):

Create a new FlyteFile object with a remote path.

Parameter Type
name typing.Optional[str]
alt typing.Optional[str]

open()

def open(
    mode: str,
    cache_type: typing.Optional[str],
    cache_options: typing.Optional[typing.Dict[str, typing.Any]],
):

Returns a streaming File handle

.. code-block:: python

    @task
    def copy_file(ff: FlyteFile) -> FlyteFile:
        new_file = FlyteFile.new_remote_file()
        with ff.open("rb", cache_type="readahead") as r:
            with new_file.open("wb") as w:
                w.write(r.read())
        return new_file

Parameter Type
mode str
cache_type typing.Optional[str]
cache_options typing.Optional[typing.Dict[str, typing.Any]]

serialize_flyte_file()

def serialize_flyte_file(
    args,
    kwargs,
):
Parameter Type
args *args
kwargs **kwargs

to_dict()

def to_dict()

to_json()

def to_json(
    encoder: collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]],
    to_dict_kwargs: typing.Any,
):
Parameter Type
encoder collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]]
to_dict_kwargs typing.Any

Properties

Property Type Description
downloaded
remote_path
remote_source

flytekit.extras.tasks.shell.FlyteRecoverableException

Common base class for all non-exit exceptions.

def FlyteRecoverableException(
    args,
    timestamp: typing.Optional[float],
):
Parameter Type
args *args
timestamp typing.Optional[float]

Properties

Property Type Description
timestamp

flytekit.extras.tasks.shell.Interface

A Python native interface object, like inspect.signature but simpler.

def Interface(
    inputs: Union[Optional[Dict[str, Type]], Optional[Dict[str, Tuple[Type, Any]]]],
    outputs: Union[Optional[Dict[str, Type]], Optional[Dict[str, Optional[Type]]]],
    output_tuple_name: Optional[str],
    docstring: Optional[Docstring],
):
Parameter Type
inputs Union[Optional[Dict[str, Type]], Optional[Dict[str, Tuple[Type, Any]]]]
outputs Union[Optional[Dict[str, Type]], Optional[Dict[str, Optional[Type]]]]
output_tuple_name Optional[str]
docstring Optional[Docstring]

Methods

Method Description
remove_inputs() This method is useful in removing some variables from the Flyte backend inputs specification, as these are implicit local only inputs or will be supplied by the library at runtime
with_inputs() Use this to add additional inputs to the interface
with_outputs() This method allows the addition of extra outputs that are expected from a task specification

remove_inputs()

def remove_inputs(
    vars: Optional[List[str]],
):

This method is useful in removing some variables from the Flyte backend inputs specification, as these are implicit local only inputs or will be supplied by the library at runtime (for example, a Spark session). It creates a new instance of the interface with the requested variables removed.

Parameter Type
vars Optional[List[str]]

with_inputs()

def with_inputs(
    extra_inputs: Dict[str, Type],
):

Use this to add additional inputs to the interface. This is useful for adding additional implicit inputs that are added without the user requesting them.

Parameter Type
extra_inputs Dict[str, Type]

with_outputs()

def with_outputs(
    extra_outputs: Dict[str, Type],
):

This method allows the addition of extra outputs that are expected from a task specification.

Parameter Type
extra_outputs Dict[str, Type]
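
The three methods above each describe deriving a modified interface rather than mutating one in place (remove_inputs() is documented as creating a new instance). A minimal sketch of that pattern follows. `InterfaceSketch` is a hypothetical stand-in, not the flytekit class; it omits defaults, docstrings, and output tuple names, and it does not model how the real class handles duplicate names.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Type


@dataclass(frozen=True)
class InterfaceSketch:
    inputs: Dict[str, Type] = field(default_factory=dict)
    outputs: Dict[str, Type] = field(default_factory=dict)

    def remove_inputs(self, names: Optional[List[str]]) -> "InterfaceSketch":
        # Return a new instance without the named inputs.
        if not names:
            return self
        kept = {k: v for k, v in self.inputs.items() if k not in names}
        return InterfaceSketch(kept, dict(self.outputs))

    def with_inputs(self, extra: Dict[str, Type]) -> "InterfaceSketch":
        # Return a new instance with extra inputs merged in.
        return InterfaceSketch({**self.inputs, **extra}, dict(self.outputs))

    def with_outputs(self, extra: Dict[str, Type]) -> "InterfaceSketch":
        # Return a new instance with extra outputs merged in.
        return InterfaceSketch(dict(self.inputs), {**self.outputs, **extra})


base = InterfaceSketch(inputs={"a": int, "spark_session": object}, outputs={"o0": str})
remote = base.remove_inputs(["spark_session"]).with_outputs({"log": str})
print(sorted(remote.inputs), sorted(remote.outputs))
```

The original `base` object is left untouched, so the local-only view and the backend-facing view of the same task can coexist.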

Properties

Property Type Description
default_inputs_as_kwargs
docstring
inputs
inputs_with_defaults
output_names
output_tuple
output_tuple_name
outputs

flytekit.extras.tasks.shell.OutputLocation

def OutputLocation(
    var: str,
    var_type: typing.Type,
    location: typing.Union[os.PathLike, str],
):
Parameter Type
var str
var_type typing.Type
location typing.Union[os.PathLike, str]

flytekit.extras.tasks.shell.ProcessResult

Stores a process return code, standard output and standard error.

def ProcessResult(
    returncode: int,
    output: str,
    error: str,
):
Parameter Type
returncode int
output str
error str
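
For orientation, the sketch below shows one way a result object with these three fields could be populated from the standard library's `subprocess.run`. `ProcessResultSketch` and `run` are hypothetical names used for illustration; how flytekit itself launches the process is not described in this section.

```python
import subprocess
import sys
from typing import List, NamedTuple


class ProcessResultSketch(NamedTuple):
    returncode: int
    output: str
    error: str


def run(cmd: List[str]) -> ProcessResultSketch:
    # capture_output collects stdout and stderr; text=True decodes them to str.
    done = subprocess.run(cmd, capture_output=True, text=True)
    return ProcessResultSketch(done.returncode, done.stdout, done.stderr)


# Use the current interpreter so the example does not depend on a shell.
result = run([sys.executable, "-c", "print('hello')"])
print(result.returncode, result.output.strip())
```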

flytekit.extras.tasks.shell.PythonInstanceTask

This class should be used as the base class for all Tasks that do not have a user defined function body, but have a platform defined execute method. (Execute needs to be overridden). This base class ensures that the module loader will invoke the right class automatically, by capturing the module name and variable in the module name.

.. code-block:: python

    x = MyInstanceTask(name="x", .....)

    # this can be invoked as
    x(a=5)  # depending on the interface of the defined task

def PythonInstanceTask(
    name: str,
    task_config: T,
    task_type: str,
    task_resolver: Optional[TaskResolverMixin],
    kwargs,
):

Please see class level documentation.

Parameter Type
name str
task_config T
task_type str
task_resolver Optional[TaskResolverMixin]
kwargs **kwargs

Methods

Method Description
compile() Generates a node that encapsulates this task in a workflow definition
construct_node_metadata() Used when constructing the node that encapsulates this task as part of a broader workflow definition
dispatch_execute() This method translates Flyte’s Type system based input values and invokes the actual call to the executor
execute() This method will be invoked to execute the task
find_lhs() None
get_command() Returns the command which should be used in the container definition for the serialized version of this task
get_config() Returns the task config as a serializable dictionary
get_container() Returns the container definition (if any) that is used to run the task on hosted Flyte
get_custom() Return additional plugin-specific custom data (if any) as a serializable dictionary
get_default_command() Returns the default pyflyte-execute command used to run this on hosted Flyte platforms
get_extended_resources() Returns the extended resources to allocate to the task on hosted Flyte
get_image() Update image spec based on fast registration usage, and return string representing the image
get_input_types() Returns the names and python types as a dictionary for the inputs of this task
get_k8s_pod() Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte
get_sql() Returns the Sql definition (if any) that is used to run the task on hosted Flyte
get_type_for_input_var() Returns the python type for an input variable by name
get_type_for_output_var() Returns the python type for the specified output variable by name
local_execute() This function is used only in the local execution path and is responsible for calling dispatch execute
local_execution_mode() None
post_execute() Post execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs to match the intended task's outputs
pre_execute() This is the method that will be invoked directly before executing the task method and before all the inputs are converted
reset_command_fn() Resets the command which should be used in the container definition of this task to the default arguments
sandbox_execute() Call dispatch_execute, in the context of a local sandbox execution
set_command_fn() By default, the task will run on the Flyte platform using the pyflyte-execute command
set_resolver() By default, flytekit uses the DefaultTaskResolver to resolve the task

compile()

def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    args,
    kwargs,
):

Generates a node that encapsulates this task in a workflow definition.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
args *args
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

Used when constructing the node that encapsulates this task as part of a broader workflow definition.

dispatch_execute()

def dispatch_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

This method translates Flyte’s Type system based input values and invokes the actual call to the executor. This method is also invoked during runtime.

  • VoidPromise is returned in the case when the task itself declares no outputs.
  • Literal Map is returned when the task returns one or more outputs in the declaration. Individual outputs may be none.
  • DynamicJobSpec is returned when a dynamic workflow is executed.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

execute()

def execute(
    kwargs,
):

This method will be invoked to execute the task.

Parameter Type
kwargs **kwargs

find_lhs()

def find_lhs()

get_command()

def get_command(
    settings: SerializationSettings,
):

Returns the command which should be used in the container definition for the serialized version of this task registered on a hosted Flyte platform.

Parameter Type
settings SerializationSettings

get_config()

def get_config(
    settings: SerializationSettings,
):

Returns the task config as a serializable dictionary. This task config consists of metadata about the custom data defined for this task.

Parameter Type
settings SerializationSettings

get_container()

def get_container(
    settings: SerializationSettings,
):

Returns the container definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings SerializationSettings

get_custom()

def get_custom(
    settings: flytekit.configuration.SerializationSettings,
):

Return additional plugin-specific custom data (if any) as a serializable dictionary.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_default_command()

def get_default_command(
    settings: SerializationSettings,
):

Returns the default pyflyte-execute command used to run this on hosted Flyte platforms.

Parameter Type
settings SerializationSettings

get_extended_resources()

def get_extended_resources(
    settings: SerializationSettings,
):

Returns the extended resources to allocate to the task on hosted Flyte.

Parameter Type
settings SerializationSettings

get_image()

def get_image(
    settings: SerializationSettings,
):

Update image spec based on fast registration usage, and return string representing the image

Parameter Type
settings SerializationSettings

get_input_types()

def get_input_types()

Returns the names and python types as a dictionary for the inputs of this task.

get_k8s_pod()

def get_k8s_pod(
    settings: SerializationSettings,
):

Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings SerializationSettings

get_sql()

def get_sql(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the Sql definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_type_for_input_var()

def get_type_for_input_var(
    k: str,
    v: typing.Any,
):

Returns the python type for an input variable by name.

Parameter Type
k str
v typing.Any

get_type_for_output_var()

def get_type_for_output_var(
    k: str,
    v: typing.Any,
):

Returns the python type for the specified output variable by name.

Parameter Type
k str
v typing.Any

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
):

This function is used only in the local execution path and is responsible for calling dispatch execute. Use this function when calling a task with native values (or Promises containing Flyte literals derived from Python native values).

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

post_execute()

def post_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
    rval: typing.Any,
):

Post execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs to match the intended task's outputs. If not overridden, this function is a no-op.

Parameter Type
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters]
rval typing.Any

pre_execute()

def pre_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
):

This is the method that will be invoked directly before executing the task method and before all the inputs are converted. One particular case where this is useful is if the context is to be modified for the user process to get some user space parameters. This also ensures that things like SparkSession are already correctly set up before the type transformers are called.

This should return either the same context or the mutated context.

Parameter Type
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters]
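
The relationship between pre_execute(), execute(), and post_execute() described above can be sketched as a simple hook sequence. Everything here is hypothetical: `HookedTaskSketch`, its `run` driver, and the plain dict standing in for ExecutionParameters are illustration only, and the sketch does not model input conversion or dispatch_execute().

```python
from typing import Any, Optional


class HookedTaskSketch:
    def pre_execute(self, user_params: Optional[dict]) -> Optional[dict]:
        return user_params  # default: hand back the same context

    def execute(self, **kwargs) -> Any:
        raise NotImplementedError  # subclasses must override

    def post_execute(self, user_params: Optional[dict], rval: Any) -> Any:
        return rval  # default: no-op

    def run(self, user_params: Optional[dict], **kwargs) -> Any:
        # Hypothetical driver: hook, execute, hook.
        params = self.pre_execute(user_params)
        rval = self.execute(**kwargs)
        return self.post_execute(params, rval)


class Doubler(HookedTaskSketch):
    def pre_execute(self, user_params):
        # Return a mutated copy of the context.
        return {**(user_params or {}), "prepared": True}

    def execute(self, **kwargs):
        return kwargs["a"] * 2

    def post_execute(self, user_params, rval):
        # Reshape the raw return value into named outputs.
        return {"o0": rval, "prepared": user_params["prepared"]}


out = Doubler().run(None, a=5)
print(out)
```

The point of the sketch is the ordering: the context returned by the pre hook is the one the post hook later receives alongside the return value.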

reset_command_fn()

def reset_command_fn()

Resets the command which should be used in the container definition of this task to the default arguments. This is useful when the command line is overridden at serialization time.

sandbox_execute()

def sandbox_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

set_command_fn()

def set_command_fn(
    get_command_fn: Optional[Callable[[SerializationSettings], List[str]]],
):

By default, the task will run on the Flyte platform using the pyflyte-execute command. However, it can be useful to update the command with which the task is serialized for specific cases like running map tasks (“pyflyte-map-execute”) or for fast-executed tasks.

Parameter Type
get_command_fn Optional[Callable[[SerializationSettings], List[str]]]
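
The interplay of set_command_fn(), reset_command_fn(), and get_command() can be pictured as swapping a stored callback. The class below is a hypothetical sketch, not flytekit code: a plain dict stands in for SerializationSettings, and the argument lists are placeholders rather than the real pyflyte-execute arguments.

```python
from typing import Callable, List


class CommandHolderSketch:
    def __init__(self) -> None:
        # Start with the default command builder.
        self._get_command_fn: Callable[[dict], List[str]] = self.get_default_command

    def get_default_command(self, settings: dict) -> List[str]:
        # Placeholder: the real default argument list is not reproduced here.
        return ["pyflyte-execute", "<default arguments elided>"]

    def set_command_fn(self, fn: Callable[[dict], List[str]]) -> None:
        self._get_command_fn = fn

    def reset_command_fn(self) -> None:
        self._get_command_fn = self.get_default_command

    def get_command(self, settings: dict) -> List[str]:
        return self._get_command_fn(settings)


task = CommandHolderSketch()
default_cmd = task.get_command({})
task.set_command_fn(lambda settings: ["pyflyte-map-execute", "<arguments elided>"])
map_cmd = task.get_command({})
task.reset_command_fn()
restored_cmd = task.get_command({})
print(default_cmd[0], map_cmd[0], restored_cmd[0])
```

Keeping the default builder as a separate method is what lets the reset step restore it after an override at serialization time.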

set_resolver()

def set_resolver(
    resolver: TaskResolverMixin,
):

By default, flytekit uses the DefaultTaskResolver to resolve the task. This method allows the user to set a custom task resolver. It can be useful to override the task resolver for specific cases like running tasks in a Jupyter notebook.

Parameter Type
resolver TaskResolverMixin

Properties

Property Type Description
container_image
deck_fields
disable_deck
docs
enable_deck
environment
instantiated_in
interface
lhs
location
metadata
name
python_interface
resources
security_context
task_config
task_resolver
task_type
task_type_version

flytekit.extras.tasks.shell.RawShellTask

def RawShellTask(
    name: str,
    debug: bool,
    script: typing.Optional[str],
    script_file: typing.Optional[str],
    task_config: ~T,
    inputs: typing.Optional[typing.Dict[str, typing.Type]],
    output_locs: typing.Optional[typing.List[flytekit.extras.tasks.shell.OutputLocation]],
    kwargs,
):

The RawShellTask is a minimal extension of the existing ShellTask. Its purpose is to support wrapping a “raw” or “pure” shell script which needs to be executed with some environment variables set, and some arguments, which may not be known until execution time.

This class is not meant to be instantiated into tasks by users, but used with the factory function get_raw_shell_task(). An instance of this class will be returned with either a user-specified or a default template. The template itself will export the desired environment variables, and subsequently execute the desired “raw” script with the specified arguments.

.. note:: This means that within your workflow, you can dynamically control the env variables, arguments, and even the actual script you want to run.

.. note:: The downside is that a dynamic workflow will be required. The “raw” script passed in at execution time must be at the specified location.

These args are forwarded directly to the parent ShellTask constructor, as behavior does not diverge.

Parameter Type
name str
debug bool
script typing.Optional[str]
script_file typing.Optional[str]
task_config ~T
inputs typing.Optional[typing.Dict[str, typing.Type]]
output_locs typing.Optional[typing.List[flytekit.extras.tasks.shell.OutputLocation]]
kwargs **kwargs
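
The description above says the template exports environment variables and then runs the raw script with its arguments. The sketch below shows what rendering such a script could look like. The function `render_raw_script`, its output layout, and the use of `shlex.quote` are all assumptions for illustration; they are not flytekit's actual template or its export-string format.

```python
import shlex
from typing import Dict, List


def render_raw_script(env: Dict[str, str], script_path: str, args: List[str]) -> str:
    """Build shell text: one export line per variable, then the script invocation."""
    # Quote values and arguments so spaces survive shell parsing.
    exports = [f"export {key}={shlex.quote(value)}" for key, value in env.items()]
    command = " ".join([shlex.quote(script_path), *map(shlex.quote, args)])
    return "\n".join([*exports, command])


rendered = render_raw_script(
    {"STAGE": "dev", "MSG": "hello world"}, "/opt/run.sh", ["--fast"]
)
print(rendered)
```

Because the environment, script path, and arguments are ordinary values at render time, they can be decided inside a workflow at execution time, which is the flexibility the notes above refer to.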

Methods

Method Description
compile() Generates a node that encapsulates this task in a workflow definition
construct_node_metadata() Used when constructing the node that encapsulates this task as part of a broader workflow definition
dispatch_execute() This method translates Flyte’s Type system based input values and invokes the actual call to the executor
execute() Executes the given script by substituting the inputs and outputs and extracts the outputs from the filesystem
find_lhs() None
get_command() Returns the command which should be used in the container definition for the serialized version of this task
get_config() Returns the task config as a serializable dictionary
get_container() Returns the container definition (if any) that is used to run the task on hosted Flyte
get_custom() Return additional plugin-specific custom data (if any) as a serializable dictionary
get_default_command() Returns the default pyflyte-execute command used to run this on hosted Flyte platforms
get_extended_resources() Returns the extended resources to allocate to the task on hosted Flyte
get_image() Update image spec based on fast registration usage, and return string representing the image
get_input_types() Returns the names and python types as a dictionary for the inputs of this task
get_k8s_pod() Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte
get_sql() Returns the Sql definition (if any) that is used to run the task on hosted Flyte
get_type_for_input_var() Returns the python type for an input variable by name
get_type_for_output_var() Returns the python type for the specified output variable by name
local_execute() This function is used only in the local execution path and is responsible for calling dispatch execute
local_execution_mode() None
make_export_string_from_env_dict() Utility function to convert a dictionary of desired environment variable key: value pairs into a string of
post_execute() Post execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs to match the intended task's outputs
pre_execute() This is the method that will be invoked directly before executing the task method and before all the inputs are converted
reset_command_fn() Resets the command which should be used in the container definition of this task to the default arguments
sandbox_execute() Call dispatch_execute, in the context of a local sandbox execution
set_command_fn() By default, the task will run on the Flyte platform using the pyflyte-execute command
set_resolver() By default, flytekit uses the DefaultTaskResolver to resolve the task

compile()

def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    args,
    kwargs,
):

Generates a node that encapsulates this task in a workflow definition.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
args *args
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

Used when constructing the node that encapsulates this task as part of a broader workflow definition.

dispatch_execute()

def dispatch_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

This method translates Flyte’s Type system based input values and invokes the actual call to the executor. This method is also invoked during runtime.

  • VoidPromise is returned in the case when the task itself declares no outputs.
  • Literal Map is returned when the task returns one or more outputs in the declaration. Individual outputs may be none.
  • DynamicJobSpec is returned when a dynamic workflow is executed.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

execute()

def execute(
    kwargs,
):

Executes the given script by substituting the inputs and outputs and extracts the outputs from the filesystem

Parameter Type
kwargs **kwargs

find_lhs()

def find_lhs()

get_command()

def get_command(
    settings: SerializationSettings,
):

Returns the command which should be used in the container definition for the serialized version of this task registered on a hosted Flyte platform.

Parameter Type
settings SerializationSettings

get_config()

def get_config(
    settings: SerializationSettings,
):

Returns the task config as a serializable dictionary. This task config consists of metadata about the custom data defined for this task.

Parameter Type
settings SerializationSettings

get_container()

def get_container(
    settings: SerializationSettings,
):

Returns the container definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings SerializationSettings

get_custom()

def get_custom(
    settings: flytekit.configuration.SerializationSettings,
):

Return additional plugin-specific custom data (if any) as a serializable dictionary.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_default_command()

def get_default_command(
    settings: SerializationSettings,
):

Returns the default pyflyte-execute command used to run this on hosted Flyte platforms.

Parameter Type
settings SerializationSettings

get_extended_resources()

def get_extended_resources(
    settings: SerializationSettings,
):

Returns the extended resources to allocate to the task on hosted Flyte.

Parameter Type
settings SerializationSettings

get_image()

def get_image(
    settings: SerializationSettings,
):

Updates the image spec based on fast registration usage and returns a string representing the image.

Parameter Type
settings SerializationSettings

get_input_types()

def get_input_types()

Returns the names and Python types as a dictionary for the inputs of this task.

get_k8s_pod()

def get_k8s_pod(
    settings: SerializationSettings,
):

Returns the Kubernetes pod definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings SerializationSettings

get_sql()

def get_sql(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the SQL definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_type_for_input_var()

def get_type_for_input_var(
    k: str,
    v: typing.Any,
):

Returns the Python type for an input variable by name.

Parameter Type
k str
v typing.Any

get_type_for_output_var()

def get_type_for_output_var(
    k: str,
    v: typing.Any,
):

Returns the Python type for the specified output variable by name.

Parameter Type
k str
v typing.Any

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
):

This function is used only in the local execution path and is responsible for calling dispatch execute. Use this function when calling a task with native values (or Promises containing Flyte literals derived from Python native values).

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

make_export_string_from_env_dict()

def make_export_string_from_env_dict(
    d: typing.Dict[str, str],
):

Utility function to convert a dictionary of desired environment variable key: value pairs into a string of the form:

export k1=v1
export k2=v2
...
Parameter Type
d typing.Dict[str, str]
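
The conversion described above can be sketched in a few lines. This is a minimal illustration, not necessarily how flytekit implements it; the function name `export_string_from_env` is hypothetical. Values are emitted unquoted here, so values containing spaces or shell metacharacters would need quoting (for example with `shlex.quote`).

```python
from typing import Dict


def export_string_from_env(d: Dict[str, str]) -> str:
    """Hypothetical helper: render one `export KEY=VALUE` line per entry."""
    # Dictionaries preserve insertion order, so lines appear in the order given.
    return "\n".join(f"export {key}={value}" for key, value in d.items())


print(export_string_from_env({"k1": "v1", "k2": "v2"}))
```

The resulting string can be prepended to a script so that the variables are set before the script body runs.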

post_execute()

def post_execute(
    user_params: flytekit.core.context_manager.ExecutionParameters,
    rval: typing.Any,
):

Post execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs to match the intended task's outputs. If not overridden, this function is a no-op.

Parameter Type
user_params flytekit.core.context_manager.ExecutionParameters
rval typing.Any

pre_execute()

def pre_execute(
    user_params: flytekit.core.context_manager.ExecutionParameters,
):

This is the method that will be invoked directly before executing the task method and before all the inputs are converted. One particular case where this is useful is if the context is to be modified for the user process to get some user space parameters. This also ensures that things like SparkSession are already correctly set up before the type transformers are called.

This should return either the same context or the mutated context.

Parameter Type
user_params flytekit.core.context_manager.ExecutionParameters

reset_command_fn()

def reset_command_fn()

Resets the command which should be used in the container definition of this task to the default arguments. This is useful when the command line is overridden at serialization time.

sandbox_execute()

def sandbox_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

set_command_fn()

def set_command_fn(
    get_command_fn: Optional[Callable[[SerializationSettings], List[str]]],
):

By default, the task will run on the Flyte platform using the pyflyte-execute command. However, it can be useful to update the command with which the task is serialized for specific cases like running map tasks (“pyflyte-map-execute”) or for fast-executed tasks.

Parameter Type
get_command_fn Optional[Callable[[SerializationSettings], List[str]]]
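
The override-and-reset pattern behind set_command_fn(), reset_command_fn(), and get_command() can be sketched as follows. This is a toy model under stated assumptions, not flytekit's code: `CommandHolder` and `Settings` are hypothetical stand-ins, the default command is a placeholder, and falling back to the default when `None` is passed is a choice made for this sketch.

```python
from typing import Callable, List, Optional

Settings = dict  # Assumption: stand-in for SerializationSettings


class CommandHolder:
    """Hypothetical stand-in for a task; not a flytekit class."""

    def __init__(self) -> None:
        self._get_command_fn: Callable[[Settings], List[str]] = self.get_default_command

    def get_default_command(self, settings: Settings) -> List[str]:
        # Placeholder default; a real default command carries more arguments.
        return ["pyflyte-execute"]

    def set_command_fn(
        self, get_command_fn: Optional[Callable[[Settings], List[str]]] = None
    ) -> None:
        # Assumption for this sketch: None falls back to the default command.
        self._get_command_fn = get_command_fn or self.get_default_command

    def reset_command_fn(self) -> None:
        self._get_command_fn = self.get_default_command

    def get_command(self, settings: Settings) -> List[str]:
        # Serialization asks for the command through whichever function is active.
        return self._get_command_fn(settings)


task = CommandHolder()
task.set_command_fn(lambda settings: ["pyflyte-map-execute"])
overridden = task.get_command({})
task.reset_command_fn()
restored = task.get_command({})
```

Storing the command as a replaceable callable lets a wrapper swap the entrypoint at serialization time without subclassing the task.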

set_resolver()

def set_resolver(
    resolver: TaskResolverMixin,
):

By default, flytekit uses the DefaultTaskResolver to resolve the task. This method allows the user to set a custom task resolver. It can be useful to override the task resolver for specific cases like running tasks in a Jupyter notebook.

Parameter Type
resolver TaskResolverMixin

Properties

Property Type Description
container_image
deck_fields
disable_deck
docs
enable_deck
environment
instantiated_in
interface
lhs
location
metadata
name
python_interface
resources
result
script
script_file
security_context
task_config
task_resolver
task_type
task_type_version

flytekit.extras.tasks.shell.ShellTask

def ShellTask(
    name: str,
    debug: bool,
    script: typing.Optional[str],
    script_file: typing.Optional[str],
    task_config: ~T,
    shell: str,
    inputs: typing.Optional[typing.Dict[str, typing.Type]],
    output_locs: typing.Optional[typing.List[flytekit.extras.tasks.shell.OutputLocation]],
    kwargs,
):
Parameter Type
name str
debug bool
script typing.Optional[str]
script_file typing.Optional[str]
task_config ~T
shell str
inputs typing.Optional[typing.Dict[str, typing.Type]]
output_locs typing.Optional[typing.List[flytekit.extras.tasks.shell.OutputLocation]]
kwargs **kwargs

Methods

Method Description
compile() Generates a node that encapsulates this task in a workflow definition
construct_node_metadata() Used when constructing the node that encapsulates this task as part of a broader workflow definition
dispatch_execute() This method translates Flyte’s type-system-based input values and invokes the actual call to the executor
execute() Executes the given script by substituting the inputs and outputs and extracts the outputs from the filesystem
find_lhs() None
get_command() Returns the command which should be used in the container definition for the serialized version of this task
get_config() Returns the task config as a serializable dictionary
get_container() Returns the container definition (if any) that is used to run the task on hosted Flyte
get_custom() Return additional plugin-specific custom data (if any) as a serializable dictionary
get_default_command() Returns the default pyflyte-execute command used to run this on hosted Flyte platforms
get_extended_resources() Returns the extended resources to allocate to the task on hosted Flyte
get_image() Update image spec based on fast registration usage, and return string representing the image
get_input_types() Returns the names and Python types as a dictionary for the inputs of this task
get_k8s_pod() Returns the Kubernetes pod definition (if any) that is used to run the task on hosted Flyte
get_sql() Returns the SQL definition (if any) that is used to run the task on hosted Flyte
get_type_for_input_var() Returns the Python type for an input variable by name
get_type_for_output_var() Returns the Python type for the specified output variable by name
local_execute() This function is used only in the local execution path and is responsible for calling dispatch execute
local_execution_mode() None
post_execute() Called after the execution has completed, with the user_params; can be used to clean up or alter the outputs
pre_execute() Invoked directly before executing the task method and before all the inputs are converted
reset_command_fn() Resets the command which should be used in the container definition of this task to the default arguments
sandbox_execute() Call dispatch_execute, in the context of a local sandbox execution
set_command_fn() By default, the task will run on the Flyte platform using the pyflyte-execute command
set_resolver() By default, flytekit uses the DefaultTaskResolver to resolve the task

compile()

def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    args,
    kwargs,
):

Generates a node that encapsulates this task in a workflow definition.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
args *args
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

Used when constructing the node that encapsulates this task as part of a broader workflow definition.

dispatch_execute()

def dispatch_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

This method translates Flyte’s type-system-based input values and invokes the actual call to the executor. This method is also invoked during runtime.

  • VoidPromise is returned when the task itself declares no outputs.
  • LiteralMap is returned when the task declares one or more outputs. Individual outputs may be None.
  • DynamicJobSpec is returned when a dynamic workflow is executed.
Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

execute()

def execute(
    kwargs,
):

Executes the given script by substituting the inputs and outputs, then extracts the outputs from the filesystem.

Parameter Type
kwargs **kwargs
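
The substitute, run, and collect sequence described above can be illustrated with the standard library alone. This is a rough sketch, not flytekit's implementation: `run_script` is a hypothetical helper, placeholders are assumed to be written as `{inputs.name}` and `{outputs.name}`, and a POSIX `sh` is assumed to be on the PATH.

```python
import subprocess
import tempfile
from pathlib import Path
from types import SimpleNamespace
from typing import Dict


def run_script(
    template: str, inputs: Dict[str, str], output_paths: Dict[str, str]
) -> Dict[str, str]:
    """Hypothetical helper: fill in the template, run it, read the outputs back."""
    # SimpleNamespace gives attribute-style lookup, so str.format can
    # resolve placeholders such as {inputs.msg} and {outputs.out}.
    script = template.format(
        inputs=SimpleNamespace(**inputs),
        outputs=SimpleNamespace(**output_paths),
    )
    # check=True raises CalledProcessError if the script exits non-zero.
    subprocess.run(["sh", "-c", script], check=True)
    # Collect each declared output from the path the script wrote to.
    return {name: Path(path).read_text() for name, path in output_paths.items()}


with tempfile.TemporaryDirectory() as workdir:
    out_file = str(Path(workdir) / "greeting.txt")
    result = run_script(
        'printf "%s" "{inputs.msg}" > "{outputs.out}"',
        inputs={"msg": "hello"},
        output_paths={"out": out_file},
    )
```

Because the script is plain text until it is formatted, literal braces in a real script would have to be doubled (`{{` and `}}`) under this `str.format` approach.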

find_lhs()

def find_lhs()

get_command()

def get_command(
    settings: SerializationSettings,
):

Returns the command which should be used in the container definition for the serialized version of this task registered on a hosted Flyte platform.

Parameter Type
settings SerializationSettings

get_config()

def get_config(
    settings: SerializationSettings,
):

Returns the task config as a serializable dictionary. This task config consists of metadata about the custom configuration defined for this task.

Parameter Type
settings SerializationSettings

get_container()

def get_container(
    settings: SerializationSettings,
):

Returns the container definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings SerializationSettings

get_custom()

def get_custom(
    settings: flytekit.configuration.SerializationSettings,
):

Return additional plugin-specific custom data (if any) as a serializable dictionary.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_default_command()

def get_default_command(
    settings: SerializationSettings,
):

Returns the default pyflyte-execute command used to run this on hosted Flyte platforms.

Parameter Type
settings SerializationSettings

get_extended_resources()

def get_extended_resources(
    settings: SerializationSettings,
):

Returns the extended resources to allocate to the task on hosted Flyte.

Parameter Type
settings SerializationSettings

get_image()

def get_image(
    settings: SerializationSettings,
):

Updates the image spec based on fast registration usage and returns a string representing the image.

Parameter Type
settings SerializationSettings

get_input_types()

def get_input_types()

Returns the names and Python types as a dictionary for the inputs of this task.

get_k8s_pod()

def get_k8s_pod(
    settings: SerializationSettings,
):

Returns the Kubernetes pod definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings SerializationSettings

get_sql()

def get_sql(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the SQL definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_type_for_input_var()

def get_type_for_input_var(
    k: str,
    v: typing.Any,
):

Returns the Python type for an input variable by name.

Parameter Type
k str
v typing.Any

get_type_for_output_var()

def get_type_for_output_var(
    k: str,
    v: typing.Any,
):

Returns the Python type for the specified output variable by name.

Parameter Type
k str
v typing.Any

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
):

This function is used only in the local execution path and is responsible for calling dispatch execute. Use this function when calling a task with native values (or Promises containing Flyte literals derived from Python native values).

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

post_execute()

def post_execute(
    user_params: flytekit.core.context_manager.ExecutionParameters,
    rval: typing.Any,
):

Post execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs to match the intended task's outputs. If not overridden, this function is a no-op.

Parameter Type
user_params flytekit.core.context_manager.ExecutionParameters
rval typing.Any

pre_execute()

def pre_execute(
    user_params: flytekit.core.context_manager.ExecutionParameters,
):

This is the method that will be invoked directly before executing the task method and before all the inputs are converted. One particular case where this is useful is if the context is to be modified for the user process to get some user space parameters. This also ensures that things like SparkSession are already correctly set up before the type transformers are called.

This should return either the same context or the mutated context.

Parameter Type
user_params flytekit.core.context_manager.ExecutionParameters
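
The order in which the pre_execute(), execute(), and post_execute() hooks fire can be pictured with a toy class. This is only an illustration of the sequence described in this section: `ToyTask` and its `run` method are hypothetical, the context is modeled as a plain dictionary, and input conversion is omitted.

```python
from typing import Any, Dict, List


class ToyTask:
    """Hypothetical task used only to show hook order; not a flytekit class."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def pre_execute(self, user_params: Dict[str, Any]) -> Dict[str, Any]:
        self.calls.append("pre_execute")
        # Return the same context or, as here, a mutated copy of it.
        return {**user_params, "prepared": True}

    def execute(self, **kwargs: Any) -> Any:
        self.calls.append("execute")
        return kwargs["x"] + 1

    def post_execute(self, user_params: Dict[str, Any], rval: Any) -> Any:
        self.calls.append("post_execute")
        # Default behaviour in this sketch: pass the result through unchanged.
        return rval

    def run(self, user_params: Dict[str, Any], **kwargs: Any) -> Any:
        params = self.pre_execute(user_params)
        rval = self.execute(**kwargs)
        return self.post_execute(params, rval)


task = ToyTask()
value = task.run({}, x=1)
```

A subclass that needs setup or cleanup would override only the two hooks and leave `execute` untouched.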

reset_command_fn()

def reset_command_fn()

Resets the command which should be used in the container definition of this task to the default arguments. This is useful when the command line is overridden at serialization time.

sandbox_execute()

def sandbox_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

set_command_fn()

def set_command_fn(
    get_command_fn: Optional[Callable[[SerializationSettings], List[str]]],
):

By default, the task will run on the Flyte platform using the pyflyte-execute command. However, it can be useful to update the command with which the task is serialized for specific cases like running map tasks (“pyflyte-map-execute”) or for fast-executed tasks.

Parameter Type
get_command_fn Optional[Callable[[SerializationSettings], List[str]]]

set_resolver()

def set_resolver(
    resolver: TaskResolverMixin,
):

By default, flytekit uses the DefaultTaskResolver to resolve the task. This method allows the user to set a custom task resolver. It can be useful to override the task resolver for specific cases like running tasks in a Jupyter notebook.

Parameter Type
resolver TaskResolverMixin

Properties

Property Type Description
container_image
deck_fields
disable_deck
docs
enable_deck
environment
instantiated_in
interface
lhs
location
metadata
name
python_interface
resources
result
script
script_file
security_context
task_config
task_resolver
task_type
task_type_version

flytekit.extras.tasks.shell.TaskPlugins

This is the TaskPlugins factory for task types that are derivative of PythonFunctionTask. Every task that the user wishes to use should be available in this factory. Usage:

TaskPlugins.register_pythontask_plugin(config_object_type, plugin_object_type)

config_object_type is any class that will be passed to the plugin_object as task_config.

plugin_object_type is a derivative of PythonFunctionTask.

Examples of available task plugins include different query-based plugins such as flytekitplugins.athena.task.AthenaTask and flytekitplugins.hive.task.HiveTask, Kubeflow operators like plugins.kfpytorch.flytekitplugins.kfpytorch.task.PyTorchFunctionTask and plugins.kftensorflow.flytekitplugins.kftensorflow.task.TensorflowFunctionTask, and generic plugins like flytekitplugins.pod.task.PodFunctionTask, which doesn’t integrate with third-party tools or services.

The task_config is different for every task plugin type. This is filled out by users when they define a task to specify plugin-specific behavior and features. For example, with a query type task plugin, the config might store information related to which database to query.

The plugin_object_type can be used to customize execution behavior and task serialization properties in tandem with the task_config.

Methods

Method Description
find_pythontask_plugin() Returns a PluginObjectType if found or returns the base PythonFunctionTask
register_pythontask_plugin() Use this method to register a new plugin into Flytekit

find_pythontask_plugin()

def find_pythontask_plugin(
    plugin_config_type: type,
):

Returns a PluginObjectType if one is found; otherwise returns the base PythonFunctionTask.

Parameter Type
plugin_config_type type

register_pythontask_plugin()

def register_pythontask_plugin(
    plugin_config_type: type,
    plugin: Type[PythonFunctionTask],
):

Use this method to register a new plugin into Flytekit. Usage:

TaskPlugins.register_pythontask_plugin(config_object_type, plugin_object_type)

config_object_type is any class that will be passed to the plugin_object as task_config.

plugin_object_type is a derivative of PythonFunctionTask.

Parameter Type
plugin_config_type type
plugin Type[PythonFunctionTask]
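
The register-then-look-up behaviour of this factory, including the fallback to the base task type, can be modeled with a small dictionary-backed registry. This is a sketch under stated assumptions rather than flytekit's code: `ToyPlugins`, `BaseTask`, `SqlConfig`, and `SqlTask` are hypothetical names, and letting a later registration replace an earlier one is a simplification made here.

```python
from typing import Dict, Type


class BaseTask:
    """Hypothetical stand-in for PythonFunctionTask."""


class ToyPlugins:
    """Hypothetical registry keyed by config type; not flytekit's TaskPlugins."""

    _registry: Dict[type, Type[BaseTask]] = {}

    @classmethod
    def register(cls, plugin_config_type: type, plugin: Type[BaseTask]) -> None:
        # Assumption for this sketch: a later registration replaces an earlier one.
        cls._registry[plugin_config_type] = plugin

    @classmethod
    def find(cls, plugin_config_type: type) -> Type[BaseTask]:
        # Fall back to the base task type when no plugin is registered.
        return cls._registry.get(plugin_config_type, BaseTask)


class SqlConfig:
    """Hypothetical task_config type."""


class SqlTask(BaseTask):
    """Hypothetical plugin task type selected by SqlConfig."""


ToyPlugins.register(SqlConfig, SqlTask)
found = ToyPlugins.find(SqlConfig)
fallback = ToyPlugins.find(int)
```

Keying the registry on the type of the config object is what lets a task decorator pick the right task class from nothing more than the task_config a user passes.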