aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/context.py
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@apache.org>2014-09-26 14:47:14 -0700
committerJosh Rosen <joshrosen@apache.org>2014-09-26 14:47:14 -0700
commitf872e4fb80b8429800daa9c44c0cac620c1ff303 (patch)
treed43c6d4ebb15b4fca42f50e241bfc4ff94bb13d6 /python/pyspark/context.py
parent7364fa5a176da69e425bca0e3e137ee73275c78c (diff)
downloadspark-f872e4fb80b8429800daa9c44c0cac620c1ff303.tar.gz
spark-f872e4fb80b8429800daa9c44c0cac620c1ff303.tar.bz2
spark-f872e4fb80b8429800daa9c44c0cac620c1ff303.zip
Revert "[SPARK-3478] [PySpark] Profile the Python tasks"
This reverts commit 1aa549ba9839565274a12c52fa1075b424f138a6.
Diffstat (limited to 'python/pyspark/context.py')
-rw-r--r--python/pyspark/context.py39
1 files changed, 1 insertions, 38 deletions
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index abeda19b77..8e7b00469e 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -20,7 +20,6 @@ import shutil
import sys
from threading import Lock
from tempfile import NamedTemporaryFile
-import atexit
from pyspark import accumulators
from pyspark.accumulators import Accumulator
@@ -31,6 +30,7 @@ from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
PairDeserializer, CompressedSerializer
from pyspark.storagelevel import StorageLevel
+from pyspark import rdd
from pyspark.rdd import RDD
from pyspark.traceback_utils import CallSite, first_spark_call
@@ -192,9 +192,6 @@ class SparkContext(object):
self._temp_dir = \
self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()
- # profiling stats collected for each PythonRDD
- self._profile_stats = []
-
def _initialize_context(self, jconf):
"""
Initialize SparkContext in function to allow subclass specific initialization
@@ -795,40 +792,6 @@ class SparkContext(object):
it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
return list(mappedRDD._collect_iterator_through_file(it))
- def _add_profile(self, id, profileAcc):
- if not self._profile_stats:
- dump_path = self._conf.get("spark.python.profile.dump")
- if dump_path:
- atexit.register(self.dump_profiles, dump_path)
- else:
- atexit.register(self.show_profiles)
-
- self._profile_stats.append([id, profileAcc, False])
-
- def show_profiles(self):
- """ Print the profile stats to stdout """
- for i, (id, acc, showed) in enumerate(self._profile_stats):
- stats = acc.value
- if not showed and stats:
- print "=" * 60
- print "Profile of RDD<id=%d>" % id
- print "=" * 60
- stats.sort_stats("tottime", "cumtime").print_stats()
- # mark it as showed
- self._profile_stats[i][2] = True
-
- def dump_profiles(self, path):
- """ Dump the profile stats into directory `path`
- """
- if not os.path.exists(path):
- os.makedirs(path)
- for id, acc, _ in self._profile_stats:
- stats = acc.value
- if stats:
- p = os.path.join(path, "rdd_%d.pstats" % id)
- stats.dump_stats(p)
- self._profile_stats = []
-
def _test():
import atexit