mirror of
https://github.com/fluencelabs/redis
synced 2025-06-21 13:01:32 +00:00
more step forwards for disk store to be able to run
This commit is contained in:
62
src/debug.c
62
src/debug.c
@ -200,60 +200,28 @@ void debugCommand(redisClient *c) {
|
|||||||
} else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
|
} else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
|
||||||
dictEntry *de = dictFind(c->db->dict,c->argv[2]->ptr);
|
dictEntry *de = dictFind(c->db->dict,c->argv[2]->ptr);
|
||||||
robj *val;
|
robj *val;
|
||||||
|
char *strenc;
|
||||||
|
char *storage;
|
||||||
|
|
||||||
if (!de) {
|
if (!de) {
|
||||||
addReply(c,shared.nokeyerr);
|
addReply(c,shared.nokeyerr);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
val = dictGetEntryVal(de);
|
val = dictGetEntryVal(de);
|
||||||
if (!server.vm_enabled || (val->storage == REDIS_VM_MEMORY ||
|
strenc = strEncoding(val->encoding);
|
||||||
val->storage == REDIS_VM_SWAPPING)) {
|
switch(val->storage) {
|
||||||
char *strenc;
|
case REDIS_DS_MEMORY: storage = "memory"; break;
|
||||||
|
case REDIS_DS_DIRTY: storage = "dirty"; break;
|
||||||
strenc = strEncoding(val->encoding);
|
case REDIS_DS_SAVING: storage = "saving"; break;
|
||||||
addReplyStatusFormat(c,
|
default: storage = "unknown"; break;
|
||||||
"Value at:%p refcount:%d "
|
|
||||||
"encoding:%s serializedlength:%lld "
|
|
||||||
"lru:%d lru_seconds_idle:%lu",
|
|
||||||
(void*)val, val->refcount,
|
|
||||||
strenc, (long long) rdbSavedObjectLen(val),
|
|
||||||
val->lru, estimateObjectIdleTime(val));
|
|
||||||
} else {
|
|
||||||
vmpointer *vp = (vmpointer*) val;
|
|
||||||
addReplyStatusFormat(c,
|
|
||||||
"Value swapped at: page %llu "
|
|
||||||
"using %llu pages",
|
|
||||||
(unsigned long long) vp->page,
|
|
||||||
(unsigned long long) vp->usedpages);
|
|
||||||
}
|
|
||||||
} else if (!strcasecmp(c->argv[1]->ptr,"swapin") && c->argc == 3) {
|
|
||||||
lookupKeyRead(c->db,c->argv[2]);
|
|
||||||
addReply(c,shared.ok);
|
|
||||||
} else if (!strcasecmp(c->argv[1]->ptr,"swapout") && c->argc == 3) {
|
|
||||||
dictEntry *de = dictFind(c->db->dict,c->argv[2]->ptr);
|
|
||||||
robj *val;
|
|
||||||
vmpointer *vp;
|
|
||||||
|
|
||||||
if (!server.vm_enabled) {
|
|
||||||
addReplyError(c,"Virtual Memory is disabled");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (!de) {
|
|
||||||
addReply(c,shared.nokeyerr);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
val = dictGetEntryVal(de);
|
|
||||||
/* Swap it */
|
|
||||||
if (val->storage != REDIS_VM_MEMORY) {
|
|
||||||
addReplyError(c,"This key is not in memory");
|
|
||||||
} else if (val->refcount != 1) {
|
|
||||||
addReplyError(c,"Object is shared");
|
|
||||||
} else if ((vp = vmSwapObjectBlocking(val)) != NULL) {
|
|
||||||
dictGetEntryVal(de) = vp;
|
|
||||||
addReply(c,shared.ok);
|
|
||||||
} else {
|
|
||||||
addReply(c,shared.err);
|
|
||||||
}
|
}
|
||||||
|
addReplyStatusFormat(c,
|
||||||
|
"Value at:%p refcount:%d "
|
||||||
|
"encoding:%s serializedlength:%lld "
|
||||||
|
"lru:%d lru_seconds_idle:%lu storage:%s",
|
||||||
|
(void*)val, val->refcount,
|
||||||
|
strenc, (long long) rdbSavedObjectLen(val),
|
||||||
|
val->lru, estimateObjectIdleTime(val), storage);
|
||||||
} else if (!strcasecmp(c->argv[1]->ptr,"populate") && c->argc == 3) {
|
} else if (!strcasecmp(c->argv[1]->ptr,"populate") && c->argc == 3) {
|
||||||
long keys, j;
|
long keys, j;
|
||||||
robj *key, *val;
|
robj *key, *val;
|
||||||
|
@ -115,5 +115,8 @@ int dsSet(redisDb *db, robj *key, robj *val) {
|
|||||||
robj *dsGet(redisDb *db, robj *key) {
|
robj *dsGet(redisDb *db, robj *key) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int dsDel(redisDb *db, robj *key) {
|
||||||
|
}
|
||||||
|
|
||||||
int dsExists(redisDb *db, robj *key) {
|
int dsExists(redisDb *db, robj *key) {
|
||||||
}
|
}
|
||||||
|
100
src/dscache.c
100
src/dscache.c
@ -227,6 +227,7 @@ int cacheFreeOneEntry(void) {
|
|||||||
dbDelete(best_db,kobj);
|
dbDelete(best_db,kobj);
|
||||||
decrRefCount(kobj);
|
decrRefCount(kobj);
|
||||||
}
|
}
|
||||||
|
return REDIS_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Return true if it's safe to swap out objects in a given moment.
|
/* Return true if it's safe to swap out objects in a given moment.
|
||||||
@ -240,7 +241,8 @@ int dsCanTouchDiskStore(void) {
|
|||||||
|
|
||||||
void freeIOJob(iojob *j) {
|
void freeIOJob(iojob *j) {
|
||||||
decrRefCount(j->key);
|
decrRefCount(j->key);
|
||||||
decrRefCount(j->val);
|
/* j->val can be NULL if the job is about deleting the key from disk. */
|
||||||
|
if (j->val) decrRefCount(j->val);
|
||||||
zfree(j);
|
zfree(j);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -279,13 +281,17 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
|
|||||||
|
|
||||||
/* Post process it in the main thread, as there are things we
|
/* Post process it in the main thread, as there are things we
|
||||||
* can do just here to avoid race conditions and/or invasive locks */
|
* can do just here to avoid race conditions and/or invasive locks */
|
||||||
redisLog(REDIS_DEBUG,"COMPLETED Job type: %d, ID %p, key: %s", j->type, (void*)j->id, (unsigned char*)j->key->ptr);
|
redisLog(REDIS_DEBUG,"COMPLETED Job type %s, key: %s",
|
||||||
|
(j->type == REDIS_IOJOB_LOAD) ? "load" : "save",
|
||||||
|
(unsigned char*)j->key->ptr);
|
||||||
de = dictFind(j->db->dict,j->key->ptr);
|
de = dictFind(j->db->dict,j->key->ptr);
|
||||||
redisAssert(de != NULL);
|
redisAssert(de != NULL);
|
||||||
if (j->type == REDIS_IOJOB_LOAD) {
|
if (j->type == REDIS_IOJOB_LOAD) {
|
||||||
|
/* Create the key-value pair in the in-memory database */
|
||||||
dbAdd(j->db,j->key,j->val);
|
dbAdd(j->db,j->key,j->val);
|
||||||
|
/* Handle clients waiting for this key to be loaded. */
|
||||||
|
handleClientsBlockedOnSwappedKey(j->db,j->key);
|
||||||
freeIOJob(j);
|
freeIOJob(j);
|
||||||
/* FIXME: notify clients waiting for this key */
|
|
||||||
} else if (j->type == REDIS_IOJOB_SAVE) {
|
} else if (j->type == REDIS_IOJOB_SAVE) {
|
||||||
redisAssert(j->val->storage == REDIS_DS_SAVING);
|
redisAssert(j->val->storage == REDIS_DS_SAVING);
|
||||||
j->val->storage = REDIS_DS_MEMORY;
|
j->val->storage = REDIS_DS_MEMORY;
|
||||||
@ -330,22 +336,23 @@ void *IOThreadEntryPoint(void *arg) {
|
|||||||
j = ln->value;
|
j = ln->value;
|
||||||
listDelNode(server.io_newjobs,ln);
|
listDelNode(server.io_newjobs,ln);
|
||||||
/* Add the job in the processing queue */
|
/* Add the job in the processing queue */
|
||||||
j->thread = pthread_self();
|
|
||||||
listAddNodeTail(server.io_processing,j);
|
listAddNodeTail(server.io_processing,j);
|
||||||
ln = listLast(server.io_processing); /* We use ln later to remove it */
|
ln = listLast(server.io_processing); /* We use ln later to remove it */
|
||||||
unlockThreadedIO();
|
unlockThreadedIO();
|
||||||
redisLog(REDIS_DEBUG,"Thread %ld got a new job (type %d): %p about key '%s'",
|
redisLog(REDIS_DEBUG,"Thread %ld: new job type %s: %p about key '%s'",
|
||||||
(long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr);
|
(long) pthread_self(),
|
||||||
|
(j->type == REDIS_IOJOB_LOAD) ? "load" : "save",
|
||||||
|
(void*)j, (char*)j->key->ptr);
|
||||||
|
|
||||||
/* Process the Job */
|
/* Process the Job */
|
||||||
if (j->type == REDIS_IOJOB_LOAD) {
|
if (j->type == REDIS_IOJOB_LOAD) {
|
||||||
vmpointer *vp = (vmpointer*)j->id;
|
j->val = dsGet(j->db,j->key);
|
||||||
j->val = vmReadObjectFromSwap(j->page,vp->vtype);
|
redisAssert(j->val != NULL);
|
||||||
} else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
|
} else if (j->type == REDIS_IOJOB_SAVE) {
|
||||||
j->pages = rdbSavedObjectPages(j->val);
|
if (j->val)
|
||||||
} else if (j->type == REDIS_IOJOB_DO_SWAP) {
|
dsSet(j->db,j->key,j->val);
|
||||||
if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR)
|
else
|
||||||
j->canceled = 1;
|
dsDel(j->db,j->key);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Done: insert the job into the processed queue */
|
/* Done: insert the job into the processed queue */
|
||||||
@ -420,52 +427,50 @@ void queueIOJob(iojob *j) {
|
|||||||
spawnIOThread();
|
spawnIOThread();
|
||||||
}
|
}
|
||||||
|
|
||||||
int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) {
|
void dsCreateIOJob(int type, redisDb *db, robj *key, robj *val) {
|
||||||
iojob *j;
|
iojob *j;
|
||||||
|
|
||||||
j = zmalloc(sizeof(*j));
|
j = zmalloc(sizeof(*j));
|
||||||
j->type = REDIS_IOJOB_PREPARE_SWAP;
|
j->type = type;
|
||||||
j->db = db;
|
j->db = db;
|
||||||
j->key = key;
|
j->key = key;
|
||||||
incrRefCount(key);
|
incrRefCount(key);
|
||||||
j->id = j->val = val;
|
j->val = val;
|
||||||
incrRefCount(val);
|
incrRefCount(val);
|
||||||
j->canceled = 0;
|
|
||||||
j->thread = (pthread_t) -1;
|
|
||||||
val->storage = REDIS_VM_SWAPPING;
|
|
||||||
|
|
||||||
lockThreadedIO();
|
lockThreadedIO();
|
||||||
queueIOJob(j);
|
queueIOJob(j);
|
||||||
unlockThreadedIO();
|
unlockThreadedIO();
|
||||||
return REDIS_OK;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ============ Virtual Memory - Blocking clients on missing keys =========== */
|
/* ============ Virtual Memory - Blocking clients on missing keys =========== */
|
||||||
|
|
||||||
/* This function makes the clinet 'c' waiting for the key 'key' to be loaded.
|
/* This function makes the clinet 'c' waiting for the key 'key' to be loaded.
|
||||||
* If there is not already a job loading the key, it is craeted.
|
* If the key is already in memory we don't need to block, regardless
|
||||||
* The key is added to the io_keys list in the client structure, and also
|
* of the storage of the value object for this key:
|
||||||
|
*
|
||||||
|
* - If it's REDIS_DS_MEMORY we have the key in memory.
|
||||||
|
* - If it's REDIS_DS_DIRTY they key was modified, but still in memory.
|
||||||
|
* - if it's REDIS_DS_SAVING the key is being saved by an IO Job. When
|
||||||
|
* the client will lookup the key it will block if the key is still
|
||||||
|
* in this stage but it's more or less the best we can do.
|
||||||
|
* FIXME: we should try if it's actually better to suspend the client
|
||||||
|
* accessing an object that is being saved, and awake it only when
|
||||||
|
* the saving was completed.
|
||||||
|
*
|
||||||
|
* Otherwise if the key is not in memory, we block the client and start
|
||||||
|
* an IO Job to load it:
|
||||||
|
*
|
||||||
|
* the key is added to the io_keys list in the client structure, and also
|
||||||
* in the hash table mapping swapped keys to waiting clients, that is,
|
* in the hash table mapping swapped keys to waiting clients, that is,
|
||||||
* server.io_waited_keys. */
|
* server.io_waited_keys. */
|
||||||
int waitForSwappedKey(redisClient *c, robj *key) {
|
int waitForSwappedKey(redisClient *c, robj *key) {
|
||||||
struct dictEntry *de;
|
struct dictEntry *de;
|
||||||
robj *o;
|
|
||||||
list *l;
|
list *l;
|
||||||
|
|
||||||
/* If the key does not exist or is already in RAM we don't need to
|
/* Return ASAP if the key is in memory */
|
||||||
* block the client at all. */
|
|
||||||
de = dictFind(c->db->dict,key->ptr);
|
de = dictFind(c->db->dict,key->ptr);
|
||||||
if (de == NULL) return 0;
|
if (de != NULL) return 0;
|
||||||
o = dictGetEntryVal(de);
|
|
||||||
if (o->storage == REDIS_VM_MEMORY) {
|
|
||||||
return 0;
|
|
||||||
} else if (o->storage == REDIS_VM_SWAPPING) {
|
|
||||||
/* We were swapping the key, undo it! */
|
|
||||||
vmCancelThreadedIOJob(o);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* OK: the key is either swapped, or being loaded just now. */
|
|
||||||
|
|
||||||
/* Add the key to the list of keys this client is waiting for.
|
/* Add the key to the list of keys this client is waiting for.
|
||||||
* This maps clients to keys they are waiting for. */
|
* This maps clients to keys they are waiting for. */
|
||||||
@ -488,25 +493,8 @@ int waitForSwappedKey(redisClient *c, robj *key) {
|
|||||||
listAddNodeTail(l,c);
|
listAddNodeTail(l,c);
|
||||||
|
|
||||||
/* Are we already loading the key from disk? If not create a job */
|
/* Are we already loading the key from disk? If not create a job */
|
||||||
if (o->storage == REDIS_VM_SWAPPED) {
|
if (de == NULL)
|
||||||
iojob *j;
|
dsCreateIOJob(REDIS_IOJOB_LOAD,c->db,key,NULL);
|
||||||
vmpointer *vp = (vmpointer*)o;
|
|
||||||
|
|
||||||
o->storage = REDIS_VM_LOADING;
|
|
||||||
j = zmalloc(sizeof(*j));
|
|
||||||
j->type = REDIS_IOJOB_LOAD;
|
|
||||||
j->db = c->db;
|
|
||||||
j->id = (robj*)vp;
|
|
||||||
j->key = key;
|
|
||||||
incrRefCount(key);
|
|
||||||
j->page = vp->page;
|
|
||||||
j->val = NULL;
|
|
||||||
j->canceled = 0;
|
|
||||||
j->thread = (pthread_t) -1;
|
|
||||||
lockThreadedIO();
|
|
||||||
queueIOJob(j);
|
|
||||||
unlockThreadedIO();
|
|
||||||
}
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -584,7 +572,7 @@ int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd) {
|
|||||||
if (listLength(c->io_keys)) {
|
if (listLength(c->io_keys)) {
|
||||||
c->flags |= REDIS_IO_WAIT;
|
c->flags |= REDIS_IO_WAIT;
|
||||||
aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
|
aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
|
||||||
server.vm_blocked_clients++;
|
server.cache_blocked_clients++;
|
||||||
return 1;
|
return 1;
|
||||||
} else {
|
} else {
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -772,6 +772,7 @@ int dsOpen(void);
|
|||||||
int dsClose(void);
|
int dsClose(void);
|
||||||
int dsSet(redisDb *db, robj *key, robj *val);
|
int dsSet(redisDb *db, robj *key, robj *val);
|
||||||
robj *dsGet(redisDb *db, robj *key);
|
robj *dsGet(redisDb *db, robj *key);
|
||||||
|
int dsDel(redisDb *db, robj *key);
|
||||||
int dsExists(redisDb *db, robj *key);
|
int dsExists(redisDb *db, robj *key);
|
||||||
|
|
||||||
/* Disk Store Cache */
|
/* Disk Store Cache */
|
||||||
|
Reference in New Issue
Block a user