aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/api/python/PythonRDD.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/spark/api/python/PythonRDD.scala')
-rw-r--r--core/src/main/scala/spark/api/python/PythonRDD.scala5
1 files changed, 5 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index 79d824d494..f431ef28d3 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -65,6 +65,9 @@ private[spark] class PythonRDD[T: ClassManifest](
SparkEnv.set(env)
val out = new PrintWriter(proc.getOutputStream)
val dOut = new DataOutputStream(proc.getOutputStream)
+ // Split index
+ dOut.writeInt(split.index)
+ // Broadcast variables
dOut.writeInt(broadcastVars.length)
for (broadcast <- broadcastVars) {
dOut.writeLong(broadcast.id)
@@ -72,10 +75,12 @@ private[spark] class PythonRDD[T: ClassManifest](
dOut.write(broadcast.value)
dOut.flush()
}
+ // Serialized user code
for (elem <- command) {
out.println(elem)
}
out.flush()
+ // Data values
for (elem <- parent.iterator(split, context)) {
PythonRDD.writeAsPickle(elem, dOut)
}