Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observable from async function #2551

Open
DarkSatyr opened this issue Oct 23, 2023 · 1 comment
Open

Observable from async function #2551

DarkSatyr opened this issue Oct 23, 2023 · 1 comment

Comments

@DarkSatyr
Copy link

Short description of missing functionality:

Starting from RxSwift 6.5 there is a very helpful extension function to AsyncSequence asObservable() which helps to convert coroutine to Observable, but if I want to convert just async function to Observable I need to perform 2 steps: convert coroutine to Async[Throwing]Stream and then using asObservable() extension function convert sequence to Observable. While most of courotines are just 'singles' by nature I need to use take(1) operator after asObservable() or it won't complete at all (in case of unfolding constructor). The other option to use constructor with continuation, but I need to write a bunch of imperative-style code to produce terminative Async[Throwing]Stream. My proposal to add Observable.create factory method which accepts coroutine as an input and produce an Observable

Short code example of how you would like to use the API:

extension Observable {
 static func create(asyncFunc: @escaping () async throws -> Element) -> Observable<Element> {
   Observable.create { observer in
     let task = Task {
       do {
         let value = try await asyncFunc()
         observer.onNext(value)
         observer.onCompleted()
       } catch {
         observer.onError(error)
       }
     }
     return Disposables.create { task.cancel() }
   }
 }
}

The reason why I need this functionality:

This will simplify conversion and eliminate explicit use of Swift Streams

Code I have right now:

AsyncThrowingStream(unfolding: asyncFunc)
  .asObservable()
  .take(1)

or

 AsyncThrowingStream { continuation in
     Task {
         let result = try await provider.fetchSomeData(account: "")
         continuation.yield(result)
         continuation.finish()
     }   
    }
    .asObservable()
@jjatie
Copy link

jjatie commented Jan 10, 2024

asyncFunc must be an @Sendable closure to enable Strict Concurrency checking

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants