diff options
Diffstat (limited to 'streaming')
38 files changed, 1703 insertions, 765 deletions
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala index b9eb7f8ec4..7405c8b22e 100644 --- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala @@ -6,6 +6,8 @@ import org.apache.hadoop.fs.{FileUtil, Path} import org.apache.hadoop.conf.Configuration import java.io._ +import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} +import java.util.concurrent.Executors private[streaming] @@ -38,32 +40,50 @@ class CheckpointWriter(checkpointDir: String) extends Logging { val conf = new Configuration() var fs = file.getFileSystem(conf) val maxAttempts = 3 + val executor = Executors.newFixedThreadPool(1) - def write(checkpoint: Checkpoint) { - // TODO: maybe do this in a different thread from the main stream execution thread - var attempts = 0 - while (attempts < maxAttempts) { - attempts += 1 - try { - logDebug("Saving checkpoint for time " + checkpoint.checkpointTime + " to file '" + file + "'") - if (fs.exists(file)) { - val bkFile = new Path(file.getParent, file.getName + ".bk") - FileUtil.copy(fs, file, fs, bkFile, true, true, conf) - logDebug("Moved existing checkpoint file to " + bkFile) + class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable { + def run() { + var attempts = 0 + val startTime = System.currentTimeMillis() + while (attempts < maxAttempts) { + attempts += 1 + try { + logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'") + if (fs.exists(file)) { + val bkFile = new Path(file.getParent, file.getName + ".bk") + FileUtil.copy(fs, file, fs, bkFile, true, true, conf) + logDebug("Moved existing checkpoint file to " + bkFile) + } + val fos = fs.create(file) + fos.write(bytes) + fos.close() + fos.close() + val finishTime = System.currentTimeMillis(); + logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file + + "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds") + return + } catch { + case ioe: IOException => + logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe) } - val fos = fs.create(file) - val oos = new ObjectOutputStream(fos) - oos.writeObject(checkpoint) - oos.close() - logInfo("Checkpoint for time " + checkpoint.checkpointTime + " saved to file '" + file + "'") - fos.close() - return - } catch { - case ioe: IOException => - logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe) } + logError("Could not write checkpoint for time " + checkpointTime + " to file '" + file + "'") } - logError("Could not write checkpoint for time " + checkpoint.checkpointTime + " to file '" + file + "'") + } + + def write(checkpoint: Checkpoint) { + val bos = new ByteArrayOutputStream() + val zos = new LZFOutputStream(bos) + val oos = new ObjectOutputStream(zos) + oos.writeObject(checkpoint) + oos.close() + bos.close() + executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray)) + } + + def stop() { + executor.shutdown() } } @@ -85,7 +105,8 @@ object CheckpointReader extends Logging { // of ObjectInputStream is used to explicitly use the current thread's default class // loader to find and load classes. This is a well know Java issue and has popped up // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627) - val ois = new ObjectInputStreamWithLoader(fis, Thread.currentThread().getContextClassLoader) + val zis = new LZFInputStream(fis) + val ois = new ObjectInputStreamWithLoader(zis, Thread.currentThread().getContextClassLoader) val cp = ois.readObject.asInstanceOf[Checkpoint] ois.close() fs.close() diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 0eb6aad187..e1be5ef51c 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -132,7 +132,7 @@ abstract class DStream[T: ClassManifest] ( // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger if (mustCheckpoint && checkpointDuration == null) { - checkpointDuration = slideDuration.max(Seconds(10)) + checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt logInfo("Checkpoint interval automatically set to " + checkpointDuration) } @@ -238,13 +238,15 @@ abstract class DStream[T: ClassManifest] ( dependencies.foreach(_.remember(parentRememberDuration)) } - /** This method checks whether the 'time' is valid wrt slideDuration for generating RDD */ + /** Checks whether the 'time' is valid wrt slideDuration for generating RDD */ protected def isTimeValid(time: Time): Boolean = { if (!isInitialized) { throw new Exception (this + " has not been initialized") } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) { + logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime)) false } else { + logInfo("Time " + time + " is valid") true } } @@ -292,7 +294,7 @@ abstract class DStream[T: ClassManifest] ( * Generate a SparkStreaming job for the given time. This is an internal method that * should not be called directly. This default implementation creates a job * that materializes the corresponding RDD. Subclasses of DStream may override this - * (eg. ForEachDStream). + * to generate their own jobs. */ protected[streaming] def generateJob(time: Time): Option[Job] = { getOrCompute(time) match { @@ -308,19 +310,18 @@ abstract class DStream[T: ClassManifest] ( } /** - * Dereference RDDs that are older than rememberDuration. + * Clear metadata that are older than `rememberDuration` of this DStream. + * This is an internal method that should not be called directly. This default + * implementation clears the old generated RDDs. Subclasses of DStream may override + * this to clear their own metadata along with the generated RDDs. */ - protected[streaming] def forgetOldMetadata(time: Time) { + protected[streaming] def clearOldMetadata(time: Time) { var numForgotten = 0 - generatedRDDs.keys.foreach(t => { - if (t <= (time - rememberDuration)) { - generatedRDDs.remove(t) - numForgotten += 1 - logInfo("Forgot RDD of time " + t + " from " + this) - } - }) - logInfo("Forgot " + numForgotten + " RDDs from " + this) - dependencies.foreach(_.forgetOldMetadata(time)) + val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration)) + generatedRDDs --= oldRDDs.keys + logInfo("Cleared " + oldRDDs.size + " RDDs that were older than " + + (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", ")) + dependencies.foreach(_.clearOldMetadata(time)) } /* Adds metadata to the Stream while it is running. @@ -443,6 +444,15 @@ abstract class DStream[T: ClassManifest] ( def count(): DStream[Long] = this.map(_ => 1L).reduce(_ + _) /** + * Return a new DStream in which each RDD contains the counts of each distinct value in + * each RDD of this DStream. Hash partitioning is used to generate + * the RDDs with `numPartitions` partitions (Spark's default number of partitions if + * `numPartitions` not specified). + */ + def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)] = + this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions) + + /** * Apply a function to each RDD in this DStream. This is an output operator, so * this DStream will be registered as an output stream and therefore materialized. */ @@ -495,14 +505,16 @@ abstract class DStream[T: ClassManifest] ( } /** - * Return a new DStream which is computed based on windowed batches of this DStream. - * The new DStream generates RDDs with the same interval as this DStream. + * Return a new DStream in which each RDD contains all the elements in seen in a + * sliding window of time over this DStream. The new DStream generates RDDs with + * the same interval as this DStream. * @param windowDuration width of the window; must be a multiple of this DStream's interval. */ def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration) /** - * Return a new DStream which is computed based on windowed batches of this DStream. + * Return a new DStream in which each RDD contains all the elements in seen in a + * sliding window of time over this DStream. * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which @@ -514,27 +526,39 @@ abstract class DStream[T: ClassManifest] ( } /** - * Return a new DStream which computed based on tumbling window on this DStream. - * This is equivalent to window(batchTime, batchTime). - * @param batchDuration tumbling window duration; must be a multiple of this DStream's - * batching interval - */ - def tumble(batchDuration: Duration): DStream[T] = window(batchDuration, batchDuration) - - /** * Return a new DStream in which each RDD has a single element generated by reducing all - * elements in a window over this DStream. windowDuration and slideDuration are as defined - * in the window() operation. This is equivalent to - * window(windowDuration, slideDuration).reduce(reduceFunc) + * elements in a sliding window over this DStream. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval */ def reduceByWindow( reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration ): DStream[T] = { - this.window(windowDuration, slideDuration).reduce(reduceFunc) + this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc) } + /** + * Return a new DStream in which each RDD has a single element generated by reducing all + * elements in a sliding window over this DStream. However, the reduction is done incrementally + * using the old window's reduced value : + * 1. reduce the new values that entered the window (e.g., adding new counts) + * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) + * This is more efficient than reduceByWindow without "inverse reduce" function. + * However, it is applicable to only "invertible reduce functions". + * @param reduceFunc associative reduce function + * @param invReduceFunc inverse reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ def reduceByWindow( reduceFunc: (T, T) => T, invReduceFunc: (T, T) => T, @@ -548,14 +572,47 @@ abstract class DStream[T: ClassManifest] ( /** * Return a new DStream in which each RDD has a single element generated by counting the number - * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the - * window() operation. This is equivalent to window(windowDuration, slideDuration).count() + * of elements in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with + * Spark's default number of partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval */ def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] = { this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration) } /** + * Return a new DStream in which each RDD contains the count of distinct elements in + * RDDs in a sliding window over this DStream. Hash partitioning is used to generate + * the RDDs with `numPartitions` partitions (Spark's default number of partitions if + * `numPartitions` not specified). + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions number of partitions of each RDD in the new DStream. + */ + def countByValueAndWindow( + windowDuration: Duration, + slideDuration: Duration, + numPartitions: Int = ssc.sc.defaultParallelism + ): DStream[(T, Long)] = { + + this.map(x => (x, 1L)).reduceByKeyAndWindow( + (x: Long, y: Long) => x + y, + (x: Long, y: Long) => x - y, + windowDuration, + slideDuration, + numPartitions, + (x: (T, Long)) => x._2 != 0L + ) + } + + /** * Return a new DStream by unifying data of another DStream with this DStream. * @param that Another DStream having the same slideDuration as this DStream. */ @@ -572,16 +629,21 @@ abstract class DStream[T: ClassManifest] ( * Return all the RDDs between 'fromTime' to 'toTime' (both included) */ def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = { - val rdds = new ArrayBuffer[RDD[T]]() - var time = toTime.floor(slideDuration) - while (time >= zeroTime && time >= fromTime) { - getOrCompute(time) match { - case Some(rdd) => rdds += rdd - case None => //throw new Exception("Could not get RDD for time " + time) - } - time -= slideDuration + if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) { + logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")") + } + if (!(toTime - zeroTime).isMultipleOf(slideDuration)) { + logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")") } - rdds.toSeq + val alignedToTime = toTime.floor(slideDuration) + val alignedFromTime = fromTime.floor(slideDuration) + + logInfo("Slicing from " + fromTime + " to " + toTime + + " (aligned to " + alignedFromTime + " and " + alignedToTime + ")") + + alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => { + if (time >= zeroTime) getOrCompute(time) else None + }) } /** diff --git a/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala index a375980b84..6b0fade7c6 100644 --- a/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala @@ -87,7 +87,7 @@ class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T]) } override def toString() = { - "[\n" + checkpointFiles.size + "\n" + checkpointFiles.mkString("\n") + "\n]" + "[\n" + checkpointFiles.size + " checkpoint files \n" + checkpointFiles.mkString("\n") + "\n]" } } diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala index d5a5496839..adb7f3a24d 100644 --- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala @@ -11,17 +11,20 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { private val inputStreams = new ArrayBuffer[InputDStream[_]]() private val outputStreams = new ArrayBuffer[DStream[_]]() - private[streaming] var zeroTime: Time = null - private[streaming] var batchDuration: Duration = null - private[streaming] var rememberDuration: Duration = null - private[streaming] var checkpointInProgress = false + var rememberDuration: Duration = null + var checkpointInProgress = false - private[streaming] def start(time: Time) { + var zeroTime: Time = null + var startTime: Time = null + var batchDuration: Duration = null + + def start(time: Time) { this.synchronized { if (zeroTime != null) { throw new Exception("DStream graph computation already started") } zeroTime = time + startTime = time outputStreams.foreach(_.initialize(zeroTime)) outputStreams.foreach(_.remember(rememberDuration)) outputStreams.foreach(_.validate) @@ -29,19 +32,23 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { } } - private[streaming] def stop() { + def restart(time: Time) { + this.synchronized { startTime = time } + } + + def stop() { this.synchronized { inputStreams.par.foreach(_.stop()) } } - private[streaming] def setContext(ssc: StreamingContext) { + def setContext(ssc: StreamingContext) { this.synchronized { outputStreams.foreach(_.setContext(ssc)) } } - private[streaming] def setBatchDuration(duration: Duration) { + def setBatchDuration(duration: Duration) { this.synchronized { if (batchDuration != null) { throw new Exception("Batch duration already set as " + batchDuration + @@ -51,59 +58,68 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { batchDuration = duration } - private[streaming] def remember(duration: Duration) { + def remember(duration: Duration) { this.synchronized { if (rememberDuration != null) { throw new Exception("Batch duration already set as " + batchDuration + ". cannot set it again.") } + rememberDuration = duration } - rememberDuration = duration } - private[streaming] def addInputStream(inputStream: InputDStream[_]) { + def addInputStream(inputStream: InputDStream[_]) { this.synchronized { inputStream.setGraph(this) inputStreams += inputStream } } - private[streaming] def addOutputStream(outputStream: DStream[_]) { + def addOutputStream(outputStream: DStream[_]) { this.synchronized { outputStream.setGraph(this) outputStreams += outputStream } } - private[streaming] def getInputStreams() = this.synchronized { inputStreams.toArray } + def getInputStreams() = this.synchronized { inputStreams.toArray } - private[streaming] def getOutputStreams() = this.synchronized { outputStreams.toArray } + def getOutputStreams() = this.synchronized { outputStreams.toArray } - private[streaming] def generateRDDs(time: Time): Seq[Job] = { + def generateJobs(time: Time): Seq[Job] = { this.synchronized { - outputStreams.flatMap(outputStream => outputStream.generateJob(time)) + logInfo("Generating jobs for time " + time) + val jobs = outputStreams.flatMap(outputStream => outputStream.generateJob(time)) + logInfo("Generated " + jobs.length + " jobs for time " + time) + jobs } } - private[streaming] def forgetOldRDDs(time: Time) { + def clearOldMetadata(time: Time) { this.synchronized { - outputStreams.foreach(_.forgetOldMetadata(time)) + logInfo("Clearing old metadata for time " + time) + outputStreams.foreach(_.clearOldMetadata(time)) + logInfo("Cleared old metadata for time " + time) } } - private[streaming] def updateCheckpointData(time: Time) { + def updateCheckpointData(time: Time) { this.synchronized { + logInfo("Updating checkpoint data for time " + time) outputStreams.foreach(_.updateCheckpointData(time)) + logInfo("Updated checkpoint data for time " + time) } } - private[streaming] def restoreCheckpointData() { + def restoreCheckpointData() { this.synchronized { + logInfo("Restoring checkpoint data") outputStreams.foreach(_.restoreCheckpointData()) + logInfo("Restored checkpoint data") } } - private[streaming] def validate() { + def validate() { this.synchronized { assert(batchDuration != null, "Batch duration has not been set") //assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration + " is very low") diff --git a/streaming/src/main/scala/spark/streaming/Duration.scala b/streaming/src/main/scala/spark/streaming/Duration.scala index e4dc579a17..ee26206e24 100644 --- a/streaming/src/main/scala/spark/streaming/Duration.scala +++ b/streaming/src/main/scala/spark/streaming/Duration.scala @@ -16,7 +16,7 @@ case class Duration (private val millis: Long) { def * (times: Int): Duration = new Duration(millis * times) - def / (that: Duration): Long = millis / that.millis + def / (that: Duration): Double = millis.toDouble / that.millis.toDouble def isMultipleOf(that: Duration): Boolean = (this.millis % that.millis == 0) diff --git a/streaming/src/main/scala/spark/streaming/Interval.scala b/streaming/src/main/scala/spark/streaming/Interval.scala index dc21dfb722..6a8b81760e 100644 --- a/streaming/src/main/scala/spark/streaming/Interval.scala +++ b/streaming/src/main/scala/spark/streaming/Interval.scala @@ -30,6 +30,7 @@ class Interval(val beginTime: Time, val endTime: Time) { override def toString = "[" + beginTime + ", " + endTime + "]" } +private[streaming] object Interval { def currentInterval(duration: Duration): Interval = { val time = new Time(System.currentTimeMillis) diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala index 5acdd01e58..7696c4a592 100644 --- a/streaming/src/main/scala/spark/streaming/JobManager.scala +++ b/streaming/src/main/scala/spark/streaming/JobManager.scala @@ -15,8 +15,8 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging { SparkEnv.set(ssc.env) try { val timeTaken = job.run() - logInfo("Total delay: %.5f s for job %s (execution: %.5f s)".format( - (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, timeTaken / 1000.0)) + logInfo("Total delay: %.5f s for job %s of time %s (execution: %.5f s)".format( + (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, job.time.milliseconds, timeTaken / 1000.0)) } catch { case e: Exception => logError("Running " + job + " failed", e) @@ -38,19 +38,29 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging { logInfo("Added " + job + " to queue") } + def stop() { + jobExecutor.shutdown() + } + private def clearJob(job: Job) { + var timeCleared = false + val time = job.time jobs.synchronized { - val jobsOfTime = jobs.get(job.time) + val jobsOfTime = jobs.get(time) if (jobsOfTime.isDefined) { jobsOfTime.get -= job if (jobsOfTime.get.isEmpty) { - jobs -= job.time + jobs -= time + timeCleared = true } } else { throw new Exception("Job finished for time " + job.time + " but time does not exist in jobs") } } + if (timeCleared) { + ssc.scheduler.clearOldMetadata(time) + } } def getPendingTimes(): Array[Time] = { diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala index e4152f3a61..64972fd5cd 100644 --- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala +++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala @@ -4,6 +4,7 @@ import spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver} import spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError} import spark.Logging import spark.SparkEnv +import spark.SparkContext._ import scala.collection.mutable.HashMap import scala.collection.mutable.Queue @@ -23,7 +24,7 @@ private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) ext */ private[streaming] class NetworkInputTracker( - @transient ssc: StreamingContext, + @transient ssc: StreamingContext, @transient networkInputStreams: Array[NetworkInputDStream[_]]) extends Logging { @@ -65,12 +66,12 @@ class NetworkInputTracker( def receive = { case RegisterReceiver(streamId, receiverActor) => { if (!networkInputStreamMap.contains(streamId)) { - throw new Exception("Register received for unexpected id " + streamId) + throw new Exception("Register received for unexpected id " + streamId) } receiverInfo += ((streamId, receiverActor)) logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address) sender ! true - } + } case AddBlocks(streamId, blockIds, metadata) => { val tmp = receivedBlockIds.synchronized { if (!receivedBlockIds.contains(streamId)) { @@ -85,7 +86,7 @@ class NetworkInputTracker( } case DeregisterReceiver(streamId, msg) => { receiverInfo -= streamId - logInfo("De-registered receiver for network stream " + streamId + logError("De-registered receiver for network stream " + streamId + " with message " + msg) //TODO: Do something about the corresponding NetworkInputDStream } @@ -95,8 +96,8 @@ class NetworkInputTracker( /** This thread class runs all the receivers on the cluster. */ class ReceiverExecutor extends Thread { val env = ssc.env - - override def run() { + + override def run() { try { SparkEnv.set(env) startReceivers() @@ -113,7 +114,7 @@ class NetworkInputTracker( */ def startReceivers() { val receivers = networkInputStreams.map(nis => { - val rcvr = nis.createReceiver() + val rcvr = nis.getReceiver() rcvr.setStreamId(nis.id) rcvr }) @@ -138,10 +139,14 @@ class NetworkInputTracker( } iterator.next().start() } + // Run the dummy Spark job to ensure that all slaves have registered. + // This avoids all the receivers to be scheduled on the same node. + //ssc.sparkContext.makeRDD(1 to 100, 100).map(x => (x, 1)).reduceByKey(_ + _, 20).collect() + // Distribute the receivers and start them - ssc.sc.runJob(tempRDD, startReceiver) + ssc.sparkContext.runJob(tempRDD, startReceiver) } - + /** Stops the receivers. */ def stopReceivers() { // Signal the receivers to stop diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala index fbcf061126..5a2dd46fa0 100644 --- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala @@ -18,15 +18,15 @@ import org.apache.hadoop.conf.Configuration class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](self: DStream[(K,V)]) extends Serializable { - - def ssc = self.ssc + + private[streaming] def ssc = self.ssc private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = { new HashPartitioner(numPartitions) } /** - * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to + * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to * generate the RDDs with Spark's default number of partitions. */ def groupByKey(): DStream[(K, Seq[V])] = { @@ -34,7 +34,7 @@ extends Serializable { } /** - * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to + * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to * generate the RDDs with `numPartitions` partitions. */ def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = { @@ -42,7 +42,7 @@ extends Serializable { } /** - * Create a new DStream by applying `groupByKey` on each RDD. The supplied [[spark.Partitioner]] + * Return a new DStream by applying `groupByKey` on each RDD. The supplied [[spark.Partitioner]] * is used to control the partitioning of each RDD. */ def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = { @@ -54,7 +54,7 @@ extends Serializable { } /** - * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are * merged using the associative reduce function. Hash partitioning is used to generate the RDDs * with Spark's default number of partitions. */ @@ -63,7 +63,7 @@ extends Serializable { } /** - * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs * with `numPartitions` partitions. */ @@ -72,7 +72,7 @@ extends Serializable { } /** - * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are * merged using the supplied reduce function. [[spark.Partitioner]] is used to control the * partitioning of each RDD. */ @@ -82,7 +82,7 @@ extends Serializable { } /** - * Combine elements of each key in DStream's RDDs using custom function. This is similar to the + * Combine elements of each key in DStream's RDDs using custom functions. This is similar to the * combineByKey for RDDs. Please refer to combineByKey in [[spark.PairRDDFunctions]] for more * information. */ @@ -95,15 +95,7 @@ extends Serializable { } /** - * Create a new DStream by counting the number of values of each key in each RDD. Hash - * partitioning is used to generate the RDDs with Spark's `numPartitions` partitions. - */ - def countByKey(numPartitions: Int = self.ssc.sc.defaultParallelism): DStream[(K, Long)] = { - self.map(x => (x._1, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions) - } - - /** - * Creates a new DStream by applying `groupByKey` over a sliding window. This is similar to + * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with * Spark's default number of partitions. @@ -115,7 +107,7 @@ extends Serializable { } /** - * Create a new DStream by applying `groupByKey` over a sliding window. Similar to + * Return a new DStream by applying `groupByKey` over a sliding window. Similar to * `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to * generate the RDDs with Spark's default number of partitions. * @param windowDuration width of the window; must be a multiple of this DStream's @@ -129,7 +121,7 @@ extends Serializable { } /** - * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream. * Similar to `DStream.groupByKey()`, but applies it over a sliding window. * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. * @param windowDuration width of the window; must be a multiple of this DStream's @@ -137,7 +129,8 @@ extends Serializable { * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval - * @param numPartitions Number of partitions of each RDD in the new DStream. + * @param numPartitions number of partitions of each RDD in the new DStream; if not specified + * then Spark's default number of partitions will be used */ def groupByKeyAndWindow( windowDuration: Duration, @@ -155,7 +148,7 @@ extends Serializable { * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval - * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream. */ def groupByKeyAndWindow( windowDuration: Duration, @@ -166,7 +159,7 @@ extends Serializable { } /** - * Create a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. + * Return a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate * the RDDs with Spark's default number of partitions. @@ -182,7 +175,7 @@ extends Serializable { } /** - * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to + * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to * generate the RDDs with Spark's default number of partitions. * @param reduceFunc associative reduce function @@ -201,7 +194,7 @@ extends Serializable { } /** - * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to + * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to * generate the RDDs with `numPartitions` partitions. * @param reduceFunc associative reduce function @@ -210,10 +203,10 @@ extends Serializable { * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval - * @param numPartitions Number of partitions of each RDD in the new DStream. + * @param numPartitions number of partitions of each RDD in the new DStream. */ def reduceByKeyAndWindow( - reduceFunc: (V, V) => V, + reduceFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration, numPartitions: Int @@ -222,7 +215,7 @@ extends Serializable { } /** - * Create a new DStream by applying `reduceByKey` over a sliding window. Similar to + * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to * `DStream.reduceByKey()`, but applies it over a sliding window. * @param reduceFunc associative reduce function * @param windowDuration width of the window; must be a multiple of this DStream's @@ -230,7 +223,8 @@ extends Serializable { * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval - * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner partitioner for controlling the partitioning of each RDD + * in the new DStream. */ def reduceByKeyAndWindow( reduceFunc: (V, V) => V, @@ -245,118 +239,78 @@ extends Serializable { } /** - * Create a new DStream by reducing over a using incremental computation. - * The reduced value of over a new window is calculated using the old window's reduce value : + * Return a new DStream by applying incremental `reduceByKey` over a sliding window. + * The reduced value of over a new window is calculated using the old window's reduced value : * 1. reduce the new values that entered the window (e.g., adding new counts) + * * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) - * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function. + * + * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function. * However, it is applicable to only "invertible reduce functions". * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. * @param reduceFunc associative reduce function - * @param invReduceFunc inverse function + * @param invReduceFunc inverse reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval + * @param filterFunc Optional function to filter expired key-value pairs; + * only pairs that satisfy the function are retained */ def reduceByKeyAndWindow( reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, windowDuration: Duration, - slideDuration: Duration + slideDuration: Duration = self.slideDuration, + numPartitions: Int = ssc.sc.defaultParallelism, + filterFunc: ((K, V)) => Boolean = null ): DStream[(K, V)] = { reduceByKeyAndWindow( - reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner()) - } - - /** - * Create a new DStream by reducing over a using incremental computation. - * The reduced value of over a new window is calculated using the old window's reduce value : - * 1. reduce the new values that entered the window (e.g., adding new counts) - * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) - * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function. - * However, it is applicable to only "invertible reduce functions". - * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. - * @param reduceFunc associative reduce function - * @param invReduceFunc inverse function - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - * @param numPartitions Number of partitions of each RDD in the new DStream. - */ - def reduceByKeyAndWindow( - reduceFunc: (V, V) => V, - invReduceFunc: (V, V) => V, - windowDuration: Duration, - slideDuration: Duration, - numPartitions: Int - ): DStream[(K, V)] = { - - reduceByKeyAndWindow( - reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions)) + reduceFunc, invReduceFunc, windowDuration, + slideDuration, defaultPartitioner(numPartitions), filterFunc + ) } /** - * Create a new DStream by reducing over a using incremental computation. - * The reduced value of over a new window is calculated using the old window's reduce value : + * Return a new DStream by applying incremental `reduceByKey` over a sliding window. + * The reduced value of over a new window is calculated using the old window's reduced value : * 1. reduce the new values that entered the window (e.g., adding new counts) * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) - * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function. + * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function. * However, it is applicable to only "invertible reduce functions". - * @param reduceFunc associative reduce function - * @param invReduceFunc inverse function + * @param reduceFunc associative reduce function + * @param invReduceFunc inverse reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval - * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream. + * @param filterFunc Optional function to filter expired key-value pairs; + * only pairs that satisfy the function are retained */ def reduceByKeyAndWindow( reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration, - partitioner: Partitioner + partitioner: Partitioner, + filterFunc: ((K, V)) => Boolean ): DStream[(K, V)] = { val cleanedReduceFunc = ssc.sc.clean(reduceFunc) val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc) + val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None new ReducedWindowedDStream[K, V]( - self, cleanedReduceFunc, cleanedInvReduceFunc, windowDuration, slideDuration, partitioner) - } - - /** - * Create a new DStream by counting the number of values for each key over a window. - * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - * @param numPartitions Number of partitions of each RDD in the new DStream. - */ - def countByKeyAndWindow( - windowDuration: Duration, - slideDuration: Duration, - numPartitions: Int = self.ssc.sc.defaultParallelism - ): DStream[(K, Long)] = { - - self.map(x => (x._1, 1L)).reduceByKeyAndWindow( - (x: Long, y: Long) => x + y, - (x: Long, y: Long) => x - y, - windowDuration, - slideDuration, - numPartitions + self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc, + windowDuration, slideDuration, partitioner ) } /** - * Create a new "state" DStream where the state for each key is updated by applying + * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of each key. * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. * @param updateFunc State update function. If `this` function returns None, then @@ -370,7 +324,7 @@ extends Serializable { } /** - * Create a new "state" DStream where the state for each key is updated by applying + * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of each key. * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. * @param updateFunc State update function. If `this` function returns None, then @@ -405,7 +359,7 @@ extends Serializable { } /** - * Create a new "state" DStream where the state for each key is updated by applying + * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of each key. * [[spark.Paxrtitioner]] is used to control the partitioning of each RDD. * @param updateFunc State update function. If `this` function returns None, then @@ -447,7 +401,7 @@ extends Serializable { } /** - * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this` + * Cogroup `this` DStream with `other` DStream using a partitioner. For each key k in corresponding RDDs of `this` * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that * key in both RDDs. Partitioner is used to partition each generated RDD. */ diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala index b77986a3ba..1c4b22a898 100644 --- a/streaming/src/main/scala/spark/streaming/Scheduler.scala +++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala @@ -9,11 +9,8 @@ class Scheduler(ssc: StreamingContext) extends Logging { initLogging() - val graph = ssc.graph - val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt val jobManager = new JobManager(ssc, concurrentJobs) - val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) { new CheckpointWriter(ssc.checkpointDir) } else { @@ -23,57 +20,93 @@ class Scheduler(ssc: StreamingContext) extends Logging { val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock") val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock] val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, - longTime => generateRDDs(new Time(longTime))) + longTime => generateJobs(new Time(longTime))) + val graph = ssc.graph + var latestTime: Time = null - def start() { - // If context was started from checkpoint, then restart timer such that - // this timer's triggers occur at the same time as the original timer. - // Otherwise just start the timer from scratch, and initialize graph based - // on this first trigger time of the timer. + def start() = synchronized { if (ssc.isCheckpointPresent) { - // If manual clock is being used for testing, then - // either set the manual clock to the last checkpointed time, - // or if the property is defined set it to that time - if (clock.isInstanceOf[ManualClock]) { - val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds - val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong - clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime) - } - // Reschedule the batches that were received but not processed before failure - ssc.initialCheckpoint.pendingTimes.foreach(time => generateRDDs(time)) - // Restart the timer - timer.restart(graph.zeroTime.milliseconds) - logInfo("Scheduler's timer restarted") + restart() } else { - val firstTime = new Time(timer.start()) - graph.start(firstTime - ssc.graph.batchDuration) - logInfo("Scheduler's timer started") + startFirstTime() } logInfo("Scheduler started") } - def stop() { + def stop() = synchronized { timer.stop() - graph.stop() + jobManager.stop() + if (checkpointWriter != null) checkpointWriter.stop() + ssc.graph.stop() logInfo("Scheduler stopped") } - - private def generateRDDs(time: Time) { + + private def startFirstTime() { + val startTime = new Time(timer.getStartTime()) + graph.start(startTime - graph.batchDuration) + timer.start(startTime.milliseconds) + logInfo("Scheduler's timer started at " + startTime) + } + + private def restart() { + + // If manual clock is being used for testing, then + // either set the manual clock to the last checkpointed time, + // or if the property is defined set it to that time + if (clock.isInstanceOf[ManualClock]) { + val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds + val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong + clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime) + } + + val batchDuration = ssc.graph.batchDuration + + // Batches when the master was down, that is, + // between the checkpoint and current restart time + val checkpointTime = ssc.initialCheckpoint.checkpointTime + val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds)) + val downTimes = checkpointTime.until(restartTime, batchDuration) + logInfo("Batches during down time: " + downTimes.mkString(", ")) + + // Batches that were unprocessed before failure + val pendingTimes = ssc.initialCheckpoint.pendingTimes + logInfo("Batches pending processing: " + pendingTimes.mkString(", ")) + // Reschedule jobs for these times + val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering) + logInfo("Batches to reschedule: " + timesToReschedule.mkString(", ")) + timesToReschedule.foreach(time => + graph.generateJobs(time).foreach(jobManager.runJob) + ) + + // Restart the timer + timer.start(restartTime.milliseconds) + logInfo("Scheduler's timer restarted at " + restartTime) + } + + /** Generate jobs and perform checkpoint for the given `time`. */ + def generateJobs(time: Time) { SparkEnv.set(ssc.env) logInfo("\n-----------------------------------------------------\n") - graph.generateRDDs(time).foreach(jobManager.runJob) - graph.forgetOldRDDs(time) + graph.generateJobs(time).foreach(jobManager.runJob) + latestTime = time + doCheckpoint(time) + } + + /** + * Clear old metadata assuming jobs of `time` have finished processing. + * And also perform checkpoint. + */ + def clearOldMetadata(time: Time) { + ssc.graph.clearOldMetadata(time) doCheckpoint(time) - logInfo("Generated RDDs for time " + time) } - private def doCheckpoint(time: Time) { + /** Perform checkpoint for the give `time`. */ + def doCheckpoint(time: Time) = synchronized { if (ssc.checkpointDuration != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) { - val startTime = System.currentTimeMillis() + logInfo("Checkpointing graph for time " + time) ssc.graph.updateCheckpointData(time) checkpointWriter.write(new Checkpoint(ssc, time)) - val stopTime = System.currentTimeMillis() - logInfo("Checkpointing the graph took " + (stopTime - startTime) + " ms") } } } diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 8cfbec51d2..a9684c5772 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -1,10 +1,17 @@ package spark.streaming +import akka.actor.Props +import akka.actor.SupervisorStrategy + import spark.streaming.dstream._ import spark.{RDD, Logging, SparkEnv, SparkContext} +import spark.streaming.receivers.ActorReceiver +import spark.streaming.receivers.ReceiverSupervisorStrategy import spark.storage.StorageLevel import spark.util.MetadataCleaner +import spark.streaming.receivers.ActorReceiver + import scala.collection.mutable.Queue @@ -17,6 +24,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.fs.Path import java.util.UUID +import twitter4j.Status /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -30,14 +38,14 @@ class StreamingContext private ( ) extends Logging { /** - * Creates a StreamingContext using an existing SparkContext. + * Create a StreamingContext using an existing SparkContext. * @param sparkContext Existing SparkContext * @param batchDuration The time interval at which streaming data will be divided into batches */ def this(sparkContext: SparkContext, batchDuration: Duration) = this(sparkContext, null, batchDuration) /** - * Creates a StreamingContext by providing the details necessary for creating a new SparkContext. + * Create a StreamingContext by providing the details necessary for creating a new SparkContext. * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). * @param frameworkName A name for your job, to display on the cluster web UI * @param batchDuration The time interval at which streaming data will be divided into batches @@ -46,7 +54,7 @@ class StreamingContext private ( this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration) /** - * Re-creates a StreamingContext from a checkpoint file. + * Re-create a StreamingContext from a checkpoint file. * @param path Path either to the directory that was specified as the checkpoint directory, or * to the checkpoint file 'graph' or 'graph.bk'. */ @@ -101,12 +109,12 @@ class StreamingContext private ( protected[streaming] var scheduler: Scheduler = null /** - * Returns the associated Spark context + * Return the associated Spark context */ def sparkContext = sc /** - * Sets each DStreams in this context to remember RDDs it generated in the last given duration. + * Set each DStreams in this context to remember RDDs it generated in the last given duration. * DStreams remember RDDs only for a limited duration of time and releases them for garbage * collection. This method allows the developer to specify how to long to remember the RDDs ( * if the developer wishes to query old data outside the DStream computation). @@ -117,19 +125,16 @@ class StreamingContext private ( } /** - * Sets the context to periodically checkpoint the DStream operations for master - * fault-tolerance. By default, the graph will be checkpointed every batch interval. + * Set the context to periodically checkpoint the DStream operations for master + * fault-tolerance. The graph will be checkpointed every batch interval. * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored - * @param interval checkpoint interval */ - def checkpoint(directory: String, interval: Duration = null) { + def checkpoint(directory: String) { if (directory != null) { sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory)) checkpointDir = directory - checkpointDuration = interval } else { checkpointDir = null - checkpointDuration = null } } @@ -140,6 +145,36 @@ class StreamingContext private ( protected[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement() /** + * Create an input stream with any arbitrary user implemented network receiver. + * @param receiver Custom implementation of NetworkReceiver + */ + def networkStream[T: ClassManifest]( + receiver: NetworkReceiver[T]): DStream[T] = { + val inputStream = new PluggableInputDStream[T](this, + receiver) + graph.addInputStream(inputStream) + inputStream + } + + /** + * Create an input stream with any arbitrary user implemented actor receiver. + * @param props Props object defining creation of the actor + * @param name Name of the actor + * @param storageLevel RDD storage level. Defaults to memory-only. + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e parametrized type of data received and actorStream + * should be same. + */ + def actorStream[T: ClassManifest]( + props: Props, name: String, + storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2, + supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = { + networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy)) + } + + /** * Create an input stream that pulls messages form a Kafka Broker. * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). * @param groupId The group id for this consumer. @@ -147,7 +182,8 @@ class StreamingContext private ( * in its own thread. * @param initialOffsets Optional initial offsets for each of the partitions to consume. * By default the value is pulled from zookeper. - * @param storageLevel RDD storage level. Defaults to memory-only. + * @param storageLevel Storage level to use for storing the received objects + * (default: StorageLevel.MEMORY_AND_DISK_SER_2) */ def kafkaStream[T: ClassManifest]( zkQuorum: String, @@ -162,24 +198,24 @@ class StreamingContext private ( } /** - * Create a input stream from network source hostname:port. Data is received using - * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited + * Create a input stream from TCP source hostname:port. Data is received using + * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited * lines. * @param hostname Hostname to connect to for receiving data * @param port Port to connect to for receiving data * @param storageLevel Storage level to use for storing the received objects * (default: StorageLevel.MEMORY_AND_DISK_SER_2) */ - def networkTextStream( + def socketTextStream( hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): DStream[String] = { - networkStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel) + socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel) } /** - * Create a input stream from network source hostname:port. Data is received using + * Create a input stream from TCP source hostname:port. Data is received using * a TCP socket and the receive bytes it interepreted as object using the given * converter. * @param hostname Hostname to connect to for receiving data @@ -188,7 +224,7 @@ class StreamingContext private ( * @param storageLevel Storage level to use for storing the received objects * @tparam T Type of the objects received (after converting bytes to objects) */ - def networkStream[T: ClassManifest]( + def socketStream[T: ClassManifest]( hostname: String, port: Int, converter: (InputStream) => Iterator[T], @@ -200,7 +236,7 @@ class StreamingContext private ( } /** - * Creates a input stream from a Flume source. + * Create a input stream from a Flume source. * @param hostname Hostname of the slave machine to which the flume data will be sent * @param port Port of the slave machine to which the flume data will be sent * @param storageLevel Storage level to use for storing the received objects @@ -236,7 +272,7 @@ class StreamingContext private ( } /** - * Creates a input stream that monitors a Hadoop-compatible filesystem + * Create a input stream that monitors a Hadoop-compatible filesystem * for new files and reads them using the given key-value types and input format. * File names starting with . are ignored. * @param directory HDFS directory to monitor for new file @@ -255,7 +291,7 @@ class StreamingContext private ( } /** - * Creates a input stream that monitors a Hadoop-compatible filesystem + * Create a input stream that monitors a Hadoop-compatible filesystem * for new files and reads them using the given key-value types and input format. * @param directory HDFS directory to monitor for new file * @param filter Function to filter paths to process @@ -274,9 +310,8 @@ class StreamingContext private ( inputStream } - /** - * Creates a input stream that monitors a Hadoop-compatible filesystem + * Create a input stream that monitors a Hadoop-compatible filesystem * for new files and reads them as text files (using key as LongWritable, value * as Text and input format as TextInputFormat). File names starting with . are ignored. * @param directory HDFS directory to monitor for new file @@ -286,7 +321,25 @@ class StreamingContext private ( } /** - * Creates an input stream from a queue of RDDs. In each batch, + * Create a input stream that returns tweets received from Twitter. + * @param username Twitter username + * @param password Twitter password + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def twitterStream( + username: String, + password: String, + filters: Seq[String], + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[Status] = { + val inputStream = new TwitterInputDStream(this, username, password, filters, storageLevel) + registerInputStream(inputStream) + inputStream + } + + /** + * Create an input stream from a queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. * @param queue Queue of RDDs * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval @@ -300,7 +353,7 @@ class StreamingContext private ( } /** - * Creates an input stream from a queue of RDDs. In each batch, + * Create an input stream from a queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. * @param queue Queue of RDDs * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval @@ -325,7 +378,7 @@ class StreamingContext private ( } /** - * Registers an input stream that will be started (InputDStream.start() called) to get the + * Register an input stream that will be started (InputDStream.start() called) to get the * input data. */ def registerInputStream(inputStream: InputDStream[_]) { @@ -333,7 +386,7 @@ class StreamingContext private ( } /** - * Registers an output stream that will be computed every interval + * Register an output stream that will be computed every interval */ def registerOutputStream(outputStream: DStream[_]) { graph.addOutputStream(outputStream) @@ -351,7 +404,7 @@ class StreamingContext private ( } /** - * Starts the execution of the streams. + * Start the execution of the streams. */ def start() { if (checkpointDir != null && checkpointDuration == null && graph != null) { @@ -379,7 +432,7 @@ class StreamingContext private ( } /** - * Stops the execution of the streams. + * Stop the execution of the streams. */ def stop() { try { diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala index 5daeb761dd..f14decf08b 100644 --- a/streaming/src/main/scala/spark/streaming/Time.scala +++ b/streaming/src/main/scala/spark/streaming/Time.scala @@ -37,6 +37,19 @@ case class Time(private val millis: Long) { def max(that: Time): Time = if (this > that) this else that + def until(that: Time, interval: Duration): Seq[Time] = { + (this.milliseconds) until (that.milliseconds) by (interval.milliseconds) map (new Time(_)) + } + + def to(that: Time, interval: Duration): Seq[Time] = { + (this.milliseconds) to (that.milliseconds) by (interval.milliseconds) map (new Time(_)) + } + + override def toString: String = (millis.toString + " ms") +} + +object Time { + val ordering = Ordering.by((time: Time) => time.millis) }
\ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala index 2e7466b16c..30985b4ebc 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala @@ -36,7 +36,7 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM def cache(): JavaDStream[T] = dstream.cache() /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ - def persist(): JavaDStream[T] = dstream.cache() + def persist(): JavaDStream[T] = dstream.persist() /** Persist the RDDs of this DStream with the given storage level */ def persist(storageLevel: StorageLevel): JavaDStream[T] = dstream.persist(storageLevel) @@ -50,34 +50,27 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM } /** - * Return a new DStream which is computed based on windowed batches of this DStream. - * The new DStream generates RDDs with the same interval as this DStream. + * Return a new DStream in which each RDD contains all the elements in seen in a + * sliding window of time over this DStream. The new DStream generates RDDs with + * the same interval as this DStream. * @param windowDuration width of the window; must be a multiple of this DStream's interval. - * @return */ def window(windowDuration: Duration): JavaDStream[T] = dstream.window(windowDuration) /** - * Return a new DStream which is computed based on windowed batches of this DStream. - * @param windowDuration duration (i.e., width) of the window; - * must be a multiple of this DStream's interval + * Return a new DStream in which each RDD contains all the elements in seen in a + * sliding window of time over this DStream. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's interval + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval */ def window(windowDuration: Duration, slideDuration: Duration): JavaDStream[T] = dstream.window(windowDuration, slideDuration) /** - * Return a new DStream which computed based on tumbling window on this DStream. - * This is equivalent to window(batchDuration, batchDuration). - * @param batchDuration tumbling window duration; must be a multiple of this DStream's interval - */ - def tumble(batchDuration: Duration): JavaDStream[T] = - dstream.tumble(batchDuration) - - /** * Return a new DStream by unifying data of another DStream with this DStream. * @param that Another DStream having the same interval (i.e., slideDuration) as this DStream. */ diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala index b93cb7865a..1c1ba05ff9 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala @@ -34,6 +34,26 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable def count(): JavaDStream[JLong] = dstream.count() /** + * Return a new DStream in which each RDD contains the counts of each distinct value in + * each RDD of this DStream. Hash partitioning is used to generate the RDDs with + * Spark's default number of partitions. + */ + def countByValue(): JavaPairDStream[T, JLong] = { + JavaPairDStream.scalaToJavaLong(dstream.countByValue()) + } + + /** + * Return a new DStream in which each RDD contains the counts of each distinct value in + * each RDD of this DStream. Hash partitioning is used to generate the RDDs with `numPartitions` + * partitions. + * @param numPartitions number of partitions of each RDD in the new DStream. + */ + def countByValue(numPartitions: Int): JavaPairDStream[T, JLong] = { + JavaPairDStream.scalaToJavaLong(dstream.countByValue(numPartitions)) + } + + + /** * Return a new DStream in which each RDD has a single element generated by counting the number * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the * window() operation. This is equivalent to window(windowDuration, slideDuration).count() @@ -43,6 +63,39 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable } /** + * Return a new DStream in which each RDD contains the count of distinct elements in + * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with + * Spark's default number of partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration) + : JavaPairDStream[T, JLong] = { + JavaPairDStream.scalaToJavaLong( + dstream.countByValueAndWindow(windowDuration, slideDuration)) + } + + /** + * Return a new DStream in which each RDD contains the count of distinct elements in + * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with `numPartitions` + * partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions number of partitions of each RDD in the new DStream. + */ + def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int) + : JavaPairDStream[T, JLong] = { + JavaPairDStream.scalaToJavaLong( + dstream.countByValueAndWindow(windowDuration, slideDuration, numPartitions)) + } + + /** * Return a new DStream in which each RDD is generated by applying glom() to each RDD of * this DStream. Applying glom() to an RDD coalesces all elements within each partition into * an array. @@ -114,8 +167,38 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable /** * Return a new DStream in which each RDD has a single element generated by reducing all - * elements in a window over this DStream. windowDuration and slideDuration are as defined in the - * window() operation. This is equivalent to window(windowDuration, slideDuration).reduce(reduceFunc) + * elements in a sliding window over this DStream. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def reduceByWindow( + reduceFunc: (T, T) => T, + windowDuration: Duration, + slideDuration: Duration + ): DStream[T] = { + dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration) + } + + + /** + * Return a new DStream in which each RDD has a single element generated by reducing all + * elements in a sliding window over this DStream. However, the reduction is done incrementally + * using the old window's reduced value : + * 1. reduce the new values that entered the window (e.g., adding new counts) + * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) + * This is more efficient than reduceByWindow without "inverse reduce" function. + * However, it is applicable to only "invertible reduce functions". + * @param reduceFunc associative reduce function + * @param invReduceFunc inverse reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval */ def reduceByWindow( reduceFunc: JFunction2[T, T, T], diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala index ef10c091ca..952ca657bf 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala @@ -25,17 +25,17 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( // Methods common to all DStream's // ======================================================================= - /** Returns a new DStream containing only the elements that satisfy a predicate. */ + /** Return a new DStream containing only the elements that satisfy a predicate. */ def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairDStream[K, V] = dstream.filter((x => f(x).booleanValue())) - /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ def cache(): JavaPairDStream[K, V] = dstream.cache() - /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ - def persist(): JavaPairDStream[K, V] = dstream.cache() + /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + def persist(): JavaPairDStream[K, V] = dstream.persist() - /** Persists the RDDs of this DStream with the given storage level */ + /** Persist the RDDs of this DStream with the given storage level */ def persist(storageLevel: StorageLevel): JavaPairDStream[K, V] = dstream.persist(storageLevel) /** Method that generates a RDD for the given Duration */ @@ -67,15 +67,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( dstream.window(windowDuration, slideDuration) /** - * Returns a new DStream which computed based on tumbling window on this DStream. - * This is equivalent to window(batchDuration, batchDuration). - * @param batchDuration tumbling window duration; must be a multiple of this DStream's interval - */ - def tumble(batchDuration: Duration): JavaPairDStream[K, V] = - dstream.tumble(batchDuration) - - /** - * Returns a new DStream by unifying data of another DStream with this DStream. + * Return a new DStream by unifying data of another DStream with this DStream. * @param that Another DStream having the same interval (i.e., slideDuration) as this DStream. */ def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] = @@ -86,21 +78,21 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( // ======================================================================= /** - * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to + * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to * generate the RDDs with Spark's default number of partitions. */ def groupByKey(): JavaPairDStream[K, JList[V]] = dstream.groupByKey().mapValues(seqAsJavaList _) /** - * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to + * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to * generate the RDDs with `numPartitions` partitions. */ def groupByKey(numPartitions: Int): JavaPairDStream[K, JList[V]] = dstream.groupByKey(numPartitions).mapValues(seqAsJavaList _) /** - * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream. + * Return a new DStream by applying `groupByKey` on each RDD of `this` DStream. * Therefore, the values for each key in `this` DStream's RDDs are grouped into a * single sequence to generate the RDDs of the new DStream. [[spark.Partitioner]] * is used to control the partitioning of each RDD. @@ -109,7 +101,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( dstream.groupByKey(partitioner).mapValues(seqAsJavaList _) /** - * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are * merged using the associative reduce function. Hash partitioning is used to generate the RDDs * with Spark's default number of partitions. */ @@ -117,7 +109,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( dstream.reduceByKey(func) /** - * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs * with `numPartitions` partitions. */ @@ -125,7 +117,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( dstream.reduceByKey(func, numPartitions) /** - * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are * merged using the supplied reduce function. [[spark.Partitioner]] is used to control the * partitioning of each RDD. */ @@ -149,24 +141,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Create a new DStream by counting the number of values of each key in each RDD. Hash - * partitioning is used to generate the RDDs with Spark's `numPartitions` partitions. - */ - def countByKey(numPartitions: Int): JavaPairDStream[K, JLong] = { - JavaPairDStream.scalaToJavaLong(dstream.countByKey(numPartitions)); - } - - - /** - * Create a new DStream by counting the number of values of each key in each RDD. Hash - * partitioning is used to generate the RDDs with the default number of partitions. - */ - def countByKey(): JavaPairDStream[K, JLong] = { - JavaPairDStream.scalaToJavaLong(dstream.countByKey()); - } - - /** - * Creates a new DStream by applying `groupByKey` over a sliding window. This is similar to + * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with * Spark's default number of partitions. @@ -178,7 +153,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Create a new DStream by applying `groupByKey` over a sliding window. Similar to + * Return a new DStream by applying `groupByKey` over a sliding window. Similar to * `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to * generate the RDDs with Spark's default number of partitions. * @param windowDuration width of the window; must be a multiple of this DStream's @@ -193,7 +168,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream. * Similar to `DStream.groupByKey()`, but applies it over a sliding window. * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. * @param windowDuration width of the window; must be a multiple of this DStream's @@ -210,7 +185,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream. * Similar to `DStream.groupByKey()`, but applies it over a sliding window. * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval @@ -243,7 +218,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to + * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to * generate the RDDs with Spark's default number of partitions. * @param reduceFunc associative reduce function @@ -262,7 +237,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to + * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to * generate the RDDs with `numPartitions` partitions. * @param reduceFunc associative reduce function @@ -283,7 +258,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Create a new DStream by applying `reduceByKey` over a sliding window. Similar to + * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to * `DStream.reduceByKey()`, but applies it over a sliding window. * @param reduceFunc associative reduce function * @param windowDuration width of the window; must be a multiple of this DStream's @@ -303,7 +278,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Create a new DStream by reducing over a using incremental computation. + * Return a new DStream by reducing over a using incremental computation. * The reduced value of over a new window is calculated using the old window's reduce value : * 1. reduce the new values that entered the window (e.g., adding new counts) * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) @@ -328,7 +303,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Create a new DStream by reducing over a using incremental computation. + * Return a new DStream by applying incremental `reduceByKey` over a sliding window. * The reduced value of over a new window is calculated using the old window's reduce value : * 1. reduce the new values that entered the window (e.g., adding new counts) * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) @@ -342,25 +317,31 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval - * @param numPartitions Number of partitions of each RDD in the new DStream. + * @param numPartitions number of partitions of each RDD in the new DStream. + * @param filterFunc function to filter expired key-value pairs; + * only pairs that satisfy the function are retained + * set this to null if you do not want to filter */ def reduceByKeyAndWindow( reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V], windowDuration: Duration, slideDuration: Duration, - numPartitions: Int + numPartitions: Int, + filterFunc: JFunction[(K, V), java.lang.Boolean] ): JavaPairDStream[K, V] = { dstream.reduceByKeyAndWindow( reduceFunc, invReduceFunc, windowDuration, slideDuration, - numPartitions) + numPartitions, + (p: (K, V)) => filterFunc(p).booleanValue() + ) } /** - * Create a new DStream by reducing over a using incremental computation. + * Return a new DStream by applying incremental `reduceByKey` over a sliding window. * The reduced value of over a new window is calculated using the old window's reduce value : * 1. reduce the new values that entered the window (e.g., adding new counts) * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) @@ -374,49 +355,26 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param filterFunc function to filter expired key-value pairs; + * only pairs that satisfy the function are retained + * set this to null if you do not want to filter */ def reduceByKeyAndWindow( reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V], windowDuration: Duration, slideDuration: Duration, - partitioner: Partitioner - ): JavaPairDStream[K, V] = { + partitioner: Partitioner, + filterFunc: JFunction[(K, V), java.lang.Boolean] + ): JavaPairDStream[K, V] = { dstream.reduceByKeyAndWindow( reduceFunc, invReduceFunc, windowDuration, slideDuration, - partitioner) - } - - /** - * Create a new DStream by counting the number of values for each key over a window. - * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - */ - def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration) - : JavaPairDStream[K, JLong] = { - JavaPairDStream.scalaToJavaLong(dstream.countByKeyAndWindow(windowDuration, slideDuration)) - } - - /** - * Create a new DStream by counting the number of values for each key over a window. - * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - * @param numPartitions Number of partitions of each RDD in the new DStream. - */ - def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int) - : JavaPairDStream[K, Long] = { - dstream.countByKeyAndWindow(windowDuration, slideDuration, numPartitions) + partitioner, + (p: (K, V)) => filterFunc(p).booleanValue() + ) } private def convertUpdateStateFunction[S](in: JFunction2[JList[V], Optional[S], Optional[S]]): diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index 5bbf2b084f..d9a676819a 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -130,7 +130,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { */ def networkTextStream(hostname: String, port: Int, storageLevel: StorageLevel) : JavaDStream[String] = { - ssc.networkTextStream(hostname, port, storageLevel) + ssc.socketTextStream(hostname, port, storageLevel) } /** @@ -140,8 +140,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @param hostname Hostname to connect to for receiving data * @param port Port to connect to for receiving data */ - def networkTextStream(hostname: String, port: Int): JavaDStream[String] = { - ssc.networkTextStream(hostname, port) + def socketTextStream(hostname: String, port: Int): JavaDStream[String] = { + ssc.socketTextStream(hostname, port) } /** @@ -154,7 +154,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @param storageLevel Storage level to use for storing the received objects * @tparam T Type of the objects received (after converting bytes to objects) */ - def networkStream[T]( + def socketStream[T]( hostname: String, port: Int, converter: JFunction[InputStream, java.lang.Iterable[T]], @@ -163,7 +163,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { def fn = (x: InputStream) => converter.apply(x).toIterator implicit val cmt: ClassManifest[T] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] - ssc.networkStream(hostname, port, fn, storageLevel) + ssc.socketStream(hostname, port, fn, storageLevel) } /** @@ -314,12 +314,11 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** * Sets the context to periodically checkpoint the DStream operations for master - * fault-tolerance. By default, the graph will be checkpointed every batch interval. + * fault-tolerance. The graph will be checkpointed every batch interval. * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored - * @param interval checkpoint interval */ - def checkpoint(directory: String, interval: Duration = null) { - ssc.checkpoint(directory, interval) + def checkpoint(directory: String) { + ssc.checkpoint(directory) } /** diff --git a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala index c6ffb252ce..41b9bd9461 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala @@ -21,19 +21,21 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData + // Latest file mod time seen till any point of time private val lastModTimeFiles = new HashSet[String]() private var lastModTime = 0L @transient private var path_ : Path = null @transient private var fs_ : FileSystem = null - @transient private var files = new HashMap[Time, Array[String]] + @transient private[streaming] var files = new HashMap[Time, Array[String]] override def start() { if (newFilesOnly) { - lastModTime = System.currentTimeMillis() + lastModTime = graph.zeroTime.milliseconds } else { lastModTime = 0 } + logDebug("LastModTime initialized to " + lastModTime + ", new files only = " + newFilesOnly) } override def stop() { } @@ -43,38 +45,50 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K * a union RDD out of them. Note that this maintains the list of files that were processed * in the latest modification time in the previous call to this method. This is because the * modification time returned by the FileStatus API seems to return times only at the - * granularity of seconds. Hence, new files may have the same modification time as the - * latest modification time in the previous call to this method and the list of files - * maintained is used to filter the one that have been processed. + * granularity of seconds. And new files may have the same modification time as the + * latest modification time in the previous call to this method yet was not reported in + * the previous call. */ override def compute(validTime: Time): Option[RDD[(K, V)]] = { + assert(validTime.milliseconds >= lastModTime, "Trying to get new files for really old time [" + validTime + " < " + lastModTime) + // Create the filter for selecting new files val newFilter = new PathFilter() { + // Latest file mod time seen in this round of fetching files and its corresponding files var latestModTime = 0L val latestModTimeFiles = new HashSet[String]() def accept(path: Path): Boolean = { - if (!filter(path)) { + if (!filter(path)) { // Reject file if it does not satisfy filter + logDebug("Rejected by filter " + path) return false - } else { + } else { // Accept file only if val modTime = fs.getFileStatus(path).getModificationTime() - if (modTime < lastModTime){ - return false + logDebug("Mod time for " + path + " is " + modTime) + if (modTime < lastModTime) { + logDebug("Mod time less than last mod time") + return false // If the file was created before the last time it was called } else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) { - return false + logDebug("Mod time equal to last mod time, but file considered already") + return false // If the file was created exactly as lastModTime but not reported yet + } else if (modTime > validTime.milliseconds) { + logDebug("Mod time more than valid time") + return false // If the file was created after the time this function call requires } if (modTime > latestModTime) { latestModTime = modTime latestModTimeFiles.clear() + logDebug("Latest mod time updated to " + latestModTime) } latestModTimeFiles += path.toString + logDebug("Accepted " + path) return true } } } - + logDebug("Finding new files at time " + validTime + " for last mod time = " + lastModTime) val newFiles = fs.listStatus(path, newFilter).map(_.getPath.toString) - logInfo("New files: " + newFiles.mkString(", ")) + logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n")) if (newFiles.length > 0) { // Update the modification time and the files processed for that modification time if (lastModTime != newFilter.latestModTime) { @@ -82,17 +96,21 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K lastModTimeFiles.clear() } lastModTimeFiles ++= newFilter.latestModTimeFiles + logDebug("Last mod time updated to " + lastModTime) } files += ((validTime, newFiles)) Some(filesToRDD(newFiles)) } - /** Forget the old time-to-files mappings along with old RDDs */ - protected[streaming] override def forgetOldMetadata(time: Time) { - super.forgetOldMetadata(time) - val filesToBeRemoved = files.filter(_._1 <= (time - rememberDuration)) - files --= filesToBeRemoved.keys - logInfo("Forgot " + filesToBeRemoved.size + " files from " + this) + /** Clear the old time-to-files mappings along with old RDDs */ + protected[streaming] override def clearOldMetadata(time: Time) { + super.clearOldMetadata(time) + val oldFiles = files.filter(_._1 <= (time - rememberDuration)) + files --= oldFiles.keys + logInfo("Cleared " + oldFiles.size + " old files that were older than " + + (time - rememberDuration) + ": " + oldFiles.keys.mkString(", ")) + logDebug("Cleared files are:\n" + + oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n")) } /** Generate one RDD from an array of files */ @@ -128,7 +146,7 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K private[streaming] class FileInputDStreamCheckpointData extends DStreamCheckpointData(this) { - def hadoopFiles = data.asInstanceOf[HashMap[Time, Array[String]]] + def hadoopFiles = data.asInstanceOf[HashMap[Time, Array[String]]] override def update() { hadoopFiles.clear() @@ -139,14 +157,20 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K override def restore() { hadoopFiles.foreach { - case (time, files) => { - logInfo("Restoring Hadoop RDD for time " + time + " from files " + - files.mkString("[", ",", "]") ) - files - generatedRDDs += ((time, filesToRDD(files))) + case (t, f) => { + // Restore the metadata in both files and generatedRDDs + logInfo("Restoring files for time " + t + " - " + + f.mkString("[", ", ", "]") ) + files += ((t, f)) + generatedRDDs += ((t, filesToRDD(f))) } } } + + override def toString() = { + "[\n" + hadoopFiles.size + " file sets\n" + + hadoopFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n") + "\n]" + } } } diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala index efc7058480..c9644b3a83 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala @@ -25,7 +25,7 @@ class FlumeInputDStream[T: ClassManifest]( storageLevel: StorageLevel ) extends NetworkInputDStream[SparkFlumeEvent](ssc_) { - override def createReceiver(): NetworkReceiver[SparkFlumeEvent] = { + override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = { new FlumeReceiver(host, port, storageLevel) } } @@ -134,4 +134,4 @@ class FlumeReceiver( } override def getLocationPreference = Some(host) -}
\ No newline at end of file +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala index 980ca5177e..a4db44a608 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala @@ -1,10 +1,42 @@ package spark.streaming.dstream -import spark.streaming.{Duration, StreamingContext, DStream} +import spark.streaming.{Time, Duration, StreamingContext, DStream} +/** + * This is the abstract base class for all input streams. This class provides to methods + * start() and stop() which called by the scheduler to start and stop receiving data/ + * Input streams that can generated RDDs from new data just by running a service on + * the driver node (that is, without running a receiver onworker nodes) can be + * implemented by directly subclassing this InputDStream. For example, + * FileInputDStream, a subclass of InputDStream, monitors a HDFS directory for + * new files and generates RDDs on the new files. For implementing input streams + * that requires running a receiver on the worker nodes, use NetworkInputDStream + * as the parent class. + */ abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext) extends DStream[T](ssc_) { + var lastValidTime: Time = null + + /** + * Checks whether the 'time' is valid wrt slideDuration for generating RDD. + * Additionally it also ensures valid times are in strictly increasing order. + * This ensures that InputDStream.compute() is called strictly on increasing + * times. + */ + override protected def isTimeValid(time: Time): Boolean = { + if (!super.isTimeValid(time)) { + false // Time not valid + } else { + // Time is valid, but check it it is more than lastValidTime + if (lastValidTime == null || lastValidTime <= time) { + logWarning("isTimeValid called with " + time + " where as last valid time is " + lastValidTime) + } + lastValidTime = time + true + } + } + override def dependencies = List() override def slideDuration: Duration = { @@ -13,7 +45,9 @@ abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContex ssc.graph.batchDuration } + /** Method called to start receiving data. Subclasses must implement this method. */ def start() + /** Method called to stop receiving data. Subclasses must implement this method. */ def stop() } diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala index 4f8c8b9d10..dc7139cc27 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala @@ -41,7 +41,8 @@ class KafkaInputDStream[T: ClassManifest]( storageLevel: StorageLevel ) extends NetworkInputDStream[T](ssc_ ) with Logging { - def createReceiver(): NetworkReceiver[T] = { + + def getReceiver(): NetworkReceiver[T] = { new KafkaReceiver(zkQuorum, groupId, topics, initialOffsets, storageLevel) .asInstanceOf[NetworkReceiver[T]] } @@ -73,7 +74,7 @@ class KafkaReceiver(zkQuorum: String, groupId: String, logInfo("Starting Kafka Consumer Stream with group: " + groupId) logInfo("Initial offsets: " + initialOffsets.toString) - + // Zookeper connection properties val props = new Properties() props.put("zk.connect", zkQuorum) @@ -104,7 +105,7 @@ class KafkaReceiver(zkQuorum: String, groupId: String, offsets.foreach { case(key, offset) => val topicDirs = new ZKGroupTopicDirs(key.groupId, key.topic) val partitionName = key.brokerId + "-" + key.partId - updatePersistentPath(consumerConnector.zkClient, + updatePersistentPath(consumerConnector.zkClient, topicDirs.consumerOffsetDir + "/" + partitionName, offset.toString) } } @@ -115,10 +116,10 @@ class KafkaReceiver(zkQuorum: String, groupId: String, logInfo("Starting MessageHandler.") stream.takeWhile { msgAndMetadata => blockGenerator += msgAndMetadata.message - // Keep on handling messages + true - } + } } } } diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala index 8c322dd698..7385474963 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala @@ -20,7 +20,7 @@ import java.util.concurrent.ArrayBlockingQueue /** * Abstract class for defining any InputDStream that has to start a receiver on worker * nodes to receive external data. Specific implementations of NetworkInputDStream must - * define the createReceiver() function that creates the receiver object of type + * define the getReceiver() function that gets the receiver object of type * [[spark.streaming.dstream.NetworkReceiver]] that will be sent to the workers to receive * data. * @param ssc_ Streaming context that will execute this input stream @@ -34,11 +34,11 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming val id = ssc.getNewNetworkStreamId() /** - * Creates the receiver object that will be sent to the worker nodes + * Gets the receiver object that will be sent to the worker nodes * to receive data. This method needs to defined by any specific implementation * of a NetworkInputDStream. */ - def createReceiver(): NetworkReceiver[T] + def getReceiver(): NetworkReceiver[T] // Nothing to start or stop as both taken care of by the NetworkInputTracker. def start() {} @@ -46,8 +46,15 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming def stop() {} override def compute(validTime: Time): Option[RDD[T]] = { - val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime) - Some(new BlockRDD[T](ssc.sc, blockIds)) + // If this is called for any time before the start time of the context, + // then this returns an empty RDD. This may happen when recovering from a + // master failure + if (validTime >= graph.startTime) { + val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime) + Some(new BlockRDD[T](ssc.sc, blockIds)) + } else { + Some(new BlockRDD[T](ssc.sc, Array[String]())) + } } } diff --git a/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala new file mode 100644 index 0000000000..3c2a81947b --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala @@ -0,0 +1,13 @@ +package spark.streaming.dstream + +import spark.streaming.StreamingContext + +private[streaming] +class PluggableInputDStream[T: ClassManifest]( + @transient ssc_ : StreamingContext, + receiver: NetworkReceiver[T]) extends NetworkInputDStream[T](ssc_) { + + def getReceiver(): NetworkReceiver[T] = { + receiver + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala index 04e6b69b7b..1b2fa56779 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala @@ -25,7 +25,7 @@ class RawInputDStream[T: ClassManifest]( storageLevel: StorageLevel ) extends NetworkInputDStream[T](ssc_ ) with Logging { - def createReceiver(): NetworkReceiver[T] = { + def getReceiver(): NetworkReceiver[T] = { new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[NetworkReceiver[T]] } } diff --git a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala index 733d5c4a25..aa5a71e1ed 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala @@ -3,7 +3,7 @@ package spark.streaming.dstream import spark.streaming.StreamingContext._ import spark.RDD -import spark.rdd.CoGroupedRDD +import spark.rdd.{CoGroupedRDD, MapPartitionsRDD} import spark.Partitioner import spark.SparkContext._ import spark.storage.StorageLevel @@ -15,7 +15,8 @@ private[streaming] class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( parent: DStream[(K, V)], reduceFunc: (V, V) => V, - invReduceFunc: (V, V) => V, + invReduceFunc: (V, V) => V, + filterFunc: Option[((K, V)) => Boolean], _windowDuration: Duration, _slideDuration: Duration, partitioner: Partitioner @@ -87,22 +88,25 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( // // Get the RDDs of the reduced values in "old time steps" - val oldRDDs = reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration) + val oldRDDs = + reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration) logDebug("# old RDDs = " + oldRDDs.size) // Get the RDDs of the reduced values in "new time steps" - val newRDDs = reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime) + val newRDDs = + reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime) logDebug("# new RDDs = " + newRDDs.size) // Get the RDD of the reduced value of the previous window - val previousWindowRDD = getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]())) + val previousWindowRDD = + getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]())) // Make the list of RDDs that needs to cogrouped together for reducing their reduced values val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs // Cogroup the reduced RDDs and merge the reduced values - val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner) - //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _ + val cogroupedRDD = + new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner) val numOldValues = oldRDDs.size val numNewValues = newRDDs.size @@ -114,7 +118,9 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( // Getting reduced values "old time steps" that will be removed from current window val oldValues = (1 to numOldValues).map(i => seqOfValues(i)).filter(!_.isEmpty).map(_.head) // Getting reduced values "new time steps" - val newValues = (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head) + val newValues = + (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head) + if (seqOfValues(0).isEmpty) { // If previous window's reduce value does not exist, then at least new values should exist if (newValues.isEmpty) { @@ -140,10 +146,12 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValues) - Some(mergedValuesRDD) + if (filterFunc.isDefined) { + Some(mergedValuesRDD.filter(filterFunc.get)) + } else { + Some(mergedValuesRDD) + } } - - } diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala index d42027092b..4af839ad7f 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala @@ -15,7 +15,7 @@ class SocketInputDStream[T: ClassManifest]( storageLevel: StorageLevel ) extends NetworkInputDStream[T](ssc_) { - def createReceiver(): NetworkReceiver[T] = { + def getReceiver(): NetworkReceiver[T] = { new SocketReceiver(host, port, bytesToObjects, storageLevel) } } diff --git a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala new file mode 100644 index 0000000000..c697498862 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala @@ -0,0 +1,72 @@ +package spark.streaming.dstream + +import spark._ +import spark.streaming._ +import storage.StorageLevel + +import twitter4j._ +import twitter4j.auth.BasicAuthorization + +/* A stream of Twitter statuses, potentially filtered by one or more keywords. +* +* @constructor create a new Twitter stream using the supplied username and password to authenticate. +* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is +* such that this may return a sampled subset of all tweets during each interval. +*/ +private[streaming] +class TwitterInputDStream( + @transient ssc_ : StreamingContext, + username: String, + password: String, + filters: Seq[String], + storageLevel: StorageLevel + ) extends NetworkInputDStream[Status](ssc_) { + + override def getReceiver(): NetworkReceiver[Status] = { + new TwitterReceiver(username, password, filters, storageLevel) + } +} + +private[streaming] +class TwitterReceiver( + username: String, + password: String, + filters: Seq[String], + storageLevel: StorageLevel + ) extends NetworkReceiver[Status] { + + var twitterStream: TwitterStream = _ + lazy val blockGenerator = new BlockGenerator(storageLevel) + + protected override def onStart() { + blockGenerator.start() + twitterStream = new TwitterStreamFactory() + .getInstance(new BasicAuthorization(username, password)) + twitterStream.addListener(new StatusListener { + def onStatus(status: Status) = { + blockGenerator += status + } + // Unimplemented + def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {} + def onTrackLimitationNotice(i: Int) {} + def onScrubGeo(l: Long, l1: Long) {} + def onStallWarning(stallWarning: StallWarning) {} + def onException(e: Exception) { stopOnError(e) } + }) + + val query: FilterQuery = new FilterQuery + if (filters.size > 0) { + query.track(filters.toArray) + twitterStream.filter(query) + } else { + twitterStream.sample() + } + logInfo("Twitter receiver started") + } + + protected override def onStop() { + blockGenerator.stop() + twitterStream.shutdown() + logInfo("Twitter receiver stopped") + } +} diff --git a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala new file mode 100644 index 0000000000..b3201d0b28 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala @@ -0,0 +1,153 @@ +package spark.streaming.receivers + +import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy } +import akka.actor.{ actorRef2Scala, ActorRef } +import akka.actor.{ PossiblyHarmful, OneForOneStrategy } + +import spark.storage.StorageLevel +import spark.streaming.dstream.NetworkReceiver + +import java.util.concurrent.atomic.AtomicInteger + +/** A helper with set of defaults for supervisor strategy **/ +object ReceiverSupervisorStrategy { + + import akka.util.duration._ + import akka.actor.SupervisorStrategy._ + + val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = + 15 millis) { + case _: RuntimeException ⇒ Restart + case _: Exception ⇒ Escalate + } +} + +/** + * A receiver trait to be mixed in with your Actor to gain access to + * pushBlock API. + * + * @example {{{ + * class MyActor extends Actor with Receiver{ + * def receive { + * case anything :String ⇒ pushBlock(anything) + * } + * } + * //Can be plugged in actorStream as follows + * ssc.actorStream[String](Props(new MyActor),"MyActorReceiver") + * + * }}} + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e parametrized type of push block and InputDStream + * should be same. + * + */ +trait Receiver { self: Actor ⇒ + def pushBlock[T: ClassManifest](iter: Iterator[T]) { + context.parent ! Data(iter) + } + + def pushBlock[T: ClassManifest](data: T) { + context.parent ! Data(data) + } + +} + +/** + * Statistics for querying the supervisor about state of workers + */ +case class Statistics(numberOfMsgs: Int, + numberOfWorkers: Int, + numberOfHiccups: Int, + otherInfo: String) + +/** Case class to receive data sent by child actors **/ +private[streaming] case class Data[T: ClassManifest](data: T) + +/** + * Provides Actors as receivers for receiving stream. + * + * As Actors can also be used to receive data from almost any stream source. + * A nice set of abstraction(s) for actors as receivers is already provided for + * a few general cases. It is thus exposed as an API where user may come with + * his own Actor to run as receiver for Spark Streaming input source. + * + * This starts a supervisor actor which starts workers and also provides + * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance]. + * + * Here's a way to start more supervisor/workers as its children. + * + * @example {{{ + * context.parent ! Props(new Supervisor) + * }}} OR {{{ + * context.parent ! Props(new Worker,"Worker") + * }}} + * + * + */ +private[streaming] class ActorReceiver[T: ClassManifest]( + props: Props, + name: String, + storageLevel: StorageLevel, + receiverSupervisorStrategy: SupervisorStrategy) + extends NetworkReceiver[T] { + + protected lazy val blocksGenerator: BlockGenerator = + new BlockGenerator(storageLevel) + + protected lazy val supervisor = env.actorSystem.actorOf(Props(new Supervisor), + "Supervisor" + streamId) + + private class Supervisor extends Actor { + + override val supervisorStrategy = receiverSupervisorStrategy + val worker = context.actorOf(props, name) + logInfo("Started receiver worker at:" + worker.path) + + val n: AtomicInteger = new AtomicInteger(0) + val hiccups: AtomicInteger = new AtomicInteger(0) + + def receive = { + + case Data(iter: Iterator[_]) ⇒ pushBlock(iter.asInstanceOf[Iterator[T]]) + + case Data(msg) ⇒ + blocksGenerator += msg.asInstanceOf[T] + n.incrementAndGet + + case props: Props ⇒ + val worker = context.actorOf(props) + logInfo("Started receiver worker at:" + worker.path) + sender ! worker + + case (props: Props, name: String) ⇒ + val worker = context.actorOf(props, name) + logInfo("Started receiver worker at:" + worker.path) + sender ! worker + + case _: PossiblyHarmful => hiccups.incrementAndGet() + + case _: Statistics ⇒ + val workers = context.children + sender ! Statistics(n.get, workers.size, hiccups.get, workers.mkString("\n")) + + } + } + + protected def pushBlock(iter: Iterator[T]) { + pushBlock("block-" + streamId + "-" + System.nanoTime(), + iter, null, storageLevel) + } + + protected def onStart() = { + blocksGenerator.start() + supervisor + logInfo("Supervision tree for receivers initialized at:" + supervisor.path) + } + + protected def onStop() = { + supervisor ! PoisonPill + } + +} diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala new file mode 100644 index 0000000000..bdd9f4d753 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala @@ -0,0 +1,392 @@ +package spark.streaming.util + +import spark.{Logging, RDD} +import spark.streaming._ +import spark.streaming.dstream.ForEachDStream +import StreamingContext._ + +import scala.util.Random +import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} + +import java.io.{File, ObjectInputStream, IOException} +import java.util.UUID + +import com.google.common.io.Files + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.fs.{FileUtil, FileSystem, Path} +import org.apache.hadoop.conf.Configuration + + +private[streaming] +object MasterFailureTest extends Logging { + initLogging() + + @volatile var killed = false + @volatile var killCount = 0 + + def main(args: Array[String]) { + if (args.size < 2) { + println( + "Usage: MasterFailureTest <local/HDFS directory> <# batches> [<batch size in milliseconds>]") + System.exit(1) + } + val directory = args(0) + val numBatches = args(1).toInt + val batchDuration = if (args.size > 2) Milliseconds(args(2).toInt) else Seconds(1) + + println("\n\n========================= MAP TEST =========================\n\n") + testMap(directory, numBatches, batchDuration) + + println("\n\n================= UPDATE-STATE-BY-KEY TEST =================\n\n") + testUpdateStateByKey(directory, numBatches, batchDuration) + + println("\n\nSUCCESS\n\n") + } + + def testMap(directory: String, numBatches: Int, batchDuration: Duration) { + // Input: time=1 ==> [ 1 ] , time=2 ==> [ 2 ] , time=3 ==> [ 3 ] , ... + val input = (1 to numBatches).map(_.toString).toSeq + // Expected output: time=1 ==> [ 1 ] , time=2 ==> [ 2 ] , time=3 ==> [ 3 ] , ... + val expectedOutput = (1 to numBatches) + + val operation = (st: DStream[String]) => st.map(_.toInt) + + // Run streaming operation with multiple master failures + val output = testOperation(directory, batchDuration, input, operation, expectedOutput) + + logInfo("Expected output, size = " + expectedOutput.size) + logInfo(expectedOutput.mkString("[", ",", "]")) + logInfo("Output, size = " + output.size) + logInfo(output.mkString("[", ",", "]")) + + // Verify whether all the values of the expected output is present + // in the output + assert(output.distinct.toSet == expectedOutput.toSet) + } + + + def testUpdateStateByKey(directory: String, numBatches: Int, batchDuration: Duration) { + // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ... + val input = (1 to numBatches).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq + // Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ... + val expectedOutput = (1L to numBatches).map(i => (1L to i).reduce(_ + _)).map(j => ("a", j)) + + val operation = (st: DStream[String]) => { + val updateFunc = (values: Seq[Long], state: Option[Long]) => { + Some(values.foldLeft(0L)(_ + _) + state.getOrElse(0L)) + } + st.flatMap(_.split(" ")) + .map(x => (x, 1L)) + .updateStateByKey[Long](updateFunc) + .checkpoint(batchDuration * 5) + } + + // Run streaming operation with multiple master failures + val output = testOperation(directory, batchDuration, input, operation, expectedOutput) + + logInfo("Expected output, size = " + expectedOutput.size + "\n" + expectedOutput) + logInfo("Output, size = " + output.size + "\n" + output) + + // Verify whether all the values in the output are among the expected output values + output.foreach(o => + assert(expectedOutput.contains(o), "Expected value " + o + " not found") + ) + + // Verify whether the last expected output value has been generated, there by + // confirming that none of the inputs have been missed + assert(output.last == expectedOutput.last) + } + + /** + * Tests stream operation with multiple master failures, and verifies whether the + * final set of output values is as expected or not. + */ + def testOperation[T: ClassManifest]( + directory: String, + batchDuration: Duration, + input: Seq[String], + operation: DStream[String] => DStream[T], + expectedOutput: Seq[T] + ): Seq[T] = { + + // Just making sure that the expected output does not have duplicates + assert(expectedOutput.distinct.toSet == expectedOutput.toSet) + + // Setup the stream computation with the given operation + val (ssc, checkpointDir, testDir) = setupStreams(directory, batchDuration, operation) + + // Start generating files in the a different thread + val fileGeneratingThread = new FileGeneratingThread(input, testDir, batchDuration.milliseconds) + fileGeneratingThread.start() + + // Run the streams and repeatedly kill it until the last expected output + // has been generated, or until it has run for twice the expected time + val lastExpectedOutput = expectedOutput.last + val maxTimeToRun = expectedOutput.size * batchDuration.milliseconds * 2 + val mergedOutput = runStreams(ssc, lastExpectedOutput, maxTimeToRun) + + // Delete directories + fileGeneratingThread.join() + val fs = checkpointDir.getFileSystem(new Configuration()) + fs.delete(checkpointDir, true) + fs.delete(testDir, true) + logInfo("Finished test after " + killCount + " failures") + mergedOutput + } + + /** + * Sets up the stream computation with the given operation, directory (local or HDFS), + * and batch duration. Returns the streaming context and the directory to which + * files should be written for testing. + */ + private def setupStreams[T: ClassManifest]( + directory: String, + batchDuration: Duration, + operation: DStream[String] => DStream[T] + ): (StreamingContext, Path, Path) = { + // Reset all state + reset() + + // Create the directories for this test + val uuid = UUID.randomUUID().toString + val rootDir = new Path(directory, uuid) + val fs = rootDir.getFileSystem(new Configuration()) + val checkpointDir = new Path(rootDir, "checkpoint") + val testDir = new Path(rootDir, "test") + fs.mkdirs(checkpointDir) + fs.mkdirs(testDir) + + // Setup the streaming computation with the given operation + System.clearProperty("spark.driver.port") + var ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration) + ssc.checkpoint(checkpointDir.toString) + val inputStream = ssc.textFileStream(testDir.toString) + val operatedStream = operation(inputStream) + val outputStream = new TestOutputStream(operatedStream) + ssc.registerOutputStream(outputStream) + (ssc, checkpointDir, testDir) + } + + + /** + * Repeatedly starts and kills the streaming context until timed out or + * the last expected output is generated. Finally, return + */ + private def runStreams[T: ClassManifest]( + ssc_ : StreamingContext, + lastExpectedOutput: T, + maxTimeToRun: Long + ): Seq[T] = { + + var ssc = ssc_ + var totalTimeRan = 0L + var isLastOutputGenerated = false + var isTimedOut = false + val mergedOutput = new ArrayBuffer[T]() + val checkpointDir = ssc.checkpointDir + var batchDuration = ssc.graph.batchDuration + + while(!isLastOutputGenerated && !isTimedOut) { + // Get the output buffer + val outputBuffer = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[T]].output + def output = outputBuffer.flatMap(x => x) + + // Start the thread to kill the streaming after some time + killed = false + val killingThread = new KillingThread(ssc, batchDuration.milliseconds * 10) + killingThread.start() + + var timeRan = 0L + try { + // Start the streaming computation and let it run while ... + // (i) StreamingContext has not been shut down yet + // (ii) The last expected output has not been generated yet + // (iii) Its not timed out yet + System.clearProperty("spark.streaming.clock") + System.clearProperty("spark.driver.port") + ssc.start() + val startTime = System.currentTimeMillis() + while (!killed && !isLastOutputGenerated && !isTimedOut) { + Thread.sleep(100) + timeRan = System.currentTimeMillis() - startTime + isLastOutputGenerated = (!output.isEmpty && output.last == lastExpectedOutput) + isTimedOut = (timeRan + totalTimeRan > maxTimeToRun) + } + } catch { + case e: Exception => logError("Error running streaming context", e) + } + if (killingThread.isAlive) killingThread.interrupt() + ssc.stop() + + logInfo("Has been killed = " + killed) + logInfo("Is last output generated = " + isLastOutputGenerated) + logInfo("Is timed out = " + isTimedOut) + + // Verify whether the output of each batch has only one element or no element + // and then merge the new output with all the earlier output + mergedOutput ++= output + totalTimeRan += timeRan + logInfo("New output = " + output) + logInfo("Merged output = " + mergedOutput) + logInfo("Time ran = " + timeRan) + logInfo("Total time ran = " + totalTimeRan) + + if (!isLastOutputGenerated && !isTimedOut) { + val sleepTime = Random.nextInt(batchDuration.milliseconds.toInt * 10) + logInfo( + "\n-------------------------------------------\n" + + " Restarting stream computation in " + sleepTime + " ms " + + "\n-------------------------------------------\n" + ) + Thread.sleep(sleepTime) + // Recreate the streaming context from checkpoint + ssc = new StreamingContext(checkpointDir) + } + } + mergedOutput + } + + /** + * Verifies the output value are the same as expected. Since failures can lead to + * a batch being processed twice, a batches output may appear more than once + * consecutively. To avoid getting confused with those, we eliminate consecutive + * duplicate batch outputs of values from the `output`. As a result, the + * expected output should not have consecutive batches with the same values as output. + */ + private def verifyOutput[T: ClassManifest](output: Seq[T], expectedOutput: Seq[T]) { + // Verify whether expected outputs do not consecutive batches with same output + for (i <- 0 until expectedOutput.size - 1) { + assert(expectedOutput(i) != expectedOutput(i+1), + "Expected output has consecutive duplicate sequence of values") + } + + // Log the output + println("Expected output, size = " + expectedOutput.size) + println(expectedOutput.mkString("[", ",", "]")) + println("Output, size = " + output.size) + println(output.mkString("[", ",", "]")) + + // Match the output with the expected output + output.foreach(o => + assert(expectedOutput.contains(o), "Expected value " + o + " not found") + ) + } + + /** Resets counter to prepare for the test */ + private def reset() { + killed = false + killCount = 0 + } +} + +/** + * This is a output stream just for testing. All the output is collected into a + * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint. + */ +private[streaming] +class TestOutputStream[T: ClassManifest]( + parent: DStream[T], + val output: ArrayBuffer[Seq[T]] = new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]] + ) extends ForEachDStream[T]( + parent, + (rdd: RDD[T], t: Time) => { + val collected = rdd.collect() + output += collected + } + ) { + + // This is to clear the output buffer every it is read from a checkpoint + @throws(classOf[IOException]) + private def readObject(ois: ObjectInputStream) { + ois.defaultReadObject() + output.clear() + } +} + + +/** + * Thread to kill streaming context after a random period of time. + */ +private[streaming] +class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging { + initLogging() + + override def run() { + try { + // If it is the first killing, then allow the first checkpoint to be created + var minKillWaitTime = if (MasterFailureTest.killCount == 0) 5000 else 2000 + val killWaitTime = minKillWaitTime + math.abs(Random.nextLong % maxKillWaitTime) + logInfo("Kill wait time = " + killWaitTime) + Thread.sleep(killWaitTime) + logInfo( + "\n---------------------------------------\n" + + "Killing streaming context after " + killWaitTime + " ms" + + "\n---------------------------------------\n" + ) + if (ssc != null) { + ssc.stop() + MasterFailureTest.killed = true + MasterFailureTest.killCount += 1 + } + logInfo("Killing thread finished normally") + } catch { + case ie: InterruptedException => logInfo("Killing thread interrupted") + case e: Exception => logWarning("Exception in killing thread", e) + } + + } +} + + +/** + * Thread to generate input files periodically with the desired text. + */ +private[streaming] +class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) + extends Thread with Logging { + initLogging() + + override def run() { + val localTestDir = Files.createTempDir() + var fs = testDir.getFileSystem(new Configuration()) + val maxTries = 3 + try { + Thread.sleep(5000) // To make sure that all the streaming context has been set up + for (i <- 0 until input.size) { + // Write the data to a local file and then move it to the target test directory + val localFile = new File(localTestDir, (i+1).toString) + val hadoopFile = new Path(testDir, (i+1).toString) + FileUtils.writeStringToFile(localFile, input(i).toString + "\n") + var tries = 0 + var done = false + while (!done && tries < maxTries) { + tries += 1 + try { + fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile) + done = true + } catch { + case ioe: IOException => { + fs = testDir.getFileSystem(new Configuration()) + logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe) + } + } + } + if (!done) + logError("Could not generate file " + hadoopFile) + else + logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis) + Thread.sleep(interval) + localFile.delete() + } + logInfo("File generating thread finished normally") + } catch { + case ie: InterruptedException => logInfo("File generating thread interrupted") + case e: Exception => logWarning("File generating in killing thread", e) + } finally { + fs.close() + } + } +} + + diff --git a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala index db715cc295..8e10276deb 100644 --- a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala +++ b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala @@ -3,9 +3,9 @@ package spark.streaming.util private[streaming] class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) { - val minPollTime = 25L + private val minPollTime = 25L - val pollTime = { + private val pollTime = { if (period / 10.0 > minPollTime) { (period / 10.0).toLong } else { @@ -13,11 +13,20 @@ class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => } } - val thread = new Thread() { + private val thread = new Thread() { override def run() { loop } } - var nextTime = 0L + private var nextTime = 0L + + def getStartTime(): Long = { + (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period + } + + def getRestartTime(originalStartTime: Long): Long = { + val gap = clock.currentTime - originalStartTime + (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime + } def start(startTime: Long): Long = { nextTime = startTime @@ -26,21 +35,14 @@ class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => } def start(): Long = { - val startTime = (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period - start(startTime) + start(getStartTime()) } - def restart(originalStartTime: Long): Long = { - val gap = clock.currentTime - originalStartTime - val newStartTime = (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime - start(newStartTime) - } - - def stop() { + def stop() { thread.interrupt() } - def loop() { + private def loop() { try { while (true) { clock.waitTillTime(nextTime) diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index fbe4af4597..5d510fd89f 100644 --- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -33,8 +33,9 @@ public class JavaAPISuite implements Serializable { @Before public void setUp() { - ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); - ssc.checkpoint("checkpoint", new Duration(1000)); + System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock"); + ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + ssc.checkpoint("checkpoint"); } @After @@ -45,7 +46,7 @@ public class JavaAPISuite implements Serializable { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port"); } - /* + @Test public void testCount() { List<List<Integer>> inputData = Arrays.asList( @@ -134,29 +135,6 @@ public class JavaAPISuite implements Serializable { } @Test - public void testTumble() { - List<List<Integer>> inputData = Arrays.asList( - Arrays.asList(1,2,3), - Arrays.asList(4,5,6), - Arrays.asList(7,8,9), - Arrays.asList(10,11,12), - Arrays.asList(13,14,15), - Arrays.asList(16,17,18)); - - List<List<Integer>> expected = Arrays.asList( - Arrays.asList(1,2,3,4,5,6), - Arrays.asList(7,8,9,10,11,12), - Arrays.asList(13,14,15,16,17,18)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream windowed = stream.tumble(new Duration(2000)); - JavaTestUtils.attachTestOutputStream(windowed); - List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 6, 3); - - assertOrderInvariantEquals(expected, result); - } - - @Test public void testFilter() { List<List<String>> inputData = Arrays.asList( Arrays.asList("giants", "dodgers"), @@ -434,7 +412,7 @@ public class JavaAPISuite implements Serializable { assertOrderInvariantEquals(expected, result); } - */ + /* * Performs an order-invariant comparison of lists representing two RDD streams. This allows * us to account for ordering variation within individual RDD's which occurs during windowing. @@ -450,7 +428,7 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(expected, actual); } - /* + // PairDStream Functions @Test public void testPairFilter() { @@ -583,50 +561,73 @@ public class JavaAPISuite implements Serializable { } @Test - public void testCountByKey() { - List<List<Tuple2<String, String>>> inputData = stringStringKVStream; + public void testCountByValue() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("hello", "world"), + Arrays.asList("hello", "moon"), + Arrays.asList("hello")); List<List<Tuple2<String, Long>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<String, Long>("california", 2L), - new Tuple2<String, Long>("new york", 2L)), - Arrays.asList( - new Tuple2<String, Long>("california", 2L), - new Tuple2<String, Long>("new york", 2L))); - - JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); + Arrays.asList( + new Tuple2<String, Long>("hello", 1L), + new Tuple2<String, Long>("world", 1L)), + Arrays.asList( + new Tuple2<String, Long>("hello", 1L), + new Tuple2<String, Long>("moon", 1L)), + Arrays.asList( + new Tuple2<String, Long>("hello", 1L))); - JavaPairDStream<String, Long> counted = pairStream.countByKey(); + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Long> counted = stream.countByValue(); JavaTestUtils.attachTestOutputStream(counted); - List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3); Assert.assertEquals(expected, result); } @Test public void testGroupByKeyAndWindow() { - List<List<Tuple2<String, String>>> inputData = stringStringKVStream; + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - List<List<Tuple2<String, List<String>>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")), - new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))), - Arrays.asList(new Tuple2<String, List<String>>("california", - Arrays.asList("sharks", "ducks", "dodgers", "giants")), - new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders", "yankees", "mets"))), - Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")), - new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders")))); + List<List<Tuple2<String, List<Integer>>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3)), + new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 4)) + ), + Arrays.asList( + new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3, 5, 5)), + new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 1, 3, 4)) + ), + Arrays.asList( + new Tuple2<String, List<Integer>>("california", Arrays.asList(5, 5)), + new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 3)) + ) + ); - JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<String, List<String>> groupWindowed = + JavaPairDStream<String, List<Integer>> groupWindowed = pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(groupWindowed); - List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 3, 3); + List<List<Tuple2<String, List<Integer>>>> result = JavaTestUtils.runStreams(ssc, 3, 3); - Assert.assertEquals(expected, result); + assert(result.size() == expected.size()); + for (int i = 0; i < result.size(); i++) { + assert(convert(result.get(i)).equals(convert(expected.get(i)))); + } + } + + private HashSet<Tuple2<String, HashSet<Integer>>> convert(List<Tuple2<String, List<Integer>>> listOfTuples) { + List<Tuple2<String, HashSet<Integer>>> newListOfTuples = new ArrayList<Tuple2<String, HashSet<Integer>>>(); + for (Tuple2<String, List<Integer>> tuple: listOfTuples) { + newListOfTuples.add(convert(tuple)); + } + return new HashSet<Tuple2<String, HashSet<Integer>>>(newListOfTuples); + } + + private Tuple2<String, HashSet<Integer>> convert(Tuple2<String, List<Integer>> tuple) { + return new Tuple2<String, HashSet<Integer>>(tuple._1(), new HashSet<Integer>(tuple._2())); } @Test @@ -711,26 +712,28 @@ public class JavaAPISuite implements Serializable { } @Test - public void testCountByKeyAndWindow() { - List<List<Tuple2<String, String>>> inputData = stringStringKVStream; + public void testCountByValueAndWindow() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("hello", "world"), + Arrays.asList("hello", "moon"), + Arrays.asList("hello")); List<List<Tuple2<String, Long>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<String, Long>("california", 2L), - new Tuple2<String, Long>("new york", 2L)), + new Tuple2<String, Long>("hello", 1L), + new Tuple2<String, Long>("world", 1L)), Arrays.asList( - new Tuple2<String, Long>("california", 4L), - new Tuple2<String, Long>("new york", 4L)), + new Tuple2<String, Long>("hello", 2L), + new Tuple2<String, Long>("world", 1L), + new Tuple2<String, Long>("moon", 1L)), Arrays.asList( - new Tuple2<String, Long>("california", 2L), - new Tuple2<String, Long>("new york", 2L))); + new Tuple2<String, Long>("hello", 2L), + new Tuple2<String, Long>("moon", 1L))); - JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream( ssc, inputData, 1); - JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<String, Long> counted = - pairStream.countByKeyAndWindow(new Duration(2000), new Duration(1000)); + stream.countByValueAndWindow(new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(counted); List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -897,7 +900,7 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(expected, result); } - */ + @Test public void testCheckpointMasterRecovery() throws InterruptedException { List<List<String>> inputData = Arrays.asList( @@ -912,7 +915,7 @@ public class JavaAPISuite implements Serializable { Arrays.asList(8,7)); File tempDir = Files.createTempDir(); - ssc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000)); + ssc.checkpoint(tempDir.getAbsolutePath()); JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream letterCount = stream.map(new Function<String, Integer>() { @@ -964,7 +967,7 @@ public class JavaAPISuite implements Serializable { assertOrderInvariantEquals(expected, result1); } */ - /* + // Input stream tests. These mostly just test that we can instantiate a given InputStream with // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the // InputStream functionality is deferred to the existing Scala tests. @@ -972,15 +975,15 @@ public class JavaAPISuite implements Serializable { public void testKafkaStream() { HashMap<String, Integer> topics = Maps.newHashMap(); HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap(); - JavaDStream test1 = ssc.kafkaStream("localhost", 12345, "group", topics); - JavaDStream test2 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets); - JavaDStream test3 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets, + JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics); + JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, offsets); + JavaDStream test3 = ssc.kafkaStream("localhost:12345", "group", topics, offsets, StorageLevel.MEMORY_AND_DISK()); } @Test public void testNetworkTextStream() { - JavaDStream test = ssc.networkTextStream("localhost", 12345); + JavaDStream test = ssc.socketTextStream("localhost", 12345); } @Test @@ -1000,7 +1003,7 @@ public class JavaAPISuite implements Serializable { } } - JavaDStream test = ssc.networkStream( + JavaDStream test = ssc.socketStream( "localhost", 12345, new Converter(), @@ -1026,5 +1029,5 @@ public class JavaAPISuite implements Serializable { public void testFileStream() { JavaPairDStream<String, String> foo = ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo"); - }*/ + } } diff --git a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala index 56349837e5..52ea28732a 100644 --- a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala @@ -57,6 +57,7 @@ trait JavaTestBase extends TestSuiteBase { } object JavaTestUtils extends JavaTestBase { + override def maxWaitTimeMillis = 20000 } diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties index edfa1243fa..59c445e63f 100644 --- a/streaming/src/test/resources/log4j.properties +++ b/streaming/src/test/resources/log4j.properties @@ -1,5 +1,6 @@ # Set everything to be logged to the file streaming/target/unit-tests.log log4j.rootCategory=INFO, file +# log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.append=false log4j.appender.file.file=streaming/target/unit-tests.log diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala index c031949dd1..8fce91853c 100644 --- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala @@ -6,6 +6,8 @@ import util.ManualClock class BasicOperationsSuite extends TestSuiteBase { + System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + override def framework() = "BasicOperationsSuite" after { @@ -22,7 +24,7 @@ class BasicOperationsSuite extends TestSuiteBase { ) } - test("flatmap") { + test("flatMap") { val input = Seq(1 to 4, 5 to 8, 9 to 12) testOperation( input, @@ -86,6 +88,23 @@ class BasicOperationsSuite extends TestSuiteBase { ) } + test("count") { + testOperation( + Seq(1 to 1, 1 to 2, 1 to 3, 1 to 4), + (s: DStream[Int]) => s.count(), + Seq(Seq(1L), Seq(2L), Seq(3L), Seq(4L)) + ) + } + + test("countByValue") { + testOperation( + Seq(1 to 1, Seq(1, 1, 1), 1 to 2, Seq(1, 1, 2, 2)), + (s: DStream[Int]) => s.countByValue(), + Seq(Seq((1, 1L)), Seq((1, 3L)), Seq((1, 1L), (2, 1L)), Seq((2, 2L), (1, 2L))), + true + ) + } + test("mapValues") { testOperation( Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ), @@ -204,12 +223,32 @@ class BasicOperationsSuite extends TestSuiteBase { case _ => Option(stateObj) } } - s.map(_ -> 1).updateStateByKey[StateObject](updateFunc).mapValues(_.counter) + s.map(x => (x, 1)).updateStateByKey[StateObject](updateFunc).mapValues(_.counter) } testOperation(inputData, updateStateOperation, outputData, true) } + test("slice") { + val ssc = new StreamingContext("local[2]", "BasicOperationSuite", Seconds(1)) + val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4)) + val stream = new TestInputStream[Int](ssc, input, 2) + ssc.registerInputStream(stream) + stream.foreach(_ => {}) // Dummy output stream + ssc.start() + Thread.sleep(2000) + def getInputFromSlice(fromMillis: Long, toMillis: Long) = { + stream.slice(new Time(fromMillis), new Time(toMillis)).flatMap(_.collect()).toSet + } + + assert(getInputFromSlice(0, 1000) == Set(1)) + assert(getInputFromSlice(0, 2000) == Set(1, 2)) + assert(getInputFromSlice(1000, 2000) == Set(1, 2)) + assert(getInputFromSlice(2000, 4000) == Set(2, 3, 4)) + ssc.stop() + Thread.sleep(1000) + } + test("forgetting of RDDs - map and window operations") { assert(batchDuration === Seconds(1), "Batch duration has changed from 1 second") diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala index 7126af62d9..cac86deeaf 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -1,5 +1,6 @@ package spark.streaming +import dstream.FileInputDStream import spark.streaming.StreamingContext._ import java.io.File import runtime.RichInt @@ -10,8 +11,16 @@ import util.{Clock, ManualClock} import scala.util.Random import com.google.common.io.Files + +/** + * This test suites tests the checkpointing functionality of DStreams - + * the checkpointing of a DStream's RDDs as well as the checkpointing of + * the whole DStream graph. + */ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { + System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + before { FileUtils.deleteDirectory(new File(checkpointDir)) } @@ -30,21 +39,18 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { override def batchDuration = Milliseconds(500) - override def checkpointInterval = batchDuration - override def actuallyWait = true test("basic rdd checkpoints + dstream graph checkpoint recovery") { assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second") - assert(checkpointInterval === batchDuration, "checkpointInterval for this test much be same as batchDuration") System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") val stateStreamCheckpointInterval = Seconds(1) // this ensure checkpointing occurs at least once - val firstNumBatches = (stateStreamCheckpointInterval / batchDuration) * 2 + val firstNumBatches = (stateStreamCheckpointInterval / batchDuration).toLong * 2 val secondNumBatches = firstNumBatches // Setup the streams @@ -64,7 +70,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // Run till a time such that at least one RDD in the stream should have been checkpointed, // then check whether some RDD has been checkpointed or not ssc.start() - runStreamsWithRealDelay(ssc, firstNumBatches) + advanceTimeWithRealDelay(ssc, firstNumBatches) logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData) assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before first failure") stateStream.checkpointData.checkpointFiles.foreach { @@ -77,7 +83,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // Run till a further time such that previous checkpoint files in the stream would be deleted // and check whether the earlier checkpoint files are deleted val checkpointFiles = stateStream.checkpointData.checkpointFiles.map(x => new File(x._2)) - runStreamsWithRealDelay(ssc, secondNumBatches) + advanceTimeWithRealDelay(ssc, secondNumBatches) checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted")) ssc.stop() @@ -92,7 +98,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // Run one batch to generate a new checkpoint file and check whether some RDD // is present in the checkpoint data or not ssc.start() - runStreamsWithRealDelay(ssc, 1) + advanceTimeWithRealDelay(ssc, 1) assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure") stateStream.checkpointData.checkpointFiles.foreach { case (time, data) => { @@ -113,7 +119,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // Adjust manual clock time as if it is being restarted after a delay System.setProperty("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString) ssc.start() - runStreamsWithRealDelay(ssc, 4) + advanceTimeWithRealDelay(ssc, 4) ssc.stop() System.clearProperty("spark.streaming.manualClock.jump") ssc = null @@ -168,74 +174,95 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { } // This tests whether file input stream remembers what files were seen before - // the master failure and uses them again to process a large window operatoin. + // the master failure and uses them again to process a large window operation. // It also tests whether batches, whose processing was incomplete due to the // failure, are re-processed or not. test("recovery with file input stream") { + // Disable manual clock as FileInputDStream does not work with manual clock + val clockProperty = System.getProperty("spark.streaming.clock") + System.clearProperty("spark.streaming.clock") + // Set up the streaming context and input streams val testDir = Files.createTempDir() - var ssc = new StreamingContext(master, framework, batchDuration) - ssc.checkpoint(checkpointDir, checkpointInterval) + var ssc = new StreamingContext(master, framework, Seconds(1)) + ssc.checkpoint(checkpointDir) val fileStream = ssc.textFileStream(testDir.toString) // Making value 3 take large time to process, to ensure that the master // shuts down in the middle of processing the 3rd batch val mappedStream = fileStream.map(s => { val i = s.toInt - if (i == 3) Thread.sleep(1000) + if (i == 3) Thread.sleep(2000) i }) + // Reducing over a large window to ensure that recovery from master failure // requires reprocessing of all the files seen before the failure - val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration) + val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1)) val outputBuffer = new ArrayBuffer[Seq[Int]] var outputStream = new TestOutputStream(reducedStream, outputBuffer) ssc.registerOutputStream(outputStream) ssc.start() // Create files and advance manual clock to process them - var clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + //var clock = ssc.scheduler.clock.asInstanceOf[ManualClock] Thread.sleep(1000) for (i <- Seq(1, 2, 3)) { FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") // wait to make sure that the file is written such that it gets shown in the file listings - Thread.sleep(500) - clock.addToTime(batchDuration.milliseconds) - // wait to make sure that FileInputDStream picks up this file only and not any other file - Thread.sleep(500) + Thread.sleep(1000) } logInfo("Output = " + outputStream.output.mkString(",")) assert(outputStream.output.size > 0, "No files processed before restart") ssc.stop() + // Verify whether files created have been recorded correctly or not + var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]] + def recordedFiles = fileInputDStream.files.values.flatMap(x => x) + assert(!recordedFiles.filter(_.endsWith("1")).isEmpty) + assert(!recordedFiles.filter(_.endsWith("2")).isEmpty) + assert(!recordedFiles.filter(_.endsWith("3")).isEmpty) + // Create files while the master is down for (i <- Seq(4, 5, 6)) { FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") Thread.sleep(1000) } - // Restart stream computation from checkpoint and create more files to see whether - // they are being processed + // Recover context from checkpoint file and verify whether the files that were + // recorded before failure were saved and successfully recovered logInfo("*********** RESTARTING ************") ssc = new StreamingContext(checkpointDir) + fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]] + assert(!recordedFiles.filter(_.endsWith("1")).isEmpty) + assert(!recordedFiles.filter(_.endsWith("2")).isEmpty) + assert(!recordedFiles.filter(_.endsWith("3")).isEmpty) + + // Restart stream computation ssc.start() - clock = ssc.scheduler.clock.asInstanceOf[ManualClock] for (i <- Seq(7, 8, 9)) { FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") - Thread.sleep(500) - clock.addToTime(batchDuration.milliseconds) - Thread.sleep(500) + Thread.sleep(1000) } Thread.sleep(1000) - logInfo("Output = " + outputStream.output.mkString(",")) + logInfo("Output = " + outputStream.output.mkString("[", ", ", "]")) assert(outputStream.output.size > 0, "No files processed after restart") ssc.stop() + // Verify whether files created while the driver was down have been recorded or not + assert(!recordedFiles.filter(_.endsWith("4")).isEmpty) + assert(!recordedFiles.filter(_.endsWith("5")).isEmpty) + assert(!recordedFiles.filter(_.endsWith("6")).isEmpty) + + // Verify whether new files created after recover have been recorded or not + assert(!recordedFiles.filter(_.endsWith("7")).isEmpty) + assert(!recordedFiles.filter(_.endsWith("8")).isEmpty) + assert(!recordedFiles.filter(_.endsWith("9")).isEmpty) + // Append the new output to the old buffer outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]] outputBuffer ++= outputStream.output - // Verify whether data received by Spark Streaming was as expected - val expectedOutput = Seq(1, 3, 6, 28, 36, 45) + val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45) logInfo("--------------------------------") logInfo("output, size = " + outputBuffer.size) outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) @@ -244,11 +271,17 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { logInfo("--------------------------------") // Verify whether all the elements received are as expected - assert(outputBuffer.size === expectedOutput.size) - for (i <- 0 until outputBuffer.size) { - assert(outputBuffer(i).size === 1) - assert(outputBuffer(i).head === expectedOutput(i)) - } + val output = outputBuffer.flatMap(x => x) + assert(output.contains(6)) // To ensure that the 3rd input (i.e., 3) was processed + output.foreach(o => // To ensure all the inputs are correctly added cumulatively + assert(expectedOutput.contains(o), "Expected value " + o + " not found") + ) + // To ensure that all the inputs were received correctly + assert(expectedOutput.last === output.last) + + // Enable manual clock back again for other tests + if (clockProperty != null) + System.setProperty("spark.streaming.clock", clockProperty) } @@ -278,7 +311,9 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // Do the computation for initial number of batches, create checkpoint file and quit ssc = setupStreams[U, V](input, operation) - val output = runStreams[V](ssc, initialNumBatches, initialNumExpectedOutputs) + ssc.start() + val output = advanceTimeWithRealDelay[V](ssc, initialNumBatches) + ssc.stop() verifyOutput[V](output, expectedOutput.take(initialNumBatches), true) Thread.sleep(1000) @@ -289,17 +324,20 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { "\n-------------------------------------------\n" ) ssc = new StreamingContext(checkpointDir) - val outputNew = runStreams[V](ssc, nextNumBatches, nextNumExpectedOutputs) + System.clearProperty("spark.driver.port") + ssc.start() + val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches) // the first element will be re-processed data of the last batch before restart verifyOutput[V](outputNew, expectedOutput.takeRight(nextNumExpectedOutputs), true) + ssc.stop() ssc = null } /** * Advances the manual clock on the streaming scheduler by given number of batches. - * It also wait for the expected amount of time for each batch. + * It also waits for the expected amount of time for each batch. */ - def runStreamsWithRealDelay(ssc: StreamingContext, numBatches: Long) { + def advanceTimeWithRealDelay[V: ClassManifest](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = { val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] logInfo("Manual clock before advancing = " + clock.time) for (i <- 1 to numBatches.toInt) { @@ -308,6 +346,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { } logInfo("Manual clock after advancing = " + clock.time) Thread.sleep(batchDuration.milliseconds) - } + val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]] + outputStream.output + } }
\ No newline at end of file diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala index c4cfffbfc1..a5fa7ab92d 100644 --- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala +++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala @@ -1,191 +1,40 @@ package spark.streaming -import org.scalatest.BeforeAndAfter -import org.apache.commons.io.FileUtils +import spark.Logging +import spark.streaming.util.MasterFailureTest +import StreamingContext._ + +import org.scalatest.{FunSuite, BeforeAndAfter} +import com.google.common.io.Files import java.io.File -import scala.runtime.RichInt -import scala.util.Random -import spark.streaming.StreamingContext._ +import org.apache.commons.io.FileUtils import collection.mutable.ArrayBuffer -import spark.Logging + /** * This testsuite tests master failures at random times while the stream is running using * the real clock. */ -class FailureSuite extends TestSuiteBase with BeforeAndAfter { +class FailureSuite extends FunSuite with BeforeAndAfter with Logging { + + var directory = "FailureSuite" + val numBatches = 30 + val batchDuration = Milliseconds(1000) before { - FileUtils.deleteDirectory(new File(checkpointDir)) + FileUtils.deleteDirectory(new File(directory)) } after { - FailureSuite.reset() - FileUtils.deleteDirectory(new File(checkpointDir)) - - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port") - } - - override def framework = "CheckpointSuite" - - override def batchDuration = Milliseconds(500) - - override def checkpointDir = "checkpoint" - - override def checkpointInterval = batchDuration - - test("multiple failures with updateStateByKey") { - val n = 30 - // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ... - val input = (1 to n).map(i => (1 to i).map(_ =>"a").toSeq).toSeq - // Last output: [ (a, 465) ] for n=30 - val lastOutput = Seq( ("a", (1 to n).reduce(_ + _)) ) - - val operation = (st: DStream[String]) => { - val updateFunc = (values: Seq[Int], state: Option[RichInt]) => { - Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0))) - } - st.map(x => (x, 1)) - .updateStateByKey[RichInt](updateFunc) - .checkpoint(Seconds(2)) - .map(t => (t._1, t._2.self)) - } - - testOperationWithMultipleFailures(input, operation, lastOutput, n, n) - } - - test("multiple failures with reduceByKeyAndWindow") { - val n = 30 - val w = 100 - assert(w > n, "Window should be much larger than the number of input sets in this test") - // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ... - val input = (1 to n).map(i => (1 to i).map(_ =>"a").toSeq).toSeq - // Last output: [ (a, 465) ] - val lastOutput = Seq( ("a", (1 to n).reduce(_ + _)) ) - - val operation = (st: DStream[String]) => { - st.map(x => (x, 1)) - .reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration) - .checkpoint(Seconds(2)) - } - - testOperationWithMultipleFailures(input, operation, lastOutput, n, n) - } - - - /** - * Tests stream operation with multiple master failures, and verifies whether the - * final set of output values is as expected or not. Checking the final value is - * proof that no intermediate data was lost due to master failures. - */ - def testOperationWithMultipleFailures[U: ClassManifest, V: ClassManifest]( - input: Seq[Seq[U]], - operation: DStream[U] => DStream[V], - lastExpectedOutput: Seq[V], - numBatches: Int, - numExpectedOutput: Int - ) { - var ssc = setupStreams[U, V](input, operation) - val mergedOutput = new ArrayBuffer[Seq[V]]() - - var totalTimeRan = 0L - while(totalTimeRan <= numBatches * batchDuration.milliseconds * 2) { - new KillingThread(ssc, numBatches * batchDuration.milliseconds.toInt / 4).start() - val (output, timeRan) = runStreamsWithRealClock[V](ssc, numBatches, numExpectedOutput) - - mergedOutput ++= output - totalTimeRan += timeRan - logInfo("New output = " + output) - logInfo("Merged output = " + mergedOutput) - logInfo("Total time spent = " + totalTimeRan) - val sleepTime = Random.nextInt(numBatches * batchDuration.milliseconds.toInt / 8) - logInfo( - "\n-------------------------------------------\n" + - " Restarting stream computation in " + sleepTime + " ms " + - "\n-------------------------------------------\n" - ) - Thread.sleep(sleepTime) - FailureSuite.failed = false - ssc = new StreamingContext(checkpointDir) - } - ssc.stop() - ssc = null - - // Verify whether the last output is the expected one - val lastOutput = mergedOutput(mergedOutput.lastIndexWhere(!_.isEmpty)) - assert(lastOutput.toSet === lastExpectedOutput.toSet) - logInfo("Finished computation after " + FailureSuite.failureCount + " failures") + FileUtils.deleteDirectory(new File(directory)) } - /** - * Runs the streams set up in `ssc` on real clock until the expected max number of - */ - def runStreamsWithRealClock[V: ClassManifest]( - ssc: StreamingContext, - numBatches: Int, - maxExpectedOutput: Int - ): (Seq[Seq[V]], Long) = { - - System.clearProperty("spark.streaming.clock") - - assert(numBatches > 0, "Number of batches to run stream computation is zero") - assert(maxExpectedOutput > 0, "Max expected outputs after " + numBatches + " is zero") - logInfo("numBatches = " + numBatches + ", maxExpectedOutput = " + maxExpectedOutput) - - // Get the output buffer - val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]] - val output = outputStream.output - val waitTime = (batchDuration.milliseconds * (numBatches.toDouble + 0.5)).toLong - val startTime = System.currentTimeMillis() - - try { - // Start computation - ssc.start() - - // Wait until expected number of output items have been generated - while (output.size < maxExpectedOutput && System.currentTimeMillis() - startTime < waitTime && !FailureSuite.failed) { - logInfo("output.size = " + output.size + ", maxExpectedOutput = " + maxExpectedOutput) - Thread.sleep(100) - } - } catch { - case e: Exception => logInfo("Exception while running streams: " + e) - } finally { - ssc.stop() - } - val timeTaken = System.currentTimeMillis() - startTime - logInfo("" + output.size + " sets of output generated in " + timeTaken + " ms") - (output, timeTaken) + test("multiple failures with map") { + MasterFailureTest.testMap(directory, numBatches, batchDuration) } - -} - -object FailureSuite { - var failed = false - var failureCount = 0 - - def reset() { - failed = false - failureCount = 0 + test("multiple failures with updateStateByKey") { + MasterFailureTest.testUpdateStateByKey(directory, numBatches, batchDuration) } } -class KillingThread(ssc: StreamingContext, maxKillWaitTime: Int) extends Thread with Logging { - initLogging() - - override def run() { - var minKillWaitTime = if (FailureSuite.failureCount == 0) 3000 else 1000 // to allow the first checkpoint - val killWaitTime = minKillWaitTime + Random.nextInt(maxKillWaitTime) - logInfo("Kill wait time = " + killWaitTime) - Thread.sleep(killWaitTime.toLong) - logInfo( - "\n---------------------------------------\n" + - "Killing streaming context after " + killWaitTime + " ms" + - "\n---------------------------------------\n" - ) - if (ssc != null) ssc.stop() - FailureSuite.failed = true - FailureSuite.failureCount += 1 - } -} diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index c442210004..c9f941c5b8 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -1,5 +1,11 @@ package spark.streaming +import akka.actor.Actor +import akka.actor.IO +import akka.actor.IOManager +import akka.actor.Props +import akka.util.ByteString + import dstream.SparkFlumeEvent import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket} import java.io.{File, BufferedWriter, OutputStreamWriter} @@ -7,6 +13,7 @@ import java.util.concurrent.{TimeUnit, ArrayBlockingQueue} import collection.mutable.{SynchronizedBuffer, ArrayBuffer} import util.ManualClock import spark.storage.StorageLevel +import spark.streaming.receivers.Receiver import spark.Logging import scala.util.Random import org.apache.commons.io.FileUtils @@ -25,6 +32,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + val testPort = 9999 + override def checkpointDir = "checkpoint" after { @@ -35,13 +44,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { test("network input stream") { // Start the server - val testPort = 9999 val testServer = new TestServer(testPort) testServer.start() // Set up the streaming context and input streams val ssc = new StreamingContext(master, framework, batchDuration) - val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) + val networkStream = ssc.socketTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]] val outputStream = new TestOutputStream(networkStream, outputBuffer) def output = outputBuffer.flatMap(x => x) @@ -95,7 +103,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] val input = Seq(1, 2, 3, 4, 5) - + Thread.sleep(1000) val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", 33333)); val client = SpecificRequestor.getClient( classOf[AvroSourceProtocol], transceiver); @@ -133,26 +141,29 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { test("file input stream") { + // Disable manual clock as FileInputDStream does not work with manual clock + System.clearProperty("spark.streaming.clock") + // Set up the streaming context and input streams val testDir = Files.createTempDir() val ssc = new StreamingContext(master, framework, batchDuration) - val filestream = ssc.textFileStream(testDir.toString) + val fileStream = ssc.textFileStream(testDir.toString) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] def output = outputBuffer.flatMap(x => x) - val outputStream = new TestOutputStream(filestream, outputBuffer) + val outputStream = new TestOutputStream(fileStream, outputBuffer) ssc.registerOutputStream(outputStream) ssc.start() // Create files in the temporary directory so that Spark Streaming can read data from it - val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] val input = Seq(1, 2, 3, 4, 5) val expectedOutput = input.map(_.toString) Thread.sleep(1000) for (i <- 0 until input.size) { - FileUtils.writeStringToFile(new File(testDir, i.toString), input(i).toString + "\n") - Thread.sleep(500) - clock.addToTime(batchDuration.milliseconds) - //Thread.sleep(100) + val file = new File(testDir, i.toString) + FileUtils.writeStringToFile(file, input(i).toString + "\n") + logInfo("Created file " + file) + Thread.sleep(batchDuration.milliseconds) + Thread.sleep(1000) } val startTime = System.currentTimeMillis() Thread.sleep(1000) @@ -171,16 +182,68 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Verify whether all the elements received are as expected // (whether the elements were received one in each interval is not verified) + assert(output.toList === expectedOutput.toList) + + FileUtils.deleteDirectory(testDir) + + // Enable manual clock back again for other tests + System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + } + + + test("actor input stream") { + // Start the server + val port = testPort + val testServer = new TestServer(port) + testServer.start() + + // Set up the streaming context and input streams + val ssc = new StreamingContext(master, framework, batchDuration) + val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor", + StorageLevel.MEMORY_AND_DISK) //Had to pass the local value of port to prevent from closing over entire scope + val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] + val outputStream = new TestOutputStream(networkStream, outputBuffer) + def output = outputBuffer.flatMap(x => x) + ssc.registerOutputStream(outputStream) + ssc.start() + + // Feed data to the server to send to the network receiver + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + val input = 1 to 9 + val expectedOutput = input.map(x => x.toString) + Thread.sleep(1000) + for (i <- 0 until input.size) { + testServer.send(input(i).toString) + Thread.sleep(500) + clock.addToTime(batchDuration.milliseconds) + } + Thread.sleep(1000) + logInfo("Stopping server") + testServer.stop() + logInfo("Stopping context") + ssc.stop() + + // Verify whether data received was as expected + logInfo("--------------------------------") + logInfo("output.size = " + outputBuffer.size) + logInfo("output") + outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("expected output.size = " + expectedOutput.size) + logInfo("expected output") + expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("--------------------------------") + + // Verify whether all the elements received are as expected + // (whether the elements were received one in each interval is not verified) assert(output.size === expectedOutput.size) for (i <- 0 until output.size) { - assert(output(i).size === 1) - assert(output(i).head.toString === expectedOutput(i)) + assert(output(i) === expectedOutput(i)) } - FileUtils.deleteDirectory(testDir) } } +/** This is server to test the network input stream */ class TestServer(port: Int) extends Logging { val queue = new ArrayBlockingQueue[String](100) @@ -239,3 +302,15 @@ object TestServer { } } } + +class TestActor(port: Int) extends Actor with Receiver { + + def bytesToString(byteString: ByteString) = byteString.utf8String + + override def preStart = IOManager(context.system).connect(new InetSocketAddress(port)) + + def receive = { + case IO.Read(socket, bytes) => + pushBlock(bytesToString(bytes)) + } +} diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala index c2733831b2..ad6aa79d10 100644 --- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala @@ -63,20 +63,25 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBu */ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { + // Name of the framework for Spark context def framework = "TestSuiteBase" + // Master for Spark context def master = "local[2]" + // Batch duration def batchDuration = Seconds(1) + // Directory where the checkpoint data will be saved def checkpointDir = "checkpoint" - def checkpointInterval = batchDuration - + // Number of partitions of the input parallel collections created for testing def numInputPartitions = 2 + // Maximum time to wait before the test times out def maxWaitTimeMillis = 10000 + // Whether to actually wait in real time before changing manual clock def actuallyWait = false /** @@ -91,7 +96,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { // Create StreamingContext val ssc = new StreamingContext(master, framework, batchDuration) if (checkpointDir != null) { - ssc.checkpoint(checkpointDir, checkpointInterval) + ssc.checkpoint(checkpointDir) } // Setup the stream computation @@ -116,7 +121,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { // Create StreamingContext val ssc = new StreamingContext(master, framework, batchDuration) if (checkpointDir != null) { - ssc.checkpoint(checkpointDir, checkpointInterval) + ssc.checkpoint(checkpointDir) } // Setup the stream computation @@ -140,9 +145,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { numBatches: Int, numExpectedOutput: Int ): Seq[Seq[V]] = { - - System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") - assert(numBatches > 0, "Number of batches to run stream computation is zero") assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero") logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput) @@ -186,7 +188,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { } finally { ssc.stop() } - output } diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala index cd9608df53..1b66f3bda2 100644 --- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala @@ -5,6 +5,8 @@ import collection.mutable.ArrayBuffer class WindowOperationsSuite extends TestSuiteBase { + System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + override def framework = "WindowOperationsSuite" override def maxWaitTimeMillis = 20000 @@ -82,12 +84,9 @@ class WindowOperationsSuite extends TestSuiteBase { ) /* - The output of the reduceByKeyAndWindow with inverse reduce function is - different from the naive reduceByKeyAndWindow. Even if the count of a - particular key is 0, the key does not get eliminated from the RDDs of - ReducedWindowedDStream. This causes the number of keys in these RDDs to - increase forever. A more generalized version that allows elimination of - keys should be considered. + The output of the reduceByKeyAndWindow with inverse function but without a filter + function will be different from the naive reduceByKeyAndWindow, as no keys get + eliminated from the ReducedWindowedDStream even if the value of a key becomes 0. */ val bigReduceInvOutput = Seq( @@ -175,31 +174,31 @@ class WindowOperationsSuite extends TestSuiteBase { // Testing reduceByKeyAndWindow (with invertible reduce function) - testReduceByKeyAndWindowInv( + testReduceByKeyAndWindowWithInverse( "basic reduction", Seq(Seq(("a", 1), ("a", 3)) ), Seq(Seq(("a", 4)) ) ) - testReduceByKeyAndWindowInv( + testReduceByKeyAndWindowWithInverse( "key already in window and new value added into window", Seq( Seq(("a", 1)), Seq(("a", 1)) ), Seq( Seq(("a", 1)), Seq(("a", 2)) ) ) - testReduceByKeyAndWindowInv( + testReduceByKeyAndWindowWithInverse( "new key added into window", Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)) ), Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)) ) ) - testReduceByKeyAndWindowInv( + testReduceByKeyAndWindowWithInverse( "key removed from window", Seq( Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq() ), Seq( Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq(("a", 0)) ) ) - testReduceByKeyAndWindowInv( + testReduceByKeyAndWindowWithInverse( "larger slide time", largerSlideInput, largerSlideReduceOutput, @@ -207,7 +206,9 @@ class WindowOperationsSuite extends TestSuiteBase { Seconds(2) ) - testReduceByKeyAndWindowInv("big test", bigInput, bigReduceInvOutput) + testReduceByKeyAndWindowWithInverse("big test", bigInput, bigReduceInvOutput) + + testReduceByKeyAndWindowWithFilteredInverse("big test", bigInput, bigReduceOutput) test("groupByKeyAndWindow") { val input = bigInput @@ -235,14 +236,14 @@ class WindowOperationsSuite extends TestSuiteBase { testOperation(input, operation, expectedOutput, numBatches, true) } - test("countByKeyAndWindow") { - val input = Seq(Seq(("a", 1)), Seq(("b", 1), ("b", 2)), Seq(("a", 10), ("b", 20))) + test("countByValueAndWindow") { + val input = Seq(Seq("a"), Seq("b", "b"), Seq("a", "b")) val expectedOutput = Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 2)), Seq(("a", 1), ("b", 3))) val windowDuration = Seconds(2) val slideDuration = Seconds(1) val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt - val operation = (s: DStream[(String, Int)]) => { - s.countByKeyAndWindow(windowDuration, slideDuration).map(x => (x._1, x._2.toInt)) + val operation = (s: DStream[String]) => { + s.countByValueAndWindow(windowDuration, slideDuration).map(x => (x._1, x._2.toInt)) } testOperation(input, operation, expectedOutput, numBatches, true) } @@ -272,29 +273,50 @@ class WindowOperationsSuite extends TestSuiteBase { slideDuration: Duration = Seconds(1) ) { test("reduceByKeyAndWindow - " + name) { + logInfo("reduceByKeyAndWindow - " + name) val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt val operation = (s: DStream[(String, Int)]) => { - s.reduceByKeyAndWindow(_ + _, windowDuration, slideDuration).persist() + s.reduceByKeyAndWindow((x: Int, y: Int) => x + y, windowDuration, slideDuration) } testOperation(input, operation, expectedOutput, numBatches, true) } } - def testReduceByKeyAndWindowInv( + def testReduceByKeyAndWindowWithInverse( name: String, input: Seq[Seq[(String, Int)]], expectedOutput: Seq[Seq[(String, Int)]], windowDuration: Duration = Seconds(2), slideDuration: Duration = Seconds(1) ) { - test("reduceByKeyAndWindowInv - " + name) { + test("reduceByKeyAndWindow with inverse function - " + name) { + logInfo("reduceByKeyAndWindow with inverse function - " + name) val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt val operation = (s: DStream[(String, Int)]) => { s.reduceByKeyAndWindow(_ + _, _ - _, windowDuration, slideDuration) - .persist() .checkpoint(Seconds(100)) // Large value to avoid effect of RDD checkpointing } testOperation(input, operation, expectedOutput, numBatches, true) } } + + def testReduceByKeyAndWindowWithFilteredInverse( + name: String, + input: Seq[Seq[(String, Int)]], + expectedOutput: Seq[Seq[(String, Int)]], + windowDuration: Duration = Seconds(2), + slideDuration: Duration = Seconds(1) + ) { + test("reduceByKeyAndWindow with inverse and filter functions - " + name) { + logInfo("reduceByKeyAndWindow with inverse and filter functions - " + name) + val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt + val filterFunc = (p: (String, Int)) => p._2 != 0 + val operation = (s: DStream[(String, Int)]) => { + s.reduceByKeyAndWindow(_ + _, _ - _, windowDuration, slideDuration, filterFunc = filterFunc) + .persist() + .checkpoint(Seconds(100)) // Large value to avoid effect of RDD checkpointing + } + testOperation(input, operation, expectedOutput, numBatches, true) + } + } } |