/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
* 0 if the client was not subscribed to the specified channel. */
/* 取消订阅Client中的Channel */
int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
struct dictEntry *de;
list *clients;
listNode *ln;
int retval = 0;
/* Remove the channel from the client -> channels hash table */
incrRefCount(channel); /* channel may be just a pointer to the same object
we have in the hash tables. Protect it... */
//字典删除Client中pubsub_channels中的Channel
if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
retval = 1;
/* Remove the client from the channel -> clients list hash table */
//再移除Channel对应的Client列表
de = dictFind(server.pubsub_channels,channel);
redisAssertWithInfo(c,NULL,de != NULL);
clients = dictGetVal(de);
ln = listSearchKey(clients,c);
redisAssertWithInfo(c,NULL,ln != NULL);
listDelNode(clients,ln);
if (listLength(clients) == 0) {
/* Free the list and associated hash entry at all if this was
* the latest client, so that it will be possible to abuse
* Redis PUBSUB creating millions of channels. */
dictDelete(server.pubsub_channels,channel);
}
}
/* Notify the client */
if (notify) {
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.unsubscribebulk);
addReplyBulk(c,channel);
addReplyLongLong(c,dictSize(c->pubsub_channels)+
listLength(c->pubsub_patterns));
}
decrRefCount(channel); /* it is finally safe to release it */
return retval;
}
里面还有对应的模式的订阅和取消订阅的操作,原理和channel完全一致,二者的区别在于,pattern是用来匹配的Channel的,这个是什么意思呢。在后面会做出答案,接着看。最后看一个最最核心的方法,客户端发步消息方法:
/* Publish a message */
/* 为所有订阅了Channel的Client发送消息message */
int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
struct dictEntry *de;
listNode *ln;
listIter li;
/* Send to clients listening for that channel */
//找到Channel所对应的dictEntry
de = dictFind(server.pubsub_channels,channel);
if (de) {
//获取此Channel对应的客户单列表
list *list = dictGetVal(de);
listNode *ln;
listIter li;
listRewind(list,&li);
while ((ln = listNext(&li)) != NULL) {
//依次取出List中的客户单,添加消息回复
redisClient *c = ln->
value;
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.messagebulk);
addReplyBulk(c,channel);
//添加消息回复
addReplyBulk(c,message);
receivers++;
}
}
/* Send to clients listening to matching channels */
/* 发送给尝试匹配该Channel的客户端消息 */
if (listLength(server.pubsub_patterns)) {
listRewind(server.pubsub_patterns,&li);
channel = getDecodedObject(channel);
while ((ln = listNext(&li)) != NULL) {
pubsubPattern *pat = ln->value;
//客户端的模式如果匹配了Channel,也会发送消息
if (stringmatchlen((char*)pat->pattern->ptr,
sdslen(pat->pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) {
addReply(pat->client,shared.mbulkhdr[4]);
addReply(pat->client,shared.pmessagebulk);
addReplyBulk(pat->client,pat->pattern);
addReplyBulk(pat->client,channel);
addReplyBulk(pat->client,message);
receivers++;
}
}
decrRefCount(channel);
}
return receivers;
}
pattern的作用就在上面体现了,如果某种pattern匹配了Channel频道,则模式的客户端也会接收消息。在server->pubsub_patterns中,pubsub_patterns是一个list列表,里面的每一个pattern只对应一个Client,就是上面的pat->client,这一点和Channel还是有本质的区别的。讲完发布订阅模式的基本操作后,顺便把与此相关的notify通知类也稍稍讲讲,通知只有3个方法,
/* ----------------- API ------------------- */
int keyspaceEventsStringToFlags(char *classes) /* 键值字符类型转为对应的