aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/streaming/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/streaming/util.py')
-rw-r--r--python/pyspark/streaming/util.py29
1 files changed, 20 insertions, 9 deletions
diff --git a/python/pyspark/streaming/util.py b/python/pyspark/streaming/util.py
index 767c732eb9..c7f02bca2a 100644
--- a/python/pyspark/streaming/util.py
+++ b/python/pyspark/streaming/util.py
@@ -38,12 +38,15 @@ class TransformFunction(object):
self.func = func
self.deserializers = deserializers
self._rdd_wrapper = lambda jrdd, ctx, ser: RDD(jrdd, ctx, ser)
+ self.failure = None
def rdd_wrapper(self, func):
self._rdd_wrapper = func
return self
def call(self, milliseconds, jrdds):
+ # Clear the failure
+ self.failure = None
try:
if self.ctx is None:
self.ctx = SparkContext._active_spark_context
@@ -62,9 +65,11 @@ class TransformFunction(object):
r = self.func(t, *rdds)
if r:
return r._jrdd
- except Exception:
- traceback.print_exc()
- raise
+ except:
+ self.failure = traceback.format_exc()
+
+ def getLastFailure(self):
+ return self.failure
def __repr__(self):
return "TransformFunction(%s)" % self.func
@@ -89,22 +94,28 @@ class TransformFunctionSerializer(object):
self.serializer = serializer
self.gateway = gateway or self.ctx._gateway
self.gateway.jvm.PythonDStream.registerSerializer(self)
+ self.failure = None
def dumps(self, id):
+ # Clear the failure
+ self.failure = None
try:
func = self.gateway.gateway_property.pool[id]
return bytearray(self.serializer.dumps((func.func, func.deserializers)))
- except Exception:
- traceback.print_exc()
- raise
+ except:
+ self.failure = traceback.format_exc()
def loads(self, data):
+ # Clear the failure
+ self.failure = None
try:
f, deserializers = self.serializer.loads(bytes(data))
return TransformFunction(self.ctx, f, *deserializers)
- except Exception:
- traceback.print_exc()
- raise
+ except:
+ self.failure = traceback.format_exc()
+
+ def getLastFailure(self):
+ return self.failure
def __repr__(self):
return "TransformFunctionSerializer(%s)" % self.serializer