龙空技术网

Python 使用SQLAlchemy数据库模块

微软技术分享 71

前言:

现在朋友们对“pythonsql记录条数”大体比较重视,同学们都想要了解一些“pythonsql记录条数”的相关知识。那么小编同时在网络上搜集了一些关于“pythonsql记录条数””的相关资讯,希望你们能喜欢,看官们一起来了解一下吧!

SQLAlchemy 是用Python编程语言开发的一个开源项目,它提供了SQL工具包和ORM对象关系映射工具,使用MIT许可证发行,SQLAlchemy 提供高效和高性能的数据库访问,实现了完整的企业级持久模型。

ORM(对象关系映射)是一种编程模式,用于将对象与关系型数据库中的表和记录进行映射,从而实现通过面向对象的方式进行数据库操作。ORM 的目标是在编程语言中使用类似于面向对象编程的语法,而不是使用传统的 SQL 查询语言,来操作数据库。

主要思想是将数据库表的结构映射到程序中的对象,通过对对象的操作来实现对数据库的操作,而不是直接编写 SQL 查询。ORM 工具负责将数据库记录转换为程序中的对象,反之亦然。

ORM 的核心概念包括:

1. 实体(Entity): 在 ORM 中,实体是指映射到数据库表的对象。每个实体对应数据库中的一条记录。2. 属性(Attribute): 实体中的属性对应数据库表中的列。每个属性表示一个字段。3. 关系(Relationship): ORM 允许定义实体之间的关系,例如一对多、多对一、多对多等。这种关系会映射到数据库表之间的关系。4. 映射(Mapping): ORM 负责将实体的属性和方法映射到数据库表的列和操作。5. 会话(Session): ORM 提供了会话来管理对象的生命周期,包括对象的创建、更新和删除。6. 查询语言: ORM 通常提供一种查询语言,允许开发者使用面向对象的方式编写查询,而不是直接使用 SQL。

对象映射ROM模型可连接任何关系数据库,连接方法大同小异,以下总结了如何连接常用的几种数据库方式。

# sqlite 创建数据库连接engine = create_engine('sqlite:///database.db', echo=False)# sqlite 创建内存数据库engine = create_engine('sqlite://')engine = create_engine('sqlite:///:memory:', echo=True)# PostgreSQL 创建数据库连接engine = create_engine('postgresql://scott:tiger@localhost/mydatabase')                # defaultengine = create_engine('postgresql+psycopg2://scott:tiger@localhost/mydatabase')       # psycopg2engine = create_engine('postgresql+pg8000://scott:tiger@localhost/mydatabase')         # pg8000# MySQL 创建数据库连接engine = create_engine('mysql://scott:tiger@localhost/foo')                  # defaultengine = create_engine('mysql+mysqldb://scott:tiger@localhost/foo')          # mysql-pythonengine = create_engine('mysql+mysqlconnector://scott:tiger@localhost/foo')   # MySQL-connector-pythonengine = create_engine('mysql+oursql://scott:tiger@localhost/foo')           # OurSQL# Oracle 创建数据库连接engine = create_engine('oracle://scott:tiger@127.0.0.1:1521/sidname')engine = create_engine('oracle+cx_oracle://scott:tiger@tnsname')# MSSQL 创建数据库连接engine = create_engine('mssql+pyodbc://scott:tiger@mydsn')                   # pyodbcengine = create_engine('mssql+pymssql://scott:tiger@hostname:port/dbname')   # pymssql
数据表创建

简单的创建一个User映射类,映射到UserDB库上,分别增加几个常用的数据库字段,并插入一些测试数据。

