 core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala | 67 ++++++++++++++++++++++++++++++++++++++++++++++++---------------------
 1 file changed, 46 insertions(+), 21 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index f8bee3eea5..e53c4fb5b4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
import java.io.{FileInputStream, InputStream}
import java.util.{NoSuchElementException, Properties}
+import scala.util.control.NonFatal
import scala.xml.{Node, XML}
import org.apache.spark.SparkConf
@@ -55,7 +56,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
extends SchedulableBuilder with Logging {
- val schedulerAllocFile = conf.getOption("spark.scheduler.allocation.file")
+ val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file"
+ val schedulerAllocFile = conf.getOption(SCHEDULER_ALLOCATION_FILE_PROPERTY)
val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
val DEFAULT_POOL_NAME = "default"
@@ -69,19 +71,35 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
val DEFAULT_WEIGHT = 1
override def buildPools() {
- var is: Option[InputStream] = None
+ var fileData: Option[(InputStream, String)] = None
try {
- is = Option {
- schedulerAllocFile.map { f =>
- new FileInputStream(f)
- }.getOrElse {
- Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
+ fileData = schedulerAllocFile.map { f =>
+ val fis = new FileInputStream(f)
+ logInfo(s"Creating Fair Scheduler pools from $f")
+ Some((fis, f))
+ }.getOrElse {
+ val is = Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
+ if (is != null) {
+ logInfo(s"Creating Fair Scheduler pools from default file: $DEFAULT_SCHEDULER_FILE")
+ Some((is, DEFAULT_SCHEDULER_FILE))
+ } else {
+ logWarning("Fair Scheduler configuration file not found so jobs will be scheduled in " +
+ s"FIFO order. To use fair scheduling, configure pools in $DEFAULT_SCHEDULER_FILE or " +
+ s"set $SCHEDULER_ALLOCATION_FILE_PROPERTY to a file that contains the configuration.")
+ None
}
}
- is.foreach { i => buildFairSchedulerPool(i) }
+ fileData.foreach { case (is, fileName) => buildFairSchedulerPool(is, fileName) }
+ } catch {
+ case NonFatal(t) =>
+ val defaultMessage = "Error while building the fair scheduler pools"
+ val message = fileData.map { case (is, fileName) => s"$defaultMessage from $fileName" }
+ .getOrElse(defaultMessage)
+ logError(message, t)
+ throw t
} finally {
- is.foreach(_.close())
+ fileData.foreach { case (is, fileName) => is.close() }
}
// finally create "default" pool
@@ -93,24 +111,27 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(pool)
- logInfo("Created default pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
+ logInfo("Created default pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
}
}
- private def buildFairSchedulerPool(is: InputStream) {
+ private def buildFairSchedulerPool(is: InputStream, fileName: String) {
val xml = XML.load(is)
for (poolNode <- (xml \\ POOLS_PROPERTY)) {
val poolName = (poolNode \ POOL_NAME_PROPERTY).text
- val schedulingMode = getSchedulingModeValue(poolNode, poolName, DEFAULT_SCHEDULING_MODE)
- val minShare = getIntValue(poolNode, poolName, MINIMUM_SHARES_PROPERTY, DEFAULT_MINIMUM_SHARE)
- val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY, DEFAULT_WEIGHT)
+ val schedulingMode = getSchedulingModeValue(poolNode, poolName,
+ DEFAULT_SCHEDULING_MODE, fileName)
+ val minShare = getIntValue(poolNode, poolName, MINIMUM_SHARES_PROPERTY,
+ DEFAULT_MINIMUM_SHARE, fileName)
+ val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY,
+ DEFAULT_WEIGHT, fileName)
rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, weight))
- logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
+ logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, schedulingMode, minShare, weight))
}
}
@@ -118,11 +139,13 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
private def getSchedulingModeValue(
poolNode: Node,
poolName: String,
- defaultValue: SchedulingMode): SchedulingMode = {
+ defaultValue: SchedulingMode,
+ fileName: String): SchedulingMode = {
val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase
- val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode, using the default " +
- s"schedulingMode: $defaultValue for pool: $poolName"
+ val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode found in " +
+ s"Fair Scheduler configuration file: $fileName, using " +
+ s"the default schedulingMode: $defaultValue for pool: $poolName"
try {
if (SchedulingMode.withName(xmlSchedulingMode) != SchedulingMode.NONE) {
SchedulingMode.withName(xmlSchedulingMode)
@@ -140,14 +163,16 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
private def getIntValue(
poolNode: Node,
poolName: String,
- propertyName: String, defaultValue: Int): Int = {
+ propertyName: String,
+ defaultValue: Int,
+ fileName: String): Int = {
val data = (poolNode \ propertyName).text.trim
try {
data.toInt
} catch {
case e: NumberFormatException =>
- logWarning(s"Error while loading scheduler allocation file. " +
+ logWarning(s"Error while loading fair scheduler configuration from $fileName: " +
s"$propertyName is blank or invalid: $data, using the default $propertyName: " +
s"$defaultValue for pool: $poolName")
defaultValue
@@ -166,7 +191,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(parentPool)
- logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
+ logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
}
}
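
For context, here is a minimal, hypothetical sketch (not part of the patch) of how the code touched by this diff is typically exercised end to end: an allocation file using the element names this builder reads (pool, schedulingMode, minShare, weight, per Spark's documented fairscheduler.xml format), the spark.scheduler.allocation.file property that the new logging refers to, and the standard spark.scheduler.pool local property for submitting jobs into a named pool. The pool names, temp-file path, and job are illustrative assumptions.

    // Hypothetical usage sketch; pool names and file path are made up for illustration.
    import java.nio.charset.StandardCharsets
    import java.nio.file.Files

    import org.apache.spark.{SparkConf, SparkContext}

    object FairPoolExample {
      def main(args: Array[String]): Unit = {
        // Allocation file in the layout buildFairSchedulerPool parses:
        // one <pool> element per pool with schedulingMode, minShare, weight.
        val poolXml =
          """<?xml version="1.0"?>
            |<allocations>
            |  <pool name="production">
            |    <schedulingMode>FAIR</schedulingMode>
            |    <weight>2</weight>
            |    <minShare>2</minShare>
            |  </pool>
            |  <pool name="adhoc">
            |    <schedulingMode>FIFO</schedulingMode>
            |    <weight>1</weight>
            |    <minShare>0</minShare>
            |  </pool>
            |</allocations>
            |""".stripMargin
        val allocFile = Files.createTempFile("fairscheduler", ".xml")
        Files.write(allocFile, poolXml.getBytes(StandardCharsets.UTF_8))

        val conf = new SparkConf()
          .setAppName("fair-pool-example")
          .setMaster("local[2]")
          // FAIR mode selects FairSchedulableBuilder instead of FIFOSchedulableBuilder.
          .set("spark.scheduler.mode", "FAIR")
          // The property named by SCHEDULER_ALLOCATION_FILE_PROPERTY in the diff;
          // it is opened with FileInputStream, so a local path is expected here.
          .set("spark.scheduler.allocation.file", allocFile.toString)
        val sc = new SparkContext(conf)

        // Jobs submitted from this thread are scheduled in the "production" pool.
        sc.setLocalProperty("spark.scheduler.pool", "production")
        sc.parallelize(1 to 1000).count()

        sc.stop()
      }
    }

If neither spark.scheduler.allocation.file nor a fairscheduler.xml on the classpath is present, the new warning in buildPools applies and jobs are scheduled in FIFO order within the default pool.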