redis.c split into many different C files.

networking related stuff moved into networking.c

moved more code

more work on layout of source code

SDS instantaneous memory saving. By Pieter and Salvatore at VMware ;)

cleanly compiling again after the first split, now splitting it in more C files

moving more things around... work in progress

split replication code

splitting more

Sets split

Hash split

replication split

even more splitting

more splitting

minor change
This commit is contained in:
antirez
2010-06-22 00:07:48 +02:00
parent c2ff0e90b8
commit e2641e09cc
65 changed files with 11811 additions and 12084 deletions

475
src/replication.c Normal file
View File

@ -0,0 +1,475 @@
#include "redis.h"
#include <sys/time.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/stat.h>
/* Queue the command described by argv/argc, encoded as a multi bulk
 * request, on the output of every client in 'slaves'.
 *
 * 'dictid' is the id of the database the command applies to: when it differs
 * from the database last selected for a given slave (slave->slaveseldb), a
 * SELECT command is queued first and slaveseldb is updated.
 *
 * Slaves whose replstate is REDIS_REPL_WAIT_BGSAVE_START are skipped. */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
    /* The encoded command needs 1+(argc*3) objects: one "*<count>\r\n"
     * header, then "$<len>\r\n" + payload + "\r\n" for every argument.
     * Small commands use the stack vector, larger ones a heap vector. */
    robj *stackv[REDIS_STATIC_ARGS*3+1];
    robj **outv = stackv;
    robj *hdr;
    listIter iter;
    listNode *node;
    int outc = 0, i;

    if (argc > REDIS_STATIC_ARGS)
        outv = zmalloc(sizeof(robj*)*(argc*3+1));

    /* Multi bulk count header. */
    hdr = createObject(REDIS_STRING,
        sdscatprintf(sdsempty(), "*%d\r\n", argc));
    hdr->refcount = 0;
    outv[outc++] = hdr;

    /* One bulk length header, the argument itself and a CRLF per argument. */
    for (i = 0; i < argc; i++) {
        unsigned long arglen = (unsigned long) stringObjectLen(argv[i]);

        hdr = createObject(REDIS_STRING,
            sdscatprintf(sdsempty(),"$%lu\r\n",arglen));
        hdr->refcount = 0;
        outv[outc++] = hdr;
        outv[outc++] = argv[i];
        outv[outc++] = shared.crlf;
    }

    /* Take a reference on every object for the duration of the fan-out and
     * drop it at the end: this way the freshly created headers are released
     * even when no slave is in a state where it can be fed. */
    for (i = 0; i < outc; i++) incrRefCount(outv[i]);

    listRewind(slaves,&iter);
    while ((node = listNext(&iter)) != NULL) {
        redisClient *slave = node->value;

        /* Slaves still waiting for a BGSAVE to start are not fed. */
        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;

        /* Switch the slave to the right database when needed. */
        if (slave->slaveseldb != dictid) {
            robj *selectcmd;

            switch (dictid) {
            case 0: selectcmd = shared.select0; break;
            case 1: selectcmd = shared.select1; break;
            case 2: selectcmd = shared.select2; break;
            case 3: selectcmd = shared.select3; break;
            case 4: selectcmd = shared.select4; break;
            case 5: selectcmd = shared.select5; break;
            case 6: selectcmd = shared.select6; break;
            case 7: selectcmd = shared.select7; break;
            case 8: selectcmd = shared.select8; break;
            case 9: selectcmd = shared.select9; break;
            default:
                /* No shared object for this database id: build one. */
                selectcmd = createObject(REDIS_STRING,
                    sdscatprintf(sdsempty(),"select %d\r\n",dictid));
                selectcmd->refcount = 0;
                break;
            }
            addReply(slave,selectcmd);
            slave->slaveseldb = dictid;
        }

        for (i = 0; i < outc; i++) addReply(slave,outv[i]);
    }

    for (i = 0; i < outc; i++) decrRefCount(outv[i]);
    if (outv != stackv) zfree(outv);
}
/* Build a one-line textual representation of the command argv/argc and
 * queue it on the output of every client in 'monitors'.
 *
 * The line has the form:
 *
 *   +<seconds>.<microseconds> [(db <dictid>) ]<arg> <arg> ...\r\n
 *
 * The "(db N)" part is emitted only when dictid is not 0. Arguments with
 * REDIS_ENCODING_INT encoding are printed as numbers, all the others are
 * appended through sdscatrepr(). */
