-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala            52
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala  13
-rw-r--r--  docs/programming-guide.md                                                19
-rw-r--r--  docs/submitting-applications.md                                           5
-rw-r--r--  python/pyspark/tests.py                                                  69
5 files changed, 131 insertions, 27 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 012a89a31b..4c4110812e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -252,6 +252,26 @@ object SparkSubmit {
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
+ // Resolve Maven dependencies if there are any, and add the resolved classpath to jars.
+ // Add them to py-files too, for packages that include Python code.
+ val resolvedMavenCoordinates =
+ SparkSubmitUtils.resolveMavenCoordinates(
+ args.packages, Option(args.repositories), Option(args.ivyRepoPath))
+ if (!resolvedMavenCoordinates.trim.isEmpty) {
+ if (args.jars == null || args.jars.trim.isEmpty) {
+ args.jars = resolvedMavenCoordinates
+ } else {
+ args.jars += s",$resolvedMavenCoordinates"
+ }
+ if (args.isPython) {
+ if (args.pyFiles == null || args.pyFiles.trim.isEmpty) {
+ args.pyFiles = resolvedMavenCoordinates
+ } else {
+ args.pyFiles += s",$resolvedMavenCoordinates"
+ }
+ }
+ }
+
// Require all python files to be local, so we can add them to the PYTHONPATH
// In YARN cluster mode, python files are distributed as regular files, which can be non-local
if (args.isPython && !isYarnCluster) {
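The hunk above moves dependency resolution earlier and appends the resolved jar paths to whatever comma-separated list the user already supplied via `--jars` (and `--py-files` for Python apps). A minimal standalone sketch of that merge pattern, assuming a hypothetical helper name (`mergeCommaSeparated`) and an illustrative jar path that are not part of SparkSubmit itself:

```scala
// Sketch only: mergeCommaSeparated is a hypothetical helper, not a SparkSubmit method.
object CoordinateMerge {
  /** Append newEntries to an existing comma-separated list, tolerating null/blank lists. */
  def mergeCommaSeparated(existing: String, newEntries: String): String = {
    if (newEntries == null || newEntries.trim.isEmpty) existing
    else if (existing == null || existing.trim.isEmpty) newEntries
    else s"$existing,$newEntries"
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical path of a jar downloaded by the resolver.
    val resolved = "/tmp/ivy/jars/a_mylib-0.1.jar"
    println(mergeCommaSeparated(null, resolved))        // becomes the whole list
    println(mergeCommaSeparated("code.jar", resolved))  // appended after a comma
  }
}
```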
@@ -307,18 +327,6 @@ object SparkSubmit {
// Special flag to avoid deprecation warnings at the client
sysProps("SPARK_SUBMIT") = "true"
- // Resolve maven dependencies if there are any and add classpath to jars
- val resolvedMavenCoordinates =
- SparkSubmitUtils.resolveMavenCoordinates(
- args.packages, Option(args.repositories), Option(args.ivyRepoPath))
- if (!resolvedMavenCoordinates.trim.isEmpty) {
- if (args.jars == null || args.jars.trim.isEmpty) {
- args.jars = resolvedMavenCoordinates
- } else {
- args.jars += s",$resolvedMavenCoordinates"
- }
- }
-
// A list of rules to map each argument to system properties or command-line options in
// each deploy mode; we iterate through these below
val options = List[OptionAssigner](
@@ -646,13 +654,15 @@ private[spark] object SparkSubmitUtils {
private[spark] case class MavenCoordinate(groupId: String, artifactId: String, version: String)
/**
- * Extracts maven coordinates from a comma-delimited string
+ * Extracts Maven coordinates from a comma-delimited string. Coordinates should be provided
+ * in the format `groupId:artifactId:version` or `groupId/artifactId:version`. The latter form
+ * is a convenience for Spark Packages users.
* @param coordinates Comma-delimited string of maven coordinates
* @return Sequence of Maven coordinates
*/
private[spark] def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = {
coordinates.split(",").map { p =>
- val splits = p.split(":")
+ val splits = p.replace("/", ":").split(":")
require(splits.length == 3, s"Provided Maven Coordinates must be in the form " +
s"'groupId:artifactId:version'. The coordinate provided is: $p")
require(splits(0) != null && splits(0).trim.nonEmpty, s"The groupId cannot be null or " +
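The parsing rule introduced in this hunk (normalize "/" to ":" before splitting) can be exercised on its own. A self-contained sketch, not the exact SparkSubmitUtils code; the object name, the trimming, and the example coordinate `org.example:example:0.1` (taken from the docs change below) are illustrative:

```scala
// Sketch: "/" is normalized to ":" before splitting, so both
// groupId:artifactId:version and groupId/artifactId:version are accepted.
object CoordinateParsing {
  case class MavenCoordinate(groupId: String, artifactId: String, version: String)

  def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = {
    coordinates.split(",").map { p =>
      val splits = p.replace("/", ":").split(":")
      require(splits.length == 3, s"Provided Maven Coordinates must be in the form " +
        s"'groupId:artifactId:version'. The coordinate provided is: $p")
      MavenCoordinate(splits(0).trim, splits(1).trim, splits(2).trim)
    }
  }

  def main(args: Array[String]): Unit = {
    // Both separators parse to the same coordinate.
    println(extractMavenCoordinates("org.example:example:0.1"))
    println(extractMavenCoordinates("org.example/example:0.1"))
  }
}
```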
@@ -682,6 +692,13 @@ private[spark] object SparkSubmitUtils {
br.setName("central")
cr.add(br)
+ val sp: IBiblioResolver = new IBiblioResolver
+ sp.setM2compatible(true)
+ sp.setUsepoms(true)
+ sp.setRoot("http://dl.bintray.com/spark-packages/maven")
+ sp.setName("spark-packages")
+ cr.add(sp)
+
val repositoryList = remoteRepos.getOrElse("")
// add any other remote repositories other than maven central
if (repositoryList.trim.nonEmpty) {
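For readers unfamiliar with Ivy, a hedged standalone sketch of the resolver chain this hunk builds, using the same classes and settings that appear in the diff (Maven Central first, then the spark-packages repository on Bintray); the object and method names are illustrative:

```scala
import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}

// Sketch of the default chain: "central", then "spark-packages", mirroring the hunk above.
object RepoResolverSketch {
  def defaultChain(): ChainResolver = {
    val cr = new ChainResolver
    cr.setName("list")

    val br = new IBiblioResolver
    br.setM2compatible(true)   // root defaults to Maven Central in m2-compatible mode
    br.setUsepoms(true)
    br.setName("central")
    cr.add(br)

    val sp = new IBiblioResolver
    sp.setM2compatible(true)
    sp.setUsepoms(true)
    sp.setRoot("http://dl.bintray.com/spark-packages/maven")
    sp.setName("spark-packages")
    cr.add(sp)
    cr
  }
}
```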
@@ -794,14 +811,19 @@ private[spark] object SparkSubmitUtils {
val md = getModuleDescriptor
md.setDefaultConf(ivyConfName)
- // Add an exclusion rule for Spark
+ // Add an exclusion rule for Spark and Scala Library
val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*")
val sparkDependencyExcludeRule =
new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
sparkDependencyExcludeRule.addConfiguration(ivyConfName)
+ val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
+ val scalaDependencyExcludeRule =
+ new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
+ scalaDependencyExcludeRule.addConfiguration(ivyConfName)
// Exclude any Spark dependencies, and add all supplied maven artifacts as dependencies
md.addExcludeRule(sparkDependencyExcludeRule)
+ md.addExcludeRule(scalaDependencyExcludeRule)
addDependenciesToIvy(md, artifacts, ivyConfName)
// resolve dependencies
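The exclusion logic above keeps user-supplied packages from pulling a second copy of Spark or of scala-library onto the classpath. A hedged, self-contained sketch of building those two glob-based exclude rules with the same Ivy classes used in the hunk; the wrapper object and method name are illustrative:

```scala
import org.apache.ivy.core.module.descriptor.DefaultExcludeRule
import org.apache.ivy.core.module.id.{ArtifactId, ModuleId}
import org.apache.ivy.core.settings.IvySettings

// Sketch: exclude anything under org.apache.spark and any artifact named scala-library.
object ExcludeRuleSketch {
  def sparkAndScalaExcludes(
      ivySettings: IvySettings,
      ivyConfName: String): Seq[DefaultExcludeRule] = {
    val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*")
    val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
    Seq(sparkArtifacts, scalaArtifacts).map { artifacts =>
      val rule = new DefaultExcludeRule(artifacts, ivySettings.getMatcher("glob"), null)
      rule.addConfiguration(ivyConfName)
      rule
    }
  }
}
```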
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
index 5366535001..ad62b35f62 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -57,20 +57,23 @@ class SparkSubmitUtilsSuite extends FunSuite with BeforeAndAfterAll {
test("create repo resolvers") {
val resolver1 = SparkSubmitUtils.createRepoResolvers(None)
- // should have central by default
- assert(resolver1.getResolvers.size() === 1)
+ // should have central and spark-packages by default
+ assert(resolver1.getResolvers.size() === 2)
assert(resolver1.getResolvers.get(0).asInstanceOf[IBiblioResolver].getName === "central")
+ assert(resolver1.getResolvers.get(1).asInstanceOf[IBiblioResolver].getName === "spark-packages")
val repos = "a/1,b/2,c/3"
val resolver2 = SparkSubmitUtils.createRepoResolvers(Option(repos))
- assert(resolver2.getResolvers.size() === 4)
+ assert(resolver2.getResolvers.size() === 5)
val expected = repos.split(",").map(r => s"$r/")
resolver2.getResolvers.toArray.zipWithIndex.foreach { case (resolver: IBiblioResolver, i) =>
if (i == 0) {
assert(resolver.getName === "central")
+ } else if (i == 1) {
+ assert(resolver.getName === "spark-packages")
} else {
- assert(resolver.getName === s"repo-$i")
- assert(resolver.getRoot === expected(i - 1))
+ assert(resolver.getName === s"repo-${i - 1}")
+ assert(resolver.getRoot === expected(i - 2))
}
}
}
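As a small aid for reading the updated assertions, a hedged sketch of inspecting a resolver chain the way the test does: "central" comes first, "spark-packages" second, and user-supplied repositories follow as repo-1, repo-2, and so on. The object name is illustrative:

```scala
import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}

// Print each resolver's name and root, the same fields the assertions above check.
object InspectResolvers {
  def describe(chain: ChainResolver): Unit = {
    chain.getResolvers.toArray.foreach {
      case r: IBiblioResolver => println(s"${r.getName} -> ${r.getRoot}")
      case other              => println(other)
    }
  }
}
```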
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 118701549a..4e4af76316 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -173,8 +173,11 @@ in-process.
In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the
variable called `sc`. Making your own SparkContext will not work. You can set which master the
context connects to using the `--master` argument, and you can add JARs to the classpath
-by passing a comma-separated list to the `--jars` argument.
-For example, to run `bin/spark-shell` on exactly four cores, use:
+by passing a comma-separated list to the `--jars` argument. You can also add dependencies
+(e.g. Spark Packages) to your shell session by supplying a comma-separated list of Maven coordinates
+to the `--packages` argument. Any additional repositories where dependencies might exist (e.g. Sonatype)
+can be passed to the `--repositories` argument. For example, to run `bin/spark-shell` on exactly
+four cores, use:
{% highlight bash %}
$ ./bin/spark-shell --master local[4]
@@ -186,6 +189,12 @@ Or, to also add `code.jar` to its classpath, use:
$ ./bin/spark-shell --master local[4] --jars code.jar
{% endhighlight %}
+To include a dependency using maven coordinates:
+
+{% highlight bash %}
+$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
+{% endhighlight %}
+
For a complete list of options, run `spark-shell --help`. Behind the scenes,
`spark-shell` invokes the more general [`spark-submit` script](submitting-applications.html).
@@ -196,7 +205,11 @@ For a complete list of options, run `spark-shell --help`. Behind the scenes,
In the PySpark shell, a special interpreter-aware SparkContext is already created for you, in the
variable called `sc`. Making your own SparkContext will not work. You can set which master the
context connects to using the `--master` argument, and you can add Python .zip, .egg or .py files
-to the runtime path by passing a comma-separated list to `--py-files`.
+to the runtime path by passing a comma-separated list to `--py-files`. You can also add dependencies
+(e.g. Spark Packages) to your shell session by supplying a comma-separated list of Maven coordinates
+to the `--packages` argument. Any additional repositories where dependencies might exist (e.g. Sonatype)
+can be passed to the `--repositories` argument. Any Python dependencies a Spark Package has (listed in
+the requirements.txt of that package) must be manually installed using pip when necessary.
For example, to run `bin/pyspark` on exactly four cores, use:
{% highlight bash %}
diff --git a/docs/submitting-applications.md b/docs/submitting-applications.md
index 14a87f8436..57b074778f 100644
--- a/docs/submitting-applications.md
+++ b/docs/submitting-applications.md
@@ -174,6 +174,11 @@ This can use up a significant amount of space over time and will need to be clea
is handled automatically, and with Spark standalone, automatic cleanup can be configured with the
`spark.worker.cleanup.appDataTtl` property.
+Users may also include any other dependencies by supplying a comma-delimited list of Maven coordinates
+with `--packages`. All transitive dependencies will be handled when using this option. Additional
+repositories (or resolvers in SBT) can be added in a comma-delimited fashion with the flag `--repositories`.
+These flags can be used with `pyspark`, `spark-shell`, and `spark-submit` to include Spark Packages.
+
For Python, the equivalent `--py-files` option can be used to distribute `.egg`, `.zip` and `.py` libraries
to executors.
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index f64e25c607..52e82091c9 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1440,31 +1440,59 @@ class SparkSubmitTests(unittest.TestCase):
def tearDown(self):
shutil.rmtree(self.programDir)
- def createTempFile(self, name, content):
+ def createTempFile(self, name, content, dir=None):
"""
Create a temp file with the given name and content and return its path.
Strips leading spaces from content up to the first '|' in each line.
"""
pattern = re.compile(r'^ *\|', re.MULTILINE)
content = re.sub(pattern, '', content.strip())
- path = os.path.join(self.programDir, name)
+ if dir is None:
+ path = os.path.join(self.programDir, name)
+ else:
+ os.makedirs(os.path.join(self.programDir, dir))
+ path = os.path.join(self.programDir, dir, name)
with open(path, "w") as f:
f.write(content)
return path
- def createFileInZip(self, name, content):
+ def createFileInZip(self, name, content, ext=".zip", dir=None, zip_name=None):
"""
Create a zip archive containing a file with the given content and return its path.
Strips leading spaces from content up to the first '|' in each line.
"""
pattern = re.compile(r'^ *\|', re.MULTILINE)
content = re.sub(pattern, '', content.strip())
- path = os.path.join(self.programDir, name + ".zip")
+ if dir is None:
+ path = os.path.join(self.programDir, name + ext)
+ else:
+ path = os.path.join(self.programDir, dir, zip_name + ext)
zip = zipfile.ZipFile(path, 'w')
zip.writestr(name, content)
zip.close()
return path
+ def create_spark_package(self, artifact_name):
+ group_id, artifact_id, version = artifact_name.split(":")
+ self.createTempFile("%s-%s.pom" % (artifact_id, version), ("""
+ |<?xml version="1.0" encoding="UTF-8"?>
+ |<project xmlns="http://maven.apache.org/POM/4.0.0"
+ | xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ | xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ | http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ | <modelVersion>4.0.0</modelVersion>
+ | <groupId>%s</groupId>
+ | <artifactId>%s</artifactId>
+ | <version>%s</version>
+ |</project>
+ """ % (group_id, artifact_id, version)).lstrip(),
+ os.path.join(group_id, artifact_id, version))
+ self.createFileInZip("%s.py" % artifact_id, """
+ |def myfunc(x):
+ | return x + 1
+ """, ".jar", os.path.join(group_id, artifact_id, version),
+ "%s-%s" % (artifact_id, version))
+
def test_single_script(self):
"""Submit and test a single script file"""
script = self.createTempFile("test.py", """
@@ -1533,6 +1561,39 @@ class SparkSubmitTests(unittest.TestCase):
self.assertEqual(0, proc.returncode)
self.assertIn("[2, 3, 4]", out)
+ def test_package_dependency(self):
+ """Submit and test a script with a dependency on a Spark Package"""
+ script = self.createTempFile("test.py", """
+ |from pyspark import SparkContext
+ |from mylib import myfunc
+ |
+ |sc = SparkContext()
+ |print sc.parallelize([1, 2, 3]).map(myfunc).collect()
+ """)
+ self.create_spark_package("a:mylib:0.1")
+ proc = subprocess.Popen([self.sparkSubmit, "--packages", "a:mylib:0.1", "--repositories",
+ "file:" + self.programDir, script], stdout=subprocess.PIPE)
+ out, err = proc.communicate()
+ self.assertEqual(0, proc.returncode)
+ self.assertIn("[2, 3, 4]", out)
+
+ def test_package_dependency_on_cluster(self):
+ """Submit and test a script with a dependency on a Spark Package on a cluster"""
+ script = self.createTempFile("test.py", """
+ |from pyspark import SparkContext
+ |from mylib import myfunc
+ |
+ |sc = SparkContext()
+ |print sc.parallelize([1, 2, 3]).map(myfunc).collect()
+ """)
+ self.create_spark_package("a:mylib:0.1")
+ proc = subprocess.Popen([self.sparkSubmit, "--packages", "a:mylib:0.1", "--repositories",
+ "file:" + self.programDir, "--master",
+ "local-cluster[1,1,512]", script], stdout=subprocess.PIPE)
+ out, err = proc.communicate()
+ self.assertEqual(0, proc.returncode)
+ self.assertIn("[2, 3, 4]", out)
+
def test_single_script_on_cluster(self):
"""Submit and test a single script on a cluster"""
script = self.createTempFile("test.py", """