
pyiceberg-storage
by Obsidian-Owl
name: pyiceberg-storage
description: ALWAYS USE when working with Iceberg table storage, ACID transactions, or time-travel queries. Use IMMEDIATELY when creating/loading Iceberg tables, implementing schema evolution, querying historical snapshots, or debugging catalog operations. Provides research steps for PyIceberg SDK, catalog integration, and storage layer operations.
PyIceberg Storage Development (Research-Driven)
⚠️ CRITICAL: Catalog-as-Control-Plane
NEVER write directly to storage. ALL Iceberg table operations MUST flow through the catalog (Polaris/Glue):
❌ FORBIDDEN: PyIceberg → S3 (bypasses catalog coordination)
✅ CORRECT: PyIceberg → Polaris Catalog → S3 (catalog coordinates writes)
Why?
- ACID transactions require catalog coordination
- Schema evolution tracked by catalog
- Access control enforced at catalog layer
- Multi-engine interoperability (DuckDB, Spark, dbt all see same metadata)
See: ADR-0005 - Apache Iceberg Enforced
Related ADRs
| ADR | Decision | Relevance |
|---|---|---|
| ADR-0005 | Apache Iceberg Enforced | All tables MUST be Iceberg format |
| ADR-0034 | dbt-duckdb Workaround | Inline credentials for ATTACH |
| ADR-0010 | Target-Agnostic Compute | PyIceberg supports all compute targets |
Philosophy
This skill does NOT prescribe specific Iceberg table patterns. Instead, it guides you to:
- Research the current PyIceberg version and capabilities
- Discover existing Iceberg table usage in the codebase
- Validate your implementations against PyIceberg SDK documentation
- Verify integration with Polaris catalog and storage layer
Pre-Implementation Research Protocol
Step 1: Verify Runtime Environment
ALWAYS run this first:
python -c "import pyiceberg; print(f'PyIceberg {pyiceberg.__version__}')"
Critical Questions to Answer:
- What version is installed? (0.10.x recommended as of 2025)
- Are required dependencies installed? (pyarrow, fsspec, cloud SDKs)
- Does it match the documented requirements?
Step 2: Research SDK State (if unfamiliar)
When to research: If you encounter unfamiliar PyIceberg features or need to validate patterns
Research queries (use WebSearch):
- "PyIceberg [feature] documentation 2025" (e.g., "PyIceberg ACID transactions 2025")
- "PyIceberg catalog REST API configuration 2025"
- "PyIceberg [cloud] integration 2025" (e.g., "PyIceberg AWS S3 integration 2025")
Official documentation: https://py.iceberg.apache.org
Key documentation sections:
- API Reference: https://py.iceberg.apache.org/reference/
- Configuration: https://py.iceberg.apache.org/configuration/
- Table Operations: https://py.iceberg.apache.org/reference/pyiceberg/table/
Step 3: Discover Existing Patterns
BEFORE creating new Iceberg operations, search for existing implementations:
# Find existing PyIceberg usage
rg "from pyiceberg|import pyiceberg" --type py
# Find catalog configurations
rg "load_catalog|Catalog" --type py
# Find table operations
rg "\.load_table|\.create_table" --type py
# Find transaction usage
rg "\.transaction|\.append|\.overwrite" --type py
Key questions:
- What catalog is configured? (REST, Hive, Glue, etc.)
- What storage is used? (S3, Azure, GCS, local filesystem)
- What table operations are already implemented?
- How are schemas defined?
Step 4: Validate Against Architecture
Check architecture docs for integration requirements:
- Read /docs/ for Iceberg storage requirements
- Understand Polaris catalog integration
- Verify compute targets and storage mappings
- Check partition strategy requirements
Implementation Guidance (Not Prescriptive)
Catalog Configuration
Core concept: Catalogs manage Iceberg table metadata and coordinate with storage
Research questions:
- What catalog type should be used? (REST for Polaris, Hive, Glue)
- How should credentials be managed? (vended credentials, access delegation)
- What warehouse configuration is needed?
- How should catalog be initialized in code?
SDK features to research:
- load_catalog(): Initialize catalog from configuration
- REST catalog: Connection to Polaris or other REST APIs
- Catalog types: rest, hive, glue, dynamodb, sql
- Configuration: YAML files, environment variables, Python dicts
Table Operations
Core concept: Tables represent datasets with schema, partitioning, and ACID guarantees
Research questions:
- Should I create a new table or load existing?
- What schema should the table have?
- What partitioning strategy? (time-based, dimension-based)
- What sort order for performance?
SDK features to research:
- catalog.create_table(): Create new Iceberg table
- catalog.load_table(): Load existing table
- table.scan(): Query table data
- table.schema(): Get table schema
- table.spec(): Get partition spec
- table.properties(): Get table properties
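To make the research concrete, here is a minimal inspection sketch. The catalog name "polaris_catalog" and the table identifier are illustrative assumptions; adapt them to whatever configuration you discovered in Step 3.

```python
from pyiceberg.catalog import load_catalog

# Load the configured catalog and an existing table (names are illustrative).
catalog = load_catalog("polaris_catalog")
table = catalog.load_table("my_namespace.my_table")

# Inspect table metadata before reading or writing.
print(table.schema())            # column names, types, and field IDs
print(table.spec())              # current partition spec
print(table.current_snapshot())  # latest snapshot, or None for an empty table
```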
ACID Transactions
Core concept: Iceberg uses optimistic concurrency and snapshots for ACID guarantees
Research questions:
- What operation am I performing? (append, overwrite, delete)
- Do I need transactional guarantees?
- How should concurrent writes be handled?
- What snapshot isolation level is needed?
SDK features to research:
- table.append(): Append data (creates APPEND snapshot)
- table.overwrite(): Overwrite data (creates OVERWRITE snapshot)
- table.delete(): Delete data (creates DELETE snapshot)
- Transaction API: Stage changes before committing
- Optimistic concurrency: Snapshot-based isolation
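One possible shape of the transaction API, as a hedged sketch rather than a prescribed pattern: the catalog name, table identifier, and columns are illustrative, and the exact Transaction methods should be confirmed against the installed PyIceberg version's documentation.

```python
import pyarrow as pa
from pyiceberg.catalog import load_catalog

catalog = load_catalog("polaris_catalog")          # illustrative catalog name
table = catalog.load_table("my_namespace.events")  # illustrative table name

batch = pa.table({"id": [1, 2], "status": ["new", "new"]})

# Stage changes and commit them together; the commit happens when the
# context manager exits without raising.
with table.transaction() as txn:
    txn.append(batch)
    txn.set_properties(updated_by="batch-loader")
```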
Schema Evolution
Core concept: Iceberg supports safe schema changes without rewriting data
Research questions:
- What schema changes are needed? (add column, rename, type promotion)
- Are changes backward compatible?
- How should schema evolution be tracked?
- What validation is needed?
SDK features to research:
- table.update_schema(): Modify table schema
- Schema operations: add_column(), rename_column(), update_column()
- Type promotion: Safe type changes (int → long, float → double)
- Schema versioning: Iceberg tracks schema evolution
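A minimal schema-evolution sketch, assuming an existing table and illustrative column names; confirm the update_schema() operations against the installed version's API reference.

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.types import StringType

catalog = load_catalog("polaris_catalog")            # illustrative catalog name
table = catalog.load_table("my_namespace.my_table")  # illustrative table name

# Stage several schema changes and commit them as one schema update.
with table.update_schema() as update:
    update.add_column("country_code", StringType(), doc="ISO 3166-1 alpha-2")
    update.rename_column("name", "full_name")
```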
Partitioning
Core concept: Partitioning improves query performance by organizing data
Research questions:
- What partition strategy? (daily, hourly, by dimension)
- What columns should be partition keys?
- Should partitioning be hidden (partition transforms)?
- How does partitioning affect queries?
SDK features to research:
- Partition specs: Define partitioning strategy
- Partition transforms: day(), hour(), month(), year(), bucket(), truncate()
- Hidden partitioning: Partition transforms applied automatically
- Dynamic partitioning: Partitions created as data arrives
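A hedged sketch of hidden partitioning with a day() transform; the schema, field IDs, and identifiers below are illustrative assumptions.

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import DayTransform
from pyiceberg.types import NestedField, StringType, TimestamptzType

schema = Schema(
    NestedField(1, "event_id", StringType(), required=True),
    NestedField(2, "event_ts", TimestamptzType(), required=True),
)

# Hidden partitioning: partition by day(event_ts) without a separate date column.
spec = PartitionSpec(
    PartitionField(source_id=2, field_id=1000, transform=DayTransform(), name="event_day")
)

catalog = load_catalog("polaris_catalog")  # illustrative catalog name
table = catalog.create_table(
    "my_namespace.events",                 # illustrative identifier
    schema=schema,
    partition_spec=spec,
)
```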
Time Travel
Core concept: Iceberg snapshots enable querying historical data
Research questions:
- What snapshot should be queried? (timestamp, snapshot ID)
- How far back does history need to go?
- Should old snapshots be expired?
- What metadata is available in snapshots?
SDK features to research:
- table.scan().use_ref(): Query specific snapshot/tag/branch
- table.history(): List table snapshots
- table.refs(): List named references (branches, tags)
- Snapshot expiration: Clean up old snapshots
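A small time-travel sketch, assuming the table already has at least one snapshot; the catalog and table names are illustrative.

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("polaris_catalog")            # illustrative catalog name
table = catalog.load_table("my_namespace.my_table")  # illustrative table name

# List the snapshots recorded in table history.
for entry in table.history():
    print(entry.snapshot_id, entry.timestamp_ms)

# Read the table as of a specific snapshot ID (time travel).
snapshot_id = table.history()[0].snapshot_id
old_data = table.scan(snapshot_id=snapshot_id).to_arrow()
```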
Data I/O
Core concept: PyIceberg reads/writes data using PyArrow and fsspec
Research questions:
- What data format? (Parquet, Avro, ORC)
- What file system? (S3, Azure, GCS, local)
- How should data be batched?
- What compression should be used?
SDK features to research:
- table.scan().to_arrow(): Read data as a PyArrow Table
- table.scan().to_pandas(): Read data as a Pandas DataFrame
- table.scan().to_duckdb(): Read data into DuckDB
- FileIO implementations: S3, Azure, GCS, fsspec
- Data file formats: Parquet (recommended), Avro, ORC
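A read-path sketch showing filter and column projection pushdown plus the DuckDB hand-off; the filter expression and identifiers are illustrative assumptions.

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("polaris_catalog")            # illustrative catalog name
table = catalog.load_table("my_namespace.my_table")  # illustrative table name

# Push down a row filter and column projection, then materialize as PyArrow.
arrow_table = table.scan(
    row_filter="id > 100",
    selected_fields=("id", "name"),
).to_arrow()

# Or hand the scan to DuckDB and run SQL on top of the Iceberg data.
con = table.scan().to_duckdb(table_name="my_table")
print(con.execute("SELECT COUNT(*) FROM my_table").fetchall())
```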
Validation Workflow
Before Implementation
- ✅ Verified PyIceberg version and dependencies
- ✅ Searched for existing catalog and table configurations
- ✅ Read architecture docs for Polaris integration
- ✅ Identified storage layer (S3, Azure, GCS)
- ✅ Researched unfamiliar PyIceberg features
During Implementation
- ✅ Using load_catalog() with proper configuration
- ✅ Type hints on ALL functions and parameters
- ✅ Proper error handling for catalog/table operations
- ✅ Transaction API for ACID guarantees
- ✅ Schema evolution following Iceberg best practices
- ✅ Partitioning strategy aligned with query patterns
After Implementation
- ✅ Verify catalog connection works
- ✅ Test table creation/loading
- ✅ Test data write operations (append, overwrite)
- ✅ Test data read operations (scan, time travel)
- ✅ Verify schema evolution works
- ✅ Check integration with dbt/Dagster (if applicable)
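If it helps, several of the checks above can be folded into a small end-to-end smoke test. This is a sketch under assumptions: the catalog name and the "smoke_test" scratch namespace are illustrative, and the namespace is assumed to already exist in the catalog.

```python
import pyarrow as pa
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType

catalog = load_catalog("polaris_catalog")      # illustrative catalog name
identifier = "smoke_test.pyiceberg_check"      # illustrative scratch table

# Catalog connection + table creation.
schema = Schema(
    NestedField(1, "id", LongType(), required=False),
    NestedField(2, "label", StringType(), required=False),
)
table = catalog.create_table(identifier, schema=schema)

# Write path (append) and read path (scan).
table.append(pa.table({"id": [1, 2], "label": ["a", "b"]}))
assert len(table.scan().to_arrow()) == 2

# Time travel: the append should have produced at least one snapshot.
assert len(table.history()) >= 1

catalog.drop_table(identifier)                 # clean up the scratch table
```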
Context Injection (For Future Claude Instances)
When this skill is invoked, you should:
1. Verify runtime state (don't assume):
   python -c "import pyiceberg; print(pyiceberg.__version__)"
2. Discover existing patterns (don't invent):
   rg "load_catalog" --type py
   rg "\.load_table|\.create_table" --type py
3. Research when uncertain (don't guess):
   - Use WebSearch for "PyIceberg [feature] documentation 2025"
   - Check official docs: https://py.iceberg.apache.org
4. Validate against architecture (don't assume requirements):
   - Read relevant architecture docs in /docs/
   - Understand Polaris catalog integration
   - Check storage layer configuration
5. Check Polaris integration (if using REST catalog):
   - Verify Polaris endpoint configuration
   - Check credential management (vended credentials)
   - Understand access delegation model
Quick Reference: Common Research Queries
Use these WebSearch queries when encountering specific needs:
- Catalog setup: "PyIceberg REST catalog configuration 2025"
- Polaris integration: "PyIceberg Polaris catalog integration 2025"
- Table operations: "PyIceberg create table append data 2025"
- ACID transactions: "PyIceberg transaction API ACID guarantees 2025"
- Schema evolution: "PyIceberg schema evolution add column 2025"
- Partitioning: "PyIceberg partition transforms hidden partitioning 2025"
- Time travel: "PyIceberg snapshot time travel query 2025"
- AWS S3: "PyIceberg S3 FileIO configuration 2025"
- Azure: "PyIceberg Azure Blob Storage configuration 2025"
- GCS: "PyIceberg Google Cloud Storage configuration 2025"
Integration Points to Research
Polaris Catalog Integration
Key question: How does PyIceberg connect to Polaris REST catalog?
Research areas:
- REST catalog configuration (type: rest, uri, warehouse)
- Credential management (credential, token-refresh-enabled)
- Access delegation (header.X-Iceberg-Access-Delegation: vended-credentials)
- Scope and permissions (scope: PRINCIPAL_ROLE:ALL)
- FileIO implementation (py-io-impl: pyiceberg.io.fsspec.FsspecFileIO)
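As a non-authoritative sketch, these properties might be wired together like this when loading the catalog; the endpoint, credential placeholders, and warehouse name are assumptions to replace with the actual Polaris deployment values.

```python
from pyiceberg.catalog import load_catalog

# Properties mirror the research areas above; placeholders in angle brackets
# must be replaced with real values from the Polaris deployment.
catalog = load_catalog(
    "polaris",
    **{
        "type": "rest",
        "uri": "https://<polaris-host>/api/catalog",
        "credential": "<client_id>:<client_secret>",
        "warehouse": "<warehouse_name>",
        "scope": "PRINCIPAL_ROLE:ALL",
        "token-refresh-enabled": "true",
        "header.X-Iceberg-Access-Delegation": "vended-credentials",
        "py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO",
    },
)
```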
dbt → PyIceberg Integration
Key question: How do dbt models write to Iceberg tables?
Research areas:
- dbt Python models using PyIceberg
- External tables (dbt creates views, PyIceberg writes data)
- Post-hooks for Iceberg table creation
- Schema mapping (dbt schema → Iceberg schema)
Dagster → PyIceberg Integration
Key question: How do Dagster assets materialize Iceberg tables?
Research areas:
- Custom IOManager for Iceberg tables
- PyIceberg in asset functions
- Partition mapping (Dagster partitions → Iceberg partitions)
- Metadata propagation (Dagster metadata → Iceberg properties)
Storage Layer Configuration
Key question: How is cloud storage configured for Iceberg?
Research areas:
- S3 configuration (bucket, region, credentials)
- Azure Blob Storage configuration
- Google Cloud Storage configuration
- Local filesystem for development
- Vended credentials vs static credentials
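For contrast with vended credentials, a static-credential sketch for local development might look like the following; the endpoints, bucket, and credentials are placeholders, and the s3.* property keys should be verified against the installed PyIceberg configuration docs.

```python
from pyiceberg.catalog import load_catalog

# Static credentials (e.g. local development against MinIO or a dev bucket).
# With vended credentials, the s3.* keys are omitted and the REST catalog
# supplies short-lived credentials per table instead.
catalog = load_catalog(
    "dev",
    **{
        "type": "rest",
        "uri": "http://localhost:8181",            # illustrative local catalog endpoint
        "warehouse": "s3://dev-bucket/warehouse",  # illustrative bucket path
        "s3.endpoint": "http://localhost:9000",    # illustrative MinIO endpoint
        "s3.region": "us-east-1",
        "s3.access-key-id": "minioadmin",
        "s3.secret-access-key": "minioadmin",
    },
)
```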
PyIceberg Development Workflow
Local Development
# Install PyIceberg with extras
pip install "pyiceberg[s3fs,pyarrow,pandas,duckdb]"
# Verify installation
python -c "import pyiceberg; print(pyiceberg.__version__)"
# Test catalog connection
python -c "from pyiceberg.catalog import load_catalog; catalog = load_catalog('default')"
Example: Load Catalog and Query Table
from pyiceberg.catalog import load_catalog

# Load catalog from configuration
catalog = load_catalog(
    "polaris_catalog",
    **{
        "type": "rest",
        "uri": "https://account.snowflakecomputing.com/polaris/api/catalog",
        "credential": "client_id:client_secret",
        "warehouse": "my_warehouse",
    },
)

# Load table
table = catalog.load_table("my_namespace.my_table")

# Query data
df = table.scan().to_pandas()
Example: Create Table and Write Data
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, IntegerType
import pyarrow as pa

# Load catalog (configured as in the previous example)
catalog = load_catalog("polaris_catalog")

# Define schema
schema = Schema(
    NestedField(1, "id", IntegerType(), required=True),
    NestedField(2, "name", StringType(), required=False),
)

# Create table
table = catalog.create_table(
    "my_namespace.my_new_table",
    schema=schema,
)

# Append data (PyArrow schema mirrors the Iceberg schema:
# non-nullable int32 id, nullable string name)
arrow_schema = pa.schema([
    pa.field("id", pa.int32(), nullable=False),
    pa.field("name", pa.string(), nullable=True),
])
data = pa.table({
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"],
}, schema=arrow_schema)
table.append(data)
References
- PyIceberg Documentation: https://py.iceberg.apache.org
- PyIceberg on PyPI: https://pypi.org/project/pyiceberg/
- Configuration Guide: https://py.iceberg.apache.org/configuration/
- API Reference: https://py.iceberg.apache.org/reference/
- GitHub Repository: https://github.com/apache/iceberg-python
- Apache Iceberg Spec: https://iceberg.apache.org/spec/
Remember: This skill provides research guidance, NOT prescriptive patterns. Always:
- Verify the PyIceberg version and dependencies
- Discover existing catalog and table configurations
- Research SDK capabilities when needed (use WebSearch liberally)
- Validate against actual Polaris integration requirements
- Test catalog connection and table operations before considering complete
- Understand ACID guarantees and snapshot isolation for correctness