 core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala                       | 316
 core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala              |   5
 core/src/main/scala/org/apache/spark/deploy/packages/MavenCoordinate.scala          |  39
 core/src/main/scala/org/apache/spark/deploy/packages/PackageNotFoundException.scala |   3
 core/src/main/scala/org/apache/spark/deploy/packages/PackageUtil.scala              | 203
 core/src/main/scala/org/apache/spark/deploy/packages/Resolvers.scala                |  94
 core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala                      |  52
 core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala                |   2
 core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala                  |   2
 core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala             |  97
 launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java       |   2
 sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala |   6
 12 files changed, 445 insertions(+), 376 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 926e1ff7a8..2ef81cbadc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -28,20 +28,10 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.UserGroupInformation
-import org.apache.ivy.Ivy
-import org.apache.ivy.core.LogOptions
-import org.apache.ivy.core.module.descriptor._
-import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId}
-import org.apache.ivy.core.report.ResolveReport
-import org.apache.ivy.core.resolve.ResolveOptions
-import org.apache.ivy.core.retrieve.RetrieveOptions
-import org.apache.ivy.core.settings.IvySettings
-import org.apache.ivy.plugins.matcher.GlobPatternMatcher
-import org.apache.ivy.plugins.repository.file.FileRepository
-import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver}
import org.apache.spark.{SPARK_VERSION, SparkException, SparkUserAppException}
import org.apache.spark.api.r.RUtils
+import org.apache.spark.deploy.packages._
import org.apache.spark.deploy.rest._
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
@@ -281,16 +271,14 @@ object SparkSubmit {
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
- // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
- // too for packages that include Python code
- val exclusions: Seq[String] =
- if (!StringUtils.isBlank(args.packagesExclusions)) {
- args.packagesExclusions.split(",")
- } else {
- Nil
- }
- val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
- Option(args.repositories), Option(args.ivyRepoPath), exclusions = exclusions)
+ val resolvedMavenCoordinates: String = SparkSubmitUtils.resolveMavenCoordinates(
+ packages = Option(args.packages).map(_.split(",").toSeq).getOrElse(Seq.empty),
+ extraRepos = Option(args.repositories).map(_.split(",").toSeq).getOrElse(Seq.empty),
+ ivyPath = Option(args.ivyRepoPath),
+ exclusions = Option(args.packagesExclusions).map(_.split(",").toSeq).getOrElse(Seq.empty),
+ provided = args.providedPackages
+ ).mkString(",")
+
if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
if (args.isPython) {
@@ -812,277 +800,39 @@ object SparkSubmit {
/** Provides utility functions to be used inside SparkSubmit. */
private[spark] object SparkSubmitUtils {
- // Exposed for testing
- var printStream = SparkSubmit.printStream
-
- /**
- * Represents a Maven Coordinate
- * @param groupId the groupId of the coordinate
- * @param artifactId the artifactId of the coordinate
- * @param version the version of the coordinate
- */
- private[deploy] case class MavenCoordinate(groupId: String, artifactId: String, version: String) {
- override def toString: String = s"$groupId:$artifactId:$version"
- }
-
-/**
- * Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
- * in the format `groupId:artifactId:version` or `groupId/artifactId:version`.
- * @param coordinates Comma-delimited string of maven coordinates
- * @return Sequence of Maven coordinates
- */
- def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = {
- coordinates.split(",").map { p =>
- val splits = p.replace("/", ":").split(":")
- require(splits.length == 3, s"Provided Maven Coordinates must be in the form " +
- s"'groupId:artifactId:version'. The coordinate provided is: $p")
- require(splits(0) != null && splits(0).trim.nonEmpty, s"The groupId cannot be null or " +
- s"be whitespace. The groupId provided is: ${splits(0)}")
- require(splits(1) != null && splits(1).trim.nonEmpty, s"The artifactId cannot be null or " +
- s"be whitespace. The artifactId provided is: ${splits(1)}")
- require(splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " +
- s"be whitespace. The version provided is: ${splits(2)}")
- new MavenCoordinate(splits(0), splits(1), splits(2))
- }
- }
-
- /** Path of the local Maven cache. */
- private[spark] def m2Path: File = {
- if (Utils.isTesting) {
- // test builds delete the maven cache, and this can cause flakiness
- new File("dummy", ".m2" + File.separator + "repository")
- } else {
- new File(System.getProperty("user.home"), ".m2" + File.separator + "repository")
- }
- }
-
- /**
- * Extracts maven coordinates from a comma-delimited string
- * @param remoteRepos Comma-delimited string of remote repositories
- * @param ivySettings The Ivy settings for this session
- * @return A ChainResolver used by Ivy to search for and resolve dependencies.
- */
- def createRepoResolvers(remoteRepos: Option[String], ivySettings: IvySettings): ChainResolver = {
- // We need a chain resolver if we want to check multiple repositories
- val cr = new ChainResolver
- cr.setName("list")
-
- val repositoryList = remoteRepos.getOrElse("")
- // add any other remote repositories other than maven central
- if (repositoryList.trim.nonEmpty) {
- repositoryList.split(",").zipWithIndex.foreach { case (repo, i) =>
- val brr: IBiblioResolver = new IBiblioResolver
- brr.setM2compatible(true)
- brr.setUsepoms(true)
- brr.setRoot(repo)
- brr.setName(s"repo-${i + 1}")
- cr.add(brr)
- // scalastyle:off println
- printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}")
- // scalastyle:on println
- }
- }
-
- val localM2 = new IBiblioResolver
- localM2.setM2compatible(true)
- localM2.setRoot(m2Path.toURI.toString)
- localM2.setUsepoms(true)
- localM2.setName("local-m2-cache")
- cr.add(localM2)
-
- val localIvy = new FileSystemResolver
- val localIvyRoot = new File(ivySettings.getDefaultIvyUserDir, "local")
- localIvy.setLocal(true)
- localIvy.setRepository(new FileRepository(localIvyRoot))
- val ivyPattern = Seq("[organisation]", "[module]", "[revision]", "[type]s",
- "[artifact](-[classifier]).[ext]").mkString(File.separator)
- localIvy.addIvyPattern(localIvyRoot.getAbsolutePath + File.separator + ivyPattern)
- localIvy.setName("local-ivy-cache")
- cr.add(localIvy)
-
- // the biblio resolver resolves POM declared dependencies
- val br: IBiblioResolver = new IBiblioResolver
- br.setM2compatible(true)
- br.setUsepoms(true)
- br.setName("central")
- cr.add(br)
-
- val sp: IBiblioResolver = new IBiblioResolver
- sp.setM2compatible(true)
- sp.setUsepoms(true)
- sp.setRoot("http://dl.bintray.com/spark-packages/maven")
- sp.setName("spark-packages")
- cr.add(sp)
- cr
- }
-
- /**
- * Output a comma-delimited list of paths for the downloaded jars to be added to the classpath
- * (will append to jars in SparkSubmit).
- * @param artifacts Sequence of dependencies that were resolved and retrieved
- * @param cacheDirectory directory where jars are cached
- * @return a comma-delimited list of paths for the dependencies
- */
- def resolveDependencyPaths(
- artifacts: Array[AnyRef],
- cacheDirectory: File): String = {
- artifacts.map { artifactInfo =>
- val artifact = artifactInfo.asInstanceOf[Artifact].getModuleRevisionId
- cacheDirectory.getAbsolutePath + File.separator +
- s"${artifact.getOrganisation}_${artifact.getName}-${artifact.getRevision}.jar"
- }.mkString(",")
- }
+ /**
+ * Resolves packages supplied as spark-submit arguments (comma-delimited maven coordinates)
+ * and returns the paths of the resolved jars. Thin wrapper around [[PackageUtil.resolve]],
+ * kept for compatibility with the way arguments are passed to spark-submit.
+ */
+ def resolveMavenCoordinates(
+ packages: Seq[String],
+ extraRepos: Seq[String],
+ ivyPath: Option[String],
+ exclusions: Seq[String],
+ provided: Boolean = false
+ ): Seq[String] = {
- /** Adds the given maven coordinates to Ivy's module descriptor. */
- def addDependenciesToIvy(
- md: DefaultModuleDescriptor,
- artifacts: Seq[MavenCoordinate],
- ivyConfName: String): Unit = {
- artifacts.foreach { mvn =>
- val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version)
- val dd = new DefaultDependencyDescriptor(ri, false, false)
- dd.addDependencyConfiguration(ivyConfName, ivyConfName)
- // scalastyle:off println
- printStream.println(s"${dd.getDependencyId} added as a dependency")
- // scalastyle:on println
- md.addDependency(dd)
+ val mavenCoordinates = packages.map { pkg =>
+ MavenCoordinate.extract(pkg, provided)
}
- }
- /** Add exclusion rules for dependencies already included in the spark-assembly */
- def addExclusionRules(
- ivySettings: IvySettings,
- ivyConfName: String,
- md: DefaultModuleDescriptor): Unit = {
- // Add scala exclusion rule
- md.addExcludeRule(createExclusion("*:scala-library:*", ivySettings, ivyConfName))
-
- // We need to specify each component explicitly, otherwise we miss spark-streaming-kafka and
- // other spark-streaming utility components. Underscore is there to differentiate between
- // spark-streaming_2.1x and spark-streaming-kafka-assembly_2.1x
- val components = Seq("catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
- "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
-
- components.foreach { comp =>
- md.addExcludeRule(createExclusion(s"org.apache.spark:spark-$comp*:*", ivySettings,
- ivyConfName))
+ val extraResolvers = extraRepos.zipWithIndex.flatMap { case (repo, i) =>
+ Seq(Resolvers.maven(s"repo-maven-$i", repo), Resolvers.ivy(s"repo-ivy-$i", repo))
}
- }
-
- /** A nice function to use in tests as well. Values are dummy strings. */
- def getModuleDescriptor: DefaultModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
- ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-parent", "1.0"))
- /**
- * Resolves any dependencies that were supplied through maven coordinates
- * @param coordinates Comma-delimited string of maven coordinates
- * @param remoteRepos Comma-delimited string of remote repositories other than maven central
- * @param ivyPath The path to the local ivy repository
- * @param exclusions Exclusions to apply when resolving transitive dependencies
- * @return The comma-delimited path to the jars of the given maven artifacts including their
- * transitive dependencies
- */
- def resolveMavenCoordinates(
- coordinates: String,
- remoteRepos: Option[String],
- ivyPath: Option[String],
- exclusions: Seq[String] = Nil,
- isTest: Boolean = false): String = {
- if (coordinates == null || coordinates.trim.isEmpty) {
- ""
+ val resolvers = if (!provided) {
+ Resolvers.default
} else {
- val sysOut = System.out
- try {
- // To prevent ivy from logging to system out
- System.setOut(printStream)
- val artifacts = extractMavenCoordinates(coordinates)
- // Default configuration name for ivy
- val ivyConfName = "default"
- // set ivy settings for location of cache
- val ivySettings: IvySettings = new IvySettings
- // Directories for caching downloads through ivy and storing the jars when maven coordinates
- // are supplied to spark-submit
- val alternateIvyCache = ivyPath.getOrElse("")
- val packagesDirectory: File =
- if (alternateIvyCache == null || alternateIvyCache.trim.isEmpty) {
- new File(ivySettings.getDefaultIvyUserDir, "jars")
- } else {
- ivySettings.setDefaultIvyUserDir(new File(alternateIvyCache))
- ivySettings.setDefaultCache(new File(alternateIvyCache, "cache"))
- new File(alternateIvyCache, "jars")
- }
- // scalastyle:off println
- printStream.println(
- s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}")
- printStream.println(s"The jars for the packages stored in: $packagesDirectory")
- // scalastyle:on println
- // create a pattern matcher
- ivySettings.addMatcher(new GlobPatternMatcher)
- // create the dependency resolvers
- val repoResolver = createRepoResolvers(remoteRepos, ivySettings)
- ivySettings.addResolver(repoResolver)
- ivySettings.setDefaultResolver(repoResolver.getName)
-
- val ivy = Ivy.newInstance(ivySettings)
- // Set resolve options to download transitive dependencies as well
- val resolveOptions = new ResolveOptions
- resolveOptions.setTransitive(true)
- val retrieveOptions = new RetrieveOptions
- // Turn downloading and logging off for testing
- if (isTest) {
- resolveOptions.setDownload(false)
- resolveOptions.setLog(LogOptions.LOG_QUIET)
- retrieveOptions.setLog(LogOptions.LOG_QUIET)
- } else {
- resolveOptions.setDownload(true)
- }
+ Seq.empty
+ } ++ extraResolvers
- // A Module descriptor must be specified. Entries are dummy strings
- val md = getModuleDescriptor
- // clear ivy resolution from previous launches. The resolution file is usually at
- // ~/.ivy2/org.apache.spark-spark-submit-parent-default.xml. In between runs, this file
- // leads to confusion with Ivy when the files can no longer be found at the repository
- // declared in that file/
- val mdId = md.getModuleRevisionId
- val previousResolution = new File(ivySettings.getDefaultCache,
- s"${mdId.getOrganisation}-${mdId.getName}-$ivyConfName.xml")
- if (previousResolution.exists) previousResolution.delete
-
- md.setDefaultConf(ivyConfName)
-
- // Add exclusion rules for Spark and Scala Library
- addExclusionRules(ivySettings, ivyConfName, md)
- // add all supplied maven artifacts as dependencies
- addDependenciesToIvy(md, artifacts, ivyConfName)
- exclusions.foreach { e =>
- md.addExcludeRule(createExclusion(e + ":*", ivySettings, ivyConfName))
- }
- // resolve dependencies
- val rr: ResolveReport = ivy.resolve(md, resolveOptions)
- if (rr.hasError) {
- throw new RuntimeException(rr.getAllProblemMessages.toString)
- }
- // retrieve all resolved dependencies
- ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
- packagesDirectory.getAbsolutePath + File.separator +
- "[organization]_[artifact]-[revision].[ext]",
- retrieveOptions.setConfs(Array(ivyConfName)))
- resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
- } finally {
- System.setOut(sysOut)
- }
+ val excludedCoordinates = exclusions.map { ex =>
+ MavenCoordinate.extract(ex + ":*")
}
- }
- private[deploy] def createExclusion(
- coords: String,
- ivySettings: IvySettings,
- ivyConfName: String): ExcludeRule = {
- val c = extractMavenCoordinates(coords)(0)
- val id = new ArtifactId(new ModuleId(c.groupId, c.artifactId), "*", "*", "*")
- val rule = new DefaultExcludeRule(id, ivySettings.getMatcher("glob"), null)
- rule.addConfiguration(ivyConfName)
- rule
+ PackageUtil.resolve(
+ mavenCoordinates,
+ resolvers,
+ ivyPath,
+ excludedCoordinates
+ ).map(_.getAbsolutePath)
}
}
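
For reference, a minimal sketch of how the slimmed-down SparkSubmitUtils entry point above is now called; the coordinate, repository URL and exclusion below are illustrative values, not part of this patch:

    // Hypothetical values, mirroring the call site in SparkSubmit above.
    val jars: Seq[String] = SparkSubmitUtils.resolveMavenCoordinates(
      packages = Seq("com.databricks:spark-csv_2.10:1.4.0"),
      extraRepos = Seq("https://repo.example.com/maven"),
      ivyPath = None,
      exclusions = Seq("org.slf4j:slf4j-api"),
      provided = false)
    // The result can then be merged into args.jars as a comma-delimited list.
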
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index ec6d48485f..6d2710844a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -61,6 +61,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
var repositories: String = null
var ivyRepoPath: String = null
var packagesExclusions: String = null
+ var providedPackages: Boolean = false
var verbose: Boolean = false
var isPython: Boolean = false
var pyFiles: String = null
@@ -313,6 +314,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| packages $packages
| packagesExclusions $packagesExclusions
| repositories $repositories
+ | providedPackages $providedPackages
| verbose $verbose
|
|Spark properties used, including those specified through
@@ -428,6 +430,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case HELP =>
printUsageAndExit(0)
+ case PROVIDED_PACKAGES =>
+ providedPackages = true
+
case VERBOSE =>
verbose = true
diff --git a/core/src/main/scala/org/apache/spark/deploy/packages/MavenCoordinate.scala b/core/src/main/scala/org/apache/spark/deploy/packages/MavenCoordinate.scala
new file mode 100644
index 0000000000..992d357b60
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/packages/MavenCoordinate.scala
@@ -0,0 +1,39 @@
+package org.apache.spark.deploy.packages
+
+/**
+ * Represents a Maven Coordinate
+ * @param groupId the groupId of the coordinate
+ * @param artifactId the artifactId of the coordinate
+ * @param version the version of the coordinate
+ */
+private[spark] case class MavenCoordinate(groupId: String, artifactId: String, version: String) {
+ override def toString: String = s"$groupId:$artifactId:$version"
+}
+
+object MavenCoordinate {
+
+ /**
+ * Parses a maven coordinate given in the format `groupId:artifactId:version` or
+ * `groupId/artifactId:version`. If `chooseLatest` is true, the version may be omitted,
+ * in which case the latest available revision ("+") is used.
+ */
+ def extract(coordinate: String, chooseLatest: Boolean = false): MavenCoordinate = {
+ val splits = coordinate.replace("/", ":").split(":").toList
+
+ splits match {
+ case org :: name :: version :: Nil =>
+ require(org.trim.nonEmpty && name.trim.nonEmpty && version.trim.nonEmpty,
+ s"Maven coordinate components cannot be blank. The coordinate provided is: $coordinate")
+ new MavenCoordinate(org, name, version)
+
+ case org :: name :: Nil if chooseLatest =>
+ require(org.trim.nonEmpty && name.trim.nonEmpty,
+ s"Maven coordinate components cannot be blank. The coordinate provided is: $coordinate")
+ new MavenCoordinate(org, name, "+")
+
+ case _ =>
+ throw new IllegalArgumentException(
+ "Invalid maven coordinate. Must be provided in the form " +
+ s"'groupId:artifactId:version'. The coordinate provided is: $coordinate")
+ }
+ }
+
+}
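
A short sketch of the parsing behavior defined by extract above (the coordinates are illustrative):

    MavenCoordinate.extract("org.scalatest:scalatest_2.11:2.2.6")
    // => MavenCoordinate("org.scalatest", "scalatest_2.11", "2.2.6")

    MavenCoordinate.extract("org.scalatest/scalatest_2.11", chooseLatest = true)
    // => MavenCoordinate("org.scalatest", "scalatest_2.11", "+"), i.e. the latest revision

    MavenCoordinate.extract("not-a-coordinate")
    // => throws IllegalArgumentException
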
diff --git a/core/src/main/scala/org/apache/spark/deploy/packages/PackageNotFoundException.scala b/core/src/main/scala/org/apache/spark/deploy/packages/PackageNotFoundException.scala
new file mode 100644
index 0000000000..a366811f08
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/packages/PackageNotFoundException.scala
@@ -0,0 +1,3 @@
+package org.apache.spark.deploy.packages
+
+class PackageNotFoundException(message: String) extends RuntimeException(message)
diff --git a/core/src/main/scala/org/apache/spark/deploy/packages/PackageUtil.scala b/core/src/main/scala/org/apache/spark/deploy/packages/PackageUtil.scala
new file mode 100644
index 0000000000..12a73d97fd
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/packages/PackageUtil.scala
@@ -0,0 +1,203 @@
+package org.apache.spark.deploy.packages
+
+import java.io.{File, PrintStream}
+
+import scala.collection.JavaConversions._
+
+import org.apache.ivy.Ivy
+import org.apache.ivy.core.LogOptions
+import org.apache.ivy.core.module.descriptor._
+import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId}
+import org.apache.ivy.core.report.ResolveReport
+import org.apache.ivy.core.resolve.ResolveOptions
+import org.apache.ivy.core.retrieve.RetrieveOptions
+import org.apache.ivy.core.settings.IvySettings
+import org.apache.ivy.plugins.matcher.GlobPatternMatcher
+import org.apache.ivy.plugins.resolver.RepositoryResolver
+import org.apache.spark.util.Utils
+
+private[spark] object PackageUtil {
+
+ // Exposed for testing
+ private[spark] var printStream: PrintStream = System.err
+ private def usingStream[A](action: => A): A = {
+ val sysOut = System.out
+ try {
+ System.setOut(printStream)
+ action
+ } finally {
+ System.setOut(sysOut)
+ }
+ }
+
+ /**
+ * Output a list of paths for the downloaded jars to be added to the classpath.
+ * @param artifacts Sequence of dependencies that were resolved and retrieved
+ * @param cacheDirectory directory where jars are cached
+ * @return a comma-delimited list of paths for the dependencies
+ */
+ def resolveDependencyPaths(artifacts: Seq[Artifact], cacheDirectory: File): Seq[File] = {
+ artifacts.map { artifactInfo =>
+ val artifact = artifactInfo.getModuleRevisionId
+ val name = s"${artifact.getOrganisation}_${artifact.getName}-${artifact.getRevision}.jar"
+ new File(cacheDirectory, name)
+ }
+ }
+
+ /** Adds the given maven coordinates to Ivy's module descriptor. */
+ def addDependenciesToIvy(
+ md: DefaultModuleDescriptor,
+ coordinates: Seq[MavenCoordinate],
+ ivyConfName: String
+ ): Unit = for (mvn <- coordinates) {
+ val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version)
+ val dd = new DefaultDependencyDescriptor(ri, false, false)
+ dd.addDependencyConfiguration(ivyConfName, ivyConfName)
+ // scalastyle:off println
+ printStream.println(s"${dd.getDependencyId} added as a dependency")
+ // scalastyle:on println
+ md.addDependency(dd)
+ }
+
+ private[deploy] def createExclusion(
+ coords: MavenCoordinate,
+ ivySettings: IvySettings,
+ ivyConfName: String
+ ): ExcludeRule = {
+ val id = new ArtifactId(new ModuleId(coords.groupId, coords.artifactId), "*", "*", "*")
+ val rule = new DefaultExcludeRule(id, ivySettings.getMatcher("glob"), null)
+ rule.addConfiguration(ivyConfName)
+ rule
+ }
+
+ /** Add exclusion rules for dependencies already included in the spark-assembly */
+ private def addSparkExclusionRules(
+ ivySettings: IvySettings,
+ ivyConfName: String,
+ md: DefaultModuleDescriptor
+ ): Unit = {
+ // Add scala exclusion rule
+ md.addExcludeRule(createExclusion(MavenCoordinate("*", "scala-library", "*"), ivySettings, ivyConfName))
+
+ // We need to specify each component explicitly, otherwise we miss spark-streaming-kafka and
+ // other spark-streaming utility components. Underscore is there to differentiate between
+ // spark-streaming_2.1x and spark-streaming-kafka-assembly_2.1x
+ val components = Seq("catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
+ "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
+
+ components.foreach { comp =>
+ md.addExcludeRule(createExclusion(
+ MavenCoordinate("org.apache.spark", s"spark-${comp}*", "*"),
+ ivySettings,
+ ivyConfName))
+ }
+ }
+
+ /** A nice function to use in tests as well. Values are dummy strings. */
+ private[spark] def parentModule: DefaultModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
+ ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-parent", "1.0")
+ )
+
+ /**
+ * Main entry point into spark packages.
+ * Resolves any dependencies that were supplied through maven coordinates
+ * @param coordinates List of packages given as maven coordinates
+ * @param resolvers Repositories that will be considered during resolution
+ * @param ivyPath Path to the local ivy directory, used for configuring and caching (not the local .ivy2/local repository)
+ * @param exclusions Exclusions to apply when resolving transitive dependencies (do not include version)
+ * @return Jar files of the given maven artifacts including their transitive dependencies
+ */
+ private[spark] def resolve(
+ coordinates: Seq[MavenCoordinate],
+ resolvers: Seq[RepositoryResolver],
+ ivyPath: Option[String],
+ exclusions: Seq[MavenCoordinate]
+ ): Seq[File] = usingStream {
+
+ val ivyConfName = "default"
+
+ val ivySettings: IvySettings = ivyPath match {
+ case None => new IvySettings
+ case Some(path) =>
+ val settings = new IvySettings
+ settings.setDefaultIvyUserDir(new File(path))
+ settings.setDefaultCache(new File(path, "cache"))
+ settings
+ }
+
+ // Directories for caching downloads through ivy and storing the jars when maven coordinates
+ // are supplied to spark-submit
+ val packagesDirectory: File = new File(ivySettings.getDefaultIvyUserDir, "jars")
+
+ // scalastyle:off println
+ printStream.println(
+ s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}"
+ )
+ printStream.println(s"The jars for the packages stored in: $packagesDirectory")
+ // scalastyle:on println
+
+ // create a pattern matcher
+ ivySettings.addMatcher(new GlobPatternMatcher)
+ // create the dependency resolvers
+ val repoResolver = Resolvers.chained("packages-resolver", resolvers)
+ ivySettings.addResolver(repoResolver)
+ ivySettings.setDefaultResolver(repoResolver.getName)
+
+ val ivy = Ivy.newInstance(ivySettings)
+ // Set resolve options to download transitive dependencies as well
+ val resolveOptions = new ResolveOptions
+ resolveOptions.setTransitive(true)
+ val retrieveOptions = new RetrieveOptions
+
+ // Turn downloading and logging off for testing
+ if (Utils.isTesting) {
+ resolveOptions.setDownload(false)
+ resolveOptions.setLog(LogOptions.LOG_QUIET)
+ retrieveOptions.setLog(LogOptions.LOG_QUIET)
+ } else {
+ resolveOptions.setDownload(true)
+ }
+
+ // A parent module descriptor must be specified. Entries are dummy strings
+ val md = parentModule
+ // clear ivy resolution from previous launches. The resolution file is usually at
+ // ~/.ivy2/org.apache.spark-spark-submit-parent-default.xml. In between runs, this file
+ // leads to confusion with Ivy when the files can no longer be found at the repository
+ // declared in that file.
+ val mdId = md.getModuleRevisionId
+ val previousResolution = new File(
+ ivySettings.getDefaultCache,
+ s"${mdId.getOrganisation}-${mdId.getName}-$ivyConfName.xml"
+ )
+ if (previousResolution.exists) previousResolution.delete
+
+ md.setDefaultConf(ivyConfName)
+
+ // Add exclusion rules for Spark and Scala Library
+ addSparkExclusionRules(ivySettings, ivyConfName, md)
+ // add all supplied maven artifacts as dependencies
+ addDependenciesToIvy(md, coordinates, ivyConfName)
+ exclusions.foreach { e =>
+ md.addExcludeRule(createExclusion(e, ivySettings, ivyConfName))
+ }
+
+ // resolve dependencies
+ val rr: ResolveReport = ivy.resolve(md, resolveOptions)
+ if (rr.hasError) {
+ throw new PackageNotFoundException(rr.getAllProblemMessages.toString)
+ }
+ // retrieve all resolved dependencies
+ ivy.retrieve(
+ rr.getModuleDescriptor.getModuleRevisionId,
+ packagesDirectory.getAbsolutePath + File.separator +
+ "[organization]_[artifact]-[revision].[ext]",
+ retrieveOptions.setConfs(Array(ivyConfName))
+ )
+ resolveDependencyPaths(
+ rr.getArtifacts.map(_.asInstanceOf[Artifact]), // Ivy returns a raw List, so cast each element
+ packagesDirectory
+ )
+ }
+
+}
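
As a hedged usage sketch of the resolve entry point above (the coordinate, repository URL and exclusion are illustrative):

    import org.apache.spark.deploy.packages.{MavenCoordinate, PackageUtil, Resolvers}

    val jars: Seq[java.io.File] = PackageUtil.resolve(
      coordinates = Seq(MavenCoordinate("com.databricks", "spark-csv_2.10", "1.4.0")),
      resolvers = Resolvers.default :+ Resolvers.maven("example-repo", "https://repo.example.com/maven"),
      ivyPath = None,
      exclusions = Seq(MavenCoordinate("org.slf4j", "slf4j-api", "*")))
    // Returned files live under <ivy user dir>/jars as <org>_<artifact>-<revision>.jar.
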
diff --git a/core/src/main/scala/org/apache/spark/deploy/packages/Resolvers.scala b/core/src/main/scala/org/apache/spark/deploy/packages/Resolvers.scala
new file mode 100644
index 0000000000..57f9f1a83c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/packages/Resolvers.scala
@@ -0,0 +1,94 @@
+package org.apache.spark.deploy.packages
+
+import java.io.File
+
+import org.apache.ivy.plugins.repository.file.FileRepository
+import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver, RepositoryResolver, URLResolver}
+import org.apache.spark.util.Utils
+
+private[spark] object Resolvers {
+
+ private val home = sys.props.get("user.home").getOrElse(".")
+
+ /**
+ * A resolver for a Maven-layout repository.
+ * @param name name under which the resolver is registered
+ * @param root root URL (or file URI) of the repository
+ */
+ def maven(name: String, root: String): RepositoryResolver = {
+ val r = new IBiblioResolver
+ r.setM2compatible(true)
+ r.setRoot(root)
+ r.setUsepoms(true)
+ r.setName(name)
+ r
+ }
+
+ val localM2: RepositoryResolver = {
+ val root = if (Utils.isTesting) {
+ // test builds delete the maven cache, and this can cause flakiness
+ new File("dummy", ".m2" + File.separator + "repository")
+ } else {
+ new File(home, ".m2" + File.separator + "repository")
+ }
+ maven("local-maven", root.toURI.toString)
+ }
+
+ /**
+ * A resolver for an Ivy-layout repository addressed by URL.
+ */
+ def ivy(name: String, root: String): RepositoryResolver = {
+ val resolver = new URLResolver
+ resolver.setM2compatible(true)
+ // URL patterns always use '/' as the separator, regardless of platform
+ val ivyPattern = Seq("[organisation]", "[module]", "[revision]", "[type]s",
+ "[artifact](-[classifier]).[ext]").mkString("/")
+ resolver.addIvyPattern(root + "/" + ivyPattern)
+ resolver.setName(name)
+ resolver
+ }
+
+ val localIvy: RepositoryResolver = {
+ val root = new File(home, ".ivy2" + File.separator + "local")
+ val resolver = new FileSystemResolver
+ resolver.setLocal(true)
+ resolver.setRepository(new FileRepository(root))
+ val ivyPattern = Seq("[organisation]", "[module]", "[revision]", "[type]s",
+ "[artifact](-[classifier]).[ext]").mkString(File.separator)
+ resolver.addIvyPattern(root.getAbsolutePath + File.separator + ivyPattern)
+ resolver.setName("local-ivy")
+ resolver
+ }
+
+ val central: RepositoryResolver = {
+ // the biblio resolver resolves POM declared dependencies
+ val resolver = new IBiblioResolver
+ resolver.setM2compatible(true)
+ resolver.setUsepoms(true)
+ resolver.setName("central")
+ resolver
+ }
+
+ val sparkPackages: RepositoryResolver = {
+ val resolver = new IBiblioResolver
+ resolver.setM2compatible(true)
+ resolver.setUsepoms(true)
+ resolver.setRoot("http://dl.bintray.com/spark-packages/maven")
+ resolver.setName("spark-packages")
+ resolver
+ }
+
+ val default: Seq[RepositoryResolver] = Seq(
+ localIvy, localM2, sparkPackages, central
+ )
+
+ /**
+ * Chains resolvers in decreasing order of priority: a package is taken from the first
+ * resolver in the sequence that can provide it.
+ */
+ def chained(name: String, resolvers: Seq[RepositoryResolver]): ChainResolver = {
+ val cr = new ChainResolver
+ cr.setName(name)
+ for (r <- resolvers) {
+ cr.add(r)
+ }
+ cr
+ }
+
+}
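
A brief sketch of how these resolvers compose (the extra repository URL is hypothetical):

    // The default chain plus one extra remote Maven repository; earlier entries win.
    val chain = Resolvers.chained("packages-resolver",
      Resolvers.default :+ Resolvers.maven("extra-repo", "https://repo.example.com/maven"))
    // Resolution order: local-ivy, local-maven, spark-packages, central, extra-repo.
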
diff --git a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
index 9ecf49b598..7a4ec9b36b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
@@ -29,7 +29,7 @@ import org.apache.commons.io.FileUtils
import org.apache.ivy.core.settings.IvySettings
import org.apache.spark.TestUtils.{createCompiledClass, JavaSourceFromString}
-import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
+import org.apache.spark.deploy.packages.MavenCoordinate
private[deploy] object IvyTestUtils {
@@ -149,7 +149,7 @@ private[deploy] object IvyTestUtils {
private def createDescriptor(
tempPath: File,
artifact: MavenCoordinate,
- dependencies: Option[Seq[MavenCoordinate]],
+ dependencies: Seq[MavenCoordinate],
useIvyLayout: Boolean): File = {
if (useIvyLayout) {
val ivyXmlPath = pathFromCoordinate(artifact, tempPath, "ivy", true)
@@ -174,7 +174,7 @@ private[deploy] object IvyTestUtils {
private def createPom(
dir: File,
artifact: MavenCoordinate,
- dependencies: Option[Seq[MavenCoordinate]]): File = {
+ dependencies: Seq[MavenCoordinate]): File = {
var content = """
|<?xml version="1.0" encoding="UTF-8"?>
|<project xmlns="http://maven.apache.org/POM/4.0.0"
@@ -184,12 +184,13 @@ private[deploy] object IvyTestUtils {
| <modelVersion>4.0.0</modelVersion>
""".stripMargin.trim
content += pomArtifactWriter(artifact)
- content += dependencies.map { deps =>
- val inside = deps.map { dep =>
+
+ content += {
+ val deps = dependencies.map{ dep =>
"\t<dependency>" + pomArtifactWriter(dep, 3) + "\n\t</dependency>"
}.mkString("\n")
- "\n <dependencies>\n" + inside + "\n </dependencies>"
- }.getOrElse("")
+ if (deps.isEmpty) "" else "\n <dependencies>\n" + deps + "\n </dependencies>"
+ }
content += "\n</project>"
writeFile(dir, artifactName(artifact, false, ".pom"), content.trim)
}
@@ -205,7 +206,7 @@ private[deploy] object IvyTestUtils {
private def createIvyDescriptor(
dir: File,
artifact: MavenCoordinate,
- dependencies: Option[Seq[MavenCoordinate]]): File = {
+ dependencies: Seq[MavenCoordinate]): File = {
var content = s"""
|<?xml version="1.0" encoding="UTF-8"?>
|<ivy-module version="2.0" xmlns:m="http://ant.apache.org/ivy/maven">
@@ -225,10 +226,11 @@ private[deploy] object IvyTestUtils {
| conf="master"/>
| </publications>
""".stripMargin.trim
- content += dependencies.map { deps =>
- val inside = deps.map(ivyArtifactWriter).mkString("\n")
- "\n <dependencies>\n" + inside + "\n </dependencies>"
- }.getOrElse("")
+
+ content += {
+ val deps = dependencies.map(ivyArtifactWriter).mkString("\n")
+ if (deps.isEmpty) "" else "\n <dependencies>\n" + deps + "\n </dependencies>"
+ }
content += "\n</ivy-module>"
writeFile(dir, "ivy.xml", content.trim)
}
@@ -282,7 +284,7 @@ private[deploy] object IvyTestUtils {
*/
private def createLocalRepository(
artifact: MavenCoordinate,
- dependencies: Option[Seq[MavenCoordinate]] = None,
+ dependencies: Seq[MavenCoordinate] = Seq(),
tempDir: Option[File] = None,
useIvyLayout: Boolean = false,
withPython: Boolean = false,
@@ -336,12 +338,12 @@ private[deploy] object IvyTestUtils {
rootDir: Option[File],
useIvyLayout: Boolean = false,
withPython: Boolean = false,
- withR: Boolean = false): File = {
- val deps = dependencies.map(SparkSubmitUtils.extractMavenCoordinates)
+ withR: Boolean = false): File = {
+ val deps = dependencies.map(_.split(",").toSeq).getOrElse(Seq.empty)
+ .map(coordinate => MavenCoordinate.extract(coordinate))
val mainRepo = createLocalRepository(artifact, deps, rootDir, useIvyLayout, withPython, withR)
- deps.foreach { seq => seq.foreach { dep =>
- createLocalRepository(dep, None, Some(mainRepo), useIvyLayout, withPython = false)
- }}
+ deps.foreach { dep =>
+ createLocalRepository(dep, Seq.empty, Some(mainRepo), useIvyLayout, withPython = false)
+ }
mainRepo
}
@@ -363,7 +365,7 @@ private[deploy] object IvyTestUtils {
withPython: Boolean = false,
withR: Boolean = false,
ivySettings: IvySettings = new IvySettings)(f: String => Unit): Unit = {
- val deps = dependencies.map(SparkSubmitUtils.extractMavenCoordinates)
+ val deps = dependencies.map(_.split(",").toSeq).getOrElse(Seq.empty)
+ .map(coordinate => MavenCoordinate.extract(coordinate))
purgeLocalIvyCache(artifact, deps, ivySettings)
val repo = createLocalRepositoryForTests(artifact, dependencies, rootDir, useIvyLayout,
withPython, withR)
@@ -374,9 +376,8 @@ private[deploy] object IvyTestUtils {
if (repo.toString.contains(".m2") || repo.toString.contains(".ivy2")) {
val groupDir = getBaseGroupDirectory(artifact, useIvyLayout)
FileUtils.deleteDirectory(new File(repo, groupDir + File.separator + artifact.artifactId))
- deps.foreach { _.foreach { dep =>
- FileUtils.deleteDirectory(new File(repo, getBaseGroupDirectory(dep, useIvyLayout)))
- }
+ deps.foreach { dep =>
+ FileUtils.deleteDirectory(new File(repo, getBaseGroupDirectory(dep, useIvyLayout)))
}
} else {
FileUtils.deleteDirectory(repo)
@@ -388,13 +389,12 @@ private[deploy] object IvyTestUtils {
/** Deletes the test packages from the ivy cache */
private def purgeLocalIvyCache(
artifact: MavenCoordinate,
- dependencies: Option[Seq[MavenCoordinate]],
+ dependencies: Seq[MavenCoordinate],
ivySettings: IvySettings): Unit = {
// delete the artifact from the cache as well if it already exists
FileUtils.deleteDirectory(new File(ivySettings.getDefaultCache, artifact.groupId))
- dependencies.foreach { _.foreach { dep =>
- FileUtils.deleteDirectory(new File(ivySettings.getDefaultCache, dep.groupId))
- }
+ dependencies.foreach { dep =>
+ FileUtils.deleteDirectory(new File(ivySettings.getDefaultCache, dep.groupId))
}
}
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
index 13cba94578..7fa3a4c8ac 100644
--- a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
@@ -32,7 +32,7 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.SparkFunSuite
import org.apache.spark.api.r.RUtils
-import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
+import org.apache.spark.deploy.packages.MavenCoordinate
import org.apache.spark.util.ResetSystemProperties
class RPackageUtilsSuite
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 2718976992..3b4835cf02 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -30,7 +30,7 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark._
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.SparkSubmit._
-import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
+import org.apache.spark.deploy.packages.MavenCoordinate
import org.apache.spark.internal.Logging
import org.apache.spark.util.{ResetSystemProperties, Utils}
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
index 4877710c12..39a2379e1a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.deploy
import java.io.{File, OutputStream, PrintStream}
+import org.apache.spark.deploy.packages.PackageUtil
import scala.collection.mutable.ArrayBuffer
@@ -27,7 +28,7 @@ import org.apache.ivy.plugins.resolver.{AbstractResolver, FileSystemResolver, IB
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.SparkFunSuite
-import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
+import org.apache.spark.deploy.packages._
import org.apache.spark.util.Utils
class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
@@ -51,7 +52,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
override def beforeAll() {
super.beforeAll()
// We don't want to write logs during testing
- SparkSubmitUtils.printStream = new BufferPrintStream
+ PackageUtil.printStream = new BufferPrintStream
tempIvyPath = Utils.createTempDir(namePrefix = "ivy").getAbsolutePath()
}
@@ -59,47 +60,36 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
val coordinates = Seq("a:b: ", " :a:b", "a: :b", "a:b:", ":a:b", "a::b", "::", "a:b", "a")
for (coordinate <- coordinates) {
intercept[IllegalArgumentException] {
- SparkSubmitUtils.extractMavenCoordinates(coordinate)
+ MavenCoordinate.extract(coordinate)
}
}
}
test("create repo resolvers") {
- val settings = new IvySettings
- val res1 = SparkSubmitUtils.createRepoResolvers(None, settings)
+ val res1 = Resolvers.chained("test-resolver1", Resolvers.default)
// should have central and spark-packages by default
assert(res1.getResolvers.size() === 4)
- assert(res1.getResolvers.get(0).asInstanceOf[IBiblioResolver].getName === "local-m2-cache")
- assert(res1.getResolvers.get(1).asInstanceOf[FileSystemResolver].getName === "local-ivy-cache")
- assert(res1.getResolvers.get(2).asInstanceOf[IBiblioResolver].getName === "central")
- assert(res1.getResolvers.get(3).asInstanceOf[IBiblioResolver].getName === "spark-packages")
-
- val repos = "a/1,b/2,c/3"
- val resolver2 = SparkSubmitUtils.createRepoResolvers(Option(repos), settings)
- assert(resolver2.getResolvers.size() === 7)
- val expected = repos.split(",").map(r => s"$r/")
- resolver2.getResolvers.toArray.zipWithIndex.foreach { case (resolver: AbstractResolver, i) =>
- if (i < 3) {
- assert(resolver.getName === s"repo-${i + 1}")
- assert(resolver.asInstanceOf[IBiblioResolver].getRoot === expected(i))
- }
- }
+ assert(res1.getResolvers.get(0).asInstanceOf[FileSystemResolver].getName === "local-ivy")
+ assert(res1.getResolvers.get(1).asInstanceOf[IBiblioResolver].getName === "local-maven")
+ assert(res1.getResolvers.get(2).asInstanceOf[IBiblioResolver].getName === "spark-packages")
+ assert(res1.getResolvers.get(3).asInstanceOf[IBiblioResolver].getName === "central")
}
test("add dependencies works correctly") {
- val md = SparkSubmitUtils.getModuleDescriptor
- val artifacts = SparkSubmitUtils.extractMavenCoordinates("com.databricks:spark-csv_2.10:0.1," +
- "com.databricks:spark-avro_2.10:0.1")
+ val md = PackageUtil.parentModule
+ val artifacts = Seq(
+ MavenCoordinate.extract("com.databricks:spark-csv_2.10:0.1"),
+ MavenCoordinate.extract("com.databricks:spark-avro_2.10:0.1"))
- SparkSubmitUtils.addDependenciesToIvy(md, artifacts, "default")
+ PackageUtil.addDependenciesToIvy(md, artifacts, "default")
assert(md.getDependencies.length === 2)
}
test("excludes works correctly") {
- val md = SparkSubmitUtils.getModuleDescriptor
- val excludes = Seq("a:b", "c:d")
+ val md = PackageUtil.parentModule
+ val excludes = Seq("a:b:*", "c:d:*").map(c => MavenCoordinate.extract(c))
excludes.foreach { e =>
- md.addExcludeRule(SparkSubmitUtils.createExclusion(e + ":*", new IvySettings, "default"))
+ md.addExcludeRule(PackageUtil.createExclusion(e, new IvySettings, "default"))
}
val rules = md.getAllExcludeRules
assert(rules.length === 2)
@@ -109,15 +99,12 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
val rule2 = rules(1).getId.getModuleId
assert(rule2.getOrganisation === "c")
assert(rule2.getName === "d")
- intercept[IllegalArgumentException] {
- SparkSubmitUtils.createExclusion("e:f:g:h", new IvySettings, "default")
- }
}
test("ivy path works correctly") {
- val md = SparkSubmitUtils.getModuleDescriptor
+ val md = PackageUtil.parentModule
val artifacts = for (i <- 0 until 3) yield new MDArtifact(md, s"jar-$i", "jar", "jar")
- var jPaths = SparkSubmitUtils.resolveDependencyPaths(artifacts.toArray, new File(tempIvyPath))
+ var jPaths = PackageUtil.resolveDependencyPaths(artifacts, new File(tempIvyPath)).map(_.getAbsolutePath).mkString(",")
for (i <- 0 until 3) {
val index = jPaths.indexOf(tempIvyPath)
assert(index >= 0)
@@ -126,8 +113,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
val main = MavenCoordinate("my.awesome.lib", "mylib", "0.1")
IvyTestUtils.withRepository(main, None, None) { repo =>
// end to end
- val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, Option(repo),
- Option(tempIvyPath), isTest = true)
+ val jarPath = PackageUtil.resolveMavenCoordinates(Seq(main.toString), Seq(repo), Some(tempIvyPath), useDefaultRepos = false).mkString(",")
assert(jarPath.indexOf(tempIvyPath) >= 0, "should use non-default ivy path")
}
}
@@ -135,38 +121,26 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
test("search for artifact at local repositories") {
val main = new MavenCoordinate("my.great.lib", "mylib", "0.1")
val dep = "my.great.dep:mydep:0.5"
+ val dummyM2 = new File("dummy", ".m2" + File.separator + "repository")
// Local M2 repository
- IvyTestUtils.withRepository(main, Some(dep), Some(SparkSubmitUtils.m2Path)) { repo =>
- val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None,
- isTest = true)
+ IvyTestUtils.withRepository(main, Some(dep), Some(dummyM2)) { repo =>
+ val jarPath = PackageUtil.resolveMavenCoordinates(Seq(main.toString), Seq(repo), useDefaultRepos = false).mkString(",")
assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
}
// Local Ivy Repository
val settings = new IvySettings
- val ivyLocal = new File(settings.getDefaultIvyUserDir, "local" + File.separator)
+ val ivyLocal = new File(settings.getDefaultIvyUserDir, "local")
IvyTestUtils.withRepository(main, Some(dep), Some(ivyLocal), useIvyLayout = true) { repo =>
- val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None,
- isTest = true)
- assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
- assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
- }
- // Local ivy repository with modified home
- val dummyIvyLocal = new File(tempIvyPath, "local" + File.separator)
- settings.setDefaultIvyUserDir(new File(tempIvyPath))
- IvyTestUtils.withRepository(main, Some(dep), Some(dummyIvyLocal), useIvyLayout = true,
- ivySettings = settings) { repo =>
- val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None,
- Some(tempIvyPath), isTest = true)
+ val jarPath = PackageUtil.resolveMavenCoordinates(Seq(main.toString)).mkString(",")
assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
- assert(jarPath.indexOf(tempIvyPath) >= 0, "should be in new ivy path")
assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
}
}
test("dependency not found throws RuntimeException") {
- intercept[RuntimeException] {
- SparkSubmitUtils.resolveMavenCoordinates("a:b:c", None, None, isTest = true)
+ intercept[PackageNotFoundException] {
+ PackageUtil.resolveMavenCoordinates(Seq("a:b:c"))
}
}
@@ -174,16 +148,15 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
val components = Seq("catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
"sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
- val coordinates =
- components.map(comp => s"org.apache.spark:spark-${comp}2.10:1.2.0").mkString(",") +
- ",org.apache.spark:spark-core_fake:1.2.0"
+ val coordinates = components.map(comp => s"org.apache.spark:spark-${comp}2.10:1.2.0") ++
+ Seq("org.apache.spark:spark-core_fake:1.2.0")
- val path = SparkSubmitUtils.resolveMavenCoordinates(coordinates, None, None, isTest = true)
- assert(path === "", "should return empty path")
+ val path = PackageUtil.resolveMavenCoordinates(coordinates)
+ assert(path.isEmpty, "should return empty path")
+
val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.10", "1.2.0")
IvyTestUtils.withRepository(main, None, None) { repo =>
- val files = SparkSubmitUtils.resolveMavenCoordinates(coordinates + "," + main.toString,
- Some(repo), None, isTest = true)
+ val files = PackageUtil.resolveMavenCoordinates(coordinates ++ Seq(main.toString), Seq(repo)).mkString(",")
assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact")
}
}
@@ -192,8 +165,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
val main = new MavenCoordinate("my.great.lib", "mylib", "0.1")
val dep = "my.great.dep:mydep:0.5"
IvyTestUtils.withRepository(main, Some(dep), None) { repo =>
- val files = SparkSubmitUtils.resolveMavenCoordinates(main.toString,
- Some(repo), None, Seq("my.great.dep:mydep"), isTest = true)
+ val files = PackageUtil.resolveMavenCoordinates(
+ Seq(main.toString), Seq(repo), None, Seq("my.great.dep:mydep"), false).mkString(",")
assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact")
assert(files.indexOf("my.great.dep") < 0, "Returned excluded artifact")
}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
index 6767cc5079..2d1150e6a6 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
@@ -51,6 +51,7 @@ class SparkSubmitOptionParser {
protected final String MASTER = "--master";
protected final String NAME = "--name";
protected final String PACKAGES = "--packages";
+ protected final String PROVIDED_PACKAGES = "--provided";
protected final String PACKAGES_EXCLUDE = "--exclude-packages";
protected final String PROPERTIES_FILE = "--properties-file";
protected final String PROXY_USER = "--proxy-user";
@@ -125,6 +126,7 @@ class SparkSubmitOptionParser {
{ SUPERVISE },
{ USAGE_ERROR },
{ VERBOSE, "-v" },
+ { PROVIDED_PACKAGES },
{ VERSION },
};
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index f45264af34..4c6116d2d0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -110,10 +110,10 @@ private[hive] object IsolatedClientLoader extends Logging {
val classpath = quietly {
SparkSubmitUtils.resolveMavenCoordinates(
- hiveArtifacts.mkString(","),
- Some("http://www.datanucleus.org/downloads/maven2"),
+ hiveArtifacts,
+ Seq("http://www.datanucleus.org/downloads/maven2"),
ivyPath,
- exclusions = version.exclusions)
+ exclusions = version.exclusions).mkString(",")
}
val allFiles = classpath.split(",").map(new File(_)).toSet