flowmachine.core.cache¶
Source: flowmachine/core/cache.py
Functions which deal with inspecting and managing the query cache.
cache_table_exists¶
cache_table_exists(connection: 'Connection', query_id: str) -> bool
Return True if a cache table for the query with id query_id
exists, otherwise return False.
Parameters¶
-
connection
:Connection
-
query_id
:str
Unique id of the cached query
Returns¶
bool
get_cache_half_life¶
get_cache_half_life(connection: 'Connection') -> float
Get the current setting for cache half-life.
Parameters¶
connection
:Connection
Returns¶
-
float
Cache half-life setting
get_cache_protected_period¶
get_cache_protected_period(connection: 'Connection') -> float
Get the current setting for cache protected period.
Parameters¶
connection
:Connection
Returns¶
-
float
Cache protected period
get_cached_query_objects_ordered_by_score¶
get_cached_query_objects_ordered_by_score(connection: 'Connection', protected_period: Optional[int] = None) -> Tuple[ForwardRef('Query'), int]
Get all cached query objects in ascending cache score order.
Parameters¶
-
connection
:Connection
-
protected_period
:typing.Optional
, default None
Optionally specify a number of seconds within which cache entries are excluded. If None, the value stored in cache.cache_config will be used. Set to a negative number to ignore cache protection completely.
Returns¶
typing.Tuple
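The selection described above can be sketched in memory as follows. This is an illustration only, not flowmachine's implementation: `CacheRecord`, `ordered_by_score`, `configured_protected_period` and the choice to measure protection by an entry's age in seconds are all assumptions made for the example.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class CacheRecord:
    # Hypothetical stand-in for one row of cache metadata.
    query_id: str
    score: float
    size_bytes: int
    age_seconds: float  # assumption: protection is judged on the entry's age


def ordered_by_score(
    records: List[CacheRecord],
    protected_period: Optional[int] = None,
    configured_protected_period: int = 86400,
) -> List[Tuple[str, int]]:
    """Return (query_id, size_bytes) pairs for unprotected records, lowest score first."""
    # None falls back to the configured value, as the parameter description states.
    period = configured_protected_period if protected_period is None else protected_period
    # A negative period protects nothing, because every age is at least zero.
    eligible = [r for r in records if r.age_seconds > period]
    eligible.sort(key=lambda r: r.score)
    return [(r.query_id, r.size_bytes) for r in eligible]
```

Passing `protected_period=-1` to the sketch returns every record, which mirrors the documented way of ignoring cache protection.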
get_compute_time¶
get_compute_time(connection: 'Connection', query_id: str) -> float
Get the time in seconds that a cached query took to compute.
Parameters¶
-
connection
:Connection
-
query_id
:str
Unique id of the query
Returns¶
-
float
Number of seconds the query took to compute
get_max_size_of_cache¶
get_max_size_of_cache(connection: 'Connection') -> int
Get the upper limit set in FlowDB for the cache size, in bytes.
Parameters¶
connection
:Connection
Returns¶
-
int
Number of bytes in total available to cache tables
get_obj_or_stub¶
get_obj_or_stub(connection: 'Connection', query_id: str)
Get a query object by ID if the object can be recreated. If it cannot then get a stub which keeps the query's dependency relationships as recorded in the database.
Parameters¶
-
connection
:Connection
DB connection to get the object from
-
query_id
:str
Unique identifier of the query
Returns¶
-
Query
The query object if it can be recreated, or a 'stub' class which records the dependencies it had if not.
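The fallback behaviour can be pictured with a small stand-alone sketch. Every name here (`CannotRecreate`, `QueryStub`, `obj_or_stub`, the `recreate` callable and the `recorded_dependencies` mapping) is hypothetical; the real function reads from the database and may signal failure differently.

```python
from typing import Callable, Dict, List


class CannotRecreate(Exception):
    """Hypothetical error raised when a query object cannot be rebuilt."""


class QueryStub:
    """Hypothetical placeholder that only keeps an id and its recorded dependencies."""

    def __init__(self, query_id: str, dependencies: List[str]) -> None:
        self.query_id = query_id
        self.dependencies = dependencies


def obj_or_stub(
    query_id: str,
    recreate: Callable[[str], object],
    recorded_dependencies: Dict[str, List[str]],
) -> object:
    """Return the recreated object, or a stub carrying the recorded dependencies."""
    try:
        return recreate(query_id)
    except CannotRecreate:
        # Fall back to a stub so dependency relationships are still available.
        return QueryStub(query_id, list(recorded_dependencies.get(query_id, [])))
```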
get_query_object_by_id¶
get_query_object_by_id(connection: 'Connection', query_id: str) -> 'Query'
Get a query object from cache by id.
Parameters¶
-
connection
:Connection
-
query_id
:str
Unique id of the query
Returns¶
-
Query
The original query object.
get_score¶
get_score(connection: 'Connection', query_id: str) -> float
Get the current cache score for a cached query.
Parameters¶
-
connection
:Connection
-
query_id
:str
Unique id of the cached query
Returns¶
-
float
Current cache score of this query
get_size_of_cache¶
get_size_of_cache(connection: 'Connection') -> int
Get the total size in bytes of all cache tables.
Parameters¶
connection
:Connection
Returns¶
-
int
Number of bytes in total used by cache tables
get_size_of_table¶
get_size_of_table(connection: 'Connection', table_name: str, table_schema: str) -> int
Get the size on disk in bytes of a table in the database.
Parameters¶
-
connection
:Connection
-
table_name
:str
Name of table to get size of
-
table_schema
:str
Schema of the table
Returns¶
-
int
Number of bytes on disk this table uses in total
invalidate_cache_by_id¶
invalidate_cache_by_id(connection: 'Connection', query_id: str, cascade=False) -> 'Query'
Remove a query object from cache by id.
Parameters¶
-
connection
:Connection
-
query_id
:str
Unique id of the query
-
cascade
:bool
, default False
Set to true to remove any queries that depend on the one being removed
Returns¶
-
Query
The original query object.
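The effect of cascade can be illustrated with an in-memory dependency graph. This is a sketch under assumptions, not the library's code: `invalidate`, the `cached` set and the `dependents` mapping (query id to the ids that depend on it) are hypothetical stand-ins for the cache metadata tables.

```python
from typing import Dict, Set


def invalidate(
    cached: Set[str],
    dependents: Dict[str, Set[str]],
    query_id: str,
    cascade: bool = False,
) -> Set[str]:
    """Remove query_id from `cached`; with cascade, also remove everything depending on it."""
    to_remove = {query_id}
    if cascade:
        # Walk the graph of dependents so indirect dependents are found too.
        stack = [query_id]
        while stack:
            current = stack.pop()
            for child in dependents.get(current, set()):
                if child not in to_remove:
                    to_remove.add(child)
                    stack.append(child)
    removed = to_remove & cached
    cached -= removed  # mutates the caller's set in place
    return removed
```

With `cascade=False` only the named entry is removed, so dependents stay cached even though the entry they were built from is gone.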
reset_cache¶
reset_cache(connection: 'Connection', redis: redis.client.Redis, protect_table_objects: bool = True) -> None
Reset the query cache. Deletes any tables under the cache schema, resets the cache count, and clears the cached and dependencies tables.
Parameters¶
-
connection
:Connection
-
redis
:redis.client.Redis
-
protect_table_objects
:bool
, default True
Set to False to also remove cache metadata for Table objects which point to tables outside the cache schema
Note
You must ensure that no queries are currently running when calling this function. Any queries currently running will no longer be tracked by redis, and UNDEFINED BEHAVIOUR will occur. In addition, you should restart any interpreter running flowmachine after this command is run to ensure that the local objects have not drifted out of sync with the database.
resync_redis_with_cache¶
resync_redis_with_cache(connection: 'Connection', redis: redis.client.Redis) -> None
Reset redis to be in sync with the current contents of the cache.
Parameters¶
-
connection
:Connection
-
redis
:redis.client.Redis
Returns¶
None
Note
You must ensure that no queries are currently running when calling this function. Any queries currently running will no longer be tracked by redis, and UNDEFINED BEHAVIOUR will occur.
run_ops_list_and_return_execution_time¶
run_ops_list_and_return_execution_time(query_ddl_ops: List[str], connection: sqlalchemy.engine.base.Engine) -> float
Run a list of SQL operations, which may begin with EXPLAIN ANALYZE, and return the overall execution time of the first operation if available.
Parameters¶
-
query_ddl_ops
:typing.List
List of SQL commands to execute
-
connection
:sqlalchemy.engine.base.Engine
SQLAlchemy engine to execute with
Returns¶
-
float
The execution time of the query if available or 0
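A minimal sketch of the idea, assuming the first statement is wrapped in an EXPLAIN ANALYZE that returns a JSON plan. PostgreSQL's JSON plan format reports an "Execution Time" field in milliseconds; the `execute` callable and the name `run_ops_and_time` are hypothetical, and no SQLAlchemy engine is involved here.

```python
import json
from typing import Callable, List, Optional


def run_ops_and_time(ops: List[str], execute: Callable[[str], Optional[str]]) -> float:
    """Run every statement; return the first one's reported execution time, or 0."""
    execution_time = 0.0
    for index, op in enumerate(ops):
        result = execute(op)  # hypothetical: returns JSON plan text, or None
        if index == 0 and result is not None:
            try:
                plan = json.loads(result)
                # Value is passed through unchanged (PostgreSQL reports milliseconds).
                execution_time = float(plan[0]["Execution Time"])
            except (ValueError, KeyError, IndexError, TypeError):
                # Not a query plan, so no timing is available.
                execution_time = 0.0
    return execution_time
```

Falling back to 0 rather than raising keeps the remaining statements running when the first one is not an EXPLAIN ANALYZE.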
set_cache_half_life¶
set_cache_half_life(connection: 'Connection', cache_half_life: float) -> None
Set the cache's half-life.
Parameters¶
-
connection
:Connection
-
cache_half_life
:float
Setting for half-life
Note
Changing this setting without flushing the cache first may have unpredictable consequences and should be avoided.
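To see why the note matters, consider a generic half-life weighting. This is not flowmachine's scoring formula, which is not given here; `decayed_weight` and `score` are hypothetical and only show that the same access history yields different scores under different half-lives.

```python
from typing import Iterable


def decayed_weight(age: float, half_life: float) -> float:
    """Generic decay: a contribution halves every `half_life` units of age."""
    return 0.5 ** (age / half_life)


def score(access_ages: Iterable[float], half_life: float) -> float:
    """Sum of decayed contributions for a list of access ages (illustrative only)."""
    return sum(decayed_weight(age, half_life) for age in access_ages)
```

Scores accumulated under one half-life are not directly comparable with scores accumulated under another, which is one plausible reason for flushing the cache before changing the setting.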
set_cache_protected_period¶
set_cache_protected_period(connection: 'Connection', protected_period: int) -> None
Set the cache's protected period.
Parameters¶
-
connection
:Connection
-
protected_period
:int
Number of seconds within which cache entries are excluded from removal.
set_max_size_of_cache¶
set_max_size_of_cache(connection: 'Connection', cache_size_limit: int) -> None
Set the upper limit in FlowDB for the cache size, in bytes.
Parameters¶
-
connection
:Connection
-
cache_size_limit
:int
Size in bytes to set as the cache limit
shrink_below_size¶
shrink_below_size(connection: 'Connection', size_threshold: int = None, dry_run: bool = False, protected_period: Optional[int] = None) -> 'Query'
Remove queries from the cache until it is below a specified size threshold.
Parameters¶
-
connection
:Connection
-
size_threshold
:int
, default None
Optionally override the maximum cache size set in flowdb.
-
dry_run
:bool
, default False
Set to true to just report the objects that would be removed and not remove them
-
protected_period
:typing.Optional
, default None
Optionally specify a number of seconds within which cache entries are excluded. If None, the value stored in cache.cache_config will be used. Set to a negative number to ignore cache protection completely.
Returns¶
-
Query
List of the queries that were removed
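The shrinking loop can be sketched over an in-memory list. This is an assumption-laden illustration: `shrink_below` and the `(query_id, score, size_bytes)` tuples are hypothetical, and stopping as soon as the total no longer exceeds the threshold is a choice made for the sketch.

```python
from typing import List, Tuple


def shrink_below(
    entries: List[Tuple[str, float, int]],
    size_threshold: int,
    dry_run: bool = False,
) -> List[str]:
    """Drop lowest-scoring entries until the total size no longer exceeds the threshold.

    Returns the ids removed, or the ids that would be removed when dry_run is True.
    """
    remaining = sorted(entries, key=lambda e: e[1])  # lowest score first
    total = sum(size for _, _, size in remaining)
    removed: List[str] = []
    while remaining and total > size_threshold:
        query_id, _, size = remaining.pop(0)
        removed.append(query_id)
        total -= size
    if not dry_run:
        entries[:] = remaining  # apply the removal to the caller's list
    return removed
```

Running with `dry_run=True` first gives a preview of which entries a real run with the same arguments would remove.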
shrink_one¶
shrink_one(connection: 'Connection', dry_run: bool = False, protected_period: Optional[int] = None) -> 'Query'
Remove the lowest-scoring cached query from the cache and return it along with its size in bytes.
Parameters¶
-
connection
:Connection
-
dry_run
:bool
, default False
Set to true to just report the object that would be removed and not remove it
-
protected_period
:typing.Optional
, default None
Optionally specify a number of seconds within which cache entries are excluded. If None, the value stored in cache.cache_config will be used. Set to a negative number to ignore cache protection completely.
Returns¶
Query
touch_cache¶
touch_cache(connection: 'Connection', query_id: str) -> float
'Touch' a cache record and update the cache score for it.
Parameters¶
-
connection
:Connection
-
query_id
:str
Unique id of the query to touch
Returns¶
-
float
The new cache score
watch_and_shrink_cache¶
watch_and_shrink_cache(*, flowdb_connection: 'Connection', pool: concurrent.futures._base.Executor, sleep_time: int = 86400, timeout: Optional[int] = 600, loop: bool = True, size_threshold: int = None, dry_run: bool = False, protected_period: Optional[int] = None) -> None
Background task to periodically trigger a shrink of the cache.
Parameters¶
-
flowdb_connection
:Connection
FlowDB connection whose cache will be shrunk
-
pool
:concurrent.futures._base.Executor
Executor to run the cache shrink with
-
sleep_time
:int
, default 86400
Number of seconds to sleep for between checks
-
timeout
:typing.Optional
, default 600
Seconds to wait for a cache shrink to complete before cancelling it
-
loop
:bool
, default True
Set to false to return after the first check
-
size_threshold
:int
, default None
Optionally override the maximum cache size set in flowdb.
-
dry_run
:bool
, default False
Set to true to just report the objects that would be removed and not remove them
-
protected_period
:typing.Optional
, default None
Optionally specify a number of seconds within which cache entries are excluded. If None, the value stored in cache.cache_config will be used. Set to a negative number to ignore cache protection completely.
Returns¶
None
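The interplay of sleep_time, timeout and loop can be shown with a synchronous sketch built on the standard library. It is not the library's implementation: `watch_and_shrink` and the `shrink` callable are hypothetical, and the real task may be scheduled differently.

```python
import time
from concurrent.futures import Executor, ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout
from typing import Callable, Optional


def watch_and_shrink(
    shrink: Callable[[], object],
    pool: Executor,
    sleep_time: float = 86400,
    timeout: Optional[float] = 600,
    loop: bool = True,
) -> None:
    """Periodically run `shrink` on `pool`, giving up on a run after `timeout` seconds."""
    while True:
        future = pool.submit(shrink)
        try:
            future.result(timeout=timeout)
        except FutureTimeout:
            # cancel() only stops work that has not started; a running
            # thread is not interrupted by this sketch.
            future.cancel()
        if not loop:
            return
        time.sleep(sleep_time)
```

A single check, as with loop set to false, could then be run like this:

```python
with ThreadPoolExecutor(max_workers=1) as pool:
    watch_and_shrink(lambda: None, pool, sleep_time=0, timeout=5, loop=False)
```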
write_cache_metadata¶
write_cache_metadata(connection: sqlalchemy.engine.base.Transaction, query: 'Query', compute_time: Optional[float] = None, executed_sql: str = '')
Helper function for store. Updates the flowmachine metadata table to record that this query is stored, but does not actually store the query.
Parameters¶
-
connection
:sqlalchemy.engine.base.Transaction
sqlalchemy Transaction to use
-
query
:Query
Query object to write metadata about
-
compute_time
:typing.Optional
, default None
Optionally provide the compute time for the query
-
executed_sql
:str
, default ''
Optionally provide the SQL executed when this cache record was created.
write_query_to_cache¶
write_query_to_cache(*, name: str, redis: redis.client.Redis, query: 'Query', connection: 'Connection', ddl_ops_func: Callable[[str, str], List[str]], schema: Optional[str] = 'cache', sleep_duration: Optional[int] = 1, analyze=True) -> 'Query'
Write a Query object into a postgres table and update the cache metadata about it. Attempts to update the query's state in redis to executing, and if successful, tries to run it. If unable to update the state to executing, will block until the query is in a completed, errored, or cancelled state.
Parameters¶
-
name
:str
Name of the table to write to
-
redis
:redis.client.Redis
Redis connection to use to update state
-
query
:Query
Query object to write
-
connection
:Connection
Flowmachine connection to use for writing
-
ddl_ops_func
:typing.Callable
Function that will be called to generate a list of SQL statements to run. Should accept name and schema as arguments and return a list of SQL strings.
-
schema
:typing.Optional
, default cache
Name of the schema to write to
-
sleep_duration
:typing.Optional
, default 1
Number of seconds to wait between polls when monitoring a query being written from elsewhere
-
analyze
:bool
, default True
Set to False to disable running analyze on the newly created table to generate statistics
Returns¶
-
Query
The query object which was written once the write has completed successfully.
Note
This is a blocking function, and will not return until the query is no longer in an executing state.
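The claim-or-wait behaviour described above can be sketched without redis or a database. Everything here is a hypothetical stand-in: `StateStore` replaces the redis-backed state, `write_or_wait` replaces the real function, and `do_write` replaces running the generated SQL.

```python
import threading
import time
from typing import Callable, Dict, Optional

FINISHED_STATES = {"completed", "errored", "cancelled"}


class StateStore:
    """Hypothetical in-memory stand-in for the query state kept in redis."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._states: Dict[str, str] = {}

    def try_set_executing(self, query_id: str) -> bool:
        # Atomically claim the query; fails if it already has any state.
        with self._lock:
            if query_id in self._states:
                return False
            self._states[query_id] = "executing"
            return True

    def set(self, query_id: str, state: str) -> None:
        with self._lock:
            self._states[query_id] = state

    def get(self, query_id: str) -> Optional[str]:
        with self._lock:
            return self._states.get(query_id)


def write_or_wait(
    store: StateStore,
    query_id: str,
    do_write: Callable[[], None],
    sleep_duration: float = 1.0,
) -> Optional[str]:
    """Write the query if the claim succeeds; otherwise poll until it has finished."""
    if store.try_set_executing(query_id):
        try:
            do_write()
        except Exception:
            store.set(query_id, "errored")
            raise
        store.set(query_id, "completed")
        return "completed"
    # Another writer holds the claim: block, polling every sleep_duration seconds.
    while store.get(query_id) not in FINISHED_STATES:
        time.sleep(sleep_duration)
    return store.get(query_id)
```

Only the caller that wins the claim does the write; every other caller blocks and observes the final state, which is why the function is described as blocking.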