标题

用一段优雅的 Python 代码,把 SQLAlchemy 模型安全、高效地序列化成字典


副标题 / 摘要

SQLAlchemy 模型转字典(dict)看似简单,却暗藏字段格式、关系递归、循环引用等坑。本文通过一段实战代码,带你实现一个可复用的 _to_dict 序列化工具,并分析其设计取舍与改进方向,适合正在用 SQLAlchemy 写后端接口的你。


目标读者

这篇文章适合以下读者:

  • 使用 SQLAlchemy 做 ORM 的后端开发者
  • 想把 ORM 模型转换为 JSON/dict 的 Python 工程师
  • 模型序列化规范化 有需求的中级开发者
  • 使用 Flask/FastAPI/Django + SQLAlchemy 的同学

一、背景 / 动机:为什么要自己写 _to_dict

在 Web 开发中,我们几乎每天都要做一件事:

把数据库里的 ORM 对象,转成可以 JSON 响应给前端的数据结构(通常是 dict / list)。

乍一看好像只是 obj.__dict__ 或用个 asdict 就完事,但现实中的问题包括:

  1. 日期时间字段无法直接 JSON 化datetime / date 对象不能直接 JSON 序列化,必须格式化成字符串。

  2. 关系字段怎么处理?

    • 一对多 / 多对多(uselist=True
    • 一对一 / 多对一(uselist=False
  3. 避免递归爆炸: 两个模型互相关联,很容易序列化时陷入无限递归。

  4. 统一输出格式: 不同模型、不同接口如果各写各的 to_dict,维护成本极高。

于是,就有了这段通用序列化代码:

def _serialize_row(self, obj):
    return self._to_dict(obj) if obj else None

def _to_dict(self, obj, include_relationships=True, backref_depth=1):
    mapper = inspect(obj.__class__)
    data = {}

    # 字段
    for column in mapper.columns:
        val = getattr(obj, column.key)
        if isinstance(val, (date, datetime)):
            val = val.strftime("%Y-%m-%d %H:%M:%S")
        data[column.key] = val

    # 关系
    if include_relationships and backref_depth > 0:
        for name, rel in mapper.relationships.items():
            value = getattr(obj, name)

            if value is None:
                data[name] = None
            elif rel.uselist:
                data[name] = [
                    self._to_dict(
                        item,
                        include_relationships=False,
                        backref_depth=backref_depth-1
                    )
                    for item in value
                ]
            else:
                data[name] = self._to_dict(
                    value,
                    include_relationships=False,
                    backref_depth=backref_depth-1
                )

    return data

二、核心概念解释

在深入代码前,先把几个关键概念讲清楚:

1. SQLAlchemy 的 mapper

mapper = inspect(obj.__class__)
  • inspect() 是 SQLAlchemy 的一个工具函数,用来获取模型类的 映射信息
  • mapper.columns:模型映射到表的全部字段(Column)。
  • mapper.relationships:模型定义的所有关系(relationship(...))。

2. uselist:关系是单个对象还是列表

  • rel.uselist == True:关系是 多条记录(一对多 / 多对多),比如 User.posts
  • rel.uselist == False:关系是 单个对象(一对一 / 多对一),比如 Post.author

我们需要根据这个属性决定是返回:

  • list[dict],还是
  • dictNone

3. 循环引用 & backref_depth

如果 A 模型引用 B,B 又引用回 A:

  • A → B → A → B …… 非常容易递归到栈溢出。

所以这里设计了一个参数:

  • backref_depth控制反向引用的递归深度,默认是 1 每深入一层递归,backref_depth-1,直到 0 时不再继续关系序列化。

4. include_relationships

  • include_relationships=True:序列化时,把关联对象也一起展开。
  • False:只序列化当前表的字段,不管关系。

这个开关可以在不同场景下灵活控制:

  • 列表接口:往往只要字段即可(减少体积)。
  • 详情接口:可能需要关联信息(如用户 + 地址)。

三、实践指南:一步步实现可复用的序列化工具

你可以把这两个方法放到一个 BaseMixin / 工具类里,比如:

from datetime import date, datetime
from sqlalchemy import inspect

class ModelSerializerMixin:
    def _serialize_row(self, obj):
        return self._to_dict(obj) if obj else None

    def _to_dict(self, obj, include_relationships=True, backref_depth=1):
        mapper = inspect(obj.__class__)
        data = {}

        # 1. 处理普通字段
        for column in mapper.columns:
            val = getattr(obj, column.key)
            if isinstance(val, (date, datetime)):
                val = val.strftime("%Y-%m-%d %H:%M:%S")
            data[column.key] = val

        # 2. 处理关系字段
        if include_relationships and backref_depth > 0:
            for name, rel in mapper.relationships.items():
                value = getattr(obj, name)

                if value is None:
                    data[name] = None
                elif rel.uselist:
                    data[name] = [
                        self._to_dict(
                            item,
                            include_relationships=False,
                            backref_depth=backref_depth - 1
                        )
                        for item in value
                    ]
                else:
                    data[name] = self._to_dict(
                        value,
                        include_relationships=False,
                        backref_depth=backref_depth - 1
                    )

        return data

然后你的模型可以这样用:

class User(Base, ModelSerializerMixin):
    __tablename__ = "users"
    # id, name, created_at 等字段...
    # posts = relationship("Post", back_populates="author")

四、可运行示例:从模型到 JSON 响应

下面给一个完整、可理解的示例(略做简化):

from datetime import datetime, date
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, create_engine
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
from sqlalchemy import inspect

Base = declarative_base()

class ModelSerializerMixin:
    def _serialize_row(self, obj):
        return self._to_dict(obj) if obj else None

    def _to_dict(self, obj, include_relationships=True, backref_depth=1):
        mapper = inspect(obj.__class__)
        data = {}

        # 字段
        for column in mapper.columns:
            val = getattr(obj, column.key)
            if isinstance(val, (date, datetime)):
                val = val.strftime("%Y-%m-%d %H:%M:%S")
            data[column.key] = val

        # 关系
        if include_relationships and backref_depth > 0:
            for name, rel in mapper.relationships.items():
                value = getattr(obj, name)

                if value is None:
                    data[name] = None
                elif rel.uselist:
                    data[name] = [
                        self._to_dict(item, include_relationships=False, backref_depth=backref_depth-1)
                        for item in value
                    ]
                else:
                    data[name] = self._to_dict(
                        value,
                        include_relationships=False,
                        backref_depth=backref_depth-1
                    )

        return data


class User(Base, ModelSerializerMixin):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    created_at = Column(DateTime, default=datetime.utcnow)

    posts = relationship("Post", back_populates="author")


class Post(Base, ModelSerializerMixin):
    __tablename__ = "posts"

    id = Column(Integer, primary_key=True)
    title = Column(String(100))
    created_at = Column(DateTime, default=datetime.utcnow)
    user_id = Column(Integer, ForeignKey("users.id"))

    author = relationship("User", back_populates="posts")


# --- 演示 ---
engine = create_engine("sqlite:///:memory:", echo=False)
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(bind=engine)

session = SessionLocal()

user = User(name="Alice")
post1 = Post(title="Hello SQLAlchemy", author=user)
post2 = Post(title="Serialization Tricks", author=user)

session.add_all([user, post1, post2])
session.commit()

# 从数据库中查询
u = session.query(User).first()

# 转成 dict
user_dict = u._to_dict(include_relationships=True)
print(user_dict)

示例输出类似:

{
    'id': 1,
    'name': 'Alice',
    'created_at': '2025-11-11 10:00:00',
    'posts': [
        {
            'id': 1,
            'title': 'Hello SQLAlchemy',
            'created_at': '2025-11-11 10:00:00',
            'user_id': 1
        },
        {
            'id': 2,
            'title': 'Serialization Tricks',
            'created_at': '2025-11-11 10:01:00',
            'user_id': 1
        }
    ]
}

此时你就可以直接 json.dumps(user_dict) 返回给前端了。


五、解释与原理:为什么要这么写?

1. 手动遍历 mapper.columns

而不是用 obj.__dict__

  • __dict__ 会带出 SQLAlchemy 的内部属性(_sa_instance_state 等)
  • mapper.columns 只包含真正的表字段,干净且可控。

2. 统一日期时间格式

if isinstance(val, (date, datetime)):
    val = val.strftime("%Y-%m-%d %H:%M:%S")

好处:

  • 统一格式,前后端约定清晰
  • JSON 友好,不会出现 “Object of type datetime is not JSON serializable”

当然,你也可以改成 ISO 格式:

val.isoformat()

只要全局统一即可。

3. 关系处理的策略与取舍

  • 这里的策略是:当前对象可以展开关系对象,但关系对象的内部不再展开关系include_relationships=False)。
  • 配合 backref_depth,既避免了过度递归,又能适度展开。

你也可以选择更严格:

  • 某些接口完全禁用关系序列化。
  • 某些敏感关系(比如密码、token)不返回。

4. 替代方案

  • 使用 SQLAlchemy 的官方工具或第三方库:

    • sqlalchemy-utilsmarshmallow-sqlalchemypydantic 等进行序列化。
  • 使用 ORM 模型 → Pydantic 模型 的方式进行验证和输出。

本文这种实现属于:

简洁、无额外依赖、立刻能用的“小而美”方案。


六、常见问题与注意事项

1. 性能问题

  • 如果一次性序列化大量对象 + 展开关系,会带来额外的 SQL 查询(N+1 问题)。

  • 建议:

    • 查询时使用 joinedload / selectinload 进行预加载。
    • 对列表接口减少关系展开,或者分页返回。

2. 循环引用仍然可能出现

此实现通过:

  • include_relationships=False
  • backref_depth

来降低风险,但如果你在别的地方又手动递归,仍有可能踩坑。复杂场景建议引入更健壮的方案(例如 Pydantic 模型)。

3. 安全问题(字段泄露)

mapper.columns 会把所有表字段都序列化出来:

  • 包括密码哈希、token、内部状态等敏感字段。

解决办法:

  • _to_dict 中加入一个白名单/黑名单机制:

    • include_fields / exclude_fields
  • 或者在模型上定义可导出的字段列表。


七、最佳实践与建议

  1. 统一封装在 Mixin 或 BaseModel 中 所有模型继承同一个序列化能力,避免到处写重复 to_dict()

  2. 接口按需调整 include_relationships / backref_depth

    • 列表接口:include_relationships=False
    • 详情接口:include_relationships=Truebackref_depth=1
  3. 对日期字段统一规范 制定团队统一的日期时间格式,常见选项:

    • "YYYY-MM-DD HH:MM:SS"
    • "YYYY-MM-DDTHH:MM:SS"(ISO 风格)
  4. 对敏感字段做过滤 直接在 _to_dict 里实现 exclude 逻辑,避免误泄露。

  5. 尽量在查询层解决 N+1 问题 通过 joinedload / selectinload,不要让序列化函数背锅。


八、小结 / 结论

本文从一段简短的 _to_dict 序列化代码出发,讲了:

  • 为什么 ORM 模型序列化没你想的那么简单
  • inspect(mapper)columnsrelationships 的用法
  • 如何处理日期、关系字段、循环引用
  • 性能、安全等常见坑与改善方向

这段代码的定位是:

“轻量、无依赖、可快速集成到现有项目”的通用 SQLAlchemy 序列化工具。

你可以先直接拷贝到项目里用起来,然后根据自己团队的规范(字段过滤、格式要求、性能优化)逐步演进。


九、参考与延伸阅读

你可以检索(或在项目中查阅):

  • SQLAlchemy 官方文档:

    • ORM Mapped Class Configuration
    • inspect() 使用说明
  • 第三方序列化/验证工具:

    • Marshmallow & marshmallow-sqlalchemy
    • Pydantic(尤其是和 SQLAlchemy 集成的示例)

十、元信息(Meta 信息)

  • 预计阅读时间:8–12 分钟

  • 标签PythonSQLAlchemy序列化后端开发JSON

  • SEO 关键词

    • SQLAlchemy 模型转字典
    • Python ORM 序列化
    • SQLAlchemy to_dict 实现
    • SQLAlchemy JSON 响应
  • 元描述(Meta Description): “本文教你用一小段 Python 代码优雅地将 SQLAlchemy 模型序列化为字典,支持日期格式化、关系字段展开与循环引用控制,并给出可运行示例与最佳实践,适合使用 SQLAlchemy 做后端开发的工程师。”


十一、行动号召(CTA)

如果你已经看到这里,可以试着做几件事:

  1. 先把文中的 Mixin 直接放进你的项目试一试: 看看你的 User、Post 等模型转出来的 dict 是什么样子。
  2. 根据自己业务加上字段过滤 / 日期格式配置: 比如加个 exclude_fields 或全局时间格式。
  3. 如果你愿意继续迭代这段代码: