12 files changed, 445 insertions, 376 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 926e1ff7a8..2ef81cbadc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -28,20 +28,10 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.ivy.Ivy
-import org.apache.ivy.core.LogOptions
-import org.apache.ivy.core.module.descriptor._
-import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId}
-import org.apache.ivy.core.report.ResolveReport
-import org.apache.ivy.core.resolve.ResolveOptions
-import org.apache.ivy.core.retrieve.RetrieveOptions
-import org.apache.ivy.core.settings.IvySettings
-import org.apache.ivy.plugins.matcher.GlobPatternMatcher
-import org.apache.ivy.plugins.repository.file.FileRepository
-import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver}
 
 import org.apache.spark.{SPARK_VERSION, SparkException, SparkUserAppException}
 import org.apache.spark.api.r.RUtils
+import org.apache.spark.deploy.packages._
 import org.apache.spark.deploy.rest._
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
 
@@ -281,16 +271,14 @@ object SparkSubmit {
     val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
     val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
 
-    // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
-    // too for packages that include Python code
-    val exclusions: Seq[String] =
-      if (!StringUtils.isBlank(args.packagesExclusions)) {
-        args.packagesExclusions.split(",")
-      } else {
-        Nil
-      }
-    val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
-      Option(args.repositories), Option(args.ivyRepoPath), exclusions = exclusions)
+    val resolvedMavenCoordinates: String = SparkSubmitUtils.resolveMavenCoordinates(
+      packages = Option(args.packages).map(_.split(",").toSeq).getOrElse(Seq.empty),
+      extraRepos = Option(args.repositories).map(_.split(",").toSeq).getOrElse(Seq.empty),
+      ivyPath = Option(args.ivyRepoPath),
+      exclusions = Option(args.packagesExclusions).map(_.split(",").toSeq).getOrElse(Seq.empty),
+      provided = args.providedPackages
+    ).mkString(",")
+
     if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
       args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
       if (args.isPython) {
@@ -812,277 +800,39 @@ object SparkSubmit {
 
 /** Provides utility functions to be used inside SparkSubmit. */
 private[spark] object SparkSubmitUtils {
-
-  // Exposed for testing
-  var printStream = SparkSubmit.printStream
-
-  /**
-   * Represents a Maven Coordinate
-   * @param groupId the groupId of the coordinate
-   * @param artifactId the artifactId of the coordinate
-   * @param version the version of the coordinate
-   */
-  private[deploy] case class MavenCoordinate(groupId: String, artifactId: String, version: String) {
-    override def toString: String = s"$groupId:$artifactId:$version"
-  }
-
-  /**
-   * Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
-   * in the format `groupId:artifactId:version` or `groupId/artifactId:version`.
-   * @param coordinates Comma-delimited string of maven coordinates
-   * @return Sequence of Maven coordinates
-   */
-  def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = {
-    coordinates.split(",").map { p =>
-      val splits = p.replace("/", ":").split(":")
-      require(splits.length == 3, s"Provided Maven Coordinates must be in the form " +
-        s"'groupId:artifactId:version'. The coordinate provided is: $p")
-      require(splits(0) != null && splits(0).trim.nonEmpty, s"The groupId cannot be null or " +
-        s"be whitespace. The groupId provided is: ${splits(0)}")
-      require(splits(1) != null && splits(1).trim.nonEmpty, s"The artifactId cannot be null or " +
-        s"be whitespace. The artifactId provided is: ${splits(1)}")
-      require(splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " +
-        s"be whitespace. The version provided is: ${splits(2)}")
-      new MavenCoordinate(splits(0), splits(1), splits(2))
-    }
-  }
-
-  /** Path of the local Maven cache. */
-  private[spark] def m2Path: File = {
-    if (Utils.isTesting) {
-      // test builds delete the maven cache, and this can cause flakiness
-      new File("dummy", ".m2" + File.separator + "repository")
-    } else {
-      new File(System.getProperty("user.home"), ".m2" + File.separator + "repository")
-    }
-  }
-
-  /**
-   * Extracts maven coordinates from a comma-delimited string
-   * @param remoteRepos Comma-delimited string of remote repositories
-   * @param ivySettings The Ivy settings for this session
-   * @return A ChainResolver used by Ivy to search for and resolve dependencies.
-   */
-  def createRepoResolvers(remoteRepos: Option[String], ivySettings: IvySettings): ChainResolver = {
-    // We need a chain resolver if we want to check multiple repositories
-    val cr = new ChainResolver
-    cr.setName("list")
-
-    val repositoryList = remoteRepos.getOrElse("")
-    // add any other remote repositories other than maven central
-    if (repositoryList.trim.nonEmpty) {
-      repositoryList.split(",").zipWithIndex.foreach { case (repo, i) =>
-        val brr: IBiblioResolver = new IBiblioResolver
-        brr.setM2compatible(true)
-        brr.setUsepoms(true)
-        brr.setRoot(repo)
-        brr.setName(s"repo-${i + 1}")
-        cr.add(brr)
-        // scalastyle:off println
-        printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}")
-        // scalastyle:on println
-      }
-    }
-
-    val localM2 = new IBiblioResolver
-    localM2.setM2compatible(true)
-    localM2.setRoot(m2Path.toURI.toString)
-    localM2.setUsepoms(true)
-    localM2.setName("local-m2-cache")
-    cr.add(localM2)
-
-    val localIvy = new FileSystemResolver
-    val localIvyRoot = new File(ivySettings.getDefaultIvyUserDir, "local")
-    localIvy.setLocal(true)
-    localIvy.setRepository(new FileRepository(localIvyRoot))
-    val ivyPattern = Seq("[organisation]", "[module]", "[revision]", "[type]s",
-      "[artifact](-[classifier]).[ext]").mkString(File.separator)
-    localIvy.addIvyPattern(localIvyRoot.getAbsolutePath + File.separator + ivyPattern)
-    localIvy.setName("local-ivy-cache")
-    cr.add(localIvy)
-
-    // the biblio resolver resolves POM declared dependencies
-    val br: IBiblioResolver = new IBiblioResolver
-    br.setM2compatible(true)
-    br.setUsepoms(true)
-    br.setName("central")
-    cr.add(br)
-
-    val sp: IBiblioResolver = new IBiblioResolver
-    sp.setM2compatible(true)
-    sp.setUsepoms(true)
-    sp.setRoot("http://dl.bintray.com/spark-packages/maven")
-    sp.setName("spark-packages")
-    cr.add(sp)
-    cr
-  }
-
-  /**
-   * Output a comma-delimited list of paths for the downloaded jars to be added to the classpath
-   * (will append to jars in SparkSubmit).
-   * @param artifacts Sequence of dependencies that were resolved and retrieved
-   * @param cacheDirectory directory where jars are cached
-   * @return a comma-delimited list of paths for the dependencies
-   */
-  def resolveDependencyPaths(
-      artifacts: Array[AnyRef],
-      cacheDirectory: File): String = {
-    artifacts.map { artifactInfo =>
-      val artifact = artifactInfo.asInstanceOf[Artifact].getModuleRevisionId
-      cacheDirectory.getAbsolutePath + File.separator +
-        s"${artifact.getOrganisation}_${artifact.getName}-${artifact.getRevision}.jar"
-    }.mkString(",")
-  }
-
-  /** Adds the given maven coordinates to Ivy's module descriptor. */
-  def addDependenciesToIvy(
-      md: DefaultModuleDescriptor,
-      artifacts: Seq[MavenCoordinate],
-      ivyConfName: String): Unit = {
-    artifacts.foreach { mvn =>
-      val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version)
-      val dd = new DefaultDependencyDescriptor(ri, false, false)
-      dd.addDependencyConfiguration(ivyConfName, ivyConfName)
-      // scalastyle:off println
-      printStream.println(s"${dd.getDependencyId} added as a dependency")
-      // scalastyle:on println
-      md.addDependency(dd)
-    }
-  }
-
-  /** Add exclusion rules for dependencies already included in the spark-assembly */
-  def addExclusionRules(
-      ivySettings: IvySettings,
-      ivyConfName: String,
-      md: DefaultModuleDescriptor): Unit = {
-    // Add scala exclusion rule
-    md.addExcludeRule(createExclusion("*:scala-library:*", ivySettings, ivyConfName))
-
-    // We need to specify each component explicitly, otherwise we miss spark-streaming-kafka and
-    // other spark-streaming utility components. Underscore is there to differentiate between
-    // spark-streaming_2.1x and spark-streaming-kafka-assembly_2.1x
-    val components = Seq("catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
-      "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
-
-    components.foreach { comp =>
-      md.addExcludeRule(createExclusion(s"org.apache.spark:spark-$comp*:*", ivySettings,
-        ivyConfName))
-    }
-  }
-
-  /** A nice function to use in tests as well. Values are dummy strings. */
-  def getModuleDescriptor: DefaultModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
-    ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-parent", "1.0"))
-
-  /**
-   * Resolves any dependencies that were supplied through maven coordinates
-   * @param coordinates Comma-delimited string of maven coordinates
-   * @param remoteRepos Comma-delimited string of remote repositories other than maven central
-   * @param ivyPath The path to the local ivy repository
-   * @param exclusions Exclusions to apply when resolving transitive dependencies
-   * @return The comma-delimited path to the jars of the given maven artifacts including their
-   *         transitive dependencies
-   */
-  def resolveMavenCoordinates(
-      coordinates: String,
-      remoteRepos: Option[String],
-      ivyPath: Option[String],
-      exclusions: Seq[String] = Nil,
-      isTest: Boolean = false): String = {
-    if (coordinates == null || coordinates.trim.isEmpty) {
-      ""
-    } else {
-      val sysOut = System.out
-      try {
-        // To prevent ivy from logging to system out
-        System.setOut(printStream)
-        val artifacts = extractMavenCoordinates(coordinates)
-        // Default configuration name for ivy
-        val ivyConfName = "default"
-        // set ivy settings for location of cache
-        val ivySettings: IvySettings = new IvySettings
-        // Directories for caching downloads through ivy and storing the jars when maven
-        // coordinates are supplied to spark-submit
-        val alternateIvyCache = ivyPath.getOrElse("")
-        val packagesDirectory: File =
-          if (alternateIvyCache == null || alternateIvyCache.trim.isEmpty) {
-            new File(ivySettings.getDefaultIvyUserDir, "jars")
-          } else {
-            ivySettings.setDefaultIvyUserDir(new File(alternateIvyCache))
-            ivySettings.setDefaultCache(new File(alternateIvyCache, "cache"))
-            new File(alternateIvyCache, "jars")
-          }
-        // scalastyle:off println
-        printStream.println(
-          s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}")
-        printStream.println(s"The jars for the packages stored in: $packagesDirectory")
-        // scalastyle:on println
-        // create a pattern matcher
-        ivySettings.addMatcher(new GlobPatternMatcher)
-        // create the dependency resolvers
-        val repoResolver = createRepoResolvers(remoteRepos, ivySettings)
-        ivySettings.addResolver(repoResolver)
-        ivySettings.setDefaultResolver(repoResolver.getName)
-
-        val ivy = Ivy.newInstance(ivySettings)
-        // Set resolve options to download transitive dependencies as well
-        val resolveOptions = new ResolveOptions
-        resolveOptions.setTransitive(true)
-        val retrieveOptions = new RetrieveOptions
-        // Turn downloading and logging off for testing
-        if (isTest) {
-          resolveOptions.setDownload(false)
-          resolveOptions.setLog(LogOptions.LOG_QUIET)
-          retrieveOptions.setLog(LogOptions.LOG_QUIET)
-        } else {
-          resolveOptions.setDownload(true)
-        }
-
-        // A Module descriptor must be specified. Entries are dummy strings
-        val md = getModuleDescriptor
-        // clear ivy resolution from previous launches. The resolution file is usually at
-        // ~/.ivy2/org.apache.spark-spark-submit-parent-default.xml. In between runs, this file
-        // leads to confusion with Ivy when the files can no longer be found at the repository
-        // declared in that file/
-        val mdId = md.getModuleRevisionId
-        val previousResolution = new File(ivySettings.getDefaultCache,
-          s"${mdId.getOrganisation}-${mdId.getName}-$ivyConfName.xml")
-        if (previousResolution.exists) previousResolution.delete
-
-        md.setDefaultConf(ivyConfName)
-
-        // Add exclusion rules for Spark and Scala Library
-        addExclusionRules(ivySettings, ivyConfName, md)
-        // add all supplied maven artifacts as dependencies
-        addDependenciesToIvy(md, artifacts, ivyConfName)
-        exclusions.foreach { e =>
-          md.addExcludeRule(createExclusion(e + ":*", ivySettings, ivyConfName))
-        }
-        // resolve dependencies
-        val rr: ResolveReport = ivy.resolve(md, resolveOptions)
-        if (rr.hasError) {
-          throw new RuntimeException(rr.getAllProblemMessages.toString)
-        }
-        // retrieve all resolved dependencies
-        ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
-          packagesDirectory.getAbsolutePath + File.separator +
-            "[organization]_[artifact]-[revision].[ext]",
-          retrieveOptions.setConfs(Array(ivyConfName)))
-        resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
-      } finally {
-        System.setOut(sysOut)
-      }
-    }
-  }
-
-  private[deploy] def createExclusion(
-      coords: String,
-      ivySettings: IvySettings,
-      ivyConfName: String): ExcludeRule = {
-    val c = extractMavenCoordinates(coords)(0)
-    val id = new ArtifactId(new ModuleId(c.groupId, c.artifactId), "*", "*", "*")
-    val rule = new DefaultExcludeRule(id, ivySettings.getMatcher("glob"), null)
-    rule.addConfiguration(ivyConfName)
-    rule
-  }
+
+  // Compatibility with the way arguments are submitted to spark-submit: the caller pre-splits
+  // the comma-delimited option strings.
+  def resolveMavenCoordinates(
+      packages: Seq[String],
+      extraRepos: Seq[String] = Seq.empty,
+      ivyPath: Option[String] = None,
+      exclusions: Seq[String] = Seq.empty,
+      provided: Boolean = false): Seq[String] = {
+
+    val mavenCoordinates = packages.map { pkg =>
+      MavenCoordinate.extract(pkg, provided)
+    }
+
+    val extraResolvers = extraRepos.zipWithIndex.flatMap { case (repo, i) =>
+      Seq(Resolvers.maven(s"repo-maven-$i", repo), Resolvers.ivy(s"repo-ivy-$i", repo))
+    }
+
+    // Parenthesized so that `++ extraResolvers` applies to both branches, not just the else.
+    val resolvers = (if (provided) Seq.empty else Resolvers.default) ++ extraResolvers
+
+    val excludedCoordinates = exclusions.map { ex =>
+      MavenCoordinate.extract(ex + ":*")
+    }
+
+    PackageUtil.resolve(
+      mavenCoordinates,
+      resolvers,
+      ivyPath,
+      excludedCoordinates
+    ).map(_.getAbsolutePath)
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index ec6d48485f..6d2710844a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -61,6 +61,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var repositories: String = null
   var ivyRepoPath: String = null
   var packagesExclusions: String = null
+  var providedPackages: Boolean = false
   var verbose: Boolean = false
   var isPython: Boolean = false
   var pyFiles: String = null
@@ -313,6 +314,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     |  packages                $packages
     |  packagesExclusions      $packagesExclusions
     |  repositories            $repositories
+    |  providedPackages        $providedPackages
     |  verbose                 $verbose
     |
     |Spark properties used, including those specified through
@@ -428,6 +430,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       case HELP =>
         printUsageAndExit(0)
 
+      case PROVIDED_PACKAGES =>
+        providedPackages = true
+
       case VERBOSE =>
         verbose = true
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/packages/MavenCoordinate.scala b/core/src/main/scala/org/apache/spark/deploy/packages/MavenCoordinate.scala
new file mode 100644
index 0000000000..992d357b60
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/packages/MavenCoordinate.scala
@@ -0,0 +1,39 @@
+package org.apache.spark.deploy.packages
+
+/**
+ * Represents a Maven coordinate
+ * @param groupId the groupId of the coordinate
+ * @param artifactId the artifactId of the coordinate
+ * @param version the version of the coordinate
+ */
+private[spark] case class MavenCoordinate(groupId: String, artifactId: String, version: String) {
+  override def toString: String = s"$groupId:$artifactId:$version"
+}
+
+object MavenCoordinate {
+
+  /**
+   * Extracts a maven coordinate from a single string. The coordinate should be provided in the
+   * format `groupId:artifactId:version` or `groupId/artifactId:version`. When `chooseLatest` is
+   * true the version may be omitted, in which case the "+" (latest) revision is requested.
+   * @param coordinate String form of a maven coordinate
+   * @return the parsed Maven coordinate
+   */
+  def extract(coordinate: String, chooseLatest: Boolean = false): MavenCoordinate = {
+    val splits = coordinate.replace("/", ":").split(":").toList
+
+    splits match {
+      case org :: name :: version :: Nil if splits.forall(_.trim.nonEmpty) =>
+        new MavenCoordinate(org, name, version)
+
+      case org :: name :: Nil if chooseLatest && splits.forall(_.trim.nonEmpty) =>
+        new MavenCoordinate(org, name, "+")
+
+      case _ =>
+        throw new IllegalArgumentException(
+          s"Invalid maven coordinate '$coordinate'. " +
+            "Must be in the form 'groupId:artifactId:version'")
+    }
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/packages/PackageNotFoundException.scala b/core/src/main/scala/org/apache/spark/deploy/packages/PackageNotFoundException.scala
new file mode 100644
index 0000000000..a366811f08
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/packages/PackageNotFoundException.scala
@@ -0,0 +1,3 @@
+package org.apache.spark.deploy.packages
+
+class PackageNotFoundException(message: String) extends RuntimeException(message)
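For reference while reading the rest of the patch, a quick sketch of the new parser's contract; the coordinates below are made-up examples, not part of the patch:

    MavenCoordinate.extract("com.example:lib_2.10:0.1")
    //   => MavenCoordinate("com.example", "lib_2.10", "0.1")
    MavenCoordinate.extract("com.example/lib_2.10:0.1")    // '/' is normalized to ':'
    //   => MavenCoordinate("com.example", "lib_2.10", "0.1")
    MavenCoordinate.extract("com.example:lib_2.10", chooseLatest = true)
    //   => MavenCoordinate("com.example", "lib_2.10", "+")  // "+" = latest available revision
    MavenCoordinate.extract("com.example:lib_2.10")         // throws IllegalArgumentException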
diff --git a/core/src/main/scala/org/apache/spark/deploy/packages/PackageUtil.scala b/core/src/main/scala/org/apache/spark/deploy/packages/PackageUtil.scala
new file mode 100644
index 0000000000..12a73d97fd
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/packages/PackageUtil.scala
@@ -0,0 +1,203 @@
+package org.apache.spark.deploy.packages
+
+import java.io.{File, PrintStream}
+
+import org.apache.ivy.Ivy
+import org.apache.ivy.core.LogOptions
+import org.apache.ivy.core.module.descriptor._
+import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId}
+import org.apache.ivy.core.report.ResolveReport
+import org.apache.ivy.core.resolve.ResolveOptions
+import org.apache.ivy.core.retrieve.RetrieveOptions
+import org.apache.ivy.core.settings.IvySettings
+import org.apache.ivy.plugins.matcher.GlobPatternMatcher
+import org.apache.ivy.plugins.resolver.RepositoryResolver
+import org.apache.spark.util.Utils
+import scala.collection.JavaConversions._
+
+private[spark] object PackageUtil {
+
+  // Exposed for testing
+  private[spark] var printStream: PrintStream = System.err
+
+  /** Redirects System.out to `printStream` while `action` runs, so that Ivy's console
+   *  output does not pollute stdout. */
+  private def usingStream[A](action: => A): A = {
+    val sysOut = System.out
+    try {
+      System.setOut(printStream)
+      action
+    } finally {
+      System.setOut(sysOut)
+    }
+  }
+
+  /**
+   * Output a list of paths for the downloaded jars to be added to the classpath.
+   * @param artifacts Sequence of dependencies that were resolved and retrieved
+   * @param cacheDirectory directory where jars are cached
+   * @return the jar files for the dependencies
+   */
+  def resolveDependencyPaths(artifacts: Seq[Artifact], cacheDirectory: File): Seq[File] = {
+    artifacts.map { artifactInfo =>
+      val artifact = artifactInfo.getModuleRevisionId
+      val name = s"${artifact.getOrganisation}_${artifact.getName}-${artifact.getRevision}.jar"
+      new File(cacheDirectory, name)
+    }
+  }
+
+  /** Adds the given maven coordinates to Ivy's module descriptor. */
+  def addDependenciesToIvy(
+      md: DefaultModuleDescriptor,
+      coordinates: Seq[MavenCoordinate],
+      ivyConfName: String): Unit = for (mvn <- coordinates) {
+    val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version)
+    val dd = new DefaultDependencyDescriptor(ri, false, false)
+    dd.addDependencyConfiguration(ivyConfName, ivyConfName)
+    // scalastyle:off println
+    printStream.println(s"${dd.getDependencyId} added as a dependency")
+    // scalastyle:on println
+    md.addDependency(dd)
+  }
+
+  private[deploy] def createExclusion(
+      coords: MavenCoordinate,
+      ivySettings: IvySettings,
+      ivyConfName: String): ExcludeRule = {
+    val id = new ArtifactId(new ModuleId(coords.groupId, coords.artifactId), "*", "*", "*")
+    val rule = new DefaultExcludeRule(id, ivySettings.getMatcher("glob"), null)
+    rule.addConfiguration(ivyConfName)
+    rule
+  }
+
+  /** Add exclusion rules for dependencies already included in the spark-assembly */
+  private def addSparkExclusionRules(
+      ivySettings: IvySettings,
+      ivyConfName: String,
+      md: DefaultModuleDescriptor): Unit = {
+    // Add scala exclusion rule
+    md.addExcludeRule(
+      createExclusion(MavenCoordinate("*", "scala-library", "*"), ivySettings, ivyConfName))
+
+    // We need to specify each component explicitly, otherwise we miss spark-streaming-kafka and
+    // other spark-streaming utility components. Underscore is there to differentiate between
+    // spark-streaming_2.1x and spark-streaming-kafka-assembly_2.1x
+    val components = Seq("catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
+      "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
+
+    components.foreach { comp =>
+      md.addExcludeRule(createExclusion(
+        MavenCoordinate("org.apache.spark", s"spark-$comp*", "*"),
+        ivySettings,
+        ivyConfName))
+    }
+  }
+
+  /** The dummy parent module descriptor that supplied coordinates hang off; also used in tests. */
+  private[spark] def parentModule: DefaultModuleDescriptor =
+    DefaultModuleDescriptor.newDefaultInstance(
+      ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-parent", "1.0"))
+
+  /**
+   * Main entry point into spark packages.
+   * Resolves any dependencies that were supplied through maven coordinates.
+   * @param coordinates List of packages given as maven coordinates
+   * @param resolvers Repositories that will be considered during resolution
+   * @param ivyPath Path to the local ivy directory, used for configuration and caching
+   *                (not the local .ivy2/local repository)
+   * @param exclusions Exclusions to apply when resolving transitive dependencies
+   *                   (do not include a version)
+   * @return Jar files of the given maven artifacts including their transitive dependencies
+   */
+  private[spark] def resolve(
+      coordinates: Seq[MavenCoordinate],
+      resolvers: Seq[RepositoryResolver],
+      ivyPath: Option[String],
+      exclusions: Seq[MavenCoordinate]): Seq[File] = usingStream {
+
+    val ivyConfName = "default"
+
+    val ivySettings: IvySettings = ivyPath match {
+      case None => new IvySettings
+      case Some(path) =>
+        val settings = new IvySettings
+        settings.setDefaultIvyUserDir(new File(path))
+        settings.setDefaultCache(new File(path, "cache"))
+        settings
+    }
+
+    // Directories for caching downloads through ivy and storing the jars when maven coordinates
+    // are supplied to spark-submit
+    val packagesDirectory: File = new File(ivySettings.getDefaultIvyUserDir, "jars")
+
+    // scalastyle:off println
+    printStream.println(
+      s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}")
+    printStream.println(s"The jars for the packages stored in: $packagesDirectory")
+    // scalastyle:on println
+
+    // create a pattern matcher
+    ivySettings.addMatcher(new GlobPatternMatcher)
+    // create the dependency resolvers
+    val repoResolver = Resolvers.chained("packages-resolver", resolvers)
+    ivySettings.addResolver(repoResolver)
+    ivySettings.setDefaultResolver(repoResolver.getName)
+
+    val ivy = Ivy.newInstance(ivySettings)
+    // Set resolve options to download transitive dependencies as well
+    val resolveOptions = new ResolveOptions
+    resolveOptions.setTransitive(true)
+    val retrieveOptions = new RetrieveOptions
+
+    // Turn downloading and logging off for testing
+    if (Utils.isTesting) {
+      resolveOptions.setDownload(false)
+      resolveOptions.setLog(LogOptions.LOG_QUIET)
+      retrieveOptions.setLog(LogOptions.LOG_QUIET)
+    } else {
+      resolveOptions.setDownload(true)
+    }
+
+    // A parent module descriptor must be specified. Entries are dummy strings
+    val md = parentModule
+    // Clear ivy resolution from previous launches. The resolution file is usually at
+    // ~/.ivy2/org.apache.spark-spark-submit-parent-default.xml. In between runs, this file
+    // leads to confusion with Ivy when the files can no longer be found at the repository
+    // declared in that file.
+    val mdId = md.getModuleRevisionId
+    val previousResolution = new File(
+      ivySettings.getDefaultCache,
+      s"${mdId.getOrganisation}-${mdId.getName}-$ivyConfName.xml")
+    if (previousResolution.exists) previousResolution.delete
+
+    md.setDefaultConf(ivyConfName)
+
+    // Add exclusion rules for Spark and Scala Library
+    addSparkExclusionRules(ivySettings, ivyConfName, md)
+    // add all supplied maven artifacts as dependencies
+    addDependenciesToIvy(md, coordinates, ivyConfName)
+    exclusions.foreach { e =>
+      md.addExcludeRule(createExclusion(e, ivySettings, ivyConfName))
+    }
+
+    // resolve dependencies
+    val rr: ResolveReport = ivy.resolve(md, resolveOptions)
+    if (rr.hasError) {
+      throw new PackageNotFoundException(rr.getAllProblemMessages.toString)
+    }
+    // retrieve all resolved dependencies
+    ivy.retrieve(
+      rr.getModuleDescriptor.getModuleRevisionId,
+      packagesDirectory.getAbsolutePath + File.separator +
+        "[organization]_[artifact]-[revision].[ext]",
+      retrieveOptions.setConfs(Array(ivyConfName)))
+    resolveDependencyPaths(
+      rr.getArtifacts.map(_.asInstanceOf[Artifact]),  // ivy returns a raw java List
+      packagesDirectory)
+  }
+}
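PackageUtil.resolve is the low-level entry point that SparkSubmitUtils.resolveMavenCoordinates delegates to. A minimal sketch of driving it directly, with made-up coordinates:

    import org.apache.spark.deploy.packages._

    // Resolve one package and its transitive dependencies into <ivy home>/jars,
    // excluding one group/artifact pair (the "*" version makes the rule a glob).
    val jars: Seq[java.io.File] = PackageUtil.resolve(
      coordinates = Seq(MavenCoordinate("com.example", "lib_2.10", "0.1")),
      resolvers = Resolvers.default,
      ivyPath = None,  // fall back to the default ~/.ivy2 directory
      exclusions = Seq(MavenCoordinate("com.example", "unwanted-dep", "*")))
    // Each returned file follows the retrieve pattern <org>_<artifact>-<revision>.jar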
diff --git a/core/src/main/scala/org/apache/spark/deploy/packages/Resolvers.scala b/core/src/main/scala/org/apache/spark/deploy/packages/Resolvers.scala
new file mode 100644
index 0000000000..57f9f1a83c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/packages/Resolvers.scala
@@ -0,0 +1,94 @@
+package org.apache.spark.deploy.packages
+
+import java.io.File
+
+import org.apache.ivy.plugins.repository.file.FileRepository
+import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver, RepositoryResolver, URLResolver}
+import org.apache.spark.util.Utils
+
+private[spark] object Resolvers {
+
+  private val home = sys.props.get("user.home").getOrElse(".")
+
+  /**
+   * A maven-layout repository rooted at the given url.
+   * @param root base url of the repository
+   */
+  def maven(name: String, root: String): RepositoryResolver = {
+    val r = new IBiblioResolver
+    r.setM2compatible(true)
+    r.setRoot(root)
+    r.setUsepoms(true)
+    r.setName(name)
+    r
+  }
+
+  val localM2: RepositoryResolver = {
+    val root = if (Utils.isTesting) {
+      // test builds delete the maven cache, and this can cause flakiness
+      new File("dummy", ".m2" + File.separator + "repository")
+    } else {
+      new File(home, ".m2" + File.separator + "repository")
+    }
+    maven("local-maven", root.toURI.toString)
+  }
+
+  /** An ivy-layout repository rooted at the given url. */
+  def ivy(name: String, root: String): RepositoryResolver = {
+    val resolver = new URLResolver
+    resolver.setM2compatible(true)
+    // URL patterns always use '/', regardless of the platform's File.separator
+    val ivyPattern = Seq("[organisation]", "[module]", "[revision]", "[type]s",
+      "[artifact](-[classifier]).[ext]").mkString("/")
+    resolver.addIvyPattern(root + "/" + ivyPattern)
+    resolver.setName(name)
+    resolver
+  }
+
+  val localIvy: RepositoryResolver = {
+    val root = new File(home, ".ivy2" + File.separator + "local")
+    val resolver = new FileSystemResolver
+    resolver.setLocal(true)
+    resolver.setRepository(new FileRepository(root))
+    val ivyPattern = Seq("[organisation]", "[module]", "[revision]", "[type]s",
+      "[artifact](-[classifier]).[ext]").mkString(File.separator)
+    resolver.addIvyPattern(root.getAbsolutePath + File.separator + ivyPattern)
+    resolver.setName("local-ivy")
+    resolver
+  }
+
+  val central: RepositoryResolver = {
+    // the biblio resolver resolves POM declared dependencies
+    val resolver = new IBiblioResolver
+    resolver.setM2compatible(true)
+    resolver.setUsepoms(true)
+    resolver.setName("central")
+    resolver
+  }
+
+  val sparkPackages: RepositoryResolver = {
+    val resolver = new IBiblioResolver
+    resolver.setM2compatible(true)
+    resolver.setUsepoms(true)
+    resolver.setRoot("http://dl.bintray.com/spark-packages/maven")
+    resolver.setName("spark-packages")
+    resolver
+  }
+
+  val default: Seq[RepositoryResolver] = Seq(
+    localIvy, localM2, sparkPackages, central
+  )
+
+  /**
+   * Chains resolvers in order of decreasing priority: a package is taken from the first
+   * resolver that can supply it.
+   */
+  def chained(name: String, resolvers: Seq[RepositoryResolver]): ChainResolver = {
+    val cr = new ChainResolver
+    cr.setName(name)
+    for (r <- resolvers) {
+      cr.add(r)
+    }
+    cr
+  }
+}
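To make the chain ordering concrete, this is roughly what a single --repositories entry expands to inside SparkSubmitUtils.resolveMavenCoordinates (the URL is hypothetical); each extra repository is tried in both maven and ivy layouts, after the built-in resolvers:

    val extra = Seq(
      Resolvers.maven("repo-maven-0", "http://repo.example.com/maven"),
      Resolvers.ivy("repo-ivy-0", "http://repo.example.com/maven"))
    val chain = Resolvers.chained("packages-resolver", Resolvers.default ++ extra)
    // Lookup order: local-ivy, local-maven, spark-packages, central, repo-maven-0, repo-ivy-0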
+ */ + def chained(name: String, resolvers: Seq[RepositoryResolver]): ChainResolver = { + val cr = new ChainResolver + cr.setName(name) + for (r <- resolvers) { + cr.add(r) + } + cr + } + +} diff --git a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala index 9ecf49b598..7a4ec9b36b 100644 --- a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala +++ b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala @@ -29,7 +29,7 @@ import org.apache.commons.io.FileUtils import org.apache.ivy.core.settings.IvySettings import org.apache.spark.TestUtils.{createCompiledClass, JavaSourceFromString} -import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate +import org.apache.spark.deploy.packages.MavenCoordinate private[deploy] object IvyTestUtils { @@ -149,7 +149,7 @@ private[deploy] object IvyTestUtils { private def createDescriptor( tempPath: File, artifact: MavenCoordinate, - dependencies: Option[Seq[MavenCoordinate]], + dependencies: Seq[MavenCoordinate], useIvyLayout: Boolean): File = { if (useIvyLayout) { val ivyXmlPath = pathFromCoordinate(artifact, tempPath, "ivy", true) @@ -174,7 +174,7 @@ private[deploy] object IvyTestUtils { private def createPom( dir: File, artifact: MavenCoordinate, - dependencies: Option[Seq[MavenCoordinate]]): File = { + dependencies: Seq[MavenCoordinate]): File = { var content = """ |<?xml version="1.0" encoding="UTF-8"?> |<project xmlns="http://maven.apache.org/POM/4.0.0" @@ -184,12 +184,13 @@ private[deploy] object IvyTestUtils { | <modelVersion>4.0.0</modelVersion> """.stripMargin.trim content += pomArtifactWriter(artifact) - content += dependencies.map { deps => - val inside = deps.map { dep => + + content += { + val deps = dependencies.map{ dep => "\t<dependency>" + pomArtifactWriter(dep, 3) + "\n\t</dependency>" }.mkString("\n") - "\n <dependencies>\n" + inside + "\n </dependencies>" - }.getOrElse("") + if (deps.isEmpty) "" else "\n <dependencies>\n" + deps + "\n </dependencies>" + } content += "\n</project>" writeFile(dir, artifactName(artifact, false, ".pom"), content.trim) } @@ -205,7 +206,7 @@ private[deploy] object IvyTestUtils { private def createIvyDescriptor( dir: File, artifact: MavenCoordinate, - dependencies: Option[Seq[MavenCoordinate]]): File = { + dependencies: Seq[MavenCoordinate]): File = { var content = s""" |<?xml version="1.0" encoding="UTF-8"?> |<ivy-module version="2.0" xmlns:m="http://ant.apache.org/ivy/maven"> @@ -225,10 +226,11 @@ private[deploy] object IvyTestUtils { | conf="master"/> | </publications> """.stripMargin.trim - content += dependencies.map { deps => - val inside = deps.map(ivyArtifactWriter).mkString("\n") - "\n <dependencies>\n" + inside + "\n </dependencies>" - }.getOrElse("") + + content += { + val deps = dependencies.map(ivyArtifactWriter).mkString("\n") + if (deps.isEmpty) "" else "\n <dependencies>\n" + deps + "\n </dependencies>" + } content += "\n</ivy-module>" writeFile(dir, "ivy.xml", content.trim) } @@ -282,7 +284,7 @@ private[deploy] object IvyTestUtils { */ private def createLocalRepository( artifact: MavenCoordinate, - dependencies: Option[Seq[MavenCoordinate]] = None, + dependencies: Seq[MavenCoordinate] = Seq(), tempDir: Option[File] = None, useIvyLayout: Boolean = false, withPython: Boolean = false, @@ -336,12 +338,12 @@ private[deploy] object IvyTestUtils { rootDir: Option[File], useIvyLayout: Boolean = false, withPython: Boolean = false, - withR: Boolean = false): File = { - val deps = 
dependencies.map(SparkSubmitUtils.extractMavenCoordinates) + withR: Boolean = false): File = { + val deps = dependencies.map(_.split(",").toSeq).getOrElse(Seq.empty).map(MavenCoordinate.extract) val mainRepo = createLocalRepository(artifact, deps, rootDir, useIvyLayout, withPython, withR) - deps.foreach { seq => seq.foreach { dep => - createLocalRepository(dep, None, Some(mainRepo), useIvyLayout, withPython = false) - }} + deps.foreach { dep => + createLocalRepository(dep, Seq.empty, Some(mainRepo), useIvyLayout, withPython = false) + } mainRepo } @@ -363,7 +365,7 @@ private[deploy] object IvyTestUtils { withPython: Boolean = false, withR: Boolean = false, ivySettings: IvySettings = new IvySettings)(f: String => Unit): Unit = { - val deps = dependencies.map(SparkSubmitUtils.extractMavenCoordinates) + val deps = dependencies.map(_.split(",").toSeq).getOrElse(Seq.empty).map(MavenCoordinate.extract) purgeLocalIvyCache(artifact, deps, ivySettings) val repo = createLocalRepositoryForTests(artifact, dependencies, rootDir, useIvyLayout, withPython, withR) @@ -374,9 +376,8 @@ private[deploy] object IvyTestUtils { if (repo.toString.contains(".m2") || repo.toString.contains(".ivy2")) { val groupDir = getBaseGroupDirectory(artifact, useIvyLayout) FileUtils.deleteDirectory(new File(repo, groupDir + File.separator + artifact.artifactId)) - deps.foreach { _.foreach { dep => - FileUtils.deleteDirectory(new File(repo, getBaseGroupDirectory(dep, useIvyLayout))) - } + deps.foreach { dep => + FileUtils.deleteDirectory(new File(repo, getBaseGroupDirectory(dep, useIvyLayout))) } } else { FileUtils.deleteDirectory(repo) @@ -388,13 +389,12 @@ private[deploy] object IvyTestUtils { /** Deletes the test packages from the ivy cache */ private def purgeLocalIvyCache( artifact: MavenCoordinate, - dependencies: Option[Seq[MavenCoordinate]], + dependencies: Seq[MavenCoordinate], ivySettings: IvySettings): Unit = { // delete the artifact from the cache as well if it already exists FileUtils.deleteDirectory(new File(ivySettings.getDefaultCache, artifact.groupId)) - dependencies.foreach { _.foreach { dep => - FileUtils.deleteDirectory(new File(ivySettings.getDefaultCache, dep.groupId)) - } + dependencies.foreach { dep => + FileUtils.deleteDirectory(new File(ivySettings.getDefaultCache, dep.groupId)) } } } diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala index 13cba94578..7fa3a4c8ac 100644 --- a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala @@ -32,7 +32,7 @@ import org.scalatest.BeforeAndAfterEach import org.apache.spark.SparkFunSuite import org.apache.spark.api.r.RUtils -import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate +import org.apache.spark.deploy.packages.MavenCoordinate import org.apache.spark.util.ResetSystemProperties class RPackageUtilsSuite diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 2718976992..3b4835cf02 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -30,7 +30,7 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark._ import org.apache.spark.api.r.RUtils import org.apache.spark.deploy.SparkSubmit._ -import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate +import 
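The suites below lean on IvyTestUtils.withRepository; a sketch of its shape after the Option[Seq] to Seq change, using the suites' own dummy coordinates:

    val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
    IvyTestUtils.withRepository(main, Some("my.great.dep:mydep:0.5"), None) { repo =>
      // `repo` points at a temporary repository containing mylib and mydep;
      // the helper purges it (and the matching ivy cache entries) when this block returns.
    }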
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
index 4877710c12..39a2379e1a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.deploy
 
 import java.io.{File, OutputStream, PrintStream}
 
+import org.apache.spark.deploy.packages.PackageUtil
 import scala.collection.mutable.ArrayBuffer
 
@@ -27,7 +28,7 @@ import org.apache.ivy.plugins.resolver.{AbstractResolver, FileSystemResolver, IB
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
+import org.apache.spark.deploy.packages._
 import org.apache.spark.util.Utils
 
 class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
@@ -51,7 +52,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
   override def beforeAll() {
     super.beforeAll()
     // We don't want to write logs during testing
-    SparkSubmitUtils.printStream = new BufferPrintStream
+    PackageUtil.printStream = new BufferPrintStream
     tempIvyPath = Utils.createTempDir(namePrefix = "ivy").getAbsolutePath()
   }
 
@@ -59,47 +60,36 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     val coordinates = Seq("a:b: ", " :a:b", "a: :b", "a:b:", ":a:b", "a::b", "::", "a:b", "a")
     for (coordinate <- coordinates) {
       intercept[IllegalArgumentException] {
-        SparkSubmitUtils.extractMavenCoordinates(coordinate)
+        MavenCoordinate.extract(coordinate)
       }
     }
   }
 
   test("create repo resolvers") {
-    val settings = new IvySettings
-    val res1 = SparkSubmitUtils.createRepoResolvers(None, settings)
-    // should have central and spark-packages by default
+    val res1 = Resolvers.chained("test-resolver1", Resolvers.default)
+    // should have the two local caches, spark-packages and central by default
     assert(res1.getResolvers.size() === 4)
-    assert(res1.getResolvers.get(0).asInstanceOf[IBiblioResolver].getName === "local-m2-cache")
-    assert(res1.getResolvers.get(1).asInstanceOf[FileSystemResolver].getName === "local-ivy-cache")
-    assert(res1.getResolvers.get(2).asInstanceOf[IBiblioResolver].getName === "central")
-    assert(res1.getResolvers.get(3).asInstanceOf[IBiblioResolver].getName === "spark-packages")
-
-    val repos = "a/1,b/2,c/3"
-    val resolver2 = SparkSubmitUtils.createRepoResolvers(Option(repos), settings)
-    assert(resolver2.getResolvers.size() === 7)
-    val expected = repos.split(",").map(r => s"$r/")
-    resolver2.getResolvers.toArray.zipWithIndex.foreach { case (resolver: AbstractResolver, i) =>
-      if (i < 3) {
-        assert(resolver.getName === s"repo-${i + 1}")
-        assert(resolver.asInstanceOf[IBiblioResolver].getRoot === expected(i))
-      }
-    }
+    assert(res1.getResolvers.get(0).asInstanceOf[FileSystemResolver].getName === "local-ivy")
+    assert(res1.getResolvers.get(1).asInstanceOf[IBiblioResolver].getName === "local-maven")
+    assert(res1.getResolvers.get(2).asInstanceOf[IBiblioResolver].getName === "spark-packages")
+    assert(res1.getResolvers.get(3).asInstanceOf[IBiblioResolver].getName === "central")
   }
 
   test("add dependencies works correctly") {
-    val md = SparkSubmitUtils.getModuleDescriptor
-    val artifacts = SparkSubmitUtils.extractMavenCoordinates("com.databricks:spark-csv_2.10:0.1," +
-      "com.databricks:spark-avro_2.10:0.1")
+    val md = PackageUtil.parentModule
+    val artifacts = Seq(
+      MavenCoordinate.extract("com.databricks:spark-csv_2.10:0.1"),
+      MavenCoordinate.extract("com.databricks:spark-avro_2.10:0.1"))
 
-    SparkSubmitUtils.addDependenciesToIvy(md, artifacts, "default")
+    PackageUtil.addDependenciesToIvy(md, artifacts, "default")
     assert(md.getDependencies.length === 2)
   }
 
   test("excludes works correctly") {
-    val md = SparkSubmitUtils.getModuleDescriptor
-    val excludes = Seq("a:b", "c:d")
+    val md = PackageUtil.parentModule
+    val excludes = Seq("a:b:*", "c:d:*").map(MavenCoordinate.extract(_))
     excludes.foreach { e =>
-      md.addExcludeRule(SparkSubmitUtils.createExclusion(e + ":*", new IvySettings, "default"))
+      md.addExcludeRule(PackageUtil.createExclusion(e, new IvySettings, "default"))
     }
     val rules = md.getAllExcludeRules
     assert(rules.length === 2)
@@ -109,15 +99,12 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     val rule2 = rules(1).getId.getModuleId
     assert(rule2.getOrganisation === "c")
     assert(rule2.getName === "d")
-    intercept[IllegalArgumentException] {
-      SparkSubmitUtils.createExclusion("e:f:g:h", new IvySettings, "default")
-    }
   }
 
   test("ivy path works correctly") {
-    val md = SparkSubmitUtils.getModuleDescriptor
+    val md = PackageUtil.parentModule
     val artifacts = for (i <- 0 until 3) yield new MDArtifact(md, s"jar-$i", "jar", "jar")
-    var jPaths = SparkSubmitUtils.resolveDependencyPaths(artifacts.toArray, new File(tempIvyPath))
+    var jPaths = PackageUtil.resolveDependencyPaths(artifacts, new File(tempIvyPath))
+      .map(_.getAbsolutePath).mkString(",")
     for (i <- 0 until 3) {
       val index = jPaths.indexOf(tempIvyPath)
       assert(index >= 0)
@@ -126,8 +113,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     val main = MavenCoordinate("my.awesome.lib", "mylib", "0.1")
     IvyTestUtils.withRepository(main, None, None) { repo =>
       // end to end
-      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, Option(repo),
-        Option(tempIvyPath), isTest = true)
+      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(Seq(main.toString), Seq(repo),
+        Some(tempIvyPath), provided = true).mkString(",")
       assert(jarPath.indexOf(tempIvyPath) >= 0, "should use non-default ivy path")
     }
   }
 
   test("search for artifact at local repositories") {
     val main = new MavenCoordinate("my.great.lib", "mylib", "0.1")
     val dep = "my.great.dep:mydep:0.5"
+    val dummyM2 = new File("dummy", ".m2" + File.separator + "repository")
     // Local M2 repository
-    IvyTestUtils.withRepository(main, Some(dep), Some(SparkSubmitUtils.m2Path)) { repo =>
-      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None,
-        isTest = true)
+    IvyTestUtils.withRepository(main, Some(dep), Some(dummyM2)) { repo =>
+      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(Seq(main.toString), Seq(repo),
+        provided = true).mkString(",")
       assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
       assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
     }
     // Local Ivy Repository
     val settings = new IvySettings
-    val ivyLocal = new File(settings.getDefaultIvyUserDir, "local" + File.separator)
+    val ivyLocal = new File(settings.getDefaultIvyUserDir, "local")
     IvyTestUtils.withRepository(main, Some(dep), Some(ivyLocal), useIvyLayout = true) { repo =>
-      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None,
-        isTest = true)
-      assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
-      assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
-    }
-    // Local ivy repository with modified home
-    val dummyIvyLocal = new File(tempIvyPath, "local" + File.separator)
-    settings.setDefaultIvyUserDir(new File(tempIvyPath))
-    IvyTestUtils.withRepository(main, Some(dep), Some(dummyIvyLocal), useIvyLayout = true,
-      ivySettings = settings) { repo =>
-      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None,
-        Some(tempIvyPath), isTest = true)
+      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(Seq(main.toString)).mkString(",")
       assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
-      assert(jarPath.indexOf(tempIvyPath) >= 0, "should be in new ivy path")
       assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
     }
   }
 
-  test("dependency not found throws RuntimeException") {
-    intercept[RuntimeException] {
-      SparkSubmitUtils.resolveMavenCoordinates("a:b:c", None, None, isTest = true)
+  test("dependency not found throws PackageNotFoundException") {
+    intercept[PackageNotFoundException] {
+      SparkSubmitUtils.resolveMavenCoordinates(Seq("a:b:c"))
     }
   }
 
@@ -174,16 +148,15 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     val components = Seq("catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
       "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
 
-    val coordinates =
-      components.map(comp => s"org.apache.spark:spark-${comp}2.10:1.2.0").mkString(",") +
-      ",org.apache.spark:spark-core_fake:1.2.0"
+    val coordinates = components.map(comp => s"org.apache.spark:spark-${comp}2.10:1.2.0") ++
+      Seq("org.apache.spark:spark-core_fake:1.2.0")
 
-    val path = SparkSubmitUtils.resolveMavenCoordinates(coordinates, None, None, isTest = true)
-    assert(path === "", "should return empty path")
+    val path = SparkSubmitUtils.resolveMavenCoordinates(coordinates)
+    assert(path.isEmpty, "should return empty path")
+
     val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.10", "1.2.0")
     IvyTestUtils.withRepository(main, None, None) { repo =>
-      val files = SparkSubmitUtils.resolveMavenCoordinates(coordinates + "," + main.toString,
-        Some(repo), None, isTest = true)
+      val files = SparkSubmitUtils.resolveMavenCoordinates(coordinates ++ Seq(main.toString),
+        Seq(repo)).mkString(",")
       assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact")
     }
   }
@@ -192,8 +165,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     val main = new MavenCoordinate("my.great.lib", "mylib", "0.1")
     val dep = "my.great.dep:mydep:0.5"
     IvyTestUtils.withRepository(main, Some(dep), None) { repo =>
-      val files = SparkSubmitUtils.resolveMavenCoordinates(main.toString,
-        Some(repo), None, Seq("my.great.dep:mydep"), isTest = true)
+      val files = SparkSubmitUtils.resolveMavenCoordinates(Seq(main.toString),
+        Seq(repo), None, Seq("my.great.dep:mydep")).mkString(",")
       assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact")
       assert(files.indexOf("my.great.dep") < 0, "Returned excluded artifact")
     }
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
index 6767cc5079..2d1150e6a6 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
@@ -51,6 +51,7 @@ class SparkSubmitOptionParser {
   protected final String MASTER = "--master";
   protected final String NAME = "--name";
   protected final String PACKAGES = "--packages";
+  protected final String PROVIDED_PACKAGES = "--provided";
   protected final String PACKAGES_EXCLUDE = "--exclude-packages";
   protected final String PROPERTIES_FILE = "--properties-file";
   protected final String PROXY_USER = "--proxy-user";
@@ -125,6 +126,7 @@ class SparkSubmitOptionParser {
     { SUPERVISE },
     { USAGE_ERROR },
     { VERBOSE, "-v" },
+    { PROVIDED_PACKAGES },
     { VERSION },
   };
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index f45264af34..4c6116d2d0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -110,10 +110,10 @@ private[hive] object IsolatedClientLoader extends Logging {
     val classpath = quietly {
       SparkSubmitUtils.resolveMavenCoordinates(
-        hiveArtifacts.mkString(","),
-        Some("http://www.datanucleus.org/downloads/maven2"),
+        hiveArtifacts,
+        Seq("http://www.datanucleus.org/downloads/maven2"),
         ivyPath,
-        exclusions = version.exclusions)
+        exclusions = version.exclusions).mkString(",")
     }
     val allFiles = classpath.split(",").map(new File(_)).toSet
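Finally, a hypothetical end-to-end invocation of the new flag (the coordinate and URL are illustrative). As wired up in this patch, --provided lets the version be omitted (the latest revision is chosen) and resolves against only the repositories passed via --repositories, skipping the default chain:

    ./bin/spark-submit \
      --packages com.example:lib_2.10 \
      --repositories http://repo.example.com/maven \
      --provided \
      --class com.example.Main \
      app.jar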