author    Matei Zaharia <matei.zaharia@gmail.com>  2013-08-18 17:02:54 -0700
committer Matei Zaharia <matei.zaharia@gmail.com>  2013-08-18 17:02:54 -0700
commit    8fa0747978e2600705657ba21f8d1ccef37dc722 (patch)
tree      d948f96ec874424542191cb7c5052b2bf34366d4 /core
parent    1e137a5a21b67d0fb74e28e47b8d8b1b4ede931c (diff)
parent    c7e348faec45ad1d996d16639015c4bc4fc3bc92 (diff)
Merge pull request #840 from AndreSchumacher/zipegg
Implementing SPARK-878 for PySpark: adding zip and egg files to context ...
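
For context, a minimal PySpark usage sketch of what this merge enables: .zip and .egg
archives handed to the context get shipped to workers and become importable there.
The archive paths and app name below are placeholders, not part of this commit; pyFiles
and addPyFile are the PySpark entry points involved.

    # Hypothetical example -- file names are made up for illustration.
    from pyspark import SparkContext

    # Archives can be listed up front via the pyFiles argument...
    sc = SparkContext("local", "zipegg-demo",
                      pyFiles=["deps/mylib.zip", "deps/mytool.egg"])

    # ...or added after the context exists.
    sc.addPyFile("deps/extra_helpers.egg")

    # Functions shipped to workers can now import modules from those archives.
    rdd = sc.parallelize(range(10)).map(lambda x: x * 2)
    print(rdd.collect())
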
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/api/python/PythonRDD.scala  9
1 file changed, 8 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index 2dd79f7100..49671437d0 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -33,6 +33,7 @@ private[spark] class PythonRDD[T: ClassManifest](
parent: RDD[T],
command: Seq[String],
envVars: JMap[String, String],
+ pythonIncludes: JList[String],
preservePartitoning: Boolean,
pythonExec: String,
broadcastVars: JList[Broadcast[Array[Byte]]],
@@ -44,10 +45,11 @@ private[spark] class PythonRDD[T: ClassManifest](
// Similar to Runtime.exec(), if we are given a single string, split it into words
// using a standard StringTokenizer (i.e. by spaces)
def this(parent: RDD[T], command: String, envVars: JMap[String, String],
+ pythonIncludes: JList[String],
preservePartitoning: Boolean, pythonExec: String,
broadcastVars: JList[Broadcast[Array[Byte]]],
accumulator: Accumulator[JList[Array[Byte]]]) =
- this(parent, PipedRDD.tokenize(command), envVars, preservePartitoning, pythonExec,
+ this(parent, PipedRDD.tokenize(command), envVars, pythonIncludes, preservePartitoning, pythonExec,
broadcastVars, accumulator)
override def getPartitions = parent.partitions
@@ -79,6 +81,11 @@ private[spark] class PythonRDD[T: ClassManifest](
dataOut.writeInt(broadcast.value.length)
dataOut.write(broadcast.value)
}
+ // Python includes (*.zip and *.egg files)
+ dataOut.writeInt(pythonIncludes.length)
+ for (f <- pythonIncludes) {
+ PythonRDD.writeAsPickle(f, dataOut)
+ }
dataOut.flush()
// Serialized user code
for (elem <- command) {
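
For illustration, a sketch of how the Python worker side could consume the framing
written above: a 4-byte big-endian count followed by one entry per include. This is
not the actual pyspark/worker.py; the helper names, the spark_files_dir argument, and
the assumption that each entry carries a length prefix before its pickled bytes
(mirroring how the broadcast values above are framed) are all assumptions made for
the sketch.

    import os
    import pickle
    import struct
    import sys

    def read_int(stream):
        # Counterpart of Java's DataOutputStream.writeInt: 4 bytes, big-endian.
        return struct.unpack(">i", stream.read(4))[0]

    def read_pickled(stream):
        # Assumption: a length prefix followed by that many pickled bytes.
        length = read_int(stream)
        return pickle.loads(stream.read(length))

    def add_python_includes(stream, spark_files_dir):
        # Read the include count, then put each shipped .zip/.egg on sys.path
        # so its modules can be imported by user code running on the worker.
        num_includes = read_int(stream)
        for _ in range(num_includes):
            filename = read_pickled(stream)
            sys.path.append(os.path.join(spark_files_dir, filename))
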