diff options
author | Sean Owen <sowen@cloudera.com> | 2017-02-08 12:20:07 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2017-02-08 12:20:07 +0000 |
commit | e8d3fca4502d5f5b8f38525b5fdabe80ccf9a8ec (patch) | |
tree | bb3851bea6be9e71f2533e27ee4ca427e36ff3fd /resource-managers | |
parent | d60dde26f98164ae146da1b5f409f4eb7c3621aa (diff) | |
download | spark-e8d3fca4502d5f5b8f38525b5fdabe80ccf9a8ec.tar.gz spark-e8d3fca4502d5f5b8f38525b5fdabe80ccf9a8ec.tar.bz2 spark-e8d3fca4502d5f5b8f38525b5fdabe80ccf9a8ec.zip |
[SPARK-19464][CORE][YARN][TEST-HADOOP2.6] Remove support for Hadoop 2.5 and earlier
## What changes were proposed in this pull request?
- Remove support for Hadoop 2.5 and earlier
- Remove reflection and code constructs only needed to support multiple versions at once
- Update docs to reflect newer versions
- Remove older versions' builds and profiles.
## How was this patch tested?
Existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes #16810 from srowen/SPARK-19464.
Diffstat (limited to 'resource-managers')
9 files changed, 81 insertions, 329 deletions
diff --git a/resource-managers/yarn/pom.xml b/resource-managers/yarn/pom.xml index f090d2427d..92a33de0ae 100644 --- a/resource-managers/yarn/pom.xml +++ b/resource-managers/yarn/pom.xml @@ -125,34 +125,12 @@ <scope>test</scope> </dependency> - <!-- - See SPARK-3710. hadoop-yarn-server-tests in Hadoop 2.2 fails to pull some needed - dependencies, so they need to be added manually for the tests to work. - --> - <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-server-tests</artifactId> <classifier>tests</classifier> <scope>test</scope> </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty</artifactId> - <version>6.1.26</version> - <exclusions> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>servlet-api</artifactId> - </exclusion> - </exclusions> - <scope>test</scope> - </dependency> <!-- Jersey 1 dependencies only required for YARN integration testing. Creating a YARN cluster diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index f79c66b9ff..9df43aea3f 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import org.apache.spark._ @@ -460,17 +461,15 @@ private[spark] class ApplicationMaster( } failureCount = 0 } catch { - case i: InterruptedException => + case i: InterruptedException => // do nothing + case e: ApplicationAttemptNotFoundException => + failureCount += 1 + logError("Exception from Reporter thread.", e) + finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE, + e.getMessage) case e: Throwable => failureCount += 1 - // this exception was introduced in hadoop 2.4 and this code would not compile - // with earlier versions if we refer it directly. - if ("org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException" == - e.getClass().getName()) { - logError("Exception from Reporter thread.", e) - finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE, - e.getMessage) - } else if (!NonFatal(e) || failureCount >= reporterMaxFailures) { + if (!NonFatal(e) || failureCount >= reporterMaxFailures) { finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " + s"$failureCount time(s) from Reporter thread.") diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index b212b0eaaf..635c1ac5e3 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -26,7 +26,6 @@ import java.util.zip.{ZipEntry, ZipOutputStream} import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map} -import scala.util.{Failure, Success, Try} import scala.util.control.NonFatal import com.google.common.base.Objects @@ -47,7 +46,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException import org.apache.hadoop.yarn.util.Records -import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException} +import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.yarn.config._ import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager @@ -216,18 +215,7 @@ private[spark] class Client( appContext.setApplicationType("SPARK") sparkConf.get(APPLICATION_TAGS).foreach { tags => - try { - // The setApplicationTags method was only introduced in Hadoop 2.4+, so we need to use - // reflection to set it, printing a warning if a tag was specified but the YARN version - // doesn't support it. - val method = appContext.getClass().getMethod( - "setApplicationTags", classOf[java.util.Set[String]]) - method.invoke(appContext, new java.util.HashSet[String](tags.asJava)) - } catch { - case e: NoSuchMethodException => - logWarning(s"Ignoring ${APPLICATION_TAGS.key} because this version of " + - "YARN does not support it") - } + appContext.setApplicationTags(new java.util.HashSet[String](tags.asJava)) } sparkConf.get(MAX_APP_ATTEMPTS) match { case Some(v) => appContext.setMaxAppAttempts(v) @@ -236,15 +224,7 @@ private[spark] class Client( } sparkConf.get(AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval => - try { - val method = appContext.getClass().getMethod( - "setAttemptFailuresValidityInterval", classOf[Long]) - method.invoke(appContext, interval: java.lang.Long) - } catch { - case e: NoSuchMethodException => - logWarning(s"Ignoring ${AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key} because " + - "the version of YARN does not support it") - } + appContext.setAttemptFailuresValidityInterval(interval) } val capability = Records.newRecord(classOf[Resource]) @@ -253,53 +233,24 @@ private[spark] class Client( sparkConf.get(AM_NODE_LABEL_EXPRESSION) match { case Some(expr) => - try { - val amRequest = Records.newRecord(classOf[ResourceRequest]) - amRequest.setResourceName(ResourceRequest.ANY) - amRequest.setPriority(Priority.newInstance(0)) - amRequest.setCapability(capability) - amRequest.setNumContainers(1) - val method = amRequest.getClass.getMethod("setNodeLabelExpression", classOf[String]) - method.invoke(amRequest, expr) - - val setResourceRequestMethod = - appContext.getClass.getMethod("setAMContainerResourceRequest", classOf[ResourceRequest]) - setResourceRequestMethod.invoke(appContext, amRequest) - } catch { - case e: NoSuchMethodException => - logWarning(s"Ignoring ${AM_NODE_LABEL_EXPRESSION.key} because the version " + - "of YARN does not support it") - appContext.setResource(capability) - } + val amRequest = Records.newRecord(classOf[ResourceRequest]) + amRequest.setResourceName(ResourceRequest.ANY) + amRequest.setPriority(Priority.newInstance(0)) + amRequest.setCapability(capability) + amRequest.setNumContainers(1) + amRequest.setNodeLabelExpression(expr) + appContext.setAMContainerResourceRequest(amRequest) case None => appContext.setResource(capability) } sparkConf.get(ROLLED_LOG_INCLUDE_PATTERN).foreach { includePattern => - try { - val logAggregationContext = Records.newRecord( - Utils.classForName("org.apache.hadoop.yarn.api.records.LogAggregationContext")) - .asInstanceOf[Object] - - val setRolledLogsIncludePatternMethod = - logAggregationContext.getClass.getMethod("setRolledLogsIncludePattern", classOf[String]) - setRolledLogsIncludePatternMethod.invoke(logAggregationContext, includePattern) - - sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern => - val setRolledLogsExcludePatternMethod = - logAggregationContext.getClass.getMethod("setRolledLogsExcludePattern", classOf[String]) - setRolledLogsExcludePatternMethod.invoke(logAggregationContext, excludePattern) - } - - val setLogAggregationContextMethod = - appContext.getClass.getMethod("setLogAggregationContext", - Utils.classForName("org.apache.hadoop.yarn.api.records.LogAggregationContext")) - setLogAggregationContextMethod.invoke(appContext, logAggregationContext) - } catch { - case NonFatal(e) => - logWarning(s"Ignoring ${ROLLED_LOG_INCLUDE_PATTERN.key} because the version of YARN " + - s"does not support it", e) + val logAggregationContext = Records.newRecord(classOf[LogAggregationContext]) + logAggregationContext.setRolledLogsIncludePattern(includePattern) + sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern => + logAggregationContext.setRolledLogsExcludePattern(excludePattern) } + appContext.setLogAggregationContext(logAggregationContext) } appContext @@ -786,14 +737,12 @@ private[spark] class Client( val pythonPath = new ListBuffer[String]() val (pyFiles, pyArchives) = sparkConf.get(PY_FILES).partition(_.endsWith(".py")) if (pyFiles.nonEmpty) { - pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), - LOCALIZED_PYTHON_DIR) + pythonPath += buildPath(Environment.PWD.$$(), LOCALIZED_PYTHON_DIR) } (pySparkArchives ++ pyArchives).foreach { path => val uri = Utils.resolveURI(path) if (uri.getScheme != LOCAL_SCHEME) { - pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), - new Path(uri).getName()) + pythonPath += buildPath(Environment.PWD.$$(), new Path(uri).getName()) } else { pythonPath += uri.getPath() } @@ -802,7 +751,7 @@ private[spark] class Client( // Finally, update the Spark config to propagate PYTHONPATH to the AM and executors. if (pythonPath.nonEmpty) { val pythonPathStr = (sys.env.get("PYTHONPATH") ++ pythonPath) - .mkString(YarnSparkHadoopUtil.getClassPathSeparator) + .mkString(ApplicationConstants.CLASS_PATH_SEPARATOR) env("PYTHONPATH") = pythonPathStr sparkConf.setExecutorEnv("PYTHONPATH", pythonPathStr) } @@ -882,10 +831,7 @@ private[spark] class Client( // Add Xmx for AM memory javaOpts += "-Xmx" + amMemory + "m" - val tmpDir = new Path( - YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), - YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR - ) + val tmpDir = new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) javaOpts += "-Djava.io.tmpdir=" + tmpDir // TODO: Remove once cpuset version is pushed out. @@ -982,15 +928,12 @@ private[spark] class Client( Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg)) } val amArgs = - Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ - userArgs ++ Seq( - "--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), - LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) + Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++ + Seq("--properties-file", buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) // Command for the ApplicationMaster - val commands = prefixEnv ++ Seq( - YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server" - ) ++ + val commands = prefixEnv ++ + Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++ javaOpts ++ amArgs ++ Seq( "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", @@ -1265,59 +1208,28 @@ private object Client extends Logging { private[yarn] def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) : Unit = { val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf) - for (c <- classPathElementsToAdd.flatten) { + classPathElementsToAdd.foreach { c => YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, c.trim) } } - private def getYarnAppClasspath(conf: Configuration): Option[Seq[String]] = + private def getYarnAppClasspath(conf: Configuration): Seq[String] = Option(conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) match { - case Some(s) => Some(s.toSeq) + case Some(s) => s.toSeq case None => getDefaultYarnApplicationClasspath } - private def getMRAppClasspath(conf: Configuration): Option[Seq[String]] = + private def getMRAppClasspath(conf: Configuration): Seq[String] = Option(conf.getStrings("mapreduce.application.classpath")) match { - case Some(s) => Some(s.toSeq) + case Some(s) => s.toSeq case None => getDefaultMRApplicationClasspath } - private[yarn] def getDefaultYarnApplicationClasspath: Option[Seq[String]] = { - val triedDefault = Try[Seq[String]] { - val field = classOf[YarnConfiguration].getField("DEFAULT_YARN_APPLICATION_CLASSPATH") - val value = field.get(null).asInstanceOf[Array[String]] - value.toSeq - } recoverWith { - case e: NoSuchFieldException => Success(Seq.empty[String]) - } - - triedDefault match { - case f: Failure[_] => - logError("Unable to obtain the default YARN Application classpath.", f.exception) - case s: Success[Seq[String]] => - logDebug(s"Using the default YARN application classpath: ${s.get.mkString(",")}") - } - - triedDefault.toOption - } - - private[yarn] def getDefaultMRApplicationClasspath: Option[Seq[String]] = { - val triedDefault = Try[Seq[String]] { - val field = classOf[MRJobConfig].getField("DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH") - StringUtils.getStrings(field.get(null).asInstanceOf[String]).toSeq - } recoverWith { - case e: NoSuchFieldException => Success(Seq.empty[String]) - } + private[yarn] def getDefaultYarnApplicationClasspath: Seq[String] = + YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH.toSeq - triedDefault match { - case f: Failure[_] => - logError("Unable to obtain the default MR Application classpath.", f.exception) - case s: Success[Seq[String]] => - logDebug(s"Using the default MR application classpath: ${s.get.mkString(",")}") - } - - triedDefault.toOption - } + private[yarn] def getDefaultMRApplicationClasspath: Seq[String] = + StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH).toSeq /** * Populate the classpath entry in the given environment map. @@ -1339,11 +1251,9 @@ private object Client extends Logging { addClasspathEntry(getClusterPath(sparkConf, cp), env) } - addClasspathEntry(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env) + addClasspathEntry(Environment.PWD.$$(), env) - addClasspathEntry( - YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + - LOCALIZED_CONF_DIR, env) + addClasspathEntry(Environment.PWD.$$() + Path.SEPARATOR + LOCALIZED_CONF_DIR, env) if (sparkConf.get(USER_CLASS_PATH_FIRST)) { // in order to properly add the app jar when user classpath is first @@ -1369,9 +1279,8 @@ private object Client extends Logging { } // Add the Spark jars to the classpath, depending on how they were distributed. - addClasspathEntry(buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), - LOCALIZED_LIB_DIR, "*"), env) - if (!sparkConf.get(SPARK_ARCHIVE).isDefined) { + addClasspathEntry(buildPath(Environment.PWD.$$(), LOCALIZED_LIB_DIR, "*"), env) + if (sparkConf.get(SPARK_ARCHIVE).isEmpty) { sparkConf.get(SPARK_JARS).foreach { jars => jars.filter(isLocalUri).foreach { jar => addClasspathEntry(getClusterPath(sparkConf, jar), env) @@ -1430,13 +1339,11 @@ private object Client extends Logging { if (uri != null && uri.getScheme == LOCAL_SCHEME) { addClasspathEntry(getClusterPath(conf, uri.getPath), env) } else if (fileName != null) { - addClasspathEntry(buildPath( - YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env) + addClasspathEntry(buildPath(Environment.PWD.$$(), fileName), env) } else if (uri != null) { val localPath = getQualifiedLocalPath(uri, hadoopConf) val linkName = Option(uri.getFragment()).getOrElse(localPath.getName()) - addClasspathEntry(buildPath( - YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), linkName), env) + addClasspathEntry(buildPath(Environment.PWD.$$(), linkName), env) } } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 868c2edc5a..b55b4b147b 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -152,10 +152,7 @@ private[yarn] class ExecutorRunnable( } javaOpts += "-Djava.io.tmpdir=" + - new Path( - YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), - YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR - ) + new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) // Certain configs need to be passed here because they are needed before the Executor // registers with the Scheduler and transfers the spark configs. Since the Executor backend @@ -206,9 +203,8 @@ private[yarn] class ExecutorRunnable( }.toSeq YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts) - val commands = prefixEnv ++ Seq( - YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", - "-server") ++ + val commands = prefixEnv ++ + Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++ javaOpts ++ Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend", "--driver-url", masterAddress, diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index e498932e51..8a76dbd1bf 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -150,20 +150,6 @@ private[yarn] class YarnAllocator( private val labelExpression = sparkConf.get(EXECUTOR_NODE_LABEL_EXPRESSION) - // ContainerRequest constructor that can take a node label expression. We grab it through - // reflection because it's only available in later versions of YARN. - private val nodeLabelConstructor = labelExpression.flatMap { expr => - try { - Some(classOf[ContainerRequest].getConstructor(classOf[Resource], - classOf[Array[String]], classOf[Array[String]], classOf[Priority], classOf[Boolean], - classOf[String])) - } catch { - case e: NoSuchMethodException => - logWarning(s"Node label expression $expr will be ignored because YARN version on" + - " classpath does not support it.") - None - } - } // A map to store preferred hostname and possible task numbers running on it. private var hostToLocalTaskCounts: Map[String, Int] = Map.empty @@ -414,10 +400,7 @@ private[yarn] class YarnAllocator( resource: Resource, nodes: Array[String], racks: Array[String]): ContainerRequest = { - nodeLabelConstructor.map { constructor => - constructor.newInstance(resource, nodes, racks, RM_REQUEST_PRIORITY, true: java.lang.Boolean, - labelExpression.orNull) - }.getOrElse(new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY)) + new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY, true, labelExpression.orNull) } /** diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala index 53df11eb66..163dfb5a60 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala @@ -17,12 +17,8 @@ package org.apache.spark.deploy.yarn -import java.util.{List => JList} - import scala.collection.JavaConverters._ -import scala.util.Try -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest @@ -99,24 +95,11 @@ private[spark] class YarnRMClient extends Logging { def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String] = { // Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2, // so not all stable releases have it. - val prefix = Try(classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration]) - .invoke(null, conf).asInstanceOf[String]).getOrElse("http://") - - // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses. - try { - val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter", - classOf[Configuration]) - val proxies = method.invoke(null, conf).asInstanceOf[JList[String]] - val hosts = proxies.asScala.map { proxy => proxy.split(":")(0) } - val uriBases = proxies.asScala.map { proxy => prefix + proxy + proxyBase } - Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(",")) - } catch { - case e: NoSuchMethodException => - val proxy = WebAppUtils.getProxyHostAndPort(conf) - val parts = proxy.split(":") - val uriBase = prefix + proxy + proxyBase - Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase) - } + val prefix = WebAppUtils.getHttpSchemePrefix(conf) + val proxies = WebAppUtils.getProxyHostsAndPortsForAmFilter(conf) + val hosts = proxies.asScala.map(_.split(":").head) + val uriBases = proxies.asScala.map { proxy => prefix + proxy + proxyBase } + Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(",")) } /** Returns the maximum number of attempts to register the AM. */ @@ -124,12 +107,10 @@ private[spark] class YarnRMClient extends Logging { val sparkMaxAttempts = sparkConf.get(MAX_APP_ATTEMPTS).map(_.toInt) val yarnMaxAttempts = yarnConf.getInt( YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS) - val retval: Int = sparkMaxAttempts match { + sparkMaxAttempts match { case Some(x) => if (x <= yarnMaxAttempts) x else yarnMaxAttempts case None => yarnMaxAttempts } - - retval } } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index cc53b1b06e..9357885512 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -17,13 +17,11 @@ package org.apache.spark.deploy.yarn -import java.io.File import java.nio.charset.StandardCharsets.UTF_8 import java.util.regex.Matcher import java.util.regex.Pattern import scala.collection.mutable.{HashMap, ListBuffer} -import scala.util.Try import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.Text @@ -31,7 +29,6 @@ import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api.ApplicationConstants -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority} import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.ConverterUtils @@ -137,7 +134,12 @@ object YarnSparkHadoopUtil { * If the map already contains this key, append the value to the existing value instead. */ def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = { - val newValue = if (env.contains(key)) { env(key) + getClassPathSeparator + value } else value + val newValue = + if (env.contains(key)) { + env(key) + ApplicationConstants.CLASS_PATH_SEPARATOR + value + } else { + value + } env.put(key, newValue) } @@ -156,8 +158,8 @@ object YarnSparkHadoopUtil { while (m.find()) { val variable = m.group(1) var replace = "" - if (env.get(variable) != None) { - replace = env.get(variable).get + if (env.contains(variable)) { + replace = env(variable) } else { // if this key is not configured for the child .. get it from the env replace = System.getenv(variable) @@ -235,13 +237,11 @@ object YarnSparkHadoopUtil { YarnCommandBuilderUtils.quoteForBatchScript(arg) } else { val escaped = new StringBuilder("'") - for (i <- 0 to arg.length() - 1) { - arg.charAt(i) match { - case '$' => escaped.append("\\$") - case '"' => escaped.append("\\\"") - case '\'' => escaped.append("'\\''") - case c => escaped.append(c) - } + arg.foreach { + case '$' => escaped.append("\\$") + case '"' => escaped.append("\\\"") + case '\'' => escaped.append("'\\''") + case c => escaped.append(c) } escaped.append("'").toString() } @@ -263,33 +263,6 @@ object YarnSparkHadoopUtil { } /** - * Expand environment variable using Yarn API. - * If environment.$$() is implemented, return the result of it. - * Otherwise, return the result of environment.$() - * Note: $$() is added in Hadoop 2.4. - */ - private lazy val expandMethod = - Try(classOf[Environment].getMethod("$$")) - .getOrElse(classOf[Environment].getMethod("$")) - - def expandEnvironment(environment: Environment): String = - expandMethod.invoke(environment).asInstanceOf[String] - - /** - * Get class path separator using Yarn API. - * If ApplicationConstants.CLASS_PATH_SEPARATOR is implemented, return it. - * Otherwise, return File.pathSeparator - * Note: CLASS_PATH_SEPARATOR is added in Hadoop 2.4. - */ - private lazy val classPathSeparatorField = - Try(classOf[ApplicationConstants].getField("CLASS_PATH_SEPARATOR")) - .getOrElse(classOf[File].getField("pathSeparator")) - - def getClassPathSeparator(): String = { - classPathSeparatorField.get(null).asInstanceOf[String] - } - - /** * Getting the initial target number of executors depends on whether dynamic allocation is * enabled. * If not using dynamic allocation it gets the number of executors requested by the user. diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index 7deaf0af94..dd2180a0f5 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -23,8 +23,6 @@ import java.util.Properties import scala.collection.JavaConverters._ import scala.collection.mutable.{HashMap => MutableHashMap} -import scala.reflect.ClassTag -import scala.util.Try import org.apache.commons.lang3.SerializationUtils import org.apache.hadoop.conf.Configuration @@ -67,19 +65,18 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll } test("default Yarn application classpath") { - getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP)) + getDefaultYarnApplicationClasspath should be(Fixtures.knownDefYarnAppCP) } test("default MR application classpath") { - getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP)) + getDefaultMRApplicationClasspath should be(Fixtures.knownDefMRAppCP) } test("resultant classpath for an application that defines a classpath for YARN") { withAppConf(Fixtures.mapYARNAppConf) { conf => val env = newEnv populateHadoopClasspath(conf, env) - classpath(env) should be( - flatten(Fixtures.knownYARNAppCP, getDefaultMRApplicationClasspath)) + classpath(env) should be(Fixtures.knownYARNAppCP +: getDefaultMRApplicationClasspath) } } @@ -87,8 +84,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll withAppConf(Fixtures.mapMRAppConf) { conf => val env = newEnv populateHadoopClasspath(conf, env) - classpath(env) should be( - flatten(getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP)) + classpath(env) should be(getDefaultYarnApplicationClasspath :+ Fixtures.knownMRAppCP) } } @@ -96,7 +92,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll withAppConf(Fixtures.mapAppConf) { conf => val env = newEnv populateHadoopClasspath(conf, env) - classpath(env) should be(flatten(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP)) + classpath(env) should be(Array(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP)) } } @@ -104,14 +100,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll private val USER = "local:/userJar" private val ADDED = "local:/addJar1,local:/addJar2,/addJar3" - private val PWD = - if (classOf[Environment].getMethods().exists(_.getName == "$$")) { - "{{PWD}}" - } else if (Utils.isWindows) { - "%PWD%" - } else { - Environment.PWD.$() - } + private val PWD = "{{PWD}}" test("Local jar URIs") { val conf = new Configuration() @@ -388,26 +377,18 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll object Fixtures { val knownDefYarnAppCP: Seq[String] = - getFieldValue[Array[String], Seq[String]](classOf[YarnConfiguration], - "DEFAULT_YARN_APPLICATION_CLASSPATH", - Seq[String]())(a => a.toSeq) - + YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH.toSeq val knownDefMRAppCP: Seq[String] = - getFieldValue2[String, Array[String], Seq[String]]( - classOf[MRJobConfig], - "DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH", - Seq[String]())(a => a.split(","))(a => a.toSeq) + MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH.split(",").toSeq - val knownYARNAppCP = Some(Seq("/known/yarn/path")) + val knownYARNAppCP = "/known/yarn/path" - val knownMRAppCP = Some(Seq("/known/mr/path")) + val knownMRAppCP = "/known/mr/path" - val mapMRAppConf = - Map("mapreduce.application.classpath" -> knownMRAppCP.map(_.mkString(":")).get) + val mapMRAppConf = Map("mapreduce.application.classpath" -> knownMRAppCP) - val mapYARNAppConf = - Map(YarnConfiguration.YARN_APPLICATION_CLASSPATH -> knownYARNAppCP.map(_.mkString(":")).get) + val mapYARNAppConf = Map(YarnConfiguration.YARN_APPLICATION_CLASSPATH -> knownYARNAppCP) val mapAppConf = mapYARNAppConf ++ mapMRAppConf } @@ -423,28 +404,6 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll def classpath(env: MutableHashMap[String, String]): Array[String] = env(Environment.CLASSPATH.name).split(":|;|<CPS>") - def flatten(a: Option[Seq[String]], b: Option[Seq[String]]): Array[String] = - (a ++ b).flatten.toArray - - def getFieldValue[A, B](clazz: Class[_], field: String, defaults: => B)(mapTo: A => B): B = { - Try(clazz.getField(field)) - .map(_.get(null).asInstanceOf[A]) - .toOption - .map(mapTo) - .getOrElse(defaults) - } - - def getFieldValue2[A: ClassTag, A1: ClassTag, B]( - clazz: Class[_], - field: String, - defaults: => B)(mapTo: A => B)(mapTo1: A1 => B): B = { - Try(clazz.getField(field)).map(_.get(null)).map { - case v: A => mapTo(v) - case v1: A1 => mapTo1(v1) - case _ => defaults - }.toOption.getOrElse(defaults) - } - private def createClient( sparkConf: SparkConf, conf: Configuration = new Configuration(), diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala index 7fbbe12609..a057618b39 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala @@ -22,8 +22,6 @@ import java.nio.charset.StandardCharsets import com.google.common.io.{ByteStreams, Files} import org.apache.hadoop.io.Text -import org.apache.hadoop.yarn.api.ApplicationConstants -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.api.records.ApplicationAccessType import org.apache.hadoop.yarn.conf.YarnConfiguration import org.scalatest.Matchers @@ -147,28 +145,6 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging } - test("test expandEnvironment result") { - val target = Environment.PWD - if (classOf[Environment].getMethods().exists(_.getName == "$$")) { - YarnSparkHadoopUtil.expandEnvironment(target) should be ("{{" + target + "}}") - } else if (Utils.isWindows) { - YarnSparkHadoopUtil.expandEnvironment(target) should be ("%" + target + "%") - } else { - YarnSparkHadoopUtil.expandEnvironment(target) should be ("$" + target) - } - - } - - test("test getClassPathSeparator result") { - if (classOf[ApplicationConstants].getFields().exists(_.getName == "CLASS_PATH_SEPARATOR")) { - YarnSparkHadoopUtil.getClassPathSeparator() should be ("<CPS>") - } else if (Utils.isWindows) { - YarnSparkHadoopUtil.getClassPathSeparator() should be (";") - } else { - YarnSparkHadoopUtil.getClassPathSeparator() should be (":") - } - } - test("check different hadoop utils based on env variable") { try { System.setProperty("SPARK_YARN_MODE", "true") |