Lua脚本解决高并发与库存超卖
尽管从Redis6开始,网络I/O可以开启多线程模式(通过io-threads配置项),但主命令执行仍然保持单线程。因此,利用主命令执行单线程的性质,可以模拟高并发环境下加锁的处理,加上redis本身具有的超快处理优势,可以完美替代程序中加锁的逻辑。
具体流程图如下所示:
1.编写controller用例:
package com.example.demo;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.*;
@RestController
public class LuaController {
private static final String Lua_SCRIPT = """
if tonumber(redis.call('exists',KEYS[1])) == 0 then
redis.call('set',KEYS[1],'10')
end
if tonumber(redis.call('exists',KEYS[2])) == 0 then
redis.call('sadd',KEYS[2],'-1')
end
if tonumber(redis.call('get',KEYS[1]))>0 and tonumber(redis.call('sismember',KEYS[2],ARGV[1])) == 0 then
redis.call('incrby',KEYS[1],'-1')
redis.call('sadd',KEYS[2],ARGV[1])
return 1
else
return 0
end
""";
private final StringRedisTemplate stringRedisTemplate;
LuaController(StringRedisTemplate redisTemplate){
this.stringRedisTemplate = redisTemplate;
}
@GetMapping("sk")
public Map<String,String> seckill(String pid){
Map<String,String> resp = new HashMap<>();
String uid = String.valueOf(new Random().nextInt(10000000));
List<String> keys = new ArrayList<>();
keys.add("P"+pid);
keys.add("U"+pid);
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(Lua_SCRIPT,Long.class);
Long result = stringRedisTemplate.execute(redisScript,keys,uid);
resp.put("uid",uid);
resp.put("result",String.valueOf(result));
return resp;
}
}
其中,以productid为“P1001”为例:
if tonumber(redis.call('exists',KEYS[1])) == 0 then
redis.call('set',KEYS[1],'10')
end
if tonumber(redis.call('exists',KEYS[2])) == 0 then
redis.call('sadd',KEYS[2],'-1')
end
这一段lua脚本表示如果当前redis没有存入“P1001”的商品,就初始化“P1001”=10,表示“P1001”的商品库存有10个,等待秒杀。
KEYS[2],即uid的列表,比如“U1001”,值是一个set集合,存储秒杀成功的用户id,比如{“1009099”,“90909009”,“68676889” .......},如果不存在这个列表就初始化这个列表,存一个占位符“-1”进去。
if tonumber(redis.call('get',KEYS[1]))>0 and tonumber(redis.call('sismember',KEYS[2],ARGV[1])) == 0 then
redis.call('incrby',KEYS[1],'-1')
redis.call('sadd',KEYS[2],ARGV[1])
return 1
else
return 0
end
这段代码模拟加锁的场景,即如果先前的PID存在,即“P1001”存在并初始化了并且值是>0的,表示仍然还有余额,当前试图秒杀的用户如果已经在列表里了表示已经秒杀成了,禁止重复秒杀,只有没有秒杀成功的且还有余额的情况下返回1.如果“P1001”的值是负数,即“-1”,表示余额已经没有了或者试图重复秒杀就返回0.
2.编写测试用例
模拟50个并发用户的秒杀场景
package com.example.demo;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.web.client.RestTemplate;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;
@SpringBootTest
class Demo1ApplicationTests {
@Test
void AD() {
LongAdder ones = new LongAdder(); // 统计秒杀成功用户数量
ExecutorService pool = Executors.newFixedThreadPool(50);
CyclicBarrier barrier = new CyclicBarrier(50); // 利用CyclicBarrier循环屏障确保50个线程同时开始
CountDownLatch cdl = new CountDownLatch(50); // 利用CountDownLatch确保50个线程都调用了Restful接口后主进程才结束
Runnable task = new Runnable() {
@Override
public void run() {
try {
barrier.await();
}
catch (InterruptedException e){
throw new RuntimeException(e);
}
catch (BrokenBarrierException e){
throw new RuntimeException(e);
}
String result = new RestTemplate().getForObject("http://localhost:8080/sk?pid=1080",String.class);
System.out.println(result);
ObjectMapper mapper = new ObjectMapper();
Map map = null;
try {
map = mapper.readValue(result, Map.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
if(map.get("result").equals("1")){
ones.increment();
}
cdl.countDown();
}
};
for(int i=0;i<50;i++){
pool.submit(task);
}
try {
cdl.await();
System.out.println("秒杀成功用户数量: "+ones);
}
catch (InterruptedException e){
throw new RuntimeException(e);
}
}
}
运行程序后,控制台打印输出
此时查看数据库:
发现最终只有10个用户秒杀成功,有效地实现了高并发下秒杀场景的任务.