From 1a28f92711cb59ad99bc9e3dd84a5990181e572b Mon Sep 17 00:00:00 2001
From: Andrew xia
Date: Fri, 29 Mar 2013 08:34:28 +0800
Subject: fix some typos and spacing

---
 core/src/main/scala/spark/SparkContext.scala           | 12 ++++++------
 core/src/main/scala/spark/scheduler/DAGScheduler.scala |  4 ++--
 core/src/main/scala/spark/scheduler/Stage.scala        |  5 ++---
 3 files changed, 10 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 6eccb501c7..ed5f686379 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -74,7 +74,7 @@ class SparkContext(
   if (System.getProperty("spark.driver.port") == null) {
     System.setProperty("spark.driver.port", "0")
   }
-  
+
   //Set the default task scheduler
   if (System.getProperty("spark.cluster.taskscheduler") == null) {
     System.setProperty("spark.cluster.taskscheduler", "spark.scheduler.cluster.FIFOTaskSetQueuesManager")
@@ -119,7 +119,7 @@ class SparkContext(
     }
   }
   executorEnvs ++= environment
-  
+
   // Create and start the scheduler
   private var taskScheduler: TaskScheduler = {
     // Regular expression used for local[N] master format
@@ -216,14 +216,14 @@ class SparkContext(
   }

   private[spark] var checkpointDir: Option[String] = None
-  
+
   // Thread Local variable that can be used by users to pass information down the stack
   private val localProperties = new DynamicVariable[Properties](null)
-  
+
   def initLocalProperties() {
     localProperties.value = new Properties()
   }
-  
+
   def addLocalProperties(key: String, value: String) {
     if(localProperties.value == null) {
       localProperties.value = new Properties()
     }
@@ -673,7 +673,7 @@ class SparkContext(
     val processFunc = (context: TaskContext, iter: Iterator[T]) => processPartition(iter)
     runJob[T, U](rdd, processFunc, 0 until rdd.partitions.size, false, resultHandler)
   }
-  
+
   /**
    * Run a job that can return approximate results.
    */
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 0a64a4f041..abc24c0270 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -333,7 +333,7 @@ class DAGScheduler(
       submitStage(stage)
     }
   }
-  
+
   /**
   * Check for waiting or failed stages which are now eligible for resubmission.
   * Ordinarily run on every iteration of the event loop.
@@ -720,7 +720,7 @@ class DAGScheduler(
       sizeBefore = shuffleToMapStage.size
       shuffleToMapStage.clearOldValues(cleanupTime)
       logInfo("shuffleToMapStage " + sizeBefore + " --> " + shuffleToMapStage.size)
-  
+
       sizeBefore = pendingTasks.size
       pendingTasks.clearOldValues(cleanupTime)
       logInfo("pendingTasks " + sizeBefore + " --> " + pendingTasks.size)
diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala
index bc54cd601d..7fc9e13fd9 100644
--- a/core/src/main/scala/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/spark/scheduler/Stage.scala
@@ -5,7 +5,6 @@ import java.net.URI
 import spark._
 import spark.storage.BlockManagerId
 
-
 /**
  * A stage is a set of independent tasks all computing the same function that need to run as part
  * of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run
@@ -27,7 +26,7 @@ private[spark] class Stage(
     val parents: List[Stage],
     val priority: Int)
   extends Logging {
-  
+
   val isShuffleMap = shuffleDep != None
   val numPartitions = rdd.partitions.size
   val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
@@ -61,7 +60,7 @@ private[spark] class Stage(
       numAvailableOutputs -= 1
     }
   }
-  
+
   def removeOutputsOnExecutor(execId: String) {
     var becameUnavailable = false
     for (partition <- 0 until numPartitions) {
-- 
cgit v1.2.3
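
Editor's note (not part of the patch): the SparkContext hunk at line 216 shows a small
thread-local properties API, initLocalProperties() and addLocalProperties(key, value),
backed by a scala.util.DynamicVariable[Properties]. A minimal Scala usage sketch follows,
assuming an already-constructed SparkContext `sc`; the property key "job.group" is a
hypothetical example, not something this patch defines.

    // Illustrative sketch only; "job.group" is a made-up key.
    sc.initLocalProperties()                   // fresh Properties for the current thread
    sc.addLocalProperties("job.group", "etl")  // visible to code further down this thread's stack
    val lineCount = sc.textFile("data.txt").count()  // jobs submitted from this thread see the property

Because localProperties is a DynamicVariable (thread-local, inherited by child threads),
values set this way do not affect jobs submitted from unrelated threads.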