author     Sun Rui <rui.sun@intel.com>                       2016-04-29 16:41:07 -0700
committer  Shivaram Venkataraman <shivaram@cs.berkeley.edu>  2016-04-29 16:41:07 -0700
commit     4ae9fe091c2cb8388c581093d62d3deaef40993e (patch)
tree       fd84ce605c0ea8bd9d0b2e307119bd5d8651c9f5 /core
parent     d78fbcc3cc9c379b4a548ebc816c6f71cc71a16e (diff)
[SPARK-12919][SPARKR] Implement dapply() on DataFrame in SparkR.
## What changes were proposed in this pull request?
dapply() applies an R function to each partition of a DataFrame and returns a new DataFrame.
The function signature is:
`dapply(df, function(localDF) {}, schema = NULL)`
R function input: a local data.frame holding the rows of one partition on the worker node.
R function output: a local data.frame.
The schema argument specifies the row format of the resulting DataFrame; it must match the output of the R function.
If schema is not specified, each partition of the result DataFrame is instead serialized in R into a single byte array. Such a DataFrame can still be processed by successive calls to dapply().
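
For illustration, a minimal usage sketch of both modes, assuming a SparkR session with a `sqlContext` and the SparkR `structType`/`structField` schema helpers; the data and column names are hypothetical and not part of this patch:

```r
# Hypothetical input: a DataFrame with an integer column a and a string column b.
df <- createDataFrame(sqlContext, data.frame(a = 1:3, b = c("x", "y", "z")))

# Mode 1: schema given. It must describe the rows the R function returns.
schema <- structType(structField("a", "integer"),
                     structField("b", "string"),
                     structField("a2", "integer"))
df2 <- dapply(df, function(localDF) {
  # localDF is one partition's rows as a local data.frame
  localDF$a2 <- localDF$a * 2L
  localDF
}, schema)

# Mode 2: schema = NULL. Each result partition is kept as a single serialized
# byte array, so df3 is only usable as input to a further dapply() call.
df3 <- dapply(df2, function(localDF) { localDF[localDF$a2 > 2, ] })
df4 <- dapply(df3, function(localDF) { localDF }, schema)
```

Without a schema the JVM side cannot interpret the partition bytes as rows, which is why such a DataFrame is only consumable by another dapply() call.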
## How was this patch tested?
SparkR unit tests.
Author: Sun Rui <rui.sun@intel.com>
Author: Sun Rui <sunrui2016@gmail.com>
Closes #12493 from sun-rui/SPARK-12919.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/r/RRDD.scala    |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/api/r/RRunner.scala | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/api/r/SerDe.scala   |  2
3 files changed, 12 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
index 606ba6ef86..59c8429c80 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
@@ -46,7 +46,7 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag](
     // The parent may be also an RRDD, so we should launch it first.
     val parentIterator = firstParent[T].iterator(partition, context)
 
-    runner.compute(parentIterator, partition.index, context)
+    runner.compute(parentIterator, partition.index)
   }
 }
 
diff --git a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
index 07d1fa2c4a..24ad689f83 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
@@ -38,7 +38,9 @@ private[spark] class RRunner[U](
     serializer: String,
     packageNames: Array[Byte],
     broadcastVars: Array[Broadcast[Object]],
-    numPartitions: Int = -1)
+    numPartitions: Int = -1,
+    isDataFrame: Boolean = false,
+    colNames: Array[String] = null)
   extends Logging {
   private var bootTime: Double = _
   private var dataStream: DataInputStream = _
@@ -53,8 +55,7 @@ private[spark] class RRunner[U](
 
   def compute(
       inputIterator: Iterator[_],
-      partitionIndex: Int,
-      context: TaskContext): Iterator[U] = {
+      partitionIndex: Int): Iterator[U] = {
     // Timing start
     bootTime = System.currentTimeMillis / 1000.0
 
@@ -148,6 +149,12 @@ private[spark] class RRunner[U](
 
           dataOut.writeInt(numPartitions)
 
+          dataOut.writeInt(if (isDataFrame) 1 else 0)
+
+          if (isDataFrame) {
+            SerDe.writeObject(dataOut, colNames)
+          }
+
           if (!iter.hasNext) {
             dataOut.writeInt(0)
           } else {
diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index 8e4e80a24a..e4932a4192 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -459,7 +459,7 @@ private[spark] object SerDe {
 
 }
 
-private[r] object SerializationFormats {
+private[spark] object SerializationFormats {
   val BYTE = "byte"
   val STRING = "string"
   val ROW = "row"
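
For context, the R worker counterpart of the new handshake lives in SparkR's worker.R, which is outside this 'core'-limited diffstat. It would read the flag and the column names in the same order the JVM writes them; a hedged sketch using SparkR's internal deserializers, with a hypothetical connection object `inputCon`:

```r
# Hypothetical sketch of the R worker side of the protocol change above;
# the actual worker.R changes are not shown in this diff.
isDataFrame <- SparkR:::readInt(inputCon) == 1L   # matches dataOut.writeInt(...)
if (isDataFrame) {
  colNames <- SparkR:::readObject(inputCon)       # matches SerDe.writeObject(dataOut, colNames)
}
```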