aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-02-03 22:30:23 -0800
committerKay Ousterhout <kayousterhout@gmail.com>2015-02-03 22:30:23 -0800
commit83de71c45bb9f22049243dd7518b679c4e13c2df (patch)
tree134797e34ea9543357263540bfb8d843080e0228
parent242b4f02df7f71ebcfa86a85c9ed39e40750a7fd (diff)
downloadspark-83de71c45bb9f22049243dd7518b679c4e13c2df.tar.gz
spark-83de71c45bb9f22049243dd7518b679c4e13c2df.tar.bz2
spark-83de71c45bb9f22049243dd7518b679c4e13c2df.zip
[SPARK-4939] revive offers periodically in LocalBackend
The locality timeout assumes that the SchedulerBackend can revive offers periodically, but currently LocalBackend does not do that, so some jobs with mixed locality levels in local mode will hang forever. This PR lets LocalBackend revive offers periodically, just like in cluster mode. Author: Davies Liu <davies@databricks.com> Closes #4147 from davies/revive and squashes the following commits: 2acdf9d [Davies Liu] Update LocalBackend.scala 3c8ca7c [Davies Liu] Update LocalBackend.scala d1b60d2 [Davies Liu] address comments from Kay 33ac9bb [Davies Liu] fix build d0da0d5 [Davies Liu] Merge branch 'master' of github.com:apache/spark into revive 6cf5972 [Davies Liu] fix thread-safety ed62a31 [Davies Liu] fix scala style df9008b [Davies Liu] fix typo bfc1396 [Davies Liu] revive offers periodically in LocalBackend
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala11
1 files changed, 10 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 05b6fa5456..4676b828d3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -19,6 +19,8 @@ package org.apache.spark.scheduler.local
import java.nio.ByteBuffer
+import scala.concurrent.duration._
+
import akka.actor.{Actor, ActorRef, Props}
import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState}
@@ -46,6 +48,8 @@ private[spark] class LocalActor(
private val totalCores: Int)
extends Actor with ActorLogReceive with Logging {
+ import context.dispatcher // to use Akka's scheduler.scheduleOnce()
+
private var freeCores = totalCores
private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
@@ -74,11 +78,16 @@ private[spark] class LocalActor(
def reviveOffers() {
val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
- for (task <- scheduler.resourceOffers(offers).flatten) {
+ val tasks = scheduler.resourceOffers(offers).flatten
+ for (task <- tasks) {
freeCores -= scheduler.CPUS_PER_TASK
executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
task.name, task.serializedTask)
}
+ if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
+ // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout
+ context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers)
+ }
}
}