diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-09-07 00:34:12 -0400 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-09-08 00:29:11 -0700 |
commit | 651a96adf7b53085bd810e153f8eabf52eed1994 (patch) | |
tree | 70e9c70470c93c4630de0f958eaed4b98706d2ba /core/src | |
parent | 98fb69822cf780160bca51abeaab7c82e49fab54 (diff) | |
download | spark-651a96adf7b53085bd810e153f8eabf52eed1994.tar.gz spark-651a96adf7b53085bd810e153f8eabf52eed1994.tar.bz2 spark-651a96adf7b53085bd810e153f8eabf52eed1994.zip |
More fair scheduler docs and property names.
Also changed uses of "job" terminology to "application" when they
referred to an entire Spark program, to avoid confusion.
Diffstat (limited to 'core/src')
6 files changed, 13 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 89318712a5..edf71c9db6 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -260,7 +260,7 @@ class SparkContext( private val localProperties = new DynamicVariable[Properties](null) def initLocalProperties() { - localProperties.value = new Properties() + localProperties.value = new Properties() } def setLocalProperty(key: String, value: String) { @@ -723,7 +723,8 @@ class SparkContext( val callSite = Utils.formatSparkCallSite logInfo("Starting job: " + callSite) val start = System.nanoTime - val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler, localProperties.value) + val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler, + localProperties.value) logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s") rdd.doCheckpoint() result diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala index d04eeb6b98..f80823317b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala @@ -51,8 +51,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging { - val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file") - val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool" + val schedulerAllocFile = System.getProperty("spark.scheduler.allocation.file") + val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool" val DEFAULT_POOL_NAME = "default" val MINIMUM_SHARES_PROPERTY = "minShare" val SCHEDULING_MODE_PROPERTY = "schedulingMode" @@ -60,7 +60,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) val POOL_NAME_PROPERTY = "@name" val POOLS_PROPERTY = "pool" val DEFAULT_SCHEDULING_MODE = SchedulingMode.FIFO - val DEFAULT_MINIMUM_SHARE = 2 + val DEFAULT_MINIMUM_SHARE = 0 val DEFAULT_WEIGHT = 1 override def buildPools() { diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala index 2ae23cd523..3ec9760ed0 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala @@ -49,7 +49,7 @@ private[spark] object UIWorkloadGenerator { def setProperties(s: String) = { if(schedulingMode == SchedulingMode.FAIR) { - sc.setLocalProperty("spark.scheduler.cluster.fair.pool", s) + sc.setLocalProperty("spark.scheduler.pool", s) } sc.setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s) } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index e2bcd98545..5d46f38a2a 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -95,7 +95,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList activeStages += stage val poolName = Option(stageSubmitted.properties).map { - p => p.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME) + p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME) }.getOrElse(DEFAULT_POOL_NAME) stageToPool(stage) = poolName diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala index 92ad9f09b2..2b0d90e748 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala @@ -166,7 +166,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging val taskSet = new TaskSet(tasks.toArray,0,0,0,null) val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile() - System.setProperty("spark.fairscheduler.allocation.file", xmlPath) + System.setProperty("spark.scheduler.allocation.file", xmlPath) val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) val schedulableBuilder = new FairSchedulableBuilder(rootPool) schedulableBuilder.buildPools() @@ -183,9 +183,9 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging assert(rootPool.getSchedulableByName("3").weight === 1) val properties1 = new Properties() - properties1.setProperty("spark.scheduler.cluster.fair.pool","1") + properties1.setProperty("spark.scheduler.pool","1") val properties2 = new Properties() - properties2.setProperty("spark.scheduler.cluster.fair.pool","2") + properties2.setProperty("spark.scheduler.pool","2") val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, clusterScheduler, taskSet) val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, clusterScheduler, taskSet) diff --git a/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala index ca9c590a7d..af76c843e8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala @@ -73,7 +73,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext { TaskThreadInfo.threadToStarted(threadIndex) = new CountDownLatch(1) new Thread { if (poolName != null) { - sc.setLocalProperty("spark.scheduler.cluster.fair.pool", poolName) + sc.setLocalProperty("spark.scheduler.pool", poolName) } override def run() { val ans = nums.map(number => { @@ -152,7 +152,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext { val sem = new Semaphore(0) System.setProperty("spark.scheduler.mode", "FAIR") val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile() - System.setProperty("spark.fairscheduler.allocation.file", xmlPath) + System.setProperty("spark.scheduler.allocation.file", xmlPath) createThread(10,"1",sc,sem) TaskThreadInfo.threadToStarted(10).await() |