From 1321c546df12a70a62b7e5fef8096aaed047b260 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Thu, 14 Apr 2016 23:29:23 -0700
Subject: Minimum changes to compile REPL in SBT.

Good news: we were able to use the same source for 2.11 and 2.12!
---
 assembly/pom.xml                                        |  2 ++
 bin/spark-class                                         |  1 +
 pom.xml                                                 | 18 +++++++++++-------
 repl/pom.xml                                            |  2 ++
 sql/catalyst/pom.xml                                    | 14 ++++++++++++++
 .../apache/spark/sql/catalyst/ScalaReflection.scala     |  1 +
 .../sql/catalyst/expressions/codegen/package.scala      |  5 +++++
 .../org/apache/spark/sql/catalyst/trees/TreeNode.scala  |  3 ++-
 .../scala/org/apache/spark/sql/types/Metadata.scala     |  3 ++-
 .../org/apache/spark/sql/expressions/java/typed.java    |  3 +++
 .../src/main/scala/org/apache/spark/sql/Dataset.scala   |  2 +-
 .../sql/execution/aggregate/typedaggregators.scala      |  9 +++++----
 .../apache/spark/sql/execution/aggregate/utils.scala    |  1 +
 .../sql/execution/datasources/jdbc/JdbcUtils.scala      |  5 +++--
 14 files changed, 53 insertions(+), 16 deletions(-)

diff --git a/assembly/pom.xml b/assembly/pom.xml
index 22cbac06ca..685446898a 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -43,11 +43,13 @@
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
+
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
diff --git a/bin/spark-class b/bin/spark-class
index b2a36b9846..0952000e85 100755
--- a/bin/spark-class
+++ b/bin/spark-class
@@ -64,6 +64,7 @@ fi
 # The launcher library will print arguments separated by a NULL character, to allow arguments with
 # characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
 # an array that will be used to exec the final command.
+echo "$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
 CMD=()
 while IFS= read -d '' -r ARG; do
   CMD+=("$ARG")
diff --git a/pom.xml b/pom.xml
index fab34f521b..52da625eac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -162,7 +162,7 @@
     <commons.collections.version>3.2.2</commons.collections.version>
     <scala.version>2.11.8</scala.version>
-    <scala.binary.version>2.11</scala.binary.version>
+    <scala.binary.version>2.12.0-M4</scala.binary.version>
     <jline.version>${scala.version}</jline.version>
     <jline.groupid>org.scala-lang</jline.groupid>
     <codehaus.jackson.version>1.9.13</codehaus.jackson.version>
@@ -611,11 +611,6 @@
       </dependency>
-      <dependency>
-        <groupId>org.json4s</groupId>
-        <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
-        <version>3.2.10</version>
-      </dependency>
       <dependency>
         <groupId>com.sun.jersey</groupId>
         <artifactId>jersey-json</artifactId>
@@ -2484,7 +2479,7 @@
       <properties>
         <scala.version>2.10.6</scala.version>
-        <scala.binary.version>2.10</scala.binary.version>
+        <scala.binary.version>2.11</scala.binary.version>
         <jline.version>${scala.version}</jline.version>
         <jline.groupid>org.scala-lang</jline.groupid>
@@ -2560,6 +2555,15 @@
+
+      <dependencies>
+        <dependency>
+          <groupId>org.scala-lang.modules</groupId>
+          <artifactId>scala-parser-combinators_${scala.binary.version}</artifactId>
+          <version>1.0.4</version>
+        </dependency>
+      </dependencies>
+
diff --git a/repl/pom.xml b/repl/pom.xml
index 0f396c9b80..4c63567288 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -50,12 +50,14 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql_${scala.binary.version}</artifactId>
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index 1748fa2778..7938416225 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -80,6 +80,20 @@
       <artifactId>commons-codec</artifactId>
     </dependency>
   </dependencies>
+  <profiles>
+    <profile>
+      <id>scala-2.12</id>
+      <activation>
+        <property><name>scala-2.12</name></property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.scala-lang.modules</groupId>
+          <artifactId>scala-parser-combinators_${scala.binary.version}</artifactId>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
     <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 4795fc2557..b6d5db1175 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -662,6 +662,7 @@ trait ScalaReflection {
   def attributesFor[T: TypeTag]: Seq[Attribute] = schemaFor[T] match {
     case Schema(s: StructType, _) => s.toAttributes
+    case _ => throw new Exception("Expected schema!")
   }
 
   /** Returns a catalyst DataType and its nullability for the given Scala Type using reflection. */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala
index 41128fe389..ff124f7b63 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala
@@ -38,9 +38,13 @@ package object codegen {
     }
   }
 
+
+  // Commented out because the nsc import doesn't work with 2.12
+
   /**
    * Dumps the bytecode from a class to the screen using javap.
   */
+  /*
   object DumpByteCode {
     import scala.sys.process._
     val dumpDirectory = Utils.createTempDir()
@@ -70,4 +74,5 @@ package object codegen {
       // scalastyle:on println
     }
   }
+  */
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 232ca43588..bc096f29b7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -615,7 +615,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     case o: Option[_] => o.map(parseToJson)
     case t: Seq[_] => JArray(t.map(parseToJson).toList)
     case m: Map[_, _] =>
-      val fields = m.toList.map { case (k: String, v) => (k, parseToJson(v)) }
+      // TODO(josh): exhaustivity
+      val fields = m.toList.map { case (k, v) => (k.toString, parseToJson(v)) }
       JObject(fields)
     case r: RDD[_] => JNothing
     // if it's a scala object, we can simply keep the full class path.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
index 66f123682e..7278501cf8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
@@ -168,7 +168,8 @@ object Metadata {
   private def toJsonValue(obj: Any): JValue = {
     obj match {
       case map: Map[_, _] =>
-        val fields = map.toList.map { case (k: String, v) => (k, toJsonValue(v)) }
+        // TODO(josh): exhaustivity
+        val fields = map.toList.map { case (k, v) => (k.toString, toJsonValue(v)) }
         JObject(fields)
       case arr: Array[_] =>
         val values = arr.toList.map(toJsonValue)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/expressions/java/typed.java b/sql/core/src/main/java/org/apache/spark/sql/expressions/java/typed.java
index c7c6e3868f..d47816c191 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/expressions/java/typed.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/expressions/java/typed.java
@@ -51,9 +51,12 @@ public class typed {
    *
    * @since 2.0.0
    */
+  // TODO(josh): re-enable after SAM fix
+  /*
   public static <T> TypedColumn<T, Long> count(MapFunction<T, Object> f) {
     return new TypedCount<T>(f).toColumnJava();
   }
+  */
 
   /**
    * Sum aggregate function for floating point (double) type.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index e216945fbe..5d2e32b418 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2041,7 +2041,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def foreachPartition(func: ForeachPartitionFunction[T]): Unit =
-    foreachPartition(it => func.call(it.asJava))
+    foreachPartition((it: Iterator[T]) => func.call(it.asJava))
 
   /**
    * Returns the first `n` rows in the [[Dataset]].
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
index c39a78da6f..e147b56851 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
@@ -37,7 +37,7 @@ class TypedSumDouble[IN](f: IN => Double) extends Aggregator[IN, Double, Double]
   override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
 
   // Java api support
-  def this(f: MapFunction[IN, java.lang.Double]) = this(x => f.call(x).asInstanceOf[Double])
+  def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double])
 
   def toColumnJava: TypedColumn[IN, java.lang.Double] = {
     toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
@@ -55,7 +55,7 @@ class TypedSumLong[IN](f: IN => Long) extends Aggregator[IN, Long, Long] {
   override def outputEncoder: Encoder[Long] = ExpressionEncoder[Long]()
 
   // Java api support
-  def this(f: MapFunction[IN, java.lang.Long]) = this(x => f.call(x).asInstanceOf[Long])
+  def this(f: MapFunction[IN, java.lang.Long]) = this((x: IN) => f.call(x).asInstanceOf[Long])
 
   def toColumnJava: TypedColumn[IN, java.lang.Long] = {
     toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]]
@@ -75,7 +75,8 @@ class TypedCount[IN](f: IN => Any) extends Aggregator[IN, Long, Long] {
   override def outputEncoder: Encoder[Long] = ExpressionEncoder[Long]()
 
   // Java api support
-  def this(f: MapFunction[IN, Object]) = this(x => f.call(x))
+  // TODO(josh): uncomment this definition / use shims for 2.12 SAM compatibility
+  // def this(f: MapFunction[IN, Object]) = this((x: IN) => f.call(x))
 
   def toColumnJava: TypedColumn[IN, java.lang.Long] = {
     toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]]
   }
@@ -94,7 +95,7 @@ class TypedAverage[IN](f: IN => Double) extends Aggregator[IN, (Double, Long), D
   override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
 
   // Java api support
-  def this(f: MapFunction[IN, java.lang.Double]) = this(x => f.call(x).asInstanceOf[Double])
+  def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double])
 
   def toColumnJava: TypedColumn[IN, java.lang.Double] = {
     toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
index 4682949fa1..51b988c886 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
@@ -182,6 +182,7 @@ object Utils {
         case agg @ AggregateExpression(aggregateFunction, mode, true, _) =>
           aggregateFunction.transformDown(distinctColumnAttributeLookup)
            .asInstanceOf[AggregateFunction]
+        case other => throw new MatchError(other)
       }
 
       val partialDistinctAggregate: SparkPlan = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 065c8572b0..2847104ad3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -276,9 +276,10 @@ object JdbcUtils extends Logging {
     val rddSchema = df.schema
     val getConnection: () => Connection = createConnectionFactory(url, properties)
     val batchSize = properties.getProperty("batchsize", "1000").toInt
-    df.foreachPartition { iterator =>
+    // TODO(josh): revert once applying SAM ambiguity fix
+    val func: Iterator[Row] => Unit = (iterator: Iterator[Row]) =>
       savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect)
-    }
+    df.foreachPartition(func)
   }
 }
-- 
cgit v1.2.3
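
Note on the SAM-related workarounds above (not part of the patch): under Scala 2.12 a lambda literal can be adapted to a Java functional interface via SAM conversion, so calls to methods that are overloaded on both a Scala function type and a Java functional interface (such as Dataset.foreachPartition) can become ambiguous. The following minimal, self-contained sketch illustrates that situation and the workaround used in the JdbcUtils and Dataset changes: binding the closure to an explicit Scala function type, or annotating the lambda's parameter type. ForeachPartitionLike and SamAmbiguityDemo are hypothetical stand-ins, not Spark classes.

// Hypothetical stand-in for a Java functional interface like ForeachPartitionFunction.
trait ForeachPartitionLike[T] {
  def call(it: java.util.Iterator[T]): Unit
}

object SamAmbiguityDemo {
  // Two overloads, mirroring the Dataset.foreachPartition pair touched by this patch:
  // one takes a Scala function, the other a Java-style functional interface.
  def foreachPartition(f: Iterator[String] => Unit): Unit = f(Iterator("a", "b"))
  def foreachPartition(f: ForeachPartitionLike[String]): Unit = ()

  def main(args: Array[String]): Unit = {
    // With 2.12 SAM conversion, `foreachPartition(it => ...)` may match both overloads
    // and be rejected as ambiguous. Naming the closure with an explicit Scala function
    // type (as the JdbcUtils change does) selects the Function1 overload unambiguously,
    // because SAM conversion only applies to lambda literals, not to values of function type.
    val func: Iterator[String] => Unit = (it: Iterator[String]) => it.foreach(println)
    foreachPartition(func)
  }
}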