flytekit.interaction.rich_utils
Directory
Classes
Class | Description |
---|---|
Callback |
Base class and interface for callback mechanism. |
Progress |
Renders an auto-updating progress bar(s). |
RichCallback |
Base class and interface for callback mechanism. |
flytekit.interaction.rich_utils.Callback
Base class and interface for callback mechanism
This class can be used directly for monitoring file transfers by
providing callback=Callback(hooks=...)
(see the hooks
argument,
below), or subclassed for more specialised behaviour.
Parameters
size: int (optional)
Nominal quantity for the value that corresponds to a complete
transfer, e.g., total number of tiles or total number of
bytes
value: int (0)
Starting internal counter value
hooks: dict or None
A dict of named functions to be called on each update. The signature
of these must be f(size, value, **kwargs)
def Callback(
size,
value,
hooks,
kwargs,
):
Parameter | Type |
---|---|
size |
|
value |
|
hooks |
|
kwargs |
**kwargs |
Methods
Method | Description |
---|---|
absolute_update() |
Set the internal value state |
as_callback() |
Transform callback= |
branch() |
Set callbacks for child transfers |
branch_coro() |
Wraps a coroutine, and pass a new child callback to it |
branched() |
Return callback for child transfers |
call() |
Execute hook(s) with current state |
close() |
Close callback |
no_op() |
None |
relative_update() |
Delta increment the internal counter |
set_size() |
Set the internal maximum size attribute |
wrap() |
Wrap an iterable to call relative_update on each iterations |
absolute_update()
def absolute_update(
value,
):
Set the internal value state
Triggers call()
Parameters
value: int
Parameter | Type |
---|---|
value |
as_callback()
def as_callback(
maybe_callback,
):
Transform callback=… into Callback instance
For the special value of None
, return the global instance of
NoOpCallback
. This is an alternative to including
callback=DEFAULT_CALLBACK
directly in a method signature.
Parameter | Type |
---|---|
maybe_callback |
branch()
def branch(
path_1,
path_2,
kwargs,
):
Set callbacks for child transfers
If this callback is operating at a higher level, e.g., put, which may
trigger transfers that can also be monitored. The passed kwargs are
to be mutated to add callback=
, if this class supports branching
to children.
Parameters
path_1: str Child’s source path path_2: str Child’s destination path kwargs: dict arguments passed to child method, e.g., put_file.
Returns
Parameter | Type |
---|---|
path_1 |
|
path_2 |
|
kwargs |
**kwargs |
branch_coro()
def branch_coro(
fn,
):
Wraps a coroutine, and pass a new child callback to it.
Parameter | Type |
---|---|
fn |
branched()
def branched(
path_1,
path_2,
kwargs,
):
Return callback for child transfers
If this callback is operating at a higher level, e.g., put, which may
trigger transfers that can also be monitored. The function returns a callback
that has to be passed to the child method, e.g., put_file,
as callback=
argument.
The implementation uses callback.branch
for compatibility.
When implementing callbacks, it is recommended to override this function instead
of branch
and avoid calling super().branched(...)
.
Prefer using this function over branch
.
Parameters
path_1: str Child’s source path path_2: str Child’s destination path **kwargs: Arbitrary keyword arguments
Returns
callback: Callback A callback instance to be passed to the child method
Parameter | Type |
---|---|
path_1 |
|
path_2 |
|
kwargs |
**kwargs |
call()
def call(
hook_name,
kwargs,
):
Execute hook(s) with current state
Each function is passed the internal size and current value
Parameters
hook_name: str or None If given, execute on this hook kwargs: passed on to (all) hook(s)
Parameter | Type |
---|---|
hook_name |
|
kwargs |
**kwargs |
close()
def close()
Close callback.
no_op()
def no_op(
_,
__,
):
Parameter | Type |
---|---|
_ |
|
__ |
relative_update()
def relative_update(
inc,
):
Delta increment the internal counter
Triggers call()
Parameters
inc: int
Parameter | Type |
---|---|
inc |
set_size()
def set_size(
size,
):
Set the internal maximum size attribute
Usually called if not initially set at instantiation. Note that this
triggers a call()
.
Parameters
size: int
Parameter | Type |
---|---|
size |
wrap()
def wrap(
iterable,
):
Wrap an iterable to call relative_update
on each iterations
Parameters
iterable: Iterable The iterable that is being wrapped
Parameter | Type |
---|---|
iterable |
flytekit.interaction.rich_utils.Progress
Renders an auto-updating progress bar(s).
def Progress(
columns: typing.Union[str, rich.progress.ProgressColumn],
console: typing.Optional[rich.console.Console],
auto_refresh: bool,
refresh_per_second: float,
speed_estimate_period: float,
transient: bool,
redirect_stdout: bool,
redirect_stderr: bool,
get_time: typing.Optional[typing.Callable[[], float]],
disable: bool,
expand: bool,
):
Parameter | Type |
---|---|
columns |
typing.Union[str, rich.progress.ProgressColumn] |
console |
typing.Optional[rich.console.Console] |
auto_refresh |
bool |
refresh_per_second |
float |
speed_estimate_period |
float |
transient |
bool |
redirect_stdout |
bool |
redirect_stderr |
bool |
get_time |
typing.Optional[typing.Callable[[], float]] |
disable |
bool |
expand |
bool |
Methods
Method | Description |
---|---|
add_task() |
Add a new ’task’ to the Progress display |
advance() |
Advance task by a number of steps |
get_default_columns() |
Get the default columns used for a new Progress instance: |
get_renderable() |
Get a renderable for the progress display |
get_renderables() |
Get a number of renderables for the progress display |
make_tasks_table() |
Get a table to render the Progress display |
open() |
Track progress while reading from a binary file |
refresh() |
Refresh (render) the progress information |
remove_task() |
Delete a task if it exists |
reset() |
Reset a task so completed is 0 and the clock is reset |
start() |
Start the progress display |
start_task() |
Start a task |
stop() |
Stop the progress display |
stop_task() |
Stop a task |
track() |
Track progress by iterating over a sequence |
update() |
Update information associated with a task |
wrap_file() |
Track progress file reading from a binary file |
add_task()
def add_task(
description: str,
start: bool,
total: typing.Optional[float],
completed: int,
visible: bool,
fields: typing.Any,
):
Add a new ’task’ to the Progress display.
Parameter | Type |
---|---|
description |
str |
start |
bool |
total |
typing.Optional[float] |
completed |
int |
visible |
bool |
fields |
typing.Any |
advance()
def advance(
task_id: rich.progress.TaskID,
advance: float,
):
Advance task by a number of steps.
Parameter | Type |
---|---|
task_id |
rich.progress.TaskID |
advance |
float |
get_default_columns()
def get_default_columns()
Get the default columns used for a new Progress instance:
- a text column for the description (TextColumn)
- the bar itself (BarColumn)
- a text column showing completion percentage (TextColumn)
- an estimated-time-remaining column (TimeRemainingColumn) If the Progress instance is created without passing a columns argument, the default columns defined here will be used.
You can also create a Progress instance using custom columns before and/or after the defaults, as in this example:
progress = Progress( SpinnerColumn(), *Progress.get_default_columns(), “Elapsed:”, TimeElapsedColumn(), )
This code shows the creation of a Progress display, containing a spinner to the left, the default columns, and a labeled elapsed time column.
get_renderable()
def get_renderable()
Get a renderable for the progress display.
get_renderables()
def get_renderables()
Get a number of renderables for the progress display.
make_tasks_table()
def make_tasks_table(
tasks: typing.Iterable[rich.progress.Task],
):
Get a table to render the Progress display.
Parameter | Type |
---|---|
tasks |
typing.Iterable[rich.progress.Task] |
open()
def open(
file: typing.Union[str, ForwardRef('PathLike[str]'), bytes],
mode: typing.Union[typing.Literal['rb'], typing.Literal['rt'], typing.Literal['r']],
buffering: int,
encoding: typing.Optional[str],
errors: typing.Optional[str],
newline: typing.Optional[str],
total: typing.Optional[int],
task_id: typing.Optional[rich.progress.TaskID],
description: str,
):
Track progress while reading from a binary file.
Parameter | Type |
---|---|
file |
typing.Union[str, ForwardRef('PathLike[str]'), bytes] |
mode |
typing.Union[typing.Literal['rb'], typing.Literal['rt'], typing.Literal['r']] |
buffering |
int |
encoding |
typing.Optional[str] |
errors |
typing.Optional[str] |
newline |
typing.Optional[str] |
total |
typing.Optional[int] |
task_id |
typing.Optional[rich.progress.TaskID] |
description |
str |
refresh()
def refresh()
Refresh (render) the progress information.
remove_task()
def remove_task(
task_id: rich.progress.TaskID,
):
Delete a task if it exists.
Parameter | Type |
---|---|
task_id |
rich.progress.TaskID |
reset()
def reset(
task_id: rich.progress.TaskID,
start: bool,
total: typing.Optional[float],
completed: int,
visible: typing.Optional[bool],
description: typing.Optional[str],
fields: typing.Any,
):
Reset a task so completed is 0 and the clock is reset.
Parameter | Type |
---|---|
task_id |
rich.progress.TaskID |
start |
bool |
total |
typing.Optional[float] |
completed |
int |
visible |
typing.Optional[bool] |
description |
typing.Optional[str] |
fields |
typing.Any |
start()
def start()
Start the progress display.
start_task()
def start_task(
task_id: rich.progress.TaskID,
):
Start a task.
Starts a task (used when calculating elapsed time). You may need to call this manually,
if you called add_task
with start=False
.
Parameter | Type |
---|---|
task_id |
rich.progress.TaskID |
stop()
def stop()
Stop the progress display.
stop_task()
def stop_task(
task_id: rich.progress.TaskID,
):
Stop a task.
This will freeze the elapsed time on the task.
Parameter | Type |
---|---|
task_id |
rich.progress.TaskID |
track()
def track(
sequence: typing.Union[typing.Iterable[~ProgressType], typing.Sequence[~ProgressType]],
total: typing.Optional[float],
completed: int,
task_id: typing.Optional[rich.progress.TaskID],
description: str,
update_period: float,
):
Track progress by iterating over a sequence.
Parameter | Type |
---|---|
sequence |
typing.Union[typing.Iterable[~ProgressType], typing.Sequence[~ProgressType]] |
total |
typing.Optional[float] |
completed |
int |
task_id |
typing.Optional[rich.progress.TaskID] |
description |
str |
update_period |
float |
update()
def update(
task_id: rich.progress.TaskID,
total: typing.Optional[float],
completed: typing.Optional[float],
advance: typing.Optional[float],
description: typing.Optional[str],
visible: typing.Optional[bool],
refresh: bool,
fields: typing.Any,
):
Update information associated with a task.
Parameter | Type |
---|---|
task_id |
rich.progress.TaskID |
total |
typing.Optional[float] |
completed |
typing.Optional[float] |
advance |
typing.Optional[float] |
description |
typing.Optional[str] |
visible |
typing.Optional[bool] |
refresh |
bool |
fields |
typing.Any |
wrap_file()
def wrap_file(
file: typing.BinaryIO,
total: typing.Optional[int],
task_id: typing.Optional[rich.progress.TaskID],
description: str,
):
Track progress file reading from a binary file.
Parameter | Type |
---|---|
file |
typing.BinaryIO |
total |
typing.Optional[int] |
task_id |
typing.Optional[rich.progress.TaskID] |
description |
str |
Properties
Property | Type | Description |
---|---|---|
console | ||
finished | ||
task_ids | ||
tasks |
flytekit.interaction.rich_utils.RichCallback
Base class and interface for callback mechanism
This class can be used directly for monitoring file transfers by
providing callback=Callback(hooks=...)
(see the hooks
argument,
below), or subclassed for more specialised behaviour.
Parameters
size: int (optional)
Nominal quantity for the value that corresponds to a complete
transfer, e.g., total number of tiles or total number of
bytes
value: int (0)
Starting internal counter value
hooks: dict or None
A dict of named functions to be called on each update. The signature
of these must be f(size, value, **kwargs)
def RichCallback(
rich_kwargs: typing.Optional[typing.Dict],
kwargs,
):
Parameter | Type |
---|---|
rich_kwargs |
typing.Optional[typing.Dict] |
kwargs |
**kwargs |
Methods
Method | Description |
---|---|
absolute_update() |
Set the internal value state |
as_callback() |
Transform callback= |
branch() |
Set callbacks for child transfers |
branch_coro() |
Wraps a coroutine, and pass a new child callback to it |
branched() |
Return callback for child transfers |
call() |
Execute hook(s) with current state |
close() |
Close callback |
no_op() |
None |
relative_update() |
Delta increment the internal counter |
set_size() |
Set the internal maximum size attribute |
wrap() |
Wrap an iterable to call relative_update on each iterations |
absolute_update()
def absolute_update(
value,
):
Set the internal value state
Triggers call()
Parameters
value: int
Parameter | Type |
---|---|
value |
as_callback()
def as_callback(
maybe_callback,
):
Transform callback=… into Callback instance
For the special value of None
, return the global instance of
NoOpCallback
. This is an alternative to including
callback=DEFAULT_CALLBACK
directly in a method signature.
Parameter | Type |
---|---|
maybe_callback |
branch()
def branch(
path_1,
path_2,
kwargs,
):
Set callbacks for child transfers
If this callback is operating at a higher level, e.g., put, which may
trigger transfers that can also be monitored. The passed kwargs are
to be mutated to add callback=
, if this class supports branching
to children.
Parameters
path_1: str Child’s source path path_2: str Child’s destination path kwargs: dict arguments passed to child method, e.g., put_file.
Returns
Parameter | Type |
---|---|
path_1 |
|
path_2 |
|
kwargs |
**kwargs |
branch_coro()
def branch_coro(
fn,
):
Wraps a coroutine, and pass a new child callback to it.
Parameter | Type |
---|---|
fn |
branched()
def branched(
path_1,
path_2,
kwargs,
):
Return callback for child transfers
If this callback is operating at a higher level, e.g., put, which may
trigger transfers that can also be monitored. The function returns a callback
that has to be passed to the child method, e.g., put_file,
as callback=
argument.
The implementation uses callback.branch
for compatibility.
When implementing callbacks, it is recommended to override this function instead
of branch
and avoid calling super().branched(...)
.
Prefer using this function over branch
.
Parameters
path_1: str Child’s source path path_2: str Child’s destination path **kwargs: Arbitrary keyword arguments
Returns
callback: Callback A callback instance to be passed to the child method
Parameter | Type |
---|---|
path_1 |
|
path_2 |
|
kwargs |
**kwargs |
call()
def call(
hook_name,
kwargs,
):
Execute hook(s) with current state
Each function is passed the internal size and current value
Parameters
hook_name: str or None If given, execute on this hook kwargs: passed on to (all) hook(s)
Parameter | Type |
---|---|
hook_name |
|
kwargs |
**kwargs |
close()
def close()
Close callback.
no_op()
def no_op(
_,
__,
):
Parameter | Type |
---|---|
_ |
|
__ |
relative_update()
def relative_update(
inc,
):
Delta increment the internal counter
Triggers call()
Parameters
inc: int
Parameter | Type |
---|---|
inc |
set_size()
def set_size(
size,
):
Set the internal maximum size attribute
Usually called if not initially set at instantiation. Note that this
triggers a call()
.
Parameters
size: int
Parameter | Type |
---|---|
size |
wrap()
def wrap(
iterable,
):
Wrap an iterable to call relative_update
on each iterations
Parameters
iterable: Iterable The iterable that is being wrapped
Parameter | Type |
---|---|
iterable |