05、Redis实战:优惠券秒杀、优化异步下单

6、秒杀优化

6.1 秒杀优化-异步秒杀思路

  • 我们先来回顾一下下单流程
  • 当用户发起请求,此时会先请求Nginx,Nginx反向代理到Tomcat,而Tomcat中的程序,会进行串行操作,分为如下几个步骤
    1. 查询优惠券
    2. 判断秒杀库存是否足够
    3. 查询订单
    4. 校验是否一人一单
    5. 扣减库存
    6. 创建订单
  • 在这六个步骤中,有很多操作都是要去操作数据库的,而且还是一个线程串行执行,这样就会导致我们的程序执行很慢,所以我们需要异步程序执行,那么如何加速呢?
  • 优化方案:我们将耗时较短的逻辑判断放到Redis中,例如:库存是否充足,是否一人一单这样的操作,只要满足这两条操作,那我们是一定可以下单成功的,不用等数据真的写进数据库,我们直接告诉用户下单成功就好了。然后后台再开一个线程,后台线程再去慢慢执行队列里的消息,这样我们就能很快的完成下单业务。

1653560986599

优化方案:我们将耗时比较短的逻辑判断放入到redis中,比如是否库存足够,比如是否一人一单,这样的操作,只要这种逻辑可以完成,就意味着我们是一定可以下单完成的,我们只需要进行快速的逻辑判断,根本就不用等下单逻辑走完,我们直接给用户返回成功, 再在后台开一个线程,后台线程慢慢的去执行queue里边的消息,这样程序不就超级快了吗?而且也不用担心线程池消耗殆尽的问题,因为这里我们的程序中并没有手动使用任何线程池,当然这里边有两个难点

第一个难点是我们怎么在redis中去快速校验一人一单,还有库存判断

第二个难点是由于我们校验和tomct下单是两个线程,那么我们如何知道到底哪个单他最后是否成功,或者是下单完成,为了完成这件事我们在redis操作完之后,我们会将一些信息返回给前端,同时也会把这些信息丢到异步queue中去,后续操作中,可以通过这个id来查询我们tomcat中的下单逻辑是否完成了。

1653561657295

  • 但是这里还存在两个难点
    1. 我们怎么在Redis中快速校验是否一人一单,还有库存判断
    2. 我们校验一人一单和将下单数据写入数据库,这是两个线程,我们怎么知道下单是否完成。
      • 我们需要将一些信息返回给前端,同时也将这些信息丢到异步queue中去,后续操作中,可以通过这个id来查询下单逻辑是否完成
  • 我们现在来看整体思路:当用户下单之后,判断库存是否充足,只需要取Redis中根据key找对应的value是否大于0即可,如果不充足,则直接结束。如果充足,则在Redis中判断用户是否可以下单,如果set集合中没有该用户的下单数据,则可以下单,并将userId和优惠券存入到Redis中,并且返回0,整个过程需要保证是原子性的,所以我们要用Lua来操作,同时由于我们需要在Redis中查询优惠券信息,所以在我们新增秒杀优惠券的同时,需要将优惠券信息保存到Redis中
  • 完成以上逻辑判断时,我们只需要判断当前Redis中的返回值是否为0,如果是0,则表示可以下单,将信息保存到queue中去,然后返回,开一个线程来异步下单,其阿奴单可以通过返回订单的id来判断是否下单成功

1653562234886

6.2 秒杀优化-Redis完成秒杀资格判断

需求:

  • 新增秒杀优惠券的同时,将优惠券信息保存到Redis中

  • 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功

  • 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列

  • 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

    1656080546603

VoucherServiceImpl

@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
    // 保存优惠券
    save(voucher);
    // 保存秒杀信息
    SeckillVoucher seckillVoucher = new SeckillVoucher();
    seckillVoucher.setVoucherId(voucher.getId());
    seckillVoucher.setStock(voucher.getStock());
    seckillVoucher.setBeginTime(voucher.getBeginTime());
    seckillVoucher.setEndTime(voucher.getEndTime());
    seckillVoucherService.save(seckillVoucher);
    // 保存秒杀库存到Redis中
    //SECKILL_STOCK_KEY 这个变量定义在RedisConstans中
    //private static final String SECKILL_STOCK_KEY ="seckill:stock:"
    stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}
  • 使用PostMan发送请求,添加优惠券
    请求路径:http://localhost:8080/api/voucher/seckill
    请求方式:POST

  • {
        "shopId":1,
        "title":"9999元代金券",
        "subTitle":"365*24小时可用",
        "rules":"全场通用\\nApex猎杀无需预约",
        "payValue":1000,
        "actualValue":999900,
        "type":1,
        "stock":100,
        "beginTime":"2022-01-01T00:00:00",
        "endTime":"2022-12-31T23:59:59"
    }
    
  • 添加成功后,数据库中和Redis中都能看到优惠券信息

  • 步骤二:编写Lua脚本
    lua的字符串拼接使用..,字符串转数字是tonumber()

-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 1.3.订单id
local orderId = ARGV[3]

-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
    -- 3.2.库存不足,返回1
    return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
    -- 3.3.存在,说明是重复下单,返回2
    return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0
  • 修改业务逻辑

VoucherOrderServiceImpl

private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
    SECKILL_SCRIPT =new DefaultRedisScript<>();
    SECKILL_SCRIPT,setLocation(new classPathResource("seckill.lua"));
    SECKILL_SCRIPT.setResultType(Long.class);
}


@Override
public Result seckillVoucher(Long voucherId) {
    //获取用户
    Long userId = UserHolder.getUser().getId();
    //生成订单ID
    long orderId = redisIdWorker.nextId("order");
    // 1.执行lua脚本
    Long result = stringRedisTemplate.execute(
            SECKILL_SCRIPT,
            Collections.emptyList(),
            voucherId.toString(), userId.toString(), String.valueOf(orderId)
    );
    int r = result.intValue();
    // 2.判断结果是否为0
    if (r != 0) {
        // 2.1.不为0 ,代表没有购买资格
        return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
    }
    //TODO 保存阻塞队列
    // 3.返回订单id
    return Result.ok(orderId);
}

6.3 秒杀优化-基于阻塞队列实现秒杀优化

  • 修改下单的操作,我们在下单时,是通过Lua表达式去原子执行判断逻辑,如果判断结果不为0,返回错误信息,如果判断结果为0,则将下单的逻辑保存到队列中去,然后异步执行
  • 需求
    1. 如果秒杀成功,则将优惠券id和用户id封装后存入阻塞队列
    2. 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
  • 步骤一:创建阻塞队列
    阻塞队列有一个特点:当一个线程尝试从阻塞队列里获取元素的时候,如果没有元素,那么该线程就会被阻塞,直到队列中有元素,才会被唤醒,并去获取元素
    阻塞队列的创建需要指定一个大小
private final BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
  • 那么把优惠券id和用户id封装后存入阻塞队列
@Override
public Result seckillVoucher(Long voucherId) {
    Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,
            Collections.emptyList(), voucherId.toString(),
            UserHolder.getUser().getId().toString());
    if (result.intValue() != 0) {
        return Result.fail(result.intValue() == 1 ? "库存不足" : "不能重复下单");
    }
    long orderId = redisIdWorker.nextId("order");
    //封装到voucherOrder中
    VoucherOrder voucherOrder = new VoucherOrder();
    voucherOrder.setVoucherId(voucherId);
    voucherOrder.setUserId(UserHolder.getUser().getId());
    voucherOrder.setId(orderId);
    //加入到阻塞队列
    orderTasks.add(voucherOrder);
    return Result.ok(orderId);
}
  • 步骤二

实现异步下单功能

  1. 先创建一个线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
  1. 创建线程任务,秒杀业务需要在类初始化之后,就立即执行,所以这里需要用到@PostConstruct注解
