1.15.4.dev2+g3e3ce2426

flytekit.core.environment

Directory

Classes

Class Description
Any Special type indicating an unconstrained type.
Console A high level console interface.
Environment None.
Panel A console renderable that draws a border around its contents.
ParamSpec Parameter specification.
Pretty A rich renderable that pretty prints an object.
TypeVar Type variable.
partial partial(func, *args, **keywords) - new function with partial application.

flytekit.core.environment.Any

Special type indicating an unconstrained type.

  • Any is compatible with every type.
  • Any assumed to have all methods.
  • All values assumed to be instances of Any.

Note that all the above statements are true from the point of view of static type checkers. At runtime, Any should not be used with instance checks.

flytekit.core.environment.Console

A high level console interface.

def Console(
    color_system: typing.Optional[typing.Literal['auto', 'standard', '256', 'truecolor', 'windows']],
    force_terminal: typing.Optional[bool],
    force_jupyter: typing.Optional[bool],
    force_interactive: typing.Optional[bool],
    soft_wrap: bool,
    theme: typing.Optional[rich.theme.Theme],
    stderr: bool,
    file: typing.Optional[typing.IO[str]],
    quiet: bool,
    width: typing.Optional[int],
    height: typing.Optional[int],
    style: typing.Union[str, ForwardRef('Style'), NoneType],
    no_color: typing.Optional[bool],
    tab_size: int,
    record: bool,
    markup: bool,
    emoji: bool,
    emoji_variant: typing.Optional[typing.Literal['emoji', 'text']],
    highlight: bool,
    log_time: bool,
    log_path: bool,
    log_time_format: typing.Union[str, typing.Callable[[datetime.datetime], rich.text.Text]],
    highlighter: typing.Optional[ForwardRef('HighlighterType')],
    legacy_windows: typing.Optional[bool],
    safe_box: bool,
    get_datetime: typing.Optional[typing.Callable[[], datetime.datetime]],
    get_time: typing.Optional[typing.Callable[[], float]],
    _environ: typing.Optional[typing.Mapping[str, str]],
):
Parameter Type
color_system typing.Optional[typing.Literal['auto', 'standard', '256', 'truecolor', 'windows']]
force_terminal typing.Optional[bool]
force_jupyter typing.Optional[bool]
force_interactive typing.Optional[bool]
soft_wrap bool
theme typing.Optional[rich.theme.Theme]
stderr bool
file typing.Optional[typing.IO[str]]
quiet bool
width typing.Optional[int]
height typing.Optional[int]
style typing.Union[str, ForwardRef('Style'), NoneType]
no_color typing.Optional[bool]
tab_size int
record bool
markup bool
emoji bool
emoji_variant typing.Optional[typing.Literal['emoji', 'text']]
highlight bool
log_time bool
log_path bool
log_time_format typing.Union[str, typing.Callable[[datetime.datetime], rich.text.Text]]
highlighter typing.Optional[ForwardRef('HighlighterType')]
legacy_windows typing.Optional[bool]
safe_box bool
get_datetime typing.Optional[typing.Callable[[], datetime.datetime]]
get_time typing.Optional[typing.Callable[[], float]]
_environ typing.Optional[typing.Mapping[str, str]]

Methods

Method Description
begin_capture() Begin capturing console output
bell() Play a ‘bell’ sound (if supported by the terminal)
capture() A context manager to capture the result of print() or log() in a string,
clear() Clear the screen
clear_live() Clear the Live instance
control() Insert non-printing control codes
end_capture() End capture mode and return captured string
export_html() Generate HTML from console contents (requires record=True argument in constructor)
export_svg() Generate an SVG from the console contents (requires record=True in Console constructor)
export_text() Generate text from console contents (requires record=True argument in constructor)
get_style() Get a Style instance by its theme name or parse a definition
input() Displays a prompt and waits for input from the user
line() Write new line(s)
log() Log rich content to the terminal
measure() Measure a renderable
on_broken_pipe() This function is called when a BrokenPipeError is raised
out() Output to the terminal
pager() A context manager to display anything printed within a “pager”
pop_render_hook() Pop the last renderhook from the stack
pop_theme() Remove theme from top of stack, restoring previous theme
print() Print to the console
print_exception() Prints a rich render of the last exception and traceback
print_json() Pretty prints JSON
push_render_hook() Add a new render hook to the stack
push_theme() Push a new theme on to the top of the stack, replacing the styles from the previous theme
render() Render an object in to an iterable of Segment instances
render_lines() Render objects in to a list of lines
render_str() Convert a string to a Text instance
rule() Draw a line with optional centered title
save_html() Generate HTML from console contents and write to a file (requires record=True argument in constructor)
save_svg() Generate an SVG file from the console contents (requires record=True in Console constructor)
save_text() Generate text from console and save to a given location (requires record=True argument in constructor)
screen() Context manager to enable and disable ‘alternative screen’ mode
set_alt_screen() Enables alternative screen mode
set_live() Set Live instance
set_window_title() Set the title of the console terminal window
show_cursor() Show or hide the cursor
status() Display a status and spinner
update_screen() Update the screen at a given offset
update_screen_lines() Update lines of the screen at a given offset
use_theme() Use a different theme for the duration of the context manager

begin_capture()

def begin_capture()

Begin capturing console output. Call :meth:end_capture to exit capture mode and return output.

bell()

def bell()

Play a ‘bell’ sound (if supported by the terminal).

capture()

def capture()

A context manager to capture the result of print() or log() in a string, rather than writing it to the console.

Example:

from rich.console import Console console = Console() with console.capture() as capture: … console.print("[bold magenta]Hello World[/]") print(capture.get())

Returns: Capture: Context manager with disables writing to the terminal.

clear()

def clear(
    home: bool,
):

Clear the screen.

Parameter Type
home bool

clear_live()

def clear_live()

Clear the Live instance.

control()

def control(
    control: rich.control.Control,
):

Insert non-printing control codes.

Parameter Type
control rich.control.Control

end_capture()

def end_capture()

End capture mode and return captured string.

Returns: str: Console output.

export_html()

def export_html(
    theme: typing.Optional[rich.terminal_theme.TerminalTheme],
    clear: bool,
    code_format: typing.Optional[str],
    inline_styles: bool,
):

Generate HTML from console contents (requires record=True argument in constructor).

Parameter Type
theme typing.Optional[rich.terminal_theme.TerminalTheme]
clear bool
code_format typing.Optional[str]
inline_styles bool

export_svg()

def export_svg(
    title: str,
    theme: typing.Optional[rich.terminal_theme.TerminalTheme],
    clear: bool,
    code_format: str,
    font_aspect_ratio: float,
    unique_id: typing.Optional[str],
):

Generate an SVG from the console contents (requires record=True in Console constructor).

Parameter Type
title str
theme typing.Optional[rich.terminal_theme.TerminalTheme]
clear bool
code_format str
font_aspect_ratio float
unique_id typing.Optional[str]

export_text()

def export_text(
    clear: bool,
    styles: bool,
):

Generate text from console contents (requires record=True argument in constructor).

Parameter Type
clear bool
styles bool

get_style()

def get_style(
    name: typing.Union[str, rich.style.Style],
    default: typing.Union[rich.style.Style, str, NoneType],
):

Get a Style instance by its theme name or parse a definition.

Parameter Type
name typing.Union[str, rich.style.Style]
default typing.Union[rich.style.Style, str, NoneType]

input()

def input(
    prompt: typing.Union[str, ForwardRef('Text')],
    markup: bool,
    emoji: bool,
    password: bool,
    stream: typing.Optional[typing.TextIO],
):

Displays a prompt and waits for input from the user. The prompt may contain color / style.

It works in the same way as Python’s builtin :func:input function and provides elaborate line editing and history features if Python’s builtin :mod:readline module is previously loaded.

Parameter Type
prompt typing.Union[str, ForwardRef('Text')]
markup bool
emoji bool
password bool
stream typing.Optional[typing.TextIO]

line()

def line(
    count: int,
):

Write new line(s).

Parameter Type
count int

log()

def log(
    objects: typing.Any,
    sep: str,
    end: str,
    style: typing.Union[rich.style.Style, str, NoneType],
    justify: typing.Optional[typing.Literal['default', 'left', 'center', 'right', 'full']],
    emoji: typing.Optional[bool],
    markup: typing.Optional[bool],
    highlight: typing.Optional[bool],
    log_locals: bool,
    _stack_offset: int,
):

Log rich content to the terminal.

Parameter Type
objects typing.Any
sep str
end str
style typing.Union[rich.style.Style, str, NoneType]
justify typing.Optional[typing.Literal['default', 'left', 'center', 'right', 'full']]
emoji typing.Optional[bool]
markup typing.Optional[bool]
highlight typing.Optional[bool]
log_locals bool
_stack_offset int

measure()

def measure(
    renderable: typing.Union[rich.console.ConsoleRenderable, rich.console.RichCast, str],
    options: typing.Optional[rich.console.ConsoleOptions],
):

Measure a renderable. Returns a :class:~rich.measure.Measurement object which contains information regarding the number of characters required to print the renderable.

Parameter Type
renderable typing.Union[rich.console.ConsoleRenderable, rich.console.RichCast, str]
options typing.Optional[rich.console.ConsoleOptions]

on_broken_pipe()

def on_broken_pipe()

This function is called when a BrokenPipeError is raised.

This can occur when piping Textual output in Linux and macOS. The default implementation is to exit the app, but you could implement this method in a subclass to change the behavior.

See https://docs.python.org/3/library/signal.html#note-on-sigpipe for details.

out()

def out(
    objects: typing.Any,
    sep: str,
    end: str,
    style: typing.Union[rich.style.Style, str, NoneType],
    highlight: typing.Optional[bool],
):

Output to the terminal. This is a low-level way of writing to the terminal which unlike :meth:~rich.console.Console.print won’t pretty print, wrap text, or apply markup, but will optionally apply highlighting and a basic style.

