t(context.Background(), peerClient)
}
// relay executes args on the node named peer and returns that node's reply.
//
// When peer equals cluster.self the command is executed directly on the local
// database. Otherwise a client for the peer is obtained with getPeerClient, a
// SELECT carrying the caller's current DB index is sent first, and only then
// is the command itself sent. The client is handed back with returnPeerClient
// when relay returns.
//
// An error from getPeerClient, or an error reply to the SELECT, is returned
// as the result and the command is not sent.
func (cluster *clusterDatabase) relay(peer string, c resp.Connection, args [][]byte) resp.Reply {
	if peer == cluster.self {
		return cluster.db.Exec(c, args)
	}
	peerClient, err := cluster.getPeerClient(peer)
	if err != nil {
		return reply.MakeErrReply(err.Error())
	}
	defer func() {
		// The error from returning the client is discarded; the caller
		// receives the command's reply either way.
		_ = cluster.returnPeerClient(peer, peerClient)
	}()
	// Switch the peer connection to the caller's DB before running the
	// command. If the switch is rejected, running the command anyway would
	// touch whichever DB the connection happened to have selected.
	selectReply := peerClient.Send(utils.ToCmdLine("SELECT", strconv.Itoa(c.GetDBIndex())))
	if selectReply != nil && reply.IsErrorReply(selectReply) {
		return selectReply
	}
	return peerClient.Send(args)
}
// broadcast relays args to every entry of cluster.nodes, one after another,
// and returns the replies keyed by node.
func (cluster *clusterDatabase) broadcast(c resp.Connection, args [][]byte) map[string]resp.Reply {
	replies := make(map[string]resp.Reply)
	for _, peer := range cluster.nodes {
		replies[peer] = cluster.relay(peer, c, args)
	}
	return replies
}
communication:与其他节点通信。执行模式有本地(自己执行),转发(别人执行),群发(所有节点执行)
getPeerClient :从连接池拿一个连接
returnPeerClient :归还连接
relay :把指令转发给指定节点执行(目标节点是自己时直接在本地执行);转发之前需要先发送SELECT指令选择db
broadcast :指令广播给所有节点
cluster/router.go
// makeRouter returns the routing table for cluster mode: each key is a
// command name and each value is the CmdFunc that handles that command.
func makeRouter() map[string]CmdFunc {
	return map[string]CmdFunc{
		"ping":     ping,
		"del":      Del,
		"exists":   defaultFunc,
		"type":     defaultFunc,
		"rename":   Rename,
		"renamenx": Rename,
		"set":      defaultFunc,
		"setnx":    defaultFunc,
		"get":      defaultFunc,
		"getset":   defaultFunc,
		"flushdb":  FlushDB,
		"select":   execSelect,
	}
}
// defaultFunc is the routing handler for single-key commands. It treats
// args[1] as the key, asks cluster.peerPicker which node the key maps to, and
// relays the whole command line to that node.
//
// A command line without a key (fewer than two elements) is answered with an
// arity error reply instead of being indexed, which would panic.
func defaultFunc(cluster *clusterDatabase, c resp.Connection, args [][]byte) resp.Reply {
	if len(args) < 2 {
		cmdName := ""
		if len(args) > 0 {
			cmdName = string(args[0])
		}
		return reply.MakeErrReply("ERR wrong number of arguments for '" + cmdName + "' command")
	}
	key := string(args[1])
	peer := cluster.peerPicker.PickNode(key)
	return cluster.relay(peer, c, args)
}
defaultFunc:转发指令的默认实现
cluster/ping.go
// ping executes the command on the local database and returns its reply;
// nothing is relayed to other nodes.
func ping(cluster *clusterDatabase, c resp.Connection, cmdAndArgs [][]byte) resp.Reply {
	localReply := cluster.db.Exec(c, cmdAndArgs)
	return localReply
}
cluster/rename.go
// Rename handles a rename-style command whose command line is exactly
// [name, src, dest]. The command is relayed only when cluster.peerPicker maps
// both keys to the same node; otherwise an error reply is returned and
// nothing is executed.
func Rename(cluster *clusterDatabase, c resp.Connection, args [][]byte) resp.Reply {
	if len(args) != 3 {
		return reply.MakeErrReply("ERR wrong number of arguments for 'rename' command")
	}
	srcPeer := cluster.peerPicker.PickNode(string(args[1]))
	destPeer := cluster.peerPicker.PickNode(string(args[2]))
	if srcPeer == destPeer {
		// Both keys are on one node, so that node can perform the rename.
		return cluster.relay(srcPeer, c, args)
	}
	return reply.MakeErrReply("ERR rename must within one slot in cluster mode")
}
Rename:修改key的name。两个key必须被哈希到同一个节点,否则直接返回错误,不执行重命名
cluster/keys.go
// FlushDB broadcasts the command to all nodes. It returns an OK reply when no
// node answered with an error; otherwise it returns an error reply built from
// the first error met while iterating the replies (map iteration order is
// unspecified, so which node's error is reported may vary).
func FlushDB(cluster *clusterDatabase, c resp.Connection, args [][]byte) resp.Reply {
	for _, nodeReply := range cluster.broadcast(c, args) {
		if !reply.IsErrorReply(nodeReply) {
			continue
		}
		failure := nodeReply.(reply.ErrorReply)
		return reply.MakeErrReply("error occurs: " + failure.Error())
	}
	return &reply.OkReply{}
}
cluster/del.go
// del指令实现
func Del(cluster *clusterDatabase, c resp.Connection, args [][]byte) resp.Reply {
replies := cluster.broadcast(c, args)
var errReply reply.ErrorReply
var deleted int64 = 0
for _, v := range replies {
if reply.IsErrorReply(v) {
errReply = v.(reply.ErrorReply)
break
}
intReply, ok := v.(*reply.IntReply)
if !ok {
errReply = reply.MakeErrReply(&quo