author    Wenchen Fan <wenchen@databricks.com>    2016-03-29 14:34:12 +0800
committer Wenchen Fan <wenchen@databricks.com>    2016-03-29 14:34:12 +0800
commit    83775bc78e183791f75a99cdfbcd68a67ca0d472 (patch)
tree      65649118a6815223b789a7e622b768f8490f25ff /sql
parent    63b200e8d4a05d5b744d437fd10781c6b5429da9 (diff)
[SPARK-14158][SQL] implement buildReader for json data source
## What changes were proposed in this pull request?

This PR implements `buildReader` for the json data source and enables it in the new data source code path.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #11960 from cloud-fan/json.
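For context, the new code path is exercised through the ordinary DataFrame reader. A minimal sketch of a query such a scan would serve, assuming a spark-shell style `sqlContext`; the path, schema, and column name are illustrative, not from the patch:

```scala
import org.apache.spark.sql.types._

// Supplying a schema up front skips inference; with this patch, planning can
// route the scan through FileSourceStrategy's buildReader-based path when the
// new file-scan code path is enabled.
val df = sqlContext.read
  .schema(new StructType().add("a", IntegerType))
  .json("/tmp/data/*.json")   // illustrative path
df.filter("a > 0").show()
```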
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala    |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala | 51
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala     | 37
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala    |  2
4 files changed, 90 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 4b04fec57d..76a724e51e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -58,7 +58,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _))
if (files.fileFormat.toString == "TestFileFormat" ||
files.fileFormat.isInstanceOf[parquet.DefaultSource] ||
- files.fileFormat.toString == "ORC") &&
+ files.fileFormat.toString == "ORC" ||
+ files.fileFormat.isInstanceOf[json.DefaultSource]) &&
files.sqlContext.conf.parquetFileScan =>
// Filters on this relation fall into four categories based on where we can use them to avoid
// reading unneeded data:
@@ -138,7 +139,6 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
- assert(file.getLen != 0, file.toString)
(0L to file.getLen by maxSplitBytes).map { offset =>
val remaining = file.getLen - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
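As an aside, the splitting arithmetic in this hunk is easy to check in isolation. A self-contained sketch with illustrative numbers (not part of the patch):

```scala
// A 10-byte file with maxSplitBytes = 4 is cut at offsets 0, 4, 8
// into splits of sizes 4, 4, 2.
val fileLen = 10L
val maxSplitBytes = 4L
val splits = (0L to fileLen by maxSplitBytes).map { offset =>
  val remaining = fileLen - offset
  val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
  (offset, size)
}
assert(splits == Vector((0L, 4L), (4L, 4L), (8L, 2L)))
```

Note that with the non-empty-file assert removed above, a zero-length file now yields a single empty split (offset 0, size 0) rather than failing.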
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
new file mode 100644
index 0000000000..18f9b55895
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.net.URI
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.{FileSplit, LineRecordReader}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+/**
+ * An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], yielding all of the
+ * lines in that file.
+ */
+class HadoopFileLinesReader(file: PartitionedFile, conf: Configuration) extends Iterator[Text] {
+ private val iterator = {
+ val fileSplit = new FileSplit(
+ new Path(new URI(file.filePath)),
+ file.start,
+ file.length,
+      // TODO: implement locality (the preferred-host list is left empty for now)
+ Array.empty)
+ val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+ val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
+ val reader = new LineRecordReader()
+ reader.initialize(fileSplit, hadoopAttemptContext)
+ new RecordReaderIterator(reader)
+ }
+
+ override def hasNext: Boolean = iterator.hasNext
+
+ override def next(): Text = iterator.next()
+}
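To make the adaptor's contract concrete, a hypothetical usage sketch follows. The `PartitionedFile` constructor arguments are assumptions inferred from how `buildReader` uses this class below, not a confirmed signature:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, PartitionedFile}

// Hypothetical: describe one byte range of one file, with no partition columns.
val file = PartitionedFile(
  partitionValues = InternalRow.empty,
  filePath = "file:///tmp/part-00000.json",   // illustrative path
  start = 0L,
  length = 128L)

// Each element is one line of the file's byte range, as Hadoop's
// LineRecordReader would produce it.
val lines = new HadoopFileLinesReader(file, new Configuration()).map(_.toString)
lines.foreach(println)
```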
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 3bf0af0efa..21fc1224ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.json
import java.io.CharArrayWriter
import com.fasterxml.jackson.core.JsonFactory
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
@@ -32,7 +33,8 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
@@ -120,6 +122,39 @@ class DefaultSource extends FileFormat with DataSourceRegister {
}
}
+ override def buildReader(
+ sqlContext: SQLContext,
+ partitionSchema: StructType,
+ dataSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
+ val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+ val broadcastedConf =
+ sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
+
+ val parsedOptions: JSONOptions = new JSONOptions(options)
+ val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
+ .getOrElse(sqlContext.conf.columnNameOfCorruptRecord)
+
+ val fullSchema = dataSchema.toAttributes ++ partitionSchema.toAttributes
+ val joinedRow = new JoinedRow()
+
+ file => {
+ val lines = new HadoopFileLinesReader(file, broadcastedConf.value.value).map(_.toString)
+
+ val rows = JacksonParser.parseJson(
+ lines,
+ dataSchema,
+ columnNameOfCorruptRecord,
+ parsedOptions)
+
+ val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+ rows.map { row =>
+ appendPartitionColumns(joinedRow(row, file.partitionValues))
+ }
+ }
+ }
+
private def createBaseRdd(sqlContext: SQLContext, inputPaths: Seq[FileStatus]): RDD[String] = {
val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
val conf = job.getConfiguration
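The last step of the closure above, joining each parsed row with the file's partition values and projecting to an unsafe row, can be exercised in isolation. A minimal sketch with illustrative schemas, using the same catalyst utilities the patch imports:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

// Illustrative schemas: one data column `a`, one partition column `p`.
val dataSchema = StructType(Seq(StructField("a", IntegerType)))
val partitionSchema = StructType(Seq(StructField("p", StringType)))
val fullSchema = dataSchema.toAttributes ++ partitionSchema.toAttributes

// Identity projection over the concatenated schema, as in buildReader.
val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
val joined = new JoinedRow(InternalRow(1), InternalRow(UTF8String.fromString("x")))
val unsafeRow = appendPartitionColumns(joined)   // one UnsafeRow: (a = 1, p = "x")
```

Reusing a single `JoinedRow` per reader, as the patch does, avoids allocating a wrapper object per row; the projection then writes the joined values into a compact `UnsafeRow`.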
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index 00c14adf07..8bc53bae6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -250,7 +250,7 @@ object JacksonParser extends Logging {
new GenericArrayData(values.toArray)
}
- private def parseJson(
+ def parseJson(
input: Iterator[String],
schema: StructType,
columnNameOfCorruptRecords: String,
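With `parseJson` now public, callers outside the object can drive it directly. A hedged sketch; the fourth parameter and the return type are inferred from the `buildReader` call site above, and the visibility of these internals is an assumption:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.json.{JacksonParser, JSONOptions}
import org.apache.spark.sql.types._

val schema = StructType(Seq(StructField("a", IntegerType)))
val rows: Iterator[InternalRow] = JacksonParser.parseJson(
  Iterator("""{"a": 1}""", """{"a": 2}"""),
  schema,
  "_corrupt_record",             // columnNameOfCorruptRecords
  new JSONOptions(Map.empty))    // constructed as in buildReader above
```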