about · summary · refs · log · tree · commit · diff
path: root/yarn
diff options
context:
space:
mode:
authorjerryshao <sshao@hortonworks.com>2016-06-09 17:31:19 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2016-06-09 17:31:19 -0700
commitaa0364510792c18a0973b6096cd38f611fc1c1a6 (patch)
tree822873f1e3465291f1e5dfbd12e5a4c559bb61d4 /yarn
parentb0768538e56e5bbda7aaabbe2a0197e30ba5f993 (diff)
downloadspark-aa0364510792c18a0973b6096cd38f611fc1c1a6.tar.gz
spark-aa0364510792c18a0973b6096cd38f611fc1c1a6.tar.bz2
spark-aa0364510792c18a0973b6096cd38f611fc1c1a6.zip
[SPARK-12447][YARN] Only update the states when executor is successfully launched
The details are described in https://issues.apache.org/jira/browse/SPARK-12447. vanzin Please help to review, thanks a lot. Author: jerryshao <sshao@hortonworks.com> Closes #10412 from jerryshao/SPARK-12447.
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala |  5
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala    | 72
2 files changed, 47 insertions, 30 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index fc753b7e75..3d0e996b18 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -55,15 +55,14 @@ private[yarn] class ExecutorRunnable(
executorCores: Int,
appId: String,
securityMgr: SecurityManager,
- localResources: Map[String, LocalResource])
- extends Runnable with Logging {
+ localResources: Map[String, LocalResource]) extends Logging {
var rpc: YarnRPC = YarnRPC.create(conf)
var nmClient: NMClient = _
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
lazy val env = prepareEnvironment(container)
- override def run(): Unit = {
+ def run(): Unit = {
logInfo("Starting Executor Container")
nmClient = NMClient.createNMClient()
nmClient.init(yarnConf)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 066c665954..b110d82fb8 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -24,6 +24,7 @@ import java.util.regex.Pattern
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.records._
@@ -472,41 +473,58 @@ private[yarn] class YarnAllocator(
*/
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
for (container <- containersToUse) {
- numExecutorsRunning += 1
- assert(numExecutorsRunning <= targetNumExecutors)
+ executorIdCounter += 1
val executorHostname = container.getNodeId.getHost
val containerId = container.getId
- executorIdCounter += 1
val executorId = executorIdCounter.toString
-
assert(container.getResource.getMemory >= resource.getMemory)
-
logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
- executorIdToContainer(executorId) = container
- containerIdToExecutorId(container.getId) = executorId
-
- val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
- new HashSet[ContainerId])
-
- containerSet += containerId
- allocatedContainerToHostMap.put(containerId, executorHostname)
-
- val executorRunnable = new ExecutorRunnable(
- container,
- conf,
- sparkConf,
- driverUrl,
- executorId,
- executorHostname,
- executorMemory,
- executorCores,
- appAttemptId.getApplicationId.toString,
- securityMgr,
- localResources)
+
+ def updateInternalState(): Unit = synchronized {
+ numExecutorsRunning += 1
+ assert(numExecutorsRunning <= targetNumExecutors)
+ executorIdToContainer(executorId) = container
+ containerIdToExecutorId(container.getId) = executorId
+
+ val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
+ new HashSet[ContainerId])
+ containerSet += containerId
+ allocatedContainerToHostMap.put(containerId, executorHostname)
+ }
+
if (launchContainers) {
logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(
driverUrl, executorHostname))
- launcherPool.execute(executorRunnable)
+
+ launcherPool.execute(new Runnable {
+ override def run(): Unit = {
+ try {
+ new ExecutorRunnable(
+ container,
+ conf,
+ sparkConf,
+ driverUrl,
+ executorId,
+ executorHostname,
+ executorMemory,
+ executorCores,
+ appAttemptId.getApplicationId.toString,
+ securityMgr,
+ localResources
+ ).run()
+ updateInternalState()
+ } catch {
+ case NonFatal(e) =>
+ logError(s"Failed to launch executor $executorId on container $containerId", e)
+ // Assigned container should be released immediately to avoid unnecessary resource
+ // occupation.
+ amClient.releaseAssignedContainer(containerId)
+ }
+ }
+ })
+ } else {
+ // For test only
+ updateInternalState()
}
}
}