author     Bryan Cutler <cutlerb@gmail.com>      2017-01-11 11:57:38 -0800
committer  Marcelo Vanzin <vanzin@cloudera.com>  2017-01-11 11:57:38 -0800
commit     3bc2eff8880a3ba8d4318118715ea1a47048e3de
tree       ed2191ec24974ab6866fd05fa5e62dcd7a94dfdd
parent     d749c06677c2fd383733337f1c00f542da122b8d
[SPARK-17568][CORE][DEPLOY] Add spark-submit option to override ivy settings used to resolve packages/artifacts
## What changes were proposed in this pull request?

This adds an option to spark-submit that overrides the default IvySettings used to resolve artifacts as part of the Spark Packages functionality. All artifact resolution can then go through a centrally managed repository, such as Nexus or Artifactory, where site admins can better approve and control what is used with Spark apps.

The change restructures the creation of the IvySettings object in two distinct ways. First, if the `spark.jars.ivySettings` option is not defined, `buildIvySettings` creates a default settings instance, as before, with the built-in repositories (Maven Central and Spark Packages) included. Second, if the option is defined, the Ivy settings file is loaded from the given path and only the repositories defined within it are used for artifact resolution.

## How was this patch tested?

Existing tests cover the default behaviour. Manual tests loaded an ivysettings.xml file with local and Nexus repositories defined. A new test loads a simple Ivy settings file with a local filesystem resolver.

Author: Bryan Cutler <cutlerb@gmail.com>
Author: Ian Hummel <ian@themodernlife.net>

Closes #15119 from BryanCutler/spark-custom-IvySettings.
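For illustration, a cluster whose admins manage artifact approval could pin resolution to a controlled settings file roughly like this (the coordinate, path, and class name below are invented):

```sh
spark-submit \
  --packages org.example:somelib:1.0 \
  --conf spark.jars.ivySettings=/etc/spark/ivysettings.xml \
  --class org.example.Main app.jar
```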
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala           | 149
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala | 100
2 files changed, 183 insertions(+), 66 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 85f80b6971..a980144a75 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -17,10 +17,11 @@
package org.apache.spark.deploy
-import java.io.{File, PrintStream}
+import java.io.{File, IOException, PrintStream}
import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
import java.net.URL
import java.security.PrivilegedExceptionAction
+import java.text.ParseException
import scala.annotation.tailrec
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
@@ -283,8 +284,17 @@ object SparkSubmit extends CommandLineUtils {
} else {
Nil
}
+
+ // Create the IvySettings, either loading them from file or building the defaults
+ val ivySettings = args.sparkProperties.get("spark.jars.ivySettings").map { ivySettingsFile =>
+ SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(args.repositories),
+ Option(args.ivyRepoPath))
+ }.getOrElse {
+ SparkSubmitUtils.buildIvySettings(Option(args.repositories), Option(args.ivyRepoPath))
+ }
+
val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
- Option(args.repositories), Option(args.ivyRepoPath), exclusions = exclusions)
+ ivySettings, exclusions = exclusions)
if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
if (args.isPython) {
@@ -860,30 +870,13 @@ private[spark] object SparkSubmitUtils {
/**
- * Extracts maven coordinates from a comma-delimited string
+ * Creates the default resolvers that Ivy uses to search for and resolve dependencies
- * @param remoteRepos Comma-delimited string of remote repositories
- * @param ivySettings The Ivy settings for this session
+ * @param defaultIvyUserDir The default user path for Ivy
* @return A ChainResolver used by Ivy to search for and resolve dependencies.
*/
- def createRepoResolvers(remoteRepos: Option[String], ivySettings: IvySettings): ChainResolver = {
+ def createRepoResolvers(defaultIvyUserDir: File): ChainResolver = {
// We need a chain resolver if we want to check multiple repositories
val cr = new ChainResolver
- cr.setName("list")
-
- val repositoryList = remoteRepos.getOrElse("")
- // add any other remote repositories other than maven central
- if (repositoryList.trim.nonEmpty) {
- repositoryList.split(",").zipWithIndex.foreach { case (repo, i) =>
- val brr: IBiblioResolver = new IBiblioResolver
- brr.setM2compatible(true)
- brr.setUsepoms(true)
- brr.setRoot(repo)
- brr.setName(s"repo-${i + 1}")
- cr.add(brr)
- // scalastyle:off println
- printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}")
- // scalastyle:on println
- }
- }
+ cr.setName("spark-list")
val localM2 = new IBiblioResolver
localM2.setM2compatible(true)
@@ -893,7 +886,7 @@ private[spark] object SparkSubmitUtils {
cr.add(localM2)
val localIvy = new FileSystemResolver
- val localIvyRoot = new File(ivySettings.getDefaultIvyUserDir, "local")
+ val localIvyRoot = new File(defaultIvyUserDir, "local")
localIvy.setLocal(true)
localIvy.setRepository(new FileRepository(localIvyRoot))
val ivyPattern = Seq(localIvyRoot.getAbsolutePath, "[organisation]", "[module]", "[revision]",
@@ -974,6 +967,87 @@ private[spark] object SparkSubmitUtils {
}
}
+ /**
+ * Build an IvySettings instance with the default resolvers, plus any supplied repositories and Ivy path
+ * @param remoteRepos Comma-delimited string of remote repositories other than maven central
+ * @param ivyPath The path to the local ivy repository
+ * @return An IvySettings object
+ */
+ def buildIvySettings(remoteRepos: Option[String], ivyPath: Option[String]): IvySettings = {
+ val ivySettings: IvySettings = new IvySettings
+ processIvyPathArg(ivySettings, ivyPath)
+
+ // create a pattern matcher
+ ivySettings.addMatcher(new GlobPatternMatcher)
+ // create the dependency resolvers
+ val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir)
+ ivySettings.addResolver(repoResolver)
+ ivySettings.setDefaultResolver(repoResolver.getName)
+ processRemoteRepoArg(ivySettings, remoteRepos)
+ ivySettings
+ }
+
+ /**
+ * Load Ivy settings from the given file, then apply any supplied repository and Ivy path options
+ * @param settingsFile Path to Ivy settings file
+ * @param remoteRepos Comma-delimited string of remote repositories other than maven central
+ * @param ivyPath The path to the local ivy repository
+ * @return An IvySettings object
+ */
+ def loadIvySettings(
+ settingsFile: String,
+ remoteRepos: Option[String],
+ ivyPath: Option[String]): IvySettings = {
+ val file = new File(settingsFile)
+ require(file.exists(), s"Ivy settings file $file does not exist")
+ require(file.isFile(), s"Ivy settings file $file is not a normal file")
+ val ivySettings: IvySettings = new IvySettings
+ try {
+ ivySettings.load(file)
+ } catch {
+ case e @ (_: IOException | _: ParseException) =>
+ throw new SparkException(s"Failed when loading Ivy settings from $settingsFile", e)
+ }
+ processIvyPathArg(ivySettings, ivyPath)
+ processRemoteRepoArg(ivySettings, remoteRepos)
+ ivySettings
+ }
+
+ /* Set ivy settings for location of cache, if option is supplied */
+ private def processIvyPathArg(ivySettings: IvySettings, ivyPath: Option[String]): Unit = {
+ ivyPath.filterNot(_.trim.isEmpty).foreach { alternateIvyDir =>
+ ivySettings.setDefaultIvyUserDir(new File(alternateIvyDir))
+ ivySettings.setDefaultCache(new File(alternateIvyDir, "cache"))
+ }
+ }
+
+ /* Add any optional additional remote repositories */
+ private def processRemoteRepoArg(ivySettings: IvySettings, remoteRepos: Option[String]): Unit = {
+ remoteRepos.filterNot(_.trim.isEmpty).map(_.split(",")).foreach { repositoryList =>
+ val cr = new ChainResolver
+ cr.setName("user-list")
+
+ // add current default resolver, if any
+ Option(ivySettings.getDefaultResolver).foreach(cr.add)
+
+ // add additional repositories, last resolution in chain takes precedence
+ repositoryList.zipWithIndex.foreach { case (repo, i) =>
+ val brr: IBiblioResolver = new IBiblioResolver
+ brr.setM2compatible(true)
+ brr.setUsepoms(true)
+ brr.setRoot(repo)
+ brr.setName(s"repo-${i + 1}")
+ cr.add(brr)
+ // scalastyle:off println
+ printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}")
+ // scalastyle:on println
+ }
+
+ ivySettings.addResolver(cr)
+ ivySettings.setDefaultResolver(cr.getName)
+ }
+ }
+
/** A nice function to use in tests as well. Values are dummy strings. */
def getModuleDescriptor: DefaultModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-parent", "1.0"))
@@ -981,16 +1055,14 @@ private[spark] object SparkSubmitUtils {
/**
* Resolves any dependencies that were supplied through maven coordinates
* @param coordinates Comma-delimited string of maven coordinates
- * @param remoteRepos Comma-delimited string of remote repositories other than maven central
- * @param ivyPath The path to the local ivy repository
+ * @param ivySettings An IvySettings containing resolvers to use
* @param exclusions Exclusions to apply when resolving transitive dependencies
* @return The comma-delimited path to the jars of the given maven artifacts including their
* transitive dependencies
*/
def resolveMavenCoordinates(
coordinates: String,
- remoteRepos: Option[String],
- ivyPath: Option[String],
+ ivySettings: IvySettings,
exclusions: Seq[String] = Nil,
isTest: Boolean = false): String = {
if (coordinates == null || coordinates.trim.isEmpty) {
@@ -1001,32 +1073,14 @@ private[spark] object SparkSubmitUtils {
// To prevent ivy from logging to system out
System.setOut(printStream)
val artifacts = extractMavenCoordinates(coordinates)
- // Default configuration name for ivy
- val ivyConfName = "default"
- // set ivy settings for location of cache
- val ivySettings: IvySettings = new IvySettings
// Directories for caching downloads through ivy and storing the jars when maven coordinates
// are supplied to spark-submit
- val alternateIvyCache = ivyPath.getOrElse("")
- val packagesDirectory: File =
- if (alternateIvyCache == null || alternateIvyCache.trim.isEmpty) {
- new File(ivySettings.getDefaultIvyUserDir, "jars")
- } else {
- ivySettings.setDefaultIvyUserDir(new File(alternateIvyCache))
- ivySettings.setDefaultCache(new File(alternateIvyCache, "cache"))
- new File(alternateIvyCache, "jars")
- }
+ val packagesDirectory: File = new File(ivySettings.getDefaultIvyUserDir, "jars")
// scalastyle:off println
printStream.println(
s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}")
printStream.println(s"The jars for the packages stored in: $packagesDirectory")
// scalastyle:on println
- // create a pattern matcher
- ivySettings.addMatcher(new GlobPatternMatcher)
- // create the dependency resolvers
- val repoResolver = createRepoResolvers(remoteRepos, ivySettings)
- ivySettings.addResolver(repoResolver)
- ivySettings.setDefaultResolver(repoResolver.getName)
val ivy = Ivy.newInstance(ivySettings)
// Set resolve options to download transitive dependencies as well
@@ -1042,6 +1096,9 @@ private[spark] object SparkSubmitUtils {
resolveOptions.setDownload(true)
}
+ // Default configuration name for ivy
+ val ivyConfName = "default"
+
// A Module descriptor must be specified. Entries are dummy strings
val md = getModuleDescriptor
// clear ivy resolution from previous launches. The resolution file is usually at
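
Taken together, the restructured `SparkSubmitUtils` API separates building the settings from resolving coordinates. A minimal sketch of how the two entry points compose (callable only from Spark-internal code, since the object is `private[spark]`; the paths and coordinates below are invented for illustration):

```scala
import org.apache.spark.deploy.SparkSubmitUtils

// Default route: built-in resolvers (local M2, local Ivy, Maven Central,
// Spark Packages), optionally extended with extra comma-delimited repositories.
val defaultSettings = SparkSubmitUtils.buildIvySettings(
  remoteRepos = Some("https://repo.example.com/maven"), // hypothetical extra repo
  ivyPath = None)

// Admin-controlled route: only the resolvers declared in the file are consulted.
val customSettings = SparkSubmitUtils.loadIvySettings(
  "/etc/spark/ivysettings.xml", // hypothetical path; must exist and be a file
  remoteRepos = None,
  ivyPath = None)

// Either settings object feeds the same resolution entry point, which returns
// a comma-delimited list of jar paths for the artifacts and their dependencies.
val jars: String = SparkSubmitUtils.resolveMavenCoordinates(
  "org.example:somelib:1.0", // hypothetical coordinate
  customSettings)
```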
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
index 4877710c12..266c9d33b5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -18,12 +18,14 @@
package org.apache.spark.deploy
import java.io.{File, OutputStream, PrintStream}
+import java.nio.charset.StandardCharsets
import scala.collection.mutable.ArrayBuffer
+import com.google.common.io.Files
import org.apache.ivy.core.module.descriptor.MDArtifact
import org.apache.ivy.core.settings.IvySettings
-import org.apache.ivy.plugins.resolver.{AbstractResolver, FileSystemResolver, IBiblioResolver}
+import org.apache.ivy.plugins.resolver.{AbstractResolver, ChainResolver, FileSystemResolver, IBiblioResolver}
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.SparkFunSuite
@@ -66,22 +68,25 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
test("create repo resolvers") {
val settings = new IvySettings
- val res1 = SparkSubmitUtils.createRepoResolvers(None, settings)
+ val res1 = SparkSubmitUtils.createRepoResolvers(settings.getDefaultIvyUserDir)
// should have local-m2-cache, local-ivy-cache, central and spark-packages by default
assert(res1.getResolvers.size() === 4)
assert(res1.getResolvers.get(0).asInstanceOf[IBiblioResolver].getName === "local-m2-cache")
assert(res1.getResolvers.get(1).asInstanceOf[FileSystemResolver].getName === "local-ivy-cache")
assert(res1.getResolvers.get(2).asInstanceOf[IBiblioResolver].getName === "central")
assert(res1.getResolvers.get(3).asInstanceOf[IBiblioResolver].getName === "spark-packages")
+ }
+ test("create additional resolvers") {
val repos = "a/1,b/2,c/3"
- val resolver2 = SparkSubmitUtils.createRepoResolvers(Option(repos), settings)
- assert(resolver2.getResolvers.size() === 7)
+ val settings = SparkSubmitUtils.buildIvySettings(Option(repos), None)
+ val resolver = settings.getDefaultResolver.asInstanceOf[ChainResolver]
+ assert(resolver.getResolvers.size() === 4)
val expected = repos.split(",").map(r => s"$r/")
- resolver2.getResolvers.toArray.zipWithIndex.foreach { case (resolver: AbstractResolver, i) =>
- if (i < 3) {
- assert(resolver.getName === s"repo-${i + 1}")
- assert(resolver.asInstanceOf[IBiblioResolver].getRoot === expected(i))
+ resolver.getResolvers.toArray.zipWithIndex.foreach { case (resolver: AbstractResolver, i) =>
+ // index 0 is the pre-existing default resolver; indices 1 to 3 are the added repos
+ if (i > 0) {
+ assert(resolver.getName === s"repo-$i")
+ assert(resolver.asInstanceOf[IBiblioResolver].getRoot === expected(i - 1))
}
}
}
@@ -126,8 +131,10 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
val main = MavenCoordinate("my.awesome.lib", "mylib", "0.1")
IvyTestUtils.withRepository(main, None, None) { repo =>
// end to end
- val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, Option(repo),
- Option(tempIvyPath), isTest = true)
+ val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
+ main.toString,
+ SparkSubmitUtils.buildIvySettings(Option(repo), Option(tempIvyPath)),
+ isTest = true)
assert(jarPath.indexOf(tempIvyPath) >= 0, "should use non-default ivy path")
}
}
@@ -137,7 +144,9 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
val dep = "my.great.dep:mydep:0.5"
// Local M2 repository
IvyTestUtils.withRepository(main, Some(dep), Some(SparkSubmitUtils.m2Path)) { repo =>
- val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None,
+ val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
+ main.toString,
+ SparkSubmitUtils.buildIvySettings(None, None),
isTest = true)
assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
@@ -146,7 +155,9 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
val settings = new IvySettings
val ivyLocal = new File(settings.getDefaultIvyUserDir, "local" + File.separator)
IvyTestUtils.withRepository(main, Some(dep), Some(ivyLocal), useIvyLayout = true) { repo =>
- val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None,
+ val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
+ main.toString,
+ SparkSubmitUtils.buildIvySettings(None, None),
isTest = true)
assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
@@ -156,8 +167,10 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
settings.setDefaultIvyUserDir(new File(tempIvyPath))
IvyTestUtils.withRepository(main, Some(dep), Some(dummyIvyLocal), useIvyLayout = true,
ivySettings = settings) { repo =>
- val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None,
- Some(tempIvyPath), isTest = true)
+ val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
+ main.toString,
+ SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)),
+ isTest = true)
assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
assert(jarPath.indexOf(tempIvyPath) >= 0, "should be in new ivy path")
assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
@@ -166,7 +179,10 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
test("dependency not found throws RuntimeException") {
intercept[RuntimeException] {
- SparkSubmitUtils.resolveMavenCoordinates("a:b:c", None, None, isTest = true)
+ SparkSubmitUtils.resolveMavenCoordinates(
+ "a:b:c",
+ SparkSubmitUtils.buildIvySettings(None, None),
+ isTest = true)
}
}
@@ -178,12 +194,17 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
components.map(comp => s"org.apache.spark:spark-${comp}2.10:1.2.0").mkString(",") +
",org.apache.spark:spark-core_fake:1.2.0"
- val path = SparkSubmitUtils.resolveMavenCoordinates(coordinates, None, None, isTest = true)
+ val path = SparkSubmitUtils.resolveMavenCoordinates(
+ coordinates,
+ SparkSubmitUtils.buildIvySettings(None, None),
+ isTest = true)
assert(path === "", "should return empty path")
val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.10", "1.2.0")
IvyTestUtils.withRepository(main, None, None) { repo =>
- val files = SparkSubmitUtils.resolveMavenCoordinates(coordinates + "," + main.toString,
- Some(repo), None, isTest = true)
+ val files = SparkSubmitUtils.resolveMavenCoordinates(
+ coordinates + "," + main.toString,
+ SparkSubmitUtils.buildIvySettings(Some(repo), None),
+ isTest = true)
assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact")
}
}
@@ -192,10 +213,49 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
val main = new MavenCoordinate("my.great.lib", "mylib", "0.1")
val dep = "my.great.dep:mydep:0.5"
IvyTestUtils.withRepository(main, Some(dep), None) { repo =>
- val files = SparkSubmitUtils.resolveMavenCoordinates(main.toString,
- Some(repo), None, Seq("my.great.dep:mydep"), isTest = true)
+ val files = SparkSubmitUtils.resolveMavenCoordinates(
+ main.toString,
+ SparkSubmitUtils.buildIvySettings(Some(repo), None),
+ Seq("my.great.dep:mydep"),
+ isTest = true)
assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact")
assert(files.indexOf("my.great.dep") < 0, "Returned excluded artifact")
}
}
+
+ test("load ivy settings file") {
+ val main = new MavenCoordinate("my.great.lib", "mylib", "0.1")
+ val dep = "my.great.dep:mydep:0.5"
+ val dummyIvyLocal = new File(tempIvyPath, "local" + File.separator)
+ val settingsText =
+ s"""
+ |<ivysettings>
+ | <caches defaultCacheDir="$tempIvyPath/cache"/>
+ | <settings defaultResolver="local-ivy-settings-file-test"/>
+ | <resolvers>
+ | <filesystem name="local-ivy-settings-file-test">
+ | <ivy pattern=
+ | "$dummyIvyLocal/[organisation]/[module]/[revision]/[type]s/[artifact].[ext]"/>
+ | <artifact pattern=
+ | "$dummyIvyLocal/[organisation]/[module]/[revision]/[type]s/[artifact].[ext]"/>
+ | </filesystem>
+ | </resolvers>
+ |</ivysettings>
+ |""".stripMargin
+
+ val settingsFile = new File(tempIvyPath, "ivysettings.xml")
+ Files.write(settingsText, settingsFile, StandardCharsets.UTF_8)
+ val settings = SparkSubmitUtils.loadIvySettings(settingsFile.toString, None, None)
+ settings.setDefaultIvyUserDir(new File(tempIvyPath)) // NOTE - can't set this through file
+
+ val testUtilSettings = new IvySettings
+ testUtilSettings.setDefaultIvyUserDir(new File(tempIvyPath))
+ IvyTestUtils.withRepository(main, Some(dep), Some(dummyIvyLocal), useIvyLayout = true,
+ ivySettings = testUtilSettings) { repo =>
+ val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, settings, isTest = true)
+ assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
+ assert(jarPath.indexOf(tempIvyPath) >= 0, "should be in new ivy path")
+ assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
+ }
+ }
}
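
As a final illustration of the motivating use case, a settings file that routes all resolution through a repository manager such as Nexus might look like the sketch below (the host and repository path are invented). Because `loadIvySettings` only layers the repository and Ivy-path options on top of the file, none of the built-in resolvers are consulted unless the file declares them:

```xml
<ivysettings>
  <settings defaultResolver="nexus"/>
  <resolvers>
    <!-- hypothetical internal repository manager; with spark.jars.ivySettings
         pointing at this file, only the resolvers declared here are used -->
    <ibiblio name="nexus" m2compatible="true"
             root="https://nexus.example.com/repository/maven-public/"/>
  </resolvers>
</ivysettings>
```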