Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] attempt to use external submission for rescheduling of actor mailboxes #31156

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

jrudolph
Copy link
Member

To avoid fairness issue on JDK 17.

Refs #17341, #31117

This would have to be hidden behind a feature flag, needs benchmarks to be run (should be fine, though), but also we would have to consider whether using reflection would be possible (because it needs extra flags at least on JDK 17). This extra requirement could be mentioned on the config setting.

@@ -232,7 +232,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
}
} finally {
setAsIdle() //Volatile write, needed here
dispatcher.registerForExecution(this, false, false)
dispatcher.registerForExecution(this, false, false, true)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the magic improvement where we say that we want to "reschedule" the mailbox, i.e. we yield because this actor has reached its throughput.

@@ -217,6 +218,10 @@ trait ExecutorServiceDelegate extends ExecutorService {
def executor: ExecutorService

def execute(command: Runnable) = executor.execute(command)
def executeExternal(command: Runnable) =
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is public API, so not sure if would want to add that method here publicly?

handle.invokeWithArguments(createTask(r))

private val handle = {
val m = classOf[ForkJoinPool].getDeclaredMethod("externalPush", classOf[ForkJoinTask[_]])
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's hope that this method is available on all supported JDKs...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

@patriknw patriknw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good, but as you say, we need to benchmark it (with low, default throughput setting) and possibly add a feature flag

@@ -120,11 +120,15 @@ class Dispatcher(
protected[akka] override def registerForExecution(
mbox: Mailbox,
hasMessageHint: Boolean,
hasSystemMessageHint: Boolean): Boolean = {
hasSystemMessageHint: Boolean,
rescheduled: Boolean): Boolean = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

registerForExecution is called from 3 places. Do we need the extra parameter or can we handle all of them as executeExternal?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's one of the remaining questions. We would have to benchmark to be sure. The idea of this approach is that we only use external submission if an actor exhausted its throughput batch. This way all other submissions (e.g. sending a message to another actor) could still be scheduled on the local queue. Also, all the other calls would use a normal method invocation for scheduling (instead of a reflective method handle invocation) which might have a performance benefit.

Whether the solution in this PR is good enough is another question, because it would mean that a busy loop involving two or more actors could still starve a pool thread. But that's the whole issue here: we would have to define a sensible fairness metric and experiment if optimizing for that fairness metric would make a difference in certain scenarios. After all, right now fairness is based on "number of message processed" (by setting throughput) which may or may not lead to a sensible behavior (and which is easy to enough to "exploit" accidentally by blocking or long running CPU-intensive tasks).

@jrudolph
Copy link
Member Author

This needs a decision if we care enough about the fairness issue (to add this kind of complexity which needs to use unofficial API to make it work).

My take on this is in the above comment. If you have busy loops in your programs, you should know that they pose a fairness issue in any case (thread starvation issues, stream graph starvation, missing safepoints leading to GC issues, etc.) If you care about fairness, and have long-running CPU-intensive loads, you should run them on dedicated resource pools to contain them. This often means delegating the problem to the OS which usually has better metrics to enforce fairness.

On the other hand, fairness issues can be quite hard to identify so sensible automatic behavior can be a lifesaver...

Others had differing opinions in #31117 (comment).

@He-Pin
Copy link
Member

He-Pin commented Feb 18, 2023

openjdk/jdk#11319 is added in JDK20

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants