author     Wenchen Fan <wenchen@databricks.com>   2016-03-30 17:32:53 +0800
committer  Cheng Lian <lian@databricks.com>       2016-03-30 17:32:53 +0800
commit     816f359cf043ef719a0bc7df0506a3a830fff70d (patch)
tree       80834d82017e66a7b81e07ab8a8966767fa787a1 /sql
parent     7320f9bd190afb7639cd21e956e7300fdd84c0ee (diff)
[SPARK-14114][SQL] implement buildReader for text data source
## What changes were proposed in this pull request?

This PR implements `buildReader` for the text data source and enables it in the new data source code path.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #11934 from cloud-fan/text.
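For context, the new reader path is exercised through the ordinary read API: each input line becomes one row in a single `value` column of `StringType`, which is exactly what the `buildReader` added below produces as `UnsafeRow`s. A minimal usage sketch (the file path is a placeholder):

```scala
// Minimal sketch: reading text through the data source API (Spark 1.6/2.0-era).
val df = sqlContext.read.text("/path/to/logs.txt")  // placeholder path
df.filter(df("value").contains("ERROR")).count()    // filter on the single column
```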
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala  |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala   | 28
2 files changed, 29 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 20fda95154..4448796b16 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -59,7 +59,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
        if (files.fileFormat.toString == "TestFileFormat" ||
           files.fileFormat.isInstanceOf[parquet.DefaultSource] ||
           files.fileFormat.toString == "ORC" ||
-          files.fileFormat.isInstanceOf[json.DefaultSource]) &&
+          files.fileFormat.isInstanceOf[json.DefaultSource] ||
+          files.fileFormat.isInstanceOf[text.DefaultSource]) &&
           files.sqlContext.conf.useFileScan =>
       // Filters on this relation fall into four categories based on where we can use them to avoid
       // reading unneeded data:
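This whitelist is a temporary migration gate: a format plans through the new `FileSourceStrategy` scan only if it appears here *and* the file-scan flag is on. A hedged sketch of toggling that flag; the conf key name is an assumption about this era of the codebase (`useFileScan` is the `SQLConf` accessor referenced in the guard above):

```scala
// Assumed conf key backing SQLConf.useFileScan; verify against SQLConf.scala.
sqlContext.setConf("spark.sql.sources.fileScan", "true")
// With the flag on and text.DefaultSource whitelisted, a plain text read
// should now plan through FileSourceStrategy:
sqlContext.read.text("/path/to/logs.txt").explain()  // placeholder path
```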
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index 5cfc9e9afa..d6ab5fc56e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.datasources.text
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
@@ -30,7 +31,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
-import org.apache.spark.sql.execution.datasources.CompressionCodecs
+import org.apache.spark.sql.execution.datasources.{CompressionCodecs, HadoopFileLinesReader, PartitionedFile}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.SerializableConfiguration
@@ -125,6 +126,31 @@ class DefaultSource extends FileFormat with DataSourceRegister {
      }
    }
  }
+
+  override def buildReader(
+      sqlContext: SQLContext,
+      partitionSchema: StructType,
+      dataSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
+    val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+    val broadcastedConf =
+      sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
+
+    file => {
+      val unsafeRow = new UnsafeRow(1)
+      val bufferHolder = new BufferHolder(unsafeRow)
+      val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1)
+
+      new HadoopFileLinesReader(file, broadcastedConf.value.value).map { line =>
+        // Writes to an UnsafeRow directly
+        bufferHolder.reset()
+        unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
+        unsafeRow.setTotalSize(bufferHolder.totalSize())
+        unsafeRow
+      }
+    }
+  }
}
class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemptContext)
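`buildReader` returns a closure that is invoked once per `PartitionedFile`; the `UnsafeRow`/`BufferHolder`/`UnsafeRowWriter` trio is allocated per file and then reused for every line, so the iterator re-yields the same mutated row instance. A standalone sketch of that reuse pattern, using the same codegen helpers (and the same constructors) the diff calls:

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}

// Sketch: turn raw line bytes into single-column UnsafeRows, reusing one buffer.
def linesToRows(lines: Iterator[Array[Byte]]): Iterator[UnsafeRow] = {
  val row = new UnsafeRow(1)                  // one field: the text line
  val holder = new BufferHolder(row)          // growable backing byte buffer
  val writer = new UnsafeRowWriter(holder, 1)
  lines.map { bytes =>
    holder.reset()                            // rewind the buffer, keep the allocation
    writer.write(0, bytes, 0, bytes.length)   // copy bytes into field 0
    row.setTotalSize(holder.totalSize())      // record the variable-length size
    row                                       // NOTE: same instance yielded every time
  }
}
```

Because the row is reused, any consumer that buffers rows across iterations must call `row.copy()` first; Spark's own operators already copy where needed, which is what makes this per-file reuse safe.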