diff options
author | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2012-12-29 16:01:03 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2012-12-29 16:01:03 -0800 |
commit | 59195c68ec37acf20d527189ed757397b273a207 (patch) | |
tree | 33d583fe91325fbc2bb94b27ba4ef3503e7fdda3 /core | |
parent | c5cee53f2092ee2825095a1831ca47f1c41afc2f (diff) | |
download | spark-59195c68ec37acf20d527189ed757397b273a207.tar.gz spark-59195c68ec37acf20d527189ed757397b273a207.tar.bz2 spark-59195c68ec37acf20d527189ed757397b273a207.zip |
Update PySpark for compatibility with TaskContext.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/api/python/PythonRDD.scala | 13 |
1 files changed, 5 insertions, 8 deletions
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala index f76616a4c4..dc48378fdc 100644 --- a/core/src/main/scala/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/spark/api/python/PythonRDD.scala @@ -8,10 +8,7 @@ import scala.io.Source import spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD} import spark.broadcast.Broadcast -import spark.SparkEnv -import spark.Split -import spark.RDD -import spark.OneToOneDependency +import spark._ import spark.rdd.PipedRDD @@ -34,7 +31,7 @@ private[spark] class PythonRDD[T: ClassManifest]( override val partitioner = if (preservePartitoning) parent.partitioner else None - override def compute(split: Split): Iterator[Array[Byte]] = { + override def compute(split: Split, context: TaskContext): Iterator[Array[Byte]] = { val SPARK_HOME = new ProcessBuilder().environment().get("SPARK_HOME") val pb = new ProcessBuilder(Seq(pythonExec, SPARK_HOME + "/pyspark/pyspark/worker.py")) @@ -74,7 +71,7 @@ private[spark] class PythonRDD[T: ClassManifest]( out.println(elem) } out.flush() - for (elem <- parent.iterator(split)) { + for (elem <- parent.iterator(split, context)) { PythonRDD.writeAsPickle(elem, dOut) } dOut.flush() @@ -123,8 +120,8 @@ private class PairwiseRDD(prev: RDD[Array[Byte]]) extends RDD[(Array[Byte], Array[Byte])](prev.context) { override def splits = prev.splits override val dependencies = List(new OneToOneDependency(prev)) - override def compute(split: Split) = - prev.iterator(split).grouped(2).map { + override def compute(split: Split, context: TaskContext) = + prev.iterator(split, context).grouped(2).map { case Seq(a, b) => (a, b) case x => throw new Exception("PairwiseRDD: unexpected value: " + x) } |