aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@eecs.berkeley.edu>2012-08-25 16:46:07 -0700
committerJosh Rosen <joshrosen@eecs.berkeley.edu>2012-08-27 00:24:39 -0700
commit200d248dcc5903295296bf897211cf543b37f8c1 (patch)
tree46df15fbccf99489a1f7f240c71cc56ef083d6d8 /core
parent6904cb77d4306a14891cc71338c8f9f966d009f1 (diff)
downloadspark-200d248dcc5903295296bf897211cf543b37f8c1.tar.gz
spark-200d248dcc5903295296bf897211cf543b37f8c1.tar.bz2
spark-200d248dcc5903295296bf897211cf543b37f8c1.zip
Simplify Python worker; pipeline the map step of partitionBy().
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/api/python/PythonRDD.scala34
1 files changed, 7 insertions, 27 deletions
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index 5163812df4..b9091fd436 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -151,38 +151,18 @@ class PythonRDD[T: ClassManifest](
val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
}
-class PythonPairRDD[T: ClassManifest] (
- parent: RDD[T], command: Seq[String], envVars: Map[String, String],
- preservePartitoning: Boolean, pythonExec: String, broadcastVars: java.util.List[Broadcast[Array[Byte]]])
- extends RDD[(Array[Byte], Array[Byte])](parent.context) with PythonRDDBase {
-
- def this(parent: RDD[T], command: Seq[String], preservePartitoning: Boolean,
- pythonExec: String, broadcastVars: java.util.List[Broadcast[Array[Byte]]]) =
- this(parent, command, Map(), preservePartitoning, pythonExec, broadcastVars)
-
- // Similar to Runtime.exec(), if we are given a single string, split it into words
- // using a standard StringTokenizer (i.e. by spaces)
- def this(parent: RDD[T], command: String, preservePartitoning: Boolean, pythonExec: String,
- broadcastVars: java.util.List[Broadcast[Array[Byte]]]) =
- this(parent, PipedRDD.tokenize(command), preservePartitoning, pythonExec, broadcastVars)
-
- override def splits = parent.splits
-
- override val dependencies = List(new OneToOneDependency(parent))
-
- override val partitioner = if (preservePartitoning) parent.partitioner else None
-
- override def compute(split: Split): Iterator[(Array[Byte], Array[Byte])] = {
- compute(split, envVars, command, parent, pythonExec, broadcastVars).grouped(2).map {
+private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
+ RDD[(Array[Byte], Array[Byte])](prev.context) {
+ override def splits = prev.splits
+ override val dependencies = List(new OneToOneDependency(prev))
+ override def compute(split: Split) =
+ prev.iterator(split).grouped(2).map {
case Seq(a, b) => (a, b)
- case x => throw new Exception("PythonPairRDD: unexpected value: " + x)
+ case x => throw new Exception("PairwiseRDD: unexpected value: " + x)
}
- }
-
val asJavaPairRDD : JavaPairRDD[Array[Byte], Array[Byte]] = JavaPairRDD.fromRDD(this)
}
-
object PythonRDD {
/** Strips the pickle PROTO and STOP opcodes from the start and end of a pickle */