点评项目-9-秒杀业务(加锁)、redis分布式锁
秒杀业务
业务需求1:给库存的判断添加乐观锁
业务需求2:给一人只能下一单的判断,使用分布式锁的方式添加悲观锁
秒杀业务请求路径:/voucher-order/seckill/{id}
乐观锁与悲观锁
悲观锁:认为线程安全一定会发送,因此在操作数据之前先获取锁,确保线程串行执行
乐观锁:不加锁,只在更新数据时判断有没有其他线程对数据做了修改,若发现数据被修改则说明发生了安全问题
对于乐观锁,我们可以使用版本号来判断数据是否被修改,维护一个版本号字段(版本号法);也可以直接使用已有的字段来判断数据是否被修改(CAS法)
库存判断加乐观锁
对于库存的判断,这里使用 CAS 法给秒杀优惠卷的业务添加乐观锁,在更新库存的时候,再次查询库存是否大于 0 即可。
在判断还有库存后,从数据库中将库存 -1,这里我选择添加 Mapper 方法来操作数据库,and stock > 0 是加乐观锁的操作
@Mapper
public interface VoucherSeckillMapper extends BaseMapper<SeckillVoucher> {@Update("update tb_seckill_voucher set stock = stock-1 where voucher_id = #{voucherId} and stock > 0")boolean updateSecStock(@Param("voucherId") Long voucherId);}
测试该方法:
@Autowiredprivate VoucherSeckillMapper voucherSeckillMapper;//扣除秒杀优惠卷库存@Testvoid delStock(){voucherSeckillMapper.updateSecStock(10L);}
查询数据库库存,成功 -1
完成秒杀请求
创建一个订单表的封装类:
/*** 订单表封装bean*/
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("tb_voucher_order")
public class VoucherOrder implements Serializable {private static final long serialVersionUID = 1L;/*** 主键*/@TableId(value = "id", type = IdType.INPUT)private Long id;/*** 下单的用户id*/private Long userId;/*** 购买的代金券id*/private Long voucherId;/*** 支付方式 1:余额支付;2:支付宝;3:微信*/private Integer payType;/*** 订单状态,1:未支付;2:已支付;3:已核销;4:已取消;5:退款中;6:已退款*/private Integer status;/*** 下单时间*/private LocalDateTime createTime;/*** 支付时间*/private LocalDateTime payTime;/*** 核销时间*/private LocalDateTime useTime;/*** 退款时间*/private LocalDateTime refundTime;/*** 更新时间*/private LocalDateTime updateTime;}
controller:
@RestController
@RequestMapping("/voucher-order")
public class VoucherOrderController {@Resourceprivate VoucherOrderService voucherOrderService;@PostMapping("/seckill/{id}")public Result orderVoucher(@PathVariable("id") Long voucherId){return voucherOrderService.seckillVoucher(voucherId);}}
Service:
public interface VoucherOrderService {Result seckillVoucher(Long voucherId);
}
@Service
public class VoucherOrderServiceImpl implements VoucherOrderService {@Resourceprivate VoucherSeckillMapper voucherSeckillMapper;//id 生成器@Resourceprivate CreateOnlyId createOnlyId;@Resourceprivate VoucherOrderMapper voucherOrderMapper;@Overridepublic Result seckillVoucher(Long voucherId) {//查询秒杀优惠卷SeckillVoucher seckillVoucher = voucherSeckillMapper.selectById(voucherId);//判断是否在活动时间内if(seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())){return Result.fail("活动尚未开始");}if(seckillVoucher.getEndTime().isBefore(LocalDateTime.now())){return Result.fail("活动已经结束");}//查看库存if(seckillVoucher.getStock() < 1){return Result.fail("库存不足");}//扣减库存boolean flag = voucherSeckillMapper.updateSecStock(voucherId);if(!flag){return Result.fail("库存不足");}//创建订单VoucherOrder voucherOrder = new VoucherOrder();long orderKey = createOnlyId.createID("order");voucherOrder.setId(orderKey);voucherOrder.setVoucherId(voucherId);voucherOrder.setUserId(UserHolder.getUser().getId());//将订单存入数据库voucherOrderMapper.insert(voucherOrder);return Result.ok(orderKey);}
}
Mapper:
@Mapper
public interface VoucherOrderMapper extends BaseMapper<VoucherOrder> {}
测试:
注意在发送测试请求时,需要在请求头带上 token
为了避免重复登录,我们手动添加一个永久登录用户到缓存重
//缓存一个永久用的用户登录信息@Testvoid saveUserForever(){Map<String,String> userMap = new HashMap<>();userMap.put("id","1");userMap.put("nickName","管理员");stringRedisTemplate.opsForHash().putAll("login:token:"+"textUser",userMap);}
发送抢秒杀卷请求:
成功抢到卷,库存 -1 ,(自己测了几次,目前还剩余 94张)
接下来,我们使用 jmeter 发送 94 * 2 个抢秒杀卷的请求,测试线程的安全性,若秒杀卷出现负数,则代表超卖了
一人一单加悲观锁
对于一人一单的逻辑判断,由于下单操作无法进行更新,所以只能添加悲观锁
悲观锁:在加锁时,我们将锁加在含有事物提交的方法上,对于方法的调用,会出现没有被 spirng 管理导致事物无法提交的情况,我们需要拿到其 sping 代理对象来调用含有事物提交的方法(需要添加 aspectjweaver 依赖,并在启动类添加 @EnableAspectJAutoProxy(exposeProxy = true) 注解)
对于以上的加锁方式,若在 tomcat 集群中,会因为在不同的服务中导致安全问题出现,且步骤繁琐。我们可以通过 redis 分布式锁的方式解决
redis 分布式锁
在存入锁时,我们使用 UUID 生成 Key
redis 命令
//若没有锁就加一个过期时间为 10 s 的锁,若有锁则操作无效
set lock thread1 NX EX 10
java 代码:
//存入 redis 的keyprivate static final String KEY_PREFIX = "lock:";private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";@Overridepublic boolean tryLock() {// 获取线程标示String threadId = ID_PREFIX + Thread.currentThread().getId();// 获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, 10, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}
在释放锁时,我们需要判断要释放的锁和当前线程的锁是否一致,一致才释放,若不一致则直接让本该释放的锁通过 ttl 延迟释放
我们使用 lua 脚本来保证获取锁和删除锁操作的原子性,即同时成功同时失败
--对于 Luq 脚本,若有传参,key 会被放到 KEYS 数组中,其他参数被放到 ARGV 数组中
--对于释放锁的 Luq 脚本
if (redis.call('GET',KEYS[1]) == ARGV[1]) then
return redis.call('DEL',KEYS[1])
end
return 0
将 lua 脚本文件放入 resource 文件夹下(即ymal同级目录),并在锁工具类中初始化后调用
//提前将 lua 脚本中的内容读取出来,避免重复读取private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}@Overridepublic void unLock() {//调用 Lua 脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX+name),ID_PREFIX + Thread.currentThread().getId());}
完整的分布式锁代码:
@Component
public class RedisLock implements ILock{private String name;private StringRedisTemplate stringRedisTemplate;public RedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}//存入 redis 的keyprivate static final String KEY_PREFIX = "lock:";private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";//提前将 lua 脚本中的内容读取出来,避免重复读取private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}@Overridepublic boolean tryLock() {// 获取线程标示String threadId = ID_PREFIX + Thread.currentThread().getId();// 获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, 10, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}@Overridepublic void unLock() {//调用 Lua 脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX+name),ID_PREFIX + Thread.currentThread().getId());}
}
使用分布式锁工具完成一人一单的悲观锁设置
Mapper查询用户id和优惠卷id是否已经出现:
@Mapper
public interface VoucherOrderMapper extends BaseMapper<VoucherOrder> {@Select("select count(*) from tb_voucher_order where user_id = #{id} and voucher_id = #{voucherId}")int overBuy(@Param("id") Long id,@Param("voucherId") Long voucherId);}
测试:
@Autowiredprivate VoucherOrderMapper voucherOrderMapper;//测试用户多次购买@Testvoid overBuy(){int i = voucherOrderMapper.overBuy(1L, 10L);System.out.println(i);}
将一人一单的判断放在库存之前,原因是 redis 的查询效率更高
@Service
public class VoucherOrderServiceImpl implements VoucherOrderService {@Resourceprivate VoucherSeckillMapper voucherSeckillMapper;//id 生成器@Resourceprivate CreateOnlyId createOnlyId;@Resourceprivate VoucherOrderMapper voucherOrderMapper;@Resourceprivate StringRedisTemplate stringRedisTemplate;@Overridepublic Result seckillVoucher(Long voucherId) {//查询秒杀优惠卷SeckillVoucher seckillVoucher = voucherSeckillMapper.selectById(voucherId);//判断是否在活动时间内if(seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())){return Result.fail("活动尚未开始");}if(seckillVoucher.getEndTime().isBefore(LocalDateTime.now())){return Result.fail("活动已经结束");}//先判断一人一单逻辑UserDTO user = UserHolder.getUser();RedisLock redisLock = new RedisLock("order:"+user.getId(),stringRedisTemplate);boolean getLock = redisLock.tryLock();//尝试获取锁if(!getLock){//获取锁失败,直接返回失败return Result.fail("一个用户只能购买一单");}//获取锁成功long orderKey = 0;try {//查询当前店铺下的卷是否已经有 user_id 了,有就返回failint count = voucherOrderMapper.overBuy(user.getId(), voucherId);if(count > 0){return Result.fail("一个用户只能购买一单");}//查看库存if(seckillVoucher.getStock() < 1){return Result.fail("库存不足");}//扣减库存boolean flag = voucherSeckillMapper.updateSecStock(voucherId);if(!flag){return Result.fail("库存不足");}//创建订单VoucherOrder voucherOrder = new VoucherOrder();orderKey = createOnlyId.createID("order");voucherOrder.setId(orderKey);voucherOrder.setVoucherId(voucherId);voucherOrder.setUserId(user.getId());//将订单存入数据库voucherOrderMapper.insert(voucherOrder);} catch (Exception e) {throw new RuntimeException();} finally {//释放锁redisLock.unLock();}return Result.ok(orderKey);}
}
测试是否一个人能否在短时间内抢到多个秒杀卷:
发送大量相同用户请求后,只减少了一个库存
订单表的记录也只有一条
不过这样的加锁方式任然有少量的线程安全问题,后续会继续优化分布式锁