void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc) {
    listNode *ln;
    listIter li;
    int j;
    sds cmdrepr = sdsnew("+");
    robj *cmdobj;
    struct timeval tv;

    gettimeofday(&tv,NULL);
    /* The microseconds field must be zero padded to six digits, otherwise
     * 1 second and 5 microseconds would be printed as "1.5". */
    cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
    if (dictid != 0) cmdrepr = sdscatprintf(cmdrepr,"(db %d) ", dictid);

    for (j = 0; j < argc; j++) {
        if (argv[j]->encoding == REDIS_ENCODING_INT) {
            /* With this encoding the integer is stored in the pointer. */
            cmdrepr = sdscatprintf(cmdrepr, "%ld", (long)argv[j]->ptr);
        } else {
            cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr,
                        sdslen(argv[j]->ptr));
        }
        /* Single space between arguments, none after the last one. */
        if (j != argc-1)
            cmdrepr = sdscatlen(cmdrepr," ",1);
    }
    cmdrepr = sdscatlen(cmdrepr,"\r\n",2);
    cmdobj = createObject(REDIS_STRING,cmdrepr);

    listRewind(monitors,&li);
    while((ln = listNext(&li))) {
        redisClient *monitor = ln->value;
        addReply(monitor,cmdobj);
    }
    /* Drop the reference obtained from createObject(). */
    decrRefCount(cmdobj);
}
/* Write exactly 'size' bytes from 'ptr' to 'fd', waiting for the descriptor
 * to become writable in slices of at most 1000 milliseconds.
 *
 * Returns 'size' on success. Returns -1 if write(2) fails, or, with errno
 * set to ETIMEDOUT, if more than timeout+1 seconds elapsed since the call
 * started. Note that the elapsed time is checked after every iteration,
 * including the one that wrote the last bytes. */
int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
    const ssize_t total = size;
    const time_t began = time(NULL);
    ssize_t n;

    timeout++;
    while (size != 0) {
        int writable = aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE;

        if (writable) {
            n = write(fd,ptr,size);
            if (n == -1) return -1;
            ptr += n;
            size -= n;
        }
        if ((time(NULL)-began) > timeout) {
            errno = ETIMEDOUT;
            return -1;
        }
    }
    return total;
}
/* Read exactly 'size' bytes from 'fd' into 'ptr', waiting for the descriptor
 * to become readable in slices of at most 1000 milliseconds.
 *
 * Returns the number of bytes read (that is, 'size') on success.
 * Returns -1 when:
 *  - read(2) fails (errno is the one set by read(2));
 *  - the peer closed the connection before 'size' bytes were received
 *    (errno is left untouched in this case);
 *  - more than timeout+1 seconds elapsed since the call started (errno is
 *    set to ETIMEDOUT). */
int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread, totread = 0;
    time_t start = time(NULL);

    timeout++;
    while(size) {
        if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
            nread = read(fd,ptr,size);
            if (nread == -1) return -1;
            /* read(2) returning zero means end of file: no more data will
             * ever arrive, and the descriptor stays readable, so without
             * this check the loop would spin until the timeout expires. */
            if (nread == 0) return -1;
            ptr += nread;
            size -= nread;
            totread += nread;
        }
        if ((time(NULL)-start) > timeout) {
            errno = ETIMEDOUT;
            return -1;
        }
    }
    return totread;
}
/* Read a line from 'fd', one byte at a time via syncRead(), storing it as a
 * nul terminated string into 'ptr', a buffer of 'size' bytes.
 *
 * Reading stops at the first '\n', which is not stored; a '\r' immediately
 * preceding it is stripped as well. Reading also stops once size-1 bytes
 * have been stored, so the buffer is never overrun and the result is always
 * nul terminated.
 *
 * 'timeout' is passed to every syncRead() call, so it applies to each byte
 * and not to the whole line.
 *
 * Returns the number of bytes stored before the line terminator was seen or
 * the buffer filled up (a stripped '\r' is still counted), or -1 if
 * syncRead() failed or 'size' is not positive. */
