diff options
author | Sun Rui <rui.sun@intel.com> | 2015-08-25 13:14:10 -0700 |
---|---|---|
committer | Shivaram Venkataraman <shivaram@cs.berkeley.edu> | 2015-08-25 13:14:10 -0700 |
commit | 71a138cd0e0a14e8426f97877e3b52a562bbd02c (patch) | |
tree | e0d2f675ec969b7a5c24c46414999d16c8fc759e /sql | |
parent | 16a2be1a84c0a274a60c0a584faaf58b55d4942b (diff) | |
download | spark-71a138cd0e0a14e8426f97877e3b52a562bbd02c.tar.gz spark-71a138cd0e0a14e8426f97877e3b52a562bbd02c.tar.bz2 spark-71a138cd0e0a14e8426f97877e3b52a562bbd02c.zip |
[SPARK-10048] [SPARKR] Support arbitrary nested Java array in serde.
This PR:
1. supports transferring arbitrarily nested arrays from the JVM to the R side in SerDe;
2. based on 1, the collect() implementation is improved. Now it can support collecting data of complex types
from a DataFrame.
Author: Sun Rui <rui.sun@intel.com>
Closes #8276 from sun-rui/SPARK-10048.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala | 32 |
1 files changed, 4 insertions, 28 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index 92861ab038..7f3defec3d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -98,27 +98,17 @@ private[r] object SQLUtils { val bos = new ByteArrayOutputStream() val dos = new DataOutputStream(bos) - SerDe.writeInt(dos, row.length) - (0 until row.length).map { idx => - val obj: Object = row(idx).asInstanceOf[Object] - SerDe.writeObject(dos, obj) - } + val cols = (0 until row.length).map(row(_).asInstanceOf[Object]).toArray + SerDe.writeObject(dos, cols) bos.toByteArray() } - def dfToCols(df: DataFrame): Array[Array[Byte]] = { + def dfToCols(df: DataFrame): Array[Array[Any]] = { // localDF is Array[Row] val localDF = df.collect() val numCols = df.columns.length - // dfCols is Array[Array[Any]] - val dfCols = convertRowsToColumns(localDF, numCols) - - dfCols.map { col => - colToRBytes(col) - } - } - def convertRowsToColumns(localDF: Array[Row], numCols: Int): Array[Array[Any]] = { + // result is Array[Array[Any]] (0 until numCols).map { colIdx => localDF.map { row => row(colIdx) @@ -126,20 +116,6 @@ private[r] object SQLUtils { }.toArray } - def colToRBytes(col: Array[Any]): Array[Byte] = { - val numRows = col.length - val bos = new ByteArrayOutputStream() - val dos = new DataOutputStream(bos) - - SerDe.writeInt(dos, numRows) - - col.map { item => - val obj: Object = item.asInstanceOf[Object] - SerDe.writeObject(dos, obj) - } - bos.toByteArray() - } - def saveMode(mode: String): SaveMode = { mode match { case "append" => SaveMode.Append |