diff options
author | Reynold Xin <rxin@databricks.com> | 2016-03-15 14:57:54 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-03-15 14:57:54 -0700 |
commit | 643649dcbfabc5d6952c2ecfb98286324c887665 (patch) | |
tree | 571751a4a87ba11fbd0e5a5c582987a2ca53b887 | |
parent | 41eaabf5935052fc69f1657368fa17529d18f84b (diff) | |
download | spark-643649dcbfabc5d6952c2ecfb98286324c887665.tar.gz spark-643649dcbfabc5d6952c2ecfb98286324c887665.tar.bz2 spark-643649dcbfabc5d6952c2ecfb98286324c887665.zip |
[SPARK-13895][SQL] DataFrameReader.text should return Dataset[String]
## What changes were proposed in this pull request?
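This patch changes DataFrameReader.text()'s return type from DataFrame to Dataset[String]. The underlying schema is still a single string column named "value", so callers that need a DataFrame can call `.toDF()` on the result.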
Closes #11731.
## How was this patch tested?
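Updated the existing tests to reflect the change: JavaDataFrameSuite.testTextLoad now expects Dataset<String>, and TextSuite converts the result with toDF() before passing it to verifyFrame.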
Author: Reynold Xin <rxin@databricks.com>
Closes #11739 from rxin/SPARK-13895.
3 files changed, 16 insertions, 12 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 57c978bec8..ef85f1db89 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -399,8 +399,10 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging { } /** - * Loads a text file and returns a [[DataFrame]] with a single string column named "value". - * Each line in the text file is a new row in the resulting DataFrame. For example: + * Loads a text file and returns a [[Dataset]] of String. The underlying schema of the Dataset + * contains a single string column named "value". + * + * Each line in the text file is a new row in the resulting Dataset. For example: * {{{ * // Scala: * sqlContext.read.text("/path/to/spark/README.md") @@ -410,10 +412,12 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging { * }}} * * @param paths input path - * @since 1.6.0 + * @since 2.0.0 */ @scala.annotation.varargs - def text(paths: String*): DataFrame = format("text").load(paths : _*) + def text(paths: String*): Dataset[String] = { + format("text").load(paths : _*).as[String](sqlContext.implicits.newStringEncoder) + } /////////////////////////////////////////////////////////////////////////////////////// // Builder pattern config options diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java index 42554720ed..7fe17e0cf7 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java @@ -316,14 +316,14 @@ public class JavaDataFrameSuite { @Test public void testTextLoad() { - Dataset<Row> df1 = context.read().text( + Dataset<String> ds1 = context.read().text( 
Thread.currentThread().getContextClassLoader().getResource("text-suite.txt").toString()); - Assert.assertEquals(4L, df1.count()); + Assert.assertEquals(4L, ds1.count()); - Dataset<Row> df2 = context.read().text( + Dataset<String> ds2 = context.read().text( Thread.currentThread().getContextClassLoader().getResource("text-suite.txt").toString(), Thread.currentThread().getContextClassLoader().getResource("text-suite2.txt").toString()); - Assert.assertEquals(5L, df2.count()); + Assert.assertEquals(5L, ds2.count()); } @Test diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index ee398721c0..47330f1db3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -37,7 +37,7 @@ class TextSuite extends QueryTest with SharedSQLContext { } test("SQLContext.read.text() API") { - verifyFrame(sqlContext.read.text(testFile)) + verifyFrame(sqlContext.read.text(testFile).toDF()) } test("SPARK-12562 verify write.text() can handle column name beyond `value`") { @@ -46,7 +46,7 @@ class TextSuite extends QueryTest with SharedSQLContext { val tempFile = Utils.createTempDir() tempFile.delete() df.write.text(tempFile.getCanonicalPath) - verifyFrame(sqlContext.read.text(tempFile.getCanonicalPath)) + verifyFrame(sqlContext.read.text(tempFile.getCanonicalPath).toDF()) Utils.deleteRecursively(tempFile) } @@ -75,7 +75,7 @@ class TextSuite extends QueryTest with SharedSQLContext { testDf.write.option("compression", codecName).mode(SaveMode.Overwrite).text(tempDirPath) val compressedFiles = new File(tempDirPath).listFiles() assert(compressedFiles.exists(_.getName.endsWith(s".txt$extension"))) - verifyFrame(sqlContext.read.text(tempDirPath)) + verifyFrame(sqlContext.read.text(tempDirPath).toDF()) } val errMsg = 
intercept[IllegalArgumentException] { @@ -103,7 +103,7 @@ class TextSuite extends QueryTest with SharedSQLContext { testDf.write.option("compression", "none").mode(SaveMode.Overwrite).text(tempDirPath) val compressedFiles = new File(tempDirPath).listFiles() assert(compressedFiles.exists(!_.getName.endsWith(".txt.gz"))) - verifyFrame(sqlContext.read.text(tempDirPath)) + verifyFrame(sqlContext.read.text(tempDirPath).toDF()) } finally { // Hadoop 1 doesn't have `Configuration.unset` hadoopConfiguration.clear() |