Diffstat
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala                           3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala  14
-rw-r--r--  sql/core/src/test/resources/text-partitioned/year=2014/data.txt                              1
-rw-r--r--  sql/core/src/test/resources/text-partitioned/year=2015/data.txt                              1
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala      20
5 files changed, 24 insertions(+), 15 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 011aff4ff6..e33fd831ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -457,7 +457,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    */
   @scala.annotation.varargs
   def text(paths: String*): Dataset[String] = {
-    format("text").load(paths : _*).as[String](sparkSession.implicits.newStringEncoder)
+    format("text").load(paths : _*).select("value")
+      .as[String](sparkSession.implicits.newStringEncoder)
   }
 
   ///////////////////////////////////////////////////////////////////////////////////////
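Why the extra select("value"): once the text source participates in partition discovery, load() on a partitioned directory returns the value column plus any discovered partition columns, and .as[String] requires a single-column schema. A minimal sketch of the user-visible effect (the session setup and the /tmp path are illustrative assumptions, not part of this patch):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("text-partition-demo")
  .getOrCreate()
import spark.implicits._

// Assumed layout: /tmp/text-partitioned/year=2014/data.txt, year=2015/data.txt
val df = spark.read.format("text").load("/tmp/text-partitioned")
df.printSchema()  // value: string, plus the discovered partition column `year`

// Projecting to "value" first keeps text()'s Dataset[String] contract; without
// it, .as[String] would fail to encode the multi-column schema.
val ds = df.select("value").as[String]
```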
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index f22c0241d9..f091615a9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -83,19 +83,6 @@ class DefaultSource extends FileFormat with DataSourceRegister {
     }
   }
 
-  override private[sql] def buildReaderWithPartitionValues(
-      sparkSession: SparkSession,
-      dataSchema: StructType,
-      partitionSchema: StructType,
-      requiredSchema: StructType,
-      filters: Seq[Filter],
-      options: Map[String, String],
-      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
-    // Text data source doesn't support partitioning. Here we simply delegate to `buildReader`.
-    buildReader(
-      sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
-  }
-
   override def buildReader(
       sparkSession: SparkSession,
       dataSchema: StructType,
@@ -152,4 +139,3 @@ class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemp
     recordWriter.close(context)
   }
 }
-
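Removing this override is the core of the change: the text source now inherits FileFormat.buildReaderWithPartitionValues, which appends each file's partition values to the rows produced by buildReader. A rough sketch of that inherited behavior (simplified; the helper name is ours, and the real implementation also applies an UnsafeProjection to flatten the joined row into the output schema):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow

// Join every data row with the file's partition values so the output covers
// requiredSchema ++ partitionSchema. Sketch only, not Spark's actual code.
def appendPartitionValues(
    dataRows: Iterator[InternalRow],
    partitionValues: InternalRow): Iterator[InternalRow] = {
  val joinedRow = new JoinedRow()
  dataRows.map(row => joinedRow(row, partitionValues))
}
```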
diff --git a/sql/core/src/test/resources/text-partitioned/year=2014/data.txt b/sql/core/src/test/resources/text-partitioned/year=2014/data.txt
new file mode 100644
index 0000000000..e2719428bb
--- /dev/null
+++ b/sql/core/src/test/resources/text-partitioned/year=2014/data.txt
@@ -0,0 +1 @@
+2014-test
diff --git a/sql/core/src/test/resources/text-partitioned/year=2015/data.txt b/sql/core/src/test/resources/text-partitioned/year=2015/data.txt
new file mode 100644
index 0000000000..b8c03daa8c
--- /dev/null
+++ b/sql/core/src/test/resources/text-partitioned/year=2015/data.txt
@@ -0,0 +1 @@
+2015-test
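The two fixtures above use Hive-style column=value directory names, which is what partition discovery keys on. For reference, a sketch of generating the same shape outside the test resources (the /tmp path is an assumption):

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// Recreate <base>/year=YYYY/data.txt with one line per file, mirroring the
// checked-in fixtures ("2014-test", "2015-test").
val base = Paths.get("/tmp/text-partitioned")
Seq("2014", "2015").foreach { year =>
  val dir = base.resolve(s"year=$year")
  Files.createDirectories(dir)
  Files.write(dir.resolve("data.txt"),
    s"$year-test\n".getBytes(StandardCharsets.UTF_8))
}
```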
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index f61fce5d41..b5e51e963f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -65,6 +65,26 @@ class TextSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("reading partitioned data using read.text()") {
+    val partitionedData = Thread.currentThread().getContextClassLoader
+      .getResource("text-partitioned").toString
+    val df = spark.read.text(partitionedData)
+    val data = df.collect()
+
+    assert(df.schema == new StructType().add("value", StringType))
+    assert(data.length == 2)
+  }
+
+  test("support for partitioned reading") {
+    val partitionedData = Thread.currentThread().getContextClassLoader
+      .getResource("text-partitioned").toString
+    val df = spark.read.format("text").load(partitionedData)
+    val data = df.filter("year = '2015'").select("value").collect()
+
+    assert(data(0) == Row("2015-test"))
+    assert(data.length == 1)
+  }
+
   test("SPARK-13503 Support to specify the option for compression codec for TEXT") {
     val testDf = spark.read.text(testFile)
     val extensionNameMap = Map("bzip2" -> ".bz2", "deflate" -> ".deflate", "gzip" -> ".gz")
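Together the two new tests pin down the intended split: read.text() stays a single-column Dataset[String], while read.format("text").load() exposes the discovered partition column for filtering. A usage sketch under the same assumed /tmp layout as above:

```scala
// Dataset[String]: the partition column is projected away by text().
val lines = spark.read.text("/tmp/text-partitioned")
lines.show()  // 2014-test and 2015-test, no `year` column

// DataFrame: the discovered `year` column supports partition pruning.
val partitioned = spark.read.format("text").load("/tmp/text-partitioned")
partitioned.filter("year = '2015'").select("value").show()  // 2015-test
```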