author    Jim Lim <jim@quixey.com>            2014-12-03 11:16:02 -0800
committer Andrew Or <andrew@databricks.com>   2014-12-03 11:16:29 -0800
commit    a975dc32799bb8a14f9e1c76defaaa7cfbaf8b53 (patch)
tree      4d360d83bf07ae47d9b12962c47431d7611568c9 /yarn
parent    d00542987ed80635782dcc826fc0bdbf434fff10 (diff)
SPARK-2624 add datanucleus jars to the container in yarn-cluster
If `spark-submit` finds the datanucleus jars, it adds them to the driver's
classpath, but does not add them to the container. This patch modifies the
yarn deployment class to copy all `datanucleus-*` jars found in
`[spark-home]/lib` to the container.

Author: Jim Lim <jim@quixey.com>

Closes #3238 from jimjh/SPARK-2624 and squashes the following commits:

3633071 [Jim Lim] SPARK-2624 update documentation and comments
fe95125 [Jim Lim] SPARK-2624 keep java imports together
6c31fe0 [Jim Lim] SPARK-2624 update documentation
6690fbf [Jim Lim] SPARK-2624 add tests
d28d8e9 [Jim Lim] SPARK-2624 add spark.yarn.datanucleus.dir option
84e6cba [Jim Lim] SPARK-2624 add datanucleus jars to the container in yarn-cluster
Diffstat (limited to 'yarn')
-rw-r--r-- yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala      | 66
-rw-r--r-- yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala | 76
2 files changed, 142 insertions(+), 0 deletions(-)
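
For context before the diff: the new behavior is driven by a single user-facing
option, spark.yarn.datanucleus.dir. A minimal usage sketch follows; the
directory paths below are hypothetical and not taken from the patch:

import org.apache.spark.SparkConf

// Point the YARN client at a custom directory of datanucleus-* jars
// instead of the default $SPARK_HOME/lib (hypothetical path).
val conf = new SparkConf()
  .set("spark.yarn.datanucleus.dir", "/opt/deps/datanucleus")

// A local: URI skips the upload and references jars already present on
// every node, e.g.:
//   .set("spark.yarn.datanucleus.dir", "local:/opt/deps/datanucleus")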
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index f95d723791..8e4360ea44 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -18,6 +18,7 @@
package org.apache.spark.deploy.yarn
import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
+import java.io.{File, FilenameFilter}
import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, ListBuffer, Map}
@@ -223,10 +224,48 @@ private[spark] trait ClientBase extends Logging {
}
}
}
+
if (cachedSecondaryJarLinks.nonEmpty) {
sparkConf.set(CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
}
+ /**
+ * Do the same for datanucleus jars, if they exist in spark home. Find all datanucleus-* jars,
+ * copy them to the remote fs, and add them to the class path.
+ *
+ * This is necessary because the datanucleus jars cannot be included in the assembly jar due
+ * to metadata conflicts involving plugin.xml. At the time of writing, these are the only
+ * jars that cannot be distributed with the uber jar and have to be treated differently.
+ *
+ * For more details, see SPARK-2624, and https://github.com/apache/spark/pull/3238
+ */
+ for (libsDir <- dataNucleusJarsDir(sparkConf)) {
+ val libsURI = new URI(libsDir)
+ val jarLinks = ListBuffer.empty[String]
+ if (libsURI.getScheme != LOCAL_SCHEME) {
+ val localURI = getQualifiedLocalPath(libsURI).toUri()
+ val jars = FileSystem.get(localURI, hadoopConf).listFiles(new Path(localURI.getPath), false)
+ while (jars.hasNext) {
+ val jar = jars.next()
+ val name = jar.getPath.getName
+ if (name.startsWith("datanucleus-")) {
+ // copy to remote and add to classpath
+ val src = jar.getPath
+ val destPath = copyFileToRemote(dst, src, replication)
+ distCacheMgr.addResource(fs, hadoopConf, destPath,
+ localResources, LocalResourceType.FILE, name, statCache)
+ jarLinks += name
+ }
+ }
+ } else {
+ jarLinks += libsURI.toString + Path.SEPARATOR + "*"
+ }
+
+ if (jarLinks.nonEmpty) {
+ sparkConf.set(CONF_SPARK_DATANUCLEUS_JARS, jarLinks.mkString(","))
+ }
+ }
+
localResources
}
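
A simplified, self-contained sketch of the listing-and-filtering step in the
hunk above, using plain java.io instead of Hadoop's FileSystem; the upload to
the staging directory is omitted and the helper name here is my own:

import java.io.File

// Find all datanucleus-* jars under a local lib directory. The real code
// walks the directory via Hadoop's FileSystem, uploads each match with
// copyFileToRemote, and registers it with the distributed cache manager.
def findDataNucleusJars(libsDir: File): Seq[String] =
  Option(libsDir.listFiles())
    .getOrElse(Array.empty[File])
    .filter(f => f.isFile && f.getName.startsWith("datanucleus-"))
    .map(_.getName)
    .toSeq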
@@ -551,6 +590,13 @@ private[spark] object ClientBase extends Logging {
// Internal config to propagate the location of the user's jar to the driver/executors
val CONF_SPARK_USER_JAR = "spark.yarn.user.jar"
+ // Location of the datanucleus jars
+ val CONF_SPARK_DATANUCLEUS_DIR = "spark.yarn.datanucleus.dir"
+
+ // Internal config to propagate the locations of datanucleus jars found to add to the
+ // classpath of the executors. Value should be a comma-separated list of paths to each jar.
+ val CONF_SPARK_DATANUCLEUS_JARS = "spark.yarn.datanucleus.jars"
+
// Internal config to propagate the locations of any extra jars to add to the classpath
// of the executors
val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"
@@ -584,6 +630,19 @@ private[spark] object ClientBase extends Logging {
}
/**
+ * Find the user-configured datanucleus jars directory, or fall back to SPARK_HOME/lib.
+ *
+ * This method first looks for $CONF_SPARK_DATANUCLEUS_DIR inside the SparkConf, then looks for
+ * Spark home inside the SparkConf and the user environment.
+ */
+ private def dataNucleusJarsDir(conf: SparkConf): Option[String] = {
+ conf.getOption(CONF_SPARK_DATANUCLEUS_DIR).orElse {
+ val sparkHome = conf.getOption("spark.home").orElse(sys.env.get("SPARK_HOME"))
+ sparkHome.map(path => path + Path.SEPARATOR + "lib")
+ }
+ }
+
+ /**
* Return the path to the given application's staging directory.
*/
private def getAppStagingDir(appId: ApplicationId): String = {
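
The lookup order in dataNucleusJarsDir can be restated as a standalone sketch
(illustrative only; the real method is private to ClientBase and uses Hadoop's
Path.SEPARATOR rather than a literal slash):

import org.apache.spark.SparkConf

// Resolution order: explicit spark.yarn.datanucleus.dir, then spark.home
// from the conf, then the SPARK_HOME environment variable.
def resolveLibsDir(conf: SparkConf): Option[String] =
  conf.getOption("spark.yarn.datanucleus.dir").orElse {
    conf.getOption("spark.home")
      .orElse(sys.env.get("SPARK_HOME"))
      .map(_ + "/lib")
  }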
@@ -684,6 +743,13 @@ private[spark] object ClientBase extends Logging {
addUserClasspath(args, sparkConf, env)
}
+ // Add datanucleus jars to classpath
+ for (entries <- sparkConf.getOption(CONF_SPARK_DATANUCLEUS_JARS)) {
+ entries.split(",").filter(_.nonEmpty).foreach { entry =>
+ addFileToClasspath(entry, null, env)
+ }
+ }
+
// Append all jar files under the working directory to the classpath.
addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + "*", env)
}
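
On the classpath-population side, the comma-separated value of
spark.yarn.datanucleus.jars is simply split and appended entry by entry. A toy
stand-in for that step (the env map and separator handling are simplified;
addFileToClasspath does more, such as resolving local: URIs):

import java.io.File
import scala.collection.mutable

// Append each non-empty jar entry to a CLASSPATH-style env variable.
def appendJars(csv: String, env: mutable.Map[String, String]): Unit =
  csv.split(",").filter(_.nonEmpty).foreach { entry =>
    env("CLASSPATH") = env.get("CLASSPATH")
      .map(_ + File.pathSeparator + entry)
      .getOrElse(entry)
  }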
diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
index 17b79ae1d8..b055e9b72d 100644
--- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
+++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
@@ -21,6 +21,7 @@ import java.io.File
import java.net.URI
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@ -104,6 +105,81 @@ class ClientBaseSuite extends FunSuite with Matchers {
cp should not contain (ClientBase.APP_JAR)
}
+ test("DataNucleus in classpath") {
+ val dnJars = "local:/dn/core.jar,/dn/api.jar"
+ val conf = new Configuration()
+ val sparkConf = new SparkConf()
+ .set(ClientBase.CONF_SPARK_JAR, SPARK)
+ .set(ClientBase.CONF_SPARK_DATANUCLEUS_JARS, dnJars)
+ val env = new MutableHashMap[String, String]()
+ val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
+
+ ClientBase.populateClasspath(args, conf, sparkConf, env)
+
+ val cp = env("CLASSPATH").split(File.pathSeparator)
+ s"$dnJars".split(",").foreach({ entry =>
+ val uri = new URI(entry)
+ if (ClientBase.LOCAL_SCHEME.equals(uri.getScheme())) {
+ cp should contain (uri.getPath())
+ } else {
+ cp should not contain (uri.getPath())
+ }
+ }
+ }
+
+ test("DataNucleus using local:") {
+ val dnDir = "local:/datanucleus"
+ val conf = new Configuration()
+ val sparkConf = new SparkConf()
+ .set(ClientBase.CONF_SPARK_JAR, SPARK)
+ .set(ClientBase.CONF_SPARK_DATANUCLEUS_DIR, dnDir)
+ val yarnConf = new YarnConfiguration()
+ val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
+
+ val client = spy(new DummyClient(args, conf, sparkConf, yarnConf))
+ doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
+ any(classOf[Path]), anyShort(), anyBoolean())
+
+ val tempDir = Utils.createTempDir()
+ try {
+ client.prepareLocalResources(tempDir.getAbsolutePath())
+ val jars = sparkConf.get(ClientBase.CONF_SPARK_DATANUCLEUS_JARS).split(",")
+ val uri = new URI(dnDir)
+ jars should contain (uri.toString + Path.SEPARATOR + "*")
+ } finally {
+ Utils.deleteRecursively(tempDir)
+ }
+ }
+
+ test("DataNucleus using file:") {
+ val dnDir = Utils.createTempDir()
+ val tempDir = Utils.createTempDir()
+
+ try {
+ // create mock datanucleus jar
+ val tempJar = File.createTempFile("datanucleus-", null, dnDir)
+
+ val conf = new Configuration()
+ val sparkConf = new SparkConf()
+ .set(ClientBase.CONF_SPARK_JAR, SPARK)
+ .set(ClientBase.CONF_SPARK_DATANUCLEUS_DIR, dnDir.toURI.toString)
+ val yarnConf = new YarnConfiguration()
+ val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
+
+ val client = spy(new DummyClient(args, conf, sparkConf, yarnConf))
+ doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
+ any(classOf[Path]), anyShort(), anyBoolean())
+
+ client.prepareLocalResources(tempDir.getAbsolutePath())
+
+ val jars = sparkConf.get(ClientBase.CONF_SPARK_DATANUCLEUS_JARS).split(",")
+ jars should contain (tempJar.getName)
+ } finally {
+ Utils.deleteRecursively(dnDir)
+ Utils.deleteRecursively(tempDir)
+ }
+ }
+
test("Jar path propagation through SparkConf") {
val conf = new Configuration()
val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK)