aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/running-on-yarn.md15
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala66
-rw-r--r--yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala76
3 files changed, 157 insertions, 0 deletions
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index dfe2db4b3f..45e219e0c1 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -132,6 +132,21 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
The maximum number of threads to use in the application master for launching executor containers.
</td>
</tr>
+<tr>
+ <td><code>spark.yarn.datanucleus.dir</code></td>
+ <td>$SPARK_HOME/lib</td>
+ <td>
+ The location of the DataNucleus jars, in case overriding the default location is desired.
+ By default, Spark on YARN will use the DataNucleus jars installed at
+ <code>$SPARK_HOME/lib</code>, but the jars can also be in a world-readable location on HDFS.
+ This allows YARN to cache it on nodes so that it doesn't need to be distributed each time an
+ application runs. To point to a directory on HDFS, for example, set this configuration to
+ "hdfs:///some/path".
+
+ This is required because the datanucleus jars cannot be packaged into the
+ assembly jar due to metadata conflicts (involving <code>plugin.xml</code>.)
+ </td>
+</tr>
</table>
# Launching Spark on YARN
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index f95d723791..8e4360ea44 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -18,6 +18,7 @@
package org.apache.spark.deploy.yarn
import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
+import java.io.{File, FilenameFilter}
import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, ListBuffer, Map}
@@ -223,10 +224,48 @@ private[spark] trait ClientBase extends Logging {
}
}
}
+
if (cachedSecondaryJarLinks.nonEmpty) {
sparkConf.set(CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
}
+ /**
+ * Do the same for datanucleus jars, if they exist in spark home. Find all datanucleus-* jars,
+ * copy them to the remote fs, and add them to the class path.
+ *
+ * This is necessary because the datanucleus jars cannot be included in the assembly jar due
+ * to metadata conflicts involving plugin.xml. At the time of writing, these are the only
+ * jars that cannot be distributed with the uber jar and have to be treated differently.
+ *
+ * For more details, see SPARK-2624, and https://github.com/apache/spark/pull/3238
+ */
+ for (libsDir <- dataNucleusJarsDir(sparkConf)) {
+ val libsURI = new URI(libsDir)
+ val jarLinks = ListBuffer.empty[String]
+ if (libsURI.getScheme != LOCAL_SCHEME) {
+ val localURI = getQualifiedLocalPath(libsURI).toUri()
+ val jars = FileSystem.get(localURI, hadoopConf).listFiles(new Path(localURI.getPath), false)
+ while (jars.hasNext) {
+ val jar = jars.next()
+ val name = jar.getPath.getName
+ if (name.startsWith("datanucleus-")) {
+ // copy to remote and add to classpath
+ val src = jar.getPath
+ val destPath = copyFileToRemote(dst, src, replication)
+ distCacheMgr.addResource(fs, hadoopConf, destPath,
+ localResources, LocalResourceType.FILE, name, statCache)
+ jarLinks += name
+ }
+ }
+ } else {
+ jarLinks += libsURI.toString + Path.SEPARATOR + "*"
+ }
+
+ if (jarLinks.nonEmpty) {
+ sparkConf.set(CONF_SPARK_DATANUCLEUS_JARS, jarLinks.mkString(","))
+ }
+ }
+
localResources
}
@@ -551,6 +590,13 @@ private[spark] object ClientBase extends Logging {
// Internal config to propagate the location of the user's jar to the driver/executors
val CONF_SPARK_USER_JAR = "spark.yarn.user.jar"
+ // Location of the datanucleus jars
+ val CONF_SPARK_DATANUCLEUS_DIR = "spark.yarn.datanucleus.dir"
+
+ // Internal config to propagate the locations of datanucleus jars found to add to the
+ // classpath of the executors. Value should be a comma-separated list of paths to each jar.
+ val CONF_SPARK_DATANUCLEUS_JARS = "spark.yarn.datanucleus.jars"
+
// Internal config to propagate the locations of any extra jars to add to the classpath
// of the executors
val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"
@@ -584,6 +630,19 @@ private[spark] object ClientBase extends Logging {
}
/**
+ * Find the user-defined provided jars directory if configured, or return SPARK_HOME/lib if not.
+ *
+ * This method first looks for $CONF_SPARK_DATANUCLEUS_DIR inside the SparkConf, then looks for
+ * Spark home inside the the SparkConf and the user environment.
+ */
+ private def dataNucleusJarsDir(conf: SparkConf): Option[String] = {
+ conf.getOption(CONF_SPARK_DATANUCLEUS_DIR).orElse {
+ val sparkHome = conf.getOption("spark.home").orElse(sys.env.get("SPARK_HOME"))
+ sparkHome.map(path => path + Path.SEPARATOR + "lib")
+ }
+ }
+
+ /**
* Return the path to the given application's staging directory.
*/
private def getAppStagingDir(appId: ApplicationId): String = {
@@ -684,6 +743,13 @@ private[spark] object ClientBase extends Logging {
addUserClasspath(args, sparkConf, env)
}
+ // Add datanucleus jars to classpath
+ for (entries <- sparkConf.getOption(CONF_SPARK_DATANUCLEUS_JARS)) {
+ entries.split(",").filter(_.nonEmpty).foreach { entry =>
+ addFileToClasspath(entry, null, env)
+ }
+ }
+
// Append all jar files under the working directory to the classpath.
addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + "*", env)
}
diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
index 17b79ae1d8..b055e9b72d 100644
--- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
+++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
@@ -21,6 +21,7 @@ import java.io.File
import java.net.URI
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@ -104,6 +105,81 @@ class ClientBaseSuite extends FunSuite with Matchers {
cp should not contain (ClientBase.APP_JAR)
}
+ test("DataNucleus in classpath") {
+ val dnJars = "local:/dn/core.jar,/dn/api.jar"
+ val conf = new Configuration()
+ val sparkConf = new SparkConf()
+ .set(ClientBase.CONF_SPARK_JAR, SPARK)
+ .set(ClientBase.CONF_SPARK_DATANUCLEUS_JARS, dnJars)
+ val env = new MutableHashMap[String, String]()
+ val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
+
+ ClientBase.populateClasspath(args, conf, sparkConf, env)
+
+ val cp = env("CLASSPATH").split(File.pathSeparator)
+ s"$dnJars".split(",").foreach({ entry =>
+ val uri = new URI(entry)
+ if (ClientBase.LOCAL_SCHEME.equals(uri.getScheme())) {
+ cp should contain (uri.getPath())
+ } else {
+ cp should not contain (uri.getPath())
+ }
+ })
+ }
+
+ test("DataNucleus using local:") {
+ val dnDir = "local:/datanucleus"
+ val conf = new Configuration()
+ val sparkConf = new SparkConf()
+ .set(ClientBase.CONF_SPARK_JAR, SPARK)
+ .set(ClientBase.CONF_SPARK_DATANUCLEUS_DIR, dnDir)
+ val yarnConf = new YarnConfiguration()
+ val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
+
+ val client = spy(new DummyClient(args, conf, sparkConf, yarnConf))
+ doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
+ any(classOf[Path]), anyShort(), anyBoolean())
+
+ val tempDir = Utils.createTempDir()
+ try {
+ client.prepareLocalResources(tempDir.getAbsolutePath())
+ val jars = sparkConf.get(ClientBase.CONF_SPARK_DATANUCLEUS_JARS).split(",")
+ val uri = new URI(dnDir)
+ jars should contain (uri.toString + Path.SEPARATOR + "*")
+ } finally {
+ Utils.deleteRecursively(tempDir)
+ }
+ }
+
+ test("DataNucleus using file:") {
+ val dnDir = Utils.createTempDir()
+ val tempDir = Utils.createTempDir()
+
+ try {
+ // create mock datanucleus jar
+ val tempJar = File.createTempFile("datanucleus-", null, dnDir)
+
+ val conf = new Configuration()
+ val sparkConf = new SparkConf()
+ .set(ClientBase.CONF_SPARK_JAR, SPARK)
+ .set(ClientBase.CONF_SPARK_DATANUCLEUS_DIR, dnDir.toURI.toString)
+ val yarnConf = new YarnConfiguration()
+ val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
+
+ val client = spy(new DummyClient(args, conf, sparkConf, yarnConf))
+ doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
+ any(classOf[Path]), anyShort(), anyBoolean())
+
+ client.prepareLocalResources(tempDir.getAbsolutePath())
+
+ val jars = sparkConf.get(ClientBase.CONF_SPARK_DATANUCLEUS_JARS).split(",")
+ jars should contain (tempJar.getName)
+ } finally {
+ Utils.deleteRecursively(dnDir)
+ Utils.deleteRecursively(tempDir)
+ }
+ }
+
test("Jar path propagation through SparkConf") {
val conf = new Configuration()
val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK)