mirror of
https://github.com/fluencelabs/redis
synced 2025-04-26 11:02:13 +00:00
Merge pull request #6894 from madolson/csc-fixes
Client side caching fixes
This commit is contained in:
commit
155526a44e
@ -2044,7 +2044,7 @@ void clientCommand(client *c) {
|
|||||||
"REPLY (on|off|skip) -- Control the replies sent to the current connection.",
|
"REPLY (on|off|skip) -- Control the replies sent to the current connection.",
|
||||||
"SETNAME <name> -- Assign the name <name> to the current connection.",
|
"SETNAME <name> -- Assign the name <name> to the current connection.",
|
||||||
"UNBLOCK <clientid> [TIMEOUT|ERROR] -- Unblock the specified blocked client.",
|
"UNBLOCK <clientid> [TIMEOUT|ERROR] -- Unblock the specified blocked client.",
|
||||||
"TRACKING (on|off) [REDIRECT <id>] -- Enable client keys tracking for client side caching.",
|
"TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first] [PREFIX second] ... -- Enable client keys tracking for client side caching.",
|
||||||
"GETREDIR -- Return the client ID we are redirecting to when tracking is enabled.",
|
"GETREDIR -- Return the client ID we are redirecting to when tracking is enabled.",
|
||||||
NULL
|
NULL
|
||||||
};
|
};
|
||||||
@ -2233,18 +2233,30 @@ NULL
|
|||||||
|
|
||||||
if (!strcasecmp(c->argv[j]->ptr,"redirect") && moreargs) {
|
if (!strcasecmp(c->argv[j]->ptr,"redirect") && moreargs) {
|
||||||
j++;
|
j++;
|
||||||
|
if (redir != 0) {
|
||||||
|
addReplyError(c,"A client can only redirect to a single "
|
||||||
|
"other client");
|
||||||
|
zfree(prefix);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (getLongLongFromObjectOrReply(c,c->argv[j],&redir,NULL) !=
|
if (getLongLongFromObjectOrReply(c,c->argv[j],&redir,NULL) !=
|
||||||
C_OK) return;
|
C_OK)
|
||||||
|
{
|
||||||
|
zfree(prefix);
|
||||||
|
return;
|
||||||
|
}
|
||||||
/* We will require the client with the specified ID to exist
|
/* We will require the client with the specified ID to exist
|
||||||
* right now, even if it is possible that it gets disconnected
|
* right now, even if it is possible that it gets disconnected
|
||||||
* later. Still a valid sanity check. */
|
* later. Still a valid sanity check. */
|
||||||
if (lookupClientByID(redir) == NULL) {
|
if (lookupClientByID(redir) == NULL) {
|
||||||
addReplyError(c,"The client ID you want redirect to "
|
addReplyError(c,"The client ID you want redirect to "
|
||||||
"does not exist");
|
"does not exist");
|
||||||
|
zfree(prefix);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else if (!strcasecmp(c->argv[j]->ptr,"bcast")) {
|
} else if (!strcasecmp(c->argv[j]->ptr,"bcast")) {
|
||||||
bcast++;
|
bcast = 1;
|
||||||
} else if (!strcasecmp(c->argv[j]->ptr,"prefix") && moreargs) {
|
} else if (!strcasecmp(c->argv[j]->ptr,"prefix") && moreargs) {
|
||||||
j++;
|
j++;
|
||||||
prefix = zrealloc(prefix,sizeof(robj*)*(numprefix+1));
|
prefix = zrealloc(prefix,sizeof(robj*)*(numprefix+1));
|
||||||
|
@ -44,7 +44,7 @@
|
|||||||
rax *TrackingTable = NULL;
|
rax *TrackingTable = NULL;
|
||||||
rax *PrefixTable = NULL;
|
rax *PrefixTable = NULL;
|
||||||
uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across
|
uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across
|
||||||
the whole tracking table. This givesn
|
the whole tracking table. This gives
|
||||||
an hint about the total memory we
|
an hint about the total memory we
|
||||||
are using server side for CSC. */
|
are using server side for CSC. */
|
||||||
robj *TrackingChannelName;
|
robj *TrackingChannelName;
|
||||||
@ -145,9 +145,9 @@ void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, s
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function is called after the excution of a readonly command in the
|
/* This function is called after the execution of a readonly command in the
|
||||||
* case the client 'c' has keys tracking enabled. It will populate the
|
* case the client 'c' has keys tracking enabled. It will populate the
|
||||||
* tracking ivalidation table according to the keys the user fetched, so that
|
* tracking invalidation table according to the keys the user fetched, so that
|
||||||
* Redis will know what are the clients that should receive an invalidation
|
* Redis will know what are the clients that should receive an invalidation
|
||||||
* message with certain groups of keys are modified. */
|
* message with certain groups of keys are modified. */
|
||||||
void trackingRememberKeys(client *c) {
|
void trackingRememberKeys(client *c) {
|
||||||
@ -292,19 +292,12 @@ void trackingInvalidateKey(robj *keyobj) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* This function is called when one or all the Redis databases are flushed
|
/* This function is called when one or all the Redis databases are flushed
|
||||||
* (dbid == -1 in case of FLUSHALL). Caching slots are not specific for
|
* (dbid == -1 in case of FLUSHALL). Caching keys are not specific for
|
||||||
* each DB but are global: currently what we do is sending a special
|
* each DB but are global: currently what we do is send a special
|
||||||
* notification to clients with tracking enabled, invalidating the caching
|
* notification to clients with tracking enabled, invalidating the caching
|
||||||
* slot "-1", which means, "all the keys", in order to avoid flooding clients
|
* key "", which means, "all the keys", in order to avoid flooding clients
|
||||||
* with many invalidation messages for all the keys they may hold.
|
* with many invalidation messages for all the keys they may hold.
|
||||||
*
|
*/
|
||||||
* However trying to flush the tracking table here is very costly:
|
|
||||||
* we need scanning 16 million caching slots in the table to check
|
|
||||||
* if they are used, this introduces a big delay. So what we do is to really
|
|
||||||
* flush the table in the case of FLUSHALL. When a FLUSHDB is called instead
|
|
||||||
* we just send the invalidation message to all the clients, but don't
|
|
||||||
* flush the table: it will slowly get garbage collected as more keys
|
|
||||||
* are modified in the used caching slots. */
|
|
||||||
void freeTrackingRadixTree(void *rt) {
|
void freeTrackingRadixTree(void *rt) {
|
||||||
raxFree(rt);
|
raxFree(rt);
|
||||||
}
|
}
|
||||||
@ -325,6 +318,7 @@ void trackingInvalidateKeysOnFlush(int dbid) {
|
|||||||
/* In case of FLUSHALL, reclaim all the memory used by tracking. */
|
/* In case of FLUSHALL, reclaim all the memory used by tracking. */
|
||||||
if (dbid == -1 && TrackingTable) {
|
if (dbid == -1 && TrackingTable) {
|
||||||
raxFreeWithCallback(TrackingTable,freeTrackingRadixTree);
|
raxFreeWithCallback(TrackingTable,freeTrackingRadixTree);
|
||||||
|
TrackingTable = raxNew();
|
||||||
TrackingTableTotalItems = 0;
|
TrackingTableTotalItems = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user