Updated to unstable

This commit is contained in:
antirez
2011-06-14 18:06:39 +02:00
169 changed files with 591 additions and 7793 deletions

View File

@ -46,6 +46,11 @@ BINCOLOR="\033[37;1m"
MAKECOLOR="\033[32;1m"
ENDCOLOR="\033[0m"
ifndef V
QUIET_CC = @printf ' %b %b\n' $(CCCOLOR)CC$(ENDCOLOR) $(SRCCOLOR)$@$(ENDCOLOR);
QUIET_LINK = @printf ' %b %b\n' $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR);
endif
OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o dscache.o pubsub.o multi.o debug.o sort.o intset.o syncio.o diskstore.o cluster.o crc16.o endian.o scripting.o
BENCHOBJ = ae.o anet.o redis-benchmark.o sds.o adlist.o zmalloc.o
CLIOBJ = anet.o sds.o adlist.o redis-cli.o zmalloc.o release.o
@ -132,45 +137,37 @@ zipmap.o: zipmap.c zmalloc.h
zmalloc.o: zmalloc.c config.h
dependencies:
@echo $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)hiredis$(ENDCOLOR)
@printf '%b %b\n' $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)hiredis$(ENDCOLOR)
@cd ../deps/hiredis && $(MAKE) static ARCH="$(ARCH)"
@echo $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)linenoise$(ENDCOLOR)
@printf '%b %b\n' $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)linenoise$(ENDCOLOR)
@cd ../deps/linenoise && $(MAKE) ARCH="$(ARCH)"
@echo $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)Lua ansi$(ENDCOLOR)
@cd ../deps/lua && $(MAKE) ARCH="$(ARCH)" ansi
redis-server: $(OBJ)
@$(CC) -o $(PRGNAME) $(CCOPT) $(DEBUG) $(OBJ) ../deps/lua/src/liblua.a
@echo $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$(@)$(ENDCOLOR)
$(QUIET_CC)$(CC) -o $(PRGNAME) $(CCOPT) $(DEBUG) $(OBJ) ../deps/lua/src/liblua.a
redis-benchmark: dependencies $(BENCHOBJ)
@cd ../deps/hiredis && $(MAKE) static
@$(CC) -o $(BENCHPRGNAME) $(CCOPT) $(DEBUG) $(BENCHOBJ) ../deps/hiredis/libhiredis.a
@echo $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$(@)$(ENDCOLOR)
$(QUIET_LINK)$(CC) -o $(BENCHPRGNAME) $(CCOPT) $(DEBUG) $(BENCHOBJ) ../deps/hiredis/libhiredis.a
redis-benchmark.o:
@$(CC) -c $(CFLAGS) -I../deps/hiredis $(DEBUG) $(COMPILE_TIME) $<
@echo $(CCCOLOR)CC$(ENDCOLOR) $(SRCCOLOR)$(<)$(ENDCOLOR)
$(QUIET_CC)$(CC) -c $(CFLAGS) -I../deps/hiredis $(DEBUG) $(COMPILE_TIME) $<
redis-cli: dependencies $(CLIOBJ)
@$(CC) -o $(CLIPRGNAME) $(CCOPT) $(DEBUG) $(CLIOBJ) ../deps/hiredis/libhiredis.a ../deps/linenoise/linenoise.o
@echo $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$(@)$(ENDCOLOR)
$(QUIET_LINK)$(CC) -o $(CLIPRGNAME) $(CCOPT) $(DEBUG) $(CLIOBJ) ../deps/hiredis/libhiredis.a ../deps/linenoise/linenoise.o
redis-cli.o:
@$(CC) -c $(CFLAGS) -I../deps/hiredis -I../deps/linenoise $(DEBUG) $(COMPILE_TIME) $<
@echo $(CCCOLOR)CC$(ENDCOLOR) $(SRCCOLOR)$(<)$(ENDCOLOR)
$(QUIET_CC)$(CC) -c $(CFLAGS) -I../deps/hiredis -I../deps/linenoise $(DEBUG) $(COMPILE_TIME) $<
redis-check-dump: $(CHECKDUMPOBJ)
@$(CC) -o $(CHECKDUMPPRGNAME) $(CCOPT) $(DEBUG) $(CHECKDUMPOBJ)
@echo $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$(@)$(ENDCOLOR)
$(QUIET_LINK)$(CC) -o $(CHECKDUMPPRGNAME) $(CCOPT) $(DEBUG) $(CHECKDUMPOBJ)
redis-check-aof: $(CHECKAOFOBJ)
@$(CC) -o $(CHECKAOFPRGNAME) $(CCOPT) $(DEBUG) $(CHECKAOFOBJ)
@echo $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$(@)$(ENDCOLOR)
$(QUIET_LINK)$(CC) -o $(CHECKAOFPRGNAME) $(CCOPT) $(DEBUG) $(CHECKAOFOBJ)
.c.o:
@$(CC) -c $(CFLAGS) $(DEBUG) $(COMPILE_TIME) -I../deps/lua/src $<
@echo $(CCCOLOR)CC$(ENDCOLOR) $(SRCCOLOR)$(<)$(ENDCOLOR)
$(QUIET_CC)$(CC) -c $(CFLAGS) $(DEBUG) $(COMPILE_TIME) -I../deps/lua/src $<
clean:
rm -rf $(PRGNAME) $(BENCHPRGNAME) $(CLIPRGNAME) $(CHECKDUMPPRGNAME) $(CHECKAOFPRGNAME) *.o *.gcda *.gcno *.gcov

View File

