author     Josh Rosen <joshrosen@eecs.berkeley.edu>  2013-01-01 13:52:14 -0800
committer  Josh Rosen <joshrosen@eecs.berkeley.edu>  2013-01-01 13:52:14 -0800
commit     170e451fbdd308ae77065bd9c0f2bd278abf0cb7 (patch)
tree       da3df59e2262dac4b381227d5bc712502249d746 /core
parent     6f6a6b79c4c3f3555f8ff427c91e714d02afe8fa (diff)
Minor documentation and style fixes for PySpark.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/api/python/PythonPartitioner.scala |  4
-rw-r--r--  core/src/main/scala/spark/api/python/PythonRDD.scala         | 43
2 files changed, 32 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/spark/api/python/PythonPartitioner.scala
index 2c829508e5..648d9402b0 100644
--- a/core/src/main/scala/spark/api/python/PythonPartitioner.scala
+++ b/core/src/main/scala/spark/api/python/PythonPartitioner.scala
@@ -17,9 +17,9 @@ private[spark] class PythonPartitioner(override val numPartitions: Int) extends
       val hashCode = {
         if (key.isInstanceOf[Array[Byte]]) {
           Arrays.hashCode(key.asInstanceOf[Array[Byte]])
-        }
-        else
+        } else {
           key.hashCode()
+        }
       }
       val mod = hashCode % numPartitions
       if (mod < 0) {
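Note on the branch reformatted above: JVM arrays inherit identity-based hashCode from Object, so two byte arrays with identical contents almost always hash differently; that is why the partitioner special-cases Array[Byte] through java.util.Arrays.hashCode. A minimal standalone sketch (object and variable names are illustrative, not from this patch):

    import java.util.Arrays

    object ByteArrayHashing {
      def main(args: Array[String]): Unit = {
        val a = "key".getBytes("UTF-8")
        val b = "key".getBytes("UTF-8")
        // Arrays use Object.hashCode, so equal contents hash differently:
        println(a.hashCode == b.hashCode)                 // almost always false
        // Arrays.hashCode hashes by content, so equal keys land in the same partition:
        println(Arrays.hashCode(a) == Arrays.hashCode(b)) // true
      }
    }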
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index dc48378fdc..19a039e330 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -13,8 +13,12 @@ import spark.rdd.PipedRDD
 
 
 private[spark] class PythonRDD[T: ClassManifest](
-    parent: RDD[T], command: Seq[String], envVars: java.util.Map[String, String],
-    preservePartitoning: Boolean, pythonExec: String, broadcastVars: java.util.List[Broadcast[Array[Byte]]])
+    parent: RDD[T],
+    command: Seq[String],
+    envVars: java.util.Map[String, String],
+    preservePartitoning: Boolean,
+    pythonExec: String,
+    broadcastVars: java.util.List[Broadcast[Array[Byte]]])
   extends RDD[Array[Byte]](parent.context) {
 
   // Similar to Runtime.exec(), if we are given a single string, split it into words
@@ -38,8 +42,8 @@ private[spark] class PythonRDD[T: ClassManifest](
     // Add the environmental variables to the process.
     val currentEnvVars = pb.environment()
 
-    envVars.foreach {
-      case (variable, value) => currentEnvVars.put(variable, value)
+    for ((variable, value) <- envVars) {
+      currentEnvVars.put(variable, value)
     }
 
     val proc = pb.start()
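Note: iterating a java.util.Map with a Scala for-comprehension relies on an implicit Java-to-Scala collection conversion (presumably the file's JavaConversions import, which is outside this hunk). A self-contained sketch of the same copy using the explicit JavaConverters style (names are illustrative):

    import scala.collection.JavaConverters._

    object EnvVarsCopy {
      def main(args: Array[String]): Unit = {
        val envVars = new java.util.HashMap[String, String]()
        envVars.put("PYSPARK_PYTHON", "python")
        // Copy entries into another Java map, as the hunk does with
        // ProcessBuilder.environment(); .asScala adapts the Java map so the
        // for-comprehension can destructure (key, value) pairs.
        val currentEnvVars = new java.util.HashMap[String, String]()
        for ((variable, value) <- envVars.asScala) {
          currentEnvVars.put(variable, value)
        }
        println(currentEnvVars) // {PYSPARK_PYTHON=python}
      }
    }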
@@ -116,6 +120,10 @@ private[spark] class PythonRDD[T: ClassManifest](
   val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
 }
 
+/**
+ * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.
+ * This is used by PySpark's shuffle operations.
+ */
 private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
   RDD[(Array[Byte], Array[Byte])](prev.context) {
   override def splits = prev.splits
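The new doc comment explains that PairwiseRDD turns the flat stream of byte arrays coming back from Python into (key, value) pairs for shuffling. A minimal sketch of one way alternating elements can be paired (an illustration only, not the class's actual compute method):

    object PairingSketch {
      def main(args: Array[String]): Unit = {
        // Stand-in for a flat stream of alternating keys and values.
        val flat = Iterator("k1", "v1", "k2", "v2")
        // Group consecutive elements in twos and turn each group into a tuple.
        val paired = flat.grouped(2).collect { case Seq(k, v) => (k, v) }
        println(paired.toList) // List((k1,v1), (k2,v2))
      }
    }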
@@ -139,6 +147,16 @@ private[spark] object PythonRDD {
    * Write strings, pickled Python objects, or pairs of pickled objects to a data output stream.
    * The data format is a 32-bit integer representing the pickled object's length (in bytes),
    * followed by the pickled data.
+   *
+   * Pickle module:
+   *
+   *   http://docs.python.org/2/library/pickle.html
+   *
+   * The pickle protocol is documented in the source of the `pickle` and `pickletools` modules:
+   *
+   *   http://hg.python.org/cpython/file/2.6/Lib/pickle.py
+   *   http://hg.python.org/cpython/file/2.6/Lib/pickletools.py
+   *
    * @param elem the object to write
    * @param dOut a data output stream
    */
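The doc comment above pins down the wire format: each record is a 32-bit length followed by the pickled bytes. A minimal sketch of that framing (writeFramed is a hypothetical helper, not the patch's writeAsPickle):

    import java.io.{ByteArrayOutputStream, DataOutputStream}

    object Framing {
      // Write one length-prefixed record: a big-endian 32-bit byte count
      // (DataOutputStream.writeInt is big-endian), then the raw payload.
      def writeFramed(payload: Array[Byte], dOut: DataOutputStream): Unit = {
        dOut.writeInt(payload.length)
        dOut.write(payload)
      }

      def main(args: Array[String]): Unit = {
        val buf = new ByteArrayOutputStream()
        val dOut = new DataOutputStream(buf)
        writeFramed("hello".getBytes("UTF-8"), dOut)
        dOut.flush()
        println(buf.size()) // 4 length bytes + 5 payload bytes = 9
      }
    }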
@@ -201,15 +219,14 @@ private[spark] object PythonRDD {
 }
 
 private object Pickle {
-  def b(x: Int): Byte = x.asInstanceOf[Byte]
-  val PROTO: Byte = b(0x80)
-  val TWO: Byte = b(0x02)
-  val BINUNICODE : Byte = 'X'
-  val STOP : Byte = '.'
-  val TUPLE2 : Byte = b(0x86)
-  val EMPTY_LIST : Byte = ']'
-  val MARK : Byte = '('
-  val APPENDS : Byte = 'e'
+  val PROTO: Byte = 0x80.toByte
+  val TWO: Byte = 0x02.toByte
+  val BINUNICODE: Byte = 'X'
+  val STOP: Byte = '.'
+  val TUPLE2: Byte = 0x86.toByte
+  val EMPTY_LIST: Byte = ']'
+  val MARK: Byte = '('
+  val APPENDS: Byte = 'e'
 }
 
 private class ExtractValue extends spark.api.java.function.Function[(Array[Byte],
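For reference, the opcodes in the Pickle object above are enough to hand-assemble a small protocol-2 pickle. A minimal sketch that pickles one unicode string (illustrative only; pickles produced by Python's own pickler also interleave memo opcodes, which loaders do not require):

    import java.io.ByteArrayOutputStream
    import java.nio.{ByteBuffer, ByteOrder}

    object PickleByHand {
      // Opcode values mirror the Pickle object in the diff.
      val PROTO: Byte = 0x80.toByte
      val TWO: Byte = 0x02.toByte
      val BINUNICODE: Byte = 'X'
      val STOP: Byte = '.'

      // PROTO 2, then BINUNICODE (little-endian 4-byte length + UTF-8 bytes),
      // then STOP; pickle.loads() of the result yields u'hello' in Python 2.
      def pickleString(s: String): Array[Byte] = {
        val utf8 = s.getBytes("UTF-8")
        val out = new ByteArrayOutputStream()
        out.write(PROTO)
        out.write(TWO)
        out.write(BINUNICODE)
        out.write(ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(utf8.length).array())
        out.write(utf8)
        out.write(STOP)
        out.toByteArray
      }

      def main(args: Array[String]): Unit = {
        println(pickleString("hello").length) // 3 opcode bytes + 4 + 5 + 1 = 13
      }
    }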