Redis可以做什么

  1. 记录帖子的点赞数、评论数和点击数(hash)
  2. 记录用户的帖子ID列表(排序),便于快速显示用户的帖子列表(zset)
  3. 记录帖子的标题、摘要、作者和封面信息,用于列表页展示(hash)
  4. 记录帖子的点赞用户ID列表,评论ID列表,用于显示和去重计数(zset)
  5. 缓存近期热帖的内容(帖子内容的空间占用比较大),减少数据库压力(hash)
  6. 记录帖子的相关文章ID,根据内容推荐相关帖子(list)
  7. 如果帖子ID是整数自增的,可以使用Redis来分配帖子ID(计数器)
  8. 收藏集和帖子之间的关系(zset)
  9. 记录热榜帖子ID列表、总热榜和分类热榜(zset)
  10. 缓存用户行为历史,过滤恶意行为(zset,hash)

Redis数据结构

string

字符串结构一个常见用途是缓存用户信息。将用的信息结构体使用JSON序列化成字符串,然后将序列化后的字符串塞进Redis来缓存,同样,取用户信息会进行一次反序列化。
Redis的字符串是动态字符串,可以修改。类似Java的ArrayList,采用预分配冗余空间的方式来减少内存的频繁分配。当字符串长度小于1MB时,扩容都是加倍现有的空间。大于1MB之后每次扩容1MB。字符串长度上限512MB。

list

Redis的列表相当于Java的LimkedList,是链表不是数组。双向。当列表弹出来最后一个元素之后,该数据结构会被自动删除,内存被回收。
Redis的列表结构常用来做异步队列使用。将需要延后处理的任务结构体序列化成字符串,塞进Redis列表,另一个线程从这个列表中轮询数据进行处理。
快速列表
Redis底层存储的并不是一个简单的linkedlist,而是称为快速链表(quicklist)的一个东西。首先在列表元素较少的情况下,会使用一块连续的内存存储,这个结构是ziplist,即压缩链表。它将所有的元素彼此紧挨着一起存储,分配的是一块连续的内存。当数据量比较多的时候才会改成quicklist,因为普通的链表需要的附加的指针空间太大,会浪费空间,还会加重内存的碎片化,比如prev和next指针。所以Redis将链表和ziplist结合起来组成了 quicklist,也就是将多个ziplist使用双向指针串起来使用。既满足了快速的插入删除性能,又不会出现太大的空间冗余。

hash

Redis的字典相当于Java中的HashMap。它是无序字典,内部存储了很多键值对。实现结构上与Java的HashMap也是一样的都是数组+链表的二维结构。第一维hash的数组碰撞时,就会将碰撞的元素使用链表串接起来。
渐进式rehash。

set

Redis的集合相当于Java中的HashSet,它内部的键值对是无序的、唯一的。它的内部实现相当于一个特殊的字典,字典中所有的value都是一个值NULL。

zset(有序列表)

类似于Java中的SortedSet和HashMap的结合体。一方面是一个Set保证了内部value的唯一性,另一方面可以给每个vlue赋予一个score,代表这个value的排序权重。内部实现用的是一种跳表的数据结构。
zset

跳跃列表
36b25c2fc83f3959ae374416d2dc587b

容器型数据结构的通用规则

list,set,hash,zset这四种数据结构是容器型数据结构,共享下面两条通用规则:

  • create if not existed:如果容器不存在,那就创建一个,再进行操作。
  • drop if no elements:如果容器里的元素没有了,那么立即删除容器,释放内存。

过期时间

Redis的所有数据结构可以设置过期时间,时间到了,Redis会自动删除相应的对象。
注意⚠️:如果一个字符串已经设定了过期时间,然后调用set方法修改了它,它的过期时间会消失。

分布式锁

Redis的分布式锁不能解决超时问题。如果再加锁和释放锁之间的逻辑执行得太长,以至于超出了锁的超时限制,就会出现问题。为了避免这个问题,Redis的分布式锁不要执行太长时间的任务。
可重入性:可重入性是指线程在持有锁的情况下再次请求加锁,如果一个锁支持同一个线程的多次加锁,那么这个锁就是可重入的。Redis分布式锁如果要支持可重入,需要对客户端的set方法进行包装,使用线程的Threadlocal变量存储当前持有锁的计数。不推荐使用可重入锁,它加重了客户端的复杂性,在编写业务方法时在逻辑上注意可以完全不使用可重入锁。

import redis.clients.jedis.Jedis;

