From 760823d3937822ea4a6d6f476815442711c605fa Mon Sep 17 00:00:00 2001
From: Patrick Wendell <pwendell@gmail.com>
Date: Tue, 24 Dec 2013 23:20:34 -0800
Subject: Adding better option parsing

---
 .../spark/examples/DriverSubmissionTest.scala      | 45 ++++++++++++++++++++++
 1 file changed, 45 insertions(+)
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala

(limited to 'examples')

diff --git a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
new file mode 100644
index 0000000000..9055ce7a39
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples
+
+import scala.collection.JavaConversions._
+
+/** Prints out environmental information, sleeps, and then exits. Made to
+  * test driver submission in the standalone scheduler. */
+object DriverSubmissionTest {
+  def main(args: Array[String]) {
+    if (args.size < 1) {
+      println("Usage: DriverSubmissionTest <seconds-to-sleep>")
+      System.exit(0)
+    }
+    val numSecondsToSleep = args(0).toInt
+
+    val env = System.getenv()
+    val properties = System.getProperties()
+
+    println("Environment variables containing SPARK_TEST:")
+    env.filter{case (k, v) => k.contains("SPARK_TEST")}.foreach(println)
+
+    println("System properties containing spark.test:")
+    properties.filter{case (k, v) => k.toString.contains("spark.test")}.foreach(println)
+
+    for (i <- 1 until numSecondsToSleep) {
+      Thread.sleep(1000)
+    }
+  }
+}
-- 
cgit v1.2.3
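The example above filters JVM environment variables through scala.collection.JavaConversions, which wraps the java.util.Map returned by System.getenv() so Scala collection methods work on it directly. A minimal standalone sketch of the same idiom (the object name is illustrative, not part of this patch):

    import scala.collection.JavaConversions._

    object EnvFilterSketch {
      def main(args: Array[String]) {
        // JavaConversions lets the java.util.Map be filtered like a Scala Map
        System.getenv().filter { case (k, _) => k.contains("SPARK_TEST") }
          .foreach(println)
      }
    }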
From c9c0f745afcf00c17fa073e4ca6dd9433400be95 Mon Sep 17 00:00:00 2001
From: Patrick Wendell <pwendell@gmail.com>
Date: Wed, 25 Dec 2013 00:54:34 -0800
Subject: Minor style clean-up

---
 .../apache/spark/deploy/client/DriverClient.scala  |  2 ++
 .../org/apache/spark/deploy/master/Master.scala    |  3 +--
 .../apache/spark/deploy/master/ui/IndexPage.scala  | 22 +++++++++++-----------
 .../apache/spark/deploy/worker/DriverRunner.scala  |  4 ++--
 .../org/apache/spark/deploy/worker/Worker.scala    |  3 ---
 .../spark/examples/DriverSubmissionTest.scala      |  1 +
 6 files changed, 17 insertions(+), 18 deletions(-)

(limited to 'examples')

diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
index 9c0a626204..28c851bcfe 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
@@ -101,6 +101,8 @@ object DriverClient {
   def main(args: Array[String]) {
     val driverArgs = new DriverClientArguments(args)
 
+    // TODO: See if we can initialize akka so return messages are sent back using the same TCP
+    //       flow. Else, this (sadly) requires the DriverClient be routable from the Master.
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
       "driverClient", Utils.localHostName(), 0)
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index f5d6fdab5f..0528ef43a1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -185,7 +185,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       schedule()
 
       // TODO: It might be good to instead have the submission client poll the master to determine
-      //       the current status of the driver. Since we may already want to expose this.
+      //       the current status of the driver. For now it's simply "fire and forget".
       sender ! SubmitDriverResponse(true, "Driver successfully submitted")
     }
 
@@ -611,7 +611,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     }
   }
 
-  /** Generate a new driver ID given a driver's submission date */
   def newDriverId(submitDate: Date): String = {
     val appId = "driver-%s-%04d".format(DATE_FORMAT.format(submitDate), nextDriverNumber)
     nextDriverNumber += 1
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 6a99d7ac02..3c6fca3780 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -106,20 +106,20 @@ private[spark] class IndexPage(parent: MasterWebUI) {
           </div>
         </div>
 
-          <div class="row-fluid">
-            <div class="span12">
-              <h4> Active Drivers </h4>
+        <div class="row-fluid">
+          <div class="span12">
+            <h4> Active Drivers </h4>
 
-              {activeDriversTable}
-            </div>
-          </div>
+            {activeDriversTable}
+          </div>
+        </div>
 
-          <div class="row-fluid">
-            <div class="span12">
-              <h4> Completed Drivers </h4>
-              {completedDriversTable}
-            </div>
-          </div>;
+        <div class="row-fluid">
+          <div class="span12">
+            <h4> Completed Drivers </h4>
+            {completedDriversTable}
+          </div>
+        </div>;
 
     UIUtils.basicSparkPage(content, "Spark Master at " + state.uri)
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index b030d6041a..28d4297299 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -32,7 +32,7 @@ import org.apache.spark.deploy.master.DriverState
 import org.apache.spark.util.Utils
 
 /**
- * Manages the execution of one driver process.
+ * Manages the execution of one driver, including automatically restarting the driver on failure.
  */
 private[spark] class DriverRunner(
     val driverId: String,
@@ -133,7 +133,7 @@ private[spark] class DriverRunner(
     localJarFilename
   }
 
-  /** Continue launching the supplied command until it exits zero. */
+  /** Continue launching the supplied command until it exits zero or is killed. */
   def runCommandWithRetry(command: Seq[String], envVars: Seq[(String, String)], baseDir: File) = {
     // Time to wait between submission retries.
     var waitSeconds = 1
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index b6a84fc371..42c28cf22d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -254,17 +254,14 @@ private[spark] class Worker(
 
     case KillDriver(driverId) => {
      logInfo(s"Asked to kill driver $driverId")
-
      drivers.find(_._1 == driverId) match {
        case Some((id, runner)) =>
          runner.kill()
        case None =>
          logError(s"Asked to kill unknown driver $driverId")
      }
-
    }
 
-
    case DriverStateChanged(driverId, state, exception) => {
      state match {
        case DriverState.FAILED =>
diff --git a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
index 9055ce7a39..65251e9319 100644
--- a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
@@ -39,6 +39,7 @@ object DriverSubmissionTest {
     properties.filter{case (k, v) => k.toString.contains("spark.test")}.foreach(println)
 
     for (i <- 1 until numSecondsToSleep) {
+      println(s"Alive for $i out of $numSecondsToSleep seconds")
       Thread.sleep(1000)
     }
   }
-- 
cgit v1.2.3


From a1b8dd53e3474dae2c49b30bc9719c7f6b98c7cc Mon Sep 17 00:00:00 2001
From: Tathagata Das <tathagata.das1565@gmail.com>
Date: Thu, 2 Jan 2014 19:07:22 -0800
Subject: Added StreamingContext.getOrCreate for automatic recovery, and added
 RecoverableNetworkWordCount example to use it.
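The recovery pattern this commit introduces, as a minimal sketch (the checkpoint path, app name, and the body of createContext are placeholders, not part of this patch):

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    def createContext(): StreamingContext = {
      // Build the context and its DStreams from scratch; this only runs
      // when no checkpoint exists yet.
      new StreamingContext("local[2]", "GetOrCreateSketch", Seconds(1))
    }

    // Recovers the context from the directory if checkpoint data exists there;
    // otherwise calls createContext() and enables checkpointing to that directory.
    val ssc = StreamingContext.getOrCreate("/tmp/sketch-checkpoint", createContext _)
    ssc.start()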
---
 .../cluster/SparkDeploySchedulerBackend.scala      |  2 +-
 .../examples/RecoverableNetworkWordCount.scala     | 58 ++++++++++++++++++++++
 .../org/apache/spark/streaming/Checkpoint.scala    | 12 +++--
 .../apache/spark/streaming/StreamingContext.scala  | 18 ++++++-
 4 files changed, 85 insertions(+), 5 deletions(-)
 create mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala

(limited to 'examples')

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 921b887a89..0615f7b565 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -47,7 +47,7 @@ private[spark] class SparkDeploySchedulerBackend(
     val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
       System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
-    val args = Seq(driverUrl, "{{WORKER_URL}}", "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
+    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
     val command = Command(
       "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
     val sparkHome = sc.getSparkHome().getOrElse(null)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
new file mode 100644
index 0000000000..0e5f39f772
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
@@ -0,0 +1,58 @@
+package org.apache.spark.streaming.examples
+
+import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.util.IntParam
+import java.io.File
+import org.apache.spark.rdd.RDD
+import com.google.common.io.Files
+import java.nio.charset.Charset
+
+object RecoverableNetworkWordCount {
+
+  def createContext(master: String, ip: String, port: Int, outputPath: String) = {
+
+    val outputFile = new File(outputPath)
+    if (outputFile.exists()) outputFile.delete()
+
+    // Create the context with a 1 second batch size
+    println("Creating new context")
+    val ssc = new StreamingContext(master, "RecoverableNetworkWordCount", Seconds(1),
+      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+
+    // Create a NetworkInputDStream on target ip:port and count the
+    // words in input stream of \n delimited test (eg. generated by 'nc')
+    val lines = ssc.socketTextStream(ip, port)
+    val words = lines.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+    wordCounts.foreach((rdd: RDD[(String, Int)], time: Time) => {
+      val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]")
+      println(counts)
+      println("Appending to " + outputFile.getAbsolutePath)
+      Files.append(counts + "\n", outputFile, Charset.defaultCharset())
+    })
+    ssc
+  }
+
+  def main(args: Array[String]) {
+    if (args.length != 5) {
+      System.err.println("Your arguments were " + args.mkString("[", ", ", "]"))
+      System.err.println(
+        """
+          |Usage: RecoverableNetworkWordCount <master> <ip> <port> <checkpoint-directory> <output-path>
+          |
+          |In local mode, <master> should be 'local[n]' with n > 1
+          |Both <checkpoint-directory> and <output-path> should be full paths
+        """.stripMargin
+      )
+      System.exit(1)
+    }
+    val Array(master, ip, IntParam(port), checkpointDirectory, outputPath) = args
+    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
+      () => {
+        createContext(master, ip, port, outputPath)
+      })
+    ssc.start()
+  }
+}
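The argument pattern match above relies on org.apache.spark.util.IntParam, an extractor object that parses a string into an Int inside a match. A minimal equivalent for illustration only (IntParamSketch is a hypothetical name, not Spark's class):

    // Minimal equivalent of an IntParam-style extractor
    object IntParamSketch {
      def unapply(str: String): Option[Int] =
        try { Some(str.toInt) } catch { case _: NumberFormatException => None }
    }

    // Usage mirrors the example above:
    //   val Array(master, ip, IntParamSketch(port), checkpointDir, outputPath) = args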
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 7b343d2376..139e2c08b5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.RejectedExecutionException
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkException, Logging}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.MetadataCleaner
 
@@ -141,9 +141,15 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
 
 private[streaming]
 object CheckpointReader extends Logging {
 
+  def doesCheckpointExist(path: String): Boolean = {
+    val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"))
+    val fs = new Path(path).getFileSystem(new Configuration())
+    (attempts.count(p => fs.exists(p)) > 1)
+  }
+
   def read(path: String): Checkpoint = {
     val fs = new Path(path).getFileSystem(new Configuration())
-    val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk"))
+    val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"))
 
     val compressionCodec = CompressionCodec.createCodec()
 
@@ -175,7 +181,7 @@ object CheckpointReader extends Logging {
       }
     })
 
-    throw new Exception("Could not read checkpoint from path '" + path + "'")
+    throw new SparkException("Could not read checkpoint from path '" + path + "'")
   }
 }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 41da028a3c..01b213ab42 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -570,12 +570,28 @@ class StreamingContext private (
 }
 
 
-object StreamingContext {
+object StreamingContext extends Logging {
 
   implicit def toPairDStreamFunctions[K: ClassTag, V: ClassTag](stream: DStream[(K,V)]) = {
     new PairDStreamFunctions[K, V](stream)
   }
 
+  def getOrCreate(
+      checkpointPath: String,
+      creatingFunc: () => StreamingContext,
+      createOnCheckpointError: Boolean = false
+    ): StreamingContext = {
+    if (CheckpointReader.doesCheckpointExist(checkpointPath)) {
+      logInfo("Creating streaming context from checkpoint file")
+      new StreamingContext(checkpointPath)
+    } else {
+      logInfo("Creating new streaming context")
+      val ssc = creatingFunc()
+      ssc.checkpoint(checkpointPath)
+      ssc
+    }
+  }
+
   protected[streaming] def createNewSparkContext(
       master: String,
       appName: String,
-- 
cgit v1.2.3


From 35f80da21aaea8c6fde089754ef3a86bc78e0428 Mon Sep 17 00:00:00 2001
From: Patrick Wendell <pwendell@gmail.com>
Date: Wed, 8 Jan 2014 20:54:24 -0800
Subject: Set default logging to WARN for Spark streaming examples.

This programmatically sets the log level to WARN by default for streaming
tests. If the user has already specified a log4j.properties file, the user's
file will take precedence over this default.

---
 .../streaming/examples/JavaFlumeEventCount.java    |  2 ++
 .../streaming/examples/JavaKafkaWordCount.java     |  2 ++
 .../streaming/examples/JavaNetworkWordCount.java   |  2 ++
 .../spark/streaming/examples/JavaQueueStream.java  |  2 ++
 .../spark/streaming/examples/ActorWordCount.scala  | 12 +++++-------
 .../spark/streaming/examples/FlumeEventCount.scala |  4 +++-
 .../spark/streaming/examples/HdfsWordCount.scala   |  3 ++-
 .../spark/streaming/examples/KafkaWordCount.scala  |  5 +++--
 .../spark/streaming/examples/MQTTWordCount.scala   |  8 +++-----
 .../streaming/examples/NetworkWordCount.scala      |  2 ++
 .../spark/streaming/examples/QueueStream.scala     |  8 +++++---
 .../spark/streaming/examples/RawNetworkGrep.scala  |  5 +++--
 .../examples/StatefulNetworkWordCount.scala        |  2 ++
 .../streaming/examples/StreamingExamples.scala     | 21 +++++++++++++++++++++
 .../streaming/examples/TwitterAlgebirdCMS.scala    | 10 ++++++----
 .../streaming/examples/TwitterAlgebirdHLL.scala    |  9 ++++++---
 .../streaming/examples/TwitterPopularTags.scala    |  2 ++
 .../spark/streaming/examples/ZeroMQWordCount.scala |  1 +
 .../examples/clickstream/PageViewStream.scala      |  4 +++-
 .../scala/org/apache/spark/streaming/DStream.scala |  2 +-
 .../org/apache/spark/streaming/DStreamGraph.scala  |  8 ++++----
 21 files changed, 80 insertions(+), 34 deletions(-)
 create mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala

(limited to 'examples')

diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
index b11cfa667e..7b5a243e26 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
@@ -47,6 +47,8 @@ public final class JavaFlumeEventCount {
       System.exit(1);
     }
 
+    StreamingExamples.setStreamingLogLevels();
+
     String master = args[0];
     String host = args[1];
     int port = Integer.parseInt(args[2]);
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
index 16b8a948e6..04f62ee204 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -59,6 +59,8 @@ public final class JavaKafkaWordCount {
       System.exit(1);
     }
 
+    StreamingExamples.setStreamingLogLevels();
+
     // Create the context with a 1 second batch size
     JavaStreamingContext jssc = new JavaStreamingContext(args[0], "KafkaWordCount",
             new Duration(2000), System.getenv("SPARK_HOME"),
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java index 1e2efd359c..c37b0cacc9 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java @@ -53,6 +53,8 @@ public final class JavaNetworkWordCount { System.exit(1); } + StreamingExamples.setStreamingLogLevels(); + // Create the context with a 1 second batch size JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount", new Duration(1000), System.getenv("SPARK_HOME"), diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java index e05551ab83..7ef9c6c8f4 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java @@ -41,6 +41,8 @@ public final class JavaQueueStream { System.exit(1); } + StreamingExamples.setStreamingLogLevels(); + // Create the context JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000), System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaQueueStream.class)); diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala index 4e0058cd70..57e1b1f806 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala @@ -18,17 +18,13 @@ package org.apache.spark.streaming.examples import scala.collection.mutable.LinkedList -import scala.util.Random import scala.reflect.ClassTag +import scala.util.Random -import akka.actor.Actor -import akka.actor.ActorRef -import akka.actor.Props -import akka.actor.actorRef2Scala +import akka.actor.{Actor, ActorRef, Props, actorRef2Scala} import org.apache.spark.SparkConf -import org.apache.spark.streaming.Seconds -import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions import org.apache.spark.streaming.receivers.Receiver import org.apache.spark.util.AkkaUtils @@ -147,6 +143,8 @@ object ActorWordCount { System.exit(1) } + StreamingExamples.setStreamingLogLevels() + val Seq(master, host, port) = args.toSeq // Create the context and set the batch size diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala index ae3709b3d9..a59be7899d 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala @@ -17,10 +17,10 @@ package org.apache.spark.streaming.examples -import org.apache.spark.util.IntParam import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.streaming.flume._ +import org.apache.spark.util.IntParam /** * Produces a count of events received from Flume. 
@@ -44,6 +44,8 @@ object FlumeEventCount {
      System.exit(1)
    }
 
+    StreamingExamples.setStreamingLogLevels()
+
    val Array(master, host, IntParam(port)) = args
 
    val batchInterval = Milliseconds(2000)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
index ea6ea67419..704b315ef8 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
@@ -20,7 +20,6 @@ package org.apache.spark.streaming.examples
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext._
 
-
 /**
  * Counts words in new text files created in the given directory
  * Usage: HdfsWordCount <master> <directory>
@@ -38,6 +37,8 @@ object HdfsWordCount {
      System.exit(1)
    }
 
+    StreamingExamples.setStreamingLogLevels()
+
    // Create the context
    val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2),
      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
index 31a94bd224..4a3d81c09a 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
@@ -23,8 +23,8 @@ import kafka.producer._
 
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.util.RawTextHelper._
 import org.apache.spark.streaming.kafka._
+import org.apache.spark.streaming.util.RawTextHelper._
 
 /**
  * Consumes messages from one or more topics in Kafka and does wordcount.
@@ -40,12 +40,13 @@ import org.apache.spark.streaming.kafka._
  */
 object KafkaWordCount {
   def main(args: Array[String]) {
-
     if (args.length < 5) {
       System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>")
       System.exit(1)
     }
 
+    StreamingExamples.setStreamingLogLevels()
+
     val Array(master, zkQuorum, group, topics, numThreads) = args
 
     val ssc = new StreamingContext(master, "KafkaWordCount", Seconds(2),
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
index 325290b66f..78b49fdcf1 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
@@ -17,12 +17,8 @@
 
 package org.apache.spark.streaming.examples
 
-import org.eclipse.paho.client.mqttv3.MqttClient
-import org.eclipse.paho.client.mqttv3.MqttClientPersistence
+import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttException, MqttMessage, MqttTopic}
 import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
-import org.eclipse.paho.client.mqttv3.MqttException
-import org.eclipse.paho.client.mqttv3.MqttMessage
-import org.eclipse.paho.client.mqttv3.MqttTopic
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Seconds, StreamingContext}
@@ -43,6 +39,8 @@ object MQTTPublisher {
      System.exit(1)
    }
 
+    StreamingExamples.setStreamingLogLevels()
+
    val Seq(brokerUrl, topic) = args.toSeq
 
    try {
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
index 6a32c75373..c12139b3ec 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
@@ -39,6 +39,8 @@ object NetworkWordCount {
      System.exit(1)
    }
 
+    StreamingExamples.setStreamingLogLevels()
+
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
index 9d640e716b..4d4968ba6a 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
@@ -17,12 +17,12 @@
 
 package org.apache.spark.streaming.examples
 
+import scala.collection.mutable.SynchronizedQueue
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext._
 
-import scala.collection.mutable.SynchronizedQueue
-
 object QueueStream {
 
   def main(args: Array[String]) {
@@ -30,7 +30,9 @@ object QueueStream {
       System.err.println("Usage: QueueStream <master>")
       System.exit(1)
     }
-
+
+    StreamingExamples.setStreamingLogLevels()
+
     // Create the context
     val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1),
       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
index c0706d0724..3d08d86567 100644
---
a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala @@ -17,11 +17,10 @@ package org.apache.spark.streaming.examples -import org.apache.spark.util.IntParam import org.apache.spark.storage.StorageLevel - import org.apache.spark.streaming._ import org.apache.spark.streaming.util.RawTextHelper +import org.apache.spark.util.IntParam /** * Receives text from multiple rawNetworkStreams and counts how many '\n' delimited @@ -45,6 +44,8 @@ object RawNetworkGrep { System.exit(1) } + StreamingExamples.setStreamingLogLevels() + val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args // Create the context diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala index 002db57d59..1183eba846 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala @@ -39,6 +39,8 @@ object StatefulNetworkWordCount { System.exit(1) } + StreamingExamples.setStreamingLogLevels() + val updateFunc = (values: Seq[Int], state: Option[Int]) => { val currentCount = values.foldLeft(0)(_ + _) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala new file mode 100644 index 0000000000..d41d84a980 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala @@ -0,0 +1,21 @@ +package org.apache.spark.streaming.examples + +import org.apache.spark.Logging + +import org.apache.log4j.{Level, Logger} + +/** Utility functions for Spark Streaming examples. */ +object StreamingExamples extends Logging { + + /** Set reasonable logging levels for streaming if the user has not configured log4j. */ + def setStreamingLogLevels() { + val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements + if (!log4jInitialized) { + // We first log something to initialize Spark's default logging, then we override the + // logging level. + logInfo("Setting log level to [WARN] for streaming example." 
+ + " To override add a custom log4j.properties to the classpath.") + Logger.getRootLogger.setLevel(Level.WARN) + } + } +} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala index 3ccdc908e2..80b5a98b14 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala @@ -17,12 +17,12 @@ package org.apache.spark.streaming.examples -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.storage.StorageLevel import com.twitter.algebird._ -import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.SparkContext._ +import org.apache.spark.SparkContext._ +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.twitter._ /** @@ -51,6 +51,8 @@ object TwitterAlgebirdCMS { System.exit(1) } + StreamingExamples.setStreamingLogLevels() + // CMS parameters val DELTA = 1E-3 val EPS = 0.01 diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala index c7e83e76b0..cb2f2c51a0 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala @@ -17,10 +17,11 @@ package org.apache.spark.streaming.examples -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.storage.StorageLevel -import com.twitter.algebird.HyperLogLog._ import com.twitter.algebird.HyperLogLogMonoid +import com.twitter.algebird.HyperLogLog._ + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.twitter._ /** @@ -44,6 +45,8 @@ object TwitterAlgebirdHLL { System.exit(1) } + StreamingExamples.setStreamingLogLevels() + /** Bit size parameter for HyperLogLog, trades off accuracy vs size */ val BIT_SIZE = 12 val (master, filters) = (args.head, args.tail) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala index e2b0418d55..16c10feaba 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala @@ -36,6 +36,8 @@ object TwitterPopularTags { System.exit(1) } + StreamingExamples.setStreamingLogLevels() + val (master, filters) = (args.head, args.tail) val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2), diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala index 03902ec353..12d2a1084f 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala @@ -76,6 +76,7 @@ object ZeroMQWordCount { "In local mode, should be 'local[n]' with n > 1") System.exit(1) } + StreamingExamples.setStreamingLogLevels() val Seq(master, url, topic) = 
args.toSeq // Create the context and set the batch size diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala index 807af199f4..da6b67bcce 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala @@ -17,9 +17,10 @@ package org.apache.spark.streaming.examples.clickstream +import org.apache.spark.SparkContext._ import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.SparkContext._ +import org.apache.spark.streaming.examples.StreamingExamples /** Analyses a streaming dataset of web page views. This class demonstrates several types of * operators available in Spark streaming. @@ -36,6 +37,7 @@ object PageViewStream { " errorRatePerZipCode, activeUserCount, popularUsersSeen") System.exit(1) } + StreamingExamples.setStreamingLogLevels() val metric = args(0) val host = args(1) val port = args(2).toInt diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala index 00671ba520..837f1ea1d8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala @@ -333,7 +333,7 @@ abstract class DStream[T: ClassTag] ( var numForgotten = 0 val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration)) generatedRDDs --= oldRDDs.keys - logInfo("Cleared " + oldRDDs.size + " RDDs that were older than " + + logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " + (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", ")) dependencies.foreach(_.clearOldMetadata(time)) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index a09b891956..62d07b22c6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -105,18 +105,18 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { def generateJobs(time: Time): Seq[Job] = { this.synchronized { - logInfo("Generating jobs for time " + time) + logDebug("Generating jobs for time " + time) val jobs = outputStreams.flatMap(outputStream => outputStream.generateJob(time)) - logInfo("Generated " + jobs.length + " jobs for time " + time) + logDebug("Generated " + jobs.length + " jobs for time " + time) jobs } } def clearOldMetadata(time: Time) { this.synchronized { - logInfo("Clearing old metadata for time " + time) + logDebug("Clearing old metadata for time " + time) outputStreams.foreach(_.clearOldMetadata(time)) - logInfo("Cleared old metadata for time " + time) + logDebug("Cleared old metadata for time " + time) } } -- cgit v1.2.3 From 6f713e2a3e56185368b66fb087637dec112a1f5d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 9 Jan 2014 13:42:04 -0800 Subject: Changed the way StreamingContext finds and reads checkpoint files, and added JavaStreamingContext.getOrCreate. 
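With this change, reading a checkpoint returns an Option and getOrCreate accepts a Hadoop configuration plus an error policy. A sketch of how a driver might call the reworked Scala API (the paths and createContext are placeholders, not part of this patch):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.streaming.StreamingContext

    val ssc = StreamingContext.getOrCreate(
      "/tmp/sketch-checkpoint",      // checkpoint directory from an earlier run
      () => createContext(),         // invoked only when no usable checkpoint exists
      new Configuration(),           // Hadoop conf used to read the checkpoint files
      createOnError = false)         // rethrow read errors instead of starting fresh
    ssc.start()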
---
 conf/slaves                                        |  6 +-
 .../main/scala/org/apache/spark/SparkContext.scala |  2 +-
 .../streaming/examples/JavaNetworkWordCount.java   |  7 +-
 .../streaming/examples/NetworkWordCount.scala      |  2 +-
 .../examples/RecoverableNetworkWordCount.scala     | 43 ++++++++--
 .../org/apache/spark/streaming/Checkpoint.scala    | 98 ++++++++++++----------
 .../spark/streaming/DStreamCheckpointData.scala    | 57 ++++---------
 .../org/apache/spark/streaming/DStreamGraph.scala  |  4 +-
 .../apache/spark/streaming/StreamingContext.scala  | 64 ++++++++++----
 .../streaming/api/java/JavaStreamingContext.scala  | 96 +++++++++++++++++++--
 10 files changed, 254 insertions(+), 125 deletions(-)

(limited to 'examples')

diff --git a/conf/slaves b/conf/slaves
index 30ea300e07..2fbb50c4a8 100644
--- a/conf/slaves
+++ b/conf/slaves
@@ -1,5 +1 @@
-ec2-54-221-59-252.compute-1.amazonaws.com
-ec2-67-202-26-243.compute-1.amazonaws.com
-ec2-23-22-220-97.compute-1.amazonaws.com
-ec2-50-16-98-100.compute-1.amazonaws.com
-ec2-54-234-164-206.compute-1.amazonaws.com
+localhost
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7514ce58fb..304e85f1c0 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -29,7 +29,7 @@ import scala.collection.mutable.HashMap
 import scala.reflect.{ClassTag, classTag}
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{LocalFileSystem, Path}
 import org.apache.hadoop.io.ArrayWritable
 import org.apache.hadoop.io.BooleanWritable
 import org.apache.hadoop.io.BytesWritable
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
index def87c199b..d8d6046914 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
@@ -41,17 +41,17 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
 public class JavaNetworkWordCount {
   public static void main(String[] args) {
     if (args.length < 3) {
-      System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
+      System.err.println("Usage: JavaNetworkWordCount <master> <hostname> <port>\n" +
           "In local mode, <master> should be 'local[n]' with n > 1");
       System.exit(1);
     }
 
     // Create the context with a 1 second batch size
-    JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount",
+    JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaNetworkWordCount",
             new Duration(1000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
 
     // Create a NetworkInputDStream on target ip:port and count the
-    // words in input stream of \n delimited test (eg. generated by 'nc')
+    // words in input stream of \n delimited text (eg. generated by 'nc')
     JavaDStream<String> lines = ssc.socketTextStream(args[1], Integer.parseInt(args[2]));
     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
       @Override
@@ -74,6 +74,5 @@ public class JavaNetworkWordCount {
     wordCounts.print();
 
     ssc.start();
-
   }
 }
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
index e2487dca5f..5ad4875980 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
@@ -44,7 +44,7 @@ object NetworkWordCount {
       System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
 
     // Create a NetworkInputDStream on target ip:port and count the
-    // words in input stream of \n delimited test (eg. generated by 'nc')
+    // words in input stream of \n delimited text (eg. generated by 'nc')
     val lines = ssc.socketTextStream(args(1), args(2).toInt)
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
index 0e5f39f772..739f805e87 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark.streaming.examples
 
 import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext._
@@ -8,20 +25,37 @@ import org.apache.spark.rdd.RDD
 import com.google.common.io.Files
 import java.nio.charset.Charset
 
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ * Usage: NetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-file>
+ *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ *   <checkpoint-directory> directory in a Hadoop compatible file system to which checkpoint
+ *                          data will be saved to; this must be a fault-tolerant file system
+ *                          like HDFS for the system to recover from driver failures
+ *   <output-file> file to which the word counts will be appended
+ */
 
     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
@@ -39,10 +73,10 @@ object RecoverableNetworkWordCount {
       System.err.println("Your arguments were " + args.mkString("[", ", ", "]"))
       System.err.println(
         """
-          |Usage: RecoverableNetworkWordCount <master> <ip> <port> <checkpoint-directory> <output-path>
+          |Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-file>
           |
           |In local mode, <master> should be 'local[n]' with n > 1
-          |Both <checkpoint-directory> and <output-path> should be full paths
+          |Both <checkpoint-directory> and <output-file> should be full paths
         """.stripMargin
       )
       System.exit(1)
     }
@@ -53,6 +87,5 @@ object RecoverableNetworkWordCount {
         createContext(master, ip, port, outputPath)
       })
     ssc.start()
-
   }
 }
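The Checkpoint.scala hunk below makes the CheckpointWriter's rename step explicit and fails loudly when it does not succeed. The underlying write-to-temp, back-up, then rename sequence, as a simplified local-filesystem sketch (Spark itself goes through the Hadoop FileSystem API; names here are illustrative):

    import java.io.File
    import java.nio.file.{Files, StandardCopyOption}

    def atomicPublish(target: File, bytes: Array[Byte]) {
      val temp = new File(target.getParentFile, target.getName + ".next")
      val backup = new File(target.getParentFile, target.getName + ".bk")
      Files.write(temp.toPath, bytes)                   // 1. write to a temp file
      if (target.exists()) {
        Files.move(target.toPath, backup.toPath,        // 2. keep the previous copy as .bk
          StandardCopyOption.REPLACE_EXISTING)
      }
      Files.move(temp.toPath, target.toPath)            // 3. publish by renaming; throws on failure
    }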
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 155d5bc02e..a32e4852c5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -82,22 +82,28 @@ class CheckpointWriter(jobGenerator: JobGenerator, checkpointDir: String, hadoop
         attempts += 1
         try {
           logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
-          // This is inherently thread unsafe, so alleviating it by writing to '.new' and
+          // This is inherently thread unsafe, so alleviating it by writing to '.next' and
           // then moving it to the final file
           val fos = fs.create(writeFile)
           fos.write(bytes)
           fos.close()
+
+          // Back up existing checkpoint if it exists
           if (fs.exists(file) && fs.rename(file, bakFile)) {
             logDebug("Moved existing checkpoint file to " + bakFile)
           }
-          // paranoia
-          fs.delete(file, false)
-          fs.rename(writeFile, file)
-
-          val finishTime = System.currentTimeMillis()
-          logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file +
-            "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds")
-          jobGenerator.onCheckpointCompletion(checkpointTime)
+          fs.delete(file, false) // paranoia
+
+          // Rename temp written file to the right location
+          if (fs.rename(writeFile, file)) {
+            val finishTime = System.currentTimeMillis()
+            logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file +
+              "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms")
+            jobGenerator.onCheckpointCompletion(checkpointTime)
+          } else {
+            throw new SparkException("Failed to rename checkpoint file from "
+              + writeFile + " to " + file)
+          }
           return
         } catch {
           case ioe: IOException =>
@@ -154,47 +160,47 @@ class CheckpointWriter(jobGenerator: JobGenerator, checkpointDir: String, hadoop
 
 private[streaming]
 object CheckpointReader extends Logging {
 
-  def doesCheckpointExist(path: String): Boolean = {
-    val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"))
-    val fs = new Path(path).getFileSystem(new Configuration())
-    (attempts.count(p => fs.exists(p)) > 1)
-  }
+  private val graphFileNames = Seq("graph", "graph.bk")
+
+  def read(checkpointDir: String, hadoopConf: Configuration): Option[Checkpoint] = {
+    val checkpointPath = new Path(checkpointDir)
+    def fs = checkpointPath.getFileSystem(hadoopConf)
+    val existingFiles = graphFileNames.map(new Path(checkpointPath, _)).filter(fs.exists)
+
+    // Log the file listing if graph checkpoint file was not found
+    if (existingFiles.isEmpty) {
+      logInfo("Could not find graph file in " + checkpointDir + ", which contains the files:\n" +
+        fs.listStatus(checkpointPath).mkString("\n"))
+      return None
+    }
+    logInfo("Checkpoint files found: " + existingFiles.mkString(","))
 
-  def read(path: String): Checkpoint = {
-    val fs = new Path(path).getFileSystem(new Configuration())
-    val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"))
 
     val compressionCodec = CompressionCodec.createCodec()
 
-    attempts.foreach(file => {
-      if (fs.exists(file)) {
-        logInfo("Attempting to load checkpoint from file '" + file + "'")
-        try {
-          val fis = fs.open(file)
-          // ObjectInputStream uses the last defined user-defined class loader in the stack
-          // to find classes, which may be the wrong class loader. Hence, an inherited version
-          // of ObjectInputStream is used to explicitly use the current thread's default class
-          // loader to find and load classes. This is a well known Java issue and has popped up
-          // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
-          val zis = compressionCodec.compressedInputStream(fis)
-          val ois = new ObjectInputStreamWithLoader(zis,
-            Thread.currentThread().getContextClassLoader)
-          val cp = ois.readObject.asInstanceOf[Checkpoint]
-          ois.close()
-          fs.close()
-          cp.validate()
-          logInfo("Checkpoint successfully loaded from file '" + file + "'")
-          logInfo("Checkpoint was generated at time " + cp.checkpointTime)
-          return cp
-        } catch {
-          case e: Exception =>
-            logError("Error loading checkpoint from file '" + file + "'", e)
-        }
-      } else {
-        logWarning("Could not read checkpoint from file '" + file + "' as it does not exist")
+    existingFiles.foreach(file => {
+      logInfo("Attempting to load checkpoint from file '" + file + "'")
+      try {
+        val fis = fs.open(file)
+        // ObjectInputStream uses the last defined user-defined class loader in the stack
+        // to find classes, which may be the wrong class loader. Hence, an inherited version
+        // of ObjectInputStream is used to explicitly use the current thread's default class
+        // loader to find and load classes. This is a well known Java issue and has popped up
+        // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
+        val zis = compressionCodec.compressedInputStream(fis)
+        val ois = new ObjectInputStreamWithLoader(zis,
+          Thread.currentThread().getContextClassLoader)
+        val cp = ois.readObject.asInstanceOf[Checkpoint]
+        ois.close()
+        fs.close()
+        cp.validate()
+        logInfo("Checkpoint successfully loaded from file '" + file + "'")
+        logInfo("Checkpoint was generated at time " + cp.checkpointTime)
+        return Some(cp)
+      } catch {
+        case e: Exception =>
+          logWarning("Error reading checkpoint from file '" + file + "'", e)
      }
-    })
-    throw new SparkException("Could not read checkpoint from path '" + path + "'")
+    })
+    throw new SparkException("Failed to read checkpoint from directory '" + checkpointDir + "'")
  }
}
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
index e0567a1c19..1081d3c807 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
@@ -27,18 +27,16 @@ import org.apache.spark.Logging
 
 import java.io.{ObjectInputStream, IOException}
 
-
 private[streaming]
 class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
   extends Serializable with Logging {
   protected val data = new HashMap[Time, AnyRef]()
 
-  @transient private var allCheckpointFiles = new HashMap[Time, String]
-  @transient private var timeToLastCheckpointFileTime = new HashMap[Time, Time]
+  // Mapping of the batch time to the checkpointed RDD file of that time
+  @transient private var timeToCheckpointFile = new HashMap[Time, String]
+  // Mapping of the batch time to the time of the oldest checkpointed RDD in that batch's checkpoint data
+  @transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time]
   @transient private var fileSystem : FileSystem = null
-
-  //@transient private var lastCheckpointFiles: HashMap[Time, String] = null
 
   protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]]
 
   /**
@@ -51,17 +49,14 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
     // Get the checkpointed RDDs from the generated RDDs
     val checkpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined)
                                        .map(x => (x._1, x._2.getCheckpointFile.get))
+    logDebug("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n"))
 
-    logInfo("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n"))
-    // Make a copy of the existing checkpoint data (checkpointed RDDs)
-    // lastCheckpointFiles = checkpointFiles.clone()
-
-    // If the new checkpoint data has checkpoints then replace existing with the new one
+    // Add the checkpoint files to the data to be serialized
     if (!currentCheckpointFiles.isEmpty) {
       currentCheckpointFiles.clear()
       currentCheckpointFiles ++= checkpointFiles
-      allCheckpointFiles ++= currentCheckpointFiles
-      timeToLastCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering)
+      timeToCheckpointFile ++= currentCheckpointFiles
+      timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering)
     }
   }
 
@@ -71,32 +66,10 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
    * implementation, cleans up old checkpoint files.
*/ def cleanup(time: Time) { - /* - // If there is at least on checkpoint file in the current checkpoint files, - // then delete the old checkpoint files. - if (checkpointFiles.size > 0 && lastCheckpointFiles != null) { - (lastCheckpointFiles -- checkpointFiles.keySet).foreach { - case (time, file) => { - try { - val path = new Path(file) - if (fileSystem == null) { - fileSystem = path.getFileSystem(new Configuration()) - } - fileSystem.delete(path, true) - logInfo("Deleted checkpoint file '" + file + "' for time " + time) - } catch { - case e: Exception => - logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e) - } - } - } - } - */ - timeToLastCheckpointFileTime.remove(time) match { + timeToOldestCheckpointFileTime.remove(time) match { case Some(lastCheckpointFileTime) => - logInfo("Deleting all files before " + time) - val filesToDelete = allCheckpointFiles.filter(_._1 < lastCheckpointFileTime) - logInfo("Files to delete:\n" + filesToDelete.mkString(",")) + val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime) + logDebug("Files to delete:\n" + filesToDelete.mkString(",")) filesToDelete.foreach { case (time, file) => try { @@ -105,11 +78,12 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration) } fileSystem.delete(path, true) - allCheckpointFiles -= time + timeToCheckpointFile -= time logInfo("Deleted checkpoint file '" + file + "' for time " + time) } catch { case e: Exception => logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e) + fileSystem = null } } case None => @@ -138,7 +112,8 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) @throws(classOf[IOException]) private def readObject(ois: ObjectInputStream) { - timeToLastCheckpointFileTime = new HashMap[Time, Time] - allCheckpointFiles = new HashMap[Time, String] + ois.defaultReadObject() + timeToOldestCheckpointFileTime = new HashMap[Time, Time] + timeToCheckpointFile = new HashMap[Time, String] } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index bfedef2e4e..34919d315c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -130,11 +130,11 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { } def clearCheckpointData(time: Time) { - logInfo("Restoring checkpoint data") + logInfo("Clearing checkpoint data for time " + time) this.synchronized { outputStreams.foreach(_.clearCheckpointData(time)) } - logInfo("Restored checkpoint data") + logInfo("Cleared checkpoint data for time " + time) } def restoreCheckpointData() { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 59d2d546e6..30deba417e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -45,10 +45,11 @@ import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{LocalFileSystem, Path} import twitter4j.Status 
 import twitter4j.auth.Authorization
+import org.apache.hadoop.conf.Configuration
 
 /**
  * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -89,10 +90,12 @@ class StreamingContext private (
 
   /**
    * Re-create a StreamingContext from a checkpoint file.
-   * @param path Path either to the directory that was specified as the checkpoint directory, or
-   *             to the checkpoint file 'graph' or 'graph.bk'.
+   * @param path Path to the directory that was specified as the checkpoint directory
+   * @param hadoopConf Optional, configuration object if necessary for reading from
+   *                   HDFS compatible filesystems
    */
-  def this(path: String) = this(null, CheckpointReader.read(path), null)
+  def this(path: String, hadoopConf: Configuration = new Configuration) =
+    this(null, CheckpointReader.read(path, hadoopConf).get, null)
 
   initLogging()
 
@@ -170,8 +173,9 @@ class StreamingContext private (
 
   /**
    * Set the context to periodically checkpoint the DStream operations for master
-   * fault-tolerance. The graph will be checkpointed every batch interval.
-   * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
+   * fault-tolerance.
+   * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored.
+   *                  Note that this must be a fault-tolerant file system like HDFS for recovery to work.
   */
  def checkpoint(directory: String) {
    if (directory != null) {
@@ -577,6 +581,10 @@ class StreamingContext private (
  }
}
 
+/**
+ * StreamingContext object contains a number of utility functions related to the
+ * StreamingContext class.
+ */
object StreamingContext extends Logging {
 
@@ -584,19 +592,45 @@ object StreamingContext extends Logging {
    new PairDStreamFunctions[K, V](stream)
  }
 
+  /**
+   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
+   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
+   * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
+   * will be created by calling the provided `creatingFunc`.
+   *
+   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
+   * @param creatingFunc   Function to create a new StreamingContext
+   * @param hadoopConf     Optional Hadoop configuration if necessary for reading from the
+   *                       file system
+   * @param createOnError  Optional, whether to create a new StreamingContext if there is an
+   *                       error in reading checkpoint data. By default, an exception will be
+   *                       thrown on error.
+   */
  def getOrCreate(
      checkpointPath: String,
      creatingFunc: () => StreamingContext,
-      createOnCheckpointError: Boolean = false
+      hadoopConf: Configuration = new Configuration(),
+      createOnError: Boolean = false
    ): StreamingContext = {
-    if (CheckpointReader.doesCheckpointExist(checkpointPath)) {
-      logInfo("Creating streaming context from checkpoint file")
-      new StreamingContext(checkpointPath)
-    } else {
-      logInfo("Creating new streaming context")
-      val ssc = creatingFunc()
-      ssc.checkpoint(checkpointPath)
-      ssc
+
+    try {
+      CheckpointReader.read(checkpointPath, hadoopConf) match {
+        case Some(checkpoint) =>
+          return new StreamingContext(null, checkpoint, null)
+        case None =>
+          logInfo("Creating new StreamingContext")
+          return creatingFunc()
+      }
+    } catch {
+      case e: Exception =>
+        if (createOnError) {
+          logWarning("Error reading checkpoint", e)
+          logInfo("Creating new StreamingContext")
+          return creatingFunc()
+        } else {
+          logError("Error reading checkpoint", e)
+          throw e
+        }
    }
  }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index aad0d931e7..f38d145317 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -40,6 +40,7 @@ import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD}
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.scheduler.StreamingListener
+import org.apache.hadoop.conf.Configuration
 
 /**
  * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -125,10 +126,16 @@ class JavaStreamingContext(val ssc: StreamingContext) {
 
   /**
    * Re-creates a StreamingContext from a checkpoint file.
-   * @param path Path either to the directory that was specified as the checkpoint directory, or
-   *             to the checkpoint file 'graph' or 'graph.bk'.
+   * @param path Path to the directory that was specified as the checkpoint directory
    */
-  def this(path: String) = this (new StreamingContext(path))
+  def this(path: String) = this(new StreamingContext(path))
+
+  /**
+   * Re-creates a StreamingContext from a checkpoint file.
+   * @param path Path to the directory that was specified as the checkpoint directory
+   *
+   */
+  def this(path: String, hadoopConf: Configuration) = this(new StreamingContext(path, hadoopConf))
 
   /** The underlying SparkContext */
   val sc: JavaSparkContext = new JavaSparkContext(ssc.sc)
@@ -699,13 +706,92 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   }
 
   /**
-   * Starts the execution of the streams.
+   * Start the execution of the streams.
    */
   def start() = ssc.start()
 
   /**
-   * Sstops the execution of the streams.
+   * Stop the execution of the streams.
   */
  def stop() = ssc.stop()
+}
+
+/**
+ * JavaStreamingContext object contains a number of static utility functions.
+ */
+object JavaStreamingContext {
+
+  /**
+   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
+   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
+   * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
+   * will be created by calling the provided `creatingFunc`.
+   *
+   * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program
+   * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext
+   */
+  def getOrCreate(
+      checkpointPath: String,
+      factory: JavaStreamingContextFactory
+    ): JavaStreamingContext = {
+    val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
+      factory.create.ssc
+    })
+    new JavaStreamingContext(ssc)
+  }
+
+  /**
+   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
+   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
+   * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
+   * will be created by calling the provided `creatingFunc`.
+   *
+   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
+   * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext
+   * @param hadoopConf Hadoop configuration if necessary for reading from any HDFS compatible
+   *                   file system
+   */
+  def getOrCreate(
+      checkpointPath: String,
+      hadoopConf: Configuration,
+      factory: JavaStreamingContextFactory
+    ): JavaStreamingContext = {
+    val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
+      factory.create.ssc
+    }, hadoopConf)
+    new JavaStreamingContext(ssc)
+  }
+
+  /**
+   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
+   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
+   * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
+   * will be created by calling the provided `creatingFunc`.
+   *
+   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
+   * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext
+   * @param hadoopConf Hadoop configuration if necessary for reading from any HDFS compatible
+   *                   file system
+   * @param createOnError Whether to create a new JavaStreamingContext if there is an
+   *                      error in reading checkpoint data.
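+   *
+   * A rough usage sketch in Scala (illustrative only, not part of this patch; the app
+   * name, batch interval and checkpoint directory are hypothetical):
+   * {{{
+   *   val factory = new JavaStreamingContextFactory {
+   *     def create(): JavaStreamingContext = {
+   *       // Invoked only when no usable checkpoint exists in the directory
+   *       val jssc = new JavaStreamingContext("local[2]", "MyApp", Seconds(1))
+   *       jssc.checkpoint("hdfs://namenode:8020/checkpoint")
+   *       jssc
+   *     }
+   *   }
+   *   val context = JavaStreamingContext.getOrCreate(
+   *     "hdfs://namenode:8020/checkpoint", new Configuration(), factory, true)
+   * }}}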
+   */
+  def getOrCreate(
+      checkpointPath: String,
+      hadoopConf: Configuration,
+      factory: JavaStreamingContextFactory,
+      createOnError: Boolean
+    ): JavaStreamingContext = {
+    val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
+      factory.create.ssc
+    }, hadoopConf, createOnError)
+    new JavaStreamingContext(ssc)
+  }
+}
+
+/**
+ * Factory interface for creating a new JavaStreamingContext
+ */
+trait JavaStreamingContextFactory {
+  def create(): JavaStreamingContext
+}
-- 
cgit v1.2.3


From 7b748b83a124a2b9c692da4a0c9285f4efa431b2 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Thu, 9 Jan 2014 20:42:48 -0800
Subject: Minor clean-up

---
 .../java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'examples')

diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
index c37b0cacc9..2e616b1ab2 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
@@ -38,7 +38,7 @@ import java.util.regex.Pattern;
  * To run this on your local machine, you need to first run a Netcat server
  *    `$ nc -lk 9999`
  * and then run the example
- *    `$ ./run spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999`
+ *    `$ ./run org.apache.spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999`
  */
 public final class JavaNetworkWordCount {
   private static final Pattern SPACE = Pattern.compile(" ");
-- 
cgit v1.2.3


From e4bb845238d0df48f8258e925caf9af5a107af46 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Fri, 10 Jan 2014 12:17:09 -0800
Subject: Updated docs based on Patrick's comments in PR 383.

---
 .../org/apache/spark/util/TimeStampedHashMap.scala |  4 +-
 .../streaming/examples/NetworkWordCount.scala      |  3 +-
 .../examples/RecoverableNetworkWordCount.scala     | 49 +++++++++++++++++-----
 .../org/apache/spark/streaming/Checkpoint.scala    | 13 ++++--
 .../streaming/api/java/JavaStreamingContext.scala  | 14 +++----
 5 files changed, 58 insertions(+), 25 deletions(-)

(limited to 'examples')

diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
index dde504fc52..8e07a0f29a 100644
--- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
@@ -27,8 +27,8 @@ import org.apache.spark.Logging
 /**
  * This is a custom implementation of scala.collection.mutable.Map which stores the insertion
  * timestamp along with each key-value pair. If specified, the timestamp of each pair can be
- * updated every it is accessed. Key-value pairs whose timestamp are older than a particular
- * threshold time can them be removed using the clearOldValues method. This is intended to
+ * updated every time it is accessed. Key-value pairs whose timestamps are older than a particular
+ * threshold time can then be removed using the clearOldValues method. This is intended to
  * be a drop-in replacement of scala.collection.mutable.HashMap.
 * @param updateTimeStampOnGet When enabled, the timestamp of a pair will be
 *                             updated when it is accessed
 */

diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
index aba1704825..4b896eaccb 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
@@ -21,7 +21,8 @@ import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext._
 
 /**
- * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ * Counts words in UTF8-encoded text received from the network every second.
+ *
 * Usage: NetworkWordCount <master> <hostname> <port>
 *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
 *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.

diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
index 739f805e87..d51e6e9418 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
@@ -26,18 +26,41 @@ import com.google.common.io.Files
 import java.nio.charset.Charset
 
 /**
- * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
- * Usage: NetworkWordCount <master> <hostname> <port> <checkpoint-directory>
+ * Counts words in UTF8-encoded text received from the network every second.
+ *
+ * Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-file>
 *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
 *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
- *   <checkpoint-directory> directory in a Hadoop compatible file system to which checkpoint
- *                          data will be saved to; this must be a fault-tolerant file system
- *                          like HDFS for the system to recover from driver failures
+ *   <checkpoint-directory> directory in an HDFS-compatible file system to which checkpoint data
+ *                          will be saved
+ *   <output-file> file to which the word counts will be appended
+ *
+ * In local mode, <master> should be 'local[n]' with n > 1
+ * <checkpoint-directory> and <output-file> must be absolute paths
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *
+ *      `$ nc -lk 9999`
+ *
+ * and run the example as
+ *
+ *      `$ ./run-example org.apache.spark.streaming.examples.RecoverableNetworkWordCount \
+ *              local[2] localhost 9999 ~/checkpoint/ ~/out`
+ *
+ * If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
+ * a new StreamingContext (will print "Creating new context" to the console). Otherwise, if
+ * checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
+ * the checkpoint data.
+ *
+ * To run this example in a local standalone cluster with automatic driver recovery,
+ *
+ *      `$ ./spark-class org.apache.spark.deploy.Client -s launch <cluster-url> <path-to-examples-jar> \
+ *              org.apache.spark.streaming.examples.RecoverableNetworkWordCount <cluster-url> \
+ *              localhost 9999 ~/checkpoint ~/out`
+ *
+ * <path-to-examples-jar> would typically be <spark-dir>/examples/target/scala-XX/spark-examples....jar
+ *
+ * Refer to the online documentation for more details.
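+ *
+ * The recovery behaviour described above is driven by `StreamingContext.getOrCreate`; a rough
+ * sketch of the pattern (illustrative only, not part of this patch; `createContext` stands in
+ * for whatever function builds the fresh context from the command-line arguments):
+ *
+ * {{{
+ *   val ssc = StreamingContext.getOrCreate(checkpointDirectory,
+ *     () => createContext(master, ip, port, outputPath))
+ *   ssc.start()
+ * }}}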
 */
object RecoverableNetworkWordCount {
@@ -52,7 +75,7 @@ object RecoverableNetworkWordCount {
 
     // Create the context with a 1 second batch size
     val ssc = new StreamingContext(master, "RecoverableNetworkWordCount", Seconds(1),
-      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
 
     // Create a NetworkInputDStream on target ip:port and count the
     // words in input stream of \n delimited text (eg. generated by 'nc')
@@ -74,9 +97,13 @@ object RecoverableNetworkWordCount {
       System.err.println(
         """
           |Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-file>
+          |  <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+          |  <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+          |  <checkpoint-directory> directory in an HDFS-compatible file system to which checkpoint data will be saved
+          |  <output-file> file to which the word counts will be appended
           |
          |In local mode, <master> should be 'local[n]' with n > 1
-          |Both <checkpoint-directory> and <output-file> should be full paths
+          |Both <checkpoint-directory> and <output-file> must be absolute paths
        """.stripMargin
      )
      System.exit(1)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 62b225382e..1249ef4c3d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -43,8 +43,9 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val pendingTimes = ssc.scheduler.getPendingTimes()
   val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
   val sparkConf = ssc.conf
-
-  // do not save these configurations
+
+  // These should be unset when a checkpoint is deserialized;
+  // otherwise the SparkContext won't initialize correctly.
   sparkConf.remove("spark.hostPort").remove("spark.driver.host").remove("spark.driver.port")
 
   def validate() {
@@ -102,8 +103,12 @@ object Checkpoint extends Logging {
  * Convenience class to handle the writing of graph checkpoint to file
  */
 private[streaming]
-class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDir: String, hadoopConf: Configuration)
-  extends Logging {
+class CheckpointWriter(
+    jobGenerator: JobGenerator,
+    conf: SparkConf,
+    checkpointDir: String,
+    hadoopConf: Configuration
+  ) extends Logging {
   val MAX_ATTEMPTS = 3
   val executor = Executors.newFixedThreadPool(1)
   val compressionCodec = CompressionCodec.createCodec(conf)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index d96e9ac7b7..523173d45a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -489,15 +489,15 @@ class JavaStreamingContext(val ssc: StreamingContext) {
 }
 
 /**
- * JavaStreamingContext object contains a number of static utility functions.
+ * JavaStreamingContext object contains a number of utility functions.
  */
 object JavaStreamingContext {
 
   /**
    * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
    * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
-   * will be created by calling the provided `creatingFunc`.
+   * recreated from the checkpoint data. If the data does not exist, then the provided factory
+   * will be used to create a JavaStreamingContext.
    *
    * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program
    * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext
@@ -515,8 +515,8 @@ object JavaStreamingContext {
   /**
    * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
    * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
-   * will be created by calling the provided `creatingFunc`.
+   * recreated from the checkpoint data. If the data does not exist, then the provided factory
+   * will be used to create a JavaStreamingContext.
    *
    * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
    * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext
@@ -537,8 +537,8 @@ object JavaStreamingContext {
   /**
    * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
    * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
-   * will be created by calling the provided `creatingFunc`.
+   * recreated from the checkpoint data. If the data does not exist, then the provided factory
+   * will be used to create a JavaStreamingContext.
    *
    * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
    * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext
-- 
cgit v1.2.3
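
Taken together, these patches establish a single driver-recovery pattern for streaming
applications: checkpoint to a fault-tolerant directory and always construct the context
through getOrCreate. A minimal end-to-end sketch, assuming a hypothetical cluster URL,
hostname, port and checkpoint path (the word-count logic mirrors the examples above):

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._

    object RecoverableApp {
      def main(args: Array[String]) {
        val checkpointDir = "hdfs://namenode:8020/checkpoint"  // must be fault-tolerant storage
        val ssc = StreamingContext.getOrCreate(checkpointDir, () => {
          // Runs only when no usable checkpoint exists in checkpointDir
          val newSsc = new StreamingContext("spark://master:7077", "RecoverableApp", Seconds(1))
          newSsc.checkpoint(checkpointDir)  // enable periodic checkpointing of the DStream graph
          val lines = newSsc.socketTextStream("localhost", 9999)
          lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
          newSsc
        })
        // After a driver failure and restart, the same call recovers the
        // context (and its DStream graph) from the checkpoint instead.
        ssc.start()
      }
    }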