
SubFlow.asSources #32235

Open
jroper opened this issue Nov 14, 2023 · 1 comment
Labels
discuss Tickets that need some discussion before proceeding. Not decided if it's a good idea. t:stream

Comments

@jroper
Contributor

jroper commented Nov 14, 2023

I would like to work with a sub flow as a stream of sources. One use case for this is streaming a single big file into many small files. It's possible to do this today with prefixAndTail(0).map(_._2).concatSubstreams, but the aesthetics of that are pretty bad: it's not obvious to the reader what is being achieved unless they are already very familiar with the pattern. From a discovery point of view, when a developer thinks "I want to turn my sub flow into a flow of sources" and looks through the API for a method to do that, prefixAndTail does not stand out as a name that would help them. I also can't find anything in the documentation that mentions this pattern of use for prefixAndTail.
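For reference, a minimal, self-contained sketch of that workaround (the split condition, multiples of 3, and the object name are hypothetical, purely for illustration):

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source

object PrefixAndTailWorkaround {
  // Today's workaround: take a zero-length prefix of each substream, which
  // yields a (Seq.empty, tailSource) pair per substream, keep only the tail
  // Source, and flatten the substreams back into one stream of Sources.
  def sourcesOfSubstreams: Source[Source[Int, NotUsed], NotUsed] =
    Source(1 to 9)
      .splitAfter(_ % 3 == 0)
      .prefixAndTail(0) // (Seq.empty, Source of the rest of the substream)
      .map(_._2)        // keep only the tail Source
      .concatSubstreams // each substream is now a single Source element
}
```

Each substream is collapsed to exactly one element (its tail Source), so concatSubstreams produces the desired stream of Sources.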

Of course, SubFlow.to(Sink) does allow writing to multiple sinks, and hence multiple small files, but the problem with that is you lose the materialized values. For example, if one of the sinks fails on completion, so that the file isn't saved, that signal is lost; in most use cases you need to know about that, and you will usually want to stop the parent stream when it happens.

Here's some example code that demonstrates the use case:

val MaxFileSize = 1000000L

Flow[ByteString]
  // scan (not fold) so the running byte count is emitted for every element;
  // each output is (chunk, number of bytes seen before this chunk)
  .scan((ByteString.empty, 0L)) {
    case ((last, count), bytes) =>
      (bytes, count + last.length)
  }
  .drop(1) // drop the scan seed element
  .splitAfter {
    case (bytes, count) =>
      // split once this chunk fills the space left in the current file
      MaxFileSize - (count % MaxFileSize) <= bytes.length
  }
  .asSources // the proposed operator; it does not exist today
  .zipWithIndex
  .mapAsync(1) {
    case (source, idx) =>
      source.map(_._1).runWith(FileIO.toPath(Paths.get(s"my-file-$idx")))
  }
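The file-boundary arithmetic in that example can be illustrated without any streaming machinery. A dependency-free sketch (the object, method, and the "start a new file when the next chunk would overflow" policy are all hypothetical, chosen only to show how a running byte count assigns chunks to files):

```scala
object SplitArithmetic {
  // Group chunk sizes into "files": a chunk goes into the current file
  // unless adding it would exceed maxFileSize, in which case it starts a
  // new file. An oversized chunk still gets a file of its own.
  def groupIntoFiles(chunkSizes: List[Long], maxFileSize: Long): List[List[Long]] =
    chunkSizes.foldLeft(List(List.empty[Long])) { (acc, size) =>
      val current = acc.head
      if (current.nonEmpty && current.sum + size > maxFileSize)
        List(size) :: acc             // start a new file with this chunk
      else
        (size :: current) :: acc.tail // add the chunk to the current file
    }.map(_.reverse).reverse.filter(_.nonEmpty)
}
```

For example, four 4-byte chunks with a 10-byte limit yield two files of two chunks each; this is the kind of boundary decision the splitAfter predicate makes element by element.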
@johanandren
Member

Looks like a nice API addition to me at first look. The behaviour might not be obvious, though: splitAfter is easy, but the other way of ending up with a SubFlow is groupBy. I guess that means it would have to be a merge, since it would deadlock on a new group before the previous one cancels or hits max substreams.

Maybe some other more specific operator factory for this use case would make more sense and be easier to understand?

@johanandren johanandren added t:stream discuss Tickets that need some discussion before proceeding. Not decided if it's a good idea. labels Nov 15, 2023