Configure orchestrated scans
Integrate Soda Library with a data orchestration tool to automate and schedule your search for "bad" data.
About Soda and Airflow
Airflow using PythonOperator
class SodaScanOperator(PythonOperator):
    """Airflow ``PythonOperator`` whose callable is ``SodaAirflow.scan``.

    The operator collects scan variables (explicit values plus values read
    from Airflow Variables) and an optional Soda Cloud API key. It then
    passes them to ``SodaAirflow.scan`` through ``op_kwargs``.
    """

    def __init__(self,
                 task_id: str,
                 dag: DAG,
                 data_sources: list,
                 soda_cl_path: str,
                 variables: Optional[dict] = None,
                 airflow_variables: Optional[list] = None,
                 airflow_variables_json: Optional[list] = None,
                 soda_cloud_api_key: Optional[str] = None,
                 soda_cloud_api_key_var_name: Optional[str] = None):
        """Build the operator and resolve its scan arguments.

        Args:
            task_id: Airflow task id. It is also used in the scan name.
            dag: DAG the task belongs to. Its ``dag_id`` prefixes the scan
                name as ``"<dag_id>.<task_id>"``.
            data_sources: Passed through unchanged as ``data_sources``.
            soda_cl_path: Passed through unchanged as ``soda_cl_path``.
            variables: Initial scan variables. The dict is copied, never
                modified.
            airflow_variables: Names of Airflow Variables read with
                ``Variable.get(name)``. Each value is stored under its name.
            airflow_variables_json: Names of Airflow Variables read with
                ``Variable.get(name, deserialize_json=True)``. Each value is
                stored under its name and overrides any entry with the same
                name.
            soda_cloud_api_key: API key passed to the scan as-is.
            soda_cloud_api_key_var_name: Airflow Variable to read the API
                key from. It is used only when ``soda_cloud_api_key`` is
                falsy.
        """
        # Work on a copy so a dict supplied by the caller (possibly shared
        # between several operators) is not mutated by the lookups below.
        variables = dict(variables) if variables is not None else {}

        # NOTE: these Variable lookups run when the operator is constructed,
        # not when the task executes. In Airflow that is usually DAG-parse
        # time, so every lookup happens on each parse.
        if isinstance(airflow_variables, (list, tuple)):
            for name in airflow_variables:
                variables[name] = Variable.get(name)
        if isinstance(airflow_variables_json, (list, tuple)):
            for name in airflow_variables_json:
                variables[name] = Variable.get(name, deserialize_json=True)

        # An explicitly supplied key wins over the Airflow Variable.
        if not soda_cloud_api_key and soda_cloud_api_key_var_name:
            soda_cloud_api_key = Variable.get(soda_cloud_api_key_var_name)

        super().__init__(
            task_id=task_id,
            python_callable=SodaAirflow.scan,
            op_kwargs={
                'scan_name': f'{dag.dag_id}.{task_id}',
                'data_sources': data_sources,
                'soda_cl_path': soda_cl_path,
                'variables': variables,
                'soda_cloud_api_key': soda_cloud_api_key,
            },
            dag=dag,
        )
# Example DAG
Go further
Last updated
Was this helpful?
