Merge branch 'unstable' into unstable-zset

Conflicts:
	src/object.c
This commit is contained in:
Pieter Noordhuis
2011-04-06 16:15:01 +02:00
24 changed files with 2362 additions and 181 deletions

View File

@ -25,7 +25,7 @@ PREFIX= /usr/local
INSTALL_BIN= $(PREFIX)/bin
INSTALL= cp -p
OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o dscache.o pubsub.o multi.o debug.o sort.o intset.o syncio.o diskstore.o
OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o dscache.o pubsub.o multi.o debug.o sort.o intset.o syncio.o diskstore.o cluster.o crc16.o endian.o
BENCHOBJ = ae.o anet.o redis-benchmark.o sds.o adlist.o zmalloc.o
CLIOBJ = anet.o sds.o adlist.o redis-cli.o zmalloc.o release.o
CHECKDUMPOBJ = redis-check-dump.o lzf_c.o lzf_d.o
@ -105,6 +105,8 @@ t_zset.o: t_zset.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
util.o: util.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
cluster.o: redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
ziplist.o: ziplist.c zmalloc.h ziplist.h
zipmap.o: zipmap.c zmalloc.h
zmalloc.o: zmalloc.c config.h

1442
src/cluster.c Normal file

File diff suppressed because it is too large Load Diff

View File

