author     Reynold Xin <rxin@databricks.com>    2016-01-01 19:23:06 -0800
committer  Reynold Xin <rxin@databricks.com>    2016-01-01 19:23:06 -0800
commit     44ee920fd49d35b421ae562ea99bcc8f2b98ced6 (patch)
tree       fe94b7a91dda2ef27d7be6507baa83a339050846 /sql/hive
parent     0da7bd50ddf0fb9e0e8aeadb9c7fb3edf6f0ee6e (diff)
Revert "[SPARK-12286][SPARK-12290][SPARK-12294][SPARK-12284][SQL] always output UnsafeRow"
This reverts commit 0da7bd50ddf0fb9e0e8aeadb9c7fb3edf6f0ee6e.
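For context: the reverted commit (0da7bd5) made each of these sql/hive operators wrap its output iterator in an UnsafeProjection so that downstream operators always received UnsafeRow. A rough sketch of that wrapping pattern, which the hunks below remove from HiveTableScan and ScriptTransformation, is shown here. It is illustrative only: it uses the public RDD.mapPartitions rather than Spark's internal mapPartitionsInternal, and the helper name wrapInUnsafeRows is made up for this sketch.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.StructType

// Illustrative sketch only (not a Spark API): the reverted commit converted each
// operator's output rows to UnsafeRow before handing them downstream.
def wrapInUnsafeRows(rdd: RDD[InternalRow], schema: StructType): RDD[InternalRow] = {
  rdd.mapPartitions { iter =>
    // One projection per partition; it re-encodes every InternalRow as an UnsafeRow.
    val proj = UnsafeProjection.create(schema)
    iter.map(proj)
  }
}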
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala         | 16
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala   | 15
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala  |  3
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala       | 31
4 files changed, 44 insertions(+), 21 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index 1588728bdb..8141136de5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -132,17 +132,11 @@ case class HiveTableScan(
    }
  }

-  protected override def doExecute(): RDD[InternalRow] = {
-    val rdd = if (!relation.hiveQlTable.isPartitioned) {
-      hadoopReader.makeRDDForTable(relation.hiveQlTable)
-    } else {
-      hadoopReader.makeRDDForPartitionedTable(
-        prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
-    }
-    rdd.mapPartitionsInternal { iter =>
-      val proj = UnsafeProjection.create(schema)
-      iter.map(proj)
-    }
+  protected override def doExecute(): RDD[InternalRow] = if (!relation.hiveQlTable.isPartitioned) {
+    hadoopReader.makeRDDForTable(relation.hiveQlTable)
+  } else {
+    hadoopReader.makeRDDForPartitionedTable(
+      prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
  }

  override def output: Seq[Attribute] = attributes
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 44dc68e6ba..f936cf565b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -28,17 +28,18 @@ import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
import org.apache.hadoop.hive.serde2.Serializer
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
+import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{FromUnsafeProjection, Attribute}
-import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.types.DataType
-import org.apache.spark.util.SerializableJobConf
import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.util.SerializableJobConf
private[hive]
case class InsertIntoHiveTable(
@@ -100,17 +101,15 @@ case class InsertIntoHiveTable(
      writerContainer.executorSideSetup(context.stageId, context.partitionId, context.attemptNumber)

-      val proj = FromUnsafeProjection(child.schema)
      iterator.foreach { row =>
        var i = 0
-        val safeRow = proj(row)
        while (i < fieldOIs.length) {
-          outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(safeRow.get(i, dataTypes(i)))
+          outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i)))
          i += 1
        }

        writerContainer
-          .getLocalFileWriter(safeRow, table.schema)
+          .getLocalFileWriter(row, table.schema)
          .write(serializer.serialize(outputData, standardOI))
      }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 6ccd417819..a61e162f48 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -213,8 +213,7 @@ case class ScriptTransformation(
    child.execute().mapPartitions { iter =>
      if (iter.hasNext) {
-        val proj = UnsafeProjection.create(schema)
-        processIterator(iter).map(proj)
+        processIterator(iter)
      } else {
        // If the input iterator has no rows then do not launch the external script.
        Iterator.empty
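With the UnsafeProjection wrapping gone, these operators no longer guarantee UnsafeRow output, so Spark may again plant ConvertToUnsafe nodes in front of operators that require unsafe rows; the test added to hadoopFsRelationSuites.scala below asserts that a HadoopFsRelation scan needs no such conversion. A minimal standalone sketch of that check follows, assuming ConvertToUnsafe is accessible from the calling package (as it is for the suite below) and using the hypothetical helper name assertNoConvertToUnsafe.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.{ConvertToUnsafe, SparkPlan}

// Sketch mirroring the assertion in the test added below: walk the executed
// physical plan and fail if any ConvertToUnsafe nodes are present.
def assertNoConvertToUnsafe(df: DataFrame): Unit = {
  val plan: SparkPlan = df.queryExecution.executedPlan
  val converters = plan.collect { case c: ConvertToUnsafe => c }
  assert(converters.isEmpty,
    s"Query plan shouldn't have ${classOf[ConvertToUnsafe].getSimpleName} node(s):\n$plan")
}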
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index efbf9988dd..665e87e3e3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
+import org.apache.spark.sql.execution.ConvertToUnsafe
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
@@ -688,6 +689,36 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
      sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString)
    }
  }
+
+  test("HadoopFsRelation produces UnsafeRow") {
+    withTempTable("test_unsafe") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        sqlContext.range(3).write.format(dataSourceName).save(path)
+        sqlContext.read
+          .format(dataSourceName)
+          .option("dataSchema", new StructType().add("id", LongType, nullable = false).json)
+          .load(path)
+          .registerTempTable("test_unsafe")
+
+        val df = sqlContext.sql(
+          """SELECT COUNT(*)
+            |FROM test_unsafe a JOIN test_unsafe b
+            |WHERE a.id = b.id
+          """.stripMargin)
+
+        val plan = df.queryExecution.executedPlan
+
+        assert(
+          plan.collect { case plan: ConvertToUnsafe => plan }.isEmpty,
+          s"""Query plan shouldn't have ${classOf[ConvertToUnsafe].getSimpleName} node(s):
+             |$plan
+           """.stripMargin)
+
+        checkAnswer(df, Row(3))
+      }
+    }
+  }
}
// This class is used to test SPARK-8578. We should not use any custom output committer when