mirror of
https://github.com/fluencelabs/redis
synced 2025-06-26 23:41:33 +00:00
non-blocking VM data structures, just a start
This commit is contained in:
53
redis.c
53
redis.c
@ -59,6 +59,7 @@
|
||||
#include <sys/uio.h>
|
||||
#include <limits.h>
|
||||
#include <math.h>
|
||||
#include <pthread.h>
|
||||
|
||||
#if defined(__sun)
|
||||
#include "solarisfixes.h"
|
||||
@ -162,6 +163,7 @@
|
||||
* Check vmFindContiguousPages() to know more about this magic numbers. */
|
||||
#define REDIS_VM_MAX_NEAR_PAGES 65536
|
||||
#define REDIS_VM_MAX_RANDOM_JUMP 4096
|
||||
#define REDIS_VM_MAX_THREADS 32
|
||||
|
||||
/* Client flags */
|
||||
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
|
||||
@ -170,6 +172,7 @@
|
||||
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
|
||||
#define REDIS_MULTI 16 /* This client is in a MULTI context */
|
||||
#define REDIS_BLOCKED 32 /* The client is waiting in a blocking operation */
|
||||
#define REDIS_IO_WAIT 64 /* The client is waiting for Virutal Memory I/O */
|
||||
|
||||
/* Slave replication state - slave side */
|
||||
#define REDIS_REPL_NONE 0 /* No active replication */
|
||||
@ -303,6 +306,8 @@ typedef struct redisClient {
|
||||
int blockingkeysnum; /* Number of blocking keys */
|
||||
time_t blockingto; /* Blocking operation timeout. If UNIX current time
|
||||
* is >= blockingto then the operation timed out. */
|
||||
list *io_keys; /* Keys this client is waiting to be loaded from the
|
||||
* swap file in order to continue. */
|
||||
} redisClient;
|
||||
|
||||
struct saveparam {
|
||||
@ -381,6 +386,16 @@ struct redisServer {
|
||||
off_t vm_near_pages; /* Number of pages allocated sequentially */
|
||||
unsigned char *vm_bitmap; /* Bitmap of free/used pages */
|
||||
time_t unixtime; /* Unix time sampled every second. */
|
||||
/* Virtual memory I/O threads stuff */
|
||||
pthread_t io_threads[REDIS_VM_MAX_THREADS];
|
||||
/* An I/O thread process an element taken from the io_jobs queue and
|
||||
* put the result of the operation in the io_done list. */
|
||||
list *io_jobs; /* List of VM I/O jobs */
|
||||
list *io_done; /* List of VM processed jobs */
|
||||
list *io_clients; /* All the clients waiting for SWAP I/O operations */
|
||||
pthread_mutex_t io_mutex; /* lock to access io_jobs and io_done */
|
||||
int io_active_threads; /* Number of running I/O threads */
|
||||
int vm_max_threads; /* Max number of I/O threads running at the same time */
|
||||
/* Virtual memory stats */
|
||||
unsigned long long vm_stats_used_pages;
|
||||
unsigned long long vm_stats_swapped_objects;
|
||||
@ -451,6 +466,18 @@ struct sharedObjectsStruct {
|
||||
|
||||
static double R_Zero, R_PosInf, R_NegInf, R_Nan;
|
||||
|
||||
/* VM threaded I/O request message */
|
||||
#define REDIS_IOREQ_LOAD 0
|
||||
#define REDIS_IOREQ_SWAP 1
|
||||
typedef struct ioreq {
|
||||
int type; /* Request type, REDIS_IOREQ_* */
|
||||
int dbid; /* Redis database ID */
|
||||
robj *key; /* This I/O request is about swapping this key */
|
||||
robj *val; /* the value to swap for REDIS_IOREQ_SWAP, otherwise this
|
||||
* field is populated by the I/O thread for REDIS_IOREQ_LOAD. */
|
||||
off_t page; /* Swap page where to read/write the object */
|
||||
} ioreq;
|
||||
|
||||
/*================================ Prototypes =============================== */
|
||||
|
||||
static void freeStringObject(robj *o);
|
||||
@ -1292,6 +1319,7 @@ static void initServerConfig() {
|
||||
server.vm_page_size = 256; /* 256 bytes per page */
|
||||
server.vm_pages = 1024*1024*100; /* 104 millions of pages */
|
||||
server.vm_max_memory = 1024LL*1024*1024*1; /* 1 GB of RAM */
|
||||
server.vm_max_threads = 4;
|
||||
|
||||
resetServerSaveParams();
|
||||
|
||||
@ -1539,6 +1567,8 @@ static void loadServerConfig(char *filename) {
|
||||
server.vm_page_size = strtoll(argv[1], NULL, 10);
|
||||
} else if (!strcasecmp(argv[0],"vm-pages") && argc == 2) {
|
||||
server.vm_pages = strtoll(argv[1], NULL, 10);
|
||||
} else if (!strcasecmp(argv[0],"vm-max-threads") && argc == 2) {
|
||||
server.vm_max_threads = strtoll(argv[1], NULL, 10);
|
||||
} else {
|
||||
err = "Bad directive or wrong number of arguments"; goto loaderr;
|
||||
}
|
||||
@ -1587,9 +1617,17 @@ static void freeClient(redisClient *c) {
|
||||
listRelease(c->reply);
|
||||
freeClientArgv(c);
|
||||
close(c->fd);
|
||||
/* Remove from the list of clients */
|
||||
ln = listSearchKey(server.clients,c);
|
||||
redisAssert(ln != NULL);
|
||||
listDelNode(server.clients,ln);
|
||||
/* Remove from the list of clients waiting for VM operations */
|
||||
if (server.vm_enabled && listLength(c->io_keys)) {
|
||||
ln = listSearchKey(server.io_clients,c);
|
||||
if (ln) listDelNode(server.io_clients,ln);
|
||||
listRelease(c->io_keys);
|
||||
}
|
||||
/* Other cleanup */
|
||||
if (c->flags & REDIS_SLAVE) {
|
||||
if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
|
||||
close(c->repldbfd);
|
||||
@ -2053,7 +2091,7 @@ again:
|
||||
* would not be called at all, but after the execution of the first commands
|
||||
* in the input buffer the client may be blocked, and the "goto again"
|
||||
* will try to reiterate. The following line will make it return asap. */
|
||||
if (c->flags & REDIS_BLOCKED) return;
|
||||
if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
|
||||
if (c->bulklen == -1) {
|
||||
/* Read the first line of the query */
|
||||
char *p = strchr(c->querybuf,'\n');
|
||||
@ -2190,10 +2228,12 @@ static redisClient *createClient(int fd) {
|
||||
c->authenticated = 0;
|
||||
c->replstate = REDIS_REPL_NONE;
|
||||
c->reply = listCreate();
|
||||
c->blockingkeys = NULL;
|
||||
c->blockingkeysnum = 0;
|
||||
listSetFreeMethod(c->reply,decrRefCount);
|
||||
listSetDupMethod(c->reply,dupClientReplyValue);
|
||||
c->blockingkeys = NULL;
|
||||
c->blockingkeysnum = 0;
|
||||
c->io_keys = listCreate();
|
||||
listSetFreeMethod(c->io_keys,decrRefCount);
|
||||
if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
|
||||
readQueryFromClient, c) == AE_ERR) {
|
||||
freeClient(c);
|
||||
@ -6823,6 +6863,13 @@ static void vmInit(void) {
|
||||
/* Try to remove the swap file, so the OS will really delete it from the
|
||||
* file system when Redis exists. */
|
||||
unlink("/tmp/redisvm");
|
||||
|
||||
/* Initialize threaded I/O */
|
||||
server.io_jobs = listCreate();
|
||||
server.io_done = listCreate();
|
||||
server.io_clients = listCreate();
|
||||
pthread_mutex_init(&server.io_mutex,NULL);
|
||||
server.io_active_threads = 0;
|
||||
}
|
||||
|
||||
/* Mark the page as used */
|
||||
|
Reference in New Issue
Block a user