Safer handling of MULTI/EXEC on errors.

After the transcation starts with a MULIT, the previous behavior was to
return an error on problems such as maxmemory limit reached. But still
to execute the transaction with the subset of queued commands on EXEC.

While it is true that the client was able to check for errors
distinguish QUEUED by an error reply, MULTI/EXEC in most client
implementations uses pipelining for speed, so all the commands and EXEC
are sent without caring about replies.

With this change:

1) EXEC fails if at least one command was not queued because of an
error. The EXECABORT error is used.
2) A generic error is always reported on EXEC.
3) The client DISCARDs the MULTI state after a failed EXEC, otherwise
pipelining multiple transactions would be basically impossible:
After a failed EXEC the next transaction would be simply queued as
the tail of the previous transaction.
This commit is contained in:
antirez
2012-11-15 20:11:05 +01:00
parent dff1ec4a36
commit 60f9dac672
3 changed files with 44 additions and 21 deletions

View File

@ -1037,6 +1037,8 @@ void createSharedObjects(void) {
"-READONLY You can't write against a read only slave.\r\n"));
shared.oomerr = createObject(REDIS_STRING,sdsnew(
"-OOM command not allowed when used memory > 'maxmemory'.\r\n"));
shared.execaborterr = createObject(REDIS_STRING,sdsnew(
"-EXECABORT Transaction discarded because of previous errors.\r\n"));
shared.space = createObject(REDIS_STRING,sdsnew(" "));
shared.colon = createObject(REDIS_STRING,sdsnew(":"));
shared.plus = createObject(REDIS_STRING,sdsnew("+"));
@ -1553,11 +1555,13 @@ int processCommand(redisClient *c) {
* such as wrong arity, bad command name and so forth. */
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) {
flagTransaction(c);
addReplyErrorFormat(c,"unknown command '%s'",
(char*)c->argv[0]->ptr);
return REDIS_OK;
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
flagTransaction(c);
addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return REDIS_OK;
@ -1566,6 +1570,7 @@ int processCommand(redisClient *c) {
/* Check if the user is authenticated */
if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
{
flagTransaction(c);
addReplyError(c,"operation not permitted");
return REDIS_OK;
}
@ -1578,6 +1583,7 @@ int processCommand(redisClient *c) {
if (server.maxmemory) {
int retval = freeMemoryIfNeeded();
if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
flagTransaction(c);
addReply(c, shared.oomerr);
return REDIS_OK;
}
@ -1589,6 +1595,7 @@ int processCommand(redisClient *c) {
&& server.lastbgsave_status == REDIS_ERR &&
c->cmd->flags & REDIS_CMD_WRITE)
{
flagTransaction(c);
addReply(c, shared.bgsaveerr);
return REDIS_OK;
}
@ -1620,6 +1627,7 @@ int processCommand(redisClient *c) {
server.repl_serve_stale_data == 0 &&
!(c->cmd->flags & REDIS_CMD_STALE))
{
flagTransaction(c);
addReply(c, shared.masterdownerr);
return REDIS_OK;
}
@ -1641,6 +1649,7 @@ int processCommand(redisClient *c) {
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
{
flagTransaction(c);
addReply(c, shared.slowscripterr);
return REDIS_OK;
}