import sqlite3,datetime,timefrom sqlalchemy.orm import sessionmakerfrom sqlalchemy import create_enginefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column, Integer, String, Float, DateTime, Date, Time, Boolean, DECIMAL, Enum, Text# 建立基本映射类Base = declarative_base()# 创建SQLITE数据库engine = create_engine('sqlite:///:memory:', echo=False)# 创建映射类Userclass User(Base):    __tablename__ = 'UserDB'    # 主键 primary_key | 自动增长 autoincrement | 不为空 nullable | 唯一性约束 unique    id = Column(Integer, primary_key=True, autoincrement=True, nullable=True, unique=True)    # 字符串类型    username = Column(String(32), nullable=True, default="none")    password = Column(String(32), nullable=True, default="none")    # 姓名字段默认值是0    age = Column(Integer,nullable=False, default=0)    # 增加创建日期 [日期:时间]    create_time = Column(DateTime, default=datetime.datetime.now)    # onupdate=datetime.now 每次更新数据的时候都要更新该字段值    update_time = Column(DateTime, onupdate=datetime.datetime.now, default=datetime.datetime.now)    # 增加用户分数    user_value = Column(Float, default=0.0)    # 枚举类型定义    # tag = Column(Enum("python",'flask','django'))    # __repr__方法用于输出该类的对象被print()时输出的字符串,如果不想写可以不写    def __repr__(self):        return "<UserDB(username='%s',password='%s')>" % (self.username,self.password)if __name__ == "__main__":    print("当前表名: {}".format(User.__table__))    # 创建会话    Session = sessionmaker(bind=engine)    session = Session()    # 创建数据表    Base.metadata.create_all(engine, checkfirst=True)    # 逐条增加新记录    insert_user = User(username='lyshark', password='123456', age=24, user_value=12.5)    session.add(insert_user)    insert_user = User(username='sqlalchemy', password='123', age=34, user_value=45.8)    session.add(insert_user)    # 插入多条记录    session.add_all(        [         User(username="admin", password="123123", age=54, user_value=66.9),         User(username="root", password="3456576", age=67, user_value=98.4)        ]    )    # 提交事务    session.commit()
数据库查询

演示了通过ORM关系映射实现对单表的简单查询与筛选过滤功能。

import sqlite3,time,datetimefrom sqlalchemy import funcfrom sqlalchemy import or_from sqlalchemy.orm import sessionmakerfrom sqlalchemy import create_enginefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column, Integer, String, Float,DateTime# 建立基本映射类Base = declarative_base()# 创建SQLITE数据库engine = create_engine('sqlite:///:memory:', echo=False)# 创建映射类Userclass User(Base):    __tablename__ = 'UserDB'    id = Column(Integer, primary_key=True, autoincrement=True, nullable=True, unique=True)    username = Column(String(32), nullable=True, default="none")    password = Column(String(32), nullable=True, default="none")    age = Column(Integer,nullable=False, default=0)    create_time = Column(DateTime, default=datetime.datetime.now)    update_time = Column(DateTime, onupdate=datetime.datetime.now, default=datetime.datetime.now)    user_value = Column(Float, default=0.0)    # __repr__方法用于输出该类的对象被print()时输出的字符串,如果不想写可以不写    def __repr__(self):        return "<UserDB(username='%s',password='%s')>" % (self.username,self.password)if __name__ == "__main__":    # 创建会话    Session = sessionmaker(bind=engine)    session = Session()    # 创建数据表(存在则跳过)    Base.metadata.create_all(engine, checkfirst=True)    # 查询所有字段    all_value = session.query(User).all()    for item in all_value:        print("ID: {} --> 用户: {}".format(item.id, item.username))    # 查询指定字段    key_value = session.query(User.username,User.password).all()    print(key_value)    # 查询第一条    first_value = session.query(User).first()    print("第一条记录: {} {}".format(first_value.username, first_value.password))    # 使用过滤器 [ 过滤出age>20的用户,输出其(id,username)字段 ]    filter_value = session.query(User.id,User.username).filter(User.age > 20).all()    print("过滤结果: {}".format(filter_value))    # 排序输出 [ 正序/倒序 ]    sort_value = session.query(User.username,User.age).order_by(User.age).all()    print("正序排列: {}".format(sort_value))    sort_value = session.query(User.username,User.age).order_by(User.age.desc()).all()    print("倒序排列: {}".format(sort_value))    # 查询计数    count_value = session.query(User).count()    print("记录条数: {}".format(count_value))    # and/or 条件过滤 默认为and 在filter()中用,分隔多个条件表示,如果是or则需增加or_连接多个条件    and_value = session.query(User.username,User.age).filter(User.age >= 20, User.age <= 40).all()    print("与查询: {}".format(and_value))    or_value = session.query(User.username,User.age).filter(or_(User.age >= 20, User.age <= 40)).all()    print("或查询: {}".format(or_value))    # 等于查询    equal_value = session.query(User.username,User.password).filter(User.age == 67).all()    print("等于查询: {}".format(equal_value))    not_equal_value = session.query(User.username,User.password).filter(User.age != 67).all()    print("不等于查询: {}".format(not_equal_value))    # like模糊匹配    like_value = session.query(User.username,User.create_time).filter(User.username.like("%ly%")).all()    print("模糊匹配: {}".format(like_value))    # in查询范围    in_value = session.query(User.username,User.password).filter(User.age.in_([24,34])).all()    print("查询两者: {}".format(in_value))    not_in_value = session.query(User.username,User.password).filter(User.age.notin_([24,34])).all()    print("查询非两者: {}".format(not_in_value))    # op正则匹配查询    op_value = session.query(User.username).filter(User.username.op("regexp")("^a")).all()    print("正则匹配: {}".format(op_value))    # 调用数据库内置函数    func_value = session.query(func.count(User.age)).one()    print("调用函数: {}".format(func_value))    # 数据切片    cat_value = session.query(User.username).all()[:2]    print("输出前两条: {}".format(cat_value))    cat_value = session.query(User.username).offset(5).limit(3).all()    print("第6行开始显示前3个: {}".format(cat_value))    cat_value = session.query(User.username).order_by(User.id.desc())[0:10]    print("输出最后10个: {}".format(cat_value))    # 非空查询    isnot_value = session.query(User).filter(User.username.isnot(None)).all()    print("非空显示: {}".format(isnot_value))    null_value = session.query(User).filter(User.username.is_(None)).all()    print("为空显示: {}".format(null_value))    # 分组测试    group_by = session.query(User.username,func.count(User.id)).group_by(User.age).all()    print("根据年龄分组: {}".format(group_by))    # 进一步过滤查询    having_by = session.query(User.username,User.age).group_by(User.age).having(User.age > 30).all()    print("以age分组,并查询age大于30的记录: {}".format(having_by))
数据库修改