public class RedisWithReentrantLock {
    private ThreadLocal<Map<String.Integer>> lockers = new ThreadLocal<>();
    private Jedis jedis;

    public RedisWithReentrantLock(Jedis jedis) {
        this.jedis = jedis;
    }

    private boolean _lock(String key) {
        return jedis.set(key, "", "nx", "ex", 5L) != null;
    }

    private void _unlock(String key) {
        jedis.del(key);
    }

    private Map<String, Integer> currentLockers() {
        Map<String, Integer> refs = lockers.get();
        if (refs != null) {
            return refs;
        }
        lockers.set(new HashMap<>());
        return lockers.get();
    }

    public boolean lock(String key) {
        Map<String, Integer> refs = currentLockers();
        Integer refCnt = refs.get(key);
        if (refCnt != null) {
            refs.put(key, refCnt + 1);
            return true;
        }
        boolean ok = this._lock(key);
        if (!ok) {
            return false;
        }
        refs.put(key, 1);
        return true;
    }

    public boolean unlock(String key) {
        Map<String,Integer> refs = currentLockers();
        Integer refCnt = refs.get(key);
        if(refCnt == null){
            return false;
        }
        refCnt -= -1;
        if(refCnt >0){
            refs.put(key,refCnt);
        }else{
            refs.remove(key);
            this._unlock(key);
        }
        return true;
    }
    public static void main(String[] args){
        Jedis jedis = new Jedis();
        RedisWithReentrantLock redis = new RedisWithReentrantLock(jedis);
        System.out.println(redis.lock("codehole"));
        System.out.println(redis.lock("codehole"));
        System.out.println(redis.unlock("codehole"));
        System.out.println(redis.unlock("codehole"));
    }
}

延时队列

对于只有一组消费者的消息队列,使用redis可以非常轻松地搞定。需要注意的是,Redis的消息队列不是专业的消息队列,没有ack保证,如果对消息的可靠性有着极高要求,那么它就不适合使用。

异步消息队列

list: rpush,lpush,rpop,lpop;支持多个生产者和多个消费者并发进出消息。

242ff9eadd78726b964d3630618dee10

队列空了

队列空了,客户端会陷入pop的死循环,不停pop,这就是浪费生命的空轮询。空轮询不仅拉高了客户端的CPU消耗,Redis的QPS也会被拉高。通常使用sleep解决。让线程睡一会。 Thread.sleep(1000)

阻塞读

睡眠会导致消息的延迟增大。可以使用blpop/brpop
b:blocking
阻塞读在队列没有数据的时候,会立即进入休眠状态,一旦数据到来,则立即醒过来,消息的延迟几乎为0.

空闲连接自动断开

如果线程一直阻塞在哪里,Redis的客户端就成了闲置连接,闲置过久,服务器一般会主动断开连接,减少资源闲置。这时,brpop/blpop会抛出异常。在编写客户端时注意捕获到异常还要重试。

锁冲突处理

三种方式处理加锁失败:

  1. 直接抛出特定类型的异常:适合用户直接发起的请求,可以由前端的代码替代用户来进行延时重试控制。本质上是对当前请求的放弃,由用户决定是否重新发起请求。
  2. sleep:可能造成阻塞
  3. 延时队列

延时队列的实现

延时队列可通过Redis的zset(有序列表)来实现。将消息序列化成一个字符串作为zset的value,这个消息的到期处理时间作为score,然后用多个线程轮询zset获取到期的任务进行处理。多个线程是为了保障可用性,万一挂了一个线程还有其他线程可以继续处理。因为有多个线程,所以考虑并发争抢任务,确保任务不会被多次执行。

import java.lang.reflect.Type;
import java.util.Set;
import java.util.UUID;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;

import redis.clients.jedis.Jedis;

