Diffstat (limited to 'resource-managers/yarn/src/main')
6 files changed, 69 insertions, 230 deletions
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index f79c66b9ff..9df43aea3f 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
 import org.apache.spark._
@@ -460,17 +461,15 @@ private[spark] class ApplicationMaster(
           }
           failureCount = 0
         } catch {
-          case i: InterruptedException =>
+          case i: InterruptedException => // do nothing
+          case e: ApplicationAttemptNotFoundException =>
+            failureCount += 1
+            logError("Exception from Reporter thread.", e)
+            finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE,
+              e.getMessage)
           case e: Throwable =>
             failureCount += 1
-            // this exception was introduced in hadoop 2.4 and this code would not compile
-            // with earlier versions if we refer it directly.
-            if ("org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException" ==
-              e.getClass().getName()) {
-              logError("Exception from Reporter thread.", e)
-              finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE,
-                e.getMessage)
-            } else if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
+            if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
               finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE,
                 "Exception was thrown " +
                   s"$failureCount time(s) from Reporter thread.")
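
Since Hadoop 2.6 is now the minimum supported version, ApplicationAttemptNotFoundException can be matched directly instead of by class name. A minimal sketch of the resulting pattern — allocate(), finishFailed() and the counters are hypothetical stand-ins for the Reporter thread's real collaborators:

    import scala.util.control.NonFatal
    import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException

    def allocate(): Unit = ???                  // stand-in for the AMRMClient heartbeat
    def finishFailed(msg: String): Unit = ???   // stand-in for finish(FAILED, ...)
    val reporterMaxFailures = 5
    var failureCount = 0

    try {
      allocate()
      failureCount = 0
    } catch {
      case _: InterruptedException =>           // shutdown in progress; do nothing
      case e: ApplicationAttemptNotFoundException =>
        // The RM has forgotten this attempt, so retrying the heartbeat is pointless.
        finishFailed(e.getMessage)
      case e: Throwable =>
        failureCount += 1
        if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
          finishFailed(s"Exception was thrown $failureCount time(s) from Reporter thread.")
        }
    }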
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index b212b0eaaf..635c1ac5e3 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -26,7 +26,6 @@ import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
-import scala.util.{Failure, Success, Try}
 import scala.util.control.NonFatal
 
 import com.google.common.base.Objects
@@ -47,7 +46,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
 import org.apache.hadoop.yarn.util.Records
 
-import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException}
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager
@@ -216,18 +215,7 @@ private[spark] class Client(
     appContext.setApplicationType("SPARK")
 
     sparkConf.get(APPLICATION_TAGS).foreach { tags =>
-      try {
-        // The setApplicationTags method was only introduced in Hadoop 2.4+, so we need to use
-        // reflection to set it, printing a warning if a tag was specified but the YARN version
-        // doesn't support it.
-        val method = appContext.getClass().getMethod(
-          "setApplicationTags", classOf[java.util.Set[String]])
-        method.invoke(appContext, new java.util.HashSet[String](tags.asJava))
-      } catch {
-        case e: NoSuchMethodException =>
-          logWarning(s"Ignoring ${APPLICATION_TAGS.key} because this version of " +
-            "YARN does not support it")
-      }
+      appContext.setApplicationTags(new java.util.HashSet[String](tags.asJava))
     }
     sparkConf.get(MAX_APP_ATTEMPTS) match {
       case Some(v) => appContext.setMaxAppAttempts(v)
@@ -236,15 +224,7 @@ private[spark] class Client(
     }
 
     sparkConf.get(AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
-      try {
-        val method = appContext.getClass().getMethod(
-          "setAttemptFailuresValidityInterval", classOf[Long])
-        method.invoke(appContext, interval: java.lang.Long)
-      } catch {
-        case e: NoSuchMethodException =>
-          logWarning(s"Ignoring ${AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key} because " +
-            "the version of YARN does not support it")
-      }
+      appContext.setAttemptFailuresValidityInterval(interval)
     }
 
     val capability = Records.newRecord(classOf[Resource])
@@ -253,53 +233,24 @@ private[spark] class Client(
 
     sparkConf.get(AM_NODE_LABEL_EXPRESSION) match {
       case Some(expr) =>
-        try {
-          val amRequest = Records.newRecord(classOf[ResourceRequest])
-          amRequest.setResourceName(ResourceRequest.ANY)
-          amRequest.setPriority(Priority.newInstance(0))
-          amRequest.setCapability(capability)
-          amRequest.setNumContainers(1)
-          val method = amRequest.getClass.getMethod("setNodeLabelExpression", classOf[String])
-          method.invoke(amRequest, expr)
-
-          val setResourceRequestMethod =
-            appContext.getClass.getMethod("setAMContainerResourceRequest", classOf[ResourceRequest])
-          setResourceRequestMethod.invoke(appContext, amRequest)
-        } catch {
-          case e: NoSuchMethodException =>
-            logWarning(s"Ignoring ${AM_NODE_LABEL_EXPRESSION.key} because the version " +
-              "of YARN does not support it")
-            appContext.setResource(capability)
-        }
+        val amRequest = Records.newRecord(classOf[ResourceRequest])
+        amRequest.setResourceName(ResourceRequest.ANY)
+        amRequest.setPriority(Priority.newInstance(0))
+        amRequest.setCapability(capability)
+        amRequest.setNumContainers(1)
+        amRequest.setNodeLabelExpression(expr)
+        appContext.setAMContainerResourceRequest(amRequest)
       case None =>
         appContext.setResource(capability)
     }
 
     sparkConf.get(ROLLED_LOG_INCLUDE_PATTERN).foreach { includePattern =>
-      try {
-        val logAggregationContext = Records.newRecord(
-          Utils.classForName("org.apache.hadoop.yarn.api.records.LogAggregationContext"))
-          .asInstanceOf[Object]
-
-        val setRolledLogsIncludePatternMethod =
-          logAggregationContext.getClass.getMethod("setRolledLogsIncludePattern", classOf[String])
-        setRolledLogsIncludePatternMethod.invoke(logAggregationContext, includePattern)
-
-        sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern =>
-          val setRolledLogsExcludePatternMethod =
-            logAggregationContext.getClass.getMethod("setRolledLogsExcludePattern", classOf[String])
-          setRolledLogsExcludePatternMethod.invoke(logAggregationContext, excludePattern)
-        }
-
-        val setLogAggregationContextMethod =
-          appContext.getClass.getMethod("setLogAggregationContext",
-            Utils.classForName("org.apache.hadoop.yarn.api.records.LogAggregationContext"))
-        setLogAggregationContextMethod.invoke(appContext, logAggregationContext)
-      } catch {
-        case NonFatal(e) =>
-          logWarning(s"Ignoring ${ROLLED_LOG_INCLUDE_PATTERN.key} because the version of YARN " +
-            s"does not support it", e)
+      val logAggregationContext = Records.newRecord(classOf[LogAggregationContext])
+      logAggregationContext.setRolledLogsIncludePattern(includePattern)
+      sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern =>
+        logAggregationContext.setRolledLogsExcludePattern(excludePattern)
       }
+      appContext.setLogAggregationContext(logAggregationContext)
     }
 
     appContext
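
All of the submission-context setters now called directly (setApplicationTags, setAttemptFailuresValidityInterval, setLogAggregationContext) are plain public API on Hadoop 2.6+. A condensed sketch of the same calls, with hypothetical tag and pattern values:

    import scala.collection.JavaConverters._
    import org.apache.hadoop.yarn.api.records.{ApplicationSubmissionContext, LogAggregationContext}
    import org.apache.hadoop.yarn.util.Records

    val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
    appContext.setApplicationType("SPARK")
    appContext.setApplicationTags(new java.util.HashSet[String](Seq("nightly", "etl").asJava))
    appContext.setAttemptFailuresValidityInterval(60 * 60 * 1000L)  // count AM failures within a 1h window

    val logCtx = Records.newRecord(classOf[LogAggregationContext])
    logCtx.setRolledLogsIncludePattern("spark*.log")  // hypothetical patterns
    logCtx.setRolledLogsExcludePattern("*.tmp")
    appContext.setLogAggregationContext(logCtx)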
@@ -786,14 +737,12 @@ private[spark] class Client(
     val pythonPath = new ListBuffer[String]()
     val (pyFiles, pyArchives) = sparkConf.get(PY_FILES).partition(_.endsWith(".py"))
     if (pyFiles.nonEmpty) {
-      pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
-        LOCALIZED_PYTHON_DIR)
+      pythonPath += buildPath(Environment.PWD.$$(), LOCALIZED_PYTHON_DIR)
     }
     (pySparkArchives ++ pyArchives).foreach { path =>
       val uri = Utils.resolveURI(path)
       if (uri.getScheme != LOCAL_SCHEME) {
-        pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
-          new Path(uri).getName())
+        pythonPath += buildPath(Environment.PWD.$$(), new Path(uri).getName())
       } else {
         pythonPath += uri.getPath()
       }
@@ -802,7 +751,7 @@ private[spark] class Client(
     // Finally, update the Spark config to propagate PYTHONPATH to the AM and executors.
     if (pythonPath.nonEmpty) {
       val pythonPathStr = (sys.env.get("PYTHONPATH") ++ pythonPath)
-        .mkString(YarnSparkHadoopUtil.getClassPathSeparator)
+        .mkString(ApplicationConstants.CLASS_PATH_SEPARATOR)
       env("PYTHONPATH") = pythonPathStr
       sparkConf.setExecutorEnv("PYTHONPATH", pythonPathStr)
     }
@@ -882,10 +831,7 @@ private[spark] class Client(
     // Add Xmx for AM memory
     javaOpts += "-Xmx" + amMemory + "m"
 
-    val tmpDir = new Path(
-      YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
-      YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR
-    )
+    val tmpDir = new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
     javaOpts += "-Djava.io.tmpdir=" + tmpDir
 
     // TODO: Remove once cpuset version is pushed out.
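
Environment.PWD.$$() emits the cross-platform expansion marker ({{PWD}}) that the NodeManager substitutes at container launch; this is the branch the removed YarnSparkHadoopUtil.expandEnvironment helper used to pick via reflection. A small illustration:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
    import org.apache.hadoop.yarn.conf.YarnConfiguration

    // Expanded by the NodeManager, so the same command line works on Linux and Windows.
    val tmpDir = new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
    val javaOpt = "-Djava.io.tmpdir=" + tmpDir  // roughly -Djava.io.tmpdir={{PWD}}/tmp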
@@ -982,15 +928,12 @@ private[spark] class Client(
       Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
     }
     val amArgs =
-      Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++
-        userArgs ++ Seq(
-          "--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
-            LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
+      Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
+      Seq("--properties-file", buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
 
     // Command for the ApplicationMaster
-    val commands = prefixEnv ++ Seq(
-        YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server"
-      ) ++
+    val commands = prefixEnv ++
+      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
       javaOpts ++ amArgs ++
       Seq(
         "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
@@ -1265,59 +1208,28 @@ private object Client extends Logging {
   private[yarn] def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String])
     : Unit = {
     val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf)
-    for (c <- classPathElementsToAdd.flatten) {
+    classPathElementsToAdd.foreach { c =>
       YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, c.trim)
     }
   }
 
-  private def getYarnAppClasspath(conf: Configuration): Option[Seq[String]] =
+  private def getYarnAppClasspath(conf: Configuration): Seq[String] =
     Option(conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) match {
-      case Some(s) => Some(s.toSeq)
+      case Some(s) => s.toSeq
       case None => getDefaultYarnApplicationClasspath
     }
 
-  private def getMRAppClasspath(conf: Configuration): Option[Seq[String]] =
+  private def getMRAppClasspath(conf: Configuration): Seq[String] =
     Option(conf.getStrings("mapreduce.application.classpath")) match {
-      case Some(s) => Some(s.toSeq)
+      case Some(s) => s.toSeq
       case None => getDefaultMRApplicationClasspath
     }
 
-  private[yarn] def getDefaultYarnApplicationClasspath: Option[Seq[String]] = {
-    val triedDefault = Try[Seq[String]] {
-      val field = classOf[YarnConfiguration].getField("DEFAULT_YARN_APPLICATION_CLASSPATH")
-      val value = field.get(null).asInstanceOf[Array[String]]
-      value.toSeq
-    } recoverWith {
-      case e: NoSuchFieldException => Success(Seq.empty[String])
-    }
-
-    triedDefault match {
-      case f: Failure[_] =>
-        logError("Unable to obtain the default YARN Application classpath.", f.exception)
-      case s: Success[Seq[String]] =>
-        logDebug(s"Using the default YARN application classpath: ${s.get.mkString(",")}")
-    }
-
-    triedDefault.toOption
-  }
-
-  private[yarn] def getDefaultMRApplicationClasspath: Option[Seq[String]] = {
-    val triedDefault = Try[Seq[String]] {
-      val field = classOf[MRJobConfig].getField("DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH")
-      StringUtils.getStrings(field.get(null).asInstanceOf[String]).toSeq
-    } recoverWith {
-      case e: NoSuchFieldException => Success(Seq.empty[String])
-    }
+  private[yarn] def getDefaultYarnApplicationClasspath: Seq[String] =
+    YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH.toSeq
 
-    triedDefault match {
-      case f: Failure[_] =>
-        logError("Unable to obtain the default MR Application classpath.", f.exception)
-      case s: Success[Seq[String]] =>
-        logDebug(s"Using the default MR application classpath: ${s.get.mkString(",")}")
-    }
-
-    triedDefault.toOption
-  }
+  private[yarn] def getDefaultMRApplicationClasspath: Seq[String] =
    StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH).toSeq
 
   /**
    * Populate the classpath entry in the given environment map.
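
With the reflective field lookups gone, the Hadoop-provided defaults can be read as ordinary constants. A sketch of the resolution order the methods above implement — site configuration first, shipped defaults as the fallback:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.MRJobConfig
    import org.apache.hadoop.util.StringUtils
    import org.apache.hadoop.yarn.conf.YarnConfiguration

    def yarnAppClasspath(conf: Configuration): Seq[String] =
      Option(conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH))
        .map(_.toSeq)
        .getOrElse(YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH.toSeq)

    def mrAppClasspath(conf: Configuration): Seq[String] =
      Option(conf.getStrings("mapreduce.application.classpath"))
        .map(_.toSeq)
        .getOrElse(StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH).toSeq)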
@@ -1339,11 +1251,9 @@ private object Client extends Logging {
       addClasspathEntry(getClusterPath(sparkConf, cp), env)
     }
 
-    addClasspathEntry(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env)
+    addClasspathEntry(Environment.PWD.$$(), env)
 
-    addClasspathEntry(
-      YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR +
-        LOCALIZED_CONF_DIR, env)
+    addClasspathEntry(Environment.PWD.$$() + Path.SEPARATOR + LOCALIZED_CONF_DIR, env)
 
     if (sparkConf.get(USER_CLASS_PATH_FIRST)) {
       // in order to properly add the app jar when user classpath is first
@@ -1369,9 +1279,8 @@ private object Client extends Logging {
     }
 
     // Add the Spark jars to the classpath, depending on how they were distributed.
-    addClasspathEntry(buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
-      LOCALIZED_LIB_DIR, "*"), env)
-    if (!sparkConf.get(SPARK_ARCHIVE).isDefined) {
+    addClasspathEntry(buildPath(Environment.PWD.$$(), LOCALIZED_LIB_DIR, "*"), env)
+    if (sparkConf.get(SPARK_ARCHIVE).isEmpty) {
       sparkConf.get(SPARK_JARS).foreach { jars =>
         jars.filter(isLocalUri).foreach { jar =>
           addClasspathEntry(getClusterPath(sparkConf, jar), env)
@@ -1430,13 +1339,11 @@ private object Client extends Logging {
     if (uri != null && uri.getScheme == LOCAL_SCHEME) {
       addClasspathEntry(getClusterPath(conf, uri.getPath), env)
     } else if (fileName != null) {
-      addClasspathEntry(buildPath(
-        YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env)
+      addClasspathEntry(buildPath(Environment.PWD.$$(), fileName), env)
     } else if (uri != null) {
       val localPath = getQualifiedLocalPath(uri, hadoopConf)
       val linkName = Option(uri.getFragment()).getOrElse(localPath.getName())
-      addClasspathEntry(buildPath(
-        YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), linkName), env)
+      addClasspathEntry(buildPath(Environment.PWD.$$(), linkName), env)
     }
   }
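
The classpath hunks above keep the existing split between local: URIs (already present on every node) and files distributed through the cache (materialized under the container's working directory). A sketch of that decision; buildPath and addClasspathEntry are hypothetical stand-ins for Client's private helpers:

    import java.net.URI
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.yarn.api.ApplicationConstants.Environment

    val LOCAL_SCHEME = "local"
    def addClasspathEntry(entry: String): Unit = println(entry)            // stand-in
    def buildPath(parts: String*): String = parts.mkString(Path.SEPARATOR)

    def addFileToClasspath(uriStr: String): Unit = {
      val uri = new URI(uriStr)
      if (uri.getScheme == LOCAL_SCHEME) {
        addClasspathEntry(uri.getPath)  // node-local; reference it in place
      } else {
        // Distributed file: its link name appears under {{PWD}} in the container.
        val linkName = Option(uri.getFragment).getOrElse(new Path(uri).getName)
        addClasspathEntry(buildPath(Environment.PWD.$$(), linkName))
      }
    }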
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 868c2edc5a..b55b4b147b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -152,10 +152,7 @@ private[yarn] class ExecutorRunnable(
     }
 
     javaOpts += "-Djava.io.tmpdir=" +
-      new Path(
-        YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
-        YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR
-      )
+      new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
 
     // Certain configs need to be passed here because they are needed before the Executor
     // registers with the Scheduler and transfers the spark configs. Since the Executor backend
@@ -206,9 +203,8 @@ private[yarn] class ExecutorRunnable(
     }.toSeq
 
     YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
-    val commands = prefixEnv ++ Seq(
-      YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java",
-      "-server") ++
+    val commands = prefixEnv ++
+      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
       javaOpts ++
       Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
         "--driver-url", masterAddress,
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index e498932e51..8a76dbd1bf 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -150,20 +150,6 @@ private[yarn] class YarnAllocator(
 
   private val labelExpression = sparkConf.get(EXECUTOR_NODE_LABEL_EXPRESSION)
 
-  // ContainerRequest constructor that can take a node label expression. We grab it through
-  // reflection because it's only available in later versions of YARN.
-  private val nodeLabelConstructor = labelExpression.flatMap { expr =>
-    try {
-      Some(classOf[ContainerRequest].getConstructor(classOf[Resource],
-        classOf[Array[String]], classOf[Array[String]], classOf[Priority], classOf[Boolean],
-        classOf[String]))
-    } catch {
-      case e: NoSuchMethodException =>
-        logWarning(s"Node label expression $expr will be ignored because YARN version on" +
-          " classpath does not support it.")
-        None
-    }
-  }
-
   // A map to store preferred hostname and possible task numbers running on it.
   private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
@@ -414,10 +400,7 @@ private[yarn] class YarnAllocator(
       resource: Resource,
       nodes: Array[String],
       racks: Array[String]): ContainerRequest = {
-    nodeLabelConstructor.map { constructor =>
-      constructor.newInstance(resource, nodes, racks, RM_REQUEST_PRIORITY, true: java.lang.Boolean,
-        labelExpression.orNull)
-    }.getOrElse(new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY))
+    new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY, true, labelExpression.orNull)
   }
 
   /**
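
The six-argument ContainerRequest constructor — (resource, nodes, racks, priority, relaxLocality, nodeLabelsExpression) — exists on Hadoop 2.6+, so no reflective constructor lookup is needed. A sketch with hypothetical sizing and label values:

    import org.apache.hadoop.yarn.api.records.{Priority, Resource}
    import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

    val resource = Resource.newInstance(2048 /* MB */, 2 /* vcores */)
    val priority = Priority.newInstance(1)

    // nodes/racks may be null for "anywhere"; relaxLocality = true lets YARN fall back.
    val request = new ContainerRequest(resource, null, null, priority, true, "gpu_nodes")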
-    val prefix = Try(classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration])
-      .invoke(null, conf).asInstanceOf[String]).getOrElse("http://")
-
-    // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses.
-    try {
-      val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter",
-        classOf[Configuration])
-      val proxies = method.invoke(null, conf).asInstanceOf[JList[String]]
-      val hosts = proxies.asScala.map { proxy => proxy.split(":")(0) }
-      val uriBases = proxies.asScala.map { proxy => prefix + proxy + proxyBase }
-      Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(","))
-    } catch {
-      case e: NoSuchMethodException =>
-        val proxy = WebAppUtils.getProxyHostAndPort(conf)
-        val parts = proxy.split(":")
-        val uriBase = prefix + proxy + proxyBase
-        Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
-    }
+    val prefix = WebAppUtils.getHttpSchemePrefix(conf)
+    val proxies = WebAppUtils.getProxyHostsAndPortsForAmFilter(conf)
+    val hosts = proxies.asScala.map(_.split(":").head)
+    val uriBases = proxies.asScala.map { proxy => prefix + proxy + proxyBase }
+    Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(","))
   }
 
   /** Returns the maximum number of attempts to register the AM. */
@@ -124,12 +107,10 @@ private[spark] class YarnRMClient extends Logging {
     val sparkMaxAttempts = sparkConf.get(MAX_APP_ATTEMPTS).map(_.toInt)
     val yarnMaxAttempts = yarnConf.getInt(
       YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
-    val retval: Int = sparkMaxAttempts match {
+    sparkMaxAttempts match {
       case Some(x) => if (x <= yarnMaxAttempts) x else yarnMaxAttempts
       case None => yarnMaxAttempts
     }
-
-    retval
   }
 }
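
getHttpSchemePrefix and getProxyHostsAndPortsForAmFilter are both available on Hadoop 2.6+, and the latter is RM-HA aware: it returns one host:port entry per proxy. A sketch of assembling the AmIpFilter parameters, with a hypothetical proxy base:

    import scala.collection.JavaConverters._
    import org.apache.hadoop.yarn.conf.YarnConfiguration
    import org.apache.hadoop.yarn.webapp.util.WebAppUtils

    val conf = new YarnConfiguration()
    val proxyBase = "/proxy/application_1234_0001"  // hypothetical

    val prefix = WebAppUtils.getHttpSchemePrefix(conf)  // "http://" or "https://"
    val proxies = WebAppUtils.getProxyHostsAndPortsForAmFilter(conf).asScala
    val params = Map(
      "PROXY_HOSTS" -> proxies.map(_.split(":").head).mkString(","),
      "PROXY_URI_BASES" -> proxies.map(p => prefix + p + proxyBase).mkString(","))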
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index cc53b1b06e..9357885512 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -17,13 +17,11 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.File
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.regex.Matcher
 import java.util.regex.Pattern
 
 import scala.collection.mutable.{HashMap, ListBuffer}
-import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.Text
@@ -31,7 +29,6 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.ApplicationConstants
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority}
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.ConverterUtils
@@ -137,7 +134,12 @@ object YarnSparkHadoopUtil {
    * If the map already contains this key, append the value to the existing value instead.
    */
   def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = {
-    val newValue = if (env.contains(key)) { env(key) + getClassPathSeparator + value } else value
+    val newValue =
+      if (env.contains(key)) {
+        env(key) + ApplicationConstants.CLASS_PATH_SEPARATOR + value
+      } else {
+        value
+      }
     env.put(key, newValue)
   }
 
@@ -156,8 +158,8 @@ object YarnSparkHadoopUtil {
     while (m.find()) {
       val variable = m.group(1)
       var replace = ""
-      if (env.get(variable) != None) {
-        replace = env.get(variable).get
+      if (env.contains(variable)) {
+        replace = env(variable)
       } else {
         // if this key is not configured for the child .. get it from the env
         replace = System.getenv(variable)
@@ -235,13 +237,11 @@ object YarnSparkHadoopUtil {
       YarnCommandBuilderUtils.quoteForBatchScript(arg)
     } else {
       val escaped = new StringBuilder("'")
-      for (i <- 0 to arg.length() - 1) {
-        arg.charAt(i) match {
-          case '$' => escaped.append("\\$")
-          case '"' => escaped.append("\\\"")
-          case '\'' => escaped.append("'\\''")
-          case c => escaped.append(c)
-        }
+      arg.foreach {
+        case '$' => escaped.append("\\$")
+        case '"' => escaped.append("\\\"")
+        case '\'' => escaped.append("'\\''")
+        case c => escaped.append(c)
       }
       escaped.append("'").toString()
     }
@@ -263,33 +263,6 @@ object YarnSparkHadoopUtil {
   }
 
   /**
-   * Expand environment variable using Yarn API.
-   * If environment.$$() is implemented, return the result of it.
-   * Otherwise, return the result of environment.$()
-   * Note: $$() is added in Hadoop 2.4.
-   */
-  private lazy val expandMethod =
-    Try(classOf[Environment].getMethod("$$"))
-      .getOrElse(classOf[Environment].getMethod("$"))
-
-  def expandEnvironment(environment: Environment): String =
-    expandMethod.invoke(environment).asInstanceOf[String]
-
-  /**
-   * Get class path separator using Yarn API.
-   * If ApplicationConstants.CLASS_PATH_SEPARATOR is implemented, return it.
-   * Otherwise, return File.pathSeparator
-   * Note: CLASS_PATH_SEPARATOR is added in Hadoop 2.4.
-   */
-  private lazy val classPathSeparatorField =
-    Try(classOf[ApplicationConstants].getField("CLASS_PATH_SEPARATOR"))
-      .getOrElse(classOf[File].getField("pathSeparator"))
-
-  def getClassPathSeparator(): String = {
-    classPathSeparatorField.get(null).asInstanceOf[String]
-  }
-
-  /**
    * Getting the initial target number of executors depends on whether dynamic allocation is
    * enabled.
    * If not using dynamic allocation it gets the number of executors requested by the user.
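
The rewritten escapeForShell is behaviorally identical to the for-loop version: it wraps the argument in single quotes and splices the '\'' idiom for each embedded quote. A standalone sketch of the Unix branch, with a worked example:

    // Minimal sketch of the Unix branch of escapeForShell.
    def escapeForShell(arg: String): String = {
      val escaped = new StringBuilder("'")
      arg.foreach {
        case '$' => escaped.append("\\$")
        case '"' => escaped.append("\\\"")
        case '\'' => escaped.append("'\\''")
        case c => escaped.append(c)
      }
      escaped.append("'").toString()
    }

    // escapeForShell("""it's a $test""") == """'it'\''s a \$test'"""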