
Feature request: Stream select #577

Open
dermesser opened this issue Jul 8, 2023 · 6 comments

Labels
enhancement New feature or request

Comments

@dermesser

Thank you very much for your work on eio! I've thoroughly enjoyed working with it, despite being a newcomer to OCaml. Eio feels more advanced than the async frameworks in other languages I've used.

So far, I have been missing one feature: Waiting on multiple streams, which in languages like Go or Rust is called select. I've taken a look at the source code, but as a newbie I didn't want to charge ahead with some untenable solution (and besides, any possible implementation appeared non-trivial to me).

I don't have a specific API in mind, but something simple like

val select : ('a Stream.t * ('a -> 'b)) list -> 'b

to select between different channels producing items of the same type and permitting per-stream handling, or

val select2 : 'a Stream.t -> 'b Stream.t -> ('a, 'b) Either.t

for selecting between exactly two streams may already prove to be useful. I'd love to hear what you think.
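For illustration, the first signature could be used along these lines (purely hypothetical, since select doesn't exist yet; the stream names are made up, and both streams carry the same element type, as the signature requires):

(* Hypothetical usage of the proposed [select]: both streams are string Stream.t. *)
let next_message ~requests ~control =
  select
    [ (requests, fun msg -> `Request msg);
      (control,  fun msg -> `Control msg) ]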

@anmonteiro
Contributor

Can you implement that in terms of Fiber.first and Stream.take?

@dermesser
Author

Can you implement that in terms of Fiber.first and Stream.take?

That's clever! I hadn't thought of this at all, but it obviously makes the implementation trivial. I will try to prepare a PR to see if it is met with interest.
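Something like this sketch, presumably, assuming Eio's Fiber.any and Stream.take (Fiber.first being the two-fiber special case) and matching the select signature proposed above:

(* Sketch only: one fiber per stream; the first take to finish wins and the
   others are cancelled. *)
let select (cases : ('a Eio.Stream.t * ('a -> 'b)) list) : 'b =
  Eio.Fiber.any
    (List.map
       (fun (stream, handler) -> fun () -> handler (Eio.Stream.take stream))
       cases)

(As discussed below, this approach isn't atomic: if more than one take completes before the race is decided, the extra items would be discarded.)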

Unfortunately, the select2 use case described above appears not to be possible this way, though (it'd need a combination of Fiber.pair and Fiber.first, IIUC).

@talex5
Collaborator

talex5 commented Jul 9, 2023

Something like this needs to be atomic, which Fiber.first doesn't support.

The easy solution is to use kcas queues, which can be composed easily.

However, it would be nice to have direct support in Eio.Stream too. The general idea is that you create an Atomic to hold the result and then CAS that to the value. There are two implementations of Stream, depending on whether it is zero-capacity or not. For the zero-capacity case (sync.ml) there is already support for rejecting a value (causing the producer to retry with the next client). For the non-zero-capacity case, you have a lock and can just decide not to pop the value, I think.
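For concreteness, the CAS step described here looks roughly like this in plain OCaml (Stdlib Atomic; the names result and claim are illustrative only):

(* First writer wins: only the first compare_and_set from None succeeds.
   None is an immediate value, so the physical-equality comparison used by
   compare_and_set is reliable here. *)
let result : string option Atomic.t = Atomic.make None

let claim (v : string) : bool =
  Atomic.compare_and_set result None (Some v)

(* claim "a" returns true and stores Some "a"; a later claim "b" returns
   false, so the stream that lost the race keeps (or rejects) its item. *)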

@dermesser
Author

Thank you for the quick reply! I've been looking around a bit, mostly out of interest, and some questions came up. Keep in mind that it's likely I'm just missing your point here.

The general idea is that you create an Atomic to hold the result and then CAS that to the value.

I've interpreted this to mean that, in order to stay within the existing framework, we'd still have one fiber for each stream, plus an additional atomic to make sure exactly one item is returned and no item is lost. For now, I'm only focused on the Locking implementation. So kind of like this?

(* WARNING: this is NOT real code, it WILL lead to deadlocks *)
let select streams =
  let result = Atomic.make None in
  (* compare_and_set is fine for options here: None is an immediate value,
     so the physical-equality comparison of None against None works. *)
  let place_result item = Atomic.compare_and_set result None (Some item) in
  let wait_for stream () =
    let item = take stream in
    if place_result item
    then ()  (* This stream was the first to receive an item. Return. *)
    (* This stream was not first. Place the item back into the stream.
       This will block this fiber indefinitely if the stream has been
       filled up in the meantime and there are no other readers! *)
    else add stream item
  in
  let spawn_fibers sw =
    List.iter (fun stream -> Fiber.fork ~sw (wait_for stream)) streams
  in
  Switch.run spawn_fibers;
  Atomic.get result

For the non-zero-capacity case, you have a lock and can just decide not to pop the value, I think.

It appears that this is tricky to implement. One way to extend the code above is to implement a peek functionality for the Locking stream. In the case where we have to wait for an item to be added to the channel, the item skips the queue and is transmitted directly through the Waiter.t.

At that point, the item is already in our hands, and we have to do something with it. Assuming another stream has already yielded an item and called place_result above, we can't return it, nor can we rely on the stream's queue having free capacity to add it back. It seems to me that the "skipping the queue" part, transmitting the item through a Waiter.t, makes our life more difficult here.

I've skipped the Sync kind of stream for now; this is just what I learned after messing around a bit.

@talex5
Collaborator

talex5 commented Jul 12, 2023

I've interpreted this to mean that, in order to stay within the existing framework, we'd still have one fiber for each stream, plus an additional atomic to make sure exactly one item is returned and no item is lost.

No, there shouldn't be a need for any extra fibers. The taking fiber suspends itself and then registers with each stream being watched. When woken, it tries to store the result but if it fails, it rejects the value instead.

It appears that this is tricky to implement. One way to extend the code above is to implement a peek functionality for the Locking stream. In the case where we have to wait for an item to be added to the channel, the item skips the queue and is transmitted directly through the Waiter.t.

Yes, I think this needs to be changed. Probably Waiters would need to be modified (e.g. maybe waiter.finished could hold the value instead? and the same waiter can be on multiple queues?).
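To make the protocol sketched in these two replies concrete, here is a small self-contained toy model in plain OCaml. It is not Eio's real Waiters/sync.ml machinery and all names are made up: each stream keeps a list of waiter callbacks, the selecting side registers one shared waiter (backed by an Atomic result cell) on every stream, and a producer offers its item to the waiters, each of which either accepts it (the first successful CAS) or rejects it so the producer can try the next waiter or enqueue the item.

(* Toy model only: real Eio would suspend the selecting fiber after
   registering and resume it once its waiter's CAS succeeds. *)
type 'a waiter = 'a -> bool   (* true = accepted, false = rejected *)

type 'a toy_stream = {
  mutable waiters : 'a waiter list;
  mutable items   : 'a list;
}

(* One shared waiter, registered on every stream being selected over. *)
let make_select_waiter (result : 'a option Atomic.t) : 'a waiter =
  fun item -> Atomic.compare_and_set result None (Some item)

let register t w = t.waiters <- t.waiters @ [w]

let produce t item =
  let rec offer = function
    | [] -> t.items <- t.items @ [item]; []  (* nobody took it: enqueue *)
    | w :: ws ->
      if w item then ws                      (* accepted: drop this waiter *)
      else offer ws                          (* rejected (already satisfied elsewhere): try the next *)
  in
  t.waiters <- offer t.waiters

In this model the same waiter can sit on several streams at once, which is the "same waiter can be on multiple queues" idea above; whichever stream produces first wins the CAS, later producers see a rejection and keep their items, and nothing is lost.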

@talex5
Collaborator

talex5 commented Jul 12, 2023

BTW, one reason I haven't tried this already is that we might want to replace the use of Waiters with Cells here first anyway.

dermesser added commits to dermesser/eio that referenced this issue Jul 12, 2023
talex5 added the enhancement label Jul 15, 2023