Spark DataFrame

Access configuration details to connect Soda to a Spark DataFrame data source.

Soda supports Apache Spark as a scalable, distributed SQL engine. It works with in-memory DataFrames and with existing Spark sessions.

Install the following package:

pip install -i https://pypi.dev.sodadata.io/simple -U soda-sparkdf

From an existing Spark session

If you already have a running Spark session, you can initialize a Soda Spark DataFrame data source directly from it.

from pyspark.sql import SparkSession
from soda_core.contracts import verify_contracts_locally
from soda_sparkdf import SparkDataFrameDataSource

spark = (
    SparkSession.builder.master("local[*]")
    .appName("soda_sparkdf")
    .getOrCreate()
)

# Create a database (schema) for organization
spark.sql("CREATE DATABASE IF NOT EXISTS my_schema")
spark.sql("USE my_schema")

# Create the DataFrame and save it as a table in the schema
df = spark.createDataFrame([(1,), (2,), (3,)], ["id"])
df.write.mode("overwrite").saveAsTable("my_table")

spark_data_source = SparkDataFrameDataSource.from_existing_session(
    session=spark,
    name="my_sparkdf"
)

result = verify_contracts_locally(
    data_sources=[spark_data_source],
    contract_file_paths=["./my_table.yaml"],
    soda_cloud_file_path="../soda-cloud.yaml",
    publish=True
)

if result.is_ok:
    print("✅ Contract verification passed.")
else:
    print("❌ Contract verification failed:")
    print(result.get_errors_str())

Learn more about the Python API.
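
When the script runs in a scheduled job or CI pipeline, a failed verification usually needs to fail the run as well. The following is a minimal sketch that relies only on the `is_ok` attribute and the `get_errors_str()` method shown above. The helper name `exit_code_for` is hypothetical and not part of Soda.

```python
import sys


def exit_code_for(result) -> int:
    """Map a verification result to a process exit code (0 = passed, 1 = failed)."""
    if result.is_ok:
        return 0
    # Write the error details to stderr so they show up in the job log.
    print(result.get_errors_str(), file=sys.stderr)
    return 1


# Usage at the end of the script above (assumes `result` is in scope):
# sys.exit(exit_code_for(result))
```

Returning the code from a small function, instead of calling `sys.exit` inline, keeps the decision easy to test without running a Spark session.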

Example contract

Here’s a minimal example of a Soda contract that validates the my_table dataset in Spark:

dataset: my_sparkdf/my_schema/my_table
columns:
  - name: id
    data_type: integer
    checks:
      - missing:
checks:
  - row_count:
      threshold:
        must_be: 3
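
The `dataset` value in this contract combines the data source name passed to `from_existing_session` (`my_sparkdf`) with the schema and table name, separated by slashes. As a small illustration, the sketch below assembles such an identifier. `dataset_identifier` is a hypothetical helper, not a Soda function.

```python
def dataset_identifier(data_source_name: str, *path_parts: str) -> str:
    """Join a data source name and the table path with "/" separators."""
    parts = [data_source_name, *path_parts]
    # Reject empty parts and embedded slashes, which would change the path depth.
    if any(not part or "/" in part for part in parts):
        raise ValueError("each part must be non-empty and must not contain '/'")
    return "/".join(parts)


print(dataset_identifier("my_sparkdf", "my_schema", "my_table"))
# prints: my_sparkdf/my_schema/my_table
```

The helper accepts any number of path segments, so a longer path that includes a catalog segment can be built the same way.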

Example contract: Spark - Databricks

The following example runs contract verification on a Databricks table using the Spark connector:

from soda_core.contracts import verify_contracts_locally
from soda_sparkdf import SparkDataFrameDataSource
from soda_core import configure_logging

# Enable or disable verbose logging
configure_logging(verbose=True)

# `spark` is the SparkSession that Databricks notebooks predefine.
# Unity Catalog tables are available in that Spark session.
# The dataset DQN in the contract should include the full path to the table, e.g.,
# dataset: soda_databricks_example/unity_catalog/tyler/obs_test_data_seasoned
spark_data_source = SparkDataFrameDataSource.from_existing_session(
    session=spark,
    name="soda_databricks_example"
)

result = verify_contracts_locally(
    data_sources=[spark_data_source],
    contract_file_paths=["obs_test_data_seasoned.yml"],
    soda_cloud_file_path="sc.yml",
    publish=False
)

if result.is_ok:
    print("✅ Contract verification passed.")
else:
    print("❌ Contract verification failed:")
    print(result.get_errors_str())

Troubleshoot

Problem: In a Databricks notebook, running

from soda_sparkdf import SparkDataFrameDataSource

results in the following error:

ImportError: cannot import name 'sql' from 'databricks' (/databricks/spark/python/databricks/__init__.py)

Solution: Run the following commands in your Databricks notebook:

!pip install databricks-sql-connector
dbutils.library.restartPython()  # for pip installation to take effect
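
To check whether the fix took effect before importing `soda_sparkdf` again, you can test whether the `databricks.sql` module is discoverable. This is a minimal diagnostic sketch that uses only the Python standard library. `module_available` is a hypothetical helper name.

```python
import importlib.util


def module_available(name: str) -> bool:
    """Return True if the import system can locate `name`.

    Parent packages are imported as part of the lookup.
    """
    try:
        return importlib.util.find_spec(name) is not None
    except (ImportError, ValueError):
        # Raised when a parent package is missing or is not a package.
        return False


# In the notebook, after restarting Python:
print(module_available("databricks.sql"))
```

If this prints `False`, the connector is still not visible to the notebook's Python environment, and the import of `SparkDataFrameDataSource` is likely to fail in the same way.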
