.msh File Format Reference
The .msh file is the core unit of work in msh. It defines the complete lifecycle of a data asset: ingestion, transformation, and publication.
File Structure
A .msh file is written in YAML and consists of several top-level blocks:
# Top-level metadata
name: asset_name
deployment: blue_green
execution: incremental

# Ingestion
source:
  type: dlt
  source_name: stripe
  resource: customers

# Transformation
sql: |
  SELECT * FROM source('stripe', 'customers')

# Optional: Python transformation
python: |
  def transform(df):
      return df

# Optional: Data quality
tests:
  - unique: id
  - not_null: email

# Optional: Incremental strategy
incremental:
  strategy: merge
  primary_key: id
  write_disposition: merge

# Optional: Publication
publish:
  to: salesforce
  object: Contact
Top-Level Fields
name (optional)
Type: string
Default: Filename without .msh extension
The name of the asset. This becomes the table/view name in your destination.
name: customers_clean
deployment (optional)
Type: string
Default: blue_green
Options: blue_green, direct
Deployment strategy:
- blue_green: Deploy to a staging schema first, then swap (recommended)
- direct: Deploy directly to production (use with caution)
deployment: blue_green
execution (optional)
Type: string
Default: full_refresh
Options: full_refresh, incremental
Execution mode:
- full_refresh: Drop and recreate the table on every run
- incremental: Append or merge new data only
execution: incremental
layer (optional)
Type: string
Default: Auto-detected from directory path
Specifies the layer for inheriting defaults from msh.yaml. If not specified, msh auto-detects from the directory path:
- models/staging/ → staging
- models/marts/ → marts
- models/intermediate/ → intermediate
layer: staging
Benefits:
- Inherits default configs (write_disposition, primary_key, test_suites) from msh.yaml
- Consistent patterns across layers
- Override defaults when needed (see the sketch below)
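For example, a model that lives outside the auto-detected directory layout can still opt into a layer's defaults and add its own overrides. This is a minimal illustrative sketch: the file path, asset name, and test column are hypothetical, and the exact defaults that get inherited depend on your msh.yaml.
# models/custom/stg_payments.msh   (hypothetical path outside the standard layout)
name: stg_payments
layer: staging        # inherit the staging layer's defaults from msh.yaml

tests:
  - not_null: payment_id   # local test, merged with any inherited test suites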
See msh.yaml Configuration Reference for more details.
Ingestion Block: source
The source block defines where data comes from. msh supports two approaches:
- Source References: Reference a source defined in msh.yaml (recommended for DRY)
- Direct Credentials: Define credentials directly in the .msh file
Both approaches work and can be mixed in the same project.
Source References
Reference a source defined in msh.yaml:
# msh.yaml
sources:
  - name: prod_db
    type: sql_database
    credentials: "${DB_PROD_CREDENTIALS}"
    schema: public
    tables:
      - name: orders

# models/stg_orders.msh
ingest:
  source: prod_db
  table: orders
For SQL Database Sources:
ingest:
  source: prod_db   # Source name from msh.yaml
  table: orders     # Table name (or schema.table)
For REST API Sources:
ingest:
  source: stripe_api    # Source name from msh.yaml
  resource: customers   # Resource name from msh.yaml
Benefits:
- Define credentials once in msh.yaml
- Change credentials in one place, update everywhere
- Clear source catalog
- Environment variable support
See msh.yaml Configuration Reference for complete source definition documentation.
Direct Credentials
Define credentials directly in the .msh file:
REST API Source
source:
  type: dlt
  source_name: rest_api
  endpoint: https://api.example.com/users
  credentials:
    api_key: ${API_KEY}
  pagination:
    type: offset
    limit: 100
Fields:
- type: Always dlt for ingestion
- source_name: rest_api for REST APIs
- endpoint: The API endpoint URL
- credentials: Authentication (use environment variables)
- pagination (optional): Pagination strategy
SQL Database Source
Using Direct Credentials:
source:
  type: dlt
  source_name: sql_database
  credentials: ${SOURCE_DB_URI}
  table: users
  schema: public
Using Source Reference (Recommended):
# msh.yaml
sources:
  - name: prod_db
    type: sql_database
    credentials: "${DB_PROD_CREDENTIALS}"
    schema: public
    tables:
      - name: users

# models/stg_users.msh
ingest:
  source: prod_db
  table: users
Fields (Direct Credentials):
- source_name: sql_database for DB-to-DB replication
- credentials: Connection string (use environment variables)
- table: Source table name
- schema (optional): Source schema name
Fields (Source Reference):
- source: Source name from msh.yaml
- table: Table name (can be schema.table format)
dlt Verified Source
For sources with native dlt support (Stripe, Salesforce, etc.):
source:
  type: dlt
  source_name: stripe
  resource: customers
  credentials:
    api_key: ${STRIPE_API_KEY}
Fields:
- source_name: Name of the dlt verified source (e.g., stripe, salesforce, github)
- resource: The specific resource to load (e.g., customers, invoices)
- credentials: Source-specific authentication
Supported Sources: See dlt verified sources
GraphQL Source
source:
  type: dlt
  source_name: graphql
  endpoint: https://api.example.com/graphql
  query: |
    query {
      users {
        id
        name
        email
      }
    }
  credentials:
    token: ${GRAPHQL_TOKEN}
Transformation Block: sql
The sql block defines the transformation logic using SQL.
sql: |
  SELECT
    id,
    name,
    email,
    UPPER(status) as status_upper,
    created_at
  FROM source('stripe', 'customers')
  WHERE status = 'active'
Special Functions:
- source('source_name', 'resource'): References the ingested data
- {{ ref('model_name') }}: References another model
- {{ var("variable_name") }}: References a variable from msh.yaml
- {{ macro_name('arg1', 'arg2') }}: Calls a SQL macro
- Standard SQL functions are supported (depends on your destination dialect)
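To illustrate ref(), the sketch below joins the ingested data against another model in the same project. The model name orders_clean and the joined column names are hypothetical; adapt them to your own models.
sql: |
  SELECT
    c.id,
    c.email,
    o.total_amount                             -- hypothetical column from the referenced model
  FROM source('stripe', 'customers') AS c
  LEFT JOIN {{ ref('orders_clean') }} AS o     -- hypothetical model name
    ON o.customer_id = c.id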
Using Variables in SQL
Variables defined in msh.yaml can be used in SQL:
# msh.yaml
vars:
  active_status: "active"
  start_date: "${START_DATE}"

# models/stg_orders.msh
sql: |
  SELECT * FROM {{ source }}
  WHERE status = '{{ var("active_status") }}'
    AND created_at > '{{ var("start_date") }}'
Using SQL Macros
Macros defined in macros/ directory can be used in SQL:
-- macros/common.sql
{% macro clean_string(column_name) %}
  TRIM(UPPER({{ column_name }}))
{% endmacro %}

# models/stg_orders.msh
sql: |
  SELECT
    id,
    {{ clean_string('customer_name') }} as customer_name
  FROM {{ source }}
Python Transformation Block: python
The python block allows you to transform data using Python (Polars or Pandas).
python: |
  import polars as pl

  def transform(df: pl.DataFrame) -> pl.DataFrame:
      # Mask PII
      return df.with_columns(
          pl.col("email").map_elements(lambda x: "*****" + x[-5:])
      )
Requirements:
- Must define a transform function
- The function must accept a DataFrame and return a DataFrame
- Supports both Polars (default) and Pandas (see the Pandas sketch below)
Execution Order:
1. Data is ingested
2. The Python transform is applied
3. SQL is applied to the transformed data (if both are present)
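Since Pandas is also supported, the same PII-masking transform can be written against a pandas DataFrame. This is a minimal illustrative sketch of the Pandas variant; it assumes only the email column used in the Polars example above.
python: |
  import pandas as pd

  def transform(df: pd.DataFrame) -> pd.DataFrame:
      # Mask PII: keep only the last 5 characters of each email
      df = df.copy()
      df["email"] = df["email"].map(lambda x: "*****" + x[-5:])
      return df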
Data Quality: tests and test_suites
Define data quality tests that run before deployment. You can use individual tests or reference test suites defined in msh.yaml.
Individual Tests
tests:
  - unique: id
  - not_null: [id, email]
  - accepted_values:
      column: status
      values: ['active', 'inactive', 'pending']
  - relationships:
      column: customer_id
      to: ref('customers')
      field: id
  - assert: "amount > 0"
Available Tests:
- unique: Column must have unique values
- not_null: Column cannot be null
- accepted_values: Column must be in a list of values
- relationships: Foreign key constraint
- assert: Custom SQL assertion
Test Suites
Reference test suites defined in msh.yaml:
# msh.yaml
test_suites:
  staging:
    - assert: "id IS NOT NULL"
    - assert: "created_at IS NOT NULL"
    - unique: id
  financial:
    - assert: "amount > 0"

# models/staging/stg_orders.msh
test_suites:
  - staging
  - financial
# Automatically expands to all tests from both suites
Combining Tests and Test Suites
You can combine test suites with individual tests:
test_suites:
  - staging
  - financial

tests:
  - assert: "order_date >= '2024-01-01'"   # Additional test

# All tests are merged together
If any test fails, the deployment is aborted and the Blue schema remains unchanged.
Test Results Capture
Test results are automatically captured after each pipeline run:
- Storage: Results saved to .msh/test_results.json
- Catalog Integration: Test results included in msh_catalog.json
- API Access: Available via the /api/quality.json endpoint when the UI is running
- Dashboard: View test results in the msh UI dashboard
Test Results Structure:
{
  "orders": {
    "asset_name": "orders",
    "timestamp": "2024-01-15T10:30:00Z",
    "tests": [
      {
        "name": "test_orders_id_unique",
        "status": "pass",
        "execution_time": 0.5
      }
    ],
    "summary": {
      "total": 5,
      "passed": 4,
      "failed": 1
    }
  }
}
See Using the Dashboard for more details.
Schema Contracts: contract
Define schema expectations and evolution rules that are validated before data ingestion.
contract:
  evolution: evolve        # Schema evolution mode
  enforce_types: true      # Enforce type consistency
  required_columns:        # Columns that must exist
    - id
    - name
    - email
  allow_new_columns: true  # Allow new columns (default: true)
Fields:
- evolution (optional): "evolve" (default) or "freeze"
  - "evolve": Allows new columns to be added automatically as the source schema changes
  - "freeze": Prevents new columns from being added (uses dlt's schema_evolution="freeze")
- enforce_types (optional): true or false (default: false)
  - true: Validates that data types match expectations
  - false: Allows type flexibility
- required_columns (optional): List of column names
  - Columns that must exist in the source data
  - Pipeline fails immediately if any required column is missing
  - Empty list or omitted means no columns are required
- allow_new_columns (optional): true or false (default: true)
  - true: Allows columns not in the required_columns list
  - false: Only allows columns specified in required_columns (when evolution: freeze)
Contract Validation
Contracts are validated before ingestion (Pre-Flight Contracts):
- Required Columns: Checks that all required_columns exist in source data
- Schema Evolution: If evolution: freeze, prevents new columns
- Type Enforcement: If enforce_types: true, validates data types
Failure Example:
contract:
  evolution: freeze
  required_columns: [id, name, email]

# If source is missing 'email' column:
# Error: Contract Failed: Missing required columns: ['email']
# Pipeline aborts before any data is processed
Example: Flexible Schema (Default)
name: api_data

ingest:
  type: rest_api
  endpoint: https://api.example.com/data

contract:
  evolution: evolve        # Allow new columns
  enforce_types: false     # Flexible types
  required_columns: [id]   # Only id is required

transform: |
  SELECT * FROM {{ source }}
Example: Strict Schema
name: stable_table

ingest:
  type: sql_database
  credentials: ${DB_URI}
  table: customers

contract:
  evolution: freeze          # Prevent new columns
  enforce_types: true        # Strict type checking
  required_columns:          # All these must exist
    - id
    - name
    - email
    - created_at
  allow_new_columns: false   # Only required columns allowed

transform: |
  SELECT id, name, email, created_at FROM {{ source }}
Relationship to Schema Evolution
- evolution: evolve: Source can add new columns, msh adapts automatically
- evolution: freeze: Source schema must remain exactly as specified
- allow_new_columns: false: Combined with freeze, creates strict schema enforcement
See Atomic Resilience for more details.
Incremental Strategy: incremental
Configure incremental loading to avoid full table scans.
incremental:
  strategy: merge
  primary_key: id
  write_disposition: merge
  incremental_key: updated_at
Fields:
- strategy: merge or append
- primary_key: Column(s) to use for deduplication
- write_disposition: merge (upsert) or append (insert only)
- incremental_key (optional): Column to track changes (e.g., updated_at)
Merge Strategy
incremental:
  strategy: merge
  primary_key: id
  write_disposition: merge
Updates existing rows and inserts new ones based on primary_key.
Append Strategy
incremental:
  strategy: append
  write_disposition: append
  # Use 'cursor_field' for dlt sources to identify new data
  cursor_field: updated_at
Only inserts new rows, never updates.
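As a fuller sketch, an append-only model for immutable event data might look like the following. The events table, cursor column, and connection string are hypothetical; only fields documented on this page are used.
name: raw_events
execution: incremental

source:
  type: dlt
  source_name: sql_database
  credentials: ${SOURCE_DB_URI}
  table: events               # hypothetical append-only source table

sql: |
  SELECT * FROM {{ source }}

incremental:
  strategy: append
  write_disposition: append
  cursor_field: created_at    # column used to identify new rows since the last run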
Publication Block: publish
Define how to publish transformed data to downstream systems (Reverse ETL).
publish:
  to: salesforce
  object: Contact
  mapping:
    email: Email
    name: Name
  credentials: ${SALESFORCE_CREDENTIALS}
Fields:
- to: Destination system (salesforce, hubspot, postgres, filesystem)
- object: Target object/table name
- mapping: Field mapping (source → destination)
- credentials: Authentication
Expose Block: expose (Optional)
Define how this asset should be exposed to downstream consumers, such as dashboards or catalogs.
expose:
  - type: dashboard
    name: revenue_dashboard
    url: https://mshdata.io/docs
Fields:
- type: Type of exposure (e.g., dashboard, notebook, app)
- name: Human-readable name
- url: Link to the external resource
Complete Examples
Example 1: Simple API to SQL
name: github_issues

source:
  type: dlt
  source_name: rest_api
  endpoint: https://api.github.com/repos/owner/repo/issues
  credentials:
    token: ${GITHUB_TOKEN}

sql: |
  SELECT
    id,
    title,
    state,
    created_at
  FROM source('rest_api', 'issues')
  WHERE state = 'open'
Example 2: Incremental with Tests
name: stripe_customers
execution: incremental

source:
  type: dlt
  source_name: stripe
  resource: customers
  credentials:
    api_key: ${STRIPE_API_KEY}

sql: |
  SELECT
    id,
    email,
    name,
    created
  FROM source('stripe', 'customers')

incremental:
  strategy: merge
  primary_key: id
  write_disposition: merge
  incremental_key: created

tests:
  - unique: id
  - not_null: [id, email]
Example 3: Polyglot with PII Masking
name: users_masked

source:
  type: dlt
  source_name: sql_database
  credentials: ${SOURCE_DB}
  table: users

python: |
  import polars as pl

  def transform(df: pl.DataFrame) -> pl.DataFrame:
      return df.with_columns([
          pl.col("email").map_elements(lambda x: "*****" + x[-5:]),
          pl.col("phone").map_elements(lambda x: "***-***-" + x[-4:])
      ])

sql: |
  SELECT
    id,
    email,
    phone,
    created_at
  FROM {{ python_output }}
Example 4: Full Pipeline with Reverse ETL
name: active_leads
execution: incremental

source:
  type: dlt
  source_name: salesforce
  resource: Lead
  credentials:
    username: ${SALESFORCE_USERNAME}
    password: ${SALESFORCE_PASSWORD}
    security_token: ${SALESFORCE_TOKEN}

sql: |
  SELECT
    Id,
    Email,
    Company,
    Status
  FROM source('salesforce', 'Lead')
  WHERE Status = 'Qualified'

incremental:
  strategy: merge
  primary_key: Id
  write_disposition: merge

tests:
  - unique: Id
  - not_null: Email

publish:
  to: hubspot
  object: Contact
  mapping:
    Email: email
    Company: company
Best Practices
- Use Environment Variables: Never hardcode credentials
- Name Explicitly: Set name to avoid filename-based naming
- Test Everything: Add tests for critical columns
- Incremental When Possible: Use incremental for large datasets
- Document Complex Logic: Add comments to your SQL
- Version Control: Commit .msh files, not .env