Streams: use non static macro node limits.

Also add the concept of size/items limit, instead of just having as
limit the number of bytes.
This commit is contained in:
antirez
2018-06-07 14:24:45 +02:00
parent c85ae56edc
commit 19a438e2c0
4 changed files with 28 additions and 2 deletions

View File

@ -1485,6 +1485,8 @@ void initServerConfig(void) {
server.zset_max_ziplist_entries = OBJ_ZSET_MAX_ZIPLIST_ENTRIES;
server.zset_max_ziplist_value = OBJ_ZSET_MAX_ZIPLIST_VALUE;
server.hll_sparse_max_bytes = CONFIG_DEFAULT_HLL_SPARSE_MAX_BYTES;
server.stream_node_max_bytes = OBJ_STREAM_NODE_MAX_BYTES;
server.stream_node_max_entries = OBJ_STREAM_NODE_MAX_ENTRIES;
server.shutdown_asap = 0;
server.cluster_enabled = 0;
server.cluster_node_timeout = CLUSTER_DEFAULT_NODE_TIMEOUT;

View File

@ -348,12 +348,14 @@ typedef long long mstime_t; /* millisecond time type. */
#define AOF_FSYNC_EVERYSEC 2
#define CONFIG_DEFAULT_AOF_FSYNC AOF_FSYNC_EVERYSEC
/* Zip structure related defaults */
/* Zipped structures related defaults */
#define OBJ_HASH_MAX_ZIPLIST_ENTRIES 512
#define OBJ_HASH_MAX_ZIPLIST_VALUE 64
#define OBJ_SET_MAX_INTSET_ENTRIES 512
#define OBJ_ZSET_MAX_ZIPLIST_ENTRIES 128
#define OBJ_ZSET_MAX_ZIPLIST_VALUE 64
#define OBJ_STREAM_NODE_MAX_BYTES 4096
#define OBJ_STREAM_NODE_MAX_ENTRIES 100
/* List defaults */
#define OBJ_LIST_MAX_ZIPLIST_SIZE -2
@ -1177,6 +1179,8 @@ struct redisServer {
size_t zset_max_ziplist_entries;
size_t zset_max_ziplist_value;
size_t hll_sparse_max_bytes;
size_t stream_node_max_bytes;
int64_t stream_node_max_entries;
/* List parameters */
int list_max_ziplist_size;
int list_compress_depth;

View File

@ -237,8 +237,20 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
* regular stream entries (see below), and marks the fact that there are
* no more entires, when we scan the stream from right to left. */
/* First of all, check if we can append to the current macro node or
* if we need to switch to the next one. 'lp' will be set to NULL if
* the current node is full. */
if (lp != NULL) {
if (lp_bytes > server.stream_node_max_bytes) {
lp = NULL;
} else {
int64_t count = lpGetInteger(lpFirst(lp));
if (count > server.stream_node_max_entries) lp = NULL;
}
}
int flags = STREAM_ITEM_FLAG_NONE;
if (lp == NULL || lp_bytes > STREAM_BYTES_PER_LISTPACK) {
if (lp == NULL || lp_bytes > server.stream_node_max_bytes) {
master_id = id;
streamEncodeID(rax_key,&id);
/* Create the listpack having the master entry ID and fields. */