Configure orchestrated scans
Integrate Soda Core with a data orchestration tool, such as Airflow, to automate and schedule your search for “bad” data.
Configure actions that the orchestration tool can take based on scan output. For example, if the output of a scan reveals a large number of failed tests, the orchestration tool can automatically block “bad” data from contaminating your data pipeline.
Apache Airflow using PythonOperator
class SodaScanOperator(PythonOperator):
    """Airflow operator that runs a Soda scan via ``SodaAirflow.scan``.

    The operator gathers scan variables and the Soda Cloud API key (reading
    Airflow ``Variable`` values where requested) and forwards everything to
    ``SodaAirflow.scan`` through ``op_kwargs``.
    """

    def __init__(self,
                 task_id: str,
                 dag: DAG,
                 data_sources: list,
                 soda_cl_path: str,
                 variables: Optional[dict] = None,
                 airflow_variables: Optional[list] = None,
                 airflow_variables_json: Optional[list] = None,
                 soda_cloud_api_key: Optional[str] = None,
                 soda_cloud_api_key_var_name: Optional[str] = None):
        """Build the scan task.

        Args:
            task_id: Airflow task id; also used as the suffix of the scan name.
            dag: DAG the task belongs to; its ``dag_id`` prefixes the scan name.
            data_sources: Data source descriptions, passed through unchanged
                to the scan callable.
            soda_cl_path: Path to the SodaCL YAML file(s), passed through
                unchanged to the scan callable.
            variables: Explicit scan variables. The dict is copied; the
                caller's object is never modified.
            airflow_variables: Names of Airflow Variables to read as plain
                values and add to the scan variables under the same name.
            airflow_variables_json: Names of Airflow Variables to read with
                ``deserialize_json=True`` and add to the scan variables.
            soda_cloud_api_key: Soda Cloud API key. Takes precedence over
                ``soda_cloud_api_key_var_name``.
            soda_cloud_api_key_var_name: Name of the Airflow Variable to read
                the API key from when ``soda_cloud_api_key`` is not given.

        Note:
            All ``Variable.get`` lookups happen here, i.e. when the operator
            object is constructed, not when the task executes.
        """
        # Copy so that merging Airflow variables below cannot leak into a
        # dict the caller still holds (or shares between several operators).
        variables = dict(variables) if variables else {}

        # Later sources win on key collisions: explicit variables, then plain
        # Airflow variables, then JSON-deserialized Airflow variables.
        if isinstance(airflow_variables, list):
            for airflow_variable in airflow_variables:
                variables[airflow_variable] = Variable.get(airflow_variable)
        if isinstance(airflow_variables_json, list):
            for airflow_variable in airflow_variables_json:
                variables[airflow_variable] = Variable.get(
                    airflow_variable, deserialize_json=True
                )

        # Only fall back to the Airflow Variable when no key was passed in.
        if not soda_cloud_api_key and soda_cloud_api_key_var_name:
            soda_cloud_api_key = Variable.get(soda_cloud_api_key_var_name)

        super().__init__(
            task_id=task_id,
            python_callable=SodaAirflow.scan,
            # NOTE(review): these keys are handed to the callable as keyword
            # arguments, so SodaAirflow.scan must accept every one of them
            # (including 'scan_name') -- confirm against its signature.
            op_kwargs={
                'scan_name': f'{dag.dag_id}.{task_id}',
                'data_sources': data_sources,
                'soda_cl_path': soda_cl_path,
                'variables': variables,
                'soda_cloud_api_key': soda_cloud_api_key
            },
            dag=dag
        )
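Also, define the SodaAirflow class. Its scan method is the function that SodaScanOperator passes to Airflow as the python_callable.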
class SodaAirflow:
    """Holds the callable that ``SodaScanOperator`` registers with Airflow."""

    @staticmethod
    def scan(datasource_name: Optional[str] = None,
             data_sources: Optional[list] = None,
             soda_cl_path: Optional[str] = None,
             schedule_name: Optional[str] = None,
             variables: Optional[dict] = None,
             soda_cloud_api_key: Optional[str] = None,
             *,
             scan_name: Optional[str] = None):
        """Configure and execute a Soda scan, failing on errors or failed checks.

        Args:
            datasource_name: Accepted for backward compatibility; not used by
                this implementation.
            data_sources: List of dicts. Each dict must contain the keys
                ``'data_source_name'`` and ``'airflow_conn_id'``; all remaining
                keys are forwarded as data source properties. ``None`` or an
                empty list registers no data sources.
            soda_cl_path: Path handed to ``scan.add_sodacl_yaml_files``.
                Required.
            schedule_name: Accepted but not used by this implementation.
            variables: Scan variables; skipped when ``None``.
            soda_cloud_api_key: Forwarded to ``scan.add_soda_cloud_api_key``.
            scan_name: Keyword-only. ``SodaScanOperator`` passes this key in
                its ``op_kwargs``, so it must be accepted here; it is not yet
                forwarded to the scan.

        Raises:
            TypeError: If ``soda_cl_path`` is not provided.
            KeyError: If a data source dict lacks ``'data_source_name'`` or
                ``'airflow_conn_id'``.
        """
        # soda_cl_path only has a default because it follows an optional
        # parameter; it is still mandatory.
        if soda_cl_path is None:
            raise TypeError("scan() missing required argument: 'soda_cl_path'")

        scan = Scan()
        # NOTE(review): the data source name is deliberately left as the empty
        # string, as before; neither datasource_name nor scan_name is applied.
        # TODO confirm which Scan setter (if any) should receive them.
        scan.set_data_source_name('')

        for data_source_details in data_sources or ():
            # Copy before popping so the caller's dict stays intact.
            data_source_properties = data_source_details.copy()
            data_source_name = data_source_properties.pop('data_source_name')
            airflow_conn_id = data_source_properties.pop('airflow_conn_id')
            # The connection is read from an Airflow Variable named
            # 'conn.<airflow_conn_id>'.
            connection = Variable.get(f'conn.{airflow_conn_id}')
            scan.add_environment_provided_data_source_connection(
                connection=connection,
                data_source_name=data_source_name,
                data_source_properties=data_source_properties
            )

        scan.add_sodacl_yaml_files(soda_cl_path)
        # variables defaults to None; only hand a real mapping to the scan.
        if variables is not None:
            scan.add_variables(variables)
        scan.add_soda_cloud_api_key(soda_cloud_api_key)
        scan.execute()

        # Presumably these raise when the scan logged errors or any check
        # failed, which makes the Airflow task fail -- verify against the
        # Scan API.
        scan.assert_no_error_logs()
        scan.assert_no_checks_fail()
Go further
- Learn more about the Metrics and checks you can use to check for data quality.
- Learn how to Connect to Soda Cloud.
- Learn how to prepare programmatic scans of your data.
- Need help? Join the Soda community on Slack.
Last modified on 01-Jul-22
Was this documentation helpful?
Share feedback in the Soda community on Slack.
Help improve our docs!