Skip to content

Provero - Technical Architecture

provero /ˈæseɪ/, the testing of a substance to determine its quality or purity.

A vendor-neutral, declarative data quality engine with built-in anomaly detection. Works standalone, as an Airflow provider, or with any orchestrator. A spiritual successor to Apache Griffin, learning from its mistakes.

Note: This document describes the target architecture, including planned features not yet implemented. For currently available features, see the README. Unimplemented components include: server mode (FastAPI), streaming engine (Kafka), dedicated cloud connectors (Snowflake, BigQuery, Redshift), email alerts, watch mode, and Prophet-based anomaly detection.


Design Principles

  1. Simple by default, powerful when needed. A quality check in 3 lines of YAML. GX needs 50+ lines for the same result.
  2. Portable rules. Rules defined once, executed anywhere. Introduces the Provero Quality Language (AQL), an open standard.
  3. Anomaly detection built-in. No need for a $100k/year SaaS. Statistical detection runs locally, with no external dependency.
  4. Orchestrator-agnostic. Works as a CLI, as a Python library, as an Airflow provider, as a sidecar in any pipeline.
  5. Streaming + Batch. First open source framework to treat streaming as a first-class citizen.
  6. Data contracts first. Producers declare, consumers verify, Provero enforces.

High-Level Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│                           USER LAYER                                    │
│                                                                         │
│  provero.yaml          Python SDK          CLI            REST API      │
│  (declarative)       (programmatic)      (provero-ctl)    (server mode) │
│                                                                         │
│  ┌──────────────┐   ┌──────────────┐   ┌────────────┐  ┌────────────┐   │
│  │ source: pg   │   │ @check       │   │ provero run│  │ POST /scan │   │
│  │ checks:      │   │ @contract    │   │ provero... │  │ GET /report│   │
│  │  - not_null  │   │ @monitor     │   │ provero... │  │ GET /health│   │
│  │  - unique    │   │              │   │ provero... │  │            │   │
│  └──────────────┘   └──────────────┘   └────────────┘  └────────────┘   │
└──────────┬──────────────────┬──────────────────┬──────────────┬─────────┘
           │                  │                  │              │
           ▼                  ▼                  ▼              ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                         PROVERO CORE ENGINE                             │
│                                                                         │
│  ┌─────────────────┐  ┌─────────────────┐  ┌────────────────────────┐   │
│  │  Rule Compiler  │  │  Check Engine   │  │  Anomaly Detector      │   │
│  │                 │  │                 │  │                        │   │
│  │  AQL → plan     │  │  Runs checks    │  │  Statistical models    │   │
│  │  Validates      │  │  against data   │  │  over historical       │   │
│  │  Optimizes scan │  │  Returns verdict│  │  check results         │   │
│  └────────┬────────┘  └────────┬────────┘  └───────────┬────────────┘   │
│           │                    │                       │                │
│  ┌────────┴────────────────────┴───────────────────────┴─────────────┐  │
│  │                     Result Store                                  │  │
│  │  Check results, metrics, anomalies, trends (time-series)          │  │
│  │  Backends: SQLite (local) | PostgreSQL (server) | S3 (archive)    │  │
│  └───────────────────────────┬───────────────────────────────────────┘  │
│                              │                                          │
│  ┌───────────────────────────┴────────────────────────────────────────┐ │
│  │                     Contract Registry                              │ │
│  │  Data contracts: schema + quality SLAs + ownership                 │ │
│  │  Versioned, git-friendly, publishable                              │ │
│  └────────────────────────────────────────────────────────────────────┘ │
└──────────────────────────────┬──────────────────────────────────────────┘
              ┌────────────────┼────────────────┐
              ▼                ▼                ▼
┌──────────────────┐ ┌─────────────────┐ ┌──────────────────────┐
│  DATA CONNECTORS │ │  ORCHESTRATION  │ │  OUTPUT / ACTIONS    │
│                  │ │  INTEGRATIONS   │ │                      │
│ ┌──────────────┐ │ │ ┌─────────────┐ │ │ ┌──────────────────┐ │
│ │ SQL          │ │ │ │ Airflow     │ │ │ │ Reports (HTML,   │ │
│ │ (Postgres,   │ │ │ │ Provider    │ │ │ │ JSON, Markdown)  │ │
│ │  MySQL,      │ │ │ ├─────────────┤ │ │ ├──────────────────┤ │
│ │  Snowflake,  │ │ │ │ Standalone  │ │ │ │ Alerts (Slack,   │ │
│ │  BigQuery,   │ │ │ │ (cron/CLI)  │ │ │ │ PagerDuty, email,│ │
│ │  Redshift,   │ │ │ ├─────────────┤ │ │ │ webhook)         │ │
│ │  DuckDB)     │ │ │ │ Flyte       │ │ │ ├──────────────────┤ │
│ ├──────────────┤ │ │ │ (plugin)    │ │ │ │ OpenLineage      │ │
│ │ DataFrame    │ │ │ ├─────────────┤ │ │ │ (lineage events) │ │
│ │ (Pandas,     │ │ │ │ Dagster,    │ │ │ ├──────────────────┤ │
│ │  Polars,     │ │ │ │ Prefect     │ │ │ │ Block/Quarantine │ │
│ │  Spark)      │ │ │ │ (future)    │ │ │ │ (halt pipeline   │ │
│ ├──────────────┤ │ │ └─────────────┘ │ │ │  on failure)     │ │
│ │ Files        │ │ │                 │ │ ├──────────────────┤ │
│ │ (Parquet,    │ │ │                 │ │ │ OpenTelemetry    │ │
│ │  CSV, JSON,  │ │ │                 │ │ │ (metrics export) │ │
│ │  Avro, Delta,│ │ │                 │ │ └──────────────────┘ │
│ │  Iceberg)    │ │ │                 │ │                      │
│ ├──────────────┤ │ │                 │ │                      │
│ │ Streaming    │ │ │                 │ │                      │
│ │ (Kafka,      │ │ │                 │ │                      │
│ │  Kinesis,    │ │ │                 │ │                      │
│ │  Pulsar)     │ │ │                 │ │                      │
│ ├──────────────┤ │ │                 │ │                      │
│ │ APIs         │ │ │                 │ │                      │
│ │ (REST, gRPC, │ │ │                 │ │                      │
│ │  GraphQL)    │ │ │                 │ │                      │
│ └──────────────┘ │ │                 │ │                      │
└──────────────────┘ └─────────────────┘ └──────────────────────┘

