author    Mridul Muralidharan <mridulm@yahoo-inc.com>  2015-07-30 10:37:53 -0700
committer Marcelo Vanzin <vanzin@cloudera.com>         2015-07-30 10:37:53 -0700
commit    e53534655d6198e5b8a507010d26c7b4c4e7f1fd (patch)
tree      bbdf3c91556225c98fbd6d2d66ae7f41813fe674 /yarn
parent    5363ed71568c3e7c082146d654a9c669d692d894 (diff)
[SPARK-8297] [YARN] Scheduler backend is not notified in case node fails in YARN
This change adds code to notify the scheduler backend when a container dies in YARN.

Author: Mridul Muralidharan <mridulm@yahoo-inc.com>
Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #7431 from vanzin/SPARK-8297 and squashes the following commits:

471e4a0 [Marcelo Vanzin] Fix unit test after merge.
d4adf4e [Marcelo Vanzin] Merge branch 'master' into SPARK-8297
3b262e8 [Marcelo Vanzin] Merge branch 'master' into SPARK-8297
537da6f [Marcelo Vanzin] Make an expected log less scary.
04dc112 [Marcelo Vanzin] Use driver <-> AM communication to send "remove executor" request.
8855b97 [Marcelo Vanzin] Merge remote-tracking branch 'mridul/fix_yarn_scheduler_bug' into SPARK-8297
687790f [Mridul Muralidharan] Merge branch 'fix_yarn_scheduler_bug' of github.com:mridulm/spark into fix_yarn_scheduler_bug
e1b0067 [Mridul Muralidharan] Fix failing testcase, fix merge issue from our 1.3 -> master
9218fcc [Mridul Muralidharan] Fix failing testcase
362d64a [Mridul Muralidharan] Merge branch 'fix_yarn_scheduler_bug' of github.com:mridulm/spark into fix_yarn_scheduler_bug
62ad0cc [Mridul Muralidharan] Merge branch 'fix_yarn_scheduler_bug' of github.com:mridulm/spark into fix_yarn_scheduler_bug
bbf8811 [Mridul Muralidharan] Merge branch 'fix_yarn_scheduler_bug' of github.com:mridulm/spark into fix_yarn_scheduler_bug
9ee1307 [Mridul Muralidharan] Fix SPARK-8297
a3a0f01 [Mridul Muralidharan] Fix SPARK-8297
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala  | 22
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala      | 32
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala       |  5
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala | 29
4 files changed, 74 insertions(+), 14 deletions(-)
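
To make the hunks below easier to follow, here is a minimal, self-contained Scala sketch of the allocator-side bookkeeping this patch introduces. DriverRef, RemoveExecutor, and the String-typed ids are stand-ins for Spark's RpcEndpointRef, CoarseGrainedClusterMessages.RemoveExecutor, and YARN's ContainerId, so treat it as an illustration of the idea rather than the actual YarnAllocator code; the real change is in the YarnAllocator diff further down.

// Sketch only: stand-in types, not the Spark/YARN APIs.
import scala.collection.mutable

object LostContainerSketch {
  trait DriverRef { def send(msg: Any): Unit }
  case class RemoveExecutor(executorId: String, reason: String)

  private val executorIdToContainer = new mutable.HashMap[String, String]
  private val containerIdToExecutorId = new mutable.HashMap[String, String]
  private val releasedContainers = new mutable.HashSet[String]
  private var numUnexpectedContainerRelease = 0L

  // Called when a container is matched to an executor (mirrors handleAllocatedContainers).
  def registerLaunchedContainer(containerId: String, executorId: String): Unit = {
    executorIdToContainer(executorId) = containerId
    containerIdToExecutorId(containerId) = executorId
  }

  // Mirrors processCompletedContainers: a completion Spark did not ask for
  // (node failure, lost NodeManager, ...) is forwarded to the driver so the
  // scheduler backend can mark the executor as lost.
  def onCompletedContainer(containerId: String, driverRef: DriverRef): Unit = {
    val alreadyReleased = releasedContainers.remove(containerId)
    containerIdToExecutorId.remove(containerId).foreach { eid =>
      executorIdToContainer.remove(eid)
      if (!alreadyReleased) {
        numUnexpectedContainerRelease += 1
        driverRef.send(RemoveExecutor(eid,
          s"Yarn deallocated the executor $eid (container $containerId)"))
      }
    }
  }
}
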
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 44acc7374d..1d67b3ebb5 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -229,7 +229,11 @@ private[spark] class ApplicationMaster(
sparkContextRef.compareAndSet(sc, null)
}
- private def registerAM(_rpcEnv: RpcEnv, uiAddress: String, securityMgr: SecurityManager) = {
+ private def registerAM(
+ _rpcEnv: RpcEnv,
+ driverRef: RpcEndpointRef,
+ uiAddress: String,
+ securityMgr: SecurityManager) = {
val sc = sparkContextRef.get()
val appId = client.getAttemptId().getApplicationId().toString()
@@ -246,6 +250,7 @@ private[spark] class ApplicationMaster(
RpcAddress(_sparkConf.get("spark.driver.host"), _sparkConf.get("spark.driver.port").toInt),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
allocator = client.register(driverUrl,
+ driverRef,
yarnConf,
_sparkConf,
if (sc != null) sc.preferredNodeLocationData else Map(),
@@ -262,17 +267,20 @@ private[spark] class ApplicationMaster(
*
* In cluster mode, the AM and the driver belong to same process
* so the AMEndpoint need not monitor lifecycle of the driver.
+ *
+ * @return A reference to the driver's RPC endpoint.
*/
private def runAMEndpoint(
host: String,
port: String,
- isClusterMode: Boolean): Unit = {
+ isClusterMode: Boolean): RpcEndpointRef = {
val driverEndpoint = rpcEnv.setupEndpointRef(
SparkEnv.driverActorSystemName,
RpcAddress(host, port.toInt),
YarnSchedulerBackend.ENDPOINT_NAME)
amEndpoint =
rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode))
+ driverEndpoint
}
private def runDriver(securityMgr: SecurityManager): Unit = {
@@ -290,11 +298,11 @@ private[spark] class ApplicationMaster(
"Timed out waiting for SparkContext.")
} else {
rpcEnv = sc.env.rpcEnv
- runAMEndpoint(
+ val driverRef = runAMEndpoint(
sc.getConf.get("spark.driver.host"),
sc.getConf.get("spark.driver.port"),
isClusterMode = true)
- registerAM(rpcEnv, sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
+ registerAM(rpcEnv, driverRef, sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
userClassThread.join()
}
}
@@ -302,9 +310,9 @@ private[spark] class ApplicationMaster(
private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
val port = sparkConf.getInt("spark.yarn.am.port", 0)
rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr)
- waitForSparkDriver()
+ val driverRef = waitForSparkDriver()
addAmIpFilter()
- registerAM(rpcEnv, sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
+ registerAM(rpcEnv, driverRef, sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
// In client mode the actor will stop the reporter thread.
reporterThread.join()
@@ -428,7 +436,7 @@ private[spark] class ApplicationMaster(
}
}
- private def waitForSparkDriver(): Unit = {
+ private def waitForSparkDriver(): RpcEndpointRef = {
logInfo("Waiting for Spark driver to be reachable.")
var driverUp = false
val hostport = args.userArgs(0)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 6c103394af..59caa787b6 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -36,6 +36,9 @@ import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
/**
* YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -52,6 +55,7 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
*/
private[yarn] class YarnAllocator(
driverUrl: String,
+ driverRef: RpcEndpointRef,
conf: Configuration,
sparkConf: SparkConf,
amClient: AMRMClient[ContainerRequest],
@@ -88,6 +92,9 @@ private[yarn] class YarnAllocator(
// Visible for testing.
private[yarn] val executorIdToContainer = new HashMap[String, Container]
+ private var numUnexpectedContainerRelease = 0L
+ private val containerIdToExecutorId = new HashMap[ContainerId, String]
+
// Executor memory in MB.
protected val executorMemory = args.executorMemory
// Additional memory overhead.
@@ -184,6 +191,7 @@ private[yarn] class YarnAllocator(
def killExecutor(executorId: String): Unit = synchronized {
if (executorIdToContainer.contains(executorId)) {
val container = executorIdToContainer.remove(executorId).get
+ containerIdToExecutorId.remove(container.getId)
internalReleaseContainer(container)
numExecutorsRunning -= 1
} else {
@@ -383,6 +391,7 @@ private[yarn] class YarnAllocator(
logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
executorIdToContainer(executorId) = container
+ containerIdToExecutorId(container.getId) = executorId
val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId])
@@ -413,12 +422,8 @@ private[yarn] class YarnAllocator(
private[yarn] def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit = {
for (completedContainer <- completedContainers) {
val containerId = completedContainer.getContainerId
-
- if (releasedContainers.contains(containerId)) {
- // Already marked the container for release, so remove it from
- // `releasedContainers`.
- releasedContainers.remove(containerId)
- } else {
+ val alreadyReleased = releasedContainers.remove(containerId)
+ if (!alreadyReleased) {
// Decrement the number of executors running. The next iteration of
// the ApplicationMaster's reporting thread will take care of allocating.
numExecutorsRunning -= 1
@@ -460,6 +465,18 @@ private[yarn] class YarnAllocator(
allocatedContainerToHostMap.remove(containerId)
}
+
+ containerIdToExecutorId.remove(containerId).foreach { eid =>
+ executorIdToContainer.remove(eid)
+
+ if (!alreadyReleased) {
+ // The executor could have gone away (like no route to host, node failure, etc)
+ // Notify backend about the failure of the executor
+ numUnexpectedContainerRelease += 1
+ driverRef.send(RemoveExecutor(eid,
+ s"Yarn deallocated the executor $eid (container $containerId)"))
+ }
+ }
}
}
@@ -467,6 +484,9 @@ private[yarn] class YarnAllocator(
releasedContainers.add(container.getId())
amClient.releaseAssignedContainer(container.getId())
}
+
+ private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease
+
}
private object YarnAllocator {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 7f533ee55e..4999f9c062 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.hadoop.yarn.webapp.util.WebAppUtils
import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.SplitInfo
import org.apache.spark.util.Utils
@@ -56,6 +57,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
*/
def register(
driverUrl: String,
+ driverRef: RpcEndpointRef,
conf: YarnConfiguration,
sparkConf: SparkConf,
preferredNodeLocations: Map[String, Set[SplitInfo]],
@@ -73,7 +75,8 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
registered = true
}
- new YarnAllocator(driverUrl, conf, sparkConf, amClient, getAttemptId(), args, securityMgr)
+ new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), args,
+ securityMgr)
}
/**
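
Taken together, the three main-source hunks above simply thread the driver's endpoint reference from the ApplicationMaster through YarnRMClient.register() into the YarnAllocator constructor. Below is a compressed sketch of that plumbing with stand-in types so it compiles on its own; obtainDriverRef() is a hypothetical placeholder for what runAMEndpoint() (cluster mode) and waitForSparkDriver() (client mode) now return, and the real signatures are the ones shown in the hunks above.

// Sketch only: stand-in types and names, not the real ApplicationMaster code.
object DriverRefPlumbingSketch {
  trait RpcEndpointRef { def send(msg: Any): Unit }

  class YarnAllocator(driverUrl: String, driverRef: RpcEndpointRef /* , conf, sparkConf, ... */)

  class YarnRMClient {
    // register() now receives the driver ref and hands it to the allocator.
    def register(driverUrl: String, driverRef: RpcEndpointRef): YarnAllocator =
      new YarnAllocator(driverUrl, driverRef)
  }

  class ApplicationMaster(client: YarnRMClient) {
    // Placeholder for runAMEndpoint() / waitForSparkDriver(), which now return
    // the driver's endpoint reference instead of Unit.
    private def obtainDriverRef(): RpcEndpointRef =
      new RpcEndpointRef { def send(msg: Any): Unit = () }

    // registerAM() forwards the ref so the allocator can report lost executors.
    private def registerAM(driverRef: RpcEndpointRef): YarnAllocator =
      client.register("<driver URL>", driverRef)

    def run(): Unit = {
      val driverRef = obtainDriverRef()
      registerAM(driverRef)
    }
  }
}
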
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 37a789fcd3..58318bf9bc 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -27,10 +27,14 @@ import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.scalatest.{BeforeAndAfterEach, Matchers}
+import org.scalatest.{BeforeAndAfterEach, Matchers}
+import org.mockito.Mockito._
+
import org.apache.spark.{SecurityManager, SparkFunSuite}
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.YarnAllocator._
+import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.SplitInfo
class MockResolver extends DNSToSwitchMapping {
@@ -90,6 +94,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
"--class", "SomeClass")
new YarnAllocator(
"not used",
+ mock(classOf[RpcEndpointRef]),
conf,
sparkConf,
rmClient,
@@ -230,6 +235,30 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.getNumPendingAllocate should be (1)
}
+ test("lost executor removed from backend") {
+ val handler = createAllocator(4)
+ handler.updateResourceRequests()
+ handler.getNumExecutorsRunning should be (0)
+ handler.getNumPendingAllocate should be (4)
+
+ val container1 = createContainer("host1")
+ val container2 = createContainer("host2")
+ handler.handleAllocatedContainers(Array(container1, container2))
+
+ handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map())
+
+ val statuses = Seq(container1, container2).map { c =>
+ ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Failed", -1)
+ }
+ handler.updateResourceRequests()
+ handler.processCompletedContainers(statuses.toSeq)
+ handler.updateResourceRequests()
+ handler.getNumExecutorsRunning should be (0)
+ handler.getNumPendingAllocate should be (2)
+ handler.getNumExecutorsFailed should be (2)
+ handler.getNumUnexpectedContainerRelease should be (2)
+ }
+
test("memory exceeded diagnostic regexes") {
val diagnostics =
"Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +