Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement stream pubsub. #11726

Closed

Conversation

CharlesChen888
Copy link
Contributor

@CharlesChen888 CharlesChen888 commented Jan 17, 2023

I am working on implementing stream pubsub, to resolve #10809. It is mostly done and I'd like to publish it to see if I missed anything or maybe there are some better ideas.

Original Issue

The main problem is that while using XREAD or XREADGROUP to remotely consume a stream, this pull mode has poor performance with twice network delay (consumer sends the command to server, then server returns the data to consumer).

If a push mode like PUBSUB is implemented in stream, we can half the network delay by actively pushing the message, without consumer sending commands.

Also, a reverse request to persist PUBSUB data to stream was already raised before.

Design

One direct approach is to copy sharded PUBSUB's code into stream's logic. However, handling keys is much more complicated than handling sharded channels, as we need to notify the client when the subscribed key is deleted, moved or migrated, etc.

Since XREAD/XREADGROUP has the block option, we can implement this feature by cyclically calling XREAD/XREADGROUP BLOCK for the subscribed client on the server side. But we can't actually block the client, so the solution is to create a mechanism parallel to the command block.

  1. When a client sends a subscribe command, the server adds the key and the client to a dict, just like calling blockForKeys.
  2. Then when some new data is added to the key (XADD in this PR), or when the key is deleted or expired, it awakens the subscribed key, adding it to a "ready_list".
  3. Finally all the awakened keys are handled after the call function and in beforeSleep.

Command

When XADD is called, and when there is at least one subscriber, the new added messages will automatically be published.

XSUBSCRIBE key [key ...]
XSUBSCRIBEGROUP group consumer numkeys key [key ...] [NOACK]
XUNSUBSCRIBE [key [key ...]]

Data Structure

Just like command block.

In redisDb:

dict *subscribed_keys;
dict *publishing_keys;

In redisServer:

list* publishing_keys;

In client:

dict *subscribed_keys;
long subscribed_key_cnt;

A streamSubscriber struct to record subscription info:

To record the group and the consumer the client is using, create a new streamSubscriber and use two pointers to store the group and consumer the client is using (NULL if it is XSUBSCRIBE), and use a flag to store NOACK and to mark a streamSubscriber invalid (when the group or the consumer is deleted).

/* A structure recording each subscriber's information, including group, consumer and flags for noack
   and invalid mark. */
typedef struct streamSubscriber {
    client* c;
    streamCG* group;
    streamConsumer* consumer;
    int flags;
} streamSubscriber;

And in stream:

The subscribers and group_subscribers here is where the data is actually stored.

The client_to_subscriber is only an index, which is used for quick judging whether a client is already subscribed to a stream, and to find the listNode when a client is going to unsubscribe a stream.

To store the streamIDs that need to be published, we store the start and the end ID.

list *subscribers;      /* List of subscribers subscribed by using XSUBSCRIBE. */
dict *group_subscribers;    /* Dict of lists of subscribers subscribed by using XSUBSCRIBEGROUP. */
dict *client_to_subscriber; /* An index for subscribers list, including those in groups.
                               We use client id as key, and value points to the listNode
                               holding the corresponding streamSubscriber structure. */
streamID pub_start;     /* The first ID that need to be published. */
streamID pub_end;       /* The last ID that need to be published. */

In streamCG:

The subscribers in streamCG is only a pointer. It is used for traversing all subscribers in a group.

dictEntry *subscribers; /* A pointer to the dictEntry whose value is the list of 
                           subscribers subscribed as a consumer in this group. */
listNode *receiver;     /* The subscriber to which the next group publish message
                           should be sent. */

Subscription/Publication Details

Outside of stream structure, everything is just like command block, except that we don't have "unblock_on_nokey" dict, since it is designed to unsubscribe all clients when the key is deleted.

Inside of the stream structure, things are a little bit complicated.

To subscribe a stream:

  1. add the key to the client's dict
  2. add the key and client to DB's dict
  3. add a streamSubscriber to the stream's list (XSUBSCRIBE) or dict (XSUBSCRIBEGROUP).

To publish a message:

  1. stream->subscribers will firstly be traversed to publish to all clients subscribed using XSUBSCRIBE
  2. stream->group_subscribers is traversed to unsubscribe all invalid subscribers, then for each group, the message will be published to one of the subscribers. Currently, clients in a group will receive the publish message by turns, no load balancing is considered.

Deleted/Moved/Overwritten Streams

Will signalPublishStream just like signalDeletedKeyAsReady in block.

Deleted Group Or Consumer

Will mark the subscribers using the group/consumer as invalid. Then signalPublishStream, and unsubscribe them after call or in beforeSleep.

Reacquire Lost Message

XSUBSCRIBEGROUP and XADD will have the same results as XREADGROUP, so messages without ACKs will be recorded in PEL, and clients can use XPENDING and XREAD to reacquire them.

Mixed Use of Regular Consumer and Group Subscriber

