SQLAlchemy
是Python最广泛使用的一个ORM(对象关系映射,简单地说就是把数据库的表即各种操作映射到Python对象上面来)工具。它支持操作PostgreSQL
、MySQL
、Oracle
、Microsoft SQL Server
、SQLite
等支持SQL的数据库。文档地址
需要特别注意的是,SQLAlchemy
只是适用于一些通用的微型框架,而全栈框架Django
的orm在结合特定框架用起来可能更加便利,所以在使用SQLAlchemy
的时候,如果不知道怎么完成复杂的定义,那就干脆自己写sql吧,自己去join什么的
有另外一个选择peewee
,提供类似Django那样又好的查询API,比SQLAlchemy
易用,虽然可能没那么强大,性能可能也没那么好(并没有人去对比过性能),但是peewee
还不支持Oracle
等数据库,虽然我不用,但是为了防止以后多学习一门,就决定是SQLAlchemy
了
SQLAlchemy
本身并不支持异步,在tornado/sanic
中只有手动去执行异步
SQLAlchemy安装
SQLAlchemy连接数据库
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 engine = create_engine('postgresql://scott:tiger@localhost/mydatabase' ) engine = create_engine('mysql://scott:tiger@localhost/foo?charset=utf8' , echo=True , pool_size=5 , max_overflow=10 , pool_recycle=-1 , pool_timeout=30 , pool_pre_ping=True ) engine = create_engine('oracle://scott:tiger@127.0.0.1:1521/sidname' ) engine = create_engine('sqlite:///foo.db' ) DBSession = sessionmaker(bind=engine, autocommit=True ) session = DBSession() new_user = User(id ='1' , name='haofly' ) session.add(new_user) session.commit() session.close()
需要注意的是,如果没有修改autocommit的默认值(False),那么一个session会一直保持,直到该session被回滚、关闭、提交才结束。每次发起请求,都创建一个新的session(注意不是创建新的连接,创建session并不会有多大的开销),一个session就是一个transaction的支持。我们可以让session是一个全局的对象,这样和数据库通信的session在任何时候只有一个,但是全局的session不是线程安全的,如果多线程的情况下,可能会造成commit错乱,tornado
这种单线程程序由于其异步的特性也不可以那样做(Tornado可以在每个Handler
的初始化进行session的创建与提交销毁)。当然,如果是在单线程的情况下,我们完全可以保持session的单例,减少一丢丢的开销。下面这种方式对于多线程还是单现成都是非常推荐的做法:
1 2 3 4 5 6 7 8 from sqlalchemy.orm import scoped_sessionfrom sqlalchemy.orm import sessionmakersession_factory = sessionmaker(bind=some_engine) Session = scoped_session(session_factory) some_session = Session() some_other_session = Session() some_session is some_other_session
Model/数据表定义 表定义 1 2 3 4 5 6 7 8 9 10 11 12 from sqlalchemy.ext.declarative import declarative_baseBase = declarative_base() class User (Base ): __tablename__ = 'users' __mapper_args__ = {'column_prerfix' : '_' } id = Column('user_id' , Integer, primary_key=True ) class __str__ (self ): return f'<{self.__class__.__module__} .{self.__class__.__name__} (id={self.id } )>' User.__table__.columns
列定义 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 BigInteger Boolean Enum Float SmallInteger Integer(unsigned=False ) Interval Numeric JSON LargeBinary(length=None ) PickleType SchemaType String(50 ) Text(length=None ) Unicode UnicodeText Date DateTime Time TIMESTAMP fullname = column_property(firstname + ' ' + lastname) primary_key=True comment='' table_name.column_name.name nullable=False
关联关系定义
relationship
的几个常用的参数
backref
是在一对多或者多对一关系之间简历双向的关系
lazy
懒加载,默认为True
remote_side
: 外键是自身时使用,例如remote_side=[id]
secondary
: 指向多对多的中间表
一对多/多对一 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class User (Base ): __tablename__ = 'users' id = Column(Integer, primary_key=True ) posts = relationship('Post' , backref='post' ) class Post (Base ): __tablename__ = 'posts' user_id = Column(Integer, ForeignKey('users.id' )) user = relationship('User' , back_populates='posts' , cascade='all, delete, delete-orphan' ) user = User(...) user.posts = [ Post(...), Post(...) ] user.posts post.user session.commit()
一对一 仅需要将上面的一对多关系中uselist=False
即可
1 2 3 class User (Base ): __tablename__ = 'users' posts = relationship('Post' , uselist=False , back_populates='post' )
多对多
关于一个表同一个字段对应多张表的外键(类似Laravel/Django
中的target_id/targe_type
定义方式),sqlalchemy
没有一个官方的定义方式,有个现成的Generic relationships ,但是该库作者已经许久没维护了。我的建议是自己join吧。
如上一条SQlAlchemy
里面比较难实现复杂的多对多关系,所以官方的文档就干脆建议大家连关系表都不用单独建daemon了,直接按照下面的方法来更简单。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 user_privilege_relationship = Table('user_privilge_relationships' , Base.metadata, Column('user_id' , Integer, ForeignKey('users.id' )) Column('privilege_id' , Integer, ForeignKey('privilege.id' )) ) class User (Base ): __tablename__ = 'users' id = Column(Integer, primary_key=True ) privileges = relationship('Privilege' , secondary=user_privilege_relationship, backref='users' ) class Privilege (Base ) __tablename__ = 'privileges' id = Column(Integer, primary_key=True ) users = relationship('User' , secondary=user_privilege_relationship, backref='privileges' )
列方法 1 2 3 4 5 6 7 8 9 10 @hybrid_property def fullname (self ): return self.firstname + ' ' + self.lastname @validates('email' ) def validate_email (self, key, address ): assert '@' in address return address
CRUD
像join
自身类似的需求,可以使用别名user_model1 = aliased(UserModel)
目前没有找到合适的方法去返回影响的行数,但是在UPDATE/DELETE
方法中可以使用result.rowcount
来返回SQL中where语句匹配到的行数,折衷方案是可以多加一个where条件去返回实际的影响行数。
执行原生语句,返回的是ResultProxy
对象:
1 2 3 result = conn.execute("INSERT INTO user (name) VALUES ('haofly')" ) result = conn.execute("INSERT INTO user (name) VALUES ('haofly') RETURNING id" ) result.fetchall()
执行原生语句的时候,防止SQL注入:
1 2 3 4 5 6 bind_sql = 'SELECT * FROM xxx WHERE field = :value' session.execute(bind_sql, {'value' : 'value1' }) session.execute(MyModel.__table__.insert(), modelDict) session.execute(MyModel.__table__.insert(), modelDicts)
查询
filter_by
只能用=
,而filter
可以用==,!=
等多种取值方式,且必须带表名
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 query = session.query(User) print (query) query.statement query.count() query.get(2 ) query.first() query.all () session.query(User.id ).distinct().all () query.limit(2 ).offset(2 ).all () query.filter ( getattr (User, 'icon_id' ) == 3 , User.id ==2 , User.age>10 , User.deleted_at == None , User.name.in_(['hao' , 'fly' ]) ).first().name query.filter ('id = 2' ).first() query.filter_by(deleted_at == None ) query.order_by('user_name' ).all () query.order_by(desc('name' )).all () query(func.count('*' )).all () query(func.json_contains(User.age, '{"A":"B"}' )).all () session.query(User.name) session.query(User.id , User.name) session.query.with_entities(User.id , User.name) query2.filter (or_(User.id == 1 )) query2.filter (or_(User.id == 1 , User.name.like('' ))) query(User).join(Post, User.id == Post.user_id).all () query(User).join(Post, and_(User.id == Post.user_id, User.deleted_at==None )) query.filter (Post.user == user) query.filter (Post.user == None ) query.filter (User.posts.contains(post)) query.filter (User.posts.any (title='hao' )) query.filter (Post.user.has(name='haofly' )) from sqlalchemy.sql import existsstmt = exists().where(Post.user_id==User.id ) for name, in session.query(User.name).filter (stmt): print (name) query.filter (User.name.like('%王%' )) MyModel.query.filter (User.name.like('%hao%' ))
插入
注意在连接数据库时autoflush
参数默认为True
,但是并不是add
之后就自动将语句flush
到数据库,而是指每次查询前回自动flush
,所以无论autoflush
是否为True
,add
之后都需要手动session.flush()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 session.add(User(name='haofly' )) session.flush() session.bulk_save_objects([User(name="wang" ) for i in xrange(1000 )]) result = session.execute( User.__table__.insert(), [{'name' : 'wang' , 'age' : 10 }, {}] ) session.commit() result.lastrowid modelobj.id
修改 1 2 3 4 5 6 7 8 9 query.filter (...).update({User.age: 10 }) session.flush() user.name = 'new' session.commit() for key, value in mydict.items(): setattr (user, key, value)
删除 1 2 session.delete(user) session.flush()
自定义SQL构造 1 2 3 4 5 6 7 8 9 10 11 from sqlalchemy.sql.expression import Insert@compiles(Insert ) def prefix_inserts (insert, compiler, **kw ): s = compiler.visit_insert(insert, **kw) if 'append_string' in insert.kwargs: return s + " " + insert.kwargs['append_string' ] return s session.execute(MyModel.__table__.insert(append_string = 'ON DUPLICATE KEY UPDATE fieldname="abc"' ), objects)
其他 1 2 session.rollback() session.commit()
Event事件 Attribute Events属性相关事件 1 append/bulk_replace/dispose_collection/init_collection/init_scalar/modified/remove/set
###Mapper Events
都无法获取请求context,接收的参数仅仅是Mapper、Connection、Target(目标model对象)
1 2 after_configuree/after_delete/after_insert/after_update/before_configured/before_delete/ before_insert/before_update/instrument_class/mapper_configured
例如
1 2 3 4 5 6 7 8 9 10 @event.listens_for(Test, 'after_update' ) def receive_after_update (mapper: Mapper, connection: Connection, target: Test ): print ('receive_after_update' ) state = inspect(target) for attr in state.attrs: hist = attr.load_history() if hist.has_changes(): print ('change ' + attr.key + ' from ' + str (hist.non_added()[0 ]) + ' to ' + str (hist.non_deleted()[0 ]))
Instance Events 1 expire/first_init/init/init_failure/load/pickle/refresh/refresh_flush/unpickle
Session Events 1 after_attach/after_begin/after_bulk_delete/after_bulk_update/after_commit/after_flush/after_flush_postexec/after_rollback/after_soft_rollback/after_transaction_create/after_transaction_end/before_attach/before_commit/before_flush/deleted_to_detached/deleted_to_persistent/detached_to_persistent/loaded_as_persistent/pending_to_persistent/pending_to_transient/persistent_to_deleted/persistent_to_detached/persistent_to_transient/transient_to_pending
TroubleShooting
Tornado中使用SQLAlchemy连接SQLite进行commit操作的时候程序中断: Segment Fault : 原因是SQLite
的自增主键id
重复了😂
UnicodeEncodeError:’latin-1’ codec can’t encode characters in position 0-1: ordinal not in range(256) : 连接数据库没有指定utf8的charset,参考本文连接数据库设置。
UnicodeEncodeError: ‘ascii’ codec can’t encode characters in position 7-8: ordinal not in range(128) : 除了上面那种可能,还有种可能是直接把含有中文的json对象拿来给model的字符类型赋值了
Can’t recoonect until invalid transaction is rolled back : 要么在每次执行sql语句之后主动close,要么在连接的时候设置autocommit=True
MySQL server has gone away : 程序运行久了出现该问题。如果是使用了线程池,那么可能的原因是线程池的回收时间大于了mysql的最长交互时间(可使用SHOW VARIABLES LIKE '%interactive_timeout%';
查看)。这个时候可以把POOL_RECYCLE
参数设置为比那个时间小就行了。
2013 Lost connection to MySQL server during query : 原因是超过了wait_timeout
规定的时间了,首先show GLOBAL variables LIke '%wait_timeout%'
看看全局的超时时间是多少(这里一定要先看GLOBAL的,因为当前session的会首先被全局的影响),这种情况,尽量优化sql,实在不行再修改这个配置。
SqlAlchemy实现ON DUPLICATE KEY UPDATE : 目前没找到ORM的实现方式,但是有相对复杂的方式 ,更简单的方式是直接执行原生语句,在后面添加ON DUPLICATE KEY UPDATE
即可。可以参见上面的自定义SQL构造方法。
扩展阅读 SQLAlchemy 的 scoped_session 是啥玩意