aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@eecs.berkeley.edu>2012-12-29 16:01:03 -0800
committerJosh Rosen <joshrosen@eecs.berkeley.edu>2012-12-29 16:01:03 -0800
commit59195c68ec37acf20d527189ed757397b273a207 (patch)
tree33d583fe91325fbc2bb94b27ba4ef3503e7fdda3 /core
parentc5cee53f2092ee2825095a1831ca47f1c41afc2f (diff)
downloadspark-59195c68ec37acf20d527189ed757397b273a207.tar.gz
spark-59195c68ec37acf20d527189ed757397b273a207.tar.bz2
spark-59195c68ec37acf20d527189ed757397b273a207.zip
Update PySpark for compatibility with TaskContext.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/api/python/PythonRDD.scala13
1 files changed, 5 insertions, 8 deletions
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index f76616a4c4..dc48378fdc 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -8,10 +8,7 @@ import scala.io.Source
import spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
import spark.broadcast.Broadcast
-import spark.SparkEnv
-import spark.Split
-import spark.RDD
-import spark.OneToOneDependency
+import spark._
import spark.rdd.PipedRDD
@@ -34,7 +31,7 @@ private[spark] class PythonRDD[T: ClassManifest](
override val partitioner = if (preservePartitoning) parent.partitioner else None
- override def compute(split: Split): Iterator[Array[Byte]] = {
+ override def compute(split: Split, context: TaskContext): Iterator[Array[Byte]] = {
val SPARK_HOME = new ProcessBuilder().environment().get("SPARK_HOME")
val pb = new ProcessBuilder(Seq(pythonExec, SPARK_HOME + "/pyspark/pyspark/worker.py"))
@@ -74,7 +71,7 @@ private[spark] class PythonRDD[T: ClassManifest](
out.println(elem)
}
out.flush()
- for (elem <- parent.iterator(split)) {
+ for (elem <- parent.iterator(split, context)) {
PythonRDD.writeAsPickle(elem, dOut)
}
dOut.flush()
@@ -123,8 +120,8 @@ private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
RDD[(Array[Byte], Array[Byte])](prev.context) {
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
- override def compute(split: Split) =
- prev.iterator(split).grouped(2).map {
+ override def compute(split: Split, context: TaskContext) =
+ prev.iterator(split, context).grouped(2).map {
case Seq(a, b) => (a, b)
case x => throw new Exception("PairwiseRDD: unexpected value: " + x)
}