From f776bc98878428940b5130c0d7d9b7ee452c0bd3 Mon Sep 17 00:00:00 2001
From: woshilaiceshide
Date: Wed, 23 Jul 2014 11:05:41 -0700
Subject: [CORE] SPARK-2640: In "local[N]", free cores of the only executor should be touched by "spark.task.cpus" for every finish/start-up of tasks.

Make Spark's "local[N]" better. In our company, we use "local[N]" in production. It works excellently. It's our best choice.

Author: woshilaiceshide

Closes #1544 from woshilaiceshide/localX and squashes the following commits:

6c85154 [woshilaiceshide] [CORE] SPARK-2640: In "local[N]", free cores of the only executor should be touched by "spark.task.cpus" for every finish/start-up of tasks.
---
 .../main/scala/org/apache/spark/scheduler/local/LocalBackend.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index e9f6273bfd..5b897597fa 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -57,7 +57,7 @@ private[spark] class LocalActor(
     case StatusUpdate(taskId, state, serializedData) =>
       scheduler.statusUpdate(taskId, state, serializedData)
       if (TaskState.isFinished(state)) {
-        freeCores += 1
+        freeCores += scheduler.CPUS_PER_TASK
         reviveOffers()
       }
@@ -68,7 +68,7 @@ private[spark] class LocalActor(
   def reviveOffers() {
     val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
     for (task <- scheduler.resourceOffers(offers).flatten) {
-      freeCores -= 1
+      freeCores -= scheduler.CPUS_PER_TASK
       executor.launchTask(executorBackend, task.taskId, task.name, task.serializedTask)
     }
   }
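
For context, here is a minimal usage sketch (not part of the patch) of how "spark.task.cpus" interacts with "local[N]" once the local backend charges CPUS_PER_TASK per task. The object name and the concrete numbers are illustrative assumptions; only SparkConf, SparkContext, and the "spark.task.cpus" setting come from Spark itself.

// Illustrative sketch: with 4 local cores and spark.task.cpus = 2, the
// single local executor should offer at most 4 / 2 = 2 concurrent tasks.
import org.apache.spark.{SparkConf, SparkContext}

object LocalCpusPerTaskSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[4]")                 // 4 cores in the only executor
      .setAppName("cpus-per-task-in-local-mode")
      .set("spark.task.cpus", "2")           // each task reserves 2 cores

    val sc = new SparkContext(conf)
    try {
      // Each partition becomes one task; with the fixed accounting, only
      // 2 of these 8 tasks can hold cores at the same time, not 4.
      val sums = sc.parallelize(1 to 800, numSlices = 8)
        .mapPartitions(it => Iterator(it.sum))
        .collect()
      println(sums.mkString(", "))
    } finally {
      sc.stop()
    }
  }
}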