flytekit.core.context_manager
.. autoclass:: flytekit.core.context_manager::ExecutionState.Mode
:noindex:
.. autoclass:: flytekit.core.context_manager::ExecutionState.Mode.TASK_EXECUTION
:noindex:
.. autoclass:: flytekit.core.context_manager::ExecutionState.Mode.LOCAL_WORKFLOW_EXECUTION
:noindex:
.. autoclass:: flytekit.core.context_manager::ExecutionState.Mode.LOCAL_TASK_EXECUTION
:noindex:
Directory
Classes
Class |
Description |
BranchEvalMode |
This is a 3-way class, with the None value meaning that we are not within a conditional context. |
Checkpoint |
Base class for Checkpoint system. |
CompilationState |
Compilation state is used during the compilation of a workflow or task. |
Config |
This is the parent configuration object and holds all the underlying configuration object types. |
ContextVar |
None. |
Enum |
Create a collection of name/value pairs. |
ExecutionParameters |
This is a run-time user-centric context object that is accessible to every @task method. |
ExecutionState |
This is the context that is active when executing a task or a local workflow. |
FileAccessProvider |
This is the class that is available through the FlyteContext and can be used for persisting data to the remote durable store. |
FlyteContext |
This is an internal-facing context object, that most users will not have to deal with. |
FlyteContextManager |
FlyteContextManager manages the execution context within Flytekit. |
FlyteEntities |
This is a global Object that tracks various tasks and workflows that are declared within a VM during the registration process. |
FrameType |
None. |
Node |
This class will hold all the things necessary to make an SdkNode, but we won't make one until we know things like the ID, which comes from the registration step. |
OutputMetadata |
None. |
OutputMetadataTracker |
This class is for the users to set arbitrary metadata on output literals. |
SecretsConfig |
Configuration for secrets. |
SecretsManager |
This provides a secrets resolution logic at runtime. |
SerializableToString |
This protocol is used by the Artifact create_from function. |
SerializationSettings |
These settings are provided while serializing a workflow and task, before registration. |
SyncCheckpoint |
This class is NOT THREAD-SAFE! |
WorkflowExecutionIdentifier |
None. |
datetime |
datetime(year, month, day[, hour[, minute[, second[, microsecond[, tzinfo]]]]]). |
timezone |
Fixed offset from UTC implementation of tzinfo. |
flytekit.core.context_manager.BranchEvalMode
This is a 3-way class, with the None value meaning that we are not within a conditional context. The other two
values are
- Active: the next then clause should run
- Skipped: the next then clause should not run
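The tri-state behavior can be sketched with a plain Optional enum. This is an illustrative stand-in with hypothetical helper names, not the real class:

```python
from enum import Enum
from typing import Optional


class BranchEvalMode(Enum):
    BRANCH_ACTIVE = "branch active"
    BRANCH_SKIPPED = "branch skipped"


def should_run_then(mode: Optional["BranchEvalMode"]) -> bool:
    # None means we are not inside a conditional at all; the question of
    # running a `then` clause does not arise.
    if mode is None:
        raise RuntimeError("not within a conditional context")
    # Active: the next `then` should run; Skipped: it should not.
    return mode is BranchEvalMode.BRANCH_ACTIVE
```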
flytekit.core.context_manager.Checkpoint
Base class for Checkpoint system. Checkpoint system allows reading and writing custom checkpoints from user
scripts
Methods
Method |
Description |
prev_exists() |
None |
read() |
This should only be used if there is a singular checkpoint file written |
restore() |
Given a path, if a previous checkpoint exists, will be downloaded to this path |
save() |
|
write() |
This will overwrite the checkpoint |
prev_exists()
read()
This should only be used if there is a singular checkpoint file written. If more than one checkpoint file is
found, this will raise a ValueError
restore()
def restore(
path: typing.Union[pathlib.Path, str],
):
Given a path, if a previous checkpoint exists, will be downloaded to this path.
If download is successful the downloaded path is returned
.. note::
Download will not be performed if the checkpoint was previously restored. The method will return the previously downloaded path.
Parameter |
Type |
path |
typing.Union[pathlib.Path, str] |
save()
def save(
cp: typing.Union[pathlib.Path, str, _io.BufferedReader],
):
Parameter |
Type |
cp |
typing.Union[pathlib.Path, str, _io.BufferedReader] |
write()
This will overwrite the checkpoint. It can be retrieved using read or restore
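The contract described above (prev_exists, write overwrites, read expects a single checkpoint file) can be sketched with a minimal file-backed stand-in. This is a hypothetical toy class, not the real Checkpoint API, which also handles remote storage and restore:

```python
import pathlib


class FileCheckpoint:
    """Toy checkpoint keeping a single file under a directory."""

    def __init__(self, directory: str):
        self._dir = pathlib.Path(directory)
        self._file = self._dir / "checkpoint"

    def prev_exists(self) -> bool:
        return self._file.exists()

    def write(self, b: bytes) -> None:
        # Mirrors Checkpoint.write: overwrites any existing checkpoint.
        self._file.write_bytes(b)

    def read(self) -> bytes:
        # Mirrors Checkpoint.read: only valid when there is a single
        # checkpoint file; more than one raises a ValueError.
        files = list(self._dir.iterdir())
        if len(files) > 1:
            raise ValueError("more than one checkpoint file found")
        return self._file.read_bytes()
```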
flytekit.core.context_manager.CompilationState
Compilation state is used during the compilation of a workflow or task. It stores the nodes that were
created when walking through the workflow graph.
Attributes:
prefix (str): This is because we may one day want to be able to have subworkflows inside other workflows. If
users choose to not specify their node names, then we can end up with multiple "n0"s. This prefix allows
us to give those nested nodes a distinct name, as well as properly identify them in the workflow.
mode (int): refer to :py:class:flytekit.extend.ExecutionState.Mode
task_resolver (Optional[TaskResolverMixin]): Please see :py:class:flytekit.extend.TaskResolverMixin
nodes (Optional[List]): Stores currently compiled nodes so far.
def CompilationState(
prefix: str,
mode: int,
task_resolver: Optional[TaskResolverMixin],
nodes: List,
):
Parameter |
Type |
prefix |
str |
mode |
int |
task_resolver |
Optional[TaskResolverMixin] |
nodes |
List |
Methods
Method |
Description |
add_node() |
None |
with_params() |
Create a new CompilationState where the mode and task resolver are defaulted to the current object, but they |
add_node()
def add_node(
n: Node,
):
with_params()
def with_params(
prefix: str,
mode: Optional[int],
resolver: Optional[TaskResolverMixin],
nodes: Optional[List],
):
Create a new CompilationState where the mode and task resolver are defaulted to the current object's, but they
and all other args are overridden if explicitly provided as arguments.
Usage:
s.with_params("p", nodes=[])
Parameter |
Type |
prefix |
str |
mode |
Optional[int] |
resolver |
Optional[TaskResolverMixin] |
nodes |
Optional[List] |
flytekit.core.context_manager.Config
This is the parent configuration object and holds all the underlying configuration object types. An instance of
this object holds all the config necessary to
- Interactive session with Flyte backend
- Some parts are required for Serialization, for example Platform Config is not required
- Runtime of a task
def Config(
platform: PlatformConfig,
secrets: SecretsConfig,
stats: StatsConfig,
data_config: DataConfig,
local_sandbox_path: str,
):
Parameter |
Type |
platform |
PlatformConfig |
secrets |
SecretsConfig |
stats |
StatsConfig |
data_config |
DataConfig |
local_sandbox_path |
str |
Methods
Method |
Description |
auto() |
Automatically constructs the Config Object |
for_endpoint() |
Creates an automatic config for the given endpoint and uses the config_file or environment variable for default |
for_sandbox() |
Constructs a new Config object specifically to connect to :std:ref:deployment-deployment-sandbox |
with_params() |
None |
auto()
def auto(
config_file: typing.Union[str, ConfigFile, None],
):
Automatically constructs the Config object. The order of precedence is as follows:
- First, try to find any env vars that match the config vars specified in the FLYTE_CONFIG format.
- If not found in the environment, then values are read from the config file.
- If not found in the file, then the default values are used.
Parameter |
Type |
config_file |
typing.Union[str, ConfigFile, None] |
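The precedence order can be illustrated with a small stdlib-only lookup. The helper and the FLYTE_ env-var naming here are assumptions for illustration, not the exact keys flytekit uses:

```python
from typing import Mapping, Optional


def resolve_setting(
    name: str,
    env: Mapping[str, str],
    config_file: Mapping[str, str],
    default: Optional[str] = None,
) -> Optional[str]:
    # 1. Environment variables win (illustrative FLYTE_-prefixed naming).
    env_key = "FLYTE_" + name.upper()
    if env_key in env:
        return env[env_key]
    # 2. Then the config file.
    if name in config_file:
        return config_file[name]
    # 3. Finally the built-in default.
    return default
```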
for_endpoint()
def for_endpoint(
endpoint: str,
insecure: bool,
data_config: typing.Optional[DataConfig],
config_file: typing.Union[str, ConfigFile],
):
Creates an automatic config for the given endpoint and uses the config_file or environment variables for defaults.
Refer to Config.auto()
to understand the default bootstrap behavior.
data_config can be used to configure how data is downloaded or uploaded to a specific blob store like S3, GCS, etc.
For permissions to a specific backend, follow the cloud provider's recommendations. If using fsspec, then
refer to the fsspec documentation.
Parameter |
Type |
endpoint |
str |
insecure |
bool |
data_config |
typing.Optional[DataConfig] |
config_file |
typing.Union[str, ConfigFile] |
for_sandbox()
Constructs a new Config object specifically to connect to :std:ref:deployment-deployment-sandbox.
If you are using a hosted Sandbox-like environment, then you may need to use port-forwarding or ingress URLs.
:return: Config
with_params()
def with_params(
platform: PlatformConfig,
secrets: SecretsConfig,
stats: StatsConfig,
data_config: DataConfig,
local_sandbox_path: str,
):
Parameter |
Type |
platform |
PlatformConfig |
secrets |
SecretsConfig |
stats |
StatsConfig |
data_config |
DataConfig |
local_sandbox_path |
str |
flytekit.core.context_manager.ContextVar
flytekit.core.context_manager.Enum
Create a collection of name/value pairs.
Example enumeration:
>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3
Access them by:
>>> Color.RED
<Color.RED: 1>
>>> Color(1)
<Color.RED: 1>
>>> Color['RED']
<Color.RED: 1>
Enumerations can be iterated over, and know how many members they have:
>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]
Methods can be added to enumerations, and members can have their own
attributes – see the documentation for details.
flytekit.core.context_manager.ExecutionParameters
This is a run-time user-centric context object that is accessible to every @task method. It can be accessed using
.. code-block:: python
flytekit.current_context()
This object provides the following:
- a statsd handler
- a logging handler
- the execution ID as a :py:class:flytekit.models.core.identifier.WorkflowExecutionIdentifier object
- a working directory for the user to write arbitrary files to
Please do not confuse this object with the :py:class:flytekit.FlyteContext
object.
def ExecutionParameters(
execution_date,
tmp_dir,
stats,
execution_id: typing.Optional[_identifier.WorkflowExecutionIdentifier],
logging,
raw_output_prefix,
output_metadata_prefix,
checkpoint,
decks,
task_id: typing.Optional[_identifier.Identifier],
enable_deck: bool,
kwargs,
):
Parameter |
Type |
execution_date |
|
tmp_dir |
|
stats |
|
execution_id |
typing.Optional[_identifier.WorkflowExecutionIdentifier] |
logging |
|
raw_output_prefix |
|
output_metadata_prefix |
|
checkpoint |
|
decks |
|
task_id |
typing.Optional[_identifier.Identifier] |
enable_deck |
bool |
kwargs |
**kwargs |
Methods
builder()
get()
Returns task-specific context if present, else raises an error. The returned context will match the key.
has_attr()
def has_attr(
attr_name: str,
):
Parameter |
Type |
attr_name |
str |
new_builder()
def new_builder(
current: Optional[ExecutionParameters],
):
Parameter |
Type |
current |
Optional[ExecutionParameters] |
with_enable_deck()
def with_enable_deck(
enable_deck: bool,
):
Parameter |
Type |
enable_deck |
bool |
with_task_sandbox()
Properties
Property |
Type |
Description |
checkpoint |
|
|
decks |
|
|
default_deck |
|
|
enable_deck |
|
|
execution_date |
|
|
execution_id |
|
|
logging |
|
|
output_metadata_prefix |
|
|
raw_output_prefix |
|
|
secrets |
|
|
stats |
|
|
task_id |
|
|
timeline_deck |
|
|
working_directory |
|
|
flytekit.core.context_manager.ExecutionState
This is the context that is active when executing a task or a local workflow. This carries the necessary state to
execute.
Some of the required state during execution deals with temporary directories, the ExecutionParameters that are passed to the
user, etc.
Attributes:
mode (ExecutionState.Mode): Defines the context in which the task is executed (local, hosted, etc).
working_dir (os.PathLike): Specifies the remote, external directory where inputs, outputs and other protobufs
are uploaded
engine_dir (os.PathLike):
branch_eval_mode Optional[BranchEvalMode]: Used to determine whether a branch node should execute.
user_space_params Optional[ExecutionParameters]: Provides run-time, user-centric context such as a statsd
handler, a logging handler, the current execution id and a working directory.
def ExecutionState(
working_dir: Union[os.PathLike, str],
mode: Optional[ExecutionState.Mode],
engine_dir: Optional[Union[os.PathLike, str]],
branch_eval_mode: Optional[BranchEvalMode],
user_space_params: Optional[ExecutionParameters],
):
Parameter |
Type |
working_dir |
Union[os.PathLike, str] |
mode |
Optional[ExecutionState.Mode] |
engine_dir |
Optional[Union[os.PathLike, str]] |
branch_eval_mode |
Optional[BranchEvalMode] |
user_space_params |
Optional[ExecutionParameters] |
Methods
Method |
Description |
branch_complete() |
Indicates that we are within a conditional / ifelse block and the active branch is not done |
is_local_execution() |
None |
take_branch() |
Indicates that we are within an if-else block and the current branch has evaluated to true |
with_params() |
Produces a copy of the current execution state and overrides the copy’s parameters with passed parameter values |
branch_complete()
Indicates that we are within a conditional / ifelse block and the active branch is not done.
Defaults to SKIPPED.
is_local_execution()
take_branch()
Indicates that we are within an if-else block and the current branch has evaluated to true.
Useful only in local execution mode
with_params()
def with_params(
working_dir: Optional[os.PathLike],
mode: Optional[Mode],
engine_dir: Optional[os.PathLike],
branch_eval_mode: Optional[BranchEvalMode],
user_space_params: Optional[ExecutionParameters],
):
Produces a copy of the current execution state and overrides the copy’s parameters with passed parameter values.
Parameter |
Type |
working_dir |
Optional[os.PathLike] |
mode |
Optional[Mode] |
engine_dir |
Optional[os.PathLike] |
branch_eval_mode |
Optional[BranchEvalMode] |
user_space_params |
Optional[ExecutionParameters] |
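The with_params copy-and-override pattern used by ExecutionState (and CompilationState) can be sketched with a frozen dataclass; the class and field names here are illustrative, not the real implementation:

```python
import dataclasses
from typing import Optional


@dataclasses.dataclass(frozen=True)
class ToyExecutionState:
    working_dir: str
    mode: Optional[str] = None

    def with_params(
        self,
        working_dir: Optional[str] = None,
        mode: Optional[str] = None,
    ) -> "ToyExecutionState":
        # Produce a copy; only explicitly provided values override the
        # copy's parameters, everything else carries over unchanged.
        return dataclasses.replace(
            self,
            working_dir=working_dir if working_dir is not None else self.working_dir,
            mode=mode if mode is not None else self.mode,
        )
```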
flytekit.core.context_manager.FileAccessProvider
This is the class that is available through the FlyteContext and can be used for persisting data to the remote
durable store.
def FileAccessProvider(
local_sandbox_dir: typing.Union[str, os.PathLike],
raw_output_prefix: str,
data_config: typing.Optional[flytekit.configuration.DataConfig],
execution_metadata: typing.Optional[dict],
):
Parameter |
Type |
local_sandbox_dir |
typing.Union[str, os.PathLike] |
raw_output_prefix |
str |
data_config |
typing.Optional[flytekit.configuration.DataConfig] |
execution_metadata |
typing.Optional[dict] |
Methods
async_get_data()
def async_get_data(
remote_path: str,
local_path: str,
is_multipart: bool,
kwargs,
):
Parameter |
Type |
remote_path |
str |
local_path |
str |
is_multipart |
bool |
kwargs |
**kwargs |
async_put_data()
def async_put_data(
local_path: typing.Union[str, os.PathLike],
remote_path: str,
is_multipart: bool,
kwargs,
):
The implication here is that we're always going to put data to the remote location, so we use .remote to ensure
we don't use the true local proxy if the remote path is a file:// URL.
Parameter |
Type |
local_path |
typing.Union[str, os.PathLike] |
remote_path |
str |
is_multipart |
bool |
kwargs |
**kwargs |
async_put_raw_data()
def async_put_raw_data(
lpath: typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO],
upload_prefix: typing.Optional[str],
file_name: typing.Optional[str],
read_chunk_size_bytes: int,
encoding: str,
skip_raw_data_prefix: bool,
kwargs,
):
This is a more flexible version of put that accepts a file-like object or a string path.
Writes to the raw output prefix only. If you want to write to another fs use put_data or get the fsspec
file system directly.
FYI: Currently the raw output prefix set by propeller is already unique per retry and looks like
s3://my-s3-bucket/data/o4/feda4e266c748463a97d-n0-0
If lpath is a folder, then recursive will be set.
If lpath is a streamable, then it can only be a single file.
Writes to:
{raw output prefix}/{upload_prefix}/{file_name}
Parameter |
Type |
lpath |
typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO] |
upload_prefix |
typing.Optional[str] |
file_name |
typing.Optional[str] |
read_chunk_size_bytes |
int |
encoding |
str |
skip_raw_data_prefix |
bool |
kwargs |
**kwargs |
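The destination layout {raw output prefix}/{upload_prefix}/{file_name} can be sketched as follows; the helper name is hypothetical, and the real call substitutes random values for omitted parts:

```python
import posixpath
from typing import Optional


def raw_output_destination(
    raw_output_prefix: str,
    upload_prefix: Optional[str],
    file_name: Optional[str],
) -> str:
    # Compose {raw output prefix}/{upload_prefix}/{file_name}; either
    # optional component may be omitted.
    parts = [p for p in (upload_prefix, file_name) if p]
    return posixpath.join(raw_output_prefix.rstrip("/"), *parts)
```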
download()
def download(
remote_path: str,
local_path: str,
kwargs,
):
Downloads from remote to local
Parameter |
Type |
remote_path |
str |
local_path |
str |
kwargs |
**kwargs |
download_directory()
def download_directory(
remote_path: str,
local_path: str,
kwargs,
):
Downloads directory from given remote to local path
Parameter |
Type |
remote_path |
str |
local_path |
str |
kwargs |
**kwargs |
exists()
def exists(
path: str,
):
generate_new_custom_path()
def generate_new_custom_path(
fs: typing.Optional[fsspec.spec.AbstractFileSystem],
alt: typing.Optional[str],
stem: typing.Optional[str],
):
Generates a new path with the raw output prefix and a random string appended to it.
Optionally, you can provide an alternate prefix and a stem. If stem is provided, it
will be appended to the path instead of a random string. If alt is provided, it will
replace the first part of the output prefix, e.g. the S3 or GCS bucket.
If wanting to write to a non-random prefix in a non-default S3 bucket, this can be
called with alt="my-alt-bucket" and stem="my-stem" to generate a path like
s3://my-alt-bucket/default-prefix-part/my-stem
Parameter |
Type |
fs |
typing.Optional[fsspec.spec.AbstractFileSystem] |
alt |
typing.Optional[str] |
stem |
typing.Optional[str] |
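The alt/stem substitution can be sketched with plain string handling; this hypothetical helper only illustrates how the bucket and leaf parts are swapped, not the real implementation:

```python
import secrets
from typing import Optional


def new_custom_path(raw_output_prefix: str, alt: Optional[str], stem: Optional[str]) -> str:
    # e.g. raw_output_prefix = "s3://my-default-bucket/default-prefix-part"
    scheme, _, rest = raw_output_prefix.partition("://")
    bucket, _, key_prefix = rest.partition("/")
    if alt:
        # alt replaces the first part of the output prefix, e.g. the S3/GCS bucket.
        bucket = alt
    # stem replaces the otherwise-random leaf of the path.
    leaf = stem if stem else secrets.token_hex(8)
    return "/".join(p for p in (f"{scheme}://{bucket}", key_prefix, leaf) if p)
```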
get()
def get(
from_path: str,
to_path: str,
recursive: bool,
kwargs,
):
Parameter |
Type |
from_path |
str |
to_path |
str |
recursive |
bool |
kwargs |
**kwargs |
get_async_filesystem_for_path()
def get_async_filesystem_for_path(
path: str,
anonymous: bool,
kwargs,
):
Parameter |
Type |
path |
str |
anonymous |
bool |
kwargs |
**kwargs |
get_data()
def get_data(
remote_path: str,
local_path: str,
is_multipart: bool,
kwargs,
):
Parameter |
Type |
remote_path |
str |
local_path |
str |
is_multipart |
bool |
kwargs |
**kwargs |
get_file_tail()
def get_file_tail(
file_path_or_file_name: str,
):
Parameter |
Type |
file_path_or_file_name |
str |
get_filesystem()
def get_filesystem(
protocol: typing.Optional[str],
anonymous: bool,
path: typing.Optional[str],
kwargs,
):
Parameter |
Type |
protocol |
typing.Optional[str] |
anonymous |
bool |
path |
typing.Optional[str] |
kwargs |
**kwargs |
get_filesystem_for_path()
def get_filesystem_for_path(
path: str,
anonymous: bool,
kwargs,
):
Parameter |
Type |
path |
str |
anonymous |
bool |
kwargs |
**kwargs |
get_random_local_directory()
def get_random_local_directory()
get_random_local_path()
def get_random_local_path(
file_path_or_file_name: typing.Optional[str],
):
Use file_path_or_file_name, when you want a random directory, but want to preserve the leaf file name
Parameter |
Type |
file_path_or_file_name |
typing.Optional[str] |
get_random_remote_directory()
def get_random_remote_directory()
get_random_remote_path()
def get_random_remote_path(
file_path_or_file_name: typing.Optional[str],
):
Parameter |
Type |
file_path_or_file_name |
typing.Optional[str] |
get_random_string()
is_remote()
def is_remote(
path: typing.Union[str, os.PathLike],
):
Deprecated. Let's find a replacement.
Parameter |
Type |
path |
typing.Union[str, os.PathLike] |
join()
def join(
args: `*args`,
unstrip: bool,
fs: typing.Optional[fsspec.spec.AbstractFileSystem],
):
Parameter |
Type |
args |
*args |
unstrip |
bool |
fs |
typing.Optional[fsspec.spec.AbstractFileSystem] |
put_data()
def put_data(
local_path: typing.Union[str, os.PathLike],
remote_path: str,
is_multipart: bool,
kwargs,
):
The implication here is that we're always going to put data to the remote location, so we use .remote to ensure
we don't use the true local proxy if the remote path is a file:// URL.
Parameter |
Type |
local_path |
typing.Union[str, os.PathLike] |
remote_path |
str |
is_multipart |
bool |
kwargs |
**kwargs |
put_raw_data()
def put_raw_data(
lpath: typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO],
upload_prefix: typing.Optional[str],
file_name: typing.Optional[str],
read_chunk_size_bytes: int,
encoding: str,
skip_raw_data_prefix: bool,
kwargs,
):
This is a more flexible version of put that accepts a file-like object or a string path.
Writes to the raw output prefix only. If you want to write to another fs use put_data or get the fsspec
file system directly.
FYI: Currently the raw output prefix set by propeller is already unique per retry and looks like
s3://my-s3-bucket/data/o4/feda4e266c748463a97d-n0-0
If lpath is a folder, then recursive will be set.
If lpath is a streamable, then it can only be a single file.
Writes to:
{raw output prefix}/{upload_prefix}/{file_name}
Parameter |
Type |
lpath |
typing.Union[str, os.PathLike, pathlib.Path, bytes, _io.BufferedReader, _io.BytesIO, _io.StringIO] |
upload_prefix |
typing.Optional[str] |
file_name |
typing.Optional[str] |
read_chunk_size_bytes |
int |
encoding |
str |
skip_raw_data_prefix |
bool |
kwargs |
**kwargs |
recursive_paths()
def recursive_paths(
f: str,
t: str,
):
Parameter |
Type |
f |
str |
t |
str |
sep()
def sep(
file_system: typing.Optional[fsspec.spec.AbstractFileSystem],
):
Parameter |
Type |
file_system |
typing.Optional[fsspec.spec.AbstractFileSystem] |
def strip_file_header(
path: str,
trim_trailing_sep: bool,
):
Drops file:// if it exists from the file
Parameter |
Type |
path |
str |
trim_trailing_sep |
bool |
upload()
def upload(
file_path: str,
to_path: str,
kwargs,
):
Parameter |
Type |
file_path |
str |
to_path |
str |
kwargs |
**kwargs |
upload_directory()
def upload_directory(
local_path: str,
remote_path: str,
kwargs,
):
Parameter |
Type |
local_path |
str |
remote_path |
str |
kwargs |
**kwargs |
Properties
Property |
Type |
Description |
data_config |
|
|
local_access |
|
|
local_sandbox_dir |
|
|
raw_output_fs |
|
|
raw_output_prefix |
|
|
flytekit.core.context_manager.FlyteContext
This is an internal-facing context object, that most users will not have to deal with. It's essentially a globally
available grab bag of settings and objects that allows flytekit to do things like convert complex types, run and
compile workflows, serialize Flyte entities, etc.
Even though this object has a current_context
function on it, it should not be called directly. Please use the
:py:class:flytekit.FlyteContextManager
object instead.
Please do not confuse this object with the :py:class:flytekit.ExecutionParameters
object.
def FlyteContext(
file_access: FileAccessProvider,
level: int,
flyte_client: Optional['friendly_client.SynchronousFlyteClient'],
compilation_state: Optional[CompilationState],
execution_state: Optional[ExecutionState],
serialization_settings: Optional[SerializationSettings],
in_a_condition: bool,
origin_stackframe: Optional[traceback.FrameSummary],
output_metadata_tracker: Optional[OutputMetadataTracker],
worker_queue: Optional[Controller],
):
Parameter |
Type |
file_access |
FileAccessProvider |
level |
int |
flyte_client |
Optional['friendly_client.SynchronousFlyteClient'] |
compilation_state |
Optional[CompilationState] |
execution_state |
Optional[ExecutionState] |
serialization_settings |
Optional[SerializationSettings] |
in_a_condition |
bool |
origin_stackframe |
Optional[traceback.FrameSummary] |
output_metadata_tracker |
Optional[OutputMetadataTracker] |
worker_queue |
Optional[Controller] |
Methods
current_context()
This method exists only to maintain backwards compatibility. Please use
FlyteContextManager.current_context()
instead.
Users of flytekit should be careful not to confuse the object returned from this function
with :py:func:flytekit.current_context
enter_conditional_section()
def enter_conditional_section()
get_deck()
Returns the deck that was created as part of the last execution.
The return value depends on the execution environment. In a notebook, the return value is compatible with
IPython.display and should be rendered in the notebook.
.. code-block:: python
with flytekit.new_context() as ctx:
my_task(…)
ctx.get_deck()
Or, if you wish to display it explicitly:
.. code-block:: python
from IPython import display
display(ctx.get_deck())
get_origin_stackframe_repr()
def get_origin_stackframe_repr()
new_builder()
new_compilation_state()
def new_compilation_state(
prefix: str,
):
Creates and returns a default compilation state. For most of the code this should be the entrypoint
of compilation; otherwise the code should always use with_compilation_state.
Parameter |
Type |
prefix |
str |
new_execution_state()
def new_execution_state(
working_dir: Optional[os.PathLike],
):
Creates and returns a new default execution state. This should be used at the entrypoint of execution;
in all other cases it is preferable to use with_execution_state.
Parameter |
Type |
working_dir |
Optional[os.PathLike] |
set_stackframe()
def set_stackframe(
s: traceback.FrameSummary,
):
Parameter |
Type |
s |
traceback.FrameSummary |
with_client()
def with_client(
c: SynchronousFlyteClient,
):
Parameter |
Type |
c |
SynchronousFlyteClient |
with_compilation_state()
def with_compilation_state(
c: CompilationState,
):
Parameter |
Type |
c |
CompilationState |
with_execution_state()
def with_execution_state(
es: ExecutionState,
):
Parameter |
Type |
es |
ExecutionState |
with_file_access()
def with_file_access(
fa: FileAccessProvider,
):
Parameter |
Type |
fa |
FileAccessProvider |
with_new_compilation_state()
def with_new_compilation_state()
def with_output_metadata_tracker(
t: OutputMetadataTracker,
):
Parameter |
Type |
t |
OutputMetadataTracker |
with_serialization_settings()
def with_serialization_settings(
ss: SerializationSettings,
):
Parameter |
Type |
ss |
SerializationSettings |
with_worker_queue()
def with_worker_queue(
wq: Controller,
):
Parameter |
Type |
wq |
Controller |
Properties
Property |
Type |
Description |
user_space_params |
|
|
flytekit.core.context_manager.FlyteContextManager
FlyteContextManager manages the execution context within Flytekit. It holds global state of either compilation
or Execution. It is not thread-safe and can only be run as a single threaded application currently.
Contexts within Flytekit are useful for managing compilation state and execution state. Refer to CompilationState
and ExecutionState
for more information. FlyteContextManager provides a singleton stack to manage these contexts.
Typical usage is
.. code-block:: python
FlyteContextManager.initialize()
with FlyteContextManager.with_context(o) as ctx:
pass
If required (not recommended), you can use
FlyteContextManager.push_context(),
but a corresponding
FlyteContextManager.pop_context()
should then be called.
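The push/pop discipline can be sketched with a minimal singleton stack. This toy class (hypothetical) mirrors initialize/push_context/pop_context/current_context; like the real manager, it is not thread-safe:

```python
from typing import List, Optional


class ContextStack:
    """Toy singleton stack mirroring the FlyteContextManager API shape."""

    _stack: List[str] = []

    @classmethod
    def initialize(cls) -> None:
        # Erases everything and installs a fresh root context.
        cls._stack = ["root"]

    @classmethod
    def current_context(cls) -> Optional[str]:
        # The active context is whatever is on top of the stack.
        return cls._stack[-1] if cls._stack else None

    @classmethod
    def push_context(cls, ctx: str) -> str:
        cls._stack.append(ctx)
        return ctx

    @classmethod
    def pop_context(cls) -> str:
        # Every push must be balanced by a pop.
        return cls._stack.pop()
```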
Methods
add_signal_handler()
def add_signal_handler(
handler: typing.Callable[[int, FrameType], typing.Any],
):
Parameter |
Type |
handler |
typing.Callable[[int, FrameType], typing.Any] |
current_context()
get_origin_stackframe()
def get_origin_stackframe(
limit,
):
initialize()
Re-initializes the context manager and erases the entire context stack.
pop_context()
push_context()
def push_context(
ctx: FlyteContext,
f: Optional[traceback.FrameSummary],
):
Parameter |
Type |
ctx |
FlyteContext |
f |
Optional[traceback.FrameSummary] |
size()
with_context()
def with_context(
b: FlyteContext.Builder,
):
Parameter |
Type |
b |
FlyteContext.Builder |
flytekit.core.context_manager.FlyteEntities
This is a global Object that tracks various tasks and workflows that are declared within a VM during the
registration process
flytekit.core.context_manager.FrameType
flytekit.core.context_manager.Node
This class will hold all the things necessary to make an SdkNode, but we won't make one until we know things like
the ID, which comes from the registration step.
def Node(
id: str,
metadata: _workflow_model.NodeMetadata,
bindings: List[_literal_models.Binding],
upstream_nodes: List[Node],
flyte_entity: Any,
):
Parameter |
Type |
id |
str |
metadata |
_workflow_model.NodeMetadata |
bindings |
List[_literal_models.Binding] |
upstream_nodes |
List[Node] |
flyte_entity |
Any |
Methods
runs_before()
def runs_before(
other: Node,
):
This is typically something we shouldn't do. This modifies an attribute of the other instance rather than
self. But it's done so only because we wanted this English function to be the same as the shift function.
That is, calling node_1.runs_before(node_2) and node_1 >> node_2 are the same. The shift operator going the
other direction is not implemented, to further avoid confusion. Right shift was picked rather than left shift
because that's what most users are familiar with.
Parameter |
Type |
other |
Node |
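The runs_before / right-shift equivalence can be sketched with a toy node class (hypothetical, illustrative only). Note that, as described above, the call mutates the other node's upstream list rather than self:

```python
from typing import List


class ToyNode:
    def __init__(self, node_id: str):
        self.id = node_id
        self.upstream_nodes: List["ToyNode"] = []

    def runs_before(self, other: "ToyNode") -> None:
        # Deliberately modifies `other`, not self: other now depends on self.
        if self not in other.upstream_nodes:
            other.upstream_nodes.append(self)

    def __rshift__(self, other: "ToyNode") -> "ToyNode":
        # node_1 >> node_2 is the same as node_1.runs_before(node_2).
        # Returning `other` allows chaining: n1 >> n2 >> n3.
        self.runs_before(other)
        return other
```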
with_overrides()
def with_overrides(
node_name: Optional[str],
aliases: Optional[Dict[str, str]],
requests: Optional[Resources],
limits: Optional[Resources],
timeout: Optional[Union[int, datetime.timedelta, object]],
retries: Optional[int],
interruptible: Optional[bool],
name: Optional[str],
task_config: Optional[Any],
container_image: Optional[str],
accelerator: Optional[BaseAccelerator],
cache: Optional[bool],
cache_version: Optional[str],
cache_serialize: Optional[bool],
shared_memory: Optional[Union[L[True], str]],
pod_template: Optional[PodTemplate],
resources: Optional[Resources],
args,
kwargs,
):
Parameter |
Type |
node_name |
Optional[str] |
aliases |
Optional[Dict[str, str]] |
requests |
Optional[Resources] |
limits |
Optional[Resources] |
timeout |
Optional[Union[int, datetime.timedelta, object]] |
retries |
Optional[int] |
interruptible |
Optional[bool] |
name |
Optional[str] |
task_config |
Optional[Any] |
container_image |
Optional[str] |
accelerator |
Optional[BaseAccelerator] |
cache |
Optional[bool] |
cache_version |
Optional[str] |
cache_serialize |
Optional[bool] |
shared_memory |
Optional[Union[L[True], str]] |
pod_template |
Optional[PodTemplate] |
resources |
Optional[Resources] |
args |
*args |
kwargs |
**kwargs |
Properties
Property |
Type |
Description |
bindings |
|
|
flyte_entity |
|
|
id |
|
|
metadata |
|
|
name |
|
|
outputs |
|
|
run_entity |
|
|
upstream_nodes |
|
|
flytekit.core.context_manager.OutputMetadata
def OutputMetadata(
artifact: 'Artifact',
dynamic_partitions: Optional[typing.Dict[str, str]],
time_partition: Optional[datetime],
additional_items: Optional[typing.List[SerializableToString]],
):
Parameter |
Type |
artifact |
'Artifact' |
dynamic_partitions |
Optional[typing.Dict[str, str]] |
time_partition |
Optional[datetime] |
additional_items |
Optional[typing.List[SerializableToString]] |
flytekit.core.context_manager.OutputMetadataTracker
This class is for the users to set arbitrary metadata on output literals.
Attributes:
output_metadata Optional[TaskOutputMetadata]: is a sparse dictionary of metadata that the user wants to attach
to each output of a task. The key is the output value (object) and the value is an OutputMetadata object.
def OutputMetadataTracker(
output_metadata: typing.Dict[typing.Any, OutputMetadata],
):
Parameter |
Type |
output_metadata |
typing.Dict[typing.Any, OutputMetadata] |
Methods
add()
def add(
obj: typing.Any,
metadata: OutputMetadata,
):
Parameter |
Type |
obj |
typing.Any |
metadata |
OutputMetadata |
get()
def get(
obj: typing.Any,
):
Parameter |
Type |
obj |
typing.Any |
with_params()
def with_params(
output_metadata: Optional[TaskOutputMetadata],
):
Produces a copy of the current object and sets the new values.
Parameter |
Type |
output_metadata |
Optional[TaskOutputMetadata] |
flytekit.core.context_manager.SecretsConfig
Configuration for secrets.
def SecretsConfig(
env_prefix: str,
default_dir: str,
file_prefix: str,
):
Parameter |
Type |
env_prefix |
str |
default_dir |
str |
file_prefix |
str |
Methods
Method |
Description |
auto() |
Reads from environment variable or from config file |
auto()
def auto(
config_file: typing.Union[str, ConfigFile],
):
Reads from environment variable or from config file
Parameter |
Type |
config_file |
typing.Union[str, ConfigFile] |
flytekit.core.context_manager.SecretsManager
This provides a secrets resolution logic at runtime.
The resolution order is:
- Try the env var first. The env var should have the configuration.SECRETS_ENV_PREFIX. The env var name will be all upper-cased.
- If not found, then try the file, where the name matches lower case
configuration.SECRETS_DEFAULT_DIR/<group>/configuration.SECRETS_FILE_PREFIX<key>.
All configuration values can always be overridden by injecting an environment variable.
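The resolution order can be sketched with a small stdlib-only resolver. The prefix defaults below are hypothetical stand-ins for the SecretsConfig values, not the real ones:

```python
import pathlib
from typing import Mapping


def resolve_secret(
    group: str,
    key: str,
    env: Mapping[str, str],
    secrets_dir: pathlib.Path,
    env_prefix: str = "FLYTE_SECRETS_",  # stands in for SECRETS_ENV_PREFIX
    file_prefix: str = "",               # stands in for SECRETS_FILE_PREFIX
) -> str:
    # 1. Try the env var first; the name is all upper-cased.
    env_key = f"{env_prefix}{group}_{key}".upper()
    if env_key in env:
        return env[env_key]
    # 2. Fall back to <secrets_dir>/<group>/<file_prefix><key>, lower-cased.
    path = secrets_dir / group.lower() / f"{file_prefix}{key}".lower()
    if path.exists():
        return path.read_text().strip()
    raise ValueError(f"secret {group}/{key} not found")
```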
def SecretsManager(
secrets_cfg: typing.Optional[SecretsConfig],
):
Parameter |
Type |
secrets_cfg |
typing.Optional[SecretsConfig] |
Methods
Method |
Description |
get() |
Retrieves a secret, trying the environment variable first, then the file |
get_secrets_env_var() |
Returns the environment variable name in which to look for the secret |
get_secrets_file() |
Returns the file path in which to look for the secret |
get()
def get(
group: Optional[str],
key: Optional[str],
group_version: Optional[str],
encode_mode: str,
):
Retrieves a secret using the resolution order: environment variable first, then file. Raises a ValueError if the
secret is not found. encode_mode defines the mode used to open files: either "r" to read a text file, or "rb" to read a binary file.
Parameter |
Type |
group |
Optional[str] |
key |
Optional[str] |
group_version |
Optional[str] |
encode_mode |
str |
get_secrets_env_var()
def get_secrets_env_var(
group: Optional[str],
key: Optional[str],
group_version: Optional[str],
):
Returns the environment variable name in which to look for the secret.
Parameter |
Type |
group |
Optional[str] |
key |
Optional[str] |
group_version |
Optional[str] |
get_secrets_file()
def get_secrets_file(
group: Optional[str],
key: Optional[str],
group_version: Optional[str],
):
Returns the file path in which to look for the secret.
Parameter |
Type |
group |
Optional[str] |
key |
Optional[str] |
group_version |
Optional[str] |
flytekit.core.context_manager.SerializableToString
This protocol is used by the Artifact create_from function. These objects are serialized at run time and then
added to a literal's metadata.
def SerializableToString(
args,
kwargs,
):
Parameter |
Type |
args |
*args |
kwargs |
**kwargs |
Methods
serialize_to_string()
def serialize_to_string(
ctx: FlyteContext,
variable_name: str,
):
Parameter |
Type |
ctx |
FlyteContext |
variable_name |
str |
flytekit.core.context_manager.SerializationSettings
These settings are provided while serializing a workflow or task, before registration. This is required to get
runtime information at serialization time, as well as some defaults.
Attributes:
project (str): The project (if any) under which to register entities.
domain (str): The domain (if any) under which to register entities.
version (str): The version (if any) with which to register entities.
image_config (ImageConfig): The image config used to define task container images.
env (Optional[Dict[str, str]]): Environment variables injected into task container definitions.
flytekit_virtualenv_root (Optional[str]): During out-of-container serialization, the absolute path of the
flytekit virtualenv at serialization time won't match the in-container value at execution time. This optional
value is used to provide the in-container virtualenv path.
python_interpreter (Optional[str]): The Python executable to use. This is used for Spark tasks in
out-of-container execution.
entrypoint_settings (Optional[EntrypointSettings]): Information about the command, path, and version of the
entrypoint program.
fast_serialization_settings (Optional[FastSerializationSettings]): If the code is being serialized so that it
can be fast-registered (and thus omit building a Docker image), this object contains additional parameters
for serialization.
source_root (Optional[str]): The root directory of the source code.
def SerializationSettings(
image_config: ImageConfig,
project: typing.Optional[str],
domain: typing.Optional[str],
version: typing.Optional[str],
env: Optional[Dict[str, str]],
git_repo: Optional[str],
python_interpreter: str,
flytekit_virtualenv_root: Optional[str],
fast_serialization_settings: Optional[FastSerializationSettings],
source_root: Optional[str],
):
Parameter |
Type |
image_config |
ImageConfig |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
version |
typing.Optional[str] |
env |
Optional[Dict[str, str]] |
git_repo |
Optional[str] |
python_interpreter |
str |
flytekit_virtualenv_root |
Optional[str] |
fast_serialization_settings |
Optional[FastSerializationSettings] |
source_root |
Optional[str] |
Methods
default_entrypoint_settings()
def default_entrypoint_settings(
interpreter_path: str,
):
Assumes the entrypoint is installed in the same virtual environment as the given interpreter.
Parameter |
Type |
interpreter_path |
str |
for_image()
def for_image(
image: str,
version: str,
project: str,
domain: str,
python_interpreter_path: str,
):
Parameter |
Type |
image |
str |
version |
str |
project |
str |
domain |
str |
python_interpreter_path |
str |
from_dict()
def from_dict(
kvs: typing.Union[dict, list, str, int, float, bool, NoneType],
infer_missing,
):
Parameter |
Type |
kvs |
typing.Union[dict, list, str, int, float, bool, NoneType] |
infer_missing |
|
from_json()
def from_json(
s: typing.Union[str, bytes, bytearray],
parse_float,
parse_int,
parse_constant,
infer_missing,
kw,
):
Parameter |
Type |
s |
typing.Union[str, bytes, bytearray] |
parse_float |
|
parse_int |
|
parse_constant |
|
infer_missing |
|
kw |
|
from_transport()
def from_transport(
s: str,
):
new_builder()
Creates a SerializationSettings.Builder
that copies the existing serialization settings parameters and
allows for customization.
schema()
def schema(
infer_missing: bool,
only,
exclude,
many: bool,
context,
load_only,
dump_only,
partial: bool,
unknown,
):
Parameter |
Type |
infer_missing |
bool |
only |
|
exclude |
|
many |
bool |
context |
|
load_only |
|
dump_only |
|
partial |
bool |
unknown |
|
should_fast_serialize()
def should_fast_serialize()
Whether or not the serialization settings specify that entities should be serialized for fast registration.
to_dict()
def to_dict(
encode_json,
):
Parameter |
Type |
encode_json |
|
to_json()
def to_json(
skipkeys: bool,
ensure_ascii: bool,
check_circular: bool,
allow_nan: bool,
indent: typing.Union[int, str, NoneType],
separators: typing.Tuple[str, str],
default: typing.Callable,
sort_keys: bool,
kw,
):
Parameter |
Type |
skipkeys |
bool |
ensure_ascii |
bool |
check_circular |
bool |
allow_nan |
bool |
indent |
typing.Union[int, str, NoneType] |
separators |
typing.Tuple[str, str] |
default |
typing.Callable |
sort_keys |
bool |
kw |
|
venv_root_from_interpreter()
def venv_root_from_interpreter(
interpreter_path: str,
):
Computes the path of the virtual environment root from the given Python interpreter path;
for example, /opt/venv/bin/python3 -> /opt/venv.
Parameter |
Type |
interpreter_path |
str |
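The computation can be sketched as follows (a minimal sketch, assuming the interpreter lives directly under the venv's bin directory):

```python
from pathlib import PurePosixPath


def venv_root_from_interpreter(interpreter_path: str) -> str:
    # The venv root is two levels above the interpreter:
    # /opt/venv/bin/python3 -> /opt/venv
    return str(PurePosixPath(interpreter_path).parent.parent)


print(venv_root_from_interpreter("/opt/venv/bin/python3"))  # /opt/venv
```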
with_serialized_context()
def with_serialized_context()
Use this method to create a new SerializationSettings that has an environment variable set with the SerializedContext.
This is useful for transporting the SerializedContext to serialized and registered tasks.
The setting will be available in the env field under the key SERIALIZED_CONTEXT_ENV_VAR.
:return: A newly constructed SerializationSettings, or self if the serialized context is already set.
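The pattern can be sketched with a toy settings object. This is hypothetical: the real env var key name and the serialization format are flytekit internals, and the names below are stand-ins.

```python
import base64
import dataclasses
import json
import typing

# Assumed key name for illustration; flytekit defines its own constant.
SERIALIZED_CONTEXT_ENV_VAR = "SERIALIZED_CONTEXT"


@dataclasses.dataclass(frozen=True)
class ToySettings:
    project: str
    env: typing.Dict[str, str] = dataclasses.field(default_factory=dict)

    def with_serialized_context(self) -> "ToySettings":
        # Idempotent: if the context is already embedded, return self unchanged.
        if SERIALIZED_CONTEXT_ENV_VAR in self.env:
            return self
        payload = base64.b64encode(json.dumps({"project": self.project}).encode()).decode()
        return dataclasses.replace(
            self, env={**self.env, SERIALIZED_CONTEXT_ENV_VAR: payload}
        )


s = ToySettings(project="flytesnacks")
s2 = s.with_serialized_context()
print(SERIALIZED_CONTEXT_ENV_VAR in s2.env)  # True
print(s2.with_serialized_context() is s2)  # True: already set, so self is returned
```

The idempotence check mirrors the "or self, if it already has the serialized context" behavior documented above.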
Properties
Property |
Type |
Description |
entrypoint_settings |
|
|
serialized_context |
|
|
flytekit.core.context_manager.SyncCheckpoint
This class is NOT THREAD-SAFE!
SyncCheckpoint synchronously checkpoints a user-given file or folder.
It also synchronously downloads / restores previous checkpoints when restore is invoked.
TODO: Implement an async checkpoint system
def SyncCheckpoint(
checkpoint_dest: str,
checkpoint_src: typing.Optional[str],
):
Parameter |
Type |
checkpoint_dest |
str |
checkpoint_src |
typing.Optional[str] |
Methods
Method |
Description |
prev_exists() |
None |
read() |
This should only be used if there is a singular checkpoint file written |
restore() |
Given a path, if a previous checkpoint exists, will be downloaded to this path |
save() |
|
write() |
This will overwrite the checkpoint |
prev_exists()
read()
This should only be used if a single checkpoint file was written. If more than one checkpoint file is
found, this raises a ValueError.
restore()
def restore(
path: typing.Union[pathlib.Path, str, NoneType],
):
Given a path, if a previous checkpoint exists, it will be downloaded to this path.
If the download is successful, the downloaded path is returned.
Note: the download will not be performed if the checkpoint was previously restored; in that case the method
returns the previously downloaded path.
Parameter |
Type |
path |
typing.Union[pathlib.Path, str, NoneType] |
save()
def save(
cp: typing.Union[pathlib.Path, str, _io.BufferedReader],
):
Parameter |
Type |
cp |
typing.Union[pathlib.Path, str, _io.BufferedReader] |
write()
This will overwrite the checkpoint. It can be retrieved using read or restore
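The write/read contract can be illustrated with a local-only toy stand-in (hypothetical; the real class persists checkpoints through the FlyteContext's FileAccessProvider, typically to remote storage):

```python
import pathlib
import tempfile


class ToySyncCheckpoint:
    """Minimal local-only sketch of the SyncCheckpoint write/read contract."""

    def __init__(self, checkpoint_dest: str):
        self._dest = pathlib.Path(checkpoint_dest)
        self._dest.mkdir(parents=True, exist_ok=True)

    def write(self, b: bytes) -> None:
        # Overwrites any previously written checkpoint.
        (self._dest / "checkpoint").write_bytes(b)

    def read(self) -> bytes:
        files = [f for f in self._dest.iterdir() if f.is_file()]
        if len(files) > 1:
            raise ValueError("read() requires a single checkpoint file")
        if not files:
            raise FileNotFoundError("no checkpoint written yet")
        return files[0].read_bytes()


cp = ToySyncCheckpoint(tempfile.mkdtemp())
cp.write(b"epoch=1")
cp.write(b"epoch=2")  # overwrites the previous checkpoint
print(cp.read())  # b'epoch=2'
```

Note that, like the documented read(), this sketch refuses to guess when multiple checkpoint files exist.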
flytekit.core.context_manager.WorkflowExecutionIdentifier
def WorkflowExecutionIdentifier(
project,
domain,
name,
):
Parameter |
Type |
project |
|
domain |
|
name |
|
Methods
from_flyte_idl()
def from_flyte_idl(
pb2_object,
):
Parameter |
Type |
pb2_object |
|
from_python_std()
def from_python_std(
string,
):
Parses a correctly formatted string into a WorkflowExecutionIdentifier.
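A parsing sketch, assuming the identifier string takes the form ex:<project>:<domain>:<name> (the parser below is a hypothetical stand-in, not flytekit's implementation):

```python
import typing


def parse_execution_id(s: str) -> typing.Tuple[str, str, str]:
    # Assumed string form: "ex:<project>:<domain>:<name>"
    parts = s.split(":")
    if len(parts) != 4 or parts[0] != "ex":
        raise ValueError(f"expected 'ex:project:domain:name', got {s!r}")
    return parts[1], parts[2], parts[3]


print(parse_execution_id("ex:flytesnacks:development:f81a2b3c"))
# ('flytesnacks', 'development', 'f81a2b3c')
```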
def promote_from_model(
base_model,
):
Parameter |
Type |
base_model |
|
serialize_to_string()
def serialize_to_string()
short_string()
to_flyte_idl()
verbose_string()
Properties
Property |
Type |
Description |
domain |
|
|
is_empty |
|
|
name |
|
|
project |
|
|
flytekit.core.context_manager.datetime
datetime(year, month, day[, hour[, minute[, second[, microsecond[, tzinfo]]]]])
The year, month and day arguments are required. tzinfo may be None, or an
instance of a tzinfo subclass. The remaining arguments may be ints.
flytekit.core.context_manager.timezone
Fixed offset from UTC implementation of tzinfo.
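For reference, constructing a timezone-aware datetime with a fixed UTC offset uses these two standard-library types together:

```python
from datetime import datetime, timedelta, timezone

# timezone is the stdlib's fixed-offset tzinfo subclass; here a fixed -05:00 offset.
est = timezone(timedelta(hours=-5))
dt = datetime(2024, 1, 15, 12, 30, tzinfo=est)
print(dt.isoformat())  # 2024-01-15T12:30:00-05:00
print(dt.astimezone(timezone.utc).hour)  # 17
```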