author     Josh Rosen <joshrosen@databricks.com>      2016-04-11 10:53:26 -0700
committer  Jakob Odersky <jakob@odersky.com>          2017-04-24 14:09:49 -0700
commit     acf922a355ed21bb6ef56eb94c11802980faff65 (patch)
tree       986e88827d7fed298df09488dedc3f7a22b1d949
parent     875c73b60a6fce9108e54fff5b6eedfafb39a9ce (diff)
Update Josh's changes for minimal REPL with scala 2.12
-rw-r--r--  assembly/pom.xml | 2
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/FutureAction.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskContext.scala | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/util/LambdaClosureCleaner.scala | 104
-rw-r--r--  core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java | 6
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala | 7
-rw-r--r--  core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala | 79
-rwxr-xr-x  dev/change-scala-version.sh | 2
-rw-r--r--  pom.xml | 55
-rw-r--r--  project/SparkBuild.scala | 8
-rw-r--r--  repl/pom.xml | 2
-rw-r--r--  sql/catalyst/pom.xml | 14
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala | 1
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala | 5
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala | 3
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala | 1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala | 9
-rw-r--r--  tools/pom.xml | 2
22 files changed, 291 insertions(+), 56 deletions(-)
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 9d8607d913..b16bcd3528 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -49,11 +49,13 @@
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
+ <!--
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
+ -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index f312fa2b2d..ff9d8dd163 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -161,9 +161,14 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
// Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
// the end of the task. This is necessary to avoid memory leaks in when the downstream operator
// does not fully consume the sorter's output (e.g. sort followed by limit).
- taskContext.addTaskCompletionListener(context -> {
- cleanupResources();
- });
+ taskContext.addJavaFriendlyTaskCompletionListener(
+ new TaskCompletionListener() {
+ @Override
+ public void onTaskCompletion(TaskContext context) {
+ cleanupResources();
+ }
+ }
+ );
}
/**
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index a50600f148..947a50fbcc 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -103,6 +103,16 @@ trait FutureAction[T] extends Future[T] {
*/
def jobIds: Seq[Int]
+ // TODO(josh): We probably need to provide implementations for this.
+ // scalastyle:off
+ def transform[S](
+ f: scala.util.Try[T] => scala.util.Try[S])(
+ implicit executor: scala.concurrent.ExecutionContext): scala.concurrent.Future[S] = ???
+
+ def transformWith[S](
+ f: scala.util.Try[T] => scala.concurrent.Future[S])(
+ implicit executor: scala.concurrent.ExecutionContext): scala.concurrent.Future[S] = ???
+ // scalastyle:on
}
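
Note on the stubs above: Scala 2.12's Future trait adds transform and transformWith as abstract methods, which is why FutureAction stops compiling without them. A minimal sketch of real implementations, assuming the concrete FutureAction can expose some underlying scala.concurrent.Future[T] (the `underlying` accessor and the DelegatingFutureAction trait below are hypothetical, introduced only for illustration):

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

trait DelegatingFutureAction[T] {
  protected def underlying: Future[T]  // hypothetical: whatever future the concrete action wraps

  // Delegate the new 2.12 Future methods instead of leaving them as ???
  def transform[S](f: Try[T] => Try[S])(implicit ec: ExecutionContext): Future[S] =
    underlying.transform(f)

  def transformWith[S](f: Try[T] => Future[S])(implicit ec: ExecutionContext): Future[S] =
    underlying.transformWith(f)
}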
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 0b87cd503d..742828a227 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -103,6 +103,10 @@ abstract class TaskContext extends Serializable {
@deprecated("Local execution was removed, so this always returns false", "2.0.0")
def isRunningLocally(): Boolean
+ // TODO(josh): this used to be an overload of addTaskCompletionListener(), but the overload
+ // became ambiguous under Scala 2.12. For now, I'm renaming this in order to get the code to
+ // compile, but we need to figure out a long-term solution which maintains at least source
+ // compatibility (and probably binary compatibility) for Java callers.
/**
* Adds a (Java friendly) listener to be executed on task completion.
* This will be called in all situations - success, failure, or cancellation. Adding a listener
@@ -112,7 +116,7 @@ abstract class TaskContext extends Serializable {
*
* Exceptions thrown by the listener will result in failure of the task.
*/
- def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext
+ def addJavaFriendlyTaskCompletionListener(listener: TaskCompletionListener): TaskContext
/**
* Adds a listener in the form of a Scala closure to be executed on task completion.
@@ -124,23 +128,27 @@ abstract class TaskContext extends Serializable {
* Exceptions thrown by the listener will result in failure of the task.
*/
def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext = {
- addTaskCompletionListener(new TaskCompletionListener {
+ addJavaFriendlyTaskCompletionListener(new TaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = f(context)
})
}
+ // TODO(josh): this used to be an overload of addTaskFailureListener(), but the overload
+ // became ambiguous under Scala 2.12. For now, I'm renaming this in order to get the code to
+ // compile, but we need to figure out a long-term solution which maintains at least source
+ // compatibility (and probably binary compatibility) for Java callers.
/**
* Adds a listener to be executed on task failure. Adding a listener to an already failed task
* will result in that listener being called immediately.
*/
- def addTaskFailureListener(listener: TaskFailureListener): TaskContext
+ def addJavaFriendlyTaskFailureListener(listener: TaskFailureListener): TaskContext
/**
* Adds a listener to be executed on task failure. Adding a listener to an already failed task
* will result in that listener being called immediately.
*/
def addTaskFailureListener(f: (TaskContext, Throwable) => Unit): TaskContext = {
- addTaskFailureListener(new TaskFailureListener {
+ addJavaFriendlyTaskFailureListener(new TaskFailureListener {
override def onTaskFailure(context: TaskContext, error: Throwable): Unit = f(context, error)
})
}
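
For context on the renames above, a self-contained illustration with toy types (not Spark's API) of the ambiguity the TODOs describe: under 2.12, a function literal can be SAM-converted to the listener interface, so the same literal can satisfy both overloads of a single method name.

trait DoneListener { def onDone(id: Int): Unit }  // stand-in for TaskCompletionListener

class Registry {
  def addListener(l: DoneListener): Registry = this
  def addListener(f: Int => Unit): Registry = addListener(new DoneListener {
    override def onDone(id: Int): Unit = f(id)
  })
}

// Under 2.11 only the Function1 overload applies to a literal; under the 2.12 milestone
// compilers targeted here the literal is also eligible for SAM conversion to DoneListener,
// and calls like the following can be rejected as ambiguous -- hence the rename:
// new Registry().addListener(id => println(id))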
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 489688cb08..fae1bd2a76 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -156,7 +156,8 @@ private[spark] object ClosureCleaner extends Logging {
accessedFields: Map[Class[_], Set[String]]): Unit = {
if (!isClosure(func.getClass)) {
- logWarning("Expected a closure; got " + func.getClass.getName)
+ // TODO: pass the other options as well
+ LambdaClosureCleaner.clean(func)
return
}
@@ -289,7 +290,7 @@ private[spark] object ClosureCleaner extends Logging {
}
}
- private def ensureSerializable(func: AnyRef) {
+ private[util] def ensureSerializable(func: AnyRef) {
try {
if (SparkEnv.get != null) {
SparkEnv.get.closureSerializer.newInstance().serialize(func)
diff --git a/core/src/main/scala/org/apache/spark/util/LambdaClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/LambdaClosureCleaner.scala
new file mode 100644
index 0000000000..96f3fc81fc
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/LambdaClosureCleaner.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.lang.reflect.Method
+
+import org.apache.xbean.asm5.{ClassVisitor, MethodVisitor}
+import org.apache.xbean.asm5.Opcodes._
+
+import org.apache.spark.internal.Logging
+
+
+private[spark] object LambdaClosureCleaner extends Logging {
+
+ private[util] def clean(closure: AnyRef): Unit = {
+ val writeReplaceMethod: Method = try {
+ closure.getClass.getDeclaredMethod("writeReplace")
+ } catch {
+ case e: java.lang.NoSuchMethodException =>
+ logWarning("Expected a Java lambda; got " + closure.getClass.getName)
+ return
+ }
+
+ writeReplaceMethod.setAccessible(true)
+ // Because we still need to support Java 7, we must use reflection here.
+ val serializedLambda: AnyRef = writeReplaceMethod.invoke(closure)
+ if (serializedLambda.getClass.getName != "java.lang.invoke.SerializedLambda") {
+ logWarning("Closure's writeReplace() method " +
+ s"returned ${serializedLambda.getClass.getName}, not SerializedLambda")
+ return
+ }
+
+ val serializedLambdaClass = Utils.classForName("java.lang.invoke.SerializedLambda")
+
+ val implClassName = serializedLambdaClass
+ .getDeclaredMethod("getImplClass").invoke(serializedLambda).asInstanceOf[String]
+ // TODO: we do not want to unconditionally strip this suffix.
+ val implMethodName = {
+ serializedLambdaClass
+ .getDeclaredMethod("getImplMethodName").invoke(serializedLambda).asInstanceOf[String]
+ .stripSuffix("$adapted")
+ }
+ val implMethodSignature = serializedLambdaClass
+ .getDeclaredMethod("getImplMethodSignature").invoke(serializedLambda).asInstanceOf[String]
+ val capturedArgCount = serializedLambdaClass
+ .getDeclaredMethod("getCapturedArgCount").invoke(serializedLambda).asInstanceOf[Int]
+ val capturedArgs = (0 until capturedArgCount).map { argNum: Int =>
+ serializedLambdaClass
+ .getDeclaredMethod("getCapturedArg", java.lang.Integer.TYPE)
+ .invoke(serializedLambda, argNum.asInstanceOf[Object])
+ }.toSeq
+ assert(capturedArgs.size == capturedArgCount)
+ val implClass = Utils.classForName(implClassName.replaceAllLiterally("/", "."))
+
+ // Fail fast if we detect return statements in closures.
+ // TODO: match the impl method based on its type signature as well, not just its name.
+ ClosureCleaner
+ .getClassReader(implClass)
+ .accept(new LambdaReturnStatementFinder(implMethodName), 0)
+
+ // Check serializable TODO: add flag
+ ClosureCleaner.ensureSerializable(closure)
+ capturedArgs.foreach(ClosureCleaner.clean(_))
+
+ // TODO: null fields to render the closure serializable?
+ }
+}
+
+
+private class LambdaReturnStatementFinder(targetMethodName: String) extends ClassVisitor(ASM5) {
+ override def visitMethod(
+ access: Int,
+ name: String,
+ desc: String,
+ sig: String,
+ exceptions: Array[String]): MethodVisitor = {
+ if (name == targetMethodName) {
+ new MethodVisitor(ASM5) {
+ override def visitTypeInsn(op: Int, tp: String) {
+ if (op == NEW && tp.contains("scala/runtime/NonLocalReturnControl")) {
+ throw new ReturnStatementInClosureException
+ }
+ }
+ }
+ } else {
+ new MethodVisitor(ASM5) {}
+ }
+ }
+}
\ No newline at end of file
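
To see what the reflection in LambdaClosureCleaner works with, here is a standalone sketch (Java 8+, not part of the patch): 2.12 compiles function literals to serializable Java lambdas whose writeReplace() returns a java.lang.invoke.SerializedLambda naming the implementing class and method plus the captured values.

object LambdaInspection {
  def main(args: Array[String]): Unit = {
    val f: Int => Int = x => x + 1   // compiled to a serializable lambda under 2.12
    val writeReplace = f.getClass.getDeclaredMethod("writeReplace")
    writeReplace.setAccessible(true)
    val sl = writeReplace.invoke(f).asInstanceOf[java.lang.invoke.SerializedLambda]
    println(sl.getImplClass)         // class hosting the lambda body, e.g. LambdaInspection$
    println(sl.getImplMethodName)    // synthetic method name, often carrying an "$adapted" suffix
    println(sl.getCapturedArgCount)  // number of captured enclosing values (0 here)
  }
}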
diff --git a/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
index 94f5805853..ee8fd81cee 100644
--- a/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
+++ b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
@@ -32,8 +32,8 @@ public class JavaTaskContextCompileCheck {
tc.isCompleted();
tc.isInterrupted();
- tc.addTaskCompletionListener(new JavaTaskCompletionListenerImpl());
- tc.addTaskFailureListener(new JavaTaskFailureListenerImpl());
+ tc.addJavaFriendlyTaskCompletionListener(new JavaTaskCompletionListenerImpl());
+ tc.addJavaFriendlyTaskFailureListener(new JavaTaskFailureListenerImpl());
tc.attemptNumber();
tc.partitionId();
@@ -52,7 +52,7 @@ public class JavaTaskContextCompileCheck {
context.isInterrupted();
context.stageId();
context.partitionId();
- context.addTaskCompletionListener(this);
+ context.addJavaFriendlyTaskCompletionListener(this);
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index 8f576daa77..5f13667c21 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -76,7 +76,8 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
val rdd = new RDD[String](sc, List()) {
override def getPartitions = Array[Partition](StubPartition(0))
override def compute(split: Partition, context: TaskContext) = {
- context.addTaskFailureListener((context, error) => TaskContextSuite.lastError = error)
+ context.addTaskFailureListener(
+ (context, error) => TaskContextSuite.lastError = error)
sys.error("damn error")
}
}
@@ -96,7 +97,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
val context = TaskContext.empty()
val listener = mock(classOf[TaskCompletionListener])
context.addTaskCompletionListener(_ => throw new Exception("blah"))
- context.addTaskCompletionListener(listener)
+ context.addJavaFriendlyTaskCompletionListener(listener)
context.addTaskCompletionListener(_ => throw new Exception("blah"))
intercept[TaskCompletionListenerException] {
@@ -110,7 +111,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
val context = TaskContext.empty()
val listener = mock(classOf[TaskFailureListener])
context.addTaskFailureListener((_, _) => throw new Exception("exception in listener1"))
- context.addTaskFailureListener(listener)
+ context.addJavaFriendlyTaskFailureListener(listener)
context.addTaskFailureListener((_, _) => throw new Exception("exception in listener3"))
val e = intercept[TaskCompletionListenerException] {
diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
index 934385fbca..d7682ab8cc 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
@@ -141,6 +141,9 @@ class ClosureCleanerSuite2 extends SparkFunSuite with BeforeAndAfterAll with Pri
}
test("get inner closure classes") {
+ // Skip on Scala 2.12 for now, since we may not need to determine accessed fields anymore.
+ assume(!scala.util.Properties.versionString.startsWith("version 2.12."))
+
val closure1 = () => 1
val closure2 = () => { () => 1 }
val closure3 = (i: Int) => {
@@ -188,18 +191,20 @@ class ClosureCleanerSuite2 extends SparkFunSuite with BeforeAndAfterAll with Pri
assert(outerClasses1.isEmpty)
assert(outerClasses2.isEmpty)
- // These closures do have $outer pointers because they ultimately reference `this`
- // The first $outer pointer refers to the closure defines this test (see FunSuite#test)
- // The second $outer pointer refers to ClosureCleanerSuite2
- assert(outerClasses3.size === 2)
- assert(outerClasses4.size === 2)
- assert(isClosure(outerClasses3(0)))
- assert(isClosure(outerClasses4(0)))
- assert(outerClasses3(0) === outerClasses4(0)) // part of the same "FunSuite#test" scope
- assert(outerClasses3(1) === this.getClass)
- assert(outerClasses4(1) === this.getClass)
- assert(outerObjects3(1) === this)
- assert(outerObjects4(1) === this)
+ if (!scala.util.Properties.versionString.startsWith("version 2.12.")) {
+ // These closures do have $outer pointers because they ultimately reference `this`
+ // The first $outer pointer refers to the closure defines this test (see FunSuite#test)
+ // The second $outer pointer refers to ClosureCleanerSuite2
+ assert(outerClasses3.size === 2)
+ assert(outerClasses4.size === 2)
+ assert(isClosure(outerClasses3(0)))
+ assert(isClosure(outerClasses4(0)))
+ assert(outerClasses3(0) === outerClasses4(0)) // part of the same "FunSuite#test" scope
+ assert(outerClasses3(1) === this.getClass)
+ assert(outerClasses4(1) === this.getClass)
+ assert(outerObjects3(1) === this)
+ assert(outerObjects4(1) === this)
+ }
}
test("get outer classes and objects with nesting") {
@@ -231,22 +236,24 @@ class ClosureCleanerSuite2 extends SparkFunSuite with BeforeAndAfterAll with Pri
assert(outerClasses3.size === outerObjects3.size)
// Same as above, this closure only references local variables
assert(outerClasses1.isEmpty)
- // This closure references the "test2" scope because it needs to find the method `y`
- // Scope hierarchy: "test2" < "FunSuite#test" < ClosureCleanerSuite2
- assert(outerClasses2.size === 3)
- // This closure references the "test2" scope because it needs to find the `localValue`
- // defined outside of this scope
- assert(outerClasses3.size === 3)
- assert(isClosure(outerClasses2(0)))
- assert(isClosure(outerClasses3(0)))
- assert(isClosure(outerClasses2(1)))
- assert(isClosure(outerClasses3(1)))
- assert(outerClasses2(0) === outerClasses3(0)) // part of the same "test2" scope
- assert(outerClasses2(1) === outerClasses3(1)) // part of the same "FunSuite#test" scope
- assert(outerClasses2(2) === this.getClass)
- assert(outerClasses3(2) === this.getClass)
- assert(outerObjects2(2) === this)
- assert(outerObjects3(2) === this)
+ if (!scala.util.Properties.versionString.startsWith("version 2.12.")) {
+ // This closure references the "test2" scope because it needs to find the method `y`
+ // Scope hierarchy: "test2" < "FunSuite#test" < ClosureCleanerSuite2
+ assert(outerClasses2.size === 3)
+ // This closure references the "test2" scope because it needs to find the `localValue`
+ // defined outside of this scope
+ assert(outerClasses3.size === 3)
+ assert(isClosure(outerClasses2(0)))
+ assert(isClosure(outerClasses3(0)))
+ assert(isClosure(outerClasses2(1)))
+ assert(isClosure(outerClasses3(1)))
+ assert(outerClasses2(0) === outerClasses3(0)) // part of the same "test2" scope
+ assert(outerClasses2(1) === outerClasses3(1)) // part of the same "FunSuite#test" scope
+ assert(outerClasses2(2) === this.getClass)
+ assert(outerClasses3(2) === this.getClass)
+ assert(outerObjects2(2) === this)
+ assert(outerObjects3(2) === this)
+ }
}
test1()
@@ -254,6 +261,9 @@ class ClosureCleanerSuite2 extends SparkFunSuite with BeforeAndAfterAll with Pri
}
test("find accessed fields") {
+ // Skip on Scala 2.12 for now, since we may not need to determine accessed fields anymore.
+ assume(!scala.util.Properties.versionString.startsWith("version 2.12."))
+
val localValue = someSerializableValue
val closure1 = () => 1
val closure2 = () => localValue
@@ -292,6 +302,9 @@ class ClosureCleanerSuite2 extends SparkFunSuite with BeforeAndAfterAll with Pri
}
test("find accessed fields with nesting") {
+ // Skip on Scala 2.12 for now, since we may not need to determine accessed fields anymore.
+ assume(!scala.util.Properties.versionString.startsWith("version 2.12."))
+
val localValue = someSerializableValue
val test1 = () => {
@@ -538,13 +551,19 @@ class ClosureCleanerSuite2 extends SparkFunSuite with BeforeAndAfterAll with Pri
// which is itself not serializable because it has a pointer to the ClosureCleanerSuite2.
// If we do not clean transitively, we will not null out this indirect reference.
verifyCleaning(
- inner2, serializableBefore = false, serializableAfter = false, transitive = false)
+ inner2,
+ serializableBefore = scala.util.Properties.versionString.startsWith("version 2.12."),
+ serializableAfter = scala.util.Properties.versionString.startsWith("version 2.12."),
+ transitive = false)
// If we clean transitively, we will find that method `a` does not actually reference the
// outer closure's parent (i.e. the ClosureCleanerSuite), so we can additionally null out
// the outer closure's parent pointer. This will make `inner2` serializable.
verifyCleaning(
- inner2, serializableBefore = false, serializableAfter = true, transitive = true)
+ inner2,
+ serializableBefore = scala.util.Properties.versionString.startsWith("version 2.12."),
+ serializableAfter = true,
+ transitive = true)
}
// Same as above, but with more levels of nesting
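
The suite now repeats the same version check inline in several tests; a tiny shared helper (hypothetical, not in this patch) would express the gate once, relying only on scala.util.Properties.versionString, which reports strings such as "version 2.12.0-M4":

object ScalaVersionGate {
  // True on any 2.12.x compiler, including milestone builds like 2.12.0-M4.
  val isScala212: Boolean =
    scala.util.Properties.versionString.startsWith("version 2.12.")
}

// e.g. in a test body:  assume(!ScalaVersionGate.isScala212)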
diff --git a/dev/change-scala-version.sh b/dev/change-scala-version.sh
index 6c4077c584..6437e879de 100755
--- a/dev/change-scala-version.sh
+++ b/dev/change-scala-version.sh
@@ -19,7 +19,7 @@
set -e
-VALID_VERSIONS=( 2.10 2.11 2.12 )
+VALID_VERSIONS=( 2.10 2.11 2.12.0-M4 )
usage() {
echo "Usage: $(basename $0) [-h|--help] <version>
diff --git a/pom.xml b/pom.xml
index c1174593c1..a6ea2e53ec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -93,8 +93,11 @@
<module>common/tags</module>
<module>core</module>
<module>graphx</module>
- <module>mllib</module>
- <module>mllib-local</module>
+ <!-- TODO 2.12: the ML and Kafka modules would typically be defined here, but because their
+ dependencies are not 2.12-ready we need to conditionally enable those modules via the
+ 2.11 and 2.10 build profiles -->
+ <!-- <module>mllib</module> -->
+ <!-- <module>mllib-local</module> -->
<module>tools</module>
<module>streaming</module>
<module>sql/catalyst</module>
@@ -2586,6 +2589,22 @@
</plugin>
</plugins>
</build>
+ <!-- TODO 2.12: remove these from here and always enable them in root POM once these modules'
+ dependencies are 2.12-ready -->
+ <modules>
+ <module>mllib</module>
+ <module>external/kafka</module>
+ <module>external/kafka-assembly</module>
+ </modules>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>${jline.groupid}</groupId>
+ <artifactId>jline</artifactId>
+ <version>${jline.version}</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
</profile>
<profile>
@@ -2634,6 +2653,38 @@
</plugin>
</plugins>
</build>
+ <!-- TODO 2.12: remove these from here and always enable them in root POM once these modules'
+ dependencies are 2.12-ready -->
+ <modules>
+ <module>mllib</module>
+ <module>mllib-local</module>
+ <module>external/kafka</module>
+ <module>external/kafka-assembly</module>
+ </modules>
+ </profile>
+
+ <profile>
+ <id>scala-2.12</id>
+ <activation>
+ <property><name>scala-2.12</name></property>
+ </activation>
+ <properties>
+ <scala.version>2.12.0</scala.version>
+ <scala.binary.version>2.12.0</scala.binary.version>
+ <!-- This corresponds to https://github.com/twitter/chill/pull/253 -->
+ <chill.version>0.8.1-SNAPSHOT</chill.version>
+ <!-- This incorporates https://github.com/FasterXML/jackson-module-scala/pull/247 -->
+ <fasterxml.jackson.version>2.7.3-SNAPSHOT</fasterxml.jackson.version>
+ </properties>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.scala-lang.modules</groupId>
+ <artifactId>scala-parser-combinators_${scala.binary.version}</artifactId>
+ <version>1.0.4</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
</profile>
<!--
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index e52baf51ae..ef3cc787e2 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -98,6 +98,10 @@ object SparkBuild extends PomBuild {
// in the same way as Maven which handles -Dname as -Dname=true before executes build process.
// see: https://github.com/apache/maven/blob/maven-3.0.4/maven-embedder/src/main/java/org/apache/maven/cli/MavenCli.java#L1082
System.setProperty("scala-2.10", "true")
+ } else if (System.getProperty("scala-2.11") == "") {
+ System.setProperty("scala-2.11", "true")
+ } else if (System.getProperty("scala-2.12") == "") {
+ System.setProperty("scala-2.12", "true")
}
profiles
}
@@ -759,8 +763,10 @@ object TestSettings {
private val scalaBinaryVersion =
if (System.getProperty("scala-2.10") == "true") {
"2.10"
- } else {
+ } else if (System.getProperty("scala-2.11") == "true") {
"2.11"
+ } else {
+ "2.12"
}
lazy val settings = Seq (
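
Restating the two SparkBuild hunks above as one standalone sketch (names are illustrative, not SparkBuild's): a bare -Dscala-2.1x flag surfaces as an empty property value and is normalized to "true", mirroring Maven's handling, and the test settings then derive the Scala binary version from whichever flag is set, defaulting to 2.12.

// Hedged sketch of the flag handling, not the build definition itself.
def normalizeScalaFlag(flag: String): Unit =
  if (System.getProperty(flag) == "") System.setProperty(flag, "true")

def scalaBinaryVersionFromFlags(): String =
  if (System.getProperty("scala-2.10") == "true") "2.10"
  else if (System.getProperty("scala-2.11") == "true") "2.11"
  else "2.12"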
diff --git a/repl/pom.xml b/repl/pom.xml
index a256ae3b84..8118e73a72 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -49,12 +49,14 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <!--
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
+ -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index 765c92b8d3..a91fd22e93 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -100,6 +100,20 @@
<artifactId>commons-codec</artifactId>
</dependency>
</dependencies>
+ <profiles>
+ <profile>
+ <id>scala-2.12</id>
+ <activation>
+ <property><name>scala-2.12</name></property>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.scala-lang.modules</groupId>
+ <artifactId>scala-parser-combinators_${scala.binary.version}</artifactId>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 82710a2a18..85a21d7cdd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -687,6 +687,7 @@ object ScalaReflection extends ScalaReflection {
def attributesFor[T: TypeTag]: Seq[Attribute] = schemaFor[T] match {
case Schema(s: StructType, _) =>
s.toAttributes
+ case _ => throw new Exception("Expected schema!")
}
/** Returns a catalyst DataType and its nullability for the given Scala Type using reflection. */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala
index 41128fe389..ff124f7b63 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala
@@ -38,9 +38,13 @@ package object codegen {
}
}
+
+ // Commented out because the nsc import doesn't work with 2.12
+
/**
* Dumps the bytecode from a class to the screen using javap.
*/
+ /*
object DumpByteCode {
import scala.sys.process._
val dumpDirectory = Utils.createTempDir()
@@ -70,4 +74,5 @@ package object codegen {
// scalastyle:on println
}
}
+ */
}
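
Since DumpByteCode is disabled above (the comment attributes the breakage to an nsc import), here is a hedged, much narrower sketch of the javap step alone; it is not the original implementation, assumes the generated class bytes are already on disk, and assumes javap is on the PATH.

import java.io.File
import scala.sys.process._

object JavapDump {
  // Returns the disassembly of a compiled .class file via an external javap process.
  def dump(classFile: File): String =
    Seq("javap", "-c", "-p", classFile.getAbsolutePath).!!
}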
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
index 3aa4bf619f..beed5d3e39 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
@@ -177,7 +177,8 @@ object Metadata {
private def toJsonValue(obj: Any): JValue = {
obj match {
case map: Map[_, _] =>
- val fields = map.toList.map { case (k: String, v) => (k, toJsonValue(v)) }
+ // TODO(josh): exhaustivity
+ val fields = map.toList.map { case (k, v) => (k.toString, toJsonValue(v)) }
JObject(fields)
case arr: Array[_] =>
val values = arr.toList.map(toJsonValue)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java b/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java
index ec9c107b1c..f3bc7f58b5 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java
@@ -53,9 +53,12 @@ public class typed {
*
* @since 2.0.0
*/
+ // TODO(josh): re-enable after SAM fix
+ /*
public static <T> TypedColumn<T, Long> count(MapFunction<T, Object> f) {
return new TypedCount<T>(f).toColumnJava();
}
+ */
/**
* Sum aggregate function for floating point (double) type.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index c6dcd93bbd..13b206ca84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2325,7 +2325,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def foreachPartition(func: ForeachPartitionFunction[T]): Unit =
- foreachPartition(it => func.call(it.asJava))
+ foreachPartition((it: Iterator[T]) => func.call(it.asJava))
/**
* Returns the first `n` rows in the Dataset.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
index aa789af6f8..0422eaf53e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
@@ -177,6 +177,7 @@ object AggUtils {
case agg @ AggregateExpression(aggregateFunction, mode, true, _) =>
aggregateFunction.transformDown(distinctColumnAttributeLookup)
.asInstanceOf[AggregateFunction]
+ case other => throw new MatchError(other)
}
val partialDistinctAggregate: SparkPlan = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
index 1dae5f6964..b3ee8f485c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
@@ -37,7 +37,7 @@ class TypedSumDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Dou
override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
// Java api support
- def this(f: MapFunction[IN, java.lang.Double]) = this(x => f.call(x).asInstanceOf[Double])
+ def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double])
def toColumnJava: TypedColumn[IN, java.lang.Double] = {
toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
@@ -55,7 +55,7 @@ class TypedSumLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long] {
override def outputEncoder: Encoder[Long] = ExpressionEncoder[Long]()
// Java api support
- def this(f: MapFunction[IN, java.lang.Long]) = this(x => f.call(x).asInstanceOf[Long])
+ def this(f: MapFunction[IN, java.lang.Long]) = this((x: IN) => f.call(x).asInstanceOf[Long])
def toColumnJava: TypedColumn[IN, java.lang.Long] = {
toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]]
@@ -75,7 +75,8 @@ class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] {
override def outputEncoder: Encoder[Long] = ExpressionEncoder[Long]()
// Java api support
- def this(f: MapFunction[IN, Object]) = this(x => f.call(x))
+ // TODO(josh): uncomment this definition / use shims for 2.12 SAM compatibility
+ // def this(f: MapFunction[IN, Object]) = this((x: IN) => f.call(x))
def toColumnJava: TypedColumn[IN, java.lang.Long] = {
toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]]
}
@@ -94,7 +95,7 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
// Java api support
- def this(f: MapFunction[IN, java.lang.Double]) = this(x => f.call(x).asInstanceOf[Double])
+ def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double])
def toColumnJava: TypedColumn[IN, java.lang.Double] = {
toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
}
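
The explicit (x: IN) annotations above, and the constructor left commented out in TypedCount, come down to the same 2.12 issue: when a function literal is passed where overloaded alternatives (a Scala function and a Java SAM such as MapFunction) are both in play, the compiler may have no single expected type and cannot infer the literal's parameter type. A toy illustration with made-up types (not Spark's API):

trait JDoubleFunction[T] { def call(t: T): java.lang.Double }  // stand-in for MapFunction

object OverloadDemo {
  def sum[IN](f: IN => Double): Unit = ()
  def sum[IN](f: JDoubleFunction[IN]): Unit = ()

  def demo(): Unit = {
    // sum[String](x => x.length.toDouble)          // may fail to infer: "missing parameter type"
    sum[String]((x: String) => x.length.toDouble)   // annotated form, matching the patch
  }
}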
diff --git a/tools/pom.xml b/tools/pom.xml
index 938ba2f6ac..64fdc67712 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -44,7 +44,7 @@
<dependency>
<groupId>org.clapper</groupId>
<artifactId>classutil_${scala.binary.version}</artifactId>
- <version>1.0.6</version>
+ <version>1.0.8</version>
</dependency>
</dependencies>