Pass Finagle Future into FuturePool Threads #862

Open
politrons opened this issue Jun 27, 2020 · 7 comments
politrons commented Jun 27, 2020

Basically my problem is that Finagle by default uses an ExecutionContext with the same number of threads as cores on my machine. With blocking operations that number is not enough for good performance, so I would like to use another ExecutionContext with more threads.

Here is what I tried so far:

  private val executorService = Executors.newFixedThreadPool(100)
  private val pool: ExecutorServiceFuturePool = FuturePool(executorService)

  private def shiftThread[T](response: Future[T]): Future[T] = {
    val shifted = Promise.interrupts[T](response)
    response.respond { t =>
      log.info(null, s"FINAGLE RESPONSE $t IN THREAD ${Thread.currentThread().getName}")
      pool(shifted.update(t))
    }
    shifted
  }

But the only thing I achieve is moving the response handling into the FuturePool; what I would like is for the request itself to be made in the FuturePool.

As you can see here, the request/response is handled on the finagle/netty thread pool:

FINAGLE RESPONSE Return(Response("HTTP/1.1 Status(200)")) IN THREAD finagle/netty4-1-10

@politrons politrons changed the title Pass Twitter Future into FuturePool Threads Pass Finagle Future into FuturePool Threads Jun 29, 2020

andrievsky commented Jun 29, 2020

Hi,
if you would like Netty to map the underlying request processing to a custom thread pool executor, there is probably a way, but I'm not sure what you're hoping to achieve. Maybe this example, with and without FuturePools, will help:

Before

  val service = new Service[http.Request, http.Response] {
    def apply(req: http.Request): Future[http.Response] = {
      doSomeHeavyLifting() // Blocks a Netty thread

      Future.value(
        http.Response(req.version, http.Status.Ok)
      )
    }
  }
...

After

  val service = new Service[http.Request, http.Response] {
    def apply(req: http.Request): Future[http.Response] =
      FuturePools.unboundedPool().apply {
        doSomeHeavyLifting() // Blocks a pool thread instead

        http.Response(req.version, http.Status.Ok)
      }
  }
...

@politrons (Author)

Hi, thanks for the answer. As I described above, my filter already tries exactly that: it shifts work onto a FuturePool created from Executors.newFixedThreadPool(100). But the only thing it achieves is moving the response handling into the FuturePool; the request itself is still made on the finagle/netty thread pool, as the log shows:

FINAGLE RESPONSE Return(Response("HTTP/1.1 Status(200)")) IN THREAD finagle/netty4-1-10

hamdiallam (Contributor) commented Jun 29, 2020

Where is the blocking code being run? The same thread will run continuations for Twitter Futures & Promises.

If you adjust the following, I believe you'll see the desired effect:

shifted.respond { t => log.info(null, s"FINAGLE RESPONSE $t IN THREAD ${Thread.currentThread().getName}") }

With the sample you've posted, response is satisfied by a Finagle thread, so the log statement occurs in a continuation that runs on a Finagle/Netty thread. You want to shift the execution (using the FuturePool), then run the expensive code as a continuation of the Future returned by the FuturePool — in your sample, shifted.
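The mechanism described above — a continuation runs on whichever thread satisfies the promise — can be demonstrated without any Finagle dependency. A minimal sketch using only the Scala standard library, where ExecutionContext.parasitic (Scala 2.13+) stands in for Twitter Future semantics and the pool/thread names are illustrative:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Promise}
import scala.concurrent.duration._

// A one-thread pool standing in for the application worker pool.
val poolB = Executors.newFixedThreadPool(1, (r: Runnable) => {
  val t = new Thread(r, "GroupB-1"); t.setDaemon(true); t
})

val shifted = Promise[String]()

// Registered with the parasitic EC, this callback runs on the thread
// that completes the promise — mirroring Twitter Future continuations.
val onPool = shifted.future.map(_ => Thread.currentThread().getName)(ExecutionContext.parasitic)

// Satisfy the promise from a pool thread, as pool(shifted.update(t)) does.
poolB.execute(() => shifted.success("done"))

val where = Await.result(onPool, 2.seconds)
println(where) // prints "GroupB-1"
```

Because the callback is registered before the promise is completed, it runs on the GroupB thread, not on the thread that registered it.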

politrons (Author) commented Jun 30, 2020

Hi @hamdiallam The code I posted was extracted from a filter that is trying to do exactly that. The change you suggest does not work, and I think that's logical: the code just transfers the Finagle thread's result into the promise, but by the time that happens it's too late, since the whole request/response has already been done on the Finagle Netty thread.

Code here https://github.com/twitter/finagle/blob/develop/finagle-core/src/main/scala/com/twitter/finagle/filter/OffloadFilter.scala#L89

As I see this is a dead end, I will stop here and just use the Java system property we can pass to the process to specify the number of workers.

You can close this ticket unless someone else can suggest another workaround.

@hamdiallam thanks for all the support mate, cheers

@andrievsky

@politrons From my experience, you should never block Netty threads if you're looking for high and predictable performance from your service; offload blocking code to another pool instead.

You could try two things, both described here: https://twitter.github.io/finagle/guide/ThreadingModel.html

1. Just increase the number of threads

This is done per application (JVM process), using command-line flags:

-com.twitter.finagle.offload.numWorkers=14 -com.twitter.finagle.netty4.numWorkers=10

2. Offload blocking/heavy code

Offloading can be done on a per-method (endpoint) basis:


import com.twitter.util.{Future, FuturePool}

def offloadedPermutations(s: String, pool: FuturePool): Future[String] =
  pool(s.permutations.mkString("\n"))

As well as for an entire client or server:

import com.twitter.util.FuturePool
import com.twitter.finagle.Http

val server: Http.Server = Http.server
  .withExecutionOffloaded(FuturePool.unboundedPool)

val client: Http.Client = Http.client
  .withExecutionOffloaded(FuturePool.unboundedPool)

@politrons (Author)

We're using version 18.11.0, which does not contain withExecutionOffloaded. Like I said before, since it seems it's not possible to extend the number of workers programmatically, I will use com.twitter.finagle.netty4.numWorkers=???

Thanks
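For versions without withExecutionOffloaded, the per-call offloading it provides can be approximated by hand: wrap each handler so its body hops to the worker pool before running. A minimal, library-free sketch of the idea using scala.concurrent in place of Twitter Futures (offloaded, offloadEc, and the handler are illustrative names, not Finagle API):

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Worker pool standing in for the 100-thread pool from the issue.
val offloadEc = ExecutionContext.fromExecutorService(
  Executors.newFixedThreadPool(4, (r: Runnable) => {
    val t = new Thread(r, "offload"); t.setDaemon(true); t
  })
)

// Wrap a synchronous handler so its body runs on the offload pool,
// not on the calling (Netty-like) thread.
def offloaded[Req, Rep](handler: Req => Rep): Req => Future[Rep] =
  req => Future(handler(req))(offloadEc)

val handle = offloaded { (s: String) =>
  // Blocking work would go here; it now occupies an offload thread.
  s"$s handled on ${Thread.currentThread().getName}"
}

val out = Await.result(handle("ping"), 2.seconds)
println(out) // e.g. "ping handled on offload"
```

The same shape, with a Finagle SimpleFilter and a FuturePool instead of the plain function wrapper, is essentially what the later OffloadFilter does.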

@hamdiallam (Contributor)

@politrons

Can you clarify how the suggestion I posted does not work? I tried it out just now and it does. It's the same mechanism as in the OffloadFilter.

In this example you can consider executorA to be the finagle/netty threads and executorB the application worker threads:

import java.util.concurrent.Executors
import com.twitter.concurrent.NamedPoolThreadFactory
import com.twitter.util.{Future, FuturePool, Promise}

val executorA = Executors.newCachedThreadPool(new NamedPoolThreadFactory("GroupA", true))
val executorB = Executors.newCachedThreadPool(new NamedPoolThreadFactory("GroupB", true))
val poolA = FuturePool(executorA)
val poolB = FuturePool(executorB)

def shiftToB[T](response: Future[T]): Future[T] = {
  val shifted = Promise.interrupts[T](response)
  response.respond { t => poolB(shifted.update(t)) }
  shifted
}

shiftToB(poolA(1)).map { _ =>
  // Blocking code needs to run **after** shifting, as a continuation of `shifted`,
  // which is updated by a thread in `executorB` as a result of `poolB(shifted.update(t))`.
  // Hence the assigned GroupB thread will also run all of the continuations on `shifted`.
  println(Thread.currentThread().getName)
}

outputs "GroupB-1"
