diff --git a/src/cluster.c b/src/cluster.c
index 99c02cd86..4f6a66f2e 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -1204,6 +1204,13 @@ clusterNode *getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, i
     return n;
 }
 
+/* Returns a new sds "<verb> <slot> <endpoint>:<port>". For redirects, verb must start with a dash, e.g. "-MOVED". */
+sds clusterFormatRedirect(const char *verb, int slot, clusterNode *n, int use_tls_port) {
+    const char *endpoint = clusterNodePreferredEndpoint(n);
+    int port = clusterNodeClientPort(n, use_tls_port);
+    return sdscatprintf(sdsempty(), "%s %d %s:%d", verb, slot, endpoint, port);
+}
+
 /* Send the client the right redirection code, according to error_code
  * that should be set to one of CLUSTER_REDIR_* macros.
  *
@@ -1229,11 +1236,8 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co
                error_code == CLUSTER_REDIR_ASK)
     {
         /* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
-        int port = clusterNodeClientPort(n, shouldReturnTlsInfo());
-        addReplyErrorSds(c,sdscatprintf(sdsempty(),
-            "-%s %d %s:%d",
-            (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
-            hashslot, clusterNodePreferredEndpoint(n), port));
+        const char *verb = (error_code == CLUSTER_REDIR_ASK) ? "-ASK" : "-MOVED";
+        addReplyErrorSds(c, clusterFormatRedirect(verb, hashslot, n, shouldReturnTlsInfo()));
     } else {
         serverPanic("getNodeByQuery() unknown error.");
     }
diff --git a/src/cluster.h b/src/cluster.h
index a7211615d..f72594754 100644
--- a/src/cluster.h
+++ b/src/cluster.h
@@ -108,6 +108,7 @@ clusterNode *clusterLookupNode(const char *name, int length);
 clusterNode *getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
 int clusterRedirectBlockedClientIfNeeded(client *c);
 void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code);
+sds clusterFormatRedirect(const char *verb, int slot, clusterNode *n, int use_tls_port);
 void migrateCloseTimedoutSockets(void);
 unsigned int keyHashSlot(char *key, int keylen);
 int patternHashSlot(char *pattern, int length);
diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c
index 843e6299c..e3587b6f3 100644
--- a/src/cluster_legacy.c
+++ b/src/cluster_legacy.c
@@ -307,6 +307,10 @@ typedef struct {
     clusterMsg msg;
 } clusterMsgSendBlock;
 
+/* Special values for server.cluster->moved_slot_since_sleep */
+#define CLUSTER_MOVED_SLOT_NONE -2
+#define CLUSTER_MOVED_SLOT_MULTIPLE -3
+
 /* -----------------------------------------------------------------------------
  * Initialization
  * -------------------------------------------------------------------------- */
