1.15.4.dev2+g3e3ce2426

flytekit.bin.entrypoint

Directory

Classes

Class Description
ExecutionParameters This is a run-time user-centric context object that is accessible to every @task method.
ExecutionState This is the context that is active when executing a task or a local workflow.
FastSerializationSettings This object holds information about the settings necessary to serialize an object so that it can be fast-registered.
FileAccessProvider This is the class that is available through the FlyteContext and can be used for persisting data to the remote.
FlyteContext This is an internal-facing context object, that most users will not have to deal with.
FlyteContextManager FlyteContextManager manages the execution context within Flytekit.
ImageConfig We recommend using ImageConfig.auto(img_name=None) to create an ImageConfig.
OutputMetadataTracker This class is for the users to set arbitrary metadata on output literals.
PythonTask Base Class for all Tasks with a Python native Interface.
SerializationSettings These settings are provided while serializing a workflow and task, before registration.
StatsConfig Configuration for sending statsd.
SyncCheckpoint This class is NOT THREAD-SAFE!
Timestamp A ProtocolMessage.
VoidPromise This object is returned for tasks that do not return any outputs (declared interface is empty).

Errors

flytekit.bin.entrypoint.ExecutionParameters

This is a run-time user-centric context object that is accessible to every @task method. It can be accessed using

.. code-block:: python

    flytekit.current_context()

This object provides the following:

  • a statsd handler
  • a logging handler
  • the execution ID as an :py:class:`flytekit.models.core.identifier.WorkflowExecutionIdentifier` object
  • a working directory for the user to write arbitrary files to

Please do not confuse this object with the :py:class:`flytekit.FlyteContext` object.

def ExecutionParameters(
    execution_date,
    tmp_dir,
    stats,
    execution_id: typing.Optional[_identifier.WorkflowExecutionIdentifier],
    logging,
    raw_output_prefix,
    output_metadata_prefix,
    checkpoint,
    decks,
    task_id: typing.Optional[_identifier.Identifier],
    enable_deck: bool,
    kwargs,
):
Parameter Type
execution_date
tmp_dir
stats
execution_id typing.Optional[_identifier.WorkflowExecutionIdentifier]
logging
raw_output_prefix
output_metadata_prefix
checkpoint
decks
task_id typing.Optional[_identifier.Identifier]
enable_deck bool
kwargs **kwargs

Methods

Method Description
builder() None
get() Returns the task-specific context if present; otherwise raises an error
has_attr() None
new_builder() None
with_enable_deck() None
with_task_sandbox() None

builder()

def builder()

get()

def get(
    key: str,
):

Returns the task-specific context if present; otherwise raises an error. The returned context matches the given key.

Parameter Type
key str

has_attr()

def has_attr(
    attr_name: str,
):
Parameter Type
attr_name str

new_builder()

def new_builder(
    current: Optional[ExecutionParameters],
):
Parameter Type
current Optional[ExecutionParameters]

with_enable_deck()

def with_enable_deck(
    enable_deck: bool,
):
Parameter Type
enable_deck bool

with_task_sandbox()

def with_task_sandbox()

Properties

Property Type Description
checkpoint
decks
default_deck
enable_deck
execution_date
execution_id
logging
output_metadata_prefix
raw_output_prefix
secrets
stats
task_id
timeline_deck
working_directory

flytekit.bin.entrypoint.ExecutionState

This is the context that is active when executing a task or a local workflow. It carries the state necessary to execute, such as temporary directories and the ExecutionParameters that are passed to the user.

Attributes:

  • mode (ExecutionState.Mode): Defines the context in which the task is executed (local, hosted, etc.).
  • working_dir (os.PathLike): Specifies the remote, external directory where inputs, outputs and other protobufs are uploaded.
  • engine_dir (os.PathLike):
  • branch_eval_mode (Optional[BranchEvalMode]): Used to determine whether a branch node should execute.
  • user_space_params (Optional[ExecutionParameters]): Provides run-time, user-centric context such as a statsd handler, a logging handler, the current execution id and a working directory.

def ExecutionState(
    working_dir: Union[os.PathLike, str],
    mode: Optional[ExecutionState.Mode],
    engine_dir: Optional[Union[os.PathLike, str]],
    branch_eval_mode: Optional[BranchEvalMode],
    user_space_params: Optional[ExecutionParameters],
):
Parameter Type
working_dir Union[os.PathLike, str]
mode Optional[ExecutionState.Mode]
engine_dir Optional[Union[os.PathLike, str]]
branch_eval_mode Optional[BranchEvalMode]
user_space_params Optional[ExecutionParameters]

Methods

Method Description
branch_complete() Indicates that we are within a conditional / ifelse block and the active branch is not done
is_local_execution() None
take_branch() Indicates that we are within an if-else block and the current branch has evaluated to true
with_params() Produces a copy of the current execution state and overrides the copy’s parameters with passed parameter values

branch_complete()

def branch_complete()

Indicates that we are within a conditional (if-else) block and the active branch is not done. Defaults to SKIPPED.

is_local_execution()

def is_local_execution()

take_branch()

def take_branch()

Indicates that we are within an if-else block and the current branch has evaluated to true. Useful only in local execution mode.

with_params()

def with_params(
    working_dir: Optional[os.PathLike],
    mode: Optional[Mode],
    engine_dir: Optional[os.PathLike],
    branch_eval_mode: Optional[BranchEvalMode],
    user_space_params: Optional[ExecutionParameters],
):

Produces a copy of the current execution state and overrides the copy’s parameters with passed parameter values.

Parameter Type
working_dir Optional[os.PathLike]
mode Optional[Mode]
engine_dir Optional[os.PathLike]
branch_eval_mode Optional[BranchEvalMode]
user_space_params Optional[ExecutionParameters]
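
The copy-and-override behavior of with_params can be modeled with a small stand-in. This is only a sketch: `StateSketch` and its two fields are hypothetical simplifications, not flytekit's ExecutionState, and the sketch assumes that an argument left as None keeps the current value.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class StateSketch:
    """Hypothetical stand-in for an execution state; not flytekit's class."""

    working_dir: str
    mode: Optional[str] = None

    def with_params(
        self,
        working_dir: Optional[str] = None,
        mode: Optional[str] = None,
    ) -> "StateSketch":
        # Assumption: None means "keep the value from the current state".
        candidates = {"working_dir": working_dir, "mode": mode}
        overrides = {k: v for k, v in candidates.items() if v is not None}
        # replace() builds a new instance, so the original is left untouched.
        return replace(self, **overrides)


original = StateSketch(working_dir="/tmp/a", mode="local")
updated = original.with_params(working_dir="/tmp/b")
```

The design point is that the caller always receives a new object, so a state that is already in use elsewhere is never mutated.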

flytekit.bin.entrypoint.FastSerializationSettings

This object holds information about the settings necessary to serialize an object so that it can be fast-registered.

def FastSerializationSettings(
    enabled: bool,
    destination_dir: Optional[str],
    distribution_location: Optional[str],
):
Parameter Type
enabled bool
destination_dir Optional[str]
distribution_location Optional[str]

Methods

Method Description
from_dict() None
from_json() None
schema() None
to_dict() None
to_json() None

from_dict()

def from_dict(
    kvs: typing.Union[dict, list, str, int, float, bool, NoneType],
    infer_missing,
):
Parameter Type
kvs typing.Union[dict, list, str, int, float, bool, NoneType]
infer_missing

from_json()

def from_json(
    s: typing.Union[str, bytes, bytearray],
    parse_float,
    parse_int,
    parse_constant,
    infer_missing,
    kw,
):
Parameter Type
s typing.Union[str, bytes, bytearray]
parse_float
parse_int
parse_constant
infer_missing
kw

schema()

def schema(
    infer_missing: bool,
    only,
    exclude,
    many: bool,
    context,
    load_only,
    dump_only,
    partial: bool,
    unknown,
):
Parameter Type
infer_missing bool
only
exclude
many bool
context
load_only
dump_only
partial bool
unknown

to_dict()

def to_dict(
    encode_json,
):
Parameter Type
encode_json

to_json()

def to_json(
    skipkeys: bool,
    ensure_ascii: bool,
    check_circular: bool,
    allow_nan: bool,
    indent: typing.Union[int, str, NoneType],
    separators: typing.Tuple[str, str],
    default: typing.Callable,
    sort_keys: bool,
    kw,
):
Parameter Type
skipkeys bool
ensure_ascii bool
check_circular bool
allow_nan bool
indent typing.Union[int, str, NoneType]
separators typing.Tuple[str, str]
default typing.Callable
sort_keys bool
kw

flytekit.bin.entrypoint.FileAccessProvider

This is the class that is available through the FlyteContext and can be used for persisting data to the remote durable store.

def FileAccessProvider(
    local_sandbox_dir: typing.Union[str, os.PathLike],
    raw_output_prefix: str,
    data_config: typing.Optional[flytekit.configuration.DataConfig],
    execution_metadata: typing.Optional[dict],
):
Parameter Type
local_sandbox_dir typing.Union[str, os.PathLike]
raw_output_prefix str
data_config typing.Optional[flytekit.configuration.DataConfig]
execution_metadata typing.Optional[dict]

Methods

Method Description
async_get_data()
async_put_data() The implication here is that we’re always going to put data to the remote location, so we
async_put_raw_data() This is a more flexible version of put that accepts a file-like object or a string path
download() Downloads from remote to local
download_directory() Downloads directory from given remote to local path
exists() None
generate_new_custom_path() Generates a new path with the raw output prefix and a random string appended to it
get() None
get_async_filesystem_for_path() None
get_data()
get_file_tail() None
get_filesystem() None
get_filesystem_for_path() None
get_random_local_directory() None
get_random_local_path() Use file_path_or_file_name when you want a random directory but want to preserve the leaf file name
get_random_remote_directory() None
get_random_remote_path() None
get_random_string() None
is_remote() Deprecated
join() None
put_data() The implication here is that we’re always going to put data to the remote location, so we
put_raw_data() This is a more flexible version of put that accepts a file-like object or a string path
recursive_paths() None
sep() None
strip_file_header() Drops file:// if it exists from the file
upload()
upload_directory()

async_get_data()

def async_get_data(
    remote_path: str,
    local_path: str,
    is_multipart: bool,
    kwargs,
):
Parameter Type
remote_path str
local_path str
is_multipart bool
kwargs **kwargs

async_put_data()

def async_put_data(
    local_path: typing.Union[str, os.PathLike],
    remote_path: str,
    is_multipart: bool,
    kwargs,
):

The implication here is that we’re always going to put data to the remote location, so we use .remote to ensure we don’t use the true local proxy if the remote path is a file://

Parameter Type
local_path typing.Union[str, os.PathLike]
remote_path str
is_multipart bool
kwargs **kwargs

async_put_raw_data()

def async_put_raw_data(
    lpath: typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO],
    upload_prefix: typing.Optional[str],
    file_name: typing.Optional[str],
    read_chunk_size_bytes: int,
    encoding: str,
    skip_raw_data_prefix: bool,
    kwargs,
):

This is a more flexible version of put that accepts a file-like object or a string path. Writes to the raw output prefix only. If you want to write to another fs use put_data or get the fsspec file system directly. FYI: Currently the raw output prefix set by propeller is already unique per retry and looks like s3://my-s3-bucket/data/o4/feda4e266c748463a97d-n0-0

If lpath is a folder, then recursive will be set. If lpath is a streamable, then it can only be a single file.

Writes to: {raw output prefix}/{upload_prefix}/{file_name}

Parameter Type
lpath typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO]
upload_prefix typing.Optional[str]
file_name typing.Optional[str]
read_chunk_size_bytes int
encoding str
skip_raw_data_prefix bool
kwargs **kwargs
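
The destination layout given for async_put_raw_data, {raw output prefix}/{upload_prefix}/{file_name}, can be illustrated with plain string handling. The helper below is a hypothetical sketch, not flytekit code: it takes both upload_prefix and file_name as required arguments and does not model how flytekit fills them in when they are omitted.

```python
def sketch_raw_destination(
    raw_output_prefix: str,
    upload_prefix: str,
    file_name: str,
) -> str:
    """Compose {raw output prefix}/{upload_prefix}/{file_name} (illustrative only)."""
    # Normalise separators so the three parts are joined by exactly one "/".
    parts = [
        raw_output_prefix.rstrip("/"),
        upload_prefix.strip("/"),
        file_name.lstrip("/"),
    ]
    return "/".join(parts)


# The prefix is the example value quoted in the description above;
# "reports" and "summary.csv" are made-up names for illustration.
destination = sketch_raw_destination(
    "s3://my-s3-bucket/data/o4/feda4e266c748463a97d-n0-0",
    "reports",
    "summary.csv",
)
```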

download()

def download(
    remote_path: str,
    local_path: str,
    kwargs,
):

Downloads from remote to local

Parameter Type
remote_path str
local_path str
kwargs **kwargs

download_directory()

def download_directory(
    remote_path: str,
    local_path: str,
    kwargs,
):

Downloads directory from given remote to local path

Parameter Type
remote_path str
local_path str
kwargs **kwargs

exists()

def exists(
    path: str,
):
Parameter Type
path str

generate_new_custom_path()

def generate_new_custom_path(
    fs: typing.Optional[fsspec.spec.AbstractFileSystem],
    alt: typing.Optional[str],
    stem: typing.Optional[str],
):

Generates a new path with the raw output prefix and a random string appended to it. Optionally, you can provide an alternate prefix and a stem. If stem is provided, it will be appended to the path instead of a random string. If alt is provided, it will replace the first part of the output prefix, e.g. the S3 or GCS bucket.

To write to a non-random prefix in a non-default S3 bucket, call this with alt="my-alt-bucket" and stem="my-stem" to generate a path like s3://my-alt-bucket/default-prefix-part/my-stem

Parameter Type
fs typing.Optional[fsspec.spec.AbstractFileSystem]
alt typing.Optional[str]
stem typing.Optional[str]
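
The roles of alt and stem in generate_new_custom_path can be sketched with the standard library. This is a hypothetical model of the described behavior, not flytekit's implementation: `sketch_custom_path` is an invented name, the fs argument is left out, and the random part is assumed to be a UUID hex string.

```python
import uuid
from typing import Optional
from urllib.parse import urlsplit


def sketch_custom_path(
    raw_output_prefix: str,
    alt: Optional[str] = None,
    stem: Optional[str] = None,
) -> str:
    parts = urlsplit(raw_output_prefix)
    # alt replaces the first part of the prefix (for example the bucket).
    bucket = alt if alt is not None else parts.netloc
    # stem replaces the random string that would otherwise be appended.
    leaf = stem if stem is not None else uuid.uuid4().hex
    segments = [s for s in (bucket, parts.path.strip("/"), leaf) if s]
    return f"{parts.scheme}://" + "/".join(segments)


custom = sketch_custom_path(
    "s3://my-bucket/default-prefix-part",
    alt="my-alt-bucket",
    stem="my-stem",
)
random_path = sketch_custom_path("s3://my-bucket/default-prefix-part")
```

With both alt and stem supplied, the sketch reproduces the path shape from the example above; with neither, it keeps the original prefix and appends a random leaf.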

get()

def get(
    from_path: str,
    to_path: str,
    recursive: bool,
    kwargs,
):
Parameter Type
from_path str
to_path str
recursive bool
kwargs **kwargs

get_async_filesystem_for_path()

def get_async_filesystem_for_path(
    path: str,
    anonymous: bool,
    kwargs,
):
Parameter Type
path str
anonymous bool
kwargs **kwargs

get_data()

def get_data(
    remote_path: str,
    local_path: str,
    is_multipart: bool,
    kwargs,
):
Parameter Type
remote_path str
local_path str
is_multipart bool
kwargs **kwargs

get_file_tail()

def get_file_tail(
    file_path_or_file_name: str,
):
Parameter Type
file_path_or_file_name str

get_filesystem()

def get_filesystem(
    protocol: typing.Optional[str],
    anonymous: bool,
    path: typing.Optional[str],
    kwargs,
):
Parameter Type
protocol typing.Optional[str]
anonymous bool
path typing.Optional[str]
kwargs **kwargs

get_filesystem_for_path()

def get_filesystem_for_path(
    path: str,
    anonymous: bool,
    kwargs,
):
Parameter Type
path str
anonymous bool
kwargs **kwargs

get_random_local_directory()

def get_random_local_directory()

get_random_local_path()

def get_random_local_path(
    file_path_or_file_name: typing.Optional[str],
):

Use file_path_or_file_name when you want a random directory but want to preserve the leaf file name.

Parameter Type
file_path_or_file_name typing.Optional[str]
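
The idea of a random directory that still preserves the leaf file name can be sketched as follows. The function name, the use of a UUID for the random part, and the sandbox location are assumptions made for illustration; this is not flytekit's implementation.

```python
import os
import tempfile
import uuid
from typing import Optional


def sketch_random_local_path(
    sandbox_dir: str,
    file_path_or_file_name: Optional[str] = None,
) -> str:
    random_part = uuid.uuid4().hex
    if file_path_or_file_name:
        # Keep only the leaf file name and place it under a random directory.
        leaf = os.path.basename(file_path_or_file_name)
        return os.path.join(sandbox_dir, random_part, leaf)
    return os.path.join(sandbox_dir, random_part)


sandbox = tempfile.gettempdir()
first = sketch_random_local_path(sandbox, "inputs/data/train.csv")
second = sketch_random_local_path(sandbox, "inputs/data/train.csv")
```

Two calls with the same file name yield different parent directories, so files with identical names do not collide.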

get_random_remote_directory()

def get_random_remote_directory()

get_random_remote_path()

def get_random_remote_path(
    file_path_or_file_name: typing.Optional[str],
):
Parameter Type
file_path_or_file_name typing.Optional[str]

get_random_string()

def get_random_string()

is_remote()

def is_remote(
    path: typing.Union[str, os.PathLike],
):

Deprecated. Let’s find a replacement

Parameter Type
path typing.Union[str, os.PathLike]

join()

def join(
    args: `*args`,
    unstrip: bool,
    fs: typing.Optional[fsspec.spec.AbstractFileSystem],
):
Parameter Type
args *args
unstrip bool
fs typing.Optional[fsspec.spec.AbstractFileSystem]

put_data()

def put_data(
    local_path: typing.Union[str, os.PathLike],
    remote_path: str,
    is_multipart: bool,
    kwargs,
):

The implication here is that we’re always going to put data to the remote location, so we use .remote to ensure we don’t use the true local proxy if the remote path is a file://

Parameter Type
local_path typing.Union[str, os.PathLike]
remote_path str
is_multipart bool
kwargs **kwargs

put_raw_data()

def put_raw_data(
    lpath: typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO],
    upload_prefix: typing.Optional[str],
    file_name: typing.Optional[str],
    read_chunk_size_bytes: int,
    encoding: str,
    skip_raw_data_prefix: bool,
    kwargs,
):

This is a more flexible version of put that accepts a file-like object or a string path. Writes to the raw output prefix only. If you want to write to another fs use put_data or get the fsspec file system directly. FYI: Currently the raw output prefix set by propeller is already unique per retry and looks like s3://my-s3-bucket/data/o4/feda4e266c748463a97d-n0-0

If lpath is a folder, then recursive will be set. If lpath is a streamable, then it can only be a single file.

Writes to: {raw output prefix}/{upload_prefix}/{file_name}

Parameter Type
lpath typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO]
upload_prefix typing.Optional[str]
file_name typing.Optional[str]
read_chunk_size_bytes int
encoding str
skip_raw_data_prefix bool
kwargs **kwargs

recursive_paths()

def recursive_paths(
    f: str,
    t: str,
):
Parameter Type
f str
t str

sep()

def sep(
    file_system: typing.Optional[fsspec.spec.AbstractFileSystem],
):
Parameter Type
file_system typing.Optional[fsspec.spec.AbstractFileSystem]

strip_file_header()

def strip_file_header(
    path: str,
    trim_trailing_sep: bool,
):

Drops file:// if it exists from the file

Parameter Type
path str
trim_trailing_sep bool
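
A minimal sketch of what dropping the file:// header means in practice is shown below. The function is a hypothetical stand-in written with plain string operations; in particular, the handling of trim_trailing_sep is an assumption about what that flag does.

```python
import os


def sketch_strip_file_header(path: str, trim_trailing_sep: bool = False) -> str:
    prefix = "file://"
    # Only a leading file:// is removed; other schemes are left alone.
    stripped = path[len(prefix):] if path.startswith(prefix) else path
    if trim_trailing_sep:
        # Assumption: the flag removes trailing path separators.
        stripped = stripped.rstrip(os.sep)
    return stripped


local = sketch_strip_file_header("file:///tmp/outputs/data.csv")
remote = sketch_strip_file_header("s3://bucket/data.csv")
```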

upload()

def upload(
    file_path: str,
    to_path: str,
    kwargs,
):
Parameter Type
file_path str
to_path str
kwargs **kwargs

upload_directory()

def upload_directory(
    local_path: str,
    remote_path: str,
    kwargs,
):
Parameter Type
local_path str
remote_path str
kwargs **kwargs

Properties

Property Type Description
data_config
local_access
local_sandbox_dir
raw_output_fs
raw_output_prefix

flytekit.bin.entrypoint.FlyteContext

This is an internal-facing context object, that most users will not have to deal with. It’s essentially a globally available grab bag of settings and objects that allows flytekit to do things like convert complex types, run and compile workflows, serialize Flyte entities, etc.

Even though this object has a current_context function on it, it should not be called directly. Please use the :py:class:`flytekit.FlyteContextManager` object instead.

Please do not confuse this object with the :py:class:`flytekit.ExecutionParameters` object.

def FlyteContext(
    file_access: FileAccessProvider,
    level: int,
    flyte_client: Optional['friendly_client.SynchronousFlyteClient'],
    compilation_state: Optional[CompilationState],
    execution_state: Optional[ExecutionState],
    serialization_settings: Optional[SerializationSettings],
    in_a_condition: bool,
    origin_stackframe: Optional[traceback.FrameSummary],
    output_metadata_tracker: Optional[OutputMetadataTracker],
    worker_queue: Optional[Controller],
):
Parameter Type
file_access FileAccessProvider
level int
flyte_client Optional['friendly_client.SynchronousFlyteClient']
compilation_state Optional[CompilationState]
execution_state Optional[ExecutionState]
serialization_settings Optional[SerializationSettings]
in_a_condition bool
origin_stackframe Optional[traceback.FrameSummary]
output_metadata_tracker Optional[OutputMetadataTracker]
worker_queue Optional[Controller]

Methods

Method Description
current_context() This method exists only to maintain backwards compatibility
enter_conditional_section() None
get_deck() Returns the deck that was created as part of the last execution
get_origin_stackframe_repr() None
new_builder() None
new_compilation_state() Creates and returns a default compilation state
new_execution_state() Creates and returns a new default execution state
set_stackframe() None
with_client() None
with_compilation_state() None
with_execution_state() None
with_file_access() None
with_new_compilation_state() None
with_output_metadata_tracker() None
with_serialization_settings() None
with_worker_queue() None

current_context()

def current_context()

This method exists only to maintain backwards compatibility. Please use FlyteContextManager.current_context() instead.

Users of flytekit should be careful not to confuse the object returned from this function with :py:func:`flytekit.current_context`.

enter_conditional_section()

def enter_conditional_section()

get_deck()

def get_deck()

Returns the deck that was created as part of the last execution.

The return value depends on the execution environment. In a notebook, the return value is compatible with IPython.display and should be rendered in the notebook.

.. code-block:: python

    with flytekit.new_context() as ctx:
        my_task(...)
        ctx.get_deck()

Or, if you wish to display it explicitly:

.. code-block:: python

    from IPython.display import display

    display(ctx.get_deck())

get_origin_stackframe_repr()

def get_origin_stackframe_repr()

new_builder()

def new_builder()

new_compilation_state()

def new_compilation_state(
    prefix: str,
):

Creates and returns a default compilation state. For most of the code this should be the entrypoint of compilation; otherwise the code should always use with_compilation_state.

Parameter Type
prefix str

new_execution_state()

def new_execution_state(
    working_dir: Optional[os.PathLike],
):

Creates and returns a new default execution state. This should be used at the entrypoint of execution; in all other cases it is preferable to use with_execution_state.

Parameter Type
working_dir Optional[os.PathLike]

set_stackframe()

def set_stackframe(
    s: traceback.FrameSummary,
):
Parameter Type
s traceback.FrameSummary

with_client()

def with_client(
    c: SynchronousFlyteClient,
):
Parameter Type
c SynchronousFlyteClient

with_compilation_state()

def with_compilation_state(
    c: CompilationState,
):
Parameter Type
c CompilationState

with_execution_state()

def with_execution_state(
    es: ExecutionState,
):
Parameter Type
es ExecutionState

with_file_access()

def with_file_access(
    fa: FileAccessProvider,
):
Parameter Type
fa FileAccessProvider

with_new_compilation_state()

def with_new_compilation_state()

with_output_metadata_tracker()

def with_output_metadata_tracker(
    t: OutputMetadataTracker,
):
Parameter Type
t OutputMetadataTracker

with_serialization_settings()

def with_serialization_settings(
    ss: SerializationSettings,
):
Parameter Type
ss SerializationSettings

with_worker_queue()

def with_worker_queue(
    wq: Controller,
):
Parameter Type
wq Controller

Properties

Property Type Description
user_space_params

flytekit.bin.entrypoint.FlyteContextManager

FlyteContextManager manages the execution context within Flytekit. It holds the global state of either compilation or execution. It is not thread-safe and can currently only be run as a single-threaded application. Contexts within Flytekit are useful for managing compilation state and execution state. Refer to CompilationState and ExecutionState for more information. FlyteContextManager provides a singleton stack to manage these contexts.

Typical usage is

.. code-block:: python

    FlyteContextManager.initialize()
    with FlyteContextManager.with_context(o) as ctx:
        pass

If required (though not recommended), you can push a context manually with

FlyteContextManager.push_context()

but a corresponding pop_context must then be called

FlyteContextManager.pop_context()
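
The singleton-stack pattern described above, where a context manager pairs every push with a pop, can be modeled in a few lines. `ContextStackSketch` is a hypothetical, simplified stand-in that stores plain dictionaries; it is not flytekit's FlyteContextManager and makes no claim about its internals.

```python
from contextlib import contextmanager
from typing import Iterator, List


class ContextStackSketch:
    """Hypothetical, simplified stand-in for a singleton context stack."""

    _stack: List[dict] = []

    @classmethod
    def initialize(cls) -> None:
        # Erase everything and start again from a single root context.
        cls._stack = [{"level": 0}]

    @classmethod
    def current_context(cls) -> dict:
        return cls._stack[-1]

    @classmethod
    def size(cls) -> int:
        return len(cls._stack)

    @classmethod
    @contextmanager
    def with_context(cls, ctx: dict) -> Iterator[dict]:
        # Push on entry and always pop on exit, even if the body raises.
        cls._stack.append(ctx)
        try:
            yield ctx
        finally:
            cls._stack.pop()


ContextStackSketch.initialize()
with ContextStackSketch.with_context({"level": 1}) as ctx:
    inside = ContextStackSketch.size()
after = ContextStackSketch.size()
```

This is why the with-block form is preferable to manual push and pop calls: the pop cannot be forgotten, and the stack is restored even when an exception propagates.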

Methods

Method Description
add_signal_handler() None
current_context() None
get_origin_stackframe() None
initialize() Re-initializes the context and erases the entire context
pop_context() None
push_context() None
size() None
with_context() None

add_signal_handler()

def add_signal_handler(
    handler: typing.Callable[[int, FrameType], typing.Any],
):
Parameter Type
handler typing.Callable[[int, FrameType], typing.Any]

current_context()

def current_context()

get_origin_stackframe()

def get_origin_stackframe(
    limit,
):
Parameter Type
limit

initialize()

def initialize()

Re-initializes the context and erases the entire context

pop_context()

def pop_context()

push_context()

def push_context(
    ctx: FlyteContext,
    f: Optional[traceback.FrameSummary],
):
Parameter Type
ctx FlyteContext
f Optional[traceback.FrameSummary]

size()

def size()

with_context()

def with_context(
    b: FlyteContext.Builder,
):
Parameter Type
b FlyteContext.Builder

flytekit.bin.entrypoint.FlyteException

Common base class for all non-exit exceptions.

def FlyteException(
    args,
    timestamp: typing.Optional[float],
):
Parameter Type
args *args
timestamp typing.Optional[float]

Properties

Property Type Description
timestamp

flytekit.bin.entrypoint.FlyteNonRecoverableSystemException

Common base class for all non-exit exceptions.

def FlyteNonRecoverableSystemException(
    exc_value: Exception,
):

FlyteNonRecoverableSystemException is thrown when system code raises an exception.

Parameter Type
exc_value Exception

Properties

Property Type Description
timestamp
value

flytekit.bin.entrypoint.FlyteRecoverableException

Common base class for all non-exit exceptions.

def FlyteRecoverableException(
    args,
    timestamp: typing.Optional[float],
):
Parameter Type
args *args
timestamp typing.Optional[float]

Properties

Property Type Description
timestamp

flytekit.bin.entrypoint.FlyteUserRuntimeException

Common base class for all non-exit exceptions.

def FlyteUserRuntimeException(
    exc_value: Exception,
    timestamp: typing.Optional[float],
):

FlyteUserRuntimeException is thrown when user code raises an exception.

Parameter Type
exc_value Exception
timestamp typing.Optional[float]

Properties

Property Type Description
timestamp
value

flytekit.bin.entrypoint.IgnoreOutputs

This exception should be used to indicate that the outputs generated by this can be safely ignored. This is useful in case of distributed training or peer-to-peer parallel algorithms.
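
The distributed-training use case can be sketched with a stand-in exception. Everything here is hypothetical: `IgnoreOutputsSketch`, `train_worker` and `run_worker` are invented for illustration, and the sketch assumes that only the rank 0 worker holds a result worth recording.

```python
from typing import Optional


class IgnoreOutputsSketch(Exception):
    """Hypothetical stand-in for an 'outputs can be ignored' signal."""


def train_worker(rank: int) -> str:
    # Every rank would take part in the training step here.
    if rank != 0:
        # Assumption for this sketch: only rank 0 holds the final result.
        raise IgnoreOutputsSketch(f"rank {rank} produces no outputs to record")
    return "model-checkpoint"


def run_worker(rank: int) -> Optional[str]:
    try:
        return train_worker(rank)
    except IgnoreOutputsSketch:
        # The caller treats this as "nothing to record", not as a failure.
        return None


results = [run_worker(rank) for rank in range(3)]
```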

flytekit.bin.entrypoint.ImageConfig

We recommend using ImageConfig.auto(img_name=None) to create an ImageConfig. For example, ImageConfig.auto(img_name="ghcr.io/flyteorg/flytecookbook:v1.0.0") will create an ImageConfig.

ImageConfig holds available images which can be used at registration time. A default image can be specified along with optional additional images. Each image in the config must have a unique name.

Attributes:

  • default_image (Optional[Image]): The default image to be used as a container for task serialization.
  • images (List[Image]): Optional, additional images which can be used in task container definitions.

def ImageConfig(
    default_image: Optional[Image],
    images: Optional[List[Image]],
):
Parameter Type
default_image Optional[Image]
images Optional[List[Image]]

Methods

Method Description
auto() Reads from config file or from img_name
auto_default_image() None
create_from() None
find_image() Return an image, by name, if it exists
from_dict() None
from_images() Allows you to programmatically create an ImageConfig
from_json() None
schema() None
to_dict() None
to_json() None
validate_image() Validates the image to match the standard format

auto()

def auto(
    config_file: typing.Union[str, ConfigFile, None],
    img_name: Optional[str],
):

Reads from the config file or from img_name. Note that this function does not take into account the flytekit default images (see the Dockerfiles at the base of this repo). To pick those up, see the auto_default_image function.

Parameter Type
config_file typing.Union[str, ConfigFile, None]
img_name Optional[str]

auto_default_image()

def auto_default_image()

create_from()

def create_from(
    default_image: Optional[Image],
    other_images: typing.Optional[typing.List[Image]],
):
Parameter Type
default_image Optional[Image]
other_images typing.Optional[typing.List[Image]]

find_image()

def find_image(
    name,
):

Return an image, by name, if it exists.

Parameter Type
name

from_dict()

def from_dict(
    kvs: typing.Union[dict, list, str, int, float, bool, NoneType],
    infer_missing,
):
Parameter Type
kvs typing.Union[dict, list, str, int, float, bool, NoneType]
infer_missing

from_images()

def from_images(
    default_image: str,
    m: typing.Optional[typing.Dict[str, str]],
):

Allows you to programmatically create an ImageConfig. Usually only the default_image is required, unless your workflow uses multiple images

.. code:: python

    ImageConfig.from_images(
        "ghcr.io/flyteorg/flytecookbook:v1.0.0",
        {
            "spark": "ghcr.io/flyteorg/myspark:...",
            "other": "...",
        },
    )

Parameter Type
default_image str
m typing.Optional[typing.Dict[str, str]]

from_json()

def from_json(
    s: typing.Union[str, bytes, bytearray],
    parse_float,
    parse_int,
    parse_constant,
    infer_missing,
    kw,
):
Parameter Type
s typing.Union[str, bytes, bytearray]
parse_float
parse_int
parse_constant
infer_missing
kw

schema()

def schema(
    infer_missing: bool,
    only,
    exclude,
    many: bool,
    context,
    load_only,
    dump_only,
    partial: bool,
    unknown,
):
Parameter Type
infer_missing bool
only
exclude
many bool
context
load_only
dump_only
partial bool
unknown

to_dict()

def to_dict(
    encode_json,
):
Parameter Type
encode_json

to_json()

def to_json(
    skipkeys: bool,
    ensure_ascii: bool,
    check_circular: bool,
    allow_nan: bool,
    indent: typing.Union[int, str, NoneType],
    separators: typing.Tuple[str, str],
    default: typing.Callable,
    sort_keys: bool,
    kw,
):
Parameter Type
skipkeys bool
ensure_ascii bool
check_circular bool
allow_nan bool
indent typing.Union[int, str, NoneType]
separators typing.Tuple[str, str]
default typing.Callable
sort_keys bool
kw

validate_image()

def validate_image(
    _: typing.Any,
    param: str,
    values: tuple,
):

Validates that the image matches the standard format, and that only one default image is provided. A default image is one that is specified as default=<image_uri> or just <image_uri>. All other images should be provided with a name, in the format name=<image_uri>. This method can be used with the CLI.

Parameter Type
_ typing.Any
param str
values tuple
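
The image format rules described for validate_image (a bare <image_uri> or default=<image_uri> for the default, name=<image_uri> for the rest, and at most one default) can be sketched as a small parser. This is an illustrative model only: `sketch_parse_images` is a hypothetical helper, the registry names are made up, and the duplicate-name check is an assumption based on the requirement that each image has a unique name.

```python
from typing import Dict, Iterable, Optional, Tuple


def sketch_parse_images(values: Iterable[str]) -> Tuple[Optional[str], Dict[str, str]]:
    default_image: Optional[str] = None
    named: Dict[str, str] = {}
    for value in values:
        name, sep, uri = value.partition("=")
        if not sep:
            # A bare <image_uri> counts as the default image.
            name, uri = "default", value
        if name == "default":
            if default_image is not None:
                raise ValueError("only one default image may be provided")
            default_image = uri
        elif name in named:
            raise ValueError(f"duplicate image name: {name}")
        else:
            named[name] = uri
    return default_image, named


default_image, named_images = sketch_parse_images(
    ["registry.example.com/app:1.0", "spark=registry.example.com/spark:1.0"]
)
```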

flytekit.bin.entrypoint.OutputMetadataTracker

This class is for the users to set arbitrary metadata on output literals.

Attributes:

  • output_metadata (Optional[TaskOutputMetadata]): A sparse dictionary of metadata that the user wants to attach to each output of a task. The key is the output value (object) and the value is an OutputMetadata object.

def OutputMetadataTracker(
    output_metadata: typing.Dict[typing.Any, OutputMetadata],
):
Parameter Type
output_metadata typing.Dict[typing.Any, OutputMetadata]
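
A sparse mapping from output objects to metadata can be sketched as below. `TrackerSketch` is a hypothetical stand-in that stores plain dictionaries as metadata; keying by `id(obj)` is an assumption made here so that unhashable outputs such as lists can be tracked, and it is not a statement about how flytekit stores its entries.

```python
from typing import Any, Dict, Optional


class TrackerSketch:
    """Hypothetical stand-in for a per-output metadata tracker."""

    def __init__(self) -> None:
        self._metadata: Dict[int, dict] = {}

    def add(self, obj: Any, metadata: dict) -> None:
        # Assumption: key by identity so unhashable outputs work too.
        self._metadata[id(obj)] = metadata

    def get(self, obj: Any) -> Optional[dict]:
        # Outputs without attached metadata simply return None (sparse).
        return self._metadata.get(id(obj))


tracker = TrackerSketch()
output_a = [1, 2, 3]
output_b = [4, 5]
tracker.add(output_a, {"owner": "team-x"})
```

Identity-based keys are only meaningful while the tracked object is alive, so a tracker like this should not outlive the outputs it describes.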

Methods

Method Description
add() None
get() None
with_params() Produces a copy of the current object and sets new values

add()

def add(
    obj: typing.Any,
    metadata: OutputMetadata,
):
Parameter Type
obj typing.Any
metadata OutputMetadata

get()

def get(
    obj: typing.Any,
):
Parameter Type
obj typing.Any

with_params()

def with_params(
    output_metadata: Optional[TaskOutputMetadata],
):

Produces a copy of the current object and sets new values.

Parameter Type
output_metadata Optional[TaskOutputMetadata]

flytekit.bin.entrypoint.PythonTask

Base class for all tasks with a Python-native interface. Use it directly for task types that do not have a Python function to execute. Otherwise, refer to :py:class:`flytekit.PythonFunctionTask`.

def PythonTask(
    task_type: str,
    name: str,
    task_config: typing.Optional[~T],
    interface: typing.Optional[flytekit.core.interface.Interface],
    environment: typing.Optional[typing.Dict[str, str]],
    disable_deck: typing.Optional[bool],
    enable_deck: typing.Optional[bool],
    deck_fields: typing.Optional[typing.Tuple[flytekit.deck.deck.DeckField, ...]],
    kwargs,
):
Parameter Type
task_type str
name str
task_config typing.Optional[~T]
interface typing.Optional[flytekit.core.interface.Interface]
environment typing.Optional[typing.Dict[str, str]]
disable_deck typing.Optional[bool]
enable_deck typing.Optional[bool]
deck_fields typing.Optional[typing.Tuple[flytekit.deck.deck.DeckField, ...]]
kwargs **kwargs

Methods

Method Description
compile() Generates a node that encapsulates this task in a workflow definition
construct_node_metadata() Used when constructing the node that encapsulates this task as part of a broader workflow definition
dispatch_execute() This method translates Flyte’s Type system based input values and invokes the actual call to the executor
execute() This method will be invoked to execute the task
find_lhs() None
get_config() Returns the task config as a serializable dictionary
get_container() Returns the container definition (if any) that is used to run the task on hosted Flyte
get_custom() Return additional plugin-specific custom data (if any) as a serializable dictionary
get_extended_resources() Returns the extended resources to allocate to the task on hosted Flyte
get_input_types() Returns the names and python types as a dictionary for the inputs of this task
get_k8s_pod() Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte
get_sql() Returns the Sql definition (if any) that is used to run the task on hosted Flyte
get_type_for_input_var() Returns the python type for an input variable by name
get_type_for_output_var() Returns the python type for the specified output variable by name
local_execute() This function is used only in the local execution path and is responsible for calling dispatch execute
local_execution_mode() None
post_execute() Post execute is called after the execution has completed, with the user_params and can be used to clean-up,
pre_execute() This is the method that will be invoked directly before executing the task method and before all the inputs
sandbox_execute() Call dispatch_execute, in the context of a local sandbox execution

compile()

def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    args,
    kwargs,
):

Generates a node that encapsulates this task in a workflow definition.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
args *args
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

Used when constructing the node that encapsulates this task as part of a broader workflow definition.

dispatch_execute()

def dispatch_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

This method translates Flyte’s type-system-based input values and invokes the actual call to the executor. This method is also invoked during runtime.

  • VoidPromise is returned when the task itself declares no outputs.
  • LiteralMap is returned when the task declares one or more outputs. Individual outputs may be None.
  • DynamicJobSpec is returned when a dynamic workflow is executed.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

execute()

def execute(
    kwargs,
):

This method will be invoked to execute the task.

Parameter Type
kwargs **kwargs

find_lhs()

def find_lhs()

get_config()

def get_config(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the task config as a serializable dictionary. This task config consists of metadata about the custom definition for this task.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_container()

def get_container(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the container definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_custom()

def get_custom(
    settings: flytekit.configuration.SerializationSettings,
):

Return additional plugin-specific custom data (if any) as a serializable dictionary.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_extended_resources()

def get_extended_resources(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the extended resources to allocate to the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_input_types()

def get_input_types()

Returns the names and python types as a dictionary for the inputs of this task.

get_k8s_pod()

def get_k8s_pod(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_sql()

def get_sql(
    settings: flytekit.configuration.SerializationSettings,
):

Returns the Sql definition (if any) that is used to run the task on hosted Flyte.

Parameter Type
settings flytekit.configuration.SerializationSettings

get_type_for_input_var()

def get_type_for_input_var(
    k: str,
    v: typing.Any,
):

Returns the python type for an input variable by name.

Parameter Type
k str
v typing.Any

get_type_for_output_var()

def get_type_for_output_var(
    k: str,
    v: typing.Any,
):

Returns the python type for the specified output variable by name.

Parameter Type
k str
v typing.Any

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
):

This function is used only in the local execution path and is responsible for calling dispatch execute. Use this function when calling a task with native values (or Promises containing Flyte literals derived from Python native values).

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

post_execute()

def post_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
    rval: typing.Any,
):

post_execute is called after the execution has completed, with the user_params, and can be used to clean up or to alter the outputs to match the task's intended outputs. If not overridden, this function is a no-op.

Parameter Type
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters]
rval typing.Any

pre_execute()

def pre_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
):

This is the method that will be invoked directly before executing the task method and before all the inputs are converted. One particular case where this is useful is if the context is to be modified for the user process to get some user space parameters. This also ensures that things like SparkSession are already correctly set up before the type transformers are called.

This should return either the same context or the mutated context.

Parameter Type
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters]
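
The relationship between pre_execute(), execute(), and post_execute() described above can be sketched without flytekit installed. The names below (SketchTask, DoubleTask, run, the "scale" key) are hypothetical stand-ins, not flytekit's implementation; the sketch only illustrates the hook order, the pass-through defaults, and a subclass that returns a mutated context.

```python
from typing import Any, Dict, List, Optional


class SketchTask:
    """Hypothetical stand-in for a task with pre/post hooks (not flytekit code)."""

    def __init__(self) -> None:
        self.calls: List[str] = []  # records the order in which hooks run

    def pre_execute(self, user_params: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
        # Default behaviour: return the same context unchanged.
        self.calls.append("pre_execute")
        return user_params

    def execute(self, **kwargs: Any) -> Any:
        raise NotImplementedError

    def post_execute(self, user_params: Optional[Dict[str, Any]], rval: Any) -> Any:
        # Default behaviour: no-op, the outputs pass through untouched.
        self.calls.append("post_execute")
        return rval

    def run(self, user_params: Optional[Dict[str, Any]], **kwargs: Any) -> Any:
        # Assumed simplified order: pre_execute -> execute -> post_execute.
        params = self.pre_execute(user_params)
        self.calls.append("execute")
        rval = self.execute(**kwargs)
        return self.post_execute(params, rval)


class DoubleTask(SketchTask):
    def pre_execute(self, user_params):
        params = super().pre_execute(user_params)
        # Return a mutated copy of the context instead of the original.
        return {**(params or {}), "scale": 2}

    def execute(self, **kwargs):
        return kwargs["x"]

    def post_execute(self, user_params, rval):
        rval = super().post_execute(user_params, rval)
        # Alter the output using a value placed in the context by pre_execute.
        return rval * user_params["scale"]


task = DoubleTask()
result = task.run(None, x=21)
```

In this sketch the value stored by pre_execute() is only visible to post_execute(); how the real context reaches user code is outside what this reference describes.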

sandbox_execute()

def sandbox_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
):

Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.

Parameter Type
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

Properties

Property Type Description
deck_fields
disable_deck
docs
enable_deck
environment
instantiated_in
interface
lhs
location
metadata
name
python_interface
security_context
task_config
task_type
task_type_version

flytekit.bin.entrypoint.SerializationSettings

These settings are provided while serializing a workflow and task, before registration. This is required to get runtime information at serialization time, as well as some defaults.

Attributes:

  • project (str): The project (if any) under which to register entities.
  • domain (str): The domain (if any) under which to register entities.
  • version (str): The version (if any) under which to register entities.
  • image_config (ImageConfig): The image config used to define task container images.
  • env (Optional[Dict[str, str]]): Environment variables injected into task container definitions.
  • flytekit_virtualenv_root (Optional[str]): During out-of-container serialization, the absolute path of the flytekit virtualenv at serialization time won’t match the in-container value at execution time. This optional value is used to provide the in-container virtualenv path.
  • python_interpreter (Optional[str]): The Python executable to use. This is used for Spark tasks in out-of-container execution.
  • entrypoint_settings (Optional[EntrypointSettings]): Information about the command, path and version of the entrypoint program.
  • fast_serialization_settings (Optional[FastSerializationSettings]): If the code is being serialized so that it can be fast registered (and thus omit building a Docker image), this object contains additional parameters for serialization.
  • source_root (Optional[str]): The root directory of the source code.

def SerializationSettings(
    image_config: ImageConfig,
    project: typing.Optional[str],
    domain: typing.Optional[str],
    version: typing.Optional[str],
    env: Optional[Dict[str, str]],
    git_repo: Optional[str],
    python_interpreter: str,
    flytekit_virtualenv_root: Optional[str],
    fast_serialization_settings: Optional[FastSerializationSettings],
    source_root: Optional[str],
):
Parameter Type
image_config ImageConfig
project typing.Optional[str]
domain typing.Optional[str]
version typing.Optional[str]
env Optional[Dict[str, str]]
git_repo Optional[str]
python_interpreter str
flytekit_virtualenv_root Optional[str]
fast_serialization_settings Optional[FastSerializationSettings]
source_root Optional[str]

Methods

Method Description
default_entrypoint_settings() Assumes the entrypoint is installed in a virtual-environment where the interpreter is
for_image() None
from_dict() None
from_json() None
from_transport() None
new_builder() Creates a SerializationSettings.Builder that copies the existing serialization settings parameters and allows for customization
schema() None
should_fast_serialize() Whether or not the serialization settings specify that entities should be serialized for fast registration
to_dict() None
to_json() None
venv_root_from_interpreter() Computes the path of the virtual environment root, based on the passed in python interpreter path
with_serialized_context() Use this method to create a new SerializationSettings that has an environment variable set with the SerializedContext

default_entrypoint_settings()

def default_entrypoint_settings(
    interpreter_path: str,
):

Assumes the entrypoint is installed in a virtual-environment where the interpreter is

Parameter Type
interpreter_path str

for_image()

def for_image(
    image: str,
    version: str,
    project: str,
    domain: str,
    python_interpreter_path: str,
):
Parameter Type
image str
version str
project str
domain str
python_interpreter_path str

from_dict()

def from_dict(
    kvs: typing.Union[dict, list, str, int, float, bool, NoneType],
    infer_missing,
):
Parameter Type
kvs typing.Union[dict, list, str, int, float, bool, NoneType]
infer_missing

from_json()

def from_json(
    s: typing.Union[str, bytes, bytearray],
    parse_float,
    parse_int,
    parse_constant,
    infer_missing,
    kw,
):
Parameter Type
s typing.Union[str, bytes, bytearray]
parse_float
parse_int
parse_constant
infer_missing
kw

from_transport()

def from_transport(
    s: str,
):
Parameter Type
s str

new_builder()

def new_builder()

Creates a SerializationSettings.Builder that copies the existing serialization settings parameters and allows for customization.

schema()

def schema(
    infer_missing: bool,
    only,
    exclude,
    many: bool,
    context,
    load_only,
    dump_only,
    partial: bool,
    unknown,
):
Parameter Type
infer_missing bool
only
exclude
many bool
context
load_only
dump_only
partial bool
unknown

should_fast_serialize()

def should_fast_serialize()

Whether or not the serialization settings specify that entities should be serialized for fast registration.

to_dict()

def to_dict(
    encode_json,
):
Parameter Type
encode_json

to_json()

def to_json(
    skipkeys: bool,
    ensure_ascii: bool,
    check_circular: bool,
    allow_nan: bool,
    indent: typing.Union[int, str, NoneType],
    separators: typing.Tuple[str, str],
    default: typing.Callable,
    sort_keys: bool,
    kw,
):
Parameter Type
skipkeys bool
ensure_ascii bool
check_circular bool
allow_nan bool
indent typing.Union[int, str, NoneType]
separators typing.Tuple[str, str]
default typing.Callable
sort_keys bool
kw

venv_root_from_interpreter()

def venv_root_from_interpreter(
    interpreter_path: str,
):

Computes the path of the virtual environment root based on the passed-in Python interpreter path, for example /opt/venv/bin/python3 -> /opt/venv.

Parameter Type
interpreter_path str
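
The mapping in the example above (/opt/venv/bin/python3 -> /opt/venv) amounts to stripping the last two path components. A minimal sketch of that idea using only the standard library follows; venv_root_sketch is a hypothetical helper, not the flytekit method, and it assumes a POSIX-style layout of the form <root>/bin/<interpreter>.

```python
from pathlib import PurePosixPath


def venv_root_sketch(interpreter_path: str) -> str:
    """Hypothetical helper: derive <root> from <root>/bin/<interpreter>."""
    # parent drops the interpreter name, parent.parent drops the bin directory.
    return str(PurePosixPath(interpreter_path).parent.parent)


root = venv_root_sketch("/opt/venv/bin/python3")
print(root)
```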

with_serialized_context()

def with_serialized_context()

Use this method to create a new SerializationSettings that has an environment variable set with the SerializedContext. This is useful for transporting the SerializedContext to serialized and registered tasks. The setting will be available in the env field under the key SERIALIZED_CONTEXT_ENV_VAR.

Returns: A newly constructed SerializationSettings, or self if it already has the serialized context.
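
The "new object, or self if already set" behaviour described above can be sketched with a frozen dataclass. Everything here is an assumption made for illustration: SettingsSketch, its serialized() placeholder encoding, and the value assigned to SERIALIZED_CONTEXT_ENV_VAR are hypothetical and do not reflect how flytekit encodes the context.

```python
from dataclasses import dataclass, field, replace
from typing import Dict

# Hypothetical value; this reference names the constant but not its value.
SERIALIZED_CONTEXT_ENV_VAR = "SKETCH_SERIALIZED_CONTEXT"


@dataclass(frozen=True)
class SettingsSketch:
    """Hypothetical stand-in for settings that carry an env mapping."""

    project: str
    env: Dict[str, str] = field(default_factory=dict)

    def serialized(self) -> str:
        # Placeholder encoding, used only so the sketch has something to store.
        return f"project={self.project}"

    def with_serialized_context(self) -> "SettingsSketch":
        # Already carries the context: return self unchanged.
        if SERIALIZED_CONTEXT_ENV_VAR in self.env:
            return self
        # Otherwise build a new object; the original is left untouched.
        new_env = {**self.env, SERIALIZED_CONTEXT_ENV_VAR: self.serialized()}
        return replace(self, env=new_env)


base = SettingsSketch(project="demo")
first = base.with_serialized_context()
second = first.with_serialized_context()
```

Here first is a new object with the extra environment variable, while second is the very same object as first because the key is already present.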

Properties

Property Type Description
entrypoint_settings
serialized_context

flytekit.bin.entrypoint.StatsConfig

Configuration for sending statsd.

def StatsConfig(
    host: str,
    port: int,
    disabled: bool,
    disabled_tags: bool,
):
Parameter Type
host str
port int
disabled bool
disabled_tags bool

Methods

Method Description
auto() Reads from environment variable, followed by ConfigFile provided

auto()

def auto(
    config_file: typing.Union[str, ConfigFile],
):

Reads from environment variable, followed by ConfigFile provided

Parameter Type
config_file typing.Union[str, ConfigFile]
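
The lookup order described above (environment variable first, then the provided config file) can be sketched as a simple precedence chain. This is a stand-in under stated assumptions: StatsConfigSketch, auto_sketch, the plain-mapping inputs, the key names, and the fallback defaults are all hypothetical and are not taken from flytekit.

```python
from dataclasses import dataclass
from typing import Any, Mapping


@dataclass
class StatsConfigSketch:
    """Hypothetical stand-in holding two of the documented fields."""

    host: str
    port: int


def auto_sketch(env: Mapping[str, Any], file_values: Mapping[str, Any]) -> StatsConfigSketch:
    """Resolve each field from env first, then the config file, then a default."""

    def pick(key: str, default: Any) -> Any:
        if key in env:
            return env[key]
        if key in file_values:
            return file_values[key]
        return default

    # The defaults below are assumptions chosen for the sketch.
    return StatsConfigSketch(
        host=str(pick("host", "localhost")),
        port=int(pick("port", 8125)),
    )


cfg = auto_sketch(
    env={"host": "statsd.internal"},
    file_values={"host": "from-file", "port": "9125"},
)
```

In this example the host comes from the environment mapping, which wins over the file, and the port comes from the file because the environment does not define it.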

flytekit.bin.entrypoint.SyncCheckpoint

This class is NOT THREAD-SAFE! SyncCheckpoint synchronously checkpoints a user-given file or folder. It also synchronously downloads and restores previous checkpoints when restore is invoked.

TODO: Implement an async checkpoint system

def SyncCheckpoint(
    checkpoint_dest: str,
    checkpoint_src: typing.Optional[str],
):
Parameter Type
checkpoint_dest str
checkpoint_src typing.Optional[str]

Methods

Method Description
prev_exists() None
read() This should only be used if there is a singular checkpoint file written
restore() Given a path, if a previous checkpoint exists, will be downloaded to this path
save()
write() This will overwrite the checkpoint

prev_exists()

def prev_exists()

read()

def read()

This should only be used if a single checkpoint file was written. If more than one checkpoint file is found, this will raise a ValueError.

restore()

def restore(
    path: typing.Union[pathlib.Path, str, NoneType],
):

Given a path, if a previous checkpoint exists, it will be downloaded to this path. If the download is successful, the downloaded path is returned.

Note: The download will not be performed if the checkpoint was previously restored; the method will return the previously downloaded path.

Parameter Type
path typing.Union[pathlib.Path, str, NoneType]

save()

def save(
    cp: typing.Union[pathlib.Path, str, _io.BufferedReader],
):
Parameter Type
cp typing.Union[pathlib.Path, str, _io.BufferedReader]

write()

def write(
    b: bytes,
):

This will overwrite the checkpoint. It can be retrieved using read or restore.

Parameter Type
b bytes
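
The write/read contract described above (write overwrites, read expects exactly one checkpoint file and raises ValueError otherwise) can be illustrated with a local, standard-library stand-in. CheckpointSketch and its FILE_NAME are hypothetical; the real class talks to remote storage, which this sketch does not attempt to model.

```python
import tempfile
from pathlib import Path
from typing import Optional


class CheckpointSketch:
    """Hypothetical local stand-in for the write/read contract (not flytekit code)."""

    FILE_NAME = "checkpoint.bin"  # assumed name, for illustration only

    def __init__(self, dest: str) -> None:
        self._dest = Path(dest)
        self._dest.mkdir(parents=True, exist_ok=True)

    def write(self, b: bytes) -> None:
        # Writing always replaces the previous checkpoint.
        (self._dest / self.FILE_NAME).write_bytes(b)

    def read(self) -> Optional[bytes]:
        files = sorted(p for p in self._dest.iterdir() if p.is_file())
        if len(files) > 1:
            raise ValueError(f"Expected a single checkpoint file, found {len(files)}")
        if not files:
            return None
        return files[0].read_bytes()


with tempfile.TemporaryDirectory() as tmp:
    cp = CheckpointSketch(tmp)
    cp.write(b"step=1")
    cp.write(b"step=2")  # overwrites the first checkpoint
    latest = cp.read()

    # A second file in the destination makes read() ambiguous.
    (Path(tmp) / "extra.bin").write_bytes(b"x")
    try:
        cp.read()
        ambiguous = False
    except ValueError:
        ambiguous = True
```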

flytekit.bin.entrypoint.Timestamp

A ProtocolMessage

Methods

Method Description
FromDatetime() Converts datetime to Timestamp
FromJsonString() Parses an RFC 3339 date string into a Timestamp
FromMicroseconds() Converts microseconds since epoch to Timestamp
FromMilliseconds() Converts milliseconds since epoch to Timestamp
FromNanoseconds() Converts nanoseconds since epoch to Timestamp
FromSeconds() Converts seconds since epoch to Timestamp
GetCurrentTime() Sets the Timestamp to the current UTC time
ToDatetime() Converts Timestamp to a datetime
ToJsonString() Converts Timestamp to RFC 3339 date string format
ToMicroseconds() Converts Timestamp to microseconds since epoch
ToMilliseconds() Converts Timestamp to milliseconds since epoch
ToNanoseconds() Converts Timestamp to nanoseconds since epoch
ToSeconds() Converts Timestamp to seconds since epoch

FromDatetime()

def FromDatetime(
    dt,
):

Converts datetime to Timestamp.

Parameter Type
dt

FromJsonString()

def FromJsonString(
    value,
):

Parses an RFC 3339 date string into a Timestamp.

Parameter Type
value

FromMicroseconds()

def FromMicroseconds(
    micros,
):

Converts microseconds since epoch to Timestamp.

Parameter Type
micros

FromMilliseconds()

def FromMilliseconds(
    millis,
):

Converts milliseconds since epoch to Timestamp.

Parameter Type
millis

FromNanoseconds()

def FromNanoseconds(
    nanos,
):

Converts nanoseconds since epoch to Timestamp.

Parameter Type
nanos

FromSeconds()

def FromSeconds(
    seconds,
):

Converts seconds since epoch to Timestamp.

Parameter Type
seconds

GetCurrentTime()

def GetCurrentTime()

Sets the Timestamp to the current UTC time.

ToDatetime()

def ToDatetime(
    tzinfo,
):

Converts Timestamp to a datetime.

Parameter Type
tzinfo

ToJsonString()

def ToJsonString()

Converts Timestamp to RFC 3339 date string format.

Returns: A string converted from timestamp. The string is always Z-normalized and uses 3, 6 or 9 fractional digits as required to represent the exact time. Example of the return format: ‘1972-01-01T10:00:20.021Z’
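
The return format described above can be reproduced with the standard library to make the digit rule concrete. This is a sketch of the formatting rule only and does not call the Timestamp class; to_rfc3339 is a hypothetical helper, and omitting the fraction entirely when there are no sub-second digits is an assumption not stated in this reference.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_rfc3339(seconds: int, nanos: int = 0) -> str:
    """Hypothetical helper: format epoch seconds plus nanos as a Z-normalized string."""
    base = (EPOCH + timedelta(seconds=seconds)).strftime("%Y-%m-%dT%H:%M:%S")
    if nanos == 0:
        return base + "Z"  # assumption: no fractional part when nanos is zero
    if nanos % 1_000_000 == 0:
        return f"{base}.{nanos // 1_000_000:03d}Z"  # milliseconds, 3 digits
    if nanos % 1_000 == 0:
        return f"{base}.{nanos // 1_000:06d}Z"  # microseconds, 6 digits
    return f"{base}.{nanos:09d}Z"  # nanoseconds, 9 digits


# 63108020 seconds after the epoch is 1972-01-01T10:00:20 UTC.
example = to_rfc3339(63108020, 21_000_000)
print(example)
```

The helper picks the shortest of the 3, 6, or 9 digit forms that still represents the value exactly, which is why 21 milliseconds is rendered as .021 rather than .021000.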

ToMicroseconds()

def ToMicroseconds()

Converts Timestamp to microseconds since epoch.

ToMilliseconds()

def ToMilliseconds()

Converts Timestamp to milliseconds since epoch.

ToNanoseconds()

def ToNanoseconds()

Converts Timestamp to nanoseconds since epoch.

ToSeconds()

def ToSeconds()

Converts Timestamp to seconds since epoch.

flytekit.bin.entrypoint.VoidPromise

This object is returned for tasks that do not return any outputs (declared interface is empty). VoidPromise cannot be interacted with and does not allow comparisons or any operations.

def VoidPromise(
    task_name: str,
    ref: Optional[NodeOutput],
):
Parameter Type
task_name str
ref Optional[NodeOutput]

Methods

Method Description
runs_before() This is a placeholder and should do nothing
with_overrides() None

runs_before()

def runs_before(
    args,
    kwargs,
):

This is a placeholder and should do nothing. It is only here to enable local execution of workflows where a task returns nothing.

Parameter Type
args *args
kwargs **kwargs

with_overrides()

def with_overrides(
    args,
    kwargs,
):
Parameter Type
args *args
kwargs **kwargs

Properties

Property Type Description
ref
task_name