author | Josh Rosen <joshrosen@databricks.com> | 2016-04-11 10:53:26 -0700
---|---|---
committer | Jakob Odersky <jakob@odersky.com> | 2017-04-24 14:09:49 -0700
commit | acf922a355ed21bb6ef56eb94c11802980faff65 (patch) |
tree | 986e88827d7fed298df09488dedc3f7a22b1d949 |
parent | 875c73b60a6fce9108e54fff5b6eedfafb39a9ce (diff) |
download | spark-acf922a355ed21bb6ef56eb94c11802980faff65.tar.gz, spark-acf922a355ed21bb6ef56eb94c11802980faff65.tar.bz2, spark-acf922a355ed21bb6ef56eb94c11802980faff65.zip |
Update Josh's changes for minimal REPL with scala 2.12
22 files changed, 291 insertions, 56 deletions
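
The recurring Scala 2.12 problem in the diff below is that lambdas now SAM-convert to Java functional interfaces, so `TaskContext`'s paired overloads taking a `TaskCompletionListener` and a `(TaskContext) => Unit` both match a lambda argument and become ambiguous; the patch works around this by renaming the Java-friendly variants to `addJavaFriendly*`. A minimal, self-contained sketch of the issue (the `Listener` and `Ctx` names are illustrative, not Spark APIs):

```scala
object SamAmbiguitySketch {
  trait Listener { def onDone(id: Int): Unit }            // stand-in for TaskCompletionListener
  class Ctx {
    // Before the rename both methods below shared one name; under Scala 2.12 a call such as
    // ctx.addListener(id => println(id)) matches both overloads, because the lambda also
    // SAM-converts to Listener, and the call fails to compile. Renaming the Java-friendly
    // variant (as the diff below does with addJavaFriendlyTaskCompletionListener) avoids this.
    def addJavaFriendlyListener(l: Listener): Ctx = this  // Java-friendly variant
    def addListener(f: Int => Unit): Ctx = this           // Scala-closure variant
  }
  def main(args: Array[String]): Unit = {
    val ctx = new Ctx
    ctx.addListener(id => println(id))
    ctx.addJavaFriendlyListener(new Listener { def onDone(id: Int): Unit = println(id) })
  }
}
```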
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 9d8607d913..b16bcd3528 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -49,11 +49,13 @@
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <!--
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-mllib_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    -->
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index f312fa2b2d..ff9d8dd163 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -161,9 +161,14 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
     // Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
     // the end of the task. This is necessary to avoid memory leaks in when the downstream operator
     // does not fully consume the sorter's output (e.g. sort followed by limit).
-    taskContext.addTaskCompletionListener(context -> {
-      cleanupResources();
-    });
+    taskContext.addJavaFriendlyTaskCompletionListener(
+      new TaskCompletionListener() {
+        @Override
+        public void onTaskCompletion(TaskContext context) {
+          cleanupResources();
+        }
+      }
+    );
   }

   /**
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index a50600f148..947a50fbcc 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -103,6 +103,16 @@ trait FutureAction[T] extends Future[T] {
    */
  def jobIds: Seq[Int]

+  // TODO(josh): We probably need to provide implementations for this.
+  // scalastyle:off
+  def transform[S](
+      f: scala.util.Try[T] => scala.util.Try[S])(
+      implicit executor: scala.concurrent.ExecutionContext): scala.concurrent.Future[S] = ???
+
+  def transformWith[S](
+      f: scala.util.Try[T] => scala.concurrent.Future[S])(
+      implicit executor: scala.concurrent.ExecutionContext): scala.concurrent.Future[S] = ???
+  // scalastyle:on
 }
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 0b87cd503d..742828a227 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -103,6 +103,10 @@ abstract class TaskContext extends Serializable {
   @deprecated("Local execution was removed, so this always returns false", "2.0.0")
   def isRunningLocally(): Boolean

+  // TODO(josh): this used to be an overload of addTaskCompletionListener(), but the overload
+  // became ambiguous under Scala 2.12. For now, I'm renaming this in order to get the code to
+  // compile, but we need to figure out a long-term solution which maintains at least source
+  // compatibility (and probably binary compatibility) for Java callers.
   /**
    * Adds a (Java friendly) listener to be executed on task completion.
    * This will be called in all situations - success, failure, or cancellation. Adding a listener
   *
   * Exceptions thrown by the listener will result in failure of the task.
   */
-  def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext
+  def addJavaFriendlyTaskCompletionListener(listener: TaskCompletionListener): TaskContext

  /**
   * Adds a listener in the form of a Scala closure to be executed on task completion.
@@ -124,23 +128,27 @@
   * Exceptions thrown by the listener will result in failure of the task.
   */
  def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext = {
-    addTaskCompletionListener(new TaskCompletionListener {
+    addJavaFriendlyTaskCompletionListener(new TaskCompletionListener {
      override def onTaskCompletion(context: TaskContext): Unit = f(context)
    })
  }

+  // TODO(josh): this used to be an overload of addTaskFailureListener(), but the overload
+  // became ambiguous under Scala 2.12. For now, I'm renaming this in order to get the code to
+  // compile, but we need to figure out a long-term solution which maintains at least source
+  // compatibility (and probably binary compatibility) for Java callers.
  /**
   * Adds a listener to be executed on task failure. Adding a listener to an already failed task
   * will result in that listener being called immediately.
   */
-  def addTaskFailureListener(listener: TaskFailureListener): TaskContext
+  def addJavaFriendlyTaskFailureListener(listener: TaskFailureListener): TaskContext

  /**
   * Adds a listener to be executed on task failure. Adding a listener to an already failed task
   * will result in that listener being called immediately.
   */
  def addTaskFailureListener(f: (TaskContext, Throwable) => Unit): TaskContext = {
-    addTaskFailureListener(new TaskFailureListener {
+    addJavaFriendlyTaskFailureListener(new TaskFailureListener {
      override def onTaskFailure(context: TaskContext, error: Throwable): Unit = f(context, error)
    })
  }
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 489688cb08..fae1bd2a76 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -156,7 +156,8 @@ private[spark] object ClosureCleaner extends Logging {
      accessedFields: Map[Class[_], Set[String]]): Unit = {

    if (!isClosure(func.getClass)) {
-      logWarning("Expected a closure; got " + func.getClass.getName)
+      // TODO: pass the other options as well
+      LambdaClosureCleaner.clean(func)
      return
    }

@@ -289,7 +290,7 @@ private[spark] object ClosureCleaner extends Logging {
    }
  }

-  private def ensureSerializable(func: AnyRef) {
+  private[util] def ensureSerializable(func: AnyRef) {
    try {
      if (SparkEnv.get != null) {
        SparkEnv.get.closureSerializer.newInstance().serialize(func)
diff --git a/core/src/main/scala/org/apache/spark/util/LambdaClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/LambdaClosureCleaner.scala
new file mode 100644
index 0000000000..96f3fc81fc
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/LambdaClosureCleaner.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.lang.reflect.Method
+
+import org.apache.xbean.asm5.{ClassVisitor, MethodVisitor}
+import org.apache.xbean.asm5.Opcodes._
+
+import org.apache.spark.internal.Logging
+
+
+private[spark] object LambdaClosureCleaner extends Logging {
+
+  private[util] def clean(closure: AnyRef): Unit = {
+    val writeReplaceMethod: Method = try {
+      closure.getClass.getDeclaredMethod("writeReplace")
+    } catch {
+      case e: java.lang.NoSuchMethodException =>
+        logWarning("Expected a Java lambda; got " + closure.getClass.getName)
+        return
+    }
+
+    writeReplaceMethod.setAccessible(true)
+    // Because we still need to support Java 7, we must use reflection here.
+    val serializedLambda: AnyRef = writeReplaceMethod.invoke(closure)
+    if (serializedLambda.getClass.getName != "java.lang.invoke.SerializedLambda") {
+      logWarning("Closure's writeReplace() method " +
+        s"returned ${serializedLambda.getClass.getName}, not SerializedLambda")
+      return
+    }
+
+    val serializedLambdaClass = Utils.classForName("java.lang.invoke.SerializedLambda")
+
+    val implClassName = serializedLambdaClass
+      .getDeclaredMethod("getImplClass").invoke(serializedLambda).asInstanceOf[String]
+    // TODO: we do not want to unconditionally strip this suffix.
+    val implMethodName = {
+      serializedLambdaClass
+        .getDeclaredMethod("getImplMethodName").invoke(serializedLambda).asInstanceOf[String]
+        .stripSuffix("$adapted")
+    }
+    val implMethodSignature = serializedLambdaClass
+      .getDeclaredMethod("getImplMethodSignature").invoke(serializedLambda).asInstanceOf[String]
+    val capturedArgCount = serializedLambdaClass
+      .getDeclaredMethod("getCapturedArgCount").invoke(serializedLambda).asInstanceOf[Int]
+    val capturedArgs = (0 until capturedArgCount).map { argNum: Int =>
+      serializedLambdaClass
+        .getDeclaredMethod("getCapturedArg", java.lang.Integer.TYPE)
+        .invoke(serializedLambda, argNum.asInstanceOf[Object])
+    }.toSeq
+    assert(capturedArgs.size == capturedArgCount)
+    val implClass = Utils.classForName(implClassName.replaceAllLiterally("/", "."))
+
+    // Fail fast if we detect return statements in closures.
+    // TODO: match the impl method based on its type signature as well, not just its name.
+    ClosureCleaner
+      .getClassReader(implClass)
+      .accept(new LambdaReturnStatementFinder(implMethodName), 0)
+
+    // Check serializable TODO: add flag
+    ClosureCleaner.ensureSerializable(closure)
+    capturedArgs.foreach(ClosureCleaner.clean(_))
+
+    // TODO: null fields to render the closure serializable?
+  }
+}
+
+
+private class LambdaReturnStatementFinder(targetMethodName: String) extends ClassVisitor(ASM5) {
+  override def visitMethod(
+      access: Int,
+      name: String,
+      desc: String,
+      sig: String,
+      exceptions: Array[String]): MethodVisitor = {
+    if (name == targetMethodName) {
+      new MethodVisitor(ASM5) {
+        override def visitTypeInsn(op: Int, tp: String) {
+          if (op == NEW && tp.contains("scala/runtime/NonLocalReturnControl")) {
+            throw new ReturnStatementInClosureException
+          }
+        }
+      }
+    } else {
+      new MethodVisitor(ASM5) {}
+    }
+  }
+}
\ No newline at end of file
diff --git a/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
index 94f5805853..ee8fd81cee 100644
--- a/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
+++ b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
@@ -32,8 +32,8 @@ public class JavaTaskContextCompileCheck {
    tc.isCompleted();
    tc.isInterrupted();
-    tc.addTaskCompletionListener(new JavaTaskCompletionListenerImpl());
-    tc.addTaskFailureListener(new JavaTaskFailureListenerImpl());
+    tc.addJavaFriendlyTaskCompletionListener(new JavaTaskCompletionListenerImpl());
+    tc.addJavaFriendlyTaskFailureListener(new JavaTaskFailureListenerImpl());
    tc.attemptNumber();
    tc.partitionId();
@@ -52,7 +52,7 @@ public class JavaTaskContextCompileCheck {
      context.isInterrupted();
      context.stageId();
      context.partitionId();
-      context.addTaskCompletionListener(this);
+      context.addJavaFriendlyTaskCompletionListener(this);
    }
  }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index 8f576daa77..5f13667c21 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -76,7 +76,8 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
    val rdd = new RDD[String](sc, List()) {
      override def getPartitions = Array[Partition](StubPartition(0))
      override def compute(split: Partition, context: TaskContext) = {
-        context.addTaskFailureListener((context, error) => TaskContextSuite.lastError = error)
+        context.addTaskFailureListener(
+          (context, error) => TaskContextSuite.lastError = error)
        sys.error("damn error")
      }
    }
@@ -96,7 +97,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
    val context = TaskContext.empty()
    val listener = mock(classOf[TaskCompletionListener])
    context.addTaskCompletionListener(_ => throw new Exception("blah"))
-    context.addTaskCompletionListener(listener)
+    context.addJavaFriendlyTaskCompletionListener(listener)
    context.addTaskCompletionListener(_ => throw new Exception("blah"))

    intercept[TaskCompletionListenerException] {
@@ -110,7 +111,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
    val context = TaskContext.empty()
    val listener = mock(classOf[TaskFailureListener])
    context.addTaskFailureListener((_, _) => throw new Exception("exception in listener1"))
-    context.addTaskFailureListener(listener)
+    context.addJavaFriendlyTaskFailureListener(listener)
    context.addTaskFailureListener((_, _) => throw new Exception("exception in listener3"))

    val e = intercept[TaskCompletionListenerException] {
diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
index 934385fbca..d7682ab8cc 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
@@ -141,6 +141,9 @@ class ClosureCleanerSuite2 extends SparkFunSuite with BeforeAndAfterAll with Pri
  }

  test("get inner closure classes") {
+    // Skip on Scala 2.12 for now, since we may not need to determine accessed fields anymore.
+    assume(!scala.util.Properties.versionString.startsWith("version 2.12."))
+
    val closure1 = () => 1
    val closure2 = () => { () => 1 }
    val closure3 = (i: Int) => {
@@ -188,18 +191,20 @@ class ClosureCleanerSuite2 extends SparkFunSuite with BeforeAndAfterAll with Pri
    assert(outerClasses1.isEmpty)
    assert(outerClasses2.isEmpty)
-    // These closures do have $outer pointers because they ultimately reference `this`
-    // The first $outer pointer refers to the closure defines this test (see FunSuite#test)
-    // The second $outer pointer refers to ClosureCleanerSuite2
-    assert(outerClasses3.size === 2)
-    assert(outerClasses4.size === 2)
-    assert(isClosure(outerClasses3(0)))
-    assert(isClosure(outerClasses4(0)))
-    assert(outerClasses3(0) === outerClasses4(0)) // part of the same "FunSuite#test" scope
-    assert(outerClasses3(1) === this.getClass)
-    assert(outerClasses4(1) === this.getClass)
-    assert(outerObjects3(1) === this)
-    assert(outerObjects4(1) === this)
+    if (!scala.util.Properties.versionString.startsWith("version 2.12.")) {
+      // These closures do have $outer pointers because they ultimately reference `this`
+      // The first $outer pointer refers to the closure defines this test (see FunSuite#test)
+      // The second $outer pointer refers to ClosureCleanerSuite2
+      assert(outerClasses3.size === 2)
+      assert(outerClasses4.size === 2)
+      assert(isClosure(outerClasses3(0)))
+      assert(isClosure(outerClasses4(0)))
+      assert(outerClasses3(0) === outerClasses4(0)) // part of the same "FunSuite#test" scope
+      assert(outerClasses3(1) === this.getClass)
+      assert(outerClasses4(1) === this.getClass)
+      assert(outerObjects3(1) === this)
+      assert(outerObjects4(1) === this)
+    }
  }

  test("get outer classes and objects with nesting") {
@@ -231,22 +236,24 @@ class ClosureCleanerSuite2 extends SparkFunSuite with BeforeAndAfterAll with Pri
      assert(outerClasses3.size === outerObjects3.size)
      // Same as above, this closure only references local variables
      assert(outerClasses1.isEmpty)
-      // This closure references the "test2" scope because it needs to find the method `y`
-      // Scope hierarchy: "test2" < "FunSuite#test" < ClosureCleanerSuite2
-      assert(outerClasses2.size === 3)
-      // This closure references the "test2" scope because it needs to find the `localValue`
-      // defined outside of this scope
-      assert(outerClasses3.size === 3)
-      assert(isClosure(outerClasses2(0)))
-      assert(isClosure(outerClasses3(0)))
-      assert(isClosure(outerClasses2(1)))
-      assert(isClosure(outerClasses3(1)))
-      assert(outerClasses2(0) === outerClasses3(0)) // part of the same "test2" scope
-      assert(outerClasses2(1) === outerClasses3(1)) // part of the same "FunSuite#test" scope
-      assert(outerClasses2(2) === this.getClass)
-      assert(outerClasses3(2) === this.getClass)
-      assert(outerObjects2(2) === this)
-      assert(outerObjects3(2) === this)
+      if (!scala.util.Properties.versionString.startsWith("version 2.12.")) {
+        // This closure references the "test2" scope because it needs to find the method `y`
+        // Scope hierarchy: "test2" < "FunSuite#test" < ClosureCleanerSuite2
+        assert(outerClasses2.size === 3)
+        // This closure references the "test2" scope because it needs to find the `localValue`
+        // defined outside of this scope
+        assert(outerClasses3.size === 3)
+        assert(isClosure(outerClasses2(0)))
+        assert(isClosure(outerClasses3(0)))
+        assert(isClosure(outerClasses2(1)))
+        assert(isClosure(outerClasses3(1)))
+        assert(outerClasses2(0) === outerClasses3(0)) // part of the same "test2" scope
+        assert(outerClasses2(1) === outerClasses3(1)) // part of the same "FunSuite#test" scope
+        assert(outerClasses2(2) === this.getClass)
+        assert(outerClasses3(2) === this.getClass)
+        assert(outerObjects2(2) === this)
+        assert(outerObjects3(2) === this)
+      }
    }

    test1()
@@ -254,6 +261,9 @@ class ClosureCleanerSuite2 extends SparkFunSuite with BeforeAndAfterAll with Pri
  }

  test("find accessed fields") {
+    // Skip on Scala 2.12 for now, since we may not need to determine accessed fields anymore.
+    assume(!scala.util.Properties.versionString.startsWith("version 2.12."))
+
    val localValue = someSerializableValue
    val closure1 = () => 1
    val closure2 = () => localValue
@@ -292,6 +302,9 @@ class ClosureCleanerSuite2 extends SparkFunSuite with BeforeAndAfterAll with Pri
  }

  test("find accessed fields with nesting") {
+    // Skip on Scala 2.12 for now, since we may not need to determine accessed fields anymore.
+    assume(!scala.util.Properties.versionString.startsWith("version 2.12."))
+
    val localValue = someSerializableValue

    val test1 = () => {
@@ -538,13 +551,19 @@ class ClosureCleanerSuite2 extends SparkFunSuite with BeforeAndAfterAll with Pri
      // which is itself not serializable because it has a pointer to the ClosureCleanerSuite2.
      // If we do not clean transitively, we will not null out this indirect reference.
      verifyCleaning(
-        inner2, serializableBefore = false, serializableAfter = false, transitive = false)
+        inner2,
+        serializableBefore = scala.util.Properties.versionString.startsWith("version 2.12."),
+        serializableAfter = scala.util.Properties.versionString.startsWith("version 2.12."),
+        transitive = false)

      // If we clean transitively, we will find that method `a` does not actually reference the
      // outer closure's parent (i.e. the ClosureCleanerSuite), so we can additionally null out
      // the outer closure's parent pointer. This will make `inner2` serializable.
      verifyCleaning(
-        inner2, serializableBefore = false, serializableAfter = true, transitive = true)
+        inner2,
+        serializableBefore = scala.util.Properties.versionString.startsWith("version 2.12."),
+        serializableAfter = true,
+        transitive = true)
    }

    // Same as above, but with more levels of nesting
diff --git a/dev/change-scala-version.sh b/dev/change-scala-version.sh
index 6c4077c584..6437e879de 100755
--- a/dev/change-scala-version.sh
+++ b/dev/change-scala-version.sh
@@ -19,7 +19,7 @@

 set -e

-VALID_VERSIONS=( 2.10 2.11 2.12 )
+VALID_VERSIONS=( 2.10 2.11 2.12.0-M4 )

 usage() {
   echo "Usage: $(basename $0) [-h|--help] <version>
diff --git a/pom.xml b/pom.xml
@@ -93,8 +93,11 @@
    <module>common/tags</module>
    <module>core</module>
    <module>graphx</module>
-    <module>mllib</module>
-    <module>mllib-local</module>
+    <!-- TODO 2.12: the ML and Kafka modules would typically be defined here, but because their
+         dependencies are not 2.12-ready we need to conditionally enable those modules via the
+         2.11 and 2.10 build profiles -->
+    <!-- <module>mllib</module> -->
+    <!-- <module>mllib-local</module> -->
    <module>tools</module>
    <module>streaming</module>
    <module>sql/catalyst</module>
@@ -2586,6 +2589,22 @@
        </plugin>
      </plugins>
    </build>
+    <!-- TODO 2.12: remove these from here and always enable them in root POM once these modules'
+         dependencies are 2.12-ready -->
+    <modules>
+      <module>mllib</module>
+      <module>external/kafka</module>
+      <module>external/kafka-assembly</module>
+    </modules>
+    <dependencyManagement>
+      <dependencies>
+        <dependency>
+          <groupId>${jline.groupid}</groupId>
+          <artifactId>jline</artifactId>
+          <version>${jline.version}</version>
+        </dependency>
+      </dependencies>
+    </dependencyManagement>
  </profile>

  <profile>
@@ -2634,6 +2653,38 @@
        </plugin>
      </plugins>
    </build>
+    <!-- TODO 2.12: remove these from here and always enable them in root POM once these modules'
+         dependencies are 2.12-ready -->
+    <modules>
+      <module>mllib</module>
+      <module>mllib-local</module>
+      <module>external/kafka</module>
+      <module>external/kafka-assembly</module>
+    </modules>
+  </profile>
+
+  <profile>
+    <id>scala-2.12</id>
+    <activation>
+      <property><name>scala-2.12</name></property>
+    </activation>
+    <properties>
+      <scala.version>2.12.0</scala.version>
+      <scala.binary.version>2.12.0</scala.binary.version>
+      <!-- This corresponds to https://github.com/twitter/chill/pull/253 -->
+      <chill.version>0.8.1-SNAPSHOT</chill.version>
+      <!-- This incorporates https://github.com/FasterXML/jackson-module-scala/pull/247 -->
+      <fasterxml.jackson.version>2.7.3-SNAPSHOT</fasterxml.jackson.version>
+    </properties>
+    <dependencyManagement>
+      <dependencies>
+        <dependency>
+          <groupId>org.scala-lang.modules</groupId>
+          <artifactId>scala-parser-combinators_${scala.binary.version}</artifactId>
+          <version>1.0.4</version>
+        </dependency>
+      </dependencies>
+    </dependencyManagement>
  </profile>

  <!--
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index e52baf51ae..ef3cc787e2 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -98,6 +98,10 @@ object SparkBuild extends PomBuild {
      // in the same way as Maven which handles -Dname as -Dname=true before executes build process.
      // see: https://github.com/apache/maven/blob/maven-3.0.4/maven-embedder/src/main/java/org/apache/maven/cli/MavenCli.java#L1082
      System.setProperty("scala-2.10", "true")
+    } else if (System.getProperty("scala-2.11") == "") {
+      System.setProperty("scala-2.11", "true")
+    } else if (System.getProperty("scala-2.12") == "") {
+      System.setProperty("scala-2.12", "true")
    }
    profiles
  }
@@ -759,8 +763,10 @@ object TestSettings {
  private val scalaBinaryVersion =
    if (System.getProperty("scala-2.10") == "true") {
      "2.10"
-    } else {
+    } else if (System.getProperty("scala-2.11") == "true") {
      "2.11"
+    } else {
+      "2.12"
    }

  lazy val settings = Seq (
diff --git a/repl/pom.xml b/repl/pom.xml
index a256ae3b84..8118e73a72 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -49,12 +49,14 @@
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
+    <!--
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <scope>runtime</scope>
    </dependency>
+    -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index 765c92b8d3..a91fd22e93 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -100,6 +100,20 @@
      <artifactId>commons-codec</artifactId>
    </dependency>
  </dependencies>
+  <profiles>
+    <profile>
+      <id>scala-2.12</id>
+      <activation>
+        <property><name>scala-2.12</name></property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.scala-lang.modules</groupId>
+          <artifactId>scala-parser-combinators_${scala.binary.version}</artifactId>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 82710a2a18..85a21d7cdd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -687,6 +687,7 @@ object ScalaReflection extends ScalaReflection {
  def attributesFor[T: TypeTag]: Seq[Attribute] = schemaFor[T] match {
    case Schema(s: StructType, _) => s.toAttributes
+    case _ => throw new Exception("Expected schema!")
  }

  /** Returns a catalyst DataType and its nullability for the given Scala Type using reflection. */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala
index 41128fe389..ff124f7b63 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala
@@ -38,9 +38,13 @@ package object codegen {
    }
  }

+
+  // Commented out because the nsc import doesn't work with 2.12
+
  /**
   * Dumps the bytecode from a class to the screen using javap.
   */
+  /*
  object DumpByteCode {
    import scala.sys.process._
    val dumpDirectory = Utils.createTempDir()
@@ -70,4 +74,5 @@ package object codegen {
      // scalastyle:on println
    }
  }
+  */
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
index 3aa4bf619f..beed5d3e39 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
@@ -177,7 +177,8 @@ object Metadata {
  private def toJsonValue(obj: Any): JValue = {
    obj match {
      case map: Map[_, _] =>
-        val fields = map.toList.map { case (k: String, v) => (k, toJsonValue(v)) }
+        // TODO(josh): exhaustivity
+        val fields = map.toList.map { case (k, v) => (k.toString, toJsonValue(v)) }
        JObject(fields)
      case arr: Array[_] =>
        val values = arr.toList.map(toJsonValue)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java b/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java
index ec9c107b1c..f3bc7f58b5 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java
@@ -53,9 +53,12 @@ public class typed {
   *
   * @since 2.0.0
   */
+  // TODO(josh): re-enable after SAM fix
+  /*
  public static <T> TypedColumn<T, Long> count(MapFunction<T, Object> f) {
    return new TypedCount<T>(f).toColumnJava();
  }
+  */

  /**
   * Sum aggregate function for floating point (double) type.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index c6dcd93bbd..13b206ca84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2325,7 +2325,7 @@ class Dataset[T] private[sql](
   * @since 1.6.0
   */
  def foreachPartition(func: ForeachPartitionFunction[T]): Unit =
-    foreachPartition(it => func.call(it.asJava))
+    foreachPartition((it: Iterator[T]) => func.call(it.asJava))

  /**
   * Returns the first `n` rows in the Dataset.
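
For reference, the `LambdaClosureCleaner` added earlier in this commit works by invoking a lambda's `writeReplace()` method and reading the resulting `java.lang.invoke.SerializedLambda` to find the implementing class and method plus any captured arguments. A stripped-down sketch of that idea, calling the `SerializedLambda` getters directly (the patch itself goes through reflection only to stay compatible with Java 7); the object name here is ours, not Spark's:

```scala
import java.lang.invoke.SerializedLambda

object LambdaInspectSketch {
  def main(args: Array[String]): Unit = {
    val f = (x: Int) => x + 1  // on Scala 2.12 this compiles to a serializable, invokedynamic-backed lambda
    try {
      val writeReplace = f.getClass.getDeclaredMethod("writeReplace")
      writeReplace.setAccessible(true)
      writeReplace.invoke(f) match {
        case sl: SerializedLambda =>
          // Same fields LambdaClosureCleaner extracts before scanning the impl method's bytecode.
          println(s"impl: ${sl.getImplClass}#${sl.getImplMethodName}${sl.getImplMethodSignature}")
          println(s"captured args: ${sl.getCapturedArgCount}")
        case other =>
          println(s"writeReplace() returned ${other.getClass.getName}, not SerializedLambda")
      }
    } catch {
      case _: NoSuchMethodException =>
        println("No writeReplace(): not a Java-style serializable lambda (e.g. on Scala 2.11)")
    }
  }
}
```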
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
index aa789af6f8..0422eaf53e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
@@ -177,6 +177,7 @@ object AggUtils {
      case agg @ AggregateExpression(aggregateFunction, mode, true, _) =>
        aggregateFunction.transformDown(distinctColumnAttributeLookup)
          .asInstanceOf[AggregateFunction]
+      case other => throw new MatchError(other)
    }

    val partialDistinctAggregate: SparkPlan = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
index 1dae5f6964..b3ee8f485c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
@@ -37,7 +37,7 @@ class TypedSumDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Dou
  override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()

  // Java api support
-  def this(f: MapFunction[IN, java.lang.Double]) = this(x => f.call(x).asInstanceOf[Double])
+  def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double])

  def toColumnJava: TypedColumn[IN, java.lang.Double] = {
    toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
@@ -55,7 +55,7 @@ class TypedSumLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long] {
  override def outputEncoder: Encoder[Long] = ExpressionEncoder[Long]()

  // Java api support
-  def this(f: MapFunction[IN, java.lang.Long]) = this(x => f.call(x).asInstanceOf[Long])
+  def this(f: MapFunction[IN, java.lang.Long]) = this((x: IN) => f.call(x).asInstanceOf[Long])

  def toColumnJava: TypedColumn[IN, java.lang.Long] = {
    toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]]
@@ -75,7 +75,8 @@ class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] {
  override def outputEncoder: Encoder[Long] = ExpressionEncoder[Long]()

  // Java api support
-  def this(f: MapFunction[IN, Object]) = this(x => f.call(x))
+  // TODO(josh): uncomment this definition / use shims for 2.12 SAM compatibility
+  // def this(f: MapFunction[IN, Object]) = this((x: IN) => f.call(x))
  def toColumnJava: TypedColumn[IN, java.lang.Long] = {
    toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]]
  }
@@ -94,7 +95,7 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
  override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()

  // Java api support
-  def this(f: MapFunction[IN, java.lang.Double]) = this(x => f.call(x).asInstanceOf[Double])
+  def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double])

  def toColumnJava: TypedColumn[IN, java.lang.Double] = {
    toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
  }
diff --git a/tools/pom.xml b/tools/pom.xml
index 938ba2f6ac..64fdc67712 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -44,7 +44,7 @@
    <dependency>
      <groupId>org.clapper</groupId>
      <artifactId>classutil_${scala.binary.version}</artifactId>
-      <version>1.0.6</version>
+      <version>1.0.8</version>
    </dependency>
  </dependencies>
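
A closing note on the test changes above: rather than a compile-time flag, the `ClosureCleanerSuite2` changes gate 2.12-specific expectations on the runtime Scala version string. A tiny sketch of that check, under the assumption that the version-string prefix test is all that is needed (the object name is illustrative, not part of the patch):

```scala
object ScalaVersionGate {
  // Same predicate used in the ClosureCleanerSuite2 hunks above to skip 2.11-only assertions.
  def onScala212: Boolean =
    scala.util.Properties.versionString.startsWith("version 2.12.")

  def main(args: Array[String]): Unit =
    println(s"Scala ${scala.util.Properties.versionNumberString}; treat as 2.12? $onScala212")
}
```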