Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala  | 15
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala   | 92
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala      | 34
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala |  4
4 files changed, 82 insertions(+), 63 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index a4b575c85d..ad50ea789a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -173,7 +173,6 @@ private[spark] class ApplicationMaster(
sys.props.remove(e.key)
}
- logInfo("Prepared Local resources " + resources)
resources.toMap
}
@@ -329,7 +328,7 @@ private[spark] class ApplicationMaster(
val appId = client.getAttemptId().getApplicationId().toString()
val attemptId = client.getAttemptId().getAttemptId().toString()
val historyAddress =
- sparkConf.get(HISTORY_SERVER_ADDRESS)
+ _sparkConf.get(HISTORY_SERVER_ADDRESS)
.map { text => SparkHadoopUtil.get.substituteHadoopVariables(text, yarnConf) }
.map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}/${attemptId}" }
.getOrElse("")
@@ -338,6 +337,18 @@ private[spark] class ApplicationMaster(
_sparkConf.get("spark.driver.host"),
_sparkConf.get("spark.driver.port").toInt,
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+ // Before we initialize the allocator, log the information about how executors will be
+ // run, up front, to avoid printing it out for every single executor being launched.
+ // Use placeholders for information that changes, such as executor IDs.
+ logInfo {
+ val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
+ val executorCores = sparkConf.get(EXECUTOR_CORES)
+ val dummyRunner = new ExecutorRunnable(None, yarnConf, sparkConf, driverUrl, "<executorId>",
+ "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources)
+ dummyRunner.launchContextDebugInfo()
+ }
+
allocator = client.register(driverUrl,
driverRef,
yarnConf,
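
A note on the logInfo block added above: Spark's Logging trait declares its log
methods with by-name message parameters (logInfo(msg: => String)), so the whole
block, including the construction of the placeholder ExecutorRunnable, is
evaluated at most once, and only when INFO logging is enabled. Below is a
minimal, self-contained sketch of that pattern; LazyLogDemo and infoEnabled are
hypothetical names, not Spark code:

    object LazyLogDemo {
      private var infoEnabled = true

      // By-name parameter, mirroring Logging.logInfo(msg: => String): the
      // message expression is evaluated only if the level is enabled.
      def logInfo(msg: => String): Unit = {
        if (infoEnabled) println(s"INFO $msg")
      }

      def main(args: Array[String]): Unit = {
        logInfo {
          // In the diff, this block builds a placeholder ExecutorRunnable and
          // renders launchContextDebugInfo(); it runs once, up front.
          val sections = Seq("env", "command", "resources").mkString(", ")
          s"YARN executor launch context ($sections)"
        }

        infoEnabled = false
        logInfo(sys.error("never evaluated when INFO is disabled"))
      }
    }
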
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 3d0e996b18..8e0533f39a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -24,7 +24,6 @@ import java.util.Collections
import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, ListBuffer}
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.security.UserGroupInformation
@@ -45,11 +44,11 @@ import org.apache.spark.network.util.JavaUtils
import org.apache.spark.util.Utils
private[yarn] class ExecutorRunnable(
- container: Container,
- conf: Configuration,
+ container: Option[Container],
+ conf: YarnConfiguration,
sparkConf: SparkConf,
masterAddress: String,
- slaveId: String,
+ executorId: String,
hostname: String,
executorMemory: Int,
executorCores: Int,
@@ -59,43 +58,46 @@ private[yarn] class ExecutorRunnable(
var rpc: YarnRPC = YarnRPC.create(conf)
var nmClient: NMClient = _
- val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
- lazy val env = prepareEnvironment(container)
def run(): Unit = {
- logInfo("Starting Executor Container")
+ logDebug("Starting Executor Container")
nmClient = NMClient.createNMClient()
- nmClient.init(yarnConf)
+ nmClient.init(conf)
nmClient.start()
startContainer()
}
- def startContainer(): java.util.Map[String, ByteBuffer] = {
- logInfo("Setting up ContainerLaunchContext")
+ def launchContextDebugInfo(): String = {
+ val commands = prepareCommand()
+ val env = prepareEnvironment()
+
+ s"""
+ |===============================================================================
+ |YARN executor launch context:
+ | env:
+ |${env.map { case (k, v) => s" $k -> $v\n" }.mkString}
+ | command:
+ | ${commands.mkString(" \\ \n ")}
+ |
+ | resources:
+ |${localResources.map { case (k, v) => s" $k -> $v\n" }.mkString}
+ |===============================================================================""".stripMargin
+ }
+ def startContainer(): java.util.Map[String, ByteBuffer] = {
val ctx = Records.newRecord(classOf[ContainerLaunchContext])
.asInstanceOf[ContainerLaunchContext]
+ val env = prepareEnvironment().asJava
ctx.setLocalResources(localResources.asJava)
- ctx.setEnvironment(env.asJava)
+ ctx.setEnvironment(env)
val credentials = UserGroupInformation.getCurrentUser().getCredentials()
val dob = new DataOutputBuffer()
credentials.writeTokenStorageToStream(dob)
ctx.setTokens(ByteBuffer.wrap(dob.getData()))
- val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
- appId)
-
- logInfo(s"""
- |===============================================================================
- |YARN executor launch context:
- | env:
- |${env.map { case (k, v) => s" $k -> $v\n" }.mkString}
- | command:
- | ${commands.mkString(" ")}
- |===============================================================================
- """.stripMargin)
+ val commands = prepareCommand()
ctx.setCommands(commands.asJava)
ctx.setApplicationACLs(
@@ -119,21 +121,15 @@ private[yarn] class ExecutorRunnable(
// Send the start request to the ContainerManager
try {
- nmClient.startContainer(container, ctx)
+ nmClient.startContainer(container.get, ctx)
} catch {
case ex: Exception =>
- throw new SparkException(s"Exception while starting container ${container.getId}" +
+ throw new SparkException(s"Exception while starting container ${container.get.getId}" +
s" on host $hostname", ex)
}
}
- private def prepareCommand(
- masterAddress: String,
- slaveId: String,
- hostname: String,
- executorMemory: Int,
- executorCores: Int,
- appId: String): List[String] = {
+ private def prepareCommand(): List[String] = {
// Extra options for the JVM
val javaOpts = ListBuffer[String]()
@@ -216,23 +212,23 @@ private[yarn] class ExecutorRunnable(
"-server") ++
javaOpts ++
Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
- "--driver-url", masterAddress.toString,
- "--executor-id", slaveId.toString,
- "--hostname", hostname.toString,
+ "--driver-url", masterAddress,
+ "--executor-id", executorId,
+ "--hostname", hostname,
"--cores", executorCores.toString,
"--app-id", appId) ++
userClassPath ++
Seq(
- "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
- "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+ s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
+ s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")
// TODO: it would be nicer to just make sure there are no null commands here
commands.map(s => if (s == null) "null" else s).toList
}
- private def prepareEnvironment(container: Container): HashMap[String, String] = {
+ private def prepareEnvironment(): HashMap[String, String] = {
val env = new HashMap[String, String]()
- Client.populateClasspath(null, yarnConf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))
+ Client.populateClasspath(null, conf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))
sparkConf.getExecutorEnv.foreach { case (key, value) =>
// This assumes each executor environment variable set here is a path
@@ -246,20 +242,22 @@ private[yarn] class ExecutorRunnable(
}
// lookup appropriate http scheme for container log urls
- val yarnHttpPolicy = yarnConf.get(
+ val yarnHttpPolicy = conf.get(
YarnConfiguration.YARN_HTTP_POLICY_KEY,
YarnConfiguration.YARN_HTTP_POLICY_DEFAULT
)
val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://"
// Add log urls
- sys.env.get("SPARK_USER").foreach { user =>
- val containerId = ConverterUtils.toString(container.getId)
- val address = container.getNodeHttpAddress
- val baseUrl = s"$httpScheme$address/node/containerlogs/$containerId/$user"
-
- env("SPARK_LOG_URL_STDERR") = s"$baseUrl/stderr?start=-4096"
- env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=-4096"
+ container.foreach { c =>
+ sys.env.get("SPARK_USER").foreach { user =>
+ val containerId = ConverterUtils.toString(c.getId)
+ val address = c.getNodeHttpAddress
+ val baseUrl = s"$httpScheme$address/node/containerlogs/$containerId/$user"
+
+ env("SPARK_LOG_URL_STDERR") = s"$baseUrl/stderr?start=-4096"
+ env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=-4096"
+ }
}
System.getenv().asScala.filterKeys(_.startsWith("SPARK"))
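
A note on the ExecutorRunnable changes above: the container parameter becomes
Option[Container] so that ApplicationMaster can build a placeholder runner with
None purely to render launchContextDebugInfo(), while the real launch path
passes Some(container) and still fails fast via container.get. Below is a
minimal sketch of that pattern with simplified, hypothetical stand-ins
(Container, Runner, and RunnerDemo here are not Spark or YARN types):

    case class Container(id: String, nodeHttpAddress: String)

    class Runner(container: Option[Container]) {
      // Log URLs only make sense when a real container exists, mirroring
      // `container.foreach { c => ... }` in prepareEnvironment() above.
      def env: Map[String, String] = {
        val base = Map("EXAMPLE_VAR" -> "value")  // placeholder base env
        container.fold(base) { c =>
          base + ("SPARK_LOG_URL_STDOUT" ->
            s"http://${c.nodeHttpAddress}/node/containerlogs/${c.id}/stdout")
        }
      }

      // The launch path assumes a container is present; `.get` throws
      // otherwise, mirroring nmClient.startContainer(container.get, ctx).
      def start(): Unit = println(s"starting ${container.get.id}")
    }

    object RunnerDemo extends App {
      val dummy = new Runner(None)    // debug rendering only, no container
      println(dummy.env)              // no log URLs in the environment
      new Runner(Some(Container("c_000001", "nm-host:8042"))).start()
    }
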
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index dbdac3369b..0daf1ea0bc 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -26,10 +26,10 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.RackResolver
import org.apache.log4j.{Level, Logger}
@@ -60,7 +60,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
private[yarn] class YarnAllocator(
driverUrl: String,
driverRef: RpcEndpointRef,
- conf: Configuration,
+ conf: YarnConfiguration,
sparkConf: SparkConf,
amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
@@ -297,8 +297,9 @@ private[yarn] class YarnAllocator(
val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
if (missing > 0) {
- logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
- s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")
+ logInfo(s"Will request $missing executor container(s), each with " +
+ s"${resource.getVirtualCores} core(s) and " +
+ s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)")
// Split the pending container request into three groups: locality matched list, locality
// unmatched list and non-locality list. Take the locality matched container request into
@@ -314,7 +315,9 @@ private[yarn] class YarnAllocator(
amClient.removeContainerRequest(stale)
}
val cancelledContainers = staleRequests.size
- logInfo(s"Canceled $cancelledContainers container requests (locality no longer needed)")
+ if (cancelledContainers > 0) {
+ logInfo(s"Canceled $cancelledContainers container request(s) (locality no longer needed)")
+ }
// consider the number of new containers and cancelled stale containers available
val availableContainers = missing + cancelledContainers
@@ -344,14 +347,24 @@ private[yarn] class YarnAllocator(
anyHostRequests.slice(0, numToCancel).foreach { nonLocal =>
amClient.removeContainerRequest(nonLocal)
}
- logInfo(s"Canceled $numToCancel container requests for any host to resubmit with locality")
+ if (numToCancel > 0) {
+ logInfo(s"Canceled $numToCancel unlocalized container requests to resubmit with locality")
+ }
}
newLocalityRequests.foreach { request =>
amClient.addContainerRequest(request)
- logInfo(s"Submitted container request (host: ${hostStr(request)}, capability: $resource)")
}
+ if (log.isInfoEnabled()) {
+ val (localized, anyHost) = newLocalityRequests.partition(_.getNodes() != null)
+ if (anyHost.nonEmpty) {
+ logInfo(s"Submitted ${anyHost.size} unlocalized container requests.")
+ }
+ localized.foreach { request =>
+ logInfo(s"Submitted container request for host ${hostStr(request)}.")
+ }
+ }
} else if (numPendingAllocate > 0 && missing < 0) {
val numToCancel = math.min(numPendingAllocate, -missing)
logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " +
@@ -479,7 +492,7 @@ private[yarn] class YarnAllocator(
val containerId = container.getId
val executorId = executorIdCounter.toString
assert(container.getResource.getMemory >= resource.getMemory)
- logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
+ logInfo(s"Launching container $containerId on host $executorHostname")
def updateInternalState(): Unit = synchronized {
numExecutorsRunning += 1
@@ -494,14 +507,11 @@ private[yarn] class YarnAllocator(
}
if (launchContainers) {
- logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(
- driverUrl, executorHostname))
-
launcherPool.execute(new Runnable {
override def run(): Unit = {
try {
new ExecutorRunnable(
- container,
+ Some(container),
conf,
sparkConf,
driverUrl,
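
A note on the YarnAllocator logging change above: instead of one log line per
container request, the new code partitions the requests into locality-matched
and any-host groups and summarizes the latter. Below is a runnable sketch of
that aggregation; ContainerRequest here is a simplified stand-in for YARN's
AMRMClient.ContainerRequest (in the real code the check is
request.getNodes() != null):

    case class ContainerRequest(nodes: Seq[String])

    object RequestLogDemo extends App {
      val requests = Seq(
        ContainerRequest(Seq("host1", "host2")),
        ContainerRequest(Nil),
        ContainerRequest(Seq("host3")),
        ContainerRequest(Nil))

      // Split into locality-carrying and any-host requests, as in the diff.
      val (localized, anyHost) = requests.partition(_.nodes.nonEmpty)

      if (anyHost.nonEmpty) {
        println(s"Submitted ${anyHost.size} unlocalized container requests.")
      }
      localized.foreach { r =>
        println(s"Submitted container request for host ${r.nodes.mkString(",")}.")
      }
    }
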
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 207dbf56d3..696e552c35 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -19,12 +19,12 @@ package org.apache.spark.deploy.yarn
import java.util.{Arrays, List => JList}
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.CommonConfigurationKeysPublic
import org.apache.hadoop.net.DNSToSwitchMapping
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterEach, Matchers}
@@ -49,7 +49,7 @@ class MockResolver extends DNSToSwitchMapping {
}
class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
- val conf = new Configuration()
+ val conf = new YarnConfiguration()
conf.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
classOf[MockResolver], classOf[DNSToSwitchMapping])
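
A closing note on the Configuration -> YarnConfiguration narrowing seen across
all four files: YarnConfiguration extends Hadoop's Configuration, so the
narrower parameter type lets ExecutorRunnable and YarnAllocator drop their
internal `new YarnConfiguration(conf)` wrapping while callers can still treat
the value as a plain Configuration. A minimal sketch, assuming hadoop-common
and hadoop-yarn-common are on the classpath (ConfDemo is a hypothetical name):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.yarn.conf.YarnConfiguration

    object ConfDemo extends App {
      val conf = new YarnConfiguration()
      // Widening is free: a YarnConfiguration is-a Configuration.
      val asHadoop: Configuration = conf
      // YARN-specific keys are available without re-wrapping, as used in
      // prepareEnvironment() above.
      println(conf.get(
        YarnConfiguration.YARN_HTTP_POLICY_KEY,
        YarnConfiguration.YARN_HTTP_POLICY_DEFAULT))
    }
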