diff options
author | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2012-12-28 09:06:11 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2012-12-28 09:06:11 -0800 |
commit | fbadb1cda504b256e3d12c4ce389e723b6f2503c (patch) | |
tree | c235173f12e4edd97940681a52f099b6bf6570e8 /core | |
parent | 665466dfff4f89196627a0777eabd3d3894cd296 (diff) | |
download | spark-fbadb1cda504b256e3d12c4ce389e723b6f2503c.tar.gz spark-fbadb1cda504b256e3d12c4ce389e723b6f2503c.tar.bz2 spark-fbadb1cda504b256e3d12c4ce389e723b6f2503c.zip |
Mark api.python classes as private; echo Java output to stderr.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/api/python/PythonPartitioner.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/api/python/PythonRDD.scala | 50 |
2 files changed, 22 insertions, 30 deletions
diff --git a/core/src/main/scala/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/spark/api/python/PythonPartitioner.scala index 606a80d1eb..2c829508e5 100644 --- a/core/src/main/scala/spark/api/python/PythonPartitioner.scala +++ b/core/src/main/scala/spark/api/python/PythonPartitioner.scala @@ -7,7 +7,7 @@ import java.util.Arrays /** * A [[spark.Partitioner]] that performs handling of byte arrays, for use by the Python API. */ -class PythonPartitioner(override val numPartitions: Int) extends Partitioner { +private[spark] class PythonPartitioner(override val numPartitions: Int) extends Partitioner { override def getPartition(key: Any): Int = { if (key == null) { diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala index 4f870e837a..a80a8eea45 100644 --- a/core/src/main/scala/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/spark/api/python/PythonRDD.scala @@ -3,7 +3,6 @@ package spark.api.python import java.io._ import java.util.{List => JList} -import scala.collection.Map import scala.collection.JavaConversions._ import scala.io.Source @@ -16,10 +15,26 @@ import spark.OneToOneDependency import spark.rdd.PipedRDD -trait PythonRDDBase { - def compute[T](split: Split, envVars: Map[String, String], - command: Seq[String], parent: RDD[T], pythonExec: String, - broadcastVars: java.util.List[Broadcast[Array[Byte]]]): Iterator[Array[Byte]] = { +private[spark] class PythonRDD[T: ClassManifest]( + parent: RDD[T], command: Seq[String], envVars: java.util.Map[String, String], + preservePartitoning: Boolean, pythonExec: String, broadcastVars: java.util.List[Broadcast[Array[Byte]]]) + extends RDD[Array[Byte]](parent.context) { + + // Similar to Runtime.exec(), if we are given a single string, split it into words + // using a standard StringTokenizer (i.e.
by spaces) + def this(parent: RDD[T], command: String, envVars: java.util.Map[String, String], + preservePartitoning: Boolean, pythonExec: String, + broadcastVars: java.util.List[Broadcast[Array[Byte]]]) = + this(parent, PipedRDD.tokenize(command), envVars, preservePartitoning, pythonExec, + broadcastVars) + + override def splits = parent.splits + + override val dependencies = List(new OneToOneDependency(parent)) + + override val partitioner = if (preservePartitoning) parent.partitioner else None + + override def compute(split: Split): Iterator[Array[Byte]] = { val SPARK_HOME = new ProcessBuilder().environment().get("SPARK_HOME") val pb = new ProcessBuilder(Seq(pythonExec, SPARK_HOME + "/pyspark/pyspark/worker.py")) @@ -100,29 +115,6 @@ trait PythonRDDBase { def hasNext = _nextObj.length != 0 } } -} - -class PythonRDD[T: ClassManifest]( - parent: RDD[T], command: Seq[String], envVars: java.util.Map[String, String], - preservePartitoning: Boolean, pythonExec: String, broadcastVars: java.util.List[Broadcast[Array[Byte]]]) - extends RDD[Array[Byte]](parent.context) with PythonRDDBase { - - // Similar to Runtime.exec(), if we are given a single string, split it into words - // using a standard StringTokenizer (i.e. 
by spaces) - def this(parent: RDD[T], command: String, envVars: java.util.Map[String, String], - preservePartitoning: Boolean, pythonExec: String, - broadcastVars: java.util.List[Broadcast[Array[Byte]]]) = - this(parent, PipedRDD.tokenize(command), envVars, preservePartitoning, pythonExec, - broadcastVars) - - override def splits = parent.splits - - override val dependencies = List(new OneToOneDependency(parent)) - - override val partitioner = if (preservePartitoning) parent.partitioner else None - - override def compute(split: Split): Iterator[Array[Byte]] = - compute(split, envVars.toMap, command, parent, pythonExec, broadcastVars) val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this) } @@ -139,7 +131,7 @@ private class PairwiseRDD(prev: RDD[Array[Byte]]) extends val asJavaPairRDD : JavaPairRDD[Array[Byte], Array[Byte]] = JavaPairRDD.fromRDD(this) } -object PythonRDD { +private[spark] object PythonRDD { /** Strips the pickle PROTO and STOP opcodes from the start and end of a pickle */ def stripPickle(arr: Array[Byte]) : Array[Byte] = { |