diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-10 05:06:22 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-10 05:06:22 -0800 |
commit | 2213a5a47fb2d73e3fdebec24356c69ea2968b81 (patch) | |
tree | 5dae7bbf739ae6d57bd95ffb145ab876b7165ef9 /streaming | |
parent | 740730a17901f914d0e9d470b8f40e30be33a9bb (diff) | |
parent | 4f609f79015732a91a83c5625d357c4edfc7c962 (diff) | |
download | spark-2213a5a47fb2d73e3fdebec24356c69ea2968b81.tar.gz spark-2213a5a47fb2d73e3fdebec24356c69ea2968b81.tar.bz2 spark-2213a5a47fb2d73e3fdebec24356c69ea2968b81.zip |
Merge branch 'driver-test' of github.com:tdas/incubator-spark into driver-test
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 150 | ||||
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala | 1 |
2 files changed, 96 insertions, 55 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index d268b68f90..62b225382e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -43,6 +43,9 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) val pendingTimes = ssc.scheduler.getPendingTimes() val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf) val sparkConf = ssc.conf + + // do not save these configurations + sparkConf.remove("spark.hostPort").remove("spark.driver.host").remove("spark.driver.port") def validate() { assert(master != null, "Checkpoint.master is null") @@ -53,6 +56,47 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) } } +private[streaming] +object Checkpoint extends Logging { + val PREFIX = "checkpoint-" + val REGEX = (PREFIX + """([\d]+)([\w\.]*)""").r + + /** Get the checkpoint file for the given checkpoint time */ + def checkpointFile(checkpointDir: String, checkpointTime: Time) = { + new Path(checkpointDir, PREFIX + checkpointTime.milliseconds) + } + + /** Get the checkpoint backup file for the given checkpoint time */ + def checkpointBackupFile(checkpointDir: String, checkpointTime: Time) = { + new Path(checkpointDir, PREFIX + checkpointTime.milliseconds + ".bk") + } + + /** Get checkpoint files present in the give directory, ordered by oldest-first */ + def getCheckpointFiles(checkpointDir: String, fs: FileSystem): Seq[Path] = { + def sortFunc(path1: Path, path2: Path): Boolean = { + val (time1, bk1) = path1.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) } + val (time2, bk2) = path2.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) } + (time1 < time2) || (time1 == time2 && bk1) + } + + val path = new Path(checkpointDir) + if (fs.exists(path)) { + val statuses = fs.listStatus(path) + if (statuses != null) { + val paths = statuses.map(_.getPath) + val filtered = paths.filter(p => REGEX.findFirstIn(p.toString).nonEmpty) + filtered.sortWith(sortFunc) + } else { + logWarning("Listing " + path + " returned null") + Seq.empty + } + } else { + logInfo("Checkpoint directory " + path + " does not exist") + Seq.empty + } + } +} + /** * Convenience class to handle the writing of graph checkpoint to file @@ -60,58 +104,67 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) private[streaming] class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDir: String, hadoopConf: Configuration) extends Logging { - val file = new Path(checkpointDir, "graph") val MAX_ATTEMPTS = 3 val executor = Executors.newFixedThreadPool(1) val compressionCodec = CompressionCodec.createCodec(conf) - // The file to which we actually write - and then "move" to file - val writeFile = new Path(file.getParent, file.getName + ".next") - // The file to which existing checkpoint is backed up (i.e. "moved") - val bakFile = new Path(file.getParent, file.getName + ".bk") - private var stopped = false private var fs_ : FileSystem = _ - // Removed code which validates whether there is only one CheckpointWriter per path 'file' since - // I did not notice any errors - reintroduce it ? class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable { def run() { var attempts = 0 val startTime = System.currentTimeMillis() + val tempFile = new Path(checkpointDir, "temp") + val checkpointFile = Checkpoint.checkpointFile(checkpointDir, checkpointTime) + val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, checkpointTime) + while (attempts < MAX_ATTEMPTS && !stopped) { attempts += 1 try { - logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'") - // This is inherently thread unsafe, so alleviating it by writing to '.next' and - // then moving it to the final file - val fos = fs.create(writeFile) + logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + checkpointFile + "'") + + // Write checkpoint to temp file + fs.delete(tempFile, true) // just in case it exists + val fos = fs.create(tempFile) fos.write(bytes) fos.close() - // Back up existing checkpoint if it exists - if (fs.exists(file) && fs.rename(file, bakFile)) { - logDebug("Moved existing checkpoint file to " + bakFile) + // If the checkpoint file exists, back it up + // If the backup exists as well, just delete it, otherwise rename will fail + if (fs.exists(checkpointFile)) { + fs.delete(backupFile, true) // just in case it exists + if (!fs.rename(checkpointFile, backupFile)) { + logWarning("Could not rename " + checkpointFile + " to " + backupFile) + } } - fs.delete(file, false) // paranoia - - // Rename temp written file to the right location - if (fs.rename(writeFile, file)) { - val finishTime = System.currentTimeMillis() - logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file + - "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms") - jobGenerator.onCheckpointCompletion(checkpointTime) - } else { - throw new SparkException("Failed to rename checkpoint file from " - + writeFile + " to " + file) + + // Rename temp file to the final checkpoint file + if (!fs.rename(tempFile, checkpointFile)) { + logWarning("Could not rename " + tempFile + " to " + checkpointFile) } + + // Delete old checkpoint files + val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs) + if (allCheckpointFiles.size > 4) { + allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach(file => { + logInfo("Deleting " + file) + fs.delete(file, true) + }) + } + + // All done, print success + val finishTime = System.currentTimeMillis() + logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + checkpointFile + + "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms") + jobGenerator.onCheckpointCompletion(checkpointTime) return } catch { case ioe: IOException => - logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe) + logWarning("Error in attempt " + attempts + " of writing checkpoint to " + checkpointFile, ioe) reset() } } - logError("Could not write checkpoint for time " + checkpointTime + " to file '" + file + "'") + logWarning("Could not write checkpoint for time " + checkpointTime + " to file " + checkpointFile + "'") } } @@ -124,7 +177,7 @@ class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDi bos.close() try { executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray)) - logInfo("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue") + logDebug("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue") } catch { case rej: RejectedExecutionException => logError("Could not submit checkpoint task to the thread pool executor", rej) @@ -147,7 +200,7 @@ class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDi } private def fs = synchronized { - if (fs_ == null) fs_ = file.getFileSystem(hadoopConf) + if (fs_ == null) fs_ = new Path(checkpointDir).getFileSystem(hadoopConf) fs_ } @@ -160,36 +213,21 @@ class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDi private[streaming] object CheckpointReader extends Logging { - private val graphFileNames = Seq("graph", "graph.bk") - def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] = { val checkpointPath = new Path(checkpointDir) def fs = checkpointPath.getFileSystem(hadoopConf) - // See if the checkpoint directory exists - if (!fs.exists(checkpointPath)) { - logInfo("Could not load checkpoint as path '" + checkpointPath + "' does not exist") - return None - } - - // Try to find the checkpoint data - val existingFiles = graphFileNames.map(new Path(checkpointPath, _)).filter(fs.exists) - if (existingFiles.isEmpty) { - logInfo("Could not load checkpoint as checkpoint data was not " + - "found in directory " + checkpointDir + "") - val statuses = fs.listStatus(checkpointPath) - if (statuses!=null) { - logInfo("Checkpoint directory " + checkpointDir + " contains the files:\n" + - statuses.mkString("\n")) - } + // Try to find the checkpoint files + val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs).reverse + if (checkpointFiles.isEmpty) { return None } - logInfo("Checkpoint files found: " + existingFiles.mkString(",")) - // Try to read the checkpoint data + // Try to read the checkpoint files in the order + logInfo("Checkpoint files found: " + checkpointFiles.mkString(",")) val compressionCodec = CompressionCodec.createCodec(conf) - existingFiles.foreach(file => { - logInfo("Attempting to load checkpoint from file '" + file + "'") + checkpointFiles.foreach(file => { + logInfo("Attempting to load checkpoint from file " + file) try { val fis = fs.open(file) // ObjectInputStream uses the last defined user-defined class loader in the stack @@ -204,15 +242,17 @@ object CheckpointReader extends Logging { ois.close() fs.close() cp.validate() - logInfo("Checkpoint successfully loaded from file '" + file + "'") + logInfo("Checkpoint successfully loaded from file " + file) logInfo("Checkpoint was generated at time " + cp.checkpointTime) return Some(cp) } catch { case e: Exception => - logWarning("Error reading checkpoint from file '" + file + "'", e) + logWarning("Error reading checkpoint from file " + file, e) } }) - throw new SparkException("Failed to read checkpoint from directory '" + checkpointDir + "'") + + // If none of checkpoint files could be read, then throw exception + throw new SparkException("Failed to read checkpoint from directory " + checkpointPath) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 38aa119239..1f0f31c4b1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -113,6 +113,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString) val timeTaken = System.currentTimeMillis - lastNewFileFindingTime logInfo("Finding new files took " + timeTaken + " ms") + logDebug("# cached file times = " + fileModTimes.size) if (timeTaken > slideDuration.milliseconds) { logWarning( "Time taken to find new files exceeds the batch size. " + |