diff options
Diffstat (limited to 'sql')
5 files changed, 24 insertions, 15 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 011aff4ff6..e33fd831ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -457,7 +457,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { */ @scala.annotation.varargs def text(paths: String*): Dataset[String] = { - format("text").load(paths : _*).as[String](sparkSession.implicits.newStringEncoder) + format("text").load(paths : _*).select("value") + .as[String](sparkSession.implicits.newStringEncoder) } /////////////////////////////////////////////////////////////////////////////////////// diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala index f22c0241d9..f091615a9a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala @@ -83,19 +83,6 @@ class DefaultSource extends FileFormat with DataSourceRegister { } } - override private[sql] def buildReaderWithPartitionValues( - sparkSession: SparkSession, - dataSchema: StructType, - partitionSchema: StructType, - requiredSchema: StructType, - filters: Seq[Filter], - options: Map[String, String], - hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - // Text data source doesn't support partitioning. Here we simply delegate to `buildReader`. - buildReader( - sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf) - } - override def buildReader( sparkSession: SparkSession, dataSchema: StructType, @@ -152,4 +139,3 @@ class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemp recordWriter.close(context) } } - diff --git a/sql/core/src/test/resources/text-partitioned/year=2014/data.txt b/sql/core/src/test/resources/text-partitioned/year=2014/data.txt new file mode 100644 index 0000000000..e2719428bb --- /dev/null +++ b/sql/core/src/test/resources/text-partitioned/year=2014/data.txt @@ -0,0 +1 @@ +2014-test diff --git a/sql/core/src/test/resources/text-partitioned/year=2015/data.txt b/sql/core/src/test/resources/text-partitioned/year=2015/data.txt new file mode 100644 index 0000000000..b8c03daa8c --- /dev/null +++ b/sql/core/src/test/resources/text-partitioned/year=2015/data.txt @@ -0,0 +1 @@ +2015-test diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index f61fce5d41..b5e51e963f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -65,6 +65,26 @@ class TextSuite extends QueryTest with SharedSQLContext { } } + test("reading partitioned data using read.text()") { + val partitionedData = Thread.currentThread().getContextClassLoader + .getResource("text-partitioned").toString + val df = spark.read.text(partitionedData) + val data = df.collect() + + assert(df.schema == new StructType().add("value", StringType)) + assert(data.length == 2) + } + + test("support for partitioned reading") { + val partitionedData = Thread.currentThread().getContextClassLoader + .getResource("text-partitioned").toString + val df = spark.read.format("text").load(partitionedData) + val data = df.filter("year = '2015'").select("value").collect() + + assert(data(0) == Row("2015-test")) + assert(data.length == 1) + } + test("SPARK-13503 Support to specify the option for compression codec for TEXT") { val testDf = spark.read.text(testFile) val extensionNameMap = Map("bzip2" -> ".bz2", "deflate" -> ".deflate", "gzip" -> ".gz") |