author     Cheng Lian <lian.cs.zju@gmail.com>    2014-06-25 00:14:34 -0700
committer  Reynold Xin <rxin@apache.org>         2014-06-25 00:14:34 -0700
commit     8fade8973e5fc97f781de5344beb66b90bd6e524 (patch)
tree       99b7cd1b46125b81186cc6f11d57771d3e743217 /sql
parent     b6b44853cd61660f2917b99d87c157e2b4430e5c (diff)
[SPARK-2263][SQL] Support inserting MAP<K, V> to Hive tables
JIRA issue: [SPARK-2263](https://issues.apache.org/jira/browse/SPARK-2263)

Map objects were not converted to Hive types before being inserted into Hive tables.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1205 from liancheng/spark-2263 and squashes the following commits:

c7a4373 [Cheng Lian] Addressed @concretevitamin's comment
784940b [Cheng Lian] SPARK-2263: support inserting MAP<K, V> to Hive tables
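The heart of the fix is a new case in the `wrap` helper of `InsertIntoHiveTable` (first hunk below) that recursively converts a Scala `Map` into the `java.util.Map` that Hive's `MapObjectInspector` expects. A minimal, self-contained sketch of the same idea, using Hive's public ObjectInspector factories; the standalone `wrap` here is a simplified illustration, not the actual method:

```scala
import java.util.{Map => JMap}

import scala.collection.JavaConversions._

import org.apache.hadoop.hive.serde2.objectinspector.{MapObjectInspector, ObjectInspector, ObjectInspectorFactory}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

// Simplified stand-in for the `wrap` helper this patch extends: convert a
// Catalyst-side value into the Java-level object Hive's SerDe layer expects.
// (The real method lives in InsertIntoHiveTable and handles more cases.)
def wrap(a: Any, oi: ObjectInspector): Any = (a, oi) match {
  case (m: Map[_, _], moi: MapObjectInspector) =>
    val keyOi = moi.getMapKeyObjectInspector
    val valueOi = moi.getMapValueObjectInspector
    // Recursively wrap keys and values, then expose the result as a java.util.Map.
    mapAsJavaMap(m.map { case (k, v) => wrap(k, keyOi) -> wrap(v, valueOi) })
  case (obj, _) =>
    obj // primitives and already-compatible values pass through unchanged
}

// Example: wrapping a Scala Map[Int, String] for a MAP<INT, STRING> column.
val mapOi = ObjectInspectorFactory.getStandardMapObjectInspector(
  PrimitiveObjectInspectorFactory.javaIntObjectInspector,
  PrimitiveObjectInspectorFactory.javaStringObjectInspector)
val wrapped = wrap(Map(1 -> "one", 2 -> "two"), mapOi).asInstanceOf[JMap[Int, String]]
assert(wrapped.get(1) == "one")
```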
Diffstat (limited to 'sql')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala |  8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala      | 15
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala        |  3
3 files changed, 20 insertions(+), 6 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 594a803806..c2b0b00aa5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.hive.execution
import scala.collection.JavaConversions._
+import java.util.{HashMap => JHashMap}
+
import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
import org.apache.hadoop.hive.metastore.MetaStoreUtils
import org.apache.hadoop.hive.ql.Context
@@ -88,6 +90,12 @@ case class InsertIntoHiveTable(
val wrappedSeq = s.map(wrap(_, oi.getListElementObjectInspector))
seqAsJavaList(wrappedSeq)
+ case (m: Map[_, _], oi: MapObjectInspector) =>
+ val keyOi = oi.getMapKeyObjectInspector
+ val valueOi = oi.getMapValueObjectInspector
+ val wrappedMap = m.map { case (key, value) => wrap(key, keyOi) -> wrap(value, valueOi) }
+ mapAsJavaMap(wrappedMap)
+
case (obj, _) =>
obj
}
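A note on the design of this hunk: the conversion recurses through `wrap`, so nested values (e.g. a MAP<INT, ARRAY<STRING>> column) fall through to the existing Seq and primitive cases; and `mapAsJavaMap` from `scala.collection.JavaConversions` wraps the Scala map in a live `java.util.Map` view rather than copying it, which keeps the per-row conversion cost low.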
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index d855310253..9f1cd70310 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -228,7 +228,7 @@ class HiveQuerySuite extends HiveComparisonTest {
val fixture = List(("foo", 2), ("bar", 1), ("foo", 4), ("bar", 3))
.zipWithIndex.map {case Pair(Pair(value, attr), key) => HavingRow(key, value, attr)}
TestHive.sparkContext.parallelize(fixture).registerAsTable("having_test")
- val results =
+ val results =
hql("SELECT value, max(attr) AS attr FROM having_test GROUP BY value HAVING attr > 3")
.collect()
.map(x => Pair(x.getString(0), x.getInt(1)))
@@ -236,7 +236,7 @@ class HiveQuerySuite extends HiveComparisonTest {
assert(results === Array(Pair("foo", 4)))
TestHive.reset()
}
-
+
test("SPARK-2180: HAVING with non-boolean clause raises no exceptions") {
hql("select key, count(*) c from src group by key having c").collect()
}
@@ -370,6 +370,16 @@ class HiveQuerySuite extends HiveComparisonTest {
}
}
+ test("SPARK-2263: Insert Map<K, V> values") {
+ hql("CREATE TABLE m(value MAP<INT, STRING>)")
+ hql("INSERT OVERWRITE TABLE m SELECT MAP(key, value) FROM src LIMIT 10")
+ hql("SELECT * FROM m").collect().zip(hql("SELECT * FROM src LIMIT 10").collect()).map {
+ case (Row(map: Map[Int, String]), Row(key: Int, value: String)) =>
+ assert(map.size === 1)
+ assert(map.head === (key, value))
+ }
+ }
+
test("parse HQL set commands") {
// Adapted from its SQL counterpart.
val testKey = "spark.sql.key.usedfortestonly"
@@ -460,7 +470,6 @@ class HiveQuerySuite extends HiveComparisonTest {
// Put tests that depend on specific Hive settings before these last two test,
// since they modify /clear stuff.
-
}
// for SPARK-2180 test
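One caveat about the new regression test above: the `Map[Int, String]` type arguments in the `Row(map: Map[Int, String])` pattern are erased at runtime, so the match only confirms that the column is a `Map`; it is the `assert(map.head === (key, value))` that actually verifies each single-entry map survived the round trip through Hive.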
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
index a9e3f42a3a..f944d01066 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
@@ -122,6 +122,3 @@ class PairUdf extends GenericUDF {
override def getDisplayString(p1: Array[String]): String = ""
}
-
-
-