aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main/scala/org
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/main/scala/org')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala190
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/DStream.scala702
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala110
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala167
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Duration.scala83
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Interval.scala59
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Job.scala41
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/JobManager.scala88
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala173
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala534
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala131
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala563
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Time.scala72
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala102
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala316
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala613
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala614
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala57
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala36
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala199
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala38
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala37
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala37
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala154
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala45
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala34
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala70
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala141
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala38
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala38
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala37
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala272
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala30
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala59
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala108
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala174
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala44
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala94
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala109
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala36
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala99
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala57
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala57
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala175
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala50
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala101
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala414
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala115
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala77
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala94
50 files changed, 7684 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
new file mode 100644
index 0000000000..2d8f072624
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import java.io._
+import java.util.concurrent.Executors
+import java.util.concurrent.RejectedExecutionException
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.Logging
+import org.apache.spark.io.CompressionCodec
+
+
+private[streaming]
+class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
+ extends Logging with Serializable {
+ val master = ssc.sc.master
+ val framework = ssc.sc.appName
+ val sparkHome = ssc.sc.sparkHome
+ val jars = ssc.sc.jars
+ val environment = ssc.sc.environment
+ val graph = ssc.graph
+ val checkpointDir = ssc.checkpointDir
+ val checkpointDuration = ssc.checkpointDuration
+ val pendingTimes = ssc.scheduler.jobManager.getPendingTimes()
+
+ def validate() {
+ assert(master != null, "Checkpoint.master is null")
+ assert(framework != null, "Checkpoint.framework is null")
+ assert(graph != null, "Checkpoint.graph is null")
+ assert(checkpointTime != null, "Checkpoint.checkpointTime is null")
+ logInfo("Checkpoint for time " + checkpointTime + " validated")
+ }
+}
+
+
+/**
+ * Convenience class to speed up the writing of graph checkpoint to file
+ */
+private[streaming]
+class CheckpointWriter(checkpointDir: String) extends Logging {
+ val file = new Path(checkpointDir, "graph")
+ // The file to which we actually write - and then "move" to file.
+ private val writeFile = new Path(file.getParent, file.getName + ".next")
+ private val bakFile = new Path(file.getParent, file.getName + ".bk")
+
+ private var stopped = false
+
+ val conf = new Configuration()
+ var fs = file.getFileSystem(conf)
+ val maxAttempts = 3
+ val executor = Executors.newFixedThreadPool(1)
+
+ private val compressionCodec = CompressionCodec.createCodec()
+
+ // Removed code which validates whether there is only one CheckpointWriter per path 'file' since
+ // I did not notice any errors - reintroduce it ?
+
+ class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
+ def run() {
+ var attempts = 0
+ val startTime = System.currentTimeMillis()
+ while (attempts < maxAttempts) {
+ attempts += 1
+ try {
+ logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
+ // This is inherently thread unsafe .. so alleviating it by writing to '.new' and then doing moves : which should be pretty fast.
+ val fos = fs.create(writeFile)
+ fos.write(bytes)
+ fos.close()
+ if (fs.exists(file) && fs.rename(file, bakFile)) {
+ logDebug("Moved existing checkpoint file to " + bakFile)
+ }
+ // paranoia
+ fs.delete(file, false)
+ fs.rename(writeFile, file)
+
+ val finishTime = System.currentTimeMillis();
+ logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file +
+ "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds")
+ return
+ } catch {
+ case ioe: IOException =>
+ logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe)
+ }
+ }
+ logError("Could not write checkpoint for time " + checkpointTime + " to file '" + file + "'")
+ }
+ }
+
+ def write(checkpoint: Checkpoint) {
+ val bos = new ByteArrayOutputStream()
+ val zos = compressionCodec.compressedOutputStream(bos)
+ val oos = new ObjectOutputStream(zos)
+ oos.writeObject(checkpoint)
+ oos.close()
+ bos.close()
+ try {
+ executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
+ } catch {
+ case rej: RejectedExecutionException =>
+ logError("Could not submit checkpoint task to the thread pool executor", rej)
+ }
+ }
+
+ def stop() {
+ synchronized {
+ if (stopped) return ;
+ stopped = true
+ }
+ executor.shutdown()
+ val startTime = System.currentTimeMillis()
+ val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)
+ val endTime = System.currentTimeMillis()
+ logInfo("CheckpointWriter executor terminated ? " + terminated + ", waited for " + (endTime - startTime) + " ms.")
+ }
+}
+
+
+private[streaming]
+object CheckpointReader extends Logging {
+
+ def read(path: String): Checkpoint = {
+ val fs = new Path(path).getFileSystem(new Configuration())
+ val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk"))
+
+ val compressionCodec = CompressionCodec.createCodec()
+
+ attempts.foreach(file => {
+ if (fs.exists(file)) {
+ logInfo("Attempting to load checkpoint from file '" + file + "'")
+ try {
+ val fis = fs.open(file)
+ // ObjectInputStream uses the last defined user-defined class loader in the stack
+ // to find classes, which maybe the wrong class loader. Hence, a inherited version
+ // of ObjectInputStream is used to explicitly use the current thread's default class
+ // loader to find and load classes. This is a well know Java issue and has popped up
+ // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
+ val zis = compressionCodec.compressedInputStream(fis)
+ val ois = new ObjectInputStreamWithLoader(zis, Thread.currentThread().getContextClassLoader)
+ val cp = ois.readObject.asInstanceOf[Checkpoint]
+ ois.close()
+ fs.close()
+ cp.validate()
+ logInfo("Checkpoint successfully loaded from file '" + file + "'")
+ logInfo("Checkpoint was generated at time " + cp.checkpointTime)
+ return cp
+ } catch {
+ case e: Exception =>
+ logError("Error loading checkpoint from file '" + file + "'", e)
+ }
+ } else {
+ logWarning("Could not read checkpoint from file '" + file + "' as it does not exist")
+ }
+
+ })
+ throw new Exception("Could not read checkpoint from path '" + path + "'")
+ }
+}
+
+private[streaming]
+class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader)
+ extends ObjectInputStream(inputStream_) {
+
+ override def resolveClass(desc: ObjectStreamClass): Class[_] = {
+ try {
+ return loader.loadClass(desc.getName())
+ } catch {
+ case e: Exception =>
+ }
+ return super.resolveClass(desc)
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
new file mode 100644
index 0000000000..362247cc38
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@ -0,0 +1,702 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import org.apache.spark.streaming.dstream._
+import StreamingContext._
+import org.apache.spark.util.MetadataCleaner
+
+//import Time._
+
+import org.apache.spark.{RDD, Logging}
+import org.apache.spark.storage.StorageLevel
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+
+import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.conf.Configuration
+
+/**
+ * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
+ * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.RDD]]
+ * for more details on RDDs). DStreams can either be created from live data (such as, data from
+ * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations
+ * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
+ * DStream periodically generates a RDD, either from live data or by transforming the RDD generated
+ * by a parent DStream.
+ *
+ * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
+ * `window`. In addition, [[org.apache.spark.streaming.PairDStreamFunctions]] contains operations available
+ * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations
+ * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)] through
+ * implicit conversions when `spark.streaming.StreamingContext._` is imported.
+ *
+ * DStreams internally is characterized by a few basic properties:
+ * - A list of other DStreams that the DStream depends on
+ * - A time interval at which the DStream generates an RDD
+ * - A function that is used to generate an RDD after each time interval
+ */
+
+abstract class DStream[T: ClassManifest] (
+ @transient protected[streaming] var ssc: StreamingContext
+ ) extends Serializable with Logging {
+
+ initLogging()
+
+ // =======================================================================
+ // Methods that should be implemented by subclasses of DStream
+ // =======================================================================
+
+ /** Time interval after which the DStream generates a RDD */
+ def slideDuration: Duration
+
+ /** List of parent DStreams on which this DStream depends on */
+ def dependencies: List[DStream[_]]
+
+ /** Method that generates a RDD for the given time */
+ def compute (validTime: Time): Option[RDD[T]]
+
+ // =======================================================================
+ // Methods and fields available on all DStreams
+ // =======================================================================
+
+ // RDDs generated, marked as protected[streaming] so that testsuites can access it
+ @transient
+ protected[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
+
+ // Time zero for the DStream
+ protected[streaming] var zeroTime: Time = null
+
+ // Duration for which the DStream will remember each RDD created
+ protected[streaming] var rememberDuration: Duration = null
+
+ // Storage level of the RDDs in the stream
+ protected[streaming] var storageLevel: StorageLevel = StorageLevel.NONE
+
+ // Checkpoint details
+ protected[streaming] val mustCheckpoint = false
+ protected[streaming] var checkpointDuration: Duration = null
+ protected[streaming] val checkpointData = new DStreamCheckpointData(this)
+
+ // Reference to whole DStream graph
+ protected[streaming] var graph: DStreamGraph = null
+
+ protected[streaming] def isInitialized = (zeroTime != null)
+
+ // Duration for which the DStream requires its parent DStream to remember each RDD created
+ protected[streaming] def parentRememberDuration = rememberDuration
+
+ /** Return the StreamingContext associated with this DStream */
+ def context = ssc
+
+ /** Persist the RDDs of this DStream with the given storage level */
+ def persist(level: StorageLevel): DStream[T] = {
+ if (this.isInitialized) {
+ throw new UnsupportedOperationException(
+ "Cannot change storage level of an DStream after streaming context has started")
+ }
+ this.storageLevel = level
+ this
+ }
+
+ /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
+ def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)
+
+ /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
+ def cache(): DStream[T] = persist()
+
+ /**
+ * Enable periodic checkpointing of RDDs of this DStream
+ * @param interval Time interval after which generated RDD will be checkpointed
+ */
+ def checkpoint(interval: Duration): DStream[T] = {
+ if (isInitialized) {
+ throw new UnsupportedOperationException(
+ "Cannot change checkpoint interval of an DStream after streaming context has started")
+ }
+ persist()
+ checkpointDuration = interval
+ this
+ }
+
+ /**
+ * Initialize the DStream by setting the "zero" time, based on which
+ * the validity of future times is calculated. This method also recursively initializes
+ * its parent DStreams.
+ */
+ protected[streaming] def initialize(time: Time) {
+ if (zeroTime != null && zeroTime != time) {
+ throw new Exception("ZeroTime is already initialized to " + zeroTime
+ + ", cannot initialize it again to " + time)
+ }
+ zeroTime = time
+
+ // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
+ if (mustCheckpoint && checkpointDuration == null) {
+ checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
+ logInfo("Checkpoint interval automatically set to " + checkpointDuration)
+ }
+
+ // Set the minimum value of the rememberDuration if not already set
+ var minRememberDuration = slideDuration
+ if (checkpointDuration != null && minRememberDuration <= checkpointDuration) {
+ minRememberDuration = checkpointDuration * 2 // times 2 just to be sure that the latest checkpoint is not forgetten
+ }
+ if (rememberDuration == null || rememberDuration < minRememberDuration) {
+ rememberDuration = minRememberDuration
+ }
+
+ // Initialize the dependencies
+ dependencies.foreach(_.initialize(zeroTime))
+ }
+
+ protected[streaming] def validate() {
+ assert(rememberDuration != null, "Remember duration is set to null")
+
+ assert(
+ !mustCheckpoint || checkpointDuration != null,
+ "The checkpoint interval for " + this.getClass.getSimpleName + " has not been set." +
+ " Please use DStream.checkpoint() to set the interval."
+ )
+
+ assert(
+ checkpointDuration == null || context.sparkContext.checkpointDir.isDefined,
+ "The checkpoint directory has not been set. Please use StreamingContext.checkpoint()" +
+ " or SparkContext.checkpoint() to set the checkpoint directory."
+ )
+
+ assert(
+ checkpointDuration == null || checkpointDuration >= slideDuration,
+ "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
+ checkpointDuration + " which is lower than its slide time (" + slideDuration + "). " +
+ "Please set it to at least " + slideDuration + "."
+ )
+
+ assert(
+ checkpointDuration == null || checkpointDuration.isMultipleOf(slideDuration),
+ "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
+ checkpointDuration + " which not a multiple of its slide time (" + slideDuration + "). " +
+ "Please set it to a multiple " + slideDuration + "."
+ )
+
+ assert(
+ checkpointDuration == null || storageLevel != StorageLevel.NONE,
+ "" + this.getClass.getSimpleName + " has been marked for checkpointing but the storage " +
+ "level has not been set to enable persisting. Please use DStream.persist() to set the " +
+ "storage level to use memory for better checkpointing performance."
+ )
+
+ assert(
+ checkpointDuration == null || rememberDuration > checkpointDuration,
+ "The remember duration for " + this.getClass.getSimpleName + " has been set to " +
+ rememberDuration + " which is not more than the checkpoint interval (" +
+ checkpointDuration + "). Please set it to higher than " + checkpointDuration + "."
+ )
+
+ val metadataCleanerDelay = MetadataCleaner.getDelaySeconds
+ logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
+ assert(
+ metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000,
+ "It seems you are doing some DStream window operation or setting a checkpoint interval " +
+ "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
+ "than " + rememberDuration.milliseconds / 1000 + " seconds. But Spark's metadata cleanup" +
+ "delay is set to " + metadataCleanerDelay + " seconds, which is not sufficient. Please " +
+ "set the Java property 'spark.cleaner.delay' to more than " +
+ math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds."
+ )
+
+ dependencies.foreach(_.validate())
+
+ logInfo("Slide time = " + slideDuration)
+ logInfo("Storage level = " + storageLevel)
+ logInfo("Checkpoint interval = " + checkpointDuration)
+ logInfo("Remember duration = " + rememberDuration)
+ logInfo("Initialized and validated " + this)
+ }
+
+ protected[streaming] def setContext(s: StreamingContext) {
+ if (ssc != null && ssc != s) {
+ throw new Exception("Context is already set in " + this + ", cannot set it again")
+ }
+ ssc = s
+ logInfo("Set context for " + this)
+ dependencies.foreach(_.setContext(ssc))
+ }
+
+ protected[streaming] def setGraph(g: DStreamGraph) {
+ if (graph != null && graph != g) {
+ throw new Exception("Graph is already set in " + this + ", cannot set it again")
+ }
+ graph = g
+ dependencies.foreach(_.setGraph(graph))
+ }
+
+ protected[streaming] def remember(duration: Duration) {
+ if (duration != null && duration > rememberDuration) {
+ rememberDuration = duration
+ logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this)
+ }
+ dependencies.foreach(_.remember(parentRememberDuration))
+ }
+
+ /** Checks whether the 'time' is valid wrt slideDuration for generating RDD */
+ protected def isTimeValid(time: Time): Boolean = {
+ if (!isInitialized) {
+ throw new Exception (this + " has not been initialized")
+ } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
+ logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
+ false
+ } else {
+ logInfo("Time " + time + " is valid")
+ true
+ }
+ }
+
+ /**
+ * Retrieve a precomputed RDD of this DStream, or computes the RDD. This is an internal
+ * method that should not be called directly.
+ */
+ protected[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
+ // If this DStream was not initialized (i.e., zeroTime not set), then do it
+ // If RDD was already generated, then retrieve it from HashMap
+ generatedRDDs.get(time) match {
+
+ // If an RDD was already generated and is being reused, then
+ // probably all RDDs in this DStream will be reused and hence should be cached
+ case Some(oldRDD) => Some(oldRDD)
+
+ // if RDD was not generated, and if the time is valid
+ // (based on sliding time of this DStream), then generate the RDD
+ case None => {
+ if (isTimeValid(time)) {
+ compute(time) match {
+ case Some(newRDD) =>
+ if (storageLevel != StorageLevel.NONE) {
+ newRDD.persist(storageLevel)
+ logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel + " at time " + time)
+ }
+ if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
+ newRDD.checkpoint()
+ logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing at time " + time)
+ }
+ generatedRDDs.put(time, newRDD)
+ Some(newRDD)
+ case None =>
+ None
+ }
+ } else {
+ None
+ }
+ }
+ }
+ }
+
+ /**
+ * Generate a SparkStreaming job for the given time. This is an internal method that
+ * should not be called directly. This default implementation creates a job
+ * that materializes the corresponding RDD. Subclasses of DStream may override this
+ * to generate their own jobs.
+ */
+ protected[streaming] def generateJob(time: Time): Option[Job] = {
+ getOrCompute(time) match {
+ case Some(rdd) => {
+ val jobFunc = () => {
+ val emptyFunc = { (iterator: Iterator[T]) => {} }
+ context.sparkContext.runJob(rdd, emptyFunc)
+ }
+ Some(new Job(time, jobFunc))
+ }
+ case None => None
+ }
+ }
+
+ /**
+ * Clear metadata that are older than `rememberDuration` of this DStream.
+ * This is an internal method that should not be called directly. This default
+ * implementation clears the old generated RDDs. Subclasses of DStream may override
+ * this to clear their own metadata along with the generated RDDs.
+ */
+ protected[streaming] def clearOldMetadata(time: Time) {
+ var numForgotten = 0
+ val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
+ generatedRDDs --= oldRDDs.keys
+ logInfo("Cleared " + oldRDDs.size + " RDDs that were older than " +
+ (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
+ dependencies.foreach(_.clearOldMetadata(time))
+ }
+
+ /* Adds metadata to the Stream while it is running.
+ * This methd should be overwritten by sublcasses of InputDStream.
+ */
+ protected[streaming] def addMetadata(metadata: Any) {
+ if (metadata != null) {
+ logInfo("Dropping Metadata: " + metadata.toString)
+ }
+ }
+
+ /**
+ * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of
+ * this stream. This is an internal method that should not be called directly. This is
+ * a default implementation that saves only the file names of the checkpointed RDDs to
+ * checkpointData. Subclasses of DStream (especially those of InputDStream) may override
+ * this method to save custom checkpoint data.
+ */
+ protected[streaming] def updateCheckpointData(currentTime: Time) {
+ logInfo("Updating checkpoint data for time " + currentTime)
+ checkpointData.update()
+ dependencies.foreach(_.updateCheckpointData(currentTime))
+ checkpointData.cleanup()
+ logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData)
+ }
+
+ /**
+ * Restore the RDDs in generatedRDDs from the checkpointData. This is an internal method
+ * that should not be called directly. This is a default implementation that recreates RDDs
+ * from the checkpoint file names stored in checkpointData. Subclasses of DStream that
+ * override the updateCheckpointData() method would also need to override this method.
+ */
+ protected[streaming] def restoreCheckpointData() {
+ // Create RDDs from the checkpoint data
+ logInfo("Restoring checkpoint data")
+ checkpointData.restore()
+ dependencies.foreach(_.restoreCheckpointData())
+ logInfo("Restored checkpoint data")
+ }
+
+ @throws(classOf[IOException])
+ private def writeObject(oos: ObjectOutputStream) {
+ logDebug(this.getClass().getSimpleName + ".writeObject used")
+ if (graph != null) {
+ graph.synchronized {
+ if (graph.checkpointInProgress) {
+ oos.defaultWriteObject()
+ } else {
+ val msg = "Object of " + this.getClass.getName + " is being serialized " +
+ " possibly as a part of closure of an RDD operation. This is because " +
+ " the DStream object is being referred to from within the closure. " +
+ " Please rewrite the RDD operation inside this DStream to avoid this. " +
+ " This has been enforced to avoid bloating of Spark tasks " +
+ " with unnecessary objects."
+ throw new java.io.NotSerializableException(msg)
+ }
+ }
+ } else {
+ throw new java.io.NotSerializableException("Graph is unexpectedly null when DStream is being serialized.")
+ }
+ }
+
+ @throws(classOf[IOException])
+ private def readObject(ois: ObjectInputStream) {
+ logDebug(this.getClass().getSimpleName + ".readObject used")
+ ois.defaultReadObject()
+ generatedRDDs = new HashMap[Time, RDD[T]] ()
+ }
+
+ // =======================================================================
+ // DStream operations
+ // =======================================================================
+
+ /** Return a new DStream by applying a function to all elements of this DStream. */
+ def map[U: ClassManifest](mapFunc: T => U): DStream[U] = {
+ new MappedDStream(this, context.sparkContext.clean(mapFunc))
+ }
+
+ /**
+ * Return a new DStream by applying a function to all elements of this DStream,
+ * and then flattening the results
+ */
+ def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]): DStream[U] = {
+ new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
+ }
+
+ /** Return a new DStream containing only the elements that satisfy a predicate. */
+ def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc)
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying glom() to each RDD of
+ * this DStream. Applying glom() to an RDD coalesces all elements within each partition into
+ * an array.
+ */
+ def glom(): DStream[Array[T]] = new GlommedDStream(this)
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs
+ * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
+ * of the RDD.
+ */
+ def mapPartitions[U: ClassManifest](
+ mapPartFunc: Iterator[T] => Iterator[U],
+ preservePartitioning: Boolean = false
+ ): DStream[U] = {
+ new MapPartitionedDStream(this, context.sparkContext.clean(mapPartFunc), preservePartitioning)
+ }
+
+ /**
+ * Return a new DStream in which each RDD has a single element generated by reducing each RDD
+ * of this DStream.
+ */
+ def reduce(reduceFunc: (T, T) => T): DStream[T] =
+ this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
+
+ /**
+ * Return a new DStream in which each RDD has a single element generated by counting each RDD
+ * of this DStream.
+ */
+ def count(): DStream[Long] = {
+ this.map(_ => (null, 1L))
+ .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1)))
+ .reduceByKey(_ + _)
+ .map(_._2)
+ }
+
+ /**
+ * Return a new DStream in which each RDD contains the counts of each distinct value in
+ * each RDD of this DStream. Hash partitioning is used to generate
+ * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
+ * `numPartitions` not specified).
+ */
+ def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)] =
+ this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
+
+ /**
+ * Apply a function to each RDD in this DStream. This is an output operator, so
+ * this DStream will be registered as an output stream and therefore materialized.
+ */
+ def foreach(foreachFunc: RDD[T] => Unit) {
+ this.foreach((r: RDD[T], t: Time) => foreachFunc(r))
+ }
+
+ /**
+ * Apply a function to each RDD in this DStream. This is an output operator, so
+ * this DStream will be registered as an output stream and therefore materialized.
+ */
+ def foreach(foreachFunc: (RDD[T], Time) => Unit) {
+ val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc))
+ ssc.registerOutputStream(newStream)
+ newStream
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this DStream.
+ */
+ def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
+ transform((r: RDD[T], t: Time) => transformFunc(r))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this DStream.
+ */
+ def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
+ new TransformedDStream(this, context.sparkContext.clean(transformFunc))
+ }
+
+ /**
+ * Print the first ten elements of each RDD generated in this DStream. This is an output
+ * operator, so this DStream will be registered as an output stream and there materialized.
+ */
+ def print() {
+ def foreachFunc = (rdd: RDD[T], time: Time) => {
+ val first11 = rdd.take(11)
+ println ("-------------------------------------------")
+ println ("Time: " + time)
+ println ("-------------------------------------------")
+ first11.take(10).foreach(println)
+ if (first11.size > 10) println("...")
+ println()
+ }
+ val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc))
+ ssc.registerOutputStream(newStream)
+ }
+
+ /**
+ * Return a new DStream in which each RDD contains all the elements in seen in a
+ * sliding window of time over this DStream. The new DStream generates RDDs with
+ * the same interval as this DStream.
+ * @param windowDuration width of the window; must be a multiple of this DStream's interval.
+ */
+ def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)
+
+ /**
+ * Return a new DStream in which each RDD contains all the elements in seen in a
+ * sliding window of time over this DStream.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = {
+ new WindowedDStream(this, windowDuration, slideDuration)
+ }
+
+ /**
+ * Return a new DStream in which each RDD has a single element generated by reducing all
+ * elements in a sliding window over this DStream.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def reduceByWindow(
+ reduceFunc: (T, T) => T,
+ windowDuration: Duration,
+ slideDuration: Duration
+ ): DStream[T] = {
+ this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
+ }
+
+ /**
+ * Return a new DStream in which each RDD has a single element generated by reducing all
+ * elements in a sliding window over this DStream. However, the reduction is done incrementally
+ * using the old window's reduced value :
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ * This is more efficient than reduceByWindow without "inverse reduce" function.
+ * However, it is applicable to only "invertible reduce functions".
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def reduceByWindow(
+ reduceFunc: (T, T) => T,
+ invReduceFunc: (T, T) => T,
+ windowDuration: Duration,
+ slideDuration: Duration
+ ): DStream[T] = {
+ this.map(x => (1, x))
+ .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
+ .map(_._2)
+ }
+
+ /**
+ * Return a new DStream in which each RDD has a single element generated by counting the number
+ * of elements in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with
+ * Spark's default number of partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] = {
+ this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
+ }
+
+ /**
+ * Return a new DStream in which each RDD contains the count of distinct elements in
+ * RDDs in a sliding window over this DStream. Hash partitioning is used to generate
+ * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
+ * `numPartitions` not specified).
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions number of partitions of each RDD in the new DStream.
+ */
+ def countByValueAndWindow(
+ windowDuration: Duration,
+ slideDuration: Duration,
+ numPartitions: Int = ssc.sc.defaultParallelism
+ ): DStream[(T, Long)] = {
+
+ this.map(x => (x, 1L)).reduceByKeyAndWindow(
+ (x: Long, y: Long) => x + y,
+ (x: Long, y: Long) => x - y,
+ windowDuration,
+ slideDuration,
+ numPartitions,
+ (x: (T, Long)) => x._2 != 0L
+ )
+ }
+
+ /**
+ * Return a new DStream by unifying data of another DStream with this DStream.
+ * @param that Another DStream having the same slideDuration as this DStream.
+ */
+ def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that))
+
+ /**
+ * Return all the RDDs defined by the Interval object (both end times included)
+ */
+ protected[streaming] def slice(interval: Interval): Seq[RDD[T]] = {
+ slice(interval.beginTime, interval.endTime)
+ }
+
+ /**
+ * Return all the RDDs between 'fromTime' to 'toTime' (both included)
+ */
+ def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
+ if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
+ logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
+ }
+ if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
+ logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
+ }
+ val alignedToTime = toTime.floor(slideDuration)
+ val alignedFromTime = fromTime.floor(slideDuration)
+
+ logInfo("Slicing from " + fromTime + " to " + toTime +
+ " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")
+
+ alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
+ if (time >= zeroTime) getOrCompute(time) else None
+ })
+ }
+
+ /**
+ * Save each RDD in this DStream as a Sequence file of serialized objects.
+ * The file name at each batch interval is generated based on `prefix` and
+ * `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
+ def saveAsObjectFiles(prefix: String, suffix: String = "") {
+ val saveFunc = (rdd: RDD[T], time: Time) => {
+ val file = rddToFileName(prefix, suffix, time)
+ rdd.saveAsObjectFile(file)
+ }
+ this.foreach(saveFunc)
+ }
+
+ /**
+ * Save each RDD in this DStream as at text file, using string representation
+ * of elements. The file name at each batch interval is generated based on
+ * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
+ def saveAsTextFiles(prefix: String, suffix: String = "") {
+ val saveFunc = (rdd: RDD[T], time: Time) => {
+ val file = rddToFileName(prefix, suffix, time)
+ rdd.saveAsTextFile(file)
+ }
+ this.foreach(saveFunc)
+ }
+
+ def register() {
+ ssc.registerOutputStream(this)
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
new file mode 100644
index 0000000000..58a0da2870
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.conf.Configuration
+import collection.mutable.HashMap
+import org.apache.spark.Logging
+
+
+
+private[streaming]
+class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T])
+ extends Serializable with Logging {
+ protected val data = new HashMap[Time, AnyRef]()
+
+ @transient private var fileSystem : FileSystem = null
+ @transient private var lastCheckpointFiles: HashMap[Time, String] = null
+
+ protected[streaming] def checkpointFiles = data.asInstanceOf[HashMap[Time, String]]
+
+ /**
+ * Updates the checkpoint data of the DStream. This gets called every time
+ * the graph checkpoint is initiated. Default implementation records the
+ * checkpoint files to which the generate RDDs of the DStream has been saved.
+ */
+ def update() {
+
+ // Get the checkpointed RDDs from the generated RDDs
+ val newCheckpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined)
+ .map(x => (x._1, x._2.getCheckpointFile.get))
+
+ // Make a copy of the existing checkpoint data (checkpointed RDDs)
+ lastCheckpointFiles = checkpointFiles.clone()
+
+ // If the new checkpoint data has checkpoints then replace existing with the new one
+ if (newCheckpointFiles.size > 0) {
+ checkpointFiles.clear()
+ checkpointFiles ++= newCheckpointFiles
+ }
+
+ // TODO: remove this, this is just for debugging
+ newCheckpointFiles.foreach {
+ case (time, data) => { logInfo("Added checkpointed RDD for time " + time + " to stream checkpoint") }
+ }
+ }
+
+ /**
+ * Cleanup old checkpoint data. This gets called every time the graph
+ * checkpoint is initiated, but after `update` is called. Default
+ * implementation, cleans up old checkpoint files.
+ */
+ def cleanup() {
+ // If there is at least on checkpoint file in the current checkpoint files,
+ // then delete the old checkpoint files.
+ if (checkpointFiles.size > 0 && lastCheckpointFiles != null) {
+ (lastCheckpointFiles -- checkpointFiles.keySet).foreach {
+ case (time, file) => {
+ try {
+ val path = new Path(file)
+ if (fileSystem == null) {
+ fileSystem = path.getFileSystem(new Configuration())
+ }
+ fileSystem.delete(path, true)
+ logInfo("Deleted checkpoint file '" + file + "' for time " + time)
+ } catch {
+ case e: Exception =>
+ logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e)
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Restore the checkpoint data. This gets called once when the DStream graph
+ * (along with its DStreams) are being restored from a graph checkpoint file.
+ * Default implementation restores the RDDs from their checkpoint files.
+ */
+ def restore() {
+ // Create RDDs from the checkpoint data
+ checkpointFiles.foreach {
+ case(time, file) => {
+ logInfo("Restoring checkpointed RDD for time " + time + " from file '" + file + "'")
+ dstream.generatedRDDs += ((time, dstream.context.sparkContext.checkpointFile[T](file)))
+ }
+ }
+ }
+
+ override def toString() = {
+ "[\n" + checkpointFiles.size + " checkpoint files \n" + checkpointFiles.mkString("\n") + "\n]"
+ }
+}
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
new file mode 100644
index 0000000000..b9a58fded6
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import dstream.InputDStream
+import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
+import collection.mutable.ArrayBuffer
+import org.apache.spark.Logging
+
+final private[streaming] class DStreamGraph extends Serializable with Logging {
+ initLogging()
+
+ private val inputStreams = new ArrayBuffer[InputDStream[_]]()
+ private val outputStreams = new ArrayBuffer[DStream[_]]()
+
+ var rememberDuration: Duration = null
+ var checkpointInProgress = false
+
+ var zeroTime: Time = null
+ var startTime: Time = null
+ var batchDuration: Duration = null
+
+ def start(time: Time) {
+ this.synchronized {
+ if (zeroTime != null) {
+ throw new Exception("DStream graph computation already started")
+ }
+ zeroTime = time
+ startTime = time
+ outputStreams.foreach(_.initialize(zeroTime))
+ outputStreams.foreach(_.remember(rememberDuration))
+ outputStreams.foreach(_.validate)
+ inputStreams.par.foreach(_.start())
+ }
+ }
+
+ def restart(time: Time) {
+ this.synchronized { startTime = time }
+ }
+
+ def stop() {
+ this.synchronized {
+ inputStreams.par.foreach(_.stop())
+ }
+ }
+
+ def setContext(ssc: StreamingContext) {
+ this.synchronized {
+ outputStreams.foreach(_.setContext(ssc))
+ }
+ }
+
+ def setBatchDuration(duration: Duration) {
+ this.synchronized {
+ if (batchDuration != null) {
+ throw new Exception("Batch duration already set as " + batchDuration +
+ ". cannot set it again.")
+ }
+ batchDuration = duration
+ }
+ }
+
+ def remember(duration: Duration) {
+ this.synchronized {
+ if (rememberDuration != null) {
+ throw new Exception("Batch duration already set as " + batchDuration +
+ ". cannot set it again.")
+ }
+ rememberDuration = duration
+ }
+ }
+
+ def addInputStream(inputStream: InputDStream[_]) {
+ this.synchronized {
+ inputStream.setGraph(this)
+ inputStreams += inputStream
+ }
+ }
+
+ def addOutputStream(outputStream: DStream[_]) {
+ this.synchronized {
+ outputStream.setGraph(this)
+ outputStreams += outputStream
+ }
+ }
+
+ def getInputStreams() = this.synchronized { inputStreams.toArray }
+
+ def getOutputStreams() = this.synchronized { outputStreams.toArray }
+
+ def generateJobs(time: Time): Seq[Job] = {
+ this.synchronized {
+ logInfo("Generating jobs for time " + time)
+ val jobs = outputStreams.flatMap(outputStream => outputStream.generateJob(time))
+ logInfo("Generated " + jobs.length + " jobs for time " + time)
+ jobs
+ }
+ }
+
+ def clearOldMetadata(time: Time) {
+ this.synchronized {
+ logInfo("Clearing old metadata for time " + time)
+ outputStreams.foreach(_.clearOldMetadata(time))
+ logInfo("Cleared old metadata for time " + time)
+ }
+ }
+
+ def updateCheckpointData(time: Time) {
+ this.synchronized {
+ logInfo("Updating checkpoint data for time " + time)
+ outputStreams.foreach(_.updateCheckpointData(time))
+ logInfo("Updated checkpoint data for time " + time)
+ }
+ }
+
+ def restoreCheckpointData() {
+ this.synchronized {
+ logInfo("Restoring checkpoint data")
+ outputStreams.foreach(_.restoreCheckpointData())
+ logInfo("Restored checkpoint data")
+ }
+ }
+
+ def validate() {
+ this.synchronized {
+ assert(batchDuration != null, "Batch duration has not been set")
+ //assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration + " is very low")
+ assert(getOutputStreams().size > 0, "No output streams registered, so nothing to execute")
+ }
+ }
+
+ @throws(classOf[IOException])
+ private def writeObject(oos: ObjectOutputStream) {
+ this.synchronized {
+ logDebug("DStreamGraph.writeObject used")
+ checkpointInProgress = true
+ oos.defaultWriteObject()
+ checkpointInProgress = false
+ }
+ }
+
+ @throws(classOf[IOException])
+ private def readObject(ois: ObjectInputStream) {
+ this.synchronized {
+ logDebug("DStreamGraph.readObject used")
+ checkpointInProgress = true
+ ois.defaultReadObject()
+ checkpointInProgress = false
+ }
+ }
+}
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Duration.scala b/streaming/src/main/scala/org/apache/spark/streaming/Duration.scala
new file mode 100644
index 0000000000..290ad37812
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Duration.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import org.apache.spark.Utils
+
+case class Duration (private val millis: Long) {
+
+ def < (that: Duration): Boolean = (this.millis < that.millis)
+
+ def <= (that: Duration): Boolean = (this.millis <= that.millis)
+
+ def > (that: Duration): Boolean = (this.millis > that.millis)
+
+ def >= (that: Duration): Boolean = (this.millis >= that.millis)
+
+ def + (that: Duration): Duration = new Duration(millis + that.millis)
+
+ def - (that: Duration): Duration = new Duration(millis - that.millis)
+
+ def * (times: Int): Duration = new Duration(millis * times)
+
+ def / (that: Duration): Double = millis.toDouble / that.millis.toDouble
+
+ def isMultipleOf(that: Duration): Boolean =
+ (this.millis % that.millis == 0)
+
+ def min(that: Duration): Duration = if (this < that) this else that
+
+ def max(that: Duration): Duration = if (this > that) this else that
+
+ def isZero: Boolean = (this.millis == 0)
+
+ override def toString: String = (millis.toString + " ms")
+
+ def toFormattedString: String = millis.toString
+
+ def milliseconds: Long = millis
+
+ def prettyPrint = Utils.msDurationToString(millis)
+
+}
+
+/**
+ * Helper object that creates instance of [[org.apache.spark.streaming.Duration]] representing
+ * a given number of milliseconds.
+ */
+object Milliseconds {
+ def apply(milliseconds: Long) = new Duration(milliseconds)
+}
+
+/**
+ * Helper object that creates instance of [[org.apache.spark.streaming.Duration]] representing
+ * a given number of seconds.
+ */
+object Seconds {
+ def apply(seconds: Long) = new Duration(seconds * 1000)
+}
+
+/**
+ * Helper object that creates instance of [[org.apache.spark.streaming.Duration]] representing
+ * a given number of minutes.
+ */
+object Minutes {
+ def apply(minutes: Long) = new Duration(minutes * 60000)
+}
+
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
new file mode 100644
index 0000000000..04c994c136
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+private[streaming]
+class Interval(val beginTime: Time, val endTime: Time) {
+ def this(beginMs: Long, endMs: Long) = this(new Time(beginMs), new Time(endMs))
+
+ def duration(): Duration = endTime - beginTime
+
+ def + (time: Duration): Interval = {
+ new Interval(beginTime + time, endTime + time)
+ }
+
+ def - (time: Duration): Interval = {
+ new Interval(beginTime - time, endTime - time)
+ }
+
+ def < (that: Interval): Boolean = {
+ if (this.duration != that.duration) {
+ throw new Exception("Comparing two intervals with different durations [" + this + ", " + that + "]")
+ }
+ this.endTime < that.endTime
+ }
+
+ def <= (that: Interval) = (this < that || this == that)
+
+ def > (that: Interval) = !(this <= that)
+
+ def >= (that: Interval) = !(this < that)
+
+ override def toString = "[" + beginTime + ", " + endTime + "]"
+}
+
+private[streaming]
+object Interval {
+ def currentInterval(duration: Duration): Interval = {
+ val time = new Time(System.currentTimeMillis)
+ val intervalBegin = time.floor(duration)
+ new Interval(intervalBegin, intervalBegin + duration)
+ }
+}
+
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/Job.scala
new file mode 100644
index 0000000000..2128b7c7a6
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Job.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import java.util.concurrent.atomic.AtomicLong
+
+private[streaming]
+class Job(val time: Time, func: () => _) {
+ val id = Job.getNewId()
+ def run(): Long = {
+ val startTime = System.currentTimeMillis
+ func()
+ val stopTime = System.currentTimeMillis
+ (stopTime - startTime)
+ }
+
+ override def toString = "streaming job " + id + " @ " + time
+}
+
+private[streaming]
+object Job {
+ val id = new AtomicLong(0)
+
+ def getNewId() = id.getAndIncrement()
+}
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/JobManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/JobManager.scala
new file mode 100644
index 0000000000..5233129506
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/JobManager.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import org.apache.spark.Logging
+import org.apache.spark.SparkEnv
+import java.util.concurrent.Executors
+import collection.mutable.HashMap
+import collection.mutable.ArrayBuffer
+
+
+private[streaming]
+class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
+
+ class JobHandler(ssc: StreamingContext, job: Job) extends Runnable {
+ def run() {
+ SparkEnv.set(ssc.env)
+ try {
+ val timeTaken = job.run()
+ logInfo("Total delay: %.5f s for job %s of time %s (execution: %.5f s)".format(
+ (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, job.time.milliseconds, timeTaken / 1000.0))
+ } catch {
+ case e: Exception =>
+ logError("Running " + job + " failed", e)
+ }
+ clearJob(job)
+ }
+ }
+
+ initLogging()
+
+ val jobExecutor = Executors.newFixedThreadPool(numThreads)
+ val jobs = new HashMap[Time, ArrayBuffer[Job]]
+
+ def runJob(job: Job) {
+ jobs.synchronized {
+ jobs.getOrElseUpdate(job.time, new ArrayBuffer[Job]) += job
+ }
+ jobExecutor.execute(new JobHandler(ssc, job))
+ logInfo("Added " + job + " to queue")
+ }
+
+ def stop() {
+ jobExecutor.shutdown()
+ }
+
+ private def clearJob(job: Job) {
+ var timeCleared = false
+ val time = job.time
+ jobs.synchronized {
+ val jobsOfTime = jobs.get(time)
+ if (jobsOfTime.isDefined) {
+ jobsOfTime.get -= job
+ if (jobsOfTime.get.isEmpty) {
+ jobs -= time
+ timeCleared = true
+ }
+ } else {
+ throw new Exception("Job finished for time " + job.time +
+ " but time does not exist in jobs")
+ }
+ }
+ if (timeCleared) {
+ ssc.scheduler.clearOldMetadata(time)
+ }
+ }
+
+ def getPendingTimes(): Array[Time] = {
+ jobs.synchronized {
+ jobs.keySet.toArray
+ }
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
new file mode 100644
index 0000000000..aae79a4e6f
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import org.apache.spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}
+import org.apache.spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError}
+import org.apache.spark.Logging
+import org.apache.spark.SparkEnv
+import org.apache.spark.SparkContext._
+
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.Queue
+
+import akka.actor._
+import akka.pattern.ask
+import akka.util.duration._
+import akka.dispatch._
+
+private[streaming] sealed trait NetworkInputTrackerMessage
+private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
+private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[String], metadata: Any) extends NetworkInputTrackerMessage
+private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage
+
+/**
+ * This class manages the execution of the receivers of NetworkInputDStreams.
+ */
+private[streaming]
+class NetworkInputTracker(
+ @transient ssc: StreamingContext,
+ @transient networkInputStreams: Array[NetworkInputDStream[_]])
+ extends Logging {
+
+ val networkInputStreamMap = Map(networkInputStreams.map(x => (x.id, x)): _*)
+ val receiverExecutor = new ReceiverExecutor()
+ val receiverInfo = new HashMap[Int, ActorRef]
+ val receivedBlockIds = new HashMap[Int, Queue[String]]
+ val timeout = 5000.milliseconds
+
+ var currentTime: Time = null
+
+ /** Start the actor and receiver execution thread. */
+ def start() {
+ ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), "NetworkInputTracker")
+ receiverExecutor.start()
+ }
+
+ /** Stop the receiver execution thread. */
+ def stop() {
+ // TODO: stop the actor as well
+ receiverExecutor.interrupt()
+ receiverExecutor.stopReceivers()
+ }
+
+ /** Return all the blocks received from a receiver. */
+ def getBlockIds(receiverId: Int, time: Time): Array[String] = synchronized {
+ val queue = receivedBlockIds.synchronized {
+ receivedBlockIds.getOrElse(receiverId, new Queue[String]())
+ }
+ val result = queue.synchronized {
+ queue.dequeueAll(x => true)
+ }
+ logInfo("Stream " + receiverId + " received " + result.size + " blocks")
+ result.toArray
+ }
+
+ /** Actor to receive messages from the receivers. */
+ private class NetworkInputTrackerActor extends Actor {
+ def receive = {
+ case RegisterReceiver(streamId, receiverActor) => {
+ if (!networkInputStreamMap.contains(streamId)) {
+ throw new Exception("Register received for unexpected id " + streamId)
+ }
+ receiverInfo += ((streamId, receiverActor))
+ logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address)
+ sender ! true
+ }
+ case AddBlocks(streamId, blockIds, metadata) => {
+ val tmp = receivedBlockIds.synchronized {
+ if (!receivedBlockIds.contains(streamId)) {
+ receivedBlockIds += ((streamId, new Queue[String]))
+ }
+ receivedBlockIds(streamId)
+ }
+ tmp.synchronized {
+ tmp ++= blockIds
+ }
+ networkInputStreamMap(streamId).addMetadata(metadata)
+ }
+ case DeregisterReceiver(streamId, msg) => {
+ receiverInfo -= streamId
+ logError("De-registered receiver for network stream " + streamId
+ + " with message " + msg)
+ //TODO: Do something about the corresponding NetworkInputDStream
+ }
+ }
+ }
+
+ /** This thread class runs all the receivers on the cluster. */
+ class ReceiverExecutor extends Thread {
+ val env = ssc.env
+
+ override def run() {
+ try {
+ SparkEnv.set(env)
+ startReceivers()
+ } catch {
+ case ie: InterruptedException => logInfo("ReceiverExecutor interrupted")
+ } finally {
+ stopReceivers()
+ }
+ }
+
+ /**
+ * Get the receivers from the NetworkInputDStreams, distributes them to the
+ * worker nodes as a parallel collection, and runs them.
+ */
+ def startReceivers() {
+ val receivers = networkInputStreams.map(nis => {
+ val rcvr = nis.getReceiver()
+ rcvr.setStreamId(nis.id)
+ rcvr
+ })
+
+ // Right now, we only honor preferences if all receivers have them
+ val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined).reduce(_ && _)
+
+ // Create the parallel collection of receivers to distributed them on the worker nodes
+ val tempRDD =
+ if (hasLocationPreferences) {
+ val receiversWithPreferences = receivers.map(r => (r, Seq(r.getLocationPreference().toString)))
+ ssc.sc.makeRDD[NetworkReceiver[_]](receiversWithPreferences)
+ }
+ else {
+ ssc.sc.makeRDD(receivers, receivers.size)
+ }
+
+ // Function to start the receiver on the worker node
+ val startReceiver = (iterator: Iterator[NetworkReceiver[_]]) => {
+ if (!iterator.hasNext) {
+ throw new Exception("Could not start receiver as details not found.")
+ }
+ iterator.next().start()
+ }
+ // Run the dummy Spark job to ensure that all slaves have registered.
+ // This avoids all the receivers to be scheduled on the same node.
+ ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
+
+ // Distribute the receivers and start them
+ ssc.sparkContext.runJob(tempRDD, startReceiver)
+ }
+
+ /** Stops the receivers. */
+ def stopReceivers() {
+ // Signal the receivers to stop
+ receiverInfo.values.foreach(_ ! StopReceiver)
+ }
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
new file mode 100644
index 0000000000..d8a7381e87
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.dstream.{ReducedWindowedDStream, StateDStream}
+import org.apache.spark.streaming.dstream.{CoGroupedDStream, ShuffledDStream}
+import org.apache.spark.streaming.dstream.{MapValuedDStream, FlatMapValuedDStream}
+
+import org.apache.spark.{Manifests, RDD, Partitioner, HashPartitioner}
+import org.apache.spark.SparkContext._
+import org.apache.spark.storage.StorageLevel
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
+import org.apache.hadoop.mapred.OutputFormat
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.conf.Configuration
+
+class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](self: DStream[(K,V)])
+extends Serializable {
+
+ private[streaming] def ssc = self.ssc
+
+ private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
+ new HashPartitioner(numPartitions)
+ }
+
+ /**
+ * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
+ * generate the RDDs with Spark's default number of partitions.
+ */
+ def groupByKey(): DStream[(K, Seq[V])] = {
+ groupByKey(defaultPartitioner())
+ }
+
+ /**
+ * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
+ * generate the RDDs with `numPartitions` partitions.
+ */
+ def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = {
+ groupByKey(defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Return a new DStream by applying `groupByKey` on each RDD. The supplied [[org.apache.spark.Partitioner]]
+ * is used to control the partitioning of each RDD.
+ */
+ def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
+ val createCombiner = (v: V) => ArrayBuffer[V](v)
+ val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v)
+ val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2)
+ combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner)
+ .asInstanceOf[DStream[(K, Seq[V])]]
+ }
+
+ /**
+ * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * merged using the associative reduce function. Hash partitioning is used to generate the RDDs
+ * with Spark's default number of partitions.
+ */
+ def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = {
+ reduceByKey(reduceFunc, defaultPartitioner())
+ }
+
+ /**
+ * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs
+ * with `numPartitions` partitions.
+ */
+ def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = {
+ reduceByKey(reduceFunc, defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the
+ * partitioning of each RDD.
+ */
+ def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = {
+ val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
+ combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner)
+ }
+
+ /**
+ * Combine elements of each key in DStream's RDDs using custom functions. This is similar to the
+ * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.PairRDDFunctions]] for more
+ * information.
+ */
+ def combineByKey[C: ClassManifest](
+ createCombiner: V => C,
+ mergeValue: (C, V) => C,
+ mergeCombiner: (C, C) => C,
+ partitioner: Partitioner) : DStream[(K, C)] = {
+ new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner)
+ }
+
+ /**
+ * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to
+ * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
+ * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with
+ * Spark's default number of partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ */
+ def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Seq[V])] = {
+ groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner())
+ }
+
+ /**
+ * Return a new DStream by applying `groupByKey` over a sliding window. Similar to
+ * `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to
+ * generate the RDDs with Spark's default number of partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = {
+ groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner())
+ }
+
+ /**
+ * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions number of partitions of each RDD in the new DStream; if not specified
+ * then Spark's default number of partitions will be used
+ */
+ def groupByKeyAndWindow(
+ windowDuration: Duration,
+ slideDuration: Duration,
+ numPartitions: Int
+ ): DStream[(K, Seq[V])] = {
+ groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream.
+ */
+ def groupByKeyAndWindow(
+ windowDuration: Duration,
+ slideDuration: Duration,
+ partitioner: Partitioner
+ ): DStream[(K, Seq[V])] = {
+ self.window(windowDuration, slideDuration).groupByKey(partitioner)
+ }
+
+ /**
+ * Return a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
+ * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream
+ * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate
+ * the RDDs with Spark's default number of partitions.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ */
+ def reduceByKeyAndWindow(
+ reduceFunc: (V, V) => V,
+ windowDuration: Duration
+ ): DStream[(K, V)] = {
+ reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner())
+ }
+
+ /**
+ * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
+ * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
+ * generate the RDDs with Spark's default number of partitions.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def reduceByKeyAndWindow(
+ reduceFunc: (V, V) => V,
+ windowDuration: Duration,
+ slideDuration: Duration
+ ): DStream[(K, V)] = {
+ reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
+ }
+
+ /**
+ * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
+ * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
+ * generate the RDDs with `numPartitions` partitions.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions number of partitions of each RDD in the new DStream.
+ */
+ def reduceByKeyAndWindow(
+ reduceFunc: (V, V) => V,
+ windowDuration: Duration,
+ slideDuration: Duration,
+ numPartitions: Int
+ ): DStream[(K, V)] = {
+ reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to
+ * `DStream.reduceByKey()`, but applies it over a sliding window.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param partitioner partitioner for controlling the partitioning of each RDD
+ * in the new DStream.
+ */
+ def reduceByKeyAndWindow(
+ reduceFunc: (V, V) => V,
+ windowDuration: Duration,
+ slideDuration: Duration,
+ partitioner: Partitioner
+ ): DStream[(K, V)] = {
+ val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
+ self.reduceByKey(cleanedReduceFunc, partitioner)
+ .window(windowDuration, slideDuration)
+ .reduceByKey(cleanedReduceFunc, partitioner)
+ }
+
+ /**
+ * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
+ * The reduced value of over a new window is calculated using the old window's reduced value :
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ *
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ *
+ * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
+ * However, it is applicable to only "invertible reduce functions".
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param filterFunc Optional function to filter expired key-value pairs;
+ * only pairs that satisfy the function are retained
+ */
+ def reduceByKeyAndWindow(
+ reduceFunc: (V, V) => V,
+ invReduceFunc: (V, V) => V,
+ windowDuration: Duration,
+ slideDuration: Duration = self.slideDuration,
+ numPartitions: Int = ssc.sc.defaultParallelism,
+ filterFunc: ((K, V)) => Boolean = null
+ ): DStream[(K, V)] = {
+
+ reduceByKeyAndWindow(
+ reduceFunc, invReduceFunc, windowDuration,
+ slideDuration, defaultPartitioner(numPartitions), filterFunc
+ )
+ }
+
+ /**
+ * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
+ * The reduced value of over a new window is calculated using the old window's reduced value :
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
+ * However, it is applicable to only "invertible reduce functions".
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param filterFunc Optional function to filter expired key-value pairs;
+ * only pairs that satisfy the function are retained
+ */
+ def reduceByKeyAndWindow(
+ reduceFunc: (V, V) => V,
+ invReduceFunc: (V, V) => V,
+ windowDuration: Duration,
+ slideDuration: Duration,
+ partitioner: Partitioner,
+ filterFunc: ((K, V)) => Boolean
+ ): DStream[(K, V)] = {
+
+ val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
+ val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
+ val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None
+ new ReducedWindowedDStream[K, V](
+ self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
+ windowDuration, slideDuration, partitioner
+ )
+ }
+
+ /**
+ * Return a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of each key.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * @param updateFunc State update function. If `this` function returns None, then
+ * corresponding state key-value pair will be eliminated.
+ * @tparam S State type
+ */
+ def updateStateByKey[S: ClassManifest](
+ updateFunc: (Seq[V], Option[S]) => Option[S]
+ ): DStream[(K, S)] = {
+ updateStateByKey(updateFunc, defaultPartitioner())
+ }
+
+ /**
+ * Return a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of each key.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * @param updateFunc State update function. If `this` function returns None, then
+ * corresponding state key-value pair will be eliminated.
+ * @param numPartitions Number of partitions of each RDD in the new DStream.
+ * @tparam S State type
+ */
+ def updateStateByKey[S: ClassManifest](
+ updateFunc: (Seq[V], Option[S]) => Option[S],
+ numPartitions: Int
+ ): DStream[(K, S)] = {
+ updateStateByKey(updateFunc, defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Create a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of the key.
+ * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ * @param updateFunc State update function. If `this` function returns None, then
+ * corresponding state key-value pair will be eliminated.
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @tparam S State type
+ */
+ def updateStateByKey[S: ClassManifest](
+ updateFunc: (Seq[V], Option[S]) => Option[S],
+ partitioner: Partitioner
+ ): DStream[(K, S)] = {
+ val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
+ iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
+ }
+ updateStateByKey(newUpdateFunc, partitioner, true)
+ }
+
+ /**
+ * Return a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of each key.
+ * [[org.apache.spark.Paxrtitioner]] is used to control the partitioning of each RDD.
+ * @param updateFunc State update function. If `this` function returns None, then
+ * corresponding state key-value pair will be eliminated. Note, that
+ * this function may generate a different a tuple with a different key
+ * than the input key. It is up to the developer to decide whether to
+ * remember the partitioner despite the key being changed.
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs.
+ * @tparam S State type
+ */
+ def updateStateByKey[S: ClassManifest](
+ updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
+ partitioner: Partitioner,
+ rememberPartitioner: Boolean
+ ): DStream[(K, S)] = {
+ new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner)
+ }
+
+
+ def mapValues[U: ClassManifest](mapValuesFunc: V => U): DStream[(K, U)] = {
+ new MapValuedDStream[K, V, U](self, mapValuesFunc)
+ }
+
+ def flatMapValues[U: ClassManifest](
+ flatMapValuesFunc: V => TraversableOnce[U]
+ ): DStream[(K, U)] = {
+ new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
+ }
+
+ /**
+ * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
+ * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
+ * key in both RDDs. HashPartitioner is used to partition each generated RDD into default number
+ * of partitions.
+ */
+ def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
+ cogroup(other, defaultPartitioner())
+ }
+
+ /**
+ * Cogroup `this` DStream with `other` DStream using a partitioner. For each key k in corresponding RDDs of `this`
+ * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
+ * key in both RDDs. Partitioner is used to partition each generated RDD.
+ */
+ def cogroup[W: ClassManifest](
+ other: DStream[(K, W)],
+ partitioner: Partitioner
+ ): DStream[(K, (Seq[V], Seq[W]))] = {
+
+ val cgd = new CoGroupedDStream[K](
+ Seq(self.asInstanceOf[DStream[(K, _)]], other.asInstanceOf[DStream[(K, _)]]),
+ partitioner
+ )
+ val pdfs = new PairDStreamFunctions[K, Seq[Seq[_]]](cgd)(
+ classManifest[K],
+ Manifests.seqSeqManifest
+ )
+ pdfs.mapValues {
+ case Seq(vs, ws) =>
+ (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
+ }
+ }
+
+ /**
+ * Join `this` DStream with `other` DStream. HashPartitioner is used
+ * to partition each generated RDD into default number of partitions.
+ */
+ def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
+ join[W](other, defaultPartitioner())
+ }
+
+ /**
+ * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will
+ * be generated by joining RDDs from `this` and other DStream. Uses the given
+ * Partitioner to partition each generated RDD.
+ */
+ def join[W: ClassManifest](
+ other: DStream[(K, W)],
+ partitioner: Partitioner
+ ): DStream[(K, (V, W))] = {
+ this.cogroup(other, partitioner)
+ .flatMapValues{
+ case (vs, ws) =>
+ for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
+ }
+ }
+
+ /**
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
+ * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
+ */
+ def saveAsHadoopFiles[F <: OutputFormat[K, V]](
+ prefix: String,
+ suffix: String
+ )(implicit fm: ClassManifest[F]) {
+ saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
+ }
+
+ /**
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
+ * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
+ */
+ def saveAsHadoopFiles(
+ prefix: String,
+ suffix: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[_ <: OutputFormat[_, _]],
+ conf: JobConf = new JobConf
+ ) {
+ val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
+ val file = rddToFileName(prefix, suffix, time)
+ rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+ }
+ self.foreach(saveFunc)
+ }
+
+ /**
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
+ def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
+ prefix: String,
+ suffix: String
+ )(implicit fm: ClassManifest[F]) {
+ saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
+ }
+
+ /**
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
+ def saveAsNewAPIHadoopFiles(
+ prefix: String,
+ suffix: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
+ conf: Configuration = new Configuration
+ ) {
+ val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
+ val file = rddToFileName(prefix, suffix, time)
+ rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+ }
+ self.foreach(saveFunc)
+ }
+
+ private def getKeyClass() = implicitly[ClassManifest[K]].erasure
+
+ private def getValueClass() = implicitly[ClassManifest[V]].erasure
+}
+
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
new file mode 100644
index 0000000000..ed892e33e6
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import util.{ManualClock, RecurringTimer, Clock}
+import org.apache.spark.SparkEnv
+import org.apache.spark.Logging
+
+private[streaming]
+class Scheduler(ssc: StreamingContext) extends Logging {
+
+ initLogging()
+
+ val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
+ val jobManager = new JobManager(ssc, concurrentJobs)
+ val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
+ new CheckpointWriter(ssc.checkpointDir)
+ } else {
+ null
+ }
+
+ val clockClass = System.getProperty(
+ "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
+ val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
+ val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
+ longTime => generateJobs(new Time(longTime)))
+ val graph = ssc.graph
+ var latestTime: Time = null
+
+ def start() = synchronized {
+ if (ssc.isCheckpointPresent) {
+ restart()
+ } else {
+ startFirstTime()
+ }
+ logInfo("Scheduler started")
+ }
+
+ def stop() = synchronized {
+ timer.stop()
+ jobManager.stop()
+ if (checkpointWriter != null) checkpointWriter.stop()
+ ssc.graph.stop()
+ logInfo("Scheduler stopped")
+ }
+
+ private def startFirstTime() {
+ val startTime = new Time(timer.getStartTime())
+ graph.start(startTime - graph.batchDuration)
+ timer.start(startTime.milliseconds)
+ logInfo("Scheduler's timer started at " + startTime)
+ }
+
+ private def restart() {
+
+ // If manual clock is being used for testing, then
+ // either set the manual clock to the last checkpointed time,
+ // or if the property is defined set it to that time
+ if (clock.isInstanceOf[ManualClock]) {
+ val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
+ val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong
+ clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
+ }
+
+ val batchDuration = ssc.graph.batchDuration
+
+ // Batches when the master was down, that is,
+ // between the checkpoint and current restart time
+ val checkpointTime = ssc.initialCheckpoint.checkpointTime
+ val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
+ val downTimes = checkpointTime.until(restartTime, batchDuration)
+ logInfo("Batches during down time: " + downTimes.mkString(", "))
+
+ // Batches that were unprocessed before failure
+ val pendingTimes = ssc.initialCheckpoint.pendingTimes
+ logInfo("Batches pending processing: " + pendingTimes.mkString(", "))
+ // Reschedule jobs for these times
+ val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
+ logInfo("Batches to reschedule: " + timesToReschedule.mkString(", "))
+ timesToReschedule.foreach(time =>
+ graph.generateJobs(time).foreach(jobManager.runJob)
+ )
+
+ // Restart the timer
+ timer.start(restartTime.milliseconds)
+ logInfo("Scheduler's timer restarted at " + restartTime)
+ }
+
+ /** Generate jobs and perform checkpoint for the given `time`. */
+ def generateJobs(time: Time) {
+ SparkEnv.set(ssc.env)
+ logInfo("\n-----------------------------------------------------\n")
+ graph.generateJobs(time).foreach(jobManager.runJob)
+ latestTime = time
+ doCheckpoint(time)
+ }
+
+ /**
+ * Clear old metadata assuming jobs of `time` have finished processing.
+ * And also perform checkpoint.
+ */
+ def clearOldMetadata(time: Time) {
+ ssc.graph.clearOldMetadata(time)
+ doCheckpoint(time)
+ }
+
+ /** Perform checkpoint for the give `time`. */
+ def doCheckpoint(time: Time) = synchronized {
+ if (ssc.checkpointDuration != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
+ logInfo("Checkpointing graph for time " + time)
+ ssc.graph.updateCheckpointData(time)
+ checkpointWriter.write(new Checkpoint(ssc, time))
+ }
+ }
+}
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
new file mode 100644
index 0000000000..3852ac2dab
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import akka.actor.Props
+import akka.actor.SupervisorStrategy
+import akka.zeromq.Subscribe
+
+import org.apache.spark.streaming.dstream._
+
+import org.apache.spark._
+import org.apache.spark.streaming.receivers.ActorReceiver
+import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy
+import org.apache.spark.streaming.receivers.ZeroMQReceiver
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.MetadataCleaner
+import org.apache.spark.streaming.receivers.ActorReceiver
+
+import scala.collection.mutable.Queue
+import scala.collection.Map
+
+import java.io.InputStream
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.UUID
+
+import org.apache.hadoop.io.LongWritable
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
+import org.apache.hadoop.fs.Path
+import twitter4j.Status
+import twitter4j.auth.Authorization
+
+
+/**
+ * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
+ * information (such as, cluster URL and job name) to internally create a SparkContext, it provides
+ * methods used to create DStream from various input sources.
+ */
+class StreamingContext private (
+ sc_ : SparkContext,
+ cp_ : Checkpoint,
+ batchDur_ : Duration
+ ) extends Logging {
+
+ /**
+ * Create a StreamingContext using an existing SparkContext.
+ * @param sparkContext Existing SparkContext
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ */
+ def this(sparkContext: SparkContext, batchDuration: Duration) = {
+ this(sparkContext, null, batchDuration)
+ }
+
+ /**
+ * Create a StreamingContext by providing the details necessary for creating a new SparkContext.
+ * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+ * @param appName A name for your job, to display on the cluster web UI
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ */
+ def this(
+ master: String,
+ appName: String,
+ batchDuration: Duration,
+ sparkHome: String = null,
+ jars: Seq[String] = Nil,
+ environment: Map[String, String] = Map()) = {
+ this(StreamingContext.createNewSparkContext(master, appName, sparkHome, jars, environment),
+ null, batchDuration)
+ }
+
+
+ /**
+ * Re-create a StreamingContext from a checkpoint file.
+ * @param path Path either to the directory that was specified as the checkpoint directory, or
+ * to the checkpoint file 'graph' or 'graph.bk'.
+ */
+ def this(path: String) = this(null, CheckpointReader.read(path), null)
+
+ initLogging()
+
+ if (sc_ == null && cp_ == null) {
+ throw new Exception("Spark Streaming cannot be initialized with " +
+ "both SparkContext and checkpoint as null")
+ }
+
+ if (MetadataCleaner.getDelaySeconds < 0) {
+ throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; "
+ + "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)")
+ }
+
+ protected[streaming] val isCheckpointPresent = (cp_ != null)
+
+ protected[streaming] val sc: SparkContext = {
+ if (isCheckpointPresent) {
+ new SparkContext(cp_.master, cp_.framework, cp_.sparkHome, cp_.jars, cp_.environment)
+ } else {
+ sc_
+ }
+ }
+
+ protected[streaming] val env = SparkEnv.get
+
+ protected[streaming] val graph: DStreamGraph = {
+ if (isCheckpointPresent) {
+ cp_.graph.setContext(this)
+ cp_.graph.restoreCheckpointData()
+ cp_.graph
+ } else {
+ assert(batchDur_ != null, "Batch duration for streaming context cannot be null")
+ val newGraph = new DStreamGraph()
+ newGraph.setBatchDuration(batchDur_)
+ newGraph
+ }
+ }
+
+ protected[streaming] val nextNetworkInputStreamId = new AtomicInteger(0)
+ protected[streaming] var networkInputTracker: NetworkInputTracker = null
+
+ protected[streaming] var checkpointDir: String = {
+ if (isCheckpointPresent) {
+ sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(cp_.checkpointDir), true)
+ cp_.checkpointDir
+ } else {
+ null
+ }
+ }
+
+ protected[streaming] var checkpointDuration: Duration = if (isCheckpointPresent) cp_.checkpointDuration else null
+ protected[streaming] var receiverJobThread: Thread = null
+ protected[streaming] var scheduler: Scheduler = null
+
+ /**
+ * Return the associated Spark context
+ */
+ def sparkContext = sc
+
+ /**
+ * Set each DStreams in this context to remember RDDs it generated in the last given duration.
+ * DStreams remember RDDs only for a limited duration of time and releases them for garbage
+ * collection. This method allows the developer to specify how to long to remember the RDDs (
+ * if the developer wishes to query old data outside the DStream computation).
+ * @param duration Minimum duration that each DStream should remember its RDDs
+ */
+ def remember(duration: Duration) {
+ graph.remember(duration)
+ }
+
+ /**
+ * Set the context to periodically checkpoint the DStream operations for master
+ * fault-tolerance. The graph will be checkpointed every batch interval.
+ * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
+ */
+ def checkpoint(directory: String) {
+ if (directory != null) {
+ sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory))
+ checkpointDir = directory
+ } else {
+ checkpointDir = null
+ }
+ }
+
+ protected[streaming] def initialCheckpoint: Checkpoint = {
+ if (isCheckpointPresent) cp_ else null
+ }
+
+ protected[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
+
+ /**
+ * Create an input stream with any arbitrary user implemented network receiver.
+ * Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html
+ * @param receiver Custom implementation of NetworkReceiver
+ */
+ def networkStream[T: ClassManifest](
+ receiver: NetworkReceiver[T]): DStream[T] = {
+ val inputStream = new PluggableInputDStream[T](this,
+ receiver)
+ graph.addInputStream(inputStream)
+ inputStream
+ }
+
+ /**
+ * Create an input stream with any arbitrary user implemented actor receiver.
+ * Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html
+ * @param props Props object defining creation of the actor
+ * @param name Name of the actor
+ * @param storageLevel RDD storage level. Defaults to memory-only.
+ *
+ * @note An important point to note:
+ * Since Actor may exist outside the spark framework, It is thus user's responsibility
+ * to ensure the type safety, i.e parametrized type of data received and actorStream
+ * should be same.
+ */
+ def actorStream[T: ClassManifest](
+ props: Props,
+ name: String,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
+ supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
+ ): DStream[T] = {
+ networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe topic to subscribe to
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
+ * and each frame has sequence of byte thus it needs the converter
+ * (which might be deserializer of bytes) to translate from sequence
+ * of sequence of bytes, where sequence refer to a frame
+ * and sub sequence refer to its payload.
+ * @param storageLevel RDD storage level. Defaults to memory-only.
+ */
+ def zeroMQStream[T: ClassManifest](
+ publisherUrl:String,
+ subscribe: Subscribe,
+ bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
+ storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
+ supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
+ ): DStream[T] = {
+ actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)),
+ "ZeroMQReceiver", storageLevel, supervisorStrategy)
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kafka Broker.
+ * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
+ * @param groupId The group id for this consumer.
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel Storage level to use for storing the received objects
+ * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+ */
+ def kafkaStream(
+ zkQuorum: String,
+ groupId: String,
+ topics: Map[String, Int],
+ storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
+ ): DStream[String] = {
+ val kafkaParams = Map[String, String](
+ "zk.connect" -> zkQuorum, "groupid" -> groupId, "zk.connectiontimeout.ms" -> "10000")
+ kafkaStream[String, kafka.serializer.StringDecoder](kafkaParams, topics, storageLevel)
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kafka Broker.
+ * @param kafkaParams Map of kafka configuration paramaters.
+ * See: http://kafka.apache.org/configuration.html
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def kafkaStream[T: ClassManifest, D <: kafka.serializer.Decoder[_]: Manifest](
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ storageLevel: StorageLevel
+ ): DStream[T] = {
+ val inputStream = new KafkaInputDStream[T, D](this, kafkaParams, topics, storageLevel)
+ registerInputStream(inputStream)
+ inputStream
+ }
+
+ /**
+ * Create a input stream from TCP source hostname:port. Data is received using
+ * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
+ * lines.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @param storageLevel Storage level to use for storing the received objects
+ * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+ */
+ def socketTextStream(
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[String] = {
+ socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
+ }
+
+ /**
+ * Create a input stream from TCP source hostname:port. Data is received using
+ * a TCP socket and the receive bytes it interepreted as object using the given
+ * converter.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @param converter Function to convert the byte stream to objects
+ * @param storageLevel Storage level to use for storing the received objects
+ * @tparam T Type of the objects received (after converting bytes to objects)
+ */
+ def socketStream[T: ClassManifest](
+ hostname: String,
+ port: Int,
+ converter: (InputStream) => Iterator[T],
+ storageLevel: StorageLevel
+ ): DStream[T] = {
+ val inputStream = new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
+ registerInputStream(inputStream)
+ inputStream
+ }
+
+ /**
+ * Create a input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def flumeStream (
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[SparkFlumeEvent] = {
+ val inputStream = new FlumeInputDStream(this, hostname, port, storageLevel)
+ registerInputStream(inputStream)
+ inputStream
+ }
+
+ /**
+ * Create a input stream from network source hostname:port, where data is received
+ * as serialized blocks (serialized using the Spark's serializer) that can be directly
+ * pushed into the block manager without deserializing them. This is the most efficient
+ * way to receive data.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @param storageLevel Storage level to use for storing the received objects
+ * @tparam T Type of the objects in the received blocks
+ */
+ def rawSocketStream[T: ClassManifest](
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[T] = {
+ val inputStream = new RawInputDStream[T](this, hostname, port, storageLevel)
+ registerInputStream(inputStream)
+ inputStream
+ }
+
+ /**
+ * Create a input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them using the given key-value types and input format.
+ * File names starting with . are ignored.
+ * @param directory HDFS directory to monitor for new file
+ * @tparam K Key type for reading HDFS file
+ * @tparam V Value type for reading HDFS file
+ * @tparam F Input format for reading HDFS file
+ */
+ def fileStream[
+ K: ClassManifest,
+ V: ClassManifest,
+ F <: NewInputFormat[K, V]: ClassManifest
+ ] (directory: String): DStream[(K, V)] = {
+ val inputStream = new FileInputDStream[K, V, F](this, directory)
+ registerInputStream(inputStream)
+ inputStream
+ }
+
+ /**
+ * Create a input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them using the given key-value types and input format.
+ * @param directory HDFS directory to monitor for new file
+ * @param filter Function to filter paths to process
+ * @param newFilesOnly Should process only new files and ignore existing files in the directory
+ * @tparam K Key type for reading HDFS file
+ * @tparam V Value type for reading HDFS file
+ * @tparam F Input format for reading HDFS file
+ */
+ def fileStream[
+ K: ClassManifest,
+ V: ClassManifest,
+ F <: NewInputFormat[K, V]: ClassManifest
+ ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): DStream[(K, V)] = {
+ val inputStream = new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
+ registerInputStream(inputStream)
+ inputStream
+ }
+
+ /**
+ * Create a input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them as text files (using key as LongWritable, value
+ * as Text and input format as TextInputFormat). File names starting with . are ignored.
+ * @param directory HDFS directory to monitor for new file
+ */
+ def textFileStream(directory: String): DStream[String] = {
+ fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
+ }
+
+ /**
+ * Create a input stream that returns tweets received from Twitter.
+ * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
+ * authorization; this uses the system properties twitter4j.oauth.consumerKey,
+ * .consumerSecret, .accessToken and .accessTokenSecret.
+ * @param filters Set of filter strings to get only those tweets that match them
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def twitterStream(
+ twitterAuth: Option[Authorization] = None,
+ filters: Seq[String] = Nil,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[Status] = {
+ val inputStream = new TwitterInputDStream(this, twitterAuth, filters, storageLevel)
+ registerInputStream(inputStream)
+ inputStream
+ }
+
+ /**
+ * Create an input stream from a queue of RDDs. In each batch,
+ * it will process either one or all of the RDDs returned by the queue.
+ * @param queue Queue of RDDs
+ * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
+ * @tparam T Type of objects in the RDD
+ */
+ def queueStream[T: ClassManifest](
+ queue: Queue[RDD[T]],
+ oneAtATime: Boolean = true
+ ): DStream[T] = {
+ queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1))
+ }
+
+ /**
+ * Create an input stream from a queue of RDDs. In each batch,
+ * it will process either one or all of the RDDs returned by the queue.
+ * @param queue Queue of RDDs
+ * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
+ * @param defaultRDD Default RDD is returned by the DStream when the queue is empty.
+ * Set as null if no RDD should be returned when empty
+ * @tparam T Type of objects in the RDD
+ */
+ def queueStream[T: ClassManifest](
+ queue: Queue[RDD[T]],
+ oneAtATime: Boolean,
+ defaultRDD: RDD[T]
+ ): DStream[T] = {
+ val inputStream = new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
+ registerInputStream(inputStream)
+ inputStream
+ }
+
+ /**
+ * Create a unified DStream from multiple DStreams of the same type and same interval
+ */
+ def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = {
+ new UnionDStream[T](streams.toArray)
+ }
+
+ /**
+ * Register an input stream that will be started (InputDStream.start() called) to get the
+ * input data.
+ */
+ def registerInputStream(inputStream: InputDStream[_]) {
+ graph.addInputStream(inputStream)
+ }
+
+ /**
+ * Register an output stream that will be computed every interval
+ */
+ def registerOutputStream(outputStream: DStream[_]) {
+ graph.addOutputStream(outputStream)
+ }
+
+ protected def validate() {
+ assert(graph != null, "Graph is null")
+ graph.validate()
+
+ assert(
+ checkpointDir == null || checkpointDuration != null,
+ "Checkpoint directory has been set, but the graph checkpointing interval has " +
+ "not been set. Please use StreamingContext.checkpoint() to set the interval."
+ )
+ }
+
+ /**
+ * Start the execution of the streams.
+ */
+ def start() {
+ if (checkpointDir != null && checkpointDuration == null && graph != null) {
+ checkpointDuration = graph.batchDuration
+ }
+
+ validate()
+
+ val networkInputStreams = graph.getInputStreams().filter(s => s match {
+ case n: NetworkInputDStream[_] => true
+ case _ => false
+ }).map(_.asInstanceOf[NetworkInputDStream[_]]).toArray
+
+ if (networkInputStreams.length > 0) {
+ // Start the network input tracker (must start before receivers)
+ networkInputTracker = new NetworkInputTracker(this, networkInputStreams)
+ networkInputTracker.start()
+ }
+
+ Thread.sleep(1000)
+
+ // Start the scheduler
+ scheduler = new Scheduler(this)
+ scheduler.start()
+ }
+
+ /**
+ * Stop the execution of the streams.
+ */
+ def stop() {
+ try {
+ if (scheduler != null) scheduler.stop()
+ if (networkInputTracker != null) networkInputTracker.stop()
+ if (receiverJobThread != null) receiverJobThread.interrupt()
+ sc.stop()
+ logInfo("StreamingContext stopped successfully")
+ } catch {
+ case e: Exception => logWarning("Error while stopping", e)
+ }
+ }
+}
+
+
+object StreamingContext {
+
+ implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = {
+ new PairDStreamFunctions[K, V](stream)
+ }
+
+ protected[streaming] def createNewSparkContext(
+ master: String,
+ appName: String,
+ sparkHome: String,
+ jars: Seq[String],
+ environment: Map[String, String]): SparkContext = {
+ // Set the default cleaner delay to an hour if not already set.
+ // This should be sufficient for even 1 second interval.
+ if (MetadataCleaner.getDelaySeconds < 0) {
+ MetadataCleaner.setDelaySeconds(3600)
+ }
+ new SparkContext(master, appName, sparkHome, jars, environment)
+ }
+
+ protected[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = {
+ if (prefix == null) {
+ time.milliseconds.toString
+ } else if (suffix == null || suffix.length ==0) {
+ prefix + "-" + time.milliseconds
+ } else {
+ prefix + "-" + time.milliseconds + "." + suffix
+ }
+ }
+
+ protected[streaming] def getSparkCheckpointDir(sscCheckpointDir: String): String = {
+ new Path(sscCheckpointDir, UUID.randomUUID.toString).toString
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
new file mode 100644
index 0000000000..2678334f53
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+/**
+ * This is a simple class that represents an absolute instant of time.
+ * Internally, it represents time as the difference, measured in milliseconds, between the current
+ * time and midnight, January 1, 1970 UTC. This is the same format as what is returned by
+ * System.currentTimeMillis.
+ */
+case class Time(private val millis: Long) {
+
+ def milliseconds: Long = millis
+
+ def < (that: Time): Boolean = (this.millis < that.millis)
+
+ def <= (that: Time): Boolean = (this.millis <= that.millis)
+
+ def > (that: Time): Boolean = (this.millis > that.millis)
+
+ def >= (that: Time): Boolean = (this.millis >= that.millis)
+
+ def + (that: Duration): Time = new Time(millis + that.milliseconds)
+
+ def - (that: Time): Duration = new Duration(millis - that.millis)
+
+ def - (that: Duration): Time = new Time(millis - that.milliseconds)
+
+ def floor(that: Duration): Time = {
+ val t = that.milliseconds
+ val m = math.floor(this.millis / t).toLong
+ new Time(m * t)
+ }
+
+ def isMultipleOf(that: Duration): Boolean =
+ (this.millis % that.milliseconds == 0)
+
+ def min(that: Time): Time = if (this < that) this else that
+
+ def max(that: Time): Time = if (this > that) this else that
+
+ def until(that: Time, interval: Duration): Seq[Time] = {
+ (this.milliseconds) until (that.milliseconds) by (interval.milliseconds) map (new Time(_))
+ }
+
+ def to(that: Time, interval: Duration): Seq[Time] = {
+ (this.milliseconds) to (that.milliseconds) by (interval.milliseconds) map (new Time(_))
+ }
+
+
+ override def toString: String = (millis.toString + " ms")
+
+}
+
+object Time {
+ val ordering = Ordering.by((time: Time) => time.millis)
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
new file mode 100644
index 0000000000..f8c8d8ece1
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java
+
+import org.apache.spark.streaming.{Duration, Time, DStream}
+import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.RDD
+
+/**
+ * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
+ * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.RDD]]
+ * for more details on RDDs). DStreams can either be created from live data (such as, data from
+ * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations
+ * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
+ * DStream periodically generates a RDD, either from live data or by transforming the RDD generated
+ * by a parent DStream.
+ *
+ * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
+ * `window`. In addition, [[org.apache.spark.streaming.api.java.JavaPairDStream]] contains operations available
+ * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`.
+ *
+ * DStreams internally is characterized by a few basic properties:
+ * - A list of other DStreams that the DStream depends on
+ * - A time interval at which the DStream generates an RDD
+ * - A function that is used to generate an RDD after each time interval
+ */
+class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T])
+ extends JavaDStreamLike[T, JavaDStream[T], JavaRDD[T]] {
+
+ override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd)
+
+ /** Return a new DStream containing only the elements that satisfy a predicate. */
+ def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] =
+ dstream.filter((x => f(x).booleanValue()))
+
+ /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
+ def cache(): JavaDStream[T] = dstream.cache()
+
+ /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
+ def persist(): JavaDStream[T] = dstream.persist()
+
+ /** Persist the RDDs of this DStream with the given storage level */
+ def persist(storageLevel: StorageLevel): JavaDStream[T] = dstream.persist(storageLevel)
+
+ /** Generate an RDD for the given duration */
+ def compute(validTime: Time): JavaRDD[T] = {
+ dstream.compute(validTime) match {
+ case Some(rdd) => new JavaRDD(rdd)
+ case None => null
+ }
+ }
+
+ /**
+ * Return a new DStream in which each RDD contains all the elements in seen in a
+ * sliding window of time over this DStream. The new DStream generates RDDs with
+ * the same interval as this DStream.
+ * @param windowDuration width of the window; must be a multiple of this DStream's interval.
+ */
+ def window(windowDuration: Duration): JavaDStream[T] =
+ dstream.window(windowDuration)
+
+ /**
+ * Return a new DStream in which each RDD contains all the elements in seen in a
+ * sliding window of time over this DStream.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def window(windowDuration: Duration, slideDuration: Duration): JavaDStream[T] =
+ dstream.window(windowDuration, slideDuration)
+
+ /**
+ * Return a new DStream by unifying data of another DStream with this DStream.
+ * @param that Another DStream having the same interval (i.e., slideDuration) as this DStream.
+ */
+ def union(that: JavaDStream[T]): JavaDStream[T] =
+ dstream.union(that.dstream)
+}
+
+object JavaDStream {
+ implicit def fromDStream[T: ClassManifest](dstream: DStream[T]): JavaDStream[T] =
+ new JavaDStream[T](dstream)
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
new file mode 100644
index 0000000000..2e6fe9a9c4
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java
+
+import java.util.{List => JList}
+import java.lang.{Long => JLong}
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.streaming._
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaRDD}
+import org.apache.spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
+import java.util
+import org.apache.spark.RDD
+import JavaDStream._
+
+trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]
+ extends Serializable {
+ implicit val classManifest: ClassManifest[T]
+
+ def dstream: DStream[T]
+
+ def wrapRDD(in: RDD[T]): R
+
+ implicit def scalaIntToJavaLong(in: DStream[Long]): JavaDStream[JLong] = {
+ in.map(new JLong(_))
+ }
+
+ /**
+ * Print the first ten elements of each RDD generated in this DStream. This is an output
+ * operator, so this DStream will be registered as an output stream and there materialized.
+ */
+ def print() = dstream.print()
+
+ /**
+ * Return a new DStream in which each RDD has a single element generated by counting each RDD
+ * of this DStream.
+ */
+ def count(): JavaDStream[JLong] = dstream.count()
+
+ /**
+ * Return a new DStream in which each RDD contains the counts of each distinct value in
+ * each RDD of this DStream. Hash partitioning is used to generate the RDDs with
+ * Spark's default number of partitions.
+ */
+ def countByValue(): JavaPairDStream[T, JLong] = {
+ JavaPairDStream.scalaToJavaLong(dstream.countByValue())
+ }
+
+ /**
+ * Return a new DStream in which each RDD contains the counts of each distinct value in
+ * each RDD of this DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+ * partitions.
+ * @param numPartitions number of partitions of each RDD in the new DStream.
+ */
+ def countByValue(numPartitions: Int): JavaPairDStream[T, JLong] = {
+ JavaPairDStream.scalaToJavaLong(dstream.countByValue(numPartitions))
+ }
+
+
+ /**
+ * Return a new DStream in which each RDD has a single element generated by counting the number
+ * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
+ * window() operation. This is equivalent to window(windowDuration, slideDuration).count()
+ */
+ def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[JLong] = {
+ dstream.countByWindow(windowDuration, slideDuration)
+ }
+
+ /**
+ * Return a new DStream in which each RDD contains the count of distinct elements in
+ * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with
+ * Spark's default number of partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration)
+ : JavaPairDStream[T, JLong] = {
+ JavaPairDStream.scalaToJavaLong(
+ dstream.countByValueAndWindow(windowDuration, slideDuration))
+ }
+
+ /**
+ * Return a new DStream in which each RDD contains the count of distinct elements in
+ * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+ * partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions number of partitions of each RDD in the new DStream.
+ */
+ def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
+ : JavaPairDStream[T, JLong] = {
+ JavaPairDStream.scalaToJavaLong(
+ dstream.countByValueAndWindow(windowDuration, slideDuration, numPartitions))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying glom() to each RDD of
+ * this DStream. Applying glom() to an RDD coalesces all elements within each partition into
+ * an array.
+ */
+ def glom(): JavaDStream[JList[T]] =
+ new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
+
+ /** Return the StreamingContext associated with this DStream */
+ def context(): StreamingContext = dstream.context()
+
+ /** Return a new DStream by applying a function to all elements of this DStream. */
+ def map[R](f: JFunction[T, R]): JavaDStream[R] = {
+ new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType())
+ }
+
+ /** Return a new DStream by applying a function to all elements of this DStream. */
+ def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
+ def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
+ new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
+ }
+
+ /**
+ * Return a new DStream by applying a function to all elements of this DStream,
+ * and then flattening the results
+ */
+ def flatMap[U](f: FlatMapFunction[T, U]): JavaDStream[U] = {
+ import scala.collection.JavaConverters._
+ def fn = (x: T) => f.apply(x).asScala
+ new JavaDStream(dstream.flatMap(fn)(f.elementType()))(f.elementType())
+ }
+
+ /**
+ * Return a new DStream by applying a function to all elements of this DStream,
+ * and then flattening the results
+ */
+ def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
+ import scala.collection.JavaConverters._
+ def fn = (x: T) => f.apply(x).asScala
+ def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
+ new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType())
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs
+ * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
+ * of the RDD.
+ */
+ def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
+ def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+ new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType())
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs
+ * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
+ * of the RDD.
+ */
+ def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2])
+ : JavaPairDStream[K2, V2] = {
+ def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+ new JavaPairDStream(dstream.mapPartitions(fn))(f.keyType(), f.valueType())
+ }
+
+ /**
+ * Return a new DStream in which each RDD has a single element generated by reducing each RDD
+ * of this DStream.
+ */
+ def reduce(f: JFunction2[T, T, T]): JavaDStream[T] = dstream.reduce(f)
+
+ /**
+ * Return a new DStream in which each RDD has a single element generated by reducing all
+ * elements in a sliding window over this DStream.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def reduceByWindow(
+ reduceFunc: (T, T) => T,
+ windowDuration: Duration,
+ slideDuration: Duration
+ ): DStream[T] = {
+ dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration)
+ }
+
+
+ /**
+ * Return a new DStream in which each RDD has a single element generated by reducing all
+ * elements in a sliding window over this DStream. However, the reduction is done incrementally
+ * using the old window's reduced value :
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ * This is more efficient than reduceByWindow without "inverse reduce" function.
+ * However, it is applicable to only "invertible reduce functions".
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def reduceByWindow(
+ reduceFunc: JFunction2[T, T, T],
+ invReduceFunc: JFunction2[T, T, T],
+ windowDuration: Duration,
+ slideDuration: Duration
+ ): JavaDStream[T] = {
+ dstream.reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)
+ }
+
+ /**
+ * Return all the RDDs between 'fromDuration' to 'toDuration' (both included)
+ */
+ def slice(fromTime: Time, toTime: Time): JList[R] = {
+ new util.ArrayList(dstream.slice(fromTime, toTime).map(wrapRDD(_)).toSeq)
+ }
+
+ /**
+ * Apply a function to each RDD in this DStream. This is an output operator, so
+ * this DStream will be registered as an output stream and therefore materialized.
+ */
+ def foreach(foreachFunc: JFunction[R, Void]) {
+ dstream.foreach(rdd => foreachFunc.call(wrapRDD(rdd)))
+ }
+
+ /**
+ * Apply a function to each RDD in this DStream. This is an output operator, so
+ * this DStream will be registered as an output stream and therefore materialized.
+ */
+ def foreach(foreachFunc: JFunction2[R, Time, Void]) {
+ dstream.foreach((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this DStream.
+ */
+ def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = {
+ implicit val cm: ClassManifest[U] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ def scalaTransform (in: RDD[T]): RDD[U] =
+ transformFunc.call(wrapRDD(in)).rdd
+ dstream.transform(scalaTransform(_))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this DStream.
+ */
+ def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = {
+ implicit val cm: ClassManifest[U] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ def scalaTransform (in: RDD[T], time: Time): RDD[U] =
+ transformFunc.call(wrapRDD(in), time).rdd
+ dstream.transform(scalaTransform(_, _))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this DStream.
+ */
+ def transform[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]):
+ JavaPairDStream[K2, V2] = {
+ implicit val cmk: ClassManifest[K2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
+ implicit val cmv: ClassManifest[V2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
+ def scalaTransform (in: RDD[T]): RDD[(K2, V2)] =
+ transformFunc.call(wrapRDD(in)).rdd
+ dstream.transform(scalaTransform(_))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this DStream.
+ */
+ def transform[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]):
+ JavaPairDStream[K2, V2] = {
+ implicit val cmk: ClassManifest[K2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
+ implicit val cmv: ClassManifest[V2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
+ def scalaTransform (in: RDD[T], time: Time): RDD[(K2, V2)] =
+ transformFunc.call(wrapRDD(in), time).rdd
+ dstream.transform(scalaTransform(_, _))
+ }
+
+ /**
+ * Enable periodic checkpointing of RDDs of this DStream
+ * @param interval Time interval after which generated RDD will be checkpointed
+ */
+ def checkpoint(interval: Duration) = {
+ dstream.checkpoint(interval)
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
new file mode 100644
index 0000000000..c203dccd17
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java
+
+import java.util.{List => JList}
+import java.lang.{Long => JLong}
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.{RDD, Partitioner}
+import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.api.java.{JavaUtils, JavaRDD, JavaPairRDD}
+import org.apache.spark.storage.StorageLevel
+import com.google.common.base.Optional
+import org.apache.spark.RDD
+
+class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
+ implicit val kManifiest: ClassManifest[K],
+ implicit val vManifest: ClassManifest[V])
+ extends JavaDStreamLike[(K, V), JavaPairDStream[K, V], JavaPairRDD[K, V]] {
+
+ override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
+
+ // =======================================================================
+ // Methods common to all DStream's
+ // =======================================================================
+
+ /** Return a new DStream containing only the elements that satisfy a predicate. */
+ def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairDStream[K, V] =
+ dstream.filter((x => f(x).booleanValue()))
+
+ /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
+ def cache(): JavaPairDStream[K, V] = dstream.cache()
+
+ /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
+ def persist(): JavaPairDStream[K, V] = dstream.persist()
+
+ /** Persist the RDDs of this DStream with the given storage level */
+ def persist(storageLevel: StorageLevel): JavaPairDStream[K, V] = dstream.persist(storageLevel)
+
+ /** Method that generates a RDD for the given Duration */
+ def compute(validTime: Time): JavaPairRDD[K, V] = {
+ dstream.compute(validTime) match {
+ case Some(rdd) => new JavaPairRDD(rdd)
+ case None => null
+ }
+ }
+
+ /**
+ * Return a new DStream which is computed based on windowed batches of this DStream.
+ * The new DStream generates RDDs with the same interval as this DStream.
+ * @param windowDuration width of the window; must be a multiple of this DStream's interval.
+ * @return
+ */
+ def window(windowDuration: Duration): JavaPairDStream[K, V] =
+ dstream.window(windowDuration)
+
+ /**
+ * Return a new DStream which is computed based on windowed batches of this DStream.
+ * @param windowDuration duration (i.e., width) of the window;
+ * must be a multiple of this DStream's interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's interval
+ */
+ def window(windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, V] =
+ dstream.window(windowDuration, slideDuration)
+
+ /**
+ * Return a new DStream by unifying data of another DStream with this DStream.
+ * @param that Another DStream having the same interval (i.e., slideDuration) as this DStream.
+ */
+ def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] =
+ dstream.union(that.dstream)
+
+ // =======================================================================
+ // Methods only for PairDStream's
+ // =======================================================================
+
+ /**
+ * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
+ * generate the RDDs with Spark's default number of partitions.
+ */
+ def groupByKey(): JavaPairDStream[K, JList[V]] =
+ dstream.groupByKey().mapValues(seqAsJavaList _)
+
+ /**
+ * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
+ * generate the RDDs with `numPartitions` partitions.
+ */
+ def groupByKey(numPartitions: Int): JavaPairDStream[K, JList[V]] =
+ dstream.groupByKey(numPartitions).mapValues(seqAsJavaList _)
+
+ /**
+ * Return a new DStream by applying `groupByKey` on each RDD of `this` DStream.
+ * Therefore, the values for each key in `this` DStream's RDDs are grouped into a
+ * single sequence to generate the RDDs of the new DStream. [[org.apache.spark.Partitioner]]
+ * is used to control the partitioning of each RDD.
+ */
+ def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] =
+ dstream.groupByKey(partitioner).mapValues(seqAsJavaList _)
+
+ /**
+ * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * merged using the associative reduce function. Hash partitioning is used to generate the RDDs
+ * with Spark's default number of partitions.
+ */
+ def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] =
+ dstream.reduceByKey(func)
+
+ /**
+ * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs
+ * with `numPartitions` partitions.
+ */
+ def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairDStream[K, V] =
+ dstream.reduceByKey(func, numPartitions)
+
+ /**
+ * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the
+ * partitioning of each RDD.
+ */
+ def reduceByKey(func: JFunction2[V, V, V], partitioner: Partitioner): JavaPairDStream[K, V] = {
+ dstream.reduceByKey(func, partitioner)
+ }
+
+ /**
+ * Combine elements of each key in DStream's RDDs using custom function. This is similar to the
+ * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.PairRDDFunctions]] for more
+ * information.
+ */
+ def combineByKey[C](createCombiner: JFunction[V, C],
+ mergeValue: JFunction2[C, V, C],
+ mergeCombiners: JFunction2[C, C, C],
+ partitioner: Partitioner
+ ): JavaPairDStream[K, C] = {
+ implicit val cm: ClassManifest[C] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]]
+ dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
+ }
+
+ /**
+ * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to
+ * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
+ * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with
+ * Spark's default number of partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ */
+ def groupByKeyAndWindow(windowDuration: Duration): JavaPairDStream[K, JList[V]] = {
+ dstream.groupByKeyAndWindow(windowDuration).mapValues(seqAsJavaList _)
+ }
+
+ /**
+ * Return a new DStream by applying `groupByKey` over a sliding window. Similar to
+ * `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to
+ * generate the RDDs with Spark's default number of partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
+ : JavaPairDStream[K, JList[V]] = {
+ dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(seqAsJavaList _)
+ }
+
+ /**
+ * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions Number of partitions of each RDD in the new DStream.
+ */
+ def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
+ :JavaPairDStream[K, JList[V]] = {
+ dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions)
+ .mapValues(seqAsJavaList _)
+ }
+
+ /**
+ * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ */
+ def groupByKeyAndWindow(
+ windowDuration: Duration,
+ slideDuration: Duration,
+ partitioner: Partitioner
+ ):JavaPairDStream[K, JList[V]] = {
+ dstream.groupByKeyAndWindow(windowDuration, slideDuration, partitioner)
+ .mapValues(seqAsJavaList _)
+ }
+
+ /**
+ * Create a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
+ * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream
+ * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate
+ * the RDDs with Spark's default number of partitions.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ */
+ def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration)
+ :JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(reduceFunc, windowDuration)
+ }
+
+ /**
+ * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
+ * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
+ * generate the RDDs with Spark's default number of partitions.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def reduceByKeyAndWindow(
+ reduceFunc: Function2[V, V, V],
+ windowDuration: Duration,
+ slideDuration: Duration
+ ):JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration)
+ }
+
+ /**
+ * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
+ * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
+ * generate the RDDs with `numPartitions` partitions.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions Number of partitions of each RDD in the new DStream.
+ */
+ def reduceByKeyAndWindow(
+ reduceFunc: Function2[V, V, V],
+ windowDuration: Duration,
+ slideDuration: Duration,
+ numPartitions: Int
+ ): JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, numPartitions)
+ }
+
+ /**
+ * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to
+ * `DStream.reduceByKey()`, but applies it over a sliding window.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ */
+ def reduceByKeyAndWindow(
+ reduceFunc: Function2[V, V, V],
+ windowDuration: Duration,
+ slideDuration: Duration,
+ partitioner: Partitioner
+ ): JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, partitioner)
+ }
+
+ /**
+ * Return a new DStream by reducing over a using incremental computation.
+ * The reduced value of over a new window is calculated using the old window's reduce value :
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
+ * However, it is applicable to only "invertible reduce functions".
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def reduceByKeyAndWindow(
+ reduceFunc: Function2[V, V, V],
+ invReduceFunc: Function2[V, V, V],
+ windowDuration: Duration,
+ slideDuration: Duration
+ ): JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)
+ }
+
+ /**
+ * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
+ * The reduced value of over a new window is calculated using the old window's reduce value :
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
+ * However, it is applicable to only "invertible reduce functions".
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions number of partitions of each RDD in the new DStream.
+ * @param filterFunc function to filter expired key-value pairs;
+ * only pairs that satisfy the function are retained
+ * set this to null if you do not want to filter
+ */
+ def reduceByKeyAndWindow(
+ reduceFunc: Function2[V, V, V],
+ invReduceFunc: Function2[V, V, V],
+ windowDuration: Duration,
+ slideDuration: Duration,
+ numPartitions: Int,
+ filterFunc: JFunction[(K, V), java.lang.Boolean]
+ ): JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(
+ reduceFunc,
+ invReduceFunc,
+ windowDuration,
+ slideDuration,
+ numPartitions,
+ (p: (K, V)) => filterFunc(p).booleanValue()
+ )
+ }
+
+ /**
+ * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
+ * The reduced value of over a new window is calculated using the old window's reduce value :
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
+ * However, it is applicable to only "invertible reduce functions".
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param filterFunc function to filter expired key-value pairs;
+ * only pairs that satisfy the function are retained
+ * set this to null if you do not want to filter
+ */
+ def reduceByKeyAndWindow(
+ reduceFunc: Function2[V, V, V],
+ invReduceFunc: Function2[V, V, V],
+ windowDuration: Duration,
+ slideDuration: Duration,
+ partitioner: Partitioner,
+ filterFunc: JFunction[(K, V), java.lang.Boolean]
+ ): JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(
+ reduceFunc,
+ invReduceFunc,
+ windowDuration,
+ slideDuration,
+ partitioner,
+ (p: (K, V)) => filterFunc(p).booleanValue()
+ )
+ }
+
+ private def convertUpdateStateFunction[S](in: JFunction2[JList[V], Optional[S], Optional[S]]):
+ (Seq[V], Option[S]) => Option[S] = {
+ val scalaFunc: (Seq[V], Option[S]) => Option[S] = (values, state) => {
+ val list: JList[V] = values
+ val scalaState: Optional[S] = JavaUtils.optionToOptional(state)
+ val result: Optional[S] = in.apply(list, scalaState)
+ result.isPresent match {
+ case true => Some(result.get())
+ case _ => None
+ }
+ }
+ scalaFunc
+ }
+
+ /**
+ * Create a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of each key.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * @param updateFunc State update function. If `this` function returns None, then
+ * corresponding state key-value pair will be eliminated.
+ * @tparam S State type
+ */
+ def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]])
+ : JavaPairDStream[K, S] = {
+ implicit val cm: ClassManifest[S] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]]
+ dstream.updateStateByKey(convertUpdateStateFunction(updateFunc))
+ }
+
+ /**
+ * Create a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of each key.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * @param updateFunc State update function. If `this` function returns None, then
+ * corresponding state key-value pair will be eliminated.
+ * @param numPartitions Number of partitions of each RDD in the new DStream.
+ * @tparam S State type
+ */
+ def updateStateByKey[S: ClassManifest](
+ updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
+ numPartitions: Int)
+ : JavaPairDStream[K, S] = {
+ dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), numPartitions)
+ }
+
+ /**
+ * Create a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of the key.
+ * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ * @param updateFunc State update function. If `this` function returns None, then
+ * corresponding state key-value pair will be eliminated.
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @tparam S State type
+ */
+ def updateStateByKey[S: ClassManifest](
+ updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
+ partitioner: Partitioner
+ ): JavaPairDStream[K, S] = {
+ dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), partitioner)
+ }
+
+ def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = {
+ implicit val cm: ClassManifest[U] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ dstream.mapValues(f)
+ }
+
+ def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairDStream[K, U] = {
+ import scala.collection.JavaConverters._
+ def fn = (x: V) => f.apply(x).asScala
+ implicit val cm: ClassManifest[U] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ dstream.flatMapValues(fn)
+ }
+
+ /**
+ * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
+ * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
+ * key in both RDDs. HashPartitioner is used to partition each generated RDD into default number
+ * of partitions.
+ */
+ def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
+ }
+
+ /**
+ * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
+ * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
+ * key in both RDDs. Partitioner is used to partition each generated RDD.
+ */
+ def cogroup[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
+ : JavaPairDStream[K, (JList[V], JList[W])] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ dstream.cogroup(other.dstream, partitioner)
+ .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
+ }
+
+ /**
+ * Join `this` DStream with `other` DStream. HashPartitioner is used
+ * to partition each generated RDD into default number of partitions.
+ */
+ def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ dstream.join(other.dstream)
+ }
+
+ /**
+ * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will
+ * be generated by joining RDDs from `this` and other DStream. Uses the given
+ * Partitioner to partition each generated RDD.
+ */
+ def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
+ : JavaPairDStream[K, (V, W)] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ dstream.join(other.dstream, partitioner)
+ }
+
+ /**
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
+ def saveAsHadoopFiles[F <: OutputFormat[K, V]](prefix: String, suffix: String) {
+ dstream.saveAsHadoopFiles(prefix, suffix)
+ }
+
+ /**
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
+ def saveAsHadoopFiles(
+ prefix: String,
+ suffix: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[_ <: OutputFormat[_, _]]) {
+ dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass)
+ }
+
+ /**
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
+ def saveAsHadoopFiles(
+ prefix: String,
+ suffix: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[_ <: OutputFormat[_, _]],
+ conf: JobConf) {
+ dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
+ }
+
+ /**
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
+ def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](prefix: String, suffix: String) {
+ dstream.saveAsNewAPIHadoopFiles(prefix, suffix)
+ }
+
+ /**
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
+ def saveAsNewAPIHadoopFiles(
+ prefix: String,
+ suffix: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) {
+ dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass)
+ }
+
+ /**
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
+ def saveAsNewAPIHadoopFiles(
+ prefix: String,
+ suffix: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
+ conf: Configuration = new Configuration) {
+ dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
+ }
+
+ override val classManifest: ClassManifest[(K, V)] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
+}
+
+object JavaPairDStream {
+ implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)])
+ :JavaPairDStream[K, V] =
+ new JavaPairDStream[K, V](dstream)
+
+ def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = {
+ implicit val cmk: ClassManifest[K] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
+ implicit val cmv: ClassManifest[V] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ new JavaPairDStream[K, V](dstream.dstream)
+ }
+
+ def scalaToJavaLong[K: ClassManifest](dstream: JavaPairDStream[K, Long])
+ : JavaPairDStream[K, JLong] = {
+ StreamingContext.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_))
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
new file mode 100644
index 0000000000..f10beb1db3
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java
+
+import org.apache.spark.streaming._
+import receivers.{ActorReceiver, ReceiverSupervisorStrategy}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+import twitter4j.Status
+import akka.actor.Props
+import akka.actor.SupervisorStrategy
+import akka.zeromq.Subscribe
+import scala.collection.JavaConversions._
+import java.lang.{Long => JLong, Integer => JInt}
+import java.io.InputStream
+import java.util.{Map => JMap}
+import twitter4j.auth.Authorization
+import org.apache.spark.RDD
+
+/**
+ * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
+ * information (such as, cluster URL and job name) to internally create a SparkContext, it provides
+ * methods used to create DStream from various input sources.
+ */
+class JavaStreamingContext(val ssc: StreamingContext) {
+
+ // TODOs:
+ // - Test to/from Hadoop functions
+ // - Support creating and registering InputStreams
+
+
+ /**
+ * Creates a StreamingContext.
+ * @param master Name of the Spark Master
+ * @param appName Name to be used when registering with the scheduler
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ */
+ def this(master: String, appName: String, batchDuration: Duration) =
+ this(new StreamingContext(master, appName, batchDuration, null, Nil, Map()))
+
+ /**
+ * Creates a StreamingContext.
+ * @param master Name of the Spark Master
+ * @param appName Name to be used when registering with the scheduler
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ * @param sparkHome The SPARK_HOME directory on the slave nodes
+ * @param jarFile JAR file containing job code, to ship to cluster. This can be a path on the local
+ * file system or an HDFS, HTTP, HTTPS, or FTP URL.
+ */
+ def this(
+ master: String,
+ appName: String,
+ batchDuration: Duration,
+ sparkHome: String,
+ jarFile: String) =
+ this(new StreamingContext(master, appName, batchDuration, sparkHome, Seq(jarFile), Map()))
+
+ /**
+ * Creates a StreamingContext.
+ * @param master Name of the Spark Master
+ * @param appName Name to be used when registering with the scheduler
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ * @param sparkHome The SPARK_HOME directory on the slave nodes
+ * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+ * system or HDFS, HTTP, HTTPS, or FTP URLs.
+ */
+ def this(
+ master: String,
+ appName: String,
+ batchDuration: Duration,
+ sparkHome: String,
+ jars: Array[String]) =
+ this(new StreamingContext(master, appName, batchDuration, sparkHome, jars, Map()))
+
+ /**
+ * Creates a StreamingContext.
+ * @param master Name of the Spark Master
+ * @param appName Name to be used when registering with the scheduler
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ * @param sparkHome The SPARK_HOME directory on the slave nodes
+ * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+ * system or HDFS, HTTP, HTTPS, or FTP URLs.
+ * @param environment Environment variables to set on worker nodes
+ */
+ def this(
+ master: String,
+ appName: String,
+ batchDuration: Duration,
+ sparkHome: String,
+ jars: Array[String],
+ environment: JMap[String, String]) =
+ this(new StreamingContext(master, appName, batchDuration, sparkHome, jars, environment))
+
+ /**
+ * Creates a StreamingContext using an existing SparkContext.
+ * @param sparkContext The underlying JavaSparkContext to use
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ */
+ def this(sparkContext: JavaSparkContext, batchDuration: Duration) =
+ this(new StreamingContext(sparkContext.sc, batchDuration))
+
+ /**
+ * Re-creates a StreamingContext from a checkpoint file.
+ * @param path Path either to the directory that was specified as the checkpoint directory, or
+ * to the checkpoint file 'graph' or 'graph.bk'.
+ */
+ def this(path: String) = this (new StreamingContext(path))
+
+ /** The underlying SparkContext */
+ val sc: JavaSparkContext = new JavaSparkContext(ssc.sc)
+
+ /**
+ * Create an input stream that pulls messages form a Kafka Broker.
+ * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
+ * @param groupId The group id for this consumer.
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ */
+ def kafkaStream(
+ zkQuorum: String,
+ groupId: String,
+ topics: JMap[String, JInt])
+ : JavaDStream[String] = {
+ implicit val cmt: ClassManifest[String] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]]
+ ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
+ StorageLevel.MEMORY_ONLY_SER_2)
+ }
+
+ /**
+ * Create an input stream that pulls messages form a Kafka Broker.
+ * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
+ * @param groupId The group id for this consumer.
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel RDD storage level. Defaults to memory-only
+ *
+ */
+ def kafkaStream(
+ zkQuorum: String,
+ groupId: String,
+ topics: JMap[String, JInt],
+ storageLevel: StorageLevel)
+ : JavaDStream[String] = {
+ implicit val cmt: ClassManifest[String] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]]
+ ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
+ storageLevel)
+ }
+
+ /**
+ * Create an input stream that pulls messages form a Kafka Broker.
+ * @param typeClass Type of RDD
+ * @param decoderClass Type of kafka decoder
+ * @param kafkaParams Map of kafka configuration paramaters.
+ * See: http://kafka.apache.org/configuration.html
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel RDD storage level. Defaults to memory-only
+ */
+ def kafkaStream[T, D <: kafka.serializer.Decoder[_]](
+ typeClass: Class[T],
+ decoderClass: Class[D],
+ kafkaParams: JMap[String, String],
+ topics: JMap[String, JInt],
+ storageLevel: StorageLevel)
+ : JavaDStream[T] = {
+ implicit val cmt: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cmd: Manifest[D] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[D]]
+ ssc.kafkaStream[T, D](
+ kafkaParams.toMap,
+ Map(topics.mapValues(_.intValue()).toSeq: _*),
+ storageLevel)
+ }
+
+ /**
+ * Create a input stream from network source hostname:port. Data is received using
+ * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited
+ * lines.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @param storageLevel Storage level to use for storing the received objects
+ * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+ */
+ def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
+ : JavaDStream[String] = {
+ ssc.socketTextStream(hostname, port, storageLevel)
+ }
+
+ /**
+ * Create a input stream from network source hostname:port. Data is received using
+ * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited
+ * lines.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ */
+ def socketTextStream(hostname: String, port: Int): JavaDStream[String] = {
+ ssc.socketTextStream(hostname, port)
+ }
+
+ /**
+ * Create a input stream from network source hostname:port. Data is received using
+ * a TCP socket and the receive bytes it interepreted as object using the given
+ * converter.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @param converter Function to convert the byte stream to objects
+ * @param storageLevel Storage level to use for storing the received objects
+ * @tparam T Type of the objects received (after converting bytes to objects)
+ */
+ def socketStream[T](
+ hostname: String,
+ port: Int,
+ converter: JFunction[InputStream, java.lang.Iterable[T]],
+ storageLevel: StorageLevel)
+ : JavaDStream[T] = {
+ def fn = (x: InputStream) => converter.apply(x).toIterator
+ implicit val cmt: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ ssc.socketStream(hostname, port, fn, storageLevel)
+ }
+
+ /**
+ * Creates a input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them as text files (using key as LongWritable, value
+ * as Text and input format as TextInputFormat). File names starting with . are ignored.
+ * @param directory HDFS directory to monitor for new file
+ */
+ def textFileStream(directory: String): JavaDStream[String] = {
+ ssc.textFileStream(directory)
+ }
+
+ /**
+ * Create a input stream from network source hostname:port, where data is received
+ * as serialized blocks (serialized using the Spark's serializer) that can be directly
+ * pushed into the block manager without deserializing them. This is the most efficient
+ * way to receive data.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @param storageLevel Storage level to use for storing the received objects
+ * @tparam T Type of the objects in the received blocks
+ */
+ def rawSocketStream[T](
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel): JavaDStream[T] = {
+ implicit val cmt: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port, storageLevel))
+ }
+
+ /**
+ * Create a input stream from network source hostname:port, where data is received
+ * as serialized blocks (serialized using the Spark's serializer) that can be directly
+ * pushed into the block manager without deserializing them. This is the most efficient
+ * way to receive data.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @tparam T Type of the objects in the received blocks
+ */
+ def rawSocketStream[T](hostname: String, port: Int): JavaDStream[T] = {
+ implicit val cmt: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port))
+ }
+
+ /**
+ * Creates a input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them using the given key-value types and input format.
+ * File names starting with . are ignored.
+ * @param directory HDFS directory to monitor for new file
+ * @tparam K Key type for reading HDFS file
+ * @tparam V Value type for reading HDFS file
+ * @tparam F Input format for reading HDFS file
+ */
+ def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): JavaPairDStream[K, V] = {
+ implicit val cmk: ClassManifest[K] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
+ implicit val cmv: ClassManifest[V] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ implicit val cmf: ClassManifest[F] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[F]]
+ ssc.fileStream[K, V, F](directory);
+ }
+
+ /**
+ * Creates a input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel):
+ JavaDStream[SparkFlumeEvent] = {
+ ssc.flumeStream(hostname, port, storageLevel)
+ }
+
+
+ /**
+ * Creates a input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ */
+ def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = {
+ ssc.flumeStream(hostname, port)
+ }
+
+ /**
+ * Create a input stream that returns tweets received from Twitter.
+ * @param twitterAuth Twitter4J Authorization object
+ * @param filters Set of filter strings to get only those tweets that match them
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def twitterStream(
+ twitterAuth: Authorization,
+ filters: Array[String],
+ storageLevel: StorageLevel
+ ): JavaDStream[Status] = {
+ ssc.twitterStream(Some(twitterAuth), filters, storageLevel)
+ }
+
+ /**
+ * Create a input stream that returns tweets received from Twitter using Twitter4J's default
+ * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+ * .consumerSecret, .accessToken and .accessTokenSecret to be set.
+ * @param filters Set of filter strings to get only those tweets that match them
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def twitterStream(
+ filters: Array[String],
+ storageLevel: StorageLevel
+ ): JavaDStream[Status] = {
+ ssc.twitterStream(None, filters, storageLevel)
+ }
+
+ /**
+ * Create a input stream that returns tweets received from Twitter.
+ * @param twitterAuth Twitter4J Authorization
+ * @param filters Set of filter strings to get only those tweets that match them
+ */
+ def twitterStream(
+ twitterAuth: Authorization,
+ filters: Array[String]
+ ): JavaDStream[Status] = {
+ ssc.twitterStream(Some(twitterAuth), filters)
+ }
+
+ /**
+ * Create a input stream that returns tweets received from Twitter using Twitter4J's default
+ * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+ * .consumerSecret, .accessToken and .accessTokenSecret to be set.
+ * @param filters Set of filter strings to get only those tweets that match them
+ */
+ def twitterStream(
+ filters: Array[String]
+ ): JavaDStream[Status] = {
+ ssc.twitterStream(None, filters)
+ }
+
+ /**
+ * Create a input stream that returns tweets received from Twitter.
+ * @param twitterAuth Twitter4J Authorization
+ */
+ def twitterStream(
+ twitterAuth: Authorization
+ ): JavaDStream[Status] = {
+ ssc.twitterStream(Some(twitterAuth))
+ }
+
+ /**
+ * Create a input stream that returns tweets received from Twitter using Twitter4J's default
+ * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+ * .consumerSecret, .accessToken and .accessTokenSecret to be set.
+ */
+ def twitterStream(): JavaDStream[Status] = {
+ ssc.twitterStream()
+ }
+
+ /**
+ * Create an input stream with any arbitrary user implemented actor receiver.
+ * @param props Props object defining creation of the actor
+ * @param name Name of the actor
+ * @param storageLevel Storage level to use for storing the received objects
+ *
+ * @note An important point to note:
+ * Since Actor may exist outside the spark framework, It is thus user's responsibility
+ * to ensure the type safety, i.e parametrized type of data received and actorStream
+ * should be same.
+ */
+ def actorStream[T](
+ props: Props,
+ name: String,
+ storageLevel: StorageLevel,
+ supervisorStrategy: SupervisorStrategy
+ ): JavaDStream[T] = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ ssc.actorStream[T](props, name, storageLevel, supervisorStrategy)
+ }
+
+ /**
+ * Create an input stream with any arbitrary user implemented actor receiver.
+ * @param props Props object defining creation of the actor
+ * @param name Name of the actor
+ * @param storageLevel Storage level to use for storing the received objects
+ *
+ * @note An important point to note:
+ * Since Actor may exist outside the spark framework, It is thus user's responsibility
+ * to ensure the type safety, i.e parametrized type of data received and actorStream
+ * should be same.
+ */
+ def actorStream[T](
+ props: Props,
+ name: String,
+ storageLevel: StorageLevel
+ ): JavaDStream[T] = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ ssc.actorStream[T](props, name, storageLevel)
+ }
+
+ /**
+ * Create an input stream with any arbitrary user implemented actor receiver.
+ * @param props Props object defining creation of the actor
+ * @param name Name of the actor
+ *
+ * @note An important point to note:
+ * Since Actor may exist outside the spark framework, It is thus user's responsibility
+ * to ensure the type safety, i.e parametrized type of data received and actorStream
+ * should be same.
+ */
+ def actorStream[T](
+ props: Props,
+ name: String
+ ): JavaDStream[T] = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ ssc.actorStream[T](props, name)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe topic to subscribe to
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
+ * of byte thus it needs the converter(which might be deserializer of bytes)
+ * to translate from sequence of sequence of bytes, where sequence refer to a frame
+ * and sub sequence refer to its payload.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def zeroMQStream[T](
+ publisherUrl:String,
+ subscribe: Subscribe,
+ bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
+ storageLevel: StorageLevel,
+ supervisorStrategy: SupervisorStrategy
+ ): JavaDStream[T] = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ ssc.zeroMQStream[T](publisherUrl, subscribe, bytesToObjects, storageLevel, supervisorStrategy)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe topic to subscribe to
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
+ * of byte thus it needs the converter(which might be deserializer of bytes)
+ * to translate from sequence of sequence of bytes, where sequence refer to a frame
+ * and sub sequence refer to its payload.
+ * @param storageLevel RDD storage level. Defaults to memory-only.
+ */
+ def zeroMQStream[T](
+ publisherUrl:String,
+ subscribe: Subscribe,
+ bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+ storageLevel: StorageLevel
+ ): JavaDStream[T] = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe topic to subscribe to
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
+ * of byte thus it needs the converter(which might be deserializer of bytes)
+ * to translate from sequence of sequence of bytes, where sequence refer to a frame
+ * and sub sequence refer to its payload.
+ */
+ def zeroMQStream[T](
+ publisherUrl:String,
+ subscribe: Subscribe,
+ bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
+ ): JavaDStream[T] = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
+ }
+
+ /**
+ * Registers an output stream that will be computed every interval
+ */
+ def registerOutputStream(outputStream: JavaDStreamLike[_, _, _]) {
+ ssc.registerOutputStream(outputStream.dstream)
+ }
+
+ /**
+ * Creates a input stream from an queue of RDDs. In each batch,
+ * it will process either one or all of the RDDs returned by the queue.
+ *
+ * NOTE: changes to the queue after the stream is created will not be recognized.
+ * @param queue Queue of RDDs
+ * @tparam T Type of objects in the RDD
+ */
+ def queueStream[T](queue: java.util.Queue[JavaRDD[T]]): JavaDStream[T] = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ val sQueue = new scala.collection.mutable.Queue[RDD[T]]
+ sQueue.enqueue(queue.map(_.rdd).toSeq: _*)
+ ssc.queueStream(sQueue)
+ }
+
+ /**
+ * Creates a input stream from an queue of RDDs. In each batch,
+ * it will process either one or all of the RDDs returned by the queue.
+ *
+ * NOTE: changes to the queue after the stream is created will not be recognized.
+ * @param queue Queue of RDDs
+ * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
+ * @tparam T Type of objects in the RDD
+ */
+ def queueStream[T](queue: java.util.Queue[JavaRDD[T]], oneAtATime: Boolean): JavaDStream[T] = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ val sQueue = new scala.collection.mutable.Queue[RDD[T]]
+ sQueue.enqueue(queue.map(_.rdd).toSeq: _*)
+ ssc.queueStream(sQueue, oneAtATime)
+ }
+
+ /**
+ * Creates a input stream from an queue of RDDs. In each batch,
+ * it will process either one or all of the RDDs returned by the queue.
+ *
+ * NOTE: changes to the queue after the stream is created will not be recognized.
+ * @param queue Queue of RDDs
+ * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
+ * @param defaultRDD Default RDD is returned by the DStream when the queue is empty
+ * @tparam T Type of objects in the RDD
+ */
+ def queueStream[T](
+ queue: java.util.Queue[JavaRDD[T]],
+ oneAtATime: Boolean,
+ defaultRDD: JavaRDD[T]): JavaDStream[T] = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ val sQueue = new scala.collection.mutable.Queue[RDD[T]]
+ sQueue.enqueue(queue.map(_.rdd).toSeq: _*)
+ ssc.queueStream(sQueue, oneAtATime, defaultRDD.rdd)
+ }
+
+ /**
+ * Sets the context to periodically checkpoint the DStream operations for master
+ * fault-tolerance. The graph will be checkpointed every batch interval.
+ * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
+ */
+ def checkpoint(directory: String) {
+ ssc.checkpoint(directory)
+ }
+
+ /**
+ * Sets each DStreams in this context to remember RDDs it generated in the last given duration.
+ * DStreams remember RDDs only for a limited duration of duration and releases them for garbage
+ * collection. This method allows the developer to specify how to long to remember the RDDs (
+ * if the developer wishes to query old data outside the DStream computation).
+ * @param duration Minimum duration that each DStream should remember its RDDs
+ */
+ def remember(duration: Duration) {
+ ssc.remember(duration)
+ }
+
+ /**
+ * Starts the execution of the streams.
+ */
+ def start() = ssc.start()
+
+ /**
+ * Sstops the execution of the streams.
+ */
+ def stop() = ssc.stop()
+
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
new file mode 100644
index 0000000000..4a9d82211f
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.{RDD, Partitioner}
+import org.apache.spark.rdd.CoGroupedRDD
+import org.apache.spark.streaming.{Time, DStream, Duration}
+
+private[streaming]
+class CoGroupedDStream[K : ClassManifest](
+ parents: Seq[DStream[(K, _)]],
+ partitioner: Partitioner
+ ) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) {
+
+ if (parents.length == 0) {
+ throw new IllegalArgumentException("Empty array of parents")
+ }
+
+ if (parents.map(_.ssc).distinct.size > 1) {
+ throw new IllegalArgumentException("Array of parents have different StreamingContexts")
+ }
+
+ if (parents.map(_.slideDuration).distinct.size > 1) {
+ throw new IllegalArgumentException("Array of parents have different slide times")
+ }
+
+ override def dependencies = parents.toList
+
+ override def slideDuration: Duration = parents.head.slideDuration
+
+ override def compute(validTime: Time): Option[RDD[(K, Seq[Seq[_]])]] = {
+ val part = partitioner
+ val rdds = parents.flatMap(_.getOrCompute(validTime))
+ if (rdds.size > 0) {
+ val q = new CoGroupedRDD[K](rdds, part)
+ Some(q)
+ } else {
+ None
+ }
+ }
+
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
new file mode 100644
index 0000000000..35cc4cb396
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.RDD
+import org.apache.spark.streaming.{Time, StreamingContext}
+
+/**
+ * An input stream that always returns the same RDD on each timestep. Useful for testing.
+ */
+class ConstantInputDStream[T: ClassManifest](ssc_ : StreamingContext, rdd: RDD[T])
+ extends InputDStream[T](ssc_) {
+
+ override def start() {}
+
+ override def stop() {}
+
+ override def compute(validTime: Time): Option[RDD[T]] = {
+ Some(rdd)
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
new file mode 100644
index 0000000000..1c265ed972
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.RDD
+import org.apache.spark.rdd.UnionRDD
+import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time}
+
+import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+
+import scala.collection.mutable.{HashSet, HashMap}
+import java.io.{ObjectInputStream, IOException}
+
+private[streaming]
+class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest](
+ @transient ssc_ : StreamingContext,
+ directory: String,
+ filter: Path => Boolean = FileInputDStream.defaultFilter,
+ newFilesOnly: Boolean = true)
+ extends InputDStream[(K, V)](ssc_) {
+
+ protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
+
+ // Latest file mod time seen till any point of time
+ private val lastModTimeFiles = new HashSet[String]()
+ private var lastModTime = 0L
+
+ @transient private var path_ : Path = null
+ @transient private var fs_ : FileSystem = null
+ @transient private[streaming] var files = new HashMap[Time, Array[String]]
+
+ override def start() {
+ if (newFilesOnly) {
+ lastModTime = graph.zeroTime.milliseconds
+ } else {
+ lastModTime = 0
+ }
+ logDebug("LastModTime initialized to " + lastModTime + ", new files only = " + newFilesOnly)
+ }
+
+ override def stop() { }
+
+ /**
+ * Finds the files that were modified since the last time this method was called and makes
+ * a union RDD out of them. Note that this maintains the list of files that were processed
+ * in the latest modification time in the previous call to this method. This is because the
+ * modification time returned by the FileStatus API seems to return times only at the
+ * granularity of seconds. And new files may have the same modification time as the
+ * latest modification time in the previous call to this method yet was not reported in
+ * the previous call.
+ */
+ override def compute(validTime: Time): Option[RDD[(K, V)]] = {
+ assert(validTime.milliseconds >= lastModTime, "Trying to get new files for really old time [" + validTime + " < " + lastModTime)
+
+ // Create the filter for selecting new files
+ val newFilter = new PathFilter() {
+ // Latest file mod time seen in this round of fetching files and its corresponding files
+ var latestModTime = 0L
+ val latestModTimeFiles = new HashSet[String]()
+
+ def accept(path: Path): Boolean = {
+ if (!filter(path)) { // Reject file if it does not satisfy filter
+ logDebug("Rejected by filter " + path)
+ return false
+ } else { // Accept file only if
+ val modTime = fs.getFileStatus(path).getModificationTime()
+ logDebug("Mod time for " + path + " is " + modTime)
+ if (modTime < lastModTime) {
+ logDebug("Mod time less than last mod time")
+ return false // If the file was created before the last time it was called
+ } else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) {
+ logDebug("Mod time equal to last mod time, but file considered already")
+ return false // If the file was created exactly as lastModTime but not reported yet
+ } else if (modTime > validTime.milliseconds) {
+ logDebug("Mod time more than valid time")
+ return false // If the file was created after the time this function call requires
+ }
+ if (modTime > latestModTime) {
+ latestModTime = modTime
+ latestModTimeFiles.clear()
+ logDebug("Latest mod time updated to " + latestModTime)
+ }
+ latestModTimeFiles += path.toString
+ logDebug("Accepted " + path)
+ return true
+ }
+ }
+ }
+ logDebug("Finding new files at time " + validTime + " for last mod time = " + lastModTime)
+ val newFiles = fs.listStatus(path, newFilter).map(_.getPath.toString)
+ logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
+ if (newFiles.length > 0) {
+ // Update the modification time and the files processed for that modification time
+ if (lastModTime != newFilter.latestModTime) {
+ lastModTime = newFilter.latestModTime
+ lastModTimeFiles.clear()
+ }
+ lastModTimeFiles ++= newFilter.latestModTimeFiles
+ logDebug("Last mod time updated to " + lastModTime)
+ }
+ files += ((validTime, newFiles))
+ Some(filesToRDD(newFiles))
+ }
+
+ /** Clear the old time-to-files mappings along with old RDDs */
+ protected[streaming] override def clearOldMetadata(time: Time) {
+ super.clearOldMetadata(time)
+ val oldFiles = files.filter(_._1 <= (time - rememberDuration))
+ files --= oldFiles.keys
+ logInfo("Cleared " + oldFiles.size + " old files that were older than " +
+ (time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
+ logDebug("Cleared files are:\n" +
+ oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
+ }
+
+ /** Generate one RDD from an array of files */
+ protected[streaming] def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
+ new UnionRDD(
+ context.sparkContext,
+ files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
+ )
+ }
+
+ private def path: Path = {
+ if (path_ == null) path_ = new Path(directory)
+ path_
+ }
+
+ private def fs: FileSystem = {
+ if (fs_ == null) fs_ = path.getFileSystem(new Configuration())
+ fs_
+ }
+
+ @throws(classOf[IOException])
+ private def readObject(ois: ObjectInputStream) {
+ logDebug(this.getClass().getSimpleName + ".readObject used")
+ ois.defaultReadObject()
+ generatedRDDs = new HashMap[Time, RDD[(K,V)]] ()
+ files = new HashMap[Time, Array[String]]
+ }
+
+ /**
+ * A custom version of the DStreamCheckpointData that stores names of
+ * Hadoop files as checkpoint data.
+ */
+ private[streaming]
+ class FileInputDStreamCheckpointData extends DStreamCheckpointData(this) {
+
+ def hadoopFiles = data.asInstanceOf[HashMap[Time, Array[String]]]
+
+ override def update() {
+ hadoopFiles.clear()
+ hadoopFiles ++= files
+ }
+
+ override def cleanup() { }
+
+ override def restore() {
+ hadoopFiles.foreach {
+ case (t, f) => {
+ // Restore the metadata in both files and generatedRDDs
+ logInfo("Restoring files for time " + t + " - " +
+ f.mkString("[", ", ", "]") )
+ files += ((t, f))
+ generatedRDDs += ((t, filesToRDD(f)))
+ }
+ }
+ }
+
+ override def toString() = {
+ "[\n" + hadoopFiles.size + " file sets\n" +
+ hadoopFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n") + "\n]"
+ }
+ }
+}
+
+private[streaming]
+object FileInputDStream {
+ def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
+}
+
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
new file mode 100644
index 0000000000..3166c68760
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.RDD
+
+private[streaming]
+class FilteredDStream[T: ClassManifest](
+ parent: DStream[T],
+ filterFunc: T => Boolean
+ ) extends DStream[T](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideDuration: Duration = parent.slideDuration
+
+ override def compute(validTime: Time): Option[RDD[T]] = {
+ parent.getOrCompute(validTime).map(_.filter(filterFunc))
+ }
+}
+
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
new file mode 100644
index 0000000000..21950ad6ac
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.RDD
+import org.apache.spark.SparkContext._
+
+private[streaming]
+class FlatMapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
+ parent: DStream[(K, V)],
+ flatMapValueFunc: V => TraversableOnce[U]
+ ) extends DStream[(K, U)](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideDuration: Duration = parent.slideDuration
+
+ override def compute(validTime: Time): Option[RDD[(K, U)]] = {
+ parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc))
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
new file mode 100644
index 0000000000..8377cfe60c
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.RDD
+
+private[streaming]
+class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
+ parent: DStream[T],
+ flatMapFunc: T => Traversable[U]
+ ) extends DStream[U](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideDuration: Duration = parent.slideDuration
+
+ override def compute(validTime: Time): Option[RDD[U]] = {
+ parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
+ }
+}
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala
new file mode 100644
index 0000000000..3fb443143c
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.streaming.StreamingContext
+
+import org.apache.spark.Utils
+import org.apache.spark.storage.StorageLevel
+
+import org.apache.flume.source.avro.AvroSourceProtocol
+import org.apache.flume.source.avro.AvroFlumeEvent
+import org.apache.flume.source.avro.Status
+import org.apache.avro.ipc.specific.SpecificResponder
+import org.apache.avro.ipc.NettyServer
+
+import scala.collection.JavaConversions._
+
+import java.net.InetSocketAddress
+import java.io.{ObjectInput, ObjectOutput, Externalizable}
+import java.nio.ByteBuffer
+
+private[streaming]
+class FlumeInputDStream[T: ClassManifest](
+ @transient ssc_ : StreamingContext,
+ host: String,
+ port: Int,
+ storageLevel: StorageLevel
+) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
+
+ override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = {
+ new FlumeReceiver(host, port, storageLevel)
+ }
+}
+
+/**
+ * A wrapper class for AvroFlumeEvent's with a custom serialization format.
+ *
+ * This is necessary because AvroFlumeEvent uses inner data structures
+ * which are not serializable.
+ */
+class SparkFlumeEvent() extends Externalizable {
+ var event : AvroFlumeEvent = new AvroFlumeEvent()
+
+ /* De-serialize from bytes. */
+ def readExternal(in: ObjectInput) {
+ val bodyLength = in.readInt()
+ val bodyBuff = new Array[Byte](bodyLength)
+ in.read(bodyBuff)
+
+ val numHeaders = in.readInt()
+ val headers = new java.util.HashMap[CharSequence, CharSequence]
+
+ for (i <- 0 until numHeaders) {
+ val keyLength = in.readInt()
+ val keyBuff = new Array[Byte](keyLength)
+ in.read(keyBuff)
+ val key : String = Utils.deserialize(keyBuff)
+
+ val valLength = in.readInt()
+ val valBuff = new Array[Byte](valLength)
+ in.read(valBuff)
+ val value : String = Utils.deserialize(valBuff)
+
+ headers.put(key, value)
+ }
+
+ event.setBody(ByteBuffer.wrap(bodyBuff))
+ event.setHeaders(headers)
+ }
+
+ /* Serialize to bytes. */
+ def writeExternal(out: ObjectOutput) {
+ val body = event.getBody.array()
+ out.writeInt(body.length)
+ out.write(body)
+
+ val numHeaders = event.getHeaders.size()
+ out.writeInt(numHeaders)
+ for ((k, v) <- event.getHeaders) {
+ val keyBuff = Utils.serialize(k.toString)
+ out.writeInt(keyBuff.length)
+ out.write(keyBuff)
+ val valBuff = Utils.serialize(v.toString)
+ out.writeInt(valBuff.length)
+ out.write(valBuff)
+ }
+ }
+}
+
+private[streaming] object SparkFlumeEvent {
+ def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = {
+ val event = new SparkFlumeEvent
+ event.event = in
+ event
+ }
+}
+
+/** A simple server that implements Flume's Avro protocol. */
+private[streaming]
+class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
+ override def append(event : AvroFlumeEvent) : Status = {
+ receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)
+ Status.OK
+ }
+
+ override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
+ events.foreach (event =>
+ receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event))
+ Status.OK
+ }
+}
+
+/** A NetworkReceiver which listens for events using the
+ * Flume Avro interface.*/
+private[streaming]
+class FlumeReceiver(
+ host: String,
+ port: Int,
+ storageLevel: StorageLevel
+ ) extends NetworkReceiver[SparkFlumeEvent] {
+
+ lazy val blockGenerator = new BlockGenerator(storageLevel)
+
+ protected override def onStart() {
+ val responder = new SpecificResponder(
+ classOf[AvroSourceProtocol], new FlumeEventServer(this));
+ val server = new NettyServer(responder, new InetSocketAddress(host, port));
+ blockGenerator.start()
+ server.start()
+ logInfo("Flume receiver started")
+ }
+
+ protected override def onStop() {
+ blockGenerator.stop()
+ logInfo("Flume receiver stopped")
+ }
+
+ override def getLocationPreference = Some(host)
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
new file mode 100644
index 0000000000..c1f95650c8
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.RDD
+import org.apache.spark.streaming.{Duration, DStream, Job, Time}
+
+private[streaming]
+class ForEachDStream[T: ClassManifest] (
+ parent: DStream[T],
+ foreachFunc: (RDD[T], Time) => Unit
+ ) extends DStream[Unit](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideDuration: Duration = parent.slideDuration
+
+ override def compute(validTime: Time): Option[RDD[Unit]] = None
+
+ override def generateJob(time: Time): Option[Job] = {
+ parent.getOrCompute(time) match {
+ case Some(rdd) =>
+ val jobFunc = () => {
+ foreachFunc(rdd, time)
+ }
+ Some(new Job(time, jobFunc))
+ case None => None
+ }
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
new file mode 100644
index 0000000000..1e4c7e7fde
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.RDD
+
+private[streaming]
+class GlommedDStream[T: ClassManifest](parent: DStream[T])
+ extends DStream[Array[T]](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideDuration: Duration = parent.slideDuration
+
+ override def compute(validTime: Time): Option[RDD[Array[T]]] = {
+ parent.getOrCompute(validTime).map(_.glom())
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
new file mode 100644
index 0000000000..674b27118c
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.streaming.{Time, Duration, StreamingContext, DStream}
+
+/**
+ * This is the abstract base class for all input streams. This class provides to methods
+ * start() and stop() which called by the scheduler to start and stop receiving data/
+ * Input streams that can generated RDDs from new data just by running a service on
+ * the driver node (that is, without running a receiver onworker nodes) can be
+ * implemented by directly subclassing this InputDStream. For example,
+ * FileInputDStream, a subclass of InputDStream, monitors a HDFS directory for
+ * new files and generates RDDs on the new files. For implementing input streams
+ * that requires running a receiver on the worker nodes, use NetworkInputDStream
+ * as the parent class.
+ */
+abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext)
+ extends DStream[T](ssc_) {
+
+ var lastValidTime: Time = null
+
+ /**
+ * Checks whether the 'time' is valid wrt slideDuration for generating RDD.
+ * Additionally it also ensures valid times are in strictly increasing order.
+ * This ensures that InputDStream.compute() is called strictly on increasing
+ * times.
+ */
+ override protected def isTimeValid(time: Time): Boolean = {
+ if (!super.isTimeValid(time)) {
+ false // Time not valid
+ } else {
+ // Time is valid, but check it it is more than lastValidTime
+ if (lastValidTime != null && time < lastValidTime) {
+ logWarning("isTimeValid called with " + time + " where as last valid time is " + lastValidTime)
+ }
+ lastValidTime = time
+ true
+ }
+ }
+
+ override def dependencies = List()
+
+ override def slideDuration: Duration = {
+ if (ssc == null) throw new Exception("ssc is null")
+ if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
+ ssc.graph.batchDuration
+ }
+
+ /** Method called to start receiving data. Subclasses must implement this method. */
+ def start()
+
+ /** Method called to stop receiving data. Subclasses must implement this method. */
+ def stop()
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
new file mode 100644
index 0000000000..51e913675d
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Time, DStreamCheckpointData, StreamingContext}
+
+import java.util.Properties
+import java.util.concurrent.Executors
+
+import kafka.consumer._
+import kafka.message.{Message, MessageSet, MessageAndMetadata}
+import kafka.serializer.Decoder
+import kafka.utils.{Utils, ZKGroupTopicDirs}
+import kafka.utils.ZkUtils._
+import kafka.utils.ZKStringSerializer
+import org.I0Itec.zkclient._
+
+import scala.collection.Map
+import scala.collection.mutable.HashMap
+import scala.collection.JavaConversions._
+
+
+/**
+ * Input stream that pulls messages from a Kafka Broker.
+ *
+ * @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel RDD storage level.
+ */
+private[streaming]
+class KafkaInputDStream[T: ClassManifest, D <: Decoder[_]: Manifest](
+ @transient ssc_ : StreamingContext,
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ storageLevel: StorageLevel
+ ) extends NetworkInputDStream[T](ssc_ ) with Logging {
+
+
+ def getReceiver(): NetworkReceiver[T] = {
+ new KafkaReceiver[T, D](kafkaParams, topics, storageLevel)
+ .asInstanceOf[NetworkReceiver[T]]
+ }
+}
+
+private[streaming]
+class KafkaReceiver[T: ClassManifest, D <: Decoder[_]: Manifest](
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ storageLevel: StorageLevel
+ ) extends NetworkReceiver[Any] {
+
+ // Handles pushing data into the BlockManager
+ lazy protected val blockGenerator = new BlockGenerator(storageLevel)
+ // Connection to Kafka
+ var consumerConnector : ConsumerConnector = null
+
+ def onStop() {
+ blockGenerator.stop()
+ }
+
+ def onStart() {
+
+ blockGenerator.start()
+
+ // In case we are using multiple Threads to handle Kafka Messages
+ val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
+
+ logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("groupid"))
+
+ // Kafka connection properties
+ val props = new Properties()
+ kafkaParams.foreach(param => props.put(param._1, param._2))
+
+ // Create the connection to the cluster
+ logInfo("Connecting to Zookeper: " + kafkaParams("zk.connect"))
+ val consumerConfig = new ConsumerConfig(props)
+ consumerConnector = Consumer.create(consumerConfig)
+ logInfo("Connected to " + kafkaParams("zk.connect"))
+
+ // When autooffset.reset is defined, it is our responsibility to try and whack the
+ // consumer group zk node.
+ if (kafkaParams.contains("autooffset.reset")) {
+ tryZookeeperConsumerGroupCleanup(kafkaParams("zk.connect"), kafkaParams("groupid"))
+ }
+
+ // Create Threads for each Topic/Message Stream we are listening
+ val decoder = manifest[D].erasure.newInstance.asInstanceOf[Decoder[T]]
+ val topicMessageStreams = consumerConnector.createMessageStreams(topics, decoder)
+
+ // Start the messages handler for each partition
+ topicMessageStreams.values.foreach { streams =>
+ streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
+ }
+ }
+
+ // Handles Kafka Messages
+ private class MessageHandler[T: ClassManifest](stream: KafkaStream[T]) extends Runnable {
+ def run() {
+ logInfo("Starting MessageHandler.")
+ for (msgAndMetadata <- stream) {
+ blockGenerator += msgAndMetadata.message
+ }
+ }
+ }
+
+ // It is our responsibility to delete the consumer group when specifying autooffset.reset. This is because
+ // Kafka 0.7.2 only honors this param when the group is not in zookeeper.
+ //
+ // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied from Kafkas'
+ // ConsoleConsumer. See code related to 'autooffset.reset' when it is set to 'smallest'/'largest':
+ // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+ private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
+ try {
+ val dir = "/consumers/" + groupId
+ logInfo("Cleaning up temporary zookeeper data under " + dir + ".")
+ val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
+ zk.deleteRecursive(dir)
+ zk.close()
+ } catch {
+ case _ => // swallow
+ }
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
new file mode 100644
index 0000000000..1d79d707bb
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.RDD
+
+private[streaming]
+class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
+ parent: DStream[T],
+ mapPartFunc: Iterator[T] => Iterator[U],
+ preservePartitioning: Boolean
+ ) extends DStream[U](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideDuration: Duration = parent.slideDuration
+
+ override def compute(validTime: Time): Option[RDD[U]] = {
+ parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
+ }
+}
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
new file mode 100644
index 0000000000..312e0c0567
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.RDD
+import org.apache.spark.SparkContext._
+
+private[streaming]
+class MapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
+ parent: DStream[(K, V)],
+ mapValueFunc: V => U
+ ) extends DStream[(K, U)](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideDuration: Duration = parent.slideDuration
+
+ override def compute(validTime: Time): Option[RDD[(K, U)]] = {
+ parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc))
+ }
+}
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
new file mode 100644
index 0000000000..af688dde5f
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.RDD
+
+private[streaming]
+class MappedDStream[T: ClassManifest, U: ClassManifest] (
+ parent: DStream[T],
+ mapFunc: T => U
+ ) extends DStream[U](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideDuration: Duration = parent.slideDuration
+
+ override def compute(validTime: Time): Option[RDD[U]] = {
+ parent.getOrCompute(validTime).map(_.map[U](mapFunc))
+ }
+}
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
new file mode 100644
index 0000000000..3d68da36a2
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.streaming.{Time, StreamingContext, AddBlocks, RegisterReceiver, DeregisterReceiver}
+
+import org.apache.spark.{Logging, SparkEnv, RDD}
+import org.apache.spark.rdd.BlockRDD
+import org.apache.spark.storage.StorageLevel
+
+import scala.collection.mutable.ArrayBuffer
+
+import java.nio.ByteBuffer
+
+import akka.actor.{Props, Actor}
+import akka.pattern.ask
+import akka.dispatch.Await
+import akka.util.duration._
+import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
+import java.util.concurrent.ArrayBlockingQueue
+
+/**
+ * Abstract class for defining any InputDStream that has to start a receiver on worker
+ * nodes to receive external data. Specific implementations of NetworkInputDStream must
+ * define the getReceiver() function that gets the receiver object of type
+ * [[org.apache.spark.streaming.dstream.NetworkReceiver]] that will be sent to the workers to receive
+ * data.
+ * @param ssc_ Streaming context that will execute this input stream
+ * @tparam T Class type of the object of this stream
+ */
+abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : StreamingContext)
+ extends InputDStream[T](ssc_) {
+
+ // This is an unique identifier that is used to match the network receiver with the
+ // corresponding network input stream.
+ val id = ssc.getNewNetworkStreamId()
+
+ /**
+ * Gets the receiver object that will be sent to the worker nodes
+ * to receive data. This method needs to defined by any specific implementation
+ * of a NetworkInputDStream.
+ */
+ def getReceiver(): NetworkReceiver[T]
+
+ // Nothing to start or stop as both taken care of by the NetworkInputTracker.
+ def start() {}
+
+ def stop() {}
+
+ override def compute(validTime: Time): Option[RDD[T]] = {
+ // If this is called for any time before the start time of the context,
+ // then this returns an empty RDD. This may happen when recovering from a
+ // master failure
+ if (validTime >= graph.startTime) {
+ val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime)
+ Some(new BlockRDD[T](ssc.sc, blockIds))
+ } else {
+ Some(new BlockRDD[T](ssc.sc, Array[String]()))
+ }
+ }
+}
+
+
+private[streaming] sealed trait NetworkReceiverMessage
+private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage
+private[streaming] case class ReportBlock(blockId: String, metadata: Any) extends NetworkReceiverMessage
+private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage
+
+/**
+ * Abstract class of a receiver that can be run on worker nodes to receive external data. See
+ * [[org.apache.spark.streaming.dstream.NetworkInputDStream]] for an explanation.
+ */
+abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Logging {
+
+ initLogging()
+
+ lazy protected val env = SparkEnv.get
+
+ lazy protected val actor = env.actorSystem.actorOf(
+ Props(new NetworkReceiverActor()), "NetworkReceiver-" + streamId)
+
+ lazy protected val receivingThread = Thread.currentThread()
+
+ protected var streamId: Int = -1
+
+ /**
+ * This method will be called to start receiving data. All your receiver
+ * starting code should be implemented by defining this function.
+ */
+ protected def onStart()
+
+ /** This method will be called to stop receiving data. */
+ protected def onStop()
+
+ /** Conveys a placement preference (hostname) for this receiver. */
+ def getLocationPreference() : Option[String] = None
+
+ /**
+ * Starts the receiver. First is accesses all the lazy members to
+ * materialize them. Then it calls the user-defined onStart() method to start
+ * other threads, etc required to receiver the data.
+ */
+ def start() {
+ try {
+ // Access the lazy vals to materialize them
+ env
+ actor
+ receivingThread
+
+ // Call user-defined onStart()
+ onStart()
+ } catch {
+ case ie: InterruptedException =>
+ logInfo("Receiving thread interrupted")
+ //println("Receiving thread interrupted")
+ case e: Exception =>
+ stopOnError(e)
+ }
+ }
+
+ /**
+ * Stops the receiver. First it interrupts the main receiving thread,
+ * that is, the thread that called receiver.start(). Then it calls the user-defined
+ * onStop() method to stop other threads and/or do cleanup.
+ */
+ def stop() {
+ receivingThread.interrupt()
+ onStop()
+ //TODO: terminate the actor
+ }
+
+ /**
+ * Stops the receiver and reports exception to the tracker.
+ * This should be called whenever an exception is to be handled on any thread
+ * of the receiver.
+ */
+ protected def stopOnError(e: Exception) {
+ logError("Error receiving data", e)
+ stop()
+ actor ! ReportError(e.toString)
+ }
+
+
+ /**
+ * Pushes a block (as an ArrayBuffer filled with data) into the block manager.
+ */
+ def pushBlock(blockId: String, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
+ env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
+ actor ! ReportBlock(blockId, metadata)
+ }
+
+ /**
+ * Pushes a block (as bytes) into the block manager.
+ */
+ def pushBlock(blockId: String, bytes: ByteBuffer, metadata: Any, level: StorageLevel) {
+ env.blockManager.putBytes(blockId, bytes, level)
+ actor ! ReportBlock(blockId, metadata)
+ }
+
+ /** A helper actor that communicates with the NetworkInputTracker */
+ private class NetworkReceiverActor extends Actor {
+ logInfo("Attempting to register with tracker")
+ val ip = System.getProperty("spark.driver.host", "localhost")
+ val port = System.getProperty("spark.driver.port", "7077").toInt
+ val url = "akka://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
+ val tracker = env.actorSystem.actorFor(url)
+ val timeout = 5.seconds
+
+ override def preStart() {
+ val future = tracker.ask(RegisterReceiver(streamId, self))(timeout)
+ Await.result(future, timeout)
+ }
+
+ override def receive() = {
+ case ReportBlock(blockId, metadata) =>
+ tracker ! AddBlocks(streamId, Array(blockId), metadata)
+ case ReportError(msg) =>
+ tracker ! DeregisterReceiver(streamId, msg)
+ case StopReceiver(msg) =>
+ stop()
+ tracker ! DeregisterReceiver(streamId, msg)
+ }
+ }
+
+ protected[streaming] def setStreamId(id: Int) {
+ streamId = id
+ }
+
+ /**
+ * Batches objects created by a [[org.apache.spark.streaming.dstream.NetworkReceiver]] and puts them into
+ * appropriately named blocks at regular intervals. This class starts two threads,
+ * one to periodically start a new batch and prepare the previous batch of as a block,
+ * the other to push the blocks into the block manager.
+ */
+ class BlockGenerator(storageLevel: StorageLevel)
+ extends Serializable with Logging {
+
+ case class Block(id: String, buffer: ArrayBuffer[T], metadata: Any = null)
+
+ val clock = new SystemClock()
+ val blockInterval = System.getProperty("spark.streaming.blockInterval", "200").toLong
+ val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
+ val blockStorageLevel = storageLevel
+ val blocksForPushing = new ArrayBlockingQueue[Block](1000)
+ val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
+
+ var currentBuffer = new ArrayBuffer[T]
+
+ def start() {
+ blockIntervalTimer.start()
+ blockPushingThread.start()
+ logInfo("Data handler started")
+ }
+
+ def stop() {
+ blockIntervalTimer.stop()
+ blockPushingThread.interrupt()
+ logInfo("Data handler stopped")
+ }
+
+ def += (obj: T) {
+ currentBuffer += obj
+ }
+
+ private def updateCurrentBuffer(time: Long) {
+ try {
+ val newBlockBuffer = currentBuffer
+ currentBuffer = new ArrayBuffer[T]
+ if (newBlockBuffer.size > 0) {
+ val blockId = "input-" + NetworkReceiver.this.streamId + "-" + (time - blockInterval)
+ val newBlock = new Block(blockId, newBlockBuffer)
+ blocksForPushing.add(newBlock)
+ }
+ } catch {
+ case ie: InterruptedException =>
+ logInfo("Block interval timer thread interrupted")
+ case e: Exception =>
+ NetworkReceiver.this.stop()
+ }
+ }
+
+ private def keepPushingBlocks() {
+ logInfo("Block pushing thread started")
+ try {
+ while(true) {
+ val block = blocksForPushing.take()
+ NetworkReceiver.this.pushBlock(block.id, block.buffer, block.metadata, storageLevel)
+ }
+ } catch {
+ case ie: InterruptedException =>
+ logInfo("Block pushing thread interrupted")
+ case e: Exception =>
+ NetworkReceiver.this.stop()
+ }
+ }
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
new file mode 100644
index 0000000000..15782f5c11
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.streaming.StreamingContext
+
+private[streaming]
+class PluggableInputDStream[T: ClassManifest](
+ @transient ssc_ : StreamingContext,
+ receiver: NetworkReceiver[T]) extends NetworkInputDStream[T](ssc_) {
+
+ def getReceiver(): NetworkReceiver[T] = {
+ receiver
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
new file mode 100644
index 0000000000..b43ecaeebe
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.RDD
+import org.apache.spark.rdd.UnionRDD
+
+import scala.collection.mutable.Queue
+import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.streaming.{Time, StreamingContext}
+
+private[streaming]
+class QueueInputDStream[T: ClassManifest](
+ @transient ssc: StreamingContext,
+ val queue: Queue[RDD[T]],
+ oneAtATime: Boolean,
+ defaultRDD: RDD[T]
+ ) extends InputDStream[T](ssc) {
+
+ override def start() { }
+
+ override def stop() { }
+
+ override def compute(validTime: Time): Option[RDD[T]] = {
+ val buffer = new ArrayBuffer[RDD[T]]()
+ if (oneAtATime && queue.size > 0) {
+ buffer += queue.dequeue()
+ } else {
+ buffer ++= queue
+ }
+ if (buffer.size > 0) {
+ if (oneAtATime) {
+ Some(buffer.head)
+ } else {
+ Some(new UnionRDD(ssc.sc, buffer.toSeq))
+ }
+ } else if (defaultRDD != null) {
+ Some(defaultRDD)
+ } else {
+ None
+ }
+ }
+
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
new file mode 100644
index 0000000000..c91f12ecd7
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+
+import java.net.InetSocketAddress
+import java.nio.ByteBuffer
+import java.nio.channels.{ReadableByteChannel, SocketChannel}
+import java.io.EOFException
+import java.util.concurrent.ArrayBlockingQueue
+
+
+/**
+ * An input stream that reads blocks of serialized objects from a given network address.
+ * The blocks will be inserted directly into the block store. This is the fastest way to get
+ * data into Spark Streaming, though it requires the sender to batch data and serialize it
+ * in the format that the system is configured with.
+ */
+private[streaming]
+class RawInputDStream[T: ClassManifest](
+ @transient ssc_ : StreamingContext,
+ host: String,
+ port: Int,
+ storageLevel: StorageLevel
+ ) extends NetworkInputDStream[T](ssc_ ) with Logging {
+
+ def getReceiver(): NetworkReceiver[T] = {
+ new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[NetworkReceiver[T]]
+ }
+}
+
+private[streaming]
+class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel)
+ extends NetworkReceiver[Any] {
+
+ var blockPushingThread: Thread = null
+
+ override def getLocationPreference = None
+
+ def onStart() {
+ // Open a socket to the target address and keep reading from it
+ logInfo("Connecting to " + host + ":" + port)
+ val channel = SocketChannel.open()
+ channel.configureBlocking(true)
+ channel.connect(new InetSocketAddress(host, port))
+ logInfo("Connected to " + host + ":" + port)
+
+ val queue = new ArrayBlockingQueue[ByteBuffer](2)
+
+ blockPushingThread = new Thread {
+ setDaemon(true)
+ override def run() {
+ var nextBlockNumber = 0
+ while (true) {
+ val buffer = queue.take()
+ val blockId = "input-" + streamId + "-" + nextBlockNumber
+ nextBlockNumber += 1
+ pushBlock(blockId, buffer, null, storageLevel)
+ }
+ }
+ }
+ blockPushingThread.start()
+
+ val lengthBuffer = ByteBuffer.allocate(4)
+ while (true) {
+ lengthBuffer.clear()
+ readFully(channel, lengthBuffer)
+ lengthBuffer.flip()
+ val length = lengthBuffer.getInt()
+ val dataBuffer = ByteBuffer.allocate(length)
+ readFully(channel, dataBuffer)
+ dataBuffer.flip()
+ logInfo("Read a block with " + length + " bytes")
+ queue.put(dataBuffer)
+ }
+ }
+
+ def onStop() {
+ if (blockPushingThread != null) blockPushingThread.interrupt()
+ }
+
+ /** Read a buffer fully from a given Channel */
+ private def readFully(channel: ReadableByteChannel, dest: ByteBuffer) {
+ while (dest.position < dest.limit) {
+ if (channel.read(dest) == -1) {
+ throw new EOFException("End of channel")
+ }
+ }
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
new file mode 100644
index 0000000000..b6c672f899
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.streaming.StreamingContext._
+
+import org.apache.spark.RDD
+import org.apache.spark.rdd.{CoGroupedRDD, MapPartitionsRDD}
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.storage.StorageLevel
+
+import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.streaming.{Duration, Interval, Time, DStream}
+
+private[streaming]
+class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
+ parent: DStream[(K, V)],
+ reduceFunc: (V, V) => V,
+ invReduceFunc: (V, V) => V,
+ filterFunc: Option[((K, V)) => Boolean],
+ _windowDuration: Duration,
+ _slideDuration: Duration,
+ partitioner: Partitioner
+ ) extends DStream[(K,V)](parent.ssc) {
+
+ assert(_windowDuration.isMultipleOf(parent.slideDuration),
+ "The window duration of ReducedWindowedDStream (" + _slideDuration + ") " +
+ "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
+ )
+
+ assert(_slideDuration.isMultipleOf(parent.slideDuration),
+ "The slide duration of ReducedWindowedDStream (" + _slideDuration + ") " +
+ "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
+ )
+
+ // Reduce each batch of data using reduceByKey which will be further reduced by window
+ // by ReducedWindowedDStream
+ val reducedStream = parent.reduceByKey(reduceFunc, partitioner)
+
+ // Persist RDDs to memory by default as these RDDs are going to be reused.
+ super.persist(StorageLevel.MEMORY_ONLY_SER)
+ reducedStream.persist(StorageLevel.MEMORY_ONLY_SER)
+
+ def windowDuration: Duration = _windowDuration
+
+ override def dependencies = List(reducedStream)
+
+ override def slideDuration: Duration = _slideDuration
+
+ override val mustCheckpoint = true
+
+ override def parentRememberDuration: Duration = rememberDuration + windowDuration
+
+ override def persist(storageLevel: StorageLevel): DStream[(K,V)] = {
+ super.persist(storageLevel)
+ reducedStream.persist(storageLevel)
+ this
+ }
+
+ override def checkpoint(interval: Duration): DStream[(K, V)] = {
+ super.checkpoint(interval)
+ //reducedStream.checkpoint(interval)
+ this
+ }
+
+ override def compute(validTime: Time): Option[RDD[(K, V)]] = {
+ val reduceF = reduceFunc
+ val invReduceF = invReduceFunc
+
+ val currentTime = validTime
+ val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration, currentTime)
+ val previousWindow = currentWindow - slideDuration
+
+ logDebug("Window time = " + windowDuration)
+ logDebug("Slide time = " + slideDuration)
+ logDebug("ZeroTime = " + zeroTime)
+ logDebug("Current window = " + currentWindow)
+ logDebug("Previous window = " + previousWindow)
+
+ // _____________________________
+ // | previous window _________|___________________
+ // |___________________| current window | --------------> Time
+ // |_____________________________|
+ //
+ // |________ _________| |________ _________|
+ // | |
+ // V V
+ // old RDDs new RDDs
+ //
+
+ // Get the RDDs of the reduced values in "old time steps"
+ val oldRDDs =
+ reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
+ logDebug("# old RDDs = " + oldRDDs.size)
+
+ // Get the RDDs of the reduced values in "new time steps"
+ val newRDDs =
+ reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)
+ logDebug("# new RDDs = " + newRDDs.size)
+
+ // Get the RDD of the reduced value of the previous window
+ val previousWindowRDD =
+ getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]()))
+
+ // Make the list of RDDs that needs to cogrouped together for reducing their reduced values
+ val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs
+
+ // Cogroup the reduced RDDs and merge the reduced values
+ val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]], partitioner)
+ //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _
+
+ val numOldValues = oldRDDs.size
+ val numNewValues = newRDDs.size
+
+ val mergeValues = (seqOfValues: Seq[Seq[V]]) => {
+ if (seqOfValues.size != 1 + numOldValues + numNewValues) {
+ throw new Exception("Unexpected number of sequences of reduced values")
+ }
+ // Getting reduced values "old time steps" that will be removed from current window
+ val oldValues = (1 to numOldValues).map(i => seqOfValues(i)).filter(!_.isEmpty).map(_.head)
+ // Getting reduced values "new time steps"
+ val newValues =
+ (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
+
+ if (seqOfValues(0).isEmpty) {
+ // If previous window's reduce value does not exist, then at least new values should exist
+ if (newValues.isEmpty) {
+ throw new Exception("Neither previous window has value for key, nor new values found. " +
+ "Are you sure your key class hashes consistently?")
+ }
+ // Reduce the new values
+ newValues.reduce(reduceF) // return
+ } else {
+ // Get the previous window's reduced value
+ var tempValue = seqOfValues(0).head
+ // If old values exists, then inverse reduce then from previous value
+ if (!oldValues.isEmpty) {
+ tempValue = invReduceF(tempValue, oldValues.reduce(reduceF))
+ }
+ // If new values exists, then reduce them with previous value
+ if (!newValues.isEmpty) {
+ tempValue = reduceF(tempValue, newValues.reduce(reduceF))
+ }
+ tempValue // return
+ }
+ }
+
+ val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValues)
+
+ if (filterFunc.isDefined) {
+ Some(mergedValuesRDD.filter(filterFunc.get))
+ } else {
+ Some(mergedValuesRDD)
+ }
+ }
+}
+
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
new file mode 100644
index 0000000000..3a0bd2acd7
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.{RDD, Partitioner}
+import org.apache.spark.SparkContext._
+import org.apache.spark.streaming.{Duration, DStream, Time}
+
+private[streaming]
+class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
+ parent: DStream[(K,V)],
+ createCombiner: V => C,
+ mergeValue: (C, V) => C,
+ mergeCombiner: (C, C) => C,
+ partitioner: Partitioner
+ ) extends DStream [(K,C)] (parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideDuration: Duration = parent.slideDuration
+
+ override def compute(validTime: Time): Option[RDD[(K,C)]] = {
+ parent.getOrCompute(validTime) match {
+ case Some(rdd) =>
+ Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner))
+ case None => None
+ }
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
new file mode 100644
index 0000000000..e2539c7396
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.NextIterator
+
+import java.io._
+import java.net.Socket
+
+private[streaming]
+class SocketInputDStream[T: ClassManifest](
+ @transient ssc_ : StreamingContext,
+ host: String,
+ port: Int,
+ bytesToObjects: InputStream => Iterator[T],
+ storageLevel: StorageLevel
+ ) extends NetworkInputDStream[T](ssc_) {
+
+ def getReceiver(): NetworkReceiver[T] = {
+ new SocketReceiver(host, port, bytesToObjects, storageLevel)
+ }
+}
+
+private[streaming]
+class SocketReceiver[T: ClassManifest](
+ host: String,
+ port: Int,
+ bytesToObjects: InputStream => Iterator[T],
+ storageLevel: StorageLevel
+ ) extends NetworkReceiver[T] {
+
+ lazy protected val blockGenerator = new BlockGenerator(storageLevel)
+
+ override def getLocationPreference = None
+
+ protected def onStart() {
+ logInfo("Connecting to " + host + ":" + port)
+ val socket = new Socket(host, port)
+ logInfo("Connected to " + host + ":" + port)
+ blockGenerator.start()
+ val iterator = bytesToObjects(socket.getInputStream())
+ while(iterator.hasNext) {
+ val obj = iterator.next
+ blockGenerator += obj
+ }
+ }
+
+ protected def onStop() {
+ blockGenerator.stop()
+ }
+
+}
+
+private[streaming]
+object SocketReceiver {
+
+ /**
+ * This methods translates the data from an inputstream (say, from a socket)
+ * to '\n' delimited strings and returns an iterator to access the strings.
+ */
+ def bytesToLines(inputStream: InputStream): Iterator[String] = {
+ val dataInputStream = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"))
+ new NextIterator[String] {
+ protected override def getNext() = {
+ val nextValue = dataInputStream.readLine()
+ if (nextValue == null) {
+ finished = true
+ }
+ nextValue
+ }
+
+ protected override def close() {
+ dataInputStream.close()
+ }
+ }
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
new file mode 100644
index 0000000000..c1c9f808f0
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.RDD
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Duration, Time, DStream}
+
+private[streaming]
+class StateDStream[K: ClassManifest, V: ClassManifest, S: ClassManifest](
+ parent: DStream[(K, V)],
+ updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
+ partitioner: Partitioner,
+ preservePartitioning: Boolean
+ ) extends DStream[(K, S)](parent.ssc) {
+
+ super.persist(StorageLevel.MEMORY_ONLY_SER)
+
+ override def dependencies = List(parent)
+
+ override def slideDuration: Duration = parent.slideDuration
+
+ override val mustCheckpoint = true
+
+ override def compute(validTime: Time): Option[RDD[(K, S)]] = {
+
+ // Try to get the previous state RDD
+ getOrCompute(validTime - slideDuration) match {
+
+ case Some(prevStateRDD) => { // If previous state RDD exists
+
+ // Try to get the parent RDD
+ parent.getOrCompute(validTime) match {
+ case Some(parentRDD) => { // If parent RDD exists, then compute as usual
+
+ // Define the function for the mapPartition operation on cogrouped RDD;
+ // first map the cogrouped tuple to tuples of required type,
+ // and then apply the update function
+ val updateFuncLocal = updateFunc
+ val finalFunc = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => {
+ val i = iterator.map(t => {
+ (t._1, t._2._1, t._2._2.headOption)
+ })
+ updateFuncLocal(i)
+ }
+ val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
+ val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
+ //logDebug("Generating state RDD for time " + validTime)
+ return Some(stateRDD)
+ }
+ case None => { // If parent RDD does not exist
+
+ // Re-apply the update function to the old state RDD
+ val updateFuncLocal = updateFunc
+ val finalFunc = (iterator: Iterator[(K, S)]) => {
+ val i = iterator.map(t => (t._1, Seq[V](), Option(t._2)))
+ updateFuncLocal(i)
+ }
+ val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning)
+ return Some(stateRDD)
+ }
+ }
+ }
+
+ case None => { // If previous session RDD does not exist (first input data)
+
+ // Try to get the parent RDD
+ parent.getOrCompute(validTime) match {
+ case Some(parentRDD) => { // If parent RDD exists, then compute as usual
+
+ // Define the function for the mapPartition operation on grouped RDD;
+ // first map the grouped tuple to tuples of required type,
+ // and then apply the update function
+ val updateFuncLocal = updateFunc
+ val finalFunc = (iterator: Iterator[(K, Seq[V])]) => {
+ updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2, None)))
+ }
+
+ val groupedRDD = parentRDD.groupByKey(partitioner)
+ val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning)
+ //logDebug("Generating state RDD for time " + validTime + " (first)")
+ return Some(sessionRDD)
+ }
+ case None => { // If parent RDD does not exist, then nothing to do!
+ //logDebug("Not generating state RDD (no previous state, no parent)")
+ return None
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
new file mode 100644
index 0000000000..edba2032b4
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.RDD
+import org.apache.spark.streaming.{Duration, DStream, Time}
+
+private[streaming]
+class TransformedDStream[T: ClassManifest, U: ClassManifest] (
+ parent: DStream[T],
+ transformFunc: (RDD[T], Time) => RDD[U]
+ ) extends DStream[U](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideDuration: Duration = parent.slideDuration
+
+ override def compute(validTime: Time): Option[RDD[U]] = {
+ parent.getOrCompute(validTime).map(transformFunc(_, validTime))
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala
new file mode 100644
index 0000000000..387e15b0e6
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark._
+import org.apache.spark.streaming._
+import storage.StorageLevel
+import twitter4j._
+import twitter4j.auth.Authorization
+import java.util.prefs.Preferences
+import twitter4j.conf.ConfigurationBuilder
+import twitter4j.conf.PropertyConfiguration
+import twitter4j.auth.OAuthAuthorization
+import twitter4j.auth.AccessToken
+
+/* A stream of Twitter statuses, potentially filtered by one or more keywords.
+*
+* @constructor create a new Twitter stream using the supplied Twitter4J authentication credentials.
+* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
+* such that this may return a sampled subset of all tweets during each interval.
+*
+* If no Authorization object is provided, initializes OAuth authorization using the system
+* properties twitter4j.oauth.consumerKey, .consumerSecret, .accessToken and .accessTokenSecret.
+*/
+private[streaming]
+class TwitterInputDStream(
+ @transient ssc_ : StreamingContext,
+ twitterAuth: Option[Authorization],
+ filters: Seq[String],
+ storageLevel: StorageLevel
+ ) extends NetworkInputDStream[Status](ssc_) {
+
+ private def createOAuthAuthorization(): Authorization = {
+ new OAuthAuthorization(new ConfigurationBuilder().build())
+ }
+
+ private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())
+
+ override def getReceiver(): NetworkReceiver[Status] = {
+ new TwitterReceiver(authorization, filters, storageLevel)
+ }
+}
+
+private[streaming]
+class TwitterReceiver(
+ twitterAuth: Authorization,
+ filters: Seq[String],
+ storageLevel: StorageLevel
+ ) extends NetworkReceiver[Status] {
+
+ var twitterStream: TwitterStream = _
+ lazy val blockGenerator = new BlockGenerator(storageLevel)
+
+ protected override def onStart() {
+ blockGenerator.start()
+ twitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
+ twitterStream.addListener(new StatusListener {
+ def onStatus(status: Status) = {
+ blockGenerator += status
+ }
+ // Unimplemented
+ def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
+ def onTrackLimitationNotice(i: Int) {}
+ def onScrubGeo(l: Long, l1: Long) {}
+ def onStallWarning(stallWarning: StallWarning) {}
+ def onException(e: Exception) { stopOnError(e) }
+ })
+
+ val query: FilterQuery = new FilterQuery
+ if (filters.size > 0) {
+ query.track(filters.toArray)
+ twitterStream.filter(query)
+ } else {
+ twitterStream.sample()
+ }
+ logInfo("Twitter receiver started")
+ }
+
+ protected override def onStop() {
+ blockGenerator.stop()
+ twitterStream.shutdown()
+ logInfo("Twitter receiver stopped")
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
new file mode 100644
index 0000000000..97eab97b2f
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.RDD
+import collection.mutable.ArrayBuffer
+import org.apache.spark.rdd.UnionRDD
+
+private[streaming]
+class UnionDStream[T: ClassManifest](parents: Array[DStream[T]])
+ extends DStream[T](parents.head.ssc) {
+
+ if (parents.length == 0) {
+ throw new IllegalArgumentException("Empty array of parents")
+ }
+
+ if (parents.map(_.ssc).distinct.size > 1) {
+ throw new IllegalArgumentException("Array of parents have different StreamingContexts")
+ }
+
+ if (parents.map(_.slideDuration).distinct.size > 1) {
+ throw new IllegalArgumentException("Array of parents have different slide times")
+ }
+
+ override def dependencies = parents.toList
+
+ override def slideDuration: Duration = parents.head.slideDuration
+
+ override def compute(validTime: Time): Option[RDD[T]] = {
+ val rdds = new ArrayBuffer[RDD[T]]()
+ parents.map(_.getOrCompute(validTime)).foreach(_ match {
+ case Some(rdd) => rdds += rdd
+ case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime)
+ })
+ if (rdds.size > 0) {
+ Some(new UnionRDD(ssc.sc, rdds))
+ } else {
+ None
+ }
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
new file mode 100644
index 0000000000..dbbea39e81
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.RDD
+import org.apache.spark.rdd.UnionRDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Duration, Interval, Time, DStream}
+
+private[streaming]
+class WindowedDStream[T: ClassManifest](
+ parent: DStream[T],
+ _windowDuration: Duration,
+ _slideDuration: Duration)
+ extends DStream[T](parent.ssc) {
+
+ if (!_windowDuration.isMultipleOf(parent.slideDuration))
+ throw new Exception("The window duration of WindowedDStream (" + _slideDuration + ") " +
+ "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+
+ if (!_slideDuration.isMultipleOf(parent.slideDuration))
+ throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " +
+ "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+
+ parent.persist(StorageLevel.MEMORY_ONLY_SER)
+
+ def windowDuration: Duration = _windowDuration
+
+ override def dependencies = List(parent)
+
+ override def slideDuration: Duration = _slideDuration
+
+ override def parentRememberDuration: Duration = rememberDuration + windowDuration
+
+ override def compute(validTime: Time): Option[RDD[T]] = {
+ val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
+ Some(new UnionRDD(ssc.sc, parent.slice(currentWindow)))
+ }
+}
+
+
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
new file mode 100644
index 0000000000..4b5d8c467e
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.receivers
+
+import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy }
+import akka.actor.{ actorRef2Scala, ActorRef }
+import akka.actor.{ PossiblyHarmful, OneForOneStrategy }
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.NetworkReceiver
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable.ArrayBuffer
+
+/** A helper with set of defaults for supervisor strategy **/
+object ReceiverSupervisorStrategy {
+
+ import akka.util.duration._
+ import akka.actor.SupervisorStrategy._
+
+ val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
+ 15 millis) {
+ case _: RuntimeException ⇒ Restart
+ case _: Exception ⇒ Escalate
+ }
+}
+
+/**
+ * A receiver trait to be mixed in with your Actor to gain access to
+ * pushBlock API.
+ *
+ * Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html
+ *
+ * @example {{{
+ * class MyActor extends Actor with Receiver{
+ * def receive {
+ * case anything :String ⇒ pushBlock(anything)
+ * }
+ * }
+ * //Can be plugged in actorStream as follows
+ * ssc.actorStream[String](Props(new MyActor),"MyActorReceiver")
+ *
+ * }}}
+ *
+ * @note An important point to note:
+ * Since Actor may exist outside the spark framework, It is thus user's responsibility
+ * to ensure the type safety, i.e parametrized type of push block and InputDStream
+ * should be same.
+ *
+ */
+trait Receiver { self: Actor ⇒
+ def pushBlock[T: ClassManifest](iter: Iterator[T]) {
+ context.parent ! Data(iter)
+ }
+
+ def pushBlock[T: ClassManifest](data: T) {
+ context.parent ! Data(data)
+ }
+
+}
+
+/**
+ * Statistics for querying the supervisor about state of workers
+ */
+case class Statistics(numberOfMsgs: Int,
+ numberOfWorkers: Int,
+ numberOfHiccups: Int,
+ otherInfo: String)
+
+/** Case class to receive data sent by child actors **/
+private[streaming] case class Data[T: ClassManifest](data: T)
+
+/**
+ * Provides Actors as receivers for receiving stream.
+ *
+ * As Actors can also be used to receive data from almost any stream source.
+ * A nice set of abstraction(s) for actors as receivers is already provided for
+ * a few general cases. It is thus exposed as an API where user may come with
+ * his own Actor to run as receiver for Spark Streaming input source.
+ *
+ * This starts a supervisor actor which starts workers and also provides
+ * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance].
+ *
+ * Here's a way to start more supervisor/workers as its children.
+ *
+ * @example {{{
+ * context.parent ! Props(new Supervisor)
+ * }}} OR {{{
+ * context.parent ! Props(new Worker,"Worker")
+ * }}}
+ *
+ *
+ */
+private[streaming] class ActorReceiver[T: ClassManifest](
+ props: Props,
+ name: String,
+ storageLevel: StorageLevel,
+ receiverSupervisorStrategy: SupervisorStrategy)
+ extends NetworkReceiver[T] {
+
+ protected lazy val blocksGenerator: BlockGenerator =
+ new BlockGenerator(storageLevel)
+
+ protected lazy val supervisor = env.actorSystem.actorOf(Props(new Supervisor),
+ "Supervisor" + streamId)
+
+ private class Supervisor extends Actor {
+
+ override val supervisorStrategy = receiverSupervisorStrategy
+ val worker = context.actorOf(props, name)
+ logInfo("Started receiver worker at:" + worker.path)
+
+ val n: AtomicInteger = new AtomicInteger(0)
+ val hiccups: AtomicInteger = new AtomicInteger(0)
+
+ def receive = {
+
+ case Data(iter: Iterator[_]) ⇒ pushBlock(iter.asInstanceOf[Iterator[T]])
+
+ case Data(msg) ⇒
+ blocksGenerator += msg.asInstanceOf[T]
+ n.incrementAndGet
+
+ case props: Props ⇒
+ val worker = context.actorOf(props)
+ logInfo("Started receiver worker at:" + worker.path)
+ sender ! worker
+
+ case (props: Props, name: String) ⇒
+ val worker = context.actorOf(props, name)
+ logInfo("Started receiver worker at:" + worker.path)
+ sender ! worker
+
+ case _: PossiblyHarmful => hiccups.incrementAndGet()
+
+ case _: Statistics ⇒
+ val workers = context.children
+ sender ! Statistics(n.get, workers.size, hiccups.get, workers.mkString("\n"))
+
+ }
+ }
+
+ protected def pushBlock(iter: Iterator[T]) {
+ val buffer = new ArrayBuffer[T]
+ buffer ++= iter
+ pushBlock("block-" + streamId + "-" + System.nanoTime(), buffer, null, storageLevel)
+ }
+
+ protected def onStart() = {
+ blocksGenerator.start()
+ supervisor
+ logInfo("Supervision tree for receivers initialized at:" + supervisor.path)
+ }
+
+ protected def onStop() = {
+ supervisor ! PoisonPill
+ }
+
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala
new file mode 100644
index 0000000000..043bb8c8bf
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.receivers
+
+import akka.actor.Actor
+import akka.zeromq._
+
+import org.apache.spark.Logging
+
+/**
+ * A receiver to subscribe to ZeroMQ stream.
+ */
+private[streaming] class ZeroMQReceiver[T: ClassManifest](publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T])
+ extends Actor with Receiver with Logging {
+
+ override def preStart() = context.system.newSocket(SocketType.Sub, Listener(self),
+ Connect(publisherUrl), subscribe)
+
+ def receive: Receive = {
+
+ case Connecting ⇒ logInfo("connecting ...")
+
+ case m: ZMQMessage ⇒
+ logDebug("Received message for:" + m.firstFrameAsString)
+
+ //We ignore first frame for processing as it is the topic
+ val bytes = m.frames.tail.map(_.payload)
+ pushBlock(bytesToObjects(bytes))
+
+ case Closed ⇒ logInfo("received closed ")
+
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
new file mode 100644
index 0000000000..f67bb2f6ac
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+private[streaming]
+trait Clock {
+ def currentTime(): Long
+ def waitTillTime(targetTime: Long): Long
+}
+
+private[streaming]
+class SystemClock() extends Clock {
+
+ val minPollTime = 25L
+
+ def currentTime(): Long = {
+ System.currentTimeMillis()
+ }
+
+ def waitTillTime(targetTime: Long): Long = {
+ var currentTime = 0L
+ currentTime = System.currentTimeMillis()
+
+ var waitTime = targetTime - currentTime
+ if (waitTime <= 0) {
+ return currentTime
+ }
+
+ val pollTime = {
+ if (waitTime / 10.0 > minPollTime) {
+ (waitTime / 10.0).toLong
+ } else {
+ minPollTime
+ }
+ }
+
+
+ while (true) {
+ currentTime = System.currentTimeMillis()
+ waitTime = targetTime - currentTime
+
+ if (waitTime <= 0) {
+
+ return currentTime
+ }
+ val sleepTime =
+ if (waitTime < pollTime) {
+ waitTime
+ } else {
+ pollTime
+ }
+ Thread.sleep(sleepTime)
+ }
+ return -1
+ }
+}
+
+private[streaming]
+class ManualClock() extends Clock {
+
+ var time = 0L
+
+ def currentTime() = time
+
+ def setTime(timeToSet: Long) = {
+ this.synchronized {
+ time = timeToSet
+ this.notifyAll()
+ }
+ }
+
+ def addToTime(timeToAdd: Long) = {
+ this.synchronized {
+ time += timeToAdd
+ this.notifyAll()
+ }
+ }
+ def waitTillTime(targetTime: Long): Long = {
+ this.synchronized {
+ while (time < targetTime) {
+ this.wait(100)
+ }
+ }
+ return currentTime()
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
new file mode 100644
index 0000000000..50d72298e4
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import org.apache.spark.{Logging, RDD}
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.dstream.ForEachDStream
+import StreamingContext._
+
+import scala.util.Random
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+
+import java.io.{File, ObjectInputStream, IOException}
+import java.util.UUID
+
+import com.google.common.io.Files
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.fs.{FileUtil, FileSystem, Path}
+import org.apache.hadoop.conf.Configuration
+
+
+private[streaming]
+object MasterFailureTest extends Logging {
+ initLogging()
+
+ @volatile var killed = false
+ @volatile var killCount = 0
+
+ def main(args: Array[String]) {
+ if (args.size < 2) {
+ println(
+ "Usage: MasterFailureTest <local/HDFS directory> <# batches> [<batch size in milliseconds>]")
+ System.exit(1)
+ }
+ val directory = args(0)
+ val numBatches = args(1).toInt
+ val batchDuration = if (args.size > 2) Milliseconds(args(2).toInt) else Seconds(1)
+
+ println("\n\n========================= MAP TEST =========================\n\n")
+ testMap(directory, numBatches, batchDuration)
+
+ println("\n\n================= UPDATE-STATE-BY-KEY TEST =================\n\n")
+ testUpdateStateByKey(directory, numBatches, batchDuration)
+
+ println("\n\nSUCCESS\n\n")
+ }
+
+ def testMap(directory: String, numBatches: Int, batchDuration: Duration) {
+ // Input: time=1 ==> [ 1 ] , time=2 ==> [ 2 ] , time=3 ==> [ 3 ] , ...
+ val input = (1 to numBatches).map(_.toString).toSeq
+ // Expected output: time=1 ==> [ 1 ] , time=2 ==> [ 2 ] , time=3 ==> [ 3 ] , ...
+ val expectedOutput = (1 to numBatches)
+
+ val operation = (st: DStream[String]) => st.map(_.toInt)
+
+ // Run streaming operation with multiple master failures
+ val output = testOperation(directory, batchDuration, input, operation, expectedOutput)
+
+ logInfo("Expected output, size = " + expectedOutput.size)
+ logInfo(expectedOutput.mkString("[", ",", "]"))
+ logInfo("Output, size = " + output.size)
+ logInfo(output.mkString("[", ",", "]"))
+
+ // Verify whether all the values of the expected output is present
+ // in the output
+ assert(output.distinct.toSet == expectedOutput.toSet)
+ }
+
+
+ def testUpdateStateByKey(directory: String, numBatches: Int, batchDuration: Duration) {
+ // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
+ val input = (1 to numBatches).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq
+ // Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ...
+ val expectedOutput = (1L to numBatches).map(i => (1L to i).reduce(_ + _)).map(j => ("a", j))
+
+ val operation = (st: DStream[String]) => {
+ val updateFunc = (values: Seq[Long], state: Option[Long]) => {
+ Some(values.foldLeft(0L)(_ + _) + state.getOrElse(0L))
+ }
+ st.flatMap(_.split(" "))
+ .map(x => (x, 1L))
+ .updateStateByKey[Long](updateFunc)
+ .checkpoint(batchDuration * 5)
+ }
+
+ // Run streaming operation with multiple master failures
+ val output = testOperation(directory, batchDuration, input, operation, expectedOutput)
+
+ logInfo("Expected output, size = " + expectedOutput.size + "\n" + expectedOutput)
+ logInfo("Output, size = " + output.size + "\n" + output)
+
+ // Verify whether all the values in the output are among the expected output values
+ output.foreach(o =>
+ assert(expectedOutput.contains(o), "Expected value " + o + " not found")
+ )
+
+ // Verify whether the last expected output value has been generated, there by
+ // confirming that none of the inputs have been missed
+ assert(output.last == expectedOutput.last)
+ }
+
+ /**
+ * Tests stream operation with multiple master failures, and verifies whether the
+ * final set of output values is as expected or not.
+ */
+ def testOperation[T: ClassManifest](
+ directory: String,
+ batchDuration: Duration,
+ input: Seq[String],
+ operation: DStream[String] => DStream[T],
+ expectedOutput: Seq[T]
+ ): Seq[T] = {
+
+ // Just making sure that the expected output does not have duplicates
+ assert(expectedOutput.distinct.toSet == expectedOutput.toSet)
+
+ // Setup the stream computation with the given operation
+ val (ssc, checkpointDir, testDir) = setupStreams(directory, batchDuration, operation)
+
+ // Start generating files in the a different thread
+ val fileGeneratingThread = new FileGeneratingThread(input, testDir, batchDuration.milliseconds)
+ fileGeneratingThread.start()
+
+ // Run the streams and repeatedly kill it until the last expected output
+ // has been generated, or until it has run for twice the expected time
+ val lastExpectedOutput = expectedOutput.last
+ val maxTimeToRun = expectedOutput.size * batchDuration.milliseconds * 2
+ val mergedOutput = runStreams(ssc, lastExpectedOutput, maxTimeToRun)
+
+ // Delete directories
+ fileGeneratingThread.join()
+ val fs = checkpointDir.getFileSystem(new Configuration())
+ fs.delete(checkpointDir, true)
+ fs.delete(testDir, true)
+ logInfo("Finished test after " + killCount + " failures")
+ mergedOutput
+ }
+
+ /**
+ * Sets up the stream computation with the given operation, directory (local or HDFS),
+ * and batch duration. Returns the streaming context and the directory to which
+ * files should be written for testing.
+ */
+ private def setupStreams[T: ClassManifest](
+ directory: String,
+ batchDuration: Duration,
+ operation: DStream[String] => DStream[T]
+ ): (StreamingContext, Path, Path) = {
+ // Reset all state
+ reset()
+
+ // Create the directories for this test
+ val uuid = UUID.randomUUID().toString
+ val rootDir = new Path(directory, uuid)
+ val fs = rootDir.getFileSystem(new Configuration())
+ val checkpointDir = new Path(rootDir, "checkpoint")
+ val testDir = new Path(rootDir, "test")
+ fs.mkdirs(checkpointDir)
+ fs.mkdirs(testDir)
+
+ // Setup the streaming computation with the given operation
+ System.clearProperty("spark.driver.port")
+ System.clearProperty("spark.hostPort")
+ var ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map())
+ ssc.checkpoint(checkpointDir.toString)
+ val inputStream = ssc.textFileStream(testDir.toString)
+ val operatedStream = operation(inputStream)
+ val outputStream = new TestOutputStream(operatedStream)
+ ssc.registerOutputStream(outputStream)
+ (ssc, checkpointDir, testDir)
+ }
+
+
+ /**
+ * Repeatedly starts and kills the streaming context until timed out or
+ * the last expected output is generated. Finally, return
+ */
+ private def runStreams[T: ClassManifest](
+ ssc_ : StreamingContext,
+ lastExpectedOutput: T,
+ maxTimeToRun: Long
+ ): Seq[T] = {
+
+ var ssc = ssc_
+ var totalTimeRan = 0L
+ var isLastOutputGenerated = false
+ var isTimedOut = false
+ val mergedOutput = new ArrayBuffer[T]()
+ val checkpointDir = ssc.checkpointDir
+ var batchDuration = ssc.graph.batchDuration
+
+ while(!isLastOutputGenerated && !isTimedOut) {
+ // Get the output buffer
+ val outputBuffer = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[T]].output
+ def output = outputBuffer.flatMap(x => x)
+
+ // Start the thread to kill the streaming after some time
+ killed = false
+ val killingThread = new KillingThread(ssc, batchDuration.milliseconds * 10)
+ killingThread.start()
+
+ var timeRan = 0L
+ try {
+ // Start the streaming computation and let it run while ...
+ // (i) StreamingContext has not been shut down yet
+ // (ii) The last expected output has not been generated yet
+ // (iii) Its not timed out yet
+ System.clearProperty("spark.streaming.clock")
+ System.clearProperty("spark.driver.port")
+ System.clearProperty("spark.hostPort")
+ ssc.start()
+ val startTime = System.currentTimeMillis()
+ while (!killed && !isLastOutputGenerated && !isTimedOut) {
+ Thread.sleep(100)
+ timeRan = System.currentTimeMillis() - startTime
+ isLastOutputGenerated = (!output.isEmpty && output.last == lastExpectedOutput)
+ isTimedOut = (timeRan + totalTimeRan > maxTimeToRun)
+ }
+ } catch {
+ case e: Exception => logError("Error running streaming context", e)
+ }
+ if (killingThread.isAlive) killingThread.interrupt()
+ ssc.stop()
+
+ logInfo("Has been killed = " + killed)
+ logInfo("Is last output generated = " + isLastOutputGenerated)
+ logInfo("Is timed out = " + isTimedOut)
+
+ // Verify whether the output of each batch has only one element or no element
+ // and then merge the new output with all the earlier output
+ mergedOutput ++= output
+ totalTimeRan += timeRan
+ logInfo("New output = " + output)
+ logInfo("Merged output = " + mergedOutput)
+ logInfo("Time ran = " + timeRan)
+ logInfo("Total time ran = " + totalTimeRan)
+
+ if (!isLastOutputGenerated && !isTimedOut) {
+ val sleepTime = Random.nextInt(batchDuration.milliseconds.toInt * 10)
+ logInfo(
+ "\n-------------------------------------------\n" +
+ " Restarting stream computation in " + sleepTime + " ms " +
+ "\n-------------------------------------------\n"
+ )
+ Thread.sleep(sleepTime)
+ // Recreate the streaming context from checkpoint
+ ssc = new StreamingContext(checkpointDir)
+ }
+ }
+ mergedOutput
+ }
+
+ /**
+ * Verifies the output value are the same as expected. Since failures can lead to
+ * a batch being processed twice, a batches output may appear more than once
+ * consecutively. To avoid getting confused with those, we eliminate consecutive
+ * duplicate batch outputs of values from the `output`. As a result, the
+ * expected output should not have consecutive batches with the same values as output.
+ */
+ private def verifyOutput[T: ClassManifest](output: Seq[T], expectedOutput: Seq[T]) {
+ // Verify whether expected outputs do not consecutive batches with same output
+ for (i <- 0 until expectedOutput.size - 1) {
+ assert(expectedOutput(i) != expectedOutput(i+1),
+ "Expected output has consecutive duplicate sequence of values")
+ }
+
+ // Log the output
+ println("Expected output, size = " + expectedOutput.size)
+ println(expectedOutput.mkString("[", ",", "]"))
+ println("Output, size = " + output.size)
+ println(output.mkString("[", ",", "]"))
+
+ // Match the output with the expected output
+ output.foreach(o =>
+ assert(expectedOutput.contains(o), "Expected value " + o + " not found")
+ )
+ }
+
+ /** Resets counter to prepare for the test */
+ private def reset() {
+ killed = false
+ killCount = 0
+ }
+}
+
+/**
+ * This is a output stream just for testing. All the output is collected into a
+ * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
+ */
+private[streaming]
+class TestOutputStream[T: ClassManifest](
+ parent: DStream[T],
+ val output: ArrayBuffer[Seq[T]] = new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]
+ ) extends ForEachDStream[T](
+ parent,
+ (rdd: RDD[T], t: Time) => {
+ val collected = rdd.collect()
+ output += collected
+ }
+ ) {
+
+ // This is to clear the output buffer every it is read from a checkpoint
+ @throws(classOf[IOException])
+ private def readObject(ois: ObjectInputStream) {
+ ois.defaultReadObject()
+ output.clear()
+ }
+}
+
+
+/**
+ * Thread to kill streaming context after a random period of time.
+ */
+private[streaming]
+class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging {
+ initLogging()
+
+ override def run() {
+ try {
+ // If it is the first killing, then allow the first checkpoint to be created
+ var minKillWaitTime = if (MasterFailureTest.killCount == 0) 5000 else 2000
+ val killWaitTime = minKillWaitTime + math.abs(Random.nextLong % maxKillWaitTime)
+ logInfo("Kill wait time = " + killWaitTime)
+ Thread.sleep(killWaitTime)
+ logInfo(
+ "\n---------------------------------------\n" +
+ "Killing streaming context after " + killWaitTime + " ms" +
+ "\n---------------------------------------\n"
+ )
+ if (ssc != null) {
+ ssc.stop()
+ MasterFailureTest.killed = true
+ MasterFailureTest.killCount += 1
+ }
+ logInfo("Killing thread finished normally")
+ } catch {
+ case ie: InterruptedException => logInfo("Killing thread interrupted")
+ case e: Exception => logWarning("Exception in killing thread", e)
+ }
+
+ }
+}
+
+
+/**
+ * Thread to generate input files periodically with the desired text.
+ */
+private[streaming]
+class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
+ extends Thread with Logging {
+ initLogging()
+
+ override def run() {
+ val localTestDir = Files.createTempDir()
+ var fs = testDir.getFileSystem(new Configuration())
+ val maxTries = 3
+ try {
+ Thread.sleep(5000) // To make sure that all the streaming context has been set up
+ for (i <- 0 until input.size) {
+ // Write the data to a local file and then move it to the target test directory
+ val localFile = new File(localTestDir, (i+1).toString)
+ val hadoopFile = new Path(testDir, (i+1).toString)
+ val tempHadoopFile = new Path(testDir, ".tmp_" + (i+1).toString)
+ FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
+ var tries = 0
+ var done = false
+ while (!done && tries < maxTries) {
+ tries += 1
+ try {
+ // fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
+ fs.copyFromLocalFile(new Path(localFile.toString), tempHadoopFile)
+ fs.rename(tempHadoopFile, hadoopFile)
+ done = true
+ } catch {
+ case ioe: IOException => {
+ fs = testDir.getFileSystem(new Configuration())
+ logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe)
+ }
+ }
+ }
+ if (!done)
+ logError("Could not generate file " + hadoopFile)
+ else
+ logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis)
+ Thread.sleep(interval)
+ localFile.delete()
+ }
+ logInfo("File generating thread finished normally")
+ } catch {
+ case ie: InterruptedException => logInfo("File generating thread interrupted")
+ case e: Exception => logWarning("File generating in killing thread", e)
+ } finally {
+ fs.close()
+ }
+ }
+}
+
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
new file mode 100644
index 0000000000..4e6ce6eabd
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+import scala.collection.JavaConversions.mapAsScalaMap
+
+object RawTextHelper {
+
+ /**
+ * Splits lines and counts the words in them using specialized object-to-long hashmap
+ * (to avoid boxing-unboxing overhead of Long in java/scala HashMap)
+ */
+ def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = {
+ val map = new OLMap[String]
+ var i = 0
+ var j = 0
+ while (iter.hasNext) {
+ val s = iter.next()
+ i = 0
+ while (i < s.length) {
+ j = i
+ while (j < s.length && s.charAt(j) != ' ') {
+ j += 1
+ }
+ if (j > i) {
+ val w = s.substring(i, j)
+ val c = map.getLong(w)
+ map.put(w, c + 1)
+ }
+ i = j
+ while (i < s.length && s.charAt(i) == ' ') {
+ i += 1
+ }
+ }
+ }
+ map.toIterator.map{case (k, v) => (k, v)}
+ }
+
+ /**
+ * Gets the top k words in terms of word counts. Assumes that each word exists only once
+ * in the `data` iterator (that is, the counts have been reduced).
+ */
+ def topK(data: Iterator[(String, Long)], k: Int): Iterator[(String, Long)] = {
+ val taken = new Array[(String, Long)](k)
+
+ var i = 0
+ var len = 0
+ var done = false
+ var value: (String, Long) = null
+ var swap: (String, Long) = null
+ var count = 0
+
+ while(data.hasNext) {
+ value = data.next
+ if (value != null) {
+ count += 1
+ if (len == 0) {
+ taken(0) = value
+ len = 1
+ } else if (len < k || value._2 > taken(len - 1)._2) {
+ if (len < k) {
+ len += 1
+ }
+ taken(len - 1) = value
+ i = len - 1
+ while(i > 0 && taken(i - 1)._2 < taken(i)._2) {
+ swap = taken(i)
+ taken(i) = taken(i-1)
+ taken(i - 1) = swap
+ i -= 1
+ }
+ }
+ }
+ }
+ return taken.toIterator
+ }
+
+ /**
+ * Warms up the SparkContext in master and slave by running tasks to force JIT kick in
+ * before real workload starts.
+ */
+ def warmUp(sc: SparkContext) {
+ for(i <- 0 to 1) {
+ sc.parallelize(1 to 200000, 1000)
+ .map(_ % 1331).map(_.toString)
+ .mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10)
+ .count()
+ }
+ }
+
+ def add(v1: Long, v2: Long) = (v1 + v2)
+
+ def subtract(v1: Long, v2: Long) = (v1 - v2)
+
+ def max(v1: Long, v2: Long) = math.max(v1, v2)
+}
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
new file mode 100644
index 0000000000..249f6a22ae
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+import org.apache.spark.util.{RateLimitedOutputStream, IntParam}
+import java.net.ServerSocket
+import org.apache.spark.{Logging, KryoSerializer}
+import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
+import scala.io.Source
+import java.io.IOException
+
+/**
+ * A helper program that sends blocks of Kryo-serialized text strings out on a socket at a
+ * specified rate. Used to feed data into RawInputDStream.
+ */
+object RawTextSender extends Logging {
+ def main(args: Array[String]) {
+ if (args.length != 4) {
+ System.err.println("Usage: RawTextSender <port> <file> <blockSize> <bytesPerSec>")
+ System.exit(1)
+ }
+ // Parse the arguments using a pattern match
+ val Array(IntParam(port), file, IntParam(blockSize), IntParam(bytesPerSec)) = args
+
+ // Repeat the input data multiple times to fill in a buffer
+ val lines = Source.fromFile(file).getLines().toArray
+ val bufferStream = new FastByteArrayOutputStream(blockSize + 1000)
+ val ser = new KryoSerializer().newInstance()
+ val serStream = ser.serializeStream(bufferStream)
+ var i = 0
+ while (bufferStream.position < blockSize) {
+ serStream.writeObject(lines(i))
+ i = (i + 1) % lines.length
+ }
+ bufferStream.trim()
+ val array = bufferStream.array
+
+ val countBuf = ByteBuffer.wrap(new Array[Byte](4))
+ countBuf.putInt(array.length)
+ countBuf.flip()
+
+ val serverSocket = new ServerSocket(port)
+ logInfo("Listening on port " + port)
+
+ while (true) {
+ val socket = serverSocket.accept()
+ logInfo("Got a new connection")
+ val out = new RateLimitedOutputStream(socket.getOutputStream, bytesPerSec)
+ try {
+ while (true) {
+ out.write(countBuf.array)
+ out.write(array)
+ }
+ } catch {
+ case e: IOException =>
+ logError("Client disconnected")
+ socket.close()
+ }
+ }
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
new file mode 100644
index 0000000000..d644240405
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+private[streaming]
+class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) {
+
+ private val minPollTime = 25L
+
+ private val pollTime = {
+ if (period / 10.0 > minPollTime) {
+ (period / 10.0).toLong
+ } else {
+ minPollTime
+ }
+ }
+
+ private val thread = new Thread() {
+ override def run() { loop }
+ }
+
+ private var nextTime = 0L
+
+ def getStartTime(): Long = {
+ (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period
+ }
+
+ def getRestartTime(originalStartTime: Long): Long = {
+ val gap = clock.currentTime - originalStartTime
+ (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
+ }
+
+ def start(startTime: Long): Long = {
+ nextTime = startTime
+ thread.start()
+ nextTime
+ }
+
+ def start(): Long = {
+ start(getStartTime())
+ }
+
+ def stop() {
+ thread.interrupt()
+ }
+
+ private def loop() {
+ try {
+ while (true) {
+ clock.waitTillTime(nextTime)
+ callback(nextTime)
+ nextTime += period
+ }
+
+ } catch {
+ case e: InterruptedException =>
+ }
+ }
+}
+
+private[streaming]
+object RecurringTimer {
+
+ def main(args: Array[String]) {
+ var lastRecurTime = 0L
+ val period = 1000
+
+ def onRecur(time: Long) {
+ val currentTime = System.currentTimeMillis()
+ println("" + currentTime + ": " + (currentTime - lastRecurTime))
+ lastRecurTime = currentTime
+ }
+ val timer = new RecurringTimer(new SystemClock(), period, onRecur)
+ timer.start()
+ Thread.sleep(30 * 1000)
+ timer.stop()
+ }
+}
+