author    Marcelo Vanzin <vanzin@cloudera.com>  2016-09-06 15:54:54 -0700
committer Marcelo Vanzin <vanzin@cloudera.com>  2016-09-06 15:54:54 -0700
commit    0bd00ff2454c5046e4cb084ee64d432c4d3dcbc3 (patch)
tree      9db958479bc72358c2082f000a914a2dce77f41b /yarn
parent    4f769b903bc9822c262f0a15f5933cc05c67923f (diff)
[SPARK-15891][YARN] Clean up some logging in the YARN AM.
To make the log file more readable, rework some of the logging done by the AM:

- Log the executor command / env just once, since they're almost all the same; the information that changes, such as the executor ID, is already available in other log messages.
- Avoid printing logs when nothing happens, especially when updating the container requests in the allocator.
- Print fewer log messages when requesting many unlocalized executors, instead of repeating the same message multiple times.
- Remove some logs that seemed unnecessary.

In the process, I slightly fixed up the wording in a few log messages and did some minor cleanup of redundant method arguments.

Tested by running the existing unit tests and by analyzing the logs of an application that exercises dynamic allocation by forcing executors to be allocated and killed in waves.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #14943 from vanzin/SPARK-15891.
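The core idea is to build the verbose launch-context description once, with placeholders for the per-executor values, and keep the per-executor log lines short. Below is a minimal, self-contained sketch of that pattern; the names (logInfo, launchContextDebugInfo, buildCommand) and the command contents are illustrative stand-ins, not the Spark APIs changed in the diff that follows.

// Sketch only: log a launch "template" once, with placeholders for the values
// that differ per executor; later per-executor lines stay short.
object LogOnceSketch {
  private var infoEnabled = true

  // By-name parameter: the message string is only built if it will be printed.
  private def logInfo(msg: => String): Unit =
    if (infoEnabled) println(s"INFO  $msg")

  private def buildCommand(executorId: String, hostname: String,
      memoryMb: Int, cores: Int): Seq[String] =
    Seq("java", s"-Xmx${memoryMb}m", "ExecutorBackend",
      "--executor-id", executorId, "--hostname", hostname, "--cores", cores.toString)

  private def launchContextDebugInfo(memoryMb: Int, cores: Int): String =
    s"""
       |===========================================================
       |Executor launch context (placeholders for per-executor values):
       |  command:
       |    ${buildCommand("<executorId>", "<hostname>", memoryMb, cores).mkString(" ")}
       |===========================================================""".stripMargin

  def main(args: Array[String]): Unit = {
    // Logged a single time, before any executor is started.
    logInfo(launchContextDebugInfo(memoryMb = 2048, cores = 2))

    // Per-executor logging is reduced to one short line each.
    Seq("1" -> "host-a", "2" -> "host-b").foreach { case (id, host) =>
      logInfo(s"Launching executor $id on host $host")
      buildCommand(id, host, 2048, 2) // would be handed to the node manager here
    }
  }
}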
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala  | 15
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala   | 92
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala      | 34
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala |  4
4 files changed, 82 insertions, 63 deletions
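The allocator-side changes below follow the same theme: summarize a batch of unlocalized container requests in a single line, log localized requests per host, and emit no line at all when nothing was cancelled. A rough sketch of that shape, using a made-up ContainerRequest rather than the YARN class:

// Sketch only: quieter logging when requesting or cancelling many containers.
object AllocatorLoggingSketch {
  case class ContainerRequest(nodes: Seq[String]) // hypothetical stand-in

  private def logInfo(msg: String): Unit = println(s"INFO  $msg")

  def logNewRequests(requests: Seq[ContainerRequest]): Unit = {
    val (localized, anyHost) = requests.partition(_.nodes.nonEmpty)
    if (anyHost.nonEmpty) {
      // One summary line instead of one line per unlocalized request.
      logInfo(s"Submitted ${anyHost.size} unlocalized container requests.")
    }
    localized.foreach { r =>
      logInfo(s"Submitted container request for host ${r.nodes.mkString(",")}.")
    }
  }

  def logCancellations(cancelled: Int): Unit = {
    // Stay silent when nothing happened.
    if (cancelled > 0) {
      logInfo(s"Canceled $cancelled container request(s) (locality no longer needed)")
    }
  }

  def main(args: Array[String]): Unit = {
    logNewRequests(Seq(
      ContainerRequest(Nil), ContainerRequest(Nil), ContainerRequest(Seq("host-a"))))
    logCancellations(0) // prints nothing
  }
}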
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index a4b575c85d..ad50ea789a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -173,7 +173,6 @@ private[spark] class ApplicationMaster(
sys.props.remove(e.key)
}
- logInfo("Prepared Local resources " + resources)
resources.toMap
}
@@ -329,7 +328,7 @@ private[spark] class ApplicationMaster(
val appId = client.getAttemptId().getApplicationId().toString()
val attemptId = client.getAttemptId().getAttemptId().toString()
val historyAddress =
- sparkConf.get(HISTORY_SERVER_ADDRESS)
+ _sparkConf.get(HISTORY_SERVER_ADDRESS)
.map { text => SparkHadoopUtil.get.substituteHadoopVariables(text, yarnConf) }
.map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}/${attemptId}" }
.getOrElse("")
@@ -338,6 +337,18 @@ private[spark] class ApplicationMaster(
_sparkConf.get("spark.driver.host"),
_sparkConf.get("spark.driver.port").toInt,
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+ // Before we initialize the allocator, let's log the information about how executors will
+ // be run up front, to avoid printing this out for every single executor being launched.
+ // Use placeholders for information that changes such as executor IDs.
+ logInfo {
+ val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
+ val executorCores = sparkConf.get(EXECUTOR_CORES)
+ val dummyRunner = new ExecutorRunnable(None, yarnConf, sparkConf, driverUrl, "<executorId>",
+ "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources)
+ dummyRunner.launchContextDebugInfo()
+ }
+
allocator = client.register(driverUrl,
driverRef,
yarnConf,
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 3d0e996b18..8e0533f39a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -24,7 +24,6 @@ import java.util.Collections
import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, ListBuffer}
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.security.UserGroupInformation
@@ -45,11 +44,11 @@ import org.apache.spark.network.util.JavaUtils
import org.apache.spark.util.Utils
private[yarn] class ExecutorRunnable(
- container: Container,
- conf: Configuration,
+ container: Option[Container],
+ conf: YarnConfiguration,
sparkConf: SparkConf,
masterAddress: String,
- slaveId: String,
+ executorId: String,
hostname: String,
executorMemory: Int,
executorCores: Int,
@@ -59,43 +58,46 @@ private[yarn] class ExecutorRunnable(
var rpc: YarnRPC = YarnRPC.create(conf)
var nmClient: NMClient = _
- val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
- lazy val env = prepareEnvironment(container)
def run(): Unit = {
- logInfo("Starting Executor Container")
+ logDebug("Starting Executor Container")
nmClient = NMClient.createNMClient()
- nmClient.init(yarnConf)
+ nmClient.init(conf)
nmClient.start()
startContainer()
}
- def startContainer(): java.util.Map[String, ByteBuffer] = {
- logInfo("Setting up ContainerLaunchContext")
+ def launchContextDebugInfo(): String = {
+ val commands = prepareCommand()
+ val env = prepareEnvironment()
+
+ s"""
+ |===============================================================================
+ |YARN executor launch context:
+ | env:
+ |${env.map { case (k, v) => s" $k -> $v\n" }.mkString}
+ | command:
+ | ${commands.mkString(" \\ \n ")}
+ |
+ | resources:
+ |${localResources.map { case (k, v) => s" $k -> $v\n" }.mkString}
+ |===============================================================================""".stripMargin
+ }
+ def startContainer(): java.util.Map[String, ByteBuffer] = {
val ctx = Records.newRecord(classOf[ContainerLaunchContext])
.asInstanceOf[ContainerLaunchContext]
+ val env = prepareEnvironment().asJava
ctx.setLocalResources(localResources.asJava)
- ctx.setEnvironment(env.asJava)
+ ctx.setEnvironment(env)
val credentials = UserGroupInformation.getCurrentUser().getCredentials()
val dob = new DataOutputBuffer()
credentials.writeTokenStorageToStream(dob)
ctx.setTokens(ByteBuffer.wrap(dob.getData()))
- val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
- appId)
-
- logInfo(s"""
- |===============================================================================
- |YARN executor launch context:
- | env:
- |${env.map { case (k, v) => s" $k -> $v\n" }.mkString}
- | command:
- | ${commands.mkString(" ")}
- |===============================================================================
- """.stripMargin)
+ val commands = prepareCommand()
ctx.setCommands(commands.asJava)
ctx.setApplicationACLs(
@@ -119,21 +121,15 @@ private[yarn] class ExecutorRunnable(
// Send the start request to the ContainerManager
try {
- nmClient.startContainer(container, ctx)
+ nmClient.startContainer(container.get, ctx)
} catch {
case ex: Exception =>
- throw new SparkException(s"Exception while starting container ${container.getId}" +
+ throw new SparkException(s"Exception while starting container ${container.get.getId}" +
s" on host $hostname", ex)
}
}
- private def prepareCommand(
- masterAddress: String,
- slaveId: String,
- hostname: String,
- executorMemory: Int,
- executorCores: Int,
- appId: String): List[String] = {
+ private def prepareCommand(): List[String] = {
// Extra options for the JVM
val javaOpts = ListBuffer[String]()
@@ -216,23 +212,23 @@ private[yarn] class ExecutorRunnable(
"-server") ++
javaOpts ++
Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
- "--driver-url", masterAddress.toString,
- "--executor-id", slaveId.toString,
- "--hostname", hostname.toString,
+ "--driver-url", masterAddress,
+ "--executor-id", executorId,
+ "--hostname", hostname,
"--cores", executorCores.toString,
"--app-id", appId) ++
userClassPath ++
Seq(
- "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
- "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+ s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
+ s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")
// TODO: it would be nicer to just make sure there are no null commands here
commands.map(s => if (s == null) "null" else s).toList
}
- private def prepareEnvironment(container: Container): HashMap[String, String] = {
+ private def prepareEnvironment(): HashMap[String, String] = {
val env = new HashMap[String, String]()
- Client.populateClasspath(null, yarnConf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))
+ Client.populateClasspath(null, conf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))
sparkConf.getExecutorEnv.foreach { case (key, value) =>
// This assumes each executor environment variable set here is a path
@@ -246,20 +242,22 @@ private[yarn] class ExecutorRunnable(
}
// lookup appropriate http scheme for container log urls
- val yarnHttpPolicy = yarnConf.get(
+ val yarnHttpPolicy = conf.get(
YarnConfiguration.YARN_HTTP_POLICY_KEY,
YarnConfiguration.YARN_HTTP_POLICY_DEFAULT
)
val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://"
// Add log urls
- sys.env.get("SPARK_USER").foreach { user =>
- val containerId = ConverterUtils.toString(container.getId)
- val address = container.getNodeHttpAddress
- val baseUrl = s"$httpScheme$address/node/containerlogs/$containerId/$user"
-
- env("SPARK_LOG_URL_STDERR") = s"$baseUrl/stderr?start=-4096"
- env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=-4096"
+ container.foreach { c =>
+ sys.env.get("SPARK_USER").foreach { user =>
+ val containerId = ConverterUtils.toString(c.getId)
+ val address = c.getNodeHttpAddress
+ val baseUrl = s"$httpScheme$address/node/containerlogs/$containerId/$user"
+
+ env("SPARK_LOG_URL_STDERR") = s"$baseUrl/stderr?start=-4096"
+ env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=-4096"
+ }
}
System.getenv().asScala.filterKeys(_.startsWith("SPARK"))
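The ExecutorRunnable change above makes the container an Option, so the same class can produce launch-context debug output without a real container, and container-specific environment entries (the log URLs) are only added when one is present. A rough sketch of that shape, using a hypothetical Container type rather than the YARN one:

// Sketch only: optional container, container-specific env added only when present.
object OptionalContainerSketch {
  case class Container(id: String, nodeHttpAddress: String) // hypothetical stand-in

  class Runner(container: Option[Container]) {
    def environment: Map[String, String] = {
      val base = Map("SPARK_EXECUTOR_MEMORY" -> "2g")
      // fold on the Option: log URLs are only set when a real container exists;
      // the "dummy" runner used just for logging skips them.
      container.fold(base) { c =>
        base + ("SPARK_LOG_URL_STDOUT" ->
          s"http://${c.nodeHttpAddress}/node/containerlogs/${c.id}/stdout")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val dummy = new Runner(None) // used only to build the launch-context debug string
    val real  = new Runner(Some(Container("c1", "host-a:8042")))
    println(dummy.environment)
    println(real.environment)
  }
}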
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index dbdac3369b..0daf1ea0bc 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -26,10 +26,10 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.RackResolver
import org.apache.log4j.{Level, Logger}
@@ -60,7 +60,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
private[yarn] class YarnAllocator(
driverUrl: String,
driverRef: RpcEndpointRef,
- conf: Configuration,
+ conf: YarnConfiguration,
sparkConf: SparkConf,
amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
@@ -297,8 +297,9 @@ private[yarn] class YarnAllocator(
val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
if (missing > 0) {
- logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
- s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")
+ logInfo(s"Will request $missing executor container(s), each with " +
+ s"${resource.getVirtualCores} core(s) and " +
+ s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)")
// Split the pending container request into three groups: locality matched list, locality
// unmatched list and non-locality list. Take the locality matched container request into
@@ -314,7 +315,9 @@ private[yarn] class YarnAllocator(
amClient.removeContainerRequest(stale)
}
val cancelledContainers = staleRequests.size
- logInfo(s"Canceled $cancelledContainers container requests (locality no longer needed)")
+ if (cancelledContainers > 0) {
+ logInfo(s"Canceled $cancelledContainers container request(s) (locality no longer needed)")
+ }
// consider the number of new containers and cancelled stale containers available
val availableContainers = missing + cancelledContainers
@@ -344,14 +347,24 @@ private[yarn] class YarnAllocator(
anyHostRequests.slice(0, numToCancel).foreach { nonLocal =>
amClient.removeContainerRequest(nonLocal)
}
- logInfo(s"Canceled $numToCancel container requests for any host to resubmit with locality")
+ if (numToCancel > 0) {
+ logInfo(s"Canceled $numToCancel unlocalized container requests to resubmit with locality")
+ }
}
newLocalityRequests.foreach { request =>
amClient.addContainerRequest(request)
- logInfo(s"Submitted container request (host: ${hostStr(request)}, capability: $resource)")
}
+ if (log.isInfoEnabled()) {
+ val (localized, anyHost) = newLocalityRequests.partition(_.getNodes() != null)
+ if (anyHost.nonEmpty) {
+ logInfo(s"Submitted ${anyHost.size} unlocalized container requests.")
+ }
+ localized.foreach { request =>
+ logInfo(s"Submitted container request for host ${hostStr(request)}.")
+ }
+ }
} else if (numPendingAllocate > 0 && missing < 0) {
val numToCancel = math.min(numPendingAllocate, -missing)
logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " +
@@ -479,7 +492,7 @@ private[yarn] class YarnAllocator(
val containerId = container.getId
val executorId = executorIdCounter.toString
assert(container.getResource.getMemory >= resource.getMemory)
- logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
+ logInfo(s"Launching container $containerId on host $executorHostname")
def updateInternalState(): Unit = synchronized {
numExecutorsRunning += 1
@@ -494,14 +507,11 @@ private[yarn] class YarnAllocator(
}
if (launchContainers) {
- logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(
- driverUrl, executorHostname))
-
launcherPool.execute(new Runnable {
override def run(): Unit = {
try {
new ExecutorRunnable(
- container,
+ Some(container),
conf,
sparkConf,
driverUrl,
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 207dbf56d3..696e552c35 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -19,12 +19,12 @@ package org.apache.spark.deploy.yarn
import java.util.{Arrays, List => JList}
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.CommonConfigurationKeysPublic
import org.apache.hadoop.net.DNSToSwitchMapping
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterEach, Matchers}
@@ -49,7 +49,7 @@ class MockResolver extends DNSToSwitchMapping {
}
class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
- val conf = new Configuration()
+ val conf = new YarnConfiguration()
conf.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
classOf[MockResolver], classOf[DNSToSwitchMapping])