aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/worker.py
diff options
context:
space:
mode:
authorYandu Oppacher <yandu.oppacher@jadedpixel.com>2015-01-28 13:48:06 -0800
committerJosh Rosen <joshrosen@databricks.com>2015-01-28 13:48:06 -0800
commit3bead67d5926a2a798ca0e2bc71e747380493787 (patch)
treece36d8e926702f8da17b9f43c49e1682d1e9fccb /python/pyspark/worker.py
parenta731314c319a6f265060e05267844069027804fd (diff)
downloadspark-3bead67d5926a2a798ca0e2bc71e747380493787.tar.gz
spark-3bead67d5926a2a798ca0e2bc71e747380493787.tar.bz2
spark-3bead67d5926a2a798ca0e2bc71e747380493787.zip
[SPARK-4387][PySpark] Refactoring python profiling code to make it extensible
This PR is based on #3255 , fix conflicts and code style. Closes #3255. Author: Yandu Oppacher <yandu.oppacher@jadedpixel.com> Author: Davies Liu <davies@databricks.com> Closes #3901 from davies/refactor-python-profile-code and squashes the following commits: b4a9306 [Davies Liu] fix tests 4b79ce8 [Davies Liu] add docstring for profiler_cls 2700e47 [Davies Liu] use BasicProfiler as default 349e341 [Davies Liu] more refactor 6a5d4df [Davies Liu] refactor and fix tests 31bf6b6 [Davies Liu] fix code style 0864b5d [Yandu Oppacher] Remove unused method 76a6c37 [Yandu Oppacher] Added a profile collector to accumulate the profilers per stage 9eefc36 [Yandu Oppacher] Fix doc 9ace076 [Yandu Oppacher] Refactor of profiler, and moved tests around 8739aff [Yandu Oppacher] Code review fixes 9bda3ec [Yandu Oppacher] Refactor profiler code
Diffstat (limited to 'python/pyspark/worker.py')
-rw-r--r--python/pyspark/worker.py12
1 files changed, 3 insertions, 9 deletions
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 7e5343c973..8a93c320ec 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -23,8 +23,6 @@ import sys
import time
import socket
import traceback
-import cProfile
-import pstats
from pyspark.accumulators import _accumulatorRegistry
from pyspark.broadcast import Broadcast, _broadcastRegistry
@@ -90,19 +88,15 @@ def main(infile, outfile):
command = pickleSer._read_with_length(infile)
if isinstance(command, Broadcast):
command = pickleSer.loads(command.value)
- (func, stats, deserializer, serializer) = command
+ (func, profiler, deserializer, serializer) = command
init_time = time.time()
def process():
iterator = deserializer.load_stream(infile)
serializer.dump_stream(func(split_index, iterator), outfile)
- if stats:
- p = cProfile.Profile()
- p.runcall(process)
- st = pstats.Stats(p)
- st.stream = None # make it picklable
- stats.add(st.strip_dirs())
+ if profiler:
+ profiler.profile(process)
else:
process()
except Exception: