author     Davies Liu <davies@databricks.com>     2014-11-03 23:56:14 -0800
committer  Josh Rosen <joshrosen@databricks.com>  2014-11-03 23:56:14 -0800
commit     e4f42631a68b473ce706429915f3f08042af2119 (patch)
tree       557ff754b9936addfb9628bfcba462802ff6ec1c /python/pyspark/tests.py
parent     b671ce047d036b8923007902826038b01e836e8a (diff)
[SPARK-3886] [PySpark] simplify serializer, use AutoBatchedSerializer by default.
This PR simplifies the serializers: it always uses a batched serializer (AutoBatchedSerializer by default), even when the batch size is 1.

Author: Davies Liu <davies@databricks.com>

This patch had conflicts when merged, resolved by
Committer: Josh Rosen <joshrosen@databricks.com>

Closes #2920 from davies/fix_autobatch and squashes the following commits:

e544ef9 [Davies Liu] revert unrelated change
6880b14 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
1d557fc [Davies Liu] fix tests
8180907 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
76abdce [Davies Liu] clean up
53fa60b [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
d7ac751 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
2cc2497 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
b4292ce [Davies Liu] fix bug in master
d79744c [Davies Liu] recover hive tests
be37ece [Davies Liu] refactor
eb3938d [Davies Liu] refactor serializer in scala
8d77ef2 [Davies Liu] simplify serializer, use AutoBatchedSerializer by default.
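For context: the point of AutoBatchedSerializer is that the batch size is tuned from the observed serialized size of each batch, instead of being a fixed constructor argument picked by the caller. A minimal standalone sketch of that feedback loop follows; the best_size default and the doubling/halving thresholds are illustrative assumptions, not the exact PySpark internals.

import itertools
import pickle

def auto_batched_dump(iterator, write, best_size=1 << 16):
    # Sketch of auto-batching: serialize items in growing batches,
    # doubling the batch size while serialized batches stay small and
    # halving it when they overshoot well past best_size.
    batch_size = 1
    iterator = iter(iterator)
    while True:
        batch = list(itertools.islice(iterator, batch_size))
        if not batch:
            break
        data = pickle.dumps(batch)
        write(data)
        if len(data) < best_size:
            batch_size *= 2            # cheap batches: batch more aggressively
        elif len(data) > best_size * 10 and batch_size > 1:
            batch_size //= 2           # oversized batches: back off

chunks = []
auto_batched_dump(range(100000), chunks.append)
print(len(chunks), "batches written")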
Diffstat (limited to 'python/pyspark/tests.py')
-rw-r--r--  python/pyspark/tests.py | 66
1 file changed, 12 insertions(+), 54 deletions(-)
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index e947b09468..7e61b017ef 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -242,7 +242,7 @@ class PySparkTestCase(unittest.TestCase):
     def setUp(self):
         self._old_sys_path = list(sys.path)
         class_name = self.__class__.__name__
-        self.sc = SparkContext('local[4]', class_name, batchSize=2)
+        self.sc = SparkContext('local[4]', class_name)

     def tearDown(self):
         self.sc.stop()
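After this hunk the fixtures construct the SparkContext without a batchSize argument and let the default serializer batch automatically; a caller that needs a specific policy can still be explicit. A hedged sketch of both styles, assuming the serializer/batchSize keyword arguments of the PySpark API of this era:

from pyspark import SparkContext
from pyspark.serializers import PickleSerializer

# Default: batching is chosen automatically by the serializer.
sc = SparkContext('local[4]', 'SerializerDemo')
print(sc.parallelize(range(10)).map(lambda x: x * x).collect())
sc.stop()

# Explicit batching is still available for callers that want it;
# per this commit, even batchSize=1 goes through a batched serializer.
sc = SparkContext('local[4]', 'SerializerDemo',
                  serializer=PickleSerializer(), batchSize=1)
sc.stop()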
@@ -253,7 +253,7 @@ class ReusedPySparkTestCase(unittest.TestCase):

     @classmethod
     def setUpClass(cls):
-        cls.sc = SparkContext('local[4]', cls.__name__, batchSize=2)
+        cls.sc = SparkContext('local[4]', cls.__name__)

     @classmethod
     def tearDownClass(cls):
@@ -671,7 +671,7 @@ class ProfilerTests(PySparkTestCase):
         self._old_sys_path = list(sys.path)
         class_name = self.__class__.__name__
         conf = SparkConf().set("spark.python.profile", "true")
-        self.sc = SparkContext('local[4]', class_name, batchSize=2, conf=conf)
+        self.sc = SparkContext('local[4]', class_name, conf=conf)

     def test_profiler(self):
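The profiler fixture changes only in dropping batchSize; enabling the Python profiler itself is untouched. A small usage sketch of the flag this fixture sets, using show_profiles as exercised elsewhere in this test file:

from pyspark import SparkConf, SparkContext

conf = SparkConf().set("spark.python.profile", "true")
sc = SparkContext('local[4]', 'ProfileDemo', conf=conf)
try:
    sc.parallelize(range(1000)).map(lambda x: x + 1).count()
    sc.show_profiles()  # print accumulated cProfile stats per RDD
finally:
    sc.stop()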
@@ -1012,16 +1012,19 @@ class InputFormatTests(ReusedPySparkTestCase):
         clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
                                             "org.apache.hadoop.io.Text",
                                             "org.apache.spark.api.python.TestWritable").collect())
-        ec = (u'1',
-              {u'__class__': u'org.apache.spark.api.python.TestWritable',
-               u'double': 54.0, u'int': 123, u'str': u'test1'})
-        self.assertEqual(clazz[0], ec)
+        cname = u'org.apache.spark.api.python.TestWritable'
+        ec = [(u'1', {u'__class__': cname, u'double': 1.0, u'int': 1, u'str': u'test1'}),
+              (u'2', {u'__class__': cname, u'double': 2.3, u'int': 2, u'str': u'test2'}),
+              (u'3', {u'__class__': cname, u'double': 3.1, u'int': 3, u'str': u'test3'}),
+              (u'4', {u'__class__': cname, u'double': 4.2, u'int': 4, u'str': u'test4'}),
+              (u'5', {u'__class__': cname, u'double': 5.5, u'int': 5, u'str': u'test56'})]
+        self.assertEqual(clazz, ec)

         unbatched_clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
                                                       "org.apache.hadoop.io.Text",
                                                       "org.apache.spark.api.python.TestWritable",
-                                                      batchSize=1).collect())
-        self.assertEqual(unbatched_clazz, ec)
+                                                      ).collect())
+        self.assertEqual(unbatched_clazz, ec)

     def test_oldhadoop(self):
         basepath = self.tempdir.name
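The test now compares the whole collected list rather than only clazz[0], and the second read drops batchSize=1 while still expecting identical results. Reading such a file outside the harness looks the same; in this sketch the path is illustrative and TestWritable is a Spark test fixture, so it only runs with Spark's test classes on the classpath:

from pyspark import SparkContext

sc = SparkContext('local[4]', 'SequenceFileDemo')
try:
    pairs = sorted(sc.sequenceFile(
        "/tmp/sftestdata/sfclass/",  # illustrative path
        "org.apache.hadoop.io.Text",
        "org.apache.spark.api.python.TestWritable").collect())
    print(pairs[0])
finally:
    sc.stop()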
@@ -1341,51 +1344,6 @@ class OutputFormatTests(ReusedPySparkTestCase):
         result5 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newdataset").collect())
         self.assertEqual(result5, data)

-    def test_unbatched_save_and_read(self):
-        basepath = self.tempdir.name
-        ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
-        self.sc.parallelize(ei, len(ei)).saveAsSequenceFile(
-            basepath + "/unbatched/")
-
-        unbatched_sequence = sorted(self.sc.sequenceFile(
-            basepath + "/unbatched/",
-            batchSize=1).collect())
-        self.assertEqual(unbatched_sequence, ei)
-
-        unbatched_hadoopFile = sorted(self.sc.hadoopFile(
-            basepath + "/unbatched/",
-            "org.apache.hadoop.mapred.SequenceFileInputFormat",
-            "org.apache.hadoop.io.IntWritable",
-            "org.apache.hadoop.io.Text",
-            batchSize=1).collect())
-        self.assertEqual(unbatched_hadoopFile, ei)
-
-        unbatched_newAPIHadoopFile = sorted(self.sc.newAPIHadoopFile(
-            basepath + "/unbatched/",
-            "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
-            "org.apache.hadoop.io.IntWritable",
-            "org.apache.hadoop.io.Text",
-            batchSize=1).collect())
-        self.assertEqual(unbatched_newAPIHadoopFile, ei)
-
-        oldconf = {"mapred.input.dir": basepath + "/unbatched/"}
-        unbatched_hadoopRDD = sorted(self.sc.hadoopRDD(
-            "org.apache.hadoop.mapred.SequenceFileInputFormat",
-            "org.apache.hadoop.io.IntWritable",
-            "org.apache.hadoop.io.Text",
-            conf=oldconf,
-            batchSize=1).collect())
-        self.assertEqual(unbatched_hadoopRDD, ei)
-
-        newconf = {"mapred.input.dir": basepath + "/unbatched/"}
-        unbatched_newAPIHadoopRDD = sorted(self.sc.newAPIHadoopRDD(
-            "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
-            "org.apache.hadoop.io.IntWritable",
-            "org.apache.hadoop.io.Text",
-            conf=newconf,
-            batchSize=1).collect())
-        self.assertEqual(unbatched_newAPIHadoopRDD, ei)
-
     def test_malformed_RDD(self):
         basepath = self.tempdir.name
         # non-batch-serialized RDD[[(K, V)]] should be rejected
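The deleted test_unbatched_save_and_read pinned batchSize=1 on every read path (sequenceFile, hadoopFile, newAPIHadoopFile, hadoopRDD, newAPIHadoopRDD); with auto-batching those variants collapse into the default round trip. A minimal sketch of the equivalent save-and-read cycle, with an illustrative temp path:

import tempfile
from pyspark import SparkContext

sc = SparkContext('local[4]', 'RoundTripDemo')
try:
    path = tempfile.mkdtemp() + "/unbatched/"  # must not exist yet
    ei = [(1, u'aa'), (2, u'bb'), (3, u'cc')]
    sc.parallelize(ei, len(ei)).saveAsSequenceFile(path)
    # No batchSize hint: the default serializer handles batching.
    assert sorted(sc.sequenceFile(path).collect()) == ei
finally:
    sc.stop()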