Configure orchestrated scans
Last modified on 29-Mar-23
Integrate Soda Core with a data orchestration tool, such as Airflow, to automate and schedule your search for “bad” data.
Configure actions that the orchestration tool can take based on scan output. For example, if the output of a scan reveals a large number of failed tests, the orchestration tool can automatically block “bad” data from contaminating your data pipeline.
Apache Airflow using BashOperator
Apache Airflow using PythonOperator
Example DAG
Soda Core and Prefect
Apache Airflow using BashOperator
Access a guide published by Astronomer for setting up and using Soda Core with Airflow.
Apache Airflow using PythonOperator
class SodaScanOperator(PythonOperator):
    """Airflow operator that executes a Soda Core scan via ``SodaAirflow.scan``.

    Resolves Airflow Variables (plain and JSON-deserialized) into the scan's
    variable map, optionally resolves the Soda Cloud API key from an Airflow
    Variable, and delegates execution to ``SodaAirflow.scan`` through the
    standard ``PythonOperator`` machinery.
    """

    def __init__(self,
                 task_id: str,
                 dag: DAG,
                 data_sources: list,
                 soda_cl_path: str,
                 variables: dict = None,
                 airflow_variables: list = None,
                 airflow_variables_json: list = None,
                 soda_cloud_api_key: Optional[str] = None,
                 soda_cloud_api_key_var_name: Optional[str] = None):
        """
        :param task_id: Airflow task id; also used to build the scan name.
        :param dag: the DAG this task belongs to.
        :param data_sources: list of data source config dicts passed through
            to ``SodaAirflow.scan``.
        :param soda_cl_path: path to the SodaCL file(s) with the checks.
        :param variables: initial scan variables (not mutated).
        :param airflow_variables: names of Airflow Variables to merge into
            the scan variables as plain strings.
        :param airflow_variables_json: names of Airflow Variables to merge
            in after JSON deserialization.
        :param soda_cloud_api_key: Soda Cloud API key, if supplied directly.
        :param soda_cloud_api_key_var_name: Airflow Variable name holding the
            API key; used only when ``soda_cloud_api_key`` is not given.
        """
        # Copy so that merging Airflow Variables below never mutates the
        # caller's dict (the original wrote into the passed-in object).
        variables = dict(variables) if variables else {}
        if isinstance(airflow_variables, list):
            for airflow_variable in airflow_variables:
                variables[airflow_variable] = Variable.get(airflow_variable)
        if isinstance(airflow_variables_json, list):
            for airflow_variable in airflow_variables_json:
                variables[airflow_variable] = Variable.get(airflow_variable, deserialize_json=True)
        # Direct key takes precedence; fall back to the named Airflow Variable.
        if not soda_cloud_api_key and soda_cloud_api_key_var_name:
            soda_cloud_api_key = Variable.get(soda_cloud_api_key_var_name)
        super().__init__(
            task_id=task_id,
            python_callable=SodaAirflow.scan,
            op_kwargs={
                'scan_name': f'{dag.dag_id}.{task_id}',
                'data_sources': data_sources,
                'soda_cl_path': soda_cl_path,
                'variables': variables,
                'soda_cloud_api_key': soda_cloud_api_key
            },
            dag=dag
        )
