【Python百日进阶-Web开发-Peewee】Day298 - 骇客 Hacks
文章目录
- 十六 骇客 Hacks
- 16.1 乐观锁定 Optimistic Locking
- 16.2 每组顶部对象 Top object per group
- 16.3 每组前 N 个对象 Top N objects per group
- 16.3.1 Postgres 横向连接 Postgres lateral joins
- 16.3.2 窗口函数 Window functions
- 16.3.3 其他方法 Other methods
- 16.4 使用 SQLite 编写自定义函数 Writing custom functions with SQLite
- 16.5 日期数学 Date math
十六 骇客 Hacks
http://docs.peewee-orm.com/en/latest/peewee/hacks.html
使用 peewee 收集的骇客。你有一个很酷的骇客想要分享吗?在 GitHub 上打开问题或联系我。
16.1 乐观锁定 Optimistic Locking
乐观锁定在您可能通常使用SELECT FOR UPDATE(或在 SQLite 中,BEGIN IMMEDIATE)的情况下很有用 。例如,您可以从数据库中获取用户记录,进行一些修改,然后保存修改后的用户记录。通常,这种情况需要我们在事务期间锁定用户记录,从我们选择它的那一刻到我们保存更改的那一刻。
另一方面,在乐观锁定中,我们不获取任何锁,而是依赖于我们正在修改的行中的内部版本列。在读取时,我们会看到该行当前的版本,在保存时,我们确保仅当版本与我们最初读取的版本相同时才会进行更新。如果版本更高,则必须有其他进程潜入并更改了行 - 保存我们修改后的版本可能会导致丢失重要更改。
在 Peewee 中实现乐观锁定非常简单,这里有一个基类,您可以将其用作起点:
from peewee import *class ConflictDetectedException(Exception): passclass BaseVersionedModel(Model):version = IntegerField(default=1, index=True)def save_optimistic(self):if not self.id:# This is a new record, so the default logic is to perform an# INSERT. Ideally your model would also have a unique# constraint that made it impossible for two INSERTs to happen# at the same time.return self.save()# Update any data that has changed and bump the version counter.field_data = dict(self.__data__)current_version = field_data.pop('version', 1)self._populate_unsaved_relations(field_data)field_data = self._prune_fields(field_data, self.dirty_fields)if not field_data:raise ValueError('No changes have been made.')ModelClass = type(self)field_data['version'] = ModelClass.version + 1 # Atomic increment.query = ModelClass.update(**field_data).where((ModelClass.version == current_version) &(ModelClass.id == self.id))if query.execute() == 0:# No rows were updated, indicating another process has saved# a new version. How you handle this situation is up to you,# but for simplicity I'm just raising an exception.raise ConflictDetectedException()else:# Increment local version to match what is now in the db.self.version += 1return True
这是一个如何工作的示例。假设我们有以下模型定义。请注意,用户名有一个唯一约束——这很重要,因为它提供了一种防止双插入的方法。
class User(BaseVersionedModel):username = CharField(unique=True)favorite_animal = CharField()
例子:
>>> u = User(username='charlie', favorite_animal='cat')
>>> u.save_optimistic()
True>>> u.version
1>>> u.save_optimistic()
Traceback (most recent call last):File "<stdin>", line 1, in <module>File "x.py", line 18, in save_optimisticraise ValueError('No changes have been made.')
ValueError: No changes have been made.>>> u.favorite_animal = 'kitten'
>>> u.save_optimistic()
True# Simulate a separate thread coming in and updating the model.
>>> u2 = User.get(User.username == 'charlie')
>>> u2.favorite_animal = 'macaw'
>>> u2.save_optimistic()
True# Now, attempt to change and re-save the original instance:
>>> u.favorite_animal = 'little parrot'
>>> u.save_optimistic()
Traceback (most recent call last):File "<stdin>", line 1, in <module>File "x.py", line 30, in save_optimisticraise ConflictDetectedException()
ConflictDetectedException: current version is out of sync
16.2 每组顶部对象 Top object per group
这些示例描述了查询每个组的单个顶部项目的几种方法。有关各种技术的全面讨论,请查看我的博客文章Querying the top item by group with Peewee ORM。如果您对查询前N个项目的更一般的问题感兴趣,请参阅每组前 N 个对象下面的部分。
在这些示例中,我们将使用User和Tweet模型来查找每个用户及其最近的推文。
我在测试中发现的最有效的方法是使用MAX()聚合函数。
我们将在非相关子查询中执行聚合,因此我们可以确信此方法将是高效的。我们的想法是,我们将选择按作者分组的帖子,其时间戳等于该用户观察到的最大时间戳。
# When referencing a table multiple times, we'll call Model.alias() to create
# a secondary reference to the table.
TweetAlias = Tweet.alias()# Create a subquery that will calculate the maximum Tweet created_date for each
# user.
subquery = (TweetAlias.select(TweetAlias.user,fn.MAX(TweetAlias.created_date).alias('max_ts')).group_by(TweetAlias.user).alias('tweet_max_subquery'))# Query for tweets and join using the subquery to match the tweet's user
# and created_date.
query = (Tweet.select(Tweet, User).join(User).switch(Tweet).join(subquery, on=((Tweet.created_date == subquery.c.max_ts) &(Tweet.user == subquery.c.user_id))))
SQLite 和 MySQL 稍微宽松一些,允许按所选列的子集进行分组。这意味着我们可以取消子查询并非常简洁地表达它:
query = (Tweet.select(Tweet, User).join(User).group_by(Tweet.user).having(Tweet.created_date == fn.MAX(Tweet.created_date)))
16.3 每组前 N 个对象 Top N objects per group
这些示例描述了几种合理有效地查询每组前N个项目的方法。有关各种技术的全面讨论,请查看我的博客文章Querying the top N objects per group with Peewee ORM。
在这些示例中,我们将使用User和Tweet模型来查找每个用户及其三个最近的推文。
16.3.1 Postgres 横向连接 Postgres lateral joins
横向连接是一个简洁的 Postgres 功能,允许相当有效的相关子查询。它们通常被描述为 SQL循环。for each
所需的 SQL 是:
SELECT * FROM(SELECT id, username FROM user) AS uqLEFT JOIN LATERAL(SELECT message, created_dateFROM tweetWHERE (user_id = uq.id)ORDER BY created_date DESC LIMIT 3)AS pq ON true
使用 peewee 实现这一点非常简单:
subq = (Tweet.select(Tweet.message, Tweet.created_date).where(Tweet.user == User.id).order_by(Tweet.created_date.desc()).limit(3))query = (User.select(User, subq.c.content, subq.c.created_date).join(subq, JOIN.LEFT_LATERAL).order_by(User.username, subq.c.created_date.desc()))# We queried from the "perspective" of user, so the rows are User instances
# with the addition of a "content" and "created_date" attribute for each of
# the (up-to) 3 most-recent tweets for each user.
for row in query:print(row.username, row.content, row.created_date)
要从 Tweet 模型的“视角”实现等效查询,我们可以改为:
# subq is the same as the above example.
subq = (Tweet.select(Tweet.message, Tweet.created_date).where(Tweet.user == User.id).order_by(Tweet.created_date.desc()).limit(3))query = (Tweet.select(User.username, subq.c.content, subq.c.created_date).from_(User).join(subq, JOIN.LEFT_LATERAL).order_by(User.username, subq.c.created_date.desc()))# Each row is a "tweet" instance with an additional "username" attribute.
# This will print the (up-to) 3 most-recent tweets from each user.
for tweet in query:print(tweet.username, tweet.content, tweet.created_date)
16.3.2 窗口函数 Window functions
peewee 支持的窗口函数提供可扩展、高效的性能。
所需的 SQL 是:
SELECT subq.message, subq.username
FROM (SELECTt2.message,t3.username,RANK() OVER (PARTITION BY t2.user_idORDER BY t2.created_date DESC) AS rnkFROM tweet AS t2INNER JOIN user AS t3 ON (t2.user_id = t3.id)
) AS subq
WHERE (subq.rnk <= 3)
为了用 peewee 实现这一点,我们将把排名过的推文包装在一个执行过滤的外部查询中。
TweetAlias = Tweet.alias()# The subquery will select the relevant data from the Tweet and
# User table, as well as ranking the tweets by user from newest
# to oldest.
subquery = (TweetAlias.select(TweetAlias.message,User.username,fn.RANK().over(partition_by=[TweetAlias.user],order_by=[TweetAlias.created_date.desc()]).alias('rnk')).join(User, on=(TweetAlias.user == User.id)).alias('subq'))# Since we can't filter on the rank, we are wrapping it in a query
# and performing the filtering in the outer query.
query = (Tweet.select(subquery.c.message, subquery.c.username).from_(subquery).where(subquery.c.rnk <= 3))
16.3.3 其他方法 Other methods
如果您没有使用 Postgres,那么不幸的是,您的选项会表现出不太理想的性能。有关常用方法的更完整概述,请查看此博客文章。下面我将总结方法和相应的SQL。
使用COUNT,我们可以获得所有存在少于N条推文且时间戳较新的推文:
TweetAlias = Tweet.alias()# Create a correlated subquery that calculates the number of
# tweets with a higher (newer) timestamp than the tweet we're
# looking at in the outer query.
subquery = (TweetAlias.select(fn.COUNT(TweetAlias.id)).where((TweetAlias.created_date >= Tweet.created_date) &(TweetAlias.user == Tweet.user)))# Wrap the subquery and filter on the count.
query = (Tweet.select(Tweet, User).join(User).where(subquery <= 3))
我们可以通过执行自连接并在HAVING子句中执行过滤来获得类似的结果:
TweetAlias = Tweet.alias()# Use a self-join and join predicates to count the number of
# newer tweets.
query = (Tweet.select(Tweet.id, Tweet.message, Tweet.user, User.username).join(User).switch(Tweet).join(TweetAlias, on=((TweetAlias.user == Tweet.user) &(TweetAlias.created_date >= Tweet.created_date))).group_by(Tweet.id, Tweet.content, Tweet.user, User.username).having(fn.COUNT(Tweet.id) <= 3))
最后一个示例LIMIT在相关子查询中使用子句。
TweetAlias = Tweet.alias()# The subquery here will calculate, for the user who created the
# tweet in the outer loop, the three newest tweets. The expression
# will evaluate to `True` if the outer-loop tweet is in the set of
# tweets represented by the inner query.
query = (Tweet.select(Tweet, User).join(User).where(Tweet.id << (TweetAlias.select(TweetAlias.id).where(TweetAlias.user == Tweet.user).order_by(TweetAlias.created_date.desc()).limit(3))))
16.4 使用 SQLite 编写自定义函数 Writing custom functions with SQLite
SQLite 很容易使用用 Python 编写的自定义函数进行扩展,然后可以从您的 SQL 语句中调用这些函数。通过使用SqliteExtDatabase和func()装饰器,您可以非常轻松地定义自己的函数。
这是一个示例函数,它生成用户提供的密码的散列版本。我们还可以使用它来实现login匹配用户和密码的功能。
from hashlib import sha1
from random import random
from playhouse.sqlite_ext import SqliteExtDatabasedb = SqliteExtDatabase('my-blog.db')def get_hexdigest(salt, raw_password):data = salt + raw_passwordreturn sha1(data.encode('utf8')).hexdigest()@db.func()
def make_password(raw_password):salt = get_hexdigest(str(random()), str(random()))[:5]hsh = get_hexdigest(salt, raw_password)return '%s$%s' % (salt, hsh)@db.func()
def check_password(raw_password, enc_password):salt, hsh = enc_password.split('$', 1)return hsh == get_hexdigest(salt, raw_password)
以下是如何使用该函数添加新用户并存储散列密码:
query = User.insert(username='charlie',password=fn.make_password('testing')).execute()
如果我们从数据库中检索用户,则存储的密码会经过哈希处理和加盐处理:
>>> user = User.get(User.username == 'charlie')
>>> print(user.password)
b76fa$88be1adcde66a1ac16054bc17c8a297523170949
要实现login-type 功能,您可以编写如下内容:
def login(username, password):try:return (User.select().where((User.username == username) &(fn.check_password(password, User.password) == True)).get())except User.DoesNotExist:# Incorrect username and/or password.return False
16.5 日期数学 Date math
Peewee 支持的每个数据库都为日期/时间算术实现了自己的一组函数和语义。
本节将提供一个简短的场景和示例代码,演示如何利用 Peewee 在 SQL 中进行动态日期操作。
场景:我们需要每隔X秒运行一次某些任务,并且任务间隔和任务本身都在数据库中定义。我们需要编写一些代码来告诉我们应该在给定时间运行哪些任务:
class Schedule(Model):interval = IntegerField() # Run this schedule every X seconds.class Task(Model):schedule = ForeignKeyField(Schedule, backref='tasks')command = TextField() # Run this command.last_run = DateTimeField() # When was this run last?
我们的逻辑基本上可以归结为:
# e.g., if the task was last run at 12:00:05, and the associated interval
# is 10 seconds, the next occurrence should be 12:00:15. So we check
# whether the current time (now) is 12:00:15 or later.
now >= task.last_run + schedule.interval
所以我们可以编写如下代码:
next_occurrence = something # ??? how do we define this ???# We can express the current time as a Python datetime value, or we could
# alternatively use the appropriate SQL function/name.
now = Value(datetime.datetime.now()) # Or SQL('current_timestamp'), e.g.query = (Task.select(Task, Schedule).join(Schedule).where(now >= next_occurrence))
对于 Postgresql,我们将多个静态 1 秒间隔来动态计算偏移量:
second = SQL("INTERVAL '1 second'")
next_occurrence = Task.last_run + (Schedule.interval * second)
对于 MySQL,我们可以直接引用计划的时间间隔:
from peewee import NodeList # Needed to construct sql entity.interval = NodeList((SQL('INTERVAL'), Schedule.interval, SQL('SECOND')))
next_occurrence = fn.date_add(Task.last_run, interval)
对于 SQLite,事情有点棘手,因为 SQLite 没有专用的日期时间类型。因此对于 SQLite,我们转换为 unix 时间戳,添加计划秒数,然后转换回可比较的日期时间表示:
next_ts = fn.strftime('%s', Task.last_run) + Schedule.interval
next_occurrence = fn.datetime(next_ts, 'unixepoch')