int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    /* There is not even room for the nul terminator. */
    if (size <= 0) return -1;
    *ptr = '\0';

    size--; /* Reserve one byte for the nul terminator. */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
            /* Account for the byte just stored: without this the loop
             * would keep writing past the end of the buffer. */
            size--;
        }
    }
    return nread;
}
/* SYNC command implementation: register the client 'c' as a slave and
 * arrange for a background save whose result will be transferred to it.
 *
 * Depending on the state of the server the client ends up either in
 * REDIS_REPL_WAIT_BGSAVE_END (a usable BGSAVE is running or was just
 * started) or in REDIS_REPL_WAIT_BGSAVE_START (a BGSAVE is running but
 * nobody is accumulating the differences, so the next one is needed). */
void syncCommand(redisClient *c) {
    /* Nothing to do for a client that already has the REDIS_SLAVE flag. */
    if (c->flags & REDIS_SLAVE) return;

    /* SYNC is refused when replies to previous commands are still queued
     * for this client: the reply buffer must start empty so that it only
     * records the differences between the BGSAVE and the current dataset,
     * and can be copied to other slaves if needed. */
    if (listLength(c->reply) != 0) {
        addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
        return;
    }

    redisLog(REDIS_NOTICE,"Slave ask for synchronization");

    if (server.bgsavechildpid == -1) {
        /* No background save in progress: start one right now. */
        redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
        if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
            redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
            addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
            return;
        }
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
    } else {
        /* A background save is running. It is usable only if some other
         * slave has been accumulating the differences since it started,
         * that is, a slave in the WAIT_BGSAVE_END state. */
        redisClient *donor = NULL;
        listIter iter;
        listNode *node;

        listRewind(server.slaves,&iter);
        while ((node = listNext(&iter)) != NULL) {
            redisClient *candidate = node->value;

            if (candidate->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
                donor = candidate;
                break;
            }
        }

        if (donor != NULL) {
            /* Replace our reply list with a copy of the one accumulated
             * by the other slave and wait for the same BGSAVE. */
            listRelease(c->reply);
            c->reply = listDup(donor->reply);
            c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
        } else {
            /* The running BGSAVE can't be used: wait for the next one. */
            c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
            redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
        }
    }

    /* Turn the client into a slave and register it. */
    c->repldbfd = -1;
    c->flags |= REDIS_SLAVE;
    c->slaveseldb = 0;
    listAddNodeTail(server.slaves,c);
}
/* File event handler sending the next chunk of the DB file to a slave.
 *
 * 'privdata' is the slave client and 'fd' the descriptor to write to. At
 * every call at most REDIS_IOBUF_LEN bytes are read from slave->repldbfd at
 * offset slave->repldboff and written to 'fd'; the offset advances by the
 * number of bytes actually written. On the first call (offset zero) the
 * "$<size>\r\n" bulk header is sent first.
 *
 * When the whole file has been sent the DB descriptor is closed, the slave
 * is put in the REDIS_REPL_ONLINE state and sendReplyToClient is installed
 * as the new writable handler. The slave is freed on any error. */
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *slave = privdata;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    char chunk[REDIS_IOBUF_LEN];
    ssize_t sent, chunklen;

    if (slave->repldboff == 0) {
        /* Send the bulk length before the payload. In theory we don't know
         * how much room is left in the socket output buffer, but in
         * practice SO_SNDLOWAT (the minimum count for output operations)
         * is never smaller than the few bytes needed here. */
        sds header = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
            slave->repldbsize);
        int complete =
            (write(fd,header,sdslen(header)) == (signed)sdslen(header));

        sdsfree(header);
        if (!complete) {
            freeClient(slave);
            return;
        }
    }

    /* Read the next chunk of the DB file from the current offset. */
    lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
    chunklen = read(slave->repldbfd,chunk,REDIS_IOBUF_LEN);
    if (chunklen <= 0) {
        redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
            (chunklen == 0) ? "premature EOF" : strerror(errno));
        freeClient(slave);
        return;
    }

    /* A short write is fine: the offset only advances by what was sent. */
    sent = write(fd,chunk,chunklen);
    if (sent == -1) {
        redisLog(REDIS_VERBOSE,"Write error sending DB to slave: %s",
            strerror(errno));
        freeClient(slave);
        return;
    }
    slave->repldboff += sent;

    /* More data to send: wait for the next writable event. */
    if (slave->repldboff != slave->repldbsize) return;

    /* Transfer completed: switch the slave to the normal reply handler. */
    close(slave->repldbfd);
    slave->repldbfd = -1;
    aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
    slave->replstate = REDIS_REPL_ONLINE;
    if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
        sendReplyToClient, slave) == AE_ERR) {
        freeClient(slave);
        return;
    }
    addReplySds(slave,sdsempty());
    redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
}
/* This function is called at the end of every background saving.
 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
 * otherwise REDIS_ERR is passed to the function.
 *
 * The goal of this function is to handle slaves waiting for a successful
 * background saving in order to perform non-blocking synchronization:
 *
 * - Slaves in the WAIT_BGSAVE_END state were waiting for the save that just
 *   terminated. On success the DB file is opened and sendBulkToSlave is
 *   installed as their writable handler, otherwise they are freed.
 * - Slaves in the WAIT_BGSAVE_START state need a new save: they are moved
 *   to WAIT_BGSAVE_END and a new background save is started. If it can't be
 *   started these slaves are freed. */
