4 files changed, 58 insertions, 67 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e7eabd2896..f322a770bf 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2389,53 +2389,6 @@ object SparkContext extends Logging {
         }
         (backend, scheduler)
 
-      case "yarn" if deployMode == "cluster" =>
-        val scheduler = try {
-          val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
-          val cons = clazz.getConstructor(classOf[SparkContext])
-          cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
-        } catch {
-          // TODO: Enumerate the exact reasons why it can fail
-          // But irrespective of it, it means we cannot proceed !
-          case e: Exception =>
-            throw new SparkException("YARN mode not available ?", e)
-        }
-        val backend = try {
-          val clazz =
-            Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
-          val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
-          cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
-        } catch {
-          case e: Exception =>
-            throw new SparkException("YARN mode not available ?", e)
-        }
-        scheduler.initialize(backend)
-        (backend, scheduler)
-
-      case "yarn" if deployMode == "client" =>
-        val scheduler = try {
-          val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler")
-          val cons = clazz.getConstructor(classOf[SparkContext])
-          cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
-
-        } catch {
-          case e: Exception =>
-            throw new SparkException("YARN mode not available ?", e)
-        }
-
-        val backend = try {
-          val clazz =
-            Utils.classForName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
-          val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
-          cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
-        } catch {
-          case e: Exception =>
-            throw new SparkException("YARN mode not available ?", e)
-        }
-
-        scheduler.initialize(backend)
-        (backend, scheduler)
-
       case MESOS_REGEX(mesosUrl) =>
         MesosNativeLibrary.load()
         val scheduler = new TaskSchedulerImpl(sc)
@@ -2464,6 +2417,7 @@ object SparkContext extends Logging {
           cm.initialize(scheduler, backend)
           (backend, scheduler)
         } catch {
+          case se: SparkException => throw se
          case NonFatal(e) =>
            throw new SparkException("External scheduler cannot be instantiated", e)
         }
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index 49c2bf6bca..213d70f4e5 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -129,26 +129,6 @@ class SparkContextSchedulerCreationSuite
     }
   }
 
-  def testYarn(master: String, deployMode: String, expectedClassName: String) {
-    try {
-      val sched = createTaskScheduler(master, deployMode)
-      assert(sched.getClass === Utils.classForName(expectedClassName))
-    } catch {
-      case e: SparkException =>
-        assert(e.getMessage.contains("YARN mode not available"))
-        logWarning("YARN not available, could not test actual YARN scheduler creation")
-      case e: Throwable => fail(e)
-    }
-  }
-
-  test("yarn-cluster") {
-    testYarn("yarn", "cluster", "org.apache.spark.scheduler.cluster.YarnClusterScheduler")
-  }
-
-  test("yarn-client") {
-    testYarn("yarn", "client", "org.apache.spark.scheduler.cluster.YarnScheduler")
-  }
-
   def testMesos(master: String, expectedClass: Class[_], coarse: Boolean) {
     val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString)
     try {
diff --git a/yarn/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager b/yarn/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
new file mode 100644
index 0000000000..6e8a1ebfc6
--- /dev/null
+++ b/yarn/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
@@ -0,0 +1 @@
+org.apache.spark.scheduler.cluster.YarnClusterManager
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala
new file mode 100644
index 0000000000..64cd1bd088
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
+
+/**
+ * Cluster Manager for creation of Yarn scheduler and backend
+ */
+private[spark] class YarnClusterManager extends ExternalClusterManager {
+
+  override def canCreate(masterURL: String): Boolean = {
+    masterURL == "yarn"
+  }
+
+  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
+    sc.deployMode match {
+      case "cluster" => new YarnClusterScheduler(sc)
+      case "client" => new YarnScheduler(sc)
+      case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
+    }
+  }
+
+  override def createSchedulerBackend(sc: SparkContext,
+      masterURL: String,
+      scheduler: TaskScheduler): SchedulerBackend = {
+    sc.deployMode match {
+      case "cluster" =>
+        new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
+      case "client" =>
+        new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
+      case _ =>
+        throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
+    }
+  }
+
+  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
+    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
+  }
+}