author     Davies Liu <davies@databricks.com>    2016-01-01 13:39:20 -0800
committer  Reynold Xin <rxin@databricks.com>     2016-01-01 13:39:20 -0800
commit     0da7bd50ddf0fb9e0e8aeadb9c7fb3edf6f0ee6e (patch)
tree       831829e112b108c0da73492a351fe04c8f67f827 /sql/hive
parent     6c20b3c0871609cbbb1de8e12edd3cad318fc14e (diff)
[SPARK-12286][SPARK-12290][SPARK-12294][SPARK-12284][SQL] always output UnsafeRow
It's confusing that some operators output UnsafeRow while others don't, which makes it easy to introduce mistakes. This PR changes all operators (SparkPlan) to always output UnsafeRow and removes the rule that inserted Unsafe/Safe conversions. For operators that can't output UnsafeRow directly, an UnsafeProjection is added inside them. Closes #10330.

cc JoshRosen rxin

Author: Davies Liu <davies@databricks.com>

Closes #10511 from davies/unsafe_row.
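The core pattern of the change is to wrap an operator's row iterator in an UnsafeProjection built from the operator's output schema. Below is a minimal, self-contained Scala sketch of that conversion; the schema, sample rows, and object name are illustrative and not taken from the patch.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String

object UnsafeRowConversionSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative schema standing in for an operator's output schema.
    val schema = new StructType().add("id", LongType).add("name", StringType)

    // Generic (non-unsafe) rows, like those produced by HiveTableScan's Hadoop RDD
    // before the projection is applied.
    val rows: Iterator[InternalRow] = Iterator(
      InternalRow(1L, UTF8String.fromString("a")),
      InternalRow(2L, UTF8String.fromString("b")))

    // One projection per partition; mapping it over the iterator yields UnsafeRows,
    // which is what every SparkPlan is expected to output after this change.
    // Note: the projection reuses its output buffer, so copy() any row you retain.
    val toUnsafe = UnsafeProjection.create(schema)
    rows.map(toUnsafe).foreach(row => println(row.getLong(0)))
  }
}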
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala            16
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala      15
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala      3
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala          31
4 files changed, 21 insertions(+), 44 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index 8141136de5..1588728bdb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -132,11 +132,17 @@ case class HiveTableScan(
}
}
- protected override def doExecute(): RDD[InternalRow] = if (!relation.hiveQlTable.isPartitioned) {
- hadoopReader.makeRDDForTable(relation.hiveQlTable)
- } else {
- hadoopReader.makeRDDForPartitionedTable(
- prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
+ protected override def doExecute(): RDD[InternalRow] = {
+ val rdd = if (!relation.hiveQlTable.isPartitioned) {
+ hadoopReader.makeRDDForTable(relation.hiveQlTable)
+ } else {
+ hadoopReader.makeRDDForPartitionedTable(
+ prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
+ }
+ rdd.mapPartitionsInternal { iter =>
+ val proj = UnsafeProjection.create(schema)
+ iter.map(proj)
+ }
}
override def output: Seq[Attribute] = attributes
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index f936cf565b..44dc68e6ba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -28,18 +28,17 @@ import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
import org.apache.hadoop.hive.serde2.Serializer
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
+import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
+import org.apache.spark.sql.catalyst.expressions.{FromUnsafeProjection, Attribute}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.types.DataType
-import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.util.SerializableJobConf
+import org.apache.spark.{SparkException, TaskContext}
private[hive]
case class InsertIntoHiveTable(
@@ -101,15 +100,17 @@ case class InsertIntoHiveTable(
writerContainer.executorSideSetup(context.stageId, context.partitionId, context.attemptNumber)
+ val proj = FromUnsafeProjection(child.schema)
iterator.foreach { row =>
var i = 0
+ val safeRow = proj(row)
while (i < fieldOIs.length) {
- outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i)))
+ outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(safeRow.get(i, dataTypes(i)))
i += 1
}
writerContainer
- .getLocalFileWriter(row, table.schema)
+ .getLocalFileWriter(safeRow, table.schema)
.write(serializer.serialize(outputData, standardOI))
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index a61e162f48..6ccd417819 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -213,7 +213,8 @@ case class ScriptTransformation(
child.execute().mapPartitions { iter =>
if (iter.hasNext) {
- processIterator(iter)
+ val proj = UnsafeProjection.create(schema)
+ processIterator(iter).map(proj)
} else {
// If the input iterator has no rows then do not launch the external script.
Iterator.empty
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 665e87e3e3..efbf9988dd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -27,7 +27,6 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
-import org.apache.spark.sql.execution.ConvertToUnsafe
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
@@ -689,36 +688,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with TestHiveSingleton
sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString)
}
}
-
- test("HadoopFsRelation produces UnsafeRow") {
- withTempTable("test_unsafe") {
- withTempPath { dir =>
- val path = dir.getCanonicalPath
- sqlContext.range(3).write.format(dataSourceName).save(path)
- sqlContext.read
- .format(dataSourceName)
- .option("dataSchema", new StructType().add("id", LongType, nullable = false).json)
- .load(path)
- .registerTempTable("test_unsafe")
-
- val df = sqlContext.sql(
- """SELECT COUNT(*)
- |FROM test_unsafe a JOIN test_unsafe b
- |WHERE a.id = b.id
- """.stripMargin)
-
- val plan = df.queryExecution.executedPlan
-
- assert(
- plan.collect { case plan: ConvertToUnsafe => plan }.isEmpty,
- s"""Query plan shouldn't have ${classOf[ConvertToUnsafe].getSimpleName} node(s):
- |$plan
- """.stripMargin)
-
- checkAnswer(df, Row(3))
- }
- }
- }
}
// This class is used to test SPARK-8578. We should not use any custom output committer when