about summary refs log tree commit diff
path: root/sql/hive
diff options
context:
space:
mode:
author: wangfei <wangfei1@huawei.com> 2014-11-02 15:45:55 -0800
committer: Michael Armbrust <michael@databricks.com> 2014-11-02 15:45:55 -0800
commit e749f5dedbad412430b86e7290085095f8dec0d1 (patch)
tree df6e678ae821ce803be93df3b791e8a75cf62cc6 /sql/hive
parent c9f840046f8c45b1137f0289eeb0c980de72ea5e (diff)
download spark-e749f5dedbad412430b86e7290085095f8dec0d1.tar.gz
spark-e749f5dedbad412430b86e7290085095f8dec0d1.tar.bz2
spark-e749f5dedbad412430b86e7290085095f8dec0d1.zip
[SPARK-4191][SQL]move wrapperFor to HiveInspectors to reuse it
Move wrapperFor in InsertIntoHiveTable to HiveInspectors to reuse them, this method can be reused when writing date with ObjectInspector(such as orc support) Author: wangfei <wangfei1@huawei.com> Author: scwf <wangfei1@huawei.com> Closes #3057 from scwf/reuse-wraperfor and squashes the following commits: 7ccf932 [scwf] fix conflicts d44f4da [wangfei] fix imports 9bf1b50 [wangfei] revert no related change 9a5276a [wangfei] move wrapfor to hiveinspector to reuse them
Diffstat (limited to 'sql/hive')
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala | 47
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 44
2 files changed, 48 insertions, 43 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 1e2bf5cc4b..58815daa82 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.hive
-import org.apache.hadoop.hive.common.`type`.HiveDecimal
+import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector._
@@ -114,6 +114,51 @@ private[hive] trait HiveInspectors {
unwrap(si.getStructFieldData(data,r), r.getFieldObjectInspector)).toArray)
}
+
+ /**
+ * Wraps with Hive types based on object inspector.
+ * TODO: Consolidate all hive OI/data interface code.
+ */
+ protected def wrapperFor(oi: ObjectInspector): Any => Any = oi match {
+ case _: JavaHiveVarcharObjectInspector =>
+ (o: Any) => new HiveVarchar(o.asInstanceOf[String], o.asInstanceOf[String].size)
+
+ case _: JavaHiveDecimalObjectInspector =>
+ (o: Any) => HiveShim.createDecimal(o.asInstanceOf[Decimal].toBigDecimal.underlying())
+
+ case soi: StandardStructObjectInspector =>
+ val wrappers = soi.getAllStructFieldRefs.map(ref => wrapperFor(ref.getFieldObjectInspector))
+ (o: Any) => {
+ val struct = soi.create()
+ (soi.getAllStructFieldRefs, wrappers, o.asInstanceOf[Row]).zipped.foreach {
+ (field, wrapper, data) => soi.setStructFieldData(struct, field, wrapper(data))
+ }
+ struct
+ }
+
+ case loi: ListObjectInspector =>
+ val wrapper = wrapperFor(loi.getListElementObjectInspector)
+ (o: Any) => seqAsJavaList(o.asInstanceOf[Seq[_]].map(wrapper))
+
+ case moi: MapObjectInspector =>
+ // The Predef.Map is scala.collection.immutable.Map.
+ // Since the map values can be mutable, we explicitly import scala.collection.Map at here.
+ import scala.collection.Map
+
+ val keyWrapper = wrapperFor(moi.getMapKeyObjectInspector)
+ val valueWrapper = wrapperFor(moi.getMapValueObjectInspector)
+ (o: Any) => mapAsJavaMap(o.asInstanceOf[Map[_, _]].map { case (key, value) =>
+ keyWrapper(key) -> valueWrapper(value)
+ })
+
+ case _ =>
+ identity[Any]
+ }
+
/**
* Converts native catalyst types to the types expected by Hive
* @param a the value to be wrapped
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 92bc1c6625..74b4e7aaa4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution
import scala.collection.JavaConversions._
-import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
+import org.apache.hadoop.hive.common.`type`.HiveVarchar
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.metastore.MetaStoreUtils
@@ -52,7 +52,7 @@ case class InsertIntoHiveTable(
child: SparkPlan,
overwrite: Boolean)
(@transient sc: HiveContext)
- extends UnaryNode with Command {
+ extends UnaryNode with Command with HiveInspectors {
@transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
@transient private lazy val hiveContext = new Context(sc.hiveconf)
@@ -68,46 +68,6 @@ case class InsertIntoHiveTable(
def output = child.output
- /**
- * Wraps with Hive types based on object inspector.
- * TODO: Consolidate all hive OI/data interface code.
- */
- protected def wrapperFor(oi: ObjectInspector): Any => Any = oi match {
- case _: JavaHiveVarcharObjectInspector =>
- (o: Any) => new HiveVarchar(o.asInstanceOf[String], o.asInstanceOf[String].size)
-
- case _: JavaHiveDecimalObjectInspector =>
- (o: Any) => HiveShim.createDecimal(o.asInstanceOf[Decimal].toBigDecimal.underlying())
-
- case soi: StandardStructObjectInspector =>
- val wrappers = soi.getAllStructFieldRefs.map(ref => wrapperFor(ref.getFieldObjectInspector))
- (o: Any) => {
- val struct = soi.create()
- (soi.getAllStructFieldRefs, wrappers, o.asInstanceOf[Row]).zipped.foreach {
- (field, wrapper, data) => soi.setStructFieldData(struct, field, wrapper(data))
- }
- struct
- }
-
- case loi: ListObjectInspector =>
- val wrapper = wrapperFor(loi.getListElementObjectInspector)
- (o: Any) => seqAsJavaList(o.asInstanceOf[Seq[_]].map(wrapper))
-
- case moi: MapObjectInspector =>
- // The Predef.Map is scala.collection.immutable.Map.
- // Since the map values can be mutable, we explicitly import scala.collection.Map at here.
- import scala.collection.Map
-
- val keyWrapper = wrapperFor(moi.getMapKeyObjectInspector)
- val valueWrapper = wrapperFor(moi.getMapValueObjectInspector)
- (o: Any) => mapAsJavaMap(o.asInstanceOf[Map[_, _]].map { case (key, value) =>
- keyWrapper(key) -> valueWrapper(value)
- })
-
- case _ =>
- identity[Any]
- }
-
def saveAsHiveFile(
rdd: RDD[Row],
valueClass: Class[_],