From 081a0b6861321d262a82166bc1df61959e9c6387 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Thu, 28 Nov 2013 20:39:10 -0800 Subject: Add unit test for SparkContext scheduler creation Since YARN and Mesos are not necessarily available in the system, they are allowed to pass as long as the YARN/Mesos code paths are exercised. --- .../main/scala/org/apache/spark/SparkContext.scala | 234 +++++++++++---------- .../spark/scheduler/local/LocalScheduler.scala | 2 +- .../spark/SparkContextSchedulerCreationSuite.scala | 135 ++++++++++++ 3 files changed, 255 insertions(+), 116 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index cf1fd497f0..1eb00e79e1 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -153,121 +153,7 @@ class SparkContext( executorEnvs("SPARK_USER") = sparkUser // Create and start the scheduler - private[spark] var taskScheduler: TaskScheduler = { - // Regular expression used for local[N] master format - val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r - // Regular expression for local[N, maxRetries], used in tests with failing tasks - val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r - // Regular expression for simulating a Spark cluster of [N, cores, memory] locally - val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r - // Regular expression for connecting to Spark deploy clusters - val SPARK_REGEX = """spark://(.*)""".r - // Regular expression for connection to Mesos cluster by mesos:// or zk:// url - val MESOS_REGEX = """(mesos|zk)://.*""".r - // Regular expression for connection to Simr cluster - val SIMR_REGEX = """simr://(.*)""".r - - master match { - case "local" => - new LocalScheduler(1, 0, this) - - case LOCAL_N_REGEX(threads) => - new LocalScheduler(threads.toInt, 0, this) - - case LOCAL_N_FAILURES_REGEX(threads, maxFailures) => - new LocalScheduler(threads.toInt, maxFailures.toInt, this) - - case SPARK_REGEX(sparkUrl) => - val scheduler = new ClusterScheduler(this) - val masterUrls = sparkUrl.split(",").map("spark://" + _) - val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName) - scheduler.initialize(backend) - scheduler - - case SIMR_REGEX(simrUrl) => - val scheduler = new ClusterScheduler(this) - val backend = new SimrSchedulerBackend(scheduler, this, simrUrl) - scheduler.initialize(backend) - scheduler - - case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) => - // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang. - val memoryPerSlaveInt = memoryPerSlave.toInt - if (SparkContext.executorMemoryRequested > memoryPerSlaveInt) { - throw new SparkException( - "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format( - memoryPerSlaveInt, SparkContext.executorMemoryRequested)) - } - - val scheduler = new ClusterScheduler(this) - val localCluster = new LocalSparkCluster( - numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt) - val masterUrls = localCluster.start() - val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName) - scheduler.initialize(backend) - backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => { - localCluster.stop() - } - scheduler - - case "yarn-standalone" => - val scheduler = try { - val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler") - val cons = clazz.getConstructor(classOf[SparkContext]) - cons.newInstance(this).asInstanceOf[ClusterScheduler] - } catch { - // TODO: Enumerate the exact reasons why it can fail - // But irrespective of it, it means we cannot proceed ! - case th: Throwable => { - throw new SparkException("YARN mode not available ?", th) - } - } - val backend = new CoarseGrainedSchedulerBackend(scheduler, this.env.actorSystem) - scheduler.initialize(backend) - scheduler - - case "yarn-client" => - val scheduler = try { - val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler") - val cons = clazz.getConstructor(classOf[SparkContext]) - cons.newInstance(this).asInstanceOf[ClusterScheduler] - - } catch { - case th: Throwable => { - throw new SparkException("YARN mode not available ?", th) - } - } - - val backend = try { - val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend") - val cons = clazz.getConstructor(classOf[ClusterScheduler], classOf[SparkContext]) - cons.newInstance(scheduler, this).asInstanceOf[CoarseGrainedSchedulerBackend] - } catch { - case th: Throwable => { - throw new SparkException("YARN mode not available ?", th) - } - } - - scheduler.initialize(backend) - scheduler - - case mesosUrl @ MESOS_REGEX(_) => - MesosNativeLibrary.load() - val scheduler = new ClusterScheduler(this) - val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean - val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs - val backend = if (coarseGrained) { - new CoarseMesosSchedulerBackend(scheduler, this, url, appName) - } else { - new MesosSchedulerBackend(scheduler, this, url, appName) - } - scheduler.initialize(backend) - scheduler - - case _ => - throw new SparkException("Could not parse Master URL: '" + master + "'") - } - } + private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master, appName) taskScheduler.start() @volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler) @@ -1137,6 +1023,124 @@ object SparkContext { .map(Utils.memoryStringToMb) .getOrElse(512) } + + // Creates a task scheduler based on a given master URL. Extracted for testing. + private + def createTaskScheduler(sc: SparkContext, master: String, appName: String): TaskScheduler = { + // Regular expression used for local[N] master format + val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r + // Regular expression for local[N, maxRetries], used in tests with failing tasks + val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r + // Regular expression for simulating a Spark cluster of [N, cores, memory] locally + val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r + // Regular expression for connecting to Spark deploy clusters + val SPARK_REGEX = """spark://(.*)""".r + // Regular expression for connection to Mesos cluster by mesos:// or zk:// url + val MESOS_REGEX = """(mesos|zk)://.*""".r + // Regular expression for connection to Simr cluster + val SIMR_REGEX = """simr://(.*)""".r + + master match { + case "local" => + new LocalScheduler(1, 0, sc) + + case LOCAL_N_REGEX(threads) => + new LocalScheduler(threads.toInt, 0, sc) + + case LOCAL_N_FAILURES_REGEX(threads, maxFailures) => + new LocalScheduler(threads.toInt, maxFailures.toInt, sc) + + case SPARK_REGEX(sparkUrl) => + val scheduler = new ClusterScheduler(sc) + val masterUrls = sparkUrl.split(",").map("spark://" + _) + val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, appName) + scheduler.initialize(backend) + scheduler + + case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) => + // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang. + val memoryPerSlaveInt = memoryPerSlave.toInt + if (SparkContext.executorMemoryRequested > memoryPerSlaveInt) { + throw new SparkException( + "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format( + memoryPerSlaveInt, SparkContext.executorMemoryRequested)) + } + + val scheduler = new ClusterScheduler(sc) + val localCluster = new LocalSparkCluster( + numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt) + val masterUrls = localCluster.start() + val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, appName) + scheduler.initialize(backend) + backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => { + localCluster.stop() + } + scheduler + + case "yarn-standalone" => + val scheduler = try { + val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler") + val cons = clazz.getConstructor(classOf[SparkContext]) + cons.newInstance(sc).asInstanceOf[ClusterScheduler] + } catch { + // TODO: Enumerate the exact reasons why it can fail + // But irrespective of it, it means we cannot proceed ! + case th: Throwable => { + throw new SparkException("YARN mode not available ?", th) + } + } + val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) + scheduler.initialize(backend) + scheduler + + case "yarn-client" => + val scheduler = try { + val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler") + val cons = clazz.getConstructor(classOf[SparkContext]) + cons.newInstance(sc).asInstanceOf[ClusterScheduler] + + } catch { + case th: Throwable => { + throw new SparkException("YARN mode not available ?", th) + } + } + + val backend = try { + val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend") + val cons = clazz.getConstructor(classOf[ClusterScheduler], classOf[SparkContext]) + cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend] + } catch { + case th: Throwable => { + throw new SparkException("YARN mode not available ?", th) + } + } + + scheduler.initialize(backend) + scheduler + + case mesosUrl @ MESOS_REGEX(_) => + MesosNativeLibrary.load() + val scheduler = new ClusterScheduler(sc) + val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean + val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs + val backend = if (coarseGrained) { + new CoarseMesosSchedulerBackend(scheduler, sc, url, appName) + } else { + new MesosSchedulerBackend(scheduler, sc, url, appName) + } + scheduler.initialize(backend) + scheduler + + case SIMR_REGEX(simrUrl) => + val scheduler = new ClusterScheduler(sc) + val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl) + scheduler.initialize(backend) + scheduler + + case _ => + throw new SparkException("Could not parse Master URL: '" + master + "'") + } + } } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala index 2699f0b33e..5af51164f7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala @@ -74,7 +74,7 @@ class LocalActor(localScheduler: LocalScheduler, private var freeCores: Int) } } -private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: SparkContext) +private[spark] class LocalScheduler(val threads: Int, val maxFailures: Int, val sc: SparkContext) extends TaskScheduler with ExecutorBackend with Logging { diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala new file mode 100644 index 0000000000..61d6163659 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import org.scalatest.{FunSuite, PrivateMethodTester} + +import org.apache.spark.scheduler.TaskScheduler +import org.apache.spark.scheduler.cluster.{ClusterScheduler, SimrSchedulerBackend, SparkDeploySchedulerBackend} +import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} +import org.apache.spark.scheduler.local.LocalScheduler + +class SparkContextSchedulerCreationSuite + extends FunSuite with PrivateMethodTester with LocalSparkContext with Logging { + + def createTaskScheduler(master: String): TaskScheduler = { + // Create local SparkContext to setup a SparkEnv. We don't actually want to start() the + // real schedulers, so we don't want to create a full SparkContext with the desired scheduler. + sc = new SparkContext("local", "test") + val createTaskSchedulerMethod = PrivateMethod[TaskScheduler]('createTaskScheduler) + SparkContext invokePrivate createTaskSchedulerMethod(sc, master, "test") + } + + test("bad-master") { + val e = intercept[SparkException] { + createTaskScheduler("localhost:1234") + } + assert(e.getMessage.contains("Could not parse Master URL")) + } + + test("local") { + createTaskScheduler("local") match { + case s: LocalScheduler => + assert(s.threads === 1) + assert(s.maxFailures === 0) + case _ => fail() + } + } + + test("local-n") { + createTaskScheduler("local[5]") match { + case s: LocalScheduler => + assert(s.threads === 5) + assert(s.maxFailures === 0) + case _ => fail() + } + } + + test("local-n-failures") { + createTaskScheduler("local[4, 2]") match { + case s: LocalScheduler => + assert(s.threads === 4) + assert(s.maxFailures === 2) + case _ => fail() + } + } + + test("simr") { + createTaskScheduler("simr://uri") match { + case s: ClusterScheduler => + assert(s.backend.isInstanceOf[SimrSchedulerBackend]) + case _ => fail() + } + } + + test("local-cluster") { + createTaskScheduler("local-cluster[3, 14, 512]") match { + case s: ClusterScheduler => + assert(s.backend.isInstanceOf[SparkDeploySchedulerBackend]) + case _ => fail() + } + } + + def testYarn(master: String, expectedClassName: String) { + try { + createTaskScheduler(master) match { + case s: ClusterScheduler => + assert(s.getClass === Class.forName(expectedClassName)) + case _ => fail() + } + } catch { + case e: SparkException => + assert(e.getMessage.contains("YARN mode not available")) + logWarning("YARN not available, could not test actual YARN scheduler creation") + case e: Throwable => fail(e) + } + } + test("yarn-standalone") { + testYarn("yarn-standalone", "org.apache.spark.scheduler.cluster.YarnClusterScheduler") + } + test("yarn-client") { + testYarn("yarn-client", "org.apache.spark.scheduler.cluster.YarnClientClusterScheduler") + } + + def testMesos(master: String, expectedClass: Class[_]) { + try { + createTaskScheduler(master) match { + case s: ClusterScheduler => + assert(s.backend.getClass === expectedClass) + case _ => fail() + } + } catch { + case e: UnsatisfiedLinkError => + assert(e.getMessage.contains("no mesos in")) + logWarning("Mesos not available, could not test actual Mesos scheduler creation") + case e: Throwable => fail(e) + } + } + test("mesos fine-grained") { + System.setProperty("spark.mesos.coarse", "false") + testMesos("mesos://localhost:1234", classOf[MesosSchedulerBackend]) + } + test("mesos coarse-grained") { + System.setProperty("spark.mesos.coarse", "true") + testMesos("mesos://localhost:1234", classOf[CoarseMesosSchedulerBackend]) + } + test("mesos with zookeeper") { + System.setProperty("spark.mesos.coarse", "false") + testMesos("zk://localhost:1234,localhost:2345", classOf[MesosSchedulerBackend]) + } +} -- cgit v1.2.3