aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHemant Bhanawat <hemant@snappydata.io>2016-04-16 23:43:32 -0700
committerReynold Xin <rxin@databricks.com>2016-04-16 23:43:32 -0700
commitaf1f4da76268115c5a4cc3035d3236ad27f7240a (patch)
treeaf080d14f8f5f1172e67353d0350404f92506f99
parent3394b12c379fe0a423d73dc6316aadca18cd2110 (diff)
downloadspark-af1f4da76268115c5a4cc3035d3236ad27f7240a.tar.gz
spark-af1f4da76268115c5a4cc3035d3236ad27f7240a.tar.bz2
spark-af1f4da76268115c5a4cc3035d3236ad27f7240a.zip
[SPARK-13904][SCHEDULER] Add support for pluggable cluster manager
## What changes were proposed in this pull request? This commit adds support for pluggable cluster manager. And also allows a cluster manager to clean up tasks without taking the parent process down. To plug a new external cluster manager, ExternalClusterManager trait should be implemented. It returns task scheduler and backend scheduler that will be used by SparkContext to schedule tasks. An external cluster manager is registered using the java.util.ServiceLoader mechanism (This mechanism is also being used to register data sources like parquet, json, jdbc etc.). This allows auto-loading implementations of ExternalClusterManager interface. Currently, when a driver fails, executors exit using system.exit. This does not bode well for cluster managers that would like to reuse the parent process of an executor. Hence, 1. Moving system.exit to a function that can be overriden in subclasses of CoarseGrainedExecutorBackend. 2. Added functionality of killing all the running tasks in an executor. ## How was this patch tested? ExternalClusterManagerSuite.scala was added to test this patch. Author: Hemant Bhanawat <hemant@snappydata.io> Closes #11723 from hbhanawat/pluggableScheduler.
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala29
-rw-r--r--core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala17
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala65
-rw-r--r--core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager1
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala73
-rw-r--r--dev/.rat-excludes1
7 files changed, 193 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e41088f7c8..e7eabd2896 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -20,7 +20,7 @@ package org.apache.spark
import java.io._
import java.lang.reflect.Constructor
import java.net.URI
-import java.util.{Arrays, Properties, UUID}
+import java.util.{Arrays, Properties, ServiceLoader, UUID}
import java.util.concurrent.ConcurrentMap
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
@@ -2453,9 +2453,32 @@ object SparkContext extends Logging {
"in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.")
createTaskScheduler(sc, "mesos://" + zkUrl, deployMode)
- case _ =>
- throw new SparkException("Could not parse Master URL: '" + master + "'")
+ case masterUrl =>
+ val cm = getClusterManager(masterUrl) match {
+ case Some(clusterMgr) => clusterMgr
+ case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
+ }
+ try {
+ val scheduler = cm.createTaskScheduler(sc, masterUrl)
+ val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
+ cm.initialize(scheduler, backend)
+ (backend, scheduler)
+ } catch {
+ case NonFatal(e) =>
+ throw new SparkException("External scheduler cannot be instantiated", e)
+ }
+ }
+ }
+
+ private def getClusterManager(url: String): Option[ExternalClusterManager] = {
+ val loader = Utils.getContextOrSparkClassLoader
+ val serviceLoaders =
+ ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
+ if (serviceLoaders.size > 1) {
+ throw new SparkException(s"Multiple Cluster Managers ($serviceLoaders) registered " +
+ s"for the url $url:")
}
+ serviceLoaders.headOption
}
}
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 71b4ad160d..db5b774806 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -64,7 +64,7 @@ private[spark] class CoarseGrainedExecutorBackend(
// Always receive `true`. Just ignore it
case Failure(e) =>
logError(s"Cannot register with driver: $driverUrl", e)
- System.exit(1)
+ exitExecutor()
}(ThreadUtils.sameThread)
}
@@ -81,12 +81,12 @@ private[spark] class CoarseGrainedExecutorBackend(
case RegisterExecutorFailed(message) =>
logError("Slave registration failed: " + message)
- System.exit(1)
+ exitExecutor()
case LaunchTask(data) =>
if (executor == null) {
logError("Received LaunchTask command but executor was null")
- System.exit(1)
+ exitExecutor()
} else {
val taskDesc = ser.deserialize[TaskDescription](data.value)
logInfo("Got assigned task " + taskDesc.taskId)
@@ -97,7 +97,7 @@ private[spark] class CoarseGrainedExecutorBackend(
case KillTask(taskId, _, interruptThread) =>
if (executor == null) {
logError("Received KillTask command but executor was null")
- System.exit(1)
+ exitExecutor()
} else {
executor.killTask(taskId, interruptThread)
}
@@ -127,7 +127,7 @@ private[spark] class CoarseGrainedExecutorBackend(
logInfo(s"Driver from $remoteAddress disconnected during shutdown")
} else if (driver.exists(_.address == remoteAddress)) {
logError(s"Driver $remoteAddress disassociated! Shutting down.")
- System.exit(1)
+ exitExecutor()
} else {
logWarning(s"An unknown ($remoteAddress) driver disconnected.")
}
@@ -140,6 +140,13 @@ private[spark] class CoarseGrainedExecutorBackend(
case None => logWarning(s"Drop $msg because has not yet connected to driver")
}
}
+
+ /**
+ * This function can be overloaded by other child classes to handle
+ * executor exits differently. For e.g. when an executor goes down,
+ * back-end may not want to take the parent process down.
+ */
+ protected def exitExecutor(): Unit = System.exit(1)
}
private[spark] object CoarseGrainedExecutorBackend extends Logging {
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 9f94fdef24..b20bd11f7d 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -153,6 +153,21 @@ private[spark] class Executor(
}
}
+ /**
+ * Function to kill the running tasks in an executor.
+ * This can be called by executor back-ends to kill the
+ * tasks instead of taking the JVM down.
+ * @param interruptThread whether to interrupt the task thread
+ */
+ def killAllTasks(interruptThread: Boolean) : Unit = {
+ // kill all the running tasks
+ for (taskRunner <- runningTasks.values().asScala) {
+ if (taskRunner != null) {
+ taskRunner.kill(interruptThread)
+ }
+ }
+ }
+
def stop(): Unit = {
env.metricsSystem.report()
heartbeater.shutdown()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala b/core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala
new file mode 100644
index 0000000000..6ca1f569b9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * A cluster manager interface to plugin external scheduler.
+ */
+@DeveloperApi
+trait ExternalClusterManager {
+
+ /**
+ * Check if this cluster manager instance can create scheduler components
+ * for a certain master URL.
+ * @param masterURL the master URL
+ * @return True if the cluster manager can create scheduler backend/
+ */
+ def canCreate(masterURL: String): Boolean
+
+ /**
+ * Create a task scheduler instance for the given SparkContext
+ * @param sc SparkContext
+ * @param masterURL the master URL
+ * @return TaskScheduler that will be responsible for task handling
+ */
+ def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
+
+ /**
+ * Create a scheduler backend for the given SparkContext and scheduler. This is
+ * called after task scheduler is created using [[ExternalClusterManager.createTaskScheduler()]].
+ * @param sc SparkContext
+ * @param masterURL the master URL
+ * @param scheduler TaskScheduler that will be used with the scheduler backend.
+ * @return SchedulerBackend that works with a TaskScheduler
+ */
+ def createSchedulerBackend(sc: SparkContext,
+ masterURL: String,
+ scheduler: TaskScheduler): SchedulerBackend
+
+ /**
+ * Initialize task scheduler and backend scheduler. This is called after the
+ * scheduler components are created
+ * @param scheduler TaskScheduler that will be responsible for task handling
+ * @param backend SchedulerBackend that works with a TaskScheduler
+ */
+ def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
+}
diff --git a/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager b/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
new file mode 100644
index 0000000000..3c570ffd8f
--- /dev/null
+++ b/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
@@ -0,0 +1 @@
+org.apache.spark.scheduler.DummyExternalClusterManager \ No newline at end of file
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
new file mode 100644
index 0000000000..9971d48a52
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+import org.apache.spark.storage.BlockManagerId
+
+class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext
+{
+ test("launch of backend and scheduler") {
+ val conf = new SparkConf().setMaster("myclusterManager").
+ setAppName("testcm").set("spark.driver.allowMultipleContexts", "true")
+ sc = new SparkContext(conf)
+ // check if the scheduler components are created
+ assert(sc.schedulerBackend.isInstanceOf[DummySchedulerBackend])
+ assert(sc.taskScheduler.isInstanceOf[DummyTaskScheduler])
+ }
+}
+
+private class DummyExternalClusterManager extends ExternalClusterManager {
+
+ def canCreate(masterURL: String): Boolean = masterURL == "myclusterManager"
+
+ def createTaskScheduler(sc: SparkContext,
+ masterURL: String): TaskScheduler = new DummyTaskScheduler
+
+ def createSchedulerBackend(sc: SparkContext,
+ masterURL: String,
+ scheduler: TaskScheduler): SchedulerBackend = new DummySchedulerBackend()
+
+ def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {}
+
+}
+
+private class DummySchedulerBackend extends SchedulerBackend {
+ def start() {}
+ def stop() {}
+ def reviveOffers() {}
+ def defaultParallelism(): Int = 1
+}
+
+private class DummyTaskScheduler extends TaskScheduler {
+ override def rootPool: Pool = null
+ override def schedulingMode: SchedulingMode = SchedulingMode.NONE
+ override def start(): Unit = {}
+ override def stop(): Unit = {}
+ override def submitTasks(taskSet: TaskSet): Unit = {}
+ override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = {}
+ override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
+ override def defaultParallelism(): Int = 2
+ override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
+ override def applicationAttemptId(): Option[String] = None
+ def executorHeartbeatReceived(
+ execId: String,
+ accumUpdates: Array[(Long, Seq[AccumulableInfo])],
+ blockManagerId: BlockManagerId): Boolean = true
+}
diff --git a/dev/.rat-excludes b/dev/.rat-excludes
index 8b5061415f..a409a3cb36 100644
--- a/dev/.rat-excludes
+++ b/dev/.rat-excludes
@@ -98,3 +98,4 @@ LZ4BlockInputStream.java
spark-deps-.*
.*csv
.*tsv
+org.apache.spark.scheduler.ExternalClusterManager