aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorReynold Xin <reynoldx@gmail.com>2013-09-22 15:06:48 -0700
committerReynold Xin <reynoldx@gmail.com>2013-09-22 15:06:48 -0700
commit834686b108ce31cbee531d89de6c6e80913448f4 (patch)
tree71239dda6351242bb27c1ff7d82789fbb93918bf
parenta2ea069a5f2ed83268109deade456dc0fc9b79ee (diff)
parent77e9da1f34a0b9e556d7c0bbd4aeaa5c635b881d (diff)
downloadspark-834686b108ce31cbee531d89de6c6e80913448f4.tar.gz
spark-834686b108ce31cbee531d89de6c6e80913448f4.tar.bz2
spark-834686b108ce31cbee531d89de6c6e80913448f4.zip
Merge pull request #928 from jerryshao/fairscheduler-refactor
Refactor FairSchedulableBuilder
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala99
1 files changed, 56 insertions, 43 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
index f80823317b..114617c51a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
@@ -17,14 +17,12 @@
package org.apache.spark.scheduler.cluster
-import java.io.{File, FileInputStream, FileOutputStream, FileNotFoundException}
-import java.util.Properties
-
-import scala.xml.XML
+import java.io.{FileInputStream, InputStream}
+import java.util.{NoSuchElementException, Properties}
import org.apache.spark.Logging
-import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+import scala.xml.XML
/**
* An interface to build Schedulable tree
@@ -51,7 +49,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
private[spark] class FairSchedulableBuilder(val rootPool: Pool)
extends SchedulableBuilder with Logging {
- val schedulerAllocFile = System.getProperty("spark.scheduler.allocation.file")
+ val schedulerAllocFile = Option(System.getProperty("spark.scheduler.allocation.file"))
+ val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
val DEFAULT_POOL_NAME = "default"
val MINIMUM_SHARES_PROPERTY = "minShare"
@@ -64,48 +63,26 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
val DEFAULT_WEIGHT = 1
override def buildPools() {
- if (schedulerAllocFile != null) {
- val file = new File(schedulerAllocFile)
- if (file.exists()) {
- val xml = XML.loadFile(file)
- for (poolNode <- (xml \\ POOLS_PROPERTY)) {
-
- val poolName = (poolNode \ POOL_NAME_PROPERTY).text
- var schedulingMode = DEFAULT_SCHEDULING_MODE
- var minShare = DEFAULT_MINIMUM_SHARE
- var weight = DEFAULT_WEIGHT
-
- val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
- if (xmlSchedulingMode != "") {
- try {
- schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
- } catch {
- case e: Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
- }
- }
-
- val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
- if (xmlMinShare != "") {
- minShare = xmlMinShare.toInt
- }
-
- val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
- if (xmlWeight != "") {
- weight = xmlWeight.toInt
- }
-
- val pool = new Pool(poolName, schedulingMode, minShare, weight)
- rootPool.addSchedulable(pool)
- logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
- poolName, schedulingMode, minShare, weight))
+ var is: Option[InputStream] = None
+ try {
+ is = Option {
+ schedulerAllocFile.map { f =>
+ new FileInputStream(f)
+ }.getOrElse {
+ getClass.getClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
}
- } else {
- throw new java.io.FileNotFoundException(
- "Fair scheduler allocation file not found: " + schedulerAllocFile)
}
+
+ is.foreach { i => buildFairSchedulerPool(i) }
+ } finally {
+ is.foreach(_.close())
}
// finally create "default" pool
+ buildDefaultPool()
+ }
+
+ private def buildDefaultPool() {
if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
@@ -115,6 +92,42 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
}
}
+ private def buildFairSchedulerPool(is: InputStream) {
+ val xml = XML.load(is)
+ for (poolNode <- (xml \\ POOLS_PROPERTY)) {
+
+ val poolName = (poolNode \ POOL_NAME_PROPERTY).text
+ var schedulingMode = DEFAULT_SCHEDULING_MODE
+ var minShare = DEFAULT_MINIMUM_SHARE
+ var weight = DEFAULT_WEIGHT
+
+ val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
+ if (xmlSchedulingMode != "") {
+ try {
+ schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
+ } catch {
+ case e: NoSuchElementException =>
+ logWarning("Error xml schedulingMode, using default schedulingMode")
+ }
+ }
+
+ val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
+ if (xmlMinShare != "") {
+ minShare = xmlMinShare.toInt
+ }
+
+ val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
+ if (xmlWeight != "") {
+ weight = xmlWeight.toInt
+ }
+
+ val pool = new Pool(poolName, schedulingMode, minShare, weight)
+ rootPool.addSchedulable(pool)
+ logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
+ poolName, schedulingMode, minShare, weight))
+ }
+ }
+
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
var poolName = DEFAULT_POOL_NAME
var parentPool = rootPool.getSchedulableByName(poolName)