mirror of
https://github.com/fluencelabs/redis
synced 2025-05-21 22:51:20 +00:00
Merge branch 'issue327' into unstable
This commit is contained in:
commit
48a32944e6
32
redis.conf
32
redis.conf
@ -201,21 +201,27 @@ slave-serve-stale-data yes
|
|||||||
# maxclients 10000
|
# maxclients 10000
|
||||||
|
|
||||||
# Don't use more memory than the specified amount of bytes.
|
# Don't use more memory than the specified amount of bytes.
|
||||||
# When the memory limit is reached Redis will try to remove keys with an
|
# When the memory limit is reached Redis will try to remove keys
|
||||||
# EXPIRE set. It will try to start freeing keys that are going to expire
|
# accordingly to the eviction policy selected (see maxmemmory-policy).
|
||||||
# in little time and preserve keys with a longer time to live.
|
|
||||||
# Redis will also try to remove objects from free lists if possible.
|
|
||||||
#
|
#
|
||||||
# If all this fails, Redis will start to reply with errors to commands
|
# If Redis can't remove keys according to the policy, or if the policy is
|
||||||
# that will use more memory, like SET, LPUSH, and so on, and will continue
|
# set to 'noeviction', Redis will start to reply with errors to commands
|
||||||
# to reply to most read-only commands like GET.
|
# that would use more memory, like SET, LPUSH, and so on, and will continue
|
||||||
|
# to reply to read-only commands like GET.
|
||||||
#
|
#
|
||||||
# WARNING: maxmemory can be a good idea mainly if you want to use Redis as a
|
# This option is usually useful when using Redis as an LRU cache, or to set
|
||||||
# 'state' server or cache, not as a real DB. When Redis is used as a real
|
# an hard memory limit for an instance (using the 'noeviction' policy).
|
||||||
# database the memory usage will grow over the weeks, it will be obvious if
|
#
|
||||||
# it is going to use too much memory in the long run, and you'll have the time
|
# WARNING: If you have slaves attached to an instance with maxmemory on,
|
||||||
# to upgrade. With maxmemory after the limit is reached you'll start to get
|
# the size of the output buffers needed to feed the slaves are subtracted
|
||||||
# errors for write operations, and this may even lead to DB inconsistency.
|
# from the used memory count, so that network problems / resyncs will
|
||||||
|
# not trigger a loop where keys are evicted, and in turn the output
|
||||||
|
# buffer of slaves is full with DELs of keys evicted triggering the deletion
|
||||||
|
# of more keys, and so forth until the database is completely emptied.
|
||||||
|
#
|
||||||
|
# In short... if you have slaves attached it is suggested that you set a lower
|
||||||
|
# limit for maxmemory so that there is some free RAM on the system for slave
|
||||||
|
# output buffers (but this is not needed if the policy is 'noeviction').
|
||||||
#
|
#
|
||||||
# maxmemory <bytes>
|
# maxmemory <bytes>
|
||||||
|
|
||||||
|
5
src/db.c
5
src/db.c
@ -488,9 +488,10 @@ long long getExpire(redisDb *db, robj *key) {
|
|||||||
void propagateExpire(redisDb *db, robj *key) {
|
void propagateExpire(redisDb *db, robj *key) {
|
||||||
robj *argv[2];
|
robj *argv[2];
|
||||||
|
|
||||||
argv[0] = createStringObject("DEL",3);
|
argv[0] = shared.del;
|
||||||
argv[1] = key;
|
argv[1] = key;
|
||||||
incrRefCount(key);
|
incrRefCount(argv[0]);
|
||||||
|
incrRefCount(argv[1]);
|
||||||
|
|
||||||
if (server.aof_state != REDIS_AOF_OFF)
|
if (server.aof_state != REDIS_AOF_OFF)
|
||||||
feedAppendOnlyFile(server.delCommand,db->id,argv,2);
|
feedAppendOnlyFile(server.delCommand,db->id,argv,2);
|
||||||
|
@ -3,6 +3,14 @@
|
|||||||
|
|
||||||
static void setProtocolError(redisClient *c, int pos);
|
static void setProtocolError(redisClient *c, int pos);
|
||||||
|
|
||||||
|
/* To evaluate the output buffer size of a client we need to get size of
|
||||||
|
* allocated objects, however we can't used zmalloc_size() directly on sds
|
||||||
|
* strings because of the trick they use to work (the header is before the
|
||||||
|
* returned pointer), so we use this helper function. */
|
||||||
|
size_t zmalloc_size_sds(sds s) {
|
||||||
|
return zmalloc_size(s-sizeof(struct sdshdr));
|
||||||
|
}
|
||||||
|
|
||||||
void *dupClientReplyValue(void *o) {
|
void *dupClientReplyValue(void *o) {
|
||||||
incrRefCount((robj*)o);
|
incrRefCount((robj*)o);
|
||||||
return o;
|
return o;
|
||||||
@ -137,6 +145,7 @@ void _addReplyObjectToList(redisClient *c, robj *o) {
|
|||||||
if (listLength(c->reply) == 0) {
|
if (listLength(c->reply) == 0) {
|
||||||
incrRefCount(o);
|
incrRefCount(o);
|
||||||
listAddNodeTail(c->reply,o);
|
listAddNodeTail(c->reply,o);
|
||||||
|
c->reply_bytes += zmalloc_size_sds(o->ptr);
|
||||||
} else {
|
} else {
|
||||||
tail = listNodeValue(listLast(c->reply));
|
tail = listNodeValue(listLast(c->reply));
|
||||||
|
|
||||||
@ -144,14 +153,16 @@ void _addReplyObjectToList(redisClient *c, robj *o) {
|
|||||||
if (tail->ptr != NULL &&
|
if (tail->ptr != NULL &&
|
||||||
sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)
|
sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)
|
||||||
{
|
{
|
||||||
|
c->reply_bytes -= zmalloc_size_sds(tail->ptr);
|
||||||
tail = dupLastObjectIfNeeded(c->reply);
|
tail = dupLastObjectIfNeeded(c->reply);
|
||||||
tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));
|
tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));
|
||||||
|
c->reply_bytes += zmalloc_size_sds(tail->ptr);
|
||||||
} else {
|
} else {
|
||||||
incrRefCount(o);
|
incrRefCount(o);
|
||||||
listAddNodeTail(c->reply,o);
|
listAddNodeTail(c->reply,o);
|
||||||
|
c->reply_bytes += zmalloc_size_sds(o->ptr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
c->reply_bytes += sdslen(o->ptr);
|
|
||||||
asyncCloseClientOnOutputBufferLimitReached(c);
|
asyncCloseClientOnOutputBufferLimitReached(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -165,9 +176,9 @@ void _addReplySdsToList(redisClient *c, sds s) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
c->reply_bytes += sdslen(s);
|
|
||||||
if (listLength(c->reply) == 0) {
|
if (listLength(c->reply) == 0) {
|
||||||
listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
|
listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
|
||||||
|
c->reply_bytes += zmalloc_size_sds(s);
|
||||||
} else {
|
} else {
|
||||||
tail = listNodeValue(listLast(c->reply));
|
tail = listNodeValue(listLast(c->reply));
|
||||||
|
|
||||||
@ -175,11 +186,14 @@ void _addReplySdsToList(redisClient *c, sds s) {
|
|||||||
if (tail->ptr != NULL &&
|
if (tail->ptr != NULL &&
|
||||||
sdslen(tail->ptr)+sdslen(s) <= REDIS_REPLY_CHUNK_BYTES)
|
sdslen(tail->ptr)+sdslen(s) <= REDIS_REPLY_CHUNK_BYTES)
|
||||||
{
|
{
|
||||||
|
c->reply_bytes -= zmalloc_size_sds(tail->ptr);
|
||||||
tail = dupLastObjectIfNeeded(c->reply);
|
tail = dupLastObjectIfNeeded(c->reply);
|
||||||
tail->ptr = sdscatlen(tail->ptr,s,sdslen(s));
|
tail->ptr = sdscatlen(tail->ptr,s,sdslen(s));
|
||||||
|
c->reply_bytes += zmalloc_size_sds(tail->ptr);
|
||||||
sdsfree(s);
|
sdsfree(s);
|
||||||
} else {
|
} else {
|
||||||
listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
|
listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
|
||||||
|
c->reply_bytes += zmalloc_size_sds(s);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
asyncCloseClientOnOutputBufferLimitReached(c);
|
asyncCloseClientOnOutputBufferLimitReached(c);
|
||||||
@ -191,7 +205,10 @@ void _addReplyStringToList(redisClient *c, char *s, size_t len) {
|
|||||||
if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
|
if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
|
||||||
|
|
||||||
if (listLength(c->reply) == 0) {
|
if (listLength(c->reply) == 0) {
|
||||||
listAddNodeTail(c->reply,createStringObject(s,len));
|
robj *o = createStringObject(s,len);
|
||||||
|
|
||||||
|
listAddNodeTail(c->reply,o);
|
||||||
|
c->reply_bytes += zmalloc_size_sds(o->ptr);
|
||||||
} else {
|
} else {
|
||||||
tail = listNodeValue(listLast(c->reply));
|
tail = listNodeValue(listLast(c->reply));
|
||||||
|
|
||||||
@ -199,13 +216,17 @@ void _addReplyStringToList(redisClient *c, char *s, size_t len) {
|
|||||||
if (tail->ptr != NULL &&
|
if (tail->ptr != NULL &&
|
||||||
sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES)
|
sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES)
|
||||||
{
|
{
|
||||||
|
c->reply_bytes -= zmalloc_size_sds(tail->ptr);
|
||||||
tail = dupLastObjectIfNeeded(c->reply);
|
tail = dupLastObjectIfNeeded(c->reply);
|
||||||
tail->ptr = sdscatlen(tail->ptr,s,len);
|
tail->ptr = sdscatlen(tail->ptr,s,len);
|
||||||
|
c->reply_bytes += zmalloc_size_sds(tail->ptr);
|
||||||
} else {
|
} else {
|
||||||
listAddNodeTail(c->reply,createStringObject(s,len));
|
robj *o = createStringObject(s,len);
|
||||||
|
|
||||||
|
listAddNodeTail(c->reply,o);
|
||||||
|
c->reply_bytes += zmalloc_size_sds(o->ptr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
c->reply_bytes += len;
|
|
||||||
asyncCloseClientOnOutputBufferLimitReached(c);
|
asyncCloseClientOnOutputBufferLimitReached(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -336,7 +357,7 @@ void setDeferredMultiBulkLength(redisClient *c, void *node, long length) {
|
|||||||
|
|
||||||
len = listNodeValue(ln);
|
len = listNodeValue(ln);
|
||||||
len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length);
|
len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length);
|
||||||
c->reply_bytes += sdslen(len->ptr);
|
c->reply_bytes += zmalloc_size_sds(len->ptr);
|
||||||
if (ln->next != NULL) {
|
if (ln->next != NULL) {
|
||||||
next = listNodeValue(ln->next);
|
next = listNodeValue(ln->next);
|
||||||
|
|
||||||
@ -363,6 +384,18 @@ void addReplyDouble(redisClient *c, double d) {
|
|||||||
void addReplyLongLongWithPrefix(redisClient *c, long long ll, char prefix) {
|
void addReplyLongLongWithPrefix(redisClient *c, long long ll, char prefix) {
|
||||||
char buf[128];
|
char buf[128];
|
||||||
int len;
|
int len;
|
||||||
|
|
||||||
|
/* Things like $3\r\n or *2\r\n are emitted very often by the protocol
|
||||||
|
* so we have a few shared objects to use if the integer is small
|
||||||
|
* like it is most of the times. */
|
||||||
|
if (prefix == '*' && ll < REDIS_SHARED_BULKHDR_LEN) {
|
||||||
|
addReply(c,shared.mbulkhdr[ll]);
|
||||||
|
return;
|
||||||
|
} else if (prefix == '$' && ll < REDIS_SHARED_BULKHDR_LEN) {
|
||||||
|
addReply(c,shared.bulkhdr[ll]);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
buf[0] = prefix;
|
buf[0] = prefix;
|
||||||
len = ll2string(buf+1,sizeof(buf)-1,ll);
|
len = ll2string(buf+1,sizeof(buf)-1,ll);
|
||||||
buf[len+1] = '\r';
|
buf[len+1] = '\r';
|
||||||
@ -626,6 +659,7 @@ void freeClientsInAsyncFreeQueue(void) {
|
|||||||
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||||
redisClient *c = privdata;
|
redisClient *c = privdata;
|
||||||
int nwritten = 0, totwritten = 0, objlen;
|
int nwritten = 0, totwritten = 0, objlen;
|
||||||
|
size_t objmem;
|
||||||
robj *o;
|
robj *o;
|
||||||
REDIS_NOTUSED(el);
|
REDIS_NOTUSED(el);
|
||||||
REDIS_NOTUSED(mask);
|
REDIS_NOTUSED(mask);
|
||||||
@ -651,6 +685,7 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
} else {
|
} else {
|
||||||
o = listNodeValue(listFirst(c->reply));
|
o = listNodeValue(listFirst(c->reply));
|
||||||
objlen = sdslen(o->ptr);
|
objlen = sdslen(o->ptr);
|
||||||
|
objmem = zmalloc_size_sds(o->ptr);
|
||||||
|
|
||||||
if (objlen == 0) {
|
if (objlen == 0) {
|
||||||
listDelNode(c->reply,listFirst(c->reply));
|
listDelNode(c->reply,listFirst(c->reply));
|
||||||
@ -671,15 +706,20 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
if (c->sentlen == objlen) {
|
if (c->sentlen == objlen) {
|
||||||
listDelNode(c->reply,listFirst(c->reply));
|
listDelNode(c->reply,listFirst(c->reply));
|
||||||
c->sentlen = 0;
|
c->sentlen = 0;
|
||||||
c->reply_bytes -= objlen;
|
c->reply_bytes -= objmem;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
|
/* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT
|
||||||
* bytes, in a single threaded server it's a good idea to serve
|
* bytes, in a single threaded server it's a good idea to serve
|
||||||
* other clients as well, even if a very large request comes from
|
* other clients as well, even if a very large request comes from
|
||||||
* super fast link that is always able to accept data (in real world
|
* super fast link that is always able to accept data (in real world
|
||||||
* scenario think about 'KEYS *' against the loopback interfae) */
|
* scenario think about 'KEYS *' against the loopback interface).
|
||||||
if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
|
*
|
||||||
|
* However if we are over the maxmemory limit we ignore that and
|
||||||
|
* just deliver as much data as it is possible to deliver. */
|
||||||
|
if (totwritten > REDIS_MAX_WRITE_PER_EVENT &&
|
||||||
|
(server.maxmemory == 0 ||
|
||||||
|
zmalloc_used_memory() < server.maxmemory)) break;
|
||||||
}
|
}
|
||||||
if (nwritten == -1) {
|
if (nwritten == -1) {
|
||||||
if (errno == EAGAIN) {
|
if (errno == EAGAIN) {
|
||||||
@ -1205,7 +1245,7 @@ void rewriteClientCommandArgument(redisClient *c, int i, robj *newval) {
|
|||||||
* the caller wishes. The main usage of this function currently is
|
* the caller wishes. The main usage of this function currently is
|
||||||
* enforcing the client output length limits. */
|
* enforcing the client output length limits. */
|
||||||
unsigned long getClientOutputBufferMemoryUsage(redisClient *c) {
|
unsigned long getClientOutputBufferMemoryUsage(redisClient *c) {
|
||||||
unsigned long list_item_size = sizeof(listNode);
|
unsigned long list_item_size = sizeof(listNode)+sizeof(robj);
|
||||||
|
|
||||||
return c->reply_bytes + (list_item_size*listLength(c->reply));
|
return c->reply_bytes + (list_item_size*listLength(c->reply));
|
||||||
}
|
}
|
||||||
@ -1298,3 +1338,24 @@ void asyncCloseClientOnOutputBufferLimitReached(redisClient *c) {
|
|||||||
sdsfree(client);
|
sdsfree(client);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Helper function used by freeMemoryIfNeeded() in order to flush slaves
|
||||||
|
* output buffers without returning control to the event loop. */
|
||||||
|
void flushSlavesOutputBuffers(void) {
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
|
||||||
|
listRewind(server.slaves,&li);
|
||||||
|
while((ln = listNext(&li))) {
|
||||||
|
redisClient *slave = listNodeValue(ln);
|
||||||
|
int events;
|
||||||
|
|
||||||
|
events = aeGetFileEvents(server.el,slave->fd);
|
||||||
|
if (events & AE_WRITABLE &&
|
||||||
|
slave->replstate == REDIS_REPL_ONLINE &&
|
||||||
|
listLength(slave->reply))
|
||||||
|
{
|
||||||
|
sendReplyToClient(server.el,slave->fd,slave,0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
12
src/pubsub.c
12
src/pubsub.c
@ -41,7 +41,7 @@ int pubsubSubscribeChannel(redisClient *c, robj *channel) {
|
|||||||
listAddNodeTail(clients,c);
|
listAddNodeTail(clients,c);
|
||||||
}
|
}
|
||||||
/* Notify the client */
|
/* Notify the client */
|
||||||
addReply(c,shared.mbulk3);
|
addReply(c,shared.mbulkhdr[3]);
|
||||||
addReply(c,shared.subscribebulk);
|
addReply(c,shared.subscribebulk);
|
||||||
addReplyBulk(c,channel);
|
addReplyBulk(c,channel);
|
||||||
addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
|
addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
|
||||||
@ -77,7 +77,7 @@ int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
|
|||||||
}
|
}
|
||||||
/* Notify the client */
|
/* Notify the client */
|
||||||
if (notify) {
|
if (notify) {
|
||||||
addReply(c,shared.mbulk3);
|
addReply(c,shared.mbulkhdr[3]);
|
||||||
addReply(c,shared.unsubscribebulk);
|
addReply(c,shared.unsubscribebulk);
|
||||||
addReplyBulk(c,channel);
|
addReplyBulk(c,channel);
|
||||||
addReplyLongLong(c,dictSize(c->pubsub_channels)+
|
addReplyLongLong(c,dictSize(c->pubsub_channels)+
|
||||||
@ -103,7 +103,7 @@ int pubsubSubscribePattern(redisClient *c, robj *pattern) {
|
|||||||
listAddNodeTail(server.pubsub_patterns,pat);
|
listAddNodeTail(server.pubsub_patterns,pat);
|
||||||
}
|
}
|
||||||
/* Notify the client */
|
/* Notify the client */
|
||||||
addReply(c,shared.mbulk3);
|
addReply(c,shared.mbulkhdr[3]);
|
||||||
addReply(c,shared.psubscribebulk);
|
addReply(c,shared.psubscribebulk);
|
||||||
addReplyBulk(c,pattern);
|
addReplyBulk(c,pattern);
|
||||||
addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
|
addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
|
||||||
@ -128,7 +128,7 @@ int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) {
|
|||||||
}
|
}
|
||||||
/* Notify the client */
|
/* Notify the client */
|
||||||
if (notify) {
|
if (notify) {
|
||||||
addReply(c,shared.mbulk3);
|
addReply(c,shared.mbulkhdr[3]);
|
||||||
addReply(c,shared.punsubscribebulk);
|
addReply(c,shared.punsubscribebulk);
|
||||||
addReplyBulk(c,pattern);
|
addReplyBulk(c,pattern);
|
||||||
addReplyLongLong(c,dictSize(c->pubsub_channels)+
|
addReplyLongLong(c,dictSize(c->pubsub_channels)+
|
||||||
@ -188,7 +188,7 @@ int pubsubPublishMessage(robj *channel, robj *message) {
|
|||||||
while ((ln = listNext(&li)) != NULL) {
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
redisClient *c = ln->value;
|
redisClient *c = ln->value;
|
||||||
|
|
||||||
addReply(c,shared.mbulk3);
|
addReply(c,shared.mbulkhdr[3]);
|
||||||
addReply(c,shared.messagebulk);
|
addReply(c,shared.messagebulk);
|
||||||
addReplyBulk(c,channel);
|
addReplyBulk(c,channel);
|
||||||
addReplyBulk(c,message);
|
addReplyBulk(c,message);
|
||||||
@ -206,7 +206,7 @@ int pubsubPublishMessage(robj *channel, robj *message) {
|
|||||||
sdslen(pat->pattern->ptr),
|
sdslen(pat->pattern->ptr),
|
||||||
(char*)channel->ptr,
|
(char*)channel->ptr,
|
||||||
sdslen(channel->ptr),0)) {
|
sdslen(channel->ptr),0)) {
|
||||||
addReply(pat->client,shared.mbulk4);
|
addReply(pat->client,shared.mbulkhdr[4]);
|
||||||
addReply(pat->client,shared.pmessagebulk);
|
addReply(pat->client,shared.pmessagebulk);
|
||||||
addReplyBulk(pat->client,pat->pattern);
|
addReplyBulk(pat->client,pat->pattern);
|
||||||
addReplyBulk(pat->client,channel);
|
addReplyBulk(pat->client,channel);
|
||||||
|
106
src/redis.c
106
src/redis.c
@ -851,12 +851,17 @@ void createSharedObjects(void) {
|
|||||||
shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18);
|
shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18);
|
||||||
shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
|
shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
|
||||||
shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
|
shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
|
||||||
shared.mbulk3 = createStringObject("*3\r\n",4);
|
shared.del = createStringObject("DEL",3);
|
||||||
shared.mbulk4 = createStringObject("*4\r\n",4);
|
|
||||||
for (j = 0; j < REDIS_SHARED_INTEGERS; j++) {
|
for (j = 0; j < REDIS_SHARED_INTEGERS; j++) {
|
||||||
shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j);
|
shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j);
|
||||||
shared.integers[j]->encoding = REDIS_ENCODING_INT;
|
shared.integers[j]->encoding = REDIS_ENCODING_INT;
|
||||||
}
|
}
|
||||||
|
for (j = 0; j < REDIS_SHARED_BULKHDR_LEN; j++) {
|
||||||
|
shared.mbulkhdr[j] = createObject(REDIS_STRING,
|
||||||
|
sdscatprintf(sdsempty(),"*%d\r\n",j));
|
||||||
|
shared.bulkhdr[j] = createObject(REDIS_STRING,
|
||||||
|
sdscatprintf(sdsempty(),"$%d\r\n",j));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void initServerConfig() {
|
void initServerConfig() {
|
||||||
@ -1270,12 +1275,13 @@ int processCommand(redisClient *c) {
|
|||||||
* First we try to free some memory if possible (if there are volatile
|
* First we try to free some memory if possible (if there are volatile
|
||||||
* keys in the dataset). If there are not the only thing we can do
|
* keys in the dataset). If there are not the only thing we can do
|
||||||
* is returning an error. */
|
* is returning an error. */
|
||||||
if (server.maxmemory) freeMemoryIfNeeded();
|
if (server.maxmemory) {
|
||||||
if (server.maxmemory && (c->cmd->flags & REDIS_CMD_DENYOOM) &&
|
int retval = freeMemoryIfNeeded();
|
||||||
zmalloc_used_memory() > server.maxmemory)
|
if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
|
||||||
{
|
addReplyError(c,
|
||||||
addReplyError(c,"command not allowed when used memory > 'maxmemory'");
|
"command not allowed when used memory > 'maxmemory'");
|
||||||
return REDIS_OK;
|
return REDIS_OK;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
|
/* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
|
||||||
@ -1778,23 +1784,57 @@ void monitorCommand(redisClient *c) {
|
|||||||
/* ============================ Maxmemory directive ======================== */
|
/* ============================ Maxmemory directive ======================== */
|
||||||
|
|
||||||
/* This function gets called when 'maxmemory' is set on the config file to limit
|
/* This function gets called when 'maxmemory' is set on the config file to limit
|
||||||
* the max memory used by the server, and we are out of memory.
|
* the max memory used by the server, before processing a command.
|
||||||
* This function will try to, in order:
|
|
||||||
*
|
*
|
||||||
* - Free objects from the free list
|
* The goal of the function is to free enough memory to keep Redis under the
|
||||||
* - Try to remove keys with an EXPIRE set
|
* configured memory limit.
|
||||||
*
|
*
|
||||||
* It is not possible to free enough memory to reach used-memory < maxmemory
|
* The function starts calculating how many bytes should be freed to keep
|
||||||
* the server will start refusing commands that will enlarge even more the
|
* Redis under the limit, and enters a loop selecting the best keys to
|
||||||
* memory usage.
|
* evict accordingly to the configured policy.
|
||||||
|
*
|
||||||
|
* If all the bytes needed to return back under the limit were freed the
|
||||||
|
* function returns REDIS_OK, otherwise REDIS_ERR is returned, and the caller
|
||||||
|
* should block the execution of commands that will result in more memory
|
||||||
|
* used by the server.
|
||||||
*/
|
*/
|
||||||
void freeMemoryIfNeeded(void) {
|
int freeMemoryIfNeeded(void) {
|
||||||
/* Remove keys accordingly to the active policy as long as we are
|
size_t mem_used, mem_tofree, mem_freed;
|
||||||
* over the memory limit. */
|
int slaves = listLength(server.slaves);
|
||||||
if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION) return;
|
|
||||||
|
|
||||||
while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
|
/* Remove the size of slaves output buffers and AOF buffer from the
|
||||||
int j, k, freed = 0;
|
* count of used memory. */
|
||||||
|
mem_used = zmalloc_used_memory();
|
||||||
|
if (slaves) {
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
|
||||||
|
listRewind(server.slaves,&li);
|
||||||
|
while((ln = listNext(&li))) {
|
||||||
|
redisClient *slave = listNodeValue(ln);
|
||||||
|
unsigned long obuf_bytes = getClientOutputBufferMemoryUsage(slave);
|
||||||
|
if (obuf_bytes > mem_used)
|
||||||
|
mem_used = 0;
|
||||||
|
else
|
||||||
|
mem_used -= obuf_bytes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (server.aof_state != REDIS_AOF_OFF) {
|
||||||
|
mem_used -= sdslen(server.aof_buf);
|
||||||
|
mem_used -= sdslen(server.aof_rewrite_buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Check if we are over the memory limit. */
|
||||||
|
if (mem_used <= server.maxmemory) return REDIS_OK;
|
||||||
|
|
||||||
|
if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION)
|
||||||
|
return REDIS_ERR; /* We need to free memory, but policy forbids. */
|
||||||
|
|
||||||
|
/* Compute how much memory we need to free. */
|
||||||
|
mem_tofree = mem_used - server.maxmemory;
|
||||||
|
mem_freed = 0;
|
||||||
|
while (mem_freed < mem_tofree) {
|
||||||
|
int j, k, keys_freed = 0;
|
||||||
|
|
||||||
for (j = 0; j < server.dbnum; j++) {
|
for (j = 0; j < server.dbnum; j++) {
|
||||||
long bestval = 0; /* just to prevent warning */
|
long bestval = 0; /* just to prevent warning */
|
||||||
@ -1867,16 +1907,36 @@ void freeMemoryIfNeeded(void) {
|
|||||||
|
|
||||||
/* Finally remove the selected key. */
|
/* Finally remove the selected key. */
|
||||||
if (bestkey) {
|
if (bestkey) {
|
||||||
|
long long delta;
|
||||||
|
|
||||||
robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
|
robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
|
||||||
propagateExpire(db,keyobj);
|
propagateExpire(db,keyobj);
|
||||||
|
/* We compute the amount of memory freed by dbDelete() alone.
|
||||||
|
* It is possible that actually the memory needed to propagate
|
||||||
|
* the DEL in AOF and replication link is greater than the one
|
||||||
|
* we are freeing removing the key, but we can't account for
|
||||||
|
* that otherwise we would never exit the loop.
|
||||||
|
*
|
||||||
|
* AOF and Output buffer memory will be freed eventually so
|
||||||
|
* we only care about memory used by the key space. */
|
||||||
|
delta = (long long) zmalloc_used_memory();
|
||||||
dbDelete(db,keyobj);
|
dbDelete(db,keyobj);
|
||||||
|
delta -= (long long) zmalloc_used_memory();
|
||||||
|
mem_freed += delta;
|
||||||
server.stat_evictedkeys++;
|
server.stat_evictedkeys++;
|
||||||
decrRefCount(keyobj);
|
decrRefCount(keyobj);
|
||||||
freed++;
|
keys_freed++;
|
||||||
|
|
||||||
|
/* When the memory to free starts to be big enough, we may
|
||||||
|
* start spending so much time here that is impossible to
|
||||||
|
* deliver data to the slaves fast enough, so we force the
|
||||||
|
* transmission here inside the loop. */
|
||||||
|
if (slaves) flushSlavesOutputBuffers();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!freed) return; /* nothing to free... */
|
if (!keys_freed) return REDIS_ERR; /* nothing to free... */
|
||||||
}
|
}
|
||||||
|
return REDIS_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* =================================== Main! ================================ */
|
/* =================================== Main! ================================ */
|
||||||
|
12
src/redis.h
12
src/redis.h
@ -46,6 +46,7 @@
|
|||||||
#define REDIS_EXPIRELOOKUPS_PER_CRON 10 /* lookup 10 expires per loop */
|
#define REDIS_EXPIRELOOKUPS_PER_CRON 10 /* lookup 10 expires per loop */
|
||||||
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
|
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
|
||||||
#define REDIS_SHARED_INTEGERS 10000
|
#define REDIS_SHARED_INTEGERS 10000
|
||||||
|
#define REDIS_SHARED_BULKHDR_LEN 32
|
||||||
#define REDIS_MAX_LOGMSG_LEN 1024 /* Default maximum length of syslog messages */
|
#define REDIS_MAX_LOGMSG_LEN 1024 /* Default maximum length of syslog messages */
|
||||||
#define REDIS_AOF_REWRITE_PERC 100
|
#define REDIS_AOF_REWRITE_PERC 100
|
||||||
#define REDIS_AOF_REWRITE_MIN_SIZE (1024*1024)
|
#define REDIS_AOF_REWRITE_MIN_SIZE (1024*1024)
|
||||||
@ -358,9 +359,11 @@ struct sharedObjectsStruct {
|
|||||||
*outofrangeerr, *noscripterr, *loadingerr, *slowscripterr, *plus,
|
*outofrangeerr, *noscripterr, *loadingerr, *slowscripterr, *plus,
|
||||||
*select0, *select1, *select2, *select3, *select4,
|
*select0, *select1, *select2, *select3, *select4,
|
||||||
*select5, *select6, *select7, *select8, *select9,
|
*select5, *select6, *select7, *select8, *select9,
|
||||||
*messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *mbulk3,
|
*messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk,
|
||||||
*mbulk4, *psubscribebulk, *punsubscribebulk,
|
*psubscribebulk, *punsubscribebulk, *del,
|
||||||
*integers[REDIS_SHARED_INTEGERS];
|
*integers[REDIS_SHARED_INTEGERS],
|
||||||
|
*mbulkhdr[REDIS_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
|
||||||
|
*bulkhdr[REDIS_SHARED_BULKHDR_LEN]; /* "$<value>\r\n" */
|
||||||
};
|
};
|
||||||
|
|
||||||
/* ZSETs use a specialized version of Skiplists */
|
/* ZSETs use a specialized version of Skiplists */
|
||||||
@ -817,6 +820,7 @@ void freeClientsInAsyncFreeQueue(void);
|
|||||||
void asyncCloseClientOnOutputBufferLimitReached(redisClient *c);
|
void asyncCloseClientOnOutputBufferLimitReached(redisClient *c);
|
||||||
int getClientLimitClassByName(char *name);
|
int getClientLimitClassByName(char *name);
|
||||||
char *getClientLimitClassName(int class);
|
char *getClientLimitClassName(int class);
|
||||||
|
void flushSlavesOutputBuffers(void);
|
||||||
|
|
||||||
#ifdef __GNUC__
|
#ifdef __GNUC__
|
||||||
void addReplyErrorFormat(redisClient *c, const char *fmt, ...)
|
void addReplyErrorFormat(redisClient *c, const char *fmt, ...)
|
||||||
@ -940,7 +944,7 @@ unsigned int zsetLength(robj *zobj);
|
|||||||
void zsetConvert(robj *zobj, int encoding);
|
void zsetConvert(robj *zobj, int encoding);
|
||||||
|
|
||||||
/* Core functions */
|
/* Core functions */
|
||||||
void freeMemoryIfNeeded(void);
|
int freeMemoryIfNeeded(void);
|
||||||
int processCommand(redisClient *c);
|
int processCommand(redisClient *c);
|
||||||
void setupSignalHandlers(void);
|
void setupSignalHandlers(void);
|
||||||
struct redisCommand *lookupCommand(sds name);
|
struct redisCommand *lookupCommand(sds name);
|
||||||
|
@ -150,6 +150,20 @@ void *zrealloc(void *ptr, size_t size) {
|
|||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Provide zmalloc_size() for systems where this function is not provided by
|
||||||
|
* malloc itself, given that in that case we store an header with this
|
||||||
|
* information as the first bytes of every allocation. */
|
||||||
|
#ifndef HAVE_MALLOC_SIZE
|
||||||
|
size_t zmalloc_size(void *ptr) {
|
||||||
|
void *realptr = (char*)ptr-PREFIX_SIZE;
|
||||||
|
size_t size = *((size_t*)realptr);
|
||||||
|
/* Assume at least that all the allocations are padded at sizeof(long) by
|
||||||
|
* the underlying allocator. */
|
||||||
|
if (size&(sizeof(long)-1)) size += sizeof(long)-(size&(sizeof(long)-1));
|
||||||
|
return size+PREFIX_SIZE;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
void zfree(void *ptr) {
|
void zfree(void *ptr) {
|
||||||
#ifndef HAVE_MALLOC_SIZE
|
#ifndef HAVE_MALLOC_SIZE
|
||||||
void *realptr;
|
void *realptr;
|
||||||
|
@ -76,4 +76,8 @@ void zmalloc_enable_thread_safeness(void);
|
|||||||
float zmalloc_get_fragmentation_ratio(void);
|
float zmalloc_get_fragmentation_ratio(void);
|
||||||
size_t zmalloc_get_rss(void);
|
size_t zmalloc_get_rss(void);
|
||||||
|
|
||||||
|
#ifndef HAVE_MALLOC_SIZE
|
||||||
|
size_t zmalloc_size(void *ptr);
|
||||||
|
#endif
|
||||||
|
|
||||||
#endif /* __ZMALLOC_H */
|
#endif /* __ZMALLOC_H */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user