How to implement a session creator with a pool with SQL alchemy for my app.
I'll help you create a session manager with a connection pool using SQLAlchemy. This is a common pattern to efficiently manage database connections.
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.pool import QueuePool
from contextlib import contextmanager
from typing import Generator
import os
from threading import Lock
class DatabaseSessionManager:
"""
A thread-safe database session manager that handles connection pooling
and session creation for SQLAlchemy.
"""
_instance = None
_lock = Lock()
def __new__(cls):
with cls._lock:
if cls._instance is None:
cls._instance = super(DatabaseSessionManager, cls).__new__(cls)
return cls._instance
def __init__(self):
# Only initialize once
if hasattr(self, 'engine'):
return
# Configure database connection
db_url = os.getenv('DATABASE_URL', 'postgresql://user:password@localhost:5432/dbname')
# Create engine with connection pooling
self.engine = create_engine(
db_url,
poolclass=QueuePool,
pool_size=5, # Number of connections to maintain
max_overflow=10, # Maximum number of connections to allow over pool_size
pool_timeout=30, # Seconds to wait before timing out on getting a connection
pool_recycle=1800, # Recycle connections after 30 minutes
echo=False # Set to True for SQL query logging
)
# Create session factory
session_factory = sessionmaker(
bind=self.engine,
autocommit=False,
autoflush=False
)
# Create thread-safe session registry
self.Session = scoped_session(session_factory)
@contextmanager
def session_scope(self) -> Generator:
"""
Provide a transactional scope around a series of operations.
Usage:
with session_manager.session_scope() as session:
session.query(...)
"""
session = self.Session()
try:
yield session
session.commit()
except Exception as e:
session.rollback()
raise e
finally:
session.close()
def get_session(self):
"""
Get a new session directly.
Remember to close it when done!
"""
return self.Session()
def cleanup(self):
"""
Clean up database connections.
Call this when shutting down your application.
"""
self.Session.remove()
self.engine.dispose()
# Example usage
if __name__ == "__main__":
# Example model
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
# Initialize session manager
session_manager = DatabaseSessionManager()
# Create tables
Base.metadata.create_all(session_manager.engine)
# Example using context manager
with session_manager.session_scope() as session:
# Create a new user
new_user = User(name="John Doe")
session.add(new_user)
# Session will automatically commit if no errors occur
# and roll back if there are any exceptions
# Example using direct session
session = session_manager.get_session()
try:
users = session.query(User).all()
for user in users:
print(user.name)
finally:
session.close()
# Clean up when shutting down
session_manager.cleanup()
This implementation provides several key features:
To use this in your application:
pip install sqlalchemy psycopg2-binary
export DATABASE_URL="postgresql://user:password@localhost:5432/dbname"
# Create a session manager instance
session_manager = DatabaseSessionManager()
# Use the context manager (recommended)
with session_manager.session_scope() as session:
result = session.query(YourModel).all()
The connection pool settings can be adjusted based on your needs:
pool_size
: Number of connections to keep open