From 83775bc78e183791f75a99cdfbcd68a67ca0d472 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Tue, 29 Mar 2016 14:34:12 +0800
Subject: [SPARK-14158][SQL] implement buildReader for json data source

## What changes were proposed in this pull request?

This PR implements buildReader for the json data source and enables it in the new data source code path.

## How was this patch tested?

existing tests

Author: Wenchen Fan

Closes #11960 from cloud-fan/json.
---
 .../execution/datasources/FileSourceStrategy.scala |  4 +-
 .../datasources/HadoopFileLinesReader.scala        | 51 ++++++++++++++++++++++
 .../execution/datasources/json/JSONRelation.scala  | 37 +++++++++++++++-
 .../execution/datasources/json/JacksonParser.scala |  2 +-
 4 files changed, 90 insertions(+), 4 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala

(limited to 'sql')

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 4b04fec57d..76a724e51e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -58,7 +58,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
     case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _))
       if (files.fileFormat.toString == "TestFileFormat" ||
          files.fileFormat.isInstanceOf[parquet.DefaultSource] ||
-         files.fileFormat.toString == "ORC") &&
+         files.fileFormat.toString == "ORC" ||
+         files.fileFormat.isInstanceOf[json.DefaultSource]) &&
          files.sqlContext.conf.parquetFileScan =>
       // Filters on this relation fall into four categories based on where we can use them to avoid
       // reading unneeded data:
@@ -138,7 +139,6 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
 
       val splitFiles = selectedPartitions.flatMap { partition =>
         partition.files.flatMap { file =>
-          assert(file.getLen != 0, file.toString)
           (0L to file.getLen by maxSplitBytes).map { offset =>
             val remaining = file.getLen - offset
             val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
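
The offset arithmetic in the second hunk is worth a closer look: each file is cut into maxSplitBytes-sized splits, the last split keeps the remainder, and with the zero-length assert gone an empty file now simply yields a single empty split. The standalone sketch below reproduces that arithmetic outside Spark; every name in it is made up for illustration and is not part of this patch.

```scala
object SplitSketch {
  // Same computation as the hunk above: one (offset, size) pair per chunk.
  def splits(fileLen: Long, maxSplitBytes: Long): Seq[(Long, Long)] =
    (0L to fileLen by maxSplitBytes).map { offset =>
      val remaining = fileLen - offset
      val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
      (offset, size)
    }

  def main(args: Array[String]): Unit = {
    // A 10 MiB file with 4 MiB splits -> (0, 4 MiB), (4 MiB, 4 MiB), (8 MiB, 2 MiB).
    println(splits(10L * 1024 * 1024, 4L * 1024 * 1024))
  }
}
```
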
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
new file mode 100644
index 0000000000..18f9b55895
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.net.URI
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.{FileSplit, LineRecordReader}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+/**
+ * An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which are all of the lines
+ * in that file.
+ */
+class HadoopFileLinesReader(file: PartitionedFile, conf: Configuration) extends Iterator[Text] {
+  private val iterator = {
+    val fileSplit = new FileSplit(
+      new Path(new URI(file.filePath)),
+      file.start,
+      file.length,
+      // TODO: Implement Locality
+      Array.empty)
+    val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+    val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
+    val reader = new LineRecordReader()
+    reader.initialize(fileSplit, hadoopAttemptContext)
+    new RecordReaderIterator(reader)
+  }
+
+  override def hasNext: Boolean = iterator.hasNext
+
+  override def next(): Text = iterator.next()
+}
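
HadoopFileLinesReader above delegates the actual iteration to RecordReaderIterator, a class that already exists in Spark and is not part of this patch. As background, the sketch below shows the general RecordReader-to-Iterator adaptation it performs; this is an illustrative reimplementation, not Spark's actual code (the real class also takes care of closing the reader).

```scala
import org.apache.hadoop.mapreduce.RecordReader

// Buffers one record at a time so that hasNext can probe the Hadoop reader
// without losing the value that next() must return.
class RecordReaderIteratorSketch[T](reader: RecordReader[_, T]) extends Iterator[T] {
  private var havePair = false
  private var finished = false

  override def hasNext: Boolean = {
    if (!finished && !havePair) {
      // nextKeyValue() advances the underlying reader; false means end of split.
      finished = !reader.nextKeyValue()
      havePair = !finished
    }
    !finished
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    havePair = false
    reader.getCurrentValue
  }
}
```
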
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 3bf0af0efa..21fc1224ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.json
 import java.io.CharArrayWriter
 
 import com.fasterxml.jackson.core.JsonFactory
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
 import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
@@ -32,7 +33,8 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
@@ -120,6 +122,39 @@ class DefaultSource extends FileFormat with DataSourceRegister {
     }
   }
 
+  override def buildReader(
+      sqlContext: SQLContext,
+      partitionSchema: StructType,
+      dataSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
+    val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+    val broadcastedConf =
+      sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
+
+    val parsedOptions: JSONOptions = new JSONOptions(options)
+    val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
+      .getOrElse(sqlContext.conf.columnNameOfCorruptRecord)
+
+    val fullSchema = dataSchema.toAttributes ++ partitionSchema.toAttributes
+    val joinedRow = new JoinedRow()
+
+    file => {
+      val lines = new HadoopFileLinesReader(file, broadcastedConf.value.value).map(_.toString)
+
+      val rows = JacksonParser.parseJson(
+        lines,
+        dataSchema,
+        columnNameOfCorruptRecord,
+        parsedOptions)
+
+      val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+      rows.map { row =>
+        appendPartitionColumns(joinedRow(row, file.partitionValues))
+      }
+    }
+  }
+
   private def createBaseRdd(sqlContext: SQLContext, inputPaths: Seq[FileStatus]): RDD[String] = {
     val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
     val conf = job.getConfiguration
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index 00c14adf07..8bc53bae6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -250,7 +250,7 @@ object JacksonParser extends Logging {
     new GenericArrayData(values.toArray)
   }
 
-  private def parseJson(
+  def parseJson(
       input: Iterator[String],
       schema: StructType,
       columnNameOfCorruptRecords: String,
--
cgit v1.2.3
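
Putting the pieces together, the new buildReader follows a simple shape: set up everything that can be shared once, then return a closure that turns one PartitionedFile into an iterator of rows with the partition values appended. The sketch below mirrors that flow with plain Scala stand-ins instead of Spark's internal types (PartitionedFile here is a local case class, the comma split stands in for JacksonParser, and the sample path is invented), so it illustrates the pattern rather than the real API.

```scala
object PerFileReaderSketch {
  // Local stand-in for Spark's internal PartitionedFile, for illustration only.
  case class PartitionedFile(filePath: String, partitionValues: Seq[Any])

  // Analogue of DefaultSource.buildReader: capture shared state once, then
  // return a function that the scan invokes once per file split.
  def buildReader(dataColumns: Seq[String]): PartitionedFile => Iterator[Seq[Any]] = {
    file => {
      // Analogue of HadoopFileLinesReader: lazily iterate the file's lines.
      val lines = scala.io.Source.fromFile(file.filePath).getLines()
      // Analogue of JacksonParser.parseJson: one data row per line
      // (a naive comma split here; the real code parses JSON against dataSchema).
      val rows = lines.map(_.split(",").toSeq.take(dataColumns.length))
      // Analogue of JoinedRow + GenerateUnsafeProjection: append the partition
      // values so the output matches dataSchema ++ partitionSchema.
      rows.map(row => row ++ file.partitionValues)
    }
  }

  def main(args: Array[String]): Unit = {
    val readFile = buildReader(Seq("a", "b"))
    // The planner would call readFile once per split; this path is made up.
    readFile(PartitionedFile("/tmp/part-00000", Seq("2016-03-29"))).foreach(println)
  }
}
```
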