aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-01-02 19:07:22 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-01-02 19:07:22 -0800
commita1b8dd53e3474dae2c49b30bc9719c7f6b98c7cc (patch)
tree71a8001723003d3247c5773d90701a154ee94208 /streaming
parenta8729770f5dc944444b9996716c2f6a26485a819 (diff)
downloadspark-a1b8dd53e3474dae2c49b30bc9719c7f6b98c7cc.tar.gz
spark-a1b8dd53e3474dae2c49b30bc9719c7f6b98c7cc.tar.bz2
spark-a1b8dd53e3474dae2c49b30bc9719c7f6b98c7cc.zip
Added StreamingContext.getOrCreate to for automatic recovery, and added RecoverableNetworkWordCount example to use it.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala12
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala18
2 files changed, 26 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 7b343d2376..139e2c08b5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.RejectedExecutionException
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.Logging
+import org.apache.spark.{SparkException, Logging}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.MetadataCleaner
@@ -141,9 +141,15 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
private[streaming]
object CheckpointReader extends Logging {
+ def doesCheckpointExist(path: String): Boolean = {
+ val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"))
+ val fs = new Path(path).getFileSystem(new Configuration())
+ (attempts.count(p => fs.exists(p)) > 1)
+ }
+
def read(path: String): Checkpoint = {
val fs = new Path(path).getFileSystem(new Configuration())
- val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk"))
+ val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"))
val compressionCodec = CompressionCodec.createCodec()
@@ -175,7 +181,7 @@ object CheckpointReader extends Logging {
}
})
- throw new Exception("Could not read checkpoint from path '" + path + "'")
+ throw new SparkException("Could not read checkpoint from path '" + path + "'")
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 41da028a3c..01b213ab42 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -570,12 +570,28 @@ class StreamingContext private (
}
-object StreamingContext {
+object StreamingContext extends Logging {
implicit def toPairDStreamFunctions[K: ClassTag, V: ClassTag](stream: DStream[(K,V)]) = {
new PairDStreamFunctions[K, V](stream)
}
+ def getOrCreate(
+ checkpointPath: String,
+ creatingFunc: () => StreamingContext,
+ createOnCheckpointError: Boolean = false
+ ): StreamingContext = {
+ if (CheckpointReader.doesCheckpointExist(checkpointPath)) {
+ logInfo("Creating streaming context from checkpoint file")
+ new StreamingContext(checkpointPath)
+ } else {
+ logInfo("Creating new streaming context")
+ val ssc = creatingFunc()
+ ssc.checkpoint(checkpointPath)
+ ssc
+ }
+ }
+
protected[streaming] def createNewSparkContext(
master: String,
appName: String,