path: root/python/pyspark/tests.py
author    Davies Liu <davies.liu@gmail.com>    2014-09-12 18:42:50 -0700
committer Josh Rosen <joshrosen@apache.org>    2014-09-12 18:42:50 -0700
commit    71af030b46a89aaa9a87f18f56b9e1f1cd8ce2e7 (patch)
tree      1cff9839428a177b129de830c3c175b1316a6626 /python/pyspark/tests.py
parent    25311c2c545a60eb9dcf704814d4600987852155 (diff)
[SPARK-3094] [PySpark] compatible with PyPy
After this patch, we can run PySpark in PyPy (tested with PyPy 2.3.1 on Mac OS X 10.9), for example:

```
PYSPARK_PYTHON=pypy ./bin/spark-submit wordcount.py
```

The performance speedup depends on the workload (from 20% to 3000%). Here are some benchmarks:

Job | CPython 2.7 | PyPy 2.3.1 | Speedup
------- | ------------ | ------------- | -------
Word Count | 41s | 15s | 2.7x
Sort | 46s | 44s | 1.05x
Stats | 174s | 3.6s | 48x

Here is the code used for the benchmarks:

```python
rdd = sc.textFile("text")

def wordcount():
    rdd.flatMap(lambda x: x.split('/'))\
        .map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).collectAsMap()

def sort():
    rdd.sortBy(lambda x: x, 1).count()

def stats():
    sc.parallelize(range(1024), 20).flatMap(lambda x: xrange(5024)).stats()
```

Author: Davies Liu <davies.liu@gmail.com>

Closes #2144 from davies/pypy and squashes the following commits:

9aed6c5 [Davies Liu] use protocol 2 in CloudPickle
4bc1f04 [Davies Liu] refactor
b20ab3a [Davies Liu] pickle sys.stdout and stderr in portable way
3ca2351 [Davies Liu] Merge branch 'master' into pypy
fae8b19 [Davies Liu] improve attrgetter, add tests
591f830 [Davies Liu] try to run tests with PyPy in run-tests
c8d62ba [Davies Liu] cleanup
f651fd0 [Davies Liu] fix tests using array with PyPy
1b98fb3 [Davies Liu] serialize itemgetter/attrgetter in portable ways
3c1dbfe [Davies Liu] Merge branch 'master' into pypy
42fb5fa [Davies Liu] Merge branch 'master' into pypy
cb2d724 [Davies Liu] fix tests
9986692 [Davies Liu] Merge branch 'master' into pypy
25b4ca7 [Davies Liu] support PyPy
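To see what the portable serialization buys, here is a minimal sketch (not part of the commit) of the round trip that the new `test_itemgetter`/`test_attrgetter` cases in the diff below exercise; it assumes only that `pyspark` is importable, with no SparkContext required:

```python
# Sketch mirroring the new tests: itemgetter/attrgetter are C-level
# callables that this patch teaches CloudPickle to serialize portably,
# so the same code works under CPython and PyPy.
from operator import attrgetter, itemgetter

from pyspark.serializers import CloudPickleSerializer

ser = CloudPickleSerializer()

getter = itemgetter(1)
restored = ser.loads(ser.dumps(getter))
assert restored(range(10)) == getter(range(10)) == 1

# attrgetter also supports dotted paths, which the tests cover via "e.a".
class Box(object):
    pass

outer = Box()
outer.inner = Box()
outer.inner.value = 42
deep = attrgetter("inner.value")
restored = ser.loads(ser.dumps(deep))
assert restored(outer) == deep(outer) == 42
```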
Diffstat (limited to 'python/pyspark/tests.py')
-rw-r--r--  python/pyspark/tests.py  |  85
1 file changed, 76 insertions(+), 9 deletions(-)
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index bb84ebe72c..2e7c2750a8 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -31,6 +31,7 @@ import tempfile
import time
import zipfile
import random
+from platform import python_implementation
if sys.version_info[:2] <= (2, 6):
import unittest2 as unittest
@@ -41,7 +42,8 @@ else:
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.files import SparkFiles
-from pyspark.serializers import read_int, BatchedSerializer, MarshalSerializer, PickleSerializer
+from pyspark.serializers import read_int, BatchedSerializer, MarshalSerializer, PickleSerializer, \
+ CloudPickleSerializer
from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, ExternalSorter
from pyspark.sql import SQLContext, IntegerType
@@ -168,15 +170,46 @@ class SerializationTestCase(unittest.TestCase):
p2 = loads(dumps(p1, 2))
self.assertEquals(p1, p2)
-
-# Regression test for SPARK-3415
-class CloudPickleTest(unittest.TestCase):
+ def test_itemgetter(self):
+ from operator import itemgetter
+ ser = CloudPickleSerializer()
+ d = range(10)
+ getter = itemgetter(1)
+ getter2 = ser.loads(ser.dumps(getter))
+ self.assertEqual(getter(d), getter2(d))
+
+ getter = itemgetter(0, 3)
+ getter2 = ser.loads(ser.dumps(getter))
+ self.assertEqual(getter(d), getter2(d))
+
+ def test_attrgetter(self):
+ from operator import attrgetter
+ ser = CloudPickleSerializer()
+
+ class C(object):
+ def __getattr__(self, item):
+ return item
+ d = C()
+ getter = attrgetter("a")
+ getter2 = ser.loads(ser.dumps(getter))
+ self.assertEqual(getter(d), getter2(d))
+ getter = attrgetter("a", "b")
+ getter2 = ser.loads(ser.dumps(getter))
+ self.assertEqual(getter(d), getter2(d))
+
+ d.e = C()
+ getter = attrgetter("e.a")
+ getter2 = ser.loads(ser.dumps(getter))
+ self.assertEqual(getter(d), getter2(d))
+ getter = attrgetter("e.a", "e.b")
+ getter2 = ser.loads(ser.dumps(getter))
+ self.assertEqual(getter(d), getter2(d))
+
+ # Regression test for SPARK-3415
def test_pickling_file_handles(self):
- from pyspark.cloudpickle import dumps
- from StringIO import StringIO
- from pickle import load
+ ser = CloudPickleSerializer()
out1 = sys.stderr
- out2 = load(StringIO(dumps(out1)))
+ out2 = ser.loads(ser.dumps(out1))
self.assertEquals(out1, out2)
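For context on the SPARK-3415 hunk above: file handles are not picklable with the standard library, which is why the regression test now routes through `CloudPickleSerializer`, which pickles `sys.stdout` and `sys.stderr` in a portable way (per the commit log). A minimal sketch of both behaviors, assuming only `pyspark` on the path:

```python
import pickle
import sys

from pyspark.serializers import CloudPickleSerializer

# The stock pickler rejects file objects outright ...
try:
    pickle.dumps(sys.stderr)
except TypeError as exc:
    print("plain pickle fails: %s" % exc)

# ... while CloudPickle records the standard streams by name, so the
# round trip compares equal to the live stream, as the test asserts.
ser = CloudPickleSerializer()
assert ser.loads(ser.dumps(sys.stderr)) == sys.stderr
```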
@@ -861,9 +894,43 @@ class TestOutputFormat(PySparkTestCase):
conf=input_conf).collect())
self.assertEqual(old_dataset, dict_data)
- @unittest.skipIf(sys.version_info[:2] <= (2, 6), "Skipped on 2.6 until SPARK-2951 is fixed")
def test_newhadoop(self):
basepath = self.tempdir.name
+ data = [(1, ""),
+ (1, "a"),
+ (2, "bcdf")]
+ self.sc.parallelize(data).saveAsNewAPIHadoopFile(
+ basepath + "/newhadoop/",
+ "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.Text")
+ result = sorted(self.sc.newAPIHadoopFile(
+ basepath + "/newhadoop/",
+ "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.Text").collect())
+ self.assertEqual(result, data)
+
+ conf = {
+ "mapreduce.outputformat.class":
+ "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+ "mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
+ "mapred.output.value.class": "org.apache.hadoop.io.Text",
+ "mapred.output.dir": basepath + "/newdataset/"
+ }
+ self.sc.parallelize(data).saveAsNewAPIHadoopDataset(conf)
+ input_conf = {"mapred.input.dir": basepath + "/newdataset/"}
+ new_dataset = sorted(self.sc.newAPIHadoopRDD(
+ "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.Text",
+ conf=input_conf).collect())
+ self.assertEqual(new_dataset, data)
+
+ @unittest.skipIf(sys.version_info[:2] <= (2, 6) or python_implementation() == "PyPy",
+ "Skipped on 2.6 and PyPy until SPARK-2951 is fixed")
+ def test_newhadoop_with_array(self):
+ basepath = self.tempdir.name
# use custom ArrayWritable types and converters to handle arrays
array_data = [(1, array('d')),
(1, array('d', [1.0, 2.0, 3.0])),
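The `skipIf` guard introduced in this hunk is a general pattern for gating interpreter-specific tests; here is a self-contained sketch of the same idiom, independent of Spark:

```python
import sys
import unittest
from platform import python_implementation


class InterpreterGatedTest(unittest.TestCase):

    @unittest.skipIf(sys.version_info[:2] <= (2, 6) or python_implementation() == "PyPy",
                     "relies on CPython >= 2.7 behavior")
    def test_cpython_only(self):
        # Runs only on CPython 2.7+; PyPy reports "PyPy" here.
        self.assertEqual(python_implementation(), "CPython")


if __name__ == "__main__":
    unittest.main()
```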