aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorhuangzhaowei <carlmartinmax@gmail.com>2015-07-14 19:20:49 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-07-14 19:20:49 -0700
commitf957796c4b3c3cd95edfc64500a045f7e810ee87 (patch)
tree1b6e3679dcc865e271543d85a74272707b5a09b1 /streaming
parentcc57d705e732aefc2f3d3f438e84d71705b2eb65 (diff)
downloadspark-f957796c4b3c3cd95edfc64500a045f7e810ee87.tar.gz
spark-f957796c4b3c3cd95edfc64500a045f7e810ee87.tar.bz2
spark-f957796c4b3c3cd95edfc64500a045f7e810ee87.zip
[SPARK-8820] [STREAMING] Add a configuration to set checkpoint dir.
Add a configuration to set checkpoint directory for convenience to user. [Jira Address](https://issues.apache.org/jira/browse/SPARK-8820) Author: huangzhaowei <carlmartinmax@gmail.com> Closes #7218 from SaintBacchus/SPARK-8820 and squashes the following commits: d49fe4b [huangzhaowei] Rename the configuration name 66ea47c [huangzhaowei] Add the unit test. dd0acc1 [huangzhaowei] [SPARK-8820][Streaming] Add a configuration to set checkpoint dir.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala2
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala9
2 files changed, 11 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 6b78a82e68..92438f1b1f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -201,6 +201,8 @@ class StreamingContext private[streaming] (
private var shutdownHookRef: AnyRef = _
+ conf.getOption("spark.streaming.checkpoint.directory").foreach(checkpoint)
+
/**
* Return the associated Spark context
*/
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 289a159d89..f588cf5bc1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -115,6 +115,15 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
}
+ test("checkPoint from conf") {
+ val checkpointDirectory = Utils.createTempDir().getAbsolutePath()
+
+ val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
+ myConf.set("spark.streaming.checkpoint.directory", checkpointDirectory)
+ val ssc = new StreamingContext(myConf, batchDuration)
+ assert(ssc.checkpointDir != null)
+ }
+
test("state matching") {
import StreamingContextState._
assert(INITIALIZED === INITIALIZED)