diff options
-rwxr-xr-x | bin/spark-class | 16 | ||||
-rw-r--r-- | core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java (renamed from launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java) | 0 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala | 10 | ||||
-rw-r--r-- | launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java | 28 | ||||
-rw-r--r-- | pom.xml | 8 | ||||
-rw-r--r-- | project/SparkBuild.scala | 2 | ||||
-rw-r--r-- | yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala | 37 | ||||
-rw-r--r-- | yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala | 20 | ||||
-rw-r--r-- | yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala | 2 | ||||
-rw-r--r-- | yarn/src/test/scala/org/apache/spark/launcher/TestClasspathBuilder.scala | 36 |
10 files changed, 122 insertions, 37 deletions
diff --git a/bin/spark-class b/bin/spark-class index 2b59e5df57..e38e08dec4 100755 --- a/bin/spark-class +++ b/bin/spark-class @@ -43,17 +43,19 @@ else fi num_jars="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" | wc -l)" -if [ "$num_jars" -eq "0" -a -z "$SPARK_ASSEMBLY_JAR" ]; then +if [ "$num_jars" -eq "0" -a -z "$SPARK_ASSEMBLY_JAR" -a "$SPARK_PREPEND_CLASSES" != "1" ]; then echo "Failed to find Spark assembly in $ASSEMBLY_DIR." 1>&2 echo "You need to build Spark before running this program." 1>&2 exit 1 fi -ASSEMBLY_JARS="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" || true)" -if [ "$num_jars" -gt "1" ]; then - echo "Found multiple Spark assembly jars in $ASSEMBLY_DIR:" 1>&2 - echo "$ASSEMBLY_JARS" 1>&2 - echo "Please remove all but one jar." 1>&2 - exit 1 +if [ -d "$ASSEMBLY_DIR" ]; then + ASSEMBLY_JARS="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" || true)" + if [ "$num_jars" -gt "1" ]; then + echo "Found multiple Spark assembly jars in $ASSEMBLY_DIR:" 1>&2 + echo "$ASSEMBLY_JARS" 1>&2 + echo "Please remove all but one jar." 1>&2 + exit 1 + fi fi SPARK_ASSEMBLY_JAR="${ASSEMBLY_DIR}/${ASSEMBLY_JARS}" diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java index d0c26dd056..d0c26dd056 100644 --- a/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java +++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala index 48e74f06f7..fb7a8ae3f9 100644 --- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala +++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala @@ -310,8 +310,14 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext { val _sc = new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test", broadcastConf) // Wait until all salves are up - _sc.jobProgressListener.waitUntilExecutorsUp(numSlaves, 10000) - _sc + try { + _sc.jobProgressListener.waitUntilExecutorsUp(numSlaves, 10000) + _sc + } catch { + case e: Throwable => + _sc.stop() + throw e + } } else { new SparkContext("local", "test", broadcastConf) } diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java index 5e793a5c48..0a237ee73b 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java @@ -169,9 +169,11 @@ abstract class AbstractCommandBuilder { "streaming", "tools", "sql/catalyst", "sql/core", "sql/hive", "sql/hive-thriftserver", "yarn", "launcher"); if (prependClasses) { - System.err.println( - "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes ahead of " + - "assembly."); + if (!isTesting) { + System.err.println( + "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes ahead of " + + "assembly."); + } for (String project : projects) { addToClassPath(cp, String.format("%s/%s/target/scala-%s/classes", sparkHome, project, scala)); @@ -200,7 +202,7 @@ abstract class AbstractCommandBuilder { // For the user code case, we fall back to looking for the Spark assembly under SPARK_HOME. // That duplicates some of the code in the shell scripts that look for the assembly, though. String assembly = getenv(ENV_SPARK_ASSEMBLY); - if (assembly == null && isEmpty(getenv("SPARK_TESTING"))) { + if (assembly == null && !isTesting) { assembly = findAssembly(); } addToClassPath(cp, assembly); @@ -215,12 +217,14 @@ abstract class AbstractCommandBuilder { libdir = new File(sparkHome, "lib_managed/jars"); } - checkState(libdir.isDirectory(), "Library directory '%s' does not exist.", - libdir.getAbsolutePath()); - for (File jar : libdir.listFiles()) { - if (jar.getName().startsWith("datanucleus-")) { - addToClassPath(cp, jar.getAbsolutePath()); + if (libdir.isDirectory()) { + for (File jar : libdir.listFiles()) { + if (jar.getName().startsWith("datanucleus-")) { + addToClassPath(cp, jar.getAbsolutePath()); + } } + } else { + checkState(isTesting, "Library directory '%s' does not exist.", libdir.getAbsolutePath()); } addToClassPath(cp, getenv("HADOOP_CONF_DIR")); @@ -256,15 +260,15 @@ abstract class AbstractCommandBuilder { return scala; } String sparkHome = getSparkHome(); - File scala210 = new File(sparkHome, "assembly/target/scala-2.10"); - File scala211 = new File(sparkHome, "assembly/target/scala-2.11"); + File scala210 = new File(sparkHome, "launcher/target/scala-2.10"); + File scala211 = new File(sparkHome, "launcher/target/scala-2.11"); checkState(!scala210.isDirectory() || !scala211.isDirectory(), "Presence of build for both scala versions (2.10 and 2.11) detected.\n" + "Either clean one of them or set SPARK_SCALA_VERSION in your environment."); if (scala210.isDirectory()) { return "2.10"; } else { - checkState(scala211.isDirectory(), "Cannot find any assembly build directories."); + checkState(scala211.isDirectory(), "Cannot find any build directories."); return "2.11"; } } @@ -1422,6 +1422,10 @@ <artifactId>libthrift</artifactId> </exclusion> <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + <exclusion> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </exclusion> @@ -1892,6 +1896,8 @@ launched by the tests have access to the correct test-time classpath. --> <SPARK_DIST_CLASSPATH>${test_classpath}</SPARK_DIST_CLASSPATH> + <SPARK_PREPEND_CLASSES>1</SPARK_PREPEND_CLASSES> + <SPARK_TESTING>1</SPARK_TESTING> <JAVA_HOME>${test.java.home}</JAVA_HOME> </environmentVariables> <systemProperties> @@ -1929,6 +1935,8 @@ launched by the tests have access to the correct test-time classpath. --> <SPARK_DIST_CLASSPATH>${test_classpath}</SPARK_DIST_CLASSPATH> + <SPARK_PREPEND_CLASSES>1</SPARK_PREPEND_CLASSES> + <SPARK_TESTING>1</SPARK_TESTING> <JAVA_HOME>${test.java.home}</JAVA_HOME> </environmentVariables> <systemProperties> diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index ea52bfd679..901cfa538d 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -547,6 +547,8 @@ object TestSettings { envVars in Test ++= Map( "SPARK_DIST_CLASSPATH" -> (fullClasspath in Test).value.files.map(_.getAbsolutePath).mkString(":").stripSuffix(":"), + "SPARK_PREPEND_CLASSES" -> "1", + "SPARK_TESTING" -> "1", "JAVA_HOME" -> sys.env.get("JAVA_HOME").getOrElse(sys.props("java.home"))), javaOptions in Test += s"-Djava.io.tmpdir=$testTempDir", javaOptions in Test += "-Dspark.test.home=" + sparkHome, diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala index b4f8049bff..17c59ff06e 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.server.MiniYARNCluster import org.scalatest.{BeforeAndAfterAll, Matchers} import org.apache.spark._ +import org.apache.spark.launcher.TestClasspathBuilder import org.apache.spark.util.Utils abstract class BaseYarnClusterSuite @@ -43,6 +44,9 @@ abstract class BaseYarnClusterSuite |log4j.appender.console.target=System.err |log4j.appender.console.layout=org.apache.log4j.PatternLayout |log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n + |log4j.logger.org.apache.hadoop=WARN + |log4j.logger.org.eclipse.jetty=WARN + |log4j.logger.org.spark-project.jetty=WARN """.stripMargin private var yarnCluster: MiniYARNCluster = _ @@ -51,8 +55,7 @@ abstract class BaseYarnClusterSuite private var hadoopConfDir: File = _ private var logConfDir: File = _ - - def yarnConfig: YarnConfiguration + def newYarnConfig(): YarnConfiguration override def beforeAll() { super.beforeAll() @@ -65,8 +68,14 @@ abstract class BaseYarnClusterSuite val logConfFile = new File(logConfDir, "log4j.properties") Files.write(LOG4J_CONF, logConfFile, UTF_8) + // Disable the disk utilization check to avoid the test hanging when people's disks are + // getting full. + val yarnConf = newYarnConfig() + yarnConf.set("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage", + "100.0") + yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1) - yarnCluster.init(yarnConfig) + yarnCluster.init(yarnConf) yarnCluster.start() // There's a race in MiniYARNCluster in which start() may return before the RM has updated @@ -114,19 +123,23 @@ abstract class BaseYarnClusterSuite sparkArgs: Seq[String] = Nil, extraClassPath: Seq[String] = Nil, extraJars: Seq[String] = Nil, - extraConf: Map[String, String] = Map()): Unit = { + extraConf: Map[String, String] = Map(), + extraEnv: Map[String, String] = Map()): Unit = { val master = if (clientMode) "yarn-client" else "yarn-cluster" val props = new Properties() props.setProperty("spark.yarn.jar", "local:" + fakeSparkJar.getAbsolutePath()) - val childClasspath = logConfDir.getAbsolutePath() + - File.pathSeparator + - sys.props("java.class.path") + - File.pathSeparator + - extraClassPath.mkString(File.pathSeparator) - props.setProperty("spark.driver.extraClassPath", childClasspath) - props.setProperty("spark.executor.extraClassPath", childClasspath) + val testClasspath = new TestClasspathBuilder() + .buildClassPath( + logConfDir.getAbsolutePath() + + File.pathSeparator + + extraClassPath.mkString(File.pathSeparator)) + .asScala + .mkString(File.pathSeparator) + + props.setProperty("spark.driver.extraClassPath", testClasspath) + props.setProperty("spark.executor.extraClassPath", testClasspath) // SPARK-4267: make sure java options are propagated correctly. props.setProperty("spark.driver.extraJavaOptions", "-Dfoo=\"one two three\"") @@ -168,7 +181,7 @@ abstract class BaseYarnClusterSuite appArgs Utils.executeAndGetOutput(argv, - extraEnvironment = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath())) + extraEnvironment = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()) ++ extraEnv) } /** diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 5a4ea2ea2f..b5a42fd6af 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -28,7 +28,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.scalatest.Matchers import org.apache.spark._ -import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart, SparkListenerExecutorAdded} +import org.apache.spark.launcher.TestClasspathBuilder +import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart, + SparkListenerExecutorAdded} import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.util.Utils @@ -39,7 +41,7 @@ import org.apache.spark.util.Utils */ class YarnClusterSuite extends BaseYarnClusterSuite { - override def yarnConfig: YarnConfiguration = new YarnConfiguration() + override def newYarnConfig(): YarnConfiguration = new YarnConfiguration() private val TEST_PYFILE = """ |import mod1, mod2 @@ -111,6 +113,17 @@ class YarnClusterSuite extends BaseYarnClusterSuite { val primaryPyFile = new File(tempDir, "test.py") Files.write(TEST_PYFILE, primaryPyFile, UTF_8) + // When running tests, let's not assume the user has built the assembly module, which also + // creates the pyspark archive. Instead, let's use PYSPARK_ARCHIVES_PATH to point at the + // needed locations. + val sparkHome = sys.props("spark.test.home"); + val pythonPath = Seq( + s"$sparkHome/python/lib/py4j-0.8.2.1-src.zip", + s"$sparkHome/python") + val extraEnv = Map( + "PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator), + "PYTHONPATH" -> pythonPath.mkString(File.pathSeparator)) + val moduleDir = if (clientMode) { // In client-mode, .py files added with --py-files are not visible in the driver. @@ -130,7 +143,8 @@ class YarnClusterSuite extends BaseYarnClusterSuite { runSpark(clientMode, primaryPyFile.getAbsolutePath(), sparkArgs = Seq("--py-files", pyFiles), - appArgs = Seq(result.getAbsolutePath())) + appArgs = Seq(result.getAbsolutePath()), + extraEnv = extraEnv) checkResult(result) } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala index 5e8238822b..8d9c9b3004 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala @@ -34,7 +34,7 @@ import org.apache.spark.network.yarn.{YarnShuffleService, YarnTestAccessor} */ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite { - override def yarnConfig: YarnConfiguration = { + override def newYarnConfig(): YarnConfiguration = { val yarnConfig = new YarnConfiguration() yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle") yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"), diff --git a/yarn/src/test/scala/org/apache/spark/launcher/TestClasspathBuilder.scala b/yarn/src/test/scala/org/apache/spark/launcher/TestClasspathBuilder.scala new file mode 100644 index 0000000000..da9e8e21a2 --- /dev/null +++ b/yarn/src/test/scala/org/apache/spark/launcher/TestClasspathBuilder.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.launcher + +import java.util.{List => JList, Map => JMap} + +/** + * Exposes AbstractCommandBuilder to the YARN tests, so that they can build classpaths the same + * way other cluster managers do. + */ +private[spark] class TestClasspathBuilder extends AbstractCommandBuilder { + + childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME, sys.props("spark.test.home")) + + override def buildClassPath(extraCp: String): JList[String] = super.buildClassPath(extraCp) + + /** Not used by the YARN tests. */ + override def buildCommand(env: JMap[String, String]): JList[String] = + throw new UnsupportedOperationException() + +} |