Allow clients to subscribe to slot migrations #298

Open — wants to merge 2 commits into base: unstable
14 changes: 9 additions & 5 deletions src/cluster.c
@@ -1204,6 +1204,13 @@ clusterNode *getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, i
return n;
}

/* For redirects, verb must start with a dash, e.g. "-ASK" or "-MOVED". */
sds clusterFormatRedirect(const char *verb, int slot, clusterNode *n, int use_tls_port) {
const char *endpoint = clusterNodePreferredEndpoint(n);
int port = clusterNodeClientPort(n, use_tls_port);
return sdscatprintf(sdsempty(), "%s %d %s:%d", verb, slot, endpoint, port);
}

Comment on lines +1207 to +1213
Member:

Are there any clients that primarily store a map of NodeID -> Nodes, as opposed to endpoint:port -> Nodes? I ask because I'm wondering whether it would be useful to also return the NodeID here. I know the Python client doesn't key off NodeID, and I'm less familiar with the other clients, but if some clients don't key their main node map off the endpoints, this might make things easier for them.

Contributor Author:

I don't know, but since redirects use the host:port form, clients need to be able to identify them by this.

/* Send the client the right redirection code, according to error_code
* that should be set to one of CLUSTER_REDIR_* macros.
*
@@ -1229,11 +1236,8 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co
error_code == CLUSTER_REDIR_ASK)
{
/* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
int port = clusterNodeClientPort(n, shouldReturnTlsInfo());
addReplyErrorSds(c,sdscatprintf(sdsempty(),
"-%s %d %s:%d",
(error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
hashslot, clusterNodePreferredEndpoint(n), port));
char *verb = (error_code == CLUSTER_REDIR_ASK) ? "-ASK" : "-MOVED";
addReplyErrorSds(c, clusterFormatRedirect(verb, hashslot, n, shouldReturnTlsInfo()));
} else {
serverPanic("getNodeByQuery() unknown error.");
}
1 change: 1 addition & 0 deletions src/cluster.h
@@ -108,6 +108,7 @@ clusterNode *clusterLookupNode(const char *name, int length);
clusterNode *getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
int clusterRedirectBlockedClientIfNeeded(client *c);
void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code);
sds clusterFormatRedirect(const char *verb, int slot, clusterNode *n, int use_tls_port);
void migrateCloseTimedoutSockets(void);
unsigned int keyHashSlot(char *key, int keylen);
int patternHashSlot(char *pattern, int length);
47 changes: 47 additions & 0 deletions src/cluster_legacy.c
@@ -307,6 +307,10 @@ typedef struct {
clusterMsg msg;
} clusterMsgSendBlock;

/* Special values for server.cluster->moved_slot_since_sleep */
#define CLUSTER_MOVED_SLOT_NONE -2
#define CLUSTER_MOVED_SLOT_MULTIPLE -3

/* -----------------------------------------------------------------------------
* Initialization
* -------------------------------------------------------------------------- */
@@ -962,6 +966,9 @@ void clusterInit(void) {
server.cluster->state = CLUSTER_FAIL;
server.cluster->size = 0;
server.cluster->todo_before_sleep = 0;
server.cluster->moved_slot_since_sleep = CLUSTER_MOVED_SLOT_NONE;
server.cluster->moved_slot_channel =
createObject(OBJ_STRING, sdsnew("__cluster__:moved"));
server.cluster->nodes = dictCreate(&clusterNodesDictType);
server.cluster->shards = dictCreate(&clusterSdsToListType);
server.cluster->nodes_black_list =
@@ -4844,17 +4851,43 @@ void clusterCron(void) {
clusterUpdateState();
}

/* Notify clients subscribed to slot moved events. */
void clusterNotifyMovedSlot(int moved_slot, list *clients) {
Member:

It feels like this should be in cluster.h; it seems like all cluster implementations would want to send this type of notification.

Contributor Author:

The function is using clusterNode which is only in cluster_legacy.[ch].

This separation of cluster and cluster_legacy is quite arbitrary. I don't mind if you fix it, or we can just merge the two again. Then I'll rebase this PR. :)

Do you have a better idea?

clusterNode *n = server.cluster->slots[moved_slot];
/* As with -MOVED redirects, the port in the message depends on whether the
* client is using TLS or not. */
robj *messages[2] = {NULL, NULL}; /* Created lazily. */
listNode *ln;
listIter li;
listRewind(clients, &li);
while ((ln = listNext(&li)) != NULL) {
client *c = ln->value;
/* TLS judgement is like shouldReturnTlsInfo(), but for c instead of server.current_client. */
int use_tls_port = c->conn ? connIsTLS(c->conn) : server.tls_cluster;
if (messages[use_tls_port] == NULL) {
sds s = clusterFormatRedirect("MOVED", moved_slot, n, use_tls_port);
messages[use_tls_port] = createObject(OBJ_STRING, s);
}
addReplyPubsubMessage(c, server.cluster->moved_slot_channel, messages[use_tls_port], shared.messagebulk);
updateClientMemUsageAndBucket(c);
}
if (messages[0]) decrRefCount(messages[0]);
if (messages[1]) decrRefCount(messages[1]);
}

