Provero - Technical Architecture¶
provero, as in an assay /ˈæseɪ/: the testing of a substance to determine its quality or purity.
A vendor-neutral, declarative data quality engine with built-in anomaly detection. Works standalone, as an Airflow provider, or with any orchestrator. A spiritual successor to Apache Griffin, learning from its mistakes.
Note: This document describes the target architecture, including planned features not yet implemented. For currently available features, see the README. Unimplemented components include: server mode (FastAPI), streaming engine (Kafka), dedicated cloud connectors (Snowflake, BigQuery, Redshift), email alerts, watch mode, and Prophet-based anomaly detection.
Design Principles¶
- Simple by default, powerful when needed. A quality check in three lines of YAML; Great Expectations (GX) needs 50+ lines for the same result.
- Portable rules. Define rules once, execute them anywhere. Introduces the Provero Quality Language (PQL), an open standard.
- Anomaly detection built-in. No need for a $100k/year SaaS. Statistical detection runs locally, with no external dependency.
- Orchestrator-agnostic. Works as a CLI, as a Python library, as an Airflow provider, as a sidecar in any pipeline.
- Streaming + Batch. The first open source quality framework to treat streaming as a first-class citizen.
- Data contracts first. Producers declare, consumers verify, Provero enforces.
High-Level Architecture¶
┌─────────────────────────────────────────────────────────────────────────┐
│ USER LAYER │
│ │
│ provero.yaml Python SDK CLI REST API │
│ (declarative) (programmatic) (provero-ctl) (server mode) │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ source: pg │ │ @check │ │ provero run│ │ POST /scan │ │
│ │ checks: │ │ @contract │ │ provero... │ │ GET /report│ │
│ │ - not_null │ │ @monitor │ │ provero... │ │ GET /health│ │
│ │ - unique │ │ │ │ provero... │ │ │ │
│ └──────────────┘ └──────────────┘ └────────────┘ └────────────┘ │
└──────────┬──────────────────┬──────────────────┬──────────────┬─────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────────────┐
│ PROVERO CORE ENGINE │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌────────────────────────┐ │
│ │ Rule Compiler │ │ Check Engine │ │ Anomaly Detector │ │
│ │ │ │ │ │ │ │
│ │ AQL → plan │ │ Runs checks │ │ Statistical models │ │
│ │ Validates │ │ against data │ │ over historical │ │
│ │ Optimizes scan │ │ Returns verdict│ │ check results │ │
│ └────────┬────────┘ └────────┬────────┘ └───────────┬────────────┘ │
│ │ │ │ │
│ ┌────────┴────────────────────┴───────────────────────┴─────────────┐ │
│ │ Result Store │ │
│ │ Check results, metrics, anomalies, trends (time-series) │ │
│ │ Backends: SQLite (local) | PostgreSQL (server) | S3 (archive) │ │
│ └───────────────────────────┬───────────────────────────────────────┘ │
│ │ │
│ ┌───────────────────────────┴────────────────────────────────────────┐ │
│ │ Contract Registry │ │
│ │ Data contracts: schema + quality SLAs + ownership │ │
│ │ Versioned, git-friendly, publishable │ │
│ └────────────────────────────────────────────────────────────────────┘ │
└──────────────────────────────┬──────────────────────────────────────────┘
│
┌────────────────┼────────────────┐
▼ ▼ ▼
┌──────────────────┐ ┌─────────────────┐ ┌──────────────────────┐
│ DATA CONNECTORS │ │ ORCHESTRATION │ │ OUTPUT / ACTIONS │
│ │ │ INTEGRATIONS │ │ │
│ ┌──────────────┐ │ │ ┌─────────────┐ │ │ ┌──────────────────┐ │
│ │ SQL │ │ │ │ Airflow │ │ │ │ Reports (HTML, │ │
│ │ (Postgres, │ │ │ │ Provider │ │ │ │ JSON, Markdown) │ │
│ │ MySQL, │ │ │ ├─────────────┤ │ │ ├──────────────────┤ │
│ │ Snowflake, │ │ │ │ Standalone │ │ │ │ Alerts (Slack, │ │
│ │ BigQuery, │ │ │ │ (cron/CLI) │ │ │ │ PagerDuty, email,│ │
│ │ Redshift, │ │ │ ├─────────────┤ │ │ │ webhook) │ │
│ │ DuckDB) │ │ │ │ Flyte │ │ │ ├──────────────────┤ │
│ ├──────────────┤ │ │ │ (plugin) │ │ │ │ OpenLineage │ │
│ │ DataFrame │ │ │ ├─────────────┤ │ │ │ (lineage events) │ │
│ │ (Pandas, │ │ │ │ Dagster, │ │ │ ├──────────────────┤ │
│ │ Polars, │ │ │ │ Prefect │ │ │ │ Block/Quarantine │ │
│ │ Spark) │ │ │ │ (future) │ │ │ │ (halt pipeline │ │
│ ├──────────────┤ │ │ └─────────────┘ │ │ │ on failure) │ │
│ │ Files │ │ │ │ │ ├──────────────────┤ │
│ │ (Parquet, │ │ │ │ │ │ OpenTelemetry │ │
│ │ CSV, JSON, │ │ │ │ │ │ (metrics export) │ │
│ │ Avro, Delta,│ │ │ │ │ └──────────────────┘ │
│ │ Iceberg) │ │ │ │ │ │
│ ├──────────────┤ │ │ │ │ │
│ │ Streaming │ │ │ │ │ │
│ │ (Kafka, │ │ │ │ │ │
│ │ Kinesis, │ │ │ │ │ │
│ │ Pulsar) │ │ │ │ │ │
│ ├──────────────┤ │ │ │ │ │
│ │ APIs │ │ │ │ │ │
│ │ (REST, gRPC, │ │ │ │ │ │
│ │ GraphQL) │ │ │ │ │ │
│ └──────────────┘ │ │ │ │ │
└──────────────────┘ └─────────────────┘ └──────────────────────┘
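The Python SDK entry point listed in the USER LAYER is decorator-based. The sketch below is illustrative only: the decorator name @check comes from the diagram above, but every signature, argument, and method shown is an assumption, not a committed API.
# Hypothetical sketch of the @check decorator from the USER LAYER diagram.
# All names and signatures below are assumptions for illustration only.
from provero import check

@check(source="postgres", connection="${POSTGRES_URI}", table="orders")
def orders_quality(suite):
    suite.not_null(["order_id", "customer_id", "amount"])
    suite.unique("order_id")
    suite.row_count(min=1000)
    return suite.run()   # would return a SuiteResult (defined later in this document)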
Provero Quality Language (PQL)¶
The main differentiator. An open standard for defining quality rules that works in any tool, the way SQL works in any database.
Basic syntax¶
# provero.yaml, the simplest possible form
source:
type: postgres
connection: ${POSTGRES_URI} # environment variable or Airflow connection
table: orders
checks:
- not_null: [order_id, customer_id, amount]
- unique: order_id
- accepted_values:
column: status
values: [pending, shipped, delivered, cancelled]
- range:
column: amount
min: 0
max: 1000000
- freshness:
column: created_at
max_age: 24h
- row_count:
min: 1000
- custom_sql: |
SELECT COUNT(*) = 0
FROM orders
WHERE amount < 0 AND status = 'delivered'
A few lines per check for the most common cases. No boilerplate, no classes, no configs.
Full example with every feature¶
# provero.yaml, advanced example
version: "1.0"
# Reusable data sources
sources:
warehouse:
type: snowflake
connection: ${SNOWFLAKE_URI}
lake:
type: s3
bucket: data-lake-prod
format: parquet
stream:
type: kafka
bootstrap_servers: ${KAFKA_BROKERS}
topic: events.transactions
# ──────────────────────────────────────────
# Data Contracts (producer declares, consumer verifies)
# ──────────────────────────────────────────
contracts:
- name: transactions_contract
owner: payments-team
source: warehouse
table: transactions
version: "2.1"
sla:
freshness: 1h
completeness: 99.5% # max 0.5% nulls in required fields
availability: 99.9% # uptime of the data source
schema:
columns:
- name: tx_id
type: string
checks: [not_null, unique]
- name: amount
type: decimal(10,2)
checks:
- not_null
- range: {min: 0.01}
- name: currency
type: string
checks:
- accepted_values: [USD, EUR, GBP, BRL, JPY]
- name: customer_id
type: string
checks: [not_null]
- name: created_at
type: timestamp
checks:
- not_null
- freshness: {max_age: 1h}
- name: status
type: string
checks:
- accepted_values: [pending, completed, failed, refunded]
on_violation: block # block | warn | quarantine
# ──────────────────────────────────────────
# Check Suites (logical grouping)
# ──────────────────────────────────────────
suites:
- name: transactions_daily
description: "Daily quality checks on transactions table"
source: warehouse
table: transactions
schedule: "0 6 * * *" # 6am UTC (standalone mode)
tags: [payments, daily, critical]
checks:
# ── Completeness ──
- not_null: [tx_id, amount, customer_id, created_at]
- completeness:
column: email
min: 0.95 # at least 95% non-null
# ── Uniqueness ──
- unique: tx_id
- unique_combination: [customer_id, created_at, amount]
# ── Validity ──
- accepted_values:
column: status
values: [pending, completed, failed, refunded]
- regex:
column: email
pattern: "^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\\.[a-zA-Z0-9-.]+$"
- range:
column: amount
min: 0.01
max: 999999.99
# ── Freshness ──
- freshness:
column: created_at
max_age: 2h
# ── Volume ──
- row_count:
min: 10000
max: 10000000
- row_count_change:
max_decrease: 20% # alert if count drops >20% vs yesterday
max_increase: 300% # alert if count spikes >300%
# ── Consistency ──
- referential_integrity:
column: customer_id
reference:
source: warehouse
table: customers
column: id
- cross_source:
description: "Row count matches between warehouse and lake"
left:
source: warehouse
query: "SELECT COUNT(*) FROM transactions WHERE date = CURRENT_DATE"
right:
source: lake
path: "transactions/date={{ today }}/"
query: "SELECT COUNT(*) FROM read_parquet('*.parquet')"
check: "abs(left - right) / left < 0.01" # <1% difference
# ── Statistical / Anomaly ──
- distribution:
column: amount
method: ks_test # Kolmogorov-Smirnov
reference: last_30d # compare against last 30 days
significance: 0.05
- anomaly:
metric: row_count
method: prophet # prophet | zscore | mad | iqr
sensitivity: medium # low | medium | high
- anomaly:
metric: null_rate
column: email
method: zscore
threshold: 3.0
# ── Custom SQL ──
- custom_sql:
name: "no_negative_completed"
query: |
SELECT COUNT(*) = 0
FROM transactions
WHERE amount < 0 AND status = 'completed'
- custom_sql:
name: "daily_revenue_sanity"
query: |
SELECT SUM(amount) BETWEEN 100000 AND 50000000
FROM transactions
WHERE date = CURRENT_DATE AND status = 'completed'
# Actions when checks fail
on_failure:
- alert:
channels: [slack, pagerduty]
severity: critical
message: "Transaction quality checks failed: {{ failed_checks }}"
- block_downstream: true # prevent downstream DAGs from running
- quarantine: # move bad data to quarantine
target: warehouse.quarantine.transactions
filter: "{{ failing_rows_query }}"
# ── Streaming checks ──
- name: transactions_stream
source: stream
type: streaming # key difference: runs continuously
window: 5m # evaluate every 5 min window
checks:
- schema:
type: json_schema
ref: schemas/transaction_event.json
- throughput:
min: 100 # min 100 events per window
max: 50000
- latency:
field: event_time
max_delay: 30s # event_time vs processing_time
- anomaly:
metric: throughput
method: mad # Median Absolute Deviation
sensitivity: high
on_failure:
- alert:
channels: [pagerduty]
severity: critical
# ──────────────────────────────────────────
# Monitoring (continuous, post-deploy)
# ──────────────────────────────────────────
monitors:
- name: transactions_drift
source: warehouse
table: transactions
schedule: "0 */6 * * *" # every 6 hours
columns: [amount, currency, status]
methods:
- psi # Population Stability Index
- wasserstein # Earth Mover's Distance
reference: last_7d
threshold: 0.2
on_drift:
- alert: {channels: [slack]}
- trigger_pipeline: retrain_fraud_model # trigger Airflow DAG
# ──────────────────────────────────────────
# Report config
# ──────────────────────────────────────────
reporting:
formats: [html, json]
retention: 90d
publish_to: s3://data-quality-reports/
PQL as an open standard¶
PQL (Provero Quality Language) Specification:
Goal: become the "SQL of data quality"
Define once, run anywhere.
Core check types (universal):
┌─────────────────────┬────────────────────────────────────────┐
│ Category │ Checks │
├─────────────────────┼────────────────────────────────────────┤
│ Completeness │ not_null, completeness │
│ Uniqueness │ unique, unique_combination │
│ Validity │ accepted_values, range, regex, type │
│ Freshness │ freshness, latency │
│ Volume │ row_count, row_count_change, throughput│
│ Consistency │ referential_integrity, cross_source │
│ Distribution │ distribution, anomaly │
│ Custom │ custom_sql, custom_python │
└─────────────────────┴────────────────────────────────────────┘
The spec defines:
- YAML schema (JSON Schema published)
- Check semantics (what each check means, precisely)
- Result format (standardized output)
- Severity levels (info, warning, critical, blocker)
Other tools can implement PQL:
- Great Expectations could add a PQL importer
- Soda could add PQL compatibility
- dbt could compile PQL to dbt tests
- Cloud DQ tools could adopt PQL as input format
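Because the spec publishes a JSON Schema for provero.yaml, any tool can verify that a rules file is valid PQL without depending on Provero itself. A minimal sketch using PyYAML and the jsonschema package; the schema path is illustrative, taken from the repository layout later in this document:
import json
import yaml                      # PyYAML
from jsonschema import validate  # jsonschema package

with open("aql-spec/schema.json") as f:   # JSON Schema published with the spec
    schema = json.load(f)
with open("provero.yaml") as f:
    rules = yaml.safe_load(f)

validate(instance=rules, schema=schema)   # raises ValidationError if invalid
print("provero.yaml is a valid PQL rules file")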
Core Components¶
1. Rule Compiler¶
Transforms PQL (YAML) into optimized execution plans.
provero.yaml
│
▼
┌──────────────────────────────┐
│ Rule Compiler │
│ │
│ 1. Parse YAML │
│ 2. Validate against schema │
│ 3. Resolve references │
│ (sources, variables) │
│ 4. Build dependency graph │
│ 5. Optimize execution: │
│ - Batch SQL checks into │
│ single query │
│ - Parallelize independent│
│ checks │
│ - Sample large tables │
│ when configured │
│ 6. Output: ExecutionPlan │
└──────────────────────────────┘
Key optimization: SQL check batching
Instead of:
SELECT COUNT(*) FROM t WHERE col IS NULL; -- check 1
SELECT COUNT(DISTINCT col) FROM t; -- check 2
SELECT MIN(col), MAX(col) FROM t; -- check 3
Compiled into:
SELECT
COUNT(*) FILTER (WHERE col IS NULL) as null_count,
COUNT(DISTINCT col) as distinct_count,
MIN(col) as min_val,
MAX(col) as max_val,
COUNT(*) as total_count
FROM t;
One query, and a single scan of the table, instead of three: a massive performance difference at scale.
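The engine still has to turn that single aggregate row back into per-check verdicts. A simplified sketch of that mapping; field names mirror the query above, and the real optimizer API may differ:
# One batched row comes back from the warehouse...
row = {"null_count": 0, "distinct_count": 48_291, "min_val": -5.00,
       "max_val": 12_000.00, "total_count": 48_291}

# ...and is fanned back out into individual check verdicts.
verdicts = {
    "not_null(col)":     row["null_count"] == 0,
    "unique(col)":       row["distinct_count"] == row["total_count"],
    "range(col, min=0)": row["min_val"] >= 0,
}
# {'not_null(col)': True, 'unique(col)': True, 'range(col, min=0)': False}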
2. Check Engine¶
Executes checks against the data and returns standardized results.
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any


class Status(Enum):
    PASS = "pass"
    FAIL = "fail"
    WARN = "warn"
    ERROR = "error"
    SKIP = "skip"


class Severity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"
    BLOCKER = "blocker"


# Standardized result for every check
@dataclass
class CheckResult:
check_name: str
check_type: str # "not_null", "unique", "anomaly", etc.
status: Status # PASS | FAIL | WARN | ERROR | SKIP
severity: Severity # INFO | WARNING | CRITICAL | BLOCKER
# What was checked
source: str
table: str
column: str | None
# Result
observed_value: Any # what was found
expected_value: Any # what was expected
# Context
row_count: int # rows checked
failing_rows: int # rows that failed
failing_rows_sample: list # sample of bad rows (configurable)
failing_rows_query: str # SQL to reproduce
# Timing
started_at: datetime
duration_ms: int
# Metadata
tags: list[str]
suite: str
run_id: str
# Result of a full suite
@dataclass
class SuiteResult:
suite_name: str
status: Status # PASS if all passed, otherwise FAIL
checks: list[CheckResult]
total: int
passed: int
failed: int
warned: int
errored: int
started_at: datetime
duration_ms: int
# Quality score (0-100)
quality_score: float
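SuiteResult carries a quality_score between 0 and 100. The exact formula is an implementation detail; below is one plausible severity-weighted sketch, assuming the Status and Severity enums above. The weights are assumptions, not the shipped behaviour.
# Assumed severity weights, illustrative only.
SEVERITY_WEIGHT = {Severity.INFO: 0.25, Severity.WARNING: 0.5,
                   Severity.CRITICAL: 1.0, Severity.BLOCKER: 1.0}

def quality_score(checks: list[CheckResult]) -> float:
    """Score 100 when everything passes; each failure subtracts its weight."""
    if not checks:
        return 100.0
    penalty = sum(SEVERITY_WEIGHT[c.severity] for c in checks
                  if c.status in (Status.FAIL, Status.ERROR))
    return round(max(0.0, 100.0 * (1 - penalty / len(checks))), 1)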
3. Anomaly Detector¶
Built-in anomaly detection, no external SaaS dependency.
┌─────────────────────────────────────────────────────────────┐
│ Anomaly Detector │
│ │
│ Works over the history of check results. │
│ Does not need raw data, only the metrics. │
│ │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ Methods (built-in, no external deps) │ │
│ │ │ │
│ │ Z-Score Simple, fast. │ │
│ │ Good for normally-distributed │ │
│ │ metrics (row_count, latency) │ │
│ │ │ │
│ │ MAD Median Absolute Deviation. │ │
│ │ (default) Robust to outliers. Works well │ │
│ │ in most cases. │ │
│ │ │ │
│ │ IQR Interquartile Range. │ │
│ │ Good for skewed metrics. │ │
│ │ │ │
│ │ Prophet Facebook Prophet. │ │
│ │ (optional dep) For metrics with seasonality │ │
│ │ (weekday/weekend, holidays). │ │
│ │ │ │
│ │ DBSCAN Density-based clustering. │ │
│ │ For multivariate anomaly │ │
│ │ detection. │ │
│ └───────────────────────────────────────────────────────┘ │
│ │
│ Input: metric history (result store) │
│ Output: anomaly score + is_anomaly boolean + explanation │
│ │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ Sensitivity presets (user-friendly) │ │
│ │ │ │
│ │ low: flags only extreme deviations (>4 sigma) │ │
│ │ medium: flags significant deviations (>3 sigma) │ │
│ │ high: flags moderate deviations (>2 sigma) │ │
│ └───────────────────────────────────────────────────────┘ │
│ │
│ No GPU required, no cloud required, runs anywhere. │
└─────────────────────────────────────────────────────────────┘
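A minimal sketch of the default MAD method, assuming a history of metric values read from the result store; the sigma thresholds mirror the sensitivity presets above.
import numpy as np

SENSITIVITY_SIGMA = {"low": 4.0, "medium": 3.0, "high": 2.0}

def mad_anomaly(history: list[float], current: float,
                sensitivity: str = "medium") -> tuple[bool, float]:
    """Return (is_anomaly, anomaly_score) using Median Absolute Deviation."""
    values = np.asarray(history, dtype=float)
    median = float(np.median(values))
    mad = float(np.median(np.abs(values - median)))
    if mad == 0.0:                     # flat history: any deviation is anomalous
        return current != median, 0.0 if current == median else float("inf")
    score = 0.6745 * abs(current - median) / mad   # comparable to a z-score
    return score > SENSITIVITY_SIGMA[sensitivity], score

# e.g. daily row_count history vs. today's value
is_anomaly, score = mad_anomaly([48_100, 49_950, 50_300, 49_400, 50_800], 31_000)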
4. Contract Registry¶
Versioned, publishable, auditable data contracts.
Concept:
PRODUCER (payments team) CONSUMER (analytics team)
│ │
│ publishes contract │ imports contract
▼ ▼
┌──────────────┐ ┌──────────────┐
│ transactions │ │ analytics │
│ _contract │ │ _pipeline │
│ v2.1 │ │ │
│ │ registry │ expects: │
│ schema: ... │◄──────────────► │ transactions│
│ sla: ... │ │ _contract │
│ checks: ... │ │ v2.x │
└──────────────┘ └──────────────┘
The contract defines:
- Schema (columns, types, nullable)
- Quality SLAs (freshness, completeness, availability)
- Owner (who is responsible)
- Version (semver, breaking changes = major bump)
- Checks (quality rules the producer guarantees)
Storage:
- Git (YAML files in the repo, versioned with the code)
- Registry Server (API to publish/query, server mode)
- Both (git as source of truth, server as cache)
Workflow:
1. Producer defines the contract in provero.yaml
2. CI verifies that the data satisfies the contract
3. Consumer declares a dependency on the contract
4. If the producer breaks the contract, the consumer is alerted
5. Breaking changes (major version) require consumer opt-in
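The breaking-change rule in step 5 boils down to comparing two contract versions column by column. A simplified sketch of how provero contract diff might classify changes; the classification rules shown are assumptions, not the shipped implementation:
def breaking_changes(old_schema: dict, new_schema: dict) -> list[str]:
    """Removed columns and changed types break consumers; additions do not."""
    old_cols = {c["name"]: c for c in old_schema["columns"]}
    new_cols = {c["name"]: c for c in new_schema["columns"]}
    breaks = []
    for name, col in old_cols.items():
        if name not in new_cols:
            breaks.append(f"column removed: {name}")
        elif new_cols[name]["type"] != col["type"]:
            breaks.append(
                f"type changed: {name} ({col['type']} -> {new_cols[name]['type']})"
            )
    return breaks   # non-empty result implies a major version bump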
5. Data Connectors¶
Pluggable architecture. Every connector implements a simple interface.
from typing import Protocol

# Connection, CompiledCheck, CheckResult, DataProfile and SchemaInfo are
# provero-core model types.

class Connector(Protocol):
"""Interface that every connector implements."""
def connect(self, config: dict) -> Connection:
"""Establish connection to the data source."""
...
def execute_checks(
self,
connection: Connection,
checks: list[CompiledCheck]
) -> list[CheckResult]:
"""Execute compiled checks against the source."""
...
def get_profile(
self,
connection: Connection,
table: str,
columns: list[str] | None = None,
sample_size: int | None = None
) -> DataProfile:
"""Generate a statistical profile of the data."""
...
def get_schema(
self,
connection: Connection,
table: str
) -> SchemaInfo:
"""Return the table schema."""
...
# Built-in connectors in the core:
# SQL (SQLAlchemy-based): Postgres, MySQL, SQLite, DuckDB
# Cloud SQL: Snowflake, BigQuery, Redshift, Databricks
# Files: Parquet, CSV, JSON, Avro (via DuckDB or Polars)
# DataFrame: Pandas, Polars, Spark
# Streaming: Kafka, Kinesis, Pulsar
# Plugin connectors (separate packages):
# provero-connector-mongodb
# provero-connector-elasticsearch
# provero-connector-dynamodb
# etc.
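To give a feel for the plugin surface, a heavily simplified DataFrame connector might look like the sketch below. Return types are collapsed to plain dicts for brevity; a real connector would return the CheckResult and SchemaInfo models.
import pandas as pd

class PandasConnector:
    """Toy connector: for DataFrames, the 'connection' is the frame itself."""

    def connect(self, config: dict) -> pd.DataFrame:
        return config["dataframe"]

    def get_schema(self, connection: pd.DataFrame, table: str) -> dict:
        return {col: str(dtype) for col, dtype in connection.dtypes.items()}

    def execute_checks(self, connection: pd.DataFrame,
                       checks: list[dict]) -> list[dict]:
        results = []
        for check in checks:
            if check["type"] == "not_null":
                failing = int(connection[check["column"]].isna().sum())
                results.append({"check": "not_null", "column": check["column"],
                                "status": "PASS" if failing == 0 else "FAIL",
                                "failing_rows": failing})
        return results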
Airflow Integration¶
Airflow Provider Package: apache-airflow-providers-provero¶
# ── Operator: runs checks inline in the DAG ──
from airflow.decorators import dag, task
from provero.airflow import ProveroCheckOperator, ProveroSensor
@dag(schedule="@daily")
def etl_pipeline():
extract = ...
transform = ...
# Quality gate between transform and load
quality_check = ProveroCheckOperator(
task_id="quality_check",
# Option 1: reference a provero.yaml file
provero_file="checks/transactions.yaml",
# Option 2: inline checks
# source={"type": "postgres", "conn_id": "warehouse"},
# table="staging.transactions",
# checks=[
# {"not_null": ["id", "amount"]},
# {"row_count": {"min": 1000}},
# ],
fail_on=["critical", "blocker"], # which severities block
)
load = ...
extract >> transform >> quality_check >> load
# ── Sensor: wait for data to reach a minimum quality ──
wait_for_quality = ProveroSensor(
task_id="wait_for_quality",
source={"type": "postgres", "conn_id": "warehouse"},
table="raw.events",
checks=[
{"freshness": {"column": "event_time", "max_age": "1h"}},
{"row_count": {"min": 10000}},
],
poke_interval=300, # check every 5 min
timeout=3600, # give up after 1h
)
# ── Decorator: more Pythonic ──
from provero.airflow import provero_check
@dag(schedule="@daily")
def modern_pipeline():
@task
def transform():
return process_data()
@provero_check(
source="warehouse",
table="staging.output",
checks=["not_null:id", "unique:id", "freshness:updated_at<1h"],
fail_on="critical",
)
@task
def load(data):
write_to_warehouse(data)
    load(transform())  # passing the output creates the dependency and hands off the data
# ── DAG auto-generated from provero.yaml ──
# If provero.yaml has a schedule defined, a DAG is generated automatically
# File: dags/provero_auto.py (single line)
from provero.airflow import generate_dags_from_directory
generate_dags_from_directory("checks/")
# This generates one DAG per suite with a schedule in provero.yaml
Flyte Integration (separate plugin)¶
# provero-flyte package
from flytekit import task, workflow
from provero.flyte import provero_check
@task
def train_model(data):
...
@provero_check(
source="s3://training-data/",
checks=["row_count:min=10000", "not_null:label", "completeness:features>0.99"],
)
@task
def validate_training_data(path: str):
...
@workflow
def ml_pipeline():
validate_training_data(path="s3://data/") >> train_model(data=...)
Streaming Architecture¶
┌─────────────────────────────────────────────────────────┐
│ Streaming Check Engine │
│ │
│ Kafka/Kinesis/Pulsar │
│ │ │
│ ▼ │
│ ┌──────────┐ ┌───────────┐ ┌───────────────┐ │
│ │ Ingester │────▶│ Windowed │────▶│ Check Engine │ │
│ │ │ │ Aggregator│ │ (same as batch)│ │
│ └──────────┘ └───────────┘ └───────┬───────┘ │
│ │ │
│ Windows: ▼ │
│ - Tumbling (every 5m) ┌───────────────────┐ │
│ - Sliding (5m window, 1m step) │ Result Store │ │
│ - Session (gap-based) │ + Anomaly Detector│ │
│ └───────────────────┘ │
│ │
│ Checks supported in streaming: │
│ - schema (JSON Schema validation per message) │
│ - throughput (messages per window) │
│ - latency (event_time vs processing_time) │
│ - null_rate (per window) │
│ - anomaly (on windowed metrics) │
│ - custom_python (UDF on each message or window) │
│ │
│ Not supported in streaming (batch-only): │
│ - unique (requires full scan) │
│ - referential_integrity (requires join) │
│ - distribution tests (requires full sample) │
└─────────────────────────────────────────────────────────┘
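The windowed checks are simple aggregations over event timestamps. A pure-Python sketch of a tumbling-window throughput check; the real engine consumes Kafka/Kinesis/Pulsar and feeds windowed metrics into the same check engine as batch:
from collections import Counter
from datetime import datetime

WINDOW_SECONDS = 5 * 60                 # tumbling 5-minute windows
MIN_EVENTS, MAX_EVENTS = 100, 50_000    # throughput bounds from the suite above

def window_key(event_time: datetime) -> int:
    """Bucket an event into its tumbling window."""
    return int(event_time.timestamp()) // WINDOW_SECONDS

def throughput_verdicts(event_times: list[datetime]) -> dict[int, str]:
    counts = Counter(window_key(t) for t in event_times)
    return {window: ("PASS" if MIN_EVENTS <= n <= MAX_EVENTS else "FAIL")
            for window, n in counts.items()}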
Data Model¶
-- Minimal schema. SQLite for local, PostgreSQL for server mode.
-- Registered sources
CREATE TABLE provero_source (
id TEXT PRIMARY KEY, -- "warehouse", "lake", etc.
type TEXT NOT NULL, -- "postgres", "snowflake", etc.
config TEXT NOT NULL, -- JSON (connection params, no secrets)
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
-- Check suites
CREATE TABLE provero_suite (
id TEXT PRIMARY KEY,
name TEXT UNIQUE NOT NULL,
source_id TEXT REFERENCES provero_source(id),
    definition TEXT NOT NULL,    -- JSON (compiled PQL)
tags TEXT, -- JSON array
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
-- Suite runs
CREATE TABLE provero_run (
id TEXT PRIMARY KEY,
suite_id TEXT REFERENCES provero_suite(id),
status TEXT NOT NULL, -- running, passed, failed, error
trigger TEXT NOT NULL, -- schedule, manual, api, airflow
total INTEGER NOT NULL DEFAULT 0,
passed INTEGER NOT NULL DEFAULT 0,
failed INTEGER NOT NULL DEFAULT 0,
warned INTEGER NOT NULL DEFAULT 0,
errored INTEGER NOT NULL DEFAULT 0,
quality_score REAL, -- 0-100
started_at TEXT NOT NULL,
completed_at TEXT,
duration_ms INTEGER
);
-- Individual check results
CREATE TABLE provero_check_result (
    id TEXT PRIMARY KEY,
    run_id TEXT REFERENCES provero_run(id),
    check_name TEXT NOT NULL,
    check_type TEXT NOT NULL,
    status TEXT NOT NULL,            -- pass, fail, warn, error, skip
    severity TEXT NOT NULL,          -- info, warning, critical, blocker
    source_table TEXT,
    source_column TEXT,
    observed_value TEXT,             -- JSON
    expected_value TEXT,             -- JSON
    row_count INTEGER,
    failing_rows INTEGER,
    failing_sample TEXT,             -- JSON (sample of bad rows)
    failing_query TEXT,              -- SQL to reproduce
    duration_ms INTEGER
);
-- Inline INDEX clauses are MySQL-only syntax; SQLite and PostgreSQL need
-- standalone CREATE INDEX statements.
CREATE INDEX idx_run ON provero_check_result (run_id);
CREATE INDEX idx_check_type ON provero_check_result (check_type);
CREATE INDEX idx_status ON provero_check_result (status);
-- Historical metrics (for anomaly detection)
CREATE TABLE provero_metric (
    id TEXT PRIMARY KEY,
    suite_id TEXT REFERENCES provero_suite(id),
    check_name TEXT NOT NULL,
    metric_name TEXT NOT NULL,       -- "row_count", "null_rate", "mean", etc.
    value REAL NOT NULL,
    recorded_at TEXT NOT NULL
);
CREATE INDEX idx_metric_lookup
    ON provero_metric (suite_id, check_name, metric_name, recorded_at);
-- Data contracts
CREATE TABLE provero_contract (
id TEXT PRIMARY KEY,
name TEXT UNIQUE NOT NULL,
version TEXT NOT NULL, -- semver
owner TEXT,
source_id TEXT REFERENCES provero_source(id),
definition TEXT NOT NULL, -- JSON (schema + SLAs + checks)
status TEXT DEFAULT 'active', -- active, deprecated, violated
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
-- Drift events
CREATE TABLE provero_drift_event (
id TEXT PRIMARY KEY,
suite_id TEXT REFERENCES provero_suite(id),
column_name TEXT,
drift_type TEXT NOT NULL, -- data, schema, volume
method TEXT NOT NULL, -- psi, wasserstein, ks_test
score REAL NOT NULL,
threshold REAL NOT NULL,
is_anomaly BOOLEAN NOT NULL,
detected_at TEXT NOT NULL
);
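The provero_metric table is what feeds the anomaly detector. A small sketch of reading one metric's history from the local SQLite store; the query shape follows the schema above:
import sqlite3

def metric_history(db_path: str, suite_id: str, check_name: str,
                   metric_name: str, limit: int = 90) -> list[float]:
    """Most recent `limit` values, returned oldest-first for the detector."""
    with sqlite3.connect(db_path) as conn:
        rows = conn.execute(
            """
            SELECT value FROM provero_metric
            WHERE suite_id = ? AND check_name = ? AND metric_name = ?
            ORDER BY recorded_at DESC
            LIMIT ?
            """,
            (suite_id, check_name, metric_name, limit),
        ).fetchall()
    return [value for (value,) in reversed(rows)]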
CLI Design¶
# ── Init ──
provero init # creates provero.yaml template
provero init --from-source postgres://... # auto-generates checks
# based on the data profile
# ── Run checks ──
provero run # runs every check in provero.yaml
provero run --suite transactions # runs a specific suite
provero run --tag critical # runs checks with a tag
provero run --source warehouse # runs checks for one source
# ── Output ──
provero run --format json # JSON result
provero run --format table # table result (default)
provero run --report html # generate HTML report
# Example output (table format):
# ┌─────────────────────┬──────────┬──────────┬───────────┬──────────┐
# │ Check │ Column │ Status │ Observed │ Expected │
# ├─────────────────────┼──────────┼──────────┼───────────┼──────────┤
# │ not_null │ order_id │ ✓ PASS │ 0 nulls │ 0 nulls │
# │ not_null │ amount │ ✓ PASS │ 0 nulls │ 0 nulls │
# │ unique │ order_id │ ✓ PASS │ 0 dupes │ 0 dupes │
# │ range │ amount │ ✗ FAIL │ min=-5.00 │ min=0.01 │
# │ freshness │ created │ ✓ PASS │ 23m ago │ < 2h │
# │ row_count │ - │ ✓ PASS │ 48,291 │ > 10,000 │
# │ anomaly(row_count) │ - │ ⚠ WARN │ -18% vs 7d│ < ±25% │
# └─────────────────────┴──────────┴──────────┴───────────┴──────────┘
#
# Suite: transactions_daily
# Score: 85/100 | 5 passed, 1 failed, 1 warning | 1.2s
#
# BLOCKER: range check failed on 'amount' (3 rows with negative values)
# Query to inspect: SELECT * FROM orders WHERE amount < 0.01
# ── Profile ──
provero profile postgres://... # statistical profile of a source
provero profile --table orders # profile one table
provero profile --suggest # suggests checks based on profile
# ── Contracts ──
provero contract validate # validates data against contract
provero contract publish # publishes contract to registry
provero contract diff v2.0 v2.1 # diff between contract versions
provero contract breaking-changes # lists breaking changes
# ── Monitor (continuous) ──
provero watch # runs checks continuously
provero watch --interval 5m # every 5 minutes
provero watch --stream # streaming mode (Kafka, etc.)
# ── Server mode ──
provero server # start API server (FastAPI)
provero server --port 8080
# ── Utilities ──
provero diff source_a source_b # compare two data sources
provero lineage # show lineage (OpenLineage)
provero export openlineage # export results as OpenLineage events
provero export great-expectations # export rules as GX expectations
provero export soda # export rules as SodaCL
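In CI, the JSON output is the easiest thing to gate on. A sketch that shells out to the CLI and fails the build when any check fails; the JSON keys are assumed to mirror the SuiteResult model and may differ in the real output:
import json
import subprocess

proc = subprocess.run(
    ["provero", "run", "--suite", "transactions_daily", "--format", "json"],
    capture_output=True, text=True,
)
result = json.loads(proc.stdout)
if result["failed"] > 0:
    raise SystemExit(f"Quality gate failed: {result['failed']} check(s) failing "
                     f"(score {result['quality_score']}/100)")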
Technology Stack¶
Language: Python 3.11+ (core engine, connectors, SDK)
             Rust (optional CLI binary via PyO3; the MVP CLI is pure Python)
SQL Engine: DuckDB (embedded, for file-based checks and profiling)
SQLAlchemy 2.0 (for database connectors)
API: FastAPI (server mode)
Validation: Pydantic v2 (config parsing, result models)
Stats: scipy (statistical tests)
numpy (numerical operations)
prophet (optional, for time-series anomaly detection)
Build: uv (package manager)
Hatch (build backend)
Testing: pytest + hypothesis
Docs: Sphinx + MyST (Markdown)
CI: GitHub Actions
Packaging:
provero-core # engine + CLI + basic connectors (Postgres, DuckDB, files)
provero-airflow # Airflow provider
provero-flyte # Flyte plugin
provero-snowflake # Snowflake connector
provero-bigquery # BigQuery connector
provero-redshift # Redshift connector
provero-kafka # Kafka streaming connector
provero-server # REST API server mode
Package Structure¶
provero/
├── provero-core/
│ └── src/provero/
│ ├── __init__.py
│ ├── core/
│ │ ├── compiler.py # AQL YAML → execution plan
│ │ ├── engine.py # Check execution engine
│ │ ├── results.py # CheckResult, SuiteResult models
│ │ ├── profiler.py # Data profiling
│ │ └── optimizer.py # SQL batching, parallelization
│ ├── checks/
│ │ ├── completeness.py # not_null, completeness
│ │ ├── uniqueness.py # unique, unique_combination
│ │ ├── validity.py # accepted_values, range, regex
│ │ ├── freshness.py # freshness, latency
│ │ ├── volume.py # row_count, row_count_change, throughput
│ │ ├── consistency.py # referential_integrity, cross_source
│ │ ├── distribution.py # ks_test, chi_squared, psi
│ │ ├── custom.py # custom_sql, custom_python
│ │ └── registry.py # check type registry (pluggable)
│ ├── anomaly/
│ │ ├── detector.py # Anomaly detection orchestrator
│ │ ├── methods/
│ │ │ ├── zscore.py
│ │ │ ├── mad.py # Median Absolute Deviation
│ │ │ ├── iqr.py # Interquartile Range
│ │ │ ├── prophet.py # Facebook Prophet (optional)
│ │ │ └── dbscan.py # Density-based (optional)
│ │ └── sensitivity.py # low/medium/high presets
│ ├── connectors/
│ │ ├── base.py # Connector protocol
│ │ ├── sql.py # Generic SQL (SQLAlchemy)
│ │ ├── postgres.py
│ │ ├── duckdb.py # Embedded (files, Parquet, CSV)
│ │ ├── dataframe.py # Pandas, Polars
│ │ └── registry.py # Connector registry
│ ├── contracts/
│ │ ├── models.py # Contract data models
│ │ ├── registry.py # Contract storage/retrieval
│ │ ├── validator.py # Validate data against contract
│ │ └── diff.py # Contract version diffing
│ ├── streaming/
│ │ ├── engine.py # Streaming check engine
│ │ ├── window.py # Windowing logic
│ │ └── ingester.py # Message ingestion
│ ├── store/
│ │ ├── sqlite.py # Local result store
│ │ ├── postgres.py # Server result store
│ │ └── models.py # SQLAlchemy models
│ ├── reporting/
│ │ ├── html.py # HTML report generator
│ │ ├── json.py # JSON output
│ │ ├── table.py # CLI table output
│ │ └── templates/ # Jinja2 HTML templates
│ ├── actions/
│ │ ├── alert.py # Slack, PagerDuty, webhook, email
│ │ ├── block.py # Block downstream pipelines
│ │ ├── quarantine.py # Move bad data to quarantine
│ │ └── trigger.py # Trigger external pipelines
│ ├── export/
│ │ ├── openlineage.py # Export as OpenLineage events
│ │ ├── great_expectations.py # Export as GX expectations
│ │ └── soda.py # Export as SodaCL
│ ├── api/ # REST API (server mode)
│ │ ├── app.py
│ │ ├── routes/
│ │ │ ├── checks.py
│ │ │ ├── contracts.py
│ │ │ ├── reports.py
│ │ │ └── health.py
│ │ └── auth.py
│ └── cli/
│ ├── main.py # Click/Typer CLI
│ ├── commands/
│ │ ├── run.py
│ │ ├── profile.py
│ │ ├── contract.py
│ │ ├── watch.py
│ │ ├── server.py
│ │ └── export.py
│ └── output.py # Rich terminal output
│
├── provero-airflow/
│ └── src/provero/airflow/
│ ├── operators.py # ProveroCheckOperator
│ ├── sensors.py # ProveroSensor, ProveroDriftSensor
│ ├── hooks.py # ProveroHook (API client)
│ ├── decorators.py # @provero_check
│ ├── dag_generator.py # Auto-gen DAGs from provero.yaml
│ └── provider.yaml
│
├── provero-flyte/
│ └── src/provero/flyte/
│ ├── plugin.py
│ └── decorators.py
│
├── provero-snowflake/
│ └── src/provero/connectors/
│ └── snowflake.py
│
├── provero-bigquery/
│ └── src/provero/connectors/
│ └── bigquery.py
│
├── provero-kafka/
│ └── src/provero/connectors/
│ └── kafka.py
│
├── provero-server/
│ └── (installs provero-core + API dependencies)
│
├── docs/
│ ├── getting-started.md
│ ├── aql-spec.md # AQL language specification
│ ├── connectors/
│ ├── airflow-integration.md
│ └── migration-from-gx.md # Guide for migrating from Great Expectations
│
├── examples/
│ ├── quickstart/ # 3-line provero.yaml
│ ├── ecommerce/ # Full e-commerce pipeline
│ ├── iot-sensors/ # IoT/industrial
│ ├── streaming/ # Kafka streaming checks
│ └── data-contracts/ # Contract-first workflow
│
├── aql-spec/ # AQL spec (separate, for adoption by others)
│ ├── spec.md
│ ├── schema.json # JSON Schema for provero.yaml
│ └── examples/
│
├── pyproject.toml # uv workspace root
└── CONTRIBUTING.md
What Makes Provero Different¶
┌────────────────────┬──────────┬───────────┬─────────┬──────────┐
│ │ Provero │ Great Ex │ Soda │ Pandera │
├────────────────────┼──────────┼───────────┼─────────┼──────────┤
│ Lines for 1 check │ 3 │ 50+ │ 5 │ 10 │
│ Anomaly detection │ Built-in │ No │ Paid │ No │
│ Data contracts │ Built-in │ No │ Yes │ No │
│ Streaming │ Yes │ No │ No │ No │
│ Airflow provider │ Yes │ Community │ Partial │ No │
│ Flyte plugin │ Yes │ No │ No │ Yes* │
│ CLI │ Yes │ Partial │ Yes │ No │
│ SQL batching │ Yes │ No │ No │ N/A │
│ Rule portability │ AQL std │ No │ No │ No │
│ Governance │ Apache │ VC-backed │ VC-back │ Union.ai │
│ Self-hosted cost │ Free │ Free* │ Free* │ Free │
│ Anomaly (self-host)│ Free │ No │ Paid │ No │
└────────────────────┴──────────┴───────────┴─────────┴──────────┘
* Free with significant engineering effort
MVP Scope (Phase 1)¶
What ships for v1.0:
Must have (MVP):
✓ provero.yaml parsing (PQL core subset)
✓ Check engine with 8 core check types:
- not_null, unique, accepted_values, range
- freshness, row_count, custom_sql, completeness
✓ 3 connectors: PostgreSQL, DuckDB (files), Pandas DataFrame
✓ CLI: provero init, provero run, provero profile
✓ Result store: SQLite
✓ Table + JSON output
✓ Airflow provider: ProveroCheckOperator (basic)
✓ One complete example: e-commerce pipeline
Phase 2:
- Anomaly detection (Z-Score, MAD)
- HTML reports
- Data contracts
- Snowflake, BigQuery connectors
- provero profile --suggest
- SQL batching optimizer
Phase 3:
- Streaming engine (Kafka)
- Server mode (REST API)
- Prophet anomaly detection
- Flyte plugin
- Export to GX/Soda formats
- PQL spec v1.0 formal publication
Phase 4:
- Advanced anomaly (DBSCAN, multivariate)
- Quarantine actions
- OpenLineage integration
- Rust CLI
- UI dashboard (optional, community-driven)
Anti-Griffin: Community Health Plan¶
Lessons from Griffin's failure, built into the project DNA:
1. Multi-company from day 0
- Seek committers from at least 3 organizations before proposing
- No single company holds >40% of committers
2. Release early, release often
- Monthly releases
- No multi-year rewrites. Incremental improvements only.
3. Onboarding pipeline
- "good first issue" labels always available
- Contributor guide with video walkthrough
- Monthly contributor office hours
4. Communication cadence
- Weekly dev sync (async, on mailing list)
- Monthly community call (video)
- Board reports always on time
5. User community
- Discord/Slack for real-time help
- GitHub Discussions for decisions
- Blog post for every release
6. Meritocratic path
- Clear criteria for contributor → committer → PMC
- Actively invite new committers (target: 2+ new per quarter)