From a1b8dd53e3474dae2c49b30bc9719c7f6b98c7cc Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 2 Jan 2014 19:07:22 -0800 Subject: Added StreamingContext.getOrCreate to for automatic recovery, and added RecoverableNetworkWordCount example to use it. --- .../cluster/SparkDeploySchedulerBackend.scala | 2 +- .../examples/RecoverableNetworkWordCount.scala | 58 ++++++++++++++++++++++ .../org/apache/spark/streaming/Checkpoint.scala | 12 +++-- .../apache/spark/streaming/StreamingContext.scala | 18 ++++++- 4 files changed, 85 insertions(+), 5 deletions(-) create mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 921b887a89..0615f7b565 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -47,7 +47,7 @@ private[spark] class SparkDeploySchedulerBackend( val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) - val args = Seq(driverUrl, "{{WORKER_URL}}", "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}") + val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}") val command = Command( "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs) val sparkHome = sc.getSparkHome().getOrElse(null) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala new file mode 100644 index 0000000000..0e5f39f772 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala @@ -0,0 +1,58 @@ +package org.apache.spark.streaming.examples + +import org.apache.spark.streaming.{Time, Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.util.IntParam +import java.io.File +import org.apache.spark.rdd.RDD +import com.google.common.io.Files +import java.nio.charset.Charset + +object RecoverableNetworkWordCount { + + def createContext(master: String, ip: String, port: Int, outputPath: String) = { + + val outputFile = new File(outputPath) + if (outputFile.exists()) outputFile.delete() + + // Create the context with a 1 second batch size + println("Creating new context") + val ssc = new StreamingContext(master, "RecoverableNetworkWordCount", Seconds(1), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) + + // Create a NetworkInputDStream on target ip:port and count the + // words in input stream of \n delimited test (eg. 
generated by 'nc') + val lines = ssc.socketTextStream(ip, port) + val words = lines.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + wordCounts.foreach((rdd: RDD[(String, Int)], time: Time) => { + val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]") + println(counts) + println("Appending to " + outputFile.getAbsolutePath) + Files.append(counts + "\n", outputFile, Charset.defaultCharset()) + }) + ssc + } + + def main(args: Array[String]) { + if (args.length != 5) { + System.err.println("You arguments were " + args.mkString("[", ", ", "]")) + System.err.println( + """ + |Usage: RecoverableNetworkWordCount + | + |In local mode, should be 'local[n]' with n > 1 + |Both and should be full paths + """.stripMargin + ) + System.exit(1) + } + val Array(master, ip, IntParam(port), checkpointDirectory, outputPath) = args + val ssc = StreamingContext.getOrCreate(checkpointDirectory, + () => { + createContext(master, ip, port, outputPath) + }) + ssc.start() + + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 7b343d2376..139e2c08b5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -24,7 +24,7 @@ import java.util.concurrent.RejectedExecutionException import org.apache.hadoop.fs.Path import org.apache.hadoop.conf.Configuration -import org.apache.spark.Logging +import org.apache.spark.{SparkException, Logging} import org.apache.spark.io.CompressionCodec import org.apache.spark.util.MetadataCleaner @@ -141,9 +141,15 @@ class CheckpointWriter(checkpointDir: String) extends Logging { private[streaming] object CheckpointReader extends Logging { + def doesCheckpointExist(path: String): Boolean = { + val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk")) + val fs = new Path(path).getFileSystem(new Configuration()) + (attempts.count(p => fs.exists(p)) > 1) + } + def read(path: String): Checkpoint = { val fs = new Path(path).getFileSystem(new Configuration()) - val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk")) + val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk")) val compressionCodec = CompressionCodec.createCodec() @@ -175,7 +181,7 @@ object CheckpointReader extends Logging { } }) - throw new Exception("Could not read checkpoint from path '" + path + "'") + throw new SparkException("Could not read checkpoint from path '" + path + "'") } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 41da028a3c..01b213ab42 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -570,12 +570,28 @@ class StreamingContext private ( } -object StreamingContext { +object StreamingContext extends Logging { implicit def toPairDStreamFunctions[K: ClassTag, V: ClassTag](stream: DStream[(K,V)]) = { new PairDStreamFunctions[K, V](stream) } + def getOrCreate( + checkpointPath: String, + creatingFunc: () => StreamingContext, + createOnCheckpointError: Boolean = false + ): StreamingContext = { + if (CheckpointReader.doesCheckpointExist(checkpointPath)) { + logInfo("Creating streaming context from checkpoint file") + new 
StreamingContext(checkpointPath) + } else { + logInfo("Creating new streaming context") + val ssc = creatingFunc() + ssc.checkpoint(checkpointPath) + ssc + } + } + protected[streaming] def createNewSparkContext( master: String, appName: String, -- cgit v1.2.3 From 3d4474330d9cd7d7c1b1e9fc1f8678bc6ee905e9 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sat, 4 Jan 2014 08:39:00 -0800 Subject: Removed the exponential backoff for testing. --- core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index e8ae2d302b..7485b89cf4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -168,7 +168,7 @@ private[spark] class DriverRunner( val exitCode = process.get.waitFor() if (supervise && exitCode != 0 && !killed) { - waitSeconds = waitSeconds * 2 // exponential back-off + waitSeconds = waitSeconds * 1 // exponential back-off logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.") (0 until waitSeconds).takeWhile(f => {Thread.sleep(1000); !killed}) } -- cgit v1.2.3 From 8e88db3ca56e6a56668b029e39c8e96b86d4dd5e Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 6 Jan 2014 02:21:56 +0000 Subject: Bug fixes to the DriverRunner and minor changes here and there. --- conf/slaves | 7 +++++-- .../main/scala/org/apache/spark/deploy/worker/DriverRunner.scala | 8 ++++---- .../org/apache/spark/streaming/dstream/FileInputDStream.scala | 2 +- .../scala/org/apache/spark/streaming/scheduler/JobGenerator.scala | 8 ++++---- 4 files changed, 14 insertions(+), 11 deletions(-) diff --git a/conf/slaves b/conf/slaves index da0a01343d..30ea300e07 100644 --- a/conf/slaves +++ b/conf/slaves @@ -1,2 +1,5 @@ -# A Spark Worker will be started on each of the machines listed below. 
-localhost \ No newline at end of file +ec2-54-221-59-252.compute-1.amazonaws.com +ec2-67-202-26-243.compute-1.amazonaws.com +ec2-23-22-220-97.compute-1.amazonaws.com +ec2-50-16-98-100.compute-1.amazonaws.com +ec2-54-234-164-206.compute-1.amazonaws.com diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index 7485b89cf4..2d567b7a41 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -119,15 +119,15 @@ private[spark] class DriverRunner( val emptyConf = new Configuration() // TODO: In docs explain it needs to be full HDFS path val jarFileSystem = jarPath.getFileSystem(emptyConf) - val destPath = new Path(driverDir.getAbsolutePath()) - val destFileSystem = destPath.getFileSystem(emptyConf) + val destPath = new File(driverDir.getAbsolutePath(), jarPath.getName()) + // val destFileSystem = destPath.getFileSystem(emptyConf) val jarFileName = jarPath.getName val localJarFile = new File(driverDir, jarFileName) val localJarFilename = localJarFile.getAbsolutePath if (!localJarFile.exists()) { // May already exist if running multiple workers on one node logInfo(s"Copying user jar $jarPath to $destPath") - FileUtil.copy(jarFileSystem, jarPath, destFileSystem, destPath, false, false, emptyConf) + FileUtil.copy(jarFileSystem, jarPath, destPath, false, emptyConf) } if (!localJarFile.exists()) { // Verify copy succeeded @@ -161,7 +161,7 @@ private[spark] class DriverRunner( val stderr = new File(baseDir, "stderr") val header = "Launch Command: %s\n%s\n\n".format( command.mkString("\"", "\" \"", "\""), "=" * 40) - Files.write(header, stderr, Charsets.UTF_8) + Files.append(header, stderr, Charsets.UTF_8) CommandUtils.redirectStream(process.get.getErrorStream, stderr) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 39e25239bf..a5a5f2e751 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -175,7 +175,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas override def cleanup() { } override def restore() { - hadoopFiles.foreach { + hadoopFiles.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, f) => { // Restore the metadata in both files and generatedRDDs logInfo("Restoring files for time " + t + " - " + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 1cd0b9b0a4..6c1df4f9c9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -85,14 +85,14 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { val checkpointTime = ssc.initialCheckpoint.checkpointTime val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds)) val downTimes = checkpointTime.until(restartTime, batchDuration) - logInfo("Batches during down time: " + downTimes.mkString(", ")) + logInfo("Batches during down time (" + downTimes.size + " batches): " + downTimes.mkString(", ")) // Batches that were unprocessed before failure - val pendingTimes = 
ssc.initialCheckpoint.pendingTimes - logInfo("Batches pending processing: " + pendingTimes.mkString(", ")) + val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering) + logInfo("Batches pending processing (" + pendingTimes.size + " batches): " + pendingTimes.mkString(", ")) // Reschedule jobs for these times val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering) - logInfo("Batches to reschedule: " + timesToReschedule.mkString(", ")) + logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " + timesToReschedule.mkString(", ")) timesToReschedule.foreach(time => jobScheduler.runJobs(time, graph.generateJobs(time)) ) -- cgit v1.2.3 From ac1f4b06c12dae922172b6fa907eec0ae0bd0170 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sun, 5 Jan 2014 23:42:53 -0800 Subject: Added a hashmap to cache file mod times. --- .../org/apache/spark/util/TimeStampedHashMap.scala | 8 ++++-- .../spark/streaming/dstream/FileInputDStream.scala | 30 +++++++++++++++++----- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala index 181ae2fd45..9ce4ef744e 100644 --- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala @@ -30,12 +30,16 @@ import org.apache.spark.Logging * threshold time can them be removed using the clearOldValues method. This is intended to be a drop-in * replacement of scala.collection.mutable.HashMap. */ -class TimeStampedHashMap[A, B] extends Map[A, B]() with Logging { +class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = false) + extends Map[A, B]() with Logging { val internalMap = new ConcurrentHashMap[A, (B, Long)]() def get(key: A): Option[B] = { val value = internalMap.get(key) - if (value != null) Some(value._1) else None + if (value != null && updateTimeStampOnGet) { + internalMap.replace(key, value, (value._1, currentTime)) + } + Option(value).map(_._1) } def iterator: Iterator[(A, B)] = { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index b4743013b1..0028422db9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -23,10 +23,10 @@ import scala.reflect.ClassTag import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} -import org.apache.spark.SparkException import org.apache.spark.rdd.RDD import org.apache.spark.rdd.UnionRDD import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time} +import org.apache.spark.util.TimeStampedHashMap private[streaming] @@ -46,6 +46,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas @transient private var path_ : Path = null @transient private var fs_ : FileSystem = null @transient private[streaming] var files = new HashMap[Time, Array[String]] + @transient private var fileModTimes = new TimeStampedHashMap[String, Long](true) + @transient private var lastNewFileFindingTime = 0L override def start() { if (newFilesOnly) { @@ -96,6 +98,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas (time - 
rememberDuration) + ": " + oldFiles.keys.mkString(", ")) logDebug("Cleared files are:\n" + oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n")) + // Delete file times that weren't accessed in the last round of getting new files + fileModTimes.clearOldValues(lastNewFileFindingTime - 1) } /** @@ -104,8 +108,18 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas */ private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = { logDebug("Trying to get new files for time " + currentTime) + lastNewFileFindingTime = System.currentTimeMillis val filter = new CustomPathFilter(currentTime) - val newFiles = fs.listStatus(path, filter).map(_.getPath.toString) + val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString) + val timeTaken = System.currentTimeMillis - lastNewFileFindingTime + logInfo("Finding new files took " + timeTaken + " ms") + if (timeTaken > slideDuration.milliseconds) { + logWarning( + "Time taken to find new files exceeds the batch size. " + + "Consider increasing the batch size or reduceing the number of " + + "files in the monitored directory." + ) + } (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq) } @@ -122,16 +136,20 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas new UnionRDD(context.sparkContext, fileRDDs) } - private def path: Path = { + private def directoryPath: Path = { if (path_ == null) path_ = new Path(directory) path_ } private def fs: FileSystem = { - if (fs_ == null) fs_ = path.getFileSystem(new Configuration()) + if (fs_ == null) fs_ = directoryPath.getFileSystem(new Configuration()) fs_ } + private def getFileModTime(path: Path) = { + fileModTimes.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime()) + } + private def reset() { fs_ = null } @@ -142,6 +160,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas ois.defaultReadObject() generatedRDDs = new HashMap[Time, RDD[(K,V)]] () files = new HashMap[Time, Array[String]] + fileModTimes = new TimeStampedHashMap[String, Long](true) } /** @@ -187,14 +206,13 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas // Latest file mod time seen in this round of fetching files and its corresponding files var latestModTime = 0L val latestModTimeFiles = new HashSet[String]() - def accept(path: Path): Boolean = { try { if (!filter(path)) { // Reject file if it does not satisfy filter logDebug("Rejected by filter " + path) return false } - val modTime = fs.getFileStatus(path).getModificationTime() + val modTime = getFileModTime(path) logDebug("Mod time for " + path + " is " + modTime) if (modTime < prevModTime) { logDebug("Mod time less than last mod time") -- cgit v1.2.3 From 0b7a132d03d5a0106d85a8cca1ab28d6af9c8b55 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 8 Jan 2014 03:22:06 -0800 Subject: Modified checkpoing file clearing policy. 
--- .../org/apache/spark/streaming/Checkpoint.scala | 7 ++- .../scala/org/apache/spark/streaming/DStream.scala | 15 ++++-- .../spark/streaming/DStreamCheckpointData.scala | 63 +++++++++++++++------- .../org/apache/spark/streaming/DStreamGraph.scala | 30 +++++++---- .../spark/streaming/dstream/FileInputDStream.scala | 8 +-- .../spark/streaming/scheduler/JobGenerator.scala | 23 +++++--- .../apache/spark/streaming/CheckpointSuite.scala | 10 ++-- 7 files changed, 104 insertions(+), 52 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 09b184b9cf..155d5bc02e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -28,6 +28,7 @@ import org.apache.spark.{SparkException, Logging} import org.apache.spark.io.CompressionCodec import org.apache.spark.util.MetadataCleaner import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.streaming.scheduler.JobGenerator private[streaming] @@ -58,7 +59,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) * Convenience class to handle the writing of graph checkpoint to file */ private[streaming] -class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends Logging { +class CheckpointWriter(jobGenerator: JobGenerator, checkpointDir: String, hadoopConf: Configuration) extends Logging { val file = new Path(checkpointDir, "graph") val MAX_ATTEMPTS = 3 val executor = Executors.newFixedThreadPool(1) @@ -80,7 +81,7 @@ class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends while (attempts < MAX_ATTEMPTS && !stopped) { attempts += 1 try { - logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'") + logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'") // This is inherently thread unsafe, so alleviating it by writing to '.new' and // then moving it to the final file val fos = fs.create(writeFile) @@ -96,6 +97,7 @@ class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends val finishTime = System.currentTimeMillis() logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file + "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds") + jobGenerator.onCheckpointCompletion(checkpointTime) return } catch { case ioe: IOException => @@ -116,6 +118,7 @@ class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends bos.close() try { executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray)) + logInfo("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue") } catch { case rej: RejectedExecutionException => logError("Could not submit checkpoint task to the thread pool executor", rej) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala index a78d3965ee..20074249d7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala @@ -331,13 +331,12 @@ abstract class DStream[T: ClassTag] ( * implementation clears the old generated RDDs. Subclasses of DStream may override * this to clear their own metadata along with the generated RDDs. 
*/ - protected[streaming] def clearOldMetadata(time: Time) { - var numForgotten = 0 + protected[streaming] def clearMetadata(time: Time) { val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration)) generatedRDDs --= oldRDDs.keys logInfo("Cleared " + oldRDDs.size + " RDDs that were older than " + (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", ")) - dependencies.foreach(_.clearOldMetadata(time)) + dependencies.foreach(_.clearMetadata(time)) } /* Adds metadata to the Stream while it is running. @@ -358,12 +357,18 @@ abstract class DStream[T: ClassTag] ( */ protected[streaming] def updateCheckpointData(currentTime: Time) { logInfo("Updating checkpoint data for time " + currentTime) - checkpointData.update() + checkpointData.update(currentTime) dependencies.foreach(_.updateCheckpointData(currentTime)) - checkpointData.cleanup() logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData) } + protected[streaming] def clearCheckpointData(time: Time) { + logInfo("Clearing checkpoint data") + checkpointData.cleanup(time) + dependencies.foreach(_.clearCheckpointData(time)) + logInfo("Cleared checkpoint data") + } + /** * Restore the RDDs in generatedRDDs from the checkpointData. This is an internal method * that should not be called directly. This is a default implementation that recreates RDDs diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala index 3fd5d52403..cc2f08a7d1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala @@ -17,15 +17,16 @@ package org.apache.spark.streaming +import scala.collection.mutable.{HashMap, HashSet} +import scala.reflect.ClassTag + import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.conf.Configuration -import collection.mutable.HashMap import org.apache.spark.Logging -import scala.collection.mutable.HashMap -import scala.reflect.ClassTag +import java.io.{ObjectInputStream, IOException} private[streaming] @@ -33,35 +34,35 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) extends Serializable with Logging { protected val data = new HashMap[Time, AnyRef]() + @transient private var allCheckpointFiles = new HashMap[Time, String] + @transient private var timeToLastCheckpointFileTime = new HashMap[Time, Time] @transient private var fileSystem : FileSystem = null - @transient private var lastCheckpointFiles: HashMap[Time, String] = null - protected[streaming] def checkpointFiles = data.asInstanceOf[HashMap[Time, String]] + //@transient private var lastCheckpointFiles: HashMap[Time, String] = null + + protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]] /** * Updates the checkpoint data of the DStream. This gets called every time * the graph checkpoint is initiated. Default implementation records the * checkpoint files to which the generate RDDs of the DStream has been saved. 
*/ - def update() { + def update(time: Time) { // Get the checkpointed RDDs from the generated RDDs - val newCheckpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined) + val checkpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined) .map(x => (x._1, x._2.getCheckpointFile.get)) // Make a copy of the existing checkpoint data (checkpointed RDDs) - lastCheckpointFiles = checkpointFiles.clone() + //lastCheckpointFiles = checkpointFiles.clone() // If the new checkpoint data has checkpoints then replace existing with the new one - if (newCheckpointFiles.size > 0) { - checkpointFiles.clear() - checkpointFiles ++= newCheckpointFiles - } - - // TODO: remove this, this is just for debugging - newCheckpointFiles.foreach { - case (time, data) => { logInfo("Added checkpointed RDD for time " + time + " to stream checkpoint") } + if (currentCheckpointFiles.size > 0) { + currentCheckpointFiles.clear() + currentCheckpointFiles ++= checkpointFiles } + allCheckpointFiles ++= currentCheckpointFiles + timeToLastCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering) } /** @@ -69,7 +70,8 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) * checkpoint is initiated, but after `update` is called. Default * implementation, cleans up old checkpoint files. */ - def cleanup() { + def cleanup(time: Time) { + /* // If there is at least on checkpoint file in the current checkpoint files, // then delete the old checkpoint files. if (checkpointFiles.size > 0 && lastCheckpointFiles != null) { @@ -89,6 +91,23 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) } } } + */ + val lastCheckpointFileTime = timeToLastCheckpointFileTime.remove(time).get + allCheckpointFiles.filter(_._1 < lastCheckpointFileTime).foreach { + case (time, file) => + try { + val path = new Path(file) + if (fileSystem == null) { + fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration) + } + fileSystem.delete(path, true) + allCheckpointFiles -= time + logInfo("Deleted checkpoint file '" + file + "' for time " + time) + } catch { + case e: Exception => + logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e) + } + } } /** @@ -98,7 +117,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) */ def restore() { // Create RDDs from the checkpoint data - checkpointFiles.foreach { + currentCheckpointFiles.foreach { case(time, file) => { logInfo("Restoring checkpointed RDD for time " + time + " from file '" + file + "'") dstream.generatedRDDs += ((time, dstream.context.sparkContext.checkpointFile[T](file))) @@ -107,6 +126,12 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) } override def toString() = { - "[\n" + checkpointFiles.size + " checkpoint files \n" + checkpointFiles.mkString("\n") + "\n]" + "[\n" + currentCheckpointFiles.size + " checkpoint files \n" + currentCheckpointFiles.mkString("\n") + "\n]" + } + + @throws(classOf[IOException]) + private def readObject(ois: ObjectInputStream) { + timeToLastCheckpointFileTime = new HashMap[Time, Time] + allCheckpointFiles = new HashMap[Time, String] } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index daed7ff7c3..bfedef2e4e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -105,36 +105,44 @@ final private[streaming] class 
DStreamGraph extends Serializable with Logging { def getOutputStreams() = this.synchronized { outputStreams.toArray } def generateJobs(time: Time): Seq[Job] = { + logInfo("Generating jobs for time " + time) this.synchronized { - logInfo("Generating jobs for time " + time) val jobs = outputStreams.flatMap(outputStream => outputStream.generateJob(time)) logInfo("Generated " + jobs.length + " jobs for time " + time) jobs } } - def clearOldMetadata(time: Time) { + def clearMetadata(time: Time) { + logInfo("Clearing metadata for time " + time) this.synchronized { - logInfo("Clearing old metadata for time " + time) - outputStreams.foreach(_.clearOldMetadata(time)) - logInfo("Cleared old metadata for time " + time) + outputStreams.foreach(_.clearMetadata(time)) } + logInfo("Cleared old metadata for time " + time) } def updateCheckpointData(time: Time) { + logInfo("Updating checkpoint data for time " + time) this.synchronized { - logInfo("Updating checkpoint data for time " + time) outputStreams.foreach(_.updateCheckpointData(time)) - logInfo("Updated checkpoint data for time " + time) } + logInfo("Updated checkpoint data for time " + time) + } + + def clearCheckpointData(time: Time) { + logInfo("Restoring checkpoint data") + this.synchronized { + outputStreams.foreach(_.clearCheckpointData(time)) + } + logInfo("Restored checkpoint data") } def restoreCheckpointData() { + logInfo("Restoring checkpoint data") this.synchronized { - logInfo("Restoring checkpoint data") outputStreams.foreach(_.restoreCheckpointData()) - logInfo("Restored checkpoint data") } + logInfo("Restored checkpoint data") } def validate() { @@ -147,8 +155,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { @throws(classOf[IOException]) private def writeObject(oos: ObjectOutputStream) { + logDebug("DStreamGraph.writeObject used") this.synchronized { - logDebug("DStreamGraph.writeObject used") checkpointInProgress = true oos.defaultWriteObject() checkpointInProgress = false @@ -157,8 +165,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { @throws(classOf[IOException]) private def readObject(ois: ObjectInputStream) { + logDebug("DStreamGraph.readObject used") this.synchronized { - logDebug("DStreamGraph.readObject used") checkpointInProgress = true ois.defaultReadObject() checkpointInProgress = false diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 0028422db9..4585e3f6bd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -90,8 +90,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas } /** Clear the old time-to-files mappings along with old RDDs */ - protected[streaming] override def clearOldMetadata(time: Time) { - super.clearOldMetadata(time) + protected[streaming] override def clearMetadata(time: Time) { + super.clearMetadata(time) val oldFiles = files.filter(_._1 <= (time - rememberDuration)) files --= oldFiles.keys logInfo("Cleared " + oldFiles.size + " old files that were older than " + @@ -172,12 +172,12 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas def hadoopFiles = data.asInstanceOf[HashMap[Time, Array[String]]] - override def update() { + override def update(time: Time) { hadoopFiles.clear() hadoopFiles ++= files } - 
override def cleanup() { } + override def cleanup(time: Time) { } override def restore() { hadoopFiles.toSeq.sortBy(_._1)(Time.ordering).foreach { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 5f48692df8..6fbe6da921 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -26,8 +26,9 @@ import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock} /** Event classes for JobGenerator */ private[scheduler] sealed trait JobGeneratorEvent private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent -private[scheduler] case class ClearOldMetadata(time: Time) extends JobGeneratorEvent +private[scheduler] case class ClearMetadata(time: Time) extends JobGeneratorEvent private[scheduler] case class DoCheckpoint(time: Time) extends JobGeneratorEvent +private[scheduler] case class ClearCheckpointData(time: Time) extends JobGeneratorEvent /** * This class generates jobs from DStreams as well as drives checkpointing and cleaning @@ -55,7 +56,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => eventProcessorActor ! GenerateJobs(new Time(longTime))) lazy val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) { - new CheckpointWriter(ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration) + new CheckpointWriter(this, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration) } else { null } @@ -79,15 +80,20 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { * On batch completion, clear old metadata and checkpoint computation. */ private[scheduler] def onBatchCompletion(time: Time) { - eventProcessorActor ! ClearOldMetadata(time) + eventProcessorActor ! ClearMetadata(time) + } + + private[streaming] def onCheckpointCompletion(time: Time) { + eventProcessorActor ! ClearCheckpointData(time) } /** Processes all events */ private def processEvent(event: JobGeneratorEvent) { event match { case GenerateJobs(time) => generateJobs(time) - case ClearOldMetadata(time) => clearOldMetadata(time) + case ClearMetadata(time) => clearMetadata(time) case DoCheckpoint(time) => doCheckpoint(time) + case ClearCheckpointData(time) => clearCheckpointData(time) } } @@ -143,11 +149,16 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { } /** Clear DStream metadata for the given `time`. */ - private def clearOldMetadata(time: Time) { - ssc.graph.clearOldMetadata(time) + private def clearMetadata(time: Time) { + ssc.graph.clearMetadata(time) eventProcessorActor ! DoCheckpoint(time) } + /** Clear DStream checkpoint data for the given `time`. */ + private def clearCheckpointData(time: Time) { + ssc.graph.clearCheckpointData(time) + } + /** Perform checkpoint for the give `time`. 
*/ private def doCheckpoint(time: Time) = synchronized { if (checkpointWriter != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 4e25c9566c..53bc24ff7a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -84,9 +84,9 @@ class CheckpointSuite extends TestSuiteBase { ssc.start() advanceTimeWithRealDelay(ssc, firstNumBatches) logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData) - assert(!stateStream.checkpointData.checkpointFiles.isEmpty, + assert(!stateStream.checkpointData.currentCheckpointFiles.isEmpty, "No checkpointed RDDs in state stream before first failure") - stateStream.checkpointData.checkpointFiles.foreach { + stateStream.checkpointData.currentCheckpointFiles.foreach { case (time, file) => { assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time + " for state stream before first failure does not exist") @@ -95,7 +95,7 @@ class CheckpointSuite extends TestSuiteBase { // Run till a further time such that previous checkpoint files in the stream would be deleted // and check whether the earlier checkpoint files are deleted - val checkpointFiles = stateStream.checkpointData.checkpointFiles.map(x => new File(x._2)) + val checkpointFiles = stateStream.checkpointData.currentCheckpointFiles.map(x => new File(x._2)) advanceTimeWithRealDelay(ssc, secondNumBatches) checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted")) @@ -114,9 +114,9 @@ class CheckpointSuite extends TestSuiteBase { // is present in the checkpoint data or not ssc.start() advanceTimeWithRealDelay(ssc, 1) - assert(!stateStream.checkpointData.checkpointFiles.isEmpty, + assert(!stateStream.checkpointData.currentCheckpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure") - stateStream.checkpointData.checkpointFiles.foreach { + stateStream.checkpointData.currentCheckpointFiles.foreach { case (time, file) => { assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time + " for state stream before seconds failure does not exist") -- cgit v1.2.3 From a17cc602ac79b22457ed457023493fe82e9d39df Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 8 Jan 2014 04:12:05 -0800 Subject: More bug fixes. 
--- .../spark/streaming/DStreamCheckpointData.scala | 45 +++++++++++++--------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala index cc2f08a7d1..e0567a1c19 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala @@ -22,7 +22,6 @@ import scala.reflect.ClassTag import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.conf.Configuration import org.apache.spark.Logging @@ -53,16 +52,17 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) val checkpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined) .map(x => (x._1, x._2.getCheckpointFile.get)) + logInfo("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n")) // Make a copy of the existing checkpoint data (checkpointed RDDs) - //lastCheckpointFiles = checkpointFiles.clone() + // lastCheckpointFiles = checkpointFiles.clone() // If the new checkpoint data has checkpoints then replace existing with the new one - if (currentCheckpointFiles.size > 0) { + if (!currentCheckpointFiles.isEmpty) { currentCheckpointFiles.clear() currentCheckpointFiles ++= checkpointFiles + allCheckpointFiles ++= currentCheckpointFiles + timeToLastCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering) } - allCheckpointFiles ++= currentCheckpointFiles - timeToLastCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering) } /** @@ -92,21 +92,28 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) } } */ - val lastCheckpointFileTime = timeToLastCheckpointFileTime.remove(time).get - allCheckpointFiles.filter(_._1 < lastCheckpointFileTime).foreach { - case (time, file) => - try { - val path = new Path(file) - if (fileSystem == null) { - fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration) - } - fileSystem.delete(path, true) - allCheckpointFiles -= time - logInfo("Deleted checkpoint file '" + file + "' for time " + time) - } catch { - case e: Exception => - logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e) + timeToLastCheckpointFileTime.remove(time) match { + case Some(lastCheckpointFileTime) => + logInfo("Deleting all files before " + time) + val filesToDelete = allCheckpointFiles.filter(_._1 < lastCheckpointFileTime) + logInfo("Files to delete:\n" + filesToDelete.mkString(",")) + filesToDelete.foreach { + case (time, file) => + try { + val path = new Path(file) + if (fileSystem == null) { + fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration) + } + fileSystem.delete(path, true) + allCheckpointFiles -= time + logInfo("Deleted checkpoint file '" + file + "' for time " + time) + } catch { + case e: Exception => + logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e) + } } + case None => + logInfo("Nothing to delete") } } -- cgit v1.2.3 From 6f713e2a3e56185368b66fb087637dec112a1f5d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 9 Jan 2014 13:42:04 -0800 Subject: Changed the way StreamingContext finds and reads checkpoint files, and added JavaStreamingContext.getOrCreate. 
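
The recovery pattern this series targets, as a rough sketch (the master URL, host, port and checkpoint directory below are placeholders, not values from the patch): all DStream setup lives in a factory function, and the driver always starts through getOrCreate so that after a driver failure the context is rebuilt from checkpoint data rather than from scratch.

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._

    // Hypothetical driver mirroring the RecoverableNetworkWordCount example
    // added earlier in this series; values below are illustrative only.
    object GetOrCreateSketch {
      def main(args: Array[String]) {
        val checkpointDir = "hdfs:///tmp/recoverable-wordcount" // placeholder path

        // Everything that defines the streaming computation lives in the factory,
        // so a fresh start and a recovered start build the same DStream graph.
        def createContext(): StreamingContext = {
          val ssc = new StreamingContext("local[2]", "GetOrCreateSketch", Seconds(1))
          ssc.checkpoint(checkpointDir)
          val lines = ssc.socketTextStream("localhost", 9999) // placeholder source
          lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
          ssc
        }

        // Recreated from checkpoint data if present, otherwise built by createContext().
        val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
        ssc.start()
      }
    }
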
--- conf/slaves | 6 +- .../main/scala/org/apache/spark/SparkContext.scala | 2 +- .../streaming/examples/JavaNetworkWordCount.java | 7 +- .../streaming/examples/NetworkWordCount.scala | 2 +- .../examples/RecoverableNetworkWordCount.scala | 43 ++++++++-- .../org/apache/spark/streaming/Checkpoint.scala | 98 ++++++++++++---------- .../spark/streaming/DStreamCheckpointData.scala | 57 ++++--------- .../org/apache/spark/streaming/DStreamGraph.scala | 4 +- .../apache/spark/streaming/StreamingContext.scala | 64 ++++++++++---- .../streaming/api/java/JavaStreamingContext.scala | 96 +++++++++++++++++++-- 10 files changed, 254 insertions(+), 125 deletions(-) diff --git a/conf/slaves b/conf/slaves index 30ea300e07..2fbb50c4a8 100644 --- a/conf/slaves +++ b/conf/slaves @@ -1,5 +1 @@ -ec2-54-221-59-252.compute-1.amazonaws.com -ec2-67-202-26-243.compute-1.amazonaws.com -ec2-23-22-220-97.compute-1.amazonaws.com -ec2-50-16-98-100.compute-1.amazonaws.com -ec2-54-234-164-206.compute-1.amazonaws.com +localhost diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 7514ce58fb..304e85f1c0 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -29,7 +29,7 @@ import scala.collection.mutable.HashMap import scala.reflect.{ClassTag, classTag} import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{LocalFileSystem, Path} import org.apache.hadoop.io.ArrayWritable import org.apache.hadoop.io.BooleanWritable import org.apache.hadoop.io.BytesWritable diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java index def87c199b..d8d6046914 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java @@ -41,17 +41,17 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext; public class JavaNetworkWordCount { public static void main(String[] args) { if (args.length < 3) { - System.err.println("Usage: NetworkWordCount \n" + + System.err.println("Usage: JavaNetworkWordCount \n" + "In local mode, should be 'local[n]' with n > 1"); System.exit(1); } // Create the context with a 1 second batch size - JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount", + JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaNetworkWordCount", new Duration(1000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); // Create a NetworkInputDStream on target ip:port and count the - // words in input stream of \n delimited test (eg. generated by 'nc') + // words in input stream of \n delimited text (eg. 
generated by 'nc') JavaDStream lines = ssc.socketTextStream(args[1], Integer.parseInt(args[2])); JavaDStream words = lines.flatMap(new FlatMapFunction() { @Override @@ -74,6 +74,5 @@ public class JavaNetworkWordCount { wordCounts.print(); ssc.start(); - } } diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala index e2487dca5f..5ad4875980 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala @@ -44,7 +44,7 @@ object NetworkWordCount { System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) // Create a NetworkInputDStream on target ip:port and count the - // words in input stream of \n delimited test (eg. generated by 'nc') + // words in input stream of \n delimited text (eg. generated by 'nc') val lines = ssc.socketTextStream(args(1), args(2).toInt) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala index 0e5f39f772..739f805e87 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.streaming.examples import org.apache.spark.streaming.{Time, Seconds, StreamingContext} @@ -8,20 +25,37 @@ import org.apache.spark.rdd.RDD import com.google.common.io.Files import java.nio.charset.Charset +/** + * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. + * Usage: NetworkWordCount + * is the Spark master URL. In local mode, should be 'local[n]' with n > 1. + * and describe the TCP server that Spark Streaming would connect to receive data. 
+ * directory in a Hadoop compatible file system to which checkpoint + * data will be saved to; this must be a fault-tolerant file system + * like HDFS for the system to recover from driver failures + * (x, 1)).reduceByKey(_ + _) @@ -39,10 +73,10 @@ object RecoverableNetworkWordCount { System.err.println("You arguments were " + args.mkString("[", ", ", "]")) System.err.println( """ - |Usage: RecoverableNetworkWordCount + |Usage: RecoverableNetworkWordCount | |In local mode, should be 'local[n]' with n > 1 - |Both and should be full paths + |Both and should be full paths """.stripMargin ) System.exit(1) @@ -53,6 +87,5 @@ object RecoverableNetworkWordCount { createContext(master, ip, port, outputPath) }) ssc.start() - } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 155d5bc02e..a32e4852c5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -82,22 +82,28 @@ class CheckpointWriter(jobGenerator: JobGenerator, checkpointDir: String, hadoop attempts += 1 try { logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'") - // This is inherently thread unsafe, so alleviating it by writing to '.new' and + // This is inherently thread unsafe, so alleviating it by writing to '.next' and // then moving it to the final file val fos = fs.create(writeFile) fos.write(bytes) fos.close() + + // Back up existing checkpoint if it exists if (fs.exists(file) && fs.rename(file, bakFile)) { logDebug("Moved existing checkpoint file to " + bakFile) } - // paranoia - fs.delete(file, false) - fs.rename(writeFile, file) - - val finishTime = System.currentTimeMillis() - logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file + - "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds") - jobGenerator.onCheckpointCompletion(checkpointTime) + fs.delete(file, false) // paranoia + + // Rename temp written file to the right location + if (fs.rename(writeFile, file)) { + val finishTime = System.currentTimeMillis() + logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file + + "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms") + jobGenerator.onCheckpointCompletion(checkpointTime) + } else { + throw new SparkException("Failed to rename checkpoint file from " + + writeFile + " to " + file) + } return } catch { case ioe: IOException => @@ -154,47 +160,47 @@ class CheckpointWriter(jobGenerator: JobGenerator, checkpointDir: String, hadoop private[streaming] object CheckpointReader extends Logging { - def doesCheckpointExist(path: String): Boolean = { - val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk")) - val fs = new Path(path).getFileSystem(new Configuration()) - (attempts.count(p => fs.exists(p)) > 1) - } + private val graphFileNames = Seq("graph", "graph.bk") + + def read(checkpointDir: String, hadoopConf: Configuration): Option[Checkpoint] = { + val checkpointPath = new Path(checkpointDir) + def fs = checkpointPath.getFileSystem(hadoopConf) + val existingFiles = graphFileNames.map(new Path(checkpointPath, _)).filter(fs.exists) + + // Log the file listing if graph checkpoint file was not found + if (existingFiles.isEmpty) { + logInfo("Could not find graph file in " + checkpointDir + ", which contains the files:\n" + + fs.listStatus(checkpointPath).mkString("\n")) + return 
None + } + logInfo("Checkpoint files found: " + existingFiles.mkString(",")) - def read(path: String): Checkpoint = { - val fs = new Path(path).getFileSystem(new Configuration()) - val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk")) val compressionCodec = CompressionCodec.createCodec() - - attempts.foreach(file => { - if (fs.exists(file)) { - logInfo("Attempting to load checkpoint from file '" + file + "'") - try { - val fis = fs.open(file) - // ObjectInputStream uses the last defined user-defined class loader in the stack - // to find classes, which maybe the wrong class loader. Hence, a inherited version - // of ObjectInputStream is used to explicitly use the current thread's default class - // loader to find and load classes. This is a well know Java issue and has popped up - // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627) - val zis = compressionCodec.compressedInputStream(fis) - val ois = new ObjectInputStreamWithLoader(zis, - Thread.currentThread().getContextClassLoader) - val cp = ois.readObject.asInstanceOf[Checkpoint] - ois.close() - fs.close() - cp.validate() - logInfo("Checkpoint successfully loaded from file '" + file + "'") - logInfo("Checkpoint was generated at time " + cp.checkpointTime) - return cp - } catch { - case e: Exception => - logError("Error loading checkpoint from file '" + file + "'", e) - } - } else { - logWarning("Could not read checkpoint from file '" + file + "' as it does not exist") + existingFiles.foreach(file => { + logInfo("Attempting to load checkpoint from file '" + file + "'") + try { + val fis = fs.open(file) + // ObjectInputStream uses the last defined user-defined class loader in the stack + // to find classes, which maybe the wrong class loader. Hence, a inherited version + // of ObjectInputStream is used to explicitly use the current thread's default class + // loader to find and load classes. 
This is a well know Java issue and has popped up + // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627) + val zis = compressionCodec.compressedInputStream(fis) + val ois = new ObjectInputStreamWithLoader(zis, + Thread.currentThread().getContextClassLoader) + val cp = ois.readObject.asInstanceOf[Checkpoint] + ois.close() + fs.close() + cp.validate() + logInfo("Checkpoint successfully loaded from file '" + file + "'") + logInfo("Checkpoint was generated at time " + cp.checkpointTime) + return Some(cp) + } catch { + case e: Exception => + logWarning("Error reading checkpoint from file '" + file + "'", e) } - }) - throw new SparkException("Could not read checkpoint from path '" + path + "'") + throw new SparkException("Failed to read checkpoint from directory '" + checkpointDir + "'") } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala index e0567a1c19..1081d3c807 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala @@ -27,18 +27,16 @@ import org.apache.spark.Logging import java.io.{ObjectInputStream, IOException} - private[streaming] class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) extends Serializable with Logging { protected val data = new HashMap[Time, AnyRef]() - @transient private var allCheckpointFiles = new HashMap[Time, String] - @transient private var timeToLastCheckpointFileTime = new HashMap[Time, Time] + // Mapping of the batch time to the checkpointed RDD file of that time + @transient private var timeToCheckpointFile = new HashMap[Time, String] + // Mapping of the batch time to the time of the oldest checkpointed RDD in that batch's checkpoint data + @transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time] @transient private var fileSystem : FileSystem = null - - //@transient private var lastCheckpointFiles: HashMap[Time, String] = null - protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]] /** @@ -51,17 +49,14 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) // Get the checkpointed RDDs from the generated RDDs val checkpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined) .map(x => (x._1, x._2.getCheckpointFile.get)) + logDebug("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n")) - logInfo("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n")) - // Make a copy of the existing checkpoint data (checkpointed RDDs) - // lastCheckpointFiles = checkpointFiles.clone() - - // If the new checkpoint data has checkpoints then replace existing with the new one + // Add the checkpoint files to the data to be serialized if (!currentCheckpointFiles.isEmpty) { currentCheckpointFiles.clear() currentCheckpointFiles ++= checkpointFiles - allCheckpointFiles ++= currentCheckpointFiles - timeToLastCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering) + timeToCheckpointFile ++= currentCheckpointFiles + timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering) } } @@ -71,32 +66,10 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) * implementation, cleans up old checkpoint files. 
*/ def cleanup(time: Time) { - /* - // If there is at least on checkpoint file in the current checkpoint files, - // then delete the old checkpoint files. - if (checkpointFiles.size > 0 && lastCheckpointFiles != null) { - (lastCheckpointFiles -- checkpointFiles.keySet).foreach { - case (time, file) => { - try { - val path = new Path(file) - if (fileSystem == null) { - fileSystem = path.getFileSystem(new Configuration()) - } - fileSystem.delete(path, true) - logInfo("Deleted checkpoint file '" + file + "' for time " + time) - } catch { - case e: Exception => - logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e) - } - } - } - } - */ - timeToLastCheckpointFileTime.remove(time) match { + timeToOldestCheckpointFileTime.remove(time) match { case Some(lastCheckpointFileTime) => - logInfo("Deleting all files before " + time) - val filesToDelete = allCheckpointFiles.filter(_._1 < lastCheckpointFileTime) - logInfo("Files to delete:\n" + filesToDelete.mkString(",")) + val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime) + logDebug("Files to delete:\n" + filesToDelete.mkString(",")) filesToDelete.foreach { case (time, file) => try { @@ -105,11 +78,12 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration) } fileSystem.delete(path, true) - allCheckpointFiles -= time + timeToCheckpointFile -= time logInfo("Deleted checkpoint file '" + file + "' for time " + time) } catch { case e: Exception => logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e) + fileSystem = null } } case None => @@ -138,7 +112,8 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) @throws(classOf[IOException]) private def readObject(ois: ObjectInputStream) { - timeToLastCheckpointFileTime = new HashMap[Time, Time] - allCheckpointFiles = new HashMap[Time, String] + ois.defaultReadObject() + timeToOldestCheckpointFileTime = new HashMap[Time, Time] + timeToCheckpointFile = new HashMap[Time, String] } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index bfedef2e4e..34919d315c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -130,11 +130,11 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { } def clearCheckpointData(time: Time) { - logInfo("Restoring checkpoint data") + logInfo("Clearing checkpoint data for time " + time) this.synchronized { outputStreams.foreach(_.clearCheckpointData(time)) } - logInfo("Restored checkpoint data") + logInfo("Cleared checkpoint data for time " + time) } def restoreCheckpointData() { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 59d2d546e6..30deba417e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -45,10 +45,11 @@ import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{LocalFileSystem, Path} import twitter4j.Status 
import twitter4j.auth.Authorization +import org.apache.hadoop.conf.Configuration /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -89,10 +90,12 @@ class StreamingContext private ( /** * Re-create a StreamingContext from a checkpoint file. - * @param path Path either to the directory that was specified as the checkpoint directory, or - * to the checkpoint file 'graph' or 'graph.bk'. + * @param path Path to the directory that was specified as the checkpoint directory + * @param hadoopConf Optional, configuration object if necessary for reading from + * HDFS compatible filesystems */ - def this(path: String) = this(null, CheckpointReader.read(path), null) + def this(path: String, hadoopConf: Configuration = new Configuration) = + this(null, CheckpointReader.read(path, hadoopConf).get, null) initLogging() @@ -170,8 +173,9 @@ class StreamingContext private ( /** * Set the context to periodically checkpoint the DStream operations for master - * fault-tolerance. The graph will be checkpointed every batch interval. - * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored + * fault-tolerance. + * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored. + * Note that this must be a fault-tolerant file system like HDFS for */ def checkpoint(directory: String) { if (directory != null) { @@ -577,6 +581,10 @@ class StreamingContext private ( } } +/** + * StreamingContext object contains a number of utility functions related to the + * StreamingContext class. + */ object StreamingContext extends Logging { @@ -584,19 +592,45 @@ object StreamingContext extends Logging { new PairDStreamFunctions[K, V](stream) } + /** + * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. + * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be + * recreated from the checkpoint data. If the data does not exist, then the StreamingContext + * will be created by called the provided `creatingFunc`. + * + * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program + * @param creatingFunc Function to create a new StreamingContext + * @param hadoopConf Optional Hadoop configuration if necessary for reading from the + * file system + * @param createOnError Optional, whether to create a new StreamingContext if there is an + * error in reading checkpoint data. By default, an exception will be + * thrown on error. 
+ */ def getOrCreate( checkpointPath: String, creatingFunc: () => StreamingContext, - createOnCheckpointError: Boolean = false + hadoopConf: Configuration = new Configuration(), + createOnError: Boolean = false ): StreamingContext = { - if (CheckpointReader.doesCheckpointExist(checkpointPath)) { - logInfo("Creating streaming context from checkpoint file") - new StreamingContext(checkpointPath) - } else { - logInfo("Creating new streaming context") - val ssc = creatingFunc() - ssc.checkpoint(checkpointPath) - ssc + + try { + CheckpointReader.read(checkpointPath, hadoopConf) match { + case Some(checkpoint) => + return new StreamingContext(null, checkpoint, null) + case None => + logInfo("Creating new StreamingContext") + return creatingFunc() + } + } catch { + case e: Exception => + if (createOnError) { + logWarning("Error reading checkpoint", e) + logInfo("Creating new StreamingContext") + return creatingFunc() + } else { + logError("Error reading checkpoint", e) + throw e + } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index aad0d931e7..f38d145317 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -40,6 +40,7 @@ import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD} import org.apache.spark.streaming._ import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.scheduler.StreamingListener +import org.apache.hadoop.conf.Configuration /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -125,10 +126,16 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** * Re-creates a StreamingContext from a checkpoint file. - * @param path Path either to the directory that was specified as the checkpoint directory, or - * to the checkpoint file 'graph' or 'graph.bk'. + * @param path Path to the directory that was specified as the checkpoint directory */ - def this(path: String) = this (new StreamingContext(path)) + def this(path: String) = this(new StreamingContext(path)) + + /** + * Re-creates a StreamingContext from a checkpoint file. + * @param path Path to the directory that was specified as the checkpoint directory + * + */ + def this(path: String, hadoopConf: Configuration) = this(new StreamingContext(path, hadoopConf)) /** The underlying SparkContext */ val sc: JavaSparkContext = new JavaSparkContext(ssc.sc) @@ -699,13 +706,92 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Starts the execution of the streams. + * Start the execution of the streams. */ def start() = ssc.start() /** - * Sstops the execution of the streams. + * Stop the execution of the streams. */ def stop() = ssc.stop() +} + +/** + * JavaStreamingContext object contains a number of static utility functions. + */ +object JavaStreamingContext { + + /** + * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. + * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be + * recreated from the checkpoint data. If the data does not exist, then the StreamingContext + * will be created by called the provided `creatingFunc`. 
+ * + * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program + * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext + */ + def getOrCreate( + checkpointPath: String, + factory: JavaStreamingContextFactory + ): JavaStreamingContext = { + val ssc = StreamingContext.getOrCreate(checkpointPath, () => { + factory.create.ssc + }) + new JavaStreamingContext(ssc) + } + + /** + * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. + * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be + * recreated from the checkpoint data. If the data does not exist, then the StreamingContext + * will be created by called the provided `creatingFunc`. + * + * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program + * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext + * @param hadoopConf Hadoop configuration if necessary for reading from any HDFS compatible + * file system + */ + def getOrCreate( + checkpointPath: String, + hadoopConf: Configuration, + factory: JavaStreamingContextFactory + ): JavaStreamingContext = { + val ssc = StreamingContext.getOrCreate(checkpointPath, () => { + factory.create.ssc + }, hadoopConf) + new JavaStreamingContext(ssc) + } + + /** + * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. + * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be + * recreated from the checkpoint data. If the data does not exist, then the StreamingContext + * will be created by called the provided `creatingFunc`. + * + * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program + * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext + * @param hadoopConf Hadoop configuration if necessary for reading from any HDFS compatible + * file system + * @param createOnError Whether to create a new JavaStreamingContext if there is an + * error in reading checkpoint data. + */ + def getOrCreate( + checkpointPath: String, + hadoopConf: Configuration, + factory: JavaStreamingContextFactory, + createOnError: Boolean + ): JavaStreamingContext = { + val ssc = StreamingContext.getOrCreate(checkpointPath, () => { + factory.create.ssc + }, hadoopConf, createOnError) + new JavaStreamingContext(ssc) + } +} + +/** + * Factory interface for creating a new JavaStreamingContext + */ +trait JavaStreamingContextFactory { + def create(): JavaStreamingContext } -- cgit v1.2.3 From 4a5558ca9921ce89b3996e9ead13b07123fc7a2d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 10 Jan 2014 03:28:39 +0000 Subject: Fixed bugs in reading of checkpoints. 
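Note on the fix below: it has getOrCreate first attempt to read the checkpoint and fall back to the creation function only when nothing was recovered, optionally swallowing a read error when createOnError is set. The following is a minimal, generic sketch of that control flow only; the names (ReadOrCreateSketch, load, create) are illustrative and are not Spark API.

object ReadOrCreateSketch {
  // Try to load saved state, optionally tolerate a read error, and call the
  // creation function only when nothing was recovered.
  def getOrCreate[A](load: () => Option[A], create: () => A, createOnError: Boolean): A = {
    val recovered =
      try { load() }
      catch {
        case e: Exception =>
          if (createOnError) None  // ignore the unreadable checkpoint and build fresh state
          else throw e             // default: surface the error to the caller
      }
    recovered.getOrElse(create())
  }
}
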
--- .../org/apache/spark/streaming/Checkpoint.scala | 20 ++++++++++++++++---- .../apache/spark/streaming/StreamingContext.scala | 17 ++++------------- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 476ae70bc9..d268b68f90 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -165,16 +165,28 @@ object CheckpointReader extends Logging { def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] = { val checkpointPath = new Path(checkpointDir) def fs = checkpointPath.getFileSystem(hadoopConf) - val existingFiles = graphFileNames.map(new Path(checkpointPath, _)).filter(fs.exists) + + // See if the checkpoint directory exists + if (!fs.exists(checkpointPath)) { + logInfo("Could not load checkpoint as path '" + checkpointPath + "' does not exist") + return None + } - // Log the file listing if graph checkpoint file was not found + // Try to find the checkpoint data + val existingFiles = graphFileNames.map(new Path(checkpointPath, _)).filter(fs.exists) if (existingFiles.isEmpty) { - logInfo("Could not find graph file in " + checkpointDir + ", which contains the files:\n" + - fs.listStatus(checkpointPath).mkString("\n")) + logInfo("Could not load checkpoint as checkpoint data was not " + + "found in directory " + checkpointDir + "") + val statuses = fs.listStatus(checkpointPath) + if (statuses!=null) { + logInfo("Checkpoint directory " + checkpointDir + " contains the files:\n" + + statuses.mkString("\n")) + } return None } logInfo("Checkpoint files found: " + existingFiles.mkString(",")) + // Try to read the checkpoint data val compressionCodec = CompressionCodec.createCodec(conf) existingFiles.foreach(file => { logInfo("Attempting to load checkpoint from file '" + file + "'") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 76be81603c..dd34f6f4f2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -496,26 +496,17 @@ object StreamingContext extends Logging { hadoopConf: Configuration = new Configuration(), createOnError: Boolean = false ): StreamingContext = { - - try { - CheckpointReader.read(checkpointPath, new SparkConf(), hadoopConf) match { - case Some(checkpoint) => - return new StreamingContext(null, checkpoint, null) - case None => - logInfo("Creating new StreamingContext") - return creatingFunc() - } + val checkpointOption = try { + CheckpointReader.read(checkpointPath, new SparkConf(), hadoopConf) } catch { case e: Exception => if (createOnError) { - logWarning("Error reading checkpoint", e) - logInfo("Creating new StreamingContext") - return creatingFunc() + None } else { - logError("Error reading checkpoint", e) throw e } } + checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc()) } /** -- cgit v1.2.3 From 9d3d9c8251724712590f3178e69e78ea0b750e9c Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 10 Jan 2014 11:44:02 +0000 Subject: Refactored graph checkpoint file reading and writing code to make it cleaner and easily debuggable. 
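Note on the refactor below: it replaces the single graph/graph.bk pair with per-batch files named checkpoint-&lt;time&gt; plus a ".bk" backup, which are listed and sorted oldest-first so the writer can prune old files and the reader can try the newest checkpoint first. The following trimmed, standalone sketch mirrors the REGEX and sort predicate introduced in the patch; the object name and sample file names are made up for illustration.

object CheckpointFileOrderSketch {
  val PREFIX = "checkpoint-"
  val REGEX = (PREFIX + """([\d]+)([\w\.]*)""").r

  // Oldest first: earlier checkpoint times sort first; for the same time, the ".bk"
  // backup sorts before the freshly written file. Assumes names already match REGEX.
  def sortFunc(name1: String, name2: String): Boolean = {
    val (time1, bk1) = name1 match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
    val (time2, bk2) = name2 match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
    (time1 < time2) || (time1 == time2 && bk1)
  }

  def main(args: Array[String]) {
    val names = Seq("checkpoint-2000", "checkpoint-1000", "checkpoint-2000.bk", "checkpoint-1000.bk")
    // Prints: checkpoint-1000.bk, checkpoint-1000, checkpoint-2000.bk, checkpoint-2000
    println(names.sortWith(sortFunc).mkString(", "))
    // CheckpointReader reverses this ordering so the newest checkpoint is tried first.
  }
}
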
--- .../org/apache/spark/streaming/Checkpoint.scala | 150 ++++++++++++++------- .../spark/streaming/dstream/FileInputDStream.scala | 1 + 2 files changed, 102 insertions(+), 49 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index d268b68f90..7366d8a7a4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -53,6 +53,55 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) } } +private[streaming] +object Checkpoint extends Logging { + val PREFIX = "checkpoint-" + val REGEX = (PREFIX + """([\d]+)([\w\.]*)""").r + + /** Get the checkpoint file for the given checkpoint time */ + def checkpointFile(checkpointDir: String, checkpointTime: Time) = { + new Path(checkpointDir, PREFIX + checkpointTime.milliseconds) + } + + /** Get the checkpoint backup file for the given checkpoint time */ + def checkpointBackupFile(checkpointDir: String, checkpointTime: Time) = { + new Path(checkpointDir, PREFIX + checkpointTime.milliseconds + ".bk") + } + + /** Get checkpoint files present in the give directory, ordered by oldest-first */ + def getCheckpointFiles(checkpointDir: String, fs: FileSystem): Seq[Path] = { + def sortFunc(path1: Path, path2: Path): Boolean = { + val (time1, bk1) = path1.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) } + val (time2, bk2) = path2.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) } + logInfo("Path 1: " + path1 + " -> " + time1 + ", " + bk1) + logInfo("Path 2: " + path2 + " -> " + time2 + ", " + bk2) + val precede = (time1 < time2) || (time1 == time2 && bk1) + logInfo(precede.toString) + precede + } + + val path = new Path(checkpointDir) + if (fs.exists(path)) { + val statuses = fs.listStatus(path) + if (statuses != null) { + val paths = statuses.map(_.getPath) + logInfo("Paths = " + paths.map(_.getName).mkString(", ")) + val filtered = paths.filter(p => REGEX.findFirstIn(p.toString).nonEmpty) + logInfo("Filtered paths = " + filtered.map(_.getName).mkString(", ")) + val sorted = filtered.sortWith(sortFunc) + logInfo("Sorted paths = " + sorted.map(_.getName).mkString(", ")) + sorted + } else { + logWarning("Listing " + path + " returned null") + Seq.empty + } + } else { + logInfo("Checkpoint directory " + path + " does not exist") + Seq.empty + } + } +} + /** * Convenience class to handle the writing of graph checkpoint to file @@ -60,14 +109,13 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) private[streaming] class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDir: String, hadoopConf: Configuration) extends Logging { - val file = new Path(checkpointDir, "graph") val MAX_ATTEMPTS = 3 val executor = Executors.newFixedThreadPool(1) val compressionCodec = CompressionCodec.createCodec(conf) // The file to which we actually write - and then "move" to file - val writeFile = new Path(file.getParent, file.getName + ".next") + // val writeFile = new Path(file.getParent, file.getName + ".next") // The file to which existing checkpoint is backed up (i.e. 
"moved") - val bakFile = new Path(file.getParent, file.getName + ".bk") + // val bakFile = new Path(file.getParent, file.getName + ".bk") private var stopped = false private var fs_ : FileSystem = _ @@ -78,40 +126,57 @@ class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDi def run() { var attempts = 0 val startTime = System.currentTimeMillis() + val tempFile = new Path(checkpointDir, "temp") + val checkpointFile = Checkpoint.checkpointFile(checkpointDir, checkpointTime) + val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, checkpointTime) + while (attempts < MAX_ATTEMPTS && !stopped) { attempts += 1 try { - logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'") - // This is inherently thread unsafe, so alleviating it by writing to '.next' and - // then moving it to the final file - val fos = fs.create(writeFile) + logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + checkpointFile + "'") + + // Write checkpoint to temp file + fs.delete(tempFile, true) // just in case it exists + val fos = fs.create(tempFile) fos.write(bytes) fos.close() - // Back up existing checkpoint if it exists - if (fs.exists(file) && fs.rename(file, bakFile)) { - logDebug("Moved existing checkpoint file to " + bakFile) + // If the checkpoint file exists, back it up + // If the backup exists as well, just delete it, otherwise rename will fail + if (fs.exists(checkpointFile)) { + fs.delete(backupFile, true) // just in case it exists + if (!fs.rename(checkpointFile, backupFile)) { + logWarning("Could not rename " + checkpointFile + " to " + backupFile) + } + } + + // Rename temp file to the final checkpoint file + if (!fs.rename(tempFile, checkpointFile)) { + logWarning("Could not rename " + tempFile + " to " + checkpointFile) } - fs.delete(file, false) // paranoia - - // Rename temp written file to the right location - if (fs.rename(writeFile, file)) { - val finishTime = System.currentTimeMillis() - logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file + - "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms") - jobGenerator.onCheckpointCompletion(checkpointTime) - } else { - throw new SparkException("Failed to rename checkpoint file from " - + writeFile + " to " + file) + + // Delete old checkpoint files + val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs) + if (allCheckpointFiles.size > 4) { + allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach(file => { + logInfo("Deleting " + file) + fs.delete(file, true) + }) } + + // All done, print success + val finishTime = System.currentTimeMillis() + logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + checkpointFile + + "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms") + jobGenerator.onCheckpointCompletion(checkpointTime) return } catch { case ioe: IOException => - logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe) + logWarning("Error in attempt " + attempts + " of writing checkpoint to " + checkpointFile, ioe) reset() } } - logError("Could not write checkpoint for time " + checkpointTime + " to file '" + file + "'") + logWarning("Could not write checkpoint for time " + checkpointTime + " to file " + checkpointFile + "'") } } @@ -147,7 +212,7 @@ class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDi } private def fs = synchronized { - if (fs_ == null) fs_ = file.getFileSystem(hadoopConf) + if (fs_ == null) fs_ = 
new Path(checkpointDir).getFileSystem(hadoopConf) fs_ } @@ -160,36 +225,21 @@ class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDi private[streaming] object CheckpointReader extends Logging { - private val graphFileNames = Seq("graph", "graph.bk") - def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] = { val checkpointPath = new Path(checkpointDir) def fs = checkpointPath.getFileSystem(hadoopConf) - // See if the checkpoint directory exists - if (!fs.exists(checkpointPath)) { - logInfo("Could not load checkpoint as path '" + checkpointPath + "' does not exist") + // Try to find the checkpoint files + val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs).reverse + if (checkpointFiles.isEmpty) { return None } - // Try to find the checkpoint data - val existingFiles = graphFileNames.map(new Path(checkpointPath, _)).filter(fs.exists) - if (existingFiles.isEmpty) { - logInfo("Could not load checkpoint as checkpoint data was not " + - "found in directory " + checkpointDir + "") - val statuses = fs.listStatus(checkpointPath) - if (statuses!=null) { - logInfo("Checkpoint directory " + checkpointDir + " contains the files:\n" + - statuses.mkString("\n")) - } - return None - } - logInfo("Checkpoint files found: " + existingFiles.mkString(",")) - - // Try to read the checkpoint data + // Try to read the checkpoint files in the order + logInfo("Checkpoint files found: " + checkpointFiles.mkString(",")) val compressionCodec = CompressionCodec.createCodec(conf) - existingFiles.foreach(file => { - logInfo("Attempting to load checkpoint from file '" + file + "'") + checkpointFiles.foreach(file => { + logInfo("Attempting to load checkpoint from file " + file) try { val fis = fs.open(file) // ObjectInputStream uses the last defined user-defined class loader in the stack @@ -204,15 +254,17 @@ object CheckpointReader extends Logging { ois.close() fs.close() cp.validate() - logInfo("Checkpoint successfully loaded from file '" + file + "'") + logInfo("Checkpoint successfully loaded from file " + file) logInfo("Checkpoint was generated at time " + cp.checkpointTime) return Some(cp) } catch { case e: Exception => - logWarning("Error reading checkpoint from file '" + file + "'", e) + logWarning("Error reading checkpoint from file " + file, e) } }) - throw new SparkException("Failed to read checkpoint from directory '" + checkpointDir + "'") + + // If none of checkpoint files could be read, then throw exception + throw new SparkException("Failed to read checkpoint from directory " + checkpointPath) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 4585e3f6bd..a79fe523a6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -113,6 +113,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString) val timeTaken = System.currentTimeMillis - lastNewFileFindingTime logInfo("Finding new files took " + timeTaken + " ms") + logDebug("# cached file times = " + fileModTimes.size) if (timeTaken > slideDuration.milliseconds) { logWarning( "Time taken to find new files exceeds the batch size. 
" + -- cgit v1.2.3 From 4f609f79015732a91a83c5625d357c4edfc7c962 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 10 Jan 2014 12:58:07 +0000 Subject: Removed spark.hostPort and other setting from SparkConf before saving to checkpoint. --- .../org/apache/spark/streaming/Checkpoint.scala | 24 ++++++---------------- 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 7366d8a7a4..62b225382e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -43,6 +43,9 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) val pendingTimes = ssc.scheduler.getPendingTimes() val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf) val sparkConf = ssc.conf + + // do not save these configurations + sparkConf.remove("spark.hostPort").remove("spark.driver.host").remove("spark.driver.port") def validate() { assert(master != null, "Checkpoint.master is null") @@ -73,11 +76,7 @@ object Checkpoint extends Logging { def sortFunc(path1: Path, path2: Path): Boolean = { val (time1, bk1) = path1.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) } val (time2, bk2) = path2.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) } - logInfo("Path 1: " + path1 + " -> " + time1 + ", " + bk1) - logInfo("Path 2: " + path2 + " -> " + time2 + ", " + bk2) - val precede = (time1 < time2) || (time1 == time2 && bk1) - logInfo(precede.toString) - precede + (time1 < time2) || (time1 == time2 && bk1) } val path = new Path(checkpointDir) @@ -85,12 +84,8 @@ object Checkpoint extends Logging { val statuses = fs.listStatus(path) if (statuses != null) { val paths = statuses.map(_.getPath) - logInfo("Paths = " + paths.map(_.getName).mkString(", ")) val filtered = paths.filter(p => REGEX.findFirstIn(p.toString).nonEmpty) - logInfo("Filtered paths = " + filtered.map(_.getName).mkString(", ")) - val sorted = filtered.sortWith(sortFunc) - logInfo("Sorted paths = " + sorted.map(_.getName).mkString(", ")) - sorted + filtered.sortWith(sortFunc) } else { logWarning("Listing " + path + " returned null") Seq.empty @@ -112,16 +107,9 @@ class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDi val MAX_ATTEMPTS = 3 val executor = Executors.newFixedThreadPool(1) val compressionCodec = CompressionCodec.createCodec(conf) - // The file to which we actually write - and then "move" to file - // val writeFile = new Path(file.getParent, file.getName + ".next") - // The file to which existing checkpoint is backed up (i.e. "moved") - // val bakFile = new Path(file.getParent, file.getName + ".bk") - private var stopped = false private var fs_ : FileSystem = _ - // Removed code which validates whether there is only one CheckpointWriter per path 'file' since - // I did not notice any errors - reintroduce it ? 
class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable { def run() { var attempts = 0 @@ -189,7 +177,7 @@ class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDi bos.close() try { executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray)) - logInfo("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue") + logDebug("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue") } catch { case rej: RejectedExecutionException => logError("Could not submit checkpoint task to the thread pool executor", rej) -- cgit v1.2.3 From 740730a17901f914d0e9d470b8f40e30be33a9bb Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 10 Jan 2014 05:06:15 -0800 Subject: Fixed conf/slaves and updated docs. --- conf/slaves | 3 ++- .../org/apache/spark/util/TimeStampedHashMap.scala | 9 ++++++--- .../apache/spark/streaming/DStreamCheckpointData.scala | 17 +++++++++++++---- .../spark/streaming/dstream/FileInputDStream.scala | 3 ++- 4 files changed, 23 insertions(+), 9 deletions(-) diff --git a/conf/slaves b/conf/slaves index 2fbb50c4a8..da0a01343d 100644 --- a/conf/slaves +++ b/conf/slaves @@ -1 +1,2 @@ -localhost +# A Spark Worker will be started on each of the machines listed below. +localhost \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala index 9ce4ef744e..dde504fc52 100644 --- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala @@ -26,9 +26,12 @@ import org.apache.spark.Logging /** * This is a custom implementation of scala.collection.mutable.Map which stores the insertion - * time stamp along with each key-value pair. Key-value pairs that are older than a particular - * threshold time can them be removed using the clearOldValues method. This is intended to be a drop-in - * replacement of scala.collection.mutable.HashMap. + * timestamp along with each key-value pair. If specified, the timestamp of each pair can be + * updated every it is accessed. Key-value pairs whose timestamp are older than a particular + * threshold time can them be removed using the clearOldValues method. This is intended to + * be a drop-in replacement of scala.collection.mutable.HashMap. 
+ * @param updateTimeStampOnGet When enabled, the timestamp of a pair will be + * updated when it is accessed */ class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = false) extends Map[A, B]() with Logging { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala index 1589bc19a2..671f7bbce7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala @@ -34,8 +34,10 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) // Mapping of the batch time to the checkpointed RDD file of that time @transient private var timeToCheckpointFile = new HashMap[Time, String] - // Mapping of the batch time to the time of the oldest checkpointed RDD in that batch's checkpoint data + // Mapping of the batch time to the time of the oldest checkpointed RDD + // in that batch's checkpoint data @transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time] + @transient private var fileSystem : FileSystem = null protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]] @@ -55,19 +57,26 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) if (!checkpointFiles.isEmpty) { currentCheckpointFiles.clear() currentCheckpointFiles ++= checkpointFiles + // Add the current checkpoint files to the map of all checkpoint files + // This will be used to delete old checkpoint files timeToCheckpointFile ++= currentCheckpointFiles + // Remember the time of the oldest checkpoint RDD in current state timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering) } } /** - * Cleanup old checkpoint data. This gets called every time the graph - * checkpoint is initiated, but after `update` is called. Default - * implementation, cleans up old checkpoint files. + * Cleanup old checkpoint data. This gets called after a checkpoint of `time` has been + * written to the checkpoint directory. */ def cleanup(time: Time) { + // Get the time of the oldest checkpointed RDD that was written as part of the + // checkpoint of `time` timeToOldestCheckpointFileTime.remove(time) match { case Some(lastCheckpointFileTime) => + // Find all the checkpointed RDDs (i.e. 
files) that are older than `lastCheckpointFileTime` + // This is because checkpointed RDDs older than this are not going to be needed + // even after master fails, as the checkpoint data of `time` does not refer to those files val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime) logDebug("Files to delete:\n" + filesToDelete.mkString(",")) filesToDelete.foreach { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 4585e3f6bd..38aa119239 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -98,7 +98,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas (time - rememberDuration) + ": " + oldFiles.keys.mkString(", ")) logDebug("Cleared files are:\n" + oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n")) - // Delete file times that weren't accessed in the last round of getting new files + // Delete file mod times that weren't accessed in the last round of getting new files fileModTimes.clearOldValues(lastNewFileFindingTime - 1) } @@ -147,6 +147,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas } private def getFileModTime(path: Path) = { + // Get file mod time from cache or fetch it from the file system fileModTimes.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime()) } -- cgit v1.2.3 From e4bb845238d0df48f8258e925caf9af5a107af46 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 10 Jan 2014 12:17:09 -0800 Subject: Updated docs based on Patrick's comments in PR 383. --- .../org/apache/spark/util/TimeStampedHashMap.scala | 4 +- .../streaming/examples/NetworkWordCount.scala | 3 +- .../examples/RecoverableNetworkWordCount.scala | 49 +++++++++++++++++----- .../org/apache/spark/streaming/Checkpoint.scala | 13 ++++-- .../streaming/api/java/JavaStreamingContext.scala | 14 +++---- 5 files changed, 58 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala index dde504fc52..8e07a0f29a 100644 --- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala @@ -27,8 +27,8 @@ import org.apache.spark.Logging /** * This is a custom implementation of scala.collection.mutable.Map which stores the insertion * timestamp along with each key-value pair. If specified, the timestamp of each pair can be - * updated every it is accessed. Key-value pairs whose timestamp are older than a particular - * threshold time can them be removed using the clearOldValues method. This is intended to + * updated every time it is accessed. Key-value pairs whose timestamp are older than a particular + * threshold time can then be removed using the clearOldValues method. This is intended to * be a drop-in replacement of scala.collection.mutable.HashMap. 
* @param updateTimeStampOnGet When enabled, the timestamp of a pair will be * updated when it is accessed diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala index aba1704825..4b896eaccb 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala @@ -21,7 +21,8 @@ import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ /** - * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. + * Counts words in text encoded with UTF8 received from the network every second. + * * Usage: NetworkWordCount * is the Spark master URL. In local mode, should be 'local[n]' with n > 1. * and describe the TCP server that Spark Streaming would connect to receive data. diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala index 739f805e87..d51e6e9418 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala @@ -26,18 +26,41 @@ import com.google.common.io.Files import java.nio.charset.Charset /** - * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. - * Usage: NetworkWordCount + * Counts words in text encoded with UTF8 received from the network every second. + * + * Usage: NetworkWordCount * is the Spark master URL. In local mode, should be 'local[n]' with n > 1. * and describe the TCP server that Spark Streaming would connect to receive data. - * directory in a Hadoop compatible file system to which checkpoint - * data will be saved to; this must be a fault-tolerant file system - * like HDFS for the system to recover from driver failures - * directory to HDFS-compatible file system which checkpoint data + * file to which the word counts will be appended + * + * In local mode, should be 'local[n]' with n > 1 + * and must be absolute paths + * + * * To run this on your local machine, you need to first run a Netcat server - * `$ nc -lk 9999` - * and then run the example - * `$ ./run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999` + * + * `$ nc -lk 9999` + * + * and run the example as + * + * `$ ./run-example org.apache.spark.streaming.examples.RecoverableNetworkWordCount \ + * local[2] localhost 9999 ~/checkpoint/ ~/out` + * + * If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create + * a new StreamingContext (will print "Creating new context" to the console). Otherwise, if + * checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from + * the checkpoint data. + * + * To run this example in a local standalone cluster with automatic driver recovery, + * + * `$ ./spark-class org.apache.spark.deploy.Client -s launch \ + * org.apache.spark.streaming.examples.RecoverableNetworkWordCount \ + * localhost 9999 ~/checkpoint ~/out` + * + * would typically be /examples/target/scala-XX/spark-examples....jar + * + * Refer to the online documentation for more details. 
*/ object RecoverableNetworkWordCount { @@ -52,7 +75,7 @@ object RecoverableNetworkWordCount { // Create the context with a 1 second batch size val ssc = new StreamingContext(master, "RecoverableNetworkWordCount", Seconds(1), - System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) + System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass)) // Create a NetworkInputDStream on target ip:port and count the // words in input stream of \n delimited text (eg. generated by 'nc') @@ -74,9 +97,13 @@ object RecoverableNetworkWordCount { System.err.println( """ |Usage: RecoverableNetworkWordCount + | is the Spark master URL. In local mode, should be 'local[n]' with n > 1. + | and describe the TCP server that Spark Streaming would connect to receive data. + | directory to HDFS-compatible file system which checkpoint data + | file to which the word counts will be appended | |In local mode, should be 'local[n]' with n > 1 - |Both and should be full paths + |Both and must be absolute paths """.stripMargin ) System.exit(1) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 62b225382e..1249ef4c3d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -43,8 +43,9 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) val pendingTimes = ssc.scheduler.getPendingTimes() val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf) val sparkConf = ssc.conf - - // do not save these configurations + + // These should be unset when a checkpoint is deserialized, + // otherwise the SparkContext won't initialize correctly. sparkConf.remove("spark.hostPort").remove("spark.driver.host").remove("spark.driver.port") def validate() { @@ -102,8 +103,12 @@ object Checkpoint extends Logging { * Convenience class to handle the writing of graph checkpoint to file */ private[streaming] -class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDir: String, hadoopConf: Configuration) - extends Logging { +class CheckpointWriter( + jobGenerator: JobGenerator, + conf: SparkConf, + checkpointDir: String, + hadoopConf: Configuration + ) extends Logging { val MAX_ATTEMPTS = 3 val executor = Executors.newFixedThreadPool(1) val compressionCodec = CompressionCodec.createCodec(conf) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index d96e9ac7b7..523173d45a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -489,15 +489,15 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * JavaStreamingContext object contains a number of static utility functions. + * JavaStreamingContext object contains a number of utility functions. */ object JavaStreamingContext { /** * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be - * recreated from the checkpoint data. If the data does not exist, then the StreamingContext - * will be created by called the provided `creatingFunc`. + * recreated from the checkpoint data. 
If the data does not exist, then the provided factory + * will be used to create a JavaStreamingContext. * * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext @@ -515,8 +515,8 @@ object JavaStreamingContext { /** * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be - * recreated from the checkpoint data. If the data does not exist, then the StreamingContext - * will be created by called the provided `creatingFunc`. + * recreated from the checkpoint data. If the data does not exist, then the provided factory + * will be used to create a JavaStreamingContext. * * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext @@ -537,8 +537,8 @@ object JavaStreamingContext { /** * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be - * recreated from the checkpoint data. If the data does not exist, then the StreamingContext - * will be created by called the provided `creatingFunc`. + * recreated from the checkpoint data. If the data does not exist, then the provided factory + * will be used to create a JavaStreamingContext. * * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext -- cgit v1.2.3 From 82f07deeda89be2ad34e39ce83ac624c73b8d6e1 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 10 Jan 2014 15:37:05 -0800 Subject: Modified streaming.FailureSuite tests to test StreamingContext.getOrCreate. 
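Note on the test changes below: they drive StreamingContext.getOrCreate directly. The first call must run the stream setup (tracked with a setupCalled flag), while recovery after a simulated driver failure must come from the checkpoint, so the creation function passed on restart simply throws. The following is a simplified sketch of that pattern; createFreshContext is an illustrative stand-in for the test's real stream setup, and the object name is made up.

import org.apache.spark.streaming.{Seconds, StreamingContext}

object GetOrCreateTestPatternSketch {
  @volatile var setupCalled = false

  // Illustrative stand-in for the test's stream setup; a real test would also
  // register the DStream operations under test before returning the context.
  def createFreshContext(checkpointDir: String): StreamingContext = {
    setupCalled = true
    val ssc = new StreamingContext("local[4]", "GetOrCreateTestPatternSketch", Seconds(1), null, Nil, Map())
    ssc.checkpoint(checkpointDir)
    ssc
  }

  // First start: no checkpoint exists yet, so the creation function must run.
  def firstStart(checkpointDir: String): StreamingContext = {
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createFreshContext(checkpointDir))
    assert(setupCalled, "Setup was not called on the first getOrCreate")
    ssc
  }

  // Restart after a simulated driver failure: recovery must come from the
  // checkpoint, so reaching the creation function is itself a test failure.
  def restartAfterFailure(checkpointDir: String): StreamingContext =
    StreamingContext.getOrCreate(checkpointDir, () => {
      throw new Exception("Expected to recover from checkpoint, not create a new context")
    })
}
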
--- .../spark/streaming/util/MasterFailureTest.scala | 55 +++++++++++++--------- 1 file changed, 34 insertions(+), 21 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala index 1559f7a9f7..162b19d7f0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala @@ -42,6 +42,7 @@ object MasterFailureTest extends Logging { @volatile var killed = false @volatile var killCount = 0 + @volatile var setupCalled = false def main(args: Array[String]) { if (args.size < 2) { @@ -131,8 +132,26 @@ object MasterFailureTest extends Logging { // Just making sure that the expected output does not have duplicates assert(expectedOutput.distinct.toSet == expectedOutput.toSet) + // Reset all state + reset() + + // Create the directories for this test + val uuid = UUID.randomUUID().toString + val rootDir = new Path(directory, uuid) + val fs = rootDir.getFileSystem(new Configuration()) + val checkpointDir = new Path(rootDir, "checkpoint") + val testDir = new Path(rootDir, "test") + fs.mkdirs(checkpointDir) + fs.mkdirs(testDir) + // Setup the stream computation with the given operation - val (ssc, checkpointDir, testDir) = setupStreams(directory, batchDuration, operation) + val ssc = StreamingContext.getOrCreate(checkpointDir.toString, () => { + setupStreams(batchDuration, operation, checkpointDir, testDir) + }) + + // Check if setupStream was called to create StreamingContext + // (and not created from checkpoint file) + assert(setupCalled, "Setup was not called in the first call to StreamingContext.getOrCreate") // Start generating files in the a different thread val fileGeneratingThread = new FileGeneratingThread(input, testDir, batchDuration.milliseconds) @@ -144,9 +163,7 @@ object MasterFailureTest extends Logging { val maxTimeToRun = expectedOutput.size * batchDuration.milliseconds * 2 val mergedOutput = runStreams(ssc, lastExpectedOutput, maxTimeToRun) - // Delete directories fileGeneratingThread.join() - val fs = checkpointDir.getFileSystem(new Configuration()) fs.delete(checkpointDir, true) fs.delete(testDir, true) logInfo("Finished test after " + killCount + " failures") @@ -159,32 +176,24 @@ object MasterFailureTest extends Logging { * files should be written for testing. 
*/ private def setupStreams[T: ClassTag]( - directory: String, batchDuration: Duration, - operation: DStream[String] => DStream[T] - ): (StreamingContext, Path, Path) = { - // Reset all state - reset() - - // Create the directories for this test - val uuid = UUID.randomUUID().toString - val rootDir = new Path(directory, uuid) - val fs = rootDir.getFileSystem(new Configuration()) - val checkpointDir = new Path(rootDir, "checkpoint") - val testDir = new Path(rootDir, "test") - fs.mkdirs(checkpointDir) - fs.mkdirs(testDir) + operation: DStream[String] => DStream[T], + checkpointDir: Path, + testDir: Path + ): StreamingContext = { + // Mark that setup was called + setupCalled = true // Setup the streaming computation with the given operation System.clearProperty("spark.driver.port") System.clearProperty("spark.hostPort") - var ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map()) + val ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map()) ssc.checkpoint(checkpointDir.toString) val inputStream = ssc.textFileStream(testDir.toString) val operatedStream = operation(inputStream) val outputStream = new TestOutputStream(operatedStream) ssc.registerOutputStream(outputStream) - (ssc, checkpointDir, testDir) + ssc } @@ -204,7 +213,7 @@ object MasterFailureTest extends Logging { var isTimedOut = false val mergedOutput = new ArrayBuffer[T]() val checkpointDir = ssc.checkpointDir - var batchDuration = ssc.graph.batchDuration + val batchDuration = ssc.graph.batchDuration while(!isLastOutputGenerated && !isTimedOut) { // Get the output buffer @@ -261,7 +270,10 @@ object MasterFailureTest extends Logging { ) Thread.sleep(sleepTime) // Recreate the streaming context from checkpoint - ssc = new StreamingContext(checkpointDir) + ssc = StreamingContext.getOrCreate(checkpointDir, () => { + throw new Exception("Trying to create new context when it " + + "should be reading from checkpoint file") + }) } } mergedOutput @@ -297,6 +309,7 @@ object MasterFailureTest extends Logging { private def reset() { killed = false killCount = 0 + setupCalled = false } } -- cgit v1.2.3
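
Taken together, the series leaves one recommended recovery pattern for drivers: put all stream setup in a single function, enable checkpointing to a fault-tolerant directory inside it, and let StreamingContext.getOrCreate decide whether to rebuild from the checkpoint or call that function. The following minimal end-to-end sketch uses the final signature; the master URL, application name, and directories are placeholders, and passing createOnError = true additionally falls back to the setup function if the checkpoint data cannot be read.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DriverRecoverySketch {
  def createContext(checkpointDir: String, inputDir: String): StreamingContext = {
    // All stream setup lives here so that it runs only when no checkpoint is found.
    val ssc = new StreamingContext("local[2]", "DriverRecoverySketch", Seconds(1), null, Nil, Map())
    ssc.checkpoint(checkpointDir)  // must be a fault-tolerant file system such as HDFS
    // Any DStream job would do; counting new lines in a monitored directory keeps it simple.
    ssc.textFileStream(inputDir).count().print()
    ssc
  }

  def main(args: Array[String]) {
    val Array(checkpointDir, inputDir) = args
    // Recover from the checkpoint if one exists; otherwise build a fresh context.
    val ssc = StreamingContext.getOrCreate(
      checkpointDir,
      () => createContext(checkpointDir, inputDir),
      new Configuration(),
      createOnError = true)
    ssc.start()
  }
}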