flytekit.remote.backfill
Directory
Classes
Class | Description |
---|---|
FlyteLaunchPlan | A class encapsulating a remote Flyte launch plan. |
ImperativeWorkflow | An imperative workflow is a programmatic analogue to the typical @workflow function-based workflow and is better suited to programmatic applications. |
LaunchPlan | Launch Plans are one of the core constructs of Flyte. |
WorkflowBase | |
WorkflowFailurePolicy | Defines the behavior for a workflow execution in the case of an observed node execution failure. |
croniter | |
datetime | datetime(year, month, day[, hour[, minute[, second[, microsecond[, tzinfo]]]]]) |
timedelta | Difference between two datetime values. |
flytekit.remote.backfill.FlyteLaunchPlan
A class encapsulating a remote Flyte launch plan.
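For orientation, here is a minimal sketch of how a FlyteLaunchPlan is typically obtained and executed through FlyteRemote; the project, domain, name, and version values are placeholders.

.. code-block:: python

    from flytekit.configuration import Config
    from flytekit.remote import FlyteRemote

    remote = FlyteRemote(
        config=Config.auto(),            # picks up endpoint/auth from the environment
        default_project="flytesnacks",   # placeholder project
        default_domain="development",    # placeholder domain
    )

    # fetch_launch_plan returns a FlyteLaunchPlan wrapping the registered entity
    flyte_lp = remote.fetch_launch_plan(name="my_wf_lp", version="v1")
    execution = remote.execute(flyte_lp, inputs={"a": 3})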
def FlyteLaunchPlan(
id,
args,
kwargs,
):
Parameter | Type |
---|---|
id | |
args | *args |
kwargs | **kwargs |
Methods
Method | Description |
---|---|
compile() | |
construct_node_metadata() | Used when constructing the node that encapsulates this task as part of a broader workflow definition. |
execute() | |
from_flyte_idl() | |
local_execute() | |
local_execution_mode() | |
promote_from_model() | |
serialize_to_string() | |
short_string() | |
to_flyte_idl() | |
verbose_string() | |
compile()
def compile(
ctx: FlyteContext,
args,
kwargs,
):
Parameter | Type |
---|---|
ctx | FlyteContext |
args | *args |
kwargs | **kwargs |
construct_node_metadata()
def construct_node_metadata()
Used when constructing the node that encapsulates this task as part of a broader workflow definition.
execute()
def execute(
kwargs,
):
Parameter | Type |
---|---|
kwargs | **kwargs |
from_flyte_idl()
def from_flyte_idl(
pb2,
):
Parameter | Type |
---|---|
pb2 | |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
):
Parameter | Type |
---|---|
ctx | flytekit.core.context_manager.FlyteContext |
kwargs | **kwargs |
local_execution_mode()
def local_execution_mode()
promote_from_model()
def promote_from_model(
id: id_models.Identifier,
model: _launch_plan_models.LaunchPlanSpec,
):
Parameter | Type |
---|---|
id | id_models.Identifier |
model | _launch_plan_models.LaunchPlanSpec |
serialize_to_string()
def serialize_to_string()
short_string()
def short_string()
to_flyte_idl()
def to_flyte_idl()
verbose_string()
def verbose_string()
Properties
Property | Type | Description |
---|---|---|
annotations | | |
auth_role | | |
default_inputs | | |
entity_metadata | | |
entity_type_text | | |
fixed_inputs | | |
flyte_workflow | | |
id | | |
interface | | |
is_empty | | |
is_scheduled | | |
labels | | |
max_parallelism | | |
name | | |
overwrite_cache | | |
python_interface | | |
raw_output_data_config | | |
resource_type | | |
security_context | | |
workflow_id | | |
flytekit.remote.backfill.ImperativeWorkflow
An imperative workflow is a programmatic analogue to the typical @workflow function-based workflow and is better suited to programmatic applications.
Assuming you have some tasks like so
.. literalinclude:: ../../../tests/flytekit/unit/core/test_imperative.py
   :start-after: # docs_tasks_start
   :end-before: # docs_tasks_end
   :language: python
   :dedent: 4
You could create a workflow imperatively like so
.. literalinclude:: ../../../tests/flytekit/unit/core/test_imperative.py
   :start-after: # docs_start
   :end-before: # docs_end
   :language: python
   :dedent: 4
This workflow would be identical on the back-end to
.. literalinclude:: ../../../tests/flytekit/unit/core/test_imperative.py
   :start-after: # docs_equivalent_start
   :end-before: # docs_equivalent_end
   :language: python
   :dedent: 4
Note that the only reason we need the NamedTuple is so we can name the output the same thing as in the imperative example. The imperative paradigm makes the naming of workflow outputs easier, but this isn’t a big deal in function-workflows because names tend to not be necessary.
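Since the included snippets may not render here, the following is a minimal sketch of the same pattern, assuming a simple task t1; all names are illustrative.

.. code-block:: python

    from flytekit import Workflow, task  # Workflow is the imperative workflow class

    @task
    def t1(a: str) -> str:
        return a + " world"

    wb = Workflow(name="my_imperative_wf")
    wb.add_workflow_input("in1", str)                         # declare a workflow input
    node = wb.add_entity(t1, a=wb.inputs["in1"])              # bind the task's input
    wb.add_workflow_output("from_n0t1", node.outputs["o0"])   # expose the task output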
def ImperativeWorkflow(
name: str,
failure_policy: Optional[WorkflowFailurePolicy],
interruptible: bool,
):
Parameter | Type |
---|---|
name | str |
failure_policy | Optional[WorkflowFailurePolicy] |
interruptible | bool |
Methods
Method | Description |
---|---|
add_entity() | Anytime you add an entity, all the inputs to the entity must be bound. |
add_launch_plan() | |
add_on_failure_handler() | Mimics the add_entity function, but is only used to add the failure node. |
add_subwf() | |
add_task() | |
add_workflow_input() | Adds an input to the workflow. |
add_workflow_output() | Add an output with the given name from the given node output. |
compile() | |
construct_node_metadata() | |
create_conditional() | |
execute() | Called by local_execute. |
local_execute() | |
local_execution_mode() | |
ready() | Returns whether the workflow is in a ready state. |
add_entity()
def add_entity(
entity: Union[PythonTask, _annotated_launch_plan.LaunchPlan, WorkflowBase],
kwargs,
):
Anytime you add an entity, all the inputs to the entity must be bound.
Parameter | Type |
---|---|
entity | Union[PythonTask, _annotated_launch_plan.LaunchPlan, WorkflowBase] |
kwargs | **kwargs |
add_launch_plan()
def add_launch_plan(
launch_plan: _annotated_launch_plan.LaunchPlan,
kwargs,
):
Parameter | Type |
---|---|
launch_plan | _annotated_launch_plan.LaunchPlan |
kwargs | **kwargs |
add_on_failure_handler()
def add_on_failure_handler(
entity,
):
This is a special function that mimics the add_entity function, but this is only used to add the failure node. Failure nodes are special because we don’t want them to be part of the main workflow.
Parameter | Type |
---|---|
entity | |
add_subwf()
def add_subwf(
sub_wf: WorkflowBase,
kwargs,
):
Parameter | Type |
---|---|
sub_wf | WorkflowBase |
kwargs | **kwargs |
add_task()
def add_task(
task: PythonTask,
kwargs,
):
Parameter | Type |
---|---|
task | PythonTask |
kwargs | **kwargs |
add_workflow_input()
def add_workflow_input(
input_name: str,
python_type: Type,
):
Adds an input to the workflow.
Parameter | Type |
---|---|
input_name | str |
python_type | Type |
add_workflow_output()
def add_workflow_output(
output_name: str,
p: Union[Promise, List[Promise], Dict[str, Promise]],
python_type: Optional[Type],
):
Add an output with the given name from the given node output.
Parameter | Type |
---|---|
output_name | str |
p | Union[Promise, List[Promise], Dict[str, Promise]] |
python_type | Optional[Type] |
compile()
def compile(
kwargs,
):
Parameter | Type |
---|---|
kwargs | **kwargs |
construct_node_metadata()
def construct_node_metadata()
create_conditional()
def create_conditional(
name: str,
):
Parameter | Type |
---|---|
name | str |
execute()
def execute(
kwargs,
):
Called by local_execute. This function is how local execution for imperative workflows runs. Because all inputs to an entity must already have been declared when it is added via add_entity, we can iterate through the nodes in order without running into dependency issues; that is, the user is forced to declare entities in a topological sort. To keep track of outputs, we create a map, initially filled in with only the workflow inputs (if any). As nodes run, their outputs are stored in this map. After all nodes have run, workflow-level outputs are filled in the same way as any other node's.
Parameter | Type |
---|---|
kwargs | **kwargs |
local_execute()
def local_execute(
ctx: FlyteContext,
kwargs,
):
Parameter | Type |
---|---|
ctx | FlyteContext |
kwargs | **kwargs |
local_execution_mode()
def local_execution_mode()
ready()
def ready()
This function returns whether the workflow is in a ready state, which means:
- Has at least one node
- All workflow inputs are bound
These conditions assume that all nodes and workflow i/o changes were done with the functions above, which do additional checking.
Properties
Property | Type | Description |
---|---|---|
compilation_state | | |
default_options | | |
docs | | |
failure_node | | |
inputs | | |
interface | | |
name | | |
nodes | | |
on_failure | | |
output_bindings | | |
python_interface | | |
short_name | | |
workflow_metadata | | |
workflow_metadata_defaults | | |
flytekit.remote.backfill.LaunchPlan
Launch Plans are one of the core constructs of Flyte. Please take a look at the discussion in the :std:ref:`core concepts <flyte:divedeep-launchplans>` if you are unfamiliar with them.
Every workflow is registered with a default launch plan, which is just a launch plan with none of the additional attributes set - no default values, fixed values, schedules, etc. Assuming you have the following workflow
.. code-block:: python

    @workflow
    def wf(a: int, c: str) -> str:
        ...
Create the default launch plan with
.. code-block:: python

    LaunchPlan.get_or_create(workflow=my_wf)
If you specify additional parameters, you’ll also have to give the launch plan a unique name. Default and fixed inputs can be expressed as Python native values like so:
.. literalinclude:: ../../../tests/flytekit/unit/core/test_launch_plan.py
   :start-after: # fixed_and_default_start
   :end-before: # fixed_and_default_end
   :language: python
   :dedent: 4
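If the included snippet is unavailable, here is a minimal equivalent sketch; the workflow wf and input names are assumed.

.. code-block:: python

    from flytekit import LaunchPlan

    # Named launch plan with one default (overridable) and one fixed input.
    lp = LaunchPlan.get_or_create(
        workflow=wf,                   # assumed @workflow function from above
        name="wf_with_defaults",
        default_inputs={"a": 3},       # may be overridden at execution time
        fixed_inputs={"c": "hello"},   # cannot be overridden
    )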
Additionally, a launch plan can be configured to run on a schedule and emit notifications.
Please see the relevant Schedule and Notification objects as well.
To configure the remaining parameters, you’ll need to import the relevant model objects as well.
.. literalinclude:: ../../../tests/flytekit/unit/core/test_launch_plan.py
   :start-after: # schedule_start
   :end-before: # schedule_end
   :language: python
   :dedent: 4
.. code-block:: python

    from flytekit.models.common import Annotations, AuthRole, Labels, RawOutputDataConfig
Then use as follows
.. literalinclude:: ../../../tests/flytekit/unit/core/test_launch_plan.py
   :start-after: # auth_role_start
   :end-before: # auth_role_end
   :language: python
   :dedent: 4
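As a hedged sketch of the scheduling case (CronSchedule is flytekit's cron schedule object; the workflow and names here are placeholders):

.. code-block:: python

    from flytekit import CronSchedule, LaunchPlan

    scheduled_lp = LaunchPlan.get_or_create(
        workflow=wf,                                    # assumed @workflow function
        name="wf_every_5_min",
        schedule=CronSchedule(schedule="*/5 * * * *"),  # standard 5-field cron expression
    )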
def LaunchPlan(
name: str,
workflow: _annotated_workflow.WorkflowBase,
parameters: _interface_models.ParameterMap,
fixed_inputs: _literal_models.LiteralMap,
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
):
Parameter | Type |
---|---|
name | str |
workflow | _annotated_workflow.WorkflowBase |
parameters | _interface_models.ParameterMap |
fixed_inputs | _literal_models.LiteralMap |
schedule | Optional[_schedule_model.Schedule] |
notifications | Optional[List[_common_models.Notification]] |
labels | Optional[_common_models.Labels] |
annotations | Optional[_common_models.Annotations] |
raw_output_data_config | Optional[_common_models.RawOutputDataConfig] |
max_parallelism | Optional[int] |
security_context | Optional[security.SecurityContext] |
trigger | Optional[LaunchPlanTriggerBase] |
overwrite_cache | Optional[bool] |
auto_activate | bool |
Methods
Method | Description |
---|---|
clone_with() | |
construct_node_metadata() | |
create() | |
get_default_launch_plan() | Users should probably call the get_or_create function defined below instead. |
get_or_create() | This function offers a friendlier interface for creating launch plans. |
clone_with()
def clone_with(
name: str,
parameters: Optional[_interface_models.ParameterMap],
fixed_inputs: Optional[_literal_models.LiteralMap],
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
):
Parameter | Type |
---|---|
name | str |
parameters | Optional[_interface_models.ParameterMap] |
fixed_inputs | Optional[_literal_models.LiteralMap] |
schedule | Optional[_schedule_model.Schedule] |
notifications | Optional[List[_common_models.Notification]] |
labels | Optional[_common_models.Labels] |
annotations | Optional[_common_models.Annotations] |
raw_output_data_config | Optional[_common_models.RawOutputDataConfig] |
max_parallelism | Optional[int] |
security_context | Optional[security.SecurityContext] |
trigger | Optional[LaunchPlanTriggerBase] |
overwrite_cache | Optional[bool] |
auto_activate | bool |
construct_node_metadata()
def construct_node_metadata()
create()
def create(
name: str,
workflow: _annotated_workflow.WorkflowBase,
default_inputs: Optional[Dict[str, Any]],
fixed_inputs: Optional[Dict[str, Any]],
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
auth_role: Optional[_common_models.AuthRole],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
):
Parameter | Type |
---|---|
name | str |
workflow | _annotated_workflow.WorkflowBase |
default_inputs | Optional[Dict[str, Any]] |
fixed_inputs | Optional[Dict[str, Any]] |
schedule | Optional[_schedule_model.Schedule] |
notifications | Optional[List[_common_models.Notification]] |
labels | Optional[_common_models.Labels] |
annotations | Optional[_common_models.Annotations] |
raw_output_data_config | Optional[_common_models.RawOutputDataConfig] |
max_parallelism | Optional[int] |
security_context | Optional[security.SecurityContext] |
auth_role | Optional[_common_models.AuthRole] |
trigger | Optional[LaunchPlanTriggerBase] |
overwrite_cache | Optional[bool] |
auto_activate | bool |
get_default_launch_plan()
def get_default_launch_plan(
ctx: FlyteContext,
workflow: _annotated_workflow.WorkflowBase,
):
Users should probably call the get_or_create function defined below instead. A default launch plan is the one that will just pick up whatever default values are defined in the workflow function signature (if any) and use the default auth information supplied during serialization, with no notifications or schedules.
Parameter | Type |
---|---|
ctx | FlyteContext |
workflow | _annotated_workflow.WorkflowBase |
get_or_create()
def get_or_create(
workflow: _annotated_workflow.WorkflowBase,
name: Optional[str],
default_inputs: Optional[Dict[str, Any]],
fixed_inputs: Optional[Dict[str, Any]],
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
auth_role: Optional[_common_models.AuthRole],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
):
This function offers a friendlier interface for creating launch plans. If the name for the launch plan is not supplied, this assumes you are looking for the default launch plan for the workflow. If it is specified, it will be used. If creating the default launch plan, none of the other arguments may be specified.
The resulting launch plan is also cached and, if called again with the same name, the cached version is returned.
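A short sketch of that caching behavior, assuming a registered @workflow function wf:

.. code-block:: python

    default_lp = LaunchPlan.get_or_create(workflow=wf)  # default launch plan
    same_lp = LaunchPlan.get_or_create(workflow=wf)
    assert default_lp is same_lp  # the cached instance should be returned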
Parameter | Type |
---|---|
workflow | _annotated_workflow.WorkflowBase |
name | Optional[str] |
default_inputs | Optional[Dict[str, Any]] |
fixed_inputs | Optional[Dict[str, Any]] |
schedule | Optional[_schedule_model.Schedule] |
notifications | Optional[List[_common_models.Notification]] |
labels | Optional[_common_models.Labels] |
annotations | Optional[_common_models.Annotations] |
raw_output_data_config | Optional[_common_models.RawOutputDataConfig] |
max_parallelism | Optional[int] |
security_context | Optional[security.SecurityContext] |
auth_role | Optional[_common_models.AuthRole] |
trigger | Optional[LaunchPlanTriggerBase] |
overwrite_cache | Optional[bool] |
auto_activate | bool |
Properties
Property | Type | Description |
---|---|---|
annotations | | |
fixed_inputs | | |
interface | | |
labels | | |
max_parallelism | | |
name | | |
notifications | | |
overwrite_cache | | |
parameters | | |
python_interface | | |
raw_output_data_config | | |
saved_inputs | | |
schedule | | |
security_context | | |
should_auto_activate | | |
trigger | | |
workflow | | |
flytekit.remote.backfill.WorkflowBase
def WorkflowBase(
name: str,
workflow_metadata: WorkflowMetadata,
workflow_metadata_defaults: WorkflowMetadataDefaults,
python_interface: Interface,
on_failure: Optional[Union[WorkflowBase, Task]],
docs: Optional[Documentation],
default_options: Optional[Options],
kwargs,
):
Parameter | Type |
---|---|
name | str |
workflow_metadata | WorkflowMetadata |
workflow_metadata_defaults | WorkflowMetadataDefaults |
python_interface | Interface |
on_failure | Optional[Union[WorkflowBase, Task]] |
docs | Optional[Documentation] |
default_options | Optional[Options] |
kwargs | **kwargs |
Methods
Method | Description |
---|---|
compile() | |
construct_node_metadata() | |
execute() | |
local_execute() | |
local_execution_mode() | |
compile()
def compile(
kwargs,
):
Parameter | Type |
---|---|
kwargs | **kwargs |
construct_node_metadata()
def construct_node_metadata()
execute()
def execute(
kwargs,
):
Parameter | Type |
---|---|
kwargs | **kwargs |
local_execute()
def local_execute(
ctx: FlyteContext,
kwargs,
):
Parameter | Type |
---|---|
ctx | FlyteContext |
kwargs | **kwargs |
local_execution_mode()
def local_execution_mode()
Properties
Property | Type | Description |
---|---|---|
default_options | | |
docs | | |
failure_node | | |
interface | | |
name | | |
nodes | | |
on_failure | | |
output_bindings | | |
python_interface | | |
short_name | | |
workflow_metadata | | |
workflow_metadata_defaults | | |
flytekit.remote.backfill.WorkflowFailurePolicy
Defines the behavior for a workflow execution in the case of an observed node execution failure. By default, a workflow execution will immediately enter a failed state if a component node fails.
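As an illustrative sketch, the policy is typically passed to the @workflow decorator; the workflow name and body here are placeholders.

.. code-block:: python

    from flytekit import workflow, WorkflowFailurePolicy

    # Keep executing independent nodes even after one node fails.
    @workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
    def wf(a: int) -> int:
        ...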
flytekit.remote.backfill.croniter
def croniter(
expr_format,
start_time,
ret_type,
day_or,
max_years_between_matches,
is_prev,
hash_id,
implement_cron_bug,
second_at_beginning,
expand_from_start_time,
):
Parameter | Type |
---|---|
expr_format | |
start_time | |
ret_type | |
day_or | |
max_years_between_matches | |
is_prev | |
hash_id | |
implement_cron_bug | |
second_at_beginning | |
expand_from_start_time | |
Methods
Method | Description |
---|---|
all_next() | Returns a generator yielding consecutive dates. |
all_prev() | Returns a generator yielding previous dates. |
datetime_to_timestamp() | Converts a datetime object d into a UNIX timestamp. |
expand() | Expand a cron expression into a normalized format. |
get_current() | |
get_next() | |
get_prev() | |
is_leap() | |
is_valid() | |
iter() | |
match() | |
match_range() | |
next() | |
set_current() | |
timedelta_to_seconds() | Converts a datetime.timedelta object into seconds. |
timestamp_to_datetime() | Converts a UNIX timestamp into a datetime object. |
value_alias() | |
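For context, a minimal usage sketch of the iteration methods listed above:

.. code-block:: python

    from datetime import datetime
    from croniter import croniter

    base = datetime(2024, 1, 1, 0, 0)
    it = croniter("*/15 * * * *", base)  # every 15 minutes, starting from base

    print(it.get_next(datetime))  # 2024-01-01 00:15:00
    print(it.get_next(datetime))  # 2024-01-01 00:30:00
    print(it.get_prev(datetime))  # steps back to 2024-01-01 00:15:00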
all_next()
def all_next(
ret_type,
start_time,
update_current,
):
Returns a generator yielding consecutive dates. May be used instead of an implicit call to iter whenever a non-default ret_type needs to be specified.
Parameter | Type |
---|---|
ret_type | |
start_time | |
update_current | |
all_prev()
def all_prev(
ret_type,
start_time,
update_current,
):
Returns a generator yielding previous dates.
Parameter | Type |
---|---|
ret_type | |
start_time | |
update_current | |
datetime_to_timestamp()
def datetime_to_timestamp(
d,
):
Converts a datetime object d into a UNIX timestamp.
Parameter | Type |
---|---|
d | |
expand()
def expand(
expr_format,
hash_id,
second_at_beginning,
from_timestamp,
):
Expand a cron expression format into a normalized format of list[list[int | 'l' | '*']]. The first list represents each element of the expression, and each sub-list represents the allowed values for that expression component. A tuple is returned, the first value being the expanded expression list, and the second being a nth_weekday_of_month mapping.
Examples:

Every minute:

    croniter.expand('* * * * *')
    ([['*'], ['*'], ['*'], ['*'], ['*']], {})

On the hour:

    croniter.expand('0 0 * * *')
    ([[0], [0], ['*'], ['*'], ['*']], {})

Hours 0-5 and 10 monday through friday:

    croniter.expand('0-5,10 * * * mon-fri')
    ([[0, 1, 2, 3, 4, 5, 10], ['*'], ['*'], ['*'], [1, 2, 3, 4, 5]], {})

Note that some special values such as nth day of week are expanded to a special mapping format for later processing:

Every minute on the 3rd tuesday of the month:

    croniter.expand('* * * * 2#3')
    ([['*'], ['*'], ['*'], ['*'], [2]], {2: {3}})

Every hour on the last day of the month:

    croniter.expand('0 * l * *')
    ([[0], ['*'], ['l'], ['*'], ['*']], {})

On the hour every 15 seconds:

    croniter.expand('0 0 * * * */15')
    ([[0], [0], ['*'], ['*'], ['*'], [0, 15, 30, 45]], {})
Parameter | Type |
---|---|
expr_format | |
hash_id | |
second_at_beginning | |
from_timestamp | |
get_current()
def get_current(
ret_type,
):
Parameter | Type |
---|---|
ret_type | |
get_next()
def get_next(
ret_type,
start_time,
update_current,
):
Parameter | Type |
---|---|
ret_type | |
start_time | |
update_current | |
get_prev()
def get_prev(
ret_type,
start_time,
update_current,
):
Parameter | Type |
---|---|
ret_type | |
start_time | |
update_current | |
is_leap()
def is_leap(
year,
):
Parameter | Type |
---|---|
year | |
is_valid()
def is_valid(
expression,
hash_id,
encoding,
second_at_beginning,
):
Parameter | Type |
---|---|
expression | |
hash_id | |
encoding | |
second_at_beginning | |
iter()
def iter(
args,
kwargs,
):
Parameter | Type |
---|---|
args | *args |
kwargs | **kwargs |
match()
def match(
cron_expression,
testdate,
day_or,
second_at_beginning,
):
Parameter | Type |
---|---|
cron_expression | |
testdate | |
day_or | |
second_at_beginning | |
match_range()
def match_range(
cron_expression,
from_datetime,
to_datetime,
day_or,
second_at_beginning,
):
Parameter | Type |
---|---|
cron_expression | |
from_datetime | |
to_datetime | |
day_or | |
second_at_beginning | |
next()
def next(
ret_type,
start_time,
is_prev,
update_current,
):
Parameter | Type |
---|---|
ret_type | |
start_time | |
is_prev | |
update_current | |
set_current()
def set_current(
start_time,
force,
):
Parameter | Type |
---|---|
start_time | |
force | |
timedelta_to_seconds()
def timedelta_to_seconds(
td,
):
Converts a datetime.timedelta object td into seconds contained in the duration.
Note: We cannot use timedelta.total_seconds() because this is not supported by Python 2.6.
Parameter | Type |
---|---|
td | |
timestamp_to_datetime()
def timestamp_to_datetime(
timestamp,
tzinfo,
):
Converts a UNIX timestamp into a datetime object.
Parameter | Type |
---|---|
timestamp | |
tzinfo | |
value_alias()
def value_alias(
val,
field_index,
len_expressions,
):
Parameter | Type |
---|---|
val | |
field_index | |
len_expressions | |
flytekit.remote.backfill.datetime
datetime(year, month, day[, hour[, minute[, second[, microsecond[, tzinfo]]]]])
The year, month and day arguments are required. tzinfo may be None, or an instance of a tzinfo subclass. The remaining arguments may be ints.
flytekit.remote.backfill.timedelta
Difference between two datetime values.
timedelta(days=0, seconds=0, microseconds=0, milliseconds=0, minutes=0, hours=0, weeks=0)
All arguments are optional and default to 0. Arguments may be integers or floats, and may be positive or negative.
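In the backfill context, these two types are typically combined to step through a date range; a small self-contained sketch:

.. code-block:: python

    from datetime import datetime, timedelta

    start = datetime(2024, 1, 1)
    end = datetime(2024, 1, 4)
    step = timedelta(days=1)

    t = start
    while t < end:
        print(t)  # one backfill window per day: Jan 1, Jan 2, Jan 3
        t += step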