
flowetl.util

create_dag

create_dag(*, dag_id: str, cdr_type: str, start_date: datetime.datetime, extract_sql: str, end_date: Union[datetime.datetime, NoneType] = None, retries: int = 10, retry_delay: datetime.timedelta = datetime.timedelta(days=1), schedule_interval: Union[str, pendulum.interval.Interval] = '@daily', indexes: Iterable[str] = ('msisdn_counterpart', 'location_id', 'datetime', 'tac'), data_present_poke_interval: int = 60, data_present_timeout: int = 604800, flux_check_poke_interval: int = 60, flux_check_wait_interval: int = 60, flux_check_timeout: int = 604800, source_table: Union[str, NoneType] = None, staging_view_sql: Union[str, NoneType] = None, cluster_field: Union[str, NoneType] = None, program: Union[str, NoneType] = None, filename: Union[str, NoneType] = None, fields: Union[Dict[str, str], NoneType] = None, null: str = '', additional_macros: Dict[str, Union[str, Callable]] = {}, header: bool = True, delimiter: str = ',', quote: str = '"', escape: str = '"', encoding: Union[str, NoneType] = None, use_file_flux_sensor: bool = True, **kwargs) -> 'DAG'
Source: flowetl/util.py

Create an ETL DAG that will load data from files, or from a table within the database. A usage sketch is given after the parameter listing below.

Parameters

  • dag_id: str

    Name of the dag

  • cdr_type: str

    Type of CDR data

  • start_date: datetime.datetime

    First date the dag should run for

  • extract_sql: str

    SQL template. May be an SQL string, or the name of a file in the dags folder. The SQL should output a table with fields matching the corresponding cdr type schema. Where the source data is missing a field, the field must be introduced using NULL::<field_type> as <field_name>.

  • end_date: typing.Union[datetime.datetime, NoneType], default None

    Optionally specify the final day the dag should run on

  • retries: int, default 10

    Number of times to retry the dag if it fails

  • retry_delay: datetime.timedelta, default 1 day, 0:00:00

    Delay between retries

  • schedule_interval: typing.Union[str, pendulum.interval.Interval], default @daily

    Time interval between execution dates.

  • indexes: typing.Iterable[str], default ('msisdn_counterpart', 'location_id', 'datetime', 'tac')

    Fields to create indexes on.

  • data_present_poke_interval: int, default 60

    Number of seconds to wait between runs for the data present check

  • data_present_timeout: int, default 604800

    Maximum number of seconds to keep checking before failing

  • flux_check_poke_interval: int, default 60

    Number of seconds to wait between runs for the data in flux check

  • flux_check_wait_interval: int, default 60

    Number of seconds to monitor data when checking for flux

  • flux_check_timeout: int, default 604800

    Maximum number of seconds to keep checking before failing

  • source_table: typing.Union[str, NoneType], default None

    If extracting from a table within the database (e.g. when using an FDW to connect to another db), the schema-qualified name of the table.

  • staging_view_sql: typing.Union[str, NoneType], default None

    If extracting from a table within the database (e.g. when using an FDW to connect to another db), the SQL template or name of the template which will be used to create a date-limited view of the data.

  • cluster_field: typing.Union[str, NoneType], default None

    Optionally require that the data tables be 'clustered' on a field, which improves the performance of queries which need to subset based on that field at the cost of a significant increase in ETL time.

  • program: typing.Union[str, NoneType], default None

    When loading data from files, set to the name of a program to be used when reading them (e.g. zcat to load from compressed CSV files).

  • filename: typing.Union[str, NoneType], default None

    When loading data from files, the filename pattern to be used - may include Airflow macros.

  • fields: typing.Union[typing.Dict[str, str], NoneType], default None

    When loading data from files, a mapping of field names to postgres types.

  • null: str, default ''

    When loading data from files, optionally specify a null value character

  • additional_macros: typing.Dict[str, typing.Union[str, typing.Callable]], default {}

    Optionally provide additional macros to be available in SQL templates.

  • header: bool, default True

    Set to False when loading files if the files do not have a header row.

  • delimiter: str, default ,

    When loading from files, you may specify the delimiter character

  • quote: str, default "

    When loading from files, you may specify the quote character

  • escape: str, default "

    When loading from files, you may specify the escape character

  • encoding: typing.Union[str, NoneType], default None

    Optionally specify file encoding when loading from files.

  • use_file_flux_sensor: bool, default True

    When set to True, uses a check on the last modification time of the file to determine whether the file is in flux. Set to False to perform a slower check based on the number of rows in the mounted table.

Returns

  • DAG
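
Example

As a sketch of a file-based load (the dag_id, filename pattern, field names, and template name below are illustrative placeholders, not values required by flowetl):

    from datetime import datetime, timedelta

    from flowetl.util import create_dag

    dag = create_dag(
        dag_id="load_calls",
        cdr_type="calls",
        start_date=datetime(2021, 1, 1),
        extract_sql="extract_calls.sql",  # name of a template in the dags folder
        program="zcat",  # decompress gzipped files while reading them
        filename="calls_{{ ds_nodash }}.csv.gz",  # Airflow macros are allowed
        fields={
            "msisdn": "TEXT",
            "datetime": "TIMESTAMPTZ",
            "location_id": "TEXT",
        },
        retries=3,
        retry_delay=timedelta(hours=1),
    )

To load from a table within the database instead, source_table and staging_view_sql would be supplied in place of program, filename, and fields.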

get_qa_checks

get_qa_checks(*, dag: Union[ForwardRef('DAG'), NoneType] = None) -> List[ForwardRef('QACheckOperator')]
Source: flowetl/util.py

Create, from .sql files, a list of QACheckOperators applicable to this dag. Adds all the 'default' checks from this package (see the qa_checks module), plus any found in the dag folder's qa_checks directory or in any additionally added template search paths. CDR type-specific QA checks found in a subdirectory of qa_checks named for the dag's CDR type will also be added.

Parameters

  • dag: typing.Union[ForwardRef('DAG'), NoneType], default None

    The DAG to add operators to. May be None, if called within a DAG context manager.

Returns

  • typing.List[ForwardRef('QACheckOperator')]
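
Example

As a minimal sketch, assuming a DAG created with create_dag (the table and template names are illustrative placeholders); the dag is passed explicitly here, but may be omitted when calling inside a DAG context manager:

    from datetime import datetime

    from flowetl.util import create_dag, get_qa_checks

    dag = create_dag(
        dag_id="load_calls",
        cdr_type="calls",
        start_date=datetime(2021, 1, 1),
        extract_sql="extract_calls.sql",
        source_table="ingest.calls",         # illustrative schema-qualified table
        staging_view_sql="stage_calls.sql",  # illustrative staging view template
    )
    checks = get_qa_checks(dag=dag)  # QACheckOperators attached to the dag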