Provero Quality Language (AQL)

The main differentiator. An open standard for defining quality rules that works in any tool, the way SQL works in any database.

Basic syntax

# provero.yaml, the simplest possible form
source:
  type: postgres
  connection: ${POSTGRES_URI}    # environment variable or Airflow connection
  table: orders

checks:
  - not_null: [order_id, customer_id, amount]
  - unique: order_id
  - accepted_values:
      column: status
      values: [pending, shipped, delivered, cancelled]
  - range:
      column: amount
      min: 0
      max: 1000000
  - freshness:
      column: created_at
      max_age: 24h
  - row_count:
      min: 1000
  - custom_sql: |
      SELECT COUNT(*) = 0
      FROM orders
      WHERE amount < 0 AND status = 'delivered'

3 lines for the most common case. No boilerplate, no classes, no configs.

Full example with every feature

# provero.yaml, advanced example
version: "1.0"

# Reusable data sources
sources:
  warehouse:
    type: snowflake
    connection: ${SNOWFLAKE_URI}
  lake:
    type: s3
    bucket: data-lake-prod
    format: parquet
  stream:
    type: kafka
    bootstrap_servers: ${KAFKA_BROKERS}
    topic: events.transactions

# ──────────────────────────────────────────
# Data Contracts (producer declares, consumer verifies)
# ──────────────────────────────────────────
contracts:
  - name: transactions_contract
    owner: payments-team
    source: warehouse
    table: transactions
    version: "2.1"
    sla:
      freshness: 1h
      completeness: 99.5%    # max 0.5% nulls in required fields
      availability: 99.9%    # uptime of the data source
    schema:
      columns:
        - name: tx_id
          type: string
          checks: [not_null, unique]
        - name: amount
          type: decimal(10,2)
          checks:
            - not_null
            - range: {min: 0.01}
        - name: currency
          type: string
          checks:
            - accepted_values: [USD, EUR, GBP, BRL, JPY]
        - name: customer_id
          type: string
          checks: [not_null]
        - name: created_at
          type: timestamp
          checks:
            - not_null
            - freshness: {max_age: 1h}
        - name: status
          type: string
          checks:
            - accepted_values: [pending, completed, failed, refunded]
    on_violation: block       # block | warn | quarantine

