author    Shixiong Zhu <shixiong@databricks.com>  2016-01-05 13:48:47 -0800
committer Davies Liu <davies.liu@gmail.com>  2016-01-05 13:48:47 -0800
commit 6cfe341ee89baa952929e91d33b9ecbca73a3ea0 (patch)
tree   c578445bdd6b5610875b189d6e4a8e7cc2efc20f /streaming
parent c26d174265f6b4682210fcc406e6603b4f7dc784 (diff)
[SPARK-12511] [PYSPARK] [STREAMING] Make sure PythonDStream.registerSerializer is called only once
There is an issue where Py4J's PythonProxyHandler.finalize blocks forever (https://github.com/bartdag/py4j/pull/184). Py4J creates a PythonProxyHandler on the Java side for "transformer_serializer" when "registerSerializer" is called. If we call "registerSerializer" twice, the second PythonProxyHandler overrides the first one; the first then becomes garbage, is GCed, and triggers "PythonProxyHandler.finalize". To avoid that, we should not call "registerSerializer" more than once, so that the Java-side "PythonProxyHandler" won't be GCed.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #10514 from zsxwing/SPARK-12511.
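The call-once guard described above can be sketched as follows. This is an illustrative Python sketch, not the actual PySpark change: the class and attribute names are hypothetical, and the real py4j gateway call is shown only as a comment.

```python
# Sketch of a call-once guard around registerSerializer (hypothetical
# names). Registering twice would create a second Java-side
# PythonProxyHandler; the first one would then be GCed and its
# finalize() could block forever (py4j#184), so we register only once.

class PythonDStreamBridge:
    """Stand-in for the wrapper that talks to PythonDStream over py4j."""

    _serializer_registered = False  # process-wide flag
    registrations = 0               # how many times we actually registered

    def register_serializer(self, serializer):
        if PythonDStreamBridge._serializer_registered:
            return  # already registered: do not create a second proxy
        # Real code would call through the gateway here, e.g.:
        #   gateway.jvm...PythonDStream.registerSerializer(serializer)
        PythonDStreamBridge.registrations += 1
        PythonDStreamBridge._serializer_registered = True
```

Repeated calls after the first become no-ops, so only one PythonProxyHandler is ever created on the Java side.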
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 12
1 file changed, 12 insertions(+), 0 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index c4a10aa2dd..a5ab666975 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -902,3 +902,15 @@ object StreamingContext extends Logging {
result
}
}
+
+private class StreamingContextPythonHelper {
+
+ /**
+ * This is a private method only for Python to implement `getOrCreate`.
+ */
+ def tryRecoverFromCheckpoint(checkpointPath: String): Option[StreamingContext] = {
+ val checkpointOption = CheckpointReader.read(
+ checkpointPath, new SparkConf(), SparkHadoopUtil.get.conf, false)
+ checkpointOption.map(new StreamingContext(null, _, null))
+ }
+}
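On the Python side, `getOrCreate` can use the new helper roughly as sketched below. This is an assumption-laden illustration: the function names and wiring are hypothetical, and the py4j proxy is replaced by a plain stub so the flow can be shown without a JVM.

```python
# Sketch of the Python-side getOrCreate flow that the Scala helper
# above enables (hypothetical names; FakeHelper stands in for the
# py4j proxy of StreamingContextPythonHelper).

class FakeHelper:
    """Stub for the py4j proxy of StreamingContextPythonHelper."""

    def __init__(self, recovered):
        self._recovered = recovered

    def tryRecoverFromCheckpoint(self, checkpoint_path):
        # The Scala method returns Option[StreamingContext]; across the
        # bridge this surfaces as a value or None.
        return self._recovered


def get_or_create(checkpoint_path, setup_func, helper):
    # Try to recover a context from the checkpoint directory first.
    ssc = helper.tryRecoverFromCheckpoint(checkpoint_path)
    if ssc is not None:
        return ssc           # recovered from checkpoint
    return setup_func()      # no checkpoint found: build a fresh context
```

With no checkpoint the setup function runs; with one, the recovered context is returned and the setup function is skipped.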