author     Aaron Staple <aaron.staple@gmail.com>    2014-09-15 19:28:17 -0700
committer  Josh Rosen <joshrosen@apache.org>        2014-09-15 19:28:17 -0700
commit     60050f42885582a699fc7a6fa0529964162bb8a3 (patch)
tree       4014b35f39ee99dcd3d26b288dba0e7555a97ad6 /python/pyspark/rdd.py
parent     da33acb8b681eca5e787d546fe922af76a151398 (diff)
[SPARK-1087] Move python traceback utilities into new traceback_utils.py file.
Also made some cosmetic cleanups.

Author: Aaron Staple <aaron.staple@gmail.com>

Closes #2385 from staple/SPARK-1087 and squashes the following commits:

7b3bb13 [Aaron Staple] Address review comments, cosmetic cleanups.
10ba6e1 [Aaron Staple] [SPARK-1087] Move python traceback utilities into new traceback_utils.py file.
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--  python/pyspark/rdd.py  58
1 file changed, 3 insertions(+), 55 deletions(-)
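The new pyspark/traceback_utils.py itself is not part of this diff, which is limited to rdd.py. Below is a rough sketch of what the relocated utilities presumably provide, reconstructed from the _JavaStackTrace implementation removed further down; only the SCCallSiteSync name and its use as a context manager are confirmed by this commit, while the CallSite and first_spark_call names are assumptions.

# Sketch of pyspark/traceback_utils.py, reconstructed from the code removed
# from rdd.py in this commit.  CallSite and first_spark_call are assumed names.
from collections import namedtuple
import os
import traceback

CallSite = namedtuple("CallSite", "function file linenum")


def first_spark_call():
    """Return the user call site of the first Spark frame on the stack, if any."""
    tb = traceback.extract_stack()
    if len(tb) == 0:
        return None
    file, line, module, what = tb[len(tb) - 1]
    sparkpath = os.path.dirname(file)
    first_spark_frame = len(tb) - 1
    for i in range(0, len(tb)):
        file, line, fun, what = tb[i]
        if file.startswith(sparkpath):
            first_spark_frame = i
            break
    if first_spark_frame == 0:
        file, line, fun, what = tb[0]
        return CallSite(function=fun, file=file, linenum=line)
    sfile, sline, sfun, swhat = tb[first_spark_frame]
    ufile, uline, ufun, uwhat = tb[first_spark_frame - 1]
    return CallSite(function=sfun, file=ufile, linenum=uline)


class SCCallSiteSync(object):
    """
    Set the JVM call site from the Python call site for the outermost Spark
    call in a nested sequence, and clear it again when that call exits.
    """

    _spark_stack_depth = 0  # shared across nested SCCallSiteSync blocks

    def __init__(self, sc):
        call_site = first_spark_call()
        if call_site is not None:
            self._call_site = "%s at %s:%s" % (
                call_site.function, call_site.file, call_site.linenum)
        else:
            self._call_site = "Error! Could not extract traceback info"
        self._context = sc

    def __enter__(self):
        if SCCallSiteSync._spark_stack_depth == 0:
            self._context._jsc.setCallSite(self._call_site)
        SCCallSiteSync._spark_stack_depth += 1

    def __exit__(self, type, value, tb):
        SCCallSiteSync._spark_stack_depth -= 1
        if SCCallSiteSync._spark_stack_depth == 0:
            self._context._jsc.setCallSite(None)

The stack-depth counter preserves the behavior of the removed _JavaStackTrace: only the outermost Spark call sets the JVM call site, so RDD operations that internally invoke other RDD operations do not overwrite the user-facing call site.
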
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 6ad5ab2a2d..21f182b0ff 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -18,13 +18,11 @@
from base64 import standard_b64encode as b64enc
import copy
from collections import defaultdict
-from collections import namedtuple
from itertools import chain, ifilter, imap
import operator
import os
import sys
import shlex
-import traceback
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread
@@ -45,6 +43,7 @@ from pyspark.storagelevel import StorageLevel
from pyspark.resultiterable import ResultIterable
from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, \
get_used_memory, ExternalSorter
+from pyspark.traceback_utils import SCCallSiteSync
from py4j.java_collections import ListConverter, MapConverter
@@ -81,57 +80,6 @@ def portable_hash(x):
return hash(x)
-def _extract_concise_traceback():
- """
- This function returns the traceback info for a callsite, returns a dict
- with function name, file name and line number
- """
- tb = traceback.extract_stack()
- callsite = namedtuple("Callsite", "function file linenum")
- if len(tb) == 0:
- return None
- file, line, module, what = tb[len(tb) - 1]
- sparkpath = os.path.dirname(file)
- first_spark_frame = len(tb) - 1
- for i in range(0, len(tb)):
- file, line, fun, what = tb[i]
- if file.startswith(sparkpath):
- first_spark_frame = i
- break
- if first_spark_frame == 0:
- file, line, fun, what = tb[0]
- return callsite(function=fun, file=file, linenum=line)
- sfile, sline, sfun, swhat = tb[first_spark_frame]
- ufile, uline, ufun, uwhat = tb[first_spark_frame - 1]
- return callsite(function=sfun, file=ufile, linenum=uline)
-
-_spark_stack_depth = 0
-
-
-class _JavaStackTrace(object):
-
- def __init__(self, sc):
- tb = _extract_concise_traceback()
- if tb is not None:
- self._traceback = "%s at %s:%s" % (
- tb.function, tb.file, tb.linenum)
- else:
- self._traceback = "Error! Could not extract traceback info"
- self._context = sc
-
- def __enter__(self):
- global _spark_stack_depth
- if _spark_stack_depth == 0:
- self._context._jsc.setCallSite(self._traceback)
- _spark_stack_depth += 1
-
- def __exit__(self, type, value, tb):
- global _spark_stack_depth
- _spark_stack_depth -= 1
- if _spark_stack_depth == 0:
- self._context._jsc.setCallSite(None)
-
-
class BoundedFloat(float):
"""
Bounded value is generated by approximate job, with confidence and low
@@ -704,7 +652,7 @@ class RDD(object):
"""
Return a list that contains all of the elements in this RDD.
"""
- with _JavaStackTrace(self.context) as st:
+ with SCCallSiteSync(self.context) as css:
bytesInJava = self._jrdd.collect().iterator()
return list(self._collect_iterator_through_file(bytesInJava))
@@ -1515,7 +1463,7 @@ class RDD(object):
keyed = self.mapPartitionsWithIndex(add_shuffle_key)
keyed._bypass_serializer = True
- with _JavaStackTrace(self.context) as st:
+ with SCCallSiteSync(self.context) as css:
pairRDD = self.ctx._jvm.PairwiseRDD(
keyed._jrdd.rdd()).asJavaPairRDD()
partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,