Working with FlowETL¶
FlowETL manages the loading of CDR data into FlowDB. It is built on Apache Airflow, and a basic understanding of Airflow will be very helpful in making the best use of FlowETL. We recommend familiarising yourself with the Airflow tutorial and key concepts before continuing. You should also have some familiarity with SQL.
To work with FlowETL, you may wish to install the `flowetl` module into your local Python environment. This is not a prerequisite, but it can make writing pipelines easier.
Once your FlowETL deployment is running, you can monitor the progress of your ETL tasks using the web interface, and newly added or changed pipeline definitions will be automatically picked up.
Macros¶
To help you write the SQL you'll need to create your data pipeline, FlowETL supplies several macros in addition to those provided by Airflow. These macros are filled in when a task is run.
| Macro | Purpose | Example |
|---|---|---|
| `{{ params.cdr_type }}` | The category of CDR data being loaded | `"calls"` |
| `{{ table_name }}` | The base name of the table from the date and cdr type | `"mds_20200101"` |
| `{{ etl_schema }}` | Name of the schema used for etl tables | `"etl"` |
| `{{ final_schema }}` | Schema under which the final table will be created | `"events"` |
| `{{ parent_table }}` | Supertable that the final table will inherit from | `"calls"` |
| `{{ extract_table_name }}` | Name of the table created to extract to | `"extract_sms_20200101"` |
| `{{ staging_table_name }}` | Name of the table or view created to extract from | `"staging_sms_20200101"` |
| `{{ final_table }}` | Schema qualified name of the final table | `"events.mds_20200101"` |
| `{{ extract_table }}` | Schema qualified name of the table to be extracted to | `"etl.extract_mds_20200101"` |
| `{{ staging_table }}` | Schema qualified name of the table to be extracted from | `"etl.staging_mds_20200101"` |
These macros are available when using the FlowETL operators and sensors.
Creating ETL pipelines¶
As with any Airflow-based system, you will need to create DAG files to define your ETL pipelines. FlowETL provides a convenient helper function, `create_dag`, which automates this for most common usage scenarios. To create a pipeline, create a new file in the directory which you have bind mounted to the FlowETL container's DAG directory. Each file specifies one data pipeline - we recommend creating a separate data pipeline for each CDR variant you expect to encounter.
You can find some example pipelines in the FlowETL section of our GitHub repository.
Because data comes in many forms, you must specify in SQL a transformation from your source data to the correct FlowDB schema for the CDR data type. You will need to write this transformation in the form of a `SELECT` statement, using the `{{ staging_table }}` macro as the source table.
Warning
When specifying a transform, you may not have (or need) all the fields specified in the schema. Any fields which are not included still need to be specified in the transform: fields which are not being extracted from your source data should be given as `NULL::<field_type>`, and fields must appear in your select statement in the order they are given in the tables below.
For example, a valid SQL extract statement for calls data with only the mandatory fields available:
```sql
SELECT uuid_generate_v4()::TEXT as id, TRUE as outgoing, event_time::TIMESTAMPTZ as datetime, NULL::NUMERIC as duration,
    NULL::TEXT as network, msisdn::TEXT as msisdn, NULL::TEXT as msisdn_counterpart, NULL::TEXT as location_id, NULL::TEXT as imsi,
    NULL::TEXT as imei, NULL::NUMERIC(8) as tac, NULL::NUMERIC as operator_code, NULL::NUMERIC as country_code
FROM {{ staging_table }}
```
Calls data¶
FlowDB uses a two-line format for calls data, similar to double-entry bookkeeping. One row records information about the MSISDN, location and so on of the calling party, and a second records the equivalent information for the receiver. If your data is supplied in one-line format, you will need to transform it to two lines as part of the extract step (a sketch of such a transform follows the schema table below).
The result of your extract SQL must conform to this schema:
| Field | Type | Optional | Notes |
|---|---|---|---|
| id | TEXT | ✓ | ID which matches this call to the counterparty record if available |
| outgoing | BOOLEAN | ✓ | True if this is the initiating record for the call |
| datetime | TIMESTAMPTZ | ✗ | The time the call began (we recommend storing this in UTC) |
| duration | NUMERIC | ✓ | The length of time in fractional minutes that the call lasted |
| network | TEXT | ✓ | The network this party was on |
| msisdn | TEXT | ✗ | The (deidentified) MSISDN of this party's phone |
| msisdn_counterpart | TEXT | ✓ | The (deidentified) MSISDN of the other party's phone |
| location_id | TEXT | ✓ | The ID of the location, which should refer to a cell in infrastructure.cells |
| imsi | TEXT | ✓ | The (deidentified) IMSI of this party's phone |
| imei | TEXT | ✓ | The (deidentified) IMEI of this party's phone |
| tac | NUMERIC(8) | ✓ | The TAC code of this party's handset |
| operator_code | NUMERIC | ✓ | The numeric code of this party's operator |
| country_code | NUMERIC | ✓ | The numeric code of this party's country |
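If your source data arrives in one-line format (a single row per call carrying both parties' details), a hedged sketch of the two-line transform is shown below. The source column names (`a_msisdn`, `b_msisdn`, `a_cell`, `b_cell`, `call_start`, and `call_length`, assumed to be in seconds) are hypothetical, and the optional fields not available in the source are filled with typed NULLs:

```sql
-- Sketch: split one-line call records into the two-line format, one row per party.
-- Outgoing (calling party) rows:
SELECT md5(a_msisdn || b_msisdn || call_start::TEXT)::TEXT AS id,
       TRUE AS outgoing, call_start::TIMESTAMPTZ AS datetime,
       (call_length / 60.0)::NUMERIC AS duration,  -- seconds to fractional minutes
       NULL::TEXT AS network,
       a_msisdn::TEXT AS msisdn, b_msisdn::TEXT AS msisdn_counterpart,
       a_cell::TEXT AS location_id, NULL::TEXT AS imsi, NULL::TEXT AS imei,
       NULL::NUMERIC(8) AS tac, NULL::NUMERIC AS operator_code, NULL::NUMERIC AS country_code
FROM {{ staging_table }}
UNION ALL
-- Receiving party rows, sharing the same id so the two halves can be matched:
SELECT md5(a_msisdn || b_msisdn || call_start::TEXT)::TEXT AS id,
       FALSE AS outgoing, call_start::TIMESTAMPTZ AS datetime,
       (call_length / 60.0)::NUMERIC AS duration,
       NULL::TEXT AS network,
       b_msisdn::TEXT AS msisdn, a_msisdn::TEXT AS msisdn_counterpart,
       b_cell::TEXT AS location_id, NULL::TEXT AS imsi, NULL::TEXT AS imei,
       NULL::NUMERIC(8) AS tac, NULL::NUMERIC AS operator_code, NULL::NUMERIC AS country_code
FROM {{ staging_table }}
```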
SMS data¶
As with calls, FlowDB uses a two-line format for SMS data. One row records information about the MSISDN, location and so on of the sender, and a second records the equivalent information for any receivers.
The result of your extract SQL must conform to this schema:
| Field | Type | Optional | Notes |
|---|---|---|---|
| id | TEXT | ✓ | ID which matches this sms to the counterparty record if available |
| outgoing | BOOLEAN | ✓ | True if this is the initiating record for the sms |
| datetime | TIMESTAMPTZ | ✗ | The time the sms was sent (we recommend storing this in UTC) |
| network | TEXT | ✓ | The network this party was on |
| msisdn | TEXT | ✗ | The (deidentified) MSISDN of this party's phone |
| msisdn_counterpart | TEXT | ✓ | The (deidentified) MSISDN of the other party's phone |
| location_id | TEXT | ✓ | The ID of the cell which handled this sms, which should refer to a cell in infrastructure.cells |
| imsi | TEXT | ✓ | The (deidentified) IMSI of this party's phone |
| imei | TEXT | ✓ | The (deidentified) IMEI of this party's phone |
| tac | NUMERIC(8) | ✓ | The TAC code of this party's handset |
| operator_code | NUMERIC | ✓ | The numeric code of this party's operator |
| country_code | NUMERIC | ✓ | The numeric code of this party's country |
Mobile data session (MDS) data¶
Because there is no counterparty for MDS data, FlowDB uses a one-line format for this data type, with the following schema:
| Field | Type | Optional | Notes |
|---|---|---|---|
| id | TEXT | ✓ | Unique ID of this record |
| datetime | TIMESTAMPTZ | ✗ | The time the data session began (we recommend storing this in UTC) |
| duration | NUMERIC | ✓ | The length of time in fractional minutes that the data session lasted |
| volume_total | NUMERIC | ✓ | Data volume transferred in MB |
| volume_upload | NUMERIC | ✓ | Data volume sent in MB |
| volume_download | NUMERIC | ✓ | Data volume downloaded in MB |
| network | TEXT | ✓ | The network this party was on |
| msisdn | TEXT | ✗ | The (deidentified) MSISDN of this party's phone |
| location_id | TEXT | ✓ | The ID of the cell which connected this session, which should refer to a cell in infrastructure.cells |
| imsi | TEXT | ✓ | The (deidentified) IMSI of the phone |
| imei | TEXT | ✓ | The (deidentified) IMEI of the phone |
| tac | NUMERIC(8) | ✓ | The TAC code of the handset |
| operator_code | NUMERIC | ✓ | The numeric code of the operator |
| country_code | NUMERIC | ✓ | The numeric code of the country |
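For example, a hedged sketch of an MDS extract where the source provides separate upload and download volumes in bytes (the source column names `session_start`, `bytes_up`, `bytes_down`, `subscriber_id` and `cell_ref` are hypothetical):

```sql
-- Sketch: one-line MDS extract, with fields in the order given in the table above.
SELECT uuid_generate_v4()::TEXT AS id,
       session_start::TIMESTAMPTZ AS datetime,
       NULL::NUMERIC AS duration,
       ((bytes_up + bytes_down) / 1e6)::NUMERIC AS volume_total,  -- bytes to MB
       (bytes_up / 1e6)::NUMERIC AS volume_upload,
       (bytes_down / 1e6)::NUMERIC AS volume_download,
       NULL::TEXT AS network,
       subscriber_id::TEXT AS msisdn,
       cell_ref::TEXT AS location_id,
       NULL::TEXT AS imsi, NULL::TEXT AS imei, NULL::NUMERIC(8) AS tac,
       NULL::NUMERIC AS operator_code, NULL::NUMERIC AS country_code
FROM {{ staging_table }}
```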
Topup data¶
As with MDS data, FlowDB uses a one-line format for topups, with the following schema:
| Field | Type | Optional | Notes |
|---|---|---|---|
| id | TEXT | ✓ | Unique ID of this record |
| datetime | TIMESTAMPTZ | ✗ | The time of the topup (we recommend storing this in UTC) |
| type | TEXT | ✓ | Kind of topup |
| recharge_amount | NUMERIC | ✓ | Cost of topup |
| airtime_fee | NUMERIC | ✓ | |
| tax_and_fee | NUMERIC | ✓ | |
| pre_event_balance | NUMERIC | ✓ | Balance before topup applied |
| post_event_balance | NUMERIC | ✓ | Balance after topup applied |
| msisdn | TEXT | ✗ | The (deidentified) MSISDN of this party's phone |
| location_id | TEXT | ✓ | The ID of the cell which handled this topup, which should refer to a cell in infrastructure.cells |
| imsi | TEXT | ✓ | The (deidentified) IMSI of the phone |
| imei | TEXT | ✓ | The (deidentified) IMEI of the phone |
| tac | NUMERIC(8) | ✓ | The TAC code of the handset |
| operator_code | NUMERIC | ✓ | The numeric code of the operator |
| country_code | NUMERIC | ✓ | The numeric code of the country |
FlowETL supports a variety of data sources, which are covered in more detail below.
Connecting to different CDR data sources¶
Remote databases¶
Extracting data from a remote database has significant benefits, because the source database provides guarantees about the integrity of the data in terms of data types, nullable values and so on. FlowDB uses PostgreSQL's foreign data wrapper mechanism to connect to external databases. In general, to use a remote database table as a source you should create a persistent foreign table that will contain the data (instructions on how to do this for the databases supported by FlowETL are given below), and then provide an SQL snippet to use for extraction. For example, to extract three fields:
```sql
SELECT event_time, msisdn, cell_id FROM {{ source_table }}
WHERE event_time >= '{{ ds_nodash }}' AND event_time < '{{ tomorrow_ds_nodash }}';
```
Note the use of the `{{ source_table }}` macro, which you should provide as an argument to either `create_dag` or `CreateStagingViewOperator`, and the datetime constraint using the `{{ ds_nodash }}` and `{{ tomorrow_ds_nodash }}` Airflow macros. Remote database extraction in FlowETL works by selecting a time-delimited segment of your source table. If your source table is complex, or you need to combine multiple tables to construct a suitable query, we recommend creating a view in your source database and connecting to the view.
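If you do opt for a view, a minimal sketch (to be created in the source database, with entirely hypothetical table and column names) might look like:

```sql
-- Hypothetical example, run against the *source* database rather than FlowDB:
-- flattens two raw tables into a single time-indexed feed for FlowETL to extract from.
CREATE VIEW flowetl_calls_feed AS
SELECT
    c.call_start    AS event_time,
    c.caller_msisdn AS msisdn,
    l.cell_id       AS cell_id
FROM raw_call_records c
JOIN cell_lookup l ON l.internal_cell_ref = c.cell_ref;
```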
You will also need to be able to connect to the remote database from inside FlowDB's docker container. If the remote database is also running as a container, you can achieve this by creating an overlay network and attaching both containers to it.
PostgreSQL database¶
The `postgres_fdw` extension allows FlowDB to communicate with other databases built on PostgreSQL. This makes data extraction from remote PostgreSQL-compatible databases (e.g. TimescaleDB) simple. You will need to create a foreign server, map a user for it, and then specify the table and schema on the remote database you would like to make available within FlowDB:
```sql
CREATE SERVER IF NOT EXISTS postgres_server
    FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (
        host '<foreign_host>',
        port '<foreign_port>',
        dbname '<foreign_db_name>'
    );

CREATE USER MAPPING IF NOT EXISTS FOR flowdb
    SERVER postgres_server
    OPTIONS (
        user '<postgres_db_user>',
        password '<postgres_db_password>'
    );

IMPORT FOREIGN SCHEMA <source_table_schema> LIMIT TO (<source_table>)
    FROM SERVER postgres_server INTO etl;
```
You can then use the remote table as a data source in FlowETL, by providing `etl.<source_table>` (the schema into which the foreign table was imported) as the source table.
Oracle database¶
FlowDB supports the oracle_fdw connector, but to conform with Oracle's licensing we do not distribute a container which includes it. You will need to supply Oracle client binaries, and build the container by downloading the Dockerfile and script and running:
```bash
docker build --build-arg ORACLE_BINARY_SOURCE=<oracle_binary_url> \
    --build-arg CODE_VERSION=latest -t flowdb_with_oracle .
```
Once you have built the image, you can use it in place of standard FlowDB.
To connect to Oracle, you will need to access FlowDB as the `flowdb` user and run:
```sql
CREATE SERVER oracle_server FOREIGN DATA WRAPPER oracle_fdw
    OPTIONS (dbserver '//<oracle_server>:<oracle_port>/<oracle_db>');

CREATE USER MAPPING FOR flowdb SERVER oracle_server
    OPTIONS (user '<oracle_user>', password '<oracle_password>');

CREATE FOREIGN TABLE oracle_source_table (
    <fields>
) SERVER oracle_server OPTIONS (schema '<schema>', table '<table_name>');
```
Further instructions on use of the wrapper are available from the project's GitHub repository.
MSSQL database¶
FlowDB includes `tds_fdw`, which supports data extraction from Microsoft SQL Server. Usage is very similar to the other remote database connections:
```sql
CREATE SERVER mssql_server
    FOREIGN DATA WRAPPER tds_fdw
    OPTIONS (servername '<remote_database_host>', port '<remote_database_port>', database '<remote_database_name>', tds_version '<tds_version>');

CREATE USER MAPPING FOR flowdb
    SERVER mssql_server
    OPTIONS (username '<remote_user>', password '<remote_password>');

CREATE FOREIGN TABLE mssql_table (
    <fields>
)
    SERVER mssql_server
    OPTIONS (table_name '<schema_qualified_table_name>', row_estimate_method 'showplan_all');
```
You should choose the TDS version based on the software version of the remote database - additional information on the correct version for different MSSQL Server versions is available here.
CSV Files¶
FlowETL can also be used to load data from files - useful if you're receiving data as a daily file dump. To load from a file, you'll need to ensure that the files have a predictable, date-based name, for example `calls_data_2019_05_01.csv.gz`, which you can capture using a (templated) string. The filename pattern should include the absolute path to the files from inside your FlowDB container; for example, a complete pattern to capture files with names like `calls_data_2019-05-01.csv.gz` in the `/etl/calls` data root might be `/etl/{{ params.cdr_type }}/{{ params.cdr_type }}_data_{{ ds }}.csv.gz`. This uses a combination of Airflow's built-in macros and the `{{ params.cdr_type }}` macro supplied by FlowETL.
You will also need to specify the names and types of the fields your CSV will contain, as a dict. These will be used to create a foreign data wrapper which allows FlowDB to treat your data file like a table, and helps ensure data integrity. You may optionally specify a program to be run to read your data file, for example `zcat` if your source data is compressed. You can either pass these arguments to the `create_dag` function, or use `CreateForeignStagingTableOperator` directly if composing your DAG manually. If you are composing the DAG manually, you will need to use the `ExtractFromForeignTableOperator` to ensure that FlowETL correctly handles cleanup of the intermediate ETL steps.
Loading Infrastructure data¶
At this time, FlowETL does not provide helper functions for loading infrastructure data. You can, however, still create a custom DAG for regular infrastructure data import using either the `CreateForeignStagingTableOperator` and `ExtractFromForeignTableOperator` operators, the `CreateStagingViewOperator` and `ExtractFromViewOperator` operators, or Airflow's existing collection of operators.
Cell data should be loaded into the `infrastructure.cells` table, and contain fields as follows (a sketch of a load statement follows the table):
| Field | Type | Notes |
|---|---|---|
| id | TEXT | ID of the cell as referenced in the CDR |
| version | INTEGER | Add a new row and increment if details of the cell change |
| site_id | TEXT | ID of the cell tower this cell is on |
| name | TEXT | Any name attached to this cell |
| type | TEXT | Type of the cell |
| msc | TEXT | Mobile switching centre |
| bsc_rnc | TEXT | |
| antenna_type | TEXT | |
| status | TEXT | |
| lac | TEXT | |
| height | NUMERIC | |
| azimuth | NUMERIC | |
| transmitter | TEXT | |
| max_range | NUMERIC | (m) |
| min_range | NUMERIC | (m) |
| electrical_tilt | NUMERIC | |
| mechanical_downtilt | NUMERIC | |
| date_of_first_service | DATE | Date this cell became operational |
| date_of_last_service | DATE | Date this cell ceased operation |
| geom_point | POINT | EPSG 4326 point location of the cell |
| geom_polygon | MULTIPOLYGON | EPSG 4326 coverage polygon of the cell |
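As a rough sketch of the load step in such a custom DAG, assuming you have already created a foreign table `etl.cells_feed` over your source data (that table name and its columns `cell_id`, `tower_id`, `lon` and `lat` are hypothetical, and only a subset of the fields is populated):

```sql
-- Sketch only: etl.cells_feed and its columns are hypothetical placeholders
-- for however your cell data is exposed to FlowDB; remaining fields are left NULL.
INSERT INTO infrastructure.cells (id, version, site_id, geom_point)
SELECT
    cell_id::TEXT,
    0 AS version,                             -- first version of each cell record
    tower_id::TEXT,
    ST_SetSRID(ST_MakePoint(lon, lat), 4326)  -- EPSG 4326 point from lon/lat columns
FROM etl.cells_feed;
```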
This should generally be used in concert with the `infrastructure.sites` table:
| Field | Type | Notes |
|---|---|---|
| id | TEXT | ID of the site as referenced in the cells table |
| version | INTEGER | Add a new row and increment if details of the site change |
| name | TEXT | Any name attached to this site |
| type | TEXT | Type of the site |
| status | TEXT | |
| structure_type | TEXT | |
| is_cow | BOOLEAN | True indicates that this is a mobile ("cell on wheels") tower |
| date_of_first_service | DATE | Date this site became operational |
| date_of_last_service | DATE | Date this site ceased operation |
| geom_point | POINT | EPSG 4326 point location of the site |
| geom_polygon | MULTIPOLYGON | EPSG 4326 coverage polygon of the site |
Where site information is not available, we advise that you populate the sites table as a function of the cells table.
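For example, a minimal sketch which derives one site per distinct `site_id` in the cells table, assuming sites are co-located with their cells and the remaining site fields can be left NULL:

```sql
-- Sketch: populate infrastructure.sites from infrastructure.cells when no
-- separate site data is supplied, keeping the latest version of each cell.
INSERT INTO infrastructure.sites (id, version, geom_point)
SELECT DISTINCT ON (site_id)
    site_id,
    version,
    geom_point
FROM infrastructure.cells
ORDER BY site_id, version DESC;
```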
Loading GIS data¶
Good spatial data is critical to successful analysis using FlowKit, and you will want to obtain, at a minimum, admin 0-3 boundaries for your country of interest. At this time, we recommend using the OGR foreign data wrapper to load shapefiles.
Administrative boundaries should be loaded under the `geography` schema, and named as `admin<level>`. They should at a minimum contain `adminXname`, `adminXpcod` and `geom` fields. While it is possible to directly mount boundary data into the database using the OGR foreign data wrapper, we would recommend creating in-database tables and using the wrapper only for extraction, for example:
```sql
CREATE SERVER admin_boundaries_source
    FOREIGN DATA WRAPPER ogr_fdw
    OPTIONS (
        datasource '<path_inside_container_to_admin_3_file>',
        format 'ESRI Shapefile'
    );

CREATE FOREIGN TABLE tmp_admin3 (
    gid integer,
    geom geometry(MultiPolygon, 4326),
    admin3name varchar,
    admin3pcod integer
)
    SERVER admin_boundaries_source
    OPTIONS (layer 'admin3');

CREATE TABLE geography.admin3 AS (
    SELECT gid, geom, admin3name, admin3pcod FROM tmp_admin3
);
```
You should also create an index on the `geom` column:
```sql
CREATE INDEX admin3_geom_gist ON geography.admin3 USING gist (geom);
```
Data QA checks¶
FlowETL includes a small number of built-in QA checks. These checks are not designed to pass or fail newly arriving data, but to provide you and your analysts with important caveats and metadata about the data you are working with. QA checks will run automatically if you are using the `create_dag` function, and their results will be available inside FlowDB in the `etl.post_etl_queries` table to both superusers and the `flowmachine` role. If you are manually composing a DAG, you can use the `get_qa_checks` function to return a list of QA check tasks, which can be scheduled in relation to the other tasks in the DAG.
Customising ETL pipelines¶
In addition to the `create_dag` function, FlowETL allows you to compose DAGs in exactly the same way as you would with Airflow. You can mix and match FlowETL operators with Airflow's built-in ones, or extend FlowETL to add your own. Because the `create_dag` function returns a DAG, you can also use the returned DAG as a basis for a pipeline and extend it further in the same DAG file.
Adding new QA checks¶
Adding additional QA checks is as simple as writing an SQL select statement. To add your own custom checks, create a `qa_checks` subfolder in the directory where you are keeping your DAGs. You can then create your check as a `<check_name>.sql` file, and it will automatically be picked up for the next run of the DAG.
SQL files directly under the `qa_checks` directory are treated as applicable to all CDR types. If you need a type-specific check, place it in `qa_checks/<cdr_type>`.
Note
To be a valid QA check, your select statement should return a single value.
When writing your QA check, you're almost certain to need to refer to the table of data that has just been loaded. Because the SQL file will be templated, you can use a macro which will be filled in when the check is run. The macro for the final table is `{{ final_table }}`.
Here's an example of a valid QA check; one of the defaults which records the number of just-added rows:
```sql
SELECT COUNT(*) FROM {{ final_table }}
```
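As a further illustration, a custom check you might add yourself (saved, say, as `qa_checks/count_null_locations.sql` - the filename is just an example) could count how many newly loaded rows are missing a location:

```sql
-- Custom QA check sketch: number of newly loaded rows with no location_id.
-- Returns a single value, as a valid QA check must.
SELECT COUNT(*)
FROM {{ final_table }}
WHERE location_id IS NULL
```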
Worked example¶
A complete worked example covering deploying and using FlowETL to load data from both a remote PostgreSQL database and CSV files can be found in the main FlowKit repository.