Skip to content

Commit

Permalink
Introduce CLUSTER SLOT-STATS command (#20).
Browse files Browse the repository at this point in the history
The command provides detailed slot usage statistics upon invocation,
with initial support for key-count metric. cpu-usec (approved) and
memory-bytes (pending-approval) metrics will soon follow after the
merger of this PR.
  • Loading branch information
kyle-yh-kim committed Apr 23, 2024
1 parent 87a5bfc commit 605c923
Show file tree
Hide file tree
Showing 7 changed files with 655 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/Makefile
Expand Up @@ -383,7 +383,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slots.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
2 changes: 2 additions & 0 deletions src/cluster.h
Expand Up @@ -103,6 +103,8 @@ char *clusterNodeHostname(clusterNode *node);
const char *clusterNodePreferredEndpoint(clusterNode *n);
long long clusterNodeReplOffset(clusterNode *node);
clusterNode *clusterLookupNode(const char *name, int length);
unsigned int countKeysInSlot(unsigned int hashslot);
int getSlotOrReply(client *c, robj *o);

/* functions with shared implementations */
clusterNode *getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
Expand Down
217 changes: 217 additions & 0 deletions src/cluster_slots.c
@@ -0,0 +1,217 @@
/* Cluster slots APIs and commands - to retrieve, update and process slot level data
* in association with Valkey cluster.
*
* Copyright (c) 2024, Kyle Kim <kimkyle at amazon dot com>, Amazon Web Services.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Valkey nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "server.h"
#include "cluster.h"
#include "cluster_legacy.h"

#define DEFAULT_SLOT -1
#define DEFAULT_STAT 0
#define UNASSIGNED_SLOT 0
#define ORDER_BY_KEY_COUNT 1
#define ORDER_BY_INVALID -1

/* -----------------------------------------------------------------------------
* CLUSTER SLOT-STATS command
* -------------------------------------------------------------------------- */

typedef struct sortedSlotStatEntry {
int slot;
uint64_t stat;
} sortedSlotStatEntry;

static int doesSlotBelongToMyShard(int slot) {
clusterNode *n = clusterNodeGetMaster(server.cluster->myself);
return server.cluster->slots[slot] == n;
}

static void markAssignedSlots(unsigned char *slots) {
for (int slot = 0; slot < CLUSTER_SLOTS; slot++) {
if (doesSlotBelongToMyShard(slot)) slots[slot]++;
}
}

static int countAssignedSlotsFromSlotsArray(unsigned char *slots) {
int count = 0;
for (int slot = 0; slot < CLUSTER_SLOTS; slot++) {
if (slots[slot]) count++;
}
return count;
}

static void checkSlotAssignment(unsigned char *slots, int start_slot, int end_slot) {
for (int slot = start_slot; slot <= end_slot; slot++) {
if (doesSlotBelongToMyShard(slot)) {
slots[slot]++;
}
}
}

static uint64_t getSingleSlotStat(int slot, int order_by) {
serverAssert(order_by != ORDER_BY_INVALID);
uint64_t singleSlotStat = 0;
if (order_by == ORDER_BY_KEY_COUNT) {
singleSlotStat = countKeysInSlot(slot);
}
return singleSlotStat;
}

static int slotStatEntryAscCmp(const void *a, const void *b) {
sortedSlotStatEntry entry_a = *((sortedSlotStatEntry *) a);
sortedSlotStatEntry entry_b = *((sortedSlotStatEntry *) b);
return entry_a.stat - entry_b.stat;
}

static int slotStatEntryDescCmp(const void *a, const void *b) {
sortedSlotStatEntry entry_a = *((sortedSlotStatEntry *) a);
sortedSlotStatEntry entry_b = *((sortedSlotStatEntry *) b);
return entry_b.stat - entry_a.stat;
}

static void sortSlotStats(sortedSlotStatEntry sorted[], int order_by, int desc) {
int i = 0;

for (int slot = 0; slot < CLUSTER_SLOTS; slot++) {
if (doesSlotBelongToMyShard(slot)) {
sorted[i].slot = slot;
sorted[i].stat = getSingleSlotStat(slot, order_by);
i++;
}
}
qsort(sorted, i, sizeof(sortedSlotStatEntry), (desc) ? slotStatEntryDescCmp : slotStatEntryAscCmp);
}

static void addReplySingleSlotStat(client *c, int slot) {
addReplyLongLong(c, slot);
addReplyMapLen(c, 1);
addReplyBulkCString(c, "key-count");
addReplyLongLong(c, countKeysInSlot(slot));
}

static void addReplySlotStats(client *c, unsigned char *slots) {
int num_slots_assigned = countAssignedSlotsFromSlotsArray(slots);
addReplyMapLen(c, num_slots_assigned);

for (int slot = 0; slot < CLUSTER_SLOTS; slot++) {
if (slots[slot]) addReplySingleSlotStat(c, slot);
}
}

static void addReplySortedSlotStats(client *c, sortedSlotStatEntry sorted[], long limit) {
int num_slots_assigned = getMyShardSlotCount();
int len = min(limit, num_slots_assigned);
addReplyMapLen(c, len);

for (int i = 0; i < len; i++) {
addReplySingleSlotStat(c, sorted[i].slot);
}
}

static void sortAndAddReplySlotStats(client *c, int order_by, long limit, int desc) {
sortedSlotStatEntry sorted[CLUSTER_SLOTS];
sortSlotStats(sorted, order_by, desc);
addReplySortedSlotStats(c, sorted, limit);
}

void clusterSlotStatsCommand(client *c) {
if (server.cluster_enabled == 0) {
addReplyError(c,"This instance has cluster support disabled");
return;
}

/* Initialize slot assignment array. */
unsigned char slots[CLUSTER_SLOTS]= {UNASSIGNED_SLOT};

/* No further arguments. */
if (c->argc == 2) {
/* CLUSTER SLOT-STATS */
markAssignedSlots(slots);
addReplySlotStats(c, slots);
return;
}

/* Parse additional arguments. */
if (!strcasecmp(c->argv[2]->ptr,"slotsrange") && c->argc == 5) {
/* CLUSTER SLOT-STATS SLOTSRANGE start-slot end-slot */
int startslot, endslot;
if ((startslot = getSlotOrReply(c,c->argv[3])) == C_ERR ||
(endslot = getSlotOrReply(c,c->argv[4])) == C_ERR) {
return;
}
if (startslot > endslot) {
addReplyErrorFormat(c,"start slot number %d is greater than end slot number %d", startslot, endslot);
return;
}
checkSlotAssignment(slots, startslot, endslot);
addReplySlotStats(c, slots);
} else if (!strcasecmp(c->argv[2]->ptr,"orderby") && c->argc >= 4) {
/* CLUSTER SLOT-STATS ORDERBY column [LIMIT limit] [ASC | DESC] */
int desc = 1, order_by = ORDER_BY_INVALID;
if (!strcasecmp(c->argv[3]->ptr, "key-count")) {
order_by = ORDER_BY_KEY_COUNT;
} else {
addReplyError(c, "unrecognized sort column for ORDER BY. The supported columns are: key-count.");
return;
}
int i = 4; /* Next argument index, following ORDERBY */
int limit_counter = 0, asc_desc_counter = 0;
long limit;
while(i < c->argc) {
int moreargs = c->argc > i+1;
if (!strcasecmp(c->argv[i]->ptr,"limit") && moreargs) {
if (getRangeLongFromObjectOrReply(
c, c->argv[i+1], 1, CLUSTER_SLOTS, &limit,
"limit has to lie in between 1 and 16384 (maximum number of slots)") != C_OK)
return;
i++;
limit_counter++;
} else if (!strcasecmp(c->argv[i]->ptr,"asc")) {
desc = 0;
asc_desc_counter++;
} else if (!strcasecmp(c->argv[i]->ptr,"desc")) {
desc = 1;
asc_desc_counter++;
} else {
addReplyErrorObject(c,shared.syntaxerr);
return;
}

if (limit_counter > 1 || asc_desc_counter > 1) {
addReplyError(c, "you cannot provide multiple filters of the same type.");
return;
}
i++;
}
sortAndAddReplySlotStats(c, order_by, limit, desc);
} else {
addReplySubcommandSyntaxError(c);
}
}
51 changes: 51 additions & 0 deletions src/commands.def
Expand Up @@ -921,6 +921,56 @@ struct COMMAND_ARG CLUSTER_SLAVES_Args[] = {
{MAKE_ARG("node-id",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
};

/********** CLUSTER SLOT_STATS ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
/* CLUSTER SLOT_STATS history */
#define CLUSTER_SLOT_STATS_History NULL
#endif

#ifndef SKIP_CMD_TIPS_TABLE
/* CLUSTER SLOT_STATS tips */
const char *CLUSTER_SLOT_STATS_Tips[] = {
"nondeterministic_output",
"all_shards",
};
#endif

#ifndef SKIP_CMD_KEY_SPECS_TABLE
/* CLUSTER SLOT_STATS key specs */
#define CLUSTER_SLOT_STATS_Keyspecs NULL
#endif

/* CLUSTER SLOT_STATS filter slotsrange argument table */
struct COMMAND_ARG CLUSTER_SLOT_STATS_filter_slotsrange_Subargs[] = {
{MAKE_ARG("start-slot",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("end-slot",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
};

/* CLUSTER SLOT_STATS filter orderby order argument table */
struct COMMAND_ARG CLUSTER_SLOT_STATS_filter_orderby_order_Subargs[] = {
{MAKE_ARG("asc",ARG_TYPE_PURE_TOKEN,-1,"ASC",NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("desc",ARG_TYPE_PURE_TOKEN,-1,"DESC",NULL,NULL,CMD_ARG_NONE,0,NULL)},
};

/* CLUSTER SLOT_STATS filter orderby argument table */
struct COMMAND_ARG CLUSTER_SLOT_STATS_filter_orderby_Subargs[] = {
{MAKE_ARG("column",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("limit",ARG_TYPE_INTEGER,-1,"LIMIT",NULL,NULL,CMD_ARG_OPTIONAL,0,NULL)},
{MAKE_ARG("order",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,2,NULL),.subargs=CLUSTER_SLOT_STATS_filter_orderby_order_Subargs},
};

/* CLUSTER SLOT_STATS filter argument table */
struct COMMAND_ARG CLUSTER_SLOT_STATS_filter_Subargs[] = {
{MAKE_ARG("slotsrange",ARG_TYPE_BLOCK,-1,"SLOTSRANGE",NULL,NULL,CMD_ARG_OPTIONAL,2,NULL),.subargs=CLUSTER_SLOT_STATS_filter_slotsrange_Subargs},
{MAKE_ARG("orderby",ARG_TYPE_BLOCK,-1,"ORDERBY",NULL,NULL,CMD_ARG_OPTIONAL,3,NULL),.subargs=CLUSTER_SLOT_STATS_filter_orderby_Subargs},
};

/* CLUSTER SLOT_STATS argument table */
struct COMMAND_ARG CLUSTER_SLOT_STATS_Args[] = {
{MAKE_ARG("filter",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,2,NULL),.subargs=CLUSTER_SLOT_STATS_filter_Subargs},
};

/********** CLUSTER SLOTS ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
Expand Down Expand Up @@ -972,6 +1022,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = {
{MAKE_CMD("setslot","Binds a hash slot to a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SETSLOT_History,0,CLUSTER_SETSLOT_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SETSLOT_Keyspecs,0,NULL,2),.args=CLUSTER_SETSLOT_Args},
{MAKE_CMD("shards","Returns the mapping of cluster slots to shards.","O(N) where N is the total number of cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SHARDS_History,0,CLUSTER_SHARDS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SHARDS_Keyspecs,0,NULL,0)},
{MAKE_CMD("slaves","Lists the replica nodes of a master node.","O(N) where N is the number of replicas.","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER REPLICAS`","5.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLAVES_History,0,CLUSTER_SLAVES_Tips,1,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,CLUSTER_SLAVES_Keyspecs,0,NULL,1),.args=CLUSTER_SLAVES_Args},
{MAKE_CMD("slot-stats","Return array of slot usage statistics for slots assigned to the current node","O(N) where N is the total number of slots based on arguments. O(N log N) with ORDERBY subcommand.","7.2.6",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOT_STATS_History,0,CLUSTER_SLOT_STATS_Tips,2,clusterSlotStatsCommand,-2,CMD_STALE|CMD_LOADING,0,CLUSTER_SLOT_STATS_Keyspecs,0,NULL,1),.args=CLUSTER_SLOT_STATS_Args},
{MAKE_CMD("slots","Returns the mapping of cluster slots to nodes.","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER SHARDS`","7.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOTS_History,2,CLUSTER_SLOTS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SLOTS_Keyspecs,0,NULL,0)},
{0}
};
Expand Down
79 changes: 79 additions & 0 deletions src/commands/cluster-slot-stats.json
@@ -0,0 +1,79 @@
{
"SLOT-STATS": {
"summary": "Return array of slot usage statistics for slots assigned to the current node",
"complexity": "O(N) where N is the total number of slots based on arguments. O(N log N) with ORDERBY subcommand.",
"group": "cluster",
"since": "7.2.6",
"arity": -2,
"container": "CLUSTER",
"function": "clusterSlotStatsCommand",
"command_flags": [
"STALE",
"LOADING"
],
"command_tips": [
"NONDETERMINISTIC_OUTPUT",
"ALL_SHARDS"
],
"arguments": [
{
"name": "filter",
"type": "oneof",
"optional": true,
"arguments": [
{
"token": "SLOTSRANGE",
"name": "slotsrange",
"type": "block",
"optional": true,
"arguments": [
{
"name": "start-slot",
"type": "integer"
},
{
"name": "end-slot",
"type": "integer"
}
]
},
{
"token": "ORDERBY",
"name": "orderby",
"type": "block",
"optional": true,
"arguments": [
{
"name": "column",
"type": "string"
},
{
"token": "LIMIT",
"name": "limit",
"type": "integer",
"optional": true
},
{
"name": "order",
"type": "oneof",
"optional": true,
"arguments": [
{
"name": "asc",
"type": "pure-token",
"token": "ASC"
},
{
"name": "desc",
"type": "pure-token",
"token": "DESC"
}
]
}
]
}
]
}
]
}
}
1 change: 1 addition & 0 deletions src/server.h
Expand Up @@ -3662,6 +3662,7 @@ void sunsubscribeCommand(client *c);
void watchCommand(client *c);
void unwatchCommand(client *c);
void clusterCommand(client *c);
void clusterSlotStatsCommand(client *c);
void restoreCommand(client *c);
void migrateCommand(client *c);
void askingCommand(client *c);
Expand Down

0 comments on commit 605c923

Please sign in to comment.