---
name: sqlalchemy-orm
description: "SQLAlchemy Python SQL toolkit and ORM with powerful query builder, relationship mapping, and database migrations via Alembic"
user-invocable: false
disable-model-invocation: true
progressive_disclosure:
  entry_point:
    summary: "SQLAlchemy Python SQL toolkit and ORM with powerful query builder, relationship mapping, and database migrations via Alembic"
    when_to_use: "When working with sqlalchemy-orm or related functionality."
    quick_start: "1. Review the core concepts below. 2. Apply patterns to your use case. 3. Follow best practices for implementation."
---
# SQLAlchemy ORM Skill

---
progressive_disclosure:
  entry_point:
    summary: "Python SQL toolkit and ORM with powerful query builder and relationship mapping"
    when_to_use:
      - "When building Python applications with databases"
      - "When needing complex SQL queries with type safety"
      - "When working with FastAPI/Flask/Django"
      - "When needing database migrations (Alembic)"
    quick_start:
      - "pip install sqlalchemy"
      - "Define models with declarative base"
      - "Create engine and session"
      - "Query with select() and commit()"
  token_estimate:
    entry: 70-85
    full: 4500-5500
---

## Core Concepts

### SQLAlchemy 2.0 Modern API

SQLAlchemy 2.0 introduced modern patterns with better type hints, improved query syntax, and async support.
**Key Changes from 1.x:**
- `select()` instead of `Query`
- `Mapped[T]` and `mapped_column()` for type hints
- Explicit `Session.execute()` for queries
- Better async support with `AsyncSession`

### Installation
```bash
# Core SQLAlchemy
pip install sqlalchemy

# With async support (quotes keep shells such as zsh from expanding the brackets)
pip install "sqlalchemy[asyncio]" aiosqlite  # SQLite
pip install "sqlalchemy[asyncio]" asyncpg    # PostgreSQL

# With Alembic for migrations
pip install alembic

# FastAPI integration
pip install fastapi sqlalchemy
```

## Declarative Models (SQLAlchemy 2.0)

### Basic Model Definition
```python
from datetime import datetime
from typing import Optional
from sqlalchemy import String, DateTime, ForeignKey, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship

# Base class for all models
class Base(DeclarativeBase):
    pass

# User model with type hints
class User(Base):
    __tablename__ = "users"

    # Primary key
    id: Mapped[int] = mapped_column(primary_key=True)

    # Required fields
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    username: Mapped[str] = mapped_column(String(50), unique=True)
    hashed_password: Mapped[str] = mapped_column(String(255))

    # Optional fields
    full_name: Mapped[Optional[str]] = mapped_column(String(100))
    is_active: Mapped[bool] = mapped_column(default=True)

    # Timestamps with server defaults
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now()
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now()
    )

    # Relationships
    posts: Mapped[list["Post"]] = relationship(back_populates="author")

    def __repr__(self) -> str:
        return f"User(id={self.id}, email={self.email})"
```

### Relationships

**One-to-Many:**
```python
class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    content: Mapped[str]
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))

    # Relationship with back_populates
    author: Mapped["User"] = relationship(back_populates="posts")
    tags: Mapped[list["Tag"]] = relationship(
        secondary="post_tags",
        back_populates="posts"
    )
```

**Many-to-Many:**
```python
from sqlalchemy import Table, Column, Integer, ForeignKey

# Association table
post_tags = Table(
    "post_tags",
    Base.metadata,
    Column("post_id", Integer, ForeignKey("posts.id"), primary_key=True),
    Column("tag_id", Integer, ForeignKey("tags.id"), primary_key=True)
)

class Tag(Base):
    __tablename__ = "tags"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True)

    posts: Mapped[list["Post"]] = relationship(
        secondary=post_tags,
        back_populates="tags"
    )
```

## Database Setup

### Engine and Session Configuration
```python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import QueuePool

# Database URL formats
# SQLite: sqlite:///./database.db
# PostgreSQL: postgresql://user:pass@localhost/dbname
# MySQL: mysql+pymysql://user:pass@localhost/dbname

DATABASE_URL = "postgresql://user:pass@localhost/mydb"

# Create engine with connection pooling
engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=5,
    max_overflow=10,
    pool_pre_ping=True,  # Check connection before using
    echo=False  # Set True for SQL logging
)

# Session factory
SessionLocal = sessionmaker(
    bind=engine,
    autocommit=False,
    autoflush=False,
    expire_on_commit=False
)

# Create tables
Base.metadata.create_all(bind=engine)
```

### Dependency Injection (FastAPI Pattern)
```python
from typing import Generator
from sqlalchemy import select

def get_db() -> Generator[Session, None, None]:
    """Database session dependency for FastAPI."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# Usage in FastAPI (`app` is the FastAPI instance, defined elsewhere)
from fastapi import Depends

@app.get("/users/{user_id}")
def get_user(user_id: int, db: Session = Depends(get_db)):
    return db.execute(
        select(User).where(User.id == user_id)
    ).scalar_one_or_none()
```

## Query Patterns (SQLAlchemy 2.0)

### Select Queries
```python
from datetime import datetime
from sqlalchemy import select, and_, or_, desc, func

# Basic select
stmt = select(User).where(User.email == "user@example.com")
user = session.execute(stmt).scalar_one_or_none()

# Multiple conditions
stmt = select(User).where(
    and_(
        User.is_active == True,
        User.created_at > datetime(2024, 1, 1)
    )
)
users = session.execute(stmt).scalars().all()

# OR conditions
stmt = select(User).where(
    or_(
        User.email.like("%@gmail.com"),
        User.email.like("%@yahoo.com")
    )
)

# Ordering and limiting
stmt = (
    select(User)
    .where(User.is_active == True)
    .order_by(desc(User.created_at))
    .limit(10)
    .offset(20)
)

# Counting
stmt = select(func.count()).select_from(User)
count = session.execute(stmt).scalar()
```

### Joins
```python
# Inner join
stmt = (
    select(Post, User)
    .join(User, Post.user_id == User.id)
    .where(User.is_active == True)
)
results = session.execute(stmt).all()

# Left outer join
stmt = (
    select(User, func.count(Post.id).label("post_count"))
    .outerjoin(Post)
    .group_by(User.id)
)

# Multiple joins
stmt = (
    select(Post)
    .join(Post.author)
    .join(Post.tags)
    .where(Tag.name == "python")
)
```

### Eager Loading (Solve N+1 Problem)
```python
from sqlalchemy.orm import selectinload, joinedload

# selectinload - separate query (better for collections)
stmt = select(User).options(selectinload(User.posts))
users = session.execute(stmt).scalars().all()
# Now users[0].posts won't trigger additional queries

# joinedload - single query with join (better for many-to-one / one-to-one)
stmt = select(Post).options(joinedload(Post.author))
# .unique() is only required when joinedload targets a collection; harmless here
posts = session.execute(stmt).unique().scalars().all()

# Nested eager loading
stmt = select(User).options(
    selectinload(User.posts).selectinload(Post.tags)
)

# Load only specific columns
from sqlalchemy.orm import load_only
stmt = select(User).options(load_only(User.id, User.email))
```

## CRUD Operations

### Create
```python
# hash_password() is an application-provided helper, not part of SQLAlchemy
def create_user(db: Session, email: str, username: str, password: str):
    """Create new user."""
    user = User(
        email=email,
        username=username,
        hashed_password=hash_password(password)
    )
    db.add(user)
    db.commit()
    db.refresh(user)  # Get updated fields (id, timestamps)
    return user

# Bulk insert (hashed_password is NOT NULL, so it must be supplied)
users = [
    User(
        email=f"user{i}@example.com",
        username=f"user{i}",
        hashed_password=hash_password("changeme")
    )
    for i in range(100)
]
db.add_all(users)
db.commit()
```