You can consider group subscribers as a special kind of regular consumer, that will XREADGROUP a new message once it is added to stream, so regular consumers will not get anything if there is a subscriber in the same group unless you put XADD and XREADGROUP in the same queue of MULTI.

So still, the group will deliver each ID only once, and each subscriber/consumer will not get any same message.

Yet Still Need To Be Done

  • cluster redirect: clients subscribed to migrated streams need to be sent a redirection error
  • swapdb: unsubscribe all clients when DB is swaped?
  • redis-cli: automatically send back XACK in stream pubsub mode?
  • add some fields in XINFO command's return
  • TCL tests

How To Use

Suppose there is a stream "a" with a group "g", using redis-cli, we can:

127.0.0.1:6379> xsubscribe a

And the following information will be returned:

1) "xsubscribe" // a fixed line
2) "a"          // key
3) (integer) 0  // dbnum
4) (integer) 1  // currently subscirbed stream num

Using another client to publish:

127.0.0.1:6379> xadd a publish * 1 2

And the following information will be reveived by the subscriber:

1) "xmessage"              // a fixed line
2) "a"                     // key
3) (integer) 0             // dbnum
4) 1) 1) "1673945541549-0" // published stream content
      2) 1) "1"
         2) "2"

When MULTI is used, multiple messages will be merged as one:

127.0.0.1:6379> multi 
OK
127.0.0.1:6379(TX)> xadd a publish * 3 4
QUEUED
127.0.0.1:6379(TX)> xadd a publish * 5 6
QUEUED
127.0.0.1:6379(TX)> exec
1) "1673945563138-0"
2) "1673945563138-1"
1) "xmessage"
2) "a"
3) (integer) 0
4) 1) 1) "1673945563138-0"
      2) 1) "3"
         2) "4"
   2) 1) "1673945563138-1"
      2) 1) "5"
         2) "6"

Similarly, using XSUBSCRIBEGROUP:

127.0.0.1:6379> xsubscribegroup g c 1 a 

And the following information will be returned:

1) "xsubscribegroup" // a fixed line
2) "a"               // key
3) (integer) 0       // dbnum
4) "g"               // group name
5) "c"               // consumer name
6) (integer) 1       // currently subscirbed stream num

@ranshid
Copy link
Collaborator

ranshid commented Jan 29, 2023

@CharlesChen888, this seems like you put up a lot of work and thought into this - thank you!
I only briefly went through your suggestion and I would like to ask:

  1. I think in #10809 disscussion it was discussed to supply an ID to enable subscribers to keep reading from the last point read so far, this is true for both XREAD and Group consumers to be able to make sure to not miss any msg (otherwise what differentiates this from sharded pub/sub?)
  2. I am not that clear as to how a a mixup of subscribers and regular group consumers work together? will the group keep it's current functionality to deliver each ID only once? will subscribers be able to access PEL massages?
  3. I agree this does not qualify as a "blocking" command, as the client is basically not blocked but is in "subscriber mode" but maybe we can work to simplify the code or reuse some of the blocking framework?

@CharlesChen888
Copy link
Contributor Author

CharlesChen888 commented Jan 30, 2023

