flytekit.configuration.plugin
Defines a plugin API allowing other libraries to modify the behavior of flytekit.
Libraries can register by defining an object that follows the same API as FlytekitPlugin
and providing an entrypoint with the group name “flytekit.plugin”. In setuptools
,
you can specific them with:
setup(entry_points={
"flytekit.configuration.plugin": ["my_plugin=my_module:MyCustomPlugin"]
})
or in pyproject.toml:
[project.entry-points."flytekit.configuration.plugin"]
my_plugin = "my_module:MyCustomPlugin"
Directory
Classes
Class | Description |
---|---|
CachePolicy |
Base class for protocol classes. |
Config |
This the parent configuration object and holds all the underlying configuration object types. |
FlyteRemote |
Main entrypoint for programmatically accessing a Flyte remote backend. |
FlytekitPlugin |
None. |
FlytekitPluginProtocol |
Base class for protocol classes. |
Group |
A group allows a command to have subcommands attached. |
Protocol |
Base class for protocol classes. |
flytekit.configuration.plugin.CachePolicy
Base class for protocol classes.
Protocol classes are defined as::
class Proto(Protocol): def meth(self) -> int: …
Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).
For example::
class C: def meth(self) -> int: return 0
def func(x: Proto) -> int: return x.meth()
func(C()) # Passes static type check
See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::
class GenProtoT: def meth(self) -> T: …
def CachePolicy(
args,
kwargs,
):
Parameter | Type |
---|---|
args |
*args |
kwargs |
**kwargs |
Methods
Method | Description |
---|---|
get_version() |
None |
get_version()
def get_version(
salt: str,
params: flytekit.core.cache.VersionParameters,
):
Parameter | Type |
---|---|
salt |
str |
params |
flytekit.core.cache.VersionParameters |
flytekit.configuration.plugin.Config
This the parent configuration object and holds all the underlying configuration object types. An instance of this object holds all the config necessary to
- Interactive session with Flyte backend
- Some parts are required for Serialization, for example Platform Config is not required
- Runtime of a task
def Config(
platform: PlatformConfig,
secrets: SecretsConfig,
stats: StatsConfig,
data_config: DataConfig,
local_sandbox_path: str,
):
Parameter | Type |
---|---|
platform |
PlatformConfig |
secrets |
SecretsConfig |
stats |
StatsConfig |
data_config |
DataConfig |
local_sandbox_path |
str |
Methods
Method | Description |
---|---|
auto() |
Automatically constructs the Config Object |
for_endpoint() |
Creates an automatic config for the given endpoint and uses the config_file or environment variable for default |
for_sandbox() |
Constructs a new Config object specifically to connect to :std:ref:deployment-deployment-sandbox |
with_params() |
None |
auto()
def auto(
config_file: typing.Union[str, ConfigFile, None],
):
Automatically constructs the Config Object. The order of precedence is as follows
- first try to find any env vars that match the config vars specified in the FLYTE_CONFIG format.
- If not found in environment then values ar read from the config file
- If not found in the file, then the default values are used.
Parameter | Type |
---|---|
config_file |
typing.Union[str, ConfigFile, None] |
for_endpoint()
def for_endpoint(
endpoint: str,
insecure: bool,
data_config: typing.Optional[DataConfig],
config_file: typing.Union[str, ConfigFile],
):
Creates an automatic config for the given endpoint and uses the config_file or environment variable for default.
Refer to Config.auto()
to understand the default bootstrap behavior.
data_config can be used to configure how data is downloaded or uploaded to a specific Blob storage like S3 / GCS etc. But, for permissions to a specific backend just use Cloud providers reqcommendation. If using fsspec, then refer to fsspec documentation
Parameter | Type |
---|---|
endpoint |
str |
insecure |
bool |
data_config |
typing.Optional[DataConfig] |
config_file |
typing.Union[str, ConfigFile] |
for_sandbox()
def for_sandbox()
Constructs a new Config object specifically to connect to :std:ref:deployment-deployment-sandbox
.
If you are using a hosted Sandbox like environment, then you may need to use port-forward or ingress urls
:return: Config
with_params()
def with_params(
platform: PlatformConfig,
secrets: SecretsConfig,
stats: StatsConfig,
data_config: DataConfig,
local_sandbox_path: str,
):
Parameter | Type |
---|---|
platform |
PlatformConfig |
secrets |
SecretsConfig |
stats |
StatsConfig |
data_config |
DataConfig |
local_sandbox_path |
str |
flytekit.configuration.plugin.FlyteRemote
Main entrypoint for programmatically accessing a Flyte remote backend.
The term ‘remote’ is synonymous with ‘backend’ or ‘deployment’ and refers to a hosted instance of the Flyte platform, which comes with a Flyte Admin server on some known URI.
def FlyteRemote(
config: Config,
default_project: typing.Optional[str],
default_domain: typing.Optional[str],
data_upload_location: str,
interactive_mode_enabled: typing.Optional[bool],
kwargs,
):
Initialize a FlyteRemote object.
:type kwargs: All arguments that can be passed to create the SynchronousFlyteClient. These are usually grpc parameters, if you want to customize credentials, ssl handling etc.
Parameter | Type |
---|---|
config |
Config |
default_project |
typing.Optional[str] |
default_domain |
typing.Optional[str] |
data_upload_location |
str |
interactive_mode_enabled |
typing.Optional[bool] |
kwargs |
**kwargs |
Methods
Method | Description |
---|---|
activate_launchplan() |
Given a launchplan, activate it, all previous versions are deactivated |
approve() |
|
auto() |
None |
download() |
Download the data to the specified location |
execute() |
Execute a task, workflow, or launchplan, either something that’s been declared locally, or a fetched entity |
execute_local_launch_plan() |
Execute a locally defined LaunchPlan |
execute_local_task() |
Execute a @task-decorated function or TaskTemplate task |
execute_local_workflow() |
Execute an @workflow decorated function |
execute_reference_launch_plan() |
Execute a ReferenceLaunchPlan |
execute_reference_task() |
Execute a ReferenceTask |
execute_reference_workflow() |
Execute a ReferenceWorkflow |
execute_remote_task_lp() |
Execute a FlyteTask, or FlyteLaunchplan |
execute_remote_wf() |
Execute a FlyteWorkflow |
fast_package() |
Packages the given paths into an installable zip and returns the md5_bytes and the URL of the uploaded location |
fast_register_workflow() |
Use this method to register a workflow with zip mode |
fetch_active_launchplan() |
Returns the active version of the launch plan if it exists or returns None |
fetch_execution() |
Fetch a workflow execution entity from flyte admin |
fetch_launch_plan() |
Fetch a launchplan entity from flyte admin |
fetch_task() |
Fetch a task entity from flyte admin |
fetch_task_lazy() |
Similar to fetch_task, just that it returns a LazyEntity, which will fetch the workflow lazily |
fetch_workflow() |
Fetch a workflow entity from flyte admin |
fetch_workflow_lazy() |
Similar to fetch_workflow, just that it returns a LazyEntity, which will fetch the workflow lazily |
find_launch_plan() |
None |
find_launch_plan_for_node() |
None |
for_endpoint() |
None |
for_sandbox() |
None |
generate_console_http_domain() |
This should generate the domain where console is hosted |
generate_console_url() |
Generate a Flyteconsole URL for the given Flyte remote endpoint |
get() |
General function that works with flyte tiny urls |
get_domains() |
Lists registered domains from flyte admin |
get_execution_metrics() |
Get the metrics for a given execution |
get_extra_headers_for_protocol() |
None |
launch_backfill() |
Creates and launches a backfill workflow for the given launchplan |
list_projects() |
Lists registered projects from flyte admin |
list_signals() |
|
list_tasks_by_version() |
None |
raw_register() |
Raw register method, can be used to register control plane entities |
recent_executions() |
None |
register_launch_plan() |
Register a given launchplan, possibly applying overrides from the provided options |
register_script() |
Use this method to register a workflow via script mode |
register_task() |
Register a qualified task (PythonTask) with Remote |
register_workflow() |
Use this method to register a workflow |
reject() |
|
remote_context() |
Context manager with remote-specific configuration |
set_input() |
|
set_signal() |
|
sync() |
This function was previously a singledispatchmethod |
sync_execution() |
Sync a FlyteWorkflowExecution object with its corresponding remote state |
sync_node_execution() |
Get data backing a node execution |
sync_task_execution() |
Sync a FlyteTaskExecution object with its corresponding remote state |
terminate() |
Terminate a workflow execution |
upload_file() |
Function will use remote’s client to hash and then upload the file using Admin’s data proxy service |
wait() |
Wait for an execution to finish |
activate_launchplan()
def activate_launchplan(
ident: Identifier,
):
Given a launchplan, activate it, all previous versions are deactivated.
Parameter | Type |
---|---|
ident |
Identifier |
approve()
def approve(
signal_id: str,
execution_name: str,
project: str,
domain: str,
):
Parameter | Type |
---|---|
signal_id |
str |
execution_name |
str |
project |
str |
domain |
str |
auto()
def auto(
config_file: typing.Union[str, ConfigFile],
default_project: typing.Optional[str],
default_domain: typing.Optional[str],
data_upload_location: str,
interactive_mode_enabled: bool,
kwargs,
):
Parameter | Type |
---|---|
config_file |
typing.Union[str, ConfigFile] |
default_project |
typing.Optional[str] |
default_domain |
typing.Optional[str] |
data_upload_location |
str |
interactive_mode_enabled |
bool |
kwargs |
**kwargs |
download()
def download(
data: typing.Union[LiteralsResolver, Literal, LiteralMap],
download_to: str,
recursive: bool,
):
Download the data to the specified location. If the data is a LiteralsResolver, LiteralMap and if recursive is specified, then all file like objects will be recursively downloaded (e.g. FlyteFile/Dir (blob), StructuredDataset etc).
Note: That it will use your sessions credentials to access the remote location. For sandbox, this should be automatically configured, assuming you are running sandbox locally. For other environments, you will need to configure your credentials appropriately.
Parameter | Type |
---|---|
data |
typing.Union[LiteralsResolver, Literal, LiteralMap] |
download_to |
str |
recursive |
bool |
execute()
def execute(
entity: typing.Union[FlyteTask, FlyteLaunchPlan, FlyteWorkflow, PythonTask, WorkflowBase, LaunchPlan, ReferenceEntity],
inputs: typing.Dict[str, typing.Any],
project: str,
domain: str,
name: str,
version: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
image_config: typing.Optional[ImageConfig],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
serialization_settings: typing.Optional[SerializationSettings],
):
Execute a task, workflow, or launchplan, either something that’s been declared locally, or a fetched entity.
This method supports:
Flyte{Task, Workflow, LaunchPlan}
remote module objects.@task
-decorated functions andTaskTemplate
tasks.@workflow
-decorated functions.LaunchPlan
objects.
For local entities, this code will attempt to find the entity first, and if missing, will compile and register the object.
Not all arguments are relevant in all circumstances. For example, there’s no reason to use the serialization settings for entities that have already been registered on Admin.
Parameter | Type |
---|---|
entity |
typing.Union[FlyteTask, FlyteLaunchPlan, FlyteWorkflow, PythonTask, WorkflowBase, LaunchPlan, ReferenceEntity] |
inputs |
typing.Dict[str, typing.Any] |
project |
str |
domain |
str |
name |
str |
version |
str |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
image_config |
typing.Optional[ImageConfig] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
serialization_settings |
typing.Optional[SerializationSettings] |
execute_local_launch_plan()
def execute_local_launch_plan(
entity: LaunchPlan,
inputs: typing.Dict[str, typing.Any],
version: str,
project: typing.Optional[str],
domain: typing.Optional[str],
name: typing.Optional[str],
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
serialization_settings: typing.Optional[SerializationSettings],
):
Execute a locally defined LaunchPlan
.
Parameter | Type |
---|---|
entity |
LaunchPlan |
inputs |
typing.Dict[str, typing.Any] |
version |
str |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
name |
typing.Optional[str] |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
serialization_settings |
typing.Optional[SerializationSettings] |
execute_local_task()
def execute_local_task(
entity: PythonTask,
inputs: typing.Dict[str, typing.Any],
project: str,
domain: str,
name: str,
version: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
image_config: typing.Optional[ImageConfig],
wait: bool,
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
options: typing.Optional[Options],
serialization_settings: typing.Optional[SerializationSettings],
):
Execute a @task-decorated function or TaskTemplate task.
Parameter | Type |
---|---|
entity |
PythonTask |
inputs |
typing.Dict[str, typing.Any] |
project |
str |
domain |
str |
name |
str |
version |
str |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
image_config |
typing.Optional[ImageConfig] |
wait |
bool |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
options |
typing.Optional[Options] |
serialization_settings |
typing.Optional[SerializationSettings] |
execute_local_workflow()
def execute_local_workflow(
entity: WorkflowBase,
inputs: typing.Dict[str, typing.Any],
project: str,
domain: str,
name: str,
version: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
image_config: typing.Optional[ImageConfig],
options: typing.Optional[Options],
wait: bool,
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
serialization_settings: typing.Optional[SerializationSettings],
):
Execute an @workflow decorated function.
Parameter | Type |
---|---|
entity |
WorkflowBase |
inputs |
typing.Dict[str, typing.Any] |
project |
str |
domain |
str |
name |
str |
version |
str |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
image_config |
typing.Optional[ImageConfig] |
options |
typing.Optional[Options] |
wait |
bool |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
serialization_settings |
typing.Optional[SerializationSettings] |
execute_reference_launch_plan()
def execute_reference_launch_plan(
entity: ReferenceLaunchPlan,
inputs: typing.Dict[str, typing.Any],
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
):
Execute a ReferenceLaunchPlan.
Parameter | Type |
---|---|
entity |
ReferenceLaunchPlan |
inputs |
typing.Dict[str, typing.Any] |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
execute_reference_task()
def execute_reference_task(
entity: ReferenceTask,
inputs: typing.Dict[str, typing.Any],
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
):
Execute a ReferenceTask.
Parameter | Type |
---|---|
entity |
ReferenceTask |
inputs |
typing.Dict[str, typing.Any] |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
execute_reference_workflow()
def execute_reference_workflow(
entity: ReferenceWorkflow,
inputs: typing.Dict[str, typing.Any],
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
):
Execute a ReferenceWorkflow.
Parameter | Type |
---|---|
entity |
ReferenceWorkflow |
inputs |
typing.Dict[str, typing.Any] |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
execute_remote_task_lp()
def execute_remote_task_lp(
entity: typing.Union[FlyteTask, FlyteLaunchPlan],
inputs: typing.Dict[str, typing.Any],
project: str,
domain: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
):
Execute a FlyteTask, or FlyteLaunchplan.
NOTE: the name and version arguments are currently not used and only there consistency in the function signature
Parameter | Type |
---|---|
entity |
typing.Union[FlyteTask, FlyteLaunchPlan] |
inputs |
typing.Dict[str, typing.Any] |
project |
str |
domain |
str |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
execute_remote_wf()
def execute_remote_wf(
entity: FlyteWorkflow,
inputs: typing.Dict[str, typing.Any],
project: str,
domain: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
):
Execute a FlyteWorkflow.
NOTE: the name and version arguments are currently not used and only there consistency in the function signature
Parameter | Type |
---|---|
entity |
FlyteWorkflow |
inputs |
typing.Dict[str, typing.Any] |
project |
str |
domain |
str |
execution_name |
typing.Optional[str] |
execution_name_prefix |
typing.Optional[str] |
options |
typing.Optional[Options] |
wait |
bool |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
overwrite_cache |
typing.Optional[bool] |
interruptible |
typing.Optional[bool] |
envs |
typing.Optional[typing.Dict[str, str]] |
tags |
typing.Optional[typing.List[str]] |
cluster_pool |
typing.Optional[str] |
execution_cluster_label |
typing.Optional[str] |
fast_package()
def fast_package(
root: os.PathLike,
deref_symlinks: bool,
output: str,
options: typing.Optional[FastPackageOptions],
):
Packages the given paths into an installable zip and returns the md5_bytes and the URL of the uploaded location
Parameter | Type |
---|---|
root |
os.PathLike |
deref_symlinks |
bool |
output |
str |
options |
typing.Optional[FastPackageOptions] |
fast_register_workflow()
def fast_register_workflow(
entity: WorkflowBase,
serialization_settings: typing.Optional[SerializationSettings],
version: typing.Optional[str],
default_launch_plan: typing.Optional[bool],
options: typing.Optional[Options],
fast_package_options: typing.Optional[FastPackageOptions],
):
Use this method to register a workflow with zip mode.
Parameter | Type |
---|---|
entity |
WorkflowBase |
serialization_settings |
typing.Optional[SerializationSettings] |
version |
typing.Optional[str] |
default_launch_plan |
typing.Optional[bool] |
options |
typing.Optional[Options] |
fast_package_options |
typing.Optional[FastPackageOptions] |
fetch_active_launchplan()
def fetch_active_launchplan(
project: str,
domain: str,
name: str,
):
Returns the active version of the launch plan if it exists or returns None
Parameter | Type |
---|---|
project |
str |
domain |
str |
name |
str |
fetch_execution()
def fetch_execution(
project: str,
domain: str,
name: str,
):
Fetch a workflow execution entity from flyte admin.
Parameter | Type |
---|---|
project |
str |
domain |
str |
name |
str |
fetch_launch_plan()
def fetch_launch_plan(
project: str,
domain: str,
name: str,
version: str,
):
Fetch a launchplan entity from flyte admin.
Parameter | Type |
---|---|
project |
str |
domain |
str |
name |
str |
version |
str |
fetch_task()
def fetch_task(
project: str,
domain: str,
name: str,
version: str,
):
Fetch a task entity from flyte admin.
Parameter | Type |
---|---|
project |
str |
domain |
str |
name |
str |
version |
str |
fetch_task_lazy()
def fetch_task_lazy(
project: str,
domain: str,
name: str,
version: str,
):
Similar to fetch_task, just that it returns a LazyEntity, which will fetch the workflow lazily.
Parameter | Type |
---|---|
project |
str |
domain |
str |
name |
str |
version |
str |
fetch_workflow()
def fetch_workflow(
project: str,
domain: str,
name: str,
version: str,
):
Fetch a workflow entity from flyte admin.
Parameter | Type |
---|---|
project |
str |
domain |
str |
name |
str |
version |
str |
fetch_workflow_lazy()
def fetch_workflow_lazy(
project: str,
domain: str,
name: str,
version: str,
):
Similar to fetch_workflow, just that it returns a LazyEntity, which will fetch the workflow lazily.
Parameter | Type |
---|---|
project |
str |
domain |
str |
name |
str |
version |
str |
find_launch_plan()
def find_launch_plan(
lp_ref: id_models,
node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec],
):
Parameter | Type |
---|---|
lp_ref |
id_models |
node_launch_plans |
Dict[id_models, launch_plan_models.LaunchPlanSpec] |
find_launch_plan_for_node()
def find_launch_plan_for_node(
node: Node,
node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec],
):
Parameter | Type |
---|---|
node |
Node |
node_launch_plans |
Dict[id_models, launch_plan_models.LaunchPlanSpec] |
for_endpoint()
def for_endpoint(
endpoint: str,
insecure: bool,
data_config: typing.Optional[DataConfig],
config_file: typing.Union[str, ConfigFile],
default_project: typing.Optional[str],
default_domain: typing.Optional[str],
data_upload_location: str,
interactive_mode_enabled: bool,
kwargs,
):
Parameter | Type |
---|---|
endpoint |
str |
insecure |
bool |
data_config |
typing.Optional[DataConfig] |
config_file |
typing.Union[str, ConfigFile] |
default_project |
typing.Optional[str] |
default_domain |
typing.Optional[str] |
data_upload_location |
str |
interactive_mode_enabled |
bool |
kwargs |
**kwargs |
for_sandbox()
def for_sandbox(
default_project: typing.Optional[str],
default_domain: typing.Optional[str],
data_upload_location: str,
interactive_mode_enabled: bool,
kwargs,
):
Parameter | Type |
---|---|
default_project |
typing.Optional[str] |
default_domain |
typing.Optional[str] |
data_upload_location |
str |
interactive_mode_enabled |
bool |
kwargs |
**kwargs |
generate_console_http_domain()
def generate_console_http_domain()
This should generate the domain where console is hosted.
:return:
generate_console_url()
def generate_console_url(
entity: typing.Union[FlyteWorkflowExecution, FlyteNodeExecution, FlyteTaskExecution, FlyteWorkflow, FlyteTask, WorkflowExecutionIdentifier, Identifier, FlyteLaunchPlan],
):
Generate a Flyteconsole URL for the given Flyte remote endpoint. This will automatically determine if this is an execution or an entity and change the type automatically
Parameter | Type |
---|---|
entity |
typing.Union[FlyteWorkflowExecution, FlyteNodeExecution, FlyteTaskExecution, FlyteWorkflow, FlyteTask, WorkflowExecutionIdentifier, Identifier, FlyteLaunchPlan] |
get()
def get(
flyte_uri: typing.Optional[str],
):
General function that works with flyte tiny urls. This can return outputs (in the form of LiteralsResolver, or individual Literals for singular requests), or HTML if passed a deck link, or bytes containing HTML, if ipython is not available locally.
Parameter | Type |
---|---|
flyte_uri |
typing.Optional[str] |
get_domains()
def get_domains()
Lists registered domains from flyte admin.
:returns: typing.List[flytekit.models.domain.Domain]
get_execution_metrics()
def get_execution_metrics(
id: WorkflowExecutionIdentifier,
depth: int,
):
Get the metrics for a given execution.
Parameter | Type |
---|---|
id |
WorkflowExecutionIdentifier |
depth |
int |
get_extra_headers_for_protocol()
def get_extra_headers_for_protocol(
native_url,
):
Parameter | Type |
---|---|
native_url |
launch_backfill()
def launch_backfill(
project: str,
domain: str,
from_date: datetime,
to_date: datetime,
launchplan: str,
launchplan_version: str,
execution_name: str,
version: str,
dry_run: bool,
execute: bool,
parallel: bool,
failure_policy: typing.Optional[WorkflowFailurePolicy],
overwrite_cache: typing.Optional[bool],
):
Creates and launches a backfill workflow for the given launchplan. If launchplan version is not specified, then the latest launchplan is retrieved. The from_date is exclusive and end_date is inclusive and backfill run for all instances in between. :: -> (start_date - exclusive, end_date inclusive)
If dry_run is specified, the workflow is created and returned. If execute==False is specified then the workflow is created and registered. In the last case, the workflow is created, registered and executed.
The parallel
flag can be used to generate a workflow where all launchplans can be run in parallel. Default
is that execute backfill is run sequentially
Parameter | Type |
---|---|
project |
str |
domain |
str |
from_date |
datetime |
to_date |
datetime |
launchplan |
str |
launchplan_version |
str |
execution_name |
str |
version |
str |
dry_run |
bool |
execute |
bool |
parallel |
bool |
failure_policy |
typing.Optional[WorkflowFailurePolicy] |
overwrite_cache |
typing.Optional[bool] |
list_projects()
def list_projects(
limit: typing.Optional[int],
filters: typing.Optional[typing.List[filter_models.Filter]],
sort_by: typing.Optional[admin_common_models.Sort],
):
Lists registered projects from flyte admin.
Parameter | Type |
---|---|
limit |
typing.Optional[int] |
filters |
typing.Optional[typing.List[filter_models.Filter]] |
sort_by |
typing.Optional[admin_common_models.Sort] |
list_signals()
def list_signals(
execution_name: str,
project: typing.Optional[str],
domain: typing.Optional[str],
limit: int,
filters: typing.Optional[typing.List[filter_models.Filter]],
):
Parameter | Type |
---|---|
execution_name |
str |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
limit |
int |
filters |
typing.Optional[typing.List[filter_models.Filter]] |
list_tasks_by_version()
def list_tasks_by_version(
version: str,
project: typing.Optional[str],
domain: typing.Optional[str],
limit: typing.Optional[int],
):
Parameter | Type |
---|---|
version |
str |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
limit |
typing.Optional[int] |
raw_register()
def raw_register(
cp_entity: FlyteControlPlaneEntity,
settings: SerializationSettings,
version: str,
create_default_launchplan: bool,
options: Options,
og_entity: FlyteLocalEntity,
):
Raw register method, can be used to register control plane entities. Usually if you have a Flyte Entity like a WorkflowBase, Task, LaunchPlan then use other methods. This should be used only if you have already serialized entities
Parameter | Type |
---|---|
cp_entity |
FlyteControlPlaneEntity |
settings |
SerializationSettings |
version |
str |
create_default_launchplan |
bool |
options |
Options |
og_entity |
FlyteLocalEntity |
recent_executions()
def recent_executions(
project: typing.Optional[str],
domain: typing.Optional[str],
limit: typing.Optional[int],
filters: typing.Optional[typing.List[filter_models.Filter]],
):
Parameter | Type |
---|---|
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
limit |
typing.Optional[int] |
filters |
typing.Optional[typing.List[filter_models.Filter]] |
register_launch_plan()
def register_launch_plan(
entity: LaunchPlan,
version: typing.Optional[str],
project: typing.Optional[str],
domain: typing.Optional[str],
options: typing.Optional[Options],
serialization_settings: typing.Optional[SerializationSettings],
):
Register a given launchplan, possibly applying overrides from the provided options. If the underlying workflow is not already registered, it, along with any underlying entities, will also be registered. If the underlying workflow does exist (with the given project/domain/version), then only the launchplan will be registered.
Parameter | Type |
---|---|
entity |
LaunchPlan |
version |
typing.Optional[str] |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
options |
typing.Optional[Options] |
serialization_settings |
typing.Optional[SerializationSettings] |
register_script()
def register_script(
entity: typing.Union[WorkflowBase, PythonTask, LaunchPlan],
image_config: typing.Optional[ImageConfig],
version: typing.Optional[str],
project: typing.Optional[str],
domain: typing.Optional[str],
destination_dir: str,
copy_all: bool,
default_launch_plan: bool,
options: typing.Optional[Options],
source_path: typing.Optional[str],
module_name: typing.Optional[str],
envs: typing.Optional[typing.Dict[str, str]],
fast_package_options: typing.Optional[FastPackageOptions],
):
Use this method to register a workflow via script mode.
Parameter | Type |
---|---|
entity |
typing.Union[WorkflowBase, PythonTask, LaunchPlan] |
image_config |
typing.Optional[ImageConfig] |
version |
typing.Optional[str] |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
destination_dir |
str |
copy_all |
bool |
default_launch_plan |
bool |
options |
typing.Optional[Options] |
source_path |
typing.Optional[str] |
module_name |
typing.Optional[str] |
envs |
typing.Optional[typing.Dict[str, str]] |
fast_package_options |
typing.Optional[FastPackageOptions] |
register_task()
def register_task(
entity: PythonTask,
serialization_settings: typing.Optional[SerializationSettings],
version: typing.Optional[str],
):
Register a qualified task (PythonTask) with Remote For any conflicting parameters method arguments are regarded as overrides
Parameter | Type |
---|---|
entity |
PythonTask |
serialization_settings |
typing.Optional[SerializationSettings] |
version |
typing.Optional[str] |
register_workflow()
def register_workflow(
entity: WorkflowBase,
serialization_settings: typing.Optional[SerializationSettings],
version: typing.Optional[str],
default_launch_plan: typing.Optional[bool],
options: typing.Optional[Options],
):
Use this method to register a workflow.
Parameter | Type |
---|---|
entity |
WorkflowBase |
serialization_settings |
typing.Optional[SerializationSettings] |
version |
typing.Optional[str] |
default_launch_plan |
typing.Optional[bool] |
options |
typing.Optional[Options] |
reject()
def reject(
signal_id: str,
execution_name: str,
project: str,
domain: str,
):
Parameter | Type |
---|---|
signal_id |
str |
execution_name |
str |
project |
str |
domain |
str |
remote_context()
def remote_context()
Context manager with remote-specific configuration.
set_input()
def set_input(
signal_id: str,
execution_name: str,
value: typing.Union[literal_models.Literal, typing.Any],
project,
domain,
python_type,
literal_type,
):
Parameter | Type |
---|---|
signal_id |
str |
execution_name |
str |
value |
typing.Union[literal_models.Literal, typing.Any] |
project |
|
domain |
|
python_type |
|
literal_type |
set_signal()
def set_signal(
signal_id: str,
execution_name: str,
value: typing.Union[literal_models.Literal, typing.Any],
project: typing.Optional[str],
domain: typing.Optional[str],
python_type: typing.Optional[typing.Type],
literal_type: typing.Optional[type_models.LiteralType],
):
Parameter | Type |
---|---|
signal_id |
str |
execution_name |
str |
value |
typing.Union[literal_models.Literal, typing.Any] |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
python_type |
typing.Optional[typing.Type] |
literal_type |
typing.Optional[type_models.LiteralType] |
sync()
def sync(
execution: FlyteWorkflowExecution,
entity_definition: typing.Union[FlyteWorkflow, FlyteTask],
sync_nodes: bool,
):
This function was previously a singledispatchmethod. We’ve removed that but this function remains so that we don’t break people.
Parameter | Type |
---|---|
execution |
FlyteWorkflowExecution |
entity_definition |
typing.Union[FlyteWorkflow, FlyteTask] |
sync_nodes |
bool |
sync_execution()
def sync_execution(
execution: FlyteWorkflowExecution,
entity_definition: typing.Union[FlyteWorkflow, FlyteTask],
sync_nodes: bool,
):
Sync a FlyteWorkflowExecution object with its corresponding remote state.
Parameter | Type |
---|---|
execution |
FlyteWorkflowExecution |
entity_definition |
typing.Union[FlyteWorkflow, FlyteTask] |
sync_nodes |
bool |
sync_node_execution()
def sync_node_execution(
execution: FlyteNodeExecution,
node_mapping: typing.Dict[str, FlyteNode],
):
Get data backing a node execution. These FlyteNodeExecution objects should’ve come from Admin with the model fields already populated correctly. For purposes of the remote experience, we’d like to supplement the object with some additional fields:
- inputs/outputs
- task/workflow executions, and/or underlying node executions in the case of parent nodes
- TypedInterface (remote wrapper type)
A node can have several different types of executions behind it. That is, the node could’ve run (perhaps multiple times because of retries):
- A task
- A static subworkflow
- A dynamic subworkflow (which in turn may have run additional tasks, subwfs, and/or launch plans)
- A launch plan
The data model is complicated, so ascertaining which of these happened is a bit tricky. That logic is encapsulated in this function.
Parameter | Type |
---|---|
execution |
FlyteNodeExecution |
node_mapping |
typing.Dict[str, FlyteNode] |
sync_task_execution()
def sync_task_execution(
execution: FlyteTaskExecution,
entity_interface: typing.Optional[TypedInterface],
):
Sync a FlyteTaskExecution object with its corresponding remote state.
Parameter | Type |
---|---|
execution |
FlyteTaskExecution |
entity_interface |
typing.Optional[TypedInterface] |
terminate()
def terminate(
execution: FlyteWorkflowExecution,
cause: str,
):
Terminate a workflow execution.
Parameter | Type |
---|---|
execution |
FlyteWorkflowExecution |
cause |
str |
upload_file()
def upload_file(
to_upload: pathlib.Path,
project: typing.Optional[str],
domain: typing.Optional[str],
filename_root: typing.Optional[str],
):
Function will use remote’s client to hash and then upload the file using Admin’s data proxy service.
Parameter | Type |
---|---|
to_upload |
pathlib.Path |
project |
typing.Optional[str] |
domain |
typing.Optional[str] |
filename_root |
typing.Optional[str] |
wait()
def wait(
execution: FlyteWorkflowExecution,
timeout: typing.Optional[typing.Union[timedelta, int]],
poll_interval: typing.Optional[typing.Union[timedelta, int]],
sync_nodes: bool,
):
Wait for an execution to finish.
Parameter | Type |
---|---|
execution |
FlyteWorkflowExecution |
timeout |
typing.Optional[typing.Union[timedelta, int]] |
poll_interval |
typing.Optional[typing.Union[timedelta, int]] |
sync_nodes |
bool |
Properties
Property | Type | Description |
---|---|---|
client | ||
config | ||
context | ||
default_domain | ||
default_project | ||
file_access | ||
interactive_mode_enabled |
flytekit.configuration.plugin.FlytekitPlugin
Methods
Method | Description |
---|---|
configure_pyflyte_cli() |
Configure pyflyte’s CLI |
get_auth_success_html() |
Get default success html |
get_default_cache_policies() |
Get default cache policies for tasks |
get_default_image() |
Get default image |
get_remote() |
Get FlyteRemote object for CLI session |
secret_requires_group() |
Return True if secrets require group entry during registration time |
configure_pyflyte_cli()
def configure_pyflyte_cli(
main: click.core.Group,
):
Configure pyflyte’s CLI.
Parameter | Type |
---|---|
main |
click.core.Group |
get_auth_success_html()
def get_auth_success_html(
endpoint: str,
):
Get default success html. Return None to use flytekit’s default success html.
Parameter | Type |
---|---|
endpoint |
str |
get_default_cache_policies()
def get_default_cache_policies()
Get default cache policies for tasks.
get_default_image()
def get_default_image()
Get default image. Return None to use the images from flytekit.configuration.DefaultImages
get_remote()
def get_remote(
config: typing.Optional[str],
project: str,
domain: str,
data_upload_location: typing.Optional[str],
):
Get FlyteRemote object for CLI session.
Parameter | Type |
---|---|
config |
typing.Optional[str] |
project |
str |
domain |
str |
data_upload_location |
typing.Optional[str] |
secret_requires_group()
def secret_requires_group()
Return True if secrets require group entry during registration time.
flytekit.configuration.plugin.FlytekitPluginProtocol
Base class for protocol classes.
Protocol classes are defined as::
class Proto(Protocol): def meth(self) -> int: …
Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).
For example::
class C: def meth(self) -> int: return 0
def func(x: Proto) -> int: return x.meth()
func(C()) # Passes static type check
See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::
class GenProtoT: def meth(self) -> T: …
def FlytekitPluginProtocol(
args,
kwargs,
):
Parameter | Type |
---|---|
args |
*args |
kwargs |
**kwargs |
Methods
Method | Description |
---|---|
configure_pyflyte_cli() |
Configure pyflyte’s CLI |
get_auth_success_html() |
Get default success html for auth |
get_default_cache_policies() |
Get default cache policies for tasks |
get_default_image() |
Get default image |
get_remote() |
Get FlyteRemote object for CLI session |
secret_requires_group() |
Return True if secrets require group entry |
configure_pyflyte_cli()
def configure_pyflyte_cli(
main: click.core.Group,
):
Configure pyflyte’s CLI.
Parameter | Type |
---|---|
main |
click.core.Group |
get_auth_success_html()
def get_auth_success_html(
endpoint: str,
):
Get default success html for auth. Return None to use flytekit’s default success html.
Parameter | Type |
---|---|
endpoint |
str |
get_default_cache_policies()
def get_default_cache_policies()
Get default cache policies for tasks.
get_default_image()
def get_default_image()
Get default image. Return None to use the images from flytekit.configuration.DefaultImages
get_remote()
def get_remote(
config: typing.Optional[str],
project: str,
domain: str,
data_upload_location: typing.Optional[str],
):
Get FlyteRemote object for CLI session.
Parameter | Type |
---|---|
config |
typing.Optional[str] |
project |
str |
domain |
str |
data_upload_location |
typing.Optional[str] |
secret_requires_group()
def secret_requires_group()
Return True if secrets require group entry.
flytekit.configuration.plugin.Group
A group allows a command to have subcommands attached. This is the most common way to implement nesting in Click.
def Group(
name: typing.Optional[str],
commands: typing.Union[typing.MutableMapping[str, click.core.Command], typing.Sequence[click.core.Command], NoneType],
attrs: typing.Any,
):
Parameter | Type |
---|---|
name |
typing.Optional[str] |
commands |
typing.Union[typing.MutableMapping[str, click.core.Command], typing.Sequence[click.core.Command], NoneType] |
attrs |
typing.Any |
Methods
Method | Description |
---|---|
add_command() |
Registers another :class:Command with this group |
collect_usage_pieces() |
Returns all the pieces that go into the usage line and returns |
command() |
A shortcut decorator for declaring and attaching a command to |
format_commands() |
Extra format methods for multi methods that adds all the commands |
format_epilog() |
Writes the epilog into the formatter if it exists |
format_help() |
Writes the help into the formatter if it exists |
format_help_text() |
Writes the help text to the formatter if it exists |
format_options() |
Writes all the options into the formatter if they exist |
format_usage() |
Writes the usage line into the formatter |
get_command() |
Given a context and a command name, this returns a |
get_help() |
Formats the help into a string and returns it |
get_help_option() |
Returns the help option object |
get_help_option_names() |
Returns the names for the help option |
get_params() |
None |
get_short_help_str() |
Gets short help for the command or makes it by shortening the |
get_usage() |
Formats the usage line into a string and returns it |
group() |
A shortcut decorator for declaring and attaching a group to |
invoke() |
Given a context, this invokes the attached callback (if it exists) |
list_commands() |
Returns a list of subcommand names in the order they should |
main() |
This is the way to invoke a script with all the bells and |
make_context() |
This function when given an info name and arguments will kick |
make_parser() |
Creates the underlying option parser for this command |
parse_args() |
Given a context and a list of arguments this creates the parser |
resolve_command() |
None |
result_callback() |
Adds a result callback to the command |
shell_complete() |
Return a list of completions for the incomplete value |
to_info_dict() |
Gather information that could be useful for a tool generating |
add_command()
def add_command(
cmd: click.core.Command,
name: typing.Optional[str],
):
Registers another :class:Command
with this group. If the name
is not provided, the name of the command is used.
Parameter | Type |
---|---|
cmd |
click.core.Command |
name |
typing.Optional[str] |
collect_usage_pieces()
def collect_usage_pieces(
ctx: click.core.Context,
):
Returns all the pieces that go into the usage line and returns it as a list of strings.
Parameter | Type |
---|---|
ctx |
click.core.Context |
command()
def command(
args: `*args`,
kwargs: `**kwargs`,
):
A shortcut decorator for declaring and attaching a command to
the group. This takes the same arguments as :func:command
and
immediately registers the created command with this group by
calling :meth:add_command
.
To customize the command class used, set the
:attr:command_class
attribute.
.. versionchanged:: 8.1 This decorator can be applied without parentheses.
.. versionchanged:: 8.0
Added the :attr:command_class
attribute.
Parameter | Type |
---|---|
args |
*args |
kwargs |
**kwargs |
format_commands()
def format_commands(
ctx: click.core.Context,
formatter: click.formatting.HelpFormatter,
):
Extra format methods for multi methods that adds all the commands after the options.
Parameter | Type |
---|---|
ctx |
click.core.Context |
formatter |
click.formatting.HelpFormatter |
format_epilog()
def format_epilog(
ctx: click.core.Context,
formatter: click.formatting.HelpFormatter,
):
Writes the epilog into the formatter if it exists.
Parameter | Type |
---|---|
ctx |
click.core.Context |
formatter |
click.formatting.HelpFormatter |
format_help()
def format_help(
ctx: click.core.Context,
formatter: click.formatting.HelpFormatter,
):
Writes the help into the formatter if it exists.
This is a low-level method called by :meth:get_help
.
This calls the following methods:
- :meth:
format_usage
- :meth:
format_help_text
- :meth:
format_options
- :meth:
format_epilog
Parameter | Type |
---|---|
ctx |
click.core.Context |
formatter |
click.formatting.HelpFormatter |
format_help_text()
def format_help_text(
ctx: click.core.Context,
formatter: click.formatting.HelpFormatter,
):
Writes the help text to the formatter if it exists.
Parameter | Type |
---|---|
ctx |
click.core.Context |
formatter |
click.formatting.HelpFormatter |
format_options()
def format_options(
ctx: click.core.Context,
formatter: click.formatting.HelpFormatter,
):
Writes all the options into the formatter if they exist.
Parameter | Type |
---|---|
ctx |
click.core.Context |
formatter |
click.formatting.HelpFormatter |
format_usage()
def format_usage(
ctx: click.core.Context,
formatter: click.formatting.HelpFormatter,
):
Writes the usage line into the formatter.
This is a low-level method called by :meth:get_usage
.
Parameter | Type |
---|---|
ctx |
click.core.Context |
formatter |
click.formatting.HelpFormatter |
get_command()
def get_command(
ctx: click.core.Context,
cmd_name: str,
):
Given a context and a command name, this returns a
:class:Command
object if it exists or returns None
.
Parameter | Type |
---|---|
ctx |
click.core.Context |
cmd_name |
str |
get_help()
def get_help(
ctx: click.core.Context,
):
Formats the help into a string and returns it.
Calls :meth:format_help
internally.
Parameter | Type |
---|---|
ctx |
click.core.Context |
get_help_option()
def get_help_option(
ctx: click.core.Context,
):
Returns the help option object.
Unless add_help_option
is False
.
.. versionchanged:: 8.1.8 The help option is now cached to avoid creating it multiple times.
Parameter | Type |
---|---|
ctx |
click.core.Context |
get_help_option_names()
def get_help_option_names(
ctx: click.core.Context,
):
Returns the names for the help option.
Parameter | Type |
---|---|
ctx |
click.core.Context |
get_params()
def get_params(
ctx: click.core.Context,
):
Parameter | Type |
---|---|
ctx |
click.core.Context |
get_short_help_str()
def get_short_help_str(
limit: int,
):
Gets short help for the command or makes it by shortening the long help string.
Parameter | Type |
---|---|
limit |
int |
get_usage()
def get_usage(
ctx: click.core.Context,
):
Formats the usage line into a string and returns it.
Calls :meth:format_usage
internally.
Parameter | Type |
---|---|
ctx |
click.core.Context |
group()
def group(
args: `*args`,
kwargs: `**kwargs`,
):
A shortcut decorator for declaring and attaching a group to
the group. This takes the same arguments as :func:group
and
immediately registers the created group with this group by
calling :meth:add_command
.
To customize the group class used, set the :attr:group_class
attribute.
.. versionchanged:: 8.1 This decorator can be applied without parentheses.
.. versionchanged:: 8.0
Added the :attr:group_class
attribute.
Parameter | Type |
---|---|
args |
*args |
kwargs |
**kwargs |
invoke()
def invoke(
ctx: click.core.Context,
):
Given a context, this invokes the attached callback (if it exists) in the right way.
Parameter | Type |
---|---|
ctx |
click.core.Context |
list_commands()
def list_commands(
ctx: click.core.Context,
):
Returns a list of subcommand names in the order they should appear.
Parameter | Type |
---|---|
ctx |
click.core.Context |
main()
def main(
args: `*args`,
prog_name: typing.Optional[str],
complete_var: typing.Optional[str],
standalone_mode: bool,
windows_expand_args: bool,
extra: typing.Any,
):
This is the way to invoke a script with all the bells and
whistles as a command line application. This will always terminate
the application after a call. If this is not wanted, SystemExit
needs to be caught.
This method is also available by directly calling the instance of
a :class:Command
.
Parameter | Type |
---|---|
args |
*args |
prog_name |
typing.Optional[str] |
complete_var |
typing.Optional[str] |
standalone_mode |
bool |
windows_expand_args |
bool |
extra |
typing.Any |
make_context()
def make_context(
info_name: typing.Optional[str],
args: `*args`,
parent: typing.Optional[click.core.Context],
extra: typing.Any,
):
This function when given an info name and arguments will kick
off the parsing and create a new :class:Context
. It does not
invoke the actual command callback though.
To quickly customize the context class used without overriding
this method, set the :attr:context_class
attribute.
Parameter | Type |
---|---|
info_name |
typing.Optional[str] |
args |
*args |
parent |
typing.Optional[click.core.Context] |
extra |
typing.Any |
make_parser()
def make_parser(
ctx: click.core.Context,
):
Creates the underlying option parser for this command.
Parameter | Type |
---|---|
ctx |
click.core.Context |
parse_args()
def parse_args(
ctx: click.core.Context,
args: `*args`,
):
Given a context and a list of arguments this creates the parser
and parses the arguments, then modifies the context as necessary.
This is automatically invoked by :meth:make_context
.
Parameter | Type |
---|---|
ctx |
click.core.Context |
args |
*args |
resolve_command()
def resolve_command(
ctx: click.core.Context,
args: `*args`,
):
Parameter | Type |
---|---|
ctx |
click.core.Context |
args |
*args |
result_callback()
def result_callback(
replace: bool,
):
Adds a result callback to the command. By default if a
result callback is already registered this will chain them but
this can be disabled with the replace
parameter. The result
callback is invoked with the return value of the subcommand
(or the list of return values from all subcommands if chaining
is enabled) as well as the parameters as they would be passed
to the main callback.
Example::
@click.group() @click.option(’-i’, ‘–input’, default=23) def cli(input): return 42
@cli.result_callback() def process_result(result, input): return result + input
Parameter | Type |
---|---|
replace |
bool |
shell_complete()
def shell_complete(
ctx: click.core.Context,
incomplete: str,
):
Return a list of completions for the incomplete value. Looks at the names of options, subcommands, and chained multi-commands.
Parameter | Type |
---|---|
ctx |
click.core.Context |
incomplete |
str |
to_info_dict()
def to_info_dict(
ctx: click.core.Context,
):
Gather information that could be useful for a tool generating user-facing documentation. This traverses the entire structure below this command.
Use :meth:click.Context.to_info_dict
to traverse the entire
CLI structure.
Parameter | Type |
---|---|
ctx |
click.core.Context |
flytekit.configuration.plugin.Protocol
Base class for protocol classes.
Protocol classes are defined as::
class Proto(Protocol): def meth(self) -> int: …
Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).
For example::
class C: def meth(self) -> int: return 0
def func(x: Proto) -> int: return x.meth()
func(C()) # Passes static type check
See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::
class GenProtoT: def meth(self) -> T: …