mirror of
https://github.com/fluencelabs/redis
synced 2025-05-30 18:51:19 +00:00
Redis 2.6 branch obtained from unstable removing all the cluster related code.
This commit is contained in:
parent
87faf90696
commit
571e257db1
@ -73,7 +73,7 @@ QUIET_CC = @printf ' %b %b\n' $(CCCOLOR)CC$(ENDCOLOR) $(SRCCOLOR)$@$(ENDCOLOR
|
|||||||
QUIET_LINK = @printf ' %b %b\n' $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR);
|
QUIET_LINK = @printf ' %b %b\n' $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR);
|
||||||
endif
|
endif
|
||||||
|
|
||||||
OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o
|
OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o migrate.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o
|
||||||
BENCHOBJ = ae.o anet.o redis-benchmark.o sds.o adlist.o zmalloc.o
|
BENCHOBJ = ae.o anet.o redis-benchmark.o sds.o adlist.o zmalloc.o
|
||||||
CLIOBJ = anet.o sds.o adlist.o redis-cli.o zmalloc.o release.o
|
CLIOBJ = anet.o sds.o adlist.o redis-cli.o zmalloc.o release.o
|
||||||
CHECKDUMPOBJ = redis-check-dump.o lzf_c.o lzf_d.o
|
CHECKDUMPOBJ = redis-check-dump.o lzf_c.o lzf_d.o
|
||||||
@ -101,8 +101,6 @@ aof.o: aof.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
|
|||||||
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
|
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
|
||||||
bio.o: bio.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
|
bio.o: bio.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
|
||||||
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h bio.h
|
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h bio.h
|
||||||
cluster.o: cluster.c redis.h fmacros.h config.h ae.h sds.h dict.h \
|
|
||||||
adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
|
|
||||||
config.o: config.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
|
config.o: config.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
|
||||||
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
|
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
|
||||||
crc16.o: crc16.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
|
crc16.o: crc16.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
|
||||||
|
1762
src/cluster.c
1762
src/cluster.c
File diff suppressed because it is too large
Load Diff
@ -298,13 +298,6 @@ void loadServerConfigFromString(char *config) {
|
|||||||
err = "Target command name already exists"; goto loaderr;
|
err = "Target command name already exists"; goto loaderr;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (!strcasecmp(argv[0],"cluster-enabled") && argc == 2) {
|
|
||||||
if ((server.cluster_enabled = yesnotoi(argv[1])) == -1) {
|
|
||||||
err = "argument must be 'yes' or 'no'"; goto loaderr;
|
|
||||||
}
|
|
||||||
} else if (!strcasecmp(argv[0],"cluster-config-file") && argc == 2) {
|
|
||||||
zfree(server.cluster.configfile);
|
|
||||||
server.cluster.configfile = zstrdup(argv[1]);
|
|
||||||
} else if (!strcasecmp(argv[0],"lua-time-limit") && argc == 2) {
|
} else if (!strcasecmp(argv[0],"lua-time-limit") && argc == 2) {
|
||||||
server.lua_time_limit = strtoll(argv[1],NULL,10);
|
server.lua_time_limit = strtoll(argv[1],NULL,10);
|
||||||
} else if (!strcasecmp(argv[0],"slowlog-log-slower-than") &&
|
} else if (!strcasecmp(argv[0],"slowlog-log-slower-than") &&
|
||||||
|
43
src/db.c
43
src/db.c
@ -86,7 +86,6 @@ void dbAdd(redisDb *db, robj *key, robj *val) {
|
|||||||
int retval = dictAdd(db->dict, copy, val);
|
int retval = dictAdd(db->dict, copy, val);
|
||||||
|
|
||||||
redisAssertWithInfo(NULL,key,retval == REDIS_OK);
|
redisAssertWithInfo(NULL,key,retval == REDIS_OK);
|
||||||
if (server.cluster_enabled) SlotToKeyAdd(key);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Overwrite an existing key with a new value. Incrementing the reference
|
/* Overwrite an existing key with a new value. Incrementing the reference
|
||||||
@ -154,7 +153,6 @@ int dbDelete(redisDb *db, robj *key) {
|
|||||||
* the key, because it is shared with the main dictionary. */
|
* the key, because it is shared with the main dictionary. */
|
||||||
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
|
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
|
||||||
if (dictDelete(db->dict,key->ptr) == DICT_OK) {
|
if (dictDelete(db->dict,key->ptr) == DICT_OK) {
|
||||||
if (server.cluster_enabled) SlotToKeyDel(key);
|
|
||||||
return 1;
|
return 1;
|
||||||
} else {
|
} else {
|
||||||
return 0;
|
return 0;
|
||||||
@ -254,10 +252,6 @@ void existsCommand(redisClient *c) {
|
|||||||
void selectCommand(redisClient *c) {
|
void selectCommand(redisClient *c) {
|
||||||
int id = atoi(c->argv[1]->ptr);
|
int id = atoi(c->argv[1]->ptr);
|
||||||
|
|
||||||
if (server.cluster_enabled && id != 0) {
|
|
||||||
addReplyError(c,"SELECT is not allowed in cluster mode");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (selectDb(c,id) == REDIS_ERR) {
|
if (selectDb(c,id) == REDIS_ERR) {
|
||||||
addReplyError(c,"invalid DB index");
|
addReplyError(c,"invalid DB index");
|
||||||
} else {
|
} else {
|
||||||
@ -398,11 +392,6 @@ void moveCommand(redisClient *c) {
|
|||||||
redisDb *src, *dst;
|
redisDb *src, *dst;
|
||||||
int srcid;
|
int srcid;
|
||||||
|
|
||||||
if (server.cluster_enabled) {
|
|
||||||
addReplyError(c,"MOVE is not allowed in cluster mode");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Obtain source and target DB pointers */
|
/* Obtain source and target DB pointers */
|
||||||
src = c->db;
|
src = c->db;
|
||||||
srcid = c->db->id;
|
srcid = c->db->id;
|
||||||
@ -714,35 +703,3 @@ int *zunionInterGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *num
|
|||||||
*numkeys = num;
|
*numkeys = num;
|
||||||
return keys;
|
return keys;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Slot to Key API. This is used by Redis Cluster in order to obtain in
|
|
||||||
* a fast way a key that belongs to a specified hash slot. This is useful
|
|
||||||
* while rehashing the cluster. */
|
|
||||||
void SlotToKeyAdd(robj *key) {
|
|
||||||
unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
|
|
||||||
|
|
||||||
zslInsert(server.cluster.slots_to_keys,hashslot,key);
|
|
||||||
incrRefCount(key);
|
|
||||||
}
|
|
||||||
|
|
||||||
void SlotToKeyDel(robj *key) {
|
|
||||||
unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
|
|
||||||
|
|
||||||
zslDelete(server.cluster.slots_to_keys,hashslot,key);
|
|
||||||
}
|
|
||||||
|
|
||||||
unsigned int GetKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) {
|
|
||||||
zskiplistNode *n;
|
|
||||||
zrangespec range;
|
|
||||||
int j = 0;
|
|
||||||
|
|
||||||
range.min = range.max = hashslot;
|
|
||||||
range.minex = range.maxex = 0;
|
|
||||||
|
|
||||||
n = zslFirstInRange(server.cluster.slots_to_keys, range);
|
|
||||||
while(n && n->score == hashslot && count--) {
|
|
||||||
keys[j++] = n->obj;
|
|
||||||
n = n->level[0].forward;
|
|
||||||
}
|
|
||||||
return j;
|
|
||||||
}
|
|
||||||
|
190
src/migrate.c
Normal file
190
src/migrate.c
Normal file
@ -0,0 +1,190 @@
|
|||||||
|
#include "redis.h"
|
||||||
|
|
||||||
|
/* -----------------------------------------------------------------------------
|
||||||
|
* RESTORE and MIGRATE commands
|
||||||
|
* -------------------------------------------------------------------------- */
|
||||||
|
|
||||||
|
/* RESTORE key ttl serialized-value */
|
||||||
|
void restoreCommand(redisClient *c) {
|
||||||
|
long ttl;
|
||||||
|
rio payload;
|
||||||
|
int type;
|
||||||
|
robj *obj;
|
||||||
|
|
||||||
|
/* Make sure this key does not already exist here... */
|
||||||
|
if (lookupKeyWrite(c->db,c->argv[1]) != NULL) {
|
||||||
|
addReplyError(c,"Target key name is busy.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Check if the TTL value makes sense */
|
||||||
|
if (getLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != REDIS_OK) {
|
||||||
|
return;
|
||||||
|
} else if (ttl < 0) {
|
||||||
|
addReplyError(c,"Invalid TTL value, must be >= 0");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
rioInitWithBuffer(&payload,c->argv[3]->ptr);
|
||||||
|
if (((type = rdbLoadObjectType(&payload)) == -1) ||
|
||||||
|
((obj = rdbLoadObject(type,&payload)) == NULL))
|
||||||
|
{
|
||||||
|
addReplyError(c,"Bad data format");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Create the key and set the TTL if any */
|
||||||
|
dbAdd(c->db,c->argv[1],obj);
|
||||||
|
if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl);
|
||||||
|
signalModifiedKey(c->db,c->argv[1]);
|
||||||
|
addReply(c,shared.ok);
|
||||||
|
server.dirty++;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* MIGRATE host port key dbid timeout */
|
||||||
|
void migrateCommand(redisClient *c) {
|
||||||
|
int fd;
|
||||||
|
long timeout;
|
||||||
|
long dbid;
|
||||||
|
time_t ttl;
|
||||||
|
robj *o;
|
||||||
|
rio cmd, payload;
|
||||||
|
|
||||||
|
/* Sanity check */
|
||||||
|
if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK)
|
||||||
|
return;
|
||||||
|
if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
|
||||||
|
return;
|
||||||
|
if (timeout <= 0) timeout = 1;
|
||||||
|
|
||||||
|
/* Check if the key is here. If not we reply with success as there is
|
||||||
|
* nothing to migrate (for instance the key expired in the meantime), but
|
||||||
|
* we include such information in the reply string. */
|
||||||
|
if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
|
||||||
|
addReplySds(c,sdsnew("+NOKEY\r\n"));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Connect */
|
||||||
|
fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
|
||||||
|
atoi(c->argv[2]->ptr));
|
||||||
|
if (fd == -1) {
|
||||||
|
addReplyErrorFormat(c,"Can't connect to target node: %s",
|
||||||
|
server.neterr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
|
||||||
|
addReplyError(c,"Timeout connecting to the client");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
rioInitWithBuffer(&cmd,sdsempty());
|
||||||
|
redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
|
||||||
|
redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
|
||||||
|
redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
|
||||||
|
|
||||||
|
ttl = getExpire(c->db,c->argv[3]);
|
||||||
|
redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4));
|
||||||
|
redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
|
||||||
|
redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW);
|
||||||
|
redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr)));
|
||||||
|
redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,(ttl == -1) ? 0 : ttl));
|
||||||
|
|
||||||
|
/* Finally the last argument that is the serailized object payload
|
||||||
|
* in the form: <type><rdb-serialized-object>. */
|
||||||
|
rioInitWithBuffer(&payload,sdsempty());
|
||||||
|
redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o));
|
||||||
|
redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o) != -1);
|
||||||
|
redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,sdslen(payload.io.buffer.ptr)));
|
||||||
|
sdsfree(payload.io.buffer.ptr);
|
||||||
|
|
||||||
|
/* Tranfer the query to the other node in 64K chunks. */
|
||||||
|
{
|
||||||
|
sds buf = cmd.io.buffer.ptr;
|
||||||
|
size_t pos = 0, towrite;
|
||||||
|
int nwritten = 0;
|
||||||
|
|
||||||
|
while ((towrite = sdslen(buf)-pos) > 0) {
|
||||||
|
towrite = (towrite > (64*1024) ? (64*1024) : towrite);
|
||||||
|
nwritten = syncWrite(fd,buf+nwritten,towrite,timeout);
|
||||||
|
if (nwritten != (signed)towrite) goto socket_wr_err;
|
||||||
|
pos += nwritten;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Read back the reply. */
|
||||||
|
{
|
||||||
|
char buf1[1024];
|
||||||
|
char buf2[1024];
|
||||||
|
|
||||||
|
/* Read the two replies */
|
||||||
|
if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0)
|
||||||
|
goto socket_rd_err;
|
||||||
|
if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0)
|
||||||
|
goto socket_rd_err;
|
||||||
|
if (buf1[0] == '-' || buf2[0] == '-') {
|
||||||
|
addReplyErrorFormat(c,"Target instance replied with error: %s",
|
||||||
|
(buf1[0] == '-') ? buf1+1 : buf2+1);
|
||||||
|
} else {
|
||||||
|
robj *aux;
|
||||||
|
|
||||||
|
dbDelete(c->db,c->argv[3]);
|
||||||
|
signalModifiedKey(c->db,c->argv[3]);
|
||||||
|
addReply(c,shared.ok);
|
||||||
|
server.dirty++;
|
||||||
|
|
||||||
|
/* Translate MIGRATE as DEL for replication/AOF. */
|
||||||
|
aux = createStringObject("DEL",3);
|
||||||
|
rewriteClientCommandVector(c,2,aux,c->argv[3]);
|
||||||
|
decrRefCount(aux);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sdsfree(cmd.io.buffer.ptr);
|
||||||
|
close(fd);
|
||||||
|
return;
|
||||||
|
|
||||||
|
socket_wr_err:
|
||||||
|
redisLog(REDIS_NOTICE,"Can't write to target node for MIGRATE: %s",
|
||||||
|
strerror(errno));
|
||||||
|
addReplyErrorFormat(c,"MIGRATE failed, writing to target node: %s.",
|
||||||
|
strerror(errno));
|
||||||
|
sdsfree(cmd.io.buffer.ptr);
|
||||||
|
close(fd);
|
||||||
|
return;
|
||||||
|
|
||||||
|
socket_rd_err:
|
||||||
|
redisLog(REDIS_NOTICE,"Can't read from target node for MIGRATE: %s",
|
||||||
|
strerror(errno));
|
||||||
|
addReplyErrorFormat(c,"MIGRATE failed, reading from target node: %s.",
|
||||||
|
strerror(errno));
|
||||||
|
sdsfree(cmd.io.buffer.ptr);
|
||||||
|
close(fd);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* DUMP keyname
|
||||||
|
* DUMP is actually not used by Redis Cluster but it is the obvious
|
||||||
|
* complement of RESTORE and can be useful for different applications. */
|
||||||
|
void dumpCommand(redisClient *c) {
|
||||||
|
robj *o, *dumpobj;
|
||||||
|
rio payload;
|
||||||
|
|
||||||
|
/* Check if the key is here. */
|
||||||
|
if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
|
||||||
|
addReply(c,shared.nullbulk);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Serialize the object in a RDB-like format. It consist of an object type
|
||||||
|
* byte followed by the serialized object. This is understood by RESTORE. */
|
||||||
|
rioInitWithBuffer(&payload,sdsempty());
|
||||||
|
redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o));
|
||||||
|
redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o));
|
||||||
|
|
||||||
|
/* Transfer to the client */
|
||||||
|
dumpobj = createObject(REDIS_STRING,payload.io.buffer.ptr);
|
||||||
|
addReplyBulk(c,dumpobj);
|
||||||
|
decrRefCount(dumpobj);
|
||||||
|
return;
|
||||||
|
}
|
@ -263,6 +263,5 @@ void punsubscribeCommand(redisClient *c) {
|
|||||||
|
|
||||||
void publishCommand(redisClient *c) {
|
void publishCommand(redisClient *c) {
|
||||||
int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
|
int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
|
||||||
if (server.cluster_enabled) clusterPropagatePublish(c->argv[1],c->argv[2]);
|
|
||||||
addReplyLongLong(c,receivers);
|
addReplyLongLong(c,receivers);
|
||||||
}
|
}
|
||||||
|
53
src/redis.c
53
src/redis.c
@ -232,10 +232,8 @@ struct redisCommand redisCommandTable[] = {
|
|||||||
{"publish",publishCommand,3,"rpf",0,NULL,0,0,0,0,0},
|
{"publish",publishCommand,3,"rpf",0,NULL,0,0,0,0,0},
|
||||||
{"watch",watchCommand,-2,"rs",0,noPreloadGetKeys,1,-1,1,0,0},
|
{"watch",watchCommand,-2,"rs",0,noPreloadGetKeys,1,-1,1,0,0},
|
||||||
{"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0},
|
{"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0},
|
||||||
{"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0},
|
|
||||||
{"restore",restoreCommand,4,"awm",0,NULL,1,1,1,0,0},
|
{"restore",restoreCommand,4,"awm",0,NULL,1,1,1,0,0},
|
||||||
{"migrate",migrateCommand,6,"aw",0,NULL,0,0,0,0,0},
|
{"migrate",migrateCommand,6,"aw",0,NULL,0,0,0,0,0},
|
||||||
{"asking",askingCommand,1,"r",0,NULL,0,0,0,0,0},
|
|
||||||
{"dump",dumpCommand,2,"ar",0,NULL,1,1,1,0,0},
|
{"dump",dumpCommand,2,"ar",0,NULL,1,1,1,0,0},
|
||||||
{"object",objectCommand,-2,"r",0,NULL,2,2,2,0,0},
|
{"object",objectCommand,-2,"r",0,NULL,2,2,2,0,0},
|
||||||
{"client",clientCommand,-2,"ar",0,NULL,0,0,0,0,0},
|
{"client",clientCommand,-2,"ar",0,NULL,0,0,0,0,0},
|
||||||
@ -507,17 +505,6 @@ dictType keylistDictType = {
|
|||||||
dictListDestructor /* val destructor */
|
dictListDestructor /* val destructor */
|
||||||
};
|
};
|
||||||
|
|
||||||
/* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to
|
|
||||||
* clusterNode structures. */
|
|
||||||
dictType clusterNodesDictType = {
|
|
||||||
dictSdsHash, /* hash function */
|
|
||||||
NULL, /* key dup */
|
|
||||||
NULL, /* val dup */
|
|
||||||
dictSdsKeyCompare, /* key compare */
|
|
||||||
dictSdsDestructor, /* key destructor */
|
|
||||||
NULL /* val destructor */
|
|
||||||
};
|
|
||||||
|
|
||||||
int htNeedsResize(dict *dict) {
|
int htNeedsResize(dict *dict) {
|
||||||
long long size, used;
|
long long size, used;
|
||||||
|
|
||||||
@ -792,9 +779,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
* to detect transfer failures. */
|
* to detect transfer failures. */
|
||||||
if (!(loops % 10)) replicationCron();
|
if (!(loops % 10)) replicationCron();
|
||||||
|
|
||||||
/* Run other sub-systems specific cron jobs */
|
|
||||||
if (server.cluster_enabled && !(loops % 10)) clusterCron();
|
|
||||||
|
|
||||||
server.cronloops++;
|
server.cronloops++;
|
||||||
return 100;
|
return 100;
|
||||||
}
|
}
|
||||||
@ -949,8 +933,6 @@ void initServerConfig() {
|
|||||||
server.shutdown_asap = 0;
|
server.shutdown_asap = 0;
|
||||||
server.repl_ping_slave_period = REDIS_REPL_PING_SLAVE_PERIOD;
|
server.repl_ping_slave_period = REDIS_REPL_PING_SLAVE_PERIOD;
|
||||||
server.repl_timeout = REDIS_REPL_TIMEOUT;
|
server.repl_timeout = REDIS_REPL_TIMEOUT;
|
||||||
server.cluster_enabled = 0;
|
|
||||||
server.cluster.configfile = zstrdup("nodes.conf");
|
|
||||||
server.lua_caller = NULL;
|
server.lua_caller = NULL;
|
||||||
server.lua_time_limit = REDIS_LUA_TIME_LIMIT;
|
server.lua_time_limit = REDIS_LUA_TIME_LIMIT;
|
||||||
server.lua_client = NULL;
|
server.lua_client = NULL;
|
||||||
@ -1151,7 +1133,6 @@ void initServer() {
|
|||||||
server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION;
|
server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (server.cluster_enabled) clusterInit();
|
|
||||||
scriptingInit();
|
scriptingInit();
|
||||||
slowlogInit();
|
slowlogInit();
|
||||||
bioInit();
|
bioInit();
|
||||||
@ -1374,29 +1355,6 @@ int processCommand(redisClient *c) {
|
|||||||
return REDIS_OK;
|
return REDIS_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* If cluster is enabled, redirect here */
|
|
||||||
if (server.cluster_enabled &&
|
|
||||||
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)) {
|
|
||||||
int hashslot;
|
|
||||||
|
|
||||||
if (server.cluster.state != REDIS_CLUSTER_OK) {
|
|
||||||
addReplyError(c,"The cluster is down. Check with CLUSTER INFO for more information");
|
|
||||||
return REDIS_OK;
|
|
||||||
} else {
|
|
||||||
int ask;
|
|
||||||
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&ask);
|
|
||||||
if (n == NULL) {
|
|
||||||
addReplyError(c,"Multi keys request invalid in cluster");
|
|
||||||
return REDIS_OK;
|
|
||||||
} else if (n != server.cluster.myself) {
|
|
||||||
addReplySds(c,sdscatprintf(sdsempty(),
|
|
||||||
"-%s %d %s:%d\r\n", ask ? "ASK" : "MOVED",
|
|
||||||
hashslot,n->ip,n->port));
|
|
||||||
return REDIS_OK;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Handle the maxmemory directive.
|
/* Handle the maxmemory directive.
|
||||||
*
|
*
|
||||||
* First we try to free some memory if possible (if there are volatile
|
* First we try to free some memory if possible (if there are volatile
|
||||||
@ -1884,15 +1842,6 @@ sds genRedisInfoString(char *section) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Clusetr */
|
|
||||||
if (allsections || defsections || !strcasecmp(section,"cluster")) {
|
|
||||||
if (sections++) info = sdscat(info,"\r\n");
|
|
||||||
info = sdscatprintf(info,
|
|
||||||
"# Cluster\r\n"
|
|
||||||
"cluster_enabled:%d\r\n",
|
|
||||||
server.cluster_enabled);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Key space */
|
/* Key space */
|
||||||
if (allsections || defsections || !strcasecmp(section,"keyspace")) {
|
if (allsections || defsections || !strcasecmp(section,"keyspace")) {
|
||||||
if (sections++) info = sdscat(info,"\r\n");
|
if (sections++) info = sdscat(info,"\r\n");
|
||||||
@ -2172,7 +2121,7 @@ void redisAsciiArt(void) {
|
|||||||
redisGitSHA1(),
|
redisGitSHA1(),
|
||||||
strtol(redisGitDirty(),NULL,10) > 0,
|
strtol(redisGitDirty(),NULL,10) > 0,
|
||||||
(sizeof(long) == 8) ? "64" : "32",
|
(sizeof(long) == 8) ? "64" : "32",
|
||||||
server.cluster_enabled ? "cluster" : "stand alone",
|
"stand alone",
|
||||||
server.port,
|
server.port,
|
||||||
(long) getpid()
|
(long) getpid()
|
||||||
);
|
);
|
||||||
|
148
src/redis.h
148
src/redis.h
@ -425,134 +425,6 @@ typedef struct redisOpArray {
|
|||||||
int numops;
|
int numops;
|
||||||
} redisOpArray;
|
} redisOpArray;
|
||||||
|
|
||||||
/*-----------------------------------------------------------------------------
|
|
||||||
* Redis cluster data structures
|
|
||||||
*----------------------------------------------------------------------------*/
|
|
||||||
|
|
||||||
#define REDIS_CLUSTER_SLOTS 4096
|
|
||||||
#define REDIS_CLUSTER_OK 0 /* Everything looks ok */
|
|
||||||
#define REDIS_CLUSTER_FAIL 1 /* The cluster can't work */
|
|
||||||
#define REDIS_CLUSTER_NEEDHELP 2 /* The cluster works, but needs some help */
|
|
||||||
#define REDIS_CLUSTER_NAMELEN 40 /* sha1 hex length */
|
|
||||||
#define REDIS_CLUSTER_PORT_INCR 10000 /* Cluster port = baseport + PORT_INCR */
|
|
||||||
|
|
||||||
struct clusterNode;
|
|
||||||
|
|
||||||
/* clusterLink encapsulates everything needed to talk with a remote node. */
|
|
||||||
typedef struct clusterLink {
|
|
||||||
int fd; /* TCP socket file descriptor */
|
|
||||||
sds sndbuf; /* Packet send buffer */
|
|
||||||
sds rcvbuf; /* Packet reception buffer */
|
|
||||||
struct clusterNode *node; /* Node related to this link if any, or NULL */
|
|
||||||
} clusterLink;
|
|
||||||
|
|
||||||
/* Node flags */
|
|
||||||
#define REDIS_NODE_MASTER 1 /* The node is a master */
|
|
||||||
#define REDIS_NODE_SLAVE 2 /* The node is a slave */
|
|
||||||
#define REDIS_NODE_PFAIL 4 /* Failure? Need acknowledge */
|
|
||||||
#define REDIS_NODE_FAIL 8 /* The node is believed to be malfunctioning */
|
|
||||||
#define REDIS_NODE_MYSELF 16 /* This node is myself */
|
|
||||||
#define REDIS_NODE_HANDSHAKE 32 /* We have still to exchange the first ping */
|
|
||||||
#define REDIS_NODE_NOADDR 64 /* We don't know the address of this node */
|
|
||||||
#define REDIS_NODE_MEET 128 /* Send a MEET message to this node */
|
|
||||||
#define REDIS_NODE_NULL_NAME "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"
|
|
||||||
|
|
||||||
struct clusterNode {
|
|
||||||
char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
|
|
||||||
int flags; /* REDIS_NODE_... */
|
|
||||||
unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */
|
|
||||||
int numslaves; /* Number of slave nodes, if this is a master */
|
|
||||||
struct clusterNode **slaves; /* pointers to slave nodes */
|
|
||||||
struct clusterNode *slaveof; /* pointer to the master node */
|
|
||||||
time_t ping_sent; /* Unix time we sent latest ping */
|
|
||||||
time_t pong_received; /* Unix time we received the pong */
|
|
||||||
char *configdigest; /* Configuration digest of this node */
|
|
||||||
time_t configdigest_ts; /* Configuration digest timestamp */
|
|
||||||
char ip[16]; /* Latest known IP address of this node */
|
|
||||||
int port; /* Latest known port of this node */
|
|
||||||
clusterLink *link; /* TCP/IP link with this node */
|
|
||||||
};
|
|
||||||
typedef struct clusterNode clusterNode;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
char *configfile;
|
|
||||||
clusterNode *myself; /* This node */
|
|
||||||
int state; /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */
|
|
||||||
int node_timeout;
|
|
||||||
dict *nodes; /* Hash table of name -> clusterNode structures */
|
|
||||||
clusterNode *migrating_slots_to[REDIS_CLUSTER_SLOTS];
|
|
||||||
clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS];
|
|
||||||
clusterNode *slots[REDIS_CLUSTER_SLOTS];
|
|
||||||
zskiplist *slots_to_keys;
|
|
||||||
} clusterState;
|
|
||||||
|
|
||||||
/* Redis cluster messages header */
|
|
||||||
|
|
||||||
/* Note that the PING, PONG and MEET messages are actually the same exact
|
|
||||||
* kind of packet. PONG is the reply to ping, in the extact format as a PING,
|
|
||||||
* while MEET is a special PING that forces the receiver to add the sender
|
|
||||||
* as a node (if it is not already in the list). */
|
|
||||||
#define CLUSTERMSG_TYPE_PING 0 /* Ping */
|
|
||||||
#define CLUSTERMSG_TYPE_PONG 1 /* Pong (reply to Ping) */
|
|
||||||
#define CLUSTERMSG_TYPE_MEET 2 /* Meet "let's join" message */
|
|
||||||
#define CLUSTERMSG_TYPE_FAIL 3 /* Mark node xxx as failing */
|
|
||||||
#define CLUSTERMSG_TYPE_PUBLISH 4 /* Pub/Sub Publish propatagion */
|
|
||||||
|
|
||||||
/* Initially we don't know our "name", but we'll find it once we connect
|
|
||||||
* to the first node, using the getsockname() function. Then we'll use this
|
|
||||||
* address for all the next messages. */
|
|
||||||
typedef struct {
|
|
||||||
char nodename[REDIS_CLUSTER_NAMELEN];
|
|
||||||
uint32_t ping_sent;
|
|
||||||
uint32_t pong_received;
|
|
||||||
char ip[16]; /* IP address last time it was seen */
|
|
||||||
uint16_t port; /* port last time it was seen */
|
|
||||||
uint16_t flags;
|
|
||||||
uint32_t notused; /* for 64 bit alignment */
|
|
||||||
} clusterMsgDataGossip;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
char nodename[REDIS_CLUSTER_NAMELEN];
|
|
||||||
} clusterMsgDataFail;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
uint32_t channel_len;
|
|
||||||
uint32_t message_len;
|
|
||||||
unsigned char bulk_data[8]; /* defined as 8 just for alignment concerns. */
|
|
||||||
} clusterMsgDataPublish;
|
|
||||||
|
|
||||||
union clusterMsgData {
|
|
||||||
/* PING, MEET and PONG */
|
|
||||||
struct {
|
|
||||||
/* Array of N clusterMsgDataGossip structures */
|
|
||||||
clusterMsgDataGossip gossip[1];
|
|
||||||
} ping;
|
|
||||||
|
|
||||||
/* FAIL */
|
|
||||||
struct {
|
|
||||||
clusterMsgDataFail about;
|
|
||||||
} fail;
|
|
||||||
|
|
||||||
/* PUBLISH */
|
|
||||||
struct {
|
|
||||||
clusterMsgDataPublish msg;
|
|
||||||
} publish;
|
|
||||||
};
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
uint32_t totlen; /* Total length of this message */
|
|
||||||
uint16_t type; /* Message type */
|
|
||||||
uint16_t count; /* Only used for some kind of messages. */
|
|
||||||
char sender[REDIS_CLUSTER_NAMELEN]; /* Name of the sender node */
|
|
||||||
unsigned char myslots[REDIS_CLUSTER_SLOTS/8];
|
|
||||||
char slaveof[REDIS_CLUSTER_NAMELEN];
|
|
||||||
char configdigest[32];
|
|
||||||
uint16_t port; /* Sender TCP base port */
|
|
||||||
unsigned char state; /* Cluster state from the POV of the sender */
|
|
||||||
unsigned char notused[5]; /* Reserved for future use. For alignment. */
|
|
||||||
union clusterMsgData data;
|
|
||||||
} clusterMsg;
|
|
||||||
|
|
||||||
/*-----------------------------------------------------------------------------
|
/*-----------------------------------------------------------------------------
|
||||||
* Global server state
|
* Global server state
|
||||||
*----------------------------------------------------------------------------*/
|
*----------------------------------------------------------------------------*/
|
||||||
@ -578,7 +450,6 @@ struct redisServer {
|
|||||||
mode_t unixsocketperm; /* UNIX socket permission */
|
mode_t unixsocketperm; /* UNIX socket permission */
|
||||||
int ipfd; /* TCP socket file descriptor */
|
int ipfd; /* TCP socket file descriptor */
|
||||||
int sofd; /* Unix socket file descriptor */
|
int sofd; /* Unix socket file descriptor */
|
||||||
int cfd; /* Cluster bus lisetning socket */
|
|
||||||
list *clients; /* List of active clients */
|
list *clients; /* List of active clients */
|
||||||
list *clients_to_close; /* Clients to close asynchronously */
|
list *clients_to_close; /* Clients to close asynchronously */
|
||||||
list *slaves, *monitors; /* List of slaves and MONITORs */
|
list *slaves, *monitors; /* List of slaves and MONITORs */
|
||||||
@ -696,9 +567,6 @@ struct redisServer {
|
|||||||
/* Pubsub */
|
/* Pubsub */
|
||||||
dict *pubsub_channels; /* Map channels to list of subscribed clients */
|
dict *pubsub_channels; /* Map channels to list of subscribed clients */
|
||||||
list *pubsub_patterns; /* A list of pubsub_patterns */
|
list *pubsub_patterns; /* A list of pubsub_patterns */
|
||||||
/* Cluster */
|
|
||||||
int cluster_enabled; /* Is cluster enabled? */
|
|
||||||
clusterState cluster; /* State of the cluster */
|
|
||||||
/* Scripting */
|
/* Scripting */
|
||||||
lua_State *lua; /* The Lua interpreter. We use just one for all clients */
|
lua_State *lua; /* The Lua interpreter. We use just one for all clients */
|
||||||
redisClient *lua_client; /* The "fake client" to query Redis from Lua */
|
redisClient *lua_client; /* The "fake client" to query Redis from Lua */
|
||||||
@ -733,8 +601,7 @@ struct redisCommand {
|
|||||||
int arity;
|
int arity;
|
||||||
char *sflags; /* Flags as string represenation, one char per flag. */
|
char *sflags; /* Flags as string represenation, one char per flag. */
|
||||||
int flags; /* The actual flags, obtained from the 'sflags' field. */
|
int flags; /* The actual flags, obtained from the 'sflags' field. */
|
||||||
/* Use a function to determine keys arguments in a command line.
|
/* Use a function to determine keys arguments in a command line. */
|
||||||
* Used for Redis Cluster redirect. */
|
|
||||||
redisGetKeysProc *getkeys_proc;
|
redisGetKeysProc *getkeys_proc;
|
||||||
/* What keys should be loaded in background when calling this command? */
|
/* What keys should be loaded in background when calling this command? */
|
||||||
int firstkey; /* The first argument that's a key (0 = no keys) */
|
int firstkey; /* The first argument that's a key (0 = no keys) */
|
||||||
@ -810,7 +677,6 @@ extern struct redisServer server;
|
|||||||
extern struct sharedObjectsStruct shared;
|
extern struct sharedObjectsStruct shared;
|
||||||
extern dictType setDictType;
|
extern dictType setDictType;
|
||||||
extern dictType zsetDictType;
|
extern dictType zsetDictType;
|
||||||
extern dictType clusterNodesDictType;
|
|
||||||
extern dictType dbDictType;
|
extern dictType dbDictType;
|
||||||
extern double R_Zero, R_PosInf, R_NegInf, R_Nan;
|
extern double R_Zero, R_PosInf, R_NegInf, R_Nan;
|
||||||
dictType hashDictType;
|
dictType hashDictType;
|
||||||
@ -1082,16 +948,6 @@ int *noPreloadGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numke
|
|||||||
int *renameGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags);
|
int *renameGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags);
|
||||||
int *zunionInterGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags);
|
int *zunionInterGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags);
|
||||||
|
|
||||||
/* Cluster */
|
|
||||||
void clusterInit(void);
|
|
||||||
unsigned short crc16(const char *buf, int len);
|
|
||||||
unsigned int keyHashSlot(char *key, int keylen);
|
|
||||||
clusterNode *createClusterNode(char *nodename, int flags);
|
|
||||||
int clusterAddNode(clusterNode *node);
|
|
||||||
void clusterCron(void);
|
|
||||||
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
|
|
||||||
void clusterPropagatePublish(robj *channel, robj *message);
|
|
||||||
|
|
||||||
/* Scripting */
|
/* Scripting */
|
||||||
void scriptingInit(void);
|
void scriptingInit(void);
|
||||||
|
|
||||||
@ -1223,10 +1079,8 @@ void punsubscribeCommand(redisClient *c);
|
|||||||
void publishCommand(redisClient *c);
|
void publishCommand(redisClient *c);
|
||||||
void watchCommand(redisClient *c);
|
void watchCommand(redisClient *c);
|
||||||
void unwatchCommand(redisClient *c);
|
void unwatchCommand(redisClient *c);
|
||||||
void clusterCommand(redisClient *c);
|
|
||||||
void restoreCommand(redisClient *c);
|
void restoreCommand(redisClient *c);
|
||||||
void migrateCommand(redisClient *c);
|
void migrateCommand(redisClient *c);
|
||||||
void askingCommand(redisClient *c);
|
|
||||||
void dumpCommand(redisClient *c);
|
void dumpCommand(redisClient *c);
|
||||||
void objectCommand(redisClient *c);
|
void objectCommand(redisClient *c);
|
||||||
void clientCommand(redisClient *c);
|
void clientCommand(redisClient *c);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user