@ranshid

  1. In my design, I try to make stream pubsub is very similar to sharded pubsub, so that it is easier to code. If we supply an ID, then XSUBSCRIBE(GROUP) command would also do the job of XREAD(GROUP). I prefer to separate them: the server actively feeds a subscriber with only messages arrived after subscription, and the subscriber can get messages before subscription by actively sending XREAD (which can't be done with sharded pubsub). If a subscriber wants to be sure it does not miss any message, it should use XSUBSCRIBEGROUP, as all published messages will be recorded in PEL (if not using NOACK) before ACKed.
  2. You can consider group subscribers as a special kind of regular consumer, that will XREADGROUP a new message once it is added to stream, so regular consumers will not get anything if there is a subscriber in the same group. So yes, the group will deliver each ID only once, and yes, subscribers are able to access PEL massages. Howerver, if you put XADD and XREADGROUP in the same transaciton, the message will be read "twice", by XREADGROUP and by publish, since publish is done after call. But this is meaningless, as it is making producer to consume messages by itself.
  3. Actually I tried to reuse some of the blocking framework, but it did not save a lot of code. The main trouble is from inside the stream.

@ranshid
Copy link
Collaborator

ranshid commented Jan 30, 2023

  1. I think that this is missing the point in some way. the sharded pubsub provides sufficient way to achieve that IMO. The way I see the best practice is that a disconnected subscriber will re-subscribe with the last ID he consumed. You solution has some benefit in case of consuming stream groups on the replicas (since we do not change the group metadata) but I suspect it still makes the stream subscriber less usable. It is true that a disconnected subscriber will be able to consume first with XREAD, but it will be difficult for it to understand till what point to XREAD especially with XREADGROUP. I think what you are proposing is more a stream "eavesdropper" which is fine but providing the top comment suggest that this is mainly providing a way to avoid the roundtrip I wonder if having each subscriber subscribe with ID is such a bad idea? subscriber could use the ">" in order to get the same behavior as you suggesting here.

  2. I think that we should aim for a consistent behavior. if we look at subscribers the same as a a regular stream consumer then it should follow the same rules of consumption. IMO a subscriber should be the same as an XREAD/XREADGROUP NOACK client

@itamarhaber WDYT?

  1. I think we can make some changes to the blocking infrastructure such as exposing a different API to be registered as key-listener so that the overall fairness will remain for all key-events subscribers, it is true that no command-reprocessing" is needed here and that a subscriber is not blocked, but I find it might provide a better infrastructure.
    @guybe7 @madolson WDYT?

@CharlesChen888
Copy link
Contributor Author

@ranshid
Let's say we have a stream s with IDs from 1 to 100, what happens if a client sends XSUBSCRIBE s 10 (10 is the ID)? Should the server return messages from 10 to 100 immediately?

@ranshid
Copy link
Collaborator

ranshid commented Jan 30, 2023

@ranshid Let's say we have a stream s with IDs from 1 to 100, what happens if a client sends XSUBSCRIBE s 10 (10 is the ID)? Should the server return messages from 10 to 100 immediately?

IMO yes it should this is the same as will be with XREAD right? we might want to think of a way to limit the max bulk number of massages in order to prevent OOM killing the client, but I still think that a subscribing client should be able to consume massages from the last point it stopped.

@CharlesChen888
Copy link
Contributor Author

@ranshid
OK, subscribing with ID seems more reasonable. We may also use an option to control whether to return all missed messages or just to return an ID range, so that the subscriber can use XRANGE to get them.

@hpatro hpatro added the streams label Feb 14, 2023
@hpatro
Copy link
Collaborator

hpatro commented Feb 14, 2023

Couldn't dive deep into the code yet. I had few thoughts about the high level features.

  • Pub/Sub Interaction This feature should work well with existing Pub/Sub interaction i.e. allow cross compatiblilty between the consumption of stream based pubsub data and non stream based (pubsub channels) pubsub data. Otherwise, the client connection would be restricted to only streams based data and in turn maintain more no. of connections.
  • Client Output Buffer should also adhere to client-output-buffer-limit pubsub. It should be bounded by the config or else it could affect the stability.
  • Subscription Phases The volume of data output can't be controlled by the clients in this mechanism as opposed to the pull based model as it has COUNT to control the data consumption. There has to be two phases to data transmission to the client i.e. BOOTSTRAP and INCREMENTAL.
    • BOOTSTRAP phase is to publish the data out which already exists on the stream. This could be a large dataset. Hence, during this phase we need to push data incrementally into the client buffer and avoid overwhelming it. It needs to adhere to a certain threshold (clientBufferLimitsDefaults), once the output buffer grows to that size, message publishing is stopped. We could introduce a cron to publish data to the pending subscription list via XSUBSCRIBE.
    • INCREMENTAL phase should enter PUBSUB mode to utilize the infrastructure already exists around PUBSUB. This would allow a combination of various subscription (SUBSCRIBE, SSUBSCRIBE, PSUBSCRIBE) within this mode via the single connection.
  • Client disconnection: Client should maintain the cursor on their side and provide it to resume the data read from that offset.

@CharlesChen888
Copy link
Contributor Author

@hpatro

  • Pub/Sub Interaction I am trying to make stream pubsub more of an isolated system (subscription to keys). So when client wants to consume a stream, it subscribes to a key of stream type, and when it wants to consume normal pubsub data, it subscribes to a channel. Both will work as long as the protocol can distinguish two kinds of data.
  • Client Output Buffer Yes I agree with your suggestion.
  • Subscription Phases For BOOTSTRAP phase, I think we can leave the problem to client side, which is sending an ID range of existing data to the client when it starts to subscribe, and let the client decide when and how to read them (using XREAD or XRANGE).
  • Client disconnection It is a good suggestion for client side. But if a client don't implement this, it can use XSUBSCRIBEGROUP (just like XREADGROUP), which will keep a PEL for messages that have not received ACK. And when client reconnects to server, it can use XPENDING to read those missed data.

@CLAassistant
Copy link

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

@madolson
Copy link
Contributor

@CharlesChen888 Someone at AWS asked if you would re-open this against Valkey.

@CharlesChen888
Copy link
Contributor Author

CharlesChen888 commented Apr 24, 2024

Someone at AWS asked if you would re-open this against Valkey.

@madolson What is the use case of stream pubsub in AWS?

The implementation here, after some discussion among me and my colleagues, turns out to be not as good as we thought. I'd love to redo this and push to Valkey if I have time.

An issue targeting to implement this is proposed in Valkey valkey-io/valkey#360

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
Status: Done
Development

Successfully merging this pull request may close these issues.

[NEW] Support pubsub subscribe to a Stream with consumer groups
5 participants