author    Davies Liu <davies.liu@gmail.com>  2014-09-13 22:31:21 -0700
committer Josh Rosen <joshrosen@apache.org>  2014-09-13 22:31:21 -0700
commit    4e3fbe8cdb6c6291e219195abb272f3c81f0ed63 (patch)
tree      684e60e57003fa6441a384a93cd6b0e746445c95  /python/pyspark/worker.py
parent    2aea0da84c58a179917311290083456dfa043db7 (diff)
[SPARK-3463] [PySpark] aggregate and show spilled bytes in Python
Aggregate the number of bytes spilled to disk during aggregation or sorting, and show them in the Web UI.

![spilled](https://cloud.githubusercontent.com/assets/40902/4209758/4b995562-386d-11e4-97c1-8e838ee1d4e3.png)

This patch is blocked by SPARK-3465 (it includes a fix for that).

Author: Davies Liu <davies.liu@gmail.com>

Closes #2336 from davies/metrics and squashes the following commits:

e37df38 [Davies Liu] remove outdated comments
1245eb7 [Davies Liu] remove the temporary fix
ebd2f43 [Davies Liu] Merge branch 'master' into metrics
7e4ad04 [Davies Liu] Merge branch 'master' into metrics
fbe9029 [Davies Liu] show spilled bytes in Python in web ui
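The counters this patch reports are plain module-level globals in pyspark.shuffle, bumped by the external aggregation/sorting code whenever it spills a partition. Below is a minimal sketch of that bookkeeping pattern, with a hypothetical spill_partition() helper standing in for the real ExternalMerger._spill():

```python
import os
import pickle

# Module-level counters: the worker zeroes these at the start of each task
# and writes them back to the JVM when the task finishes.
MemoryBytesSpilled = 0  # bytes of memory freed by spilling
DiskBytesSpilled = 0    # bytes written to spill files on disk


def spill_partition(items, path, used_mb_before, used_mb_after):
    """Hypothetical helper: write one partition to disk and account for
    both the disk bytes written and the memory released."""
    global MemoryBytesSpilled, DiskBytesSpilled
    with open(path, "wb") as f:
        pickle.dump(items, f)
    DiskBytesSpilled += os.path.getsize(path)
    # Memory usage is tracked in megabytes, so convert the freed amount
    # to bytes; clamp at zero in case usage did not actually drop.
    MemoryBytesSpilled += max(used_mb_before - used_mb_after, 0) << 20
```

Because the counters live at module scope, main() in the diff below can simply reset them at the start of each task and read them off after the user function has run.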
Diffstat (limited to 'python/pyspark/worker.py')
-rw-r--r-- python/pyspark/worker.py | 14 ++++++++++----
1 file changed, 10 insertions(+), 4 deletions(-)
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 61b8a74d06..252176ac65 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -23,16 +23,14 @@ import sys
import time
import socket
import traceback
-# CloudPickler needs to be imported so that depicklers are registered using the
-# copy_reg module.
+
from pyspark.accumulators import _accumulatorRegistry
from pyspark.broadcast import Broadcast, _broadcastRegistry
-from pyspark.cloudpickle import CloudPickler
from pyspark.files import SparkFiles
from pyspark.serializers import write_with_length, write_int, read_long, \
write_long, read_int, SpecialLengths, UTF8Deserializer, PickleSerializer, \
CompressedSerializer
-
+from pyspark import shuffle
pickleSer = PickleSerializer()
utf8_deserializer = UTF8Deserializer()
@@ -52,6 +50,11 @@ def main(infile, outfile):
if split_index == -1: # for unit tests
return
+ # initialize global state
+ shuffle.MemoryBytesSpilled = 0
+ shuffle.DiskBytesSpilled = 0
+ _accumulatorRegistry.clear()
+
# fetch name of workdir
spark_files_dir = utf8_deserializer.loads(infile)
SparkFiles._root_directory = spark_files_dir
@@ -97,6 +100,9 @@ def main(infile, outfile):
exit(-1)
finish_time = time.time()
report_times(outfile, boot_time, init_time, finish_time)
+ write_long(shuffle.MemoryBytesSpilled, outfile)
+ write_long(shuffle.DiskBytesSpilled, outfile)
+
# Mark the beginning of the accumulators section of the output
write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
write_int(len(_accumulatorRegistry), outfile)
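After this change, the tail of the worker's output stream carries two extra longs between the timing data and the accumulator section; the JVM side reads them and folds them into the task metrics that the Web UI renders. A sketch of a reader for that footer, written in Python purely for illustration (the real consumer is Scala code in PythonRDD), assuming a file-like stream positioned after the task's data records:

```python
from pyspark.serializers import read_int, read_long, SpecialLengths


def read_task_footer(stream):
    """Hypothetical reader mirroring the write order in main():
    timing data, spill metrics, then the accumulator section."""
    assert read_int(stream) == SpecialLengths.TIMING_DATA
    boot, init, finish = read_long(stream), read_long(stream), read_long(stream)
    memory_spilled = read_long(stream)  # shuffle.MemoryBytesSpilled
    disk_spilled = read_long(stream)    # shuffle.DiskBytesSpilled
    assert read_int(stream) == SpecialLengths.END_OF_DATA_SECTION
    num_updates = read_int(stream)      # accumulator updates that follow
    return (boot, init, finish), memory_spilled, disk_spilled, num_updates
```

Keeping the spill longs ahead of END_OF_DATA_SECTION means older readers that stop at the marker would misparse the stream, which is why the counters are always written, even when nothing was spilled.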