From 170e451fbdd308ae77065bd9c0f2bd278abf0cb7 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Tue, 1 Jan 2013 13:52:14 -0800
Subject: Minor documentation and style fixes for PySpark.

---
 .../scala/spark/api/python/PythonPartitioner.scala |  4 +-
 .../main/scala/spark/api/python/PythonRDD.scala    | 43 +++++++++++++++-------
 2 files changed, 32 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/spark/api/python/PythonPartitioner.scala
index 2c829508e5..648d9402b0 100644
--- a/core/src/main/scala/spark/api/python/PythonPartitioner.scala
+++ b/core/src/main/scala/spark/api/python/PythonPartitioner.scala
@@ -17,9 +17,9 @@ private[spark] class PythonPartitioner(override val numPartitions: Int) extends
       val hashCode = {
         if (key.isInstanceOf[Array[Byte]]) {
           Arrays.hashCode(key.asInstanceOf[Array[Byte]])
-        }
-        else
+        } else {
           key.hashCode()
+        }
       }
       val mod = hashCode % numPartitions
       if (mod < 0) {
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index dc48378fdc..19a039e330 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -13,8 +13,12 @@ import spark.rdd.PipedRDD
 
 
 private[spark] class PythonRDD[T: ClassManifest](
-    parent: RDD[T], command: Seq[String], envVars: java.util.Map[String, String],
-    preservePartitoning: Boolean, pythonExec: String, broadcastVars: java.util.List[Broadcast[Array[Byte]]])
+    parent: RDD[T],
+    command: Seq[String],
+    envVars: java.util.Map[String, String],
+    preservePartitoning: Boolean,
+    pythonExec: String,
+    broadcastVars: java.util.List[Broadcast[Array[Byte]]])
   extends RDD[Array[Byte]](parent.context) {
 
   // Similar to Runtime.exec(), if we are given a single string, split it into words
@@ -38,8 +42,8 @@ private[spark] class PythonRDD[T: ClassManifest](
     // Add the environmental variables to the process.
     val currentEnvVars = pb.environment()
 
-    envVars.foreach {
-      case (variable, value) => currentEnvVars.put(variable, value)
+    for ((variable, value) <- envVars) {
+      currentEnvVars.put(variable, value)
     }
 
     val proc = pb.start()
@@ -116,6 +120,10 @@ private[spark] class PythonRDD[T: ClassManifest](
   val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
 }
 
+/**
+ * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.
+ * This is used by PySpark's shuffle operations.
+ */
 private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
   RDD[(Array[Byte], Array[Byte])](prev.context) {
   override def splits = prev.splits
@@ -139,6 +147,16 @@ private[spark] object PythonRDD {
    * Write strings, pickled Python objects, or pairs of pickled objects to a data output stream.
    * The data format is a 32-bit integer representing the pickled object's length (in bytes),
    * followed by the pickled data.
+   *
+   * Pickle module:
+   *
+   *     http://docs.python.org/2/library/pickle.html
+   *
+   * The pickle protocol is documented in the source of the `pickle` and `pickletools` modules:
+   *
+   *     http://hg.python.org/cpython/file/2.6/Lib/pickle.py
+   *     http://hg.python.org/cpython/file/2.6/Lib/pickletools.py
+   *
    * @param elem the object to write
    * @param dOut a data output stream
    */
@@ -201,15 +219,14 @@ private[spark] object PythonRDD {
 }
 
 private object Pickle {
-  def b(x: Int): Byte = x.asInstanceOf[Byte]
-  val PROTO: Byte = b(0x80)
-  val TWO: Byte = b(0x02)
-  val BINUNICODE : Byte = 'X'
-  val STOP : Byte = '.'
-  val TUPLE2 : Byte = b(0x86)
-  val EMPTY_LIST : Byte = ']'
-  val MARK : Byte = '('
-  val APPENDS : Byte = 'e'
+  val PROTO: Byte = 0x80.toByte
+  val TWO: Byte = 0x02.toByte
+  val BINUNICODE: Byte = 'X'
+  val STOP: Byte = '.'
+  val TUPLE2: Byte = 0x86.toByte
+  val EMPTY_LIST: Byte = ']'
+  val MARK: Byte = '('
+  val APPENDS: Byte = 'e'
 }
 
 private class ExtractValue extends spark.api.java.function.Function[(Array[Byte],
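
Why the PythonPartitioner hunk hashes Array[Byte] specially: JVM arrays inherit
Object's identity-based hashCode, so two byte arrays holding the same serialized
key would almost always hash to different partitions. java.util.Arrays.hashCode
hashes the array's contents instead. A minimal sketch of the difference (a
standalone Scala snippet, not code from this patch):

    import java.util.Arrays

    val a = "key".getBytes("UTF-8")
    val b = "key".getBytes("UTF-8")
    println(a.hashCode == b.hashCode)                  // almost always false: identity hash
    println(Arrays.hashCode(a) == Arrays.hashCode(b))  // true: content-based hash

This is why equal keys produced by different Python worker round-trips still land
in the same partition.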
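
The writeAsPickle doc comment describes the stream framing used between the JVM
and the Python workers: each record is a 32-bit length header followed by that
many bytes of pickled data. A minimal sketch of the write and read halves of such
framing (writeFramed and readFramed are hypothetical helper names, not code from
this patch; DataOutputStream.writeInt writes big-endian):

    import java.io.{DataInputStream, DataOutputStream}

    def writeFramed(payload: Array[Byte], dOut: DataOutputStream) {
      dOut.writeInt(payload.length)  // 32-bit length header
      dOut.write(payload)            // the pickled bytes themselves
    }

    def readFramed(dIn: DataInputStream): Array[Byte] = {
      val buf = new Array[Byte](dIn.readInt())  // read the length header
      dIn.readFully(buf)                        // then exactly that many bytes
      buf
    }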
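
The constants in the Pickle object are raw opcodes from pickle protocol 2, as
documented in the pickletools source linked in the doc comment above. For
example, a protocol-2 pickle of a single string is PROTO TWO, a BINUNICODE
record (the opcode, a 4-byte little-endian UTF-8 byte count, then the UTF-8
bytes), and STOP. A sketch of composing one by hand, assuming the Pickle object
above is in scope (pickleString itself is illustrative, not code from this
patch; Integer.reverseBytes converts writeInt's big-endian output into the
little-endian count BINUNICODE expects):

    import java.io.DataOutputStream

    def pickleString(str: String, dOut: DataOutputStream) {
      val utf8 = str.getBytes("UTF-8")
      dOut.write(Pickle.PROTO)                          // 0x80: protocol marker
      dOut.write(Pickle.TWO)                            // 0x02: protocol version 2
      dOut.write(Pickle.BINUNICODE)                     // 'X': push a unicode string
      dOut.writeInt(Integer.reverseBytes(utf8.length))  // little-endian byte count
      dOut.write(utf8)                                  // UTF-8 payload
      dOut.write(Pickle.STOP)                           // '.': end of the pickle
    }

Passing the resulting bytes to Python's pickle.loads should yield the original
string.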