Parameter Type
objects typing.Any
sep str
end str
style typing.Union[rich.style.Style, str, NoneType]
highlight typing.Optional[bool]

pager()

def pager(
    pager: typing.Optional[rich.pager.Pager],
    styles: bool,
    links: bool,
):

A context manager to display anything printed within a “pager”. The pager application is defined by the system and will typically support at least pressing a key to scroll.

Parameter Type
pager typing.Optional[rich.pager.Pager]
styles bool
links bool

pop_render_hook()

def pop_render_hook()

Pop the last renderhook from the stack.

pop_theme()

def pop_theme()

Remove theme from top of stack, restoring previous theme.

print()

def print(
    objects: typing.Any,
    sep: str,
    end: str,
    style: typing.Union[rich.style.Style, str, NoneType],
    justify: typing.Optional[typing.Literal['default', 'left', 'center', 'right', 'full']],
    overflow: typing.Optional[typing.Literal['fold', 'crop', 'ellipsis', 'ignore']],
    no_wrap: typing.Optional[bool],
    emoji: typing.Optional[bool],
    markup: typing.Optional[bool],
    highlight: typing.Optional[bool],
    width: typing.Optional[int],
    height: typing.Optional[int],
    crop: bool,
    soft_wrap: typing.Optional[bool],
    new_line_start: bool,
):

Print to the console.

Parameter Type
objects typing.Any
sep str
end str
style typing.Union[rich.style.Style, str, NoneType]
justify typing.Optional[typing.Literal['default', 'left', 'center', 'right', 'full']]
overflow typing.Optional[typing.Literal['fold', 'crop', 'ellipsis', 'ignore']]
no_wrap typing.Optional[bool]
emoji typing.Optional[bool]
markup typing.Optional[bool]
highlight typing.Optional[bool]
width typing.Optional[int]
height typing.Optional[int]
crop bool
soft_wrap typing.Optional[bool]
new_line_start bool
def print_exception(
    width: typing.Optional[int],
    extra_lines: int,
    theme: typing.Optional[str],
    word_wrap: bool,
    show_locals: bool,
    suppress: typing.Iterable[typing.Union[str, module]],
    max_frames: int,
):

Prints a rich render of the last exception and traceback.

Parameter Type
width typing.Optional[int]
extra_lines int
theme typing.Optional[str]
word_wrap bool
show_locals bool
suppress typing.Iterable[typing.Union[str, module]]
max_frames int
def print_json(
    json: typing.Optional[str],
    data: typing.Any,
    indent: typing.Union[NoneType, int, str],
    highlight: bool,
    skip_keys: bool,
    ensure_ascii: bool,
    check_circular: bool,
    allow_nan: bool,
    default: typing.Optional[typing.Callable[[typing.Any], typing.Any]],
    sort_keys: bool,
):

Pretty prints JSON. Output will be valid JSON.

Parameter Type
json typing.Optional[str]
data typing.Any
indent typing.Union[NoneType, int, str]
highlight bool
skip_keys bool
ensure_ascii bool
check_circular bool
allow_nan bool
default typing.Optional[typing.Callable[[typing.Any], typing.Any]]
sort_keys bool

push_render_hook()

def push_render_hook(
    hook: rich.console.RenderHook,
):

Add a new render hook to the stack.

Parameter Type
hook rich.console.RenderHook

push_theme()

def push_theme(
    theme: rich.theme.Theme,
    inherit: bool,
):

Push a new theme on to the top of the stack, replacing the styles from the previous theme. Generally speaking, you should call :meth:~rich.console.Console.use_theme to get a context manager, rather than calling this method directly.

Parameter Type
theme rich.theme.Theme
inherit bool

render()

def render(
    renderable: typing.Union[rich.console.ConsoleRenderable, rich.console.RichCast, str],
    options: typing.Optional[rich.console.ConsoleOptions],
):

Render an object in to an iterable of Segment instances.

This method contains the logic for rendering objects with the console protocol. You are unlikely to need to use it directly, unless you are extending the library.

Parameter Type
renderable typing.Union[rich.console.ConsoleRenderable, rich.console.RichCast, str]
options typing.Optional[rich.console.ConsoleOptions]

render_lines()

def render_lines(
    renderable: typing.Union[rich.console.ConsoleRenderable, rich.console.RichCast, str],
    options: typing.Optional[rich.console.ConsoleOptions],
    style: typing.Optional[rich.style.Style],
    pad: bool,
    new_lines: bool,
):

Render objects in to a list of lines.

The output of render_lines is useful when further formatting of rendered console text is required, such as the Panel class which draws a border around any renderable object.

Parameter Type
renderable typing.Union[rich.console.ConsoleRenderable, rich.console.RichCast, str]
options typing.Optional[rich.console.ConsoleOptions]
style typing.Optional[rich.style.Style]
pad bool
new_lines bool

render_str()