# ──────────────────────────────────────────
# Check Suites (logical grouping)
# ──────────────────────────────────────────
suites:
  - name: transactions_daily
    description: "Daily quality checks on transactions table"
    source: warehouse
    table: transactions
    schedule: "0 6 * * *"    # 6am UTC (standalone mode)
    tags: [payments, daily, critical]

    checks:
      # ── Completeness ──
      - not_null: [tx_id, amount, customer_id, created_at]
      - completeness:
          column: email
          min: 0.95            # at least 95% non-null

      # ── Uniqueness ──
      - unique: tx_id
      - unique_combination: [customer_id, created_at, amount]

      # ── Validity ──
      - accepted_values:
          column: status
          values: [pending, completed, failed, refunded]
      - regex:
          column: email
          pattern: "^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\\.[a-zA-Z0-9-.]+$"
      - range:
          column: amount
          min: 0.01
          max: 999999.99

      # ── Freshness ──
      - freshness:
          column: created_at
          max_age: 2h

      # ── Volume ──
      - row_count:
          min: 10000
          max: 10000000
      - row_count_change:
          max_decrease: 20%    # alert if count drops >20% vs yesterday
          max_increase: 300%   # alert if count spikes >300%

      # ── Consistency ──
      - referential_integrity:
          column: customer_id
          reference:
            source: warehouse
            table: customers
            column: id
      - cross_source:
          description: "Row count matches between warehouse and lake"
          left:
            source: warehouse
            query: "SELECT COUNT(*) FROM transactions WHERE date = CURRENT_DATE"
          right:
            source: lake
            path: "transactions/date={{ today }}/"
            query: "SELECT COUNT(*) FROM read_parquet('*.parquet')"
          check: "abs(left - right) / left < 0.01"   # <1% difference

      # ── Statistical / Anomaly ──
      - distribution:
          column: amount
          method: ks_test          # Kolmogorov-Smirnov
          reference: last_30d      # compare against last 30 days
          significance: 0.05
      - anomaly:
          metric: row_count
          method: prophet          # prophet | zscore | mad | iqr
          sensitivity: medium      # low | medium | high
      - anomaly:
          metric: null_rate
          column: email
          method: zscore
          threshold: 3.0

      # ── Custom SQL ──
      - custom_sql:
          name: "no_negative_completed"
          query: |
            SELECT COUNT(*) = 0
            FROM transactions
            WHERE amount < 0 AND status = 'completed'
      - custom_sql:
          name: "daily_revenue_sanity"
          query: |
            SELECT SUM(amount) BETWEEN 100000 AND 50000000
            FROM transactions
            WHERE date = CURRENT_DATE AND status = 'completed'

    # Actions when checks fail
    on_failure:
      - alert:
          channels: [slack, pagerduty]
          severity: critical
          message: "Transaction quality checks failed: {{ failed_checks }}"
      - block_downstream: true      # prevent downstream DAGs from running
      - quarantine:                 # move bad data to quarantine
          target: warehouse.quarantine.transactions
          filter: "{{ failing_rows_query }}"

  # ── Streaming checks ──
  - name: transactions_stream
    source: stream
    type: streaming                 # key difference: runs continuously
    window: 5m                      # evaluate every 5 min window

    checks:
      - schema:
          type: json_schema
          ref: schemas/transaction_event.json
      - throughput:
          min: 100                  # min 100 events per window
          max: 50000
      - latency:
          field: event_time
          max_delay: 30s            # event_time vs processing_time
      - anomaly:
          metric: throughput
          method: mad               # Median Absolute Deviation
          sensitivity: high

    on_failure:
      - alert:
          channels: [pagerduty]
          severity: critical

# ──────────────────────────────────────────
# Monitoring (continuous, post-deploy)
# ──────────────────────────────────────────
monitors:
  - name: transactions_drift
    source: warehouse
    table: transactions
    schedule: "0 */6 * * *"         # every 6 hours
    columns: [amount, currency, status]
    methods:
      - psi                         # Population Stability Index
      - wasserstein                 # Earth Mover's Distance
    reference: last_7d
    threshold: 0.2
    on_drift:
      - alert: {channels: [slack]}
      - trigger_pipeline: retrain_fraud_model   # trigger Airflow DAG

# ──────────────────────────────────────────
# Report config
# ──────────────────────────────────────────
reporting:
  formats: [html, json]
  retention: 90d
  publish_to: s3://data-quality-reports/

AQL as an open standard

AQL (Provero Quality Language) Specification:

Goal: become the "SQL of data quality"
     Define once, run anywhere.

Core check types (universal):
┌─────────────────────┬────────────────────────────────────────┐
│ Category            │ Checks                                 │
├─────────────────────┼────────────────────────────────────────┤
│ Completeness        │ not_null, completeness                 │
│ Uniqueness          │ unique, unique_combination             │
│ Validity            │ accepted_values, range, regex, type    │
│ Freshness           │ freshness, latency                     │
│ Volume              │ row_count, row_count_change, throughput│
│ Consistency         │ referential_integrity, cross_source    │
│ Distribution        │ distribution, anomaly                  │
│ Custom              │ custom_sql, custom_python              │
└─────────────────────┴────────────────────────────────────────┘

The spec defines:
- YAML schema (JSON Schema published)
- Check semantics (what each check means, precisely)
- Result format (standardized output)
- Severity levels (info, warning, critical, blocker)

Other tools can implement AQL:
- Great Expectations could add an AQL importer
- Soda could add AQL compatibility
- dbt could compile AQL to dbt tests
- Cloud DQ tools could adopt AQL as input format

Core Components

1. Rule Compiler

Transforms AQL (YAML) into optimized execution plans.

provero.yaml
┌──────────────────────────────┐
│         Rule Compiler        │
│                              │
│  1. Parse YAML               │
│  2. Validate against schema  │
│  3. Resolve references       │
│     (sources, variables)     │
│  4. Build dependency graph   │
│  5. Optimize execution:      │
│     - Batch SQL checks into  │
│       single query           │
│     - Parallelize independent│
│       checks                 │
│     - Sample large tables    │
│       when configured        │
│  6. Output: ExecutionPlan    │
└──────────────────────────────┘

Key optimization: SQL check batching
Instead of:
  SELECT COUNT(*) FROM t WHERE col IS NULL;     -- check 1
  SELECT COUNT(DISTINCT col) FROM t;            -- check 2
  SELECT MIN(col), MAX(col) FROM t;             -- check 3

Compiled into:
  SELECT
    COUNT(*) FILTER (WHERE col IS NULL) as null_count,
    COUNT(DISTINCT col) as distinct_count,
    MIN(col) as min_val,
    MAX(col) as max_val,
    COUNT(*) as total_count
  FROM t;

One query instead of three. Massive performance difference at scale.

2. Check Engine

Executes checks against the data and returns standardized results.