@ -289,6 +289,10 @@ void loadServerConfig(char *filename) {
err = "Target command name already exists"; goto loaderr;
}
}
} else if (!strcasecmp(argv[0],"cluster-enabled") && argc == 2) {
if ((server.cluster_enabled = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
} else {
err = "Bad directive or wrong number of arguments"; goto loaderr;
}

View File

@ -21,7 +21,7 @@
#define redis_malloc_size(p) malloc_size(p)
#endif
/* define redis_fstat to fstat or fstat64() */
/* Tefine redis_fstat to fstat or fstat64() */
#if defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
#define redis_fstat fstat64
#define redis_stat stat64
@ -30,22 +30,22 @@
#define redis_stat stat
#endif
/* test for proc filesystem */
/* Test for proc filesystem */
#ifdef __linux__
#define HAVE_PROCFS 1
#endif
/* test for task_info() */
/* Test for task_info() */
#if defined(__APPLE__)
#define HAVE_TASKINFO 1
#endif
/* test for backtrace() */
/* Test for backtrace() */
#if defined(__APPLE__) || defined(__linux__)
#define HAVE_BACKTRACE 1
#endif
/* test for polling API */
/* Test for polling API */
#ifdef __linux__
#define HAVE_EPOLL 1
#endif
@ -54,11 +54,62 @@
#define HAVE_KQUEUE 1
#endif
/* define aof_fsync to fdatasync() in Linux and fsync() for all the rest */
/* Define aof_fsync to fdatasync() in Linux and fsync() for all the rest */
#ifdef __linux__
#define aof_fsync fdatasync
#else
#define aof_fsync fsync
#endif
/* Byte ordering detection */
#include <sys/types.h> /* This will likely define BYTE_ORDER */
#ifndef BYTE_ORDER
#if (BSD >= 199103)
# include <machine/endian.h>
#else
#if defined(linux) || defined(__linux__)
# include <endian.h>
#else
#define LITTLE_ENDIAN 1234 /* least-significant byte first (vax, pc) */
#define BIG_ENDIAN 4321 /* most-significant byte first (IBM, net) */
#define PDP_ENDIAN 3412 /* LSB first in word, MSW first in long (pdp)*/
#if defined(vax) || defined(ns32000) || defined(sun386) || defined(__i386__) || \
defined(MIPSEL) || defined(_MIPSEL) || defined(BIT_ZERO_ON_RIGHT) || \
defined(__alpha__) || defined(__alpha)
#define BYTE_ORDER LITTLE_ENDIAN
#endif
#if defined(sel) || defined(pyr) || defined(mc68000) || defined(sparc) || \
defined(is68k) || defined(tahoe) || defined(ibm032) || defined(ibm370) || \
defined(MIPSEB) || defined(_MIPSEB) || defined(_IBMR2) || defined(DGUX) ||\
defined(apollo) || defined(__convex__) || defined(_CRAY) || \
defined(__hppa) || defined(__hp9000) || \
defined(__hp9000s300) || defined(__hp9000s700) || \
defined (BIT_ZERO_ON_LEFT) || defined(m68k) || defined(__sparc)
#define BYTE_ORDER BIG_ENDIAN
#endif
#endif /* linux */
#endif /* BSD */
#endif /* BYTE_ORDER */
#if defined(__BYTE_ORDER) && !defined(BYTE_ORDER)
#if (__BYTE_ORDER == __LITTLE_ENDIAN)
#define BYTE_ORDER LITTLE_ENDIAN
#else
#define BYTE_ORDER BIG_ENDIAN
#endif
#endif
#if !defined(BYTE_ORDER) || \
(BYTE_ORDER != BIG_ENDIAN && BYTE_ORDER != LITTLE_ENDIAN)
/* you must determine what the correct bit order is for
* your compiler - the next line is an intentional error
* which will force your compiles to bomb until you fix
* the above macros.
*/
#error "Undefined or invalid BYTE_ORDER"
#endif
#endif

74
src/crc16.c Normal file
View File

@ -0,0 +1,74 @@
#include "redis.h"
/*
* Copyright 2001-2010 Georges Menie (www.menie.org)
* Copyright 2010 Salvatore Sanfilippo (adapted to Redis coding style)
* All rights reserved.
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of the University of California, Berkeley nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/* CRC16 implementation acording to CCITT standards */
static const uint16_t crc16tab[256]= {
0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7,
0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef,
0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6,
0x9339,0x8318,0xb37b,0xa35a,0xd3bd,0xc39c,0xf3ff,0xe3de,
0x2462,0x3443,0x0420,0x1401,0x64e6,0x74c7,0x44a4,0x5485,
0xa56a,0xb54b,0x8528,0x9509,0xe5ee,0xf5cf,0xc5ac,0xd58d,
0x3653,0x2672,0x1611,0x0630,0x76d7,0x66f6,0x5695,0x46b4,
0xb75b,0xa77a,0x9719,0x8738,0xf7df,0xe7fe,0xd79d,0xc7bc,
0x48c4,0x58e5,0x6886,0x78a7,0x0840,0x1861,0x2802,0x3823,
0xc9cc,0xd9ed,0xe98e,0xf9af,0x8948,0x9969,0xa90a,0xb92b,
0x5af5,0x4ad4,0x7ab7,0x6a96,0x1a71,0x0a50,0x3a33,0x2a12,
0xdbfd,0xcbdc,0xfbbf,0xeb9e,0x9b79,0x8b58,0xbb3b,0xab1a,
0x6ca6,0x7c87,0x4ce4,0x5cc5,0x2c22,0x3c03,0x0c60,0x1c41,
0xedae,0xfd8f,0xcdec,0xddcd,0xad2a,0xbd0b,0x8d68,0x9d49,
0x7e97,0x6eb6,0x5ed5,0x4ef4,0x3e13,0x2e32,0x1e51,0x0e70,
0xff9f,0xefbe,0xdfdd,0xcffc,0xbf1b,0xaf3a,0x9f59,0x8f78,
0x9188,0x81a9,0xb1ca,0xa1eb,0xd10c,0xc12d,0xf14e,0xe16f,
0x1080,0x00a1,0x30c2,0x20e3,0x5004,0x4025,0x7046,0x6067,
0x83b9,0x9398,0xa3fb,0xb3da,0xc33d,0xd31c,0xe37f,0xf35e,
0x02b1,0x1290,0x22f3,0x32d2,0x4235,0x5214,0x6277,0x7256,
0xb5ea,0xa5cb,0x95a8,0x8589,0xf56e,0xe54f,0xd52c,0xc50d,
0x34e2,0x24c3,0x14a0,0x0481,0x7466,0x6447,0x5424,0x4405,
0xa7db,0xb7fa,0x8799,0x97b8,0xe75f,0xf77e,0xc71d,0xd73c,
0x26d3,0x36f2,0x0691,0x16b0,0x6657,0x7676,0x4615,0x5634,
0xd94c,0xc96d,0xf90e,0xe92f,0x99c8,0x89e9,0xb98a,0xa9ab,
0x5844,0x4865,0x7806,0x6827,0x18c0,0x08e1,0x3882,0x28a3,
0xcb7d,0xdb5c,0xeb3f,0xfb1e,0x8bf9,0x9bd8,0xabbb,0xbb9a,
0x4a75,0x5a54,0x6a37,0x7a16,0x0af1,0x1ad0,0x2ab3,0x3a92,
0xfd2e,0xed0f,0xdd6c,0xcd4d,0xbdaa,0xad8b,0x9de8,0x8dc9,
0x7c26,0x6c07,0x5c64,0x4c45,0x3ca2,0x2c83,0x1ce0,0x0cc1,
0xef1f,0xff3e,0xcf5d,0xdf7c,0xaf9b,0xbfba,0x8fd9,0x9ff8,
0x6e17,0x7e36,0x4e55,0x5e74,0x2e93,0x3eb2,0x0ed1,0x1ef0
};
uint16_t crc16(const char *buf, int len) {
int counter;
uint16_t crc = 0;
for (counter = 0; counter < len; counter++)
crc = (crc<<8) ^ crc16tab[((crc>>8) ^ *buf++)&0x00FF];
return crc;
}

View File

@ -307,6 +307,10 @@ void existsCommand(redisClient *c) {
void selectCommand(redisClient *c) {
int id = atoi(c->argv[1]->ptr);
if (server.cluster_enabled) {
addReplyError(c,"SELECT is not allowed in cluster mode");
return;
}
if (selectDb(c,id) == REDIS_ERR) {
addReplyError(c,"invalid DB index");
} else {
@ -428,6 +432,11 @@ void moveCommand(redisClient *c) {
redisDb *src, *dst;
int srcid;
if (server.cluster_enabled) {
addReplyError(c,"MOVE is not allowed in cluster mode");
return;
}
/* Obtain source and target DB pointers */
src = c->db;
srcid = c->db->id;
@ -616,3 +625,76 @@ void persistCommand(redisClient *c) {
}
}
}
/* -----------------------------------------------------------------------------
* API to get key arguments from commands
* ---------------------------------------------------------------------------*/
int *getKeysUsingCommandTable(struct redisCommand *cmd,robj **argv, int argc, int *numkeys) {
int j, i = 0, last, *keys;
REDIS_NOTUSED(argv);
if (cmd->firstkey == 0) {
*numkeys = 0;
return NULL;
}
last = cmd->lastkey;
if (last < 0) last = argc+last;
keys = zmalloc(sizeof(int)*((last - cmd->firstkey)+1));
for (j = cmd->firstkey; j <= last; j += cmd->keystep) {
redisAssert(j < argc);
keys[i++] = j;
}
*numkeys = i;
return keys;
}
int *getKeysFromCommand(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags) {
if (cmd->getkeys_proc) {
return cmd->getkeys_proc(cmd,argv,argc,numkeys,flags);
} else {
return getKeysUsingCommandTable(cmd,argv,argc,numkeys);
}
}
void getKeysFreeResult(int *result) {
zfree(result);
}
int *noPreloadGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags) {
if (flags & REDIS_GETKEYS_PRELOAD) {
*numkeys = 0;
return NULL;
} else {
return getKeysUsingCommandTable(cmd,argv,argc,numkeys);
}
}
int *renameGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags) {
if (flags & REDIS_GETKEYS_PRELOAD) {
int *keys = zmalloc(sizeof(int));
*numkeys = 1;
keys[0] = 1;
return keys;
} else {
return getKeysUsingCommandTable(cmd,argv,argc,numkeys);
}
}
int *zunionInterGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags) {
int i, num, *keys;
REDIS_NOTUSED(cmd);
REDIS_NOTUSED(flags);
num = atoi(argv[2]->ptr);
/* Sanity check. Don't return any key if the command is going to
* reply with syntax error. */
if (num > (argc-3)) {
*numkeys = 0;
return NULL;
}
keys = zmalloc(sizeof(int)*num);
for (i = 0; i < num; i++) keys[i] = 3+i;
*numkeys = num;
return keys;
}

View File

@ -212,7 +212,7 @@ int cacheFreeOneEntry(void) {
}
}
if (best == NULL) {
/* Was not able to fix a single object... we should check if our
/* Not able to free a single object? we should check if our
* IO queues have stuff in queue, and try to consume the queue
* otherwise we'll use an infinite amount of memory if changes to
* the dataset are faster than I/O */
@ -240,13 +240,6 @@ int cacheFreeOneEntry(void) {
return REDIS_OK;
}
/* Return true if it's safe to swap out objects in a given moment.
* Basically we don't want to swap objects out while there is a BGSAVE
* or a BGAEOREWRITE running in backgroud. */
int dsCanTouchDiskStore(void) {
return (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1);
}
/* ==================== Disk store negative caching ========================
*
* When disk store is enabled, we need negative caching, that is, to remember
@ -390,11 +383,10 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
}
cacheScheduleIODelFlag(j->db,j->key,REDIS_IO_LOADINPROG);
handleClientsBlockedOnSwappedKey(j->db,j->key);
freeIOJob(j);
} else if (j->type == REDIS_IOJOB_SAVE) {
cacheScheduleIODelFlag(j->db,j->key,REDIS_IO_SAVEINPROG);
freeIOJob(j);
}
freeIOJob(j);
processed++;
if (privdata == NULL) cacheScheduleIOPushJobs(0);
if (processed == toprocess) return;
@ -595,8 +587,6 @@ void queueIOJob(iojob *j) {
redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",
(void*)j, j->type, (char*)j->key->ptr);
listAddNodeTail(server.io_newjobs,j);
if (server.io_active_threads < server.vm_max_threads)
spawnIOThread();
}
/* Consume all the IO scheduled operations, and all the thread IO jobs
@ -900,64 +890,19 @@ int waitForSwappedKey(redisClient *c, robj *key) {
listAddNodeTail(l,c);
/* Are we already loading the key from disk? If not create a job */
if (de == NULL)
cacheScheduleIO(c->db,key,REDIS_IO_LOAD);
if (de == NULL) {
int flags = cacheScheduleIOGetFlags(c->db,key);
/* It is possible that even if there are no clients waiting for
* a load operation, still we have a load operation in progress.
* For instance think to a client performing a GET and then
* closing the connection */
if ((flags & (REDIS_IO_LOAD|REDIS_IO_LOADINPROG)) == 0)
cacheScheduleIO(c->db,key,REDIS_IO_LOAD);
}
return 1;
}
/* Preload keys for any command with first, last and step values for
* the command keys prototype, as defined in the command table. */
void waitForMultipleSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv) {
int j, last;
if (cmd->vm_firstkey == 0) return;
last = cmd->vm_lastkey;
if (last < 0) last = argc+last;
for (j = cmd->vm_firstkey; j <= last; j += cmd->vm_keystep) {
redisAssert(j < argc);
waitForSwappedKey(c,argv[j]);
}
}
/* Preload keys needed for the ZUNIONSTORE and ZINTERSTORE commands.
* Note that the number of keys to preload is user-defined, so we need to
* apply a sanity check against argc. */
void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv) {
int i, num;
REDIS_NOTUSED(cmd);
num = atoi(argv[2]->ptr);
if (num > (argc-3)) return;
for (i = 0; i < num; i++) {
waitForSwappedKey(c,argv[3+i]);
}
}
/* Preload keys needed to execute the entire MULTI/EXEC block.
*
* This function is called by blockClientOnSwappedKeys when EXEC is issued,
* and will block the client when any command requires a swapped out value. */
void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv) {
int i, margc;
struct redisCommand *mcmd;
robj **margv;
REDIS_NOTUSED(cmd);
REDIS_NOTUSED(argc);
REDIS_NOTUSED(argv);
if (!(c->flags & REDIS_MULTI)) return;
for (i = 0; i < c->mstate.count; i++) {
mcmd = c->mstate.commands[i].cmd;
margc = c->mstate.commands[i].argc;
margv = c->mstate.commands[i].argv;
if (mcmd->vm_preload_proc != NULL) {
mcmd->vm_preload_proc(c,mcmd,margc,margv);
} else {
waitForMultipleSwappedKeys(c,mcmd,margc,margv);
}
}
}
/* Is this client attempting to run a command against swapped keys?
* If so, block it ASAP, load the keys in background, then resume it.
*
@ -969,10 +914,39 @@ void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int
* Return 1 if the client is marked as blocked, 0 if the client can
* continue as the keys it is going to access appear to be in memory. */
int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd) {
if (cmd->vm_preload_proc != NULL) {
cmd->vm_preload_proc(c,cmd,c->argc,c->argv);
int *keyindex, numkeys, j, i;
/* EXEC is a special case, we need to preload all the commands
* queued into the transaction */
if (cmd->proc == execCommand) {
struct redisCommand *mcmd;
robj **margv;
int margc;
if (!(c->flags & REDIS_MULTI)) return 0;
for (i = 0; i < c->mstate.count; i++) {
mcmd = c->mstate.commands[i].cmd;
margc = c->mstate.commands[i].argc;
margv = c->mstate.commands[i].argv;
keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys,
REDIS_GETKEYS_PRELOAD);
for (j = 0; j < numkeys; j++) {
redisLog(REDIS_DEBUG,"Preloading %s",
(char*)margv[keyindex[j]]->ptr);
waitForSwappedKey(c,margv[keyindex[j]]);
}
getKeysFreeResult(keyindex);
}
} else {
waitForMultipleSwappedKeys(c,cmd,c->argc,c->argv);
keyindex = getKeysFromCommand(cmd,c->argv,c->argc,&numkeys,
REDIS_GETKEYS_PRELOAD);
for (j = 0; j < numkeys; j++) {
redisLog(REDIS_DEBUG,"Preloading %s",
(char*)c->argv[keyindex[j]]->ptr);
waitForSwappedKey(c,c->argv[keyindex[j]]);
}
getKeysFreeResult(keyindex);
}
/* If the client was blocked for at least one key, mark it as blocked. */

63
src/endian.c Normal file
View File

@ -0,0 +1,63 @@
/* Toggle the 16 bit unsigned integer pointed by *p from little endian to
* big endian */
void memrev16(void *p) {
unsigned char *x = p, t;
t = x[0];
x[0] = x[1];
x[1] = t;
}
/* Toggle the 32 bit unsigned integer pointed by *p from little endian to
* big endian */
void memrev32(void *p) {
unsigned char *x = p, t;
t = x[0];
x[0] = x[3];
x[3] = t;
t = x[1];
x[1] = x[2];
x[2] = t;
}
/* Toggle the 64 bit unsigned integer pointed by *p from little endian to
* big endian */
void memrev64(void *p) {
unsigned char *x = p, t;
t = x[0];
x[0] = x[7];
x[7] = t;
t = x[1];
x[1] = x[6];
x[6] = t;
t = x[2];
x[2] = x[5];
x[5] = t;
t = x[3];
x[3] = x[4];
x[4] = t;
}
#ifdef TESTMAIN
#include <stdio.h>
int main(void) {
char buf[32];
sprintf(buf,"ciaoroma");
memrev16(buf);
printf("%s\n", buf);
sprintf(buf,"ciaoroma");
memrev32(buf);
printf("%s\n", buf);
sprintf(buf,"ciaoroma");
memrev64(buf);
printf("%s\n", buf);
return 0;
}
#endif

20
src/endian.h Normal file
View File

@ -0,0 +1,20 @@
#ifndef __ENDIAN_H
#define __ENDIAN_H
void memrev16(void *p);
void memrev32(void *p);
void memrev64(void *p);
/* variants of the function doing the actual convertion only if the target
* host is big endian */
#if (BYTE_ORDER == LITTLE_ENDIAN)
#define memrev16ifbe(p)
#define memrev32ifbe(p)
#define memrev64ifbe(p)
#else
#define memrev16ifbe(p) memrev16(p)
#define memrev32ifbe(p) memrev32(p)
#define memrev64ifbe(p) memrev64(p)
#endif
#endif

View File

@ -3,6 +3,7 @@
#include <string.h>
#include "intset.h"
#include "zmalloc.h"
#include "endian.h"
/* Note that these encodings are ordered, so:
* INTSET_ENC_INT16 < INTSET_ENC_INT32 < INTSET_ENC_INT64. */
@ -16,16 +17,29 @@ static uint8_t _intsetValueEncoding(int64_t v) {
return INTSET_ENC_INT64;
else if (v < INT16_MIN || v > INT16_MAX)
return INTSET_ENC_INT32;
return INTSET_ENC_INT16;
else
return INTSET_ENC_INT16;
}
/* Return the value at pos, given an encoding. */
static int64_t _intsetGetEncoded(intset *is, int pos, uint8_t enc) {
if (enc == INTSET_ENC_INT64)
return ((int64_t*)is->contents)[pos];
else if (enc == INTSET_ENC_INT32)
return ((int32_t*)is->contents)[pos];
return ((int16_t*)is->contents)[pos];
int64_t v64;
int32_t v32;
int16_t v16;
if (enc == INTSET_ENC_INT64) {
memcpy(&v64,((int64_t*)is->contents)+pos,sizeof(v64));
memrev64ifbe(&v64);
return v64;
} else if (enc == INTSET_ENC_INT32) {
memcpy(&v32,((int32_t*)is->contents)+pos,sizeof(v32));
memrev32ifbe(&v32);
return v32;
} else {
memcpy(&v16,((int16_t*)is->contents)+pos,sizeof(v16));
memrev16ifbe(&v16);
return v16;
}
}
/* Return the value at pos, using the configured encoding. */
@ -35,12 +49,16 @@ static int64_t _intsetGet(intset *is, int pos) {
/* Set the value at pos, using the configured encoding. */
static void _intsetSet(intset *is, int pos, int64_t value) {
if (is->encoding == INTSET_ENC_INT64)
if (is->encoding == INTSET_ENC_INT64) {
((int64_t*)is->contents)[pos] = value;
else if (is->encoding == INTSET_ENC_INT32)
memrev64ifbe(((int64_t*)is->contents)+pos);
} else if (is->encoding == INTSET_ENC_INT32) {
((int32_t*)is->contents)[pos] = value;
else
memrev32ifbe(((int32_t*)is->contents)+pos);
} else {
((int16_t*)is->contents)[pos] = value;
memrev16ifbe(((int16_t*)is->contents)+pos);
}
}
/* Create an empty intset. */

View File

@ -60,9 +60,6 @@ redisClient *createClient(int fd) {
/* Set the event loop to listen for write events on the client's socket.
* Typically gets called every time a reply is built. */
int _installWriteEvent(redisClient *c) {
/* When CLOSE_AFTER_REPLY is set, no more replies may be added! */
redisAssert(!(c->flags & REDIS_CLOSE_AFTER_REPLY));
if (c->fd <= 0) return REDIS_ERR;
if (c->bufpos == 0 && listLength(c->reply) == 0 &&
(c->replstate == REDIS_REPL_NONE ||
@ -88,9 +85,15 @@ robj *dupLastObjectIfNeeded(list *reply) {
return listNodeValue(ln);
}
/* -----------------------------------------------------------------------------
* Low level functions to add more data to output buffers.
* -------------------------------------------------------------------------- */
int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
size_t available = sizeof(c->buf)-c->bufpos;
if (c->flags & REDIS_CLOSE_AFTER_REPLY) return REDIS_OK;
/* If there already are entries in the reply list, we cannot
* add anything more to the static buffer. */
if (listLength(c->reply) > 0) return REDIS_ERR;
@ -105,6 +108,9 @@ int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
void _addReplyObjectToList(redisClient *c, robj *o) {
robj *tail;
if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
if (listLength(c->reply) == 0) {
incrRefCount(o);
listAddNodeTail(c->reply,o);
@ -128,6 +134,12 @@ void _addReplyObjectToList(redisClient *c, robj *o) {
* needed it will be free'd, otherwise it ends up in a robj. */
void _addReplySdsToList(redisClient *c, sds s) {
robj *tail;
if (c->flags & REDIS_CLOSE_AFTER_REPLY) {
sdsfree(s);
return;
}
if (listLength(c->reply) == 0) {
listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
} else {
@ -148,6 +160,9 @@ void _addReplySdsToList(redisClient *c, sds s) {
void _addReplyStringToList(redisClient *c, char *s, size_t len) {
robj *tail;
if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
if (listLength(c->reply) == 0) {
listAddNodeTail(c->reply,createStringObject(s,len));
} else {
@ -165,6 +180,11 @@ void _addReplyStringToList(redisClient *c, char *s, size_t len) {
}
}
/* -----------------------------------------------------------------------------
* Higher level functions to queue data on the client output buffer.
* The following functions are the ones that commands implementations will call.
* -------------------------------------------------------------------------- */
void addReply(redisClient *c, robj *obj) {
if (_installWriteEvent(c) != REDIS_OK) return;

View File

@ -93,9 +93,13 @@ robj *createHashObject(void) {
robj *createZsetObject(void) {
zset *zs = zmalloc(sizeof(*zs));
robj *o;
zs->dict = dictCreate(&zsetDictType,NULL);
zs->zsl = zslCreate();
return createObject(REDIS_ZSET,zs);
o = createObject(REDIS_ZSET,zs);
o->encoding = REDIS_ENCODING_SKIPLIST;
return o;
}
robj *createZsetZiplistObject(void) {
@ -417,6 +421,7 @@ char *strEncoding(int encoding) {
case REDIS_ENCODING_LINKEDLIST: return "linkedlist";
case REDIS_ENCODING_ZIPLIST: return "ziplist";
case REDIS_ENCODING_INTSET: return "intset";
case REDIS_ENCODING_SKIPLIST: return "skiplist";
default: return "unknown";
}
}
@ -431,3 +436,42 @@ unsigned long estimateObjectIdleTime(robj *o) {
REDIS_LRU_CLOCK_RESOLUTION;
}
}
/* This is an helper function for the DEBUG command. We need to lookup keys
* without any modification of LRU or other parameters. */
robj *objectCommandLookup(redisClient *c, robj *key) {
dictEntry *de;
if ((de = dictFind(c->db->dict,key->ptr)) == NULL) return NULL;
return (robj*) dictGetEntryVal(de);
}
robj *objectCommandLookupOrReply(redisClient *c, robj *key, robj *reply) {
robj *o = objectCommandLookup(c,key);
if (!o) addReply(c, reply);
return o;
}
/* Object command allows to inspect the internals of an Redis Object.
* Usage: OBJECT <verb> ... arguments ... */
void objectCommand(redisClient *c) {
robj *o;
if (!strcasecmp(c->argv[1]->ptr,"refcount") && c->argc == 3) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
== NULL) return;
addReplyLongLong(c,o->refcount);
} else if (!strcasecmp(c->argv[1]->ptr,"encoding") && c->argc == 3) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
== NULL) return;
addReplyBulkCString(c,strEncoding(o->encoding));
} else if (!strcasecmp(c->argv[1]->ptr,"idletime") && c->argc == 3) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
== NULL) return;
addReplyLongLong(c,estimateObjectIdleTime(o));
} else {
addReplyError(c,"Syntax error. Try OBJECT (refcount|encoding|idletime)");
}
}

View File

@ -245,7 +245,7 @@ int rdbSaveDoubleValue(FILE *fp, double val) {
return rdbWriteRaw(fp,buf,len);
}
/* Save a Redis object. */
/* Save a Redis object. Returns -1 on error, 0 on success. */
int rdbSaveObject(FILE *fp, robj *o) {
int n, nwritten = 0;

View File

@ -394,15 +394,18 @@ static sds cliFormatReplyRaw(redisReply *r) {
switch (r->type) {
case REDIS_REPLY_NIL:
/* Nothing... */
break;
break;
case REDIS_REPLY_ERROR:
out = sdscatlen(out,r->str,r->len);
out = sdscatlen(out,"\n",1);
break;
case REDIS_REPLY_STATUS:
case REDIS_REPLY_STRING:
out = sdscatlen(out,r->str,r->len);
break;
break;
case REDIS_REPLY_INTEGER:
out = sdscatprintf(out,"%lld",r->integer);
break;
break;
case REDIS_REPLY_ARRAY:
for (i = 0; i < r->elements; i++) {
if (i > 0) out = sdscat(out,config.mb_delim);
@ -410,7 +413,7 @@ static sds cliFormatReplyRaw(redisReply *r) {
out = sdscatlen(out,tmp,sdslen(tmp));
sdsfree(tmp);
}
break;
break;
default:
fprintf(stderr,"Unknown reply type: %d\n", r->type);
exit(1);
@ -464,7 +467,15 @@ static int cliSendCommand(int argc, char **argv, int repeat) {
return REDIS_OK;
}
output_raw = !strcasecmp(command,"info");
output_raw = 0;
if (!strcasecmp(command,"info") ||
(argc == 2 && !strcasecmp(command,"cluster") &&
(!strcasecmp(argv[1],"nodes") ||
!strcasecmp(argv[1],"info"))))
{
output_raw = 1;
}
if (!strcasecmp(command,"help") || !strcasecmp(command,"?")) {
cliOutputHelp(--argc, ++argv);
return REDIS_OK;

View File

@ -70,12 +70,12 @@ struct redisServer server; /* server global state */
struct redisCommand *commandTable;
struct redisCommand redisCommandTable[] = {
{"get",getCommand,2,0,NULL,1,1,1,0,0},
{"set",setCommand,3,REDIS_CMD_DENYOOM,NULL,0,0,0,0,0},
{"setnx",setnxCommand,3,REDIS_CMD_DENYOOM,NULL,0,0,0,0,0},
{"setex",setexCommand,4,REDIS_CMD_DENYOOM,NULL,0,0,0,0,0},
{"set",setCommand,3,REDIS_CMD_DENYOOM,noPreloadGetKeys,1,1,1,0,0},
{"setnx",setnxCommand,3,REDIS_CMD_DENYOOM,noPreloadGetKeys,1,1,1,0,0},
{"setex",setexCommand,4,REDIS_CMD_DENYOOM,noPreloadGetKeys,2,2,1,0,0},
{"append",appendCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1,0,0},
{"strlen",strlenCommand,2,0,NULL,1,1,1,0,0},
{"del",delCommand,-2,0,NULL,0,0,0,0,0},
{"del",delCommand,-2,0,noPreloadGetKeys,1,-1,1,0,0},
{"exists",existsCommand,2,0,NULL,1,1,1,0,0},
{"setbit",setbitCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1,0,0},
{"getbit",getbitCommand,3,0,NULL,1,1,1,0,0},
@ -94,7 +94,7 @@ struct redisCommand redisCommandTable[] = {
{"lpop",lpopCommand,2,0,NULL,1,1,1,0,0},
{"brpop",brpopCommand,-3,0,NULL,1,1,1,0,0},
{"brpoplpush",brpoplpushCommand,4,REDIS_CMD_DENYOOM,NULL,1,2,1,0,0},
{"blpop",blpopCommand,-3,0,NULL,1,1,1,0,0},
{"blpop",blpopCommand,-3,0,NULL,1,-2,1,0,0},
{"llen",llenCommand,2,0,NULL,1,1,1,0,0},
{"lindex",lindexCommand,3,0,NULL,1,1,1,0,0},
{"lset",lsetCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1,0,0},
@ -121,8 +121,8 @@ struct redisCommand redisCommandTable[] = {
{"zrem",zremCommand,3,0,NULL,1,1,1,0,0},
{"zremrangebyscore",zremrangebyscoreCommand,4,0,NULL,1,1,1,0,0},
{"zremrangebyrank",zremrangebyrankCommand,4,0,NULL,1,1,1,0,0},
{"zunionstore",zunionstoreCommand,-4,REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0,0,0},
{"zinterstore",zinterstoreCommand,-4,REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0,0,0},
{"zunionstore",zunionstoreCommand,-4,REDIS_CMD_DENYOOM,zunionInterGetKeys,0,0,0,0,0},
{"zinterstore",zinterstoreCommand,-4,REDIS_CMD_DENYOOM,zunionInterGetKeys,0,0,0,0,0},
{"zrange",zrangeCommand,-4,0,NULL,1,1,1,0,0},
{"zrangebyscore",zrangebyscoreCommand,-4,0,NULL,1,1,1,0,0},
{"zrevrangebyscore",zrevrangebyscoreCommand,-4,0,NULL,1,1,1,0,0},
@ -152,10 +152,10 @@ struct redisCommand redisCommandTable[] = {
{"randomkey",randomkeyCommand,1,0,NULL,0,0,0,0,0},
{"select",selectCommand,2,0,NULL,0,0,0,0,0},
{"move",moveCommand,3,0,NULL,1,1,1,0,0},
{"rename",renameCommand,3,0,NULL,1,1,1,0,0},
{"renamenx",renamenxCommand,3,0,NULL,1,1,1,0,0},
{"expire",expireCommand,3,0,NULL,0,0,0,0,0},
{"expireat",expireatCommand,3,0,NULL,0,0,0,0,0},
{"rename",renameCommand,3,0,renameGetKeys,1,2,1,0,0},
{"renamenx",renamenxCommand,3,0,renameGetKeys,1,2,1,0,0},
{"expire",expireCommand,3,0,NULL,1,1,1,0,0},
{"expireat",expireatCommand,3,0,NULL,1,1,1,0,0},
{"keys",keysCommand,2,0,NULL,0,0,0,0,0},
{"dbsize",dbsizeCommand,1,0,NULL,0,0,0,0,0},
{"auth",authCommand,2,0,NULL,0,0,0,0,0},
@ -168,7 +168,7 @@ struct redisCommand redisCommandTable[] = {
{"lastsave",lastsaveCommand,1,0,NULL,0,0,0,0,0},
{"type",typeCommand,2,0,NULL,1,1,1,0,0},
{"multi",multiCommand,1,0,NULL,0,0,0,0,0},
{"exec",execCommand,1,REDIS_CMD_DENYOOM,execBlockClientOnSwappedKeys,0,0,0,0,0},
{"exec",execCommand,1,REDIS_CMD_DENYOOM,NULL,0,0,0,0,0},
{"discard",discardCommand,1,0,NULL,0,0,0,0,0},
{"sync",syncCommand,1,0,NULL,0,0,0,0,0},
{"flushdb",flushdbCommand,1,0,NULL,0,0,0,0,0},
@ -186,8 +186,13 @@ struct redisCommand redisCommandTable[] = {
{"psubscribe",psubscribeCommand,-2,0,NULL,0,0,0,0,0},
{"punsubscribe",punsubscribeCommand,-1,0,NULL,0,0,0,0,0},
{"publish",publishCommand,3,REDIS_CMD_FORCE_REPLICATION,NULL,0,0,0,0,0},
{"watch",watchCommand,-2,0,NULL,0,0,0,0,0},
{"unwatch",unwatchCommand,1,0,NULL,0,0,0,0,0}
{"watch",watchCommand,-2,0,noPreloadGetKeys,1,-1,1,0,0},
{"unwatch",unwatchCommand,1,0,NULL,0,0,0,0,0},
{"cluster",clusterCommand,-2,0,NULL,0,0,0,0,0},
{"restore",restoreCommand,4,0,NULL,0,0,0,0,0},
{"migrate",migrateCommand,6,0,NULL,0,0,0,0,0},
{"dump",dumpCommand,2,0,NULL,0,0,0,0,0},
{"object",objectCommand,-2,0,NULL,0,0,0,0,0}
};
/*============================ Utility functions ============================ */
@ -440,6 +445,17 @@ dictType keylistDictType = {
dictListDestructor /* val destructor */
};
/* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to
* clusterNode structures. */
dictType clusterNodesDictType = {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
dictSdsDestructor, /* key destructor */
NULL /* val destructor */
};
int htNeedsResize(dict *dict) {
long long size, used;
@ -669,6 +685,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
* to detect transfer failures. */
if (!(loops % 10)) replicationCron();
/* Run other sub-systems specific cron jobs */
if (server.cluster_enabled && !(loops % 10)) clusterCron();
server.cronloops++;
return 100;
}
@ -825,6 +844,8 @@ void initServerConfig() {
server.zset_max_ziplist_value = REDIS_ZSET_MAX_ZIPLIST_VALUE;
server.shutdown_asap = 0;
server.cache_flush_delay = 0;
server.cluster_enabled = 0;
server.cluster.configfile = zstrdup("nodes.conf");
updateLRUClock();
resetServerSaveParams();
@ -947,6 +968,7 @@ void initServer() {
}
if (server.ds_enabled) dsInit();
if (server.cluster_enabled) clusterInit();
srand(time(NULL)^getpid());
}
@ -1053,6 +1075,27 @@ int processCommand(redisClient *c) {
return REDIS_OK;
}
/* If cluster is enabled, redirect here */
if (server.cluster_enabled &&
!(cmd->getkeys_proc == NULL && cmd->firstkey == 0)) {
int hashslot;
if (server.cluster.state != REDIS_CLUSTER_OK) {
addReplyError(c,"The cluster is down. Check with CLUSTER INFO for more information");
return REDIS_OK;
} else {
clusterNode *n = getNodeByQuery(c,cmd,c->argv,c->argc,&hashslot);
if (n == NULL) {
addReplyError(c,"Invalid cross-node request");
return REDIS_OK;
} else if (n != server.cluster.myself) {
addReplySds(c,sdscatprintf(sdsempty(),
"-MOVED %d %s:%d\r\n",hashslot,n->ip,n->port));
return REDIS_OK;
}
}
}
/* Handle the maxmemory directive.
*
* First we try to free some memory if possible (if there are volatile

View File

@ -18,6 +18,7 @@
#include <inttypes.h>
#include <pthread.h>
#include <syslog.h>
#include <netinet/in.h>
#include "ae.h" /* Event driven programming library */
#include "sds.h" /* Dynamic safe strings */
@ -87,6 +88,7 @@
#define REDIS_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */
#define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define REDIS_ENCODING_INTSET 6 /* Encoded as intset */
#define REDIS_ENCODING_SKIPLIST 7 /* Encoded as skiplist */
/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
@ -364,7 +366,124 @@ struct sharedObjectsStruct {
*integers[REDIS_SHARED_INTEGERS];
};
/* Global server state structure */
/*-----------------------------------------------------------------------------
* Redis cluster data structures
*----------------------------------------------------------------------------*/
#define REDIS_CLUSTER_SLOTS 4096
#define REDIS_CLUSTER_OK 0 /* Everything looks ok */
#define REDIS_CLUSTER_FAIL 1 /* The cluster can't work */
#define REDIS_CLUSTER_NEEDHELP 2 /* The cluster works, but needs some help */
#define REDIS_CLUSTER_NAMELEN 40 /* sha1 hex length */
#define REDIS_CLUSTER_PORT_INCR 10000 /* Cluster port = baseport + PORT_INCR */
struct clusterNode;
/* clusterLink encapsulates everything needed to talk with a remote node. */
typedef struct clusterLink {
int fd; /* TCP socket file descriptor */
sds sndbuf; /* Packet send buffer */
sds rcvbuf; /* Packet reception buffer */
struct clusterNode *node; /* Node related to this link if any, or NULL */
} clusterLink;
/* Node flags */
#define REDIS_NODE_MASTER 1 /* The node is a master */
#define REDIS_NODE_SLAVE 2 /* The node is a slave */
#define REDIS_NODE_PFAIL 4 /* Failure? Need acknowledge */
#define REDIS_NODE_FAIL 8 /* The node is believed to be malfunctioning */
#define REDIS_NODE_MYSELF 16 /* This node is myself */
#define REDIS_NODE_HANDSHAKE 32 /* We have still to exchange the first ping */
#define REDIS_NODE_NOADDR 64 /* We don't know the address of this node */
#define REDIS_NODE_MEET 128 /* Send a MEET message to this node */
#define REDIS_NODE_NULL_NAME "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"
struct clusterNode {
char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
int flags; /* REDIS_NODE_... */
unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */
int numslaves; /* Number of slave nodes, if this is a master */
struct clusterNode **slaves; /* pointers to slave nodes */
struct clusterNode *slaveof; /* pointer to the master node */
time_t ping_sent; /* Unix time we sent latest ping */
time_t pong_received; /* Unix time we received the pong */
char *configdigest; /* Configuration digest of this node */
time_t configdigest_ts; /* Configuration digest timestamp */
char ip[16]; /* Latest known IP address of this node */
int port; /* Latest known port of this node */
clusterLink *link; /* TCP/IP link with this node */
};
typedef struct clusterNode clusterNode;
typedef struct {
char *configfile;
clusterNode *myself; /* This node */
int state; /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */
int node_timeout;
dict *nodes; /* Hash table of name -> clusterNode structures */
clusterNode *migrating_slots_to[REDIS_CLUSTER_SLOTS];
clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS];
clusterNode *slots[REDIS_CLUSTER_SLOTS];
} clusterState;
/* Redis cluster messages header */
/* Note that the PING, PONG and MEET messages are actually the same exact
* kind of packet. PONG is the reply to ping, in the extact format as a PING,
* while MEET is a special PING that forces the receiver to add the sender
* as a node (if it is not already in the list). */
#define CLUSTERMSG_TYPE_PING 0 /* Ping */
#define CLUSTERMSG_TYPE_PONG 1 /* Pong (reply to Ping) */
#define CLUSTERMSG_TYPE_MEET 2 /* Meet "let's join" message */
#define CLUSTERMSG_TYPE_FAIL 3 /* Mark node xxx as failing */
/* Initially we don't know our "name", but we'll find it once we connect
* to the first node, using the getsockname() function. Then we'll use this
* address for all the next messages. */
typedef struct {
char nodename[REDIS_CLUSTER_NAMELEN];
uint32_t ping_sent;
uint32_t pong_received;
char ip[16]; /* IP address last time it was seen */
uint16_t port; /* port last time it was seen */
uint16_t flags;
uint32_t notused; /* for 64 bit alignment */
} clusterMsgDataGossip;
typedef struct {
char nodename[REDIS_CLUSTER_NAMELEN];
} clusterMsgDataFail;
union clusterMsgData {
/* PING, MEET and PONG */
struct {
/* Array of N clusterMsgDataGossip structures */
clusterMsgDataGossip gossip[1];
} ping;
/* FAIL */
struct {
clusterMsgDataFail about;
} fail;
};
typedef struct {
uint32_t totlen; /* Total length of this message */
uint16_t type; /* Message type */
uint16_t count; /* Only used for some kind of messages. */
char sender[REDIS_CLUSTER_NAMELEN]; /* Name of the sender node */
unsigned char myslots[REDIS_CLUSTER_SLOTS/8];
char slaveof[REDIS_CLUSTER_NAMELEN];
char configdigest[32];
uint16_t port; /* Sender TCP base port */
unsigned char state; /* Cluster state from the POV of the sender */
unsigned char notused[5]; /* Reserved for future use. For alignment. */
union clusterMsgData data;
} clusterMsg;
/*-----------------------------------------------------------------------------
* Global server state
*----------------------------------------------------------------------------*/
struct redisServer {
/* General */
pthread_t mainthread;
@ -377,6 +496,7 @@ struct redisServer {
char *unixsocket;
int ipfd;
int sofd;
int cfd;
list *clients;
list *slaves, *monitors;
char neterr[ANET_ERR_LEN];
@ -505,6 +625,8 @@ struct redisServer {
/* Misc */
unsigned lruclock:22; /* clock incrementing every minute, for LRU */
unsigned lruclock_padding:10;
int cluster_enabled;
clusterState cluster;
};
typedef struct pubsubPattern {
@ -513,20 +635,19 @@ typedef struct pubsubPattern {
} pubsubPattern;
typedef void redisCommandProc(redisClient *c);
typedef void redisVmPreloadProc(redisClient *c, struct redisCommand *cmd, int argc, robj **argv);
typedef int *redisGetKeysProc(struct redisCommand *cmd, robj **argv, int argc, int *numkeys, int flags);
struct redisCommand {
char *name;
redisCommandProc *proc;
int arity;
int flags;
/* Use a function to determine which keys need to be loaded
* in the background prior to executing this command. Takes precedence
* over vm_firstkey and others, ignored when NULL */
redisVmPreloadProc *vm_preload_proc;
/* Use a function to determine keys arguments in a command line.
* Used both for diskstore preloading and Redis Cluster. */
redisGetKeysProc *getkeys_proc;
/* What keys should be loaded in background when calling this command? */
int vm_firstkey; /* The first argument that's a key (0 = no keys) */
int vm_lastkey; /* THe last argument that's a key */
int vm_keystep; /* The step between first and last key */
int firstkey; /* The first argument that's a key (0 = no keys) */
int lastkey; /* THe last argument that's a key */
int keystep; /* The step between first and last key */
long long microseconds, calls;
};
@ -640,6 +761,7 @@ extern struct redisServer server;
extern struct sharedObjectsStruct shared;
extern dictType setDictType;
extern dictType zsetDictType;
extern dictType clusterNodesDictType;
extern double R_Zero, R_PosInf, R_NegInf, R_Nan;
dictType hashDictType;
@ -755,6 +877,7 @@ int fwriteBulkString(FILE *fp, char *s, unsigned long len);
int fwriteBulkDouble(FILE *fp, double d);
int fwriteBulkLongLong(FILE *fp, long long l);
int fwriteBulkObject(FILE *fp, robj *obj);
int fwriteBulkCount(FILE *fp, char prefix, int count);
/* Replication */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
@ -842,8 +965,6 @@ void freeIOJob(iojob *j);
void queueIOJob(iojob *j);
void waitEmptyIOJobsQueue(void);
void processAllPendingIOJobs(void);
void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv);
void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv);
int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd);
int dontWaitForSwappedKey(redisClient *c, robj *key);
void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key);
@ -932,6 +1053,24 @@ int selectDb(redisClient *c, int id);
void signalModifiedKey(redisDb *db, robj *key);
void signalFlushedDb(int dbid);
/* API to get key arguments from commands */
#define REDIS_GETKEYS_ALL 0
#define REDIS_GETKEYS_PRELOAD 1
int *getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, int *numkeys, int flags);
void getKeysFreeResult(int *result);
int *noPreloadGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags);
int *renameGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags);
int *zunionInterGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags);
/* Cluster */
void clusterInit(void);
unsigned short crc16(const char *buf, int len);
unsigned int keyHashSlot(char *key, int keylen);
clusterNode *createClusterNode(char *nodename, int flags);
int clusterAddNode(clusterNode *node);
void clusterCron(void);
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot);
/* Git SHA1 */
char *redisGitSHA1(void);
char *redisGitDirty(void);
@ -1054,6 +1193,11 @@ void punsubscribeCommand(redisClient *c);
void publishCommand(redisClient *c);
void watchCommand(redisClient *c);
void unwatchCommand(redisClient *c);
void clusterCommand(redisClient *c);
void restoreCommand(redisClient *c);
void migrateCommand(redisClient *c);
void dumpCommand(redisClient *c);
void objectCommand(redisClient *c);
#if defined(__GNUC__)
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));

View File

@ -26,6 +26,12 @@
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* History:
*
* - 22 March 2011: History section created on top of sds.c
* - 22 March 2011: Fixed a problem with "\xab" escapes convertion in
* function sdssplitargs().
*/
#define SDS_ABORT_ON_OOM
@ -45,7 +51,11 @@ static void sdsOomAbort(void) {
sds sdsnewlen(const void *init, size_t initlen) {
struct sdshdr *sh;
sh = zmalloc(sizeof(struct sdshdr)+initlen+1);
if (init) {
sh = zmalloc(sizeof(struct sdshdr)+initlen+1);
} else {
sh = zcalloc(sizeof(struct sdshdr)+initlen+1);
}
#ifdef SDS_ABORT_ON_OOM
if (sh == NULL) sdsOomAbort();
#else
@ -53,10 +63,8 @@ sds sdsnewlen(const void *init, size_t initlen) {
#endif
sh->len = initlen;
sh->free = 0;
if (initlen) {
if (init) memcpy(sh->buf, init, initlen);
else memset(sh->buf,0,initlen);
}
if (initlen && init)
memcpy(sh->buf, init, initlen);
sh->buf[initlen] = '\0';
return (char*)sh->buf;
}
@ -399,11 +407,11 @@ sds sdscatrepr(sds s, char *p, size_t len) {
case '"':
s = sdscatprintf(s,"\\%c",*p);
break;
case '\n': s = sdscatlen(s,"\\n",1); break;
case '\r': s = sdscatlen(s,"\\r",1); break;
case '\t': s = sdscatlen(s,"\\t",1); break;
case '\a': s = sdscatlen(s,"\\a",1); break;
case '\b': s = sdscatlen(s,"\\b",1); break;
case '\n': s = sdscatlen(s,"\\n",2); break;
case '\r': s = sdscatlen(s,"\\r",2); break;
case '\t': s = sdscatlen(s,"\\t",2); break;
case '\a': s = sdscatlen(s,"\\a",2); break;
case '\b': s = sdscatlen(s,"\\b",2); break;
default:
if (isprint(*p))
s = sdscatprintf(s,"%c",*p);
@ -416,6 +424,37 @@ sds sdscatrepr(sds s, char *p, size_t len) {
return sdscatlen(s,"\"",1);
}
/* Helper function for sdssplitargs() that returns non zero if 'c'
* is a valid hex digit. */
int is_hex_digit(char c) {
return (c >= '0' && c <= '9') || (c >= 'a' && c <= 'f') ||
(c >= 'A' && c <= 'F');
}
/* Helper function for sdssplitargs() that converts an hex digit into an
* integer from 0 to 15 */
int hex_digit_to_int(char c) {
switch(c) {
case '0': return 0;
case '1': return 1;
case '2': return 2;
case '3': return 3;
case '4': return 4;
case '5': return 5;
case '6': return 6;
case '7': return 7;
case '8': return 8;
case '9': return 9;
case 'a': case 'A': return 10;
case 'b': case 'B': return 11;
case 'c': case 'C': return 12;
case 'd': case 'D': return 13;
case 'e': case 'E': return 14;
case 'f': case 'F': return 15;
default: return 0;
}
}
/* Split a line into arguments, where every argument can be in the
* following programming-language REPL-alike form:
*
@ -445,7 +484,17 @@ sds *sdssplitargs(char *line, int *argc) {
if (current == NULL) current = sdsempty();
while(!done) {
if (inq) {
if (*p == '\\' && *(p+1)) {
if (*p == '\\' && *(p+1) == 'x' &&
is_hex_digit(*(p+2)) &&
is_hex_digit(*(p+3)))
{
unsigned char byte;
byte = (hex_digit_to_int(*(p+2))*16)+
hex_digit_to_int(*(p+3));
current = sdscatlen(current,(char*)&byte,1);
p += 3;
} else if (*p == '\\' && *(p+1)) {
char c;
p++;

View File

@ -28,55 +28,7 @@ A million repetitions of "a"
#include "solarisfixes.h"
#endif
#include "sha1.h"
#ifndef BYTE_ORDER
#if (BSD >= 199103)
# include <machine/endian.h>
#else
#if defined(linux) || defined(__linux__)
# include <endian.h>
#else
#define LITTLE_ENDIAN 1234 /* least-significant byte first (vax, pc) */
#define BIG_ENDIAN 4321 /* most-significant byte first (IBM, net) */
#define PDP_ENDIAN 3412 /* LSB first in word, MSW first in long (pdp)*/
#if defined(vax) || defined(ns32000) || defined(sun386) || defined(__i386__) || \
defined(MIPSEL) || defined(_MIPSEL) || defined(BIT_ZERO_ON_RIGHT) || \
defined(__alpha__) || defined(__alpha)
#define BYTE_ORDER LITTLE_ENDIAN
#endif
#if defined(sel) || defined(pyr) || defined(mc68000) || defined(sparc) || \
defined(is68k) || defined(tahoe) || defined(ibm032) || defined(ibm370) || \
defined(MIPSEB) || defined(_MIPSEB) || defined(_IBMR2) || defined(DGUX) ||\
defined(apollo) || defined(__convex__) || defined(_CRAY) || \
defined(__hppa) || defined(__hp9000) || \
defined(__hp9000s300) || defined(__hp9000s700) || \
defined (BIT_ZERO_ON_LEFT) || defined(m68k) || defined(__sparc)
#define BYTE_ORDER BIG_ENDIAN
#endif
#endif /* linux */
#endif /* BSD */
#endif /* BYTE_ORDER */
#if defined(__BYTE_ORDER) && !defined(BYTE_ORDER)
#if (__BYTE_ORDER == __LITTLE_ENDIAN)
#define BYTE_ORDER LITTLE_ENDIAN
#else
#define BYTE_ORDER BIG_ENDIAN
#endif
#endif
#if !defined(BYTE_ORDER) || \
(BYTE_ORDER != BIG_ENDIAN && BYTE_ORDER != LITTLE_ENDIAN && \
BYTE_ORDER != PDP_ENDIAN)
/* you must determine what the correct bit order is for
* your compiler - the next line is an intentional error
* which will force your compiles to bomb until you fix
* the above macros.
*/
#error "Undefined or invalid BYTE_ORDER"
#endif
#include "config.h"
#define rol(value, bits) (((value) << (bits)) | ((value) >> (32 - (bits))))

View File

@ -107,6 +107,7 @@ int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
int fwriteBulkString(FILE *fp, char *s, unsigned long len) {
char cbuf[128];
int clen;
cbuf[0] = '$';
clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,len);
cbuf[clen++] = '\r';
@ -117,6 +118,19 @@ int fwriteBulkString(FILE *fp, char *s, unsigned long len) {
return 1;
}
/* Write a multi bulk count in the form "*<count>\r\n" */
int fwriteBulkCount(FILE *fp, char prefix, int count) {
char cbuf[128];
int clen;
cbuf[0] = prefix;
clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,count);
cbuf[clen++] = '\r';
cbuf[clen++] = '\n';
if (fwrite(cbuf,clen,1,fp) == 0) return 0;
return 1;
}
/* Write a double value in bulk format $<count>\r\n<payload>\r\n */
int fwriteBulkDouble(FILE *fp, double d) {
char buf[128], dbuf[128];

View File

@ -68,6 +68,7 @@
#include <limits.h>
#include "zmalloc.h"
#include "ziplist.h"
#include "endian.h"
int ll2string(char *s, size_t len, long long value);
@ -207,6 +208,7 @@ static unsigned int zipPrevDecodeLength(unsigned char *p, unsigned int *lensize)
} else {
if (lensize) *lensize = 1+sizeof(len);
memcpy(&len,p+1,sizeof(len));
memrev32ifbe(&len);
}
return len;
}
@ -223,6 +225,7 @@ static unsigned int zipPrevEncodeLength(unsigned char *p, unsigned int len) {
} else {
p[0] = ZIP_BIGLEN;
memcpy(p+1,&len,sizeof(len));
memrev32ifbe(p+1);
return 1+sizeof(len);
}
}
@ -234,6 +237,7 @@ static void zipPrevEncodeLengthForceLarge(unsigned char *p, unsigned int len) {
if (p == NULL) return;
p[0] = ZIP_BIGLEN;
memcpy(p+1,&len,sizeof(len));
memrev32ifbe(p+1);
}
/* Return the difference in number of bytes needed to store the new length
@ -287,12 +291,15 @@ static void zipSaveInteger(unsigned char *p, int64_t value, unsigned char encodi
if (encoding == ZIP_INT_16B) {
i16 = value;
memcpy(p,&i16,sizeof(i16));
memrev16ifbe(p);
} else if (encoding == ZIP_INT_32B) {
i32 = value;
memcpy(p,&i32,sizeof(i32));
memrev32ifbe(p);
} else if (encoding == ZIP_INT_64B) {
i64 = value;
memcpy(p,&i64,sizeof(i64));
memrev64ifbe(p);
} else {
assert(NULL);
}
@ -305,12 +312,15 @@ static int64_t zipLoadInteger(unsigned char *p, unsigned char encoding) {
int64_t i64, ret = 0;
if (encoding == ZIP_INT_16B) {
memcpy(&i16,p,sizeof(i16));
memrev16ifbe(&i16);
ret = i16;
} else if (encoding == ZIP_INT_32B) {
memcpy(&i32,p,sizeof(i32));
memrev16ifbe(&i32);
ret = i32;
} else if (encoding == ZIP_INT_64B) {
memcpy(&i64,p,sizeof(i64));
memrev16ifbe(&i64);
ret = i64;
} else {
assert(NULL);

View File

@ -80,6 +80,7 @@
#include <string.h>
#include <assert.h>
#include "zmalloc.h"
#include "endian.h"
#define ZIPMAP_BIGLEN 254
#define ZIPMAP_END 255
@ -108,6 +109,7 @@ static unsigned int zipmapDecodeLength(unsigned char *p) {
if (len < ZIPMAP_BIGLEN) return len;
memcpy(&len,p+1,sizeof(unsigned int));
memrev32ifbe(&len);
return len;
}
@ -123,6 +125,7 @@ static unsigned int zipmapEncodeLength(unsigned char *p, unsigned int len) {
} else {
p[0] = ZIPMAP_BIGLEN;
memcpy(p+1,&len,sizeof(len));
memrev32ifbe(p+1);
return 1+sizeof(len);
}
}