-rwxr-xr-x  bin/spark-class                                                                    16
-rw-r--r--  core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java (renamed from launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java)  0
-rw-r--r--  core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala                10
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java       28
-rw-r--r--  pom.xml                                                                             8
-rw-r--r--  project/SparkBuild.scala                                                            2
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala        37
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala            20
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala  2
-rw-r--r--  yarn/src/test/scala/org/apache/spark/launcher/TestClasspathBuilder.scala           36
10 files changed, 122 insertions, 37 deletions
diff --git a/bin/spark-class b/bin/spark-class
index 2b59e5df57..e38e08dec4 100755
--- a/bin/spark-class
+++ b/bin/spark-class
@@ -43,17 +43,19 @@ else
fi
num_jars="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" | wc -l)"
-if [ "$num_jars" -eq "0" -a -z "$SPARK_ASSEMBLY_JAR" ]; then
+if [ "$num_jars" -eq "0" -a -z "$SPARK_ASSEMBLY_JAR" -a "$SPARK_PREPEND_CLASSES" != "1" ]; then
echo "Failed to find Spark assembly in $ASSEMBLY_DIR." 1>&2
echo "You need to build Spark before running this program." 1>&2
exit 1
fi
-ASSEMBLY_JARS="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" || true)"
-if [ "$num_jars" -gt "1" ]; then
- echo "Found multiple Spark assembly jars in $ASSEMBLY_DIR:" 1>&2
- echo "$ASSEMBLY_JARS" 1>&2
- echo "Please remove all but one jar." 1>&2
- exit 1
+if [ -d "$ASSEMBLY_DIR" ]; then
+ ASSEMBLY_JARS="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" || true)"
+ if [ "$num_jars" -gt "1" ]; then
+ echo "Found multiple Spark assembly jars in $ASSEMBLY_DIR:" 1>&2
+ echo "$ASSEMBLY_JARS" 1>&2
+ echo "Please remove all but one jar." 1>&2
+ exit 1
+ fi
fi
SPARK_ASSEMBLY_JAR="${ASSEMBLY_DIR}/${ASSEMBLY_JARS}"
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index d0c26dd056..d0c26dd056 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index 48e74f06f7..fb7a8ae3f9 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -310,8 +310,14 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
val _sc =
new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test", broadcastConf)
// Wait until all slaves are up
- _sc.jobProgressListener.waitUntilExecutorsUp(numSlaves, 10000)
- _sc
+ try {
+ _sc.jobProgressListener.waitUntilExecutorsUp(numSlaves, 10000)
+ _sc
+ } catch {
+ case e: Throwable =>
+ _sc.stop()
+ throw e
+ }
} else {
new SparkContext("local", "test", broadcastConf)
}
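
The BroadcastSuite change above makes the local-cluster setup stop the SparkContext if the wait for executors fails, so a broken setup does not leak a running cluster into later tests. The same idea as a reusable helper is sketched below; this helper is illustrative only (the patch inlines the try/catch instead):

    import org.apache.spark.SparkContext

    // Illustrative helper (not part of this patch): run a setup step against a freshly
    // created SparkContext and stop the context before rethrowing if the setup fails.
    def initOrStop(sc: SparkContext)(setup: SparkContext => Unit): SparkContext = {
      try {
        setup(sc)
        sc
      } catch {
        case e: Throwable =>
          sc.stop()
          throw e
      }
    }
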
diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
index 5e793a5c48..0a237ee73b 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -169,9 +169,11 @@ abstract class AbstractCommandBuilder {
"streaming", "tools", "sql/catalyst", "sql/core", "sql/hive", "sql/hive-thriftserver",
"yarn", "launcher");
if (prependClasses) {
- System.err.println(
- "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes ahead of " +
- "assembly.");
+ if (!isTesting) {
+ System.err.println(
+ "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes ahead of " +
+ "assembly.");
+ }
for (String project : projects) {
addToClassPath(cp, String.format("%s/%s/target/scala-%s/classes", sparkHome, project,
scala));
@@ -200,7 +202,7 @@ abstract class AbstractCommandBuilder {
// For the user code case, we fall back to looking for the Spark assembly under SPARK_HOME.
// That duplicates some of the code in the shell scripts that look for the assembly, though.
String assembly = getenv(ENV_SPARK_ASSEMBLY);
- if (assembly == null && isEmpty(getenv("SPARK_TESTING"))) {
+ if (assembly == null && !isTesting) {
assembly = findAssembly();
}
addToClassPath(cp, assembly);
@@ -215,12 +217,14 @@ abstract class AbstractCommandBuilder {
libdir = new File(sparkHome, "lib_managed/jars");
}
- checkState(libdir.isDirectory(), "Library directory '%s' does not exist.",
- libdir.getAbsolutePath());
- for (File jar : libdir.listFiles()) {
- if (jar.getName().startsWith("datanucleus-")) {
- addToClassPath(cp, jar.getAbsolutePath());
+ if (libdir.isDirectory()) {
+ for (File jar : libdir.listFiles()) {
+ if (jar.getName().startsWith("datanucleus-")) {
+ addToClassPath(cp, jar.getAbsolutePath());
+ }
}
+ } else {
+ checkState(isTesting, "Library directory '%s' does not exist.", libdir.getAbsolutePath());
}
addToClassPath(cp, getenv("HADOOP_CONF_DIR"));
@@ -256,15 +260,15 @@ abstract class AbstractCommandBuilder {
return scala;
}
String sparkHome = getSparkHome();
- File scala210 = new File(sparkHome, "assembly/target/scala-2.10");
- File scala211 = new File(sparkHome, "assembly/target/scala-2.11");
+ File scala210 = new File(sparkHome, "launcher/target/scala-2.10");
+ File scala211 = new File(sparkHome, "launcher/target/scala-2.11");
checkState(!scala210.isDirectory() || !scala211.isDirectory(),
"Presence of build for both scala versions (2.10 and 2.11) detected.\n" +
"Either clean one of them or set SPARK_SCALA_VERSION in your environment.");
if (scala210.isDirectory()) {
return "2.10";
} else {
- checkState(scala211.isDirectory(), "Cannot find any assembly build directories.");
+ checkState(scala211.isDirectory(), "Cannot find any build directories.");
return "2.11";
}
}
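
The AbstractCommandBuilder changes above let the launcher build a usable classpath without an assembly jar when running under SPARK_PREPEND_CLASSES / SPARK_TESTING: per-module target/scala-&lt;version&gt;/classes directories are prepended, the datanucleus lib directory is only required outside of tests, and the Scala version is detected from launcher/target rather than assembly/target. A rough sketch of the prepended directories, assuming a source checkout (the module list is shortened for illustration):

    // Illustrative only: the shape of the per-module paths the launcher prepends.
    val sparkHome = sys.env.getOrElse("SPARK_HOME", ".")
    val scalaVersion = sys.env.getOrElse("SPARK_SCALA_VERSION", "2.10") // assumed default
    val prepended = Seq("core", "launcher", "yarn").map { module =>
      s"$sparkHome/$module/target/scala-$scalaVersion/classes"
    }
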
diff --git a/pom.xml b/pom.xml
index 0716016523..88ebceca76 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1422,6 +1422,10 @@
<artifactId>libthrift</artifactId>
</exclusion>
<exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
@@ -1892,6 +1896,8 @@
launched by the tests have access to the correct test-time classpath.
-->
<SPARK_DIST_CLASSPATH>${test_classpath}</SPARK_DIST_CLASSPATH>
+ <SPARK_PREPEND_CLASSES>1</SPARK_PREPEND_CLASSES>
+ <SPARK_TESTING>1</SPARK_TESTING>
<JAVA_HOME>${test.java.home}</JAVA_HOME>
</environmentVariables>
<systemProperties>
@@ -1929,6 +1935,8 @@
launched by the tests have access to the correct test-time classpath.
-->
<SPARK_DIST_CLASSPATH>${test_classpath}</SPARK_DIST_CLASSPATH>
+ <SPARK_PREPEND_CLASSES>1</SPARK_PREPEND_CLASSES>
+ <SPARK_TESTING>1</SPARK_TESTING>
<JAVA_HOME>${test.java.home}</JAVA_HOME>
</environmentVariables>
<systemProperties>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index ea52bfd679..901cfa538d 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -547,6 +547,8 @@ object TestSettings {
envVars in Test ++= Map(
"SPARK_DIST_CLASSPATH" ->
(fullClasspath in Test).value.files.map(_.getAbsolutePath).mkString(":").stripSuffix(":"),
+ "SPARK_PREPEND_CLASSES" -> "1",
+ "SPARK_TESTING" -> "1",
"JAVA_HOME" -> sys.env.get("JAVA_HOME").getOrElse(sys.props("java.home"))),
javaOptions in Test += s"-Djava.io.tmpdir=$testTempDir",
javaOptions in Test += "-Dspark.test.home=" + sparkHome,
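
Both the Maven test configuration and the sbt TestSettings now export SPARK_PREPEND_CLASSES and SPARK_TESTING to the forked test JVMs, so launcher code and child processes started by tests take the test-friendly code paths above. A minimal sketch of how code can observe these variables (an assumption for illustration; Spark's own helpers may check different properties):

    // Sketch: detect the test environment exported by the build configuration above.
    val isTesting = sys.env.contains("SPARK_TESTING")
    val prependClasses = sys.env.contains("SPARK_PREPEND_CLASSES")
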
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index b4f8049bff..17c59ff06e 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.server.MiniYARNCluster
import org.scalatest.{BeforeAndAfterAll, Matchers}
import org.apache.spark._
+import org.apache.spark.launcher.TestClasspathBuilder
import org.apache.spark.util.Utils
abstract class BaseYarnClusterSuite
@@ -43,6 +44,9 @@ abstract class BaseYarnClusterSuite
|log4j.appender.console.target=System.err
|log4j.appender.console.layout=org.apache.log4j.PatternLayout
|log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+ |log4j.logger.org.apache.hadoop=WARN
+ |log4j.logger.org.eclipse.jetty=WARN
+ |log4j.logger.org.spark-project.jetty=WARN
""".stripMargin
private var yarnCluster: MiniYARNCluster = _
@@ -51,8 +55,7 @@ abstract class BaseYarnClusterSuite
private var hadoopConfDir: File = _
private var logConfDir: File = _
-
- def yarnConfig: YarnConfiguration
+ def newYarnConfig(): YarnConfiguration
override def beforeAll() {
super.beforeAll()
@@ -65,8 +68,14 @@ abstract class BaseYarnClusterSuite
val logConfFile = new File(logConfDir, "log4j.properties")
Files.write(LOG4J_CONF, logConfFile, UTF_8)
+ // Disable the disk utilization check to avoid the test hanging when people's disks are
+ // getting full.
+ val yarnConf = newYarnConfig()
+ yarnConf.set("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
+ "100.0")
+
yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1)
- yarnCluster.init(yarnConfig)
+ yarnCluster.init(yarnConf)
yarnCluster.start()
// There's a race in MiniYARNCluster in which start() may return before the RM has updated
@@ -114,19 +123,23 @@ abstract class BaseYarnClusterSuite
sparkArgs: Seq[String] = Nil,
extraClassPath: Seq[String] = Nil,
extraJars: Seq[String] = Nil,
- extraConf: Map[String, String] = Map()): Unit = {
+ extraConf: Map[String, String] = Map(),
+ extraEnv: Map[String, String] = Map()): Unit = {
val master = if (clientMode) "yarn-client" else "yarn-cluster"
val props = new Properties()
props.setProperty("spark.yarn.jar", "local:" + fakeSparkJar.getAbsolutePath())
- val childClasspath = logConfDir.getAbsolutePath() +
- File.pathSeparator +
- sys.props("java.class.path") +
- File.pathSeparator +
- extraClassPath.mkString(File.pathSeparator)
- props.setProperty("spark.driver.extraClassPath", childClasspath)
- props.setProperty("spark.executor.extraClassPath", childClasspath)
+ val testClasspath = new TestClasspathBuilder()
+ .buildClassPath(
+ logConfDir.getAbsolutePath() +
+ File.pathSeparator +
+ extraClassPath.mkString(File.pathSeparator))
+ .asScala
+ .mkString(File.pathSeparator)
+
+ props.setProperty("spark.driver.extraClassPath", testClasspath)
+ props.setProperty("spark.executor.extraClassPath", testClasspath)
// SPARK-4267: make sure java options are propagated correctly.
props.setProperty("spark.driver.extraJavaOptions", "-Dfoo=\"one two three\"")
@@ -168,7 +181,7 @@ abstract class BaseYarnClusterSuite
appArgs
Utils.executeAndGetOutput(argv,
- extraEnvironment = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()))
+ extraEnvironment = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()) ++ extraEnv)
}
/**
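
runSpark now takes an extraEnv map that is merged into the child process environment alongside YARN_CONF_DIR. A hedged usage sketch based only on the call sites visible in this patch; parameter names other than appArgs and extraEnv, and the values themselves, are illustrative:

    // Hypothetical subclass call passing extra environment to the spark-submit child.
    runSpark(true, "/path/to/test.py",
      appArgs = Seq("/tmp/result.txt"),
      extraEnv = Map("PYSPARK_ARCHIVES_PATH" -> "local:/opt/spark/python"))
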
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 5a4ea2ea2f..b5a42fd6af 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -28,7 +28,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.scalatest.Matchers
import org.apache.spark._
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart, SparkListenerExecutorAdded}
+import org.apache.spark.launcher.TestClasspathBuilder
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart,
+ SparkListenerExecutorAdded}
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.util.Utils
@@ -39,7 +41,7 @@ import org.apache.spark.util.Utils
*/
class YarnClusterSuite extends BaseYarnClusterSuite {
- override def yarnConfig: YarnConfiguration = new YarnConfiguration()
+ override def newYarnConfig(): YarnConfiguration = new YarnConfiguration()
private val TEST_PYFILE = """
|import mod1, mod2
@@ -111,6 +113,17 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
val primaryPyFile = new File(tempDir, "test.py")
Files.write(TEST_PYFILE, primaryPyFile, UTF_8)
+ // When running tests, let's not assume the user has built the assembly module, which also
+ // creates the pyspark archive. Instead, let's use PYSPARK_ARCHIVES_PATH to point at the
+ // needed locations.
+ val sparkHome = sys.props("spark.test.home");
+ val pythonPath = Seq(
+ s"$sparkHome/python/lib/py4j-0.8.2.1-src.zip",
+ s"$sparkHome/python")
+ val extraEnv = Map(
+ "PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator),
+ "PYTHONPATH" -> pythonPath.mkString(File.pathSeparator))
+
val moduleDir =
if (clientMode) {
// In client-mode, .py files added with --py-files are not visible in the driver.
@@ -130,7 +143,8 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
runSpark(clientMode, primaryPyFile.getAbsolutePath(),
sparkArgs = Seq("--py-files", pyFiles),
- appArgs = Seq(result.getAbsolutePath()))
+ appArgs = Seq(result.getAbsolutePath()),
+ extraEnv = extraEnv)
checkResult(result)
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
index 5e8238822b..8d9c9b3004 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.network.yarn.{YarnShuffleService, YarnTestAccessor}
*/
class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
- override def yarnConfig: YarnConfiguration = {
+ override def newYarnConfig(): YarnConfiguration = {
val yarnConfig = new YarnConfiguration()
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle")
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
diff --git a/yarn/src/test/scala/org/apache/spark/launcher/TestClasspathBuilder.scala b/yarn/src/test/scala/org/apache/spark/launcher/TestClasspathBuilder.scala
new file mode 100644
index 0000000000..da9e8e21a2
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/launcher/TestClasspathBuilder.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher
+
+import java.util.{List => JList, Map => JMap}
+
+/**
+ * Exposes AbstractCommandBuilder to the YARN tests, so that they can build classpaths the same
+ * way other cluster managers do.
+ */
+private[spark] class TestClasspathBuilder extends AbstractCommandBuilder {
+
+ childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME, sys.props("spark.test.home"))
+
+ override def buildClassPath(extraCp: String): JList[String] = super.buildClassPath(extraCp)
+
+ /** Not used by the YARN tests. */
+ override def buildCommand(env: JMap[String, String]): JList[String] =
+ throw new UnsupportedOperationException()
+
+}
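
For reference, a hedged sketch of how the YARN suites use this builder (mirroring the runSpark change in BaseYarnClusterSuite above; the extra classpath entry passed in here is illustrative):

    import java.io.File
    import scala.collection.JavaConverters._

    // Build the same classpath the launcher would, then flatten it into a single string
    // suitable for spark.driver.extraClassPath / spark.executor.extraClassPath.
    val testClasspath = new TestClasspathBuilder()
      .buildClassPath(sys.props("java.class.path"))
      .asScala
      .mkString(File.pathSeparator)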