author    Masayoshi TSUZUKI <tsudukim@oss.nttdata.co.jp>  2015-02-01 18:26:28 -0800
committer Andrew Or <andrew@databricks.com>  2015-02-01 18:28:55 -0800
commit    7712ed5b16d809e4cf63285b78f9b65d2588fb21 (patch)
tree      fa2f350e3ea49ef0de516b52032a078aaa2468ab /yarn/src
parent    1ca0a1014e3782dd0045d6e403992ac5114486ad (diff)
[SPARK-1825] Make Windows Spark client work fine with Linux YARN cluster
Modified environment strings and path separators to platform-independent style if possible.

Author: Masayoshi TSUZUKI <tsudukim@oss.nttdata.co.jp>

Closes #3943 from tsudukim/feature/SPARK-1825 and squashes the following commits:

ec4b865 [Masayoshi TSUZUKI] Rebased and modified as comments.
f8a1d5a [Masayoshi TSUZUKI] Merge branch 'master' of github.com:tsudukim/spark into feature/SPARK-1825
3d03d35 [Masayoshi TSUZUKI] [SPARK-1825] Make Windows Spark client work fine with Linux YARN cluster
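For context on the bug: Environment.PWD.$() is expanded in the client JVM, so a Windows
client writes %PWD% into the container launch command, which a Linux NodeManager never
substitutes. Hadoop 2.4 added Environment.$$(), which emits the platform-neutral token
{{PWD}} and defers expansion to the NodeManager on the cluster side. A minimal sketch of
the difference (assumes a Hadoop 2.4+ classpath; on older Hadoop only $() exists):

    import org.apache.hadoop.yarn.api.ApplicationConstants.Environment

    object ExpansionDemo extends App {
      // $() uses the syntax of the JVM's *local* platform:
      // "$PWD" on Unix, "%PWD%" on Windows -- wrong when client and cluster differ.
      println(Environment.PWD.$())
      // $$() (Hadoop 2.4+) emits the neutral token "{{PWD}}", which the
      // NodeManager rewrites for its own platform when launching the container.
      println(Environment.PWD.$$())
    }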
Diffstat (limited to 'yarn/src')
 yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                   | 21
 yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala         |  8
 yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala      | 31
 yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala              | 18
 yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala | 25
 5 files changed, 89 insertions(+), 14 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index d4eeccf642..1a18e6509e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -400,7 +400,10 @@ private[spark] class Client(
// Add Xmx for AM memory
javaOpts += "-Xmx" + args.amMemory + "m"
- val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
+ val tmpDir = new Path(
+ YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+ YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR
+ )
javaOpts += "-Djava.io.tmpdir=" + tmpDir
// TODO: Remove once cpuset version is pushed out.
@@ -491,7 +494,9 @@ private[spark] class Client(
"--num-executors ", args.numExecutors.toString)
// Command for the ApplicationMaster
- val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
+ val commands = prefixEnv ++ Seq(
+ YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server"
+ ) ++
javaOpts ++ amArgs ++
Seq(
"1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
@@ -769,7 +774,9 @@ object Client extends Logging {
env: HashMap[String, String],
extraClassPath: Option[String] = None): Unit = {
extraClassPath.foreach(addClasspathEntry(_, env))
- addClasspathEntry(Environment.PWD.$(), env)
+ addClasspathEntry(
+ YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env
+ )
// Normally the users app.jar is last in case conflicts with spark jars
if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) {
@@ -783,7 +790,9 @@ object Client extends Logging {
}
// Append all jar files under the working directory to the classpath.
- addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + "*", env)
+ addClasspathEntry(
+ YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + "*", env
+ )
}
/**
@@ -838,7 +847,9 @@ object Client extends Logging {
}
}
if (fileName != null) {
- addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + fileName, env)
+ addClasspathEntry(
+ YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + fileName, env
+ )
}
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index c537da9f67..ee2002a35f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -142,7 +142,10 @@ class ExecutorRunnable(
}
javaOpts += "-Djava.io.tmpdir=" +
- new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
+ new Path(
+ YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+ YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR
+ )
// Certain configs need to be passed here because they are needed before the Executor
// registers with the Scheduler and transfers the spark configs. Since the Executor backend
@@ -181,7 +184,8 @@ class ExecutorRunnable(
// For log4j configuration to reference
javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
- val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java",
+ val commands = prefixEnv ++ Seq(
+ YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java",
"-server",
// Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
// Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 4e39c1d580..146b2c0f1a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -22,12 +22,15 @@ import java.util.regex.Matcher
import java.util.regex.Pattern
import scala.collection.mutable.HashMap
+import scala.util.Try
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records.{Priority, ApplicationAccessType}
import org.apache.hadoop.conf.Configuration
@@ -102,7 +105,7 @@ object YarnSparkHadoopUtil {
* If the map already contains this key, append the value to the existing value instead.
*/
def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = {
- val newValue = if (env.contains(key)) { env(key) + File.pathSeparator + value } else value
+ val newValue = if (env.contains(key)) { env(key) + getClassPathSeparator + value } else value
env.put(key, newValue)
}
@@ -182,4 +185,30 @@ object YarnSparkHadoopUtil {
)
}
+ /**
+ * Expand environment variable using Yarn API.
+ * If environment.$$() is implemented, return the result of it.
+ * Otherwise, return the result of environment.$()
+ * Note: $$() is added in Hadoop 2.4.
+ */
+ private lazy val expandMethod =
+ Try(classOf[Environment].getMethod("$$"))
+ .getOrElse(classOf[Environment].getMethod("$"))
+
+ def expandEnvironment(environment: Environment): String =
+ expandMethod.invoke(environment).asInstanceOf[String]
+
+ /**
+ * Get class path separator using Yarn API.
+ * If ApplicationConstants.CLASS_PATH_SEPARATOR is implemented, return it.
+ * Otherwise, return File.pathSeparator
+ * Note: CLASS_PATH_SEPARATOR is added in Hadoop 2.4.
+ */
+ private lazy val classPathSeparatorField =
+ Try(classOf[ApplicationConstants].getField("CLASS_PATH_SEPARATOR"))
+ .getOrElse(classOf[File].getField("pathSeparator"))
+
+ def getClassPathSeparator(): String = {
+ classPathSeparatorField.get(null).asInstanceOf[String]
+ }
}
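As a rough usage sketch of the two new helpers (hypothetical call site, mirroring the
Client.scala hunks above; the tokens in the comments assume Hadoop 2.4+, otherwise the
$() / File.pathSeparator fallbacks apply):

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.yarn.api.ApplicationConstants.Environment

    // Classpath entry for "all jars in the container's working directory",
    // with no client-platform syntax baked in.
    val wildcard = YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) +
      Path.SEPARATOR + "*"                                    // "{{PWD}}/*"
    // Join entries with the separator YARN substitutes on the cluster side.
    val cp = Seq(wildcard, "app.jar")
      .mkString(YarnSparkHadoopUtil.getClassPathSeparator())  // uses "<CPS>"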
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index aad50015b7..2bb3dcffd6 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -28,8 +28,6 @@ import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.mockito.Matchers._
import org.mockito.Mockito._
-
-
import org.scalatest.FunSuite
import org.scalatest.Matchers
@@ -89,7 +87,7 @@ class ClientSuite extends FunSuite with Matchers {
Client.populateClasspath(args, conf, sparkConf, env)
- val cp = env("CLASSPATH").split(File.pathSeparator)
+ val cp = env("CLASSPATH").split(":|;|<CPS>")
s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
val uri = new URI(entry)
if (Client.LOCAL_SCHEME.equals(uri.getScheme())) {
@@ -98,8 +96,16 @@ class ClientSuite extends FunSuite with Matchers {
cp should not contain (uri.getPath())
}
})
- cp should contain (Environment.PWD.$())
- cp should contain (s"${Environment.PWD.$()}${File.separator}*")
+ if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
+ cp should contain("{{PWD}}")
+ cp should contain(s"{{PWD}}${Path.SEPARATOR}*")
+ } else if (Utils.isWindows) {
+ cp should contain("%PWD%")
+ cp should contain(s"%PWD%${Path.SEPARATOR}*")
+ } else {
+ cp should contain(Environment.PWD.$())
+ cp should contain(s"${Environment.PWD.$()}${File.separator}*")
+ }
cp should not contain (Client.SPARK_JAR)
cp should not contain (Client.APP_JAR)
}
@@ -223,7 +229,7 @@ class ClientSuite extends FunSuite with Matchers {
def newEnv = MutableHashMap[String, String]()
- def classpath(env: MutableHashMap[String, String]) = env(Environment.CLASSPATH.name).split(":|;")
+ def classpath(env: MutableHashMap[String, String]) = env(Environment.CLASSPATH.name).split(":|;|<CPS>")
def flatten(a: Option[Seq[String]], b: Option[Seq[String]]) = (a ++ b).flatten.toArray
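A note on the widened split regex above: the separator that ends up in the CLASSPATH
value depends on the Hadoop version and platform under test, so the suite must accept
all three. A toy illustration (the input string here is made up):

    // Hadoop 2.4+ joins entries with the literal token "<CPS>"; older Hadoop
    // uses File.pathSeparator, i.e. ";" on Windows and ":" elsewhere.
    "a<CPS>b;c:d".split(":|;|<CPS>")   // Array(a, b, c, d)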
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index 2cc5abb3a8..b5a2db8f62 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -20,12 +20,15 @@ package org.apache.spark.deploy.yarn
import java.io.{File, IOException}
import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.scalatest.{FunSuite, Matchers}
import org.apache.hadoop.yarn.api.records.ApplicationAccessType
import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.util.Utils
class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
@@ -148,4 +151,26 @@ class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
}
}
+
+ test("test expandEnvironment result") {
+ val target = Environment.PWD
+ if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
+ YarnSparkHadoopUtil.expandEnvironment(target) should be ("{{" + target + "}}")
+ } else if (Utils.isWindows) {
+ YarnSparkHadoopUtil.expandEnvironment(target) should be ("%" + target + "%")
+ } else {
+ YarnSparkHadoopUtil.expandEnvironment(target) should be ("$" + target)
+ }
+
+ }
+
+ test("test getClassPathSeparator result") {
+ if (classOf[ApplicationConstants].getFields().exists(_.getName == "CLASS_PATH_SEPARATOR")) {
+ YarnSparkHadoopUtil.getClassPathSeparator() should be ("<CPS>")
+ } else if (Utils.isWindows) {
+ YarnSparkHadoopUtil.getClassPathSeparator() should be (";")
+ } else {
+ YarnSparkHadoopUtil.getClassPathSeparator() should be (":")
+ }
+ }
}