# Standardized result for every check
@dataclass
class CheckResult:
    check_name: str
    check_type: str              # "not_null", "unique", "anomaly", etc.
    status: Status               # PASS | FAIL | WARN | ERROR | SKIP
    severity: Severity           # INFO | WARNING | CRITICAL | BLOCKER

    # What was checked
    source: str
    table: str
    column: str | None

    # Result
    observed_value: Any          # what was found
    expected_value: Any          # what was expected

    # Context
    row_count: int               # rows checked
    failing_rows: int            # rows that failed
    failing_rows_sample: list    # sample of bad rows (configurable)
    failing_rows_query: str      # SQL to reproduce

    # Timing
    started_at: datetime
    duration_ms: int

    # Metadata
    tags: list[str]
    suite: str
    run_id: str

# Result of a full suite
@dataclass
class SuiteResult:
    suite_name: str
    status: Status               # PASS if all passed, otherwise FAIL
    checks: list[CheckResult]

    total: int
    passed: int
    failed: int
    warned: int
    errored: int

    started_at: datetime
    duration_ms: int

    # Quality score (0-100)
    quality_score: float

3. Anomaly Detector

Built-in anomaly detection, no external SaaS dependency.

┌─────────────────────────────────────────────────────────────┐
│                    Anomaly Detector                         │
│                                                             │
│  Works over the history of check results.                   │
│  Does not need raw data, only the metrics.                  │
│                                                             │
│  ┌───────────────────────────────────────────────────────┐  │
│  │ Methods (built-in, no external deps)                  │  │
│  │                                                       │  │
│  │  Z-Score          Simple, fast.                       │  │
│  │                   Good for normally-distributed       │  │
│  │                   metrics (row_count, latency)        │  │
│  │                                                       │  │
│  │  MAD              Median Absolute Deviation.          │  │
│  │  (default)        Robust to outliers. Works well      │  │
│  │                   in most cases.                      │  │
│  │                                                       │  │
│  │  IQR              Interquartile Range.                │  │
│  │                   Good for skewed metrics.            │  │
│  │                                                       │  │
│  │  Prophet          Facebook Prophet.                   │  │
│  │  (optional dep)   For metrics with seasonality        │  │
│  │                   (weekday/weekend, holidays).        │  │
│  │                                                       │  │
│  │  DBSCAN           Density-based clustering.           │  │
│  │                   For multivariate anomaly            │  │
│  │                   detection.                          │  │
│  └───────────────────────────────────────────────────────┘  │
│                                                             │
│  Input:  metric history (result store)                      │
│  Output: anomaly score + is_anomaly boolean + explanation   │
│                                                             │
│  ┌───────────────────────────────────────────────────────┐  │
│  │ Sensitivity presets (user-friendly)                   │  │
│  │                                                       │  │
│  │  low:    flags only extreme deviations (>4 sigma)     │  │
│  │  medium: flags significant deviations (>3 sigma)      │  │
│  │  high:   flags moderate deviations (>2 sigma)         │  │
│  └───────────────────────────────────────────────────────┘  │
│                                                             │
│  No GPU required, no cloud required, runs anywhere.         │
└─────────────────────────────────────────────────────────────┘

4. Contract Registry

Versioned, publishable, auditable data contracts.

Concept:

  PRODUCER (payments team)          CONSUMER (analytics team)
       │                                    │
       │  publishes contract                │  imports contract
       ▼                                    ▼
  ┌──────────────┐                 ┌──────────────┐
  │ transactions │                 │ analytics    │
  │ _contract    │                 │ _pipeline    │
  │ v2.1         │                 │              │
  │              │    registry     │ expects:     │
  │ schema: ...  │◄──────────────► │  transactions│
  │ sla: ...     │                 │  _contract   │
  │ checks: ...  │                 │  v2.x        │
  └──────────────┘                 └──────────────┘

The contract defines:
  - Schema (columns, types, nullable)
  - Quality SLAs (freshness, completeness, availability)
  - Owner (who is responsible)
  - Version (semver, breaking changes = major bump)
  - Checks (quality rules the producer guarantees)

Storage:
  - Git (YAML files in the repo, versioned with the code)
  - Registry Server (API to publish/query, server mode)
  - Both (git as source of truth, server as cache)

Workflow:
  1. Producer defines the contract in provero.yaml
  2. CI verifies that the data satisfies the contract
  3. Consumer declares a dependency on the contract
  4. If the producer breaks the contract, the consumer is alerted
  5. Breaking changes (major version) require consumer opt-in

5. Data Connectors

Pluggable architecture. Every connector implements a simple interface.

class Connector(Protocol):
    """Interface that every connector implements."""

    def connect(self, config: dict) -> Connection:
        """Establish connection to the data source."""
        ...

    def execute_checks(
        self,
        connection: Connection,
        checks: list[CompiledCheck]
    ) -> list[CheckResult]:
        """Execute compiled checks against the source."""
        ...

    def get_profile(
        self,
        connection: Connection,
        table: str,
        columns: list[str] | None = None,
        sample_size: int | None = None
    ) -> DataProfile:
        """Generate a statistical profile of the data."""
        ...

    def get_schema(
        self,
        connection: Connection,
        table: str
    ) -> SchemaInfo:
        """Return the table schema."""
        ...


