7 files changed, 193 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e41088f7c8..e7eabd2896 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -20,7 +20,7 @@ package org.apache.spark
 import java.io._
 import java.lang.reflect.Constructor
 import java.net.URI
-import java.util.{Arrays, Properties, UUID}
+import java.util.{Arrays, Properties, ServiceLoader, UUID}
 import java.util.concurrent.ConcurrentMap
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
 
@@ -2453,9 +2453,32 @@ object SparkContext extends Logging {
           "in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.")
         createTaskScheduler(sc, "mesos://" + zkUrl, deployMode)
 
-      case _ =>
-        throw new SparkException("Could not parse Master URL: '" + master + "'")
+      case masterUrl =>
+        val cm = getClusterManager(masterUrl) match {
+          case Some(clusterMgr) => clusterMgr
+          case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
+        }
+        try {
+          val scheduler = cm.createTaskScheduler(sc, masterUrl)
+          val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
+          cm.initialize(scheduler, backend)
+          (backend, scheduler)
+        } catch {
+          case NonFatal(e) =>
+            throw new SparkException("External scheduler cannot be instantiated", e)
+        }
     }
   }
+
+  private def getClusterManager(url: String): Option[ExternalClusterManager] = {
+    val loader = Utils.getContextOrSparkClassLoader
+    val serviceLoaders =
+      ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
+    if (serviceLoaders.size > 1) {
+      throw new SparkException(s"Multiple Cluster Managers ($serviceLoaders) registered " +
+          s"for the url $url:")
+    }
+    serviceLoaders.headOption
+  }
 }
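
The user-visible effect of the new fall-through case: a master URL that none of the built-in schemes recognize is now offered to any registered ExternalClusterManager before Spark gives up. A minimal sketch, assuming a hypothetical plugin on the classpath whose canCreate accepts a made-up "mycluster://" scheme:

    import org.apache.spark.{SparkConf, SparkContext}

    object PluginDemo {
      def main(args: Array[String]): Unit = {
        // Previously any unrecognized scheme threw "Could not parse Master URL";
        // with a matching plugin registered via META-INF/services, SparkContext
        // now gets its TaskScheduler and SchedulerBackend from that plugin.
        val conf = new SparkConf()
          .setMaster("mycluster://host:1234") // hypothetical plugin scheme
          .setAppName("plugin-demo")
        val sc = new SparkContext(conf)
        sc.stop()
      }
    }
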
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 71b4ad160d..db5b774806 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -64,7 +64,7 @@ private[spark] class CoarseGrainedExecutorBackend(
         // Always receive `true`. Just ignore it
       case Failure(e) =>
         logError(s"Cannot register with driver: $driverUrl", e)
-        System.exit(1)
+        exitExecutor()
     }(ThreadUtils.sameThread)
   }
 
@@ -81,12 +81,12 @@ private[spark] class CoarseGrainedExecutorBackend(
     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)
-      System.exit(1)
+      exitExecutor()
 
     case LaunchTask(data) =>
       if (executor == null) {
         logError("Received LaunchTask command but executor was null")
-        System.exit(1)
+        exitExecutor()
       } else {
         val taskDesc = ser.deserialize[TaskDescription](data.value)
         logInfo("Got assigned task " + taskDesc.taskId)
@@ -97,7 +97,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     case KillTask(taskId, _, interruptThread) =>
       if (executor == null) {
         logError("Received KillTask command but executor was null")
-        System.exit(1)
+        exitExecutor()
       } else {
         executor.killTask(taskId, interruptThread)
       }
@@ -127,7 +127,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       logInfo(s"Driver from $remoteAddress disconnected during shutdown")
     } else if (driver.exists(_.address == remoteAddress)) {
       logError(s"Driver $remoteAddress disassociated! Shutting down.")
-      System.exit(1)
+      exitExecutor()
     } else {
       logWarning(s"An unknown ($remoteAddress) driver disconnected.")
     }
@@ -140,6 +140,13 @@ private[spark] class CoarseGrainedExecutorBackend(
       case None => logWarning(s"Drop $msg because has not yet connected to driver")
     }
   }
+
+  /**
+   * This function can be overridden by child classes to handle
+   * executor exits differently. For example, when an executor goes down,
+   * the back-end may not want to take the parent process down.
+   */
+  protected def exitExecutor(): Unit = System.exit(1)
 }
 
 private[spark] object CoarseGrainedExecutorBackend extends Logging {
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 9f94fdef24..b20bd11f7d 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -153,6 +153,21 @@ private[spark] class Executor(
     }
   }
 
+  /**
+   * Kills all tasks currently running in this executor. This can be called
+   * by executor back-ends to kill the tasks instead of taking the JVM down.
+   * @param interruptThread whether to interrupt the task thread
+   */
+  def killAllTasks(interruptThread: Boolean): Unit = {
+    // kill all the running tasks
+    for (taskRunner <- runningTasks.values().asScala) {
+      if (taskRunner != null) {
+        taskRunner.kill(interruptThread)
+      }
+    }
+  }
+
   def stop(): Unit = {
     env.metricsSystem.report()
     heartbeater.shutdown()
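
The new exitExecutor() hook and killAllTasks are designed to work together. A sketch of the alternative exit path they enable, under stated assumptions: SoftExit is a hypothetical helper, not part of this commit, and it lives under org.apache.spark.executor only because Executor is private[spark].

    package org.apache.spark.executor

    // Hypothetical helper: a back-end whose executor shares its JVM with other
    // services can route exitExecutor() here, killing the running tasks and
    // releasing back-end state instead of calling System.exit(1).
    private[spark] object SoftExit {
      def apply(executor: Executor): Unit = {
        if (executor != null) {
          executor.killAllTasks(interruptThread = true) // added by this commit
        }
        // tear down back-end local state here; deliberately no System.exit
      }
    }
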
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala b/core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala
new file mode 100644
index 0000000000..6ca1f569b9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * A cluster manager interface to plug in external schedulers.
+ */
+@DeveloperApi
+trait ExternalClusterManager {
+
+  /**
+   * Check if this cluster manager instance can create scheduler components
+   * for a certain master URL.
+   * @param masterURL the master URL
+   * @return true if the cluster manager can create scheduler components for the URL
+   */
+  def canCreate(masterURL: String): Boolean
+
+  /**
+   * Create a task scheduler instance for the given SparkContext.
+   * @param sc SparkContext
+   * @param masterURL the master URL
+   * @return TaskScheduler that will be responsible for task handling
+   */
+  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
+
+  /**
+   * Create a scheduler backend for the given SparkContext and scheduler. This is
+   * called after the task scheduler is created using
+   * [[ExternalClusterManager.createTaskScheduler()]].
+   * @param sc SparkContext
+   * @param masterURL the master URL
+   * @param scheduler TaskScheduler that will be used with the scheduler backend
+   * @return SchedulerBackend that works with a TaskScheduler
+   */
+  def createSchedulerBackend(sc: SparkContext,
+      masterURL: String,
+      scheduler: TaskScheduler): SchedulerBackend
+
+  /**
+   * Initialize the task scheduler and scheduler backend. This is called after the
+   * scheduler components are created.
+   * @param scheduler TaskScheduler that will be responsible for task handling
+   * @param backend SchedulerBackend that works with a TaskScheduler
+   */
+  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
+}
diff --git a/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager b/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
new file mode 100644
index 0000000000..3c570ffd8f
--- /dev/null
+++ b/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
@@ -0,0 +1 @@
+org.apache.spark.scheduler.DummyExternalClusterManager
\ No newline at end of file
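
To make the contract concrete, here is a minimal sketch of a third-party implementation (MyClusterManager and the "mycluster://" scheme are hypothetical). It reuses Spark's stock TaskSchedulerImpl, which is private[spark] and therefore reachable from this package; a real backend would drive resource offers from its cluster manager, and the test suite below shows a fully stubbed variant.

    package org.apache.spark.scheduler

    import org.apache.spark.SparkContext

    // Hypothetical plugin: claims "mycluster://..." master URLs and pairs
    // Spark's built-in TaskSchedulerImpl with a custom backend.
    private class MyClusterManager extends ExternalClusterManager {

      def canCreate(masterURL: String): Boolean = masterURL.startsWith("mycluster://")

      def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler =
        new TaskSchedulerImpl(sc)

      def createSchedulerBackend(sc: SparkContext,
          masterURL: String,
          scheduler: TaskScheduler): SchedulerBackend = new SchedulerBackend {
        def start(): Unit = {}        // connect to the external cluster manager here
        def stop(): Unit = {}         // release acquired resources
        def reviveOffers(): Unit = {} // a real backend hands resource offers to the scheduler
        def defaultParallelism(): Int = 2
      }

      // Wire the two components together once both exist.
      def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit =
        scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
    }
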
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
new file mode 100644
index 0000000000..9971d48a52
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+import org.apache.spark.storage.BlockManagerId
+
+class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext
+{
+  test("launch of backend and scheduler") {
+    val conf = new SparkConf().setMaster("myclusterManager").
+        setAppName("testcm").set("spark.driver.allowMultipleContexts", "true")
+    sc = new SparkContext(conf)
+    // check if the scheduler components are created
+    assert(sc.schedulerBackend.isInstanceOf[DummySchedulerBackend])
+    assert(sc.taskScheduler.isInstanceOf[DummyTaskScheduler])
+  }
+}
+
+private class DummyExternalClusterManager extends ExternalClusterManager {
+
+  def canCreate(masterURL: String): Boolean = masterURL == "myclusterManager"
+
+  def createTaskScheduler(sc: SparkContext,
+      masterURL: String): TaskScheduler = new DummyTaskScheduler
+
+  def createSchedulerBackend(sc: SparkContext,
+      masterURL: String,
+      scheduler: TaskScheduler): SchedulerBackend = new DummySchedulerBackend()
+
+  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {}
+
+}
+
+private class DummySchedulerBackend extends SchedulerBackend {
+  def start() {}
+  def stop() {}
+  def reviveOffers() {}
+  def defaultParallelism(): Int = 1
+}
+
+private class DummyTaskScheduler extends TaskScheduler {
+  override def rootPool: Pool = null
+  override def schedulingMode: SchedulingMode = SchedulingMode.NONE
+  override def start(): Unit = {}
+  override def stop(): Unit = {}
+  override def submitTasks(taskSet: TaskSet): Unit = {}
+  override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = {}
+  override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
+  override def defaultParallelism(): Int = 2
+  override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
+  override def applicationAttemptId(): Option[String] = None
+  def executorHeartbeatReceived(
+      execId: String,
+      accumUpdates: Array[(Long, Seq[AccumulableInfo])],
+      blockManagerId: BlockManagerId): Boolean = true
+}
diff --git a/dev/.rat-excludes b/dev/.rat-excludes
index 8b5061415f..a409a3cb36 100644
--- a/dev/.rat-excludes
+++ b/dev/.rat-excludes
@@ -98,3 +98,4 @@ LZ4BlockInputStream.java
 spark-deps-.*
 .*csv
 .*tsv
+org.apache.spark.scheduler.ExternalClusterManager
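
Finally, a quick classpath sanity check (illustrative only, not part of the commit): registration follows the standard java.util.ServiceLoader contract, so enumerating the providers shows which manager, if any, claims a given master URL. With the test resource above on the classpath, this prints the dummy manager.

    import java.util.ServiceLoader
    import scala.collection.JavaConverters._

    import org.apache.spark.scheduler.ExternalClusterManager

    object ListClusterManagers {
      def main(args: Array[String]): Unit = {
        // Same discovery path as SparkContext.getClusterManager: every
        // META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
        // file on the classpath contributes one provider.
        val managers = ServiceLoader.load(classOf[ExternalClusterManager]).asScala
        managers.foreach { m =>
          println(s"${m.getClass.getName} canCreate(myclusterManager) = " +
            m.canCreate("myclusterManager"))
        }
      }
    }
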