From 7712ed5b16d809e4cf63285b78f9b65d2588fb21 Mon Sep 17 00:00:00 2001
From: Masayoshi TSUZUKI
Date: Sun, 1 Feb 2015 18:26:28 -0800
Subject: [SPARK-1825] Make Windows Spark client work fine with Linux YARN
 cluster

Modified environment strings and path separators to a platform-independent
style where possible.

Author: Masayoshi TSUZUKI

Closes #3943 from tsudukim/feature/SPARK-1825 and squashes the following commits:

ec4b865 [Masayoshi TSUZUKI] Rebased and modified as comments.
f8a1d5a [Masayoshi TSUZUKI] Merge branch 'master' of github.com:tsudukim/spark into feature/SPARK-1825
3d03d35 [Masayoshi TSUZUKI] [SPARK-1825] Make Windows Spark client work fine with Linux YARN cluster
---
 .../org/apache/spark/deploy/yarn/Client.scala      | 21 +++++++++++----
 .../spark/deploy/yarn/ExecutorRunnable.scala       |  8 ++++--
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala    | 31 +++++++++++++++++++++-
 .../org/apache/spark/deploy/yarn/ClientSuite.scala | 18 ++++++++-----
 .../deploy/yarn/YarnSparkHadoopUtilSuite.scala     | 25 +++++++++++++++++
 5 files changed, 89 insertions(+), 14 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index d4eeccf642..1a18e6509e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -400,7 +400,10 @@ private[spark] class Client(
     // Add Xmx for AM memory
     javaOpts += "-Xmx" + args.amMemory + "m"
 
-    val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
+    val tmpDir = new Path(
+      YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+      YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR
+    )
     javaOpts += "-Djava.io.tmpdir=" + tmpDir
 
     // TODO: Remove once cpuset version is pushed out.
@@ -491,7 +494,9 @@ private[spark] class Client(
       "--num-executors ", args.numExecutors.toString)
 
     // Command for the ApplicationMaster
-    val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
+    val commands = prefixEnv ++ Seq(
+        YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server"
+      ) ++
       javaOpts ++ amArgs ++
       Seq(
         "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
@@ -769,7 +774,9 @@ object Client extends Logging {
       env: HashMap[String, String],
       extraClassPath: Option[String] = None): Unit = {
     extraClassPath.foreach(addClasspathEntry(_, env))
-    addClasspathEntry(Environment.PWD.$(), env)
+    addClasspathEntry(
+      YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env
+    )
 
     // Normally the users app.jar is last in case conflicts with spark jars
     if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) {
@@ -783,7 +790,9 @@ object Client extends Logging {
     }
 
     // Append all jar files under the working directory to the classpath.
-    addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + "*", env)
+    addClasspathEntry(
+      YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + "*", env
+    )
   }
 
   /**
@@ -838,7 +847,9 @@ object Client extends Logging {
       }
     }
     if (fileName != null) {
-      addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + fileName, env)
+      addClasspathEntry(
+        YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + fileName, env
+      )
     }
   }
 
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index c537da9f67..ee2002a35f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -142,7 +142,10 @@ class ExecutorRunnable(
     }
 
     javaOpts += "-Djava.io.tmpdir=" +
-      new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
+      new Path(
+        YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+        YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR
+      )
 
     // Certain configs need to be passed here because they are needed before the Executor
     // registers with the Scheduler and transfers the spark configs. Since the Executor backend
@@ -181,7 +184,8 @@ class ExecutorRunnable(
     // For log4j configuration to reference
     javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
 
-    val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java",
+    val commands = prefixEnv ++ Seq(
+      YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java",
       "-server",
       // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
       // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 4e39c1d580..146b2c0f1a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -22,12 +22,15 @@ import java.util.regex.Matcher
 import java.util.regex.Pattern
 
 import scala.collection.mutable.HashMap
+import scala.util.Try
 
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.records.{Priority, ApplicationAccessType}
 import org.apache.hadoop.conf.Configuration
 
@@ -102,7 +105,7 @@ object YarnSparkHadoopUtil {
    * If the map already contains this key, append the value to the existing value instead.
    */
   def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = {
-    val newValue = if (env.contains(key)) { env(key) + File.pathSeparator + value } else value
+    val newValue = if (env.contains(key)) { env(key) + getClassPathSeparator + value } else value
     env.put(key, newValue)
   }
 
@@ -182,4 +185,30 @@ object YarnSparkHadoopUtil {
     )
   }
 
+  /**
+   * Expands an environment variable using the YARN API.
+   * If environment.$$() is implemented, returns its result;
+   * otherwise returns the result of environment.$().
+   * Note: $$() was added in Hadoop 2.4.
+   */
+  private lazy val expandMethod =
+    Try(classOf[Environment].getMethod("$$"))
+      .getOrElse(classOf[Environment].getMethod("$"))
+
+  def expandEnvironment(environment: Environment): String =
+    expandMethod.invoke(environment).asInstanceOf[String]
+
+  /**
+   * Gets the classpath separator using the YARN API.
+   * If ApplicationConstants.CLASS_PATH_SEPARATOR is defined, returns it;
+   * otherwise returns File.pathSeparator.
+   * Note: CLASS_PATH_SEPARATOR was added in Hadoop 2.4.
+   */
+  private lazy val classPathSeparatorField =
+    Try(classOf[ApplicationConstants].getField("CLASS_PATH_SEPARATOR"))
+      .getOrElse(classOf[File].getField("pathSeparator"))
+
+  def getClassPathSeparator(): String = {
+    classPathSeparatorField.get(null).asInstanceOf[String]
+  }
 }
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index aad50015b7..2bb3dcffd6 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -28,8 +28,6 @@ import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.mockito.Matchers._
 import org.mockito.Mockito._
-
-
 import org.scalatest.FunSuite
 import org.scalatest.Matchers
 
@@ -89,7 +87,7 @@ class ClientSuite extends FunSuite with Matchers {
 
     Client.populateClasspath(args, conf, sparkConf, env)
 
-    val cp = env("CLASSPATH").split(File.pathSeparator)
+    val cp = env("CLASSPATH").split(":|;|<CPS>")
     s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
       val uri = new URI(entry)
       if (Client.LOCAL_SCHEME.equals(uri.getScheme())) {
@@ -98,8 +96,16 @@ class ClientSuite extends FunSuite with Matchers {
         cp should not contain (uri.getPath())
       }
     })
-    cp should contain (Environment.PWD.$())
-    cp should contain (s"${Environment.PWD.$()}${File.separator}*")
+    if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
+      cp should contain("{{PWD}}")
+      cp should contain(s"{{PWD}}${Path.SEPARATOR}*")
+    } else if (Utils.isWindows) {
+      cp should contain("%PWD%")
+      cp should contain(s"%PWD%${Path.SEPARATOR}*")
+    } else {
+      cp should contain(Environment.PWD.$())
+      cp should contain(s"${Environment.PWD.$()}${File.separator}*")
+    }
     cp should not contain (Client.SPARK_JAR)
     cp should not contain (Client.APP_JAR)
   }
@@ -223,7 +229,7 @@ class ClientSuite extends FunSuite with Matchers {
 
   def newEnv = MutableHashMap[String, String]()
 
-  def classpath(env: MutableHashMap[String, String]) = env(Environment.CLASSPATH.name).split(":|;")
+  def classpath(env: MutableHashMap[String, String]) = env(Environment.CLASSPATH.name).split(":|;|<CPS>")
 
   def flatten(a: Option[Seq[String]], b: Option[Seq[String]]) = (a ++ b).flatten.toArray
 
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index 2cc5abb3a8..b5a2db8f62 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -20,12 +20,15 @@ package org.apache.spark.deploy.yarn
 import java.io.{File, IOException}
 
 import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.scalatest.{FunSuite, Matchers}
 
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.util.Utils
 
 
 class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
@@ -148,4 +151,26 @@ class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
       }
     }
   }
+
+  test("test expandEnvironment result") {
+    val target = Environment.PWD
+    if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
+      YarnSparkHadoopUtil.expandEnvironment(target) should be ("{{" + target + "}}")
+    } else if (Utils.isWindows) {
+      YarnSparkHadoopUtil.expandEnvironment(target) should be ("%" + target + "%")
+    } else {
+      YarnSparkHadoopUtil.expandEnvironment(target) should be ("$" + target)
+    }
+
+  }
+
+  test("test getClassPathSeparator result") {
+    if (classOf[ApplicationConstants].getFields().exists(_.getName == "CLASS_PATH_SEPARATOR")) {
+      YarnSparkHadoopUtil.getClassPathSeparator() should be ("<CPS>")
+    } else if (Utils.isWindows) {
+      YarnSparkHadoopUtil.getClassPathSeparator() should be (";")
+    } else {
+      YarnSparkHadoopUtil.getClassPathSeparator() should be (":")
+    }
+  }
 }
--
cgit v1.2.3
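
Editor's note on the reflection pattern above: expandMethod and classPathSeparatorField probe the YARN API exactly once and cache the result in a lazy val, so Spark still compiles and runs against pre-2.4 Hadoop while preferring the cross-platform 2.4 API when it exists. A minimal, self-contained sketch of the same probe-once fallback, written against plain JDK methods so it runs without Hadoop on the classpath (the object and method choice are illustrative, not part of the patch):

  import scala.util.Try

  object ReflectiveFallback {
    // Probe once for the newer method; fall back to the older one if absent.
    // Mirrors expandMethod above: String.strip() exists only on JDK 11+,
    // String.trim() everywhere.
    private lazy val stripMethod =
      Try(classOf[String].getMethod("strip"))
        .getOrElse(classOf[String].getMethod("trim"))

    def strip(s: String): String =
      stripMethod.invoke(s).asInstanceOf[String]

    def main(args: Array[String]): Unit =
      println(strip("  hello  ")) // prints "hello" on any JDK
  }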
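
Why the expansion choice matters: Environment.$$() (Hadoop 2.4+) emits the literal token {{VAR}}, which the NodeManager rewrites with the container host's own syntax at launch, while Environment.$() bakes in the submitting client's syntax. Assuming those semantics, a sketch of the three forms the ClientSuite assertions above expect (helper names are made up):

  object ExpansionStyles {
    def crossPlatform(name: String) = "{{" + name + "}}" // Environment.$$(): resolved on the cluster node
    def windowsClient(name: String) = "%" + name + "%"   // Environment.$() evaluated on a Windows client
    def unixClient(name: String)    = "$" + name         // Environment.$() evaluated on a Unix client

    def main(args: Array[String]): Unit = {
      println(crossPlatform("PWD")) // {{PWD}}  safe for a Windows client -> Linux cluster
      println(windowsClient("PWD")) // %PWD%    wrong on a Linux NodeManager
      println(unixClient("PWD"))    // $PWD     wrong on a Windows NodeManager
    }
  }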
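
The same deferral applies to the classpath separator: ApplicationConstants.CLASS_PATH_SEPARATOR (Hadoop 2.4+) is the literal token <CPS>, substituted with the node's real ':' or ';' at container launch, which is why the suites split CLASSPATH on the regex ":|;|<CPS>". A sketch under those assumptions (the entry list is invented):

  import java.io.File

  object SeparatorStyles {
    def main(args: Array[String]): Unit = {
      val entries = Seq("{{PWD}}", "{{PWD}}/*", "app.jar")

      // Deferred: the NodeManager substitutes the node's real separator for <CPS>.
      println(entries.mkString("<CPS>"))        // {{PWD}}<CPS>{{PWD}}/*<CPS>app.jar

      // Eager: only correct when client and cluster run the same OS.
      println(entries.mkString(File.pathSeparator)) // joined with the *client* machine's ':' or ';'
    }
  }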