aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authortedyu <yuzhihong@gmail.com>2016-05-20 18:13:18 -0500
committerSean Owen <sowen@cloudera.com>2016-05-20 18:13:18 -0500
commit06c9f520714e07259c6f8ce6f9ea5a230a278cb5 (patch)
treef940eae227d7b6fc3943f4f224604a9a40ba72e8 /yarn
parenta78d6ce376edf2a8836e01f47b9dff5371058d4c (diff)
downloadspark-06c9f520714e07259c6f8ce6f9ea5a230a278cb5.tar.gz
spark-06c9f520714e07259c6f8ce6f9ea5a230a278cb5.tar.bz2
spark-06c9f520714e07259c6f8ce6f9ea5a230a278cb5.zip
[SPARK-15273] YarnSparkHadoopUtil#getOutOfMemoryErrorArgument should respect OnOutOfMemoryError parameter given by user
## What changes were proposed in this pull request? As Nirav reported in this thread: http://search-hadoop.com/m/q3RTtdF3yNLMd7u YarnSparkHadoopUtil#getOutOfMemoryErrorArgument previously specified 'kill %p' unconditionally. We should respect the parameter given by user. ## How was this patch tested? Existing tests Author: tedyu <yuzhihong@gmail.com> Closes #13057 from tedyu/master.
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala9
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala23
2 files changed, 17 insertions, 15 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 3d370e6d71..fc753b7e75 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -211,15 +211,10 @@ private[yarn] class ExecutorRunnable(
Seq("--user-class-path", "file:" + absPath)
}.toSeq
+ YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
val commands = prefixEnv ++ Seq(
YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java",
- "-server",
- // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
- // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
- // an inconsistent state.
- // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
- // 'something' to fail job ... akin to blacklisting trackers in mapred ?
- YarnSparkHadoopUtil.getOutOfMemoryErrorArgument) ++
+ "-server") ++
javaOpts ++
Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
"--driver-url", masterAddress.toString,
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 44181610d7..de6cd94613 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -24,7 +24,8 @@ import java.security.PrivilegedExceptionAction
import java.util.regex.Matcher
import java.util.regex.Pattern
-import scala.collection.mutable.HashMap
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{HashMap, ListBuffer}
import scala.reflect.runtime._
import scala.util.Try
@@ -405,6 +406,12 @@ object YarnSparkHadoopUtil {
}
/**
+ * Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
+ * Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
+ * an inconsistent state.
+ * TODO: If the OOM is not recoverable by rescheduling it on different node, then do
+ * 'something' to fail job ... akin to blacklisting trackers in mapred ?
+ *
* The handler if an OOM Exception is thrown by the JVM must be configured on Windows
* differently: the 'taskkill' command should be used, whereas Unix-based systems use 'kill'.
*
@@ -415,14 +422,14 @@ object YarnSparkHadoopUtil {
* the behavior of '%' in a .cmd file: it gets interpreted as an incomplete environment
* variable. Windows .cmd files escape a '%' by '%%'. Thus, the correct way of writing
* '%%p' in an escaped way is '%%%%p'.
- *
- * @return The correct OOM Error handler JVM option, platform dependent.
*/
- def getOutOfMemoryErrorArgument: String = {
- if (Utils.isWindows) {
- escapeForShell("-XX:OnOutOfMemoryError=taskkill /F /PID %%%%p")
- } else {
- "-XX:OnOutOfMemoryError='kill %p'"
+ private[yarn] def addOutOfMemoryErrorArgument(javaOpts: ListBuffer[String]): Unit = {
+ if (!javaOpts.exists(_.contains("-XX:OnOutOfMemoryError"))) {
+ if (Utils.isWindows) {
+ javaOpts += escapeForShell("-XX:OnOutOfMemoryError=taskkill /F /PID %%%%p")
+ } else {
+ javaOpts += "-XX:OnOutOfMemoryError='kill %p'"
+ }
}
}