Redis核心流程? 本文分析基于Redis-1.0源码,核心流程代码主要分布在redis.c,ae.c两个文件中。

Notion版本

1.Redis核心流程中的重要数据结构

struct redisServer {    int port;     int fd;     redisDb *db;    aeEventLoop *el;    list *clients;    ...};
typedef struct redisClient {    int fd;    redisDb *db;    sds querybuf;    list *reply;} redisClient;
typedef struct redisDb {    dict *dict;    dict *expires;    int id;} redisDb;
typedef struct redisObject {    void *ptr;    int type;    int refcount;} robj;
typedef struct aeEventLoop {    long long timeEventNextId;    aeFileEvent *fileEventHead;    aeTimeEvent *timeEventHead;    int stop;} aeEventLoop;

基于以上数据结构,Redis就可以构建核心的Server与client的交互流程

2.Redis核心流程解析

不像一些科学计算的程序,运行一次就可以产出所有结果。Redis功能是通过不断地对外界事件进行响应实现的。这种类型的程序,一般都存在事件循环线程,不断地响应,并处理外界事件。

int main(int argc, char **argv) {    initServerConfig();...    initServer();    if (server.daemonize) daemonize();    redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);...    if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,        acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");    redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);    aeMain(server.el);    aeDeleteEventLoop(server.el);    return 0;}// ...表示略去的不重要的代码

在main函数入口中,主要做了三件事:

  • 初始化服务器配置,并初始化服务器
  • 使用aeCreateFileEvent函数在服务器结构的事件循环el中注册了事件,事件的回调函数为acceptHandler。这个函数会在server.fd就绪的时候被调用,进行accept行为,建立对应的redisClient资源。(具体细节会在后面进行介绍)
  • 开启aeMain()事件循环,处理事件。在redis-1.0的版本中,使用select来对网络I/O行为进行多路复用,当有就绪事件时,就调用事先在事件中注册的回调函数进行处理。

我们来看看acceptHandler函数中的逻辑:

static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {...    cfd = anetAccept(server.neterr, fd, cip, &cport);...    if ((c = createClient(cfd)) == NULL) {...    }...    server.stat_numconnections++;}

acceptHandler函数主要为客户端连接建立了一个redisClient对象,用来管理后续的请求与响应。那么createClient是如何创建redisClient的呢?

下面是createClient函数的源码:

static redisClient *createClient(int fd) {    redisClient *c = zmalloc(sizeof(*c));    anetNonBlock(NULL,fd);    anetTcpNoDelay(NULL,fd);    if (!c) return NULL;    selectDb(c,0);    c->fd = fd;    c->querybuf = sdsempty();    c->argc = 0;    c->argv = NULL;    c->bulklen = -1;    c->sentlen = 0;    c->flags = 0;    c->lastinteraction = time(NULL);    c->authenticated = 0;    c->replstate = REDIS_REPL_NONE;    if ((c->reply = listCreate()) == NULL) oom("listCreate");    listSetFreeMethod(c->reply,decrRefCount);    listSetDupMethod(c->reply,dupClientReplyValue);    if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,        readQueryFromClient, c, NULL) == AE_ERR) {        freeClient(c);        return NULL;    }    if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");    return c;}

在createClient中,首先会设置网络的I/O类型,以及一些TCP参数(说明连接是以TCP的方式进行的)。而后,会选择与客户端相连的数据库,并对redisClient中的一些属性进行初始化。最后会使用redisClient的句柄fd创建一个新的ae可读事件,在这个事件中,使用readQueryFromClient作为回调函数。最后,新建的这个redisClient会被添加到redisServer的clients链表中。

readQueryFromClient做了什么事呢?

static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {    redisClient *c = (redisClient*) privdata;    char buf[REDIS_IOBUF_LEN];    int nread;    REDIS_NOTUSED(el);    REDIS_NOTUSED(mask);    nread = read(fd, buf, REDIS_IOBUF_LEN);...again:    if (c->bulklen == -1) {        /* Read the first line of the query */        char *p = strchr(c->querybuf,'\n');        size_t querylen;        if (p) {...            if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;            return;        } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {            redisLog(REDIS_DEBUG, "Client protocol error");            freeClient(c);            return;        }    }...}

在readQueryFromClient函数中,会执行来自客户端buffer中的命令。并且,该事件的生命周期与响应的redisClient生命周期是一致的(被freeClient函数所关闭),当遇到错误或客户端请求关闭时,才会注销该事件。其余情况,会在每次调用之后,仍存在于事件循环el中。我们可以看到,真正用来执行命令的是processCommand函数

在processCommand函数中,有以下逻辑:

static int processCommand(redisClient *c) {    struct redisCommand *cmd;...    cmd = lookupCommand(c->argv[0]->ptr);...    /* Exec the command */    dirty = server.dirty;    cmd->proc(c);    if (server.dirty-dirty != 0 && listLength(server.slaves))        replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);    if (listLength(server.monitors))        replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);    server.stat_numcommands++;    /* Prepare the client for the next command */    if (c->flags & REDIS_CLOSE) {        freeClient(c);        return 0;    }    resetClient(c);    return 1;}

这一部分包含一个查找命令的逻辑以及执行命令的逻辑。所有的命令都被放在cmdTable[]中。

我们以get命令为例,看看get命令的proc函数getCommand都做了什么事情:

static void getCommand(redisClient *c) {    robj *o = lookupKeyRead(c->db,c->argv[1]);    if (o == NULL) {        addReply(c,shared.nullbulk);    } else {        if (o->type != REDIS_STRING) {            addReply(c,shared.wrongtypeerr);        } else {            addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));            addReply(c,o);            addReply(c,shared.crlf);        }    }}

先从db中查找对应的redisObject,并且通过addReply方法加入到响应中。那么addReply做了什么事情呢?

static void addReply(redisClient *c, robj *obj) {    if (listLength(c->reply) == 0 &&        (c->replstate == REDIS_REPL_NONE ||         c->replstate == REDIS_REPL_ONLINE) &&        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,        sendReplyToClient, c, NULL) == AE_ERR) return;    if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");    incrRefCount(obj);}

可以看到,在addReply中生成了一个ae可写事件,并注册了名为sendReplyToClient的回调函数。

sendReplyToClient是redisServer解析query,并响应query结果链路中的最后一环:

static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {    redisClient *c = privdata;    int nwritten = 0, totwritten = 0, objlen;    robj *o;    REDIS_NOTUSED(el);    REDIS_NOTUSED(mask);    if (server.glueoutputbuf && listLength(c->reply) > 1)        glueReplyBuffersIfNeeded(c);    while(listLength(c->reply)) {    ...        nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);...    }    if (nwritten == -1) {        if (errno == EAGAIN) {            nwritten = 0;        } else {            redisLog(REDIS_DEBUG,                "Error writing to client: %s", strerror(errno));            freeClient(c);            return;        }    }    if (totwritten > 0) c->lastinteraction = time(NULL);    if (listLength(c->reply) == 0) {        c->sentlen = 0;        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);    }}

这个函数负责不停的将redisClient.reply中的响应写到客户端。直到没有reply可写之后,调用aeDeleteFileEvent来删除这个事件。(这个事件的生命周期随着一次调用响应结束而结束)。3

3. 总结

看到这里,大家可以发现,redis的核心ae循环处理流程中,始终只有一个主线程在运行。通过事件驱动的方式,处理相应的事件。同时,使用select多路复用来快速响应I/O事件,达到了很高的效率。

参考

  1. redis-1.0 SOURCE-CODE
  2. 系列