From d29e2ef4cf43c7f7c5aa40d305cf02be44ce19e0 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 25 Nov 2015 11:47:21 -0800 Subject: [SPARK-11935][PYSPARK] Send the Python exceptions in TransformFunction and TransformFunctionSerializer to Java The Python exception track in TransformFunction and TransformFunctionSerializer is not sent back to Java. Py4j just throws a very general exception, which is hard to debug. This PRs adds `getFailure` method to get the failure message in Java side. Author: Shixiong Zhu Closes #9922 from zsxwing/SPARK-11935. --- python/pyspark/streaming/util.py | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) (limited to 'python/pyspark/streaming/util.py') diff --git a/python/pyspark/streaming/util.py b/python/pyspark/streaming/util.py index 767c732eb9..c7f02bca2a 100644 --- a/python/pyspark/streaming/util.py +++ b/python/pyspark/streaming/util.py @@ -38,12 +38,15 @@ class TransformFunction(object): self.func = func self.deserializers = deserializers self._rdd_wrapper = lambda jrdd, ctx, ser: RDD(jrdd, ctx, ser) + self.failure = None def rdd_wrapper(self, func): self._rdd_wrapper = func return self def call(self, milliseconds, jrdds): + # Clear the failure + self.failure = None try: if self.ctx is None: self.ctx = SparkContext._active_spark_context @@ -62,9 +65,11 @@ class TransformFunction(object): r = self.func(t, *rdds) if r: return r._jrdd - except Exception: - traceback.print_exc() - raise + except: + self.failure = traceback.format_exc() + + def getLastFailure(self): + return self.failure def __repr__(self): return "TransformFunction(%s)" % self.func @@ -89,22 +94,28 @@ class TransformFunctionSerializer(object): self.serializer = serializer self.gateway = gateway or self.ctx._gateway self.gateway.jvm.PythonDStream.registerSerializer(self) + self.failure = None def dumps(self, id): + # Clear the failure + self.failure = None try: func = self.gateway.gateway_property.pool[id] return bytearray(self.serializer.dumps((func.func, func.deserializers))) - except Exception: - traceback.print_exc() - raise + except: + self.failure = traceback.format_exc() def loads(self, data): + # Clear the failure + self.failure = None try: f, deserializers = self.serializer.loads(bytes(data)) return TransformFunction(self.ctx, f, *deserializers) - except Exception: - traceback.print_exc() - raise + except: + self.failure = traceback.format_exc() + + def getLastFailure(self): + return self.failure def __repr__(self): return "TransformFunctionSerializer(%s)" % self.serializer -- cgit v1.2.3