aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsoulmachine <soulmachine@gmail.com>2013-11-09 22:38:27 +0800
committersoulmachine <soulmachine@gmail.com>2013-11-09 22:38:27 +0800
commit28115fa8cb942c907a90e48ee1171f2a9b698411 (patch)
treefccf249320a81eeda0e05ae62a538d61b986fba5
parentdd63c548c228d7775670e4664be18ebd1c62bed7 (diff)
downloadspark-28115fa8cb942c907a90e48ee1171f2a9b698411.tar.gz
spark-28115fa8cb942c907a90e48ee1171f2a9b698411.tar.bz2
spark-28115fa8cb942c907a90e48ee1171f2a9b698411.zip
replace the thread with a Akka scheduler
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala23
1 files changed, 8 insertions, 15 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 85033958ef..53a589615d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -25,6 +25,8 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
+import akka.util.duration._
+
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler._
@@ -119,21 +121,12 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
backend.start()
if (System.getProperty("spark.speculation", "false").toBoolean) {
- new Thread("ClusterScheduler speculation check") {
- setDaemon(true)
-
- override def run() {
- logInfo("Starting speculative execution thread")
- while (true) {
- try {
- Thread.sleep(SPECULATION_INTERVAL)
- } catch {
- case e: InterruptedException => {}
- }
- checkSpeculatableTasks()
- }
- }
- }.start()
+ logInfo("Starting speculative execution thread")
+
+ sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
+ SPECULATION_INTERVAL milliseconds) {
+ checkSpeculatableTasks()
+ }
}
}