author     Kay Ousterhout <kayousterhout@gmail.com>   2013-10-30 19:29:38 -0700
committer  Kay Ousterhout <kayousterhout@gmail.com>   2013-10-30 19:29:38 -0700
commit     a124658e53a5abeda00a2582385a294c8e452d21
tree       c667705be40854affbe979403d758b27e49fab98
parent     5e91495f5c718c837b5a5af2268f6faad00d357f
Fixed most issues with unit tests: moved the scheduler test suites out of the cluster package and updated them for the ClusterScheduler -> TaskScheduler and ClusterTaskSetManager -> TaskSetManager renames.
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala                                                                                                         | 94
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala (renamed from core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala)                              |  3
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala (renamed from core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala)    | 13
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerSuite.scala (renamed from core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala)       | 56
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala (renamed from core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala) | 43
5 files changed, 103 insertions(+), 106 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 00f2fdd657..394a1bb06f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -34,6 +34,24 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
/**
+ * TaskScheduler that records the task sets that the DAGScheduler requested executed.
+ */
+class TaskSetRecordingTaskScheduler(sc: SparkContext) extends TaskScheduler(sc) {
+ /** Set of TaskSets the DAGScheduler has requested executed. */
+ val taskSets = scala.collection.mutable.Buffer[TaskSet]()
+ override def start() = {}
+ override def stop() = {}
+ override def submitTasks(taskSet: TaskSet) = {
+ // normally done by TaskSetManager
+ taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
+ taskSets += taskSet
+ }
+ override def cancelTasks(stageId: Int) {}
+ override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
+ override def defaultParallelism() = 2
+}
+
+/**
* Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler
* rather than spawning an event loop thread as happens in the real code. They use EasyMock
* to mock out two classes that DAGScheduler interacts with: TaskScheduler (to which TaskSets are
@@ -46,24 +64,7 @@ import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
* and capturing the resulting TaskSets from the mock TaskScheduler.
*/
class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
-
- /** Set of TaskSets the DAGScheduler has requested executed. */
- val taskSets = scala.collection.mutable.Buffer[TaskSet]()
- val taskScheduler = new TaskScheduler() {
- override def rootPool: Pool = null
- override def schedulingMode: SchedulingMode = SchedulingMode.NONE
- override def start() = {}
- override def stop() = {}
- override def submitTasks(taskSet: TaskSet) = {
- // normally done by TaskSetManager
- taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
- taskSets += taskSet
- }
- override def cancelTasks(stageId: Int) {}
- override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
- override def defaultParallelism() = 2
- }
-
+ var taskScheduler: TaskSetRecordingTaskScheduler = null
var mapOutputTracker: MapOutputTrackerMaster = null
var scheduler: DAGScheduler = null
@@ -96,7 +97,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
before {
sc = new SparkContext("local", "DAGSchedulerSuite")
- taskSets.clear()
+ taskScheduler = new TaskSetRecordingTaskScheduler(sc)
+ taskScheduler.taskSets.clear()
cacheLocations.clear()
results.clear()
mapOutputTracker = new MapOutputTrackerMaster()
@@ -204,7 +206,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
test("run trivial job") {
val rdd = makeRdd(1, Nil)
submit(rdd, Array(0))
- complete(taskSets(0), List((Success, 42)))
+ complete(taskScheduler.taskSets(0), List((Success, 42)))
assert(results === Map(0 -> 42))
}
@@ -225,7 +227,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
val baseRdd = makeRdd(1, Nil)
val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))
submit(finalRdd, Array(0))
- complete(taskSets(0), Seq((Success, 42)))
+ complete(taskScheduler.taskSets(0), Seq((Success, 42)))
assert(results === Map(0 -> 42))
}
@@ -235,7 +237,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
cacheLocations(baseRdd.id -> 0) =
Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))
submit(finalRdd, Array(0))
- val taskSet = taskSets(0)
+ val taskSet = taskScheduler.taskSets(0)
assertLocations(taskSet, Seq(Seq("hostA", "hostB")))
complete(taskSet, Seq((Success, 42)))
assert(results === Map(0 -> 42))
@@ -243,7 +245,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
test("trivial job failure") {
submit(makeRdd(1, Nil), Array(0))
- failed(taskSets(0), "some failure")
+ failed(taskScheduler.taskSets(0), "some failure")
assert(failure.getMessage === "Job aborted: some failure")
}
@@ -253,12 +255,12 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
val shuffleId = shuffleDep.shuffleId
val reduceRdd = makeRdd(1, List(shuffleDep))
submit(reduceRdd, Array(0))
- complete(taskSets(0), Seq(
+ complete(taskScheduler.taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
Array(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
- complete(taskSets(1), Seq((Success, 42)))
+ complete(taskScheduler.taskSets(1), Seq((Success, 42)))
assert(results === Map(0 -> 42))
}
@@ -268,11 +270,11 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
val shuffleId = shuffleDep.shuffleId
val reduceRdd = makeRdd(2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
- complete(taskSets(0), Seq(
+ complete(taskScheduler.taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
// the 2nd ResultTask failed
- complete(taskSets(1), Seq(
+ complete(taskScheduler.taskSets(1), Seq(
(Success, 42),
(FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0), null)))
// this will get called
@@ -280,10 +282,10 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
// ask the scheduler to try it again
scheduler.resubmitFailedStages()
// have the 2nd attempt pass
- complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
+ complete(taskScheduler.taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
// we can see both result blocks now
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) === Array("hostA", "hostB"))
- complete(taskSets(3), Seq((Success, 43)))
+ complete(taskScheduler.taskSets(3), Seq((Success, 43)))
assert(results === Map(0 -> 42, 1 -> 43))
}
@@ -299,7 +301,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
val newEpoch = mapOutputTracker.getEpoch
assert(newEpoch > oldEpoch)
val noAccum = Map[Long, Any]()
- val taskSet = taskSets(0)
+ val taskSet = taskScheduler.taskSets(0)
// should be ignored for being too old
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
// should work because it's a non-failed host
@@ -311,7 +313,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum, null, null))
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
- complete(taskSets(1), Seq((Success, 42), (Success, 43)))
+ complete(taskScheduler.taskSets(1), Seq((Success, 42), (Success, 43)))
assert(results === Map(0 -> 42, 1 -> 43))
}
@@ -326,14 +328,14 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
runEvent(ExecutorLost("exec-hostA"))
// DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks
// rather than marking it is as failed and waiting.
- complete(taskSets(0), Seq(
+ complete(taskScheduler.taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
// have hostC complete the resubmitted task
- complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1))))
+ complete(taskScheduler.taskSets(1), Seq((Success, makeMapStatus("hostC", 1))))
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
Array(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
- complete(taskSets(2), Seq((Success, 42)))
+ complete(taskScheduler.taskSets(2), Seq((Success, 42)))
assert(results === Map(0 -> 42))
}
@@ -345,23 +347,23 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
val finalRdd = makeRdd(1, List(shuffleDepTwo))
submit(finalRdd, Array(0))
// have the first stage complete normally
- complete(taskSets(0), Seq(
+ complete(taskScheduler.taskSets(0), Seq(
(Success, makeMapStatus("hostA", 2)),
(Success, makeMapStatus("hostB", 2))))
// have the second stage complete normally
- complete(taskSets(1), Seq(
+ complete(taskScheduler.taskSets(1), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostC", 1))))
// fail the third stage because hostA went down
- complete(taskSets(2), Seq(
+ complete(taskScheduler.taskSets(2), Seq(
(FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null)))
// TODO assert this:
// blockManagerMaster.removeExecutor("exec-hostA")
// have DAGScheduler try again
scheduler.resubmitFailedStages()
- complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 2))))
- complete(taskSets(4), Seq((Success, makeMapStatus("hostA", 1))))
- complete(taskSets(5), Seq((Success, 42)))
+ complete(taskScheduler.taskSets(3), Seq((Success, makeMapStatus("hostA", 2))))
+ complete(taskScheduler.taskSets(4), Seq((Success, makeMapStatus("hostA", 1))))
+ complete(taskScheduler.taskSets(5), Seq((Success, 42)))
assert(results === Map(0 -> 42))
}
@@ -375,24 +377,24 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD"))
cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC"))
// complete stage 2
- complete(taskSets(0), Seq(
+ complete(taskScheduler.taskSets(0), Seq(
(Success, makeMapStatus("hostA", 2)),
(Success, makeMapStatus("hostB", 2))))
// complete stage 1
- complete(taskSets(1), Seq(
+ complete(taskScheduler.taskSets(1), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
// pretend stage 0 failed because hostA went down
- complete(taskSets(2), Seq(
+ complete(taskScheduler.taskSets(2), Seq(
(FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null)))
// TODO assert this:
// blockManagerMaster.removeExecutor("exec-hostA")
// DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun.
scheduler.resubmitFailedStages()
- assertLocations(taskSets(3), Seq(Seq("hostD")))
+ assertLocations(taskScheduler.taskSets(3), Seq(Seq("hostD")))
// allow hostD to recover
- complete(taskSets(3), Seq((Success, makeMapStatus("hostD", 1))))
- complete(taskSets(4), Seq((Success, 42)))
+ complete(taskScheduler.taskSets(3), Seq((Success, makeMapStatus("hostD", 1))))
+ complete(taskScheduler.taskSets(4), Seq((Success, 42)))
assert(results === Map(0 -> 42))
}
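
For context, the new wiring in DAGSchedulerSuite looks roughly like this once the hunks above are applied; makeRdd, submit, complete, and results are suite helpers defined outside these hunks:

  before {
    sc = new SparkContext("local", "DAGSchedulerSuite")
    // every test gets a fresh recording scheduler bound to the fresh SparkContext
    taskScheduler = new TaskSetRecordingTaskScheduler(sc)
    taskScheduler.taskSets.clear()
    // (cacheLocations, results, and mapOutputTracker setup elided)
  }

  test("run trivial job") {
    // build a one-partition RDD with no dependencies and ask the DAGScheduler to run partition 0
    submit(makeRdd(1, Nil), Array(0))
    // the TaskSet captured by TaskSetRecordingTaskScheduler.submitTasks is completed by hand
    complete(taskScheduler.taskSets(0), List((Success, 42)))
    assert(results === Map(0 -> 42))
  }
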
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
index 0f01515179..0b90c4e74c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
@@ -15,10 +15,9 @@
* limitations under the License.
*/
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
import org.apache.spark.TaskContext
-import org.apache.spark.scheduler.{TaskLocation, Task}
class FakeTask(stageId: Int, prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0) {
override def runTask(context: TaskContext): Int = 0
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index 77d3038614..30e6bc5721 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -15,14 +15,13 @@
* limitations under the License.
*/
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
import java.nio.ByteBuffer
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
import org.apache.spark.{LocalSparkContext, SparkContext, SparkEnv}
-import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult}
import org.apache.spark.storage.TaskResultBlockId
/**
@@ -31,12 +30,12 @@ import org.apache.spark.storage.TaskResultBlockId
* Used to test the case where a BlockManager evicts the task result (or dies) before the
* TaskResult is retrieved.
*/
-class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler)
+class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskScheduler)
extends TaskResultGetter(sparkEnv, scheduler) {
var removedResult = false
override def enqueueSuccessfulTask(
- taskSetManager: ClusterTaskSetManager, tid: Long, serializedData: ByteBuffer) {
+ taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
if (!removedResult) {
// Only remove the result once, since we'd like to test the case where the task eventually
// succeeds.
@@ -92,11 +91,11 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA
test("task retried if result missing from block manager") {
// If this test hangs, it's probably because no resource offers were made after the task
// failed.
- val scheduler: ClusterScheduler = sc.taskScheduler match {
- case clusterScheduler: ClusterScheduler =>
+ val scheduler: TaskScheduler = sc.taskScheduler match {
+ case clusterScheduler: TaskScheduler =>
clusterScheduler
case _ =>
- assert(false, "Expect local cluster to use ClusterScheduler")
+ assert(false, "Expect local cluster to use TaskScheduler")
throw new ClassCastException
}
scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
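
Sketched end to end, the retry test above wires the renamed pieces together like this (the calls appear in the hunk; only the intent comments are additions):

  // grab the TaskScheduler backing the local-cluster SparkContext
  val scheduler: TaskScheduler = sc.taskScheduler match {
    case taskScheduler: TaskScheduler => taskScheduler
    case _ =>
      assert(false, "Expect local cluster to use TaskScheduler")
      throw new ClassCastException
  }
  // ResultDeletingTaskResultGetter drops the serialized result from the BlockManager exactly
  // once, so the first fetch of an indirect task result fails and the task must be retried
  scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
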
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerSuite.scala
index 95d3553d91..bfbffdf261 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerSuite.scala
@@ -15,14 +15,12 @@
* limitations under the License.
*/
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import org.apache.spark._
-import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster._
import scala.collection.mutable.ArrayBuffer
import java.util.Properties
@@ -31,9 +29,9 @@ class FakeTaskSetManager(
initPriority: Int,
initStageId: Int,
initNumTasks: Int,
- clusterScheduler: ClusterScheduler,
+ taskScheduler: TaskScheduler,
taskSet: TaskSet)
- extends ClusterTaskSetManager(clusterScheduler, taskSet) {
+ extends TaskSetManager(taskScheduler, taskSet) {
parent = null
weight = 1
@@ -104,9 +102,9 @@ class FakeTaskSetManager(
}
}
-class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
+class TaskSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
- def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): FakeTaskSetManager = {
+ def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: TaskScheduler, taskSet: TaskSet): FakeTaskSetManager = {
new FakeTaskSetManager(priority, stage, numTasks, cs , taskSet)
}
@@ -132,8 +130,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
}
test("FIFO Scheduler Test") {
- sc = new SparkContext("local", "ClusterSchedulerSuite")
- val clusterScheduler = new ClusterScheduler(sc)
+ sc = new SparkContext("local", "TaskSchedulerSuite")
+ val taskScheduler = new TaskScheduler(sc)
var tasks = ArrayBuffer[Task[_]]()
val task = new FakeTask(0)
tasks += task
@@ -143,9 +141,9 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
schedulableBuilder.buildPools()
- val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, clusterScheduler, taskSet)
- val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, clusterScheduler, taskSet)
- val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, clusterScheduler, taskSet)
+ val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, taskScheduler, taskSet)
+ val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, taskScheduler, taskSet)
+ val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, taskScheduler, taskSet)
schedulableBuilder.addTaskSetManager(taskSetManager0, null)
schedulableBuilder.addTaskSetManager(taskSetManager1, null)
schedulableBuilder.addTaskSetManager(taskSetManager2, null)
@@ -159,8 +157,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
}
test("Fair Scheduler Test") {
- sc = new SparkContext("local", "ClusterSchedulerSuite")
- val clusterScheduler = new ClusterScheduler(sc)
+ sc = new SparkContext("local", "TaskSchedulerSuite")
+ val taskScheduler = new TaskScheduler(sc)
var tasks = ArrayBuffer[Task[_]]()
val task = new FakeTask(0)
tasks += task
@@ -188,15 +186,15 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
val properties2 = new Properties()
properties2.setProperty("spark.scheduler.pool","2")
- val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, clusterScheduler, taskSet)
- val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, clusterScheduler, taskSet)
- val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, clusterScheduler, taskSet)
+ val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, taskScheduler, taskSet)
+ val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, taskScheduler, taskSet)
+ val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, taskScheduler, taskSet)
schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)
- val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, clusterScheduler, taskSet)
- val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, clusterScheduler, taskSet)
+ val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, taskScheduler, taskSet)
+ val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, taskScheduler, taskSet)
schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)
@@ -216,8 +214,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
}
test("Nested Pool Test") {
- sc = new SparkContext("local", "ClusterSchedulerSuite")
- val clusterScheduler = new ClusterScheduler(sc)
+ sc = new SparkContext("local", "TaskSchedulerSuite")
+ val taskScheduler = new TaskScheduler(sc)
var tasks = ArrayBuffer[Task[_]]()
val task = new FakeTask(0)
tasks += task
@@ -239,23 +237,23 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
pool1.addSchedulable(pool10)
pool1.addSchedulable(pool11)
- val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, clusterScheduler, taskSet)
- val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, clusterScheduler, taskSet)
+ val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, taskScheduler, taskSet)
+ val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, taskScheduler, taskSet)
pool00.addSchedulable(taskSetManager000)
pool00.addSchedulable(taskSetManager001)
- val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, clusterScheduler, taskSet)
- val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, clusterScheduler, taskSet)
+ val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, taskScheduler, taskSet)
+ val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, taskScheduler, taskSet)
pool01.addSchedulable(taskSetManager010)
pool01.addSchedulable(taskSetManager011)
- val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, clusterScheduler, taskSet)
- val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, clusterScheduler, taskSet)
+ val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, taskScheduler, taskSet)
+ val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, taskScheduler, taskSet)
pool10.addSchedulable(taskSetManager100)
pool10.addSchedulable(taskSetManager101)
- val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, clusterScheduler, taskSet)
- val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, clusterScheduler, taskSet)
+ val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, taskScheduler, taskSet)
+ val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, taskScheduler, taskSet)
pool11.addSchedulable(taskSetManager110)
pool11.addSchedulable(taskSetManager111)
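
The FIFO, Fair, and Nested Pool tests above all share the same skeleton; roughly (rootPool and taskSet are built elsewhere in the suite, outside these hunks):

  sc = new SparkContext("local", "TaskSchedulerSuite")
  val taskScheduler = new TaskScheduler(sc)

  // build the pool hierarchy for the scheduling mode under test and register dummy managers
  val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
  schedulableBuilder.buildPools()
  schedulableBuilder.addTaskSetManager(createDummyTaskSetManager(0, 0, 2, taskScheduler, taskSet), null)
  schedulableBuilder.addTaskSetManager(createDummyTaskSetManager(0, 1, 2, taskScheduler, taskSet), null)
  // under FIFO, stage 0's tasks should be offered before stage 1's
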
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index b97f2b19b5..fe3ea7b594 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable
@@ -23,12 +23,11 @@ import scala.collection.mutable
import org.scalatest.FunSuite
import org.apache.spark._
-import org.apache.spark.scheduler._
import org.apache.spark.executor.TaskMetrics
import java.nio.ByteBuffer
import org.apache.spark.util.{Utils, FakeClock}
-class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler(taskScheduler) {
+class FakeDAGScheduler(taskScheduler: FakeTaskScheduler) extends DAGScheduler(taskScheduler) {
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
taskScheduler.startedTasks += taskInfo.index
}
@@ -53,13 +52,13 @@ class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler
}
/**
- * A mock ClusterScheduler implementation that just remembers information about tasks started and
+ * A mock TaskScheduler implementation that just remembers information about tasks started and
* feedback received from the TaskSetManagers. Note that it's important to initialize this with
* a list of "live" executors and their hostnames for isExecutorAlive and hasExecutorsAliveOnHost
- * to work, and these are required for locality in ClusterTaskSetManager.
+ * to work, and these are required for locality in TaskSetManager.
*/
-class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
- extends ClusterScheduler(sc)
+class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
+ extends TaskScheduler(sc)
{
val startedTasks = new ArrayBuffer[Long]
val endedTasks = new mutable.HashMap[Long, TaskEndReason]
@@ -79,16 +78,16 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /*
override def hasExecutorsAliveOnHost(host: String): Boolean = executors.values.exists(_ == host)
}
-class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
+class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL}
val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
test("TaskSet with no preferences") {
sc = new SparkContext("local", "test")
- val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
+ val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = createTaskSet(1)
- val manager = new ClusterTaskSetManager(sched, taskSet)
+ val manager = new TaskSetManager(sched, taskSet)
// Offer a host with no CPUs
assert(manager.resourceOffer("exec1", "host1", 0, ANY) === None)
@@ -112,9 +111,9 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
test("multiple offers with no preferences") {
sc = new SparkContext("local", "test")
- val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
+ val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = createTaskSet(3)
- val manager = new ClusterTaskSetManager(sched, taskSet)
+ val manager = new TaskSetManager(sched, taskSet)
// First three offers should all find tasks
for (i <- 0 until 3) {
@@ -143,7 +142,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
test("basic delay scheduling") {
sc = new SparkContext("local", "test")
- val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+ val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
val taskSet = createTaskSet(4,
Seq(TaskLocation("host1", "exec1")),
Seq(TaskLocation("host2", "exec2")),
@@ -151,7 +150,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
Seq() // Last task has no locality prefs
)
val clock = new FakeClock
- val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+ val manager = new TaskSetManager(sched, taskSet, clock)
// First offer host1, exec1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
@@ -187,7 +186,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
test("delay scheduling with fallback") {
sc = new SparkContext("local", "test")
- val sched = new FakeClusterScheduler(sc,
+ val sched = new FakeTaskScheduler(sc,
("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
val taskSet = createTaskSet(5,
Seq(TaskLocation("host1")),
@@ -197,7 +196,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
Seq(TaskLocation("host2"))
)
val clock = new FakeClock
- val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+ val manager = new TaskSetManager(sched, taskSet, clock)
// First offer host1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
@@ -227,14 +226,14 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
test("delay scheduling with failed hosts") {
sc = new SparkContext("local", "test")
- val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+ val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
val taskSet = createTaskSet(3,
Seq(TaskLocation("host1")),
Seq(TaskLocation("host2")),
Seq(TaskLocation("host3"))
)
val clock = new FakeClock
- val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+ val manager = new TaskSetManager(sched, taskSet, clock)
// First offer host1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
@@ -259,10 +258,10 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
test("task result lost") {
sc = new SparkContext("local", "test")
- val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
+ val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = createTaskSet(1)
val clock = new FakeClock
- val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+ val manager = new TaskSetManager(sched, taskSet, clock)
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
@@ -276,10 +275,10 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
test("repeated failures lead to task set abortion") {
sc = new SparkContext("local", "test")
- val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
+ val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = createTaskSet(1)
val clock = new FakeClock
- val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+ val manager = new TaskSetManager(sched, taskSet, clock)
// Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
// after the last failure.
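
Putting the renames together, the locality tests in this suite follow the same basic shape; createTaskSet is a suite helper and ANY comes from the TaskLocality import shown above:

  sc = new SparkContext("local", "test")
  // the fake scheduler is seeded with live executors so TaskSetManager's locality checks can match them
  val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
  val taskSet = createTaskSet(1)
  val manager = new TaskSetManager(sched, taskSet)

  // an offer with zero free CPUs yields no task ...
  assert(manager.resourceOffer("exec1", "host1", 0, ANY) === None)
  // ... while an offer with one CPU schedules the set's single task
  assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)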