Merge branch '2.4-issue327' into 2.4

This commit is contained in:
antirez 2012-02-20 17:30:35 +01:00
commit cf7d3f9d3d
7 changed files with 198 additions and 40 deletions

View File

@ -192,21 +192,27 @@ slave-serve-stale-data yes
# maxclients 128 # maxclients 128
# Don't use more memory than the specified amount of bytes. # Don't use more memory than the specified amount of bytes.
# When the memory limit is reached Redis will try to remove keys with an # When the memory limit is reached Redis will try to remove keys
# EXPIRE set. It will try to start freeing keys that are going to expire # according to the eviction policy selected (see maxmemory-policy).
# in little time and preserve keys with a longer time to live.
# Redis will also try to remove objects from free lists if possible.
# #
# If all this fails, Redis will start to reply with errors to commands # If Redis can't remove keys according to the policy, or if the policy is
# that will use more memory, like SET, LPUSH, and so on, and will continue # set to 'noeviction', Redis will start to reply with errors to commands
# to reply to most read-only commands like GET. # that would use more memory, like SET, LPUSH, and so on, and will continue
# to reply to read-only commands like GET.
# #
# WARNING: maxmemory can be a good idea mainly if you want to use Redis as a # This option is usually useful when using Redis as an LRU cache, or to set
# 'state' server or cache, not as a real DB. When Redis is used as a real # a hard memory limit for an instance (using the 'noeviction' policy).
# database the memory usage will grow over the weeks, it will be obvious if #
# it is going to use too much memory in the long run, and you'll have the time # WARNING: If you have slaves attached to an instance with maxmemory on,
# to upgrade. With maxmemory after the limit is reached you'll start to get # the size of the output buffers needed to feed the slaves is subtracted
# errors for write operations, and this may even lead to DB inconsistency. # from the used memory count, so that network problems / resyncs will
# not trigger a loop where keys are evicted, and in turn the output
# buffer of slaves is full with DELs of keys evicted triggering the deletion
# of more keys, and so forth until the database is completely emptied.
#
# In short... if you have slaves attached it is suggested that you set a lower
# limit for maxmemory so that there is some free RAM on the system for slave
# output buffers (but this is not needed if the policy is 'noeviction').
# #
# maxmemory <bytes> # maxmemory <bytes>

View File

