aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew xia <junluan.xia@intel.com>2013-03-29 08:34:28 +0800
committerAndrew xia <junluan.xia@intel.com>2013-03-29 08:34:28 +0800
commit1a28f92711cb59ad99bc9e3dd84a5990181e572b (patch)
treea110f6e7ccc99bd333de590a62d706a47c33f694
parentdef3d1c84a3e0d1371239e9358294a4b4ad46b9f (diff)
downloadspark-1a28f92711cb59ad99bc9e3dd84a5990181e572b.tar.gz
spark-1a28f92711cb59ad99bc9e3dd84a5990181e572b.tar.bz2
spark-1a28f92711cb59ad99bc9e3dd84a5990181e572b.zip
change some typo and some spacing
-rw-r--r--core/src/main/scala/spark/SparkContext.scala12
-rw-r--r--core/src/main/scala/spark/scheduler/DAGScheduler.scala4
-rw-r--r--core/src/main/scala/spark/scheduler/Stage.scala5
3 files changed, 10 insertions, 11 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 6eccb501c7..ed5f686379 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -74,7 +74,7 @@ class SparkContext(
if (System.getProperty("spark.driver.port") == null) {
System.setProperty("spark.driver.port", "0")
}
-
+
//Set the default task scheduler
if (System.getProperty("spark.cluster.taskscheduler") == null) {
System.setProperty("spark.cluster.taskscheduler", "spark.scheduler.cluster.FIFOTaskSetQueuesManager")
@@ -119,7 +119,7 @@ class SparkContext(
}
}
executorEnvs ++= environment
-
+
// Create and start the scheduler
private var taskScheduler: TaskScheduler = {
// Regular expression used for local[N] master format
@@ -216,14 +216,14 @@ class SparkContext(
}
private[spark] var checkpointDir: Option[String] = None
-
+
// Thread Local variable that can be used by users to pass information down the stack
private val localProperties = new DynamicVariable[Properties](null)
-
+
def initLocalProperties() {
localProperties.value = new Properties()
}
-
+
def addLocalProperties(key: String, value: String) {
if(localProperties.value == null) {
localProperties.value = new Properties()
@@ -673,7 +673,7 @@ class SparkContext(
val processFunc = (context: TaskContext, iter: Iterator[T]) => processPartition(iter)
runJob[T, U](rdd, processFunc, 0 until rdd.partitions.size, false, resultHandler)
}
-
+
/**
* Run a job that can return approximate results.
*/
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 0a64a4f041..abc24c0270 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -333,7 +333,7 @@ class DAGScheduler(
submitStage(stage)
}
}
-
+
/**
* Check for waiting or failed stages which are now eligible for resubmission.
* Ordinarily run on every iteration of the event loop.
@@ -720,7 +720,7 @@ class DAGScheduler(
sizeBefore = shuffleToMapStage.size
shuffleToMapStage.clearOldValues(cleanupTime)
logInfo("shuffleToMapStage " + sizeBefore + " --> " + shuffleToMapStage.size)
-
+
sizeBefore = pendingTasks.size
pendingTasks.clearOldValues(cleanupTime)
logInfo("pendingTasks " + sizeBefore + " --> " + pendingTasks.size)
diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala
index bc54cd601d..7fc9e13fd9 100644
--- a/core/src/main/scala/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/spark/scheduler/Stage.scala
@@ -5,7 +5,6 @@ import java.net.URI
import spark._
import spark.storage.BlockManagerId
-
/**
* A stage is a set of independent tasks all computing the same function that need to run as part
* of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run
@@ -27,7 +26,7 @@ private[spark] class Stage(
val parents: List[Stage],
val priority: Int)
extends Logging {
-
+
val isShuffleMap = shuffleDep != None
val numPartitions = rdd.partitions.size
val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
@@ -61,7 +60,7 @@ private[spark] class Stage(
numAvailableOutputs -= 1
}
}
-
+
def removeOutputsOnExecutor(execId: String) {
var becameUnavailable = false
for (partition <- 0 until numPartitions) {