.msh File Format Reference

The .msh file is the core unit of work in msh. It defines the complete lifecycle of a data asset: ingestion, transformation, and publication.

File Structure

A .msh file is written in YAML and consists of several top-level blocks:

# Top-level metadata
name: asset_name
deployment: blue_green
execution: incremental

# Ingestion
source:
  type: dlt
  source_name: stripe
  resource: customers

# Transformation
sql: |
  SELECT * FROM source('stripe', 'customers')

# Optional: Python transformation
python: |
  def transform(df):
      return df

# Optional: Data quality
tests:
  - unique: id
  - not_null: email

# Optional: Incremental strategy
incremental:
  strategy: merge
  primary_key: id
  write_disposition: merge

# Optional: Publication
publish:
  to: salesforce
  object: Contact

Top-Level Fields

name (optional)

Type: string
Default: Filename without .msh extension

The name of the asset. This becomes the table/view name in your destination.

name: customers_clean

deployment (optional)

Type: string
Default: blue_green
Options: blue_green, direct

Deployment strategy:

  • blue_green: Deploy to a staging schema first, then swap (recommended)
  • direct: Deploy directly to production (use with caution)

deployment: blue_green

execution (optional)

Type: string
Default: full_refresh
Options: full_refresh, incremental

Execution mode:

  • full_refresh: Drop and recreate the table on every run
  • incremental: Append or merge new data only

execution: incremental

layer (optional)

Type: string
Default: Auto-detected from directory path

Specifies the layer for inheriting defaults from msh.yaml. If not specified, msh auto-detects from the directory path:

  • models/staging/ → staging
  • models/marts/ → marts
  • models/intermediate/ → intermediate

layer: staging

Benefits:

  • Inherits default configs (write_disposition, primary_key, test_suites) from msh.yaml
  • Consistent patterns across layers
  • Override defaults when needed

See msh.yaml Configuration Reference for more details.


Ingestion Block: source

The source block defines where data comes from. msh supports two approaches:

  1. Source References: Reference a source defined in msh.yaml (recommended for DRY)
  2. Direct Credentials: Define credentials directly in the .msh file

Both approaches work and can be mixed in the same project.

Source References

Reference a source defined in msh.yaml:

# msh.yaml
sources:
  - name: prod_db
    type: sql_database
    credentials: "${DB_PROD_CREDENTIALS}"
    schema: public
    tables:
      - name: orders

# models/stg_orders.msh
ingest:
  source: prod_db
  table: orders

For SQL Database Sources:

ingest:
  source: prod_db    # Source name from msh.yaml
  table: orders      # Table name (or schema.table)

For REST API Sources:

ingest:
  source: stripe_api     # Source name from msh.yaml
  resource: customers    # Resource name from msh.yaml

Benefits:

  • Define credentials once in msh.yaml
  • Change credentials in one place, update everywhere
  • Clear source catalog
  • Environment variable support

See msh.yaml Configuration Reference for complete source definition documentation.

Direct Credentials

Define credentials directly in the .msh file:

REST API Source

source:
  type: dlt
  source_name: rest_api
  endpoint: https://api.example.com/users
  credentials:
    api_key: ${API_KEY}
  pagination:
    type: offset
    limit: 100

Fields:

  • type: Always dlt for ingestion
  • source_name: rest_api for REST APIs
  • endpoint: The API endpoint URL
  • credentials: Authentication (use environment variables)
  • pagination (optional): Pagination strategy

SQL Database Source

Using Direct Credentials:

source:
  type: dlt
  source_name: sql_database
  credentials: ${SOURCE_DB_URI}
  table: users
  schema: public

Using Source Reference (Recommended):

# msh.yaml
sources:
  - name: prod_db
    type: sql_database
    credentials: "${DB_PROD_CREDENTIALS}"
    schema: public
    tables:
      - name: users

# models/stg_users.msh
ingest:
  source: prod_db
  table: users

Fields (Direct Credentials):

  • source_name: sql_database for DB-to-DB replication
  • credentials: Connection string (use environment variables)
  • table: Source table name
  • schema (optional): Source schema name

Fields (Source Reference):

  • source: Source name from msh.yaml
  • table: Table name (can be schema.table format)

dlt Verified Source

For sources with native dlt support (Stripe, Salesforce, etc.):

source:
  type: dlt
  source_name: stripe
  resource: customers
  credentials:
    api_key: ${STRIPE_API_KEY}

Fields:

  • source_name: Name of the dlt verified source (e.g., stripe, salesforce, github)
  • resource: The specific resource to load (e.g., customers, invoices)
  • credentials: Source-specific authentication

Supported Sources: See dlt verified sources


GraphQL Source

source:
  type: dlt
  source_name: graphql
  endpoint: https://api.example.com/graphql
  query: |
    query {
      users {
        id
        name
        email
      }
    }
  credentials:
    token: ${GRAPHQL_TOKEN}

Transformation Block: sql

The sql block defines the transformation logic using SQL.

sql: |
  SELECT
    id,
    name,
    email,
    UPPER(status) as status_upper,
    created_at
  FROM source('stripe', 'customers')
  WHERE status = 'active'

Special Functions:

  • source('source_name', 'resource'): References the ingested data
  • {{ ref('model_name') }}: References another model (see the sketch after this list)
  • {{ var("variable_name") }}: References a variable from msh.yaml
  • {{ macro_name('arg1', 'arg2') }}: Calls a SQL macro
  • Standard SQL functions are supported (depends on your destination dialect)
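
For example, a downstream model can combine two upstream models via ref(). A minimal sketch (the model names stg_orders and stg_customers and their columns are hypothetical):

sql: |
  SELECT
    o.id,
    o.amount,
    c.email
  FROM {{ ref('stg_orders') }} AS o
  JOIN {{ ref('stg_customers') }} AS c
    ON o.customer_id = c.id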

Using Variables in SQL

Variables defined in msh.yaml can be used in SQL:

# msh.yaml
vars:
  active_status: "active"
  start_date: "${START_DATE}"

# models/stg_orders.msh
sql: |
  SELECT * FROM {{ source }}
  WHERE status = '{{ var("active_status") }}'
    AND created_at > '{{ var("start_date") }}'

Using SQL Macros

Macros defined in macros/ directory can be used in SQL:

-- macros/common.sql
{% macro clean_string(column_name) %}
  TRIM(UPPER({{ column_name }}))
{% endmacro %}

# models/stg_orders.msh
sql: |
  SELECT
    id,
    {{ clean_string('customer_name') }} as customer_name
  FROM {{ source }}

Python Transformation Block: python

The python block allows you to transform data using Python (Polars or Pandas).

python: |
  import polars as pl

  def transform(df: pl.DataFrame) -> pl.DataFrame:
      # Mask PII
      return df.with_columns(
          pl.col("email").map_elements(lambda x: "*****" + x[-5:])
      )

Requirements:

  • Must define a transform function
  • Function must accept a DataFrame and return a DataFrame
  • Supports both Polars (default) and Pandas (a Pandas sketch follows this list)
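
For Pandas, the masking transform from the Polars example above might look like this (a minimal sketch, assuming msh passes a pandas.DataFrame when Pandas is used):

python: |
  import pandas as pd

  def transform(df: pd.DataFrame) -> pd.DataFrame:
      # Mask PII: keep only the last 5 characters of each email
      df["email"] = df["email"].map(lambda x: "*****" + x[-5:])
      return df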

Execution Order:

  1. Data is ingested
  2. Python transform is applied
  3. SQL is applied to the transformed data (if both are present; see the sketch below)
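
When both blocks are present, the SQL step reads the Python output. A minimal sketch reusing the {{ python_output }} reference shown in Example 3 below (the status and email columns are hypothetical):

python: |
  import polars as pl

  def transform(df: pl.DataFrame) -> pl.DataFrame:
      # Keep only active rows before the SQL step runs
      return df.filter(pl.col("status") == "active")

sql: |
  SELECT id, email FROM {{ python_output }}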

Data Quality: tests and test_suites

Define data quality tests that run before deployment. You can use individual tests or reference test suites defined in msh.yaml.

Individual Tests

tests:
  - unique: id
  - not_null: [id, email]
  - accepted_values:
      column: status
      values: ['active', 'inactive', 'pending']
  - relationships:
      column: customer_id
      to: ref('customers')
      field: id
  - assert: "amount > 0"

Available Tests:

  • unique: Column must have unique values
  • not_null: Column cannot be null
  • accepted_values: Column must be in a list of values
  • relationships: Foreign key constraint
  • assert: Custom SQL assertion

Test Suites

Reference test suites defined in msh.yaml:

# msh.yaml
test_suites:
  staging:
    - assert: "id IS NOT NULL"
    - assert: "created_at IS NOT NULL"
    - unique: id
  financial:
    - assert: "amount > 0"

# models/staging/stg_orders.msh
test_suites:
  - staging
  - financial
# Automatically expands to all tests from both suites

Combining Tests and Test Suites

You can combine test suites with individual tests:

test_suites:
  - staging
  - financial
tests:
  - assert: "order_date >= '2024-01-01'"  # Additional test
# All tests are merged together

If any test fails, the deployment is aborted and the Blue schema remains unchanged.

Test Results Capture

Test results are automatically captured after each pipeline run:

  • Storage: Results saved to .msh/test_results.json
  • Catalog Integration: Test results included in msh_catalog.json
  • API Access: Available via /api/quality.json endpoint when UI is running
  • Dashboard: View test results in the msh UI dashboard

Test Results Structure:

{
  "orders": {
    "asset_name": "orders",
    "timestamp": "2024-01-15T10:30:00Z",
    "tests": [
      {
        "name": "test_orders_id_unique",
        "status": "pass",
        "execution_time": 0.5
      }
    ],
    "summary": {
      "total": 5,
      "passed": 4,
      "failed": 1
    }
  }
}
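
As an illustration, the captured results can be inspected with a short Python script (a minimal sketch that assumes only the .msh/test_results.json structure shown above):

import json
from pathlib import Path

# Load the results written after the last pipeline run
results = json.loads(Path(".msh/test_results.json").read_text())

# One-line summary per asset, plus any failing tests
for asset, run in results.items():
    summary = run["summary"]
    print(f"{asset}: {summary['passed']}/{summary['total']} tests passed")
    for test in run["tests"]:
        if test["status"] != "pass":
            print(f"  FAILED: {test['name']}")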

See Using the Dashboard for more details.


Schema Contracts: contract

Define schema expectations and evolution rules that are validated before data ingestion.

contract:
  evolution: evolve          # Schema evolution mode
  enforce_types: true        # Enforce type consistency
  required_columns:          # Columns that must exist
    - id
    - name
    - email
  allow_new_columns: true    # Allow new columns (default: true)

Fields:

  • evolution (optional): "evolve" (default) or "freeze"
    • "evolve": Allows new columns to be added automatically as the source schema changes
    • "freeze": Prevents new columns from being added (uses dlt's schema_evolution="freeze")
  • enforce_types (optional): true or false (default: false)
    • true: Validates that data types match expectations
    • false: Allows type flexibility
  • required_columns (optional): List of column names
    • Columns that must exist in the source data
    • Pipeline fails immediately if any required column is missing
    • Empty list or omitted means no columns are required
  • allow_new_columns (optional): true or false (default: true)
    • true: Allows columns not in required_columns list
    • false: Only allows columns specified in required_columns (when evolution: freeze)

Contract Validation

Contracts are validated before ingestion (Pre-Flight Contracts):

  1. Required Columns: Checks that all required_columns exist in source data
  2. Schema Evolution: If evolution: freeze, prevents new columns
  3. Type Enforcement: If enforce_types: true, validates data types

Failure Example:

contract:
  evolution: freeze
  required_columns: [id, name, email]

# If source is missing 'email' column:
# Error: Contract Failed: Missing required columns: ['email']
# Pipeline aborts before any data is processed

Example: Flexible Schema (Default)

name: api_data
ingest:
  type: rest_api
  endpoint: https://api.example.com/data

contract:
  evolution: evolve        # Allow new columns
  enforce_types: false     # Flexible types
  required_columns: [id]   # Only id is required

transform: |
  SELECT * FROM {{ source }}

Example: Strict Schema

name: stable_table
ingest:
  type: sql_database
  credentials: ${DB_URI}
  table: customers

contract:
  evolution: freeze          # Prevent new columns
  enforce_types: true        # Strict type checking
  required_columns:          # All these must exist
    - id
    - name
    - email
    - created_at
  allow_new_columns: false   # Only required columns allowed

transform: |
  SELECT id, name, email, created_at FROM {{ source }}

Relationship to Schema Evolution

  • evolution: evolve: Source can add new columns, msh adapts automatically
  • evolution: freeze: Source schema must remain exactly as specified
  • allow_new_columns: false: Combined with freeze, creates strict schema enforcement

See Atomic Resilience for more details.


Incremental Strategy: incremental

Configure incremental loading to avoid full table scans.

incremental:
  strategy: merge
  primary_key: id
  write_disposition: merge
  incremental_key: updated_at

Fields:

  • strategy: merge or append
  • primary_key: Column(s) to use for deduplication
  • write_disposition: merge (upsert) or append (insert only)
  • incremental_key (optional): Column to track changes (e.g., updated_at)

Merge Strategy

incremental:
  strategy: merge
  primary_key: id
  write_disposition: merge

Updates existing rows and inserts new ones based on primary_key.
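
As a rough SQL analogy (illustrative only; msh delegates the actual write to the loader, and the table and column names here are hypothetical), a merge behaves like an upsert keyed on primary_key:

-- Illustrative upsert, not what msh executes internally
MERGE INTO analytics.customers AS target
USING new_batch AS batch
  ON target.id = batch.id                             -- primary_key: id
WHEN MATCHED THEN
  UPDATE SET email = batch.email, name = batch.name   -- existing rows are updated
WHEN NOT MATCHED THEN
  INSERT (id, email, name)                            -- new rows are inserted
  VALUES (batch.id, batch.email, batch.name);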

Append Strategy

incremental:
  strategy: append
  write_disposition: append
  # Use 'cursor_field' for dlt sources to identify new data
  cursor_field: updated_at

Only inserts new rows, never updates.


Publication Block: publish

Define how to publish transformed data to downstream systems (Reverse ETL).

publish:
  to: salesforce
  object: Contact
  mapping:
    email: Email
    name: Name
  credentials: ${SALESFORCE_CREDENTIALS}

Fields:

  • to: Destination system (salesforce, hubspot, postgres, filesystem)
  • object: Target object/table name
  • mapping: Field mapping (source → destination)
  • credentials: Authentication

Expose Block: expose (Optional)

Define how this asset should be exposed to downstream consumers, such as dashboards or catalogs.

expose:
  - type: dashboard
    name: revenue_dashboard
    url: https://mshdata.io/docs

Fields:

  • type: Type of exposure (e.g., dashboard, notebook, app)
  • name: Human-readable name
  • url: Link to the external resource

Complete Examples

Example 1: Simple API to SQL

name: github_issues
source:
  type: dlt
  source_name: rest_api
  endpoint: https://api.github.com/repos/owner/repo/issues
  credentials:
    token: ${GITHUB_TOKEN}

sql: |
  SELECT
    id,
    title,
    state,
    created_at
  FROM source('rest_api', 'issues')
  WHERE state = 'open'

Example 2: Incremental with Tests

name: stripe_customers
execution: incremental

source:
  type: dlt
  source_name: stripe
  resource: customers
  credentials:
    api_key: ${STRIPE_API_KEY}

sql: |
  SELECT
    id,
    email,
    name,
    created
  FROM source('stripe', 'customers')

incremental:
  strategy: merge
  primary_key: id
  write_disposition: merge
  incremental_key: created

tests:
  - unique: id
  - not_null: [id, email]

Example 3: Polyglot with PII Masking

name: users_masked

source:
  type: dlt
  source_name: sql_database
  credentials: ${SOURCE_DB}
  table: users

python: |
  import polars as pl

  def transform(df: pl.DataFrame) -> pl.DataFrame:
      return df.with_columns([
          pl.col("email").map_elements(lambda x: "*****" + x[-5:]),
          pl.col("phone").map_elements(lambda x: "***-***-" + x[-4:])
      ])

sql: |
  SELECT
    id,
    email,
    phone,
    created_at
  FROM {{ python_output }}

Example 4: Full Pipeline with Reverse ETL

name: active_leads
execution: incremental

source:
  type: dlt
  source_name: salesforce
  resource: Lead
  credentials:
    username: ${SALESFORCE_USERNAME}
    password: ${SALESFORCE_PASSWORD}
    security_token: ${SALESFORCE_TOKEN}

sql: |
  SELECT
    Id,
    Email,
    Company,
    Status
  FROM source('salesforce', 'Lead')
  WHERE Status = 'Qualified'

incremental:
  strategy: merge
  primary_key: Id
  write_disposition: merge

tests:
  - unique: Id
  - not_null: Email

publish:
  to: hubspot
  object: Contact
  mapping:
    Email: email
    Company: company

Best Practices

  1. Use Environment Variables: Never hardcode credentials (see the sketch after this list)
  2. Name Explicitly: Set name to avoid filename-based naming
  3. Test Everything: Add tests for critical columns
  4. Incremental When Possible: Use incremental for large datasets
  5. Document Complex Logic: Add comments to your SQL
  6. Version Control: Commit .msh files, not .env
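
To illustrate points 1 and 6, a minimal sketch of keeping secrets out of version control (the file names and the key value are hypothetical):

# .env  (git-ignored, never committed)
STRIPE_API_KEY=sk_live_replace_me

# .gitignore
.env

# models/stripe_customers.msh  (committed; references the variable only)
source:
  type: dlt
  source_name: stripe
  resource: customers
  credentials:
    api_key: ${STRIPE_API_KEY}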