diff options
author | Sun Rui <rui.sun@intel.com> | 2015-09-16 13:20:39 -0700 |
---|---|---|
committer | Shivaram Venkataraman <shivaram@cs.berkeley.edu> | 2015-09-16 13:20:39 -0700 |
commit | 896edb51ab7a88bbb31259e565311a9be6f2ca6d (patch) | |
tree | a21dd5c096fc1cacb309b7a5bf7ea342e0b4b4fe /core | |
parent | 5dbaf3d3911bbfa003bc75459aaad66b4f6e0c67 (diff) | |
download | spark-896edb51ab7a88bbb31259e565311a9be6f2ca6d.tar.gz spark-896edb51ab7a88bbb31259e565311a9be6f2ca6d.tar.bz2 spark-896edb51ab7a88bbb31259e565311a9be6f2ca6d.zip |
[SPARK-10050] [SPARKR] Support collecting data of MapType in DataFrame.
1. Support collecting data of MapType from DataFrame.
2. Support data of MapType in createDataFrame.
Author: Sun Rui <rui.sun@intel.com>
Closes #8711 from sun-rui/SPARK-10050.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/api/r/SerDe.scala | 31 |
1 files changed, 31 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala index 3c92bb7a1c..0c78613e40 100644 --- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala +++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala @@ -209,11 +209,23 @@ private[spark] object SerDe { case "array" => dos.writeByte('a') // Array of objects case "list" => dos.writeByte('l') + case "map" => dos.writeByte('e') case "jobj" => dos.writeByte('j') case _ => throw new IllegalArgumentException(s"Invalid type $typeStr") } } + private def writeKeyValue(dos: DataOutputStream, key: Object, value: Object): Unit = { + if (key == null) { + throw new IllegalArgumentException("Key in map can't be null.") + } else if (!key.isInstanceOf[String]) { + throw new IllegalArgumentException(s"Invalid map key type: ${key.getClass.getName}") + } + + writeString(dos, key.asInstanceOf[String]) + writeObject(dos, value) + } + def writeObject(dos: DataOutputStream, obj: Object): Unit = { if (obj == null) { writeType(dos, "void") @@ -306,6 +318,25 @@ private[spark] object SerDe { writeInt(dos, v.length) v.foreach(elem => writeObject(dos, elem)) + // Handle map + case v: java.util.Map[_, _] => + writeType(dos, "map") + writeInt(dos, v.size) + val iter = v.entrySet.iterator + while(iter.hasNext) { + val entry = iter.next + val key = entry.getKey + val value = entry.getValue + + writeKeyValue(dos, key.asInstanceOf[Object], value.asInstanceOf[Object]) + } + case v: scala.collection.Map[_, _] => + writeType(dos, "map") + writeInt(dos, v.size) + v.foreach { case (key, value) => + writeKeyValue(dos, key.asInstanceOf[Object], value.asInstanceOf[Object]) + } + case _ => writeType(dos, "jobj") writeJObj(dos, value) |