void updateSlavesWaitingBgsave(int bgsaveerr) {
    listNode *ln;
    int startbgsave = 0;
    listIter li;

    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;

        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
            startbgsave = 1;
            slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
            struct redis_stat buf;

            if (bgsaveerr != REDIS_OK) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
                continue;
            }
            slave->repldbfd = open(server.dbfilename,O_RDONLY);
            if (slave->repldbfd == -1 ||
                redis_fstat(slave->repldbfd,&buf) == -1) {
                /* Log before any other call can overwrite errno. */
                redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
                /* If open() succeeded but fstat() failed, release the
                 * descriptor here and reset the field, so it is neither
                 * leaked nor closed twice. */
                if (slave->repldbfd != -1) {
                    close(slave->repldbfd);
                    slave->repldbfd = -1;
                }
                freeClient(slave);
                continue;
            }
            slave->repldboff = 0;
            slave->repldbsize = buf.st_size;
            slave->replstate = REDIS_REPL_SEND_BULK;
            aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
                freeClient(slave);
                continue;
            }
        }
    }
    if (startbgsave) {
        if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
            listIter li;

            listRewind(server.slaves,&li);
            redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
            while((ln = listNext(&li))) {
                redisClient *slave = ln->value;

                /* The loop above already moved every slave that needed a
                 * new save from WAIT_BGSAVE_START to WAIT_BGSAVE_END, while
                 * the slaves that were in WAIT_BGSAVE_END before this call
                 * are now either in SEND_BULK or freed. So the slaves to
                 * release are exactly the ones in WAIT_BGSAVE_END. */
                if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END)
                    freeClient(slave);
            }
        }
    }
}
/* Perform a blocking synchronization with the master configured in
 * server.masterhost / server.masterport.
 *
 * Steps: connect, authenticate with AUTH if server.masterauth is set, send
 * SYNC, read the "$<count>" bulk header, receive <count> bytes into a
 * temporary file, rename it to server.dbfilename, empty the current dataset
 * and load the new file. On success the connection is turned into the
 * server.master client and server.replstate is set to REDIS_REPL_CONNECTED.
 *
 * Returns REDIS_OK on success, REDIS_ERR on failure. On failure the
 * connection is closed; if the failure happens while the payload is being
 * received the temporary file is removed as well. */
