author     Reynold Xin <reynoldx@gmail.com>    2013-08-14 17:19:42 -0700
committer  Reynold Xin <reynoldx@gmail.com>    2013-08-14 17:19:42 -0700
commit     3886b5493341b2abdf12d4d8052145399a6e590a (patch)
tree       f5301195179bcb8b307b0c0d12032ffb45e4c03b /core
parent     839f2d4f3f7f39615c1c840b0d7c9394da6a2e64 (diff)
A few small scheduler / job description changes.
1. Renamed SparkContext.addLocalProperty to setLocalProperty, and allow this function to unset a property.
2. Renamed SparkContext.setDescription to setCurrentJobDescription.
3. Throw an exception if the fair scheduler allocation file is invalid.
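
As a quick illustration of the renamed API, here is a minimal sketch assuming an already-constructed SparkContext named `sc`; the pool name and description strings are hypothetical, while the property key and method names come from the diff below.

    // Route subsequent jobs submitted from this thread to a named fair-scheduler pool.
    sc.setLocalProperty("spark.scheduler.cluster.fair.pool", "production")

    // Passing null now unsets the property instead of storing it.
    sc.setLocalProperty("spark.scheduler.cluster.fair.pool", null)

    // Renamed from setDescription; tags jobs from this thread with a human readable description.
    sc.setCurrentJobDescription("nightly log aggregation")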
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                          | 14
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala  | 78
-rw-r--r--  core/src/main/scala/spark/ui/UIWorkloadGenerator.scala                |  4
-rw-r--r--  core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala         | 48
4 files changed, 74 insertions(+), 70 deletions(-)
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 1069e27513..a9851c1722 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -267,16 +267,20 @@ class SparkContext(
localProperties.value = new Properties()
}
- def addLocalProperty(key: String, value: String) {
- if(localProperties.value == null) {
+ def setLocalProperty(key: String, value: String) {
+ if (localProperties.value == null) {
localProperties.value = new Properties()
}
- localProperties.value.setProperty(key,value)
+ if (value == null) {
+ localProperties.value.remove(key)
+ } else {
+ localProperties.value.setProperty(key, value)
+ }
}
/** Set a human readable description of the current job. */
- def setDescription(value: String) {
- addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
+ def setCurrentJobDescription(value: String) {
+ setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
}
// Post init
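
One detail worth calling out in the hunk above: java.util.Properties is a Hashtable subclass, so setProperty(key, null) would throw a NullPointerException, which is why a null value is routed to remove(key) instead. A minimal standalone sketch of the resulting unset semantics (plain Properties with an illustrative key, not SparkContext's thread-local instance, though it behaves the same way):

    import java.util.Properties

    val props = new Properties()
    props.setProperty("some.key", "ad-hoc query")
    // props.setProperty("some.key", null) would throw NullPointerException,
    // so the new setLocalProperty branch calls remove(key) when value == null.
    props.remove("some.key")
    assert(props.getProperty("some.key") == null)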
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
index b2d089f31d..2fc8a76a05 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
@@ -17,19 +17,14 @@
package spark.scheduler.cluster
-import java.io.{File, FileInputStream, FileOutputStream}
+import java.io.{File, FileInputStream, FileOutputStream, FileNotFoundException}
+import java.util.Properties
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-import scala.util.control.Breaks._
-import scala.xml._
+import scala.xml.XML
import spark.Logging
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
-import java.util.Properties
/**
* An interface to build Schedulable tree
@@ -56,7 +51,7 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
private[spark] class FairSchedulableBuilder(val rootPool: Pool)
extends SchedulableBuilder with Logging {
- val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified")
+ val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file")
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool"
val DEFAULT_POOL_NAME = "default"
val MINIMUM_SHARES_PROPERTY = "minShare"
@@ -69,39 +64,44 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
val DEFAULT_WEIGHT = 1
override def buildPools() {
+ if (schedulerAllocFile != null) {
val file = new File(schedulerAllocFile)
- if (file.exists()) {
- val xml = XML.loadFile(file)
- for (poolNode <- (xml \\ POOLS_PROPERTY)) {
-
- val poolName = (poolNode \ POOL_NAME_PROPERTY).text
- var schedulingMode = DEFAULT_SCHEDULING_MODE
- var minShare = DEFAULT_MINIMUM_SHARE
- var weight = DEFAULT_WEIGHT
-
- val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
- if (xmlSchedulingMode != "") {
- try {
- schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
- } catch {
- case e: Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
+ if (file.exists()) {
+ val xml = XML.loadFile(file)
+ for (poolNode <- (xml \\ POOLS_PROPERTY)) {
+
+ val poolName = (poolNode \ POOL_NAME_PROPERTY).text
+ var schedulingMode = DEFAULT_SCHEDULING_MODE
+ var minShare = DEFAULT_MINIMUM_SHARE
+ var weight = DEFAULT_WEIGHT
+
+ val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
+ if (xmlSchedulingMode != "") {
+ try {
+ schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
+ } catch {
+ case e: Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
+ }
}
- }
- val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
- if (xmlMinShare != "") {
- minShare = xmlMinShare.toInt
- }
+ val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
+ if (xmlMinShare != "") {
+ minShare = xmlMinShare.toInt
+ }
- val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
- if (xmlWeight != "") {
- weight = xmlWeight.toInt
- }
+ val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
+ if (xmlWeight != "") {
+ weight = xmlWeight.toInt
+ }
- val pool = new Pool(poolName, schedulingMode, minShare, weight)
- rootPool.addSchedulable(pool)
- logInfo("Create new pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
- poolName, schedulingMode, minShare, weight))
+ val pool = new Pool(poolName, schedulingMode, minShare, weight)
+ rootPool.addSchedulable(pool)
+ logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
+ poolName, schedulingMode, minShare, weight))
+ }
+ } else {
+ throw new java.io.FileNotFoundException(
+ "Fair scheduler allocation file not found: " + schedulerAllocFile)
}
}
@@ -110,7 +110,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(pool)
- logInfo("Create default pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
+ logInfo("Created default pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
}
}
@@ -127,7 +127,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(parentPool)
- logInfo("Create pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
+ logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
}
}
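
To see the new failure mode in context: the allocation file location is read from the system property shown above, and with this patch a configured-but-missing file now raises an error instead of silently yielding only the default pool. A hedged sketch follows; the property name comes from the diff, while the file path is a hypothetical example.

    // Point the fair scheduler at an allocation file (hypothetical path).
    System.setProperty("spark.fairscheduler.allocation.file", "/etc/spark/fairscheduler.xml")

    // After this change, FairSchedulableBuilder.buildPools() throws
    // java.io.FileNotFoundException if that file does not exist.
    // Leaving the property unset skips file parsing, so only the default pool is built.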
diff --git a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
index 97ea644021..0dfb1a064c 100644
--- a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
@@ -48,9 +48,9 @@ private[spark] object UIWorkloadGenerator {
def setProperties(s: String) = {
if(schedulingMode == SchedulingMode.FAIR) {
- sc.addLocalProperty("spark.scheduler.cluster.fair.pool", s)
+ sc.setLocalProperty("spark.scheduler.cluster.fair.pool", s)
}
- sc.addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s)
+ sc.setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s)
}
val baseData = sc.makeRDD(1 to NUM_PARTITIONS * 10, NUM_PARTITIONS)
diff --git a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
index 66fd59e8bb..a79b8bf256 100644
--- a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
@@ -57,23 +57,23 @@ object TaskThreadInfo {
* 1. each thread contains one job.
* 2. each job contains one stage.
* 3. each stage only contains one task.
- * 4. each task(launched) must be lanched orderly(using threadToStarted) to make sure
- * it will get cpu core resource, and will wait to finished after user manually
- * release "Lock" and then cluster will contain another free cpu cores.
- * 5. each task(pending) must use "sleep" to make sure it has been added to taskSetManager queue,
+ * 4. each task(launched) must be lanched orderly(using threadToStarted) to make sure
+ * it will get cpu core resource, and will wait to finished after user manually
+ * release "Lock" and then cluster will contain another free cpu cores.
+ * 5. each task(pending) must use "sleep" to make sure it has been added to taskSetManager queue,
* thus it will be scheduled later when cluster has free cpu cores.
*/
class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
def createThread(threadIndex: Int, poolName: String, sc: SparkContext, sem: Semaphore) {
-
+
TaskThreadInfo.threadToRunning(threadIndex) = false
val nums = sc.parallelize(threadIndex to threadIndex, 1)
TaskThreadInfo.threadToLock(threadIndex) = new Lock()
TaskThreadInfo.threadToStarted(threadIndex) = new CountDownLatch(1)
new Thread {
if (poolName != null) {
- sc.addLocalProperty("spark.scheduler.cluster.fair.pool", poolName)
+ sc.setLocalProperty("spark.scheduler.cluster.fair.pool", poolName)
}
override def run() {
val ans = nums.map(number => {
@@ -88,7 +88,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
}
}.start()
}
-
+
test("Local FIFO scheduler end-to-end test") {
System.setProperty("spark.cluster.schedulingmode", "FIFO")
sc = new SparkContext("local[4]", "test")
@@ -103,8 +103,8 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
createThread(4,null,sc,sem)
TaskThreadInfo.threadToStarted(4).await()
// thread 5 and 6 (stage pending)must meet following two points
- // 1. stages (taskSetManager) of jobs in thread 5 and 6 should be add to taskSetManager
- // queue before executing TaskThreadInfo.threadToLock(1).jobFinished()
+ // 1. stages (taskSetManager) of jobs in thread 5 and 6 should be add to taskSetManager
+ // queue before executing TaskThreadInfo.threadToLock(1).jobFinished()
// 2. priority of stage in thread 5 should be prior to priority of stage in thread 6
// So I just use "sleep" 1s here for each thread.
// TODO: any better solution?
@@ -112,24 +112,24 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
Thread.sleep(1000)
createThread(6,null,sc,sem)
Thread.sleep(1000)
-
+
assert(TaskThreadInfo.threadToRunning(1) === true)
assert(TaskThreadInfo.threadToRunning(2) === true)
assert(TaskThreadInfo.threadToRunning(3) === true)
assert(TaskThreadInfo.threadToRunning(4) === true)
assert(TaskThreadInfo.threadToRunning(5) === false)
assert(TaskThreadInfo.threadToRunning(6) === false)
-
+
TaskThreadInfo.threadToLock(1).jobFinished()
TaskThreadInfo.threadToStarted(5).await()
-
+
assert(TaskThreadInfo.threadToRunning(1) === false)
assert(TaskThreadInfo.threadToRunning(2) === true)
assert(TaskThreadInfo.threadToRunning(3) === true)
assert(TaskThreadInfo.threadToRunning(4) === true)
assert(TaskThreadInfo.threadToRunning(5) === true)
assert(TaskThreadInfo.threadToRunning(6) === false)
-
+
TaskThreadInfo.threadToLock(3).jobFinished()
TaskThreadInfo.threadToStarted(6).await()
@@ -139,7 +139,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
assert(TaskThreadInfo.threadToRunning(4) === true)
assert(TaskThreadInfo.threadToRunning(5) === true)
assert(TaskThreadInfo.threadToRunning(6) === true)
-
+
TaskThreadInfo.threadToLock(2).jobFinished()
TaskThreadInfo.threadToLock(4).jobFinished()
TaskThreadInfo.threadToLock(5).jobFinished()
@@ -160,18 +160,18 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
TaskThreadInfo.threadToStarted(20).await()
createThread(30,"3",sc,sem)
TaskThreadInfo.threadToStarted(30).await()
-
+
assert(TaskThreadInfo.threadToRunning(10) === true)
assert(TaskThreadInfo.threadToRunning(20) === true)
assert(TaskThreadInfo.threadToRunning(30) === true)
-
+
createThread(11,"1",sc,sem)
TaskThreadInfo.threadToStarted(11).await()
createThread(21,"2",sc,sem)
TaskThreadInfo.threadToStarted(21).await()
createThread(31,"3",sc,sem)
TaskThreadInfo.threadToStarted(31).await()
-
+
assert(TaskThreadInfo.threadToRunning(11) === true)
assert(TaskThreadInfo.threadToRunning(21) === true)
assert(TaskThreadInfo.threadToRunning(31) === true)
@@ -185,19 +185,19 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
assert(TaskThreadInfo.threadToRunning(12) === true)
assert(TaskThreadInfo.threadToRunning(22) === true)
assert(TaskThreadInfo.threadToRunning(32) === false)
-
+
TaskThreadInfo.threadToLock(10).jobFinished()
TaskThreadInfo.threadToStarted(32).await()
-
+
assert(TaskThreadInfo.threadToRunning(32) === true)
- //1. Similar with above scenario, sleep 1s for stage of 23 and 33 to be added to taskSetManager
+ //1. Similar with above scenario, sleep 1s for stage of 23 and 33 to be added to taskSetManager
// queue so that cluster will assign free cpu core to stage 23 after stage 11 finished.
//2. priority of 23 and 33 will be meaningless as using fair scheduler here.
createThread(23,"2",sc,sem)
createThread(33,"3",sc,sem)
Thread.sleep(1000)
-
+
TaskThreadInfo.threadToLock(11).jobFinished()
TaskThreadInfo.threadToStarted(23).await()
@@ -206,7 +206,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
TaskThreadInfo.threadToLock(12).jobFinished()
TaskThreadInfo.threadToStarted(33).await()
-
+
assert(TaskThreadInfo.threadToRunning(33) === true)
TaskThreadInfo.threadToLock(20).jobFinished()
@@ -217,7 +217,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
TaskThreadInfo.threadToLock(31).jobFinished()
TaskThreadInfo.threadToLock(32).jobFinished()
TaskThreadInfo.threadToLock(33).jobFinished()
-
- sem.acquire(11)
+
+ sem.acquire(11)
}
}
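
For readers unfamiliar with the coordination pattern the test comments describe, here is a generic, self-contained sketch of the same idea using only java.util.concurrent primitives; it uses no Spark APIs and all names are illustrative, not the suite's helpers. Each worker signals a latch once it is running, then blocks until the driver thread explicitly releases it, which is how the suite forces tasks to start and finish in a deterministic order.

    import java.util.concurrent.{CountDownLatch, Semaphore}

    val started  = new CountDownLatch(1)   // worker signals "I am running"
    val release  = new Semaphore(0)        // driver releases the worker to finish
    val finished = new Semaphore(0)        // worker signals completion

    new Thread {
      override def run() {
        started.countDown()     // comparable to threadToStarted(i) counting down in the suite
        release.acquire()       // comparable to waiting on the per-thread Lock
        finished.release()      // comparable to sem.release() in the suite
      }
    }.start()

    started.await()             // driver knows the task holds a core and is running
    release.release()           // driver lets the task finish (the "jobFinished" step)
    finished.acquire()          // driver waits for completion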