diff options
-rw-r--r-- | core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java | 2 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/FutureAction.scala | 10 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/TaskContext.scala | 16 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/TaskContextImpl.scala | 5 | ||||
-rw-r--r-- | core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java | 6 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala | 7 | ||||
-rwxr-xr-x | dev/change-scala-version.sh | 8 | ||||
-rw-r--r-- | pom.xml | 72 | ||||
-rw-r--r-- | project/SparkBuild.scala | 32 | ||||
-rw-r--r-- | sql/core/pom.xml | 2 | ||||
-rw-r--r-- | tools/pom.xml | 2 |
11 files changed, 120 insertions, 42 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index ef79b49083..dc9a8db9c5 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -142,7 +142,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer { // Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at // the end of the task. This is necessary to avoid memory leaks in when the downstream operator // does not fully consume the sorter's output (e.g. sort followed by limit). - taskContext.addTaskCompletionListener( + taskContext.addJavaFriendlyTaskCompletionListener( new TaskCompletionListener() { @Override public void onTaskCompletion(TaskContext context) { diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index 339266a5d4..882d2b21cf 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -100,6 +100,16 @@ trait FutureAction[T] extends Future[T] { */ def jobIds: Seq[Int] + // TODO(josh): We probably need to provide implementations for this. + // scalastyle:off + def transform[S]( + f: scala.util.Try[T] => scala.util.Try[S])( + implicit executor: scala.concurrent.ExecutionContext): scala.concurrent.Future[S] = ??? + + def transformWith[S]( + f: scala.util.Try[T] => scala.concurrent.Future[S])( + implicit executor: scala.concurrent.ExecutionContext): scala.concurrent.Future[S] = ??? + // scalastyle:on } diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index 757c1b5116..5b2fca4b2d 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -103,6 +103,10 @@ abstract class TaskContext extends Serializable { @deprecated("Local execution was removed, so this always returns false", "2.0.0") def isRunningLocally(): Boolean + // TODO(josh): this used to be an overload of addTaskCompletionListener(), but the overload + // became ambiguous under Scala 2.12. For now, I'm renaming this in order to get the code to + // compile, but we need to figure out a long-term solution which maintains at least source + // compatibility (and probably binary compatibility) for Java callers. /** * Adds a (Java friendly) listener to be executed on task completion. * This will be called in all situation - success, failure, or cancellation. @@ -110,7 +114,7 @@ abstract class TaskContext extends Serializable { * * Exceptions thrown by the listener will result in failure of the task. */ - def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext + def addJavaFriendlyTaskCompletionListener(listener: TaskCompletionListener): TaskContext /** * Adds a listener in the form of a Scala closure to be executed on task completion. @@ -120,23 +124,27 @@ abstract class TaskContext extends Serializable { * Exceptions thrown by the listener will result in failure of the task. */ def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext = { - addTaskCompletionListener(new TaskCompletionListener { + addJavaFriendlyTaskCompletionListener(new TaskCompletionListener { override def onTaskCompletion(context: TaskContext): Unit = f(context) }) } + // TODO(josh): this used to be an overload of addTaskFailureListener(), but the overload + // became ambiguous under Scala 2.12. For now, I'm renaming this in order to get the code to + // compile, but we need to figure out a long-term solution which maintains at least source + // compatibility (and probably binary compatibility) for Java callers. /** * Adds a listener to be executed on task failure. * Operations defined here must be idempotent, as `onTaskFailure` can be called multiple times. */ - def addTaskFailureListener(listener: TaskFailureListener): TaskContext + def addJavaFriendlyTaskFailureListener(listener: TaskFailureListener): TaskContext /** * Adds a listener to be executed on task failure. * Operations defined here must be idempotent, as `onTaskFailure` can be called multiple times. */ def addTaskFailureListener(f: (TaskContext, Throwable) => Unit): TaskContext = { - addTaskFailureListener(new TaskFailureListener { + addJavaFriendlyTaskFailureListener(new TaskFailureListener { override def onTaskFailure(context: TaskContext, error: Throwable): Unit = f(context, error) }) } diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala index fa0b2d3d28..8b407f9771 100644 --- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala @@ -60,12 +60,13 @@ private[spark] class TaskContextImpl( // Whether the task has failed. @volatile private var failed: Boolean = false - override def addTaskCompletionListener(listener: TaskCompletionListener): this.type = { + override def addJavaFriendlyTaskCompletionListener( + listener: TaskCompletionListener): this.type = { onCompleteCallbacks += listener this } - override def addTaskFailureListener(listener: TaskFailureListener): this.type = { + override def addJavaFriendlyTaskFailureListener(listener: TaskFailureListener): this.type = { onFailureCallbacks += listener this } diff --git a/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java index 94f5805853..ee8fd81cee 100644 --- a/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java +++ b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java @@ -32,8 +32,8 @@ public class JavaTaskContextCompileCheck { tc.isCompleted(); tc.isInterrupted(); - tc.addTaskCompletionListener(new JavaTaskCompletionListenerImpl()); - tc.addTaskFailureListener(new JavaTaskFailureListenerImpl()); + tc.addJavaFriendlyTaskCompletionListener(new JavaTaskCompletionListenerImpl()); + tc.addJavaFriendlyTaskFailureListener(new JavaTaskFailureListenerImpl()); tc.attemptNumber(); tc.partitionId(); @@ -52,7 +52,7 @@ public class JavaTaskContextCompileCheck { context.isInterrupted(); context.stageId(); context.partitionId(); - context.addTaskCompletionListener(this); + context.addJavaFriendlyTaskCompletionListener(this); } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala index 86911d2211..5ca0c6419d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala @@ -75,7 +75,8 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark val rdd = new RDD[String](sc, List()) { override def getPartitions = Array[Partition](StubPartition(0)) override def compute(split: Partition, context: TaskContext) = { - context.addTaskFailureListener((context, error) => TaskContextSuite.lastError = error) + context.addTaskFailureListener( + (context, error) => TaskContextSuite.lastError = error) sys.error("damn error") } } @@ -94,7 +95,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark val context = TaskContext.empty() val listener = mock(classOf[TaskCompletionListener]) context.addTaskCompletionListener(_ => throw new Exception("blah")) - context.addTaskCompletionListener(listener) + context.addJavaFriendlyTaskCompletionListener(listener) context.addTaskCompletionListener(_ => throw new Exception("blah")) intercept[TaskCompletionListenerException] { @@ -108,7 +109,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark val context = TaskContext.empty() val listener = mock(classOf[TaskFailureListener]) context.addTaskFailureListener((_, _) => throw new Exception("exception in listener1")) - context.addTaskFailureListener(listener) + context.addJavaFriendlyTaskFailureListener(listener) context.addTaskFailureListener((_, _) => throw new Exception("exception in listener3")) val e = intercept[TaskCompletionListenerException] { diff --git a/dev/change-scala-version.sh b/dev/change-scala-version.sh index d7975dfb64..6c4077c584 100755 --- a/dev/change-scala-version.sh +++ b/dev/change-scala-version.sh @@ -19,7 +19,7 @@ set -e -VALID_VERSIONS=( 2.10 2.11 ) +VALID_VERSIONS=( 2.10 2.11 2.12 ) usage() { echo "Usage: $(basename $0) [-h|--help] <version> @@ -44,11 +44,7 @@ check_scala_version() { check_scala_version "$TO_VERSION" -if [ $TO_VERSION = "2.11" ]; then - FROM_VERSION="2.10" -else - FROM_VERSION="2.11" -fi +FROM_VERSION=$(grep -m1 -o '<scala.binary.version>.*</scala.binary.version>' pom.xml | grep -o -e '2[^<]\+') sed_i() { sed -e "$1" "$2" > "$2.tmp" && mv "$2.tmp" "$2" @@ -93,8 +93,11 @@ <module>common/tags</module> <module>core</module> <module>graphx</module> - <module>mllib</module> - <module>mllib-local</module> + <!-- Note: the ML and Kafka modules would typically be defined here, but because their + dependencies are not 2.12-ready we need to conditionally enable those modules via the + 2.11 and 2.10 build profiles --> + <!-- <module>mllib</module> --> + <!-- <module>mllib-local</module> --> <module>tools</module> <module>streaming</module> <module>sql/catalyst</module> @@ -108,8 +111,8 @@ <module>examples</module> <module>repl</module> <module>launcher</module> - <module>external/kafka</module> - <module>external/kafka-assembly</module> + <!-- <module>external/kafka</module> --> + <!-- <module>external/kafka-assembly</module> --> </modules> <properties> @@ -164,6 +167,7 @@ <jline.groupid>org.scala-lang</jline.groupid> <codehaus.jackson.version>1.9.13</codehaus.jackson.version> <fasterxml.jackson.version>2.5.3</fasterxml.jackson.version> + <fasterxml.jackson.scala.version>2.5.3</fasterxml.jackson.scala.version> <snappy.version>1.1.2.4</snappy.version> <netlib.java.version>1.1.2</netlib.java.version> <calcite.version>1.2.0-incubating</calcite.version> @@ -180,6 +184,7 @@ <jsr305.version>1.3.9</jsr305.version> <libthrift.version>0.9.2</libthrift.version> <antlr4.version>4.5.2-1</antlr4.version> + <json4s.version>3.2.2</json4s.version> <test.java.home>${java.home}</test.java.home> <test.exclude.tags></test.exclude.tags> @@ -564,7 +569,7 @@ <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-scala_${scala.binary.version}</artifactId> - <version>${fasterxml.jackson.version}</version> + <version>${fasterxml.jackson.scala.version}</version> <exclusions> <exclusion> <groupId>com.google.guava</groupId> @@ -573,6 +578,11 @@ </exclusions> </dependency> <dependency> + <groupId>org.json4s</groupId> + <artifactId>json4s-jackson_${scala.binary.version}</artifactId> + <version>${json4s.version}</version> + </dependency> + <dependency> <groupId>com.sun.jersey</groupId> <artifactId>jersey-server</artifactId> <version>${jersey.version}</version> @@ -2478,6 +2488,14 @@ <jline.version>${scala.version}</jline.version> <jline.groupid>org.scala-lang</jline.groupid> </properties> + <!-- TODO: remove these from here and always enable them in root POM once these modules' + dependencies are 2.12-ready --> + <modules> + <module>mllib</module> + <module>mllib-local</module> + <module>external/kafka</module> + <module>external/kafka-assembly</module> + </modules> <dependencyManagement> <dependencies> <dependency> @@ -2490,23 +2508,53 @@ </profile> <profile> - <id>test-java-home</id> + <id>scala-2.11</id> + <!-- Note: this profile has no activation because it only serves to document the build's + defaults, which use Scala 2.11. The properties here should be kept in sync with the + build's default settings, which are defined at the top of this root POM. --> + <!-- UPDATE: for now, we need to alwass explicitl pass -Dscala-2.11 for 2.11 builds --> <activation> - <property><name>env.JAVA_HOME</name></property> + <property><name>scala-2.11</name></property> </activation> <properties> - <test.java.home>${env.JAVA_HOME}</test.java.home> + <scala.version>2.11.7</scala.version> + <scala.binary.version>2.11</scala.binary.version> + </properties> + <!-- TODO: remove these from here and always enable them in root POM once these modules' + dependencies are 2.12-ready --> + <modules> + <module>mllib</module> + <module>mllib-local</module> + <module>external/kafka</module> + <module>external/kafka-assembly</module> + </modules> + </profile> + + <profile> + <id>scala-2.12</id> + <activation> + <property><name>scala-2.12</name></property> + </activation> + <properties> + <scala.version>2.12.0-M4</scala.version> + <scala.binary.version>2.12.0-M4</scala.binary.version> + <!-- This corresponds to https://github.com/twitter/chill/pull/253 --> + <chill.version>0.8.1-SNAPSHOT</chill.version> + <!-- This corresponds to https://github.com/FasterXML/jackson-module-scala/pull/247 --> + <fasterxml.jackson.scala.version>2.7.3-SNAPSHOT</fasterxml.jackson.scala.version> + <fasterxml.jackson.version>2.7.0</fasterxml.jackson.version> + <!-- This corresponds to https://github.com/JoshRosen/json4s/tree/scala-2.12.0-m4 --> + <json4s.version>3.4.0-SNAPSHOT</json4s.version> </properties> </profile> <profile> - <id>scala-2.11</id> + <id>test-java-home</id> <activation> - <property><name>!scala-2.10</name></property> + <property><name>env.JAVA_HOME</name></property> </activation> <properties> - <scala.version>2.11.8</scala.version> - <scala.binary.version>2.11</scala.binary.version> + <test.java.home>${env.JAVA_HOME}</test.java.home> </properties> </profile> diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index a58dd7e7f1..cb85c63382 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -127,6 +127,8 @@ object SparkBuild extends PomBuild { // in the same way as Maven which handles -Dname as -Dname=true before executes build process. // see: https://github.com/apache/maven/blob/maven-3.0.4/maven-embedder/src/main/java/org/apache/maven/cli/MavenCli.java#L1082 System.setProperty("scala-2.10", "true") + } else if (System.getProperty("scala-2.12") == "") { + System.setProperty("scala-2.12", "true") } profiles } @@ -142,10 +144,17 @@ object SparkBuild extends PomBuild { lazy val MavenCompile = config("m2r") extend(Compile) lazy val publishLocalBoth = TaskKey[Unit]("publish-local", "publish local for m2 and ivy") - lazy val sparkGenjavadocSettings: Seq[sbt.Def.Setting[_]] = Seq( - libraryDependencies += compilerPlugin( - "org.spark-project" %% "genjavadoc-plugin" % unidocGenjavadocVersion.value cross CrossVersion.full), - scalacOptions <+= target.map(t => "-P:genjavadoc:out=" + (t / "java"))) + lazy val sparkGenjavadocSettings: Seq[sbt.Def.Setting[_]] = { + if (System.getProperty("scala-2.12") != null) { + // TODO: re-enable once our custom genjavadoc is published for 2.12 (see SPARK-14511). + Seq.empty + } else { + Seq( + libraryDependencies += compilerPlugin( + "org.spark-project" %% "genjavadoc-plugin" % unidocGenjavadocVersion.value cross CrossVersion.full), + scalacOptions <+= target.map(t => "-P:genjavadoc:out=" + (t / "java"))) + } + } lazy val sharedSettings = sparkGenjavadocSettings ++ Seq ( exportJars in Compile := true, @@ -251,11 +260,16 @@ object SparkBuild extends PomBuild { /* Enable tests settings for all projects except examples, assembly and tools */ (allProjects ++ optionallyEnabledProjects).foreach(enable(TestSettings.settings)) - val mimaProjects = allProjects.filterNot { x => - Seq( - spark, hive, hiveThriftServer, catalyst, repl, networkCommon, networkShuffle, networkYarn, - unsafe, testTags, sketch, mllibLocal - ).contains(x) + // TODO: remove this conditional after Spark publishes with 2.12 support: + lazy val mimaProjects = if (System.getProperty("scala-2.12") == null) { + allProjects.filterNot { x => + Seq( + spark, hive, hiveThriftServer, catalyst, repl, networkCommon, networkShuffle, networkYarn, + unsafe, testTags, sketch, mllibLocal + ).contains(x) + } + } else { + Seq.empty } mimaProjects.foreach { x => diff --git a/sql/core/pom.xml b/sql/core/pom.xml index 8b1017042c..e1071ebfb5 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -44,7 +44,7 @@ </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-sketch_2.11</artifactId> + <artifactId>spark-sketch_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> <dependency> diff --git a/tools/pom.xml b/tools/pom.xml index 9bb20e1381..9b94b6d823 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -45,7 +45,7 @@ <dependency> <groupId>org.clapper</groupId> <artifactId>classutil_${scala.binary.version}</artifactId> - <version>1.0.6</version> + <version>1.0.8</version> </dependency> </dependencies> |