Also, configure the following.
class SodaAirflow:
    """Holds the static callable that Airflow invokes to run a Soda scan."""

    @staticmethod
    def scan(datasource_name,
             data_sources: list,
             soda_cl_path: str,
             schedule_name: Optional[str] = None,
             variables: dict = None,
             soda_cloud_api_key: str = None):
        """Build and execute a Soda Core scan, asserting a clean result.

        :param datasource_name: kept for interface compatibility
            (not used by this implementation — the data source name comes
            from each entry in ``data_sources``).
        :param data_sources: list of dicts, each containing at least
            ``data_source_name`` and ``airflow_conn_id``; remaining keys are
            forwarded as data source properties.
        :param soda_cl_path: path to the SodaCL file(s) with the checks.
        :param schedule_name: kept for interface compatibility (unused here).
        :param variables: scan variables; may be None.
        :param soda_cloud_api_key: Soda Cloud API key; may be None.
        :raises AssertionError: if the scan logs errors or any check fails
            (via ``assert_no_error_logs`` / ``assert_no_checks_fail``).
        """
        scan = Scan()
        scan.set_data_source_name('')
        if data_sources:
            for data_source_details in data_sources:
                # Copy before pop() so the caller's dict is not mutated.
                data_source_properties = data_source_details.copy()
                data_source_name = data_source_properties.pop('data_source_name')
                airflow_conn_id = data_source_properties.pop('airflow_conn_id')
                # Connection details are stored as an Airflow Variable named
                # 'conn.<airflow_conn_id>'.
                connection = Variable.get(f'conn.{airflow_conn_id}')
                scan.add_environment_provided_data_source_connection(
                    connection=connection,
                    data_source_name=data_source_name,
                    data_source_properties=data_source_properties
                )
        scan.add_sodacl_yaml_files(soda_cl_path)
        # Guard the optional arguments: the original passed None straight
        # through to add_variables / add_soda_cloud_api_key.
        if variables:
            scan.add_variables(variables)
        if soda_cloud_api_key:
            scan.add_soda_cloud_api_key(soda_cloud_api_key)
        scan.execute()
        scan.assert_no_error_logs()
        scan.assert_no_checks_fail()
Example DAG
from airflow import DAG
from airflow.models.variable import Variable
from airflow.operators.python import PythonVirtualenvOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
import os
from airflow.exceptions import AirflowFailException
# Default task arguments applied to every task in the DAG:
# one retry per task with a 5-minute delay between attempts.
default_args = {
    'owner': 'soda_core',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
def run_soda_scan():
    """Run a Soda Core scan inside the PythonVirtualenvOperator's venv.

    The import of ``soda.scan`` is deliberately local: the package is only
    installed in the operator's virtualenv, not in the Airflow worker.

    :return: 0 on success.
    :raises ValueError: when any Soda check fails, so Airflow marks the
        task as failed.
    """
    from soda.scan import Scan

    print("Running Soda Scan ...")
    config_file = "/Users/path-to-your-config-file/configuration.yml"
    checks_file = "/Users/path-to-your-checks-file/checks.yml"
    data_source = "srcdb"

    scan = Scan()
    scan.set_verbose()
    scan.add_configuration_yaml_file(config_file)
    scan.set_data_source_name(data_source)
    scan.add_sodacl_yaml_files(checks_file)
    scan.execute()
    print(scan.get_logs_text())
    # Guard clause instead of if/else; the f-prefix on the message was
    # removed because the string has no placeholders.
    if scan.has_check_fails():
        raise ValueError("Soda Scan failed with errors!")
    print("Soda scan successful")
    return 0
# DAG wiring: ingest -> scan -> publish. The scan gates publication, so a
# failing scan blocks "bad" data from moving downstream.
dag = DAG(
    'soda_core_python_venv_op',
    default_args=default_args,
    description='A simple Soda Core scan DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
)

# Placeholder for the upstream ingestion step.
# Fix: pass dag=dag — without it (and with no `with dag:` context manager)
# the task is never attached to the DAG.
ingest_data_op = DummyOperator(
    task_id='ingest_data',
    dag=dag
)

soda_core_scan_op = PythonVirtualenvOperator(
    task_id='soda_core_scan_demodata',
    python_callable=run_soda_scan,
    # soda-core is installed only inside this task's virtualenv.
    requirements=["soda-core-postgres==3.0.0b9"],
    system_site_packages=False,
    dag=dag
)

# Placeholder for the downstream publish step; also needs dag=dag.
publish_data_op = DummyOperator(
    task_id='publish_data',
    dag=dag
)

ingest_data_op >> soda_core_scan_op >> publish_data_op
Soda Core and Prefect
A contribution from the Soda community: read the documentation for the Prefect 2.0 collection for Soda Core.
Go further
- Learn more about the Metrics and checks you can use to check for data quality.
- Learn how to Connect to Soda Cloud.
- Learn how to prepare programmatic scans of your data.
- Need help? Join the Soda community on Slack.
Was this documentation helpful?
What could we do to improve this page?
- Suggest a docs change in GitHub.
- Share feedback in the Soda community on Slack.
Last modified on 29-Mar-23