about | summary | refs | log | tree | commit | diff
path: root/core
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2013-10-31 23:42:56 -0700
committerKay Ousterhout <kayousterhout@gmail.com>2013-10-31 23:42:56 -0700
commitfb64828b0b573f3a77938592f168af7aa3a2b6c5 (patch)
tree45328b142c5a7d67b1a461821c71df62f103e8d3 /core
parenta124658e53a5abeda00a2582385a294c8e452d21 (diff)
downloadspark-fb64828b0b573f3a77938592f168af7aa3a2b6c5.tar.gz
spark-fb64828b0b573f3a77938592f168af7aa3a2b6c5.tar.bz2
spark-fb64828b0b573f3a77938592f168af7aa3a2b6c5.zip
Cleaned up imports and fixed test bug
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala | 3
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 1
-rw-r--r-- core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 9
3 files changed, 6 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 3f694dd25d..b4ec695ece 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -27,7 +27,6 @@ import scala.collection.mutable.HashSet
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
-import org.apache.spark.scheduler._
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
/**
@@ -449,7 +448,7 @@ private[spark] class TaskScheduler(val sc: SparkContext, isLocal: Boolean = fals
}
-object TaskScheduler {
+private[spark] object TaskScheduler {
/**
* Used to balance containers across hosts.
*
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 13271b10f3..90b6519027 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -28,7 +28,6 @@ import scala.math.min
import org.apache.spark.{ExceptionFailure, FetchFailed, Logging, Resubmitted, SparkEnv,
Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState}
import org.apache.spark.TaskState.TaskState
-import org.apache.spark.scheduler._
import org.apache.spark.util.{SystemClock, Clock}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 394a1bb06f..5b5a2178f3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -36,14 +36,15 @@ import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
/**
* TaskScheduler that records the task sets that the DAGScheduler requested executed.
*/
-class TaskSetRecordingTaskScheduler(sc: SparkContext) extends TaskScheduler(sc) {
+class TaskSetRecordingTaskScheduler(sc: SparkContext,
+ mapOutputTrackerMaster: MapOutputTrackerMaster) extends TaskScheduler(sc) {
/** Set of TaskSets the DAGScheduler has requested executed. */
val taskSets = scala.collection.mutable.Buffer[TaskSet]()
override def start() = {}
override def stop() = {}
override def submitTasks(taskSet: TaskSet) = {
// normally done by TaskSetManager
- taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
+ taskSet.tasks.foreach(_.epoch = mapOutputTrackerMaster.getEpoch)
taskSets += taskSet
}
override def cancelTasks(stageId: Int) {}
@@ -97,11 +98,11 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
before {
sc = new SparkContext("local", "DAGSchedulerSuite")
- taskScheduler = new TaskSetRecordingTaskScheduler(sc)
+ mapOutputTracker = new MapOutputTrackerMaster()
+ taskScheduler = new TaskSetRecordingTaskScheduler(sc, mapOutputTracker)
taskScheduler.taskSets.clear()
cacheLocations.clear()
results.clear()
- mapOutputTracker = new MapOutputTrackerMaster()
scheduler = new DAGScheduler(taskScheduler, mapOutputTracker, blockManagerMaster, null) {
override def runLocally(job: ActiveJob) {
// don't bother with the thread while unit testing