14 files changed, 36 insertions, 137 deletions
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 9a091bf6d3..13caa54d06 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -300,12 +300,6 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.kafka.DirectKafkaInputDStream.maxMessagesPerPartition")
       ) ++ Seq(
         // [SPARK-13244][SQL] Migrates DataFrame to Dataset
-        ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.DataFrameHolder.apply"),
-        ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.DataFrameHolder.toDF"),
-        ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.DataFrameHolder.copy"),
-        ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.DataFrameHolder.copy$default$1"),
-        ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.DataFrameHolder.df$1"),
-        ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.DataFrameHolder.this"),
         ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.tables"),
         ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.sql"),
         ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.baseRelationToDataFrame"),
@@ -315,6 +309,13 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrame"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrame$"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.LegacyFunctions"),
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrameHolder"),
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrameHolder$"),
+        ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.localSeqToDataFrameHolder"),
+        ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.stringRddToDataFrameHolder"),
+        ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.rddToDataFrameHolder"),
+        ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.longRddToDataFrameHolder"),
+        ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.intRddToDataFrameHolder"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.GroupedDataset"),
         ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.mllib.evaluation.MultilabelMetrics.this"),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameHolder.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameHolder.scala
deleted file mode 100644
index 3b30337f1f..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameHolder.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-/**
- * A container for a [[DataFrame]], used for implicit conversions.
- *
- * To use this, import implicit conversions in SQL:
- * {{{
- *   import sqlContext.implicits._
- * }}}
- *
- * @since 1.3.0
- */
-case class DataFrameHolder private[sql](private val df: DataFrame) {
-
-  // This is declared with parentheses to prevent the Scala compiler from treating
-  // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
-  def toDF(): DataFrame = df
-
-  def toDF(colNames: String*): DataFrame = df.toDF(colNames : _*)
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 295cb67eb4..be0dfe7c33 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2196,14 +2196,12 @@ class Dataset[T] private[sql](
   def write: DataFrameWriter = new DataFrameWriter(toDF())
 
   /**
-   * Returns the content of the [[Dataset]] as a [[Dataset]] of JSON strings.
-   *
-   * @group basic
-   * @since 1.6.0
+   * Returns the content of the [[Dataset]] as a Dataset of JSON strings.
+   * @since 2.0.0
    */
   def toJSON: Dataset[String] = {
     val rowSchema = this.schema
-    val rdd = queryExecution.toRdd.mapPartitions { iter =>
+    val rdd: RDD[String] = queryExecution.toRdd.mapPartitions { iter =>
       val writer = new CharArrayWriter()
       // create the Generator without separator inserted between 2 records
       val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
@@ -2225,8 +2223,8 @@ class Dataset[T] private[sql](
         }
       }
     }
-    import sqlContext.implicits._
-    rdd.toDS
+    import sqlContext.implicits.newStringEncoder
+    sqlContext.createDataset(rdd)
   }
 
   /**
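Note: the toJSON hunk above swaps the blanket sqlContext.implicits._ import for the single string encoder and builds the result through SQLContext.createDataset instead of the removed rdd.toDS path. A minimal sketch of the same pattern at the user level, assuming a spark-shell style session where sc and sqlContext are already defined:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Dataset

    // Turn an RDD[String] into a Dataset[String] the way toJSON now does
    // internally: one explicit encoder import, then createDataset.
    import sqlContext.implicits.newStringEncoder
    val strings: RDD[String] = sc.parallelize(Seq("""{"a":1}""", """{"a":2}"""))
    val ds: Dataset[String] = sqlContext.createDataset(strings)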
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DatasetHolder.scala b/sql/core/src/main/scala/org/apache/spark/sql/DatasetHolder.scala
index 08097e9f02..47b81c17a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DatasetHolder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DatasetHolder.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql
 
 /**
- * A container for a [[Dataset]], used for implicit conversions.
+ * A container for a [[Dataset]], used for implicit conversions in Scala.
  *
  * To use this, import implicit conversions in SQL:
  * {{{
@@ -32,4 +32,10 @@ case class DatasetHolder[T] private[sql](private val ds: Dataset[T]) {
   // This is declared with parentheses to prevent the Scala compiler from treating
   // `rdd.toDS("1")` as invoking this toDS and then apply on the returned Dataset.
   def toDS(): Dataset[T] = ds
+
+  // This is declared with parentheses to prevent the Scala compiler from treating
+  // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
+  def toDF(): DataFrame = ds.toDF()
+
+  def toDF(colNames: String*): DataFrame = ds.toDF(colNames : _*)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
index e23d5e1261..fd814e0f28 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
@@ -147,75 +147,4 @@ abstract class SQLImplicits {
    */
   implicit def symbolToColumn(s: Symbol): ColumnName = new ColumnName(s.name)
 
-  /**
-   * Creates a DataFrame from an RDD of Product (e.g. case classes, tuples).
-   * @since 1.3.0
-   */
-  implicit def rddToDataFrameHolder[A <: Product : TypeTag](rdd: RDD[A]): DataFrameHolder = {
-    DataFrameHolder(_sqlContext.createDataFrame(rdd))
-  }
-
-  /**
-   * Creates a DataFrame from a local Seq of Product.
-   * @since 1.3.0
-   */
-  implicit def localSeqToDataFrameHolder[A <: Product : TypeTag](data: Seq[A]): DataFrameHolder =
-  {
-    DataFrameHolder(_sqlContext.createDataFrame(data))
-  }
-
-  // Do NOT add more implicit conversions for primitive types.
-  // They are likely to break source compatibility by making existing implicit conversions
-  // ambiguous. In particular, RDD[Double] is dangerous because of [[DoubleRDDFunctions]].
-
-  /**
-   * Creates a single column DataFrame from an RDD[Int].
-   * @since 1.3.0
-   */
-  implicit def intRddToDataFrameHolder(data: RDD[Int]): DataFrameHolder = {
-    val dataType = IntegerType
-    val rows = data.mapPartitions { iter =>
-      val row = new SpecificMutableRow(dataType :: Nil)
-      iter.map { v =>
-        row.setInt(0, v)
-        row: InternalRow
-      }
-    }
-    DataFrameHolder(
-      _sqlContext.internalCreateDataFrame(rows, StructType(StructField("_1", dataType) :: Nil)))
-  }
-
-  /**
-   * Creates a single column DataFrame from an RDD[Long].
-   * @since 1.3.0
-   */
-  implicit def longRddToDataFrameHolder(data: RDD[Long]): DataFrameHolder = {
-    val dataType = LongType
-    val rows = data.mapPartitions { iter =>
-      val row = new SpecificMutableRow(dataType :: Nil)
-      iter.map { v =>
-        row.setLong(0, v)
-        row: InternalRow
-      }
-    }
-    DataFrameHolder(
-      _sqlContext.internalCreateDataFrame(rows, StructType(StructField("_1", dataType) :: Nil)))
-  }
-
-  /**
-   * Creates a single column DataFrame from an RDD[String].
-   * @since 1.3.0
-   */
-  implicit def stringRddToDataFrameHolder(data: RDD[String]): DataFrameHolder = {
-    val dataType = StringType
-    val rows = data.mapPartitions { iter =>
-      val row = new SpecificMutableRow(dataType :: Nil)
-      iter.map { v =>
-        row.update(0, UTF8String.fromString(v))
-        row: InternalRow
-      }
-    }
-    DataFrameHolder(
-      _sqlContext.internalCreateDataFrame(rows, StructType(StructField("_1", dataType) :: Nil)))
-  }
 }
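Note: with the DataFrameHolder conversions above deleted, rdd.toDF() and seq.toDF() now reach DataFrame through the encoder-based DatasetHolder route, which is why DatasetHolder gains the two toDF overloads. A sketch of the surviving call path, assuming sc and sqlContext are in scope as in spark-shell; the single-column name changes from "_1" to "value", as the DataFrameSuite hunk below confirms:

    import sqlContext.implicits._

    val ints = sc.parallelize(1 to 10)
    val df = ints.toDF()        // RDD[Int] => DatasetHolder[Int] => DataFrame
    df.printSchema()            // one column, now named "value" rather than "_1"
    val named = ints.toDF("n")  // DatasetHolder.toDF(colNames: String*)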
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 199e138abf..df2a287030 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -612,7 +612,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     val longString = Array.fill(21)("1").mkString
     val df = sparkContext.parallelize(Seq("1", longString)).toDF()
     val expectedAnswerForFalse = """+---------------------+
-                                   ||_1                   |
+                                   ||value                |
                                    |+---------------------+
                                    ||1                    |
                                    ||111111111111111111111|
@@ -620,7 +620,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
                                    |""".stripMargin
     assert(df.showString(10, false) === expectedAnswerForFalse)
     val expectedAnswerForTrue = """+--------------------+
-                                  ||                  _1|
+                                  ||               value|
                                   |+--------------------+
                                   ||                   1|
                                   ||11111111111111111...|
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index b765fd8b66..9f2233d5d8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1621,15 +1621,15 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-10215 Div of Decimal returns null") {
-    val d = Decimal(1.12321)
+    val d = Decimal(1.12321).toBigDecimal
     val df = Seq((d, 1)).toDF("a", "b")
 
     checkAnswer(
       df.selectExpr("b * a / b"),
-      Seq(Row(d.toBigDecimal)))
+      Seq(Row(d)))
     checkAnswer(
       df.selectExpr("b * a / b / b"),
-      Seq(Row(d.toBigDecimal)))
+      Seq(Row(d)))
     checkAnswer(
       df.selectExpr("b * a + b"),
       Seq(Row(BigDecimal(2.12321))))
@@ -1638,7 +1638,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       Seq(Row(BigDecimal(0.12321))))
     checkAnswer(
       df.selectExpr("b * a * b"),
-      Seq(Row(d.toBigDecimal)))
+      Seq(Row(d)))
   }
 
   test("precision smaller than scale") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
index 9f159d1e1e..9680f3a008 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode
 import org.apache.spark.sql.test.SharedSQLContext
 
 class ExchangeSuite extends SparkPlanTest with SharedSQLContext {
-  import testImplicits.localSeqToDataFrameHolder
+  import testImplicits._
 
   test("shuffling UnsafeRows in exchange") {
     val input = (1 to 1000).map(Tuple1.apply)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
index cb6d68dc3a..778477660e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
@@ -30,7 +30,8 @@ import org.apache.spark.sql.types._
  * sorted by a reference implementation ([[ReferenceSort]]).
  */
 class SortSuite extends SparkPlanTest with SharedSQLContext {
-  import testImplicits.localSeqToDataFrameHolder
+  import testImplicits.newProductEncoder
+  import testImplicits.localSeqToDatasetHolder
 
   test("basic sorting using ExternalSort") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 0940878e38..9e04caf8ba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -126,7 +126,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
   test("decimal type") {
     // Casting is required here because ScalaReflection can't capture decimal precision information.
     val df = (1 to 10)
-      .map(i => Tuple1(Decimal(i, 15, 10)))
+      .map(i => Tuple1(Decimal(i, 15, 10).toJavaBigDecimal))
       .toDF("dec")
       .select($"dec" cast DecimalType(15, 10))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
index cf2681050e..3cb3ef1ffa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
@@ -29,7 +29,8 @@ import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 
 class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
-  import testImplicits.localSeqToDataFrameHolder
+  import testImplicits.newProductEncoder
+  import testImplicits.localSeqToDatasetHolder
 
   private lazy val myUpperCaseData = sqlContext.createDataFrame(
     sparkContext.parallelize(Seq(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index ab4047df1e..5fe85eaef2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -950,9 +950,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     assert(checkAddFileRDD.first())
   }
 
-  case class LogEntry(filename: String, message: String)
-  case class LogFile(name: String)
-
   createQueryTest("dynamic_partition",
     """
       |DROP TABLE IF EXISTS dynamic_part_table;
@@ -1249,3 +1246,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
 
 // for SPARK-2180 test
 case class HavingRow(key: Int, value: String, attr: Int)
+
+case class LogEntry(filename: String, message: String)
+case class LogFile(name: String)
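Note: the SortSuite and InnerJoinSuite hunks above swap import testImplicits.localSeqToDataFrameHolder for the Dataset-era pair newProductEncoder and localSeqToDatasetHolder: the Seq-to-holder conversion is now encoder-based, so a Seq of tuples also needs a product encoder in scope. A minimal sketch of the pattern the suites rely on, assuming a suite mixing in SharedSQLContext (the input value is taken from the ExchangeSuite test above; the column name is illustrative):

    import testImplicits.newProductEncoder        // implicit Encoder for tuples/case classes
    import testImplicits.localSeqToDatasetHolder  // Seq[T] => DatasetHolder[T]

    val input = (1 to 1000).map(Tuple1.apply)
    val df = input.toDF("a")  // via the new DatasetHolder.toDF(colNames: String*)

The HiveQuerySuite hunks follow the same theme: LogEntry and LogFile move from inside the suite to the top level of the file, matching the existing top-level HavingRow, presumably because case classes nested inside a suite are awkward for implicit encoder resolution.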
.toDF("d1", "d2") .select($"d1".cast(DecimalType(10, 5)).as("d")) .registerTempTable("dn") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index c395d361a1..cc412241fb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -79,7 +79,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { } test("Read/write all types with non-primitive type") { - val data = (0 to 255).map { i => + val data: Seq[AllDataTypesWithNonPrimitiveType] = (0 to 255).map { i => AllDataTypesWithNonPrimitiveType( s"$i", i, i.toLong, i.toFloat, i.toDouble, i.toShort, i.toByte, i % 2 == 0, 0 until i, |