aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2016-03-10 23:28:34 -0800
committerJosh Rosen <joshrosen@databricks.com>2016-03-10 23:28:34 -0800
commit6ca990fb366cf68cd9d5afb433725d28f07e51a0 (patch)
tree036d4b8afc9f8f14f95e5f31e92cabcd51850242
parentd18276cb1d82790a402960835e112aebd0c55513 (diff)
downloadspark-6ca990fb366cf68cd9d5afb433725d28f07e51a0.tar.gz
spark-6ca990fb366cf68cd9d5afb433725d28f07e51a0.tar.bz2
spark-6ca990fb366cf68cd9d5afb433725d28f07e51a0.zip
[SPARK-13294][PROJECT INFRA] Remove MiMa's dependency on spark-class / Spark assembly
This patch removes the need to build a full Spark assembly before running the `dev/mima` script. - I modified the `tools` project to remove a direct dependency on Spark, so `sbt/sbt tools/fullClasspath` will now return the classpath for the `GenerateMIMAIgnore` class itself plus its own dependencies. - This required me to delete two classes full of dead code that we don't use anymore - `GenerateMIMAIgnore` now uses [ClassUtil](http://software.clapper.org/classutil/) to find all of the Spark classes rather than our homemade JAR traversal code. The problem in our own code was that it didn't handle folders of classes properly, which is necessary in order to generate excludes with an assembly-free Spark build. - `./dev/mima` no longer runs through `spark-class`, eliminating the need to reason about classpath ordering between `SPARK_CLASSPATH` and the assembly. Author: Josh Rosen <joshrosen@databricks.com> Closes #11178 from JoshRosen/remove-assembly-in-run-tests.
-rwxr-xr-xdev/mima23
-rwxr-xr-xdev/run-tests.py24
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java22
-rw-r--r--project/SparkBuild.scala19
-rw-r--r--tools/pom.xml15
-rw-r--r--tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala53
-rw-r--r--tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala367
-rw-r--r--tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala111
8 files changed, 58 insertions, 576 deletions
diff --git a/dev/mima b/dev/mima
index d5baffc6ef..b7f8d62b7d 100755
--- a/dev/mima
+++ b/dev/mima
@@ -24,24 +24,21 @@ set -e
FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
cd "$FWDIR"
-echo -e "q\n" | build/sbt oldDeps/update
+TOOLS_CLASSPATH="$(build/sbt "export tools/fullClasspath" | tail -n1)"
+
rm -f .generated-mima*
generate_mima_ignore() {
- SPARK_JAVA_OPTS="-XX:MaxPermSize=1g -Xmx2g" \
- ./bin/spark-class org.apache.spark.tools.GenerateMIMAIgnore
+ java \
+ -XX:MaxPermSize=1g \
+ -Xmx2g \
+ -cp "$TOOLS_CLASSPATH:$1" \
+ org.apache.spark.tools.GenerateMIMAIgnore
}
-# Generate Mima Ignore is called twice, first with latest built jars
-# on the classpath and then again with previous version jars on the classpath.
-# Because of a bug in GenerateMIMAIgnore that when old jars are ahead on classpath
-# it did not process the new classes (which are in assembly jar).
-generate_mima_ignore
-
-export SPARK_CLASSPATH="$(build/sbt "export oldDeps/fullClasspath" | tail -n1)"
-echo "SPARK_CLASSPATH=$SPARK_CLASSPATH"
-
-generate_mima_ignore
+SPARK_PROFILES="-Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
+generate_mima_ignore "$(build/sbt $SPARK_PROFILES "export assembly/fullClasspath" | tail -n1)"
+generate_mima_ignore "$(build/sbt $SPARK_PROFILES "export oldDeps/fullClasspath" | tail -n1)"
echo -e "q\n" | build/sbt mima-report-binary-issues | grep -v -e "info.*Resolving"
ret_val=$?
diff --git a/dev/run-tests.py b/dev/run-tests.py
index 6e45113134..ebeede52c9 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -336,7 +336,6 @@ def build_spark_sbt(hadoop_version):
# Enable all of the profiles for the build:
build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags
sbt_goals = ["package",
- "assembly/assembly",
"streaming-kafka-assembly/assembly",
"streaming-flume-assembly/assembly",
"streaming-mqtt-assembly/assembly",
@@ -350,6 +349,16 @@ def build_spark_sbt(hadoop_version):
exec_sbt(profiles_and_goals)
+def build_spark_assembly_sbt(hadoop_version):
+ # Enable all of the profiles for the build:
+ build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags
+ sbt_goals = ["assembly/assembly"]
+ profiles_and_goals = build_profiles + sbt_goals
+ print("[info] Building Spark assembly (w/Hive 1.2.1) using SBT with these arguments: ",
+ " ".join(profiles_and_goals))
+ exec_sbt(profiles_and_goals)
+
+
def build_apache_spark(build_tool, hadoop_version):
"""Will build Spark against Hive v1.2.1 given the passed in build tool (either `sbt` or
`maven`). Defaults to using `sbt`."""
@@ -561,11 +570,14 @@ def main():
# spark build
build_apache_spark(build_tool, hadoop_version)
- # TODO Temporarily disable MiMA check for DF-to-DS migration prototyping
- # # backwards compatibility checks
- # if build_tool == "sbt":
- # # Note: compatiblity tests only supported in sbt for now
- # detect_binary_inop_with_mima()
+ # backwards compatibility checks
+ if build_tool == "sbt":
+ # Note: compatibility tests only supported in sbt for now
+ # TODO Temporarily disable MiMA check for DF-to-DS migration prototyping
+ # detect_binary_inop_with_mima()
+ # Since we did not build assembly/assembly before running dev/mima, we need to
+ # do it here because the tests still rely on it; see SPARK-13294 for details.
+ build_spark_assembly_sbt(hadoop_version)
# run the test suites
run_scala_tests(build_tool, hadoop_version, test_modules, excluded_tags)
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
index 40187236f2..6b9d36cc0b 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
@@ -17,12 +17,10 @@
package org.apache.spark.launcher;
-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.regex.Pattern;
import static org.apache.spark.launcher.CommandBuilderUtils.*;
@@ -76,26 +74,6 @@ class SparkClassCommandBuilder extends AbstractCommandBuilder {
javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
javaOptsKeys.add("SPARK_SHUFFLE_OPTS");
memKey = "SPARK_DAEMON_MEMORY";
- } else if (className.startsWith("org.apache.spark.tools.")) {
- String sparkHome = getSparkHome();
- File toolsDir = new File(join(File.separator, sparkHome, "tools", "target",
- "scala-" + getScalaVersion()));
- checkState(toolsDir.isDirectory(), "Cannot find tools build directory.");
-
- Pattern re = Pattern.compile("spark-tools_.*\\.jar");
- for (File f : toolsDir.listFiles()) {
- if (re.matcher(f.getName()).matches()) {
- extraClassPath = f.getAbsolutePath();
- break;
- }
- }
-
- checkState(extraClassPath != null,
- "Failed to find Spark Tools Jar in %s.\n" +
- "You need to run \"build/sbt tools/package\" before running %s.",
- toolsDir.getAbsolutePath(), className);
-
- javaOptsKeys.add("SPARK_JAVA_OPTS");
} else {
javaOptsKeys.add("SPARK_JAVA_OPTS");
memKey = "SPARK_DRIVER_MEMORY";
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index a380c4cca2..e74fb17472 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -384,18 +384,19 @@ object OldDeps {
lazy val project = Project("oldDeps", file("dev"), settings = oldDepsSettings)
- def versionArtifact(id: String): Option[sbt.ModuleID] = {
- val fullId = id + "_2.11"
- Some("org.apache.spark" % fullId % "1.2.0")
- }
-
def oldDepsSettings() = Defaults.coreDefaultSettings ++ Seq(
name := "old-deps",
scalaVersion := "2.10.5",
- libraryDependencies := Seq("spark-streaming-mqtt", "spark-streaming-zeromq",
- "spark-streaming-flume", "spark-streaming-twitter",
- "spark-streaming", "spark-mllib", "spark-graphx",
- "spark-core").map(versionArtifact(_).get intransitive())
+ libraryDependencies := Seq(
+ "spark-streaming-mqtt",
+ "spark-streaming-zeromq",
+ "spark-streaming-flume",
+ "spark-streaming-twitter",
+ "spark-streaming",
+ "spark-mllib",
+ "spark-graphx",
+ "spark-core"
+ ).map(id => "org.apache.spark" % (id + "_2.11") % "1.2.0")
)
}
diff --git a/tools/pom.xml b/tools/pom.xml
index b3a5ae2771..9bb20e1381 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -35,16 +35,6 @@
<dependencies>
<dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
</dependency>
@@ -52,6 +42,11 @@
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.clapper</groupId>
+ <artifactId>classutil_${scala.binary.version}</artifactId>
+ <version>1.0.6</version>
+ </dependency>
</dependencies>
<build>
diff --git a/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala b/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala
index a947fac1d7..738bd2150a 100644
--- a/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala
@@ -18,15 +18,13 @@
// scalastyle:off classforname
package org.apache.spark.tools
-import java.io.File
-import java.util.jar.JarFile
-
import scala.collection.mutable
-import scala.collection.JavaConverters._
import scala.reflect.runtime.{universe => unv}
import scala.reflect.runtime.universe.runtimeMirror
import scala.util.Try
+import org.clapper.classutil.ClassFinder
+
/**
* A tool for generating classes to be excluded during binary checking with MIMA. It is expected
* that this tool is run with ./spark-class.
@@ -42,12 +40,13 @@ object GenerateMIMAIgnore {
private val classLoader = Thread.currentThread().getContextClassLoader
private val mirror = runtimeMirror(classLoader)
+ private def isDeveloperApi(sym: unv.Symbol) = sym.annotations.exists {
+ _.tpe =:= mirror.staticClass("org.apache.spark.annotation.DeveloperApi").toType
+ }
- private def isDeveloperApi(sym: unv.Symbol) =
- sym.annotations.exists(_.tpe =:= unv.typeOf[org.apache.spark.annotation.DeveloperApi])
-
- private def isExperimental(sym: unv.Symbol) =
- sym.annotations.exists(_.tpe =:= unv.typeOf[org.apache.spark.annotation.Experimental])
+ private def isExperimental(sym: unv.Symbol) = sym.annotations.exists {
+ _.tpe =:= mirror.staticClass("org.apache.spark.annotation.Experimental").toType
+ }
private def isPackagePrivate(sym: unv.Symbol) =
@@ -160,35 +159,13 @@ object GenerateMIMAIgnore {
* and subpackages both from directories and jars present on the classpath.
*/
private def getClasses(packageName: String): Set[String] = {
- val path = packageName.replace('.', '/')
- val resources = classLoader.getResources(path)
-
- val jars = resources.asScala.filter(_.getProtocol == "jar")
- .map(_.getFile.split(":")(1).split("!")(0)).toSeq
-
- jars.flatMap(getClassesFromJar(_, path))
- .map(_.getName)
- .filterNot(shouldExclude).toSet
- }
-
- /**
- * Get all classes in a package from a jar file.
- */
- private def getClassesFromJar(jarPath: String, packageName: String) = {
- import scala.collection.mutable
- val jar = new JarFile(new File(jarPath))
- val enums = jar.entries().asScala.map(_.getName).filter(_.startsWith(packageName))
- val classes = mutable.HashSet[Class[_]]()
- for (entry <- enums if entry.endsWith(".class")) {
- try {
- classes += Class.forName(entry.replace('/', '.').stripSuffix(".class"), false, classLoader)
- } catch {
- // scalastyle:off println
- case _: Throwable => println("Unable to load:" + entry)
- // scalastyle:on println
- }
- }
- classes
+ val finder = ClassFinder()
+ finder
+ .getClasses
+ .map(_.name)
+ .filter(_.startsWith(packageName))
+ .filterNot(shouldExclude)
+ .toSet
}
}
// scalastyle:on classforname
diff --git a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
deleted file mode 100644
index ccd8fd3969..0000000000
--- a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.tools
-
-import java.lang.reflect.{Method, Type}
-
-import scala.collection.mutable.ArrayBuffer
-import scala.language.existentials
-
-import org.apache.spark._
-import org.apache.spark.api.java._
-import org.apache.spark.rdd.{DoubleRDDFunctions, OrderedRDDFunctions, PairRDDFunctions, RDD}
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaPairDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.{DStream, PairDStreamFunctions}
-
-
-private[spark] abstract class SparkType(val name: String)
-
-private[spark] case class BaseType(override val name: String) extends SparkType(name) {
- override def toString: String = {
- name
- }
-}
-
-private[spark]
-case class ParameterizedType(override val name: String,
- parameters: Seq[SparkType],
- typebounds: String = "") extends SparkType(name) {
- override def toString: String = {
- if (typebounds != "") {
- typebounds + " " + name + "<" + parameters.mkString(", ") + ">"
- } else {
- name + "<" + parameters.mkString(", ") + ">"
- }
- }
-}
-
-private[spark]
-case class SparkMethod(name: String, returnType: SparkType, parameters: Seq[SparkType]) {
- override def toString: String = {
- returnType + " " + name + "(" + parameters.mkString(", ") + ")"
- }
-}
-
-/**
- * A tool for identifying methods that need to be ported from Scala to the Java API.
- *
- * It uses reflection to find methods in the Scala API and rewrites those methods' signatures
- * into appropriate Java equivalents. If those equivalent methods have not been implemented in
- * the Java API, they are printed.
- */
-object JavaAPICompletenessChecker {
-
- private def parseType(typeStr: String): SparkType = {
- if (!typeStr.contains("<")) {
- // Base types might begin with "class" or "interface", so we have to strip that off:
- BaseType(typeStr.trim.split(" ").last)
- } else if (typeStr.endsWith("[]")) {
- ParameterizedType("Array", Seq(parseType(typeStr.stripSuffix("[]"))))
- } else {
- val parts = typeStr.split("<", 2)
- val name = parts(0).trim
- assert (parts(1).last == '>')
- val parameters = parts(1).dropRight(1)
- ParameterizedType(name, parseTypeList(parameters))
- }
- }
-
- private def parseTypeList(typeStr: String): Seq[SparkType] = {
- val types: ArrayBuffer[SparkType] = new ArrayBuffer[SparkType]
- var stack = 0
- var token: StringBuffer = new StringBuffer()
- for (c <- typeStr.trim) {
- if (c == ',' && stack == 0) {
- types += parseType(token.toString)
- token = new StringBuffer()
- } else if (c == ' ' && stack != 0) {
- // continue
- } else {
- if (c == '<') {
- stack += 1
- } else if (c == '>') {
- stack -= 1
- }
- token.append(c)
- }
- }
- assert (stack == 0)
- if (token.toString != "") {
- types += parseType(token.toString)
- }
- types.toSeq
- }
-
- private def parseReturnType(typeStr: String): SparkType = {
- if (typeStr(0) == '<') {
- val parts = typeStr.drop(0).split(">", 2)
- val parsed = parseType(parts(1)).asInstanceOf[ParameterizedType]
- ParameterizedType(parsed.name, parsed.parameters, parts(0))
- } else {
- parseType(typeStr)
- }
- }
-
- private def toSparkMethod(method: Method): SparkMethod = {
- val returnType = parseReturnType(method.getGenericReturnType.toString)
- val name = method.getName
- val parameters = method.getGenericParameterTypes.map(t => parseType(t.toString))
- SparkMethod(name, returnType, parameters)
- }
-
- private def toJavaType(scalaType: SparkType, isReturnType: Boolean): SparkType = {
- val renameSubstitutions = Map(
- "scala.collection.Map" -> "java.util.Map",
- // TODO: the JavaStreamingContext API accepts Array arguments
- // instead of Lists, so this isn't a trivial translation / sub:
- "scala.collection.Seq" -> "java.util.List",
- "scala.Function2" -> "org.apache.spark.api.java.function.Function2",
- "scala.collection.Iterator" -> "java.util.Iterator",
- "scala.collection.mutable.Queue" -> "java.util.Queue",
- "double" -> "java.lang.Double"
- )
- // Keep applying the substitutions until we've reached a fixedpoint.
- def applySubs(scalaType: SparkType): SparkType = {
- scalaType match {
- case ParameterizedType(name, parameters, typebounds) =>
- name match {
- case "org.apache.spark.rdd.RDD" =>
- if (parameters(0).name == classOf[Tuple2[_, _]].getName) {
- val tupleParams =
- parameters(0).asInstanceOf[ParameterizedType].parameters.map(applySubs)
- ParameterizedType(classOf[JavaPairRDD[_, _]].getName, tupleParams)
- } else {
- ParameterizedType(classOf[JavaRDD[_]].getName, parameters.map(applySubs))
- }
- case "org.apache.spark.streaming.dstream.DStream" =>
- if (parameters(0).name == classOf[Tuple2[_, _]].getName) {
- val tupleParams =
- parameters(0).asInstanceOf[ParameterizedType].parameters.map(applySubs)
- ParameterizedType("org.apache.spark.streaming.api.java.JavaPairDStream",
- tupleParams)
- } else {
- ParameterizedType("org.apache.spark.streaming.api.java.JavaDStream",
- parameters.map(applySubs))
- }
- case "scala.Option" => {
- if (isReturnType) {
- ParameterizedType("org.apache.spark.api.java.Optional", parameters.map(applySubs))
- } else {
- applySubs(parameters(0))
- }
- }
- case "scala.Function1" =>
- val firstParamName = parameters.last.name
- if (firstParamName.startsWith("scala.collection.Traversable") ||
- firstParamName.startsWith("scala.collection.Iterator")) {
- ParameterizedType("org.apache.spark.api.java.function.FlatMapFunction",
- Seq(parameters(0),
- parameters.last.asInstanceOf[ParameterizedType].parameters(0)).map(applySubs))
- } else if (firstParamName == "scala.runtime.BoxedUnit") {
- ParameterizedType("org.apache.spark.api.java.function.VoidFunction",
- parameters.dropRight(1).map(applySubs))
- } else {
- ParameterizedType("org.apache.spark.api.java.function.Function",
- parameters.map(applySubs))
- }
- case _ =>
- ParameterizedType(renameSubstitutions.getOrElse(name, name),
- parameters.map(applySubs))
- }
- case BaseType(name) =>
- if (renameSubstitutions.contains(name)) {
- BaseType(renameSubstitutions(name))
- } else {
- scalaType
- }
- }
- }
- var oldType = scalaType
- var newType = applySubs(scalaType)
- while (oldType != newType) {
- oldType = newType
- newType = applySubs(scalaType)
- }
- newType
- }
-
- private def toJavaMethod(method: SparkMethod): SparkMethod = {
- val params = method.parameters
- .filterNot(_.name == "scala.reflect.ClassTag")
- .map(toJavaType(_, isReturnType = false))
- SparkMethod(method.name, toJavaType(method.returnType, isReturnType = true), params)
- }
-
- private def isExcludedByName(method: Method): Boolean = {
- val name = method.getDeclaringClass.getName + "." + method.getName
- // Scala methods that are declared as private[mypackage] become public in the resulting
- // Java bytecode. As a result, we need to manually exclude those methods here.
- // This list also includes a few methods that are only used by the web UI or other
- // internal Spark components.
- val excludedNames = Seq(
- "org.apache.spark.rdd.RDD.origin",
- "org.apache.spark.rdd.RDD.elementClassTag",
- "org.apache.spark.rdd.RDD.checkpointData",
- "org.apache.spark.rdd.RDD.partitioner",
- "org.apache.spark.rdd.RDD.partitions",
- "org.apache.spark.rdd.RDD.firstParent",
- "org.apache.spark.rdd.RDD.doCheckpoint",
- "org.apache.spark.rdd.RDD.markCheckpointed",
- "org.apache.spark.rdd.RDD.clearDependencies",
- "org.apache.spark.rdd.RDD.getDependencies",
- "org.apache.spark.rdd.RDD.getPartitions",
- "org.apache.spark.rdd.RDD.dependencies",
- "org.apache.spark.rdd.RDD.getPreferredLocations",
- "org.apache.spark.rdd.RDD.collectPartitions",
- "org.apache.spark.rdd.RDD.computeOrReadCheckpoint",
- "org.apache.spark.rdd.PairRDDFunctions.getKeyClass",
- "org.apache.spark.rdd.PairRDDFunctions.getValueClass",
- "org.apache.spark.SparkContext.stringToText",
- "org.apache.spark.SparkContext.makeRDD",
- "org.apache.spark.SparkContext.runJob",
- "org.apache.spark.SparkContext.runApproximateJob",
- "org.apache.spark.SparkContext.clean",
- "org.apache.spark.SparkContext.metadataCleaner",
- "org.apache.spark.SparkContext.ui",
- "org.apache.spark.SparkContext.newShuffleId",
- "org.apache.spark.SparkContext.newRddId",
- "org.apache.spark.SparkContext.cleanup",
- "org.apache.spark.SparkContext.receiverJobThread",
- "org.apache.spark.SparkContext.getRDDStorageInfo",
- "org.apache.spark.SparkContext.addedFiles",
- "org.apache.spark.SparkContext.addedJars",
- "org.apache.spark.SparkContext.persistentRdds",
- "org.apache.spark.SparkContext.executorEnvs",
- "org.apache.spark.SparkContext.checkpointDir",
- "org.apache.spark.SparkContext.getSparkHome",
- "org.apache.spark.SparkContext.executorMemoryRequested",
- "org.apache.spark.SparkContext.getExecutorStorageStatus",
- "org.apache.spark.streaming.dstream.DStream.generatedRDDs",
- "org.apache.spark.streaming.dstream.DStream.zeroTime",
- "org.apache.spark.streaming.dstream.DStream.rememberDuration",
- "org.apache.spark.streaming.dstream.DStream.storageLevel",
- "org.apache.spark.streaming.dstream.DStream.mustCheckpoint",
- "org.apache.spark.streaming.dstream.DStream.checkpointDuration",
- "org.apache.spark.streaming.dstream.DStream.checkpointData",
- "org.apache.spark.streaming.dstream.DStream.graph",
- "org.apache.spark.streaming.dstream.DStream.isInitialized",
- "org.apache.spark.streaming.dstream.DStream.parentRememberDuration",
- "org.apache.spark.streaming.dstream.DStream.initialize",
- "org.apache.spark.streaming.dstream.DStream.validate",
- "org.apache.spark.streaming.dstream.DStream.setContext",
- "org.apache.spark.streaming.dstream.DStream.setGraph",
- "org.apache.spark.streaming.dstream.DStream.remember",
- "org.apache.spark.streaming.dstream.DStream.getOrCompute",
- "org.apache.spark.streaming.dstream.DStream.generateJob",
- "org.apache.spark.streaming.dstream.DStream.clearOldMetadata",
- "org.apache.spark.streaming.dstream.DStream.addMetadata",
- "org.apache.spark.streaming.dstream.DStream.updateCheckpointData",
- "org.apache.spark.streaming.dstream.DStream.restoreCheckpointData",
- "org.apache.spark.streaming.dstream.DStream.isTimeValid",
- "org.apache.spark.streaming.StreamingContext.nextNetworkInputStreamId",
- "org.apache.spark.streaming.StreamingContext.checkpointDir",
- "org.apache.spark.streaming.StreamingContext.checkpointDuration",
- "org.apache.spark.streaming.StreamingContext.receiverJobThread",
- "org.apache.spark.streaming.StreamingContext.scheduler",
- "org.apache.spark.streaming.StreamingContext.initialCheckpoint",
- "org.apache.spark.streaming.StreamingContext.getNewNetworkStreamId",
- "org.apache.spark.streaming.StreamingContext.validate",
- "org.apache.spark.streaming.StreamingContext.createNewSparkContext",
- "org.apache.spark.streaming.StreamingContext.rddToFileName",
- "org.apache.spark.streaming.StreamingContext.getSparkCheckpointDir",
- "org.apache.spark.streaming.StreamingContext.env",
- "org.apache.spark.streaming.StreamingContext.graph",
- "org.apache.spark.streaming.StreamingContext.isCheckpointPresent"
- )
- val excludedPatterns = Seq(
- """^org\.apache\.spark\.SparkContext\..*To.*Functions""",
- """^org\.apache\.spark\.SparkContext\..*WritableConverter""",
- """^org\.apache\.spark\.SparkContext\..*To.*Writable"""
- ).map(_.r)
- lazy val excludedByPattern =
- !excludedPatterns.map(_.findFirstIn(name)).filter(_.isDefined).isEmpty
- name.contains("$") || excludedNames.contains(name) || excludedByPattern
- }
-
- private def isExcludedByInterface(method: Method): Boolean = {
- val excludedInterfaces =
- Set("org.apache.spark.Logging", "org.apache.hadoop.mapreduce.HadoopMapReduceUtil")
- def toComparisionKey(method: Method): (Class[_], String, Type) =
- (method.getReturnType, method.getName, method.getGenericReturnType)
- val interfaces = method.getDeclaringClass.getInterfaces.filter { i =>
- excludedInterfaces.contains(i.getName)
- }
- val excludedMethods = interfaces.flatMap(_.getMethods.map(toComparisionKey))
- excludedMethods.contains(toComparisionKey(method))
- }
-
- private def printMissingMethods(scalaClass: Class[_], javaClass: Class[_]) {
- val methods = scalaClass.getMethods
- .filterNot(_.isAccessible)
- .filterNot(isExcludedByName)
- .filterNot(isExcludedByInterface)
- val javaEquivalents = methods.map(m => toJavaMethod(toSparkMethod(m))).toSet
-
- val javaMethods = javaClass.getMethods.map(toSparkMethod).toSet
-
- val missingMethods = javaEquivalents -- javaMethods
-
- for (method <- missingMethods) {
- // scalastyle:off println
- println(method)
- // scalastyle:on println
- }
- }
-
- def main(args: Array[String]) {
- // scalastyle:off println
- println("Missing RDD methods")
- printMissingMethods(classOf[RDD[_]], classOf[JavaRDD[_]])
- println()
-
- println("Missing PairRDD methods")
- printMissingMethods(classOf[PairRDDFunctions[_, _]], classOf[JavaPairRDD[_, _]])
- println()
-
- println("Missing DoubleRDD methods")
- printMissingMethods(classOf[DoubleRDDFunctions], classOf[JavaDoubleRDD])
- println()
-
- println("Missing OrderedRDD methods")
- printMissingMethods(classOf[OrderedRDDFunctions[_, _, _]], classOf[JavaPairRDD[_, _]])
- println()
-
- println("Missing SparkContext methods")
- printMissingMethods(classOf[SparkContext], classOf[JavaSparkContext])
- println()
-
- println("Missing StreamingContext methods")
- printMissingMethods(classOf[StreamingContext], classOf[JavaStreamingContext])
- println()
-
- println("Missing DStream methods")
- printMissingMethods(classOf[DStream[_]], classOf[JavaDStream[_]])
- println()
-
- println("Missing PairDStream methods")
- printMissingMethods(classOf[PairDStreamFunctions[_, _]], classOf[JavaPairDStream[_, _]])
- println()
- // scalastyle:on println
- }
-}
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
deleted file mode 100644
index 8a5c7c0e73..0000000000
--- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.tools
-
-import java.util.concurrent.{CountDownLatch, Executors}
-import java.util.concurrent.atomic.AtomicLong
-
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.executor.ShuffleWriteMetrics
-import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.shuffle.hash.HashShuffleManager
-import org.apache.spark.util.Utils
-
-/**
- * Internal utility for micro-benchmarking shuffle write performance.
- *
- * Writes simulated shuffle output from several threads and records the observed throughput.
- */
-object StoragePerfTester {
- def main(args: Array[String]): Unit = {
- /** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */
- val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g"))
-
- /** Number of map tasks. All tasks execute concurrently. */
- val numMaps = sys.env.get("NUM_MAPS").map(_.toInt).getOrElse(8)
-
- /** Number of reduce splits for each map task. */
- val numOutputSplits = sys.env.get("NUM_REDUCERS").map(_.toInt).getOrElse(500)
-
- val recordLength = 1000 // ~1KB records
- val totalRecords = dataSizeMb * 1000
- val recordsPerMap = totalRecords / numMaps
-
- val writeKey = "1" * (recordLength / 2)
- val writeValue = "1" * (recordLength / 2)
- val executor = Executors.newFixedThreadPool(numMaps)
-
- val conf = new SparkConf()
- .set("spark.shuffle.compress", "false")
- .set("spark.shuffle.sync", "true")
- .set("spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
-
- // This is only used to instantiate a BlockManager. All thread scheduling is done manually.
- val sc = new SparkContext("local[4]", "Write Tester", conf)
- val hashShuffleManager = sc.env.shuffleManager.asInstanceOf[HashShuffleManager]
-
- def writeOutputBytes(mapId: Int, total: AtomicLong): Unit = {
- val shuffle = hashShuffleManager.shuffleBlockResolver.forMapTask(1, mapId, numOutputSplits,
- new KryoSerializer(sc.conf), new ShuffleWriteMetrics())
- val writers = shuffle.writers
- for (i <- 1 to recordsPerMap) {
- writers(i % numOutputSplits).write(writeKey, writeValue)
- }
- writers.map { w =>
- w.commitAndClose()
- total.addAndGet(w.fileSegment().length)
- }
-
- shuffle.releaseWriters(true)
- }
-
- val start = System.currentTimeMillis()
- val latch = new CountDownLatch(numMaps)
- val totalBytes = new AtomicLong()
- for (task <- 1 to numMaps) {
- executor.submit(new Runnable() {
- override def run(): Unit = {
- try {
- writeOutputBytes(task, totalBytes)
- latch.countDown()
- } catch {
- case e: Exception =>
- // scalastyle:off println
- println("Exception in child thread: " + e + " " + e.getMessage)
- // scalastyle:on println
- System.exit(1)
- }
- }
- })
- }
- latch.await()
- val end = System.currentTimeMillis()
- val time = (end - start) / 1000.0
- val bytesPerSecond = totalBytes.get() / time
- val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong
-
- // scalastyle:off println
- System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits))
- System.err.println("bytes_per_file\t\t%s".format(Utils.bytesToString(bytesPerFile)))
- System.err.println("agg_throughput\t\t%s/s".format(Utils.bytesToString(bytesPerSecond.toLong)))
- // scalastyle:on println
-
- executor.shutdown()
- sc.stop()
- }
-}