dataspace change hook functions backported to 2.4. They are just wrappers at this point but this makes simpler to cherry-pick between branches.

This commit is contained in:
antirez
2011-07-12 09:56:41 +02:00
parent 91654a40e7
commit 19453556d4
7 changed files with 63 additions and 44 deletions

View File

@ -103,7 +103,7 @@ void setKey(redisDb *db, robj *key, robj *val) {
} }
incrRefCount(val); incrRefCount(val);
removeExpire(db,key); removeExpire(db,key);
touchWatchedKey(db,key); signalModifiedKey(db,key);
} }
int dbExists(redisDb *db, robj *key) { int dbExists(redisDb *db, robj *key) {
@ -169,20 +169,37 @@ int selectDb(redisClient *c, int id) {
return REDIS_OK; return REDIS_OK;
} }
/*-----------------------------------------------------------------------------
* Hooks for key space changes.
*
* Every time a key in the database is modified the function
* signalModifiedKey() is called.
*
* Every time a DB is flushed the function signalFlushDb() is called.
*----------------------------------------------------------------------------*/
void signalModifiedKey(redisDb *db, robj *key) {
touchWatchedKey(db,key);
}
void signalFlushedDb(int dbid) {
touchWatchedKeysOnFlush(dbid);
}
/*----------------------------------------------------------------------------- /*-----------------------------------------------------------------------------
* Type agnostic commands operating on the key space * Type agnostic commands operating on the key space
*----------------------------------------------------------------------------*/ *----------------------------------------------------------------------------*/
void flushdbCommand(redisClient *c) { void flushdbCommand(redisClient *c) {
server.dirty += dictSize(c->db->dict); server.dirty += dictSize(c->db->dict);
touchWatchedKeysOnFlush(c->db->id); signalFlushedDb(c->db->id);
dictEmpty(c->db->dict); dictEmpty(c->db->dict);
dictEmpty(c->db->expires); dictEmpty(c->db->expires);
addReply(c,shared.ok); addReply(c,shared.ok);
} }
void flushallCommand(redisClient *c) { void flushallCommand(redisClient *c) {
touchWatchedKeysOnFlush(-1); signalFlushedDb(-1);
server.dirty += emptyDb(); server.dirty += emptyDb();
addReply(c,shared.ok); addReply(c,shared.ok);
if (server.bgsavechildpid != -1) { if (server.bgsavechildpid != -1) {
@ -198,7 +215,7 @@ void delCommand(redisClient *c) {
for (j = 1; j < c->argc; j++) { for (j = 1; j < c->argc; j++) {
if (dbDelete(c->db,c->argv[j])) { if (dbDelete(c->db,c->argv[j])) {
touchWatchedKey(c->db,c->argv[j]); signalModifiedKey(c->db,c->argv[j]);
server.dirty++; server.dirty++;
deleted++; deleted++;
} }
@ -346,8 +363,8 @@ void renameGenericCommand(redisClient *c, int nx) {
dbAdd(c->db,c->argv[2],o); dbAdd(c->db,c->argv[2],o);
} }
dbDelete(c->db,c->argv[1]); dbDelete(c->db,c->argv[1]);
touchWatchedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
touchWatchedKey(c->db,c->argv[2]); signalModifiedKey(c->db,c->argv[2]);
server.dirty++; server.dirty++;
addReply(c,nx ? shared.cone : shared.ok); addReply(c,nx ? shared.cone : shared.ok);
} }
@ -510,13 +527,13 @@ void expireGenericCommand(redisClient *c, robj *key, robj *param, long offset) {
if (seconds <= 0 && !server.loading) { if (seconds <= 0 && !server.loading) {
if (dbDelete(c->db,key)) server.dirty++; if (dbDelete(c->db,key)) server.dirty++;
addReply(c, shared.cone); addReply(c, shared.cone);
touchWatchedKey(c->db,key); signalModifiedKey(c->db,key);
return; return;
} else { } else {
time_t when = time(NULL)+seconds; time_t when = time(NULL)+seconds;
setExpire(c->db,key,when); setExpire(c->db,key,when);
addReply(c,shared.cone); addReply(c,shared.cone);
touchWatchedKey(c->db,key); signalModifiedKey(c->db,key);
server.dirty++; server.dirty++;
return; return;
} }

View File

@ -917,6 +917,8 @@ robj *dbRandomKey(redisDb *db);
int dbDelete(redisDb *db, robj *key); int dbDelete(redisDb *db, robj *key);
long long emptyDb(); long long emptyDb();
int selectDb(redisClient *c, int id); int selectDb(redisClient *c, int id);
void signalModifiedKey(redisDb *db, robj *key);
void signalFlushedDb(int dbid);
/* Git SHA1 */ /* Git SHA1 */
char *redisGitSHA1(void); char *redisGitSHA1(void);

View File

@ -279,7 +279,7 @@ void hsetCommand(redisClient *c) {
hashTypeTryObjectEncoding(o,&c->argv[2], &c->argv[3]); hashTypeTryObjectEncoding(o,&c->argv[2], &c->argv[3]);
update = hashTypeSet(o,c->argv[2],c->argv[3]); update = hashTypeSet(o,c->argv[2],c->argv[3]);
addReply(c, update ? shared.czero : shared.cone); addReply(c, update ? shared.czero : shared.cone);
touchWatchedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
server.dirty++; server.dirty++;
} }
@ -294,7 +294,7 @@ void hsetnxCommand(redisClient *c) {
hashTypeTryObjectEncoding(o,&c->argv[2], &c->argv[3]); hashTypeTryObjectEncoding(o,&c->argv[2], &c->argv[3]);
hashTypeSet(o,c->argv[2],c->argv[3]); hashTypeSet(o,c->argv[2],c->argv[3]);
addReply(c, shared.cone); addReply(c, shared.cone);
touchWatchedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
server.dirty++; server.dirty++;
} }
} }
@ -315,7 +315,7 @@ void hmsetCommand(redisClient *c) {
hashTypeSet(o,c->argv[i],c->argv[i+1]); hashTypeSet(o,c->argv[i],c->argv[i+1]);
} }
addReply(c, shared.ok); addReply(c, shared.ok);
touchWatchedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
server.dirty++; server.dirty++;
} }
@ -342,7 +342,7 @@ void hincrbyCommand(redisClient *c) {
hashTypeSet(o,c->argv[2],new); hashTypeSet(o,c->argv[2],new);
decrRefCount(new); decrRefCount(new);
addReplyLongLong(c,value); addReplyLongLong(c,value);
touchWatchedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
server.dirty++; server.dirty++;
} }
@ -408,7 +408,7 @@ void hdelCommand(redisClient *c) {
} }
} }
if (deleted) { if (deleted) {
touchWatchedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
server.dirty += deleted; server.dirty += deleted;
} }
addReplyLongLong(c,deleted); addReplyLongLong(c,deleted);

