author     Sun Rui <rui.sun@intel.com>  2015-08-25 13:14:10 -0700
committer  Shivaram Venkataraman <shivaram@cs.berkeley.edu>  2015-08-25 13:14:10 -0700
commit     71a138cd0e0a14e8426f97877e3b52a562bbd02c (patch)
tree       e0d2f675ec969b7a5c24c46414999d16c8fc759e /sql
parent     16a2be1a84c0a274a60c0a584faaf58b55d4942b (diff)
[SPARK-10048] [SPARKR] Support arbitrary nested Java array in serde.
This PR: 1. supports transferring arbitrary nested arrays from the JVM to the R side in SerDe; 2. building on 1, improves the collect() implementation so that it can now collect data of complex types from a DataFrame.

Author: Sun Rui <rui.sun@intel.com>

Closes #8276 from sun-rui/SPARK-10048.
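For readers of this commit, a minimal standalone sketch of the recursive tag-plus-payload idea the change relies on may help. Everything here is illustrative: the object name, the one-byte tags 'l'/'i'/'c', and the Int/String-only leaves are hypothetical simplifications, not Spark's actual SerDe wire format.

import java.io.{ByteArrayOutputStream, DataOutputStream}

object NestedSerDeSketch {
  // Write a one-byte type tag, then the payload; the array case recurses,
  // so any nesting depth (e.g. array<array<int>>) serializes in one call.
  def writeObject(dos: DataOutputStream, obj: Any): Unit = obj match {
    case arr: Array[_] =>
      dos.writeByte('l')                 // 'l': hypothetical list/array tag
      dos.writeInt(arr.length)
      arr.foreach(writeObject(dos, _))   // recursion handles nested arrays
    case i: Int =>
      dos.writeByte('i')                 // 'i': hypothetical integer tag
      dos.writeInt(i)
    case s: String =>
      dos.writeByte('c')                 // 'c': hypothetical character tag
      dos.writeUTF(s)
    case other =>
      throw new IllegalArgumentException(s"unsupported type: ${other.getClass}")
  }

  def main(args: Array[String]): Unit = {
    val bos = new ByteArrayOutputStream()
    val dos = new DataOutputStream(bos)
    writeObject(dos, Array(Array(1, 2), Array(3)))  // a nested column value
    dos.flush()
    println(s"serialized ${bos.size()} bytes")
  }
}

Because the array case recurses, a whole nested structure round-trips through a single writeObject call. That is what lets rowToRBytes in the diff below serialize all cells of a Row at once, and lets dfToCols return plain Array[Array[Any]] columns instead of pre-serialized byte arrays.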
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala | 32
1 file changed, 4 insertions(+), 28 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index 92861ab038..7f3defec3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -98,27 +98,17 @@ private[r] object SQLUtils {
     val bos = new ByteArrayOutputStream()
     val dos = new DataOutputStream(bos)
 
-    SerDe.writeInt(dos, row.length)
-    (0 until row.length).map { idx =>
-      val obj: Object = row(idx).asInstanceOf[Object]
-      SerDe.writeObject(dos, obj)
-    }
+    val cols = (0 until row.length).map(row(_).asInstanceOf[Object]).toArray
+    SerDe.writeObject(dos, cols)
     bos.toByteArray()
   }
 
-  def dfToCols(df: DataFrame): Array[Array[Byte]] = {
+  def dfToCols(df: DataFrame): Array[Array[Any]] = {
     // localDF is Array[Row]
     val localDF = df.collect()
     val numCols = df.columns.length
 
-    // dfCols is Array[Array[Any]]
-    val dfCols = convertRowsToColumns(localDF, numCols)
-
-    dfCols.map { col =>
-      colToRBytes(col)
-    }
-  }
-  def convertRowsToColumns(localDF: Array[Row], numCols: Int): Array[Array[Any]] = {
+    // result is Array[Array[Any]]
     (0 until numCols).map { colIdx =>
       localDF.map { row =>
         row(colIdx)
@@ -126,20 +116,6 @@ private[r] object SQLUtils {
     }.toArray
   }
 
-  def colToRBytes(col: Array[Any]): Array[Byte] = {
-    val numRows = col.length
-    val bos = new ByteArrayOutputStream()
-    val dos = new DataOutputStream(bos)
-
-    SerDe.writeInt(dos, numRows)
-
-    col.map { item =>
-      val obj: Object = item.asInstanceOf[Object]
-      SerDe.writeObject(dos, obj)
-    }
-    bos.toByteArray()
-  }
-
   def saveMode(mode: String): SaveMode = {
     mode match {
       case "append" => SaveMode.Append