author	Davies Liu <davies.liu@gmail.com>	2014-09-13 22:31:21 -0700
committer	Josh Rosen <joshrosen@apache.org>	2014-09-13 22:31:21 -0700
commit	4e3fbe8cdb6c6291e219195abb272f3c81f0ed63 (patch)
tree	684e60e57003fa6441a384a93cd6b0e746445c95 /python/pyspark/tests.py
parent	2aea0da84c58a179917311290083456dfa043db7 (diff)
[SPARK-3463] [PySpark] aggregate and show spilled bytes in Python
Aggregate the number of bytes spilled to disk during aggregation or sorting, and show the total in the Web UI.

![spilled](https://cloud.githubusercontent.com/assets/40902/4209758/4b995562-386d-11e4-97c1-8e838ee1d4e3.png)

This patch is blocked by SPARK-3465 (it includes a fix for that issue).

Author: Davies Liu <davies.liu@gmail.com>

Closes #2336 from davies/metrics and squashes the following commits:

e37df38 [Davies Liu] remove outdated comments
1245eb7 [Davies Liu] remove the temporary fix
ebd2f43 [Davies Liu] Merge branch 'master' into metrics
7e4ad04 [Davies Liu] Merge branch 'master' into metrics
fbe9029 [Davies Liu] show spilled bytes in Python in web ui
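The design at work here is a process-wide counter in the `pyspark.shuffle` module rather than a private per-object field: every merger or sorter that spills adds the size of what it wrote to one shared number, which the worker can then report for the Web UI. The sketch below illustrates that pattern in miniature; `DiskBytesSpilled` matches the counter name the diff references, but `TinySorter` and its always-spill behavior are hypothetical stand-ins, not PySpark's actual implementation.

```python
import os
import tempfile

# Module-level counter: every spiller in the process adds to it,
# mirroring the role of pyspark.shuffle.DiskBytesSpilled in the diff below.
DiskBytesSpilled = 0


class TinySorter(object):
    """Hypothetical sorter that spills one sorted run to disk."""

    def _spill(self, items):
        """Write a sorted run to a temp file and record its size."""
        global DiskBytesSpilled
        fd, path = tempfile.mkstemp()
        with os.fdopen(fd, "w") as f:
            for x in sorted(items):
                f.write("%s\n" % x)
        # Aggregate into the shared counter instead of a per-instance field.
        DiskBytesSpilled += os.path.getsize(path)
        return path

    def sorted(self, items):
        # Real code would spill only when over a memory limit and then
        # merge the runs; here we always spill once to exercise the counter.
        path = self._spill(items)
        with open(path) as f:
            result = [int(line) for line in f]
        os.remove(path)
        return result


if __name__ == "__main__":
    sorter = TinySorter()
    print(sorter.sorted([3, 1, 2]))   # [1, 2, 3]
    print(DiskBytesSpilled > 0)       # True: the spill size was recorded
```

Because the counter lives on the module, spills from any number of sorter and merger instances in the same worker accumulate into one figure, which is what lets the UI show a single per-task total.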
Diffstat (limited to 'python/pyspark/tests.py')
-rw-r--r--	python/pyspark/tests.py	15
1 file changed, 8 insertions(+), 7 deletions(-)
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 747cd1767d..f3309a20fc 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -46,6 +46,7 @@ from pyspark.serializers import read_int, BatchedSerializer, MarshalSerializer,
     CloudPickleSerializer
 from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, ExternalSorter
 from pyspark.sql import SQLContext, IntegerType
+from pyspark import shuffle
 
 _have_scipy = False
 _have_numpy = False
@@ -138,17 +139,17 @@ class TestSorter(unittest.TestCase):
         random.shuffle(l)
         sorter = ExternalSorter(1)
         self.assertEquals(sorted(l), list(sorter.sorted(l)))
-        self.assertGreater(sorter._spilled_bytes, 0)
-        last = sorter._spilled_bytes
+        self.assertGreater(shuffle.DiskBytesSpilled, 0)
+        last = shuffle.DiskBytesSpilled
         self.assertEquals(sorted(l, reverse=True), list(sorter.sorted(l, reverse=True)))
-        self.assertGreater(sorter._spilled_bytes, last)
-        last = sorter._spilled_bytes
+        self.assertGreater(shuffle.DiskBytesSpilled, last)
+        last = shuffle.DiskBytesSpilled
         self.assertEquals(sorted(l, key=lambda x: -x), list(sorter.sorted(l, key=lambda x: -x)))
-        self.assertGreater(sorter._spilled_bytes, last)
-        last = sorter._spilled_bytes
+        self.assertGreater(shuffle.DiskBytesSpilled, last)
+        last = shuffle.DiskBytesSpilled
         self.assertEquals(sorted(l, key=lambda x: -x, reverse=True),
                           list(sorter.sorted(l, key=lambda x: -x, reverse=True)))
-        self.assertGreater(sorter._spilled_bytes, last)
+        self.assertGreater(shuffle.DiskBytesSpilled, last)
 
     def test_external_sort_in_rdd(self):
         conf = SparkConf().set("spark.python.worker.memory", "1m")
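Note the shape of the test change: the old assertions read the sorter's private `_spilled_bytes` field, while the new ones read the module-level `shuffle.DiskBytesSpilled` and only assert that it keeps growing, since the counter is cumulative across every spill in the process. A minimal sketch of that assertion pattern, written against the hypothetical `tinysort` module from the sketch above rather than PySpark itself:

```python
import unittest

import tinysort  # hypothetical module holding DiskBytesSpilled and TinySorter


class SpillCounterTest(unittest.TestCase):
    def test_counter_grows_across_sorts(self):
        sorter = tinysort.TinySorter()
        before = tinysort.DiskBytesSpilled
        sorter.sorted([3, 1, 2])
        # The counter only moves forward, so each sort is checked for
        # monotonic growth rather than an exact byte count.
        self.assertGreater(tinysort.DiskBytesSpilled, before)
        before = tinysort.DiskBytesSpilled
        sorter.sorted([6, 5, 4])
        self.assertGreater(tinysort.DiskBytesSpilled, before)


if __name__ == "__main__":
    unittest.main()
```

Checking monotonic growth instead of exact byte counts keeps the test independent of serialization details, which is the same trade-off the updated PySpark test makes.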