flowmachine.core.cache

Source: flowmachine/core/cache.py

Functions which deal with inspecting and managing the query cache.

cache_table_exists

cache_table_exists(connection: 'Connection', query_id: str) -> bool
Source: flowmachine/core/cache.py

Return True if a cache table for the query with id query_id exists, otherwise return False.

Parameters

  • connection: Connection

  • query_id: str

    Unique id of the cached query

Returns

  • bool

get_cache_half_life

get_cache_half_life(connection: 'Connection') -> float
Source: flowmachine/core/cache.py

Get the current setting for cache half-life.

Parameters

  • connection: Connection

Returns

  • float

    Cache half-life setting

get_cache_protected_period

get_cache_protected_period(connection: 'Connection') -> float
Source: flowmachine/core/cache.py

Get the current setting for cache protected period.

Parameters

  • connection: Connection

Returns

  • float

    Cache protected period

get_cached_query_objects_ordered_by_score

get_cached_query_objects_ordered_by_score(connection: 'Connection', protected_period: Union[int, NoneType] = None) -> Tuple[ForwardRef('Query'), int]
Source: flowmachine/core/cache.py

Get all cached query objects in ascending cache score order.

Parameters

  • connection: Connection

  • protected_period: typing.Union[int, NoneType], default None

    Optionally specify a number of seconds within which cache entries are excluded. If None, the value stored in cache.cache_config will be used. Set to a negative number to ignore cache protection completely.

Returns

  • typing.Tuple[ForwardRef('Query'), int]
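
The interaction between score ordering and the protected period can be sketched in plain Python. This is a minimal illustration, not FlowMachine's implementation: CacheRecord, ordered_by_score, configured_period, and the reading of "age" as seconds since last access are all assumptions made for the example.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class CacheRecord:
    """Hypothetical stand-in for one row of cache metadata."""

    query_id: str
    score: float
    age_seconds: float  # assumed meaning: seconds since the entry was last accessed


def ordered_by_score(
    records: List[CacheRecord],
    protected_period: Optional[int] = None,
    configured_period: int = 86400,  # assumed stand-in for the stored config value
) -> List[CacheRecord]:
    # None falls back to the configured value, as the parameter description states.
    period = configured_period if protected_period is None else protected_period
    # A negative period disables protection, so every record is eligible.
    eligible = [r for r in records if period < 0 or r.age_seconds >= period]
    # Ascending order: the lowest scoring entry comes first.
    return sorted(eligible, key=lambda r: r.score)
```

Under these assumptions, entries younger than the protected period never appear in the result, so they can never be chosen for removal.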

get_compute_time

get_compute_time(connection: 'Connection', query_id: str) -> float
Source: flowmachine/core/cache.py

Get the time in seconds that a cached query took to compute.

Parameters

  • connection: Connection

  • query_id: str

    Unique id of the query

Returns

  • float

    Number of seconds the query took to compute

get_max_size_of_cache

get_max_size_of_cache(connection: 'Connection') -> int
Source: flowmachine/core/cache.py

Get the upper limit set in FlowDB for the cache size, in bytes.

Parameters

  • connection: Connection

Returns

  • int

    Number of bytes in total available to cache tables

get_obj_or_stub

get_obj_or_stub(connection: 'Connection', query_id: str)
Source: flowmachine/core/cache.py

Get a query object by ID if the object can be recreated. If it cannot then get a stub which keeps the query's dependency relationships as recorded in the database.

Parameters

  • connection: Connection

    DB connection to get the object from

  • query_id: str

    Unique identifier of the query

Returns

  • Query

    The query object if it can be recreated, or a 'stub' class which records the dependencies it had if not.
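
The fallback described above can be sketched as follows. QueryStub, obj_or_stub, the recreate callable, and the recorded_dependencies mapping are hypothetical names for illustration; the sketch also catches any exception, which is an assumption, since the errors FlowMachine treats as "cannot be recreated" are not stated here.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class QueryStub:
    """Hypothetical placeholder that only remembers recorded dependencies."""

    query_id: str
    dependencies: List[str] = field(default_factory=list)


def obj_or_stub(
    query_id: str,
    recreate: Callable[[str], object],
    recorded_dependencies: Dict[str, List[str]],
) -> object:
    try:
        # Preferred path: rebuild the real query object.
        return recreate(query_id)
    except Exception:
        # Assumed fallback: keep only the dependency relationships on record.
        return QueryStub(query_id, list(recorded_dependencies.get(query_id, [])))
```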

get_query_object_by_id

get_query_object_by_id(connection: 'Connection', query_id: str) -> 'Query'
Source: flowmachine/core/cache.py

Get a query object from cache by id.

Parameters

  • connection: Connection

  • query_id: str

    Unique id of the query

Returns

  • Query

    The original query object.

get_score

get_score(connection: 'Connection', query_id: str) -> float
Source: flowmachine/core/cache.py

Get the current cache score for a cached query.

Parameters

  • connection: Connection

  • query_id: str

    Unique id of the cached query

Returns

  • float

    Current cache score of this query

get_size_of_cache

get_size_of_cache(connection: 'Connection') -> int
Source: flowmachine/core/cache.py

Get the total size in bytes of all cache tables.

Parameters

  • connection: Connection

Returns

  • int

    Number of bytes in total used by cache tables

get_size_of_table

get_size_of_table(connection: 'Connection', table_name: str, table_schema: str) -> int
Source: flowmachine/core/cache.py

Get the size on disk in bytes of a table in the database.

Parameters

  • connection: Connection

  • table_name: str

    Name of table to get size of

  • table_schema: str

    Schema of the table

Returns

  • int

    Number of bytes on disk this table uses in total

invalidate_cache_by_id

invalidate_cache_by_id(connection: 'Connection', query_id: str, cascade=False) -> 'Query'
Source: flowmachine/core/cache.py

Remove a query object from cache by id.

Parameters

  • connection: Connection

  • query_id: str

    Unique id of the query

  • cascade: bool, default False

    Set to true to remove any queries that depend on the one being removed

Returns

  • Query

    The original query object.
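
To illustrate what cascade changes, here is a small sketch over an in-memory dependency graph. The invalidate function, the cache set, and the dependents mapping (query id to the ids of queries that depend on it) are assumptions for the example and do not reflect how FlowMachine stores this information.

```python
from typing import Dict, Set


def invalidate(
    cache: Set[str],
    dependents: Dict[str, Set[str]],
    query_id: str,
    cascade: bool = False,
) -> Set[str]:
    """Remove query_id from cache and return the set of ids actually removed."""
    visited: Set[str] = set()
    removed: Set[str] = set()
    stack = [query_id]
    while stack:
        current = stack.pop()
        if current in visited:
            continue
        visited.add(current)
        if current in cache:
            cache.remove(current)
            removed.add(current)
        if cascade:
            # Follow edges to queries that were built on top of this one.
            stack.extend(dependents.get(current, ()))
    return removed
```

Without cascade only the named query is removed; with cascade the removal walks the whole chain of dependents.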

reset_cache

reset_cache(connection: 'Connection', redis: redis.client.Redis, protect_table_objects: bool = True) -> None
Source: flowmachine/core/cache.py

Reset the query cache. Deletes any tables under the cache schema, resets the cache count, and clears the cached and dependencies tables.

Parameters

  • connection: Connection

  • redis: redis.client.Redis

  • protect_table_objects: bool, default True

    Set to False to also remove cache metadata for Table objects which point to tables outside the cache schema

Note

You must ensure that no queries are currently running when calling this function. Any queries currently running will no longer be tracked by redis, and UNDEFINED BEHAVIOUR will occur. In addition, you should restart any interpreter running flowmachine after this command is run to ensure that the local objects have not drifted out of sync with the database.

resync_redis_with_cache

resync_redis_with_cache(connection: 'Connection', redis: redis.client.Redis) -> None
Source: flowmachine/core/cache.py

Reset redis to be in sync with the current contents of the cache.

Parameters

  • connection: Connection

  • redis: redis.client.Redis

Returns

  • None

Note

You must ensure that no queries are currently running when calling this function. Any queries currently running will no longer be tracked by redis, and UNDEFINED BEHAVIOUR will occur.

set_cache_half_life

set_cache_half_life(connection: 'Connection', cache_half_life: float) -> None
Source: flowmachine/core/cache.py

Set the cache's half-life.

Parameters

  • connection: Connection

  • cache_half_life: float

    Setting for half-life

Note

Changing this setting without flushing the cache first may have unpredictable consequences and should be avoided.

set_cache_protected_period

set_cache_protected_period(connection: 'Connection', protected_period: int) -> None
Source: flowmachine/core/cache.py

Set the cache's protected period.

Parameters

  • connection: Connection

  • protected_period: int

    Number of seconds within which cache entries are excluded.

set_max_size_of_cache

set_max_size_of_cache(connection: 'Connection', cache_size_limit: int) -> None
Source: flowmachine/core/cache.py

Set the upper limit in FlowDB for the cache size, in bytes.

Parameters

  • connection: Connection

  • cache_size_limit: int

    Size in bytes to set as the cache limit

shrink_below_size

shrink_below_size(connection: 'Connection', size_threshold: int = None, dry_run: bool = False, protected_period: Union[int, NoneType] = None) -> 'Query'
Source: flowmachine/core/cache.py

Remove queries from the cache until it is below a specified size threshold.

Parameters

  • connection: Connection

  • size_threshold: int, default None

    Optionally override the maximum cache size set in flowdb.

  • dry_run: bool, default False

    Set to true to just report the objects that would be removed and not remove them

  • protected_period: typing.Union[int, NoneType], default None

    Optionally specify a number of seconds within which cache entries are excluded. If None, the value stored in cache.cache_config will be used. Set to a negative number to ignore cache protection completely.

Returns

  • Query

    List of the queries that were removed
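
The shrinking behaviour can be pictured as a greedy loop that removes the lowest scoring entries first. The sketch below works on an in-memory list of (query_id, score, size_bytes) tuples; shrink_below and that tuple layout are assumptions for illustration, not FlowMachine's code.

```python
from typing import List, Tuple

Entry = Tuple[str, float, int]  # (query_id, score, size_bytes), assumed layout


def shrink_below(
    entries: List[Entry], size_threshold: int, dry_run: bool = False
) -> List[str]:
    """Return ids removed (or that would be removed) to get under the threshold."""
    current_size = sum(size for _, _, size in entries)
    removed: List[str] = []
    # Visit candidates from lowest to highest score.
    for query_id, _score, size in sorted(entries, key=lambda e: e[1]):
        if current_size < size_threshold:
            break
        removed.append(query_id)
        current_size -= size
    if not dry_run:
        # Only mutate the cache contents when this is not a dry run.
        entries[:] = [e for e in entries if e[0] not in removed]
    return removed
```

With dry_run=True the same list of ids is reported, but the entries are left in place.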

shrink_one

shrink_one(connection: 'Connection', dry_run: bool = False, protected_period: Union[int, NoneType] = None) -> 'Query'
Source: flowmachine/core/cache.py

Remove the lowest scoring cached query from the cache and return it along with its size in bytes.

Parameters

  • connection: Connection

  • dry_run: bool, default False

    Set to true to just report the object that would be removed and not remove it

  • protected_period: typing.Union[int, NoneType], default None

    Optionally specify a number of seconds within which cache entries are excluded. If None, the value stored in cache.cache_config will be used. Set to a negative number to ignore cache protection completely.

Returns

  • Query

touch_cache

touch_cache(connection: 'Connection', query_id: str) -> float
Source: flowmachine/core/cache.py

'Touch' a cache record and update the cache score for it.

Parameters

  • connection: Connection

  • query_id: str

    Unique id of the query to touch

Returns

  • float

    The new cache score

watch_and_shrink_cache

watch_and_shrink_cache(*, flowdb_connection: 'Connection', pool: concurrent.futures._base.Executor, sleep_time: int = 86400, timeout: Union[int, NoneType] = 600, loop: bool = True, size_threshold: int = None, dry_run: bool = False, protected_period: Union[int, NoneType] = None) -> None
Source: flowmachine/core/cache.py

Background task to periodically trigger a shrink of the cache.

Parameters

  • flowdb_connection: Connection

    FlowDB connection to use when shrinking the cache

  • pool: concurrent.futures._base.Executor

    Executor to run the cache shrink with

  • sleep_time: int, default 86400

    Number of seconds to sleep for between checks

  • timeout: typing.Union[int, NoneType], default 600

    Seconds to wait for a cache shrink to complete before cancelling it

  • loop: bool, default True

    Set to false to return after the first check

  • size_threshold: int, default None

    Optionally override the maximum cache size set in flowdb.

  • dry_run: bool, default False

    Set to true to just report the objects that would be removed and not remove them

  • protected_period: typing.Union[int, NoneType], default None

    Optionally specify a number of seconds within which cache entries are excluded. If None, the value stored in cache.cache_config will be used. Set to a negative number to ignore cache protection completely.

Returns

  • None
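
The roles of pool, sleep_time, timeout, and loop can be sketched with the standard library alone. Here watch_and_shrink and its shrink callable are hypothetical stand-ins; the real function's error handling and cancellation behaviour may differ.

```python
import time
from concurrent.futures import Executor, ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
from typing import Callable, Optional


def watch_and_shrink(
    *,
    shrink: Callable[[], object],  # hypothetical callable that performs one shrink
    pool: Executor,
    sleep_time: float = 86400,
    timeout: Optional[float] = 600,
    loop: bool = True,
) -> None:
    while True:
        future = pool.submit(shrink)
        try:
            # Wait up to `timeout` seconds for the shrink to finish.
            future.result(timeout=timeout)
        except FutureTimeoutError:
            # cancel() only stops work that has not started yet; a task that is
            # already running in a thread will carry on regardless.
            future.cancel()
        if not loop:
            break
        time.sleep(sleep_time)
```

A single pass, for example in a test, would pass loop=False and a ThreadPoolExecutor as the pool.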

write_cache_metadata

write_cache_metadata(connection: 'Connection', query: 'Query', compute_time: Union[float, NoneType] = None)
Source: flowmachine/core/cache.py

Helper function for store. Updates the flowmachine metadata table to log that this query is stored, but does not actually store the query.

Parameters

  • connection: Connection

    Flowmachine connection object to use

  • query: Query

    Query object to write metadata about

  • compute_time: typing.Union[float, NoneType], default None

    Optionally provide the compute time for the query

write_query_to_cache

write_query_to_cache(*, name: str, redis: redis.client.Redis, query: 'Query', connection: 'Connection', ddl_ops_func: Callable[[str, str], List[str]], write_func: Callable[[List[str], sqlalchemy.engine.base.Engine], float], schema: Union[str, NoneType] = 'cache', sleep_duration: Union[int, NoneType] = 1) -> 'Query'
Source: flowmachine/core/cache.py

Write a Query object into a postgres table and update the cache metadata about it. Attempts to update the query's state in redis to executing, and if successful, tries to run it. If unable to update the state to executing, will block until the query is in a completed, errored, or cancelled state.

Parameters

  • name: str

    Name of the table to write to

  • redis: redis.client.Redis

    Redis connection to use to update state

  • query: Query

    Query object to write

  • connection: Connection

    Flowmachine connection to use for writing

  • ddl_ops_func: typing.Callable[[str, str], typing.List[str]]

    Function that will be called to generate a list of SQL statements to run. Should accept name and schema as arguments and return a list of SQL strings.

  • write_func: typing.Callable[[typing.List[str], sqlalchemy.engine.base.Engine], float]

    Function which will be called with the result of ddl_ops_func to perform the actual write. Should take a list of SQL strings and an SQLAlchemy Engine as arguments and return the runtime of the query.

  • schema: typing.Union[str, NoneType], default cache

    Name of the schema to write to

  • sleep_duration: typing.Union[int, NoneType], default 1

    Number of seconds to wait between polls when monitoring a query being written from elsewhere

Returns

  • Query

    The query object which was written once the write has completed successfully.

Note

This is a blocking function, and will not return until the query is no longer in an executing state.
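
The "claim the executing state, otherwise wait" behaviour can be sketched with an in-memory state store in place of redis. StateStore, write_or_wait, the state names, and the rule that only a query with no recorded state can be claimed are simplifying assumptions for illustration.

```python
import threading
import time
from typing import Callable, Dict, Optional

FINISHED = {"completed", "errored", "cancelled"}


class StateStore:
    """Hypothetical in-memory stand-in for query state tracked in redis."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._states: Dict[str, str] = {}

    def try_set_executing(self, query_id: str) -> bool:
        # Assumed rule: only a query with no recorded state can be claimed.
        with self._lock:
            if query_id in self._states:
                return False
            self._states[query_id] = "executing"
            return True

    def set(self, query_id: str, state: str) -> None:
        with self._lock:
            self._states[query_id] = state

    def get(self, query_id: str) -> Optional[str]:
        with self._lock:
            return self._states.get(query_id)


def write_or_wait(
    store: StateStore,
    query_id: str,
    write: Callable[[], None],
    sleep_duration: float = 1.0,
) -> Optional[str]:
    if store.try_set_executing(query_id):
        try:
            write()
        except Exception:
            store.set(query_id, "errored")
            raise
        store.set(query_id, "completed")
    else:
        # Someone else owns the write: poll until it reaches a finished state.
        while store.get(query_id) not in FINISHED:
            time.sleep(sleep_duration)
    return store.get(query_id)
```

In this sketch a second caller for the same query never runs the write itself; it blocks until the first caller has finished, which mirrors the blocking behaviour noted above.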