aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-05-18 16:55:45 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-05-18 16:55:45 -0700
commit4fb52f9545ae338fae2d3aeea4bfc35d5df44853 (patch)
tree39a97e0973fcb1a7fd528d226443ef99765bf1ba /core
parenteb4632f282d070e1dfd5ffed968fa212896137da (diff)
downloadspark-4fb52f9545ae338fae2d3aeea4bfc35d5df44853.tar.gz
spark-4fb52f9545ae338fae2d3aeea4bfc35d5df44853.tar.bz2
spark-4fb52f9545ae338fae2d3aeea4bfc35d5df44853.zip
[SPARK-7624] Revert #4147
Author: Davies Liu <davies@databricks.com> Closes #6172 from davies/revert_4147 and squashes the following commits: 3bfbbde [Davies Liu] Revert #4147
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala23
1 files changed, 2 insertions, 21 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index e64d06c4d3..3078a1b10b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -18,14 +18,12 @@
package org.apache.spark.scheduler.local
import java.nio.ByteBuffer
-import java.util.concurrent.TimeUnit
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.{Executor, ExecutorBackend}
-import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
-import org.apache.spark.util.{ThreadUtils, Utils}
private case class ReviveOffers()
@@ -47,9 +45,6 @@ private[spark] class LocalEndpoint(
private val totalCores: Int)
extends ThreadSafeRpcEndpoint with Logging {
- private val reviveThread =
- ThreadUtils.newDaemonSingleThreadScheduledExecutor("local-revive-thread")
-
private var freeCores = totalCores
private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
@@ -79,27 +74,13 @@ private[spark] class LocalEndpoint(
context.reply(true)
}
-
def reviveOffers() {
val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
- val tasks = scheduler.resourceOffers(offers).flatten
- for (task <- tasks) {
+ for (task <- scheduler.resourceOffers(offers).flatten) {
freeCores -= scheduler.CPUS_PER_TASK
executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
task.name, task.serializedTask)
}
- if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
- // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout
- reviveThread.schedule(new Runnable {
- override def run(): Unit = Utils.tryLogNonFatalError {
- Option(self).foreach(_.send(ReviveOffers))
- }
- }, 1000, TimeUnit.MILLISECONDS)
- }
- }
-
- override def onStop(): Unit = {
- reviveThread.shutdownNow()
}
}