建站资讯

曹工说Redis源码(5)

作者:admin 发布时间:2020-08-02
所在位置:首页 > 基础教程 曹工说Redis源码(5)-- redis server 启动过程解析,以及EventLoop每次事件处理前的前置工作解析(下) - 三国梦回 - 博客园 来源: 作者: 访问次数:87
曹工说Redis源码(5)-- redis server 启动过程解析,以及EventLoop每次事件处理前的前置工作解析(下)
文章导航

Redis源码系列的初衷,是帮助我们尽快理解Redis,更懂Redis,而怎样才能懂,光看是不够的,建议跟着下面的这一篇,把环境搭建起来,后续可以自己阅读源码,或者跟着这边一起阅读。由于我用c也是很多年以前了,些许错误无可避免,希望读者能不吝指出。

曹工说Redis源码(1)-- redis debug环境搭建,使用clion,达到和调试java一样的效果

曹工说Redis源码(2)-- redis server 启动过程解析及简单c语言基本知识补充

曹工说Redis源码(3)-- redis server 启动过程完整解析(中)

曹工说Redis源码(4)-- 通过redis server源码来理解 listen 函数中的 backlog 参数

本讲将延续第三讲的主题,将启动过程的主体讲完。为了保证阅读体验,避免过于突兀,可以先阅读第三讲。本讲,主要讲解余下的部分:

创建pid文件 加载rdb、aof,读取数据 运行事故处理器,准备事件处理,EventLoop每次事件处理前的前置工作 创建pid文件

pid,也就是进程id,以后台模式运行时,redis会把自己的pid,写入到一个文件中,默认的文件路径和名称为:/var/run/redis.pid。

环境变量可配:

# When running daemonized, Redis writes a pid file in /var/run/redis.pid by
# default. You can specify a custom pid file location here.
pidfile /var/run/redis.pid

这些代码非常简洁:

