aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/streaming/util.py
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2015-11-25 11:47:21 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2015-11-25 11:47:21 -0800
commitd29e2ef4cf43c7f7c5aa40d305cf02be44ce19e0 (patch)
tree8ec59422678c3c59da4eb08828d613595236fcfb /python/pyspark/streaming/util.py
parent88875d9413ec7d64a88d40857ffcf97b5853a7f2 (diff)
downloadspark-d29e2ef4cf43c7f7c5aa40d305cf02be44ce19e0.tar.gz
spark-d29e2ef4cf43c7f7c5aa40d305cf02be44ce19e0.tar.bz2
spark-d29e2ef4cf43c7f7c5aa40d305cf02be44ce19e0.zip
[SPARK-11935][PYSPARK] Send the Python exceptions in TransformFunction and TransformFunctionSerializer to Java
The Python exception track in TransformFunction and TransformFunctionSerializer is not sent back to Java. Py4j just throws a very general exception, which is hard to debug. This PRs adds `getFailure` method to get the failure message in Java side. Author: Shixiong Zhu <shixiong@databricks.com> Closes #9922 from zsxwing/SPARK-11935.
Diffstat (limited to 'python/pyspark/streaming/util.py')
-rw-r--r--python/pyspark/streaming/util.py29
1 files changed, 20 insertions, 9 deletions
diff --git a/python/pyspark/streaming/util.py b/python/pyspark/streaming/util.py
index 767c732eb9..c7f02bca2a 100644
--- a/python/pyspark/streaming/util.py
+++ b/python/pyspark/streaming/util.py
@@ -38,12 +38,15 @@ class TransformFunction(object):
self.func = func
self.deserializers = deserializers
self._rdd_wrapper = lambda jrdd, ctx, ser: RDD(jrdd, ctx, ser)
+ self.failure = None
def rdd_wrapper(self, func):
self._rdd_wrapper = func
return self
def call(self, milliseconds, jrdds):
+ # Clear the failure
+ self.failure = None
try:
if self.ctx is None:
self.ctx = SparkContext._active_spark_context
@@ -62,9 +65,11 @@ class TransformFunction(object):
r = self.func(t, *rdds)
if r:
return r._jrdd
- except Exception:
- traceback.print_exc()
- raise
+ except:
+ self.failure = traceback.format_exc()
+
+ def getLastFailure(self):
+ return self.failure
def __repr__(self):
return "TransformFunction(%s)" % self.func
@@ -89,22 +94,28 @@ class TransformFunctionSerializer(object):
self.serializer = serializer
self.gateway = gateway or self.ctx._gateway
self.gateway.jvm.PythonDStream.registerSerializer(self)
+ self.failure = None
def dumps(self, id):
+ # Clear the failure
+ self.failure = None
try:
func = self.gateway.gateway_property.pool[id]
return bytearray(self.serializer.dumps((func.func, func.deserializers)))
- except Exception:
- traceback.print_exc()
- raise
+ except:
+ self.failure = traceback.format_exc()
def loads(self, data):
+ # Clear the failure
+ self.failure = None
try:
f, deserializers = self.serializer.loads(bytes(data))
return TransformFunction(self.ctx, f, *deserializers)
- except Exception:
- traceback.print_exc()
- raise
+ except:
+ self.failure = traceback.format_exc()
+
+ def getLastFailure(self):
+ return self.failure
def __repr__(self):
return "TransformFunctionSerializer(%s)" % self.serializer