@ -261,6 +261,7 @@ struct redisClient *createFakeClient(void) {
* so that Redis will not try to send replies to this client. */ * so that Redis will not try to send replies to this client. */
c->replstate = REDIS_REPL_WAIT_BGSAVE_START; c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
c->reply = listCreate(); c->reply = listCreate();
c->reply_bytes = 0;
c->watched_keys = listCreate(); c->watched_keys = listCreate();
listSetFreeMethod(c->reply,decrRefCount); listSetFreeMethod(c->reply,decrRefCount);
listSetDupMethod(c->reply,dupClientReplyValue); listSetDupMethod(c->reply,dupClientReplyValue);

View File

@ -3,6 +3,14 @@
static void setProtocolError(redisClient *c, int pos); static void setProtocolError(redisClient *c, int pos);
/* Return the allocation size of the buffer backing the sds string 's'.
 *
 * An sds pointer refers to the bytes that follow its 'struct sdshdr'
 * header, so zmalloc_size() can't be applied to the sds pointer directly:
 * we step back by the header size and query that address instead. This
 * helper is what the reply list accounting uses to evaluate the output
 * buffer size of a client. */
size_t zmalloc_size_sds(sds s) {
    void *alloc_start = s-sizeof(struct sdshdr);

    return zmalloc_size(alloc_start);
}
void *dupClientReplyValue(void *o) { void *dupClientReplyValue(void *o) {
incrRefCount((robj*)o); incrRefCount((robj*)o);
return o; return o;
@ -41,6 +49,7 @@ redisClient *createClient(int fd) {
c->authenticated = 0; c->authenticated = 0;
c->replstate = REDIS_REPL_NONE; c->replstate = REDIS_REPL_NONE;
c->reply = listCreate(); c->reply = listCreate();
c->reply_bytes = 0;
listSetFreeMethod(c->reply,decrRefCount); listSetFreeMethod(c->reply,decrRefCount);
listSetDupMethod(c->reply,dupClientReplyValue); listSetDupMethod(c->reply,dupClientReplyValue);
c->bpop.keys = NULL; c->bpop.keys = NULL;
@ -116,6 +125,7 @@ void _addReplyObjectToList(redisClient *c, robj *o) {
if (listLength(c->reply) == 0) { if (listLength(c->reply) == 0) {
incrRefCount(o); incrRefCount(o);
listAddNodeTail(c->reply,o); listAddNodeTail(c->reply,o);
c->reply_bytes += zmalloc_size_sds(o->ptr);
} else { } else {
tail = listNodeValue(listLast(c->reply)); tail = listNodeValue(listLast(c->reply));
@ -123,11 +133,14 @@ void _addReplyObjectToList(redisClient *c, robj *o) {
if (tail->ptr != NULL && if (tail->ptr != NULL &&
sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES) sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)
{ {
c->reply_bytes -= zmalloc_size_sds(tail->ptr);
tail = dupLastObjectIfNeeded(c->reply); tail = dupLastObjectIfNeeded(c->reply);
tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr)); tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));
c->reply_bytes += zmalloc_size_sds(tail->ptr);
} else { } else {
incrRefCount(o); incrRefCount(o);
listAddNodeTail(c->reply,o); listAddNodeTail(c->reply,o);
c->reply_bytes += zmalloc_size_sds(o->ptr);
} }
} }
} }
@ -144,6 +157,7 @@ void _addReplySdsToList(redisClient *c, sds s) {
if (listLength(c->reply) == 0) { if (listLength(c->reply) == 0) {
listAddNodeTail(c->reply,createObject(REDIS_STRING,s)); listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
c->reply_bytes += zmalloc_size_sds(s);
} else { } else {
tail = listNodeValue(listLast(c->reply)); tail = listNodeValue(listLast(c->reply));
@ -151,11 +165,14 @@ void _addReplySdsToList(redisClient *c, sds s) {
if (tail->ptr != NULL && if (tail->ptr != NULL &&
sdslen(tail->ptr)+sdslen(s) <= REDIS_REPLY_CHUNK_BYTES) sdslen(tail->ptr)+sdslen(s) <= REDIS_REPLY_CHUNK_BYTES)
{ {
c->reply_bytes -= zmalloc_size_sds(tail->ptr);
tail = dupLastObjectIfNeeded(c->reply); tail = dupLastObjectIfNeeded(c->reply);
tail->ptr = sdscatlen(tail->ptr,s,sdslen(s)); tail->ptr = sdscatlen(tail->ptr,s,sdslen(s));
c->reply_bytes += zmalloc_size_sds(tail->ptr);
sdsfree(s); sdsfree(s);
} else { } else {
listAddNodeTail(c->reply,createObject(REDIS_STRING,s)); listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
c->reply_bytes += zmalloc_size_sds(s);
} }
} }
} }
@ -166,7 +183,10 @@ void _addReplyStringToList(redisClient *c, char *s, size_t len) {
if (c->flags & REDIS_CLOSE_AFTER_REPLY) return; if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
if (listLength(c->reply) == 0) { if (listLength(c->reply) == 0) {
listAddNodeTail(c->reply,createStringObject(s,len)); robj *o = createStringObject(s,len);
listAddNodeTail(c->reply,o);
c->reply_bytes += zmalloc_size_sds(o->ptr);
} else { } else {
tail = listNodeValue(listLast(c->reply)); tail = listNodeValue(listLast(c->reply));
@ -174,10 +194,15 @@ void _addReplyStringToList(redisClient *c, char *s, size_t len) {
if (tail->ptr != NULL && if (tail->ptr != NULL &&
sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES) sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES)
{ {
c->reply_bytes -= zmalloc_size_sds(tail->ptr);
tail = dupLastObjectIfNeeded(c->reply); tail = dupLastObjectIfNeeded(c->reply);
tail->ptr = sdscatlen(tail->ptr,s,len); tail->ptr = sdscatlen(tail->ptr,s,len);
c->reply_bytes += zmalloc_size_sds(tail->ptr);
} else { } else {
listAddNodeTail(c->reply,createStringObject(s,len)); robj *o = createStringObject(s,len);
listAddNodeTail(c->reply,o);
c->reply_bytes += zmalloc_size_sds(o->ptr);
} }
} }
} }
@ -291,6 +316,7 @@ void setDeferredMultiBulkLength(redisClient *c, void *node, long length) {
len = listNodeValue(ln); len = listNodeValue(ln);
len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length); len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length);
c->reply_bytes += zmalloc_size_sds(len->ptr);
if (ln->next != NULL) { if (ln->next != NULL) {
next = listNodeValue(ln->next); next = listNodeValue(ln->next);
@ -398,6 +424,7 @@ void copyClientOutputBuffer(redisClient *dst, redisClient *src) {
dst->reply = listDup(src->reply); dst->reply = listDup(src->reply);
memcpy(dst->buf,src->buf,src->bufpos); memcpy(dst->buf,src->buf,src->bufpos);
dst->bufpos = src->bufpos; dst->bufpos = src->bufpos;
dst->reply_bytes = src->reply_bytes;
} }
static void acceptCommonHandler(int fd) { static void acceptCommonHandler(int fd) {
@ -566,6 +593,7 @@ void freeClient(redisClient *c) {
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *c = privdata; redisClient *c = privdata;
int nwritten = 0, totwritten = 0, objlen; int nwritten = 0, totwritten = 0, objlen;
size_t objmem;
robj *o; robj *o;
REDIS_NOTUSED(el); REDIS_NOTUSED(el);
REDIS_NOTUSED(mask); REDIS_NOTUSED(mask);
@ -591,6 +619,7 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
} else { } else {
o = listNodeValue(listFirst(c->reply)); o = listNodeValue(listFirst(c->reply));
objlen = sdslen(o->ptr); objlen = sdslen(o->ptr);
objmem = zmalloc_size_sds(o->ptr);
if (objlen == 0) { if (objlen == 0) {
listDelNode(c->reply,listFirst(c->reply)); listDelNode(c->reply,listFirst(c->reply));
@ -611,14 +640,20 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
if (c->sentlen == objlen) { if (c->sentlen == objlen) {
listDelNode(c->reply,listFirst(c->reply)); listDelNode(c->reply,listFirst(c->reply));
c->sentlen = 0; c->sentlen = 0;
c->reply_bytes -= objmem;
} }
} }
/* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT /* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT
* bytes, in a single threaded server it's a good idea to serve * bytes, in a single threaded server it's a good idea to serve
* other clients as well, even if a very large request comes from * other clients as well, even if a very large request comes from
* super fast link that is always able to accept data (in real world * super fast link that is always able to accept data (in real world
* scenario think about 'KEYS *' against the loopback interfae) */ * scenario think about 'KEYS *' against the loopback interface).
if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break; *
* However if we are over the maxmemory limit we ignore that and
* just deliver as much data as it is possible to deliver. */
if (totwritten > REDIS_MAX_WRITE_PER_EVENT &&
(server.maxmemory == 0 ||
zmalloc_used_memory() < server.maxmemory)) break;
} }
if (nwritten == -1) { if (nwritten == -1) {
if (errno == EAGAIN) { if (errno == EAGAIN) {
@ -1060,3 +1095,43 @@ void rewriteClientCommandVector(redisClient *c, int argc, ...) {
redisAssert(c->cmd != NULL); redisAssert(c->cmd != NULL);
va_end(ap); va_end(ap);
} }
/* Return the number of bytes "virtually" used by the reply list of the
 * client 'c', that is, the output still waiting to be delivered.
 *
 * The figure is virtual because the reply list may hold shared objects
 * that are not really consuming additional memory.
 *
 * The result is c->reply_bytes (the running total kept for the objects in
 * the reply list) plus, for every node in the list, the size of a list
 * node and of an object header. The static reply buffer of the client is
 * not part of the count since it is allocated anyway.
 *
 * The computation is a couple of arithmetic operations on values that are
 * already available, so callers can invoke it as often as they wish. */
unsigned long getClientOutputBufferMemoryUsage(redisClient *c) {
    unsigned long per_node = sizeof(listNode)+sizeof(robj);
    unsigned long nodes_overhead = per_node*listLength(c->reply);

    return c->reply_bytes + nodes_overhead;
}
/* Helper function used by freeMemoryIfNeeded() in order to flush slaves
* output buffers without returning control to the event loop. */
void flushSlavesOutputBuffers(void) {
listIter li;
listNode *ln;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = listNodeValue(ln);
int events;
events = aeGetFileEvents(server.el,slave->fd);
if (events & AE_WRITABLE &&
slave->replstate == REDIS_REPL_ONLINE &&
listLength(slave->reply))
{
sendReplyToClient(server.el,slave->fd,slave,0);
}
}
}

View File

@ -1096,12 +1096,13 @@ int processCommand(redisClient *c) {
* First we try to free some memory if possible (if there are volatile * First we try to free some memory if possible (if there are volatile
* keys in the dataset). If there are not the only thing we can do * keys in the dataset). If there are not the only thing we can do
* is returning an error. */ * is returning an error. */
if (server.maxmemory) freeMemoryIfNeeded(); if (server.maxmemory) {
if (server.maxmemory && (c->cmd->flags & REDIS_CMD_DENYOOM) && int retval = freeMemoryIfNeeded();
zmalloc_used_memory() > server.maxmemory) if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
{ addReplyError(c,
addReplyError(c,"command not allowed when used memory > 'maxmemory'"); "command not allowed when used memory > 'maxmemory'");
return REDIS_OK; return REDIS_OK;
}
} }
/* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */ /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
@ -1528,23 +1529,57 @@ void monitorCommand(redisClient *c) {
/* ============================ Maxmemory directive ======================== */ /* ============================ Maxmemory directive ======================== */
/* This function gets called when 'maxmemory' is set on the config file to limit /* This function gets called when 'maxmemory' is set on the config file to limit
* the max memory used by the server, and we are out of memory. * the max memory used by the server, before processing a command.
* This function will try to, in order:
* *
* - Free objects from the free list * The goal of the function is to free enough memory to keep Redis under the
* - Try to remove keys with an EXPIRE set * configured memory limit.
* *
* It is not possible to free enough memory to reach used-memory < maxmemory * The function starts calculating how many bytes should be freed to keep
* the server will start refusing commands that will enlarge even more the * Redis under the limit, and enters a loop selecting the best keys to
* memory usage. * evict accordingly to the configured policy.
*
* If all the bytes needed to return back under the limit were freed the
* function returns REDIS_OK, otherwise REDIS_ERR is returned, and the caller
* should block the execution of commands that will result in more memory
* used by the server.
*/ */
void freeMemoryIfNeeded(void) { int freeMemoryIfNeeded(void) {
/* Remove keys accordingly to the active policy as long as we are size_t mem_used, mem_tofree, mem_freed;
* over the memory limit. */ int slaves = listLength(server.slaves);
if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION) return;
while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) { /* Remove the size of slaves output buffers and AOF buffer from the
int j, k, freed = 0; * count of used memory. */
mem_used = zmalloc_used_memory();
if (slaves) {
listIter li;
listNode *ln;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = listNodeValue(ln);
unsigned long obuf_bytes = getClientOutputBufferMemoryUsage(slave);
if (obuf_bytes > mem_used)
mem_used = 0;
else
mem_used -= obuf_bytes;
}
}
if (server.appendonly) {
mem_used -= sdslen(server.aofbuf);
mem_used -= sdslen(server.bgrewritebuf);
}
/* Check if we are over the memory limit. */
if (mem_used <= server.maxmemory) return REDIS_OK;
if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION)
return REDIS_ERR; /* We need to free memory, but policy forbids. */
/* Compute how much memory we need to free. */
mem_tofree = mem_used - server.maxmemory;
mem_freed = 0;
while (mem_freed < mem_tofree) {
int j, k, keys_freed = 0;
for (j = 0; j < server.dbnum; j++) { for (j = 0; j < server.dbnum; j++) {
long bestval = 0; /* just to prevent warning */ long bestval = 0; /* just to prevent warning */
@ -1617,16 +1652,36 @@ void freeMemoryIfNeeded(void) {
/* Finally remove the selected key. */ /* Finally remove the selected key. */
if (bestkey) { if (bestkey) {
long long delta;
robj *keyobj = createStringObject(bestkey,sdslen(bestkey)); robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
propagateExpire(db,keyobj); propagateExpire(db,keyobj);
/* We compute the amount of memory freed by dbDelete() alone.
* It is possible that actually the memory needed to propagate
* the DEL in AOF and replication link is greater than the one
* we are freeing removing the key, but we can't account for
* that otherwise we would never exit the loop.
*
* AOF and Output buffer memory will be freed eventually so
* we only care about memory used by the key space. */
delta = (long long) zmalloc_used_memory();
dbDelete(db,keyobj); dbDelete(db,keyobj);
delta -= (long long) zmalloc_used_memory();
mem_freed += delta;
server.stat_evictedkeys++; server.stat_evictedkeys++;
decrRefCount(keyobj); decrRefCount(keyobj);
freed++; keys_freed++;
/* When the memory to free starts to be big enough, we may
* start spending so much time here that is impossible to
* deliver data to the slaves fast enough, so we force the
* transmission here inside the loop. */
if (slaves) flushSlavesOutputBuffers();
} }
} }
if (!freed) return; /* nothing to free... */ if (!keys_freed) return REDIS_ERR; /* nothing to free... */
} }
return REDIS_OK;
} }
/* =================================== Main! ================================ */ /* =================================== Main! ================================ */

View File

@ -339,6 +339,7 @@ typedef struct redisClient {
int multibulklen; /* number of multi bulk arguments left to read */ int multibulklen; /* number of multi bulk arguments left to read */
long bulklen; /* length of bulk argument in multi bulk request */ long bulklen; /* length of bulk argument in multi bulk request */
list *reply; list *reply;
unsigned long reply_bytes; /* Tot bytes of objects in reply list */
int sentlen; int sentlen;
time_t lastinteraction; /* time of the last interaction, used for timeout */ time_t lastinteraction; /* time of the last interaction, used for timeout */
int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */ int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */
@ -715,6 +716,8 @@ void getClientsMaxBuffers(unsigned long *longest_output_list,
sds getClientInfoString(redisClient *client); sds getClientInfoString(redisClient *client);
sds getAllClientsInfoString(void); sds getAllClientsInfoString(void);
void rewriteClientCommandVector(redisClient *c, int argc, ...); void rewriteClientCommandVector(redisClient *c, int argc, ...);
unsigned long getClientOutputBufferMemoryUsage(redisClient *c);
void flushSlavesOutputBuffers(void);
#ifdef __GNUC__ #ifdef __GNUC__
void addReplyErrorFormat(redisClient *c, const char *fmt, ...) void addReplyErrorFormat(redisClient *c, const char *fmt, ...)
@ -838,7 +841,7 @@ unsigned int zsetLength(robj *zobj);
void zsetConvert(robj *zobj, int encoding); void zsetConvert(robj *zobj, int encoding);
/* Core functions */ /* Core functions */
void freeMemoryIfNeeded(void); int freeMemoryIfNeeded(void);
int processCommand(redisClient *c); int processCommand(redisClient *c);
void setupSignalHandlers(void); void setupSignalHandlers(void);
struct redisCommand *lookupCommand(sds name); struct redisCommand *lookupCommand(sds name);

View File

@ -150,6 +150,20 @@ void *zrealloc(void *ptr, size_t size) {
#endif #endif
} }
/* Fallback zmalloc_size() for systems where malloc itself does not provide
 * such a function. In that case a header holding the allocation size is
 * stored in the first PREFIX_SIZE bytes of every allocation, so the value
 * can be read back from just before the user pointer.
 *
 * The returned value is the stored size rounded up to a multiple of
 * sizeof(long), plus PREFIX_SIZE. */
#ifndef HAVE_MALLOC_SIZE
size_t zmalloc_size(void *ptr) {
    size_t *header = (size_t*)((char*)ptr-PREFIX_SIZE);
    size_t total = *header;
    size_t remainder = total&(sizeof(long)-1);

    /* Assume the underlying allocator pads every allocation to at least
     * sizeof(long). */
    if (remainder != 0) total += sizeof(long)-remainder;
    return total+PREFIX_SIZE;
}
#endif
void zfree(void *ptr) { void zfree(void *ptr) {
#ifndef HAVE_MALLOC_SIZE #ifndef HAVE_MALLOC_SIZE
void *realptr; void *realptr;

View File

@ -76,4 +76,8 @@ void zmalloc_enable_thread_safeness(void);
float zmalloc_get_fragmentation_ratio(void); float zmalloc_get_fragmentation_ratio(void);
size_t zmalloc_get_rss(void); size_t zmalloc_get_rss(void);
#ifndef HAVE_MALLOC_SIZE
size_t zmalloc_size(void *ptr);
#endif
#endif /* __ZMALLOC_H */ #endif /* __ZMALLOC_H */