View File

@ -286,7 +286,7 @@ void pushGenericCommand(redisClient *c, int where) {
pushed++; pushed++;
} }
addReplyLongLong(c,addlen + (lobj ? listTypeLength(lobj) : 0)); addReplyLongLong(c,addlen + (lobj ? listTypeLength(lobj) : 0));
if (pushed) touchWatchedKey(c->db,c->argv[1]); if (pushed) signalModifiedKey(c->db,c->argv[1]);
server.dirty += pushed; server.dirty += pushed;
} }
@ -335,7 +335,7 @@ void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
if (subject->encoding == REDIS_ENCODING_ZIPLIST && if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
ziplistLen(subject->ptr) > server.list_max_ziplist_entries) ziplistLen(subject->ptr) > server.list_max_ziplist_entries)
listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST); listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
touchWatchedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
server.dirty++; server.dirty++;
} else { } else {
/* Notify client of a failed insert */ /* Notify client of a failed insert */
@ -344,7 +344,7 @@ void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
} }
} else { } else {
listTypePush(subject,val,where); listTypePush(subject,val,where);
touchWatchedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
server.dirty++; server.dirty++;
} }
@ -432,7 +432,7 @@ void lsetCommand(redisClient *c) {
o->ptr = ziplistInsert(o->ptr,p,value->ptr,sdslen(value->ptr)); o->ptr = ziplistInsert(o->ptr,p,value->ptr,sdslen(value->ptr));
decrRefCount(value); decrRefCount(value);
addReply(c,shared.ok); addReply(c,shared.ok);
touchWatchedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
server.dirty++; server.dirty++;
} }
} else if (o->encoding == REDIS_ENCODING_LINKEDLIST) { } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
@ -444,7 +444,7 @@ void lsetCommand(redisClient *c) {
listNodeValue(ln) = value; listNodeValue(ln) = value;
incrRefCount(value); incrRefCount(value);
addReply(c,shared.ok); addReply(c,shared.ok);
touchWatchedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
server.dirty++; server.dirty++;
} }
} else { } else {
@ -463,7 +463,7 @@ void popGenericCommand(redisClient *c, int where) {
addReplyBulk(c,value); addReplyBulk(c,value);
decrRefCount(value); decrRefCount(value);
if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]); if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
touchWatchedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
server.dirty++; server.dirty++;
} }
} }
@ -578,7 +578,7 @@ void ltrimCommand(redisClient *c) {
redisPanic("Unknown list encoding"); redisPanic("Unknown list encoding");
} }
if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]); if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
touchWatchedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
server.dirty++; server.dirty++;
addReply(c,shared.ok); addReply(c,shared.ok);
} }
@ -621,7 +621,7 @@ void lremCommand(redisClient *c) {
if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]); if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]);
addReplyLongLong(c,removed); addReplyLongLong(c,removed);
if (removed) touchWatchedKey(c->db,c->argv[1]); if (removed) signalModifiedKey(c->db,c->argv[1]);
} }
/* This is the semantic of this command: /* This is the semantic of this command:
@ -649,7 +649,7 @@ void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey,
dstobj = createZiplistObject(); dstobj = createZiplistObject();
dbAdd(c->db,dstkey,dstobj); dbAdd(c->db,dstkey,dstobj);
} else { } else {
touchWatchedKey(c->db,dstkey); signalModifiedKey(c->db,dstkey);
} }
listTypePush(dstobj,value,REDIS_HEAD); listTypePush(dstobj,value,REDIS_HEAD);
/* If we are pushing as a result of LPUSH against a key /* If we are pushing as a result of LPUSH against a key
@ -698,7 +698,7 @@ void rpoplpushCommand(redisClient *c) {
/* Delete the source list when it is empty */ /* Delete the source list when it is empty */
if (listTypeLength(sobj) == 0) dbDelete(c->db,touchedkey); if (listTypeLength(sobj) == 0) dbDelete(c->db,touchedkey);
touchWatchedKey(c->db,touchedkey); signalModifiedKey(c->db,touchedkey);
decrRefCount(touchedkey); decrRefCount(touchedkey);
server.dirty++; server.dirty++;
} }