演示了修改数据库参数以及对数据库指定记录的删除功能。

import sqlite3,time,datetimefrom sqlalchemy import funcfrom sqlalchemy import or_from sqlalchemy.orm import sessionmakerfrom sqlalchemy import create_enginefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column, Integer, String, Float,DateTime# 建立基本映射类Base = declarative_base()# 创建SQLITE数据库engine = create_engine('sqlite:///:memory:', echo=False)# 创建映射类Userclass User(Base):    __tablename__ = 'UserDB'    id = Column(Integer, primary_key=True, autoincrement=True, nullable=True, unique=True)    username = Column(String(32), nullable=True, default="none")    password = Column(String(32), nullable=True, default="none")    age = Column(Integer,nullable=False, default=0)    create_time = Column(DateTime, default=datetime.datetime.now)    update_time = Column(DateTime, onupdate=datetime.datetime.now, default=datetime.datetime.now)    user_value = Column(Float, default=0.0)    # __repr__方法用于输出该类的对象被print()时输出的字符串,如果不想写可以不写    def __repr__(self):        return "<UserDB(username='%s',password='%s')>" % (self.username,self.password)if __name__ == "__main__":    # 创建会话    Session = sessionmaker(bind=engine)    session = Session()    # 创建数据表(存在则跳过)    Base.metadata.create_all(engine, checkfirst=True)    # 修改数据: 先查询在修改    select_update = session.query(User).filter_by(username="lyshark").first()    select_update.password = "test1234"    session.commit()    # 修改数据: 直接修改    session.query(User).filter_by(username="lyshark").update({User.password: 'abcd'})    session.commit()    session.query(User).filter_by(username="lyshark").update({"password": '123456'})    session.commit()    # 删除数据: 先查询在删除    del_ptr = session.query(User).filter_by(username="lyshark").first()    session.delete(del_ptr)    session.commit()    # 删除数据: 直接删除    session.query(User).filter(User.username=="sqlalchemy").delete()    session.commit()
数据库查询转字典

将从数据库中过滤查询指定的记录,并将该记录转换为字典JSON格式,利于解析。

