author    Andrew xia <junluan.xia@intel.com>  2013-07-30 11:41:38 +0800
committer Andrew xia <junluan.xia@intel.com>  2013-07-30 11:41:38 +0800
commit    5406013997d6b9e9e3f43c09cd3f53ec7b815efb (patch)
tree      593431ed690c44bfea6ad2714c8adf61f3b4edf1 /core
parent    614ee16cc4c63260f13d0c7494fbaafa8a061e95 (diff)
refactor code to keep lines under 100 characters
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                              7
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGScheduler.scala                    3
-rw-r--r--  core/src/main/scala/spark/scheduler/SparkListener.scala                  15
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala        5
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala   3
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala     15
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala          3
-rw-r--r--  core/src/main/scala/spark/ui/jobs/IndexPage.scala                         9
-rw-r--r--  core/src/main/scala/spark/ui/jobs/JobProgressListener.scala               3
9 files changed, 39 insertions(+), 24 deletions(-)
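
Every hunk below applies the same mechanical refactoring: a statement longer
than 100 characters is split after an opening parenthesis or argument list,
and the continuation is indented two extra spaces. A representative
before/after, taken from the ClusterScheduler hunk in this patch:

    // before: the whole call sits on one line of well over 100 characters
    logInfo("parentName:%s,name:%s,runningTasks:%s".format(manager.parent.name, manager.name, manager.runningTasks))

    // after: wrapped at the format() call, continuation indented two spaces
    logInfo("parentName:%s, name:%s, runningTasks:%s".format(
      manager.parent.name, manager.name, manager.runningTasks))
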
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 375636071d..7747160290 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -44,9 +44,10 @@ import org.apache.mesos.MesosNativeLibrary
import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import spark.partial.{ApproximateEvaluator, PartialResult}
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
-import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler, ActiveJob}
-import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler, Schedulable,
-SchedulingMode}
+import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener,
+ SplitInfo, Stage, StageInfo, TaskScheduler, ActiveJob}
+import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
+ ClusterScheduler, Schedulable, SchedulingMode}
import spark.scheduler.local.LocalScheduler
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import spark.storage.{StorageStatus, StorageUtils, RDDInfo}
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index c865743e37..5fda78e152 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -473,7 +473,8 @@ class DAGScheduler(
}
if (tasks.size > 0) {
val properties = idToActiveJob(stage.priority).properties
- sparkListeners.foreach(_.onStageSubmitted(SparkListenerStageSubmitted(stage, tasks.size, properties)))
+ sparkListeners.foreach(_.onStageSubmitted(
+ SparkListenerStageSubmitted(stage, tasks.size, properties)))
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
myPending ++= tasks
logDebug("New pending tasks: " + myPending)
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
index 07372ee786..49f7c85c29 100644
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -8,17 +8,18 @@ import spark.executor.TaskMetrics
sealed trait SparkListenerEvents
-case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int, properties: Properties) extends SparkListenerEvents
+case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int, properties: Properties)
+ extends SparkListenerEvents
case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo,
taskMetrics: TaskMetrics) extends SparkListenerEvents
-case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null)
+case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null)
extends SparkListenerEvents
-case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
+case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
extends SparkListenerEvents
trait SparkListener {
@@ -26,12 +27,12 @@ trait SparkListener {
* Called when a stage is completed, with information on the completed stage
*/
def onStageCompleted(stageCompleted: StageCompleted) { }
-
+
/**
* Called when a stage is submitted
*/
def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { }
-
+
/**
* Called when a task ends
*/
@@ -41,12 +42,12 @@ trait SparkListener {
* Called when a job starts
*/
def onJobStart(jobStart: SparkListenerJobStart) { }
-
+
/**
* Called when a job ends
*/
def onJobEnd(jobEnd: SparkListenerJobEnd) { }
-
+
}
/**
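
As a usage note: the empty default bodies above mean a concrete listener only
overrides the callbacks it needs. A minimal sketch, not part of this patch;
the class name and messages are invented for illustration:

    import spark.scheduler.{SparkListener, SparkListenerStageSubmitted, SparkListenerJobEnd}

    // Hypothetical listener that reacts to two of the five callbacks and
    // inherits the no-op defaults for the rest.
    class LoggingListener extends SparkListener {
      override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
        println("stage submitted with " + stageSubmitted.taskSize + " tasks")
      }
      override def onJobEnd(jobEnd: SparkListenerJobEnd) {
        println("job ended with result " + jobEnd.jobResult)
      }
    }
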
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 74b3e43d2b..20680bbf87 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -9,10 +9,10 @@ import scala.collection.mutable.HashSet
import spark._
import spark.TaskState.TaskState
import spark.scheduler._
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong
import java.util.{TimerTask, Timer}
-import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/**
* The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call
@@ -257,7 +257,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
for (manager <- sortedTaskSetQueue) {
- logInfo("parentName:%s,name:%s,runningTasks:%s".format(manager.parent.name, manager.name, manager.runningTasks))
+ logInfo("parentName:%s, name:%s, runningTasks:%s".format(
+ manager.parent.name, manager.name, manager.runningTasks))
}
for (manager <- sortedTaskSetQueue) {
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 4d11b0959a..5e2351bafd 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -644,7 +644,8 @@ private[spark] class ClusterTaskSetManager(
}
}
- // TODO: for now we just find Pool not TaskSetManager, we can extend this function in future if needed
+ // TODO: for now we just find Pool, not TaskSetManager;
+ // we can extend this function in the future if needed
override def getSchedulableByName(name: String): Schedulable = {
return null
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
index a2fa80aa36..db51b48494 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
@@ -24,7 +24,8 @@ private[spark] trait SchedulableBuilder {
def addTaskSetManager(manager: Schedulable, properties: Properties)
}
-private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging {
+private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
+ extends SchedulableBuilder with Logging {
override def buildPools() {
// nothing
@@ -35,7 +36,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) extends Schedula
}
}
-private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging {
+private[spark] class FairSchedulableBuilder(val rootPool: Pool)
+ extends SchedulableBuilder with Logging {
val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified")
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool"
@@ -88,7 +90,8 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends Schedula
// finally create "default" pool
if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
- val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
+ val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
+ DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(pool)
logInfo("Create default pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
@@ -102,8 +105,10 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends Schedula
poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
parentPool = rootPool.getSchedulableByName(poolName)
if (parentPool == null) {
- // we will create a new pool that user has configured in app instead of being defined in xml file
- parentPool = new Pool(poolName,DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
+ // we will create a new pool that the user has configured in the app
+ // instead of one defined in the xml file
+ parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
+ DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(parentPool)
logInfo("Create pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
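
For orientation, FairSchedulableBuilder resolves pools in two steps: first
from the XML allocation file named by the spark.fairscheduler.allocation.file
system property, then, as in the hunk above, by creating the pool on demand
with the DEFAULT_* constants. A sketch, with an invented file path (the
property's real default shown above is "unspecified"):

    // Point the builder at an allocation file before the scheduler is built;
    // the path here is illustrative, not a real default.
    System.setProperty("spark.fairscheduler.allocation.file", "/etc/spark/fairscheduler.xml")

    // A job naming a pool absent from that file still runs: buildPools has
    // already created the "default" pool, and addTaskSetManager creates the
    // missing pool with DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE and
    // DEFAULT_WEIGHT, as shown in the hunk above.
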
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
index a7f0f6f393..cd0642772d 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
@@ -1,7 +1,8 @@
package spark.scheduler.cluster
/**
- * "FAIR" and "FIFO" determines which policy is used to order tasks amongst a Schedulable's sub-queues
+ * "FAIR" and "FIFO" determines which policy is used
+ * to order tasks amongst a Schedulable's sub-queues
* "NONE" is used when the a Schedulable has no sub-queues.
*/
object SchedulingMode extends Enumeration("FAIR", "FIFO", "NONE") {
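
A small usage sketch for the enumeration, assuming the object body (cut off
in this hunk) defines FAIR, FIFO and NONE as values, as the constructor
strings suggest; the describe function is invented for illustration:

    import spark.scheduler.cluster.SchedulingMode
    import spark.scheduler.cluster.SchedulingMode.SchedulingMode

    // Enumeration values are stable identifiers, so they can be matched on
    // directly; NONE marks a leaf Schedulable with no sub-queues to order.
    def describe(mode: SchedulingMode): String = mode match {
      case SchedulingMode.FAIR => "fair sharing amongst sub-queues"
      case SchedulingMode.FIFO => "strict submission order"
      case SchedulingMode.NONE => "leaf: no sub-queues"
    }
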
diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
index abef683791..04651e9c60 100644
--- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
@@ -40,9 +40,12 @@ private[spark] class IndexPage(parent: JobProgressUI) {
</div>
</div> ++
<h3>Pools </h3> ++ poolTable.toNodeSeq ++
- <h3>Active Stages : {activeStages.size}</h3> ++ activeStagesTable.toNodeSeq++
- <h3>Completed Stages : {completedStages.size}</h3> ++ completedStagesTable.toNodeSeq++
- <h3>Failed Stages : {failedStages.size}</h3> ++ failedStagesTable.toNodeSeq
+ <h3>Active Stages : {activeStages.size}</h3> ++
+ activeStagesTable.toNodeSeq ++
+ <h3>Completed Stages : {completedStages.size}</h3> ++
+ completedStagesTable.toNodeSeq ++
+ <h3>Failed Stages : {failedStages.size}</h3> ++
+ failedStagesTable.toNodeSeq
headerSparkPage(content, parent.sc, "Spark Stages/Pools", Jobs)
}
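
A side note on why the explicit ++ chains above type-check: Scala XML
literals are NodeSeq values, and ++ concatenates them into the single
sequence that headerSparkPage renders. A self-contained illustration with
invented content:

    import scala.xml.NodeSeq

    // Each XML literal is itself a NodeSeq; ++ joins them in document order.
    val heading: NodeSeq = <h3>Active Stages : 3</h3>
    val table: NodeSeq = <table><tr><td>stage 1</td></tr></table>
    val content: NodeSeq = heading ++ table
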
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
index d4767bea22..da767b3c0a 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
@@ -53,7 +53,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
activeStages += stage
var poolName = DEFAULT_POOL_NAME
if (stageSubmitted.properties != null) {
- poolName = stageSubmitted.properties.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME)
+ poolName = stageSubmitted.properties.getProperty("spark.scheduler.cluster.fair.pool",
+ DEFAULT_POOL_NAME)
}
stageToPool(stage) = poolName
val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
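
To trace where that pool name originates: DAGScheduler forwards the
submitting job's Properties inside SparkListenerStageSubmitted (see the
DAGScheduler hunk above), and this listener reads one key from it. A sketch
with an invented pool name, where "default" stands in for DEFAULT_POOL_NAME:

    import java.util.Properties

    // Properties as they might arrive attached to a stage submission.
    val props = new Properties()
    props.setProperty("spark.scheduler.cluster.fair.pool", "production")

    // Mirrors the lookup in onStageSubmitted: an absent key (or a null
    // Properties reference) leaves the stage in the default pool.
    val poolName = props.getProperty("spark.scheduler.cluster.fair.pool", "default")
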