
flytekit.remote.backfill

Directory

Classes

Class Description
FlyteLaunchPlan A class encapsulating a remote Flyte launch plan.
ImperativeWorkflow An imperative workflow is a programmatic analogue to the typical @workflow function-based workflow and is better suited to programmatic applications.
LaunchPlan Launch Plans are one of the core constructs of Flyte.
WorkflowBase Base class shared by flytekit workflow definitions.
WorkflowFailurePolicy Defines the behavior for a workflow execution in the case of an observed node execution failure.
croniter Iterator over the datetimes matching a cron schedule expression (from the croniter library).
datetime datetime(year, month, day[, hour[, minute[, second[, microsecond[, tzinfo]]]]]).
timedelta Difference between two datetime values.

flytekit.remote.backfill.FlyteLaunchPlan

A class encapsulating a remote Flyte launch plan.
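
A FlyteLaunchPlan is usually obtained from a FlyteRemote client rather than constructed directly. A minimal sketch (the project, domain, launch plan name, and inputs below are placeholders):

.. code-block:: python

    from flytekit.configuration import Config
    from flytekit.remote import FlyteRemote

    # Connect to a Flyte backend using whatever configuration is discoverable.
    remote = FlyteRemote(config=Config.auto())

    # Fetch an existing launch plan; omitting version resolves to the latest.
    flyte_lp = remote.fetch_launch_plan(
        project="flytesnacks", domain="development", name="my_lp"
    )

    # The fetched launch plan can then be executed on the backend.
    execution = remote.execute(flyte_lp, inputs={"a": 5})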

def FlyteLaunchPlan(
    id,
    args,
    kwargs,
):
Parameter Type
id
args *args
kwargs **kwargs

Methods

Method Description
compile() None
construct_node_metadata() Used when constructing the node that encapsulates this task as part of a broader workflow definition
execute() None
from_flyte_idl()
local_execute() None
local_execution_mode() None
promote_from_model() None
serialize_to_string() None
short_string()
to_flyte_idl()
verbose_string()

compile()

def compile(
    ctx: FlyteContext,
    args,
    kwargs,
):
Parameter Type
ctx FlyteContext
args *args
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

Used when constructing the node that encapsulates this task as part of a broader workflow definition.

execute()

def execute(
    kwargs,
):
Parameter Type
kwargs **kwargs

from_flyte_idl()

def from_flyte_idl(
    pb2,
):
Parameter Type
pb2

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
):
Parameter Type
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

promote_from_model()

def promote_from_model(
    id: id_models.Identifier,
    model: _launch_plan_models.LaunchPlanSpec,
):
Parameter Type
id id_models.Identifier
model _launch_plan_models.LaunchPlanSpec

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

to_flyte_idl()

def to_flyte_idl()

verbose_string()

def verbose_string()

Properties

Property Type Description
annotations
auth_role
default_inputs
entity_metadata
entity_type_text
fixed_inputs
flyte_workflow
id
interface
is_empty
is_scheduled
labels
max_parallelism
name
overwrite_cache
python_interface
raw_output_data_config
resource_type
security_context
workflow_id

flytekit.remote.backfill.ImperativeWorkflow

An imperative workflow is a programmatic analogue to the typical @workflow function-based workflow and is better suited to programmatic applications.

Assuming you have some tasks like so

.. literalinclude:: ../../../tests/flytekit/unit/core/test_imperative.py
   :start-after: # docs_tasks_start
   :end-before: # docs_tasks_end
   :language: python
   :dedent: 4

You could create a workflow imperatively like so

.. literalinclude:: ../../../tests/flytekit/unit/core/test_imperative.py
   :start-after: # docs_start
   :end-before: # docs_end
   :language: python
   :dedent: 4

This workflow would be identical on the back-end to

.. literalinclude:: ../../../tests/flytekit/unit/core/test_imperative.py
   :start-after: # docs_equivalent_start
   :end-before: # docs_equivalent_end
   :language: python
   :dedent: 4

Note that the only reason we need the NamedTuple is so we can name the output the same thing as in the imperative example. The imperative paradigm makes naming workflow outputs easier, but this isn't a big deal in function-based workflows because names tend not to be necessary.
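
Since the included snippets are not rendered here, the following is a minimal sketch of the same pattern; the task t1, the workflow name, and the input/output names are illustrative only:

.. code-block:: python

    from flytekit import Workflow, task

    @task
    def t1(a: str) -> str:
        return a + " world"

    # Build the workflow imperatively: declare inputs, add entities, bind outputs.
    wb = Workflow(name="my_workflow")
    in1 = wb.add_workflow_input("in1", str)
    node = wb.add_entity(t1, a=in1)
    wb.add_workflow_output("from_n0t1", node.outputs["o0"])

    # Local execution works like any other workflow.
    assert wb(in1="hello") == "hello world"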

def ImperativeWorkflow(
    name: str,
    failure_policy: Optional[WorkflowFailurePolicy],
    interruptible: bool,
):
Parameter Type
name str
failure_policy Optional[WorkflowFailurePolicy]
interruptible bool

Methods

Method Description
add_entity() Anytime you add an entity, all the inputs to the entity must be bound
add_launch_plan() None
add_on_failure_handler() Mimics the add_entity function, but is only used to add the failure node
add_subwf() None
add_task() None
add_workflow_input() Adds an input to the workflow
add_workflow_output() Add an output with the given name from the given node output
compile() None
construct_node_metadata() None
create_conditional() None
execute() Called by local_execute
local_execute() None
local_execution_mode() None
ready() Returns whether the workflow is in a ready state: it has at least one node and all workflow inputs are bound

add_entity()

def add_entity(
    entity: Union[PythonTask, _annotated_launch_plan.LaunchPlan, WorkflowBase],
    kwargs,
):

Anytime you add an entity, all the inputs to the entity must be bound.

Parameter Type
entity Union[PythonTask, _annotated_launch_plan.LaunchPlan, WorkflowBase]
kwargs **kwargs

add_launch_plan()

def add_launch_plan(
    launch_plan: _annotated_launch_plan.LaunchPlan,
    kwargs,
):
Parameter Type
launch_plan _annotated_launch_plan.LaunchPlan
kwargs **kwargs

add_on_failure_handler()

def add_on_failure_handler(
    entity,
):

This is a special function that mimics the add_entity function, but this is only used to add the failure node. Failure nodes are special because we don’t want them to be part of the main workflow.

Parameter Type
entity

add_subwf()

def add_subwf(
    sub_wf: WorkflowBase,
    kwargs,
):
Parameter Type
sub_wf WorkflowBase
kwargs **kwargs

add_task()

def add_task(
    task: PythonTask,
    kwargs,
):
Parameter Type
task PythonTask
kwargs **kwargs

add_workflow_input()

def add_workflow_input(
    input_name: str,
    python_type: Type,
):

Adds an input to the workflow.

Parameter Type
input_name str
python_type Type

add_workflow_output()

def add_workflow_output(
    output_name: str,
    p: Union[Promise, List[Promise], Dict[str, Promise]],
    python_type: Optional[Type],
):

Add an output with the given name from the given node output.

Parameter Type
output_name str
p Union[Promise, List[Promise], Dict[str, Promise]]
python_type Optional[Type]

compile()

def compile(
    kwargs,
):
Parameter Type
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

create_conditional()

def create_conditional(
    name: str,
):
Parameter Type
name str

execute()

def execute(
    kwargs,
):

Called by local_execute. This function is how local execution for imperative workflows runs. Because all inputs to an entity must already have been declared when the entity was added via the add_entity function, we can simply iterate through the nodes in order without running into any dependency issues; that is, the user is forced to declare entities in what amounts to a topological sort. To keep track of outputs, we start with a map filled in only with the workflow inputs (if any). As nodes run, their outputs are stored in this map. After all nodes have run, workflow-level outputs are filled in from the same map, just like any other node input. A sketch of this bookkeeping appears after the parameter table below.

Parameter Type
kwargs **kwargs
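
As an illustration of that bookkeeping (a simplified sketch with hypothetical node and binding structures, not flytekit's actual implementation):

.. code-block:: python

    from typing import Any, Callable, Dict, List, Tuple

    # (output name, callable, mapping of parameter name -> source key in `values`)
    Node = Tuple[str, Callable[..., Any], Dict[str, str]]

    def run_in_order(nodes: List[Node], workflow_inputs: Dict[str, Any]) -> Dict[str, Any]:
        values: Dict[str, Any] = dict(workflow_inputs)  # seed the map with workflow inputs
        for out_name, fn, bindings in nodes:
            # Every source is already in `values`, because entities were added
            # in what amounts to a topological order.
            kwargs = {param: values[src] for param, src in bindings.items()}
            values[out_name] = fn(**kwargs)
        return values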

local_execute()

def local_execute(
    ctx: FlyteContext,
    kwargs,
):
Parameter Type
ctx FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

ready()

def ready()

This function returns whether the workflow is in a ready state, which means:

  • Has at least one node
  • All workflow inputs are bound

These conditions assume that all nodes and workflow i/o changes were done with the functions above, which do additional checking.

Properties

Property Type Description
compilation_state
default_options
docs
failure_node
inputs
interface
name
nodes
on_failure
output_bindings
python_interface
short_name
workflow_metadata
workflow_metadata_defaults

flytekit.remote.backfill.LaunchPlan

Launch Plans are one of the core constructs of Flyte. Please take a look at the discussion in the :std:ref:`core concepts <flyte:divedeep-launchplans>` if you are unfamiliar with them.

Every workflow is registered with a default launch plan, which is just a launch plan with none of the additional attributes set - no default values, fixed values, schedules, etc. Assuming you have the following workflow

.. code-block:: python

    @workflow
    def wf(a: int, c: str) -> str:
        ...

Create the default launch plan with

.. code-block:: python

    LaunchPlan.get_or_create(workflow=wf)

If you specify additional parameters, you’ll also have to give the launch plan a unique name. Default and fixed inputs can be expressed as Python native values like so:

.. literalinclude:: ../../../tests/flytekit/unit/core/test_launch_plan.py
   :start-after: # fixed_and_default_start
   :end-before: # fixed_and_default_end
   :language: python
   :dedent: 4
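
Since the included snippet is not rendered here, a minimal sketch of the same idea, assuming the wf workflow above (the launch plan name and values are placeholders):

.. code-block:: python

    launch_plan = LaunchPlan.get_or_create(
        workflow=wf,
        name="fixed_and_default_lp",
        default_inputs={"a": 3},  # may still be overridden at execution time
        fixed_inputs={"c": "4"},  # may not be overridden
    )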

Additionally, a launch plan can be configured to run on a schedule and emit notifications.

Please see the relevant Schedule and Notification objects as well.

To configure the remaining parameters, you’ll need to import the relevant model objects as well.

.. literalinclude:: ../../../tests/flytekit/unit/core/test_launch_plan.py
   :start-after: # schedule_start
   :end-before: # schedule_end
   :language: python
   :dedent: 4
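
A minimal sketch of attaching a cron schedule (the names are placeholders):

.. code-block:: python

    from flytekit import CronSchedule, LaunchPlan

    sched = CronSchedule(schedule="*/1 * * * *")  # run every minute
    scheduled_lp = LaunchPlan.get_or_create(
        workflow=wf, name="scheduled_lp", schedule=sched
    )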

.. code-block:: python

    from flytekit.models.common import Annotations, AuthRole, Labels, RawOutputDataConfig

Then use them as follows:

.. literalinclude:: ../../../tests/flytekit/unit/core/test_launch_plan.py
   :start-after: # auth_role_start
   :end-before: # auth_role_end
   :language: python
   :dedent: 4
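
A minimal sketch of passing these model objects (the names, values, and bucket are placeholders):

.. code-block:: python

    configured_lp = LaunchPlan.get_or_create(
        workflow=wf,
        name="configured_lp",
        labels=Labels({"team": "ml"}),
        annotations=Annotations({"owner": "alice"}),
        raw_output_data_config=RawOutputDataConfig("s3://my-bucket/raw"),
    )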

def LaunchPlan(
    name: str,
    workflow: _annotated_workflow.WorkflowBase,
    parameters: _interface_models.ParameterMap,
    fixed_inputs: _literal_models.LiteralMap,
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
):
Parameter Type
name str
workflow _annotated_workflow.WorkflowBase
parameters _interface_models.ParameterMap
fixed_inputs _literal_models.LiteralMap
schedule Optional[_schedule_model.Schedule]
notifications Optional[List[_common_models.Notification]]
labels Optional[_common_models.Labels]
annotations Optional[_common_models.Annotations]
raw_output_data_config Optional[_common_models.RawOutputDataConfig]
max_parallelism Optional[int]
security_context Optional[security.SecurityContext]
trigger Optional[LaunchPlanTriggerBase]
overwrite_cache Optional[bool]
auto_activate bool

Methods

Method Description
clone_with() None
construct_node_metadata() None
create() None
get_default_launch_plan() Users should probably call the get_or_create function defined below instead
get_or_create() This function offers a friendlier interface for creating launch plans

clone_with()

def clone_with(
    name: str,
    parameters: Optional[_interface_models.ParameterMap],
    fixed_inputs: Optional[_literal_models.LiteralMap],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
):
Parameter Type
name str
parameters Optional[_interface_models.ParameterMap]
fixed_inputs Optional[_literal_models.LiteralMap]
schedule Optional[_schedule_model.Schedule]
notifications Optional[List[_common_models.Notification]]
labels Optional[_common_models.Labels]
annotations Optional[_common_models.Annotations]
raw_output_data_config Optional[_common_models.RawOutputDataConfig]
max_parallelism Optional[int]
security_context Optional[security.SecurityContext]
trigger Optional[LaunchPlanTriggerBase]
overwrite_cache Optional[bool]
auto_activate bool

construct_node_metadata()

def construct_node_metadata()

create()

def create(
    name: str,
    workflow: _annotated_workflow.WorkflowBase,
    default_inputs: Optional[Dict[str, Any]],
    fixed_inputs: Optional[Dict[str, Any]],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    auth_role: Optional[_common_models.AuthRole],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
):
Parameter Type
name str
workflow _annotated_workflow.WorkflowBase
default_inputs Optional[Dict[str, Any]]
fixed_inputs Optional[Dict[str, Any]]
schedule Optional[_schedule_model.Schedule]
notifications Optional[List[_common_models.Notification]]
labels Optional[_common_models.Labels]
annotations Optional[_common_models.Annotations]
raw_output_data_config Optional[_common_models.RawOutputDataConfig]
max_parallelism Optional[int]
security_context Optional[security.SecurityContext]
auth_role Optional[_common_models.AuthRole]
trigger Optional[LaunchPlanTriggerBase]
overwrite_cache Optional[bool]
auto_activate bool

get_default_launch_plan()

def get_default_launch_plan(
    ctx: FlyteContext,
    workflow: _annotated_workflow.WorkflowBase,
):

Users should probably call the get_or_create function defined below instead. A default launch plan is the one that will just pick up whatever default values are defined in the workflow function signature (if any) and use the default auth information supplied during serialization, with no notifications or schedules.

Parameter Type
ctx FlyteContext
workflow _annotated_workflow.WorkflowBase

get_or_create()

def get_or_create(
    workflow: _annotated_workflow.WorkflowBase,
    name: Optional[str],
    default_inputs: Optional[Dict[str, Any]],
    fixed_inputs: Optional[Dict[str, Any]],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    auth_role: Optional[_common_models.AuthRole],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
):

This function offers a friendlier interface for creating launch plans. If the name for the launch plan is not supplied, this assumes you are looking for the default launch plan for the workflow. If it is specified, it will be used. If creating the default launch plan, none of the other arguments may be specified.

The resulting launch plan is also cached; if called again with the same name, the cached version is returned.

Parameter Type
workflow _annotated_workflow.WorkflowBase
name Optional[str]
default_inputs Optional[Dict[str, Any]]
fixed_inputs Optional[Dict[str, Any]]
schedule Optional[_schedule_model.Schedule]
notifications Optional[List[_common_models.Notification]]
labels Optional[_common_models.Labels]
annotations Optional[_common_models.Annotations]
raw_output_data_config Optional[_common_models.RawOutputDataConfig]
max_parallelism Optional[int]
security_context Optional[security.SecurityContext]
auth_role Optional[_common_models.AuthRole]
trigger Optional[LaunchPlanTriggerBase]
overwrite_cache Optional[bool]
auto_activate bool
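
For example (with a hypothetical workflow wf), both results are cached after the first call:

.. code-block:: python

    # Default launch plan: no name and no other arguments may be supplied.
    default_lp = LaunchPlan.get_or_create(workflow=wf)

    # A launch plan with overrides must be given a unique name.
    named_lp = LaunchPlan.get_or_create(
        workflow=wf, name="wf_parallel", max_parallelism=10
    )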

Properties

Property Type Description
annotations
fixed_inputs
interface
labels
max_parallelism
name
notifications
overwrite_cache
parameters
python_interface
raw_output_data_config
saved_inputs
schedule
security_context
should_auto_activate
trigger
workflow

flytekit.remote.backfill.WorkflowBase

def WorkflowBase(
    name: str,
    workflow_metadata: WorkflowMetadata,
    workflow_metadata_defaults: WorkflowMetadataDefaults,
    python_interface: Interface,
    on_failure: Optional[Union[WorkflowBase, Task]],
    docs: Optional[Documentation],
    default_options: Optional[Options],
    kwargs,
):
Parameter Type
name str
workflow_metadata WorkflowMetadata
workflow_metadata_defaults WorkflowMetadataDefaults
python_interface Interface
on_failure Optional[Union[WorkflowBase, Task]]
docs Optional[Documentation]
default_options Optional[Options]
kwargs **kwargs

Methods

Method Description
compile() None
construct_node_metadata() None
execute() None
local_execute() None
local_execution_mode() None

compile()

def compile(
    kwargs,
):
Parameter Type
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

execute()

def execute(
    kwargs,
):
Parameter Type
kwargs **kwargs

local_execute()

def local_execute(
    ctx: FlyteContext,
    kwargs,
):
Parameter Type
ctx FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

Properties

Property Type Description
default_options
docs
failure_node
interface
name
nodes
on_failure
output_bindings
python_interface
short_name
workflow_metadata
workflow_metadata_defaults

flytekit.remote.backfill.WorkflowFailurePolicy

Defines the behavior for a workflow execution in the case of an observed node execution failure. By default, a workflow execution will immediately enter a failed state if a component node fails.
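
A minimal sketch of selecting the non-default policy at workflow definition time (the task and workflow are illustrative):

.. code-block:: python

    from flytekit import WorkflowFailurePolicy, task, workflow

    @task
    def step() -> None: ...

    # Let already-running nodes finish before marking the workflow failed.
    @workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
    def wf() -> None:
        step()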

flytekit.remote.backfill.croniter

def croniter(
    expr_format,
    start_time,
    ret_type,
    day_or,
    max_years_between_matches,
    is_prev,
    hash_id,
    implement_cron_bug,
    second_at_beginning,
    expand_from_start_time,
):
Parameter Type
expr_format
start_time
ret_type
day_or
max_years_between_matches
is_prev
hash_id
implement_cron_bug
second_at_beginning
expand_from_start_time
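
Typical usage, as a brief sketch (only the first two parameters are commonly supplied):

.. code-block:: python

    from datetime import datetime
    from croniter import croniter

    base = datetime(2024, 1, 1, 0, 0)
    it = croniter("0 */6 * * *", base)  # every six hours, starting from base

    it.get_next(datetime)  # datetime(2024, 1, 1, 6, 0)
    it.get_next(datetime)  # datetime(2024, 1, 1, 12, 0)
    it.get_prev(datetime)  # steps back to the previous match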

Methods

Method Description
all_next() Returns a generator yielding consecutive dates
all_prev() Returns a generator yielding previous dates
datetime_to_timestamp() Converts a datetime object d into a UNIX timestamp
expand() Expands a cron expression into a normalized list-of-lists format
get_current() None
get_next() None
get_prev() None
is_leap() None
is_valid() None
iter() None
match() None
match_range() None
next() None
set_current() None
timedelta_to_seconds() Converts a datetime.timedelta object into seconds
timestamp_to_datetime() Converts a UNIX timestamp into a datetime object
value_alias() None

all_next()

def all_next(
    ret_type,
    start_time,
    update_current,
):

Returns a generator yielding consecutive dates.

May be used instead of an implicit call to iter whenever a non-default ret_type needs to be specified.

Parameter Type
ret_type
start_time
update_current
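
For instance (a brief sketch):

.. code-block:: python

    from datetime import datetime
    from itertools import islice

    from croniter import croniter

    it = croniter("0 0 * * *", datetime(2024, 1, 1))  # daily at midnight
    # Take the next three matches as datetime objects.
    upcoming = list(islice(it.all_next(datetime), 3))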

all_prev()

def all_prev(
    ret_type,
    start_time,
    update_current,
):

Returns a generator yielding previous dates.

Parameter Type
ret_type
start_time
update_current

datetime_to_timestamp()

def datetime_to_timestamp(
    d,
):

Converts a datetime object d into a UNIX timestamp.

Parameter Type
d

expand()

def expand(
    expr_format,
    hash_id,
    second_at_beginning,
    from_timestamp,
):

Expand a cron expression format into a normalized format of list[list[int | 'l' | '*']]. The first list represents each element of the expression, and each sub-list represents the allowed values for that expression component.

A tuple is returned, the first value being the expanded expression list, and the second being a nth_weekday_of_month mapping.

Examples:

# Every minute
>>> croniter.expand('* * * * *')
([['*'], ['*'], ['*'], ['*'], ['*']], {})

# On the hour
>>> croniter.expand('0 0 * * *')
([[0], [0], ['*'], ['*'], ['*']], {})

# Hours 0-5 and 10 monday through friday
>>> croniter.expand('0-5,10 * * * mon-fri')
([[0, 1, 2, 3, 4, 5, 10], ['*'], ['*'], ['*'], [1, 2, 3, 4, 5]], {})

Note that some special values such as nth day of week are expanded to a special mapping format for later processing:

# Every minute on the 3rd tuesday of the month
>>> croniter.expand('* * * * 2#3')
([['*'], ['*'], ['*'], ['*'], [2]], {2: {3}})

# Every hour on the last day of the month
>>> croniter.expand('0 * l * *')
([[0], ['*'], ['l'], ['*'], ['*']], {})

# On the hour every 15 seconds
>>> croniter.expand('0 0 * * * */15')
([[0], [0], ['*'], ['*'], ['*'], [0, 15, 30, 45]], {})

Parameter Type
expr_format
hash_id
second_at_beginning
from_timestamp

get_current()

def get_current(
    ret_type,
):
Parameter Type
ret_type

get_next()

def get_next(
    ret_type,
    start_time,
    update_current,
):
Parameter Type
ret_type
start_time
update_current

get_prev()

def get_prev(
    ret_type,
    start_time,
    update_current,
):
Parameter Type
ret_type
start_time
update_current

is_leap()

def is_leap(
    year,
):
Parameter Type
year

is_valid()

def is_valid(
    expression,
    hash_id,
    encoding,
    second_at_beginning,
):
Parameter Type
expression
hash_id
encoding
second_at_beginning

iter()

def iter(
    args,
    kwargs,
):
Parameter Type
args *args
kwargs **kwargs

match()

def match(
    cron_expression,
    testdate,
    day_or,
    second_at_beginning,
):
Parameter Type
cron_expression
testdate
day_or
second_at_beginning

match_range()

def match_range(
    cron_expression,
    from_datetime,
    to_datetime,
    day_or,
    second_at_beginning,
):
Parameter Type
cron_expression
from_datetime
to_datetime
day_or
second_at_beginning

next()

def next(
    ret_type,
    start_time,
    is_prev,
    update_current,
):
Parameter Type
ret_type
start_time
is_prev
update_current

set_current()

def set_current(
    start_time,
    force,
):
Parameter Type
start_time
force

timedelta_to_seconds()

def timedelta_to_seconds(
    td,
):

Converts a datetime.timedelta object td into seconds contained in the duration. Note: we cannot use timedelta.total_seconds() because this is not supported by Python 2.6.

Parameter Type
td

timestamp_to_datetime()

def timestamp_to_datetime(
    timestamp,
    tzinfo,
):

Converts a UNIX timestamp into a datetime object.

Parameter Type
timestamp
tzinfo

value_alias()

def value_alias(
    val,
    field_index,
    len_expressions,
):
Parameter Type
val
field_index
len_expressions

flytekit.remote.backfill.datetime

datetime(year, month, day[, hour[, minute[, second[, microsecond[, tzinfo]]]]])

The year, month and day arguments are required. tzinfo may be None, or an instance of a tzinfo subclass. The remaining arguments may be ints.

flytekit.remote.backfill.timedelta

Difference between two datetime values.

timedelta(days=0, seconds=0, microseconds=0, milliseconds=0, minutes=0, hours=0, weeks=0)

All arguments are optional and default to 0. Arguments may be integers or floats, and may be positive or negative.
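
For example:

.. code-block:: python

    from datetime import datetime, timedelta

    start = datetime(2024, 1, 1, 12, 30)  # year, month, day, hour, minute
    window = timedelta(days=1, hours=6)
    end = start + window                  # datetime(2024, 1, 2, 18, 30)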