[SPARK-48218][CORE] TransportClientFactory.createClient may NPE cause FetchFailedException #46506
@@ -169,8 +169,10 @@ public TransportClient createClient(String remoteHost, int remotePort, boolean f
// this code was able to update things.
TransportChannelHandler handler = cachedClient.getChannel().pipeline()
  .get(TransportChannelHandler.class);
synchronized (handler) {
Your error message doesn't match the code. Maybe it came from your own fork?
Caused by: java.lang.NullPointerException
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:178)
@@ -169,8 +169,10 @@ public TransportClient createClient(String remoteHost, int remotePort, boolean f
// this code was able to update things.
TransportChannelHandler handler = cachedClient.getChannel().pipeline()
  .get(TransportChannelHandler.class);
synchronized (handler) {
handler.getResponseHandler().updateTimeOfLastRequest();
if (handler != null) {
I'm not sure if this is a safe replacement.
Although I don't know your code, if we need to check nullability, shall we do it inside the synchronized block?
Please let me know if you are observing that the synchronized(handler) line is the root cause of the NPE. I assumed that the NPE happens at handler.getRes...
synchronized will throw an NPE if called on null, and handler is a local variable at this point, so the null check should be fine.
This should be a fairly rare occurrence. If I am not wrong, this is due to a socket close between the isActive call and the subsequent pipeline().get(TransportChannelHandler.class) call. But I agree with @dongjoon-hyun: we have to make sure the exception is at the synchronized statement itself. Which version was this observed against, @cxzl25?
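To make the point about synchronized concrete, here is a minimal standalone sketch. It is not Spark code: `SyncNullDemo`, `lockThrowsNpe`, and `guardedTouch` are hypothetical names used only for illustration. It shows that entering a synchronized block on a null reference throws a NullPointerException, and that checking a local variable first avoids it, since no other thread can reassign a local.

```java
// Hypothetical illustration class; not part of Spark.
final class SyncNullDemo {

    /** Returns true if entering synchronized (lock) throws a NullPointerException. */
    static boolean lockThrowsNpe(Object lock) {
        try {
            synchronized (lock) {   // the monitor is acquired on the object 'lock' refers to
                return false;       // reached only when 'lock' is non-null
            }
        } catch (NullPointerException e) {
            return true;            // thrown by the synchronized statement itself
        }
    }

    /** Guarded variant: null-check the local reference, then lock. Returns true if the body ran. */
    static boolean guardedTouch(Object handler) {
        if (handler != null) {      // 'handler' is a local, so it cannot become null after this check
            synchronized (handler) {
                return true;
            }
        }
        return false;               // handler was missing; nothing to update
    }

    public static void main(String[] args) {
        System.out.println("synchronized (null) throws NPE: " + lockThrowsNpe(null));
        System.out.println("guarded call ran body on null: " + guardedTouch(null));
    }
}
```

Because the check and the lock both use the same local reference, placing the null check outside the synchronized block is sufficient in this sketch.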
Thank you for the info.
Yes, it would be a correct fix if the line number in the error message matches.
Thanks everyone for the quick comments!
I observed a small number of cases in the production environment.
This is a modified version based on Spark 3.2, so the line numbers are slightly off.
However, when I checked the line numbers in the source and decompiled the jar, they all pointed to the synchronized (handler) line.
The NPE issue here is somewhat similar to SPARK-11865.
When another thread is closing the channel, isActive may return true on the first check and false on the second. There is an intermediate state in between, which results in the NPE.
I tried to reproduce this issue on the latest version so that the line numbers would match, but it is a bit hard to do.
if (cachedClient != null && cachedClient.isActive() /* true */) {
  TransportChannelHandler handler = cachedClient.getChannel().pipeline()
    .get(TransportChannelHandler.class);  // may be null while the channel is closing
  synchronized (handler) {                // NPE here if handler is null
    handler.getResponseHandler().updateTimeOfLastRequest();
  }
  if (cachedClient.isActive() /* false */) {
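The intermediate state described above can be modeled deterministically with stand-in types. This is only a sketch under stated assumptions: `FakeHandler`, `FakeClient`, and `GuardedReuseDemo.tryReuse` are hypothetical and do not reproduce the Netty or Spark APIs. The `closing` flag simulates a channel whose handler is already gone while the first `isActive()` call still returns true.

```java
// Hypothetical stand-in for TransportChannelHandler / its response handler.
final class FakeHandler {
    long lastRequestNanos;
    void updateTimeOfLastRequest() { lastRequestNanos = System.nanoTime(); }
}

// Hypothetical stand-in for a cached client whose channel may be mid-close.
final class FakeClient {
    private final boolean closing;
    private boolean observedOnce = false;
    private final FakeHandler handler = new FakeHandler();

    FakeClient(boolean closing) { this.closing = closing; }

    /** While closing: true on the first call, false afterwards (the intermediate state). */
    boolean isActive() {
        if (!closing) return true;
        boolean result = !observedOnce;
        observedOnce = true;
        return result;
    }

    /** While closing, the handler has already been removed, so the lookup yields null. */
    FakeHandler lookupHandler() { return closing ? null : handler; }
}

final class GuardedReuseDemo {
    /** Returns the cached client if it is still usable, or null if a new one must be created. */
    static FakeClient tryReuse(FakeClient cached) {
        if (cached != null && cached.isActive()) {
            FakeHandler handler = cached.lookupHandler();
            if (handler != null) {               // guard before locking
                synchronized (handler) {
                    handler.updateTimeOfLastRequest();
                }
            }
            if (cached.isActive()) {             // second check catches the closed channel
                return cached;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println("healthy reused: " + (tryReuse(new FakeClient(false)) != null));
        System.out.println("closing reused: " + (tryReuse(new FakeClient(true)) != null));
    }
}
```

With the guard in place, the half-closed client no longer throws; the second `isActive()` check fails and the caller falls through to creating a fresh client.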
Thanks for clarifying!
+CC @yaooqinn as well who last reviewed some of this code.
I am fine with the change, assuming the exception was observed at the right location - agree with @dongjoon-hyun, this is unclear right now.
+CC @dongjoon-hyun
Sorry for being late. I was traveling.
…s null when creating client

### What changes were proposed in this pull request?
`TransportClientFactory` checks whether `handler` is null when creating client.

### Why are the changes needed?
When another thread is closing the channel, `cachedClient.isActive()` may return true on the first check and false on the second, in which case `handler` may be null. Therefore, `TransportClientFactory` should check whether `handler` is null when creating a client.

Backport apache/spark#46506.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
GA.

Closes #2517 from SteNicholas/CELEBORN-1430.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
What changes were proposed in this pull request?
This PR aims to add a check that TransportChannelHandler is non-null in the TransportClientFactory.createClient method.
Why are the changes needed?
At line 178, synchronized (handler) throws an NPE when handler == null, in
org.apache.spark.network.client.TransportClientFactory#createClient(java.lang.String, int, boolean)
Does this PR introduce any user-facing change?
No
How was this patch tested?
Was this patch authored or co-authored using generative AI tooling?
No