Lua脚本解决高并发与库存超卖

尽管从Redis6开始,网络I/O可以开启多线程模式(通过io-threads配置项),但主命令执行仍然保持单线程。因此,利用主命令执行单线程的性质,可以模拟高并发环境下加锁的处理,加上redis本身具有的超快处理优势,可以完美替代程序中加锁的逻辑。

具体流程图如下所示:

1.编写controller用例:

package com.example.demo;

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.*;

@RestController
public class LuaController {
    private static final String Lua_SCRIPT = """
            if tonumber(redis.call('exists',KEYS[1])) == 0 then
                redis.call('set',KEYS[1],'10')
            end

            if tonumber(redis.call('exists',KEYS[2])) == 0 then 
                redis.call('sadd',KEYS[2],'-1')
            end

            if tonumber(redis.call('get',KEYS[1]))>0 and tonumber(redis.call('sismember',KEYS[2],ARGV[1])) == 0 then
               redis.call('incrby',KEYS[1],'-1')
               redis.call('sadd',KEYS[2],ARGV[1])
               return 1
            else
               return 0
            end
            """;
    private final StringRedisTemplate stringRedisTemplate;

    LuaController(StringRedisTemplate redisTemplate){
        this.stringRedisTemplate = redisTemplate;
    }
    @GetMapping("sk")
    public Map<String,String> seckill(String pid){
        Map<String,String> resp = new HashMap<>();
        String uid = String.valueOf(new Random().nextInt(10000000));
        List<String> keys  = new ArrayList<>();
        keys.add("P"+pid);
        keys.add("U"+pid);
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(Lua_SCRIPT,Long.class);
        Long result = stringRedisTemplate.execute(redisScript,keys,uid);
        resp.put("uid",uid);
        resp.put("result",String.valueOf(result));
        return resp;
    }

}

其中,以productid为“P1001”为例:

 if tonumber(redis.call('exists',KEYS[1])) == 0 then
                redis.call('set',KEYS[1],'10')
            end

            if tonumber(redis.call('exists',KEYS[2])) == 0 then 
                redis.call('sadd',KEYS[2],'-1')
            end

这一段lua脚本表示如果当前redis没有存入“P1001”的商品,就初始化“P1001”=10,表示“P1001”的商品库存有10个,等待秒杀。

KEYS[2],即uid的列表,比如“U1001”,值是一个set集合,存储秒杀成功的用户id,比如{“1009099”,“90909009”,“68676889” .......},如果不存在这个列表就初始化这个列表,存一个占位符“-1”进去。

 if tonumber(redis.call('get',KEYS[1]))>0 and tonumber(redis.call('sismember',KEYS[2],ARGV[1])) == 0 then
               redis.call('incrby',KEYS[1],'-1')
               redis.call('sadd',KEYS[2],ARGV[1])
               return 1
            else
               return 0
            end

这段代码模拟加锁的场景,即如果先前的PID存在,即“P1001”存在并初始化了并且值是>0的,表示仍然还有余额,当前试图秒杀的用户如果已经在列表里了表示已经秒杀成了,禁止重复秒杀,只有没有秒杀成功的且还有余额的情况下返回1.如果“P1001”的值是负数,即“-1”,表示余额已经没有了或者试图重复秒杀就返回0.

2.编写测试用例

模拟50个并发用户的秒杀场景

package com.example.demo;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.web.client.RestTemplate;

import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;

@SpringBootTest
class Demo1ApplicationTests {

    @Test
    void AD() {
        LongAdder ones = new LongAdder(); // 统计秒杀成功用户数量
        ExecutorService pool =  Executors.newFixedThreadPool(50);
        CyclicBarrier barrier = new CyclicBarrier(50); // 利用CyclicBarrier循环屏障确保50个线程同时开始
        CountDownLatch cdl = new CountDownLatch(50); // 利用CountDownLatch确保50个线程都调用了Restful接口后主进程才结束
        Runnable task = new Runnable() {
            @Override
            public void run() {
                try {
                    barrier.await();
                }
                catch (InterruptedException e){
                    throw new RuntimeException(e);
                }
                catch (BrokenBarrierException e){
                    throw new RuntimeException(e);
                }
                String result = new RestTemplate().getForObject("http://localhost:8080/sk?pid=1080",String.class);
                System.out.println(result);
                ObjectMapper mapper = new ObjectMapper();
                Map map = null;
                try {
                    map = mapper.readValue(result, Map.class);
                } catch (JsonProcessingException e) {
                    throw new RuntimeException(e);
                }
                if(map.get("result").equals("1")){
                    ones.increment();
                }
                cdl.countDown();
            }
        };
        for(int i=0;i<50;i++){
            pool.submit(task);
        }
        try {

            cdl.await();
            System.out.println("秒杀成功用户数量: "+ones);
        }
        catch (InterruptedException e){
            throw new RuntimeException(e);
        }
    }

}

运行程序后,控制台打印输出

image-20250715113427350

此时查看数据库:

image-20250715113502791

image-20250715113514846

发现最终只有10个用户秒杀成功,有效地实现了高并发下秒杀场景的任务.

最后修改:2025 年 07 月 15 日
如果觉得我的文章对你有用,请随意赞赏