
---
name: dagster-orchestration
description: |
  ALWAYS USE when working with Dagster assets, resources, IO managers, schedules, sensors, or dbt integration.
  CRITICAL for: @asset decorators, @dbt_assets, DbtCliResource, ConfigurableResource, IO managers, partitions.
  Enforces CATALOG-AS-CONTROL-PLANE architecture - ALL Iceberg writes via catalog (Polaris/Glue).
  Provides pluggable orchestration patterns abstractable to Airflow/Prefect.
  Compute abstraction: DuckDB (default), Spark, Snowflake - all via dbt.
---
# Dagster Orchestration with dbt Integration

## Constitution Alignment

This skill enforces project principles:

- I. Technology Ownership: Dagster owns orchestration, dbt owns SQL
- II. Plugin-First Architecture: Orchestrator is pluggable (Dagster default, Airflow 3.x alternative)
- III. Enforced vs Pluggable: Iceberg format ENFORCED, compute engine PLUGGABLE
- VIII. Observability By Default: All operations emit OpenTelemetry traces and OpenLineage events

## Related ADRs

| ADR | Decision | Relevance |
|---|---|---|
| ADR-0011 | Pluggable Orchestration | Dagster as default, Airflow 3.x as alternative |
| ADR-0009 | dbt Owns SQL | NEVER parse SQL in Python - dbt handles all transformations |
| ADR-0005 | Apache Iceberg Enforced | All tables MUST be Iceberg format |
| ADR-0010 | Target-Agnostic Compute | DuckDB default, Snowflake/Spark via dbt |
| ADR-0033 | Target Airflow 3.x | Alternative orchestrator option |

## Critical Architecture: Catalog-as-Control-Plane

⚠️ NEVER write directly to storage. ALL table operations MUST flow through the catalog:

```text
┌─────────────────────────────────────────────────────────────┐
│                    DAGSTER ORCHESTRATION                     │
│       (Schedule → Sensor → Asset Graph → Materialization)    │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                     dbt TRANSFORMATIONS                      │
│     (SQL owns transformations - NEVER parse SQL in Python)   │
└─────────────────────────────────────────────────────────────┘
                              │
              ┌───────────────┼───────────────┐
              │               │               │
              ▼               ▼               ▼
        ┌─────────┐     ┌─────────┐     ┌─────────┐
        │ DuckDB  │     │  Spark  │     │Snowflake│
        │(default)│     │ (scale) │     │(analyt.)│
        └────┬────┘     └────┬────┘     └────┬────┘
             │               │               │
             └───────────────┼───────────────┘
                             │
                             ▼
┌─────────────────────────────────────────────────────────────┐
│              POLARIS / GLUE CATALOG (REST API)               │
│     ⚡ CONTROL PLANE - ACID, Schema, Access, Governance ⚡    │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                       ICEBERG TABLES                         │
│                      (S3 / Azure / GCS)                      │
└─────────────────────────────────────────────────────────────┘
```

### Why Catalog-as-Control-Plane?

- ACID transactions across ALL compute engines
- Schema evolution coordination (engines see same schema)
- Access control and governance (row/column masking)
- Multi-engine interoperability (DuckDB + Spark + dbt query same tables)
- See: `references/catalog-control-plane.md`
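
To make this concrete, here is a minimal sketch of opening the catalog through PyIceberg's REST interface instead of touching object storage directly. The endpoint, warehouse name, and environment variable names mirror the examples later in this document but are assumptions, not fixed platform values:

```python
import os

from pyiceberg.catalog import load_catalog

# Illustrative endpoint/warehouse values -- substitute your platform.yaml entries.
catalog = load_catalog(
    "polaris_catalog",
    **{
        "type": "rest",
        "uri": os.environ["POLARIS_URI"],  # e.g. http://polaris:8181/api/catalog
        "credential": f"{os.environ['POLARIS_CLIENT_ID']}:{os.environ['POLARIS_CLIENT_SECRET']}",
        "warehouse": "demo_catalog",
    },
)

# Every table operation goes through the catalog -- never a raw S3/Azure/GCS path.
table = catalog.load_table("silver.orders")
print(table.schema())
```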

## Pluggable Orchestration Design

Design assets as pure functions runnable in ANY orchestrator:

| Concept | Dagster | Airflow | Prefect |
|---|---|---|---|
| Unit of Work | @asset | @task | @task |
| Dependencies | Asset deps | Task deps | Task deps |
| Scheduling | @schedule | DAG schedule | Deployment |
| Event-driven | @sensor | Sensor | Event handlers |
| Configuration | ConfigurableResource | Connection/Variable | Block |
See: references/orchestration-abstraction.md
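
One way to honor this abstraction, sketched below under assumed names, is to keep business logic in plain functions and let a thin decorator layer bind them to the orchestrator. Only the binding changes when swapping Dagster for Airflow or Prefect:

```python
import pyarrow as pa
import pyarrow.compute as pc
from dagster import asset

# Pure function: no orchestrator imports, callable from Airflow or Prefect too.
def build_daily_orders(raw_orders: pa.Table) -> pa.Table:
    return raw_orders.filter(pc.field("status") == "complete")

# Thin Dagster binding -- the only layer that knows about the orchestrator.
@asset
def daily_orders(raw_orders: pa.Table) -> pa.Table:
    return build_daily_orders(raw_orders)
```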

## Pre-Implementation Checklist

### Step 1: Verify Runtime Environment

```bash
# ALWAYS run first
python -c "import dagster; print(f'Dagster {dagster.__version__}')"
python -c "import dagster_dbt; print(f'dagster-dbt {dagster_dbt.__version__}')"
python -c "import dagster_iceberg; print('dagster-iceberg installed')"
dbt --version
```
Step 2: Discover Existing Patterns
# Find Dagster definitions
rg "@asset|@multi_asset|@dbt_assets" --type py
rg "ConfigurableResource|IOManager" --type py
rg "dg.Definitions|Definitions\(" --type py
# Find dbt project
find . -name "dbt_project.yml"
find . -name "manifest.json" -path "*/target/*"
# Check catalog configuration
cat platform.yaml | grep -A 20 "catalogs:"

### Step 3: Understand Platform Configuration

```bash
# Two-tier config: platform.yaml (credentials) + floe.yaml (logical refs)
cat platform.yaml  # Engineers NEVER see credentials in code
cat floe.yaml      # Data engineers reference: catalog: default
```
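
The two-tier split can be made concrete with a small resolver. This is a sketch under assumed file layouts (a `catalogs:` mapping in platform.yaml whose entries carry a `secret_ref` naming an environment variable), not the platform's actual loader:

```python
import os

import yaml  # assumes PyYAML is available

def resolve_catalog(logical_name: str) -> dict:
    """Hypothetical helper: map a floe.yaml logical ref to platform.yaml settings."""
    with open("platform.yaml") as f:
        platform = yaml.safe_load(f)
    catalog = platform["catalogs"][logical_name]
    # Credentials stay in platform.yaml / env vars -- never in asset code.
    return {
        "uri": catalog["uri"],
        "credential": os.environ[catalog["secret_ref"]],
    }
```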

## dbt Integration (Primary Pattern)

### Pattern 1: Load dbt Assets from Manifest

```python
from pathlib import Path

from dagster import AssetExecutionContext, Definitions
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets

dbt_project = DbtProject(
    project_dir=Path(__file__).parent / "dbt",
    packaged_project_dir=Path(__file__).parent / "dbt-project",
)
dbt_project.prepare_if_dev()  # Hot-reload in dev


@dbt_assets(manifest=dbt_project.manifest_path)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    """Execute dbt build - yields Dagster events for each model."""
    yield from dbt.cli(["build"], context=context).stream()


defs = Definitions(
    assets=[my_dbt_assets],
    resources={"dbt": DbtCliResource(project_dir=dbt_project)},
)
```

### Pattern 2: Custom DagsterDbtTranslator

```python
from typing import Any, Mapping, Optional

from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator, DagsterDbtTranslatorSettings


class FloeDbTranslator(DagsterDbtTranslator):
    """Translator for floe-runtime architecture."""

    def __init__(self):
        super().__init__(
            settings=DagsterDbtTranslatorSettings(
                enable_code_references=True,
                enable_source_tests_as_checks=True,
            )
        )

    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        """Map dbt models to Dagster asset keys with namespace."""
        schema = dbt_resource_props.get("schema", "default")
        name = dbt_resource_props["name"]
        return AssetKey([schema, name])

    def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
        """Group by dbt folder structure."""
        fqn = dbt_resource_props.get("fqn", [])
        return fqn[1] if len(fqn) > 2 else None

    def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
        """Extract governance metadata from dbt model meta."""
        meta = dbt_resource_props.get("meta", {})
        return {
            "classification": meta.get("classification"),
            "owner": meta.get("owner"),
            "sla": meta.get("sla"),
        }
```
See: references/dbt-integration.md for complete patterns
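
To apply the translator, pass an instance via the `dagster_dbt_translator` argument of `@dbt_assets` (asset function name below is illustrative):

```python
@dbt_assets(
    manifest=dbt_project.manifest_path,
    dagster_dbt_translator=FloeDbTranslator(),
)
def translated_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
```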

### Pattern 3: Partitioned dbt Assets

```python
from dagster import DailyPartitionsDefinition

daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")


@dbt_assets(
    manifest=dbt_project.manifest_path,
    partitions_def=daily_partitions,
)
def partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    partition_date = context.partition_key
    yield from dbt.cli(
        ["build", "--vars", f'{{"run_date": "{partition_date}"}}'],
        context=context,
    ).stream()
```
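
To materialize these assets on a cadence, one option (a sketch; job and schedule names are illustrative) is a partitioned asset job plus a schedule derived from it:

```python
from dagster import build_schedule_from_partitioned_job, define_asset_job

partitioned_dbt_job = define_asset_job(
    name="partitioned_dbt_job",
    selection=[partitioned_dbt_assets],
    partitions_def=daily_partitions,
)

# Runs once per partition as each daily window closes.
partitioned_dbt_schedule = build_schedule_from_partitioned_job(partitioned_dbt_job)
```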

## Compute Target Integration

### DuckDB (Default - Ephemeral Compute via dbt-duckdb)

DuckDB reads and writes via catalog ATTACH:

```sql
-- dbt-duckdb plugin automatically executes:
ATTACH 'demo_catalog' AS polaris_catalog (
    TYPE ICEBERG,
    CLIENT_ID '{{ env_var("POLARIS_CLIENT_ID") }}',
    CLIENT_SECRET '{{ env_var("POLARIS_CLIENT_SECRET") }}',
    ENDPOINT '{{ env_var("POLARIS_URI") }}'
);
```

### Snowflake (Analytical Compute)

```sql
-- External Iceberg via Polaris integration
CREATE OR REPLACE ICEBERG TABLE gold.metrics
    CATALOG = 'polaris_catalog'
    EXTERNAL_VOLUME = 'iceberg_volume'
    AS SELECT * FROM silver.orders;
```

### Spark (Distributed Compute)

```python
from dagster import asset

# SparkResource is assumed to be a project-defined ConfigurableResource
# exposing a live SparkSession (e.g. wrapping dagster-pyspark).
@asset(kinds={"spark"})
def spark_asset(spark: SparkResource):
    spark.spark_session.sql("""
        INSERT INTO polaris_catalog.gold.metrics
        SELECT * FROM polaris_catalog.silver.orders
    """)
```
See: references/compute-abstraction.md

## IO Manager Patterns

### Iceberg IO Manager (Catalog-Controlled)

```python
import os

from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.arrow import PyArrowIcebergIOManager

# Credentials come from the environment -- never hardcode them.
client_id = os.environ["POLARIS_CLIENT_ID"]
client_secret = os.environ["POLARIS_CLIENT_SECRET"]

iceberg_io_manager = PyArrowIcebergIOManager(
    name="polaris_catalog",
    config=IcebergCatalogConfig(
        properties={
            "type": "rest",
            "uri": "http://polaris:8181/api/catalog",
            "credential": f"{client_id}:{client_secret}",
            "warehouse": "demo_catalog",
        }
    ),
    namespace="default",
)
```
See: references/io-managers.md
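
Downstream, any asset that returns a PyArrow table is persisted to Iceberg through the catalog by this IO manager. A minimal sketch with an illustrative asset name and columns:

```python
import pyarrow as pa
from dagster import asset

@asset(io_manager_key="io_manager")
def bronze_orders() -> pa.Table:
    # The returned table is written to Iceberg via the catalog by the IO manager.
    return pa.table({"order_id": [1, 2, 3], "status": ["new", "new", "complete"]})
```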

## Environment-Based Resource Switching

```python
import os

# local_iceberg_io / prod_iceberg_io are factory helpers defined elsewhere
# in the project that return environment-specific Iceberg IO managers.
def get_resources_for_env() -> dict:
    env = os.getenv("DAGSTER_DEPLOYMENT", "local")
    base_resources = {"dbt": DbtCliResource(project_dir=dbt_project)}
    if env == "local":
        return {**base_resources, "io_manager": local_iceberg_io()}
    if env == "production":
        return {**base_resources, "io_manager": prod_iceberg_io()}
    raise ValueError(f"Unknown DAGSTER_DEPLOYMENT: {env}")
```
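
The factory then plugs into `Definitions`, so the same asset graph runs unchanged in each environment:

```python
defs = Definitions(
    assets=[my_dbt_assets],
    resources=get_resources_for_env(),
)
```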

## Validation Workflow

### Before Implementation

- Verified Dagster + dagster-dbt versions
- Located dbt project and manifest.json
- Understood catalog configuration (Polaris/Glue)
- Identified compute targets (DuckDB/Snowflake/Spark)
- Read `/docs/` for the CompiledArtifacts contract

### During Implementation

- Using `@dbt_assets` for dbt models
- Custom DagsterDbtTranslator for metadata
- IO manager uses catalog (NOT direct storage writes)
- Resources configured per environment
- Partitions aligned with dbt vars

### After Implementation

- Run `dagster dev` and verify assets appear
- Materialize assets manually
- Verify data lineage in UI
- Check Polaris catalog for table metadata
- Test schedules/sensors

## Anti-Patterns to Avoid

❌ Don't write to Iceberg without going through the catalog
❌ Don't hardcode compute logic (use dbt for SQL transforms)
❌ Don't mix Dagster partitions with dbt incremental without alignment
❌ Don't use deprecated load_assets_from_dbt_manifest()
❌ Don't bypass DbtCliResource for dbt execution
❌ Don't store credentials in code (use EnvVar or secret_ref)
❌ Don't parse SQL in Python (dbt owns SQL)

## Reference Documentation

| Document | Purpose |
|---|---|
| references/dbt-integration.md | Complete dbt-Dagster patterns |
| references/compute-abstraction.md | DuckDB, Spark, Snowflake patterns |
| references/io-managers.md | Iceberg IO managers, storage layer |
| references/orchestration-abstraction.md | Pluggable Airflow/Prefect patterns |
| references/catalog-control-plane.md | CRITICAL architecture doc |
| API-REFERENCE.md | Dagster SDK quick reference |

## Quick Reference: Research Queries

When uncertain, search:

- "Dagster dbt_assets decorator examples 2025"
- "DagsterDbtTranslator custom implementation 2025"
- "dagster-iceberg PyArrowIcebergIOManager 2025"
- "DuckDB Iceberg REST catalog ATTACH 2025"
Remember: Design for abstraction. Dagster orchestrates, dbt owns SQL, catalog controls storage.
