Configure orchestrated scans
Last modified on 27-Sep-23
Migrate to Soda Library to connect to Soda Cloud and access all the newest Soda features.
Integrate Soda Library with a data orchestration tool, such as Airflow, to automate and schedule your search for bad-quality data.
🎥 Consider following a 30-minute Astronomer tutorial for Data Quality Checks with Airflow, Snowflake and Soda.
Configure actions that the orchestration tool can take based on scan output. For example, if the output of a scan reveals a large number of failed checks, the orchestration tool can automatically block “bad” data from contaminating your data pipeline.
Consider following the Test data in a pipeline guide for specific details about embedding Soda tests in an Airflow pipeline.
Apache Airflow using PythonOperator
Example DAG
Apache Airflow using PythonOperator
class SodaScanOperator(PythonOperator):
    """Airflow task that runs a Soda scan by calling ``SodaAirflow.scan``.

    The scan is named ``<dag_id>.<task_id>``. Scan variables are built from
    ``variables`` plus any Airflow Variables listed in ``airflow_variables``
    (read as plain values) and ``airflow_variables_json`` (deserialized from
    JSON). Later sources override earlier ones on name clashes.

    Args:
        task_id: Airflow task id; also used to build the scan name.
        dag: DAG the task belongs to.
        data_sources: Data source descriptors forwarded unchanged to the scan.
        soda_cl_path: Path to the SodaCL checks file(s).
        variables: Extra scan variables. This dict is not modified; a copy is
            used.
        airflow_variables: Names of Airflow Variables to add as scan variables.
        airflow_variables_json: Names of JSON-valued Airflow Variables to add,
            deserialized.
        soda_cloud_api_key: Soda Cloud API key, if any.
        soda_cloud_api_key_var_name: Name of an Airflow Variable holding the
            API key; only read when ``soda_cloud_api_key`` is not given.
    """

    def __init__(self,
                 task_id: str,
                 dag: DAG,
                 data_sources: list,
                 soda_cl_path: str,
                 variables: Optional[dict] = None,
                 airflow_variables: Optional[list] = None,
                 airflow_variables_json: Optional[list] = None,
                 soda_cloud_api_key: Optional[str] = None,
                 soda_cloud_api_key_var_name: Optional[str] = None):
        # Work on a copy so the caller's dict is never modified by the
        # Airflow variables merged in below.
        scan_variables = dict(variables) if variables else {}

        # NOTE: these Variable.get calls run when the operator is constructed
        # (typically whenever the DAG file is parsed), not at task run time.
        if isinstance(airflow_variables, list):
            for name in airflow_variables:
                scan_variables[name] = Variable.get(name)
        if isinstance(airflow_variables_json, list):
            for name in airflow_variables_json:
                scan_variables[name] = Variable.get(name, deserialize_json=True)

        # An explicitly passed key wins; otherwise fall back to the named
        # Airflow Variable.
        if not soda_cloud_api_key and soda_cloud_api_key_var_name:
            soda_cloud_api_key = Variable.get(soda_cloud_api_key_var_name)

        super().__init__(
            task_id=task_id,
            # NOTE(review): SodaAirflow.scan must accept every key of
            # op_kwargs below, including ``scan_name``.
            python_callable=SodaAirflow.scan,
            op_kwargs={
                'scan_name': f'{dag.dag_id}.{task_id}',
                'data_sources': data_sources,
                'soda_cl_path': soda_cl_path,
                'variables': scan_variables,
                'soda_cloud_api_key': soda_cloud_api_key,
            },
            dag=dag,
        )
Also, define the following `SodaAirflow` helper class; the operator above calls its `scan` method as the task's Python callable.
class SodaAirflow:
    """Runs a Soda scan from inside an Airflow task.

    ``scan`` is intended as a ``python_callable``; ``SodaScanOperator`` calls
    it with keyword arguments that include ``scan_name``.
    """

    @staticmethod
    def scan(datasource_name: Optional[str] = None,
             data_sources: Optional[list] = None,
             soda_cl_path: Optional[str] = None,
             schedule_name: Optional[str] = None,
             variables: Optional[dict] = None,
             soda_cloud_api_key: Optional[str] = None,
             scan_name: Optional[str] = None):
        """Execute a Soda scan and fail if it logged errors or a check failed.

        Args:
            datasource_name: Data source the scan runs against. Defaults to
                the ``data_source_name`` of the first entry in
                ``data_sources``.
            data_sources: Dicts, each holding ``data_source_name``,
                ``airflow_conn_id`` and any further data source properties.
                The dicts themselves are not modified.
            soda_cl_path: Path to the SodaCL checks YAML file(s). Required.
            schedule_name: Fallback scan definition name, used when
                ``scan_name`` is not given.
            variables: Passed to ``scan.add_variables`` when non-empty.
            soda_cloud_api_key: Passed to the scan when given.
            scan_name: Scan definition name, e.g. ``<dag_id>.<task_id>``.

        Raises:
            ValueError: If ``soda_cl_path`` is not given.
            AssertionError: Presumably from the Soda ``assert_no_error_logs``
                / ``assert_no_checks_fail`` helpers when the scan fails —
                confirm for your Soda Library version.
        """
        if soda_cl_path is None:
            raise ValueError("soda_cl_path is required")

        # Default to the first configured data source rather than always
        # passing an empty data source name.
        if datasource_name is None and data_sources:
            datasource_name = data_sources[0]['data_source_name']

        scan = Scan()
        definition_name = scan_name or schedule_name
        if definition_name:
            scan.set_scan_definition_name(definition_name)
        scan.set_data_source_name(datasource_name or '')

        for data_source_details in data_sources or []:
            # Copy so popping the routing keys does not mutate the caller's dict.
            data_source_properties = data_source_details.copy()
            data_source_name = data_source_properties.pop('data_source_name')
            airflow_conn_id = data_source_properties.pop('airflow_conn_id')
            # NOTE(review): this reads an Airflow *Variable* named
            # ``conn.<airflow_conn_id>``, not an Airflow Connection — confirm
            # that is where the connection details are stored.
            connection = Variable.get(f'conn.{airflow_conn_id}')
            scan.add_environment_provided_data_source_connection(
                connection=connection,
                data_source_name=data_source_name,
                data_source_properties=data_source_properties,
            )

        scan.add_sodacl_yaml_files(soda_cl_path)
        # Only forward optional settings that were actually supplied.
        if variables:
            scan.add_variables(variables)
        if soda_cloud_api_key:
            # NOTE(review): confirm ``add_soda_cloud_api_key`` exists in the
            # installed Soda Library version.
            scan.add_soda_cloud_api_key(soda_cloud_api_key)

        scan.execute()
        scan.assert_no_error_logs()
        scan.assert_no_checks_fail()
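Example DAG
The following example DAG runs a Soda scan in a `PythonVirtualenvOperator` task between two placeholder tasks. Soda Library is installed into an isolated virtual environment instead of into Airflow's own environment.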
from airflow import DAG
from airflow.models.variable import Variable
from airflow.operators.python import PythonVirtualenvOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
import os
from airflow.exceptions import AirflowFailException
# Task defaults handed to the DAG below: owned by "soda_core", retried once
# after a five-minute pause.
default_args = dict(
    owner='soda_core',
    retries=1,
    retry_delay=timedelta(minutes=5),
)
def run_soda_scan(config_file="/Users/path-to-your-config-file/configuration.yml",
                  checks_file="/Users/path-to-your-checks-file/checks.yml",
                  data_source="srcdb"):
    """Run a Soda scan and raise if it logged errors or any check failed.

    Args:
        config_file: Path to the Soda configuration YAML file.
        checks_file: Path to the SodaCL checks YAML file.
        data_source: Name of the data source the checks run against.

    Returns:
        0 when the scan completed without errors or failed checks.

    Raises:
        ValueError: If the scan logged errors or at least one check failed.
    """
    # Import inside the function: the task runs this callable in a separate
    # virtualenv (system_site_packages=False) where Soda Library is installed,
    # so nothing from the Airflow environment can be relied on here.
    from soda.scan import Scan

    print("Running Soda Scan ...")
    scan = Scan()
    scan.set_verbose()
    scan.add_configuration_yaml_file(config_file)
    scan.set_data_source_name(data_source)
    scan.add_sodacl_yaml_files(checks_file)
    scan.execute()

    print(scan.get_logs_text())

    # Scan errors (bad config, unreachable data source, ...) must fail the
    # task too, not only failed checks.
    if scan.has_error_logs():
        raise ValueError("Soda Scan failed with errors!")
    if scan.has_check_fails():
        raise ValueError("Soda Scan failed: one or more checks failed!")

    print("Soda scan successful")
    return 0
# Daily DAG: ingest_data -> Soda scan -> publish_data.
dag = DAG(
    'soda_core_python_venv_op',
    default_args=default_args,
    description='A simple Soda Library scan DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
)

# Every task is attached to the DAG explicitly with ``dag=dag`` instead of
# leaving the placeholder tasks DAG-less until ``>>`` links them.
ingest_data_op = DummyOperator(
    task_id='ingest_data',
    dag=dag,
)

soda_core_scan_op = PythonVirtualenvOperator(
    task_id='soda_core_scan_demodata',
    python_callable=run_soda_scan,
    # "-i" points pip at Soda's package index so the Soda Library
    # "soda-postgres" package can be installed into the task's virtualenv.
    requirements=["-i https://pypi.cloud.soda.io", "soda-postgres"],
    # Isolate the virtualenv from the packages of the Airflow environment.
    system_site_packages=False,
    dag=dag,
)

publish_data_op = DummyOperator(
    task_id='publish_data',
    dag=dag,
)

# Publish only after the scan passes; a failing scan raises and blocks it.
ingest_data_op >> soda_core_scan_op >> publish_data_op
Go further
- Learn more about the metrics and checks you can use to check for data quality.
- Learn how to prepare programmatic scans of your data.
- Need help? Join the Soda community on Slack.
Was this documentation helpful?
What could we do to improve this page?
- Suggest a docs change in GitHub.
- Share feedback in the Soda community on Slack.
Documentation always applies to the latest version of Soda products
Last modified on 27-Sep-23