diff options
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala | 70 |
1 files changed, 45 insertions, 25 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index 96325a0329..f8bee3eea5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -20,10 +20,11 @@ package org.apache.spark.scheduler import java.io.{FileInputStream, InputStream} import java.util.{NoSuchElementException, Properties} -import scala.xml.XML +import scala.xml.{Node, XML} import org.apache.spark.SparkConf import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.util.Utils /** @@ -102,38 +103,57 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) for (poolNode <- (xml \\ POOLS_PROPERTY)) { val poolName = (poolNode \ POOL_NAME_PROPERTY).text - var schedulingMode = DEFAULT_SCHEDULING_MODE - var minShare = DEFAULT_MINIMUM_SHARE - var weight = DEFAULT_WEIGHT - - val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text - if (xmlSchedulingMode != "") { - try { - schedulingMode = SchedulingMode.withName(xmlSchedulingMode) - } catch { - case e: NoSuchElementException => - logWarning(s"Unsupported schedulingMode: $xmlSchedulingMode, " + - s"using the default schedulingMode: $schedulingMode") - } - } - val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text - if (xmlMinShare != "") { - minShare = xmlMinShare.toInt - } + val schedulingMode = getSchedulingModeValue(poolNode, poolName, DEFAULT_SCHEDULING_MODE) + val minShare = getIntValue(poolNode, poolName, MINIMUM_SHARES_PROPERTY, DEFAULT_MINIMUM_SHARE) + val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY, DEFAULT_WEIGHT) - val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text - if (xmlWeight != "") { - weight = xmlWeight.toInt - } + rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, weight)) - val pool = new Pool(poolName, schedulingMode, minShare, weight) - rootPool.addSchedulable(pool) logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format( poolName, schedulingMode, minShare, weight)) } } + private def getSchedulingModeValue( + poolNode: Node, + poolName: String, + defaultValue: SchedulingMode): SchedulingMode = { + + val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase + val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode, using the default " + + s"schedulingMode: $defaultValue for pool: $poolName" + try { + if (SchedulingMode.withName(xmlSchedulingMode) != SchedulingMode.NONE) { + SchedulingMode.withName(xmlSchedulingMode) + } else { + logWarning(warningMessage) + defaultValue + } + } catch { + case e: NoSuchElementException => + logWarning(warningMessage) + defaultValue + } + } + + private def getIntValue( + poolNode: Node, + poolName: String, + propertyName: String, defaultValue: Int): Int = { + + val data = (poolNode \ propertyName).text.trim + try { + data.toInt + } catch { + case e: NumberFormatException => + logWarning(s"Error while loading scheduler allocation file. " + + s"$propertyName is blank or invalid: $data, using the default $propertyName: " + + s"$defaultValue for pool: $poolName") + defaultValue + } + } + override def addTaskSetManager(manager: Schedulable, properties: Properties) { var poolName = DEFAULT_POOL_NAME var parentPool = rootPool.getSchedulableByName(poolName) |