author    Sun Rui <rui.sun@intel.com>    2015-10-13 10:02:21 -0700
committer Shivaram Venkataraman <shivaram@cs.berkeley.edu>    2015-10-13 10:02:21 -0700
commit    5e3868ba139f5f0b3a33361c6b884594a3ab6421 (patch)
tree      a37f40a4f112f452f0eba4dd8ac5b9e1eb805e4b /core
parent    d0cc79ccd0b4500bd6b18184a723dabc164e8abd (diff)
[SPARK-10051] [SPARKR] Support collecting data of StructType in DataFrame
Two points in this PR:

1. The original idea was that a named R list would be treated as a struct by SerDe. This is problematic because some R functions implicitly generate named lists that are not intended to be structs when transferred by SerDe. SerDe clients therefore have to explicitly mark a named list as a struct by changing its class from "list" to "struct".

2. SerDe lives in the Spark Core module, while data of StructType is represented as GenericRow, which is defined in the Spark SQL module. SerDe cannot import GenericRow because in the Maven build the Spark SQL module depends on the Spark Core module, not the other way around. So this PR adds a registration hook in SerDe that allows SQLUtils in the Spark SQL module to register its own functions for serialization and deserialization of StructType.

Author: Sun Rui <rui.sun@intel.com>

Closes #8794 from sun-rui/SPARK-10051.
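To show how the registration hook is meant to be used, here is a minimal Scala sketch of a SQL-side registrant. It is not the actual SQLUtils code: the 's' type tag, the StructSerDeSketch object, and its readSqlObject/writeSqlObject helpers are hypothetical; only registerSqlSerDe, ReadObject, WriteObject, and SerDe's existing readInt/readObject/writeType/writeInt/writeObject helpers come from the patch and the surrounding file.

// Sketch only; assumes it lives under org.apache.spark so it can see the
// private[spark] SerDe object (as SQLUtils does).
package org.apache.spark.sql.api.r

import java.io.{DataInputStream, DataOutputStream}

import org.apache.spark.api.r.SerDe

object StructSerDeSketch {
  // Hypothetical reader: handles an assumed 's' (struct) type tag and returns
  // null for anything else, so SerDe still raises its IllegalArgumentException
  // for genuinely unknown types.
  private def readSqlObject(dis: DataInputStream, dataType: Char): Object = {
    dataType match {
      case 's' =>
        // Read the field values with SerDe's generic reader; in Spark SQL
        // these would be wrapped in a GenericRow.
        val numFields = SerDe.readInt(dis)
        (0 until numFields).map(_ => SerDe.readObject(dis)).toArray
      case _ => null
    }
  }

  // Hypothetical writer: returns true only when it recognized and wrote the
  // value, so SerDe falls back to writing a "jobj" reference otherwise.
  private def writeSqlObject(dos: DataOutputStream, value: Object): Boolean = {
    value match {
      case fields: Array[Object] =>
        SerDe.writeType(dos, "list")
        SerDe.writeInt(dos, fields.length)
        fields.foreach(SerDe.writeObject(dos, _))
        true
      case _ => false
    }
  }

  def register(): Unit = {
    SerDe.registerSqlSerDe((readSqlObject _, writeSqlObject _))
  }
}

Returning null from the reader and false from the writer keeps SerDe's original behavior (the IllegalArgumentException and the "jobj" fallback) whenever the registered functions do not recognize a type.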
Diffstat (limited to 'core')
-rw-r--r--    core/src/main/scala/org/apache/spark/api/r/SerDe.scala    71
1 file changed, 55 insertions, 16 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index 0c78613e40..da126bac7a 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -27,6 +27,14 @@ import scala.collection.mutable.WrappedArray
* Utility functions to serialize, deserialize objects to / from R
*/
private[spark] object SerDe {
+ type ReadObject = (DataInputStream, Char) => Object
+ type WriteObject = (DataOutputStream, Object) => Boolean
+
+ var sqlSerDe: (ReadObject, WriteObject) = _
+
+ def registerSqlSerDe(sqlSerDe: (ReadObject, WriteObject)): Unit = {
+ this.sqlSerDe = sqlSerDe
+ }
// Type mapping from R to Java
//
@@ -63,11 +71,22 @@ private[spark] object SerDe {
case 'c' => readString(dis)
case 'e' => readMap(dis)
case 'r' => readBytes(dis)
+ case 'a' => readArray(dis)
case 'l' => readList(dis)
case 'D' => readDate(dis)
case 't' => readTime(dis)
case 'j' => JVMObjectTracker.getObject(readString(dis))
- case _ => throw new IllegalArgumentException(s"Invalid type $dataType")
+ case _ =>
+ if (sqlSerDe == null || sqlSerDe._1 == null) {
+ throw new IllegalArgumentException (s"Invalid type $dataType")
+ } else {
+ val obj = (sqlSerDe._1)(dis, dataType)
+ if (obj == null) {
+ throw new IllegalArgumentException (s"Invalid type $dataType")
+ } else {
+ obj
+ }
+ }
}
}
@@ -141,7 +160,8 @@ private[spark] object SerDe {
(0 until len).map(_ => readString(in)).toArray
}
- def readList(dis: DataInputStream): Array[_] = {
+ // All elements of an array must be of the same type
+ def readArray(dis: DataInputStream): Array[_] = {
val arrType = readObjectType(dis)
arrType match {
case 'i' => readIntArr(dis)
@@ -150,26 +170,43 @@ private[spark] object SerDe {
case 'b' => readBooleanArr(dis)
case 'j' => readStringArr(dis).map(x => JVMObjectTracker.getObject(x))
case 'r' => readBytesArr(dis)
- case 'l' => {
+ case 'a' =>
+ val len = readInt(dis)
+ (0 until len).map(_ => readArray(dis)).toArray
+ case 'l' =>
val len = readInt(dis)
(0 until len).map(_ => readList(dis)).toArray
- }
- case _ => throw new IllegalArgumentException(s"Invalid array type $arrType")
+ case _ =>
+ if (sqlSerDe == null || sqlSerDe._1 == null) {
+ throw new IllegalArgumentException (s"Invalid array type $arrType")
+ } else {
+ val len = readInt(dis)
+ (0 until len).map { _ =>
+ val obj = (sqlSerDe._1)(dis, arrType)
+ if (obj == null) {
+ throw new IllegalArgumentException (s"Invalid array type $arrType")
+ } else {
+ obj
+ }
+ }.toArray
+ }
}
}
+ // Each element of a list can be of different type. They are all represented
+ // as Object on JVM side
+ def readList(dis: DataInputStream): Array[Object] = {
+ val len = readInt(dis)
+ (0 until len).map(_ => readObject(dis)).toArray
+ }
+
def readMap(in: DataInputStream): java.util.Map[Object, Object] = {
val len = readInt(in)
if (len > 0) {
- val keysType = readObjectType(in)
- val keysLen = readInt(in)
- val keys = (0 until keysLen).map(_ => readTypedObject(in, keysType))
-
- val valuesLen = readInt(in)
- val values = (0 until valuesLen).map(_ => {
- val valueType = readObjectType(in)
- readTypedObject(in, valueType)
- })
+ // Keys is an array of String
+ val keys = readArray(in).asInstanceOf[Array[Object]]
+ val values = readList(in)
+
keys.zip(values).toMap.asJava
} else {
new java.util.HashMap[Object, Object]()
@@ -338,8 +375,10 @@ private[spark] object SerDe {
}
case _ =>
- writeType(dos, "jobj")
- writeJObj(dos, value)
+ if (sqlSerDe == null || sqlSerDe._2 == null || !(sqlSerDe._2)(dos, value)) {
+ writeType(dos, "jobj")
+ writeJObj(dos, value)
+ }
}
}
}