aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorjyotiska <jyotiska123@gmail.com>2014-03-10 13:34:49 -0700
committerMatei Zaharia <matei@databricks.com>2014-03-10 13:34:49 -0700
commitf5518989b67a0941ca79368e73811895a5fa8669 (patch)
treeb62a0e4648e93f7c95e7fbec52a7bf62ed64a486 /python
parente1e09e0ef6b18e034727403d81747d899b042219 (diff)
downloadspark-f5518989b67a0941ca79368e73811895a5fa8669.tar.gz
spark-f5518989b67a0941ca79368e73811895a5fa8669.tar.bz2
spark-f5518989b67a0941ca79368e73811895a5fa8669.zip
[SPARK-972] Added detailed callsite info for ValueError in context.py (resubmitted)
Author: jyotiska <jyotiska123@gmail.com> Closes #34 from jyotiska/pyspark_code and squashes the following commits: c9439be [jyotiska] replaced dict with namedtuple a6bf4cd [jyotiska] added callsite info for context.py
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/context.py16
-rw-r--r--python/pyspark/rdd.py21
2 files changed, 29 insertions, 8 deletions
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index c9f42d3aac..bf2454fd7e 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -20,6 +20,7 @@ import shutil
import sys
from threading import Lock
from tempfile import NamedTemporaryFile
+from collections import namedtuple
from pyspark import accumulators
from pyspark.accumulators import Accumulator
@@ -29,6 +30,7 @@ from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
from pyspark.storagelevel import StorageLevel
+from pyspark import rdd
from pyspark.rdd import RDD
from py4j.java_collections import ListConverter
@@ -83,6 +85,11 @@ class SparkContext(object):
...
ValueError:...
"""
+ if rdd._extract_concise_traceback() is not None:
+ self._callsite = rdd._extract_concise_traceback()
+ else:
+ tempNamedTuple = namedtuple("Callsite", "function file linenum")
+ self._callsite = tempNamedTuple(function=None, file=None, linenum=None)
SparkContext._ensure_initialized(self, gateway=gateway)
self.environment = environment or {}
@@ -169,7 +176,14 @@ class SparkContext(object):
if instance:
if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:
- raise ValueError("Cannot run multiple SparkContexts at once")
+ currentMaster = SparkContext._active_spark_context.master
+ currentAppName = SparkContext._active_spark_context.appName
+ callsite = SparkContext._active_spark_context._callsite
+
+ # Raise error if there is already a running Spark context
+ raise ValueError("Cannot run multiple SparkContexts at once; existing SparkContext(app=%s, master=%s)" \
+ " created by %s at %s:%s " \
+ % (currentAppName, currentMaster, callsite.function, callsite.file, callsite.linenum))
else:
SparkContext._active_spark_context = instance
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 5ab27ff402..e1043ad564 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -18,6 +18,7 @@
from base64 import standard_b64encode as b64enc
import copy
from collections import defaultdict
+from collections import namedtuple
from itertools import chain, ifilter, imap
import operator
import os
@@ -42,12 +43,14 @@ from py4j.java_collections import ListConverter, MapConverter
__all__ = ["RDD"]
def _extract_concise_traceback():
+ """
+ Return the traceback info for a callsite as a "Callsite" namedtuple
+ with function name, file name and line number, or None if the stack
+ is empty.
+ """
tb = traceback.extract_stack()
+ callsite = namedtuple("Callsite", "function file linenum")
if len(tb) == 0:
- return "I'm lost!"
- # HACK: This function is in a file called 'rdd.py' in the top level of
- # everything PySpark. Just trim off the directory name and assume
- # everything in that tree is PySpark guts.
+ return None
file, line, module, what = tb[len(tb) - 1]
sparkpath = os.path.dirname(file)
first_spark_frame = len(tb) - 1
@@ -58,16 +61,20 @@ def _extract_concise_traceback():
break
if first_spark_frame == 0:
file, line, fun, what = tb[0]
- return "%s at %s:%d" % (fun, file, line)
+ return callsite(function=fun, file=file, linenum=line)
sfile, sline, sfun, swhat = tb[first_spark_frame]
ufile, uline, ufun, uwhat = tb[first_spark_frame-1]
- return "%s at %s:%d" % (sfun, ufile, uline)
+ return callsite(function=sfun, file=ufile, linenum=uline)
_spark_stack_depth = 0
class _JavaStackTrace(object):
def __init__(self, sc):
- self._traceback = _extract_concise_traceback()
+ tb = _extract_concise_traceback()
+ if tb is not None:
+ self._traceback = "%s at %s:%s" % (tb.function, tb.file, tb.linenum)
+ else:
+ self._traceback = "Error! Could not extract traceback info"
self._context = sc
def __enter__(self):