### Read
```python
def get_user_by_email(db: Session, email: str) -> Optional[User]:
    """Get user by email."""
    stmt = select(User).where(User.email == email)
    return db.execute(stmt).scalar_one_or_none()

def get_users(
    db: Session,
    skip: int = 0,
    limit: int = 100
) -> list[User]:
    """Get paginated users."""
    stmt = (
        select(User)
        .where(User.is_active == True)
        .order_by(User.created_at.desc())
        .offset(skip)
        .limit(limit)
    )
    # .all() returns a Sequence; convert to match the annotated return type
    return list(db.execute(stmt).scalars().all())
```

### Update
```python
def update_user(db: Session, user_id: int, **kwargs):
    """Update user fields."""
    stmt = select(User).where(User.id == user_id)
    user = db.execute(stmt).scalar_one_or_none()

    if not user:
        return None

    for key, value in kwargs.items():
        setattr(user, key, value)

    db.commit()
    db.refresh(user)
    return user

# Bulk update
# Assumes User has the deleted_at column from the Soft Delete Pattern below
from sqlalchemy import update

stmt = (
    update(User)
    .where(User.is_active == False)
    .values(deleted_at=func.now())
)
db.execute(stmt)
db.commit()
```

### Delete
```python
def delete_user(db: Session, user_id: int) -> bool:
    """Delete user."""
    stmt = select(User).where(User.id == user_id)
    user = db.execute(stmt).scalar_one_or_none()

    if not user:
        return False

    db.delete(user)
    db.commit()
    return True

# Bulk delete
from sqlalchemy import delete

stmt = delete(User).where(User.is_active == False)
db.execute(stmt)
db.commit()
```

## Transactions and Session Management

### Context Manager Pattern
```python
from contextlib import contextmanager

@contextmanager
def get_db_session():
    """Session context manager."""
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

# Usage
with get_db_session() as db:
    user = create_user(db, "test@example.com", "testuser", "password")
    # Auto-commits on success, rolls back on exception
```

### Manual Transaction Control
```python
# Assumes User has a numeric `balance` column (not defined in the model above)
def transfer_money(db: Session, from_user_id: int, to_user_id: int, amount: float):
    """Transfer money between users with transaction."""
    try:
        # Begin nested transaction (SAVEPOINT)
        with db.begin_nested():
            # Deduct from sender (row locked with SELECT ... FOR UPDATE)
            stmt = select(User).where(User.id == from_user_id).with_for_update()
            sender = db.execute(stmt).scalar_one()
            sender.balance -= amount

            # Add to receiver
            stmt = select(User).where(User.id == to_user_id).with_for_update()
            receiver = db.execute(stmt).scalar_one()
            receiver.balance += amount

        # Commit the outer transaction once the savepoint is released
        db.commit()
    except Exception:
        db.rollback()
        raise
```

## Async SQLAlchemy

### Async Setup
```python
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    AsyncSession,
    async_sessionmaker
)

# Async engine (note: asyncpg for PostgreSQL, aiosqlite for SQLite)
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/mydb"

async_engine = create_async_engine(
    DATABASE_URL,
    echo=False,
    pool_size=5,
    max_overflow=10
)

# Async session factory
AsyncSessionLocal = async_sessionmaker(
    async_engine,
    class_=AsyncSession,
    expire_on_commit=False
)

# Create tables
async def init_db():
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
```

### Async CRUD Operations
```python
async def get_user_async(user_id: int) -> Optional[User]:
    """Get user asynchronously."""
    async with AsyncSessionLocal() as session:
        stmt = select(User).where(User.id == user_id)
        result = await session.execute(stmt)
        return result.scalar_one_or_none()

async def create_user_async(email: str, username: str, hashed_password: str) -> User:
    """Create user asynchronously."""
    async with AsyncSessionLocal() as session:
        # hashed_password is NOT NULL on the model, so it must be passed in
        user = User(email=email, username=username, hashed_password=hashed_password)
        session.add(user)
        await session.commit()
        await session.refresh(user)
        return user

# FastAPI async dependency
async def get_async_db():
    async with AsyncSessionLocal() as session:
        yield session

@app.get("/users/{user_id}")
async def get_user_endpoint(
    user_id: int,
    db: AsyncSession = Depends(get_async_db)
):
    stmt = select(User).where(User.id == user_id)
    result = await db.execute(stmt)
    return result.scalar_one_or_none()
```

