author    Reynold Xin <rxin@databricks.com>    2016-03-15 14:57:54 -0700
committer Yin Huai <yhuai@databricks.com>     2016-03-15 14:57:54 -0700
commit    643649dcbfabc5d6952c2ecfb98286324c887665 (patch)
tree      571751a4a87ba11fbd0e5a5c582987a2ca53b887 /sql/core/src
parent    41eaabf5935052fc69f1657368fa17529d18f84b (diff)
[SPARK-13895][SQL] DataFrameReader.text should return Dataset[String]
## What changes were proposed in this pull request?

This patch changes DataFrameReader.text()'s return type from DataFrame to Dataset[String]. Closes #11731.

## How was this patch tested?

Updated existing integration tests to reflect the change.

Author: Reynold Xin <rxin@databricks.com>

Closes #11739 from rxin/SPARK-13895.
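At the call site, the change removes a layer of Row plumbing. A minimal sketch of the new usage, assuming a 2.0-era SQLContext built from an existing SparkContext `sc` (the input path is a placeholder):

```scala
import org.apache.spark.sql.{Dataset, SQLContext}

// Assumes an in-scope SparkContext named `sc`; the path is hypothetical.
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Previously text() returned a DataFrame, forcing callers to unpack Rows.
// With this patch it returns Dataset[String], so typed operators apply directly.
val lines: Dataset[String] = sqlContext.read.text("/path/to/README.md")
val lengths = lines.map(_.length) // Dataset[Int], no row.getString(0) needed
println(lengths.count())
```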
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala                      | 12
-rw-r--r--  sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java                 |  8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala  |  8
3 files changed, 16 insertions(+), 12 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 57c978bec8..ef85f1db89 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -399,8 +399,10 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
}
/**
- * Loads a text file and returns a [[DataFrame]] with a single string column named "value".
- * Each line in the text file is a new row in the resulting DataFrame. For example:
+ * Loads a text file and returns a [[Dataset]] of String. The underlying schema of the Dataset
+ * contains a single string column named "value".
+ *
+ * Each line in the text file is a new row in the resulting Dataset. For example:
* {{{
* // Scala:
* sqlContext.read.text("/path/to/spark/README.md")
@@ -410,10 +412,12 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
* }}}
*
* @param paths input path
- * @since 1.6.0
+ * @since 2.0.0
*/
@scala.annotation.varargs
- def text(paths: String*): DataFrame = format("text").load(paths : _*)
+ def text(paths: String*): Dataset[String] = {
+ format("text").load(paths : _*).as[String](sqlContext.implicits.newStringEncoder)
+ }
///////////////////////////////////////////////////////////////////////////////////////
// Builder pattern config options
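Note that the new body passes an explicit encoder rather than relying on an implicit import: `sqlContext.implicits.newStringEncoder` is referenced directly because no implicits are in scope inside DataFrameReader. A sketch of the equivalent call spelled out by hand (path hypothetical):

```scala
// What the new text() body effectively does: load the text source as a
// generic DataFrame, then view it as Dataset[String] with an explicit encoder.
val raw = sqlContext.read.format("text").load("/path/to/file.txt")
val typed: Dataset[String] = raw.as[String](sqlContext.implicits.newStringEncoder)
```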
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index 42554720ed..7fe17e0cf7 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -316,14 +316,14 @@ public class JavaDataFrameSuite {
@Test
public void testTextLoad() {
- Dataset<Row> df1 = context.read().text(
+ Dataset<String> ds1 = context.read().text(
Thread.currentThread().getContextClassLoader().getResource("text-suite.txt").toString());
- Assert.assertEquals(4L, df1.count());
+ Assert.assertEquals(4L, ds1.count());
- Dataset<Row> df2 = context.read().text(
+ Dataset<String> ds2 = context.read().text(
Thread.currentThread().getContextClassLoader().getResource("text-suite.txt").toString(),
Thread.currentThread().getContextClassLoader().getResource("text-suite2.txt").toString());
- Assert.assertEquals(5L, df2.count());
+ Assert.assertEquals(5L, ds2.count());
}
@Test
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index ee398721c0..47330f1db3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -37,7 +37,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
}
test("SQLContext.read.text() API") {
- verifyFrame(sqlContext.read.text(testFile))
+ verifyFrame(sqlContext.read.text(testFile).toDF())
}
test("SPARK-12562 verify write.text() can handle column name beyond `value`") {
@@ -46,7 +46,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
val tempFile = Utils.createTempDir()
tempFile.delete()
df.write.text(tempFile.getCanonicalPath)
- verifyFrame(sqlContext.read.text(tempFile.getCanonicalPath))
+ verifyFrame(sqlContext.read.text(tempFile.getCanonicalPath).toDF())
Utils.deleteRecursively(tempFile)
}
@@ -75,7 +75,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
testDf.write.option("compression", codecName).mode(SaveMode.Overwrite).text(tempDirPath)
val compressedFiles = new File(tempDirPath).listFiles()
assert(compressedFiles.exists(_.getName.endsWith(s".txt$extension")))
- verifyFrame(sqlContext.read.text(tempDirPath))
+ verifyFrame(sqlContext.read.text(tempDirPath).toDF())
}
val errMsg = intercept[IllegalArgumentException] {
@@ -103,7 +103,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
testDf.write.option("compression", "none").mode(SaveMode.Overwrite).text(tempDirPath)
val compressedFiles = new File(tempDirPath).listFiles()
assert(compressedFiles.exists(!_.getName.endsWith(".txt.gz")))
- verifyFrame(sqlContext.read.text(tempDirPath))
+ verifyFrame(sqlContext.read.text(tempDirPath).toDF())
} finally {
// Hadoop 1 doesn't have `Configuration.unset`
hadoopConfiguration.clear()
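The test-side edits are purely mechanical: `verifyFrame` presumably expects a DataFrame, and `Dataset[String].toDF()` recovers the untyped view while preserving the single "value" column. A small sketch of that round trip (path hypothetical):

```scala
import org.apache.spark.sql.{DataFrame, Dataset}

// Round trip used by the updated tests: read as Dataset[String],
// then drop back to a DataFrame for the frame-based assertion helper.
val ds: Dataset[String] = sqlContext.read.text("/path/to/text-suite.txt")
val df: DataFrame = ds.toDF()
assert(df.schema.fieldNames.sameElements(Array("value")))
```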