author | Davies Liu <davies.liu@gmail.com> | 2014-09-13 22:31:21 -0700
committer | Josh Rosen <joshrosen@apache.org> | 2014-09-13 22:31:21 -0700
commit | 4e3fbe8cdb6c6291e219195abb272f3c81f0ed63 (patch)
tree | 684e60e57003fa6441a384a93cd6b0e746445c95 /python/pyspark/tests.py
parent | 2aea0da84c58a179917311290083456dfa043db7 (diff)
download | spark-4e3fbe8cdb6c6291e219195abb272f3c81f0ed63.tar.gz spark-4e3fbe8cdb6c6291e219195abb272f3c81f0ed63.tar.bz2 spark-4e3fbe8cdb6c6291e219195abb272f3c81f0ed63.zip
[SPARK-3463] [PySpark] aggregate and show spilled bytes in Python
Aggregate the number of bytes spilled to disk during aggregation or sorting, and show the total in the Web UI.
![spilled](https://cloud.githubusercontent.com/assets/40902/4209758/4b995562-386d-11e4-97c1-8e838ee1d4e3.png)
This patch is blocked by SPARK-3465 (it includes a fix for that issue).
Author: Davies Liu <davies.liu@gmail.com>
Closes #2336 from davies/metrics and squashes the following commits:
e37df38 [Davies Liu] remove outdated comments
1245eb7 [Davies Liu] remove the temporary fix
ebd2f43 [Davies Liu] Merge branch 'master' into metrics
7e4ad04 [Davies Liu] Merge branch 'master' into metrics
fbe9029 [Davies Liu] show spilled bytes in Python in web ui
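For context, the change replaces the per-sorter counter `sorter._spilled_bytes` with a module-level counter, `pyspark.shuffle.DiskBytesSpilled`, that every spilling component can add to, so the worker can report one aggregated value per task to the Web UI. The snippet below is a minimal, self-contained sketch of that pattern; the `spill_to_disk` helper and its simplified accounting are illustrative assumptions, not PySpark's actual implementation.

```python
import os
import tempfile

# Module-level counter, mirroring the role of pyspark.shuffle.DiskBytesSpilled:
# every component that spills to disk adds the bytes it wrote, so one
# aggregated total per process is available for reporting (simplified sketch).
DiskBytesSpilled = 0


def spill_to_disk(items):
    """Hypothetical helper: write items to a temp file and account the bytes."""
    global DiskBytesSpilled
    with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".spill") as f:
        for x in items:
            f.write("%s\n" % x)
        path = f.name
    DiskBytesSpilled += os.path.getsize(path)  # accumulate, never reset here
    return path


if __name__ == "__main__":
    spill_to_disk(range(1000))   # e.g. a sorter spilling one batch
    spill_to_disk(range(2000))   # e.g. a merger spilling another batch
    # A single counter now reflects all spills in this process, which is what
    # the worker would forward for display in the Web UI.
    print("DiskBytesSpilled =", DiskBytesSpilled)
```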
Diffstat (limited to 'python/pyspark/tests.py')
-rw-r--r-- | python/pyspark/tests.py | 15
1 file changed, 8 insertions, 7 deletions
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 747cd1767d..f3309a20fc 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -46,6 +46,7 @@ from pyspark.serializers import read_int, BatchedSerializer, MarshalSerializer,
     CloudPickleSerializer
 from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, ExternalSorter
 from pyspark.sql import SQLContext, IntegerType
+from pyspark import shuffle
 
 _have_scipy = False
 _have_numpy = False
@@ -138,17 +139,17 @@ class TestSorter(unittest.TestCase):
         random.shuffle(l)
         sorter = ExternalSorter(1)
         self.assertEquals(sorted(l), list(sorter.sorted(l)))
-        self.assertGreater(sorter._spilled_bytes, 0)
-        last = sorter._spilled_bytes
+        self.assertGreater(shuffle.DiskBytesSpilled, 0)
+        last = shuffle.DiskBytesSpilled
         self.assertEquals(sorted(l, reverse=True), list(sorter.sorted(l, reverse=True)))
-        self.assertGreater(sorter._spilled_bytes, last)
-        last = sorter._spilled_bytes
+        self.assertGreater(shuffle.DiskBytesSpilled, last)
+        last = shuffle.DiskBytesSpilled
         self.assertEquals(sorted(l, key=lambda x: -x), list(sorter.sorted(l, key=lambda x: -x)))
-        self.assertGreater(sorter._spilled_bytes, last)
-        last = sorter._spilled_bytes
+        self.assertGreater(shuffle.DiskBytesSpilled, last)
+        last = shuffle.DiskBytesSpilled
         self.assertEquals(sorted(l, key=lambda x: -x, reverse=True),
                           list(sorter.sorted(l, key=lambda x: -x, reverse=True)))
-        self.assertGreater(sorter._spilled_bytes, last)
+        self.assertGreater(shuffle.DiskBytesSpilled, last)
 
     def test_external_sort_in_rdd(self):
         conf = SparkConf().set("spark.python.worker.memory", "1m")
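As a usage note, the same counter the test inspects can be read directly around any ExternalSorter run. Everything referenced below (`ExternalSorter(1)`, `sorter.sorted`, `shuffle.DiskBytesSpilled`) appears in the diff above, but whether a given workload actually spills depends on the memory limit and data size, so treat this as a sketch rather than a guaranteed output.

```python
import random

from pyspark import shuffle
from pyspark.shuffle import ExternalSorter

# Same pattern as the updated test: sort with a tiny memory limit so the
# sorter spills to disk, then read the module-level counter afterwards.
data = list(range(1024))
random.shuffle(data)

sorter = ExternalSorter(1)          # small memory limit, as used in tests.py
result = list(sorter.sorted(data))

assert result == sorted(data)
print("DiskBytesSpilled:", shuffle.DiskBytesSpilled)
```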