## Alembic Migrations

### Setup Alembic
```bash
# Initialize Alembic
alembic init alembic

# Edit alembic.ini - set database URL
# sqlalchemy.url = postgresql://user:pass@localhost/mydb

# Or use env variable in alembic/env.py
```

### Configure Alembic
```python
# alembic/env.py
import os

from sqlalchemy import engine_from_config, pool
from alembic import context
from myapp.models import Base  # Import your Base

# Alembic Config object, gives access to values in alembic.ini
config = context.config

# Add your model's MetaData
target_metadata = Base.metadata

def run_migrations_online():
    """Run migrations in 'online' mode."""
    configuration = config.get_section(config.config_ini_section)
    configuration["sqlalchemy.url"] = os.getenv("DATABASE_URL")

    connectable = engine_from_config(
        configuration,
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata
        )

        with context.begin_transaction():
            context.run_migrations()
```

### Create and Apply Migrations
```bash
# Auto-generate migration from model changes
alembic revision --autogenerate -m "Add users table"

# Review generated migration in alembic/versions/

# Apply migration
alembic upgrade head

# Rollback one version
alembic downgrade -1

# Show current version
alembic current

# Show migration history
alembic history
```

### Manual Migration Example
```python
# alembic/versions/xxx_add_users.py
from alembic import op
import sqlalchemy as sa

def upgrade():
    op.create_table(
        'users',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('email', sa.String(255), nullable=False),
        sa.Column('username', sa.String(50), nullable=False),
        sa.PrimaryKeyConstraint('id')
    )
    op.create_index('ix_users_email', 'users', ['email'], unique=True)

def downgrade():
    op.drop_index('ix_users_email', table_name='users')
    op.drop_table('users')
```

## FastAPI Integration

### Complete FastAPI Example
```python
from datetime import datetime
from typing import List

from fastapi import FastAPI, Depends, HTTPException, status
from pydantic import BaseModel, ConfigDict, EmailStr
from sqlalchemy import select
from sqlalchemy.orm import Session

# User, get_db and hash_password come from the earlier sections / your application

app = FastAPI()

# Pydantic schemas
class UserBase(BaseModel):
    email: EmailStr
    username: str

class UserCreate(UserBase):
    password: str

class UserResponse(UserBase):
    id: int
    is_active: bool
    created_at: datetime

    # Pydantic v2 setting (replaces orm_mode from Pydantic v1)
    model_config = ConfigDict(from_attributes=True)

# CRUD operations
@app.post("/users/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user_endpoint(user: UserCreate, db: Session = Depends(get_db)):
    # Check if user exists
    stmt = select(User).where(User.email == user.email)
    if db.execute(stmt).scalar_one_or_none():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered"
        )

    # Create user
    db_user = User(
        email=user.email,
        username=user.username,
        hashed_password=hash_password(user.password)
    )
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

@app.get("/users/{user_id}", response_model=UserResponse)
def read_user(user_id: int, db: Session = Depends(get_db)):
    stmt = select(User).where(User.id == user_id)
    user = db.execute(stmt).scalar_one_or_none()

    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )
    return user

@app.get("/users/", response_model=List[UserResponse])
def list_users(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db)
):
    stmt = (
        select(User)
        .where(User.is_active == True)
        .offset(skip)
        .limit(limit)
    )
    return db.execute(stmt).scalars().all()

@app.put("/users/{user_id}", response_model=UserResponse)
def update_user(
    user_id: int,
    user_update: UserBase,
    db: Session = Depends(get_db)
):
    stmt = select(User).where(User.id == user_id)
    db_user = db.execute(stmt).scalar_one_or_none()

    if not db_user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    db_user.email = user_update.email
    db_user.username = user_update.username
    db.commit()
    db.refresh(db_user)
    return db_user

@app.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(user_id: int, db: Session = Depends(get_db)):
    stmt = select(User).where(User.id == user_id)
    db_user = db.execute(stmt).scalar_one_or_none()

    if not db_user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    db.delete(db_user)
    db.commit()
```