/* This function is called before the event handler returns to sleep for
* events. It is useful to perform operations that must be done ASAP in
* reaction to events fired but that are not safe to perform inside event
* handlers, or to perform potentially expensive tasks that we need to do
* a single time before replying to clients. */
void clusterBeforeSleep(void) {
int flags = server.cluster->todo_before_sleep;
int moved_slot = server.cluster->moved_slot_since_sleep;

/* Reset our flags (not strictly needed since every single function
* called for flags set should be able to clear its flag). */
server.cluster->todo_before_sleep = 0;
server.cluster->moved_slot_since_sleep = CLUSTER_MOVED_SLOT_NONE;

if (flags & CLUSTER_TODO_HANDLE_MANUALFAILOVER) {
/* Handle manual failover as soon as possible so that won't have a 100ms
@@ -4874,6 +4907,16 @@
if (flags & CLUSTER_TODO_UPDATE_STATE)
clusterUpdateState();

/* Notify clients subscribed to moved slot events. To avoid flooding the
* clients, we only publish the moved slot message if exactly one slot has
* been migrated. In cases like failover, clients will receive -MOVED
* redirects and will need to refresh the full slot mapping with nodes
* including replicas. */
if (moved_slot >= 0) {
list *clients = pubsubGetSubscribers(server.cluster->moved_slot_channel);
if (clients) clusterNotifyMovedSlot(moved_slot, clients);
}

/* Save the config, possibly using fsync. */
if (flags & CLUSTER_TODO_SAVE_CONFIG) {
int fsync = flags & CLUSTER_TODO_FSYNC_CONFIG;
@@ -4976,6 +5019,10 @@ int clusterAddSlot(clusterNode *n, int slot) {
if (server.cluster->slots[slot]) return C_ERR;
clusterNodeSetSlotBit(n,slot);
server.cluster->slots[slot] = n;
if (server.cluster->moved_slot_since_sleep == CLUSTER_MOVED_SLOT_NONE)
server.cluster->moved_slot_since_sleep = slot;
else
server.cluster->moved_slot_since_sleep = CLUSTER_MOVED_SLOT_MULTIPLE;
return C_OK;
}

3 changes: 3 additions & 0 deletions src/cluster_legacy.h
@@ -341,6 +341,9 @@ struct clusterState {
/* The following fields are used by masters to take state on elections. */
uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */
int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
int moved_slot_since_sleep; /* Single slot moved since last sleep or
* -2 for none or -3 for multiple. */
robj *moved_slot_channel; /* Shared robj string object. */
/* Stats */
/* Messages received and sent by type. */
long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT];
8 changes: 8 additions & 0 deletions src/pubsub.c
@@ -468,6 +468,14 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify) {
return count;
}

/* Returns a list of clients subscribed to the given channel or NULL if the
* channel has no subscribers. The channel is non-sharded. Pattern subscriptions
* are not included. This is used for special channels for notifications. */
list *pubsubGetSubscribers(robj *channel) {
dictEntry *de = kvstoreDictFind(*pubSubType.serverPubSubChannels, 0, channel);
return de ? dictGetVal(de) : NULL;
}

/*
* Publish a message to all the subscribers.
*/
1 change: 1 addition & 0 deletions src/server.h
@@ -3181,6 +3181,7 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify);
int pubsubPublishMessage(robj *channel, robj *message, int sharded);
int pubsubPublishMessageAndPropagateToCluster(robj *channel, robj *message, int sharded);
void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bulk);
list *pubsubGetSubscribers(robj *channel);
int serverPubsubSubscriptionCount(void);
int serverPubsubShardSubscriptionCount(void);
size_t pubsubMemOverhead(client *c);
35 changes: 33 additions & 2 deletions tests/cluster/tests/15-cluster-slots.tcl
@@ -50,14 +50,45 @@ test "client can handle keys with hash tag" {
}

test "slot migration is valid from primary to another primary" {
set cluster [redis_cluster 127.0.0.1:[get_instance_attrib redis 0 port]]
set startup_port [get_instance_attrib redis 0 port]
set cluster [redis_cluster 127.0.0.1:$startup_port]
set key order1
set slot [$cluster cluster keyslot $key]
array set nodefrom [$cluster masternode_for_slot $slot]
array set nodeto [$cluster masternode_notfor_slot $slot]
# A 3rd node for checking that it gets informed about the migration
array set node3 [$cluster masternode_notfor_slot $slot]
while { $node3(port) eq $nodefrom(port) || $node3(port) eq $nodeto(port) } {
array set node3 [$cluster masternode_notfor_slot $slot]
}
# Check that all nodes are different
assert_not_equal $nodefrom(port) $nodeto(port)
assert_not_equal $nodefrom(port) $node3(port)
assert_not_equal $nodeto(port) $node3(port)

# Test subscribe to moved slot notifications
set rd1 [redis_deferring_client_by_addr 127.0.0.1 $nodefrom(port)]
set rd2 [redis_deferring_client_by_addr 127.0.0.1 $nodeto(port)]
set rd3 [redis_deferring_client_by_addr 127.0.0.1 $node3(port)]
assert_equal {1} [subscribe $rd1 {__cluster__:moved}]
assert_equal {1} [subscribe $rd2 {__cluster__:moved}]
assert_equal {1} [subscribe $rd3 {__cluster__:moved}]

assert_equal {OK} [$nodeto(link) cluster setslot $slot importing $nodefrom(id)]
assert_equal {OK} [$nodefrom(link) cluster setslot $slot migrating $nodeto(id)]

assert_equal {OK} [$nodefrom(link) cluster setslot $slot node $nodeto(id)]
assert_equal {OK} [$nodeto(link) cluster setslot $slot node $nodeto(id)]

# Check that we got the pubsub MOVED message from all nodes.
set expect_content "MOVED $slot 127.0.0.1:$nodeto(port)"
set expect_publish "message __cluster__:moved {$expect_content}"
assert_equal $expect_publish [$rd1 read]
assert_equal $expect_publish [$rd2 read]
assert_equal $expect_publish [$rd3 read]
$rd1 close
$rd2 close
$rd3 close
}

test "slot migration is invalid from primary to replica" {
@@ -125,4 +156,4 @@ if {$::tls} {
# Compare the ports in the first row
assert_no_match [lindex $slots_tls 0 3 1] [lindex $slots_plain 0 3 1]
}
}
}