SQLAlchemy 2.0: The Most Powerful ORM for Python Yet


SQLAlchemy Tutorial

SQLAlchemy is the most popular Object Relational Mapper (ORM) in the Python ecosystem. It has an elegant two-layer design: the lower-level Core and the traditional ORM built on top of it. Few ORMs in Python, or even in other languages, achieve such clean layering; in Django's ORM, for example, the database connection handling and the ORM itself are completely intertwined.


Why Do We Need the Core?

The Core layer mainly implements the client-side connection pool. Relational databases sit at the heart of modern web applications, yet their capacity for concurrent connections is often limited, so opening large numbers of short-lived connections is generally discouraged; in most cases a connection pool is needed. Connection pools come in roughly two types:

  • Server-side connection pool: A specialized connection pool middleware that allocates a long connection for reuse each time for a short connection.
  • Client-side connection pool: Generally introduced into the code as a third-party library.

SQLAlchemy's connection pool is a client-side pool. SQLAlchemy maintains a certain number of long-lived connections in it; when connect is called, a connection is actually retrieved from the pool, and when close is called, the connection is returned to the pool rather than closed.

Creating a Connection

In SQLAlchemy, use create_engine to create a connection (pool). The parameter of create_engine is the URL of the database.

from sqlalchemy import create_engine

# MySQL connection example
engine = create_engine(
    "mysql://user:password@localhost:3306/dbname",
    echo=True,  # echo=True prints every executed SQL statement, which is convenient for debugging
    future=True,  # Use the SQLAlchemy 2.0 API (backward-compatible; the default in 2.0 itself)
    pool_size=5,  # Size of the connection pool, 5 by default; setting it to 0 means no size limit
    pool_recycle=3600  # Recycle connections after this many seconds, before the server drops idle ones
)

# Create an in-memory SQLite database. You must add check_same_thread=False, otherwise it cannot be used in a multithreaded environment
engine = create_engine("sqlite:///:memory:", echo=True, future=True,
                       connect_args={"check_same_thread": False})

# Another way to connect to MySQL
# pip install mysqlclient
engine = create_engine('mysql+mysqldb://user:password@localhost/foo?charset=utf8mb4')
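The checkout/return behavior described above can be observed directly. A minimal sketch using an in-memory SQLite database and an explicit QueuePool (the URL is just for illustration; pool_size and pool.checkedout() are real SQLAlchemy APIs):

```python
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

# An in-memory SQLite database with an explicit QueuePool of 5 connections
engine = create_engine("sqlite://", poolclass=QueuePool, pool_size=5)

conn = engine.connect()                 # checks a connection out of the pool
conn.execute(text("SELECT 1"))          # make sure the connection is actually in use
checked_out = engine.pool.checkedout()  # 1: one connection currently checked out
conn.close()                            # returns the connection to the pool, not the server
checked_in = engine.pool.checkedout()   # 0: back in the pool, still open
print(checked_out, checked_in)
```

Calling close() here does not terminate the underlying database connection; it only hands it back for reuse.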

The Core Layer -- Using SQL Directly

CRUD

from sqlalchemy import text

with engine.connect() as conn:
    result = conn.execute(text("select * from users"))
    print(result.all())

# The result can be iterated over (while the connection is still open), and each row is a Row object
with engine.connect() as conn:
    result = conn.execute(text("SELECT x, y FROM some_table"))
    for row in result:
        # The Row object supports three access styles
        print(row.x, row.y)
        print(row[0], row[1])
        print(row._mapping["x"], row._mapping["y"])  # in 2.0, string-key access goes through ._mapping

# Pass parameters using the `:var` placeholder syntax (still on an open connection)
result = conn.execute(
    text("SELECT x, y FROM some_table WHERE y > :y"),
    {"y": 2}
)

# You can also bind parameter values in advance
stmt = text("SELECT x, y FROM some_table WHERE y > :y ORDER BY x, y").bindparams(y=6)

# When inserting, you can directly insert multiple rows
conn.execute(
    text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
    [{"x": 11, "y": 12}, {"x": 13, "y": 14}]
)

Transactions and Commit

SQLAlchemy provides two ways to commit: manual ("commit as you go") and semi-automatic ("begin once"). The official documentation recommends using engine.begin(). There is also a fully automatic autocommit mode that commits after every statement, which is not recommended.

# "commit as you go" requires manual commit
with engine.connect() as conn:
    conn.execute(text("CREATE TABLE some_table (x int, y int)"))
    conn.execute(
        text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
        [{"x": 1, "y": 1}, {"x": 2, "y": 4}]
    )
    conn.commit()  # Note the commit here

# "begin once" semi-automatic commit
with engine.begin() as conn:
    conn.execute(
        text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
        [{"x": 6, "y": 8}, {"x": 9, "y": 10}]
    )
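The practical difference shows up when an error occurs: an engine.begin() block commits on success and rolls back on an exception. A small self-contained sketch with an in-memory SQLite database (table and values made up for illustration):

```python
from sqlalchemy import create_engine, text

engine = create_engine("sqlite://")

with engine.begin() as conn:
    conn.execute(text("CREATE TABLE some_table (x int, y int)"))

try:
    # The block commits automatically on success and rolls back on an exception
    with engine.begin() as conn:
        conn.execute(text("INSERT INTO some_table (x, y) VALUES (1, 1)"))
        raise RuntimeError("failure mid-transaction")
except RuntimeError:
    pass

with engine.connect() as conn:
    count = conn.execute(text("SELECT count(*) FROM some_table")).scalar()
print(count)  # 0: the insert was rolled back together with the failed block
```

With "commit as you go" you would have to call conn.rollback() yourself (or simply not call conn.commit()).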

ORM

Session

The Session is not thread-safe, but since a web framework typically obtains a fresh session at the start of each request, this is usually not a problem.

from sqlalchemy.orm import Session

with Session(engine) as session:
    session.add(foo)  # foo is a model instance, e.g. User(name="foo")
    session.commit()

# You can also use sessionmaker to create a factory function, so you don't have to pass the engine every time
from sqlalchemy.orm import sessionmaker
new_session = sessionmaker(engine)

with new_session() as session:
    ...

Declarative API

  • Use __tablename__ to specify the database table name.
  • Use Mapped and native types to declare each field.
  • Use Integer, String, etc. to specify the field type.
  • Use the index parameter to specify the index.
  • Use the unique parameter to specify the unique index.
  • Use __table_args__ to specify other attributes, such as composite indexes.
from datetime import datetime
from sqlalchemy import Integer, String, Text, DateTime, func, UniqueConstraint
from sqlalchemy.orm import relationship, mapped_column, Mapped
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    # It must be a tuple, not a list
    __table_args__ = (UniqueConstraint("name", "time_created"),)
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    name: Mapped[str] = mapped_column(String(30), index=True)
    fullname: Mapped[str] = mapped_column(String(100), unique=True)
    # For particularly large fields, use deferred=True so the field is not loaded by default
    description: Mapped[str] = mapped_column(Text, deferred=True)
    # Client-side default value; note that a function is passed, not the current time:
    # time_created: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=datetime.now)
    # Or use a server-side default, which is set when the table is created and becomes part of the schema
    time_created: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
    time_updated: Mapped[datetime] = mapped_column(DateTime(timezone=True), onupdate=func.now())


class Address(Base):
    __tablename__ = "address"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    email_address: Mapped[str] = mapped_column(String, nullable=False)

# Call create_all to create all models
Base.metadata.create_all(engine)

# If you only need to create one model
User.__table__.create(engine)

Foreign Keys

Use relationship to specify the association relationship between models.

Bi-directional Mapping of One-to-Many Relationship

from sqlalchemy import create_engine, Integer, String, ForeignKey
from sqlalchemy.orm import DeclarativeBase, relationship, Session, Mapped, mapped_column

class Group(Base):
    __tablename__ = 'groups'
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    name: Mapped[str] = mapped_column(String)
    # The corresponding multiple users, here use the model name as the parameter
    members = relationship('User')

class User(Base):
    __tablename__ = 'users'
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    name: Mapped[str] = mapped_column(String)
    # group_id is the actual foreign-key column in the database; ForeignKey points it at groups.id
    group_id: Mapped[int] = mapped_column(Integer, ForeignKey('groups.id'))
    # The group attribute on the model; overlaps declares which attribute on the other model shares these rows
    group = relationship('Group', overlaps="members")

Many-to-Many Mapping, an Association Table is Required

# Association table
class UserPermissions(Base):
    __tablename__ = 'user_permissions'
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    # Also use ForeignKey to specify the foreign keys
    user_id: Mapped[int] = mapped_column(Integer, ForeignKey('users.id'))
    permission_id: Mapped[int] = mapped_column(Integer, ForeignKey('permissions.id'))

class User(Base):
    __tablename__ = 'users'
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    name: Mapped[str] = mapped_column(String)
    # Use secondary to specify the association table, and also use overlaps to specify the corresponding field in the model
    permissions = relationship('Permission', secondary="user_permissions", overlaps="users")

class Permission(Base):
    __tablename__ = 'permissions'
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    name: Mapped[str] = mapped_column(String)
    # The same as above
    users = relationship('User', secondary="user_permissions", overlaps="permissions")


user1 = User(name='user1', group_id=1)
user2 = User(name='user2')
group1 = Group(name='group1')
group2 = Group(name='group2', members=[user2])
permission1 = Permission(name="open_file")
permission2 = Permission(name="save_file")
user1.permissions.append(permission1)

# `session` is an ordinary Session bound to the engine
session.add_all([user1, user2, group1, group2, permission1, permission2])

session.commit()

print(user1.permissions[0].id)

In most other tutorials, backref is used to auto-generate the attribute on the other model. Here it is preferable to declare the accessible attribute explicitly on each model.

CRUD

Different from the 1.x API, the 2.0 API no longer uses query; data is queried with select.

from sqlalchemy import select

# The parameter of where is an expression composed of `==`. The advantage is that when writing code, spelling errors will be detected
stmt = select(User).where(User.name == "john").order_by(User.id)
# filter_by uses **kwargs as parameters
stmt = select(User).filter_by(name="some_user")
# order_by can also use User.id.desc() to represent reverse sorting

result = session.execute(stmt)

# Generally, when selecting whole objects, use the scalars method; otherwise each result is a Row containing one object
for user in result.scalars():
    print(user.name)

# When querying a single attribute of the model, there is no need to use scalars
result = session.execute(select(User.name))
for row in result:
    print(row.name)

# There is also a shortcut to query by primary key:
user = session.get(User, 1)

# To update data, the update statement needs to be used
from sqlalchemy import update
# synchronize_session accepts False, "fetch", and "evaluate"; in 2.0 the default is "auto",
# which tries "evaluate" and falls back to "fetch"
# False means not updating the objects in Python at all
# "fetch" means reloading the affected objects from the database
# "evaluate" means applying the same criteria to the objects in Python as far as possible
stmt = update(User).where(User.name == "john").values(name="John").execution_options(synchronize_session="fetch")
session.execute(stmt)

# Or directly assign a value to the attribute
user.name = "John"
session.commit()

# The following may introduce a race condition
# Wrong! If two processes update this value at the same time, one increment may be lost.
# Both write back the value they each computed, 2, but the actual correct value is 1 + 1 + 1 = 3
# Corresponding SQL: UPDATE users SET visit_count = 2 WHERE users.id = 1
user.visit_count += 1
# Correct approach: note the capital U, i.e. use the model attribute, so the generated SQL increments on the server side
# Corresponding SQL: UPDATE users SET visit_count = visit_count + 1 WHERE users.id = 1
user.visit_count = User.visit_count + 1

# To add an object, directly use the session.add method
session.add(user)
# Or add_all
session.add_all([user1, user2, group1])

# If you want the generated ID before committing, flush first
session.flush()  # flush sends the pending statements to the database but does not commit;
# the ID becomes available within the current transaction, and its visibility to other
# transactions depends on the database's isolation level
print(user.id)

# To delete, use session.delete
session.delete(user)

Loading Associated Models

If, after reading a list of N records, you then go back to the database once per record for its associated rows, N+1 queries are issued in total. This is the most common database mistake: the N+1 problem.

By default, models related through foreign keys are not loaded by a query. You can use the selectinload option to load them eagerly, thus avoiding the N+1 problem.

from sqlalchemy.orm import selectinload

# Associated models not loaded
session.execute(select(User)).scalars().all()
# Associated models loaded eagerly
session.execute(select(User).options(selectinload(User.group))).scalars().all()

The principle of selectinload is a second SELECT ... WHERE ... IN (...) query. Besides selectinload, the traditional joinedload can also be used; its principle is the familiar table JOIN.

from sqlalchemy.orm import joinedload

# Use joinedload to load associated models. Note that the unique() method must be called, as required in 2.0.
session.execute(select(User).options(joinedload(User.group))).unique().scalars().all()

In 2.0, selectinload is recommended over joinedload. It generally performs at least as well, and there is no need to call unique().

Writing Foreign Keys

In SQLAlchemy, you can manipulate relationship collections just like ordinary Python lists.

user.permissions.append(open_permission)  # Add
user.permissions.remove(save_permission)  # Remove
# Clear the whole collection; either form works
user.permissions.clear()
user.permissions = []

Special Handling of JSON Fields

Most databases now support JSON fields. In SQLAlchemy, you can directly read a JSON object from a field or write a JSON object to it. But never mutate such a JSON object in place and expect the change to be written back to the database; in-place changes on plain JSON columns are not tracked reliably. Be sure to copy the value, modify the copy, and assign it back.

import copy
article = session.get(Article, 1)
tags = copy.copy(article.tags)
tags.append("iOS")
article.tags = tags
session.commit()

Batch Insertion

When a large amount of data needs to be inserted, inserting rows one by one wastes a lot of time on round-trips to the database and is very inefficient. Most databases, such as MySQL, provide a batch-insertion syntax, insert ... values (...), (...) ..., and SQLAlchemy can take good advantage of it.

# Use session.bulk_save_objects(...) to directly insert multiple objects
from sqlalchemy.orm import Session

s = Session(engine)
objects = [
    User(name="u1"),
    User(name="u2"),
    User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()

# Using bulk_insert_mappings can save the overhead of creating objects and directly insert dictionaries
users = [
    {"name": "u1"},
    {"name": "u2"},
    {"name": "u3"},
]
s.bulk_insert_mappings(User, users)
s.commit()

# bulk_update_mappings updates objects in batches. The id in each dictionary is used as the WHERE condition,
# and all other fields are used for the update
s.bulk_update_mappings(User, users)

DeclarativeBase

Fully embrace the Python native type system

from typing import Optional
from sqlalchemy import Integer
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    # Optional[str] maps to a nullable column; the column type is inferred from the annotation
    fullname: Mapped[Optional[str]]

Asyncio

One AsyncSession per task. The AsyncSession object is a mutable, stateful object that represents an ongoing single, stateful database transaction. When using asyncio for concurrent tasks, such as using APIs like asyncio.gather(), each individual task should use a separate AsyncSession.

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession

engine = create_async_engine(url, echo=True)  # url must use an async driver, e.g. "sqlite+aiosqlite://"
new_session = async_sessionmaker(engine)  # a session factory, like sessionmaker

# Create tables
async with engine.begin() as conn:
    await conn.run_sync(Base.metadata.create_all)

# Insert data
async with new_session() as db:
    db.add(...)
    await db.commit()

# Query data
async with new_session() as db:
    stmt = select(A)
    row = await db.execute(stmt)
    for obj in row.scalars():
        print(obj.id)

await engine.dispose()

Using in a Multiprocessing Environment

Due to Python's Global Interpreter Lock (GIL), multiprocessing is needed to utilize multiple cores. In a multiprocessing environment, resources cannot be shared between processes; for SQLAlchemy this means the connection pool cannot be shared, and we need to handle it manually.

Generally speaking, it is best not to try to share the same Session among multiple processes. It is best to create a Session when initializing each process.
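A common pattern is to build the engine (and therefore the pool) inside each worker, for example via a multiprocessing initializer, so nothing pooled crosses the fork boundary. A sketch (the module-level _engine and the init_worker name are made up for illustration):

```python
from sqlalchemy import create_engine, text

_engine = None  # one engine, and thus one connection pool, per process

def init_worker(url: str) -> None:
    # Run once in each worker, e.g. Pool(initializer=init_worker, initargs=(url,))
    global _engine
    _engine = create_engine(url)

def do_query() -> int:
    with _engine.connect() as conn:
        return conn.execute(text("SELECT 1")).scalar()

# The same functions work in the parent process; each process calls init_worker itself
init_worker("sqlite://")
print(do_query())  # 1
```

The same applies to Sessions: create them from the process-local engine, never pass them between processes.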

Adding Where Conditions Only When a Value is Set

In web applications, it is often necessary to filter results according to which options the user has specified.

query = select(User)
if username is not None:
    query = query.where(User.username == username)
if password is not None:
    query = query.where(User.password == password)

Leapcell: The Next-Gen Serverless Platform for Python app Hosting

Finally, let me recommend the platform best suited for deploying Python services: Leapcell


1. Multi-Language Support

  • Develop with JavaScript, Python, Go, or Rust.

2. Deploy unlimited projects for free

  • Pay only for usage; no requests, no charges.

3. Unbeatable Cost Efficiency

  • Pay-as-you-go with no idle charges.
  • Example: $25 supports 6.94M requests at a 60ms average response time.

4. Streamlined Developer Experience

  • Intuitive UI for effortless setup.
  • Fully automated CI/CD pipelines and GitOps integration.
  • Real-time metrics and logging for actionable insights.

5. Effortless Scalability and High Performance

  • Auto-scaling to handle high concurrency with ease.
  • Zero operational overhead — just focus on building.


Explore more in the documentation!

Leapcell Twitter: https://x.com/LeapcellHQ
