From c5414b681868a0a11cc5a94184116e66e8d3e9c0 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 30 Sep 2014 18:24:57 -0700 Subject: [SPARK-3478] [PySpark] Profile the Python tasks This patch add profiling support for PySpark, it will show the profiling results before the driver exits, here is one example: ``` ============================================================ Profile of RDD ============================================================ 5146507 function calls (5146487 primitive calls) in 71.094 seconds Ordered by: internal time, cumulative time ncalls tottime percall cumtime percall filename:lineno(function) 5144576 68.331 0.000 68.331 0.000 statcounter.py:44(merge) 20 2.735 0.137 71.071 3.554 statcounter.py:33(__init__) 20 0.017 0.001 0.017 0.001 {cPickle.dumps} 1024 0.003 0.000 0.003 0.000 t.py:16() 20 0.001 0.000 0.001 0.000 {reduce} 21 0.001 0.000 0.001 0.000 {cPickle.loads} 20 0.001 0.000 0.001 0.000 copy_reg.py:95(_slotnames) 41 0.001 0.000 0.001 0.000 serializers.py:461(read_int) 40 0.001 0.000 0.002 0.000 serializers.py:179(_batched) 62 0.000 0.000 0.000 0.000 {method 'read' of 'file' objects} 20 0.000 0.000 71.072 3.554 rdd.py:863() 20 0.000 0.000 0.001 0.000 serializers.py:198(load_stream) 40/20 0.000 0.000 71.072 3.554 rdd.py:2093(pipeline_func) 41 0.000 0.000 0.002 0.000 serializers.py:130(load_stream) 40 0.000 0.000 71.072 1.777 rdd.py:304(func) 20 0.000 0.000 71.094 3.555 worker.py:82(process) ``` Also, use can show profile result manually by `sc.show_profiles()` or dump it into disk by `sc.dump_profiles(path)`, such as ```python >>> sc._conf.set("spark.python.profile", "true") >>> rdd = sc.parallelize(range(100)).map(str) >>> rdd.count() 100 >>> sc.show_profiles() ============================================================ Profile of RDD ============================================================ 284 function calls (276 primitive calls) in 0.001 seconds Ordered by: internal time, cumulative time ncalls tottime percall cumtime percall filename:lineno(function) 4 0.000 0.000 0.000 0.000 serializers.py:198(load_stream) 4 0.000 0.000 0.000 0.000 {reduce} 12/4 0.000 0.000 0.001 0.000 rdd.py:2092(pipeline_func) 4 0.000 0.000 0.000 0.000 {cPickle.loads} 4 0.000 0.000 0.000 0.000 {cPickle.dumps} 104 0.000 0.000 0.000 0.000 rdd.py:852() 8 0.000 0.000 0.000 0.000 serializers.py:461(read_int) 12 0.000 0.000 0.000 0.000 rdd.py:303(func) ``` The profiling is disabled by default, can be enabled by "spark.python.profile=true". Also, users can dump the results into disks automatically for future analysis, by "spark.python.profile.dump=path_to_dump" This is bugfix of #2351 cc JoshRosen Author: Davies Liu Closes #2556 from davies/profiler and squashes the following commits: e68df5a [Davies Liu] Merge branch 'master' of github.com:apache/spark into profiler 858e74c [Davies Liu] compatitable with python 2.6 7ef2aa0 [Davies Liu] bugfix, add tests for show_profiles and dump_profiles() 2b0daf2 [Davies Liu] fix docs 7a56c24 [Davies Liu] bugfix cba9463 [Davies Liu] move show_profiles and dump_profiles to SparkContext fb9565b [Davies Liu] Merge branch 'master' of github.com:apache/spark into profiler 116d52a [Davies Liu] Merge branch 'master' of github.com:apache/spark into profiler 09d02c3 [Davies Liu] Merge branch 'master' into profiler c23865c [Davies Liu] Merge branch 'master' into profiler 15d6f18 [Davies Liu] add docs for two configs dadee1a [Davies Liu] add docs string and clear profiles after show or dump 4f8309d [Davies Liu] address comment, add tests 0a5b6eb [Davies Liu] fix Python UDF 4b20494 [Davies Liu] add profile for python --- python/pyspark/context.py | 39 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) (limited to 'python/pyspark/context.py') diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 8e7b00469e..e9418320ff 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -20,6 +20,7 @@ import shutil import sys from threading import Lock from tempfile import NamedTemporaryFile +import atexit from pyspark import accumulators from pyspark.accumulators import Accumulator @@ -30,7 +31,6 @@ from pyspark.java_gateway import launch_gateway from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \ PairDeserializer, CompressedSerializer from pyspark.storagelevel import StorageLevel -from pyspark import rdd from pyspark.rdd import RDD from pyspark.traceback_utils import CallSite, first_spark_call @@ -192,6 +192,9 @@ class SparkContext(object): self._temp_dir = \ self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath() + # profiling stats collected for each PythonRDD + self._profile_stats = [] + def _initialize_context(self, jconf): """ Initialize SparkContext in function to allow subclass specific initialization @@ -792,6 +795,40 @@ class SparkContext(object): it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal) return list(mappedRDD._collect_iterator_through_file(it)) + def _add_profile(self, id, profileAcc): + if not self._profile_stats: + dump_path = self._conf.get("spark.python.profile.dump") + if dump_path: + atexit.register(self.dump_profiles, dump_path) + else: + atexit.register(self.show_profiles) + + self._profile_stats.append([id, profileAcc, False]) + + def show_profiles(self): + """ Print the profile stats to stdout """ + for i, (id, acc, showed) in enumerate(self._profile_stats): + stats = acc.value + if not showed and stats: + print "=" * 60 + print "Profile of RDD" % id + print "=" * 60 + stats.sort_stats("time", "cumulative").print_stats() + # mark it as showed + self._profile_stats[i][2] = True + + def dump_profiles(self, path): + """ Dump the profile stats into directory `path` + """ + if not os.path.exists(path): + os.makedirs(path) + for id, acc, _ in self._profile_stats: + stats = acc.value + if stats: + p = os.path.join(path, "rdd_%d.pstats" % id) + stats.dump_stats(p) + self._profile_stats = [] + def _test(): import atexit -- cgit v1.2.3