void createPidFile(void) {
 // 1
 FILE *fp = fopen(server.pidfile, w 
 if (fp) {
 // 2
 fprintf(fp, %d\n , (int) getpid());
 // 3
 fclose(fp);
1,打开文件,这里的pidfile就是前面的文件夹名称,/var/run/redis.pid,环境变量可以对其修改。模式为w,表示将对其写入。 2,调用pid,获取当前进程的pid,写入该文件描述符 3,关闭文件。 加载rdb、aof

在启动,会检查aof和rdb选项是否打开,如果打开,则会去加载数据,这里要注意的是,redis总是先查看是否有 aof 开关是否打开;打开的话,则直接使用 aof;

如果 aof 没打开,则去加载 rdb 文件。

void loadDataFromDisk(void) {
 // 记录开始时间
 long long start = ustime();
 // AOF 持久化已打开
 if (server.aof_state == REDIS_AOF_ON) {
 // 尝试载入 AOF 文件
 if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
 // 打印载入信息,并计算载入耗时长度
 redisLog(REDIS_NOTICE, DB loaded from append only file: %.3f seconds ,
 (float) (ustime() - start) / 1000000);
 // AOF 持久化未打开
 } else {
 // 尝试载入 RDB 文件
 if (rdbLoad(server.rdb_filename) == REDIS_OK) {
 // 打印载入信息,并计算载入耗时长度
 redisLog(REDIS_NOTICE, DB loaded from disk: %.3f seconds ,
 (float) (ustime() - start) / 1000000);

加载的过程,现在来讲,不太合适,比如以aof为例,aof文件中存储了一条条的命令,加载 aof 文件的过程,其实就会在进程内部创建一个 fake client(源码中就这样命名,也就是一个假的手机客户端),来一条条地发送 aof 文件中的命令进行执行。

这个命令执行的过程,现在讲会有点早,所以 aof 也放后面吧,讲了命令执行再回头巡视这块。

事件循环结构体讲解

核心流程如下:

 // 1
 aeSetBeforeSleepProc(server.el, beforeSleep);
 // 2
 aeMain(server.el);

先看2处,这里传入server这个静态变量中的el属性,该属性就代表了当前事故处理器的状态,其定义如下:

 // 事件状态
 aeEventLoop *el;

el,实际就是EventLoop的简写;结构体 aeEventLoop,里面维护了:当前使用的多路复用库的函数、当前注册到多路复用库,在发生读写事件时,需要被通知的socket 文件描述符、以及其他一些东西。

typedef struct aeEventLoop {
 // 目前已注册的最大描述符
 int maxfd; /* highest file descriptor currently registered */
 // 目前已追踪的最大描述符
 int setsize; /* max number of file descriptors tracked */
 // 用于生成时间事件 id
 long long timeEventNextId;
 // 最后一次执行时间事件的时间
 time_t lastTime; /* Used to detect system clock skew */
 // 1 已注册的文件事件
 aeFileEvent *events; /* Registered events */
 // 2 已就绪的文件事件
 aeFiredEvent *fired; /* Fired events */
 // 3 时间事件
 aeTimeEvent *timeEventHead;
 // 事故处理器的开关
 int stop;
 // 4 多路复用库的私有数据
 void *apidata; /* This is used for polling API specific data */
 // 5 在事件处理前要执行的函数
 aeBeforeSleepProc *beforesleep;
} aeEventLoop;

1处,注册到多路复用库,需要监听的socket 文件描述符事件,比如,某socket的可读事件;


2处,以select或者epoll这类多路复用库为例,在一次 select 中,如果发现某些socket事件已经满足,则,这些ready的事件,会被存放到本属性中。

由于我的描述比较抽象,这里拿一段 man select中的说明给大家看下:

select() allow a program to monitor multiple file descriptors, waiting until one or more of the file e ready for some class of I/O operation (e.g., input possible). A file descriptor is considered ready if it is possible to perform the corresponding I/O operation (e.g., read(2)) without blocking.

直译一下:select() 允许一个程序去监听多个文件描述符,等待直到1个或多个文件描述符变成 ready状态,该状态下,可以不阻塞地读写该文件描述符。


3处,事件事件,主要用来周期执行,执行一些redis的后台任务,如删除过期key,后面细讲。


4处,指向当前正在使用的多路复用库的相关数据,目前redis支持:select、epoll、kqueue、evport


这里的1处,就是设置前面第5点提到的,设置事件处理前,先要执行的一个函数。

事件循环CPU的主循环
void aeMain(aeEventLoop *eventLoop) {
 eventLoop- stop = 0;
 while (!eventLoop- stop) {
 // 如果有需要在事故处理前执行的函数,那么运行它
 if (eventLoop- beforesleep != NULL)
 eventLoop- beforesleep(eventLoop);
 // 开始事件处理
 aeProcessEvents(eventLoop, AE_ALL_EVENTS);

能看到,一共2个部分,首先执行eventLoop的事故处理前要执行的函数;接着再开始事件处理。

事故处理前的前置执行函数

这里讲解下面这一句:

 eventLoop- beforesleep(eventLoop);

这个函数,在前面已经看到了,被赋值为:

 aeSetBeforeSleepProc(server.el, beforeSleep);

这个 beforeSleep如下:

void beforeSleep(struct aeEventLoop *eventLoop) {
 /* Run a fast expire cycle (the called function will return
 * ASAP if a fast cycle is not needed). */
 // 1 执行一次快速的主动过期检查
 if (server.active_expire_enabled server.masterhost == NULL)
 activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
 // 2
 /* Write the AOF buffer on disk */
 // 3 将 AOF 缓冲区域的内容写入到 AOF 文件
 flushAppendOnlyFile(0);
 /* Call the Redis Cluster before sleep function. */
 // 在进入下个事件循环前,执行一些集群收尾工作
 if (server.cluster_enabled) clusterBeforeSleep();

1,这里会去执行主动的过期检查,大致流程代码如下:

void activeExpireCycle(int type) {
 /* This function has some global state in order to continue the work
 * incrementally across calls. */
 // 静态变量,用来累积函数连续执行时的数据
 static unsigned int current_db = 0; /* Last DB tested. */
 unsigned int j, iteration = 0;
 // 默认每次处理的数据库查询数量
 unsigned int dbs_per_call = REDIS_DBCRON_DBS_PER_CALL;
 // 函数开始的时间
 long long start = ustime(), timelimit;
 dbs_per_call = server.dbnum;
 timelimit = 1000000 * ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC / server.hz / 100;
 timelimit_exit = 0;
 if (timelimit = 0) timelimit = 1;
 // 1 遍历数据库查询
 for (j = 0; j dbs_per_call; j++) {
 int expired;
 // 指向要处理的数据库查询
 redisDb *db = server.db + (current_db % server.dbnum);
 current_db++;
 do {
 unsigned long num, slots;
 long long now, ttl_sum;
 int ttl_samples;
 /* If there is nothing to expire try next DB ASAP. */
 // 2 读取数据库中带过期时间的键的数量 如果该数量为 0 ,直接跳过这个数据库查询
 if ((num = dictSize(db- expires)) == 0) {
 db- avg_ttl = 0;
 break;
 // 3 读取数据库中键值对的数量
 slots = dictSlots(db- expires);
 // 获取当前时间
 now = mstime();
 // 每次最多只能检查 LOOKUPS_PER_LOOP 个键
 if (num ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
 num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;
 // 4 开始遍历数据库查询
 while (num--) {
 dictEntry *de;
 long long ttl;
 // 从 expires 中随机取出一个带过期时间的键
 if ((de = dictGetRandomKey(db- expires)) == NULL) break;
 // 计算 TTL
 ttl = dictGetSignedIntegerVal(de) - now;
 // 5 如果键已经过期,那么删除它,并将 expired 电子计数器增一
 if (activeExpireCycleTryExpire(db, de, now)) expired++;
 // 6 为这个数据库查询更新平均 TTL 数据统计
 // 更新遍历次数
 iteration++;
 // 7 每遍历 16 次执行一次
 if ((iteration 0xf) == 0 /* check once every 16 iterations. */
 (ustime() - start) timelimit) {
 // 如果遍历次数正好是 16 的倍数
 // 并且遍历的时间超过了 timelimit
 // 那么断开 timelimit_exit
 timelimit_exit = 1;
 // 8 已经超时了,返回
 if (timelimit_exit) return;
 /* We don't repeat the cycle if there are less than 25% of keys
 * found expired in the current DB. */
 // 如果已删除的过期键占当前总数据库查询带过期时间的键数量的 25 %
 // 那么不再遍历
 } while (expired ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP / 4);

这个函数,删减了一部分,留下了主流程:

1处,遍历数据库查询,一般就是遍历16个库 2处,获取当前库中,过期键的数量,过期键都存储在db- expires中,只需要算这个map的size即可;要是没有要过期的,处理下一个库 3处,获取过期键的数量 4处,开始遍历当前数据库查询的过期键,最多遍历20次,这里的num,被ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP赋值,这个值定义为20,换句话说,每次扫描一个库中,20个过期键 5处,如果键过期,则将这个key过期掉,比如从当前数据库查询删除,发布事件等等 6处,计算一些数据统计 7处,遍历16次,检查下是否已经执行了足够长的时间;因为redis是单线程的,不能一直执行过期键清理任务,还要处理手机客户端请求呢,所以,这里每执行16次循环,就检查下时间,看看是否已经超时,超时直接返回。 8处,超时返回

讲完了主动过期,接着讲前面的流程,2处,涉及一些主从复制相关的东西,这块放到后面吧


//1 是否有 SYNC 正在后台进行? sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;

1处,会去判断一个静态变量,该变量是一个队列,用于存储后台任务。另外一个后台线程(没错,redis不是单纯的单线程,还是有其他线程的),会去该队列取任务,取不到就阻塞;取到了则执行。而刷新 aof 到磁盘这种重io的工作,就是被封装为一个任务,丢到这个队列中的。所以,这里去判断队列的大小是否为0.

/* Return the number of pending jobs of the specified type. 
 * 返回等待中的 type 类型的工作的数量
unsigned long long bioPendingJobsOfType(int type) {
 unsigned long long val;
 pthread_mutex_lock( bio_mutex[type]);
 // 1
 val = bio_pending[type];
 pthread_mutex_unlock( bio_mutex[type]);
 return val;

1处这里的val,就是存储指定类型的任务的数量。我们这里传入的type为 REDIS_BIO_AOF_FSYNC,所以就是看看:aof 刷盘的任务数量。


nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf)); if (nwritten != (signed)sdslen(server.aof_buf)) { // 2 }else{ // 3 /* Successful write(2). If AOF was in error state, restore the * OK state and log the event. */ // 写入成功,更新最后写入状态 if (server.aof_last_write_status == REDIS_ERR) { redisLog(REDIS_WARNING, AOF write error looks solved, Redis can write again. server.aof_last_write_status = REDIS_OK;

1处,执行写入,将server.aof_buf这个缓冲区域的内容,写入aof文件,写入的字节长度为sdslen(server.aof_buf)。也就是,将整个缓冲区域写入。


2处,如果写入的长度,并不等于缓冲区域的长度,表示只写了一部分,进入异常分支

为何写入的会比预期的少,我们看看官方说明:

write() writes up to count bytes from the buffer pointed buf to the file referred to by the file descriptor fd.
The number of bytes written may be less than count if, for example, there is insufficient space on the underlying physical medium, or the RLIMIT_FSIZE resource limit is encountered (see setrlimit(2)), or the call was interrupted by a signal handler after having written less than count bytes. (See also pipe(7).)

这里的第二段就说了,多是因为底层物理介质的空间不够;进程的资源限制;或者被中断。


flush到磁盘

前面write是写入到电脑操作系统的os cache中,但是都还没落盘。必须执行flush之后,才会刷盘。

 // 总是执行 fsnyc
 if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
 /* aof_fsync is defined as fdatasync() for Linux in order to avoid
 * flushing metadata. */
 // 1
 aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
 // 更新最后一次执行 fsnyc 的时间
 server.aof_last_fsync = server.unixtime;
 // 策略为每秒 fsnyc ,并且距离上次 fsync 已经超过 1 秒
 } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC 
 server.unixtime server.aof_last_fsync)) {
 // 2 放到后台执行
 if (!sync_in_progress) aof_background_fsync(server.aof_fd);
 // 更新最后一次执行 fsync 的时间
 server.aof_last_fsync = server.unixtime;

2处,如果策略为每秒刷盘:AOF_FSYNC_EVERYSEC,放到后台去刷盘。这里的放到后台,就是放到前面提到的任务队列中,由其他线程去刷。

void aof_background_fsync(int fd) {
 bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
 struct bio_job *job = zmalloc(sizeof(*job));
 job- time = time(NULL);
 job- arg1 = arg1;
 job- arg2 = arg2;
 job- arg3 = arg3;
 pthread_mutex_lock( bio_mutex[type]);
 // 1 将新工作推入队列
 listAddNodeTail(bio_jobs[type],job);
 bio_pending[type]++;
 pthread_cond_signal( bio_condvar[type]);
 pthread_mutex_unlock( bio_mutex[type]);

这里的1处,能看到,将任务丢到了队列中,且前后进行了加锁。因为这个队列,是会被其他线程访问的,所以为了线程安全,进行了加锁。


本篇主要讲了,redis启动过程当中,主循环的大流程,以及在主循环去处理一个事件之前,要执行的任务。这个主循环如何事件处理,放到下篇继续。


收缩