## Testing with Pytest

### Test Database Setup
```python
import pytest
from sqlalchemy import create_engine, select
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool

# In-memory SQLite for testing
SQLALCHEMY_TEST_DATABASE_URL = "sqlite:///:memory:"

@pytest.fixture(scope="function")
def db_session():
    """Create test database session."""
    engine = create_engine(
        SQLALCHEMY_TEST_DATABASE_URL,
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )

    # Create tables
    Base.metadata.create_all(bind=engine)

    TestingSessionLocal = sessionmaker(
        autocommit=False,
        autoflush=False,
        bind=engine
    )

    session = TestingSessionLocal()
    try:
        yield session
    finally:
        session.close()
        Base.metadata.drop_all(bind=engine)

@pytest.fixture(scope="function")
def test_user(db_session):
    """Create test user."""
    user = User(
        email="test@example.com",
        username="testuser",
        hashed_password="hashed"
    )
    db_session.add(user)
    db_session.commit()
    db_session.refresh(user)
    return user
```

### Test Examples
```python
def test_create_user(db_session):
    """Test user creation."""
    # hashed_password is NOT NULL on the model, so the test must set it
    user = User(email="new@example.com", username="newuser", hashed_password="hashed")
    db_session.add(user)
    db_session.commit()

    assert user.id is not None
    assert user.email == "new@example.com"
    assert user.created_at is not None

def test_query_user(db_session, test_user):
    """Test user query."""
    stmt = select(User).where(User.email == "test@example.com")
    found_user = db_session.execute(stmt).scalar_one()

    assert found_user.id == test_user.id
    assert found_user.username == test_user.username

def test_update_user(db_session, test_user):
    """Test user update."""
    test_user.username = "updated"
    db_session.commit()

    stmt = select(User).where(User.id == test_user.id)
    updated_user = db_session.execute(stmt).scalar_one()
    assert updated_user.username == "updated"

def test_delete_user(db_session, test_user):
    """Test user deletion."""
    user_id = test_user.id
    db_session.delete(test_user)
    db_session.commit()

    stmt = select(User).where(User.id == user_id)
    assert db_session.execute(stmt).scalar_one_or_none() is None
```

## Performance Optimization

### Query Optimization
```python
from sqlalchemy import Index

# Use indexes (excerpt of the User model; other columns omitted)
class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(255), index=True, unique=True)
    is_active: Mapped[bool] = mapped_column(default=True)
    created_at: Mapped[datetime] = mapped_column(index=True)

    # Composite index
    __table_args__ = (
        Index('ix_user_email_active', 'email', 'is_active'),
    )

# Use select_from for complex queries
stmt = (
    select(func.count(Post.id))
    .select_from(User)
    .join(Post)
    .where(User.is_active == True)
)

# Use contains_eager for joined loads
from sqlalchemy.orm import contains_eager

stmt = (
    select(Post)
    .join(Post.author)
    .options(contains_eager(Post.author))
    .where(User.is_active == True)
)
```

### Connection Pooling
```python
# Configure pool
engine = create_engine(
    DATABASE_URL,
    pool_size=20,  # Number of connections to keep
    max_overflow=10,  # Additional connections when pool full
    pool_timeout=30,  # Seconds to wait for connection
    pool_recycle=3600,  # Recycle connections after 1 hour
    pool_pre_ping=True  # Verify connections before use
)

# Monitor pool
from sqlalchemy import event

@event.listens_for(engine, "connect")
def receive_connect(dbapi_conn, connection_record):
    print("New connection established")

@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_conn, connection_record, connection_proxy):
    print("Connection checked out from pool")
```

### Batch Operations
```python
# Bulk insert with executemany
from sqlalchemy import insert

data = [
    {
        "email": f"user{i}@example.com",
        "username": f"user{i}",
        "hashed_password": "hashed",  # NOT NULL column, placeholder value
    }
    for i in range(1000)
]

stmt = insert(User)
db.execute(stmt, data)
db.commit()

# Bulk update (assumes the deleted_at column from the Soft Delete Pattern)
from sqlalchemy import update

stmt = (
    update(User)
    .where(User.is_active == False)
    .values(deleted_at=func.now())
)
db.execute(stmt)
db.commit()
```

