path: root/core
author    Josh Rosen <joshrosen@eecs.berkeley.edu>  2012-12-28 09:06:11 -0800
committer Josh Rosen <joshrosen@eecs.berkeley.edu>  2012-12-28 09:06:11 -0800
commit    fbadb1cda504b256e3d12c4ce389e723b6f2503c
tree      c235173f12e4edd97940681a52f099b6bf6570e8 /core
parent    665466dfff4f89196627a0777eabd3d3894cd296
Mark api.python classes as private; echo Java output to stderr.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/api/python/PythonPartitioner.scala |  2
-rw-r--r--  core/src/main/scala/spark/api/python/PythonRDD.scala         | 50
2 files changed, 22 insertions(+), 30 deletions(-)
diff --git a/core/src/main/scala/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/spark/api/python/PythonPartitioner.scala
index 606a80d1eb..2c829508e5 100644
--- a/core/src/main/scala/spark/api/python/PythonPartitioner.scala
+++ b/core/src/main/scala/spark/api/python/PythonPartitioner.scala
@@ -7,7 +7,7 @@ import java.util.Arrays
 
 /**
  * A [[spark.Partitioner]] that performs handling of byte arrays, for use by the Python API.
  */
-class PythonPartitioner(override val numPartitions: Int) extends Partitioner {
+private[spark] class PythonPartitioner(override val numPartitions: Int) extends Partitioner {
   override def getPartition(key: Any): Int = {
     if (key == null) {
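The one-line change above is the first half of the commit message. As a minimal, self-contained sketch of what Scala's package-qualified private modifier does here (the Example and Caller names are hypothetical, not from this commit): the class stays visible to everything under the spark package, but user code outside it can no longer touch it.

// Sketch of package-qualified visibility; Example and Caller are hypothetical.
package spark.api.python {
  // Accessible from any code inside the `spark` package tree,
  // invisible outside it (e.g. to user applications).
  private[spark] class Example {
    def tag: String = "internal"
  }
}

package spark {
  object Caller {
    def use: String = new api.python.Example().tag // compiles: inside `spark`
  }
}
// Referencing spark.api.python.Example from a package outside `spark`
// is a compile error.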
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index 4f870e837a..a80a8eea45 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -3,7 +3,6 @@ package spark.api.python
 import java.io._
 import java.util.{List => JList}
 
-import scala.collection.Map
 import scala.collection.JavaConversions._
 import scala.io.Source
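The import removed above pairs with the refactor in the next hunk: with PythonRDDBase gone, envVars stays a java.util.Map end to end, and the remaining JavaConversions import supplies the implicit Java-to-Scala collection views. A rough sketch of what that import enables (hypothetical demo object, assuming the Scala 2.9-era JavaConversions API):

import scala.collection.JavaConversions._

// Hypothetical demo: JavaConversions lets a java.util.Map be iterated
// with Scala syntax, with no explicit conversion call.
object ConversionsDemo {
  def main(args: Array[String]): Unit = {
    val env = new java.util.HashMap[String, String]()
    env.put("SPARK_HOME", "/opt/spark")
    for ((key, value) <- env) // implicit view: java.util.Map -> mutable.Map
      println(key + "=" + value)
  }
}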
@@ -16,10 +15,26 @@ import spark.OneToOneDependency
 import spark.rdd.PipedRDD
 
 
-trait PythonRDDBase {
-  def compute[T](split: Split, envVars: Map[String, String],
-    command: Seq[String], parent: RDD[T], pythonExec: String,
-    broadcastVars: java.util.List[Broadcast[Array[Byte]]]): Iterator[Array[Byte]] = {
+private[spark] class PythonRDD[T: ClassManifest](
+  parent: RDD[T], command: Seq[String], envVars: java.util.Map[String, String],
+  preservePartitoning: Boolean, pythonExec: String, broadcastVars: java.util.List[Broadcast[Array[Byte]]])
+  extends RDD[Array[Byte]](parent.context) {
+
+  // Similar to Runtime.exec(), if we are given a single string, split it into words
+  // using a standard StringTokenizer (i.e. by spaces)
+  def this(parent: RDD[T], command: String, envVars: java.util.Map[String, String],
+    preservePartitoning: Boolean, pythonExec: String,
+    broadcastVars: java.util.List[Broadcast[Array[Byte]]]) =
+    this(parent, PipedRDD.tokenize(command), envVars, preservePartitoning, pythonExec,
+      broadcastVars)
+
+  override def splits = parent.splits
+
+  override val dependencies = List(new OneToOneDependency(parent))
+
+  override val partitioner = if (preservePartitoning) parent.partitioner else None
+
+  override def compute(split: Split): Iterator[Array[Byte]] = {
     val SPARK_HOME = new ProcessBuilder().environment().get("SPARK_HOME")
     val pb = new ProcessBuilder(Seq(pythonExec, SPARK_HOME + "/pyspark/pyspark/worker.py"))
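The second half of the commit title ("echo Java output to stderr") isn't visible in this view, which is limited to 'core'. As a hedged sketch of the general technique only (the EchoStderr object and its echo method are hypothetical, not this commit's code), a background daemon thread can pump one stream of a child process through to the parent JVM's stderr so the worker's errors surface in the driver's logs:

import java.io.{BufferedReader, InputStreamReader}

object EchoStderr {
  // Hypothetical helper: forward a child process's stderr to this
  // JVM's stderr, line by line, on a background daemon thread.
  def echo(proc: Process): Unit = {
    val pump = new Thread {
      override def run(): Unit = {
        val in = new BufferedReader(new InputStreamReader(proc.getErrorStream))
        var line = in.readLine()
        while (line != null) {
          System.err.println(line) // echo each line as it arrives
          line = in.readLine()
        }
      }
    }
    pump.setDaemon(true) // don't keep the JVM alive just to pump output
    pump.start()
  }
}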
@@ -100,29 +115,6 @@ trait PythonRDDBase {
       def hasNext = _nextObj.length != 0
     }
   }
-}
-
-class PythonRDD[T: ClassManifest](
-  parent: RDD[T], command: Seq[String], envVars: java.util.Map[String, String],
-  preservePartitoning: Boolean, pythonExec: String, broadcastVars: java.util.List[Broadcast[Array[Byte]]])
-  extends RDD[Array[Byte]](parent.context) with PythonRDDBase {
-
-  // Similar to Runtime.exec(), if we are given a single string, split it into words
-  // using a standard StringTokenizer (i.e. by spaces)
-  def this(parent: RDD[T], command: String, envVars: java.util.Map[String, String],
-    preservePartitoning: Boolean, pythonExec: String,
-    broadcastVars: java.util.List[Broadcast[Array[Byte]]]) =
-    this(parent, PipedRDD.tokenize(command), envVars, preservePartitoning, pythonExec,
-      broadcastVars)
-
-  override def splits = parent.splits
-
-  override val dependencies = List(new OneToOneDependency(parent))
-
-  override val partitioner = if (preservePartitoning) parent.partitioner else None
-
-  override def compute(split: Split): Iterator[Array[Byte]] =
-    compute(split, envVars.toMap, command, parent, pythonExec, broadcastVars)
 
   val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
 }
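Taken together, the deletion above and the additions in the earlier hunk are one refactor: PythonRDDBase had exactly one implementor, so its compute helper is folded into PythonRDD and the parameters that were threaded through every call become plain constructor fields. A minimal sketch of the pattern, with hypothetical names:

// Before: a trait with a single implementor; the class passes its own
// fields back into the helper as arguments.
trait WorkerBase {
  def work(cmd: Seq[String], exec: String): String =
    exec + " " + cmd.mkString(" ")
}
class Worker(cmd: Seq[String], exec: String) extends WorkerBase {
  def run(): String = work(cmd, exec)
}

// After: the trait is deleted and the method reads the fields directly.
class WorkerAfter(cmd: Seq[String], exec: String) {
  def run(): String = exec + " " + cmd.mkString(" ")
}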
@@ -139,7 +131,7 @@ private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
   val asJavaPairRDD : JavaPairRDD[Array[Byte], Array[Byte]] = JavaPairRDD.fromRDD(this)
 }
 
-object PythonRDD {
+private[spark] object PythonRDD {
   /** Strips the pickle PROTO and STOP opcodes from the start and end of a pickle */
   def stripPickle(arr: Array[Byte]) : Array[Byte] = {
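The body of stripPickle is cut off here, but its doc comment pins down the contract: a protocol-2 pickle starts with the two-byte PROTO opcode (0x80 plus a version byte) and ends with the one-byte STOP opcode ('.'), and removing that framing lets several pickled payloads be spliced into a single stream. A sketch consistent with the comment, offered as an assumption since the real body isn't shown:

// Assumed implementation matching the doc comment: drop the 2-byte
// PROTO header and the trailing 1-byte STOP ('.') opcode.
def stripPickleSketch(arr: Array[Byte]): Array[Byte] =
  arr.slice(2, arr.length - 1)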