mirror of
https://github.com/fluencelabs/redis
synced 2025-06-14 09:41:21 +00:00
Added basic support for clusters to redis-benchmark.
This commit is contained in:
@ -48,16 +48,19 @@
|
||||
#include "adlist.h"
|
||||
#include "zmalloc.h"
|
||||
#include "atomicvar.h"
|
||||
#include "crc16_slottable.h"
|
||||
|
||||
#define UNUSED(V) ((void) V)
|
||||
#define RANDPTR_INITIAL_SIZE 8
|
||||
#define MAX_LATENCY_PRECISION 3
|
||||
#define MAX_THREADS 16
|
||||
#define CLUSTER_SLOTS 16384
|
||||
|
||||
#define CLIENT_GET_EVENTLOOP(c) \
|
||||
(c->thread_id >= 0 ? config.threads[c->thread_id]->el : config.el)
|
||||
|
||||
struct benchmarkThread;
|
||||
struct clusterNode;
|
||||
|
||||
static struct config {
|
||||
aeEventLoop *el;
|
||||
@ -92,6 +95,10 @@ static struct config {
|
||||
int precision;
|
||||
int num_threads;
|
||||
struct benchmarkThread **threads;
|
||||
int cluster_mode;
|
||||
int cluster_node_count;
|
||||
struct clusterNode **cluster_nodes;
|
||||
struct clusterNode *cluster_slots[CLUSTER_SLOTS];
|
||||
/* Thread mutexes to be used as fallbacks by atomicvar.h */
|
||||
pthread_mutex_t requests_issued_mutex;
|
||||
pthread_mutex_t requests_finished_mutex;
|
||||
@ -123,6 +130,23 @@ typedef struct benchmarkThread {
|
||||
aeEventLoop *el;
|
||||
} benchmarkThread;
|
||||
|
||||
/* Cluster. */
|
||||
typedef struct clusterNode {
|
||||
char *ip;
|
||||
int port;
|
||||
sds name;
|
||||
int flags;
|
||||
sds replicate; /* Master ID if node is a slave */
|
||||
uint8_t slots[CLUSTER_SLOTS];
|
||||
int slots_count;
|
||||
int replicas_count;
|
||||
sds *migrating; /* An array of sds where even strings are slots and odd
|
||||
* strings are the destination node IDs. */
|
||||
sds *importing; /* An array of sds where even strings are slots and odd
|
||||
* strings are the source node IDs. */
|
||||
int migrating_count; /* Length of the migrating array (migrating slots*2) */
|
||||
int importing_count; /* Length of the importing array (importing slots*2) */
|
||||
} clusterNode;
|
||||
|
||||
/* Prototypes */
|
||||
static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask);
|
||||
@ -131,6 +155,7 @@ static benchmarkThread *createBenchmarkThread(int index);
|
||||
static void freeBenchmarkThread(benchmarkThread *thread);
|
||||
static void freeBenchmarkThreads();
|
||||
static void *execBenchmarkThread(void *ptr);
|
||||
static clusterNode *createClusterNode(char *ip, int port);
|
||||
int showThroughput(struct aeEventLoop *eventLoop, long long id,
|
||||
void *clientData);
|
||||
|
||||
@ -373,10 +398,22 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
* Even when cloning another client, prefix commands are applied if needed.*/
|
||||
static client createClient(char *cmd, size_t len, client from, int thread_id) {
|
||||
int j;
|
||||
int is_cluster_client = (config.cluster_mode && thread_id >= 0);
|
||||
client c = zmalloc(sizeof(struct _client));
|
||||
|
||||
if (config.hostsocket == NULL) {
|
||||
c->context = redisConnectNonBlock(config.hostip,config.hostport);
|
||||
if (config.hostsocket == NULL || is_cluster_client) {
|
||||
const char *ip;
|
||||
int port;
|
||||
if (!is_cluster_client) {
|
||||
ip = config.hostip;
|
||||
port = config.hostport;
|
||||
} else {
|
||||
clusterNode *node = config.cluster_nodes[thread_id];
|
||||
if (node == NULL) exit(1);
|
||||
ip = (const char *) node->ip;
|
||||
port = node->port;
|
||||
}
|
||||
c->context = redisConnectNonBlock(ip,port);
|
||||
} else {
|
||||
c->context = redisConnectUnixNonBlock(config.hostsocket);
|
||||
}
|
||||
@ -621,6 +658,220 @@ static void *execBenchmarkThread(void *ptr) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static clusterNode *createClusterNode(char *ip, int port) {
|
||||
clusterNode *node = zmalloc(sizeof(*node));
|
||||
if (!node) return NULL;
|
||||
node->ip = ip;
|
||||
node->port = port;
|
||||
node->name = NULL;
|
||||
node->flags = 0;
|
||||
node->replicate = NULL;
|
||||
node->replicas_count = 0;
|
||||
memset(node->slots, 0, sizeof(node->slots));
|
||||
node->slots_count = 0;
|
||||
node->migrating = NULL;
|
||||
node->importing = NULL;
|
||||
node->migrating_count = 0;
|
||||
node->importing_count = 0;
|
||||
return node;
|
||||
}
|
||||
|
||||
static void freeClusterNode(clusterNode *node) {
|
||||
int i;
|
||||
if (node->name) sdsfree(node->name);
|
||||
if (node->replicate) sdsfree(node->replicate);
|
||||
if (node->migrating != NULL) {
|
||||
for (i = 0; i < node->migrating_count; i++) sdsfree(node->migrating[i]);
|
||||
zfree(node->migrating);
|
||||
}
|
||||
if (node->importing != NULL) {
|
||||
for (i = 0; i < node->importing_count; i++) sdsfree(node->importing[i]);
|
||||
zfree(node->importing);
|
||||
}
|
||||
zfree(node);
|
||||
}
|
||||
|
||||
static void freeClusterNodes() {
|
||||
int i = 0;
|
||||
for (; i < config.cluster_node_count; i++) {
|
||||
clusterNode *n = config.cluster_nodes[i];
|
||||
if (n) freeClusterNode(n);
|
||||
}
|
||||
zfree(config.cluster_nodes);
|
||||
}
|
||||
|
||||
static clusterNode **addClusterNode(clusterNode *node) {
|
||||
int count = config.cluster_node_count + 1;
|
||||
config.cluster_nodes = zrealloc(config.cluster_nodes,
|
||||
count * sizeof(*node));
|
||||
if (!config.cluster_nodes) return NULL;
|
||||
config.cluster_nodes[config.cluster_node_count++] = node;
|
||||
return config.cluster_nodes;
|
||||
}
|
||||
|
||||
static int fetchClusterConfiguration() {
|
||||
int success = 1;
|
||||
redisContext *ctx = NULL;
|
||||
redisReply *reply = NULL;
|
||||
if (config.hostsocket == NULL)
|
||||
ctx = redisConnect(config.hostip,config.hostport);
|
||||
else
|
||||
ctx = redisConnectUnix(config.hostsocket);
|
||||
if (ctx->err) {
|
||||
fprintf(stderr,"Could not connect to Redis at ");
|
||||
if (config.hostsocket == NULL) {
|
||||
fprintf(stderr,"%s:%d: %s\n",config.hostip,config.hostport,
|
||||
ctx->errstr);
|
||||
} else fprintf(stderr,"%s: %s\n",config.hostsocket,ctx->errstr);
|
||||
exit(1);
|
||||
}
|
||||
clusterNode *firstNode = createClusterNode((char *) config.hostip,
|
||||
config.hostport);
|
||||
if (!firstNode) {success = 0; goto cleanup;}
|
||||
reply = redisCommand(ctx, "CLUSTER NODES");
|
||||
success = (reply != NULL);
|
||||
if (!success) goto cleanup;
|
||||
success = (reply->type != REDIS_REPLY_ERROR);
|
||||
if (!success) {
|
||||
fprintf(stderr, "Cluster node %s:%d replied with error:\n%s\n",
|
||||
config.hostip, config.hostport, reply->str);
|
||||
goto cleanup;
|
||||
}
|
||||
char *lines = reply->str, *p, *line;
|
||||
while ((p = strstr(lines, "\n")) != NULL) {
|
||||
*p = '\0';
|
||||
line = lines;
|
||||
lines = p + 1;
|
||||
char *name = NULL, *addr = NULL, *flags = NULL, *master_id = NULL;
|
||||
int i = 0;
|
||||
while ((p = strchr(line, ' ')) != NULL) {
|
||||
*p = '\0';
|
||||
char *token = line;
|
||||
line = p + 1;
|
||||
switch(i++){
|
||||
case 0: name = token; break;
|
||||
case 1: addr = token; break;
|
||||
case 2: flags = token; break;
|
||||
case 3: master_id = token; break;
|
||||
}
|
||||
if (i == 8) break; // Slots
|
||||
}
|
||||
if (!flags) {
|
||||
fprintf(stderr, "Invalid CLUSTER NODES reply: missing flags.\n");
|
||||
success = 0;
|
||||
goto cleanup;
|
||||
}
|
||||
int myself = (strstr(flags, "myself") != NULL);
|
||||
int is_replica = (strstr(flags, "slave") != NULL ||
|
||||
(master_id != NULL && master_id[0] != '-'));
|
||||
if (is_replica) continue;
|
||||
clusterNode *node = NULL;
|
||||
if (myself) {
|
||||
node = firstNode;
|
||||
} else {
|
||||
if (addr == NULL) {
|
||||
fprintf(stderr, "Invalid CLUSTER NODES reply: missing addr.\n");
|
||||
success = 0;
|
||||
goto cleanup;
|
||||
}
|
||||
char *paddr = strchr(addr, ':');
|
||||
if (paddr == NULL) {
|
||||
success = 0;
|
||||
goto cleanup;
|
||||
}
|
||||
*paddr = '\0';
|
||||
char *ip = addr;
|
||||
addr = paddr + 1;
|
||||
/* If internal bus is specified, then just drop it. */
|
||||
if ((paddr = strchr(addr, '@')) != NULL) *paddr = '\0';
|
||||
int port = atoi(addr);
|
||||
node = createClusterNode(ip, port);
|
||||
}
|
||||
if (node == NULL) {
|
||||
success = 0;
|
||||
goto cleanup;
|
||||
}
|
||||
node->name = sdsnew(name);
|
||||
if (!addClusterNode(node)) {
|
||||
success = 0;
|
||||
goto cleanup;
|
||||
}
|
||||
if (i == 8) {
|
||||
int remaining = strlen(line);
|
||||
while (remaining > 0) {
|
||||
p = strchr(line, ' ');
|
||||
if (p == NULL) p = line + remaining;
|
||||
remaining -= (p - line);
|
||||
|
||||
char *slotsdef = line;
|
||||
*p = '\0';
|
||||
if (remaining) {
|
||||
line = p + 1;
|
||||
remaining--;
|
||||
} else line = p;
|
||||
char *dash = NULL;
|
||||
if (slotsdef[0] == '[') {
|
||||
slotsdef++;
|
||||
if ((p = strstr(slotsdef, "->-"))) { // Migrating
|
||||
*p = '\0';
|
||||
p += 3;
|
||||
char *closing_bracket = strchr(p, ']');
|
||||
if (closing_bracket) *closing_bracket = '\0';
|
||||
sds slot = sdsnew(slotsdef);
|
||||
sds dst = sdsnew(p);
|
||||
node->migrating_count += 2;
|
||||
node->migrating =
|
||||
zrealloc(node->migrating,
|
||||
(node->migrating_count * sizeof(sds)));
|
||||
node->migrating[node->migrating_count - 2] =
|
||||
slot;
|
||||
node->migrating[node->migrating_count - 1] =
|
||||
dst;
|
||||
} else if ((p = strstr(slotsdef, "-<-"))) {//Importing
|
||||
*p = '\0';
|
||||
p += 3;
|
||||
char *closing_bracket = strchr(p, ']');
|
||||
if (closing_bracket) *closing_bracket = '\0';
|
||||
sds slot = sdsnew(slotsdef);
|
||||
sds src = sdsnew(p);
|
||||
node->importing_count += 2;
|
||||
node->importing = zrealloc(node->importing,
|
||||
(node->importing_count * sizeof(sds)));
|
||||
node->importing[node->importing_count - 2] =
|
||||
slot;
|
||||
node->importing[node->importing_count - 1] =
|
||||
src;
|
||||
}
|
||||
} else if ((dash = strchr(slotsdef, '-')) != NULL) {
|
||||
p = dash;
|
||||
int start, stop;
|
||||
*p = '\0';
|
||||
start = atoi(slotsdef);
|
||||
stop = atoi(p + 1);
|
||||
node->slots_count += (stop - (start - 1));
|
||||
while (start <= stop) {
|
||||
int slot = start++;
|
||||
node->slots[slot] = 1;
|
||||
config.cluster_slots[slot] = node;
|
||||
}
|
||||
} else if (p > slotsdef) {
|
||||
int slot = atoi(slotsdef);
|
||||
node->slots[slot] = 1;
|
||||
node->slots_count++;
|
||||
config.cluster_slots[slot] = node;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
cleanup:
|
||||
if (ctx) redisFree(ctx);
|
||||
if (!success) {
|
||||
if (config.cluster_nodes) freeClusterNodes();
|
||||
}
|
||||
if (reply) freeReplyObject(reply);
|
||||
return success;
|
||||
}
|
||||
|
||||
/* Returns number of consumed options. */
|
||||
int parseOptions(int argc, const char **argv) {
|
||||
int i;
|
||||
@ -704,6 +955,8 @@ int parseOptions(int argc, const char **argv) {
|
||||
MAX_THREADS);
|
||||
config.num_threads = MAX_THREADS;
|
||||
} else if (config.num_threads < 0) config.num_threads = 0;
|
||||
} else if (!strcmp(argv[i],"--cluster")) {
|
||||
config.cluster_mode = 1;
|
||||
} else if (!strcmp(argv[i],"--help")) {
|
||||
exit_status = 0;
|
||||
goto usage;
|
||||
@ -850,12 +1103,48 @@ int main(int argc, const char **argv) {
|
||||
config.precision = 1;
|
||||
config.num_threads = 0;
|
||||
config.threads = NULL;
|
||||
config.cluster_mode = 0;
|
||||
config.cluster_node_count = 0;
|
||||
config.cluster_nodes = NULL;
|
||||
memset(config.cluster_slots, 0, sizeof(config.cluster_slots));
|
||||
|
||||
i = parseOptions(argc,argv);
|
||||
argc -= i;
|
||||
argv += i;
|
||||
|
||||
config.latency = zmalloc(sizeof(long long)*config.requests);
|
||||
|
||||
if (config.cluster_mode) {
|
||||
/* Fetch cluster configuration. */
|
||||
if (!fetchClusterConfiguration() || !config.cluster_nodes) {
|
||||
if (!config.hostsocket) {
|
||||
fprintf(stderr, "Failed to fetch cluster configuration from "
|
||||
"%s:%d\n", config.hostip, config.hostport);
|
||||
} else {
|
||||
fprintf(stderr, "Failed to fetch cluster configuration from "
|
||||
"%s\n", config.hostsocket);
|
||||
}
|
||||
exit(1);
|
||||
}
|
||||
if (config.cluster_node_count <= 1) {
|
||||
fprintf(stderr, "Invalid cluster: %d node(s).\n",
|
||||
config.cluster_node_count);
|
||||
exit(1);
|
||||
}
|
||||
printf("Cluster has %d master nodes:\n\n", config.cluster_node_count);
|
||||
int i = 0;
|
||||
for (; i < config.cluster_node_count; i++) {
|
||||
clusterNode *node = config.cluster_nodes[i];
|
||||
if (!node) {
|
||||
fprintf(stderr, "Invalid cluster node #%d\n", i);
|
||||
exit(1);
|
||||
}
|
||||
if (node->name) printf("%s ", node->name);
|
||||
printf("%s:%d\n", node->ip, node->port);
|
||||
}
|
||||
config.num_threads = config.cluster_node_count;
|
||||
}
|
||||
|
||||
if (config.num_threads > 0) {
|
||||
pthread_mutex_init(&(config.requests_issued_mutex), NULL);
|
||||
pthread_mutex_init(&(config.requests_finished_mutex), NULL);
|
||||
|
Reference in New Issue
Block a user