author     Sun Rui <rui.sun@intel.com>                        2016-04-29 16:41:07 -0700
committer  Shivaram Venkataraman <shivaram@cs.berkeley.edu>   2016-04-29 16:41:07 -0700
commit     4ae9fe091c2cb8388c581093d62d3deaef40993e (patch)
tree       fd84ce605c0ea8bd9d0b2e307119bd5d8651c9f5 /core
parent     d78fbcc3cc9c379b4a548ebc816c6f71cc71a16e (diff)
[SPARK-12919][SPARKR] Implement dapply() on DataFrame in SparkR.
## What changes were proposed in this pull request?

dapply() applies an R function to each partition of a DataFrame and returns a new DataFrame. The function signature is:

    dapply(df, function(localDF) {}, schema = NULL)

R function input: a local data.frame holding the partition's rows on the local node
R function output: a local data.frame

The schema specifies the Row format of the resulting DataFrame and must match the R function's output. If no schema is specified, each partition of the resulting DataFrame is serialized in R into a single byte array; such a DataFrame can only be processed by successive calls to dapply().

## How was this patch tested?

SparkR unit tests.

Author: Sun Rui <rui.sun@intel.com>
Author: Sun Rui <sunrui2016@gmail.com>

Closes #12493 from sun-rui/SPARK-12919.
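For illustration, here is a minimal SparkR usage sketch of the API described above. It is not part of this patch: the sample data, the derived column `c`, and the local-mode session setup are made up for the example, and it assumes the pre-2.0 SparkR session helpers (sparkR.init, sparkRSQL.init) plus the existing createDataFrame/structType/structField functions.

```r
library(SparkR)

# Illustrative local session setup (pre-2.0 SparkR style; adjust as needed).
sc <- sparkR.init(master = "local")
sqlContext <- sparkRSQL.init(sc)

df <- createDataFrame(sqlContext, data.frame(a = 1:3, b = c(10, 20, 30)))

# Schema describing the rows the R function returns; it must match the
# function's output exactly.
schema <- structType(structField("a", "integer"),
                     structField("b", "double"),
                     structField("c", "double"))

# The function receives each partition as a local data.frame and returns a
# local data.frame; here it adds a derived column.
df2 <- dapply(df, function(localDF) {
  localDF$c <- localDF$a + localDF$b
  localDF
}, schema)

# With schema = NULL the result is kept as serialized byte arrays in R and is
# only useful as input to a further dapply() call.
df3 <- dapply(df, function(localDF) { localDF }, schema = NULL)
```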
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/r/RRDD.scala     2
-rw-r--r--  core/src/main/scala/org/apache/spark/api/r/RRunner.scala  13
-rw-r--r--  core/src/main/scala/org/apache/spark/api/r/SerDe.scala    2
3 files changed, 12 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
index 606ba6ef86..59c8429c80 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
@@ -46,7 +46,7 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag](
// The parent may be also an RRDD, so we should launch it first.
val parentIterator = firstParent[T].iterator(partition, context)
- runner.compute(parentIterator, partition.index, context)
+ runner.compute(parentIterator, partition.index)
}
}
diff --git a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
index 07d1fa2c4a..24ad689f83 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
@@ -38,7 +38,9 @@ private[spark] class RRunner[U](
serializer: String,
packageNames: Array[Byte],
broadcastVars: Array[Broadcast[Object]],
- numPartitions: Int = -1)
+ numPartitions: Int = -1,
+ isDataFrame: Boolean = false,
+ colNames: Array[String] = null)
extends Logging {
private var bootTime: Double = _
private var dataStream: DataInputStream = _
@@ -53,8 +55,7 @@ private[spark] class RRunner[U](
def compute(
inputIterator: Iterator[_],
- partitionIndex: Int,
- context: TaskContext): Iterator[U] = {
+ partitionIndex: Int): Iterator[U] = {
// Timing start
bootTime = System.currentTimeMillis / 1000.0
@@ -148,6 +149,12 @@ private[spark] class RRunner[U](
dataOut.writeInt(numPartitions)
+ dataOut.writeInt(if (isDataFrame) 1 else 0)
+
+ if (isDataFrame) {
+ SerDe.writeObject(dataOut, colNames)
+ }
+
if (!iter.hasNext) {
dataOut.writeInt(0)
} else {
diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index 8e4e80a24a..e4932a4192 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -459,7 +459,7 @@ private[spark] object SerDe {
}
-private[r] object SerializationFormats {
+private[spark] object SerializationFormats {
val BYTE = "byte"
val STRING = "string"
val ROW = "row"