path: root/core/src/test/scala
author     Andrew xia <junluan.xia@intel.com>  2013-06-15 01:34:17 +0800
committer  Andrew xia <junluan.xia@intel.com>  2013-06-15 01:46:13 +0800
commit     53add598f2fe09759a0df1e08f87f70503f808c5 (patch)
tree       4a3e2c814d7e96cc3137eba33668ea1a41fe376f /core/src/test/scala
parent     190ec617997d621c11ed1aab662a6e3a06815d2f (diff)
Update LocalSchedulerSuite to avoid using sleep for task launch
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--  core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala | 83
1 file changed, 59 insertions, 24 deletions
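
The core idea of the patch, shown outside the test for clarity: instead of a fixed
Thread.sleep() and the hope that the task has been launched by then, the task counts
down a per-thread CountDownLatch as soon as it starts running, and the test thread
awaits that latch. A minimal sketch of the pattern (illustrative only; the LaunchSync
object, the "started" name, and the timeout are placeholders, not part of the patch):

import java.util.concurrent.{CountDownLatch, TimeUnit}

object LaunchSync {
  def main(args: Array[String]): Unit = {
    val started = new CountDownLatch(1)
    val worker = new Thread {
      override def run() {
        // Signal that the task has actually started before doing any work,
        // mirroring TaskThreadInfo.threadToStarted(i).countDown() in the suite.
        started.countDown()
        // ... the real task would now block on its Lock until released ...
      }
    }
    worker.start()
    // Deterministic wait instead of Thread.sleep(2000); the timeout guards against hangs.
    assert(started.await(10, TimeUnit.SECONDS), "task did not start in time")
  }
}
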
diff --git a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
index 37d14ed113..8bd813fd14 100644
--- a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
@@ -9,9 +9,7 @@ import spark.scheduler.cluster._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.{ConcurrentMap, HashMap}
import java.util.concurrent.Semaphore
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.atomic.AtomicInteger
-
+import java.util.concurrent.CountDownLatch
import java.util.Properties
class Lock() {
@@ -35,9 +33,19 @@ class Lock() {
object TaskThreadInfo {
val threadToLock = HashMap[Int, Lock]()
val threadToRunning = HashMap[Int, Boolean]()
+ val threadToStarted = HashMap[Int, CountDownLatch]()
}
-
+/*
+ * 1. each thread contains one job.
+ * 2. each job contains one stage.
+ * 3. each stage contains only one task.
+ * 4. each launched task must be launched in order (using threadToStarted) to make sure
+ *    it gets a cpu core; it then waits to finish until its "Lock" is released manually,
+ *    at which point the cluster has a free cpu core again.
+ * 5. each pending task must use "sleep" to make sure it has been added to the
+ *    taskSetManager queue, so that it will be scheduled later when the cluster has free cpu cores.
+ */
class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
def createThread(threadIndex: Int, poolName: String, sc: SparkContext, sem: Semaphore) {
@@ -45,22 +53,23 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
TaskThreadInfo.threadToRunning(threadIndex) = false
val nums = sc.parallelize(threadIndex to threadIndex, 1)
TaskThreadInfo.threadToLock(threadIndex) = new Lock()
+ TaskThreadInfo.threadToStarted(threadIndex) = new CountDownLatch(1)
new Thread {
- if (poolName != null) {
- sc.addLocalProperties("spark.scheduler.cluster.fair.pool",poolName)
- }
- override def run() {
- val ans = nums.map(number => {
- TaskThreadInfo.threadToRunning(number) = true
- TaskThreadInfo.threadToLock(number).jobWait()
- number
- }).collect()
- assert(ans.toList === List(threadIndex))
- sem.release()
- TaskThreadInfo.threadToRunning(threadIndex) = false
- }
+ if (poolName != null) {
+ sc.addLocalProperties("spark.scheduler.cluster.fair.pool",poolName)
+ }
+ override def run() {
+ val ans = nums.map(number => {
+ TaskThreadInfo.threadToRunning(number) = true
+ TaskThreadInfo.threadToStarted(number).countDown()
+ TaskThreadInfo.threadToLock(number).jobWait()
+ TaskThreadInfo.threadToRunning(number) = false
+ number
+ }).collect()
+ assert(ans.toList === List(threadIndex))
+ sem.release()
+ }
}.start()
- Thread.sleep(2000)
}
test("Local FIFO scheduler end-to-end test") {
@@ -69,11 +78,24 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
val sem = new Semaphore(0)
createThread(1,null,sc,sem)
+ TaskThreadInfo.threadToStarted(1).await()
createThread(2,null,sc,sem)
+ TaskThreadInfo.threadToStarted(2).await()
createThread(3,null,sc,sem)
+ TaskThreadInfo.threadToStarted(3).await()
createThread(4,null,sc,sem)
+ TaskThreadInfo.threadToStarted(4).await()
+ // Threads 5 and 6 (whose stages are pending) must meet the following two conditions:
+ // 1. the stages (taskSetManagers) of the jobs in threads 5 and 6 must be added to the
+ //    taskSetManager queue before TaskThreadInfo.threadToLock(1).jobFinished() executes
+ // 2. the priority of the stage in thread 5 must be higher than the priority of the
+ //    stage in thread 6. So I just "sleep" 1s here for each thread.
+ // TODO: any better solution?
createThread(5,null,sc,sem)
+ Thread.sleep(1000)
createThread(6,null,sc,sem)
+ Thread.sleep(1000)
+
assert(TaskThreadInfo.threadToRunning(1) === true)
assert(TaskThreadInfo.threadToRunning(2) === true)
assert(TaskThreadInfo.threadToRunning(3) === true)
@@ -82,8 +104,8 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
assert(TaskThreadInfo.threadToRunning(6) === false)
TaskThreadInfo.threadToLock(1).jobFinished()
- Thread.sleep(1000)
-
+ TaskThreadInfo.threadToStarted(5).await()
+
assert(TaskThreadInfo.threadToRunning(1) === false)
assert(TaskThreadInfo.threadToRunning(2) === true)
assert(TaskThreadInfo.threadToRunning(3) === true)
@@ -92,7 +114,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
assert(TaskThreadInfo.threadToRunning(6) === false)
TaskThreadInfo.threadToLock(3).jobFinished()
- Thread.sleep(1000)
+ TaskThreadInfo.threadToStarted(6).await()
assert(TaskThreadInfo.threadToRunning(1) === false)
assert(TaskThreadInfo.threadToRunning(2) === true)
@@ -116,23 +138,31 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
System.setProperty("spark.fairscheduler.allocation.file", xmlPath)
createThread(10,"1",sc,sem)
+ TaskThreadInfo.threadToStarted(10).await()
createThread(20,"2",sc,sem)
+ TaskThreadInfo.threadToStarted(20).await()
createThread(30,"3",sc,sem)
+ TaskThreadInfo.threadToStarted(30).await()
assert(TaskThreadInfo.threadToRunning(10) === true)
assert(TaskThreadInfo.threadToRunning(20) === true)
assert(TaskThreadInfo.threadToRunning(30) === true)
createThread(11,"1",sc,sem)
+ TaskThreadInfo.threadToStarted(11).await()
createThread(21,"2",sc,sem)
+ TaskThreadInfo.threadToStarted(21).await()
createThread(31,"3",sc,sem)
+ TaskThreadInfo.threadToStarted(31).await()
assert(TaskThreadInfo.threadToRunning(11) === true)
assert(TaskThreadInfo.threadToRunning(21) === true)
assert(TaskThreadInfo.threadToRunning(31) === true)
createThread(12,"1",sc,sem)
+ TaskThreadInfo.threadToStarted(12).await()
createThread(22,"2",sc,sem)
+ TaskThreadInfo.threadToStarted(22).await()
createThread(32,"3",sc,sem)
assert(TaskThreadInfo.threadToRunning(12) === true)
@@ -140,20 +170,25 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
assert(TaskThreadInfo.threadToRunning(32) === false)
TaskThreadInfo.threadToLock(10).jobFinished()
- Thread.sleep(1000)
+ TaskThreadInfo.threadToStarted(32).await()
+
assert(TaskThreadInfo.threadToRunning(32) === true)
+ //1. Similar to the scenario above: sleep 1s so the stages of 23 and 33 are added to the
+ //   taskSetManager queue and the cluster can assign a free cpu core to stage 23 once stage 11 finishes.
+ //2. the priorities of 23 and 33 are irrelevant here, since the fair scheduler is used.
createThread(23,"2",sc,sem)
createThread(33,"3",sc,sem)
+ Thread.sleep(1000)
TaskThreadInfo.threadToLock(11).jobFinished()
- Thread.sleep(1000)
+ TaskThreadInfo.threadToStarted(23).await()
assert(TaskThreadInfo.threadToRunning(23) === true)
assert(TaskThreadInfo.threadToRunning(33) === false)
TaskThreadInfo.threadToLock(12).jobFinished()
- Thread.sleep(1000)
+ TaskThreadInfo.threadToStarted(33).await()
assert(TaskThreadInfo.threadToRunning(33) === true)
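
For reference, the Lock gate behind jobWait()/jobFinished() is declared above ("class Lock() {")
but its body falls outside this diff's context lines. A minimal wait/notify implementation
consistent with how the suite calls it could look like the sketch below (an assumption based on
those calls, not necessarily the exact code in the file):

class Lock() {
  // Assumed sketch: jobWait() blocks the task thread until jobFinished()
  // flips the flag and wakes all waiters, freeing the task's cpu core.
  private var finished = false

  def jobWait() = synchronized {
    while (!finished) {
      wait()
    }
  }

  def jobFinished() = synchronized {
    finished = true
    notifyAll()
  }
}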