@ -8,6 +8,8 @@
#include <sys/resource.h>
#include <sys/wait.h>
void aofUpdateCurrentSize(void);
/* Called when the user switches from "appendonly yes" to "appendonly no"
* at runtime using the CONFIG command. */
void stopAppendOnly(void) {
@ -19,15 +21,15 @@ void stopAppendOnly(void) {
server.appendseldb = -1;
server.appendonly = 0;
/* rewrite operation in progress? kill it, wait child exit */
if (server.bgsavechildpid != -1) {
if (server.bgrewritechildpid != -1) {
int statloc;
if (kill(server.bgsavechildpid,SIGKILL) != -1)
if (kill(server.bgrewritechildpid,SIGKILL) != -1)
wait3(&statloc,0,NULL);
/* reset the buffer accumulating changes while the child saves */
sdsfree(server.bgrewritebuf);
server.bgrewritebuf = sdsempty();
server.bgsavechildpid = -1;
server.bgrewritechildpid = -1;
}
}
@ -82,6 +84,7 @@ void flushAppendOnlyFile(void) {
}
sdsfree(server.aofbuf);
server.aofbuf = sdsempty();
server.appendonly_current_size += nwritten;
/* Don't Fsync if no-appendfsync-on-rewrite is set to yes and we have
* childs performing heavy I/O on disk. */
@ -221,6 +224,7 @@ int loadAppendOnlyFile(char *filename) {
long loops = 0;
if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
server.appendonly_current_size = 0;
fclose(fp);
return REDIS_ERR;
}
@ -299,6 +303,8 @@ int loadAppendOnlyFile(char *filename) {
freeFakeClient(fakeClient);
server.appendonly = appendonly;
stopLoading();
aofUpdateCurrentSize();
server.auto_aofrewrite_base_size = server.appendonly_current_size;
return REDIS_OK;
readerr:
@ -565,16 +571,18 @@ werr:
*/
int rewriteAppendOnlyFileBackground(void) {
pid_t childpid;
long long start;
if (server.bgrewritechildpid != -1) return REDIS_ERR;
if (server.ds_enabled != 0) {
redisLog(REDIS_WARNING,"BGREWRITEAOF called with diskstore enabled: AOF is not supported when diskstore is enabled. Operation not performed.");
return REDIS_ERR;
}
start = ustime();
if ((childpid = fork()) == 0) {
/* Child */
char tmpfile[256];
/* Child */
if (server.ipfd > 0) close(server.ipfd);
if (server.sofd > 0) close(server.sofd);
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
@ -585,6 +593,7 @@ int rewriteAppendOnlyFileBackground(void) {
}
} else {
/* Parent */
server.stat_fork_time = ustime()-start;
if (childpid == -1) {
redisLog(REDIS_WARNING,
"Can't rewrite append only file in background: fork: %s",
@ -608,9 +617,10 @@ int rewriteAppendOnlyFileBackground(void) {
void bgrewriteaofCommand(redisClient *c) {
if (server.bgrewritechildpid != -1) {
addReplyError(c,"Background append only file rewriting already in progress");
return;
}
if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
} else if (server.bgsavechildpid != -1) {
server.aofrewrite_scheduled = 1;
addReplyStatus(c,"Background append only file rewriting scheduled");
} else if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
addReplyStatus(c,"Background append only file rewriting started");
} else {
addReply(c,shared.err);
@ -624,6 +634,21 @@ void aofRemoveTempFile(pid_t childpid) {
unlink(tmpfile);
}
/* Update the server.appendonly_current_size filed explicitly using stat(2)
* to check the size of the file. This is useful after a rewrite or after
* a restart, normally the size is updated just adding the write length
* to the current lenght, that is much faster. */
void aofUpdateCurrentSize(void) {
struct redis_stat sb;
if (redis_fstat(server.appendfd,&sb) == -1) {
redisLog(REDIS_WARNING,"Unable to check the AOF length: %s",
strerror(errno));
} else {
server.appendonly_current_size = sb.st_size;
}
}
/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
* Handle this. */
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
@ -664,6 +689,8 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
if (server.appendfsync != APPENDFSYNC_NO) aof_fsync(fd);
server.appendseldb = -1; /* Make sure it will issue SELECT */
redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
aofUpdateCurrentSize();
server.auto_aofrewrite_base_size = server.appendonly_current_size;
} else {
/* If append only is disabled we just generate a dump in this
* format. Why not? */

View File

@ -231,6 +231,18 @@ void loadServerConfig(char *filename) {
err = "argument must be 'no', 'always' or 'everysec'";
goto loaderr;
}
} else if (!strcasecmp(argv[0],"auto-aof-rewrite-percentage") &&
argc == 2)
{
server.auto_aofrewrite_perc = atoi(argv[1]);
if (server.auto_aofrewrite_perc < 0) {
err = "Invalid negative percentage for AOF auto rewrite";
goto loaderr;
}
} else if (!strcasecmp(argv[0],"auto-aof-rewrite-min-size") &&
argc == 2)
{
server.auto_aofrewrite_min_size = memtoll(argv[1],NULL);
} else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
server.requirepass = zstrdup(argv[1]);
} else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
@ -397,6 +409,12 @@ void configSetCommand(redisClient *c) {
}
}
}
} else if (!strcasecmp(c->argv[2]->ptr,"auto-aof-rewrite-percentage")) {
if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0) goto badfmt;
server.auto_aofrewrite_perc = ll;
} else if (!strcasecmp(c->argv[2]->ptr,"auto-aof-rewrite-min-size")) {
if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0) goto badfmt;
server.auto_aofrewrite_min_size = ll;
} else if (!strcasecmp(c->argv[2]->ptr,"save")) {
int vlen, j;
sds *v = sdssplitlen(o->ptr,sdslen(o->ptr)," ",1,&vlen);
@ -586,6 +604,16 @@ void configGetCommand(redisClient *c) {
sdsfree(buf);
matches++;
}
if (stringmatch(pattern,"auto-aof-rewrite-percentage",0)) {
addReplyBulkCString(c,"auto-aof-rewrite-percentage");
addReplyBulkLongLong(c,server.auto_aofrewrite_perc);
matches++;
}
if (stringmatch(pattern,"auto-aof-rewrite-min-size",0)) {
addReplyBulkCString(c,"auto-aof-rewrite-min-size");
addReplyBulkLongLong(c,server.auto_aofrewrite_min_size);
matches++;
}
if (stringmatch(pattern,"slave-serve-stale-data",0)) {
addReplyBulkCString(c,"slave-serve-stale-data");
addReplyBulkCString(c,server.repl_serve_stale_data ? "yes" : "no");

View File

@ -180,7 +180,7 @@ void decrRefCount(void *obj) {
robj *o = obj;
if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0");
if (--(o->refcount) == 0) {
if (o->refcount == 1) {
switch(o->type) {
case REDIS_STRING: freeStringObject(o); break;
case REDIS_LIST: freeListObject(o); break;
@ -189,8 +189,9 @@ void decrRefCount(void *obj) {
case REDIS_HASH: freeHashObject(o); break;
default: redisPanic("Unknown object type"); break;
}
o->ptr = NULL; /* defensive programming. We'll see NULL in traces. */
zfree(o);
} else {
o->refcount--;
}
}

View File

@ -482,6 +482,7 @@ werr:
int rdbSaveBackground(char *filename) {
pid_t childpid;
long long start;
if (server.bgsavechildpid != -1 ||
server.bgsavethread != (pthread_t) -1) return REDIS_ERR;
@ -493,6 +494,7 @@ int rdbSaveBackground(char *filename) {
return dsRdbSaveBackground(filename);
}
start = ustime();
if ((childpid = fork()) == 0) {
int retval;
@ -503,6 +505,7 @@ int rdbSaveBackground(char *filename) {
_exit((retval == REDIS_OK) ? 0 : 1);
} else {
/* Parent */
server.stat_fork_time = ustime()-start;
if (childpid == -1) {
redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
strerror(errno));
@ -1035,9 +1038,9 @@ void saveCommand(redisClient *c) {
void bgsaveCommand(redisClient *c) {
if (server.bgsavechildpid != -1 || server.bgsavethread != (pthread_t)-1) {
addReplyError(c,"Background save already in progress");
return;
}
if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
} else if (server.bgrewritechildpid != -1) {
addReplyError(c,"Can't BGSAVE while AOF log rewriting is in progress");
} else if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
addReplyStatus(c,"Background saving started");
} else {
addReply(c,shared.err);

View File

@ -48,7 +48,10 @@
#define REDIS_NOTUSED(V) ((void) V)
static struct config {
int debug;
aeEventLoop *el;
const char *hostip;
int hostport;
const char *hostsocket;
int numclients;
int requests;
int liveclients;
@ -57,15 +60,11 @@ static struct config {
int datasize;
int randomkeys;
int randomkeys_keyspacelen;
aeEventLoop *el;
char *hostip;
int hostport;
char *hostsocket;
int keepalive;
long long start;
long long totlatency;
long long *latency;
char *title;
const char *title;
list *clients;
int quiet;
int loop;
@ -227,7 +226,7 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
}
}
static client createClient(char *cmd, int len) {
static client createClient(const char *cmd, size_t len) {
client c = zmalloc(sizeof(struct _client));
if (config.hostsocket == NULL) {
c->context = redisConnectNonBlock(config.hostip,config.hostport);
@ -311,7 +310,7 @@ static void showLatencyReport(void) {
}
}
static void benchmark(char *title, char *cmd, int len) {
static void benchmark(const char *title, const char *cmd, int len) {
client c;
config.title = title;
@ -328,73 +327,87 @@ static void benchmark(char *title, char *cmd, int len) {
freeAllClients();
}
void parseOptions(int argc, char **argv) {
/* Returns number of consumed options. */
int parseOptions(int argc, const char **argv) {
int i;
int lastarg;
int exit_status = 1;
for (i = 1; i < argc; i++) {
int lastarg = i==argc-1;
if (!strcmp(argv[i],"-c") && !lastarg) {
config.numclients = atoi(argv[i+1]);
i++;
} else if (!strcmp(argv[i],"-n") && !lastarg) {
config.requests = atoi(argv[i+1]);
i++;
} else if (!strcmp(argv[i],"-k") && !lastarg) {
config.keepalive = atoi(argv[i+1]);
i++;
} else if (!strcmp(argv[i],"-h") && !lastarg) {
config.hostip = argv[i+1];
i++;
} else if (!strcmp(argv[i],"-p") && !lastarg) {
config.hostport = atoi(argv[i+1]);
i++;
} else if (!strcmp(argv[i],"-s") && !lastarg) {
config.hostsocket = argv[i+1];
i++;
} else if (!strcmp(argv[i],"-d") && !lastarg) {
config.datasize = atoi(argv[i+1]);
i++;
lastarg = (i == (argc-1));
if (!strcmp(argv[i],"-c")) {
if (lastarg) goto invalid;
config.numclients = atoi(argv[++i]);
} else if (!strcmp(argv[i],"-n")) {
if (lastarg) goto invalid;
config.requests = atoi(argv[++i]);
} else if (!strcmp(argv[i],"-k")) {
if (lastarg) goto invalid;
config.keepalive = atoi(argv[++i]);
} else if (!strcmp(argv[i],"-h")) {
if (lastarg) goto invalid;
config.hostip = strdup(argv[++i]);
} else if (!strcmp(argv[i],"-p")) {
if (lastarg) goto invalid;
config.hostport = atoi(argv[++i]);
} else if (!strcmp(argv[i],"-s")) {
if (lastarg) goto invalid;
config.hostsocket = strdup(argv[++i]);
} else if (!strcmp(argv[i],"-d")) {
if (lastarg) goto invalid;
config.datasize = atoi(argv[++i]);
if (config.datasize < 1) config.datasize=1;
if (config.datasize > 1024*1024) config.datasize = 1024*1024;
} else if (!strcmp(argv[i],"-r") && !lastarg) {
} else if (!strcmp(argv[i],"-r")) {
if (lastarg) goto invalid;
config.randomkeys = 1;
config.randomkeys_keyspacelen = atoi(argv[i+1]);
config.randomkeys_keyspacelen = atoi(argv[++i]);
if (config.randomkeys_keyspacelen < 0)
config.randomkeys_keyspacelen = 0;
i++;
} else if (!strcmp(argv[i],"-q")) {
config.quiet = 1;
} else if (!strcmp(argv[i],"-l")) {
config.loop = 1;
} else if (!strcmp(argv[i],"-D")) {
config.debug = 1;
} else if (!strcmp(argv[i],"-I")) {
config.idlemode = 1;
} else if (!strcmp(argv[i],"--help")) {
exit_status = 0;
goto usage;
} else {
printf("Wrong option '%s' or option argument missing\n\n",argv[i]);
printf("Usage: redis-benchmark [-h <host>] [-p <port>] [-c <clients>] [-n <requests]> [-k <boolean>]\n\n");
printf(" -h <hostname> Server hostname (default 127.0.0.1)\n");
printf(" -p <port> Server port (default 6379)\n");
printf(" -s <socket> Server socket (overrides host and port)\n");
printf(" -c <clients> Number of parallel connections (default 50)\n");
printf(" -n <requests> Total number of requests (default 10000)\n");
printf(" -d <size> Data size of SET/GET value in bytes (default 2)\n");
printf(" -k <boolean> 1=keep alive 0=reconnect (default 1)\n");
printf(" -r <keyspacelen> Use random keys for SET/GET/INCR, random values for SADD\n");
printf(" Using this option the benchmark will get/set keys\n");
printf(" in the form mykey_rand000000012456 instead of constant\n");
printf(" keys, the <keyspacelen> argument determines the max\n");
printf(" number of values for the random number. For instance\n");
printf(" if set to 10 only rand000000000000 - rand000000000009\n");
printf(" range will be allowed.\n");
printf(" -q Quiet. Just show query/sec values\n");
printf(" -l Loop. Run the tests forever\n");
printf(" -I Idle mode. Just open N idle connections and wait.\n");
printf(" -D Debug mode. more verbose.\n");
exit(1);
/* Assume the user meant to provide an option when the arg starts
* with a dash. We're done otherwise and should use the remainder
* as the command and arguments for running the benchmark. */
if (argv[i][0] == '-') goto invalid;
return i;
}
}
return i;
invalid:
printf("Invalid option \"%s\" or option argument missing\n\n",argv[i]);
usage:
printf("Usage: redis-benchmark [-h <host>] [-p <port>] [-c <clients>] [-n <requests]> [-k <boolean>]\n\n");
printf(" -h <hostname> Server hostname (default 127.0.0.1)\n");
printf(" -p <port> Server port (default 6379)\n");
printf(" -s <socket> Server socket (overrides host and port)\n");
printf(" -c <clients> Number of parallel connections (default 50)\n");
printf(" -n <requests> Total number of requests (default 10000)\n");
printf(" -d <size> Data size of SET/GET value in bytes (default 2)\n");
printf(" -k <boolean> 1=keep alive 0=reconnect (default 1)\n");
printf(" -r <keyspacelen> Use random keys for SET/GET/INCR, random values for SADD\n");
printf(" Using this option the benchmark will get/set keys\n");
printf(" in the form mykey_rand000000012456 instead of constant\n");
printf(" keys, the <keyspacelen> argument determines the max\n");
printf(" number of values for the random number. For instance\n");
printf(" if set to 10 only rand000000000000 - rand000000000009\n");
printf(" range will be allowed.\n");
printf(" -q Quiet. Just show query/sec values\n");
printf(" -l Loop. Run the tests forever\n");
printf(" -I Idle mode. Just open N idle connections and wait.\n");
exit(exit_status);
}
int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData) {
@ -409,14 +422,16 @@ int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData
return 250; /* every 250ms */
}
int main(int argc, char **argv) {
int main(int argc, const char **argv) {
int i;
char *data, *cmd;
int len;
client c;
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
config.debug = 0;
config.numclients = 50;
config.requests = 10000;
config.liveclients = 0;
@ -436,7 +451,10 @@ int main(int argc, char **argv) {
config.hostport = 6379;
config.hostsocket = NULL;
parseOptions(argc,argv);
i = parseOptions(argc,argv);
argc -= i;
argv += i;
config.latency = zmalloc(sizeof(long long)*config.requests);
if (config.keepalive == 0) {
@ -451,10 +469,25 @@ int main(int argc, char **argv) {
/* and will wait for every */
}
do {
char *data, *cmd;
int len;
/* Run benchmark with command in the remainder of the arguments. */
if (argc) {
sds title = sdsnew(argv[0]);
for (i = 1; i < argc; i++) {
title = sdscatlen(title, " ", 1);
title = sdscatlen(title, (char*)argv[i], strlen(argv[i]));
}
do {
len = redisFormatCommandArgv(&cmd,argc,argv,NULL);
benchmark(title,cmd,len);
free(cmd);
} while(config.loop);
return 0;
}
/* Run default benchmark suite. */
do {
data = zmalloc(config.datasize+1);
memset(data,'x',config.datasize);
data[config.datasize] = '\0';

View File

@ -55,6 +55,7 @@ static struct config {
int hostport;
char *hostsocket;
long repeat;
long interval;
int dbnum;
int interactive;
int shutdown;
@ -87,9 +88,11 @@ static long long mstime(void) {
static void cliRefreshPrompt(void) {
if (config.dbnum == 0)
snprintf(config.prompt,sizeof(config.prompt),"redis> ");
snprintf(config.prompt,sizeof(config.prompt),"redis %s:%d> ",
config.hostip, config.hostport);
else
snprintf(config.prompt,sizeof(config.prompt),"redis:%d> ",config.dbnum);
snprintf(config.prompt,sizeof(config.prompt),"redis %s:%d[%d]> ",
config.hostip, config.hostport, config.dbnum);
}
/*------------------------------------------------------------------------------
@ -314,10 +317,9 @@ static int cliConnect(int force) {
return REDIS_OK;
}
static void cliPrintContextErrorAndExit() {
static void cliPrintContextError() {
if (context == NULL) return;
fprintf(stderr,"Error: %s\n",context->errstr);
exit(1);
}
static sds cliFormatReplyTTY(redisReply *r, char *prefix) {
@ -436,7 +438,8 @@ static int cliReadReply(int output_raw_strings) {
if (context->err == REDIS_ERR_EOF)
return REDIS_ERR;
}
cliPrintContextErrorAndExit();
cliPrintContextError();
exit(1);
return REDIS_ERR; /* avoid compiler warning */
}
@ -462,10 +465,7 @@ static int cliSendCommand(int argc, char **argv, int repeat) {
size_t *argvlen;
int j, output_raw;
if (context == NULL) {
printf("Not connected, please use: connect <host> <port>\n");
return REDIS_OK;
}
if (context == NULL) return REDIS_ERR;
output_raw = 0;
if (!strcasecmp(command,"info") ||
@ -518,6 +518,8 @@ static int cliSendCommand(int argc, char **argv, int repeat) {
cliRefreshPrompt();
}
}
if (config.interval) usleep(config.interval);
fflush(stdout); /* Make it grep friendly */
}
free(argvlen);
@ -553,6 +555,10 @@ static int parseOptions(int argc, char **argv) {
} else if (!strcmp(argv[i],"-r") && !lastarg) {
config.repeat = strtoll(argv[i+1],NULL,10);
i++;
} else if (!strcmp(argv[i],"-i") && !lastarg) {
double seconds = atof(argv[i+1]);
config.interval = seconds*1000000;
i++;
} else if (!strcmp(argv[i],"-n") && !lastarg) {
config.dbnum = atoi(argv[i+1]);
i++;
@ -605,6 +611,8 @@ static void usage() {
" -s <socket> Server socket (overrides hostname and port)\n"
" -a <password> Password to use when connecting to the server\n"
" -r <repeat> Execute specified command N times\n"
" -i <interval> When -r is used, waits <interval> seconds per command.\n"
" It is possible to specify sub-second times like -i 0.1.\n"
" -n <db> Database number\n"
" -x Read last argument from STDIN\n"
" -d <delimiter> Multi-bulk delimiter in for raw formatting (default: \\n)\n"
@ -616,6 +624,7 @@ static void usage() {
" cat /etc/passwd | redis-cli -x set mypasswd\n"
" redis-cli get mypasswd\n"
" redis-cli -r 100 lpush mylist x\n"
" redis-cli -r 100 -i 1 info | grep used_memory_human:\n"
"\n"
"When no command is given, redis-cli starts in interactive mode.\n"
"Type \"help\" in interactive mode for information on available commands.\n"
@ -681,14 +690,25 @@ static void repl() {
linenoiseClearScreen();
} else {
long long start_time = mstime(), elapsed;
int repeat, skipargs = 0;
if (cliSendCommand(argc,argv,1) != REDIS_OK) {
repeat = atoi(argv[0]);
if (repeat) {
skipargs = 1;
} else {
repeat = 1;
}
if (cliSendCommand(argc-skipargs,argv+skipargs,repeat)
!= REDIS_OK)
{
cliConnect(1);
/* If we still cannot send the command,
* print error and abort. */
if (cliSendCommand(argc,argv,1) != REDIS_OK)
cliPrintContextErrorAndExit();
/* If we still cannot send the command print error.
* We'll try to reconnect the next time. */
if (cliSendCommand(argc-skipargs,argv+skipargs,repeat)
!= REDIS_OK)
cliPrintContextError();
}
elapsed = mstime()-start_time;
if (elapsed >= 500) {
@ -726,6 +746,7 @@ int main(int argc, char **argv) {
config.hostport = 6379;
config.hostsocket = NULL;
config.repeat = 1;
config.interval = 0;
config.dbnum = 0;
config.interactive = 0;
config.shutdown = 0;
@ -741,11 +762,15 @@ int main(int argc, char **argv) {
argc -= firstarg;
argv += firstarg;
/* Try to connect */
if (cliConnect(0) != REDIS_OK) exit(1);
/* Start interactive mode when no command is provided */
if (argc == 0) repl();
if (argc == 0) {
/* Note that in repl mode we don't abort on connection error.
* A new attempt will be performed for every command send. */
cliConnect(0);
repl();
}
/* Otherwise, we have some arguments to execute */
if (cliConnect(0) != REDIS_OK) exit(1);
return noninteractive(argc,convertToSds(argc,argv));
}

View File

@ -116,9 +116,9 @@ struct redisCommand redisCommandTable[] = {
{"sdiff",sdiffCommand,-2,REDIS_CMD_DENYOOM,NULL,1,-1,1,0,0},
{"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_DENYOOM,NULL,2,-1,1,0,0},
{"smembers",sinterCommand,2,0,NULL,1,1,1,0,0},
{"zadd",zaddCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1,0,0},
{"zadd",zaddCommand,-4,REDIS_CMD_DENYOOM,NULL,1,1,1,0,0},
{"zincrby",zincrbyCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1,0,0},
{"zrem",zremCommand,3,0,NULL,1,1,1,0,0},
{"zrem",zremCommand,-3,0,NULL,1,1,1,0,0},
{"zremrangebyscore",zremrangebyscoreCommand,4,0,NULL,1,1,1,0,0},
{"zremrangebyrank",zremrangebyrankCommand,4,0,NULL,1,1,1,0,0},
{"zunionstore",zunionstoreCommand,-4,REDIS_CMD_DENYOOM,zunionInterGetKeys,0,0,0,0,0},
@ -635,6 +635,14 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
if ((server.maxidletime && !(loops % 100)) || server.bpop_blocked_clients)
closeTimedoutClients();
/* Start a scheduled AOF rewrite if this was requested by the user while
* a BGSAVE was in progress. */
if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1 &&
server.aofrewrite_scheduled)
{
rewriteAppendOnlyFileBackground();
}
/* Check if a background saving or AOF rewrite in progress terminated. */
if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
int statloc;
@ -669,9 +677,10 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
}
}
} else if (!server.ds_enabled) {
/* If there is not a background saving in progress check if
* we have to save now */
time_t now = time(NULL);
/* If there is not a background saving/rewrite in progress check if
* we have to save/rewrite now */
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;
@ -683,6 +692,21 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
break;
}
}
/* Trigger an AOF rewrite if needed */
if (server.bgsavechildpid == -1 &&
server.bgrewritechildpid == -1 &&
server.auto_aofrewrite_perc &&
server.appendonly_current_size > server.auto_aofrewrite_min_size)
{
int base = server.auto_aofrewrite_base_size ?
server.auto_aofrewrite_base_size : 1;
long long growth = (server.appendonly_current_size*100/base) - 100;
if (growth >= server.auto_aofrewrite_perc) {
redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
}
}
/* Expire a few keys per cycle, only if this is a master.
@ -832,6 +856,10 @@ void initServerConfig() {
server.appendonly = 0;
server.appendfsync = APPENDFSYNC_EVERYSEC;
server.no_appendfsync_on_rewrite = 0;
server.auto_aofrewrite_perc = REDIS_AUTO_AOFREWRITE_PERC;
server.auto_aofrewrite_min_size = REDIS_AUTO_AOFREWRITE_MIN_SIZE;
server.auto_aofrewrite_base_size = 0;
server.aofrewrite_scheduled = 0;
server.lastfsync = time(NULL);
server.appendfd = -1;
server.appendseldb = -1; /* Make sure the first time will not match */
@ -876,6 +904,7 @@ void initServerConfig() {
server.masterport = 6379;
server.master = NULL;
server.replstate = REDIS_REPL_NONE;
server.repl_syncio_timeout = REDIS_REPL_SYNCIO_TIMEOUT;
server.repl_serve_stale_data = 1;
/* Double constants initialization */
@ -968,6 +997,7 @@ void initServer() {
server.stat_keyspace_misses = 0;
server.stat_keyspace_hits = 0;
server.stat_peak_memory = 0;
server.stat_fork_time = 0;
server.unixtime = time(NULL);
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
@ -1373,6 +1403,16 @@ sds genRedisInfoString(char *section) {
server.lastsave,
server.bgrewritechildpid != -1);
if (server.appendonly) {
info = sdscatprintf(info,
"aof_current_size:%lld\r\n"
"aof_base_size:%lld\r\n"
"aof_pending_rewrite:%d\r\n",
(long long) server.appendonly_current_size,
(long long) server.auto_aofrewrite_base_size,
server.aofrewrite_scheduled);
}
if (server.loading) {
double perc;
time_t eta, elapsed;
@ -1446,7 +1486,8 @@ sds genRedisInfoString(char *section) {
"keyspace_hits:%lld\r\n"
"keyspace_misses:%lld\r\n"
"pubsub_channels:%ld\r\n"
"pubsub_patterns:%u\r\n",
"pubsub_patterns:%u\r\n"
"latest_fork_usec:%lld\r\n",
server.stat_numconnections,
server.stat_numcommands,
server.stat_expiredkeys,
@ -1454,7 +1495,8 @@ sds genRedisInfoString(char *section) {
server.stat_keyspace_hits,
server.stat_keyspace_misses,
dictSize(server.pubsub_channels),
listLength(server.pubsub_patterns));
listLength(server.pubsub_patterns),
server.stat_fork_time);
}
/* Replication */

View File

@ -42,7 +42,6 @@
#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 8
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
@ -52,6 +51,8 @@
#define REDIS_SHARED_INTEGERS 10000
#define REDIS_REPLY_CHUNK_BYTES (5*1500) /* 5 TCP packets with default MTU */
#define REDIS_MAX_LOGMSG_LEN 1024 /* Default maximum length of syslog messages */
#define REDIS_AUTO_AOFREWRITE_PERC 100
#define REDIS_AUTO_AOFREWRITE_MIN_SIZE (1024*1024)
/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
@ -155,10 +156,14 @@
#define REDIS_REQ_MULTIBULK 2
/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_TRANSFER 2 /* Receiving .rdb from master */
#define REDIS_REPL_CONNECTED 3 /* Connected to master */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTING 2 /* Connecting to master */
#define REDIS_REPL_TRANSFER 3 /* Receiving .rdb from master */
#define REDIS_REPL_CONNECTED 4 /* Connected to master */
/* Synchronous read timeout - slave side */
#define REDIS_REPL_SYNCIO_TIMEOUT 5
/* Slave replication state - from the point of view of master
* Note that in SEND_BULK and ONLINE state the slave receives new updates
@ -548,6 +553,7 @@ struct redisServer {
long long stat_keyspace_hits; /* number of successful lookups of keys */
long long stat_keyspace_misses; /* number of failed lookups of keys */
size_t stat_peak_memory; /* max used memory record */
long long stat_fork_time; /* time needed to perform latets fork() */
/* Configuration */
int verbosity;
int maxidletime;
@ -556,6 +562,11 @@ struct redisServer {
int appendonly;
int appendfsync;
int no_appendfsync_on_rewrite;
int auto_aofrewrite_perc; /* Rewrite AOF if % growth is > M and... */
off_t auto_aofrewrite_min_size; /* the AOF file is at least N bytes. */
off_t auto_aofrewrite_base_size;/* AOF size on latest startup or rewrite. */
off_t appendonly_current_size; /* AOF current size. */
int aofrewrite_scheduled; /* Rewrite once BGSAVE terminates. */
int shutdown_asap;
int activerehashing;
char *requirepass;
@ -590,6 +601,7 @@ struct redisServer {
char *masterhost;
int masterport;
redisClient *master; /* client that is master for this slave */
int repl_syncio_timeout; /* timeout for synchronous I/O calls */
int replstate; /* replication status if the instance is a slave */
off_t repl_transfer_left; /* bytes left reading .rdb */
int repl_transfer_s; /* slave -> master SYNC socket */
@ -898,7 +910,6 @@ int fwriteBulkCount(FILE *fp, char prefix, int count);
/* Replication */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc);
int syncWithMaster(void);
void updateSlavesWaitingBgsave(int bgsaveerr);
void replicationCron(void);

View File

@ -10,38 +10,8 @@
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
int outc = 0, j;
robj **outv;
/* We need 1+(ARGS*3) objects since commands are using the new protocol
* and we one 1 object for the first "*<count>\r\n" multibulk count, then
* for every additional object we have "$<count>\r\n" + object + "\r\n". */
robj *static_outv[REDIS_STATIC_ARGS*3+1];
robj *lenobj;
int j;
if (argc <= REDIS_STATIC_ARGS) {
outv = static_outv;
} else {
outv = zmalloc(sizeof(robj*)*(argc*3+1));
}
lenobj = createObject(REDIS_STRING,
sdscatprintf(sdsempty(), "*%d\r\n", argc));
lenobj->refcount = 0;
outv[outc++] = lenobj;
for (j = 0; j < argc; j++) {
lenobj = createObject(REDIS_STRING,
sdscatprintf(sdsempty(),"$%lu\r\n",
(unsigned long) stringObjectLen(argv[j])));
lenobj->refcount = 0;
outv[outc++] = lenobj;
outv[outc++] = argv[j];
outv[outc++] = shared.crlf;
}
/* Increment all the refcounts at start and decrement at end in order to
* be sure to free objects if there is no slave in a replication state
* able to be feed with commands */
for (j = 0; j < outc; j++) incrRefCount(outv[j]);
listRewind(slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
@ -49,7 +19,9 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
/* Don't feed slaves that are still waiting for BGSAVE to start */
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
/* Feed all the other slaves, MONITORs and so on */
/* Feed slaves that are waiting for the initial SYNC (so these commands
* are queued in the output buffer until the intial SYNC completes),
* or are already in sync with the master. */
if (slave->slaveseldb != dictid) {
robj *selectcmd;
@ -73,10 +45,9 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
addReply(slave,selectcmd);
slave->slaveseldb = dictid;
}
for (j = 0; j < outc; j++) addReply(slave,outv[j]);
addReplyMultiBulkLen(slave,argc);
for (j = 0; j < argc; j++) addReplyBulk(slave,argv[j]);
}
for (j = 0; j < outc; j++) decrRefCount(outv[j]);
if (outv != static_outv) zfree(outv);
}
void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc) {
@ -315,19 +286,18 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
/* If repl_transfer_left == -1 we still have to read the bulk length
* from the master reply. */
if (server.repl_transfer_left == -1) {
if (syncReadLine(fd,buf,1024,3600) == -1) {
if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout) == -1) {
redisLog(REDIS_WARNING,
"I/O error reading bulk count from MASTER: %s",
strerror(errno));
replicationAbortSyncTransfer();
return;
goto error;
}
if (buf[0] == '-') {
redisLog(REDIS_WARNING,
"MASTER aborted replication with an error: %s",
buf+1);
replicationAbortSyncTransfer();
return;
goto error;
} else if (buf[0] == '\0') {
/* At this stage just a newline works as a PING in order to take
* the connection live. So we refresh our last interaction
@ -336,8 +306,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
return;
} else if (buf[0] != '$') {
redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
replicationAbortSyncTransfer();
return;
goto error;
}
server.repl_transfer_left = strtol(buf+1,NULL,10);
redisLog(REDIS_NOTICE,
@ -359,8 +328,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
server.repl_transfer_lastio = time(NULL);
if (write(server.repl_transfer_fd,buf,nread) != nread) {
redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
replicationAbortSyncTransfer();
return;
goto error;
}
server.repl_transfer_left -= nread;
/* Check if the transfer is now complete */
@ -390,49 +358,58 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
server.master->authenticated = 1;
server.replstate = REDIS_REPL_CONNECTED;
redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
/* Rewrite the AOF file now that the dataset changed. */
if (server.appendonly) rewriteAppendOnlyFileBackground();
}
return;
error:
replicationAbortSyncTransfer();
return;
}
int syncWithMaster(void) {
char buf[1024], tmpfile[256], authcmd[1024];
int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
char buf[1024], tmpfile[256];
int dfd, maxtries = 5;
REDIS_NOTUSED(el);
REDIS_NOTUSED(privdata);
REDIS_NOTUSED(mask);
if (fd == -1) {
redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
strerror(errno));
return REDIS_ERR;
}
redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
/* This event should only be triggered once since it is used to have a
* non-blocking connect(2) to the master. It has been triggered when this
* function is called, so we can delete it. */
aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
/* AUTH with the master if required. */
if(server.masterauth) {
snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
close(fd);
char authcmd[1024];
size_t authlen;
authlen = snprintf(authcmd,sizeof(authcmd),"AUTH %s\r\n",server.masterauth);
if (syncWrite(fd,authcmd,authlen,server.repl_syncio_timeout) == -1) {
redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
strerror(errno));
return REDIS_ERR;
}
goto error;
}
/* Read the AUTH result. */
if (syncReadLine(fd,buf,1024,3600) == -1) {
close(fd);
if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout) == -1) {
redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
strerror(errno));
return REDIS_ERR;
goto error;
}
if (buf[0] != '+') {
close(fd);
redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
return REDIS_ERR;
goto error;
}
}
/* Issue the SYNC command */
if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
close(fd);
if (syncWrite(fd,"SYNC \r\n",7,server.repl_syncio_timeout) == -1) {
redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
strerror(errno));
return REDIS_ERR;
goto error;
}
/* Prepare a suitable temp file for bulk transfer */
@ -444,25 +421,51 @@ int syncWithMaster(void) {
sleep(1);
}
if (dfd == -1) {
close(fd);
redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
return REDIS_ERR;
goto error;
}
/* Setup the non blocking download of the bulk file. */
if (aeCreateFileEvent(server.el, fd, AE_READABLE, readSyncBulkPayload, NULL)
if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
== AE_ERR)
{
redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
goto error;
}
server.replstate = REDIS_REPL_TRANSFER;
server.repl_transfer_left = -1;
server.repl_transfer_fd = dfd;
server.repl_transfer_lastio = time(NULL);
server.repl_transfer_tmpfile = zstrdup(tmpfile);
return;
error:
server.replstate = REDIS_REPL_CONNECT;
close(fd);
return;
}
int connectWithMaster(void) {
int fd;
fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
if (fd == -1) {
redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
strerror(errno));
return REDIS_ERR;
}
if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
AE_ERR)
{
close(fd);
redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
return REDIS_ERR;
}
server.replstate = REDIS_REPL_TRANSFER;
server.repl_transfer_left = -1;
server.repl_transfer_s = fd;
server.repl_transfer_fd = dfd;
server.repl_transfer_lastio = time(NULL);
server.repl_transfer_tmpfile = zstrdup(tmpfile);
server.replstate = REDIS_REPL_CONNECTING;
return REDIS_OK;
}
@ -517,9 +520,8 @@ void replicationCron(void) {
/* Check if we should connect to a MASTER */
if (server.replstate == REDIS_REPL_CONNECT) {
redisLog(REDIS_NOTICE,"Connecting to MASTER...");
if (syncWithMaster() == REDIS_OK) {
redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started: SYNC sent");
if (server.appendonly) rewriteAppendOnlyFileBackground();
if (connectWithMaster() == REDIS_OK) {
redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
}
}

View File

@ -249,8 +249,11 @@ void sremCommand(redisClient *c) {
for (j = 2; j < c->argc; j++) {
if (setTypeRemove(set,c->argv[j])) {
if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);
deleted++;
if (setTypeSize(set) == 0) {
dbDelete(c->db,c->argv[1]);
break;
}
}
}
if (deleted) {

View File

@ -810,11 +810,29 @@ void zaddGenericCommand(redisClient *c, int incr) {
robj *ele;
robj *zobj;
robj *curobj;
double score, curscore = 0.0;
double score = 0, *scores, curscore = 0.0;
int j, elements = (c->argc-2)/2;
int added = 0;
if (getDoubleFromObjectOrReply(c,c->argv[2],&score,NULL) != REDIS_OK)
if (c->argc % 2) {
addReply(c,shared.syntaxerr);
return;
}
/* Start parsing all the scores, we need to emit any syntax error
* before executing additions to the sorted set, as the command should
* either execute fully or nothing at all. */
scores = zmalloc(sizeof(double)*elements);
for (j = 0; j < elements; j++) {
if (getDoubleFromObjectOrReply(c,c->argv[2+j*2],&scores[j],NULL)
!= REDIS_OK)
{
zfree(scores);
return;
}
}
/* Lookup the key and create the sorted set if does not exist. */
zobj = lookupKeyWrite(c->db,key);
if (zobj == NULL) {
if (server.zset_max_ziplist_entries == 0 ||
@ -828,111 +846,105 @@ void zaddGenericCommand(redisClient *c, int incr) {
} else {
if (zobj->type != REDIS_ZSET) {
addReply(c,shared.wrongtypeerr);
zfree(scores);
return;
}
}
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *eptr;
for (j = 0; j < elements; j++) {
score = scores[j];
/* Prefer non-encoded element when dealing with ziplists. */
ele = c->argv[3];
if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
if (incr) {
score += curscore;
if (isnan(score)) {
addReplyError(c,nanerr);
/* Don't need to check if the sorted set is empty, because
* we know it has at least one element. */
return;
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *eptr;
/* Prefer non-encoded element when dealing with ziplists. */
ele = c->argv[3+j*2];
if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
if (incr) {
score += curscore;
if (isnan(score)) {
addReplyError(c,nanerr);
/* Don't need to check if the sorted set is empty
* because we know it has at least one element. */
zfree(scores);
return;
}
}
}
/* Remove and re-insert when score changed. */
if (score != curscore) {
zobj->ptr = zzlDelete(zobj->ptr,eptr);
/* Remove and re-insert when score changed. */
if (score != curscore) {
zobj->ptr = zzlDelete(zobj->ptr,eptr);
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
signalModifiedKey(c->db,key);
server.dirty++;
}
} else {
/* Optimize: check if the element is too large or the list
* becomes too long *before* executing zzlInsert. */
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
signalModifiedKey(c->db,key);
server.dirty++;
if (!incr) added++;
}
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplistNode *znode;
dictEntry *de;
if (incr) /* ZINCRBY */
addReplyDouble(c,score);
else /* ZADD */
addReply(c,shared.czero);
} else {
/* Optimize: check if the element is too large or the list becomes
* too long *before* executing zzlInsert. */
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
ele = c->argv[3+j*2] = tryObjectEncoding(c->argv[3+j*2]);
de = dictFind(zs->dict,ele);
if (de != NULL) {
curobj = dictGetEntryKey(de);
curscore = *(double*)dictGetEntryVal(de);
signalModifiedKey(c->db,key);
server.dirty++;
if (incr) /* ZINCRBY */
addReplyDouble(c,score);
else /* ZADD */
addReply(c,shared.cone);
}
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplistNode *znode;
dictEntry *de;
ele = c->argv[3] = tryObjectEncoding(c->argv[3]);
de = dictFind(zs->dict,ele);
if (de != NULL) {
curobj = dictGetEntryKey(de);
curscore = *(double*)dictGetEntryVal(de);
if (incr) {
score += curscore;
if (isnan(score)) {
addReplyError(c,nanerr);
/* Don't need to check if the sorted set is empty, because
* we know it has at least one element. */
return;
if (incr) {
score += curscore;
if (isnan(score)) {
addReplyError(c,nanerr);
/* Don't need to check if the sorted set is empty
* because we know it has at least one element. */
zfree(scores);
return;
}
}
}
/* Remove and re-insert when score changed. We can safely delete
* the key object from the skiplist, since the dictionary still has
* a reference to it. */
if (score != curscore) {
redisAssert(zslDelete(zs->zsl,curscore,curobj));
znode = zslInsert(zs->zsl,score,curobj);
incrRefCount(curobj); /* Re-inserted in skiplist. */
dictGetEntryVal(de) = &znode->score; /* Update score ptr. */
/* Remove and re-insert when score changed. We can safely
* delete the key object from the skiplist, since the
* dictionary still has a reference to it. */
if (score != curscore) {
redisAssert(zslDelete(zs->zsl,curscore,curobj));
znode = zslInsert(zs->zsl,score,curobj);
incrRefCount(curobj); /* Re-inserted in skiplist. */
dictGetEntryVal(de) = &znode->score; /* Update score ptr. */
signalModifiedKey(c->db,key);
server.dirty++;
}
} else {
znode = zslInsert(zs->zsl,score,ele);
incrRefCount(ele); /* Inserted in skiplist. */
redisAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
incrRefCount(ele); /* Added to dictionary. */
signalModifiedKey(c->db,key);
server.dirty++;
if (!incr) added++;
}
if (incr) /* ZINCRBY */
addReplyDouble(c,score);
else /* ZADD */
addReply(c,shared.czero);
} else {
znode = zslInsert(zs->zsl,score,ele);
incrRefCount(ele); /* Inserted in skiplist. */
redisAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
incrRefCount(ele); /* Added to dictionary. */
signalModifiedKey(c->db,key);
server.dirty++;
if (incr) /* ZINCRBY */
addReplyDouble(c,score);
else /* ZADD */
addReply(c,shared.cone);
redisPanic("Unknown sorted set encoding");
}
} else {
redisPanic("Unknown sorted set encoding");
}
zfree(scores);
if (incr) /* ZINCRBY */
addReplyDouble(c,score);
else /* ZADD */
addReplyLongLong(c,added);
}
void zaddCommand(redisClient *c) {
@ -945,8 +957,8 @@ void zincrbyCommand(redisClient *c) {
void zremCommand(redisClient *c) {
robj *key = c->argv[1];
robj *ele = c->argv[2];
robj *zobj;
int deleted = 0, j;
if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
checkType(c,zobj,REDIS_ZSET)) return;
@ -954,39 +966,48 @@ void zremCommand(redisClient *c) {
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *eptr;
if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) {
zobj->ptr = zzlDelete(zobj->ptr,eptr);
if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
} else {
addReply(c,shared.czero);
return;
for (j = 2; j < c->argc; j++) {
if ((eptr = zzlFind(zobj->ptr,c->argv[j],NULL)) != NULL) {
deleted++;
zobj->ptr = zzlDelete(zobj->ptr,eptr);
if (zzlLength(zobj->ptr) == 0) {
dbDelete(c->db,key);
break;
}
}
}
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
dictEntry *de;
double score;
de = dictFind(zs->dict,ele);
if (de != NULL) {
/* Delete from the skiplist */
score = *(double*)dictGetEntryVal(de);
redisAssert(zslDelete(zs->zsl,score,ele));
for (j = 2; j < c->argc; j++) {
de = dictFind(zs->dict,c->argv[j]);
if (de != NULL) {
deleted++;
/* Delete from the hash table */
dictDelete(zs->dict,ele);
if (htNeedsResize(zs->dict)) dictResize(zs->dict);
if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
} else {
addReply(c,shared.czero);
return;
/* Delete from the skiplist */
score = *(double*)dictGetEntryVal(de);
redisAssert(zslDelete(zs->zsl,score,c->argv[j]));
/* Delete from the hash table */
dictDelete(zs->dict,c->argv[j]);
if (htNeedsResize(zs->dict)) dictResize(zs->dict);
if (dictSize(zs->dict) == 0) {
dbDelete(c->db,key);
break;
}
}
}
} else {
redisPanic("Unknown sorted set encoding");
}
signalModifiedKey(c->db,key);
server.dirty++;
addReply(c,shared.cone);
if (deleted) {
signalModifiedKey(c->db,key);
server.dirty += deleted;
}
addReplyLongLong(c,deleted);
}
void zremrangebyscoreCommand(redisClient *c) {