 core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java |  2
 core/src/main/scala/org/apache/spark/FutureAction.scala                                   | 10
 core/src/main/scala/org/apache/spark/TaskContext.scala                                    | 16
 core/src/main/scala/org/apache/spark/TaskContextImpl.scala                                |  5
 core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java                 |  6
 core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala                     |  7
 dev/change-scala-version.sh                                                               |  8
 pom.xml                                                                                   | 72
 project/SparkBuild.scala                                                                  | 32
 sql/core/pom.xml                                                                          |  2
 tools/pom.xml                                                                             |  2
 11 files changed, 120 insertions(+), 42 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index ef79b49083..dc9a8db9c5 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -142,7 +142,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
// Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
// the end of the task. This is necessary to avoid memory leaks when the downstream operator
// does not fully consume the sorter's output (e.g. sort followed by limit).
- taskContext.addTaskCompletionListener(
+ taskContext.addJavaFriendlyTaskCompletionListener(
new TaskCompletionListener() {
@Override
public void onTaskCompletion(TaskContext context) {
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 339266a5d4..882d2b21cf 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -100,6 +100,16 @@ trait FutureAction[T] extends Future[T] {
*/
def jobIds: Seq[Int]
+ // TODO(josh): We probably need to provide implementations for these.
+ // scalastyle:off
+ def transform[S](
+ f: scala.util.Try[T] => scala.util.Try[S])(
+ implicit executor: scala.concurrent.ExecutionContext): scala.concurrent.Future[S] = ???
+
+ def transformWith[S](
+ f: scala.util.Try[T] => scala.concurrent.Future[S])(
+ implicit executor: scala.concurrent.ExecutionContext): scala.concurrent.Future[S] = ???
+ // scalastyle:on
}
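
Context for the stubs above: Scala 2.12 added transform and transformWith to scala.concurrent.Future, so FutureAction (which extends Future[T]) must now declare them to compile. As the TODO notes, real implementations are still needed. A minimal sketch, not part of this patch, of how they could be written in terms of onComplete (inherited from Future[T]):

    import scala.concurrent.{ExecutionContext, Future, Promise}
    import scala.util.Try
    import scala.util.control.NonFatal

    def transform[S](f: Try[T] => Try[S])(
        implicit executor: ExecutionContext): Future[S] = {
      val p = Promise[S]()
      // Apply f to the completed result; Try(...).flatten also captures
      // any exception thrown by f itself.
      onComplete { result => p.complete(Try(f(result)).flatten) }
      p.future
    }

    def transformWith[S](f: Try[T] => Future[S])(
        implicit executor: ExecutionContext): Future[S] = {
      val p = Promise[S]()
      onComplete { result =>
        try p.completeWith(f(result))
        catch { case NonFatal(t) => p.failure(t) }
      }
      p.future
    }
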
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 757c1b5116..5b2fca4b2d 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -103,6 +103,10 @@ abstract class TaskContext extends Serializable {
@deprecated("Local execution was removed, so this always returns false", "2.0.0")
def isRunningLocally(): Boolean
+ // TODO(josh): this used to be an overload of addTaskCompletionListener(), but the overload
+ // became ambiguous under Scala 2.12. For now, I'm renaming this in order to get the code to
+ // compile, but we need to figure out a long-term solution which maintains at least source
+ // compatibility (and probably binary compatibility) for Java callers.
/**
* Adds a (Java friendly) listener to be executed on task completion.
* This will be called in all situations - success, failure, or cancellation.
@@ -110,7 +114,7 @@ abstract class TaskContext extends Serializable {
*
* Exceptions thrown by the listener will result in failure of the task.
*/
- def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext
+ def addJavaFriendlyTaskCompletionListener(listener: TaskCompletionListener): TaskContext
/**
* Adds a listener in the form of a Scala closure to be executed on task completion.
@@ -120,23 +124,27 @@ abstract class TaskContext extends Serializable {
* Exceptions thrown by the listener will result in failure of the task.
*/
def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext = {
- addTaskCompletionListener(new TaskCompletionListener {
+ addJavaFriendlyTaskCompletionListener(new TaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = f(context)
})
}
+ // TODO(josh): this used to be an overload of addTaskFailureListener(), but the overload
+ // became ambiguous under Scala 2.12. For now, I'm renaming this in order to get the code to
+ // compile, but we need to figure out a long-term solution which maintains at least source
+ // compatibility (and probably binary compatibility) for Java callers.
/**
* Adds a listener to be executed on task failure.
* Operations defined here must be idempotent, as `onTaskFailure` can be called multiple times.
*/
- def addTaskFailureListener(listener: TaskFailureListener): TaskContext
+ def addJavaFriendlyTaskFailureListener(listener: TaskFailureListener): TaskContext
/**
* Adds a listener to be executed on task failure.
* Operations defined here must be idempotent, as `onTaskFailure` can be called multiple times.
*/
def addTaskFailureListener(f: (TaskContext, Throwable) => Unit): TaskContext = {
- addTaskFailureListener(new TaskFailureListener {
+ addJavaFriendlyTaskFailureListener(new TaskFailureListener {
override def onTaskFailure(context: TaskContext, error: Throwable): Unit = f(context, error)
})
}
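
To illustrate the ambiguity motivating the renames above (the same reasoning applies to addTaskFailureListener): TaskCompletionListener has a single abstract method, so under Scala 2.12's SAM conversion a lambda argument can satisfy either of the old overloads, and the compiler rejects the call. A sketch, where cleanup is a hypothetical helper:

    // Under 2.12, with overloads addTaskCompletionListener(TaskCompletionListener)
    // and addTaskCompletionListener((TaskContext) => Unit) both in scope, a lambda
    // is convertible to either parameter type, so this fails to compile:
    //   context.addTaskCompletionListener(ctx => cleanup(ctx))   // ambiguous
    // After the rename, each call site resolves to exactly one method:
    context.addTaskCompletionListener((ctx: TaskContext) => cleanup(ctx))
    context.addJavaFriendlyTaskCompletionListener(new TaskCompletionListener {
      override def onTaskCompletion(ctx: TaskContext): Unit = cleanup(ctx)
    })
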
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index fa0b2d3d28..8b407f9771 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -60,12 +60,13 @@ private[spark] class TaskContextImpl(
// Whether the task has failed.
@volatile private var failed: Boolean = false
- override def addTaskCompletionListener(listener: TaskCompletionListener): this.type = {
+ override def addJavaFriendlyTaskCompletionListener(
+ listener: TaskCompletionListener): this.type = {
onCompleteCallbacks += listener
this
}
- override def addTaskFailureListener(listener: TaskFailureListener): this.type = {
+ override def addJavaFriendlyTaskFailureListener(listener: TaskFailureListener): this.type = {
onFailureCallbacks += listener
this
}
diff --git a/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
index 94f5805853..ee8fd81cee 100644
--- a/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
+++ b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
@@ -32,8 +32,8 @@ public class JavaTaskContextCompileCheck {
tc.isCompleted();
tc.isInterrupted();
- tc.addTaskCompletionListener(new JavaTaskCompletionListenerImpl());
- tc.addTaskFailureListener(new JavaTaskFailureListenerImpl());
+ tc.addJavaFriendlyTaskCompletionListener(new JavaTaskCompletionListenerImpl());
+ tc.addJavaFriendlyTaskFailureListener(new JavaTaskFailureListenerImpl());
tc.attemptNumber();
tc.partitionId();
@@ -52,7 +52,7 @@ public class JavaTaskContextCompileCheck {
context.isInterrupted();
context.stageId();
context.partitionId();
- context.addTaskCompletionListener(this);
+ context.addJavaFriendlyTaskCompletionListener(this);
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index 86911d2211..5ca0c6419d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -75,7 +75,8 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
val rdd = new RDD[String](sc, List()) {
override def getPartitions = Array[Partition](StubPartition(0))
override def compute(split: Partition, context: TaskContext) = {
- context.addTaskFailureListener((context, error) => TaskContextSuite.lastError = error)
+ context.addTaskFailureListener(
+ (context, error) => TaskContextSuite.lastError = error)
sys.error("damn error")
}
}
@@ -94,7 +95,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
val context = TaskContext.empty()
val listener = mock(classOf[TaskCompletionListener])
context.addTaskCompletionListener(_ => throw new Exception("blah"))
- context.addTaskCompletionListener(listener)
+ context.addJavaFriendlyTaskCompletionListener(listener)
context.addTaskCompletionListener(_ => throw new Exception("blah"))
intercept[TaskCompletionListenerException] {
@@ -108,7 +109,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
val context = TaskContext.empty()
val listener = mock(classOf[TaskFailureListener])
context.addTaskFailureListener((_, _) => throw new Exception("exception in listener1"))
- context.addTaskFailureListener(listener)
+ context.addJavaFriendlyTaskFailureListener(listener)
context.addTaskFailureListener((_, _) => throw new Exception("exception in listener3"))
val e = intercept[TaskCompletionListenerException] {
diff --git a/dev/change-scala-version.sh b/dev/change-scala-version.sh
index d7975dfb64..6c4077c584 100755
--- a/dev/change-scala-version.sh
+++ b/dev/change-scala-version.sh
@@ -19,7 +19,7 @@
set -e
-VALID_VERSIONS=( 2.10 2.11 )
+VALID_VERSIONS=( 2.10 2.11 2.12 )
usage() {
echo "Usage: $(basename $0) [-h|--help] <version>
@@ -44,11 +44,7 @@ check_scala_version() {
check_scala_version "$TO_VERSION"
-if [ $TO_VERSION = "2.11" ]; then
- FROM_VERSION="2.10"
-else
- FROM_VERSION="2.11"
-fi
+FROM_VERSION=$(grep -m1 -o '<scala.binary.version>.*</scala.binary.version>' pom.xml | grep -o -e '2[^<]\+')
sed_i() {
sed -e "$1" "$2" > "$2.tmp" && mv "$2.tmp" "$2"
diff --git a/pom.xml b/pom.xml
index a772d51337..cf17fe788c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -93,8 +93,11 @@
<module>common/tags</module>
<module>core</module>
<module>graphx</module>
- <module>mllib</module>
- <module>mllib-local</module>
+ <!-- Note: the ML and Kafka modules would typically be defined here, but because their
+ dependencies are not 2.12-ready we need to conditionally enable those modules via the
+ 2.11 and 2.10 build profiles -->
+ <!-- <module>mllib</module> -->
+ <!-- <module>mllib-local</module> -->
<module>tools</module>
<module>streaming</module>
<module>sql/catalyst</module>
@@ -108,8 +111,8 @@
<module>examples</module>
<module>repl</module>
<module>launcher</module>
- <module>external/kafka</module>
- <module>external/kafka-assembly</module>
+ <!-- <module>external/kafka</module> -->
+ <!-- <module>external/kafka-assembly</module> -->
</modules>
<properties>
@@ -164,6 +167,7 @@
<jline.groupid>org.scala-lang</jline.groupid>
<codehaus.jackson.version>1.9.13</codehaus.jackson.version>
<fasterxml.jackson.version>2.5.3</fasterxml.jackson.version>
+ <fasterxml.jackson.scala.version>2.5.3</fasterxml.jackson.scala.version>
<snappy.version>1.1.2.4</snappy.version>
<netlib.java.version>1.1.2</netlib.java.version>
<calcite.version>1.2.0-incubating</calcite.version>
@@ -180,6 +184,7 @@
<jsr305.version>1.3.9</jsr305.version>
<libthrift.version>0.9.2</libthrift.version>
<antlr4.version>4.5.2-1</antlr4.version>
+ <json4s.version>3.2.2</json4s.version>
<test.java.home>${java.home}</test.java.home>
<test.exclude.tags></test.exclude.tags>
@@ -564,7 +569,7 @@
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
- <version>${fasterxml.jackson.version}</version>
+ <version>${fasterxml.jackson.scala.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
@@ -573,6 +578,11 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.json4s</groupId>
+ <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
+ <version>${json4s.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
<version>${jersey.version}</version>
@@ -2478,6 +2488,14 @@
<jline.version>${scala.version}</jline.version>
<jline.groupid>org.scala-lang</jline.groupid>
</properties>
+ <!-- TODO: remove these from here and always enable them in root POM once these modules'
+ dependencies are 2.12-ready -->
+ <modules>
+ <module>mllib</module>
+ <module>mllib-local</module>
+ <module>external/kafka</module>
+ <module>external/kafka-assembly</module>
+ </modules>
<dependencyManagement>
<dependencies>
<dependency>
@@ -2490,23 +2508,53 @@
</profile>
<profile>
- <id>test-java-home</id>
+ <id>scala-2.11</id>
+ <!-- Note: this profile originally had no activation because it only served to document the
+ build's defaults, which use Scala 2.11. The properties here should be kept in sync with the
+ build's default settings, which are defined at the top of this root POM. -->
+ <!-- UPDATE: for now, we must always explicitly pass -Dscala-2.11 for 2.11 builds -->
<activation>
- <property><name>env.JAVA_HOME</name></property>
+ <property><name>scala-2.11</name></property>
</activation>
<properties>
- <test.java.home>${env.JAVA_HOME}</test.java.home>
+ <scala.version>2.11.7</scala.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ </properties>
+ <!-- TODO: remove these from here and always enable them in root POM once these modules'
+ dependencies are 2.12-ready -->
+ <modules>
+ <module>mllib</module>
+ <module>mllib-local</module>
+ <module>external/kafka</module>
+ <module>external/kafka-assembly</module>
+ </modules>
+ </profile>
+
+ <profile>
+ <id>scala-2.12</id>
+ <activation>
+ <property><name>scala-2.12</name></property>
+ </activation>
+ <properties>
+ <scala.version>2.12.0-M4</scala.version>
+ <scala.binary.version>2.12.0-M4</scala.binary.version>
+ <!-- This corresponds to https://github.com/twitter/chill/pull/253 -->
+ <chill.version>0.8.1-SNAPSHOT</chill.version>
+ <!-- This corresponds to https://github.com/FasterXML/jackson-module-scala/pull/247 -->
+ <fasterxml.jackson.scala.version>2.7.3-SNAPSHOT</fasterxml.jackson.scala.version>
+ <fasterxml.jackson.version>2.7.0</fasterxml.jackson.version>
+ <!-- This corresponds to https://github.com/JoshRosen/json4s/tree/scala-2.12.0-m4 -->
+ <json4s.version>3.4.0-SNAPSHOT</json4s.version>
</properties>
</profile>
<profile>
- <id>scala-2.11</id>
+ <id>test-java-home</id>
<activation>
- <property><name>!scala-2.10</name></property>
+ <property><name>env.JAVA_HOME</name></property>
</activation>
<properties>
- <scala.version>2.11.8</scala.version>
- <scala.binary.version>2.11</scala.binary.version>
+ <test.java.home>${env.JAVA_HOME}</test.java.home>
</properties>
</profile>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index a58dd7e7f1..cb85c63382 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -127,6 +127,8 @@ object SparkBuild extends PomBuild {
// in the same way as Maven which handles -Dname as -Dname=true before executes build process.
// see: https://github.com/apache/maven/blob/maven-3.0.4/maven-embedder/src/main/java/org/apache/maven/cli/MavenCli.java#L1082
System.setProperty("scala-2.10", "true")
+ } else if (System.getProperty("scala-2.12") == "") {
+ System.setProperty("scala-2.12", "true")
}
profiles
}
@@ -142,10 +144,17 @@ object SparkBuild extends PomBuild {
lazy val MavenCompile = config("m2r") extend(Compile)
lazy val publishLocalBoth = TaskKey[Unit]("publish-local", "publish local for m2 and ivy")
- lazy val sparkGenjavadocSettings: Seq[sbt.Def.Setting[_]] = Seq(
- libraryDependencies += compilerPlugin(
- "org.spark-project" %% "genjavadoc-plugin" % unidocGenjavadocVersion.value cross CrossVersion.full),
- scalacOptions <+= target.map(t => "-P:genjavadoc:out=" + (t / "java")))
+ lazy val sparkGenjavadocSettings: Seq[sbt.Def.Setting[_]] = {
+ if (System.getProperty("scala-2.12") != null) {
+ // TODO: re-enable once our custom genjavadoc is published for 2.12 (see SPARK-14511).
+ Seq.empty
+ } else {
+ Seq(
+ libraryDependencies += compilerPlugin(
+ "org.spark-project" %% "genjavadoc-plugin" % unidocGenjavadocVersion.value cross CrossVersion.full),
+ scalacOptions <+= target.map(t => "-P:genjavadoc:out=" + (t / "java")))
+ }
+ }
lazy val sharedSettings = sparkGenjavadocSettings ++ Seq (
exportJars in Compile := true,
@@ -251,11 +260,16 @@ object SparkBuild extends PomBuild {
/* Enable tests settings for all projects except examples, assembly and tools */
(allProjects ++ optionallyEnabledProjects).foreach(enable(TestSettings.settings))
- val mimaProjects = allProjects.filterNot { x =>
- Seq(
- spark, hive, hiveThriftServer, catalyst, repl, networkCommon, networkShuffle, networkYarn,
- unsafe, testTags, sketch, mllibLocal
- ).contains(x)
+ // TODO: remove this conditional after Spark publishes with 2.12 support:
+ lazy val mimaProjects = if (System.getProperty("scala-2.12") == null) {
+ allProjects.filterNot { x =>
+ Seq(
+ spark, hive, hiveThriftServer, catalyst, repl, networkCommon, networkShuffle, networkYarn,
+ unsafe, testTags, sketch, mllibLocal
+ ).contains(x)
+ }
+ } else {
+ Seq.empty
}
mimaProjects.foreach { x =>
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index 8b1017042c..e1071ebfb5 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -44,7 +44,7 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sketch_2.11</artifactId>
+ <artifactId>spark-sketch_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
diff --git a/tools/pom.xml b/tools/pom.xml
index 9bb20e1381..9b94b6d823 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -45,7 +45,7 @@
<dependency>
<groupId>org.clapper</groupId>
<artifactId>classutil_${scala.binary.version}</artifactId>
- <version>1.0.6</version>
+ <version>1.0.8</version>
</dependency>
</dependencies>