author     Josh Rosen <joshrosen@databricks.com>        2015-11-11 11:16:39 -0800
committer  Michael Armbrust <michael@databricks.com>    2015-11-11 11:16:39 -0800
commit     529a1d3380c4c23fed068ad05a6376162c4b76d6 (patch)
tree       3b18b3ab42840c8893b4e5ce9c2fb8d99f2264cf
parent     e71ba56586ba64da18f412bc0e0263777c46ac4a (diff)
[SPARK-6152] Use shaded ASM5 to support closure cleaning of Java 8 compiled classes
This patch modifies Spark's closure cleaner (and a few other places) to use ASM 5, which is necessary in order to support cleaning of closures that were compiled by Java 8.

In order to avoid ASM dependency conflicts, Spark excludes ASM from all of its dependencies and uses a shaded version of ASM 4 that comes from `reflectasm` (see [SPARK-782](https://issues.apache.org/jira/browse/SPARK-782) and #232). This patch updates Spark to use a shaded version of ASM 5.0.4 that was published by the Apache XBean project; the POM used to create the shaded artifact can be found at https://github.com/apache/geronimo-xbean/blob/xbean-4.4/xbean-asm5-shaded/pom.xml.

http://movingfulcrum.tumblr.com/post/80826553604/asm-framework-50-the-missing-migration-guide was a useful resource while upgrading the code to use the new ASM5 opcodes.

I also added a new regression test in the `java8-tests` subproject; the existing tests were insufficient to catch this bug, which only affected Scala 2.11 user code that was compiled targeting Java 8.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #9512 from JoshRosen/SPARK-6152.
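As background for the diff below: the old shaded ASM 4 reader rejects class files with major version 52 (Java 8), throwing IllegalArgumentException before any visitor runs (see the removed comment in SQLMetricsSuite.scala at the end of this patch), which is why JDK8-compiled closures could not be cleaned. A minimal sketch of reading a class through the new shaded package (illustrative only; assumes the xbean-asm5-shaded artifact is on the classpath):

    import org.apache.xbean.asm5.{ClassReader, ClassVisitor, Opcodes}

    // Under ASM 5 this parses Java 8 (major version 52) class files;
    // the equivalent ASM 4 call failed with IllegalArgumentException.
    val reader = new ClassReader("java.lang.Runnable")
    reader.accept(new ClassVisitor(Opcodes.ASM5) {}, 0)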
 core/pom.xml                                                                         |  4
 core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala                       | 25
 docs/building-spark.md                                                               |  4
 extras/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala              | 27
 graphx/pom.xml                                                                       |  4
 graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala               | 16
 pom.xml                                                                              |  8
 project/SparkBuild.scala                                                             | 26
 repl/pom.xml                                                                         |  4
 repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala                  |  9
 sql/core/pom.xml                                                                     |  5
 sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala  | 50
 12 files changed, 125 insertions(+), 57 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index 570a25cf32..7e1205a076 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -52,6 +52,10 @@
<artifactId>chill-java</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.xbean</groupId>
+ <artifactId>xbean-asm5-shaded</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 1b49dca9dc..e27d2e6c94 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -21,8 +21,8 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import scala.collection.mutable.{Map, Set}
-import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type}
-import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._
+import org.apache.xbean.asm5.{ClassReader, ClassVisitor, MethodVisitor, Type}
+import org.apache.xbean.asm5.Opcodes._
import org.apache.spark.{Logging, SparkEnv, SparkException}
@@ -325,11 +325,11 @@ private[spark] object ClosureCleaner extends Logging {
private[spark] class ReturnStatementInClosureException
extends SparkException("Return statements aren't allowed in Spark closures")
-private class ReturnStatementFinder extends ClassVisitor(ASM4) {
+private class ReturnStatementFinder extends ClassVisitor(ASM5) {
override def visitMethod(access: Int, name: String, desc: String,
sig: String, exceptions: Array[String]): MethodVisitor = {
if (name.contains("apply")) {
- new MethodVisitor(ASM4) {
+ new MethodVisitor(ASM5) {
override def visitTypeInsn(op: Int, tp: String) {
if (op == NEW && tp.contains("scala/runtime/NonLocalReturnControl")) {
throw new ReturnStatementInClosureException
@@ -337,7 +337,7 @@ private class ReturnStatementFinder extends ClassVisitor(ASM4) {
}
}
} else {
- new MethodVisitor(ASM4) {}
+ new MethodVisitor(ASM5) {}
}
}
}
@@ -361,7 +361,7 @@ private[util] class FieldAccessFinder(
findTransitively: Boolean,
specificMethod: Option[MethodIdentifier[_]] = None,
visitedMethods: Set[MethodIdentifier[_]] = Set.empty)
- extends ClassVisitor(ASM4) {
+ extends ClassVisitor(ASM5) {
override def visitMethod(
access: Int,
@@ -376,7 +376,7 @@ private[util] class FieldAccessFinder(
return null
}
- new MethodVisitor(ASM4) {
+ new MethodVisitor(ASM5) {
override def visitFieldInsn(op: Int, owner: String, name: String, desc: String) {
if (op == GETFIELD) {
for (cl <- fields.keys if cl.getName == owner.replace('/', '.')) {
@@ -385,7 +385,8 @@ private[util] class FieldAccessFinder(
}
}
- override def visitMethodInsn(op: Int, owner: String, name: String, desc: String) {
+ override def visitMethodInsn(
+ op: Int, owner: String, name: String, desc: String, itf: Boolean) {
for (cl <- fields.keys if cl.getName == owner.replace('/', '.')) {
// Check for calls to a getter method for a variable in an interpreter wrapper object.
// This means that the corresponding field will be accessed, so we should save it.
@@ -408,7 +409,7 @@ private[util] class FieldAccessFinder(
}
}
-private class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM4) {
+private class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM5) {
var myName: String = null
// TODO: Recursively find inner closures that we indirectly reference, e.g.
@@ -423,9 +424,9 @@ private class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM
override def visitMethod(access: Int, name: String, desc: String,
sig: String, exceptions: Array[String]): MethodVisitor = {
- new MethodVisitor(ASM4) {
- override def visitMethodInsn(op: Int, owner: String, name: String,
- desc: String) {
+ new MethodVisitor(ASM5) {
+ override def visitMethodInsn(
+ op: Int, owner: String, name: String, desc: String, itf: Boolean) {
val argTypes = Type.getArgumentTypes(desc)
if (op == INVOKESPECIAL && name == "<init>" && argTypes.length > 0
&& argTypes(0).toString.startsWith("L") // is it an object?
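The recurring change in this file is the visitMethodInsn override: ASM 5 added a trailing itf flag indicating whether the owner of the call site is an interface. A sketch of the new override shape (illustrative visitor, not from the patch):

    import org.apache.xbean.asm5.{MethodVisitor, Opcodes}

    class InvokeLogger extends MethodVisitor(Opcodes.ASM5) {
      // ASM 5 form: the extra `itf` parameter is true for interface owners.
      override def visitMethodInsn(
          op: Int, owner: String, name: String, desc: String, itf: Boolean): Unit = {
        println(s"$owner.$name$desc (interface owner: $itf)")
        super.visitMethodInsn(op, owner, name, desc, itf)
      }
    }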
diff --git a/docs/building-spark.md b/docs/building-spark.md
index 4f73adb854..3d38edbdad 100644
--- a/docs/building-spark.md
+++ b/docs/building-spark.md
@@ -190,6 +190,10 @@ Running only Java 8 tests and nothing else.
mvn install -DskipTests -Pjava8-tests
+or
+
+ sbt -Pjava8-tests java8-tests/test
+
Java 8 tests are run when the `-Pjava8-tests` profile is enabled; they will run in spite of `-DskipTests`.
For these tests to run your system must have a JDK 8 installation.
If you have JDK 8 installed but it is not the system default, you can set JAVA_HOME to point to JDK 8 before running the tests.
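For example (installation path assumed):

    JAVA_HOME=/path/to/jdk1.8.0 mvn install -DskipTests -Pjava8-tests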
diff --git a/extras/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala b/extras/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala
new file mode 100644
index 0000000000..fa0681db41
--- /dev/null
+++ b/extras/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * Test cases where JDK8-compiled Scala user code is used with Spark.
+ */
+class JDK8ScalaSuite extends SparkFunSuite with SharedSparkContext {
+ test("basic RDD closure test (SPARK-6152)") {
+ sc.parallelize(1 to 1000).map(x => x * x).count()
+ }
+}
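The closure above is deliberately minimal; any Scala 2.11 closure compiled with a Java 8 target takes the same path through the cleaner. A sketch of a slightly richer shape (illustrative only, not part of the patch; `sc` comes from SharedSparkContext as in the suite above):

    // Capturing a local variable forces the cleaner to walk the closure's
    // fields, which is where the ASM 4 reader previously failed on
    // JDK8-compiled classes.
    val factor = 2
    sc.parallelize(1 to 100).map(_ * factor).reduce(_ + _)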
diff --git a/graphx/pom.xml b/graphx/pom.xml
index 987b831021..8cd66c5b2e 100644
--- a/graphx/pom.xml
+++ b/graphx/pom.xml
@@ -48,6 +48,10 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.xbean</groupId>
+ <artifactId>xbean-asm5-shaded</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
index 74a7de18d4..a6d0cb6409 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
@@ -22,11 +22,10 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import scala.collection.mutable.HashSet
import scala.language.existentials
-import org.apache.spark.util.Utils
-
-import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor}
-import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._
+import org.apache.xbean.asm5.{ClassReader, ClassVisitor, MethodVisitor}
+import org.apache.xbean.asm5.Opcodes._
+import org.apache.spark.util.Utils
/**
* Includes a utility function to test whether a function accesses a specific attribute
@@ -107,18 +106,19 @@ private[graphx] object BytecodeUtils {
* MethodInvocationFinder("spark/graph/Foo", "test")
* its methodsInvoked variable will contain the set of methods invoked directly by
* Foo.test(). Interface invocations are not returned as part of the result set because we cannot
- * determine the actual metod invoked by inspecting the bytecode.
+ * determine the actual method invoked by inspecting the bytecode.
*/
private class MethodInvocationFinder(className: String, methodName: String)
- extends ClassVisitor(ASM4) {
+ extends ClassVisitor(ASM5) {
val methodsInvoked = new HashSet[(Class[_], String)]
override def visitMethod(access: Int, name: String, desc: String,
sig: String, exceptions: Array[String]): MethodVisitor = {
if (name == methodName) {
- new MethodVisitor(ASM4) {
- override def visitMethodInsn(op: Int, owner: String, name: String, desc: String) {
+ new MethodVisitor(ASM5) {
+ override def visitMethodInsn(
+ op: Int, owner: String, name: String, desc: String, itf: Boolean) {
if (op == INVOKEVIRTUAL || op == INVOKESPECIAL || op == INVOKESTATIC) {
if (!skipClass(owner)) {
methodsInvoked.add((Utils.classForName(owner.replace("/", ".")), name))
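A sketch of how this finder is driven (illustrative; `spark/graph/Foo` is the hypothetical class from the scaladoc above, and ClassReader takes the dotted class name):

    import org.apache.xbean.asm5.ClassReader

    val finder = new MethodInvocationFinder("spark/graph/Foo", "test")
    new ClassReader("spark.graph.Foo").accept(finder, 0)
    // methodsInvoked now holds (Class, methodName) pairs for statically
    // resolvable call sites; INVOKEINTERFACE sites are deliberately skipped.
    finder.methodsInvoked.foreach(println)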
diff --git a/pom.xml b/pom.xml
index c499a80aa0..01afa80617 100644
--- a/pom.xml
+++ b/pom.xml
@@ -393,6 +393,14 @@
</exclusion>
</exclusions>
</dependency>
+ <!-- This artifact is a shaded version of ASM 5.0.4. The POM that was used to produce this
+ is at https://github.com/apache/geronimo-xbean/tree/xbean-4.4/xbean-asm5-shaded
+ For context on why we shade ASM, see SPARK-782 and SPARK-6152. -->
+ <dependency>
+ <groupId>org.apache.xbean</groupId>
+ <artifactId>xbean-asm5-shaded</artifactId>
+ <version>4.4</version>
+ </dependency>
<!-- Shaded deps marked as provided. These are promoted to compile scope
in the modules where we want the shaded classes to appear in the
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index b7c6192243..570c9e50ed 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -57,6 +57,9 @@ object BuildCommons {
val sparkHome = buildLocation
val testTempDir = s"$sparkHome/target/tmp"
+
+ val javacJVMVersion = settingKey[String]("source and target JVM version for javac")
+ val scalacJVMVersion = settingKey[String]("source and target JVM version for scalac")
}
object SparkBuild extends PomBuild {
@@ -154,9 +157,17 @@ object SparkBuild extends PomBuild {
if (major.toInt >= 1 && minor.toInt >= 8) Seq("-Xdoclint:all", "-Xdoclint:-missing") else Seq.empty
},
- javacOptions in Compile ++= Seq("-encoding", "UTF-8"),
+ javacJVMVersion := "1.7",
+ scalacJVMVersion := "1.7",
+
+ javacOptions in Compile ++= Seq(
+ "-encoding", "UTF-8",
+ "-source", javacJVMVersion.value,
+ "-target", javacJVMVersion.value
+ ),
scalacOptions in Compile ++= Seq(
+ s"-target:jvm-${scalacJVMVersion.value}",
"-sourcepath", (baseDirectory in ThisBuild).value.getAbsolutePath // Required for relative source links in scaladoc
),
@@ -241,8 +252,9 @@ object SparkBuild extends PomBuild {
enable(Flume.settings)(streamingFlumeSink)
- enable(DockerIntegrationTests.settings)(dockerIntegrationTests)
+ enable(Java8TestSettings.settings)(java8Tests)
+ enable(DockerIntegrationTests.settings)(dockerIntegrationTests)
/**
* Adds the ability to run the spark shell directly from SBT without building an assembly
@@ -591,6 +603,16 @@ object Unidoc {
)
}
+object Java8TestSettings {
+ import BuildCommons._
+
+ lazy val settings = Seq(
+ javacJVMVersion := "1.8",
+ // Targeting Java 8 bytecode is only supported in Scala 2.11.4 and higher:
+ scalacJVMVersion := (if (System.getProperty("scala-2.11") == "true") "1.8" else "1.7")
+ )
+}
+
object TestSettings {
import BuildCommons._
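The scalacJVMVersion conditional in Java8TestSettings mirrors a runtime property check; a quick sketch of the resulting scalac flag under each build flavor (illustrative):

    // Scala 2.10's compiler cannot emit Java 8 bytecode, so the jvm-1.8
    // target is only selected when the build runs with -Dscala-2.11=true.
    val scala211 = System.getProperty("scala-2.11") == "true"
    println(s"-target:jvm-${if (scala211) "1.8" else "1.7"}")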
diff --git a/repl/pom.xml b/repl/pom.xml
index fb0a0e1286..154c99d23c 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -95,6 +95,10 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-test-tags_${scala.binary.version}</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.xbean</groupId>
+ <artifactId>xbean-asm5-shaded</artifactId>
+ </dependency>
<!-- Explicit listing of transitive deps that are shaded. Otherwise, odd compiler crashes. -->
<dependency>
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index 004941d5f5..3d2d235a00 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -23,15 +23,14 @@ import java.net.{HttpURLConnection, URI, URL, URLEncoder}
import scala.util.control.NonFatal
import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.xbean.asm5._
+import org.apache.xbean.asm5.Opcodes._
import org.apache.spark.{SparkConf, SparkEnv, Logging}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
import org.apache.spark.util.ParentClassLoader
-import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm._
-import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._
-
/**
* A ClassLoader that reads classes from a Hadoop FileSystem or HTTP URI,
* used to load classes defined by the interpreter when the REPL is used.
@@ -192,7 +191,7 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader
}
class ConstructorCleaner(className: String, cv: ClassVisitor)
-extends ClassVisitor(ASM4, cv) {
+extends ClassVisitor(ASM5, cv) {
override def visitMethod(access: Int, name: String, desc: String,
sig: String, exceptions: Array[String]): MethodVisitor = {
val mv = cv.visitMethod(access, name, desc, sig, exceptions)
@@ -202,7 +201,7 @@ extends ClassVisitor(ASM4, cv) {
// field in the class to point to it, but do nothing otherwise.
mv.visitCode()
mv.visitVarInsn(ALOAD, 0) // load this
- mv.visitMethodInsn(INVOKESPECIAL, "java/lang/Object", "<init>", "()V")
+ mv.visitMethodInsn(INVOKESPECIAL, "java/lang/Object", "<init>", "()V", false)
mv.visitVarInsn(ALOAD, 0) // load this
// val classType = className.replace('.', '/')
// mv.visitFieldInsn(PUTSTATIC, classType, "MODULE$", "L" + classType + ";")
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index c96855e261..9fd6b5a07e 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -110,6 +110,11 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.xbean</groupId>
+ <artifactId>xbean-asm5-shaded</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 4b4f5c6c45..97162249d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -21,8 +21,8 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import scala.collection.mutable
-import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm._
-import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._
+import org.apache.xbean.asm5._
+import org.apache.xbean.asm5.Opcodes._
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
@@ -41,22 +41,20 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
l += 1L
l.add(1L)
}
- BoxingFinder.getClassReader(f.getClass).foreach { cl =>
- val boxingFinder = new BoxingFinder()
- cl.accept(boxingFinder, 0)
- assert(boxingFinder.boxingInvokes.isEmpty, s"Found boxing: ${boxingFinder.boxingInvokes}")
- }
+ val cl = BoxingFinder.getClassReader(f.getClass)
+ val boxingFinder = new BoxingFinder()
+ cl.accept(boxingFinder, 0)
+ assert(boxingFinder.boxingInvokes.isEmpty, s"Found boxing: ${boxingFinder.boxingInvokes}")
}
test("Normal accumulator should do boxing") {
// We need this test to make sure BoxingFinder works.
val l = sparkContext.accumulator(0L)
val f = () => { l += 1L }
- BoxingFinder.getClassReader(f.getClass).foreach { cl =>
- val boxingFinder = new BoxingFinder()
- cl.accept(boxingFinder, 0)
- assert(boxingFinder.boxingInvokes.nonEmpty, "Expected to find boxing in this test")
- }
+ val cl = BoxingFinder.getClassReader(f.getClass)
+ val boxingFinder = new BoxingFinder()
+ cl.accept(boxingFinder, 0)
+ assert(boxingFinder.boxingInvokes.nonEmpty, "Expected to find boxing in this test")
}
/**
@@ -486,7 +484,7 @@ private class BoxingFinder(
method: MethodIdentifier[_] = null,
val boxingInvokes: mutable.Set[String] = mutable.Set.empty,
visitedMethods: mutable.Set[MethodIdentifier[_]] = mutable.Set.empty)
- extends ClassVisitor(ASM4) {
+ extends ClassVisitor(ASM5) {
private val primitiveBoxingClassName =
Set("java/lang/Long",
@@ -503,11 +501,12 @@ private class BoxingFinder(
MethodVisitor = {
if (method != null && (method.name != name || method.desc != desc)) {
// If method is specified, skip other methods.
- return new MethodVisitor(ASM4) {}
+ return new MethodVisitor(ASM5) {}
}
- new MethodVisitor(ASM4) {
- override def visitMethodInsn(op: Int, owner: String, name: String, desc: String) {
+ new MethodVisitor(ASM5) {
+ override def visitMethodInsn(
+ op: Int, owner: String, name: String, desc: String, itf: Boolean) {
if (op == INVOKESPECIAL && name == "<init>" || op == INVOKESTATIC && name == "valueOf") {
if (primitiveBoxingClassName.contains(owner)) {
// Find boxing methods, e.g., new java.lang.Long(l) or java.lang.Long.valueOf(l)
@@ -522,10 +521,9 @@ private class BoxingFinder(
if (!visitedMethods.contains(m)) {
// Keep track of visited methods to avoid potential infinite cycles
visitedMethods += m
- BoxingFinder.getClassReader(classOfMethodOwner).foreach { cl =>
- visitedMethods += m
- cl.accept(new BoxingFinder(m, boxingInvokes, visitedMethods), 0)
- }
+ val cl = BoxingFinder.getClassReader(classOfMethodOwner)
+ visitedMethods += m
+ cl.accept(new BoxingFinder(m, boxingInvokes, visitedMethods), 0)
}
}
}
@@ -535,22 +533,14 @@ private class BoxingFinder(
private object BoxingFinder {
- def getClassReader(cls: Class[_]): Option[ClassReader] = {
+ def getClassReader(cls: Class[_]): ClassReader = {
val className = cls.getName.replaceFirst("^.*\\.", "") + ".class"
val resourceStream = cls.getResourceAsStream(className)
val baos = new ByteArrayOutputStream(128)
// Copy data over, before delegating to ClassReader -
// else we can run out of open file handles.
Utils.copyStream(resourceStream, baos, true)
- // ASM4 doesn't support Java 8 classes, which requires ASM5.
- // So if the class is ASM5 (E.g., java.lang.Long when using JDK8 runtime to run these codes),
- // then ClassReader will throw IllegalArgumentException,
- // However, since this is only for testing, it's safe to skip these classes.
- try {
- Some(new ClassReader(new ByteArrayInputStream(baos.toByteArray)))
- } catch {
- case _: IllegalArgumentException => None
- }
+ new ClassReader(new ByteArrayInputStream(baos.toByteArray))
}
}
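The simplification above leans on the same fact as the rest of the patch: ASM 5 parses major-version-52 class files, so the defensive Option/try-catch around ClassReader is no longer needed. A sketch (assumes a JDK 8 runtime, where java.lang.Long is a Java 8 class file):

    import org.apache.xbean.asm5.ClassReader

    // Under ASM 4 this threw IllegalArgumentException on JDK 8; under the
    // shaded ASM 5 it simply parses.
    val cl = new ClassReader("java.lang.Long")
    println(cl.getClassName) // prints: java/lang/Long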