Redis源码分析(三十)--- pubsub发布订阅模式(一)

2015-01-27 10:12:25 · 作者: · 浏览: 44

今天学习了Redis中比较高大上的名词,“发布订阅模式”,发布订阅模式这个词在我最开始接触听说的时候是在JMS(Java Message Service)java消息服务中听说的。这个名次用通俗的一点话说,就是我订阅了这类消息,当只有这类的消息进行广播发送的时候,我才会,其他的消息直接过滤,保证了一个高效的传输效率。下面切入正题,学习一下Redis是如何实现这个发布订阅模式的。先看看里面的简单的API构造;

/*-----------------------------------------------------------------------------
 * Pubsub low level API
 *----------------------------------------------------------------------------*/
void freePubsubPattern(void *p) /* 释放发布订阅的模式 */
int listMatchPubsubPattern(void *a, void *b) /* 发布订阅模式是否匹配 */
int clientSubscriptionsCount(redisClient *c) /* 返回客户端的所订阅的数量,包括channels + patterns管道和模式 */
int pubsubSubscribeChannel(redisClient *c, robj *channel) /* Client订阅一个Channel管道 */
int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) /* 取消订阅Client中的Channel */
int pubsubSubscribePattern(redisClient *c, robj *pattern) /* Client客户端订阅一种模式 */
int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) /* Client客户端取消订阅pattern模式 */
int pubsubUnsubscribeAllChannels(redisClient *c, int notify) /* 客户端取消自身订阅的所有Channel */
int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) /* 客户端取消订阅所有的pattern模式 */
int pubsubPublishMessage(robj *channel, robj *message) /* 为所有订阅了Channel的Client发送消息message */

/* ------------PUB/SUB API ---------------- */
void subscribeCommand(redisClient *c) /* 订阅Channel的命令 */
void unsubscribeCommand(redisClient *c) /* 取消订阅Channel的命令 */
void psubscribeCommand(redisClient *c) /* 订阅模式命令 */
void punsubscribeCommand(redisClient *c) /* 取消订阅模式命令 */
void publishCommand(redisClient *c) /* 发布消息命令 */
void pubsubCommand(redisClient *c) /* 发布订阅命令 */
在这里面出现了高频的词Pattern(模式)和Channel(频道,叫管道比较别扭),也就是说,后续所有的关于发布订阅的东东都是基于这2者展开进行的。现在大致讲解一下在Redis中是如何实现此中模式的:

1.在RedisClient 内部维护了一个pubsub_channels的Channel列表,记录了此客户端所订阅的频道

2.在Server服务端,同样维护着一个类似的变量叫做,pubsub_channels,这是一个dict字典变量,每一个Channel对应着一批订阅了此频道的Client,也就是Channel-->list of Clients

3.当一个Client publish一个message的时候,会先去服务端的pubsub_channels找相应的Channel,遍历里面的Client,然后发送通知,即完成了整个发布订阅模式。

我们可以简单的看一下Redis订阅一个Channel的方法实现;

/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
 * 0 if the client was already subscribed to that channel. */
/* Client订阅一个Channel管道 */
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
    struct dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* Add the channel to the client -> channels hash table */
    //在Client的字典pubsub_channels中添加Channel
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);
        /* Add the client to the channel -> list of clients hash table */
        //添加Clietn到server中的pubsub_channels,对应的列表中
        de = dictFind(server.pubsub_channels,channel);
        if (de == NULL) {
        	//如果此频道的Client列表为空,则创建新列表并添加
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
        	//否则,获取这个频道的客户端列表,在尾部添加新的客户端
            clients = dictGetVal(de);
        }
        listAddNodeTail(clients,c);
    }
    /* Notify the client */
    //添加给回复客户端
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.subscribebulk);
    addReplyBulk(c,channel);
    addReplyLongLong(c,clientSubscriptionsCount(c));
    return retval;
}
添加操作主要分2部,Client自身的内部维护的pubsub_channels的添加,是一个dict字典对象,然后,是server端维护的pubsub_channels中的client列表的添加。在进行Channel频道的删除的时候,也是执行的这2步骤操作: