RESP3: hiredis updated with recent version + some RESP3 support.

This commit is contained in:
antirez
2018-12-04 19:01:33 +01:00
parent 809e3a44a7
commit d5c54f0b3a
16 changed files with 499 additions and 226 deletions

67
deps/hiredis/async.c vendored
View File

@ -336,7 +336,8 @@ static void __redisAsyncDisconnect(redisAsyncContext *ac) {
if (ac->err == 0) {
/* For clean disconnects, there should be no pending callbacks. */
assert(__redisShiftCallback(&ac->replies,NULL) == REDIS_ERR);
int ret = __redisShiftCallback(&ac->replies,NULL);
assert(ret == REDIS_ERR);
} else {
/* Disconnection is caused by an error, make sure that pending
* callbacks cannot call new commands. */
@ -364,6 +365,7 @@ void redisAsyncDisconnect(redisAsyncContext *ac) {
static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
redisContext *c = &(ac->c);
dict *callbacks;
redisCallback *cb;
dictEntry *de;
int pvariant;
char *stype;
@ -387,16 +389,28 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len);
de = dictFind(callbacks,sname);
if (de != NULL) {
memcpy(dstcb,dictGetEntryVal(de),sizeof(*dstcb));
cb = dictGetEntryVal(de);
/* If this is an subscribe reply decrease pending counter. */
if (strcasecmp(stype+pvariant,"subscribe") == 0) {
cb->pending_subs -= 1;
}
memcpy(dstcb,cb,sizeof(*dstcb));
/* If this is an unsubscribe message, remove it. */
if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
dictDelete(callbacks,sname);
if (cb->pending_subs == 0)
dictDelete(callbacks,sname);
/* If this was the last unsubscribe message, revert to
* non-subscribe mode. */
assert(reply->element[2]->type == REDIS_REPLY_INTEGER);
if (reply->element[2]->integer == 0)
/* Unset subscribed flag only when no pipelined pending subscribe. */
if (reply->element[2]->integer == 0
&& dictSize(ac->sub.channels) == 0
&& dictSize(ac->sub.patterns) == 0)
c->flags &= ~REDIS_SUBSCRIBED;
}
}
@ -410,7 +424,7 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
void redisProcessCallbacks(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
redisCallback cb = {NULL, NULL, NULL};
redisCallback cb = {NULL, NULL, 0, NULL};
void *reply = NULL;
int status;
@ -492,22 +506,22 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
* write event fires. When connecting was not successful, the connect callback
* is called with a REDIS_ERR status and the context is free'd. */
static int __redisAsyncHandleConnect(redisAsyncContext *ac) {
int completed = 0;
redisContext *c = &(ac->c);
if (redisCheckSocketError(c) == REDIS_ERR) {
/* Try again later when connect(2) is still in progress. */
if (errno == EINPROGRESS)
return REDIS_OK;
if (ac->onConnect) ac->onConnect(ac,REDIS_ERR);
if (redisCheckConnectDone(c, &completed) == REDIS_ERR) {
/* Error! */
redisCheckSocketError(c);
if (ac->onConnect) ac->onConnect(ac, REDIS_ERR);
__redisAsyncDisconnect(ac);
return REDIS_ERR;
} else if (completed == 1) {
/* connected! */
if (ac->onConnect) ac->onConnect(ac, REDIS_OK);
c->flags |= REDIS_CONNECTED;
return REDIS_OK;
} else {
return REDIS_OK;
}
/* Mark context as connected. */
c->flags |= REDIS_CONNECTED;
if (ac->onConnect) ac->onConnect(ac,REDIS_OK);
return REDIS_OK;
}
/* This function should be called when the socket is readable.
@ -583,6 +597,9 @@ static const char *nextArgument(const char *start, const char **str, size_t *len
static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
redisContext *c = &(ac->c);
redisCallback cb;
struct dict *cbdict;
dictEntry *de;
redisCallback *existcb;
int pvariant, hasnext;
const char *cstr, *astr;
size_t clen, alen;
@ -596,6 +613,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
/* Setup callback */
cb.fn = fn;
cb.privdata = privdata;
cb.pending_subs = 1;
/* Find out which command will be appended. */
p = nextArgument(cmd,&cstr,&clen);
@ -612,9 +630,18 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
while ((p = nextArgument(p,&astr,&alen)) != NULL) {
sname = sdsnewlen(astr,alen);
if (pvariant)
ret = dictReplace(ac->sub.patterns,sname,&cb);
cbdict = ac->sub.patterns;
else
ret = dictReplace(ac->sub.channels,sname,&cb);
cbdict = ac->sub.channels;
de = dictFind(cbdict,sname);
if (de != NULL) {
existcb = dictGetEntryVal(de);
cb.pending_subs = existcb->pending_subs + 1;
}
ret = dictReplace(cbdict,sname,&cb);
if (ret == 0) sdsfree(sname);
}
@ -676,6 +703,8 @@ int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *priv
int len;
int status;
len = redisFormatSdsCommandArgv(&cmd,argc,argv,argvlen);
if (len < 0)
return REDIS_ERR;
status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
sdsfree(cmd);
return status;