## Best Practices

1. **Use SQLAlchemy 2.0 Syntax**: Modern API with better type hints
2. **Type Annotations**: Use `Mapped[T]` and `mapped_column()`
3. **Eager Loading**: Solve N+1 queries with `selectinload`/`joinedload`
4. **Session Management**: Use dependency injection pattern
5. **Migrations**: Always use Alembic for schema changes
6. **Indexes**: Add indexes for frequently queried columns
7. **Connection Pooling**: Configure appropriate pool settings
8. **Testing**: Use in-memory SQLite for fast tests
9. **Async**: Use `AsyncSession` for async frameworks
10. **Error Handling**: Always handle `NoResultFound` and `MultipleResultsFound`, which `scalar_one()` and `one()` raise

## Common Patterns

### Repository Pattern
```python
from typing import Generic, Optional, Type, TypeVar
from sqlalchemy import select
from sqlalchemy.orm import Session

T = TypeVar('T', bound=Base)

class BaseRepository(Generic[T]):
    # Assumes every model passed in has an integer `id` primary key
    def __init__(self, model: Type[T], db: Session):
        self.model = model
        self.db = db

    def get(self, id: int) -> Optional[T]:
        stmt = select(self.model).where(self.model.id == id)
        return self.db.execute(stmt).scalar_one_or_none()

    def get_all(self, skip: int = 0, limit: int = 100) -> list[T]:
        stmt = select(self.model).offset(skip).limit(limit)
        return list(self.db.execute(stmt).scalars().all())

    def create(self, obj: T) -> T:
        self.db.add(obj)
        self.db.commit()
        self.db.refresh(obj)
        return obj

    def delete(self, id: int) -> bool:
        obj = self.get(id)
        if obj:
            self.db.delete(obj)
            self.db.commit()
            return True
        return False

# Usage
user_repo = BaseRepository(User, db)
user = user_repo.get(1)
```

### Soft Delete Pattern
```python
from datetime import datetime, timezone

class SoftDeleteMixin:
    # Timezone-aware column, matching the other timestamp columns in this skill
    deleted_at: Mapped[Optional[datetime]] = mapped_column(
        DateTime(timezone=True), default=None
    )

    @property
    def is_deleted(self) -> bool:
        return self.deleted_at is not None

class User(Base, SoftDeleteMixin):
    __tablename__ = "users"
    # ... fields

# Query only active records
stmt = select(User).where(User.deleted_at.is_(None))

# Soft delete (datetime.utcnow() is deprecated since Python 3.12)
user.deleted_at = datetime.now(timezone.utc)
db.commit()
```

### Audit Trail Pattern
```python
class AuditMixin:
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now()
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now()
    )
    created_by: Mapped[Optional[int]] = mapped_column(ForeignKey("users.id"))
    updated_by: Mapped[Optional[int]] = mapped_column(ForeignKey("users.id"))

class Post(Base, AuditMixin):
    __tablename__ = "posts"
    # ... fields
```

## Resources

- [SQLAlchemy 2.0 Documentation](https://docs.sqlalchemy.org/en/20/)
- [Alembic Documentation](https://alembic.sqlalchemy.org/)
- [FastAPI SQLAlchemy Guide](https://fastapi.tiangolo.com/tutorial/sql-databases/)
- [SQLAlchemy Type Annotations](https://docs.sqlalchemy.org/en/20/orm/declarative_tables.html#mapped-column-derives-the-datatype-and-nullability-from-the-mapped-annotation)

## Related Skills

When using SQLAlchemy, these skills enhance your workflow:
- **django**: Django ORM patterns and migration strategies for comparison
- **test-driven-development**: TDD patterns for database models and queries
- **fastapi-local-dev**: FastAPI + SQLAlchemy integration patterns
- **systematic-debugging**: Advanced debugging for ORM query issues and N+1 problems

[Full documentation available in these skills if deployed in your bundle]