path: root/sql/core/src
author     Jurriaan Pruis <email@jurriaanpruis.nl>  2016-05-18 16:15:09 -0700
committer  Reynold Xin <rxin@databricks.com>        2016-05-18 16:15:09 -0700
commit     32be51fba45f5e07a2a3520293c12dc7765a364d (patch)
tree       1f967ee3b668518ec4c23af817db0cc1b5bfaf21 /sql/core/src
parent     84b23453ddb0a97e3d81306de0a5dcb64f88bdd0 (diff)
[SPARK-15323][SPARK-14463][SQL] Fix reading of partitioned format=text datasets
https://issues.apache.org/jira/browse/SPARK-15323

I was using partitioned text datasets in Spark 1.6.1, but this broke in Spark 2.0.0. It would be logical if you could also write partitioned text datasets, but I'm not entirely sure how to solve that with the new Dataset implementation. It also doesn't work using `sqlContext.read.text`, since that method returns a `Dataset[String]`; see https://issues.apache.org/jira/browse/SPARK-14463 for that issue.

Author: Jurriaan Pruis <email@jurriaanpruis.nl>

Closes #13104 from jurriaan/fix-partitioned-text-reads.
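For context, a minimal usage sketch of the behavior this commit fixes. The directory layout mirrors the test resources added below; the path and the spark-shell style session are illustrative assumptions, not part of the commit:

  // Hypothetical layout (mirrors the test resources in this commit):
  //   /tmp/text-partitioned/year=2014/data.txt  -- contains "2014-test"
  //   /tmp/text-partitioned/year=2015/data.txt  -- contains "2015-test"

  // After the fix, format("text").load discovers the partition column:
  val df = spark.read.format("text").load("/tmp/text-partitioned")
  df.printSchema()                                   // value: string, year: integer
  df.filter("year = '2015'").select("value").show()  // 2015-test

  // read.text still returns a Dataset[String]; the added select("value")
  // drops partition columns so the String encoder keeps working:
  val ds = spark.read.text("/tmp/text-partitioned")
  ds.collect()  // Array("2014-test", "2015-test") -- order not guaranteed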
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala                            3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala  14
-rw-r--r--  sql/core/src/test/resources/text-partitioned/year=2014/data.txt                               1
-rw-r--r--  sql/core/src/test/resources/text-partitioned/year=2015/data.txt                               1
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala      20
5 files changed, 24 insertions, 15 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 011aff4ff6..e33fd831ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -457,7 +457,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
*/
@scala.annotation.varargs
def text(paths: String*): Dataset[String] = {
- format("text").load(paths : _*).as[String](sparkSession.implicits.newStringEncoder)
+ format("text").load(paths : _*).select("value")
+ .as[String](sparkSession.implicits.newStringEncoder)
}
///////////////////////////////////////////////////////////////////////////////////////
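Why the extra select: with partition discovery now active for the text source, `load` can return `value` plus partition columns, and `.as[String]` expects a single string column. A hedged sketch of the same projection in user code (the path is hypothetical):

  import spark.implicits._  // brings the String encoder into scope
  spark.read.format("text").load("/tmp/text-partitioned")
    .select("value")        // keep only the data column
    .as[String]             // would fail on a (value, year) schema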
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index f22c0241d9..f091615a9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -83,19 +83,6 @@ class DefaultSource extends FileFormat with DataSourceRegister {
}
}
- override private[sql] def buildReaderWithPartitionValues(
- sparkSession: SparkSession,
- dataSchema: StructType,
- partitionSchema: StructType,
- requiredSchema: StructType,
- filters: Seq[Filter],
- options: Map[String, String],
- hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
- // Text data source doesn't support partitioning. Here we simply delegate to `buildReader`.
- buildReader(
- sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
- }
-
override def buildReader(
sparkSession: SparkSession,
dataSchema: StructType,
@@ -152,4 +139,3 @@ class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemp
recordWriter.close(context)
}
}
-
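Removing this override is the heart of the fix: the text source had been short-circuiting partition handling by delegating straight to `buildReader`, which drops partition values. With the override gone, the default `buildReaderWithPartitionValues` inherited from `FileFormat` applies. A toy illustration of that idea (assumed names, not Spark's actual internals):

  // Toy sketch only: the inherited default appends each file's partition
  // values to every data row instead of silently discarding them.
  case class PartFile(rows: Iterator[Seq[Any]], partitionValues: Seq[Any])

  def readWithPartitions(file: PartFile): Iterator[Seq[Any]] =
    file.rows.map(_ ++ file.partitionValues)  // data cols ++ partition cols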
diff --git a/sql/core/src/test/resources/text-partitioned/year=2014/data.txt b/sql/core/src/test/resources/text-partitioned/year=2014/data.txt
new file mode 100644
index 0000000000..e2719428bb
--- /dev/null
+++ b/sql/core/src/test/resources/text-partitioned/year=2014/data.txt
@@ -0,0 +1 @@
+2014-test
diff --git a/sql/core/src/test/resources/text-partitioned/year=2015/data.txt b/sql/core/src/test/resources/text-partitioned/year=2015/data.txt
new file mode 100644
index 0000000000..b8c03daa8c
--- /dev/null
+++ b/sql/core/src/test/resources/text-partitioned/year=2015/data.txt
@@ -0,0 +1 @@
+2015-test
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index f61fce5d41..b5e51e963f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -65,6 +65,26 @@ class TextSuite extends QueryTest with SharedSQLContext {
}
}
+ test("reading partitioned data using read.text()") {
+ val partitionedData = Thread.currentThread().getContextClassLoader
+ .getResource("text-partitioned").toString
+ val df = spark.read.text(partitionedData)
+ val data = df.collect()
+
+ assert(df.schema == new StructType().add("value", StringType))
+ assert(data.length == 2)
+ }
+
+ test("support for partitioned reading") {
+ val partitionedData = Thread.currentThread().getContextClassLoader
+ .getResource("text-partitioned").toString
+ val df = spark.read.format("text").load(partitionedData)
+ val data = df.filter("year = '2015'").select("value").collect()
+
+ assert(data(0) == Row("2015-test"))
+ assert(data.length == 1)
+ }
+
test("SPARK-13503 Support to specify the option for compression codec for TEXT") {
val testDf = spark.read.text(testFile)
val extensionNameMap = Map("bzip2" -> ".bz2", "deflate" -> ".deflate", "gzip" -> ".gz")