View File

@ -235,7 +235,7 @@ void saddCommand(redisClient *c) {
c->argv[j] = tryObjectEncoding(c->argv[j]); c->argv[j] = tryObjectEncoding(c->argv[j]);
if (setTypeAdd(set,c->argv[j])) added++; if (setTypeAdd(set,c->argv[j])) added++;
} }
if (added) touchWatchedKey(c->db,c->argv[1]); if (added) signalModifiedKey(c->db,c->argv[1]);
server.dirty += added; server.dirty += added;
addReplyLongLong(c,added); addReplyLongLong(c,added);
} }
@ -257,7 +257,7 @@ void sremCommand(redisClient *c) {
} }
} }
if (deleted) { if (deleted) {
touchWatchedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
server.dirty += deleted; server.dirty += deleted;
} }
addReplyLongLong(c,deleted); addReplyLongLong(c,deleted);
@ -294,8 +294,8 @@ void smoveCommand(redisClient *c) {
/* Remove the src set from the database when empty */ /* Remove the src set from the database when empty */
if (setTypeSize(srcset) == 0) dbDelete(c->db,c->argv[1]); if (setTypeSize(srcset) == 0) dbDelete(c->db,c->argv[1]);
touchWatchedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
touchWatchedKey(c->db,c->argv[2]); signalModifiedKey(c->db,c->argv[2]);
server.dirty++; server.dirty++;
/* Create the destination set when it doesn't exist */ /* Create the destination set when it doesn't exist */
@ -356,7 +356,7 @@ void spopCommand(redisClient *c) {
addReplyBulk(c,ele); addReplyBulk(c,ele);
if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]); if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);
touchWatchedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
server.dirty++; server.dirty++;
} }
@ -397,7 +397,7 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum,
zfree(sets); zfree(sets);
if (dstkey) { if (dstkey) {
if (dbDelete(c->db,dstkey)) { if (dbDelete(c->db,dstkey)) {
touchWatchedKey(c->db,dstkey); signalModifiedKey(c->db,dstkey);
server.dirty++; server.dirty++;
} }
addReply(c,shared.czero); addReply(c,shared.czero);
@ -502,7 +502,7 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum,
decrRefCount(dstset); decrRefCount(dstset);
addReply(c,shared.czero); addReply(c,shared.czero);
} }
touchWatchedKey(c->db,dstkey); signalModifiedKey(c->db,dstkey);
server.dirty++; server.dirty++;
} else { } else {
setDeferredMultiBulkLength(c,replylen,cardinality); setDeferredMultiBulkLength(c,replylen,cardinality);
@ -594,7 +594,7 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *
decrRefCount(dstset); decrRefCount(dstset);
addReply(c,shared.czero); addReply(c,shared.czero);
} }
touchWatchedKey(c->db,dstkey); signalModifiedKey(c->db,dstkey);
server.dirty++; server.dirty++;
} }
zfree(sets); zfree(sets);

