diff options
author | Aaron Staple <aaron.staple@gmail.com> | 2014-09-15 19:28:17 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@apache.org> | 2014-09-15 19:28:17 -0700 |
commit | 60050f42885582a699fc7a6fa0529964162bb8a3 (patch) | |
tree | 4014b35f39ee99dcd3d26b288dba0e7555a97ad6 /python/pyspark/rdd.py | |
parent | da33acb8b681eca5e787d546fe922af76a151398 (diff) | |
download | spark-60050f42885582a699fc7a6fa0529964162bb8a3.tar.gz spark-60050f42885582a699fc7a6fa0529964162bb8a3.tar.bz2 spark-60050f42885582a699fc7a6fa0529964162bb8a3.zip |
[SPARK-1087] Move python traceback utilities into new traceback_utils.py file.
Also made some cosmetic cleanups.
Author: Aaron Staple <aaron.staple@gmail.com>
Closes #2385 from staple/SPARK-1087 and squashes the following commits:
7b3bb13 [Aaron Staple] Address review comments, cosmetic cleanups.
10ba6e1 [Aaron Staple] [SPARK-1087] Move python traceback utilities into new traceback_utils.py file.
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r-- | python/pyspark/rdd.py | 58 |
1 files changed, 3 insertions, 55 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 6ad5ab2a2d..21f182b0ff 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -18,13 +18,11 @@ from base64 import standard_b64encode as b64enc import copy from collections import defaultdict -from collections import namedtuple from itertools import chain, ifilter, imap import operator import os import sys import shlex -import traceback from subprocess import Popen, PIPE from tempfile import NamedTemporaryFile from threading import Thread @@ -45,6 +43,7 @@ from pyspark.storagelevel import StorageLevel from pyspark.resultiterable import ResultIterable from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, \ get_used_memory, ExternalSorter +from pyspark.traceback_utils import SCCallSiteSync from py4j.java_collections import ListConverter, MapConverter @@ -81,57 +80,6 @@ def portable_hash(x): return hash(x) -def _extract_concise_traceback(): - """ - This function returns the traceback info for a callsite, returns a dict - with function name, file name and line number - """ - tb = traceback.extract_stack() - callsite = namedtuple("Callsite", "function file linenum") - if len(tb) == 0: - return None - file, line, module, what = tb[len(tb) - 1] - sparkpath = os.path.dirname(file) - first_spark_frame = len(tb) - 1 - for i in range(0, len(tb)): - file, line, fun, what = tb[i] - if file.startswith(sparkpath): - first_spark_frame = i - break - if first_spark_frame == 0: - file, line, fun, what = tb[0] - return callsite(function=fun, file=file, linenum=line) - sfile, sline, sfun, swhat = tb[first_spark_frame] - ufile, uline, ufun, uwhat = tb[first_spark_frame - 1] - return callsite(function=sfun, file=ufile, linenum=uline) - -_spark_stack_depth = 0 - - -class _JavaStackTrace(object): - - def __init__(self, sc): - tb = _extract_concise_traceback() - if tb is not None: - self._traceback = "%s at %s:%s" % ( - tb.function, tb.file, tb.linenum) - else: - self._traceback = "Error! Could not extract traceback info" - self._context = sc - - def __enter__(self): - global _spark_stack_depth - if _spark_stack_depth == 0: - self._context._jsc.setCallSite(self._traceback) - _spark_stack_depth += 1 - - def __exit__(self, type, value, tb): - global _spark_stack_depth - _spark_stack_depth -= 1 - if _spark_stack_depth == 0: - self._context._jsc.setCallSite(None) - - class BoundedFloat(float): """ Bounded value is generated by approximate job, with confidence and low @@ -704,7 +652,7 @@ class RDD(object): """ Return a list that contains all of the elements in this RDD. """ - with _JavaStackTrace(self.context) as st: + with SCCallSiteSync(self.context) as css: bytesInJava = self._jrdd.collect().iterator() return list(self._collect_iterator_through_file(bytesInJava)) @@ -1515,7 +1463,7 @@ class RDD(object): keyed = self.mapPartitionsWithIndex(add_shuffle_key) keyed._bypass_serializer = True - with _JavaStackTrace(self.context) as st: + with SCCallSiteSync(self.context) as css: pairRDD = self.ctx._jvm.PairwiseRDD( keyed._jrdd.rdd()).asJavaPairRDD() partitioner = self.ctx._jvm.PythonPartitioner(numPartitions, |