[NEW] Implement stream PUBSUB #360

Open
CharlesChen888 opened this issue Apr 24, 2024 · 12 comments
Labels
major-decision-pending Needs decision by core team

Comments

@CharlesChen888
Member

CharlesChen888 commented Apr 24, 2024

The problem/use-case that the feature addresses

The main problem is that when XREAD or XREADGROUP is used to consume a stream remotely, this pull mode performs poorly because each read costs two network delays (the consumer sends the command to the server, then the server returns the data to the consumer).

If a push mode like PUBSUB is implemented for streams, we can halve the network delay by having the server actively push messages, without the consumer sending commands.

Description of the feature

Here is a draft design of the commands this feature may have.

Subscribe and unsubscribe:

XSUBSCRIBE key [key ...] id [id ...]
XSUBSCRIBEGROUP GROUP group consumer [NOACK] STREAMS key [key ...] id [id ...]
XUNSUBSCRIBE [key [key ...]]

Just like XREAD, when a client issues XSUBSCRIBE on a stream, the server immediately returns every message after id, and then all messages added to the stream afterwards, until the client unsubscribes. As with XREAD, id can be "$", in which case the server sends only messages newly added to the stream.

And just like XREADGROUP, XSUBSCRIBEGROUP puts the client into a subscription group as a consumer, and a new message will only be sent to one of the consumers of each group. ACKs and the PEL work the same way as with XREADGROUP.
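
To make the intended XSUBSCRIBE semantics concrete, here is a minimal in-memory sketch in Python. It is only a toy model of the draft above, not Valkey code: `ToyStream`, its methods, and the tuple form of entry IDs are all hypothetical names chosen for illustration. It shows the two behaviors described: replaying the backlog after a given id, and receiving only new entries when the id is "$".

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Tuple, Union

EntryId = Tuple[int, int]              # (milliseconds, sequence), like "1-0"
Entry = Tuple[EntryId, Dict[str, str]]


@dataclass
class ToyStream:
    """In-memory stand-in for a stream (hypothetical, for illustration)."""
    entries: List[Entry] = field(default_factory=list)
    subscribers: List[Callable[[Entry], None]] = field(default_factory=list)

    def xadd(self, entry_id: EntryId, fields: Dict[str, str]) -> None:
        entry = (entry_id, fields)
        self.entries.append(entry)
        # Push to every subscriber; no read command is needed.
        for deliver in self.subscribers:
            deliver(entry)

    def xsubscribe(self, last_id: Union[EntryId, str],
                   deliver: Callable[[Entry], None]) -> None:
        # "$" means: skip the backlog, deliver only entries added later.
        if last_id != "$":
            for entry in self.entries:
                if entry[0] > last_id:
                    deliver(entry)      # replay entries after last_id
        self.subscribers.append(deliver)


stream = ToyStream()
stream.xadd((1, 0), {"k": "a"})
stream.xadd((2, 0), {"k": "b"})

from_id: List[Entry] = []
only_new: List[Entry] = []
stream.xsubscribe((1, 0), from_id.append)   # backlog after 1-0, then live
stream.xsubscribe("$", only_new.append)     # live entries only
stream.xadd((3, 0), {"k": "c"})

print([e[0] for e in from_id])
print([e[0] for e in only_new])
```

The first subscriber sees entries 2-0 and 3-0, the second only 3-0.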

Publish:

We can still use XADD, and by default the newly added messages will be pushed to subscribers.

Alternatives you've considered

Another similar idea is persisting PUBSUB data to stream, which may require each PUBSUB message to have an ID.

Additional information

Former issue and PR in Redis: redis/redis#10809 redis/redis#11726
Any suggestions about the design or how to implement this are welcome.

@ranshid
Contributor

ranshid commented Apr 30, 2024

@CharlesChen888 thank you very much for this very important proposal!

Some points I would like to share:

The main problem is that when XREAD or XREADGROUP is used to consume a stream remotely, this pull mode performs poorly because each read costs two network delays (the consumer sends the command to the server, then the server returns the data to the consumer).

I agree that the multiple round trips introduce higher latency for clients, and I would like to add that they also cause extra server work. When many clients are blocked in XREAD on a stream, they are all unblocked and then issue XREAD again, which leads to a burst of XREADs after some XADDs. That is also problematic in terms of server CPU and general latency.

If a push mode like PUBSUB is implemented for streams, we can halve the network delay by having the server actively push messages, without the consumer sending commands.

I would like to extend the discussion of that part. Do we plan to move XSUBSCRIBERs to be in SUBSCRIBER mode (like regular SUBSCRIBE)? I think that given the fact that we would like this to be supported on resp2 connected clients we should.
This would, however, need to be considered when we plan to introduce the XSUBSCRIBEGROUP. I think that it is important to support clients re-subscribing to streams, so in order to support that for XSUBSCRIBEGROUP we will probably have to add XACK to this subscriber mode.

And just like XREADGROUP, XSUBSCRIBEGROUP puts the client into a subscription group as a consumer, and a new message will only be sent to one of the consumers of each group.

What is the proposed algorithm here? Will we do a simple round robin, or maybe add a COUNT argument to XSUBSCRIBEGROUP?

Regarding duplicate consumers: is it possible for many clients to subscribe as the same consumer? If so, I think that, to match the current XREADGROUP behavior, they should not get the same message twice (a message will only be pushed to one of the clients). However, when we subscribe using a specific ID (probably when a subscriber is reconnecting), will messages be duplicated? I have no good solution for that at this point, but I think we should come up with a consistent one.
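
As one possible reading of the "simple round robin" option, the following Python sketch dispatches each new entry to exactly one consumer in a group and tracks unacknowledged entries in a per-consumer PEL. Everything here (`ToyGroup`, `push`, `ack`, the consumer names) is hypothetical and only illustrates the idea; it does not address the duplicate-consumer or re-subscription questions raised above.

```python
from collections import defaultdict
from itertools import cycle
from typing import Dict, List, Set


class ToyGroup:
    """Hypothetical subscription group: round-robin delivery plus a PEL."""

    def __init__(self, consumers: List[str]) -> None:
        self._next = cycle(consumers)
        # Pending entries list: consumer -> IDs delivered but not yet acked.
        self.pel: Dict[str, Set[str]] = defaultdict(set)
        self.delivered: Dict[str, List[str]] = defaultdict(list)

    def push(self, entry_id: str) -> str:
        """Deliver one entry to the next consumer in rotation."""
        consumer = next(self._next)
        self.delivered[consumer].append(entry_id)
        self.pel[consumer].add(entry_id)
        return consumer

    def ack(self, consumer: str, entry_id: str) -> bool:
        """Remove an entry from the consumer's PEL; False if not pending."""
        if entry_id in self.pel[consumer]:
            self.pel[consumer].remove(entry_id)
            return True
        return False


group = ToyGroup(["alice", "bob"])
for entry_id in ["1-0", "2-0", "3-0"]:
    group.push(entry_id)
group.ack("alice", "1-0")

print(dict(group.delivered))
print({name: sorted(ids) for name, ids in group.pel.items()})
```

A COUNT-style option would change only the `push` step, for example by letting one consumer take several consecutive entries before the rotation advances.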

@zuiderkwast
Contributor

I think that given the fact that we would like this to be supported on resp2 connected clients we should.

How important is it to support RESP2 clients actually? Let's consider requiring RESP3 for this?

If we can require RESP3 (error for RESP2), then we don't need to invent a magic channel name like we have for client tracking __redis__:invalidate (used only for RESP2 while RESP3 uses another format like push 1) "invalidate", 2) 1) "key").

@ranshid
Contributor

ranshid commented Apr 30, 2024

How important is it to support RESP2 clients actually? Let's consider requiring RESP3 for this?

I think that most clients still default to RESP2, and RESP3 is not widely adopted yet.

If we can require RESP3 (error for RESP2), then we don't need to invent a magic channel name like we have for client tracking __redis__:invalidate (used only for RESP2 while RESP3 uses another format like push 1) "invalidate", 2) 1) "key").

IIUC the suggestion was not to use a new pubsub channel, but rather use a new XSUBSCRIBE* API. I was simply suggesting that the client will probably be better put in a similar mode as pub/sub SUBSCRIBERs do (so it can send XUNSUBSCRIBE etc...)

@zuiderkwast
Contributor

A problem with new SUBSCRIBE variants is that clients need to adapt to be aware of this. The [S|P]SUBSCRIBE commands are troublesome because they don't have any in-band reply (in RESP3), just a push for each channel subscribed (or an error reply). Because of this, clients need to handle them separately so they don't get out of sync. (It gets complicated for async clients and pipelining.)

Another problem is that the payload of a pubsub message is just a string. I think we want to return a map of fields and values for the stream entry. Do we need a new "xmessage" type where the data is structured?

1) xmessage
2) stream1
3) 121345678-1
4) 1) field1
   2) value1
   3) field2
   4) value2
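
For illustration, a client could decode a push of this shape roughly as follows. This is a Python sketch under the assumption that the "xmessage" layout is exactly the four elements shown above; the `StreamPush` type and `parse_xmessage` function are hypothetical names, and the message type itself is only a proposal.

```python
from typing import Dict, List, NamedTuple


class StreamPush(NamedTuple):
    stream: str
    entry_id: str
    fields: Dict[str, str]


def parse_xmessage(push: List) -> StreamPush:
    """Decode the hypothetical 4-element "xmessage" push."""
    if len(push) != 4 or push[0] != "xmessage":
        raise ValueError("not an xmessage push")
    _, stream, entry_id, flat = push
    if len(flat) % 2 != 0:
        raise ValueError("field/value list must have even length")
    # Pair up field1, value1, field2, value2, ...
    fields = dict(zip(flat[0::2], flat[1::2]))
    return StreamPush(stream, entry_id, fields)


msg = parse_xmessage(
    ["xmessage", "stream1", "121345678-1",
     ["field1", "value1", "field2", "value2"]]
)
print(msg.stream, msg.entry_id, msg.fields)
```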

I think there are enough problems with pubsub...

@ranshid
Contributor

ranshid commented Apr 30, 2024

A problem with new SUBSCRIBE variants is that clients need to adapt to be aware of this. The [S|P]SUBSCRIBE commands are troublesome because they don't have any in-band reply (in RESP3), just a push for each channel subscribed (or an error reply). Because of this, clients need to handle them separately so they don't get out of sync. (It gets complicated for async clients and pipelining.)

I am not sure how we can work around this one. For example, one alternative could be to give XREAD/XREADGROUP a flag indicating that it never stops and is re-blocked each time it has consumed all the messages. This would be bad, since the only way a client could control state and reset would be by disconnecting. IMO the best way to manage a client that is subscribed to a stream is to place it in a mode similar to that of a channel subscriber.
Note that we can set the message format any way we want, since this is not a "regular" subscriber (i.e. we do not use the publish API).

Another problem is that the payload of a pubsub message is just a string. I think we want to return a map of fields and values for the stream entry. Do we need a new "xmessage" type where the data is structured?

I am not sure I follow. AFAIK the pushed pubsub messages are array-reply type, so we can use array format to set different message fields in the published stream messages.

@zuiderkwast
Contributor

AFAIK the pushed pubsub messages are array-reply type, so we can use array format to set different message fields in the published stream messages.

The pubsub message is an array of length 3 for [S]SUBSCRIBE and of length 4 for PSUBSCRIBE. The payload (the last element in this array) is a string. I know that clients check this (at least the array length) to know that it's a message and not something else.

1) "message"
2) "channel1"
3) "hello"

1) "pmessage"
2) "ch*"
3) "channel1"
4) "hello"

Do you think we can relax the type of the last element and let it be a nested structure?
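
The kind of check clients perform can be sketched in Python as below. The `classify` function is a hypothetical helper, not taken from any real client library. It accepts the shapes described above (length 3 for "message" and "smessage", length 4 for "pmessage") and shows that a nested payload would pass a length check while still breaking any client code that assumes the payload is a string.

```python
from typing import List, Optional, Tuple


def classify(push: List) -> Optional[Tuple[str, str, object]]:
    """Return (kind, channel, payload), or None if the shape is unexpected."""
    if not push:
        return None
    kind = push[0]
    if kind in ("message", "smessage") and len(push) == 3:
        return kind, push[1], push[2]
    if kind == "pmessage" and len(push) == 4:
        # push[1] is the pattern, push[2] the channel that matched it.
        return kind, push[2], push[3]
    return None


plain = classify(["message", "channel1", "hello"])
pattern = classify(["pmessage", "ch*", "channel1", "hello"])
# Hypothetical relaxed payload: a nested field/value list instead of a string.
nested = classify(["message", "stream1", ["field1", "value1"]])

print(plain, pattern)
print(nested is not None and isinstance(nested[2], str))
```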

@zuiderkwast
Contributor

Most client libs do support RESP3, but it's optional to use it, is that right?

I think we have an opportunity to push forward RESP3 usage here. Clients will adapt if they need to.

If we avoid calling it pubsub, then we can have a command that returns +OK and a clean push message instead of inventing new variants of pubsub.

@zuiderkwast
Contributor

On the other hand, we should go for the implementation with the least surprise. If it looks like a pubsub command, it should be a pubsub command.

This is a very important bridge between streams and pubsub. It really fills a gap.

Can we decide first if we want this or not and later decide about the details?

@valkey-io/core-team Please think about it.

@zuiderkwast zuiderkwast added the major-decision-pending Needs decision by core team label Apr 30, 2024
@hwware
Member

hwware commented May 3, 2024

Honestly, I do not think this feature is necessary.

  1. In Valkey (previously Redis), every command sent by a client must go through the network twice: the client sends the command, and the server responds with a message or an error code. Thus, I do not quite understand how the XREAD or XREADGROUP commands differ from SET, HSET, etc.

  2. From the documentation, we know that RESP2 clients are allowed to subscribe to a few commands, and RESP3 clients are allowed to subscribe to any command. This means that if clients choose RESP3, XREAD or XREADGROUP is already available. I do not think we need to implement this again in the same software.

@ranshid
Contributor

ranshid commented May 8, 2024

In Valkey (previously Redis), every command sent by a client must go through the network twice: the client sends the command, and the server responds with a message or an error code. Thus, I do not quite understand how the XREAD or XREADGROUP commands differ from SET, HSET, etc.

I think the case of streams is somewhat different. When the application simply wants to read from a timestream, you might be right, but things start to get complicated when the application is looking for a more persistent messaging system. Imagine you have many readers (let's say 1k) blocked reading data from the stream. After each XADD, the readers are unblocked and all execute the (somewhat expensive) XREAD command again. They each get just a single message back and then issue a storm of XREADs again and again. This causes CPU spikes every time the stream is written to, and it increases the latency clients see when reading new messages written to the stream before they have issued another XREAD.
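
A back-of-the-envelope count makes the difference visible. The Python sketch below assumes the worst case described above: every XADD wakes every blocked reader, each reader gets a single message and immediately re-issues XREAD. The function names are hypothetical, and the push-mode figure assumes one subscribe command per reader as in the proposal.

```python
def pull_commands(readers: int, xadds: int) -> int:
    """XREAD commands the server executes when every blocked reader
    re-issues XREAD after each XADD (one entry returned per wake-up)."""
    initial = readers               # first blocking XREAD per reader
    reissued = readers * xadds      # one new XREAD per reader per XADD
    return initial + reissued


def push_commands(readers: int, xadds: int) -> int:
    """Subscribe commands in a push model: one per reader, independent of
    how many entries are added afterwards."""
    return readers


readers, xadds = 1000, 100
print(pull_commands(readers, xadds))
print(push_commands(readers, xadds))
```

With 1k readers and 100 XADDs, the pull model has the server execute on the order of a hundred thousand XREADs, while the push model needs only the initial subscriptions.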

From the documentation, we know that RESP2 clients are allowed to subscribe to a few commands, and RESP3 clients are allowed to subscribe to any command. This means that if clients choose RESP3, XREAD or XREADGROUP is already available. I do not think we need to implement this again in the same software.

I am not sure what exactly you meant here, but in case you are suggesting client can perform transactions of XADD+PUBLISH like:

multi
XADD mystream * message content
PUBLISH mystream-channel message content
exec

That is true, but it makes it very difficult for subscribers to continue listening once they have been disconnected. A reader would have to read from the stream all the messages it missed while it was disconnected from the channel subscription, and catch up to the point where the top of the stream matches the channel.
I think this suggestion makes it much easier for clients to get push notifications from streams.
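
To show what that catch-up involves on the client side, here is a small Python sketch. It assumes, hypothetically, that every message published on the channel carries the ID of the corresponding stream entry, which the transaction above does not provide as written. The `catch_up` function and the tuple form of IDs are illustrative only.

```python
from typing import Iterable, List, Tuple

EntryId = Tuple[int, int]   # (milliseconds, sequence), like "1-0"


def catch_up(stream_ids: List[EntryId], last_seen: EntryId,
             live_ids: Iterable[EntryId]) -> List[EntryId]:
    """Order in which a reconnecting consumer would process entries.

    stream_ids: IDs currently in the stream (read back after reconnecting).
    last_seen:  last ID processed before the disconnect.
    live_ids:   IDs received on the channel since re-subscribing; these may
                overlap with the entries replayed from the stream.
    """
    processed: List[EntryId] = []
    newest = last_seen
    missed = [i for i in stream_ids if i > last_seen]
    for entry_id in missed + list(live_ids):
        # Skip anything already handled during the replay.
        if entry_id > newest:
            processed.append(entry_id)
            newest = entry_id
    return processed


order = catch_up(
    stream_ids=[(1, 0), (2, 0), (3, 0), (4, 0)],
    last_seen=(2, 0),
    live_ids=[(4, 0), (5, 0)],
)
print(order)
```

With a server-side XSUBSCRIBE that accepts a starting id, this replay and de-duplication logic would not need to live in every client.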

@zuiderkwast
Contributor

zuiderkwast commented May 8, 2024

In Valkey (previously Redis), every command sent by a client must go through the network twice

I want to add that this is not true for pubsub. But pubsub doesn't have the guarantees of streams, so users need to choose between guaranteed delivery (streams) and better performance (pubsub). With this feature they can get both, and I think it makes Valkey a very good and fast message broker.

@zuiderkwast
Contributor

I want to revoke my previous idea to make it RESP3 only. It's better that it's a new pubsub variant.
