flowmachine.core.cache
Source: flowmachine/core/cache.py
Functions which deal with inspecting and managing the query cache.
cache_table_exists
cache_table_exists(connection: 'Connection', query_id: str) -> bool
Return True if a cache table exists for the query with id query_id, otherwise return False.
Parameters
- connection : Connection
- query_id : str
  Unique id of the cached query
Returns
bool
get_cache_half_life
get_cache_half_life(connection: 'Connection') -> float
Get the current setting for cache half-life.
Parameters
- connection : Connection
Returns
- float
  Cache half-life setting
get_cache_protected_period
get_cache_protected_period(connection: 'Connection') -> float
Get the current setting for cache protected period.
Parameters
- connection : Connection
Returns
- float
  Cache protected period, in seconds
get_cached_query_objects_ordered_by_score
get_cached_query_objects_ordered_by_score(connection: 'Connection', protected_period: Union[int, NoneType] = None) -> Tuple[ForwardRef('Query'), int]
Get all cached query objects in ascending cache score order.
Parameters
- connection : Connection
- protected_period : typing.Union[int, NoneType], default None
  Optionally specify a number of seconds within which cache entries are excluded. If None, the value stored in cache.cache_config will be used. Set to a negative number to ignore cache protection completely.
Returns
typing.Tuple[ForwardRef('Query'), int]
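The ordering and protection rules can be illustrated with a small in-memory sketch. It is not flowmachine's implementation: the `CacheEntry` record, its `cached_at` timestamp and the `ordered_by_score` function are hypothetical, and the sketch assumes an entry's age is measured from when it was cached. Entries younger than `protected_period` seconds are skipped, so a negative value protects nothing.

```python
import time
from dataclasses import dataclass
from typing import Iterable, List, Optional, Tuple


@dataclass
class CacheEntry:
    # Hypothetical record; field names are illustrative only.
    query_id: str
    score: float
    size_bytes: int
    cached_at: float  # Unix timestamp at which the entry was cached (assumed)


def ordered_by_score(
    entries: Iterable[CacheEntry],
    protected_period: float,
    now: Optional[float] = None,
) -> List[Tuple[str, int]]:
    """Return (query_id, size_bytes) pairs for unprotected entries, lowest score first."""
    now = time.time() if now is None else now
    # An entry is protected while its age is below protected_period;
    # with a negative period no age qualifies, so nothing is protected.
    candidates = [e for e in entries if now - e.cached_at >= protected_period]
    candidates.sort(key=lambda e: e.score)
    return [(e.query_id, e.size_bytes) for e in candidates]
```

Unlike the documented function, this sketch needs an explicit `protected_period`; it has no equivalent of falling back to `cache.cache_config` when None is passed.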
get_compute_time
get_compute_time(connection: 'Connection', query_id: str) -> float
Get the time in seconds that a cached query took to compute.
Parameters
- connection : Connection
- query_id : str
  Unique id of the query
Returns
- float
  Number of seconds the query took to compute
get_max_size_of_cache
get_max_size_of_cache(connection: 'Connection') -> int
Get the upper limit set in FlowDB for the cache size, in bytes.
Parameters
- connection : Connection
Returns
- int
  Number of bytes in total available to cache tables
get_obj_or_stub
get_obj_or_stub(connection: 'Connection', query_id: str)
Get a query object by ID if the object can be recreated. If it cannot, get a stub which keeps the query's dependency relationships as recorded in the database.
Parameters
- connection : Connection
  DB connection to get the object from
- query_id : str
  Unique identifier of the query
Returns
- Query
  The query object if it can be recreated, or a 'stub' class which records the dependencies it had if not.
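The "object or stub" fallback can be sketched as follows. Purely for illustration, it assumes query objects are stored as pickled bytes keyed by id (`blobs`) and that dependencies are available as a hypothetical id-to-ids mapping (`deps`). The `QueryStub` class and `get_obj_or_stub_sketch` are likewise hypothetical. When an object cannot be unpickled, the sketch returns a stub whose dependencies are resolved recursively, so the dependency graph survives even when individual objects do not.

```python
import pickle
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class QueryStub:
    """Hypothetical stand-in recording only a query's id and its dependencies."""
    query_id: str
    deps: List[Any] = field(default_factory=list)


def get_obj_or_stub_sketch(
    query_id: str, blobs: Dict[str, bytes], deps: Dict[str, List[str]]
) -> Any:
    try:
        # Try to recreate the original object from its stored bytes.
        return pickle.loads(blobs[query_id])
    except (KeyError, EOFError, AttributeError, ImportError, pickle.UnpicklingError):
        # Cannot recreate it: keep the dependency structure instead,
        # recursing so each dependency is itself an object or a stub.
        return QueryStub(
            query_id,
            [get_obj_or_stub_sketch(dep, blobs, deps) for dep in deps.get(query_id, [])],
        )
```

The recursion assumes the dependency graph is acyclic, which holds for queries built from their subqueries.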
get_query_object_by_id
get_query_object_by_id(connection: 'Connection', query_id: str) -> 'Query'
Get a query object from cache by id.
Parameters
- connection : Connection
- query_id : str
  Unique id of the query
Returns
- Query
  The original query object.
get_score
get_score(connection: 'Connection', query_id: str) -> float
Get the current cache score for a cached query.
Parameters
- connection : Connection
- query_id : str
  Unique id of the cached query
Returns
- float
  Current cache score of this query
get_size_of_cache
get_size_of_cache(connection: 'Connection') -> int
Get the total size in bytes of all cache tables.
Parameters
- connection : Connection
Returns
- int
  Number of bytes in total used by cache tables
get_size_of_table
get_size_of_table(connection: 'Connection', table_name: str, table_schema: str) -> int
Get the size on disk in bytes of a table in the database.
Parameters
- connection : Connection
- table_name : str
  Name of table to get size of
- table_schema : str
  Schema of the table
Returns
- int
  Number of bytes on disk this table uses in total
invalidate_cache_by_id
invalidate_cache_by_id(connection: 'Connection', query_id: str, cascade=False) -> 'Query'
Remove a query object from cache by id.
Parameters
- connection : Connection
- query_id : str
  Unique id of the query
- cascade : bool, default False
  Set to True to also remove any queries that depend on the one being removed
Returns
- Query
  The original query object.
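With cascade=True, removal has to follow dependency edges in reverse, from a query to everything built on top of it. The sketch below computes which ids would be removed. It hypothetically assumes dependencies are available as a mapping from each query id to the ids it depends on, and it never touches a database. `ids_to_invalidate` is an illustrative name, not part of flowmachine.

```python
from collections import deque
from typing import Dict, List, Set


def ids_to_invalidate(
    query_id: str, depends_on: Dict[str, Set[str]], cascade: bool = False
) -> List[str]:
    """Return the query ids to remove, starting with query_id itself."""
    if not cascade:
        return [query_id]
    # Invert "X depends on Y" edges into "Y is depended on by X".
    dependents: Dict[str, Set[str]] = {}
    for child, parents in depends_on.items():
        for parent in parents:
            dependents.setdefault(parent, set()).add(child)
    # Breadth-first walk so every direct or indirect dependent is collected once.
    order: List[str] = []
    seen = {query_id}
    queue = deque([query_id])
    while queue:
        current = queue.popleft()
        order.append(current)
        for child in sorted(dependents.get(current, set())):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return order
```

The `seen` set matters for "diamond" shapes, where two dependents share a further dependent that should only be removed once.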
reset_cache
reset_cache(connection: 'Connection', redis: redis.client.Redis, protect_table_objects: bool = True) -> None
Reset the query cache. Deletes any tables under the cache schema, resets the cache count, and clears the cached and dependencies tables.
Parameters
- connection : Connection
- redis : redis.client.Redis
- protect_table_objects : bool, default True
  Set to False to also remove cache metadata for Table objects which point to tables outside the cache schema
Note
You must ensure that no queries are currently running when calling this function. Any queries currently running will no longer be tracked by redis, and UNDEFINED BEHAVIOUR will occur. In addition, you should restart any interpreter running flowmachine after this command is run to ensure that the local objects have not drifted out of sync with the database.
resync_redis_with_cache
resync_redis_with_cache(connection: 'Connection', redis: redis.client.Redis) -> None
Reset redis to be in sync with the current contents of the cache.
Parameters
- connection : Connection
- redis : redis.client.Redis
Returns
None
Note
You must ensure that no queries are currently running when calling this function. Any queries currently running will no longer be tracked by redis, and UNDEFINED BEHAVIOUR will occur.
run_ops_list_and_return_execution_time
run_ops_list_and_return_execution_time(query_ddl_ops: List[str], connection: sqlalchemy.engine.base.Engine) -> float
Run a list of SQL operations, which may begin with EXPLAIN ANALYZE, and return the overall execution time of the first operation if available.
Parameters
- query_ddl_ops : typing.List[str]
  List of SQL commands to execute
- connection : sqlalchemy.engine.base.Engine
  SQLAlchemy engine to execute with
Returns
- float
  The execution time of the query if available, otherwise 0
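A rough sketch of how a `ddl_ops_func` (the callable passed to `write_query_to_cache`) and this timing step might fit together. Everything here is hypothetical: `make_ddl_ops`, `run_ops_and_time` and the `execute` callable, which stands in for a database connection, are illustrative names. PostgreSQL's `EXPLAIN (ANALYZE, FORMAT JSON)` reports "Execution Time" in milliseconds. Converting it to seconds, to match the units of `get_compute_time`, is an assumption of this sketch.

```python
import json
from typing import Any, Callable, List


def make_ddl_ops(query_sql: str) -> Callable[[str, str], List[str]]:
    """Hypothetical ddl_ops_func factory: the returned callable takes (name, schema)."""
    def ddl_ops(name: str, schema: str) -> List[str]:
        # name and schema are assumed to be valid, trusted identifiers.
        table = f"{schema}.{name}"
        return [
            f"EXPLAIN (ANALYZE TRUE, TIMING FALSE, FORMAT JSON) CREATE TABLE {table} AS ({query_sql})",
            f"ANALYZE {table}",
        ]
    return ddl_ops


def run_ops_and_time(ops: List[str], execute: Callable[[str], Any]) -> float:
    """Run every op; return the first op's execution time in seconds, or 0."""
    execution_time = 0.0
    for position, op in enumerate(ops):
        result = execute(op)
        if position == 0 and op.lstrip().upper().startswith("EXPLAIN"):
            # FORMAT JSON output may arrive as text or already decoded.
            plan = json.loads(result) if isinstance(result, str) else result
            # PostgreSQL reports "Execution Time" in milliseconds.
            execution_time = plan[0].get("Execution Time", 0.0) / 1000.0
    return execution_time
```

Wrapping the `CREATE TABLE ... AS` in `EXPLAIN ANALYZE` means the table is created and timed in a single statement, rather than running the query twice.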
set_cache_half_life
set_cache_half_life(connection: 'Connection', cache_half_life: float) -> None
Set the cache's half-life.
Parameters
- connection : Connection
- cache_half_life : float
  Setting for half-life
Note
Changing this setting without flushing the cache first may have unpredictable consequences and should be avoided.
set_cache_protected_period
set_cache_protected_period(connection: 'Connection', protected_period: int) -> None
Set the cache's protected period.
Parameters
- connection : Connection
- protected_period : int
  Number of seconds within which cache entries are excluded from removal.
set_max_size_of_cache
set_max_size_of_cache(connection: 'Connection', cache_size_limit: int) -> None
Set the upper limit in FlowDB for the cache size, in bytes.
Parameters
- connection : Connection
- cache_size_limit : int
  Size in bytes to set as the cache limit
shrink_below_size
shrink_below_size(connection: 'Connection', size_threshold: int = None, dry_run: bool = False, protected_period: Union[int, NoneType] = None) -> 'Query'
Remove queries from the cache until it is below a specified size threshold.
Parameters
- connection : Connection
- size_threshold : int, default None
  Optionally override the maximum cache size set in FlowDB.
- dry_run : bool, default False
  Set to True to just report the objects that would be removed and not remove them
- protected_period : typing.Union[int, NoneType], default None
  Optionally specify a number of seconds within which cache entries are excluded. If None, the value stored in cache.cache_config will be used. Set to a negative number to ignore cache protection completely.
Returns
- Query
  List of the queries that were removed
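One way to picture the shrinking loop is as a greedy walk over entries in ascending score order, the order `get_cached_query_objects_ordered_by_score` returns, that stops once the cache fits. The sketch below is an assumption-laden illustration, not flowmachine's code. `shrink_below_size_sketch` and the `drop` callback, which would delete a cache table, are hypothetical, and it treats "below the threshold" as "no larger than the threshold".

```python
from typing import Callable, List, Tuple


def shrink_below_size_sketch(
    ordered: List[Tuple[str, int]],  # (query_id, size_bytes), lowest score first
    current_size: int,
    size_threshold: int,
    drop: Callable[[str], None],
    dry_run: bool = False,
) -> List[Tuple[str, int]]:
    """Remove lowest-scoring entries until the cache fits within size_threshold."""
    removed: List[Tuple[str, int]] = []
    for query_id, size_bytes in ordered:
        if current_size <= size_threshold:
            break
        if not dry_run:
            drop(query_id)  # hypothetical callback that deletes the cache table
        # In a dry run the entry is still reported, and counted as freed.
        removed.append((query_id, size_bytes))
        current_size -= size_bytes
    return removed
```

Because a dry run reports exactly the entries a real run would remove, it is a safe way to preview the effect of a new `size_threshold`.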
shrink_one
shrink_one(connection: 'Connection', dry_run: bool = False, protected_period: Union[int, NoneType] = None) -> 'Query'
Remove the lowest-scoring cached query from the cache and return it along with its size in bytes.
Parameters
- connection : Connection
- dry_run : bool, default False
  Set to True to just report the object that would be removed and not remove it
- protected_period : int, default None
  Optionally specify a number of seconds within which cache entries are excluded. If None, the value stored in cache.cache_config will be used. Set to a negative number to ignore cache protection completely.
Returns
Query
touch_cache
touch_cache(connection: 'Connection', query_id: str) -> float
'Touch' a cache record and update the cache score for it.
Parameters
- connection : Connection
- query_id : str
  Unique id of the query to touch
Returns
- float
  The new cache score
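One common way to let a cache score "decay" with a half-life is shown here purely as an illustration, not as FlowDB's exact formula. Each touch of the cache adds more weight than the one before it, by a constant factor of (1 + ln 2 / half_life), with the half-life counted in touches of the whole cache. After half_life further touches the factor has compounded to roughly 2, so an old touch counts about half as much as a new one. The function names and the touch-count interpretation of the half-life are assumptions.

```python
import math
from typing import Iterable


def touch_weight(touch_number: int, half_life: float) -> float:
    """Weight added by the touch_number-th touch of the whole cache (0-based)."""
    return (1 + math.log(2) / half_life) ** touch_number


def multiplier_after_touches(touch_numbers: Iterable[int], half_life: float) -> float:
    """Sum of weights for the global touch numbers at which one entry was touched."""
    return sum(touch_weight(n, half_life) for n in touch_numbers)
```

Weights grow exponentially, so a long-running implementation of this scheme would eventually overflow a float (after roughly a million touches when half_life is 1000). This sketch ignores that.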
watch_and_shrink_cache
watch_and_shrink_cache(*, flowdb_connection: 'Connection', pool: concurrent.futures._base.Executor, sleep_time: int = 86400, timeout: Union[int, NoneType] = 600, loop: bool = True, size_threshold: int = None, dry_run: bool = False, protected_period: Union[int, NoneType] = None) -> None
Background task to periodically trigger a shrink of the cache.
Parameters
- flowdb_connection : Connection
  FlowDB connection whose cache should be shrunk
- pool : concurrent.futures._base.Executor
  Executor to run the cache shrink with
- sleep_time : int, default 86400
  Number of seconds to sleep for between checks
- timeout : typing.Union[int, NoneType], default 600
  Seconds to wait for a cache shrink to complete before cancelling it
- loop : bool, default True
  Set to False to return after the first check
- size_threshold : int, default None
  Optionally override the maximum cache size set in FlowDB.
- dry_run : bool, default False
  Set to True to just report the objects that would be removed and not remove them
- protected_period : typing.Union[int, NoneType], default None
  Optionally specify a number of seconds within which cache entries are excluded. If None, the value stored in cache.cache_config will be used. Set to a negative number to ignore cache protection completely.
Returns
None
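A minimal asyncio sketch of such a background loop. It assumes the task is a coroutine and that `shrink` is a hypothetical zero-argument callable wrapping `shrink_below_size` with the desired arguments. `watch_and_run` is an illustrative name, not flowmachine's API.

```python
import asyncio
from concurrent.futures import Executor
from typing import Callable, Optional


async def watch_and_run(
    *,
    pool: Executor,
    shrink: Callable[[], object],
    sleep_time: float = 86400,
    timeout: Optional[float] = 600,
    loop: bool = True,
) -> None:
    """Periodically run shrink in pool, giving up waiting on any run longer than timeout."""
    event_loop = asyncio.get_running_loop()
    while True:
        try:
            # timeout=None waits indefinitely, matching an Optional timeout.
            await asyncio.wait_for(event_loop.run_in_executor(pool, shrink), timeout=timeout)
        except asyncio.TimeoutError:
            pass  # a real task would log that the shrink timed out
        if not loop:
            return
        await asyncio.sleep(sleep_time)
```

Note that `asyncio.wait_for` can only stop waiting. A shrink that is already running in a thread pool cannot be interrupted, so in this sketch a timed-out run carries on in the background.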
write_cache_metadata
write_cache_metadata(connection: sqlalchemy.engine.base.Transaction, query: 'Query', compute_time: Union[float, NoneType] = None, executed_sql: str = '')
Helper function for store. Updates the flowmachine metadata table to record that this query is stored, but does not actually store the query.
Parameters
- connection : sqlalchemy.engine.base.Transaction
  SQLAlchemy transaction to use
- query : Query
  Query object to write metadata about
- compute_time : typing.Union[float, NoneType], default None
  Optionally provide the compute time for the query
- executed_sql : str, default ''
  Optionally provide the SQL executed when this cache record was created.
write_query_to_cache
write_query_to_cache(*, name: str, redis: redis.client.Redis, query: 'Query', connection: 'Connection', ddl_ops_func: Callable[[str, str], List[str]], schema: Union[str, NoneType] = 'cache', sleep_duration: Union[int, NoneType] = 1) -> 'Query'
Write a Query object into a postgres table and update the cache metadata about it. Attempts to update the query's state in redis to executing, and if successful, tries to run it. If unable to update the state to executing, will block until the query is in a completed, errored, or cancelled state.
Parameters
- name : str
  Name of the table to write to
- redis : redis.client.Redis
  Redis connection to use to update state
- query : Query
  Query object to write
- connection : Connection
  Flowmachine connection to use for writing
- ddl_ops_func : typing.Callable[[str, str], typing.List[str]]
  Function that will be called to generate a list of SQL statements to run. Should accept name and schema as arguments and return a list of SQL strings.
- schema : typing.Union[str, NoneType], default 'cache'
  Name of the schema to write to
- sleep_duration : typing.Union[int, NoneType], default 1
  Number of seconds to wait between polls when monitoring a query being written from elsewhere
Returns
- Query
  The query object which was written once the write has completed successfully.
Note
This is a blocking function, and will not return until the query is no longer in an executing state.
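The "claim it or wait for it" behaviour can be sketched without Redis. Here `InMemoryStateStore` is a hypothetical stand-in whose lock-protected compare-and-set plays the role of an atomic state transition in Redis. The state names and `write_or_wait` are also illustrative. Only the caller that wins the transition to executing runs the query. Every other caller polls every `sleep_duration` seconds until the state is completed, errored, or cancelled.

```python
import threading
import time
from enum import Enum
from typing import Callable, Dict


class QueryState(str, Enum):
    QUEUED = "queued"
    EXECUTING = "executing"
    COMPLETED = "completed"
    ERRORED = "errored"
    CANCELLED = "cancelled"


FINISHED = {QueryState.COMPLETED, QueryState.ERRORED, QueryState.CANCELLED}


class InMemoryStateStore:
    """Hypothetical stand-in for a Redis-backed query state machine."""

    def __init__(self) -> None:
        self._states: Dict[str, QueryState] = {}
        self._lock = threading.Lock()

    def get(self, query_id: str) -> QueryState:
        with self._lock:
            return self._states.get(query_id, QueryState.QUEUED)

    def transition(self, query_id: str, old: QueryState, new: QueryState) -> bool:
        """Atomically move from old to new; return False if the state was not old."""
        with self._lock:
            if self._states.get(query_id, QueryState.QUEUED) is not old:
                return False
            self._states[query_id] = new
            return True


def write_or_wait(
    store: InMemoryStateStore,
    query_id: str,
    run: Callable[[], None],
    sleep_duration: float = 1.0,
) -> QueryState:
    if store.transition(query_id, QueryState.QUEUED, QueryState.EXECUTING):
        try:
            run()  # this caller owns the write
        except Exception:
            store.transition(query_id, QueryState.EXECUTING, QueryState.ERRORED)
            raise
        store.transition(query_id, QueryState.EXECUTING, QueryState.COMPLETED)
    else:
        # Another caller is (or was) writing: block until it finishes.
        while store.get(query_id) not in FINISHED:
            time.sleep(sleep_duration)
    return store.get(query_id)
```

The atomic transition is what prevents two callers from running the same query. A plain "read state, then write state" pair would let both callers see the queued state and both start executing.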