flytekit.extras.tasks.shell
Directory
Classes
Class | Description |
---|---|
AttrDict | Convert a dictionary to an attribute style lookup. |
ExecutionParameters | This is a run-time user-centric context object that is accessible to every @task method. |
FlyteDirectory | None. |
FlyteFile | None. |
Interface | A Python native interface object, like inspect.signature but simpler. |
OutputLocation | |
ProcessResult | Stores a process return code, standard output and standard error. |
PythonInstanceTask | This class should be used as the base class for all Tasks that do not have a user defined function body, but have a platform defined execute method. |
RawShellTask | None. |
ShellTask | None. |
TaskPlugins | This is the TaskPlugins factory for task types that are derivative of PythonFunctionTask. |
Errors
flytekit.extras.tasks.shell.AttrDict
Convert a dictionary to an attribute style lookup. Do not use this in regular code; it is used for namespacing inputs and outputs.
def AttrDict(
args,
kwargs,
):
Parameter | Type |
---|---|
args |
*args |
kwargs |
**kwargs |
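As a rough illustration of what "attribute style lookup" means, the sketch below defines a hypothetical `AttrLookup` class (not flytekit's `AttrDict`, whose implementation is not shown on this page) that lets dictionary keys be read as attributes.

```python
class AttrLookup(dict):
    """Hypothetical stand-in: a dict whose keys are also readable as attributes."""

    def __getattr__(self, name):
        # Called only when normal attribute lookup fails, so dict methods still work.
        try:
            return self[name]
        except KeyError as exc:
            raise AttributeError(name) from exc


inputs = AttrLookup({"infile": "/tmp/in.txt", "count": 3})
print(inputs.infile)    # attribute-style access
print(inputs["count"])  # regular dict access still works
```

This kind of wrapper is convenient when a template refers to values as `inputs.infile` rather than `inputs["infile"]`.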
flytekit.extras.tasks.shell.ExecutionParameters
This is a run-time user-centric context object that is accessible to every @task method. It can be accessed using `flytekit.current_context()`.
This object provides the following:
- a statsd handler
- a logging handler
- the execution ID as a `flytekit.models.core.identifier.WorkflowExecutionIdentifier` object
- a working directory for the user to write arbitrary files to
Please do not confuse this object with the `flytekit.FlyteContext` object.
def ExecutionParameters(
execution_date,
tmp_dir,
stats,
execution_id: typing.Optional[_identifier.WorkflowExecutionIdentifier],
logging,
raw_output_prefix,
output_metadata_prefix,
checkpoint,
decks,
task_id: typing.Optional[_identifier.Identifier],
enable_deck: bool,
kwargs,
):
Parameter | Type |
---|---|
execution_date |
|
tmp_dir |
|
stats |
|
execution_id |
typing.Optional[_identifier.WorkflowExecutionIdentifier] |
logging |
|
raw_output_prefix |
|
output_metadata_prefix |
|
checkpoint |
|
decks |
|
task_id |
typing.Optional[_identifier.Identifier] |
enable_deck |
bool |
kwargs |
**kwargs |
Methods
Method | Description |
---|---|
builder() |
None |
get() |
Returns task specific context if present else raise an error |
has_attr() |
None |
new_builder() |
None |
with_enable_deck() |
None |
with_task_sandbox() |
None |
builder()
def builder()
get()
def get(
key: str,
):
Returns the task-specific context that matches the given key if it is present; otherwise raises an error.
Parameter | Type |
---|---|
key |
str |
has_attr()
def has_attr(
attr_name: str,
):
Parameter | Type |
---|---|
attr_name |
str |
new_builder()
def new_builder(
current: Optional[ExecutionParameters],
):
Parameter | Type |
---|---|
current |
Optional[ExecutionParameters] |
with_enable_deck()
def with_enable_deck(
enable_deck: bool,
):
Parameter | Type |
---|---|
enable_deck |
bool |
with_task_sandbox()
def with_task_sandbox()
Properties
Property | Type | Description |
---|---|---|
checkpoint | ||
decks | ||
default_deck | ||
enable_deck | ||
execution_date | ||
execution_id | ||
logging | ||
output_metadata_prefix | ||
raw_output_prefix | ||
secrets | ||
stats | ||
task_id | ||
timeline_deck | ||
working_directory |
flytekit.extras.tasks.shell.FlyteDirectory
def FlyteDirectory(
path: typing.Union[str, os.PathLike],
downloader: typing.Optional[typing.Callable],
remote_directory: typing.Optional[typing.Union[os.PathLike, str, typing.Literal[False]]],
):
Parameter | Type |
---|---|
path |
typing.Union[str, os.PathLike] |
downloader |
typing.Optional[typing.Callable] |
remote_directory |
typing.Optional[typing.Union[os.PathLike, str, typing.Literal[False]]] |
Methods
Method | Description |
---|---|
crawl() |
Crawl returns a generator of all files prefixed by any sub-folders under the given “FlyteDirectory” |
deserialize_flyte_dir() |
None |
download() |
None |
extension() |
None |
from_dict() |
None |
from_json() |
None |
from_source() |
Create a new FlyteDirectory object with the remote source set to the input |
listdir() |
This function will list all files and folders in the given directory, but without downloading the contents |
new() |
Create a new FlyteDirectory object in current Flyte working directory |
new_dir() |
This will create a new folder under the current folder |
new_file() |
This will create a new file under the current folder |
new_remote() |
Create a new FlyteDirectory object using the currently configured default remote in the context (i |
schema() |
None |
serialize_flyte_dir() |
None |
to_dict() |
None |
to_json() |
None |
crawl()
def crawl(
maxdepth: typing.Optional[int],
topdown: bool,
kwargs,
):
Crawl returns a generator of all files prefixed by any sub-folders under the given FlyteDirectory. If `detail=True` is passed, it returns a dictionary as specified by fsspec.
Example:

    >>> list(fd.crawl())
    [("/base", "file1"), ("/base", "dir1/file1"), ("/base", "dir2/file1"), ("/base", "dir1/dir/file1")]

    >>> list(x.crawl(detail=True))
    [('/tmp/test', {'my-dir/ab.py': {'name': '/tmp/test/my-dir/ab.py', 'size': 0, 'type': 'file', 'created': 1677720780.2318847, 'islink': False, 'mode': 33188, 'uid': 501, 'gid': 0, 'mtime': 1677720780.2317934, 'ino': 1694329, 'nlink': 1}})]

Parameter | Type |
---|---|
maxdepth |
typing.Optional[int] |
topdown |
bool |
kwargs |
**kwargs |
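To make the shape of the `(base, relative_path)` pairs in the first example concrete, here is a minimal local-only sketch built on `os.walk`. The helper name `crawl_local` is hypothetical, and the sketch does not use fsspec or reproduce the `maxdepth`, `topdown`, or `detail` options.

```python
import os
import tempfile


def crawl_local(base):
    """Yield (base, relative_path) for every file found under base."""
    for root, _dirs, files in os.walk(base):
        for fname in sorted(files):
            full = os.path.join(root, fname)
            yield base, os.path.relpath(full, base)


with tempfile.TemporaryDirectory() as base:
    # Build a tiny tree: base/file1 and base/dir1/file1
    os.makedirs(os.path.join(base, "dir1"))
    for rel in ("file1", os.path.join("dir1", "file1")):
        with open(os.path.join(base, rel), "w") as fh:
            fh.write("x")
    found = sorted(crawl_local(base))
    print(found)
```

Each tuple pairs the crawl root with a path relative to it, so joining the two parts recovers the full location of the file.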
deserialize_flyte_dir()
def deserialize_flyte_dir(
args,
kwargs,
):
Parameter | Type |
---|---|
args |
*args |
kwargs |
**kwargs |
download()
def download()
extension()
def extension()
from_dict()
def from_dict(
kvs: typing.Union[dict, list, str, int, float, bool, NoneType],
infer_missing,
):
Parameter | Type |
---|---|
kvs |
typing.Union[dict, list, str, int, float, bool, NoneType] |
infer_missing |
from_json()
def from_json(
s: typing.Union[str, bytes, bytearray],
parse_float,
parse_int,
parse_constant,
infer_missing,
kw,
):
Parameter | Type |
---|---|
s |
typing.Union[str, bytes, bytearray] |
parse_float |
|
parse_int |
|
parse_constant |
|
infer_missing |
|
kw |
from_source()
def from_source(
source: str | os.PathLike,
):
Create a new FlyteDirectory object with the remote source set to the input
Parameter | Type |
---|---|
source |
`str | os.PathLike` |
listdir()
def listdir(
directory: FlyteDirectory,
):
This function lists all files and folders in the given directory without downloading their contents. It returns a list of FlyteFile and FlyteDirectory objects that can lazily download the contents of the file or folder. For example:

    entity = FlyteDirectory.listdir(directory)
    for e in entity:
        print("s3 object:", e.remote_source)
        # s3 object: s3://test-flytedir/file1.txt
        # s3 object: s3://test-flytedir/file2.txt
        # s3 object: s3://test-flytedir/sub_dir

    open(entity[0], "r")  # This will download the file to the local disk.
    open(entity[0], "r")  # flytekit will read data from the local disk if you open it again.

Parameter | Type |
---|---|
directory |
FlyteDirectory |
new()
def new(
dirname: str | os.PathLike,
):
Create a new FlyteDirectory object in current Flyte working directory.
Parameter | Type |
---|---|
dirname |
`str | os.PathLike` |
new_dir()
def new_dir(
name: typing.Optional[str],
):
This will create a new folder under the current folder. If given a name, it will use the name given, otherwise it’ll pick a random string. Collisions are not checked.
Parameter | Type |
---|---|
name |
typing.Optional[str] |
new_file()
def new_file(
name: typing.Optional[str],
):
This will create a new file under the current folder. If given a name, it will use the name given, otherwise it’ll pick a random string. Collisions are not checked.
Parameter | Type |
---|---|
name |
typing.Optional[str] |
new_remote()
def new_remote(
stem: typing.Optional[str],
alt: typing.Optional[str],
):
Create a new FlyteDirectory object using the currently configured default remote in the context (i.e. the raw_output_prefix configured in the current FileAccessProvider object in the context). This is used if you explicitly have a folder somewhere that you want to create files under. If you want to write a whole folder, you can let your task return a FlyteDirectory object, and let flytekit handle the uploading.
Parameter | Type |
---|---|
stem |
typing.Optional[str] |
alt |
typing.Optional[str] |
schema()
def schema(
infer_missing: bool,
only,
exclude,
many: bool,
context,
load_only,
dump_only,
partial: bool,
unknown,
):
Parameter | Type |
---|---|
infer_missing |
bool |
only |
|
exclude |
|
many |
bool |
context |
|
load_only |
|
dump_only |
|
partial |
bool |
unknown |
serialize_flyte_dir()
def serialize_flyte_dir(
args,
kwargs,
):
Parameter | Type |
---|---|
args |
*args |
kwargs |
**kwargs |
to_dict()
def to_dict(
encode_json,
):
Parameter | Type |
---|---|
encode_json |
to_json()
def to_json(
skipkeys: bool,
ensure_ascii: bool,
check_circular: bool,
allow_nan: bool,
indent: typing.Union[int, str, NoneType],
separators: typing.Tuple[str, str],
default: typing.Callable,
sort_keys: bool,
kw,
):
Parameter | Type |
---|---|
skipkeys |
bool |
ensure_ascii |
bool |
check_circular |
bool |
allow_nan |
bool |
indent |
typing.Union[int, str, NoneType] |
separators |
typing.Tuple[str, str] |
default |
typing.Callable |
sort_keys |
bool |
kw |
Properties
Property | Type | Description |
---|---|---|
downloaded | ||
remote_directory | ||
remote_source | ||
sep |
flytekit.extras.tasks.shell.FlyteFile
def FlyteFile(
path: typing.Union[str, os.PathLike],
downloader: typing.Callable,
remote_path: typing.Optional[typing.Union[os.PathLike, str, bool]],
metadata: typing.Optional[dict[str, str]],
):
FlyteFile’s init method.
Parameter | Type |
---|---|
path |
typing.Union[str, os.PathLike] |
downloader |
typing.Callable |
remote_path |
typing.Optional[typing.Union[os.PathLike, str, bool]] |
metadata |
typing.Optional[dict[str, str]] |
Methods
Method | Description |
---|---|
deserialize_flyte_file() |
None |
download() |
None |
extension() |
None |
from_dict() |
None |
from_json() |
None |
from_source() |
Create a new FlyteFile object with the remote source set to the input |
new() |
Create a new FlyteFile object in the current Flyte working directory |
new_remote_file() |
Create a new FlyteFile object with a remote path |
open() |
Returns a streaming File handle |
serialize_flyte_file() |
None |
to_dict() |
None |
to_json() |
None |
deserialize_flyte_file()
def deserialize_flyte_file(
args,
kwargs,
):
Parameter | Type |
---|---|
args |
*args |
kwargs |
**kwargs |
download()
def download()
extension()
def extension()
from_dict()
def from_dict(
d,
dialect,
):
Parameter | Type |
---|---|
d |
|
dialect |
from_json()
def from_json(
data: typing.Union[str, bytes, bytearray],
decoder: collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]],
from_dict_kwargs: typing.Any,
):
Parameter | Type |
---|---|
data |
typing.Union[str, bytes, bytearray] |
decoder |
collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]] |
from_dict_kwargs |
typing.Any |
from_source()
def from_source(
source: str | os.PathLike,
):
Create a new FlyteFile object with the remote source set to the input
Parameter | Type |
---|---|
source |
`str | os.PathLike` |
new()
def new(
filename: str | os.PathLike,
):
Create a new FlyteFile object in the current Flyte working directory
Parameter | Type |
---|---|
filename |
`str | os.PathLike` |
new_remote_file()
def new_remote_file(
name: typing.Optional[str],
alt: typing.Optional[str],
):
Create a new FlyteFile object with a remote path.
Parameter | Type |
---|---|
name |
typing.Optional[str] |
alt |
typing.Optional[str] |
open()
def open(
mode: str,
cache_type: typing.Optional[str],
cache_options: typing.Optional[typing.Dict[str, typing.Any]],
):
Returns a streaming File handle.

    @task
    def copy_file(ff: FlyteFile) -> FlyteFile:
        new_file = FlyteFile.new_remote_file()
        with ff.open("rb", cache_type="readahead") as r:
            with new_file.open("wb") as w:
                w.write(r.read())
        return new_file

Parameter | Type |
---|---|
mode |
str |
cache_type |
typing.Optional[str] |
cache_options |
typing.Optional[typing.Dict[str, typing.Any]] |
serialize_flyte_file()
def serialize_flyte_file(
args,
kwargs,
):
Parameter | Type |
---|---|
args |
*args |
kwargs |
**kwargs |
to_dict()
def to_dict()
to_json()
def to_json(
encoder: collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]],
to_dict_kwargs: typing.Any,
):
Parameter | Type |
---|---|
encoder |
collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]] |
to_dict_kwargs |
typing.Any |
Properties
Property | Type | Description |
---|---|---|
downloaded | ||
remote_path | ||
remote_source |
flytekit.extras.tasks.shell.FlyteRecoverableException
Common base class for all non-exit exceptions.
def FlyteRecoverableException(
args,
timestamp: typing.Optional[float],
):
Parameter | Type |
---|---|
args |
*args |
timestamp |
typing.Optional[float] |
Properties
Property | Type | Description |
---|---|---|
timestamp |
flytekit.extras.tasks.shell.Interface
A Python native interface object, like inspect.signature but simpler.
def Interface(
inputs: Union[Optional[Dict[str, Type]], Optional[Dict[str, Tuple[Type, Any]]]],
outputs: Union[Optional[Dict[str, Type]], Optional[Dict[str, Optional[Type]]]],
output_tuple_name: Optional[str],
docstring: Optional[Docstring],
):
Parameter | Type |
---|---|
inputs |
Union[Optional[Dict[str, Type]], Optional[Dict[str, Tuple[Type, Any]]]] |
outputs |
Union[Optional[Dict[str, Type]], Optional[Dict[str, Optional[Type]]]] |
output_tuple_name |
Optional[str] |
docstring |
Optional[Docstring] |
Methods
Method | Description |
---|---|
remove_inputs() |
This method is useful in removing some variables from the Flyte backend inputs specification, as these are |
with_inputs() |
Use this to add additional inputs to the interface |
with_outputs() |
This method allows adding extra outputs that are expected from a task specification |
remove_inputs()
def remove_inputs(
vars: Optional[List[str]],
):
This method is useful for removing some variables from the Flyte backend inputs specification, because they are implicit local-only inputs or will be supplied by the library at runtime (for example, a Spark session). It creates a new instance of the interface with the requested variables removed.
Parameter | Type |
---|---|
vars |
Optional[List[str]] |
with_inputs()
def with_inputs(
extra_inputs: Dict[str, Type],
):
Use this to add additional inputs to the interface. This is useful for adding implicit inputs that are added without the user requesting them.
Parameter | Type |
---|---|
extra_inputs |
Dict[str, Type] |
with_outputs()
def with_outputs(
extra_outputs: Dict[str, Type],
):
This method allows adding extra outputs that are expected from a task specification.
Parameter | Type |
---|---|
extra_outputs |
Dict[str, Type] |
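The methods above adjust an interface's inputs and outputs, and `remove_inputs()` is described as creating a new instance. The sketch below shows that copy-on-change pattern with a hypothetical, heavily simplified `SketchInterface` that tracks inputs only. It assumes, for illustration, that adding inputs also returns a new object; it is not flytekit's `Interface`.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Type


@dataclass(frozen=True)
class SketchInterface:
    """Hypothetical, simplified interface: a mapping of input names to types."""

    inputs: Dict[str, Type] = field(default_factory=dict)

    def remove_inputs(self, names: Optional[List[str]]) -> "SketchInterface":
        # Return a new instance; the original is left unchanged.
        if not names:
            return self
        kept = {k: v for k, v in self.inputs.items() if k not in names}
        return SketchInterface(inputs=kept)

    def with_inputs(self, extra: Dict[str, Type]) -> "SketchInterface":
        # Merge the extra inputs into a copy of the existing mapping.
        return SketchInterface(inputs={**self.inputs, **extra})


base = SketchInterface(inputs={"a": int, "spark_session": object})
remote = base.remove_inputs(["spark_session"])
print(sorted(base.inputs), sorted(remote.inputs))
```

Returning a fresh object keeps the locally visible interface (with the implicit input) separate from the one sent to the backend.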
Properties
Property | Type | Description |
---|---|---|
default_inputs_as_kwargs | ||
docstring | ||
inputs | ||
inputs_with_defaults | ||
output_names | ||
output_tuple | ||
output_tuple_name | ||
outputs |
flytekit.extras.tasks.shell.OutputLocation
def OutputLocation(
var: str,
var_type: typing.Type,
location: typing.Union[os.PathLike, str],
):
Parameter | Type |
---|---|
var |
str |
var_type |
typing.Type |
location |
typing.Union[os.PathLike, str] |
flytekit.extras.tasks.shell.ProcessResult
Stores a process return code, standard output and standard error.
def ProcessResult(
returncode: int,
output: str,
error: str,
):
Parameter | Type |
---|---|
returncode |
int |
output |
str |
error |
str |
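For orientation, the sketch below shows one way the three documented fields could be populated from a subprocess call. `SketchResult` and `run` are hypothetical names used for illustration; how flytekit itself fills a `ProcessResult` is not shown on this page.

```python
import subprocess
import sys
from dataclasses import dataclass


@dataclass
class SketchResult:
    """Hypothetical mirror of the three documented fields."""

    returncode: int
    output: str
    error: str


def run(cmd):
    # capture_output collects stdout and stderr; text=True decodes them to str.
    proc = subprocess.run(cmd, capture_output=True, text=True)
    return SketchResult(proc.returncode, proc.stdout, proc.stderr)


result = run([sys.executable, "-c", "print('hello')"])
print(result.returncode, result.output.strip())
```

Keeping standard output and standard error separate makes it easier to surface a useful message when the return code is non-zero.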
flytekit.extras.tasks.shell.PythonInstanceTask
This class should be used as the base class for all Tasks that do not have a user defined function body, but have a platform defined execute method (execute needs to be overridden). This base class ensures that the module loader will invoke the right class automatically, by capturing the module name and variable in the module name.

    x = MyInstanceTask(name="x", …..)

This can be invoked as

    x(a=5)  # depending on the interface of the defined task

def PythonInstanceTask(
name: str,
task_config: T,
task_type: str,
task_resolver: Optional[TaskResolverMixin],
kwargs,
):
Please see class level documentation.
Parameter | Type |
---|---|
name |
str |
task_config |
T |
task_type |
str |
task_resolver |
Optional[TaskResolverMixin] |
kwargs |
**kwargs |
Methods
Method | Description |
---|---|
compile() |
Generates a node that encapsulates this task in a workflow definition |
construct_node_metadata() |
Used when constructing the node that encapsulates this task as part of a broader workflow definition |
dispatch_execute() |
This method translates Flyte’s Type system based input values and invokes the actual call to the executor |
execute() |
This method will be invoked to execute the task |
find_lhs() |
None |
get_command() |
Returns the command which should be used in the container definition for the serialized version of this task |
get_config() |
Returns the task config as a serializable dictionary |
get_container() |
Returns the container definition (if any) that is used to run the task on hosted Flyte |
get_custom() |
Return additional plugin-specific custom data (if any) as a serializable dictionary |
get_default_command() |
Returns the default pyflyte-execute command used to run this on hosted Flyte platforms |
get_extended_resources() |
Returns the extended resources to allocate to the task on hosted Flyte |
get_image() |
Update image spec based on fast registration usage, and return string representing the image |
get_input_types() |
Returns the names and python types as a dictionary for the inputs of this task |
get_k8s_pod() |
Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte |
get_sql() |
Returns the Sql definition (if any) that is used to run the task on hosted Flyte |
get_type_for_input_var() |
Returns the python type for an input variable by name |
get_type_for_output_var() |
Returns the python type for the specified output variable by name |
local_execute() |
This function is used only in the local execution path and is responsible for calling dispatch execute |
local_execution_mode() |
None |
post_execute() |
Post execute is called after the execution has completed, with the user_params and can be used to clean-up, |
pre_execute() |
This is the method that will be invoked directly before executing the task method and before all the inputs |
reset_command_fn() |
Resets the command which should be used in the container definition of this task to the default arguments |
sandbox_execute() |
Call dispatch_execute, in the context of a local sandbox execution |
set_command_fn() |
By default, the task will run on the Flyte platform using the pyflyte-execute command |
set_resolver() |
By default, flytekit uses the DefaultTaskResolver to resolve the task |
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
):
Generates a node that encapsulates this task in a workflow definition.
Parameter | Type |
---|---|
ctx |
flytekit.core.context_manager.FlyteContext |
args |
*args |
kwargs |
**kwargs |
construct_node_metadata()
def construct_node_metadata()
Used when constructing the node that encapsulates this task as part of a broader workflow definition.
dispatch_execute()
def dispatch_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
This method translates Flyte's type-system-based input values and invokes the actual call to the executor. This method is also invoked during runtime.
- `VoidPromise` is returned when the task itself declares no outputs.
- `LiteralMap` is returned when the task returns one or more outputs in the declaration. Individual outputs may be None.
- `DynamicJobSpec` is returned when a dynamic workflow is executed.
Parameter | Type |
---|---|
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
execute()
def execute(
kwargs,
):
This method will be invoked to execute the task.
Parameter | Type |
---|---|
kwargs |
**kwargs |
find_lhs()
def find_lhs()
get_command()
def get_command(
settings: SerializationSettings,
):
Returns the command which should be used in the container definition for the serialized version of this task registered on a hosted Flyte platform.
Parameter | Type |
---|---|
settings |
SerializationSettings |
get_config()
def get_config(
settings: SerializationSettings,
):
Returns the task config as a serializable dictionary. This task config consists of metadata about the custom defined for this task.
Parameter | Type |
---|---|
settings |
SerializationSettings |
get_container()
def get_container(
settings: SerializationSettings,
):
Returns the container definition (if any) that is used to run the task on hosted Flyte.
Parameter | Type |
---|---|
settings |
SerializationSettings |
get_custom()
def get_custom(
settings: flytekit.configuration.SerializationSettings,
):
Return additional plugin-specific custom data (if any) as a serializable dictionary.
Parameter | Type |
---|---|
settings |
flytekit.configuration.SerializationSettings |
get_default_command()
def get_default_command(
settings: SerializationSettings,
):
Returns the default pyflyte-execute command used to run this on hosted Flyte platforms.
Parameter | Type |
---|---|
settings |
SerializationSettings |
get_extended_resources()
def get_extended_resources(
settings: SerializationSettings,
):
Returns the extended resources to allocate to the task on hosted Flyte.
Parameter | Type |
---|---|
settings |
SerializationSettings |
get_image()
def get_image(
settings: SerializationSettings,
):
Update image spec based on fast registration usage, and return string representing the image
Parameter | Type |
---|---|
settings |
SerializationSettings |
get_input_types()
def get_input_types()
Returns the names and python types as a dictionary for the inputs of this task.
get_k8s_pod()
def get_k8s_pod(
settings: SerializationSettings,
):
Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.
Parameter | Type |
---|---|
settings |
SerializationSettings |
get_sql()
def get_sql(
settings: flytekit.configuration.SerializationSettings,
):
Returns the Sql definition (if any) that is used to run the task on hosted Flyte.
Parameter | Type |
---|---|
settings |
flytekit.configuration.SerializationSettings |
get_type_for_input_var()
def get_type_for_input_var(
k: str,
v: typing.Any,
):
Returns the python type for an input variable by name.
Parameter | Type |
---|---|
k |
str |
v |
typing.Any |
get_type_for_output_var()
def get_type_for_output_var(
k: str,
v: typing.Any,
):
Returns the python type for the specified output variable by name.
Parameter | Type |
---|---|
k |
str |
v |
typing.Any |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
This function is used only in the local execution path and is responsible for calling dispatch execute. Use this function when calling a task with native values (or Promises containing Flyte literals derived from Python native values).
Parameter | Type |
---|---|
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
post_execute()
def post_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
rval: typing.Any,
):
Post execute is called after the execution has completed, with the user_params, and can be used to clean up or to alter the outputs to match the intended task's outputs. If not overridden, this function is a no-op.
Parameter | Type |
---|---|
user_params |
typing.Optional[flytekit.core.context_manager.ExecutionParameters] |
rval |
typing.Any |
pre_execute()
def pre_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
):
This is the method that will be invoked directly before executing the task method and before all the inputs are converted. One particular case where this is useful is if the context is to be modified for the user process to get some user space parameters. This also ensures that things like SparkSession are already correctly set up before the type transformers are called.
This should return either the same context or the mutated context.
Parameter | Type |
---|---|
user_params |
typing.Optional[flytekit.core.context_manager.ExecutionParameters] |
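The descriptions of `pre_execute()` and `post_execute()` above imply an ordering around `execute()`. The following sketch uses a hypothetical `SketchTask` class with plain dictionaries standing in for the execution parameters; it only illustrates the hook order and is not flytekit's dispatch logic.

```python
class SketchTask:
    """Hypothetical task that records the order in which its hooks run."""

    def __init__(self):
        self.calls = []

    def pre_execute(self, user_params):
        # Runs first; returns the same context or a modified copy of it.
        self.calls.append("pre_execute")
        return {**user_params, "session": "ready"}

    def execute(self, **kwargs):
        self.calls.append("execute")
        return kwargs["a"] + 1

    def post_execute(self, user_params, rval):
        # Runs last; here it is a no-op that hands the value back unchanged.
        self.calls.append("post_execute")
        return rval

    def run(self, user_params, **kwargs):
        params = self.pre_execute(user_params)
        rval = self.execute(**kwargs)
        return self.post_execute(params, rval)


task = SketchTask()
print(task.run({"tmp_dir": "/tmp"}, a=5), task.calls)
```

A subclass that needs setup (such as a session object) would do it in `pre_execute()`, and any cleanup or output adjustment would go in `post_execute()`.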
reset_command_fn()
def reset_command_fn()
Resets the command which should be used in the container definition of this task to the default arguments. This is useful when the command line is overridden at serialization time.
sandbox_execute()
def sandbox_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.
Parameter | Type |
---|---|
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
set_command_fn()
def set_command_fn(
get_command_fn: Optional[Callable[[SerializationSettings], List[str]]],
):
By default, the task will run on the Flyte platform using the pyflyte-execute command. However, it can be useful to update the command with which the task is serialized for specific cases like running map tasks (“pyflyte-map-execute”) or for fast-executed tasks.
Parameter | Type |
---|---|
get_command_fn |
Optional[Callable[[SerializationSettings], List[str]]] |
set_resolver()
def set_resolver(
resolver: TaskResolverMixin,
):
By default, flytekit uses the DefaultTaskResolver to resolve the task. This method allows the user to set a custom task resolver. It can be useful to override the task resolver for specific cases like running tasks in the jupyter notebook.
Parameter | Type |
---|---|
resolver |
TaskResolverMixin |
Properties
Property | Type | Description |
---|---|---|
container_image | ||
deck_fields | ||
disable_deck | ||
docs | ||
enable_deck | ||
environment | ||
instantiated_in | ||
interface | ||
lhs | ||
location | ||
metadata | ||
name | ||
python_interface | ||
resources | ||
security_context | ||
task_config | ||
task_resolver | ||
task_type | ||
task_type_version |
flytekit.extras.tasks.shell.RawShellTask
def RawShellTask(
name: str,
debug: bool,
script: typing.Optional[str],
script_file: typing.Optional[str],
task_config: ~T,
inputs: typing.Optional[typing.Dict[str, typing.Type]],
output_locs: typing.Optional[typing.List[flytekit.extras.tasks.shell.OutputLocation]],
kwargs,
):
The RawShellTask is a minimal extension of the existing ShellTask. Its purpose is to support wrapping a "raw" or "pure" shell script which needs to be executed with some environment variables set, and some arguments, which may not be known until execution time.
This class is not meant to be instantiated into tasks by users, but used with the factory function get_raw_shell_task(). An instance of this class will be returned with either a user-specified or default template. The template itself will export the desired environment variables, and subsequently execute the desired "raw" script with the specified arguments.
Note: This means that within your workflow, you can dynamically control the env variables, arguments, and even the actual script you want to run.
Note: The downside is that a dynamic workflow will be required. The "raw" script passed in at execution time must be at the specified location.
These args are forwarded directly to the parent ShellTask constructor, as behavior does not diverge.
Parameter | Type |
---|---|
name |
str |
debug |
bool |
script |
typing.Optional[str] |
script_file |
typing.Optional[str] |
task_config |
~T |
inputs |
typing.Optional[typing.Dict[str, typing.Type]] |
output_locs |
typing.Optional[typing.List[flytekit.extras.tasks.shell.OutputLocation]] |
kwargs |
**kwargs |
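The description above says the template exports environment variables and then runs the raw script with its arguments. A minimal sketch of that idea follows. The helpers `export_lines` and `render_script`, and the use of `bash` to invoke the script, are assumptions made for illustration and do not reflect the actual default template.

```python
import shlex


def export_lines(env):
    """Hypothetical helper: render a dict as shell `export` statements."""
    return "\n".join(f"export {key}={shlex.quote(str(value))}" for key, value in env.items())


def render_script(env, script_path, args):
    # Export the variables first, then invoke the script with quoted arguments.
    quoted_args = " ".join(shlex.quote(a) for a in args)
    return f"{export_lines(env)}\nbash {shlex.quote(script_path)} {quoted_args}"


print(render_script({"STAGE": "dev", "MSG": "hello world"}, "/tmp/run.sh", ["--n", "3"]))
```

Quoting each value with `shlex.quote` keeps values containing spaces or shell metacharacters from being split or interpreted when the rendered text is executed.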
Methods
Method | Description |
---|---|
compile() |
Generates a node that encapsulates this task in a workflow definition |
construct_node_metadata() |
Used when constructing the node that encapsulates this task as part of a broader workflow definition |
dispatch_execute() |
This method translates Flyte’s Type system based input values and invokes the actual call to the executor |
execute() |
Executes the given script by substituting the inputs and outputs and extracts the outputs from the filesystem |
find_lhs() |
None |
get_command() |
Returns the command which should be used in the container definition for the serialized version of this task |
get_config() |
Returns the task config as a serializable dictionary |
get_container() |
Returns the container definition (if any) that is used to run the task on hosted Flyte |
get_custom() |
Return additional plugin-specific custom data (if any) as a serializable dictionary |
get_default_command() |
Returns the default pyflyte-execute command used to run this on hosted Flyte platforms |
get_extended_resources() |
Returns the extended resources to allocate to the task on hosted Flyte |
get_image() |
Update image spec based on fast registration usage, and return string representing the image |
get_input_types() |
Returns the names and python types as a dictionary for the inputs of this task |
get_k8s_pod() |
Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte |
get_sql() |
Returns the Sql definition (if any) that is used to run the task on hosted Flyte |
get_type_for_input_var() |
Returns the python type for an input variable by name |
get_type_for_output_var() |
Returns the python type for the specified output variable by name |
local_execute() |
This function is used only in the local execution path and is responsible for calling dispatch execute |
local_execution_mode() |
None |
make_export_string_from_env_dict() |
Utility function to convert a dictionary of desired environment variable key: value pairs into a string of |
post_execute() |
Post execute is called after the execution has completed, with the user_params and can be used to clean-up, |
pre_execute() |
This is the method that will be invoked directly before executing the task method and before all the inputs |
reset_command_fn() |
Resets the command which should be used in the container definition of this task to the default arguments |
sandbox_execute() |
Call dispatch_execute, in the context of a local sandbox execution |
set_command_fn() |
By default, the task will run on the Flyte platform using the pyflyte-execute command |
set_resolver() |
By default, flytekit uses the DefaultTaskResolver to resolve the task |
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
):
Generates a node that encapsulates this task in a workflow definition.
Parameter | Type |
---|---|
ctx |
flytekit.core.context_manager.FlyteContext |
args |
*args |
kwargs |
**kwargs |
construct_node_metadata()
def construct_node_metadata()
Used when constructing the node that encapsulates this task as part of a broader workflow definition.
dispatch_execute()
def dispatch_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
This method translates Flyte's type-system-based input values and invokes the actual call to the executor. This method is also invoked during runtime.
- `VoidPromise` is returned when the task itself declares no outputs.
- `LiteralMap` is returned when the task returns one or more outputs in the declaration. Individual outputs may be None.
- `DynamicJobSpec` is returned when a dynamic workflow is executed.
Parameter | Type |
---|---|
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
execute()
def execute(
kwargs,
):
Executes the given script by substituting the inputs and outputs and extracts the outputs from the filesystem
Parameter | Type |
---|---|
kwargs |
**kwargs |
find_lhs()
def find_lhs()
get_command()
def get_command(
settings: SerializationSettings,
):
Returns the command which should be used in the container definition for the serialized version of this task registered on a hosted Flyte platform.
Parameter | Type |
---|---|
settings |
SerializationSettings |
get_config()
def get_config(
settings: SerializationSettings,
):
Returns the task config as a serializable dictionary. This task config consists of metadata about the custom defined for this task.
Parameter | Type |
---|---|
settings |
SerializationSettings |
get_container()
def get_container(
settings: SerializationSettings,
):
Returns the container definition (if any) that is used to run the task on hosted Flyte.
Parameter | Type |
---|---|
settings |
SerializationSettings |
get_custom()
def get_custom(
settings: flytekit.configuration.SerializationSettings,
):
Return additional plugin-specific custom data (if any) as a serializable dictionary.
Parameter | Type |
---|---|
settings |
flytekit.configuration.SerializationSettings |
get_default_command()
def get_default_command(
settings: SerializationSettings,
):
Returns the default pyflyte-execute command used to run this on hosted Flyte platforms.
Parameter | Type |
---|---|
settings |
SerializationSettings |
get_extended_resources()
def get_extended_resources(
settings: SerializationSettings,
):
Returns the extended resources to allocate to the task on hosted Flyte.
Parameter | Type |
---|---|
settings |
SerializationSettings |
get_image()
def get_image(
settings: SerializationSettings,
):
Update image spec based on fast registration usage, and return string representing the image
Parameter | Type |
---|---|
settings |
SerializationSettings |
get_input_types()
def get_input_types()
Returns the names and python types as a dictionary for the inputs of this task.
get_k8s_pod()
def get_k8s_pod(
settings: SerializationSettings,
):
Returns the Kubernetes pod definition (if any) that is used to run the task on hosted Flyte.
Parameter | Type |
---|---|
settings |
SerializationSettings |
get_sql()
def get_sql(
settings: flytekit.configuration.SerializationSettings,
):
Returns the SQL definition (if any) that is used to run the task on hosted Flyte.
Parameter | Type |
---|---|
settings |
flytekit.configuration.SerializationSettings |
get_type_for_input_var()
def get_type_for_input_var(
k: str,
v: typing.Any,
):
Returns the python type for an input variable by name.
Parameter | Type |
---|---|
k |
str |
v |
typing.Any |
get_type_for_output_var()
def get_type_for_output_var(
k: str,
v: typing.Any,
):
Returns the python type for the specified output variable by name.
Parameter | Type |
---|---|
k |
str |
v |
typing.Any |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
This function is used only in the local execution path and is responsible for calling dispatch execute. Use this function when calling a task with native values (or Promises containing Flyte literals derived from Python native values).
Parameter | Type |
---|---|
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
make_export_string_from_env_dict()
def make_export_string_from_env_dict(
d: typing.Dict[str, str],
):
Utility function to convert a dictionary of desired environment variable key: value pairs into a string of the form:
export k1=v1
export k2=v2
...
Parameter | Type |
---|---|
d |
typing.Dict[str, str] |
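The conversion described above can be sketched in a few lines. This is an illustrative stand-alone function, assuming one export line per dictionary entry and no quoting of values; the name make_export_string is hypothetical and is not the flytekit method itself.

```python
from typing import Dict


def make_export_string(env: Dict[str, str]) -> str:
    """Render each key/value pair as an `export KEY=VALUE` line."""
    return "\n".join(f"export {key}={value}" for key, value in env.items())


print(make_export_string({"k1": "v1", "k2": "v2"}))
```

Values containing spaces or shell metacharacters would need quoting (for example with shlex.quote) before being placed in a real script; the sketch leaves them untouched.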
post_execute()
def post_execute(
user_params: flytekit.core.context_manager.ExecutionParameters,
rval: typing.Any,
):
Post execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs to match the intended task's outputs. If not overridden, this function is a no-op.
Parameter | Type |
---|---|
user_params |
flytekit.core.context_manager.ExecutionParameters |
rval |
typing.Any |
pre_execute()
def pre_execute(
user_params: flytekit.core.context_manager.ExecutionParameters,
):
This is the method that will be invoked directly before executing the task method and before all the inputs are converted. One particular case where this is useful is if the context is to be modified for the user process to get some user space parameters. This also ensures that things like SparkSession are already correctly set up before the type transformers are called.
This should return either the same context or the mutated context.
Parameter | Type |
---|---|
user_params |
flytekit.core.context_manager.ExecutionParameters |
reset_command_fn()
def reset_command_fn()
Resets the command which should be used in the container definition of this task to the default arguments. This is useful when the command line is overridden at serialization time.
sandbox_execute()
def sandbox_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.
Parameter | Type |
---|---|
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
set_command_fn()
def set_command_fn(
get_command_fn: Optional[Callable[[SerializationSettings], List[str]]],
):
By default, the task will run on the Flyte platform using the pyflyte-execute command. However, it can be useful to update the command with which the task is serialized for specific cases like running map tasks (“pyflyte-map-execute”) or for fast-executed tasks.
Parameter | Type |
---|---|
get_command_fn |
Optional[Callable[[SerializationSettings], List[str]]] |
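The relationship between set_command_fn(), reset_command_fn() and get_command() can be illustrated with stand-in classes. This is a sketch under stated assumptions: FakeSettings and FakeTask are hypothetical and do not reproduce flytekit's classes, and the real command-line arguments are deliberately left out.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class FakeSettings:
    """Hypothetical stand-in for SerializationSettings."""

    image: str


class FakeTask:
    """Hypothetical stand-in for a task that exposes the command hooks."""

    def __init__(self) -> None:
        self._get_command_fn: Callable[[FakeSettings], List[str]] = self.get_default_command

    def get_default_command(self, settings: FakeSettings) -> List[str]:
        # Real arguments are omitted; only the entrypoint name is shown.
        return ["pyflyte-execute"]

    def set_command_fn(
        self, get_command_fn: Optional[Callable[[FakeSettings], List[str]]] = None
    ) -> None:
        # In this sketch, passing None leaves the current function in place.
        if get_command_fn is not None:
            self._get_command_fn = get_command_fn

    def reset_command_fn(self) -> None:
        # Restore the default command builder.
        self._get_command_fn = self.get_default_command

    def get_command(self, settings: FakeSettings) -> List[str]:
        return self._get_command_fn(settings)


task = FakeTask()
settings = FakeSettings(image="example:latest")
task.set_command_fn(lambda s: ["pyflyte-map-execute"])
overridden = task.get_command(settings)
task.reset_command_fn()
restored = task.get_command(settings)
print(overridden, restored)
```

Storing the command builder as a callable lets serialization code swap the entrypoint without subclassing the task.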
set_resolver()
def set_resolver(
resolver: TaskResolverMixin,
):
By default, flytekit uses the DefaultTaskResolver to resolve the task. This method allows the user to set a custom task resolver. It can be useful to override the task resolver for specific cases like running tasks in a Jupyter notebook.
Parameter | Type |
---|---|
resolver |
TaskResolverMixin |
Properties
Property | Type | Description |
---|---|---|
container_image | ||
deck_fields | ||
disable_deck | ||
docs | ||
enable_deck | ||
environment | ||
instantiated_in | ||
interface | ||
lhs | ||
location | ||
metadata | ||
name | ||
python_interface | ||
resources | ||
result | ||
script | ||
script_file | ||
security_context | ||
task_config | ||
task_resolver | ||
task_type | ||
task_type_version |
flytekit.extras.tasks.shell.ShellTask
def ShellTask(
name: str,
debug: bool,
script: typing.Optional[str],
script_file: typing.Optional[str],
task_config: ~T,
shell: str,
inputs: typing.Optional[typing.Dict[str, typing.Type]],
output_locs: typing.Optional[typing.List[flytekit.extras.tasks.shell.OutputLocation]],
kwargs,
):
Parameter | Type |
---|---|
name |
str |
debug |
bool |
script |
typing.Optional[str] |
script_file |
typing.Optional[str] |
task_config |
~T |
shell |
str |
inputs |
typing.Optional[typing.Dict[str, typing.Type]] |
output_locs |
typing.Optional[typing.List[flytekit.extras.tasks.shell.OutputLocation]] |
kwargs |
**kwargs |
Methods
Method | Description |
---|---|
compile() |
Generates a node that encapsulates this task in a workflow definition |
construct_node_metadata() |
Used when constructing the node that encapsulates this task as part of a broader workflow definition |
dispatch_execute() |
This method translates Flyte’s Type system based input values and invokes the actual call to the executor |
execute() |
Executes the given script by substituting the inputs and outputs and extracts the outputs from the filesystem |
find_lhs() |
None |
get_command() |
Returns the command which should be used in the container definition for the serialized version of this task |
get_config() |
Returns the task config as a serializable dictionary |
get_container() |
Returns the container definition (if any) that is used to run the task on hosted Flyte |
get_custom() |
Return additional plugin-specific custom data (if any) as a serializable dictionary |
get_default_command() |
Returns the default pyflyte-execute command used to run this on hosted Flyte platforms |
get_extended_resources() |
Returns the extended resources to allocate to the task on hosted Flyte |
get_image() |
Update image spec based on fast registration usage, and return string representing the image |
get_input_types() |
Returns the names and python types as a dictionary for the inputs of this task |
get_k8s_pod() |
Returns the Kubernetes pod definition (if any) that is used to run the task on hosted Flyte |
get_sql() |
Returns the SQL definition (if any) that is used to run the task on hosted Flyte |
get_type_for_input_var() |
Returns the python type for an input variable by name |
get_type_for_output_var() |
Returns the python type for the specified output variable by name |
local_execute() |
This function is used only in the local execution path and is responsible for calling dispatch execute |
local_execution_mode() |
None |
post_execute() |
Post execute is called after the execution has completed, with the user_params and can be used to clean-up, |
pre_execute() |
This is the method that will be invoked directly before executing the task method and before all the inputs |
reset_command_fn() |
Resets the command which should be used in the container definition of this task to the default arguments |
sandbox_execute() |
Call dispatch_execute, in the context of a local sandbox execution |
set_command_fn() |
By default, the task will run on the Flyte platform using the pyflyte-execute command |
set_resolver() |
By default, flytekit uses the DefaultTaskResolver to resolve the task |
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
):
Generates a node that encapsulates this task in a workflow definition.
Parameter | Type |
---|---|
ctx |
flytekit.core.context_manager.FlyteContext |
args |
*args |
kwargs |
**kwargs |
construct_node_metadata()
def construct_node_metadata()
Used when constructing the node that encapsulates this task as part of a broader workflow definition.
dispatch_execute()
def dispatch_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
This method translates Flyte’s type-system-based input values and invokes the actual call to the executor. This method is also invoked during runtime.
- VoidPromise is returned in the case when the task itself declares no outputs.
- Literal Map is returned when the task returns one or more outputs in the declaration. Individual outputs may be none.
- DynamicJobSpec is returned when a dynamic workflow is executed.
Parameter | Type |
---|---|
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
execute()
def execute(
kwargs,
):
Executes the given script by substituting the inputs and outputs and extracts the outputs from the filesystem
Parameter | Type |
---|---|
kwargs |
**kwargs |
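The run-then-collect flow described above can be approximated with the standard library. The sketch below is not how flytekit implements execute(); it assumes a POSIX /bin/sh is available, and run_script is a hypothetical helper introduced for illustration.

```python
import pathlib
import subprocess
import tempfile


def run_script(script: str) -> subprocess.CompletedProcess:
    """Run a script with /bin/sh and capture its return code, stdout and stderr."""
    return subprocess.run(["/bin/sh", "-c", script], capture_output=True, text=True)


with tempfile.TemporaryDirectory() as workdir:
    out_path = pathlib.Path(workdir) / "out.txt"
    # Substitute the output location into the script, then execute it.
    script = "echo hello > '{out}'".format(out=out_path)
    result = run_script(script)
    # Extract the output from the filesystem: read the file the script wrote.
    output_text = out_path.read_text().strip() if result.returncode == 0 else None

print(result.returncode, output_text)
```

Checking the return code before reading the output file avoids reporting a stale or missing file as a successful result.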
find_lhs()
def find_lhs()
get_command()
def get_command(
settings: SerializationSettings,
):
Returns the command which should be used in the container definition for the serialized version of this task registered on a hosted Flyte platform.
Parameter | Type |
---|---|
settings |
SerializationSettings |
get_config()
def get_config(
settings: SerializationSettings,
):
Returns the task config as a serializable dictionary. This task config consists of metadata about the custom defined for this task.
Parameter | Type |
---|---|
settings |
SerializationSettings |
get_container()
def get_container(
settings: SerializationSettings,
):
Returns the container definition (if any) that is used to run the task on hosted Flyte.
Parameter | Type |
---|---|
settings |
SerializationSettings |
get_custom()
def get_custom(
settings: flytekit.configuration.SerializationSettings,
):
Return additional plugin-specific custom data (if any) as a serializable dictionary.
Parameter | Type |
---|---|
settings |
flytekit.configuration.SerializationSettings |
get_default_command()
def get_default_command(
settings: SerializationSettings,
):
Returns the default pyflyte-execute command used to run this on hosted Flyte platforms.
Parameter | Type |
---|---|
settings |
SerializationSettings |
get_extended_resources()
def get_extended_resources(
settings: SerializationSettings,
):
Returns the extended resources to allocate to the task on hosted Flyte.
Parameter | Type |
---|---|
settings |
SerializationSettings |
get_image()
def get_image(
settings: SerializationSettings,
):
Update image spec based on fast registration usage, and return string representing the image
Parameter | Type |
---|---|
settings |
SerializationSettings |
get_input_types()
def get_input_types()
Returns the names and python types as a dictionary for the inputs of this task.
get_k8s_pod()
def get_k8s_pod(
settings: SerializationSettings,
):
Returns the Kubernetes pod definition (if any) that is used to run the task on hosted Flyte.
Parameter | Type |
---|---|
settings |
SerializationSettings |
get_sql()
def get_sql(
settings: flytekit.configuration.SerializationSettings,
):
Returns the SQL definition (if any) that is used to run the task on hosted Flyte.
Parameter | Type |
---|---|
settings |
flytekit.configuration.SerializationSettings |
get_type_for_input_var()
def get_type_for_input_var(
k: str,
v: typing.Any,
):
Returns the python type for an input variable by name.
Parameter | Type |
---|---|
k |
str |
v |
typing.Any |
get_type_for_output_var()
def get_type_for_output_var(
k: str,
v: typing.Any,
):
Returns the python type for the specified output variable by name.
Parameter | Type |
---|---|
k |
str |
v |
typing.Any |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
This function is used only in the local execution path and is responsible for calling dispatch execute. Use this function when calling a task with native values (or Promises containing Flyte literals derived from Python native values).
Parameter | Type |
---|---|
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
post_execute()
def post_execute(
user_params: flytekit.core.context_manager.ExecutionParameters,
rval: typing.Any,
):
Post execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs to match the intended task's outputs. If not overridden, this function is a no-op.
Parameter | Type |
---|---|
user_params |
flytekit.core.context_manager.ExecutionParameters |
rval |
typing.Any |
pre_execute()
def pre_execute(
user_params: flytekit.core.context_manager.ExecutionParameters,
):
This is the method that will be invoked directly before executing the task method and before all the inputs are converted. One particular case where this is useful is if the context is to be modified for the user process to get some user space parameters. This also ensures that things like SparkSession are already correctly set up before the type transformers are called.
This should return either the same context or the mutated context.
Parameter | Type |
---|---|
user_params |
flytekit.core.context_manager.ExecutionParameters |
reset_command_fn()
def reset_command_fn()
Resets the command which should be used in the container definition of this task to the default arguments. This is useful when the command line is overridden at serialization time.
sandbox_execute()
def sandbox_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.
Parameter | Type |
---|---|
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
set_command_fn()
def set_command_fn(
get_command_fn: Optional[Callable[[SerializationSettings], List[str]]],
):
By default, the task will run on the Flyte platform using the pyflyte-execute command. However, it can be useful to update the command with which the task is serialized for specific cases like running map tasks (“pyflyte-map-execute”) or for fast-executed tasks.
Parameter | Type |
---|---|
get_command_fn |
Optional[Callable[[SerializationSettings], List[str]]] |
set_resolver()
def set_resolver(
resolver: TaskResolverMixin,
):
By default, flytekit uses the DefaultTaskResolver to resolve the task. This method allows the user to set a custom task resolver. It can be useful to override the task resolver for specific cases like running tasks in a Jupyter notebook.
Parameter | Type |
---|---|
resolver |
TaskResolverMixin |
Properties
Property | Type | Description |
---|---|---|
container_image | ||
deck_fields | ||
disable_deck | ||
docs | ||
enable_deck | ||
environment | ||
instantiated_in | ||
interface | ||
lhs | ||
location | ||
metadata | ||
name | ||
python_interface | ||
resources | ||
result | ||
script | ||
script_file | ||
security_context | ||
task_config | ||
task_resolver | ||
task_type | ||
task_type_version |
flytekit.extras.tasks.shell.TaskPlugins
This is the TaskPlugins factory for task types that are derivative of PythonFunctionTask. Every task that the user wishes to use should be available in this factory. Usage:
.. code-block:: python
TaskPlugins.register_pythontask_plugin(config_object_type, plugin_object_type)
- config_object_type is any class that will be passed to the plugin_object as task_config
- plugin_object_type is a derivative of PythonFunctionTask
Examples of available task plugins include different query-based plugins such as flytekitplugins.athena.task.AthenaTask and flytekitplugins.hive.task.HiveTask, kubeflow operators like plugins.kfpytorch.flytekitplugins.kfpytorch.task.PyTorchFunctionTask and plugins.kftensorflow.flytekitplugins.kftensorflow.task.TensorflowFunctionTask, and generic plugins like flytekitplugins.pod.task.PodFunctionTask which doesn’t integrate with third party tools or services.
The task_config is different for every task plugin type. This is filled out by users when they define a task to specify plugin-specific behavior and features. For example, with a query type task plugin, the config might store information related to which database to query.
The plugin_object_type can be used to customize execution behavior and task serialization properties in tandem with the task_config.
Methods
Method | Description |
---|---|
find_pythontask_plugin() |
Returns a PluginObjectType if found or returns the base PythonFunctionTask |
register_pythontask_plugin() |
Use this method to register a new plugin into Flytekit |
find_pythontask_plugin()
def find_pythontask_plugin(
plugin_config_type: type,
):
Returns a PluginObjectType if found or returns the base PythonFunctionTask
Parameter | Type |
---|---|
plugin_config_type |
type |
register_pythontask_plugin()
def register_pythontask_plugin(
plugin_config_type: type,
plugin: Type[PythonFunctionTask],
):
Use this method to register a new plugin into Flytekit. Usage:
.. code-block:: python
TaskPlugins.register_pythontask_plugin(config_object_type, plugin_object_type)
- config_object_type is any class that will be passed to the plugin_object as task_config
- plugin_object_type is a derivative of PythonFunctionTask
Parameter | Type |
---|---|
plugin_config_type |
type |
plugin |
Type[PythonFunctionTask] |
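The register-then-find behavior described for TaskPlugins follows a common registry pattern, sketched below with stand-in classes. Everything here is hypothetical (BaseFunctionTask, QueryTask, QueryConfig, PluginRegistry) and only mirrors the documented behavior: a lookup returns the registered plugin type, or the base task type when nothing is registered.

```python
from typing import Dict, Type


class BaseFunctionTask:
    """Hypothetical stand-in for PythonFunctionTask."""


class QueryTask(BaseFunctionTask):
    """Hypothetical plugin task type."""


class QueryConfig:
    """Hypothetical config class a user would pass as task_config."""


class PluginRegistry:
    """Maps a config type to the task type that should handle it."""

    _plugins: Dict[type, Type[BaseFunctionTask]] = {}

    @classmethod
    def register(cls, plugin_config_type: type, plugin: Type[BaseFunctionTask]) -> None:
        cls._plugins[plugin_config_type] = plugin

    @classmethod
    def find(cls, plugin_config_type: type) -> Type[BaseFunctionTask]:
        # Fall back to the base task type when nothing is registered.
        return cls._plugins.get(plugin_config_type, BaseFunctionTask)


PluginRegistry.register(QueryConfig, QueryTask)
found = PluginRegistry.find(QueryConfig)
fallback = PluginRegistry.find(int)
print(found.__name__, fallback.__name__)
```

Keying the registry on the config type means the task_config a user supplies is enough to select the plugin; no separate plugin name is needed.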