# Built-in connectors in the core:
# SQL (SQLAlchemy-based): Postgres, MySQL, SQLite, DuckDB
# Cloud SQL: Snowflake, BigQuery, Redshift, Databricks
# Files: Parquet, CSV, JSON, Avro (via DuckDB or Polars)
# DataFrame: Pandas, Polars, Spark
# Streaming: Kafka, Kinesis, Pulsar

# Plugin connectors (separate packages):
# provero-connector-mongodb
# provero-connector-elasticsearch
# provero-connector-dynamodb
# etc.

Airflow Integration

Airflow Provider Package: apache-airflow-providers-provero

# ── Operator: runs checks inline in the DAG ──

from airflow.decorators import dag, task
from provero.airflow import ProveroCheckOperator, ProveroSensor

@dag(schedule="@daily")
def etl_pipeline():

    extract = ...
    transform = ...

    # Quality gate between transform and load
    quality_check = ProveroCheckOperator(
        task_id="quality_check",
        # Option 1: reference a provero.yaml file
        provero_file="checks/transactions.yaml",
        # Option 2: inline checks
        # source={"type": "postgres", "conn_id": "warehouse"},
        # table="staging.transactions",
        # checks=[
        #     {"not_null": ["id", "amount"]},
        #     {"row_count": {"min": 1000}},
        # ],
        fail_on=["critical", "blocker"],  # which severities block
    )

    load = ...

    extract >> transform >> quality_check >> load


# ── Sensor: wait for data to reach a minimum quality ──

    wait_for_quality = ProveroSensor(
        task_id="wait_for_quality",
        source={"type": "postgres", "conn_id": "warehouse"},
        table="raw.events",
        checks=[
            {"freshness": {"column": "event_time", "max_age": "1h"}},
            {"row_count": {"min": 10000}},
        ],
        poke_interval=300,      # check every 5 min
        timeout=3600,           # give up after 1h
    )


# ── Decorator: more Pythonic ──

from provero.airflow import provero_check

@dag(schedule="@daily")
def modern_pipeline():

    @task
    def transform():
        return process_data()

    @provero_check(
        source="warehouse",
        table="staging.output",
        checks=["not_null:id", "unique:id", "freshness:updated_at<1h"],
        fail_on="critical",
    )
    @task
    def load(data):
        write_to_warehouse(data)

    transform() >> load()


# ── DAG auto-generated from provero.yaml ──
# If provero.yaml has a schedule defined, a DAG is generated automatically

# File: dags/provero_auto.py (single line)
from provero.airflow import generate_dags_from_directory
generate_dags_from_directory("checks/")
# This generates one DAG per suite with a schedule in provero.yaml

Flyte Integration (separate plugin)

# provero-flyte package
from flytekit import task, workflow
from provero.flyte import provero_check

@task
def train_model(data):
    ...

@provero_check(
    source="s3://training-data/",
    checks=["row_count:min=10000", "not_null:label", "completeness:features>0.99"],
)
@task
def validate_training_data(path: str):
    ...

@workflow
def ml_pipeline():
    validate_training_data(path="s3://data/") >> train_model(data=...)

Streaming Architecture

┌─────────────────────────────────────────────────────────┐
│               Streaming Check Engine                    │
│                                                         │
│  Kafka/Kinesis/Pulsar                                   │
│       │                                                 │
│       ▼                                                 │
│  ┌──────────┐     ┌───────────┐     ┌───────────────┐   │
│  │ Ingester │────▶│ Windowed  │────▶│ Check Engine  │   │
│  │          │     │ Aggregator│     │ (same as batch)│  │
│  └──────────┘     └───────────┘     └───────┬───────┘   │
│                                             │           │
│  Windows:                                   ▼           │
│  - Tumbling (every 5m)          ┌───────────────────┐   │
│  - Sliding (5m window, 1m step) │ Result Store      │   │
│  - Session (gap-based)          │ + Anomaly Detector│   │
│                                 └───────────────────┘   │
│                                                         │
│  Checks supported in streaming:                         │
│  - schema (JSON Schema validation per message)          │
│  - throughput (messages per window)                     │
│  - latency (event_time vs processing_time)              │
│  - null_rate (per window)                               │
│  - anomaly (on windowed metrics)                        │
│  - custom_python (UDF on each message or window)        │
│                                                         │
│  Not supported in streaming (batch-only):               │
│  - unique (requires full scan)                          │
│  - referential_integrity (requires join)                │
│  - distribution tests (requires full sample)            │
└─────────────────────────────────────────────────────────┘

Data Model

-- Minimal schema. SQLite for local, PostgreSQL for server mode.

-- Registered sources
CREATE TABLE provero_source (
    id          TEXT PRIMARY KEY,    -- "warehouse", "lake", etc.
    type        TEXT NOT NULL,       -- "postgres", "snowflake", etc.
    config      TEXT NOT NULL,       -- JSON (connection params, no secrets)
    created_at  TEXT NOT NULL,
    updated_at  TEXT NOT NULL
);

-- Check suites
CREATE TABLE provero_suite (
    id          TEXT PRIMARY KEY,
    name        TEXT UNIQUE NOT NULL,
    source_id   TEXT REFERENCES provero_source(id),
    definition  TEXT NOT NULL,       -- JSON (compiled AQL)
    tags        TEXT,                -- JSON array
    created_at  TEXT NOT NULL,
    updated_at  TEXT NOT NULL
);

