diff options
author | Davies Liu <davies.liu@gmail.com> | 2014-09-12 18:42:50 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@apache.org> | 2014-09-12 18:42:50 -0700 |
commit | 71af030b46a89aaa9a87f18f56b9e1f1cd8ce2e7 (patch) | |
tree | 1cff9839428a177b129de830c3c175b1316a6626 /python/pyspark/tests.py | |
parent | 25311c2c545a60eb9dcf704814d4600987852155 (diff) | |
download | spark-71af030b46a89aaa9a87f18f56b9e1f1cd8ce2e7.tar.gz spark-71af030b46a89aaa9a87f18f56b9e1f1cd8ce2e7.tar.bz2 spark-71af030b46a89aaa9a87f18f56b9e1f1cd8ce2e7.zip |
[SPARK-3094] [PySpark] compatitable with PyPy
After this patch, we can run PySpark in PyPy (testing with PyPy 2.3.1 in Mac 10.9), for example:
```
PYSPARK_PYTHON=pypy ./bin/spark-submit wordcount.py
```
The performance speed up will depend on work load (from 20% to 3000%). Here are some benchmarks:
Job | CPython 2.7 | PyPy 2.3.1 | Speed up
------- | ------------ | ------------- | -------
Word Count | 41s | 15s | 2.7x
Sort | 46s | 44s | 1.05x
Stats | 174s | 3.6s | 48x
Here is the code used for benchmark:
```python
rdd = sc.textFile("text")
def wordcount():
rdd.flatMap(lambda x:x.split('/'))\
.map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y).collectAsMap()
def sort():
rdd.sortBy(lambda x:x, 1).count()
def stats():
sc.parallelize(range(1024), 20).flatMap(lambda x: xrange(5024)).stats()
```
Author: Davies Liu <davies.liu@gmail.com>
Closes #2144 from davies/pypy and squashes the following commits:
9aed6c5 [Davies Liu] use protocol 2 in CloudPickle
4bc1f04 [Davies Liu] refactor
b20ab3a [Davies Liu] pickle sys.stdout and stderr in portable way
3ca2351 [Davies Liu] Merge branch 'master' into pypy
fae8b19 [Davies Liu] improve attrgetter, add tests
591f830 [Davies Liu] try to run tests with PyPy in run-tests
c8d62ba [Davies Liu] cleanup
f651fd0 [Davies Liu] fix tests using array with PyPy
1b98fb3 [Davies Liu] serialize itemgetter/attrgetter in portable ways
3c1dbfe [Davies Liu] Merge branch 'master' into pypy
42fb5fa [Davies Liu] Merge branch 'master' into pypy
cb2d724 [Davies Liu] fix tests
9986692 [Davies Liu] Merge branch 'master' into pypy
25b4ca7 [Davies Liu] support PyPy
Diffstat (limited to 'python/pyspark/tests.py')
-rw-r--r-- | python/pyspark/tests.py | 85 |
1 files changed, 76 insertions, 9 deletions
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index bb84ebe72c..2e7c2750a8 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -31,6 +31,7 @@ import tempfile import time import zipfile import random +from platform import python_implementation if sys.version_info[:2] <= (2, 6): import unittest2 as unittest @@ -41,7 +42,8 @@ else: from pyspark.conf import SparkConf from pyspark.context import SparkContext from pyspark.files import SparkFiles -from pyspark.serializers import read_int, BatchedSerializer, MarshalSerializer, PickleSerializer +from pyspark.serializers import read_int, BatchedSerializer, MarshalSerializer, PickleSerializer, \ + CloudPickleSerializer from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, ExternalSorter from pyspark.sql import SQLContext, IntegerType @@ -168,15 +170,46 @@ class SerializationTestCase(unittest.TestCase): p2 = loads(dumps(p1, 2)) self.assertEquals(p1, p2) - -# Regression test for SPARK-3415 -class CloudPickleTest(unittest.TestCase): + def test_itemgetter(self): + from operator import itemgetter + ser = CloudPickleSerializer() + d = range(10) + getter = itemgetter(1) + getter2 = ser.loads(ser.dumps(getter)) + self.assertEqual(getter(d), getter2(d)) + + getter = itemgetter(0, 3) + getter2 = ser.loads(ser.dumps(getter)) + self.assertEqual(getter(d), getter2(d)) + + def test_attrgetter(self): + from operator import attrgetter + ser = CloudPickleSerializer() + + class C(object): + def __getattr__(self, item): + return item + d = C() + getter = attrgetter("a") + getter2 = ser.loads(ser.dumps(getter)) + self.assertEqual(getter(d), getter2(d)) + getter = attrgetter("a", "b") + getter2 = ser.loads(ser.dumps(getter)) + self.assertEqual(getter(d), getter2(d)) + + d.e = C() + getter = attrgetter("e.a") + getter2 = ser.loads(ser.dumps(getter)) + self.assertEqual(getter(d), getter2(d)) + getter = attrgetter("e.a", "e.b") + getter2 = ser.loads(ser.dumps(getter)) + self.assertEqual(getter(d), getter2(d)) + + # Regression test for SPARK-3415 def test_pickling_file_handles(self): - from pyspark.cloudpickle import dumps - from StringIO import StringIO - from pickle import load + ser = CloudPickleSerializer() out1 = sys.stderr - out2 = load(StringIO(dumps(out1))) + out2 = ser.loads(ser.dumps(out1)) self.assertEquals(out1, out2) @@ -861,9 +894,43 @@ class TestOutputFormat(PySparkTestCase): conf=input_conf).collect()) self.assertEqual(old_dataset, dict_data) - @unittest.skipIf(sys.version_info[:2] <= (2, 6), "Skipped on 2.6 until SPARK-2951 is fixed") def test_newhadoop(self): basepath = self.tempdir.name + data = [(1, ""), + (1, "a"), + (2, "bcdf")] + self.sc.parallelize(data).saveAsNewAPIHadoopFile( + basepath + "/newhadoop/", + "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text") + result = sorted(self.sc.newAPIHadoopFile( + basepath + "/newhadoop/", + "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text").collect()) + self.assertEqual(result, data) + + conf = { + "mapreduce.outputformat.class": + "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat", + "mapred.output.key.class": "org.apache.hadoop.io.IntWritable", + "mapred.output.value.class": "org.apache.hadoop.io.Text", + "mapred.output.dir": basepath + "/newdataset/" + } + self.sc.parallelize(data).saveAsNewAPIHadoopDataset(conf) + input_conf = {"mapred.input.dir": basepath + "/newdataset/"} + new_dataset = sorted(self.sc.newAPIHadoopRDD( + "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text", + conf=input_conf).collect()) + self.assertEqual(new_dataset, data) + + @unittest.skipIf(sys.version_info[:2] <= (2, 6) or python_implementation() == "PyPy", + "Skipped on 2.6 and PyPy until SPARK-2951 is fixed") + def test_newhadoop_with_array(self): + basepath = self.tempdir.name # use custom ArrayWritable types and converters to handle arrays array_data = [(1, array('d')), (1, array('d', [1.0, 2.0, 3.0])), |