From 7e0271b4387eaf27cd96f3057ce2465b1271a480 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sun, 30 Dec 2012 15:19:55 -0800 Subject: Refactored a whole lot to push all DStreams into the spark.streaming.dstream package. --- core/src/main/scala/spark/rdd/ShuffledRDD.scala | 1 + .../scala/spark/streaming/CoGroupedDStream.scala | 38 --- .../spark/streaming/ConstantInputDStream.scala | 18 -- .../src/main/scala/spark/streaming/DStream.scala | 276 +-------------------- .../main/scala/spark/streaming/DStreamGraph.scala | 1 + .../main/scala/spark/streaming/DataHandler.scala | 83 ------- .../scala/spark/streaming/FileInputDStream.scala | 109 -------- .../scala/spark/streaming/FlumeInputDStream.scala | 130 ---------- .../spark/streaming/NetworkInputDStream.scala | 156 ------------ .../spark/streaming/NetworkInputTracker.scala | 2 + .../spark/streaming/PairDStreamFunctions.scala | 7 +- .../scala/spark/streaming/QueueInputDStream.scala | 40 --- .../scala/spark/streaming/RawInputDStream.scala | 85 ------- .../spark/streaming/ReducedWindowedDStream.scala | 149 ----------- .../src/main/scala/spark/streaming/Scheduler.scala | 3 - .../scala/spark/streaming/SocketInputDStream.scala | 107 -------- .../main/scala/spark/streaming/StateDStream.scala | 84 ------- .../scala/spark/streaming/StreamingContext.scala | 13 +- .../src/main/scala/spark/streaming/Time.scala | 11 +- .../scala/spark/streaming/WindowedDStream.scala | 39 --- .../spark/streaming/dstream/CoGroupedDStream.scala | 39 +++ .../streaming/dstream/ConstantInputDStream.scala | 19 ++ .../spark/streaming/dstream/DataHandler.scala | 83 +++++++ .../spark/streaming/dstream/FileInputDStream.scala | 110 ++++++++ .../spark/streaming/dstream/FilteredDStream.scala | 21 ++ .../streaming/dstream/FlatMapValuedDStream.scala | 20 ++ .../streaming/dstream/FlatMappedDStream.scala | 20 ++ .../streaming/dstream/FlumeInputDStream.scala | 135 ++++++++++ .../spark/streaming/dstream/ForEachDStream.scala | 28 +++ .../spark/streaming/dstream/GlommedDStream.scala | 17 ++ .../spark/streaming/dstream/InputDStream.scala | 19 ++ .../streaming/dstream/KafkaInputDStream.scala | 197 +++++++++++++++ .../streaming/dstream/MapPartitionedDStream.scala | 21 ++ .../spark/streaming/dstream/MapValuedDStream.scala | 21 ++ .../spark/streaming/dstream/MappedDStream.scala | 20 ++ .../streaming/dstream/NetworkInputDStream.scala | 157 ++++++++++++ .../streaming/dstream/QueueInputDStream.scala | 41 +++ .../spark/streaming/dstream/RawInputDStream.scala | 88 +++++++ .../streaming/dstream/ReducedWindowedDStream.scala | 148 +++++++++++ .../spark/streaming/dstream/ShuffledDStream.scala | 27 ++ .../streaming/dstream/SocketInputDStream.scala | 103 ++++++++ .../spark/streaming/dstream/StateDStream.scala | 83 +++++++ .../streaming/dstream/TransformedDStream.scala | 19 ++ .../spark/streaming/dstream/UnionDStream.scala | 39 +++ .../spark/streaming/dstream/WindowedDStream.scala | 40 +++ .../scala/spark/streaming/examples/GrepRaw.scala | 2 +- .../streaming/examples/TopKWordCountRaw.scala | 2 +- .../spark/streaming/examples/WordCountRaw.scala | 2 +- .../spark/streaming/input/KafkaInputDStream.scala | 193 -------------- .../scala/spark/streaming/CheckpointSuite.scala | 2 +- .../test/scala/spark/streaming/FailureSuite.scala | 2 +- .../scala/spark/streaming/InputStreamsSuite.scala | 1 + .../test/scala/spark/streaming/TestSuiteBase.scala | 48 +++- .../spark/streaming/WindowOperationsSuite.scala | 12 +- 54 files changed, 1600 insertions(+), 1531 deletions(-) delete mode 100644 streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala delete mode 100644 streaming/src/main/scala/spark/streaming/ConstantInputDStream.scala delete mode 100644 streaming/src/main/scala/spark/streaming/DataHandler.scala delete mode 100644 streaming/src/main/scala/spark/streaming/FileInputDStream.scala delete mode 100644 streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala delete mode 100644 streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala delete mode 100644 streaming/src/main/scala/spark/streaming/QueueInputDStream.scala delete mode 100644 streaming/src/main/scala/spark/streaming/RawInputDStream.scala delete mode 100644 streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala delete mode 100644 streaming/src/main/scala/spark/streaming/SocketInputDStream.scala delete mode 100644 streaming/src/main/scala/spark/streaming/StateDStream.scala delete mode 100644 streaming/src/main/scala/spark/streaming/WindowedDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/ConstantInputDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/DataHandler.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala delete mode 100644 streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala index f40b56be64..1b219473e0 100644 --- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala @@ -1,6 +1,7 @@ package spark.rdd import spark.{Partitioner, RDD, SparkEnv, ShuffleDependency, Split, TaskContext} +import spark.SparkContext._ private[spark] class ShuffledRDDSplit(val idx: Int) extends Split { override val index = idx diff --git a/streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala b/streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala deleted file mode 100644 index 61d088eddb..0000000000 --- a/streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala +++ /dev/null @@ -1,38 +0,0 @@ -package spark.streaming - -import spark.{RDD, Partitioner} -import spark.rdd.CoGroupedRDD - -class CoGroupedDStream[K : ClassManifest]( - parents: Seq[DStream[(_, _)]], - partitioner: Partitioner - ) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) { - - if (parents.length == 0) { - throw new IllegalArgumentException("Empty array of parents") - } - - if (parents.map(_.ssc).distinct.size > 1) { - throw new IllegalArgumentException("Array of parents have different StreamingContexts") - } - - if (parents.map(_.slideTime).distinct.size > 1) { - throw new IllegalArgumentException("Array of parents have different slide times") - } - - override def dependencies = parents.toList - - override def slideTime = parents.head.slideTime - - override def compute(validTime: Time): Option[RDD[(K, Seq[Seq[_]])]] = { - val part = partitioner - val rdds = parents.flatMap(_.getOrCompute(validTime)) - if (rdds.size > 0) { - val q = new CoGroupedRDD[K](rdds, part) - Some(q) - } else { - None - } - } - -} diff --git a/streaming/src/main/scala/spark/streaming/ConstantInputDStream.scala b/streaming/src/main/scala/spark/streaming/ConstantInputDStream.scala deleted file mode 100644 index 80150708fd..0000000000 --- a/streaming/src/main/scala/spark/streaming/ConstantInputDStream.scala +++ /dev/null @@ -1,18 +0,0 @@ -package spark.streaming - -import spark.RDD - -/** - * An input stream that always returns the same RDD on each timestep. Useful for testing. - */ -class ConstantInputDStream[T: ClassManifest](ssc_ : StreamingContext, rdd: RDD[T]) - extends InputDStream[T](ssc_) { - - override def start() {} - - override def stop() {} - - override def compute(validTime: Time): Option[RDD[T]] = { - Some(rdd) - } -} \ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 3834b57ed3..292ad3b9f9 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -1,17 +1,15 @@ package spark.streaming +import spark.streaming.dstream._ import StreamingContext._ import Time._ -import spark._ -import spark.SparkContext._ -import spark.rdd._ +import spark.{RDD, Logging} import spark.storage.StorageLevel import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap -import java.util.concurrent.ArrayBlockingQueue import java.io.{ObjectInputStream, IOException, ObjectOutputStream} import org.apache.hadoop.fs.Path @@ -197,7 +195,7 @@ abstract class DStream[T: ClassManifest] ( "than " + rememberDuration.milliseconds + " milliseconds. But the Spark's metadata cleanup" + "delay is set to " + (metadataCleanerDelay / 60.0) + " minutes, which is not sufficient. Please set " + "the Java property 'spark.cleaner.delay' to more than " + - math.ceil(rememberDuration.millis.toDouble / 60000.0).toInt + " minutes." + math.ceil(rememberDuration.milliseconds.toDouble / 60000.0).toInt + " minutes." ) dependencies.foreach(_.validate()) @@ -642,271 +640,3 @@ abstract class DStream[T: ClassManifest] ( private[streaming] case class DStreamCheckpointData(rdds: HashMap[Time, Any]) -abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext) - extends DStream[T](ssc_) { - - override def dependencies = List() - - override def slideTime = { - if (ssc == null) throw new Exception("ssc is null") - if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null") - ssc.graph.batchDuration - } - - def start() - - def stop() -} - - -/** - * TODO - */ - -private[streaming] -class MappedDStream[T: ClassManifest, U: ClassManifest] ( - parent: DStream[T], - mapFunc: T => U - ) extends DStream[U](parent.ssc) { - - override def dependencies = List(parent) - - override def slideTime: Time = parent.slideTime - - override def compute(validTime: Time): Option[RDD[U]] = { - parent.getOrCompute(validTime).map(_.map[U](mapFunc)) - } -} - - -/** - * TODO - */ - -private[streaming] -class FlatMappedDStream[T: ClassManifest, U: ClassManifest]( - parent: DStream[T], - flatMapFunc: T => Traversable[U] - ) extends DStream[U](parent.ssc) { - - override def dependencies = List(parent) - - override def slideTime: Time = parent.slideTime - - override def compute(validTime: Time): Option[RDD[U]] = { - parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc)) - } -} - - -/** - * TODO - */ - -private[streaming] -class FilteredDStream[T: ClassManifest]( - parent: DStream[T], - filterFunc: T => Boolean - ) extends DStream[T](parent.ssc) { - - override def dependencies = List(parent) - - override def slideTime: Time = parent.slideTime - - override def compute(validTime: Time): Option[RDD[T]] = { - parent.getOrCompute(validTime).map(_.filter(filterFunc)) - } -} - - -/** - * TODO - */ - -private[streaming] -class MapPartitionedDStream[T: ClassManifest, U: ClassManifest]( - parent: DStream[T], - mapPartFunc: Iterator[T] => Iterator[U], - preservePartitioning: Boolean - ) extends DStream[U](parent.ssc) { - - override def dependencies = List(parent) - - override def slideTime: Time = parent.slideTime - - override def compute(validTime: Time): Option[RDD[U]] = { - parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning)) - } -} - - -/** - * TODO - */ - -private[streaming] -class GlommedDStream[T: ClassManifest](parent: DStream[T]) - extends DStream[Array[T]](parent.ssc) { - - override def dependencies = List(parent) - - override def slideTime: Time = parent.slideTime - - override def compute(validTime: Time): Option[RDD[Array[T]]] = { - parent.getOrCompute(validTime).map(_.glom()) - } -} - - -/** - * TODO - */ - -private[streaming] -class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest]( - parent: DStream[(K,V)], - createCombiner: V => C, - mergeValue: (C, V) => C, - mergeCombiner: (C, C) => C, - partitioner: Partitioner - ) extends DStream [(K,C)] (parent.ssc) { - - override def dependencies = List(parent) - - override def slideTime: Time = parent.slideTime - - override def compute(validTime: Time): Option[RDD[(K,C)]] = { - parent.getOrCompute(validTime) match { - case Some(rdd) => - Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner)) - case None => None - } - } -} - - -/** - * TODO - */ - -private[streaming] -class MapValuesDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]( - parent: DStream[(K, V)], - mapValueFunc: V => U - ) extends DStream[(K, U)](parent.ssc) { - - override def dependencies = List(parent) - - override def slideTime: Time = parent.slideTime - - override def compute(validTime: Time): Option[RDD[(K, U)]] = { - parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc)) - } -} - - -/** - * TODO - */ -private[streaming] -class FlatMapValuesDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]( - parent: DStream[(K, V)], - flatMapValueFunc: V => TraversableOnce[U] - ) extends DStream[(K, U)](parent.ssc) { - - override def dependencies = List(parent) - - override def slideTime: Time = parent.slideTime - - override def compute(validTime: Time): Option[RDD[(K, U)]] = { - parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc)) - } -} - - - -/** - * TODO - */ - -class UnionDStream[T: ClassManifest](parents: Array[DStream[T]]) - extends DStream[T](parents.head.ssc) { - - if (parents.length == 0) { - throw new IllegalArgumentException("Empty array of parents") - } - - if (parents.map(_.ssc).distinct.size > 1) { - throw new IllegalArgumentException("Array of parents have different StreamingContexts") - } - - if (parents.map(_.slideTime).distinct.size > 1) { - throw new IllegalArgumentException("Array of parents have different slide times") - } - - override def dependencies = parents.toList - - override def slideTime: Time = parents.head.slideTime - - override def compute(validTime: Time): Option[RDD[T]] = { - val rdds = new ArrayBuffer[RDD[T]]() - parents.map(_.getOrCompute(validTime)).foreach(_ match { - case Some(rdd) => rdds += rdd - case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime) - }) - if (rdds.size > 0) { - Some(new UnionRDD(ssc.sc, rdds)) - } else { - None - } - } -} - - -/** - * TODO - */ - -private[streaming] -class ForEachDStream[T: ClassManifest] ( - parent: DStream[T], - foreachFunc: (RDD[T], Time) => Unit - ) extends DStream[Unit](parent.ssc) { - - override def dependencies = List(parent) - - override def slideTime: Time = parent.slideTime - - override def compute(validTime: Time): Option[RDD[Unit]] = None - - override def generateJob(time: Time): Option[Job] = { - parent.getOrCompute(time) match { - case Some(rdd) => - val jobFunc = () => { - foreachFunc(rdd, time) - } - Some(new Job(time, jobFunc)) - case None => None - } - } -} - - -/** - * TODO - */ - -private[streaming] -class TransformedDStream[T: ClassManifest, U: ClassManifest] ( - parent: DStream[T], - transformFunc: (RDD[T], Time) => RDD[U] - ) extends DStream[U](parent.ssc) { - - override def dependencies = List(parent) - - override def slideTime: Time = parent.slideTime - - override def compute(validTime: Time): Option[RDD[U]] = { - parent.getOrCompute(validTime).map(transformFunc(_, validTime)) - } -} diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala index d0a9ade61d..c72429370e 100644 --- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala @@ -1,5 +1,6 @@ package spark.streaming +import dstream.InputDStream import java.io.{ObjectInputStream, IOException, ObjectOutputStream} import collection.mutable.ArrayBuffer import spark.Logging diff --git a/streaming/src/main/scala/spark/streaming/DataHandler.scala b/streaming/src/main/scala/spark/streaming/DataHandler.scala deleted file mode 100644 index 05f307a8d1..0000000000 --- a/streaming/src/main/scala/spark/streaming/DataHandler.scala +++ /dev/null @@ -1,83 +0,0 @@ -package spark.streaming - -import java.util.concurrent.ArrayBlockingQueue -import scala.collection.mutable.ArrayBuffer -import spark.Logging -import spark.streaming.util.{RecurringTimer, SystemClock} -import spark.storage.StorageLevel - - -/** - * This is a helper object that manages the data received from the socket. It divides - * the object received into small batches of 100s of milliseconds, pushes them as - * blocks into the block manager and reports the block IDs to the network input - * tracker. It starts two threads, one to periodically start a new batch and prepare - * the previous batch of as a block, the other to push the blocks into the block - * manager. - */ - class DataHandler[T](receiver: NetworkReceiver[T], storageLevel: StorageLevel) - extends Serializable with Logging { - - case class Block(id: String, iterator: Iterator[T], metadata: Any = null) - - val clock = new SystemClock() - val blockInterval = 200L - val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer) - val blockStorageLevel = storageLevel - val blocksForPushing = new ArrayBlockingQueue[Block](1000) - val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } } - - var currentBuffer = new ArrayBuffer[T] - - def createBlock(blockId: String, iterator: Iterator[T]) : Block = { - new Block(blockId, iterator) - } - - def start() { - blockIntervalTimer.start() - blockPushingThread.start() - logInfo("Data handler started") - } - - def stop() { - blockIntervalTimer.stop() - blockPushingThread.interrupt() - logInfo("Data handler stopped") - } - - def += (obj: T) { - currentBuffer += obj - } - - def updateCurrentBuffer(time: Long) { - try { - val newBlockBuffer = currentBuffer - currentBuffer = new ArrayBuffer[T] - if (newBlockBuffer.size > 0) { - val blockId = "input-" + receiver.streamId + "- " + (time - blockInterval) - val newBlock = createBlock(blockId, newBlockBuffer.toIterator) - blocksForPushing.add(newBlock) - } - } catch { - case ie: InterruptedException => - logInfo("Block interval timer thread interrupted") - case e: Exception => - receiver.stop() - } - } - - def keepPushingBlocks() { - logInfo("Block pushing thread started") - try { - while(true) { - val block = blocksForPushing.take() - receiver.pushBlock(block.id, block.iterator, block.metadata, storageLevel) - } - } catch { - case ie: InterruptedException => - logInfo("Block pushing thread interrupted") - case e: Exception => - receiver.stop() - } - } - } \ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/FileInputDStream.scala deleted file mode 100644 index 88856364d2..0000000000 --- a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala +++ /dev/null @@ -1,109 +0,0 @@ -package spark.streaming - -import spark.RDD -import spark.rdd.UnionRDD - -import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} - -import scala.collection.mutable.HashSet - - -class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest]( - @transient ssc_ : StreamingContext, - directory: String, - filter: PathFilter = FileInputDStream.defaultPathFilter, - newFilesOnly: Boolean = true) - extends InputDStream[(K, V)](ssc_) { - - @transient private var path_ : Path = null - @transient private var fs_ : FileSystem = null - - var lastModTime = 0L - val lastModTimeFiles = new HashSet[String]() - - def path(): Path = { - if (path_ == null) path_ = new Path(directory) - path_ - } - - def fs(): FileSystem = { - if (fs_ == null) fs_ = path.getFileSystem(new Configuration()) - fs_ - } - - override def start() { - if (newFilesOnly) { - lastModTime = System.currentTimeMillis() - } else { - lastModTime = 0 - } - } - - override def stop() { } - - /** - * Finds the files that were modified since the last time this method was called and makes - * a union RDD out of them. Note that this maintains the list of files that were processed - * in the latest modification time in the previous call to this method. This is because the - * modification time returned by the FileStatus API seems to return times only at the - * granularity of seconds. Hence, new files may have the same modification time as the - * latest modification time in the previous call to this method and the list of files - * maintained is used to filter the one that have been processed. - */ - override def compute(validTime: Time): Option[RDD[(K, V)]] = { - // Create the filter for selecting new files - val newFilter = new PathFilter() { - var latestModTime = 0L - val latestModTimeFiles = new HashSet[String]() - - def accept(path: Path): Boolean = { - if (!filter.accept(path)) { - return false - } else { - val modTime = fs.getFileStatus(path).getModificationTime() - if (modTime < lastModTime){ - return false - } else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) { - return false - } - if (modTime > latestModTime) { - latestModTime = modTime - latestModTimeFiles.clear() - } - latestModTimeFiles += path.toString - return true - } - } - } - - val newFiles = fs.listStatus(path, newFilter) - logInfo("New files: " + newFiles.map(_.getPath).mkString(", ")) - if (newFiles.length > 0) { - // Update the modification time and the files processed for that modification time - if (lastModTime != newFilter.latestModTime) { - lastModTime = newFilter.latestModTime - lastModTimeFiles.clear() - } - lastModTimeFiles ++= newFilter.latestModTimeFiles - } - val newRDD = new UnionRDD(ssc.sc, newFiles.map( - file => ssc.sc.newAPIHadoopFile[K, V, F](file.getPath.toString))) - Some(newRDD) - } -} - -object FileInputDStream { - val defaultPathFilter = new PathFilter with Serializable { - def accept(path: Path): Boolean = { - val file = path.getName() - if (file.startsWith(".") || file.endsWith("_tmp")) { - return false - } else { - return true - } - } - } -} - diff --git a/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala deleted file mode 100644 index 5ac7e5b08e..0000000000 --- a/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala +++ /dev/null @@ -1,130 +0,0 @@ -package spark.streaming - -import java.io.{ObjectInput, ObjectOutput, Externalizable} -import spark.storage.StorageLevel -import org.apache.flume.source.avro.AvroSourceProtocol -import org.apache.flume.source.avro.AvroFlumeEvent -import org.apache.flume.source.avro.Status -import org.apache.avro.ipc.specific.SpecificResponder -import org.apache.avro.ipc.NettyServer -import java.net.InetSocketAddress -import collection.JavaConversions._ -import spark.Utils -import java.nio.ByteBuffer - -class FlumeInputDStream[T: ClassManifest]( - @transient ssc_ : StreamingContext, - host: String, - port: Int, - storageLevel: StorageLevel -) extends NetworkInputDStream[SparkFlumeEvent](ssc_) { - - override def createReceiver(): NetworkReceiver[SparkFlumeEvent] = { - new FlumeReceiver(id, host, port, storageLevel) - } -} - -/** - * A wrapper class for AvroFlumeEvent's with a custom serialization format. - * - * This is necessary because AvroFlumeEvent uses inner data structures - * which are not serializable. - */ -class SparkFlumeEvent() extends Externalizable { - var event : AvroFlumeEvent = new AvroFlumeEvent() - - /* De-serialize from bytes. */ - def readExternal(in: ObjectInput) { - val bodyLength = in.readInt() - val bodyBuff = new Array[Byte](bodyLength) - in.read(bodyBuff) - - val numHeaders = in.readInt() - val headers = new java.util.HashMap[CharSequence, CharSequence] - - for (i <- 0 until numHeaders) { - val keyLength = in.readInt() - val keyBuff = new Array[Byte](keyLength) - in.read(keyBuff) - val key : String = Utils.deserialize(keyBuff) - - val valLength = in.readInt() - val valBuff = new Array[Byte](valLength) - in.read(valBuff) - val value : String = Utils.deserialize(valBuff) - - headers.put(key, value) - } - - event.setBody(ByteBuffer.wrap(bodyBuff)) - event.setHeaders(headers) - } - - /* Serialize to bytes. */ - def writeExternal(out: ObjectOutput) { - val body = event.getBody.array() - out.writeInt(body.length) - out.write(body) - - val numHeaders = event.getHeaders.size() - out.writeInt(numHeaders) - for ((k, v) <- event.getHeaders) { - val keyBuff = Utils.serialize(k.toString) - out.writeInt(keyBuff.length) - out.write(keyBuff) - val valBuff = Utils.serialize(v.toString) - out.writeInt(valBuff.length) - out.write(valBuff) - } - } -} - -private[streaming] object SparkFlumeEvent { - def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = { - val event = new SparkFlumeEvent - event.event = in - event - } -} - -/** A simple server that implements Flume's Avro protocol. */ -class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol { - override def append(event : AvroFlumeEvent) : Status = { - receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event) - Status.OK - } - - override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = { - events.foreach (event => - receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event)) - Status.OK - } -} - -/** A NetworkReceiver which listens for events using the - * Flume Avro interface.*/ -class FlumeReceiver( - streamId: Int, - host: String, - port: Int, - storageLevel: StorageLevel - ) extends NetworkReceiver[SparkFlumeEvent](streamId) { - - lazy val dataHandler = new DataHandler(this, storageLevel) - - protected override def onStart() { - val responder = new SpecificResponder( - classOf[AvroSourceProtocol], new FlumeEventServer(this)); - val server = new NettyServer(responder, new InetSocketAddress(host, port)); - dataHandler.start() - server.start() - logInfo("Flume receiver started") - } - - protected override def onStop() { - dataHandler.stop() - logInfo("Flume receiver stopped") - } - - override def getLocationPreference = Some(host) -} \ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala deleted file mode 100644 index 4bf13dd50c..0000000000 --- a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala +++ /dev/null @@ -1,156 +0,0 @@ -package spark.streaming - -import scala.collection.mutable.ArrayBuffer - -import spark.{Logging, SparkEnv, RDD} -import spark.rdd.BlockRDD -import spark.streaming.util.{RecurringTimer, SystemClock} -import spark.storage.StorageLevel - -import java.nio.ByteBuffer - -import akka.actor.{Props, Actor} -import akka.pattern.ask -import akka.dispatch.Await -import akka.util.duration._ - -abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : StreamingContext) - extends InputDStream[T](ssc_) { - - // This is an unique identifier that is used to match the network receiver with the - // corresponding network input stream. - val id = ssc.getNewNetworkStreamId() - - /** - * This method creates the receiver object that will be sent to the workers - * to receive data. This method needs to defined by any specific implementation - * of a NetworkInputDStream. - */ - def createReceiver(): NetworkReceiver[T] - - // Nothing to start or stop as both taken care of by the NetworkInputTracker. - def start() {} - - def stop() {} - - override def compute(validTime: Time): Option[RDD[T]] = { - val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime) - Some(new BlockRDD[T](ssc.sc, blockIds)) - } -} - - -private[streaming] sealed trait NetworkReceiverMessage -private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage -private[streaming] case class ReportBlock(blockId: String, metadata: Any) extends NetworkReceiverMessage -private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage - -abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Serializable with Logging { - - initLogging() - - lazy protected val env = SparkEnv.get - - lazy protected val actor = env.actorSystem.actorOf( - Props(new NetworkReceiverActor()), "NetworkReceiver-" + streamId) - - lazy protected val receivingThread = Thread.currentThread() - - /** This method will be called to start receiving data. */ - protected def onStart() - - /** This method will be called to stop receiving data. */ - protected def onStop() - - /** This method conveys a placement preference (hostname) for this receiver. */ - def getLocationPreference() : Option[String] = None - - /** - * This method starts the receiver. First is accesses all the lazy members to - * materialize them. Then it calls the user-defined onStart() method to start - * other threads, etc required to receiver the data. - */ - def start() { - try { - // Access the lazy vals to materialize them - env - actor - receivingThread - - // Call user-defined onStart() - onStart() - } catch { - case ie: InterruptedException => - logInfo("Receiving thread interrupted") - //println("Receiving thread interrupted") - case e: Exception => - stopOnError(e) - } - } - - /** - * This method stops the receiver. First it interrupts the main receiving thread, - * that is, the thread that called receiver.start(). Then it calls the user-defined - * onStop() method to stop other threads and/or do cleanup. - */ - def stop() { - receivingThread.interrupt() - onStop() - //TODO: terminate the actor - } - - /** - * This method stops the receiver and reports to exception to the tracker. - * This should be called whenever an exception has happened on any thread - * of the receiver. - */ - protected def stopOnError(e: Exception) { - logError("Error receiving data", e) - stop() - actor ! ReportError(e.toString) - } - - - /** - * This method pushes a block (as iterator of values) into the block manager. - */ - def pushBlock(blockId: String, iterator: Iterator[T], metadata: Any, level: StorageLevel) { - val buffer = new ArrayBuffer[T] ++ iterator - env.blockManager.put(blockId, buffer.asInstanceOf[ArrayBuffer[Any]], level) - - actor ! ReportBlock(blockId, metadata) - } - - /** - * This method pushes a block (as bytes) into the block manager. - */ - def pushBlock(blockId: String, bytes: ByteBuffer, metadata: Any, level: StorageLevel) { - env.blockManager.putBytes(blockId, bytes, level) - actor ! ReportBlock(blockId, metadata) - } - - /** A helper actor that communicates with the NetworkInputTracker */ - private class NetworkReceiverActor extends Actor { - logInfo("Attempting to register with tracker") - val ip = System.getProperty("spark.master.host", "localhost") - val port = System.getProperty("spark.master.port", "7077").toInt - val url = "akka://spark@%s:%s/user/NetworkInputTracker".format(ip, port) - val tracker = env.actorSystem.actorFor(url) - val timeout = 5.seconds - - override def preStart() { - val future = tracker.ask(RegisterReceiver(streamId, self))(timeout) - Await.result(future, timeout) - } - - override def receive() = { - case ReportBlock(blockId, metadata) => - tracker ! AddBlocks(streamId, Array(blockId), metadata) - case ReportError(msg) => - tracker ! DeregisterReceiver(streamId, msg) - case StopReceiver(msg) => - stop() - tracker ! DeregisterReceiver(streamId, msg) - } - } -} diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala index 658498dfc1..a6ab44271f 100644 --- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala +++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala @@ -1,5 +1,7 @@ package spark.streaming +import spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver} +import spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError} import spark.Logging import spark.SparkEnv diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala index f9fef14196..b0a208e67f 100644 --- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala @@ -1,6 +1,9 @@ package spark.streaming import spark.streaming.StreamingContext._ +import spark.streaming.dstream.{ReducedWindowedDStream, StateDStream} +import spark.streaming.dstream.{CoGroupedDStream, ShuffledDStream} +import spark.streaming.dstream.{MapValuedDStream, FlatMapValuedDStream} import spark.{Manifests, RDD, Partitioner, HashPartitioner} import spark.SparkContext._ @@ -218,13 +221,13 @@ extends Serializable { def mapValues[U: ClassManifest](mapValuesFunc: V => U): DStream[(K, U)] = { - new MapValuesDStream[K, V, U](self, mapValuesFunc) + new MapValuedDStream[K, V, U](self, mapValuesFunc) } def flatMapValues[U: ClassManifest]( flatMapValuesFunc: V => TraversableOnce[U] ): DStream[(K, U)] = { - new FlatMapValuesDStream[K, V, U](self, flatMapValuesFunc) + new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc) } def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = { diff --git a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala deleted file mode 100644 index bb86e51932..0000000000 --- a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala +++ /dev/null @@ -1,40 +0,0 @@ -package spark.streaming - -import spark.RDD -import spark.rdd.UnionRDD - -import scala.collection.mutable.Queue -import scala.collection.mutable.ArrayBuffer - -class QueueInputDStream[T: ClassManifest]( - @transient ssc: StreamingContext, - val queue: Queue[RDD[T]], - oneAtATime: Boolean, - defaultRDD: RDD[T] - ) extends InputDStream[T](ssc) { - - override def start() { } - - override def stop() { } - - override def compute(validTime: Time): Option[RDD[T]] = { - val buffer = new ArrayBuffer[RDD[T]]() - if (oneAtATime && queue.size > 0) { - buffer += queue.dequeue() - } else { - buffer ++= queue - } - if (buffer.size > 0) { - if (oneAtATime) { - Some(buffer.first) - } else { - Some(new UnionRDD(ssc.sc, buffer.toSeq)) - } - } else if (defaultRDD != null) { - Some(defaultRDD) - } else { - None - } - } - -} diff --git a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/RawInputDStream.scala deleted file mode 100644 index 6acaa9aab1..0000000000 --- a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala +++ /dev/null @@ -1,85 +0,0 @@ -package spark.streaming - -import java.net.InetSocketAddress -import java.nio.ByteBuffer -import java.nio.channels.{ReadableByteChannel, SocketChannel} -import java.io.EOFException -import java.util.concurrent.ArrayBlockingQueue -import spark._ -import spark.storage.StorageLevel - -/** - * An input stream that reads blocks of serialized objects from a given network address. - * The blocks will be inserted directly into the block store. This is the fastest way to get - * data into Spark Streaming, though it requires the sender to batch data and serialize it - * in the format that the system is configured with. - */ -class RawInputDStream[T: ClassManifest]( - @transient ssc_ : StreamingContext, - host: String, - port: Int, - storageLevel: StorageLevel - ) extends NetworkInputDStream[T](ssc_ ) with Logging { - - def createReceiver(): NetworkReceiver[T] = { - new RawNetworkReceiver(id, host, port, storageLevel).asInstanceOf[NetworkReceiver[T]] - } -} - -class RawNetworkReceiver(streamId: Int, host: String, port: Int, storageLevel: StorageLevel) - extends NetworkReceiver[Any](streamId) { - - var blockPushingThread: Thread = null - - override def getLocationPreference = None - - def onStart() { - // Open a socket to the target address and keep reading from it - logInfo("Connecting to " + host + ":" + port) - val channel = SocketChannel.open() - channel.configureBlocking(true) - channel.connect(new InetSocketAddress(host, port)) - logInfo("Connected to " + host + ":" + port) - - val queue = new ArrayBlockingQueue[ByteBuffer](2) - - blockPushingThread = new DaemonThread { - override def run() { - var nextBlockNumber = 0 - while (true) { - val buffer = queue.take() - val blockId = "input-" + streamId + "-" + nextBlockNumber - nextBlockNumber += 1 - pushBlock(blockId, buffer, null, storageLevel) - } - } - } - blockPushingThread.start() - - val lengthBuffer = ByteBuffer.allocate(4) - while (true) { - lengthBuffer.clear() - readFully(channel, lengthBuffer) - lengthBuffer.flip() - val length = lengthBuffer.getInt() - val dataBuffer = ByteBuffer.allocate(length) - readFully(channel, dataBuffer) - dataBuffer.flip() - logInfo("Read a block with " + length + " bytes") - queue.put(dataBuffer) - } - } - - def onStop() { - if (blockPushingThread != null) blockPushingThread.interrupt() - } - - /** Read a buffer fully from a given Channel */ - private def readFully(channel: ReadableByteChannel, dest: ByteBuffer) { - while (dest.position < dest.limit) { - if (channel.read(dest) == -1) { - throw new EOFException("End of channel") - } - } - } -} diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala deleted file mode 100644 index f63a9e0011..0000000000 --- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala +++ /dev/null @@ -1,149 +0,0 @@ -package spark.streaming - -import spark.streaming.StreamingContext._ - -import spark.RDD -import spark.rdd.UnionRDD -import spark.rdd.CoGroupedRDD -import spark.Partitioner -import spark.SparkContext._ -import spark.storage.StorageLevel - -import scala.collection.mutable.ArrayBuffer -import collection.SeqProxy - -class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( - parent: DStream[(K, V)], - reduceFunc: (V, V) => V, - invReduceFunc: (V, V) => V, - _windowTime: Time, - _slideTime: Time, - partitioner: Partitioner - ) extends DStream[(K,V)](parent.ssc) { - - assert(_windowTime.isMultipleOf(parent.slideTime), - "The window duration of ReducedWindowedDStream (" + _slideTime + ") " + - "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")" - ) - - assert(_slideTime.isMultipleOf(parent.slideTime), - "The slide duration of ReducedWindowedDStream (" + _slideTime + ") " + - "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")" - ) - - // Reduce each batch of data using reduceByKey which will be further reduced by window - // by ReducedWindowedDStream - val reducedStream = parent.reduceByKey(reduceFunc, partitioner) - - // Persist RDDs to memory by default as these RDDs are going to be reused. - super.persist(StorageLevel.MEMORY_ONLY_SER) - reducedStream.persist(StorageLevel.MEMORY_ONLY_SER) - - def windowTime: Time = _windowTime - - override def dependencies = List(reducedStream) - - override def slideTime: Time = _slideTime - - override val mustCheckpoint = true - - override def parentRememberDuration: Time = rememberDuration + windowTime - - override def persist(storageLevel: StorageLevel): DStream[(K,V)] = { - super.persist(storageLevel) - reducedStream.persist(storageLevel) - this - } - - override def checkpoint(interval: Time): DStream[(K, V)] = { - super.checkpoint(interval) - //reducedStream.checkpoint(interval) - this - } - - override def compute(validTime: Time): Option[RDD[(K, V)]] = { - val reduceF = reduceFunc - val invReduceF = invReduceFunc - - val currentTime = validTime - val currentWindow = Interval(currentTime - windowTime + parent.slideTime, currentTime) - val previousWindow = currentWindow - slideTime - - logDebug("Window time = " + windowTime) - logDebug("Slide time = " + slideTime) - logDebug("ZeroTime = " + zeroTime) - logDebug("Current window = " + currentWindow) - logDebug("Previous window = " + previousWindow) - - // _____________________________ - // | previous window _________|___________________ - // |___________________| current window | --------------> Time - // |_____________________________| - // - // |________ _________| |________ _________| - // | | - // V V - // old RDDs new RDDs - // - - // Get the RDDs of the reduced values in "old time steps" - val oldRDDs = reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideTime) - logDebug("# old RDDs = " + oldRDDs.size) - - // Get the RDDs of the reduced values in "new time steps" - val newRDDs = reducedStream.slice(previousWindow.endTime + parent.slideTime, currentWindow.endTime) - logDebug("# new RDDs = " + newRDDs.size) - - // Get the RDD of the reduced value of the previous window - val previousWindowRDD = getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]())) - - // Make the list of RDDs that needs to cogrouped together for reducing their reduced values - val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs - - // Cogroup the reduced RDDs and merge the reduced values - val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner) - //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _ - - val numOldValues = oldRDDs.size - val numNewValues = newRDDs.size - - val mergeValues = (seqOfValues: Seq[Seq[V]]) => { - if (seqOfValues.size != 1 + numOldValues + numNewValues) { - throw new Exception("Unexpected number of sequences of reduced values") - } - // Getting reduced values "old time steps" that will be removed from current window - val oldValues = (1 to numOldValues).map(i => seqOfValues(i)).filter(!_.isEmpty).map(_.head) - // Getting reduced values "new time steps" - val newValues = (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head) - if (seqOfValues(0).isEmpty) { - // If previous window's reduce value does not exist, then at least new values should exist - if (newValues.isEmpty) { - throw new Exception("Neither previous window has value for key, nor new values found. " + - "Are you sure your key class hashes consistently?") - } - // Reduce the new values - newValues.reduce(reduceF) // return - } else { - // Get the previous window's reduced value - var tempValue = seqOfValues(0).head - // If old values exists, then inverse reduce then from previous value - if (!oldValues.isEmpty) { - tempValue = invReduceF(tempValue, oldValues.reduce(reduceF)) - } - // If new values exists, then reduce them with previous value - if (!newValues.isEmpty) { - tempValue = reduceF(tempValue, newValues.reduce(reduceF)) - } - tempValue // return - } - } - - val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValues) - - Some(mergedValuesRDD) - } - - -} - - diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala index fd1fa77a24..aeb7c3eb0e 100644 --- a/streaming/src/main/scala/spark/streaming/Scheduler.scala +++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala @@ -4,9 +4,6 @@ import util.{ManualClock, RecurringTimer, Clock} import spark.SparkEnv import spark.Logging -import scala.collection.mutable.HashMap - - private[streaming] class Scheduler(ssc: StreamingContext) extends Logging { diff --git a/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala deleted file mode 100644 index a9e37c0ff0..0000000000 --- a/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala +++ /dev/null @@ -1,107 +0,0 @@ -package spark.streaming - -import spark.streaming.util.{RecurringTimer, SystemClock} -import spark.storage.StorageLevel - -import java.io._ -import java.net.Socket -import java.util.concurrent.ArrayBlockingQueue - -import scala.collection.mutable.ArrayBuffer -import scala.Serializable - -class SocketInputDStream[T: ClassManifest]( - @transient ssc_ : StreamingContext, - host: String, - port: Int, - bytesToObjects: InputStream => Iterator[T], - storageLevel: StorageLevel - ) extends NetworkInputDStream[T](ssc_) { - - def createReceiver(): NetworkReceiver[T] = { - new SocketReceiver(id, host, port, bytesToObjects, storageLevel) - } -} - - -class SocketReceiver[T: ClassManifest]( - streamId: Int, - host: String, - port: Int, - bytesToObjects: InputStream => Iterator[T], - storageLevel: StorageLevel - ) extends NetworkReceiver[T](streamId) { - - lazy protected val dataHandler = new DataHandler(this, storageLevel) - - override def getLocationPreference = None - - protected def onStart() { - logInfo("Connecting to " + host + ":" + port) - val socket = new Socket(host, port) - logInfo("Connected to " + host + ":" + port) - dataHandler.start() - val iterator = bytesToObjects(socket.getInputStream()) - while(iterator.hasNext) { - val obj = iterator.next - dataHandler += obj - } - } - - protected def onStop() { - dataHandler.stop() - } - -} - - -object SocketReceiver { - - /** - * This methods translates the data from an inputstream (say, from a socket) - * to '\n' delimited strings and returns an iterator to access the strings. - */ - def bytesToLines(inputStream: InputStream): Iterator[String] = { - val dataInputStream = new BufferedReader(new InputStreamReader(inputStream, "UTF-8")) - - val iterator = new Iterator[String] { - var gotNext = false - var finished = false - var nextValue: String = null - - private def getNext() { - try { - nextValue = dataInputStream.readLine() - if (nextValue == null) { - finished = true - } - } - gotNext = true - } - - override def hasNext: Boolean = { - if (!finished) { - if (!gotNext) { - getNext() - if (finished) { - dataInputStream.close() - } - } - } - !finished - } - - override def next(): String = { - if (finished) { - throw new NoSuchElementException("End of stream") - } - if (!gotNext) { - getNext() - } - gotNext = false - nextValue - } - } - iterator - } -} diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/StateDStream.scala deleted file mode 100644 index b7e4c1c30c..0000000000 --- a/streaming/src/main/scala/spark/streaming/StateDStream.scala +++ /dev/null @@ -1,84 +0,0 @@ -package spark.streaming - -import spark.RDD -import spark.rdd.BlockRDD -import spark.Partitioner -import spark.rdd.MapPartitionsRDD -import spark.SparkContext._ -import spark.storage.StorageLevel - -class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest]( - parent: DStream[(K, V)], - updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], - partitioner: Partitioner, - preservePartitioning: Boolean - ) extends DStream[(K, S)](parent.ssc) { - - super.persist(StorageLevel.MEMORY_ONLY_SER) - - override def dependencies = List(parent) - - override def slideTime = parent.slideTime - - override val mustCheckpoint = true - - override def compute(validTime: Time): Option[RDD[(K, S)]] = { - - // Try to get the previous state RDD - getOrCompute(validTime - slideTime) match { - - case Some(prevStateRDD) => { // If previous state RDD exists - - // Try to get the parent RDD - parent.getOrCompute(validTime) match { - case Some(parentRDD) => { // If parent RDD exists, then compute as usual - - // Define the function for the mapPartition operation on cogrouped RDD; - // first map the cogrouped tuple to tuples of required type, - // and then apply the update function - val updateFuncLocal = updateFunc - val finalFunc = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => { - val i = iterator.map(t => { - (t._1, t._2._1, t._2._2.headOption) - }) - updateFuncLocal(i) - } - val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner) - val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning) - //logDebug("Generating state RDD for time " + validTime) - return Some(stateRDD) - } - case None => { // If parent RDD does not exist, then return old state RDD - return Some(prevStateRDD) - } - } - } - - case None => { // If previous session RDD does not exist (first input data) - - // Try to get the parent RDD - parent.getOrCompute(validTime) match { - case Some(parentRDD) => { // If parent RDD exists, then compute as usual - - // Define the function for the mapPartition operation on grouped RDD; - // first map the grouped tuple to tuples of required type, - // and then apply the update function - val updateFuncLocal = updateFunc - val finalFunc = (iterator: Iterator[(K, Seq[V])]) => { - updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2, None))) - } - - val groupedRDD = parentRDD.groupByKey(partitioner) - val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning) - //logDebug("Generating state RDD for time " + validTime + " (first)") - return Some(sessionRDD) - } - case None => { // If parent RDD does not exist, then nothing to do! - //logDebug("Not generating state RDD (no previous state, no parent)") - return None - } - } - } - } - } -} diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 998fea849f..ef73049a81 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -1,10 +1,10 @@ package spark.streaming -import spark.RDD -import spark.Logging -import spark.SparkEnv -import spark.SparkContext +import spark.streaming.dstream._ + +import spark.{RDD, Logging, SparkEnv, SparkContext} import spark.storage.StorageLevel +import spark.util.MetadataCleaner import scala.collection.mutable.Queue @@ -18,7 +18,6 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.flume.source.avro.AvroFlumeEvent import org.apache.hadoop.fs.Path import java.util.UUID -import spark.util.MetadataCleaner /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -126,7 +125,7 @@ class StreamingContext private ( /** * Create an input stream that pulls messages form a Kafka Broker. * - * @param host Zookeper hostname. + * @param hostname Zookeper hostname. * @param port Zookeper port. * @param groupId The group id for this consumer. * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed @@ -319,7 +318,7 @@ object StreamingContext { protected[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = { if (prefix == null) { - time.millis.toString + time.milliseconds.toString } else if (suffix == null || suffix.length ==0) { prefix + "-" + time.milliseconds } else { diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala index 480d292d7c..2976e5e87b 100644 --- a/streaming/src/main/scala/spark/streaming/Time.scala +++ b/streaming/src/main/scala/spark/streaming/Time.scala @@ -1,6 +1,11 @@ package spark.streaming -case class Time(millis: Long) { +/** + * This class is simple wrapper class that represents time in UTC. + * @param millis Time in UTC long + */ + +case class Time(private val millis: Long) { def < (that: Time): Boolean = (this.millis < that.millis) @@ -15,7 +20,9 @@ case class Time(millis: Long) { def - (that: Time): Time = Time(millis - that.millis) def * (times: Int): Time = Time(millis * times) - + + def / (that: Time): Long = millis / that.millis + def floor(that: Time): Time = { val t = that.millis val m = math.floor(this.millis / t).toLong diff --git a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala deleted file mode 100644 index e4d2a634f5..0000000000 --- a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala +++ /dev/null @@ -1,39 +0,0 @@ -package spark.streaming - -import spark.RDD -import spark.rdd.UnionRDD -import spark.storage.StorageLevel - - -class WindowedDStream[T: ClassManifest]( - parent: DStream[T], - _windowTime: Time, - _slideTime: Time) - extends DStream[T](parent.ssc) { - - if (!_windowTime.isMultipleOf(parent.slideTime)) - throw new Exception("The window duration of WindowedDStream (" + _slideTime + ") " + - "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")") - - if (!_slideTime.isMultipleOf(parent.slideTime)) - throw new Exception("The slide duration of WindowedDStream (" + _slideTime + ") " + - "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")") - - parent.persist(StorageLevel.MEMORY_ONLY_SER) - - def windowTime: Time = _windowTime - - override def dependencies = List(parent) - - override def slideTime: Time = _slideTime - - override def parentRememberDuration: Time = rememberDuration + windowTime - - override def compute(validTime: Time): Option[RDD[T]] = { - val currentWindow = Interval(validTime - windowTime + parent.slideTime, validTime) - Some(new UnionRDD(ssc.sc, parent.slice(currentWindow))) - } -} - - - diff --git a/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala new file mode 100644 index 0000000000..2e427dadf7 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala @@ -0,0 +1,39 @@ +package spark.streaming.dstream + +import spark.{RDD, Partitioner} +import spark.rdd.CoGroupedRDD +import spark.streaming.{Time, DStream} + +class CoGroupedDStream[K : ClassManifest]( + parents: Seq[DStream[(_, _)]], + partitioner: Partitioner + ) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) { + + if (parents.length == 0) { + throw new IllegalArgumentException("Empty array of parents") + } + + if (parents.map(_.ssc).distinct.size > 1) { + throw new IllegalArgumentException("Array of parents have different StreamingContexts") + } + + if (parents.map(_.slideTime).distinct.size > 1) { + throw new IllegalArgumentException("Array of parents have different slide times") + } + + override def dependencies = parents.toList + + override def slideTime = parents.head.slideTime + + override def compute(validTime: Time): Option[RDD[(K, Seq[Seq[_]])]] = { + val part = partitioner + val rdds = parents.flatMap(_.getOrCompute(validTime)) + if (rdds.size > 0) { + val q = new CoGroupedRDD[K](rdds, part) + Some(q) + } else { + None + } + } + +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/ConstantInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ConstantInputDStream.scala new file mode 100644 index 0000000000..41c3af4694 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/ConstantInputDStream.scala @@ -0,0 +1,19 @@ +package spark.streaming.dstream + +import spark.RDD +import spark.streaming.{Time, StreamingContext} + +/** + * An input stream that always returns the same RDD on each timestep. Useful for testing. + */ +class ConstantInputDStream[T: ClassManifest](ssc_ : StreamingContext, rdd: RDD[T]) + extends InputDStream[T](ssc_) { + + override def start() {} + + override def stop() {} + + override def compute(validTime: Time): Option[RDD[T]] = { + Some(rdd) + } +} \ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/dstream/DataHandler.scala b/streaming/src/main/scala/spark/streaming/dstream/DataHandler.scala new file mode 100644 index 0000000000..d737ba1ecc --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/DataHandler.scala @@ -0,0 +1,83 @@ +package spark.streaming.dstream + +import java.util.concurrent.ArrayBlockingQueue +import scala.collection.mutable.ArrayBuffer +import spark.Logging +import spark.streaming.util.{RecurringTimer, SystemClock} +import spark.storage.StorageLevel + + +/** + * This is a helper object that manages the data received from the socket. It divides + * the object received into small batches of 100s of milliseconds, pushes them as + * blocks into the block manager and reports the block IDs to the network input + * tracker. It starts two threads, one to periodically start a new batch and prepare + * the previous batch of as a block, the other to push the blocks into the block + * manager. + */ + class DataHandler[T](receiver: NetworkReceiver[T], storageLevel: StorageLevel) + extends Serializable with Logging { + + case class Block(id: String, iterator: Iterator[T], metadata: Any = null) + + val clock = new SystemClock() + val blockInterval = 200L + val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer) + val blockStorageLevel = storageLevel + val blocksForPushing = new ArrayBlockingQueue[Block](1000) + val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } } + + var currentBuffer = new ArrayBuffer[T] + + def createBlock(blockId: String, iterator: Iterator[T]) : Block = { + new Block(blockId, iterator) + } + + def start() { + blockIntervalTimer.start() + blockPushingThread.start() + logInfo("Data handler started") + } + + def stop() { + blockIntervalTimer.stop() + blockPushingThread.interrupt() + logInfo("Data handler stopped") + } + + def += (obj: T) { + currentBuffer += obj + } + + def updateCurrentBuffer(time: Long) { + try { + val newBlockBuffer = currentBuffer + currentBuffer = new ArrayBuffer[T] + if (newBlockBuffer.size > 0) { + val blockId = "input-" + receiver.streamId + "- " + (time - blockInterval) + val newBlock = createBlock(blockId, newBlockBuffer.toIterator) + blocksForPushing.add(newBlock) + } + } catch { + case ie: InterruptedException => + logInfo("Block interval timer thread interrupted") + case e: Exception => + receiver.stop() + } + } + + def keepPushingBlocks() { + logInfo("Block pushing thread started") + try { + while(true) { + val block = blocksForPushing.take() + receiver.pushBlock(block.id, block.iterator, block.metadata, storageLevel) + } + } catch { + case ie: InterruptedException => + logInfo("Block pushing thread interrupted") + case e: Exception => + receiver.stop() + } + } + } \ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala new file mode 100644 index 0000000000..8cdaff467b --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala @@ -0,0 +1,110 @@ +package spark.streaming.dstream + +import spark.RDD +import spark.rdd.UnionRDD +import spark.streaming.{StreamingContext, Time} + +import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} + +import scala.collection.mutable.HashSet + + +class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest]( + @transient ssc_ : StreamingContext, + directory: String, + filter: PathFilter = FileInputDStream.defaultPathFilter, + newFilesOnly: Boolean = true) + extends InputDStream[(K, V)](ssc_) { + + @transient private var path_ : Path = null + @transient private var fs_ : FileSystem = null + + var lastModTime = 0L + val lastModTimeFiles = new HashSet[String]() + + def path(): Path = { + if (path_ == null) path_ = new Path(directory) + path_ + } + + def fs(): FileSystem = { + if (fs_ == null) fs_ = path.getFileSystem(new Configuration()) + fs_ + } + + override def start() { + if (newFilesOnly) { + lastModTime = System.currentTimeMillis() + } else { + lastModTime = 0 + } + } + + override def stop() { } + + /** + * Finds the files that were modified since the last time this method was called and makes + * a union RDD out of them. Note that this maintains the list of files that were processed + * in the latest modification time in the previous call to this method. This is because the + * modification time returned by the FileStatus API seems to return times only at the + * granularity of seconds. Hence, new files may have the same modification time as the + * latest modification time in the previous call to this method and the list of files + * maintained is used to filter the one that have been processed. + */ + override def compute(validTime: Time): Option[RDD[(K, V)]] = { + // Create the filter for selecting new files + val newFilter = new PathFilter() { + var latestModTime = 0L + val latestModTimeFiles = new HashSet[String]() + + def accept(path: Path): Boolean = { + if (!filter.accept(path)) { + return false + } else { + val modTime = fs.getFileStatus(path).getModificationTime() + if (modTime < lastModTime){ + return false + } else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) { + return false + } + if (modTime > latestModTime) { + latestModTime = modTime + latestModTimeFiles.clear() + } + latestModTimeFiles += path.toString + return true + } + } + } + + val newFiles = fs.listStatus(path, newFilter) + logInfo("New files: " + newFiles.map(_.getPath).mkString(", ")) + if (newFiles.length > 0) { + // Update the modification time and the files processed for that modification time + if (lastModTime != newFilter.latestModTime) { + lastModTime = newFilter.latestModTime + lastModTimeFiles.clear() + } + lastModTimeFiles ++= newFilter.latestModTimeFiles + } + val newRDD = new UnionRDD(ssc.sc, newFiles.map( + file => ssc.sc.newAPIHadoopFile[K, V, F](file.getPath.toString))) + Some(newRDD) + } +} + +object FileInputDStream { + val defaultPathFilter = new PathFilter with Serializable { + def accept(path: Path): Boolean = { + val file = path.getName() + if (file.startsWith(".") || file.endsWith("_tmp")) { + return false + } else { + return true + } + } + } +} + diff --git a/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala new file mode 100644 index 0000000000..1cbb4d536e --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala @@ -0,0 +1,21 @@ +package spark.streaming.dstream + +import spark.streaming.{DStream, Time} +import spark.RDD + +private[streaming] +class FilteredDStream[T: ClassManifest]( + parent: DStream[T], + filterFunc: T => Boolean + ) extends DStream[T](parent.ssc) { + + override def dependencies = List(parent) + + override def slideTime: Time = parent.slideTime + + override def compute(validTime: Time): Option[RDD[T]] = { + parent.getOrCompute(validTime).map(_.filter(filterFunc)) + } +} + + diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala new file mode 100644 index 0000000000..11ed8cf317 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala @@ -0,0 +1,20 @@ +package spark.streaming.dstream + +import spark.streaming.{DStream, Time} +import spark.RDD +import spark.SparkContext._ + +private[streaming] +class FlatMapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]( + parent: DStream[(K, V)], + flatMapValueFunc: V => TraversableOnce[U] + ) extends DStream[(K, U)](parent.ssc) { + + override def dependencies = List(parent) + + override def slideTime: Time = parent.slideTime + + override def compute(validTime: Time): Option[RDD[(K, U)]] = { + parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc)) + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala new file mode 100644 index 0000000000..a13b4c9ff9 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala @@ -0,0 +1,20 @@ +package spark.streaming.dstream + +import spark.streaming.{DStream, Time} +import spark.RDD + +private[streaming] +class FlatMappedDStream[T: ClassManifest, U: ClassManifest]( + parent: DStream[T], + flatMapFunc: T => Traversable[U] + ) extends DStream[U](parent.ssc) { + + override def dependencies = List(parent) + + override def slideTime: Time = parent.slideTime + + override def compute(validTime: Time): Option[RDD[U]] = { + parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc)) + } +} + diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala new file mode 100644 index 0000000000..7e988cadf4 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala @@ -0,0 +1,135 @@ +package spark.streaming.dstream + +import spark.streaming.StreamingContext + +import spark.Utils +import spark.storage.StorageLevel + +import org.apache.flume.source.avro.AvroSourceProtocol +import org.apache.flume.source.avro.AvroFlumeEvent +import org.apache.flume.source.avro.Status +import org.apache.avro.ipc.specific.SpecificResponder +import org.apache.avro.ipc.NettyServer + +import scala.collection.JavaConversions._ + +import java.net.InetSocketAddress +import java.io.{ObjectInput, ObjectOutput, Externalizable} +import java.nio.ByteBuffer + +class FlumeInputDStream[T: ClassManifest]( + @transient ssc_ : StreamingContext, + host: String, + port: Int, + storageLevel: StorageLevel +) extends NetworkInputDStream[SparkFlumeEvent](ssc_) { + + override def createReceiver(): NetworkReceiver[SparkFlumeEvent] = { + new FlumeReceiver(id, host, port, storageLevel) + } +} + +/** + * A wrapper class for AvroFlumeEvent's with a custom serialization format. + * + * This is necessary because AvroFlumeEvent uses inner data structures + * which are not serializable. + */ +class SparkFlumeEvent() extends Externalizable { + var event : AvroFlumeEvent = new AvroFlumeEvent() + + /* De-serialize from bytes. */ + def readExternal(in: ObjectInput) { + val bodyLength = in.readInt() + val bodyBuff = new Array[Byte](bodyLength) + in.read(bodyBuff) + + val numHeaders = in.readInt() + val headers = new java.util.HashMap[CharSequence, CharSequence] + + for (i <- 0 until numHeaders) { + val keyLength = in.readInt() + val keyBuff = new Array[Byte](keyLength) + in.read(keyBuff) + val key : String = Utils.deserialize(keyBuff) + + val valLength = in.readInt() + val valBuff = new Array[Byte](valLength) + in.read(valBuff) + val value : String = Utils.deserialize(valBuff) + + headers.put(key, value) + } + + event.setBody(ByteBuffer.wrap(bodyBuff)) + event.setHeaders(headers) + } + + /* Serialize to bytes. */ + def writeExternal(out: ObjectOutput) { + val body = event.getBody.array() + out.writeInt(body.length) + out.write(body) + + val numHeaders = event.getHeaders.size() + out.writeInt(numHeaders) + for ((k, v) <- event.getHeaders) { + val keyBuff = Utils.serialize(k.toString) + out.writeInt(keyBuff.length) + out.write(keyBuff) + val valBuff = Utils.serialize(v.toString) + out.writeInt(valBuff.length) + out.write(valBuff) + } + } +} + +private[streaming] object SparkFlumeEvent { + def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = { + val event = new SparkFlumeEvent + event.event = in + event + } +} + +/** A simple server that implements Flume's Avro protocol. */ +class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol { + override def append(event : AvroFlumeEvent) : Status = { + receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event) + Status.OK + } + + override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = { + events.foreach (event => + receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event)) + Status.OK + } +} + +/** A NetworkReceiver which listens for events using the + * Flume Avro interface.*/ +class FlumeReceiver( + streamId: Int, + host: String, + port: Int, + storageLevel: StorageLevel + ) extends NetworkReceiver[SparkFlumeEvent](streamId) { + + lazy val dataHandler = new DataHandler(this, storageLevel) + + protected override def onStart() { + val responder = new SpecificResponder( + classOf[AvroSourceProtocol], new FlumeEventServer(this)); + val server = new NettyServer(responder, new InetSocketAddress(host, port)); + dataHandler.start() + server.start() + logInfo("Flume receiver started") + } + + protected override def onStop() { + dataHandler.stop() + logInfo("Flume receiver stopped") + } + + override def getLocationPreference = Some(host) +} \ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala new file mode 100644 index 0000000000..41c629a225 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala @@ -0,0 +1,28 @@ +package spark.streaming.dstream + +import spark.RDD +import spark.streaming.{DStream, Job, Time} + +private[streaming] +class ForEachDStream[T: ClassManifest] ( + parent: DStream[T], + foreachFunc: (RDD[T], Time) => Unit + ) extends DStream[Unit](parent.ssc) { + + override def dependencies = List(parent) + + override def slideTime: Time = parent.slideTime + + override def compute(validTime: Time): Option[RDD[Unit]] = None + + override def generateJob(time: Time): Option[Job] = { + parent.getOrCompute(time) match { + case Some(rdd) => + val jobFunc = () => { + foreachFunc(rdd, time) + } + Some(new Job(time, jobFunc)) + case None => None + } + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala new file mode 100644 index 0000000000..92ea503cae --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala @@ -0,0 +1,17 @@ +package spark.streaming.dstream + +import spark.streaming.{DStream, Time} +import spark.RDD + +private[streaming] +class GlommedDStream[T: ClassManifest](parent: DStream[T]) + extends DStream[Array[T]](parent.ssc) { + + override def dependencies = List(parent) + + override def slideTime: Time = parent.slideTime + + override def compute(validTime: Time): Option[RDD[Array[T]]] = { + parent.getOrCompute(validTime).map(_.glom()) + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala new file mode 100644 index 0000000000..4959c66b06 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala @@ -0,0 +1,19 @@ +package spark.streaming.dstream + +import spark.streaming.{StreamingContext, DStream} + +abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext) + extends DStream[T](ssc_) { + + override def dependencies = List() + + override def slideTime = { + if (ssc == null) throw new Exception("ssc is null") + if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null") + ssc.graph.batchDuration + } + + def start() + + def stop() +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala new file mode 100644 index 0000000000..a46721af2f --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala @@ -0,0 +1,197 @@ +package spark.streaming.dstream + +import spark.Logging +import spark.storage.StorageLevel +import spark.streaming.{Time, DStreamCheckpointData, StreamingContext} + +import java.util.Properties +import java.util.concurrent.Executors + +import kafka.consumer._ +import kafka.message.{Message, MessageSet, MessageAndMetadata} +import kafka.serializer.StringDecoder +import kafka.utils.{Utils, ZKGroupTopicDirs} +import kafka.utils.ZkUtils._ + +import scala.collection.mutable.HashMap +import scala.collection.JavaConversions._ + + +// Key for a specific Kafka Partition: (broker, topic, group, part) +case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int) +// NOT USED - Originally intended for fault-tolerance +// Metadata for a Kafka Stream that it sent to the Master +case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long]) +// NOT USED - Originally intended for fault-tolerance +// Checkpoint data specific to a KafkaInputDstream +case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any], + savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds) + +/** + * Input stream that pulls messages from a Kafka Broker. + * + * @param host Zookeper hostname. + * @param port Zookeper port. + * @param groupId The group id for this consumer. + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param initialOffsets Optional initial offsets for each of the partitions to consume. + * By default the value is pulled from zookeper. + * @param storageLevel RDD storage level. + */ +class KafkaInputDStream[T: ClassManifest]( + @transient ssc_ : StreamingContext, + host: String, + port: Int, + groupId: String, + topics: Map[String, Int], + initialOffsets: Map[KafkaPartitionKey, Long], + storageLevel: StorageLevel + ) extends NetworkInputDStream[T](ssc_ ) with Logging { + + // Metadata that keeps track of which messages have already been consumed. + var savedOffsets = HashMap[Long, Map[KafkaPartitionKey, Long]]() + + /* NOT USED - Originally intended for fault-tolerance + + // In case of a failure, the offets for a particular timestamp will be restored. + @transient var restoredOffsets : Map[KafkaPartitionKey, Long] = null + + + override protected[streaming] def addMetadata(metadata: Any) { + metadata match { + case x : KafkaInputDStreamMetadata => + savedOffsets(x.timestamp) = x.data + // TOOD: Remove logging + logInfo("New saved Offsets: " + savedOffsets) + case _ => logInfo("Received unknown metadata: " + metadata.toString) + } + } + + override protected[streaming] def updateCheckpointData(currentTime: Time) { + super.updateCheckpointData(currentTime) + if(savedOffsets.size > 0) { + // Find the offets that were stored before the checkpoint was initiated + val key = savedOffsets.keys.toList.sortWith(_ < _).filter(_ < currentTime.millis).last + val latestOffsets = savedOffsets(key) + logInfo("Updating KafkaDStream checkpoint data: " + latestOffsets.toString) + checkpointData = KafkaDStreamCheckpointData(checkpointData.rdds, latestOffsets) + // TODO: This may throw out offsets that are created after the checkpoint, + // but it's unlikely we'll need them. + savedOffsets.clear() + } + } + + override protected[streaming] def restoreCheckpointData() { + super.restoreCheckpointData() + logInfo("Restoring KafkaDStream checkpoint data.") + checkpointData match { + case x : KafkaDStreamCheckpointData => + restoredOffsets = x.savedOffsets + logInfo("Restored KafkaDStream offsets: " + savedOffsets) + } + } */ + + def createReceiver(): NetworkReceiver[T] = { + new KafkaReceiver(id, host, port, groupId, topics, initialOffsets, storageLevel) + .asInstanceOf[NetworkReceiver[T]] + } +} + +class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String, + topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long], + storageLevel: StorageLevel) extends NetworkReceiver[Any](streamId) { + + // Timeout for establishing a connection to Zookeper in ms. + val ZK_TIMEOUT = 10000 + + // Handles pushing data into the BlockManager + lazy protected val dataHandler = new DataHandler(this, storageLevel) + // Keeps track of the current offsets. Maps from (broker, topic, group, part) -> Offset + lazy val offsets = HashMap[KafkaPartitionKey, Long]() + // Connection to Kafka + var consumerConnector : ZookeeperConsumerConnector = null + + def onStop() { + dataHandler.stop() + } + + def onStart() { + + // Starting the DataHandler that buffers blocks and pushes them into them BlockManager + dataHandler.start() + + // In case we are using multiple Threads to handle Kafka Messages + val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _)) + + val zooKeeperEndPoint = host + ":" + port + logInfo("Starting Kafka Consumer Stream with group: " + groupId) + logInfo("Initial offsets: " + initialOffsets.toString) + + // Zookeper connection properties + val props = new Properties() + props.put("zk.connect", zooKeeperEndPoint) + props.put("zk.connectiontimeout.ms", ZK_TIMEOUT.toString) + props.put("groupid", groupId) + + // Create the connection to the cluster + logInfo("Connecting to Zookeper: " + zooKeeperEndPoint) + val consumerConfig = new ConsumerConfig(props) + consumerConnector = Consumer.create(consumerConfig).asInstanceOf[ZookeeperConsumerConnector] + logInfo("Connected to " + zooKeeperEndPoint) + + // Reset the Kafka offsets in case we are recovering from a failure + resetOffsets(initialOffsets) + + // Create Threads for each Topic/Message Stream we are listening + val topicMessageStreams = consumerConnector.createMessageStreams(topics, new StringDecoder()) + + // Start the messages handler for each partition + topicMessageStreams.values.foreach { streams => + streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) } + } + + } + + // Overwrites the offets in Zookeper. + private def resetOffsets(offsets: Map[KafkaPartitionKey, Long]) { + offsets.foreach { case(key, offset) => + val topicDirs = new ZKGroupTopicDirs(key.groupId, key.topic) + val partitionName = key.brokerId + "-" + key.partId + updatePersistentPath(consumerConnector.zkClient, + topicDirs.consumerOffsetDir + "/" + partitionName, offset.toString) + } + } + + // Handles Kafka Messages + private class MessageHandler(stream: KafkaStream[String]) extends Runnable { + def run() { + logInfo("Starting MessageHandler.") + stream.takeWhile { msgAndMetadata => + dataHandler += msgAndMetadata.message + + // Updating the offet. The key is (broker, topic, group, partition). + val key = KafkaPartitionKey(msgAndMetadata.topicInfo.brokerId, msgAndMetadata.topic, + groupId, msgAndMetadata.topicInfo.partition.partId) + val offset = msgAndMetadata.topicInfo.getConsumeOffset + offsets.put(key, offset) + // logInfo("Handled message: " + (key, offset).toString) + + // Keep on handling messages + true + } + } + } + + // NOT USED - Originally intended for fault-tolerance + // class KafkaDataHandler(receiver: KafkaReceiver, storageLevel: StorageLevel) + // extends DataHandler[Any](receiver, storageLevel) { + + // override def createBlock(blockId: String, iterator: Iterator[Any]) : Block = { + // // Creates a new Block with Kafka-specific Metadata + // new Block(blockId, iterator, KafkaInputDStreamMetadata(System.currentTimeMillis, offsets.toMap)) + // } + + // } + +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala new file mode 100644 index 0000000000..daf78c6893 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala @@ -0,0 +1,21 @@ +package spark.streaming.dstream + +import spark.streaming.{DStream, Time} +import spark.RDD + +private[streaming] +class MapPartitionedDStream[T: ClassManifest, U: ClassManifest]( + parent: DStream[T], + mapPartFunc: Iterator[T] => Iterator[U], + preservePartitioning: Boolean + ) extends DStream[U](parent.ssc) { + + override def dependencies = List(parent) + + override def slideTime: Time = parent.slideTime + + override def compute(validTime: Time): Option[RDD[U]] = { + parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning)) + } +} + diff --git a/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala new file mode 100644 index 0000000000..689caeef0e --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala @@ -0,0 +1,21 @@ +package spark.streaming.dstream + +import spark.streaming.{DStream, Time} +import spark.RDD +import spark.SparkContext._ + +private[streaming] +class MapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]( + parent: DStream[(K, V)], + mapValueFunc: V => U + ) extends DStream[(K, U)](parent.ssc) { + + override def dependencies = List(parent) + + override def slideTime: Time = parent.slideTime + + override def compute(validTime: Time): Option[RDD[(K, U)]] = { + parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc)) + } +} + diff --git a/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala new file mode 100644 index 0000000000..786b9966f2 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala @@ -0,0 +1,20 @@ +package spark.streaming.dstream + +import spark.streaming.{DStream, Time} +import spark.RDD + +private[streaming] +class MappedDStream[T: ClassManifest, U: ClassManifest] ( + parent: DStream[T], + mapFunc: T => U + ) extends DStream[U](parent.ssc) { + + override def dependencies = List(parent) + + override def slideTime: Time = parent.slideTime + + override def compute(validTime: Time): Option[RDD[U]] = { + parent.getOrCompute(validTime).map(_.map[U](mapFunc)) + } +} + diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala new file mode 100644 index 0000000000..41276da8bb --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala @@ -0,0 +1,157 @@ +package spark.streaming.dstream + +import spark.streaming.{Time, StreamingContext, AddBlocks, RegisterReceiver, DeregisterReceiver} + +import spark.{Logging, SparkEnv, RDD} +import spark.rdd.BlockRDD +import spark.storage.StorageLevel + +import scala.collection.mutable.ArrayBuffer + +import java.nio.ByteBuffer + +import akka.actor.{Props, Actor} +import akka.pattern.ask +import akka.dispatch.Await +import akka.util.duration._ + +abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : StreamingContext) + extends InputDStream[T](ssc_) { + + // This is an unique identifier that is used to match the network receiver with the + // corresponding network input stream. + val id = ssc.getNewNetworkStreamId() + + /** + * This method creates the receiver object that will be sent to the workers + * to receive data. This method needs to defined by any specific implementation + * of a NetworkInputDStream. + */ + def createReceiver(): NetworkReceiver[T] + + // Nothing to start or stop as both taken care of by the NetworkInputTracker. + def start() {} + + def stop() {} + + override def compute(validTime: Time): Option[RDD[T]] = { + val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime) + Some(new BlockRDD[T](ssc.sc, blockIds)) + } +} + + +private[streaming] sealed trait NetworkReceiverMessage +private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage +private[streaming] case class ReportBlock(blockId: String, metadata: Any) extends NetworkReceiverMessage +private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage + +abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Serializable with Logging { + + initLogging() + + lazy protected val env = SparkEnv.get + + lazy protected val actor = env.actorSystem.actorOf( + Props(new NetworkReceiverActor()), "NetworkReceiver-" + streamId) + + lazy protected val receivingThread = Thread.currentThread() + + /** This method will be called to start receiving data. */ + protected def onStart() + + /** This method will be called to stop receiving data. */ + protected def onStop() + + /** This method conveys a placement preference (hostname) for this receiver. */ + def getLocationPreference() : Option[String] = None + + /** + * This method starts the receiver. First is accesses all the lazy members to + * materialize them. Then it calls the user-defined onStart() method to start + * other threads, etc required to receiver the data. + */ + def start() { + try { + // Access the lazy vals to materialize them + env + actor + receivingThread + + // Call user-defined onStart() + onStart() + } catch { + case ie: InterruptedException => + logInfo("Receiving thread interrupted") + //println("Receiving thread interrupted") + case e: Exception => + stopOnError(e) + } + } + + /** + * This method stops the receiver. First it interrupts the main receiving thread, + * that is, the thread that called receiver.start(). Then it calls the user-defined + * onStop() method to stop other threads and/or do cleanup. + */ + def stop() { + receivingThread.interrupt() + onStop() + //TODO: terminate the actor + } + + /** + * This method stops the receiver and reports to exception to the tracker. + * This should be called whenever an exception has happened on any thread + * of the receiver. + */ + protected def stopOnError(e: Exception) { + logError("Error receiving data", e) + stop() + actor ! ReportError(e.toString) + } + + + /** + * This method pushes a block (as iterator of values) into the block manager. + */ + def pushBlock(blockId: String, iterator: Iterator[T], metadata: Any, level: StorageLevel) { + val buffer = new ArrayBuffer[T] ++ iterator + env.blockManager.put(blockId, buffer.asInstanceOf[ArrayBuffer[Any]], level) + + actor ! ReportBlock(blockId, metadata) + } + + /** + * This method pushes a block (as bytes) into the block manager. + */ + def pushBlock(blockId: String, bytes: ByteBuffer, metadata: Any, level: StorageLevel) { + env.blockManager.putBytes(blockId, bytes, level) + actor ! ReportBlock(blockId, metadata) + } + + /** A helper actor that communicates with the NetworkInputTracker */ + private class NetworkReceiverActor extends Actor { + logInfo("Attempting to register with tracker") + val ip = System.getProperty("spark.master.host", "localhost") + val port = System.getProperty("spark.master.port", "7077").toInt + val url = "akka://spark@%s:%s/user/NetworkInputTracker".format(ip, port) + val tracker = env.actorSystem.actorFor(url) + val timeout = 5.seconds + + override def preStart() { + val future = tracker.ask(RegisterReceiver(streamId, self))(timeout) + Await.result(future, timeout) + } + + override def receive() = { + case ReportBlock(blockId, metadata) => + tracker ! AddBlocks(streamId, Array(blockId), metadata) + case ReportError(msg) => + tracker ! DeregisterReceiver(streamId, msg) + case StopReceiver(msg) => + stop() + tracker ! DeregisterReceiver(streamId, msg) + } + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala new file mode 100644 index 0000000000..024bf3bea4 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala @@ -0,0 +1,41 @@ +package spark.streaming.dstream + +import spark.RDD +import spark.rdd.UnionRDD + +import scala.collection.mutable.Queue +import scala.collection.mutable.ArrayBuffer +import spark.streaming.{Time, StreamingContext} + +class QueueInputDStream[T: ClassManifest]( + @transient ssc: StreamingContext, + val queue: Queue[RDD[T]], + oneAtATime: Boolean, + defaultRDD: RDD[T] + ) extends InputDStream[T](ssc) { + + override def start() { } + + override def stop() { } + + override def compute(validTime: Time): Option[RDD[T]] = { + val buffer = new ArrayBuffer[RDD[T]]() + if (oneAtATime && queue.size > 0) { + buffer += queue.dequeue() + } else { + buffer ++= queue + } + if (buffer.size > 0) { + if (oneAtATime) { + Some(buffer.first) + } else { + Some(new UnionRDD(ssc.sc, buffer.toSeq)) + } + } else if (defaultRDD != null) { + Some(defaultRDD) + } else { + None + } + } + +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala new file mode 100644 index 0000000000..996cc7dea8 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala @@ -0,0 +1,88 @@ +package spark.streaming.dstream + +import spark.{DaemonThread, Logging} +import spark.storage.StorageLevel +import spark.streaming.StreamingContext + +import java.net.InetSocketAddress +import java.nio.ByteBuffer +import java.nio.channels.{ReadableByteChannel, SocketChannel} +import java.io.EOFException +import java.util.concurrent.ArrayBlockingQueue + + +/** + * An input stream that reads blocks of serialized objects from a given network address. + * The blocks will be inserted directly into the block store. This is the fastest way to get + * data into Spark Streaming, though it requires the sender to batch data and serialize it + * in the format that the system is configured with. + */ +class RawInputDStream[T: ClassManifest]( + @transient ssc_ : StreamingContext, + host: String, + port: Int, + storageLevel: StorageLevel + ) extends NetworkInputDStream[T](ssc_ ) with Logging { + + def createReceiver(): NetworkReceiver[T] = { + new RawNetworkReceiver(id, host, port, storageLevel).asInstanceOf[NetworkReceiver[T]] + } +} + +class RawNetworkReceiver(streamId: Int, host: String, port: Int, storageLevel: StorageLevel) + extends NetworkReceiver[Any](streamId) { + + var blockPushingThread: Thread = null + + override def getLocationPreference = None + + def onStart() { + // Open a socket to the target address and keep reading from it + logInfo("Connecting to " + host + ":" + port) + val channel = SocketChannel.open() + channel.configureBlocking(true) + channel.connect(new InetSocketAddress(host, port)) + logInfo("Connected to " + host + ":" + port) + + val queue = new ArrayBlockingQueue[ByteBuffer](2) + + blockPushingThread = new DaemonThread { + override def run() { + var nextBlockNumber = 0 + while (true) { + val buffer = queue.take() + val blockId = "input-" + streamId + "-" + nextBlockNumber + nextBlockNumber += 1 + pushBlock(blockId, buffer, null, storageLevel) + } + } + } + blockPushingThread.start() + + val lengthBuffer = ByteBuffer.allocate(4) + while (true) { + lengthBuffer.clear() + readFully(channel, lengthBuffer) + lengthBuffer.flip() + val length = lengthBuffer.getInt() + val dataBuffer = ByteBuffer.allocate(length) + readFully(channel, dataBuffer) + dataBuffer.flip() + logInfo("Read a block with " + length + " bytes") + queue.put(dataBuffer) + } + } + + def onStop() { + if (blockPushingThread != null) blockPushingThread.interrupt() + } + + /** Read a buffer fully from a given Channel */ + private def readFully(channel: ReadableByteChannel, dest: ByteBuffer) { + while (dest.position < dest.limit) { + if (channel.read(dest) == -1) { + throw new EOFException("End of channel") + } + } + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala new file mode 100644 index 0000000000..2686de14d2 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala @@ -0,0 +1,148 @@ +package spark.streaming.dstream + +import spark.streaming.StreamingContext._ + +import spark.RDD +import spark.rdd.CoGroupedRDD +import spark.Partitioner +import spark.SparkContext._ +import spark.storage.StorageLevel + +import scala.collection.mutable.ArrayBuffer +import spark.streaming.{Interval, Time, DStream} + +class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( + parent: DStream[(K, V)], + reduceFunc: (V, V) => V, + invReduceFunc: (V, V) => V, + _windowTime: Time, + _slideTime: Time, + partitioner: Partitioner + ) extends DStream[(K,V)](parent.ssc) { + + assert(_windowTime.isMultipleOf(parent.slideTime), + "The window duration of ReducedWindowedDStream (" + _slideTime + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")" + ) + + assert(_slideTime.isMultipleOf(parent.slideTime), + "The slide duration of ReducedWindowedDStream (" + _slideTime + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")" + ) + + // Reduce each batch of data using reduceByKey which will be further reduced by window + // by ReducedWindowedDStream + val reducedStream = parent.reduceByKey(reduceFunc, partitioner) + + // Persist RDDs to memory by default as these RDDs are going to be reused. + super.persist(StorageLevel.MEMORY_ONLY_SER) + reducedStream.persist(StorageLevel.MEMORY_ONLY_SER) + + def windowTime: Time = _windowTime + + override def dependencies = List(reducedStream) + + override def slideTime: Time = _slideTime + + override val mustCheckpoint = true + + override def parentRememberDuration: Time = rememberDuration + windowTime + + override def persist(storageLevel: StorageLevel): DStream[(K,V)] = { + super.persist(storageLevel) + reducedStream.persist(storageLevel) + this + } + + override def checkpoint(interval: Time): DStream[(K, V)] = { + super.checkpoint(interval) + //reducedStream.checkpoint(interval) + this + } + + override def compute(validTime: Time): Option[RDD[(K, V)]] = { + val reduceF = reduceFunc + val invReduceF = invReduceFunc + + val currentTime = validTime + val currentWindow = Interval(currentTime - windowTime + parent.slideTime, currentTime) + val previousWindow = currentWindow - slideTime + + logDebug("Window time = " + windowTime) + logDebug("Slide time = " + slideTime) + logDebug("ZeroTime = " + zeroTime) + logDebug("Current window = " + currentWindow) + logDebug("Previous window = " + previousWindow) + + // _____________________________ + // | previous window _________|___________________ + // |___________________| current window | --------------> Time + // |_____________________________| + // + // |________ _________| |________ _________| + // | | + // V V + // old RDDs new RDDs + // + + // Get the RDDs of the reduced values in "old time steps" + val oldRDDs = reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideTime) + logDebug("# old RDDs = " + oldRDDs.size) + + // Get the RDDs of the reduced values in "new time steps" + val newRDDs = reducedStream.slice(previousWindow.endTime + parent.slideTime, currentWindow.endTime) + logDebug("# new RDDs = " + newRDDs.size) + + // Get the RDD of the reduced value of the previous window + val previousWindowRDD = getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]())) + + // Make the list of RDDs that needs to cogrouped together for reducing their reduced values + val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs + + // Cogroup the reduced RDDs and merge the reduced values + val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner) + //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _ + + val numOldValues = oldRDDs.size + val numNewValues = newRDDs.size + + val mergeValues = (seqOfValues: Seq[Seq[V]]) => { + if (seqOfValues.size != 1 + numOldValues + numNewValues) { + throw new Exception("Unexpected number of sequences of reduced values") + } + // Getting reduced values "old time steps" that will be removed from current window + val oldValues = (1 to numOldValues).map(i => seqOfValues(i)).filter(!_.isEmpty).map(_.head) + // Getting reduced values "new time steps" + val newValues = (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head) + if (seqOfValues(0).isEmpty) { + // If previous window's reduce value does not exist, then at least new values should exist + if (newValues.isEmpty) { + throw new Exception("Neither previous window has value for key, nor new values found. " + + "Are you sure your key class hashes consistently?") + } + // Reduce the new values + newValues.reduce(reduceF) // return + } else { + // Get the previous window's reduced value + var tempValue = seqOfValues(0).head + // If old values exists, then inverse reduce then from previous value + if (!oldValues.isEmpty) { + tempValue = invReduceF(tempValue, oldValues.reduce(reduceF)) + } + // If new values exists, then reduce them with previous value + if (!newValues.isEmpty) { + tempValue = reduceF(tempValue, newValues.reduce(reduceF)) + } + tempValue // return + } + } + + val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValues) + + Some(mergedValuesRDD) + } + + +} + + diff --git a/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala new file mode 100644 index 0000000000..6854bbe665 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala @@ -0,0 +1,27 @@ +package spark.streaming.dstream + +import spark.{RDD, Partitioner} +import spark.SparkContext._ +import spark.streaming.{DStream, Time} + +private[streaming] +class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest]( + parent: DStream[(K,V)], + createCombiner: V => C, + mergeValue: (C, V) => C, + mergeCombiner: (C, C) => C, + partitioner: Partitioner + ) extends DStream [(K,C)] (parent.ssc) { + + override def dependencies = List(parent) + + override def slideTime: Time = parent.slideTime + + override def compute(validTime: Time): Option[RDD[(K,C)]] = { + parent.getOrCompute(validTime) match { + case Some(rdd) => + Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner)) + case None => None + } + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala new file mode 100644 index 0000000000..af5b73ae8d --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala @@ -0,0 +1,103 @@ +package spark.streaming.dstream + +import spark.streaming.StreamingContext +import spark.storage.StorageLevel + +import java.io._ +import java.net.Socket + +class SocketInputDStream[T: ClassManifest]( + @transient ssc_ : StreamingContext, + host: String, + port: Int, + bytesToObjects: InputStream => Iterator[T], + storageLevel: StorageLevel + ) extends NetworkInputDStream[T](ssc_) { + + def createReceiver(): NetworkReceiver[T] = { + new SocketReceiver(id, host, port, bytesToObjects, storageLevel) + } +} + + +class SocketReceiver[T: ClassManifest]( + streamId: Int, + host: String, + port: Int, + bytesToObjects: InputStream => Iterator[T], + storageLevel: StorageLevel + ) extends NetworkReceiver[T](streamId) { + + lazy protected val dataHandler = new DataHandler(this, storageLevel) + + override def getLocationPreference = None + + protected def onStart() { + logInfo("Connecting to " + host + ":" + port) + val socket = new Socket(host, port) + logInfo("Connected to " + host + ":" + port) + dataHandler.start() + val iterator = bytesToObjects(socket.getInputStream()) + while(iterator.hasNext) { + val obj = iterator.next + dataHandler += obj + } + } + + protected def onStop() { + dataHandler.stop() + } + +} + + +object SocketReceiver { + + /** + * This methods translates the data from an inputstream (say, from a socket) + * to '\n' delimited strings and returns an iterator to access the strings. + */ + def bytesToLines(inputStream: InputStream): Iterator[String] = { + val dataInputStream = new BufferedReader(new InputStreamReader(inputStream, "UTF-8")) + + val iterator = new Iterator[String] { + var gotNext = false + var finished = false + var nextValue: String = null + + private def getNext() { + try { + nextValue = dataInputStream.readLine() + if (nextValue == null) { + finished = true + } + } + gotNext = true + } + + override def hasNext: Boolean = { + if (!finished) { + if (!gotNext) { + getNext() + if (finished) { + dataInputStream.close() + } + } + } + !finished + } + + override def next(): String = { + if (finished) { + throw new NoSuchElementException("End of stream") + } + if (!gotNext) { + getNext() + } + gotNext = false + nextValue + } + } + iterator + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala new file mode 100644 index 0000000000..6e190b5564 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala @@ -0,0 +1,83 @@ +package spark.streaming.dstream + +import spark.RDD +import spark.Partitioner +import spark.SparkContext._ +import spark.storage.StorageLevel +import spark.streaming.{Time, DStream} + +class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest]( + parent: DStream[(K, V)], + updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], + partitioner: Partitioner, + preservePartitioning: Boolean + ) extends DStream[(K, S)](parent.ssc) { + + super.persist(StorageLevel.MEMORY_ONLY_SER) + + override def dependencies = List(parent) + + override def slideTime = parent.slideTime + + override val mustCheckpoint = true + + override def compute(validTime: Time): Option[RDD[(K, S)]] = { + + // Try to get the previous state RDD + getOrCompute(validTime - slideTime) match { + + case Some(prevStateRDD) => { // If previous state RDD exists + + // Try to get the parent RDD + parent.getOrCompute(validTime) match { + case Some(parentRDD) => { // If parent RDD exists, then compute as usual + + // Define the function for the mapPartition operation on cogrouped RDD; + // first map the cogrouped tuple to tuples of required type, + // and then apply the update function + val updateFuncLocal = updateFunc + val finalFunc = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => { + val i = iterator.map(t => { + (t._1, t._2._1, t._2._2.headOption) + }) + updateFuncLocal(i) + } + val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner) + val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning) + //logDebug("Generating state RDD for time " + validTime) + return Some(stateRDD) + } + case None => { // If parent RDD does not exist, then return old state RDD + return Some(prevStateRDD) + } + } + } + + case None => { // If previous session RDD does not exist (first input data) + + // Try to get the parent RDD + parent.getOrCompute(validTime) match { + case Some(parentRDD) => { // If parent RDD exists, then compute as usual + + // Define the function for the mapPartition operation on grouped RDD; + // first map the grouped tuple to tuples of required type, + // and then apply the update function + val updateFuncLocal = updateFunc + val finalFunc = (iterator: Iterator[(K, Seq[V])]) => { + updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2, None))) + } + + val groupedRDD = parentRDD.groupByKey(partitioner) + val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning) + //logDebug("Generating state RDD for time " + validTime + " (first)") + return Some(sessionRDD) + } + case None => { // If parent RDD does not exist, then nothing to do! + //logDebug("Not generating state RDD (no previous state, no parent)") + return None + } + } + } + } + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala new file mode 100644 index 0000000000..0337579514 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala @@ -0,0 +1,19 @@ +package spark.streaming.dstream + +import spark.RDD +import spark.streaming.{DStream, Time} + +private[streaming] +class TransformedDStream[T: ClassManifest, U: ClassManifest] ( + parent: DStream[T], + transformFunc: (RDD[T], Time) => RDD[U] + ) extends DStream[U](parent.ssc) { + + override def dependencies = List(parent) + + override def slideTime: Time = parent.slideTime + + override def compute(validTime: Time): Option[RDD[U]] = { + parent.getOrCompute(validTime).map(transformFunc(_, validTime)) + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala new file mode 100644 index 0000000000..f1efb2ae72 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala @@ -0,0 +1,39 @@ +package spark.streaming.dstream + +import spark.streaming.{DStream, Time} +import spark.RDD +import collection.mutable.ArrayBuffer +import spark.rdd.UnionRDD + +class UnionDStream[T: ClassManifest](parents: Array[DStream[T]]) + extends DStream[T](parents.head.ssc) { + + if (parents.length == 0) { + throw new IllegalArgumentException("Empty array of parents") + } + + if (parents.map(_.ssc).distinct.size > 1) { + throw new IllegalArgumentException("Array of parents have different StreamingContexts") + } + + if (parents.map(_.slideTime).distinct.size > 1) { + throw new IllegalArgumentException("Array of parents have different slide times") + } + + override def dependencies = parents.toList + + override def slideTime: Time = parents.head.slideTime + + override def compute(validTime: Time): Option[RDD[T]] = { + val rdds = new ArrayBuffer[RDD[T]]() + parents.map(_.getOrCompute(validTime)).foreach(_ match { + case Some(rdd) => rdds += rdd + case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime) + }) + if (rdds.size > 0) { + Some(new UnionRDD(ssc.sc, rdds)) + } else { + None + } + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala new file mode 100644 index 0000000000..4b2621c497 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala @@ -0,0 +1,40 @@ +package spark.streaming.dstream + +import spark.RDD +import spark.rdd.UnionRDD +import spark.storage.StorageLevel +import spark.streaming.{Interval, Time, DStream} + + +class WindowedDStream[T: ClassManifest]( + parent: DStream[T], + _windowTime: Time, + _slideTime: Time) + extends DStream[T](parent.ssc) { + + if (!_windowTime.isMultipleOf(parent.slideTime)) + throw new Exception("The window duration of WindowedDStream (" + _slideTime + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")") + + if (!_slideTime.isMultipleOf(parent.slideTime)) + throw new Exception("The slide duration of WindowedDStream (" + _slideTime + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")") + + parent.persist(StorageLevel.MEMORY_ONLY_SER) + + def windowTime: Time = _windowTime + + override def dependencies = List(parent) + + override def slideTime: Time = _slideTime + + override def parentRememberDuration: Time = rememberDuration + windowTime + + override def compute(validTime: Time): Option[RDD[T]] = { + val currentWindow = Interval(validTime - windowTime + parent.slideTime, validTime) + Some(new UnionRDD(ssc.sc, parent.slice(currentWindow))) + } +} + + + diff --git a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala index 7c4ee3b34c..dfaaf03f03 100644 --- a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala @@ -25,7 +25,7 @@ object GrepRaw { val rawStreams = (1 to numStreams).map(_ => ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray - val union = new UnionDStream(rawStreams) + val union = ssc.union(rawStreams) union.filter(_.contains("Alice")).count().foreach(r => println("Grep count: " + r.collect().mkString)) ssc.start() diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala index 182dfd8a52..338834bc3c 100644 --- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala @@ -34,7 +34,7 @@ object TopKWordCountRaw { val lines = (1 to numStreams).map(_ => { ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2) }) - val union = new UnionDStream(lines.toArray) + val union = ssc.union(lines) val counts = union.mapPartitions(splitAndCountPartitions) val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10) val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k)) diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala index 9bcd30f4d7..d93335a8ce 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala @@ -33,7 +33,7 @@ object WordCountRaw { val lines = (1 to numStreams).map(_ => { ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2) }) - val union = new UnionDStream(lines.toArray) + val union = ssc.union(lines) val counts = union.mapPartitions(splitAndCountPartitions) val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10) windowedCounts.foreach(r => println("# unique words = " + r.count())) diff --git a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala deleted file mode 100644 index 7c642d4802..0000000000 --- a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala +++ /dev/null @@ -1,193 +0,0 @@ -package spark.streaming - -import java.util.Properties -import java.util.concurrent.Executors -import kafka.consumer._ -import kafka.message.{Message, MessageSet, MessageAndMetadata} -import kafka.serializer.StringDecoder -import kafka.utils.{Utils, ZKGroupTopicDirs} -import kafka.utils.ZkUtils._ -import scala.collection.mutable.HashMap -import scala.collection.JavaConversions._ -import spark._ -import spark.RDD -import spark.storage.StorageLevel - -// Key for a specific Kafka Partition: (broker, topic, group, part) -case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int) -// NOT USED - Originally intended for fault-tolerance -// Metadata for a Kafka Stream that it sent to the Master -case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long]) -// NOT USED - Originally intended for fault-tolerance -// Checkpoint data specific to a KafkaInputDstream -case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any], - savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds) - -/** - * Input stream that pulls messages form a Kafka Broker. - * - * @param host Zookeper hostname. - * @param port Zookeper port. - * @param groupId The group id for this consumer. - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param initialOffsets Optional initial offsets for each of the partitions to consume. - * By default the value is pulled from zookeper. - * @param storageLevel RDD storage level. - */ -class KafkaInputDStream[T: ClassManifest]( - @transient ssc_ : StreamingContext, - host: String, - port: Int, - groupId: String, - topics: Map[String, Int], - initialOffsets: Map[KafkaPartitionKey, Long], - storageLevel: StorageLevel - ) extends NetworkInputDStream[T](ssc_ ) with Logging { - - // Metadata that keeps track of which messages have already been consumed. - var savedOffsets = HashMap[Long, Map[KafkaPartitionKey, Long]]() - - /* NOT USED - Originally intended for fault-tolerance - - // In case of a failure, the offets for a particular timestamp will be restored. - @transient var restoredOffsets : Map[KafkaPartitionKey, Long] = null - - - override protected[streaming] def addMetadata(metadata: Any) { - metadata match { - case x : KafkaInputDStreamMetadata => - savedOffsets(x.timestamp) = x.data - // TOOD: Remove logging - logInfo("New saved Offsets: " + savedOffsets) - case _ => logInfo("Received unknown metadata: " + metadata.toString) - } - } - - override protected[streaming] def updateCheckpointData(currentTime: Time) { - super.updateCheckpointData(currentTime) - if(savedOffsets.size > 0) { - // Find the offets that were stored before the checkpoint was initiated - val key = savedOffsets.keys.toList.sortWith(_ < _).filter(_ < currentTime.millis).last - val latestOffsets = savedOffsets(key) - logInfo("Updating KafkaDStream checkpoint data: " + latestOffsets.toString) - checkpointData = KafkaDStreamCheckpointData(checkpointData.rdds, latestOffsets) - // TODO: This may throw out offsets that are created after the checkpoint, - // but it's unlikely we'll need them. - savedOffsets.clear() - } - } - - override protected[streaming] def restoreCheckpointData() { - super.restoreCheckpointData() - logInfo("Restoring KafkaDStream checkpoint data.") - checkpointData match { - case x : KafkaDStreamCheckpointData => - restoredOffsets = x.savedOffsets - logInfo("Restored KafkaDStream offsets: " + savedOffsets) - } - } */ - - def createReceiver(): NetworkReceiver[T] = { - new KafkaReceiver(id, host, port, groupId, topics, initialOffsets, storageLevel) - .asInstanceOf[NetworkReceiver[T]] - } -} - -class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String, - topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long], - storageLevel: StorageLevel) extends NetworkReceiver[Any](streamId) { - - // Timeout for establishing a connection to Zookeper in ms. - val ZK_TIMEOUT = 10000 - - // Handles pushing data into the BlockManager - lazy protected val dataHandler = new DataHandler(this, storageLevel) - // Keeps track of the current offsets. Maps from (broker, topic, group, part) -> Offset - lazy val offsets = HashMap[KafkaPartitionKey, Long]() - // Connection to Kafka - var consumerConnector : ZookeeperConsumerConnector = null - - def onStop() { - dataHandler.stop() - } - - def onStart() { - - // Starting the DataHandler that buffers blocks and pushes them into them BlockManager - dataHandler.start() - - // In case we are using multiple Threads to handle Kafka Messages - val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _)) - - val zooKeeperEndPoint = host + ":" + port - logInfo("Starting Kafka Consumer Stream with group: " + groupId) - logInfo("Initial offsets: " + initialOffsets.toString) - - // Zookeper connection properties - val props = new Properties() - props.put("zk.connect", zooKeeperEndPoint) - props.put("zk.connectiontimeout.ms", ZK_TIMEOUT.toString) - props.put("groupid", groupId) - - // Create the connection to the cluster - logInfo("Connecting to Zookeper: " + zooKeeperEndPoint) - val consumerConfig = new ConsumerConfig(props) - consumerConnector = Consumer.create(consumerConfig).asInstanceOf[ZookeeperConsumerConnector] - logInfo("Connected to " + zooKeeperEndPoint) - - // Reset the Kafka offsets in case we are recovering from a failure - resetOffsets(initialOffsets) - - // Create Threads for each Topic/Message Stream we are listening - val topicMessageStreams = consumerConnector.createMessageStreams(topics, new StringDecoder()) - - // Start the messages handler for each partition - topicMessageStreams.values.foreach { streams => - streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) } - } - - } - - // Overwrites the offets in Zookeper. - private def resetOffsets(offsets: Map[KafkaPartitionKey, Long]) { - offsets.foreach { case(key, offset) => - val topicDirs = new ZKGroupTopicDirs(key.groupId, key.topic) - val partitionName = key.brokerId + "-" + key.partId - updatePersistentPath(consumerConnector.zkClient, - topicDirs.consumerOffsetDir + "/" + partitionName, offset.toString) - } - } - - // Handles Kafka Messages - private class MessageHandler(stream: KafkaStream[String]) extends Runnable { - def run() { - logInfo("Starting MessageHandler.") - stream.takeWhile { msgAndMetadata => - dataHandler += msgAndMetadata.message - - // Updating the offet. The key is (broker, topic, group, partition). - val key = KafkaPartitionKey(msgAndMetadata.topicInfo.brokerId, msgAndMetadata.topic, - groupId, msgAndMetadata.topicInfo.partition.partId) - val offset = msgAndMetadata.topicInfo.getConsumeOffset - offsets.put(key, offset) - // logInfo("Handled message: " + (key, offset).toString) - - // Keep on handling messages - true - } - } - } - - // NOT USED - Originally intended for fault-tolerance - // class KafkaDataHandler(receiver: KafkaReceiver, storageLevel: StorageLevel) - // extends DataHandler[Any](receiver, storageLevel) { - - // override def createBlock(blockId: String, iterator: Iterator[Any]) : Block = { - // // Creates a new Block with Kafka-specific Metadata - // new Block(blockId, iterator, KafkaInputDStreamMetadata(System.currentTimeMillis, offsets.toMap)) - // } - - // } - -} diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala index 0d82b2f1ea..920388bba9 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -42,7 +42,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { val stateStreamCheckpointInterval = Seconds(1) // this ensure checkpointing occurs at least once - val firstNumBatches = (stateStreamCheckpointInterval.millis / batchDuration.millis) * 2 + val firstNumBatches = (stateStreamCheckpointInterval / batchDuration) * 2 val secondNumBatches = firstNumBatches // Setup the streams diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala index 5b414117fc..4aa428bf64 100644 --- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala +++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala @@ -133,7 +133,7 @@ class FailureSuite extends TestSuiteBase with BeforeAndAfter { // Get the output buffer val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]] val output = outputStream.output - val waitTime = (batchDuration.millis * (numBatches.toDouble + 0.5)).toLong + val waitTime = (batchDuration.milliseconds * (numBatches.toDouble + 0.5)).toLong val startTime = System.currentTimeMillis() try { diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index ed9a659092..76b528bec3 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -1,5 +1,6 @@ package spark.streaming +import dstream.SparkFlumeEvent import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket} import java.io.{File, BufferedWriter, OutputStreamWriter} import java.util.concurrent.{TimeUnit, ArrayBlockingQueue} diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala index a44f738957..28bdd53c3c 100644 --- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala @@ -1,12 +1,16 @@ package spark.streaming +import spark.streaming.dstream.{InputDStream, ForEachDStream} +import spark.streaming.util.ManualClock + import spark.{RDD, Logging} -import util.ManualClock + import collection.mutable.ArrayBuffer -import org.scalatest.FunSuite import collection.mutable.SynchronizedBuffer + import java.io.{ObjectInputStream, IOException} +import org.scalatest.FunSuite /** * This is a input stream just for the testsuites. This is equivalent to a checkpointable, @@ -70,6 +74,10 @@ trait TestSuiteBase extends FunSuite with Logging { def actuallyWait = false + /** + * Set up required DStreams to test the DStream operation using the two sequences + * of input collections. + */ def setupStreams[U: ClassManifest, V: ClassManifest]( input: Seq[Seq[U]], operation: DStream[U] => DStream[V] @@ -90,6 +98,10 @@ trait TestSuiteBase extends FunSuite with Logging { ssc } + /** + * Set up required DStreams to test the binary operation using the sequence + * of input collections. + */ def setupStreams[U: ClassManifest, V: ClassManifest, W: ClassManifest]( input1: Seq[Seq[U]], input2: Seq[Seq[V]], @@ -173,6 +185,11 @@ trait TestSuiteBase extends FunSuite with Logging { output } + /** + * Verify whether the output values after running a DStream operation + * is same as the expected output values, by comparing the output + * collections either as lists (order matters) or sets (order does not matter) + */ def verifyOutput[V: ClassManifest]( output: Seq[Seq[V]], expectedOutput: Seq[Seq[V]], @@ -199,6 +216,10 @@ trait TestSuiteBase extends FunSuite with Logging { logInfo("Output verified successfully") } + /** + * Test unary DStream operation with a list of inputs, with number of + * batches to run same as the number of expected output values + */ def testOperation[U: ClassManifest, V: ClassManifest]( input: Seq[Seq[U]], operation: DStream[U] => DStream[V], @@ -208,6 +229,15 @@ trait TestSuiteBase extends FunSuite with Logging { testOperation[U, V](input, operation, expectedOutput, -1, useSet) } + /** + * Test unary DStream operation with a list of inputs + * @param input Sequence of input collections + * @param operation Binary DStream operation to be applied to the 2 inputs + * @param expectedOutput Sequence of expected output collections + * @param numBatches Number of batches to run the operation for + * @param useSet Compare the output values with the expected output values + * as sets (order matters) or as lists (order does not matter) + */ def testOperation[U: ClassManifest, V: ClassManifest]( input: Seq[Seq[U]], operation: DStream[U] => DStream[V], @@ -221,6 +251,10 @@ trait TestSuiteBase extends FunSuite with Logging { verifyOutput[V](output, expectedOutput, useSet) } + /** + * Test binary DStream operation with two lists of inputs, with number of + * batches to run same as the number of expected output values + */ def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest]( input1: Seq[Seq[U]], input2: Seq[Seq[V]], @@ -231,6 +265,16 @@ trait TestSuiteBase extends FunSuite with Logging { testOperation[U, V, W](input1, input2, operation, expectedOutput, -1, useSet) } + /** + * Test binary DStream operation with two lists of inputs + * @param input1 First sequence of input collections + * @param input2 Second sequence of input collections + * @param operation Binary DStream operation to be applied to the 2 inputs + * @param expectedOutput Sequence of expected output collections + * @param numBatches Number of batches to run the operation for + * @param useSet Compare the output values with the expected output values + * as sets (order matters) or as lists (order does not matter) + */ def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest]( input1: Seq[Seq[U]], input2: Seq[Seq[V]], diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala index 3e20e16708..4bc5229465 100644 --- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala @@ -209,7 +209,7 @@ class WindowOperationsSuite extends TestSuiteBase { val expectedOutput = bigGroupByOutput.map(_.map(x => (x._1, x._2.toSet))) val windowTime = Seconds(2) val slideTime = Seconds(1) - val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt + val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt val operation = (s: DStream[(String, Int)]) => { s.groupByKeyAndWindow(windowTime, slideTime) .map(x => (x._1, x._2.toSet)) @@ -223,7 +223,7 @@ class WindowOperationsSuite extends TestSuiteBase { val expectedOutput = Seq( Seq(1), Seq(2), Seq(3), Seq(3), Seq(1), Seq(0)) val windowTime = Seconds(2) val slideTime = Seconds(1) - val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt + val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt val operation = (s: DStream[Int]) => s.countByWindow(windowTime, slideTime) testOperation(input, operation, expectedOutput, numBatches, true) } @@ -233,7 +233,7 @@ class WindowOperationsSuite extends TestSuiteBase { val expectedOutput = Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 2)), Seq(("a", 1), ("b", 3))) val windowTime = Seconds(2) val slideTime = Seconds(1) - val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt + val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt val operation = (s: DStream[(String, Int)]) => { s.countByKeyAndWindow(windowTime, slideTime).map(x => (x._1, x._2.toInt)) } @@ -251,7 +251,7 @@ class WindowOperationsSuite extends TestSuiteBase { slideTime: Time = Seconds(1) ) { test("window - " + name) { - val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt + val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt val operation = (s: DStream[Int]) => s.window(windowTime, slideTime) testOperation(input, operation, expectedOutput, numBatches, true) } @@ -265,7 +265,7 @@ class WindowOperationsSuite extends TestSuiteBase { slideTime: Time = Seconds(1) ) { test("reduceByKeyAndWindow - " + name) { - val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt + val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt val operation = (s: DStream[(String, Int)]) => { s.reduceByKeyAndWindow(_ + _, windowTime, slideTime).persist() } @@ -281,7 +281,7 @@ class WindowOperationsSuite extends TestSuiteBase { slideTime: Time = Seconds(1) ) { test("reduceByKeyAndWindowInv - " + name) { - val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt + val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt val operation = (s: DStream[(String, Int)]) => { s.reduceByKeyAndWindow(_ + _, _ - _, windowTime, slideTime) .persist() -- cgit v1.2.3