add active defrag support for streams

This commit is contained in:
Oran Agra
2018-06-26 14:14:35 +03:00
parent e8099cabd1
commit 5616d4c603
6 changed files with 230 additions and 25 deletions

View File

@ -592,6 +592,171 @@ long defragSet(redisDb *db, dictEntry *kde) {
return defragged;
}
/* Defrag callback for radix tree iterator, called for each node,
* used in order to defrag the nodes allocations. */
int defragRaxNode(raxNode **noderef) {
raxNode *newnode = activeDefragAlloc(*noderef);
if (newnode) {
*noderef = newnode;
return 1;
}
return 0;
}
/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */
int scanLaterStraemListpacks(robj *ob, unsigned long *cursor, long long endtime, long long *defragged) {
static unsigned char last[sizeof(streamID)];
raxIterator ri;
long iterations = 0;
if (ob->type != OBJ_STREAM || ob->encoding != OBJ_ENCODING_STREAM) {
*cursor = 0;
return 0;
}
stream *s = ob->ptr;
raxStart(&ri,s->rax);
if (*cursor == 0) {
/* if cursor is 0, we start new iteration */
defragRaxNode(&s->rax->head);
/* assign the iterator node callback before the seek, so that the
* initial nodes that are processed till the first item are covered */
ri.node_cb = defragRaxNode;
raxSeek(&ri,"^",NULL,0);
} else {
/* if cursor is non-zero, we seek to the static 'last' */
if (!raxSeek(&ri,">", last, sizeof(last))) {
*cursor = 0;
return 0;
}
/* assign the iterator node callback after the seek, so that the
* initial nodes that are processed till now aren't covered */
ri.node_cb = defragRaxNode;
}
(*cursor)++;
while (raxNext(&ri)) {
void *newdata = activeDefragAlloc(ri.data);
if (newdata)
raxSetData(ri.node, ri.data=newdata), (*defragged)++;
if (++iterations > 16) {
if (ustime() > endtime) {
serverAssert(ri.key_len==sizeof(last));
memcpy(last,ri.key,ri.key_len);
raxStop(&ri);
return 1;
}
iterations = 0;
}
}
raxStop(&ri);
*cursor = 0;
return 0;
}
/* optional callback used defrag each rax element (not including the element pointer itself) */
typedef void *(raxDefragFunction)(raxIterator *ri, void *privdata, long *defragged);
/* defrag radix tree including:
* 1) rax struct
* 2) rax nodes
* 3) rax entry data (only if defrag_data is specified)
* 4) call a callback per element, and allow the callback to return a new pointer for the element */
long defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_cb, void *element_cb_data) {
long defragged = 0;
raxIterator ri;
rax* rax;
if ((rax = activeDefragAlloc(*raxref)))
defragged++, *raxref = rax;
rax = *raxref;
raxStart(&ri,rax);
ri.node_cb = defragRaxNode;
defragRaxNode(&rax->head);
raxSeek(&ri,"^",NULL,0);
while (raxNext(&ri)) {
void *newdata = NULL;
if (element_cb)
newdata = element_cb(&ri, element_cb_data, &defragged);
if (defrag_data && !newdata)
newdata = activeDefragAlloc(ri.data);
if (newdata)
raxSetData(ri.node, ri.data=newdata), defragged++;
}
raxStop(&ri);
return defragged;
}
typedef struct {
streamCG *cg;
streamConsumer *c;
} PendingEntryContext;
void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata, long *defragged) {
UNUSED(defragged);
PendingEntryContext *ctx = privdata;
streamNACK *nack = ri->data, *newnack;
nack->consumer = ctx->c; /* update nack pointer to consumer */
newnack = activeDefragAlloc(nack);
if (newnack) {
/* update consumer group pointer to the nack */
void *prev;
raxInsert(ctx->cg->pel, ri->key, ri->key_len, newnack, &prev);
serverAssert(prev==nack);
/* note: we don't increment 'defragged' that's done by the caller */
}
return newnack;
}
void* defragStreamConsumer(raxIterator *ri, void *privdata, long *defragged) {
streamConsumer *c = ri->data;
streamCG *cg = privdata;
void *newc = activeDefragAlloc(c);
if (newc) {
/* note: we don't increment 'defragged' that's done by the caller */
c = newc;
}
sds newsds = activeDefragSds(c->name);
if (newsds)
(*defragged)++, c->name = newsds;
if (c->pel) {
PendingEntryContext pel_ctx = {cg, c};
*defragged += defragRadixTree(&c->pel, 0, defragStreamConsumerPendingEntry, &pel_ctx);
}
return newc; /* returns NULL if c was not defragged */
}
void* defragStreamConsumerGroup(raxIterator *ri, void *privdata, long *defragged) {
streamCG *cg = ri->data;
UNUSED(privdata);
if (cg->consumers)
*defragged += defragRadixTree(&cg->consumers, 0, defragStreamConsumer, cg);
if (cg->pel)
*defragged += defragRadixTree(&cg->pel, 0, NULL, NULL);
return NULL;
}
long defragStream(redisDb *db, dictEntry *kde) {
long defragged = 0;
robj *ob = dictGetVal(kde);
serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM);
stream *s = ob->ptr, *news;
/* handle the main struct */
if ((news = activeDefragAlloc(s)))
defragged++, ob->ptr = s = news;
if (raxSize(s->rax) > server.active_defrag_max_scan_fields) {
rax *newrax = activeDefragAlloc(s->rax);
if (newrax)
defragged++, s->rax = newrax;
defragLater(db, kde);
} else
defragged += defragRadixTree(&s->rax, 1, NULL, NULL);
if (s->cgroups)
defragged += defragRadixTree(&s->cgroups, 1, defragStreamConsumerGroup, NULL);
return defragged;
}
/* for each key we scan in the main dict, this function will attempt to defrag
* all the various pointers it has. Returns a stat of how many pointers were
* moved. */
@ -660,6 +825,8 @@ long defragKey(redisDb *db, dictEntry *de) {
} else {
serverPanic("Unknown hash encoding");
}
} else if (ob->type == OBJ_STREAM) {
defragged += defragStream(db, de);
} else if (ob->type == OBJ_MODULE) {
/* Currently defragmenting modules private data types
* is not supported. */
@ -680,7 +847,7 @@ void defragScanCallback(void *privdata, const dictEntry *de) {
server.stat_active_defrag_scanned++;
}
/* Defrag scan callback for for each hash table bicket,
/* Defrag scan callback for each hash table bicket,
* used in order to defrag the dictEntry allocations. */
void defragDictBucketCallback(void *privdata, dictEntry **bucketref) {
UNUSED(privdata); /* NOTE: this function is also used by both activeDefragCycle and scanLaterHash, etc. don't use privdata */
@ -728,27 +895,29 @@ long defragOtherGlobals() {
return defragged;
}
unsigned long defragLaterItem(dictEntry *de, unsigned long cursor) {
long defragged = 0;
/* returns 0 more work may or may not be needed (see non-zero cursor),
* and 1 if time is up and more work is needed. */
int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime) {
if (de) {
robj *ob = dictGetVal(de);
if (ob->type == OBJ_LIST) {
defragged += scanLaterList(ob);
cursor = 0; /* list has no scan, we must finish it in one go */
server.stat_active_defrag_hits += scanLaterList(ob);
*cursor = 0; /* list has no scan, we must finish it in one go */
} else if (ob->type == OBJ_SET) {
defragged += scanLaterSet(ob, &cursor);
server.stat_active_defrag_hits += scanLaterSet(ob, cursor);
} else if (ob->type == OBJ_ZSET) {
defragged += scanLaterZset(ob, &cursor);
server.stat_active_defrag_hits += scanLaterZset(ob, cursor);
} else if (ob->type == OBJ_HASH) {
defragged += scanLaterHash(ob, &cursor);
server.stat_active_defrag_hits += scanLaterHash(ob, cursor);
} else if (ob->type == OBJ_STREAM) {
return scanLaterStraemListpacks(ob, cursor, endtime, &server.stat_active_defrag_hits);
} else {
cursor = 0; /* object type may have changed since we schedule it for later */
*cursor = 0; /* object type may have changed since we schedule it for later */
}
} else {
cursor = 0; /* object may have been deleted already */
*cursor = 0; /* object may have been deleted already */
}
server.stat_active_defrag_hits += defragged;
return cursor;
return 0;
}
/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */
@ -788,17 +957,22 @@ int defragLaterStep(redisDb *db, long long endtime) {
dictEntry *de = dictFind(db->dict, current_key);
key_defragged = server.stat_active_defrag_hits;
do {
cursor = defragLaterItem(de, cursor);
int quit = 0;
if (defragLaterItem(de, &cursor, endtime))
quit = 1; /* time is up, we didn't finish all the work */
/* Don't start a new BIG key in this loop, this is because the
* next key can be a list, and scanLaterList must be done in once cycle */
if (!cursor)
quit = 1;
/* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields
* (if we have a lot of pointers in one hash bucket, or rehashing),
* check if we reached the time limit.
* But regardless, don't start a new BIG key in this loop, this is because the
* next key can be a list, and scanLaterList must be done in once cycle */
if (!cursor || (++iterations > 16 ||
* check if we reached the time limit. */
if (quit || (++iterations > 16 ||
server.stat_active_defrag_hits - prev_defragged > 512 ||
server.stat_active_defrag_scanned - prev_scanned > 64)) {
if (!cursor || ustime() > endtime) {
if (quit || ustime() > endtime) {
if(key_defragged != server.stat_active_defrag_hits)
server.stat_active_defrag_key_hits++;
else