aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorCheng Lian <lian@databricks.com>2016-06-01 07:30:55 -0700
committerCheng Lian <lian@databricks.com>2016-06-01 07:30:55 -0700
commit1f43562daf9454428796317203d9dcc9030a46eb (patch)
treefe5ff282475b844acb8e7e9f7de6877c1eb797a1 /sql/core
parent6563d72b168c39115376e73788b48a2d60803d4e (diff)
downloadspark-1f43562daf9454428796317203d9dcc9030a46eb.tar.gz
spark-1f43562daf9454428796317203d9dcc9030a46eb.tar.bz2
spark-1f43562daf9454428796317203d9dcc9030a46eb.zip
[SPARK-14343][SQL] Proper column pruning for text data source
## What changes were proposed in this pull request? Text data source ignores requested schema, and may give wrong result when the only data column is not requested. This may happen when only partitioning column(s) are requested for a partitioned text table. ## How was this patch tested? New test case added in `TextSuite`. Author: Cheng Lian <lian@databricks.com> Closes #13431 from liancheng/spark-14343-partitioned-text-table.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala31
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala17
2 files changed, 35 insertions, 13 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index 1e5bce4a75..9c03ab28dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -92,20 +92,31 @@ class TextFileFormat extends FileFormat with DataSourceRegister {
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+ assert(
+ requiredSchema.length <= 1,
+ "Text data source only produces a single data column named \"value\".")
+
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
(file: PartitionedFile) => {
- val unsafeRow = new UnsafeRow(1)
- val bufferHolder = new BufferHolder(unsafeRow)
- val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1)
-
- new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map { line =>
- // Writes to an UnsafeRow directly
- bufferHolder.reset()
- unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
- unsafeRow.setTotalSize(bufferHolder.totalSize())
- unsafeRow
+ val reader = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
+
+ if (requiredSchema.isEmpty) {
+ val emptyUnsafeRow = new UnsafeRow(0)
+ reader.map(_ => emptyUnsafeRow)
+ } else {
+ val unsafeRow = new UnsafeRow(1)
+ val bufferHolder = new BufferHolder(unsafeRow)
+ val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1)
+
+ reader.map { line =>
+ // Writes to an UnsafeRow directly
+ bufferHolder.reset()
+ unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
+ unsafeRow.setTotalSize(bufferHolder.totalSize())
+ unsafeRow
+ }
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index b5e51e963f..7b6981f95e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -19,9 +19,6 @@ package org.apache.spark.sql.execution.datasources.text
import java.io.File
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
@@ -31,6 +28,7 @@ import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.Utils
class TextSuite extends QueryTest with SharedSQLContext {
+ import testImplicits._
test("reading text file") {
verifyFrame(spark.read.format("text").load(testFile))
@@ -126,6 +124,19 @@ class TextSuite extends QueryTest with SharedSQLContext {
}
}
+ test("SPARK-14343: select partitioning column") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ val ds1 = spark.range(1).selectExpr("CONCAT('val_', id)")
+ ds1.write.text(s"$path/part=a")
+ ds1.write.text(s"$path/part=b")
+
+ checkDataset(
+ spark.read.format("text").load(path).select($"part"),
+ Row("a"), Row("b"))
+ }
+ }
+
private def testFile: String = {
Thread.currentThread().getContextClassLoader.getResource("text-suite.txt").toString
}