path: root/resource-managers
author    Sean Owen <sowen@cloudera.com>    2017-02-08 12:20:07 +0000
committer Sean Owen <sowen@cloudera.com>    2017-02-08 12:20:07 +0000
commit    e8d3fca4502d5f5b8f38525b5fdabe80ccf9a8ec (patch)
tree      bb3851bea6be9e71f2533e27ee4ca427e36ff3fd /resource-managers
parent    d60dde26f98164ae146da1b5f409f4eb7c3621aa (diff)
[SPARK-19464][CORE][YARN][TEST-HADOOP2.6] Remove support for Hadoop 2.5 and earlier
## What changes were proposed in this pull request?

- Remove support for Hadoop 2.5 and earlier
- Remove reflection and code constructs only needed to support multiple versions at once
- Update docs to reflect newer versions
- Remove older versions' builds and profiles

## How was this patch tested?

Existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #16810 from srowen/SPARK-19464.
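The main mechanical change applied throughout the YARN module is replacing reflection-guarded calls, which were only needed while Hadoop releases older than 2.4 lacked the corresponding APIs, with direct calls that are guaranteed to resolve against the new Hadoop 2.6+ baseline. A minimal sketch of the before/after pattern, modeled on the `setApplicationTags` change in `Client.scala` (the standalone `applyTags` helper is hypothetical, introduced here only for illustration):

```scala
import scala.collection.JavaConverters._

import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext

// Hypothetical helper illustrating the pattern this patch removes.
def applyTags(appContext: ApplicationSubmissionContext, tags: Set[String]): Unit = {
  // Before: setApplicationTags only exists on Hadoop 2.4+, so the old code
  // looked it up via reflection and logged a warning on older YARN versions:
  //   val method = appContext.getClass.getMethod(
  //     "setApplicationTags", classOf[java.util.Set[String]])
  //   method.invoke(appContext, new java.util.HashSet[String](tags.asJava))
  //
  // After: with Hadoop 2.6 as the minimum supported version, the method is
  // always present and can be called directly.
  appContext.setApplicationTags(new java.util.HashSet[String](tags.asJava))
}
```

The same simplification is applied in the diff below to `setAttemptFailuresValidityInterval`, AM node-label expressions, log-aggregation contexts, `Environment.PWD.$$()`, and `ApplicationConstants.CLASS_PATH_SEPARATOR`.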
Diffstat (limited to 'resource-managers')
-rw-r--r--  resource-managers/yarn/pom.xml                                                               |  22
-rw-r--r--  resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala   |  17
-rw-r--r--  resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala              | 169
-rw-r--r--  resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala    |  10
-rw-r--r--  resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala       |  19
-rw-r--r--  resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala        |  31
-rw-r--r--  resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala |  53
-rw-r--r--  resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala         |  65
-rw-r--r--  resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala |  24
9 files changed, 81 insertions, 329 deletions
diff --git a/resource-managers/yarn/pom.xml b/resource-managers/yarn/pom.xml
index f090d2427d..92a33de0ae 100644
--- a/resource-managers/yarn/pom.xml
+++ b/resource-managers/yarn/pom.xml
@@ -125,34 +125,12 @@
<scope>test</scope>
</dependency>
- <!--
- See SPARK-3710. hadoop-yarn-server-tests in Hadoop 2.2 fails to pull some needed
- dependencies, so they need to be added manually for the tests to work.
- -->
-
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-tests</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty</artifactId>
- <version>6.1.26</version>
- <exclusions>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- </exclusions>
- <scope>test</scope>
- </dependency>
<!--
Jersey 1 dependencies only required for YARN integration testing. Creating a YARN cluster
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index f79c66b9ff..9df43aea3f 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import org.apache.spark._
@@ -460,17 +461,15 @@ private[spark] class ApplicationMaster(
}
failureCount = 0
} catch {
- case i: InterruptedException =>
+ case i: InterruptedException => // do nothing
+ case e: ApplicationAttemptNotFoundException =>
+ failureCount += 1
+ logError("Exception from Reporter thread.", e)
+ finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE,
+ e.getMessage)
case e: Throwable =>
failureCount += 1
- // this exception was introduced in hadoop 2.4 and this code would not compile
- // with earlier versions if we refer it directly.
- if ("org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException" ==
- e.getClass().getName()) {
- logError("Exception from Reporter thread.", e)
- finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE,
- e.getMessage)
- } else if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
+ if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
s"$failureCount time(s) from Reporter thread.")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index b212b0eaaf..635c1ac5e3 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -26,7 +26,6 @@ import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
-import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal
import com.google.common.base.Objects
@@ -47,7 +46,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
import org.apache.hadoop.yarn.util.Records
-import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException}
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager
@@ -216,18 +215,7 @@ private[spark] class Client(
appContext.setApplicationType("SPARK")
sparkConf.get(APPLICATION_TAGS).foreach { tags =>
- try {
- // The setApplicationTags method was only introduced in Hadoop 2.4+, so we need to use
- // reflection to set it, printing a warning if a tag was specified but the YARN version
- // doesn't support it.
- val method = appContext.getClass().getMethod(
- "setApplicationTags", classOf[java.util.Set[String]])
- method.invoke(appContext, new java.util.HashSet[String](tags.asJava))
- } catch {
- case e: NoSuchMethodException =>
- logWarning(s"Ignoring ${APPLICATION_TAGS.key} because this version of " +
- "YARN does not support it")
- }
+ appContext.setApplicationTags(new java.util.HashSet[String](tags.asJava))
}
sparkConf.get(MAX_APP_ATTEMPTS) match {
case Some(v) => appContext.setMaxAppAttempts(v)
@@ -236,15 +224,7 @@ private[spark] class Client(
}
sparkConf.get(AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
- try {
- val method = appContext.getClass().getMethod(
- "setAttemptFailuresValidityInterval", classOf[Long])
- method.invoke(appContext, interval: java.lang.Long)
- } catch {
- case e: NoSuchMethodException =>
- logWarning(s"Ignoring ${AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key} because " +
- "the version of YARN does not support it")
- }
+ appContext.setAttemptFailuresValidityInterval(interval)
}
val capability = Records.newRecord(classOf[Resource])
@@ -253,53 +233,24 @@ private[spark] class Client(
sparkConf.get(AM_NODE_LABEL_EXPRESSION) match {
case Some(expr) =>
- try {
- val amRequest = Records.newRecord(classOf[ResourceRequest])
- amRequest.setResourceName(ResourceRequest.ANY)
- amRequest.setPriority(Priority.newInstance(0))
- amRequest.setCapability(capability)
- amRequest.setNumContainers(1)
- val method = amRequest.getClass.getMethod("setNodeLabelExpression", classOf[String])
- method.invoke(amRequest, expr)
-
- val setResourceRequestMethod =
- appContext.getClass.getMethod("setAMContainerResourceRequest", classOf[ResourceRequest])
- setResourceRequestMethod.invoke(appContext, amRequest)
- } catch {
- case e: NoSuchMethodException =>
- logWarning(s"Ignoring ${AM_NODE_LABEL_EXPRESSION.key} because the version " +
- "of YARN does not support it")
- appContext.setResource(capability)
- }
+ val amRequest = Records.newRecord(classOf[ResourceRequest])
+ amRequest.setResourceName(ResourceRequest.ANY)
+ amRequest.setPriority(Priority.newInstance(0))
+ amRequest.setCapability(capability)
+ amRequest.setNumContainers(1)
+ amRequest.setNodeLabelExpression(expr)
+ appContext.setAMContainerResourceRequest(amRequest)
case None =>
appContext.setResource(capability)
}
sparkConf.get(ROLLED_LOG_INCLUDE_PATTERN).foreach { includePattern =>
- try {
- val logAggregationContext = Records.newRecord(
- Utils.classForName("org.apache.hadoop.yarn.api.records.LogAggregationContext"))
- .asInstanceOf[Object]
-
- val setRolledLogsIncludePatternMethod =
- logAggregationContext.getClass.getMethod("setRolledLogsIncludePattern", classOf[String])
- setRolledLogsIncludePatternMethod.invoke(logAggregationContext, includePattern)
-
- sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern =>
- val setRolledLogsExcludePatternMethod =
- logAggregationContext.getClass.getMethod("setRolledLogsExcludePattern", classOf[String])
- setRolledLogsExcludePatternMethod.invoke(logAggregationContext, excludePattern)
- }
-
- val setLogAggregationContextMethod =
- appContext.getClass.getMethod("setLogAggregationContext",
- Utils.classForName("org.apache.hadoop.yarn.api.records.LogAggregationContext"))
- setLogAggregationContextMethod.invoke(appContext, logAggregationContext)
- } catch {
- case NonFatal(e) =>
- logWarning(s"Ignoring ${ROLLED_LOG_INCLUDE_PATTERN.key} because the version of YARN " +
- s"does not support it", e)
+ val logAggregationContext = Records.newRecord(classOf[LogAggregationContext])
+ logAggregationContext.setRolledLogsIncludePattern(includePattern)
+ sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern =>
+ logAggregationContext.setRolledLogsExcludePattern(excludePattern)
}
+ appContext.setLogAggregationContext(logAggregationContext)
}
appContext
@@ -786,14 +737,12 @@ private[spark] class Client(
val pythonPath = new ListBuffer[String]()
val (pyFiles, pyArchives) = sparkConf.get(PY_FILES).partition(_.endsWith(".py"))
if (pyFiles.nonEmpty) {
- pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
- LOCALIZED_PYTHON_DIR)
+ pythonPath += buildPath(Environment.PWD.$$(), LOCALIZED_PYTHON_DIR)
}
(pySparkArchives ++ pyArchives).foreach { path =>
val uri = Utils.resolveURI(path)
if (uri.getScheme != LOCAL_SCHEME) {
- pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
- new Path(uri).getName())
+ pythonPath += buildPath(Environment.PWD.$$(), new Path(uri).getName())
} else {
pythonPath += uri.getPath()
}
@@ -802,7 +751,7 @@ private[spark] class Client(
// Finally, update the Spark config to propagate PYTHONPATH to the AM and executors.
if (pythonPath.nonEmpty) {
val pythonPathStr = (sys.env.get("PYTHONPATH") ++ pythonPath)
- .mkString(YarnSparkHadoopUtil.getClassPathSeparator)
+ .mkString(ApplicationConstants.CLASS_PATH_SEPARATOR)
env("PYTHONPATH") = pythonPathStr
sparkConf.setExecutorEnv("PYTHONPATH", pythonPathStr)
}
@@ -882,10 +831,7 @@ private[spark] class Client(
// Add Xmx for AM memory
javaOpts += "-Xmx" + amMemory + "m"
- val tmpDir = new Path(
- YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
- YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR
- )
+ val tmpDir = new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
javaOpts += "-Djava.io.tmpdir=" + tmpDir
// TODO: Remove once cpuset version is pushed out.
@@ -982,15 +928,12 @@ private[spark] class Client(
Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
}
val amArgs =
- Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++
- userArgs ++ Seq(
- "--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
- LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
+ Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
+ Seq("--properties-file", buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
// Command for the ApplicationMaster
- val commands = prefixEnv ++ Seq(
- YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server"
- ) ++
+ val commands = prefixEnv ++
+ Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
javaOpts ++ amArgs ++
Seq(
"1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
@@ -1265,59 +1208,28 @@ private object Client extends Logging {
private[yarn] def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String])
: Unit = {
val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf)
- for (c <- classPathElementsToAdd.flatten) {
+ classPathElementsToAdd.foreach { c =>
YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, c.trim)
}
}
- private def getYarnAppClasspath(conf: Configuration): Option[Seq[String]] =
+ private def getYarnAppClasspath(conf: Configuration): Seq[String] =
Option(conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) match {
- case Some(s) => Some(s.toSeq)
+ case Some(s) => s.toSeq
case None => getDefaultYarnApplicationClasspath
}
- private def getMRAppClasspath(conf: Configuration): Option[Seq[String]] =
+ private def getMRAppClasspath(conf: Configuration): Seq[String] =
Option(conf.getStrings("mapreduce.application.classpath")) match {
- case Some(s) => Some(s.toSeq)
+ case Some(s) => s.toSeq
case None => getDefaultMRApplicationClasspath
}
- private[yarn] def getDefaultYarnApplicationClasspath: Option[Seq[String]] = {
- val triedDefault = Try[Seq[String]] {
- val field = classOf[YarnConfiguration].getField("DEFAULT_YARN_APPLICATION_CLASSPATH")
- val value = field.get(null).asInstanceOf[Array[String]]
- value.toSeq
- } recoverWith {
- case e: NoSuchFieldException => Success(Seq.empty[String])
- }
-
- triedDefault match {
- case f: Failure[_] =>
- logError("Unable to obtain the default YARN Application classpath.", f.exception)
- case s: Success[Seq[String]] =>
- logDebug(s"Using the default YARN application classpath: ${s.get.mkString(",")}")
- }
-
- triedDefault.toOption
- }
-
- private[yarn] def getDefaultMRApplicationClasspath: Option[Seq[String]] = {
- val triedDefault = Try[Seq[String]] {
- val field = classOf[MRJobConfig].getField("DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH")
- StringUtils.getStrings(field.get(null).asInstanceOf[String]).toSeq
- } recoverWith {
- case e: NoSuchFieldException => Success(Seq.empty[String])
- }
+ private[yarn] def getDefaultYarnApplicationClasspath: Seq[String] =
+ YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH.toSeq
- triedDefault match {
- case f: Failure[_] =>
- logError("Unable to obtain the default MR Application classpath.", f.exception)
- case s: Success[Seq[String]] =>
- logDebug(s"Using the default MR application classpath: ${s.get.mkString(",")}")
- }
-
- triedDefault.toOption
- }
+ private[yarn] def getDefaultMRApplicationClasspath: Seq[String] =
+ StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH).toSeq
/**
* Populate the classpath entry in the given environment map.
@@ -1339,11 +1251,9 @@ private object Client extends Logging {
addClasspathEntry(getClusterPath(sparkConf, cp), env)
}
- addClasspathEntry(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env)
+ addClasspathEntry(Environment.PWD.$$(), env)
- addClasspathEntry(
- YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR +
- LOCALIZED_CONF_DIR, env)
+ addClasspathEntry(Environment.PWD.$$() + Path.SEPARATOR + LOCALIZED_CONF_DIR, env)
if (sparkConf.get(USER_CLASS_PATH_FIRST)) {
// in order to properly add the app jar when user classpath is first
@@ -1369,9 +1279,8 @@ private object Client extends Logging {
}
// Add the Spark jars to the classpath, depending on how they were distributed.
- addClasspathEntry(buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
- LOCALIZED_LIB_DIR, "*"), env)
- if (!sparkConf.get(SPARK_ARCHIVE).isDefined) {
+ addClasspathEntry(buildPath(Environment.PWD.$$(), LOCALIZED_LIB_DIR, "*"), env)
+ if (sparkConf.get(SPARK_ARCHIVE).isEmpty) {
sparkConf.get(SPARK_JARS).foreach { jars =>
jars.filter(isLocalUri).foreach { jar =>
addClasspathEntry(getClusterPath(sparkConf, jar), env)
@@ -1430,13 +1339,11 @@ private object Client extends Logging {
if (uri != null && uri.getScheme == LOCAL_SCHEME) {
addClasspathEntry(getClusterPath(conf, uri.getPath), env)
} else if (fileName != null) {
- addClasspathEntry(buildPath(
- YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env)
+ addClasspathEntry(buildPath(Environment.PWD.$$(), fileName), env)
} else if (uri != null) {
val localPath = getQualifiedLocalPath(uri, hadoopConf)
val linkName = Option(uri.getFragment()).getOrElse(localPath.getName())
- addClasspathEntry(buildPath(
- YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), linkName), env)
+ addClasspathEntry(buildPath(Environment.PWD.$$(), linkName), env)
}
}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 868c2edc5a..b55b4b147b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -152,10 +152,7 @@ private[yarn] class ExecutorRunnable(
}
javaOpts += "-Djava.io.tmpdir=" +
- new Path(
- YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
- YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR
- )
+ new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
// Certain configs need to be passed here because they are needed before the Executor
// registers with the Scheduler and transfers the spark configs. Since the Executor backend
@@ -206,9 +203,8 @@ private[yarn] class ExecutorRunnable(
}.toSeq
YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
- val commands = prefixEnv ++ Seq(
- YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java",
- "-server") ++
+ val commands = prefixEnv ++
+ Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
javaOpts ++
Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
"--driver-url", masterAddress,
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index e498932e51..8a76dbd1bf 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -150,20 +150,6 @@ private[yarn] class YarnAllocator(
private val labelExpression = sparkConf.get(EXECUTOR_NODE_LABEL_EXPRESSION)
- // ContainerRequest constructor that can take a node label expression. We grab it through
- // reflection because it's only available in later versions of YARN.
- private val nodeLabelConstructor = labelExpression.flatMap { expr =>
- try {
- Some(classOf[ContainerRequest].getConstructor(classOf[Resource],
- classOf[Array[String]], classOf[Array[String]], classOf[Priority], classOf[Boolean],
- classOf[String]))
- } catch {
- case e: NoSuchMethodException =>
- logWarning(s"Node label expression $expr will be ignored because YARN version on" +
- " classpath does not support it.")
- None
- }
- }
// A map to store preferred hostname and possible task numbers running on it.
private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
@@ -414,10 +400,7 @@ private[yarn] class YarnAllocator(
resource: Resource,
nodes: Array[String],
racks: Array[String]): ContainerRequest = {
- nodeLabelConstructor.map { constructor =>
- constructor.newInstance(resource, nodes, racks, RM_REQUEST_PRIORITY, true: java.lang.Boolean,
- labelExpression.orNull)
- }.getOrElse(new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY))
+ new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY, true, labelExpression.orNull)
}
/**
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 53df11eb66..163dfb5a60 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -17,12 +17,8 @@
package org.apache.spark.deploy.yarn
-import java.util.{List => JList}
-
import scala.collection.JavaConverters._
-import scala.util.Try
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
@@ -99,24 +95,11 @@ private[spark] class YarnRMClient extends Logging {
def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String] = {
// Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2,
// so not all stable releases have it.
- val prefix = Try(classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration])
- .invoke(null, conf).asInstanceOf[String]).getOrElse("http://")
-
- // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses.
- try {
- val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter",
- classOf[Configuration])
- val proxies = method.invoke(null, conf).asInstanceOf[JList[String]]
- val hosts = proxies.asScala.map { proxy => proxy.split(":")(0) }
- val uriBases = proxies.asScala.map { proxy => prefix + proxy + proxyBase }
- Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(","))
- } catch {
- case e: NoSuchMethodException =>
- val proxy = WebAppUtils.getProxyHostAndPort(conf)
- val parts = proxy.split(":")
- val uriBase = prefix + proxy + proxyBase
- Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
- }
+ val prefix = WebAppUtils.getHttpSchemePrefix(conf)
+ val proxies = WebAppUtils.getProxyHostsAndPortsForAmFilter(conf)
+ val hosts = proxies.asScala.map(_.split(":").head)
+ val uriBases = proxies.asScala.map { proxy => prefix + proxy + proxyBase }
+ Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(","))
}
/** Returns the maximum number of attempts to register the AM. */
@@ -124,12 +107,10 @@ private[spark] class YarnRMClient extends Logging {
val sparkMaxAttempts = sparkConf.get(MAX_APP_ATTEMPTS).map(_.toInt)
val yarnMaxAttempts = yarnConf.getInt(
YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
- val retval: Int = sparkMaxAttempts match {
+ sparkMaxAttempts match {
case Some(x) => if (x <= yarnMaxAttempts) x else yarnMaxAttempts
case None => yarnMaxAttempts
}
-
- retval
}
}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index cc53b1b06e..9357885512 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -17,13 +17,11 @@
package org.apache.spark.deploy.yarn
-import java.io.File
import java.nio.charset.StandardCharsets.UTF_8
import java.util.regex.Matcher
import java.util.regex.Pattern
import scala.collection.mutable.{HashMap, ListBuffer}
-import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
@@ -31,7 +29,6 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.ApplicationConstants
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.ConverterUtils
@@ -137,7 +134,12 @@ object YarnSparkHadoopUtil {
* If the map already contains this key, append the value to the existing value instead.
*/
def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = {
- val newValue = if (env.contains(key)) { env(key) + getClassPathSeparator + value } else value
+ val newValue =
+ if (env.contains(key)) {
+ env(key) + ApplicationConstants.CLASS_PATH_SEPARATOR + value
+ } else {
+ value
+ }
env.put(key, newValue)
}
@@ -156,8 +158,8 @@ object YarnSparkHadoopUtil {
while (m.find()) {
val variable = m.group(1)
var replace = ""
- if (env.get(variable) != None) {
- replace = env.get(variable).get
+ if (env.contains(variable)) {
+ replace = env(variable)
} else {
// if this key is not configured for the child .. get it from the env
replace = System.getenv(variable)
@@ -235,13 +237,11 @@ object YarnSparkHadoopUtil {
YarnCommandBuilderUtils.quoteForBatchScript(arg)
} else {
val escaped = new StringBuilder("'")
- for (i <- 0 to arg.length() - 1) {
- arg.charAt(i) match {
- case '$' => escaped.append("\\$")
- case '"' => escaped.append("\\\"")
- case '\'' => escaped.append("'\\''")
- case c => escaped.append(c)
- }
+ arg.foreach {
+ case '$' => escaped.append("\\$")
+ case '"' => escaped.append("\\\"")
+ case '\'' => escaped.append("'\\''")
+ case c => escaped.append(c)
}
escaped.append("'").toString()
}
@@ -263,33 +263,6 @@ object YarnSparkHadoopUtil {
}
/**
- * Expand environment variable using Yarn API.
- * If environment.$$() is implemented, return the result of it.
- * Otherwise, return the result of environment.$()
- * Note: $$() is added in Hadoop 2.4.
- */
- private lazy val expandMethod =
- Try(classOf[Environment].getMethod("$$"))
- .getOrElse(classOf[Environment].getMethod("$"))
-
- def expandEnvironment(environment: Environment): String =
- expandMethod.invoke(environment).asInstanceOf[String]
-
- /**
- * Get class path separator using Yarn API.
- * If ApplicationConstants.CLASS_PATH_SEPARATOR is implemented, return it.
- * Otherwise, return File.pathSeparator
- * Note: CLASS_PATH_SEPARATOR is added in Hadoop 2.4.
- */
- private lazy val classPathSeparatorField =
- Try(classOf[ApplicationConstants].getField("CLASS_PATH_SEPARATOR"))
- .getOrElse(classOf[File].getField("pathSeparator"))
-
- def getClassPathSeparator(): String = {
- classPathSeparatorField.get(null).asInstanceOf[String]
- }
-
- /**
* Getting the initial target number of executors depends on whether dynamic allocation is
* enabled.
* If not using dynamic allocation it gets the number of executors requested by the user.
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 7deaf0af94..dd2180a0f5 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -23,8 +23,6 @@ import java.util.Properties
import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap => MutableHashMap}
-import scala.reflect.ClassTag
-import scala.util.Try
import org.apache.commons.lang3.SerializationUtils
import org.apache.hadoop.conf.Configuration
@@ -67,19 +65,18 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
}
test("default Yarn application classpath") {
- getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP))
+ getDefaultYarnApplicationClasspath should be(Fixtures.knownDefYarnAppCP)
}
test("default MR application classpath") {
- getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP))
+ getDefaultMRApplicationClasspath should be(Fixtures.knownDefMRAppCP)
}
test("resultant classpath for an application that defines a classpath for YARN") {
withAppConf(Fixtures.mapYARNAppConf) { conf =>
val env = newEnv
populateHadoopClasspath(conf, env)
- classpath(env) should be(
- flatten(Fixtures.knownYARNAppCP, getDefaultMRApplicationClasspath))
+ classpath(env) should be(Fixtures.knownYARNAppCP +: getDefaultMRApplicationClasspath)
}
}
@@ -87,8 +84,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
withAppConf(Fixtures.mapMRAppConf) { conf =>
val env = newEnv
populateHadoopClasspath(conf, env)
- classpath(env) should be(
- flatten(getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP))
+ classpath(env) should be(getDefaultYarnApplicationClasspath :+ Fixtures.knownMRAppCP)
}
}
@@ -96,7 +92,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
withAppConf(Fixtures.mapAppConf) { conf =>
val env = newEnv
populateHadoopClasspath(conf, env)
- classpath(env) should be(flatten(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP))
+ classpath(env) should be(Array(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP))
}
}
@@ -104,14 +100,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
private val USER = "local:/userJar"
private val ADDED = "local:/addJar1,local:/addJar2,/addJar3"
- private val PWD =
- if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
- "{{PWD}}"
- } else if (Utils.isWindows) {
- "%PWD%"
- } else {
- Environment.PWD.$()
- }
+ private val PWD = "{{PWD}}"
test("Local jar URIs") {
val conf = new Configuration()
@@ -388,26 +377,18 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
object Fixtures {
val knownDefYarnAppCP: Seq[String] =
- getFieldValue[Array[String], Seq[String]](classOf[YarnConfiguration],
- "DEFAULT_YARN_APPLICATION_CLASSPATH",
- Seq[String]())(a => a.toSeq)
-
+ YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH.toSeq
val knownDefMRAppCP: Seq[String] =
- getFieldValue2[String, Array[String], Seq[String]](
- classOf[MRJobConfig],
- "DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH",
- Seq[String]())(a => a.split(","))(a => a.toSeq)
+ MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH.split(",").toSeq
- val knownYARNAppCP = Some(Seq("/known/yarn/path"))
+ val knownYARNAppCP = "/known/yarn/path"
- val knownMRAppCP = Some(Seq("/known/mr/path"))
+ val knownMRAppCP = "/known/mr/path"
- val mapMRAppConf =
- Map("mapreduce.application.classpath" -> knownMRAppCP.map(_.mkString(":")).get)
+ val mapMRAppConf = Map("mapreduce.application.classpath" -> knownMRAppCP)
- val mapYARNAppConf =
- Map(YarnConfiguration.YARN_APPLICATION_CLASSPATH -> knownYARNAppCP.map(_.mkString(":")).get)
+ val mapYARNAppConf = Map(YarnConfiguration.YARN_APPLICATION_CLASSPATH -> knownYARNAppCP)
val mapAppConf = mapYARNAppConf ++ mapMRAppConf
}
@@ -423,28 +404,6 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
def classpath(env: MutableHashMap[String, String]): Array[String] =
env(Environment.CLASSPATH.name).split(":|;|<CPS>")
- def flatten(a: Option[Seq[String]], b: Option[Seq[String]]): Array[String] =
- (a ++ b).flatten.toArray
-
- def getFieldValue[A, B](clazz: Class[_], field: String, defaults: => B)(mapTo: A => B): B = {
- Try(clazz.getField(field))
- .map(_.get(null).asInstanceOf[A])
- .toOption
- .map(mapTo)
- .getOrElse(defaults)
- }
-
- def getFieldValue2[A: ClassTag, A1: ClassTag, B](
- clazz: Class[_],
- field: String,
- defaults: => B)(mapTo: A => B)(mapTo1: A1 => B): B = {
- Try(clazz.getField(field)).map(_.get(null)).map {
- case v: A => mapTo(v)
- case v1: A1 => mapTo1(v1)
- case _ => defaults
- }.toOption.getOrElse(defaults)
- }
-
private def createClient(
sparkConf: SparkConf,
conf: Configuration = new Configuration(),
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index 7fbbe12609..a057618b39 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -22,8 +22,6 @@ import java.nio.charset.StandardCharsets
import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.io.Text
-import org.apache.hadoop.yarn.api.ApplicationConstants
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records.ApplicationAccessType
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.scalatest.Matchers
@@ -147,28 +145,6 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
}
- test("test expandEnvironment result") {
- val target = Environment.PWD
- if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
- YarnSparkHadoopUtil.expandEnvironment(target) should be ("{{" + target + "}}")
- } else if (Utils.isWindows) {
- YarnSparkHadoopUtil.expandEnvironment(target) should be ("%" + target + "%")
- } else {
- YarnSparkHadoopUtil.expandEnvironment(target) should be ("$" + target)
- }
-
- }
-
- test("test getClassPathSeparator result") {
- if (classOf[ApplicationConstants].getFields().exists(_.getName == "CLASS_PATH_SEPARATOR")) {
- YarnSparkHadoopUtil.getClassPathSeparator() should be ("<CPS>")
- } else if (Utils.isWindows) {
- YarnSparkHadoopUtil.getClassPathSeparator() should be (";")
- } else {
- YarnSparkHadoopUtil.getClassPathSeparator() should be (":")
- }
- }
-
test("check different hadoop utils based on env variable") {
try {
System.setProperty("SPARK_YARN_MODE", "true")