@@ -962,6 +966,9 @@ void clusterInit(void) {
     server.cluster->state = CLUSTER_FAIL;
     server.cluster->size = 0;
     server.cluster->todo_before_sleep = 0;
+    server.cluster->moved_slot_since_sleep = CLUSTER_MOVED_SLOT_NONE;
+    server.cluster->moved_slot_channel =
+        createObject(OBJ_STRING, sdsnew("__cluster__:moved"));
     server.cluster->nodes = dictCreate(&clusterNodesDictType);
     server.cluster->shards = dictCreate(&clusterSdsToListType);
     server.cluster->nodes_black_list =
@@ -4844,6 +4851,30 @@ void clusterCron(void) {
         clusterUpdateState();
 }
 
+/* Notify clients subscribed to slot moved events. */
+void clusterNotifyMovedSlot(int moved_slot, list *clients) {
+    clusterNode *n = server.cluster->slots[moved_slot];
+    /* As for -MOVED redirects, the port in the message depends on whether the
+     * client is using TLS or not. */
+    robj *messages[2] = {NULL, NULL}; /* Created lazily. */
+    listNode *ln;
+    listIter li;
+    listRewind(clients, &li);
+    while ((ln = listNext(&li)) != NULL) {
+        client *c = ln->value;
+        /* Same rule as shouldReturnTlsInfo(), but for the subscriber c, not server.current_client. */
+        int use_tls_port = c->conn ? connIsTLS(c->conn) : server.tls_cluster;
+        if (messages[use_tls_port] == NULL) {
+            sds s = clusterFormatRedirect("MOVED", moved_slot, n, use_tls_port);
+            messages[use_tls_port] = createObject(OBJ_STRING, s);
+        }
+        addReplyPubsubMessage(c, server.cluster->moved_slot_channel, messages[use_tls_port], shared.messagebulk);
+        updateClientMemUsageAndBucket(c);
+    }
+    if (messages[0]) decrRefCount(messages[0]);
+    if (messages[1]) decrRefCount(messages[1]);
+}
+
 /* This function is called before the event handler returns to sleep for
  * events. It is useful to perform operations that must be done ASAP in
  * reaction to events fired but that are not safe to perform inside event
@@ -4851,10 +4882,12 @@ void clusterCron(void) {
  * a single time before replying to clients. */
 void clusterBeforeSleep(void) {
     int flags = server.cluster->todo_before_sleep;
+    int moved_slot = server.cluster->moved_slot_since_sleep;
 
     /* Reset our flags (not strictly needed since every single function
      * called for flags set should be able to clear its flag). */
     server.cluster->todo_before_sleep = 0;
+    server.cluster->moved_slot_since_sleep = CLUSTER_MOVED_SLOT_NONE;
 
     if (flags & CLUSTER_TODO_HANDLE_MANUALFAILOVER) {
         /* Handle manual failover as soon as possible so that won't have a 100ms
@@ -4874,6 +4907,16 @@ void clusterBeforeSleep(void) {
     if (flags & CLUSTER_TODO_UPDATE_STATE)
         clusterUpdateState();
 
+    /* Notify clients subscribed to moved slot events. To avoid flooding the
+     * clients, we only publish the moved slot message if exactly one slot has
+     * been migrated. In cases like failover, clients will receive -MOVED
+     * redirects and will need to refresh the full slot mapping with nodes
+     * including replicas. */
+    if (moved_slot >= 0) {
+        list *clients = pubsubGetSubscribers(server.cluster->moved_slot_channel);
+        if (clients) clusterNotifyMovedSlot(moved_slot, clients);
+    }
+
     /* Save the config, possibly using fsync. */
     if (flags & CLUSTER_TODO_SAVE_CONFIG) {
         int fsync = flags & CLUSTER_TODO_FSYNC_CONFIG;
@@ -4976,6 +5019,10 @@ int clusterAddSlot(clusterNode *n, int slot) {
     if (server.cluster->slots[slot]) return C_ERR;
     clusterNodeSetSlotBit(n,slot);
     server.cluster->slots[slot] = n;
+    if (server.cluster->moved_slot_since_sleep == CLUSTER_MOVED_SLOT_NONE)
+        server.cluster->moved_slot_since_sleep = slot;
+    else
+        server.cluster->moved_slot_since_sleep = CLUSTER_MOVED_SLOT_MULTIPLE;
     return C_OK;
 }
 
diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h
index 9caf07bae..40eee12af 100644
--- a/src/cluster_legacy.h
+++ b/src/cluster_legacy.h
@@ -341,6 +341,9 @@ struct clusterState {
     /* The following fields are used by masters to take state on elections. */
     uint64_t lastVoteEpoch;     /* Epoch of the last vote granted. */
     int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
+    int moved_slot_since_sleep; /* Single slot moved since last sleep or
+                                 * -2 for none or -3 for multiple. */
+    robj *moved_slot_channel;   /* Shared robj string object. */
     /* Stats */
     /* Messages received and sent by type. */
     long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT];
diff --git a/src/pubsub.c b/src/pubsub.c
index 1fcad2565..2ae784ac2 100644
--- a/src/pubsub.c
+++ b/src/pubsub.c
@@ -468,6 +468,14 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify) {
     return count;
 }
 
+/* Returns a list of clients subscribed to the given channel or NULL if the
+ * channel has no subscribers. The channel is non-sharded. Pattern subscriptions
+ * are not included. This is used for special channels for notifications. */
+list *pubsubGetSubscribers(robj *channel) {
+    dictEntry *de = kvstoreDictFind(*pubSubType.serverPubSubChannels, 0, channel);
+    return de ? dictGetVal(de) : NULL;
+}
+
 /*
  * Publish a message to all the subscribers.
  */
diff --git a/src/server.h b/src/server.h
index d891fccfb..c2e04ab8e 100644
--- a/src/server.h
+++ b/src/server.h
@@ -3181,6 +3181,7 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify);
 int pubsubPublishMessage(robj *channel, robj *message, int sharded);
 int pubsubPublishMessageAndPropagateToCluster(robj *channel, robj *message, int sharded);
 void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bulk);
+list *pubsubGetSubscribers(robj *channel);
 int serverPubsubSubscriptionCount(void);
 int serverPubsubShardSubscriptionCount(void);
 size_t pubsubMemOverhead(client *c);
diff --git a/tests/cluster/tests/15-cluster-slots.tcl b/tests/cluster/tests/15-cluster-slots.tcl
index 892e9049b..d7f4bc1b0 100644
--- a/tests/cluster/tests/15-cluster-slots.tcl
+++ b/tests/cluster/tests/15-cluster-slots.tcl
@@ -50,14 +50,45 @@ test "client can handle keys with hash tag" {
 }
 
 test "slot migration is valid from primary to another primary" {
-    set cluster [redis_cluster 127.0.0.1:[get_instance_attrib redis 0 port]]
+    set startup_port [get_instance_attrib redis 0 port]
+    set cluster [redis_cluster 127.0.0.1:$startup_port]
     set key order1
     set slot [$cluster cluster keyslot $key]
     array set nodefrom [$cluster masternode_for_slot $slot]
     array set nodeto [$cluster masternode_notfor_slot $slot]
 
+    # A 3rd node for checking that it gets informed about the migration
+    array set node3 [$cluster masternode_notfor_slot $slot]
+    while { $node3(port) eq $nodefrom(port) || $node3(port) eq $nodeto(port) } {
+        array set node3 [$cluster masternode_notfor_slot $slot]
+    }
+    # Check that all nodes are different
+    assert_not_equal $nodefrom(port) $nodeto(port)
+    assert_not_equal $nodefrom(port) $node3(port)
+    assert_not_equal $nodeto(port) $node3(port)
+
+    # Test subscribe to moved slot notifications
+    set rd1 [redis_deferring_client_by_addr 127.0.0.1 $nodefrom(port)]
+    set rd2 [redis_deferring_client_by_addr 127.0.0.1 $nodeto(port)]
+    set rd3 [redis_deferring_client_by_addr 127.0.0.1 $node3(port)]
+    assert_equal {1} [subscribe $rd1 {__cluster__:moved}]
+    assert_equal {1} [subscribe $rd2 {__cluster__:moved}]
+    assert_equal {1} [subscribe $rd3 {__cluster__:moved}]
+
+    assert_equal {OK} [$nodeto(link) cluster setslot $slot importing $nodefrom(id)]
+    assert_equal {OK} [$nodefrom(link) cluster setslot $slot migrating $nodeto(id)]
     assert_equal {OK} [$nodefrom(link) cluster setslot $slot node $nodeto(id)]
     assert_equal {OK} [$nodeto(link) cluster setslot $slot node $nodeto(id)]
+
+    # Check that we got the pubsub MOVED message from all nodes.
+    set expect_content "MOVED $slot 127.0.0.1:$nodeto(port)"
+    set expect_publish "message __cluster__:moved {$expect_content}"
+    assert_equal $expect_publish [$rd1 read]
+    assert_equal $expect_publish [$rd2 read]
+    assert_equal $expect_publish [$rd3 read]
+    $rd1 close
+    $rd2 close
+    $rd3 close
 }
 
 test "slot migration is invalid from primary to replica" {
@@ -125,4 +156,4 @@ if {$::tls} {
         # Compare the ports in the first row
         assert_no_match [lindex $slots_tls 0 3 1] [lindex $slots_plain 0 3 1]
     }
-}
\ No newline at end of file
+}