
dagster-orchestration

by Obsidian-Owl

The Open Platform for building Data Platforms. Ship faster. Stay compliant. Scale to Data Mesh.


SKILL.md


---
name: dagster-orchestration
description: |
  ALWAYS USE when working with Dagster assets, resources, IO managers, schedules,
  sensors, or dbt integration. CRITICAL for: @asset decorators, @dbt_assets,
  DbtCliResource, ConfigurableResource, IO managers, partitions. Enforces
  CATALOG-AS-CONTROL-PLANE architecture - ALL Iceberg writes via catalog (Polaris/Glue).
  Provides pluggable orchestration patterns abstractable to Airflow/Prefect.
  Compute abstraction: DuckDB (default), Spark, Snowflake - all via dbt.
---

Dagster Orchestration with dbt Integration

Constitution Alignment

This skill enforces project principles:

  • I. Technology Ownership: Dagster owns orchestration, dbt owns SQL
  • II. Plugin-First Architecture: Orchestrator is pluggable (Dagster default, Airflow 3.x alternative)
  • III. Enforced vs Pluggable: Iceberg format ENFORCED, compute engine PLUGGABLE
  • VIII. Observability By Default: All operations emit OpenTelemetry traces and OpenLineage events

| ADR | Decision | Relevance |
| --- | --- | --- |
| ADR-0011 | Pluggable Orchestration | Dagster as default, Airflow 3.x as alternative |
| ADR-0009 | dbt Owns SQL | NEVER parse SQL in Python - dbt handles all transformations |
| ADR-0005 | Apache Iceberg Enforced | All tables MUST be Iceberg format |
| ADR-0010 | Target-Agnostic Compute | DuckDB default, Snowflake/Spark via dbt |
| ADR-0033 | Target Airflow 3.x | Alternative orchestrator option |
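
Principle VIII applies inside asset bodies as well. A minimal sketch, assuming the OpenTelemetry SDK and an exporter are already configured elsewhere in the platform; the tracer name and span attributes are illustrative, not part of the project contract:

from dagster import AssetExecutionContext, asset
from opentelemetry import trace

tracer = trace.get_tracer("floe.orchestration")  # illustrative tracer name

@asset
def observed_asset(context: AssetExecutionContext) -> None:
    # Wrap the materialization in a span so it appears in platform traces.
    with tracer.start_as_current_span("materialize.observed_asset") as span:
        span.set_attribute("dagster.run_id", context.run_id)
        context.log.info("materializing with tracing enabled")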

Critical Architecture: Catalog-as-Control-Plane

⚠️ NEVER write directly to storage. ALL table operations MUST flow through catalog:

┌─────────────────────────────────────────────────────────────┐
│                   DAGSTER ORCHESTRATION                      │
│  (Schedule → Sensor → Asset Graph → Materialization)         │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                    dbt TRANSFORMATIONS                       │
│  (SQL owns transformations - NEVER parse SQL in Python)      │
└─────────────────────────────────────────────────────────────┘
                              │
              ┌───────────────┼───────────────┐
              │               │               │
              ▼               ▼               ▼
        ┌─────────┐     ┌─────────┐     ┌─────────┐
        │ DuckDB  │     │  Spark  │     │Snowflake│
        │(default)│     │ (scale) │     │(analytic)│
        └────┬────┘     └────┬────┘     └────┬────┘
              │               │               │
              └───────────────┼───────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│               POLARIS / GLUE CATALOG (REST API)              │
│     ⚡ CONTROL PLANE - ACID, Schema, Access, Governance ⚡   │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                    ICEBERG TABLES                            │
│                   (S3 / Azure / GCS)                         │
└─────────────────────────────────────────────────────────────┘

Why Catalog-as-Control-Plane?

  • ACID transactions across ALL compute engines
  • Schema evolution coordination (engines see same schema)
  • Access control and governance (row/column masking)
  • Multi-engine interoperability (DuckDB + Spark + dbt query same tables)
  • See: references/catalog-control-plane.md
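
To make the rule concrete, here is a minimal PyIceberg sketch: the REST catalog is loaded by name and rows are appended through it, never by writing data or metadata files to object storage directly. The environment variables match the DuckDB ATTACH example below; the table identifier and row payload are illustrative:

import os
import pyarrow as pa
from pyiceberg.catalog import load_catalog

# Every table operation goes through the REST catalog (Polaris/Glue), never raw S3.
catalog = load_catalog(
    "polaris_catalog",
    **{
        "type": "rest",
        "uri": os.environ["POLARIS_URI"],
        "credential": f"{os.environ['POLARIS_CLIENT_ID']}:{os.environ['POLARIS_CLIENT_SECRET']}",
        "warehouse": "demo_catalog",
    },
)

table = catalog.load_table("silver.orders")                   # catalog resolves metadata location
table.append(pa.table({"order_id": [1], "amount": [42.0]}))   # ACID commit via the catalog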

Pluggable Orchestration Design

Design assets as pure functions runnable in ANY orchestrator:

| Concept | Dagster | Airflow | Prefect |
| --- | --- | --- | --- |
| Unit of Work | @asset | @task | @task |
| Dependencies | Asset deps | Task deps | Task deps |
| Scheduling | @schedule | DAG schedule | Deployment |
| Event-driven | @sensor | Sensor | Event handlers |
| Configuration | ConfigurableResource | Connection/Variable | Block |

See: references/orchestration-abstraction.md
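
To keep asset bodies portable, put the business logic in a plain function and keep the decorator layer thin. A minimal sketch, assuming a hypothetical upstream raw_orders asset (dbt still owns SQL transformations; this applies to Python-side work such as ingestion and cleanup):

import pandas as pd
from dagster import asset

def clean_raw_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
    """Pure business logic - importable from any orchestrator's task wrapper."""
    return raw_orders.dropna(subset=["order_id"]).assign(processed=True)

@asset
def orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
    # Thin Dagster wrapper; an Airflow @task or Prefect @task would call the same function.
    return clean_raw_orders(raw_orders)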

Pre-Implementation Checklist

Step 1: Verify Runtime Environment

# ALWAYS run first
python -c "import dagster; print(f'Dagster {dagster.__version__}')"
python -c "import dagster_dbt; print(f'dagster-dbt {dagster_dbt.__version__}')"
python -c "import dagster_iceberg; print(f'dagster-iceberg installed')"
dbt --version

Step 2: Discover Existing Patterns

# Find Dagster definitions
rg "@asset|@multi_asset|@dbt_assets" --type py
rg "ConfigurableResource|IOManager" --type py
rg "dg.Definitions|Definitions\(" --type py

# Find dbt project
find . -name "dbt_project.yml"
find . -name "manifest.json" -path "*/target/*"

# Check catalog configuration
cat platform.yaml | grep -A 20 "catalogs:"

Step 3: Understand Platform Configuration

# Two-tier config: platform.yaml (credentials) + floe.yaml (logical refs)
cat platform.yaml    # Engineers NEVER see credentials in code
cat floe.yaml        # Data engineers reference: catalog: default
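
A toy sketch of how the two tiers resolve, assuming a hypothetical layout in which platform.yaml carries a catalogs: map and floe.yaml a logical catalog: reference; the real schemas are defined by the platform docs, not this snippet:

import yaml  # PyYAML

# Illustrative only: actual file schemas live in /docs/.
with open("platform.yaml") as f:
    platform = yaml.safe_load(f)   # operator-owned, holds credentials / secret refs
with open("floe.yaml") as f:
    floe = yaml.safe_load(f)       # engineer-owned, holds logical references only

# A logical reference like `catalog: default` resolves to a concrete catalog entry,
# so pipeline code never handles credentials directly.
catalog_cfg = platform["catalogs"][floe["catalog"]]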

dbt Integration (Primary Pattern)

Pattern 1: Load dbt Assets from Manifest

from pathlib import Path
from dagster import AssetExecutionContext, Definitions
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets

dbt_project = DbtProject(
    project_dir=Path(__file__).parent / "dbt",
    packaged_project_dir=Path(__file__).parent / "dbt-project",
)
dbt_project.prepare_if_dev()  # Hot-reload in dev

@dbt_assets(manifest=dbt_project.manifest_path)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    """Execute dbt build - yields Dagster events for each model."""
    yield from dbt.cli(["build"], context=context).stream()

defs = Definitions(
    assets=[my_dbt_assets],
    resources={"dbt": DbtCliResource(project_dir=dbt_project)},
)

Pattern 2: Custom DagsterDbtTranslator

from typing import Any, Mapping, Optional
from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator, DagsterDbtTranslatorSettings

class FloeDbTranslator(DagsterDbtTranslator):
    """Translator for floe-runtime architecture."""

    def __init__(self):
        super().__init__(
            settings=DagsterDbtTranslatorSettings(
                enable_code_references=True,
                enable_source_tests_as_checks=True,
            )
        )

    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        """Map dbt models to Dagster asset keys with namespace."""
        schema = dbt_resource_props.get("schema", "default")
        name = dbt_resource_props["name"]
        return AssetKey([schema, name])

    def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
        """Group by dbt folder structure."""
        fqn = dbt_resource_props.get("fqn", [])
        return fqn[1] if len(fqn) > 2 else None

    def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
        """Extract governance metadata from dbt model meta."""
        meta = dbt_resource_props.get("meta", {})
        return {
            "classification": meta.get("classification"),
            "owner": meta.get("owner"),
            "sla": meta.get("sla"),
        }

See: references/dbt-integration.md for complete patterns
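
The translator takes effect only when passed to the decorator. A wiring sketch, continuing from the snippets above (dagster_dbt_translator is the relevant parameter):

@dbt_assets(
    manifest=dbt_project.manifest_path,
    dagster_dbt_translator=FloeDbTranslator(),
)
def governed_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()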

Pattern 3: Partitioned dbt Assets

from dagster import DailyPartitionsDefinition

daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")

@dbt_assets(
    manifest=dbt_project.manifest_path,
    partitions_def=daily_partitions,
)
def partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    partition_date = context.partition_key
    yield from dbt.cli(
        ["build", "--vars", f'{{"run_date": "{partition_date}"}}'],
        context=context,
    ).stream()
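
To execute one run per partition, wrap the partitioned assets in an asset job and derive a schedule from it. A minimal sketch continuing from the code above; the job and schedule names are illustrative:

from dagster import build_schedule_from_partitioned_job, define_asset_job

daily_dbt_job = define_asset_job(
    name="daily_dbt_job",
    selection=[partitioned_dbt_assets],
    partitions_def=daily_partitions,
)

# One scheduled run per daily partition.
daily_dbt_schedule = build_schedule_from_partitioned_job(daily_dbt_job)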

Compute Target Integration

DuckDB (Default - Ephemeral Compute via dbt-duckdb)

DuckDB reads/writes via catalog ATTACH:

-- dbt-duckdb plugin automatically executes:
ATTACH 'demo_catalog' AS polaris_catalog (
    TYPE ICEBERG,
    CLIENT_ID '{{ env_var("POLARIS_CLIENT_ID") }}',
    CLIENT_SECRET '{{ env_var("POLARIS_CLIENT_SECRET") }}',
    ENDPOINT '{{ env_var("POLARIS_URI") }}'
);
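
A standalone check (outside dbt) that the catalog is reachable from plain Python, mirroring the ATTACH statement above; assumes the DuckDB iceberg extension and the same POLARIS_* environment variables:

import os
import duckdb

con = duckdb.connect()
con.sql("INSTALL iceberg")
con.sql("LOAD iceberg")
con.sql(f"""
    ATTACH 'demo_catalog' AS polaris_catalog (
        TYPE ICEBERG,
        CLIENT_ID '{os.environ['POLARIS_CLIENT_ID']}',
        CLIENT_SECRET '{os.environ['POLARIS_CLIENT_SECRET']}',
        ENDPOINT '{os.environ['POLARIS_URI']}'
    )
""")
con.sql("SHOW ALL TABLES").show()  # tables resolved through the catalog, not storage paths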

Snowflake (Analytical Compute)

-- External Iceberg via Polaris integration
CREATE OR REPLACE ICEBERG TABLE gold.metrics
    CATALOG = 'polaris_catalog'
    EXTERNAL_VOLUME = 'iceberg_volume'
AS SELECT * FROM silver.orders;

Spark (Distributed Compute)

from dagster import asset

# SparkResource: a project-defined ConfigurableResource exposing a SparkSession
# (dagster_pyspark.PySparkResource is one ready-made option).
@asset(kinds={"spark"})
def spark_asset(spark: SparkResource) -> None:
    # Spark writes go through the Iceberg catalog configured on the session.
    spark.spark_session.sql("""
        INSERT INTO polaris_catalog.gold.metrics
        SELECT * FROM polaris_catalog.silver.orders
    """)

See: references/compute-abstraction.md

IO Manager Patterns

Iceberg IO Manager (Catalog-Controlled)

import os

from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.arrow import PyArrowIcebergIOManager

# Credentials come from the environment (wired via platform.yaml), never hardcoded.
client_id = os.environ["POLARIS_CLIENT_ID"]
client_secret = os.environ["POLARIS_CLIENT_SECRET"]

iceberg_io_manager = PyArrowIcebergIOManager(
    name="polaris_catalog",
    config=IcebergCatalogConfig(
        properties={
            "type": "rest",
            "uri": "http://polaris:8181/api/catalog",
            "credential": f"{client_id}:{client_secret}",
            "warehouse": "demo_catalog",
        }
    ),
    namespace="default",
)
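
A usage sketch, assuming the manager is registered under the io_manager resource key: the asset returns a PyArrow table and the IO manager commits it to Iceberg through the catalog, so the asset never touches storage paths.

import pyarrow as pa
from dagster import asset

@asset(key_prefix=["silver"])
def orders_snapshot() -> pa.Table:
    # The returned table is written to Iceberg by the catalog-backed IO manager.
    return pa.table({"order_id": [1, 2], "amount": [19.99, 42.00]})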

See: references/io-managers.md

Environment-Based Resource Switching

import os

from dagster_dbt import DbtCliResource

def get_resources_for_env() -> dict:
    """Select resources per deployment tier; credentials stay in the environment."""
    env = os.getenv("DAGSTER_DEPLOYMENT", "local")

    # dbt_project comes from the dbt integration section above.
    base_resources = {"dbt": DbtCliResource(project_dir=dbt_project)}

    # local_iceberg_io / prod_iceberg_io: factories returning catalog-backed IO managers
    # (see the PyArrowIcebergIOManager configuration above).
    if env == "local":
        return {**base_resources, "io_manager": local_iceberg_io()}
    if env == "production":
        return {**base_resources, "io_manager": prod_iceberg_io()}
    raise ValueError(f"Unknown DAGSTER_DEPLOYMENT: {env!r}")
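
The selector then feeds Definitions, for example reusing the dbt assets from Pattern 1:

from dagster import Definitions

defs = Definitions(
    assets=[my_dbt_assets],
    resources=get_resources_for_env(),
)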

Validation Workflow

Before Implementation

  • Verified Dagster + dagster-dbt versions
  • Located dbt project and manifest.json
  • Understood catalog configuration (Polaris/Glue)
  • Identified compute targets (DuckDB/Snowflake/Spark)
  • Read /docs/ for CompiledArtifacts contract

During Implementation

  • Using @dbt_assets for dbt models
  • Custom DagsterDbtTranslator for metadata
  • IO manager uses catalog (NOT direct storage writes)
  • Resources configured per environment
  • Partitions aligned with dbt vars

After Implementation

  • Run dagster dev - verify assets appear
  • Materialize assets manually
  • Verify data lineage in UI
  • Check Polaris catalog for table metadata
  • Test schedules/sensors

Anti-Patterns to Avoid

❌ Don't write to Iceberg without going through catalog
❌ Don't hardcode compute logic (use dbt for SQL transforms)
❌ Don't mix Dagster partitions with dbt incremental without alignment
❌ Don't use deprecated load_assets_from_dbt_manifest()
❌ Don't bypass DbtCliResource for dbt execution
❌ Don't store credentials in code (use EnvVar or secret_ref)
❌ Don't parse SQL in Python (dbt owns SQL)
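
For the credentials rule, Dagster's EnvVar defers secret resolution to runtime. A sketch with a hypothetical CatalogCredentials resource (field names are illustrative):

from dagster import ConfigurableResource, EnvVar

class CatalogCredentials(ConfigurableResource):
    """Hypothetical resource: string fields are resolved from the environment at runtime."""
    client_id: str
    client_secret: str

catalog_credentials = CatalogCredentials(
    client_id=EnvVar("POLARIS_CLIENT_ID"),
    client_secret=EnvVar("POLARIS_CLIENT_SECRET"),
)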

Reference Documentation

| Document | Purpose |
| --- | --- |
| references/dbt-integration.md | Complete dbt-Dagster patterns |
| references/compute-abstraction.md | DuckDB, Spark, Snowflake patterns |
| references/io-managers.md | Iceberg IO managers, storage layer |
| references/orchestration-abstraction.md | Pluggable Airflow/Prefect patterns |
| references/catalog-control-plane.md | CRITICAL architecture doc |
| API-REFERENCE.md | Dagster SDK quick reference |

Quick Reference: Research Queries

When uncertain, search:

  • "Dagster dbt_assets decorator examples 2025"
  • "DagsterDbtTranslator custom implementation 2025"
  • "dagster-iceberg PyArrowIcebergIOManager 2025"
  • "DuckDB Iceberg REST catalog ATTACH 2025"

Remember: Design for abstraction. Dagster orchestrates, dbt owns SQL, catalog controls storage.