@PostConstruct
private void init() {
    SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

private class VoucherOrderHandler implements Runnable {
    @Override
    public void run() {
        while (true) {
            try {
                //1. 获取队列中的订单信息
                VoucherOrder voucherOrder = orderTasks.take();
                //2. 创建订单
                handleVoucherOrder(voucherOrder);
            } catch (Exception e) {
                log.error("订单处理异常", e);
            }
        }
    }
}
  1. 编写创建订单的业务逻辑
private IVoucherOrderService proxy;
private void handleVoucherOrder(VoucherOrder voucherOrder) {
    //1. 获取用户
    Long userId = voucherOrder.getUserId();
    //2. 创建锁对象,作为兜底方案
    RLock redisLock = redissonClient.getLock("order:" + userId);
    //3. 获取锁
    boolean isLock = redisLock.tryLock();
    //4. 判断是否获取锁成功         
    if (!isLock) {
        log.error("不允许重复下单!");
        return;
    }
    try {
        //5. 使用代理对象,由于这里是另外一个线程,
        proxy.createVoucherOrder(voucherOrder);
    } finally {
        redisLock.unlock();
    }
}
  • 查看AopContext源码,它的获取代理对象也是通过ThreadLocal进行获取的,由于我们这里是异步下单,和主线程不是一个线程,所以不能获取成功
private static final ThreadLocal<Object> currentProxy = new NamedThreadLocal("Current AOP proxy");
  • 但是我们可以将proxy放在成员变量的位置,然后在主线程中获取代理对象
@Override
public Result seckillVoucher(Long voucherId) {
    Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,
            Collections.emptyList(), voucherId.toString(),
            UserHolder.getUser().getId().toString());
    if (result.intValue() != 0) {
        return Result.fail(result.intValue() == 1 ? "库存不足" : "不能重复下单");
    }
    long orderId = redisIdWorker.nextId("order");
    //封装到voucherOrder中
    VoucherOrder voucherOrder = new VoucherOrder();
    voucherOrder.setVoucherId(voucherId);
    voucherOrder.setUserId(UserHolder.getUser().getId());
    voucherOrder.setId(orderId);
    //加入到阻塞队列
    orderTasks.add(voucherOrder);
    //主线程获取代理对象
    proxy = (IVoucherOrderService) AopContext.currentProxy();
    return Result.ok(orderId);
}
@Transactional
    public void createVoucherOrder(VoucherOrder voucherOrder) {
        // 一人一单逻辑
        Long userId = voucherOrder.getUserId();
        Long voucherId = voucherOrder.getVoucherId();
        synchronized (userId.toString().intern()) {
            int count = query().eq("voucher_id", voucherId).eq("user_id", userId).count();
            if (count > 0) {
                log.error("你已经抢过优惠券了哦");
                return;
            }
            //5. 扣减库存
            boolean success = seckillVoucherService.update()
                    .setSql("stock = stock - 1")
                    .eq("voucher_id", voucherId)
                    .gt("stock", 0)
                    .update();
            if (!success) {
                log.error("库存不足");
            }
            //7. 将订单数据保存到表中
            save(voucherOrder);
        }
    }
  • 完整代码如下
package com.hmdp.service.impl;

import com.hmdp.dto.Result;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.aop.framework.AopContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * <p>
 * 服务实现类
 * </p>
 *
 * @author Kyle
 * @since 2022-10-22
 */
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Autowired
    private ISeckillVoucherService seckillVoucherService;

    @Autowired
    private RedisIdWorker redisIdWorker;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Resource
    private RedissonClient redissonClient;

    private IVoucherOrderService proxy;


    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

    static {
        SECKILL_SCRIPT = new DefaultRedisScript();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    @PostConstruct
    private void init() {
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    private final BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);

    private void handleVoucherOrder(VoucherOrder voucherOrder) {
        //1. 获取用户
        Long userId = voucherOrder.getUserId();
        //2. 创建锁对象,作为兜底方案
        RLock redisLock = redissonClient.getLock("order:" + userId);
        //3. 获取锁
        boolean isLock = redisLock.tryLock();
        //4. 判断是否获取锁成功 
        if (!isLock) {
            log.error("不允许重复下单!");
            return;
        }
        try {
            //5. 使用代理对象,由于这里是另外一个线程,
            proxy.createVoucherOrder(voucherOrder);
        } finally {
            redisLock.unlock();
        }
    }

    private class VoucherOrderHandler implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    //1. 获取队列中的订单信息
                    VoucherOrder voucherOrder = orderTasks.take();
                    //2. 创建订单
                    handleVoucherOrder(voucherOrder);
                } catch (Exception e) {
                    log.error("订单处理异常", e);
                }
            }
        }
    }

    @Override
    public Result seckillVoucher(Long voucherId) {
        Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,
                Collections.emptyList(), voucherId.toString(),
                UserHolder.getUser().getId().toString());
        if (result.intValue() != 0) {
            return Result.fail(result.intValue() == 1 ? "库存不足" : "不能重复下单");
        }
        long orderId = redisIdWorker.nextId("order");
        //封装到voucherOrder中
        VoucherOrder voucherOrder = new VoucherOrder();
        voucherOrder.setVoucherId(voucherId);
        voucherOrder.setUserId(UserHolder.getUser().getId());
        voucherOrder.setId(orderId);
        //加入到阻塞队列
        orderTasks.add(voucherOrder);
        //主线程获取代理对象
        proxy = (IVoucherOrderService) AopContext.currentProxy();
        return Result.ok(orderId);
    }


    @Transactional
    public void createVoucherOrder(VoucherOrder voucherOrder) {
        // 一人一单逻辑
        Long userId = voucherOrder.getUserId();
        Long voucherId = voucherOrder.getVoucherId();
        synchronized (userId.toString().intern()) {
            int count = query().eq("voucher_id", voucherId).eq("user_id", userId).count();
            if (count > 0) {
                log.error("你已经抢过优惠券了哦");
                return;
            }
            //5. 扣减库存
            boolean success = seckillVoucherService.update()
                    .setSql("stock = stock - 1")
                    .eq("voucher_id", voucherId)
                    .gt("stock", 0)
                    .update();
            if (!success) {
                log.error("库存不足");
            }
            //7. 将订单数据保存到表中
            save(voucherOrder);
        }
    }
}

