diff options
14 files changed, 269 insertions, 196 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ad3337d94c..7514ce58fb 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -19,7 +19,7 @@ package org.apache.spark import java.io._ import java.net.URI -import java.util.Properties +import java.util.{UUID, Properties} import java.util.concurrent.atomic.AtomicInteger import scala.collection.Map @@ -855,22 +855,15 @@ class SparkContext( /** * Set the directory under which RDDs are going to be checkpointed. The directory must - * be a HDFS path if running on a cluster. If the directory does not exist, it will - * be created. If the directory exists and useExisting is set to true, then the - * exisiting directory will be used. Otherwise an exception will be thrown to - * prevent accidental overriding of checkpoint files in the existing directory. + * be a HDFS path if running on a cluster. */ - def setCheckpointDir(dir: String, useExisting: Boolean = false) { - val path = new Path(dir) - val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration()) - if (!useExisting) { - if (fs.exists(path)) { - throw new Exception("Checkpoint directory '" + path + "' already exists.") - } else { - fs.mkdirs(path) - } + def setCheckpointDir(directory: String) { + checkpointDir = Option(directory).map { dir => + val path = new Path(dir, UUID.randomUUID().toString) + val fs = path.getFileSystem(hadoopConfiguration) + fs.mkdirs(path) + fs.getFileStatus(path).getPath().toString } - checkpointDir = Some(dir) } /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). 
*/ diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index acf328aa6a..50f2021d01 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -381,20 +381,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork /** * Set the directory under which RDDs are going to be checkpointed. The directory must - * be a HDFS path if running on a cluster. If the directory does not exist, it will - * be created. If the directory exists and useExisting is set to true, then the - * exisiting directory will be used. Otherwise an exception will be thrown to - * prevent accidental overriding of checkpoint files in the existing directory. - */ - def setCheckpointDir(dir: String, useExisting: Boolean) { - sc.setCheckpointDir(dir, useExisting) - } - - /** - * Set the directory under which RDDs are going to be checkpointed. The directory must - * be a HDFS path if running on a cluster. If the directory does not exist, it will - * be created. If the directory exists, an exception will be thrown to prevent accidental - * overriding of checkpoint files. + * be a HDFS path if running on a cluster. 
*/ def setCheckpointDir(dir: String) { sc.setCheckpointDir(dir) diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala index a712ef1c27..293a7d1f68 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala @@ -18,12 +18,12 @@ package org.apache.spark.rdd import java.io.IOException - import scala.reflect.ClassTag - -import org.apache.hadoop.fs.Path import org.apache.spark._ +import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {} @@ -34,6 +34,8 @@ private[spark] class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String) extends RDD[T](sc, Nil) { + val broadcastedConf = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration)) + @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration) override def getPartitions: Array[Partition] = { @@ -65,7 +67,7 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String) override def compute(split: Partition, context: TaskContext): Iterator[T] = { val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index)) - CheckpointRDD.readFromFile(file, context) + CheckpointRDD.readFromFile(file, broadcastedConf, context) } override def checkpoint() { @@ -79,10 +81,14 @@ private[spark] object CheckpointRDD extends Logging { "part-%05d".format(splitId) } - def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) { + def writeToFile[T]( + path: String, + broadcastedConf: Broadcast[SerializableWritable[Configuration]], + blockSize: Int = -1 + )(ctx: TaskContext, iterator: Iterator[T]) { val env = SparkEnv.get val outputDir = new Path(path) - val fs = 
outputDir.getFileSystem(SparkHadoopUtil.get.newConfiguration()) + val fs = outputDir.getFileSystem(broadcastedConf.value.value) val finalOutputName = splitIdToFile(ctx.partitionId) val finalOutputPath = new Path(outputDir, finalOutputName) @@ -119,9 +125,13 @@ private[spark] object CheckpointRDD extends Logging { } } - def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = { + def readFromFile[T]( + path: Path, + broadcastedConf: Broadcast[SerializableWritable[Configuration]], + context: TaskContext + ): Iterator[T] = { val env = SparkEnv.get - val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration()) + val fs = path.getFileSystem(broadcastedConf.value.value) val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt val fileInputStream = fs.open(path, bufferSize) val serializer = env.serializer.newInstance() @@ -144,8 +154,10 @@ private[spark] object CheckpointRDD extends Logging { val sc = new SparkContext(cluster, "CheckpointRDD Test") val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000) val path = new Path(hdfsPath, "temp") - val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration()) - sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _) + val conf = SparkHadoopUtil.get.newConfiguration() + val fs = path.getFileSystem(conf) + val broadcastedConf = sc.broadcast(new SerializableWritable(conf)) + sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf, 1024) _) val cpRDD = new CheckpointRDD[Int](sc, path.toString) assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same") assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same") diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala index 3b56e45aa9..642dabaad5 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala +++ 
b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala @@ -22,7 +22,7 @@ import scala.reflect.ClassTag import org.apache.hadoop.fs.Path import org.apache.hadoop.conf.Configuration -import org.apache.spark.{Partition, SparkException, Logging} +import org.apache.spark.{SerializableWritable, Partition, SparkException, Logging} import org.apache.spark.scheduler.{ResultTask, ShuffleMapTask} /** @@ -85,14 +85,21 @@ private[spark] class RDDCheckpointData[T: ClassTag](rdd: RDD[T]) // Create the output path for the checkpoint val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id) - val fs = path.getFileSystem(new Configuration()) + val fs = path.getFileSystem(rdd.context.hadoopConfiguration) if (!fs.mkdirs(path)) { throw new SparkException("Failed to create checkpoint path " + path) } // Save to file, and reload it as an RDD - rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString) _) + val broadcastedConf = rdd.context.broadcast( + new SerializableWritable(rdd.context.hadoopConfiguration)) + rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf) _) val newRDD = new CheckpointRDD[T](rdd.context, path.toString) + if (newRDD.partitions.size != rdd.partitions.size) { + throw new SparkException( + "Checkpoint RDD " + newRDD + "("+ newRDD.partitions.size + ") has different " + + "number of partitions than original RDD " + rdd + "(" + rdd.partitions.size + ")") + } // Change the dependencies and partitions of the RDD RDDCheckpointData.synchronized { @@ -101,8 +108,8 @@ private[spark] class RDDCheckpointData[T: ClassTag](rdd: RDD[T]) rdd.markCheckpointed(newRDD) // Update the RDD's dependencies and partitions cpState = Checkpointed RDDCheckpointData.clearTaskCaches() - logInfo("Done checkpointing RDD " + rdd.id + ", new parent is RDD " + newRDD.id) } + logInfo("Done checkpointing RDD " + rdd.id + " to " + path + ", new parent is RDD " + newRDD.id) } // Get preferred location of a split after checkpointing diff --git 
a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java index 79913dc718..5e2899c97b 100644 --- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java @@ -851,7 +851,7 @@ public class JavaAPISuite implements Serializable { public void checkpointAndComputation() { File tempDir = Files.createTempDir(); JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - sc.setCheckpointDir(tempDir.getAbsolutePath(), true); + sc.setCheckpointDir(tempDir.getAbsolutePath()); Assert.assertEquals(false, rdd.isCheckpointed()); rdd.checkpoint(); rdd.count(); // Forces the DAG to cause a checkpoint @@ -863,7 +863,7 @@ public class JavaAPISuite implements Serializable { public void checkpointAndRestore() { File tempDir = Files.createTempDir(); JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - sc.setCheckpointDir(tempDir.getAbsolutePath(), true); + sc.setCheckpointDir(tempDir.getAbsolutePath()); Assert.assertEquals(false, rdd.isCheckpointed()); rdd.checkpoint(); rdd.count(); // Forces the DAG to cause a checkpoint diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 0604f6836c..108f36576a 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -320,17 +320,12 @@ class SparkContext(object): self._python_includes.append(filename) sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename)) # for tests in local mode - def setCheckpointDir(self, dirName, useExisting=False): + def setCheckpointDir(self, dirName): """ Set the directory under which RDDs are going to be checkpointed. The directory must be a HDFS path if running on a cluster. - - If the directory does not exist, it will be created. If the directory - exists and C{useExisting} is set to true, then the exisiting directory - will be used. 
Otherwise an exception will be thrown to prevent - accidental overriding of checkpoint files in the existing directory. """ - self._jsc.sc().setCheckpointDir(dirName, useExisting) + self._jsc.sc().setCheckpointDir(dirName) def _getJavaStorageLevel(self, storageLevel): """ diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 3987642bf4..7acb6eaf10 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -73,8 +73,8 @@ class TestCheckpoint(PySparkTestCase): time.sleep(1) # 1 second self.assertTrue(flatMappedRDD.isCheckpointed()) self.assertEqual(flatMappedRDD.collect(), result) - self.assertEqual(self.checkpointDir.name, - os.path.dirname(flatMappedRDD.getCheckpointFile())) + self.assertEqual("file:" + self.checkpointDir.name, + os.path.dirname(os.path.dirname(flatMappedRDD.getCheckpointFile()))) def test_checkpoint_and_restore(self): parCollection = self.sc.parallelize([1, 2, 3, 4]) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 7b343d2376..4960a85b97 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -21,12 +21,13 @@ import java.io._ import java.util.concurrent.Executors import java.util.concurrent.RejectedExecutionException -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.conf.Configuration import org.apache.spark.Logging import org.apache.spark.io.CompressionCodec import org.apache.spark.util.MetadataCleaner +import org.apache.spark.deploy.SparkHadoopUtil private[streaming] @@ -54,36 +55,34 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) /** - * Convenience class to speed up the writing of graph checkpoint to file + * Convenience class to handle the writing of graph checkpoint to file */ private[streaming] -class 
CheckpointWriter(checkpointDir: String) extends Logging { +class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends Logging { val file = new Path(checkpointDir, "graph") - // The file to which we actually write - and then "move" to file. - private val writeFile = new Path(file.getParent, file.getName + ".next") - private val bakFile = new Path(file.getParent, file.getName + ".bk") - - private var stopped = false - - val conf = new Configuration() - var fs = file.getFileSystem(conf) - val maxAttempts = 3 + val MAX_ATTEMPTS = 3 val executor = Executors.newFixedThreadPool(1) + val compressionCodec = CompressionCodec.createCodec() + // The file to which we actually write - and then "move" to file + val writeFile = new Path(file.getParent, file.getName + ".next") + // The file to which existing checkpoint is backed up (i.e. "moved") + val bakFile = new Path(file.getParent, file.getName + ".bk") - private val compressionCodec = CompressionCodec.createCodec() + private var stopped = false + private var fs_ : FileSystem = _ - // Removed code which validates whether there is only one CheckpointWriter per path 'file' since + // Removed code which validates whether there is only one CheckpointWriter per path 'file' since // I did not notice any errors - reintroduce it ? - class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable { def run() { var attempts = 0 val startTime = System.currentTimeMillis() - while (attempts < maxAttempts) { + while (attempts < MAX_ATTEMPTS && !stopped) { attempts += 1 try { logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'") - // This is inherently thread unsafe .. so alleviating it by writing to '.new' and then doing moves : which should be pretty fast. 
+ // This is inherently thread unsafe, so alleviating it by writing to '.new' and + // then moving it to the final file val fos = fs.create(writeFile) fos.write(bytes) fos.close() @@ -101,6 +100,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging { } catch { case ioe: IOException => logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe) + reset() } } logError("Could not write checkpoint for time " + checkpointTime + " to file '" + file + "'") @@ -133,7 +133,17 @@ class CheckpointWriter(checkpointDir: String) extends Logging { val startTime = System.currentTimeMillis() val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS) val endTime = System.currentTimeMillis() - logInfo("CheckpointWriter executor terminated ? " + terminated + ", waited for " + (endTime - startTime) + " ms.") + logInfo("CheckpointWriter executor terminated ? " + terminated + + ", waited for " + (endTime - startTime) + " ms.") + } + + private def fs = synchronized { + if (fs_ == null) fs_ = file.getFileSystem(hadoopConf) + fs_ + } + + private def reset() = synchronized { + fs_ = null } } @@ -143,7 +153,8 @@ object CheckpointReader extends Logging { def read(path: String): Checkpoint = { val fs = new Path(path).getFileSystem(new Configuration()) - val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk")) + val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), + new Path(path), new Path(path + ".bk")) val compressionCodec = CompressionCodec.createCodec() @@ -158,7 +169,8 @@ object CheckpointReader extends Logging { // loader to find and load classes. 
This is a well know Java issue and has popped up // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627) val zis = compressionCodec.compressedInputStream(fis) - val ois = new ObjectInputStreamWithLoader(zis, Thread.currentThread().getContextClassLoader) + val ois = new ObjectInputStreamWithLoader(zis, + Thread.currentThread().getContextClassLoader) val cp = ois.readObject.asInstanceOf[Checkpoint] ois.close() fs.close() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 41da028a3c..8898fdcb7f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -20,17 +20,19 @@ package org.apache.spark.streaming import akka.actor.Props import akka.actor.SupervisorStrategy import akka.zeromq.Subscribe +import akka.util.ByteString import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.scheduler.StreamingListener import org.apache.spark._ import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.receivers.ActorReceiver import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy import org.apache.spark.streaming.receivers.ZeroMQReceiver import org.apache.spark.storage.StorageLevel import org.apache.spark.util.MetadataCleaner import org.apache.spark.streaming.receivers.ActorReceiver +import org.apache.spark.streaming.scheduler.{JobScheduler, NetworkInputTracker} import scala.collection.mutable.Queue import scala.collection.Map @@ -38,17 +40,15 @@ import scala.reflect.ClassTag import java.io.InputStream import java.util.concurrent.atomic.AtomicInteger -import java.util.UUID import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import 
org.apache.hadoop.fs.Path + import twitter4j.Status import twitter4j.auth.Authorization -import org.apache.spark.streaming.scheduler._ -import akka.util.ByteString /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -87,7 +87,6 @@ class StreamingContext private ( null, batchDuration) } - /** * Re-create a StreamingContext from a checkpoint file. * @param path Path either to the directory that was specified as the checkpoint directory, or @@ -141,7 +140,7 @@ class StreamingContext private ( protected[streaming] var checkpointDir: String = { if (isCheckpointPresent) { - sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(cp_.checkpointDir), true) + sc.setCheckpointDir(cp_.checkpointDir) cp_.checkpointDir } else { null @@ -176,8 +175,12 @@ class StreamingContext private ( */ def checkpoint(directory: String) { if (directory != null) { - sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory)) - checkpointDir = directory + val path = new Path(directory) + val fs = path.getFileSystem(sparkContext.hadoopConfiguration) + fs.mkdirs(path) + val fullPath = fs.getFileStatus(path).getPath().toString + sc.setCheckpointDir(fullPath) + checkpointDir = fullPath } else { checkpointDir = null } @@ -368,7 +371,8 @@ class StreamingContext private ( /** * Create a input stream that monitors a Hadoop-compatible filesystem * for new files and reads them using the given key-value types and input format. - * File names starting with . are ignored. + * Files must be written to the monitored directory by "moving" them from another + * location within the same file system. File names starting with . are ignored. 
* @param directory HDFS directory to monitor for new file * @tparam K Key type for reading HDFS file * @tparam V Value type for reading HDFS file @@ -387,6 +391,8 @@ class StreamingContext private ( /** * Create a input stream that monitors a Hadoop-compatible filesystem * for new files and reads them using the given key-value types and input format. + * Files must be written to the monitored directory by "moving" them from another + * location within the same file system. * @param directory HDFS directory to monitor for new file * @param filter Function to filter paths to process * @param newFilesOnly Should process only new files and ignore existing files in the directory @@ -407,7 +413,9 @@ class StreamingContext private ( /** * Create a input stream that monitors a Hadoop-compatible filesystem * for new files and reads them as text files (using key as LongWritable, value - * as Text and input format as TextInputFormat). File names starting with . are ignored. + * as Text and input format as TextInputFormat). Files must be written to the + * monitored directory by "moving" them from another location within the same + * file system. File names starting with . are ignored. * @param directory HDFS directory to monitor for new file */ def textFileStream(directory: String): DStream[String] = { @@ -599,8 +607,4 @@ object StreamingContext { prefix + "-" + time.milliseconds + "." 
+ suffix } } - - protected[streaming] def getSparkCheckpointDir(sscCheckpointDir: String): String = { - new Path(sscCheckpointDir, UUID.randomUUID.toString).toString - } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 78d318cf27..aad0d931e7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -256,9 +256,11 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Creates a input stream that monitors a Hadoop-compatible filesystem + * Create a input stream that monitors a Hadoop-compatible filesystem * for new files and reads them as text files (using key as LongWritable, value - * as Text and input format as TextInputFormat). File names starting with . are ignored. + * as Text and input format as TextInputFormat). Files must be written to the + * monitored directory by "moving" them from another location within the same + * file system. File names starting with . are ignored. * @param directory HDFS directory to monitor for new file */ def textFileStream(directory: String): JavaDStream[String] = { @@ -300,9 +302,10 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Creates a input stream that monitors a Hadoop-compatible filesystem + * Create a input stream that monitors a Hadoop-compatible filesystem * for new files and reads them using the given key-value types and input format. - * File names starting with . are ignored. + * Files must be written to the monitored directory by "moving" them from another + * location within the same file system. File names starting with . are ignored. 
* @param directory HDFS directory to monitor for new file * @tparam K Key type for reading HDFS file * @tparam V Value type for reading HDFS file @@ -331,7 +334,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** - * Creates a input stream from a Flume source. + * Create a input stream from a Flume source. * @param hostname Hostname of the slave machine to which the flume data will be sent * @param port Port of the slave machine to which the flume data will be sent */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 39e25239bf..fb9eda8996 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -17,18 +17,17 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.rdd.RDD -import org.apache.spark.rdd.UnionRDD -import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time} - +import java.io.{ObjectInputStream, IOException} +import scala.collection.mutable.{HashSet, HashMap} +import scala.reflect.ClassTag import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} +import org.apache.spark.SparkException +import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.UnionRDD +import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time} -import scala.collection.mutable.{HashSet, HashMap} -import scala.reflect.ClassTag - -import java.io.{ObjectInputStream, IOException} private[streaming] class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag]( @@ -41,8 +40,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas protected[streaming] override val checkpointData = new 
FileInputDStreamCheckpointData // Latest file mod time seen till any point of time - private val lastModTimeFiles = new HashSet[String]() - private var lastModTime = 0L + private val prevModTimeFiles = new HashSet[String]() + private var prevModTime = 0L @transient private var path_ : Path = null @transient private var fs_ : FileSystem = null @@ -50,11 +49,11 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas override def start() { if (newFilesOnly) { - lastModTime = graph.zeroTime.milliseconds + prevModTime = graph.zeroTime.milliseconds } else { - lastModTime = 0 + prevModTime = 0 } - logDebug("LastModTime initialized to " + lastModTime + ", new files only = " + newFilesOnly) + logDebug("LastModTime initialized to " + prevModTime + ", new files only = " + newFilesOnly) } override def stop() { } @@ -69,55 +68,22 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas * the previous call. */ override def compute(validTime: Time): Option[RDD[(K, V)]] = { - assert(validTime.milliseconds >= lastModTime, "Trying to get new files for really old time [" + validTime + " < " + lastModTime) + assert(validTime.milliseconds >= prevModTime, + "Trying to get new files for really old time [" + validTime + " < " + prevModTime + "]") - // Create the filter for selecting new files - val newFilter = new PathFilter() { - // Latest file mod time seen in this round of fetching files and its corresponding files - var latestModTime = 0L - val latestModTimeFiles = new HashSet[String]() - - def accept(path: Path): Boolean = { - if (!filter(path)) { // Reject file if it does not satisfy filter - logDebug("Rejected by filter " + path) - return false - } else { // Accept file only if - val modTime = fs.getFileStatus(path).getModificationTime() - logDebug("Mod time for " + path + " is " + modTime) - if (modTime < lastModTime) { - logDebug("Mod time less than last mod time") - return false // If the file was created before the last 
time it was called - } else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) { - logDebug("Mod time equal to last mod time, but file considered already") - return false // If the file was created exactly as lastModTime but not reported yet - } else if (modTime > validTime.milliseconds) { - logDebug("Mod time more than valid time") - return false // If the file was created after the time this function call requires - } - if (modTime > latestModTime) { - latestModTime = modTime - latestModTimeFiles.clear() - logDebug("Latest mod time updated to " + latestModTime) - } - latestModTimeFiles += path.toString - logDebug("Accepted " + path) - return true - } - } - } - logDebug("Finding new files at time " + validTime + " for last mod time = " + lastModTime) - val newFiles = fs.listStatus(path, newFilter).map(_.getPath.toString) + // Find new files + val (newFiles, latestModTime, latestModTimeFiles) = findNewFiles(validTime.milliseconds) logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n")) if (newFiles.length > 0) { // Update the modification time and the files processed for that modification time - if (lastModTime != newFilter.latestModTime) { - lastModTime = newFilter.latestModTime - lastModTimeFiles.clear() + if (prevModTime < latestModTime) { + prevModTime = latestModTime + prevModTimeFiles.clear() } - lastModTimeFiles ++= newFilter.latestModTimeFiles - logDebug("Last mod time updated to " + lastModTime) + prevModTimeFiles ++= latestModTimeFiles + logDebug("Last mod time updated to " + prevModTime) } - files += ((validTime, newFiles)) + files += ((validTime, newFiles.toArray)) Some(filesToRDD(newFiles)) } @@ -132,12 +98,28 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n")) } + /** + * Find files which have modification timestamp <= current time and return a 3-tuple of + * (new files found, latest modification time among them, 
files with latest modification time) + */ + private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = { + logDebug("Trying to get new files for time " + currentTime) + val filter = new CustomPathFilter(currentTime) + val newFiles = fs.listStatus(path, filter).map(_.getPath.toString) + (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq) + } + /** Generate one RDD from an array of files */ - protected[streaming] def filesToRDD(files: Seq[String]): RDD[(K, V)] = { - new UnionRDD( - context.sparkContext, - files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file)) - ) + private def filesToRDD(files: Seq[String]): RDD[(K, V)] = { + val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file)) + files.zip(fileRDDs).foreach { case (file, rdd) => { + if (rdd.partitions.size == 0) { + logError("File " + file + " has no data in it. Spark Streaming can only ingest " + + "files that have been \"moved\" to the directory assigned to the file stream. " + + "Refer to the streaming programming guide for more details.") + } + }} + new UnionRDD(context.sparkContext, fileRDDs) } private def path: Path = { @@ -150,6 +132,10 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas fs_ } + private def reset() { + fs_ = null + } + @throws(classOf[IOException]) private def readObject(ois: ObjectInputStream) { logDebug(this.getClass().getSimpleName + ".readObject used") @@ -191,6 +177,51 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas hadoopFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n") + "\n]" } } + + /** + * Custom PathFilter class to find new files that have modification timestamps <= current time, + * but have not been seen before (i.e. 
the file should not be in lastModTimeFiles) + */ + private[streaming] + class CustomPathFilter(maxModTime: Long) extends PathFilter { + // Latest file mod time seen in this round of fetching files and its corresponding files + var latestModTime = 0L + val latestModTimeFiles = new HashSet[String]() + + def accept(path: Path): Boolean = { + try { + if (!filter(path)) { // Reject file if it does not satisfy filter + logDebug("Rejected by filter " + path) + return false + } + val modTime = fs.getFileStatus(path).getModificationTime() + logDebug("Mod time for " + path + " is " + modTime) + if (modTime < prevModTime) { + logDebug("Mod time less than last mod time") + return false // If the file was created before the last time it was called + } else if (modTime == prevModTime && prevModTimeFiles.contains(path.toString)) { + logDebug("Mod time equal to last mod time, but file considered already") + return false // If the file was created exactly as lastModTime but not reported yet + } else if (modTime > maxModTime) { + logDebug("Mod time more than ") + return false // If the file is too new that considering it may give errors + } + if (modTime > latestModTime) { + latestModTime = modTime + latestModTimeFiles.clear() + logDebug("Latest mod time updated to " + latestModTime) + } + latestModTimeFiles += path.toString + logDebug("Accepted " + path) + } catch { + case fnfe: java.io.FileNotFoundException => + logWarning("Error finding new files", fnfe) + reset() + return false + } + return true + } + } } private[streaming] diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 1cd0b9b0a4..921a33a4cb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -17,11 +17,18 @@ package org.apache.spark.streaming.scheduler +import 
akka.actor.{Props, Actor} import org.apache.spark.SparkEnv import org.apache.spark.Logging import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter} import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock} +/** Event classes for JobGenerator */ +private[scheduler] sealed trait JobGeneratorEvent +private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent +private[scheduler] case class ClearOldMetadata(time: Time) extends JobGeneratorEvent +private[scheduler] case class DoCheckpoint(time: Time) extends JobGeneratorEvent + /** * This class generates jobs from DStreams as well as drives checkpointing and cleaning * up DStream metadata. @@ -30,44 +37,69 @@ private[streaming] class JobGenerator(jobScheduler: JobScheduler) extends Logging { initLogging() + val ssc = jobScheduler.ssc - val clockClass = System.getProperty( - "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") - val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock] - val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, - longTime => generateJobs(new Time(longTime))) val graph = ssc.graph + val eventProcessorActor = ssc.env.actorSystem.actorOf(Props(new Actor { + def receive = { + case event: JobGeneratorEvent => + logDebug("Got event of type " + event.getClass.getName) + processEvent(event) + } + })) + val clock = { + val clockClass = System.getProperty( + "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") + Class.forName(clockClass).newInstance().asInstanceOf[Clock] + } + val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, + longTime => eventProcessorActor ! 
GenerateJobs(new Time(longTime))) lazy val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) { - new CheckpointWriter(ssc.checkpointDir) + new CheckpointWriter(ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration) } else { null } - var latestTime: Time = null - def start() = synchronized { if (ssc.isCheckpointPresent) { restart() } else { startFirstTime() } - logInfo("JobGenerator started") } - def stop() = synchronized { + def stop() { timer.stop() if (checkpointWriter != null) checkpointWriter.stop() ssc.graph.stop() logInfo("JobGenerator stopped") } + /** + * On batch completion, clear old metadata and checkpoint computation. + */ + private[scheduler] def onBatchCompletion(time: Time) { + eventProcessorActor ! ClearOldMetadata(time) + } + + /** Processes all events */ + private def processEvent(event: JobGeneratorEvent) { + event match { + case GenerateJobs(time) => generateJobs(time) + case ClearOldMetadata(time) => clearOldMetadata(time) + case DoCheckpoint(time) => doCheckpoint(time) + } + } + + /** Starts the generator for the first time */ private def startFirstTime() { val startTime = new Time(timer.getStartTime()) graph.start(startTime - graph.batchDuration) timer.start(startTime.milliseconds) - logInfo("JobGenerator's timer started at " + startTime) + logInfo("JobGenerator started at " + startTime) } + /** Restarts the generator based on the information in checkpoint */ private def restart() { // If manual clock is being used for testing, then // either set the manual clock to the last checkpointed time, @@ -99,7 +131,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { // Restart the timer timer.start(restartTime.milliseconds) - logInfo("JobGenerator's timer restarted at " + restartTime) + logInfo("JobGenerator restarted at " + restartTime) } /** Generate jobs and perform checkpoint for the given `time`. 
*/ @@ -107,16 +139,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { SparkEnv.set(ssc.env) logInfo("\n-----------------------------------------------------\n") jobScheduler.runJobs(time, graph.generateJobs(time)) - latestTime = time - doCheckpoint(time) + eventProcessorActor ! DoCheckpoint(time) } - /** - * On batch completion, clear old metadata and checkpoint computation. - */ - private[streaming] def onBatchCompletion(time: Time) { + /** Clear DStream metadata for the given `time`. */ + private def clearOldMetadata(time: Time) { ssc.graph.clearOldMetadata(time) - doCheckpoint(time) + eventProcessorActor ! DoCheckpoint(time) } /** Perform checkpoint for the give `time`. */ diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 67a0841535..4e25c9566c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -17,24 +17,18 @@ package org.apache.spark.streaming -import dstream.FileInputDStream -import org.apache.spark.streaming.StreamingContext._ import java.io.File import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag - import org.apache.commons.io.FileUtils -import org.scalatest.BeforeAndAfter - import com.google.common.io.Files - -import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions +import org.apache.hadoop.fs.{Path, FileSystem} +import org.apache.hadoop.conf.Configuration +import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.FileInputDStream import org.apache.spark.streaming.util.ManualClock - - /** * This test suites tests the checkpointing functionality of DStreams - * the checkpointing of a DStream's RDDs as well as the checkpointing of @@ -66,7 +60,7 @@ class CheckpointSuite extends TestSuiteBase { 
System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") val stateStreamCheckpointInterval = Seconds(1) - + val fs = FileSystem.getLocal(new Configuration()) // this ensure checkpointing occurs at least once val firstNumBatches = (stateStreamCheckpointInterval / batchDuration).toLong * 2 val secondNumBatches = firstNumBatches @@ -90,11 +84,12 @@ class CheckpointSuite extends TestSuiteBase { ssc.start() advanceTimeWithRealDelay(ssc, firstNumBatches) logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData) - assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before first failure") + assert(!stateStream.checkpointData.checkpointFiles.isEmpty, + "No checkpointed RDDs in state stream before first failure") stateStream.checkpointData.checkpointFiles.foreach { - case (time, data) => { - val file = new File(data.toString) - assert(file.exists(), "Checkpoint file '" + file +"' for time " + time + " for state stream before first failure does not exist") + case (time, file) => { + assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time + + " for state stream before first failure does not exist") } } @@ -102,7 +97,8 @@ class CheckpointSuite extends TestSuiteBase { // and check whether the earlier checkpoint files are deleted val checkpointFiles = stateStream.checkpointData.checkpointFiles.map(x => new File(x._2)) advanceTimeWithRealDelay(ssc, secondNumBatches) - checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted")) + checkpointFiles.foreach(file => + assert(!file.exists, "Checkpoint file '" + file + "' was not deleted")) ssc.stop() // Restart stream computation using the checkpoint file and check whether @@ -110,19 +106,20 @@ class CheckpointSuite extends TestSuiteBase { ssc = new StreamingContext(checkpointDir) stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head 
logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]") - assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from first failure") + assert(!stateStream.generatedRDDs.isEmpty, + "No restored RDDs in state stream after recovery from first failure") // Run one batch to generate a new checkpoint file and check whether some RDD // is present in the checkpoint data or not ssc.start() advanceTimeWithRealDelay(ssc, 1) - assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure") + assert(!stateStream.checkpointData.checkpointFiles.isEmpty, + "No checkpointed RDDs in state stream before second failure") stateStream.checkpointData.checkpointFiles.foreach { - case (time, data) => { - val file = new File(data.toString) - assert(file.exists(), - "Checkpoint file '" + file +"' for time " + time + " for state stream before seconds failure does not exist") + case (time, file) => { + assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time + + " for state stream before seconds failure does not exist") } } ssc.stop() @@ -132,7 +129,8 @@ class CheckpointSuite extends TestSuiteBase { ssc = new StreamingContext(checkpointDir) stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]") - assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from second failure") + assert(!stateStream.generatedRDDs.isEmpty, + "No restored RDDs in state stream after recovery from second failure") // Adjust manual clock time as if it is being restarted after a delay System.setProperty("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString) @@ -143,6 +141,7 @@ class CheckpointSuite extends TestSuiteBase { ssc = null } + // This tests whether the system can recover 
from a master failure with simple // non-stateful operations. This assumes a reliable, replayable input // source - TestInputDStream. @@ -191,6 +190,7 @@ class CheckpointSuite extends TestSuiteBase { testCheckpointedOperation(input, operation, output, 7) } + // This tests whether file input stream remembers what files were seen before // the master failure and uses them again to process a large window operation. // It also tests whether batches, whose processing was incomplete due to the diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 62a9f120b4..5fa14ad7c4 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -23,7 +23,7 @@ import akka.actor.IOManager import akka.actor.Props import akka.util.ByteString -import org.apache.spark.streaming.dstream.{NetworkReceiver, SparkFlumeEvent} +import org.apache.spark.streaming.dstream.{FileInputDStream, NetworkReceiver, SparkFlumeEvent} import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket} import java.io.{File, BufferedWriter, OutputStreamWriter} import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue} |