note-Redis实战6 核心-构建应用程序组件2
助记提要
- 任务队列实现
- 同一队列兼容多类任务
- 实现任务优先级(加队列)
- 实现延迟任务
- 消息推送和消息拉取
- 多接收者的发送与订阅(实现群组聊天)
- 基于群组聊天实现文件分发(上传、接收、处理)
6章 构建应用程序组件2
构建常用组件:任务队列、延迟消息传递、文件分发
6.4 任务队列
把待执行的任务放入队列,并在之后对队列进行处理,用户可以推迟执行那些需要一段时间才能完成的操作。
先进先出队列
需求:发送电子邮件
数据结构 | 说明 | 键 | 内容 |
---|---|---|---|
列表 | 邮件发送队列 | queue:email | 内容是JSON序列化后的发送邮件的信息 |
将待发邮件推入队列
def send_email_via_queue(conn, seller, item, price, buyer):data = {'seller_id': seller,'item_id': item,'price': price,'buyer_id': buyer,'time': time.time()}conn.rpush('queue:email', json.dump(data))
取出待发邮件并发送
def process_email_queue(conn):while not QUIT:# 从队列取邮件packed = conn.blpop(['queue:email'], 30)# 队列内没有待发邮件if not packed:continueto_send = json.loads(packed)try:fetch_data_and_send_email(to_send)except EmailSendError as err:log_error("Failed to send email", err, to_send)else:log_success("Send email", to_send)
这个进程除了发邮件外没有其他任务,所以使用阻塞版本的弹出命令BLPOP。
多个可执行任务
每种任务单独使用一个队列很常见。有时一个队列也能够处理多种不同类型的任务。
# 一个队列有多个任务类型,使用不同回调函数处理
def worker_watch_queue(conn, queue, callbacks):while not QUIT:packed = conn.blpop([queue], 30)if not packed:continue# 解码任务信息,得到函数名和函数参数name, args = json.loads(packed[1])# 没有找到任务指定的回调函数if name not in callbacks:log_error("Unknown callback %s" % name)continue# 执行任务callbacks[name](*args)
多个优先级队列
对于拥有不同的优先级的任务,可以同时使用多个队列。
Redis的BLPOP和BRPOP命令可以接收多个列表进行弹出操作。
def worker_watch_queues(conn, queues, callbacks):while not QUIT:packed = conn.blpop(queues, 30)if not packed:continuename, args = json.loads(packed[1])if name not in callbacks:log_error("Unknown callback %s" % name)continuecallbacks[name](*args)
多队列有时会用于分割不同的任务,这时处理这些任务会出现不公平的现象。可以偶尔重排列各个队列的顺序,使其更公平一些。
延迟任务
不立即执行,而是等到某一时刻才开始执行任务。
创建延迟任务的3种方式:
- 任务信息中包含任务的执行时间,工作进程发现执行时间未到的话,就把任务重新推到队列;
- 工作进程使用本地列表记录延迟任务,每次循环检查这个列表并执行到期的任务;
- 使用有序集合,把执行时间设为分值。一个单独的进程查找是否存在可以被执行的任务,有的话就从有序集合移除并加到对应的任务队列里面。
其中方式1会浪费工作进程的时间;方式2在工作进程崩溃时会失去本地记录的信息。
def execute_later(conn, queue, name, args, delay=0):identifier = str(uuid.uuid4())# 准备任务信息,标识符、队列名、回调函数名、参数item = json.dumps([identifier, queue, name, args])if delay > 0:# 延迟执行,加入有序集合,任务信息作为成员conn.zadd('delayed:', item, time.time() + delay)else:# 立即执行conn.rpush('queue:' + queue, item)return identifier
从延迟队列获取可执行任务
def pool_queue(conn):while not QUIT:# 获取队列中的第一个任务item = conn.zrange('delayed:', 0, 0, withscores=True)if not item or item[0][1] > time.time():# 队列中无任务,或任务未到执行时间time.sleep(.01)continue# 解码需要被执行的任务item = item[0][0]identifier, queue, function, args = json.loads(item)# 获取锁,失败则重试locked = acquire_lock(conn, identifier)if not locked:continue# 把任务推入对应的任务队列if conn.zrem('delayed:', item):conn.rpush('queue:' + queue, item)release_lock(conn, identifier, locked)
只需要获取有序集合里排在第一个的元素即可。
优先执行延迟任务
延迟任务到期后,会被推到普通任务队列中。
如果想让这些延迟任务到期后优先执行,可以添加多个队列。高、中、低三个队列,可以对应创建“延迟高”、“延迟中”、“延迟低”队列,并按['high-delayed', 'high', 'medium-delayed', 'medium', 'low-delayed', 'low']
的顺序传给worker_watch_queues
函数。
不能使用lpush直接加到队列前面,因为进程在执行某个费时的任务期间,有多个延迟任务到期了,它们会被依次lpush到队列头部,这会导致最后被推入的任务被最先执行,不符合先到先执行的规则。
6.5 消息拉取
消息推送和消息拉取
多个客户端之间,有两种互相传递消息的方式:
- 消息推送。由发送者确保所有接收者已成功收到消息。
- 消息拉取。接收者自己去获取发送者存储在某处的信息。
当客户端由于某些原因没办法保持在线的时候,使用消息推送的程序会出现各种问题。
Redis用于消息推送的publish和subscribe命令存在这个问题。需要另外实现不遗漏的消息传递。
单接收者的发送与订阅
如果每条消息都只会被发送到一个客户端。
可以为每个客户端设置一个接收列表,发送者把消息放到接收者的列表里面。
接收者客户端通过向服务器发送请求来获取最新的消息。
多接收者的发送与订阅
群组聊天需要进行多接收者的消息传递。
数据结构 | 说明 | 键 | 内容 |
---|---|---|---|
字符串 | 当前最大群组id | ids:chat: | 每次新建群组,自增来生成群组id |
字符串 | 指定群组的最大消息id | ids:群组id | 群组中有新消息时,自增来生成消息的id |
有序集合 | 群组消息 | msgs:群组id | 成员是消息内容的JSON序列,分值是消息id |
有序集合 | 群组 | chat:群组id | 成员是群内的用户id,分值是用户在这个群组内已接收的最大消息id |
有序集合 | 已读信息 | seen:用户id | 成员是用户加入的各个群组的id,分值是用户在这个群组已读的最大消息id |
创建群组
def create_chat(conn, sender, recipients, message, chat_id=None):# 获取新的群组idchat_id = chat_id or str(conn.incr('ids:chat:'))# 群组的用户和初始分值recipients.append(sender)recipientsd = dict((r, 0) for r in recipients)pipeline = conn.pipeline(True)# 参与的用户加到群组有序集合pipeline.zadd('chat:' + chat_id, **recipientsd)# 初始化已读有序集合for rec in recipients:pipeline.zadd('seen:' + rec, chat_id, 0)pipeline.execute()# 发送消息return send_message(conn, chat_id, sender, message)
发送消息
def send_message(conn, chat_id, sender, message):identifier = acquire_lock(conn, 'chat:' + chat_id)if not identifier:raise Exception("Couldn't get the lock")try:mid = conn.incr('ids:' + chat_id)ts = time.time()packed = json.dumps({'id': mid,'ts': ts,'sender': sender,'message': message,})# 把消息发到群组conn.zadd('msgs:' + chat_id, packed, mid)finally:release_lock(conn, 'chat:' + chat_id, identifier)return chat_id
当使用一个来自Redis的值取构建另一个要添加到Redis的值时,就需要使用锁来消除竞争条件。
获取用户的未读消息
def fetch_pending_messages(conn, recipient):seen = conn.zrange('seen:' + recipient, 0, -1, withscores=True)pipeline = conn.pipeline(True)for chat_id, seen_id in seen:# 取未读消息pipeline.zrangebysscore('msgs:' + chat_id, seen_id+1, 'inf')chat_info = zip(seen, pipeline.execute())for i, ((chat_id, seen_id), messages) in enumerate(chat_info):if not messages:continuemessages[:] = map(json.loads, messages)# 更新群组有序集合seen_id = messages[-1]['id']conn.zadd('chat:' + chat_id, recipient, seen_id)# 更新已读消息有序集合pipeline.zadd('seen:' + recipient, chat_id, seen_id)# 清除已被所有人阅读过的消息min_id = conn.zrange('chat:' + chat_id, 0, 0, withscores=True)if min_id:pipeline.zremrangebyscore('msgs:' + chat_id, 0, min_id[0][1])chat_info[i] = (chat_id, messages)pipeline.execute()return chat_info
加入群组
def join_chat(conn, chat_id, user):# 目标群组的最新消息的idmessage_id = int(conn.get('ids:' + chat_id))pipeline = conn.pipeline(True)# 加到群组成员pipeline.zadd('chat:' + chat_id, user, message_id)# 群组添加到用户的已读列表中pipeline.zadd('seen:' + user, chat_id, message_id)pipeline.execute()
离开群组
def leave_chat(conn, chat_id, user):pipeline = conn.pipeline(True)pipeline.zrem('chat:' + chat_id, user)pipeline.zrem('seen:' + user, chat_id)# 查看剩余成员数pipeline.zcard('chat:' + chat_id)# 删除没有成员的群组if not pipeline.execute()[-1]:pipeline.delete('msgs:' + chat_id)pipeline.delete('ids:' + chat_id)pipeline.execute()# 清理已被所有成员阅读过的消息else:oldest = conn.zrange('chat:' + chat_id, 0, 0, withscores=True)conn.zremrangebyscore('msgs:' + chat_id, 0, oldest[0][1])
6.6 文件分发
分布式系统经常需要再多台机器上复制、分发或处理数据文件。
需求:巨大的日志文件需要通过多台服务器分析处理。
上传日志文件
基本思路:将日志文件存到redis里面,然后多个客户端从Redis拉取日志进行处理。
生成日志的服务器作为发送者,会有多个日志处理服务器作为接收者,可以使用之前的群组实现来分发文件。
def copy_logs_to_redis(conn, path, channel, count=10, limit=2**30, quit_when_done=True):# channel为群组id,count为聚合进程的数量bytes_in_redis = 0waiting = deque()# 创建向客户端发送消息的群组create_chat(conn, 'source', map(str, range(count)), '', channel)count = str(count)# 遍历所有日志文件for logfile in sorted(os.listdir(path)):full_path = os.path.join(path, logfile)fsize = os.stat(full_path).st_sizewhile bytes_in_redis + fsize > limit:cleaned = _clean(conn, channel, waiting, count)if cleaned:bytes_in_redis -= cleanedelse:time.sleep(.25)# 把文件上传到Rediswith open(full_path, 'rb') as inp:block = ''while block:block = inp.read(2**17)conn.append(channel + logfile, block)# 提醒接收者文件可接收send_message(conn, channel, 'source', logfile)bytes_in_redis += fsizewaiting.append((logfile, fsize))# 所有文件处理完后提醒接收者if quit_when_done:send_message(conn, channel, 'source', ':done')while waiting:cleaned = _clean(conn, channel, waiting, count)if cleaned:bytes_in_redis -= cleanedelse:time.sleep(.25)# 对redis进行清理def _clean(conn, channel, waiting, count):if not waiting:return 0w0 = waiting[0][0]if conn.get(channel + w0 + ':done') == count:conn.delete(channel + w0, channel + w0 + ':done')return waiting.popleft()[1]return 0
上传的时候不能一次性推入过多数据,并且在客户端读取完后,要清理数据。
接收日志文件
按固定大小接收文件,交给回调函数处理。
def process_logs_from_redis(conn, id, callback):while 1:fdata = fetch_pending_messages(conn, id)for ch, mdata in fdata:for message in mdata:logfile = message['message']# 日志行全部处理完if logfile == ':done':return elif not logfile:continue# 选择读取器block_reader = readblocksif logfile.endswith('.gz'):block_reader = readblocks_gz# 回调函数处理日志行for line in readlines(conn, ch+logfile, block_reader):callback(conn, line)# 强制刷新聚合数据缓存callback(conn, line)# 日志处理完毕,告知文件发送者conn.incr(ch + logfile + ':done')if not fdata:time.sleep(.1)
处理日志文件
从数据块中读取行的程序
def readlines(conn, key, rblocks):out = ''for block in rblocks(conn, key):out += block# 找文末的换行符。有换行符,即有可处理的完整行posn = out.rfind('\n')if posn >= 0:# 返回每行for line in out[:posn].split('\n'):yield line + '\n'# 余下的非整行数据到下一次循环中处理out = out[posn+1:]# 处理完毕if not block:yield outbreak
读取数据块的函数
def readblocks(conn, key, blocksize=2**17):lb = blocksizepos = 0# 按块长度读取数据块,直到数据不够完整的一个数据块while lb == blocksize:block = conn.substr(key, pos, pos + blocksize - 1)yield blocklb = len(block)pos += lbyield ''
readblocks函数把用于读取数据块的操作抽象,可以使用其他类型的读取器来代替它。
读取gzip存储的块
def readblocks_gz(conn, key):inp = ''decoder = Nonefor block in readblocks(conn, key, 2**17):if not decoder:inp += blocktry:# 分析头信息,以便解压数据if inp[:3] != '\xlf\x8b\x08':raise IOError("invalid gzip data")i = 10flag = ord(inp[3])if flag & 4:i += 2 + ord(inp[i]) + 256*ord(inp[i+1])if flag & 8:i = inp.index('\0', i) + 1if flag & 16:i = inp.index('\0', i) + 1if flag & 2:i += 2# 头信息不完整if i > len(inp):raise IndexError("not enough data")except (IndexError, ValueError):continueelse:# 解压程序block = inp[i:]inp = Nonedecoder = zlib.decompressobj(-zlib.MAX_WBITS)if not block:continue# 数据处理完毕,返回剩下的数据块if not block:yield decoder.flush()break# 返回解压后的数据块yield decoder.decompress(block)
执行聚合计算的回调函数callback的实例,统计每天每个国家有多少次访问。统计结果会上传给Redis。
aggregates = defaultdict(lambda: defaultdict(int))def daily_country_aggregate(conn, line):if line:# 提取日志行中的信息line = line.split()ip = line[0]day = line[1]# 按IP判断用户国家country = find_city_by_ip_local(ip)[2]aggregates[day][country] += 1return# 当天日志处理完,把聚合计算的结果写入Redisfor day, aggregate in aggregates.items():conn.zadd('daily:country:' + day, **aggregate)del aggregates[day]