def render_str(
    text: str,
    style: typing.Union[str, rich.style.Style],
    justify: typing.Optional[typing.Literal['default', 'left', 'center', 'right', 'full']],
    overflow: typing.Optional[typing.Literal['fold', 'crop', 'ellipsis', 'ignore']],
    emoji: typing.Optional[bool],
    markup: typing.Optional[bool],
    highlight: typing.Optional[bool],
    highlighter: typing.Optional[typing.Callable[[typing.Union[str, ForwardRef('Text')]], ForwardRef('Text')]],
):

Convert a string to a Text instance. This is called automatically if you print or log a string.

Parameter Type
text str
style typing.Union[str, rich.style.Style]
justify typing.Optional[typing.Literal['default', 'left', 'center', 'right', 'full']]
overflow typing.Optional[typing.Literal['fold', 'crop', 'ellipsis', 'ignore']]
emoji typing.Optional[bool]
markup typing.Optional[bool]
highlight typing.Optional[bool]
highlighter typing.Optional[typing.Callable[[typing.Union[str, ForwardRef('Text')]], ForwardRef('Text')]]

rule()

def rule(
    title: typing.Union[str, ForwardRef('Text')],
    characters: str,
    style: typing.Union[str, rich.style.Style],
    align: typing.Literal['left', 'center', 'right'],
):

Draw a line with optional centered title.

Parameter Type
title typing.Union[str, ForwardRef('Text')]
characters str
style typing.Union[str, rich.style.Style]
align typing.Literal['left', 'center', 'right']

save_html()

def save_html(
    path: str,
    theme: typing.Optional[rich.terminal_theme.TerminalTheme],
    clear: bool,
    code_format: str,
    inline_styles: bool,
):

Generate HTML from console contents and write to a file (requires record=True argument in constructor).

Parameter Type
path str
theme typing.Optional[rich.terminal_theme.TerminalTheme]
clear bool
code_format str
inline_styles bool

save_svg()

def save_svg(
    path: str,
    title: str,
    theme: typing.Optional[rich.terminal_theme.TerminalTheme],
    clear: bool,
    code_format: str,
    font_aspect_ratio: float,
    unique_id: typing.Optional[str],
):

Generate an SVG file from the console contents (requires record=True in Console constructor).

Parameter Type
path str
title str
theme typing.Optional[rich.terminal_theme.TerminalTheme]
clear bool
code_format str
font_aspect_ratio float
unique_id typing.Optional[str]

save_text()

def save_text(
    path: str,
    clear: bool,
    styles: bool,
):

Generate text from console and save to a given location (requires record=True argument in constructor).

Parameter Type
path str
clear bool
styles bool

screen()

def screen(
    hide_cursor: bool,
    style: typing.Union[str, ForwardRef('Style'), NoneType],
):

Context manager to enable and disable ‘alternative screen’ mode.

Parameter Type
hide_cursor bool
style typing.Union[str, ForwardRef('Style'), NoneType]

set_alt_screen()

def set_alt_screen(
    enable: bool,
):

Enables alternative screen mode.

Note, if you enable this mode, you should ensure that is disabled before the application exits. See :meth:~rich.Console.screen for a context manager that handles this for you.

Parameter Type
enable bool

set_live()

def set_live(
    live: Live,
):

Set Live instance. Used by Live context manager.

Parameter Type
live Live

set_window_title()

def set_window_title(
    title: str,
):

Set the title of the console terminal window.

Warning: There is no means within Rich of “resetting” the window title to its previous value, meaning the title you set will persist even after your application exits.

fish shell resets the window title before and after each command by default, negating this issue. Windows Terminal and command prompt will also reset the title for you. Most other shells and terminals, however, do not do this.

Some terminals may require configuration changes before you can set the title. Some terminals may not support setting the title at all.

Other software (including the terminal itself, the shell, custom prompts, plugins, etc.) may also set the terminal window title. This could result in whatever value you write using this method being overwritten.

Parameter Type
title str

show_cursor()

def show_cursor(
    show: bool,
):

Show or hide the cursor.

Parameter Type
show bool

status()

def status(
    status: typing.Union[rich.console.ConsoleRenderable, rich.console.RichCast, str],
    spinner: str,
    spinner_style: typing.Union[str, ForwardRef('Style')],
    speed: float,
    refresh_per_second: float,
):

Display a status and spinner.

Parameter Type
status typing.Union[rich.console.ConsoleRenderable, rich.console.RichCast, str]
spinner str
spinner_style typing.Union[str, ForwardRef('Style')]
speed float
refresh_per_second float

update_screen()

def update_screen(
    renderable: typing.Union[rich.console.ConsoleRenderable, rich.console.RichCast, str],
    region: typing.Optional[rich.region.Region],
    options: typing.Optional[rich.console.ConsoleOptions],
):

Update the screen at a given offset.

Parameter Type
renderable typing.Union[rich.console.ConsoleRenderable, rich.console.RichCast, str]
region typing.Optional[rich.region.Region]
options typing.Optional[rich.console.ConsoleOptions]

update_screen_lines()

def update_screen_lines(
    lines: typing.List[typing.List[rich.segment.Segment]],
    x: int,
    y: int,
):

Update lines of the screen at a given offset.

Parameter Type
lines typing.List[typing.List[rich.segment.Segment]]
x int
y int

