1,服务器启动:
- int main(int argc, char **argv)
//这里省略一堆初始化代码
//死循环的方法
aeMain(server.el);
aeDeleteEventLoop(server.el);
return 0;
- void aeMain(aeEventLoop *eventLoop)
eventLoop->stop = 0;
while (!eventLoop->stop) {
//开始循环处理事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
2,事件循环处理
- int aeProcessEvents(aeEventLoop *eventLoop, int flags)
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
//.........省略部分代码
//这里会回写上一次循环中客户端的读写事件,如:本次循环接受到一个get命令,则在下一次循环中回写;(注意:这里是6.0的处理,与版本实现有关)
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
//这里获取本次事件循环中所有客户端请求事件,例如当前有两个客户端同时发送了两条命令,则有两个事件
numevents = aeApiPoll(eventLoop, tvp);
/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
//循环处理客户端读写事件
for (j = 0; j < numevents; j++) {
/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert) {
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
if ((fe->mask & mask & AE_READABLE) &&
(!fired || fe->wfileProc != fe->rfileProc))
{
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
/* Check time events */
//处理完客户端事件后,开始处理时间时间
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
到这里已经可以知道redis处理客户端事件时是单线程还是多线程的了,下面了解一下bgsave, bgrewriteaof,rdb与aof创建子进程的过程相似,直接看rdb的代码,如下
- void bgsaveCommand(client *c)
//不为-1表示有子进程在进行rdb
if (server.rdb_child_pid != -1) {
addReplyError(c,"Background save already in progress");
} else if (hasActiveChildProcess()) {
if (schedule) {
server.rdb_bgsave_scheduled = 1;
addReplyStatus(c,"Background saving scheduled");
} else {
addReplyError(c,
"Another child process is active (AOF?): can't BGSAVE right now. "
"Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever "
"possible.");
}
} else if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK) {
addReplyStatus(c,"Background saving started");
} else {
addReply(c,shared.err);
}
int hasActiveChildProcess() ,方法注释如下:Return true if there are no active children processes doing RDB saving, AOF rewriting, or some side process spawned by a loaded module.代码:
return server.rdb_child_pid != -1 || //没有子进程在进行rdb server.aof_child_pid != -1 || // 没有子进程进程aof server.module_child_pid != -1;//这个没有看到触发
- 设置module_child_pid方法的注释:
/* Create a background child process with the current frozen snaphost of the * main process where you can do some processing in the background without * affecting / freezing the traffic and no need for threads and GIL locking. * Note that Redis allows for only one concurrent fork. * When the child wants to exit, it should call RedisModule_ExitFromChild. * If the parent wants to kill the child it should call RedisModule_KillForkChild * The done handler callback will be executed on the parent process when the * child existed (but not when killed) * Return: -1 on failure, on success the parent process will get a positive PID * of the child, and the child process will get 0. */
- 从上述注释可以知道,redis最多只会启动两个进程,在fork子进程时都会先去检测是否已经有子进程。
执行bgsave命令
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi)
pid_t childpid;
//再次检查是否已经有子进程
if (hasActiveChildProcess()) return C_ERR;
server.dirty_before_bgsave = server.dirty;
server.lastbgsave_try = time(NULL);
openChildInfoPipe();
//fork子进程处理bgsave
if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
int retval;
/* Child */
redisSetProcTitle("redis-rdb-bgsave");
redisSetCpuAffinity(server.bgsave_cpulist);
retval = rdbSave(filename,rsi);
if (retval == C_OK) {
sendChildCOWInfo(CHILD_TYPE_RDB, "RDB");
}
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
/* Parent */
if (childpid == -1) {
closeChildInfoPipe();
server.lastbgsave_status = C_ERR;
serverLog(LL_WARNING,"Can't save in background: fork: %s",
strerror(errno));
return C_ERR;
}
serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
server.rdb_save_time_start = time(NULL);
server.rdb_child_pid = childpid;
server.rdb_child_type = RDB_CHILD_TYPE_DISK;
updateDictResizePolicy();
return C_OK;
}
return C_OK; /* unreached */
3,总结
- redis是单线程的,进入main方法初始化server后,调用一个死循环的方法进行事件处理,在每次事件循环中获取当前所有客户端的事件,循环处理,处理完后响应消息在下一次循环回写客户端。在处理完文件事件后,进行时间事件处理
- redis只允许有一个子进程,在fork子进程时都要先检查是否已经有子进程
- redis 6.0后支持IOThreads多线程,后续补充相关文章