import sqlite3,time,datetime,jsonfrom sqlalchemy.orm import sessionmakerfrom sqlalchemy import create_enginefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column, Integer, String, Float,DateTime# 建立基本映射类Base = declarative_base()# 创建SQLITE数据库engine = create_engine('sqlite:///:memory:', echo=False)# 创建映射类Userclass User(Base):    __tablename__ = 'UserDB'    id = Column(Integer, primary_key=True, autoincrement=True, nullable=True, unique=True)    username = Column(String(32), nullable=True, default="none")    password = Column(String(32), nullable=True, default="none")    age = Column(Integer,nullable=False, default=0)    create_time = Column(DateTime, default=datetime.datetime.now)    update_time = Column(DateTime, onupdate=datetime.datetime.now, default=datetime.datetime.now)    user_value = Column(Float, default=0.0)    # 查询结果转字典 (保留数据类型)    def single_to_dict(self):        return {c.name: getattr(self, c.name) for c in self.__table__.columns}    # 查询结果转字典 (全转为字符串)    def dobule_to_dict(self):        result = {}        for key in self.__mapper__.c.keys():            if getattr(self, key) is not None:                result[key] = str(getattr(self, key))            else:                result[key] = getattr(self, key)        return result# 将查询结果转为JSONdef to_json(all_vendors):    v = [ ven.dobule_to_dict() for ven in all_vendors ]    return vif __name__ == "__main__":    # 创建会话    Session = sessionmaker(bind=engine)    session = Session()    # 创建数据表(存在则跳过)    Base.metadata.create_all(engine, checkfirst=True)    # 查询结果转为字典(保持数据库格式)    key_value = session.query(User).first()    data = key_value.single_to_dict()    print("转为字典: {}".format(data))    # 查询结果转为字典(字符串格式)    key_value = session.query(User).first()    data = key_value.dobule_to_dict()    print("转为字符串字典: {}".format(data))    # 查询结果转为JSON格式    key_value = session.query(User)    data = to_json(key_value)    print("转为JSON格式: {}".format(data))
数据库类内函数调用

用户在使用ORM模型定义类时,可以同时在该映射类中定义各种针对类模型的处理函数,实现对数据的动态处理

from werkzeug.security import generate_password_hash,check_password_hashimport sqlite3,datetime,timefrom sqlalchemy.orm import sessionmakerfrom sqlalchemy import create_enginefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column, Integer, String, Float, DateTime, Date, Time, Boolean, DECIMAL, Enum, Text# 创建SQLITE数据库engine = create_engine("sqlite:///:memory:", encoding='utf-8')Base = declarative_base()                                       # 生成orm基类# 创建会话Session_class = sessionmaker(bind=engine)                       # 创建与数据库的会话sessionsession = Session_class()                                       # 生成session实例# 创建映射类Userclass User(Base):    __tablename__ = 'UserDB'    id = Column(Integer, primary_key=True, autoincrement=True, nullable=True, unique=True)    username = Column(String(64))    _password_hash_ = Column(String(256))                       # 加下划线作为私有函数,无法被外部访问    email = Column(String(64))    # 设置一个password字段用来设置密码    @property    def password(self):        raise Exception("密码不能被读取")    # 赋值password字段时,则自动加密存储    @password.setter    def password(self, value):        self._password_hash_ = generate_password_hash(value)    # 使用 check_password,进行密码校验 返回True False。    def check_password(self, pasword):        return check_password_hash(self._password_hash_, pasword)    # 设置输出函数    def print_function(self):        # 密码不可读调用 self.password 会报错        # print("用户: {} 密码: {}".format(self.username,self.password))        print("用户: {} email: {}".format(self.username, self.email))        return Trueif __name__ == "__main__":    print("当前表名: {}".format(User.__table__))    # 创建数据表    Base.metadata.create_all(engine, checkfirst=True)    # 插入测试数据    insert = User(username="lyshark",password="123123",email="lyshark@163.com")    session.add(insert)    insert = User(username="admin",password="556677",email="lyshark@163.com")    session.add(insert)    session.commit()    # 查询测试    tag = session.query(User).filter_by(username="lyshark").first()    print("测试密码是否正确: {}".format(tag.check_password("123123")))    # 调用函数验证当前用户    tag = session.query(User).filter_by(username="admin").first()    func = tag.print_function()    print("输出测试: {}".format(func))
数据库聚合函数

通过func库调用数据库内的聚合函数,实现统计最大最小平均数等数据。