-- Suite runs
CREATE TABLE provero_run (
    id          TEXT PRIMARY KEY,
    suite_id    TEXT REFERENCES provero_suite(id),
    status      TEXT NOT NULL,       -- running, passed, failed, error
    trigger     TEXT NOT NULL,       -- schedule, manual, api, airflow

    total       INTEGER NOT NULL DEFAULT 0,
    passed      INTEGER NOT NULL DEFAULT 0,
    failed      INTEGER NOT NULL DEFAULT 0,
    warned      INTEGER NOT NULL DEFAULT 0,
    errored     INTEGER NOT NULL DEFAULT 0,

    quality_score REAL,              -- 0-100

    started_at  TEXT NOT NULL,
    completed_at TEXT,
    duration_ms INTEGER
);

-- Individual check results
CREATE TABLE provero_check_result (
    id              TEXT PRIMARY KEY,
    run_id          TEXT REFERENCES provero_run(id),
    check_name      TEXT NOT NULL,
    check_type      TEXT NOT NULL,

    status          TEXT NOT NULL,    -- pass, fail, warn, error, skip
    severity        TEXT NOT NULL,    -- info, warning, critical, blocker

    source_table    TEXT,
    source_column   TEXT,

    observed_value  TEXT,             -- JSON
    expected_value  TEXT,             -- JSON

    row_count       INTEGER,
    failing_rows    INTEGER,
    failing_sample  TEXT,             -- JSON (sample of bad rows)
    failing_query   TEXT,             -- SQL to reproduce

    duration_ms     INTEGER,

    INDEX idx_run (run_id),
    INDEX idx_check_type (check_type),
    INDEX idx_status (status)
);

-- Historical metrics (for anomaly detection)
CREATE TABLE provero_metric (
    id          TEXT PRIMARY KEY,
    suite_id    TEXT REFERENCES provero_suite(id),
    check_name  TEXT NOT NULL,
    metric_name TEXT NOT NULL,        -- "row_count", "null_rate", "mean", etc.
    value       REAL NOT NULL,
    recorded_at TEXT NOT NULL,

    INDEX idx_metric_lookup (suite_id, check_name, metric_name, recorded_at)
);

-- Data contracts
CREATE TABLE provero_contract (
    id          TEXT PRIMARY KEY,
    name        TEXT UNIQUE NOT NULL,
    version     TEXT NOT NULL,        -- semver
    owner       TEXT,
    source_id   TEXT REFERENCES provero_source(id),
    definition  TEXT NOT NULL,        -- JSON (schema + SLAs + checks)
    status      TEXT DEFAULT 'active', -- active, deprecated, violated
    created_at  TEXT NOT NULL,
    updated_at  TEXT NOT NULL
);

-- Drift events
CREATE TABLE provero_drift_event (
    id          TEXT PRIMARY KEY,
    suite_id    TEXT REFERENCES provero_suite(id),
    column_name TEXT,
    drift_type  TEXT NOT NULL,        -- data, schema, volume
    method      TEXT NOT NULL,        -- psi, wasserstein, ks_test
    score       REAL NOT NULL,
    threshold   REAL NOT NULL,
    is_anomaly  BOOLEAN NOT NULL,
    detected_at TEXT NOT NULL
);

CLI Design

# ── Init ──
provero init                          # creates provero.yaml template
provero init --from-source postgres://...  # auto-generates checks
                                         # based on the data profile

# ── Run checks ──
provero run                           # runs every check in provero.yaml
provero run --suite transactions      # runs a specific suite
provero run --tag critical            # runs checks with a tag
provero run --source warehouse        # runs checks for one source

# ── Output ──
provero run --format json             # JSON result
provero run --format table            # table result (default)
provero run --report html             # generate HTML report

# Example output (table format):
# ┌─────────────────────┬──────────┬──────────┬───────────┬──────────┐
# │ Check               │ Column   │ Status   │ Observed  │ Expected │
# ├─────────────────────┼──────────┼──────────┼───────────┼──────────┤
# │ not_null            │ order_id │ ✓ PASS   │ 0 nulls   │ 0 nulls  │
# │ not_null            │ amount   │ ✓ PASS   │ 0 nulls   │ 0 nulls  │
# │ unique              │ order_id │ ✓ PASS   │ 0 dupes   │ 0 dupes  │
# │ range               │ amount   │ ✗ FAIL   │ min=-5.00 │ min=0.01 │
# │ freshness           │ created  │ ✓ PASS   │ 23m ago   │ < 2h     │
# │ row_count           │ -        │ ✓ PASS   │ 48,291    │ > 10,000 │
# │ anomaly(row_count)  │ -        │ ⚠ WARN   │ -18% vs 7d│ < ±25%   │
# └─────────────────────┴──────────┴──────────┴───────────┴──────────┘
#
# Suite: transactions_daily
# Score: 85/100 | 5 passed, 1 failed, 1 warning | 1.2s
#
# BLOCKER: range check failed on 'amount' (3 rows with negative values)
# Query to inspect: SELECT * FROM orders WHERE amount < 0.01

# ── Profile ──
provero profile postgres://...        # statistical profile of a source
provero profile --table orders        # profile one table
provero profile --suggest             # suggests checks based on profile

