Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala            30
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala  31
2 files changed, 44 insertions(+), 17 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index d1f30e188e..45de567039 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -19,21 +19,20 @@ package org.apache.spark.sql.hive.orc
import java.util.Properties
-import scala.collection.JavaConverters._
-
import com.google.common.base.Objects
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSerde, OrcSplit, OrcStruct}
import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector
-import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfoUtils, StructTypeInfo}
+import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
import org.apache.hadoop.io.{NullWritable, Writable}
import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.Logging
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.{HadoopRDD, RDD}
@@ -199,12 +198,13 @@ private[sql] class OrcRelation(
partitionColumns)
}
- override def buildScan(
+ override private[sql] def buildInternalScan(
requiredColumns: Array[String],
filters: Array[Filter],
- inputPaths: Array[FileStatus]): RDD[Row] = {
+ inputPaths: Array[FileStatus],
+ broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
val output = StructType(requiredColumns.map(dataSchema(_))).toAttributes
- OrcTableScan(output, this, filters, inputPaths).execute().asInstanceOf[RDD[Row]]
+ OrcTableScan(output, this, filters, inputPaths).execute()
}
override def prepareJobForWrite(job: Job): OutputWriterFactory = {
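
The hunk above replaces the public buildScan (returning RDD[Row]) with the package-private buildInternalScan, which returns RDD[InternalRow] and additionally receives the Hadoop configuration as a Broadcast[SerializableConfiguration]. Below is a minimal sketch, not part of this patch, of how such a broadcast configuration is typically produced and read on executors; SerializableConfiguration is a Spark-internal (private[spark]) wrapper, so the snippet only compiles inside Spark's own source tree, and the job setup is illustrative only.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.util.SerializableConfiguration

object BroadcastConfSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("broadcast-conf").setMaster("local[2]"))

    // Hadoop's Configuration is not Java-serializable, so it is wrapped before broadcasting.
    val broadcastedConf: Broadcast[SerializableConfiguration] =
      sc.broadcast(new SerializableConfiguration(new Configuration()))

    // Tasks unwrap the broadcast handle instead of re-shipping the configuration with
    // every task closure; this handle is what the new buildInternalScan parameter carries.
    sc.parallelize(1 to 2).foreach { _ =>
      val conf: Configuration = broadcastedConf.value.value
      // ... open files / create record readers using conf
    }

    sc.stop()
  }
}
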
@@ -253,16 +253,17 @@ private[orc] case class OrcTableScan(
path: String,
conf: Configuration,
iterator: Iterator[Writable],
- nonPartitionKeyAttrs: Seq[(Attribute, Int)],
- mutableRow: MutableRow): Iterator[InternalRow] = {
+ nonPartitionKeyAttrs: Seq[Attribute]): Iterator[InternalRow] = {
val deserializer = new OrcSerde
val maybeStructOI = OrcFileOperator.getObjectInspector(path, Some(conf))
+ val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
+ val unsafeProjection = UnsafeProjection.create(StructType.fromAttributes(nonPartitionKeyAttrs))
// SPARK-8501: ORC writes an empty schema ("struct<>") to an ORC file if the file contains zero
// rows, and thus couldn't give a proper ObjectInspector. In this case we just return an empty
// partition since we know that this file is empty.
maybeStructOI.map { soi =>
- val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map {
+ val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.zipWithIndex.map {
case (attr, ordinal) =>
soi.getStructFieldRef(attr.name) -> ordinal
}.unzip
@@ -280,7 +281,7 @@ private[orc] case class OrcTableScan(
}
i += 1
}
- mutableRow: InternalRow
+ unsafeProjection(mutableRow)
}
}.getOrElse {
Iterator.empty
@@ -322,13 +323,8 @@ private[orc] case class OrcTableScan(
val wrappedConf = new SerializableConfiguration(conf)
rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) =>
- val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
- fillObject(
- split.getPath.toString,
- wrappedConf.value,
- iterator.map(_._2),
- attributes.zipWithIndex,
- mutableRow)
+ val writableIterator = iterator.map(_._2)
+ fillObject(split.getPath.toString, wrappedConf.value, writableIterator, attributes)
}
}
}
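
Taken together, the fillObject and execute() hunks above move the SpecificMutableRow and UnsafeProjection construction into fillObject and return unsafeProjection(mutableRow) per record, so the ORC scan now emits UnsafeRows. The following is a minimal, standalone sketch of that conversion step, not part of the patch; it uses Spark catalyst internals as they exist at this change, and the schema and sample values are illustrative only.

import org.apache.spark.sql.catalyst.expressions.{SpecificMutableRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String

object UnsafeProjectionSketch {
  def main(args: Array[String]): Unit = {
    val schema = new StructType().add("id", LongType).add("name", StringType)

    // One mutable row is reused for every input record, as OrcTableScan does.
    val mutableRow = new SpecificMutableRow(schema.map(_.dataType))
    val toUnsafe = UnsafeProjection.create(schema)

    // Fill the reused row with one record's values (string columns hold UTF8String).
    mutableRow.setLong(0, 42L)
    mutableRow.update(1, UTF8String.fromString("orc"))

    // The projection copies the fields into an UnsafeRow; its output buffer is also
    // reused across calls, so callers that buffer rows must copy() them.
    val row: UnsafeRow = toUnsafe(mutableRow)
    println(s"${row.getLong(0)} ${row.getUTF8String(1)}")
  }
}
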
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index e3605bb3f6..100b97137c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
+import org.apache.spark.sql.execution.ConvertToUnsafe
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
@@ -687,6 +688,36 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString)
}
}
+
+ test("HadoopFsRelation produces UnsafeRow") {
+ withTempTable("test_unsafe") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ sqlContext.range(3).write.format(dataSourceName).save(path)
+ sqlContext.read
+ .format(dataSourceName)
+ .option("dataSchema", new StructType().add("id", LongType, nullable = false).json)
+ .load(path)
+ .registerTempTable("test_unsafe")
+
+ val df = sqlContext.sql(
+ """SELECT COUNT(*)
+ |FROM test_unsafe a JOIN test_unsafe b
+ |WHERE a.id = b.id
+ """.stripMargin)
+
+ val plan = df.queryExecution.executedPlan
+
+ assert(
+ plan.collect { case plan: ConvertToUnsafe => plan }.isEmpty,
+ s"""Query plan shouldn't have ${classOf[ConvertToUnsafe].getSimpleName} node(s):
+ |$plan
+ """.stripMargin)
+
+ checkAnswer(df, Row(3))
+ }
+ }
+ }
}
// This class is used to test SPARK-8578. We should not use any custom output committer when