小结

小总结:

秒杀业务的优化思路是什么?

  • 先利用Redis完成库存余量、一人一单判断,完成抢单业务
  • 再将下单业务放入阻塞队列,利用独立线程异步下单
  • 基于阻塞队列的异步秒杀存在哪些问题?
    • 内存限制问题:
      • 我们现在使用的是JDK里的阻塞队列,它使用的是JVM的内存,如果在高并发的条件下,无数的订单都会放在阻塞队列里,可能就会造成内存溢出,所以我们在创建阻塞队列时,设置了一个长度,但是如果真的存满了,再有新的订单来往里塞,那就塞不进去了,存在内存限制问题
    • 数据安全问题:
      • 经典服务器宕机了,用户明明下单了,但是数据库里没看到

7、Redis消息队列

7.1 Redis消息队列-认识消息队列

什么是消息队列:字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

1653574849336

  • 使用队列的好处在于解耦:举个例子,快递员(生产者)把快递放到驿站/快递柜里去(Message Queue)去,我们(消费者)从快递柜/驿站去拿快递,这就是一个异步,如果耦合,那么快递员必须亲自上楼把快递递到你手里,服务当然好,但是万一我不在家,快递员就得一直等我,浪费了快递员的时间。所以解耦还是非常有必要的
  • 那么在这种场景下我们的秒杀就变成了:在我们下单之后,利用Redis去进行校验下单的结果,然后在通过队列把消息发送出去,然后在启动一个线程去拿到这个消息,完成解耦,同时也加快我们的响应速度
  • 这里我们可以直接使用一些现成的(MQ)消息队列,如kafka,rabbitmq等,但是如果没有安装MQ,我们也可以使用Redis提供的MQ方案(学完Redis我就去学微服务)

7.2 Redis消息队列-基于List实现消息队列

基于List结构模拟消息队列

消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果。

队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。
不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。

1653575176451

基于List的消息队列有哪些优缺点?
优点:

  • 利用Redis存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息有序性

缺点:

  • 无法避免消息丢失
  • 只支持单消费者

7.3 Redis消息队列-基于PubSub的消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

SUBSCRIBE channel [channel] :订阅一个或多个频道
PUBLISH channel msg :向一个频道发送消息
PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道

Subscribes the client to the given patterns.
Supported glob-style patterns:

  • h?flo subscribes to hello, hallo and hxllo
  • h*llo subscribes to hllo and heeeello
  • h[ae]llo subscribes to hello and hallo, but not hillo

Use \ to escape special characters if you want to match them verbatim.