import sqlite3,datetime,timefrom sqlalchemy import funcfrom sqlalchemy.orm import sessionmakerfrom sqlalchemy import create_enginefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column, Integer, String, Float, DateTime, Date, Time, Boolean, DECIMAL, Enum, Text# 建立基本映射类Base = declarative_base()# 创建SQLITE数据库engine = create_engine('sqlite:///:memory:', echo=False)# 创建映射类Userclass User(Base):    __tablename__ = 'UserDB'    # 主键 primary_key | 自动增长 autoincrement | 不为空 nullable | 唯一性约束 unique    id = Column(Integer, primary_key=True, autoincrement=True, nullable=True, unique=True)    # 字符串类型    username = Column(String(32), nullable=True, default="none")    password = Column(String(32), nullable=True, default="none")    # 姓名字段默认值是0    age = Column(Integer,nullable=False, default=0)    # 增加创建日期 [日期:时间]    create_time = Column(DateTime, default=datetime.datetime.now)    # onupdate=datetime.now 每次更新数据的时候都要更新该字段值    update_time = Column(DateTime, onupdate=datetime.datetime.now, default=datetime.datetime.now)    # 增加用户分数    user_value = Column(Float, default=0.0)    # 枚举类型定义    # tag = Column(Enum("python",'flask','django'))    # __repr__方法用于输出该类的对象被print()时输出的字符串,如果不想写可以不写    def __repr__(self):        return "<UserDB(username='%s',password='%s')>" % (self.username,self.password)if __name__ == "__main__":    print("当前表名: {}".format(User.__table__))    # 创建会话    Session = sessionmaker(bind=engine)    session = Session()    # 创建数据表    Base.metadata.create_all(engine, checkfirst=True)    # 统计总数    count = session.query(func.count(User.id)).first()    print("总记录: {}".format(count))    # age 字段平均值    age_avg = session.query(func.avg(User.age)).first()    print("平均值: {}".format(age_avg))    # age 字段最大值    age_max = session.query(func.max(User.age)).first()    print("最大值: {}".format(age_max))    # age 最小值    age_min = session.query(func.min(User.age)).one()    print("最小值: {}".format(age_min))    # age 求和 只求前三个的和    age_sum = session.query(func.sum(User.age)).one()    print("求总数: {}".format(age_sum))    # 提交事务    session.commit()
ORM定义一对多关系

SQLAlchemy提供了一个relationship,这个类可以定义属性,以后在访问相关联的表的时候就直接可以通过属性访问的方式就可以访问得到。

from sqlalchemy.orm import sessionmakerfrom sqlalchemy.orm import relationshipfrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import create_engine, Column, Integer, String, Text, func, ForeignKey# 打开数据库Base = declarative_base()engine = create_engine('sqlite:///:memory:', echo=False)# 主表class Author(Base):    __tablename__ = 'author'    id = Column(Integer, primary_key=True, autoincrement=True)    name = Column(String(10), nullable=False)    # 定义外键关联到Book模型上面,主表是author    books = relationship('Book', backref='author')    def __repr__(self):        return '<Author:(id={}, name={})>'.format(self.id, self.name)# 从表class Book(Base):    __tablename__ = 'book'    id = Column(Integer, primary_key=True, autoincrement=True)    name = Column(String(20), nullable=False)    # 外键关联到主表author的id字段上    author_id = Column(Integer, ForeignKey('author.id',ondelete="RESTRICT"))    def __repr__(self):        return '<Book:(id={}, name={}, author_id={})>'.format(self.id, self.name, self.author_id)if __name__ == "__main__":    # 创建会话    Session = sessionmaker(bind=engine)    session = Session()    # 创建数据表(存在则跳过)    Base.metadata.create_all(engine, checkfirst=True)    # ------------------------------------------------------    # 创建数据并插入    author1 = Author(id=1, name="张三")    author2 = Author(id=2, name="李四")    book1 = Book(name="<Python 开发>", author_id=1)    book2 = Book(name="<C++ 开发教程>", author_id=1)    book3 = Book(name="<C# 从入门到精通>", author_id=1)    book4 = Book(name="<渗透测试指南>", author_id=2)    book5 = Book(name="<nmap 扫描工具指南>", author_id=2)    session.add_all([author1,book1,book2,book3])    session.add_all([author2,book4,book5])    session.commit()    # ------------------------------------------------------    # 关联插入模式    author1 = Author(id=3, name="王五")    book1 = Book(name="<Python 开发>", author_id=1)    book2 = Book(name="<C++ 开发教程>", author_id=1)    book3 = Book(name="<C# 从入门到精通>", author_id=1)    author1.books.append(book1)    author1.books.append(book2)    author1.books.append(book3)    session.add(author1)    session.commit()    # ------------------------------------------------------    # 一对多查询测试    book = session.query(Book).get(1)    print("书籍作者: {}".format(book.author.name))    author = session.query(Author).get(1)    print("书籍数量: {}".format(len(author.books)))    for book_name in author.books:        print("书籍: {}".format(book_name.name))
ORM定义一对一关系

