diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/scheduler/cluster')
9 files changed, 255 insertions, 129 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala index 1a844b7e7e..85033958ef 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala @@ -17,7 +17,6 @@ package org.apache.spark.scheduler.cluster -import java.lang.{Boolean => JBoolean} import java.nio.ByteBuffer import java.util.concurrent.atomic.AtomicLong import java.util.{TimerTask, Timer} @@ -79,14 +78,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext) private val executorIdToHost = new HashMap[String, String] - // JAR server, if any JARs were added by the user to the SparkContext - var jarServer: HttpServer = null - - // URIs of JARs to pass to executor - var jarUris: String = "" - // Listener object to pass upcalls into - var listener: TaskSchedulerListener = null + var dagScheduler: DAGScheduler = null var backend: SchedulerBackend = null @@ -101,8 +94,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext) // This is a var so that we can reset it for testing purposes. private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this) - override def setListener(listener: TaskSchedulerListener) { - this.listener = listener + override def setDAGScheduler(dagScheduler: DAGScheduler) { + this.dagScheduler = dagScheduler } def initialize(context: SchedulerBackend) { @@ -171,8 +164,31 @@ private[spark] class ClusterScheduler(val sc: SparkContext) backend.reviveOffers() } - def taskSetFinished(manager: TaskSetManager) { - this.synchronized { + override def cancelTasks(stageId: Int): Unit = synchronized { + logInfo("Cancelling stage " + stageId) + activeTaskSets.find(_._2.stageId == stageId).foreach { case (_, tsm) => + // There are two possible cases here: + // 1. The task set manager has been created and some tasks have been scheduled. + // In this case, send a kill signal to the executors to kill the task and then abort + // the stage. + // 2. The task set manager has been created but no tasks has been scheduled. In this case, + // simply abort the stage. + val taskIds = taskSetTaskIds(tsm.taskSet.id) + if (taskIds.size > 0) { + taskIds.foreach { tid => + val execId = taskIdToExecutorId(tid) + backend.killTask(tid, execId) + } + } + tsm.error("Stage %d was cancelled".format(stageId)) + } + } + + def taskSetFinished(manager: TaskSetManager): Unit = synchronized { + // Check to see if the given task set has been removed. This is possible in the case of + // multiple unrecoverable task failures (e.g. if the entire task set is killed when it has + // more than one running tasks). + if (activeTaskSets.contains(manager.taskSet.id)) { activeTaskSets -= manager.taskSet.id manager.parent.removeSchedulable(manager) logInfo("Remove TaskSet %s from pool %s".format(manager.taskSet.id, manager.parent.name)) @@ -281,7 +297,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } // Update the DAGScheduler without holding a lock on this, since that can deadlock if (failedExecutor != None) { - listener.executorLost(failedExecutor.get) + dagScheduler.executorLost(failedExecutor.get) backend.reviveOffers() } if (taskFailed) { @@ -290,6 +306,10 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } } + def handleTaskGettingResult(taskSetManager: ClusterTaskSetManager, tid: Long) { + taskSetManager.handleTaskGettingResult(tid) + } + def handleSuccessfulTask( taskSetManager: ClusterTaskSetManager, tid: Long, @@ -334,9 +354,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext) if (backend != null) { backend.stop() } - if (jarServer != null) { - jarServer.stop() - } if (taskResultGetter != null) { taskResultGetter.stop() } @@ -384,9 +401,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext) logError("Lost an executor " + executorId + " (already removed): " + reason) } } - // Call listener.executorLost without holding the lock on this to prevent deadlock + // Call dagScheduler.executorLost without holding the lock on this to prevent deadlock if (failedExecutor != None) { - listener.executorLost(failedExecutor.get) + dagScheduler.executorLost(failedExecutor.get) backend.reviveOffers() } } @@ -405,7 +422,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } def executorGained(execId: String, host: String) { - listener.executorGained(execId, host) + dagScheduler.executorGained(execId, host) } def getExecutorsAliveOnHost(host: String): Option[Set[String]] = synchronized { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala index 194ab55102..ee47aaffca 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala @@ -17,18 +17,16 @@ package org.apache.spark.scheduler.cluster -import java.nio.ByteBuffer -import java.util.{Arrays, NoSuchElementException} +import java.util.Arrays import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet import scala.math.max import scala.math.min -import scala.Some import org.apache.spark.{ExceptionFailure, FetchFailed, Logging, Resubmitted, SparkEnv, - SparkException, Success, TaskEndReason, TaskResultLost, TaskState} + Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState} import org.apache.spark.TaskState.TaskState import org.apache.spark.scheduler._ import org.apache.spark.util.{SystemClock, Clock} @@ -417,11 +415,17 @@ private[spark] class ClusterTaskSetManager( } private def taskStarted(task: Task[_], info: TaskInfo) { - sched.listener.taskStarted(task, info) + sched.dagScheduler.taskStarted(task, info) + } + + def handleTaskGettingResult(tid: Long) = { + val info = taskInfos(tid) + info.markGettingResult() + sched.dagScheduler.taskGettingResult(tasks(info.index), info) } /** - * Marks the task as successful and notifies the listener that a task has ended. + * Marks the task as successful and notifies the DAGScheduler that a task has ended. */ def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]) = { val info = taskInfos(tid) @@ -431,7 +435,7 @@ private[spark] class ClusterTaskSetManager( if (!successful(index)) { logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format( tid, info.duration, info.host, tasksSuccessful, numTasks)) - sched.listener.taskEnded( + sched.dagScheduler.taskEnded( tasks(index), Success, result.value, result.accumUpdates, info, result.metrics) // Mark successful and stop if all the tasks have succeeded. @@ -447,7 +451,8 @@ private[spark] class ClusterTaskSetManager( } /** - * Marks the task as failed, re-adds it to the list of pending tasks, and notifies the listener. + * Marks the task as failed, re-adds it to the list of pending tasks, and notifies the + * DAG Scheduler. */ def handleFailedTask(tid: Long, state: TaskState, reason: Option[TaskEndReason]) { val info = taskInfos(tid) @@ -458,54 +463,57 @@ private[spark] class ClusterTaskSetManager( val index = info.index info.markFailed() if (!successful(index)) { - logInfo("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index)) + logWarning("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index)) copiesRunning(index) -= 1 // Check if the problem is a map output fetch failure. In that case, this // task will never succeed on any node, so tell the scheduler about it. reason.foreach { - _ match { - case fetchFailed: FetchFailed => - logInfo("Loss was due to fetch failure from " + fetchFailed.bmAddress) - sched.listener.taskEnded(tasks(index), fetchFailed, null, null, info, null) - successful(index) = true - tasksSuccessful += 1 - sched.taskSetFinished(this) - removeAllRunningTasks() - return - - case ef: ExceptionFailure => - sched.listener.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null)) - val key = ef.description - val now = clock.getTime() - val (printFull, dupCount) = { - if (recentExceptions.contains(key)) { - val (dupCount, printTime) = recentExceptions(key) - if (now - printTime > EXCEPTION_PRINT_INTERVAL) { - recentExceptions(key) = (0, now) - (true, 0) - } else { - recentExceptions(key) = (dupCount + 1, printTime) - (false, dupCount + 1) - } - } else { + case fetchFailed: FetchFailed => + logWarning("Loss was due to fetch failure from " + fetchFailed.bmAddress) + sched.dagScheduler.taskEnded(tasks(index), fetchFailed, null, null, info, null) + successful(index) = true + tasksSuccessful += 1 + sched.taskSetFinished(this) + removeAllRunningTasks() + return + + case TaskKilled => + logWarning("Task %d was killed.".format(tid)) + sched.dagScheduler.taskEnded(tasks(index), reason.get, null, null, info, null) + return + + case ef: ExceptionFailure => + sched.dagScheduler.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null)) + val key = ef.description + val now = clock.getTime() + val (printFull, dupCount) = { + if (recentExceptions.contains(key)) { + val (dupCount, printTime) = recentExceptions(key) + if (now - printTime > EXCEPTION_PRINT_INTERVAL) { recentExceptions(key) = (0, now) (true, 0) + } else { + recentExceptions(key) = (dupCount + 1, printTime) + (false, dupCount + 1) } - } - if (printFull) { - val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString)) - logInfo("Loss was due to %s\n%s\n%s".format( - ef.className, ef.description, locs.mkString("\n"))) } else { - logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount)) + recentExceptions(key) = (0, now) + (true, 0) } + } + if (printFull) { + val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString)) + logWarning("Loss was due to %s\n%s\n%s".format( + ef.className, ef.description, locs.mkString("\n"))) + } else { + logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount)) + } - case TaskResultLost => - logInfo("Lost result for TID %s on host %s".format(tid, info.host)) - sched.listener.taskEnded(tasks(index), TaskResultLost, null, null, info, null) + case TaskResultLost => + logWarning("Lost result for TID %s on host %s".format(tid, info.host)) + sched.dagScheduler.taskEnded(tasks(index), TaskResultLost, null, null, info, null) - case _ => {} - } + case _ => {} } // On non-fetch failures, re-enqueue the task as pending for a max number of retries addPendingTask(index) @@ -532,7 +540,7 @@ private[spark] class ClusterTaskSetManager( failed = true causeOfFailure = message // TODO: Kill running tasks if we were not terminated due to a Mesos error - sched.listener.taskSetFailed(taskSet, message) + sched.dagScheduler.taskSetFailed(taskSet, message) removeAllRunningTasks() sched.taskSetFinished(this) } @@ -605,7 +613,7 @@ private[spark] class ClusterTaskSetManager( addPendingTask(index) // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our // stage finishes when a total of tasks.size tasks finish. - sched.listener.taskEnded(tasks(index), Resubmitted, null, null, info, null) + sched.dagScheduler.taskEnded(tasks(index), Resubmitted, null, null, info, null) } } } @@ -630,11 +638,11 @@ private[spark] class ClusterTaskSetManager( var foundTasks = false val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation) - if (tasksSuccessful >= minFinishedForSpeculation) { + if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) { val time = clock.getTime() val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray Arrays.sort(durations) - val medianDuration = durations(min((0.5 * numTasks).round.toInt, durations.size - 1)) + val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.size - 1)) val threshold = max(SPECULATION_MULTIPLIER * medianDuration, 100) // TODO: Threshold should also look at standard deviation of task durations and have a lower // bound based on that. diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index c0b836bf1a..53316dae2a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -24,26 +24,28 @@ import org.apache.spark.scheduler.TaskDescription import org.apache.spark.util.{Utils, SerializableBuffer} -private[spark] sealed trait StandaloneClusterMessage extends Serializable +private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable -private[spark] object StandaloneClusterMessages { +private[spark] object CoarseGrainedClusterMessages { // Driver to executors - case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage + case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage + + case class KillTask(taskId: Long, executor: String) extends CoarseGrainedClusterMessage case class RegisteredExecutor(sparkProperties: Seq[(String, String)]) - extends StandaloneClusterMessage + extends CoarseGrainedClusterMessage - case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage + case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage // Executors to driver case class RegisterExecutor(executorId: String, hostPort: String, cores: Int) - extends StandaloneClusterMessage { + extends CoarseGrainedClusterMessage { Utils.checkHostPort(hostPort, "Expected host port") } case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, - data: SerializableBuffer) extends StandaloneClusterMessage + data: SerializableBuffer) extends CoarseGrainedClusterMessage object StatusUpdate { /** Alternate factory method that takes a ByteBuffer directly for the data field */ @@ -54,10 +56,14 @@ private[spark] object StandaloneClusterMessages { } // Internal messages in driver - case object ReviveOffers extends StandaloneClusterMessage + case object ReviveOffers extends CoarseGrainedClusterMessage + + case object StopDriver extends CoarseGrainedClusterMessage + + case object StopExecutor extends CoarseGrainedClusterMessage - case object StopDriver extends StandaloneClusterMessage + case object StopExecutors extends CoarseGrainedClusterMessage - case class RemoveExecutor(executorId: String, reason: String) extends StandaloneClusterMessage + case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index b6f0ec961a..3ccc38d72b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -29,16 +29,19 @@ import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycle import org.apache.spark.{SparkException, Logging, TaskState} import org.apache.spark.scheduler.TaskDescription -import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._ +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.util.Utils /** - * A standalone scheduler backend, which waits for standalone executors to connect to it through - * Akka. These may be executed in a variety of ways, such as Mesos tasks for the coarse-grained - * Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*). + * A scheduler backend that waits for coarse grained executors to connect to it through Akka. + * This backend holds onto each executor for the duration of the Spark job rather than relinquishing + * executors whenever a task is done and asking the scheduler to launch a new executor for + * each new task. Executors may be launched in a variety of ways, such as Mesos tasks for the + * coarse-grained Mesos mode or standalone processes for Spark's standalone deploy mode + * (spark.deploy.*). */ private[spark] -class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem) +class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem) extends SchedulerBackend with Logging { // Use an atomic variable to track total number of cores in the cluster for simplicity and speed @@ -84,17 +87,33 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor case StatusUpdate(executorId, taskId, state, data) => scheduler.statusUpdate(taskId, state, data.value) if (TaskState.isFinished(state)) { - freeCores(executorId) += 1 - makeOffers(executorId) + if (executorActor.contains(executorId)) { + freeCores(executorId) += 1 + makeOffers(executorId) + } else { + // Ignoring the update since we don't know about the executor. + val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s" + logWarning(msg.format(taskId, state, sender, executorId)) + } } case ReviveOffers => makeOffers() + case KillTask(taskId, executorId) => + executorActor(executorId) ! KillTask(taskId, executorId) + case StopDriver => sender ! true context.stop(self) + case StopExecutors => + logInfo("Asking each executor to shut down") + for (executor <- executorActor.values) { + executor ! StopExecutor + } + sender ! true + case RemoveExecutor(executorId, reason) => removeExecutor(executorId, reason) sender ! true @@ -159,16 +178,31 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor } } driverActor = actorSystem.actorOf( - Props(new DriverActor(properties)), name = StandaloneSchedulerBackend.ACTOR_NAME) + Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME) } - private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") + private val timeout = { + Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") + } + + def stopExecutors() { + try { + if (driverActor != null) { + logInfo("Shutting down all executors") + val future = driverActor.ask(StopExecutors)(timeout) + Await.ready(future, timeout) + } + } catch { + case e: Exception => + throw new SparkException("Error asking standalone scheduler to shut down executors", e) + } + } override def stop() { try { if (driverActor != null) { val future = driverActor.ask(StopDriver)(timeout) - Await.result(future, timeout) + Await.ready(future, timeout) } } catch { case e: Exception => @@ -180,6 +214,10 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor driverActor ! ReviveOffers } + override def killTask(taskId: Long, executorId: String) { + driverActor ! KillTask(taskId, executorId) + } + override def defaultParallelism() = Option(System.getProperty("spark.default.parallelism")) .map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2)) @@ -187,7 +225,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor def removeExecutor(executorId: String, reason: String) { try { val future = driverActor.ask(RemoveExecutor(executorId, reason))(timeout) - Await.result(future, timeout) + Await.ready(future, timeout) } catch { case e: Exception => throw new SparkException("Error notifying standalone scheduler's driver actor", e) @@ -195,6 +233,6 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor } } -private[spark] object StandaloneSchedulerBackend { - val ACTOR_NAME = "StandaloneScheduler" +private[spark] object CoarseGrainedSchedulerBackend { + val ACTOR_NAME = "CoarseGrainedScheduler" } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala index d57eb3276f..5367218faa 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler.cluster -import org.apache.spark.{SparkContext} +import org.apache.spark.SparkContext /** * A backend interface for cluster scheduling systems that allows plugging in different ones under @@ -30,8 +30,8 @@ private[spark] trait SchedulerBackend { def reviveOffers(): Unit def defaultParallelism(): Int + def killTask(taskId: Long, executorId: String): Unit = throw new UnsupportedOperationException + // Memory used by each executor (in megabytes) protected val executorMemory: Int = SparkContext.executorMemoryRequested - - // TODO: Probably want to add a killTask too } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala new file mode 100644 index 0000000000..d78bdbaa7a --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{Path, FileSystem} +import org.apache.spark.{Logging, SparkContext} + +private[spark] class SimrSchedulerBackend( + scheduler: ClusterScheduler, + sc: SparkContext, + driverFilePath: String) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) + with Logging { + + val tmpPath = new Path(driverFilePath + "_tmp") + val filePath = new Path(driverFilePath) + + val maxCores = System.getProperty("spark.simr.executor.cores", "1").toInt + + override def start() { + super.start() + + val driverUrl = "akka://spark@%s:%s/user/%s".format( + System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), + CoarseGrainedSchedulerBackend.ACTOR_NAME) + + val conf = new Configuration() + val fs = FileSystem.get(conf) + + logInfo("Writing to HDFS file: " + driverFilePath) + logInfo("Writing Akka address: " + driverUrl) + + // Create temporary file to prevent race condition where executors get empty driverUrl file + val temp = fs.create(tmpPath, true) + temp.writeUTF(driverUrl) + temp.writeInt(maxCores) + temp.close() + + // "Atomic" rename + fs.rename(tmpPath, filePath) + } + + override def stop() { + val conf = new Configuration() + val fs = FileSystem.get(conf) + fs.delete(new Path(driverFilePath), false) + super.stopExecutors() + super.stop() + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index fa83ae19d6..7127a72d6d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -26,9 +26,9 @@ import org.apache.spark.util.Utils private[spark] class SparkDeploySchedulerBackend( scheduler: ClusterScheduler, sc: SparkContext, - master: String, + masters: Array[String], appName: String) - extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) with ClientListener with Logging { @@ -44,15 +44,15 @@ private[spark] class SparkDeploySchedulerBackend( // The endpoint for executors to talk to us val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), - StandaloneSchedulerBackend.ACTOR_NAME) + CoarseGrainedSchedulerBackend.ACTOR_NAME) val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}") val command = Command( - "org.apache.spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs) + "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs) val sparkHome = sc.getSparkHome().getOrElse(null) val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome, "http://" + sc.ui.appUIAddress) - client = new Client(sc.env.actorSystem, master, appDesc, this) + client = new Client(sc.env.actorSystem, masters, appDesc, this) client.start() } @@ -71,8 +71,14 @@ private[spark] class SparkDeploySchedulerBackend( override def disconnected() { if (!stopping) { - logError("Disconnected from Spark cluster!") - scheduler.error("Disconnected from Spark cluster") + logWarning("Disconnected from Spark cluster! Waiting for reconnection...") + } + } + + override def dead() { + if (!stopping) { + logError("Spark cluster looks dead, giving up.") + scheduler.error("Spark cluster looks down") } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala index b2a8f06472..e68c527713 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala @@ -24,33 +24,16 @@ import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult} import org.apache.spark.serializer.SerializerInstance +import org.apache.spark.util.Utils /** * Runs a thread pool that deserializes and remotely fetches (if necessary) task results. */ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler) extends Logging { - private val MIN_THREADS = System.getProperty("spark.resultGetter.minThreads", "4").toInt - private val MAX_THREADS = System.getProperty("spark.resultGetter.maxThreads", "4").toInt - private val getTaskResultExecutor = new ThreadPoolExecutor( - MIN_THREADS, - MAX_THREADS, - 0L, - TimeUnit.SECONDS, - new LinkedBlockingDeque[Runnable], - new ResultResolverThreadFactory) - - class ResultResolverThreadFactory extends ThreadFactory { - private var counter = 0 - private var PREFIX = "Result resolver thread" - - override def newThread(r: Runnable): Thread = { - val thread = new Thread(r, "%s-%s".format(PREFIX, counter)) - counter += 1 - thread.setDaemon(true) - return thread - } - } + private val THREADS = System.getProperty("spark.resultGetter.threads", "4").toInt + private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool( + THREADS, "Result resolver thread") protected val serializer = new ThreadLocal[SerializerInstance] { override def initialValue(): SerializerInstance = { @@ -67,6 +50,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterSche case directResult: DirectTaskResult[_] => directResult case IndirectTaskResult(blockId) => logDebug("Fetching indirect task result for TID %s".format(tid)) + scheduler.handleTaskGettingResult(taskSetManager, tid) val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId) if (!serializedTaskResult.isDefined) { /* We won't be able to get the task result if the machine that ran the task failed diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index bf4040fafc..8de9b72b2f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -30,13 +30,14 @@ import org.apache.mesos._ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _} import org.apache.spark.{SparkException, Logging, SparkContext, TaskState} -import org.apache.spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend} +import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend} /** * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the - * StandaloneBackend mechanism. This class is useful for lower and more predictable latency. + * CoarseGrainedSchedulerBackend mechanism. This class is useful for lower and more predictable + * latency. * * Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to * remove this. @@ -46,7 +47,7 @@ private[spark] class CoarseMesosSchedulerBackend( sc: SparkContext, master: String, appName: String) - extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) with MScheduler with Logging { @@ -122,20 +123,20 @@ private[spark] class CoarseMesosSchedulerBackend( val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), - StandaloneSchedulerBackend.ACTOR_NAME) + CoarseGrainedSchedulerBackend.ACTOR_NAME) val uri = System.getProperty("spark.executor.uri") if (uri == null) { val runScript = new File(sparkHome, "spark-class").getCanonicalPath command.setValue( - "\"%s\" org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format( + "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format( runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) } else { // Grab everything to the first '.'. We'll use that and '*' to // glob the directory "correctly". val basename = uri.split('/').last.split('.').head command.setValue( - "cd %s*; ./spark-class org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format( - basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) + "cd %s*; ./spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d" + .format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) } return command.build() |