flytekit.experimental.eager_function
Directory
Classes
Class | Description |
---|---|
DataConfig |
Any data storage specific configuration. |
ExecutionState |
This is the context that is active when executing a task or a local workflow. |
FlyteContext |
This is an internal-facing context object, that most users will not have to deal with. |
FlyteContextManager |
FlyteContextManager manages the execution context within Flytekit. |
FlyteRemote |
Main entrypoint for programmatically accessing a Flyte remote backend. |
PlatformConfig |
This object contains the settings to talk to a Flyte backend (the DNS location of your Admin server basically). |
S3Config |
S3 specific configuration. |
flytekit.experimental.eager_function.DataConfig
Any data-storage-specific configuration. Please do not use this to store secrets; in the S3 case, the access key id and secret are held here only because it is used for the Flyte sandbox environment. All DataPersistence plugins are passed the full DataConfig, and each plugin should use the portion of the configuration that applies to it.
def DataConfig(
s3: S3Config,
gcs: GCSConfig,
azure: AzureBlobStorageConfig,
generic: GenericPersistenceConfig,
):
Parameter | Type |
---|---|
s3 |
S3Config |
gcs |
GCSConfig |
azure |
AzureBlobStorageConfig |
generic |
GenericPersistenceConfig |
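For illustration, here is a minimal, hedged sketch of building a DataConfig that points S3 access at a local object store (for example, the MinIO that ships with the Flyte sandbox). The endpoint and credentials below are placeholder sandbox-style defaults, not values taken from this page.
```python
from flytekit.configuration import DataConfig, S3Config

# Hypothetical sandbox-style values; never put real secrets in a DataConfig.
data_config = DataConfig(
    s3=S3Config(
        endpoint="http://localhost:30002",   # assumed local MinIO endpoint
        access_key_id="minio",               # placeholder credentials
        secret_access_key="miniostorage",
    )
)
```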
Methods
Method | Description |
---|---|
auto() |
None |
auto()
def auto(
config_file: typing.Union[str, ConfigFile],
):
Parameter | Type |
---|---|
config_file |
typing.Union[str, ConfigFile] |
flytekit.experimental.eager_function.ExecutionState
This is the context that is active when executing a task or a local workflow. It carries the state necessary for execution, such as temporary directories and the ExecutionParameters that are passed to the user.
Attributes:
- mode (ExecutionState.Mode): Defines the context in which the task is executed (local, hosted, etc.).
- working_dir (os.PathLike): Specifies the remote, external directory where inputs, outputs and other protobufs are uploaded.
- engine_dir (os.PathLike):
- branch_eval_mode (Optional[BranchEvalMode]): Used to determine whether a branch node should execute.
- user_space_params (Optional[ExecutionParameters]): Provides run-time, user-centric context such as a statsd handler, a logging handler, the current execution id and a working directory.
def ExecutionState(
working_dir: Union[os.PathLike, str],
mode: Optional[ExecutionState.Mode],
engine_dir: Optional[Union[os.PathLike, str]],
branch_eval_mode: Optional[BranchEvalMode],
user_space_params: Optional[ExecutionParameters],
):
Parameter | Type |
---|---|
working_dir |
Union[os.PathLike, str] |
mode |
Optional[ExecutionState.Mode] |
engine_dir |
Optional[Union[os.PathLike, str]] |
branch_eval_mode |
Optional[BranchEvalMode] |
user_space_params |
Optional[ExecutionParameters] |
Methods
Method | Description |
---|---|
branch_complete() |
Indicates that we are within a conditional / ifelse block and the active branch is not done |
is_local_execution() |
None |
take_branch() |
Indicates that we are within an if-else block and the current branch has evaluated to true |
with_params() |
Produces a copy of the current execution state and overrides the copy’s parameters with passed parameter values |
branch_complete()
def branch_complete()
Indicates that we are within a conditional / ifelse block and the active branch is not done. Defaults to SKIPPED.
is_local_execution()
def is_local_execution()
take_branch()
def take_branch()
Indicates that we are within an if-else block and the current branch has evaluated to true. Useful only in local execution mode
with_params()
def with_params(
working_dir: Optional[os.PathLike],
mode: Optional[Mode],
engine_dir: Optional[os.PathLike],
branch_eval_mode: Optional[BranchEvalMode],
user_space_params: Optional[ExecutionParameters],
):
Produces a copy of the current execution state and overrides the copy’s parameters with passed parameter values.
Parameter | Type |
---|---|
working_dir |
Optional[os.PathLike] |
mode |
Optional[Mode] |
engine_dir |
Optional[os.PathLike] |
branch_eval_mode |
Optional[BranchEvalMode] |
user_space_params |
Optional[ExecutionParameters] |
flytekit.experimental.eager_function.FlyteContext
This is an internal-facing context object, that most users will not have to deal with. It’s essentially a globally available grab bag of settings and objects that allows flytekit to do things like convert complex types, run and compile workflows, serialize Flyte entities, etc.
Even though this object has a current_context function on it, that function should not be called directly. Please use the flytekit.FlyteContextManager object instead.
Please do not confuse this object with the flytekit.ExecutionParameters object.
def FlyteContext(
file_access: FileAccessProvider,
level: int,
flyte_client: Optional['friendly_client.SynchronousFlyteClient'],
compilation_state: Optional[CompilationState],
execution_state: Optional[ExecutionState],
serialization_settings: Optional[SerializationSettings],
in_a_condition: bool,
origin_stackframe: Optional[traceback.FrameSummary],
output_metadata_tracker: Optional[OutputMetadataTracker],
worker_queue: Optional[Controller],
):
Parameter | Type |
---|---|
file_access |
FileAccessProvider |
level |
int |
flyte_client |
Optional['friendly_client.SynchronousFlyteClient'] |
compilation_state |
Optional[CompilationState] |
execution_state |
Optional[ExecutionState] |
serialization_settings |
Optional[SerializationSettings] |
in_a_condition |
bool |
origin_stackframe |
Optional[traceback.FrameSummary] |
output_metadata_tracker |
Optional[OutputMetadataTracker] |
worker_queue |
Optional[Controller] |
Methods
Method | Description |
---|---|
current_context() |
This method exists only to maintain backwards compatibility |
enter_conditional_section() |
None |
get_deck() |
Returns the deck that was created as part of the last execution |
get_origin_stackframe_repr() |
None |
new_builder() |
None |
new_compilation_state() |
Creates and returns a default compilation state |
new_execution_state() |
Creates and returns a new default execution state |
set_stackframe() |
None |
with_client() |
None |
with_compilation_state() |
None |
with_execution_state() |
None |
with_file_access() |
None |
with_new_compilation_state() |
None |
with_output_metadata_tracker() |
None |
with_serialization_settings() |
None |
with_worker_queue() |
None |
current_context()
def current_context()
This method exists only to maintain backwards compatibility. Please use FlyteContextManager.current_context() instead.
Users of flytekit should be wary not to confuse the object returned from this function with flytekit.current_context.
enter_conditional_section()
def enter_conditional_section()
get_deck()
def get_deck()
Returns the deck that was created as part of the last execution.
The return value depends on the execution environment. In a notebook, the return value is compatible with IPython.display and should be rendered in the notebook.
```python
with flytekit.new_context() as ctx:
    my_task(...)
ctx.get_deck()
```
Or, if you wish to display it explicitly:
```python
from IPython import display
display(ctx.get_deck())
```
get_origin_stackframe_repr()
def get_origin_stackframe_repr()
new_builder()
def new_builder()
new_compilation_state()
def new_compilation_state(
prefix: str,
):
Creates and returns a default compilation state. For most code this should be the entrypoint of compilation; otherwise the code should always use with_compilation_state.
Parameter | Type |
---|---|
prefix |
str |
new_execution_state()
def new_execution_state(
working_dir: Optional[os.PathLike],
):
Creates and returns a new default execution state. This should be used at the entrypoint of execution; in all other cases it is preferable to use with_execution_state.
Parameter | Type |
---|---|
working_dir |
Optional[os.PathLike] |
set_stackframe()
def set_stackframe(
s: traceback.FrameSummary,
):
Parameter | Type |
---|---|
s |
traceback.FrameSummary |
with_client()
def with_client(
c: SynchronousFlyteClient,
):
Parameter | Type |
---|---|
c |
SynchronousFlyteClient |
with_compilation_state()
def with_compilation_state(
c: CompilationState,
):
Parameter | Type |
---|---|
c |
CompilationState |
with_execution_state()
def with_execution_state(
es: ExecutionState,
):
Parameter | Type |
---|---|
es |
ExecutionState |
with_file_access()
def with_file_access(
fa: FileAccessProvider,
):
Parameter | Type |
---|---|
fa |
FileAccessProvider |
with_new_compilation_state()
def with_new_compilation_state()
with_output_metadata_tracker()
def with_output_metadata_tracker(
t: OutputMetadataTracker,
):
Parameter | Type |
---|---|
t |
OutputMetadataTracker |
with_serialization_settings()
def with_serialization_settings(
ss: SerializationSettings,
):
Parameter | Type |
---|---|
ss |
SerializationSettings |
with_worker_queue()
def with_worker_queue(
wq: Controller,
):
Parameter | Type |
---|---|
wq |
Controller |
Properties
Property | Type | Description |
---|---|---|
user_space_params |
flytekit.experimental.eager_function.FlyteContextManager
FlyteContextManager manages the execution context within Flytekit. It holds the global state of either compilation or execution. It is not thread-safe and can currently only be used in a single-threaded application.
Contexts within Flytekit are useful for managing compilation state and execution state. Refer to CompilationState and ExecutionState for more information. FlyteContextManager provides a singleton stack to manage these contexts.
Typical usage is:
```python
FlyteContextManager.initialize()
with FlyteContextManager.with_context(o) as ctx:
    pass
```
If required (not recommended), you can use FlyteContextManager.push_context(), but a corresponding FlyteContextManager.pop_context() must then be called.
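As a further hedged sketch (the canonical import path is assumed, not taken from this page), the snippet below reads the active context and scopes a derived context for the duration of a block:
```python
from flytekit.core.context_manager import FlyteContextManager

# Inspect the currently active context.
ctx = FlyteContextManager.current_context()
print(ctx.file_access.raw_output_prefix)

# Derive a builder with a fresh execution state and push it for the duration of the block.
with FlyteContextManager.with_context(
    ctx.with_execution_state(ctx.new_execution_state())
) as inner_ctx:
    ...
```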
Methods
Method | Description |
---|---|
add_signal_handler() |
None |
current_context() |
None |
get_origin_stackframe() |
None |
initialize() |
Re-initializes the context and erases the entire context |
pop_context() |
None |
push_context() |
None |
size() |
None |
with_context() |
None |
add_signal_handler()
def add_signal_handler(
handler: typing.Callable[[int, FrameType], typing.Any],
):
Parameter | Type |
---|---|
handler |
typing.Callable[[int, FrameType], typing.Any] |
current_context()
def current_context()
get_origin_stackframe()
def get_origin_stackframe(
limit,
):
Parameter | Type |
---|---|
limit |
initialize()
def initialize()
Re-initializes the context and erases the entire context
pop_context()
def pop_context()
push_context()
def push_context(
ctx: FlyteContext,
f: Optional[traceback.FrameSummary],
):
Parameter | Type |
---|---|
ctx |
FlyteContext |
f |
Optional[traceback.FrameSummary] |
size()
def size()
with_context()
def with_context(
b: FlyteContext.Builder,
):
Parameter | Type |
---|---|
b |
FlyteContext.Builder |
flytekit.experimental.eager_function.FlyteRemote
Main entrypoint for programmatically accessing a Flyte remote backend.
The term ‘remote’ is synonymous with ‘backend’ or ‘deployment’ and refers to a hosted instance of the Flyte platform, which comes with a Flyte Admin server on some known URI.
def FlyteRemote(
config: Config,
default_project: typing.Optional[str],
default_domain: typing.Optional[str],
data_upload_location: str,
interactive_mode_enabled: typing.Optional[bool],
kwargs,
):
Initialize a FlyteRemote object.
kwargs: All arguments that can be passed when creating the SynchronousFlyteClient. These are usually gRPC parameters, used for example to customize credentials or SSL handling.
Parameter | Type |
---|---|
config |
Config |
default_project |
typing.Optional[str] |
default_domain |
typing.Optional[str] |
data_upload_location |
str |
interactive_mode_enabled |
typing.Optional[bool] |
kwargs |
**kwargs |
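A minimal sketch of constructing a client, assuming a reachable Flyte Admin endpoint; the endpoint, project, and domain names below are placeholders:
```python
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

remote = FlyteRemote(
    config=Config.for_endpoint("flyte.example.com"),  # placeholder Admin endpoint
    default_project="flytesnacks",                    # placeholder project
    default_domain="development",                     # placeholder domain
)
```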
Methods
Method | Description |
---|---|
activate_launchplan() |
Given a launch plan, activate it; all previous versions are deactivated |
approve() |
|
auto() |
None |
download() |
Download the data to the specified location |
execute() |
Execute a task, workflow, or launchplan, either something that’s been declared locally, or a fetched entity |
execute_local_launch_plan() |
Execute a locally defined LaunchPlan |
execute_local_task() |
Execute a @task-decorated function or TaskTemplate task |
execute_local_workflow() |
Execute an @workflow decorated function |
execute_reference_launch_plan() |
Execute a ReferenceLaunchPlan |
execute_reference_task() |
Execute a ReferenceTask |
execute_reference_workflow() |
Execute a ReferenceWorkflow |
execute_remote_task_lp() |
Execute a FlyteTask, or FlyteLaunchplan |
execute_remote_wf() |
Execute a FlyteWorkflow |
fast_package() |
Packages the given paths into an installable zip and returns the md5_bytes and the URL of the uploaded location |
fast_register_workflow() |
Use this method to register a workflow with zip mode |
fetch_active_launchplan() |
Returns the active version of the launch plan if it exists or returns None |
fetch_execution() |
Fetch a workflow execution entity from flyte admin |
fetch_launch_plan() |
Fetch a launchplan entity from flyte admin |
fetch_task() |
Fetch a task entity from flyte admin |
fetch_task_lazy() |
Similar to fetch_task, just that it returns a LazyEntity, which will fetch the task lazily |
fetch_workflow() |
Fetch a workflow entity from flyte admin |
fetch_workflow_lazy() |
Similar to fetch_workflow, just that it returns a LazyEntity, which will fetch the workflow lazily |
find_launch_plan() |
None |
find_launch_plan_for_node() |
None |
for_endpoint() |
None |
for_sandbox() |
None |
generate_console_http_domain() |
This should generate the domain where console is hosted |
generate_console_url() |
Generate a Flyteconsole URL for the given Flyte remote endpoint |
get() |
General function that works with flyte tiny urls |
get_domains() |
Lists registered domains from flyte admin |
get_execution_metrics() |
Get the metrics for a given execution |
get_extra_headers_for_protocol() |
None |
launch_backfill() |
Creates and launches a backfill workflow for the given launchplan |
list_projects() |
Lists registered projects from flyte admin |
list_signals() |
|
list_tasks_by_version() |
None |
raw_register() |
Raw register method, can be used to register control plane entities |
recent_executions() |
None |
register_launch_plan() |
Register a given launchplan, possibly applying overrides from the provided options |
register_script() |
Use this method to register a workflow via script mode |
register_task() |
Register a qualified task (PythonTask) with Remote |
register_workflow() |
Use this method to register a workflow |
reject() |
|
remote_context() |
Context manager with remote-specific configuration |
set_input() |
|
set_signal() |
|
sync() |
This function was previously a singledispatchmethod |
sync_execution() |
Sync a FlyteWorkflowExecution object with its corresponding remote state |
sync_node_execution() |
Get data backing a node execution |
sync_task_execution() |
Sync a FlyteTaskExecution object with its corresponding remote state |
terminate() |
Terminate a workflow execution |
upload_file() |
Function will use remote’s client to hash and then upload the file using Admin’s data proxy service |
wait() |
Wait for an execution to finish |
activate_launchplan()
def activate_launchplan(
ident: Identifier,
):
Given a launch plan, activate it; all previous versions are deactivated.
Parameter | Type |
---|---|
ident |
Identifier |
approve()
def approve(
signal_id: str,
execution_name: str,
project: str,
domain: str,
):
Parameter | Type |
---|---|
signal_id |
str |
execution_name |
str |
project |
str |
domain |
str |
auto()
def auto(
config_file: typing.Union[str, ConfigFile],
default_project: typing.Optional[str],
default_domain: typing.Optional[str],
data_upload_location: str,
interactive_mode_enabled: bool,
kwargs,
):
Parameter | Type |
---|---|
config_file |
typing.Union[str, ConfigFile] |
default_project |
typing.Optional[str] |
default_domain |
typing.Optional[str] |
data_upload_location |
str |
interactive_mode_enabled |
bool |
kwargs |
**kwargs |
download()
def download(
data: typing.Union[LiteralsResolver, Literal, LiteralMap],
download_to: str,
recursive: bool,
):
Download the data to the specified location. If the data is a LiteralsResolver or LiteralMap and recursive is specified, then all file-like objects will be recursively downloaded (e.g. FlyteFile/Dir (blob), StructuredDataset, etc.).
Note that it will use your session's credentials to access the remote location. For the sandbox, this should be configured automatically, assuming you are running the sandbox locally. For other environments, you will need to configure your credentials appropriately.
Parameter | Type |
---|---|
data |
typing.Union[LiteralsResolver, Literal, LiteralMap] |
download_to |
str |
recursive |
bool |
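A hedged usage sketch, assuming remote is a configured FlyteRemote and the execution name is a placeholder:
```python
execution = remote.fetch_execution(
    project="flytesnacks", domain="development", name="placeholder-exec-id"
)
execution = remote.wait(execution)  # block until the execution finishes
# Recursively download all file-like outputs (FlyteFile/Dir, StructuredDataset, ...).
remote.download(execution.outputs, download_to="/tmp/outputs", recursive=True)
```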
execute()
def execute(
entity: typing.Union[FlyteTask, FlyteLaunchPlan, FlyteWorkflow, PythonTask, WorkflowBase, LaunchPlan, ReferenceEntity],
inputs: typing.Dict[str, typing.Any],
project: str,
domain: str,
name: str,
version: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
image_config: typing.Optional[ImageConfig],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
serialization_settings: typing.Optional[SerializationSettings],
):
Execute a task, workflow, or launchplan, either something that’s been declared locally, or a fetched entity.
This method supports:
- Flyte{Task, Workflow, LaunchPlan} remote module objects
- @task-decorated functions and TaskTemplate tasks
- @workflow-decorated functions
- LaunchPlan objects
For local entities, this code will attempt to find the entity first, and if missing, will compile and register the object.
Not all arguments are relevant in all circumstances. For example, there’s no reason to use the serialization settings for entities that have already been registered on Admin.
Parameter | Type |
---|---|
entity |
typing.Union[FlyteTask, FlyteLaunchPlan, FlyteWorkflow, PythonTask, WorkflowBase, LaunchPlan, ReferenceEntity] |
inputs |
typing.Dict[str, typing.Any] |
project |
str |
domain |
str |
name |
str |
version |
str |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
image_config |
typing.Optional[ImageConfig] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
serialization_settings |
typing.Optional[SerializationSettings] |
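As a hedged sketch of the common path, fetch an already-registered task and execute it; the project, domain, task name, version, and input are placeholders:
```python
task = remote.fetch_task(
    project="flytesnacks", domain="development",
    name="my_module.my_task", version="v1",  # placeholder identifiers
)
execution = remote.execute(task, inputs={"x": 3}, wait=True)
print(execution.outputs)  # available once the execution has completed
```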
execute_local_launch_plan()
def execute_local_launch_plan(
entity: LaunchPlan,
inputs: typing.Dict[str, typing.Any],
version: str,
project: typing.Optional[str],
domain: typing.Optional[str],
name: typing.Optional[str],
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
serialization_settings: typing.Optional[SerializationSettings],
):
Execute a locally defined LaunchPlan.
Parameter | Type |
---|---|
entity |
LaunchPlan |
inputs |
typing.Dict[str, typing.Any] |
version |
str |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
name |
typing.Optional[str] |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
serialization_settings |
typing.Optional[SerializationSettings] |
execute_local_task()
def execute_local_task(
entity: PythonTask,
inputs: typing.Dict[str, typing.Any],
project: str,
domain: str,
name: str,
version: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
image_config: typing.Optional[ImageConfig],
wait: bool,
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
options: typing.Optional[Options],
serialization_settings: typing.Optional[SerializationSettings],
):
Execute a @task-decorated function or TaskTemplate task.
Parameter | Type |
---|---|
entity |
PythonTask |
inputs |
typing.Dict[str, typing.Any] |
project |
str |
domain |
str |
name |
str |
version |
str |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
image_config |
typing.Optional[ImageConfig] |
wait |
bool |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
options |
typing.Optional[Options] |
serialization_settings |
typing.Optional[SerializationSettings] |
execute_local_workflow()
def execute_local_workflow(
entity: WorkflowBase,
inputs: typing.Dict[str, typing.Any],
project: str,
domain: str,
name: str,
version: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
image_config: typing.Optional[ImageConfig],
options: typing.Optional[Options],
wait: bool,
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
serialization_settings: typing.Optional[SerializationSettings],
):
Execute an @workflow decorated function.
Parameter | Type |
---|---|
entity |
WorkflowBase |
inputs |
typing.Dict[str, typing.Any] |
project |
str |
domain |
str |
name |
str |
version |
str |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
image_config |
typing.Optional[ImageConfig] |
options |
typing.Optional[Options] |
wait |
bool |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
serialization_settings |
typing.Optional[SerializationSettings] |
execute_reference_launch_plan()
def execute_reference_launch_plan(
entity: ReferenceLaunchPlan,
inputs: typing.Dict[str, typing.Any],
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
):
Execute a ReferenceLaunchPlan.
Parameter | Type |
---|---|
entity |
ReferenceLaunchPlan |
inputs |
typing.Dict[str, typing.Any] |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
execute_reference_task()
def execute_reference_task(
entity: ReferenceTask,
inputs: typing.Dict[str, typing.Any],
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
):
Execute a ReferenceTask.
Parameter | Type |
---|---|
entity |
ReferenceTask |
inputs |
typing.Dict[str, typing.Any] |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
execute_reference_workflow()
def execute_reference_workflow(
entity: ReferenceWorkflow,
inputs: typing.Dict[str, typing.Any],
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
):
Execute a ReferenceWorkflow.
Parameter | Type |
---|---|
entity |
ReferenceWorkflow |
inputs |
typing.Dict[str, typing.Any] |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
execute_remote_task_lp()
def execute_remote_task_lp(
entity: typing.Union[FlyteTask, FlyteLaunchPlan],
inputs: typing.Dict[str, typing.Any],
project: str,
domain: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
):
Execute a FlyteTask, or FlyteLaunchplan.
NOTE: the name and version arguments are currently not used and are only there for consistency in the function signature.
Parameter | Type |
---|---|
entity |
typing.Union[FlyteTask, FlyteLaunchPlan] |
inputs |
typing.Dict[str, typing.Any] |
project |
str |
domain |
str |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
execute_remote_wf()
def execute_remote_wf(
entity: FlyteWorkflow,
inputs: typing.Dict[str, typing.Any],
project: str,
domain: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
):
Execute a FlyteWorkflow.
NOTE: the name and version arguments are currently not used and are only there for consistency in the function signature.
Parameter | Type |
---|---|
entity |
FlyteWorkflow |
inputs |
typing.Dict[str, typing.Any] |
project |
str |
domain |
str |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
fast_package()
def fast_package(
root: os.PathLike,
deref_symlinks: bool,
output: str,
options: typing.Optional[FastPackageOptions],
):
Packages the given paths into an installable zip and returns the md5_bytes and the URL of the uploaded location
Parameter | Type |
---|---|
root |
os.PathLike |
deref_symlinks |
bool |
output |
str |
options |
typing.Optional[FastPackageOptions] |
fast_register_workflow()
def fast_register_workflow(
entity: WorkflowBase,
serialization_settings: typing.Optional[SerializationSettings],
version: typing.Optional[str],
default_launch_plan: typing.Optional[bool],
options: typing.Optional[Options],
fast_package_options: typing.Optional[FastPackageOptions],
):
Use this method to register a workflow with zip mode.
Parameter | Type |
---|---|
entity |
WorkflowBase |
serialization_settings |
typing.Optional[SerializationSettings] |
version |
typing.Optional[str] |
default_launch_plan |
typing.Optional[bool] |
options |
typing.Optional[Options] |
fast_package_options |
typing.Optional[FastPackageOptions] |
fetch_active_launchplan()
def fetch_active_launchplan(
project: str,
domain: str,
name: str,
):
Returns the active version of the launch plan if it exists or returns None
Parameter | Type |
---|---|
project |
str |
domain |
str |
name |
str |
fetch_execution()
def fetch_execution(
project: str,
domain: str,
name: str,
):
Fetch a workflow execution entity from flyte admin.
Parameter | Type |
---|---|
project |
str |
domain |
str |
name |
str |
fetch_launch_plan()
def fetch_launch_plan(
project: str,
domain: str,
name: str,
version: str,
):
Fetch a launchplan entity from flyte admin.
Parameter | Type |
---|---|
project |
str |
domain |
str |
name |
str |
version |
str |
fetch_task()
def fetch_task(
project: str,
domain: str,
name: str,
version: str,
):
Fetch a task entity from flyte admin.
Parameter | Type |
---|---|
project |
str |
domain |
str |
name |
str |
version |
str |
fetch_task_lazy()
def fetch_task_lazy(
project: str,
domain: str,
name: str,
version: str,
):
Similar to fetch_task, just that it returns a LazyEntity, which will fetch the task lazily.
Parameter | Type |
---|---|
project |
str |
domain |
str |
name |
str |
version |
str |
fetch_workflow()
def fetch_workflow(
project: str,
domain: str,
name: str,
version: str,
):
Fetch a workflow entity from flyte admin.
Parameter | Type |
---|---|
project |
str |
domain |
str |
name |
str |
version |
str |
fetch_workflow_lazy()
def fetch_workflow_lazy(
project: str,
domain: str,
name: str,
version: str,
):
Similar to fetch_workflow, just that it returns a LazyEntity, which will fetch the workflow lazily.
Parameter | Type |
---|---|
project |
str |
domain |
str |
name |
str |
version |
str |
find_launch_plan()
def find_launch_plan(
lp_ref: id_models,
node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec],
):
Parameter | Type |
---|---|
lp_ref |
id_models |
node_launch_plans |
Dict[id_models, launch_plan_models.LaunchPlanSpec] |
find_launch_plan_for_node()
def find_launch_plan_for_node(
node: Node,
node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec],
):
Parameter | Type |
---|---|
node |
Node |
node_launch_plans |
Dict[id_models, launch_plan_models.LaunchPlanSpec] |
for_endpoint()
def for_endpoint(
endpoint: str,
insecure: bool,
data_config: typing.Optional[DataConfig],
config_file: typing.Union[str, ConfigFile],
default_project: typing.Optional[str],
default_domain: typing.Optional[str],
data_upload_location: str,
interactive_mode_enabled: bool,
kwargs,
):
Parameter | Type |
---|---|
endpoint |
str |
insecure |
bool |
data_config |
typing.Optional[DataConfig] |
config_file |
typing.Union[str, ConfigFile] |
default_project |
typing.Optional[str] |
default_domain |
typing.Optional[str] |
data_upload_location |
str |
interactive_mode_enabled |
bool |
kwargs |
**kwargs |
for_sandbox()
def for_sandbox(
default_project: typing.Optional[str],
default_domain: typing.Optional[str],
data_upload_location: str,
interactive_mode_enabled: bool,
kwargs,
):
Parameter | Type |
---|---|
default_project |
typing.Optional[str] |
default_domain |
typing.Optional[str] |
data_upload_location |
str |
interactive_mode_enabled |
bool |
kwargs |
**kwargs |
generate_console_http_domain()
def generate_console_http_domain()
This should generate the domain where console is hosted.
generate_console_url()
def generate_console_url(
entity: typing.Union[FlyteWorkflowExecution, FlyteNodeExecution, FlyteTaskExecution, FlyteWorkflow, FlyteTask, WorkflowExecutionIdentifier, Identifier, FlyteLaunchPlan],
):
Generate a Flyteconsole URL for the given Flyte remote endpoint. This will automatically determine whether the given object is an execution or an entity and adjust the URL accordingly.
Parameter | Type |
---|---|
entity |
typing.Union[FlyteWorkflowExecution, FlyteNodeExecution, FlyteTaskExecution, FlyteWorkflow, FlyteTask, WorkflowExecutionIdentifier, Identifier, FlyteLaunchPlan] |
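A small hedged sketch, assuming remote and execution were created as in the earlier examples:
```python
# Print a clickable Flyte console link for an execution (or any supported entity).
print(remote.generate_console_url(execution))
```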
get()
def get(
flyte_uri: typing.Optional[str],
):
General function that works with flyte tiny urls. This can return outputs (in the form of LiteralsResolver, or individual Literals for singular requests), or HTML if passed a deck link, or bytes containing HTML, if ipython is not available locally.
Parameter | Type |
---|---|
flyte_uri |
typing.Optional[str] |
get_domains()
def get_domains()
Lists registered domains from flyte admin.
Returns: typing.List[flytekit.models.domain.Domain]
get_execution_metrics()
def get_execution_metrics(
id: WorkflowExecutionIdentifier,
depth: int,
):
Get the metrics for a given execution.
Parameter | Type |
---|---|
id |
WorkflowExecutionIdentifier |
depth |
int |
get_extra_headers_for_protocol()
def get_extra_headers_for_protocol(
native_url,
):
Parameter | Type |
---|---|
native_url |
launch_backfill()
def launch_backfill(
project: str,
domain: str,
from_date: datetime,
to_date: datetime,
launchplan: str,
launchplan_version: str,
execution_name: str,
version: str,
dry_run: bool,
execute: bool,
parallel: bool,
failure_policy: typing.Optional[WorkflowFailurePolicy],
overwrite_cache: typing.Optional[bool],
):
Creates and launches a backfill workflow for the given launch plan. If the launch plan version is not specified, the latest launch plan is retrieved. The from_date is exclusive and the to_date is inclusive; the backfill runs for all instances in between.
If dry_run is specified, the workflow is created and returned. If execute==False, the workflow is created and registered. Otherwise, the workflow is created, registered and executed.
The parallel flag can be used to generate a workflow in which all launch plans can run in parallel. By default, the backfill runs sequentially.
Parameter | Type |
---|---|
project |
str |
domain |
str |
from_date |
datetime |
to_date |
datetime |
launchplan |
str |
launchplan_version |
str |
execution_name |
str |
version |
str |
dry_run |
bool |
execute |
bool |
parallel |
bool |
failure_policy |
typing.Optional[WorkflowFailurePolicy] |
overwrite_cache |
typing.Optional[bool] |
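A hedged sketch of launching a sequential backfill; the project, domain, launch plan name, and date range are placeholders:
```python
from datetime import datetime

execution = remote.launch_backfill(
    project="flytesnacks",
    domain="development",
    from_date=datetime(2024, 1, 1),   # exclusive
    to_date=datetime(2024, 1, 10),    # inclusive
    launchplan="my_module.my_lp",     # placeholder launch plan name
    execute=True,
    parallel=False,
)
```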
list_projects()
def list_projects(
limit: typing.Optional[int],
filters: typing.Optional[typing.List[filter_models.Filter]],
sort_by: typing.Optional[admin_common_models.Sort],
):
Lists registered projects from flyte admin.
Parameter | Type |
---|---|
limit |
typing.Optional[int] |
filters |
typing.Optional[typing.List[filter_models.Filter]] |
sort_by |
typing.Optional[admin_common_models.Sort] |
list_signals()
def list_signals(
execution_name: str,
project: typing.Optional[str],
domain: typing.Optional[str],
limit: int,
filters: typing.Optional[typing.List[filter_models.Filter]],
):
Parameter | Type |
---|---|
execution_name |
str |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
limit |
int |
filters |
typing.Optional[typing.List[filter_models.Filter]] |
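A hedged sketch combining list_signals with the approve method documented above; the execution name and signal id are placeholders:
```python
signals = remote.list_signals(
    "placeholder-exec-id", project="flytesnacks", domain="development"
)
remote.approve(
    signal_id="approval-gate",            # placeholder gate/signal name
    execution_name="placeholder-exec-id",
    project="flytesnacks",
    domain="development",
)
```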
list_tasks_by_version()
def list_tasks_by_version(
version: str,
project: typing.Optional[str],
domain: typing.Optional[str],
limit: typing.Optional[int],
):
Parameter | Type |
---|---|
version |
str |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
limit |
typing.Optional[int] |
raw_register()
def raw_register(
cp_entity: FlyteControlPlaneEntity,
settings: SerializationSettings,
version: str,
create_default_launchplan: bool,
options: Options,
og_entity: FlyteLocalEntity,
):
Raw register method; can be used to register control plane entities. If you have a Flyte entity like a WorkflowBase, Task, or LaunchPlan, prefer the other register methods. This should be used only if you already have serialized entities.
Parameter | Type |
---|---|
cp_entity |
FlyteControlPlaneEntity |
settings |
SerializationSettings |
version |
str |
create_default_launchplan |
bool |
options |
Options |
og_entity |
FlyteLocalEntity |
recent_executions()
def recent_executions(
project: typing.Optional[str],
domain: typing.Optional[str],
limit: typing.Optional[int],
filters: typing.Optional[typing.List[filter_models.Filter]],
):
Parameter | Type |
---|---|
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
limit |
typing.Optional[int] |
filters |
typing.Optional[typing.List[filter_models.Filter]] |
register_launch_plan()
def register_launch_plan(
entity: LaunchPlan,
version: typing.Optional[str],
project: typing.Optional[str],
domain: typing.Optional[str],
options: typing.Optional[Options],
serialization_settings: typing.Optional[SerializationSettings],
):
Register a given launchplan, possibly applying overrides from the provided options. If the underlying workflow is not already registered, it, along with any underlying entities, will also be registered. If the underlying workflow does exist (with the given project/domain/version), then only the launchplan will be registered.
Parameter | Type |
---|---|
entity |
LaunchPlan |
version |
typing.Optional[str] |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
options |
typing.Optional[Options] |
serialization_settings |
typing.Optional[SerializationSettings] |
register_script()
def register_script(
entity: typing.Union[WorkflowBase, PythonTask, LaunchPlan],
image_config: typing.Optional[ImageConfig],
version: typing.Optional[str],
project: typing.Optional[str],
domain: typing.Optional[str],
destination_dir: str,
copy_all: bool,
default_launch_plan: bool,
options: typing.Optional[Options],
source_path: typing.Optional[str],
module_name: typing.Optional[str],
envs: typing.Optional[typing.Dict[str, str]],
fast_package_options: typing.Optional[FastPackageOptions],
):
Use this method to register a workflow via script mode.
Parameter | Type |
---|---|
entity |
typing.Union[WorkflowBase, PythonTask, LaunchPlan] |
image_config |
typing.Optional[ImageConfig] |
version |
typing.Optional[str] |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
destination_dir |
str |
copy_all |
bool |
default_launch_plan |
bool |
options |
typing.Optional[Options] |
source_path |
typing.Optional[str] |
module_name |
typing.Optional[str] |
envs |
typing.Optional[typing.Dict[str, str]] |
fast_package_options |
typing.Optional[FastPackageOptions] |
register_task()
def register_task(
entity: PythonTask,
serialization_settings: typing.Optional[SerializationSettings],
version: typing.Optional[str],
):
Register a qualified task (PythonTask) with Remote. For any conflicting parameters, method arguments are regarded as overrides.
Parameter | Type |
---|---|
entity |
PythonTask |
serialization_settings |
typing.Optional[SerializationSettings] |
version |
typing.Optional[str] |
register_workflow()
def register_workflow(
entity: WorkflowBase,
serialization_settings: typing.Optional[SerializationSettings],
version: typing.Optional[str],
default_launch_plan: typing.Optional[bool],
options: typing.Optional[Options],
):
Use this method to register a workflow.
Parameter | Type |
---|---|
entity |
WorkflowBase |
serialization_settings |
typing.Optional[SerializationSettings] |
version |
typing.Optional[str] |
default_launch_plan |
typing.Optional[bool] |
options |
typing.Optional[Options] |
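A hedged sketch of registering a locally defined workflow; my_wf is assumed to be an @workflow-decorated function defined elsewhere, and the image and version are placeholders:
```python
from flytekit.configuration import ImageConfig, SerializationSettings

flyte_wf = remote.register_workflow(
    entity=my_wf,  # an @workflow-decorated function defined elsewhere
    serialization_settings=SerializationSettings(
        image_config=ImageConfig.auto_default_image(),  # assumed default image resolution
    ),
    version="v1",  # placeholder version
)
```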
reject()
def reject(
signal_id: str,
execution_name: str,
project: str,
domain: str,
):
Parameter | Type |
---|---|
signal_id |
str |
execution_name |
str |
project |
str |
domain |
str |
remote_context()
def remote_context()
Context manager with remote-specific configuration.
set_input()
def set_input(
signal_id: str,
execution_name: str,
value: typing.Union[literal_models.Literal, typing.Any],
project,
domain,
python_type,
literal_type,
):
Parameter | Type |
---|---|
signal_id |
str |
execution_name |
str |
value |
typing.Union[literal_models.Literal, typing.Any] |
project |
|
domain |
|
python_type |
|
literal_type |
set_signal()
def set_signal(
signal_id: str,
execution_name: str,
value: typing.Union[literal_models.Literal, typing.Any],
project: typing.Optional[str],
domain: typing.Optional[str],
python_type: typing.Optional[typing.Type],
literal_type: typing.Optional[type_models.LiteralType],
):
Parameter | Type |
---|---|
signal_id |
str |
execution_name |
str |
value |
typing.Union[literal_models.Literal, typing.Any] |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
python_type |
typing.Optional[typing.Type] |
literal_type |
typing.Optional[type_models.LiteralType] |
sync()
def sync(
execution: FlyteWorkflowExecution,
entity_definition: typing.Union[FlyteWorkflow, FlyteTask],
sync_nodes: bool,
):
This function was previously a singledispatchmethod. We’ve removed that but this function remains so that we don’t break people.
Parameter | Type |
---|---|
execution |
FlyteWorkflowExecution |
entity_definition |
typing.Union[FlyteWorkflow, FlyteTask] |
sync_nodes |
bool |
sync_execution()
def sync_execution(
execution: FlyteWorkflowExecution,
entity_definition: typing.Union[FlyteWorkflow, FlyteTask],
sync_nodes: bool,
):
Sync a FlyteWorkflowExecution object with its corresponding remote state.
Parameter | Type |
---|---|
execution |
FlyteWorkflowExecution |
entity_definition |
typing.Union[FlyteWorkflow, FlyteTask] |
sync_nodes |
bool |
sync_node_execution()
def sync_node_execution(
execution: FlyteNodeExecution,
node_mapping: typing.Dict[str, FlyteNode],
):
Get data backing a node execution. These FlyteNodeExecution objects should’ve come from Admin with the model fields already populated correctly. For purposes of the remote experience, we’d like to supplement the object with some additional fields:
- inputs/outputs
- task/workflow executions, and/or underlying node executions in the case of parent nodes
- TypedInterface (remote wrapper type)
A node can have several different types of executions behind it. That is, the node could’ve run (perhaps multiple times because of retries):
- A task
- A static subworkflow
- A dynamic subworkflow (which in turn may have run additional tasks, subwfs, and/or launch plans)
- A launch plan
The data model is complicated, so ascertaining which of these happened is a bit tricky. That logic is encapsulated in this function.
Parameter | Type |
---|---|
execution |
FlyteNodeExecution |
node_mapping |
typing.Dict[str, FlyteNode] |
sync_task_execution()
def sync_task_execution(
execution: FlyteTaskExecution,
entity_interface: typing.Optional[TypedInterface],
):
Sync a FlyteTaskExecution object with its corresponding remote state.
Parameter | Type |
---|---|
execution |
FlyteTaskExecution |
entity_interface |
typing.Optional[TypedInterface] |
terminate()
def terminate(
execution: FlyteWorkflowExecution,
cause: str,
):
Terminate a workflow execution.
Parameter | Type |
---|---|
execution |
FlyteWorkflowExecution |
cause |
str |
upload_file()
def upload_file(
to_upload: pathlib.Path,
project: typing.Optional[str],
domain: typing.Optional[str],
filename_root: typing.Optional[str],
):
This function will use the remote's client to hash the file and then upload it using Admin's data proxy service.
Parameter | Type |
---|---|
to_upload |
pathlib.Path |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
filename_root |
typing.Optional[str] |
wait()
def wait(
execution: FlyteWorkflowExecution,
timeout: typing.Optional[typing.Union[timedelta, int]],
poll_interval: typing.Optional[typing.Union[timedelta, int]],
sync_nodes: bool,
):
Wait for an execution to finish.
Parameter | Type |
---|---|
execution |
FlyteWorkflowExecution |
timeout |
typing.Optional[typing.Union[timedelta, int]] |
poll_interval |
typing.Optional[typing.Union[timedelta, int]] |
sync_nodes |
bool |
Properties
Property | Type | Description |
---|---|---|
client | ||
config | ||
context | ||
default_domain | ||
default_project | ||
file_access | ||
interactive_mode_enabled |
flytekit.experimental.eager_function.PlatformConfig
This object contains the settings to talk to a Flyte backend (the DNS location of your Admin server basically).
def PlatformConfig(
endpoint: str,
insecure: bool,
insecure_skip_verify: bool,
ca_cert_file_path: typing.Optional[str],
console_endpoint: typing.Optional[str],
command: typing.Optional[typing.List[str]],
proxy_command: typing.Optional[typing.List[str]],
client_id: typing.Optional[str],
client_credentials_secret: typing.Optional[str],
scopes: List[str],
auth_mode: AuthType,
audience: typing.Optional[str],
rpc_retries: int,
http_proxy_url: typing.Optional[str],
):
Parameter | Type |
---|---|
endpoint |
str |
insecure |
bool |
insecure_skip_verify |
bool |
ca_cert_file_path |
typing.Optional[str] |
console_endpoint |
typing.Optional[str] |
command |
typing.Optional[typing.List[str]] |
proxy_command |
typing.Optional[typing.List[str]] |
client_id |
typing.Optional[str] |
client_credentials_secret |
typing.Optional[str] |
scopes |
List[str] |
auth_mode |
AuthType |
audience |
typing.Optional[str] |
rpc_retries |
int |
http_proxy_url |
typing.Optional[str] |
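A hedged sketch using the for_endpoint() helper documented below; the endpoint is a placeholder for your Admin server's DNS name:
```python
from flytekit.configuration import PlatformConfig

platform = PlatformConfig.for_endpoint("flyte.example.com", insecure=False)
```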
Methods
Method | Description |
---|---|
auto() |
Reads from Config file, and overrides from Environment variables |
for_endpoint() |
None |
auto()
def auto(
config_file: typing.Optional[typing.Union[str, ConfigFile]],
):
Reads from the config file, with overrides from environment variables. Refer to ConfigEntry for details.
Parameter | Type |
---|---|
config_file |
typing.Optional[typing.Union[str, ConfigFile]] |
for_endpoint()
def for_endpoint(
endpoint: str,
insecure: bool,
):
Parameter | Type |
---|---|
endpoint |
str |
insecure |
bool |
flytekit.experimental.eager_function.S3Config
S3 specific configuration
def S3Config(
enable_debug: bool,
endpoint: typing.Optional[str],
retries: int,
backoff: datetime.timedelta,
access_key_id: typing.Optional[str],
secret_access_key: typing.Optional[str],
):
Parameter | Type |
---|---|
enable_debug |
bool |
endpoint |
typing.Optional[str] |
retries |
int |
backoff |
datetime.timedelta |
access_key_id |
typing.Optional[str] |
secret_access_key |
typing.Optional[str] |
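A hedged sketch mirroring the DataConfig example above; the endpoint and credentials are placeholder sandbox-style values, not real secrets:
```python
from flytekit.configuration import S3Config

s3 = S3Config(
    endpoint="http://localhost:30002",  # assumed local MinIO endpoint
    access_key_id="minio",              # placeholder credentials
    secret_access_key="miniostorage",
    retries=3,
)
```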
Methods
Method | Description |
---|---|
auto() |
Automatically configure |
auto()
def auto(
config_file: typing.Union[str, ConfigFile],
):
Automatically configure
Parameter | Type |
---|---|
config_file |
typing.Union[str, ConfigFile] |