diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/shuffle.py | 8 |
1 files changed, 4 insertions, 4 deletions
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py index 1d0b16cade..81c420ce16 100644 --- a/python/pyspark/shuffle.py +++ b/python/pyspark/shuffle.py @@ -362,7 +362,7 @@ class ExternalMerger(Merger): self.spills += 1 gc.collect() # release the memory as much as possible - MemoryBytesSpilled += (used_memory - get_used_memory()) << 20 + MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20 def items(self): """ Return all merged items as iterator """ @@ -515,7 +515,7 @@ class ExternalSorter(object): gc.collect() batch //= 2 limit = self._next_limit() - MemoryBytesSpilled += (used_memory - get_used_memory()) << 20 + MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20 DiskBytesSpilled += os.path.getsize(path) os.unlink(path) # data will be deleted after close @@ -630,7 +630,7 @@ class ExternalList(object): self.values = [] gc.collect() DiskBytesSpilled += self._file.tell() - pos - MemoryBytesSpilled += (used_memory - get_used_memory()) << 20 + MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20 class ExternalListOfList(ExternalList): @@ -794,7 +794,7 @@ class ExternalGroupBy(ExternalMerger): self.spills += 1 gc.collect() # release the memory as much as possible - MemoryBytesSpilled += (used_memory - get_used_memory()) << 20 + MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20 def _merged_items(self, index): size = sum(os.path.getsize(os.path.join(self._get_spill_dir(j), str(index))) |