flytekit.bin.entrypoint
Directory
Classes
Class | Description
ExecutionParameters | This is a run-time, user-centric context object that is accessible to every @task method.
ExecutionState | This is the context that is active when executing a task or a local workflow.
FastSerializationSettings | This object holds information about settings necessary to serialize an object so that it can be fast-registered.
FileAccessProvider | This is the class that is available through the FlyteContext and can be used for persisting data to the remote durable store.
FlyteContext | This is an internal-facing context object that most users will not have to deal with.
FlyteContextManager | FlyteContextManager manages the execution context within Flytekit.
ImageConfig | ImageConfig holds the images available at registration time; we recommend using ImageConfig.auto() to create one.
OutputMetadataTracker | This class is for users to set arbitrary metadata on output literals.
PythonTask | Base class for all tasks with a Python native Interface.
SerializationSettings | These settings are provided while serializing a workflow and task, before registration.
StatsConfig | Configuration for sending statsd.
SyncCheckpoint | Synchronously checkpoints a user-given file or folder. This class is NOT THREAD-SAFE.
Timestamp | A ProtocolMessage.
VoidPromise | This object is returned for tasks that do not return any outputs (declared interface is empty).
Errors
Exception | Description
FlyteException | Common base class for all non-exit exceptions.
FlyteNonRecoverableSystemException | Thrown when system code raises an exception.
FlyteRecoverableException | Common base class for all non-exit exceptions.
FlyteUserRuntimeException | Thrown when user code raises an exception.
IgnoreOutputs | Indicates that the outputs generated by a task can be safely ignored.
flytekit.bin.entrypoint.ExecutionParameters
This is a run-time user-centric context object that is accessible to every @task method. It can be accessed using
.. code-block:: python
flytekit.current_context()
This object provides the following:
- a statsd handler
- a logging handler
- the execution ID as a :py:class:flytekit.models.core.identifier.WorkflowExecutionIdentifier object
- a working directory for the user to write arbitrary files to
Please do not confuse this object with the :py:class:flytekit.FlyteContext object.
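A minimal sketch of reading this context inside a task; the metric name and log message are illustrative, and incrementing a counter assumes the statsd handler exposes an incr method:
.. code-block:: python

    import flytekit

    @flytekit.task
    def my_task(x: int) -> int:
        ctx = flytekit.current_context()
        ctx.logging.info("execution id: %s", ctx.execution_id)  # logging handler
        ctx.stats.incr("my_task.calls")                         # statsd handler (assumed incr method)
        scratch_dir = ctx.working_directory                     # per-task scratch space
        return x + 1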
def ExecutionParameters(
execution_date,
tmp_dir,
stats,
execution_id: typing.Optional[_identifier.WorkflowExecutionIdentifier],
logging,
raw_output_prefix,
output_metadata_prefix,
checkpoint,
decks,
task_id: typing.Optional[_identifier.Identifier],
enable_deck: bool,
kwargs,
):
Parameter |
Type |
execution_date |
|
tmp_dir |
|
stats |
|
execution_id |
typing.Optional[_identifier.WorkflowExecutionIdentifier] |
logging |
|
raw_output_prefix |
|
output_metadata_prefix |
|
checkpoint |
|
decks |
|
task_id |
typing.Optional[_identifier.Identifier] |
enable_deck |
bool |
kwargs |
**kwargs |
Methods
builder()
get()
Returns the task-specific context if present, else raises an error. The returned context will match the given key.
has_attr()
def has_attr(
attr_name: str,
):
Parameter |
Type |
attr_name |
str |
new_builder()
def new_builder(
current: Optional[ExecutionParameters],
):
Parameter |
Type |
current |
Optional[ExecutionParameters] |
with_enable_deck()
def with_enable_deck(
enable_deck: bool,
):
Parameter |
Type |
enable_deck |
bool |
with_task_sandbox()
Properties
checkpoint, decks, default_deck, enable_deck, execution_date, execution_id, logging, output_metadata_prefix, raw_output_prefix, secrets, stats, task_id, timeline_deck, working_directory
flytekit.bin.entrypoint.ExecutionState
This is the context that is active when executing a task or a local workflow. This carries the necessary state to
execute.
Some required things during execution deal with temporary directories, ExecutionParameters that are passed to the
user etc.
Attributes:
mode (ExecutionState.Mode): Defines the context in which the task is executed (local, hosted, etc).
working_dir (os.PathLike): Specifies the remote, external directory where inputs, outputs and other protobufs
are uploaded
engine_dir (os.PathLike):
branch_eval_mode Optional[BranchEvalMode]: Used to determine whether a branch node should execute.
user_space_params Optional[ExecutionParameters]: Provides run-time, user-centric context such as a statsd
handler, a logging handler, the current execution id and a working directory.
def ExecutionState(
working_dir: Union[os.PathLike, str],
mode: Optional[ExecutionState.Mode],
engine_dir: Optional[Union[os.PathLike, str]],
branch_eval_mode: Optional[BranchEvalMode],
user_space_params: Optional[ExecutionParameters],
):
Parameter |
Type |
working_dir |
Union[os.PathLike, str] |
mode |
Optional[ExecutionState.Mode] |
engine_dir |
Optional[Union[os.PathLike, str]] |
branch_eval_mode |
Optional[BranchEvalMode] |
user_space_params |
Optional[ExecutionParameters] |
Methods
Method | Description
branch_complete() | Indicates that we are within a conditional / ifelse block and the active branch is not done.
is_local_execution() | Returns whether this execution is running locally.
take_branch() | Indicates that we are within an if-else block and the current branch has evaluated to true.
with_params() | Produces a copy of the current execution state and overrides the copy's parameters with passed parameter values.
branch_complete()
Indicates that we are within a conditional / ifelse block and the active branch is not done. Defaults to SKIPPED.
is_local_execution()
take_branch()
Indicates that we are within an if-else block and the current branch has evaluated to true.
Useful only in local execution mode
with_params()
def with_params(
working_dir: Optional[os.PathLike],
mode: Optional[Mode],
engine_dir: Optional[os.PathLike],
branch_eval_mode: Optional[BranchEvalMode],
user_space_params: Optional[ExecutionParameters],
):
Produces a copy of the current execution state and overrides the copy’s parameters with passed parameter values.
Parameter |
Type |
working_dir |
Optional[os.PathLike] |
mode |
Optional[Mode] |
engine_dir |
Optional[os.PathLike] |
branch_eval_mode |
Optional[BranchEvalMode] |
user_space_params |
Optional[ExecutionParameters] |
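A hedged sketch of inspecting the active execution state through the FlyteContextManager described below; the attributes used (mode, working_dir, is_local_execution) are the ones documented above:
.. code-block:: python

    from flytekit.core.context_manager import FlyteContextManager

    ctx = FlyteContextManager.current_context()
    es = ctx.execution_state
    if es is not None and es.is_local_execution():
        print("running locally, mode:", es.mode, "working dir:", es.working_dir)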
flytekit.bin.entrypoint.FastSerializationSettings
This object holds information about settings necessary to serialize an object so that it can be fast-registered.
def FastSerializationSettings(
enabled: bool,
destination_dir: Optional[str],
distribution_location: Optional[str],
):
Parameter |
Type |
enabled |
bool |
destination_dir |
Optional[str] |
distribution_location |
Optional[str] |
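A minimal construction sketch; the destination directory and distribution location below are placeholder values:
.. code-block:: python

    from flytekit.configuration import FastSerializationSettings

    fast = FastSerializationSettings(
        enabled=True,
        destination_dir="/root",                                   # where the code is unpacked in the container
        distribution_location="s3://my-bucket/fast/code.tar.gz",   # placeholder upload location
    )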
Methods
from_dict()
def from_dict(
kvs: typing.Union[dict, list, str, int, float, bool, NoneType],
infer_missing,
):
Parameter |
Type |
kvs |
typing.Union[dict, list, str, int, float, bool, NoneType] |
infer_missing |
|
from_json()
def from_json(
s: typing.Union[str, bytes, bytearray],
parse_float,
parse_int,
parse_constant,
infer_missing,
kw,
):
Parameter |
Type |
s |
typing.Union[str, bytes, bytearray] |
parse_float |
|
parse_int |
|
parse_constant |
|
infer_missing |
|
kw |
|
schema()
def schema(
infer_missing: bool,
only,
exclude,
many: bool,
context,
load_only,
dump_only,
partial: bool,
unknown,
):
Parameter |
Type |
infer_missing |
bool |
only |
|
exclude |
|
many |
bool |
context |
|
load_only |
|
dump_only |
|
partial |
bool |
unknown |
|
to_dict()
def to_dict(
encode_json,
):
Parameter |
Type |
encode_json |
|
to_json()
def to_json(
skipkeys: bool,
ensure_ascii: bool,
check_circular: bool,
allow_nan: bool,
indent: typing.Union[int, str, NoneType],
separators: typing.Tuple[str, str],
default: typing.Callable,
sort_keys: bool,
kw,
):
Parameter |
Type |
skipkeys |
bool |
ensure_ascii |
bool |
check_circular |
bool |
allow_nan |
bool |
indent |
typing.Union[int, str, NoneType] |
separators |
typing.Tuple[str, str] |
default |
typing.Callable |
sort_keys |
bool |
kw |
|
flytekit.bin.entrypoint.FileAccessProvider
This is the class that is available through the FlyteContext and can be used for persisting data to the remote
durable store.
def FileAccessProvider(
local_sandbox_dir: typing.Union[str, os.PathLike],
raw_output_prefix: str,
data_config: typing.Optional[flytekit.configuration.DataConfig],
execution_metadata: typing.Optional[dict],
):
Parameter |
Type |
local_sandbox_dir |
typing.Union[str, os.PathLike] |
raw_output_prefix |
str |
data_config |
typing.Optional[flytekit.configuration.DataConfig] |
execution_metadata |
typing.Optional[dict] |
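A short, hedged sketch of constructing a provider and moving a single file; the bucket and file paths are hypothetical:
.. code-block:: python

    from flytekit.core.data_persistence import FileAccessProvider

    fa = FileAccessProvider(
        local_sandbox_dir="/tmp/flyte-sandbox",
        raw_output_prefix="s3://my-s3-bucket/raw",   # hypothetical bucket
    )
    fa.put_data("/tmp/model.pt", "s3://my-s3-bucket/raw/model.pt", is_multipart=False)
    fa.get_data("s3://my-s3-bucket/raw/model.pt", "/tmp/model_copy.pt", is_multipart=False)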
Methods
async_get_data()
def async_get_data(
remote_path: str,
local_path: str,
is_multipart: bool,
kwargs,
):
Parameter |
Type |
remote_path |
str |
local_path |
str |
is_multipart |
bool |
kwargs |
**kwargs |
async_put_data()
def async_put_data(
local_path: typing.Union[str, os.PathLike],
remote_path: str,
is_multipart: bool,
kwargs,
):
The implication here is that we're always going to put data to the remote location, so we use the remote filesystem to make sure we don't use the true local proxy if the remote path is a file:// path.
Parameter |
Type |
local_path |
typing.Union[str, os.PathLike] |
remote_path |
str |
is_multipart |
bool |
kwargs |
**kwargs |
async_put_raw_data()
def async_put_raw_data(
lpath: typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO],
upload_prefix: typing.Optional[str],
file_name: typing.Optional[str],
read_chunk_size_bytes: int,
encoding: str,
skip_raw_data_prefix: bool,
kwargs,
):
This is a more flexible version of put that accepts a file-like object or a string path.
Writes to the raw output prefix only. If you want to write to another fs use put_data or get the fsspec
file system directly.
FYI: Currently the raw output prefix set by propeller is already unique per retry and looks like
s3://my-s3-bucket/data/o4/feda4e266c748463a97d-n0-0
If lpath is a folder, then recursive will be set.
If lpath is a streamable, then it can only be a single file.
Writes to:
{raw output prefix}/{upload_prefix}/{file_name}
Parameter |
Type |
lpath |
typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO] |
upload_prefix |
typing.Optional[str] |
file_name |
typing.Optional[str] |
read_chunk_size_bytes |
int |
encoding |
str |
skip_raw_data_prefix |
bool |
kwargs |
**kwargs |
download()
def download(
remote_path: str,
local_path: str,
kwargs,
):
Downloads from remote to local
Parameter |
Type |
remote_path |
str |
local_path |
str |
kwargs |
**kwargs |
download_directory()
def download_directory(
remote_path: str,
local_path: str,
kwargs,
):
Downloads directory from given remote to local path
Parameter |
Type |
remote_path |
str |
local_path |
str |
kwargs |
**kwargs |
exists()
def exists(
path: str,
):
generate_new_custom_path()
def generate_new_custom_path(
fs: typing.Optional[fsspec.spec.AbstractFileSystem],
alt: typing.Optional[str],
stem: typing.Optional[str],
):
Generates a new path with the raw output prefix and a random string appended to it.
Optionally, you can provide an alternate prefix and a stem. If stem is provided, it
will be appended to the path instead of a random string. If alt is provided, it will
replace the first part of the output prefix, e.g. the S3 or GCS bucket.
If you want to write to a non-random prefix in a non-default S3 bucket, this can be called with alt="my-alt-bucket" and stem="my-stem" to generate a path like
s3://my-alt-bucket/default-prefix-part/my-stem
Parameter |
Type |
fs |
typing.Optional[fsspec.spec.AbstractFileSystem] |
alt |
typing.Optional[str] |
stem |
typing.Optional[str] |
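A sketch of the call described above; the alt and stem values are illustrative:
.. code-block:: python

    from flytekit.core.context_manager import FlyteContextManager

    fa = FlyteContextManager.current_context().file_access
    # yields something like s3://my-alt-bucket/<default-prefix-part>/my-stem
    path = fa.generate_new_custom_path(alt="my-alt-bucket", stem="my-stem")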
get()
def get(
from_path: str,
to_path: str,
recursive: bool,
kwargs,
):
Parameter |
Type |
from_path |
str |
to_path |
str |
recursive |
bool |
kwargs |
**kwargs |
get_async_filesystem_for_path()
def get_async_filesystem_for_path(
path: str,
anonymous: bool,
kwargs,
):
Parameter |
Type |
path |
str |
anonymous |
bool |
kwargs |
**kwargs |
get_data()
def get_data(
remote_path: str,
local_path: str,
is_multipart: bool,
kwargs,
):
Parameter |
Type |
remote_path |
str |
local_path |
str |
is_multipart |
bool |
kwargs |
**kwargs |
get_file_tail()
def get_file_tail(
file_path_or_file_name: str,
):
Parameter |
Type |
file_path_or_file_name |
str |
get_filesystem()
def get_filesystem(
protocol: typing.Optional[str],
anonymous: bool,
path: typing.Optional[str],
kwargs,
):
Parameter |
Type |
protocol |
typing.Optional[str] |
anonymous |
bool |
path |
typing.Optional[str] |
kwargs |
**kwargs |
get_filesystem_for_path()
def get_filesystem_for_path(
path: str,
anonymous: bool,
kwargs,
):
Parameter |
Type |
path |
str |
anonymous |
bool |
kwargs |
**kwargs |
get_random_local_directory()
def get_random_local_directory()
get_random_local_path()
def get_random_local_path(
file_path_or_file_name: typing.Optional[str],
):
Use file_path_or_file_name when you want a random directory but want to preserve the leaf file name.
Parameter |
Type |
file_path_or_file_name |
typing.Optional[str] |
get_random_remote_directory()
def get_random_remote_directory()
get_random_remote_path()
def get_random_remote_path(
file_path_or_file_name: typing.Optional[str],
):
Parameter |
Type |
file_path_or_file_name |
typing.Optional[str] |
get_random_string()
is_remote()
def is_remote(
path: typing.Union[str, os.PathLike],
):
Deprecated. Let’s find a replacement
Parameter |
Type |
path |
typing.Union[str, os.PathLike] |
join()
def join(
args: `*args`,
unstrip: bool,
fs: typing.Optional[fsspec.spec.AbstractFileSystem],
):
Parameter |
Type |
args |
*args |
unstrip |
bool |
fs |
typing.Optional[fsspec.spec.AbstractFileSystem] |
put_data()
def put_data(
local_path: typing.Union[str, os.PathLike],
remote_path: str,
is_multipart: bool,
kwargs,
):
The implication here is that we're always going to put data to the remote location, so we use the remote filesystem to make sure we don't use the true local proxy if the remote path is a file:// path.
Parameter |
Type |
local_path |
typing.Union[str, os.PathLike] |
remote_path |
str |
is_multipart |
bool |
kwargs |
**kwargs |
put_raw_data()
def put_raw_data(
lpath: typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO],
upload_prefix: typing.Optional[str],
file_name: typing.Optional[str],
read_chunk_size_bytes: int,
encoding: str,
skip_raw_data_prefix: bool,
kwargs,
):
This is a more flexible version of put that accepts a file-like object or a string path.
Writes to the raw output prefix only. If you want to write to another fs use put_data or get the fsspec
file system directly.
FYI: Currently the raw output prefix set by propeller is already unique per retry and looks like
s3://my-s3-bucket/data/o4/feda4e266c748463a97d-n0-0
If lpath is a folder, then recursive will be set.
If lpath is a streamable, then it can only be a single file.
Writes to:
{raw output prefix}/{upload_prefix}/{file_name}
Parameter |
Type |
lpath |
typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO] |
upload_prefix |
typing.Optional[str] |
file_name |
typing.Optional[str] |
read_chunk_size_bytes |
int |
encoding |
str |
skip_raw_data_prefix |
bool |
kwargs |
**kwargs |
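A hedged sketch of uploading a local file under the raw output prefix; the local path and prefix are placeholders, and it assumes the written remote path is returned:
.. code-block:: python

    from flytekit.core.context_manager import FlyteContextManager

    fa = FlyteContextManager.current_context().file_access
    # writes to {raw output prefix}/stats/report.csv
    remote_uri = fa.put_raw_data("/tmp/report.csv", upload_prefix="stats", file_name="report.csv")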
recursive_paths()
def recursive_paths(
f: str,
t: str,
):
Parameter |
Type |
f |
str |
t |
str |
sep()
def sep(
file_system: typing.Optional[fsspec.spec.AbstractFileSystem],
):
Parameter |
Type |
file_system |
typing.Optional[fsspec.spec.AbstractFileSystem] |
strip_file_header()
def strip_file_header(
path: str,
trim_trailing_sep: bool,
):
Drops the file:// prefix from the path, if present.
Parameter |
Type |
path |
str |
trim_trailing_sep |
bool |
upload()
def upload(
file_path: str,
to_path: str,
kwargs,
):
Parameter |
Type |
file_path |
str |
to_path |
str |
kwargs |
**kwargs |
upload_directory()
def upload_directory(
local_path: str,
remote_path: str,
kwargs,
):
Parameter |
Type |
local_path |
str |
remote_path |
str |
kwargs |
**kwargs |
Properties
data_config, local_access, local_sandbox_dir, raw_output_fs, raw_output_prefix
flytekit.bin.entrypoint.FlyteContext
This is an internal-facing context object that most users will not have to deal with. It's essentially a globally available grab bag of settings and objects that allows flytekit to do things like convert complex types, run and compile workflows, serialize Flyte entities, etc.
Even though this object has a current_context function on it, it should not be called directly. Please use the :py:class:flytekit.FlyteContextManager object instead.
Please do not confuse this object with the :py:class:flytekit.ExecutionParameters object.
def FlyteContext(
file_access: FileAccessProvider,
level: int,
flyte_client: Optional['friendly_client.SynchronousFlyteClient'],
compilation_state: Optional[CompilationState],
execution_state: Optional[ExecutionState],
serialization_settings: Optional[SerializationSettings],
in_a_condition: bool,
origin_stackframe: Optional[traceback.FrameSummary],
output_metadata_tracker: Optional[OutputMetadataTracker],
worker_queue: Optional[Controller],
):
Parameter |
Type |
file_access |
FileAccessProvider |
level |
int |
flyte_client |
Optional['friendly_client.SynchronousFlyteClient'] |
compilation_state |
Optional[CompilationState] |
execution_state |
Optional[ExecutionState] |
serialization_settings |
Optional[SerializationSettings] |
in_a_condition |
bool |
origin_stackframe |
Optional[traceback.FrameSummary] |
output_metadata_tracker |
Optional[OutputMetadataTracker] |
worker_queue |
Optional[Controller] |
Methods
current_context()
This method exists only to maintain backwards compatibility. Please use
FlyteContextManager.current_context()
instead.
Users of flytekit should be wary not to confuse the object returned from this function
with :py:func:flytekit.current_context
enter_conditional_section()
def enter_conditional_section()
get_deck()
Returns the deck that was created as part of the last execution.
The return value depends on the execution environment. In a notebook, the return value is compatible with
IPython.display and should be rendered in the notebook.
.. code-block:: python
with flytekit.new_context() as ctx:
my_task(…)
ctx.get_deck()
OR if you wish to explicitly display
.. code-block:: python
from IPython import display
display(ctx.get_deck())
get_origin_stackframe_repr()
def get_origin_stackframe_repr()
new_builder()
new_compilation_state()
def new_compilation_state(
prefix: str,
):
Creates and returns a default compilation state. For most of the code this should be the entrypoint of compilation; otherwise the code should always use with_compilation_state.
Parameter |
Type |
prefix |
str |
new_execution_state()
def new_execution_state(
working_dir: Optional[os.PathLike],
):
Creates and returns a new default execution state. This should be used at the entrypoint of execution; in all other cases it is preferable to use with_execution_state.
Parameter |
Type |
working_dir |
Optional[os.PathLike] |
set_stackframe()
def set_stackframe(
s: traceback.FrameSummary,
):
Parameter |
Type |
s |
traceback.FrameSummary |
with_client()
def with_client(
c: SynchronousFlyteClient,
):
Parameter |
Type |
c |
SynchronousFlyteClient |
with_compilation_state()
def with_compilation_state(
c: CompilationState,
):
Parameter |
Type |
c |
CompilationState |
with_execution_state()
def with_execution_state(
es: ExecutionState,
):
Parameter |
Type |
es |
ExecutionState |
with_file_access()
def with_file_access(
fa: FileAccessProvider,
):
Parameter |
Type |
fa |
FileAccessProvider |
with_new_compilation_state()
def with_new_compilation_state()
with_output_metadata_tracker()
def with_output_metadata_tracker(
t: OutputMetadataTracker,
):
Parameter |
Type |
t |
OutputMetadataTracker |
with_serialization_settings()
def with_serialization_settings(
ss: SerializationSettings,
):
Parameter |
Type |
ss |
SerializationSettings |
with_worker_queue()
def with_worker_queue(
wq: Controller,
):
Parameter |
Type |
wq |
Controller |
Properties
user_space_params
flytekit.bin.entrypoint.FlyteContextManager
FlyteContextManager manages the execution context within Flytekit. It holds global state for either compilation or execution. It is not thread-safe and can currently only be used in a single-threaded application.
Contexts within Flytekit are useful for managing compilation state and execution state. Refer to CompilationState and ExecutionState for more information. FlyteContextManager provides a singleton stack to manage these contexts.
Typical usage is
.. code-block:: python
FlyteContextManager.initialize()
with FlyteContextManager.with_context(o) as ctx:
pass
If required (not recommended), you can use FlyteContextManager.push_context(), but a corresponding FlyteContextManager.pop_context() should then be called.
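A hedged sketch of the recommended with_context pattern, layering a new execution state onto the current context:
.. code-block:: python

    from flytekit.core.context_manager import FlyteContextManager

    ctx = FlyteContextManager.current_context()
    builder = ctx.with_execution_state(ctx.new_execution_state())
    with FlyteContextManager.with_context(builder) as new_ctx:
        ...  # code running here sees the new execution state via FlyteContextManager.current_context()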
Methods
add_signal_handler()
def add_signal_handler(
handler: typing.Callable[[int, FrameType], typing.Any],
):
Parameter |
Type |
handler |
typing.Callable[[int, FrameType], typing.Any] |
current_context()
get_origin_stackframe()
def get_origin_stackframe(
limit,
):
initialize()
Re-initializes the context and erases the entire context
pop_context()
push_context()
def push_context(
ctx: FlyteContext,
f: Optional[traceback.FrameSummary],
):
Parameter |
Type |
ctx |
FlyteContext |
f |
Optional[traceback.FrameSummary] |
size()
with_context()
def with_context(
b: FlyteContext.Builder,
):
Parameter |
Type |
b |
FlyteContext.Builder |
flytekit.bin.entrypoint.FlyteException
Common base class for all non-exit exceptions.
def FlyteException(
args,
timestamp: typing.Optional[float],
):
Parameter |
Type |
args |
*args |
timestamp |
typing.Optional[float] |
Properties
timestamp
flytekit.bin.entrypoint.FlyteNonRecoverableSystemException
Common base class for all non-exit exceptions.
def FlyteNonRecoverableSystemException(
exc_value: Exception,
):
FlyteNonRecoverableSystemException is thrown when system code raises an exception.
Parameter |
Type |
exc_value |
Exception |
Properties
timestamp, value
flytekit.bin.entrypoint.FlyteRecoverableException
Common base class for all non-exit exceptions.
def FlyteRecoverableException(
args,
timestamp: typing.Optional[float],
):
Parameter |
Type |
args |
*args |
timestamp |
typing.Optional[float] |
Properties
timestamp
flytekit.bin.entrypoint.FlyteUserRuntimeException
Common base class for all non-exit exceptions.
def FlyteUserRuntimeException(
exc_value: Exception,
timestamp: typing.Optional[float],
):
FlyteUserRuntimeException is thrown when user code raises an exception.
Parameter |
Type |
exc_value |
Exception |
timestamp |
typing.Optional[float] |
Properties
timestamp, value
flytekit.bin.entrypoint.IgnoreOutputs
This exception should be used to indicate that the outputs generated by this task can be safely ignored.
This is useful in case of distributed training or peer-to-peer parallel algorithms.
flytekit.bin.entrypoint.ImageConfig
We recommend using ImageConfig.auto(img_name=None) to create an ImageConfig.
For example, ImageConfig.auto(img_name="ghcr.io/flyteorg/flytecookbook:v1.0.0") will create an ImageConfig.
ImageConfig holds available images which can be used at registration time. A default image can be specified
along with optional additional images. Each image in the config must have a unique name.
Attributes:
default_image (Optional[Image]): The default image to be used as a container for task serialization.
images (List[Image]): Optional, additional images which can be used in task container definitions.
def ImageConfig(
default_image: Optional[Image],
images: Optional[List[Image]],
):
Parameter |
Type |
default_image |
Optional[Image] |
images |
Optional[List[Image]] |
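Two ways to build a config, sketched under the assumption that Image exposes name/fqn/tag fields as in flytekit.configuration:
.. code-block:: python

    from flytekit.configuration import Image, ImageConfig

    # explicit construction
    default_img = Image(name="default", fqn="ghcr.io/flyteorg/flytecookbook", tag="v1.0.0")
    cfg = ImageConfig(default_image=default_img, images=[default_img])

    # or let flytekit parse the image string
    cfg = ImageConfig.auto(img_name="ghcr.io/flyteorg/flytecookbook:v1.0.0")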
Methods
auto()
def auto(
config_file: typing.Union[str, ConfigFile, None],
img_name: Optional[str],
):
Reads from the config file or from img_name.
Note that this function does not take into account the flytekit default images (see the Dockerfiles at the base of this repo). To pick those up, see the auto_default_image function.
Parameter |
Type |
config_file |
typing.Union[str, ConfigFile, None] |
img_name |
Optional[str] |
auto_default_image()
create_from()
def create_from(
default_image: Optional[Image],
other_images: typing.Optional[typing.List[Image]],
):
Parameter |
Type |
default_image |
Optional[Image] |
other_images |
typing.Optional[typing.List[Image]] |
find_image()
Return an image, by name, if it exists.
from_dict()
def from_dict(
kvs: typing.Union[dict, list, str, int, float, bool, NoneType],
infer_missing,
):
Parameter |
Type |
kvs |
typing.Union[dict, list, str, int, float, bool, NoneType] |
infer_missing |
|
from_images()
def from_images(
default_image: str,
m: typing.Optional[typing.Dict[str, str]],
):
Allows you to programmatically create an ImageConfig. Usually only the default_image is required, unless your workflow uses multiple images.
.. code:: python

    ImageConfig.from_images(
        "ghcr.io/flyteorg/flytecookbook:v1.0.0",
        {
            "spark": "ghcr.io/flyteorg/myspark:...",
            "other": "...",
        }
    )
Parameter |
Type |
default_image |
str |
m |
typing.Optional[typing.Dict[str, str]] |
from_json()
def from_json(
s: typing.Union[str, bytes, bytearray],
parse_float,
parse_int,
parse_constant,
infer_missing,
kw,
):
Parameter |
Type |
s |
typing.Union[str, bytes, bytearray] |
parse_float |
|
parse_int |
|
parse_constant |
|
infer_missing |
|
kw |
|
schema()
def schema(
infer_missing: bool,
only,
exclude,
many: bool,
context,
load_only,
dump_only,
partial: bool,
unknown,
):
Parameter |
Type |
infer_missing |
bool |
only |
|
exclude |
|
many |
bool |
context |
|
load_only |
|
dump_only |
|
partial |
bool |
unknown |
|
to_dict()
def to_dict(
encode_json,
):
Parameter |
Type |
encode_json |
|
to_json()
def to_json(
skipkeys: bool,
ensure_ascii: bool,
check_circular: bool,
allow_nan: bool,
indent: typing.Union[int, str, NoneType],
separators: typing.Tuple[str, str],
default: typing.Callable,
sort_keys: bool,
kw,
):
Parameter |
Type |
skipkeys |
bool |
ensure_ascii |
bool |
check_circular |
bool |
allow_nan |
bool |
indent |
typing.Union[int, str, NoneType] |
separators |
typing.Tuple[str, str] |
default |
typing.Callable |
sort_keys |
bool |
kw |
|
validate_image()
def validate_image(
_: typing.Any,
param: str,
values: tuple,
):
Validates the image to match the standard format. Also validates that only one default image is provided. A default image is one that is specified as default=<image_uri> or just <image_uri>. All other images should be provided with a name, in the format name=<image_uri>.
This method can be used with the CLI.
Parameter |
Type |
_ |
typing.Any |
param |
str |
values |
tuple |
flytekit.bin.entrypoint.OutputMetadataTracker
This class is for the users to set arbitrary metadata on output literals.
Attributes:
output_metadata Optional[TaskOutputMetadata]: is a sparse dictionary of metadata that the user wants to attach
to each output of a task. The key is the output value (object) and the value is an OutputMetadata object.
def OutputMetadataTracker(
output_metadata: typing.Dict[typing.Any, OutputMetadata],
):
Parameter |
Type |
output_metadata |
typing.Dict[typing.Any, OutputMetadata] |
Methods
add()
def add(
obj: typing.Any,
metadata: OutputMetadata,
):
Parameter |
Type |
obj |
typing.Any |
metadata |
OutputMetadata |
get()
def get(
obj: typing.Any,
):
Parameter |
Type |
obj |
typing.Any |
with_params()
def with_params(
output_metadata: Optional[TaskOutputMetadata],
):
Produces a copy of the current object and sets the given values on it.
Parameter |
Type |
output_metadata |
Optional[TaskOutputMetadata] |
flytekit.bin.entrypoint.PythonTask
Base class for all tasks with a Python native Interface. This should be used directly for task types that do not have a Python function to be executed. Otherwise refer to :py:class:flytekit.PythonFunctionTask.
def PythonTask(
task_type: str,
name: str,
task_config: typing.Optional[~T],
interface: typing.Optional[flytekit.core.interface.Interface],
environment: typing.Optional[typing.Dict[str, str]],
disable_deck: typing.Optional[bool],
enable_deck: typing.Optional[bool],
deck_fields: typing.Optional[typing.Tuple[flytekit.deck.deck.DeckField, ...]],
kwargs,
):
Parameter |
Type |
task_type |
str |
name |
str |
task_config |
typing.Optional[~T] |
interface |
typing.Optional[flytekit.core.interface.Interface] |
environment |
typing.Optional[typing.Dict[str, str]] |
disable_deck |
typing.Optional[bool] |
enable_deck |
typing.Optional[bool] |
deck_fields |
typing.Optional[typing.Tuple[flytekit.deck.deck.DeckField, ...]] |
kwargs |
**kwargs |
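A rough, hedged sketch of subclassing PythonTask for a task type without a Python function body; the task_type string, interface, and output name are illustrative assumptions:
.. code-block:: python

    from flytekit.core.base_task import PythonTask
    from flytekit.core.interface import Interface

    class EchoTask(PythonTask):
        def __init__(self, name: str, **kwargs):
            super().__init__(
                task_type="echo",   # illustrative task type
                name=name,
                task_config=None,
                interface=Interface(inputs={"msg": str}, outputs={"o0": str}),
                **kwargs,
            )

        def execute(self, **kwargs):
            # called with the converted Python-native inputs
            return kwargs["msg"]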
Methods
Method | Description
compile() | Generates a node that encapsulates this task in a workflow definition.
construct_node_metadata() | Used when constructing the node that encapsulates this task as part of a broader workflow definition.
dispatch_execute() | This method translates Flyte's Type system based input values and invokes the actual call to the executor.
execute() | This method will be invoked to execute the task.
find_lhs() |
get_config() | Returns the task config as a serializable dictionary.
get_container() | Returns the container definition (if any) that is used to run the task on hosted Flyte.
get_custom() | Returns additional plugin-specific custom data (if any) as a serializable dictionary.
get_extended_resources() | Returns the extended resources to allocate to the task on hosted Flyte.
get_input_types() | Returns the names and python types as a dictionary for the inputs of this task.
get_k8s_pod() | Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.
get_sql() | Returns the Sql definition (if any) that is used to run the task on hosted Flyte.
get_type_for_input_var() | Returns the python type for an input variable by name.
get_type_for_output_var() | Returns the python type for the specified output variable by name.
local_execute() | This function is used only in the local execution path and is responsible for calling dispatch execute.
local_execution_mode() |
post_execute() | Called after the execution has completed, with the user_params; can be used to clean up or alter the outputs.
pre_execute() | Invoked directly before executing the task method and before all the inputs are converted.
sandbox_execute() | Calls dispatch_execute in the context of a local sandbox execution.
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
):
Generates a node that encapsulates this task in a workflow definition.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
args |
*args |
kwargs |
**kwargs |
construct_node_metadata()
def construct_node_metadata()
Used when constructing the node that encapsulates this task as part of a broader workflow definition.
dispatch_execute()
def dispatch_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
This method translates Flyte's Type system based input values and invokes the actual call to the executor. This method is also invoked during runtime.
VoidPromise is returned when the task itself declares no outputs.
LiteralMap is returned when the task declares one or more outputs. Individual outputs may be None.
DynamicJobSpec is returned when a dynamic workflow is executed.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
execute()
This method will be invoked to execute the task.
Parameter |
Type |
kwargs |
**kwargs |
find_lhs()
get_config()
def get_config(
settings: flytekit.configuration.SerializationSettings,
):
Returns the task config as a serializable dictionary. This task config consists of metadata about the custom config defined for this task.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_container()
def get_container(
settings: flytekit.configuration.SerializationSettings,
):
Returns the container definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_custom()
def get_custom(
settings: flytekit.configuration.SerializationSettings,
):
Return additional plugin-specific custom data (if any) as a serializable dictionary.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_extended_resources()
def get_extended_resources(
settings: flytekit.configuration.SerializationSettings,
):
Returns the extended resources to allocate to the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_input_types()
Returns the names and python types as a dictionary for the inputs of this task.
get_k8s_pod()
def get_k8s_pod(
settings: flytekit.configuration.SerializationSettings,
):
Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_sql()
def get_sql(
settings: flytekit.configuration.SerializationSettings,
):
Returns the Sql definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_type_for_input_var()
def get_type_for_input_var(
k: str,
v: typing.Any,
):
Returns the python type for an input variable by name.
Parameter |
Type |
k |
str |
v |
typing.Any |
get_type_for_output_var()
def get_type_for_output_var(
k: str,
v: typing.Any,
):
Returns the python type for the specified output variable by name.
Parameter |
Type |
k |
str |
v |
typing.Any |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
This function is used only in the local execution path and is responsible for calling dispatch execute.
Use this function when calling a task with native values (or Promises containing Flyte literals derived from
Python native values).
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
post_execute()
def post_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
rval: typing.Any,
):
Post execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs to match the intended task's outputs. If not overridden, this function is a no-op.
Parameter |
Type |
user_params |
typing.Optional[flytekit.core.context_manager.ExecutionParameters] |
rval |
typing.Any |
pre_execute()
def pre_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
):
This is the method that will be invoked directly before executing the task method and before all the inputs are converted. One particular case where this is useful is if the context is to be modified for the user process to get some user space parameters. This also ensures that things like SparkSession are already correctly set up before the type transformers are called.
This should return either the same context or the mutated context.
Parameter |
Type |
user_params |
typing.Optional[flytekit.core.context_manager.ExecutionParameters] |
sandbox_execute()
def sandbox_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
Properties
deck_fields, disable_deck, docs, enable_deck, environment, instantiated_in, interface, lhs, location, metadata, name, python_interface, security_context, task_config, task_type, task_type_version
flytekit.bin.entrypoint.SerializationSettings
These settings are provided while serializing a workflow and task, before registration. This is required to get
runtime information at serialization time, as well as some defaults.
Attributes:
project (str): The project (if any) under which to register entities.
domain (str): The domain (if any) under which to register entities.
version (str): The version (if any) with which to register entities.
image_config (ImageConfig): The image config used to define task container images.
env (Optional[Dict[str, str]]): Environment variables injected into task container definitions.
flytekit_virtualenv_root (Optional[str]): During out-of-container serialization, the absolute path of the flytekit virtualenv at serialization time won't match the in-container value at execution time. This optional value is used to provide the in-container virtualenv path.
python_interpreter (Optional[str]): The python executable to use. This is used for spark tasks in out-of-container execution.
entrypoint_settings (Optional[EntrypointSettings]): Information about the command, path and version of the entrypoint program.
fast_serialization_settings (Optional[FastSerializationSettings]): If the code is being serialized so that it can be fast-registered (and thus omit building a Docker image), this object contains additional parameters for serialization.
source_root (Optional[str]): The root directory of the source code.
def SerializationSettings(
image_config: ImageConfig,
project: typing.Optional[str],
domain: typing.Optional[str],
version: typing.Optional[str],
env: Optional[Dict[str, str]],
git_repo: Optional[str],
python_interpreter: str,
flytekit_virtualenv_root: Optional[str],
fast_serialization_settings: Optional[FastSerializationSettings],
source_root: Optional[str],
):
Parameter |
Type |
image_config |
ImageConfig |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
version |
typing.Optional[str] |
env |
Optional[Dict[str, str]] |
git_repo |
Optional[str] |
python_interpreter |
str |
flytekit_virtualenv_root |
Optional[str] |
fast_serialization_settings |
Optional[FastSerializationSettings] |
source_root |
Optional[str] |
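A minimal construction sketch; the image, project, domain, version, and env values are placeholders:
.. code-block:: python

    from flytekit.configuration import ImageConfig, SerializationSettings

    ss = SerializationSettings(
        image_config=ImageConfig.auto(img_name="ghcr.io/flyteorg/flytekit:latest"),  # placeholder image
        project="flytesnacks",
        domain="development",
        version="v1",
        env={"MY_VAR": "value"},
    )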
Methods
default_entrypoint_settings()
def default_entrypoint_settings(
interpreter_path: str,
):
Assumes the entrypoint is installed in a virtual environment where the interpreter is located.
Parameter |
Type |
interpreter_path |
str |
for_image()
def for_image(
image: str,
version: str,
project: str,
domain: str,
python_interpreter_path: str,
):
Parameter |
Type |
image |
str |
version |
str |
project |
str |
domain |
str |
python_interpreter_path |
str |
from_dict()
def from_dict(
kvs: typing.Union[dict, list, str, int, float, bool, NoneType],
infer_missing,
):
Parameter |
Type |
kvs |
typing.Union[dict, list, str, int, float, bool, NoneType] |
infer_missing |
|
from_json()
def from_json(
s: typing.Union[str, bytes, bytearray],
parse_float,
parse_int,
parse_constant,
infer_missing,
kw,
):
Parameter |
Type |
s |
typing.Union[str, bytes, bytearray] |
parse_float |
|
parse_int |
|
parse_constant |
|
infer_missing |
|
kw |
|
from_transport()
def from_transport(
s: str,
):
new_builder()
Creates a SerializationSettings.Builder
that copies the existing serialization settings parameters and
allows for customization.
schema()
def schema(
infer_missing: bool,
only,
exclude,
many: bool,
context,
load_only,
dump_only,
partial: bool,
unknown,
):
Parameter |
Type |
infer_missing |
bool |
only |
|
exclude |
|
many |
bool |
context |
|
load_only |
|
dump_only |
|
partial |
bool |
unknown |
|
should_fast_serialize()
def should_fast_serialize()
Whether or not the serialization settings specify that entities should be serialized for fast registration.
to_dict()
def to_dict(
encode_json,
):
Parameter |
Type |
encode_json |
|
to_json()
def to_json(
skipkeys: bool,
ensure_ascii: bool,
check_circular: bool,
allow_nan: bool,
indent: typing.Union[int, str, NoneType],
separators: typing.Tuple[str, str],
default: typing.Callable,
sort_keys: bool,
kw,
):
Parameter |
Type |
skipkeys |
bool |
ensure_ascii |
bool |
check_circular |
bool |
allow_nan |
bool |
indent |
typing.Union[int, str, NoneType] |
separators |
typing.Tuple[str, str] |
default |
typing.Callable |
sort_keys |
bool |
kw |
|
venv_root_from_interpreter()
def venv_root_from_interpreter(
interpreter_path: str,
):
Computes the path of the virtual environment root, based on the passed-in python interpreter path, for example /opt/venv/bin/python3 -> /opt/venv.
Parameter |
Type |
interpreter_path |
str |
with_serialized_context()
def with_serialized_context()
Use this method to create a new SerializationSettings that has an environment variable set with the serialized context. This is useful for transporting the serialized context to serialized and registered tasks.
The setting will be available in the env field with the key SERIALIZED_CONTEXT_ENV_VAR.
:return: A newly constructed SerializationSettings, or self, if it already has the serialized context.
Properties
entrypoint_settings, serialized_context
flytekit.bin.entrypoint.StatsConfig
Configuration for sending statsd.
def StatsConfig(
host: str,
port: int,
disabled: bool,
disabled_tags: bool,
):
Parameter |
Type |
host |
str |
port |
int |
disabled |
bool |
disabled_tags |
bool |
Methods
Method | Description
auto() | Reads from environment variables, followed by the ConfigFile provided.
auto()
def auto(
config_file: typing.Union[str, ConfigFile],
):
Reads from environment variables, followed by the ConfigFile provided.
Parameter |
Type |
config_file |
typing.Union[str, ConfigFile] |
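A small usage sketch; the config file path is a placeholder:
.. code-block:: python

    from flytekit.configuration import StatsConfig

    stats_cfg = StatsConfig.auto()                            # environment variables only
    stats_cfg = StatsConfig.auto(config_file="config.yaml")   # or a config file (placeholder path)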
flytekit.bin.entrypoint.SyncCheckpoint
This class is NOT THREAD-SAFE!
SyncCheckpoint will synchronously checkpoint a user-given file or folder.
It will also synchronously download / restore previous checkpoints when restore is invoked.
TODO: Implement an async checkpoint system
def SyncCheckpoint(
checkpoint_dest: str,
checkpoint_src: typing.Optional[str],
):
Parameter |
Type |
checkpoint_dest |
str |
checkpoint_src |
typing.Optional[str] |
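A hedged sketch of using the checkpoint from flytekit.current_context() inside a task; the loop and payload format are illustrative:
.. code-block:: python

    import flytekit

    @flytekit.task
    def train(n_iterations: int) -> int:
        cp = flytekit.current_context().checkpoint
        prev = cp.read()                      # bytes of the previous checkpoint, if any
        start = int(prev) if prev else 0
        for i in range(start, n_iterations):
            cp.write(str(i).encode())         # overwrites the checkpoint with current progress
        return n_iterations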
Methods
Method | Description
prev_exists() |
read() | This should only be used if there is a singular checkpoint file written.
restore() | Given a path, if a previous checkpoint exists, it will be downloaded to this path.
save() |
write() | This will overwrite the checkpoint.
prev_exists()
read()
This should only be used if there is a singular checkpoint file written. If more than one checkpoint file is found, this will raise a ValueError.
restore()
def restore(
path: typing.Union[pathlib.Path, str, NoneType],
):
Given a path, if a previous checkpoint exists, it will be downloaded to this path. If the download is successful, the downloaded path is returned.
.. note:
The download will not be performed if the checkpoint was previously restored; the method will return the previously downloaded path.
Parameter |
Type |
path |
typing.Union[pathlib.Path, str, NoneType] |
save()
def save(
cp: typing.Union[pathlib.Path, str, _io.BufferedReader],
):
Parameter |
Type |
cp |
typing.Union[pathlib.Path, str, _io.BufferedReader] |
write()
This will overwrite the checkpoint. It can be retrieved using read or restore.
flytekit.bin.entrypoint.Timestamp
A ProtocolMessage
Methods
FromDatetime()
Converts datetime to Timestamp.
FromJsonString()
def FromJsonString(
value,
):
Parse a RFC 3339 date string format to Timestamp.
FromMicroseconds()
def FromMicroseconds(
micros,
):
Converts microseconds since epoch to Timestamp.
FromMilliseconds()
def FromMilliseconds(
millis,
):
Converts milliseconds since epoch to Timestamp.
FromNanoseconds()
def FromNanoseconds(
nanos,
):
Converts nanoseconds since epoch to Timestamp.
FromSeconds()
def FromSeconds(
seconds,
):
Converts seconds since epoch to Timestamp.
GetCurrentTime()
Get the current UTC into Timestamp.
ToDatetime()
def ToDatetime(
tzinfo,
):
Converts Timestamp to a datetime.
ToJsonString()
Converts Timestamp to RFC 3339 date string format.
Returns:
A string converted from timestamp. The string is always Z-normalized
and uses 3, 6 or 9 fractional digits as required to represent the
exact time. Example of the return format: ‘1972-01-01T10:00:20.021Z’
ToMicroseconds()
Converts Timestamp to microseconds since epoch.
ToMilliseconds()
Converts Timestamp to milliseconds since epoch.
ToNanoseconds()
Converts Timestamp to nanoseconds since epoch.
ToSeconds()
Converts Timestamp to seconds since epoch.
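A brief sketch round-tripping a datetime through the protobuf Timestamp helpers listed above:
.. code-block:: python

    from datetime import datetime, timezone
    from google.protobuf.timestamp_pb2 import Timestamp

    ts = Timestamp()
    ts.FromDatetime(datetime(2024, 1, 1, tzinfo=timezone.utc))
    print(ts.ToJsonString())     # '2024-01-01T00:00:00Z'
    print(ts.ToMilliseconds())   # 1704067200000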
flytekit.bin.entrypoint.VoidPromise
This object is returned for tasks that do not return any outputs (declared interface is empty).
VoidPromise cannot be interacted with and does not allow comparisons or any operations.
def VoidPromise(
task_name: str,
ref: Optional[NodeOutput],
):
Parameter |
Type |
task_name |
str |
ref |
Optional[NodeOutput] |
Methods
runs_before()
def runs_before(
args,
kwargs,
):
This is a placeholder and should do nothing. It is only here to enable local execution of workflows
where a task returns nothing.
Parameter |
Type |
args |
*args |
kwargs |
**kwargs |
with_overrides()
def with_overrides(
args,
kwargs,
):
Parameter |
Type |
args |
*args |
kwargs |
**kwargs |
Properties
ref, task_name