diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2015-11-25 11:47:21 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-11-25 11:47:21 -0800 |
commit | d29e2ef4cf43c7f7c5aa40d305cf02be44ce19e0 (patch) | |
tree | 8ec59422678c3c59da4eb08828d613595236fcfb /python/pyspark/streaming/util.py | |
parent | 88875d9413ec7d64a88d40857ffcf97b5853a7f2 (diff) | |
download | spark-d29e2ef4cf43c7f7c5aa40d305cf02be44ce19e0.tar.gz spark-d29e2ef4cf43c7f7c5aa40d305cf02be44ce19e0.tar.bz2 spark-d29e2ef4cf43c7f7c5aa40d305cf02be44ce19e0.zip |
[SPARK-11935][PYSPARK] Send the Python exceptions in TransformFunction and TransformFunctionSerializer to Java
The Python exception track in TransformFunction and TransformFunctionSerializer is not sent back to Java. Py4j just throws a very general exception, which is hard to debug.
This PRs adds `getFailure` method to get the failure message in Java side.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #9922 from zsxwing/SPARK-11935.
Diffstat (limited to 'python/pyspark/streaming/util.py')
-rw-r--r-- | python/pyspark/streaming/util.py | 29 |
1 files changed, 20 insertions, 9 deletions
diff --git a/python/pyspark/streaming/util.py b/python/pyspark/streaming/util.py index 767c732eb9..c7f02bca2a 100644 --- a/python/pyspark/streaming/util.py +++ b/python/pyspark/streaming/util.py @@ -38,12 +38,15 @@ class TransformFunction(object): self.func = func self.deserializers = deserializers self._rdd_wrapper = lambda jrdd, ctx, ser: RDD(jrdd, ctx, ser) + self.failure = None def rdd_wrapper(self, func): self._rdd_wrapper = func return self def call(self, milliseconds, jrdds): + # Clear the failure + self.failure = None try: if self.ctx is None: self.ctx = SparkContext._active_spark_context @@ -62,9 +65,11 @@ class TransformFunction(object): r = self.func(t, *rdds) if r: return r._jrdd - except Exception: - traceback.print_exc() - raise + except: + self.failure = traceback.format_exc() + + def getLastFailure(self): + return self.failure def __repr__(self): return "TransformFunction(%s)" % self.func @@ -89,22 +94,28 @@ class TransformFunctionSerializer(object): self.serializer = serializer self.gateway = gateway or self.ctx._gateway self.gateway.jvm.PythonDStream.registerSerializer(self) + self.failure = None def dumps(self, id): + # Clear the failure + self.failure = None try: func = self.gateway.gateway_property.pool[id] return bytearray(self.serializer.dumps((func.func, func.deserializers))) - except Exception: - traceback.print_exc() - raise + except: + self.failure = traceback.format_exc() def loads(self, data): + # Clear the failure + self.failure = None try: f, deserializers = self.serializer.loads(bytes(data)) return TransformFunction(self.ctx, f, *deserializers) - except Exception: - traceback.print_exc() - raise + except: + self.failure = traceback.format_exc() + + def getLastFailure(self): + return self.failure def __repr__(self): return "TransformFunctionSerializer(%s)" % self.serializer |