path: root/sql/hive
author     Reynold Xin <rxin@databricks.com>    2015-07-25 23:52:37 -0700
committer  Reynold Xin <rxin@databricks.com>    2015-07-25 23:52:37 -0700
commit     4a01bfc2a2e664186028ea32095d32d29c9f9e38 (patch)
tree       28e8c54fe6bc64d4e9eb77c397bd98c2088672ae /sql/hive
parent     41a7cdf85de2d583d8b8759941a9d6c6e98cae4d (diff)
download   spark-4a01bfc2a2e664186028ea32095d32d29c9f9e38.tar.gz
           spark-4a01bfc2a2e664186028ea32095d32d29c9f9e38.tar.bz2
           spark-4a01bfc2a2e664186028ea32095d32d29c9f9e38.zip
[SPARK-9350][SQL] Introduce an InternalRow generic getter that requires a DataType
Currently UnsafeRow cannot support a generic getter. However, if the data type is known, we can support a generic getter.

Author: Reynold Xin <rxin@databricks.com>

Closes #7666 from rxin/generic-getter-with-datatype and squashes the following commits:

ee2874c [Reynold Xin] Add a default implementation for getStruct.
1e109a0 [Reynold Xin] [SPARK-9350][SQL] Introduce an InternalRow generic getter that requires a DataType.
033ee88 [Reynold Xin] Removed getAs in non test code.
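Why the getter needs a DataType: UnsafeRow stores each field in a type-specific binary layout rather than as boxed objects, so it cannot materialize a generic value unless it is told the column's type. The following is a minimal sketch of the idea, not the actual Spark implementation; the helper name genericGet is hypothetical, and the specialized getters used below (getBoolean, getInt, getLong, getDouble, getUTF8String) are assumed from the contemporaneous InternalRow API.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._

def genericGet(row: InternalRow, ordinal: Int, dataType: DataType): Any = dataType match {
  // With the DataType known, dispatch to the specialized readers that a binary row format can serve directly.
  case BooleanType => row.getBoolean(ordinal)
  case IntegerType => row.getInt(ordinal)
  case LongType    => row.getLong(ordinal)
  case DoubleType  => row.getDouble(ordinal)
  case StringType  => row.getUTF8String(ordinal)
  case other       => throw new UnsupportedOperationException(s"type $other not covered in this sketch")
}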
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala               | 2
2 files changed, 4 insertions, 2 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index f0e0ca05a8..e4944caeff 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive._
+import org.apache.spark.sql.types.DataType
import org.apache.spark.{SparkException, TaskContext}
import scala.collection.JavaConversions._
@@ -96,13 +97,14 @@ case class InsertIntoHiveTable(
val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
val wrappers = fieldOIs.map(wrapperFor)
val outputData = new Array[Any](fieldOIs.length)
+ val dataTypes: Array[DataType] = child.output.map(_.dataType).toArray
writerContainer.executorSideSetup(context.stageId, context.partitionId, context.attemptNumber)
iterator.foreach { row =>
var i = 0
while (i < fieldOIs.length) {
- outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i))
+ outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i)))
i += 1
}
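Both call sites in this patch follow the same pattern: resolve the column DataTypes once from the schema, then pass dataTypes(i) together with the ordinal on every field access. Below is a self-contained sketch of that pattern (the helper name writeRows and the emit callback are hypothetical stand-ins for the Hive and ORC writer plumbing):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{DataType, StructType}

def writeRows(rows: Iterator[InternalRow], schema: StructType)(emit: Array[Any] => Unit): Unit = {
  // Resolve the column types once, before iterating, as the patch does with `dataTypes`.
  val dataTypes: Array[DataType] = schema.fields.map(_.dataType)
  val outputData = new Array[Any](dataTypes.length)
  rows.foreach { row =>
    var i = 0
    while (i < dataTypes.length) {
      // Old API: row.get(i); the new getter also takes the column's DataType.
      outputData(i) = if (row.isNullAt(i)) null else row.get(i, dataTypes(i))
      i += 1
    }
    emit(outputData)
  }
}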
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 58445095ad..924f4d37ce 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -122,7 +122,7 @@ private[orc] class OrcOutputWriter(
override def writeInternal(row: InternalRow): Unit = {
var i = 0
while (i < row.numFields) {
- reusableOutputBuffer(i) = wrappers(i)(row.get(i))
+ reusableOutputBuffer(i) = wrappers(i)(row.get(i, dataSchema(i).dataType))
i += 1
}