Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate sharded pubsub data via replication link #307

Open
wants to merge 1 commit into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/commands.def
Expand Up @@ -10768,7 +10768,7 @@ struct COMMAND_STRUCT serverCommandTable[] = {
{MAKE_CMD("publish","Posts a message to a channel.","O(N+M) where N is the number of clients subscribed to the receiving channel and M is the total number of subscribed patterns (by any client).","2.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,PUBLISH_History,0,PUBLISH_Tips,0,publishCommand,3,CMD_PUBSUB|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_MAY_REPLICATE|CMD_SENTINEL,0,PUBLISH_Keyspecs,0,NULL,2),.args=PUBLISH_Args},
{MAKE_CMD("pubsub","A container for Pub/Sub commands.","Depends on subcommand.","2.8.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,PUBSUB_History,0,PUBSUB_Tips,0,NULL,-2,0,0,PUBSUB_Keyspecs,0,NULL,0),.subcommands=PUBSUB_Subcommands},
{MAKE_CMD("punsubscribe","Stops listening to messages published to channels that match one or more patterns.","O(N) where N is the number of patterns to unsubscribe.","2.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,PUNSUBSCRIBE_History,0,PUNSUBSCRIBE_Tips,0,punsubscribeCommand,-1,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,PUNSUBSCRIBE_Keyspecs,0,NULL,1),.args=PUNSUBSCRIBE_Args},
{MAKE_CMD("spublish","Post a message to a shard channel","O(N) where N is the number of clients subscribed to the receiving shard channel.","7.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,SPUBLISH_History,0,SPUBLISH_Tips,0,spublishCommand,3,CMD_PUBSUB|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_MAY_REPLICATE,0,SPUBLISH_Keyspecs,1,NULL,2),.args=SPUBLISH_Args},
{MAKE_CMD("spublish","Post a message to a shard channel","O(N) where N is the number of clients subscribed to the receiving shard channel.","7.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,SPUBLISH_History,0,SPUBLISH_Tips,0,spublishCommand,3,CMD_PUBSUB|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_MAY_REPLICATE|CMD_WRITE,0,SPUBLISH_Keyspecs,1,NULL,2),.args=SPUBLISH_Args},
{MAKE_CMD("ssubscribe","Listens for messages published to shard channels.","O(N) where N is the number of shard channels to subscribe to.","7.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,SSUBSCRIBE_History,0,SSUBSCRIBE_Tips,0,ssubscribeCommand,-2,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,0,SSUBSCRIBE_Keyspecs,1,NULL,1),.args=SSUBSCRIBE_Args},
{MAKE_CMD("subscribe","Listens for messages published to channels.","O(N) where N is the number of channels to subscribe to.","2.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,SUBSCRIBE_History,0,SUBSCRIBE_Tips,0,subscribeCommand,-2,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,SUBSCRIBE_Keyspecs,0,NULL,1),.args=SUBSCRIBE_Args},
{MAKE_CMD("sunsubscribe","Stops listening to messages posted to shard channels.","O(N) where N is the number of shard channels to unsubscribe.","7.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,SUNSUBSCRIBE_History,0,SUNSUBSCRIBE_Tips,0,sunsubscribeCommand,-1,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,0,SUNSUBSCRIBE_Keyspecs,1,NULL,1),.args=SUNSUBSCRIBE_Args},
Expand Down
3 changes: 2 additions & 1 deletion src/commands/spublish.json
Expand Up @@ -11,7 +11,8 @@
"LOADING",
"STALE",
"FAST",
"MAY_REPLICATE"
"MAY_REPLICATE",
"WRITE"
],
"arguments": [
{
Expand Down
5 changes: 2 additions & 3 deletions src/pubsub.c
Expand Up @@ -717,9 +717,8 @@ void channelList(client *c, sds pat, kvstore *pubsub_channels) {

/* SPUBLISH <shardchannel> <message> */
void spublishCommand(client *c) {
int receivers = pubsubPublishMessageAndPropagateToCluster(c->argv[1],c->argv[2],1);
if (!server.cluster_enabled)
forceCommandPropagation(c,PROPAGATE_REPL);
int receivers = pubsubPublishMessage(c->argv[1], c->argv[2], 1);
forceCommandPropagation(c, PROPAGATE_REPL);
addReplyLongLong(c,receivers);
}

Expand Down
7 changes: 4 additions & 3 deletions tests/unit/cluster/links.tcl
Expand Up @@ -74,7 +74,7 @@ start_cluster 1 2 {tags {external:skip cluster}} {
set primary [Rn $primary_id]
set replica1 [Rn $replica1_id]

test "Broadcast message across a cluster shard while a cluster link is down" {
test "Broadcast message on a primary across a cluster shard while a cluster link is down" {
set replica1_node_id [$replica1 CLUSTER MYID]

set channelname ch3
Expand All @@ -94,7 +94,7 @@ start_cluster 1 2 {tags {external:skip cluster}} {
# Verify number of links with cluster stable state
assert_equal [expr [number_of_peers $primary_id]*2] [number_of_links $primary_id]

# Disconnect the cluster between primary and replica1 and publish a message.
# Disconnect the cluster link from primary to replica1 and publish a message.
$primary MULTI
$primary DEBUG CLUSTERLINK KILL TO $replica1_node_id
$primary SPUBLISH $channelname hello
Expand All @@ -113,7 +113,8 @@ start_cluster 1 2 {tags {external:skip cluster}} {
# Publish a message afterwards.
$primary SPUBLISH $channelname world

# Verify replica1 has received only (world) / hello is lost.
# Verify replica1 has received both (hello/world), irrespective of the cluster link health.
assert_equal "smessage ch3 hello" [$subscribeclient1 read]
assert_equal "smessage ch3 world" [$subscribeclient1 read]

# Verify replica2 has received both messages (hello/world)
Expand Down
34 changes: 15 additions & 19 deletions tests/unit/cluster/sharded-pubsub.tcl
Expand Up @@ -5,13 +5,19 @@ start_cluster 1 1 {tags {external:skip cluster}} {
set primary [Rn $primary_id]
set replica [Rn $replica1_id]

test "Sharded pubsub publish behavior on a primary" {
assert_equal 0 [$primary spublish ch1 "hello"]
}

test "Sharded pubsub publish behavior on a replica" {
assert_error "*MOVED*" {$replica spublish ch1 "hello"}
}


test "Sharded pubsub publish behavior within multi/exec" {
foreach {node} {primary replica} {
set node [set $node]
$node MULTI
$node SPUBLISH ch1 "hello"
$node EXEC
}
$primary MULTI
$primary SPUBLISH ch1 "hello"
$primary EXEC
}

test "Sharded pubsub within multi/exec with cross slot operation" {
Expand All @@ -29,10 +35,9 @@ start_cluster 1 1 {tags {external:skip cluster}} {
$primary EXEC
} {0 {}}

test "Sharded pubsub publish behavior within multi/exec with read operation on replica" {
test "Sharded pubsub publish behavior within multi/exec on replica" {
$replica MULTI
$replica SPUBLISH foo "hello"
catch {[$replica GET foo]} err
catch {[$replica SPUBLISH foo "hello"]} err
assert_match {MOVED*} $err
catch {[$replica EXEC]} err
assert_match {EXECABORT*} $err
Expand All @@ -44,13 +49,4 @@ start_cluster 1 1 {tags {external:skip cluster}} {
$primary SET foo bar
$primary EXEC
} {0 OK}

test "Sharded pubsub publish behavior within multi/exec with write operation on replica" {
$replica MULTI
$replica SPUBLISH foo "hello"
catch {[$replica SET foo bar]} err
assert_match {MOVED*} $err
catch {[$replica EXEC]} err
assert_match {EXECABORT*} $err
}
}
}