diff options
author | Andre Schumacher <schumach@icsi.berkeley.edu> | 2013-08-15 16:01:19 -0700 |
---|---|---|
committer | Andre Schumacher <schumach@icsi.berkeley.edu> | 2013-08-16 11:58:20 -0700 |
commit | c7e348faec45ad1d996d16639015c4bc4fc3bc92 (patch) | |
tree | 45e69b999c4b4af6bd7528e3dcc860bce264e14f /core/src | |
parent | 659553b21ddd7504889ce113a816c1db4a73f167 (diff) | |
download | spark-c7e348faec45ad1d996d16639015c4bc4fc3bc92.tar.gz spark-c7e348faec45ad1d996d16639015c4bc4fc3bc92.tar.bz2 spark-c7e348faec45ad1d996d16639015c4bc4fc3bc92.zip |
Implementing SPARK-878 for PySpark: adding zip and egg files to context and passing it down to workers which add these to their sys.path
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/spark/api/python/PythonRDD.scala | 9 |
1 files changed, 8 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala index 2dd79f7100..49671437d0 100644 --- a/core/src/main/scala/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/spark/api/python/PythonRDD.scala @@ -33,6 +33,7 @@ private[spark] class PythonRDD[T: ClassManifest]( parent: RDD[T], command: Seq[String], envVars: JMap[String, String], + pythonIncludes: JList[String], preservePartitoning: Boolean, pythonExec: String, broadcastVars: JList[Broadcast[Array[Byte]]], @@ -44,10 +45,11 @@ private[spark] class PythonRDD[T: ClassManifest]( // Similar to Runtime.exec(), if we are given a single string, split it into words // using a standard StringTokenizer (i.e. by spaces) def this(parent: RDD[T], command: String, envVars: JMap[String, String], + pythonIncludes: JList[String], preservePartitoning: Boolean, pythonExec: String, broadcastVars: JList[Broadcast[Array[Byte]]], accumulator: Accumulator[JList[Array[Byte]]]) = - this(parent, PipedRDD.tokenize(command), envVars, preservePartitoning, pythonExec, + this(parent, PipedRDD.tokenize(command), envVars, pythonIncludes, preservePartitoning, pythonExec, broadcastVars, accumulator) override def getPartitions = parent.partitions @@ -79,6 +81,11 @@ private[spark] class PythonRDD[T: ClassManifest]( dataOut.writeInt(broadcast.value.length) dataOut.write(broadcast.value) } + // Python includes (*.zip and *.egg files) + dataOut.writeInt(pythonIncludes.length) + for (f <- pythonIncludes) { + PythonRDD.writeAsPickle(f, dataOut) + } dataOut.flush() // Serialized user code for (elem <- command) { |