当前位置: 首页 > news >正文

note-Redis实战6 核心-构建应用程序组件2

助记提要
  1. 任务队列实现
  2. 同一队列兼容多类任务
  3. 实现任务优先级(加队列)
  4. 实现延迟任务
  5. 消息推送和消息拉取
  6. 多接收者的发送与订阅(实现群组聊天)
  7. 基于群组聊天实现文件分发(上传、接收、处理)

6章 构建应用程序组件2

构建常用组件:任务队列、延迟消息传递、文件分发

6.4 任务队列

把待执行的任务放入队列,并在之后对队列进行处理,用户可以推迟执行那些需要一段时间才能完成的操作。

先进先出队列

需求:发送电子邮件

数据结构说明内容
列表邮件发送队列queue:email内容是JSON序列化后的发送邮件的信息

将待发邮件推入队列

def send_email_via_queue(conn, seller, item, price, buyer):data = {'seller_id': seller,'item_id': item,'price': price,'buyer_id': buyer,'time': time.time()}conn.rpush('queue:email', json.dump(data))

取出待发邮件并发送

def process_email_queue(conn):while not QUIT:# 从队列取邮件packed = conn.blpop(['queue:email'], 30)# 队列内没有待发邮件if not packed:continueto_send = json.loads(packed)try:fetch_data_and_send_email(to_send)except EmailSendError as err:log_error("Failed to send email", err, to_send)else:log_success("Send email", to_send)

这个进程除了发邮件外没有其他任务,所以使用阻塞版本的弹出命令BLPOP。

多个可执行任务

每种任务单独使用一个队列很常见。有时一个队列也能够处理多种不同类型的任务。

# 一个队列有多个任务类型,使用不同回调函数处理
def worker_watch_queue(conn, queue, callbacks):while not QUIT:packed = conn.blpop([queue], 30)if not packed:continue# 解码任务信息,得到函数名和函数参数name, args = json.loads(packed[1])# 没有找到任务指定的回调函数if name not in callbacks:log_error("Unknown callback %s" % name)continue# 执行任务callbacks[name](*args)
多个优先级队列

对于拥有不同的优先级的任务,可以同时使用多个队列。
Redis的BLPOP和BRPOP命令可以接收多个列表进行弹出操作。

def worker_watch_queues(conn, queues, callbacks):while not QUIT:packed = conn.blpop(queues, 30)if not packed:continuename, args = json.loads(packed[1])if name not in callbacks:log_error("Unknown callback %s" % name)continuecallbacks[name](*args)

多队列有时会用于分割不同的任务,这时处理这些任务会出现不公平的现象。可以偶尔重排列各个队列的顺序,使其更公平一些。

延迟任务

不立即执行,而是等到某一时刻才开始执行任务。

创建延迟任务的3种方式:

  1. 任务信息中包含任务的执行时间,工作进程发现执行时间未到的话,就把任务重新推到队列;
  2. 工作进程使用本地列表记录延迟任务,每次循环检查这个列表并执行到期的任务;
  3. 使用有序集合,把执行时间设为分值。一个单独的进程查找是否存在可以被执行的任务,有的话就从有序集合移除并加到对应的任务队列里面。

其中方式1会浪费工作进程的时间;方式2在工作进程崩溃时会失去本地记录的信息。

def execute_later(conn, queue, name, args, delay=0):identifier = str(uuid.uuid4())# 准备任务信息,标识符、队列名、回调函数名、参数item = json.dumps([identifier, queue, name, args])if delay > 0:# 延迟执行,加入有序集合,任务信息作为成员conn.zadd('delayed:', item, time.time() + delay)else:# 立即执行conn.rpush('queue:' + queue, item)return identifier

从延迟队列获取可执行任务

def pool_queue(conn):while not QUIT:# 获取队列中的第一个任务item = conn.zrange('delayed:', 0, 0, withscores=True)if not item or item[0][1] > time.time():# 队列中无任务,或任务未到执行时间time.sleep(.01)continue# 解码需要被执行的任务item = item[0][0]identifier, queue, function, args = json.loads(item)# 获取锁,失败则重试locked = acquire_lock(conn, identifier)if not locked:continue# 把任务推入对应的任务队列if conn.zrem('delayed:', item):conn.rpush('queue:' + queue, item)release_lock(conn, identifier, locked)

只需要获取有序集合里排在第一个的元素即可。

优先执行延迟任务
延迟任务到期后,会被推到普通任务队列中。
如果想让这些延迟任务到期后优先执行,可以添加多个队列。高、中、低三个队列,可以对应创建“延迟高”、“延迟中”、“延迟低”队列,并按['high-delayed', 'high', 'medium-delayed', 'medium', 'low-delayed', 'low']的顺序传给worker_watch_queues函数。

不能使用lpush直接加到队列前面,因为进程在执行某个费时的任务期间,有多个延迟任务到期了,它们会被依次lpush到队列头部,这会导致最后被推入的任务被最先执行,不符合先到先执行的规则。

6.5 消息拉取

消息推送和消息拉取

多个客户端之间,有两种互相传递消息的方式:

  1. 消息推送。由发送者确保所有接收者已成功收到消息。
  2. 消息拉取。接收者自己去获取发送者存储在某处的信息。

当客户端由于某些原因没办法保持在线的时候,使用消息推送的程序会出现各种问题。

Redis用于消息推送的publish和subscribe命令存在这个问题。需要另外实现不遗漏的消息传递。

单接收者的发送与订阅

如果每条消息都只会被发送到一个客户端。

可以为每个客户端设置一个接收列表,发送者把消息放到接收者的列表里面。
接收者客户端通过向服务器发送请求来获取最新的消息。

多接收者的发送与订阅

群组聊天需要进行多接收者的消息传递。

数据结构说明内容
字符串当前最大群组idids:chat:每次新建群组,自增来生成群组id
字符串指定群组的最大消息idids:群组id群组中有新消息时,自增来生成消息的id
有序集合群组消息msgs:群组id成员是消息内容的JSON序列,分值是消息id
有序集合群组chat:群组id成员是群内的用户id,分值是用户在这个群组内已接收的最大消息id
有序集合已读信息seen:用户id成员是用户加入的各个群组的id,分值是用户在这个群组已读的最大消息id

创建群组

def create_chat(conn, sender, recipients, message, chat_id=None):# 获取新的群组idchat_id = chat_id or str(conn.incr('ids:chat:'))# 群组的用户和初始分值recipients.append(sender)recipientsd = dict((r, 0) for r in recipients)pipeline = conn.pipeline(True)# 参与的用户加到群组有序集合pipeline.zadd('chat:' + chat_id, **recipientsd)# 初始化已读有序集合for rec in recipients:pipeline.zadd('seen:' + rec, chat_id, 0)pipeline.execute()# 发送消息return send_message(conn, chat_id, sender, message)

发送消息

def send_message(conn, chat_id, sender, message):identifier = acquire_lock(conn, 'chat:' + chat_id)if not identifier:raise Exception("Couldn't get the lock")try:mid = conn.incr('ids:' + chat_id)ts = time.time()packed = json.dumps({'id': mid,'ts': ts,'sender': sender,'message': message,})# 把消息发到群组conn.zadd('msgs:' + chat_id, packed, mid)finally:release_lock(conn, 'chat:' + chat_id, identifier)return chat_id

当使用一个来自Redis的值取构建另一个要添加到Redis的值时,就需要使用锁来消除竞争条件。

获取用户的未读消息

