diff options
author | Josh Rosen <joshrosen@databricks.com> | 2016-04-14 23:29:23 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2016-04-14 23:29:23 -0700 |
commit | 1321c546df12a70a62b7e5fef8096aaed047b260 (patch) | |
tree | 87e46b8ff12a831b6c37567bfa79c641eb10b307 | |
parent | adfe8ff8b24497aee1375054f1630750d01b30a2 (diff) | |
download | spark-1321c546df12a70a62b7e5fef8096aaed047b260.tar.gz spark-1321c546df12a70a62b7e5fef8096aaed047b260.tar.bz2 spark-1321c546df12a70a62b7e5fef8096aaed047b260.zip |
Minimum changes to compile REPL in SBT.
Good news: we were able to use the same source for 2.11 and 2.12!
14 files changed, 53 insertions, 16 deletions
diff --git a/assembly/pom.xml b/assembly/pom.xml index 22cbac06ca..685446898a 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -43,11 +43,13 @@ <artifactId>spark-core_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> + <!-- <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> + --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.binary.version}</artifactId> diff --git a/bin/spark-class b/bin/spark-class index b2a36b9846..0952000e85 100755 --- a/bin/spark-class +++ b/bin/spark-class @@ -64,6 +64,7 @@ fi # The launcher library will print arguments separated by a NULL character, to allow arguments with # characters that would be otherwise interpreted by the shell. Read that in a while loop, populating # an array that will be used to exec the final command. +echo "$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@" CMD=() while IFS= read -d '' -r ARG; do CMD+=("$ARG") @@ -162,7 +162,7 @@ <!-- managed up from 3.2.1 for SPARK-11652 --> <commons.collections.version>3.2.2</commons.collections.version> <scala.version>2.11.8</scala.version> - <scala.binary.version>2.11</scala.binary.version> + <scala.binary.version>2.12.0-M4</scala.binary.version> <jline.version>${scala.version}</jline.version> <jline.groupid>org.scala-lang</jline.groupid> <codehaus.jackson.version>1.9.13</codehaus.jackson.version> @@ -612,11 +612,6 @@ </exclusions> </dependency> <dependency> - <groupId>org.json4s</groupId> - <artifactId>json4s-jackson_${scala.binary.version}</artifactId> - <version>3.2.10</version> - </dependency> - <dependency> <groupId>com.sun.jersey</groupId> <artifactId>jersey-json</artifactId> <version>${jersey.version}</version> @@ -2484,7 +2479,7 @@ </activation> <properties> <scala.version>2.10.6</scala.version> - <scala.binary.version>2.10</scala.binary.version> + 
<scala.binary.version>2.11</scala.binary.version> <jline.version>${scala.version}</jline.version> <jline.groupid>org.scala-lang</jline.groupid> </properties> @@ -2560,6 +2555,15 @@ </snapshots> </repository> </repositories> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.scala-lang.modules</groupId> + <artifactId>scala-parser-combinators_${scala.binary.version}</artifactId> + <version>1.0.4</version> + </dependency> + </dependencies> + </dependencyManagement> </profile> <profile> diff --git a/repl/pom.xml b/repl/pom.xml index 0f396c9b80..4c63567288 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -50,12 +50,14 @@ <type>test-jar</type> <scope>test</scope> </dependency> + <!-- <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>runtime</scope> </dependency> + --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${scala.binary.version}</artifactId> diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml index 1748fa2778..7938416225 100644 --- a/sql/catalyst/pom.xml +++ b/sql/catalyst/pom.xml @@ -80,6 +80,20 @@ <artifactId>commons-codec</artifactId> </dependency> </dependencies> + <profiles> + <profile> + <id>scala-2.12</id> + <activation> + <property><name>scala-2.12</name></property> + </activation> + <dependencies> + <dependency> + <groupId>org.scala-lang.modules</groupId> + <artifactId>scala-parser-combinators_${scala.binary.version}</artifactId> + </dependency> + </dependencies> + </profile> + </profiles> <build> <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index 4795fc2557..b6d5db1175 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -662,6 +662,7 @@ trait ScalaReflection { def attributesFor[T: TypeTag]: Seq[Attribute] = schemaFor[T] match { case Schema(s: StructType, _) => s.toAttributes + case _ => throw new Exception("Expected schema!") } /** Returns a catalyst DataType and its nullability for the given Scala Type using reflection. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala index 41128fe389..ff124f7b63 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala @@ -38,9 +38,13 @@ package object codegen { } } + + // Commented out because the nsc import doesn't work with 2.12 + /** * Dumps the bytecode from a class to the screen using javap. 
*/ + /* object DumpByteCode { import scala.sys.process._ val dumpDirectory = Utils.createTempDir() @@ -70,4 +74,5 @@ package object codegen { // scalastyle:on println } } + */ } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 232ca43588..bc096f29b7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -615,7 +615,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { case o: Option[_] => o.map(parseToJson) case t: Seq[_] => JArray(t.map(parseToJson).toList) case m: Map[_, _] => - val fields = m.toList.map { case (k: String, v) => (k, parseToJson(v)) } + // TODO(josh): exhaustivity + val fields = m.toList.map { case (k, v) => (k.toString, parseToJson(v)) } JObject(fields) case r: RDD[_] => JNothing // if it's a scala object, we can simply keep the full class path. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala index 66f123682e..7278501cf8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala @@ -168,7 +168,8 @@ object Metadata { private def toJsonValue(obj: Any): JValue = { obj match { case map: Map[_, _] => - val fields = map.toList.map { case (k: String, v) => (k, toJsonValue(v)) } + // TODO(josh): exhaustivity + val fields = map.toList.map { case (k, v) => (k.toString, toJsonValue(v)) } JObject(fields) case arr: Array[_] => val values = arr.toList.map(toJsonValue) diff --git a/sql/core/src/main/java/org/apache/spark/sql/expressions/java/typed.java b/sql/core/src/main/java/org/apache/spark/sql/expressions/java/typed.java index c7c6e3868f..d47816c191 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/expressions/java/typed.java +++ b/sql/core/src/main/java/org/apache/spark/sql/expressions/java/typed.java @@ -51,9 +51,12 @@ public class typed { * * @since 2.0.0 */ + // TODO(josh): re-enable after SAM fix + /* public static <T> TypedColumn<T, Long> count(MapFunction<T, Object> f) { return new TypedCount<T>(f).toColumnJava(); } + */ /** * Sum aggregate function for floating point (double) type. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index e216945fbe..5d2e32b418 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2041,7 +2041,7 @@ class Dataset[T] private[sql]( * @since 1.6.0 */ def foreachPartition(func: ForeachPartitionFunction[T]): Unit = - foreachPartition(it => func.call(it.asJava)) + foreachPartition((it: Iterator[T]) => func.call(it.asJava)) /** * Returns the first `n` rows in the [[Dataset]]. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala index c39a78da6f..e147b56851 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala @@ -37,7 +37,7 @@ class TypedSumDouble[IN](f: IN => Double) extends Aggregator[IN, Double, Double] override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]() // Java api support - def this(f: MapFunction[IN, java.lang.Double]) = this(x => f.call(x).asInstanceOf[Double]) + def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double]) def toColumnJava: TypedColumn[IN, java.lang.Double] = { toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]] @@ -55,7 +55,7 @@ class TypedSumLong[IN](f: IN => Long) extends Aggregator[IN, Long, Long] { override def outputEncoder: Encoder[Long] = ExpressionEncoder[Long]() // Java api support - def this(f: MapFunction[IN, java.lang.Long]) = this(x => f.call(x).asInstanceOf[Long]) + def this(f: MapFunction[IN, java.lang.Long]) = this((x: IN) => f.call(x).asInstanceOf[Long]) def toColumnJava: TypedColumn[IN, java.lang.Long] = { toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]] @@ -75,7 +75,8 @@ class TypedCount[IN](f: IN => Any) extends Aggregator[IN, Long, Long] { override def outputEncoder: Encoder[Long] = ExpressionEncoder[Long]() // Java api support - def this(f: MapFunction[IN, Object]) = this(x => f.call(x)) + // TODO(josh): uncomment this definition / use shims for 2.12 SAM compatibility + // def this(f: MapFunction[IN, Object]) = this((x: IN) => f.call(x)) def toColumnJava: TypedColumn[IN, java.lang.Long] = { toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]] } @@ -94,7 +95,7 @@ class TypedAverage[IN](f: IN => Double) extends Aggregator[IN, (Double, Long), D 
override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]() // Java api support - def this(f: MapFunction[IN, java.lang.Double]) = this(x => f.call(x).asInstanceOf[Double]) + def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double]) def toColumnJava: TypedColumn[IN, java.lang.Double] = { toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala index 4682949fa1..51b988c886 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala @@ -182,6 +182,7 @@ object Utils { case agg @ AggregateExpression(aggregateFunction, mode, true, _) => aggregateFunction.transformDown(distinctColumnAttributeLookup) .asInstanceOf[AggregateFunction] + case other => throw new MatchError(other) } val partialDistinctAggregate: SparkPlan = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 065c8572b0..2847104ad3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -276,9 +276,10 @@ object JdbcUtils extends Logging { val rddSchema = df.schema val getConnection: () => Connection = createConnectionFactory(url, properties) val batchSize = properties.getProperty("batchsize", "1000").toInt - df.foreachPartition { iterator => + // TODO(josh): revert once applying SAM ambiguity fix + val func: Iterator[Row] => Unit = (iterator: Iterator[Row]) => savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect) - } + df.foreachPartition(func) } } |