aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@eecs.berkeley.edu>2013-01-08 16:04:41 -0800
committerJosh Rosen <joshrosen@eecs.berkeley.edu>2013-01-08 16:05:02 -0800
commitb57dd0f16024a82dfc223e69528b9908b931f068 (patch)
tree8ad1222593da58eaeb7746aecaef2c41c5313f71 /core
parent33beba39656fc64984db09a82fc69ca4edcc02d4 (diff)
downloadspark-b57dd0f16024a82dfc223e69528b9908b931f068.tar.gz
spark-b57dd0f16024a82dfc223e69528b9908b931f068.tar.bz2
spark-b57dd0f16024a82dfc223e69528b9908b931f068.zip
Add mapPartitionsWithSplit() to PySpark.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/api/python/PythonRDD.scala5
1 files changed, 5 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index 79d824d494..f431ef28d3 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -65,6 +65,9 @@ private[spark] class PythonRDD[T: ClassManifest](
SparkEnv.set(env)
val out = new PrintWriter(proc.getOutputStream)
val dOut = new DataOutputStream(proc.getOutputStream)
+ // Split index
+ dOut.writeInt(split.index)
+ // Broadcast variables
dOut.writeInt(broadcastVars.length)
for (broadcast <- broadcastVars) {
dOut.writeLong(broadcast.id)
@@ -72,10 +75,12 @@ private[spark] class PythonRDD[T: ClassManifest](
dOut.write(broadcast.value)
dOut.flush()
}
+ // Serialized user code
for (elem <- command) {
out.println(elem)
}
out.flush()
+ // Data values
for (elem <- parent.iterator(split, context)) {
PythonRDD.writeAsPickle(elem, dOut)
}