author      Cheng Lian <lian@databricks.com>          2015-10-31 21:16:09 -0700
committer   Davies Liu <davies.liu@gmail.com>         2015-10-31 21:16:09 -0700
commit      aa494a9c2ebd59baec47beb434cd09bf3f188218 (patch)
tree        54bd81e8b7540d08c27be3671bc250ec54020bbf /sql/hive
parent      40d3c6797a3dfd037eb69b2bcd336d8544deddf5 (diff)
[SPARK-11117] [SPARK-11345] [SQL] Makes all HadoopFsRelation data sources produce UnsafeRow
This PR fixes two issues:

1. `PhysicalRDD.outputsUnsafeRows` is always `false`

   Thus a `ConvertToUnsafe` operator is often required even if the underlying data source relation does output `UnsafeRow`.

2. Internal/external row conversion for `HadoopFsRelation` is messy

   Currently we're using `HadoopFsRelation.needConversion` and [dirty type erasure hacks][1] to indicate whether the relation outputs external rows or internal rows, and apply external-to-internal conversion when necessary. Basically, all builtin `HadoopFsRelation` data sources, i.e. Parquet, JSON, ORC, and Text, output `InternalRow`, while typical external `HadoopFsRelation` data sources, e.g. spark-avro and spark-csv, output `Row`.

This PR adds a `private[sql]` interface method `HadoopFsRelation.buildInternalScan`, which by default invokes `HadoopFsRelation.buildScan` and converts the resulting `Row`s to `UnsafeRow`s (which are also `InternalRow`s). All builtin `HadoopFsRelation` data sources override this method and output `UnsafeRow`s directly. In this way, `HadoopFsRelation` now always produces `UnsafeRow`s, and `PhysicalRDD.outputsUnsafeRows` can be set properly by checking whether the underlying data source is a `HadoopFsRelation`.

A remaining question: can we assume that all non-builtin `HadoopFsRelation` data sources output external rows? At least all well-known ones do. However, it's possible that some users have implemented their own `HadoopFsRelation` data sources that leverage `InternalRow` and thus all those unstable internal data representations. If this assumption is safe, we can deprecate `HadoopFsRelation.needConversion` and clean up some more conversion code (like [here][2] and [here][3]).

This PR supersedes #9125.

Follow-ups:

1. Make the JSON and ORC data sources output `UnsafeRow` directly.
2. Make `HiveTableScan` output `UnsafeRow` directly.

   This is related to item 1, since the ORC data source shares the same `Writable` unwrapping code with `HiveTableScan`.

[1]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala#L353
[2]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L331-L335
[3]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L630-L669

Author: Cheng Lian <lian@databricks.com>

Closes #9305 from liancheng/spark-11345.unsafe-hadoop-fs-relation.
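Editor's note: the default `buildInternalScan` path described above amounts to converting the external `Row`s returned by a third-party `buildScan` into `UnsafeRow`s. The sketch below illustrates only that conversion pattern; it is not code from this PR, the object and method names are invented, and it relies on internal, unstable Spark 1.5-era APIs (`CatalystTypeConverters`, `UnsafeProjection`), hence the sub-package of `org.apache.spark.sql`.

```scala
// Hypothetical sketch of an external-Row-to-UnsafeRow conversion; NOT the PR's code.
// Placed under org.apache.spark.sql because the converters are package-private.
package org.apache.spark.sql.sketch

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.StructType

object ExternalToUnsafeSketch {
  // Converts an RDD of external Rows (e.g. what a non-builtin buildScan returns)
  // into UnsafeRows, creating both converters once per partition.
  def toUnsafeRows(externalRows: RDD[Row], schema: StructType): RDD[InternalRow] = {
    externalRows.mapPartitions { rows =>
      val toCatalyst = CatalystTypeConverters.createToCatalystConverter(schema)
      val toUnsafe = UnsafeProjection.create(schema)
      // UnsafeProjection reuses its output buffer, so downstream consumers that
      // buffer rows (e.g. collect) must copy them first.
      rows.map(row => toUnsafe(toCatalyst(row).asInstanceOf[InternalRow]))
    }
  }
}
```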
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala           | 30
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala | 31
2 files changed, 44 insertions, 17 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index d1f30e188e..45de567039 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -19,21 +19,20 @@ package org.apache.spark.sql.hive.orc
import java.util.Properties
-import scala.collection.JavaConverters._
-
import com.google.common.base.Objects
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSerde, OrcSplit, OrcStruct}
import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector
-import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfoUtils, StructTypeInfo}
+import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
import org.apache.hadoop.io.{NullWritable, Writable}
import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.Logging
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.{HadoopRDD, RDD}
@@ -199,12 +198,13 @@ private[sql] class OrcRelation(
partitionColumns)
}
- override def buildScan(
+ override private[sql] def buildInternalScan(
requiredColumns: Array[String],
filters: Array[Filter],
- inputPaths: Array[FileStatus]): RDD[Row] = {
+ inputPaths: Array[FileStatus],
+ broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
val output = StructType(requiredColumns.map(dataSchema(_))).toAttributes
- OrcTableScan(output, this, filters, inputPaths).execute().asInstanceOf[RDD[Row]]
+ OrcTableScan(output, this, filters, inputPaths).execute()
}
override def prepareJobForWrite(job: Job): OutputWriterFactory = {
@@ -253,16 +253,17 @@ private[orc] case class OrcTableScan(
path: String,
conf: Configuration,
iterator: Iterator[Writable],
- nonPartitionKeyAttrs: Seq[(Attribute, Int)],
- mutableRow: MutableRow): Iterator[InternalRow] = {
+ nonPartitionKeyAttrs: Seq[Attribute]): Iterator[InternalRow] = {
val deserializer = new OrcSerde
val maybeStructOI = OrcFileOperator.getObjectInspector(path, Some(conf))
+ val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
+ val unsafeProjection = UnsafeProjection.create(StructType.fromAttributes(nonPartitionKeyAttrs))
// SPARK-8501: ORC writes an empty schema ("struct<>") to an ORC file if the file contains zero
// rows, and thus couldn't give a proper ObjectInspector. In this case we just return an empty
// partition since we know that this file is empty.
maybeStructOI.map { soi =>
- val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map {
+ val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.zipWithIndex.map {
case (attr, ordinal) =>
soi.getStructFieldRef(attr.name) -> ordinal
}.unzip
@@ -280,7 +281,7 @@ private[orc] case class OrcTableScan(
}
i += 1
}
- mutableRow: InternalRow
+ unsafeProjection(mutableRow)
}
}.getOrElse {
Iterator.empty
@@ -322,13 +323,8 @@ private[orc] case class OrcTableScan(
val wrappedConf = new SerializableConfiguration(conf)
rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) =>
- val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
- fillObject(
- split.getPath.toString,
- wrappedConf.value,
- iterator.map(_._2),
- attributes.zipWithIndex,
- mutableRow)
+ val writableIterator = iterator.map(_._2)
+ fillObject(split.getPath.toString, wrappedConf.value, writableIterator, attributes)
}
}
}
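Editor's note on the `OrcTableScan` change above: `fillObject` now builds one `SpecificMutableRow` and one `UnsafeProjection` per partition and pushes every deserialized record through the projection, instead of returning the mutable row directly. The standalone sketch below shows only that reuse pattern with invented sample data; it skips ORC deserialization entirely and uses unstable Spark 1.5-era internal classes.

```scala
// Hypothetical sketch of the per-partition mutable-row + UnsafeProjection reuse
// pattern; NOT the patch's ORC code, and it performs no ORC deserialization.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{SpecificMutableRow, UnsafeProjection}
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String

object UnsafeProjectionSketch {
  def main(args: Array[String]): Unit = {
    val schema = new StructType().add("id", LongType).add("name", StringType)

    // One reusable mutable row and one projection, as fillObject now does per partition.
    val mutableRow = new SpecificMutableRow(schema.fields.map(_.dataType))
    val toUnsafe = UnsafeProjection.create(schema)

    val records = Iterator(1L -> "a", 2L -> "b", 3L -> "c")
    val unsafeRows: Iterator[InternalRow] = records.map { case (id, name) =>
      // Stand-in for the ObjectInspector-based field filling done in fillObject.
      mutableRow.setLong(0, id)
      mutableRow.update(1, UTF8String.fromString(name))
      // The projection copies the fields into UnsafeRow's binary format.
      toUnsafe(mutableRow)
    }

    // Consume each row before the next projection call reuses the buffer.
    unsafeRows.foreach(row => println(s"${row.getLong(0)}, ${row.getUTF8String(1)}"))
  }
}
```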
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index e3605bb3f6..100b97137c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
+import org.apache.spark.sql.execution.ConvertToUnsafe
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
@@ -687,6 +688,36 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString)
}
}
+
+ test("HadoopFsRelation produces UnsafeRow") {
+ withTempTable("test_unsafe") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ sqlContext.range(3).write.format(dataSourceName).save(path)
+ sqlContext.read
+ .format(dataSourceName)
+ .option("dataSchema", new StructType().add("id", LongType, nullable = false).json)
+ .load(path)
+ .registerTempTable("test_unsafe")
+
+ val df = sqlContext.sql(
+ """SELECT COUNT(*)
+ |FROM test_unsafe a JOIN test_unsafe b
+ |WHERE a.id = b.id
+ """.stripMargin)
+
+ val plan = df.queryExecution.executedPlan
+
+ assert(
+ plan.collect { case plan: ConvertToUnsafe => plan }.isEmpty,
+ s"""Query plan shouldn't have ${classOf[ConvertToUnsafe].getSimpleName} node(s):
+ |$plan
+ """.stripMargin)
+
+ checkAnswer(df, Row(3))
+ }
+ }
+ }
}
// This class is used to test SPARK-8578. We should not use any custom output committer when