aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/shuffle.py
diff options
context:
space:
mode:
authorDavies Liu <davies.liu@gmail.com>2014-09-13 22:31:21 -0700
committerJosh Rosen <joshrosen@apache.org>2014-09-13 22:31:21 -0700
commit4e3fbe8cdb6c6291e219195abb272f3c81f0ed63 (patch)
tree684e60e57003fa6441a384a93cd6b0e746445c95 /python/pyspark/shuffle.py
parent2aea0da84c58a179917311290083456dfa043db7 (diff)
downloadspark-4e3fbe8cdb6c6291e219195abb272f3c81f0ed63.tar.gz
spark-4e3fbe8cdb6c6291e219195abb272f3c81f0ed63.tar.bz2
spark-4e3fbe8cdb6c6291e219195abb272f3c81f0ed63.zip
[SPARK-3463] [PySpark] aggregate and show spilled bytes in Python
Aggregate the number of bytes spilled into disks during aggregation or sorting, show them in Web UI. ![spilled](https://cloud.githubusercontent.com/assets/40902/4209758/4b995562-386d-11e4-97c1-8e838ee1d4e3.png) This patch is blocked by SPARK-3465. (It includes a fix for that). Author: Davies Liu <davies.liu@gmail.com> Closes #2336 from davies/metrics and squashes the following commits: e37df38 [Davies Liu] remove outdated comments 1245eb7 [Davies Liu] remove the temporary fix ebd2f43 [Davies Liu] Merge branch 'master' into metrics 7e4ad04 [Davies Liu] Merge branch 'master' into metrics fbe9029 [Davies Liu] show spilled bytes in Python in web ui
Diffstat (limited to 'python/pyspark/shuffle.py')
-rw-r--r--python/pyspark/shuffle.py19
1 files changed, 16 insertions, 3 deletions
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index 49829f5280..ce597cbe91 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -68,6 +68,11 @@ def _get_local_dirs(sub):
return [os.path.join(d, "python", str(os.getpid()), sub) for d in dirs]
+# global stats
+MemoryBytesSpilled = 0L
+DiskBytesSpilled = 0L
+
+
class Aggregator(object):
"""
@@ -313,10 +318,12 @@ class ExternalMerger(Merger):
It will dump the data in batch for better performance.
"""
+ global MemoryBytesSpilled, DiskBytesSpilled
path = self._get_spill_dir(self.spills)
if not os.path.exists(path):
os.makedirs(path)
+ used_memory = get_used_memory()
if not self.pdata:
# The data has not been partitioned, it will iterator the
# dataset once, write them into different files, has no
@@ -334,6 +341,7 @@ class ExternalMerger(Merger):
self.serializer.dump_stream([(k, v)], streams[h])
for s in streams:
+ DiskBytesSpilled += s.tell()
s.close()
self.data.clear()
@@ -346,9 +354,11 @@ class ExternalMerger(Merger):
# dump items in batch
self.serializer.dump_stream(self.pdata[i].iteritems(), f)
self.pdata[i].clear()
+ DiskBytesSpilled += os.path.getsize(p)
self.spills += 1
gc.collect() # release the memory as much as possible
+ MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
def iteritems(self):
""" Return all merged items as iterator """
@@ -462,7 +472,6 @@ class ExternalSorter(object):
self.memory_limit = memory_limit
self.local_dirs = _get_local_dirs("sort")
self.serializer = serializer or BatchedSerializer(PickleSerializer(), 1024)
- self._spilled_bytes = 0
def _get_path(self, n):
""" Choose one directory for spill by number n """
@@ -476,6 +485,7 @@ class ExternalSorter(object):
Sort the elements in iterator, do external sort when the memory
goes above the limit.
"""
+ global MemoryBytesSpilled, DiskBytesSpilled
batch = 10
chunks, current_chunk = [], []
iterator = iter(iterator)
@@ -486,15 +496,18 @@ class ExternalSorter(object):
if len(chunk) < batch:
break
- if get_used_memory() > self.memory_limit:
+ used_memory = get_used_memory()
+ if used_memory > self.memory_limit:
# sort them inplace will save memory
current_chunk.sort(key=key, reverse=reverse)
path = self._get_path(len(chunks))
with open(path, 'w') as f:
self.serializer.dump_stream(current_chunk, f)
- self._spilled_bytes += os.path.getsize(path)
chunks.append(self.serializer.load_stream(open(path)))
current_chunk = []
+ gc.collect()
+ MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+ DiskBytesSpilled += os.path.getsize(path)
elif not chunks:
batch = min(batch * 2, 10000)