aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/streaming/tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/streaming/tests.py')
-rw-r--r--python/pyspark/streaming/tests.py18
1 files changed, 12 insertions, 6 deletions
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index e4e56fff3b..49634252fd 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -61,9 +61,12 @@ class PySparkStreamingTestCase(unittest.TestCase):
def tearDownClass(cls):
cls.sc.stop()
# Clean up in the JVM just in case there has been some issues in Python API
- jSparkContextOption = SparkContext._jvm.SparkContext.get()
- if jSparkContextOption.nonEmpty():
- jSparkContextOption.get().stop()
+ try:
+ jSparkContextOption = SparkContext._jvm.SparkContext.get()
+ if jSparkContextOption.nonEmpty():
+ jSparkContextOption.get().stop()
+ except:
+ pass
def setUp(self):
self.ssc = StreamingContext(self.sc, self.duration)
@@ -72,9 +75,12 @@ class PySparkStreamingTestCase(unittest.TestCase):
if self.ssc is not None:
self.ssc.stop(False)
# Clean up in the JVM just in case there has been some issues in Python API
- jStreamingContextOption = StreamingContext._jvm.SparkContext.getActive()
- if jStreamingContextOption.nonEmpty():
- jStreamingContextOption.get().stop(False)
+ try:
+ jStreamingContextOption = StreamingContext._jvm.SparkContext.getActive()
+ if jStreamingContextOption.nonEmpty():
+ jStreamingContextOption.get().stop(False)
+ except:
+ pass
def wait_for(self, result, n):
start_time = time.time()