From 643649dcbfabc5d6952c2ecfb98286324c887665 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Tue, 15 Mar 2016 14:57:54 -0700
Subject: [SPARK-13895][SQL] DataFrameReader.text should return Dataset[String]

## What changes were proposed in this pull request?

This patch changes DataFrameReader.text()'s return type from DataFrame to Dataset[String].

Closes #11731.

## How was this patch tested?

Updated existing integration tests to reflect the change.
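To illustrate the new shape of the API (a usage sketch only; the README path and the surrounding `sqlContext` value are assumed for illustration and are not part of this patch):

```scala
import org.apache.spark.sql.Dataset

// After this patch, text() yields Dataset[String] instead of DataFrame,
// so each line of the file is available directly as a plain String.
val lines: Dataset[String] = sqlContext.read.text("/path/to/spark/README.md")

// Typed operations apply without first projecting the "value" column;
// the implicit String encoder comes from sqlContext.implicits.
import sqlContext.implicits._
val words: Dataset[String] = lines.flatMap(_.split(" "))
println(words.count())
```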
Author: Reynold Xin

Closes #11739 from rxin/SPARK-13895.
---
 .../main/scala/org/apache/spark/sql/DataFrameReader.scala  | 12 ++++++++----
 .../java/test/org/apache/spark/sql/JavaDataFrameSuite.java |  8 ++++----
 .../spark/sql/execution/datasources/text/TextSuite.scala   |  8 ++++----
 3 files changed, 16 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 57c978bec8..ef85f1db89 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -399,8 +399,10 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
   }
 
   /**
-   * Loads a text file and returns a [[DataFrame]] with a single string column named "value".
-   * Each line in the text file is a new row in the resulting DataFrame. For example:
+   * Loads a text file and returns a [[Dataset]] of String. The underlying schema of the Dataset
+   * contains a single string column named "value".
+   *
+   * Each line in the text file is a new row in the resulting Dataset. For example:
    * {{{
    *   // Scala:
    *   sqlContext.read.text("/path/to/spark/README.md")
@@ -410,10 +412,12 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * }}}
    *
    * @param paths input path
-   * @since 1.6.0
+   * @since 2.0.0
    */
   @scala.annotation.varargs
-  def text(paths: String*): DataFrame = format("text").load(paths : _*)
+  def text(paths: String*): Dataset[String] = {
+    format("text").load(paths : _*).as[String](sqlContext.implicits.newStringEncoder)
+  }
 
   ///////////////////////////////////////////////////////////////////////////////////////
   // Builder pattern config options
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index 42554720ed..7fe17e0cf7 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -316,14 +316,14 @@ public class JavaDataFrameSuite {
 
   @Test
   public void testTextLoad() {
-    Dataset<Row> df1 = context.read().text(
+    Dataset<String> ds1 = context.read().text(
       Thread.currentThread().getContextClassLoader().getResource("text-suite.txt").toString());
-    Assert.assertEquals(4L, df1.count());
+    Assert.assertEquals(4L, ds1.count());
 
-    Dataset<Row> df2 = context.read().text(
+    Dataset<String> ds2 = context.read().text(
       Thread.currentThread().getContextClassLoader().getResource("text-suite.txt").toString(),
       Thread.currentThread().getContextClassLoader().getResource("text-suite2.txt").toString());
-    Assert.assertEquals(5L, df2.count());
+    Assert.assertEquals(5L, ds2.count());
   }
 
   @Test
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index ee398721c0..47330f1db3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -37,7 +37,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SQLContext.read.text() API") {
-    verifyFrame(sqlContext.read.text(testFile))
+    verifyFrame(sqlContext.read.text(testFile).toDF())
   }
 
   test("SPARK-12562 verify write.text() can handle column name beyond `value`") {
@@ -46,7 +46,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
     val tempFile = Utils.createTempDir()
     tempFile.delete()
     df.write.text(tempFile.getCanonicalPath)
-    verifyFrame(sqlContext.read.text(tempFile.getCanonicalPath))
+    verifyFrame(sqlContext.read.text(tempFile.getCanonicalPath).toDF())
     Utils.deleteRecursively(tempFile)
   }
 
@@ -75,7 +75,7 @@
       testDf.write.option("compression", codecName).mode(SaveMode.Overwrite).text(tempDirPath)
       val compressedFiles = new File(tempDirPath).listFiles()
       assert(compressedFiles.exists(_.getName.endsWith(s".txt$extension")))
-      verifyFrame(sqlContext.read.text(tempDirPath))
+      verifyFrame(sqlContext.read.text(tempDirPath).toDF())
     }
 
     val errMsg = intercept[IllegalArgumentException] {
@@ -103,7 +103,7 @@
       testDf.write.option("compression", "none").mode(SaveMode.Overwrite).text(tempDirPath)
       val compressedFiles = new File(tempDirPath).listFiles()
       assert(compressedFiles.exists(!_.getName.endsWith(".txt.gz")))
-      verifyFrame(sqlContext.read.text(tempDirPath))
+      verifyFrame(sqlContext.read.text(tempDirPath).toDF())
     } finally {
       // Hadoop 1 doesn't have `Configuration.unset`
       hadoopConfiguration.clear()
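For call sites that still need the untyped view, the returned Dataset[String] converts back explicitly, which is what the updated tests above do (a sketch; the input path is illustrative):

```scala
import org.apache.spark.sql.DataFrame

// read.text() now returns Dataset[String]; toDF() recovers the
// single-column DataFrame view (column name "value") where one is required.
val df: DataFrame = sqlContext.read.text("/path/to/file.txt").toDF()
```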