int syncWithMaster(void) {
    char buf[1024], tmpfile[256], authcmd[1024];
    long dumpsize;
    int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
    int dfd = -1, maxtries = 5;

    if (fd == -1) {
        redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
            strerror(errno));
        return REDIS_ERR;
    }

    /* AUTH with the master if required. */
    if(server.masterauth) {
        /* Use the length of what was actually formatted: deriving it from
         * the password length would read past the buffer when the command
         * does not fit. */
        int authlen = snprintf(authcmd,sizeof(authcmd),"AUTH %s\r\n",
            server.masterauth);

        if (authlen < 0 || (size_t)authlen >= sizeof(authcmd)) {
            redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: the masterauth password is too long");
            close(fd);
            return REDIS_ERR;
        }
        /* In every error path below the message is logged before close(),
         * since close() may overwrite errno. */
        if (syncWrite(fd,authcmd,authlen,5) == -1) {
            redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
                strerror(errno));
            close(fd);
            return REDIS_ERR;
        }
        /* Read the AUTH result. */
        if (syncReadLine(fd,buf,1024,3600) == -1) {
            redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
                strerror(errno));
            close(fd);
            return REDIS_ERR;
        }
        if (buf[0] != '+') {
            close(fd);
            redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
            return REDIS_ERR;
        }
    }

    /* Issue the SYNC command */
    if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
        redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
            strerror(errno));
        close(fd);
        return REDIS_ERR;
    }
    /* Read the bulk write count */
    if (syncReadLine(fd,buf,1024,3600) == -1) {
        redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
            strerror(errno));
        close(fd);
        return REDIS_ERR;
    }
    if (buf[0] != '$') {
        close(fd);
        redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
        return REDIS_ERR;
    }
    dumpsize = strtol(buf+1,NULL,10);
    /* A negative count would be converted into a huge length when passed
     * to read(2) below, well beyond the size of the buffer. */
    if (dumpsize < 0) {
        close(fd);
        redisLog(REDIS_WARNING,"Bad protocol from MASTER, invalid bulk count %ld",dumpsize);
        return REDIS_ERR;
    }
    redisLog(REDIS_NOTICE,"Receiving %ld bytes data dump from MASTER",dumpsize);

    /* Read the bulk write data on a temp file */
    while(maxtries--) {
        snprintf(tmpfile,256,
            "temp-%d.%ld.rdb",(int)time(NULL),(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        if (dfd != -1) break;
        sleep(1);
    }
    if (dfd == -1) {
        redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
        close(fd);
        return REDIS_ERR;
    }
    while(dumpsize) {
        ssize_t nread, nwritten, done = 0;

        nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
        /* Zero means the master closed the connection: without handling it
         * as an error this loop would never terminate. */
        if (nread <= 0) {
            redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
                (nread == -1) ? strerror(errno) : "connection lost");
            close(fd);
            close(dfd);
            unlink(tmpfile);
            return REDIS_ERR;
        }
        /* write(2) may accept less than requested: keep writing until the
         * whole chunk is on the temp file. */
        while (done < nread) {
            nwritten = write(dfd,buf+done,nread-done);
            if (nwritten == -1) {
                redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
                close(fd);
                close(dfd);
                unlink(tmpfile);
                return REDIS_ERR;
            }
            done += nwritten;
        }
        dumpsize -= nread;
    }
    close(dfd);
    if (rename(tmpfile,server.dbfilename) == -1) {
        redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
        unlink(tmpfile);
        close(fd);
        return REDIS_ERR;
    }
    emptyDb();
    if (rdbLoad(server.dbfilename) != REDIS_OK) {
        redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
        close(fd);
        return REDIS_ERR;
    }
    /* The connection with the master becomes a regular client. */
    server.master = createClient(fd);
    server.master->flags |= REDIS_MASTER;
    server.master->authenticated = 1;
    server.replstate = REDIS_REPL_CONNECTED;
    return REDIS_OK;
}
/* SLAVEOF command implementation.
 *
 * "SLAVEOF NO ONE" (case insensitive) turns replication off: if a master
 * host is configured it is forgotten, the master client, if any, is freed
 * and the replication state is reset to REDIS_REPL_NONE.
 *
 * Any other pair of arguments is taken as <host> <port>: they replace the
 * configured master, the current master client, if any, is freed, and the
 * replication state is set to REDIS_REPL_CONNECT.
 *
 * The reply is always shared.ok. */
void slaveofCommand(redisClient *c) {
    int detach = !strcasecmp(c->argv[1]->ptr,"no") &&
                 !strcasecmp(c->argv[2]->ptr,"one");

    if (!detach) {
        /* Point this instance to the new master. */
        sdsfree(server.masterhost);
        server.masterhost = sdsdup(c->argv[1]->ptr);
        server.masterport = atoi(c->argv[2]->ptr);
        if (server.master) freeClient(server.master);
        server.replstate = REDIS_REPL_CONNECT;
        redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
            server.masterhost, server.masterport);
    } else if (server.masterhost) {
        /* Stop being a slave. Nothing to do when no master is set. */
        sdsfree(server.masterhost);
        server.masterhost = NULL;
        if (server.master) freeClient(server.master);
        server.replstate = REDIS_REPL_NONE;
        redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
    }
    addReply(c,shared.ok);
}