diff options
author | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2012-08-25 16:46:07 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2012-08-27 00:24:39 -0700 |
commit | 200d248dcc5903295296bf897211cf543b37f8c1 (patch) | |
tree | 46df15fbccf99489a1f7f240c71cc56ef083d6d8 /core | |
parent | 6904cb77d4306a14891cc71338c8f9f966d009f1 (diff) | |
download | spark-200d248dcc5903295296bf897211cf543b37f8c1.tar.gz spark-200d248dcc5903295296bf897211cf543b37f8c1.tar.bz2 spark-200d248dcc5903295296bf897211cf543b37f8c1.zip |
Simplify Python worker; pipeline the map step of partitionBy().
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/api/python/PythonRDD.scala | 34 |
1 file changed, 7 insertions, 27 deletions
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala index 5163812df4..b9091fd436 100644 --- a/core/src/main/scala/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/spark/api/python/PythonRDD.scala @@ -151,38 +151,18 @@ class PythonRDD[T: ClassManifest]( val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this) } -class PythonPairRDD[T: ClassManifest] ( - parent: RDD[T], command: Seq[String], envVars: Map[String, String], - preservePartitoning: Boolean, pythonExec: String, broadcastVars: java.util.List[Broadcast[Array[Byte]]]) - extends RDD[(Array[Byte], Array[Byte])](parent.context) with PythonRDDBase { - - def this(parent: RDD[T], command: Seq[String], preservePartitoning: Boolean, - pythonExec: String, broadcastVars: java.util.List[Broadcast[Array[Byte]]]) = - this(parent, command, Map(), preservePartitoning, pythonExec, broadcastVars) - - // Similar to Runtime.exec(), if we are given a single string, split it into words - // using a standard StringTokenizer (i.e. 
by spaces) - def this(parent: RDD[T], command: String, preservePartitoning: Boolean, pythonExec: String, - broadcastVars: java.util.List[Broadcast[Array[Byte]]]) = - this(parent, PipedRDD.tokenize(command), preservePartitoning, pythonExec, broadcastVars) - - override def splits = parent.splits - - override val dependencies = List(new OneToOneDependency(parent)) - - override val partitioner = if (preservePartitoning) parent.partitioner else None - - override def compute(split: Split): Iterator[(Array[Byte], Array[Byte])] = { - compute(split, envVars, command, parent, pythonExec, broadcastVars).grouped(2).map { +private class PairwiseRDD(prev: RDD[Array[Byte]]) extends + RDD[(Array[Byte], Array[Byte])](prev.context) { + override def splits = prev.splits + override val dependencies = List(new OneToOneDependency(prev)) + override def compute(split: Split) = + prev.iterator(split).grouped(2).map { case Seq(a, b) => (a, b) - case x => throw new Exception("PythonPairRDD: unexpected value: " + x) + case x => throw new Exception("PairwiseRDD: unexpected value: " + x) } - } - val asJavaPairRDD : JavaPairRDD[Array[Byte], Array[Byte]] = JavaPairRDD.fromRDD(this) } - object PythonRDD { /** Strips the pickle PROTO and STOP opcodes from the start and end of a pickle */ |