View File

@ -142,7 +142,7 @@ void setbitCommand(redisClient *c) {
byteval &= ~(1 << bit); byteval &= ~(1 << bit);
byteval |= ((on & 0x1) << bit); byteval |= ((on & 0x1) << bit);
((char*)o->ptr)[byte] = byteval; ((char*)o->ptr)[byte] = byteval;
touchWatchedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
server.dirty++; server.dirty++;
addReply(c, bitval ? shared.cone : shared.czero); addReply(c, bitval ? shared.cone : shared.czero);
} }
@ -230,7 +230,7 @@ void setrangeCommand(redisClient *c) {
if (sdslen(value) > 0) { if (sdslen(value) > 0) {
o->ptr = sdsgrowzero(o->ptr,offset+sdslen(value)); o->ptr = sdsgrowzero(o->ptr,offset+sdslen(value));
memcpy((char*)o->ptr+offset,value,sdslen(value)); memcpy((char*)o->ptr+offset,value,sdslen(value));
touchWatchedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
server.dirty++; server.dirty++;
} }
addReplyLongLong(c,sdslen(o->ptr)); addReplyLongLong(c,sdslen(o->ptr));
@ -347,7 +347,7 @@ void incrDecrCommand(redisClient *c, long long incr) {
dbOverwrite(c->db,c->argv[1],new); dbOverwrite(c->db,c->argv[1],new);
else else
dbAdd(c->db,c->argv[1],new); dbAdd(c->db,c->argv[1],new);
touchWatchedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
server.dirty++; server.dirty++;
addReply(c,shared.colon); addReply(c,shared.colon);
addReply(c,new); addReply(c,new);
@ -410,7 +410,7 @@ void appendCommand(redisClient *c) {
o->ptr = sdscatlen(o->ptr,append->ptr,sdslen(append->ptr)); o->ptr = sdscatlen(o->ptr,append->ptr,sdslen(append->ptr));
totlen = sdslen(o->ptr); totlen = sdslen(o->ptr);
} }
touchWatchedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
server.dirty++; server.dirty++;
addReplyLongLong(c,totlen); addReplyLongLong(c,totlen);
} }

