flytekit.core.reference
Directory
Classes
Class |
Description |
ReferenceLaunchPlan |
A reference launch plan serves as a pointer to a Launch Plan that already exists on your Flyte installation. |
ReferenceTask |
This is a reference task, the body of the function passed in through the constructor will never be used, only the. |
ReferenceWorkflow |
A reference workflow is a pointer to a workflow that already exists on your Flyte installation. |
Errors
flytekit.core.reference.FlyteValidationException
Assertion failed.
def FlyteValidationException(
args,
timestamp: typing.Optional[float],
):
Parameter |
Type |
args |
*args |
timestamp |
typing.Optional[float] |
Properties
Property |
Type |
Description |
timestamp |
|
|
flytekit.core.reference.ReferenceLaunchPlan
A reference launch plan serves as a pointer to a Launch Plan that already exists on your Flyte installation. This
object will not initiate a network call to Admin, which is why the user is asked to provide the expected interface.
If at registration time the interface provided causes an issue with compilation, an error will be returned.
def ReferenceLaunchPlan(
project: str,
domain: str,
name: str,
version: str,
inputs: Dict[str, Type],
outputs: Dict[str, Type],
):
Parameter |
Type |
project |
str |
domain |
str |
name |
str |
version |
str |
inputs |
Dict[str, Type] |
outputs |
Dict[str, Type] |
Methods
clone_with()
def clone_with(
name: str,
parameters: Optional[_interface_models.ParameterMap],
fixed_inputs: Optional[_literal_models.LiteralMap],
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
):
Parameter |
Type |
name |
str |
parameters |
Optional[_interface_models.ParameterMap] |
fixed_inputs |
Optional[_literal_models.LiteralMap] |
schedule |
Optional[_schedule_model.Schedule] |
notifications |
Optional[List[_common_models.Notification]] |
labels |
Optional[_common_models.Labels] |
annotations |
Optional[_common_models.Annotations] |
raw_output_data_config |
Optional[_common_models.RawOutputDataConfig] |
max_parallelism |
Optional[int] |
security_context |
Optional[security.SecurityContext] |
trigger |
Optional[LaunchPlanTriggerBase] |
overwrite_cache |
Optional[bool] |
auto_activate |
bool |
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
):
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
args |
*args |
kwargs |
**kwargs |
def construct_node_metadata()
create()
def create(
name: str,
workflow: _annotated_workflow.WorkflowBase,
default_inputs: Optional[Dict[str, Any]],
fixed_inputs: Optional[Dict[str, Any]],
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
auth_role: Optional[_common_models.AuthRole],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
):
Parameter |
Type |
name |
str |
workflow |
_annotated_workflow.WorkflowBase |
default_inputs |
Optional[Dict[str, Any]] |
fixed_inputs |
Optional[Dict[str, Any]] |
schedule |
Optional[_schedule_model.Schedule] |
notifications |
Optional[List[_common_models.Notification]] |
labels |
Optional[_common_models.Labels] |
annotations |
Optional[_common_models.Annotations] |
raw_output_data_config |
Optional[_common_models.RawOutputDataConfig] |
max_parallelism |
Optional[int] |
security_context |
Optional[security.SecurityContext] |
auth_role |
Optional[_common_models.AuthRole] |
trigger |
Optional[LaunchPlanTriggerBase] |
overwrite_cache |
Optional[bool] |
auto_activate |
bool |
execute()
Parameter |
Type |
kwargs |
**kwargs |
get_default_launch_plan()
def get_default_launch_plan(
ctx: FlyteContext,
workflow: _annotated_workflow.WorkflowBase,
):
Users should probably call the get_or_create function defined below instead. A default launch plan is the one
that will just pick up whatever default values are defined in the workflow function signature (if any) and
use the default auth information supplied during serialization, with no notifications or schedules.
Parameter |
Type |
ctx |
FlyteContext |
workflow |
_annotated_workflow.WorkflowBase |
get_or_create()
def get_or_create(
workflow: _annotated_workflow.WorkflowBase,
name: Optional[str],
default_inputs: Optional[Dict[str, Any]],
fixed_inputs: Optional[Dict[str, Any]],
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
auth_role: Optional[_common_models.AuthRole],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
):
This function offers a friendlier interface for creating launch plans. If the name for the launch plan is not
supplied, this assumes you are looking for the default launch plan for the workflow. If it is specified, it
will be used. If creating the default launch plan, none of the other arguments may be specified.
The resulting launch plan is also cached and if called again with the same name, the
cached version is returned
Parameter |
Type |
workflow |
_annotated_workflow.WorkflowBase |
name |
Optional[str] |
default_inputs |
Optional[Dict[str, Any]] |
fixed_inputs |
Optional[Dict[str, Any]] |
schedule |
Optional[_schedule_model.Schedule] |
notifications |
Optional[List[_common_models.Notification]] |
labels |
Optional[_common_models.Labels] |
annotations |
Optional[_common_models.Annotations] |
raw_output_data_config |
Optional[_common_models.RawOutputDataConfig] |
max_parallelism |
Optional[int] |
security_context |
Optional[security.SecurityContext] |
auth_role |
Optional[_common_models.AuthRole] |
trigger |
Optional[LaunchPlanTriggerBase] |
overwrite_cache |
Optional[bool] |
auto_activate |
bool |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
Please see the local_execute comments in the main task.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
unwrap_literal_map_and_execute()
def unwrap_literal_map_and_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
Please see the implementation of the dispatch_execute function in the real task.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
Properties
Property |
Type |
Description |
annotations |
|
|
fixed_inputs |
|
|
id |
|
|
interface |
|
|
labels |
|
|
max_parallelism |
|
|
name |
|
|
notifications |
|
|
overwrite_cache |
|
|
parameters |
|
|
python_interface |
|
|
raw_output_data_config |
|
|
reference |
|
|
saved_inputs |
|
|
schedule |
|
|
security_context |
|
|
should_auto_activate |
|
|
trigger |
|
|
workflow |
|
|
flytekit.core.reference.ReferenceTask
This is a reference task, the body of the function passed in through the constructor will never be used, only the
signature of the function will be. The signature should also match the signature of the task you’re referencing,
as stored by Flyte Admin, if not, workflows using this will break upon compilation.
def ReferenceTask(
project: str,
domain: str,
name: str,
version: str,
inputs: Dict[str, type],
outputs: Dict[str, Type],
):
Parameter |
Type |
project |
str |
domain |
str |
name |
str |
version |
str |
inputs |
Dict[str, type] |
outputs |
Dict[str, Type] |
Methods
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
):
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
args |
*args |
kwargs |
**kwargs |
def construct_node_metadata()
dispatch_execute()
def dispatch_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
This method translates Flyte’s Type system based input values and invokes the actual call to the executor
This method is also invoked during runtime.
VoidPromise
is returned in the case when the task itself declares no outputs.
Literal Map
is returned when the task returns either one more outputs in the declaration. Individual outputs
may be none
DynamicJobSpec
is returned when a dynamic workflow is executed
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
execute()
Parameter |
Type |
kwargs |
**kwargs |
find_lhs()
get_config()
def get_config(
settings: flytekit.configuration.SerializationSettings,
):
Returns the task config as a serializable dictionary. This task config consists of metadata about the custom
defined for this task.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_container()
def get_container(
settings: flytekit.configuration.SerializationSettings,
):
Returns the container definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_custom()
def get_custom(
settings: flytekit.configuration.SerializationSettings,
):
Return additional plugin-specific custom data (if any) as a serializable dictionary.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_extended_resources()
def get_extended_resources(
settings: flytekit.configuration.SerializationSettings,
):
Returns the extended resources to allocate to the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
Returns the names and python types as a dictionary for the inputs of this task.
get_k8s_pod()
def get_k8s_pod(
settings: flytekit.configuration.SerializationSettings,
):
Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
get_sql()
def get_sql(
settings: flytekit.configuration.SerializationSettings,
):
Returns the Sql definition (if any) that is used to run the task on hosted Flyte.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
def get_type_for_input_var(
k: str,
v: typing.Any,
):
Returns the python type for an input variable by name.
Parameter |
Type |
k |
str |
v |
typing.Any |
get_type_for_output_var()
def get_type_for_output_var(
k: str,
v: typing.Any,
):
Returns the python type for the specified output variable by name.
Parameter |
Type |
k |
str |
v |
typing.Any |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
Please see the local_execute comments in the main task.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
post_execute()
def post_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
rval: typing.Any,
):
Post execute is called after the execution has completed, with the user_params and can be used to clean-up,
or alter the outputs to match the intended tasks outputs. If not overridden, then this function is a No-op
Parameter |
Type |
user_params |
typing.Optional[flytekit.core.context_manager.ExecutionParameters] |
rval |
typing.Any |
pre_execute()
def pre_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
):
This is the method that will be invoked directly before executing the task method and before all the inputs
are converted. One particular case where this is useful is if the context is to be modified for the user process
to get some user space parameters. This also ensures that things like SparkSession are already correctly
setup before the type transformers are called
This should return either the same context of the mutated context
Parameter |
Type |
user_params |
typing.Optional[flytekit.core.context_manager.ExecutionParameters] |
sandbox_execute()
def sandbox_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
unwrap_literal_map_and_execute()
def unwrap_literal_map_and_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
Please see the implementation of the dispatch_execute function in the real task.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
Properties
Property |
Type |
Description |
deck_fields |
|
|
disable_deck |
|
|
docs |
|
|
enable_deck |
|
|
environment |
|
|
id |
|
|
instantiated_in |
|
|
interface |
|
|
lhs |
|
|
location |
|
|
metadata |
|
|
name |
|
|
python_interface |
|
|
reference |
|
|
security_context |
|
|
task_config |
|
|
task_type |
|
|
task_type_version |
|
|
flytekit.core.reference.ReferenceWorkflow
A reference workflow is a pointer to a workflow that already exists on your Flyte installation. This
object will not initiate a network call to Admin, which is why the user is asked to provide the expected interface.
If at registration time the interface provided causes an issue with compilation, an error will be returned.
def ReferenceWorkflow(
project: str,
domain: str,
name: str,
version: str,
inputs: Dict[str, Type],
outputs: Dict[str, Type],
):
Parameter |
Type |
project |
str |
domain |
str |
name |
str |
version |
str |
inputs |
Dict[str, Type] |
outputs |
Dict[str, Type] |
Methods
add()
def add(
t: flytekit.core.python_auto_container.PythonAutoContainerTask,
):
Parameter |
Type |
t |
flytekit.core.python_auto_container.PythonAutoContainerTask |
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
):
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
args |
*args |
kwargs |
**kwargs |
def construct_node_metadata()
execute()
Parameter |
Type |
kwargs |
**kwargs |
find_lhs()
get_all_tasks()
Future proof method. Just making it easy to access all tasks (Not required today as we auto register them)
load_task()
def load_task(
loader_args: typing.List[str],
):
Given the set of identifier keys, should return one Python Task or raise an error if not found
Parameter |
Type |
loader_args |
typing.List[str] |
loader_args()
def loader_args(
settings: flytekit.configuration.SerializationSettings,
t: flytekit.core.python_auto_container.PythonAutoContainerTask,
):
This is responsible for turning an instance of a task into args that the load_task function can reconstitute.
Parameter |
Type |
settings |
flytekit.configuration.SerializationSettings |
t |
flytekit.core.python_auto_container.PythonAutoContainerTask |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
Please see the local_execute comments in the main task.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
kwargs |
**kwargs |
local_execution_mode()
def local_execution_mode()
task_name()
def task_name(
t: PythonAutoContainerTask,
):
Overridable function that can optionally return a custom name for a given task
Parameter |
Type |
t |
PythonAutoContainerTask |
unwrap_literal_map_and_execute()
def unwrap_literal_map_and_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
):
Please see the implementation of the dispatch_execute function in the real task.
Parameter |
Type |
ctx |
flytekit.core.context_manager.FlyteContext |
input_literal_map |
flytekit.models.literals.LiteralMap |
Properties
Property |
Type |
Description |
default_options |
|
|
docs |
|
|
failure_node |
|
|
function |
|
|
id |
|
|
instantiated_in |
|
|
interface |
|
|
lhs |
|
|
location |
|
|
name |
|
|
nodes |
|
|
on_failure |
|
|
output_bindings |
|
|
python_interface |
|
|
reference |
|
|
short_name |
|
|
workflow_metadata |
|
|
workflow_metadata_defaults |
|
|