public class RedisDelayingQueue<T> {
	static class TaskItem<T> {
		public String id;
		public T msg;
		}
	//fastjson序列化对象中存在generic类型时,需要使用			TypeReference
	private Type TaskType = new TypeRefenrence<TaskItem<T>>(){}.getType();
	private Jedis jedis;
	private String queueKey;
	public RedisDelayingQueue(Jedis jedis, String queueKey) {
	this.jedis = jedis;
	this.queueKey = queueKey;
	}
public void delay(T msg) {
	TaskItem<T> task = new TaskItem<T>();
	//分配唯一的uuid
	task.id = UUID.randomUUID().toString();
	task.msg = msg;
	//fastjson序列化
	String s = JSON.toJSONString(task);
	//塞入延时队列,5s后再试
	jedis.zadd(queueKey, System.currentTimeMillis()+5000,s);
}

public void loop(){
	while(!Thread.interrupted()){
	//只取一条
	Set<String> values = jedis.zrangeByScore(queueuKey,0,System.currentTimeMillis(),0,1);
	if(valus.isEmpty()) {
		try{
			Thread.sleep(500);
		}catch(InterruptedException e) {
			break;
		}
		continue;
	}
	String s = values.iterator().next();
	if(jedis.zrem(queueuKey, s) > 0) { //抢到了
		//fastjson反序列化
		TaskItem<T> task = JSON.parseObject(s,TaskType);
		this.handleMsg(task.msg);
	}
}
}
public void handleMsg(T msg) {
	System.out.println(msg);
}

public static void main(String[] args) {
	Jedis jedis = new Jedis();
	RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");
	THread producer = new Thread(){
public void run() {
		for(int i=0;i<10;i++) {
			queue.delay("codehole"+i);
		}
	}
};
Thread consumer. = new Thread(){
	public void run() {
		queue.loop();
	}
};

producer.start();
consumer.start();
try{
	producer.join();
	Thread.sleep(6000);
	consumer.interrupt();
	consumer.join();
} catch (InterruptedException e) {
}
}
}
	

在上面的算法中,同一个任务可能会被多个进程取到之后再使用zrem进行争抢。那些没抢到的进程都白取了一次任务。

位图

为了解决bool型数据的存储,Redis提供了位图数据结构。byte数组。可以使用不同的get/set直接获取和设置整个位图的内容,也可以使用位图操作getbit/setbit等将byte数组看成位数组来处理。
可以零存整取,也可以整存整取、整存零取。

统计和查找

redis提供了位图统计指令bitcount和位图查找指令bitpos。bitcount用来统计指定范围内1的个数,bitpos用来查找指定范围内出现的第一个0或1。比如我们可以通过bitcount统计用户一共签到了多少天,bitpos统计用户从那一天开始第一次签到。

魔术指令bitfield

HyperLogLog

高级数据结构,提供不精确的去重计数方案。
pfadd,pfcount:增加计数和获取计数。pf:发明人的姓名缩写。
pfmerge:用于将多个pf计数值累加在一起形成一个新的pf值。

Redis对HyperLogLog的存储进行了优化,在计数比较小时,它的存储空间采用稀疏矩阵存储,空间占用很小,仅仅在计数慢慢变大,稀疏矩阵占用空间渐渐超过了阈值时,才会一次性转变为稠密矩阵,占用12KB的空间。

HyperLogLog实现原理

给定一系列的随机整数,记录下低位连续0位的最大长度K,通过这个K值可以估算出随机数的数量N。通过实验可以发现K和N的对数之间存在显著的线形相关性。如果N介于2K和2k+1之间,可以采用多个BitKeeper,然后进行加权估计。

import java.util.concurrent.ThreadLocalRandom;

public class PfTest {
    static class BitKeeper {
        private int maxbits;

        public void random() {
            long value = ThreadLocalRandom.current().nextLong(2L << 32);
            int bits = lowZeros(value);
            if (bits > this.maxbits) {
                this.maxbits = bits;
            }
        }

        private int lowZeros(long value) {
            int i = 1;
            for (; i < 32; i++) {
                if (value >> i << i != value) {
                    break;
                }
            }
            return i - 1;
        }
    }
    static class Experiment {
        private int n;
        private BitKeeper keeper;

        public Experiment(int n){
            this.n = n;
            this.keeper = new BitKeeper();
        }

        public void work() {
            for(int i=0;i<n;i++){
                this.keeper.random();
            }
        }
        public void debug() {
            System.out.printf("%d %.2f %d\n", this.n,Math.log(this.n) / Math.log(2), this.keeper.maxbits);
        }
    }
    public static void main(String []args) {
        for(int i=1000;i<100000;i+=10){
            Experiment exp = new Experiment(i);
            exp.work();
            exp.debug();
        }
    }
    }
}

pf的占用内存为12KB:在redis的HyperLoglog实现中使用的是16384个桶,也就是214,每个桶的maxbits需要6个bit来存储,最大可以表示63,214*6/8=12KB

布隆过滤器

当布隆过滤器说某个值存在时,这个值他可能不存在,当说某个值不存在时,这个值一定不存在。使用场景:布隆过滤器能准确过滤掉那些用户已经看过的内容,那些用户没有看过的新内容,它也会过滤掉极小一部分(误判),但是绝大多数新内容它都能准确识别,这样就可以保证推荐给用户的内容都是无重复的。
bf.add,bf.exists.多个的话需要增加m。
布隆过滤器对于已经见过的元素肯定不会误判,它只会误判那些没有见过的元素。
bf.reserve对应有三个参数:key,error_rate和initial_size。