def fetch_pending_messages(conn, recipient):seen = conn.zrange('seen:' + recipient, 0, -1, withscores=True)pipeline = conn.pipeline(True)for chat_id, seen_id in seen:# 取未读消息pipeline.zrangebysscore('msgs:' + chat_id, seen_id+1, 'inf')chat_info = zip(seen, pipeline.execute())for i, ((chat_id, seen_id), messages) in enumerate(chat_info):if not messages:continuemessages[:] = map(json.loads, messages)# 更新群组有序集合seen_id = messages[-1]['id']conn.zadd('chat:' + chat_id, recipient, seen_id)# 更新已读消息有序集合pipeline.zadd('seen:' + recipient, chat_id, seen_id)# 清除已被所有人阅读过的消息min_id = conn.zrange('chat:' + chat_id, 0, 0, withscores=True)if min_id:pipeline.zremrangebyscore('msgs:' + chat_id, 0, min_id[0][1])chat_info[i] = (chat_id, messages)pipeline.execute()return chat_info

加入群组

def join_chat(conn, chat_id, user):# 目标群组的最新消息的idmessage_id = int(conn.get('ids:' + chat_id))pipeline = conn.pipeline(True)# 加到群组成员pipeline.zadd('chat:' + chat_id, user, message_id)# 群组添加到用户的已读列表中pipeline.zadd('seen:' + user, chat_id, message_id)pipeline.execute()

离开群组

def leave_chat(conn, chat_id, user):pipeline = conn.pipeline(True)pipeline.zrem('chat:' + chat_id, user)pipeline.zrem('seen:' + user, chat_id)# 查看剩余成员数pipeline.zcard('chat:' + chat_id)# 删除没有成员的群组if not pipeline.execute()[-1]:pipeline.delete('msgs:' + chat_id)pipeline.delete('ids:' + chat_id)pipeline.execute()# 清理已被所有成员阅读过的消息else:oldest = conn.zrange('chat:' + chat_id, 0, 0, withscores=True)conn.zremrangebyscore('msgs:' + chat_id, 0, oldest[0][1])

6.6 文件分发

分布式系统经常需要再多台机器上复制、分发或处理数据文件。
需求:巨大的日志文件需要通过多台服务器分析处理。

上传日志文件

基本思路:将日志文件存到redis里面,然后多个客户端从Redis拉取日志进行处理。

生成日志的服务器作为发送者,会有多个日志处理服务器作为接收者,可以使用之前的群组实现来分发文件。

def copy_logs_to_redis(conn, path, channel, count=10, limit=2**30, quit_when_done=True):# channel为群组id,count为聚合进程的数量bytes_in_redis = 0waiting = deque()# 创建向客户端发送消息的群组create_chat(conn, 'source', map(str, range(count)), '', channel)count = str(count)# 遍历所有日志文件for logfile in sorted(os.listdir(path)):full_path = os.path.join(path, logfile)fsize = os.stat(full_path).st_sizewhile bytes_in_redis + fsize > limit:cleaned = _clean(conn, channel, waiting, count)if cleaned:bytes_in_redis -= cleanedelse:time.sleep(.25)# 把文件上传到Rediswith open(full_path, 'rb') as inp:block = ''while block:block = inp.read(2**17)conn.append(channel + logfile, block)# 提醒接收者文件可接收send_message(conn, channel, 'source', logfile)bytes_in_redis += fsizewaiting.append((logfile, fsize))# 所有文件处理完后提醒接收者if quit_when_done:send_message(conn, channel, 'source', ':done')while waiting:cleaned = _clean(conn, channel, waiting, count)if cleaned:bytes_in_redis -= cleanedelse:time.sleep(.25)# 对redis进行清理def _clean(conn, channel, waiting, count):if not waiting:return 0w0 = waiting[0][0]if conn.get(channel + w0 + ':done') == count:conn.delete(channel + w0, channel + w0 + ':done')return waiting.popleft()[1]return 0

上传的时候不能一次性推入过多数据,并且在客户端读取完后,要清理数据。

接收日志文件

按固定大小接收文件,交给回调函数处理。

def process_logs_from_redis(conn, id, callback):while 1:fdata = fetch_pending_messages(conn, id)for ch, mdata in fdata:for message in mdata:logfile = message['message']# 日志行全部处理完if logfile == ':done':return elif not logfile:continue# 选择读取器block_reader = readblocksif logfile.endswith('.gz'):block_reader = readblocks_gz# 回调函数处理日志行for line in readlines(conn, ch+logfile, block_reader):callback(conn, line)# 强制刷新聚合数据缓存callback(conn, line)# 日志处理完毕,告知文件发送者conn.incr(ch + logfile + ':done')if not fdata:time.sleep(.1)
处理日志文件

从数据块中读取行的程序

def readlines(conn, key, rblocks):out = ''for block in rblocks(conn, key):out += block# 找文末的换行符。有换行符,即有可处理的完整行posn = out.rfind('\n')if posn >= 0:# 返回每行for line in out[:posn].split('\n'):yield line + '\n'# 余下的非整行数据到下一次循环中处理out = out[posn+1:]# 处理完毕if not block:yield outbreak

读取数据块的函数

def readblocks(conn, key, blocksize=2**17):lb = blocksizepos = 0# 按块长度读取数据块,直到数据不够完整的一个数据块while lb == blocksize:block = conn.substr(key, pos, pos + blocksize - 1)yield blocklb = len(block)pos += lbyield ''

readblocks函数把用于读取数据块的操作抽象,可以使用其他类型的读取器来代替它。

读取gzip存储的块

def readblocks_gz(conn, key):inp = ''decoder = Nonefor block in readblocks(conn, key, 2**17):if not decoder:inp += blocktry:# 分析头信息,以便解压数据if inp[:3] != '\xlf\x8b\x08':raise IOError("invalid gzip data")i = 10flag = ord(inp[3])if flag & 4:i += 2 + ord(inp[i]) + 256*ord(inp[i+1])if flag & 8:i = inp.index('\0', i) + 1if flag & 16:i = inp.index('\0', i) + 1if flag & 2:i += 2# 头信息不完整if i > len(inp):raise IndexError("not enough data")except (IndexError, ValueError):continueelse:# 解压程序block = inp[i:]inp = Nonedecoder = zlib.decompressobj(-zlib.MAX_WBITS)if not block:continue# 数据处理完毕,返回剩下的数据块if not block:yield decoder.flush()break# 返回解压后的数据块yield decoder.decompress(block)

执行聚合计算的回调函数callback的实例,统计每天每个国家有多少次访问。统计结果会上传给Redis。

aggregates = defaultdict(lambda: defaultdict(int))def daily_country_aggregate(conn, line):if line:# 提取日志行中的信息line = line.split()ip = line[0]day = line[1]# 按IP判断用户国家country = find_city_by_ip_local(ip)[2]aggregates[day][country] += 1return# 当天日志处理完,把聚合计算的结果写入Redisfor day, aggregate in aggregates.items():conn.zadd('daily:country:' + day, **aggregate)del aggregates[day]

http://www.mrgr.cn/news/36871.html

相关文章:

  • 云手机的默认ip地址是什么
  • Cgroup介绍
  • p,div等里面支持br换行 对应后台换过来的textarea的富文本内容
  • 书生大模型实战营学习[7] InternLM + LlamaIndex RAG 实践
  • 集翼智能视频营销管理平台 丨OPENAIGC开发者大赛企业组AI创作力奖
  • 【CAS框架自定义登录异常提示-固定时间内限制登录失败次数提醒】
  • 手机也可以更换任意IP地址吗?
  • 无人机之物流货运篇
  • Rust编程的if选择语句
  • 通过Sovit2D在ARMxy边缘计算网关上实现工艺控制
  • Vue3+Vite中引用Swiper11自动轮播、左右切换不生效,已解决
  • AI中医香方仪丨OPENAIGC开发者大赛企业组AI创作力奖
  • RS485通信(串口通信)超时模式与固定字节接收模式(不定长数据包、长度固定数据包)
  • 如何在谷歌浏览器上玩大型多人在线游戏
  • SQL Server的文本和图像函数
  • 移动CRM应用排名
  • Spring Web MVC课后作业
  • 吉林大学微机接口实验五:D/A转换
  • OpenKylin--解压文件
  • pycirclize python包画circos环形图