author     Josh Rosen <joshrosen@apache.org>   2014-09-26 14:47:14 -0700
committer  Josh Rosen <joshrosen@apache.org>   2014-09-26 14:47:14 -0700
commit     f872e4fb80b8429800daa9c44c0cac620c1ff303 (patch)
tree       d43c6d4ebb15b4fca42f50e241bfc4ff94bb13d6 /python
parent     7364fa5a176da69e425bca0e3e137ee73275c78c (diff)
Revert "[SPARK-3478] [PySpark] Profile the Python tasks"
This reverts commit 1aa549ba9839565274a12c52fa1075b424f138a6.
Diffstat (limited to 'python')
 python/pyspark/accumulators.py | 15
 python/pyspark/context.py      | 39
 python/pyspark/rdd.py          | 10
 python/pyspark/sql.py          |  2
 python/pyspark/tests.py        | 30
 python/pyspark/worker.py       | 19
6 files changed, 7 insertions(+), 108 deletions(-)
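
For context, the reverted change let users turn on Python task profiling through the "spark.python.profile" configuration and inspect the results via SparkContext.show_profiles() and SparkContext.dump_profiles(), as the removed code below shows. A minimal usage sketch against that pre-revert API (no longer available once this revert lands):

    from pyspark import SparkConf, SparkContext

    # Assumes a build that still contains the reverted SPARK-3478 change;
    # after this revert the conf key and both methods are gone again.
    conf = SparkConf().set("spark.python.profile", "true")
    sc = SparkContext("local[4]", "ProfileExample", conf=conf)

    rdd = sc.parallelize(range(100))
    rdd.foreach(lambda x: sum(i * i for i in range(1 << 16)))  # work worth profiling

    sc.show_profiles()              # print per-RDD pstats to stdout
    sc.dump_profiles("/tmp/prof")   # or write rdd_<id>.pstats files to a directory
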
diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index b8cdbbe3cf..ccbca67656 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -215,21 +215,6 @@ FLOAT_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0)
COMPLEX_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0j)
-class PStatsParam(AccumulatorParam):
- """PStatsParam is used to merge pstats.Stats"""
-
- @staticmethod
- def zero(value):
- return None
-
- @staticmethod
- def addInPlace(value1, value2):
- if value1 is None:
- return value2
- value1.add(value2)
- return value1
-
-
class _UpdateRequestHandler(SocketServer.StreamRequestHandler):
"""
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index abeda19b77..8e7b00469e 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -20,7 +20,6 @@ import shutil
import sys
from threading import Lock
from tempfile import NamedTemporaryFile
-import atexit
from pyspark import accumulators
from pyspark.accumulators import Accumulator
@@ -31,6 +30,7 @@ from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
PairDeserializer, CompressedSerializer
from pyspark.storagelevel import StorageLevel
+from pyspark import rdd
from pyspark.rdd import RDD
from pyspark.traceback_utils import CallSite, first_spark_call
@@ -192,9 +192,6 @@ class SparkContext(object):
self._temp_dir = \
self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()
- # profiling stats collected for each PythonRDD
- self._profile_stats = []
-
def _initialize_context(self, jconf):
"""
Initialize SparkContext in function to allow subclass specific initialization
@@ -795,40 +792,6 @@ class SparkContext(object):
it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
return list(mappedRDD._collect_iterator_through_file(it))
- def _add_profile(self, id, profileAcc):
- if not self._profile_stats:
- dump_path = self._conf.get("spark.python.profile.dump")
- if dump_path:
- atexit.register(self.dump_profiles, dump_path)
- else:
- atexit.register(self.show_profiles)
-
- self._profile_stats.append([id, profileAcc, False])
-
- def show_profiles(self):
- """ Print the profile stats to stdout """
- for i, (id, acc, showed) in enumerate(self._profile_stats):
- stats = acc.value
- if not showed and stats:
- print "=" * 60
- print "Profile of RDD<id=%d>" % id
- print "=" * 60
- stats.sort_stats("tottime", "cumtime").print_stats()
- # mark it as showed
- self._profile_stats[i][2] = True
-
- def dump_profiles(self, path):
- """ Dump the profile stats into directory `path`
- """
- if not os.path.exists(path):
- os.makedirs(path)
- for id, acc, _ in self._profile_stats:
- stats = acc.value
- if stats:
- p = os.path.join(path, "rdd_%d.pstats" % id)
- stats.dump_stats(p)
- self._profile_stats = []
-
def _test():
import atexit
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 8ed89e2f97..680140d72d 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -15,6 +15,7 @@
# limitations under the License.
#
+from base64 import standard_b64encode as b64enc
import copy
from collections import defaultdict
from itertools import chain, ifilter, imap
@@ -31,7 +32,6 @@ import bisect
from random import Random
from math import sqrt, log, isinf, isnan
-from pyspark.accumulators import PStatsParam
from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
PickleSerializer, pack_long, AutoBatchedSerializer
@@ -2080,9 +2080,7 @@ class PipelinedRDD(RDD):
return self._jrdd_val
if self._bypass_serializer:
self._jrdd_deserializer = NoOpSerializer()
- enable_profile = self.ctx._conf.get("spark.python.profile", "false") == "true"
- profileStats = self.ctx.accumulator(None, PStatsParam) if enable_profile else None
- command = (self.func, profileStats, self._prev_jrdd_deserializer,
+ command = (self.func, self._prev_jrdd_deserializer,
self._jrdd_deserializer)
# the serialized command will be compressed by broadcast
ser = CloudPickleSerializer()
@@ -2104,10 +2102,6 @@ class PipelinedRDD(RDD):
self.ctx.pythonExec,
broadcast_vars, self.ctx._javaAccumulator)
self._jrdd_val = python_rdd.asJavaRDD()
-
- if enable_profile:
- self._id = self._jrdd_val.id()
- self.ctx._add_profile(self._id, profileStats)
return self._jrdd_val
def id(self):
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index ee5bda8bb4..653195ea43 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -974,7 +974,7 @@ class SQLContext(object):
[Row(c0=4)]
"""
func = lambda _, it: imap(lambda x: f(*x), it)
- command = (func, None,
+ command = (func,
BatchedSerializer(PickleSerializer(), 1024),
BatchedSerializer(PickleSerializer(), 1024))
ser = CloudPickleSerializer()
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index e6002afa9c..d1bb2033b7 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -632,36 +632,6 @@ class TestRDDFunctions(PySparkTestCase):
self.assertEquals(result.count(), 3)
-class TestProfiler(PySparkTestCase):
-
- def setUp(self):
- self._old_sys_path = list(sys.path)
- class_name = self.__class__.__name__
- conf = SparkConf().set("spark.python.profile", "true")
- self.sc = SparkContext('local[4]', class_name, batchSize=2, conf=conf)
-
- def test_profiler(self):
-
- def heavy_foo(x):
- for i in range(1 << 20):
- x = 1
- rdd = self.sc.parallelize(range(100))
- rdd.foreach(heavy_foo)
- profiles = self.sc._profile_stats
- self.assertEqual(1, len(profiles))
- id, acc, _ = profiles[0]
- stats = acc.value
- self.assertTrue(stats is not None)
- width, stat_list = stats.get_print_list([])
- func_names = [func_name for fname, n, func_name in stat_list]
- self.assertTrue("heavy_foo" in func_names)
-
- self.sc.show_profiles()
- d = tempfile.gettempdir()
- self.sc.dump_profiles(d)
- self.assertTrue("rdd_%d.pstats" % id in os.listdir(d))
-
-
class TestSQL(PySparkTestCase):
def setUp(self):
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 8257dddfee..c1f6e3e4a1 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -23,8 +23,6 @@ import sys
import time
import socket
import traceback
-import cProfile
-import pstats
from pyspark.accumulators import _accumulatorRegistry
from pyspark.broadcast import Broadcast, _broadcastRegistry
@@ -92,21 +90,10 @@ def main(infile, outfile):
command = pickleSer._read_with_length(infile)
if isinstance(command, Broadcast):
command = pickleSer.loads(command.value)
- (func, stats, deserializer, serializer) = command
+ (func, deserializer, serializer) = command
init_time = time.time()
-
- def process():
- iterator = deserializer.load_stream(infile)
- serializer.dump_stream(func(split_index, iterator), outfile)
-
- if stats:
- p = cProfile.Profile()
- p.runcall(process)
- st = pstats.Stats(p)
- st.stream = None # make it picklable
- stats.add(st.strip_dirs())
- else:
- process()
+ iterator = deserializer.load_stream(infile)
+ serializer.dump_stream(func(split_index, iterator), outfile)
except Exception:
try:
write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
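
The worker-side wrapper deleted above follows a standard cProfile pattern: run the task body under a profiler and turn the result into a picklable pstats.Stats so it can be shipped back through an accumulator. A rough standalone sketch of that idea (illustrative names, not the Spark API):

    import cProfile
    import pstats

    def run_maybe_profiled(process, enable_profile=False):
        """Run process(); optionally return a picklable pstats.Stats for it."""
        if not enable_profile:
            process()
            return None
        p = cProfile.Profile()
        p.runcall(process)
        st = pstats.Stats(p)
        st.stream = None          # drop the output stream so the object pickles
        return st.strip_dirs()    # strip leading paths, as the removed code did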