Redis可以做什么
- 记录帖子的点赞数、评论数和点击数(hash)
- 记录用户的帖子ID列表(排序),便于快速显示用户的帖子列表(zset)
- 记录帖子的标题、摘要、作者和封面信息,用于列表页展示(hash)
- 记录帖子的点赞用户ID列表,评论ID列表,用于显示和去重计数(zset)
- 缓存近期热帖的内容(帖子内容的空间占用比较大),减少数据库压力(hash)
- 记录帖子的相关文章ID,根据内容推荐相关帖子(list)
- 如果帖子ID是整数自增的,可以使用Redis来分配帖子ID(计数器)
- 收藏集和帖子之间的关系(zset)
- 记录热榜帖子ID列表、总热榜和分类热榜(zset)
- 缓存用户行为历史,过滤恶意行为(zset,hash)
Redis数据结构
string
字符串结构一个常见用途是缓存用户信息。将用的信息结构体使用JSON序列化成字符串,然后将序列化后的字符串塞进Redis来缓存,同样,取用户信息会进行一次反序列化。
Redis的字符串是动态字符串,可以修改。类似Java的ArrayList,采用预分配冗余空间的方式来减少内存的频繁分配。当字符串长度小于1MB时,扩容都是加倍现有的空间。大于1MB之后每次扩容1MB。字符串长度上限512MB。
list
Redis的列表相当于Java的LimkedList,是链表不是数组。双向。当列表弹出来最后一个元素之后,该数据结构会被自动删除,内存被回收。
Redis的列表结构常用来做异步队列使用。将需要延后处理的任务结构体序列化成字符串,塞进Redis列表,另一个线程从这个列表中轮询数据进行处理。
快速列表
Redis底层存储的并不是一个简单的linkedlist,而是称为快速链表(quicklist)的一个东西。首先在列表元素较少的情况下,会使用一块连续的内存存储,这个结构是ziplist,即压缩链表。它将所有的元素彼此紧挨着一起存储,分配的是一块连续的内存。当数据量比较多的时候才会改成quicklist,因为普通的链表需要的附加的指针空间太大,会浪费空间,还会加重内存的碎片化,比如prev和next指针。所以Redis将链表和ziplist结合起来组成了 quicklist,也就是将多个ziplist使用双向指针串起来使用。既满足了快速的插入删除性能,又不会出现太大的空间冗余。
hash
Redis的字典相当于Java中的HashMap。它是无序字典,内部存储了很多键值对。实现结构上与Java的HashMap也是一样的都是数组+链表的二维结构。第一维hash的数组碰撞时,就会将碰撞的元素使用链表串接起来。
渐进式rehash。
set
Redis的集合相当于Java中的HashSet,它内部的键值对是无序的、唯一的。它的内部实现相当于一个特殊的字典,字典中所有的value都是一个值NULL。
zset(有序列表)
类似于Java中的SortedSet和HashMap的结合体。一方面是一个Set保证了内部value的唯一性,另一方面可以给每个vlue赋予一个score,代表这个value的排序权重。内部实现用的是一种跳表的数据结构。
跳跃列表
容器型数据结构的通用规则
list,set,hash,zset这四种数据结构是容器型数据结构,共享下面两条通用规则:
- create if not existed:如果容器不存在,那就创建一个,再进行操作。
- drop if no elements:如果容器里的元素没有了,那么立即删除容器,释放内存。
过期时间
Redis的所有数据结构可以设置过期时间,时间到了,Redis会自动删除相应的对象。
注意⚠️:如果一个字符串已经设定了过期时间,然后调用set方法修改了它,它的过期时间会消失。
分布式锁
Redis的分布式锁不能解决超时问题。如果再加锁和释放锁之间的逻辑执行得太长,以至于超出了锁的超时限制,就会出现问题。为了避免这个问题,Redis的分布式锁不要执行太长时间的任务。
可重入性:可重入性是指线程在持有锁的情况下再次请求加锁,如果一个锁支持同一个线程的多次加锁,那么这个锁就是可重入的。Redis分布式锁如果要支持可重入,需要对客户端的set方法进行包装,使用线程的Threadlocal变量存储当前持有锁的计数。不推荐使用可重入锁,它加重了客户端的复杂性,在编写业务方法时在逻辑上注意可以完全不使用可重入锁。
import redis.clients.jedis.Jedis;
public class RedisWithReentrantLock {
private ThreadLocal<Map<String.Integer>> lockers = new ThreadLocal<>();
private Jedis jedis;
public RedisWithReentrantLock(Jedis jedis) {
this.jedis = jedis;
}
private boolean _lock(String key) {
return jedis.set(key, "", "nx", "ex", 5L) != null;
}
private void _unlock(String key) {
jedis.del(key);
}
private Map<String, Integer> currentLockers() {
Map<String, Integer> refs = lockers.get();
if (refs != null) {
return refs;
}
lockers.set(new HashMap<>());
return lockers.get();
}
public boolean lock(String key) {
Map<String, Integer> refs = currentLockers();
Integer refCnt = refs.get(key);
if (refCnt != null) {
refs.put(key, refCnt + 1);
return true;
}
boolean ok = this._lock(key);
if (!ok) {
return false;
}
refs.put(key, 1);
return true;
}
public boolean unlock(String key) {
Map<String,Integer> refs = currentLockers();
Integer refCnt = refs.get(key);
if(refCnt == null){
return false;
}
refCnt -= -1;
if(refCnt >0){
refs.put(key,refCnt);
}else{
refs.remove(key);
this._unlock(key);
}
return true;
}
public static void main(String[] args){
Jedis jedis = new Jedis();
RedisWithReentrantLock redis = new RedisWithReentrantLock(jedis);
System.out.println(redis.lock("codehole"));
System.out.println(redis.lock("codehole"));
System.out.println(redis.unlock("codehole"));
System.out.println(redis.unlock("codehole"));
}
}
延时队列
对于只有一组消费者的消息队列,使用redis可以非常轻松地搞定。需要注意的是,Redis的消息队列不是专业的消息队列,没有ack保证,如果对消息的可靠性有着极高要求,那么它就不适合使用。
异步消息队列
list: rpush,lpush,rpop,lpop;支持多个生产者和多个消费者并发进出消息。
队列空了
队列空了,客户端会陷入pop的死循环,不停pop,这就是浪费生命的空轮询。空轮询不仅拉高了客户端的CPU消耗,Redis的QPS也会被拉高。通常使用sleep解决。让线程睡一会。 Thread.sleep(1000)
阻塞读
睡眠会导致消息的延迟增大。可以使用blpop/brpop
b:blocking
阻塞读在队列没有数据的时候,会立即进入休眠状态,一旦数据到来,则立即醒过来,消息的延迟几乎为0.
空闲连接自动断开
如果线程一直阻塞在哪里,Redis的客户端就成了闲置连接,闲置过久,服务器一般会主动断开连接,减少资源闲置。这时,brpop/blpop会抛出异常。在编写客户端时注意捕获到异常还要重试。
锁冲突处理
三种方式处理加锁失败:
- 直接抛出特定类型的异常:适合用户直接发起的请求,可以由前端的代码替代用户来进行延时重试控制。本质上是对当前请求的放弃,由用户决定是否重新发起请求。
- sleep:可能造成阻塞
- 延时队列
延时队列的实现
延时队列可通过Redis的zset(有序列表)来实现。将消息序列化成一个字符串作为zset的value,这个消息的到期处理时间作为score,然后用多个线程轮询zset获取到期的任务进行处理。多个线程是为了保障可用性,万一挂了一个线程还有其他线程可以继续处理。因为有多个线程,所以考虑并发争抢任务,确保任务不会被多次执行。
import java.lang.reflect.Type;
import java.util.Set;
import java.util.UUID;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import redis.clients.jedis.Jedis;
public class RedisDelayingQueue<T> {
static class TaskItem<T> {
public String id;
public T msg;
}
//fastjson序列化对象中存在generic类型时,需要使用 TypeReference
private Type TaskType = new TypeRefenrence<TaskItem<T>>(){}.getType();
private Jedis jedis;
private String queueKey;
public RedisDelayingQueue(Jedis jedis, String queueKey) {
this.jedis = jedis;
this.queueKey = queueKey;
}
public void delay(T msg) {
TaskItem<T> task = new TaskItem<T>();
//分配唯一的uuid
task.id = UUID.randomUUID().toString();
task.msg = msg;
//fastjson序列化
String s = JSON.toJSONString(task);
//塞入延时队列,5s后再试
jedis.zadd(queueKey, System.currentTimeMillis()+5000,s);
}
public void loop(){
while(!Thread.interrupted()){
//只取一条
Set<String> values = jedis.zrangeByScore(queueuKey,0,System.currentTimeMillis(),0,1);
if(valus.isEmpty()) {
try{
Thread.sleep(500);
}catch(InterruptedException e) {
break;
}
continue;
}
String s = values.iterator().next();
if(jedis.zrem(queueuKey, s) > 0) { //抢到了
//fastjson反序列化
TaskItem<T> task = JSON.parseObject(s,TaskType);
this.handleMsg(task.msg);
}
}
}
public void handleMsg(T msg) {
System.out.println(msg);
}
public static void main(String[] args) {
Jedis jedis = new Jedis();
RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");
THread producer = new Thread(){
public void run() {
for(int i=0;i<10;i++) {
queue.delay("codehole"+i);
}
}
};
Thread consumer. = new Thread(){
public void run() {
queue.loop();
}
};
producer.start();
consumer.start();
try{
producer.join();
Thread.sleep(6000);
consumer.interrupt();
consumer.join();
} catch (InterruptedException e) {
}
}
}
在上面的算法中,同一个任务可能会被多个进程取到之后再使用zrem进行争抢。那些没抢到的进程都白取了一次任务。
位图
为了解决bool型数据的存储,Redis提供了位图数据结构。byte数组。可以使用不同的get/set直接获取和设置整个位图的内容,也可以使用位图操作getbit/setbit等将byte数组看成位数组来处理。
可以零存整取,也可以整存整取、整存零取。
统计和查找
redis提供了位图统计指令bitcount和位图查找指令bitpos。bitcount用来统计指定范围内1的个数,bitpos用来查找指定范围内出现的第一个0或1。比如我们可以通过bitcount统计用户一共签到了多少天,bitpos统计用户从那一天开始第一次签到。
魔术指令bitfield
HyperLogLog
高级数据结构,提供不精确的去重计数方案。
pfadd,pfcount:增加计数和获取计数。pf:发明人的姓名缩写。
pfmerge:用于将多个pf计数值累加在一起形成一个新的pf值。
Redis对HyperLogLog的存储进行了优化,在计数比较小时,它的存储空间采用稀疏矩阵存储,空间占用很小,仅仅在计数慢慢变大,稀疏矩阵占用空间渐渐超过了阈值时,才会一次性转变为稠密矩阵,占用12KB的空间。
HyperLogLog实现原理
给定一系列的随机整数,记录下低位连续0位的最大长度K,通过这个K值可以估算出随机数的数量N。通过实验可以发现K和N的对数之间存在显著的线形相关性。如果N介于2K和2k+1之间,可以采用多个BitKeeper,然后进行加权估计。
import java.util.concurrent.ThreadLocalRandom;
public class PfTest {
static class BitKeeper {
private int maxbits;
public void random() {
long value = ThreadLocalRandom.current().nextLong(2L << 32);
int bits = lowZeros(value);
if (bits > this.maxbits) {
this.maxbits = bits;
}
}
private int lowZeros(long value) {
int i = 1;
for (; i < 32; i++) {
if (value >> i << i != value) {
break;
}
}
return i - 1;
}
}
static class Experiment {
private int n;
private BitKeeper keeper;
public Experiment(int n){
this.n = n;
this.keeper = new BitKeeper();
}
public void work() {
for(int i=0;i<n;i++){
this.keeper.random();
}
}
public void debug() {
System.out.printf("%d %.2f %d\n", this.n,Math.log(this.n) / Math.log(2), this.keeper.maxbits);
}
}
public static void main(String []args) {
for(int i=1000;i<100000;i+=10){
Experiment exp = new Experiment(i);
exp.work();
exp.debug();
}
}
}
}
pf的占用内存为12KB:在redis的HyperLoglog实现中使用的是16384个桶,也就是214,每个桶的maxbits需要6个bit来存储,最大可以表示63,214*6/8=12KB
布隆过滤器
当布隆过滤器说某个值存在时,这个值他可能不存在,当说某个值不存在时,这个值一定不存在。使用场景:布隆过滤器能准确过滤掉那些用户已经看过的内容,那些用户没有看过的新内容,它也会过滤掉极小一部分(误判),但是绝大多数新内容它都能准确识别,这样就可以保证推荐给用户的内容都是无重复的。
bf.add,bf.exists.多个的话需要增加m。
布隆过滤器对于已经见过的元素肯定不会误判,它只会误判那些没有见过的元素。
bf.reserve对应有三个参数:key,error_rate和initial_size。
布隆过滤器的原理
每个布隆过滤器对应到的redis数据结构里面就是一个大型的位数组和几个不一样的无偏hash函数。所谓无偏就是能够把元素的hash算得比较均匀,让元素被hash映射到位数组中的位置比较均匀。
向布隆过滤器中添加key时,会使用多个hash函数对key进行hash,算得一个整数索引值,然后对位数组长度进行取模运算得到一个位置,每个hash函数都会算得一个不同的位置。再把位数组的这几个位置置为1,就完成了add操作。
使用时不要让实际元素数量远大于初始化数量,当实际元素数量开始超出初始化数量时,应该对布隆过滤器进行重建,重新分配一个size更大的过滤器,再将所有的历史元素批量add进去。因为error_rate不会因为数量刚一超出就急剧增加,这给我们重建过滤器提供了时间。
空间占用统计
布隆过滤器有两个参数,第一个预计元素的数量n,第二个是错误率f。公式根据这两个输入得到两个输出,第一个输出是位数组的长度l,也就是需要的存储空间大小(bit),第二个输出是hash函数的最佳数量k。hash函数的数量也会直接影响到错误率,最佳的数量会有最低的错误率。
从公式中可以看出以下结论:
- 位数组相对越长(1/n),错误率f越低,这个和直观上的理解是一致的。
- 位数组相对越长,hash函数需要的最佳数量也越多,影响计算效率。
- 当一个元素平均需要一个字节8bit的指纹空间时,错误率大约为2%。
- 错误率为1%时,一个元素需要的平均指纹空间大约为10bit。
实际元素超出时误判率变化
布隆过滤器的其他应用
- 在爬虫系统中对URL进行去重。
- 在NoSQL数据库中使用非常广泛,可以显著降低数据库的IO请求数量,当用户来查询某个row时,可以先通过内存中的布隆过滤器过滤掉大量不存在的row请求,然后再去磁盘进行查询。
- 邮箱系统的垃圾邮件过滤功能也普遍用到了布隆过滤器。
简单限流
除了控制流量,限流还有一个应用的目的是控制用户行为,避免垃圾请求。
使用Redis实现简单限流策略
系统要限定用户的某个行为在规定的时间里只能允许发生N次。
定义接口:
# 指定用户user_id的某个行为action_key在特定的period内只允许发生最多的次数max_count
def is_action_allowed(user_id, action_key, period, max_count)
:
return True
can_reply = is_action_allowed("XXX", "reply", 60, 5)
if can_reply:
dp_reply()
else:
raise ActionThresholdOverflow()
解决方案
可以使用zset的score保存毫秒时间戳。用一个zset结构记录用户的行为历史,每一个行为都会作为zset中的一个key保存下来,同一个用户的同一种行为用一个zset记录。为节省内存,只需要保留时间窗口内的行为记录,同时如果用户是冷用户,滑动时间窗口内的行为是空记录,那么这个zset就可以从内存中移除,不再占用空间。通过统计滑动窗口内的行为数量与阈值max_count进行比较就可以得知行为是否被允许。
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
public class SimpleRateLimiter {
private Jedis jedis;
public SimpleRateLimiter(Jedis jedis) {
this.jedis = jedis;
}
public boolean isActionAllowed(String userId, String actionKey, long period, int maxCount) {
String key = String.format("hsit:%s%s", userId, actionKey);
long nowTs = System.currentTimeMillis();
Pipeline pipe = jedis.pipelined();
pipe.multi();
pipe.zadd(key, nowTs, "" + nowTs);
pipe.zremrangeByScore(key, 0, nowTs - period * 1000);
Response<Long> count = pipe.zcard(key);
pipe.expire(key, period);
pipe.exec();
pipe.close();
return count.get() <= maxCount;
}
public static void main(String [] args){
Jedis jedis = new Jedis();
SimpleRateLimiter limiter = new SimpleRateLimiter(jedis);
for(int i=0;i<20;i++){
System.out.println(limiter.isActionAllowed("XXX", "reply", 60, 5));
}
}
}
Q.E.D.