Diffstat (limited to 'streaming')
63 files changed, 8269 insertions, 0 deletions
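For orientation, the DStream API that this change introduces (see streaming/src/main/scala/spark/streaming/DStream.scala and PairDStreamFunctions.scala below) is driven from user code roughly as in the following sketch. The sketch is illustrative only and is not part of the commit; it assumes a StreamingContext named ssc and an input stream lines: DStream[String] obtained elsewhere (for example from one of the network input streams added here).

    // Illustrative sketch, not part of this diff. Assumes `ssc: StreamingContext` and
    // `lines: DStream[String]` have already been created.
    import spark.streaming.Seconds
    import spark.streaming.StreamingContext._  // enables pair operations via implicit conversion

    val words = lines.flatMap(_.split(" "))    // DStream[String]
    val pairs = words.map(w => (w, 1L))        // DStream[(String, Long)]

    // Sliding word counts: 30-second window recomputed every 10 seconds, using the
    // invertible reduceByKeyAndWindow so values leaving the window are subtracted.
    val counts = pairs.reduceByKeyAndWindow(_ + _, _ - _, Seconds(30), Seconds(10))

    counts.print()  // registers an output stream and materializes each windowed RDD
    ssc.start()     // starts the receivers and periodic job generation

Each operation used above (flatMap, map, reduceByKeyAndWindow, print) is defined in the files added by this commit.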
diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar Binary files differnew file mode 100644 index 0000000000..65f79925a4 --- /dev/null +++ b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5 new file mode 100644 index 0000000000..29f45f4adb --- /dev/null +++ b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5 @@ -0,0 +1 @@ +18876b8bc2e4cef28b6d191aa49d963f
\ No newline at end of file diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.sha1 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.sha1 new file mode 100644 index 0000000000..e3bd62bac0 --- /dev/null +++ b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.sha1 @@ -0,0 +1 @@ +06b27270ffa52250a2c08703b397c99127b72060
\ No newline at end of file diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom new file mode 100644 index 0000000000..082d35726a --- /dev/null +++ b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom @@ -0,0 +1,9 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka</artifactId> + <version>0.7.2-spark</version> + <description>POM was created from install:install-file</description> +</project> diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5 new file mode 100644 index 0000000000..92c4132b5b --- /dev/null +++ b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5 @@ -0,0 +1 @@ +7bc4322266e6032bdf9ef6eebdd8097d
\ No newline at end of file diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.sha1 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.sha1 new file mode 100644 index 0000000000..8a1d8a097a --- /dev/null +++ b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.sha1 @@ -0,0 +1 @@ +d0f79e8eff0db43ca7bcf7dce2c8cd2972685c9d
\ No newline at end of file diff --git a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml new file mode 100644 index 0000000000..720cd51c2f --- /dev/null +++ b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml @@ -0,0 +1,12 @@ +<?xml version="1.0" encoding="UTF-8"?> +<metadata> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka</artifactId> + <versioning> + <release>0.7.2-spark</release> + <versions> + <version>0.7.2-spark</version> + </versions> + <lastUpdated>20130121015225</lastUpdated> + </versioning> +</metadata> diff --git a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.md5 b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.md5 new file mode 100644 index 0000000000..a4ce5dc9e8 --- /dev/null +++ b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.md5 @@ -0,0 +1 @@ +e2b9c7c5f6370dd1d21a0aae5e8dcd77
\ No newline at end of file diff --git a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.sha1 b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.sha1 new file mode 100644 index 0000000000..b869eaf2a6 --- /dev/null +++ b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.sha1 @@ -0,0 +1 @@ +2a4341da936b6c07a09383d17ffb185ac558ee91
\ No newline at end of file diff --git a/streaming/pom.xml b/streaming/pom.xml new file mode 100644 index 0000000000..6ee7e59df3 --- /dev/null +++ b/streaming/pom.xml @@ -0,0 +1,144 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.spark-project</groupId> + <artifactId>parent</artifactId> + <version>0.7.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <groupId>org.spark-project</groupId> + <artifactId>spark-streaming</artifactId> + <packaging>jar</packaging> + <name>Spark Project Streaming</name> + <url>http://spark-project.org/</url> + + <repositories> + <!-- A repository in the local filesystem for the Kafka JAR, which we modified for Scala 2.9 --> + <repository> + <id>lib</id> + <url>file://${project.basedir}/lib</url> + </repository> + </repositories> + + <dependencies> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + <version>1.9.11</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka</artifactId> + <version>0.7.2-spark</version> <!-- Comes from our in-project repository --> + </dependency> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-sdk</artifactId> + <version>1.2.0</version> + </dependency> + <dependency> + <groupId>com.github.sgroschupf</groupId> + <artifactId>zkclient</artifactId> + <version>0.1</version> + </dependency> + + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.version}</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.scalacheck</groupId> + <artifactId>scalacheck_${scala.version}</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.novocode</groupId> + <artifactId>junit-interface</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <outputDirectory>target/scala-${scala.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory> + <plugins> + <plugin> + <groupId>org.scalatest</groupId> + <artifactId>scalatest-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + + <profiles> + <profile> + <id>hadoop1</id> + <dependencies> + <dependency> + <groupId>org.spark-project</groupId> + <artifactId>spark-core</artifactId> + <version>${project.version}</version> + <classifier>hadoop1</classifier> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + <scope>provided</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <classifier>hadoop1</classifier> + </configuration> + </plugin> + </plugins> + </build> + </profile> + <profile> + <id>hadoop2</id> + <dependencies> + <dependency> + <groupId>org.spark-project</groupId> + <artifactId>spark-core</artifactId> + <version>${project.version}</version> + <classifier>hadoop2</classifier> + </dependency> + <dependency> + 
<groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <scope>provided</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <classifier>hadoop2</classifier> + </configuration> + </plugin> + </plugins> + </build> + </profile> + </profiles> +</project> diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala new file mode 100644 index 0000000000..2f3adb39c2 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala @@ -0,0 +1,118 @@ +package spark.streaming + +import spark.{Logging, Utils} + +import org.apache.hadoop.fs.{FileUtil, Path} +import org.apache.hadoop.conf.Configuration + +import java.io._ + + +private[streaming] +class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) + extends Logging with Serializable { + val master = ssc.sc.master + val framework = ssc.sc.jobName + val sparkHome = ssc.sc.sparkHome + val jars = ssc.sc.jars + val graph = ssc.graph + val checkpointDir = ssc.checkpointDir + val checkpointDuration: Duration = ssc.checkpointDuration + + def validate() { + assert(master != null, "Checkpoint.master is null") + assert(framework != null, "Checkpoint.framework is null") + assert(graph != null, "Checkpoint.graph is null") + assert(checkpointTime != null, "Checkpoint.checkpointTime is null") + logInfo("Checkpoint for time " + checkpointTime + " validated") + } +} + +/** + * Convenience class to speed up the writing of graph checkpoint to file + */ +private[streaming] +class CheckpointWriter(checkpointDir: String) extends Logging { + val file = new Path(checkpointDir, "graph") + val conf = new Configuration() + var fs = file.getFileSystem(conf) + val maxAttempts = 3 + + def write(checkpoint: Checkpoint) { + // TODO: maybe do this in a different thread from the main stream execution thread + var attempts = 0 + while (attempts < maxAttempts) { + attempts += 1 + try { + logDebug("Saving checkpoint for time " + checkpoint.checkpointTime + " to file '" + file + "'") + if (fs.exists(file)) { + val bkFile = new Path(file.getParent, file.getName + ".bk") + FileUtil.copy(fs, file, fs, bkFile, true, true, conf) + logDebug("Moved existing checkpoint file to " + bkFile) + } + val fos = fs.create(file) + val oos = new ObjectOutputStream(fos) + oos.writeObject(checkpoint) + oos.close() + logInfo("Checkpoint for time " + checkpoint.checkpointTime + " saved to file '" + file + "'") + fos.close() + return + } catch { + case ioe: IOException => + logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe) + } + } + logError("Could not write checkpoint for time " + checkpoint.checkpointTime + " to file '" + file + "'") + } +} + + +private[streaming] +object CheckpointReader extends Logging { + + def read(path: String): Checkpoint = { + val fs = new Path(path).getFileSystem(new Configuration()) + val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk")) + + attempts.foreach(file => { + if (fs.exists(file)) { + logInfo("Attempting to load checkpoint from file '" + file + "'") + try { + val fis = fs.open(file) + // ObjectInputStream uses the last defined user-defined class loader in the stack + // to find classes, which 
may be the wrong class loader. Hence, an inherited version + // of ObjectInputStream is used to explicitly use the current thread's context class + // loader to find and load classes. This is a well known Java issue and has popped up + // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627) + val ois = new ObjectInputStreamWithLoader(fis, Thread.currentThread().getContextClassLoader) + val cp = ois.readObject.asInstanceOf[Checkpoint] + ois.close() + fs.close() + cp.validate() + logInfo("Checkpoint successfully loaded from file '" + file + "'") + logInfo("Checkpoint was generated at time " + cp.checkpointTime) + return cp + } catch { + case e: Exception => + logError("Error loading checkpoint from file '" + file + "'", e) + } + } else { + logWarning("Could not read checkpoint from file '" + file + "' as it does not exist") + } + + }) + throw new Exception("Could not read checkpoint from path '" + path + "'") + } +} + +private[streaming] +class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader) extends ObjectInputStream(inputStream_) { + override def resolveClass(desc: ObjectStreamClass): Class[_] = { + try { + return loader.loadClass(desc.getName()) + } catch { + case e: Exception => + } + return super.resolveClass(desc) + } +} diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala new file mode 100644 index 0000000000..352f83fe0c --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -0,0 +1,657 @@ +package spark.streaming + +import spark.streaming.dstream._ +import StreamingContext._ +//import Time._ + +import spark.{RDD, Logging} +import spark.storage.StorageLevel + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.HashMap + +import java.io.{ObjectInputStream, IOException, ObjectOutputStream} + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.conf.Configuration + +/** + * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous + * sequence of RDDs (of the same type) representing a continuous stream of data (see [[spark.RDD]] + * for more details on RDDs). DStreams can either be created from live data (such as data from + * HDFS, Kafka or Flume) or they can be generated by transforming existing DStreams using operations + * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each + * DStream periodically generates an RDD, either from live data or by transforming the RDD generated + * by a parent DStream. + * + * This class contains the basic operations available on all DStreams, such as `map`, `filter` and + * `window`. In addition, [[spark.streaming.PairDStreamFunctions]] contains operations available + * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations + * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)]) through + * implicit conversions when `spark.streaming.StreamingContext._` is imported. 
+ * + * A DStream is internally characterized by a few basic properties: + * - A list of other DStreams that the DStream depends on + * - A time interval at which the DStream generates an RDD + * - A function that is used to generate an RDD after each time interval + */ + +abstract class DStream[T: ClassManifest] ( + @transient protected[streaming] var ssc: StreamingContext + ) extends Serializable with Logging { + + initLogging() + + // ======================================================================= + // Methods that should be implemented by subclasses of DStream + // ======================================================================= + + /** Time interval after which the DStream generates an RDD */ + def slideDuration: Duration + + /** List of parent DStreams on which this DStream depends */ + def dependencies: List[DStream[_]] + + /** Method that generates an RDD for the given time */ + def compute (validTime: Time): Option[RDD[T]] + + // ======================================================================= + // Methods and fields available on all DStreams + // ======================================================================= + + // RDDs generated, marked as protected[streaming] so that testsuites can access it + @transient + protected[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] () + + // Time zero for the DStream + protected[streaming] var zeroTime: Time = null + + // Duration for which the DStream will remember each RDD created + protected[streaming] var rememberDuration: Duration = null + + // Storage level of the RDDs in the stream + protected[streaming] var storageLevel: StorageLevel = StorageLevel.NONE + + // Checkpoint details + protected[streaming] val mustCheckpoint = false + protected[streaming] var checkpointDuration: Duration = null + protected[streaming] var checkpointData = new DStreamCheckpointData(HashMap[Time, Any]()) + + // Reference to whole DStream graph + protected[streaming] var graph: DStreamGraph = null + + protected[streaming] def isInitialized = (zeroTime != null) + + // Duration for which the DStream requires its parent DStream to remember each RDD created + protected[streaming] def parentRememberDuration = rememberDuration + + /** Returns the StreamingContext associated with this DStream */ + def context() = ssc + + /** Persists the RDDs of this DStream with the given storage level */ + def persist(level: StorageLevel): DStream[T] = { + if (this.isInitialized) { + throw new UnsupportedOperationException( + "Cannot change storage level of a DStream after streaming context has started") + } + this.storageLevel = level + this + } + + /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER) + + /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + def cache(): DStream[T] = persist() + + /** + * Enable periodic checkpointing of RDDs of this DStream + * @param interval Time interval after which generated RDDs will be checkpointed + */ + def checkpoint(interval: Duration): DStream[T] = { + if (isInitialized) { + throw new UnsupportedOperationException( + "Cannot change checkpoint interval of a DStream after streaming context has started") + } + persist() + checkpointDuration = interval + this + } + + /** + * Initialize the DStream by setting the "zero" time, based on which + * the validity of future times is calculated. This method also recursively initializes + * its parent DStreams. 
+ */ + protected[streaming] def initialize(time: Time) { + if (zeroTime != null && zeroTime != time) { + throw new Exception("ZeroTime is already initialized to " + zeroTime + + ", cannot initialize it again to " + time) + } + zeroTime = time + + // Set the checkpoint interval to be slideDuration or 10 seconds, whichever is larger + if (mustCheckpoint && checkpointDuration == null) { + checkpointDuration = slideDuration.max(Seconds(10)) + logInfo("Checkpoint interval automatically set to " + checkpointDuration) + } + + // Set the minimum value of the rememberDuration if not already set + var minRememberDuration = slideDuration + if (checkpointDuration != null && minRememberDuration <= checkpointDuration) { + minRememberDuration = checkpointDuration * 2 // times 2 just to be sure that the latest checkpoint is not forgotten + } + if (rememberDuration == null || rememberDuration < minRememberDuration) { + rememberDuration = minRememberDuration + } + + // Initialize the dependencies + dependencies.foreach(_.initialize(zeroTime)) + } + + protected[streaming] def validate() { + assert(rememberDuration != null, "Remember duration is set to null") + + assert( + !mustCheckpoint || checkpointDuration != null, + "The checkpoint interval for " + this.getClass.getSimpleName + " has not been set." + + " Please use DStream.checkpoint() to set the interval." + ) + + assert( + checkpointDuration == null || ssc.sc.checkpointDir.isDefined, + "The checkpoint directory has not been set. Please use StreamingContext.checkpoint()" + + " or SparkContext.setCheckpointDir() to set the checkpoint directory." + ) + + assert( + checkpointDuration == null || checkpointDuration >= slideDuration, + "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " + + checkpointDuration + " which is lower than its slide time (" + slideDuration + "). " + + "Please set it to at least " + slideDuration + "." + ) + + assert( + checkpointDuration == null || checkpointDuration.isMultipleOf(slideDuration), + "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " + + checkpointDuration + " which is not a multiple of its slide time (" + slideDuration + "). " + + "Please set it to a multiple of " + slideDuration + "." + ) + + assert( + checkpointDuration == null || storageLevel != StorageLevel.NONE, + "" + this.getClass.getSimpleName + " has been marked for checkpointing but the storage " + + "level has not been set to enable persisting. Please use DStream.persist() to set the " + + "storage level to use memory for better checkpointing performance." + ) + + assert( + checkpointDuration == null || rememberDuration > checkpointDuration, + "The remember duration for " + this.getClass.getSimpleName + " has been set to " + + rememberDuration + " which is not more than the checkpoint interval (" + + checkpointDuration + "). Please set it to higher than " + checkpointDuration + "." + ) + + val metadataCleanerDelay = spark.util.MetadataCleaner.getDelaySeconds + logInfo("metadataCleanupDelay = " + metadataCleanerDelay) + assert( + metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000, + "It seems you are doing some DStream window operation or setting a checkpoint interval " + + "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " + + "than " + rememberDuration.milliseconds / 1000 + " seconds. But Spark's metadata cleanup " + + "delay is set to " + metadataCleanerDelay + " seconds, which is not sufficient. 
Please " + + "set the Java property 'spark.cleaner.delay' to more than " + + math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds." + ) + + dependencies.foreach(_.validate()) + + logInfo("Slide time = " + slideDuration) + logInfo("Storage level = " + storageLevel) + logInfo("Checkpoint interval = " + checkpointDuration) + logInfo("Remember duration = " + rememberDuration) + logInfo("Initialized and validated " + this) + } + + protected[streaming] def setContext(s: StreamingContext) { + if (ssc != null && ssc != s) { + throw new Exception("Context is already set in " + this + ", cannot set it again") + } + ssc = s + logInfo("Set context for " + this) + dependencies.foreach(_.setContext(ssc)) + } + + protected[streaming] def setGraph(g: DStreamGraph) { + if (graph != null && graph != g) { + throw new Exception("Graph is already set in " + this + ", cannot set it again") + } + graph = g + dependencies.foreach(_.setGraph(graph)) + } + + protected[streaming] def remember(duration: Duration) { + if (duration != null && duration > rememberDuration) { + rememberDuration = duration + logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this) + } + dependencies.foreach(_.remember(parentRememberDuration)) + } + + /** This method checks whether the 'time' is valid wrt slideDuration for generating an RDD */ + protected def isTimeValid(time: Time): Boolean = { + if (!isInitialized) { + throw new Exception (this + " has not been initialized") + } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) { + false + } else { + true + } + } + + /** + * Retrieve a precomputed RDD of this DStream, or compute the RDD. This is an internal + * method that should not be called directly. + */ + protected[streaming] def getOrCompute(time: Time): Option[RDD[T]] = { + // If this DStream was not initialized (i.e., zeroTime not set), then do it + // If RDD was already generated, then retrieve it from HashMap + generatedRDDs.get(time) match { + + // If an RDD was already generated and is being reused, then + // probably all RDDs in this DStream will be reused and hence should be cached + case Some(oldRDD) => Some(oldRDD) + + // if RDD was not generated, and if the time is valid + // (based on sliding time of this DStream), then generate the RDD + case None => { + if (isTimeValid(time)) { + compute(time) match { + case Some(newRDD) => + if (storageLevel != StorageLevel.NONE) { + newRDD.persist(storageLevel) + logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel) + } + if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) { + newRDD.checkpoint() + logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing") + } + generatedRDDs.put(time, newRDD) + Some(newRDD) + case None => + None + } + } else { + None + } + } + } + } + + /** + * Generate a Spark Streaming job for the given time. This is an internal method that + * should not be called directly. This default implementation creates a job + * that materializes the corresponding RDD. Subclasses of DStream may override this + * (e.g. ForEachDStream). 
+ */ + protected[streaming] def generateJob(time: Time): Option[Job] = { + getOrCompute(time) match { + case Some(rdd) => { + val jobFunc = () => { + val emptyFunc = { (iterator: Iterator[T]) => {} } + ssc.sc.runJob(rdd, emptyFunc) + } + Some(new Job(time, jobFunc)) + } + case None => None + } + } + + /** + * Dereference RDDs that are older than rememberDuration. + */ + protected[streaming] def forgetOldRDDs(time: Time) { + val keys = generatedRDDs.keys + var numForgotten = 0 + keys.foreach(t => { + if (t <= (time - rememberDuration)) { + generatedRDDs.remove(t) + numForgotten += 1 + logInfo("Forgot RDD of time " + t + " from " + this) + } + }) + logInfo("Forgot " + numForgotten + " RDDs from " + this) + dependencies.foreach(_.forgetOldRDDs(time)) + } + + /* Adds metadata to the Stream while it is running. + * This method should be overridden by subclasses of InputDStream. + */ + protected[streaming] def addMetadata(metadata: Any) { + if (metadata != null) { + logInfo("Dropping Metadata: " + metadata.toString) + } + } + + /** + * Refresh the list of checkpointed RDDs that will be saved along with the checkpoint of + * this stream. This is an internal method that should not be called directly. This is + * a default implementation that saves only the file names of the checkpointed RDDs to + * checkpointData. Subclasses of DStream (especially those of InputDStream) may override + * this method to save custom checkpoint data. + */ + protected[streaming] def updateCheckpointData(currentTime: Time) { + logInfo("Updating checkpoint data for time " + currentTime) + + // Get the checkpointed RDDs from the generated RDDs + val newRdds = generatedRDDs.filter(_._2.getCheckpointFile.isDefined) + .map(x => (x._1, x._2.getCheckpointFile.get)) + + // Make a copy of the existing checkpoint data (checkpointed RDDs) + val oldRdds = checkpointData.rdds.clone() + + // If the new checkpoint data has checkpoints then replace existing with the new one + if (newRdds.size > 0) { + checkpointData.rdds.clear() + checkpointData.rdds ++= newRdds + } + + // Make parent DStreams update their checkpoint data + dependencies.foreach(_.updateCheckpointData(currentTime)) + + // TODO: remove this, this is just for debugging + newRdds.foreach { + case (time, data) => { logInfo("Added checkpointed RDD for time " + time + " to stream checkpoint") } + } + + if (newRdds.size > 0) { + (oldRdds -- newRdds.keySet).foreach { + case (time, data) => { + val path = new Path(data.toString) + val fs = path.getFileSystem(new Configuration()) + fs.delete(path, true) + logInfo("Deleted checkpoint file '" + path + "' for time " + time) + } + } + } + logInfo("Updated checkpoint data for time " + currentTime + ", " + checkpointData.rdds.size + " checkpoints, " + + "[" + checkpointData.rdds.mkString(",") + "]") + } + + /** + * Restore the RDDs in generatedRDDs from the checkpointData. This is an internal method + * that should not be called directly. This is a default implementation that recreates RDDs + * from the checkpoint file names stored in checkpointData. Subclasses of DStream that + * override the updateCheckpointData() method would also need to override this method. 
+ */ + protected[streaming] def restoreCheckpointData() { + // Create RDDs from the checkpoint data + logInfo("Restoring checkpoint data from " + checkpointData.rdds.size + " checkpointed RDDs") + checkpointData.rdds.foreach { + case(time, data) => { + logInfo("Restoring checkpointed RDD for time " + time + " from file '" + data.toString + "'") + val rdd = ssc.sc.checkpointFile[T](data.toString) + generatedRDDs += ((time, rdd)) + } + } + dependencies.foreach(_.restoreCheckpointData()) + logInfo("Restored checkpoint data") + } + + @throws(classOf[IOException]) + private def writeObject(oos: ObjectOutputStream) { + logDebug(this.getClass().getSimpleName + ".writeObject used") + if (graph != null) { + graph.synchronized { + if (graph.checkpointInProgress) { + oos.defaultWriteObject() + } else { + val msg = "Object of " + this.getClass.getName + " is being serialized " + + " possibly as a part of closure of an RDD operation. This is because " + + " the DStream object is being referred to from within the closure. " + + " Please rewrite the RDD operation inside this DStream to avoid this. " + + " This has been enforced to avoid bloating of Spark tasks " + + " with unnecessary objects." + throw new java.io.NotSerializableException(msg) + } + } + } else { + throw new java.io.NotSerializableException("Graph is unexpectedly null when DStream is being serialized.") + } + } + + @throws(classOf[IOException]) + private def readObject(ois: ObjectInputStream) { + logDebug(this.getClass().getSimpleName + ".readObject used") + ois.defaultReadObject() + generatedRDDs = new HashMap[Time, RDD[T]] () + } + + // ======================================================================= + // DStream operations + // ======================================================================= + + /** Return a new DStream by applying a function to all elements of this DStream. */ + def map[U: ClassManifest](mapFunc: T => U): DStream[U] = { + new MappedDStream(this, ssc.sc.clean(mapFunc)) + } + + /** + * Return a new DStream by applying a function to all elements of this DStream, + * and then flattening the results + */ + def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]): DStream[U] = { + new FlatMappedDStream(this, ssc.sc.clean(flatMapFunc)) + } + + /** Return a new DStream containing only the elements that satisfy a predicate. */ + def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc) + + /** + * Return a new DStream in which each RDD is generated by applying glom() to each RDD of + * this DStream. Applying glom() to an RDD coalesces all elements within each partition into + * an array. + */ + def glom(): DStream[Array[T]] = new GlommedDStream(this) + + /** + * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs + * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition + * of the RDD. + */ + def mapPartitions[U: ClassManifest]( + mapPartFunc: Iterator[T] => Iterator[U], + preservePartitioning: Boolean = false + ): DStream[U] = { + new MapPartitionedDStream(this, ssc.sc.clean(mapPartFunc), preservePartitioning) + } + + /** + * Return a new DStream in which each RDD has a single element generated by reducing each RDD + * of this DStream. + */ + def reduce(reduceFunc: (T, T) => T): DStream[T] = + this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2) + + /** + * Return a new DStream in which each RDD has a single element generated by counting each RDD + * of this DStream. 
+ */ + def count(): DStream[Long] = this.map(_ => 1L).reduce(_ + _) + + /** + * Apply a function to each RDD in this DStream. This is an output operator, so + * this DStream will be registered as an output stream and therefore materialized. + */ + def foreach(foreachFunc: RDD[T] => Unit) { + foreach((r: RDD[T], t: Time) => foreachFunc(r)) + } + + /** + * Apply a function to each RDD in this DStream. This is an output operator, so + * this DStream will be registered as an output stream and therefore materialized. + */ + def foreach(foreachFunc: (RDD[T], Time) => Unit) { + val newStream = new ForEachDStream(this, ssc.sc.clean(foreachFunc)) + ssc.registerOutputStream(newStream) + newStream + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this DStream. + */ + def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = { + transform((r: RDD[T], t: Time) => transformFunc(r)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this DStream. + */ + def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { + new TransformedDStream(this, ssc.sc.clean(transformFunc)) + } + + /** + * Print the first ten elements of each RDD generated in this DStream. This is an output + * operator, so this DStream will be registered as an output stream and therefore materialized. + */ + def print() { + def foreachFunc = (rdd: RDD[T], time: Time) => { + val first11 = rdd.take(11) + println ("-------------------------------------------") + println ("Time: " + time) + println ("-------------------------------------------") + first11.take(10).foreach(println) + if (first11.size > 10) println("...") + println() + } + val newStream = new ForEachDStream(this, ssc.sc.clean(foreachFunc)) + ssc.registerOutputStream(newStream) + } + + /** + * Return a new DStream which is computed based on windowed batches of this DStream. + * The new DStream generates RDDs with the same interval as this DStream. + * @param windowDuration width of the window; must be a multiple of this DStream's interval. + */ + def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration) + + /** + * Return a new DStream which is computed based on windowed batches of this DStream. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = { + new WindowedDStream(this, windowDuration, slideDuration) + } + + /** + * Return a new DStream which is computed based on a tumbling window of this DStream. + * This is equivalent to window(batchTime, batchTime). + * @param batchDuration tumbling window duration; must be a multiple of this DStream's + * batching interval + */ + def tumble(batchDuration: Duration): DStream[T] = window(batchDuration, batchDuration) + + /** + * Return a new DStream in which each RDD has a single element generated by reducing all + * elements in a window over this DStream. windowDuration and slideDuration are as defined + * in the window() operation. 
This is equivalent to + * window(windowDuration, slideDuration).reduce(reduceFunc) + */ + def reduceByWindow( + reduceFunc: (T, T) => T, + windowDuration: Duration, + slideDuration: Duration + ): DStream[T] = { + this.window(windowDuration, slideDuration).reduce(reduceFunc) + } + + def reduceByWindow( + reduceFunc: (T, T) => T, + invReduceFunc: (T, T) => T, + windowDuration: Duration, + slideDuration: Duration + ): DStream[T] = { + this.map(x => (1, x)) + .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1) + .map(_._2) + } + + /** + * Return a new DStream in which each RDD has a single element generated by counting the number + * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the + * window() operation. This is equivalent to window(windowDuration, slideDuration).count() + */ + def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] = { + this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration) + } + + /** + * Return a new DStream by unifying data of another DStream with this DStream. + * @param that Another DStream having the same slideDuration as this DStream. + */ + def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that)) + + /** + * Return all the RDDs defined by the Interval object (both end times included) + */ + protected[streaming] def slice(interval: Interval): Seq[RDD[T]] = { + slice(interval.beginTime, interval.endTime) + } + + /** + * Return all the RDDs between 'fromTime' and 'toTime' (both included) + */ + def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = { + val rdds = new ArrayBuffer[RDD[T]]() + var time = toTime.floor(slideDuration) + while (time >= zeroTime && time >= fromTime) { + getOrCompute(time) match { + case Some(rdd) => rdds += rdd + case None => //throw new Exception("Could not get RDD for time " + time) + } + time -= slideDuration + } + rdds.toSeq + } + + /** + * Save each RDD in this DStream as a Sequence file of serialized objects. + * The file name at each batch interval is generated based on `prefix` and + * `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsObjectFiles(prefix: String, suffix: String = "") { + val saveFunc = (rdd: RDD[T], time: Time) => { + val file = rddToFileName(prefix, suffix, time) + rdd.saveAsObjectFile(file) + } + this.foreach(saveFunc) + } + + /** + * Save each RDD in this DStream as a text file, using string representation + * of elements. The file name at each batch interval is generated based on + * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". 
+ */ + def saveAsTextFiles(prefix: String, suffix: String = "") { + val saveFunc = (rdd: RDD[T], time: Time) => { + val file = rddToFileName(prefix, suffix, time) + rdd.saveAsTextFile(file) + } + this.foreach(saveFunc) + } + + def register() { + ssc.registerOutputStream(this) + } +} + +private[streaming] +case class DStreamCheckpointData(rdds: HashMap[Time, Any]) + diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala new file mode 100644 index 0000000000..bc4a40d7bc --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala @@ -0,0 +1,134 @@ +package spark.streaming + +import dstream.InputDStream +import java.io.{ObjectInputStream, IOException, ObjectOutputStream} +import collection.mutable.ArrayBuffer +import spark.Logging + +final private[streaming] class DStreamGraph extends Serializable with Logging { + initLogging() + + private val inputStreams = new ArrayBuffer[InputDStream[_]]() + private val outputStreams = new ArrayBuffer[DStream[_]]() + + private[streaming] var zeroTime: Time = null + private[streaming] var batchDuration: Duration = null + private[streaming] var rememberDuration: Duration = null + private[streaming] var checkpointInProgress = false + + private[streaming] def start(time: Time) { + this.synchronized { + if (zeroTime != null) { + throw new Exception("DStream graph computation already started") + } + zeroTime = time + outputStreams.foreach(_.initialize(zeroTime)) + outputStreams.foreach(_.remember(rememberDuration)) + outputStreams.foreach(_.validate) + inputStreams.par.foreach(_.start()) + } + } + + private[streaming] def stop() { + this.synchronized { + inputStreams.par.foreach(_.stop()) + } + } + + private[streaming] def setContext(ssc: StreamingContext) { + this.synchronized { + outputStreams.foreach(_.setContext(ssc)) + } + } + + private[streaming] def setBatchDuration(duration: Duration) { + this.synchronized { + if (batchDuration != null) { + throw new Exception("Batch duration already set as " + batchDuration + + ". cannot set it again.") + } + } + batchDuration = duration + } + + private[streaming] def remember(duration: Duration) { + this.synchronized { + if (rememberDuration != null) { + throw new Exception("Remember duration already set as " + rememberDuration + + ". 
cannot set it again.") + } + } + rememberDuration = duration + } + + private[streaming] def addInputStream(inputStream: InputDStream[_]) { + this.synchronized { + inputStream.setGraph(this) + inputStreams += inputStream + } + } + + private[streaming] def addOutputStream(outputStream: DStream[_]) { + this.synchronized { + outputStream.setGraph(this) + outputStreams += outputStream + } + } + + private[streaming] def getInputStreams() = this.synchronized { inputStreams.toArray } + + private[streaming] def getOutputStreams() = this.synchronized { outputStreams.toArray } + + private[streaming] def generateRDDs(time: Time): Seq[Job] = { + this.synchronized { + outputStreams.flatMap(outputStream => outputStream.generateJob(time)) + } + } + + private[streaming] def forgetOldRDDs(time: Time) { + this.synchronized { + outputStreams.foreach(_.forgetOldRDDs(time)) + } + } + + private[streaming] def updateCheckpointData(time: Time) { + this.synchronized { + outputStreams.foreach(_.updateCheckpointData(time)) + } + } + + private[streaming] def restoreCheckpointData() { + this.synchronized { + outputStreams.foreach(_.restoreCheckpointData()) + } + } + + private[streaming] def validate() { + this.synchronized { + assert(batchDuration != null, "Batch duration has not been set") + //assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration + " is very low") + assert(getOutputStreams().size > 0, "No output streams registered, so nothing to execute") + } + } + + @throws(classOf[IOException]) + private def writeObject(oos: ObjectOutputStream) { + this.synchronized { + logDebug("DStreamGraph.writeObject used") + checkpointInProgress = true + oos.defaultWriteObject() + checkpointInProgress = false + } + } + + @throws(classOf[IOException]) + private def readObject(ois: ObjectInputStream) { + this.synchronized { + logDebug("DStreamGraph.readObject used") + checkpointInProgress = true + ois.defaultReadObject() + checkpointInProgress = false + } + } +} + diff --git a/streaming/src/main/scala/spark/streaming/Duration.scala b/streaming/src/main/scala/spark/streaming/Duration.scala new file mode 100644 index 0000000000..e4dc579a17 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/Duration.scala @@ -0,0 +1,62 @@ +package spark.streaming + +case class Duration (private val millis: Long) { + + def < (that: Duration): Boolean = (this.millis < that.millis) + + def <= (that: Duration): Boolean = (this.millis <= that.millis) + + def > (that: Duration): Boolean = (this.millis > that.millis) + + def >= (that: Duration): Boolean = (this.millis >= that.millis) + + def + (that: Duration): Duration = new Duration(millis + that.millis) + + def - (that: Duration): Duration = new Duration(millis - that.millis) + + def * (times: Int): Duration = new Duration(millis * times) + + def / (that: Duration): Long = millis / that.millis + + def isMultipleOf(that: Duration): Boolean = + (this.millis % that.millis == 0) + + def min(that: Duration): Duration = if (this < that) this else that + + def max(that: Duration): Duration = if (this > that) this else that + + def isZero: Boolean = (this.millis == 0) + + override def toString: String = (millis.toString + " ms") + + def toFormattedString: String = millis.toString + + def milliseconds: Long = millis +} + + +/** + * Helper object that creates instance of [[spark.streaming.Duration]] representing + * a given number of milliseconds. 
+ */ +object Milliseconds { + def apply(milliseconds: Long) = new Duration(milliseconds) +} + +/** + * Helper object that creates instance of [[spark.streaming.Duration]] representing + * a given number of seconds. + */ +object Seconds { + def apply(seconds: Long) = new Duration(seconds * 1000) +} + +/** + * Helper object that creates instance of [[spark.streaming.Duration]] representing + * a given number of minutes. + */ +object Minutes { + def apply(minutes: Long) = new Duration(minutes * 60000) +} + + diff --git a/streaming/src/main/scala/spark/streaming/Interval.scala b/streaming/src/main/scala/spark/streaming/Interval.scala new file mode 100644 index 0000000000..dc21dfb722 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/Interval.scala @@ -0,0 +1,41 @@ +package spark.streaming + +private[streaming] +class Interval(val beginTime: Time, val endTime: Time) { + def this(beginMs: Long, endMs: Long) = this(new Time(beginMs), new Time(endMs)) + + def duration(): Duration = endTime - beginTime + + def + (time: Duration): Interval = { + new Interval(beginTime + time, endTime + time) + } + + def - (time: Duration): Interval = { + new Interval(beginTime - time, endTime - time) + } + + def < (that: Interval): Boolean = { + if (this.duration != that.duration) { + throw new Exception("Comparing two intervals with different durations [" + this + ", " + that + "]") + } + this.endTime < that.endTime + } + + def <= (that: Interval) = (this < that || this == that) + + def > (that: Interval) = !(this <= that) + + def >= (that: Interval) = !(this < that) + + override def toString = "[" + beginTime + ", " + endTime + "]" +} + +object Interval { + def currentInterval(duration: Duration): Interval = { + val time = new Time(System.currentTimeMillis) + val intervalBegin = time.floor(duration) + new Interval(intervalBegin, intervalBegin + duration) + } +} + + diff --git a/streaming/src/main/scala/spark/streaming/Job.scala b/streaming/src/main/scala/spark/streaming/Job.scala new file mode 100644 index 0000000000..67bd8388bc --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/Job.scala @@ -0,0 +1,24 @@ +package spark.streaming + +import java.util.concurrent.atomic.AtomicLong + +private[streaming] +class Job(val time: Time, func: () => _) { + val id = Job.getNewId() + def run(): Long = { + val startTime = System.currentTimeMillis + func() + val stopTime = System.currentTimeMillis + (stopTime - startTime) + } + + override def toString = "streaming job " + id + " @ " + time +} + +private[streaming] +object Job { + val id = new AtomicLong(0) + + def getNewId() = id.getAndIncrement() +} + diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala new file mode 100644 index 0000000000..3b910538e0 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/JobManager.scala @@ -0,0 +1,33 @@ +package spark.streaming + +import spark.Logging +import spark.SparkEnv +import java.util.concurrent.Executors + + +private[streaming] +class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging { + + class JobHandler(ssc: StreamingContext, job: Job) extends Runnable { + def run() { + SparkEnv.set(ssc.env) + try { + val timeTaken = job.run() + logInfo("Total delay: %.5f s for job %s (execution: %.5f s)".format( + (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, timeTaken / 1000.0)) + } catch { + case e: Exception => + logError("Running " + job + " failed", e) + } + } + } + + initLogging() + + val 
jobExecutor = Executors.newFixedThreadPool(numThreads) + + def runJob(job: Job) { + jobExecutor.execute(new JobHandler(ssc, job)) + logInfo("Added " + job + " to queue") + } +} diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala new file mode 100644 index 0000000000..e4152f3a61 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala @@ -0,0 +1,151 @@ +package spark.streaming + +import spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver} +import spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError} +import spark.Logging +import spark.SparkEnv + +import scala.collection.mutable.HashMap +import scala.collection.mutable.Queue + +import akka.actor._ +import akka.pattern.ask +import akka.util.duration._ +import akka.dispatch._ + +private[streaming] sealed trait NetworkInputTrackerMessage +private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage +private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[String], metadata: Any) extends NetworkInputTrackerMessage +private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage + +/** + * This class manages the execution of the receivers of NetworkInputDStreams. + */ +private[streaming] +class NetworkInputTracker( + @transient ssc: StreamingContext, + @transient networkInputStreams: Array[NetworkInputDStream[_]]) + extends Logging { + + val networkInputStreamMap = Map(networkInputStreams.map(x => (x.id, x)): _*) + val receiverExecutor = new ReceiverExecutor() + val receiverInfo = new HashMap[Int, ActorRef] + val receivedBlockIds = new HashMap[Int, Queue[String]] + val timeout = 5000.milliseconds + + var currentTime: Time = null + + /** Start the actor and receiver execution thread. */ + def start() { + ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), "NetworkInputTracker") + receiverExecutor.start() + } + + /** Stop the receiver execution thread. */ + def stop() { + // TODO: stop the actor as well + receiverExecutor.interrupt() + receiverExecutor.stopReceivers() + } + + /** Return all the blocks received from a receiver. */ + def getBlockIds(receiverId: Int, time: Time): Array[String] = synchronized { + val queue = receivedBlockIds.synchronized { + receivedBlockIds.getOrElse(receiverId, new Queue[String]()) + } + val result = queue.synchronized { + queue.dequeueAll(x => true) + } + logInfo("Stream " + receiverId + " received " + result.size + " blocks") + result.toArray + } + + /** Actor to receive messages from the receivers. */ + private class NetworkInputTrackerActor extends Actor { + def receive = { + case RegisterReceiver(streamId, receiverActor) => { + if (!networkInputStreamMap.contains(streamId)) { + throw new Exception("Register received for unexpected id " + streamId) + } + receiverInfo += ((streamId, receiverActor)) + logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address) + sender ! 
true + } + case AddBlocks(streamId, blockIds, metadata) => { + val tmp = receivedBlockIds.synchronized { + if (!receivedBlockIds.contains(streamId)) { + receivedBlockIds += ((streamId, new Queue[String])) + } + receivedBlockIds(streamId) + } + tmp.synchronized { + tmp ++= blockIds + } + networkInputStreamMap(streamId).addMetadata(metadata) + } + case DeregisterReceiver(streamId, msg) => { + receiverInfo -= streamId + logInfo("De-registered receiver for network stream " + streamId + + " with message " + msg) + //TODO: Do something about the corresponding NetworkInputDStream + } + } + } + + /** This thread class runs all the receivers on the cluster. */ + class ReceiverExecutor extends Thread { + val env = ssc.env + + override def run() { + try { + SparkEnv.set(env) + startReceivers() + } catch { + case ie: InterruptedException => logInfo("ReceiverExecutor interrupted") + } finally { + stopReceivers() + } + } + + /** + * Get the receivers from the NetworkInputDStreams, distribute them to the + * worker nodes as a parallel collection, and run them. + */ + def startReceivers() { + val receivers = networkInputStreams.map(nis => { + val rcvr = nis.createReceiver() + rcvr.setStreamId(nis.id) + rcvr + }) + + // Right now, we only honor preferences if all receivers have them + val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined).reduce(_ && _) + + // Create the parallel collection of receivers to distribute them on the worker nodes + val tempRDD = + if (hasLocationPreferences) { + val receiversWithPreferences = receivers.map(r => (r, Seq(r.getLocationPreference().toString))) + ssc.sc.makeRDD[NetworkReceiver[_]](receiversWithPreferences) + } + else { + ssc.sc.makeRDD(receivers, receivers.size) + } + + // Function to start the receiver on the worker node + val startReceiver = (iterator: Iterator[NetworkReceiver[_]]) => { + if (!iterator.hasNext) { + throw new Exception("Could not start receiver as details not found.") + } + iterator.next().start() + } + // Distribute the receivers and start them + ssc.sc.runJob(tempRDD, startReceiver) + } + + /** Stops the receivers. */ + def stopReceivers() { + // Signal the receivers to stop + receiverInfo.values.foreach(_ ! 
StopReceiver) + } + } +} diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala new file mode 100644 index 0000000000..fbcf061126 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala @@ -0,0 +1,562 @@ +package spark.streaming + +import spark.streaming.StreamingContext._ +import spark.streaming.dstream.{ReducedWindowedDStream, StateDStream} +import spark.streaming.dstream.{CoGroupedDStream, ShuffledDStream} +import spark.streaming.dstream.{MapValuedDStream, FlatMapValuedDStream} + +import spark.{Manifests, RDD, Partitioner, HashPartitioner} +import spark.SparkContext._ +import spark.storage.StorageLevel + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.mapred.{JobConf, OutputFormat} +import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} +import org.apache.hadoop.mapred.OutputFormat +import org.apache.hadoop.conf.Configuration + +class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](self: DStream[(K,V)]) +extends Serializable { + + def ssc = self.ssc + + private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = { + new HashPartitioner(numPartitions) + } + + /** + * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. + */ + def groupByKey(): DStream[(K, Seq[V])] = { + groupByKey(defaultPartitioner()) + } + + /** + * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to + * generate the RDDs with `numPartitions` partitions. + */ + def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = { + groupByKey(defaultPartitioner(numPartitions)) + } + + /** + * Create a new DStream by applying `groupByKey` on each RDD. The supplied [[spark.Partitioner]] + * is used to control the partitioning of each RDD. + */ + def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = { + val createCombiner = (v: V) => ArrayBuffer[V](v) + val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v) + val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2) + combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner) + .asInstanceOf[DStream[(K, Seq[V])]] + } + + /** + * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the associative reduce function. Hash partitioning is used to generate the RDDs + * with Spark's default number of partitions. + */ + def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = { + reduceByKey(reduceFunc, defaultPartitioner()) + } + + /** + * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs + * with `numPartitions` partitions. + */ + def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = { + reduceByKey(reduceFunc, defaultPartitioner(numPartitions)) + } + + /** + * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the supplied reduce function. [[spark.Partitioner]] is used to control the + * partitioning of each RDD. 
+ */ + def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = { + val cleanedReduceFunc = ssc.sc.clean(reduceFunc) + combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner) + } + + /** + * Combine elements of each key in DStream's RDDs using custom function. This is similar to the + * combineByKey for RDDs. Please refer to combineByKey in [[spark.PairRDDFunctions]] for more + * information. + */ + def combineByKey[C: ClassManifest]( + createCombiner: V => C, + mergeValue: (C, V) => C, + mergeCombiner: (C, C) => C, + partitioner: Partitioner) : DStream[(K, C)] = { + new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner) + } + + /** + * Create a new DStream by counting the number of values of each key in each RDD. Hash + * partitioning is used to generate the RDDs with Spark's `numPartitions` partitions. + */ + def countByKey(numPartitions: Int = self.ssc.sc.defaultParallelism): DStream[(K, Long)] = { + self.map(x => (x._1, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions) + } + + /** + * Creates a new DStream by applying `groupByKey` over a sliding window. This is similar to + * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs + * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with + * Spark's default number of partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + */ + def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Seq[V])] = { + groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner()) + } + + /** + * Create a new DStream by applying `groupByKey` over a sliding window. Similar to + * `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = { + groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner()) + } + + /** + * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * Similar to `DStream.groupByKey()`, but applies it over a sliding window. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions Number of partitions of each RDD in the new DStream. + */ + def groupByKeyAndWindow( + windowDuration: Duration, + slideDuration: Duration, + numPartitions: Int + ): DStream[(K, Seq[V])] = { + groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions)) + } + + /** + * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * Similar to `DStream.groupByKey()`, but applies it over a sliding window. 
+ * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + */ + def groupByKeyAndWindow( + windowDuration: Duration, + slideDuration: Duration, + partitioner: Partitioner + ): DStream[(K, Seq[V])] = { + self.window(windowDuration, slideDuration).groupByKey(partitioner) + } + + /** + * Create a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. + * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream + * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate + * the RDDs with Spark's default number of partitions. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + */ + def reduceByKeyAndWindow( + reduceFunc: (V, V) => V, + windowDuration: Duration + ): DStream[(K, V)] = { + reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner()) + } + + /** + * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to + * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def reduceByKeyAndWindow( + reduceFunc: (V, V) => V, + windowDuration: Duration, + slideDuration: Duration + ): DStream[(K, V)] = { + reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner()) + } + + /** + * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to + * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with `numPartitions` partitions. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions Number of partitions of each RDD in the new DStream. + */ + def reduceByKeyAndWindow( + reduceFunc: (V, V) => V, + windowDuration: Duration, + slideDuration: Duration, + numPartitions: Int + ): DStream[(K, V)] = { + reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions)) + } + + /** + * Create a new DStream by applying `reduceByKey` over a sliding window. Similar to + * `DStream.reduceByKey()`, but applies it over a sliding window. 
+   * @param reduceFunc associative reduce function
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration sliding interval of the window (i.e., the interval after which
+   *                      the new DStream will generate RDDs); must be a multiple of this
+   *                      DStream's batching interval
+   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+   */
+  def reduceByKeyAndWindow(
+      reduceFunc: (V, V) => V,
+      windowDuration: Duration,
+      slideDuration: Duration,
+      partitioner: Partitioner
+    ): DStream[(K, V)] = {
+    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
+    self.reduceByKey(cleanedReduceFunc, partitioner)
+        .window(windowDuration, slideDuration)
+        .reduceByKey(cleanedReduceFunc, partitioner)
+  }
+
+  /**
+   * Create a new DStream by reducing over a sliding window using incremental computation.
+   * The reduced value of a new window is calculated using the old window's reduced value:
+   *  1. reduce the new values that entered the window (e.g., adding new counts)
+   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+   * This is more efficient than reduceByKeyAndWindow without an "inverse reduce" function.
+   * However, it is applicable only to "invertible reduce functions".
+   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+   * @param reduceFunc associative reduce function
+   * @param invReduceFunc inverse function
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration sliding interval of the window (i.e., the interval after which
+   *                      the new DStream will generate RDDs); must be a multiple of this
+   *                      DStream's batching interval
+   */
+  def reduceByKeyAndWindow(
+      reduceFunc: (V, V) => V,
+      invReduceFunc: (V, V) => V,
+      windowDuration: Duration,
+      slideDuration: Duration
+    ): DStream[(K, V)] = {
+
+    reduceByKeyAndWindow(
+      reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner())
+  }
+
+  /**
+   * Create a new DStream by reducing over a sliding window using incremental computation.
+   * The reduced value of a new window is calculated using the old window's reduced value:
+   *  1. reduce the new values that entered the window (e.g., adding new counts)
+   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+   * This is more efficient than reduceByKeyAndWindow without an "inverse reduce" function.
+   * However, it is applicable only to "invertible reduce functions".
+   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+   * @param reduceFunc associative reduce function
+   * @param invReduceFunc inverse function
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration sliding interval of the window (i.e., the interval after which
+   *                      the new DStream will generate RDDs); must be a multiple of this
+   *                      DStream's batching interval
+   * @param numPartitions Number of partitions of each RDD in the new DStream.
+   */
+  def reduceByKeyAndWindow(
+      reduceFunc: (V, V) => V,
+      invReduceFunc: (V, V) => V,
+      windowDuration: Duration,
+      slideDuration: Duration,
+      numPartitions: Int
+    ): DStream[(K, V)] = {
+
+    reduceByKeyAndWindow(
+      reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions))
+  }
+
+  /**
+   * Create a new DStream by reducing over a sliding window using incremental computation.
+   * The reduced value of a new window is calculated using the old window's reduced value:
+   *  1. reduce the new values that entered the window (e.g., adding new counts)
+   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+   * This is more efficient than reduceByKeyAndWindow without an "inverse reduce" function.
+   * However, it is applicable only to "invertible reduce functions".
+   * @param reduceFunc associative reduce function
+   * @param invReduceFunc inverse function
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration sliding interval of the window (i.e., the interval after which
+   *                      the new DStream will generate RDDs); must be a multiple of this
+   *                      DStream's batching interval
+   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+   */
+  def reduceByKeyAndWindow(
+      reduceFunc: (V, V) => V,
+      invReduceFunc: (V, V) => V,
+      windowDuration: Duration,
+      slideDuration: Duration,
+      partitioner: Partitioner
+    ): DStream[(K, V)] = {
+
+    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
+    val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
+    new ReducedWindowedDStream[K, V](
+      self, cleanedReduceFunc, cleanedInvReduceFunc, windowDuration, slideDuration, partitioner)
+  }
+
+  /**
+   * Create a new DStream by counting the number of values for each key over a window.
+   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration sliding interval of the window (i.e., the interval after which
+   *                      the new DStream will generate RDDs); must be a multiple of this
+   *                      DStream's batching interval
+   * @param numPartitions Number of partitions of each RDD in the new DStream.
+   */
+  def countByKeyAndWindow(
+      windowDuration: Duration,
+      slideDuration: Duration,
+      numPartitions: Int = self.ssc.sc.defaultParallelism
+    ): DStream[(K, Long)] = {
+
+    self.map(x => (x._1, 1L)).reduceByKeyAndWindow(
+      (x: Long, y: Long) => x + y,
+      (x: Long, y: Long) => x - y,
+      windowDuration,
+      slideDuration,
+      numPartitions
+    )
+  }
+
+  /**
+   * Create a new "state" DStream where the state for each key is updated by applying
+   * the given function on the previous state of the key and the new values of each key.
+   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+   * @param updateFunc State update function. If this function returns None, then the
+   *                   corresponding state key-value pair will be eliminated.
+   * @tparam S State type
+   */
+  def updateStateByKey[S: ClassManifest](
+      updateFunc: (Seq[V], Option[S]) => Option[S]
+    ): DStream[(K, S)] = {
+    updateStateByKey(updateFunc, defaultPartitioner())
+  }
+
+  /**
+   * Create a new "state" DStream where the state for each key is updated by applying
+   * the given function on the previous state of the key and the new values of each key.
+   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+   * @param updateFunc State update function. If this function returns None, then the
+   *                   corresponding state key-value pair will be eliminated.
+   * @param numPartitions Number of partitions of each RDD in the new DStream.
+   * @tparam S State type
+   */
+  def updateStateByKey[S: ClassManifest](
+      updateFunc: (Seq[V], Option[S]) => Option[S],
+      numPartitions: Int
+    ): DStream[(K, S)] = {
+    updateStateByKey(updateFunc, defaultPartitioner(numPartitions))
+  }
+
+  /**
+   * Create a new "state" DStream where the state for each key is updated by applying
+   * the given function on the previous state of the key and the new values of the key.
+   * [[spark.Partitioner]] is used to control the partitioning of each RDD.
+   * @param updateFunc State update function. If this function returns None, then the
+   *                   corresponding state key-value pair will be eliminated.
+   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @tparam S State type
+   */
+  def updateStateByKey[S: ClassManifest](
+      updateFunc: (Seq[V], Option[S]) => Option[S],
+      partitioner: Partitioner
+    ): DStream[(K, S)] = {
+    val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
+      iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
+    }
+    updateStateByKey(newUpdateFunc, partitioner, true)
+  }
+
+  /**
+   * Create a new "state" DStream where the state for each key is updated by applying
+   * the given function on the previous state of the key and the new values of each key.
+   * [[spark.Partitioner]] is used to control the partitioning of each RDD.
+   * @param updateFunc State update function. If this function returns None, then the
+   *                   corresponding state key-value pair will be eliminated. Note that
+   *                   this function may generate a tuple with a different key than the
+   *                   input key. It is up to the developer to decide whether to
+   *                   remember the partitioner despite the key being changed.
+   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param rememberPartitioner Whether to remember the partitioner object in the generated RDDs.
+   * @tparam S State type
+   */
+  def updateStateByKey[S: ClassManifest](
+      updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
+      partitioner: Partitioner,
+      rememberPartitioner: Boolean
+    ): DStream[(K, S)] = {
+     new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner)
+  }
+
+
+  def mapValues[U: ClassManifest](mapValuesFunc: V => U): DStream[(K, U)] = {
+    new MapValuedDStream[K, V, U](self, mapValuesFunc)
+  }
+
+  def flatMapValues[U: ClassManifest](
+      flatMapValuesFunc: V => TraversableOnce[U]
+    ): DStream[(K, U)] = {
+    new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
+  }
+
+  /**
+   * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
+   * or `other` DStreams, the generated RDD will contain a tuple with the list of values for that
+   * key in both RDDs. HashPartitioner is used to partition each generated RDD into the default
+   * number of partitions.
+   */
+  def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
+    cogroup(other, defaultPartitioner())
+  }
+
+  /**
+   * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
+   * or `other` DStreams, the generated RDD will contain a tuple with the list of values for that
+   * key in both RDDs. Partitioner is used to partition each generated RDD.
+ */ + def cogroup[W: ClassManifest]( + other: DStream[(K, W)], + partitioner: Partitioner + ): DStream[(K, (Seq[V], Seq[W]))] = { + + val cgd = new CoGroupedDStream[K]( + Seq(self.asInstanceOf[DStream[(_, _)]], other.asInstanceOf[DStream[(_, _)]]), + partitioner + ) + val pdfs = new PairDStreamFunctions[K, Seq[Seq[_]]](cgd)( + classManifest[K], + Manifests.seqSeqManifest + ) + pdfs.mapValues { + case Seq(vs, ws) => + (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]]) + } + } + + /** + * Join `this` DStream with `other` DStream. HashPartitioner is used + * to partition each generated RDD into default number of partitions. + */ + def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = { + join[W](other, defaultPartitioner()) + } + + /** + * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will + * be generated by joining RDDs from `this` and other DStream. Uses the given + * Partitioner to partition each generated RDD. + */ + def join[W: ClassManifest]( + other: DStream[(K, W)], + partitioner: Partitioner + ): DStream[(K, (V, W))] = { + this.cogroup(other, partitioner) + .flatMapValues{ + case (vs, ws) => + for (v <- vs.iterator; w <- ws.iterator) yield (v, w) + } + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated + * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" + */ + def saveAsHadoopFiles[F <: OutputFormat[K, V]]( + prefix: String, + suffix: String + )(implicit fm: ClassManifest[F]) { + saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]]) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated + * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" + */ + def saveAsHadoopFiles( + prefix: String, + suffix: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: OutputFormat[_, _]], + conf: JobConf = new JobConf + ) { + val saveFunc = (rdd: RDD[(K, V)], time: Time) => { + val file = rddToFileName(prefix, suffix, time) + rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) + } + self.foreach(saveFunc) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]]( + prefix: String, + suffix: String + )(implicit fm: ClassManifest[F]) { + saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]]) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". 
+ */ + def saveAsNewAPIHadoopFiles( + prefix: String, + suffix: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: NewOutputFormat[_, _]], + conf: Configuration = new Configuration + ) { + val saveFunc = (rdd: RDD[(K, V)], time: Time) => { + val file = rddToFileName(prefix, suffix, time) + rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) + } + self.foreach(saveFunc) + } + + private def getKeyClass() = implicitly[ClassManifest[K]].erasure + + private def getValueClass() = implicitly[ClassManifest[V]].erasure +} + + diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala new file mode 100644 index 0000000000..c04ed37de8 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala @@ -0,0 +1,77 @@ +package spark.streaming + +import util.{ManualClock, RecurringTimer, Clock} +import spark.SparkEnv +import spark.Logging + +private[streaming] +class Scheduler(ssc: StreamingContext) extends Logging { + + initLogging() + + val graph = ssc.graph + + val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt + val jobManager = new JobManager(ssc, concurrentJobs) + + val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) { + new CheckpointWriter(ssc.checkpointDir) + } else { + null + } + + val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock") + val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock] + val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, + longTime => generateRDDs(new Time(longTime))) + + def start() { + // If context was started from checkpoint, then restart timer such that + // this timer's triggers occur at the same time as the original timer. + // Otherwise just start the timer from scratch, and initialize graph based + // on this first trigger time of the timer. 
+ if (ssc.isCheckpointPresent) { + // If manual clock is being used for testing, then + // either set the manual clock to the last checkpointed time, + // or if the property is defined set it to that time + if (clock.isInstanceOf[ManualClock]) { + val lastTime = ssc.getInitialCheckpoint.checkpointTime.milliseconds + val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong + clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime) + } + timer.restart(graph.zeroTime.milliseconds) + logInfo("Scheduler's timer restarted") + } else { + val firstTime = new Time(timer.start()) + graph.start(firstTime - ssc.graph.batchDuration) + logInfo("Scheduler's timer started") + } + logInfo("Scheduler started") + } + + def stop() { + timer.stop() + graph.stop() + logInfo("Scheduler stopped") + } + + private def generateRDDs(time: Time) { + SparkEnv.set(ssc.env) + logInfo("\n-----------------------------------------------------\n") + graph.generateRDDs(time).foreach(jobManager.runJob) + graph.forgetOldRDDs(time) + doCheckpoint(time) + logInfo("Generated RDDs for time " + time) + } + + private def doCheckpoint(time: Time) { + if (ssc.checkpointDuration != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) { + val startTime = System.currentTimeMillis() + ssc.graph.updateCheckpointData(time) + checkpointWriter.write(new Checkpoint(ssc, time)) + val stopTime = System.currentTimeMillis() + logInfo("Checkpointing the graph took " + (stopTime - startTime) + " ms") + } + } +} + diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala new file mode 100644 index 0000000000..37ba524b48 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -0,0 +1,411 @@ +package spark.streaming + +import spark.streaming.dstream._ + +import spark.{RDD, Logging, SparkEnv, SparkContext} +import spark.storage.StorageLevel +import spark.util.MetadataCleaner + +import scala.collection.mutable.Queue + +import java.io.InputStream +import java.util.concurrent.atomic.AtomicInteger + +import org.apache.hadoop.io.LongWritable +import org.apache.hadoop.io.Text +import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat +import org.apache.hadoop.fs.Path +import java.util.UUID + +/** + * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic + * information (such as, cluster URL and job name) to internally create a SparkContext, it provides + * methods used to create DStream from various input sources. + */ +class StreamingContext private ( + sc_ : SparkContext, + cp_ : Checkpoint, + batchDur_ : Duration + ) extends Logging { + + /** + * Creates a StreamingContext using an existing SparkContext. + * @param sparkContext Existing SparkContext + * @param batchDuration The time interval at which streaming data will be divided into batches + */ + def this(sparkContext: SparkContext, batchDuration: Duration) = this(sparkContext, null, batchDuration) + + /** + * Creates a StreamingContext by providing the details necessary for creating a new SparkContext. + * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). 
+   * @param frameworkName A name for your job, to display on the cluster web UI
+   * @param batchDuration The time interval at which streaming data will be divided into batches
+   */
+  def this(master: String, frameworkName: String, batchDuration: Duration) =
+    this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration)
+
+  /**
+   * Re-creates a StreamingContext from a checkpoint file.
+   * @param path Path either to the directory that was specified as the checkpoint directory, or
+   *             to the checkpoint file 'graph' or 'graph.bk'.
+   */
+  def this(path: String) = this(null, CheckpointReader.read(path), null)
+
+  initLogging()
+
+  if (sc_ == null && cp_ == null) {
+    throw new Exception("Streaming Context cannot be initialized with " +
+      "both SparkContext and checkpoint as null")
+  }
+
+  protected[streaming] val isCheckpointPresent = (cp_ != null)
+
+  val sc: SparkContext = {
+    if (isCheckpointPresent) {
+      new SparkContext(cp_.master, cp_.framework, cp_.sparkHome, cp_.jars)
+    } else {
+      sc_
+    }
+  }
+
+  protected[streaming] val env = SparkEnv.get
+
+  protected[streaming] val graph: DStreamGraph = {
+    if (isCheckpointPresent) {
+      cp_.graph.setContext(this)
+      cp_.graph.restoreCheckpointData()
+      cp_.graph
+    } else {
+      assert(batchDur_ != null, "Batch duration for streaming context cannot be null")
+      val newGraph = new DStreamGraph()
+      newGraph.setBatchDuration(batchDur_)
+      newGraph
+    }
+  }
+
+  protected[streaming] val nextNetworkInputStreamId = new AtomicInteger(0)
+  protected[streaming] var networkInputTracker: NetworkInputTracker = null
+
+  protected[streaming] var checkpointDir: String = {
+    if (isCheckpointPresent) {
+      sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(cp_.checkpointDir), true)
+      cp_.checkpointDir
+    } else {
+      null
+    }
+  }
+
+  protected[streaming] var checkpointDuration: Duration = if (isCheckpointPresent) cp_.checkpointDuration else null
+  protected[streaming] var receiverJobThread: Thread = null
+  protected[streaming] var scheduler: Scheduler = null
+
+  /**
+   * Sets each DStream in this context to remember the RDDs it generated in the last given duration.
+   * DStreams remember RDDs only for a limited duration of time and release them for garbage
+   * collection. This method allows the developer to specify how long to remember the RDDs
+   * (if the developer wishes to query old data outside the DStream computation).
+   * @param duration Minimum duration that each DStream should remember its RDDs
+   */
+  def remember(duration: Duration) {
+    graph.remember(duration)
+  }
+
+  /**
+   * Sets the context to periodically checkpoint the DStream operations for master
+   * fault-tolerance. By default, the graph will be checkpointed every batch interval.
+   * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
+   * @param interval checkpoint interval
+   */
+  def checkpoint(directory: String, interval: Duration = null) {
+    if (directory != null) {
+      sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory))
+      checkpointDir = directory
+      checkpointDuration = interval
+    } else {
+      checkpointDir = null
+      checkpointDuration = null
+    }
+  }
+
+  protected[streaming] def getInitialCheckpoint(): Checkpoint = {
+    if (isCheckpointPresent) cp_ else null
+  }
+
+  protected[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
+
+  /**
+   * Create an input stream that pulls messages from a Kafka Broker.
+   * @param hostname Zookeeper hostname.
+   * @param port Zookeeper port.
+   * @param groupId The group id for this consumer.
+   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+   *               in its own thread.
+   * @param initialOffsets Optional initial offsets for each of the partitions to consume.
+   *                       By default the value is pulled from Zookeeper.
+   * @param storageLevel RDD storage level. Defaults to memory-only.
+   */
+  def kafkaStream[T: ClassManifest](
+      hostname: String,
+      port: Int,
+      groupId: String,
+      topics: Map[String, Int],
+      initialOffsets: Map[KafkaPartitionKey, Long] = Map[KafkaPartitionKey, Long](),
+      storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
+    ): DStream[T] = {
+    val inputStream = new KafkaInputDStream[T](this, hostname, port, groupId, topics, initialOffsets, storageLevel)
+    registerInputStream(inputStream)
+    inputStream
+  }
+
+  /**
+   * Create an input stream from a network source hostname:port. Data is received using
+   * a TCP socket and the received bytes are interpreted as UTF8 encoded, \n delimited
+   * lines.
+   * @param hostname Hostname to connect to for receiving data
+   * @param port Port to connect to for receiving data
+   * @param storageLevel Storage level to use for storing the received objects
+   *                     (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+   */
+  def networkTextStream(
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+    ): DStream[String] = {
+    networkStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
+  }
+
+  /**
+   * Create an input stream from a network source hostname:port. Data is received using
+   * a TCP socket and the received bytes are interpreted as objects using the given
+   * converter.
+   * @param hostname Hostname to connect to for receiving data
+   * @param port Port to connect to for receiving data
+   * @param converter Function to convert the byte stream to objects
+   * @param storageLevel Storage level to use for storing the received objects
+   * @tparam T Type of the objects received (after converting bytes to objects)
+   */
+  def networkStream[T: ClassManifest](
+      hostname: String,
+      port: Int,
+      converter: (InputStream) => Iterator[T],
+      storageLevel: StorageLevel
+    ): DStream[T] = {
+    val inputStream = new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
+    registerInputStream(inputStream)
+    inputStream
+  }
+
+  /**
+   * Creates an input stream from a Flume source.
+   * @param hostname Hostname of the slave machine to which the flume data will be sent
+   * @param port Port of the slave machine to which the flume data will be sent
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def flumeStream (
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+    ): DStream[SparkFlumeEvent] = {
+    val inputStream = new FlumeInputDStream(this, hostname, port, storageLevel)
+    registerInputStream(inputStream)
+    inputStream
+  }
+
+  /**
+   * Create an input stream from a network source hostname:port, where data is received
+   * as serialized blocks (serialized using Spark's serializer) that can be directly
+   * pushed into the block manager without deserializing them. This is the most efficient
+   * way to receive data.
+   * @param hostname Hostname to connect to for receiving data
+   * @param port Port to connect to for receiving data
+   * @param storageLevel Storage level to use for storing the received objects
+   * @tparam T Type of the objects in the received blocks
+   */
+  def rawNetworkStream[T: ClassManifest](
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+    ): DStream[T] = {
+    val inputStream = new RawInputDStream[T](this, hostname, port, storageLevel)
+    registerInputStream(inputStream)
+    inputStream
+  }
+
+  /**
+   * Creates an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them using the given key-value types and input format.
+   * File names starting with . are ignored.
+   * @param directory HDFS directory to monitor for new files
+   * @tparam K Key type for reading HDFS file
+   * @tparam V Value type for reading HDFS file
+   * @tparam F Input format for reading HDFS file
+   */
+  def fileStream[
+    K: ClassManifest,
+    V: ClassManifest,
+    F <: NewInputFormat[K, V]: ClassManifest
+  ] (directory: String): DStream[(K, V)] = {
+    val inputStream = new FileInputDStream[K, V, F](this, directory)
+    registerInputStream(inputStream)
+    inputStream
+  }
+
+  /**
+   * Creates an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them using the given key-value types and input format.
+   * @param directory HDFS directory to monitor for new files
+   * @param filter Function to filter paths to process
+   * @param newFilesOnly Should process only new files and ignore existing files in the directory
+   * @tparam K Key type for reading HDFS file
+   * @tparam V Value type for reading HDFS file
+   * @tparam F Input format for reading HDFS file
+   */
+  def fileStream[
+    K: ClassManifest,
+    V: ClassManifest,
+    F <: NewInputFormat[K, V]: ClassManifest
+  ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): DStream[(K, V)] = {
+    val inputStream = new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
+    registerInputStream(inputStream)
+    inputStream
+  }
+
+
+  /**
+   * Creates an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them as text files (using key as LongWritable, value
+   * as Text and input format as TextInputFormat). File names starting with . are ignored.
+   * @param directory HDFS directory to monitor for new files
+   */
+  def textFileStream(directory: String): DStream[String] = {
+    fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
+  }
+
+  /**
+   * Creates an input stream from a queue of RDDs. In each batch,
+   * it will process either one or all of the RDDs returned by the queue.
+   * @param queue Queue of RDDs
+   * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
+   * @param defaultRDD Default RDD returned by the DStream when the queue is empty
+   * @tparam T Type of objects in the RDD
+   */
+  def queueStream[T: ClassManifest](
+      queue: Queue[RDD[T]],
+      oneAtATime: Boolean = true,
+      defaultRDD: RDD[T] = null
+    ): DStream[T] = {
+    val inputStream = new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
+    registerInputStream(inputStream)
+    inputStream
+  }
+
+  /**
+   * Create a unified DStream from multiple DStreams of the same type and same interval.
+   */
+  def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = {
+    new UnionDStream[T](streams.toArray)
+  }
+
+  /**
+   * Registers an input stream that will be started (InputDStream.start() called) to get the
+   * input data.
+   */
+  def registerInputStream(inputStream: InputDStream[_]) {
+    graph.addInputStream(inputStream)
+  }
+
+  /**
+   * Registers an output stream that will be computed every interval.
+   */
+  def registerOutputStream(outputStream: DStream[_]) {
+    graph.addOutputStream(outputStream)
+  }
+
+  protected def validate() {
+    assert(graph != null, "Graph is null")
+    graph.validate()
+
+    assert(
+      checkpointDir == null || checkpointDuration != null,
+      "Checkpoint directory has been set, but the graph checkpointing interval has " +
+        "not been set. Please use StreamingContext.checkpoint() to set the interval."
+    )
+  }
+
+  /**
+   * Starts the execution of the streams.
+   */
+  def start() {
+    if (checkpointDir != null && checkpointDuration == null && graph != null) {
+      checkpointDuration = graph.batchDuration
+    }
+
+    validate()
+
+    val networkInputStreams = graph.getInputStreams().filter(s => s match {
+        case n: NetworkInputDStream[_] => true
+        case _ => false
+      }).map(_.asInstanceOf[NetworkInputDStream[_]]).toArray
+
+    if (networkInputStreams.length > 0) {
+      // Start the network input tracker (must start before receivers)
+      networkInputTracker = new NetworkInputTracker(this, networkInputStreams)
+      networkInputTracker.start()
+    }
+
+    Thread.sleep(1000)
+
+    // Start the scheduler
+    scheduler = new Scheduler(this)
+    scheduler.start()
+  }
+
+  /**
+   * Stops the execution of the streams.
+   */
+  def stop() {
+    try {
+      if (scheduler != null) scheduler.stop()
+      if (networkInputTracker != null) networkInputTracker.stop()
+      if (receiverJobThread != null) receiverJobThread.interrupt()
+      sc.stop()
+      logInfo("StreamingContext stopped successfully")
+    } catch {
+      case e: Exception => logWarning("Error while stopping", e)
+    }
+  }
+}
+
+
+object StreamingContext {
+
+  implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = {
+    new PairDStreamFunctions[K, V](stream)
+  }
+
+  protected[streaming] def createNewSparkContext(master: String, frameworkName: String): SparkContext = {
+
+    // Set the default cleaner delay to an hour if not already set.
+    // This should be sufficient for even a 1 second interval.
+    if (MetadataCleaner.getDelaySeconds < 0) {
+      MetadataCleaner.setDelaySeconds(3600)
+    }
+    new SparkContext(master, frameworkName)
+  }
+
+  protected[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = {
+    if (prefix == null) {
+      time.milliseconds.toString
+    } else if (suffix == null || suffix.length == 0) {
+      prefix + "-" + time.milliseconds
+    } else {
+      prefix + "-" + time.milliseconds + "." + suffix
+    }
+  }
+
+  protected[streaming] def getSparkCheckpointDir(sscCheckpointDir: String): String = {
+    new Path(sscCheckpointDir, UUID.randomUUID.toString).toString
+  }
+}
+
diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala
new file mode 100644
index 0000000000..5daeb761dd
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/Time.scala
@@ -0,0 +1,42 @@
+package spark.streaming
+
+/**
+ * This is a simple class that represents an absolute instant of time.
+ * Internally, it represents time as the difference, measured in milliseconds, between the current
+ * time and midnight, January 1, 1970 UTC. This is the same format as what is returned by
+ * System.currentTimeMillis.
+ */ +case class Time(private val millis: Long) { + + def milliseconds: Long = millis + + def < (that: Time): Boolean = (this.millis < that.millis) + + def <= (that: Time): Boolean = (this.millis <= that.millis) + + def > (that: Time): Boolean = (this.millis > that.millis) + + def >= (that: Time): Boolean = (this.millis >= that.millis) + + def + (that: Duration): Time = new Time(millis + that.milliseconds) + + def - (that: Time): Duration = new Duration(millis - that.millis) + + def - (that: Duration): Time = new Time(millis - that.milliseconds) + + def floor(that: Duration): Time = { + val t = that.milliseconds + val m = math.floor(this.millis / t).toLong + new Time(m * t) + } + + def isMultipleOf(that: Duration): Boolean = + (this.millis % that.milliseconds == 0) + + def min(that: Time): Time = if (this < that) this else that + + def max(that: Time): Time = if (this > that) this else that + + override def toString: String = (millis.toString + " ms") + +}
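Time and Duration together drive the batch-alignment arithmetic used by the Scheduler above: floor() snaps an arbitrary instant to the start of its batch, and isMultipleOf() checks that a window, slide, or checkpoint interval lines up with the batch interval. A minimal sketch of that arithmetic, assuming the Duration class added elsewhere in this patch is a milliseconds-backed value with a milliseconds accessor like Time (the numbers below are purely illustrative):

  val batch = new Duration(2000)            // 2 second batch interval (Duration assumed as above)
  val now   = new Time(1358700005500L)      // some wall-clock instant

  // Align the instant to the start of the batch it falls into.
  val aligned = now.floor(batch)            // Time(1358700004000)
  assert(aligned.isMultipleOf(batch))                           // batch boundaries are multiples of the interval
  assert((now - aligned).milliseconds < batch.milliseconds)     // at most one batch behind the raw instant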
\ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala new file mode 100644 index 0000000000..2e7466b16c --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala @@ -0,0 +1,91 @@ +package spark.streaming.api.java + +import spark.streaming.{Duration, Time, DStream} +import spark.api.java.function.{Function => JFunction} +import spark.api.java.JavaRDD +import spark.storage.StorageLevel + +/** + * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous + * sequence of RDDs (of the same type) representing a continuous stream of data (see [[spark.RDD]] + * for more details on RDDs). DStreams can either be created from live data (such as, data from + * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations + * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each + * DStream periodically generates a RDD, either from live data or by transforming the RDD generated + * by a parent DStream. + * + * This class contains the basic operations available on all DStreams, such as `map`, `filter` and + * `window`. In addition, [[spark.streaming.api.java.JavaPairDStream]] contains operations available + * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations + * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)] through + * implicit conversions when `spark.streaming.StreamingContext._` is imported. + * + * DStreams internally is characterized by a few basic properties: + * - A list of other DStreams that the DStream depends on + * - A time interval at which the DStream generates an RDD + * - A function that is used to generate an RDD after each time interval + */ +class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T]) + extends JavaDStreamLike[T, JavaDStream[T]] { + + /** Return a new DStream containing only the elements that satisfy a predicate. */ + def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] = + dstream.filter((x => f(x).booleanValue())) + + /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + def cache(): JavaDStream[T] = dstream.cache() + + /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + def persist(): JavaDStream[T] = dstream.cache() + + /** Persist the RDDs of this DStream with the given storage level */ + def persist(storageLevel: StorageLevel): JavaDStream[T] = dstream.persist(storageLevel) + + /** Generate an RDD for the given duration */ + def compute(validTime: Time): JavaRDD[T] = { + dstream.compute(validTime) match { + case Some(rdd) => new JavaRDD(rdd) + case None => null + } + } + + /** + * Return a new DStream which is computed based on windowed batches of this DStream. + * The new DStream generates RDDs with the same interval as this DStream. + * @param windowDuration width of the window; must be a multiple of this DStream's interval. + * @return + */ + def window(windowDuration: Duration): JavaDStream[T] = + dstream.window(windowDuration) + + /** + * Return a new DStream which is computed based on windowed batches of this DStream. 
+ * @param windowDuration duration (i.e., width) of the window; + * must be a multiple of this DStream's interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's interval + */ + def window(windowDuration: Duration, slideDuration: Duration): JavaDStream[T] = + dstream.window(windowDuration, slideDuration) + + /** + * Return a new DStream which computed based on tumbling window on this DStream. + * This is equivalent to window(batchDuration, batchDuration). + * @param batchDuration tumbling window duration; must be a multiple of this DStream's interval + */ + def tumble(batchDuration: Duration): JavaDStream[T] = + dstream.tumble(batchDuration) + + /** + * Return a new DStream by unifying data of another DStream with this DStream. + * @param that Another DStream having the same interval (i.e., slideDuration) as this DStream. + */ + def union(that: JavaDStream[T]): JavaDStream[T] = + dstream.union(that.dstream) +} + +object JavaDStream { + implicit def fromDStream[T: ClassManifest](dstream: DStream[T]): JavaDStream[T] = + new JavaDStream[T](dstream) +}
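Taken together, the Scala classes added so far (StreamingContext, PairDStreamFunctions and the window operations) compose into a small streaming program. The following is a hedged sketch rather than part of the patch: it relies on the DStream methods flatMap, map and print defined in DStream.scala elsewhere in this change, and the object name, host, port and checkpoint directory are placeholders. The checkpoint directory is set because the stateful updateStateByKey operation is generally expected to require one.

  import spark.streaming.{StreamingContext, Duration}
  import spark.streaming.StreamingContext._    // implicit conversion to PairDStreamFunctions

  object StreamingWordCount {                  // hypothetical example, not part of this patch
    def main(args: Array[String]) {
      // 2 second batches; "local[2]" leaves a core free for the network receiver.
      val ssc = new StreamingContext("local[2]", "StreamingWordCount", new Duration(2000))
      ssc.checkpoint("/tmp/streaming-checkpoint")          // placeholder directory

      val lines = ssc.networkTextStream("localhost", 9999)
      val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1L))

      // Incremental windowed count: add counts entering the 30 second window and
      // subtract counts leaving it, via the invertible-reduce variant defined above.
      val windowedCounts = pairs.reduceByKeyAndWindow(
        (a: Long, b: Long) => a + b,
        (a: Long, b: Long) => a - b,
        new Duration(30000),
        new Duration(2000))

      // Running total per word across all batches.
      val totals = pairs.updateStateByKey[Long](
        (newValues: Seq[Long], state: Option[Long]) => Some(state.getOrElse(0L) + newValues.sum))

      windowedCounts.print()
      totals.print()
      ssc.start()
    }
  }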
\ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala new file mode 100644 index 0000000000..b93cb7865a --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala @@ -0,0 +1,183 @@ +package spark.streaming.api.java + +import java.util.{List => JList} +import java.lang.{Long => JLong} + +import scala.collection.JavaConversions._ + +import spark.streaming._ +import spark.api.java.JavaRDD +import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _} +import java.util +import spark.RDD +import JavaDStream._ + +trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable { + implicit val classManifest: ClassManifest[T] + + def dstream: DStream[T] + + implicit def scalaIntToJavaLong(in: DStream[Long]): JavaDStream[JLong] = { + in.map(new JLong(_)) + } + + /** + * Print the first ten elements of each RDD generated in this DStream. This is an output + * operator, so this DStream will be registered as an output stream and there materialized. + */ + def print() = dstream.print() + + /** + * Return a new DStream in which each RDD has a single element generated by counting each RDD + * of this DStream. + */ + def count(): JavaDStream[JLong] = dstream.count() + + /** + * Return a new DStream in which each RDD has a single element generated by counting the number + * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the + * window() operation. This is equivalent to window(windowDuration, slideDuration).count() + */ + def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[JLong] = { + dstream.countByWindow(windowDuration, slideDuration) + } + + /** + * Return a new DStream in which each RDD is generated by applying glom() to each RDD of + * this DStream. Applying glom() to an RDD coalesces all elements within each partition into + * an array. + */ + def glom(): JavaDStream[JList[T]] = + new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq))) + + /** Return the StreamingContext associated with this DStream */ + def context(): StreamingContext = dstream.context() + + /** Return a new DStream by applying a function to all elements of this DStream. */ + def map[R](f: JFunction[T, R]): JavaDStream[R] = { + new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType()) + } + + /** Return a new DStream by applying a function to all elements of this DStream. 
*/ + def map[K, V](f: PairFunction[T, K, V]): JavaPairDStream[K, V] = { + def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]] + new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType()) + } + + /** + * Return a new DStream by applying a function to all elements of this DStream, + * and then flattening the results + */ + def flatMap[U](f: FlatMapFunction[T, U]): JavaDStream[U] = { + import scala.collection.JavaConverters._ + def fn = (x: T) => f.apply(x).asScala + new JavaDStream(dstream.flatMap(fn)(f.elementType()))(f.elementType()) + } + + /** + * Return a new DStream by applying a function to all elements of this DStream, + * and then flattening the results + */ + def flatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairDStream[K, V] = { + import scala.collection.JavaConverters._ + def fn = (x: T) => f.apply(x).asScala + def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]] + new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType()) + } + + /** + * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs + * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition + * of the RDD. + */ + def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = { + def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) + new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType()) + } + + /** + * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs + * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition + * of the RDD. + */ + def mapPartitions[K, V](f: PairFlatMapFunction[java.util.Iterator[T], K, V]) + : JavaPairDStream[K, V] = { + def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) + new JavaPairDStream(dstream.mapPartitions(fn))(f.keyType(), f.valueType()) + } + + /** + * Return a new DStream in which each RDD has a single element generated by reducing each RDD + * of this DStream. + */ + def reduce(f: JFunction2[T, T, T]): JavaDStream[T] = dstream.reduce(f) + + /** + * Return a new DStream in which each RDD has a single element generated by reducing all + * elements in a window over this DStream. windowDuration and slideDuration are as defined in the + * window() operation. This is equivalent to window(windowDuration, slideDuration).reduce(reduceFunc) + */ + def reduceByWindow( + reduceFunc: JFunction2[T, T, T], + invReduceFunc: JFunction2[T, T, T], + windowDuration: Duration, + slideDuration: Duration + ): JavaDStream[T] = { + dstream.reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration) + } + + /** + * Return all the RDDs between 'fromDuration' to 'toDuration' (both included) + */ + def slice(fromDuration: Duration, toDuration: Duration): JList[JavaRDD[T]] = { + new util.ArrayList(dstream.slice(fromDuration, toDuration).map(new JavaRDD(_)).toSeq) + } + + /** + * Apply a function to each RDD in this DStream. This is an output operator, so + * this DStream will be registered as an output stream and therefore materialized. + */ + def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) { + dstream.foreach(rdd => foreachFunc.call(new JavaRDD(rdd))) + } + + /** + * Apply a function to each RDD in this DStream. This is an output operator, so + * this DStream will be registered as an output stream and therefore materialized. 
+ */ + def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) { + dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this DStream. + */ + def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = { + implicit val cm: ClassManifest[U] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + def scalaTransform (in: RDD[T]): RDD[U] = + transformFunc.call(new JavaRDD[T](in)).rdd + dstream.transform(scalaTransform(_)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this DStream. + */ + def transform[U](transformFunc: JFunction2[JavaRDD[T], Time, JavaRDD[U]]): JavaDStream[U] = { + implicit val cm: ClassManifest[U] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + def scalaTransform (in: RDD[T], time: Time): RDD[U] = + transformFunc.call(new JavaRDD[T](in), time).rdd + dstream.transform(scalaTransform(_, _)) + } + + /** + * Enable periodic checkpointing of RDDs of this DStream + * @param interval Time interval after which generated RDD will be checkpointed + */ + def checkpoint(interval: Duration) = { + dstream.checkpoint(interval) + } +}
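The transform and foreach wrappers above delegate to methods of the same name on the underlying Scala DStream (defined in DStream.scala elsewhere in this patch); they give per-batch, RDD-level access to a stream. A small hedged sketch continuing the word-count example from earlier, with illustrative names:

  // Keep only words seen more than once in the window, then summarize each batch.
  val frequent = windowedCounts.transform(rdd => rdd.filter(_._2 > 1))
  frequent.foreach((rdd, time) =>
    println("Batch at " + time + ": " + rdd.count() + " words occurred more than once"))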
\ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala new file mode 100644 index 0000000000..ef10c091ca --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala @@ -0,0 +1,638 @@ +package spark.streaming.api.java + +import java.util.{List => JList} +import java.lang.{Long => JLong} + +import scala.collection.JavaConversions._ + +import spark.streaming._ +import spark.streaming.StreamingContext._ +import spark.api.java.function.{Function => JFunction, Function2 => JFunction2} +import spark.Partitioner +import org.apache.hadoop.mapred.{JobConf, OutputFormat} +import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} +import org.apache.hadoop.conf.Configuration +import spark.api.java.JavaPairRDD +import spark.storage.StorageLevel +import com.google.common.base.Optional + +class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( + implicit val kManifiest: ClassManifest[K], + implicit val vManifest: ClassManifest[V]) + extends JavaDStreamLike[(K, V), JavaPairDStream[K, V]] { + + // ======================================================================= + // Methods common to all DStream's + // ======================================================================= + + /** Returns a new DStream containing only the elements that satisfy a predicate. */ + def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairDStream[K, V] = + dstream.filter((x => f(x).booleanValue())) + + /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + def cache(): JavaPairDStream[K, V] = dstream.cache() + + /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + def persist(): JavaPairDStream[K, V] = dstream.cache() + + /** Persists the RDDs of this DStream with the given storage level */ + def persist(storageLevel: StorageLevel): JavaPairDStream[K, V] = dstream.persist(storageLevel) + + /** Method that generates a RDD for the given Duration */ + def compute(validTime: Time): JavaPairRDD[K, V] = { + dstream.compute(validTime) match { + case Some(rdd) => new JavaPairRDD(rdd) + case None => null + } + } + + /** + * Return a new DStream which is computed based on windowed batches of this DStream. + * The new DStream generates RDDs with the same interval as this DStream. + * @param windowDuration width of the window; must be a multiple of this DStream's interval. + * @return + */ + def window(windowDuration: Duration): JavaPairDStream[K, V] = + dstream.window(windowDuration) + + /** + * Return a new DStream which is computed based on windowed batches of this DStream. + * @param windowDuration duration (i.e., width) of the window; + * must be a multiple of this DStream's interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's interval + */ + def window(windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, V] = + dstream.window(windowDuration, slideDuration) + + /** + * Returns a new DStream which computed based on tumbling window on this DStream. + * This is equivalent to window(batchDuration, batchDuration). 
+ * @param batchDuration tumbling window duration; must be a multiple of this DStream's interval + */ + def tumble(batchDuration: Duration): JavaPairDStream[K, V] = + dstream.tumble(batchDuration) + + /** + * Returns a new DStream by unifying data of another DStream with this DStream. + * @param that Another DStream having the same interval (i.e., slideDuration) as this DStream. + */ + def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] = + dstream.union(that.dstream) + + // ======================================================================= + // Methods only for PairDStream's + // ======================================================================= + + /** + * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. + */ + def groupByKey(): JavaPairDStream[K, JList[V]] = + dstream.groupByKey().mapValues(seqAsJavaList _) + + /** + * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to + * generate the RDDs with `numPartitions` partitions. + */ + def groupByKey(numPartitions: Int): JavaPairDStream[K, JList[V]] = + dstream.groupByKey(numPartitions).mapValues(seqAsJavaList _) + + /** + * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream. + * Therefore, the values for each key in `this` DStream's RDDs are grouped into a + * single sequence to generate the RDDs of the new DStream. [[spark.Partitioner]] + * is used to control the partitioning of each RDD. + */ + def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] = + dstream.groupByKey(partitioner).mapValues(seqAsJavaList _) + + /** + * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the associative reduce function. Hash partitioning is used to generate the RDDs + * with Spark's default number of partitions. + */ + def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] = + dstream.reduceByKey(func) + + /** + * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs + * with `numPartitions` partitions. + */ + def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairDStream[K, V] = + dstream.reduceByKey(func, numPartitions) + + /** + * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the supplied reduce function. [[spark.Partitioner]] is used to control the + * partitioning of each RDD. + */ + def reduceByKey(func: JFunction2[V, V, V], partitioner: Partitioner): JavaPairDStream[K, V] = { + dstream.reduceByKey(func, partitioner) + } + + /** + * Combine elements of each key in DStream's RDDs using custom function. This is similar to the + * combineByKey for RDDs. Please refer to combineByKey in [[spark.PairRDDFunctions]] for more + * information. + */ + def combineByKey[C](createCombiner: JFunction[V, C], + mergeValue: JFunction2[C, V, C], + mergeCombiners: JFunction2[C, C, C], + partitioner: Partitioner + ): JavaPairDStream[K, C] = { + implicit val cm: ClassManifest[C] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]] + dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) + } + + /** + * Create a new DStream by counting the number of values of each key in each RDD. Hash + * partitioning is used to generate the RDDs with Spark's `numPartitions` partitions. 
+ */ + def countByKey(numPartitions: Int): JavaPairDStream[K, JLong] = { + JavaPairDStream.scalaToJavaLong(dstream.countByKey(numPartitions)); + } + + + /** + * Create a new DStream by counting the number of values of each key in each RDD. Hash + * partitioning is used to generate the RDDs with the default number of partitions. + */ + def countByKey(): JavaPairDStream[K, JLong] = { + JavaPairDStream.scalaToJavaLong(dstream.countByKey()); + } + + /** + * Creates a new DStream by applying `groupByKey` over a sliding window. This is similar to + * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs + * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with + * Spark's default number of partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + */ + def groupByKeyAndWindow(windowDuration: Duration): JavaPairDStream[K, JList[V]] = { + dstream.groupByKeyAndWindow(windowDuration).mapValues(seqAsJavaList _) + } + + /** + * Create a new DStream by applying `groupByKey` over a sliding window. Similar to + * `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration) + : JavaPairDStream[K, JList[V]] = { + dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(seqAsJavaList _) + } + + /** + * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * Similar to `DStream.groupByKey()`, but applies it over a sliding window. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions Number of partitions of each RDD in the new DStream. + */ + def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int) + :JavaPairDStream[K, JList[V]] = { + dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions) + .mapValues(seqAsJavaList _) + } + + /** + * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * Similar to `DStream.groupByKey()`, but applies it over a sliding window. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. 
+ */ + def groupByKeyAndWindow( + windowDuration: Duration, + slideDuration: Duration, + partitioner: Partitioner + ):JavaPairDStream[K, JList[V]] = { + dstream.groupByKeyAndWindow(windowDuration, slideDuration, partitioner) + .mapValues(seqAsJavaList _) + } + + /** + * Create a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. + * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream + * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate + * the RDDs with Spark's default number of partitions. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + */ + def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration) + :JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, windowDuration) + } + + /** + * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to + * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def reduceByKeyAndWindow( + reduceFunc: Function2[V, V, V], + windowDuration: Duration, + slideDuration: Duration + ):JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration) + } + + /** + * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to + * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with `numPartitions` partitions. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions Number of partitions of each RDD in the new DStream. + */ + def reduceByKeyAndWindow( + reduceFunc: Function2[V, V, V], + windowDuration: Duration, + slideDuration: Duration, + numPartitions: Int + ): JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, numPartitions) + } + + /** + * Create a new DStream by applying `reduceByKey` over a sliding window. Similar to + * `DStream.reduceByKey()`, but applies it over a sliding window. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. 
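Continuing the sketch above (same `pairs` stream; the window and slide durations are illustrative), the windowed variants look like this in the underlying Scala API:

// Counts over the last 30 seconds of data, recomputed every 10 seconds.
val windowedCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

// All values per key over the same window, collected into a Seq.
val windowedGroups = pairs.groupByKeyAndWindow(Seconds(30), Seconds(10))
windowedCounts.print()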
+   */
+  def reduceByKeyAndWindow(
+      reduceFunc: Function2[V, V, V],
+      windowDuration: Duration,
+      slideDuration: Duration,
+      partitioner: Partitioner
+    ): JavaPairDStream[K, V] = {
+    dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, partitioner)
+  }
+
+  /**
+   * Create a new DStream by applying incremental `reduceByKey` over a sliding window.
+   * The reduced value for a new window is calculated incrementally from the old window's
+   * reduced value:
+   *  1. reduce the new values that entered the window (e.g., adding new counts)
+   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+   * This is more efficient than reduceByKeyAndWindow without an "inverse reduce" function.
+   * However, it is applicable only to "invertible reduce functions".
+   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+   * @param reduceFunc associative reduce function
+   * @param invReduceFunc inverse function
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   */
+  def reduceByKeyAndWindow(
+      reduceFunc: Function2[V, V, V],
+      invReduceFunc: Function2[V, V, V],
+      windowDuration: Duration,
+      slideDuration: Duration
+    ): JavaPairDStream[K, V] = {
+    dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)
+  }
+
+  /**
+   * Create a new DStream by applying incremental `reduceByKey` over a sliding window.
+   * The reduced value for a new window is calculated incrementally from the old window's
+   * reduced value:
+   *  1. reduce the new values that entered the window (e.g., adding new counts)
+   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+   * This is more efficient than reduceByKeyAndWindow without an "inverse reduce" function.
+   * However, it is applicable only to "invertible reduce functions".
+   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+   * @param reduceFunc associative reduce function
+   * @param invReduceFunc inverse function
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   * @param numPartitions  Number of partitions of each RDD in the new DStream.
+   */
+  def reduceByKeyAndWindow(
+      reduceFunc: Function2[V, V, V],
+      invReduceFunc: Function2[V, V, V],
+      windowDuration: Duration,
+      slideDuration: Duration,
+      numPartitions: Int
+    ): JavaPairDStream[K, V] = {
+    dstream.reduceByKeyAndWindow(
+      reduceFunc,
+      invReduceFunc,
+      windowDuration,
+      slideDuration,
+      numPartitions)
+  }
+
+  /**
+   * Create a new DStream by applying incremental `reduceByKey` over a sliding window.
+   * The reduced value for a new window is calculated incrementally from the old window's
+   * reduced value:
+   *  1. reduce the new values that entered the window (e.g., adding new counts)
+   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+   * This is more efficient than reduceByKeyAndWindow without an "inverse reduce" function.
+   * However, it is applicable only to "invertible reduce functions".
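A sketch of the incremental form described above, again against the Scala API and the earlier `pairs` stream; the add/subtract pair is what makes the reduce function invertible.

// Each new window reuses the previous window's result: batches that entered the
// window are added, batches that slid out of the window are subtracted.
val incrementalCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,   // reduce values entering the window
  (a: Int, b: Int) => a - b,   // "inverse reduce" values leaving the window
  Seconds(60),
  Seconds(10))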
+ * @param reduceFunc associative reduce function + * @param invReduceFunc inverse function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + */ + def reduceByKeyAndWindow( + reduceFunc: Function2[V, V, V], + invReduceFunc: Function2[V, V, V], + windowDuration: Duration, + slideDuration: Duration, + partitioner: Partitioner + ): JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow( + reduceFunc, + invReduceFunc, + windowDuration, + slideDuration, + partitioner) + } + + /** + * Create a new DStream by counting the number of values for each key over a window. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration) + : JavaPairDStream[K, JLong] = { + JavaPairDStream.scalaToJavaLong(dstream.countByKeyAndWindow(windowDuration, slideDuration)) + } + + /** + * Create a new DStream by counting the number of values for each key over a window. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions Number of partitions of each RDD in the new DStream. + */ + def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int) + : JavaPairDStream[K, Long] = { + dstream.countByKeyAndWindow(windowDuration, slideDuration, numPartitions) + } + + private def convertUpdateStateFunction[S](in: JFunction2[JList[V], Optional[S], Optional[S]]): + (Seq[V], Option[S]) => Option[S] = { + val scalaFunc: (Seq[V], Option[S]) => Option[S] = (values, state) => { + val list: JList[V] = values + val scalaState: Optional[S] = state match { + case Some(s) => Optional.of(s) + case _ => Optional.absent() + } + val result: Optional[S] = in.apply(list, scalaState) + result.isPresent match { + case true => Some(result.get()) + case _ => None + } + } + scalaFunc + } + + /** + * Create a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of each key. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * @param updateFunc State update function. If `this` function returns None, then + * corresponding state key-value pair will be eliminated. 
+ * @tparam S State type + */ + def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]]) + : JavaPairDStream[K, S] = { + implicit val cm: ClassManifest[S] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]] + dstream.updateStateByKey(convertUpdateStateFunction(updateFunc)) + } + + /** + * Create a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of each key. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * @param updateFunc State update function. If `this` function returns None, then + * corresponding state key-value pair will be eliminated. + * @param numPartitions Number of partitions of each RDD in the new DStream. + * @tparam S State type + */ + def updateStateByKey[S: ClassManifest]( + updateFunc: JFunction2[JList[V], Optional[S], Optional[S]], + numPartitions: Int) + : JavaPairDStream[K, S] = { + dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), numPartitions) + } + + /** + * Create a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of the key. + * [[spark.Partitioner]] is used to control the partitioning of each RDD. + * @param updateFunc State update function. If `this` function returns None, then + * corresponding state key-value pair will be eliminated. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @tparam S State type + */ + def updateStateByKey[S: ClassManifest]( + updateFunc: JFunction2[JList[V], Optional[S], Optional[S]], + partitioner: Partitioner + ): JavaPairDStream[K, S] = { + dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), partitioner) + } + + def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = { + implicit val cm: ClassManifest[U] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + dstream.mapValues(f) + } + + def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairDStream[K, U] = { + import scala.collection.JavaConverters._ + def fn = (x: V) => f.apply(x).asScala + implicit val cm: ClassManifest[U] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + dstream.flatMapValues(fn) + } + + /** + * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this` + * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that + * key in both RDDs. HashPartitioner is used to partition each generated RDD into default number + * of partitions. + */ + def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) + } + + /** + * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this` + * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that + * key in both RDDs. Partitioner is used to partition each generated RDD. 
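A sketch of `updateStateByKey` keeping a running total per key (Scala API, same `pairs` stream as before; stateful streams are typically used together with a checkpoint directory configured on the context).

// State = running count of each word across all batches seen so far.
val updateTotals: (Seq[Int], Option[Int]) => Option[Int] =
  (newValues, state) => Some(state.getOrElse(0) + newValues.sum)

val totals = pairs.updateStateByKey[Int](updateTotals)
totals.print()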
+ */ + def cogroup[W](other: JavaPairDStream[K, W], partitioner: Partitioner) + : JavaPairDStream[K, (JList[V], JList[W])] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + dstream.cogroup(other.dstream, partitioner) + .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) + } + + /** + * Join `this` DStream with `other` DStream. HashPartitioner is used + * to partition each generated RDD into default number of partitions. + */ + def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + dstream.join(other.dstream) + } + + /** + * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will + * be generated by joining RDDs from `this` and other DStream. Uses the given + * Partitioner to partition each generated RDD. + */ + def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner) + : JavaPairDStream[K, (V, W)] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + dstream.join(other.dstream, partitioner) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsHadoopFiles[F <: OutputFormat[K, V]](prefix: String, suffix: String) { + dstream.saveAsHadoopFiles(prefix, suffix) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsHadoopFiles( + prefix: String, + suffix: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: OutputFormat[_, _]]) { + dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsHadoopFiles( + prefix: String, + suffix: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: OutputFormat[_, _]], + conf: JobConf) { + dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](prefix: String, suffix: String) { + dstream.saveAsNewAPIHadoopFiles(prefix, suffix) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsNewAPIHadoopFiles( + prefix: String, + suffix: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) { + dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". 
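For the join/cogroup family, a short Scala sketch reusing `words` and `pairs` from the earlier sketch; the second keyed stream is derived purely for illustration.

// A second keyed stream with the same key type.
val lengths = words.map(w => (w, w.length))

// Per batch: combine the corresponding RDDs of the two streams.
val joined = pairs.join(lengths)        // DStream[(String, (Int, Int))]
val cogrouped = pairs.cogroup(lengths)  // DStream[(String, (Seq[Int], Seq[Int]))]
joined.print()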
+ */ + def saveAsNewAPIHadoopFiles( + prefix: String, + suffix: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: NewOutputFormat[_, _]], + conf: Configuration = new Configuration) { + dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf) + } + + override val classManifest: ClassManifest[(K, V)] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]] +} + +object JavaPairDStream { + implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)]) + :JavaPairDStream[K, V] = + new JavaPairDStream[K, V](dstream) + + def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = { + implicit val cmk: ClassManifest[K] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] + implicit val cmv: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + new JavaPairDStream[K, V](dstream.dstream) + } + + def scalaToJavaLong[K: ClassManifest](dstream: JavaPairDStream[K, Long]) + : JavaPairDStream[K, JLong] = { + StreamingContext.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_)) + } +} diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala new file mode 100644 index 0000000000..e7f446a49b --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -0,0 +1,354 @@ +package spark.streaming.api.java + +import scala.collection.JavaConversions._ +import java.lang.{Long => JLong, Integer => JInt} + +import spark.streaming._ +import dstream._ +import spark.storage.StorageLevel +import spark.api.java.function.{Function => JFunction, Function2 => JFunction2} +import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} +import java.io.InputStream +import java.util.{Map => JMap} +import spark.api.java.{JavaSparkContext, JavaRDD} + +/** + * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic + * information (such as, cluster URL and job name) to internally create a SparkContext, it provides + * methods used to create DStream from various input sources. + */ +class JavaStreamingContext(val ssc: StreamingContext) { + + // TODOs: + // - Test to/from Hadoop functions + // - Support creating and registering InputStreams + + + /** + * Creates a StreamingContext. + * @param master Name of the Spark Master + * @param frameworkName Name to be used when registering with the scheduler + * @param batchDuration The time interval at which streaming data will be divided into batches + */ + def this(master: String, frameworkName: String, batchDuration: Duration) = + this(new StreamingContext(master, frameworkName, batchDuration)) + + /** + * Creates a StreamingContext. + * @param sparkContext The underlying JavaSparkContext to use + * @param batchDuration The time interval at which streaming data will be divided into batches + */ + def this(sparkContext: JavaSparkContext, batchDuration: Duration) = + this(new StreamingContext(sparkContext.sc, batchDuration)) + + /** + * Re-creates a StreamingContext from a checkpoint file. + * @param path Path either to the directory that was specified as the checkpoint directory, or + * to the checkpoint file 'graph' or 'graph.bk'. 
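The constructors above mirror those of the Scala StreamingContext; a rough, standalone sketch (master URL and app name are placeholders):

import spark.SparkContext
import spark.streaming.{Seconds, StreamingContext}

// From a master URL and app name, with a 1 second batch interval...
val ssc = new StreamingContext("local[2]", "MyStreamingApp", Seconds(1))

// ...or wrapping an existing SparkContext.
val sc = new SparkContext("local[2]", "MyStreamingApp")
val sscFromSc = new StreamingContext(sc, Seconds(1))

// A context can also be rebuilt from a checkpoint directory: new StreamingContext(path)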
+   */
+  def this(path: String) = this (new StreamingContext(path))
+
+  /** The underlying SparkContext */
+  val sc: JavaSparkContext = new JavaSparkContext(ssc.sc)
+
+  /**
+   * Create an input stream that pulls messages from a Kafka broker.
+   * @param hostname ZooKeeper hostname.
+   * @param port ZooKeeper port.
+   * @param groupId The group id for this consumer.
+   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+   * in its own thread.
+   */
+  def kafkaStream[T](
+    hostname: String,
+    port: Int,
+    groupId: String,
+    topics: JMap[String, JInt])
+  : JavaDStream[T] = {
+    implicit val cmt: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    ssc.kafkaStream[T](hostname, port, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kafka broker.
+   * @param hostname ZooKeeper hostname.
+   * @param port ZooKeeper port.
+   * @param groupId The group id for this consumer.
+   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+   * in its own thread.
+   * @param initialOffsets Optional initial offsets for each of the partitions to consume.
+   * By default the value is pulled from ZooKeeper.
+   */
+  def kafkaStream[T](
+    hostname: String,
+    port: Int,
+    groupId: String,
+    topics: JMap[String, JInt],
+    initialOffsets: JMap[KafkaPartitionKey, JLong])
+  : JavaDStream[T] = {
+    implicit val cmt: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    ssc.kafkaStream[T](
+      hostname,
+      port,
+      groupId,
+      Map(topics.mapValues(_.intValue()).toSeq: _*),
+      Map(initialOffsets.mapValues(_.longValue()).toSeq: _*))
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kafka broker.
+   * @param hostname ZooKeeper hostname.
+   * @param port ZooKeeper port.
+   * @param groupId The group id for this consumer.
+   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+   * in its own thread.
+   * @param initialOffsets Optional initial offsets for each of the partitions to consume.
+   * By default the value is pulled from ZooKeeper.
+   * @param storageLevel RDD storage level. Defaults to memory-only
+   */
+  def kafkaStream[T](
+    hostname: String,
+    port: Int,
+    groupId: String,
+    topics: JMap[String, JInt],
+    initialOffsets: JMap[KafkaPartitionKey, JLong],
+    storageLevel: StorageLevel)
+  : JavaDStream[T] = {
+    implicit val cmt: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    ssc.kafkaStream[T](
+      hostname,
+      port,
+      groupId,
+      Map(topics.mapValues(_.intValue()).toSeq: _*),
+      Map(initialOffsets.mapValues(_.longValue()).toSeq: _*),
+      storageLevel)
+  }
+
+  /**
+   * Create an input stream from a network source hostname:port. Data is received using
+   * a TCP socket and the received bytes are interpreted as UTF8-encoded, \n-delimited
+   * lines.
+   * @param hostname Hostname to connect to for receiving data
+   * @param port Port to connect to for receiving data
+   * @param storageLevel Storage level to use for storing the received objects
+   *                     (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+   */
+  def networkTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
+  : JavaDStream[String] = {
+    ssc.networkTextStream(hostname, port, storageLevel)
+  }
+
+  /**
+   * Create an input stream from a network source hostname:port. Data is received using
+   * a TCP socket and the received bytes are interpreted as UTF8-encoded, \n-delimited
+   * lines.
+   * @param hostname Hostname to connect to for receiving data
+   * @param port Port to connect to for receiving data
+   */
+  def networkTextStream(hostname: String, port: Int): JavaDStream[String] = {
+    ssc.networkTextStream(hostname, port)
+  }
+
+  /**
+   * Create an input stream from a network source hostname:port. Data is received using
+   * a TCP socket and the received bytes are interpreted as objects using the given
+   * converter.
+   * @param hostname Hostname to connect to for receiving data
+   * @param port Port to connect to for receiving data
+   * @param converter Function to convert the byte stream to objects
+   * @param storageLevel Storage level to use for storing the received objects
+   * @tparam T Type of the objects received (after converting bytes to objects)
+   */
+  def networkStream[T](
+    hostname: String,
+    port: Int,
+    converter: JFunction[InputStream, java.lang.Iterable[T]],
+    storageLevel: StorageLevel)
+  : JavaDStream[T] = {
+    def fn = (x: InputStream) => converter.apply(x).toIterator
+    implicit val cmt: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    ssc.networkStream(hostname, port, fn, storageLevel)
+  }
+
+  /**
+   * Creates an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them as text files (using key as LongWritable, value
+   * as Text and input format as TextInputFormat). File names starting with . are ignored.
+   * @param directory HDFS directory to monitor for new files
+   */
+  def textFileStream(directory: String): JavaDStream[String] = {
+    ssc.textFileStream(directory)
+  }
+
+  /**
+   * Create an input stream from a network source hostname:port, where data is received
+   * as serialized blocks (serialized using Spark's serializer) that can be directly
+   * pushed into the block manager without deserializing them. This is the most efficient
+   * way to receive data.
+   * @param hostname Hostname to connect to for receiving data
+   * @param port Port to connect to for receiving data
+   * @param storageLevel Storage level to use for storing the received objects
+   * @tparam T Type of the objects in the received blocks
+   */
+  def rawNetworkStream[T](
+    hostname: String,
+    port: Int,
+    storageLevel: StorageLevel): JavaDStream[T] = {
+    implicit val cmt: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    JavaDStream.fromDStream(ssc.rawNetworkStream(hostname, port, storageLevel))
+  }
+
+  /**
+   * Create an input stream from a network source hostname:port, where data is received
+   * as serialized blocks (serialized using Spark's serializer) that can be directly
+   * pushed into the block manager without deserializing them. This is the most efficient
+   * way to receive data.
+   * @param hostname Hostname to connect to for receiving data
+   * @param port Port to connect to for receiving data
+   * @tparam T Type of the objects in the received blocks
+   */
+  def rawNetworkStream[T](hostname: String, port: Int): JavaDStream[T] = {
+    implicit val cmt: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    JavaDStream.fromDStream(ssc.rawNetworkStream(hostname, port))
+  }
+
+  /**
+   * Creates an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them using the given key-value types and input format.
+   * File names starting with . are ignored.
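A couple of the input sources above, sketched against the Scala context from the constructor sketch (hosts, ports, group names and paths are placeholders; the Kafka payload type assumes the string decoder used by the receiver):

// Kafka: consume two topics through ZooKeeper, one consumer thread per partition.
val kafkaLines = ssc.kafkaStream[String](
  "zk-host", 2181, "my-consumer-group", Map("pageviews" -> 1, "clicks" -> 2))

// Text files: watch an HDFS directory for newly arrived files.
val logLines = ssc.textFileStream("hdfs://namenode:8020/incoming/logs")
val errors = logLines.filter(_.contains("ERROR"))
errors.print()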
+ * @param directory HDFS directory to monitor for new file + * @tparam K Key type for reading HDFS file + * @tparam V Value type for reading HDFS file + * @tparam F Input format for reading HDFS file + */ + def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): JavaPairDStream[K, V] = { + implicit val cmk: ClassManifest[K] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] + implicit val cmv: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + implicit val cmf: ClassManifest[F] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[F]] + ssc.fileStream[K, V, F](directory); + } + + /** + * Creates a input stream from a Flume source. + * @param hostname Hostname of the slave machine to which the flume data will be sent + * @param port Port of the slave machine to which the flume data will be sent + * @param storageLevel Storage level to use for storing the received objects + */ + def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel): + JavaDStream[SparkFlumeEvent] = { + ssc.flumeStream(hostname, port, storageLevel) + } + + + /** + * Creates a input stream from a Flume source. + * @param hostname Hostname of the slave machine to which the flume data will be sent + * @param port Port of the slave machine to which the flume data will be sent + */ + def flumeStream(hostname: String, port: Int): + JavaDStream[SparkFlumeEvent] = { + ssc.flumeStream(hostname, port) + } + + /** + * Registers an output stream that will be computed every interval + */ + def registerOutputStream(outputStream: JavaDStreamLike[_, _]) { + ssc.registerOutputStream(outputStream.dstream) + } + + /** + * Creates a input stream from an queue of RDDs. In each batch, + * it will process either one or all of the RDDs returned by the queue. + * + * NOTE: changes to the queue after the stream is created will not be recognized. + * @param queue Queue of RDDs + * @tparam T Type of objects in the RDD + */ + def queueStream[T](queue: java.util.Queue[JavaRDD[T]]): JavaDStream[T] = { + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + val sQueue = new scala.collection.mutable.Queue[spark.RDD[T]] + sQueue.enqueue(queue.map(_.rdd).toSeq: _*) + ssc.queueStream(sQueue) + } + + /** + * Creates a input stream from an queue of RDDs. In each batch, + * it will process either one or all of the RDDs returned by the queue. + * + * NOTE: changes to the queue after the stream is created will not be recognized. + * @param queue Queue of RDDs + * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval + * @tparam T Type of objects in the RDD + */ + def queueStream[T](queue: java.util.Queue[JavaRDD[T]], oneAtATime: Boolean): JavaDStream[T] = { + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + val sQueue = new scala.collection.mutable.Queue[spark.RDD[T]] + sQueue.enqueue(queue.map(_.rdd).toSeq: _*) + ssc.queueStream(sQueue, oneAtATime) + } + + /** + * Creates a input stream from an queue of RDDs. In each batch, + * it will process either one or all of the RDDs returned by the queue. + * + * NOTE: changes to the queue after the stream is created will not be recognized. 
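queueStream is handy in tests; a sketch reusing the same `ssc`, with RDDs built from the underlying SparkContext:

import scala.collection.mutable.Queue
import spark.RDD

// Pre-build a few RDDs and serve them one per batch interval.
val rddQueue = new Queue[RDD[Int]]()
rddQueue += ssc.sc.parallelize(1 to 100)
rddQueue += ssc.sc.parallelize(101 to 200)
val queued = ssc.queueStream(rddQueue, true)  // oneAtATime = true
queued.print()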
+ * @param queue Queue of RDDs + * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval + * @param defaultRDD Default RDD is returned by the DStream when the queue is empty + * @tparam T Type of objects in the RDD + */ + def queueStream[T]( + queue: java.util.Queue[JavaRDD[T]], + oneAtATime: Boolean, + defaultRDD: JavaRDD[T]): JavaDStream[T] = { + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + val sQueue = new scala.collection.mutable.Queue[spark.RDD[T]] + sQueue.enqueue(queue.map(_.rdd).toSeq: _*) + ssc.queueStream(sQueue, oneAtATime, defaultRDD.rdd) + } + + /** + * Sets the context to periodically checkpoint the DStream operations for master + * fault-tolerance. By default, the graph will be checkpointed every batch interval. + * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored + * @param interval checkpoint interval + */ + def checkpoint(directory: String, interval: Duration = null) { + ssc.checkpoint(directory, interval) + } + + /** + * Sets each DStreams in this context to remember RDDs it generated in the last given duration. + * DStreams remember RDDs only for a limited duration of duration and releases them for garbage + * collection. This method allows the developer to specify how to long to remember the RDDs ( + * if the developer wishes to query old data outside the DStream computation). + * @param duration Minimum duration that each DStream should remember its RDDs + */ + def remember(duration: Duration) { + ssc.remember(duration) + } + + /** + * Starts the execution of the streams. + */ + def start() = ssc.start() + + /** + * Sstops the execution of the streams. + */ + def stop() = ssc.stop() + +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala new file mode 100644 index 0000000000..ddb1bf6b28 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala @@ -0,0 +1,40 @@ +package spark.streaming.dstream + +import spark.{RDD, Partitioner} +import spark.rdd.CoGroupedRDD +import spark.streaming.{Time, DStream, Duration} + +private[streaming] +class CoGroupedDStream[K : ClassManifest]( + parents: Seq[DStream[(_, _)]], + partitioner: Partitioner + ) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) { + + if (parents.length == 0) { + throw new IllegalArgumentException("Empty array of parents") + } + + if (parents.map(_.ssc).distinct.size > 1) { + throw new IllegalArgumentException("Array of parents have different StreamingContexts") + } + + if (parents.map(_.slideDuration).distinct.size > 1) { + throw new IllegalArgumentException("Array of parents have different slide times") + } + + override def dependencies = parents.toList + + override def slideDuration: Duration = parents.head.slideDuration + + override def compute(validTime: Time): Option[RDD[(K, Seq[Seq[_]])]] = { + val part = partitioner + val rdds = parents.flatMap(_.getOrCompute(validTime)) + if (rdds.size > 0) { + val q = new CoGroupedRDD[K](rdds, part) + Some(q) + } else { + None + } + } + +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/ConstantInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ConstantInputDStream.scala new file mode 100644 index 0000000000..41c3af4694 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/ConstantInputDStream.scala @@ -0,0 +1,19 @@ +package 
spark.streaming.dstream + +import spark.RDD +import spark.streaming.{Time, StreamingContext} + +/** + * An input stream that always returns the same RDD on each timestep. Useful for testing. + */ +class ConstantInputDStream[T: ClassManifest](ssc_ : StreamingContext, rdd: RDD[T]) + extends InputDStream[T](ssc_) { + + override def start() {} + + override def stop() {} + + override def compute(validTime: Time): Option[RDD[T]] = { + Some(rdd) + } +}
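A rough, test-oriented usage sketch; whether a manually constructed input stream must additionally be registered with the context is not shown in this excerpt, so treat the wiring as an assumption.

// Emit the same fixed RDD on every batch; a trivial, deterministic source for tests.
val fixedRdd = ssc.sc.parallelize(Seq("a", "b", "c"))
val constantStream = new ConstantInputDStream(ssc, fixedRdd)
constantStream.print()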
\ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala new file mode 100644 index 0000000000..1e6ad84b44 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala @@ -0,0 +1,102 @@ +package spark.streaming.dstream + +import spark.RDD +import spark.rdd.UnionRDD +import spark.streaming.{StreamingContext, Time} + +import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} + +import scala.collection.mutable.HashSet + +private[streaming] +class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest]( + @transient ssc_ : StreamingContext, + directory: String, + filter: Path => Boolean = FileInputDStream.defaultFilter, + newFilesOnly: Boolean = true) + extends InputDStream[(K, V)](ssc_) { + + @transient private var path_ : Path = null + @transient private var fs_ : FileSystem = null + + var lastModTime = 0L + val lastModTimeFiles = new HashSet[String]() + + def path(): Path = { + if (path_ == null) path_ = new Path(directory) + path_ + } + + def fs(): FileSystem = { + if (fs_ == null) fs_ = path.getFileSystem(new Configuration()) + fs_ + } + + override def start() { + if (newFilesOnly) { + lastModTime = System.currentTimeMillis() + } else { + lastModTime = 0 + } + } + + override def stop() { } + + /** + * Finds the files that were modified since the last time this method was called and makes + * a union RDD out of them. Note that this maintains the list of files that were processed + * in the latest modification time in the previous call to this method. This is because the + * modification time returned by the FileStatus API seems to return times only at the + * granularity of seconds. Hence, new files may have the same modification time as the + * latest modification time in the previous call to this method and the list of files + * maintained is used to filter the one that have been processed. 
+ */ + override def compute(validTime: Time): Option[RDD[(K, V)]] = { + // Create the filter for selecting new files + val newFilter = new PathFilter() { + var latestModTime = 0L + val latestModTimeFiles = new HashSet[String]() + + def accept(path: Path): Boolean = { + if (!filter(path)) { + return false + } else { + val modTime = fs.getFileStatus(path).getModificationTime() + if (modTime < lastModTime){ + return false + } else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) { + return false + } + if (modTime > latestModTime) { + latestModTime = modTime + latestModTimeFiles.clear() + } + latestModTimeFiles += path.toString + return true + } + } + } + + val newFiles = fs.listStatus(path, newFilter) + logInfo("New files: " + newFiles.map(_.getPath).mkString(", ")) + if (newFiles.length > 0) { + // Update the modification time and the files processed for that modification time + if (lastModTime != newFilter.latestModTime) { + lastModTime = newFilter.latestModTime + lastModTimeFiles.clear() + } + lastModTimeFiles ++= newFilter.latestModTimeFiles + } + val newRDD = new UnionRDD(ssc.sc, newFiles.map( + file => ssc.sc.newAPIHadoopFile[K, V, F](file.getPath.toString))) + Some(newRDD) + } +} + +private[streaming] +object FileInputDStream { + def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".") +} + diff --git a/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala new file mode 100644 index 0000000000..e993164f99 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala @@ -0,0 +1,21 @@ +package spark.streaming.dstream + +import spark.streaming.{Duration, DStream, Time} +import spark.RDD + +private[streaming] +class FilteredDStream[T: ClassManifest]( + parent: DStream[T], + filterFunc: T => Boolean + ) extends DStream[T](parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + override def compute(validTime: Time): Option[RDD[T]] = { + parent.getOrCompute(validTime).map(_.filter(filterFunc)) + } +} + + diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala new file mode 100644 index 0000000000..cabd34f5f2 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala @@ -0,0 +1,20 @@ +package spark.streaming.dstream + +import spark.streaming.{Duration, DStream, Time} +import spark.RDD +import spark.SparkContext._ + +private[streaming] +class FlatMapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]( + parent: DStream[(K, V)], + flatMapValueFunc: V => TraversableOnce[U] + ) extends DStream[(K, U)](parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + override def compute(validTime: Time): Option[RDD[(K, U)]] = { + parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc)) + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala new file mode 100644 index 0000000000..a69af60589 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala @@ -0,0 +1,20 @@ +package spark.streaming.dstream + +import spark.streaming.{Duration, DStream, Time} +import spark.RDD + +private[streaming] +class 
FlatMappedDStream[T: ClassManifest, U: ClassManifest]( + parent: DStream[T], + flatMapFunc: T => Traversable[U] + ) extends DStream[U](parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + override def compute(validTime: Time): Option[RDD[U]] = { + parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc)) + } +} + diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala new file mode 100644 index 0000000000..efc7058480 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala @@ -0,0 +1,137 @@ +package spark.streaming.dstream + +import spark.streaming.StreamingContext + +import spark.Utils +import spark.storage.StorageLevel + +import org.apache.flume.source.avro.AvroSourceProtocol +import org.apache.flume.source.avro.AvroFlumeEvent +import org.apache.flume.source.avro.Status +import org.apache.avro.ipc.specific.SpecificResponder +import org.apache.avro.ipc.NettyServer + +import scala.collection.JavaConversions._ + +import java.net.InetSocketAddress +import java.io.{ObjectInput, ObjectOutput, Externalizable} +import java.nio.ByteBuffer + +private[streaming] +class FlumeInputDStream[T: ClassManifest]( + @transient ssc_ : StreamingContext, + host: String, + port: Int, + storageLevel: StorageLevel +) extends NetworkInputDStream[SparkFlumeEvent](ssc_) { + + override def createReceiver(): NetworkReceiver[SparkFlumeEvent] = { + new FlumeReceiver(host, port, storageLevel) + } +} + +/** + * A wrapper class for AvroFlumeEvent's with a custom serialization format. + * + * This is necessary because AvroFlumeEvent uses inner data structures + * which are not serializable. + */ +class SparkFlumeEvent() extends Externalizable { + var event : AvroFlumeEvent = new AvroFlumeEvent() + + /* De-serialize from bytes. */ + def readExternal(in: ObjectInput) { + val bodyLength = in.readInt() + val bodyBuff = new Array[Byte](bodyLength) + in.read(bodyBuff) + + val numHeaders = in.readInt() + val headers = new java.util.HashMap[CharSequence, CharSequence] + + for (i <- 0 until numHeaders) { + val keyLength = in.readInt() + val keyBuff = new Array[Byte](keyLength) + in.read(keyBuff) + val key : String = Utils.deserialize(keyBuff) + + val valLength = in.readInt() + val valBuff = new Array[Byte](valLength) + in.read(valBuff) + val value : String = Utils.deserialize(valBuff) + + headers.put(key, value) + } + + event.setBody(ByteBuffer.wrap(bodyBuff)) + event.setHeaders(headers) + } + + /* Serialize to bytes. */ + def writeExternal(out: ObjectOutput) { + val body = event.getBody.array() + out.writeInt(body.length) + out.write(body) + + val numHeaders = event.getHeaders.size() + out.writeInt(numHeaders) + for ((k, v) <- event.getHeaders) { + val keyBuff = Utils.serialize(k.toString) + out.writeInt(keyBuff.length) + out.write(keyBuff) + val valBuff = Utils.serialize(v.toString) + out.writeInt(valBuff.length) + out.write(valBuff) + } + } +} + +private[streaming] object SparkFlumeEvent { + def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = { + val event = new SparkFlumeEvent + event.event = in + event + } +} + +/** A simple server that implements Flume's Avro protocol. 
*/ +private[streaming] +class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol { + override def append(event : AvroFlumeEvent) : Status = { + receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event) + Status.OK + } + + override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = { + events.foreach (event => + receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)) + Status.OK + } +} + +/** A NetworkReceiver which listens for events using the + * Flume Avro interface.*/ +private[streaming] +class FlumeReceiver( + host: String, + port: Int, + storageLevel: StorageLevel + ) extends NetworkReceiver[SparkFlumeEvent] { + + lazy val blockGenerator = new BlockGenerator(storageLevel) + + protected override def onStart() { + val responder = new SpecificResponder( + classOf[AvroSourceProtocol], new FlumeEventServer(this)); + val server = new NettyServer(responder, new InetSocketAddress(host, port)); + blockGenerator.start() + server.start() + logInfo("Flume receiver started") + } + + protected override def onStop() { + blockGenerator.stop() + logInfo("Flume receiver stopped") + } + + override def getLocationPreference = Some(host) +}
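A usage sketch for the Flume source (host and port are placeholders; a Flume Avro sink would be pointed at this address):

// Receive Flume events and look at the payload size of each event.
val flumeEvents = ssc.flumeStream("localhost", 9988)
val bodySizes = flumeEvents.map(_.event.getBody.array().length)
bodySizes.print()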
\ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala new file mode 100644 index 0000000000..ee69ea5177 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala @@ -0,0 +1,28 @@ +package spark.streaming.dstream + +import spark.RDD +import spark.streaming.{Duration, DStream, Job, Time} + +private[streaming] +class ForEachDStream[T: ClassManifest] ( + parent: DStream[T], + foreachFunc: (RDD[T], Time) => Unit + ) extends DStream[Unit](parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + override def compute(validTime: Time): Option[RDD[Unit]] = None + + override def generateJob(time: Time): Option[Job] = { + parent.getOrCompute(time) match { + case Some(rdd) => + val jobFunc = () => { + foreachFunc(rdd, time) + } + Some(new Job(time, jobFunc)) + case None => None + } + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala new file mode 100644 index 0000000000..b589cbd4d5 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala @@ -0,0 +1,17 @@ +package spark.streaming.dstream + +import spark.streaming.{Duration, DStream, Time} +import spark.RDD + +private[streaming] +class GlommedDStream[T: ClassManifest](parent: DStream[T]) + extends DStream[Array[T]](parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + override def compute(validTime: Time): Option[RDD[Array[T]]] = { + parent.getOrCompute(validTime).map(_.glom()) + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala new file mode 100644 index 0000000000..980ca5177e --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala @@ -0,0 +1,19 @@ +package spark.streaming.dstream + +import spark.streaming.{Duration, StreamingContext, DStream} + +abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext) + extends DStream[T](ssc_) { + + override def dependencies = List() + + override def slideDuration: Duration = { + if (ssc == null) throw new Exception("ssc is null") + if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null") + ssc.graph.batchDuration + } + + def start() + + def stop() +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala new file mode 100644 index 0000000000..2b4740bdf7 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala @@ -0,0 +1,200 @@ +package spark.streaming.dstream + +import spark.Logging +import spark.storage.StorageLevel +import spark.streaming.{Time, DStreamCheckpointData, StreamingContext} + +import java.util.Properties +import java.util.concurrent.Executors + +import kafka.consumer._ +import kafka.message.{Message, MessageSet, MessageAndMetadata} +import kafka.serializer.StringDecoder +import kafka.utils.{Utils, ZKGroupTopicDirs} +import kafka.utils.ZkUtils._ + +import scala.collection.mutable.HashMap +import scala.collection.JavaConversions._ + + +// Key for a specific Kafka Partition: (broker, topic, group, part) +case class KafkaPartitionKey(brokerId: 
Int, topic: String, groupId: String, partId: Int) +// NOT USED - Originally intended for fault-tolerance +// Metadata for a Kafka Stream that it sent to the Master +private[streaming] +case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long]) +// NOT USED - Originally intended for fault-tolerance +// Checkpoint data specific to a KafkaInputDstream +private[streaming] +case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any], + savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds) + +/** + * Input stream that pulls messages from a Kafka Broker. + * + * @param host Zookeper hostname. + * @param port Zookeper port. + * @param groupId The group id for this consumer. + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param initialOffsets Optional initial offsets for each of the partitions to consume. + * By default the value is pulled from zookeper. + * @param storageLevel RDD storage level. + */ +private[streaming] +class KafkaInputDStream[T: ClassManifest]( + @transient ssc_ : StreamingContext, + host: String, + port: Int, + groupId: String, + topics: Map[String, Int], + initialOffsets: Map[KafkaPartitionKey, Long], + storageLevel: StorageLevel + ) extends NetworkInputDStream[T](ssc_ ) with Logging { + + // Metadata that keeps track of which messages have already been consumed. + var savedOffsets = HashMap[Long, Map[KafkaPartitionKey, Long]]() + + /* NOT USED - Originally intended for fault-tolerance + + // In case of a failure, the offets for a particular timestamp will be restored. + @transient var restoredOffsets : Map[KafkaPartitionKey, Long] = null + + + override protected[streaming] def addMetadata(metadata: Any) { + metadata match { + case x : KafkaInputDStreamMetadata => + savedOffsets(x.timestamp) = x.data + // TOOD: Remove logging + logInfo("New saved Offsets: " + savedOffsets) + case _ => logInfo("Received unknown metadata: " + metadata.toString) + } + } + + override protected[streaming] def updateCheckpointData(currentTime: Time) { + super.updateCheckpointData(currentTime) + if(savedOffsets.size > 0) { + // Find the offets that were stored before the checkpoint was initiated + val key = savedOffsets.keys.toList.sortWith(_ < _).filter(_ < currentTime.millis).last + val latestOffsets = savedOffsets(key) + logInfo("Updating KafkaDStream checkpoint data: " + latestOffsets.toString) + checkpointData = KafkaDStreamCheckpointData(checkpointData.rdds, latestOffsets) + // TODO: This may throw out offsets that are created after the checkpoint, + // but it's unlikely we'll need them. + savedOffsets.clear() + } + } + + override protected[streaming] def restoreCheckpointData() { + super.restoreCheckpointData() + logInfo("Restoring KafkaDStream checkpoint data.") + checkpointData match { + case x : KafkaDStreamCheckpointData => + restoredOffsets = x.savedOffsets + logInfo("Restored KafkaDStream offsets: " + savedOffsets) + } + } */ + + def createReceiver(): NetworkReceiver[T] = { + new KafkaReceiver(host, port, groupId, topics, initialOffsets, storageLevel) + .asInstanceOf[NetworkReceiver[T]] + } +} + +private[streaming] +class KafkaReceiver(host: String, port: Int, groupId: String, + topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long], + storageLevel: StorageLevel) extends NetworkReceiver[Any] { + + // Timeout for establishing a connection to Zookeper in ms. 
+ val ZK_TIMEOUT = 10000 + + // Handles pushing data into the BlockManager + lazy protected val blockGenerator = new BlockGenerator(storageLevel) + // Keeps track of the current offsets. Maps from (broker, topic, group, part) -> Offset + lazy val offsets = HashMap[KafkaPartitionKey, Long]() + // Connection to Kafka + var consumerConnector : ZookeeperConsumerConnector = null + + def onStop() { + blockGenerator.stop() + } + + def onStart() { + + blockGenerator.start() + + // In case we are using multiple Threads to handle Kafka Messages + val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _)) + + val zooKeeperEndPoint = host + ":" + port + logInfo("Starting Kafka Consumer Stream with group: " + groupId) + logInfo("Initial offsets: " + initialOffsets.toString) + + // Zookeper connection properties + val props = new Properties() + props.put("zk.connect", zooKeeperEndPoint) + props.put("zk.connectiontimeout.ms", ZK_TIMEOUT.toString) + props.put("groupid", groupId) + + // Create the connection to the cluster + logInfo("Connecting to Zookeper: " + zooKeeperEndPoint) + val consumerConfig = new ConsumerConfig(props) + consumerConnector = Consumer.create(consumerConfig).asInstanceOf[ZookeeperConsumerConnector] + logInfo("Connected to " + zooKeeperEndPoint) + + // Reset the Kafka offsets in case we are recovering from a failure + resetOffsets(initialOffsets) + + // Create Threads for each Topic/Message Stream we are listening + val topicMessageStreams = consumerConnector.createMessageStreams(topics, new StringDecoder()) + + // Start the messages handler for each partition + topicMessageStreams.values.foreach { streams => + streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) } + } + + } + + // Overwrites the offets in Zookeper. + private def resetOffsets(offsets: Map[KafkaPartitionKey, Long]) { + offsets.foreach { case(key, offset) => + val topicDirs = new ZKGroupTopicDirs(key.groupId, key.topic) + val partitionName = key.brokerId + "-" + key.partId + updatePersistentPath(consumerConnector.zkClient, + topicDirs.consumerOffsetDir + "/" + partitionName, offset.toString) + } + } + + // Handles Kafka Messages + private class MessageHandler(stream: KafkaStream[String]) extends Runnable { + def run() { + logInfo("Starting MessageHandler.") + stream.takeWhile { msgAndMetadata => + blockGenerator += msgAndMetadata.message + + // Updating the offet. The key is (broker, topic, group, partition). 
+ val key = KafkaPartitionKey(msgAndMetadata.topicInfo.brokerId, msgAndMetadata.topic, + groupId, msgAndMetadata.topicInfo.partition.partId) + val offset = msgAndMetadata.topicInfo.getConsumeOffset + offsets.put(key, offset) + // logInfo("Handled message: " + (key, offset).toString) + + // Keep on handling messages + true + } + } + } + + // NOT USED - Originally intended for fault-tolerance + // class KafkaDataHandler(receiver: KafkaReceiver, storageLevel: StorageLevel) + // extends BufferingBlockCreator[Any](receiver, storageLevel) { + + // override def createBlock(blockId: String, iterator: Iterator[Any]) : Block = { + // // Creates a new Block with Kafka-specific Metadata + // new Block(blockId, iterator, KafkaInputDStreamMetadata(System.currentTimeMillis, offsets.toMap)) + // } + + // } + +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala new file mode 100644 index 0000000000..848afecfad --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala @@ -0,0 +1,21 @@ +package spark.streaming.dstream + +import spark.streaming.{Duration, DStream, Time} +import spark.RDD + +private[streaming] +class MapPartitionedDStream[T: ClassManifest, U: ClassManifest]( + parent: DStream[T], + mapPartFunc: Iterator[T] => Iterator[U], + preservePartitioning: Boolean + ) extends DStream[U](parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + override def compute(validTime: Time): Option[RDD[U]] = { + parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning)) + } +} + diff --git a/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala new file mode 100644 index 0000000000..6055aa6a05 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala @@ -0,0 +1,21 @@ +package spark.streaming.dstream + +import spark.streaming.{Duration, DStream, Time} +import spark.RDD +import spark.SparkContext._ + +private[streaming] +class MapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]( + parent: DStream[(K, V)], + mapValueFunc: V => U + ) extends DStream[(K, U)](parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + override def compute(validTime: Time): Option[RDD[(K, U)]] = { + parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc)) + } +} + diff --git a/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala new file mode 100644 index 0000000000..20818a0cab --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala @@ -0,0 +1,20 @@ +package spark.streaming.dstream + +import spark.streaming.{Duration, DStream, Time} +import spark.RDD + +private[streaming] +class MappedDStream[T: ClassManifest, U: ClassManifest] ( + parent: DStream[T], + mapFunc: T => U + ) extends DStream[U](parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + override def compute(validTime: Time): Option[RDD[U]] = { + parent.getOrCompute(validTime).map(_.map[U](mapFunc)) + } +} + diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala 
b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala new file mode 100644 index 0000000000..8c322dd698 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala @@ -0,0 +1,254 @@ +package spark.streaming.dstream + +import spark.streaming.{Time, StreamingContext, AddBlocks, RegisterReceiver, DeregisterReceiver} + +import spark.{Logging, SparkEnv, RDD} +import spark.rdd.BlockRDD +import spark.storage.StorageLevel + +import scala.collection.mutable.ArrayBuffer + +import java.nio.ByteBuffer + +import akka.actor.{Props, Actor} +import akka.pattern.ask +import akka.dispatch.Await +import akka.util.duration._ +import spark.streaming.util.{RecurringTimer, SystemClock} +import java.util.concurrent.ArrayBlockingQueue + +/** + * Abstract class for defining any InputDStream that has to start a receiver on worker + * nodes to receive external data. Specific implementations of NetworkInputDStream must + * define the createReceiver() function that creates the receiver object of type + * [[spark.streaming.dstream.NetworkReceiver]] that will be sent to the workers to receive + * data. + * @param ssc_ Streaming context that will execute this input stream + * @tparam T Class type of the object of this stream + */ +abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : StreamingContext) + extends InputDStream[T](ssc_) { + + // This is a unique identifier that is used to match the network receiver with the + // corresponding network input stream. + val id = ssc.getNewNetworkStreamId() + + /** + * Creates the receiver object that will be sent to the worker nodes + * to receive data. This method needs to be defined by any specific implementation + * of a NetworkInputDStream. + */ + def createReceiver(): NetworkReceiver[T] + + // Nothing to start or stop as both are taken care of by the NetworkInputTracker. + def start() {} + + def stop() {} + + override def compute(validTime: Time): Option[RDD[T]] = { + val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime) + Some(new BlockRDD[T](ssc.sc, blockIds)) + } +} + + +private[streaming] sealed trait NetworkReceiverMessage +private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage +private[streaming] case class ReportBlock(blockId: String, metadata: Any) extends NetworkReceiverMessage +private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage + +/** + * Abstract class of a receiver that can be run on worker nodes to receive external data. See + * [[spark.streaming.dstream.NetworkInputDStream]] for an explanation. + */ +abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Logging { + + initLogging() + + lazy protected val env = SparkEnv.get + + lazy protected val actor = env.actorSystem.actorOf( + Props(new NetworkReceiverActor()), "NetworkReceiver-" + streamId) + + lazy protected val receivingThread = Thread.currentThread() + + protected var streamId: Int = -1 + + /** + * This method will be called to start receiving data. All your receiver + * starting code should be implemented by defining this function. + */ + protected def onStart() + + /** This method will be called to stop receiving data. */ + protected def onStop() + + /** Conveys a placement preference (hostname) for this receiver. */ + def getLocationPreference() : Option[String] = None + + /** + * Starts the receiver. First it accesses all the lazy members to + * materialize them.
Then it calls the user-defined onStart() method to start + * other threads, etc. required to receive the data. + */ + def start() { + try { + // Access the lazy vals to materialize them + env + actor + receivingThread + + // Call user-defined onStart() + onStart() + } catch { + case ie: InterruptedException => + logInfo("Receiving thread interrupted") + //println("Receiving thread interrupted") + case e: Exception => + stopOnError(e) + } + } + + /** + * Stops the receiver. First it interrupts the main receiving thread, + * that is, the thread that called receiver.start(). Then it calls the user-defined + * onStop() method to stop other threads and/or do cleanup. + */ + def stop() { + receivingThread.interrupt() + onStop() + //TODO: terminate the actor + } + + /** + * Stops the receiver and reports the exception to the tracker. + * This should be called whenever an exception has happened on any thread + * of the receiver. + */ + protected def stopOnError(e: Exception) { + logError("Error receiving data", e) + stop() + actor ! ReportError(e.toString) + } + + + /** + * Pushes a block (as an iterator of values) into the block manager. + */ + def pushBlock(blockId: String, iterator: Iterator[T], metadata: Any, level: StorageLevel) { + val buffer = new ArrayBuffer[T] ++ iterator + env.blockManager.put(blockId, buffer.asInstanceOf[ArrayBuffer[Any]], level) + + actor ! ReportBlock(blockId, metadata) + } + + /** + * Pushes a block (as bytes) into the block manager. + */ + def pushBlock(blockId: String, bytes: ByteBuffer, metadata: Any, level: StorageLevel) { + env.blockManager.putBytes(blockId, bytes, level) + actor ! ReportBlock(blockId, metadata) + } + + /** A helper actor that communicates with the NetworkInputTracker */ + private class NetworkReceiverActor extends Actor { + logInfo("Attempting to register with tracker") + val ip = System.getProperty("spark.driver.host", "localhost") + val port = System.getProperty("spark.driver.port", "7077").toInt + val url = "akka://spark@%s:%s/user/NetworkInputTracker".format(ip, port) + val tracker = env.actorSystem.actorFor(url) + val timeout = 5.seconds + + override def preStart() { + val future = tracker.ask(RegisterReceiver(streamId, self))(timeout) + Await.result(future, timeout) + } + + override def receive() = { + case ReportBlock(blockId, metadata) => + tracker ! AddBlocks(streamId, Array(blockId), metadata) + case ReportError(msg) => + tracker ! DeregisterReceiver(streamId, msg) + case StopReceiver(msg) => + stop() + tracker ! DeregisterReceiver(streamId, msg) + } + } + + protected[streaming] def setStreamId(id: Int) { + streamId = id + } + + /** + * Batches objects created by a [[spark.streaming.NetworkReceiver]] and puts them into + * appropriately named blocks at regular intervals. This class starts two threads, + * one to periodically start a new batch and prepare the previous batch as a block, + * the other to push the blocks into the block manager.
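As a point of reference, here is a minimal, hypothetical receiver written against the contract defined above (onStart()/onStop() plus the BlockGenerator described in this comment). Every name in it is invented for illustration; the real SocketReceiver and RawNetworkReceiver later in this diff follow the same pattern.

  import spark.storage.StorageLevel
  import spark.streaming.StreamingContext
  import spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}

  // Hypothetical sketch: a receiver that emits one timestamp string per second.
  class TickReceiver(storageLevel: StorageLevel) extends NetworkReceiver[String] {

    // Batches received objects into blocks and pushes them to the BlockManager.
    lazy protected val blockGenerator = new BlockGenerator(storageLevel)

    override def getLocationPreference = None  // no placement preference

    protected def onStart() {
      blockGenerator.start()
      while (true) {
        blockGenerator += System.currentTimeMillis.toString
        Thread.sleep(1000)  // stop() interrupts the receiving thread, which ends this loop
      }
    }

    protected def onStop() {
      blockGenerator.stop()
    }
  }

  // The matching input stream only has to say how to build the receiver; the
  // NetworkInputTracker sends it to a worker and starts it there.
  class TickInputDStream(@transient ssc_ : StreamingContext, storageLevel: StorageLevel)
    extends NetworkInputDStream[String](ssc_) {
    def createReceiver(): NetworkReceiver[String] = new TickReceiver(storageLevel)
  }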
+ */ + class BlockGenerator(storageLevel: StorageLevel) + extends Serializable with Logging { + + case class Block(id: String, iterator: Iterator[T], metadata: Any = null) + + val clock = new SystemClock() + val blockInterval = 200L + val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer) + val blockStorageLevel = storageLevel + val blocksForPushing = new ArrayBlockingQueue[Block](1000) + val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } } + + var currentBuffer = new ArrayBuffer[T] + + def start() { + blockIntervalTimer.start() + blockPushingThread.start() + logInfo("Data handler started") + } + + def stop() { + blockIntervalTimer.stop() + blockPushingThread.interrupt() + logInfo("Data handler stopped") + } + + def += (obj: T) { + currentBuffer += obj + } + + private def createBlock(blockId: String, iterator: Iterator[T]) : Block = { + new Block(blockId, iterator) + } + + private def updateCurrentBuffer(time: Long) { + try { + val newBlockBuffer = currentBuffer + currentBuffer = new ArrayBuffer[T] + if (newBlockBuffer.size > 0) { + val blockId = "input-" + NetworkReceiver.this.streamId + "-" + (time - blockInterval) + val newBlock = createBlock(blockId, newBlockBuffer.toIterator) + blocksForPushing.add(newBlock) + } + } catch { + case ie: InterruptedException => + logInfo("Block interval timer thread interrupted") + case e: Exception => + NetworkReceiver.this.stop() + } + } + + private def keepPushingBlocks() { + logInfo("Block pushing thread started") + try { + while(true) { + val block = blocksForPushing.take() + NetworkReceiver.this.pushBlock(block.id, block.iterator, block.metadata, storageLevel) + } + } catch { + case ie: InterruptedException => + logInfo("Block pushing thread interrupted") + case e: Exception => + NetworkReceiver.this.stop() + } + } + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala new file mode 100644 index 0000000000..024bf3bea4 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala @@ -0,0 +1,41 @@ +package spark.streaming.dstream + +import spark.RDD +import spark.rdd.UnionRDD + +import scala.collection.mutable.Queue +import scala.collection.mutable.ArrayBuffer +import spark.streaming.{Time, StreamingContext} + +class QueueInputDStream[T: ClassManifest]( + @transient ssc: StreamingContext, + val queue: Queue[RDD[T]], + oneAtATime: Boolean, + defaultRDD: RDD[T] + ) extends InputDStream[T](ssc) { + + override def start() { } + + override def stop() { } + + override def compute(validTime: Time): Option[RDD[T]] = { + val buffer = new ArrayBuffer[RDD[T]]() + if (oneAtATime && queue.size > 0) { + buffer += queue.dequeue() + } else { + buffer ++= queue + } + if (buffer.size > 0) { + if (oneAtATime) { + Some(buffer.first) + } else { + Some(new UnionRDD(ssc.sc, buffer.toSeq)) + } + } else if (defaultRDD != null) { + Some(defaultRDD) + } else { + None + } + } + +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala new file mode 100644 index 0000000000..04e6b69b7b --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala @@ -0,0 +1,91 @@ +package spark.streaming.dstream + +import spark.Logging +import spark.storage.StorageLevel +import spark.streaming.StreamingContext + +import java.net.InetSocketAddress +import 
java.nio.ByteBuffer +import java.nio.channels.{ReadableByteChannel, SocketChannel} +import java.io.EOFException +import java.util.concurrent.ArrayBlockingQueue + + +/** + * An input stream that reads blocks of serialized objects from a given network address. + * The blocks will be inserted directly into the block store. This is the fastest way to get + * data into Spark Streaming, though it requires the sender to batch data and serialize it + * in the format that the system is configured with. + */ +private[streaming] +class RawInputDStream[T: ClassManifest]( + @transient ssc_ : StreamingContext, + host: String, + port: Int, + storageLevel: StorageLevel + ) extends NetworkInputDStream[T](ssc_ ) with Logging { + + def createReceiver(): NetworkReceiver[T] = { + new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[NetworkReceiver[T]] + } +} + +private[streaming] +class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel) + extends NetworkReceiver[Any] { + + var blockPushingThread: Thread = null + + override def getLocationPreference = None + + def onStart() { + // Open a socket to the target address and keep reading from it + logInfo("Connecting to " + host + ":" + port) + val channel = SocketChannel.open() + channel.configureBlocking(true) + channel.connect(new InetSocketAddress(host, port)) + logInfo("Connected to " + host + ":" + port) + + val queue = new ArrayBlockingQueue[ByteBuffer](2) + + blockPushingThread = new Thread { + setDaemon(true) + override def run() { + var nextBlockNumber = 0 + while (true) { + val buffer = queue.take() + val blockId = "input-" + streamId + "-" + nextBlockNumber + nextBlockNumber += 1 + pushBlock(blockId, buffer, null, storageLevel) + } + } + } + blockPushingThread.start() + + val lengthBuffer = ByteBuffer.allocate(4) + while (true) { + lengthBuffer.clear() + readFully(channel, lengthBuffer) + lengthBuffer.flip() + val length = lengthBuffer.getInt() + val dataBuffer = ByteBuffer.allocate(length) + readFully(channel, dataBuffer) + dataBuffer.flip() + logInfo("Read a block with " + length + " bytes") + queue.put(dataBuffer) + } + } + + def onStop() { + if (blockPushingThread != null) blockPushingThread.interrupt() + } + + /** Read a buffer fully from a given Channel */ + private def readFully(channel: ReadableByteChannel, dest: ByteBuffer) { + while (dest.position < dest.limit) { + if (channel.read(dest) == -1) { + throw new EOFException("End of channel") + } + } + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala new file mode 100644 index 0000000000..733d5c4a25 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala @@ -0,0 +1,149 @@ +package spark.streaming.dstream + +import spark.streaming.StreamingContext._ + +import spark.RDD +import spark.rdd.CoGroupedRDD +import spark.Partitioner +import spark.SparkContext._ +import spark.storage.StorageLevel + +import scala.collection.mutable.ArrayBuffer +import spark.streaming.{Duration, Interval, Time, DStream} + +private[streaming] +class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( + parent: DStream[(K, V)], + reduceFunc: (V, V) => V, + invReduceFunc: (V, V) => V, + _windowDuration: Duration, + _slideDuration: Duration, + partitioner: Partitioner + ) extends DStream[(K,V)](parent.ssc) { + + assert(_windowDuration.isMultipleOf(parent.slideDuration), + "The window duration of ReducedWindowedDStream (" + 
_slideDuration + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")" + ) + + assert(_slideDuration.isMultipleOf(parent.slideDuration), + "The slide duration of ReducedWindowedDStream (" + _slideDuration + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")" + ) + + // Reduce each batch of data using reduceByKey which will be further reduced by window + // by ReducedWindowedDStream + val reducedStream = parent.reduceByKey(reduceFunc, partitioner) + + // Persist RDDs to memory by default as these RDDs are going to be reused. + super.persist(StorageLevel.MEMORY_ONLY_SER) + reducedStream.persist(StorageLevel.MEMORY_ONLY_SER) + + def windowDuration: Duration = _windowDuration + + override def dependencies = List(reducedStream) + + override def slideDuration: Duration = _slideDuration + + override val mustCheckpoint = true + + override def parentRememberDuration: Duration = rememberDuration + windowDuration + + override def persist(storageLevel: StorageLevel): DStream[(K,V)] = { + super.persist(storageLevel) + reducedStream.persist(storageLevel) + this + } + + override def checkpoint(interval: Duration): DStream[(K, V)] = { + super.checkpoint(interval) + //reducedStream.checkpoint(interval) + this + } + + override def compute(validTime: Time): Option[RDD[(K, V)]] = { + val reduceF = reduceFunc + val invReduceF = invReduceFunc + + val currentTime = validTime + val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration, currentTime) + val previousWindow = currentWindow - slideDuration + + logDebug("Window time = " + windowDuration) + logDebug("Slide time = " + slideDuration) + logDebug("ZeroTime = " + zeroTime) + logDebug("Current window = " + currentWindow) + logDebug("Previous window = " + previousWindow) + + // _____________________________ + // | previous window _________|___________________ + // |___________________| current window | --------------> Time + // |_____________________________| + // + // |________ _________| |________ _________| + // | | + // V V + // old RDDs new RDDs + // + + // Get the RDDs of the reduced values in "old time steps" + val oldRDDs = reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration) + logDebug("# old RDDs = " + oldRDDs.size) + + // Get the RDDs of the reduced values in "new time steps" + val newRDDs = reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime) + logDebug("# new RDDs = " + newRDDs.size) + + // Get the RDD of the reduced value of the previous window + val previousWindowRDD = getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]())) + + // Make the list of RDDs that needs to cogrouped together for reducing their reduced values + val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs + + // Cogroup the reduced RDDs and merge the reduced values + val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner) + //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _ + + val numOldValues = oldRDDs.size + val numNewValues = newRDDs.size + + val mergeValues = (seqOfValues: Seq[Seq[V]]) => { + if (seqOfValues.size != 1 + numOldValues + numNewValues) { + throw new Exception("Unexpected number of sequences of reduced values") + } + // Getting reduced values "old time steps" that will be removed from current window + val oldValues = (1 to numOldValues).map(i => 
seqOfValues(i)).filter(!_.isEmpty).map(_.head) + // Get the reduced values of the "new time steps" + val newValues = (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head) + if (seqOfValues(0).isEmpty) { + // If the previous window's reduced value does not exist, then at least new values should exist + if (newValues.isEmpty) { + throw new Exception("Neither the previous window had a value for this key, nor were new values found. " + + "Are you sure your key class hashes consistently?") + } + // Reduce the new values + newValues.reduce(reduceF) // return + } else { + // Get the previous window's reduced value + var tempValue = seqOfValues(0).head + // If old values exist, then inverse-reduce them from the previous value + if (!oldValues.isEmpty) { + tempValue = invReduceF(tempValue, oldValues.reduce(reduceF)) + } + // If new values exist, then reduce them with the previous value + if (!newValues.isEmpty) { + tempValue = reduceF(tempValue, newValues.reduce(reduceF)) + } + tempValue // return + } + } + + val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValues) + + Some(mergedValuesRDD) + } + + +} + + diff --git a/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala new file mode 100644 index 0000000000..1f9548bfb8 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala @@ -0,0 +1,27 @@ +package spark.streaming.dstream + +import spark.{RDD, Partitioner} +import spark.SparkContext._ +import spark.streaming.{Duration, DStream, Time} + +private[streaming] +class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest]( + parent: DStream[(K,V)], + createCombiner: V => C, + mergeValue: (C, V) => C, + mergeCombiner: (C, C) => C, + partitioner: Partitioner + ) extends DStream [(K,C)] (parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + override def compute(validTime: Time): Option[RDD[(K,C)]] = { + parent.getOrCompute(validTime) match { + case Some(rdd) => + Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner)) + case None => None + } + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala new file mode 100644 index 0000000000..d42027092b --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala @@ -0,0 +1,103 @@ +package spark.streaming.dstream + +import spark.streaming.StreamingContext +import spark.storage.StorageLevel + +import java.io._ +import java.net.Socket + +private[streaming] +class SocketInputDStream[T: ClassManifest]( + @transient ssc_ : StreamingContext, + host: String, + port: Int, + bytesToObjects: InputStream => Iterator[T], + storageLevel: StorageLevel + ) extends NetworkInputDStream[T](ssc_) { + + def createReceiver(): NetworkReceiver[T] = { + new SocketReceiver(host, port, bytesToObjects, storageLevel) + } +} + +private[streaming] +class SocketReceiver[T: ClassManifest]( + host: String, + port: Int, + bytesToObjects: InputStream => Iterator[T], + storageLevel: StorageLevel + ) extends NetworkReceiver[T] { + + lazy protected val blockGenerator = new BlockGenerator(storageLevel) + + override def getLocationPreference = None + + protected def onStart() { + logInfo("Connecting to " + host + ":" + port) + val socket = new Socket(host, port) + logInfo("Connected to
" + host + ":" + port) + blockGenerator.start() + val iterator = bytesToObjects(socket.getInputStream()) + while(iterator.hasNext) { + val obj = iterator.next + blockGenerator += obj + } + } + + protected def onStop() { + blockGenerator.stop() + } + +} + +private[streaming] +object SocketReceiver { + + /** + * This methods translates the data from an inputstream (say, from a socket) + * to '\n' delimited strings and returns an iterator to access the strings. + */ + def bytesToLines(inputStream: InputStream): Iterator[String] = { + val dataInputStream = new BufferedReader(new InputStreamReader(inputStream, "UTF-8")) + + val iterator = new Iterator[String] { + var gotNext = false + var finished = false + var nextValue: String = null + + private def getNext() { + try { + nextValue = dataInputStream.readLine() + if (nextValue == null) { + finished = true + } + } + gotNext = true + } + + override def hasNext: Boolean = { + if (!finished) { + if (!gotNext) { + getNext() + if (finished) { + dataInputStream.close() + } + } + } + !finished + } + + override def next(): String = { + if (finished) { + throw new NoSuchElementException("End of stream") + } + if (!gotNext) { + getNext() + } + gotNext = false + nextValue + } + } + iterator + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala new file mode 100644 index 0000000000..b4506c74aa --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala @@ -0,0 +1,84 @@ +package spark.streaming.dstream + +import spark.RDD +import spark.Partitioner +import spark.SparkContext._ +import spark.storage.StorageLevel +import spark.streaming.{Duration, Time, DStream} + +private[streaming] +class StateDStream[K: ClassManifest, V: ClassManifest, S: ClassManifest]( + parent: DStream[(K, V)], + updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], + partitioner: Partitioner, + preservePartitioning: Boolean + ) extends DStream[(K, S)](parent.ssc) { + + super.persist(StorageLevel.MEMORY_ONLY_SER) + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + override val mustCheckpoint = true + + override def compute(validTime: Time): Option[RDD[(K, S)]] = { + + // Try to get the previous state RDD + getOrCompute(validTime - slideDuration) match { + + case Some(prevStateRDD) => { // If previous state RDD exists + + // Try to get the parent RDD + parent.getOrCompute(validTime) match { + case Some(parentRDD) => { // If parent RDD exists, then compute as usual + + // Define the function for the mapPartition operation on cogrouped RDD; + // first map the cogrouped tuple to tuples of required type, + // and then apply the update function + val updateFuncLocal = updateFunc + val finalFunc = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => { + val i = iterator.map(t => { + (t._1, t._2._1, t._2._2.headOption) + }) + updateFuncLocal(i) + } + val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner) + val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning) + //logDebug("Generating state RDD for time " + validTime) + return Some(stateRDD) + } + case None => { // If parent RDD does not exist, then return old state RDD + return Some(prevStateRDD) + } + } + } + + case None => { // If previous session RDD does not exist (first input data) + + // Try to get the parent RDD + parent.getOrCompute(validTime) match { + case Some(parentRDD) => { // If parent RDD exists, then 
compute as usual + + // Define the function for the mapPartition operation on grouped RDD; + // first map the grouped tuple to tuples of required type, + // and then apply the update function + val updateFuncLocal = updateFunc + val finalFunc = (iterator: Iterator[(K, Seq[V])]) => { + updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2, None))) + } + + val groupedRDD = parentRDD.groupByKey(partitioner) + val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning) + //logDebug("Generating state RDD for time " + validTime + " (first)") + return Some(sessionRDD) + } + case None => { // If parent RDD does not exist, then nothing to do! + //logDebug("Not generating state RDD (no previous state, no parent)") + return None + } + } + } + } + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala new file mode 100644 index 0000000000..99660d9dee --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala @@ -0,0 +1,19 @@ +package spark.streaming.dstream + +import spark.RDD +import spark.streaming.{Duration, DStream, Time} + +private[streaming] +class TransformedDStream[T: ClassManifest, U: ClassManifest] ( + parent: DStream[T], + transformFunc: (RDD[T], Time) => RDD[U] + ) extends DStream[U](parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + override def compute(validTime: Time): Option[RDD[U]] = { + parent.getOrCompute(validTime).map(transformFunc(_, validTime)) + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala new file mode 100644 index 0000000000..00bad5da34 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala @@ -0,0 +1,40 @@ +package spark.streaming.dstream + +import spark.streaming.{Duration, DStream, Time} +import spark.RDD +import collection.mutable.ArrayBuffer +import spark.rdd.UnionRDD + +private[streaming] +class UnionDStream[T: ClassManifest](parents: Array[DStream[T]]) + extends DStream[T](parents.head.ssc) { + + if (parents.length == 0) { + throw new IllegalArgumentException("Empty array of parents") + } + + if (parents.map(_.ssc).distinct.size > 1) { + throw new IllegalArgumentException("Array of parents have different StreamingContexts") + } + + if (parents.map(_.slideDuration).distinct.size > 1) { + throw new IllegalArgumentException("Array of parents have different slide times") + } + + override def dependencies = parents.toList + + override def slideDuration: Duration = parents.head.slideDuration + + override def compute(validTime: Time): Option[RDD[T]] = { + val rdds = new ArrayBuffer[RDD[T]]() + parents.map(_.getOrCompute(validTime)).foreach(_ match { + case Some(rdd) => rdds += rdd + case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime) + }) + if (rdds.size > 0) { + Some(new UnionRDD(ssc.sc, rdds)) + } else { + None + } + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala new file mode 100644 index 0000000000..cbf0c88108 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala @@ -0,0 +1,40 @@ +package spark.streaming.dstream + +import spark.RDD +import spark.rdd.UnionRDD +import 
spark.storage.StorageLevel +import spark.streaming.{Duration, Interval, Time, DStream} + +private[streaming] +class WindowedDStream[T: ClassManifest]( + parent: DStream[T], + _windowDuration: Duration, + _slideDuration: Duration) + extends DStream[T](parent.ssc) { + + if (!_windowDuration.isMultipleOf(parent.slideDuration)) + throw new Exception("The window duration of WindowedDStream (" + _windowDuration + ") " + + "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")") + + if (!_slideDuration.isMultipleOf(parent.slideDuration)) + throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " + + "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")") + + parent.persist(StorageLevel.MEMORY_ONLY_SER) + + def windowDuration: Duration = _windowDuration + + override def dependencies = List(parent) + + override def slideDuration: Duration = _slideDuration + + override def parentRememberDuration: Duration = rememberDuration + windowDuration + + override def compute(validTime: Time): Option[RDD[T]] = { + val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime) + Some(new UnionRDD(ssc.sc, parent.slice(currentWindow))) + } +} + + + diff --git a/streaming/src/main/scala/spark/streaming/util/Clock.scala b/streaming/src/main/scala/spark/streaming/util/Clock.scala new file mode 100644 index 0000000000..974651f9f6 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/util/Clock.scala @@ -0,0 +1,84 @@ +package spark.streaming.util + +private[streaming] +trait Clock { + def currentTime(): Long + def waitTillTime(targetTime: Long): Long +} + +private[streaming] +class SystemClock() extends Clock { + + val minPollTime = 25L + + def currentTime(): Long = { + System.currentTimeMillis() + } + + def waitTillTime(targetTime: Long): Long = { + var currentTime = 0L + currentTime = System.currentTimeMillis() + + var waitTime = targetTime - currentTime + if (waitTime <= 0) { + return currentTime + } + + val pollTime = { + if (waitTime / 10.0 > minPollTime) { + (waitTime / 10.0).toLong + } else { + minPollTime + } + } + + + while (true) { + currentTime = System.currentTimeMillis() + waitTime = targetTime - currentTime + + if (waitTime <= 0) { + + return currentTime + } + val sleepTime = + if (waitTime < pollTime) { + waitTime + } else { + pollTime + } + Thread.sleep(sleepTime) + } + return -1 + } +} + +private[streaming] +class ManualClock() extends Clock { + + var time = 0L + + def currentTime() = time + + def setTime(timeToSet: Long) = { + this.synchronized { + time = timeToSet + this.notifyAll() + } + } + + def addToTime(timeToAdd: Long) = { + this.synchronized { + time += timeToAdd + this.notifyAll() + } + } + def waitTillTime(targetTime: Long): Long = { + this.synchronized { + while (time < targetTime) { + this.wait(100) + } + } + return currentTime() + } +} diff --git a/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala new file mode 100644 index 0000000000..03749d4a94 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala @@ -0,0 +1,98 @@ +package spark.streaming.util + +import spark.SparkContext +import spark.SparkContext._ +import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap} +import scala.collection.JavaConversions.mapAsScalaMap + +object RawTextHelper { + + /** + * Splits lines and counts the words in them using a specialized object-to-long
hashmap + * (to avoid boxing-unboxing overhead of Long in java/scala HashMap) + */ + def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = { + val map = new OLMap[String] + var i = 0 + var j = 0 + while (iter.hasNext) { + val s = iter.next() + i = 0 + while (i < s.length) { + j = i + while (j < s.length && s.charAt(j) != ' ') { + j += 1 + } + if (j > i) { + val w = s.substring(i, j) + val c = map.getLong(w) + map.put(w, c + 1) + } + i = j + while (i < s.length && s.charAt(i) == ' ') { + i += 1 + } + } + } + map.toIterator.map{case (k, v) => (k, v)} + } + + /** + * Gets the top k words in terms of word counts. Assumes that each word exists only once + * in the `data` iterator (that is, the counts have been reduced). + */ + def topK(data: Iterator[(String, Long)], k: Int): Iterator[(String, Long)] = { + val taken = new Array[(String, Long)](k) + + var i = 0 + var len = 0 + var done = false + var value: (String, Long) = null + var swap: (String, Long) = null + var count = 0 + + while(data.hasNext) { + value = data.next + if (value != null) { + count += 1 + if (len == 0) { + taken(0) = value + len = 1 + } else if (len < k || value._2 > taken(len - 1)._2) { + if (len < k) { + len += 1 + } + taken(len - 1) = value + i = len - 1 + while(i > 0 && taken(i - 1)._2 < taken(i)._2) { + swap = taken(i) + taken(i) = taken(i-1) + taken(i - 1) = swap + i -= 1 + } + } + } + } + return taken.toIterator + } + + /** + * Warms up the SparkContext in master and slave by running tasks to force JIT kick in + * before real workload starts. + */ + def warmUp(sc: SparkContext) { + for(i <- 0 to 1) { + sc.parallelize(1 to 200000, 1000) + .map(_ % 1331).map(_.toString) + .mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10) + .count() + } + } + + def add(v1: Long, v2: Long) = (v1 + v2) + + def subtract(v1: Long, v2: Long) = (v1 - v2) + + def max(v1: Long, v2: Long) = math.max(v1, v2) +} + diff --git a/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala new file mode 100644 index 0000000000..d8b987ec86 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala @@ -0,0 +1,60 @@ +package spark.streaming.util + +import java.nio.ByteBuffer +import spark.util.{RateLimitedOutputStream, IntParam} +import java.net.ServerSocket +import spark.{Logging, KryoSerializer} +import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream +import io.Source +import java.io.IOException + +/** + * A helper program that sends blocks of Kryo-serialized text strings out on a socket at a + * specified rate. Used to feed data into RawInputDStream. 
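The wire format that the object below and RawInputDStream (earlier in this diff) agree on is deliberately simple: a 4-byte length prefix followed by that many bytes of data serialized in whatever format the system is configured for, repeated until the connection drops. As a rough sketch of that contract, with a hypothetical name and an already-serialized payload assumed:

  import java.net.ServerSocket
  import java.nio.ByteBuffer

  // Hypothetical sketch: serve one pre-serialized block over and over in the
  // length-prefixed form that RawInputDStream's readFully() loop expects.
  object FrameServer {
    def serve(port: Int, payload: Array[Byte]) {
      val lengthHeader = ByteBuffer.allocate(4)
      lengthHeader.putInt(payload.length)  // 4-byte (big-endian) length prefix
      lengthHeader.flip()

      val out = new ServerSocket(port).accept().getOutputStream
      while (true) {
        out.write(lengthHeader.array)  // length first ...
        out.write(payload)             // ... then the block bytes
      }
    }
  }

RawTextSender below does essentially this, except that it builds the payload by Kryo-serializing lines read from a text file and throttles the writes with RateLimitedOutputStream; per its usage message it is invoked as RawTextSender <port> <file> <blockSize> <bytesPerSec>.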
+ */ +object RawTextSender extends Logging { + def main(args: Array[String]) { + if (args.length != 4) { + System.err.println("Usage: RawTextSender <port> <file> <blockSize> <bytesPerSec>") + System.exit(1) + } + // Parse the arguments using a pattern match + val Array(IntParam(port), file, IntParam(blockSize), IntParam(bytesPerSec)) = args + + // Repeat the input data multiple times to fill in a buffer + val lines = Source.fromFile(file).getLines().toArray + val bufferStream = new FastByteArrayOutputStream(blockSize + 1000) + val ser = new KryoSerializer().newInstance() + val serStream = ser.serializeStream(bufferStream) + var i = 0 + while (bufferStream.position < blockSize) { + serStream.writeObject(lines(i)) + i = (i + 1) % lines.length + } + bufferStream.trim() + val array = bufferStream.array + + val countBuf = ByteBuffer.wrap(new Array[Byte](4)) + countBuf.putInt(array.length) + countBuf.flip() + + val serverSocket = new ServerSocket(port) + logInfo("Listening on port " + port) + + while (true) { + val socket = serverSocket.accept() + logInfo("Got a new connection") + val out = new RateLimitedOutputStream(socket.getOutputStream, bytesPerSec) + try { + while (true) { + out.write(countBuf.array) + out.write(array) + } + } catch { + case e: IOException => + logError("Client disconnected") + socket.close() + } + } + } +} diff --git a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala new file mode 100644 index 0000000000..db715cc295 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala @@ -0,0 +1,75 @@ +package spark.streaming.util + +private[streaming] +class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) { + + val minPollTime = 25L + + val pollTime = { + if (period / 10.0 > minPollTime) { + (period / 10.0).toLong + } else { + minPollTime + } + } + + val thread = new Thread() { + override def run() { loop } + } + + var nextTime = 0L + + def start(startTime: Long): Long = { + nextTime = startTime + thread.start() + nextTime + } + + def start(): Long = { + val startTime = (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period + start(startTime) + } + + def restart(originalStartTime: Long): Long = { + val gap = clock.currentTime - originalStartTime + val newStartTime = (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime + start(newStartTime) + } + + def stop() { + thread.interrupt() + } + + def loop() { + try { + while (true) { + clock.waitTillTime(nextTime) + callback(nextTime) + nextTime += period + } + + } catch { + case e: InterruptedException => + } + } +} + +private[streaming] +object RecurringTimer { + + def main(args: Array[String]) { + var lastRecurTime = 0L + val period = 1000 + + def onRecur(time: Long) { + val currentTime = System.currentTimeMillis() + println("" + currentTime + ": " + (currentTime - lastRecurTime)) + lastRecurTime = currentTime + } + val timer = new RecurringTimer(new SystemClock(), period, onRecur) + timer.start() + Thread.sleep(30 * 1000) + timer.stop() + } +} + diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java new file mode 100644 index 0000000000..79d6093429 --- /dev/null +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -0,0 +1,1029 @@ +package spark.streaming; + +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import 
com.google.common.collect.Maps; +import com.google.common.io.Files; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import scala.Tuple2; +import spark.HashPartitioner; +import spark.api.java.JavaRDD; +import spark.api.java.JavaSparkContext; +import spark.api.java.function.*; +import spark.storage.StorageLevel; +import spark.streaming.api.java.JavaDStream; +import spark.streaming.api.java.JavaPairDStream; +import spark.streaming.api.java.JavaStreamingContext; +import spark.streaming.JavaTestUtils; +import spark.streaming.JavaCheckpointTestUtils; +import spark.streaming.dstream.KafkaPartitionKey; + +import java.io.*; +import java.util.*; + +// The test suite itself is Serializable so that anonymous Function implementations can be +// serialized, as an alternative to converting these anonymous classes to static inner classes; +// see http://stackoverflow.com/questions/758570/. +public class JavaAPISuite implements Serializable { + private transient JavaStreamingContext ssc; + + @Before + public void setUp() { + ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + ssc.checkpoint("checkpoint", new Duration(1000)); + } + + @After + public void tearDown() { + ssc.stop(); + ssc = null; + + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.driver.port"); + } + + @Test + public void testCount() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1,2,3,4), + Arrays.asList(3,4,5), + Arrays.asList(3)); + + List<List<Long>> expected = Arrays.asList( + Arrays.asList(4L), + Arrays.asList(3L), + Arrays.asList(1L)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream count = stream.count(); + JavaTestUtils.attachTestOutputStream(count); + List<List<Long>> result = JavaTestUtils.runStreams(ssc, 3, 3); + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testMap() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("hello", "world"), + Arrays.asList("goodnight", "moon")); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(5,5), + Arrays.asList(9,4)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream letterCount = stream.map(new Function<String, Integer>() { + @Override + public Integer call(String s) throws Exception { + return s.length(); + } + }); + JavaTestUtils.attachTestOutputStream(letterCount); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testWindow() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6,1,2,3), + Arrays.asList(7,8,9,4,5,6), + Arrays.asList(7,8,9)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream windowed = stream.window(new Duration(2000)); + JavaTestUtils.attachTestOutputStream(windowed); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testWindowWithSlideDuration() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9), + Arrays.asList(10,11,12), 
+ Arrays.asList(13,14,15), + Arrays.asList(16,17,18)); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(1,2,3,4,5,6), + Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12), + Arrays.asList(7,8,9,10,11,12,13,14,15,16,17,18), + Arrays.asList(13,14,15,16,17,18)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream windowed = stream.window(new Duration(4000), new Duration(2000)); + JavaTestUtils.attachTestOutputStream(windowed); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 8, 4); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testTumble() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9), + Arrays.asList(10,11,12), + Arrays.asList(13,14,15), + Arrays.asList(16,17,18)); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(1,2,3,4,5,6), + Arrays.asList(7,8,9,10,11,12), + Arrays.asList(13,14,15,16,17,18)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream windowed = stream.tumble(new Duration(2000)); + JavaTestUtils.attachTestOutputStream(windowed); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 6, 3); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testFilter() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("giants", "dodgers"), + Arrays.asList("yankees", "red socks")); + + List<List<String>> expected = Arrays.asList( + Arrays.asList("giants"), + Arrays.asList("yankees")); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream filtered = stream.filter(new Function<String, Boolean>() { + @Override + public Boolean call(String s) throws Exception { + return s.contains("a"); + } + }); + JavaTestUtils.attachTestOutputStream(filtered); + List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testGlom() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("giants", "dodgers"), + Arrays.asList("yankees", "red socks")); + + List<List<List<String>>> expected = Arrays.asList( + Arrays.asList(Arrays.asList("giants", "dodgers")), + Arrays.asList(Arrays.asList("yankees", "red socks"))); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream glommed = stream.glom(); + JavaTestUtils.attachTestOutputStream(glommed); + List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testMapPartitions() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("giants", "dodgers"), + Arrays.asList("yankees", "red socks")); + + List<List<String>> expected = Arrays.asList( + Arrays.asList("GIANTSDODGERS"), + Arrays.asList("YANKEESRED SOCKS")); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() { + @Override + public Iterable<String> call(Iterator<String> in) { + String out = ""; + while (in.hasNext()) { + out = out + in.next().toUpperCase(); + } + return Lists.newArrayList(out); + } + }); + JavaTestUtils.attachTestOutputStream(mapped); + List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + private class IntegerSum extends Function2<Integer, Integer, Integer> { + @Override + public 
Integer call(Integer i1, Integer i2) throws Exception { + return i1 + i2; + } + } + + private class IntegerDifference extends Function2<Integer, Integer, Integer> { + @Override + public Integer call(Integer i1, Integer i2) throws Exception { + return i1 - i2; + } + } + + @Test + public void testReduce() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(6), + Arrays.asList(15), + Arrays.asList(24)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream reduced = stream.reduce(new IntegerSum()); + JavaTestUtils.attachTestOutputStream(reduced); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testReduceByWindow() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(6), + Arrays.asList(21), + Arrays.asList(39), + Arrays.asList(24)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream reducedWindowed = stream.reduceByWindow(new IntegerSum(), + new IntegerDifference(), new Duration(2000), new Duration(1000)); + JavaTestUtils.attachTestOutputStream(reducedWindowed); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4); + + Assert.assertEquals(expected, result); + } + + @Test + public void testQueueStream() { + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc()); + JavaRDD<Integer> rdd1 = ssc.sc().parallelize(Arrays.asList(1,2,3)); + JavaRDD<Integer> rdd2 = ssc.sc().parallelize(Arrays.asList(4,5,6)); + JavaRDD<Integer> rdd3 = ssc.sc().parallelize(Arrays.asList(7,8,9)); + + LinkedList<JavaRDD<Integer>> rdds = Lists.newLinkedList(); + rdds.add(rdd1); + rdds.add(rdd2); + rdds.add(rdd3); + + JavaDStream<Integer> stream = ssc.queueStream(rdds); + JavaTestUtils.attachTestOutputStream(stream); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); + Assert.assertEquals(expected, result); + } + + @Test + public void testTransform() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(3,4,5), + Arrays.asList(6,7,8), + Arrays.asList(9,10,11)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream transformed = stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() { + @Override + public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception { + return in.map(new Function<Integer, Integer>() { + @Override + public Integer call(Integer i) throws Exception { + return i + 2; + } + }); + }}); + JavaTestUtils.attachTestOutputStream(transformed); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testFlatMap() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("go", "giants"), + Arrays.asList("boo", "dodgers"), + Arrays.asList("athletics")); + + List<List<String>> expected = Arrays.asList( + Arrays.asList("g","o","g","i","a","n","t","s"), + Arrays.asList("b", "o", "o", "d","o","d","g","e","r","s"), + 
Arrays.asList("a","t","h","l","e","t","i","c","s")); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream flatMapped = stream.flatMap(new FlatMapFunction<String, String>() { + @Override + public Iterable<String> call(String x) { + return Lists.newArrayList(x.split("(?!^)")); + } + }); + JavaTestUtils.attachTestOutputStream(flatMapped); + List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testPairFlatMap() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("giants"), + Arrays.asList("dodgers"), + Arrays.asList("athletics")); + + List<List<Tuple2<Integer, String>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<Integer, String>(6, "g"), + new Tuple2<Integer, String>(6, "i"), + new Tuple2<Integer, String>(6, "a"), + new Tuple2<Integer, String>(6, "n"), + new Tuple2<Integer, String>(6, "t"), + new Tuple2<Integer, String>(6, "s")), + Arrays.asList( + new Tuple2<Integer, String>(7, "d"), + new Tuple2<Integer, String>(7, "o"), + new Tuple2<Integer, String>(7, "d"), + new Tuple2<Integer, String>(7, "g"), + new Tuple2<Integer, String>(7, "e"), + new Tuple2<Integer, String>(7, "r"), + new Tuple2<Integer, String>(7, "s")), + Arrays.asList( + new Tuple2<Integer, String>(9, "a"), + new Tuple2<Integer, String>(9, "t"), + new Tuple2<Integer, String>(9, "h"), + new Tuple2<Integer, String>(9, "l"), + new Tuple2<Integer, String>(9, "e"), + new Tuple2<Integer, String>(9, "t"), + new Tuple2<Integer, String>(9, "i"), + new Tuple2<Integer, String>(9, "c"), + new Tuple2<Integer, String>(9, "s"))); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() { + @Override + public Iterable<Tuple2<Integer, String>> call(String in) throws Exception { + List<Tuple2<Integer, String>> out = Lists.newArrayList(); + for (String letter: in.split("(?!^)")) { + out.add(new Tuple2<Integer, String>(in.length(), letter)); + } + return out; + } + }); + JavaTestUtils.attachTestOutputStream(flatMapped); + List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testUnion() { + List<List<Integer>> inputData1 = Arrays.asList( + Arrays.asList(1,1), + Arrays.asList(2,2), + Arrays.asList(3,3)); + + List<List<Integer>> inputData2 = Arrays.asList( + Arrays.asList(4,4), + Arrays.asList(5,5), + Arrays.asList(6,6)); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(1,1,4,4), + Arrays.asList(2,2,5,5), + Arrays.asList(3,3,6,6)); + + JavaDStream stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2); + JavaDStream stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2); + + JavaDStream unioned = stream1.union(stream2); + JavaTestUtils.attachTestOutputStream(unioned); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + assertOrderInvariantEquals(expected, result); + } + + /* + * Performs an order-invariant comparison of lists representing two RDD streams. This allows + * us to account for ordering variation within individual RDD's which occurs during windowing. 
+ */ + public static <T extends Comparable> void assertOrderInvariantEquals( + List<List<T>> expected, List<List<T>> actual) { + for (List<T> list: expected) { + Collections.sort(list); + } + for (List<T> list: actual) { + Collections.sort(list); + } + Assert.assertEquals(expected, actual); + } + + + // PairDStream Functions + @Test + public void testPairFilter() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("giants", "dodgers"), + Arrays.asList("yankees", "red socks")); + + List<List<Tuple2<String, Integer>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<String, Integer>("giants", 6)), + Arrays.asList(new Tuple2<String, Integer>("yankees", 7))); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = stream.map( + new PairFunction<String, String, Integer>() { + @Override + public Tuple2 call(String in) throws Exception { + return new Tuple2<String, Integer>(in, in.length()); + } + }); + + JavaPairDStream<String, Integer> filtered = pairStream.filter( + new Function<Tuple2<String, Integer>, Boolean>() { + @Override + public Boolean call(Tuple2<String, Integer> in) throws Exception { + return in._1().contains("a"); + } + }); + JavaTestUtils.attachTestOutputStream(filtered); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "dodgers"), + new Tuple2<String, String>("california", "giants"), + new Tuple2<String, String>("new york", "yankees"), + new Tuple2<String, String>("new york", "mets")), + Arrays.asList(new Tuple2<String, String>("california", "sharks"), + new Tuple2<String, String>("california", "ducks"), + new Tuple2<String, String>("new york", "rangers"), + new Tuple2<String, String>("new york", "islanders"))); + + List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList( + Arrays.asList( + new Tuple2<String, Integer>("california", 1), + new Tuple2<String, Integer>("california", 3), + new Tuple2<String, Integer>("new york", 4), + new Tuple2<String, Integer>("new york", 1)), + Arrays.asList( + new Tuple2<String, Integer>("california", 5), + new Tuple2<String, Integer>("california", 5), + new Tuple2<String, Integer>("new york", 3), + new Tuple2<String, Integer>("new york", 1))); + + @Test + public void testPairGroupByKey() { + List<List<Tuple2<String, String>>> inputData = stringStringKVStream; + + List<List<Tuple2<String, List<String>>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")), + new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))), + Arrays.asList( + new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")), + new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders")))); + + JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, List<String>> grouped = pairStream.groupByKey(); + JavaTestUtils.attachTestOutputStream(grouped); + List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testPairReduceByKey() { + List<List<Tuple2<String, Integer>>> 
inputData = stringIntKVStream; + + List<List<Tuple2<String, Integer>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<String, Integer>("california", 4), + new Tuple2<String, Integer>("new york", 5)), + Arrays.asList( + new Tuple2<String, Integer>("california", 10), + new Tuple2<String, Integer>("new york", 4))); + + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey(new IntegerSum()); + + JavaTestUtils.attachTestOutputStream(reduced); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testCombineByKey() { + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<List<Tuple2<String, Integer>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<String, Integer>("california", 4), + new Tuple2<String, Integer>("new york", 5)), + Arrays.asList( + new Tuple2<String, Integer>("california", 10), + new Tuple2<String, Integer>("new york", 4))); + + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey( + new Function<Integer, Integer>() { + @Override + public Integer call(Integer i) throws Exception { + return i; + } + }, new IntegerSum(), new IntegerSum(), new HashPartitioner(2)); + + JavaTestUtils.attachTestOutputStream(combined); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testCountByKey() { + List<List<Tuple2<String, String>>> inputData = stringStringKVStream; + + List<List<Tuple2<String, Long>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<String, Long>("california", 2L), + new Tuple2<String, Long>("new york", 2L)), + Arrays.asList( + new Tuple2<String, Long>("california", 2L), + new Tuple2<String, Long>("new york", 2L))); + + JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, Long> counted = pairStream.countByKey(); + JavaTestUtils.attachTestOutputStream(counted); + List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testGroupByKeyAndWindow() { + List<List<Tuple2<String, String>>> inputData = stringStringKVStream; + + List<List<Tuple2<String, List<String>>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")), + new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))), + Arrays.asList(new Tuple2<String, List<String>>("california", + Arrays.asList("sharks", "ducks", "dodgers", "giants")), + new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders", "yankees", "mets"))), + Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")), + new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders")))); + + JavaDStream<Tuple2<String, String>> stream = 
JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, List<String>> groupWindowed = + pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000)); + JavaTestUtils.attachTestOutputStream(groupWindowed); + List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testReduceByKeyAndWindow() { + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<List<Tuple2<String, Integer>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<String, Integer>("california", 4), + new Tuple2<String, Integer>("new york", 5)), + Arrays.asList(new Tuple2<String, Integer>("california", 14), + new Tuple2<String, Integer>("new york", 9)), + Arrays.asList(new Tuple2<String, Integer>("california", 10), + new Tuple2<String, Integer>("new york", 4))); + + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, Integer> reduceWindowed = + pairStream.reduceByKeyAndWindow(new IntegerSum(), new Duration(2000), new Duration(1000)); + JavaTestUtils.attachTestOutputStream(reduceWindowed); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testUpdateStateByKey() { + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<List<Tuple2<String, Integer>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<String, Integer>("california", 4), + new Tuple2<String, Integer>("new york", 5)), + Arrays.asList(new Tuple2<String, Integer>("california", 14), + new Tuple2<String, Integer>("new york", 9)), + Arrays.asList(new Tuple2<String, Integer>("california", 14), + new Tuple2<String, Integer>("new york", 9))); + + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey( + new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>(){ + @Override + public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { + int out = 0; + if (state.isPresent()) { + out = out + state.get(); + } + for (Integer v: values) { + out = out + v; + } + return Optional.of(out); + } + }); + JavaTestUtils.attachTestOutputStream(updated); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testReduceByKeyAndWindowWithInverse() { + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<List<Tuple2<String, Integer>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<String, Integer>("california", 4), + new Tuple2<String, Integer>("new york", 5)), + Arrays.asList(new Tuple2<String, Integer>("california", 14), + new Tuple2<String, Integer>("new york", 9)), + Arrays.asList(new Tuple2<String, Integer>("california", 10), + new Tuple2<String, Integer>("new york", 4))); + + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, 
Integer> reduceWindowed = + pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000)); + JavaTestUtils.attachTestOutputStream(reduceWindowed); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testCountByKeyAndWindow() { + List<List<Tuple2<String, String>>> inputData = stringStringKVStream; + + List<List<Tuple2<String, Long>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<String, Long>("california", 2L), + new Tuple2<String, Long>("new york", 2L)), + Arrays.asList( + new Tuple2<String, Long>("california", 4L), + new Tuple2<String, Long>("new york", 4L)), + Arrays.asList( + new Tuple2<String, Long>("california", 2L), + new Tuple2<String, Long>("new york", 2L))); + + JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, Long> counted = + pairStream.countByKeyAndWindow(new Duration(2000), new Duration(1000)); + JavaTestUtils.attachTestOutputStream(counted); + List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testMapValues() { + List<List<Tuple2<String, String>>> inputData = stringStringKVStream; + + List<List<Tuple2<String, String>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "DODGERS"), + new Tuple2<String, String>("california", "GIANTS"), + new Tuple2<String, String>("new york", "YANKEES"), + new Tuple2<String, String>("new york", "METS")), + Arrays.asList(new Tuple2<String, String>("california", "SHARKS"), + new Tuple2<String, String>("california", "DUCKS"), + new Tuple2<String, String>("new york", "RANGERS"), + new Tuple2<String, String>("new york", "ISLANDERS"))); + + JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, String> mapped = pairStream.mapValues(new Function<String, String>() { + @Override + public String call(String s) throws Exception { + return s.toUpperCase(); + } + }); + + JavaTestUtils.attachTestOutputStream(mapped); + List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testFlatMapValues() { + List<List<Tuple2<String, String>>> inputData = stringStringKVStream; + + List<List<Tuple2<String, String>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "dodgers1"), + new Tuple2<String, String>("california", "dodgers2"), + new Tuple2<String, String>("california", "giants1"), + new Tuple2<String, String>("california", "giants2"), + new Tuple2<String, String>("new york", "yankees1"), + new Tuple2<String, String>("new york", "yankees2"), + new Tuple2<String, String>("new york", "mets1"), + new Tuple2<String, String>("new york", "mets2")), + Arrays.asList(new Tuple2<String, String>("california", "sharks1"), + new Tuple2<String, String>("california", "sharks2"), + new Tuple2<String, String>("california", "ducks1"), + new Tuple2<String, String>("california", "ducks2"), + new Tuple2<String, String>("new york", "rangers1"), + new Tuple2<String, String>("new york", "rangers2"), + new Tuple2<String, String>("new york", 
"islanders1"), + new Tuple2<String, String>("new york", "islanders2"))); + + JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); + + + JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues( + new Function<String, Iterable<String>>() { + @Override + public Iterable<String> call(String in) { + List<String> out = new ArrayList<String>(); + out.add(in + "1"); + out.add(in + "2"); + return out; + } + }); + + JavaTestUtils.attachTestOutputStream(flatMapped); + List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testCoGroup() { + List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "dodgers"), + new Tuple2<String, String>("new york", "yankees")), + Arrays.asList(new Tuple2<String, String>("california", "sharks"), + new Tuple2<String, String>("new york", "rangers"))); + + List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "giants"), + new Tuple2<String, String>("new york", "mets")), + Arrays.asList(new Tuple2<String, String>("california", "ducks"), + new Tuple2<String, String>("new york", "islanders"))); + + + List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<String, Tuple2<List<String>, List<String>>>("california", + new Tuple2<List<String>, List<String>>(Arrays.asList("dodgers"), Arrays.asList("giants"))), + new Tuple2<String, Tuple2<List<String>, List<String>>>("new york", + new Tuple2<List<String>, List<String>>(Arrays.asList("yankees"), Arrays.asList("mets")))), + Arrays.asList( + new Tuple2<String, Tuple2<List<String>, List<String>>>("california", + new Tuple2<List<String>, List<String>>(Arrays.asList("sharks"), Arrays.asList("ducks"))), + new Tuple2<String, Tuple2<List<String>, List<String>>>("new york", + new Tuple2<List<String>, List<String>>(Arrays.asList("rangers"), Arrays.asList("islanders"))))); + + + JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream1, 1); + JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1); + + JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream2, 1); + JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2); + + JavaPairDStream<String, Tuple2<List<String>, List<String>>> grouped = pairStream1.cogroup(pairStream2); + JavaTestUtils.attachTestOutputStream(grouped); + List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testJoin() { + List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "dodgers"), + new Tuple2<String, String>("new york", "yankees")), + Arrays.asList(new Tuple2<String, String>("california", "sharks"), + new Tuple2<String, String>("new york", "rangers"))); + + List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "giants"), + new Tuple2<String, String>("new york", "mets")), + Arrays.asList(new Tuple2<String, String>("california", "ducks"), + new 
Tuple2<String, String>("new york", "islanders"))); + + + List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<String, Tuple2<String, String>>("california", + new Tuple2<String, String>("dodgers", "giants")), + new Tuple2<String, Tuple2<String, String>>("new york", + new Tuple2<String, String>("yankees", "mets"))), + Arrays.asList( + new Tuple2<String, Tuple2<String, String>>("california", + new Tuple2<String, String>("sharks", "ducks")), + new Tuple2<String, Tuple2<String, String>>("new york", + new Tuple2<String, String>("rangers", "islanders")))); + + + JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream1, 1); + JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1); + + JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream2, 1); + JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2); + + JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.join(pairStream2); + JavaTestUtils.attachTestOutputStream(joined); + List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testCheckpointMasterRecovery() throws InterruptedException { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("this", "is"), + Arrays.asList("a", "test"), + Arrays.asList("counting", "letters")); + + List<List<Integer>> expectedInitial = Arrays.asList( + Arrays.asList(4,2)); + List<List<Integer>> expectedFinal = Arrays.asList( + Arrays.asList(1,4), + Arrays.asList(8,7)); + + + File tempDir = Files.createTempDir(); + ssc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000)); + + JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream letterCount = stream.map(new Function<String, Integer>() { + @Override + public Integer call(String s) throws Exception { + return s.length(); + } + }); + JavaCheckpointTestUtils.attachTestOutputStream(letterCount); + List<List<Integer>> initialResult = JavaTestUtils.runStreams(ssc, 1, 1); + + assertOrderInvariantEquals(expectedInitial, initialResult); + Thread.sleep(1000); + + ssc.stop(); + ssc = new JavaStreamingContext(tempDir.getAbsolutePath()); + ssc.start(); + List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 2); + assertOrderInvariantEquals(expectedFinal, finalResult); + } + + /** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD + @Test + public void testCheckpointofIndividualStream() throws InterruptedException { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("this", "is"), + Arrays.asList("a", "test"), + Arrays.asList("counting", "letters")); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(4,2), + Arrays.asList(1,4), + Arrays.asList(8,7)); + + JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream letterCount = stream.map(new Function<String, Integer>() { + @Override + public Integer call(String s) throws Exception { + return s.length(); + } + }); + JavaCheckpointTestUtils.attachTestOutputStream(letterCount); + + letterCount.checkpoint(new Duration(1000)); + + List<List<Integer>> result1 = JavaCheckpointTestUtils.runStreams(ssc, 3, 3); + assertOrderInvariantEquals(expected, result1); + } + */ + + // Input stream tests. 
These mostly just test that we can instantiate a given InputStream with + // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the + // InputStream functionality is deferred to the existing Scala tests. + @Test + public void testKafkaStream() { + HashMap<String, Integer> topics = Maps.newHashMap(); + HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap(); + JavaDStream test1 = ssc.kafkaStream("localhost", 12345, "group", topics); + JavaDStream test2 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets); + JavaDStream test3 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets, + StorageLevel.MEMORY_AND_DISK()); + } + + @Test + public void testNetworkTextStream() { + JavaDStream test = ssc.networkTextStream("localhost", 12345); + } + + @Test + public void testNetworkString() { + class Converter extends Function<InputStream, Iterable<String>> { + public Iterable<String> call(InputStream in) { + BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + List<String> out = new ArrayList<String>(); + try { + while (true) { + String line = reader.readLine(); + if (line == null) { break; } + out.add(line); + } + } catch (IOException e) { } + return out; + } + } + + JavaDStream test = ssc.networkStream( + "localhost", + 12345, + new Converter(), + StorageLevel.MEMORY_ONLY()); + } + + @Test + public void testTextFileStream() { + JavaDStream test = ssc.textFileStream("/tmp/foo"); + } + + @Test + public void testRawNetworkStream() { + JavaDStream test = ssc.rawNetworkStream("localhost", 12345); + } + + @Test + public void testFlumeStream() { + JavaDStream test = ssc.flumeStream("localhost", 12345); + } + + @Test + public void testFileStream() { + JavaPairDStream<String, String> foo = + ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo"); + } +} diff --git a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala new file mode 100644 index 0000000000..56349837e5 --- /dev/null +++ b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala @@ -0,0 +1,65 @@ +package spark.streaming + +import collection.mutable.{SynchronizedBuffer, ArrayBuffer} +import java.util.{List => JList} +import spark.streaming.api.java.{JavaPairDStream, JavaDStreamLike, JavaDStream, JavaStreamingContext} +import spark.streaming._ +import java.util.ArrayList +import collection.JavaConversions._ + +/** Exposes streaming test functionality in a Java-friendly way. */ +trait JavaTestBase extends TestSuiteBase { + + /** + * Create a [[spark.streaming.TestInputStream]] and attach it to the supplied context. + * The stream will be derived from the supplied lists of Java objects. + **/ + def attachTestInputStream[T]( + ssc: JavaStreamingContext, + data: JList[JList[T]], + numPartitions: Int) = { + val seqData = data.map(Seq(_:_*)) + + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions) + ssc.ssc.registerInputStream(dstream) + new JavaDStream[T](dstream) + } + + /** + * Attach a provided stream to it's associated StreamingContext as a + * [[spark.streaming.TestOutputStream]]. 
+ **/ + def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T,This]]( + dstream: JavaDStreamLike[T, This]) = { + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + val ostream = new TestOutputStream(dstream.dstream, + new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]) + dstream.dstream.ssc.registerOutputStream(ostream) + } + + /** + * Process all registered streams for numBatches batches, failing if + * numExpectedOutput RDDs are not generated. Generated RDDs are collected + * and returned, represented as a list for each batch interval. + */ + def runStreams[V]( + ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { + implicit val cm: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput) + val out = new ArrayList[JList[V]]() + res.map(entry => out.append(new ArrayList[V](entry))) + out + } +} + +object JavaTestUtils extends JavaTestBase { + +} + +object JavaCheckpointTestUtils extends JavaTestBase { + override def actuallyWait = true +}
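The three helpers above compose into a fixed attach-input, attach-output, run, assert cycle; the Java suite earlier in this diff walks through it with anonymous Function classes. A minimal sketch of the same cycle, written in Scala for brevity and assuming a JavaStreamingContext named jssc is already set up (its construction is not shown in this patch):

    // Each inner list is the data for one batch interval of the test input stream.
    import java.util.Arrays
    val input = Arrays.asList(
      Arrays.asList("spark", "streaming"),
      Arrays.asList("manual", "clock"))

    // Attach the per-batch input, register a collecting output, then run two
    // batches under the manual clock and expect two batches of output back.
    val stream = JavaTestUtils.attachTestInputStream(jssc, input, 1)
    JavaTestUtils.attachTestOutputStream(stream)
    val result = JavaTestUtils.runStreams[String](jssc, 2, 2)
    // With no transformation applied, result.get(0) should equal the first
    // input batch and result.get(1) the second.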
\ No newline at end of file diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties new file mode 100644 index 0000000000..edfa1243fa --- /dev/null +++ b/streaming/src/test/resources/log4j.properties @@ -0,0 +1,11 @@ +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=INFO, file +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=streaming/target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN + diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala new file mode 100644 index 0000000000..4a036f0710 --- /dev/null +++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala @@ -0,0 +1,218 @@ +package spark.streaming + +import spark.streaming.StreamingContext._ +import scala.runtime.RichInt +import util.ManualClock + +class BasicOperationsSuite extends TestSuiteBase { + + override def framework() = "BasicOperationsSuite" + + after { + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.driver.port") + } + + test("map") { + val input = Seq(1 to 4, 5 to 8, 9 to 12) + testOperation( + input, + (r: DStream[Int]) => r.map(_.toString), + input.map(_.map(_.toString)) + ) + } + + test("flatmap") { + val input = Seq(1 to 4, 5 to 8, 9 to 12) + testOperation( + input, + (r: DStream[Int]) => r.flatMap(x => Seq(x, x * 2)), + input.map(_.flatMap(x => Array(x, x * 2))) + ) + } + + test("filter") { + val input = Seq(1 to 4, 5 to 8, 9 to 12) + testOperation( + input, + (r: DStream[Int]) => r.filter(x => (x % 2 == 0)), + input.map(_.filter(x => (x % 2 == 0))) + ) + } + + test("glom") { + assert(numInputPartitions === 2, "Number of input partitions has been changed from 2") + val input = Seq(1 to 4, 5 to 8, 9 to 12) + val output = Seq( + Seq( Seq(1, 2), Seq(3, 4) ), + Seq( Seq(5, 6), Seq(7, 8) ), + Seq( Seq(9, 10), Seq(11, 12) ) + ) + val operation = (r: DStream[Int]) => r.glom().map(_.toSeq) + testOperation(input, operation, output) + } + + test("mapPartitions") { + assert(numInputPartitions === 2, "Number of input partitions has been changed from 2") + val input = Seq(1 to 4, 5 to 8, 9 to 12) + val output = Seq(Seq(3, 7), Seq(11, 15), Seq(19, 23)) + val operation = (r: DStream[Int]) => r.mapPartitions(x => Iterator(x.reduce(_ + _))) + testOperation(input, operation, output, true) + } + + test("groupByKey") { + testOperation( + Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ), + (s: DStream[String]) => s.map(x => (x, 1)).groupByKey(), + Seq( Seq(("a", Seq(1, 1)), ("b", Seq(1))), Seq(("", Seq(1, 1))), Seq() ), + true + ) + } + + test("reduceByKey") { + testOperation( + Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ), + (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _), + Seq( Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq() ), + true + ) + } + + test("reduce") { + testOperation( + Seq(1 to 4, 5 to 8, 9 to 12), + (s: DStream[Int]) => s.reduce(_ + _), + Seq(Seq(10), Seq(26), Seq(42)) + ) + } + + test("mapValues") { + testOperation( + Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ), + (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _).mapValues(_ + 
10), + Seq( Seq(("a", 12), ("b", 11)), Seq(("", 12)), Seq() ), + true + ) + } + + test("flatMapValues") { + testOperation( + Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ), + (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _).flatMapValues(x => Seq(x, x + 10)), + Seq( Seq(("a", 2), ("a", 12), ("b", 1), ("b", 11)), Seq(("", 2), ("", 12)), Seq() ), + true + ) + } + + test("cogroup") { + val inputData1 = Seq( Seq("a", "a", "b"), Seq("a", ""), Seq(""), Seq() ) + val inputData2 = Seq( Seq("a", "a", "b"), Seq("b", ""), Seq(), Seq() ) + val outputData = Seq( + Seq( ("a", (Seq(1, 1), Seq("x", "x"))), ("b", (Seq(1), Seq("x"))) ), + Seq( ("a", (Seq(1), Seq())), ("b", (Seq(), Seq("x"))), ("", (Seq(1), Seq("x"))) ), + Seq( ("", (Seq(1), Seq())) ), + Seq( ) + ) + val operation = (s1: DStream[String], s2: DStream[String]) => { + s1.map(x => (x,1)).cogroup(s2.map(x => (x, "x"))) + } + testOperation(inputData1, inputData2, operation, outputData, true) + } + + test("join") { + val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() ) + val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") ) + val outputData = Seq( + Seq( ("a", (1, "x")), ("b", (1, "x")) ), + Seq( ("", (1, "x")) ), + Seq( ), + Seq( ) + ) + val operation = (s1: DStream[String], s2: DStream[String]) => { + s1.map(x => (x,1)).join(s2.map(x => (x,"x"))) + } + testOperation(inputData1, inputData2, operation, outputData, true) + } + + test("updateStateByKey") { + val inputData = + Seq( + Seq("a"), + Seq("a", "b"), + Seq("a", "b", "c"), + Seq("a", "b"), + Seq("a"), + Seq() + ) + + val outputData = + Seq( + Seq(("a", 1)), + Seq(("a", 2), ("b", 1)), + Seq(("a", 3), ("b", 2), ("c", 1)), + Seq(("a", 4), ("b", 3), ("c", 1)), + Seq(("a", 5), ("b", 3), ("c", 1)), + Seq(("a", 5), ("b", 3), ("c", 1)) + ) + + val updateStateOperation = (s: DStream[String]) => { + val updateFunc = (values: Seq[Int], state: Option[Int]) => { + Some(values.foldLeft(0)(_ + _) + state.getOrElse(0)) + } + s.map(x => (x, 1)).updateStateByKey[Int](updateFunc) + } + + testOperation(inputData, updateStateOperation, outputData, true) + } + + test("forgetting of RDDs - map and window operations") { + assert(batchDuration === Seconds(1), "Batch duration has changed from 1 second") + + val input = (0 until 10).map(x => Seq(x, x + 1)).toSeq + val rememberDuration = Seconds(3) + + assert(input.size === 10, "Number of inputs have changed") + + def operation(s: DStream[Int]): DStream[(Int, Int)] = { + s.map(x => (x % 10, 1)) + .window(Seconds(2), Seconds(1)) + .window(Seconds(4), Seconds(2)) + } + + val ssc = setupStreams(input, operation _) + ssc.remember(rememberDuration) + runStreams[(Int, Int)](ssc, input.size, input.size / 2) + + val windowedStream2 = ssc.graph.getOutputStreams().head.dependencies.head + val windowedStream1 = windowedStream2.dependencies.head + val mappedStream = windowedStream1.dependencies.head + + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + assert(clock.time === Seconds(10).milliseconds) + + // IDEALLY + // WindowedStream2 should remember till 7 seconds: 10, 8, + // WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5 + // MappedStream should remember till 7 seconds: 10, 9, 8, 7, 6, 5, 4, 3, + + // IN THIS TEST + // WindowedStream2 should remember till 7 seconds: 10, 8, + // WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5, 4 + // MappedStream should remember till 7 seconds: 10, 9, 8, 7, 6, 5, 4, 3, 2 + + // WindowedStream2 + assert(windowedStream2.generatedRDDs.contains(Time(10000))) + 
assert(windowedStream2.generatedRDDs.contains(Time(8000))) + assert(!windowedStream2.generatedRDDs.contains(Time(6000))) + + // WindowedStream1 + assert(windowedStream1.generatedRDDs.contains(Time(10000))) + assert(windowedStream1.generatedRDDs.contains(Time(4000))) + assert(!windowedStream1.generatedRDDs.contains(Time(3000))) + + // MappedStream + assert(mappedStream.generatedRDDs.contains(Time(10000))) + assert(mappedStream.generatedRDDs.contains(Time(2000))) + assert(!mappedStream.generatedRDDs.contains(Time(1000))) + } +} diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala new file mode 100644 index 0000000000..563a7d1458 --- /dev/null +++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -0,0 +1,210 @@ +package spark.streaming + +import spark.streaming.StreamingContext._ +import java.io.File +import runtime.RichInt +import org.scalatest.BeforeAndAfter +import org.apache.commons.io.FileUtils +import collection.mutable.{SynchronizedBuffer, ArrayBuffer} +import util.{Clock, ManualClock} + +class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { + + before { + FileUtils.deleteDirectory(new File(checkpointDir)) + } + + after { + if (ssc != null) ssc.stop() + FileUtils.deleteDirectory(new File(checkpointDir)) + + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.driver.port") + } + + var ssc: StreamingContext = null + + override def framework = "CheckpointSuite" + + override def batchDuration = Milliseconds(500) + + override def checkpointInterval = batchDuration + + override def actuallyWait = true + + test("basic stream+rdd recovery") { + + assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second") + assert(checkpointInterval === batchDuration, "checkpointInterval for this test much be same as batchDuration") + + System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + + val stateStreamCheckpointInterval = Seconds(1) + + // this ensure checkpointing occurs at least once + val firstNumBatches = (stateStreamCheckpointInterval / batchDuration) * 2 + val secondNumBatches = firstNumBatches + + // Setup the streams + val input = (1 to 10).map(_ => Seq("a")).toSeq + val operation = (st: DStream[String]) => { + val updateFunc = (values: Seq[Int], state: Option[RichInt]) => { + Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0))) + } + st.map(x => (x, 1)) + .updateStateByKey[RichInt](updateFunc) + .checkpoint(stateStreamCheckpointInterval) + .map(t => (t._1, t._2.self)) + } + var ssc = setupStreams(input, operation) + var stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head + + // Run till a time such that at least one RDD in the stream should have been checkpointed, + // then check whether some RDD has been checkpointed or not + ssc.start() + runStreamsWithRealDelay(ssc, firstNumBatches) + logInfo("Checkpoint data of state stream = \n[" + stateStream.checkpointData.rdds.mkString(",\n") + "]") + assert(!stateStream.checkpointData.rdds.isEmpty, "No checkpointed RDDs in state stream before first failure") + stateStream.checkpointData.rdds.foreach { + case (time, data) => { + val file = new File(data.toString) + assert(file.exists(), "Checkpoint file '" + file +"' for time " + time + " for state stream before first failure does not exist") + } + } + + // Run till a further time such that previous 
checkpoint files in the stream would be deleted + // and check whether the earlier checkpoint files are deleted + val checkpointFiles = stateStream.checkpointData.rdds.map(x => new File(x._2.toString)) + runStreamsWithRealDelay(ssc, secondNumBatches) + checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted")) + ssc.stop() + + // Restart stream computation using the checkpoint file and check whether + // checkpointed RDDs have been restored or not + ssc = new StreamingContext(checkpointDir) + stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head + logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]") + assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from first failure") + + + // Run one batch to generate a new checkpoint file and check whether some RDD + // is present in the checkpoint data or not + ssc.start() + runStreamsWithRealDelay(ssc, 1) + assert(!stateStream.checkpointData.rdds.isEmpty, "No checkpointed RDDs in state stream before second failure") + stateStream.checkpointData.rdds.foreach { + case (time, data) => { + val file = new File(data.toString) + assert(file.exists(), + "Checkpoint file '" + file +"' for time " + time + " for state stream before seconds failure does not exist") + } + } + ssc.stop() + + // Restart stream computation from the new checkpoint file to see whether that file has + // correct checkpoint data + ssc = new StreamingContext(checkpointDir) + stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head + logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]") + assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from second failure") + + // Adjust manual clock time as if it is being restarted after a delay + System.setProperty("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString) + ssc.start() + runStreamsWithRealDelay(ssc, 4) + ssc.stop() + System.clearProperty("spark.streaming.manualClock.jump") + ssc = null + } + + test("map and reduceByKey") { + testCheckpointedOperation( + Seq( Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq() ), + (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _), + Seq( Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq() ), + 3 + ) + } + + test("reduceByKeyAndWindowInv") { + val n = 10 + val w = 4 + val input = (1 to n).map(_ => Seq("a")).toSeq + val output = Seq(Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 3))) ++ (1 to (n - w + 1)).map(x => Seq(("a", 4))) + val operation = (st: DStream[String]) => { + st.map(x => (x, 1)) + .reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration) + .checkpoint(batchDuration * 2) + } + testCheckpointedOperation(input, operation, output, 7) + } + + test("updateStateByKey") { + val input = (1 to 10).map(_ => Seq("a")).toSeq + val output = (1 to 10).map(x => Seq(("a", x))).toSeq + val operation = (st: DStream[String]) => { + val updateFunc = (values: Seq[Int], state: Option[RichInt]) => { + Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0))) + } + st.map(x => (x, 1)) + .updateStateByKey[RichInt](updateFunc) + .checkpoint(batchDuration * 2) + .map(t => (t._1, t._2.self)) + } + testCheckpointedOperation(input, operation, output, 7) + } + + /** + * Tests a streaming operation under checkpointing, by restart 
ing the operation + * from a checkpoint file and verifying whether the final output is correct. + * The output is assumed to have come from a reliable queue which can replay + * data as required. + */ + def testCheckpointedOperation[U: ClassManifest, V: ClassManifest]( + input: Seq[Seq[U]], + operation: DStream[U] => DStream[V], + expectedOutput: Seq[Seq[V]], + initialNumBatches: Int + ) { + + // Current code assumes that: + // number of inputs = number of outputs = number of batches to be run + val totalNumBatches = input.size + val nextNumBatches = totalNumBatches - initialNumBatches + val initialNumExpectedOutputs = initialNumBatches + val nextNumExpectedOutputs = expectedOutput.size - initialNumExpectedOutputs + + // Do the computation for the initial number of batches, create a checkpoint file and quit + ssc = setupStreams[U, V](input, operation) + val output = runStreams[V](ssc, initialNumBatches, initialNumExpectedOutputs) + verifyOutput[V](output, expectedOutput.take(initialNumBatches), true) + Thread.sleep(1000) + + // Restart and complete the computation from the checkpoint file + logInfo( + "\n-------------------------------------------\n" + + " Restarting stream computation " + + "\n-------------------------------------------\n" + ) + ssc = new StreamingContext(checkpointDir) + val outputNew = runStreams[V](ssc, nextNumBatches, nextNumExpectedOutputs) + verifyOutput[V](outputNew, expectedOutput.takeRight(nextNumExpectedOutputs), true) + ssc = null + } + + /** + * Advances the manual clock on the streaming scheduler by the given number of batches. + * It also waits for the expected amount of time for each batch. + */ + def runStreamsWithRealDelay(ssc: StreamingContext, numBatches: Long) { + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + logInfo("Manual clock before advancing = " + clock.time) + for (i <- 1 to numBatches.toInt) { + clock.addToTime(batchDuration.milliseconds) + Thread.sleep(batchDuration.milliseconds) + } + logInfo("Manual clock after advancing = " + clock.time) + Thread.sleep(batchDuration.milliseconds) + } + +}
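Concretely, adding another checkpointed operation to this suite only takes an input/operation/expected-output triple plus the number of batches to run before the simulated restart; testCheckpointedOperation handles the stop-and-restart choreography. A hypothetical example in the same shape as the "map and reduceByKey" test above (the test name, operation, and data here are illustrative only, not part of the patch):

    // Sketch: filter out everything but "x", count per batch, restart after 2 batches.
    test("filter and reduceByKey") {
      testCheckpointedOperation(
        Seq( Seq("x", "y"), Seq("x", "x"), Seq(), Seq("y") ),
        (s: DStream[String]) => s.filter(_ == "x").map(x => (x, 1)).reduceByKey(_ + _),
        Seq( Seq(("x", 1)), Seq(("x", 2)), Seq(), Seq() ),
        2  // restart from the checkpoint after the first two batches
      )
    }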
\ No newline at end of file diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala new file mode 100644 index 0000000000..c4cfffbfc1 --- /dev/null +++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala @@ -0,0 +1,191 @@ +package spark.streaming + +import org.scalatest.BeforeAndAfter +import org.apache.commons.io.FileUtils +import java.io.File +import scala.runtime.RichInt +import scala.util.Random +import spark.streaming.StreamingContext._ +import collection.mutable.ArrayBuffer +import spark.Logging + +/** + * This testsuite tests master failures at random times while the stream is running using + * the real clock. + */ +class FailureSuite extends TestSuiteBase with BeforeAndAfter { + + before { + FileUtils.deleteDirectory(new File(checkpointDir)) + } + + after { + FailureSuite.reset() + FileUtils.deleteDirectory(new File(checkpointDir)) + + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.driver.port") + } + + override def framework = "CheckpointSuite" + + override def batchDuration = Milliseconds(500) + + override def checkpointDir = "checkpoint" + + override def checkpointInterval = batchDuration + + test("multiple failures with updateStateByKey") { + val n = 30 + // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ... + val input = (1 to n).map(i => (1 to i).map(_ =>"a").toSeq).toSeq + // Last output: [ (a, 465) ] for n=30 + val lastOutput = Seq( ("a", (1 to n).reduce(_ + _)) ) + + val operation = (st: DStream[String]) => { + val updateFunc = (values: Seq[Int], state: Option[RichInt]) => { + Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0))) + } + st.map(x => (x, 1)) + .updateStateByKey[RichInt](updateFunc) + .checkpoint(Seconds(2)) + .map(t => (t._1, t._2.self)) + } + + testOperationWithMultipleFailures(input, operation, lastOutput, n, n) + } + + test("multiple failures with reduceByKeyAndWindow") { + val n = 30 + val w = 100 + assert(w > n, "Window should be much larger than the number of input sets in this test") + // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ... + val input = (1 to n).map(i => (1 to i).map(_ =>"a").toSeq).toSeq + // Last output: [ (a, 465) ] + val lastOutput = Seq( ("a", (1 to n).reduce(_ + _)) ) + + val operation = (st: DStream[String]) => { + st.map(x => (x, 1)) + .reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration) + .checkpoint(Seconds(2)) + } + + testOperationWithMultipleFailures(input, operation, lastOutput, n, n) + } + + + /** + * Tests stream operation with multiple master failures, and verifies whether the + * final set of output values is as expected or not. Checking the final value is + * proof that no intermediate data was lost due to master failures. 
+ */ + def testOperationWithMultipleFailures[U: ClassManifest, V: ClassManifest]( + input: Seq[Seq[U]], + operation: DStream[U] => DStream[V], + lastExpectedOutput: Seq[V], + numBatches: Int, + numExpectedOutput: Int + ) { + var ssc = setupStreams[U, V](input, operation) + val mergedOutput = new ArrayBuffer[Seq[V]]() + + var totalTimeRan = 0L + while(totalTimeRan <= numBatches * batchDuration.milliseconds * 2) { + new KillingThread(ssc, numBatches * batchDuration.milliseconds.toInt / 4).start() + val (output, timeRan) = runStreamsWithRealClock[V](ssc, numBatches, numExpectedOutput) + + mergedOutput ++= output + totalTimeRan += timeRan + logInfo("New output = " + output) + logInfo("Merged output = " + mergedOutput) + logInfo("Total time spent = " + totalTimeRan) + val sleepTime = Random.nextInt(numBatches * batchDuration.milliseconds.toInt / 8) + logInfo( + "\n-------------------------------------------\n" + + " Restarting stream computation in " + sleepTime + " ms " + + "\n-------------------------------------------\n" + ) + Thread.sleep(sleepTime) + FailureSuite.failed = false + ssc = new StreamingContext(checkpointDir) + } + ssc.stop() + ssc = null + + // Verify whether the last output is the expected one + val lastOutput = mergedOutput(mergedOutput.lastIndexWhere(!_.isEmpty)) + assert(lastOutput.toSet === lastExpectedOutput.toSet) + logInfo("Finished computation after " + FailureSuite.failureCount + " failures") + } + + /** + * Runs the streams set up in `ssc` on real clock until the expected max number of + */ + def runStreamsWithRealClock[V: ClassManifest]( + ssc: StreamingContext, + numBatches: Int, + maxExpectedOutput: Int + ): (Seq[Seq[V]], Long) = { + + System.clearProperty("spark.streaming.clock") + + assert(numBatches > 0, "Number of batches to run stream computation is zero") + assert(maxExpectedOutput > 0, "Max expected outputs after " + numBatches + " is zero") + logInfo("numBatches = " + numBatches + ", maxExpectedOutput = " + maxExpectedOutput) + + // Get the output buffer + val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]] + val output = outputStream.output + val waitTime = (batchDuration.milliseconds * (numBatches.toDouble + 0.5)).toLong + val startTime = System.currentTimeMillis() + + try { + // Start computation + ssc.start() + + // Wait until expected number of output items have been generated + while (output.size < maxExpectedOutput && System.currentTimeMillis() - startTime < waitTime && !FailureSuite.failed) { + logInfo("output.size = " + output.size + ", maxExpectedOutput = " + maxExpectedOutput) + Thread.sleep(100) + } + } catch { + case e: Exception => logInfo("Exception while running streams: " + e) + } finally { + ssc.stop() + } + val timeTaken = System.currentTimeMillis() - startTime + logInfo("" + output.size + " sets of output generated in " + timeTaken + " ms") + (output, timeTaken) + } + + +} + +object FailureSuite { + var failed = false + var failureCount = 0 + + def reset() { + failed = false + failureCount = 0 + } +} + +class KillingThread(ssc: StreamingContext, maxKillWaitTime: Int) extends Thread with Logging { + initLogging() + + override def run() { + var minKillWaitTime = if (FailureSuite.failureCount == 0) 3000 else 1000 // to allow the first checkpoint + val killWaitTime = minKillWaitTime + Random.nextInt(maxKillWaitTime) + logInfo("Kill wait time = " + killWaitTime) + Thread.sleep(killWaitTime.toLong) + logInfo( + "\n---------------------------------------\n" + + "Killing streaming context after " + killWaitTime 
+ " ms" + + "\n---------------------------------------\n" + ) + if (ssc != null) ssc.stop() + FailureSuite.failed = true + FailureSuite.failureCount += 1 + } +} diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala new file mode 100644 index 0000000000..70ae6e3934 --- /dev/null +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -0,0 +1,355 @@ +package spark.streaming + +import dstream.SparkFlumeEvent +import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket} +import java.io.{File, BufferedWriter, OutputStreamWriter} +import java.util.concurrent.{TimeUnit, ArrayBlockingQueue} +import collection.mutable.{SynchronizedBuffer, ArrayBuffer} +import util.ManualClock +import spark.storage.StorageLevel +import spark.Logging +import scala.util.Random +import org.apache.commons.io.FileUtils +import org.scalatest.BeforeAndAfter +import org.apache.flume.source.avro.AvroSourceProtocol +import org.apache.flume.source.avro.AvroFlumeEvent +import org.apache.flume.source.avro.Status +import org.apache.avro.ipc.{specific, NettyTransceiver} +import org.apache.avro.ipc.specific.SpecificRequestor +import java.nio.ByteBuffer +import collection.JavaConversions._ +import java.nio.charset.Charset + +class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { + + System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + + val testPort = 9999 + var testServer: TestServer = null + var testDir: File = null + + override def checkpointDir = "checkpoint" + + after { + FileUtils.deleteDirectory(new File(checkpointDir)) + if (testServer != null) { + testServer.stop() + testServer = null + } + if (testDir != null && testDir.exists()) { + FileUtils.deleteDirectory(testDir) + testDir = null + } + + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.driver.port") + } + + test("network input stream") { + // Start the server + testServer = new TestServer(testPort) + testServer.start() + + // Set up the streaming context and input streams + val ssc = new StreamingContext(master, framework, batchDuration) + val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) + val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]] + val outputStream = new TestOutputStream(networkStream, outputBuffer) + def output = outputBuffer.flatMap(x => x) + ssc.registerOutputStream(outputStream) + ssc.start() + + // Feed data to the server to send to the network receiver + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + val input = Seq(1, 2, 3, 4, 5) + val expectedOutput = input.map(_.toString) + Thread.sleep(1000) + for (i <- 0 until input.size) { + testServer.send(input(i).toString + "\n") + Thread.sleep(500) + clock.addToTime(batchDuration.milliseconds) + } + Thread.sleep(1000) + logInfo("Stopping server") + testServer.stop() + logInfo("Stopping context") + ssc.stop() + + // Verify whether data received was as expected + logInfo("--------------------------------") + logInfo("output.size = " + outputBuffer.size) + logInfo("output") + outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("expected output.size = " + expectedOutput.size) + logInfo("expected output") + expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("--------------------------------") + + // Verify whether all the elements received 
are as expected + // (whether the elements were received one in each interval is not verified) + assert(output.size === expectedOutput.size) + for (i <- 0 until output.size) { + assert(output(i) === expectedOutput(i)) + } + } + + test("network input stream with checkpoint") { + // Start the server + testServer = new TestServer(testPort) + testServer.start() + + // Set up the streaming context and input streams + var ssc = new StreamingContext(master, framework, batchDuration) + ssc.checkpoint(checkpointDir, checkpointInterval) + val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) + var outputStream = new TestOutputStream(networkStream, new ArrayBuffer[Seq[String]]) + ssc.registerOutputStream(outputStream) + ssc.start() + + // Feed data to the server to send to the network receiver + var clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + for (i <- Seq(1, 2, 3)) { + testServer.send(i.toString + "\n") + Thread.sleep(100) + clock.addToTime(batchDuration.milliseconds) + } + Thread.sleep(500) + assert(outputStream.output.size > 0) + ssc.stop() + + // Restart stream computation from checkpoint and feed more data to see whether + // they are being received and processed + logInfo("*********** RESTARTING ************") + ssc = new StreamingContext(checkpointDir) + ssc.start() + clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + for (i <- Seq(4, 5, 6)) { + testServer.send(i.toString + "\n") + Thread.sleep(100) + clock.addToTime(batchDuration.milliseconds) + } + Thread.sleep(500) + outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]] + assert(outputStream.output.size > 0) + ssc.stop() + } + + test("flume input stream") { + // Set up the streaming context and input streams + val ssc = new StreamingContext(master, framework, batchDuration) + val flumeStream = ssc.flumeStream("localhost", 33333, StorageLevel.MEMORY_AND_DISK) + val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]] + with SynchronizedBuffer[Seq[SparkFlumeEvent]] + val outputStream = new TestOutputStream(flumeStream, outputBuffer) + ssc.registerOutputStream(outputStream) + ssc.start() + + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + val input = Seq(1, 2, 3, 4, 5) + + val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", 33333)); + val client = SpecificRequestor.getClient( + classOf[AvroSourceProtocol], transceiver); + + for (i <- 0 until input.size) { + val event = new AvroFlumeEvent + event.setBody(ByteBuffer.wrap(input(i).toString.getBytes())) + event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header")) + client.append(event) + Thread.sleep(500) + clock.addToTime(batchDuration.milliseconds) + } + + val startTime = System.currentTimeMillis() + while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { + logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size) + Thread.sleep(100) + } + Thread.sleep(1000) + val timeTaken = System.currentTimeMillis() - startTime + assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") + logInfo("Stopping context") + ssc.stop() + + val decoder = Charset.forName("UTF-8").newDecoder() + + assert(outputBuffer.size === input.length) + for (i <- 0 until outputBuffer.size) { + assert(outputBuffer(i).size === 1) + val str = decoder.decode(outputBuffer(i).head.event.getBody) + assert(str.toString === input(i).toString) + assert(outputBuffer(i).head.event.getHeaders.get("test") === 
"header") + } + } + + test("file input stream") { + + // Create a temporary directory + testDir = { + var temp = File.createTempFile(".temp.", Random.nextInt().toString) + temp.delete() + temp.mkdirs() + logInfo("Created temp dir " + temp) + temp + } + + // Set up the streaming context and input streams + val ssc = new StreamingContext(master, framework, batchDuration) + val filestream = ssc.textFileStream(testDir.toString) + val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] + def output = outputBuffer.flatMap(x => x) + val outputStream = new TestOutputStream(filestream, outputBuffer) + ssc.registerOutputStream(outputStream) + ssc.start() + + // Create files in the temporary directory so that Spark Streaming can read data from it + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + val input = Seq(1, 2, 3, 4, 5) + val expectedOutput = input.map(_.toString) + Thread.sleep(1000) + for (i <- 0 until input.size) { + FileUtils.writeStringToFile(new File(testDir, i.toString), input(i).toString + "\n") + Thread.sleep(500) + clock.addToTime(batchDuration.milliseconds) + //Thread.sleep(100) + } + val startTime = System.currentTimeMillis() + /*while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { + logInfo("output.size = " + output.size + ", expectedOutput.size = " + expectedOutput.size) + Thread.sleep(100) + }*/ + Thread.sleep(1000) + val timeTaken = System.currentTimeMillis() - startTime + assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") + logInfo("Stopping context") + ssc.stop() + + // Verify whether data received by Spark Streaming was as expected + logInfo("--------------------------------") + logInfo("output.size = " + outputBuffer.size) + logInfo("output") + outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("expected output.size = " + expectedOutput.size) + logInfo("expected output") + expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("--------------------------------") + + // Verify whether all the elements received are as expected + // (whether the elements were received one in each interval is not verified) + assert(output.size === expectedOutput.size) + for (i <- 0 until output.size) { + assert(output(i).size === 1) + assert(output(i).head.toString === expectedOutput(i)) + } + } + + test("file input stream with checkpoint") { + // Create a temporary directory + testDir = { + var temp = File.createTempFile(".temp.", Random.nextInt().toString) + temp.delete() + temp.mkdirs() + logInfo("Created temp dir " + temp) + temp + } + + // Set up the streaming context and input streams + var ssc = new StreamingContext(master, framework, batchDuration) + ssc.checkpoint(checkpointDir, checkpointInterval) + val filestream = ssc.textFileStream(testDir.toString) + var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]]) + ssc.registerOutputStream(outputStream) + ssc.start() + + // Create files and advance manual clock to process them + var clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + Thread.sleep(1000) + for (i <- Seq(1, 2, 3)) { + FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") + Thread.sleep(100) + clock.addToTime(batchDuration.milliseconds) + } + Thread.sleep(500) + logInfo("Output = " + outputStream.output.mkString(",")) + assert(outputStream.output.size > 0) + ssc.stop() + + // Restart stream computation from checkpoint and create more files to see 
whether + // they are being processed + logInfo("*********** RESTARTING ************") + ssc = new StreamingContext(checkpointDir) + ssc.start() + clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + Thread.sleep(500) + for (i <- Seq(4, 5, 6)) { + FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") + Thread.sleep(100) + clock.addToTime(batchDuration.milliseconds) + } + Thread.sleep(500) + outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]] + logInfo("Output = " + outputStream.output.mkString(",")) + assert(outputStream.output.size > 0) + ssc.stop() + } +} + + +class TestServer(port: Int) extends Logging { + + val queue = new ArrayBlockingQueue[String](100) + + val serverSocket = new ServerSocket(port) + + val servingThread = new Thread() { + override def run() { + try { + while(true) { + logInfo("Accepting connections on port " + port) + val clientSocket = serverSocket.accept() + logInfo("New connection") + try { + clientSocket.setTcpNoDelay(true) + val outputStream = new BufferedWriter(new OutputStreamWriter(clientSocket.getOutputStream)) + + while(clientSocket.isConnected) { + val msg = queue.poll(100, TimeUnit.MILLISECONDS) + if (msg != null) { + outputStream.write(msg) + outputStream.flush() + logInfo("Message '" + msg + "' sent") + } + } + } catch { + case e: SocketException => logError("TestServer error", e) + } finally { + logInfo("Connection closed") + if (!clientSocket.isClosed) clientSocket.close() + } + } + } catch { + case ie: InterruptedException => + + } finally { + serverSocket.close() + } + } + } + + def start() { servingThread.start() } + + def send(msg: String) { queue.add(msg) } + + def stop() { servingThread.interrupt() } +} + +object TestServer { + def main(args: Array[String]) { + val s = new TestServer(9999) + s.start() + while(true) { + Thread.sleep(1000) + s.send("hello") + } + } +} diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala new file mode 100644 index 0000000000..49129f3964 --- /dev/null +++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala @@ -0,0 +1,291 @@ +package spark.streaming + +import spark.streaming.dstream.{InputDStream, ForEachDStream} +import spark.streaming.util.ManualClock + +import spark.{RDD, Logging} + +import collection.mutable.ArrayBuffer +import collection.mutable.SynchronizedBuffer + +import java.io.{ObjectInputStream, IOException} + +import org.scalatest.{BeforeAndAfter, FunSuite} + +/** + * This is a input stream just for the testsuites. This is equivalent to a checkpointable, + * replayable, reliable message queue like Kafka. It requires a sequence as input, and + * returns the i_th element at the i_th batch unde manual clock. + */ +class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int) + extends InputDStream[T](ssc_) { + + def start() {} + + def stop() {} + + def compute(validTime: Time): Option[RDD[T]] = { + logInfo("Computing RDD for time " + validTime) + val index = ((validTime - zeroTime) / slideDuration - 1).toInt + val selectedInput = if (index < input.size) input(index) else Seq[T]() + val rdd = ssc.sc.makeRDD(selectedInput, numPartitions) + logInfo("Created RDD " + rdd.id + " with " + selectedInput) + Some(rdd) + } +} + +/** + * This is a output stream just for the testsuites. All the output is collected into a + * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint. 
+ */ +class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[T]]) + extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { + val collected = rdd.collect() + output += collected + }) { + + // This is to clear the output buffer every it is read from a checkpoint + @throws(classOf[IOException]) + private def readObject(ois: ObjectInputStream) { + ois.defaultReadObject() + output.clear() + } +} + +/** + * This is the base trait for Spark Streaming testsuites. This provides basic functionality + * to run user-defined set of input on user-defined stream operations, and verify the output. + */ +trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { + + def framework = "TestSuiteBase" + + def master = "local[2]" + + def batchDuration = Seconds(1) + + def checkpointDir = "checkpoint" + + def checkpointInterval = batchDuration + + def numInputPartitions = 2 + + def maxWaitTimeMillis = 10000 + + def actuallyWait = false + + /** + * Set up required DStreams to test the DStream operation using the two sequences + * of input collections. + */ + def setupStreams[U: ClassManifest, V: ClassManifest]( + input: Seq[Seq[U]], + operation: DStream[U] => DStream[V] + ): StreamingContext = { + + // Create StreamingContext + val ssc = new StreamingContext(master, framework, batchDuration) + if (checkpointDir != null) { + ssc.checkpoint(checkpointDir, checkpointInterval) + } + + // Setup the stream computation + val inputStream = new TestInputStream(ssc, input, numInputPartitions) + val operatedStream = operation(inputStream) + val outputStream = new TestOutputStream(operatedStream, new ArrayBuffer[Seq[V]] with SynchronizedBuffer[Seq[V]]) + ssc.registerInputStream(inputStream) + ssc.registerOutputStream(outputStream) + ssc + } + + /** + * Set up required DStreams to test the binary operation using the sequence + * of input collections. + */ + def setupStreams[U: ClassManifest, V: ClassManifest, W: ClassManifest]( + input1: Seq[Seq[U]], + input2: Seq[Seq[V]], + operation: (DStream[U], DStream[V]) => DStream[W] + ): StreamingContext = { + + // Create StreamingContext + val ssc = new StreamingContext(master, framework, batchDuration) + if (checkpointDir != null) { + ssc.checkpoint(checkpointDir, checkpointInterval) + } + + // Setup the stream computation + val inputStream1 = new TestInputStream(ssc, input1, numInputPartitions) + val inputStream2 = new TestInputStream(ssc, input2, numInputPartitions) + val operatedStream = operation(inputStream1, inputStream2) + val outputStream = new TestOutputStream(operatedStream, new ArrayBuffer[Seq[W]] with SynchronizedBuffer[Seq[W]]) + ssc.registerInputStream(inputStream1) + ssc.registerInputStream(inputStream2) + ssc.registerOutputStream(outputStream) + ssc + } + + /** + * Runs the streams set up in `ssc` on manual clock for `numBatches` batches and + * returns the collected output. It will wait until `numExpectedOutput` number of + * output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached. 
+ */
+ def runStreams[V: ClassManifest](
+ ssc: StreamingContext,
+ numBatches: Int,
+ numExpectedOutput: Int
+ ): Seq[Seq[V]] = {
+
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+
+ assert(numBatches > 0, "Number of batches to run stream computation is zero")
+ assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " batches is zero")
+ logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput)
+
+ // Get the output buffer
+ val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
+ val output = outputStream.output
+
+ try {
+ // Start computation
+ ssc.start()
+
+ // Advance manual clock
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ logInfo("Manual clock before advancing = " + clock.time)
+ if (actuallyWait) {
+ for (i <- 1 to numBatches) {
+ logInfo("Actually waiting for " + batchDuration)
+ clock.addToTime(batchDuration.milliseconds)
+ Thread.sleep(batchDuration.milliseconds)
+ }
+ } else {
+ clock.addToTime(numBatches * batchDuration.milliseconds)
+ }
+ logInfo("Manual clock after advancing = " + clock.time)
+
+ // Wait until expected number of output items have been generated
+ val startTime = System.currentTimeMillis()
+ while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+ logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput)
+ Thread.sleep(100)
+ }
+ val timeTaken = System.currentTimeMillis() - startTime
+
+ assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
+ assert(output.size === numExpectedOutput, "Unexpected number of outputs generated")
+
+ Thread.sleep(500) // Give some time for the forgetting of old RDDs to complete
+ } catch {
+ case e: Exception => e.printStackTrace(); throw e;
+ } finally {
+ ssc.stop()
+ }
+
+ output
+ }
+
+ /**
+ * Verify whether the output values after running a DStream operation
+ * are the same as the expected output values, by comparing the output
+ * collections either as lists (order matters) or as sets (order does not matter).
+ */
+ def verifyOutput[V: ClassManifest](
+ output: Seq[Seq[V]],
+ expectedOutput: Seq[Seq[V]],
+ useSet: Boolean
+ ) {
+ logInfo("--------------------------------")
+ logInfo("output.size = " + output.size)
+ logInfo("output")
+ output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ logInfo("expected output.size = " + expectedOutput.size)
+ logInfo("expected output")
+ expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ logInfo("--------------------------------")
+
+ // Match the output with the expected output
+ assert(output.size === expectedOutput.size, "Number of outputs does not match")
+ for (i <- 0 until output.size) {
+ if (useSet) {
+ assert(output(i).toSet === expectedOutput(i).toSet)
+ } else {
+ assert(output(i).toList === expectedOutput(i).toList)
+ }
+ }
+ logInfo("Output verified successfully")
+ }
+
+ /**
+ * Test unary DStream operation with a list of inputs, with the number of
+ * batches to run equal to the number of expected output values
+ */
+ def testOperation[U: ClassManifest, V: ClassManifest](
+ input: Seq[Seq[U]],
+ operation: DStream[U] => DStream[V],
+ expectedOutput: Seq[Seq[V]],
+ useSet: Boolean = false
+ ) {
+ testOperation[U, V](input, operation, expectedOutput, -1, useSet)
+ }
+
+ /**
+ * Test unary DStream operation with a list of inputs
+ * @param input Sequence of input collections
+ * @param operation Unary DStream operation to be
applied to the input
+ * @param expectedOutput Sequence of expected output collections
+ * @param numBatches Number of batches to run the operation for
+ * @param useSet Compare the output values with the expected output values
+ * as sets (order does not matter) or as lists (order matters)
+ */
+ def testOperation[U: ClassManifest, V: ClassManifest](
+ input: Seq[Seq[U]],
+ operation: DStream[U] => DStream[V],
+ expectedOutput: Seq[Seq[V]],
+ numBatches: Int,
+ useSet: Boolean
+ ) {
+ val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size
+ val ssc = setupStreams[U, V](input, operation)
+ val output = runStreams[V](ssc, numBatches_, expectedOutput.size)
+ verifyOutput[V](output, expectedOutput, useSet)
+ }
+
+ /**
+ * Test binary DStream operation with two lists of inputs, with the number of
+ * batches to run equal to the number of expected output values
+ */
+ def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest](
+ input1: Seq[Seq[U]],
+ input2: Seq[Seq[V]],
+ operation: (DStream[U], DStream[V]) => DStream[W],
+ expectedOutput: Seq[Seq[W]],
+ useSet: Boolean
+ ) {
+ testOperation[U, V, W](input1, input2, operation, expectedOutput, -1, useSet)
+ }
+
+ /**
+ * Test binary DStream operation with two lists of inputs
+ * @param input1 First sequence of input collections
+ * @param input2 Second sequence of input collections
+ * @param operation Binary DStream operation to be applied to the 2 inputs
+ * @param expectedOutput Sequence of expected output collections
+ * @param numBatches Number of batches to run the operation for
+ * @param useSet Compare the output values with the expected output values
+ * as sets (order does not matter) or as lists (order matters)
+ */
+ def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest](
+ input1: Seq[Seq[U]],
+ input2: Seq[Seq[V]],
+ operation: (DStream[U], DStream[V]) => DStream[W],
+ expectedOutput: Seq[Seq[W]],
+ numBatches: Int,
+ useSet: Boolean
+ ) {
+ val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size
+ val ssc = setupStreams[U, V, W](input1, input2, operation)
+ val output = runStreams[W](ssc, numBatches_, expectedOutput.size)
+ verifyOutput[W](output, expectedOutput, useSet)
+ }
+}
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
new file mode 100644
index 0000000000..cd9608df53
--- /dev/null
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -0,0 +1,300 @@
+package spark.streaming
+
+import spark.streaming.StreamingContext._
+import collection.mutable.ArrayBuffer
+
+class WindowOperationsSuite extends TestSuiteBase {
+
+ override def framework = "WindowOperationsSuite"
+
+ override def maxWaitTimeMillis = 20000
+
+ override def batchDuration = Seconds(1)
+
+ after {
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.driver.port")
+ }
+
+ val largerSlideInput = Seq(
+ Seq(("a", 1)),
+ Seq(("a", 2)), // 1st window from here
+ Seq(("a", 3)),
+ Seq(("a", 4)), // 2nd window from here
+ Seq(("a", 5)),
+ Seq(("a", 6)), // 3rd window from here
+ Seq(),
+ Seq() // 4th window from here
+ )
+
+ val largerSlideReduceOutput = Seq(
+ Seq(("a", 3)),
+ Seq(("a", 10)),
+ Seq(("a", 18)),
+ Seq(("a", 11))
+ )
+
+
+ val bigInput = Seq(
+ Seq(("a", 1)),
+ Seq(("a", 1), ("b", 1)),
+ Seq(("a", 1), ("b", 1), ("c", 1)),
+ Seq(("a", 1), ("b", 1)),
+ Seq(("a", 1)),
+ Seq(),
+ Seq(("a", 1)),
+
Seq(("a", 1), ("b", 1)), + Seq(("a", 1), ("b", 1), ("c", 1)), + Seq(("a", 1), ("b", 1)), + Seq(("a", 1)), + Seq() + ) + + val bigGroupByOutput = Seq( + Seq(("a", Seq(1))), + Seq(("a", Seq(1, 1)), ("b", Seq(1))), + Seq(("a", Seq(1, 1)), ("b", Seq(1, 1)), ("c", Seq(1))), + Seq(("a", Seq(1, 1)), ("b", Seq(1, 1)), ("c", Seq(1))), + Seq(("a", Seq(1, 1)), ("b", Seq(1))), + Seq(("a", Seq(1))), + Seq(("a", Seq(1))), + Seq(("a", Seq(1, 1)), ("b", Seq(1))), + Seq(("a", Seq(1, 1)), ("b", Seq(1, 1)), ("c", Seq(1))), + Seq(("a", Seq(1, 1)), ("b", Seq(1, 1)), ("c", Seq(1))), + Seq(("a", Seq(1, 1)), ("b", Seq(1))), + Seq(("a", Seq(1))) + ) + + + val bigReduceOutput = Seq( + Seq(("a", 1)), + Seq(("a", 2), ("b", 1)), + Seq(("a", 2), ("b", 2), ("c", 1)), + Seq(("a", 2), ("b", 2), ("c", 1)), + Seq(("a", 2), ("b", 1)), + Seq(("a", 1)), + Seq(("a", 1)), + Seq(("a", 2), ("b", 1)), + Seq(("a", 2), ("b", 2), ("c", 1)), + Seq(("a", 2), ("b", 2), ("c", 1)), + Seq(("a", 2), ("b", 1)), + Seq(("a", 1)) + ) + + /* + The output of the reduceByKeyAndWindow with inverse reduce function is + different from the naive reduceByKeyAndWindow. Even if the count of a + particular key is 0, the key does not get eliminated from the RDDs of + ReducedWindowedDStream. This causes the number of keys in these RDDs to + increase forever. A more generalized version that allows elimination of + keys should be considered. + */ + + val bigReduceInvOutput = Seq( + Seq(("a", 1)), + Seq(("a", 2), ("b", 1)), + Seq(("a", 2), ("b", 2), ("c", 1)), + Seq(("a", 2), ("b", 2), ("c", 1)), + Seq(("a", 2), ("b", 1), ("c", 0)), + Seq(("a", 1), ("b", 0), ("c", 0)), + Seq(("a", 1), ("b", 0), ("c", 0)), + Seq(("a", 2), ("b", 1), ("c", 0)), + Seq(("a", 2), ("b", 2), ("c", 1)), + Seq(("a", 2), ("b", 2), ("c", 1)), + Seq(("a", 2), ("b", 1), ("c", 0)), + Seq(("a", 1), ("b", 0), ("c", 0)) + ) + + // Testing window operation + + testWindow( + "basic window", + Seq( Seq(0), Seq(1), Seq(2), Seq(3), Seq(4), Seq(5)), + Seq( Seq(0), Seq(0, 1), Seq(1, 2), Seq(2, 3), Seq(3, 4), Seq(4, 5)) + ) + + testWindow( + "tumbling window", + Seq( Seq(0), Seq(1), Seq(2), Seq(3), Seq(4), Seq(5)), + Seq( Seq(0, 1), Seq(2, 3), Seq(4, 5)), + Seconds(2), + Seconds(2) + ) + + testWindow( + "larger window", + Seq( Seq(0), Seq(1), Seq(2), Seq(3), Seq(4), Seq(5)), + Seq( Seq(0, 1), Seq(0, 1, 2, 3), Seq(2, 3, 4, 5), Seq(4, 5)), + Seconds(4), + Seconds(2) + ) + + testWindow( + "non-overlapping window", + Seq( Seq(0), Seq(1), Seq(2), Seq(3), Seq(4), Seq(5)), + Seq( Seq(1, 2), Seq(4, 5)), + Seconds(2), + Seconds(3) + ) + + // Testing naive reduceByKeyAndWindow (without invertible function) + + testReduceByKeyAndWindow( + "basic reduction", + Seq( Seq(("a", 1), ("a", 3)) ), + Seq( Seq(("a", 4)) ) + ) + + testReduceByKeyAndWindow( + "key already in window and new value added into window", + Seq( Seq(("a", 1)), Seq(("a", 1)) ), + Seq( Seq(("a", 1)), Seq(("a", 2)) ) + ) + + testReduceByKeyAndWindow( + "new key added into window", + Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)) ), + Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)) ) + ) + + testReduceByKeyAndWindow( + "key removed from window", + Seq( Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq() ), + Seq( Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq() ) + ) + + testReduceByKeyAndWindow( + "larger slide time", + largerSlideInput, + largerSlideReduceOutput, + Seconds(4), + Seconds(2) + ) + + testReduceByKeyAndWindow("big test", bigInput, bigReduceOutput) + + // Testing reduceByKeyAndWindow (with invertible reduce function) + + testReduceByKeyAndWindowInv( + 
"basic reduction", + Seq(Seq(("a", 1), ("a", 3)) ), + Seq(Seq(("a", 4)) ) + ) + + testReduceByKeyAndWindowInv( + "key already in window and new value added into window", + Seq( Seq(("a", 1)), Seq(("a", 1)) ), + Seq( Seq(("a", 1)), Seq(("a", 2)) ) + ) + + testReduceByKeyAndWindowInv( + "new key added into window", + Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)) ), + Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)) ) + ) + + testReduceByKeyAndWindowInv( + "key removed from window", + Seq( Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq() ), + Seq( Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq(("a", 0)) ) + ) + + testReduceByKeyAndWindowInv( + "larger slide time", + largerSlideInput, + largerSlideReduceOutput, + Seconds(4), + Seconds(2) + ) + + testReduceByKeyAndWindowInv("big test", bigInput, bigReduceInvOutput) + + test("groupByKeyAndWindow") { + val input = bigInput + val expectedOutput = bigGroupByOutput.map(_.map(x => (x._1, x._2.toSet))) + val windowDuration = Seconds(2) + val slideDuration = Seconds(1) + val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt + val operation = (s: DStream[(String, Int)]) => { + s.groupByKeyAndWindow(windowDuration, slideDuration) + .map(x => (x._1, x._2.toSet)) + .persist() + } + testOperation(input, operation, expectedOutput, numBatches, true) + } + + test("countByWindow") { + val input = Seq(Seq(1), Seq(1), Seq(1, 2), Seq(0), Seq(), Seq() ) + val expectedOutput = Seq( Seq(1), Seq(2), Seq(3), Seq(3), Seq(1), Seq(0)) + val windowDuration = Seconds(2) + val slideDuration = Seconds(1) + val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt + val operation = (s: DStream[Int]) => { + s.countByWindow(windowDuration, slideDuration).map(_.toInt) + } + testOperation(input, operation, expectedOutput, numBatches, true) + } + + test("countByKeyAndWindow") { + val input = Seq(Seq(("a", 1)), Seq(("b", 1), ("b", 2)), Seq(("a", 10), ("b", 20))) + val expectedOutput = Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 2)), Seq(("a", 1), ("b", 3))) + val windowDuration = Seconds(2) + val slideDuration = Seconds(1) + val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt + val operation = (s: DStream[(String, Int)]) => { + s.countByKeyAndWindow(windowDuration, slideDuration).map(x => (x._1, x._2.toInt)) + } + testOperation(input, operation, expectedOutput, numBatches, true) + } + + + // Helper functions + + def testWindow( + name: String, + input: Seq[Seq[Int]], + expectedOutput: Seq[Seq[Int]], + windowDuration: Duration = Seconds(2), + slideDuration: Duration = Seconds(1) + ) { + test("window - " + name) { + val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt + val operation = (s: DStream[Int]) => s.window(windowDuration, slideDuration) + testOperation(input, operation, expectedOutput, numBatches, true) + } + } + + def testReduceByKeyAndWindow( + name: String, + input: Seq[Seq[(String, Int)]], + expectedOutput: Seq[Seq[(String, Int)]], + windowDuration: Duration = Seconds(2), + slideDuration: Duration = Seconds(1) + ) { + test("reduceByKeyAndWindow - " + name) { + val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt + val operation = (s: DStream[(String, Int)]) => { + s.reduceByKeyAndWindow(_ + _, windowDuration, slideDuration).persist() + } + testOperation(input, operation, expectedOutput, numBatches, true) + } + } + + def testReduceByKeyAndWindowInv( + name: String, + input: Seq[Seq[(String, Int)]], + expectedOutput: Seq[Seq[(String, Int)]], + windowDuration: Duration = 
Seconds(2),
+ slideDuration: Duration = Seconds(1)
+ ) {
+ test("reduceByKeyAndWindowInv - " + name) {
+ val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
+ val operation = (s: DStream[(String, Int)]) => {
+ s.reduceByKeyAndWindow(_ + _, _ - _, windowDuration, slideDuration)
+ .persist()
+ .checkpoint(Seconds(100)) // Large value to avoid effect of RDD checkpointing
+ }
+ testOperation(input, operation, expectedOutput, numBatches, true)
+ }
+ }
+}
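
A minimal usage sketch of the TestSuiteBase trait added in this patch. It is illustrative only: the suite name and the map operation are hypothetical; the testOperation call, the one-batch-per-inner-Seq convention, and the spark.driver.port cleanup are taken from the files above.

package spark.streaming

// Hypothetical example suite; it drives TestSuiteBase the same way WindowOperationsSuite does.
class ExampleOperationsSuite extends TestSuiteBase {

  override def framework = "ExampleOperationsSuite"

  after {
    // Same cleanup as in WindowOperationsSuite, to avoid Akka rebinding to the same port
    System.clearProperty("spark.driver.port")
  }

  test("map") {
    // Each inner Seq is one batch produced by TestInputStream; runStreams advances the
    // ManualClock by one batchDuration per batch, so output(i) is checked against batch i.
    val input = Seq(Seq(1, 2), Seq(3, 4), Seq(5, 6))
    val expectedOutput = input.map(_.map(_ * 10))
    testOperation(input, (s: DStream[Int]) => s.map(_ * 10), expectedOutput)
  }
}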
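
The comment above bigReduceInvOutput describes the behavioral difference that the two reduceByKeyAndWindow helpers exercise. A small sketch of the two call forms, assuming a hypothetical pairs: DStream[(String, Int)] such as the one TestInputStream builds from bigInput:

import spark.streaming._
import spark.streaming.StreamingContext._ // pair-DStream operations, as imported in WindowOperationsSuite

object WindowedReduceSketch {
  def build(pairs: DStream[(String, Int)]) = {
    // Naive form: re-reduces the whole 2-second window on every 1-second slide; a key
    // that drops out of the window also drops out of the output (compare bigReduceOutput).
    val naive = pairs.reduceByKeyAndWindow(_ + _, Seconds(2), Seconds(1))
    // Invertible form: adds the entering batch and subtracts the leaving one; a key whose
    // running value falls to 0 is still emitted, e.g. ("b", 0) and ("c", 0) in bigReduceInvOutput.
    val incremental = pairs.reduceByKeyAndWindow(_ + _, _ - _, Seconds(2), Seconds(1))
    (naive, incremental)
  }
}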