-rw-r--r--  assembly/pom.xml                                                21
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala  41
-rw-r--r--  project/SparkBuild.scala                                        37
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala  23
4 files changed, 114 insertions(+), 8 deletions(-)
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 2b4d0a990b..626c8577e3 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -92,6 +92,27 @@
<skip>true</skip>
</configuration>
</plugin>
+ <!-- Zip the pyspark files into an archive so Python applications can be run in yarn mode -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <target>
+ <delete dir="${basedir}/../python/lib/pyspark.zip"/>
+ <zip destfile="${basedir}/../python/lib/pyspark.zip">
+ <fileset dir="${basedir}/../python/" includes="pyspark/**/*"/>
+ </zip>
+ </target>
+ </configuration>
+ </plugin>
<!-- Use the shade plugin to create a big JAR with all the dependencies -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
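Note: the antrun target above rebuilds python/lib/pyspark.zip from python/pyspark/** during the package phase. A minimal sketch for sanity-checking the generated archive with java.util.zip; the relative path and the check itself are illustrative assumptions, not part of this change:

import java.util.zip.ZipFile
import scala.collection.JavaConverters._

object CheckPySparkZip {
  def main(args: Array[String]): Unit = {
    // Assumes it is run from the Spark source root after the package phase.
    val zip = new ZipFile("python/lib/pyspark.zip")
    try {
      val names = zip.entries().asScala.map(_.getName).toList
      // Every entry should sit under the top-level pyspark/ directory so that
      // `import pyspark` works once the zip is placed on PYTHONPATH.
      require(names.nonEmpty && names.forall(_.startsWith("pyspark")),
        "pyspark.zip is missing the top-level pyspark package")
      println(s"pyspark.zip contains ${names.size} entries")
    } finally {
      zip.close()
    }
  }
}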
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 8a0327984e..329fa06ba8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -332,6 +332,47 @@ object SparkSubmit {
}
}
+ // In yarn mode for a Python app, add the pyspark archives to the files
+ // that are distributed with the job.
+ if (args.isPython && clusterManager == YARN) {
+ var pyArchives: String = null
+ val pyArchivesEnvOpt = sys.env.get("PYSPARK_ARCHIVES_PATH")
+ if (pyArchivesEnvOpt.isDefined) {
+ pyArchives = pyArchivesEnvOpt.get
+ } else {
+ if (!sys.env.contains("SPARK_HOME")) {
+ printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.")
+ }
+ val pythonPath = new ArrayBuffer[String]
+ for (sparkHome <- sys.env.get("SPARK_HOME")) {
+ val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
+ val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
+ if (!pyArchivesFile.exists()) {
+ printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.")
+ }
+ val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
+ if (!py4jFile.exists()) {
+ printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " +
+ "in yarn mode.")
+ }
+ pythonPath += pyArchivesFile.getAbsolutePath()
+ pythonPath += py4jFile.getAbsolutePath()
+ }
+ pyArchives = pythonPath.mkString(",")
+ }
+
+ pyArchives = pyArchives.split(",").map { localPath =>
+ val localURI = Utils.resolveURI(localPath)
+ if (localURI.getScheme != "local") {
+ args.files = mergeFileLists(args.files, localURI.toString)
+ new Path(localPath).getName
+ } else {
+ localURI.getPath
+ }
+ }.mkString(File.pathSeparator)
+ sysProps("spark.submit.pyArchives") = pyArchives
+ }
+
// If we're running a R app, set the main class to our specific R runner
if (args.isR && deployMode == CLIENT) {
if (args.primaryResource == SPARKR_SHELL) {
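Note: the block added to SparkSubmit above treats "local:" URIs as already present on every node (their path is kept as-is) and ships everything else with the job, referring to it by file name. A self-contained sketch of that rewrite rule using only java.net.URI, shown for illustration; the real code goes through Utils.resolveURI and Hadoop's Path:

import java.io.File
import java.net.URI

object ArchivePathRewrite {
  // Hypothetical helper mirroring the rule above: "local:" archives keep their
  // node-local path, anything else is distributed and later referenced by its
  // base name relative to the container's working directory.
  def rewrite(path: String): String = {
    val uri = if (path.contains(":")) new URI(path) else new File(path).toURI
    if (uri.getScheme == "local") uri.getPath
    else new File(uri.getPath).getName
  }

  def main(args: Array[String]): Unit = {
    // Keeps the path: the archive is assumed to exist on every node.
    println(rewrite("local:/opt/spark/python/lib/py4j-0.8.2.1-src.zip"))
    // Shipped with the job, so only the base name ends up on PYTHONPATH.
    println(rewrite("/tmp/spark/python/lib/pyspark.zip"))
  }
}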
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 026855f8f6..186345af0e 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -370,6 +370,7 @@ object Assembly {
object PySparkAssembly {
import sbtassembly.Plugin._
import AssemblyKeys._
+ import java.util.zip.{ZipOutputStream, ZipEntry}
lazy val settings = Seq(
unmanagedJars in Compile += { BuildCommons.sparkHome / "python/lib/py4j-0.8.2.1-src.zip" },
@@ -377,16 +378,48 @@ object PySparkAssembly {
// to be included in the assembly. We can't just add "python/" to the assembly's resource dir
// list since that will copy unneeded / unwanted files.
resourceGenerators in Compile <+= resourceManaged in Compile map { outDir: File =>
+ val src = new File(BuildCommons.sparkHome, "python/pyspark")
+
+ val zipFile = new File(BuildCommons.sparkHome , "python/lib/pyspark.zip")
+ zipFile.delete()
+ zipRecursive(src, zipFile)
+
val dst = new File(outDir, "pyspark")
if (!dst.isDirectory()) {
require(dst.mkdirs())
}
-
- val src = new File(BuildCommons.sparkHome, "python/pyspark")
copy(src, dst)
}
)
+ private def zipRecursive(source: File, destZipFile: File) = {
+ val destOutput = new ZipOutputStream(new FileOutputStream(destZipFile))
+ addFilesToZipStream("", source, destOutput)
+ destOutput.flush()
+ destOutput.close()
+ }
+
+ private def addFilesToZipStream(parent: String, source: File, output: ZipOutputStream): Unit = {
+ if (source.isDirectory()) {
+ output.putNextEntry(new ZipEntry(parent + source.getName()))
+ for (file <- source.listFiles()) {
+ addFilesToZipStream(parent + source.getName() + File.separator, file, output)
+ }
+ } else {
+ val in = new FileInputStream(source)
+ output.putNextEntry(new ZipEntry(parent + source.getName()))
+ val buf = new Array[Byte](8192)
+ var n = 0
+ while (n != -1) {
+ n = in.read(buf)
+ if (n != -1) {
+ output.write(buf, 0, n)
+ }
+ }
+ in.close()
+ }
+ }
+
private def copy(src: File, dst: File): Seq[File] = {
src.listFiles().flatMap { f =>
val child = new File(dst, f.getName())
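Note: the sbt side reproduces the same pyspark.zip with a hand-rolled java.util.zip walk. An alternative sketch that leans on sbt 0.13's own IO helpers instead; the exact API surface here is an assumption about that sbt version, not what the patch uses:

import sbt._
import sbt.Path._

// Hypothetical alternative: let sbt's IO.zip build python/lib/pyspark.zip,
// giving every file an entry name under the top-level pyspark/ directory.
object PySparkZipAlternative {
  def zipPySpark(sparkHome: File): Unit = {
    val src = sparkHome / "python" / "pyspark"
    val out = sparkHome / "python" / "lib" / "pyspark.zip"
    val entries = src.***.get.filter(_.isFile).map { f =>
      f -> ("pyspark/" + IO.relativize(src, f).get)
    }
    IO.delete(out)
    IO.zip(entries, out)
  }
}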
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 20ecaf092e..d21a739347 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -468,6 +468,17 @@ private[spark] class Client(
env("SPARK_YARN_USER_ENV") = userEnvs
}
+ // If spark.submit.pyArchives is set in sparkConf, append the archives to the
+ // PYTHONPATH that is passed on to the ApplicationMaster and the executors.
+ if (sparkConf.contains("spark.submit.pyArchives")) {
+ var pythonPath = sparkConf.get("spark.submit.pyArchives")
+ if (env.contains("PYTHONPATH")) {
+ pythonPath = Seq(env("PYTHONPATH"), pythonPath).mkString(File.pathSeparator)
+ }
+ env("PYTHONPATH") = pythonPath
+ sparkConf.setExecutorEnv("PYTHONPATH", pythonPath)
+ }
+
// In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to
// executors. But we can't just set spark.executor.extraJavaOptions, because the driver's
// SparkContext will not let that set spark* system properties, which is expected behavior for
@@ -1074,7 +1085,7 @@ object Client extends Logging {
val hiveConf = hiveClass.getMethod("getConf").invoke(hive)
val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
- val hiveConfGet = (param:String) => Option(hiveConfClass
+ val hiveConfGet = (param: String) => Option(hiveConfClass
.getMethod("get", classOf[java.lang.String])
.invoke(hiveConf, param))
@@ -1096,7 +1107,7 @@ object Client extends Logging {
val hive2Token = new Token[DelegationTokenIdentifier]()
hive2Token.decodeFromUrlString(tokenStr)
- credentials.addToken(new Text("hive.server2.delegation.token"),hive2Token)
+ credentials.addToken(new Text("hive.server2.delegation.token"), hive2Token)
logDebug("Added hive.Server2.delegation.token to conf.")
hiveClass.getMethod("closeCurrent").invoke(null)
} else {
@@ -1141,13 +1152,13 @@ object Client extends Logging {
logInfo("Added HBase security token to credentials.")
} catch {
- case e:java.lang.NoSuchMethodException =>
+ case e: java.lang.NoSuchMethodException =>
logInfo("HBase Method not found: " + e)
- case e:java.lang.ClassNotFoundException =>
+ case e: java.lang.ClassNotFoundException =>
logDebug("HBase Class not found: " + e)
- case e:java.lang.NoClassDefFoundError =>
+ case e: java.lang.NoClassDefFoundError =>
logDebug("HBase Class not found: " + e)
- case e:Exception =>
+ case e: Exception =>
logError("Exception when obtaining HBase security token: " + e)
}
}
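Note: after the Client change above, the PYTHONPATH handed to the ApplicationMaster and executors is the existing value (if any) followed by the rewritten archive entries, joined with the platform path separator. A tiny illustration of that composition; the archive names come from this patch, while the pre-existing PYTHONPATH is an assumption:

import java.io.File

object PythonPathComposition {
  def main(args: Array[String]): Unit = {
    // Archive names as rewritten by SparkSubmit; YARN resolves them in the
    // container's working directory once the files have been localized.
    val pyArchives = Seq("pyspark.zip", "py4j-0.8.2.1-src.zip")
      .mkString(File.pathSeparator)
    // Pre-existing PYTHONPATH on the node, which may be absent.
    val existing = Option(System.getenv("PYTHONPATH"))
    val pythonPath = (existing.toSeq :+ pyArchives).mkString(File.pathSeparator)
    // On Linux this prints something like
    //   /existing/path:pyspark.zip:py4j-0.8.2.1-src.zip
    println(pythonPath)
  }
}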