布隆过滤器的原理

每个布隆过滤器对应到的redis数据结构里面就是一个大型的位数组和几个不一样的无偏hash函数。所谓无偏就是能够把元素的hash算得比较均匀,让元素被hash映射到位数组中的位置比较均匀。
向布隆过滤器中添加key时,会使用多个hash函数对key进行hash,算得一个整数索引值,然后对位数组长度进行取模运算得到一个位置,每个hash函数都会算得一个不同的位置。再把位数组的这几个位置置为1,就完成了add操作。
使用时不要让实际元素数量远大于初始化数量,当实际元素数量开始超出初始化数量时,应该对布隆过滤器进行重建,重新分配一个size更大的过滤器,再将所有的历史元素批量add进去。因为error_rate不会因为数量刚一超出就急剧增加,这给我们重建过滤器提供了时间。

空间占用统计

布隆过滤器有两个参数,第一个预计元素的数量n,第二个是错误率f。公式根据这两个输入得到两个输出,第一个输出是位数组的长度l,也就是需要的存储空间大小(bit),第二个输出是hash函数的最佳数量k。hash函数的数量也会直接影响到错误率,最佳的数量会有最低的错误率。

k=0.7*(1/n) f=0.6185^(1/n)

从公式中可以看出以下结论:

  • 位数组相对越长(1/n),错误率f越低,这个和直观上的理解是一致的。
  • 位数组相对越长,hash函数需要的最佳数量也越多,影响计算效率。
  • 当一个元素平均需要一个字节8bit的指纹空间时,错误率大约为2%。
  • 错误率为1%时,一个元素需要的平均指纹空间大约为10bit。

实际元素超出时误判率变化

f=(1-0.5^t)^k #极限近似,k是hash函数的最佳数量

布隆过滤器的其他应用

  • 在爬虫系统中对URL进行去重。
  • 在NoSQL数据库中使用非常广泛,可以显著降低数据库的IO请求数量,当用户来查询某个row时,可以先通过内存中的布隆过滤器过滤掉大量不存在的row请求,然后再去磁盘进行查询。
  • 邮箱系统的垃圾邮件过滤功能也普遍用到了布隆过滤器。

简单限流

除了控制流量,限流还有一个应用的目的是控制用户行为,避免垃圾请求。

使用Redis实现简单限流策略

系统要限定用户的某个行为在规定的时间里只能允许发生N次。
定义接口:

# 指定用户user_id的某个行为action_key在特定的period内只允许发生最多的次数max_count
def is_action_allowed(user_id, action_key, period, max_count)
:
	return True
can_reply = is_action_allowed("XXX", "reply", 60, 5)
if can_reply:
	dp_reply()
else:
	raise ActionThresholdOverflow()

解决方案

可以使用zset的score保存毫秒时间戳。用一个zset结构记录用户的行为历史,每一个行为都会作为zset中的一个key保存下来,同一个用户的同一种行为用一个zset记录。为节省内存,只需要保留时间窗口内的行为记录,同时如果用户是冷用户,滑动时间窗口内的行为是空记录,那么这个zset就可以从内存中移除,不再占用空间。通过统计滑动窗口内的行为数量与阈值max_count进行比较就可以得知行为是否被允许。

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;

public class SimpleRateLimiter {
    private Jedis jedis;

    public SimpleRateLimiter(Jedis jedis) {
        this.jedis = jedis;
    }

    public boolean isActionAllowed(String userId, String actionKey, long period, int maxCount) {
        String key = String.format("hsit:%s%s", userId, actionKey);
        long nowTs = System.currentTimeMillis();
        Pipeline pipe = jedis.pipelined();
        pipe.multi();
        pipe.zadd(key, nowTs, "" + nowTs);
        pipe.zremrangeByScore(key, 0, nowTs - period * 1000);
        Response<Long> count = pipe.zcard(key);
        pipe.expire(key, period);
        pipe.exec();
        pipe.close();
        return count.get() <= maxCount;
    }
    public static void main(String [] args){
        Jedis jedis = new Jedis();
        SimpleRateLimiter limiter = new SimpleRateLimiter(jedis);
        for(int i=0;i<20;i++){
            System.out.println(limiter.isActionAllowed("XXX", "reply", 60, 5));
        }
    }
}

Q.E.D.


南七技校练习时长四年的练习生