aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorSun Rui <rui.sun@intel.com>2015-09-16 13:20:39 -0700
committerShivaram Venkataraman <shivaram@cs.berkeley.edu>2015-09-16 13:20:39 -0700
commit896edb51ab7a88bbb31259e565311a9be6f2ca6d (patch)
treea21dd5c096fc1cacb309b7a5bf7ea342e0b4b4fe /core
parent5dbaf3d3911bbfa003bc75459aaad66b4f6e0c67 (diff)
downloadspark-896edb51ab7a88bbb31259e565311a9be6f2ca6d.tar.gz
spark-896edb51ab7a88bbb31259e565311a9be6f2ca6d.tar.bz2
spark-896edb51ab7a88bbb31259e565311a9be6f2ca6d.zip
[SPARK-10050] [SPARKR] Support collecting data of MapType in DataFrame.
1. Support collecting data of MapType from DataFrame. 2. Support data of MapType in createDataFrame. Author: Sun Rui <rui.sun@intel.com> Closes #8711 from sun-rui/SPARK-10050.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/api/r/SerDe.scala31
1 files changed, 31 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index 3c92bb7a1c..0c78613e40 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -209,11 +209,23 @@ private[spark] object SerDe {
case "array" => dos.writeByte('a')
// Array of objects
case "list" => dos.writeByte('l')
+ case "map" => dos.writeByte('e')
case "jobj" => dos.writeByte('j')
case _ => throw new IllegalArgumentException(s"Invalid type $typeStr")
}
}
+ private def writeKeyValue(dos: DataOutputStream, key: Object, value: Object): Unit = {
+ if (key == null) {
+ throw new IllegalArgumentException("Key in map can't be null.")
+ } else if (!key.isInstanceOf[String]) {
+ throw new IllegalArgumentException(s"Invalid map key type: ${key.getClass.getName}")
+ }
+
+ writeString(dos, key.asInstanceOf[String])
+ writeObject(dos, value)
+ }
+
def writeObject(dos: DataOutputStream, obj: Object): Unit = {
if (obj == null) {
writeType(dos, "void")
@@ -306,6 +318,25 @@ private[spark] object SerDe {
writeInt(dos, v.length)
v.foreach(elem => writeObject(dos, elem))
+ // Handle map
+ case v: java.util.Map[_, _] =>
+ writeType(dos, "map")
+ writeInt(dos, v.size)
+ val iter = v.entrySet.iterator
+ while(iter.hasNext) {
+ val entry = iter.next
+ val key = entry.getKey
+ val value = entry.getValue
+
+ writeKeyValue(dos, key.asInstanceOf[Object], value.asInstanceOf[Object])
+ }
+ case v: scala.collection.Map[_, _] =>
+ writeType(dos, "map")
+ writeInt(dos, v.size)
+ v.foreach { case (key, value) =>
+ writeKeyValue(dos, key.asInstanceOf[Object], value.asInstanceOf[Object])
+ }
+
case _ =>
writeType(dos, "jobj")
writeJObj(dos, value)