aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala11
1 files changed, 10 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 05b6fa5456..4676b828d3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -19,6 +19,8 @@ package org.apache.spark.scheduler.local
import java.nio.ByteBuffer
+import scala.concurrent.duration._
+
import akka.actor.{Actor, ActorRef, Props}
import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState}
@@ -46,6 +48,8 @@ private[spark] class LocalActor(
private val totalCores: Int)
extends Actor with ActorLogReceive with Logging {
+ import context.dispatcher // to use Akka's scheduler.scheduleOnce()
+
private var freeCores = totalCores
private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
@@ -74,11 +78,16 @@ private[spark] class LocalActor(
def reviveOffers() {
val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
- for (task <- scheduler.resourceOffers(offers).flatten) {
+ val tasks = scheduler.resourceOffers(offers).flatten
+ for (task <- tasks) {
freeCores -= scheduler.CPUS_PER_TASK
executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
task.name, task.serializedTask)
}
+ if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
+ // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout
+ context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers)
+ }
}
}