Information for Developers

Because FlowKit deployment is primarily done using Docker, the installation for developers is slightly different; see the instructions here.

An outline roadmap is provided below together with details about contributing to the project.

This is followed by a guide to FlowAPI.

FlowMachine specifications are found here.

FlowDB details are found here.

FlowETL details are found here.

Roadmap

Now

  • Additional FlowMachine aggregates exposed via API
  • Non-spatial aggregations
  • Documentation enhancements

Next

  • New metrics and insights
  • Custom geography support

Later

  • Additional language targets for FlowClient
  • Alternative login provider support for FlowAuth
  • Plugin support
  • Enhanced temporal aggregations
  • Individual level API

Contributing

We are creating FlowKit at Flowminder.

Get involved

You are welcome to contribute to the FlowKit library. To get started, please check out our contributor guidelines and, when you're ready, follow the developer install guide.

FlowAPI Guide

FlowAPI is an HTTP API which provides access to the functionality of FlowMachine. The available API routes are documented in full in an interactive API specification.

FlowAPI uses ZeroMQ for asynchronous communication with the FlowMachine server.

FlowAPI Access tokens

As explained in the quick install guide, user authentication and access control are handled using JSON Web Tokens (JWTs). There are two categories of permission (run and get_result) which can be granted to a user. JWTs allow these access permissions to be granted independently for each query kind (e.g. get results for daily location only at admin 3 resolution). The FlowAuth authentication management system is designed to generate JWTs for accessing FlowAPI.
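
To see how a JWT can carry per-query-kind permissions, it may help to decode the payload segment of a token by hand. The sketch below uses only the standard library to build a toy token and read its claims back. The claim layout (`identity`, `permissions`) and the secret are hypothetical and chosen purely for illustration; they are not the structure FlowAuth emits, and the signature is deliberately not verified.

```python
import base64
import hashlib
import hmac
import json


def b64url_encode(raw: bytes) -> str:
    """Base64url-encode without padding, as used for JWT segments."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring the stripped padding."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def make_toy_token(claims: dict, secret: bytes) -> str:
    """Build an HS256-signed token for demonstration purposes only."""
    header = b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = b64url_encode(json.dumps(claims).encode())
    signing_input = f"{header}.{payload}".encode("ascii")
    signature = hmac.new(secret, signing_input, hashlib.sha256).digest()
    return f"{header}.{payload}.{b64url_encode(signature)}"


def read_claims(token: str) -> dict:
    """Return the payload claims WITHOUT verifying the signature."""
    _header, payload, _signature = token.split(".")
    return json.loads(b64url_decode(payload))


# Hypothetical claim layout: permissions granted per query kind.
toy_claims = {
    "identity": "test_user",
    "permissions": {"daily_location": ["run", "get_result"]},
}
token = make_toy_token(toy_claims, secret=b"not-a-real-secret")
claims = read_claims(token)
print(claims["permissions"]["daily_location"])
```

Reading claims this way is useful for debugging only; a server must always verify the signature before trusting a token.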

Test Tokens

FlowKit includes the flowkit-jwt-generator package, which can be used to generate tokens for testing purposes. This package supplies:

  • Two command-line tools
    • generate-jwt, which allows you to generate tokens which grant specific kinds of access to a subset of queries, or to generate an all-access token for a specific instance of FlowAPI
  • Two pytest plugins
    • access_token_builder
    • universal_access_token

Alternatively, the generate_token function can be used directly.

FlowMachine

FlowMachine is a Python toolkit for the analysis of CDR data. It is essentially a Python wrapper for PostgreSQL queries, including geographical queries with PostGIS. All of the classes in flowmachine define an SQL query under the hood.

Documentation

Documentation for FlowMachine can be found here. A worked example of using FlowMachine for analysis is provided here.

Exposing a new query kind

The FlowMachine server is responsible for defining the queries that users can run via FlowAPI. Translation from parameters provided in calls to the run API endpoint to the underlying FlowMachine query objects is handled by marshmallow schemas defined in flowmachine.core.server.query_schemas.

In this section we assume that a FlowMachine query class MyQuery (derived from flowmachine.core.query.Query) is already defined, and describe the steps required to expose this query class, so that queries of this kind can be run via the API.

The information below refers to the following categories of query:

  • aggregate: A query whose results are aggregated over groups of subscribers, so that no individual-level information is revealed. Example: histogram_aggregate
  • spatial aggregate: An aggregate query that returns a result per location (e.g. a count of subscribers per administrative region). Example: spatial_aggregate
  • individual-level query: A query whose result consists of a value per subscriber. Example: event_count
  • reference location: An individual-level query that assigns a single location to each subscriber. Example: daily_location

1. Define an "exposed query" class

In a new file 'flowmachine/core/server/query_schemas/my_query.py', define a new class MyQueryExposed. This class is responsible for constructing the appropriate MyQuery object from parameter values supplied in an API call.

There are two options here: if users should be able to select a random sample of the rows from this query result, the exposed query class should inherit from BaseExposedQueryWithSampling (this is usually appropriate for individual-level queries). If it does not make sense to allow random sampling of the query result (as is usually the case for aggregate queries), the exposed query class should inherit from BaseExposedQuery.

Without random sampling:

from flowmachine.core.server.query_schemas.base_exposed_query import BaseExposedQuery

# MyQuery must also be imported here from the module in which it is defined.

class MyQueryExposed(BaseExposedQuery):
    query_kind = "my_query" # (1)

    def __init__( # (2)
        self,
        *,
        start_date,
        end_date,
        sub_query,
        other_param,
    ):
        self.start_date = start_date # (3)
        self.end_date = end_date
        self.sub_query = sub_query
        self.other_param = other_param

    @property
    def aggregation_unit(self): # (4)
        return self.sub_query.aggregation_unit

    @property
    def _flowmachine_query_obj(self): # (5)
        return MyQuery(
            start=self.start_date, # (6)
            stop=self.end_date,
            sub_query=self.sub_query._flowmachine_query_obj, # (7)
            other_param=self.other_param,
            non_exposed_param="default_value", # (8)
        )
  1. query_kind class attribute is required, and must be different from the query_kind of all other exposed query classes.
  2. The __init__ method should take as arguments all parameters of MyQuery that will be exposed via the API.
  3. All input parameters must be set as attributes on self so that the object can be serialised correctly.
  4. If MyQuery is a spatial aggregate or a reference location, but does not have an explicit aggregation_unit parameter (e.g. because the aggregation unit is determined by a nested sub-query), you must define an aggregation_unit property or attribute so that other queries (and the get_aggregation_unit server action) can identify the aggregation unit associated with this query.
  5. Define a _flowmachine_query_obj property that returns the underlying MyQuery FlowMachine query object.
  6. The exposed parameters do not need to have names that match the corresponding parameters of the underlying MyQuery object.
  7. If a parameter is a nested sub-query, you will need to access its _flowmachine_query_obj property so that the MyQuery constructor receives a FlowMachine query object and not the exposed query object.
  8. It is not necessary for all parameters of the underlying MyQuery object to be exposed as parameters of MyQueryExposed.

With random sampling:

from flowmachine.core.server.query_schemas.base_query_with_sampling import BaseExposedQueryWithSampling

# MyQuery must also be imported here from the module in which it is defined.

class MyQueryExposed(BaseExposedQueryWithSampling):
    query_kind = "my_query" # (1)

    def __init__( # (2)
        self,
        *,
        start_date,
        end_date,
        sub_query,
        other_param,
        sampling=None, # (9)
    ):
        self.start_date = start_date # (3)
        self.end_date = end_date
        self.sub_query = sub_query
        self.other_param = other_param
        self.sampling = sampling

    @property
    def aggregation_unit(self): # (4)
        return self.sub_query.aggregation_unit

    @property
    def _unsampled_query_obj(self): # (5)
        return MyQuery(
            start=self.start_date, # (6)
            stop=self.end_date,
            sub_query=self.sub_query._flowmachine_query_obj, # (7)
            other_param=self.other_param,
            non_exposed_param="default_value", # (8)
        )
  1. query_kind class attribute is required, and must be different from the query_kind of all other exposed query classes.
  2. The __init__ method should take as arguments all parameters of MyQuery that will be exposed via the API.
  3. All input parameters must be set as attributes on self so that the object can be serialised correctly.
  4. If MyQuery is a spatial aggregate or a reference location, but does not have an explicit aggregation_unit parameter (e.g. because the aggregation unit is determined by a nested sub-query), you must define an aggregation_unit property or attribute so that other queries (and the get_aggregation_unit server action) can identify the aggregation unit associated with this query.
  5. Define a _unsampled_query_obj property that returns the underlying MyQuery FlowMachine query object. Note: When inheriting from BaseExposedQueryWithSampling, this property should be named _unsampled_query_obj - the _flowmachine_query_obj property will return this query wrapped in an appropriate "random sample" query.
  6. The exposed parameters do not need to have names that match the corresponding parameters of the underlying MyQuery object.
  7. If a parameter is a nested sub-query, you will need to access its _flowmachine_query_obj property so that the MyQuery constructor receives a FlowMachine query object and not the exposed query object.
  8. It is not necessary for all parameters of the underlying MyQuery object to be exposed as parameters of MyQueryExposed.
  9. When inheriting from BaseExposedQueryWithSampling, it is important to also accept the sampling argument here.
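
The relationship between _unsampled_query_obj and _flowmachine_query_obj can be pictured with a small stand-alone sketch. Every class below is a hypothetical stand-in written for this illustration, not part of flowmachine, and returning the query unchanged when no sampling is requested is an assumption about how such a base class might behave.

```python
class StandInQuery:
    """Stand-in for a FlowMachine query object (hypothetical)."""

    def __init__(self, name):
        self.name = name


class StandInRandomSample:
    """Stand-in for a 'random sample' query wrapping another query."""

    def __init__(self, query, **sampling_params):
        self.query = query
        self.sampling_params = sampling_params


class SketchBaseWithSampling:
    """Mimics the pattern described above; NOT the real base class."""

    @property
    def _flowmachine_query_obj(self):
        query = self._unsampled_query_obj
        if self.sampling is None:
            # Assumption: with no sampling requested, the query is returned as-is.
            return query
        return StandInRandomSample(query, **self.sampling)


class SketchExposed(SketchBaseWithSampling):
    def __init__(self, *, name, sampling=None):
        self.name = name
        self.sampling = sampling

    @property
    def _unsampled_query_obj(self):
        # Subclasses only describe the unsampled query.
        return StandInQuery(self.name)


unsampled = SketchExposed(name="example")._flowmachine_query_obj
sampled = SketchExposed(name="example", sampling={"size": 10})._flowmachine_query_obj
print(type(unsampled).__name__, type(sampled).__name__)
```

The point of the split is that a subclass never has to repeat the sampling logic: it defines the unsampled query once, and the base class decides whether to wrap it.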

2. Define a "query schema" class

In the same file as MyQueryExposed, define a new class MyQuerySchema. This is a marshmallow schema, responsible for validation and deserialisation of parameter values supplied in an API call.

As before, there are two options, depending on whether or not random sampling should be enabled for this query kind. If MyQueryExposed inherits from BaseExposedQueryWithSampling then MyQuerySchema should inherit from BaseQueryWithSamplingSchema. Otherwise, MyQuerySchema should inherit from BaseSchema.

Without random sampling:

from marshmallow import fields, validate
from flowmachine.core.server.query_schemas.field_mixins import StartAndEndField
from flowmachine.core.server.query_schemas.base_schema import BaseSchema
from flowmachine.core.server.query_schemas.aggregation_unit import AggregationUnitKind

class MyQuerySchema(
    StartAndEndField, # (1)
    BaseSchema,
):
    __model__ = MyQueryExposed # (2)

    query_kind = fields.String( # (3)
        validate=validate.OneOf([__model__.query_kind]),
        required=True,
    )
    sub_query = fields.Nested(SomeOtherQuerySchema, required=True) # (4)
    other_param = fields.Integer(
        validate=validate.Range(0, 10), # (5)
        required=False,
        load_default=0, # (6)
    )
    # Only relevant for spatial aggregates:
    aggregation_unit = AggregationUnitKind(dump_only=True) # (7)
  1. The StartAndEndField mixin adds start_date and end_date fields. There are other mixins available for adding commonly-used fields, e.g. HoursField and AggregationUnitMixin.
  2. Set MyQueryExposed as the __model__ class attribute so that MyQuerySchema will deserialise parameters to an instance of MyQueryExposed.
  3. query_kind field must be defined here. This field will not be passed on to MyQueryExposed.__init__().
  4. Sub-queries can be accepted as parameters by specifying the appropriate query schema in a marshmallow Nested field.
  5. The fields specified here should provide all necessary validation of parameter values.
  6. If you wish to set a default parameter value to be used if no value is supplied by the user, it is better to specify this here than in MyQueryExposed.__init__() so that the default value will be stated in the API spec.
  7. If MyQuery is a spatial aggregate but does not have an explicit aggregation_unit parameter (e.g. because the aggregation unit is determined by a nested sub-query), add a dump-only 'aggregation_unit' field. This enables FlowAPI to identify this query kind as a spatial aggregate, without exposing a redundant 'aggregation_unit' input parameter.

With random sampling:

from marshmallow import fields, validate
from flowmachine.core.server.query_schemas.field_mixins import StartAndEndField
from flowmachine.core.server.query_schemas.base_query_with_sampling import BaseQueryWithSamplingSchema
from flowmachine.core.server.query_schemas.aggregation_unit import AggregationUnitKind

class MyQuerySchema(
    StartAndEndField, # (1)
    BaseQueryWithSamplingSchema, # (8)
):
    __model__ = MyQueryExposed # (2)

    query_kind = fields.String( # (3)
        validate=validate.OneOf([__model__.query_kind]),
        required=True,
    )
    sub_query = fields.Nested(SomeOtherQuerySchema, required=True) # (4)
    other_param = fields.Integer(
        validate=validate.Range(0, 10), # (5)
        required=False,
        load_default=0, # (6)
    )
    # Only relevant for spatial aggregates:
    aggregation_unit = AggregationUnitKind(dump_only=True) # (7)
  1. The StartAndEndField mixin adds start_date and end_date fields. There are other mixins available for adding commonly-used fields, e.g. HoursField and AggregationUnitMixin.
  2. Set MyQueryExposed as the __model__ class attribute so that MyQuerySchema will deserialise parameters to an instance of MyQueryExposed.
  3. query_kind field must be defined here. This field will not be passed on to MyQueryExposed.__init__().
  4. Sub-queries can be accepted as parameters by specifying the appropriate query schema in a marshmallow Nested field.
  5. The fields specified here should provide all necessary validation of parameter values.
  6. If you wish to set a default parameter value to be used if no value is supplied by the user, it is better to specify this here than in MyQueryExposed.__init__() so that the default value will be stated in the API spec.
  7. If MyQuery is a spatial aggregate but does not have an explicit aggregation_unit parameter (e.g. because the aggregation unit is determined by a nested sub-query), add a dump-only 'aggregation_unit' field. This enables FlowAPI to identify this query kind as a spatial aggregate, without exposing a redundant 'aggregation_unit' input parameter.
  8. BaseQueryWithSamplingSchema adds a sampling field.
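
As a rough picture of what the schema does when it loads parameters, the following plain-Python stand-in performs the same kinds of check by hand: it matches query_kind, requires sub_query, applies the default for other_param and enforces its range. It is a simplified sketch that does not use marshmallow; the function name and the "my_query" kind are hypothetical, and the date fields are omitted for brevity.

```python
def load_my_query_params(params: dict) -> dict:
    """Validate raw parameters and fill defaults, mimicking the schema's checks."""
    errors = {}
    if params.get("query_kind") != "my_query":
        errors["query_kind"] = "Must be 'my_query'."
    if "sub_query" not in params:
        errors["sub_query"] = "Missing data for required field."
    # The default is applied at load time when the user supplies no value.
    other_param = params.get("other_param", 0)
    is_int = isinstance(other_param, int) and not isinstance(other_param, bool)
    if not is_int or not 0 <= other_param <= 10:
        errors["other_param"] = "Must be an integer between 0 and 10."
    if errors:
        raise ValueError(errors)
    # query_kind is used for validation only and is not passed to the constructor.
    return {"sub_query": params["sub_query"], "other_param": other_param}


loaded = load_my_query_params({"query_kind": "my_query", "sub_query": {}})
print(loaded)

try:
    load_my_query_params({"query_kind": "my_query", "sub_query": {}, "other_param": 11})
except ValueError as exc:
    rejected = exc.args[0]
    print(rejected)
```

In the real schema these rules are declared on the fields rather than written out as conditionals, which is what allows them to appear in the generated API spec.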

3. Expose the query

If MyQuery is an aggregate, it can be exposed as a top-level query (meaning that API users will be able to directly run and get the results of MyQuery queries). In this case, add MyQuerySchema to FlowmachineQuerySchema.query_schemas.

Warning

If the query will be exposed as a top-level query, it is essential that the underlying FlowMachine query defined in the exposed query's _flowmachine_query_obj property is redacted - i.e. all rows in the query result corresponding to 15 or fewer individuals are removed from the output. This protects individuals' privacy through k-anonymity.
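
The effect of redaction can be shown on a handful of rows. This is a minimal sketch of the rule stated above, applied to an in-memory list rather than to an SQL query; the column names `pcod` and `value` are hypothetical.

```python
REDACTION_THRESHOLD = 15  # rows covering this many individuals or fewer are dropped


def redact(rows, count_key="value"):
    """Keep only rows whose subscriber count exceeds the threshold."""
    return [row for row in rows if row[count_key] > REDACTION_THRESHOLD]


rows = [
    {"pcod": "region_a", "value": 120},
    {"pcod": "region_b", "value": 15},  # exactly 15: removed
    {"pcod": "region_c", "value": 16},  # just above the threshold: kept
]
redacted = redact(rows)
print([row["pcod"] for row in redacted])
```

Note that the comparison is strict: a row representing exactly 15 individuals is removed, because the rule covers "15 or fewer".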

If MyQuery is an individual-level query, it should not be exposed directly as a top-level query. In this case, MyQuerySchema should be added as a nested sub-query parameter of the appropriate other query schemas. For example, if MyQuery is a reference location, add MyQuerySchema to ReferenceLocationSchema.query_schemas so that it will be accepted as a parameter to query kinds such as spatial_aggregate and flows.

FlowDB

FlowDB is a database designed for storing, serving, and analysing mobile operator data. The project's main features are:

  • Uses a standard schema for the most common mobile operator data
  • Is built as a Docker container
  • Uses PostgreSQL 12
  • Grants different permissions for users depending on need
  • Is configured for high-performance operations
  • Comes with
    • PostGIS for geospatial data handling and analysis
    • pgRouting for advanced spatial analysis
    • Utilities for connecting with Oracle databases (oracle_fdw)

Synthetic Data

In addition to the bare FlowDB container and the test data container used for tests, a 'synthetic' data container is available. This container generates arbitrary quantities of data at runtime.

Two data generators are available - a Python-based generator, which supports reproducible random data, and an SQL-based generator, which supports greater data volumes, more data types, plausible subscriber behaviour, and simple disaster scenarios.

Both are packaged in the flowminder/flowdb-synthetic-data Docker image, and configured via environment variables.

For example, to generate a repeatable random data set with seven days of data, where 20,000 subscribers make 10,000 calls each day and use 5,000 cells:

docker run --name flowdb_synth_data -e FLOWMACHINE_FLOWDB_PASSWORD=foo -e FLOWAPI_FLOWDB_PASSWORD=foo \
 --publish 9000:5432 \
 -e N_CALLS=10000 -e N_SUBSCRIBERS=20000 -e N_CELLS=5000 -e N_DAYS=7 -e SYNTHETIC_DATA_GENERATOR=python \
 -e SUBSCRIBERS_SEED=11111 -e CALLS_SEED=22222 -e CELLS_SEED=33333 \
 --detach flowminder/flowdb-synthetic-data:latest

Or to generate an equivalent data set which includes TACs, mobile data sessions and SMS messages:

docker run --name flowdb_synth_data -e FLOWMACHINE_FLOWDB_PASSWORD=foo -e FLOWAPI_FLOWDB_PASSWORD=foo \
 --publish 9000:5432 \
 -e N_CALLS=10000 -e N_SUBSCRIBERS=20000 -e N_CELLS=5000 -e N_SITES=5000 -e N_DAYS=7 -e SYNTHETIC_DATA_GENERATOR=sql \
 -e N_SMS=10000 -e N_MDS=10000 \
 --detach flowminder/flowdb-synthetic-data:latest

Warning

For generating large datasets, it is recommended that you use the SQL-based generator.

SQL Generator features

The SQL generator supports semi-plausible behaviour - each subscriber has a 'home' region, and will typically (by default, 95% of the time) call/sms/use data from cells in that region. Subscribers will occasionally (by default, 1% chance per day) relocate to a new home region. Subscribers also have a consistent phone model across time, and a consistent set of other subscribers who they interact with (by default, 5*N_SUBSCRIBERS calling pairs are used).
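
The home-region behaviour can be illustrated with a small simulation. The code below is an independent sketch of the rules described above, using the stated default probabilities; it is not the SQL generator's implementation, and the function name, region labels and event counts are made up for the example.

```python
import random

P_OUT_OF_AREA = 0.05  # chance that an event happens outside the home region
P_RELOCATE = 0.01     # chance per day that a subscriber moves to a new home region


def simulate_subscriber(n_days, events_per_day, regions, rng):
    """Return (day, home_region, event_region) tuples for one subscriber."""
    home = rng.choice(regions)
    events = []
    for day in range(n_days):
        if rng.random() < P_RELOCATE:
            # Relocate to a different, randomly chosen home region.
            home = rng.choice([r for r in regions if r != home])
        for _ in range(events_per_day):
            if rng.random() < P_OUT_OF_AREA:
                region = rng.choice([r for r in regions if r != home])
            else:
                region = home
            events.append((day, home, region))
    return events


rng = random.Random(42)  # seeded so that repeated runs give the same events
events = simulate_subscriber(
    n_days=7, events_per_day=200, regions=list("ABCDE"), rng=rng
)
at_home = sum(1 for _, home, region in events if home == region) / len(events)
print(f"{at_home:.2%} of events took place in the home region")
```

With enough events, the share taking place in the home region settles close to 95%, matching the default described above.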

Mass relocation scenarios are also supported - a designated admin 2 region can be chosen to be 'off limits' to all subscribers for a period. Any subscribers ordinarily resident there will relocate to another randomly chosen region, and no subscriber will call from a cell within the region or relocate there while the region is off limits.

Parameters

  • N_DAYS: number of days of data to generate, defaults to 7
  • N_SUBSCRIBERS: number of simulated subscribers, defaults to 4,000
  • N_TACS: number of mobile phone models, defaults to 1,000 (SQL generator only)
  • N_SITES: number of mobile sites, defaults to 1,000 (SQL generator only)
  • N_CELLS: number of cells, defaults to 1,000
  • N_CALLS: number of calls to generate per day, defaults to 200,000
  • N_SMS: number of SMS messages to generate per day, defaults to 200,000 (SQL generator only)
  • N_MDS: number of mobile data sessions to generate per day, defaults to 200,000 (SQL generator only)
  • SUBSCRIBERS_SEED: random seed used when generating subscribers, defaults to 11111 (Python generator only)
  • CALLS_SEED: random seed used when generating calls, defaults to 22222 (Python generator only)
  • CELLS_SEED: random seed used when generating cells, defaults to 33333 (Python generator only)
  • SYNTHETIC_DATA_GENERATOR: which data generator to use, may be 'sql' or 'python'. Defaults to 'sql'
  • P_OUT_OF_AREA: probability that an event is taking place out of a subscriber's home region. Defaults to 0.05
  • P_RELOCATE: probability that each subscriber relocates each day, defaults to 0.01
  • INTERACTIONS_MULTIPLIER: multiplier for interaction pairs, defaults to 5.
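
When experimenting with several parameter combinations, it can be convenient to assemble the docker run command from a dictionary of settings. The helper below is a hypothetical convenience function, not part of FlowKit; it only uses the flags and image name shown in the examples above, and it prints the command rather than running it.

```python
import shlex


def build_docker_command(
    env,
    name="flowdb_synth_data",
    host_port=9000,
    image="flowminder/flowdb-synthetic-data:latest",
):
    """Assemble the argument list for `docker run` from a dict of env variables."""
    command = ["docker", "run", "--name", name, "--publish", f"{host_port}:5432"]
    for key, value in env.items():
        command += ["-e", f"{key}={value}"]
    command += ["--detach", image]
    return command


env = {
    "FLOWMACHINE_FLOWDB_PASSWORD": "foo",
    "FLOWAPI_FLOWDB_PASSWORD": "foo",
    "SYNTHETIC_DATA_GENERATOR": "sql",
    "N_DAYS": 3,
    "N_SUBSCRIBERS": 1000,
    "P_OUT_OF_AREA": 0.1,
}
command = build_docker_command(env)
print(shlex.join(command))
# To launch the container, pass the list to subprocess.run(command, check=True).
```

Building the command as a list avoids shell-quoting problems; shlex.join is used here only to display it in a copy-and-paste form.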

FlowAuth

Quick setup to run the Frontend tests interactively

For development purposes, it is useful to be able to run the FlowAuth frontend tests interactively.

  • As an initial step, ensure that all the relevant Python and JavaScript dependencies are installed.

    cd /path/to/flowkit/flowauth/
    pipenv install
    
    cd /path/to/flowkit/flowauth/frontend
    npm install
    

  • The following command starts both the FlowAuth backend and frontend (and also opens the FlowAuth web interface at http://localhost:3000/ in the browser).

    cd /path/to/flowkit/flowauth/
    pipenv run start-all
    

  • To open the Cypress UI, run the following in a separate terminal session:

    cd /path/to/flowkit/flowauth/frontend/
    npm run cy:open
    

  • You can then click the button "Run all specs", or select an individual spec to run only a subset of the tests.