diff options
35 files changed, 181 insertions, 356 deletions
diff --git a/bagel/src/test/resources/log4j.properties b/bagel/src/test/resources/log4j.properties index 4c99e450bc..83d05cab2f 100644 --- a/bagel/src/test/resources/log4j.properties +++ b/bagel/src/test/resources/log4j.properties @@ -1,8 +1,8 @@ -# Set everything to be logged to the console +# Set everything to be logged to the file bagel/target/unit-tests.log log4j.rootCategory=INFO, file log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.append=false -log4j.appender.file.file=spark-tests.log +log4j.appender.file.file=bagel/target/unit-tests.log log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n diff --git a/conf/streaming-env.sh.template b/conf/streaming-env.sh.template deleted file mode 100755 index 1ea9ba5541..0000000000 --- a/conf/streaming-env.sh.template +++ /dev/null @@ -1,22 +0,0 @@ -#!/usr/bin/env bash - -# This file contains a few additional setting that are useful for -# running streaming jobs in Spark. Copy this file as streaming-env.sh . -# Note that this shell script will be read after spark-env.sh, so settings -# in this file may override similar settings (if present) in spark-env.sh . - - -# Using concurrent GC is strongly recommended as it can significantly -# reduce GC related pauses. - -SPARK_JAVA_OPTS+=" -XX:+UseConcMarkSweepGC" - -# Using Kryo serialization can improve serialization performance -# and therefore the throughput of the Spark Streaming programs. However, -# using Kryo serialization with custom classes may required you to -# register the classes with Kryo. Refer to the Spark documentation -# for more details. - -# SPARK_JAVA_OPTS+=" -Dspark.serializer=spark.KryoSerializer" - -export SPARK_JAVA_OPTS diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala index 7d320c4fe5..86ad737583 100644 --- a/core/src/main/scala/spark/CacheTracker.scala +++ b/core/src/main/scala/spark/CacheTracker.scala @@ -39,7 +39,7 @@ private[spark] class CacheTrackerActor extends Actor with Logging { private val slaveCapacity = new HashMap[String, Long] private val slaveUsage = new HashMap[String, Long] - private val metadataCleaner = new MetadataCleaner("CacheTrackerActor", locs.cleanup) + private val metadataCleaner = new MetadataCleaner("CacheTrackerActor", locs.clearOldValues) private def getCacheUsage(host: String): Long = slaveUsage.getOrElse(host, 0L) private def getCacheCapacity(host: String): Long = slaveCapacity.getOrElse(host, 0L) @@ -120,7 +120,7 @@ private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, b // Remembers which splits are currently being loaded (on worker nodes) val loading = new HashSet[String] - val metadataCleaner = new MetadataCleaner("CacheTracker", registeredRddIds.cleanup) + val metadataCleaner = new MetadataCleaner("CacheTracker", registeredRddIds.clearOldValues) // Send a message to the trackerActor and get its result within a default timeout, or // throw a SparkException if this fails. diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala index 5ebdba0fc8..a2fa2d1ea7 100644 --- a/core/src/main/scala/spark/MapOutputTracker.scala +++ b/core/src/main/scala/spark/MapOutputTracker.scala @@ -178,8 +178,8 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea } def cleanup(cleanupTime: Long) { - mapStatuses.cleanup(cleanupTime) - cachedSerializedStatuses.cleanup(cleanupTime) + mapStatuses.clearOldValues(cleanupTime) + cachedSerializedStatuses.clearOldValues(cleanupTime) } def stop() { diff --git a/core/src/main/scala/spark/RDDCheckpointData.scala b/core/src/main/scala/spark/RDDCheckpointData.scala index 7af830940f..d845a522e4 100644 --- a/core/src/main/scala/spark/RDDCheckpointData.scala +++ b/core/src/main/scala/spark/RDDCheckpointData.scala @@ -14,15 +14,23 @@ private[spark] object CheckpointState extends Enumeration { } /** - * This class contains all the information of the regarding RDD checkpointing. + * This class contains all the information related to RDD checkpointing. Each instance of this class + * is associated with a RDD. It manages process of checkpointing of the associated RDD, as well as, + * manages the post-checkpoint state by providing the updated splits, iterator and preferred locations + * of the checkpointed RDD. */ private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T]) extends Logging with Serializable { import CheckpointState._ + // The checkpoint state of the associated RDD. var cpState = Initialized + + // The file to which the associated RDD has been checkpointed to @transient var cpFile: Option[String] = None + + // The CheckpointRDD created from the checkpoint file, that is, the new parent the associated RDD. @transient var cpRDD: Option[RDD[T]] = None // Mark the RDD for checkpointing @@ -65,7 +73,7 @@ extends Logging with Serializable { cpRDD = Some(newRDD) rdd.changeDependencies(newRDD) cpState = Checkpointed - RDDCheckpointData.checkpointCompleted() + RDDCheckpointData.clearTaskCaches() logInfo("Done checkpointing RDD " + rdd.id + ", new parent is RDD " + newRDD.id) } } @@ -90,7 +98,7 @@ extends Logging with Serializable { } private[spark] object RDDCheckpointData { - def checkpointCompleted() { + def clearTaskCaches() { ShuffleMapTask.clearCache() ResultTask.clearCache() } diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala index 1a88d402c3..86c63ca2f4 100644 --- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala @@ -13,6 +13,10 @@ private[spark] class CheckpointRDDSplit(idx: Int, val splitFile: String) extends override val index: Int = idx } +/** + * This RDD represents a RDD checkpoint file (similar to HadoopRDD). + */ +private[spark] class CheckpointRDD[T: ClassManifest](sc: SparkContext, checkpointPath: String) extends RDD[T](sc, Nil) { diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 9387ba19a3..59f2099e91 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -599,15 +599,15 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with def cleanup(cleanupTime: Long) { var sizeBefore = idToStage.size - idToStage.cleanup(cleanupTime) + idToStage.clearOldValues(cleanupTime) logInfo("idToStage " + sizeBefore + " --> " + idToStage.size) sizeBefore = shuffleToMapStage.size - shuffleToMapStage.cleanup(cleanupTime) + shuffleToMapStage.clearOldValues(cleanupTime) logInfo("shuffleToMapStage " + sizeBefore + " --> " + shuffleToMapStage.size) sizeBefore = pendingTasks.size - pendingTasks.cleanup(cleanupTime) + pendingTasks.clearOldValues(cleanupTime) logInfo("pendingTasks " + sizeBefore + " --> " + pendingTasks.size) } diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala index 7ec6564105..74a63c1af1 100644 --- a/core/src/main/scala/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/spark/scheduler/ResultTask.scala @@ -12,7 +12,7 @@ private[spark] object ResultTask { // expensive on the master node if it needs to launch thousands of tasks. val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]] - val metadataCleaner = new MetadataCleaner("ResultTask", serializedInfoCache.cleanup) + val metadataCleaner = new MetadataCleaner("ResultTask", serializedInfoCache.clearOldValues) def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] = { synchronized { diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index feb63abb61..19f5328eee 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -23,7 +23,7 @@ private[spark] object ShuffleMapTask { // expensive on the master node if it needs to launch thousands of tasks. val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]] - val metadataCleaner = new MetadataCleaner("ShuffleMapTask", serializedInfoCache.cleanup) + val metadataCleaner = new MetadataCleaner("ShuffleMapTask", serializedInfoCache.clearOldValues) def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_]): Array[Byte] = { synchronized { diff --git a/core/src/main/scala/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/spark/util/TimeStampedHashMap.scala index 7e785182ea..bb7c5c01c8 100644 --- a/core/src/main/scala/spark/util/TimeStampedHashMap.scala +++ b/core/src/main/scala/spark/util/TimeStampedHashMap.scala @@ -7,7 +7,7 @@ import scala.collection.mutable.Map /** * This is a custom implementation of scala.collection.mutable.Map which stores the insertion * time stamp along with each key-value pair. Key-value pairs that are older than a particular - * threshold time can them be removed using the cleanup method. This is intended to be a drop-in + * threshold time can them be removed using the clearOldValues method. This is intended to be a drop-in * replacement of scala.collection.mutable.HashMap. */ class TimeStampedHashMap[A, B] extends Map[A, B]() with spark.Logging { @@ -74,7 +74,10 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() with spark.Logging { } } - def cleanup(threshTime: Long) { + /** + * Removes old key-value pairs that have timestamp earlier than `threshTime` + */ + def clearOldValues(threshTime: Long) { val iterator = internalMap.entrySet().iterator() while(iterator.hasNext) { val entry = iterator.next() diff --git a/core/src/main/scala/spark/util/TimeStampedHashSet.scala b/core/src/main/scala/spark/util/TimeStampedHashSet.scala index 539dd75844..5f1cc93752 100644 --- a/core/src/main/scala/spark/util/TimeStampedHashSet.scala +++ b/core/src/main/scala/spark/util/TimeStampedHashSet.scala @@ -52,7 +52,10 @@ class TimeStampedHashSet[A] extends Set[A] { } } - def cleanup(threshTime: Long) { + /** + * Removes old values that have timestamp earlier than `threshTime` + */ + def clearOldValues(threshTime: Long) { val iterator = internalMap.entrySet().iterator() while(iterator.hasNext) { val entry = iterator.next() diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties index 5ed388e91b..6ec89c0184 100644 --- a/core/src/test/resources/log4j.properties +++ b/core/src/test/resources/log4j.properties @@ -1,8 +1,8 @@ -# Set everything to be logged to the file spark-tests.log +# Set everything to be logged to the file core/target/unit-tests.log log4j.rootCategory=INFO, file log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.append=false -log4j.appender.file.file=spark-tests.log +log4j.appender.file.file=core/target/unit-tests.log log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index fc2ea2ef79..05a88ce7bd 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -187,8 +187,8 @@ Conversely, the computation can be stopped by using ssc.stop() {% endhighlight %} -# Example - WordCountNetwork.scala -A good example to start off is the spark.streaming.examples.WordCountNetwork. This example counts the words received from a network server every second. Given below is the relevant sections of the source code. You can find the full source code in <Spark repo>/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala. +# Example - NetworkWordCount.scala +A good example to start off is the spark.streaming.examples.NetworkWordCount. This example counts the words received from a network server every second. Given below is the relevant sections of the source code. You can find the full source code in <Spark repo>/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala. {% highlight scala %} import spark.streaming.{Seconds, StreamingContext} @@ -196,7 +196,7 @@ import spark.streaming.StreamingContext._ ... // Create the context and set up a network input stream to receive from a host:port -val ssc = new StreamingContext(args(0), "WordCountNetwork", Seconds(1)) +val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1)) val lines = ssc.networkTextStream(args(1), args(2).toInt) // Split the lines into words, count them, and print some of the counts on the master @@ -214,13 +214,13 @@ To run this example on your local machine, you need to first run a Netcat server $ nc -lk 9999 {% endhighlight %} -Then, in a different terminal, you can start WordCountNetwork by using +Then, in a different terminal, you can start NetworkWordCount by using {% highlight bash %} -$ ./run spark.streaming.examples.WordCountNetwork local[2] localhost 9999 +$ ./run spark.streaming.examples.NetworkWordCount local[2] localhost 9999 {% endhighlight %} -This will make WordCountNetwork connect to the netcat server. Any lines typed in the terminal running the netcat server will be counted and printed on screen. +This will make NetworkWordCount connect to the netcat server. Any lines typed in the terminal running the netcat server will be counted and printed on screen. <table> <td> @@ -240,7 +240,7 @@ hello world </td> <td> {% highlight bash %} -# TERMINAL 2: RUNNING WordCountNetwork +# TERMINAL 2: RUNNING NetworkWordCount ... 2012-12-31 18:47:10,446 INFO SparkContext: Job finished: run at ThreadPoolExecutor.java:886, took 0.038817 s ------------------------------------------- diff --git a/streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala index e60ce483a3..461929fba2 100644 --- a/streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala @@ -5,7 +5,7 @@ import spark.storage.StorageLevel import spark.streaming._ /** - * Produce a streaming count of events received from Flume. + * Produces a count of events received from Flume. * * This should be used in conjunction with an AvroSink in Flume. It will start * an Avro server on at the request host:port address and listen for requests. diff --git a/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala b/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala new file mode 100644 index 0000000000..8530f5c175 --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala @@ -0,0 +1,36 @@ +package spark.streaming.examples + +import spark.streaming.{Seconds, StreamingContext} +import spark.streaming.StreamingContext._ + + +/** + * Counts words in new text files created in the given directory + * Usage: HdfsWordCount <master> <directory> + * <master> is the Spark master URL. + * <directory> is the directory that Spark Streaming will use to find and read new text files. + * + * To run this on your local machine on directory `localdir`, run this example + * `$ ./run spark.streaming.examples.HdfsWordCount local[2] localdir` + * Then create a text file in `localdir` and the words in the file will get counted. + */ +object HdfsWordCount { + def main(args: Array[String]) { + if (args.length < 2) { + System.err.println("Usage: HdfsWordCount <master> <directory>") + System.exit(1) + } + + // Create the context + val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2)) + + // Create the FileInputDStream on the directory and use the + // stream to count words in new files created + val lines = ssc.textFileStream(args(1)) + val words = lines.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + wordCounts.print() + ssc.start() + } +} + diff --git a/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala index fe55db6e2c..fe55db6e2c 100644 --- a/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala index eadda60563..43c01d5db2 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala +++ b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala @@ -3,16 +3,27 @@ package spark.streaming.examples import spark.streaming.{Seconds, StreamingContext} import spark.streaming.StreamingContext._ -object WordCountNetwork { +/** + * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. + * Usage: NetworkWordCount <master> <hostname> <port> + * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. + * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data. + * + * To run this on your local machine, you need to first run a Netcat server + * `$ nc -lk 9999` + * and then run the example + * `$ ./run spark.streaming.examples.NetworkWordCount local[2] localhost 9999` + */ +object NetworkWordCount { def main(args: Array[String]) { if (args.length < 2) { - System.err.println("Usage: WordCountNetwork <master> <hostname> <port>\n" + + System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" + "In local mode, <master> should be 'local[n]' with n > 1") System.exit(1) } // Create the context and set the batch size - val ssc = new StreamingContext(args(0), "WordCountNetwork", Seconds(1)) + val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1)) // Create a NetworkInputDStream on target ip:port and count the // words in input stream of \n delimited test (eg. generated by 'nc') diff --git a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala b/examples/src/main/scala/spark/streaming/examples/QueueStream.scala index 2a265d021d..2a265d021d 100644 --- a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala +++ b/examples/src/main/scala/spark/streaming/examples/QueueStream.scala diff --git a/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala new file mode 100644 index 0000000000..2eec777c54 --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala @@ -0,0 +1,46 @@ +package spark.streaming.examples + +import spark.util.IntParam +import spark.storage.StorageLevel + +import spark.streaming._ +import spark.streaming.util.RawTextHelper + +/** + * Receives text from multiple rawNetworkStreams and counts how many '\n' delimited + * lines have the word 'the' in them. This is useful for benchmarking purposes. This + * will only work with spark.streaming.util.RawTextSender running on all worker nodes + * and with Spark using Kryo serialization (set Java property "spark.serializer" to + * "spark.KryoSerializer"). + * Usage: RawNetworkGrep <master> <numStreams> <host> <port> <batchMillis> + * <master> is the Spark master URL + * <numStream> is the number rawNetworkStreams, which should be same as number + * of work nodes in the cluster + * <host> is "localhost". + * <port> is the port on which RawTextSender is running in the worker nodes. + * <batchMillise> is the Spark Streaming batch duration in milliseconds. + */ + +object RawNetworkGrep { + def main(args: Array[String]) { + if (args.length != 5) { + System.err.println("Usage: RawNetworkGrep <master> <numStreams> <host> <port> <batchMillis>") + System.exit(1) + } + + val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args + + // Create the context + val ssc = new StreamingContext(master, "RawNetworkGrep", Milliseconds(batchMillis)) + + // Warm up the JVMs on master and slave for JIT compilation to kick in + RawTextHelper.warmUp(ssc.sc) + + val rawStreams = (1 to numStreams).map(_ => + ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray + val union = ssc.union(rawStreams) + union.filter(_.contains("the")).count().foreach(r => + println("Grep count: " + r.collect().mkString)) + ssc.start() + } +} diff --git a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala index 4c6e08bc74..4c6e08bc74 100644 --- a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala +++ b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala diff --git a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala index a191321d91..a191321d91 100644 --- a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala +++ b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index a3f901a081..6ba3026bcc 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -23,7 +23,7 @@ object SparkBuild extends Build { lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn (core) - lazy val examples = Project("examples", file("examples"), settings = examplesSettings) dependsOn (core) + lazy val examples = Project("examples", file("examples"), settings = examplesSettings) dependsOn (core) dependsOn (streaming) lazy val bagel = Project("bagel", file("bagel"), settings = bagelSettings) dependsOn (core) diff --git a/repl/src/test/resources/log4j.properties b/repl/src/test/resources/log4j.properties index 4c99e450bc..cfb1a390e6 100644 --- a/repl/src/test/resources/log4j.properties +++ b/repl/src/test/resources/log4j.properties @@ -1,8 +1,8 @@ -# Set everything to be logged to the console +# Set everything to be logged to the repl/target/unit-tests.log log4j.rootCategory=INFO, file log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.append=false -log4j.appender.file.file=spark-tests.log +log4j.appender.file.file=repl/target/unit-tests.log log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n @@ -13,10 +13,6 @@ if [ -e $FWDIR/conf/spark-env.sh ] ; then . $FWDIR/conf/spark-env.sh fi -if [ -e $FWDIR/conf/streaming-env.sh ] ; then - . $FWDIR/conf/streaming-env.sh -fi - if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then if [ `command -v scala` ]; then RUNNER="scala" diff --git a/sentences.txt b/sentences.txt deleted file mode 100644 index fedf96c66e..0000000000 --- a/sentences.txt +++ /dev/null @@ -1,3 +0,0 @@ -Hello world! -What's up? -There is no cow level diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 7256e41af9..215246ba2e 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -154,7 +154,7 @@ class StreamingContext private ( storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2 ): DStream[T] = { val inputStream = new KafkaInputDStream[T](this, hostname, port, groupId, topics, initialOffsets, storageLevel) - graph.addInputStream(inputStream) + registerInputStream(inputStream) inputStream } @@ -192,7 +192,7 @@ class StreamingContext private ( storageLevel: StorageLevel ): DStream[T] = { val inputStream = new SocketInputDStream[T](this, hostname, port, converter, storageLevel) - graph.addInputStream(inputStream) + registerInputStream(inputStream) inputStream } @@ -208,7 +208,7 @@ class StreamingContext private ( storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): DStream[SparkFlumeEvent] = { val inputStream = new FlumeInputDStream(this, hostname, port, storageLevel) - graph.addInputStream(inputStream) + registerInputStream(inputStream) inputStream } @@ -228,13 +228,14 @@ class StreamingContext private ( storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): DStream[T] = { val inputStream = new RawInputDStream[T](this, hostname, port, storageLevel) - graph.addInputStream(inputStream) + registerInputStream(inputStream) inputStream } /** * Creates a input stream that monitors a Hadoop-compatible filesystem * for new files and reads them using the given key-value types and input format. + * File names starting with . are ignored. * @param directory HDFS directory to monitor for new file * @tparam K Key type for reading HDFS file * @tparam V Value type for reading HDFS file @@ -244,16 +245,37 @@ class StreamingContext private ( K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K, V]: ClassManifest - ](directory: String): DStream[(K, V)] = { + ] (directory: String): DStream[(K, V)] = { val inputStream = new FileInputDStream[K, V, F](this, directory) - graph.addInputStream(inputStream) + registerInputStream(inputStream) + inputStream + } + + /** + * Creates a input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them using the given key-value types and input format. + * @param directory HDFS directory to monitor for new file + * @param filter Function to filter paths to process + * @param newFilesOnly Should process only new files and ignore existing files in the directory + * @tparam K Key type for reading HDFS file + * @tparam V Value type for reading HDFS file + * @tparam F Input format for reading HDFS file + */ + def fileStream[ + K: ClassManifest, + V: ClassManifest, + F <: NewInputFormat[K, V]: ClassManifest + ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): DStream[(K, V)] = { + val inputStream = new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly) + registerInputStream(inputStream) inputStream } + /** * Creates a input stream that monitors a Hadoop-compatible filesystem * for new files and reads them as text files (using key as LongWritable, value - * as Text and input format as TextInputFormat). + * as Text and input format as TextInputFormat). File names starting with . are ignored. * @param directory HDFS directory to monitor for new file */ def textFileStream(directory: String): DStream[String] = { @@ -274,7 +296,7 @@ class StreamingContext private ( defaultRDD: RDD[T] = null ): DStream[T] = { val inputStream = new QueueInputDStream(this, queue, oneAtATime, defaultRDD) - graph.addInputStream(inputStream) + registerInputStream(inputStream) inputStream } diff --git a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala index cf72095324..1e6ad84b44 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala @@ -14,7 +14,7 @@ private[streaming] class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest]( @transient ssc_ : StreamingContext, directory: String, - filter: PathFilter = FileInputDStream.defaultPathFilter, + filter: Path => Boolean = FileInputDStream.defaultFilter, newFilesOnly: Boolean = true) extends InputDStream[(K, V)](ssc_) { @@ -60,7 +60,7 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K val latestModTimeFiles = new HashSet[String]() def accept(path: Path): Boolean = { - if (!filter.accept(path)) { + if (!filter(path)) { return false } else { val modTime = fs.getFileStatus(path).getModificationTime() @@ -95,16 +95,8 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K } } +private[streaming] object FileInputDStream { - val defaultPathFilter = new PathFilter with Serializable { - def accept(path: Path): Boolean = { - val file = path.getName() - if (file.startsWith(".") || file.endsWith("_tmp")) { - return false - } else { - return true - } - } - } + def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".") } diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala b/streaming/src/main/scala/spark/streaming/examples/FileStream.scala deleted file mode 100644 index 81938d30d4..0000000000 --- a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala +++ /dev/null @@ -1,46 +0,0 @@ -package spark.streaming.examples - -import spark.streaming.StreamingContext -import spark.streaming.StreamingContext._ -import spark.streaming.Seconds -import org.apache.hadoop.fs.Path -import org.apache.hadoop.conf.Configuration - - -object FileStream { - def main(args: Array[String]) { - if (args.length < 2) { - System.err.println("Usage: FileStream <master> <new HDFS compatible directory>") - System.exit(1) - } - - // Create the context - val ssc = new StreamingContext(args(0), "FileStream", Seconds(1)) - - // Create the new directory - val directory = new Path(args(1)) - val fs = directory.getFileSystem(new Configuration()) - if (fs.exists(directory)) throw new Exception("This directory already exists") - fs.mkdirs(directory) - fs.deleteOnExit(directory) - - // Create the FileInputDStream on the directory and use the - // stream to count words in new files created - val inputStream = ssc.textFileStream(directory.toString) - val words = inputStream.flatMap(_.split(" ")) - val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) - wordCounts.print() - ssc.start() - - // Creating new files in the directory - val text = "This is a text file" - for (i <- 1 to 30) { - ssc.sc.parallelize((1 to (i * 10)).map(_ => text), 10) - .saveAsTextFile(new Path(directory, i.toString).toString) - Thread.sleep(1000) - } - Thread.sleep(5000) // Waiting for the file to be processed - ssc.stop() - System.exit(0) - } -}
\ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala b/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala deleted file mode 100644 index b7bc15a1d5..0000000000 --- a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala +++ /dev/null @@ -1,75 +0,0 @@ -package spark.streaming.examples - -import spark.streaming._ -import spark.streaming.StreamingContext._ -import org.apache.hadoop.fs.Path -import org.apache.hadoop.conf.Configuration - -object FileStreamWithCheckpoint { - - def main(args: Array[String]) { - - if (args.size != 3) { - println("FileStreamWithCheckpoint <master> <directory> <checkpoint dir>") - println("FileStreamWithCheckpoint restart <directory> <checkpoint dir>") - System.exit(-1) - } - - val directory = new Path(args(1)) - val checkpointDir = args(2) - - val ssc: StreamingContext = { - - if (args(0) == "restart") { - - // Recreated streaming context from specified checkpoint file - new StreamingContext(checkpointDir) - - } else { - - // Create directory if it does not exist - val fs = directory.getFileSystem(new Configuration()) - if (!fs.exists(directory)) fs.mkdirs(directory) - - // Create new streaming context - val ssc_ = new StreamingContext(args(0), "FileStreamWithCheckpoint", Seconds(1)) - ssc_.checkpoint(checkpointDir) - - // Setup the streaming computation - val inputStream = ssc_.textFileStream(directory.toString) - val words = inputStream.flatMap(_.split(" ")) - val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) - wordCounts.print() - - ssc_ - } - } - - // Start the stream computation - startFileWritingThread(directory.toString) - ssc.start() - } - - def startFileWritingThread(directory: String) { - - val fs = new Path(directory).getFileSystem(new Configuration()) - - val fileWritingThread = new Thread() { - override def run() { - val r = new scala.util.Random() - val text = "This is a sample text file with a random number " - while(true) { - val number = r.nextInt() - val file = new Path(directory, number.toString) - val fos = fs.create(file) - fos.writeChars(text + number) - fos.close() - println("Created text file " + file) - Thread.sleep(1000) - } - } - } - fileWritingThread.start() - } - -} diff --git a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala deleted file mode 100644 index dfaaf03f03..0000000000 --- a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala +++ /dev/null @@ -1,33 +0,0 @@ -package spark.streaming.examples - -import spark.util.IntParam -import spark.storage.StorageLevel - -import spark.streaming._ -import spark.streaming.StreamingContext._ -import spark.streaming.util.RawTextHelper._ - -object GrepRaw { - def main(args: Array[String]) { - if (args.length != 5) { - System.err.println("Usage: GrepRaw <master> <numStreams> <host> <port> <batchMillis>") - System.exit(1) - } - - val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args - - // Create the context - val ssc = new StreamingContext(master, "GrepRaw", Milliseconds(batchMillis)) - - // Warm up the JVMs on master and slave for JIT compilation to kick in - warmUp(ssc.sc) - - - val rawStreams = (1 to numStreams).map(_ => - ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray - val union = ssc.union(rawStreams) - union.filter(_.contains("Alice")).count().foreach(r => - println("Grep count: " + r.collect().mkString)) - ssc.start() - } -} diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala deleted file mode 100644 index 338834bc3c..0000000000 --- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala +++ /dev/null @@ -1,49 +0,0 @@ -package spark.streaming.examples - -import spark.storage.StorageLevel -import spark.util.IntParam - -import spark.streaming._ -import spark.streaming.StreamingContext._ -import spark.streaming.util.RawTextHelper._ - -import java.util.UUID - -object TopKWordCountRaw { - - def main(args: Array[String]) { - if (args.length != 4) { - System.err.println("Usage: WordCountRaw <master> <# streams> <port> <HDFS checkpoint directory> ") - System.exit(1) - } - - val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args - val k = 10 - - // Create the context, and set the checkpoint directory. - // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts - // periodically to HDFS - val ssc = new StreamingContext(master, "TopKWordCountRaw", Seconds(1)) - ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1)) - - // Warm up the JVMs on master and slave for JIT compilation to kick in - /*warmUp(ssc.sc)*/ - - // Set up the raw network streams that will connect to localhost:port to raw test - // senders on the slaves and generate top K words of last 30 seconds - val lines = (1 to numStreams).map(_ => { - ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2) - }) - val union = ssc.union(lines) - val counts = union.mapPartitions(splitAndCountPartitions) - val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10) - val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k)) - partialTopKWindowedCounts.foreach(rdd => { - val collectedCounts = rdd.collect - println("Collected " + collectedCounts.size + " words from partial top words") - println("Top " + k + " words are " + topK(collectedCounts.toIterator, k).mkString(",")) - }) - - ssc.start() - } -} diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala deleted file mode 100644 index 867a8f42c4..0000000000 --- a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala +++ /dev/null @@ -1,25 +0,0 @@ -package spark.streaming.examples - -import spark.streaming.{Seconds, StreamingContext} -import spark.streaming.StreamingContext._ - -object WordCountHdfs { - def main(args: Array[String]) { - if (args.length < 2) { - System.err.println("Usage: WordCountHdfs <master> <directory>") - System.exit(1) - } - - // Create the context - val ssc = new StreamingContext(args(0), "WordCountHdfs", Seconds(2)) - - // Create the FileInputDStream on the directory and use the - // stream to count words in new files created - val lines = ssc.textFileStream(args(1)) - val words = lines.flatMap(_.split(" ")) - val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) - wordCounts.print() - ssc.start() - } -} - diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala deleted file mode 100644 index d93335a8ce..0000000000 --- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala +++ /dev/null @@ -1,43 +0,0 @@ -package spark.streaming.examples - -import spark.storage.StorageLevel -import spark.util.IntParam - -import spark.streaming._ -import spark.streaming.StreamingContext._ -import spark.streaming.util.RawTextHelper._ - -import java.util.UUID - -object WordCountRaw { - - def main(args: Array[String]) { - if (args.length != 4) { - System.err.println("Usage: WordCountRaw <master> <# streams> <port> <HDFS checkpoint directory> ") - System.exit(1) - } - - val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args - - // Create the context, and set the checkpoint directory. - // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts - // periodically to HDFS - val ssc = new StreamingContext(master, "WordCountRaw", Seconds(1)) - ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1)) - - // Warm up the JVMs on master and slave for JIT compilation to kick in - warmUp(ssc.sc) - - // Set up the raw network streams that will connect to localhost:port to raw test - // senders on the slaves and generate count of words of last 30 seconds - val lines = (1 to numStreams).map(_ => { - ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2) - }) - val union = ssc.union(lines) - val counts = union.mapPartitions(splitAndCountPartitions) - val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10) - windowedCounts.foreach(r => println("# unique words = " + r.count())) - - ssc.start() - } -} diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties index 33bafebaab..edfa1243fa 100644 --- a/streaming/src/test/resources/log4j.properties +++ b/streaming/src/test/resources/log4j.properties @@ -1,8 +1,8 @@ -# Set everything to be logged to the file streaming-tests.log +# Set everything to be logged to the file streaming/target/unit-tests.log log4j.rootCategory=INFO, file log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.append=false -log4j.appender.file.file=streaming-tests.log +log4j.appender.file.file=streaming/target/unit-tests.log log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index 76b528bec3..e71ba6ddc1 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -318,7 +318,7 @@ class TestServer(port: Int) extends Logging { } } } catch { - case e: SocketException => println(e) + case e: SocketException => logError("TestServer error", e) } finally { logInfo("Connection closed") if (!clientSocket.isClosed) clientSocket.close() |