From ebff7327af5efa9f57c605284de4fae6b050ae0f Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Fri, 8 May 2015 08:44:46 -0500 Subject: [SPARK-6869] [PYSPARK] Add pyspark archives path to PYTHONPATH Based on https://github.com/apache/spark/pull/5478 that provide a PYSPARK_ARCHIVES_PATH env. within this PR, we just should export PYSPARK_ARCHIVES_PATH=/user/spark/pyspark.zip,/user/spark/python/lib/py4j-0.8.2.1-src.zip in conf/spark-env.sh when we don't install PySpark on each node of Yarn. i run python application successfully on yarn-client and yarn-cluster with this PR. andrewor14 sryza Sephiroth-Lin Can you take a look at this?thanks. Author: Lianhui Wang Closes #5580 from lianhuiwang/SPARK-6869 and squashes the following commits: 66ffa43 [Lianhui Wang] Update Client.scala c2ad0f9 [Lianhui Wang] Update Client.scala 1c8f664 [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869 008850a [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869 f0b4ed8 [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869 150907b [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869 20402cd [Lianhui Wang] use ZipEntry 9d87c3f [Lianhui Wang] update scala style e7bd971 [Lianhui Wang] address vanzin's comments 4b8a3ed [Lianhui Wang] use pyArchivesEnvOpt e6b573b [Lianhui Wang] address vanzin's comments f11f84a [Lianhui Wang] zip pyspark archives 5192cca [Lianhui Wang] update import path 3b1e4c8 [Lianhui Wang] address tgravescs's comments 9396346 [Lianhui Wang] put zip to make-distribution.sh 0d2baf7 [Lianhui Wang] update import paths e0179be [Lianhui Wang] add zip pyspark archives in build or sparksubmit 31e8e06 [Lianhui Wang] update code style 9f31dac [Lianhui Wang] update code and add comments f72987c [Lianhui Wang] add archives path to PYTHONPATH --- project/SparkBuild.scala | 37 +++++++++++++++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 2 deletions(-) (limited to 'project/SparkBuild.scala') diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 026855f8f6..186345af0e 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -370,6 +370,7 @@ object Assembly { object PySparkAssembly { import sbtassembly.Plugin._ import AssemblyKeys._ + import java.util.zip.{ZipOutputStream, ZipEntry} lazy val settings = Seq( unmanagedJars in Compile += { BuildCommons.sparkHome / "python/lib/py4j-0.8.2.1-src.zip" }, @@ -377,16 +378,48 @@ object PySparkAssembly { // to be included in the assembly. We can't just add "python/" to the assembly's resource dir // list since that will copy unneeded / unwanted files. resourceGenerators in Compile <+= resourceManaged in Compile map { outDir: File => + val src = new File(BuildCommons.sparkHome, "python/pyspark") + + val zipFile = new File(BuildCommons.sparkHome , "python/lib/pyspark.zip") + zipFile.delete() + zipRecursive(src, zipFile) + val dst = new File(outDir, "pyspark") if (!dst.isDirectory()) { require(dst.mkdirs()) } - - val src = new File(BuildCommons.sparkHome, "python/pyspark") copy(src, dst) } ) + private def zipRecursive(source: File, destZipFile: File) = { + val destOutput = new ZipOutputStream(new FileOutputStream(destZipFile)) + addFilesToZipStream("", source, destOutput) + destOutput.flush() + destOutput.close() + } + + private def addFilesToZipStream(parent: String, source: File, output: ZipOutputStream): Unit = { + if (source.isDirectory()) { + output.putNextEntry(new ZipEntry(parent + source.getName())) + for (file <- source.listFiles()) { + addFilesToZipStream(parent + source.getName() + File.separator, file, output) + } + } else { + val in = new FileInputStream(source) + output.putNextEntry(new ZipEntry(parent + source.getName())) + val buf = new Array[Byte](8192) + var n = 0 + while (n != -1) { + n = in.read(buf) + if (n != -1) { + output.write(buf, 0, n) + } + } + in.close() + } + } + private def copy(src: File, dst: File): Seq[File] = { src.listFiles().flatMap { f => val child = new File(dst, f.getName()) -- cgit v1.2.3