
When a topic is deleted, connected clients should have their connection closed #3836

Open
digikata opened this issue Jan 25, 2024 · 10 comments
Labels: enhancement, help wanted, SPU, StreamController, technical debt

Comments

@digikata
Contributor

digikata commented Jan 25, 2024

Both producers and consumers should have their connections closed. This issue mainly affects consumers streaming records from a topic.

Preferably with a specific error (TopicDeleted or TopicNotAvailable).

Setup:

$ fluvio cluster create
$ fluvio topic create delete-me
$ fluvio consume delete-me

Action:

$ fluvio topic delete delete-me

Expectation:

# previous fluvio consume (cli or api)
$ fluvio consume delete-me
...
Connection closed: TopicDeleted

The SPU is already notified of topic deletes by the SC, because it has to do work to delete the topic. Any connections associated with that topic should then be closed down by the SPU.

This is a task of intermediate difficulty; mentoring from Infinyon is available if you are interested in working on this issue.

@jdafont
Contributor

jdafont commented Feb 10, 2024

Howdy, thanks for the assistance in Discord so far helping me find my way through this. A viable solution looks like the following, but I'd love some thoughts/gut checks before implementing:

In crates/fluvio-spu/src/services/public/stream_fetch.rs, async fn process(...) -> Result<(), StreamFetchError> runs a select! in a loop. Here, we could listen for a new ReplicaDeleted event. When that event fires, we check whether the deleted replica matches our own; if so, we can simply return a StreamFetchError::Fetch(...) carrying a new error code like ReplicaDeleted with an appropriate message. When the error is returned, send_back_error is called. I think this might be enough to terminate the connection, but it's not totally obvious to me, because we return Ok(()) there, which bubbles up to a loop in crates/fluvio-spu/src/services/public/mod.rs async fn respond(...). Roughly, the new arm might look like the sketch below.
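(Sketch only: the replica_deleted_event listener and the ErrorCode::ReplicaDeleted variant are hypothetical, and the existing select! arms are elided.)

loop {
    select! {
        // ... existing arms: new offsets available, client drop, etc. ...

        deleted = replica_deleted_event.listen() => {
            // only react if the deleted replica is the one this stream serves
            if deleted == self.replica.id {
                // returning the error triggers send_back_error, which
                // should close out the stream for the client
                return Err(StreamFetchError::Fetch(ErrorCode::ReplicaDeleted));
            }
        },
    }
}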

The next question is how to emit the event whenever the SC sends over a topic-deleted notification. Both the consumer connection and the SC dispatcher connection have access to the shared global context ctx. The dispatcher processes replica changes and eventually works its way down into crates/fluvio-spu/src/core/global_context.rs async fn apply_replica_actions(...). There, we process replica deletions for both followers and leaders. Currently, it seems only leaders handle file stream fetch requests, but in either case we can emit the replica-deleted signal wherever we normally process the replica deletion, as in the sketch below.
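(On the emitting side; the action shape, the remove_replica helper, and the replica_deleted_event field are all hypothetical here.)

for action in actions {
    match action {
        ReplicaAction::Remove(replica) => {
            // existing cleanup: tear down leader/follower state,
            // delete the replica's storage, etc.
            self.remove_replica(&replica).await;

            // new: wake any stream-fetch handlers serving this replica
            // so they can close their connections
            self.replica_deleted_event.publish(replica.id);
        }
        other => self.apply_other_action(other).await,
    }
}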

Sorry for the wall of text! Let me know if that sounds like a good plan and if I should continue with an implementation.

@sehz
Contributor

sehz commented Feb 10, 2024

Just to clarify, there are two I/O connections to a Fluvio cluster. The first is to the SC, which manages metadata; the second is to the SPU, which handles the actual consumer/producer traffic. The SC should not be handling SPU connections. It is the responsibility of the SPU to manage its own connections. This is fundamentally how Fluvio works.

Here's a suggestion: just add a new ErrorCode variant. In the SPU, the ScDispatcher can expire the OffsetChangeListener which is used by the StreamFetchHandler. The StreamFetchHandler can then detect that and send the appropriate ErrorCode. This should work because the consumer always talks to the leader, which manages the replica.
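For illustration, the handler side could treat expiry as a terminal signal (a sketch; having listen() return None on expiry is an assumption, not the current API):

loop {
    match offset_listener.listen().await {
        Some(new_offset) => {
            // normal path: stream newly available records to the consumer
            send_records_up_to(new_offset).await?;
        }
        None => {
            // the ScDispatcher expired the listener because the
            // replica was deleted
            return Err(StreamFetchError::Fetch(ErrorCode::TopicDeleted));
        }
    }
}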

Thanks for working on this; it is not a trivial exercise.

@jdafont
Contributor

jdafont commented Feb 10, 2024

Understood on the two I/O connections to Fluvio. I appreciate the suggestion and would like a little clarification, if possible. I'm not entirely sure what you mean by expiring the OffsetChangeListener; I interpret it as something like changing the listener to return a Result<i64, ErrorCode> instead of just the i64 it returns now. Then it would be quite simple to return the appropriate error code.

It does not seem so easy to figure out which OffsetChangeListener to expire from the perspective of the ScDispatcher, because the OffsetChangeListener ultimately lives in the ConnectionContext, which gets created for each consumer connection and which the ScDispatcher has no real reference to.

@sehz
Contributor

sehz commented Feb 11, 2024

It would be a challenge to change OffsetChangeListener. Instead, it could return a negative number or some other sentinel to indicate the end of the stream, similar to how other streams signal termination.
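Concretely, the sentinel convention might look like this (a sketch; the -2 value and the update/listen calls are illustrative, not the final API):

const TOPIC_DELETED_SENTINEL: i64 = -2;

// dispatcher side, when the replica is removed:
offset_publisher.update(TOPIC_DELETED_SENTINEL);

// handler side, in the stream-fetch loop:
loop {
    let offset = offset_listener.listen().await;
    if offset == TOPIC_DELETED_SENTINEL {
        return Err(StreamFetchError::Fetch(ErrorCode::TopicDeleted));
    }
    // ... otherwise stream records up to `offset` as usual ...
}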

@jdafont
Contributor

jdafont commented Feb 12, 2024

I posted a PR to explore these ideas and would appreciate some feedback to get it up to standards. I still have tests to add around this feature and would appreciate guidance on what's required for it to be acceptable. The current solution registers connections (OffsetPublishers) with the shared leader state, which lets the ScDispatcher trivially signal a new offset value that represents an error (TopicDelete = -2). One drawback is that the SharedLeaderState will accumulate (and, worse, keep alive) OffsetPublisher objects, resulting in a small memory leak per connection. Perhaps I can downgrade these to weak Arcs to alleviate that problem. Anyway, I got it working locally and would love some feedback!

#3861
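The weak-Arc idea, sketched (not the PR's actual code; OffsetPublisher::update taking the new offset value is assumed):

use std::sync::{Arc, Weak};

const TOPIC_DELETED_SENTINEL: i64 = -2;

// the leader state holds weak handles, so a closed consumer connection
// drops its publisher and the registry can shed the dead entry
struct ConnectionRegistry {
    publishers: Vec<Weak<OffsetPublisher>>,
}

impl ConnectionRegistry {
    fn register(&mut self, publisher: &Arc<OffsetPublisher>) {
        self.publishers.push(Arc::downgrade(publisher));
    }

    fn signal_topic_deleted(&mut self) {
        // retain() drops entries whose strong Arc is already gone,
        // so dead connections don't accumulate
        self.publishers.retain(|weak| match weak.upgrade() {
            Some(publisher) => {
                publisher.update(TOPIC_DELETED_SENTINEL);
                true
            }
            None => false,
        });
    }
}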

@sehz
Contributor

sehz commented Feb 13, 2024

Provided comments. A cleaner scheduler can be used to clean up stale connections; you can take a look at cleaner.rs in the storage crate as an example.
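Reduced to a sketch, that pattern is a background task that periodically sweeps dead publishers out of the registry (the interval, the locking scheme, and the fluvio_future::timer::sleep call are placeholders):

use std::sync::{Arc, Mutex};
use std::time::Duration;

async fn run_cleaner(registry: Arc<Mutex<ConnectionRegistry>>) {
    loop {
        fluvio_future::timer::sleep(Duration::from_secs(60)).await;
        registry
            .lock()
            .expect("registry lock poisoned")
            .publishers
            // keep only entries whose connection is still alive
            .retain(|weak| weak.upgrade().is_some());
    }
}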

@jdafont
Contributor

jdafont commented Feb 13, 2024

Appreciate the comments! I'll take a look at that and hopefully knock out the tests and changes over the next day or two.

@jdafont
Contributor

jdafont commented Feb 15, 2024

Okay, so with the merged PR (#3861), the consumer side of things is taken care of. Next, maybe I can look into the producer side. Also, the error message printed isn't exactly the expected message described above, so I can look into changing the consumer CLI command to produce that error as expected (instead of "the topic is deleted").

If there's some other higher priority, I'd be happy to work on that as well.

@ajhunyady
Contributor

Nice work @jdafont, and thank you for looking into the producer as well.

