author     Shixiong Zhu <shixiong@databricks.com>  2016-09-22 14:26:45 -0700
committer  Shixiong Zhu <shixiong@databricks.com>  2016-09-22 14:26:45 -0700
commit     3cdae0ff2f45643df7bc198cb48623526c7eb1a6 (patch)
tree       655259b43c83af8e8f5bd1655d816deae0669c24
parent     85d609cf25c1da2df3cd4f5d5aeaf3cbcf0d674c (diff)
[SPARK-17638][STREAMING] Stop JVM StreamingContext when the Python process is dead
## What changes were proposed in this pull request?

When the Python process is dead, the JVM StreamingContext keeps running, so a flood of Py4JException stack traces appears in the logs before the JVM process exits. It is better to stop the JVM StreamingContext and avoid that noise.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15201 from zsxwing/stop-jvm-ssc.
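In outline, the patch recognizes the specific Py4J callback failure and then stops the active StreamingContext asynchronously, on a daemon thread, so the scheduler thread that reported the error is never blocked. A condensed sketch of that shutdown pattern, with an illustrative name (`stopActiveContextAsync` is not part of the patch):

```scala
import org.apache.spark.streaming.StreamingContext

// Hypothetical helper: stop the active StreamingContext without blocking
// the caller. StreamingContext.stop() waits for the scheduler to shut
// down, so calling it directly from a scheduler error handler can deadlock.
def stopActiveContextAsync(): Unit = {
  val stopper = new Thread("Stop-StreamingContext") {
    override def run(): Unit = {
      // Keep the SparkContext alive; only the streaming layer is torn down.
      StreamingContext.getActive().foreach(_.stop(stopSparkContext = false))
    }
  }
  stopper.setDaemon(true)
  stopper.start()
}
```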
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala  33
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala     2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala     2
3 files changed, 35 insertions(+), 2 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index aeff4d7a98..46bfc60856 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -24,11 +24,14 @@ import java.util.{ArrayList => JArrayList, List => JList}
import scala.collection.JavaConverters._
import scala.language.existentials
+import py4j.Py4JException
+
import org.apache.spark.SparkException
import org.apache.spark.api.java._
+import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Duration, Interval, Time}
+import org.apache.spark.streaming.{Duration, Interval, StreamingContext, Time}
import org.apache.spark.streaming.api.java._
import org.apache.spark.streaming.dstream._
import org.apache.spark.util.Utils
@@ -157,7 +160,7 @@ private[python] object PythonTransformFunctionSerializer {
/**
* Helper functions, which are called from Python via Py4J.
*/
-private[python] object PythonDStream {
+private[streaming] object PythonDStream {
/**
* can not access PythonTransformFunctionSerializer.register() via Py4j
@@ -184,6 +187,32 @@ private[python] object PythonDStream {
rdds.asScala.foreach(queue.add)
queue
}
+
+ /**
+ * Stop [[StreamingContext]] if the Python process crashes (e.g., OOM), in case the user cannot
+ * stop it from the Python side.
+ */
+ def stopStreamingContextIfPythonProcessIsDead(e: Throwable): Unit = {
+ // These two special messages are from:
+ // scalastyle:off
+ // https://github.com/bartdag/py4j/blob/5cbb15a21f857e8cf334ce5f675f5543472f72eb/py4j-java/src/main/java/py4j/CallbackClient.java#L218
+ // https://github.com/bartdag/py4j/blob/5cbb15a21f857e8cf334ce5f675f5543472f72eb/py4j-java/src/main/java/py4j/CallbackClient.java#L340
+ // scalastyle:on
+ if (e.isInstanceOf[Py4JException] &&
+ ("Cannot obtain a new communication channel" == e.getMessage ||
+ "Error while obtaining a new communication channel" == e.getMessage)) {
+ // Start a new thread to stop StreamingContext to avoid deadlock.
+ new Thread("Stop-StreamingContext") with Logging {
+ setDaemon(true)
+
+ override def run(): Unit = {
+ logError(
+ "Cannot connect to Python process. It's probably dead. Stopping StreamingContext.", e)
+ StreamingContext.getActive().foreach(_.stop(stopSparkContext = false))
+ }
+ }.start()
+ }
+ }
}
/**
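Two details of `stopStreamingContextIfPythonProcessIsDead` are worth noting. First, only the two exact `CallbackClient` messages linked above are treated as evidence that the Python process is gone; any other `Py4JException` (or other `Throwable`) is left alone. Second, the stop passes `stopSparkContext = false`, so the underlying SparkContext survives. A hypothetical standalone restatement of the message check, for illustration only:

```scala
import py4j.Py4JException

// Illustrative restatement of the check above: only the two
// CallbackClient failure messages mean "the Python process is dead".
def isPythonProcessDead(e: Throwable): Boolean = e match {
  case p: Py4JException =>
    p.getMessage == "Cannot obtain a new communication channel" ||
      p.getMessage == "Error while obtaining a new communication channel"
  case _ => false
}
```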
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 10d64f98ac..8d83dc8a8f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -22,6 +22,7 @@ import scala.util.{Failure, Success, Try}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
+import org.apache.spark.streaming.api.python.PythonDStream
import org.apache.spark.streaming.util.RecurringTimer
import org.apache.spark.util.{Clock, EventLoop, ManualClock, Utils}
@@ -252,6 +253,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
+ PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
}
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index dbc50da21c..98e099354a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -28,6 +28,7 @@ import org.apache.spark.ExecutorAllocationClient
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.{PairRDDFunctions, RDD}
import org.apache.spark.streaming._
+import org.apache.spark.streaming.api.python.PythonDStream
import org.apache.spark.streaming.ui.UIUtils
import org.apache.spark.util.{EventLoop, ThreadUtils}
@@ -217,6 +218,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
private def handleError(msg: String, e: Throwable) {
logError(msg, e)
ssc.waiter.notifyError(e)
+ PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
}
private class JobHandler(job: Job) extends Runnable with Logging {
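Because `PythonDStream` is widened from `private[python]` to `private[streaming]`, both schedulers above can call the new hook, and a test in the `org.apache.spark.streaming` package could exercise it directly. A hypothetical sketch, assuming py4j on the classpath and an active StreamingContext:

```scala
import py4j.Py4JException
import org.apache.spark.streaming.api.python.PythonDStream

// One of the two recognized messages: triggers an asynchronous stop of
// the active StreamingContext (the SparkContext is left running).
PythonDStream.stopStreamingContextIfPythonProcessIsDead(
  new Py4JException("Cannot obtain a new communication channel"))

// Any other failure is ignored by the hook: no context is stopped.
PythonDStream.stopStreamingContextIfPythonProcessIsDead(
  new RuntimeException("unrelated failure"))
```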