diff options
Diffstat (limited to 'core')
8 files changed, 19 insertions, 18 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4f711a5ea6..29407bcd30 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -260,7 +260,7 @@ class SparkContext( private val localProperties = new DynamicVariable[Properties](null) def initLocalProperties() { - localProperties.value = new Properties() + localProperties.value = new Properties() } def setLocalProperty(key: String, value: String) { @@ -723,7 +723,8 @@ class SparkContext( val callSite = Utils.formatSparkCallSite logInfo("Starting job: " + callSite) val start = System.nanoTime - val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler, localProperties.value) + val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler, + localProperties.value) logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s") rdd.doCheckpoint() result diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala index 3196ab5022..919acce828 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala @@ -94,7 +94,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) var rootPool: Pool = null // default scheduler is FIFO val schedulingMode: SchedulingMode = SchedulingMode.withName( - System.getProperty("spark.cluster.schedulingmode", "FIFO")) + System.getProperty("spark.scheduler.mode", "FIFO")) override def setListener(listener: TaskSchedulerListener) { this.listener = listener diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala index d04eeb6b98..f80823317b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala @@ -51,8 +51,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging { - val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file") - val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool" + val schedulerAllocFile = System.getProperty("spark.scheduler.allocation.file") + val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool" val DEFAULT_POOL_NAME = "default" val MINIMUM_SHARES_PROPERTY = "minShare" val SCHEDULING_MODE_PROPERTY = "schedulingMode" @@ -60,7 +60,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) val POOL_NAME_PROPERTY = "@name" val POOLS_PROPERTY = "pool" val DEFAULT_SCHEDULING_MODE = SchedulingMode.FIFO - val DEFAULT_MINIMUM_SHARE = 2 + val DEFAULT_MINIMUM_SHARE = 0 val DEFAULT_WEIGHT = 1 override def buildPools() { diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala index e8fa5e2f17..8cb4d1396f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala @@ -91,7 +91,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: var schedulableBuilder: SchedulableBuilder = null var rootPool: Pool = null val schedulingMode: SchedulingMode = SchedulingMode.withName( - System.getProperty("spark.cluster.schedulingmode", "FIFO")) + System.getProperty("spark.scheduler.mode", "FIFO")) val activeTaskSets = new HashMap[String, TaskSetManager] val taskIdToTaskSetId = new HashMap[Long, String] val taskSetTaskIds = new HashMap[String, HashSet[Long]] diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala index 0ecb22d2f9..3ec9760ed0 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala @@ -43,13 +43,13 @@ private[spark] object UIWorkloadGenerator { val appName = "Spark UI Tester" if (schedulingMode == SchedulingMode.FAIR) { - System.setProperty("spark.cluster.schedulingmode", "FAIR") + System.setProperty("spark.scheduler.mode", "FAIR") } val sc = new SparkContext(master, appName) def setProperties(s: String) = { if(schedulingMode == SchedulingMode.FAIR) { - sc.setLocalProperty("spark.scheduler.cluster.fair.pool", s) + sc.setLocalProperty("spark.scheduler.pool", s) } sc.setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s) } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index e2bcd98545..5d46f38a2a 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -95,7 +95,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList activeStages += stage val poolName = Option(stageSubmitted.properties).map { - p => p.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME) + p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME) }.getOrElse(DEFAULT_POOL_NAME) stageToPool(stage) = poolName diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala index 92ad9f09b2..1b50ce06b3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala @@ -166,7 +166,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging val taskSet = new TaskSet(tasks.toArray,0,0,0,null) val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile() - System.setProperty("spark.fairscheduler.allocation.file", xmlPath) + System.setProperty("spark.scheduler.allocation.file", xmlPath) val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) val schedulableBuilder = new FairSchedulableBuilder(rootPool) schedulableBuilder.buildPools() @@ -179,13 +179,13 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging assert(rootPool.getSchedulableByName("1").weight === 1) assert(rootPool.getSchedulableByName("2").minShare === 3) assert(rootPool.getSchedulableByName("2").weight === 1) - assert(rootPool.getSchedulableByName("3").minShare === 2) + assert(rootPool.getSchedulableByName("3").minShare === 0) assert(rootPool.getSchedulableByName("3").weight === 1) val properties1 = new Properties() - properties1.setProperty("spark.scheduler.cluster.fair.pool","1") + properties1.setProperty("spark.scheduler.pool","1") val properties2 = new Properties() - properties2.setProperty("spark.scheduler.cluster.fair.pool","2") + properties2.setProperty("spark.scheduler.pool","2") val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, clusterScheduler, taskSet) val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, clusterScheduler, taskSet) diff --git a/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala index 111340a65c..af76c843e8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala @@ -73,7 +73,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext { TaskThreadInfo.threadToStarted(threadIndex) = new CountDownLatch(1) new Thread { if (poolName != null) { - sc.setLocalProperty("spark.scheduler.cluster.fair.pool", poolName) + sc.setLocalProperty("spark.scheduler.pool", poolName) } override def run() { val ans = nums.map(number => { @@ -90,7 +90,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext { } test("Local FIFO scheduler end-to-end test") { - System.setProperty("spark.cluster.schedulingmode", "FIFO") + System.setProperty("spark.scheduler.mode", "FIFO") sc = new SparkContext("local[4]", "test") val sem = new Semaphore(0) @@ -150,9 +150,9 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext { test("Local fair scheduler end-to-end test") { sc = new SparkContext("local[8]", "LocalSchedulerSuite") val sem = new Semaphore(0) - System.setProperty("spark.cluster.schedulingmode", "FAIR") + System.setProperty("spark.scheduler.mode", "FAIR") val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile() - System.setProperty("spark.fairscheduler.allocation.file", xmlPath) + System.setProperty("spark.scheduler.allocation.file", xmlPath) createThread(10,"1",sc,sem) TaskThreadInfo.threadToStarted(10).await() |