author    Josh Rosen <joshrosen@databricks.com>  2016-04-14 23:29:23 -0700
committer Josh Rosen <joshrosen@databricks.com>  2016-04-14 23:29:23 -0700
commit    1321c546df12a70a62b7e5fef8096aaed047b260 (patch)
tree      87e46b8ff12a831b6c37567bfa79c641eb10b307
parent    adfe8ff8b24497aee1375054f1630750d01b30a2 (diff)
Minimum changes to compile REPL in SBT.
Good news: we were able to use the same source for 2.11 and 2.12!
-rw-r--r--  assembly/pom.xml                                                                            |  2
-rwxr-xr-x  bin/spark-class                                                                             |  1
-rw-r--r--  pom.xml                                                                                     | 18
-rw-r--r--  repl/pom.xml                                                                                |  2
-rw-r--r--  sql/catalyst/pom.xml                                                                        | 14
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala             |  1
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala |  5
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala              |  3
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala                       |  3
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/expressions/java/typed.java                     |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala                                  |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala     |  9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala                |  1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala     |  5
14 files changed, 53 insertions, 16 deletions
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 22cbac06ca..685446898a 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -43,11 +43,13 @@
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
+ <!--
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
+ -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
diff --git a/bin/spark-class b/bin/spark-class
index b2a36b9846..0952000e85 100755
--- a/bin/spark-class
+++ b/bin/spark-class
@@ -64,6 +64,7 @@ fi
# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
+echo "$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
CMD=()
while IFS= read -d '' -r ARG; do
CMD+=("$ARG")
diff --git a/pom.xml b/pom.xml
index fab34f521b..52da625eac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -162,7 +162,7 @@
<!-- managed up from 3.2.1 for SPARK-11652 -->
<commons.collections.version>3.2.2</commons.collections.version>
<scala.version>2.11.8</scala.version>
- <scala.binary.version>2.11</scala.binary.version>
+ <scala.binary.version>2.12.0-M4</scala.binary.version>
<jline.version>${scala.version}</jline.version>
<jline.groupid>org.scala-lang</jline.groupid>
<codehaus.jackson.version>1.9.13</codehaus.jackson.version>
@@ -612,11 +612,6 @@
</exclusions>
</dependency>
<dependency>
- <groupId>org.json4s</groupId>
- <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
- <version>3.2.10</version>
- </dependency>
- <dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
<version>${jersey.version}</version>
@@ -2484,7 +2479,7 @@
</activation>
<properties>
<scala.version>2.10.6</scala.version>
- <scala.binary.version>2.10</scala.binary.version>
+ <scala.binary.version>2.11</scala.binary.version>
<jline.version>${scala.version}</jline.version>
<jline.groupid>org.scala-lang</jline.groupid>
</properties>
@@ -2560,6 +2555,15 @@
</snapshots>
</repository>
</repositories>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.scala-lang.modules</groupId>
+ <artifactId>scala-parser-combinators_${scala.binary.version}</artifactId>
+ <version>1.0.4</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
</profile>
<profile>
diff --git a/repl/pom.xml b/repl/pom.xml
index 0f396c9b80..4c63567288 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -50,12 +50,14 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <!--
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
+ -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index 1748fa2778..7938416225 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -80,6 +80,20 @@
<artifactId>commons-codec</artifactId>
</dependency>
</dependencies>
+ <profiles>
+ <profile>
+ <id>scala-2.12</id>
+ <activation>
+ <property><name>scala-2.12</name></property>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.scala-lang.modules</groupId>
+ <artifactId>scala-parser-combinators_${scala.binary.version}</artifactId>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
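(Aside, not part of the patch: the new scala-2.12 profile declares scala-parser-combinators explicitly because scala.util.parsing ships as a separate artifact rather than inside scala-library, and it is evidently no longer pulled in transitively for the 2.12 build. A minimal, hypothetical sketch of the API that dependency provides:)

    import scala.util.parsing.combinator.RegexParsers

    // Hypothetical example, not code from this commit: parses comma-separated integers.
    object IntListParser extends RegexParsers {
      def num: Parser[Int] = """\d+""".r ^^ (_.toInt)
      def nums: Parser[List[Int]] = repsep(num, ",")
      def run(s: String): List[Int] = parseAll(nums, s).getOrElse(Nil)
    }
    // IntListParser.run("1,2,3") == List(1, 2, 3)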
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 4795fc2557..b6d5db1175 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -662,6 +662,7 @@ trait ScalaReflection {
def attributesFor[T: TypeTag]: Seq[Attribute] = schemaFor[T] match {
case Schema(s: StructType, _) =>
s.toAttributes
+ case _ => throw new Exception("Expected schema!")
}
/** Returns a catalyst DataType and its nullability for the given Scala Type using reflection. */
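(Aside, not part of the patch: the added catch-all exists because `schemaFor[T]` can return a schema whose data type is not a StructType, so the single-case match is not exhaustive; the new case fails loudly instead of relying on an implicit MatchError. A small, hypothetical sketch of the same pattern:)

    // Hypothetical sketch, not code from this commit.
    sealed trait Schema
    case class StructSchema(fields: Seq[String]) extends Schema
    case class AtomicSchema(name: String) extends Schema

    def fieldsFor(s: Schema): Seq[String] = s match {
      case StructSchema(fields) => fields
      case other => throw new Exception(s"Expected a struct schema, got $other") // explicit, like the added case
    }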
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala
index 41128fe389..ff124f7b63 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala
@@ -38,9 +38,13 @@ package object codegen {
}
}
+
+ // Commented out because the nsc import doesn't work with 2.12
+
/**
* Dumps the bytecode from a class to the screen using javap.
*/
+ /*
object DumpByteCode {
import scala.sys.process._
val dumpDirectory = Utils.createTempDir()
@@ -70,4 +74,5 @@ package object codegen {
// scalastyle:on println
}
}
+ */
}
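(Aside, not part of the patch: the disabled DumpByteCode helper prints the bytecode of generated classes by shelling out to javap via scala.sys.process. A rough, hypothetical sketch of that technique:)

    import scala.sys.process._

    // Hypothetical sketch, not code from this commit: disassemble a compiled class with javap.
    def dumpByteCode(classDir: String, className: String): Unit = {
      // Assumes the JDK's javap is on the PATH.
      val disassembly = Seq("javap", "-c", "-p", "-classpath", classDir, className).!!
      // scalastyle:off println
      println(disassembly)
      // scalastyle:on println
    }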
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 232ca43588..bc096f29b7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -615,7 +615,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
case o: Option[_] => o.map(parseToJson)
case t: Seq[_] => JArray(t.map(parseToJson).toList)
case m: Map[_, _] =>
- val fields = m.toList.map { case (k: String, v) => (k, parseToJson(v)) }
+ // TODO(josh): exhaustivity
+ val fields = m.toList.map { case (k, v) => (k.toString, parseToJson(v)) }
JObject(fields)
case r: RDD[_] => JNothing
// if it's a scala object, we can simply keep the full class path.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
index 66f123682e..7278501cf8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
@@ -168,7 +168,8 @@ object Metadata {
private def toJsonValue(obj: Any): JValue = {
obj match {
case map: Map[_, _] =>
- val fields = map.toList.map { case (k: String, v) => (k, toJsonValue(v)) }
+ // TODO(josh): exhaustivity
+ val fields = map.toList.map { case (k, v) => (k.toString, toJsonValue(v)) }
JObject(fields)
case arr: Array[_] =>
val values = arr.toList.map(toJsonValue)
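(Aside, not part of the patch: this change and the TreeNode.parseToJson change above replace a type-pattern on the map key, `case (k: String, v)`, with a total pattern plus `k.toString`, because matching only String keys on a `Map[_, _]` is not exhaustive, as the TODOs note. A tiny, hypothetical sketch of the difference:)

    // Hypothetical sketch, not code from this commit.
    val m: Map[_, _] = Map("a" -> 1, "b" -> 2)

    // Non-exhaustive: a non-String key would throw a MatchError, and the compiler warns.
    val risky = m.toList.map { case (k: String, v) => (k, v) }

    // Total: handles any key type by converting it to a String explicitly.
    val total = m.toList.map { case (k, v) => (k.toString, v) }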
diff --git a/sql/core/src/main/java/org/apache/spark/sql/expressions/java/typed.java b/sql/core/src/main/java/org/apache/spark/sql/expressions/java/typed.java
index c7c6e3868f..d47816c191 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/expressions/java/typed.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/expressions/java/typed.java
@@ -51,9 +51,12 @@ public class typed {
*
* @since 2.0.0
*/
+ // TODO(josh): re-enable after SAM fix
+ /*
public static <T> TypedColumn<T, Long> count(MapFunction<T, Object> f) {
return new TypedCount<T>(f).toColumnJava();
}
+ */
/**
* Sum aggregate function for floating point (double) type.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index e216945fbe..5d2e32b418 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2041,7 +2041,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def foreachPartition(func: ForeachPartitionFunction[T]): Unit =
- foreachPartition(it => func.call(it.asJava))
+ foreachPartition((it: Iterator[T]) => func.call(it.asJava))
/**
* Returns the first `n` rows in the [[Dataset]].
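(Aside, not part of the patch: this annotation, and the similar changes below, work around overload ambiguity introduced by Scala 2.12's SAM conversion. `foreachPartition` is overloaded on a Scala function and on a Java functional interface, and once a bare lambda can satisfy either overload the call no longer resolves. A self-contained, hypothetical sketch of the problem and of the JdbcUtils-style workaround of naming the function type explicitly:)

    // Hypothetical sketch, not code from this commit.
    trait JavaStyleAction[T] { def call(t: T): Unit }  // stands in for ForeachPartitionFunction

    class MiniFrame[T](data: Seq[T]) {
      def foreachPartition(f: Iterator[T] => Unit): Unit = f(data.iterator)
      def foreachPartition(f: JavaStyleAction[Iterator[T]]): Unit = f.call(data.iterator)

      // Under 2.12 SAM conversion a bare lambda matches both overloads:
      //   foreachPartition(it => println(it.size))        // ambiguous
      // Binding the argument to an explicit function type picks the Scala overload:
      def demo(): Unit = {
        val f: Iterator[T] => Unit = it => println(it.size)
        foreachPartition(f)
      }
    }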
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
index c39a78da6f..e147b56851 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
@@ -37,7 +37,7 @@ class TypedSumDouble[IN](f: IN => Double) extends Aggregator[IN, Double, Double]
override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
// Java api support
- def this(f: MapFunction[IN, java.lang.Double]) = this(x => f.call(x).asInstanceOf[Double])
+ def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double])
def toColumnJava: TypedColumn[IN, java.lang.Double] = {
toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
@@ -55,7 +55,7 @@ class TypedSumLong[IN](f: IN => Long) extends Aggregator[IN, Long, Long] {
override def outputEncoder: Encoder[Long] = ExpressionEncoder[Long]()
// Java api support
- def this(f: MapFunction[IN, java.lang.Long]) = this(x => f.call(x).asInstanceOf[Long])
+ def this(f: MapFunction[IN, java.lang.Long]) = this((x: IN) => f.call(x).asInstanceOf[Long])
def toColumnJava: TypedColumn[IN, java.lang.Long] = {
toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]]
@@ -75,7 +75,8 @@ class TypedCount[IN](f: IN => Any) extends Aggregator[IN, Long, Long] {
override def outputEncoder: Encoder[Long] = ExpressionEncoder[Long]()
// Java api support
- def this(f: MapFunction[IN, Object]) = this(x => f.call(x))
+ // TODO(josh): uncomment this definition / use shims for 2.12 SAM compatibility
+ // def this(f: MapFunction[IN, Object]) = this((x: IN) => f.call(x))
def toColumnJava: TypedColumn[IN, java.lang.Long] = {
toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]]
}
@@ -94,7 +95,7 @@ class TypedAverage[IN](f: IN => Double) extends Aggregator[IN, (Double, Long), D
override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
// Java api support
- def this(f: MapFunction[IN, java.lang.Double]) = this(x => f.call(x).asInstanceOf[Double])
+ def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double])
def toColumnJava: TypedColumn[IN, java.lang.Double] = {
toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
}
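(Aside, not part of the patch: the disabled TypedCount constructor presumably hits the same SAM issue. Because `f.call(x)` returns Object, the lambda `x => f.call(x)` can itself be read as another MapFunction[IN, Object], so the auxiliary constructor's delegation can become ambiguous between the primary constructor and a recursive call to itself. A compact, hypothetical sketch of that shape:)

    // Hypothetical sketch, not code from this commit.
    trait MapFn[A, B] { def call(a: A): B }  // stands in for MapFunction

    class MiniCount[IN](val f: IN => Any) {
      // Under 2.12 SAM conversion the lambda below could be typed as IN => Any (the
      // primary constructor) or as another MapFn[IN, Object] (this constructor again),
      // which is why the patch comments the Java-API constructor out pending a shim:
      //   def this(g: MapFn[IN, Object]) = this(x => g.call(x))
    }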
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
index 4682949fa1..51b988c886 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
@@ -182,6 +182,7 @@ object Utils {
case agg @ AggregateExpression(aggregateFunction, mode, true, _) =>
aggregateFunction.transformDown(distinctColumnAttributeLookup)
.asInstanceOf[AggregateFunction]
+ case other => throw new MatchError(other)
}
val partialDistinctAggregate: SparkPlan = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 065c8572b0..2847104ad3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -276,9 +276,10 @@ object JdbcUtils extends Logging {
val rddSchema = df.schema
val getConnection: () => Connection = createConnectionFactory(url, properties)
val batchSize = properties.getProperty("batchsize", "1000").toInt
- df.foreachPartition { iterator =>
+ // TODO(josh): revert once applying SAM ambiguity fix
+ val func: Iterator[Row] => Unit = (iterator: Iterator[Row]) =>
savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect)
- }
+ df.foreachPartition(func)
}
}