From 651a96adf7b53085bd810e153f8eabf52eed1994 Mon Sep 17 00:00:00 2001
From: Matei Zaharia
Date: Sat, 7 Sep 2013 00:34:12 -0400
Subject: More fair scheduler docs and property names.

Also changed uses of "job" terminology to "application" when they
referred to an entire Spark program, to avoid confusion.
---
 core/src/main/scala/org/apache/spark/SparkContext.scala           | 5 +++--
 .../org/apache/spark/scheduler/cluster/SchedulableBuilder.scala   | 6 +++---
 core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala | 2 +-
 .../main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 2 +-
 4 files changed, 8 insertions(+), 7 deletions(-)

(limited to 'core/src/main/scala')

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 89318712a5..edf71c9db6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -260,7 +260,7 @@ class SparkContext(
   private val localProperties = new DynamicVariable[Properties](null)
 
   def initLocalProperties() {
-    localProperties.value = new Properties() 
+    localProperties.value = new Properties()
   }
 
   def setLocalProperty(key: String, value: String) {
@@ -723,7 +723,8 @@ class SparkContext(
     val callSite = Utils.formatSparkCallSite
     logInfo("Starting job: " + callSite)
     val start = System.nanoTime
-    val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler, localProperties.value)
+    val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler,
+      localProperties.value)
     logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
     rdd.doCheckpoint()
     result
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
index d04eeb6b98..f80823317b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
@@ -51,8 +51,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
 private[spark] class FairSchedulableBuilder(val rootPool: Pool)
   extends SchedulableBuilder with Logging {
 
-  val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file")
-  val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool"
+  val schedulerAllocFile = System.getProperty("spark.scheduler.allocation.file")
+  val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
   val DEFAULT_POOL_NAME = "default"
   val MINIMUM_SHARES_PROPERTY = "minShare"
   val SCHEDULING_MODE_PROPERTY = "schedulingMode"
@@ -60,7 +60,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
   val POOL_NAME_PROPERTY = "@name"
   val POOLS_PROPERTY = "pool"
   val DEFAULT_SCHEDULING_MODE = SchedulingMode.FIFO
-  val DEFAULT_MINIMUM_SHARE = 2
+  val DEFAULT_MINIMUM_SHARE = 0
   val DEFAULT_WEIGHT = 1
 
   override def buildPools() {
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index 2ae23cd523..3ec9760ed0 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -49,7 +49,7 @@ private[spark] object UIWorkloadGenerator {
 
     def setProperties(s: String) = {
       if(schedulingMode == SchedulingMode.FAIR) {
-        sc.setLocalProperty("spark.scheduler.cluster.fair.pool", s)
+        sc.setLocalProperty("spark.scheduler.pool", s)
       }
       sc.setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s)
     }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index e2bcd98545..5d46f38a2a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -95,7 +95,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
     activeStages += stage
 
     val poolName = Option(stageSubmitted.properties).map {
-      p => p.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME)
+      p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
     }.getOrElse(DEFAULT_POOL_NAME)
     stageToPool(stage) = poolName
 
--
cgit v1.2.3