aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjerryshao <saisai.shao@intel.com>2013-09-12 10:44:15 +0800
committerjerryshao <saisai.shao@intel.com>2013-09-22 14:20:55 +0800
commit5850f599dd93d2871e4e9777531cbbcae80a2475 (patch)
tree3eb4cfd85967c3ab3cc684bba6cf92ce63829421
parenta2ea069a5f2ed83268109deade456dc0fc9b79ee (diff)
downloadspark-5850f599dd93d2871e4e9777531cbbcae80a2475.tar.gz
spark-5850f599dd93d2871e4e9777531cbbcae80a2475.tar.bz2
spark-5850f599dd93d2871e4e9777531cbbcae80a2475.zip
Refactor FairSchedulableBuilder:
1. Configuration can be read from classpath if not set explicitly. 2. Add missing close handler.
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala92
1 files changed, 53 insertions, 39 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
index f80823317b..f25924befa 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
@@ -17,7 +17,7 @@
package org.apache.spark.scheduler.cluster
-import java.io.{File, FileInputStream, FileOutputStream, FileNotFoundException}
+import java.io.{File, FileInputStream, FileOutputStream, FileNotFoundException, InputStream}
import java.util.Properties
import scala.xml.XML
@@ -51,7 +51,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
private[spark] class FairSchedulableBuilder(val rootPool: Pool)
extends SchedulableBuilder with Logging {
- val schedulerAllocFile = System.getProperty("spark.scheduler.allocation.file")
+ val schedulerAllocFile = Option(System.getProperty("spark.scheduler.allocation.file"))
+ val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
val DEFAULT_POOL_NAME = "default"
val MINIMUM_SHARES_PROPERTY = "minShare"
@@ -64,48 +65,26 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
val DEFAULT_WEIGHT = 1
override def buildPools() {
- if (schedulerAllocFile != null) {
- val file = new File(schedulerAllocFile)
- if (file.exists()) {
- val xml = XML.loadFile(file)
- for (poolNode <- (xml \\ POOLS_PROPERTY)) {
-
- val poolName = (poolNode \ POOL_NAME_PROPERTY).text
- var schedulingMode = DEFAULT_SCHEDULING_MODE
- var minShare = DEFAULT_MINIMUM_SHARE
- var weight = DEFAULT_WEIGHT
-
- val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
- if (xmlSchedulingMode != "") {
- try {
- schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
- } catch {
- case e: Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
- }
- }
-
- val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
- if (xmlMinShare != "") {
- minShare = xmlMinShare.toInt
- }
-
- val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
- if (xmlWeight != "") {
- weight = xmlWeight.toInt
- }
-
- val pool = new Pool(poolName, schedulingMode, minShare, weight)
- rootPool.addSchedulable(pool)
- logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
- poolName, schedulingMode, minShare, weight))
+ var is: Option[InputStream] = None
+ try {
+ is = Option {
+ schedulerAllocFile map { f =>
+ new FileInputStream(f)
+ } getOrElse {
+ getClass.getClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
}
- } else {
- throw new java.io.FileNotFoundException(
- "Fair scheduler allocation file not found: " + schedulerAllocFile)
}
+
+ is foreach { i => buildFairSchedulerPool(i) }
+ } finally {
+ is.foreach(_.close)
}
// finally create "default" pool
+ buildDefaultPool()
+ }
+
+ private def buildDefaultPool() {
if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
@@ -115,6 +94,41 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
}
}
+ private def buildFairSchedulerPool(is: InputStream) {
+ val xml = XML.load(is)
+ for (poolNode <- (xml \\ POOLS_PROPERTY)) {
+
+ val poolName = (poolNode \ POOL_NAME_PROPERTY).text
+ var schedulingMode = DEFAULT_SCHEDULING_MODE
+ var minShare = DEFAULT_MINIMUM_SHARE
+ var weight = DEFAULT_WEIGHT
+
+ val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
+ if (xmlSchedulingMode != "") {
+ try {
+ schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
+ } catch {
+ case e: Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
+ }
+ }
+
+ val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
+ if (xmlMinShare != "") {
+ minShare = xmlMinShare.toInt
+ }
+
+ val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
+ if (xmlWeight != "") {
+ weight = xmlWeight.toInt
+ }
+
+ val pool = new Pool(poolName, schedulingMode, minShare, weight)
+ rootPool.addSchedulable(pool)
+ logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
+ poolName, schedulingMode, minShare, weight))
+ }
+ }
+
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
var poolName = DEFAULT_POOL_NAME
var parentPool = rootPool.getSchedulableByName(poolName)