# ── Contracts ──
provero contract validate             # validates data against contract
provero contract publish              # publishes contract to registry
provero contract diff v2.0 v2.1       # diff between contract versions
provero contract breaking-changes     # lists breaking changes

# ── Monitor (continuous) ──
provero watch                         # runs checks continuously
provero watch --interval 5m           # every 5 minutes
provero watch --stream                # streaming mode (Kafka, etc.)

# ── Server mode ──
provero server                        # start API server (FastAPI)
provero server --port 8080

# ── Utilities ──
provero diff source_a source_b        # compare two data sources
provero lineage                       # show lineage (OpenLineage)
provero export openlineage            # export results as OpenLineage events
provero export great-expectations     # export rules as GX expectations
provero export soda                   # export rules as SodaCL

Technology Stack

Language:        Python 3.11+ (core engine, connectors, SDK)
                 Rust (CLI binary, via PyO3, optional, MVP in Python)

SQL Engine:      DuckDB (embedded, for file-based checks and profiling)
                 SQLAlchemy 2.0 (for database connectors)

API:             FastAPI (server mode)
Validation:      Pydantic v2 (config parsing, result models)
Stats:           scipy (statistical tests)
                 numpy (numerical operations)
                 prophet (optional, for time-series anomaly detection)

Build:           uv (package manager)
                 Hatch (build backend)

Testing:         pytest + hypothesis
Docs:            Sphinx + MyST (Markdown)
CI:              GitHub Actions

Packaging:
  provero-core              # engine + CLI + basic connectors (Postgres, DuckDB, files)
  provero-airflow           # Airflow provider
  provero-flyte             # Flyte plugin
  provero-snowflake         # Snowflake connector
  provero-bigquery          # BigQuery connector
  provero-redshift          # Redshift connector
  provero-kafka             # Kafka streaming connector
  provero-server            # REST API server mode

Package Structure

provero/
├── provero-core/
│   └── src/provero/
│       ├── __init__.py
│       ├── core/
│       │   ├── compiler.py          # AQL YAML → execution plan
│       │   ├── engine.py            # Check execution engine
│       │   ├── results.py           # CheckResult, SuiteResult models
│       │   ├── profiler.py          # Data profiling
│       │   └── optimizer.py         # SQL batching, parallelization
│       ├── checks/
│       │   ├── completeness.py      # not_null, completeness
│       │   ├── uniqueness.py        # unique, unique_combination
│       │   ├── validity.py          # accepted_values, range, regex
│       │   ├── freshness.py         # freshness, latency
│       │   ├── volume.py            # row_count, row_count_change, throughput
│       │   ├── consistency.py       # referential_integrity, cross_source
│       │   ├── distribution.py      # ks_test, chi_squared, psi
│       │   ├── custom.py            # custom_sql, custom_python
│       │   └── registry.py          # check type registry (pluggable)
│       ├── anomaly/
│       │   ├── detector.py          # Anomaly detection orchestrator
│       │   ├── methods/
│       │   │   ├── zscore.py
│       │   │   ├── mad.py           # Median Absolute Deviation
│       │   │   ├── iqr.py           # Interquartile Range
│       │   │   ├── prophet.py       # Facebook Prophet (optional)
│       │   │   └── dbscan.py        # Density-based (optional)
│       │   └── sensitivity.py       # low/medium/high presets
│       ├── connectors/
│       │   ├── base.py              # Connector protocol
│       │   ├── sql.py               # Generic SQL (SQLAlchemy)
│       │   ├── postgres.py
│       │   ├── duckdb.py            # Embedded (files, Parquet, CSV)
│       │   ├── dataframe.py         # Pandas, Polars
│       │   └── registry.py          # Connector registry
│       ├── contracts/
│       │   ├── models.py            # Contract data models
│       │   ├── registry.py          # Contract storage/retrieval
│       │   ├── validator.py         # Validate data against contract
│       │   └── diff.py              # Contract version diffing
│       ├── streaming/
│       │   ├── engine.py            # Streaming check engine
│       │   ├── window.py            # Windowing logic
│       │   └── ingester.py          # Message ingestion
│       ├── store/
│       │   ├── sqlite.py            # Local result store
│       │   ├── postgres.py          # Server result store
│       │   └── models.py            # SQLAlchemy models
│       ├── reporting/
│       │   ├── html.py              # HTML report generator
│       │   ├── json.py              # JSON output
│       │   ├── table.py             # CLI table output
│       │   └── templates/           # Jinja2 HTML templates
│       ├── actions/
│       │   ├── alert.py             # Slack, PagerDuty, webhook, email
│       │   ├── block.py             # Block downstream pipelines
│       │   ├── quarantine.py        # Move bad data to quarantine
│       │   └── trigger.py           # Trigger external pipelines
│       ├── export/
│       │   ├── openlineage.py       # Export as OpenLineage events
│       │   ├── great_expectations.py # Export as GX expectations
│       │   └── soda.py              # Export as SodaCL
│       ├── api/                     # REST API (server mode)
│       │   ├── app.py
│       │   ├── routes/
│       │   │   ├── checks.py
│       │   │   ├── contracts.py
│       │   │   ├── reports.py
│       │   │   └── health.py
│       │   └── auth.py
│       └── cli/
│           ├── main.py              # Click/Typer CLI
│           ├── commands/
│           │   ├── run.py
│           │   ├── profile.py
│           │   ├── contract.py
│           │   ├── watch.py
│           │   ├── server.py
│           │   └── export.py
│           └── output.py            # Rich terminal output
├── provero-airflow/
│   └── src/provero/airflow/
│       ├── operators.py             # ProveroCheckOperator
│       ├── sensors.py               # ProveroSensor, ProveroDriftSensor
│       ├── hooks.py                 # ProveroHook (API client)
│       ├── decorators.py            # @provero_check
│       ├── dag_generator.py         # Auto-gen DAGs from provero.yaml
│       └── provider.yaml
├── provero-flyte/
│   └── src/provero/flyte/
│       ├── plugin.py
│       └── decorators.py
├── provero-snowflake/
│   └── src/provero/connectors/
│       └── snowflake.py
├── provero-bigquery/
│   └── src/provero/connectors/
│       └── bigquery.py
├── provero-kafka/
│   └── src/provero/connectors/
│       └── kafka.py
├── provero-server/
│   └── (installs provero-core + API dependencies)
├── docs/
│   ├── getting-started.md
│   ├── aql-spec.md                  # AQL language specification
│   ├── connectors/
│   ├── airflow-integration.md
│   └── migration-from-gx.md         # Guide for migrating from Great Expectations
├── examples/
│   ├── quickstart/                  # 3-line provero.yaml
│   ├── ecommerce/                   # Full e-commerce pipeline
│   ├── iot-sensors/                 # IoT/industrial
│   ├── streaming/                   # Kafka streaming checks
│   └── data-contracts/              # Contract-first workflow
├── aql-spec/                        # AQL spec (separate, for adoption by others)
│   ├── spec.md
│   ├── schema.json                  # JSON Schema for provero.yaml
│   └── examples/
├── pyproject.toml                   # uv workspace root
└── CONTRIBUTING.md

What Makes Provero Different

┌────────────────────┬──────────┬───────────┬─────────┬──────────┐
│                    │  Provero │  Great Ex │  Soda   │  Pandera │
├────────────────────┼──────────┼───────────┼─────────┼──────────┤
│ Lines for 1 check  │ 3        │ 50+       │ 5       │ 10       │
│ Anomaly detection  │ Built-in │ No        │ Paid    │ No       │
│ Data contracts     │ Built-in │ No        │ Yes     │ No       │
│ Streaming          │ Yes      │ No        │ No      │ No       │
│ Airflow provider   │ Yes      │ Community │ Partial │ No       │
│ Flyte plugin       │ Yes      │ No        │ No      │ Yes*     │
│ CLI                │ Yes      │ Partial   │ Yes     │ No       │
│ SQL batching       │ Yes      │ No        │ No      │ N/A      │
│ Rule portability   │ AQL std  │ No        │ No      │ No       │
│ Governance         │ Apache   │ VC-backed │ VC-back │ Union.ai │
│ Self-hosted cost   │ Free     │ Free*     │ Free*   │ Free     │
│ Anomaly (self-host)│ Free     │ No        │ Paid    │ No       │
└────────────────────┴──────────┴───────────┴─────────┴──────────┘

* Free with significant engineering effort

MVP Scope (Phase 1)

What ships for v1.0:

Must have (MVP):
  ✓ provero.yaml parsing (AQL core subset)
  ✓ Check engine with 8 core check types:
    - not_null, unique, accepted_values, range
    - freshness, row_count, custom_sql, completeness
  ✓ 3 connectors: PostgreSQL, DuckDB (files), Pandas DataFrame
  ✓ CLI: provero init, provero run, provero profile
  ✓ Result store: SQLite
  ✓ Table + JSON output
  ✓ Airflow provider: ProveroCheckOperator (basic)
  ✓ One complete example: e-commerce pipeline

Phase 2:
  - Anomaly detection (Z-Score, MAD)
  - HTML reports
  - Data contracts
  - Snowflake, BigQuery connectors
  - provero profile --suggest
  - SQL batching optimizer

Phase 3:
  - Streaming engine (Kafka)
  - Server mode (REST API)
  - Prophet anomaly detection
  - Flyte plugin
  - Export to GX/Soda formats
  - AQL spec v1.0 formal publication

Phase 4:
  - Advanced anomaly (DBSCAN, multivariate)
  - Quarantine actions
  - OpenLineage integration
  - Rust CLI
  - UI dashboard (optional, community-driven)

Anti-Griffin: Community Health Plan

Lessons from Griffin's failure, built into the project DNA:

1. Multi-company from day 0
   - Seek committers from at least 3 organizations before proposing
   - No single company holds >40% of committers

2. Release early, release often
   - Monthly releases
   - No multi-year rewrites. Incremental improvements only.

3. Onboarding pipeline
   - "good first issue" labels always available
   - Contributor guide with video walkthrough
   - Monthly contributor office hours

4. Communication cadence
   - Weekly dev sync (async, on mailing list)
   - Monthly community call (video)
   - Board reports always on time

5. User community
   - Discord/Slack for real-time help
   - GitHub Discussions for decisions
   - Blog post for every release

6. Meritocratic path
   - Clear criteria for contributor → committer → PMC
   - Actively invite new committers (target: 2+ new per quarter)