path: root/project
author    Lianhui Wang <lianhuiwang09@gmail.com>    2015-05-08 08:44:46 -0500
committer Thomas Graves <tgraves@apache.org>        2015-05-08 08:44:46 -0500
commit    ebff7327af5efa9f57c605284de4fae6b050ae0f (patch)
tree      ed814cf7c903da2404a55b4c5bdcc8eb0648f518 /project
parent    c2f0821aad3b82dcd327e914c9b297e92526649d (diff)
[SPARK-6869] [PYSPARK] Add pyspark archives path to PYTHONPATH
Based on https://github.com/apache/spark/pull/5478, which provides a PYSPARK_ARCHIVES_PATH env variable. With this PR, when PySpark is not installed on every YARN node, it is enough to export

    PYSPARK_ARCHIVES_PATH=/user/spark/pyspark.zip,/user/spark/python/lib/py4j-0.8.2.1-src.zip

in conf/spark-env.sh. I ran a Python application successfully in both yarn-client and yarn-cluster mode with this PR.

andrewor14 sryza Sephiroth-Lin Can you take a look at this? Thanks.

Author: Lianhui Wang <lianhuiwang09@gmail.com>

Closes #5580 from lianhuiwang/SPARK-6869 and squashes the following commits:

66ffa43 [Lianhui Wang] Update Client.scala
c2ad0f9 [Lianhui Wang] Update Client.scala
1c8f664 [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
008850a [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
f0b4ed8 [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
150907b [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
20402cd [Lianhui Wang] use ZipEntry
9d87c3f [Lianhui Wang] update scala style
e7bd971 [Lianhui Wang] address vanzin's comments
4b8a3ed [Lianhui Wang] use pyArchivesEnvOpt
e6b573b [Lianhui Wang] address vanzin's comments
f11f84a [Lianhui Wang] zip pyspark archives
5192cca [Lianhui Wang] update import path
3b1e4c8 [Lianhui Wang] address tgravescs's comments
9396346 [Lianhui Wang] put zip to make-distribution.sh
0d2baf7 [Lianhui Wang] update import paths
e0179be [Lianhui Wang] add zip pyspark archives in build or sparksubmit
31e8e06 [Lianhui Wang] update code style
9f31dac [Lianhui Wang] update code and add comments
f72987c [Lianhui Wang] add archives path to PYTHONPATH
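As context for the mechanism the message describes, here is a minimal, hypothetical Scala sketch of how a comma-separated PYSPARK_ARCHIVES_PATH can be folded into a PYTHONPATH value. This is not the actual Client.scala change from this commit; the object and method names are illustrative only.

    import java.io.File

    // Hypothetical sketch only -- not Spark's Client.scala. It shows the idea:
    // split the comma-separated archive list from PYSPARK_ARCHIVES_PATH and
    // prepend the entries to whatever PYTHONPATH already holds.
    object PythonPathSketch {
      def buildPythonPath(existing: Option[String]): String = {
        // e.g. "/user/spark/pyspark.zip,/user/spark/python/lib/py4j-0.8.2.1-src.zip"
        val archives = sys.env.get("PYSPARK_ARCHIVES_PATH")
          .map(_.split(",").toSeq)
          .getOrElse(Seq.empty)
        (archives ++ existing.toSeq).mkString(File.pathSeparator)
      }

      def main(args: Array[String]): Unit =
        println(buildPythonPath(sys.env.get("PYTHONPATH")))
    }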
Diffstat (limited to 'project')
-rw-r--r--  project/SparkBuild.scala  37
1 file changed, 35 insertions(+), 2 deletions(-)
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 026855f8f6..186345af0e 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -370,6 +370,7 @@ object Assembly {
object PySparkAssembly {
import sbtassembly.Plugin._
import AssemblyKeys._
+ import java.util.zip.{ZipOutputStream, ZipEntry}
lazy val settings = Seq(
unmanagedJars in Compile += { BuildCommons.sparkHome / "python/lib/py4j-0.8.2.1-src.zip" },
@@ -377,16 +378,48 @@ object PySparkAssembly {
// to be included in the assembly. We can't just add "python/" to the assembly's resource dir
// list since that will copy unneeded / unwanted files.
resourceGenerators in Compile <+= resourceManaged in Compile map { outDir: File =>
+ val src = new File(BuildCommons.sparkHome, "python/pyspark")
+
+ val zipFile = new File(BuildCommons.sparkHome , "python/lib/pyspark.zip")
+ zipFile.delete()
+ zipRecursive(src, zipFile)
+
val dst = new File(outDir, "pyspark")
if (!dst.isDirectory()) {
require(dst.mkdirs())
}
-
- val src = new File(BuildCommons.sparkHome, "python/pyspark")
copy(src, dst)
}
)
+ private def zipRecursive(source: File, destZipFile: File) = {
+ val destOutput = new ZipOutputStream(new FileOutputStream(destZipFile))
+ addFilesToZipStream("", source, destOutput)
+ destOutput.flush()
+ destOutput.close()
+ }
+
+ private def addFilesToZipStream(parent: String, source: File, output: ZipOutputStream): Unit = {
+ if (source.isDirectory()) {
+ output.putNextEntry(new ZipEntry(parent + source.getName()))
+ for (file <- source.listFiles()) {
+ addFilesToZipStream(parent + source.getName() + File.separator, file, output)
+ }
+ } else {
+ val in = new FileInputStream(source)
+ output.putNextEntry(new ZipEntry(parent + source.getName()))
+ val buf = new Array[Byte](8192)
+ var n = 0
+ while (n != -1) {
+ n = in.read(buf)
+ if (n != -1) {
+ output.write(buf, 0, n)
+ }
+ }
+ in.close()
+ }
+ }
+
private def copy(src: File, dst: File): Seq[File] = {
src.listFiles().flatMap { f =>
val child = new File(dst, f.getName())