Source: flowmachine/core/

Functions which deal with inspecting and managing the query cache.


cache_table_exists(connection: 'Connection', query_id: str) -> bool
Source: flowmachine/core/

Return True if a cache table exists for the query with id query_id, otherwise return False.


  • connection: Connection

  • query_id: str

    Unique id of the cached query


  • bool


get_cache_half_life(connection: 'Connection') -> float
Source: flowmachine/core/

Get the current setting for cache half-life.


  • connection: Connection


  • float

    Cache half-life setting


get_cache_protected_period(connection: 'Connection') -> float
Source: flowmachine/core/

Get the current setting for cache protected period.


  • connection: Connection


  • float

    Cache protected period


get_cached_query_objects_ordered_by_score(connection: 'Connection', protected_period: Optional[int] = None) -> Tuple[ForwardRef('Query'), int]
Source: flowmachine/core/

Get all cached query objects in ascending cache score order.


  • connection: Connection

  • protected_period: typing.Optional, default None

    Optionally specify a number of seconds within which cache entries are excluded. If None, the value stored in cache.cache_config will be used. Set to a negative number to ignore cache protection completely.


  • typing.Tuple
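The ordering and the protected_period rule described above can be illustrated in memory. This is a minimal sketch, not the library's implementation: `CacheEntry`, `ordered_by_score`, `configured_period`, and the meaning of `age_seconds` are all hypothetical stand-ins, and the real function reads its data from FlowDB.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class CacheEntry:
    """Hypothetical stand-in for one cached query's metadata."""

    query_id: str
    score: float
    # Seconds since the entry was created or last accessed; which of the two
    # the library uses is not stated in this reference (assumption).
    age_seconds: float


def ordered_by_score(
    entries: List[CacheEntry],
    protected_period: Optional[int] = None,
    configured_period: int = 86400,  # assumed stand-in for the stored setting
) -> List[CacheEntry]:
    """Return unprotected entries in ascending score order."""
    period = configured_period if protected_period is None else protected_period
    # With a negative period every entry qualifies, so protection is ignored.
    eligible = [e for e in entries if e.age_seconds >= period]
    return sorted(eligible, key=lambda e: e.score)


entries = [
    CacheEntry("a", score=2.0, age_seconds=100),
    CacheEntry("b", score=1.0, age_seconds=5000),
    CacheEntry("c", score=0.5, age_seconds=10),
]
unprotected = [e.query_id for e in ordered_by_score(entries, protected_period=60)]
everything = [e.query_id for e in ordered_by_score(entries, protected_period=-1)]
```

Entry "c" has the lowest score but is younger than 60 seconds, so it only appears when protection is disabled with a negative value.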


get_compute_time(connection: 'Connection', query_id: str) -> float
Source: flowmachine/core/

Get the time in seconds that a cached query took to compute.


  • connection: Connection

  • query_id: str

    Unique id of the query


  • float

    Number of seconds the query took to compute


get_max_size_of_cache(connection: 'Connection') -> int
Source: flowmachine/core/

Get the upper limit set in FlowDB for the cache size, in bytes.


  • connection: Connection


  • int

    Number of bytes in total available to cache tables


get_obj_or_stub(connection: 'Connection', query_id: str)
Source: flowmachine/core/

Get a query object by ID if the object can be recreated. If it cannot, get a stub which keeps the query's dependency relationships as recorded in the database.


  • connection: Connection

    DB connection to get the object from

  • query_id: str

    Unique identifier of the query


  • Query

    The query object if it can be recreated, or a 'stub' class which records the dependencies it had if not.
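The fallback described above follows a common pattern: try to rebuild the object, and return a lightweight placeholder if that fails. The sketch below assumes the stored object is a pickle and uses a hypothetical `QueryStub` class and `load_or_stub` helper; how flowmachine actually stores and recreates queries is not described in this reference.

```python
import pickle
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class QueryStub:
    """Hypothetical placeholder that keeps only the recorded dependencies."""

    query_id: str
    dependency_ids: List[str] = field(default_factory=list)


def load_or_stub(query_id: str, serialised: bytes, dependency_ids: List[str]) -> Any:
    """Return the recreated object, or a stub if it cannot be recreated."""
    try:
        return pickle.loads(serialised)
    except Exception:
        # Recreation failed (for example the defining class has changed), so
        # fall back to a stub carrying the dependency information.
        return QueryStub(query_id, list(dependency_ids))


recreated = load_or_stub("q1", pickle.dumps({"a": 1}), ["d1"])
stub = load_or_stub("q2", b"\x00broken", ["d1", "d2"])
```

Keeping the dependency ids on the stub means the dependency graph can still be walked when the original object is unavailable.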


get_query_object_by_id(connection: 'Connection', query_id: str) -> 'Query'
Source: flowmachine/core/

Get a query object from cache by id.


  • connection: Connection

  • query_id: str

    Unique id of the query


  • Query

    The original query object.


get_score(connection: 'Connection', query_id: str) -> float
Source: flowmachine/core/

Get the current cache score for a cached query.


  • connection: Connection

  • query_id: str

    Unique id of the cached query


  • float

    Current cache score of this query


get_size_of_cache(connection: 'Connection') -> int
Source: flowmachine/core/

Get the total size in bytes of all cache tables.


  • connection: Connection


  • int

    Number of bytes in total used by cache tables


get_size_of_table(connection: 'Connection', table_name: str, table_schema: str) -> int
Source: flowmachine/core/

Get the size on disk in bytes of a table in the database.


  • connection: Connection

  • table_name: str

    Name of table to get size of

  • table_schema: str

    Schema of the table


  • int

    Number of bytes on disk this table uses in total


invalidate_cache_by_id(connection: 'Connection', query_id: str, cascade=False) -> 'Query'
Source: flowmachine/core/

Remove a query object from cache by id.


  • connection: Connection

  • query_id: str

    Unique id of the query

  • cascade: bool, default False

    Set to true to remove any queries that depend on the one being removed


  • Query

    The original query object.
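To make the effect of cascade concrete, here is a minimal in-memory sketch of which ids would be removed. The `dependents` mapping and the `ids_to_invalidate` helper are hypothetical; the real function works against the dependency records held in the database.

```python
from typing import Dict, Set


def ids_to_invalidate(
    query_id: str, dependents: Dict[str, Set[str]], cascade: bool = False
) -> Set[str]:
    """Collect the ids removed when invalidating query_id.

    `dependents` maps a query id to the ids of queries that depend on it.
    """
    to_remove = {query_id}
    if not cascade:
        return to_remove
    stack = [query_id]
    while stack:
        current = stack.pop()
        for child in dependents.get(current, set()):
            if child not in to_remove:
                to_remove.add(child)
                stack.append(child)
    return to_remove


dependents = {"a": {"b"}, "b": {"c"}, "d": set()}
only_a = ids_to_invalidate("a", dependents)
a_and_descendants = ids_to_invalidate("a", dependents, cascade=True)
```

With cascade enabled, the removal reaches "c" even though it depends on "a" only indirectly through "b".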


reset_cache(connection: 'Connection', redis: redis.client.Redis, protect_table_objects: bool = True) -> None
Source: flowmachine/core/

Reset the query cache. Deletes any tables under the cache schema, resets the cache count, and clears the cached and dependencies tables.


  • connection: Connection

  • redis: redis.client.Redis

  • protect_table_objects: bool, default True

    Set to False to also remove cache metadata for Table objects which point to tables outside the cache schema


You must ensure that no queries are currently running when calling this function. Any queries currently running will no longer be tracked by redis, and UNDEFINED BEHAVIOUR will occur. In addition, you should restart any interpreter running flowmachine after this command is run to ensure that the local objects have not drifted out of sync with the database.


resync_redis_with_cache(connection: 'Connection', redis: redis.client.Redis) -> None
Source: flowmachine/core/

Reset redis to be in sync with the current contents of the cache.


  • connection: Connection

  • redis: redis.client.Redis


  • None


You must ensure that no queries are currently running when calling this function. Any queries currently running will no longer be tracked by redis, and UNDEFINED BEHAVIOUR will occur.


run_ops_list_and_return_execution_time(query_ddl_ops: List[str], connection: sqlalchemy.engine.base.Engine) -> float
Source: flowmachine/core/

Run a list of SQL operations, which may begin with EXPLAIN ANALYZE, and return the overall execution time of the first operation if available.


  • query_ddl_ops: typing.List

    List of SQL commands to execute

  • connection: sqlalchemy.engine.base.Engine

    SQLAlchemy engine to execute with


  • float

    The execution time of the query if available or 0
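As an illustration of the "if available or 0" behaviour, the sketch below pulls the execution time out of text-format PostgreSQL EXPLAIN ANALYZE output. The helper `execution_time_ms` is hypothetical, and the plan format the library requests and the unit it returns are not stated here, so this version simply reports milliseconds as printed by PostgreSQL.

```python
import re
from typing import Iterable

# PostgreSQL prints "Execution Time: 1.234 ms"; older releases used a
# lower-case "time".
_EXECUTION_TIME = re.compile(r"Execution [Tt]ime:\s*([0-9.]+)\s*ms")


def execution_time_ms(plan_lines: Iterable[str]) -> float:
    """Return the reported execution time in milliseconds, or 0.0 if absent."""
    for line in plan_lines:
        match = _EXECUTION_TIME.search(line)
        if match:
            return float(match.group(1))
    return 0.0


plan = [
    "Seq Scan on t  (cost=0.00..1.01 rows=1 width=4)",
    "Planning Time: 0.050 ms",
    "Execution Time: 0.032 ms",
]
measured = execution_time_ms(plan)
unavailable = execution_time_ms(["CREATE TABLE"])
```

A statement that was not run under EXPLAIN ANALYZE produces no such line, which is the case where 0 is returned.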


set_cache_half_life(connection: 'Connection', cache_half_life: float) -> None
Source: flowmachine/core/

Set the cache's half-life.


  • connection: Connection

  • cache_half_life: float

    Setting for half-life


Changing this setting without flushing the cache first may have unpredictable consequences and should be avoided.


set_cache_protected_period(connection: 'Connection', protected_period: int) -> None
Source: flowmachine/core/

Set the cache's protected period.


  • connection: Connection

  • protected_period: int

    Number of seconds within which cache entries are excluded.


set_max_size_of_cache(connection: 'Connection', cache_size_limit: int) -> None
Source: flowmachine/core/

Set the upper limit in FlowDB for the cache size, in bytes.


  • connection: Connection

  • cache_size_limit: int

    Size in bytes to set as the cache limit


shrink_below_size(connection: 'Connection', size_threshold: int = None, dry_run: bool = False, protected_period: Optional[int] = None) -> 'Query'
Source: flowmachine/core/

Remove queries from the cache until it is below a specified size threshold.


  • connection: Connection

  • size_threshold: int, default None

    Optionally override the maximum cache size set in flowdb.

  • dry_run: bool, default False

    Set to true to just report the objects that would be removed and not remove them

  • protected_period: typing.Optional, default None

    Optionally specify a number of seconds within which cache entries are excluded. If None, the value stored in cache.cache_config will be used. Set to a negative number to ignore cache protection completely.


  • Query

    List of the queries that were removed
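The shrinking behaviour can be sketched in memory as repeatedly dropping the lowest-scoring entry until the total size is below the threshold. Everything here is a stand-in: the `cache` dictionary layout and the `shrink_below_size_sketch` name are hypothetical, and protected periods are left out for brevity.

```python
from typing import Dict, List, Tuple


def shrink_below_size_sketch(
    cache: Dict[str, Tuple[float, int]],  # query_id -> (score, size_bytes)
    size_threshold: int,
    dry_run: bool = False,
) -> List[str]:
    """Return the ids removed (or that would be removed when dry_run is set)."""
    current_size = sum(size for _, size in cache.values())
    removed: List[str] = []
    # Visit entries from lowest to highest score.
    for query_id in sorted(cache, key=lambda q: cache[q][0]):
        # Treating "below" as strictly less than is an assumption.
        if current_size < size_threshold:
            break
        current_size -= cache[query_id][1]
        removed.append(query_id)
    if not dry_run:
        for query_id in removed:
            del cache[query_id]
    return removed


cache = {"a": (3.0, 50), "b": (1.0, 30), "c": (2.0, 40)}
would_remove = shrink_below_size_sketch(cache, size_threshold=60, dry_run=True)
size_after_dry_run = len(cache)
did_remove = shrink_below_size_sketch(cache, size_threshold=60)
```

The dry run reports the same ids as the real run but leaves the cache untouched, which makes it a cheap way to preview a shrink.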


shrink_one(connection: 'Connection', dry_run: bool = False, protected_period: Optional[int] = None) -> 'Query'
Source: flowmachine/core/

Remove the lowest-scoring cached query from the cache and return it along with its size in bytes.


  • connection: Connection

  • dry_run: bool, default False

    Set to true to just report the object that would be removed and not remove it

  • protected_period: int, default None

    Optionally specify a number of seconds within which cache entries are excluded. If None, the value stored in cache.cache_config will be used. Set to a negative number to ignore cache protection completely.


  • Query


touch_cache(connection: 'Connection', query_id: str) -> float
Source: flowmachine/core/

'Touch' a cache record and update the cache score for it.


  • connection: Connection

  • query_id: str

    Unique id of the query to touch


  • float

    The new cache score
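The scoring function itself lives in FlowDB and is not described in this reference. Purely as an illustration of how a half-life can shape a score, the sketch below assumes an exponentially decayed score that receives a fresh contribution on each touch. The function names, the `benefit` term, and the unit of `elapsed` are all assumptions.

```python
def decayed_score(score: float, elapsed: float, half_life: float) -> float:
    """Halve the score for every half_life units of elapsed time or touches."""
    return score * 0.5 ** (elapsed / half_life)


def touch_score(score: float, elapsed: float, half_life: float, benefit: float) -> float:
    """Decay the old score, then add the value of the new access.

    `benefit` is a hypothetical measure of how worthwhile the entry is to keep,
    for example compute time relative to table size.
    """
    return decayed_score(score, elapsed, half_life) + benefit


after_two_half_lives = decayed_score(8.0, elapsed=2.0, half_life=1.0)
after_touch = touch_score(8.0, elapsed=1.0, half_life=1.0, benefit=1.0)
```

Under a scheme like this, scores accumulated with one half-life are not comparable with scores accumulated under another, which is one plausible reading of the warning about changing the half-life without flushing the cache.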


watch_and_shrink_cache(*, flowdb_connection: 'Connection', pool: concurrent.futures._base.Executor, sleep_time: int = 86400, timeout: Optional[int] = 600, loop: bool = True, size_threshold: int = None, dry_run: bool = False, protected_period: Optional[int] = None) -> None
Source: flowmachine/core/

Background task to periodically trigger a shrink of the cache.


  • flowdb_connection: Connection

    FlowDB connection to shrink the cache on

  • pool: concurrent.futures._base.Executor

    Executor to run the cache shrink with

  • sleep_time: int, default 86400

    Number of seconds to sleep for between checks

  • timeout: typing.Optional, default 600

    Seconds to wait for a cache shrink to complete before cancelling it

  • loop: bool, default True

    Set to false to return after the first check

  • size_threshold: int, default None

    Optionally override the maximum cache size set in flowdb.

  • dry_run: bool, default False

    Set to true to just report the objects that would be removed and not remove them

  • protected_period: typing.Optional, default None

    Optionally specify a number of seconds within which cache entries are excluded. If None, the value stored in cache.cache_config will be used. Set to a negative number to ignore cache protection completely.


  • None
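How sleep_time, timeout, and loop interact can be shown with a small synchronous sketch built on the standard library. This is not the library's implementation: `watch_and_run` and its `task` argument are hypothetical, and the real function may schedule its work differently.

```python
import time
from concurrent.futures import Executor, ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
from typing import Callable, Optional


def watch_and_run(
    task: Callable[[], object],
    pool: Executor,
    sleep_time: float = 86400,
    timeout: Optional[float] = 600,
    loop: bool = True,
) -> None:
    """Run task on the pool, wait up to timeout seconds, then sleep and repeat."""
    while True:
        future = pool.submit(task)
        try:
            future.result(timeout=timeout)
        except FutureTimeoutError:
            # cancel() only stops a task that has not started yet; a task
            # already running in a thread is not interrupted.
            future.cancel()
        except Exception:
            # Assumed policy: one failed run should not stop the watcher.
            pass
        if not loop:
            return
        time.sleep(sleep_time)


calls = []
with ThreadPoolExecutor(max_workers=1) as pool:
    watch_and_run(lambda: calls.append("shrunk"), pool, sleep_time=0, loop=False)
```

Passing loop=False gives a single pass, which is convenient for testing the task without leaving a background loop running.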


write_cache_metadata(connection: sqlalchemy.engine.base.Transaction, query: 'Query', compute_time: Optional[float] = None, executed_sql: str = '')
Source: flowmachine/core/

Helper function for store. Updates the flowmachine metadata table to log that this query is stored, but does not actually store the query.


  • connection: sqlalchemy.engine.base.Transaction

    sqlalchemy Transaction to use

  • query: Query

    Query object to write metadata about

  • compute_time: typing.Optional, default None

    Optionally provide the compute time for the query

  • executed_sql: str, default ''

    Optionally provide the sql executed when this cache record was created.


write_query_to_cache(*, name: str, redis: redis.client.Redis, query: 'Query', connection: 'Connection', ddl_ops_func: Callable[[str, str], List[str]], schema: Optional[str] = 'cache', sleep_duration: Optional[int] = 1, analyze=True) -> 'Query'
Source: flowmachine/core/

Write a Query object into a postgres table and update the cache metadata about it. Attempts to update the query's state in redis to executing, and if successful, tries to run it. If unable to update the state to executing, will block until the query is in a completed, errored, or cancelled state.


  • name: str

    Name of the table to write to

  • redis: redis.client.Redis

    Redis connection to use to update state

  • query: Query

    Query object to write

  • connection: Connection

    Flowmachine connection to use for writing

  • ddl_ops_func: typing.Callable

    Function that will be called to generate a list of SQL statements to run. Should accept name and schema as arguments and return a list of SQL strings.

  • schema: typing.Optional, default 'cache'

    Name of the schema to write to

  • sleep_duration: typing.Optional, default 1

    Number of seconds to wait between polls when monitoring a query being written from elsewhere

  • analyze: bool, default True

    Set to False to disable running analyze on the newly created table to generate statistics


  • Query

    The query object which was written once the write has completed successfully.
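For the ddl_ops_func parameter, a callable of the documented shape takes the table name and schema and returns a list of SQL strings. The function below is a hypothetical example with made-up SQL; it only shows the expected signature and return type.

```python
from typing import List


def example_ddl_ops(name: str, schema: str) -> List[str]:
    """Build the statements that would create and index the cache table."""
    table = f'"{schema}"."{name}"'
    return [
        # Illustrative statements only; a real query supplies its own SQL.
        f"CREATE TABLE {table} AS (SELECT 1 AS value)",
        f"CREATE INDEX ON {table} (value)",
    ]


ops = example_ddl_ops("x1", "cache")
```

Returning the statements as a list keeps the table name and schema under the caller's control rather than hard-coding them into the query.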


This is a blocking function, and will not return until the query is no longer in an executing state.
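The claim-or-wait behaviour described for this function can be sketched with an in-memory state store in place of redis. `StateStore`, `write_or_wait`, and the state names other than executing, completed, errored, and cancelled are hypothetical, and the real state handling may differ.

```python
import threading
import time
from typing import Callable, Dict, Optional

FINISHED_STATES = {"completed", "errored", "cancelled"}


class StateStore:
    """In-memory stand-in for the redis-backed query state."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._states: Dict[str, str] = {}

    def try_claim(self, query_id: str) -> bool:
        """Move an unknown query to 'executing'; fail if it has any state."""
        with self._lock:
            if query_id in self._states:
                return False
            self._states[query_id] = "executing"
            return True

    def set(self, query_id: str, state: str) -> None:
        with self._lock:
            self._states[query_id] = state

    def get(self, query_id: str) -> Optional[str]:
        with self._lock:
            return self._states.get(query_id)


def write_or_wait(
    store: StateStore, query_id: str, run: Callable[[], None], sleep_duration: float = 1
) -> Optional[str]:
    """Run the write if the claim succeeds, otherwise poll until it finishes."""
    if store.try_claim(query_id):
        try:
            run()
        except Exception:
            store.set(query_id, "errored")
            raise
        store.set(query_id, "completed")
        return store.get(query_id)
    # Someone else owns the write: block until it reaches a finished state.
    while store.get(query_id) not in FINISHED_STATES:
        time.sleep(sleep_duration)
    return store.get(query_id)


store = StateStore()
ran = []
first = write_or_wait(store, "q", lambda: ran.append(1), sleep_duration=0)
second = write_or_wait(store, "q", lambda: ran.append(2), sleep_duration=0)
```

The second call finds the query already completed, so it returns without running the write again.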