use_theme()

def use_theme(
    theme: rich.theme.Theme,
    inherit: bool,
):

Use a different theme for the duration of the context manager.

Parameter Type
theme rich.theme.Theme
inherit bool

Properties

Property Type Description
color_system
encoding
file
height
is_alt_screen
is_dumb_terminal
is_terminal
options
size
width

flytekit.core.environment.Environment

def Environment(
    _task_function: Optional[Callable[P, FuncOut]],
    task_config: Optional[T],
    cache: Union[bool, Cache],
    retries: int,
    interruptible: Optional[bool],
    deprecated: str,
    timeout: Union[datetime.timedelta, int],
    container_image: Optional[Union[str, ImageSpec]],
    environment: Optional[Dict[str, str]],
    requests: Optional[Resources],
    limits: Optional[Resources],
    secret_requests: Optional[List[Secret]],
    execution_mode: PythonFunctionTask.ExecutionBehavior,
    node_dependency_hints: Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]],
    task_resolver: Optional[TaskResolverMixin],
    docs: Optional[Documentation],
    disable_deck: Optional[bool],
    enable_deck: Optional[bool],
    deck_fields: Optional[Tuple[DeckField, ...]],
    pod_template: Optional['PodTemplate'],
    pod_template_name: Optional[str],
    accelerator: Optional[BaseAccelerator],
    pickle_untyped: bool,
    shared_memory: Optional[Union[L[True], str]],
    resources: Optional[Resources],
    kwargs,
):

This is the core decorator to use for any task type in flytekit.

Tasks are the building blocks of Flyte. They represent users code. Tasks have the following properties

  • Versioned (usually tied to the git revision SHA1)
  • Strong interfaces (specified inputs and outputs)
  • Declarative
  • Independently executable
  • Unit testable

For a simple python task,

.. code-block:: python

@task def my_task(x: int, y: typing.Dict[str, str]) -> str: …

For specific task types

.. code-block:: python

@task(task_config=Spark(), retries=3) def my_task(x: int, y: typing.Dict[str, str]) -> str: …

Please see some cookbook :std:ref:task examples <cookbook:tasks> for additional information.

Parameter Type
_task_function Optional[Callable[P, FuncOut]]
task_config Optional[T]
cache Union[bool, Cache]
retries int
interruptible Optional[bool]
deprecated str
timeout Union[datetime.timedelta, int]
container_image Optional[Union[str, ImageSpec]]
environment Optional[Dict[str, str]]
requests Optional[Resources]
limits Optional[Resources]
secret_requests Optional[List[Secret]]
execution_mode PythonFunctionTask.ExecutionBehavior
node_dependency_hints Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]]
task_resolver Optional[TaskResolverMixin]
docs Optional[Documentation]
disable_deck Optional[bool]
enable_deck Optional[bool]
deck_fields Optional[Tuple[DeckField, ...]]
pod_template Optional['PodTemplate']
pod_template_name Optional[str]
accelerator Optional[BaseAccelerator]
pickle_untyped bool
shared_memory Optional[Union[L[True], str]]
resources Optional[Resources]
kwargs **kwargs

Methods

