-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                                      | 29
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala             | 17
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala                                 | 15
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala                  | 65
-rw-r--r--  core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager  |  1
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala             | 73
-rw-r--r--  dev/.rat-excludes                                                                             |  1
7 files changed, 193 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e41088f7c8..e7eabd2896 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -20,7 +20,7 @@ package org.apache.spark
import java.io._
import java.lang.reflect.Constructor
import java.net.URI
-import java.util.{Arrays, Properties, UUID}
+import java.util.{Arrays, Properties, ServiceLoader, UUID}
import java.util.concurrent.ConcurrentMap
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
@@ -2453,9 +2453,32 @@ object SparkContext extends Logging {
"in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.")
createTaskScheduler(sc, "mesos://" + zkUrl, deployMode)
- case _ =>
- throw new SparkException("Could not parse Master URL: '" + master + "'")
+ case masterUrl =>
+ val cm = getClusterManager(masterUrl) match {
+ case Some(clusterMgr) => clusterMgr
+ case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
+ }
+ try {
+ val scheduler = cm.createTaskScheduler(sc, masterUrl)
+ val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
+ cm.initialize(scheduler, backend)
+ (backend, scheduler)
+ } catch {
+ case NonFatal(e) =>
+ throw new SparkException("External scheduler cannot be instantiated", e)
+ }
+ }
+ }
+
+ private def getClusterManager(url: String): Option[ExternalClusterManager] = {
+ val loader = Utils.getContextOrSparkClassLoader
+ val serviceLoaders =
+ ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
+ if (serviceLoaders.size > 1) {
+ throw new SparkException(s"Multiple Cluster Managers ($serviceLoaders) registered " +
+ s"for the url $url:")
}
+ serviceLoaders.headOption
}
}
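With this change, any master URL that none of the built-in patterns match falls through to the ServiceLoader lookup above instead of failing immediately. A minimal sketch of how an application would hit the new code path, assuming a manager on the classpath whose canCreate accepts the made-up "example://" scheme (both the scheme and the manager are hypothetical):

import org.apache.spark.{SparkConf, SparkContext}

// "example://..." is not a built-in scheme, so SparkContext asks the
// registered ExternalClusterManager implementations whether they can handle it.
val conf = new SparkConf()
  .setMaster("example://host:7077")
  .setAppName("pluggable-cluster-manager-demo")
val sc = new SparkContext(conf)  // throws "Could not parse Master URL" if nothing matches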
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 71b4ad160d..db5b774806 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -64,7 +64,7 @@ private[spark] class CoarseGrainedExecutorBackend(
// Always receive `true`. Just ignore it
case Failure(e) =>
logError(s"Cannot register with driver: $driverUrl", e)
- System.exit(1)
+ exitExecutor()
}(ThreadUtils.sameThread)
}
@@ -81,12 +81,12 @@ private[spark] class CoarseGrainedExecutorBackend(
case RegisterExecutorFailed(message) =>
logError("Slave registration failed: " + message)
- System.exit(1)
+ exitExecutor()
case LaunchTask(data) =>
if (executor == null) {
logError("Received LaunchTask command but executor was null")
- System.exit(1)
+ exitExecutor()
} else {
val taskDesc = ser.deserialize[TaskDescription](data.value)
logInfo("Got assigned task " + taskDesc.taskId)
@@ -97,7 +97,7 @@ private[spark] class CoarseGrainedExecutorBackend(
case KillTask(taskId, _, interruptThread) =>
if (executor == null) {
logError("Received KillTask command but executor was null")
- System.exit(1)
+ exitExecutor()
} else {
executor.killTask(taskId, interruptThread)
}
@@ -127,7 +127,7 @@ private[spark] class CoarseGrainedExecutorBackend(
logInfo(s"Driver from $remoteAddress disconnected during shutdown")
} else if (driver.exists(_.address == remoteAddress)) {
logError(s"Driver $remoteAddress disassociated! Shutting down.")
- System.exit(1)
+ exitExecutor()
} else {
logWarning(s"An unknown ($remoteAddress) driver disconnected.")
}
@@ -140,6 +140,13 @@ private[spark] class CoarseGrainedExecutorBackend(
case None => logWarning(s"Drop $msg because has not yet connected to driver")
}
}
+
+ /**
+ * This function can be overridden by child classes to handle
+ * executor exits differently. For example, when an executor goes down,
+ * the back-end may not want to take the parent process down.
+ */
+ protected def exitExecutor(): Unit = System.exit(1)
}
private[spark] object CoarseGrainedExecutorBackend extends Logging {
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 9f94fdef24..b20bd11f7d 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -153,6 +153,21 @@ private[spark] class Executor(
}
}
+ /**
+ * Function to kill the running tasks in an executor.
+ * This can be called by executor back-ends to kill the
+ * tasks instead of taking the JVM down.
+ * @param interruptThread whether to interrupt the task thread
+ */
+ def killAllTasks(interruptThread: Boolean): Unit = {
+ // kill all the running tasks
+ for (taskRunner <- runningTasks.values().asScala) {
+ if (taskRunner != null) {
+ taskRunner.kill(interruptThread)
+ }
+ }
+ }
+
def stop(): Unit = {
env.metricsSystem.report()
heartbeater.shutdown()
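Together with the exitExecutor() hook introduced above, killAllTasks lets a custom coarse-grained backend react to fatal conditions without taking the whole process down. A hedged fragment of what such an override might look like in a hypothetical CoarseGrainedExecutorBackend subclass that shares its JVM with other components (the subclass is not part of this patch; executor is the backend's Executor instance):

// Illustrative override only; the default exitExecutor() still calls System.exit(1).
override protected def exitExecutor(): Unit = {
  logError("Executor failed; killing its tasks but keeping the shared JVM alive")
  if (executor != null) {
    executor.killAllTasks(interruptThread = true)
  }
}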
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala b/core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala
new file mode 100644
index 0000000000..6ca1f569b9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * A cluster manager interface to plug in an external scheduler.
+ */
+@DeveloperApi
+trait ExternalClusterManager {
+
+ /**
+ * Check if this cluster manager instance can create scheduler components
+ * for a certain master URL.
+ * @param masterURL the master URL
+ * @return True if the cluster manager can create scheduler backend/task scheduler.
+ */
+ def canCreate(masterURL: String): Boolean
+
+ /**
+ * Create a task scheduler instance for the given SparkContext
+ * @param sc SparkContext
+ * @param masterURL the master URL
+ * @return TaskScheduler that will be responsible for task handling
+ */
+ def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
+
+ /**
+ * Create a scheduler backend for the given SparkContext and scheduler. This is
+ * called after the task scheduler is created using [[ExternalClusterManager.createTaskScheduler()]].
+ * @param sc SparkContext
+ * @param masterURL the master URL
+ * @param scheduler TaskScheduler that will be used with the scheduler backend.
+ * @return SchedulerBackend that works with a TaskScheduler
+ */
+ def createSchedulerBackend(sc: SparkContext,
+ masterURL: String,
+ scheduler: TaskScheduler): SchedulerBackend
+
+ /**
+ * Initialize the task scheduler and scheduler backend. This is called after the
+ * scheduler components are created.
+ * @param scheduler TaskScheduler that will be responsible for task handling
+ * @param backend SchedulerBackend that works with a TaskScheduler
+ */
+ def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
+}
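For orientation, here is a rough sketch of what a third-party implementation of this trait could look like. ExampleClusterManager, the "example://" scheme, and the use of TaskSchedulerImpl are illustrative assumptions, not part of this patch; a real manager would supply its own SchedulerBackend where the placeholder is:

package org.apache.spark.scheduler

import org.apache.spark.SparkContext

// Hypothetical pluggable manager, registered via META-INF/services.
private[spark] class ExampleClusterManager extends ExternalClusterManager {

  override def canCreate(masterURL: String): Boolean =
    masterURL.startsWith("example://")

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler =
    new TaskSchedulerImpl(sc)

  override def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend =
    ???  // backend that talks to the external resource manager goes here

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit =
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
}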
diff --git a/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager b/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
new file mode 100644
index 0000000000..3c570ffd8f
--- /dev/null
+++ b/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
@@ -0,0 +1 @@
+org.apache.spark.scheduler.DummyExternalClusterManager
\ No newline at end of file
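A third-party jar registers its manager the same way the test does here: it ships a provider-configuration file named META-INF/services/org.apache.spark.scheduler.ExternalClusterManager whose content is the fully qualified class name of its implementation, e.g. (class name hypothetical):

com.example.spark.ExampleClusterManager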
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
new file mode 100644
index 0000000000..9971d48a52
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+import org.apache.spark.storage.BlockManagerId
+
+class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext
+{
+ test("launch of backend and scheduler") {
+ val conf = new SparkConf().setMaster("myclusterManager").
+ setAppName("testcm").set("spark.driver.allowMultipleContexts", "true")
+ sc = new SparkContext(conf)
+ // check if the scheduler components are created
+ assert(sc.schedulerBackend.isInstanceOf[DummySchedulerBackend])
+ assert(sc.taskScheduler.isInstanceOf[DummyTaskScheduler])
+ }
+}
+
+private class DummyExternalClusterManager extends ExternalClusterManager {
+
+ def canCreate(masterURL: String): Boolean = masterURL == "myclusterManager"
+
+ def createTaskScheduler(sc: SparkContext,
+ masterURL: String): TaskScheduler = new DummyTaskScheduler
+
+ def createSchedulerBackend(sc: SparkContext,
+ masterURL: String,
+ scheduler: TaskScheduler): SchedulerBackend = new DummySchedulerBackend()
+
+ def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {}
+
+}
+
+private class DummySchedulerBackend extends SchedulerBackend {
+ def start() {}
+ def stop() {}
+ def reviveOffers() {}
+ def defaultParallelism(): Int = 1
+}
+
+private class DummyTaskScheduler extends TaskScheduler {
+ override def rootPool: Pool = null
+ override def schedulingMode: SchedulingMode = SchedulingMode.NONE
+ override def start(): Unit = {}
+ override def stop(): Unit = {}
+ override def submitTasks(taskSet: TaskSet): Unit = {}
+ override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = {}
+ override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
+ override def defaultParallelism(): Int = 2
+ override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
+ override def applicationAttemptId(): Option[String] = None
+ def executorHeartbeatReceived(
+ execId: String,
+ accumUpdates: Array[(Long, Seq[AccumulableInfo])],
+ blockManagerId: BlockManagerId): Boolean = true
+}
diff --git a/dev/.rat-excludes b/dev/.rat-excludes
index 8b5061415f..a409a3cb36 100644
--- a/dev/.rat-excludes
+++ b/dev/.rat-excludes
@@ -98,3 +98,4 @@ LZ4BlockInputStream.java
spark-deps-.*
.*csv
.*tsv
+org.apache.spark.scheduler.ExternalClusterManager