[WIP] attempt to use external submission for rescheduling of actor mailboxes #31156
base: main
To avoid fairness issue on JDK 17. Refs akka#17341, akka#31117
@@ -232,7 +232,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
       }
     } finally {
       setAsIdle() //Volatile write, needed here
-      dispatcher.registerForExecution(this, false, false)
+      dispatcher.registerForExecution(this, false, false, true)
This is the magic improvement where we say that we want to "reschedule" the mailbox, i.e. we yield because this actor has reached its throughput.
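To make the "yield after reaching throughput" idea concrete, here is a minimal sketch of a throughput-bounded batch loop. It is not Akka's actual `Mailbox.run` implementation; `ThroughputSketch` and `processBatch` are hypothetical names, and the mailbox is simplified to a plain mutable queue.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for a mailbox run: not Akka's real code.
object ThroughputSketch {
  // Processes at most `throughput` messages from `queue`.
  // Returns true if messages remain, i.e. the mailbox yielded and
  // would need to be rescheduled to continue.
  def processBatch[A](queue: mutable.Queue[A], throughput: Int)(handle: A => Unit): Boolean = {
    var left = throughput
    while (left > 0 && queue.nonEmpty) {
      handle(queue.dequeue())
      left -= 1
    }
    queue.nonEmpty
  }
}
```

In this sketch, a `true` result corresponds to the case the comment describes: the actor stopped because it used up its batch, not because it ran out of work.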
@@ -217,6 +218,10 @@ trait ExecutorServiceDelegate extends ExecutorService {
   def executor: ExecutorService

   def execute(command: Runnable) = executor.execute(command)
+  def executeExternal(command: Runnable) =
This is public API, so I'm not sure whether we want to add that method here publicly.
      handle.invokeWithArguments(createTask(r))

    private val handle = {
      val m = classOf[ForkJoinPool].getDeclaredMethod("externalPush", classOf[ForkJoinTask[_]])
Let's hope that this method is available on all supported JDKs...
Looks like it was there already in 1.8 at least: https://hg.openjdk.java.net/jdk8/jdk8/jdk/file/687fd7c7986d/src/share/classes/java/util/concurrent/ForkJoinPool.java#l1481
looks good, but as you say, we need to benchmark it (with low, default throughput setting) and possibly add a feature flag
@@ -120,11 +120,15 @@ class Dispatcher(
   protected[akka] override def registerForExecution(
       mbox: Mailbox,
       hasMessageHint: Boolean,
-      hasSystemMessageHint: Boolean): Boolean = {
+      hasSystemMessageHint: Boolean,
+      rescheduled: Boolean): Boolean = {
registerForExecution is called from 3 places. Do we need the extra parameter, or can we handle all of them as executeExternal?
That's one of the remaining questions. We would have to benchmark to be sure. The idea of this approach is that we only use external submission if an actor exhausted its throughput batch. This way all other submissions (e.g. sending a message to another actor) could still be scheduled on the local queue. Also, all the other calls would use a normal method invocation for scheduling (instead of a reflective method handle invocation) which might have a performance benefit.
Whether the solution in this PR is good enough is another question, because it would mean that a busy loop involving two or more actors could still starve a pool thread. But that's the whole issue here: we would have to define a sensible fairness metric and experiment to see whether optimizing for that metric makes a difference in certain scenarios. After all, right now fairness is based on "number of messages processed" (by setting throughput), which may or may not lead to sensible behavior (and which is easy enough to "exploit" accidentally by blocking or by long-running CPU-intensive tasks).
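The routing idea described above (external submission only when an actor has exhausted its throughput batch, normal submission otherwise) could be sketched as follows. `RecordingExecutor` and `RescheduleSketch` are hypothetical names introduced for illustration; the executor here just counts which path was taken and runs the task inline, so it says nothing about real queueing behavior or performance.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for an executor with two submission paths.
// It runs tasks inline and only records which path was used.
final class RecordingExecutor {
  val local = new AtomicInteger(0)
  val external = new AtomicInteger(0)
  def execute(r: Runnable): Unit = { local.incrementAndGet(); r.run() }
  def executeExternal(r: Runnable): Unit = { external.incrementAndGet(); r.run() }
}

object RescheduleSketch {
  // Only a mailbox that yielded after its throughput batch takes the
  // external path; every other registration stays on the normal path.
  def register(exec: RecordingExecutor, task: Runnable, rescheduled: Boolean): Unit =
    if (rescheduled) exec.executeExternal(task)
    else exec.execute(task)
}
```

Keeping the decision in a single boolean means the common path (one actor sending to another) never touches the reflective call, which is the performance argument made in the comment.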
This needs a decision on whether we care enough about the fairness issue to add this kind of complexity, which relies on unofficial API to work. My take on this is in the above comment. If you have busy loops in your programs, you should know that they pose a fairness issue in any case (thread starvation issues, stream graph starvation, missing safepoints leading to GC issues, etc.). If you care about fairness and have long-running CPU-intensive loads, you should run them on dedicated resource pools to contain them. This often means delegating the problem to the OS, which usually has better metrics to enforce fairness. On the other hand, fairness issues can be quite hard to identify, so sensible automatic behavior can be a lifesaver... Others had differing opinions in #31117 (comment).
openjdk/jdk#11319 was added in JDK 20
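Assuming the linked JDK change refers to the public `ForkJoinPool.externalSubmit` method (documented as available since JDK 20), a version-tolerant variant could look the method up by name and fall back to a plain `execute` on older JDKs. This is only a sketch: `ExternalSubmitSketch` is a hypothetical name, and the fallback does not give external-queue semantics on JDKs that lack the method.

```scala
import java.lang.reflect.Method
import java.util.concurrent.{ ForkJoinPool, ForkJoinTask }
import scala.util.Try

// Hypothetical helper, not part of Akka or of the PR.
object ExternalSubmitSketch {
  // Looked up by name so that this also compiles and runs on JDKs
  // that do not have the method (assumed: public, JDK 20+).
  private val externalSubmit: Option[Method] =
    Try(classOf[ForkJoinPool].getMethod("externalSubmit", classOf[ForkJoinTask[_]])).toOption

  def usesExternalSubmission: Boolean = externalSubmit.isDefined

  def executeExternal(pool: ForkJoinPool, r: Runnable): Unit =
    externalSubmit match {
      case Some(m) =>
        // Public method on a public class: no setAccessible needed.
        m.invoke(pool, ForkJoinTask.adapt(r))
        ()
      case None =>
        // Fallback: ordinary submission, without the fairness benefit.
        pool.execute(r)
    }
}
```

Because only a public method is looked up, this path should not need the extra flags that reflective access to the package-private `externalPush` requires on JDK 17.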
This would have to be hidden behind a feature flag and needs benchmarks to be run (should be fine, though). We would also have to consider whether using reflection is feasible at all, because it needs extra flags, at least on JDK 17. This extra requirement could be mentioned on the config setting.