aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark
diff options
context:
space:
mode:
authorBurak Yavuz <brkyvz@gmail.com>2015-02-17 17:15:43 -0800
committerPatrick Wendell <patrick@databricks.com>2015-02-17 17:23:22 -0800
commitae6cfb3acdbc2721d25793698a4a440f0519dbec (patch)
tree4dba3eaff24a4d042ac6e9e0a3e1b8c5c6108f14 /python/pyspark
parentc3d2b90bde2e11823909605d518167548df66bd8 (diff)
downloadspark-ae6cfb3acdbc2721d25793698a4a440f0519dbec.tar.gz
spark-ae6cfb3acdbc2721d25793698a4a440f0519dbec.tar.bz2
spark-ae6cfb3acdbc2721d25793698a4a440f0519dbec.zip
[SPARK-5811] Added documentation for maven coordinates and added Spark Packages support
Documentation for maven coordinates + Spark Package support. Added pyspark tests for `--packages` Author: Burak Yavuz <brkyvz@gmail.com> Author: Davies Liu <davies@databricks.com> Closes #4662 from brkyvz/SPARK-5811 and squashes the following commits: 56ccccd [Burak Yavuz] fixed broken test 64cb8ee [Burak Yavuz] passed pep8 on local c07b81e [Burak Yavuz] fixed pep8 a8bd6b7 [Burak Yavuz] submit PR 4ef4046 [Burak Yavuz] ready for PR 8fb02e5 [Burak Yavuz] merged master 25c9b9f [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into python-jar 560d13b [Burak Yavuz] before PR 17d3f76 [Davies Liu] support .jar as python package a3eb717 [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into SPARK-5811 c60156d [Burak Yavuz] [SPARK-5811] Added documentation for maven coordinates
Diffstat (limited to 'python/pyspark')
-rw-r--r--python/pyspark/tests.py69
1 file changed, 65 insertions, 4 deletions
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index f64e25c607..52e82091c9 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1440,31 +1440,59 @@ class SparkSubmitTests(unittest.TestCase):
def tearDown(self):
shutil.rmtree(self.programDir)
- def createTempFile(self, name, content):
+ def createTempFile(self, name, content, dir=None):
"""
Create a temp file with the given name and content and return its path.
Strips leading spaces from content up to the first '|' in each line.
"""
pattern = re.compile(r'^ *\|', re.MULTILINE)
content = re.sub(pattern, '', content.strip())
- path = os.path.join(self.programDir, name)
+ if dir is None:
+ path = os.path.join(self.programDir, name)
+ else:
+ os.makedirs(os.path.join(self.programDir, dir))
+ path = os.path.join(self.programDir, dir, name)
with open(path, "w") as f:
f.write(content)
return path
- def createFileInZip(self, name, content):
+ def createFileInZip(self, name, content, ext=".zip", dir=None, zip_name=None):
"""
Create a zip archive containing a file with the given content and return its path.
Strips leading spaces from content up to the first '|' in each line.
"""
pattern = re.compile(r'^ *\|', re.MULTILINE)
content = re.sub(pattern, '', content.strip())
- path = os.path.join(self.programDir, name + ".zip")
+ if dir is None:
+ path = os.path.join(self.programDir, name + ext)
+ else:
+ path = os.path.join(self.programDir, dir, zip_name + ext)
zip = zipfile.ZipFile(path, 'w')
zip.writestr(name, content)
zip.close()
return path
+ def create_spark_package(self, artifact_name):
+ group_id, artifact_id, version = artifact_name.split(":")
+ self.createTempFile("%s-%s.pom" % (artifact_id, version), ("""
+ |<?xml version="1.0" encoding="UTF-8"?>
+ |<project xmlns="http://maven.apache.org/POM/4.0.0"
+ | xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ | xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ | http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ | <modelVersion>4.0.0</modelVersion>
+ | <groupId>%s</groupId>
+ | <artifactId>%s</artifactId>
+ | <version>%s</version>
+ |</project>
+ """ % (group_id, artifact_id, version)).lstrip(),
+ os.path.join(group_id, artifact_id, version))
+ self.createFileInZip("%s.py" % artifact_id, """
+ |def myfunc(x):
+ | return x + 1
+ """, ".jar", os.path.join(group_id, artifact_id, version),
+ "%s-%s" % (artifact_id, version))
+
def test_single_script(self):
"""Submit and test a single script file"""
script = self.createTempFile("test.py", """
@@ -1533,6 +1561,39 @@ class SparkSubmitTests(unittest.TestCase):
self.assertEqual(0, proc.returncode)
self.assertIn("[2, 3, 4]", out)
+ def test_package_dependency(self):
+ """Submit and test a script with a dependency on a Spark Package"""
+ script = self.createTempFile("test.py", """
+ |from pyspark import SparkContext
+ |from mylib import myfunc
+ |
+ |sc = SparkContext()
+ |print sc.parallelize([1, 2, 3]).map(myfunc).collect()
+ """)
+ self.create_spark_package("a:mylib:0.1")
+ proc = subprocess.Popen([self.sparkSubmit, "--packages", "a:mylib:0.1", "--repositories",
+ "file:" + self.programDir, script], stdout=subprocess.PIPE)
+ out, err = proc.communicate()
+ self.assertEqual(0, proc.returncode)
+ self.assertIn("[2, 3, 4]", out)
+
+ def test_package_dependency_on_cluster(self):
+ """Submit and test a script with a dependency on a Spark Package on a cluster"""
+ script = self.createTempFile("test.py", """
+ |from pyspark import SparkContext
+ |from mylib import myfunc
+ |
+ |sc = SparkContext()
+ |print sc.parallelize([1, 2, 3]).map(myfunc).collect()
+ """)
+ self.create_spark_package("a:mylib:0.1")
+ proc = subprocess.Popen([self.sparkSubmit, "--packages", "a:mylib:0.1", "--repositories",
+ "file:" + self.programDir, "--master",
+ "local-cluster[1,1,512]", script], stdout=subprocess.PIPE)
+ out, err = proc.communicate()
+ self.assertEqual(0, proc.returncode)
+ self.assertIn("[2, 3, 4]", out)
+
def test_single_script_on_cluster(self):
"""Submit and test a single script on a cluster"""
script = self.createTempFile("test.py", """