| author    | Andrew xia <junluan.xia@intel.com> | 2013-06-15 01:34:17 +0800 |
| committer | Andrew xia <junluan.xia@intel.com> | 2013-06-15 01:46:13 +0800 |
| commit    | 53add598f2fe09759a0df1e08f87f70503f808c5 (patch) |
| tree      | 4a3e2c814d7e96cc3137eba33668ea1a41fe376f /core/src/test |
| parent    | 190ec617997d621c11ed1aab662a6e3a06815d2f (diff) |
Update LocalSchedulerSuite to avoid using sleep for task launch
Diffstat (limited to 'core/src/test')
-rw-r--r-- | core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala | 83 |
1 file changed, 59 insertions, 24 deletions
```diff
diff --git a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
index 37d14ed113..8bd813fd14 100644
--- a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
@@ -9,9 +9,7 @@ import spark.scheduler.cluster._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.{ConcurrentMap, HashMap}
 import java.util.concurrent.Semaphore
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.atomic.AtomicInteger
-
+import java.util.concurrent.CountDownLatch
 import java.util.Properties
 
 class Lock() {
@@ -35,9 +33,19 @@ class Lock() {
 
 object TaskThreadInfo {
   val threadToLock = HashMap[Int, Lock]()
   val threadToRunning = HashMap[Int, Boolean]()
+  val threadToStarted = HashMap[Int, CountDownLatch]()
 }
-
+/*
+ * 1. Each thread contains one job.
+ * 2. Each job contains one stage.
+ * 3. Each stage contains only one task.
+ * 4. Each launched task must be launched in order (using threadToStarted) to make sure
+ *    it gets a cpu core, and it waits to finish until the test manually releases its
+ *    "Lock", at which point the cluster has a free cpu core again.
+ * 5. Each pending task must use "sleep" to make sure it has been added to the
+ *    taskSetManager queue, so it will be scheduled later when the cluster has free cpu cores.
+ */
 class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
 
   def createThread(threadIndex: Int, poolName: String, sc: SparkContext, sem: Semaphore) {
@@ -45,22 +53,23 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
     TaskThreadInfo.threadToRunning(threadIndex) = false
     val nums = sc.parallelize(threadIndex to threadIndex, 1)
     TaskThreadInfo.threadToLock(threadIndex) = new Lock()
+    TaskThreadInfo.threadToStarted(threadIndex) = new CountDownLatch(1)
     new Thread {
-      if (poolName != null) {
-        sc.addLocalProperties("spark.scheduler.cluster.fair.pool",poolName)
-      }
-      override def run() {
-        val ans = nums.map(number => {
-          TaskThreadInfo.threadToRunning(number) = true
-          TaskThreadInfo.threadToLock(number).jobWait()
-          number
-        }).collect()
-        assert(ans.toList === List(threadIndex))
-        sem.release()
-        TaskThreadInfo.threadToRunning(threadIndex) = false
-      }
+      if (poolName != null) {
+        sc.addLocalProperties("spark.scheduler.cluster.fair.pool",poolName)
+      }
+      override def run() {
+        val ans = nums.map(number => {
+          TaskThreadInfo.threadToRunning(number) = true
+          TaskThreadInfo.threadToStarted(number).countDown()
+          TaskThreadInfo.threadToLock(number).jobWait()
+          TaskThreadInfo.threadToRunning(number) = false
+          number
+        }).collect()
+        assert(ans.toList === List(threadIndex))
+        sem.release()
+      }
     }.start()
-    Thread.sleep(2000)
   }
 
   test("Local FIFO scheduler end-to-end test") {
@@ -69,11 +78,24 @@
     val sem = new Semaphore(0)
 
     createThread(1,null,sc,sem)
+    TaskThreadInfo.threadToStarted(1).await()
     createThread(2,null,sc,sem)
+    TaskThreadInfo.threadToStarted(2).await()
     createThread(3,null,sc,sem)
+    TaskThreadInfo.threadToStarted(3).await()
     createThread(4,null,sc,sem)
+    TaskThreadInfo.threadToStarted(4).await()
+    // Threads 5 and 6 (stage pending) must meet the following two conditions:
+    // 1. The stages (taskSetManagers) of the jobs in threads 5 and 6 must be added to the
+    //    taskSetManager queue before TaskThreadInfo.threadToLock(1).jobFinished() executes.
+    // 2. The priority of the stage in thread 5 must be higher than that of the stage in thread 6.
+    // So I just "sleep" 1s here for each thread.
+    // TODO: any better solution?
     createThread(5,null,sc,sem)
+    Thread.sleep(1000)
     createThread(6,null,sc,sem)
+    Thread.sleep(1000)
+
     assert(TaskThreadInfo.threadToRunning(1) === true)
     assert(TaskThreadInfo.threadToRunning(2) === true)
     assert(TaskThreadInfo.threadToRunning(3) === true)
@@ -82,8 +104,8 @@
     assert(TaskThreadInfo.threadToRunning(6) === false)
 
     TaskThreadInfo.threadToLock(1).jobFinished()
-    Thread.sleep(1000)
-
+    TaskThreadInfo.threadToStarted(5).await()
+
     assert(TaskThreadInfo.threadToRunning(1) === false)
     assert(TaskThreadInfo.threadToRunning(2) === true)
     assert(TaskThreadInfo.threadToRunning(3) === true)
@@ -92,7 +114,7 @@
     assert(TaskThreadInfo.threadToRunning(6) === false)
 
     TaskThreadInfo.threadToLock(3).jobFinished()
-    Thread.sleep(1000)
+    TaskThreadInfo.threadToStarted(6).await()
 
     assert(TaskThreadInfo.threadToRunning(1) === false)
     assert(TaskThreadInfo.threadToRunning(2) === true)
@@ -116,23 +138,31 @@
     System.setProperty("spark.fairscheduler.allocation.file", xmlPath)
 
     createThread(10,"1",sc,sem)
+    TaskThreadInfo.threadToStarted(10).await()
     createThread(20,"2",sc,sem)
+    TaskThreadInfo.threadToStarted(20).await()
     createThread(30,"3",sc,sem)
+    TaskThreadInfo.threadToStarted(30).await()
 
     assert(TaskThreadInfo.threadToRunning(10) === true)
     assert(TaskThreadInfo.threadToRunning(20) === true)
     assert(TaskThreadInfo.threadToRunning(30) === true)
 
     createThread(11,"1",sc,sem)
+    TaskThreadInfo.threadToStarted(11).await()
     createThread(21,"2",sc,sem)
+    TaskThreadInfo.threadToStarted(21).await()
     createThread(31,"3",sc,sem)
+    TaskThreadInfo.threadToStarted(31).await()
 
     assert(TaskThreadInfo.threadToRunning(11) === true)
     assert(TaskThreadInfo.threadToRunning(21) === true)
     assert(TaskThreadInfo.threadToRunning(31) === true)
 
     createThread(12,"1",sc,sem)
+    TaskThreadInfo.threadToStarted(12).await()
     createThread(22,"2",sc,sem)
+    TaskThreadInfo.threadToStarted(22).await()
     createThread(32,"3",sc,sem)
 
     assert(TaskThreadInfo.threadToRunning(12) === true)
@@ -140,20 +170,25 @@
     assert(TaskThreadInfo.threadToRunning(32) === false)
 
     TaskThreadInfo.threadToLock(10).jobFinished()
-    Thread.sleep(1000)
+    TaskThreadInfo.threadToStarted(32).await()
+
     assert(TaskThreadInfo.threadToRunning(32) === true)
 
+    // 1. Similar to the scenario above: sleep 1s so the stages of 23 and 33 are added to the
+    //    taskSetManager queue, and the cluster assigns a free cpu core to stage 23 after stage 11 finishes.
+    // 2. The priorities of 23 and 33 are meaningless here, since the fair scheduler is used.
     createThread(23,"2",sc,sem)
     createThread(33,"3",sc,sem)
+    Thread.sleep(1000)
 
     TaskThreadInfo.threadToLock(11).jobFinished()
-    Thread.sleep(1000)
+    TaskThreadInfo.threadToStarted(23).await()
 
     assert(TaskThreadInfo.threadToRunning(23) === true)
     assert(TaskThreadInfo.threadToRunning(33) === false)
 
     TaskThreadInfo.threadToLock(12).jobFinished()
-    Thread.sleep(1000)
+    TaskThreadInfo.threadToStarted(33).await()
 
     assert(TaskThreadInfo.threadToRunning(33) === true)
```
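For reference, the handshake this commit introduces can be reduced to a short standalone sketch. This is a minimal illustration, not the test itself: the `Lock` body below is an assumption (the diff shows only its `jobWait()`/`jobFinished()` call sites), and `LatchHandshakeSketch` is a hypothetical name. The latch plays the role of `threadToStarted(i)`, the lock the role of `threadToLock(i)`:

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Stand-in for the test's Lock helper; the diff shows only its call sites,
// so this wait/notify body is an assumption.
class Lock {
  private var finished = false
  def jobWait(): Unit = synchronized { while (!finished) wait() }
  def jobFinished(): Unit = synchronized { finished = true; notifyAll() }
}

object LatchHandshakeSketch extends App {
  val started = new CountDownLatch(1) // plays the role of threadToStarted(i)
  val lock    = new Lock              // plays the role of threadToLock(i)

  val worker = new Thread {
    override def run(): Unit = {
      started.countDown() // deterministic "task launched" signal
      lock.jobWait()      // hold the cpu core until the test releases it
    }
  }
  worker.start()

  // Replaces Thread.sleep(2000): returns as soon as the task has started,
  // and fails fast instead of hanging forever if it never does.
  assert(started.await(10, TimeUnit.SECONDS), "worker never started")

  lock.jobFinished() // let the task finish; a core becomes free again
  worker.join()
}
```

The latch makes the "task launched" wait deterministic: `await()` returns the moment `countDown()` fires rather than after a fixed two-second sleep. The remaining `Thread.sleep(1000)` calls survive only where there is no event to wait on, since a pending task merely needs to have been enqueued in the taskSetManager, which the `TODO: any better solution?` comment acknowledges.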