```
npx skills add https://github.com/wshobson/agents --skill python-resource-management
```

How Python Resource Management fits into a Paperclip company.
Python Resource Management drops into any Paperclip agent that handles this kind of work. Assign it to a specialist inside a pre-configured PaperclipOrg company and the skill becomes available on every heartbeat — no prompt engineering, no tool wiring.
Pre-configured AI company — 18 agents, 18 skills, one-time purchase.
SKILL.md (421 lines)
---
name: python-resource-management
description: Python resource management with context managers, cleanup patterns, and streaming. Use when managing connections, file handles, implementing cleanup logic, or building streaming responses with accumulated state.
---

# Python Resource Management

Manage resources deterministically using context managers. Resources like database connections, file handles, and network sockets should be released reliably, even when exceptions occur.

## When to Use This Skill

- Managing database connections and connection pools
- Working with file handles and I/O
- Implementing custom context managers
- Building streaming responses with state
- Handling nested resource cleanup
- Creating async context managers

## Core Concepts

### 1. Context Managers

The `with` statement ensures resources are released automatically, even on exceptions.

### 2. Protocol Methods

`__enter__`/`__exit__` for sync, `__aenter__`/`__aexit__` for async resource management.

### 3. Unconditional Cleanup

`__exit__` always runs, regardless of whether an exception occurred.

### 4. Exception Handling

Return `True` from `__exit__` to suppress exceptions, `False` to propagate them.

## Quick Start

```python
from contextlib import contextmanager

@contextmanager
def managed_resource():
    resource = acquire_resource()
    try:
        yield resource
    finally:
        resource.cleanup()

with managed_resource() as r:
    r.do_work()
```

## Fundamental Patterns

### Pattern 1: Class-Based Context Manager

Implement the context manager protocol for complex resources.
```python
from types import TracebackType

import psycopg
from psycopg import Connection

class DatabaseConnection:
    """Database connection with automatic cleanup."""

    def __init__(self, dsn: str) -> None:
        self._dsn = dsn
        self._conn: Connection | None = None

    def connect(self) -> None:
        """Establish database connection."""
        self._conn = psycopg.connect(self._dsn)

    def close(self) -> None:
        """Close connection if open."""
        if self._conn is not None:
            self._conn.close()
            self._conn = None

    def __enter__(self) -> "DatabaseConnection":
        """Enter context: connect and return self."""
        self.connect()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit context: always close connection."""
        self.close()

# Usage with context manager (preferred)
with DatabaseConnection(dsn) as db:
    result = db.execute(query)

# Manual management when needed
db = DatabaseConnection(dsn)
db.connect()
try:
    result = db.execute(query)
finally:
    db.close()
```

### Pattern 2: Async Context Manager

For async resources, implement the async protocol.
```python
from types import TracebackType

import asyncpg

class AsyncDatabasePool:
    """Async database connection pool."""

    def __init__(self, dsn: str, min_size: int = 1, max_size: int = 10) -> None:
        self._dsn = dsn
        self._min_size = min_size
        self._max_size = max_size
        self._pool: asyncpg.Pool | None = None

    async def __aenter__(self) -> "AsyncDatabasePool":
        """Create connection pool."""
        self._pool = await asyncpg.create_pool(
            self._dsn,
            min_size=self._min_size,
            max_size=self._max_size,
        )
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Close all connections in pool."""
        if self._pool is not None:
            await self._pool.close()

    async def execute(self, query: str, *args) -> list[dict]:
        """Execute query using pooled connection."""
        async with self._pool.acquire() as conn:
            return await conn.fetch(query, *args)

# Usage
async with AsyncDatabasePool(dsn) as pool:
    users = await pool.execute("SELECT * FROM users WHERE active = $1", True)
```

### Pattern 3: Using @contextmanager Decorator

Simplify context managers with the decorator for straightforward cases.
```python
from contextlib import contextmanager, asynccontextmanager
import time

import structlog

logger = structlog.get_logger()

@contextmanager
def timed_block(name: str):
    """Time a block of code."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        logger.info(f"{name} completed", duration_seconds=round(elapsed, 3))

# Usage
with timed_block("data_processing"):
    process_large_dataset()

@asynccontextmanager
async def database_transaction(conn: AsyncConnection):
    """Manage database transaction."""
    await conn.execute("BEGIN")
    try:
        yield conn
        await conn.execute("COMMIT")
    except Exception:
        await conn.execute("ROLLBACK")
        raise

# Usage
async with database_transaction(conn) as tx:
    await tx.execute("INSERT INTO users ...")
    await tx.execute("INSERT INTO audit_log ...")
```

### Pattern 4: Unconditional Resource Release

Always clean up resources in `__exit__`, regardless of exceptions.

```python
from pathlib import Path
from types import TracebackType
from typing import IO

class FileProcessor:
    """Process file with guaranteed cleanup."""

    def __init__(self, path: str) -> None:
        self._path = path
        self._file: IO | None = None
        self._temp_files: list[Path] = []

    def __enter__(self) -> "FileProcessor":
        self._file = open(self._path, "r")
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Clean up all resources unconditionally."""
        # Close main file
        if self._file is not None:
            self._file.close()

        # Clean up any temporary files
        for temp_file in self._temp_files:
            try:
                temp_file.unlink()
            except OSError:
                pass  # Best effort cleanup

        # Return None/False to propagate any exception
```

## Advanced Patterns

### Pattern 5: Selective Exception Suppression

Only suppress specific, documented exceptions.
```python
from types import TracebackType

class StreamWriter:
    """Writer that handles broken pipe gracefully."""

    def __init__(self, stream) -> None:
        self._stream = stream

    def __enter__(self) -> "StreamWriter":
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool:
        """Clean up, suppressing BrokenPipeError on shutdown."""
        self._stream.close()

        # Suppress BrokenPipeError (client disconnected)
        # This is expected behavior, not an error
        if exc_type is BrokenPipeError:
            return True  # Exception suppressed
        return False  # Propagate all other exceptions
```

### Pattern 6: Streaming with Accumulated State

Maintain both incremental chunks and accumulated state during streaming.

```python
from collections.abc import Generator
from dataclasses import dataclass, field

@dataclass
class StreamingResult:
    """Accumulated streaming result."""

    chunks: list[str] = field(default_factory=list)
    _finalized: bool = False

    @property
    def content(self) -> str:
        """Get accumulated content."""
        return "".join(self.chunks)

    def add_chunk(self, chunk: str) -> None:
        """Add chunk to accumulator."""
        if self._finalized:
            raise RuntimeError("Cannot add to finalized result")
        self.chunks.append(chunk)

    def finalize(self) -> str:
        """Mark stream complete and return content."""
        self._finalized = True
        return self.content

def stream_with_accumulation(
    response: StreamingResponse,
) -> Generator[tuple[str, str], None, str]:
    """Stream response while accumulating content.

    Yields:
        Tuple of (accumulated_content, new_chunk) for each chunk.

    Returns:
        Final accumulated content.
    """
    result = StreamingResult()

    for chunk in response.iter_content():
        result.add_chunk(chunk)
        yield result.content, chunk

    return result.finalize()
```

### Pattern 7: Efficient String Accumulation

Avoid O(n²) string concatenation when accumulating.
```python
def accumulate_stream(stream) -> str:
    """Efficiently accumulate stream content."""
    # BAD: O(n²) due to string immutability
    # content = ""
    # for chunk in stream:
    #     content += chunk  # Creates new string each time

    # GOOD: O(n) with list and join
    chunks: list[str] = []
    for chunk in stream:
        chunks.append(chunk)
    return "".join(chunks)  # Single allocation
```

### Pattern 8: Tracking Stream Metrics

Measure time-to-first-byte and total streaming time.

```python
import time
from collections.abc import Generator

def stream_with_metrics(
    response: StreamingResponse,
) -> Generator[str, None, dict]:
    """Stream response while collecting metrics.

    Yields:
        Content chunks.

    Returns:
        Metrics dictionary.
    """
    start = time.perf_counter()
    first_chunk_time: float | None = None
    chunk_count = 0
    total_bytes = 0

    for chunk in response.iter_content():
        if first_chunk_time is None:
            first_chunk_time = time.perf_counter() - start
        chunk_count += 1
        total_bytes += len(chunk.encode())
        yield chunk

    total_time = time.perf_counter() - start
    return {
        "time_to_first_byte_ms": round((first_chunk_time or 0) * 1000, 2),
        "total_time_ms": round(total_time * 1000, 2),
        "chunk_count": chunk_count,
        "total_bytes": total_bytes,
    }
```

### Pattern 9: Managing Multiple Resources with ExitStack

Handle a dynamic number of resources cleanly.
```python
from contextlib import ExitStack, AsyncExitStack
from pathlib import Path

def process_files(paths: list[Path]) -> list[str]:
    """Process multiple files with automatic cleanup."""
    results = []
    with ExitStack() as stack:
        # Open all files - they'll all be closed when block exits
        files = [stack.enter_context(open(p)) for p in paths]
        for f in files:
            results.append(f.read())
    return results

async def process_connections(hosts: list[str]) -> list[dict]:
    """Process multiple async connections."""
    results = []
    async with AsyncExitStack() as stack:
        connections = [
            await stack.enter_async_context(connect_to_host(host))
            for host in hosts
        ]
        for conn in connections:
            results.append(await conn.fetch_data())
    return results
```

## Best Practices Summary

1. **Always use context managers** - For any resource that needs cleanup
2. **Clean up unconditionally** - `__exit__` runs even on exception
3. **Don't suppress unexpectedly** - Return `False` unless suppression is intentional
4. **Use @contextmanager** - For simple resource patterns
5. **Implement both protocols** - Support `with` and manual management
6. **Use ExitStack** - For dynamic numbers of resources
7. **Accumulate efficiently** - List + join, not string concatenation
8. **Track metrics** - Time-to-first-byte matters for streaming
9. **Document behavior** - Especially exception suppression
10. **Test cleanup paths** - Verify resources are released on errors
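A minimal sketch tying practices 6 and 10 together: `ExitStack` also accepts plain callables via `stack.callback()`, which covers resources that expose only a `close()`- or `shutdown()`-style method and have no context-manager support. The `LegacyClient` class here is hypothetical, standing in for any such resource; the assertions at the end verify the cleanup path runs even when the block raises.

```python
from contextlib import ExitStack

class LegacyClient:
    """Hypothetical resource with no __enter__/__exit__ support."""
    def __init__(self) -> None:
        self.closed = False
    def shutdown(self) -> None:
        self.closed = True

released: list[str] = []
client = LegacyClient()

try:
    with ExitStack() as stack:
        # Register plain callables; they run LIFO on block exit,
        # exactly like __exit__ would
        stack.callback(client.shutdown)
        stack.callback(released.append, "logged")  # extra args are forwarded
        raise RuntimeError("simulated failure mid-block")
except RuntimeError:
    pass  # the exception still propagates; callbacks do not suppress it

# Cleanup ran despite the exception - the cleanup path is testable
assert client.closed
assert released == ["logged"]
```

Because `callback()` never suppresses exceptions, this also respects best practice 3.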