diff options
author | Reynold Xin <rxin@databricks.com> | 2016-03-21 17:17:25 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-03-21 17:17:25 -0700 |
commit | b3e5af62a1c9e11556c4721164e6539d7ecce8e7 (patch) | |
tree | c87ffb5cc56c939dbe81791fdcdaa2bc4d6598b3 /sql/core/src/main | |
parent | 5e86e9262fc637dc58f487ae32e6d0a340b173ce (diff) | |
download | spark-b3e5af62a1c9e11556c4721164e6539d7ecce8e7.tar.gz spark-b3e5af62a1c9e11556c4721164e6539d7ecce8e7.tar.bz2 spark-b3e5af62a1c9e11556c4721164e6539d7ecce8e7.zip |
[SPARK-13898][SQL] Merge DatasetHolder and DataFrameHolder
## What changes were proposed in this pull request?
This patch merges DatasetHolder and DataFrameHolder. This makes more sense because DataFrame/Dataset are now one class.
In addition, fixed some minor issues with pull request #11732.
## How was this patch tested?
Updated existing unit tests that test these implicits.
Author: Reynold Xin <rxin@databricks.com>
Closes #11737 from rxin/SPARK-13898.
Diffstat (limited to 'sql/core/src/main')
4 files changed, 12 insertions, 116 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameHolder.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameHolder.scala deleted file mode 100644 index 3b30337f1f..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameHolder.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.spark.sql - -/** - * A container for a [[DataFrame]], used for implicit conversions. - * - * To use this, import implicit conversions in SQL: - * {{{ - * import sqlContext.implicits._ - * }}} - * - * @since 1.3.0 - */ -case class DataFrameHolder private[sql](private val df: DataFrame) { - - // This is declared with parentheses to prevent the Scala compiler from treating - // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame. - def toDF(): DataFrame = df - - def toDF(colNames: String*): DataFrame = df.toDF(colNames : _*) -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 295cb67eb4..be0dfe7c33 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2196,14 +2196,12 @@ class Dataset[T] private[sql]( def write: DataFrameWriter = new DataFrameWriter(toDF()) /** - * Returns the content of the [[Dataset]] as a [[Dataset]] of JSON strings. - * - * @group basic - * @since 1.6.0 + * Returns the content of the [[Dataset]] as a Dataset of JSON strings. + * @since 2.0.0 */ def toJSON: Dataset[String] = { val rowSchema = this.schema - val rdd = queryExecution.toRdd.mapPartitions { iter => + val rdd: RDD[String] = queryExecution.toRdd.mapPartitions { iter => val writer = new CharArrayWriter() // create the Generator without separator inserted between 2 records val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null) @@ -2225,8 +2223,8 @@ class Dataset[T] private[sql]( } } } - import sqlContext.implicits._ - rdd.toDS + import sqlContext.implicits.newStringEncoder + sqlContext.createDataset(rdd) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DatasetHolder.scala b/sql/core/src/main/scala/org/apache/spark/sql/DatasetHolder.scala index 08097e9f02..47b81c17a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DatasetHolder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DatasetHolder.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql /** - * A container for a [[Dataset]], used for implicit conversions. + * A container for a [[Dataset]], used for implicit conversions in Scala. * * To use this, import implicit conversions in SQL: * {{{ @@ -32,4 +32,10 @@ case class DatasetHolder[T] private[sql](private val ds: Dataset[T]) { // This is declared with parentheses to prevent the Scala compiler from treating // `rdd.toDS("1")` as invoking this toDS and then apply on the returned Dataset. def toDS(): Dataset[T] = ds + + // This is declared with parentheses to prevent the Scala compiler from treating + // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame. + def toDF(): DataFrame = ds.toDF() + + def toDF(colNames: String*): DataFrame = ds.toDF(colNames : _*) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala index e23d5e1261..fd814e0f28 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala @@ -147,75 +147,4 @@ abstract class SQLImplicits { */ implicit def symbolToColumn(s: Symbol): ColumnName = new ColumnName(s.name) - /** - * Creates a DataFrame from an RDD of Product (e.g. case classes, tuples). - * @since 1.3.0 - */ - implicit def rddToDataFrameHolder[A <: Product : TypeTag](rdd: RDD[A]): DataFrameHolder = { - DataFrameHolder(_sqlContext.createDataFrame(rdd)) - } - - /** - * Creates a DataFrame from a local Seq of Product. - * @since 1.3.0 - */ - implicit def localSeqToDataFrameHolder[A <: Product : TypeTag](data: Seq[A]): DataFrameHolder = - { - DataFrameHolder(_sqlContext.createDataFrame(data)) - } - - // Do NOT add more implicit conversions for primitive types. - // They are likely to break source compatibility by making existing implicit conversions - // ambiguous. In particular, RDD[Double] is dangerous because of [[DoubleRDDFunctions]]. - - /** - * Creates a single column DataFrame from an RDD[Int]. - * @since 1.3.0 - */ - implicit def intRddToDataFrameHolder(data: RDD[Int]): DataFrameHolder = { - val dataType = IntegerType - val rows = data.mapPartitions { iter => - val row = new SpecificMutableRow(dataType :: Nil) - iter.map { v => - row.setInt(0, v) - row: InternalRow - } - } - DataFrameHolder( - _sqlContext.internalCreateDataFrame(rows, StructType(StructField("_1", dataType) :: Nil))) - } - - /** - * Creates a single column DataFrame from an RDD[Long]. - * @since 1.3.0 - */ - implicit def longRddToDataFrameHolder(data: RDD[Long]): DataFrameHolder = { - val dataType = LongType - val rows = data.mapPartitions { iter => - val row = new SpecificMutableRow(dataType :: Nil) - iter.map { v => - row.setLong(0, v) - row: InternalRow - } - } - DataFrameHolder( - _sqlContext.internalCreateDataFrame(rows, StructType(StructField("_1", dataType) :: Nil))) - } - - /** - * Creates a single column DataFrame from an RDD[String]. - * @since 1.3.0 - */ - implicit def stringRddToDataFrameHolder(data: RDD[String]): DataFrameHolder = { - val dataType = StringType - val rows = data.mapPartitions { iter => - val row = new SpecificMutableRow(dataType :: Nil) - iter.map { v => - row.update(0, UTF8String.fromString(v)) - row: InternalRow - } - } - DataFrameHolder( - _sqlContext.internalCreateDataFrame(rows, StructType(StructField("_1", dataType) :: Nil))) - } } |