1653575506373

  • 基于PubSub的消息队列有哪些优缺点
    • 优点:
      1. 采用发布订阅模型,支持多生产,多消费
    • 缺点:
      1. 不支持数据持久化
      2. 无法避免消息丢失(如果向频道发送了消息,却没有人订阅该频道,那发送的这条消息就丢失了)
      3. 消息堆积有上限,超出时数据丢失(消费者拿到数据的时候处理的太慢,而发送消息发的太快)

7.4 Redis消息队列-基于Stream的消息队列

Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

发送消息的命令:

1653577301737

例如:

1653577349691

读取消息的方式之一:XREAD

1653577445413

例如,使用XREAD读取第一个消息:

1653577643629

XREAD阻塞方式,读取最新的消 息:

1653577659166

在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下

1653577689129

注意:当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题

STREAM类型消息队列的XREAD命令特点:

  • 消息可回溯
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 有消息漏读的风险

7.5 Redis消息队列-基于Stream的消息队列-消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

1653577801668

创建消费者组:
1653577984924其它常见命令:

  • key
    • 队列名称
  • groupName
    • 消费者组名称
  • ID
    • 起始ID标识,$代表队列中的最后一个消息,0代表队列中的第一个消息
  • MKSTREAM
    • 队列不存在时自动创建队列

删除指定的消费者组

XGROUP DESTORY key groupName

给指定的消费者组添加消费者

XGROUP CREATECONSUMER key groupname consumername

删除消费者组中的指定消费者

XGROUP DELCONSUMER key groupname consumername

从消费者组读取消息:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group
    • 消费者组名称
  • consumer
    • 消费者名,如果消费者不存在,会自动创建一个消费者
  • count
    • 本次查询的最大数量
  • BLOCK milliseconds
    • 当前没有消息时的最大等待时间
  • NOACK
    • 无需手动ACK,获取到消息后自动确认(一般不用,我们都是手动确认)
  • STREAMS key
    • 指定队列名称
  • ID
    • 获取消息的起始ID
      • >:从下一个未消费的消息开始(pending-list中)
      • 其他:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

消费者监听消息的基本思路:

while(true){
    // 尝试监听队列,使用阻塞模式,最大等待时长为2000ms
    Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >")
    if(msg == null){
        // 没监听到消息,重试
        continue;
    }
    try{
        //处理消息,完成后要手动确认ACK,ACK代码在handleMessage中编写
        handleMessage(msg);
    } catch(Exception e){
        while(true){
            //0表示从pending-list中的第一个消息开始,如果前面都ACK了,那么这里就不会监听到消息
            Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0");
            if(msg == null){
                //null表示没有异常消息,所有消息均已确认,结束循环
                break;
            }
            try{
                //说明有异常消息,再次处理
                handleMessage(msg);
            } catch(Exception e){
                //再次出现异常,记录日志,继续循环
                log.error("..");
                continue;
            }
        }
    }
}

STREAM类型消息队列的XREADGROUP命令特点:

  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消费一次

最后我们来个小对比

1653578560691

7.6 基于Redis的Stream结构作为消息队列,实现异步秒杀下单

需求:

  • 创建一个Stream类型的消息队列,名为stream.orders
  • 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
  • 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单\

修改lua表达式,新增3.6

1656082824939

VoucherOrderServiceImpl

private class VoucherOrderHandler implements Runnable {

