diff options
author | Jim Lim <jim@quixey.com> | 2014-12-03 11:16:02 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2014-12-03 11:17:09 -0800 |
commit | 163fd785a0ee41209ecdccc8c28c9f458a4d34d1 (patch) | |
tree | 76718b8de34bb20e5d76000b14ad2b1f91227cd1 /yarn | |
parent | b63e94175f1f1c4fe44f78b9b82dd3d8d2d81f5a (diff) | |
download | spark-163fd785a0ee41209ecdccc8c28c9f458a4d34d1.tar.gz spark-163fd785a0ee41209ecdccc8c28c9f458a4d34d1.tar.bz2 spark-163fd785a0ee41209ecdccc8c28c9f458a4d34d1.zip |
SPARK-2624 add datanucleus jars to the container in yarn-cluster
If `spark-submit` finds the datanucleus jars, it adds them to the driver's classpath, but does not add them to the container.
This patch modifies the yarn deployment class to copy all `datanucleus-*` jars found in `[spark-home]/libs` to the container.
Author: Jim Lim <jim@quixey.com>
Closes #3238 from jimjh/SPARK-2624 and squashes the following commits:
3633071 [Jim Lim] SPARK-2624 update documentation and comments
fe95125 [Jim Lim] SPARK-2624 keep java imports together
6c31fe0 [Jim Lim] SPARK-2624 update documentation
6690fbf [Jim Lim] SPARK-2624 add tests
d28d8e9 [Jim Lim] SPARK-2624 add spark.yarn.datanucleus.dir option
84e6cba [Jim Lim] SPARK-2624 add datanucleus jars to the container in yarn-cluster
Diffstat (limited to 'yarn')
-rw-r--r-- | yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala | 66 | ||||
-rw-r--r-- | yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala | 76 |
2 files changed, 142 insertions, 0 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index f95d723791..8e4360ea44 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.yarn import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException} +import java.io.{File, FilenameFilter} import scala.collection.JavaConversions._ import scala.collection.mutable.{HashMap, ListBuffer, Map} @@ -223,10 +224,48 @@ private[spark] trait ClientBase extends Logging { } } } + if (cachedSecondaryJarLinks.nonEmpty) { sparkConf.set(CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(",")) } + /** + * Do the same for datanucleus jars, if they exist in spark home. Find all datanucleus-* jars, + * copy them to the remote fs, and add them to the class path. + * + * This is necessary because the datanucleus jars cannot be included in the assembly jar due + * to metadata conflicts involving plugin.xml. At the time of writing, these are the only + * jars that cannot be distributed with the uber jar and have to be treated differently. 
+ * + * For more details, see SPARK-2624, and https://github.com/apache/spark/pull/3238 + */ + for (libsDir <- dataNucleusJarsDir(sparkConf)) { + val libsURI = new URI(libsDir) + val jarLinks = ListBuffer.empty[String] + if (libsURI.getScheme != LOCAL_SCHEME) { + val localURI = getQualifiedLocalPath(libsURI).toUri() + val jars = FileSystem.get(localURI, hadoopConf).listFiles(new Path(localURI.getPath), false) + while (jars.hasNext) { + val jar = jars.next() + val name = jar.getPath.getName + if (name.startsWith("datanucleus-")) { + // copy to remote and add to classpath + val src = jar.getPath + val destPath = copyFileToRemote(dst, src, replication) + distCacheMgr.addResource(fs, hadoopConf, destPath, + localResources, LocalResourceType.FILE, name, statCache) + jarLinks += name + } + } + } else { + jarLinks += libsURI.toString + Path.SEPARATOR + "*" + } + + if (jarLinks.nonEmpty) { + sparkConf.set(CONF_SPARK_DATANUCLEUS_JARS, jarLinks.mkString(",")) + } + } + localResources } @@ -551,6 +590,13 @@ private[spark] object ClientBase extends Logging { // Internal config to propagate the location of the user's jar to the driver/executors val CONF_SPARK_USER_JAR = "spark.yarn.user.jar" + // Location of the datanucleus jars + val CONF_SPARK_DATANUCLEUS_DIR = "spark.yarn.datanucleus.dir" + + // Internal config to propagate the locations of datanucleus jars found to add to the + // classpath of the executors. Value should be a comma-separated list of paths to each jar. + val CONF_SPARK_DATANUCLEUS_JARS = "spark.yarn.datanucleus.jars" + // Internal config to propagate the locations of any extra jars to add to the classpath // of the executors val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars" @@ -584,6 +630,19 @@ private[spark] object ClientBase extends Logging { } /** + * Find the user-defined provided jars directory if configured, or return SPARK_HOME/lib if not. 
+ * + * This method first looks for $CONF_SPARK_DATANUCLEUS_DIR inside the SparkConf, then looks for + * Spark home inside the the SparkConf and the user environment. + */ + private def dataNucleusJarsDir(conf: SparkConf): Option[String] = { + conf.getOption(CONF_SPARK_DATANUCLEUS_DIR).orElse { + val sparkHome = conf.getOption("spark.home").orElse(sys.env.get("SPARK_HOME")) + sparkHome.map(path => path + Path.SEPARATOR + "lib") + } + } + + /** * Return the path to the given application's staging directory. */ private def getAppStagingDir(appId: ApplicationId): String = { @@ -684,6 +743,13 @@ private[spark] object ClientBase extends Logging { addUserClasspath(args, sparkConf, env) } + // Add datanucleus jars to classpath + for (entries <- sparkConf.getOption(CONF_SPARK_DATANUCLEUS_JARS)) { + entries.split(",").filter(_.nonEmpty).foreach { entry => + addFileToClasspath(entry, null, env) + } + } + // Append all jar files under the working directory to the classpath. addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + "*", env) } diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala index 17b79ae1d8..b055e9b72d 100644 --- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala +++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala @@ -21,6 +21,7 @@ import java.io.File import java.net.URI import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.MRJobConfig import org.apache.hadoop.yarn.api.ApplicationConstants.Environment @@ -104,6 +105,81 @@ class ClientBaseSuite extends FunSuite with Matchers { cp should not contain (ClientBase.APP_JAR) } + test("DataNucleus in classpath") { + val dnJars = "local:/dn/core.jar,/dn/api.jar" + val conf = new Configuration() + val sparkConf = new SparkConf() + 
.set(ClientBase.CONF_SPARK_JAR, SPARK) + .set(ClientBase.CONF_SPARK_DATANUCLEUS_JARS, dnJars) + val env = new MutableHashMap[String, String]() + val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) + + ClientBase.populateClasspath(args, conf, sparkConf, env) + + val cp = env("CLASSPATH").split(File.pathSeparator) + s"$dnJars".split(",").foreach({ entry => + val uri = new URI(entry) + if (ClientBase.LOCAL_SCHEME.equals(uri.getScheme())) { + cp should contain (uri.getPath()) + } else { + cp should not contain (uri.getPath()) + } + }) + } + + test("DataNucleus using local:") { + val dnDir = "local:/datanucleus" + val conf = new Configuration() + val sparkConf = new SparkConf() + .set(ClientBase.CONF_SPARK_JAR, SPARK) + .set(ClientBase.CONF_SPARK_DATANUCLEUS_DIR, dnDir) + val yarnConf = new YarnConfiguration() + val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) + + val client = spy(new DummyClient(args, conf, sparkConf, yarnConf)) + doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]), + any(classOf[Path]), anyShort(), anyBoolean()) + + val tempDir = Utils.createTempDir() + try { + client.prepareLocalResources(tempDir.getAbsolutePath()) + val jars = sparkConf.get(ClientBase.CONF_SPARK_DATANUCLEUS_JARS).split(",") + val uri = new URI(dnDir) + jars should contain (uri.toString + Path.SEPARATOR + "*") + } finally { + Utils.deleteRecursively(tempDir) + } + } + + test("DataNucleus using file:") { + val dnDir = Utils.createTempDir() + val tempDir = Utils.createTempDir() + + try { + // create mock datanucleus jar + val tempJar = File.createTempFile("datanucleus-", null, dnDir) + + val conf = new Configuration() + val sparkConf = new SparkConf() + .set(ClientBase.CONF_SPARK_JAR, SPARK) + .set(ClientBase.CONF_SPARK_DATANUCLEUS_DIR, dnDir.toURI.toString) + val yarnConf = new YarnConfiguration() + val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) + + val client = 
spy(new DummyClient(args, conf, sparkConf, yarnConf)) + doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]), + any(classOf[Path]), anyShort(), anyBoolean()) + + client.prepareLocalResources(tempDir.getAbsolutePath()) + + val jars = sparkConf.get(ClientBase.CONF_SPARK_DATANUCLEUS_JARS).split(",") + jars should contain (tempJar.getName) + } finally { + Utils.deleteRecursively(dnDir) + Utils.deleteRecursively(tempDir) + } + } + test("Jar path propagation through SparkConf") { val conf = new Configuration() val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK) |