diff options
author | zsxwing <zsxwing@gmail.com> | 2015-06-17 13:59:39 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-06-17 13:59:39 -0700 |
commit | 0fc4b96f3e3bf81724ac133a6acc97c1b77271b4 (patch) | |
tree | 7d5e783967ac602ce979fe0f15a3d3613f44a4f8 | |
parent | 2837e067099921dd4ab6639ac5f6e89f789d4ff4 (diff) | |
download | spark-0fc4b96f3e3bf81724ac133a6acc97c1b77271b4.tar.gz spark-0fc4b96f3e3bf81724ac133a6acc97c1b77271b4.tar.bz2 spark-0fc4b96f3e3bf81724ac133a6acc97c1b77271b4.zip |
[SPARK-8373] [PYSPARK] Add emptyRDD to pyspark and fix the issue when calling sum on an empty RDD
This PR fixes the sum issue and also adds `emptyRDD` so that it's easy to create a test case.
Author: zsxwing <zsxwing@gmail.com>
Closes #6826 from zsxwing/python-emptyRDD and squashes the following commits:
b36993f [zsxwing] Update the return type to JavaRDD[T]
71df047 [zsxwing] Add emptyRDD to pyspark and fix the issue when calling sum on an empty RDD
-rw-r--r-- | core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 5 | ||||
-rw-r--r-- | python/pyspark/context.py | 6 | ||||
-rw-r--r-- | python/pyspark/rdd.py | 2 | ||||
-rw-r--r-- | python/pyspark/tests.py | 8 |
4 files changed, 20 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 55a37f8c94..0103f6c6ab 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -425,6 +425,11 @@ private[spark] object PythonRDD extends Logging { iter.foreach(write) } + /** Create an RDD that has no partitions or elements. */ + def emptyRDD[T](sc: JavaSparkContext): JavaRDD[T] = { + sc.emptyRDD[T] + } + /** * Create an RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]], * key and value class. diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 44d90f1437..90b2fffbb9 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -324,6 +324,12 @@ class SparkContext(object): with SparkContext._lock: SparkContext._active_spark_context = None + def emptyRDD(self): + """ + Create an RDD that has no partitions or elements. 
+ """ + return RDD(self._jsc.emptyRDD(), self, NoOpSerializer()) + def range(self, start, end=None, step=1, numSlices=None): """ Create a new RDD of int containing elements from `start` to `end` diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 98a8ff8606..20c0bc93f4 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -960,7 +960,7 @@ class RDD(object): >>> sc.parallelize([1.0, 2.0, 3.0]).sum() 6.0 """ - return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add) + return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add) def count(self): """ diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index f9fb37f7fc..11b402e6df 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -458,6 +458,14 @@ class RDDTests(ReusedPySparkTestCase): self.assertEqual(id + 1, id2) self.assertEqual(id2, rdd2.id()) + def test_empty_rdd(self): + rdd = self.sc.emptyRDD() + self.assertTrue(rdd.isEmpty()) + + def test_sum(self): + self.assertEqual(0, self.sc.emptyRDD().sum()) + self.assertEqual(6, self.sc.parallelize([1, 2, 3]).sum()) + def test_save_as_textfile_with_unicode(self): # Regression test for SPARK-970 x = u"\u00A1Hola, mundo!" |