flytekit.core.local_cache
Directory
Classes
Class | Description |
---|---|
Cache | Disk and file backed cache. |
Literal | None. |
LiteralCollection | None. |
LiteralMap | A ProtocolMessage. |
LocalTaskCache | This class implements a persistent store able to cache the result of local task executions. |
ModelLiteralMap | None. |
flytekit.core.local_cache.Cache
Disk and file backed cache.
def Cache(
directory,
timeout,
disk,
settings,
):
Initialize cache instance.
Parameter | Type |
---|---|
directory | |
timeout | |
disk | |
settings | |
Methods
Method | Description |
---|---|
add() | Add key and value item to cache |
check() | Check database and file system consistency |
clear() | Remove all items from cache |
close() | Close database connection |
create_tag_index() | Create tag index on cache database |
cull() | Cull items from cache until volume is less than size limit |
decr() | Decrement value by delta for item with key |
delete() | Delete corresponding item for key from cache |
drop_tag_index() | Drop tag index on cache database |
evict() | Remove items with matching tag from cache |
expire() | Remove expired items from cache |
get() | Retrieve value from cache |
incr() | Increment value by delta for item with key |
iterkeys() | Iterate Cache keys in database sort order |
memoize() | Memoizing cache decorator |
peek() | Peek at key and value item pair from side of queue in cache |
peekitem() | Peek at key and value item pair in cache based on iteration order |
pop() | Remove corresponding item for key from cache and return value |
pull() | Pull key and value item pair from side of queue in cache |
push() | Push value onto side of queue identified by prefix in cache |
read() | Return file handle value corresponding to key from cache |
reset() | Reset key and value item from Settings table |
set() | Set key and value item in cache |
stats() | Return cache statistics hits and misses |
touch() | Touch key in cache and update expire time |
transact() | Context manager to perform a transaction by locking the cache |
volume() | Return estimated total size of cache on disk |
add()
def add(
key,
value,
expire,
read,
tag,
retry,
):
Add `key` and `value` item to cache.
Similar to `set`, but only add to cache if key not present.
Operation is atomic. Only one concurrent add operation for a given key will succeed.
When `read` is `True`, `value` should be a file-like object opened for reading in binary mode.
Raises `Timeout` error when database timeout occurs and `retry` is `False` (default).
Parameter | Type |
---|---|
key | |
value | |
expire | |
read | |
tag | |
retry | |
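The add-if-absent behavior described above can be modelled with the standard library. The following is a minimal sketch, not the `Cache` implementation: the `items` table and the `add_item`, `set_item`, and `get_item` helpers are hypothetical names, and the sketch relies on SQLite's `INSERT OR IGNORE` so that the presence check and the insert happen in one statement.

```python
import sqlite3

# Hypothetical stand-in for a key/value cache table; not the Cache schema.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (key TEXT PRIMARY KEY, value TEXT)")


def add_item(key, value):
    """Insert only if key is absent; return True if the row was added."""
    with conn:  # commit on success, roll back on error
        cur = conn.execute(
            "INSERT OR IGNORE INTO items (key, value) VALUES (?, ?)", (key, value)
        )
    return cur.rowcount == 1


def set_item(key, value):
    """Insert or overwrite unconditionally."""
    with conn:
        conn.execute(
            "INSERT OR REPLACE INTO items (key, value) VALUES (?, ?)", (key, value)
        )


def get_item(key):
    row = conn.execute("SELECT value FROM items WHERE key = ?", (key,)).fetchone()
    return None if row is None else row[0]


first = add_item("greeting", "hello")   # key absent: stored
second = add_item("greeting", "howdy")  # key present: ignored
after_add = get_item("greeting")        # still the first value
set_item("greeting", "hi")              # set always overwrites
after_set = get_item("greeting")
```

In this model the second `add_item` call reports failure and leaves the stored value untouched, while `set_item` replaces it.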
check()
def check(
fix,
retry,
):
Check database and file system consistency.
Intended for use in testing and post-mortem error analysis.
While checking the Cache table for consistency, a writer lock is held on the database. The lock blocks other cache clients from writing to the database. For caches with many file references, the lock may be held for a long time. For example, local benchmarking shows that a cache with 1,000 file references takes ~60ms to check.
Raises `Timeout` error when database timeout occurs and `retry` is `False` (default).
Parameter | Type |
---|---|
fix | |
retry | |
clear()
def clear(
retry,
):
Remove all items from cache.
Removing items is an iterative process. In each iteration, a subset of items is removed. Concurrent writes may occur between iterations.
If a `Timeout` occurs, the first element of the exception's `args` attribute will be the number of items removed before the exception occurred.
Raises `Timeout` error when database timeout occurs and `retry` is `False` (default).
Parameter | Type |
---|---|
retry | |
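The batch-wise removal and the progress count carried in the exception's `args` can be illustrated with a small model. This is a sketch under stated assumptions: the `Timeout` class here is a hypothetical stand-in defined locally, the store is a plain dictionary, and the timeout is simulated with the made-up `fail_after_batches` argument.

```python
class Timeout(Exception):
    """Hypothetical stand-in for the cache's Timeout exception."""


def clear_in_batches(store, batch_size=2, fail_after_batches=None):
    """Remove items batch by batch; raise Timeout(count) if interrupted."""
    removed = 0
    batches = 0
    while store:
        if fail_after_batches is not None and batches >= fail_after_batches:
            # Simulated database timeout: report progress in args[0].
            raise Timeout(removed)
        for key in list(store)[:batch_size]:
            del store[key]
            removed += 1
        batches += 1
    return removed


store = {n: str(n) for n in range(5)}
try:
    clear_in_batches(store, batch_size=2, fail_after_batches=1)
except Timeout as exc:
    removed_before_timeout = exc.args[0]  # items removed before the timeout

# A caller can retry to remove what is left.
removed_on_retry = clear_in_batches(store)
```

Reading `exc.args[0]` lets a caller log or accumulate partial progress before deciding whether to retry.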
close()
def close()
Close database connection.
create_tag_index()
def create_tag_index()
Create tag index on cache database.
It is better to initialize cache with `tag_index=True` than use this.
Raises `Timeout` if database timeout occurs.
cull()
def cull(
retry,
):
Cull items from cache until volume is less than size limit.
Removing items is an iterative process. In each iteration, a subset of items is removed. Concurrent writes may occur between iterations.
If a `Timeout` occurs, the first element of the exception's `args` attribute will be the number of items removed before the exception occurred.
Raises `Timeout` error when database timeout occurs and `retry` is `False` (default).
Parameter | Type |
---|---|
retry | |
decr()
def decr(
key,
delta,
default,
retry,
):
Decrement value by delta for item with key.
If key is missing and default is None then raise KeyError. Else if key is missing and default is not None then use default for value.
Operation is atomic. All concurrent decrement operations will be counted individually.
Unlike Memcached, negative values are supported. Value may be decremented below zero.
Assumes value may be stored in a SQLite column. Most builds that target machines with 64-bit pointer widths will support 64-bit signed integers.
Raises `Timeout` error when database timeout occurs and `retry` is `False` (default).
Parameter | Type |
---|---|
key | |
delta | |
default | |
retry | |
delete()
def delete(
key,
retry,
):
Delete corresponding item for `key` from cache.
Missing keys are ignored.
Raises `Timeout` error when database timeout occurs and `retry` is `False` (default).
Parameter | Type |
---|---|
key | |
retry | |
drop_tag_index()
def drop_tag_index()
Drop tag index on cache database.
Raises `Timeout` if database timeout occurs.
evict()
def evict(
tag,
retry,
):
Remove items with matching `tag` from cache.
Removing items is an iterative process. In each iteration, a subset of items is removed. Concurrent writes may occur between iterations.
If a `Timeout` occurs, the first element of the exception's `args` attribute will be the number of items removed before the exception occurred.
Raises `Timeout` error when database timeout occurs and `retry` is `False` (default).
Parameter | Type |
---|---|
tag | |
retry | |
expire()
def expire(
now,
retry,
):
Remove expired items from cache.
Removing items is an iterative process. In each iteration, a subset of items is removed. Concurrent writes may occur between iterations.
If a `Timeout` occurs, the first element of the exception's `args` attribute will be the number of items removed before the exception occurred.
Raises `Timeout` error when database timeout occurs and `retry` is `False` (default).
Parameter | Type |
---|---|
now | |
retry | |
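As a rough illustration of what removing expired items relative to `now` means, the sketch below models the store as a dictionary of `(value, expire_time)` pairs. The `expire_items` helper and the rule "expired when the expire time is earlier than `now`" are assumptions made for this example; the real method operates on the cache database.

```python
import time


def expire_items(store, now=None):
    """Drop entries whose expire time has passed; return how many were removed.

    `store` maps key -> (value, expire_time); an expire_time of None means
    the entry never expires.
    """
    if now is None:
        now = time.time()
    expired = [
        key
        for key, (_, expire_time) in store.items()
        if expire_time is not None and expire_time < now
    ]
    for key in expired:
        del store[key]
    return len(expired)


store = {
    "a": ("alpha", 100.0),  # expires at t=100
    "b": ("beta", 200.0),   # expires at t=200
    "c": ("gamma", None),   # never expires
}
removed = expire_items(store, now=150.0)
```

Passing an explicit `now` makes the behavior deterministic, which is convenient in tests.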
get()
def get(
key,
default,
read,
expire_time,
tag,
retry,
):
Retrieve value from cache. If `key` is missing, return `default`.
Raises `Timeout` error when database timeout occurs and `retry` is `False` (default).
Parameter | Type |
---|---|
key | |
default | |
read | |
expire_time | |
tag | |
retry | |
incr()
def incr(
key,
delta,
default,
retry,
):
Increment value by delta for item with key.
If key is missing and default is None then raise KeyError. Else if key is missing and default is not None then use default for value.
Operation is atomic. All concurrent increment operations will be counted individually.
Assumes value may be stored in a SQLite column. Most builds that target machines with 64-bit pointer widths will support 64-bit signed integers.
Raises `Timeout` error when database timeout occurs and `retry` is `False` (default).
Parameter | Type |
---|---|
key | |
delta | |
default | |
retry | |
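The missing-key rules and the "every concurrent update is counted" guarantee shared by `incr` and `decr` can be sketched with a lock-protected dictionary. This is an illustrative model only: the module-level `incr` and `decr` functions, the lock, and the default values `delta=1` and `default=0` are assumptions of the sketch, not the cache's code.

```python
import threading

_lock = threading.Lock()


def incr(store, key, delta=1, default=0):
    """Add delta to store[key] under a lock and return the new value."""
    with _lock:
        if key in store:
            value = store[key] + delta
        elif default is None:
            raise KeyError(key)
        else:
            value = default + delta
        store[key] = value
        return value


def decr(store, key, delta=1, default=0):
    """Decrement is modelled as an increment by the negated delta."""
    return incr(store, key, -delta, default)


counters = {}


def worker():
    for _ in range(1000):
        incr(counters, "hits")


threads = [threading.Thread(target=worker) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

below_zero = decr(counters, "balance", 5)  # missing key starts from default 0

try:
    incr(counters, "missing", default=None)
    raised = False
except KeyError:
    raised = True
```

Because each read-modify-write happens while the lock is held, none of the 4,000 increments is lost, and the decrement is free to go below zero.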
iterkeys()
def iterkeys(
reverse,
):
Iterate Cache keys in database sort order.
>>> cache = Cache()
>>> for key in [4, 1, 3, 0, 2]:
...     cache[key] = key
>>> list(cache.iterkeys())
[0, 1, 2, 3, 4]
>>> list(cache.iterkeys(reverse=True))
[4, 3, 2, 1, 0]
Parameter | Type |
---|---|
reverse | |
memoize()
def memoize(
name,
typed,
expire,
tag,
ignore,
):
Memoizing cache decorator.
Decorator to wrap callable with memoizing function using cache. Repeated calls with the same arguments will lookup result in cache and avoid function evaluation.
If name is set to None (default), the callable name will be determined automatically.
When expire is set to zero, function results will not be set in the cache. Cache lookups still occur, however. Read the DiskCache case study on landing page caching for example usage.
If typed is set to True, function arguments of different types will be cached separately. For example, f(3) and f(3.0) will be treated as distinct calls with distinct results.
The original underlying function is accessible through the `__wrapped__` attribute. This is useful for introspection, for bypassing the cache, or for rewrapping the function with a different cache.
>>> from diskcache import Cache
>>> cache = Cache()
>>> @cache.memoize(expire=1, tag='fib')
... def fibonacci(number):
...     if number == 0:
...         return 0
...     elif number == 1:
...         return 1
...     else:
...         return fibonacci(number - 1) + fibonacci(number - 2)
>>> print(fibonacci(100))
354224848179261915075
An additional `__cache_key__` attribute can be used to generate the cache key used for the given arguments.
>>> key = fibonacci.__cache_key__(100)
>>> print(cache[key])
354224848179261915075
Remember to call memoize when decorating a callable. If you forget, then a TypeError will occur. Note the lack of parentheses after memoize below:
>>> @cache.memoize
... def test():
...     pass
Traceback (most recent call last):
    ...
TypeError: name cannot be callable
Parameter | Type |
---|---|
name | |
typed | |
expire | |
tag | |
ignore | |
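To make the roles of `typed`, `__wrapped__`, and `__cache_key__` concrete, here is a minimal dictionary-backed memoizing decorator. It is a sketch and not the cache's decorator: the standalone `memoize` function, its key layout, and the use of a plain `dict` as the cache are assumptions, and `expire`, `tag`, and `ignore` are left out.

```python
import functools


def memoize(cache, name=None, typed=False):
    """Return a decorator that stores results of the wrapped callable in `cache`."""
    if callable(name):
        # Mirrors the error described above for a missing pair of parentheses.
        raise TypeError("name cannot be callable")

    def decorator(func):
        base = name if name is not None else f"{func.__module__}.{func.__qualname__}"

        def cache_key(*args, **kwargs):
            key = (base, args, tuple(sorted(kwargs.items())))
            if typed:
                # Include argument types so f(3) and f(3.0) get separate entries.
                key += (tuple(type(a) for a in args),)
            return key

        @functools.wraps(func)  # also sets wrapper.__wrapped__ = func
        def wrapper(*args, **kwargs):
            key = cache_key(*args, **kwargs)
            if key not in cache:
                cache[key] = func(*args, **kwargs)
            return cache[key]

        wrapper.__cache_key__ = cache_key
        return wrapper

    return decorator


cache = {}
calls = []


@memoize(cache, typed=True)
def square(x):
    calls.append(x)
    return x * x


square(3)
square(3)    # served from the cache, no second call
square(3.0)  # typed=True: cached separately from square(3)
```

Calling `square.__wrapped__(4)` runs the original function without touching the cache, and `square.__cache_key__(3)` returns the key under which the first result was stored.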
peek()
def peek(
prefix,
default,
side,
expire_time,
tag,
retry,
):
Peek at key and value item pair from `side` of queue in cache.
When prefix is None, integer keys are used. Otherwise, string keys are used in the format "prefix-integer". Integer starts at 500 trillion.
If queue is empty, return default.
Defaults to peeking at key and value item pairs from front of queue. Set side to 'back' to peek at back of queue. Side must be one of 'front' or 'back'.
Expired items are deleted from cache. Operation is atomic. Concurrent operations will be serialized.
Raises `Timeout` error when database timeout occurs and `retry` is `False` (default).
See also `Cache.pull` and `Cache.push`.
>>> cache = Cache()
>>> for letter in 'abc':
...     print(cache.push(letter))
500000000000000
500000000000001
500000000000002
>>> key, value = cache.peek()
>>> print(key)
500000000000000
>>> value
'a'
>>> key, value = cache.peek(side='back')
>>> print(key)
500000000000002
>>> value
'c'
Parameter | Type |
---|---|
prefix | |
default | |
side | |
expire_time | |
tag | |
retry | |
peekitem()
def peekitem(
last,
expire_time,
tag,
retry,
):
Peek at key and value item pair in cache based on iteration order.
Expired items are deleted from cache. Operation is atomic. Concurrent operations will be serialized.
Raises `Timeout` error when database timeout occurs and `retry` is `False` (default).
>>> cache = Cache()
>>> for num, letter in enumerate('abc'):
...     cache[letter] = num
>>> cache.peekitem()
('c', 2)
>>> cache.peekitem(last=False)
('a', 0)
Parameter | Type |
---|---|
last | |
expire_time | |
tag | |
retry | |
pop()
def pop(
key,
default,
expire_time,
tag,
retry,
):
Remove corresponding item for `key` from cache and return value.
If `key` is missing, return `default`.
Operation is atomic. Concurrent operations will be serialized.
Raises `Timeout` error when database timeout occurs and `retry` is `False` (default).
Parameter | Type |
---|---|
key | |
default | |
expire_time | |
tag | |
retry | |
pull()
def pull(
prefix,
default,
side,
expire_time,
tag,
retry,
):
Pull key and value item pair from `side` of queue in cache.
When prefix is None, integer keys are used. Otherwise, string keys are used in the format "prefix-integer". Integer starts at 500 trillion.
If queue is empty, return default.
Defaults to pulling key and value item pairs from front of queue. Set side to 'back' to pull from back of queue. Side must be one of 'front' or 'back'.
Operation is atomic. Concurrent operations will be serialized.
Raises `Timeout` error when database timeout occurs and `retry` is `False` (default).
See also `Cache.push` and `Cache.get`.
>>> cache = Cache()
>>> cache.pull()
(None, None)
>>> for letter in 'abc':
...     print(cache.push(letter))
500000000000000
500000000000001
500000000000002
>>> key, value = cache.pull()
>>> print(key)
500000000000000
>>> value
'a'
>>> _, value = cache.pull(side='back')
>>> value
'c'
>>> cache.push(1234, 'userids')
'userids-500000000000000'
>>> _, value = cache.pull('userids')
>>> value
1234
Parameter | Type |
---|---|
prefix | |
default | |
side | |
expire_time | |
tag | |
retry | |
push()
def push(
value,
prefix,
side,
expire,
read,
tag,
retry,
):
Push `value` onto `side` of queue identified by `prefix` in cache.
When prefix is None, integer keys are used. Otherwise, string keys are used in the format "prefix-integer". Integer starts at 500 trillion.
Defaults to pushing value on back of queue. Set side to 'front' to push value on front of queue. Side must be one of 'back' or 'front'.
Operation is atomic. Concurrent operations will be serialized.
When `read` is `True`, `value` should be a file-like object opened for reading in binary mode.
Raises `Timeout` error when database timeout occurs and `retry` is `False` (default).
See also `Cache.pull`.
>>> cache = Cache()
>>> print(cache.push('first value'))
500000000000000
>>> cache.get(500000000000000)
'first value'
>>> print(cache.push('second value'))
500000000000001
>>> print(cache.push('third value', side='front'))
499999999999999
>>> cache.push(1234, prefix='userids')
'userids-500000000000000'
Parameter | Type |
---|---|
value | |
prefix | |
side | |
expire | |
read | |
tag | |
retry | |
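The key numbering used by `push`, `pull`, and `peek` (start at 500 trillion, grow upward at the back, downward at the front, optional "prefix-integer" string keys) can be reproduced with a small helper. This is a sketch for illustration: the `QueueKeys` class is hypothetical, it tracks keys only and stores no values, and the zero-padding width for prefixed keys is an assumption.

```python
START = 500_000_000_000_000  # 500 trillion, the first key of an empty queue


class QueueKeys:
    """Track the key range of one queue; hypothetical helper, not part of Cache."""

    def __init__(self, prefix=None):
        self.prefix = prefix
        self.front = None  # smallest number in use
        self.back = None   # largest number in use

    def _format(self, number):
        # Assumed zero-padding to 15 digits for prefixed keys.
        return number if self.prefix is None else f"{self.prefix}-{number:015d}"

    def push(self, side="back"):
        if side not in ("back", "front"):
            raise ValueError("side must be one of 'back' or 'front'")
        if self.back is None:
            number = START
            self.front = self.back = number
        elif side == "back":
            self.back += 1
            number = self.back
        else:
            self.front -= 1
            number = self.front
        return self._format(number)


queue = QueueKeys()
first = queue.push()
second = queue.push()
third = queue.push(side="front")
prefixed = QueueKeys("userids").push()
```

Starting in the middle of the integer range leaves room to grow in both directions, which is why a front push yields a key below 500 trillion.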
read()
def read(
key,
retry,
):
Return file handle value corresponding to `key` from cache.
Raises `Timeout` error when database timeout occurs and `retry` is `False` (default).
Parameter | Type |
---|---|
key | |
retry | |
reset()
def reset(
key,
value,
update,
):
Reset `key` and `value` item from Settings table.
Use `reset` to update the value of Cache settings correctly. Cache settings are stored in the Settings table of the SQLite database. If `update` is `False` then no attempt is made to update the database.
If `value` is not given, it is reloaded from the Settings table. Otherwise, the Settings table is updated.
Settings with the `disk_` prefix correspond to Disk attributes. Updating the value will change the unprefixed attribute on the associated Disk instance.
Settings with the `sqlite_` prefix correspond to SQLite pragmas. Updating the value will execute the corresponding PRAGMA statement.
SQLite PRAGMA statements may be executed before the Settings table exists in the database by setting `update` to `False`.
Parameter | Type |
---|---|
key | |
value | |
update | |
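The link between `sqlite_`-prefixed settings and PRAGMA statements can be sketched with the standard `sqlite3` module. This is a simplified model, not the method's source: the standalone `reset` function, the two-column `Settings` table layout, and the `sqlite_cache_size` key are assumptions for illustration, and only integer-valued pragmas are handled.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Settings (key TEXT PRIMARY KEY, value)")


def reset(key, value, update=True):
    """Store a setting and, for sqlite_ keys, run the matching PRAGMA."""
    if update:
        with conn:
            conn.execute(
                "INSERT OR REPLACE INTO Settings (key, value) VALUES (?, ?)",
                (key, value),
            )
    if key.startswith("sqlite_"):
        pragma = key[len("sqlite_"):]
        if not pragma.isidentifier():
            raise ValueError(f"unexpected pragma name: {pragma!r}")
        # The value is validated as an integer and formatted into the
        # statement rather than bound as a parameter.
        conn.execute(f"PRAGMA {pragma} = {int(value)}")
    return value


reset("sqlite_cache_size", 4096)
cache_size = conn.execute("PRAGMA cache_size").fetchone()[0]
stored = conn.execute(
    "SELECT value FROM Settings WHERE key = ?", ("sqlite_cache_size",)
).fetchone()[0]
```

With `update=False` the same function would apply the pragma without touching the `Settings` table, which corresponds to the case where the table does not exist yet.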
set()
def set(
key,
value,
expire,
read,
tag,
retry,
):
Set `key` and `value` item in cache.
When `read` is `True`, `value` should be a file-like object opened for reading in binary mode.
Raises `Timeout` error when database timeout occurs and `retry` is `False` (default).
Parameter | Type |
---|---|
key | |
value | |
expire | |
read | |
tag | |
retry | |
stats()
def stats(
enable,
reset,
):
Return cache statistics hits and misses.
Parameter | Type |
---|---|
enable | |
reset | |
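One way to picture the `enable` and `reset` flags is a store that counts lookups only while statistics are switched on. The `CountingStore` class below is a hypothetical model written for this page; the assumption that `stats` returns the counts first and then applies the flags, and the default flag values, belong to the sketch.

```python
class CountingStore:
    """Dictionary wrapper that counts lookups; a hypothetical model of stats()."""

    def __init__(self):
        self._data = {}
        self._enabled = False
        self._hits = 0
        self._misses = 0

    def set(self, key, value):
        self._data[key] = value

    def get(self, key, default=None):
        found = key in self._data
        if self._enabled:
            if found:
                self._hits += 1
            else:
                self._misses += 1
        return self._data[key] if found else default

    def stats(self, enable=True, reset=False):
        """Return (hits, misses), then apply the enable and reset flags."""
        result = (self._hits, self._misses)
        self._enabled = enable
        if reset:
            self._hits = self._misses = 0
        return result


store = CountingStore()
store.stats(enable=True)  # switch counting on
store.set("a", 1)
store.get("a")
store.get("b")
store.get("a")
hits, misses = store.stats(reset=True)
after_reset = store.stats()
```

In this model the two hits and one miss are reported once, and the counters read zero on the next call because `reset=True` cleared them.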
touch()
def touch(
key,
expire,
retry,
):
Touch `key` in cache and update `expire` time.
Raises `Timeout` error when database timeout occurs and `retry` is `False` (default).
Parameter | Type |
---|---|
key | |
expire | |
retry | |
transact()
def transact(
retry,
):
Context manager to perform a transaction by locking the cache.
While the cache is locked, no other write operation is permitted. Transactions should therefore be as short as possible. Read and write operations performed in a transaction are atomic. Read operations may occur concurrent to a transaction.
Transactions may be nested and may not be shared between threads.
Raises `Timeout` error when database timeout occurs and `retry` is `False` (default).
>>> cache = Cache()
>>> with cache.transact():  # Atomically increment two keys.
...     _ = cache.incr('total', 123.4)
...     _ = cache.incr('count', 1)
>>> with cache.transact():  # Atomically calculate average.
...     average = cache['total'] / cache['count']
>>> average
123.4
Parameter | Type |
---|---|
retry | |
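The locking and nesting rules above can be modelled on a plain SQLite connection. The following is a sketch under stated assumptions, not the cache's code: the `Store` class, its `counters` table, and the depth counter used for nesting are hypothetical, and the write lock is taken with `BEGIN IMMEDIATE` only by the outermost transaction.

```python
import sqlite3
import threading
from contextlib import contextmanager


class Store:
    """SQLite-backed counter table with a nestable transaction helper (a sketch)."""

    def __init__(self):
        # isolation_level=None puts sqlite3 in autocommit mode so that
        # BEGIN/COMMIT/ROLLBACK can be issued explicitly.
        self._conn = sqlite3.connect(":memory:", isolation_level=None)
        self._conn.execute("CREATE TABLE counters (key TEXT PRIMARY KEY, value REAL)")
        self._depth = 0
        self._owner = None

    @contextmanager
    def transact(self):
        ident = threading.get_ident()
        if self._depth and self._owner != ident:
            raise RuntimeError("transaction is owned by another thread")
        if self._depth == 0:
            self._conn.execute("BEGIN IMMEDIATE")  # take the write lock up front
            self._owner = ident
        self._depth += 1
        try:
            yield
        except BaseException:
            self._depth -= 1
            if self._depth == 0:
                self._conn.execute("ROLLBACK")
            raise
        else:
            self._depth -= 1
            if self._depth == 0:
                self._conn.execute("COMMIT")

    def incr(self, key, delta):
        with self.transact():  # nests inside the caller's transaction, if any
            self._conn.execute(
                "INSERT OR IGNORE INTO counters (key, value) VALUES (?, 0)", (key,)
            )
            self._conn.execute(
                "UPDATE counters SET value = value + ? WHERE key = ?", (delta, key)
            )

    def get(self, key):
        row = self._conn.execute(
            "SELECT value FROM counters WHERE key = ?", (key,)
        ).fetchone()
        return row[0]


store = Store()
with store.transact():  # both increments commit together
    store.incr("total", 123.4)
    store.incr("count", 1)
with store.transact():
    average = store.get("total") / store.get("count")
```

Only the outermost `transact()` issues `COMMIT` or `ROLLBACK`, so an exception anywhere inside the block undoes every change made since the outer transaction began.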
volume()
def volume()
Return estimated total size of cache on disk.
Returns the size in bytes.
Properties
Property | Type | Description |
---|---|---|
directory | | |
disk | | |
timeout | | |
flytekit.core.local_cache.Literal
def Literal(
scalar: typing.Optional[flytekit.models.literals.Scalar],
collection: typing.Optional[flytekit.models.literals.LiteralCollection],
map: typing.Optional[flytekit.models.literals.LiteralMap],
hash: typing.Optional[str],
metadata: typing.Optional[typing.Dict[str, str]],
offloaded_metadata: typing.Optional[flytekit.models.literals.LiteralOffloadedMetadata],
):
This IDL message represents a literal value in the Flyte ecosystem.
Parameter | Type |
---|---|
scalar | typing.Optional[flytekit.models.literals.Scalar] |
collection | typing.Optional[flytekit.models.literals.LiteralCollection] |
map | typing.Optional[flytekit.models.literals.LiteralMap] |
hash | typing.Optional[str] |
metadata | typing.Optional[typing.Dict[str, str]] |
offloaded_metadata | typing.Optional[flytekit.models.literals.LiteralOffloadedMetadata] |
Methods
Method | Description |
---|---|
from_flyte_idl() | |
serialize_to_string() | None |
set_metadata() | Note: This is a mutation on the literal |
short_string() | |
to_flyte_idl() | |
verbose_string() | |
from_flyte_idl()
def from_flyte_idl(
pb2_object: flyteidl.core.literals_pb2.Literal,
):
Parameter | Type |
---|---|
pb2_object | flyteidl.core.literals_pb2.Literal |
serialize_to_string()
def serialize_to_string()
set_metadata()
def set_metadata(
metadata: typing.Dict[str, str],
):
Note: This is a mutation on the literal
Parameter | Type |
---|---|
metadata | typing.Dict[str, str] |
short_string()
def short_string()
to_flyte_idl()
def to_flyte_idl()
verbose_string()
def verbose_string()
Properties
Property | Type | Description |
---|---|---|
collection | | |
hash | | |
is_empty | | |
map | | |
metadata | | |
offloaded_metadata | | |
scalar | | |
value | | |
flytekit.core.local_cache.LiteralCollection
def LiteralCollection(
literals,
):
Parameter | Type |
---|---|
literals | |
Methods
Method | Description |
---|---|
from_flyte_idl() | |
serialize_to_string() | None |
short_string() | |
to_flyte_idl() | |
verbose_string() | |
from_flyte_idl()
def from_flyte_idl(
pb2_object,
):
Parameter | Type |
---|---|
pb2_object | |
serialize_to_string()
def serialize_to_string()
short_string()
def short_string()
to_flyte_idl()
def to_flyte_idl()
verbose_string()
def verbose_string()
Properties
Property | Type | Description |
---|---|---|
is_empty | | |
literals | | |
flytekit.core.local_cache.LiteralMap
A ProtocolMessage
flytekit.core.local_cache.LocalTaskCache
This class implements a persistent store able to cache the result of local task executions.
Methods
Method | Description |
---|---|
clear() | None |
get() | None |
initialize() | None |
set() | None |
clear()
def clear()
get()
def get(
task_name: str,
cache_version: str,
input_literal_map: flytekit.models.literals.LiteralMap,
cache_ignore_input_vars: typing.Tuple[str, ...],
):
Parameter | Type |
---|---|
task_name | str |
cache_version | str |
input_literal_map | flytekit.models.literals.LiteralMap |
cache_ignore_input_vars | typing.Tuple[str, ...] |
initialize()
def initialize()
set()
def set(
task_name: str,
cache_version: str,
input_literal_map: flytekit.models.literals.LiteralMap,
cache_ignore_input_vars: typing.Tuple[str, ...],
value: flytekit.models.literals.LiteralMap,
):
Parameter | Type |
---|---|
task_name | str |
cache_version | str |
input_literal_map | flytekit.models.literals.LiteralMap |
cache_ignore_input_vars | typing.Tuple[str, ...] |
value | flytekit.models.literals.LiteralMap |
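This page does not say how `get` and `set` turn their arguments into a lookup key. One plausible reading of the signatures is that the key combines the task name, the cache version, and a hash of the inputs with the ignored variables left out. The sketch below illustrates that idea only: `make_cache_key`, `cache_set`, `cache_get`, the use of plain dictionaries in place of `LiteralMap`, and the JSON plus SHA-256 hashing are all assumptions, not flytekit's implementation.

```python
import hashlib
import json


def make_cache_key(task_name, cache_version, inputs, cache_ignore_input_vars=()):
    """Build a deterministic key from the task identity and its relevant inputs.

    `inputs` is a plain dict standing in for a LiteralMap (an assumption made
    to keep the sketch free of flytekit imports).
    """
    relevant = {
        name: value
        for name, value in inputs.items()
        if name not in cache_ignore_input_vars
    }
    # sort_keys makes the serialized form independent of insertion order.
    payload = json.dumps(relevant, sort_keys=True).encode("utf-8")
    digest = hashlib.sha256(payload).hexdigest()
    return f"{task_name}-{cache_version}-{digest}"


store = {}  # hypothetical in-memory stand-in for the persistent store


def cache_set(task_name, cache_version, inputs, ignore, value):
    store[make_cache_key(task_name, cache_version, inputs, ignore)] = value


def cache_get(task_name, cache_version, inputs, ignore):
    return store.get(make_cache_key(task_name, cache_version, inputs, ignore))


cache_set("train", "1.0", {"lr": 0.1, "run_id": "a"}, ("run_id",), {"loss": 0.5})
hit = cache_get("train", "1.0", {"lr": 0.1, "run_id": "b"}, ("run_id",))
miss = cache_get("train", "2.0", {"lr": 0.1, "run_id": "b"}, ("run_id",))
```

Under these assumptions, changing an ignored input still produces a cache hit, while changing the cache version produces a miss.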
flytekit.core.local_cache.ModelLiteralMap
def ModelLiteralMap(
literals,
):
Parameter | Type |
---|---|
literals | |
Methods
Method | Description |
---|---|
from_flyte_idl() | |
serialize_to_string() | None |
short_string() | |
to_flyte_idl() | |
verbose_string() | |
from_flyte_idl()
def from_flyte_idl(
pb2_object,
):
Parameter | Type |
---|---|
pb2_object | |
serialize_to_string()
def serialize_to_string()
short_string()
def short_string()
to_flyte_idl()
def to_flyte_idl()
verbose_string()
def verbose_string()
Properties
Property | Type | Description |
---|---|---|
is_empty | | |
literals | | |