From f872e4fb80b8429800daa9c44c0cac620c1ff303 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 26 Sep 2014 14:47:14 -0700
Subject: Revert "[SPARK-3478] [PySpark] Profile the Python tasks"

This reverts commit 1aa549ba9839565274a12c52fa1075b424f138a6.
---
 python/pyspark/accumulators.py | 15 ---------------
 python/pyspark/context.py      | 39 +--------------------------------------
 python/pyspark/rdd.py          | 10 ++--------
 python/pyspark/sql.py          |  2 +-
 python/pyspark/tests.py        | 30 ------------------------------
 python/pyspark/worker.py       | 19 +++----------------
 6 files changed, 7 insertions(+), 108 deletions(-)
(limited to 'python')

diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index b8cdbbe3cf..ccbca67656 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -215,21 +215,6 @@ FLOAT_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0)
 COMPLEX_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0j)
 
 
-class PStatsParam(AccumulatorParam):
-    """PStatsParam is used to merge pstats.Stats"""
-
-    @staticmethod
-    def zero(value):
-        return None
-
-    @staticmethod
-    def addInPlace(value1, value2):
-        if value1 is None:
-            return value2
-        value1.add(value2)
-        return value1
-
-
 class _UpdateRequestHandler(SocketServer.StreamRequestHandler):
 
     """
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index abeda19b77..8e7b00469e 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -20,7 +20,6 @@ import shutil
 import sys
 from threading import Lock
 from tempfile import NamedTemporaryFile
-import atexit
 
 from pyspark import accumulators
 from pyspark.accumulators import Accumulator
@@ -31,6 +30,7 @@ from pyspark.java_gateway import launch_gateway
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
     PairDeserializer, CompressedSerializer
 from pyspark.storagelevel import StorageLevel
+from pyspark import rdd
 from pyspark.rdd import RDD
 from pyspark.traceback_utils import CallSite, first_spark_call
 
@@ -192,9 +192,6 @@ class SparkContext(object):
         self._temp_dir = \
             self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()
 
-        # profiling stats collected for each PythonRDD
-        self._profile_stats = []
-
     def _initialize_context(self, jconf):
         """
         Initialize SparkContext in function to allow subclass specific initialization
@@ -795,40 +792,6 @@ class SparkContext(object):
         it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
         return list(mappedRDD._collect_iterator_through_file(it))
 
-    def _add_profile(self, id, profileAcc):
-        if not self._profile_stats:
-            dump_path = self._conf.get("spark.python.profile.dump")
-            if dump_path:
-                atexit.register(self.dump_profiles, dump_path)
-            else:
-                atexit.register(self.show_profiles)
-
-        self._profile_stats.append([id, profileAcc, False])
-
-    def show_profiles(self):
-        """ Print the profile stats to stdout """
-        for i, (id, acc, showed) in enumerate(self._profile_stats):
-            stats = acc.value
-            if not showed and stats:
-                print "=" * 60
-                print "Profile of RDD<id=%d>" % id
-                print "=" * 60
-                stats.sort_stats("tottime", "cumtime").print_stats()
-                # mark it as showed
-                self._profile_stats[i][2] = True
-
-    def dump_profiles(self, path):
-        """ Dump the profile stats into directory `path`
-        """
-        if not os.path.exists(path):
-            os.makedirs(path)
-        for id, acc, _ in self._profile_stats:
-            stats = acc.value
-            if stats:
-                p = os.path.join(path, "rdd_%d.pstats" % id)
-                stats.dump_stats(p)
-        self._profile_stats = []
-
 
 def _test():
     import atexit
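The PStatsParam accumulator removed above folds per-task pstats.Stats objects into one aggregate, which show_profiles() and dump_profiles() then print or write out as rdd_<id>.pstats files. Below is a minimal, standalone sketch of that merge pattern using only the Python standard library; the helper names (work, profile_once, merge_stats) are illustrative and not part of any Spark API.

    import cProfile
    import pstats

    def work(n):
        # stand-in for a task body worth profiling
        return sum(i * i for i in range(n))

    def profile_once(n):
        # profile a single "task" and return its pstats.Stats
        p = cProfile.Profile()
        p.runcall(work, n)
        return pstats.Stats(p)

    def merge_stats(acc, new):
        # mirrors the removed PStatsParam.addInPlace: None is the zero value,
        # otherwise fold the new Stats into the accumulated one
        if acc is None:
            return new
        acc.add(new)
        return acc

    merged = None
    for n in (10000, 20000, 30000):
        merged = merge_stats(merged, profile_once(n))

    merged.sort_stats("tottime", "cumtime").print_stats(10)

Treating None as the zero value lets the accumulator start empty and adopt the first Stats object directly, which is exactly what the removed addInPlace did.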
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 8ed89e2f97..680140d72d 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+from base64 import standard_b64encode as b64enc
 import copy
 from collections import defaultdict
 from itertools import chain, ifilter, imap
@@ -31,7 +32,6 @@ import bisect
 from random import Random
 from math import sqrt, log, isinf, isnan
 
-from pyspark.accumulators import PStatsParam
 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
     BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
     PickleSerializer, pack_long, AutoBatchedSerializer
@@ -2080,9 +2080,7 @@ class PipelinedRDD(RDD):
             return self._jrdd_val
         if self._bypass_serializer:
             self._jrdd_deserializer = NoOpSerializer()
-        enable_profile = self.ctx._conf.get("spark.python.profile", "false") == "true"
-        profileStats = self.ctx.accumulator(None, PStatsParam) if enable_profile else None
-        command = (self.func, profileStats, self._prev_jrdd_deserializer,
+        command = (self.func, self._prev_jrdd_deserializer,
                    self._jrdd_deserializer)
         # the serialized command will be compressed by broadcast
         ser = CloudPickleSerializer()
@@ -2104,10 +2102,6 @@ class PipelinedRDD(RDD):
                                              self.ctx.pythonExec,
                                              broadcast_vars, self.ctx._javaAccumulator)
         self._jrdd_val = python_rdd.asJavaRDD()
-
-        if enable_profile:
-            self._id = self._jrdd_val.id()
-            self.ctx._add_profile(self._id, profileStats)
         return self._jrdd_val
 
     def id(self):
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index ee5bda8bb4..653195ea43 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -974,7 +974,7 @@ class SQLContext(object):
         [Row(c0=4)]
         """
         func = lambda _, it: imap(lambda x: f(*x), it)
-        command = (func, None,
+        command = (func,
                    BatchedSerializer(PickleSerializer(), 1024),
                    BatchedSerializer(PickleSerializer(), 1024))
         ser = CloudPickleSerializer()
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index e6002afa9c..d1bb2033b7 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -632,36 +632,6 @@ class TestRDDFunctions(PySparkTestCase):
         self.assertEquals(result.count(), 3)
 
 
-class TestProfiler(PySparkTestCase):
-
-    def setUp(self):
-        self._old_sys_path = list(sys.path)
-        class_name = self.__class__.__name__
-        conf = SparkConf().set("spark.python.profile", "true")
-        self.sc = SparkContext('local[4]', class_name, batchSize=2, conf=conf)
-
-    def test_profiler(self):
-
-        def heavy_foo(x):
-            for i in range(1 << 20):
-                x = 1
-        rdd = self.sc.parallelize(range(100))
-        rdd.foreach(heavy_foo)
-        profiles = self.sc._profile_stats
-        self.assertEqual(1, len(profiles))
-        id, acc, _ = profiles[0]
-        stats = acc.value
-        self.assertTrue(stats is not None)
-        width, stat_list = stats.get_print_list([])
-        func_names = [func_name for fname, n, func_name in stat_list]
-        self.assertTrue("heavy_foo" in func_names)
-
-        self.sc.show_profiles()
-        d = tempfile.gettempdir()
-        self.sc.dump_profiles(d)
-        self.assertTrue("rdd_%d.pstats" % id in os.listdir(d))
-
-
 class TestSQL(PySparkTestCase):
 
     def setUp(self):
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 8257dddfee..c1f6e3e4a1 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -23,8 +23,6 @@ import sys
 import time
 import socket
 import traceback
-import cProfile
-import pstats
 
 from pyspark.accumulators import _accumulatorRegistry
 from pyspark.broadcast import Broadcast, _broadcastRegistry
@@ -92,21 +90,10 @@ def main(infile, outfile):
         command = pickleSer._read_with_length(infile)
         if isinstance(command, Broadcast):
             command = pickleSer.loads(command.value)
-        (func, stats, deserializer, serializer) = command
+        (func, deserializer, serializer) = command
         init_time = time.time()
-
-        def process():
-            iterator = deserializer.load_stream(infile)
-            serializer.dump_stream(func(split_index, iterator), outfile)
-
-        if stats:
-            p = cProfile.Profile()
-            p.runcall(process)
-            st = pstats.Stats(p)
-            st.stream = None  # make it picklable
-            stats.add(st.strip_dirs())
-        else:
-            process()
+        iterator = deserializer.load_stream(infile)
+        serializer.dump_stream(func(split_index, iterator), outfile)
     except Exception:
         try:
             write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
-- 
cgit v1.2.3
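On the worker side, the reverted code wrapped the task loop in cProfile only when a stats accumulator arrived with the command, then cleared the Stats object's stream so it could be pickled back to the driver. The sketch below shows that conditional-profiling pattern with standard-library pieces only; run_task and its arguments are illustrative stand-ins, not the real worker entry point.

    import cProfile
    import pickle
    import pstats

    def run_task(func, iterator, profile=False):
        # evaluate func over the iterator, optionally under the profiler,
        # following the shape of the reverted worker.py code
        result = []

        def process():
            result.extend(func(iterator))

        if profile:
            p = cProfile.Profile()
            p.runcall(process)
            st = pstats.Stats(p)
            st.stream = None      # drop the output stream so Stats can be pickled
            st.strip_dirs()
            # the reverted code shipped st back through an accumulator;
            # a pickle round-trip stands in for that here
            st = pickle.loads(pickle.dumps(st))
            st.sort_stats("tottime").print_stats(5)
        else:
            process()
        return result

    squares = run_task(lambda it: [x * x for x in it], iter(range(100000)), profile=True)

Deferring the deserializer/serializer work into a nested process() callable is what lets a single p.runcall(process) capture the whole task when profiling is on, while the unprofiled path stays a plain function call.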