如果想要将两个模型映射成一对一的关系,那么应该在父模型中,指定引用的时候,要传递一个uselist=False参数进去。就是告诉父模型,以后引用这个从模型的时候,不再是一个列表了,而是一个对象了。

from sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy.orm import sessionmaker,relationship,backreffrom sqlalchemy import create_engine, Column, Integer, String, Text, func, ForeignKey# 打开数据库Base = declarative_base()engine = create_engine('sqlite:///:memory:', echo=False)# 主表class User(Base):    __tablename__ = 'user'    id = Column(Integer, primary_key=True, autoincrement=True)    username = Column(String(50), nullable=False)    def __repr__(self):        return 'User(username:%s)' % self.username# 从表class UserExtend(Base):    __tablename__ = 'user_extend'    id = Column(Integer,primary_key=True,autoincrement=True)    school = Column(String(50))    age = Column(String(32))    sex = Column(String(32))    uid = Column(Integer,ForeignKey('user.id'))    user = relationship('User',backref=backref('extend', uselist=False))    #uselist=False 告诉父模型 以后引用时不再是列表 而是对象    def __repr__(self):        return 'extend(school:%s)'%self.schoolif __name__ == "__main__":    # 创建会话    Session = sessionmaker(bind=engine)    session = Session()    # 创建数据表(存在则跳过)    Base.metadata.create_all(engine, checkfirst=True)    # ------------------------------------------------------    # 插入测试数据    user = User(username="lyshark")    extend = UserExtend(school="<家里蹲大学>", age="22", sex="M")    extend.user = user    session.add(extend)    session.commit()    # ------------------------------------------------------    # 一对一关系测试    user_ptr = session.query(User).first()    print("用户名: {} --> 学校: {}".format(user_ptr.username,user_ptr.extend.school))    extend_ptr = session.query(UserExtend).first()    print("用户名: {} --> 学校: {} --> 年龄: {} --> 性别: {}".format(extend_ptr.user.username,extend_ptr.school,extend_ptr.age,extend_ptr.sex))
ORM定义多对多关系

多对多与上面的一对多,一对一不同,创建多对对必须使用中间表Table来解决查询问题。

• 多对多的关系需要通过一张中间表来绑定他们之间的关系。• 先把两个需要做多对多的模型定义出来• 使用Table定义一个中间表,中间表一般就是包含两个模型的外键字段就可以了,并且让他们两个来作为一个“复合主键”。• 在两个需要做多对多的模型中随便选择一个模型,定义一个relationship属性,来绑定三者之间的关系,在使用relationship的时候,需要传入一个secondary=中间表。

from sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy.orm import sessionmaker,relationship,backreffrom sqlalchemy import create_engine, Column, Integer, String, Text, func, ForeignKey, Tablefrom sqlalchemy import Column,INT,VARCHAR,ForeignKeyfrom sqlalchemy.orm import relationship# 打开数据库Base = declarative_base()engine = create_engine('sqlite:///:memory:', echo=False)# 女孩表class Girls(Base):    __tablename__ = "girl"    id = Column(Integer,primary_key=True, autoincrement=True)    name = Column(String(32))    # 建立多对多关系    g2b = relationship("Boys",backref="b2g",secondary="hotel")# 男孩表class Boys(Base):    __tablename__ = "boy"    id = Column(Integer,primary_key=True, autoincrement=True)    name = Column(String(32))# 映射关系表class Table(Base):    __tablename__ = "hotel"    id = Column(Integer, primary_key=True, autoincrement=True)    boy_id = Column(Integer,ForeignKey("boy.id"))    girl_id = Column(Integer,ForeignKey("girl.id"))if __name__ == "__main__":    # 创建会话    Session = sessionmaker(bind=engine)    session = Session()    # 创建数据表(存在则跳过)    Base.metadata.create_all(engine, checkfirst=True)    # ------------------------------------------------------    # 增加数据 - relationship 正向    girl_obj = Girls(name="女孩主")    girl_obj.g2b = [Boys(name="男孩从1"),Boys(name="男孩从2")]    session.add(girl_obj)    session.commit()    # 增加数据 - relationship 反向    boy_obj = Boys(name="男孩主")    boy_obj.b2g = [Girls(name="女孩从1"),Girls(name="女孩从2")]    session.add(boy_obj)    session.commit()    # ------------------------------------------------------    # 正向查询    girl_obj_list = session.query(Girls).all()    for girl_obj in girl_obj_list:        for boy in girl_obj.g2b:            print(girl_obj.name,boy.name)    # 反向查询    boy_obj_list = session.query(Boys).all()    for boy in boy_obj_list:        for girl in boy.b2g:            print(girl.name,boy.name)
连接查询与子查询

连接查询通过JOIN语句实现,子查询则通过subquery实现,首先需要创建一对多关系然后才可使用子查询。

from sqlalchemy.orm import sessionmakerfrom sqlalchemy.orm import relationshipfrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import create_engine, Column, Integer, String, Text, func, ForeignKey# 打开数据库Base = declarative_base()engine = create_engine('sqlite:///:memory:', echo=False)# 主表class Author(Base):    __tablename__ = 'author'    id = Column(Integer, primary_key=True, autoincrement=True)    name = Column(String(10), nullable=False)    # 定义外键关联到Book模型上面,主表是author    books = relationship('Book', backref='author')    def __repr__(self):        return '<Author:(id={}, name={})>'.format(self.id, self.name)# 从表class Book(Base):    __tablename__ = 'book'    id = Column(Integer, primary_key=True, autoincrement=True)    name = Column(String(20), nullable=False)    # 外键关联到主表author的id字段上    author_id = Column(Integer, ForeignKey('author.id',ondelete="RESTRICT"))    def __repr__(self):        return '<Book:(id={}, name={}, author_id={})>'.format(self.id, self.name, self.author_id)if __name__ == "__main__":    # 创建会话    Session = sessionmaker(bind=engine)    session = Session()    # 创建数据表(存在则跳过)    Base.metadata.create_all(engine, checkfirst=True)    # ------------------------------------------------------    # 创建数据并插入    author1 = Author(id=1, name="张三")    author2 = Author(id=2, name="李四")    book1 = Book(name="<Python 开发>", author_id=1)    book2 = Book(name="<C++ 开发教程>", author_id=1)    book3 = Book(name="<C# 从入门到精通>", author_id=1)    book4 = Book(name="<渗透测试指南>", author_id=2)    book5 = Book(name="<nmap 扫描工具指南>", author_id=2)    session.add_all([author1,book1,book2,book3])    session.add_all([author2,book4,book5])    session.commit()    # ------------------------------------------------------    # 连表查询    no_join_select = session.query(Author).filter(Author.id == Book.id).filter(Author.name == "王五").all()    print("查询主键==从键 并且 Author.name == 王五的记录: {}".format(no_join_select))    # JOIN 连接查询    join = session.query(Author).join(Book).filter(Book.name=="<nmap 扫描工具指南>").first().name    print("查询主表Author中的Book书名的作者是: {}".format(join))    join = session.query(Book).join(Author).filter(Author.name=="李四").all()    for book in join:        print("查询从表Book中的Author作者有哪些书: {}".format(book.name))    # subquery 子查询    sbq = session.query(Book.author_id,func.count('*').label("book_count")).group_by(Book.author_id).subquery()    print("查询出书籍编号计数(子语句): {}".format(sbq))    sub_join = session.query(Author.name,sbq.c.book_count).outerjoin(sbq,Author.id == sbq.c.author_id).all()    print("查询用户有几本书(主语句): {}".format(sub_join))

标签: #pythonsql记录条数