Allow compression of interior quicklist nodes

Let user set how many nodes to *not* compress.

We can specify a compression "depth" of how many nodes
to leave uncompressed on each end of the quicklist.

Depth 0 = disable compression.
Depth 1 = only leave head/tail uncompressed.
  - (read as: "skip 1 node on each end of the list before compressing")
Depth 2 = leave head, head->next, tail->prev, tail uncompressed.
  - ("skip 2 nodes on each end of the list before compressing")
Depth 3 = Depth 2 + head->next->next + tail->prev->prev
  - ("skip 3 nodes...")
etc.

This also:
  - updates RDB storage to use native quicklist compression (if node is
    already compressed) instead of uncompressing, generating the RDB string,
    then re-compressing the quicklist node.
  - internalizes the "fill" parameter for the quicklist so we don't
    need to pass it to _every_ function.  Now it's just a property of
    the list.
  - allows a runtime-configurable compression option, so we can
    expose a compresion parameter in the configuration file if people
    want to trade slight request-per-second performance for up to 90%+
    memory savings in some situations.
  - updates the quicklist tests to do multiple passes: 200k+ tests now.
This commit is contained in:
Matt Stancliff
2014-12-10 21:26:31 -05:00
parent 5127e39980
commit abdd1414a8
5 changed files with 1451 additions and 962 deletions

View File

@ -43,10 +43,7 @@ void listTypePush(robj *subject, robj *value, int where) {
int pos = (where == REDIS_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
value = getDecodedObject(value);
size_t len = sdslen(value->ptr);
size_t zlen = server.list_max_ziplist_entries;
/* If this value is greater than our allowed values, create it in
* an isolated ziplist */
quicklistPush(subject->ptr, zlen, value->ptr, len, pos);
quicklistPush(subject->ptr, value->ptr, len, pos);
decrRefCount(value);
} else {
redisPanic("Unknown list encoding");
@ -146,12 +143,11 @@ void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
value = getDecodedObject(value);
sds str = value->ptr;
size_t len = sdslen(str);
size_t zlen = server.list_max_ziplist_entries;
if (where == REDIS_TAIL) {
quicklistInsertAfter((quicklist *)entry->entry.quicklist, zlen,
quicklistInsertAfter((quicklist *)entry->entry.quicklist,
&entry->entry, str, len);
} else if (where == REDIS_HEAD) {
quicklistInsertBefore((quicklist *)entry->entry.quicklist, zlen,
quicklistInsertBefore((quicklist *)entry->entry.quicklist,
&entry->entry, str, len);
}
decrRefCount(value);
@ -188,7 +184,7 @@ void listTypeConvert(robj *subject, int enc) {
size_t zlen = server.list_max_ziplist_entries;
subject->encoding = REDIS_ENCODING_QUICKLIST;
subject->ptr = quicklistCreateFromZiplist(zlen, subject->ptr);
subject->ptr = quicklistCreateFromZiplist(zlen, 0 /*FIXME*/, subject->ptr);
} else {
redisPanic("Unsupported list conversion");
}
@ -211,6 +207,8 @@ void pushGenericCommand(redisClient *c, int where) {
c->argv[j] = tryObjectEncoding(c->argv[j]);
if (!lobj) {
lobj = createQuicklistObject();
quicklistSetFill(lobj->ptr, server.list_max_ziplist_entries);
quicklistSetCompress(lobj->ptr, 0 /*FIXME*/);
dbAdd(c->db,c->argv[1],lobj);
}
listTypePush(lobj,c->argv[j],where);
@ -539,6 +537,8 @@ void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value
/* Create the list if the key does not exist */
if (!dstobj) {
dstobj = createQuicklistObject();
quicklistSetFill(dstobj->ptr, server.list_max_ziplist_entries);
quicklistSetCompress(dstobj->ptr, 0 /*FIXME*/);
dbAdd(c->db,dstkey,dstobj);
}
signalModifiedKey(c->db,dstkey);