aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authortgravescs <tgraves_cs@yahoo.com>2013-11-19 12:39:26 -0600
committertgravescs <tgraves_cs@yahoo.com>2013-11-19 12:44:00 -0600
commit4093e9393aef95793f2d1d77fd0bbe80c8bb8d11 (patch)
treefc899a11dc2fd20ecadc947fd2e8fea44425d008 /yarn
parente2ebc3a9d8bca83bf842b134f2f056c1af0ad2be (diff)
downloadspark-4093e9393aef95793f2d1d77fd0bbe80c8bb8d11.tar.gz
spark-4093e9393aef95793f2d1d77fd0bbe80c8bb8d11.tar.bz2
spark-4093e9393aef95793f2d1d77fd0bbe80c8bb8d11.zip
Impove Spark on Yarn Error handling
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala39
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala32
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala16
3 files changed, 58 insertions, 29 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 0e47bd7a10..89b00415da 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -52,7 +52,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
private var isLastAMRetry: Boolean = true
-
+ // default to numWorkers * 2, with minimum of 3
+ private val maxNumWorkerFailures = System.getProperty("spark.yarn.max.worker.failures",
+ math.max(args.numWorkers * 2, 3).toString()).toInt
def run() {
// setup the directories so things go to yarn approved directories rather
@@ -225,12 +227,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
if (null != sparkContext) {
uiAddress = sparkContext.ui.appUIAddress
- this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args,
- sparkContext.preferredNodeLocationData)
+ this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager,
+ appAttemptId, args, sparkContext.preferredNodeLocationData)
} else {
logWarning("Unable to retrieve sparkContext inspite of waiting for " + count * waitTime +
- ", numTries = " + numTries)
- this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args)
+ ", numTries = " + numTries)
+ this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager,
+ appAttemptId, args)
}
}
} finally {
@@ -249,8 +252,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
while(yarnAllocator.getNumWorkersRunning < args.numWorkers &&
// If user thread exists, then quit !
userThread.isAlive) {
-
- this.yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
+ if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+ finishApplicationMaster(FinalApplicationStatus.FAILED,
+ "max number of worker failures reached")
+ }
+ yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
ApplicationMaster.incrementAllocatorLoop(1)
Thread.sleep(100)
}
@@ -266,21 +272,27 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
- // must be <= timeoutInterval/ 2.
- // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
- // so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
- val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
+
+ // we want to be reasonably responsive without causing too many requests to RM.
+ val schedulerInterval =
+ System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+
+ // must be <= timeoutInterval / 2.
+ val interval = math.min(timeoutInterval / 2, schedulerInterval)
launchReporterThread(interval)
}
}
- // TODO: We might want to extend this to allocate more containers in case they die !
private def launchReporterThread(_sleepTime: Long): Thread = {
val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
val t = new Thread {
override def run() {
while (userThread.isAlive) {
+ if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+ finishApplicationMaster(FinalApplicationStatus.FAILED,
+ "max number of worker failures reached")
+ }
val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
if (missingWorkerCount > 0) {
logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
@@ -319,7 +331,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
*/
- def finishApplicationMaster(status: FinalApplicationStatus) {
+ def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {
synchronized {
if (isFinished) {
@@ -333,6 +345,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
finishReq.setFinishApplicationStatus(status)
+ finishReq.setDiagnostics(diagnostics)
// set tracking url to empty since we don't have a history server
finishReq.setTrackingUrl("")
resourceManager.finishApplicationMaster(finishReq)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index c38bdd14ec..1078d5b826 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -60,6 +60,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short)
def run() {
+ validateArgs()
+
init(yarnConf)
start()
logClusterResourceDetails()
@@ -84,6 +86,23 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
System.exit(0)
}
+ def validateArgs() = {
+ Map((System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
+ (args.userJar == null) -> "Error: You must specify a user jar!",
+ (args.userClass == null) -> "Error: You must specify a user class!",
+ (args.numWorkers <= 0) -> "Error: You must specify atleast 1 worker!",
+ (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) ->
+ ("Error: AM memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD),
+ (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) ->
+ ("Error: Worker memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString()))
+ .foreach { case(cond, errStr) =>
+ if (cond) {
+ logError(errStr)
+ args.printUsageAndExit(1)
+ }
+ }
+ }
+
def getAppStagingDir(appId: ApplicationId): String = {
SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
}
@@ -97,7 +116,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
", queueMaxCapacity=" + queueInfo.getMaximumCapacity + ", queueApplicationCount=" + queueInfo.getApplications.size +
", queueChildQueueCount=" + queueInfo.getChildQueues.size)
}
-
def verifyClusterResources(app: GetNewApplicationResponse) = {
val maxMem = app.getMaximumResourceCapability().getMemory()
@@ -215,11 +233,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- if (System.getenv("SPARK_JAR") == null || args.userJar == null) {
- logError("Error: You must set SPARK_JAR environment variable and specify a user jar!")
- System.exit(1)
- }
-
Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar,
Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF"))
.foreach { case(destName, _localPath) =>
@@ -334,7 +347,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
JAVA_OPTS += " -Djava.io.tmpdir=" +
new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
-
// Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
// The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same
// node, spark gc effects all other containers performance (which can also be other spark containers)
@@ -360,11 +372,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
}
- if (args.userClass == null) {
- logError("Error: You must specify a user class!")
- System.exit(1)
- }
-
val commands = List[String](javaCommand +
" -server " +
JAVA_OPTS +
@@ -442,6 +449,7 @@ object Client {
System.setProperty("SPARK_YARN_MODE", "true")
val args = new ClientArguments(argStrings)
+
new Client(args).run
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 25da9aa917..507a0743fd 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -72,9 +72,11 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
// Used to generate a unique id per worker
private val workerIdCounter = new AtomicInteger()
private val lastResponseId = new AtomicInteger()
+ private val numWorkersFailed = new AtomicInteger()
def getNumWorkersRunning: Int = numWorkersRunning.intValue
+ def getNumWorkersFailed: Int = numWorkersFailed.intValue
def isResourceConstraintSatisfied(container: Container): Boolean = {
container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
@@ -253,8 +255,16 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
else {
// simply decrement count - next iteration of ReporterThread will take care of allocating !
numWorkersRunning.decrementAndGet()
- logInfo("Container completed ? nodeId: " + containerId + ", state " + completedContainer.getState +
- " httpaddress: " + completedContainer.getDiagnostics)
+ logInfo("Container completed not by us ? nodeId: " + containerId + ", state " + completedContainer.getState +
+ " httpaddress: " + completedContainer.getDiagnostics + " exit status: " + completedContainer.getExitStatus())
+
+ // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
+ // there are some exit status' we shouldn't necessarily count against us, but for
+ // now I think its ok as none of the containers are expected to exit
+ if (completedContainer.getExitStatus() != 0) {
+ logInfo("Container marked as failed: " + containerId)
+ numWorkersFailed.incrementAndGet()
+ }
}
allocatedHostToContainersMap.synchronized {
@@ -378,8 +388,6 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
val releasedContainerList = createReleasedContainerList()
req.addAllReleases(releasedContainerList)
-
-
if (numWorkers > 0) {
logInfo("Allocating " + numWorkers + " worker containers with " + (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + " of memory each.")
}