[SPARK-48209][CORE] Common (java side): Migrate error/warn/info with variables to structured logging framework #46493
Conversation
@@ -136,7 +135,7 @@ public void destroy() {
    try {
      manager.destroy();
    } catch (InterruptedException ex) {
      logger.info("Interrupted while destroying trust manager: " + ex.toString(), ex);
Remove the redundant `ex.toString()` call.
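A minimal, self-contained sketch (plain Java, not the Spark or SLF4J code) of why the concatenation is redundant: when the `Throwable` is passed as the trailing argument, the logging framework already renders its class name and message, so prepending `ex.toString()` prints the same text twice.

```java
public class RedundantToStringDemo {
    // Simplified stand-in for what a logger does with a trailing Throwable:
    // it appends the throwable's toString() (and stack trace) after the message.
    static String render(String message, Throwable t) {
        return message + System.lineSeparator() + t.toString();
    }

    public static void main(String[] args) {
        Exception ex = new InterruptedException("shutdown in progress");
        // Redundant: the exception text appears twice in the output.
        System.out.println(render("Interrupted while destroying trust manager: " + ex, ex));
        // Preferred: let the logger render the Throwable once.
        System.out.println(render("Interrupted while destroying trust manager", ex));
    }
}
```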
      listener.getTransferType(), blockIdsToTransfer.length,
      numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e);

    if (numRetries > 0) {
Print a different log message depending on the value of `numRetries`.
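The suggestion can be sketched as follows (hypothetical helper names; the real Spark code builds this string inline in the failure callback):

```java
public class RetryLogMessage {
    // Only mention retries when at least one retry actually happened.
    static String failureMessage(String transferType, int blockCount, int numRetries) {
        if (numRetries > 0) {
            return String.format("Exception while %s %d blocks (after %d retries)",
                transferType, blockCount, numRetries);
        }
        return String.format("Exception while %s %d blocks", transferType, blockCount);
    }

    public static void main(String[] args) {
        System.out.println(failureMessage("fetching", 5, 0));
        System.out.println(failureMessage("fetching", 5, 3));
    }
}
```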
@@ -200,7 +200,7 @@ private[sql] object GrpcRetryHandler extends Logging {
    if (time.isDefined) {
      logWarning(
        log"Non-Fatal error during RPC execution: ${MDC(ERROR, lastException)}, " +
        log"retrying (wait=${MDC(WAIT_TIME, time.get.toMillis)} ms, " +
Unify `WAIT_TIME` into `RETRY_WAIT_TIME`.
@@ -30,7 +30,7 @@ import com.codahale.metrics.{Metric, MetricSet}

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.ExecutorDeadException
import org.apache.spark.internal.config
import org.apache.spark.internal.{config, LogKeys, MDC}
Because `NettyBlockTransferService` extends `BlockTransferService`, and `BlockTransferService` is Java code, this change triggered it.
@@ -19,6 +19,11 @@

public class LoggerFactory {

  public static Logger getLogger(String name) {
@@ -368,7 +382,8 @@ public int removeBlocks(String appId, String execId, String[] blockIds) {
      if (file.delete()) {
        numRemovedBlocks++;
      } else {
        logger.warn("Failed to delete block: " + file.getAbsolutePath());
        logger.warn("Failed to delete block: {}",
I'm not sure it's appropriate to call this `LogKeys.PATH$.MODULE$`?
Yes, `PATH` is fine.
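As background, the `MDC.of(LogKeys.XXX$.MODULE$, value)` calls appearing throughout these diffs can be pictured with a small self-contained mock (these are not Spark's real classes, just a sketch of the idea): each `{}` placeholder is bound to a named key, so the log line can also be emitted as structured key/value pairs.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;

public class StructuredLogSketch {
    enum LogKey { PATH, APP_ID }  // stand-in for Spark's LogKeys case objects

    // Stand-in for org.apache.spark.internal.MDC: a (key, value) pair.
    static final class MDC {
        final LogKey key;
        final Object value;
        private MDC(LogKey key, Object value) { this.key = key; this.value = value; }
        static MDC of(LogKey key, Object value) { return new MDC(key, value); }
    }

    // Fill each {} placeholder in order and collect the named context alongside.
    static String format(String template, MDC... mdcs) {
        Map<String, Object> context = new LinkedHashMap<>();
        String message = template;
        for (MDC mdc : mdcs) {
            message = message.replaceFirst("\\{\\}",
                Matcher.quoteReplacement(String.valueOf(mdc.value)));
            context.put(mdc.key.name(), mdc.value);
        }
        return message + " " + context;  // structured context travels with the text
    }

    public static void main(String[] args) {
        System.out.println(format("Failed to delete block: {}",
            MDC.of(LogKey.PATH, "/tmp/blockmgr/shuffle_0_0_0.data")));
    }
}
```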
@@ -472,7 +487,8 @@ static ConcurrentMap<AppExecId, ExecutorShuffleInfo> reloadRegisteredExecutors(D
      break;
    }
    AppExecId id = parseDbAppExecKey(key);
    logger.info("Reloading registered executors: " + id.toString());
    logger.info("Reloading registered executors: {}",
      MDC.of(LogKeys.APP_EXECUTOR_ID$.MODULE$, id.toString()));
Remove the redundant `.toString()` call.
@@ -211,13 +210,13 @@ public void run() {
      this.reloadCount += 1;
    } catch (Exception ex) {
      logger.warn(
        "Could not load truststore (keep using existing one) : " + ex.toString(),
Remove the redundant `ex.toString()` call.
@@ -298,7 +303,9 @@ public void onFailure(Throwable e) {
      });
    } catch (Exception e) {
      logger.error("Error while invoking receiveMergeBlockMetaReq() for appId {} shuffleId {} "
        + "reduceId {}", req.appId, req.shuffleId, req.appId, e);
Fix typo: `reduceId {req.appId}` -> `reduceId {req.reduceId}` (the third argument should be `req.reduceId`, not `req.appId`).
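This typo illustrates a hazard of positional `{}` placeholders: passing the wrong variable compiles fine and only shows up as a wrong value in the log line. A self-contained sketch (a simplified formatter, not the real logging code):

```java
public class PlaceholderOrderDemo {
    // Fill {} placeholders left to right, like parameterized logging does.
    // Simplified: assumes argument values contain no regex replacement metacharacters.
    static String format(String template, Object... args) {
        String out = template;
        for (Object arg : args) {
            out = out.replaceFirst("\\{\\}", String.valueOf(arg));
        }
        return out;
    }

    public static void main(String[] args) {
        String appId = "app-42";
        int shuffleId = 7;
        int reduceId = 3;
        // Typo: appId passed where reduceId belongs -- still compiles, wrong log output.
        System.out.println(format("appId {} shuffleId {} reduceId {}", appId, shuffleId, appId));
        // Fixed: the intended value in the third slot.
        System.out.println(format("appId {} shuffleId {} reduceId {}", appId, shuffleId, reduceId));
    }
}
```

Structured keys like `MDC.of(LogKeys.REDUCE_ID$.MODULE$, ...)` make this kind of slip easier to spot in review, since each slot names what it expects.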
@@ -363,7 +367,8 @@ static MergedShuffleFileManager newMergedShuffleFileManagerInstance(
      return mergeManagerSubClazz.getConstructor(TransportConf.class, File.class)
        .newInstance(conf, mergeManagerFile);
    } catch (Exception e) {
      defaultLogger.error("Unable to create an instance of {}", mergeManagerImplClassName);
      defaultLogger.error("Unable to create an instance of {}",
        MDC.of(LogKeys.CLASS_NAME$.MODULE$, mergeManagerImplClassName));
Or should it be called `MERGED_SHUFFLE_FILE_MANAGER`?
CLASS_NAME looks good.
@@ -1999,7 +2042,9 @@ private AppPathsInfo(
      this.subDirsPerLocalDir = subDirsPerLocalDir;
      if (logger.isInfoEnabled()) {
        logger.info("Updated active local dirs {} and sub dirs {} for application {}",
          Arrays.toString(activeLocalDirs), subDirsPerLocalDir, appId);
          MDC.of(LogKeys.LOCAL_DIRS$.MODULE$, Arrays.toString(activeLocalDirs)),
          MDC.of(LogKeys.NUM_SUB_DIRS$.MODULE$, subDirsPerLocalDir),
`subDirsPerLocalDir` is actually a number (the number of sub-directories), not a sub-directory path.
      logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
      logger.info("Application {} removed, cleanupLocalDirs = {}",
        MDC.of(LogKeys.APP_ID$.MODULE$, appId),
        MDC.of(LogKeys.LOCAL_DIRS$.MODULE$, cleanupLocalDirs));
`cleanupLocalDirs` is a boolean; let's just use `CLEANUP_LOCAL_DIRS`.
Updated
LGTM overall. Thanks for the work!
Updated, thanks! ❤️
@@ -292,6 +301,7 @@ object LogKeys {
  case object LOADED_VERSION extends LogKey
  case object LOAD_FACTOR extends LogKey
  case object LOAD_TIME extends LogKey
  case object LOCAL_DIRS extends LogKey
we don't need this anymore
Yes, merge it into `PATHS`.
Done
Thanks, merging to master
What changes were proposed in this pull request?
The PR aims to:
1. Migrate `error/warn/info` with variables in module `common` to the structured logging framework for the Java side.
2. Convert all dependencies on `org.slf4j.Logger` & `org.slf4j.LoggerFactory` to `org.apache.spark.internal.Logger` & `org.apache.spark.internal.LoggerFactory`, in order to completely prohibit importing `org.slf4j.Logger` & `org.slf4j.LoggerFactory` in Java code later.

Why are the changes needed?
To enhance Apache Spark's logging system by implementing structured logging.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Was this patch authored or co-authored using generative AI tooling?
No.