Modules Cluster API: message bus implementation.

This commit is contained in:
antirez
2018-03-29 15:13:31 +02:00
parent 8ac7af1c5d
commit 0701cad3de
4 changed files with 111 additions and 5 deletions

View File

@ -75,6 +75,7 @@ void clusterDelNode(clusterNode *delnode);
sds representClusterNodeFlags(sds ci, uint16_t flags);
uint64_t clusterGetMaxEpoch(void);
int clusterBumpConfigEpochWithoutConsensus(void);
void moduleCallClusterReceivers(char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len);
/* -----------------------------------------------------------------------------
* Initialization
@ -1682,6 +1683,12 @@ int clusterProcessPacket(clusterLink *link) {
explen += sizeof(clusterMsgDataUpdate);
if (totlen != explen) return 1;
} else if (type == CLUSTERMSG_TYPE_MODULE) {
uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataPublish) -
3 + ntohl(hdr->data.module.msg.len);
if (totlen != explen) return 1;
}
/* Check if the sender is a known node. */
@ -2076,6 +2083,15 @@ int clusterProcessPacket(clusterLink *link) {
* config accordingly. */
clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,
hdr->data.update.nodecfg.slots);
} else if (type == CLUSTERMSG_TYPE_MODULE) {
if (!sender) return 1; /* Protect the module from unknown nodes. */
/* We need to route this message back to the right module subscribed
* for the right message type. */
uint64_t module_id = hdr->data.module.msg.module_id; /* Endian-safe ID */
uint32_t len = ntohl(hdr->data.module.msg.len);
uint8_t type = hdr->data.module.msg.type;
unsigned char *payload = hdr->data.module.msg.bulk_data;
moduleCallClusterReceivers(sender->name,module_id,type,payload,len);
} else {
serverLog(LL_WARNING,"Received unknown packet type: %d", type);
}
@ -2563,6 +2579,61 @@ void clusterSendUpdate(clusterLink *link, clusterNode *node) {
clusterSendMessage(link,buf,ntohl(hdr->totlen));
}
/* Send a MODULE message.
*
* If link is NULL, then the message is broadcasted to the whole cluster. */
void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type,
unsigned char *payload, uint32_t len) {
unsigned char buf[sizeof(clusterMsg)], *heapbuf;
clusterMsg *hdr = (clusterMsg*) buf;
uint32_t totlen;
clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MODULE);
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgModule) - 3 + len;
hdr->data.module.msg.module_id = module_id; /* Already endian adjusted. */
hdr->data.module.msg.type = type;
hdr->data.module.msg.len = htonl(len);
hdr->totlen = htonl(totlen);
/* Try to use the local buffer if possible */
if (totlen < sizeof(buf)) {
heapbuf = buf;
} else {
heapbuf = zmalloc(totlen);
memcpy(heapbuf,hdr,sizeof(*hdr));
hdr = (clusterMsg*) heapbuf;
}
memcpy(hdr->data.module.msg.bulk_data,payload,len);
if (link)
clusterSendMessage(link,heapbuf,totlen);
else
clusterBroadcastMessage(heapbuf,totlen);
if (heapbuf != buf) zfree(heapbuf);
}
/* This function gets a cluster node ID string as target, the same way the nodes
* addresses are represented in the modules side, resolves the node, and sends
* the message. If the target is NULL the message is broadcasted.
*
* The function returns C_OK if the target is valid, otherwise C_ERR is
* returned. */
int clusterSendModuleMessageToTarget(char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len) {
clusterNode *node = NULL;
if (target != NULL) {
node = clusterLookupNode(target);
if (node == NULL || node->link == NULL) return C_ERR;
}
clusterSendModule(target ? node->link : NULL,
module_id, type, payload, len);
return C_OK;
}
/* -----------------------------------------------------------------------------
* CLUSTER Pub/Sub support
*
@ -4008,6 +4079,7 @@ const char *clusterGetMessageTypeString(int type) {
case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK: return "auth-ack";
case CLUSTERMSG_TYPE_UPDATE: return "update";
case CLUSTERMSG_TYPE_MFSTART: return "mfstart";
case CLUSTERMSG_TYPE_MODULE: return "module";
}
return "unknown";
}