|
|
|
@ -2140,7 +2140,7 @@ void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|
|
|
|
* full length of the packet. When a whole packet is in memory this function
|
|
|
|
|
* will call the function to process the packet. And so forth. */
|
|
|
|
|
void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|
|
|
|
char buf[sizeof(clusterMsg)];
|
|
|
|
|
clusterMsg buf[1];
|
|
|
|
|
ssize_t nread;
|
|
|
|
|
clusterMsg *hdr;
|
|
|
|
|
clusterLink *link = (clusterLink*) privdata;
|
|
|
|
@ -2517,7 +2517,8 @@ void clusterBroadcastPong(int target) {
|
|
|
|
|
*
|
|
|
|
|
* If link is NULL, then the message is broadcasted to the whole cluster. */
|
|
|
|
|
void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
|
|
|
|
|
unsigned char buf[sizeof(clusterMsg)], *payload;
|
|
|
|
|
unsigned char *payload;
|
|
|
|
|
clusterMsg buf[1];
|
|
|
|
|
clusterMsg *hdr = (clusterMsg*) buf;
|
|
|
|
|
uint32_t totlen;
|
|
|
|
|
uint32_t channel_len, message_len;
|
|
|
|
@ -2537,7 +2538,7 @@ void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
|
|
|
|
|
|
|
|
|
|
/* Try to use the local buffer if possible */
|
|
|
|
|
if (totlen < sizeof(buf)) {
|
|
|
|
|
payload = buf;
|
|
|
|
|
payload = (unsigned char*)buf;
|
|
|
|
|
} else {
|
|
|
|
|
payload = zmalloc(totlen);
|
|
|
|
|
memcpy(payload,hdr,sizeof(*hdr));
|
|
|
|
@ -2554,7 +2555,7 @@ void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
|
|
|
|
|
|
|
|
|
|
decrRefCount(channel);
|
|
|
|
|
decrRefCount(message);
|
|
|
|
|
if (payload != buf) zfree(payload);
|
|
|
|
|
if (payload != (unsigned char*)buf) zfree(payload);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* Send a FAIL message to all the nodes we are able to contact.
|
|
|
|
@ -2563,7 +2564,7 @@ void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
|
|
|
|
|
* we switch the node state to CLUSTER_NODE_FAIL and ask all the other
|
|
|
|
|
* nodes to do the same ASAP. */
|
|
|
|
|
void clusterSendFail(char *nodename) {
|
|
|
|
|
unsigned char buf[sizeof(clusterMsg)];
|
|
|
|
|
clusterMsg buf[1];
|
|
|
|
|
clusterMsg *hdr = (clusterMsg*) buf;
|
|
|
|
|
|
|
|
|
|
clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
|
|
|
|
@ -2575,7 +2576,7 @@ void clusterSendFail(char *nodename) {
|
|
|
|
|
* slots configuration. The node name, slots bitmap, and configEpoch info
|
|
|
|
|
* are included. */
|
|
|
|
|
void clusterSendUpdate(clusterLink *link, clusterNode *node) {
|
|
|
|
|
unsigned char buf[sizeof(clusterMsg)];
|
|
|
|
|
clusterMsg buf[1];
|
|
|
|
|
clusterMsg *hdr = (clusterMsg*) buf;
|
|
|
|
|
|
|
|
|
|
if (link == NULL) return;
|
|
|
|
@ -2583,7 +2584,7 @@ void clusterSendUpdate(clusterLink *link, clusterNode *node) {
|
|
|
|
|
memcpy(hdr->data.update.nodecfg.nodename,node->name,CLUSTER_NAMELEN);
|
|
|
|
|
hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);
|
|
|
|
|
memcpy(hdr->data.update.nodecfg.slots,node->slots,sizeof(node->slots));
|
|
|
|
|
clusterSendMessage(link,buf,ntohl(hdr->totlen));
|
|
|
|
|
clusterSendMessage(link,(unsigned char*)buf,ntohl(hdr->totlen));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* Send a MODULE message.
|
|
|
|
@ -2591,7 +2592,8 @@ void clusterSendUpdate(clusterLink *link, clusterNode *node) {
|
|
|
|
|
* If link is NULL, then the message is broadcasted to the whole cluster. */
|
|
|
|
|
void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type,
|
|
|
|
|
unsigned char *payload, uint32_t len) {
|
|
|
|
|
unsigned char buf[sizeof(clusterMsg)], *heapbuf;
|
|
|
|
|
unsigned char *heapbuf;
|
|
|
|
|
clusterMsg buf[1];
|
|
|
|
|
clusterMsg *hdr = (clusterMsg*) buf;
|
|
|
|
|
uint32_t totlen;
|
|
|
|
|
|
|
|
|
@ -2606,7 +2608,7 @@ void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type,
|
|
|
|
|
|
|
|
|
|
/* Try to use the local buffer if possible */
|
|
|
|
|
if (totlen < sizeof(buf)) {
|
|
|
|
|
heapbuf = buf;
|
|
|
|
|
heapbuf = (unsigned char*)buf;
|
|
|
|
|
} else {
|
|
|
|
|
heapbuf = zmalloc(totlen);
|
|
|
|
|
memcpy(heapbuf,hdr,sizeof(*hdr));
|
|
|
|
@ -2619,7 +2621,7 @@ void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type,
|
|
|
|
|
else
|
|
|
|
|
clusterBroadcastMessage(heapbuf,totlen);
|
|
|
|
|
|
|
|
|
|
if (heapbuf != buf) zfree(heapbuf);
|
|
|
|
|
if (heapbuf != (unsigned char*)buf) zfree(heapbuf);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* This function gets a cluster node ID string as target, the same way the nodes
|
|
|
|
@ -2663,7 +2665,7 @@ void clusterPropagatePublish(robj *channel, robj *message) {
|
|
|
|
|
* Note that we send the failover request to everybody, master and slave nodes,
|
|
|
|
|
* but only the masters are supposed to reply to our query. */
|
|
|
|
|
void clusterRequestFailoverAuth(void) {
|
|
|
|
|
unsigned char buf[sizeof(clusterMsg)];
|
|
|
|
|
clusterMsg buf[1];
|
|
|
|
|
clusterMsg *hdr = (clusterMsg*) buf;
|
|
|
|
|
uint32_t totlen;
|
|
|
|
|
|
|
|
|
@ -2679,7 +2681,7 @@ void clusterRequestFailoverAuth(void) {
|
|
|
|
|
|
|
|
|
|
/* Send a FAILOVER_AUTH_ACK message to the specified node. */
|
|
|
|
|
void clusterSendFailoverAuth(clusterNode *node) {
|
|
|
|
|
unsigned char buf[sizeof(clusterMsg)];
|
|
|
|
|
clusterMsg buf[1];
|
|
|
|
|
clusterMsg *hdr = (clusterMsg*) buf;
|
|
|
|
|
uint32_t totlen;
|
|
|
|
|
|
|
|
|
@ -2687,12 +2689,12 @@ void clusterSendFailoverAuth(clusterNode *node) {
|
|
|
|
|
clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK);
|
|
|
|
|
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
|
|
|
|
|
hdr->totlen = htonl(totlen);
|
|
|
|
|
clusterSendMessage(node->link,buf,totlen);
|
|
|
|
|
clusterSendMessage(node->link,(unsigned char*)buf,totlen);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* Send a MFSTART message to the specified node. */
|
|
|
|
|
void clusterSendMFStart(clusterNode *node) {
|
|
|
|
|
unsigned char buf[sizeof(clusterMsg)];
|
|
|
|
|
clusterMsg buf[1];
|
|
|
|
|
clusterMsg *hdr = (clusterMsg*) buf;
|
|
|
|
|
uint32_t totlen;
|
|
|
|
|
|
|
|
|
@ -2700,7 +2702,7 @@ void clusterSendMFStart(clusterNode *node) {
|
|
|
|
|
clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MFSTART);
|
|
|
|
|
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
|
|
|
|
|
hdr->totlen = htonl(totlen);
|
|
|
|
|
clusterSendMessage(node->link,buf,totlen);
|
|
|
|
|
clusterSendMessage(node->link,(unsigned char*)buf,totlen);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* Vote for the node asking for our vote if there are the conditions. */
|
|
|
|
|