author     Wenchen Fan <wenchen@databricks.com>    2016-06-12 21:36:41 -0700
committer  Reynold Xin <rxin@databricks.com>       2016-06-12 21:36:41 -0700
commit     e2ab79d5ea00af45c083cc9a6607d2f0905f9908 (patch)
tree       750a843ae6ddba4abc3cf592a26960fb6de19189
parent     1f8f2b5c2a33e63367ea4881b5918f6bc0a6f52f (diff)
[SPARK-15898][SQL] DataFrameReader.text should return DataFrame
## What changes were proposed in this pull request?

We want to maintain API compatibility for DataFrameReader.text, and will introduce a new API called DataFrameReader.textFile which returns Dataset[String].

Affected PRs:
https://github.com/apache/spark/pull/11731
https://github.com/apache/spark/pull/13104
https://github.com/apache/spark/pull/13184

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #13604 from cloud-fan/revert.
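For readers skimming the commit, a minimal sketch (not part of the patch) of how the two readers differ after this change; the SparkSession setup and input path are assumptions for illustration only.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object TextVsTextFileSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("TextVsTextFileSketch").getOrCreate()

    // text() keeps returning a DataFrame: a string "value" column, plus partition columns if present.
    val df: DataFrame = spark.read.text("/path/to/input.txt")

    // textFile() is the new API: only the "value" column, typed as Dataset[String].
    val ds: Dataset[String] = spark.read.textFile("/path/to/input.txt")

    println(s"text() schema: ${df.schema.simpleString}")
    println(s"textFile() line count: ${ds.count()}")
    spark.stop()
  }
}
```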
-rw-r--r--  R/pkg/R/SQLContext.R                                                                     |  7
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java                         |  2
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaPageRank.java                       |  2
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaWordCount.java                      |  2
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java                  |  2
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java                   |  2
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala                      |  2
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala                      |  2
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala                    |  2
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala                    |  2
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/mllib/RankingMetricsExample.scala      |  2
-rw-r--r--  python/pyspark/sql/readwriter.py                                                         |  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala                       | 31
-rw-r--r--  sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java                 |  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala  | 20
15 files changed, 54 insertions(+), 36 deletions(-)
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 584bbbf0e4..e7e9e353f9 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -364,9 +364,10 @@ parquetFile <- function(x, ...) {
#' Create a SparkDataFrame from a text file.
#'
-#' Loads a text file and returns a SparkDataFrame with a single string column named "value".
-#' If the directory structure of the text files contains partitioning information, those are
-#' ignored in the resulting DataFrame.
+#' Loads text files and returns a SparkDataFrame whose schema starts with
+#' a string column named "value", and followed by partitioned columns if
+#' there are any.
+#'
#' Each line in the text file is a new row in the resulting SparkDataFrame.
#'
#' @param path Path of file to read. A vector of multiple paths is allowed.
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
index ded442096c..362bd4435e 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
@@ -126,7 +126,7 @@ public final class JavaHdfsLR {
.appName("JavaHdfsLR")
.getOrCreate();
- JavaRDD<String> lines = spark.read().text(args[0]).javaRDD();
+ JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
JavaRDD<DataPoint> points = lines.map(new ParsePoint()).cache();
int ITERATIONS = Integer.parseInt(args[1]);
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
index 128b5ab17c..ed0bb87657 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -82,7 +82,7 @@ public final class JavaPageRank {
// URL neighbor URL
// URL neighbor URL
// ...
- JavaRDD<String> lines = spark.read().text(args[0]).javaRDD();
+ JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
// Loads all URLs from input file and initialize their neighbors.
JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
index 1caee60e34..8f18604c07 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
@@ -46,7 +46,7 @@ public final class JavaWordCount {
.appName("JavaWordCount")
.getOrCreate();
- JavaRDD<String> lines = spark.read().text(args[0]).javaRDD();
+ JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
index 7f568f4e0d..739558e81f 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
@@ -87,7 +87,7 @@ public class JavaALSExample {
// $example on$
JavaRDD<Rating> ratingsRDD = spark
- .read().text("data/mllib/als/sample_movielens_ratings.txt").javaRDD()
+ .read().textFile("data/mllib/als/sample_movielens_ratings.txt").javaRDD()
.map(new Function<String, Rating>() {
public Rating call(String str) {
return Rating.parseRating(str);
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
index 55e591d0ce..e512979ac7 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
@@ -59,7 +59,7 @@ public class JavaSparkSQL {
System.out.println("=== Data source: RDD ===");
// Load a text file and convert each line to a Java Bean.
String file = "examples/src/main/resources/people.txt";
- JavaRDD<Person> people = spark.read().text(file).javaRDD().map(
+ JavaRDD<Person> people = spark.read().textFile(file).javaRDD().map(
new Function<String, Person>() {
@Override
public Person call(String line) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
index 84f133e011..05ac6cbcb3 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -72,7 +72,7 @@ object SparkHdfsLR {
.getOrCreate()
val inputPath = args(0)
- val lines = spark.read.text(inputPath).rdd
+ val lines = spark.read.textFile(inputPath).rdd
val points = lines.map(parsePoint).cache()
val ITERATIONS = args(1).toInt
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
index aa93c93c44..fec3160e9f 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
@@ -71,7 +71,7 @@ object SparkKMeans {
.appName("SparkKMeans")
.getOrCreate()
- val lines = spark.read.text(args(0)).rdd
+ val lines = spark.read.textFile(args(0)).rdd
val data = lines.map(parseVector _).cache()
val K = args(1).toInt
val convergeDist = args(2).toDouble
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala
index b7c363c7d4..d0b874c48d 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala
@@ -56,7 +56,7 @@ object SparkPageRank {
.getOrCreate()
val iters = if (args.length > 1) args(1).toInt else 10
- val lines = spark.read.text(args(0)).rdd
+ val lines = spark.read.textFile(args(0)).rdd
val links = lines.map{ s =>
val parts = s.split("\\s+")
(parts(0), parts(1))
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala
index da19ea9f10..bb5d163608 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala
@@ -50,7 +50,7 @@ object ALSExample {
import spark.implicits._
// $example on$
- val ratings = spark.read.text("data/mllib/als/sample_movielens_ratings.txt")
+ val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
.map(parseRating)
.toDF()
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/RankingMetricsExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/RankingMetricsExample.scala
index 781a934df6..d514891da7 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/RankingMetricsExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/RankingMetricsExample.scala
@@ -33,7 +33,7 @@ object RankingMetricsExample {
import spark.implicits._
// $example on$
// Read in the ratings data
- val ratings = spark.read.text("data/mllib/sample_movielens_data.txt").rdd.map { line =>
+ val ratings = spark.read.textFile("data/mllib/sample_movielens_data.txt").rdd.map { line =>
val fields = line.split("::")
Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble - 2.5)
}.cache()
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index f3182b237e..0f50f672a2 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -291,10 +291,10 @@ class DataFrameReader(object):
@ignore_unicode_prefix
@since(1.6)
def text(self, paths):
- """Loads a text file and returns a :class:`DataFrame` with a single string column named "value".
- If the directory structure of the text files contains partitioning information,
- those are ignored in the resulting DataFrame. To include partitioning information as
- columns, use ``read.format('text').load(...)``.
+ """
+ Loads text files and returns a :class:`DataFrame` whose schema starts with a
+ string column named "value", and followed by partitioned columns if there
+ are any.
Each line in the text file is a new row in the resulting DataFrame.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 078b63ee87..dfe31da3f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -450,29 +450,46 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
}
/**
+ * Loads text files and returns a [[DataFrame]] whose schema starts with a string column named
+ * "value", and followed by partitioned columns if there are any.
+ *
+ * Each line in the text files is a new row in the resulting DataFrame. For example:
+ * {{{
+ * // Scala:
+ * spark.read.text("/path/to/spark/README.md")
+ *
+ * // Java:
+ * spark.read().text("/path/to/spark/README.md")
+ * }}}
+ *
+ * @param paths input path
+ * @since 1.6.0
+ */
+ @scala.annotation.varargs
+ def text(paths: String*): DataFrame = format("text").load(paths : _*)
+
+ /**
* Loads text files and returns a [[Dataset]] of String. The underlying schema of the Dataset
* contains a single string column named "value".
*
* If the directory structure of the text files contains partitioning information, those are
- * ignored in the resulting Dataset. To include partitioning information as columns, use
- * `read.format("text").load("...")`.
+ * ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
*
* Each line in the text files is a new element in the resulting Dataset. For example:
* {{{
* // Scala:
- * spark.read.text("/path/to/spark/README.md")
+ * spark.read.textFile("/path/to/spark/README.md")
*
* // Java:
- * spark.read().text("/path/to/spark/README.md")
+ * spark.read().textFile("/path/to/spark/README.md")
* }}}
*
* @param paths input path
* @since 2.0.0
*/
@scala.annotation.varargs
- def text(paths: String*): Dataset[String] = {
- format("text").load(paths : _*).select("value")
- .as[String](sparkSession.implicits.newStringEncoder)
+ def textFile(paths: String*): Dataset[String] = {
+ text(paths : _*).select("value").as[String](sparkSession.implicits.newStringEncoder)
}
///////////////////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index 0152f3f85a..318b53cdbb 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -338,10 +338,10 @@ public class JavaDataFrameSuite {
@Test
public void testTextLoad() {
- Dataset<String> ds1 = spark.read().text(getResource("text-suite.txt"));
+ Dataset<String> ds1 = spark.read().textFile(getResource("text-suite.txt"));
Assert.assertEquals(4L, ds1.count());
- Dataset<String> ds2 = spark.read().text(
+ Dataset<String> ds2 = spark.read().textFile(
getResource("text-suite.txt"),
getResource("text-suite2.txt"));
Assert.assertEquals(5L, ds2.count());
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index 5695f6af7b..4ed517cb26 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -36,7 +36,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
}
test("SQLContext.read.text() API") {
- verifyFrame(spark.read.text(testFile).toDF())
+ verifyFrame(spark.read.text(testFile))
}
test("SPARK-12562 verify write.text() can handle column name beyond `value`") {
@@ -45,7 +45,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
val tempFile = Utils.createTempDir()
tempFile.delete()
df.write.text(tempFile.getCanonicalPath)
- verifyFrame(spark.read.text(tempFile.getCanonicalPath).toDF())
+ verifyFrame(spark.read.text(tempFile.getCanonicalPath))
Utils.deleteRecursively(tempFile)
}
@@ -64,20 +64,20 @@ class TextSuite extends QueryTest with SharedSQLContext {
}
}
- test("reading partitioned data using read.text()") {
+ test("reading partitioned data using read.textFile()") {
val partitionedData = Thread.currentThread().getContextClassLoader
.getResource("text-partitioned").toString
- val df = spark.read.text(partitionedData)
- val data = df.collect()
+ val ds = spark.read.textFile(partitionedData)
+ val data = ds.collect()
- assert(df.schema == new StructType().add("value", StringType))
+ assert(ds.schema == new StructType().add("value", StringType))
assert(data.length == 2)
}
- test("support for partitioned reading") {
+ test("support for partitioned reading using read.text()") {
val partitionedData = Thread.currentThread().getContextClassLoader
.getResource("text-partitioned").toString
- val df = spark.read.format("text").load(partitionedData)
+ val df = spark.read.text(partitionedData)
val data = df.filter("year = '2015'").select("value").collect()
assert(data(0) == Row("2015-test"))
@@ -94,7 +94,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
testDf.write.option("compression", codecName).mode(SaveMode.Overwrite).text(tempDirPath)
val compressedFiles = new File(tempDirPath).listFiles()
assert(compressedFiles.exists(_.getName.endsWith(s".txt$extension")))
- verifyFrame(spark.read.text(tempDirPath).toDF())
+ verifyFrame(spark.read.text(tempDirPath))
}
val errMsg = intercept[IllegalArgumentException] {
@@ -121,7 +121,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
.options(extraOptions).mode(SaveMode.Overwrite).text(tempDirPath)
val compressedFiles = new File(tempDirPath).listFiles()
assert(compressedFiles.exists(!_.getName.endsWith(".txt.gz")))
- verifyFrame(spark.read.options(extraOptions).text(tempDirPath).toDF())
+ verifyFrame(spark.read.options(extraOptions).text(tempDirPath))
}
}