Method Description
dynamic() Please first see the comments for :py:func:`flytekit
extend() This is the core decorator to use for any task type in flytekit
show() None
task() This is the core decorator to use for any task type in flytekit
update() This is the core decorator to use for any task type in flytekit

dynamic()

def dynamic(
    _task_function: Optional[Callable[P, FuncOut]],
    task_config: Optional[T],
    cache: Union[bool, Cache],
    retries: int,
    interruptible: Optional[bool],
    deprecated: str,
    timeout: Union[datetime.timedelta, int],
    container_image: Optional[Union[str, ImageSpec]],
    environment: Optional[Dict[str, str]],
    requests: Optional[Resources],
    limits: Optional[Resources],
    secret_requests: Optional[List[Secret]],
    execution_mode: PythonFunctionTask.ExecutionBehavior,
    node_dependency_hints: Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]],
    task_resolver: Optional[TaskResolverMixin],
    docs: Optional[Documentation],
    disable_deck: Optional[bool],
    enable_deck: Optional[bool],
    deck_fields: Optional[Tuple[DeckField, ...]],
    pod_template: Optional['PodTemplate'],
    pod_template_name: Optional[str],
    accelerator: Optional[BaseAccelerator],
    pickle_untyped: bool,
    shared_memory: Optional[Union[L[True], str]],
    resources: Optional[Resources],
    kwargs,
):

Please first see the comments for :py:func:flytekit.task and :py:func:flytekit.workflow. This dynamic concept is an amalgamation of both and enables the user to pursue some :std:ref:pretty incredible <cookbook:advanced_merge_sort> constructs.

In short, a task’s function is run at execution time only, and a workflow function is run at compilation time only (local execution notwithstanding). A dynamic workflow is modeled on the backend as a task, but at execution time, the function body is run to produce a workflow. It is almost as if the decorator changed from @task to @workflow except workflows cannot make use of their inputs like native Python values whereas dynamic workflows can. The resulting workflow is passed back to the Flyte engine and is run as a :std:ref:subworkflow <cookbook:subworkflows>. Simple usage

.. code-block::

@dynamic def my_dynamic_subwf(a: int) -> (typing.List[str], int): s = [] for i in range(a): s.append(t1(a=i)) return s, 5

Note in the code block that we call the Python range operator on the input. This is typically not allowed in a workflow but it is here. You can even express dependencies between tasks.

.. code-block::

@dynamic def my_dynamic_subwf(a: int, b: int) -> int: x = t1(a=a) return t2(b=b, x=x)

See the :std:ref:cookbook <cookbook:subworkflows> for a longer discussion.

Parameter Type
_task_function Optional[Callable[P, FuncOut]]
task_config Optional[T]
cache Union[bool, Cache]
retries int
interruptible Optional[bool]
deprecated str
timeout Union[datetime.timedelta, int]
container_image Optional[Union[str, ImageSpec]]
environment Optional[Dict[str, str]]
requests Optional[Resources]
limits Optional[Resources]
secret_requests Optional[List[Secret]]
execution_mode PythonFunctionTask.ExecutionBehavior
node_dependency_hints Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]]
task_resolver Optional[TaskResolverMixin]
docs Optional[Documentation]
disable_deck Optional[bool]
enable_deck Optional[bool]
deck_fields Optional[Tuple[DeckField, ...]]
pod_template Optional['PodTemplate']
pod_template_name Optional[str]
accelerator Optional[BaseAccelerator]
pickle_untyped bool
shared_memory Optional[Union[L[True], str]]
resources Optional[Resources]
kwargs **kwargs

extend()

def extend(
    _task_function: Optional[Callable[P, FuncOut]],
    task_config: Optional[T],
    cache: Union[bool, Cache],
    retries: int,
    interruptible: Optional[bool],
    deprecated: str,
    timeout: Union[datetime.timedelta, int],
    container_image: Optional[Union[str, ImageSpec]],
    environment: Optional[Dict[str, str]],
    requests: Optional[Resources],
    limits: Optional[Resources],
    secret_requests: Optional[List[Secret]],
    execution_mode: PythonFunctionTask.ExecutionBehavior,
    node_dependency_hints: Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]],
    task_resolver: Optional[TaskResolverMixin],
    docs: Optional[Documentation],
    disable_deck: Optional[bool],
    enable_deck: Optional[bool],
    deck_fields: Optional[Tuple[DeckField, ...]],
    pod_template: Optional['PodTemplate'],
    pod_template_name: Optional[str],
    accelerator: Optional[BaseAccelerator],
    pickle_untyped: bool,
    shared_memory: Optional[Union[L[True], str]],
    resources: Optional[Resources],
    kwargs,
):

This is the core decorator to use for any task type in flytekit.

Tasks are the building blocks of Flyte. They represent users code. Tasks have the following properties

  • Versioned (usually tied to the git revision SHA1)
  • Strong interfaces (specified inputs and outputs)
  • Declarative
  • Independently executable
  • Unit testable

For a simple python task,

.. code-block:: python

@task def my_task(x: int, y: typing.Dict[str, str]) -> str: …

For specific task types

.. code-block:: python

@task(task_config=Spark(), retries=3) def my_task(x: int, y: typing.Dict[str, str]) -> str: …

Please see some cookbook :std:ref:task examples <cookbook:tasks> for additional information.

Parameter Type
_task_function Optional[Callable[P, FuncOut]]
task_config Optional[T]
cache Union[bool, Cache]
retries int
interruptible Optional[bool]
deprecated str
timeout Union[datetime.timedelta, int]
container_image Optional[Union[str, ImageSpec]]
environment Optional[Dict[str, str]]
requests Optional[Resources]
limits Optional[Resources]
secret_requests Optional[List[Secret]]
execution_mode PythonFunctionTask.ExecutionBehavior
node_dependency_hints Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]]
task_resolver Optional[TaskResolverMixin]
docs Optional[Documentation]
disable_deck Optional[bool]
enable_deck Optional[bool]
deck_fields Optional[Tuple[DeckField, ...]]
pod_template Optional['PodTemplate']
pod_template_name Optional[str]
accelerator Optional[BaseAccelerator]
pickle_untyped bool
shared_memory Optional[Union[L[True], str]]
resources Optional[Resources]
kwargs **kwargs

show()

def show()

task()

def task(
    _task_function: Optional[Callable[P, FuncOut]],
    task_config: Optional[T],
    cache: Union[bool, Cache],
    retries: int,
    interruptible: Optional[bool],
    deprecated: str,
    timeout: Union[datetime.timedelta, int],
    container_image: Optional[Union[str, ImageSpec]],
    environment: Optional[Dict[str, str]],
    requests: Optional[Resources],
    limits: Optional[Resources],
    secret_requests: Optional[List[Secret]],
    execution_mode: PythonFunctionTask.ExecutionBehavior,
    node_dependency_hints: Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]],
    task_resolver: Optional[TaskResolverMixin],
    docs: Optional[Documentation],
    disable_deck: Optional[bool],
    enable_deck: Optional[bool],
    deck_fields: Optional[Tuple[DeckField, ...]],
    pod_template: Optional['PodTemplate'],
    pod_template_name: Optional[str],
    accelerator: Optional[BaseAccelerator],
    pickle_untyped: bool,
    shared_memory: Optional[Union[L[True], str]],
    resources: Optional[Resources],
    kwargs,
):

This is the core decorator to use for any task type in flytekit.

Tasks are the building blocks of Flyte. They represent users code. Tasks have the following properties

  • Versioned (usually tied to the git revision SHA1)
  • Strong interfaces (specified inputs and outputs)
  • Declarative
  • Independently executable
  • Unit testable

For a simple python task,

.. code-block:: python

@task def my_task(x: int, y: typing.Dict[str, str]) -> str: …

For specific task types

.. code-block:: python

@task(task_config=Spark(), retries=3) def my_task(x: int, y: typing.Dict[str, str]) -> str: …

Please see some cookbook :std:ref:task examples <cookbook:tasks> for additional information.

Parameter Type
_task_function Optional[Callable[P, FuncOut]]
task_config Optional[T]
cache Union[bool, Cache]
retries int
interruptible Optional[bool]
deprecated str
timeout Union[datetime.timedelta, int]
container_image Optional[Union[str, ImageSpec]]
environment Optional[Dict[str, str]]
requests Optional[Resources]
limits Optional[Resources]
secret_requests Optional[List[Secret]]
execution_mode PythonFunctionTask.ExecutionBehavior
node_dependency_hints Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]]
task_resolver Optional[TaskResolverMixin]
docs Optional[Documentation]
disable_deck Optional[bool]
enable_deck Optional[bool]
deck_fields Optional[Tuple[DeckField, ...]]
pod_template Optional['PodTemplate']
pod_template_name Optional[str]
accelerator Optional[BaseAccelerator]
pickle_untyped bool
shared_memory Optional[Union[L[True], str]]
resources Optional[Resources]
kwargs **kwargs

update()

def update(
    _task_function: Optional[Callable[P, FuncOut]],
    task_config: Optional[T],
    cache: Union[bool, Cache],
    retries: int,
    interruptible: Optional[bool],
    deprecated: str,
    timeout: Union[datetime.timedelta, int],
    container_image: Optional[Union[str, ImageSpec]],
    environment: Optional[Dict[str, str]],
    requests: Optional[Resources],
    limits: Optional[Resources],
    secret_requests: Optional[List[Secret]],
    execution_mode: PythonFunctionTask.ExecutionBehavior,
    node_dependency_hints: Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]],
    task_resolver: Optional[TaskResolverMixin],
    docs: Optional[Documentation],
    disable_deck: Optional[bool],
    enable_deck: Optional[bool],
    deck_fields: Optional[Tuple[DeckField, ...]],
    pod_template: Optional['PodTemplate'],
    pod_template_name: Optional[str],
    accelerator: Optional[BaseAccelerator],
    pickle_untyped: bool,
    shared_memory: Optional[Union[L[True], str]],
    resources: Optional[Resources],
    kwargs,
):

This is the core decorator to use for any task type in flytekit.

Tasks are the building blocks of Flyte. They represent users code. Tasks have the following properties

  • Versioned (usually tied to the git revision SHA1)
  • Strong interfaces (specified inputs and outputs)
  • Declarative
  • Independently executable
  • Unit testable

For a simple python task,

.. code-block:: python

@task def my_task(x: int, y: typing.Dict[str, str]) -> str: …

For specific task types

.. code-block:: python

@task(task_config=Spark(), retries=3) def my_task(x: int, y: typing.Dict[str, str]) -> str: …

Please see some cookbook :std:ref:task examples <cookbook:tasks> for additional information.

Parameter Type
_task_function Optional[Callable[P, FuncOut]]
task_config Optional[T]
cache Union[bool, Cache]
retries int
interruptible Optional[bool]
deprecated str
timeout Union[datetime.timedelta, int]
container_image Optional[Union[str, ImageSpec]]
environment Optional[Dict[str, str]]
requests Optional[Resources]
limits Optional[Resources]
secret_requests Optional[List[Secret]]
execution_mode PythonFunctionTask.ExecutionBehavior
node_dependency_hints Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]]
task_resolver Optional[TaskResolverMixin]
docs Optional[Documentation]
disable_deck Optional[bool]
enable_deck Optional[bool]
deck_fields Optional[Tuple[DeckField, ...]]
pod_template Optional['PodTemplate']
pod_template_name Optional[str]
accelerator Optional[BaseAccelerator]
pickle_untyped bool
shared_memory Optional[Union[L[True], str]]
resources Optional[Resources]
kwargs **kwargs

flytekit.core.environment.Panel

A console renderable that draws a border around its contents.

Example:

console.print(Panel(“Hello, World!”))

def Panel(
    renderable: RenderableType,
    box: rich.box.Box,
    title: typing.Union[str, ForwardRef('Text'), NoneType],
    title_align: typing.Literal['left', 'center', 'right'],
    subtitle: typing.Union[str, ForwardRef('Text'), NoneType],
    subtitle_align: typing.Literal['left', 'center', 'right'],
    safe_box: typing.Optional[bool],
    expand: bool,
    style: typing.Union[str, ForwardRef('Style')],
    border_style: typing.Union[str, ForwardRef('Style')],
    width: typing.Optional[int],
    height: typing.Optional[int],
    padding: typing.Union[int, typing.Tuple[int], typing.Tuple[int, int], typing.Tuple[int, int, int, int]],
    highlight: bool,
):
Parameter Type
renderable RenderableType
box rich.box.Box
title typing.Union[str, ForwardRef('Text'), NoneType]
title_align typing.Literal['left', 'center', 'right']
subtitle typing.Union[str, ForwardRef('Text'), NoneType]
subtitle_align typing.Literal['left', 'center', 'right']
safe_box typing.Optional[bool]
expand bool
style typing.Union[str, ForwardRef('Style')]
border_style typing.Union[str, ForwardRef('Style')]
width typing.Optional[int]
height typing.Optional[int]
padding typing.Union[int, typing.Tuple[int], typing.Tuple[int, int], typing.Tuple[int, int, int, int]]
highlight bool

Methods

Method Description
fit() An alternative constructor that sets expand=False

fit()

def fit(
    renderable: RenderableType,
    box: rich.box.Box,
    title: typing.Union[str, ForwardRef('Text'), NoneType],
    title_align: typing.Literal['left', 'center', 'right'],
    subtitle: typing.Union[str, ForwardRef('Text'), NoneType],
    subtitle_align: typing.Literal['left', 'center', 'right'],
    safe_box: typing.Optional[bool],
    style: typing.Union[str, ForwardRef('Style')],
    border_style: typing.Union[str, ForwardRef('Style')],
    width: typing.Optional[int],
    height: typing.Optional[int],
    padding: typing.Union[int, typing.Tuple[int], typing.Tuple[int, int], typing.Tuple[int, int, int, int]],
    highlight: bool,
):

An alternative constructor that sets expand=False.

Parameter Type
renderable RenderableType
box rich.box.Box
title typing.Union[str, ForwardRef('Text'), NoneType]
title_align typing.Literal['left', 'center', 'right']
subtitle typing.Union[str, ForwardRef('Text'), NoneType]
subtitle_align typing.Literal['left', 'center', 'right']
safe_box typing.Optional[bool]
style typing.Union[str, ForwardRef('Style')]
border_style typing.Union[str, ForwardRef('Style')]
width typing.Optional[int]
height typing.Optional[int]
padding typing.Union[int, typing.Tuple[int], typing.Tuple[int, int], typing.Tuple[int, int, int, int]]
highlight bool

flytekit.core.environment.ParamSpec

Parameter specification.

flytekit.core.environment.Pretty

A rich renderable that pretty prints an object.

def Pretty(
    _object: typing.Any,
    highlighter: typing.Optional[ForwardRef('HighlighterType')],
    indent_size: int,
    justify: typing.Optional[ForwardRef('JustifyMethod')],
    overflow: typing.Optional[ForwardRef('OverflowMethod')],
    no_wrap: typing.Optional[bool],
    indent_guides: bool,
    max_length: typing.Optional[int],
    max_string: typing.Optional[int],
    max_depth: typing.Optional[int],
    expand_all: bool,
    margin: int,
    insert_line: bool,
):
Parameter Type
_object typing.Any
highlighter typing.Optional[ForwardRef('HighlighterType')]
indent_size int
justify typing.Optional[ForwardRef('JustifyMethod')]
overflow typing.Optional[ForwardRef('OverflowMethod')]
no_wrap typing.Optional[bool]
indent_guides bool
max_length typing.Optional[int]
max_string typing.Optional[int]
max_depth typing.Optional[int]
expand_all bool
margin int
insert_line bool

flytekit.core.environment.TypeVar

Type variable.

The preferred way to construct a type variable is via the dedicated syntax for generic functions, classes, and type aliases::

class Sequence[T]: # T is a TypeVar …

This syntax can also be used to create bound and constrained type variables::

S is a TypeVar bound to str

class StrSequence[S: str]: …

A is a TypeVar constrained to str or bytes

class StrOrBytesSequence[A: (str, bytes)]: …

However, if desired, reusable type variables can also be constructed manually, like so::

T = TypeVar(‘T’) # Can be anything S = TypeVar(‘S’, bound=str) # Can be any subtype of str A = TypeVar(‘A’, str, bytes) # Must be exactly str or bytes

Type variables exist primarily for the benefit of static type checkers. They serve as the parameters for generic types as well as for generic function and type alias definitions.

The variance of type variables is inferred by type checkers when they are created through the type parameter syntax and when infer_variance=True is passed. Manually created type variables may be explicitly marked covariant or contravariant by passing covariant=True or contravariant=True. By default, manually created type variables are invariant. See PEP 484 and PEP 695 for more details.

flytekit.core.environment.partial

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.