Spark DataFrame
Access configuration details to connect Soda to a Spark DataFrame data source.
Soda supports Apache Spark as a scalable, distributed SQL engine and can work with in-memory DataFrames in an existing Spark session.
Install the following package:
pip install -i https://pypi.dev.sodadata.io/simple -U soda-sparkdf
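To confirm the installation, you can check that the package imports cleanly (a quick sanity check, not an official setup step):
python -c "from soda_sparkdf import SparkDataFrameDataSource"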
From existing Spark session
If you already have a running Spark session, you can initialize a Soda Spark DataFrame data source directly from it.
from pyspark.sql import SparkSession
from soda_core.contracts import verify_contracts_locally
from soda_sparkdf import SparkDataFrameDataSource
spark = (
    SparkSession.builder.master("local[*]")
    .appName("soda_sparkdf")
    .getOrCreate()
)
# Create a database (schema) for organization
spark.sql("CREATE DATABASE IF NOT EXISTS my_schema")
spark.sql("USE my_schema")
# Create the DataFrame and save it as a table in the schema
df = spark.createDataFrame([(1,), (2,), (3,)], ["id"])
df.write.mode("overwrite").saveAsTable("my_table")
spark_data_source = SparkDataFrameDataSource.from_existing_session(
    session=spark,
    name="my_sparkdf"
)
result = verify_contracts_locally(
    data_sources=[spark_data_source],
    contract_file_paths=["./my_table.yaml"],
    soda_cloud_file_path="../soda-cloud.yaml",
    publish=True
)
if result.is_ok:
    print("✅ Contract verification passed.")
else:
    print("❌ Contract verification failed:")
    print(result.get_errors_str())
Learn more about the Python API.
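The soda_cloud_file_path argument points to a Soda Cloud configuration file. As a rough sketch, such a file commonly looks like the one below; the exact keys are an assumption here, so confirm the format against your Soda Cloud settings:
soda_cloud:
  host: cloud.soda.io
  api_key_id: ${SODA_CLOUD_API_KEY_ID}       # assumption: credentials injected via environment variables
  api_key_secret: ${SODA_CLOUD_API_KEY_SECRET}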
Example contract
Here’s a minimal example of a Soda contract that validates the my_table dataset in Spark. The dataset is addressed by its qualified name: data source name, then schema, then table:
dataset: my_sparkdf/my_schema/my_table
columns:
  - name: id
    data_type: integer
    checks:
      - missing:
checks:
  - row_count:
      threshold:
        must_be: 3
Example contract: Spark - Databricks
Below is an example of a contract verification that runs on a Databricks table through the Spark connector:
from soda_core.contracts import verify_contracts_locally
from soda_sparkdf import SparkDataFrameDataSource
from soda_core import configure_logging
# Enable or disable verbose logging
configure_logging(verbose=True)
# Unity Catalog tables are available in the Spark session.
# The dataset qualified name (DQN) in the contract should include the full path to the table, e.g.,
# dataset: soda_databricks_example/unity_catalog/tyler/obs_test_data_seasoned
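# Note: a Databricks notebook provides a ready-made SparkSession as `spark`,
# so unlike the local example above, there is no need to build one here.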
spark_data_source = SparkDataFrameDataSource.from_existing_session(
    session=spark,
    name="soda_databricks_example"
)
result = verify_contracts_locally(
    data_sources=[spark_data_source],
    contract_file_paths=["obs_test_data_seasoned.yml"],
    soda_cloud_file_path="sc.yml",
    publish=False
)
if result.is_ok:
    print("✅ Contract verification passed.")
else:
    print("❌ Contract verification failed:")
    print(result.get_errors_str())
Troubleshoot
Problem: In a Databricks notebook, running
from soda_sparkdf import SparkDataFrameDataSource
results in the error:
ImportError: cannot import name 'sql' from 'databricks' (/databricks/spark/python/databricks/__init__.py)
Solution: Run these in your Databricks notebook:
!pip install databricks-sql-connector
dbutils.library.restartPython()  # for the pip installation to take effect
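Once Python restarts, re-run the import; it should now succeed:
from soda_sparkdf import SparkDataFrameDataSource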