New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement stream pubsub. #11726
Implement stream pubsub. #11726
Conversation
219a3eb
to
5d7ff31
Compare
@CharlesChen888, this seems like you put up a lot of work and thought into this - thank you!
|
|
119f6e1
to
8c5c94b
Compare
@itamarhaber WDYT?
|
@ranshid |
IMO yes it should this is the same as will be with XREAD right? we might want to think of a way to limit the max bulk number of massages in order to prevent OOM killing the client, but I still think that a subscribing client should be able to consume massages from the last point it stopped. |
@ranshid |
Couldn't dive deep into the code yet. I had few thoughts about the high level features.
|
|
|
@CharlesChen888 Someone at AWS asked if you would re-open this against Valkey. |
@madolson What is the use case of stream pubsub in AWS? The implementation here, after some discussion among me and my colleagues, turns out to be not as good as we thought. I'd love to redo this and push to Valkey if I have time. An issue targeting to implement this is proposed in Valkey valkey-io/valkey#360 |
I am working on implementing stream pubsub, to resolve #10809. It is mostly done and I'd like to publish it to see if I missed anything or maybe there are some better ideas.
Original Issue
The main problem is that while using
XREAD
orXREADGROUP
to remotely consume a stream, this pull mode has poor performance with twice network delay (consumer sends the command to server, then server returns the data to consumer).If a push mode like PUBSUB is implemented in stream, we can half the network delay by actively pushing the message, without consumer sending commands.
Also, a reverse request to persist PUBSUB data to stream was already raised before.
Design
One direct approach is to copy sharded PUBSUB's code into stream's logic. However, handling keys is much more complicated than handling sharded channels, as we need to notify the client when the subscribed key is deleted, moved or migrated, etc.
Since
XREAD/XREADGROUP
has theblock
option, we can implement this feature by cyclically callingXREAD/XREADGROUP BLOCK
for the subscribed client on the server side. But we can't actually block the client, so the solution is to create a mechanism parallel to the command block.blockForKeys
.call
function and inbeforeSleep
.Command
When
XADD
is called, and when there is at least one subscriber, the new added messages will automatically be published.Data Structure
Just like command block.
In redisDb:
In redisServer:
In client:
A streamSubscriber struct to record subscription info:
To record the group and the consumer the client is using, create a new streamSubscriber and use two pointers to store the group and consumer the client is using (NULL if it is XSUBSCRIBE), and use a flag to store NOACK and to mark a streamSubscriber invalid (when the group or the consumer is deleted).
And in stream:
The
subscribers
andgroup_subscribers
here is where the data is actually stored.The client_to_subscriber is only an index, which is used for quick judging whether a client is already subscribed to a stream, and to find the listNode when a client is going to unsubscribe a stream.
To store the streamIDs that need to be published, we store the start and the end ID.
In streamCG:
The
subscribers
instreamCG
is only a pointer. It is used for traversing all subscribers in a group.Subscription/Publication Details
Outside of stream structure, everything is just like command block, except that we don't have "unblock_on_nokey" dict, since it is designed to unsubscribe all clients when the key is deleted.
Inside of the stream structure, things are a little bit complicated.
To subscribe a stream:
To publish a message:
Deleted/Moved/Overwritten Streams
Will
signalPublishStream
just likesignalDeletedKeyAsReady
in block.Deleted Group Or Consumer
Will mark the subscribers using the group/consumer as invalid. Then
signalPublishStream
, and unsubscribe them aftercall
or inbeforeSleep
.Reacquire Lost Message
XSUBSCRIBEGROUP
andXADD
will have the same results asXREADGROUP
, so messages without ACKs will be recorded in PEL, and clients can useXPENDING
andXREAD
to reacquire them.Mixed Use of Regular Consumer and Group Subscriber
You can consider group subscribers as a special kind of regular consumer, that will
XREADGROUP
a new message once it is added to stream, so regular consumers will not get anything if there is a subscriber in the same group unless you putXADD
andXREADGROUP
in the same queue ofMULTI
.So still, the group will deliver each ID only once, and each subscriber/consumer will not get any same message.
Yet Still Need To Be Done
How To Use
Suppose there is a stream "a" with a group "g", using redis-cli, we can:
And the following information will be returned:
Using another client to publish:
And the following information will be reveived by the subscriber:
When MULTI is used, multiple messages will be merged as one:
Similarly, using XSUBSCRIBEGROUP:
And the following information will be returned: