MIGRATE: fix replies processing and argument rewriting.

We need to process replies after errors in order to delete keys
successfully transferred. Also argument rewriting was fixed since
it was broken in several ways. Now a fresh argument vector is created
and set if we are acknowledged of at least one key.
This commit is contained in:
antirez
2015-12-11 13:48:41 +01:00
parent d6bc17c254
commit 2b74b9857b
2 changed files with 40 additions and 15 deletions

View File

@ -4742,16 +4742,22 @@ try_again:
/* Read the RESTORE replies. */
int error_from_target = 0;
int del_idx = 1; /* Index of the key argument for the replicated DEL op. */
robj **newargv;
if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));
for (j = 0; j < num_keys; j++) {
if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0)
goto socket_err;
if (error_from_target) continue; /* Just consume the replies. */
if ((select && buf1[0] == '-') || buf2[0] == '-') {
/* On error assume that last_dbid is no longer valid. */
cs->last_dbid = -1;
addReplyErrorFormat(c,"Target instance replied with error: %s",
(select && buf1[0] == '-') ? buf1+1 : buf2+1);
error_from_target = 1;
if (!error_from_target) {
cs->last_dbid = -1;
addReplyErrorFormat(c,"Target instance replied with error: %s",
(select && buf1[0] == '-') ? buf1+1 : buf2+1);
error_from_target = 1;
}
} else {
if (!copy) {
robj *aux;
@ -4761,17 +4767,23 @@ try_again:
signalModifiedKey(c->db,kv[j]);
server.dirty++;
/* Translate MIGRATE as DEL for replication/AOF. */
if (j == 0) {
aux = createStringObject("DEL",3);
rewriteClientCommandArgument(c,0,aux);
decrRefCount(aux);
}
rewriteClientCommandArgument(c,j+1,kv[j]);
/* Populate the argument vector to replace the old one. */
newargv[del_idx++] = kv[j];
}
}
}
if (!copy) {
/* Translate MIGRATE as DEL for replication/AOF. */
if (del_idx > 1) {
newargv[0] = createStringObject("DEL",3);
replaceClientCommandVector(c,newargv,del_idx);
} else {
/* No key transfer acknowledged, no need to rewrite as DEL. */
zfree(newargv);
}
}
if (!error_from_target) {
/* Update the last_dbid in migrateCachedSocket and reply +OK. */
cs->last_dbid = dbid;