Test data quality in an Airflow data pipeline

Follow this guide to set up and run scheduled Soda scans for data quality in your Airflow data pipeline.

Use this guide as an example for how to set up and use Soda to test the quality of your data in an Airflow pipeline. Automatically catch data quality issues after ingestion or transformation to prevent negative downstream impact.

Not quite ready for this big gulp of Soda? 🥤Try taking a sip, first.

About this guide

The instructions below offer Data Engineers an example of how to run SodaCL data quality checks on data in an Apache Airflow pipeline.

For context, this guide presents the example of a Data Engineer at a small firm who was tasked with building a simple report of product sales by category from AdventureWorks data. The Engineer uses dbt to build a model transformation that gathers the data, then builds further models that transform it and push the results to a reporting and visualization tool. The Engineer uses Airflow to schedule and monitor workflows, including data ingestion and transformation events.

The Engineer's goal in this example is to run scans that check the quality of the data after each of these events, and before pushing information into the reporting tool. When scan results indicate a data quality issue, Soda notifies the Engineer so that they can stop the pipeline if necessary, then investigate and address the issue before it causes problems in the report.

Access the sodadata/sip-of-soda/test-data-in-pipeline folder to review the dbt models and Soda checks files that the Data Engineer uses.

Borrow from this guide to connect to your own data source, set up scan points in your pipeline, and execute your own relevant tests for data quality.

Install Soda from the command-line

With Python 3.8, 3.9, or 3.10 installed, the Engineer creates a virtual environment in Terminal, then installs the Soda package for PostgreSQL.

Refer to the complete install instructions for all supported data sources, if you wish.
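A minimal sketch of the install step in Python, assuming a POSIX layout for the virtual environment and the Soda Core package name soda-core-postgres (confirm the package name in the install instructions for your Soda version). It checks the interpreter against the Python versions listed above and builds, without running, the commands the Engineer would issue.

```python
from __future__ import annotations

import sys
from pathlib import Path

# Python versions this guide lists as supported.
SUPPORTED_VERSIONS = {(3, 8), (3, 9), (3, 10)}


def python_supported(version_info=sys.version_info) -> bool:
    """Return True if the major.minor version is one the guide lists."""
    return tuple(version_info[:2]) in SUPPORTED_VERSIONS


def install_commands(venv_dir: str = ".venv",
                     package: str = "soda-core-postgres") -> list[list[str]]:
    """Build (but do not run) the commands that create a virtual
    environment and install the assumed Soda package into it."""
    # POSIX layout; on Windows the executable lives in Scripts\pip.exe.
    pip = Path(venv_dir) / "bin" / "pip"
    return [
        [sys.executable, "-m", "venv", venv_dir],
        [str(pip), "install", package],
    ]
```

To execute the commands, pass each list to subprocess.run(cmd, check=True) in order, then activate the environment before running soda.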

Connect Soda to a data source and Soda Cloud account

To connect to a data source such as Snowflake, PostgreSQL, Amazon Athena, or BigQuery, you use a configuration.yml file which stores access details for your data source.

This guide also includes instructions for how to connect to a Soda Cloud account using API keys that you create and add to the same configuration.yml file. Available for free as a 45-day trial, a Soda Cloud account gives you access to visualized scan results, tracks trends in data quality over time, enables you to set alert notifications, and much more.

  1. In the directory in which they work with their dbt models, the Data Engineer creates a soda directory to contain the Soda configuration and check YAML files.

  2. In the new directory, they create a new file called configuration.yml.

  3. In the configuration.yml file, they add the data source connection configuration for the PostgreSQL data source that contains the AdventureWorks data. Access the example file to review this PostgreSQL connection configuration, and see the complete list of supported data sources for other connection types.

  4. In a browser, they navigate to cloud.soda.io/signup to create a free, 45-day trial Soda account.

  5. They navigate to avatar > Profile, then navigate to the API Keys tab and click the plus icon to generate new API keys.

     • They copy the syntax for the soda_cloud configuration, including the values API Key ID and API Key Secret, and paste it into the configuration.yml file.

     • They are careful not to nest the soda_cloud configuration in the data_source configuration; soda_cloud is a separate, top-level block.

  6. They save the configuration.yml file and close the API modal in the Soda account.

  7. In Terminal, they run the soda test-connection command to test Soda's connection to the data source.
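A minimal sketch of the resulting configuration.yml, generated with the Python standard library so its structure can be checked before use. The key names (type, host, port, username, password, database, schema, and the soda_cloud keys host, api_key_id, api_key_secret), the data source name adventureworks, and the environment variable names are assumptions; compare them with the example file and Soda's documentation for your version. The point it illustrates is structural: soda_cloud sits at the top level, beside the data_source block, not inside it.

```python
def render_yaml(mapping: dict, indent: int = 0) -> str:
    """Render nested dicts of strings as block-style YAML."""
    lines = []
    pad = "  " * indent
    for key, value in mapping.items():
        if isinstance(value, dict):
            lines.append(f"{pad}{key}:")
            lines.append(render_yaml(value, indent + 1))
        else:
            lines.append(f"{pad}{key}: {value}")
    return "\n".join(lines)


def build_configuration(data_source_name: str) -> dict:
    """Assumed Soda configuration layout: one data_source block and a
    sibling soda_cloud block. Credentials are ${VAR} references so that
    no secret is written into the file itself."""
    return {
        f"data_source {data_source_name}": {
            "type": "postgres",
            "host": "localhost",
            "port": '"5432"',
            "username": "${POSTGRES_USERNAME}",
            "password": "${POSTGRES_PASSWORD}",
            "database": "postgres",
            "schema": "public",
        },
        # Top level, NOT nested under the data_source block.
        "soda_cloud": {
            "host": "cloud.soda.io",
            "api_key_id": "${SODA_API_KEY_ID}",
            "api_key_secret": "${SODA_API_KEY_SECRET}",
        },
    }
```

Writing render_yaml(build_configuration("adventureworks")) to soda/configuration.yml produces the file; check your Soda version's documentation for how it resolves the ${...} environment variable references at scan time.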

Write checks for data quality

A check is a test that Soda executes when it scans a dataset in your data source. The checks.yml file stores the checks you write using the Soda Checks Language (SodaCL). You can create multiple checks.yml files to organize your data quality checks and run all, or some of them, at scan time.

In this example, the Data Engineer creates checks to run at three points: after ingestion, after the initial transformation, and before pushing the information to a visualization or reporting tool.

Transform checks

After building a simple dbt model transformation that creates a new fact table which gathers data about products, product categories, and subcategories (see dbt model), the Engineer realizes that some products in the dataset have no assigned category or subcategory, which means those products would be erroneously excluded from the report.

To mitigate the issue and get a warning when these values are missing, they create a new checks YAML file, fact_product_category.yml, and write checks in it to execute after the transformation produces the fact_product_category dataset.
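As a hedged illustration of what such a check measures, the Python below reproduces the logic of a SodaCL missing_count check with a warn threshold. The column names category_name and subcategory_name and the sample rows are hypothetical, and the SodaCL string shows only the general shape of such a check, not the Engineer's actual file.

```python
from typing import Any, Dict, List

# General shape of a SodaCL check file for this step (hypothetical
# columns): warn whenever any product lacks a category or subcategory.
SODACL_SKETCH = """\
checks for fact_product_category:
  - missing_count(category_name):
      warn: when > 0
  - missing_count(subcategory_name):
      warn: when > 0
"""

Row = Dict[str, Any]


def missing_count(rows: List[Row], column: str) -> int:
    """Count rows where `column` is NULL (None) or absent. By default,
    SodaCL counts only NULL values as missing."""
    return sum(1 for row in rows if row.get(column) is None)


def outcome(count: int, warn_when_above: int = 0) -> str:
    """Mirror 'warn: when > N': warn if the metric exceeds N."""
    return "warn" if count > warn_when_above else "pass"
```

A warn threshold, rather than a fail threshold, matches the goal stated above: the Engineer wants to be alerted, not to block the pipeline outright.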

Ingest checks

Because the Engineer does not have the ability or access to fix upstream data themselves, they create a checks YAML file for each dataset they use in the transformation, and write checks to apply after the data is ingested but before it is transformed.

For any checks that fail, the Engineer can notify upstream Data Engineers or Data Product Owners to address the issue of missing categories and subcategories.

The ingest checks files are:

  • dim_product.yml
  • dim_product_category.yml
  • dim_product_subcategory.yml
  • fact_internet_sales.yml
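A sketch of generating one ingest checks file per dataset, assuming each file holds a row_count check plus a missing_count check on each key the transformation joins on. The column names in INGEST_DATASETS are hypothetical; substitute the keys your model actually depends on.

```python
from pathlib import Path
from typing import Dict, List

# Hypothetical key columns per raw dataset; replace with the columns
# your fact_product_category model actually joins on.
INGEST_DATASETS: Dict[str, List[str]] = {
    "dim_product": ["product_key", "product_subcategory_key"],
    "dim_product_category": ["product_category_key"],
    "dim_product_subcategory": ["product_subcategory_key", "product_category_key"],
    "fact_internet_sales": ["product_key", "sales_order_number"],
}


def ingest_checks_yaml(dataset: str, columns: List[str]) -> str:
    """Render SodaCL requiring rows and no NULLs in the given columns."""
    lines = [f"checks for {dataset}:", "  - row_count > 0"]
    lines += [f"  - missing_count({column}) = 0" for column in columns]
    return "\n".join(lines) + "\n"


def write_ingest_checks(folder: Path) -> List[Path]:
    """Write one <dataset>.yml file per dataset into `folder`."""
    folder.mkdir(parents=True, exist_ok=True)
    written = []
    for dataset, columns in INGEST_DATASETS.items():
        path = folder / f"{dataset}.yml"
        path.write_text(ingest_checks_yaml(dataset, columns))
        written.append(path)
    return written
```

Calling write_ingest_checks(Path("soda/ingest-checks")) would produce files with the same names as the ingest checks files in this guide. Keeping one file per dataset makes it easy to tell which upstream owner to notify when a check fails.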

Reports checks

Finally, the Engineer uses dbt to build category and subcategory sales report models.

The checks files they create to run on the new report models contain similar user-defined checks. Ultimately, the Engineer wants data quality checks to fail if the share of sales from uncategorized products rises above its normal level (0.85%), or if the sum of sales orders in the model that prepares the report differs greatly from the sum of sales orders in the raw data.

The reports checks files are:

  • report_category_sales.yml
  • report_subcategory_sales.yml
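The two report conditions reduce to simple arithmetic, sketched below in Python. The 0.85% threshold comes from this guide; the 1% tolerance for the order totals is a hypothetical value, because the guide does not quantify "differs greatly". In Soda itself these values would come from user-defined checks that query the report and raw datasets; the functions here only make the thresholds concrete.

```python
def uncategorized_share_pct(uncategorized_sales: float, total_sales: float) -> float:
    """Percentage of total sales that comes from products with no category."""
    if total_sales <= 0:
        raise ValueError("total_sales must be positive")
    return uncategorized_sales / total_sales * 100


def relative_difference(observed: float, expected: float) -> float:
    """|observed - expected| as a fraction of expected."""
    if expected == 0:
        raise ValueError("expected must be non-zero")
    return abs(observed - expected) / abs(expected)


def report_check_outcomes(uncategorized_sales: float, total_sales: float,
                          report_order_total: float, raw_order_total: float,
                          max_uncategorized_pct: float = 0.85,
                          max_relative_diff: float = 0.01) -> dict:
    """Fail each condition when it exceeds its threshold.
    max_relative_diff (1%) is a hypothetical tolerance."""
    share = uncategorized_share_pct(uncategorized_sales, total_sales)
    drift = relative_difference(report_order_total, raw_order_total)
    return {
        "uncategorized_share": "fail" if share > max_uncategorized_pct else "pass",
        "order_totals": "fail" if drift > max_relative_diff else "pass",
    }
```

For example, 50 in uncategorized sales out of 10,000 is a 0.5% share and passes, while 100 out of 10,000 (1%) fails.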

Create a DAG and run the workflow

The Engineer creates an Airflow DAG and runs the workflow locally. Note that the value for scan-name must be unique to every programmatic scan you define; it cannot be the same as the scan name of a programmatic scan in any other pipeline. Access the DAG in the repo.
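The Engineer's actual DAG is in the repo; the sketch below is an independent, hedged approximation that assumes Airflow 2.4 or later, Soda Core's programmatic Scan API, a soda folder holding configuration.yml and the three checks folders, and a data source named adventureworks. The report model names and the dbt --select arguments are likewise assumptions. Each scan name is prefixed with the DAG id, which Airflow requires to be unique, so that scan names do not collide across pipelines.

```python
from datetime import datetime
from pathlib import Path
from typing import Dict

SODA_DIR = Path("soda")         # hypothetical: configuration.yml + checks folders
DATA_SOURCE = "adventureworks"  # hypothetical data source name

# Ordered (task_id, kind, argument) triples: "soda" tasks take a checks
# folder, "dbt" tasks take a --select argument. Model names are assumed.
PIPELINE = [
    ("ingest_checks", "soda", "ingest-checks"),
    ("dbt_fact_product_category", "dbt", "fact_product_category"),
    ("transform_checks", "soda", "transform-checks"),
    ("dbt_reports", "dbt", "report_category_sales report_subcategory_sales"),
    ("reports_checks", "soda", "reports-checks"),
]


def scan_names_for(dag_id: str) -> Dict[str, str]:
    """One scan name per Soda task, prefixed with the DAG id."""
    return {task_id: f"{dag_id}_{task_id}"
            for task_id, kind, _ in PIPELINE if kind == "soda"}


def run_soda_scan(checks_folder: str, scan_name: str) -> int:
    """Run one programmatic Soda scan; raise if any check fails."""
    from soda.scan import Scan  # lazy import: module loads without Soda

    scan = Scan()
    scan.set_data_source_name(DATA_SOURCE)
    scan.add_configuration_yaml_file(file_path=str(SODA_DIR / "configuration.yml"))
    scan.add_sodacl_yaml_files(str(SODA_DIR / checks_folder))
    scan.set_scan_definition_name(scan_name)
    exit_code = scan.execute()
    print(scan.get_logs_text())
    scan.assert_no_checks_fail()  # AssertionError marks the Airflow task failed
    return exit_code


def build_dag(dag_id: str = "adventureworks_sales_report"):
    """Wire PIPELINE as a linear Airflow DAG (Airflow 2.4+ assumed)."""
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.operators.python import PythonOperator

    scan_names = scan_names_for(dag_id)
    with DAG(dag_id=dag_id, start_date=datetime(2023, 1, 1),
             schedule="@daily", catchup=False) as dag:
        tasks = []
        for task_id, kind, arg in PIPELINE:
            if kind == "soda":
                tasks.append(PythonOperator(
                    task_id=task_id,
                    python_callable=run_soda_scan,
                    op_kwargs={"checks_folder": arg,
                               "scan_name": scan_names[task_id]},
                ))
            else:
                tasks.append(BashOperator(
                    task_id=task_id,
                    bash_command=f"dbt run --select {arg}",
                ))
        # With the default trigger rule, each task runs only after the
        # previous one succeeds, so a failed scan stops the pipeline.
        for upstream, downstream in zip(tasks, tasks[1:]):
            upstream >> downstream
    return dag
```

In the DAG file, add dag = build_dag() at module level so the Airflow scheduler discovers it. Because the Soda and Airflow imports happen inside the functions, the module can still be imported and unit-tested where neither package is installed.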

Run Soda scans manually

Without an Airflow DAG, the Engineer can also run Soda scans for data quality locally, using the checks YAML files they created.

  1. They use the soda scan command to run the ingest checks on the raw data, pointing Soda to the checks YAML files in the ingest-checks folder.

  2. If the ingest check results pass, they run dbt to create the new fact_product_category dataset.

  3. Next, they run a scan on the new dataset, pointing Soda to the checks YAML file in the transform-checks folder.

  4. If the transform check results pass, they run dbt to create the reports.

  5. Lastly, they run a scan on the reports data, pointing Soda to the checks YAML files in the reports-checks folder.

  6. If the reports check results pass, the data is reliable enough to push to the reporting or visualization tool for consumers.
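The manual sequence above can be sketched as a small Python runner that stops at the first step whose exit code is not allowed. The data source name adventureworks, the soda/ paths, and the report model names are hypothetical, and the runner assumes soda scan accepts a folder of checks files, as the steps above describe. By default only exit code 0 counts as a pass; if your Soda version signals warnings with a separate non-zero code and you don't want warnings to block the pipeline, widen allowed_codes after checking Soda's exit-code documentation.

```python
import subprocess
from typing import Callable, List, Sequence, Tuple

DATA_SOURCE = "adventureworks"     # hypothetical data source name
CONFIG = "soda/configuration.yml"  # hypothetical path


def soda_scan(checks_folder: str) -> List[str]:
    """argv for a Soda CLI scan of the checks files in a folder."""
    return ["soda", "scan", "-d", DATA_SOURCE, "-c", CONFIG, checks_folder]


def dbt_run(*models: str) -> List[str]:
    """argv for building the given dbt models."""
    return ["dbt", "run", "--select", *models]


STEPS: List[Tuple[str, List[str]]] = [
    ("ingest checks", soda_scan("soda/ingest-checks")),
    ("build fact_product_category", dbt_run("fact_product_category")),
    ("transform checks", soda_scan("soda/transform-checks")),
    ("build reports", dbt_run("report_category_sales", "report_subcategory_sales")),
    ("reports checks", soda_scan("soda/reports-checks")),
]


def run_gated(
    steps: Sequence[Tuple[str, List[str]]],
    runner: Callable[[List[str]], subprocess.CompletedProcess] = subprocess.run,
    allowed_codes: Tuple[int, ...] = (0,),
) -> List[str]:
    """Run steps in order and return the names of those that passed,
    stopping at the first step whose exit code is not in allowed_codes."""
    passed = []
    for name, argv in steps:
        result = runner(argv)
        if result.returncode not in allowed_codes:
            print(f"Stopping: {name!r} exited with code {result.returncode}")
            break
        passed.append(name)
    return passed
```

Injecting the runner keeps the gating logic testable without Soda or dbt installed; in normal use, run_gated(STEPS) calls subprocess.run for each step.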

Learn more about running Soda scans.

View results and tag datasets

  1. In their Soda Cloud account, the Engineer clicks Checks to access the Checks dashboard. The checks that Soda executed during the scan appear in the table, where they can click each line item to learn more about the results.

  2. To more easily retrieve Soda scan results by dbt model, the Engineer navigates to Datasets, then clicks the stacked dots at the right of the dim_product dataset and selects Edit Dataset.

  3. In the Tags field, they add a tag for fact_product_category, the dbt model that uses this dataset, and a tag to indicate the kind of data that Soda is scanning (raw, transformed, or reporting), then save. They repeat these steps to add tags to all the datasets in their Soda Cloud account.

  4. Back on the Datasets page, they filter datasets by Tags and Arrival Time to narrow the search to the most recent quality checks associated with their models that have failed or warned.

  5. After filtering the datasets according to the tags, the Engineer saves the filter setup as a Collection that they can revisit daily.

  6. If you were in the Data Engineer's shoes, you might also want to set up Slack notifications for any checks that warn or fail during scans.

✨Hey, hey!✨ Now you know what it's like to add data quality checks to your production data pipeline. Huzzah!

Go further

Join the Soda community on Slack.
