aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/shuffle.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/shuffle.py')
-rw-r--r--python/pyspark/shuffle.py8
1 files changed, 4 insertions, 4 deletions
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index 1d0b16cade..81c420ce16 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -362,7 +362,7 @@ class ExternalMerger(Merger):
self.spills += 1
gc.collect() # release the memory as much as possible
- MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+ MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20
def items(self):
""" Return all merged items as iterator """
@@ -515,7 +515,7 @@ class ExternalSorter(object):
gc.collect()
batch //= 2
limit = self._next_limit()
- MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+ MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20
DiskBytesSpilled += os.path.getsize(path)
os.unlink(path) # data will be deleted after close
@@ -630,7 +630,7 @@ class ExternalList(object):
self.values = []
gc.collect()
DiskBytesSpilled += self._file.tell() - pos
- MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+ MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20
class ExternalListOfList(ExternalList):
@@ -794,7 +794,7 @@ class ExternalGroupBy(ExternalMerger):
self.spills += 1
gc.collect() # release the memory as much as possible
- MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+ MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20
def _merged_items(self, index):
size = sum(os.path.getsize(os.path.join(self._get_spill_dir(j), str(index)))