View File

@ -882,7 +882,7 @@ void zaddGenericCommand(redisClient *c, int incr) {
zobj->ptr = zzlDelete(zobj->ptr,eptr); zobj->ptr = zzlDelete(zobj->ptr,eptr);
zobj->ptr = zzlInsert(zobj->ptr,ele,score); zobj->ptr = zzlInsert(zobj->ptr,ele,score);
touchWatchedKey(c->db,key); signalModifiedKey(c->db,key);
server.dirty++; server.dirty++;
} }
} else { } else {
@ -894,7 +894,7 @@ void zaddGenericCommand(redisClient *c, int incr) {
if (sdslen(ele->ptr) > server.zset_max_ziplist_value) if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
zsetConvert(zobj,REDIS_ENCODING_SKIPLIST); zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
touchWatchedKey(c->db,key); signalModifiedKey(c->db,key);
server.dirty++; server.dirty++;
if (!incr) added++; if (!incr) added++;
} }
@ -929,7 +929,7 @@ void zaddGenericCommand(redisClient *c, int incr) {
incrRefCount(curobj); /* Re-inserted in skiplist. */ incrRefCount(curobj); /* Re-inserted in skiplist. */
dictGetEntryVal(de) = &znode->score; /* Update score ptr. */ dictGetEntryVal(de) = &znode->score; /* Update score ptr. */
touchWatchedKey(c->db,key); signalModifiedKey(c->db,key);
server.dirty++; server.dirty++;
} }
} else { } else {
@ -938,7 +938,7 @@ void zaddGenericCommand(redisClient *c, int incr) {
redisAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK); redisAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
incrRefCount(ele); /* Added to dictionary. */ incrRefCount(ele); /* Added to dictionary. */
touchWatchedKey(c->db,key); signalModifiedKey(c->db,key);
server.dirty++; server.dirty++;
if (!incr) added++; if (!incr) added++;
} }
@ -1010,7 +1010,7 @@ void zremCommand(redisClient *c) {
} }
if (deleted) { if (deleted) {
touchWatchedKey(c->db,key); signalModifiedKey(c->db,key);
server.dirty += deleted; server.dirty += deleted;
} }
addReplyLongLong(c,deleted); addReplyLongLong(c,deleted);
@ -1043,7 +1043,7 @@ void zremrangebyscoreCommand(redisClient *c) {
redisPanic("Unknown sorted set encoding"); redisPanic("Unknown sorted set encoding");
} }
if (deleted) touchWatchedKey(c->db,key); if (deleted) signalModifiedKey(c->db,key);
server.dirty += deleted; server.dirty += deleted;
addReplyLongLong(c,deleted); addReplyLongLong(c,deleted);
} }
@ -1091,7 +1091,7 @@ void zremrangebyrankCommand(redisClient *c) {
redisPanic("Unknown sorted set encoding"); redisPanic("Unknown sorted set encoding");
} }
if (deleted) touchWatchedKey(c->db,key); if (deleted) signalModifiedKey(c->db,key);
server.dirty += deleted; server.dirty += deleted;
addReplyLongLong(c,deleted); addReplyLongLong(c,deleted);
} }
@ -1623,7 +1623,7 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
zuiClearIterator(&src[i]); zuiClearIterator(&src[i]);
if (dbDelete(c->db,dstkey)) { if (dbDelete(c->db,dstkey)) {
touchWatchedKey(c->db,dstkey); signalModifiedKey(c->db,dstkey);
touched = 1; touched = 1;
server.dirty++; server.dirty++;
} }
@ -1635,7 +1635,7 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
dbAdd(c->db,dstkey,dstobj); dbAdd(c->db,dstkey,dstobj);
addReplyLongLong(c,zsetLength(dstobj)); addReplyLongLong(c,zsetLength(dstobj));
if (!touched) touchWatchedKey(c->db,dstkey); if (!touched) signalModifiedKey(c->db,dstkey);
server.dirty++; server.dirty++;
} else { } else {
decrRefCount(dstobj); decrRefCount(dstobj);