aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/shuffle.py
diff options
context:
space:
mode:
authorlinweizhong <linweizhong@huawei.com>2015-05-26 08:35:39 -0700
committerDavies Liu <davies@databricks.com>2015-05-26 08:35:39 -0700
commit8948ad3fb5d5d095d3942855960d735f27d97dd5 (patch)
tree2e45c6a8199b0e6c6181aa2e5cfba8d4f20e7b87 /python/pyspark/shuffle.py
parentbf49c22130af9a729dcc510743e4c1ea4c5d2439 (diff)
downloadspark-8948ad3fb5d5d095d3942855960d735f27d97dd5.tar.gz
spark-8948ad3fb5d5d095d3942855960d735f27d97dd5.tar.bz2
spark-8948ad3fb5d5d095d3942855960d735f27d97dd5.zip
[SPARK-7339] [PYSPARK] PySpark shuffle spill memory sometimes are not correct
In PySpark we get memory used before and after spill, then use the difference of these two value as memorySpilled, but if the before value is small than after value, then we will get a negative value, but this scenario 0 value may be more reasonable. Below is the result in HistoryServer we have tested: Index ID Attempt Status Locality Level Executor ID / Host Launch Time Duration GC Time Input Size / Records Write Time Shuffle Write Size / Records Shuffle Spill (Memory) Shuffle Spill (Disk) Errors 0 0 0 SUCCESS NODE_LOCAL 3 / vm119 2015/05/04 17:31:06 21 s 0.1 s 128.1 MB (hadoop) / 3237 70 ms 10.1 MB / 2529 0.0 B 5.7 MB 2 2 0 SUCCESS NODE_LOCAL 1 / vm118 2015/05/04 17:31:06 22 s 89 ms 128.1 MB (hadoop) / 3205 0.1 s 10.1 MB / 2529 -1048576.0 B 5.9 MB 1 1 0 SUCCESS NODE_LOCAL 2 / vm117 2015/05/04 17:31:06 22 s 0.1 s 128.1 MB (hadoop) / 3271 68 ms 10.1 MB / 2529 -1048576.0 B 5.6 MB 4 4 0 SUCCESS NODE_LOCAL 2 / vm117 2015/05/04 17:31:06 22 s 0.1 s 128.1 MB (hadoop) / 3192 51 ms 10.1 MB / 2529 -1048576.0 B 5.9 MB 3 3 0 SUCCESS NODE_LOCAL 3 / vm119 2015/05/04 17:31:06 22 s 0.1 s 128.1 MB (hadoop) / 3262 51 ms 10.1 MB / 2529 1024.0 KB 5.8 MB 5 5 0 SUCCESS NODE_LOCAL 1 / vm118 2015/05/04 17:31:06 22 s 89 ms 128.1 MB (hadoop) / 3256 93 ms 10.1 MB / 2529 -1048576.0 B 5.7 MB /cc davies Author: linweizhong <linweizhong@huawei.com> Closes #5887 from Sephiroth-Lin/spark-7339 and squashes the following commits: 9186c81 [linweizhong] Use max function to get a nonnegative value d41672b [linweizhong] Update MemoryBytesSpilled when memorySpilled > 0
Diffstat (limited to 'python/pyspark/shuffle.py')
-rw-r--r--python/pyspark/shuffle.py8
1 files changed, 4 insertions, 4 deletions
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index 1d0b16cade..81c420ce16 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -362,7 +362,7 @@ class ExternalMerger(Merger):
self.spills += 1
gc.collect() # release the memory as much as possible
- MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+ MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20
def items(self):
""" Return all merged items as iterator """
@@ -515,7 +515,7 @@ class ExternalSorter(object):
gc.collect()
batch //= 2
limit = self._next_limit()
- MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+ MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20
DiskBytesSpilled += os.path.getsize(path)
os.unlink(path) # data will be deleted after close
@@ -630,7 +630,7 @@ class ExternalList(object):
self.values = []
gc.collect()
DiskBytesSpilled += self._file.tell() - pos
- MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+ MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20
class ExternalListOfList(ExternalList):
@@ -794,7 +794,7 @@ class ExternalGroupBy(ExternalMerger):
self.spills += 1
gc.collect() # release the memory as much as possible
- MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+ MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20
def _merged_items(self, index):
size = sum(os.path.getsize(os.path.join(self._get_spill_dir(j), str(index)))