    @Override
    public void run() {
        while (true) {
            try {
                // 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
                List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                    Consumer.from("g1", "c1"),
                    StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                    StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
                );
                // 2.判断订单信息是否为空
                if (list == null || list.isEmpty()) {
                    // 如果为null,说明没有消息,继续下一次循环
                    continue;
                }
                // 解析数据
                MapRecord<String, Object, Object> record = list.get(0);
                Map<Object, Object> value = record.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                // 3.创建订单
                createVoucherOrder(voucherOrder);
                // 4.确认消息 XACK
                stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
            } catch (Exception e) {
                log.error("处理订单异常", e);
                //处理异常消息
                handlePendingList();
            }
        }
    }

    private void handlePendingList() {
        while (true) {
            try {
                // 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0
                List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                    Consumer.from("g1", "c1"),
                    StreamReadOptions.empty().count(1),
                    StreamOffset.create("stream.orders", ReadOffset.from("0"))
                );
                // 2.判断订单信息是否为空
                if (list == null || list.isEmpty()) {
                    // 如果为null,说明没有异常消息,结束循环
                    break;
                }
                // 解析数据
                MapRecord<String, Object, Object> record = list.get(0);
                Map<Object, Object> value = record.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                // 3.创建订单
                createVoucherOrder(voucherOrder);
                // 4.确认消息 XACK
                stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
            } catch (Exception e) {
                log.error("处理pendding订单异常", e);
                try{
                    Thread.sleep(20);
                }catch(Exception e){
                    e.printStackTrace();
                }
            }
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/871761.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

音视频相关知识

H.264编码格式 音频 PCM就是要把声音从模拟信号转换成数字信号的一种技术&#xff0c;他的原理简单地说就是利用一个固定的频率对模拟信号进行采样。 pcm是无损音频音频文件格式 每秒15帧 一秒钟300kb 单位&#xff1a;像素

故障频发,给我一个完美的解释...

1.盘点事故 8月19日&#xff0c;网易云音乐「崩了」&#xff0c;网页端报错&#xff0c;App 无法使用&#xff0c;什么原因&#xff1f;你那受影响了吗&#xff1f; 一次更新&#xff0c;一串代码&#xff0c;全球宕机。7月19日下午发生了全球范围内的Windows大面积蓝屏事件&a…

Django | 从中间件的角度来认识Django发送邮件功能

文章目录 概要中间件中间件 ---> 钩子实现中间件demo 邮件发送过程Django如何做邮件服务配置流程 中间件结合邮件服务实现告警 概要 摘要 业务告警 邮件验证 密码找回 邮件告警 中间件 中间件 —> ‘钩子’ 例如 访问路由 的次数【请求】 中间件类须实现下列五个方法…

C++,std::queue 详解

文章目录 1. 概述2. 包含头文件3. 基本操作3.1 构造函数3.2 赋值操作3.3 成员函数 4. 迭代器5. 示例6. 注意事项参考 1. 概述 std::queue 是 C 标准模板库&#xff08;STL&#xff09;中的一个容器适配器&#xff0c;它提供了一种先进先出&#xff08;FIFO&#xff09;的数据结…

[数据集][目标检测]木材缺陷检测数据集VOC+YOLO格式2383张10类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;2383 标注数量(xml文件个数)&#xff1a;2383 标注数量(txt文件个数)&#xff1a;2383 标注…

网络安全入门教程(非常详细)从零基础入门到精通_网路安全 教程

前言 1.入行网络安全这是一条坚持的道路&#xff0c;三分钟的热情可以放弃往下看了。2.多练多想&#xff0c;不要离开了教程什么都不会了&#xff0c;最好看完教程自己独立完成技术方面的开发。3.有时多百度&#xff0c;我们往往都遇不到好心的大神&#xff0c;谁会无聊天天给…

线性数据结构的基本概念(数组,链表,栈,队列)

数组 数组由相同类型的元素组成&#xff0c;使用一块连续的内存来存储。 数组的特点是&#xff1a; 1.利用索引进行访问 2.容量固定 3.使用一块连续的内存来存储 各种操作的时间复杂度&#xff1a; 查找/修改&#xff1a;O&#xff08;1&#xff09;//访问特定位置的元素 插入…

[鹏城杯 2022]简单的php

题目源代码 <?phpshow_source(__FILE__); $code $_GET[code]; if(strlen($code) > 80 or preg_match(/[A-Za-z0-9]|\|"||\ |,|\.|-|\||\/|\\|<|>|\$|\?|\^|&|\|/is,$code)){die( Hello); }else if(; preg_replace(/[^\s\(\)]?\((?R)?\)/, , $code…

Qt实现tcp协议

void Widget::readyRead_slot() {//读取服务器发来的数据QByteArray msg socket->readAll();QString str QString::fromLocal8Bit(msg);QStringList list str.split(:);if(list.at(0) userName){QString str2;for (int i 1; i < list.count(); i) {str2 list.at(i);…

使用DOM破坏启动xss

目录 实验环境&#xff1a; 分析&#xff1a; 找破坏点&#xff1a; 查看源码找函数&#xff1a; 找到了三个方法&#xff0c;loadComments、escapeHTM 、displayComments loadComments escapeHTM displayComments&#xff1a; GOGOGO 实验环境&#xff1a; Lab: Exp…

JLMSR超分算法说明和效果

一、简介 JLMSR是基于加权概率模型构造的一种超分算法&#xff0c;属于传统超分&#xff0c;无需训练&#xff0c;适合硬件化。与传统超分的插值方案最大区别在于基于16*16的块进行上下文概率统计&#xff0c;结合权重进行插值。目前该插值方案已经线性化和整数化&#xff0c;…

加油卡APP系统,探索新的市场机遇

当下&#xff0c;汽车已经成为了不可缺少的出行工具&#xff0c;汽车加油也成为了必要的环节。加油卡作为一种高效、便捷的加油方式&#xff0c;具有非常的市场发展空间&#xff0c;也成为了新的创业方式&#xff01; 在网络时代中&#xff0c;加油卡APP的开发大大减少了用户在…

NVF04M录音芯片在宠物喂食器的应用:录音播放功能,内置SPI闪存

在现代社会中&#xff0c;宠物已经成为人们生活中的一部分&#xff0c;而宠物喂食器作为宠物养护的重要工具&#xff0c;也越来越受到人们的关注。为了满足人们对宠物喂食器的多样化需求&#xff0c;九芯电子供应商研发了一款NVF04M录音芯片。它在宠物喂食器中的作用主要是提供…

[机器学习]--线性回归算法

线性回归算法原理 线性关系在生活中有很多案例: 摄氏度和华氏度的转化: F C ⋅ 9 5 32 F C \cdot\frac{9}{5}32 FC⋅59​32学科最终成绩的计算: 最终成绩 0.3 \times 平时成绩 0.7 \times 期末成绩 线性回归(Linear regression)就是利用回归函数对一个或多个自变量…

WEB渗透免杀篇-Golang免杀

往期文章 WEB渗透免杀篇-免杀工具全集-CSDN博客 WEB渗透免杀篇-加载器免杀-CSDN博客 WEB渗透免杀篇-分块免杀-CSDN博客 WEB渗透免杀篇-Powershell免杀-CSDN博客 WEB渗透免杀篇-Python源码免杀-CSDN博客 WEB渗透免杀篇-C#源码免杀-CSDN博客 WEB渗透免杀篇-MSFshellcode免杀…

土壤墒情固定监测站的工作原理

TH-GTS03土壤墒情固定监测站是一种专门用于监测土壤墒情信息的设备&#xff0c;它通过一系列精密的传感器和数据处理系统&#xff0c;实时、准确地获取土壤的水分含量、温度以及其他相关参数&#xff0c;为农业生产、生态保护和水资源管理等提供重要依据。 土壤墒情固定监测站通…

想做好专业儿童研学项目解决方案,建议先看看这篇

大家好&#xff0c;我是爱吐槽也爱分享的“教育&科技跨界老顽童”。今天想跟大家聊聊这几年越来越火的“沉浸式数字化研学”&#xff0c;因为前不久刚刚参与了一个不错的专业儿童研学项目解决方案&#xff0c;有一些心得想要及时分享&#xff0c;尤其是也想搞专业儿童研学项…

vue3 安装element-plus进行一些简单的测试

1、安装element-plus 官网地址&#xff1a;https://element-plus.org/zh-CN/guide/installation.html 2、安装方法&#xff1a; # 选择一个你喜欢的包管理器# NPM npm install element-plus --save# Yarn yarn add element-plus# pnpm pnpm install element-plus 这里我选择…

STM32G474的HRTIM用作时基定时器

STM32G474的HRTIM由7个定时器组成&#xff0c;分别是主定时器&#xff0c;定时器A&#xff0c;定时器B&#xff0c;定时器C&#xff0c;定时器D&#xff0c;定时器E和定时器F&#xff0c;每个定时器均可单独配置&#xff0c;独立工作。每个定时器都有1个独立的计数器和4个独立的…

[基础入门]正向shell和反弹shell

前言 在渗透过程能获取shell是很重要的一点&#xff0c;首先可以使用一些漏洞对ssh和ftp进行攻击获取shell&#xff0c;如果没有这些漏洞可以考虑一下使用正向shell或者是反弹shell。 一、什么是正向shell和反弹shell 其实这个过程是相对的&#xff0c;需要找到一个参考点&a…