path: root/python/pyspark/context.py
author     Yandu Oppacher <yandu.oppacher@jadedpixel.com>  2015-01-28 13:48:06 -0800
committer  Josh Rosen <joshrosen@databricks.com>  2015-01-28 13:48:06 -0800
commit     3bead67d5926a2a798ca0e2bc71e747380493787 (patch)
tree       ce36d8e926702f8da17b9f43c49e1682d1e9fccb /python/pyspark/context.py
parent     a731314c319a6f265060e05267844069027804fd (diff)
[SPARK-4387][PySpark] Refactoring python profiling code to make it extensible
This PR is based on #3255, fixing conflicts and code style. Closes #3255.

Author: Yandu Oppacher <yandu.oppacher@jadedpixel.com>
Author: Davies Liu <davies@databricks.com>

Closes #3901 from davies/refactor-python-profile-code and squashes the following commits:

b4a9306 [Davies Liu] fix tests
4b79ce8 [Davies Liu] add docstring for profiler_cls
2700e47 [Davies Liu] use BasicProfiler as default
349e341 [Davies Liu] more refactor
6a5d4df [Davies Liu] refactor and fix tests
31bf6b6 [Davies Liu] fix code style
0864b5d [Yandu Oppacher] Remove unused method
76a6c37 [Yandu Oppacher] Added a profile collector to accumulate the profilers per stage
9eefc36 [Yandu Oppacher] Fix doc
9ace076 [Yandu Oppacher] Refactor of profiler, and moved tests around
8739aff [Yandu Oppacher] Code review fixes
9bda3ec [Yandu Oppacher] Refactor profiler code
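For illustration, a minimal sketch of how the refactored profiling hooks could be used, based only on what this diff shows (the new profiler_cls constructor argument, the spark.python.profile configuration key, and the show_profiles() method that now delegates to the ProfilerCollector). The CustomProfiler subclass and its show() override are hypothetical:

    from pyspark import SparkConf, SparkContext
    from pyspark.profiler import BasicProfiler

    class CustomProfiler(BasicProfiler):
        """Hypothetical profiler subclass that customizes how stats are reported."""
        def show(self, id):
            print("My custom profiles for RDD:%s" % id)

    # Profiling must be enabled via configuration, otherwise no collector is created.
    conf = SparkConf().set("spark.python.profile", "true")
    sc = SparkContext("local", "profiler-demo", conf=conf, profiler_cls=CustomProfiler)

    sc.parallelize(range(1000)).map(lambda x: 2 * x).count()
    sc.show_profiles()   # delegates to self.profiler_collector.show_profiles()
    sc.stop()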
Diffstat (limited to 'python/pyspark/context.py')
-rw-r--r--  python/pyspark/context.py  |  46
1 file changed, 14 insertions(+), 32 deletions(-)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 568e21f380..c0dec16ac1 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -20,7 +20,6 @@ import shutil
import sys
from threading import Lock
from tempfile import NamedTemporaryFile
-import atexit
from pyspark import accumulators
from pyspark.accumulators import Accumulator
@@ -33,6 +32,7 @@ from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deseria
from pyspark.storagelevel import StorageLevel
from pyspark.rdd import RDD
from pyspark.traceback_utils import CallSite, first_spark_call
+from pyspark.profiler import ProfilerCollector, BasicProfiler
from py4j.java_collections import ListConverter
@@ -66,7 +66,7 @@ class SparkContext(object):
def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
- gateway=None, jsc=None):
+ gateway=None, jsc=None, profiler_cls=BasicProfiler):
"""
Create a new SparkContext. At least the master and app name should be set,
either through the named parameters here or through C{conf}.
@@ -88,6 +88,9 @@ class SparkContext(object):
:param conf: A L{SparkConf} object setting Spark properties.
:param gateway: Use an existing gateway and JVM, otherwise a new JVM
will be instantiated.
+ :param jsc: The JavaSparkContext instance (optional).
+ :param profiler_cls: A class of custom Profiler used to do profiling
+ (default is pyspark.profiler.BasicProfiler).
>>> from pyspark.context import SparkContext
@@ -102,14 +105,14 @@ class SparkContext(object):
SparkContext._ensure_initialized(self, gateway=gateway)
try:
self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
- conf, jsc)
+ conf, jsc, profiler_cls)
except:
# If an error occurs, clean up in order to allow future SparkContext creation:
self.stop()
raise
def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
- conf, jsc):
+ conf, jsc, profiler_cls):
self.environment = environment or {}
self._conf = conf or SparkConf(_jvm=self._jvm)
self._batchSize = batchSize # -1 represents an unlimited batch size
@@ -192,7 +195,11 @@ class SparkContext(object):
self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()
# profiling stats collected for each PythonRDD
- self._profile_stats = []
+ if self._conf.get("spark.python.profile", "false") == "true":
+ dump_path = self._conf.get("spark.python.profile.dump", None)
+ self.profiler_collector = ProfilerCollector(profiler_cls, dump_path)
+ else:
+ self.profiler_collector = None
def _initialize_context(self, jconf):
"""
@@ -826,39 +833,14 @@ class SparkContext(object):
it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
return list(mappedRDD._collect_iterator_through_file(it))
- def _add_profile(self, id, profileAcc):
- if not self._profile_stats:
- dump_path = self._conf.get("spark.python.profile.dump")
- if dump_path:
- atexit.register(self.dump_profiles, dump_path)
- else:
- atexit.register(self.show_profiles)
-
- self._profile_stats.append([id, profileAcc, False])
-
def show_profiles(self):
""" Print the profile stats to stdout """
- for i, (id, acc, showed) in enumerate(self._profile_stats):
- stats = acc.value
- if not showed and stats:
- print "=" * 60
- print "Profile of RDD<id=%d>" % id
- print "=" * 60
- stats.sort_stats("time", "cumulative").print_stats()
- # mark it as showed
- self._profile_stats[i][2] = True
+ self.profiler_collector.show_profiles()
def dump_profiles(self, path):
""" Dump the profile stats into directory `path`
"""
- if not os.path.exists(path):
- os.makedirs(path)
- for id, acc, _ in self._profile_stats:
- stats = acc.value
- if stats:
- p = os.path.join(path, "rdd_%d.pstats" % id)
- stats.dump_stats(p)
- self._profile_stats = []
+ self.profiler_collector.dump_profiles(path)
def _test():
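For context, a rough sketch of the ProfilerCollector side of this change, inferred from the calls visible above (the ProfilerCollector(profiler_cls, dump_path) constructor, show_profiles(), dump_profiles(path)) and from the removed SparkContext bookkeeping it replaces; the profiler.show(id) and profiler.dump(id, path) methods are assumed here, and the real class in pyspark/profiler.py may differ in detail:

    import atexit

    class ProfilerCollector(object):
        """Sketch: tracks one profiler per profiled RDD and shows or dumps their stats."""

        def __init__(self, profiler_cls, dump_path=None):
            self.profiler_cls = profiler_cls       # class used to create new profilers
            self.profile_dump_path = dump_path     # directory from spark.python.profile.dump
            self.profilers = []                    # [id, profiler, shown] entries

        def add_profiler(self, id, profiler):
            """Register a profiler; on first use, arrange output at interpreter exit
            (mirrors the atexit logic removed from SparkContext._add_profile)."""
            if not self.profilers:
                if self.profile_dump_path:
                    atexit.register(self.dump_profiles, self.profile_dump_path)
                else:
                    atexit.register(self.show_profiles)
            self.profilers.append([id, profiler, False])

        def show_profiles(self):
            """Print stats for every profiler that has not been shown yet."""
            for i, (id, profiler, shown) in enumerate(self.profilers):
                if not shown:
                    profiler.show(id)
                    self.profilers[i][2] = True

        def dump_profiles(self, path):
            """Dump each profiler's stats into `path`, then clear the list."""
            for id, profiler, _ in self.profilers:
                profiler.dump(id, path)
            self.profilers = []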