Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-48218][CORE] TransportClientFactory.createClient may NPE cause FetchFailedException #46506

Closed
wants to merge 1 commit into from

Conversation

cxzl25
Copy link
Contributor

@cxzl25 cxzl25 commented May 9, 2024

What changes were proposed in this pull request?

This PR aims to add a check for TransportChannelHandler to be non-null in the TransportClientFactory.createClient method.

Why are the changes needed?

Line 178 synchronized (handler) , handler == null

org.apache.spark.network.client.TransportClientFactory#createClient(java.lang.String, int, boolean)

      TransportChannelHandler handler = cachedClient.getChannel().pipeline()
        .get(TransportChannelHandler.class);
      synchronized (handler) {
        handler.getResponseHandler().updateTimeOfLastRequest();
      }
org.apache.spark.shuffle.FetchFailedException
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1180)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:913)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:84)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)

Caused by: java.lang.NullPointerException
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:178)
	at org.apache.spark.network.shuffle.ExternalBlockStoreClient.lambda$fetchBlocks$0(ExternalBlockStoreClient.java:128)
	at org.apache.spark.network.shuffle.RetryingBlockTransferor.transferAllOutstanding(RetryingBlockTransferor.java:154)
	at org.apache.spark.network.shuffle.RetryingBlockTransferor.start(RetryingBlockTransferor.java:133)
	at org.apache.spark.network.shuffle.ExternalBlockStoreClient.fetchBlocks(ExternalBlockStoreClient.java:139)

Does this PR introduce any user-facing change?

No

How was this patch tested?

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the CORE label May 9, 2024
@@ -169,8 +169,10 @@ public TransportClient createClient(String remoteHost, int remotePort, boolean f
// this code was able to update things.
TransportChannelHandler handler = cachedClient.getChannel().pipeline()
.get(TransportChannelHandler.class);
synchronized (handler) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your error message doesn't match with code. Maybe, it came from your own fork?

Caused by: java.lang.NullPointerException
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:178)

@@ -169,8 +169,10 @@ public TransportClient createClient(String remoteHost, int remotePort, boolean f
// this code was able to update things.
TransportChannelHandler handler = cachedClient.getChannel().pipeline()
.get(TransportChannelHandler.class);
synchronized (handler) {
handler.getResponseHandler().updateTimeOfLastRequest();
if (handler != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this is a safe replacement.

Although I don't know your code, if we need to check nullability, shall we do inside synchronized block?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please let me know if you are observing synchronized(handler) line is the root cause of NPE. I assumed that NPE happens at handler.getRes....

Copy link
Contributor

@mridulm mridulm May 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

synchronized will throw an NPE if called on null - and handler is a local variable at this point, so the null check should be fine.

This should be a fairly rare occurrence - if I am not wrong, this is due to socket close between the isActive call and the subsequent pipeline().get(TransportChannelHandler.class) call - but agree with @dongjoon-hyun, we have to make sure the exception is at the sychronized statement itself : which version was this observed against @cxzl25 ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the info.
Yes, it would be a correct fix if we have a matched line error message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks everyone for the quick comments!

I observed a small number of cases in the production environment.
This is a modified version based on Spark3.2, so the number of lines is a bit inconsistent.
However, when I checked the number of lines of code and decompiled the jar, they all pointed to synchronized (handler ) this line of code.

image

The NPE issue here is somewhat similar to SPARK-11865.
When another thread is closing the channel, isActive may return true, and may be false when checked for the second time. There is an intermediate state, resulting in NPE.

I tried to reproduce this issue in the latest version to match the number of rows, however it's a bit hard to do.

    if (cachedClient != null && cachedClient.isActive() // true) {
      TransportChannelHandler handler = cachedClient.getChannel().pipeline()
        .get(TransportChannelHandler.class);
      synchronized (handler) {
        handler.getResponseHandler().updateTimeOfLastRequest();
      }

      if (cachedClient.isActive() // false) {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for clarifying !

mridulm
mridulm previously approved these changes May 9, 2024
Copy link
Contributor

@mridulm mridulm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+CC @yaooqinn as well who last reviewed some of this code.

@@ -169,8 +169,10 @@ public TransportClient createClient(String remoteHost, int remotePort, boolean f
// this code was able to update things.
TransportChannelHandler handler = cachedClient.getChannel().pipeline()
.get(TransportChannelHandler.class);
synchronized (handler) {
handler.getResponseHandler().updateTimeOfLastRequest();
if (handler != null) {
Copy link
Contributor

@mridulm mridulm May 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

synchronized will throw an NPE if called on null - and handler is a local variable at this point, so the null check should be fine.

This should be a fairly rare occurrence - if I am not wrong, this is due to socket close between the isActive call and the subsequent pipeline().get(TransportChannelHandler.class) call - but agree with @dongjoon-hyun, we have to make sure the exception is at the sychronized statement itself : which version was this observed against @cxzl25 ?

Copy link
Contributor

@mridulm mridulm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fine with the change, assuming the exception was observed at the right location - agree with @dongjoon-hyun, this is unclear right now.

@mridulm mridulm dismissed their stale review May 9, 2024 19:42

See followup

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-48218][SHUFFLE] TransportClientFactory.createClient may NPE cause FetchFailedException [SPARK-48218][CORE] TransportClientFactory.createClient may NPE cause FetchFailedException May 9, 2024
Copy link
Contributor

@mridulm mridulm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for being late. I was traveling.

@dongjoon-hyun
Copy link
Member

Merged to master for Apache Spark 4.0.0-preview2-rc2.

Thank you, @cxzl25 , @mridulm , @yaooqinn .

FMX pushed a commit to apache/celeborn that referenced this pull request May 16, 2024
…s null when creating client

### What changes were proposed in this pull request?

`TransportClientFactory` checks whether `handler` is null when creating client.

### Why are the changes needed?

There is a case that `cachedClient.isActive()` may return true and may return false when checked for the second time when another thread is closing the channel, which causes that the `handler` may be null. Therefore, `TransportClientFactory` should check whether handler is null when creating client.

Backport apache/spark#46506.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

GA.

Closes #2517 from SteNicholas/CELEBORN-1430.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
4 participants