diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala | 11 |
1 files changed, 10 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index 05b6fa5456..4676b828d3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -19,6 +19,8 @@ package org.apache.spark.scheduler.local import java.nio.ByteBuffer +import scala.concurrent.duration._ + import akka.actor.{Actor, ActorRef, Props} import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState} @@ -46,6 +48,8 @@ private[spark] class LocalActor( private val totalCores: Int) extends Actor with ActorLogReceive with Logging { + import context.dispatcher // to use Akka's scheduler.scheduleOnce() + private var freeCores = totalCores private val localExecutorId = SparkContext.DRIVER_IDENTIFIER @@ -74,11 +78,16 @@ private[spark] class LocalActor( def reviveOffers() { val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores)) - for (task <- scheduler.resourceOffers(offers).flatten) { + val tasks = scheduler.resourceOffers(offers).flatten + for (task <- tasks) { freeCores -= scheduler.CPUS_PER_TASK executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber, task.name, task.serializedTask) } + if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) { + // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout + context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers) + } } } |