From d85fe41b2b380e2879cb18008dbeb344ed7d7c92 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Wed, 25 Sep 2013 12:18:32 -0700 Subject: Improved organization of scheduling packages. This commit does not change any code -- only file organization. There are two components of this change: (1) Moving files out of the cluster package, and down a level to the scheduling package. These files are all used by the local scheduler in addition to the cluster scheduler(s), so should not be in the cluster package. As a result of this change, none of the files in the local package reference files in the cluster package. (2) Moving the mesos package to within the cluster package. The mesos scheduling code is for a cluster, and represents a specific case of cluster scheduling (the Mesos-related classes often subclass cluster scheduling classes). Thus, the most logical place for it is within the cluster package. --- .../main/scala/org/apache/spark/SparkContext.scala | 4 +- .../org/apache/spark/scheduler/DAGScheduler.scala | 1 - .../apache/spark/scheduler/DAGSchedulerEvent.scala | 1 - .../org/apache/spark/scheduler/JobLogger.scala | 1 - .../scala/org/apache/spark/scheduler/Pool.scala | 121 ++++++++ .../org/apache/spark/scheduler/Schedulable.scala | 48 +++ .../spark/scheduler/SchedulableBuilder.scala | 150 +++++++++ .../spark/scheduler/SchedulingAlgorithm.scala | 81 +++++ .../apache/spark/scheduler/SchedulingMode.scala | 29 ++ .../org/apache/spark/scheduler/SparkListener.scala | 1 - .../org/apache/spark/scheduler/StageInfo.scala | 2 +- .../apache/spark/scheduler/TaskDescription.scala | 37 +++ .../org/apache/spark/scheduler/TaskInfo.scala | 72 +++++ .../org/apache/spark/scheduler/TaskLocality.scala | 32 ++ .../org/apache/spark/scheduler/TaskScheduler.scala | 5 +- .../spark/scheduler/TaskSchedulerListener.scala | 1 - .../apache/spark/scheduler/TaskSetManager.scala | 50 +++ .../spark/scheduler/cluster/ClusterScheduler.scala | 2 +- .../scheduler/cluster/ClusterTaskSetManager.scala | 9 +- .../org/apache/spark/scheduler/cluster/Pool.scala | 121 -------- .../spark/scheduler/cluster/Schedulable.scala | 48 --- .../scheduler/cluster/SchedulableBuilder.scala | 150 --------- .../scheduler/cluster/SchedulingAlgorithm.scala | 81 ----- .../spark/scheduler/cluster/SchedulingMode.scala | 29 -- .../cluster/StandaloneClusterMessage.scala | 1 + .../cluster/StandaloneSchedulerBackend.scala | 1 + .../spark/scheduler/cluster/TaskDescription.scala | 37 --- .../apache/spark/scheduler/cluster/TaskInfo.scala | 72 ----- .../spark/scheduler/cluster/TaskLocality.scala | 32 -- .../spark/scheduler/cluster/TaskSetManager.scala | 51 --- .../mesos/CoarseMesosSchedulerBackend.scala | 286 +++++++++++++++++ .../cluster/mesos/MesosSchedulerBackend.scala | 345 +++++++++++++++++++++ .../spark/scheduler/local/LocalScheduler.scala | 3 +- .../scheduler/local/LocalTaskSetManager.scala | 4 +- .../mesos/CoarseMesosSchedulerBackend.scala | 286 ----------------- .../scheduler/mesos/MesosSchedulerBackend.scala | 343 -------------------- .../org/apache/spark/ui/UIWorkloadGenerator.scala | 2 +- .../org/apache/spark/ui/exec/ExecutorsUI.scala | 2 +- .../scala/org/apache/spark/ui/jobs/IndexPage.scala | 2 +- .../apache/spark/ui/jobs/JobProgressListener.scala | 4 +- .../org/apache/spark/ui/jobs/JobProgressUI.scala | 4 +- .../scala/org/apache/spark/ui/jobs/PoolTable.scala | 3 +- .../scala/org/apache/spark/ui/jobs/StagePage.scala | 6 +- .../org/apache/spark/ui/jobs/StageTable.scala | 3 +- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 6 +- 45 files changed, 1280 insertions(+), 1289 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/scheduler/Pool.scala create mode 100644 core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala create mode 100644 core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala create mode 100644 core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala create mode 100644 core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala create mode 100644 core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala create mode 100644 core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala create mode 100644 core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala create mode 100644 core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/Pool.scala delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/Schedulable.scala delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingAlgorithm.scala delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingMode.scala delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/TaskDescription.scala delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/TaskInfo.scala delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/TaskLocality.scala delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/TaskSetManager.scala create mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala create mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 6bab1f31d0..912ce752fb 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -56,9 +56,9 @@ import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, - ClusterScheduler, Schedulable, SchedulingMode} + ClusterScheduler} import org.apache.spark.scheduler.local.LocalScheduler -import org.apache.spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} +import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} import org.apache.spark.storage.{StorageUtils, BlockManagerSource} import org.apache.spark.ui.SparkUI import org.apache.spark.util.{ClosureCleaner, Utils, MetadataCleaner, TimeStampedHashMap} diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 3e3f04f087..8a55df4af0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -28,7 +28,6 @@ import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.executor.TaskMetrics import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} -import org.apache.spark.scheduler.cluster.TaskInfo import org.apache.spark.storage.{BlockManager, BlockManagerMaster} import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap} diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index 0d99670648..10ff1b4376 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -19,7 +19,6 @@ package org.apache.spark.scheduler import java.util.Properties -import org.apache.spark.scheduler.cluster.TaskInfo import scala.collection.mutable.Map import org.apache.spark._ diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala index c8b78bf00a..3628b1b078 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala @@ -30,7 +30,6 @@ import scala.io.Source import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.executor.TaskMetrics -import org.apache.spark.scheduler.cluster.TaskInfo // Used to record runtime information for each job, including RDD graph // tasks' start/stop shuffle information and information from outside diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala new file mode 100644 index 0000000000..c9a66b3a75 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.HashMap + +import org.apache.spark.Logging +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode + +/** + * An Schedulable entity that represent collection of Pools or TaskSetManagers + */ + +private[spark] class Pool( + val poolName: String, + val schedulingMode: SchedulingMode, + initMinShare: Int, + initWeight: Int) + extends Schedulable + with Logging { + + var schedulableQueue = new ArrayBuffer[Schedulable] + var schedulableNameToSchedulable = new HashMap[String, Schedulable] + + var weight = initWeight + var minShare = initMinShare + var runningTasks = 0 + + var priority = 0 + var stageId = 0 + var name = poolName + var parent:Schedulable = null + + var taskSetSchedulingAlgorithm: SchedulingAlgorithm = { + schedulingMode match { + case SchedulingMode.FAIR => + new FairSchedulingAlgorithm() + case SchedulingMode.FIFO => + new FIFOSchedulingAlgorithm() + } + } + + override def addSchedulable(schedulable: Schedulable) { + schedulableQueue += schedulable + schedulableNameToSchedulable(schedulable.name) = schedulable + schedulable.parent= this + } + + override def removeSchedulable(schedulable: Schedulable) { + schedulableQueue -= schedulable + schedulableNameToSchedulable -= schedulable.name + } + + override def getSchedulableByName(schedulableName: String): Schedulable = { + if (schedulableNameToSchedulable.contains(schedulableName)) { + return schedulableNameToSchedulable(schedulableName) + } + for (schedulable <- schedulableQueue) { + var sched = schedulable.getSchedulableByName(schedulableName) + if (sched != null) { + return sched + } + } + return null + } + + override def executorLost(executorId: String, host: String) { + schedulableQueue.foreach(_.executorLost(executorId, host)) + } + + override def checkSpeculatableTasks(): Boolean = { + var shouldRevive = false + for (schedulable <- schedulableQueue) { + shouldRevive |= schedulable.checkSpeculatableTasks() + } + return shouldRevive + } + + override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = { + var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager] + val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator) + for (schedulable <- sortedSchedulableQueue) { + sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue() + } + return sortedTaskSetQueue + } + + override def increaseRunningTasks(taskNum: Int) { + runningTasks += taskNum + if (parent != null) { + parent.increaseRunningTasks(taskNum) + } + } + + override def decreaseRunningTasks(taskNum: Int) { + runningTasks -= taskNum + if (parent != null) { + parent.decreaseRunningTasks(taskNum) + } + } + + override def hasPendingTasks(): Boolean = { + schedulableQueue.exists(_.hasPendingTasks()) + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala new file mode 100644 index 0000000000..857adaef5a --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode + +import scala.collection.mutable.ArrayBuffer +/** + * An interface for schedulable entities. + * there are two type of Schedulable entities(Pools and TaskSetManagers) + */ +private[spark] trait Schedulable { + var parent: Schedulable + // child queues + def schedulableQueue: ArrayBuffer[Schedulable] + def schedulingMode: SchedulingMode + def weight: Int + def minShare: Int + def runningTasks: Int + def priority: Int + def stageId: Int + def name: String + + def increaseRunningTasks(taskNum: Int): Unit + def decreaseRunningTasks(taskNum: Int): Unit + def addSchedulable(schedulable: Schedulable): Unit + def removeSchedulable(schedulable: Schedulable): Unit + def getSchedulableByName(name: String): Schedulable + def executorLost(executorId: String, host: String): Unit + def checkSpeculatableTasks(): Boolean + def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] + def hasPendingTasks(): Boolean +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala new file mode 100644 index 0000000000..4e25086ec9 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.io.{FileInputStream, InputStream} +import java.util.{NoSuchElementException, Properties} + +import org.apache.spark.Logging + +import scala.xml.XML + +/** + * An interface to build Schedulable tree + * buildPools: build the tree nodes(pools) + * addTaskSetManager: build the leaf nodes(TaskSetManagers) + */ +private[spark] trait SchedulableBuilder { + def buildPools() + def addTaskSetManager(manager: Schedulable, properties: Properties) +} + +private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) + extends SchedulableBuilder with Logging { + + override def buildPools() { + // nothing + } + + override def addTaskSetManager(manager: Schedulable, properties: Properties) { + rootPool.addSchedulable(manager) + } +} + +private[spark] class FairSchedulableBuilder(val rootPool: Pool) + extends SchedulableBuilder with Logging { + + val schedulerAllocFile = Option(System.getProperty("spark.scheduler.allocation.file")) + val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml" + val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool" + val DEFAULT_POOL_NAME = "default" + val MINIMUM_SHARES_PROPERTY = "minShare" + val SCHEDULING_MODE_PROPERTY = "schedulingMode" + val WEIGHT_PROPERTY = "weight" + val POOL_NAME_PROPERTY = "@name" + val POOLS_PROPERTY = "pool" + val DEFAULT_SCHEDULING_MODE = SchedulingMode.FIFO + val DEFAULT_MINIMUM_SHARE = 0 + val DEFAULT_WEIGHT = 1 + + override def buildPools() { + var is: Option[InputStream] = None + try { + is = Option { + schedulerAllocFile.map { f => + new FileInputStream(f) + }.getOrElse { + getClass.getClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE) + } + } + + is.foreach { i => buildFairSchedulerPool(i) } + } finally { + is.foreach(_.close()) + } + + // finally create "default" pool + buildDefaultPool() + } + + private def buildDefaultPool() { + if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) { + val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, + DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) + rootPool.addSchedulable(pool) + logInfo("Created default pool %s, schedulingMode: %s, minShare: %d, weight: %d".format( + DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)) + } + } + + private def buildFairSchedulerPool(is: InputStream) { + val xml = XML.load(is) + for (poolNode <- (xml \\ POOLS_PROPERTY)) { + + val poolName = (poolNode \ POOL_NAME_PROPERTY).text + var schedulingMode = DEFAULT_SCHEDULING_MODE + var minShare = DEFAULT_MINIMUM_SHARE + var weight = DEFAULT_WEIGHT + + val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text + if (xmlSchedulingMode != "") { + try { + schedulingMode = SchedulingMode.withName(xmlSchedulingMode) + } catch { + case e: NoSuchElementException => + logWarning("Error xml schedulingMode, using default schedulingMode") + } + } + + val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text + if (xmlMinShare != "") { + minShare = xmlMinShare.toInt + } + + val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text + if (xmlWeight != "") { + weight = xmlWeight.toInt + } + + val pool = new Pool(poolName, schedulingMode, minShare, weight) + rootPool.addSchedulable(pool) + logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format( + poolName, schedulingMode, minShare, weight)) + } + } + + override def addTaskSetManager(manager: Schedulable, properties: Properties) { + var poolName = DEFAULT_POOL_NAME + var parentPool = rootPool.getSchedulableByName(poolName) + if (properties != null) { + poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME) + parentPool = rootPool.getSchedulableByName(poolName) + if (parentPool == null) { + // we will create a new pool that user has configured in app + // instead of being defined in xml file + parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE, + DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) + rootPool.addSchedulable(parentPool) + logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format( + poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)) + } + } + parentPool.addSchedulable(manager) + logInfo("Added task set " + manager.name + " tasks to pool "+poolName) + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala new file mode 100644 index 0000000000..3418640b8c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +/** + * An interface for sort algorithm + * FIFO: FIFO algorithm between TaskSetManagers + * FS: FS algorithm between Pools, and FIFO or FS within Pools + */ +private[spark] trait SchedulingAlgorithm { + def comparator(s1: Schedulable, s2: Schedulable): Boolean +} + +private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm { + override def comparator(s1: Schedulable, s2: Schedulable): Boolean = { + val priority1 = s1.priority + val priority2 = s2.priority + var res = math.signum(priority1 - priority2) + if (res == 0) { + val stageId1 = s1.stageId + val stageId2 = s2.stageId + res = math.signum(stageId1 - stageId2) + } + if (res < 0) { + return true + } else { + return false + } + } +} + +private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm { + override def comparator(s1: Schedulable, s2: Schedulable): Boolean = { + val minShare1 = s1.minShare + val minShare2 = s2.minShare + val runningTasks1 = s1.runningTasks + val runningTasks2 = s2.runningTasks + val s1Needy = runningTasks1 < minShare1 + val s2Needy = runningTasks2 < minShare2 + val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble + val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble + val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble + val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble + var res:Boolean = true + var compare:Int = 0 + + if (s1Needy && !s2Needy) { + return true + } else if (!s1Needy && s2Needy) { + return false + } else if (s1Needy && s2Needy) { + compare = minShareRatio1.compareTo(minShareRatio2) + } else { + compare = taskToWeightRatio1.compareTo(taskToWeightRatio2) + } + + if (compare < 0) { + return true + } else if (compare > 0) { + return false + } else { + return s1.name < s2.name + } + } +} + diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala new file mode 100644 index 0000000000..0a786deb16 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +/** + * "FAIR" and "FIFO" determines which policy is used + * to order tasks amongst a Schedulable's sub-queues + * "NONE" is used when the a Schedulable has no sub-queues. + */ +object SchedulingMode extends Enumeration("FAIR", "FIFO", "NONE") { + + type SchedulingMode = Value + val FAIR,FIFO,NONE = Value +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index c3cf4b8907..62b521ad45 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -18,7 +18,6 @@ package org.apache.spark.scheduler import java.util.Properties -import org.apache.spark.scheduler.cluster.TaskInfo import org.apache.spark.util.{Utils, Distribution} import org.apache.spark.{Logging, SparkContext, TaskEndReason} import org.apache.spark.executor.TaskMetrics diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala index 72cb1c9ce8..b6f11969e5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala @@ -17,8 +17,8 @@ package org.apache.spark.scheduler -import org.apache.spark.scheduler.cluster.TaskInfo import scala.collection._ + import org.apache.spark.executor.TaskMetrics case class StageInfo( diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala new file mode 100644 index 0000000000..5190d234d4 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.nio.ByteBuffer +import org.apache.spark.util.SerializableBuffer + +private[spark] class TaskDescription( + val taskId: Long, + val executorId: String, + val name: String, + val index: Int, // Index within this task's TaskSet + _serializedTask: ByteBuffer) + extends Serializable { + + // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer + private val buffer = new SerializableBuffer(_serializedTask) + + def serializedTask: ByteBuffer = buffer.value + + override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index) +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala new file mode 100644 index 0000000000..7c2a422aff --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.apache.spark.util.Utils + +/** + * Information about a running task attempt inside a TaskSet. + */ +private[spark] +class TaskInfo( + val taskId: Long, + val index: Int, + val launchTime: Long, + val executorId: String, + val host: String, + val taskLocality: TaskLocality.TaskLocality) { + + var finishTime: Long = 0 + var failed = false + + def markSuccessful(time: Long = System.currentTimeMillis) { + finishTime = time + } + + def markFailed(time: Long = System.currentTimeMillis) { + finishTime = time + failed = true + } + + def finished: Boolean = finishTime != 0 + + def successful: Boolean = finished && !failed + + def running: Boolean = !finished + + def status: String = { + if (running) + "RUNNING" + else if (failed) + "FAILED" + else if (successful) + "SUCCESS" + else + "UNKNOWN" + } + + def duration: Long = { + if (!finished) { + throw new UnsupportedOperationException("duration() called on unfinished tasks") + } else { + finishTime - launchTime + } + } + + def timeRunning(currentTime: Long): Long = currentTime - launchTime +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala new file mode 100644 index 0000000000..47b0f387aa --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + + +private[spark] object TaskLocality + extends Enumeration("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY") +{ + // process local is expected to be used ONLY within tasksetmanager for now. + val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value + + type TaskLocality = Value + + def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = { + condition <= constraint + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 63be8ba3f5..7c2a9f03d7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -17,10 +17,11 @@ package org.apache.spark.scheduler -import org.apache.spark.scheduler.cluster.Pool -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode + /** * Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler. + * Each TaskScheduler schedulers task for a single SparkContext. * These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage, * and are responsible for sending the tasks to the cluster, running them, retrying if there * are failures, and mitigating stragglers. They return events to the DAGScheduler through diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala index 83be051c1a..593fa9fb93 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala @@ -17,7 +17,6 @@ package org.apache.spark.scheduler -import org.apache.spark.scheduler.cluster.TaskInfo import scala.collection.mutable.Map import org.apache.spark.TaskEndReason diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala new file mode 100644 index 0000000000..f192b0b7a4 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.nio.ByteBuffer + +import org.apache.spark.TaskState.TaskState + +/** + * Tracks and schedules the tasks within a single TaskSet. This class keeps track of the status of + * each task and is responsible for retries on failure and locality. The main interfaces to it + * are resourceOffer, which asks the TaskSet whether it wants to run a task on one node, and + * statusUpdate, which tells it that one of its tasks changed state (e.g. finished). + * + * THREADING: This class is designed to only be called from code with a lock on the TaskScheduler + * (e.g. its event handlers). It should not be called from other threads. + */ +private[spark] trait TaskSetManager extends Schedulable { + def schedulableQueue = null + + def schedulingMode = SchedulingMode.NONE + + def taskSet: TaskSet + + def resourceOffer( + execId: String, + host: String, + availableCpus: Int, + maxLocality: TaskLocality.TaskLocality) + : Option[TaskDescription] + + def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) + + def error(message: String) +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala index 919acce828..a6dee604b7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala @@ -26,7 +26,7 @@ import scala.collection.mutable.HashSet import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import java.nio.ByteBuffer import java.util.concurrent.atomic.AtomicLong import java.util.{TimerTask, Timer} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala index 0ac3d7bcfd..411e49b021 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala @@ -25,15 +25,12 @@ import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet import scala.math.max import scala.math.min +import scala.Some -import org.apache.spark.{FetchFailed, Logging, Resubmitted, SparkEnv, Success, TaskEndReason, TaskState} -import org.apache.spark.{ExceptionFailure, SparkException, TaskResultTooBigFailure} +import org.apache.spark.{ExceptionFailure, FetchFailed, Logging, Resubmitted, SparkEnv, + SparkException, Success, TaskEndReason, TaskResultTooBigFailure, TaskState} import org.apache.spark.TaskState.TaskState import org.apache.spark.scheduler._ -import scala.Some -import org.apache.spark.FetchFailed -import org.apache.spark.ExceptionFailure -import org.apache.spark.TaskResultTooBigFailure import org.apache.spark.util.{SystemClock, Clock} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/Pool.scala deleted file mode 100644 index 35b32600da..0000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/Pool.scala +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster - -import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.HashMap - -import org.apache.spark.Logging -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode - -/** - * An Schedulable entity that represent collection of Pools or TaskSetManagers - */ - -private[spark] class Pool( - val poolName: String, - val schedulingMode: SchedulingMode, - initMinShare: Int, - initWeight: Int) - extends Schedulable - with Logging { - - var schedulableQueue = new ArrayBuffer[Schedulable] - var schedulableNameToSchedulable = new HashMap[String, Schedulable] - - var weight = initWeight - var minShare = initMinShare - var runningTasks = 0 - - var priority = 0 - var stageId = 0 - var name = poolName - var parent:Schedulable = null - - var taskSetSchedulingAlgorithm: SchedulingAlgorithm = { - schedulingMode match { - case SchedulingMode.FAIR => - new FairSchedulingAlgorithm() - case SchedulingMode.FIFO => - new FIFOSchedulingAlgorithm() - } - } - - override def addSchedulable(schedulable: Schedulable) { - schedulableQueue += schedulable - schedulableNameToSchedulable(schedulable.name) = schedulable - schedulable.parent= this - } - - override def removeSchedulable(schedulable: Schedulable) { - schedulableQueue -= schedulable - schedulableNameToSchedulable -= schedulable.name - } - - override def getSchedulableByName(schedulableName: String): Schedulable = { - if (schedulableNameToSchedulable.contains(schedulableName)) { - return schedulableNameToSchedulable(schedulableName) - } - for (schedulable <- schedulableQueue) { - var sched = schedulable.getSchedulableByName(schedulableName) - if (sched != null) { - return sched - } - } - return null - } - - override def executorLost(executorId: String, host: String) { - schedulableQueue.foreach(_.executorLost(executorId, host)) - } - - override def checkSpeculatableTasks(): Boolean = { - var shouldRevive = false - for (schedulable <- schedulableQueue) { - shouldRevive |= schedulable.checkSpeculatableTasks() - } - return shouldRevive - } - - override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = { - var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager] - val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator) - for (schedulable <- sortedSchedulableQueue) { - sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue() - } - return sortedTaskSetQueue - } - - override def increaseRunningTasks(taskNum: Int) { - runningTasks += taskNum - if (parent != null) { - parent.increaseRunningTasks(taskNum) - } - } - - override def decreaseRunningTasks(taskNum: Int) { - runningTasks -= taskNum - if (parent != null) { - parent.decreaseRunningTasks(taskNum) - } - } - - override def hasPendingTasks(): Boolean = { - schedulableQueue.exists(_.hasPendingTasks()) - } -} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/Schedulable.scala deleted file mode 100644 index f4726450ec..0000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/Schedulable.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster - -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode - -import scala.collection.mutable.ArrayBuffer -/** - * An interface for schedulable entities. - * there are two type of Schedulable entities(Pools and TaskSetManagers) - */ -private[spark] trait Schedulable { - var parent: Schedulable - // child queues - def schedulableQueue: ArrayBuffer[Schedulable] - def schedulingMode: SchedulingMode - def weight: Int - def minShare: Int - def runningTasks: Int - def priority: Int - def stageId: Int - def name: String - - def increaseRunningTasks(taskNum: Int): Unit - def decreaseRunningTasks(taskNum: Int): Unit - def addSchedulable(schedulable: Schedulable): Unit - def removeSchedulable(schedulable: Schedulable): Unit - def getSchedulableByName(name: String): Schedulable - def executorLost(executorId: String, host: String): Unit - def checkSpeculatableTasks(): Boolean - def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] - def hasPendingTasks(): Boolean -} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala deleted file mode 100644 index 114617c51a..0000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster - -import java.io.{FileInputStream, InputStream} -import java.util.{NoSuchElementException, Properties} - -import org.apache.spark.Logging - -import scala.xml.XML - -/** - * An interface to build Schedulable tree - * buildPools: build the tree nodes(pools) - * addTaskSetManager: build the leaf nodes(TaskSetManagers) - */ -private[spark] trait SchedulableBuilder { - def buildPools() - def addTaskSetManager(manager: Schedulable, properties: Properties) -} - -private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) - extends SchedulableBuilder with Logging { - - override def buildPools() { - // nothing - } - - override def addTaskSetManager(manager: Schedulable, properties: Properties) { - rootPool.addSchedulable(manager) - } -} - -private[spark] class FairSchedulableBuilder(val rootPool: Pool) - extends SchedulableBuilder with Logging { - - val schedulerAllocFile = Option(System.getProperty("spark.scheduler.allocation.file")) - val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml" - val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool" - val DEFAULT_POOL_NAME = "default" - val MINIMUM_SHARES_PROPERTY = "minShare" - val SCHEDULING_MODE_PROPERTY = "schedulingMode" - val WEIGHT_PROPERTY = "weight" - val POOL_NAME_PROPERTY = "@name" - val POOLS_PROPERTY = "pool" - val DEFAULT_SCHEDULING_MODE = SchedulingMode.FIFO - val DEFAULT_MINIMUM_SHARE = 0 - val DEFAULT_WEIGHT = 1 - - override def buildPools() { - var is: Option[InputStream] = None - try { - is = Option { - schedulerAllocFile.map { f => - new FileInputStream(f) - }.getOrElse { - getClass.getClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE) - } - } - - is.foreach { i => buildFairSchedulerPool(i) } - } finally { - is.foreach(_.close()) - } - - // finally create "default" pool - buildDefaultPool() - } - - private def buildDefaultPool() { - if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) { - val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, - DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) - rootPool.addSchedulable(pool) - logInfo("Created default pool %s, schedulingMode: %s, minShare: %d, weight: %d".format( - DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)) - } - } - - private def buildFairSchedulerPool(is: InputStream) { - val xml = XML.load(is) - for (poolNode <- (xml \\ POOLS_PROPERTY)) { - - val poolName = (poolNode \ POOL_NAME_PROPERTY).text - var schedulingMode = DEFAULT_SCHEDULING_MODE - var minShare = DEFAULT_MINIMUM_SHARE - var weight = DEFAULT_WEIGHT - - val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text - if (xmlSchedulingMode != "") { - try { - schedulingMode = SchedulingMode.withName(xmlSchedulingMode) - } catch { - case e: NoSuchElementException => - logWarning("Error xml schedulingMode, using default schedulingMode") - } - } - - val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text - if (xmlMinShare != "") { - minShare = xmlMinShare.toInt - } - - val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text - if (xmlWeight != "") { - weight = xmlWeight.toInt - } - - val pool = new Pool(poolName, schedulingMode, minShare, weight) - rootPool.addSchedulable(pool) - logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format( - poolName, schedulingMode, minShare, weight)) - } - } - - override def addTaskSetManager(manager: Schedulable, properties: Properties) { - var poolName = DEFAULT_POOL_NAME - var parentPool = rootPool.getSchedulableByName(poolName) - if (properties != null) { - poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME) - parentPool = rootPool.getSchedulableByName(poolName) - if (parentPool == null) { - // we will create a new pool that user has configured in app - // instead of being defined in xml file - parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE, - DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) - rootPool.addSchedulable(parentPool) - logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format( - poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)) - } - } - parentPool.addSchedulable(manager) - logInfo("Added task set " + manager.name + " tasks to pool "+poolName) - } -} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingAlgorithm.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingAlgorithm.scala deleted file mode 100644 index cbeed4731a..0000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingAlgorithm.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster - -/** - * An interface for sort algorithm - * FIFO: FIFO algorithm between TaskSetManagers - * FS: FS algorithm between Pools, and FIFO or FS within Pools - */ -private[spark] trait SchedulingAlgorithm { - def comparator(s1: Schedulable, s2: Schedulable): Boolean -} - -private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm { - override def comparator(s1: Schedulable, s2: Schedulable): Boolean = { - val priority1 = s1.priority - val priority2 = s2.priority - var res = math.signum(priority1 - priority2) - if (res == 0) { - val stageId1 = s1.stageId - val stageId2 = s2.stageId - res = math.signum(stageId1 - stageId2) - } - if (res < 0) { - return true - } else { - return false - } - } -} - -private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm { - override def comparator(s1: Schedulable, s2: Schedulable): Boolean = { - val minShare1 = s1.minShare - val minShare2 = s2.minShare - val runningTasks1 = s1.runningTasks - val runningTasks2 = s2.runningTasks - val s1Needy = runningTasks1 < minShare1 - val s2Needy = runningTasks2 < minShare2 - val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble - val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble - val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble - val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble - var res:Boolean = true - var compare:Int = 0 - - if (s1Needy && !s2Needy) { - return true - } else if (!s1Needy && s2Needy) { - return false - } else if (s1Needy && s2Needy) { - compare = minShareRatio1.compareTo(minShareRatio2) - } else { - compare = taskToWeightRatio1.compareTo(taskToWeightRatio2) - } - - if (compare < 0) { - return true - } else if (compare > 0) { - return false - } else { - return s1.name < s2.name - } - } -} - diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingMode.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingMode.scala deleted file mode 100644 index 34811389a0..0000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingMode.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster - -/** - * "FAIR" and "FIFO" determines which policy is used - * to order tasks amongst a Schedulable's sub-queues - * "NONE" is used when the a Schedulable has no sub-queues. - */ -object SchedulingMode extends Enumeration("FAIR", "FIFO", "NONE") { - - type SchedulingMode = Value - val FAIR,FIFO,NONE = Value -} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala index 9c36d221f6..c0b836bf1a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala @@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster import java.nio.ByteBuffer import org.apache.spark.TaskState.TaskState +import org.apache.spark.scheduler.TaskDescription import org.apache.spark.util.{Utils, SerializableBuffer} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index b4ea0be415..f3aeea43d5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -29,6 +29,7 @@ import akka.util.Duration import akka.util.duration._ import org.apache.spark.{SparkException, Logging, TaskState} +import org.apache.spark.scheduler.TaskDescription import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._ import org.apache.spark.util.Utils diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskDescription.scala deleted file mode 100644 index 309ac2f6c9..0000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskDescription.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster - -import java.nio.ByteBuffer -import org.apache.spark.util.SerializableBuffer - -private[spark] class TaskDescription( - val taskId: Long, - val executorId: String, - val name: String, - val index: Int, // Index within this task's TaskSet - _serializedTask: ByteBuffer) - extends Serializable { - - // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer - private val buffer = new SerializableBuffer(_serializedTask) - - def serializedTask: ByteBuffer = buffer.value - - override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index) -} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskInfo.scala deleted file mode 100644 index 9685fb1a67..0000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskInfo.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster - -import org.apache.spark.util.Utils - -/** - * Information about a running task attempt inside a TaskSet. - */ -private[spark] -class TaskInfo( - val taskId: Long, - val index: Int, - val launchTime: Long, - val executorId: String, - val host: String, - val taskLocality: TaskLocality.TaskLocality) { - - var finishTime: Long = 0 - var failed = false - - def markSuccessful(time: Long = System.currentTimeMillis) { - finishTime = time - } - - def markFailed(time: Long = System.currentTimeMillis) { - finishTime = time - failed = true - } - - def finished: Boolean = finishTime != 0 - - def successful: Boolean = finished && !failed - - def running: Boolean = !finished - - def status: String = { - if (running) - "RUNNING" - else if (failed) - "FAILED" - else if (successful) - "SUCCESS" - else - "UNKNOWN" - } - - def duration: Long = { - if (!finished) { - throw new UnsupportedOperationException("duration() called on unfinished tasks") - } else { - finishTime - launchTime - } - } - - def timeRunning(currentTime: Long): Long = currentTime - launchTime -} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskLocality.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskLocality.scala deleted file mode 100644 index 5d4130e14a..0000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskLocality.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster - - -private[spark] object TaskLocality - extends Enumeration("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY") -{ - // process local is expected to be used ONLY within tasksetmanager for now. - val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value - - type TaskLocality = Value - - def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = { - condition <= constraint - } -} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskSetManager.scala deleted file mode 100644 index 648a3ef922..0000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskSetManager.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster - -import java.nio.ByteBuffer - -import org.apache.spark.TaskState.TaskState -import org.apache.spark.scheduler.TaskSet - -/** - * Tracks and schedules the tasks within a single TaskSet. This class keeps track of the status of - * each task and is responsible for retries on failure and locality. The main interfaces to it - * are resourceOffer, which asks the TaskSet whether it wants to run a task on one node, and - * statusUpdate, which tells it that one of its tasks changed state (e.g. finished). - * - * THREADING: This class is designed to only be called from code with a lock on the TaskScheduler - * (e.g. its event handlers). It should not be called from other threads. - */ -private[spark] trait TaskSetManager extends Schedulable { - def schedulableQueue = null - - def schedulingMode = SchedulingMode.NONE - - def taskSet: TaskSet - - def resourceOffer( - execId: String, - host: String, - availableCpus: Int, - maxLocality: TaskLocality.TaskLocality) - : Option[TaskDescription] - - def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) - - def error(message: String) -} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala new file mode 100644 index 0000000000..8f2eef9a53 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.io.File +import java.util.{ArrayList => JArrayList, List => JList} +import java.util.Collections + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.collection.JavaConversions._ + +import com.google.protobuf.ByteString +import org.apache.mesos.{Scheduler => MScheduler} +import org.apache.mesos._ +import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _} + +import org.apache.spark.{SparkException, Logging, SparkContext, TaskState} +import org.apache.spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend} + +/** + * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds + * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever + * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the + * StandaloneBackend mechanism. This class is useful for lower and more predictable latency. + * + * Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to + * remove this. + */ +private[spark] class CoarseMesosSchedulerBackend( + scheduler: ClusterScheduler, + sc: SparkContext, + master: String, + appName: String) + extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem) + with MScheduler + with Logging { + + val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures + + // Lock used to wait for scheduler to be registered + var isRegistered = false + val registeredLock = new Object() + + // Driver for talking to Mesos + var driver: SchedulerDriver = null + + // Maximum number of cores to acquire (TODO: we'll need more flexible controls here) + val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt + + // Cores we have acquired with each Mesos task ID + val coresByTaskId = new HashMap[Int, Int] + var totalCoresAcquired = 0 + + val slaveIdsWithExecutors = new HashSet[String] + + val taskIdToSlaveId = new HashMap[Int, String] + val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed + + val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException( + "Spark home is not set; set it through the spark.home system " + + "property, the SPARK_HOME environment variable or the SparkContext constructor")) + + val extraCoresPerSlave = System.getProperty("spark.mesos.extra.cores", "0").toInt + + var nextMesosTaskId = 0 + + def newMesosTaskId(): Int = { + val id = nextMesosTaskId + nextMesosTaskId += 1 + id + } + + override def start() { + super.start() + + synchronized { + new Thread("CoarseMesosSchedulerBackend driver") { + setDaemon(true) + override def run() { + val scheduler = CoarseMesosSchedulerBackend.this + val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build() + driver = new MesosSchedulerDriver(scheduler, fwInfo, master) + try { { + val ret = driver.run() + logInfo("driver.run() returned with code " + ret) + } + } catch { + case e: Exception => logError("driver.run() failed", e) + } + } + }.start() + + waitForRegister() + } + } + + def createCommand(offer: Offer, numCores: Int): CommandInfo = { + val environment = Environment.newBuilder() + sc.executorEnvs.foreach { case (key, value) => + environment.addVariables(Environment.Variable.newBuilder() + .setName(key) + .setValue(value) + .build()) + } + val command = CommandInfo.newBuilder() + .setEnvironment(environment) + val driverUrl = "akka://spark@%s:%s/user/%s".format( + System.getProperty("spark.driver.host"), + System.getProperty("spark.driver.port"), + StandaloneSchedulerBackend.ACTOR_NAME) + val uri = System.getProperty("spark.executor.uri") + if (uri == null) { + val runScript = new File(sparkHome, "spark-class").getCanonicalPath + command.setValue( + "\"%s\" org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format( + runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) + } else { + // Grab everything to the first '.'. We'll use that and '*' to + // glob the directory "correctly". + val basename = uri.split('/').last.split('.').head + command.setValue( + "cd %s*; ./spark-class org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format( + basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) + command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) + } + return command.build() + } + + override def offerRescinded(d: SchedulerDriver, o: OfferID) {} + + override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { + logInfo("Registered as framework ID " + frameworkId.getValue) + registeredLock.synchronized { + isRegistered = true + registeredLock.notifyAll() + } + } + + def waitForRegister() { + registeredLock.synchronized { + while (!isRegistered) { + registeredLock.wait() + } + } + } + + override def disconnected(d: SchedulerDriver) {} + + override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {} + + /** + * Method called by Mesos to offer resources on slaves. We respond by launching an executor, + * unless we've already launched more than we wanted to. + */ + override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { + synchronized { + val filters = Filters.newBuilder().setRefuseSeconds(-1).build() + + for (offer <- offers) { + val slaveId = offer.getSlaveId.toString + val mem = getResource(offer.getResourcesList, "mem") + val cpus = getResource(offer.getResourcesList, "cpus").toInt + if (totalCoresAcquired < maxCores && mem >= executorMemory && cpus >= 1 && + failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && + !slaveIdsWithExecutors.contains(slaveId)) { + // Launch an executor on the slave + val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) + val taskId = newMesosTaskId() + taskIdToSlaveId(taskId) = slaveId + slaveIdsWithExecutors += slaveId + coresByTaskId(taskId) = cpusToUse + val task = MesosTaskInfo.newBuilder() + .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) + .setSlaveId(offer.getSlaveId) + .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave)) + .setName("Task " + taskId) + .addResources(createResource("cpus", cpusToUse)) + .addResources(createResource("mem", executorMemory)) + .build() + d.launchTasks(offer.getId, Collections.singletonList(task), filters) + } else { + // Filter it out + d.launchTasks(offer.getId, Collections.emptyList[MesosTaskInfo](), filters) + } + } + } + } + + /** Helper function to pull out a resource from a Mesos Resources protobuf */ + private def getResource(res: JList[Resource], name: String): Double = { + for (r <- res if r.getName == name) { + return r.getScalar.getValue + } + // If we reached here, no resource with the required name was present + throw new IllegalArgumentException("No resource called " + name + " in " + res) + } + + /** Build a Mesos resource protobuf object */ + private def createResource(resourceName: String, quantity: Double): Protos.Resource = { + Resource.newBuilder() + .setName(resourceName) + .setType(Value.Type.SCALAR) + .setScalar(Value.Scalar.newBuilder().setValue(quantity).build()) + .build() + } + + /** Check whether a Mesos task state represents a finished task */ + private def isFinished(state: MesosTaskState) = { + state == MesosTaskState.TASK_FINISHED || + state == MesosTaskState.TASK_FAILED || + state == MesosTaskState.TASK_KILLED || + state == MesosTaskState.TASK_LOST + } + + override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { + val taskId = status.getTaskId.getValue.toInt + val state = status.getState + logInfo("Mesos task " + taskId + " is now " + state) + synchronized { + if (isFinished(state)) { + val slaveId = taskIdToSlaveId(taskId) + slaveIdsWithExecutors -= slaveId + taskIdToSlaveId -= taskId + // Remove the cores we have remembered for this task, if it's in the hashmap + for (cores <- coresByTaskId.get(taskId)) { + totalCoresAcquired -= cores + coresByTaskId -= taskId + } + // If it was a failure, mark the slave as failed for blacklisting purposes + if (state == MesosTaskState.TASK_FAILED || state == MesosTaskState.TASK_LOST) { + failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1 + if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) { + logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " + + "is Spark installed on it?") + } + } + driver.reviveOffers() // In case we'd rejected everything before but have now lost a node + } + } + } + + override def error(d: SchedulerDriver, message: String) { + logError("Mesos error: " + message) + scheduler.error(message) + } + + override def stop() { + super.stop() + if (driver != null) { + driver.stop() + } + } + + override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} + + override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) { + logInfo("Mesos slave lost: " + slaveId.getValue) + synchronized { + if (slaveIdsWithExecutors.contains(slaveId.getValue)) { + // Note that the slave ID corresponds to the executor ID on that slave + slaveIdsWithExecutors -= slaveId.getValue + removeExecutor(slaveId.getValue, "Mesos slave lost") + } + } + } + + override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) { + logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue)) + slaveLost(d, s) + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala new file mode 100644 index 0000000000..50cbc2ca92 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.io.File +import java.util.{ArrayList => JArrayList, List => JList} +import java.util.Collections + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.collection.JavaConversions._ + +import com.google.protobuf.ByteString +import org.apache.mesos.{Scheduler => MScheduler} +import org.apache.mesos._ +import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _} + +import org.apache.spark.{Logging, SparkException, SparkContext, TaskState} +import org.apache.spark.scheduler.TaskDescription +import org.apache.spark.scheduler.cluster.{ClusterScheduler, ExecutorExited, ExecutorLossReason} +import org.apache.spark.scheduler.cluster.{SchedulerBackend, SlaveLost, WorkerOffer} +import org.apache.spark.util.Utils + +/** + * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a + * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks + * from multiple apps can run on different cores) and in time (a core can switch ownership). + */ +private[spark] class MesosSchedulerBackend( + scheduler: ClusterScheduler, + sc: SparkContext, + master: String, + appName: String) + extends SchedulerBackend + with MScheduler + with Logging { + + // Lock used to wait for scheduler to be registered + var isRegistered = false + val registeredLock = new Object() + + // Driver for talking to Mesos + var driver: SchedulerDriver = null + + // Which slave IDs we have executors on + val slaveIdsWithExecutors = new HashSet[String] + val taskIdToSlaveId = new HashMap[Long, String] + + // An ExecutorInfo for our tasks + var execArgs: Array[Byte] = null + + var classLoader: ClassLoader = null + + override def start() { + synchronized { + classLoader = Thread.currentThread.getContextClassLoader + + new Thread("MesosSchedulerBackend driver") { + setDaemon(true) + override def run() { + val scheduler = MesosSchedulerBackend.this + val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build() + driver = new MesosSchedulerDriver(scheduler, fwInfo, master) + try { + val ret = driver.run() + logInfo("driver.run() returned with code " + ret) + } catch { + case e: Exception => logError("driver.run() failed", e) + } + } + }.start() + + waitForRegister() + } + } + + def createExecutorInfo(execId: String): ExecutorInfo = { + val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException( + "Spark home is not set; set it through the spark.home system " + + "property, the SPARK_HOME environment variable or the SparkContext constructor")) + val environment = Environment.newBuilder() + sc.executorEnvs.foreach { case (key, value) => + environment.addVariables(Environment.Variable.newBuilder() + .setName(key) + .setValue(value) + .build()) + } + val command = CommandInfo.newBuilder() + .setEnvironment(environment) + val uri = System.getProperty("spark.executor.uri") + if (uri == null) { + command.setValue(new File(sparkHome, "spark-executor").getCanonicalPath) + } else { + // Grab everything to the first '.'. We'll use that and '*' to + // glob the directory "correctly". + val basename = uri.split('/').last.split('.').head + command.setValue("cd %s*; ./spark-executor".format(basename)) + command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) + } + val memory = Resource.newBuilder() + .setName("mem") + .setType(Value.Type.SCALAR) + .setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build()) + .build() + ExecutorInfo.newBuilder() + .setExecutorId(ExecutorID.newBuilder().setValue(execId).build()) + .setCommand(command) + .setData(ByteString.copyFrom(createExecArg())) + .addResources(memory) + .build() + } + + /** + * Create and serialize the executor argument to pass to Mesos. Our executor arg is an array + * containing all the spark.* system properties in the form of (String, String) pairs. + */ + private def createExecArg(): Array[Byte] = { + if (execArgs == null) { + val props = new HashMap[String, String] + val iterator = System.getProperties.entrySet.iterator + while (iterator.hasNext) { + val entry = iterator.next + val (key, value) = (entry.getKey.toString, entry.getValue.toString) + if (key.startsWith("spark.")) { + props(key) = value + } + } + // Serialize the map as an array of (String, String) pairs + execArgs = Utils.serialize(props.toArray) + } + return execArgs + } + + private def setClassLoader(): ClassLoader = { + val oldClassLoader = Thread.currentThread.getContextClassLoader + Thread.currentThread.setContextClassLoader(classLoader) + return oldClassLoader + } + + private def restoreClassLoader(oldClassLoader: ClassLoader) { + Thread.currentThread.setContextClassLoader(oldClassLoader) + } + + override def offerRescinded(d: SchedulerDriver, o: OfferID) {} + + override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { + val oldClassLoader = setClassLoader() + try { + logInfo("Registered as framework ID " + frameworkId.getValue) + registeredLock.synchronized { + isRegistered = true + registeredLock.notifyAll() + } + } finally { + restoreClassLoader(oldClassLoader) + } + } + + def waitForRegister() { + registeredLock.synchronized { + while (!isRegistered) { + registeredLock.wait() + } + } + } + + override def disconnected(d: SchedulerDriver) {} + + override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {} + + /** + * Method called by Mesos to offer resources on slaves. We resond by asking our active task sets + * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that + * tasks are balanced across the cluster. + */ + override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { + val oldClassLoader = setClassLoader() + try { + synchronized { + // Build a big list of the offerable workers, and remember their indices so that we can + // figure out which Offer to reply to for each worker + val offerableIndices = new ArrayBuffer[Int] + val offerableWorkers = new ArrayBuffer[WorkerOffer] + + def enoughMemory(o: Offer) = { + val mem = getResource(o.getResourcesList, "mem") + val slaveId = o.getSlaveId.getValue + mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId) + } + + for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) { + offerableIndices += index + offerableWorkers += new WorkerOffer( + offer.getSlaveId.getValue, + offer.getHostname, + getResource(offer.getResourcesList, "cpus").toInt) + } + + // Call into the ClusterScheduler + val taskLists = scheduler.resourceOffers(offerableWorkers) + + // Build a list of Mesos tasks for each slave + val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]()) + for ((taskList, index) <- taskLists.zipWithIndex) { + if (!taskList.isEmpty) { + val offerNum = offerableIndices(index) + val slaveId = offers(offerNum).getSlaveId.getValue + slaveIdsWithExecutors += slaveId + mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size) + for (taskDesc <- taskList) { + taskIdToSlaveId(taskDesc.taskId) = slaveId + mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId)) + } + } + } + + // Reply to the offers + val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout? + for (i <- 0 until offers.size) { + d.launchTasks(offers(i).getId, mesosTasks(i), filters) + } + } + } finally { + restoreClassLoader(oldClassLoader) + } + } + + /** Helper function to pull out a resource from a Mesos Resources protobuf */ + def getResource(res: JList[Resource], name: String): Double = { + for (r <- res if r.getName == name) { + return r.getScalar.getValue + } + // If we reached here, no resource with the required name was present + throw new IllegalArgumentException("No resource called " + name + " in " + res) + } + + /** Turn a Spark TaskDescription into a Mesos task */ + def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = { + val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build() + val cpuResource = Resource.newBuilder() + .setName("cpus") + .setType(Value.Type.SCALAR) + .setScalar(Value.Scalar.newBuilder().setValue(1).build()) + .build() + return MesosTaskInfo.newBuilder() + .setTaskId(taskId) + .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build()) + .setExecutor(createExecutorInfo(slaveId)) + .setName(task.name) + .addResources(cpuResource) + .setData(ByteString.copyFrom(task.serializedTask)) + .build() + } + + /** Check whether a Mesos task state represents a finished task */ + def isFinished(state: MesosTaskState) = { + state == MesosTaskState.TASK_FINISHED || + state == MesosTaskState.TASK_FAILED || + state == MesosTaskState.TASK_KILLED || + state == MesosTaskState.TASK_LOST + } + + override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { + val oldClassLoader = setClassLoader() + try { + val tid = status.getTaskId.getValue.toLong + val state = TaskState.fromMesos(status.getState) + synchronized { + if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) { + // We lost the executor on this slave, so remember that it's gone + slaveIdsWithExecutors -= taskIdToSlaveId(tid) + } + if (isFinished(status.getState)) { + taskIdToSlaveId.remove(tid) + } + } + scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer) + } finally { + restoreClassLoader(oldClassLoader) + } + } + + override def error(d: SchedulerDriver, message: String) { + val oldClassLoader = setClassLoader() + try { + logError("Mesos error: " + message) + scheduler.error(message) + } finally { + restoreClassLoader(oldClassLoader) + } + } + + override def stop() { + if (driver != null) { + driver.stop() + } + } + + override def reviveOffers() { + driver.reviveOffers() + } + + override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} + + private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) { + val oldClassLoader = setClassLoader() + try { + logInfo("Mesos slave lost: " + slaveId.getValue) + synchronized { + slaveIdsWithExecutors -= slaveId.getValue + } + scheduler.executorLost(slaveId.getValue, reason) + } finally { + restoreClassLoader(oldClassLoader) + } + } + + override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) { + recordSlaveLost(d, slaveId, SlaveLost()) + } + + override def executorLost(d: SchedulerDriver, executorId: ExecutorID, + slaveId: SlaveID, status: Int) { + logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue, + slaveId.getValue)) + recordSlaveLost(d, slaveId, ExecutorExited(status)) + } + + // TODO: query Mesos for number of cores + override def defaultParallelism() = System.getProperty("spark.default.parallelism", "8").toInt +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala index 8cb4d1396f..e29438f4ed 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala @@ -31,8 +31,7 @@ import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.executor.ExecutorURLClassLoader import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster._ -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import akka.actor._ import org.apache.spark.util.Utils diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala index e52cb998bd..a2fda4c124 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala @@ -23,8 +23,8 @@ import scala.collection.mutable.HashMap import org.apache.spark.{ExceptionFailure, Logging, SparkEnv, Success, TaskState} import org.apache.spark.TaskState.TaskState -import org.apache.spark.scheduler.{Task, TaskResult, TaskSet} -import org.apache.spark.scheduler.cluster.{Schedulable, TaskDescription, TaskInfo, TaskLocality, TaskSetManager} +import org.apache.spark.scheduler.{Schedulable, Task, TaskDescription, TaskInfo, TaskLocality, + TaskResult, TaskSet, TaskSetManager} private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: TaskSet) diff --git a/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala deleted file mode 100644 index 3dbe61d706..0000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala +++ /dev/null @@ -1,286 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.mesos - -import com.google.protobuf.ByteString - -import org.apache.mesos.{Scheduler => MScheduler} -import org.apache.mesos._ -import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _} - -import org.apache.spark.{SparkException, Logging, SparkContext} -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import scala.collection.JavaConversions._ -import java.io.File -import org.apache.spark.scheduler.cluster._ -import java.util.{ArrayList => JArrayList, List => JList} -import java.util.Collections -import org.apache.spark.TaskState - -/** - * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds - * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever - * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the - * StandaloneBackend mechanism. This class is useful for lower and more predictable latency. - * - * Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to - * remove this. - */ -private[spark] class CoarseMesosSchedulerBackend( - scheduler: ClusterScheduler, - sc: SparkContext, - master: String, - appName: String) - extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem) - with MScheduler - with Logging { - - val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures - - // Lock used to wait for scheduler to be registered - var isRegistered = false - val registeredLock = new Object() - - // Driver for talking to Mesos - var driver: SchedulerDriver = null - - // Maximum number of cores to acquire (TODO: we'll need more flexible controls here) - val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt - - // Cores we have acquired with each Mesos task ID - val coresByTaskId = new HashMap[Int, Int] - var totalCoresAcquired = 0 - - val slaveIdsWithExecutors = new HashSet[String] - - val taskIdToSlaveId = new HashMap[Int, String] - val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed - - val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException( - "Spark home is not set; set it through the spark.home system " + - "property, the SPARK_HOME environment variable or the SparkContext constructor")) - - val extraCoresPerSlave = System.getProperty("spark.mesos.extra.cores", "0").toInt - - var nextMesosTaskId = 0 - - def newMesosTaskId(): Int = { - val id = nextMesosTaskId - nextMesosTaskId += 1 - id - } - - override def start() { - super.start() - - synchronized { - new Thread("CoarseMesosSchedulerBackend driver") { - setDaemon(true) - override def run() { - val scheduler = CoarseMesosSchedulerBackend.this - val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build() - driver = new MesosSchedulerDriver(scheduler, fwInfo, master) - try { { - val ret = driver.run() - logInfo("driver.run() returned with code " + ret) - } - } catch { - case e: Exception => logError("driver.run() failed", e) - } - } - }.start() - - waitForRegister() - } - } - - def createCommand(offer: Offer, numCores: Int): CommandInfo = { - val environment = Environment.newBuilder() - sc.executorEnvs.foreach { case (key, value) => - environment.addVariables(Environment.Variable.newBuilder() - .setName(key) - .setValue(value) - .build()) - } - val command = CommandInfo.newBuilder() - .setEnvironment(environment) - val driverUrl = "akka://spark@%s:%s/user/%s".format( - System.getProperty("spark.driver.host"), - System.getProperty("spark.driver.port"), - StandaloneSchedulerBackend.ACTOR_NAME) - val uri = System.getProperty("spark.executor.uri") - if (uri == null) { - val runScript = new File(sparkHome, "spark-class").getCanonicalPath - command.setValue( - "\"%s\" org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format( - runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) - } else { - // Grab everything to the first '.'. We'll use that and '*' to - // glob the directory "correctly". - val basename = uri.split('/').last.split('.').head - command.setValue( - "cd %s*; ./spark-class org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format( - basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) - command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) - } - return command.build() - } - - override def offerRescinded(d: SchedulerDriver, o: OfferID) {} - - override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { - logInfo("Registered as framework ID " + frameworkId.getValue) - registeredLock.synchronized { - isRegistered = true - registeredLock.notifyAll() - } - } - - def waitForRegister() { - registeredLock.synchronized { - while (!isRegistered) { - registeredLock.wait() - } - } - } - - override def disconnected(d: SchedulerDriver) {} - - override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {} - - /** - * Method called by Mesos to offer resources on slaves. We respond by launching an executor, - * unless we've already launched more than we wanted to. - */ - override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { - synchronized { - val filters = Filters.newBuilder().setRefuseSeconds(-1).build() - - for (offer <- offers) { - val slaveId = offer.getSlaveId.toString - val mem = getResource(offer.getResourcesList, "mem") - val cpus = getResource(offer.getResourcesList, "cpus").toInt - if (totalCoresAcquired < maxCores && mem >= executorMemory && cpus >= 1 && - failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && - !slaveIdsWithExecutors.contains(slaveId)) { - // Launch an executor on the slave - val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) - val taskId = newMesosTaskId() - taskIdToSlaveId(taskId) = slaveId - slaveIdsWithExecutors += slaveId - coresByTaskId(taskId) = cpusToUse - val task = MesosTaskInfo.newBuilder() - .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) - .setSlaveId(offer.getSlaveId) - .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave)) - .setName("Task " + taskId) - .addResources(createResource("cpus", cpusToUse)) - .addResources(createResource("mem", executorMemory)) - .build() - d.launchTasks(offer.getId, Collections.singletonList(task), filters) - } else { - // Filter it out - d.launchTasks(offer.getId, Collections.emptyList[MesosTaskInfo](), filters) - } - } - } - } - - /** Helper function to pull out a resource from a Mesos Resources protobuf */ - private def getResource(res: JList[Resource], name: String): Double = { - for (r <- res if r.getName == name) { - return r.getScalar.getValue - } - // If we reached here, no resource with the required name was present - throw new IllegalArgumentException("No resource called " + name + " in " + res) - } - - /** Build a Mesos resource protobuf object */ - private def createResource(resourceName: String, quantity: Double): Protos.Resource = { - Resource.newBuilder() - .setName(resourceName) - .setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder().setValue(quantity).build()) - .build() - } - - /** Check whether a Mesos task state represents a finished task */ - private def isFinished(state: MesosTaskState) = { - state == MesosTaskState.TASK_FINISHED || - state == MesosTaskState.TASK_FAILED || - state == MesosTaskState.TASK_KILLED || - state == MesosTaskState.TASK_LOST - } - - override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { - val taskId = status.getTaskId.getValue.toInt - val state = status.getState - logInfo("Mesos task " + taskId + " is now " + state) - synchronized { - if (isFinished(state)) { - val slaveId = taskIdToSlaveId(taskId) - slaveIdsWithExecutors -= slaveId - taskIdToSlaveId -= taskId - // Remove the cores we have remembered for this task, if it's in the hashmap - for (cores <- coresByTaskId.get(taskId)) { - totalCoresAcquired -= cores - coresByTaskId -= taskId - } - // If it was a failure, mark the slave as failed for blacklisting purposes - if (state == MesosTaskState.TASK_FAILED || state == MesosTaskState.TASK_LOST) { - failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1 - if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) { - logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " + - "is Spark installed on it?") - } - } - driver.reviveOffers() // In case we'd rejected everything before but have now lost a node - } - } - } - - override def error(d: SchedulerDriver, message: String) { - logError("Mesos error: " + message) - scheduler.error(message) - } - - override def stop() { - super.stop() - if (driver != null) { - driver.stop() - } - } - - override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} - - override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) { - logInfo("Mesos slave lost: " + slaveId.getValue) - synchronized { - if (slaveIdsWithExecutors.contains(slaveId.getValue)) { - // Note that the slave ID corresponds to the executor ID on that slave - slaveIdsWithExecutors -= slaveId.getValue - removeExecutor(slaveId.getValue, "Mesos slave lost") - } - } - } - - override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) { - logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue)) - slaveLost(d, s) - } -} diff --git a/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala deleted file mode 100644 index 541f86e338..0000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala +++ /dev/null @@ -1,343 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.mesos - -import com.google.protobuf.ByteString - -import org.apache.mesos.{Scheduler => MScheduler} -import org.apache.mesos._ -import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _} - -import org.apache.spark.{SparkException, Logging, SparkContext} -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import scala.collection.JavaConversions._ -import java.io.File -import org.apache.spark.scheduler.cluster._ -import java.util.{ArrayList => JArrayList, List => JList} -import java.util.Collections -import org.apache.spark.TaskState -import org.apache.spark.util.Utils - -/** - * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a - * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks - * from multiple apps can run on different cores) and in time (a core can switch ownership). - */ -private[spark] class MesosSchedulerBackend( - scheduler: ClusterScheduler, - sc: SparkContext, - master: String, - appName: String) - extends SchedulerBackend - with MScheduler - with Logging { - - // Lock used to wait for scheduler to be registered - var isRegistered = false - val registeredLock = new Object() - - // Driver for talking to Mesos - var driver: SchedulerDriver = null - - // Which slave IDs we have executors on - val slaveIdsWithExecutors = new HashSet[String] - val taskIdToSlaveId = new HashMap[Long, String] - - // An ExecutorInfo for our tasks - var execArgs: Array[Byte] = null - - var classLoader: ClassLoader = null - - override def start() { - synchronized { - classLoader = Thread.currentThread.getContextClassLoader - - new Thread("MesosSchedulerBackend driver") { - setDaemon(true) - override def run() { - val scheduler = MesosSchedulerBackend.this - val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build() - driver = new MesosSchedulerDriver(scheduler, fwInfo, master) - try { - val ret = driver.run() - logInfo("driver.run() returned with code " + ret) - } catch { - case e: Exception => logError("driver.run() failed", e) - } - } - }.start() - - waitForRegister() - } - } - - def createExecutorInfo(execId: String): ExecutorInfo = { - val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException( - "Spark home is not set; set it through the spark.home system " + - "property, the SPARK_HOME environment variable or the SparkContext constructor")) - val environment = Environment.newBuilder() - sc.executorEnvs.foreach { case (key, value) => - environment.addVariables(Environment.Variable.newBuilder() - .setName(key) - .setValue(value) - .build()) - } - val command = CommandInfo.newBuilder() - .setEnvironment(environment) - val uri = System.getProperty("spark.executor.uri") - if (uri == null) { - command.setValue(new File(sparkHome, "spark-executor").getCanonicalPath) - } else { - // Grab everything to the first '.'. We'll use that and '*' to - // glob the directory "correctly". - val basename = uri.split('/').last.split('.').head - command.setValue("cd %s*; ./spark-executor".format(basename)) - command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) - } - val memory = Resource.newBuilder() - .setName("mem") - .setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build()) - .build() - ExecutorInfo.newBuilder() - .setExecutorId(ExecutorID.newBuilder().setValue(execId).build()) - .setCommand(command) - .setData(ByteString.copyFrom(createExecArg())) - .addResources(memory) - .build() - } - - /** - * Create and serialize the executor argument to pass to Mesos. Our executor arg is an array - * containing all the spark.* system properties in the form of (String, String) pairs. - */ - private def createExecArg(): Array[Byte] = { - if (execArgs == null) { - val props = new HashMap[String, String] - val iterator = System.getProperties.entrySet.iterator - while (iterator.hasNext) { - val entry = iterator.next - val (key, value) = (entry.getKey.toString, entry.getValue.toString) - if (key.startsWith("spark.")) { - props(key) = value - } - } - // Serialize the map as an array of (String, String) pairs - execArgs = Utils.serialize(props.toArray) - } - return execArgs - } - - private def setClassLoader(): ClassLoader = { - val oldClassLoader = Thread.currentThread.getContextClassLoader - Thread.currentThread.setContextClassLoader(classLoader) - return oldClassLoader - } - - private def restoreClassLoader(oldClassLoader: ClassLoader) { - Thread.currentThread.setContextClassLoader(oldClassLoader) - } - - override def offerRescinded(d: SchedulerDriver, o: OfferID) {} - - override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { - val oldClassLoader = setClassLoader() - try { - logInfo("Registered as framework ID " + frameworkId.getValue) - registeredLock.synchronized { - isRegistered = true - registeredLock.notifyAll() - } - } finally { - restoreClassLoader(oldClassLoader) - } - } - - def waitForRegister() { - registeredLock.synchronized { - while (!isRegistered) { - registeredLock.wait() - } - } - } - - override def disconnected(d: SchedulerDriver) {} - - override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {} - - /** - * Method called by Mesos to offer resources on slaves. We resond by asking our active task sets - * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that - * tasks are balanced across the cluster. - */ - override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { - val oldClassLoader = setClassLoader() - try { - synchronized { - // Build a big list of the offerable workers, and remember their indices so that we can - // figure out which Offer to reply to for each worker - val offerableIndices = new ArrayBuffer[Int] - val offerableWorkers = new ArrayBuffer[WorkerOffer] - - def enoughMemory(o: Offer) = { - val mem = getResource(o.getResourcesList, "mem") - val slaveId = o.getSlaveId.getValue - mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId) - } - - for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) { - offerableIndices += index - offerableWorkers += new WorkerOffer( - offer.getSlaveId.getValue, - offer.getHostname, - getResource(offer.getResourcesList, "cpus").toInt) - } - - // Call into the ClusterScheduler - val taskLists = scheduler.resourceOffers(offerableWorkers) - - // Build a list of Mesos tasks for each slave - val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]()) - for ((taskList, index) <- taskLists.zipWithIndex) { - if (!taskList.isEmpty) { - val offerNum = offerableIndices(index) - val slaveId = offers(offerNum).getSlaveId.getValue - slaveIdsWithExecutors += slaveId - mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size) - for (taskDesc <- taskList) { - taskIdToSlaveId(taskDesc.taskId) = slaveId - mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId)) - } - } - } - - // Reply to the offers - val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout? - for (i <- 0 until offers.size) { - d.launchTasks(offers(i).getId, mesosTasks(i), filters) - } - } - } finally { - restoreClassLoader(oldClassLoader) - } - } - - /** Helper function to pull out a resource from a Mesos Resources protobuf */ - def getResource(res: JList[Resource], name: String): Double = { - for (r <- res if r.getName == name) { - return r.getScalar.getValue - } - // If we reached here, no resource with the required name was present - throw new IllegalArgumentException("No resource called " + name + " in " + res) - } - - /** Turn a Spark TaskDescription into a Mesos task */ - def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = { - val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build() - val cpuResource = Resource.newBuilder() - .setName("cpus") - .setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder().setValue(1).build()) - .build() - return MesosTaskInfo.newBuilder() - .setTaskId(taskId) - .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build()) - .setExecutor(createExecutorInfo(slaveId)) - .setName(task.name) - .addResources(cpuResource) - .setData(ByteString.copyFrom(task.serializedTask)) - .build() - } - - /** Check whether a Mesos task state represents a finished task */ - def isFinished(state: MesosTaskState) = { - state == MesosTaskState.TASK_FINISHED || - state == MesosTaskState.TASK_FAILED || - state == MesosTaskState.TASK_KILLED || - state == MesosTaskState.TASK_LOST - } - - override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { - val oldClassLoader = setClassLoader() - try { - val tid = status.getTaskId.getValue.toLong - val state = TaskState.fromMesos(status.getState) - synchronized { - if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) { - // We lost the executor on this slave, so remember that it's gone - slaveIdsWithExecutors -= taskIdToSlaveId(tid) - } - if (isFinished(status.getState)) { - taskIdToSlaveId.remove(tid) - } - } - scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer) - } finally { - restoreClassLoader(oldClassLoader) - } - } - - override def error(d: SchedulerDriver, message: String) { - val oldClassLoader = setClassLoader() - try { - logError("Mesos error: " + message) - scheduler.error(message) - } finally { - restoreClassLoader(oldClassLoader) - } - } - - override def stop() { - if (driver != null) { - driver.stop() - } - } - - override def reviveOffers() { - driver.reviveOffers() - } - - override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} - - private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) { - val oldClassLoader = setClassLoader() - try { - logInfo("Mesos slave lost: " + slaveId.getValue) - synchronized { - slaveIdsWithExecutors -= slaveId.getValue - } - scheduler.executorLost(slaveId.getValue, reason) - } finally { - restoreClassLoader(oldClassLoader) - } - } - - override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) { - recordSlaveLost(d, slaveId, SlaveLost()) - } - - override def executorLost(d: SchedulerDriver, executorId: ExecutorID, - slaveId: SlaveID, status: Int) { - logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue, - slaveId.getValue)) - recordSlaveLost(d, slaveId, ExecutorExited(status)) - } - - // TODO: query Mesos for number of cores - override def defaultParallelism() = System.getProperty("spark.default.parallelism", "8").toInt -} diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala index 3ec9760ed0..453394dfda 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala @@ -21,7 +21,7 @@ import scala.util.Random import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ -import org.apache.spark.scheduler.cluster.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode /** diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala index d1868dcf78..42e9be6e19 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala @@ -26,8 +26,8 @@ import org.eclipse.jetty.server.Handler import org.apache.spark.{ExceptionFailure, Logging, SparkContext} import org.apache.spark.executor.TaskMetrics -import org.apache.spark.scheduler.cluster.TaskInfo import org.apache.spark.scheduler.{SparkListenerTaskStart, SparkListenerTaskEnd, SparkListener} +import org.apache.spark.scheduler.TaskInfo import org.apache.spark.ui.JettyUtils._ import org.apache.spark.ui.Page.Executors import org.apache.spark.ui.UIUtils diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala index 3b428effaf..b39c0e9769 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala @@ -21,7 +21,7 @@ import javax.servlet.http.HttpServletRequest import scala.xml.{NodeSeq, Node} -import org.apache.spark.scheduler.cluster.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode import org.apache.spark.ui.Page._ import org.apache.spark.ui.UIUtils._ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 5d46f38a2a..eb3b4e8522 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -21,10 +21,8 @@ import scala.Seq import scala.collection.mutable.{ListBuffer, HashMap, HashSet} import org.apache.spark.{ExceptionFailure, SparkContext, Success} -import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster.TaskInfo import org.apache.spark.executor.TaskMetrics -import collection.mutable +import org.apache.spark.scheduler._ /** * Tracks task-level information to be displayed in the UI. diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala index 6aecef5120..e7eab374ad 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala @@ -32,8 +32,8 @@ import org.apache.spark.ui.JettyUtils._ import org.apache.spark.{ExceptionFailure, SparkContext, Success} import org.apache.spark.scheduler._ import collection.mutable -import org.apache.spark.scheduler.cluster.SchedulingMode -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.util.Utils /** Web UI showing progress status of all jobs in the given SparkContext. */ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala index b3d3666944..06810d8dbc 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala @@ -21,8 +21,7 @@ import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet import scala.xml.Node -import org.apache.spark.scheduler.Stage -import org.apache.spark.scheduler.cluster.Schedulable +import org.apache.spark.scheduler.{Schedulable, Stage} import org.apache.spark.ui.UIUtils /** Table showing list of pools */ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index a9969ab1c0..163a3746ea 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -23,12 +23,12 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node +import org.apache.spark.{ExceptionFailure} +import org.apache.spark.executor.TaskMetrics import org.apache.spark.ui.UIUtils._ import org.apache.spark.ui.Page._ import org.apache.spark.util.{Utils, Distribution} -import org.apache.spark.{ExceptionFailure} -import org.apache.spark.scheduler.cluster.TaskInfo -import org.apache.spark.executor.TaskMetrics +import org.apache.spark.scheduler.TaskInfo /** Page showing statistics and task list for a given stage */ private[spark] class StagePage(parent: JobProgressUI) { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 32776eaa25..07db8622da 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -22,8 +22,7 @@ import java.util.Date import scala.xml.Node import scala.collection.mutable.HashSet -import org.apache.spark.scheduler.cluster.{SchedulingMode, TaskInfo} -import org.apache.spark.scheduler.Stage +import org.apache.spark.scheduler.{SchedulingMode, Stage, TaskInfo} import org.apache.spark.ui.UIUtils import org.apache.spark.util.Utils diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 94f66c94c6..9ed591e494 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -32,9 +32,9 @@ import org.apache.spark.{Dependency, ShuffleDependency, OneToOneDependency} import org.apache.spark.{FetchFailed, Success, TaskEndReason} import org.apache.spark.storage.{BlockManagerId, BlockManagerMaster} -import org.apache.spark.scheduler.cluster.Pool -import org.apache.spark.scheduler.cluster.SchedulingMode -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode +import org.apache.spark.scheduler.Pool +import org.apache.spark.scheduler.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode /** * Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler -- cgit v1.2.3