author     Josh Rosen <joshrosen@eecs.berkeley.edu>  2012-08-25 13:59:01 -0700
committer  Josh Rosen <joshrosen@eecs.berkeley.edu>  2012-08-27 00:16:47 -0700
commit     f79a1e4d2a8643157136de69b8d7de84f0034712 (patch)
tree       679f4453a382e121ffd4fc0c9f3a77a4b292d14c /core
parent     65e8406029a0fe1e1c5c5d033d335b43f6743a04 (diff)
download   spark-f79a1e4d2a8643157136de69b8d7de84f0034712.tar.gz
           spark-f79a1e4d2a8643157136de69b8d7de84f0034712.tar.bz2
           spark-f79a1e4d2a8643157136de69b8d7de84f0034712.zip
Add broadcast variables to Python API.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/api/python/PythonRDD.scala  43
1 file changed, 26 insertions(+), 17 deletions(-)
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index 93847e2f14..5163812df4 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -7,14 +7,13 @@ import scala.collection.JavaConversions._
import scala.io.Source
import spark._
import api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
-import scala.{collection, Some}
-import collection.parallel.mutable
+import broadcast.Broadcast
import scala.collection
-import scala.Some
trait PythonRDDBase {
def compute[T](split: Split, envVars: Map[String, String],
- command: Seq[String], parent: RDD[T], pythonExec: String): Iterator[Array[Byte]] = {
+ command: Seq[String], parent: RDD[T], pythonExec: String,
+ broadcastVars: java.util.List[Broadcast[Array[Byte]]]): Iterator[Array[Byte]] = {
val SPARK_HOME = new ProcessBuilder().environment().get("SPARK_HOME")
val pb = new ProcessBuilder(Seq(pythonExec, SPARK_HOME + "/pyspark/pyspark/worker.py"))
@@ -42,11 +41,18 @@ trait PythonRDDBase {
override def run() {
SparkEnv.set(env)
val out = new PrintWriter(proc.getOutputStream)
+ val dOut = new DataOutputStream(proc.getOutputStream)
+ out.println(broadcastVars.length)
+ for (broadcast <- broadcastVars) {
+ out.print(broadcast.uuid.toString)
+ dOut.writeInt(broadcast.value.length)
+ dOut.write(broadcast.value)
+ dOut.flush()
+ }
for (elem <- command) {
out.println(elem)
}
out.flush()
- val dOut = new DataOutputStream(proc.getOutputStream)
for (elem <- parent.iterator(split)) {
if (elem.isInstanceOf[Array[Byte]]) {
val arr = elem.asInstanceOf[Array[Byte]]
@@ -121,16 +127,17 @@ trait PythonRDDBase {
class PythonRDD[T: ClassManifest](
parent: RDD[T], command: Seq[String], envVars: Map[String, String],
- preservePartitoning: Boolean, pythonExec: String)
+ preservePartitoning: Boolean, pythonExec: String, broadcastVars: java.util.List[Broadcast[Array[Byte]]])
extends RDD[Array[Byte]](parent.context) with PythonRDDBase {
- def this(parent: RDD[T], command: Seq[String], preservePartitoning: Boolean, pythonExec: String) =
- this(parent, command, Map(), preservePartitoning, pythonExec)
+ def this(parent: RDD[T], command: Seq[String], preservePartitoning: Boolean,
+ pythonExec: String, broadcastVars: java.util.List[Broadcast[Array[Byte]]]) =
+ this(parent, command, Map(), preservePartitoning, pythonExec, broadcastVars)
// Similar to Runtime.exec(), if we are given a single string, split it into words
// using a standard StringTokenizer (i.e. by spaces)
- def this(parent: RDD[T], command: String, preservePartitoning: Boolean, pythonExec: String) =
- this(parent, PipedRDD.tokenize(command), preservePartitoning, pythonExec)
+ def this(parent: RDD[T], command: String, preservePartitoning: Boolean, pythonExec: String, broadcastVars: java.util.List[Broadcast[Array[Byte]]]) =
+ this(parent, PipedRDD.tokenize(command), preservePartitoning, pythonExec, broadcastVars)
override def splits = parent.splits
@@ -139,23 +146,25 @@ class PythonRDD[T: ClassManifest](
override val partitioner = if (preservePartitoning) parent.partitioner else None
override def compute(split: Split): Iterator[Array[Byte]] =
- compute(split, envVars, command, parent, pythonExec)
+ compute(split, envVars, command, parent, pythonExec, broadcastVars)
val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
}
class PythonPairRDD[T: ClassManifest] (
parent: RDD[T], command: Seq[String], envVars: Map[String, String],
- preservePartitoning: Boolean, pythonExec: String)
+ preservePartitoning: Boolean, pythonExec: String, broadcastVars: java.util.List[Broadcast[Array[Byte]]])
extends RDD[(Array[Byte], Array[Byte])](parent.context) with PythonRDDBase {
- def this(parent: RDD[T], command: Seq[String], preservePartitoning: Boolean, pythonExec: String) =
- this(parent, command, Map(), preservePartitoning, pythonExec)
+ def this(parent: RDD[T], command: Seq[String], preservePartitoning: Boolean,
+ pythonExec: String, broadcastVars: java.util.List[Broadcast[Array[Byte]]]) =
+ this(parent, command, Map(), preservePartitoning, pythonExec, broadcastVars)
// Similar to Runtime.exec(), if we are given a single string, split it into words
// using a standard StringTokenizer (i.e. by spaces)
- def this(parent: RDD[T], command: String, preservePartitoning: Boolean, pythonExec: String) =
- this(parent, PipedRDD.tokenize(command), preservePartitoning, pythonExec)
+ def this(parent: RDD[T], command: String, preservePartitoning: Boolean, pythonExec: String,
+ broadcastVars: java.util.List[Broadcast[Array[Byte]]]) =
+ this(parent, PipedRDD.tokenize(command), preservePartitoning, pythonExec, broadcastVars)
override def splits = parent.splits
@@ -164,7 +173,7 @@ class PythonPairRDD[T: ClassManifest] (
override val partitioner = if (preservePartitoning) parent.partitioner else None
override def compute(split: Split): Iterator[(Array[Byte], Array[Byte])] = {
- compute(split, envVars, command, parent, pythonExec).grouped(2).map {
+ compute(split, envVars, command, parent, pythonExec, broadcastVars).grouped(2).map {
case Seq(a, b) => (a, b)
case x => throw new Exception("PythonPairRDD: unexpected value: " + x)
}
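
For context on the handshake this patch sets up: before sending the serialized command and the partition's elements, the Scala side now writes a broadcast preamble to the Python worker's stdin. The framing visible in the diff is a text line with the number of broadcast variables, then for each one its UUID printed as text, a 4-byte length written via DataOutputStream, and the raw bytes of the broadcast value. The sketch below is a hypothetical Scala decoder for that framing, written only to illustrate the layout; the actual consumer is the Python worker script (pyspark/pyspark/worker.py, launched by the ProcessBuilder above), whose changes are outside the core/ diffstat shown here. The object name BroadcastPreambleReader and its API are illustrative, not part of this commit.

import java.io.{DataInputStream, InputStream}

// Hypothetical helper, not part of this commit: decodes the broadcast
// preamble that PythonRDDBase.compute() writes to the worker's stdin.
// Framing (per the diff above): a text line with the broadcast count,
// then for each broadcast a 36-character UUID string, a 4-byte
// big-endian length, and that many raw payload bytes.
object BroadcastPreambleReader {
  def read(in: InputStream): Seq[(String, Array[Byte])] = {
    val dIn = new DataInputStream(in)
    // Read the count line terminated by '\n' (written with out.println).
    val countLine = new StringBuilder
    var c = dIn.read()
    while (c != -1 && c != '\n') { countLine.append(c.toChar); c = dIn.read() }
    val count = countLine.toString.trim.toInt
    (1 to count).map { _ =>
      // UUID.toString is always 36 characters and is written with out.print
      // (no trailing newline), so read exactly that many bytes.
      val uuidBytes = new Array[Byte](36)
      dIn.readFully(uuidBytes)
      val uuid = new String(uuidBytes, "UTF-8")
      // Length-prefixed payload written with dOut.writeInt / dOut.write.
      val length = dIn.readInt()
      val value = new Array[Byte](length)
      dIn.readFully(value)
      (uuid, value)
    }
  }
}

Mixing a PrintWriter and a DataOutputStream over the same underlying stream, as the patch does, keeps the textual parts of the protocol (count, UUID, command lines) simple to read while the variable-length broadcast payload stays binary; the sketch above merely mirrors that split on the reading side.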