author    Tathagata Das <tathagata.das1565@gmail.com>    2013-02-18 13:26:12 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>    2013-02-18 13:26:12 -0800
commit    6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1 (patch)
tree      3848e9e09a2c8b7537f4a0635ea0a32daee1f9a8 /streaming
parent    56b9bd197c522f33e354c2e9ad7e76440cf817e9 (diff)
parent    8ad561dc7d6475d7b217ec3f57bac3b584fed31a (diff)
Merge branch 'streaming' into ScrapCode-streaming
Conflicts:
	streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
	streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar (renamed from streaming/lib/kafka-0.7.2.jar) | bin 1358063 -> 1358063 bytes
-rw-r--r-- streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5 | 1
-rw-r--r-- streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.sha1 | 1
-rw-r--r-- streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom | 9
-rw-r--r-- streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5 | 1
-rw-r--r-- streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.sha1 | 1
-rw-r--r-- streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml | 12
-rw-r--r-- streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.md5 | 1
-rw-r--r-- streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.sha1 | 1
-rw-r--r-- streaming/pom.xml | 144
-rw-r--r-- streaming/src/main/scala/spark/streaming/Checkpoint.scala | 70
-rw-r--r-- streaming/src/main/scala/spark/streaming/DStream.scala | 235
-rw-r--r-- streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala | 93
-rw-r--r-- streaming/src/main/scala/spark/streaming/DStreamGraph.scala | 58
-rw-r--r-- streaming/src/main/scala/spark/streaming/Duration.scala | 2
-rw-r--r-- streaming/src/main/scala/spark/streaming/Interval.scala | 1
-rw-r--r-- streaming/src/main/scala/spark/streaming/JobManager.scala | 44
-rw-r--r-- streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala | 9
-rw-r--r-- streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala | 154
-rw-r--r-- streaming/src/main/scala/spark/streaming/Scheduler.scala | 102
-rw-r--r-- streaming/src/main/scala/spark/streaming/StreamingContext.scala | 94
-rw-r--r-- streaming/src/main/scala/spark/streaming/Time.scala | 13
-rw-r--r-- streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala | 27
-rw-r--r-- streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala | 87
-rw-r--r-- streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala | 122
-rw-r--r-- streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala | 41
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala | 140
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala | 36
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala | 101
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala | 15
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala | 5
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala | 30
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala | 12
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala | 72
-rw-r--r-- streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala | 392
-rw-r--r-- streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala | 30
-rw-r--r-- streaming/src/test/java/spark/streaming/JavaAPISuite.java (renamed from streaming/src/test/java/JavaAPISuite.java) | 155
-rw-r--r-- streaming/src/test/java/spark/streaming/JavaTestUtils.scala (renamed from streaming/src/test/java/JavaTestUtils.scala) | 1
-rw-r--r-- streaming/src/test/resources/log4j.properties | 1
-rw-r--r-- streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala | 91
-rw-r--r-- streaming/src/test/scala/spark/streaming/CheckpointSuite.scala | 199
-rw-r--r-- streaming/src/test/scala/spark/streaming/FailureSuite.scala | 186
-rw-r--r-- streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala | 158
-rw-r--r-- streaming/src/test/scala/spark/streaming/TestSuiteBase.scala | 28
-rw-r--r-- streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala | 67
45 files changed, 2015 insertions, 1027 deletions
diff --git a/streaming/lib/kafka-0.7.2.jar b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar
index 65f79925a4..65f79925a4 100644
--- a/streaming/lib/kafka-0.7.2.jar
+++ b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar
Binary files differ
diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5
new file mode 100644
index 0000000000..29f45f4adb
--- /dev/null
+++ b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5
@@ -0,0 +1 @@
+18876b8bc2e4cef28b6d191aa49d963f \ No newline at end of file
diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.sha1 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.sha1
new file mode 100644
index 0000000000..e3bd62bac0
--- /dev/null
+++ b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.sha1
@@ -0,0 +1 @@
+06b27270ffa52250a2c08703b397c99127b72060 \ No newline at end of file
diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom
new file mode 100644
index 0000000000..082d35726a
--- /dev/null
+++ b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom
@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka</artifactId>
+ <version>0.7.2-spark</version>
+ <description>POM was created from install:install-file</description>
+</project>
diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5
new file mode 100644
index 0000000000..92c4132b5b
--- /dev/null
+++ b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5
@@ -0,0 +1 @@
+7bc4322266e6032bdf9ef6eebdd8097d \ No newline at end of file
diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.sha1 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.sha1
new file mode 100644
index 0000000000..8a1d8a097a
--- /dev/null
+++ b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.sha1
@@ -0,0 +1 @@
+d0f79e8eff0db43ca7bcf7dce2c8cd2972685c9d \ No newline at end of file
diff --git a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml
new file mode 100644
index 0000000000..720cd51c2f
--- /dev/null
+++ b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml
@@ -0,0 +1,12 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<metadata>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka</artifactId>
+ <versioning>
+ <release>0.7.2-spark</release>
+ <versions>
+ <version>0.7.2-spark</version>
+ </versions>
+ <lastUpdated>20130121015225</lastUpdated>
+ </versioning>
+</metadata>
diff --git a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.md5 b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.md5
new file mode 100644
index 0000000000..a4ce5dc9e8
--- /dev/null
+++ b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.md5
@@ -0,0 +1 @@
+e2b9c7c5f6370dd1d21a0aae5e8dcd77 \ No newline at end of file
diff --git a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.sha1 b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.sha1
new file mode 100644
index 0000000000..b869eaf2a6
--- /dev/null
+++ b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.sha1
@@ -0,0 +1 @@
+2a4341da936b6c07a09383d17ffb185ac558ee91 \ No newline at end of file
diff --git a/streaming/pom.xml b/streaming/pom.xml
new file mode 100644
index 0000000000..6ee7e59df3
--- /dev/null
+++ b/streaming/pom.xml
@@ -0,0 +1,144 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.spark-project</groupId>
+ <artifactId>parent</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-streaming</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project Streaming</name>
+ <url>http://spark-project.org/</url>
+
+ <repositories>
+ <!-- A repository in the local filesystem for the Kafka JAR, which we modified for Scala 2.9 -->
+ <repository>
+ <id>lib</id>
+ <url>file://${project.basedir}/lib</url>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <version>1.9.11</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka</artifactId>
+ <version>0.7.2-spark</version> <!-- Comes from our in-project repository -->
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ <version>1.2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.github.sgroschupf</groupId>
+ <artifactId>zkclient</artifactId>
+ <version>0.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.novocode</groupId>
+ <artifactId>junit-interface</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>hadoop1</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-core</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop1</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <classifier>hadoop1</classifier>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>hadoop2</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-core</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop2</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <classifier>hadoop2</classifier>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+</project>
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index 2f3adb39c2..7405c8b22e 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -6,6 +6,8 @@ import org.apache.hadoop.fs.{FileUtil, Path}
import org.apache.hadoop.conf.Configuration
import java.io._
+import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
+import java.util.concurrent.Executors
private[streaming]
@@ -17,7 +19,8 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
val jars = ssc.sc.jars
val graph = ssc.graph
val checkpointDir = ssc.checkpointDir
- val checkpointDuration: Duration = ssc.checkpointDuration
+ val checkpointDuration = ssc.checkpointDuration
+ val pendingTimes = ssc.scheduler.jobManager.getPendingTimes()
def validate() {
assert(master != null, "Checkpoint.master is null")
@@ -37,32 +40,50 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
val conf = new Configuration()
var fs = file.getFileSystem(conf)
val maxAttempts = 3
+ val executor = Executors.newFixedThreadPool(1)
- def write(checkpoint: Checkpoint) {
- // TODO: maybe do this in a different thread from the main stream execution thread
- var attempts = 0
- while (attempts < maxAttempts) {
- attempts += 1
- try {
- logDebug("Saving checkpoint for time " + checkpoint.checkpointTime + " to file '" + file + "'")
- if (fs.exists(file)) {
- val bkFile = new Path(file.getParent, file.getName + ".bk")
- FileUtil.copy(fs, file, fs, bkFile, true, true, conf)
- logDebug("Moved existing checkpoint file to " + bkFile)
+ class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
+ def run() {
+ var attempts = 0
+ val startTime = System.currentTimeMillis()
+ while (attempts < maxAttempts) {
+ attempts += 1
+ try {
+ logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
+ if (fs.exists(file)) {
+ val bkFile = new Path(file.getParent, file.getName + ".bk")
+ FileUtil.copy(fs, file, fs, bkFile, true, true, conf)
+ logDebug("Moved existing checkpoint file to " + bkFile)
+ }
+ val fos = fs.create(file)
+ fos.write(bytes)
+ fos.close()
+ val finishTime = System.currentTimeMillis()
+ logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file +
+ "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds")
+ return
+ } catch {
+ case ioe: IOException =>
+ logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe)
}
- val fos = fs.create(file)
- val oos = new ObjectOutputStream(fos)
- oos.writeObject(checkpoint)
- oos.close()
- logInfo("Checkpoint for time " + checkpoint.checkpointTime + " saved to file '" + file + "'")
- fos.close()
- return
- } catch {
- case ioe: IOException =>
- logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe)
}
+ logError("Could not write checkpoint for time " + checkpointTime + " to file '" + file + "'")
}
- logError("Could not write checkpoint for time " + checkpoint.checkpointTime + " to file '" + file + "'")
+ }
+
+ def write(checkpoint: Checkpoint) {
+ val bos = new ByteArrayOutputStream()
+ val zos = new LZFOutputStream(bos)
+ val oos = new ObjectOutputStream(zos)
+ oos.writeObject(checkpoint)
+ oos.close()
+ bos.close()
+ executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
+ }
+
+ def stop() {
+ executor.shutdown()
}
}
@@ -84,7 +105,8 @@ object CheckpointReader extends Logging {
// of ObjectInputStream is used to explicitly use the current thread's default class
// loader to find and load classes. This is a well know Java issue and has popped up
// in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
- val ois = new ObjectInputStreamWithLoader(fis, Thread.currentThread().getContextClassLoader)
+ val zis = new LZFInputStream(fis)
+ val ois = new ObjectInputStreamWithLoader(zis, Thread.currentThread().getContextClassLoader)
val cp = ois.readObject.asInstanceOf[Checkpoint]
ois.close()
fs.close()
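
For context, the new write path above serializes the Checkpoint to bytes on the calling thread, compresses them with LZF, and hands the byte array to a single-thread executor, so stream processing no longer blocks on the (possibly slow) file system. A minimal, self-contained sketch of that pattern; the object and method names here are illustrative, not from the patch:

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}
    import java.util.concurrent.Executors
    import com.ning.compress.lzf.LZFOutputStream

    object AsyncWriteSketch {
      private val executor = Executors.newFixedThreadPool(1)

      // Serialize and compress in memory (cheap), then let the background
      // thread perform the actual (slow) write.
      def saveAsync(obj: java.io.Serializable, write: Array[Byte] => Unit) {
        val bos = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(new LZFOutputStream(bos))
        oos.writeObject(obj)
        oos.close()
        val bytes = bos.toByteArray
        executor.execute(new Runnable { def run() { write(bytes) } })
      }

      def stop() { executor.shutdown() }
    }
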
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 036763fe2f..e1be5ef51c 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -12,7 +12,7 @@ import scala.collection.mutable.HashMap
import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
/**
@@ -75,7 +75,7 @@ abstract class DStream[T: ClassManifest] (
// Checkpoint details
protected[streaming] val mustCheckpoint = false
protected[streaming] var checkpointDuration: Duration = null
- protected[streaming] var checkpointData = new DStreamCheckpointData(HashMap[Time, Any]())
+ protected[streaming] val checkpointData = new DStreamCheckpointData(this)
// Reference to whole DStream graph
protected[streaming] var graph: DStreamGraph = null
@@ -85,10 +85,10 @@ abstract class DStream[T: ClassManifest] (
// Duration for which the DStream requires its parent DStream to remember each RDD created
protected[streaming] def parentRememberDuration = rememberDuration
- /** Returns the StreamingContext associated with this DStream */
- def context() = ssc
+ /** Return the StreamingContext associated with this DStream */
+ def context = ssc
- /** Persists the RDDs of this DStream with the given storage level */
+ /** Persist the RDDs of this DStream with the given storage level */
def persist(level: StorageLevel): DStream[T] = {
if (this.isInitialized) {
throw new UnsupportedOperationException(
@@ -132,7 +132,7 @@ abstract class DStream[T: ClassManifest] (
// Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
if (mustCheckpoint && checkpointDuration == null) {
- checkpointDuration = slideDuration.max(Seconds(10))
+ checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
logInfo("Checkpoint interval automatically set to " + checkpointDuration)
}
@@ -154,11 +154,17 @@ abstract class DStream[T: ClassManifest] (
assert(
!mustCheckpoint || checkpointDuration != null,
- "The checkpoint interval for " + this.getClass.getSimpleName + " has not been set. " +
+ "The checkpoint interval for " + this.getClass.getSimpleName + " has not been set." +
" Please use DStream.checkpoint() to set the interval."
)
assert(
+ checkpointDuration == null || context.sparkContext.checkpointDir.isDefined,
+ "The checkpoint directory has not been set. Please use StreamingContext.checkpoint()" +
+ " or SparkContext.checkpoint() to set the checkpoint directory."
+ )
+
+ assert(
checkpointDuration == null || checkpointDuration >= slideDuration,
"The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
checkpointDuration + " which is lower than its slide time (" + slideDuration + "). " +
@@ -192,10 +198,10 @@ abstract class DStream[T: ClassManifest] (
metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000,
"It seems you are doing some DStream window operation or setting a checkpoint interval " +
"which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
- "than " + rememberDuration.milliseconds + " milliseconds. But the Spark's metadata cleanup" +
- "delay is set to " + (metadataCleanerDelay / 60.0) + " minutes, which is not sufficient. Please set " +
- "the Java property 'spark.cleaner.delay' to more than " +
- math.ceil(rememberDuration.milliseconds.toDouble / 60000.0).toInt + " minutes."
+ "than " + rememberDuration.milliseconds / 1000 + " seconds. But Spark's metadata cleanup" +
+ "delay is set to " + metadataCleanerDelay + " seconds, which is not sufficient. Please " +
+ "set the Java property 'spark.cleaner.delay' to more than " +
+ math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds."
)
dependencies.foreach(_.validate())
@@ -232,13 +238,15 @@ abstract class DStream[T: ClassManifest] (
dependencies.foreach(_.remember(parentRememberDuration))
}
- /** This method checks whether the 'time' is valid wrt slideDuration for generating RDD */
+ /** Checks whether the 'time' is valid wrt slideDuration for generating RDD */
protected def isTimeValid(time: Time): Boolean = {
if (!isInitialized) {
throw new Exception (this + " has not been initialized")
} else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
+ logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
false
} else {
+ logInfo("Time " + time + " is valid")
true
}
}
@@ -286,14 +294,14 @@ abstract class DStream[T: ClassManifest] (
* Generate a SparkStreaming job for the given time. This is an internal method that
* should not be called directly. This default implementation creates a job
* that materializes the corresponding RDD. Subclasses of DStream may override this
- * (eg. ForEachDStream).
+ * to generate their own jobs.
*/
protected[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
case Some(rdd) => {
val jobFunc = () => {
- val emptyFunc = { (iterator: Iterator[T]) => {} }
- ssc.sc.runJob(rdd, emptyFunc)
+ val emptyFunc = { (iterator: Iterator[T]) => {} }
+ context.sparkContext.runJob(rdd, emptyFunc)
}
Some(new Job(time, jobFunc))
}
@@ -302,20 +310,18 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Dereference RDDs that are older than rememberDuration.
+ * Clear metadata that are older than `rememberDuration` of this DStream.
+ * This is an internal method that should not be called directly. This default
+ * implementation clears the old generated RDDs. Subclasses of DStream may override
+ * this to clear their own metadata along with the generated RDDs.
*/
- protected[streaming] def forgetOldRDDs(time: Time) {
- val keys = generatedRDDs.keys
+ protected[streaming] def clearOldMetadata(time: Time) {
var numForgotten = 0
- keys.foreach(t => {
- if (t <= (time - rememberDuration)) {
- generatedRDDs.remove(t)
- numForgotten += 1
- logInfo("Forgot RDD of time " + t + " from " + this)
- }
- })
- logInfo("Forgot " + numForgotten + " RDDs from " + this)
- dependencies.foreach(_.forgetOldRDDs(time))
+ val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
+ generatedRDDs --= oldRDDs.keys
+ logInfo("Cleared " + oldRDDs.size + " RDDs that were older than " +
+ (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
+ dependencies.foreach(_.clearOldMetadata(time))
}
/* Adds metadata to the Stream while it is running.
@@ -336,40 +342,10 @@ abstract class DStream[T: ClassManifest] (
*/
protected[streaming] def updateCheckpointData(currentTime: Time) {
logInfo("Updating checkpoint data for time " + currentTime)
-
- // Get the checkpointed RDDs from the generated RDDs
- val newRdds = generatedRDDs.filter(_._2.getCheckpointFile.isDefined)
- .map(x => (x._1, x._2.getCheckpointFile.get))
-
- // Make a copy of the existing checkpoint data (checkpointed RDDs)
- val oldRdds = checkpointData.rdds.clone()
-
- // If the new checkpoint data has checkpoints then replace existing with the new one
- if (newRdds.size > 0) {
- checkpointData.rdds.clear()
- checkpointData.rdds ++= newRdds
- }
-
- // Make parent DStreams update their checkpoint data
+ checkpointData.update()
dependencies.foreach(_.updateCheckpointData(currentTime))
-
- // TODO: remove this, this is just for debugging
- newRdds.foreach {
- case (time, data) => { logInfo("Added checkpointed RDD for time " + time + " to stream checkpoint") }
- }
-
- if (newRdds.size > 0) {
- (oldRdds -- newRdds.keySet).foreach {
- case (time, data) => {
- val path = new Path(data.toString)
- val fs = path.getFileSystem(new Configuration())
- fs.delete(path, true)
- logInfo("Deleted checkpoint file '" + path + "' for time " + time)
- }
- }
- }
- logInfo("Updated checkpoint data for time " + currentTime + ", " + checkpointData.rdds.size + " checkpoints, "
- + "[" + checkpointData.rdds.mkString(",") + "]")
+ checkpointData.cleanup()
+ logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData)
}
/**
@@ -380,14 +356,8 @@ abstract class DStream[T: ClassManifest] (
*/
protected[streaming] def restoreCheckpointData() {
// Create RDDs from the checkpoint data
- logInfo("Restoring checkpoint data from " + checkpointData.rdds.size + " checkpointed RDDs")
- checkpointData.rdds.foreach {
- case(time, data) => {
- logInfo("Restoring checkpointed RDD for time " + time + " from file '" + data.toString + "'")
- val rdd = ssc.sc.checkpointFile[T](data.toString)
- generatedRDDs += ((time, rdd))
- }
- }
+ logInfo("Restoring checkpoint data")
+ checkpointData.restore()
dependencies.foreach(_.restoreCheckpointData())
logInfo("Restored checkpoint data")
}
@@ -427,7 +397,7 @@ abstract class DStream[T: ClassManifest] (
/** Return a new DStream by applying a function to all elements of this DStream. */
def map[U: ClassManifest](mapFunc: T => U): DStream[U] = {
- new MappedDStream(this, ssc.sc.clean(mapFunc))
+ new MappedDStream(this, context.sparkContext.clean(mapFunc))
}
/**
@@ -435,7 +405,7 @@ abstract class DStream[T: ClassManifest] (
* and then flattening the results
*/
def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]): DStream[U] = {
- new FlatMappedDStream(this, ssc.sc.clean(flatMapFunc))
+ new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}
/** Return a new DStream containing only the elements that satisfy a predicate. */
@@ -457,7 +427,7 @@ abstract class DStream[T: ClassManifest] (
mapPartFunc: Iterator[T] => Iterator[U],
preservePartitioning: Boolean = false
): DStream[U] = {
- new MapPartitionedDStream(this, ssc.sc.clean(mapPartFunc), preservePartitioning)
+ new MapPartitionedDStream(this, context.sparkContext.clean(mapPartFunc), preservePartitioning)
}
/**
@@ -474,6 +444,15 @@ abstract class DStream[T: ClassManifest] (
def count(): DStream[Long] = this.map(_ => 1L).reduce(_ + _)
/**
+ * Return a new DStream in which each RDD contains the counts of each distinct value in
+ * each RDD of this DStream. Hash partitioning is used to generate
+ * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
+ * `numPartitions` not specified).
+ */
+ def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)] =
+ this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
+
+ /**
* Apply a function to each RDD in this DStream. This is an output operator, so
* this DStream will be registered as an output stream and therefore materialized.
*/
@@ -486,7 +465,7 @@ abstract class DStream[T: ClassManifest] (
* this DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: (RDD[T], Time) => Unit) {
- val newStream = new ForEachDStream(this, ssc.sc.clean(foreachFunc))
+ val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc))
ssc.registerOutputStream(newStream)
newStream
}
@@ -504,7 +483,7 @@ abstract class DStream[T: ClassManifest] (
* on each RDD of this DStream.
*/
def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
- new TransformedDStream(this, ssc.sc.clean(transformFunc))
+ new TransformedDStream(this, context.sparkContext.clean(transformFunc))
}
/**
@@ -521,19 +500,21 @@ abstract class DStream[T: ClassManifest] (
if (first11.size > 10) println("...")
println()
}
- val newStream = new ForEachDStream(this, ssc.sc.clean(foreachFunc))
+ val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc))
ssc.registerOutputStream(newStream)
}
/**
- * Return a new DStream which is computed based on windowed batches of this DStream.
- * The new DStream generates RDDs with the same interval as this DStream.
+ * Return a new DStream in which each RDD contains all the elements seen in a
+ * sliding window of time over this DStream. The new DStream generates RDDs with
+ * the same interval as this DStream.
* @param windowDuration width of the window; must be a multiple of this DStream's interval.
*/
def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)
/**
- * Return a new DStream which is computed based on windowed batches of this DStream.
+ * Return a new DStream in which each RDD contains all the elements seen in a
+ * sliding window of time over this DStream.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -545,27 +526,39 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Return a new DStream which computed based on tumbling window on this DStream.
- * This is equivalent to window(batchTime, batchTime).
- * @param batchDuration tumbling window duration; must be a multiple of this DStream's
- * batching interval
- */
- def tumble(batchDuration: Duration): DStream[T] = window(batchDuration, batchDuration)
-
- /**
* Return a new DStream in which each RDD has a single element generated by reducing all
- * elements in a window over this DStream. windowDuration and slideDuration are as defined
- * in the window() operation. This is equivalent to
- * window(windowDuration, slideDuration).reduce(reduceFunc)
+ * elements in a sliding window over this DStream.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
*/
def reduceByWindow(
reduceFunc: (T, T) => T,
windowDuration: Duration,
slideDuration: Duration
): DStream[T] = {
- this.window(windowDuration, slideDuration).reduce(reduceFunc)
+ this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
}
+ /**
+ * Return a new DStream in which each RDD has a single element generated by reducing all
+ * elements in a sliding window over this DStream. However, the reduction is done incrementally
+ * using the old window's reduced value:
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ * This is more efficient than reduceByWindow without "inverse reduce" function.
+ * However, it is applicable to only "invertible reduce functions".
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
def reduceByWindow(
reduceFunc: (T, T) => T,
invReduceFunc: (T, T) => T,
@@ -579,14 +572,47 @@ abstract class DStream[T: ClassManifest] (
/**
* Return a new DStream in which each RDD has a single element generated by counting the number
- * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
- * window() operation. This is equivalent to window(windowDuration, slideDuration).count()
+ * of elements in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with
+ * Spark's default number of partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
*/
def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] = {
this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
}
/**
+ * Return a new DStream in which each RDD contains the count of distinct elements in
+ * RDDs in a sliding window over this DStream. Hash partitioning is used to generate
+ * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
+ * `numPartitions` not specified).
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions number of partitions of each RDD in the new DStream.
+ */
+ def countByValueAndWindow(
+ windowDuration: Duration,
+ slideDuration: Duration,
+ numPartitions: Int = ssc.sc.defaultParallelism
+ ): DStream[(T, Long)] = {
+
+ this.map(x => (x, 1L)).reduceByKeyAndWindow(
+ (x: Long, y: Long) => x + y,
+ (x: Long, y: Long) => x - y,
+ windowDuration,
+ slideDuration,
+ numPartitions,
+ (x: (T, Long)) => x._2 != 0L
+ )
+ }
+
+ /**
* Return a new DStream by unifying data of another DStream with this DStream.
* @param that Another DStream having the same slideDuration as this DStream.
*/
@@ -603,16 +629,21 @@ abstract class DStream[T: ClassManifest] (
* Return all the RDDs between 'fromTime' to 'toTime' (both included)
*/
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
- val rdds = new ArrayBuffer[RDD[T]]()
- var time = toTime.floor(slideDuration)
- while (time >= zeroTime && time >= fromTime) {
- getOrCompute(time) match {
- case Some(rdd) => rdds += rdd
- case None => //throw new Exception("Could not get RDD for time " + time)
- }
- time -= slideDuration
+ if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
+ logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
+ }
+ if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
+ logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
}
- rdds.toSeq
+ val alignedToTime = toTime.floor(slideDuration)
+ val alignedFromTime = fromTime.floor(slideDuration)
+
+ logInfo("Slicing from " + fromTime + " to " + toTime +
+ " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")
+
+ alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
+ if (time >= zeroTime) getOrCompute(time) else None
+ })
}
/**
@@ -645,7 +676,3 @@ abstract class DStream[T: ClassManifest] (
ssc.registerOutputStream(this)
}
}
-
-private[streaming]
-case class DStreamCheckpointData(rdds: HashMap[Time, Any])
-
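
Taken together, the DStream changes above drop `tumble` (and, later in this diff, `countByKey`) in favor of explicit `window()` calls plus the new `countByValue` and `countByValueAndWindow` operations. A short usage sketch of the new API, assuming the 0.7-era StreamingContext constructor and `socketTextStream`; host, port, and the checkpoint directory are placeholders:

    import spark.streaming.{Seconds, StreamingContext}
    import spark.streaming.StreamingContext._

    val ssc = new StreamingContext("local[2]", "WordCounts", Seconds(1))
    ssc.checkpoint("checkpoint")  // required by the incremental window op below

    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

    // New in this patch: per-batch distinct-value counts.
    words.countByValue().print()

    // Incremental windowed counts: counts entering the window are added,
    // counts leaving it are subtracted via the inverse reduce function.
    words.countByValueAndWindow(Seconds(30), Seconds(10)).print()

    ssc.start()
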
diff --git a/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala
new file mode 100644
index 0000000000..6b0fade7c6
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala
@@ -0,0 +1,93 @@
+package spark.streaming
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.conf.Configuration
+import collection.mutable.HashMap
+import spark.Logging
+
+
+
+private[streaming]
+class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T])
+ extends Serializable with Logging {
+ protected val data = new HashMap[Time, AnyRef]()
+
+ @transient private var fileSystem : FileSystem = null
+ @transient private var lastCheckpointFiles: HashMap[Time, String] = null
+
+ protected[streaming] def checkpointFiles = data.asInstanceOf[HashMap[Time, String]]
+
+ /**
+ * Updates the checkpoint data of the DStream. This gets called every time
+ * the graph checkpoint is initiated. Default implementation records the
+ * checkpoint files to which the generated RDDs of the DStream have been saved.
+ */
+ def update() {
+
+ // Get the checkpointed RDDs from the generated RDDs
+ val newCheckpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined)
+ .map(x => (x._1, x._2.getCheckpointFile.get))
+
+ // Make a copy of the existing checkpoint data (checkpointed RDDs)
+ lastCheckpointFiles = checkpointFiles.clone()
+
+ // If the new checkpoint data has checkpoints then replace existing with the new one
+ if (newCheckpointFiles.size > 0) {
+ checkpointFiles.clear()
+ checkpointFiles ++= newCheckpointFiles
+ }
+
+ // TODO: remove this, this is just for debugging
+ newCheckpointFiles.foreach {
+ case (time, data) => { logInfo("Added checkpointed RDD for time " + time + " to stream checkpoint") }
+ }
+ }
+
+ /**
+ * Cleanup old checkpoint data. This gets called every time the graph
+ * checkpoint is initiated, but after `update` is called. Default
+ * implementation cleans up old checkpoint files.
+ */
+ def cleanup() {
+ // If there is at least one checkpoint file in the current checkpoint files,
+ // then delete the old checkpoint files.
+ if (checkpointFiles.size > 0 && lastCheckpointFiles != null) {
+ (lastCheckpointFiles -- checkpointFiles.keySet).foreach {
+ case (time, file) => {
+ try {
+ val path = new Path(file)
+ if (fileSystem == null) {
+ fileSystem = path.getFileSystem(new Configuration())
+ }
+ fileSystem.delete(path, true)
+ logInfo("Deleted checkpoint file '" + file + "' for time " + time)
+ } catch {
+ case e: Exception =>
+ logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e)
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Restore the checkpoint data. This gets called once when the DStream graph
+ * (along with its DStreams) is being restored from a graph checkpoint file.
+ * Default implementation restores the RDDs from their checkpoint files.
+ */
+ def restore() {
+ // Create RDDs from the checkpoint data
+ checkpointFiles.foreach {
+ case(time, file) => {
+ logInfo("Restoring checkpointed RDD for time " + time + " from file '" + file + "'")
+ dstream.generatedRDDs += ((time, dstream.context.sparkContext.checkpointFile[T](file)))
+ }
+ }
+ }
+
+ override def toString() = {
+ "[\n" + checkpointFiles.size + " checkpoint files \n" + checkpointFiles.mkString("\n") + "\n]"
+ }
+}
+
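
The point of factoring checkpoint state into its own class is that a DStream subclass can now carry custom state by overriding the hooks; streams like FileInputDStream (also touched in this diff) are the intended beneficiaries. A hypothetical subclass sketch; the class name and the extra bookkeeping are invented for illustration:

    // Hypothetical subclass, to show the extension point:
    private[streaming]
    class MyCheckpointData[T: ClassManifest](dstream: DStream[T])
      extends DStreamCheckpointData[T](dstream) {

      override def update() {
        super.update()
        // record extra per-time state in `data` alongside the RDD files
      }

      override def restore() {
        super.restore()
        // rebuild that extra state after recovery
      }
    }
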
diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
index bc4a40d7bc..adb7f3a24d 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
@@ -11,17 +11,20 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()
- private[streaming] var zeroTime: Time = null
- private[streaming] var batchDuration: Duration = null
- private[streaming] var rememberDuration: Duration = null
- private[streaming] var checkpointInProgress = false
+ var rememberDuration: Duration = null
+ var checkpointInProgress = false
- private[streaming] def start(time: Time) {
+ var zeroTime: Time = null
+ var startTime: Time = null
+ var batchDuration: Duration = null
+
+ def start(time: Time) {
this.synchronized {
if (zeroTime != null) {
throw new Exception("DStream graph computation already started")
}
zeroTime = time
+ startTime = time
outputStreams.foreach(_.initialize(zeroTime))
outputStreams.foreach(_.remember(rememberDuration))
outputStreams.foreach(_.validate)
@@ -29,19 +32,23 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
}
}
- private[streaming] def stop() {
+ def restart(time: Time) {
+ this.synchronized { startTime = time }
+ }
+
+ def stop() {
this.synchronized {
inputStreams.par.foreach(_.stop())
}
}
- private[streaming] def setContext(ssc: StreamingContext) {
+ def setContext(ssc: StreamingContext) {
this.synchronized {
outputStreams.foreach(_.setContext(ssc))
}
}
- private[streaming] def setBatchDuration(duration: Duration) {
+ def setBatchDuration(duration: Duration) {
this.synchronized {
if (batchDuration != null) {
throw new Exception("Batch duration already set as " + batchDuration +
@@ -51,59 +58,68 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
batchDuration = duration
}
- private[streaming] def remember(duration: Duration) {
+ def remember(duration: Duration) {
this.synchronized {
if (rememberDuration != null) {
throw new Exception("Batch duration already set as " + batchDuration +
". cannot set it again.")
}
+ rememberDuration = duration
}
- rememberDuration = duration
}
- private[streaming] def addInputStream(inputStream: InputDStream[_]) {
+ def addInputStream(inputStream: InputDStream[_]) {
this.synchronized {
inputStream.setGraph(this)
inputStreams += inputStream
}
}
- private[streaming] def addOutputStream(outputStream: DStream[_]) {
+ def addOutputStream(outputStream: DStream[_]) {
this.synchronized {
outputStream.setGraph(this)
outputStreams += outputStream
}
}
- private[streaming] def getInputStreams() = this.synchronized { inputStreams.toArray }
+ def getInputStreams() = this.synchronized { inputStreams.toArray }
- private[streaming] def getOutputStreams() = this.synchronized { outputStreams.toArray }
+ def getOutputStreams() = this.synchronized { outputStreams.toArray }
- private[streaming] def generateRDDs(time: Time): Seq[Job] = {
+ def generateJobs(time: Time): Seq[Job] = {
this.synchronized {
- outputStreams.flatMap(outputStream => outputStream.generateJob(time))
+ logInfo("Generating jobs for time " + time)
+ val jobs = outputStreams.flatMap(outputStream => outputStream.generateJob(time))
+ logInfo("Generated " + jobs.length + " jobs for time " + time)
+ jobs
}
}
- private[streaming] def forgetOldRDDs(time: Time) {
+ def clearOldMetadata(time: Time) {
this.synchronized {
- outputStreams.foreach(_.forgetOldRDDs(time))
+ logInfo("Clearing old metadata for time " + time)
+ outputStreams.foreach(_.clearOldMetadata(time))
+ logInfo("Cleared old metadata for time " + time)
}
}
- private[streaming] def updateCheckpointData(time: Time) {
+ def updateCheckpointData(time: Time) {
this.synchronized {
+ logInfo("Updating checkpoint data for time " + time)
outputStreams.foreach(_.updateCheckpointData(time))
+ logInfo("Updated checkpoint data for time " + time)
}
}
- private[streaming] def restoreCheckpointData() {
+ def restoreCheckpointData() {
this.synchronized {
+ logInfo("Restoring checkpoint data")
outputStreams.foreach(_.restoreCheckpointData())
+ logInfo("Restored checkpoint data")
}
}
- private[streaming] def validate() {
+ def validate() {
this.synchronized {
assert(batchDuration != null, "Batch duration has not been set")
//assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration + " is very low")
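
The renames above (generateRDDs to generateJobs, forgetOldRDDs to clearOldMetadata) reflect the per-batch cycle the graph participates in. A rough sketch of that cycle as driven from the scheduler; the Scheduler.scala changes are not part of this excerpt, so the call order here is an assumption, consistent with the logging added above:

    // Per batch time, roughly (simplified; error handling omitted):
    def onBatch(graph: DStreamGraph, jobManager: JobManager, time: Time) {
      graph.generateJobs(time).foreach(jobManager.runJob)  // 1. queue jobs
      graph.updateCheckpointData(time)                     // 2. snapshot checkpoint state
      // 3. graph.clearOldMetadata(time) runs later, once the JobManager
      //    reports every job of `time` finished (see clearJob below).
    }
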
diff --git a/streaming/src/main/scala/spark/streaming/Duration.scala b/streaming/src/main/scala/spark/streaming/Duration.scala
index e4dc579a17..ee26206e24 100644
--- a/streaming/src/main/scala/spark/streaming/Duration.scala
+++ b/streaming/src/main/scala/spark/streaming/Duration.scala
@@ -16,7 +16,7 @@ case class Duration (private val millis: Long) {
def * (times: Int): Duration = new Duration(millis * times)
- def / (that: Duration): Long = millis / that.millis
+ def / (that: Duration): Double = millis.toDouble / that.millis.toDouble
def isMultipleOf(that: Duration): Boolean =
(this.millis % that.millis == 0)
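
Making `/` return Double instead of Long matters for the automatic checkpoint interval in DStream.scala above, which now rounds up to a whole multiple of the slide duration. A quick worked example:

    import spark.streaming.Seconds

    val slide = Seconds(3)
    // Old Long division: Seconds(10) / slide == 3, so 3 * slide == 9s < 10s.
    // New Double division: Seconds(10) / slide == 3.33..., and ceil gives 4.
    val checkpointInterval = slide * math.ceil(Seconds(10) / slide).toInt
    // == Seconds(12): at least 10 seconds, and a multiple of the slide duration.
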
diff --git a/streaming/src/main/scala/spark/streaming/Interval.scala b/streaming/src/main/scala/spark/streaming/Interval.scala
index dc21dfb722..6a8b81760e 100644
--- a/streaming/src/main/scala/spark/streaming/Interval.scala
+++ b/streaming/src/main/scala/spark/streaming/Interval.scala
@@ -30,6 +30,7 @@ class Interval(val beginTime: Time, val endTime: Time) {
override def toString = "[" + beginTime + ", " + endTime + "]"
}
+private[streaming]
object Interval {
def currentInterval(duration: Duration): Interval = {
val time = new Time(System.currentTimeMillis)
diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala
index 3b910538e0..7696c4a592 100644
--- a/streaming/src/main/scala/spark/streaming/JobManager.scala
+++ b/streaming/src/main/scala/spark/streaming/JobManager.scala
@@ -3,6 +3,8 @@ package spark.streaming
import spark.Logging
import spark.SparkEnv
import java.util.concurrent.Executors
+import collection.mutable.HashMap
+import collection.mutable.ArrayBuffer
private[streaming]
@@ -13,21 +15,57 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
SparkEnv.set(ssc.env)
try {
val timeTaken = job.run()
- logInfo("Total delay: %.5f s for job %s (execution: %.5f s)".format(
- (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, timeTaken / 1000.0))
+ logInfo("Total delay: %.5f s for job %s of time %s (execution: %.5f s)".format(
+ (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, job.time.milliseconds, timeTaken / 1000.0))
} catch {
case e: Exception =>
logError("Running " + job + " failed", e)
}
+ clearJob(job)
}
}
initLogging()
val jobExecutor = Executors.newFixedThreadPool(numThreads)
-
+ val jobs = new HashMap[Time, ArrayBuffer[Job]]
+
def runJob(job: Job) {
+ jobs.synchronized {
+ jobs.getOrElseUpdate(job.time, new ArrayBuffer[Job]) += job
+ }
jobExecutor.execute(new JobHandler(ssc, job))
logInfo("Added " + job + " to queue")
}
+
+ def stop() {
+ jobExecutor.shutdown()
+ }
+
+ private def clearJob(job: Job) {
+ var timeCleared = false
+ val time = job.time
+ jobs.synchronized {
+ val jobsOfTime = jobs.get(time)
+ if (jobsOfTime.isDefined) {
+ jobsOfTime.get -= job
+ if (jobsOfTime.get.isEmpty) {
+ jobs -= time
+ timeCleared = true
+ }
+ } else {
+ throw new Exception("Job finished for time " + job.time +
+ " but time does not exist in jobs")
+ }
+ }
+ if (timeCleared) {
+ ssc.scheduler.clearOldMetadata(time)
+ }
+ }
+
+ def getPendingTimes(): Array[Time] = {
+ jobs.synchronized {
+ jobs.keySet.toArray
+ }
+ }
}
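
The pending-job bookkeeping above is what the new `pendingTimes` field in Checkpoint.scala captures: the batch times whose jobs had not finished when a checkpoint was taken, so they can be re-run after driver recovery. A standalone sketch of the invariant, with simplified types (this is not the patch's code):

    import scala.collection.mutable.{ArrayBuffer, HashMap}

    // A time stays "pending" until its last job is cleared; the key set is
    // exactly what getPendingTimes() returns and Checkpoint.pendingTimes stores.
    class PendingJobs[J](onTimeCleared: Long => Unit) {
      private val jobs = new HashMap[Long, ArrayBuffer[J]]

      def add(time: Long, job: J) { jobs.synchronized {
        jobs.getOrElseUpdate(time, new ArrayBuffer[J]) += job
      } }

      def clear(time: Long, job: J) {
        val done = jobs.synchronized {
          val buf = jobs(time)
          buf -= job
          if (buf.isEmpty) { jobs -= time; true } else false
        }
        if (done) onTimeCleared(time)  // e.g. scheduler.clearOldMetadata(time)
      }

      def pendingTimes: Array[Long] = jobs.synchronized { jobs.keySet.toArray }
    }
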
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index 4ddd0f8680..64972fd5cd 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -4,6 +4,7 @@ import spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}
import spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError}
import spark.Logging
import spark.SparkEnv
+import spark.SparkContext._
import scala.collection.mutable.HashMap
import scala.collection.mutable.Queue
@@ -85,7 +86,7 @@ class NetworkInputTracker(
}
case DeregisterReceiver(streamId, msg) => {
receiverInfo -= streamId
- logInfo("De-registered receiver for network stream " + streamId
+ logError("De-registered receiver for network stream " + streamId
+ " with message " + msg)
//TODO: Do something about the corresponding NetworkInputDStream
}
@@ -138,8 +139,12 @@ class NetworkInputTracker(
}
iterator.next().start()
}
+ // Run a dummy Spark job to ensure that all slaves have registered.
+ // This prevents all the receivers from being scheduled on the same node.
+ //ssc.sparkContext.makeRDD(1 to 100, 100).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
+
// Distribute the receivers and start them
- ssc.sc.runJob(tempRDD, startReceiver)
+ ssc.sparkContext.runJob(tempRDD, startReceiver)
}
/** Stops the receivers. */
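
For readers new to this file: `runJob` is used here as a placement mechanism. Each receiver becomes the single element of an RDD partition, and the job's task never computes anything; it just blocks, running its receiver on whichever worker the task was scheduled to. A simplified sketch of the surrounding code; the receiver-construction step is abbreviated and its method name illustrative:

    // receivers: one NetworkReceiver per registered network input stream.
    val receivers = networkInputStreams.map(_.createReceiver())  // name illustrative
    val tempRDD = ssc.sparkContext.makeRDD(receivers, receivers.size)

    // One task per receiver; each task blocks, running its receiver on a worker.
    val startReceiver = (iterator: Iterator[NetworkReceiver[_]]) => {
      if (!iterator.hasNext) {
        throw new Exception("Could not start receiver as details not found.")
      }
      iterator.next().start()
    }
    ssc.sparkContext.runJob(tempRDD, startReceiver)
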
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index fbcf061126..5a2dd46fa0 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -18,15 +18,15 @@ import org.apache.hadoop.conf.Configuration
class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](self: DStream[(K,V)])
extends Serializable {
-
- def ssc = self.ssc
+
+ private[streaming] def ssc = self.ssc
private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
new HashPartitioner(numPartitions)
}
/**
- * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
+ * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
* generate the RDDs with Spark's default number of partitions.
*/
def groupByKey(): DStream[(K, Seq[V])] = {
@@ -34,7 +34,7 @@ extends Serializable {
}
/**
- * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
+ * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
* generate the RDDs with `numPartitions` partitions.
*/
def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = {
@@ -42,7 +42,7 @@ extends Serializable {
}
/**
- * Create a new DStream by applying `groupByKey` on each RDD. The supplied [[spark.Partitioner]]
+ * Return a new DStream by applying `groupByKey` on each RDD. The supplied [[spark.Partitioner]]
* is used to control the partitioning of each RDD.
*/
def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
@@ -54,7 +54,7 @@ extends Serializable {
}
/**
- * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
* merged using the associative reduce function. Hash partitioning is used to generate the RDDs
* with Spark's default number of partitions.
*/
@@ -63,7 +63,7 @@ extends Serializable {
}
/**
- * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
* merged using the supplied reduce function. Hash partitioning is used to generate the RDDs
* with `numPartitions` partitions.
*/
@@ -72,7 +72,7 @@ extends Serializable {
}
/**
- * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
* merged using the supplied reduce function. [[spark.Partitioner]] is used to control the
* partitioning of each RDD.
*/
@@ -82,7 +82,7 @@ extends Serializable {
}
/**
- * Combine elements of each key in DStream's RDDs using custom function. This is similar to the
+ * Combine elements of each key in DStream's RDDs using custom functions. This is similar to the
* combineByKey for RDDs. Please refer to combineByKey in [[spark.PairRDDFunctions]] for more
* information.
*/
@@ -95,15 +95,7 @@ extends Serializable {
}
/**
- * Create a new DStream by counting the number of values of each key in each RDD. Hash
- * partitioning is used to generate the RDDs with Spark's `numPartitions` partitions.
- */
- def countByKey(numPartitions: Int = self.ssc.sc.defaultParallelism): DStream[(K, Long)] = {
- self.map(x => (x._1, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
- }
-
- /**
- * Creates a new DStream by applying `groupByKey` over a sliding window. This is similar to
+ * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to
* `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
* with the same interval as this DStream. Hash partitioning is used to generate the RDDs with
* Spark's default number of partitions.
@@ -115,7 +107,7 @@ extends Serializable {
}
/**
- * Create a new DStream by applying `groupByKey` over a sliding window. Similar to
+ * Return a new DStream by applying `groupByKey` over a sliding window. Similar to
* `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to
* generate the RDDs with Spark's default number of partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
@@ -129,7 +121,7 @@ extends Serializable {
}
/**
- * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
* Similar to `DStream.groupByKey()`, but applies it over a sliding window.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
@@ -137,7 +129,8 @@ extends Serializable {
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param numPartitions Number of partitions of each RDD in the new DStream.
+ * @param numPartitions number of partitions of each RDD in the new DStream; if not specified
+ * then Spark's default number of partitions will be used
*/
def groupByKeyAndWindow(
windowDuration: Duration,
@@ -155,7 +148,7 @@ extends Serializable {
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream.
*/
def groupByKeyAndWindow(
windowDuration: Duration,
@@ -166,7 +159,7 @@ extends Serializable {
}
/**
- * Create a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
+ * Return a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
* Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream
* generates RDDs with the same interval as this DStream. Hash partitioning is used to generate
* the RDDs with Spark's default number of partitions.
@@ -182,7 +175,7 @@ extends Serializable {
}
/**
- * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to
+ * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
* `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
* generate the RDDs with Spark's default number of partitions.
* @param reduceFunc associative reduce function
@@ -201,7 +194,7 @@ extends Serializable {
}
/**
- * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to
+ * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
* `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
* generate the RDDs with `numPartitions` partitions.
* @param reduceFunc associative reduce function
@@ -210,10 +203,10 @@ extends Serializable {
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param numPartitions Number of partitions of each RDD in the new DStream.
+ * @param numPartitions number of partitions of each RDD in the new DStream.
*/
def reduceByKeyAndWindow(
- reduceFunc: (V, V) => V,
+ reduceFunc: (V, V) => V,
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int
@@ -222,7 +215,7 @@ extends Serializable {
}
/**
- * Create a new DStream by applying `reduceByKey` over a sliding window. Similar to
+ * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to
* `DStream.reduceByKey()`, but applies it over a sliding window.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
@@ -230,7 +223,8 @@ extends Serializable {
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner partitioner for controlling the partitioning of each RDD
+ * in the new DStream.
*/
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
@@ -245,118 +239,78 @@ extends Serializable {
}
/**
- * Create a new DStream by reducing over a using incremental computation.
- * The reduced value of over a new window is calculated using the old window's reduce value :
+ * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
+ * The reduced value over a new window is calculated using the old window's reduced value:
* 1. reduce the new values that entered the window (e.g., adding new counts)
+ *
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
- * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
+ *
+ * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
* However, it is applicable to only "invertible reduce functions".
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param reduceFunc associative reduce function
- * @param invReduceFunc inverse function
+ * @param invReduceFunc inverse reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
+ * @param filterFunc Optional function to filter expired key-value pairs;
+ * only pairs that satisfy the function are retained
*/
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
windowDuration: Duration,
- slideDuration: Duration
+ slideDuration: Duration = self.slideDuration,
+ numPartitions: Int = ssc.sc.defaultParallelism,
+ filterFunc: ((K, V)) => Boolean = null
): DStream[(K, V)] = {
reduceByKeyAndWindow(
- reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner())
- }
-
- /**
- * Create a new DStream by reducing over a using incremental computation.
- * The reduced value of over a new window is calculated using the old window's reduce value :
- * 1. reduce the new values that entered the window (e.g., adding new counts)
- * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
- * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
- * However, it is applicable to only "invertible reduce functions".
- * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
- * @param reduceFunc associative reduce function
- * @param invReduceFunc inverse function
- * @param windowDuration width of the window; must be a multiple of this DStream's
- * batching interval
- * @param slideDuration sliding interval of the window (i.e., the interval after which
- * the new DStream will generate RDDs); must be a multiple of this
- * DStream's batching interval
- * @param numPartitions Number of partitions of each RDD in the new DStream.
- */
- def reduceByKeyAndWindow(
- reduceFunc: (V, V) => V,
- invReduceFunc: (V, V) => V,
- windowDuration: Duration,
- slideDuration: Duration,
- numPartitions: Int
- ): DStream[(K, V)] = {
-
- reduceByKeyAndWindow(
- reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions))
+ reduceFunc, invReduceFunc, windowDuration,
+ slideDuration, defaultPartitioner(numPartitions), filterFunc
+ )
}
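
A minimal usage sketch of the new incremental form (the stream `lines`, the window sizes, and the partition count below are illustrative assumptions, not part of this patch; the operation also needs checkpointing enabled on the context):

    // Hypothetical (word, count) pairs built from any DStream[String].
    val words = lines.flatMap(_.split(" ")).map(w => (w, 1L))
    val windowedCounts = words.reduceByKeyAndWindow(
      (a: Long, b: Long) => a + b,        // reduce values entering the window
      (a: Long, b: Long) => a - b,        // "inverse reduce" values leaving it
      Seconds(30),                        // window duration
      Seconds(10),                        // slide duration
      2,                                  // numPartitions
      (kv: (String, Long)) => kv._2 != 0  // filterFunc: drop keys whose count fell to zero
    )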
/**
- * Create a new DStream by reducing over a using incremental computation.
- * The reduced value of over a new window is calculated using the old window's reduce value :
+ * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
+ * The reduced value over a new window is calculated using the old window's reduced value:
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
- * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
+ * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
* However, it is applicable to only "invertible reduce functions".
- * @param reduceFunc associative reduce function
- * @param invReduceFunc inverse function
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param filterFunc Optional function to filter expired key-value pairs;
+ * only pairs that satisfy the function are retained
*/
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
windowDuration: Duration,
slideDuration: Duration,
- partitioner: Partitioner
+ partitioner: Partitioner,
+ filterFunc: ((K, V)) => Boolean
): DStream[(K, V)] = {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
+ val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None
new ReducedWindowedDStream[K, V](
- self, cleanedReduceFunc, cleanedInvReduceFunc, windowDuration, slideDuration, partitioner)
- }
-
- /**
- * Create a new DStream by counting the number of values for each key over a window.
- * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
- * @param windowDuration width of the window; must be a multiple of this DStream's
- * batching interval
- * @param slideDuration sliding interval of the window (i.e., the interval after which
- * the new DStream will generate RDDs); must be a multiple of this
- * DStream's batching interval
- * @param numPartitions Number of partitions of each RDD in the new DStream.
- */
- def countByKeyAndWindow(
- windowDuration: Duration,
- slideDuration: Duration,
- numPartitions: Int = self.ssc.sc.defaultParallelism
- ): DStream[(K, Long)] = {
-
- self.map(x => (x._1, 1L)).reduceByKeyAndWindow(
- (x: Long, y: Long) => x + y,
- (x: Long, y: Long) => x - y,
- windowDuration,
- slideDuration,
- numPartitions
+ self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
+ windowDuration, slideDuration, partitioner
)
}
/**
- * Create a new "state" DStream where the state for each key is updated by applying
+ * Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param updateFunc State update function. If `this` function returns None, then
@@ -370,7 +324,7 @@ extends Serializable {
}
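
As an illustrative sketch of the state API described above (`words` is an assumed DStream[(String, Long)]; stateful operations also need a checkpoint directory set):

    // Fold each batch's new values into the previous state;
    // returning None would remove the key from the state.
    val runningCounts = words.updateStateByKey[Long] {
      (newValues: Seq[Long], state: Option[Long]) =>
        Some(state.getOrElse(0L) + newValues.sum)
    }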
/**
- * Create a new "state" DStream where the state for each key is updated by applying
+ * Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param updateFunc State update function. If `this` function returns None, then
@@ -405,7 +359,7 @@ extends Serializable {
}
/**
- * Create a new "state" DStream where the state for each key is updated by applying
+ * Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* [[spark.Partitioner]] is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
@@ -447,7 +401,7 @@ extends Serializable {
}
/**
- * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
+ * Cogroup `this` DStream with `other` DStream using a partitioner. For each key k in corresponding RDDs of `this`
* or `other` DStreams, the generated RDD will contain a tuple with the list of values for that
* key in both RDDs. Partitioner is used to partition each generated RDD.
*/
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index c04ed37de8..1c4b22a898 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -9,11 +9,8 @@ class Scheduler(ssc: StreamingContext) extends Logging {
initLogging()
- val graph = ssc.graph
-
val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
val jobManager = new JobManager(ssc, concurrentJobs)
-
val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
new CheckpointWriter(ssc.checkpointDir)
} else {
@@ -23,54 +20,93 @@ class Scheduler(ssc: StreamingContext) extends Logging {
val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock")
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
- longTime => generateRDDs(new Time(longTime)))
+ longTime => generateJobs(new Time(longTime)))
+ val graph = ssc.graph
+ var latestTime: Time = null
- def start() {
- // If context was started from checkpoint, then restart timer such that
- // this timer's triggers occur at the same time as the original timer.
- // Otherwise just start the timer from scratch, and initialize graph based
- // on this first trigger time of the timer.
+ def start() = synchronized {
if (ssc.isCheckpointPresent) {
- // If manual clock is being used for testing, then
- // either set the manual clock to the last checkpointed time,
- // or if the property is defined set it to that time
- if (clock.isInstanceOf[ManualClock]) {
- val lastTime = ssc.getInitialCheckpoint.checkpointTime.milliseconds
- val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong
- clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
- }
- timer.restart(graph.zeroTime.milliseconds)
- logInfo("Scheduler's timer restarted")
+ restart()
} else {
- val firstTime = new Time(timer.start())
- graph.start(firstTime - ssc.graph.batchDuration)
- logInfo("Scheduler's timer started")
+ startFirstTime()
}
logInfo("Scheduler started")
}
- def stop() {
+ def stop() = synchronized {
timer.stop()
- graph.stop()
+ jobManager.stop()
+ if (checkpointWriter != null) checkpointWriter.stop()
+ ssc.graph.stop()
logInfo("Scheduler stopped")
}
-
- private def generateRDDs(time: Time) {
+
+ private def startFirstTime() {
+ val startTime = new Time(timer.getStartTime())
+ graph.start(startTime - graph.batchDuration)
+ timer.start(startTime.milliseconds)
+ logInfo("Scheduler's timer started at " + startTime)
+ }
+
+ private def restart() {
+
+ // If manual clock is being used for testing, then
+ // either set the manual clock to the last checkpointed time,
+ // or if the property is defined set it to that time
+ if (clock.isInstanceOf[ManualClock]) {
+ val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
+ val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong
+ clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
+ }
+
+ val batchDuration = ssc.graph.batchDuration
+
+ // Batches when the master was down, that is,
+ // between the checkpoint and current restart time
+ val checkpointTime = ssc.initialCheckpoint.checkpointTime
+ val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
+ val downTimes = checkpointTime.until(restartTime, batchDuration)
+ logInfo("Batches during down time: " + downTimes.mkString(", "))
+
+ // Batches that were unprocessed before failure
+ val pendingTimes = ssc.initialCheckpoint.pendingTimes
+ logInfo("Batches pending processing: " + pendingTimes.mkString(", "))
+ // Reschedule jobs for these times
+ val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
+ logInfo("Batches to reschedule: " + timesToReschedule.mkString(", "))
+ timesToReschedule.foreach(time =>
+ graph.generateJobs(time).foreach(jobManager.runJob)
+ )
+
+ // Restart the timer
+ timer.start(restartTime.milliseconds)
+ logInfo("Scheduler's timer restarted at " + restartTime)
+ }
+
+ /** Generate jobs and perform checkpoint for the given `time`. */
+ def generateJobs(time: Time) {
SparkEnv.set(ssc.env)
logInfo("\n-----------------------------------------------------\n")
- graph.generateRDDs(time).foreach(jobManager.runJob)
- graph.forgetOldRDDs(time)
+ graph.generateJobs(time).foreach(jobManager.runJob)
+ latestTime = time
+ doCheckpoint(time)
+ }
+
+ /**
+ * Clear old metadata, assuming the jobs of `time` have finished processing,
+ * and also perform a checkpoint.
+ */
+ def clearOldMetadata(time: Time) {
+ ssc.graph.clearOldMetadata(time)
doCheckpoint(time)
- logInfo("Generated RDDs for time " + time)
}
- private def doCheckpoint(time: Time) {
+ /** Perform checkpoint for the given `time`. */
+ def doCheckpoint(time: Time) = synchronized {
if (ssc.checkpointDuration != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
- val startTime = System.currentTimeMillis()
+ logInfo("Checkpointing graph for time " + time)
ssc.graph.updateCheckpointData(time)
checkpointWriter.write(new Checkpoint(ssc, time))
- val stopTime = System.currentTimeMillis()
- logInfo("Checkpointing the graph took " + (stopTime - startTime) + " ms")
}
}
}
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index cd7379da14..48d344f055 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
import java.util.UUID
+import twitter4j.Status
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -35,14 +36,14 @@ class StreamingContext private (
) extends Logging {
/**
- * Creates a StreamingContext using an existing SparkContext.
+ * Create a StreamingContext using an existing SparkContext.
* @param sparkContext Existing SparkContext
* @param batchDuration The time interval at which streaming data will be divided into batches
*/
def this(sparkContext: SparkContext, batchDuration: Duration) = this(sparkContext, null, batchDuration)
/**
- * Creates a StreamingContext by providing the details necessary for creating a new SparkContext.
+ * Create a StreamingContext by providing the details necessary for creating a new SparkContext.
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param frameworkName A name for your job, to display on the cluster web UI
* @param batchDuration The time interval at which streaming data will be divided into batches
@@ -51,7 +52,7 @@ class StreamingContext private (
this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration)
/**
- * Re-creates a StreamingContext from a checkpoint file.
+ * Re-create a StreamingContext from a checkpoint file.
* @param path Path either to the directory that was specified as the checkpoint directory, or
* to the checkpoint file 'graph' or 'graph.bk'.
*/
@@ -66,7 +67,7 @@ class StreamingContext private (
protected[streaming] val isCheckpointPresent = (cp_ != null)
- val sc: SparkContext = {
+ protected[streaming] val sc: SparkContext = {
if (isCheckpointPresent) {
new SparkContext(cp_.master, cp_.framework, cp_.sparkHome, cp_.jars)
} else {
@@ -106,7 +107,12 @@ class StreamingContext private (
protected[streaming] var scheduler: Scheduler = null
/**
- * Sets each DStreams in this context to remember RDDs it generated in the last given duration.
+ * Return the associated Spark context
+ */
+ def sparkContext = sc
+
+ /**
+ * Set each DStream in this context to remember the RDDs it generated in the last given duration.
* DStreams remember RDDs only for a limited duration of time and release them for garbage
* collection. This method allows the developer to specify how long to remember the RDDs (
* if the developer wishes to query old data outside the DStream computation).
@@ -117,23 +123,20 @@ class StreamingContext private (
}
/**
- * Sets the context to periodically checkpoint the DStream operations for master
- * fault-tolerance. By default, the graph will be checkpointed every batch interval.
+ * Set the context to periodically checkpoint the DStream operations for master
+ * fault-tolerance. The graph will be checkpointed every batch interval.
* @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
- * @param interval checkpoint interval
*/
- def checkpoint(directory: String, interval: Duration = null) {
+ def checkpoint(directory: String) {
if (directory != null) {
sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory))
checkpointDir = directory
- checkpointDuration = interval
} else {
checkpointDir = null
- checkpointDuration = null
}
}
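
With the interval argument gone, callers now pass only a directory, e.g. (the path is hypothetical):

    // The graph is checkpointed every batch interval.
    ssc.checkpoint("hdfs://namenode:9000/spark/checkpoints")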
- protected[streaming] def getInitialCheckpoint(): Checkpoint = {
+ protected[streaming] def initialCheckpoint: Checkpoint = {
if (isCheckpointPresent) cp_ else null
}
@@ -165,8 +168,7 @@ class StreamingContext private (
/**
* Create an input stream that pulls messages from a Kafka Broker.
- * @param hostname Zookeper hostname.
- * @param port Zookeper port.
+ * @param zkQuorum ZooKeeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
@@ -175,14 +177,13 @@ class StreamingContext private (
* @param storageLevel RDD storage level. Defaults to memory-only.
*/
def kafkaStream[T: ClassManifest](
- hostname: String,
- port: Int,
+ zkQuorum: String,
groupId: String,
topics: Map[String, Int],
initialOffsets: Map[KafkaPartitionKey, Long] = Map[KafkaPartitionKey, Long](),
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
): DStream[T] = {
- val inputStream = new KafkaInputDStream[T](this, hostname, port, groupId, topics, initialOffsets, storageLevel)
+ val inputStream = new KafkaInputDStream[T](this, zkQuorum, groupId, topics, initialOffsets, storageLevel)
registerInputStream(inputStream)
inputStream
}
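
A sketch of the new call shape (the quorum string, group id, and topic map are illustrative placeholders):

    // zkQuorum replaces the old (hostname, port) pair and may name
    // several ZooKeeper nodes.
    val kafkaMessages = ssc.kafkaStream[String](
      "zk1:2181,zk2:2181",   // ZooKeeper quorum
      "my-consumer-group",   // consumer group id
      Map("events" -> 2)     // topic -> number of consumer threads
    )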
@@ -226,7 +227,7 @@ class StreamingContext private (
}
/**
- * Creates a input stream from a Flume source.
+ * Create an input stream from a Flume source.
* @param hostname Hostname of the slave machine to which the flume data will be sent
* @param port Port of the slave machine to which the flume data will be sent
* @param storageLevel Storage level to use for storing the received objects
@@ -262,7 +263,7 @@ class StreamingContext private (
}
/**
- * Creates a input stream that monitors a Hadoop-compatible filesystem
+ * Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* File names starting with . are ignored.
* @param directory HDFS directory to monitor for new files
@@ -281,7 +282,7 @@ class StreamingContext private (
}
/**
- * Creates a input stream that monitors a Hadoop-compatible filesystem
+ * Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* @param directory HDFS directory to monitor for new files
* @param filter Function to filter paths to process
@@ -300,9 +301,8 @@ class StreamingContext private (
inputStream
}
-
/**
- * Creates a input stream that monitors a Hadoop-compatible filesystem
+ * Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
* as Text and input format as TextInputFormat). File names starting with . are ignored.
* @param directory HDFS directory to monitor for new files
@@ -312,17 +312,49 @@ class StreamingContext private (
}
/**
- * Creates a input stream from an queue of RDDs. In each batch,
+ * Create an input stream that returns tweets received from Twitter.
+ * @param username Twitter username
+ * @param password Twitter password
+ * @param filters Set of filter strings to get only those tweets that match them
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def twitterStream(
+ username: String,
+ password: String,
+ filters: Seq[String],
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[Status] = {
+ val inputStream = new TwitterInputDStream(this, username, password, filters, storageLevel)
+ registerInputStream(inputStream)
+ inputStream
+ }
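
For example (the credentials and filter terms are placeholders):

    // Receives matching tweets as twitter4j Status objects.
    val tweets = ssc.twitterStream("user", "pass", Seq("spark", "streaming"))
    val statusTexts = tweets.map(_.getText)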
+
+ /**
+ * Create an input stream from a queue of RDDs. In each batch,
+ * it will process either one or all of the RDDs returned by the queue.
+ * @param queue Queue of RDDs
+ * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
+ * @tparam T Type of objects in the RDD
+ */
+ def queueStream[T: ClassManifest](
+ queue: Queue[RDD[T]],
+ oneAtATime: Boolean = true
+ ): DStream[T] = {
+ queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1))
+ }
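
This overload is convenient in tests; a sketch:

    import scala.collection.mutable.Queue
    // One queued RDD is consumed per batch, since oneAtATime defaults to true.
    val rddQueue = new Queue[spark.RDD[Int]]()
    val queued = ssc.queueStream(rddQueue)
    rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2)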
+
+ /**
+ * Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
* @param queue Queue of RDDs
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
- * @param defaultRDD Default RDD is returned by the DStream when the queue is empty
+ * @param defaultRDD Default RDD returned by the DStream when the queue is empty; set to null if no RDD should be returned when the queue is empty
* @tparam T Type of objects in the RDD
*/
def queueStream[T: ClassManifest](
queue: Queue[RDD[T]],
- oneAtATime: Boolean = true,
- defaultRDD: RDD[T] = null
+ oneAtATime: Boolean,
+ defaultRDD: RDD[T]
): DStream[T] = {
val inputStream = new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
registerInputStream(inputStream)
@@ -337,7 +369,7 @@ class StreamingContext private (
}
/**
- * Registers an input stream that will be started (InputDStream.start() called) to get the
+ * Register an input stream that will be started (InputDStream.start() called) to get the
* input data.
*/
def registerInputStream(inputStream: InputDStream[_]) {
@@ -345,7 +377,7 @@ class StreamingContext private (
}
/**
- * Registers an output stream that will be computed every interval
+ * Register an output stream that will be computed every interval
*/
def registerOutputStream(outputStream: DStream[_]) {
graph.addOutputStream(outputStream)
@@ -363,7 +395,7 @@ class StreamingContext private (
}
/**
- * Starts the execution of the streams.
+ * Start the execution of the streams.
*/
def start() {
if (checkpointDir != null && checkpointDuration == null && graph != null) {
@@ -391,7 +423,7 @@ class StreamingContext private (
}
/**
- * Sstops the execution of the streams.
+ * Stop the execution of the streams.
*/
def stop() {
try {
@@ -418,7 +450,7 @@ object StreamingContext {
// Set the default cleaner delay to an hour if not already set.
// This should be sufficient for even 1 second interval.
if (MetadataCleaner.getDelaySeconds < 0) {
- MetadataCleaner.setDelaySeconds(60)
+ MetadataCleaner.setDelaySeconds(3600)
}
new SparkContext(master, frameworkName)
}
diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala
index 5daeb761dd..f14decf08b 100644
--- a/streaming/src/main/scala/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/spark/streaming/Time.scala
@@ -37,6 +37,19 @@ case class Time(private val millis: Long) {
def max(that: Time): Time = if (this > that) this else that
+ def until(that: Time, interval: Duration): Seq[Time] = {
+ (this.milliseconds) until (that.milliseconds) by (interval.milliseconds) map (new Time(_))
+ }
+
+ def to(that: Time, interval: Duration): Seq[Time] = {
+ (this.milliseconds) to (that.milliseconds) by (interval.milliseconds) map (new Time(_))
+ }
+
+
override def toString: String = (millis.toString + " ms")
+}
+
+object Time {
+ val ordering = Ordering.by((time: Time) => time.millis)
} \ No newline at end of file
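
These helpers back the scheduler's restart path; for example, the batch times missed between a checkpoint and a restart (all values illustrative):

    val checkpointTime = Time(10000)
    val restartTime    = Time(14000)
    // Every batch start strictly before restartTime:
    val downTimes = checkpointTime.until(restartTime, Milliseconds(1000))
    // -> Time(10000), Time(11000), Time(12000), Time(13000)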
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
index 2e7466b16c..30985b4ebc 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
@@ -36,7 +36,7 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
def cache(): JavaDStream[T] = dstream.cache()
/** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
- def persist(): JavaDStream[T] = dstream.cache()
+ def persist(): JavaDStream[T] = dstream.persist()
/** Persist the RDDs of this DStream with the given storage level */
def persist(storageLevel: StorageLevel): JavaDStream[T] = dstream.persist(storageLevel)
@@ -50,34 +50,27 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
}
/**
- * Return a new DStream which is computed based on windowed batches of this DStream.
- * The new DStream generates RDDs with the same interval as this DStream.
+ * Return a new DStream in which each RDD contains all the elements seen in a
+ * sliding window of time over this DStream. The new DStream generates RDDs with
+ * the same interval as this DStream.
* @param windowDuration width of the window; must be a multiple of this DStream's interval.
- * @return
*/
def window(windowDuration: Duration): JavaDStream[T] =
dstream.window(windowDuration)
/**
- * Return a new DStream which is computed based on windowed batches of this DStream.
- * @param windowDuration duration (i.e., width) of the window;
- * must be a multiple of this DStream's interval
+ * Return a new DStream in which each RDD contains all the elements seen in a
+ * sliding window of time over this DStream.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
- * the new DStream will generate RDDs); must be a multiple of this
- * DStream's interval
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
*/
def window(windowDuration: Duration, slideDuration: Duration): JavaDStream[T] =
dstream.window(windowDuration, slideDuration)
/**
- * Return a new DStream which computed based on tumbling window on this DStream.
- * This is equivalent to window(batchDuration, batchDuration).
- * @param batchDuration tumbling window duration; must be a multiple of this DStream's interval
- */
- def tumble(batchDuration: Duration): JavaDStream[T] =
- dstream.tumble(batchDuration)
-
- /**
* Return a new DStream by unifying data of another DStream with this DStream.
* @param that Another DStream having the same interval (i.e., slideDuration) as this DStream.
*/
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index b93cb7865a..1c1ba05ff9 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -34,6 +34,26 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
def count(): JavaDStream[JLong] = dstream.count()
/**
+ * Return a new DStream in which each RDD contains the counts of each distinct value in
+ * each RDD of this DStream. Hash partitioning is used to generate the RDDs with
+ * Spark's default number of partitions.
+ */
+ def countByValue(): JavaPairDStream[T, JLong] = {
+ JavaPairDStream.scalaToJavaLong(dstream.countByValue())
+ }
+
+ /**
+ * Return a new DStream in which each RDD contains the counts of each distinct value in
+ * each RDD of this DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+ * partitions.
+ * @param numPartitions number of partitions of each RDD in the new DStream.
+ */
+ def countByValue(numPartitions: Int): JavaPairDStream[T, JLong] = {
+ JavaPairDStream.scalaToJavaLong(dstream.countByValue(numPartitions))
+ }
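
countByValue covers the common case of the removed countByKey; a sketch against the underlying Scala DStream (`lines` is an assumed DStream[String]):

    val items   = lines.flatMap(_.split(" "))
    val counts  = items.countByValue()   // DStream[(String, Long)] per RDD
    val counts4 = items.countByValue(4)  // same, with 4 partitions per RDD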
+
+
+ /**
* Return a new DStream in which each RDD has a single element generated by counting the number
* of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
* window() operation. This is equivalent to window(windowDuration, slideDuration).count()
@@ -43,6 +63,39 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
}
/**
+ * Return a new DStream in which each RDD contains the counts of each distinct value seen
+ * in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with
+ * Spark's default number of partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration)
+ : JavaPairDStream[T, JLong] = {
+ JavaPairDStream.scalaToJavaLong(
+ dstream.countByValueAndWindow(windowDuration, slideDuration))
+ }
+
+ /**
+ * Return a new DStream in which each RDD contains the counts of each distinct value seen
+ * in a sliding window over this DStream. Hash partitioning is used to generate the RDDs
+ * with `numPartitions` partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions number of partitions of each RDD in the new DStream.
+ */
+ def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
+ : JavaPairDStream[T, JLong] = {
+ JavaPairDStream.scalaToJavaLong(
+ dstream.countByValueAndWindow(windowDuration, slideDuration, numPartitions))
+ }
+
+ /**
* Return a new DStream in which each RDD is generated by applying glom() to each RDD of
* this DStream. Applying glom() to an RDD coalesces all elements within each partition into
* an array.
@@ -114,8 +167,38 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
/**
* Return a new DStream in which each RDD has a single element generated by reducing all
- * elements in a window over this DStream. windowDuration and slideDuration are as defined in the
- * window() operation. This is equivalent to window(windowDuration, slideDuration).reduce(reduceFunc)
+ * elements in a sliding window over this DStream.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def reduceByWindow(
+ reduceFunc: (T, T) => T,
+ windowDuration: Duration,
+ slideDuration: Duration
+ ): DStream[T] = {
+ dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration)
+ }
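
For example, a sliding sum over the underlying Scala DStream (`values` is an assumed DStream[Long]):

    // Recomputed every 10 seconds over the last 30 seconds of data.
    val plainSum = values.reduceByWindow(_ + _, Seconds(30), Seconds(10))
    // Incremental variant: add what enters the window, subtract what leaves.
    val incrSum  = values.reduceByWindow(_ + _, _ - _, Seconds(30), Seconds(10))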
+
+
+ /**
+ * Return a new DStream in which each RDD has a single element generated by reducing all
+ * elements in a sliding window over this DStream. However, the reduction is done incrementally
+ * using the old window's reduced value:
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ * This is more efficient than reduceByWindow without "inverse reduce" function.
+ * However, it is applicable to only "invertible reduce functions".
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
*/
def reduceByWindow(
reduceFunc: JFunction2[T, T, T],
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index ef10c091ca..952ca657bf 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -25,17 +25,17 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
// Methods common to all DStream's
// =======================================================================
- /** Returns a new DStream containing only the elements that satisfy a predicate. */
+ /** Return a new DStream containing only the elements that satisfy a predicate. */
def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairDStream[K, V] =
dstream.filter((x => f(x).booleanValue()))
- /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
+ /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def cache(): JavaPairDStream[K, V] = dstream.cache()
- /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
- def persist(): JavaPairDStream[K, V] = dstream.cache()
+ /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
+ def persist(): JavaPairDStream[K, V] = dstream.persist()
- /** Persists the RDDs of this DStream with the given storage level */
+ /** Persist the RDDs of this DStream with the given storage level */
def persist(storageLevel: StorageLevel): JavaPairDStream[K, V] = dstream.persist(storageLevel)
/** Method that generates an RDD for the given Duration */
@@ -67,15 +67,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.window(windowDuration, slideDuration)
/**
- * Returns a new DStream which computed based on tumbling window on this DStream.
- * This is equivalent to window(batchDuration, batchDuration).
- * @param batchDuration tumbling window duration; must be a multiple of this DStream's interval
- */
- def tumble(batchDuration: Duration): JavaPairDStream[K, V] =
- dstream.tumble(batchDuration)
-
- /**
- * Returns a new DStream by unifying data of another DStream with this DStream.
+ * Return a new DStream by unifying data of another DStream with this DStream.
* @param that Another DStream having the same interval (i.e., slideDuration) as this DStream.
*/
def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] =
@@ -86,21 +78,21 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
// =======================================================================
/**
- * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
+ * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
* generate the RDDs with Spark's default number of partitions.
*/
def groupByKey(): JavaPairDStream[K, JList[V]] =
dstream.groupByKey().mapValues(seqAsJavaList _)
/**
- * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
+ * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
* generate the RDDs with `numPartitions` partitions.
*/
def groupByKey(numPartitions: Int): JavaPairDStream[K, JList[V]] =
dstream.groupByKey(numPartitions).mapValues(seqAsJavaList _)
/**
- * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
+ * Return a new DStream by applying `groupByKey` on each RDD of `this` DStream.
* Therefore, the values for each key in `this` DStream's RDDs are grouped into a
* single sequence to generate the RDDs of the new DStream. [[spark.Partitioner]]
* is used to control the partitioning of each RDD.
@@ -109,7 +101,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.groupByKey(partitioner).mapValues(seqAsJavaList _)
/**
- * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
* merged using the associative reduce function. Hash partitioning is used to generate the RDDs
* with Spark's default number of partitions.
*/
@@ -117,7 +109,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.reduceByKey(func)
/**
- * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
* merged using the supplied reduce function. Hash partitioning is used to generate the RDDs
* with `numPartitions` partitions.
*/
@@ -125,7 +117,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.reduceByKey(func, numPartitions)
/**
- * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
* merged using the supplied reduce function. [[spark.Partitioner]] is used to control the
* partitioning of each RDD.
*/
@@ -149,24 +141,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Create a new DStream by counting the number of values of each key in each RDD. Hash
- * partitioning is used to generate the RDDs with Spark's `numPartitions` partitions.
- */
- def countByKey(numPartitions: Int): JavaPairDStream[K, JLong] = {
- JavaPairDStream.scalaToJavaLong(dstream.countByKey(numPartitions));
- }
-
-
- /**
- * Create a new DStream by counting the number of values of each key in each RDD. Hash
- * partitioning is used to generate the RDDs with the default number of partitions.
- */
- def countByKey(): JavaPairDStream[K, JLong] = {
- JavaPairDStream.scalaToJavaLong(dstream.countByKey());
- }
-
- /**
- * Creates a new DStream by applying `groupByKey` over a sliding window. This is similar to
+ * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to
* `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
* with the same interval as this DStream. Hash partitioning is used to generate the RDDs with
* Spark's default number of partitions.
@@ -178,7 +153,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Create a new DStream by applying `groupByKey` over a sliding window. Similar to
+ * Return a new DStream by applying `groupByKey` over a sliding window. Similar to
* `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to
* generate the RDDs with Spark's default number of partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
@@ -193,7 +168,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
* Similar to `DStream.groupByKey()`, but applies it over a sliding window.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
@@ -210,7 +185,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
* Similar to `DStream.groupByKey()`, but applies it over a sliding window.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@@ -243,7 +218,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to
+ * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
* `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
* generate the RDDs with Spark's default number of partitions.
* @param reduceFunc associative reduce function
@@ -262,7 +237,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to
+ * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
* `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
* generate the RDDs with `numPartitions` partitions.
* @param reduceFunc associative reduce function
@@ -283,7 +258,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Create a new DStream by applying `reduceByKey` over a sliding window. Similar to
+ * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to
* `DStream.reduceByKey()`, but applies it over a sliding window.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
@@ -303,7 +278,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Create a new DStream by reducing over a using incremental computation.
+ * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
* The reduced value over a new window is calculated using the old window's reduced value:
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
@@ -328,7 +303,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Create a new DStream by reducing over a using incremental computation.
+ * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
* The reduced value over a new window is calculated using the old window's reduced value:
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
@@ -342,25 +317,31 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param numPartitions Number of partitions of each RDD in the new DStream.
+ * @param numPartitions number of partitions of each RDD in the new DStream.
+ * @param filterFunc function to filter expired key-value pairs;
+ * only pairs that satisfy the function are retained;
+ * set this to null if you do not want to filter
*/
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
invReduceFunc: Function2[V, V, V],
windowDuration: Duration,
slideDuration: Duration,
- numPartitions: Int
+ numPartitions: Int,
+ filterFunc: JFunction[(K, V), java.lang.Boolean]
): JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(
reduceFunc,
invReduceFunc,
windowDuration,
slideDuration,
- numPartitions)
+ numPartitions,
+ (p: (K, V)) => filterFunc(p).booleanValue()
+ )
}
/**
- * Create a new DStream by reducing over a using incremental computation.
+ * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
* The reduced value over a new window is calculated using the old window's reduced value:
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
@@ -374,49 +355,26 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param filterFunc function to filter expired key-value pairs;
+ * only pairs that satisfy the function are retained;
+ * set this to null if you do not want to filter
*/
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
invReduceFunc: Function2[V, V, V],
windowDuration: Duration,
slideDuration: Duration,
- partitioner: Partitioner
- ): JavaPairDStream[K, V] = {
+ partitioner: Partitioner,
+ filterFunc: JFunction[(K, V), java.lang.Boolean]
+ ): JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(
reduceFunc,
invReduceFunc,
windowDuration,
slideDuration,
- partitioner)
- }
-
- /**
- * Create a new DStream by counting the number of values for each key over a window.
- * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
- * @param windowDuration width of the window; must be a multiple of this DStream's
- * batching interval
- * @param slideDuration sliding interval of the window (i.e., the interval after which
- * the new DStream will generate RDDs); must be a multiple of this
- * DStream's batching interval
- */
- def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
- : JavaPairDStream[K, JLong] = {
- JavaPairDStream.scalaToJavaLong(dstream.countByKeyAndWindow(windowDuration, slideDuration))
- }
-
- /**
- * Create a new DStream by counting the number of values for each key over a window.
- * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
- * @param windowDuration width of the window; must be a multiple of this DStream's
- * batching interval
- * @param slideDuration sliding interval of the window (i.e., the interval after which
- * the new DStream will generate RDDs); must be a multiple of this
- * DStream's batching interval
- * @param numPartitions Number of partitions of each RDD in the new DStream.
- */
- def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
- : JavaPairDStream[K, Long] = {
- dstream.countByKeyAndWindow(windowDuration, slideDuration, numPartitions)
+ partitioner,
+ (p: (K, V)) => filterFunc(p).booleanValue()
+ )
}
private def convertUpdateStateFunction[S](in: JFunction2[JList[V], Optional[S], Optional[S]]):
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index f82e6a37cc..03933aae93 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -34,6 +34,14 @@ class JavaStreamingContext(val ssc: StreamingContext) {
this(new StreamingContext(master, frameworkName, batchDuration))
/**
+ * Create a StreamingContext using an existing JavaSparkContext.
+ * @param sparkContext The underlying JavaSparkContext to use
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ */
+ def this(sparkContext: JavaSparkContext, batchDuration: Duration) =
+ this(new StreamingContext(sparkContext.sc, batchDuration))
+
+ /**
* Re-creates a StreamingContext from a checkpoint file.
* @param path Path either to the directory that was specified as the checkpoint directory, or
* to the checkpoint file 'graph' or 'graph.bk'.
@@ -45,27 +53,24 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
* Create an input stream that pulls messages from a Kafka Broker.
- * @param hostname Zookeper hostname.
- * @param port Zookeper port.
+ * @param zkQuorum ZooKeeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
*/
def kafkaStream[T](
- hostname: String,
- port: Int,
+ zkQuorum: String,
groupId: String,
topics: JMap[String, JInt])
: JavaDStream[T] = {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- ssc.kafkaStream[T](hostname, port, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
+ ssc.kafkaStream[T](zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
}
/**
* Create an input stream that pulls messages from a Kafka Broker.
- * @param hostname Zookeper hostname.
- * @param port Zookeper port.
+ * @param zkQuorum ZooKeeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
@@ -73,8 +78,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* By default the value is pulled from ZooKeeper.
*/
def kafkaStream[T](
- hostname: String,
- port: Int,
+ zkQuorum: String,
groupId: String,
topics: JMap[String, JInt],
initialOffsets: JMap[KafkaPartitionKey, JLong])
@@ -82,8 +86,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
ssc.kafkaStream[T](
- hostname,
- port,
+ zkQuorum,
groupId,
Map(topics.mapValues(_.intValue()).toSeq: _*),
Map(initialOffsets.mapValues(_.longValue()).toSeq: _*))
@@ -91,8 +94,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
* Create an input stream that pulls messages from a Kafka Broker.
- * @param hostname Zookeper hostname.
- * @param port Zookeper port.
+ * @param zkQuorum ZooKeeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
@@ -101,8 +103,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param storageLevel RDD storage level. Defaults to memory-only
*/
def kafkaStream[T](
- hostname: String,
- port: Int,
+ zkQuorum: String,
groupId: String,
topics: JMap[String, JInt],
initialOffsets: JMap[KafkaPartitionKey, JLong],
@@ -111,8 +112,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
ssc.kafkaStream[T](
- hostname,
- port,
+ zkQuorum,
groupId,
Map(topics.mapValues(_.intValue()).toSeq: _*),
Map(initialOffsets.mapValues(_.longValue()).toSeq: _*),
@@ -314,12 +314,11 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
* Sets the context to periodically checkpoint the DStream operations for master
- * fault-tolerance. By default, the graph will be checkpointed every batch interval.
+ * fault-tolerance. The graph will be checkpointed every batch interval.
* @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
- * @param interval checkpoint interval
*/
- def checkpoint(directory: String, interval: Duration = null) {
- ssc.checkpoint(directory, interval)
+ def checkpoint(directory: String) {
+ ssc.checkpoint(directory)
}
/**
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
index 1e6ad84b44..41b9bd9461 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
@@ -2,13 +2,14 @@ package spark.streaming.dstream
import spark.RDD
import spark.rdd.UnionRDD
-import spark.streaming.{StreamingContext, Time}
+import spark.streaming.{DStreamCheckpointData, StreamingContext, Time}
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import scala.collection.mutable.HashSet
+import scala.collection.mutable.{HashSet, HashMap}
+import java.io.{ObjectInputStream, IOException}
private[streaming]
class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest](
@@ -18,28 +19,23 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
newFilesOnly: Boolean = true)
extends InputDStream[(K, V)](ssc_) {
- @transient private var path_ : Path = null
- @transient private var fs_ : FileSystem = null
-
- var lastModTime = 0L
- val lastModTimeFiles = new HashSet[String]()
+ protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
- def path(): Path = {
- if (path_ == null) path_ = new Path(directory)
- path_
- }
+ // Latest file mod time seen so far, and the files seen at that mod time
+ private val lastModTimeFiles = new HashSet[String]()
+ private var lastModTime = 0L
- def fs(): FileSystem = {
- if (fs_ == null) fs_ = path.getFileSystem(new Configuration())
- fs_
- }
+ @transient private var path_ : Path = null
+ @transient private var fs_ : FileSystem = null
+ @transient private[streaming] var files = new HashMap[Time, Array[String]]
override def start() {
if (newFilesOnly) {
- lastModTime = System.currentTimeMillis()
+ lastModTime = graph.zeroTime.milliseconds
} else {
lastModTime = 0
}
+ logDebug("LastModTime initialized to " + lastModTime + ", new files only = " + newFilesOnly)
}
override def stop() { }
@@ -49,38 +45,50 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
* a union RDD out of them. Note that this maintains the list of files that were processed
* in the latest modification time in the previous call to this method. This is because the
* modification time returned by the FileStatus API seems to return times only at the
- * granularity of seconds. Hence, new files may have the same modification time as the
- * latest modification time in the previous call to this method and the list of files
- * maintained is used to filter the one that have been processed.
+ * granularity of seconds. And new files may have the same modification time as the
+ * latest modification time in the previous call to this method, yet may not have been
+ * reported in that previous call.
*/
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
+ assert(validTime.milliseconds >= lastModTime, "Trying to get new files for really old time [" + validTime + " < " + lastModTime + "]")
+
// Create the filter for selecting new files
val newFilter = new PathFilter() {
+ // Latest file mod time seen in this round of fetching files and its corresponding files
var latestModTime = 0L
val latestModTimeFiles = new HashSet[String]()
def accept(path: Path): Boolean = {
- if (!filter(path)) {
+ if (!filter(path)) { // Reject file if it does not satisfy filter
+ logDebug("Rejected by filter " + path)
return false
- } else {
+ } else { // Accept the file only if it passes all the checks below
val modTime = fs.getFileStatus(path).getModificationTime()
- if (modTime < lastModTime){
- return false
+ logDebug("Mod time for " + path + " is " + modTime)
+ if (modTime < lastModTime) {
+ logDebug("Mod time less than last mod time")
+ return false // If the file was created before the last mod time seen so far
} else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) {
- return false
+ logDebug("Mod time equal to last mod time, but file considered already")
+ return false // If the file has mod time equal to lastModTime but was already considered
+ } else if (modTime > validTime.milliseconds) {
+ logDebug("Mod time more than valid time")
+ return false // If the file was created after the time this function call requires
}
if (modTime > latestModTime) {
latestModTime = modTime
latestModTimeFiles.clear()
+ logDebug("Latest mod time updated to " + latestModTime)
}
latestModTimeFiles += path.toString
+ logDebug("Accepted " + path)
return true
}
}
}
-
- val newFiles = fs.listStatus(path, newFilter)
- logInfo("New files: " + newFiles.map(_.getPath).mkString(", "))
+ logDebug("Finding new files at time " + validTime + " for last mod time = " + lastModTime)
+ val newFiles = fs.listStatus(path, newFilter).map(_.getPath.toString)
+ logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
if (newFiles.length > 0) {
// Update the modification time and the files processed for that modification time
if (lastModTime != newFilter.latestModTime) {
@@ -88,10 +96,81 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
lastModTimeFiles.clear()
}
lastModTimeFiles ++= newFilter.latestModTimeFiles
+ logDebug("Last mod time updated to " + lastModTime)
+ }
+ files += ((validTime, newFiles))
+ Some(filesToRDD(newFiles))
+ }
+
+ /** Clear the old time-to-files mappings along with old RDDs */
+ protected[streaming] override def clearOldMetadata(time: Time) {
+ super.clearOldMetadata(time)
+ val oldFiles = files.filter(_._1 <= (time - rememberDuration))
+ files --= oldFiles.keys
+ logInfo("Cleared " + oldFiles.size + " old files that were older than " +
+ (time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
+ logDebug("Cleared files are:\n" +
+ oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
+ }
+
+ /** Generate one RDD from an array of files */
+ protected[streaming] def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
+ new UnionRDD(
+ context.sparkContext,
+ files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
+ )
+ }
+
+ private def path: Path = {
+ if (path_ == null) path_ = new Path(directory)
+ path_
+ }
+
+ private def fs: FileSystem = {
+ if (fs_ == null) fs_ = path.getFileSystem(new Configuration())
+ fs_
+ }
+
+ @throws(classOf[IOException])
+ private def readObject(ois: ObjectInputStream) {
+ logDebug(this.getClass().getSimpleName + ".readObject used")
+ ois.defaultReadObject()
+ generatedRDDs = new HashMap[Time, RDD[(K,V)]] ()
+ files = new HashMap[Time, Array[String]]
+ }
+
+ /**
+ * A custom version of the DStreamCheckpointData that stores names of
+ * Hadoop files as checkpoint data.
+ */
+ private[streaming]
+ class FileInputDStreamCheckpointData extends DStreamCheckpointData(this) {
+
+ def hadoopFiles = data.asInstanceOf[HashMap[Time, Array[String]]]
+
+ override def update() {
+ hadoopFiles.clear()
+ hadoopFiles ++= files
+ }
+
+ override def cleanup() { }
+
+ override def restore() {
+ hadoopFiles.foreach {
+ case (t, f) => {
+ // Restore the metadata in both files and generatedRDDs
+ logInfo("Restoring files for time " + t + " - " +
+ f.mkString("[", ", ", "]") )
+ files += ((t, f))
+ generatedRDDs += ((t, filesToRDD(f)))
+ }
+ }
+ }
+
+ override def toString() = {
+ "[\n" + hadoopFiles.size + " file sets\n" +
+ hadoopFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n") + "\n]"
}
- val newRDD = new UnionRDD(ssc.sc, newFiles.map(
- file => ssc.sc.newAPIHadoopFile[K, V, F](file.getPath.toString)))
- Some(newRDD)
}
}
@@ -100,3 +179,4 @@ object FileInputDStream {
def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
}
+
diff --git a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
index 980ca5177e..a4db44a608 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
@@ -1,10 +1,42 @@
package spark.streaming.dstream
-import spark.streaming.{Duration, StreamingContext, DStream}
+import spark.streaming.{Time, Duration, StreamingContext, DStream}
+/**
+ * This is the abstract base class for all input streams. It provides two methods,
+ * start() and stop(), which are called by the scheduler to start and stop receiving data.
+ * Input streams that can generate RDDs from new data just by running a service on
+ * the driver node (that is, without running a receiver on worker nodes) can be
+ * implemented by directly subclassing this InputDStream. For example,
+ * FileInputDStream, a subclass of InputDStream, monitors an HDFS directory for
+ * new files and generates RDDs from the new files. For implementing input streams
+ * that require running a receiver on the worker nodes, use NetworkInputDStream
+ * as the parent class.
+ */
abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext)
extends DStream[T](ssc_) {
+ var lastValidTime: Time = null
+
+ /**
+ * Checks whether the 'time' is valid wrt slideDuration for generating RDD.
+ * Additionally it also ensures valid times are in strictly increasing order.
+ * This ensures that InputDStream.compute() is called strictly on increasing
+ * times.
+ */
+ override protected def isTimeValid(time: Time): Boolean = {
+ if (!super.isTimeValid(time)) {
+ false // Time not valid
+ } else {
+ // Time is valid, but check if it is more than lastValidTime
+ if (lastValidTime != null && time < lastValidTime) {
+ logWarning("isTimeValid called with " + time + " whereas last valid time is " + lastValidTime)
+ }
+ lastValidTime = time
+ true
+ }
+ }
+
override def dependencies = List()
override def slideDuration: Duration = {
@@ -13,7 +45,9 @@ abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContex
ssc.graph.batchDuration
}
+ /** Method called to start receiving data. Subclasses must implement this method. */
def start()
+ /** Method called to stop receiving data. Subclasses must implement this method. */
def stop()
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index 682cb82709..dc7139cc27 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -19,21 +19,11 @@ import scala.collection.JavaConversions._
// Key for a specific Kafka Partition: (broker, topic, group, part)
case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)
-// NOT USED - Originally intended for fault-tolerance
-// Metadata for a Kafka Stream that it sent to the Master
-private[streaming]
-case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long])
-// NOT USED - Originally intended for fault-tolerance
-// Checkpoint data specific to a KafkaInputDstream
-private[streaming]
-case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
- savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds)
/**
* Input stream that pulls messages from a Kafka Broker.
- *
- * @param host Zookeper hostname.
- * @param port Zookeper port.
+ *
+ * @param zkQuorum ZooKeeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
@@ -44,66 +34,23 @@ case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
private[streaming]
class KafkaInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
- host: String,
- port: Int,
+ zkQuorum: String,
groupId: String,
topics: Map[String, Int],
initialOffsets: Map[KafkaPartitionKey, Long],
storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_ ) with Logging {
- // Metadata that keeps track of which messages have already been consumed.
- var savedOffsets = HashMap[Long, Map[KafkaPartitionKey, Long]]()
-
- /* NOT USED - Originally intended for fault-tolerance
-
- // In case of a failure, the offets for a particular timestamp will be restored.
- @transient var restoredOffsets : Map[KafkaPartitionKey, Long] = null
-
-
- override protected[streaming] def addMetadata(metadata: Any) {
- metadata match {
- case x : KafkaInputDStreamMetadata =>
- savedOffsets(x.timestamp) = x.data
- // TOOD: Remove logging
- logInfo("New saved Offsets: " + savedOffsets)
- case _ => logInfo("Received unknown metadata: " + metadata.toString)
- }
- }
-
- override protected[streaming] def updateCheckpointData(currentTime: Time) {
- super.updateCheckpointData(currentTime)
- if(savedOffsets.size > 0) {
- // Find the offets that were stored before the checkpoint was initiated
- val key = savedOffsets.keys.toList.sortWith(_ < _).filter(_ < currentTime.millis).last
- val latestOffsets = savedOffsets(key)
- logInfo("Updating KafkaDStream checkpoint data: " + latestOffsets.toString)
- checkpointData = KafkaDStreamCheckpointData(checkpointData.rdds, latestOffsets)
- // TODO: This may throw out offsets that are created after the checkpoint,
- // but it's unlikely we'll need them.
- savedOffsets.clear()
- }
- }
-
- override protected[streaming] def restoreCheckpointData() {
- super.restoreCheckpointData()
- logInfo("Restoring KafkaDStream checkpoint data.")
- checkpointData match {
- case x : KafkaDStreamCheckpointData =>
- restoredOffsets = x.savedOffsets
- logInfo("Restored KafkaDStream offsets: " + savedOffsets)
- }
- } */
def getReceiver(): NetworkReceiver[T] = {
- new KafkaReceiver(host, port, groupId, topics, initialOffsets, storageLevel)
+ new KafkaReceiver(zkQuorum, groupId, topics, initialOffsets, storageLevel)
.asInstanceOf[NetworkReceiver[T]]
}
}
private[streaming]
-class KafkaReceiver(host: String, port: Int, groupId: String,
- topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
+class KafkaReceiver(zkQuorum: String, groupId: String,
+ topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
storageLevel: StorageLevel) extends NetworkReceiver[Any] {
// Timeout for establishing a connection to ZooKeeper in ms.
@@ -111,8 +58,6 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
// Handles pushing data into the BlockManager
lazy protected val blockGenerator = new BlockGenerator(storageLevel)
- // Keeps track of the current offsets. Maps from (broker, topic, group, part) -> Offset
- lazy val offsets = HashMap[KafkaPartitionKey, Long]()
// Connection to Kafka
var consumerConnector : ZookeeperConsumerConnector = null
@@ -127,24 +72,23 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
// In case we are using multiple Threads to handle Kafka Messages
val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
- val zooKeeperEndPoint = host + ":" + port
logInfo("Starting Kafka Consumer Stream with group: " + groupId)
logInfo("Initial offsets: " + initialOffsets.toString)
// ZooKeeper connection properties
val props = new Properties()
- props.put("zk.connect", zooKeeperEndPoint)
+ props.put("zk.connect", zkQuorum)
props.put("zk.connectiontimeout.ms", ZK_TIMEOUT.toString)
props.put("groupid", groupId)
// Create the connection to the cluster
- logInfo("Connecting to Zookeper: " + zooKeeperEndPoint)
+ logInfo("Connecting to Zookeper: " + zkQuorum)
val consumerConfig = new ConsumerConfig(props)
consumerConnector = Consumer.create(consumerConfig).asInstanceOf[ZookeeperConsumerConnector]
- logInfo("Connected to " + zooKeeperEndPoint)
+ logInfo("Connected to " + zkQuorum)
- // Reset the Kafka offsets in case we are recovering from a failure
- resetOffsets(initialOffsets)
+ // If specified, set the topic offset
+ setOffsets(initialOffsets)
// Create Threads for each Topic/Message Stream we are listening
val topicMessageStreams = consumerConnector.createMessageStreams(topics, new StringDecoder())
@@ -157,7 +101,7 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
}
// Overwrites the offsets in ZooKeeper.
- private def resetOffsets(offsets: Map[KafkaPartitionKey, Long]) {
+ private def setOffsets(offsets: Map[KafkaPartitionKey, Long]) {
offsets.foreach { case(key, offset) =>
val topicDirs = new ZKGroupTopicDirs(key.groupId, key.topic)
val partitionName = key.brokerId + "-" + key.partId
@@ -172,29 +116,10 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
logInfo("Starting MessageHandler.")
stream.takeWhile { msgAndMetadata =>
blockGenerator += msgAndMetadata.message
-
- // Updating the offet. The key is (broker, topic, group, partition).
- val key = KafkaPartitionKey(msgAndMetadata.topicInfo.brokerId, msgAndMetadata.topic,
- groupId, msgAndMetadata.topicInfo.partition.partId)
- val offset = msgAndMetadata.topicInfo.getConsumeOffset
- offsets.put(key, offset)
- // logInfo("Handled message: " + (key, offset).toString)
-
// Keep on handling messages
+
true
}
}
}
-
- // NOT USED - Originally intended for fault-tolerance
- // class KafkaDataHandler(receiver: KafkaReceiver, storageLevel: StorageLevel)
- // extends BufferingBlockCreator[Any](receiver, storageLevel) {
-
- // override def createBlock(blockId: String, iterator: Iterator[Any]) : Block = {
- // // Creates a new Block with Kafka-specific Metadata
- // new Block(blockId, iterator, KafkaInputDStreamMetadata(System.currentTimeMillis, offsets.toMap))
- // }
-
- // }
-
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index 9142deb9ed..7385474963 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -46,8 +46,15 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming
def stop() {}
override def compute(validTime: Time): Option[RDD[T]] = {
- val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime)
- Some(new BlockRDD[T](ssc.sc, blockIds))
+ // If this is called for any time before the start time of the context,
+ // then this returns an empty RDD. This may happen when recovering from a
+ // master failure
+ if (validTime >= graph.startTime) {
+ val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime)
+ Some(new BlockRDD[T](ssc.sc, blockIds))
+ } else {
+ Some(new BlockRDD[T](ssc.sc, Array[String]()))
+ }
}
}
@@ -153,8 +160,8 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
/** A helper actor that communicates with the NetworkInputTracker */
private class NetworkReceiverActor extends Actor {
logInfo("Attempting to register with tracker")
- val ip = System.getProperty("spark.master.host", "localhost")
- val port = System.getProperty("spark.master.port", "7077").toInt
+ val ip = System.getProperty("spark.driver.host", "localhost")
+ val port = System.getProperty("spark.driver.port", "7077").toInt
val url = "akka://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
val tracker = env.actorSystem.actorFor(url)
val timeout = 5.seconds
diff --git a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
index 74ffa1c2a2..1b2fa56779 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
@@ -1,6 +1,6 @@
package spark.streaming.dstream
-import spark.{DaemonThread, Logging}
+import spark.Logging
import spark.storage.StorageLevel
import spark.streaming.StreamingContext
@@ -48,7 +48,8 @@ class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel)
val queue = new ArrayBlockingQueue[ByteBuffer](2)
- blockPushingThread = new DaemonThread {
+ blockPushingThread = new Thread {
+ setDaemon(true)
override def run() {
var nextBlockNumber = 0
while (true) {
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
index 733d5c4a25..aa5a71e1ed 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -3,7 +3,7 @@ package spark.streaming.dstream
import spark.streaming.StreamingContext._
import spark.RDD
-import spark.rdd.CoGroupedRDD
+import spark.rdd.{CoGroupedRDD, MapPartitionsRDD}
import spark.Partitioner
import spark.SparkContext._
import spark.storage.StorageLevel
@@ -15,7 +15,8 @@ private[streaming]
class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
parent: DStream[(K, V)],
reduceFunc: (V, V) => V,
- invReduceFunc: (V, V) => V,
+ invReduceFunc: (V, V) => V,
+ filterFunc: Option[((K, V)) => Boolean],
_windowDuration: Duration,
_slideDuration: Duration,
partitioner: Partitioner
@@ -87,22 +88,25 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
//
// Get the RDDs of the reduced values in "old time steps"
- val oldRDDs = reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
+ val oldRDDs =
+ reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
logDebug("# old RDDs = " + oldRDDs.size)
// Get the RDDs of the reduced values in "new time steps"
- val newRDDs = reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)
+ val newRDDs =
+ reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)
logDebug("# new RDDs = " + newRDDs.size)
// Get the RDD of the reduced value of the previous window
- val previousWindowRDD = getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]()))
+ val previousWindowRDD =
+ getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]()))
// Make the list of RDDs that need to be cogrouped together for reducing their reduced values
val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs
// Cogroup the reduced RDDs and merge the reduced values
- val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner)
- //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _
+ val cogroupedRDD =
+ new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner)
val numOldValues = oldRDDs.size
val numNewValues = newRDDs.size
@@ -114,7 +118,9 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
// Getting reduced values "old time steps" that will be removed from current window
val oldValues = (1 to numOldValues).map(i => seqOfValues(i)).filter(!_.isEmpty).map(_.head)
// Getting reduced values "new time steps"
- val newValues = (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
+ val newValues =
+ (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
+
if (seqOfValues(0).isEmpty) {
// If previous window's reduce value does not exist, then at least new values should exist
if (newValues.isEmpty) {
@@ -140,10 +146,12 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValues)
- Some(mergedValuesRDD)
+ if (filterFunc.isDefined) {
+ Some(mergedValuesRDD.filter(filterFunc.get))
+ } else {
+ Some(mergedValuesRDD)
+ }
}
-
-
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
index b4506c74aa..db62955036 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
@@ -48,8 +48,16 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S: ClassManifest](
//logDebug("Generating state RDD for time " + validTime)
return Some(stateRDD)
}
- case None => { // If parent RDD does not exist, then return old state RDD
- return Some(prevStateRDD)
+ case None => { // If parent RDD does not exist
+
+ // Re-apply the update function to the old state RDD
+ val updateFuncLocal = updateFunc
+ val finalFunc = (iterator: Iterator[(K, S)]) => {
+ val i = iterator.map(t => (t._1, Seq[V](), Option(t._2)))
+ updateFuncLocal(i)
+ }
+ val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning)
+ return Some(stateRDD)
}
}
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
new file mode 100644
index 0000000000..c697498862
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
@@ -0,0 +1,72 @@
+package spark.streaming.dstream
+
+import spark._
+import spark.streaming._
+import storage.StorageLevel
+
+import twitter4j._
+import twitter4j.auth.BasicAuthorization
+
+/* A stream of Twitter statuses, potentially filtered by one or more keywords.
+*
+* @constructor create a new Twitter stream using the supplied username and password to authenticate.
+* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
+* such that this may return a sampled subset of all tweets during each interval.
+*/
+private[streaming]
+class TwitterInputDStream(
+ @transient ssc_ : StreamingContext,
+ username: String,
+ password: String,
+ filters: Seq[String],
+ storageLevel: StorageLevel
+ ) extends NetworkInputDStream[Status](ssc_) {
+
+ override def getReceiver(): NetworkReceiver[Status] = {
+ new TwitterReceiver(username, password, filters, storageLevel)
+ }
+}
+
+private[streaming]
+class TwitterReceiver(
+ username: String,
+ password: String,
+ filters: Seq[String],
+ storageLevel: StorageLevel
+ ) extends NetworkReceiver[Status] {
+
+ var twitterStream: TwitterStream = _
+ lazy val blockGenerator = new BlockGenerator(storageLevel)
+
+ protected override def onStart() {
+ blockGenerator.start()
+ twitterStream = new TwitterStreamFactory()
+ .getInstance(new BasicAuthorization(username, password))
+ twitterStream.addListener(new StatusListener {
+ def onStatus(status: Status) = {
+ blockGenerator += status
+ }
+ // Unimplemented
+ def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
+ def onTrackLimitationNotice(i: Int) {}
+ def onScrubGeo(l: Long, l1: Long) {}
+ def onStallWarning(stallWarning: StallWarning) {}
+ def onException(e: Exception) { stopOnError(e) }
+ })
+
+ val query: FilterQuery = new FilterQuery
+ if (filters.size > 0) {
+ query.track(filters.toArray)
+ twitterStream.filter(query)
+ } else {
+ twitterStream.sample()
+ }
+ logInfo("Twitter receiver started")
+ }
+
+ protected override def onStop() {
+ blockGenerator.stop()
+ twitterStream.shutdown()
+ logInfo("Twitter receiver stopped")
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
new file mode 100644
index 0000000000..bdd9f4d753
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
@@ -0,0 +1,392 @@
+package spark.streaming.util
+
+import spark.{Logging, RDD}
+import spark.streaming._
+import spark.streaming.dstream.ForEachDStream
+import StreamingContext._
+
+import scala.util.Random
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+
+import java.io.{File, ObjectInputStream, IOException}
+import java.util.UUID
+
+import com.google.common.io.Files
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.fs.{FileUtil, FileSystem, Path}
+import org.apache.hadoop.conf.Configuration
+
+
+private[streaming]
+object MasterFailureTest extends Logging {
+ initLogging()
+
+ @volatile var killed = false
+ @volatile var killCount = 0
+
+ def main(args: Array[String]) {
+ if (args.size < 2) {
+ println(
+ "Usage: MasterFailureTest <local/HDFS directory> <# batches> [<batch size in milliseconds>]")
+ System.exit(1)
+ }
+ val directory = args(0)
+ val numBatches = args(1).toInt
+ val batchDuration = if (args.size > 2) Milliseconds(args(2).toInt) else Seconds(1)
+
+ println("\n\n========================= MAP TEST =========================\n\n")
+ testMap(directory, numBatches, batchDuration)
+
+ println("\n\n================= UPDATE-STATE-BY-KEY TEST =================\n\n")
+ testUpdateStateByKey(directory, numBatches, batchDuration)
+
+ println("\n\nSUCCESS\n\n")
+ }
+
+ def testMap(directory: String, numBatches: Int, batchDuration: Duration) {
+ // Input: time=1 ==> [ 1 ] , time=2 ==> [ 2 ] , time=3 ==> [ 3 ] , ...
+ val input = (1 to numBatches).map(_.toString).toSeq
+ // Expected output: time=1 ==> [ 1 ] , time=2 ==> [ 2 ] , time=3 ==> [ 3 ] , ...
+ val expectedOutput = (1 to numBatches)
+
+ val operation = (st: DStream[String]) => st.map(_.toInt)
+
+ // Run streaming operation with multiple master failures
+ val output = testOperation(directory, batchDuration, input, operation, expectedOutput)
+
+ logInfo("Expected output, size = " + expectedOutput.size)
+ logInfo(expectedOutput.mkString("[", ",", "]"))
+ logInfo("Output, size = " + output.size)
+ logInfo(output.mkString("[", ",", "]"))
+
+ // Verify whether all the values of the expected output are present
+ // in the output
+ assert(output.distinct.toSet == expectedOutput.toSet)
+ }
+
+
+ def testUpdateStateByKey(directory: String, numBatches: Int, batchDuration: Duration) {
+ // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
+ val input = (1 to numBatches).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq
+ // Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ...
+ val expectedOutput = (1L to numBatches).map(i => (1L to i).reduce(_ + _)).map(j => ("a", j))
+
+ val operation = (st: DStream[String]) => {
+ val updateFunc = (values: Seq[Long], state: Option[Long]) => {
+ Some(values.foldLeft(0L)(_ + _) + state.getOrElse(0L))
+ }
+ st.flatMap(_.split(" "))
+ .map(x => (x, 1L))
+ .updateStateByKey[Long](updateFunc)
+ .checkpoint(batchDuration * 5)
+ }
+
+ // Run streaming operation with multiple master failures
+ val output = testOperation(directory, batchDuration, input, operation, expectedOutput)
+
+ logInfo("Expected output, size = " + expectedOutput.size + "\n" + expectedOutput)
+ logInfo("Output, size = " + output.size + "\n" + output)
+
+ // Verify whether all the values in the output are among the expected output values
+ output.foreach(o =>
+ assert(expectedOutput.contains(o), "Expected value " + o + " not found")
+ )
+
+ // Verify whether the last expected output value has been generated, thereby
+ // confirming that none of the inputs have been missed
+ assert(output.last == expectedOutput.last)
+ }
+
+ /**
+ * Tests stream operation with multiple master failures, and verifies whether the
+ * final set of output values is as expected or not.
+ */
+ def testOperation[T: ClassManifest](
+ directory: String,
+ batchDuration: Duration,
+ input: Seq[String],
+ operation: DStream[String] => DStream[T],
+ expectedOutput: Seq[T]
+ ): Seq[T] = {
+
+ // Just making sure that the expected output does not have duplicates
+ assert(expectedOutput.distinct.toSet == expectedOutput.toSet)
+
+ // Setup the stream computation with the given operation
+ val (ssc, checkpointDir, testDir) = setupStreams(directory, batchDuration, operation)
+
+ // Start generating files in a different thread
+ val fileGeneratingThread = new FileGeneratingThread(input, testDir, batchDuration.milliseconds)
+ fileGeneratingThread.start()
+
+ // Run the streams and repeatedly kill it until the last expected output
+ // has been generated, or until it has run for twice the expected time
+ val lastExpectedOutput = expectedOutput.last
+ val maxTimeToRun = expectedOutput.size * batchDuration.milliseconds * 2
+ val mergedOutput = runStreams(ssc, lastExpectedOutput, maxTimeToRun)
+
+ // Delete directories
+ fileGeneratingThread.join()
+ val fs = checkpointDir.getFileSystem(new Configuration())
+ fs.delete(checkpointDir, true)
+ fs.delete(testDir, true)
+ logInfo("Finished test after " + killCount + " failures")
+ mergedOutput
+ }
+
+ /**
+ * Sets up the stream computation with the given operation, directory (local or HDFS),
+ * and batch duration. Returns the streaming context, the checkpoint directory, and
+ * the directory to which files should be written for testing.
+ */
+ private def setupStreams[T: ClassManifest](
+ directory: String,
+ batchDuration: Duration,
+ operation: DStream[String] => DStream[T]
+ ): (StreamingContext, Path, Path) = {
+ // Reset all state
+ reset()
+
+ // Create the directories for this test
+ val uuid = UUID.randomUUID().toString
+ val rootDir = new Path(directory, uuid)
+ val fs = rootDir.getFileSystem(new Configuration())
+ val checkpointDir = new Path(rootDir, "checkpoint")
+ val testDir = new Path(rootDir, "test")
+ fs.mkdirs(checkpointDir)
+ fs.mkdirs(testDir)
+
+ // Setup the streaming computation with the given operation
+ System.clearProperty("spark.driver.port")
+ var ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration)
+ ssc.checkpoint(checkpointDir.toString)
+ val inputStream = ssc.textFileStream(testDir.toString)
+ val operatedStream = operation(inputStream)
+ val outputStream = new TestOutputStream(operatedStream)
+ ssc.registerOutputStream(outputStream)
+ (ssc, checkpointDir, testDir)
+ }
+
+
+ /**
+ * Repeatedly starts and kills the streaming context until timed out or
+ * the last expected output is generated. Finally, returns the merged output from all the runs.
+ */
+ private def runStreams[T: ClassManifest](
+ ssc_ : StreamingContext,
+ lastExpectedOutput: T,
+ maxTimeToRun: Long
+ ): Seq[T] = {
+
+ var ssc = ssc_
+ var totalTimeRan = 0L
+ var isLastOutputGenerated = false
+ var isTimedOut = false
+ val mergedOutput = new ArrayBuffer[T]()
+ val checkpointDir = ssc.checkpointDir
+ var batchDuration = ssc.graph.batchDuration
+
+ while(!isLastOutputGenerated && !isTimedOut) {
+ // Get the output buffer
+ val outputBuffer = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[T]].output
+ def output = outputBuffer.flatMap(x => x)
+
+ // Start the thread to kill the streaming after some time
+ killed = false
+ val killingThread = new KillingThread(ssc, batchDuration.milliseconds * 10)
+ killingThread.start()
+
+ var timeRan = 0L
+ try {
+ // Start the streaming computation and let it run while ...
+ // (i) StreamingContext has not been shut down yet
+ // (ii) The last expected output has not been generated yet
+ // (iii) It's not timed out yet
+ System.clearProperty("spark.streaming.clock")
+ System.clearProperty("spark.driver.port")
+ ssc.start()
+ val startTime = System.currentTimeMillis()
+ while (!killed && !isLastOutputGenerated && !isTimedOut) {
+ Thread.sleep(100)
+ timeRan = System.currentTimeMillis() - startTime
+ isLastOutputGenerated = (!output.isEmpty && output.last == lastExpectedOutput)
+ isTimedOut = (timeRan + totalTimeRan > maxTimeToRun)
+ }
+ } catch {
+ case e: Exception => logError("Error running streaming context", e)
+ }
+ if (killingThread.isAlive) killingThread.interrupt()
+ ssc.stop()
+
+ logInfo("Has been killed = " + killed)
+ logInfo("Is last output generated = " + isLastOutputGenerated)
+ logInfo("Is timed out = " + isTimedOut)
+
+ // Verify whether the output of each batch has only one element or no element
+ // and then merge the new output with all the earlier output
+ mergedOutput ++= output
+ totalTimeRan += timeRan
+ logInfo("New output = " + output)
+ logInfo("Merged output = " + mergedOutput)
+ logInfo("Time ran = " + timeRan)
+ logInfo("Total time ran = " + totalTimeRan)
+
+ if (!isLastOutputGenerated && !isTimedOut) {
+ val sleepTime = Random.nextInt(batchDuration.milliseconds.toInt * 10)
+ logInfo(
+ "\n-------------------------------------------\n" +
+ " Restarting stream computation in " + sleepTime + " ms " +
+ "\n-------------------------------------------\n"
+ )
+ Thread.sleep(sleepTime)
+ // Recreate the streaming context from checkpoint
+ ssc = new StreamingContext(checkpointDir)
+ }
+ }
+ mergedOutput
+ }
+
+ /**
+ * Verifies that the output values are the same as expected. Since failures can lead to
+ * a batch being processed twice, a batch's output may appear more than once
+ * consecutively. To avoid getting confused by those, we eliminate consecutive
+ * duplicate batch outputs of values from the `output`. As a result, the
+ * expected output should not have consecutive batches with the same values as output.
+ */
+ private def verifyOutput[T: ClassManifest](output: Seq[T], expectedOutput: Seq[T]) {
+ // Verify that the expected output does not have consecutive batches with the same values
+ for (i <- 0 until expectedOutput.size - 1) {
+ assert(expectedOutput(i) != expectedOutput(i+1),
+ "Expected output has consecutive duplicate sequence of values")
+ }
+
+ // Log the output
+ println("Expected output, size = " + expectedOutput.size)
+ println(expectedOutput.mkString("[", ",", "]"))
+ println("Output, size = " + output.size)
+ println(output.mkString("[", ",", "]"))
+
+ // Match the output with the expected output
+ output.foreach(o =>
+ assert(expectedOutput.contains(o), "Expected value " + o + " not found")
+ )
+ }
+
+ /** Resets counter to prepare for the test */
+ private def reset() {
+ killed = false
+ killCount = 0
+ }
+}
+
+/**
+ * This is an output stream just for testing. All the output is collected into an
+ * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
+ */
+private[streaming]
+class TestOutputStream[T: ClassManifest](
+ parent: DStream[T],
+ val output: ArrayBuffer[Seq[T]] = new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]
+ ) extends ForEachDStream[T](
+ parent,
+ (rdd: RDD[T], t: Time) => {
+ val collected = rdd.collect()
+ output += collected
+ }
+ ) {
+
+ // This is to clear the output buffer every time it is restored from a checkpoint
+ @throws(classOf[IOException])
+ private def readObject(ois: ObjectInputStream) {
+ ois.defaultReadObject()
+ output.clear()
+ }
+}
+
+
+/**
+ * Thread to kill streaming context after a random period of time.
+ */
+private[streaming]
+class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging {
+ initLogging()
+
+ override def run() {
+ try {
+ // If it is the first killing, then allow the first checkpoint to be created
+ var minKillWaitTime = if (MasterFailureTest.killCount == 0) 5000 else 2000
+ val killWaitTime = minKillWaitTime + math.abs(Random.nextLong % maxKillWaitTime)
+ logInfo("Kill wait time = " + killWaitTime)
+ Thread.sleep(killWaitTime)
+ logInfo(
+ "\n---------------------------------------\n" +
+ "Killing streaming context after " + killWaitTime + " ms" +
+ "\n---------------------------------------\n"
+ )
+ if (ssc != null) {
+ ssc.stop()
+ MasterFailureTest.killed = true
+ MasterFailureTest.killCount += 1
+ }
+ logInfo("Killing thread finished normally")
+ } catch {
+ case ie: InterruptedException => logInfo("Killing thread interrupted")
+ case e: Exception => logWarning("Exception in killing thread", e)
+ }
+
+ }
+}
+
+
+/**
+ * Thread to generate input files periodically with the desired text.
+ */
+private[streaming]
+class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
+ extends Thread with Logging {
+ initLogging()
+
+ override def run() {
+ val localTestDir = Files.createTempDir()
+ var fs = testDir.getFileSystem(new Configuration())
+ val maxTries = 3
+ try {
+ Thread.sleep(5000) // To make sure that the streaming context has been fully set up
+ for (i <- 0 until input.size) {
+ // Write the data to a local file and then move it to the target test directory
+ val localFile = new File(localTestDir, (i+1).toString)
+ val hadoopFile = new Path(testDir, (i+1).toString)
+ FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
+ var tries = 0
+ var done = false
+ while (!done && tries < maxTries) {
+ tries += 1
+ try {
+ fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
+ done = true
+ } catch {
+ case ioe: IOException => {
+ fs = testDir.getFileSystem(new Configuration())
+ logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe)
+ }
+ }
+ }
+ if (!done)
+ logError("Could not generate file " + hadoopFile)
+ else
+ logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis)
+ Thread.sleep(interval)
+ localFile.delete()
+ }
+ logInfo("File generating thread finished normally")
+ } catch {
+ case ie: InterruptedException => logInfo("File generating thread interrupted")
+ case e: Exception => logWarning("File generating in killing thread", e)
+ } finally {
+ fs.close()
+ }
+ }
+}
+
+
diff --git a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
index db715cc295..8e10276deb 100644
--- a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
@@ -3,9 +3,9 @@ package spark.streaming.util
private[streaming]
class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) {
- val minPollTime = 25L
+ private val minPollTime = 25L
- val pollTime = {
+ private val pollTime = {
if (period / 10.0 > minPollTime) {
(period / 10.0).toLong
} else {
@@ -13,11 +13,20 @@ class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) =>
}
}
- val thread = new Thread() {
+ private val thread = new Thread() {
override def run() { loop }
}
- var nextTime = 0L
+ private var nextTime = 0L
+
+ def getStartTime(): Long = {
+ (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period
+ }
+
+ def getRestartTime(originalStartTime: Long): Long = {
+ val gap = clock.currentTime - originalStartTime
+ (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
+ }
def start(startTime: Long): Long = {
nextTime = startTime
@@ -26,21 +35,14 @@ class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) =>
}
def start(): Long = {
- val startTime = (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period
- start(startTime)
+ start(getStartTime())
}
- def restart(originalStartTime: Long): Long = {
- val gap = clock.currentTime - originalStartTime
- val newStartTime = (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
- start(newStartTime)
- }
-
- def stop() {
+ def stop() {
thread.interrupt()
}
- def loop() {
+ private def loop() {
try {
while (true) {
clock.waitTillTime(nextTime)
diff --git a/streaming/src/test/java/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 8c94e13e65..16bacffb92 100644
--- a/streaming/src/test/java/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -23,6 +23,7 @@ import spark.streaming.JavaCheckpointTestUtils;
import spark.streaming.dstream.KafkaPartitionKey;
import java.io.*;
+import java.text.Collator;
import java.util.*;
// The test suite itself is Serializable so that anonymous Function implementations can be
@@ -33,15 +34,18 @@ public class JavaAPISuite implements Serializable {
@Before
public void setUp() {
- ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock");
+ ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ ssc.checkpoint("checkpoint");
}
@After
public void tearDown() {
ssc.stop();
ssc = null;
+
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port");
+ System.clearProperty("spark.driver.port");
}
@Test
@@ -132,29 +136,6 @@ public class JavaAPISuite implements Serializable {
}
@Test
- public void testTumble() {
- List<List<Integer>> inputData = Arrays.asList(
- Arrays.asList(1,2,3),
- Arrays.asList(4,5,6),
- Arrays.asList(7,8,9),
- Arrays.asList(10,11,12),
- Arrays.asList(13,14,15),
- Arrays.asList(16,17,18));
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(1,2,3,4,5,6),
- Arrays.asList(7,8,9,10,11,12),
- Arrays.asList(13,14,15,16,17,18));
-
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream windowed = stream.tumble(new Duration(2000));
- JavaTestUtils.attachTestOutputStream(windowed);
- List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 6, 3);
-
- assertOrderInvariantEquals(expected, result);
- }
-
- @Test
public void testFilter() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("giants", "dodgers"),
@@ -581,50 +562,73 @@ public class JavaAPISuite implements Serializable {
}
@Test
- public void testCountByKey() {
- List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+ public void testCountByValue() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("hello", "world"),
+ Arrays.asList("hello", "moon"),
+ Arrays.asList("hello"));
List<List<Tuple2<String, Long>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<String, Long>("california", 2L),
- new Tuple2<String, Long>("new york", 2L)),
- Arrays.asList(
- new Tuple2<String, Long>("california", 2L),
- new Tuple2<String, Long>("new york", 2L)));
-
- JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
- ssc, inputData, 1);
- JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ Arrays.asList(
+ new Tuple2<String, Long>("hello", 1L),
+ new Tuple2<String, Long>("world", 1L)),
+ Arrays.asList(
+ new Tuple2<String, Long>("hello", 1L),
+ new Tuple2<String, Long>("moon", 1L)),
+ Arrays.asList(
+ new Tuple2<String, Long>("hello", 1L)));
- JavaPairDStream<String, Long> counted = pairStream.countByKey();
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Long> counted = stream.countByValue();
JavaTestUtils.attachTestOutputStream(counted);
- List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
Assert.assertEquals(expected, result);
}
@Test
public void testGroupByKeyAndWindow() {
- List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
- List<List<Tuple2<String, List<String>>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")),
- new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))),
- Arrays.asList(new Tuple2<String, List<String>>("california",
- Arrays.asList("sharks", "ducks", "dodgers", "giants")),
- new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders", "yankees", "mets"))),
- Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")),
- new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders"))));
+ List<List<Tuple2<String, List<Integer>>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3)),
+ new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 4))
+ ),
+ Arrays.asList(
+ new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3, 5, 5)),
+ new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 1, 3, 4))
+ ),
+ Arrays.asList(
+ new Tuple2<String, List<Integer>>("california", Arrays.asList(5, 5)),
+ new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 3))
+ )
+ );
- JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<String, List<String>> groupWindowed =
+ JavaPairDStream<String, List<Integer>> groupWindowed =
pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(groupWindowed);
- List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+ List<List<Tuple2<String, List<Integer>>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
- Assert.assertEquals(expected, result);
+ assert(result.size() == expected.size());
+ for (int i = 0; i < result.size(); i++) {
+ assert(convert(result.get(i)).equals(convert(expected.get(i))));
+ }
+ }
+
+ private HashSet<Tuple2<String, HashSet<Integer>>> convert(List<Tuple2<String, List<Integer>>> listOfTuples) {
+ List<Tuple2<String, HashSet<Integer>>> newListOfTuples = new ArrayList<Tuple2<String, HashSet<Integer>>>();
+ for (Tuple2<String, List<Integer>> tuple: listOfTuples) {
+ newListOfTuples.add(convert(tuple));
+ }
+ return new HashSet<Tuple2<String, HashSet<Integer>>>(newListOfTuples);
+ }
+
+ private Tuple2<String, HashSet<Integer>> convert(Tuple2<String, List<Integer>> tuple) {
+ return new Tuple2<String, HashSet<Integer>>(tuple._1(), new HashSet<Integer>(tuple._2()));
}
@Test
@@ -709,26 +713,28 @@ public class JavaAPISuite implements Serializable {
}
@Test
- public void testCountByKeyAndWindow() {
- List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+ public void testCountByValueAndWindow() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("hello", "world"),
+ Arrays.asList("hello", "moon"),
+ Arrays.asList("hello"));
List<List<Tuple2<String, Long>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<String, Long>("california", 2L),
- new Tuple2<String, Long>("new york", 2L)),
+ new Tuple2<String, Long>("hello", 1L),
+ new Tuple2<String, Long>("world", 1L)),
Arrays.asList(
- new Tuple2<String, Long>("california", 4L),
- new Tuple2<String, Long>("new york", 4L)),
+ new Tuple2<String, Long>("hello", 2L),
+ new Tuple2<String, Long>("world", 1L),
+ new Tuple2<String, Long>("moon", 1L)),
Arrays.asList(
- new Tuple2<String, Long>("california", 2L),
- new Tuple2<String, Long>("new york", 2L)));
+ new Tuple2<String, Long>("hello", 2L),
+ new Tuple2<String, Long>("moon", 1L)));
- JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(
ssc, inputData, 1);
- JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
JavaPairDStream<String, Long> counted =
- pairStream.countByKeyAndWindow(new Duration(2000), new Duration(1000));
+ stream.countByValueAndWindow(new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(counted);
List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -909,9 +915,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(1,4),
Arrays.asList(8,7));
-
File tempDir = Files.createTempDir();
- ssc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000));
+ ssc.checkpoint(tempDir.getAbsolutePath());
JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream letterCount = stream.map(new Function<String, Integer>() {
@@ -925,14 +930,16 @@ public class JavaAPISuite implements Serializable {
assertOrderInvariantEquals(expectedInitial, initialResult);
Thread.sleep(1000);
-
ssc.stop();
+
ssc = new JavaStreamingContext(tempDir.getAbsolutePath());
- ssc.start();
- List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 2);
- assertOrderInvariantEquals(expectedFinal, finalResult);
+ // Tweak to take into consideration that the last batch before failure
+ // will be re-processed after recovery
+ List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 3);
+ assertOrderInvariantEquals(expectedFinal, finalResult.subList(1, 3));
}
+
/** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD
@Test
public void testCheckpointofIndividualStream() throws InterruptedException {
@@ -969,9 +976,9 @@ public class JavaAPISuite implements Serializable {
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();
HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap();
- JavaDStream test1 = ssc.kafkaStream("localhost", 12345, "group", topics);
- JavaDStream test2 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets);
- JavaDStream test3 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets,
+ JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics);
+ JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, offsets);
+ JavaDStream test3 = ssc.kafkaStream("localhost:12345", "group", topics, offsets,
StorageLevel.MEMORY_AND_DISK());
}
diff --git a/streaming/src/test/java/JavaTestUtils.scala b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
index 56349837e5..52ea28732a 100644
--- a/streaming/src/test/java/JavaTestUtils.scala
+++ b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
@@ -57,6 +57,7 @@ trait JavaTestBase extends TestSuiteBase {
}
object JavaTestUtils extends JavaTestBase {
+ override def maxWaitTimeMillis = 20000
}
diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties
index edfa1243fa..59c445e63f 100644
--- a/streaming/src/test/resources/log4j.properties
+++ b/streaming/src/test/resources/log4j.properties
@@ -1,5 +1,6 @@
# Set everything to be logged to the file streaming/target/unit-tests.log
log4j.rootCategory=INFO, file
+# log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
log4j.appender.file.file=streaming/target/unit-tests.log
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index f73f9b1823..8fce91853c 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -6,8 +6,15 @@ import util.ManualClock
class BasicOperationsSuite extends TestSuiteBase {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+
override def framework() = "BasicOperationsSuite"
+ after {
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.driver.port")
+ }
+
test("map") {
val input = Seq(1 to 4, 5 to 8, 9 to 12)
testOperation(
@@ -17,7 +24,7 @@ class BasicOperationsSuite extends TestSuiteBase {
)
}
- test("flatmap") {
+ test("flatMap") {
val input = Seq(1 to 4, 5 to 8, 9 to 12)
testOperation(
input,
@@ -81,6 +88,23 @@ class BasicOperationsSuite extends TestSuiteBase {
)
}
+ test("count") {
+ testOperation(
+ Seq(1 to 1, 1 to 2, 1 to 3, 1 to 4),
+ (s: DStream[Int]) => s.count(),
+ Seq(Seq(1L), Seq(2L), Seq(3L), Seq(4L))
+ )
+ }
+
+ test("countByValue") {
+ testOperation(
+ Seq(1 to 1, Seq(1, 1, 1), 1 to 2, Seq(1, 1, 2, 2)),
+ (s: DStream[Int]) => s.countByValue(),
+ Seq(Seq((1, 1L)), Seq((1, 3L)), Seq((1, 1L), (2, 1L)), Seq((2, 2L), (1, 2L))),
+ true
+ )
+ }
+
test("mapValues") {
testOperation(
Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ),
@@ -160,6 +184,71 @@ class BasicOperationsSuite extends TestSuiteBase {
testOperation(inputData, updateStateOperation, outputData, true)
}
+ test("updateStateByKey - object lifecycle") {
+ val inputData =
+ Seq(
+ Seq("a","b"),
+ null,
+ Seq("a","c","a"),
+ Seq("c"),
+ null,
+ null
+ )
+
+ val outputData =
+ Seq(
+ Seq(("a", 1), ("b", 1)),
+ Seq(("a", 1), ("b", 1)),
+ Seq(("a", 3), ("c", 1)),
+ Seq(("a", 3), ("c", 2)),
+ Seq(("c", 2)),
+ Seq()
+ )
+
+ val updateStateOperation = (s: DStream[String]) => {
+ class StateObject(var counter: Int = 0, var expireCounter: Int = 0) extends Serializable
+
+ // updateFunc clears a state when a StateObject is seen without new values twice in a row
+ val updateFunc = (values: Seq[Int], state: Option[StateObject]) => {
+ val stateObj = state.getOrElse(new StateObject)
+ values.foldLeft(0)(_ + _) match {
+ case 0 => stateObj.expireCounter += 1 // no new values
+ case n => { // has new values, increment and reset expireCounter
+ stateObj.counter += n
+ stateObj.expireCounter = 0
+ }
+ }
+ stateObj.expireCounter match {
+ case 2 => None // seen twice with no new values, give it the boot
+ case _ => Option(stateObj)
+ }
+ }
+ s.map(x => (x, 1)).updateStateByKey[StateObject](updateFunc).mapValues(_.counter)
+ }
+
+ testOperation(inputData, updateStateOperation, outputData, true)
+ }
+
+ test("slice") {
+ val ssc = new StreamingContext("local[2]", "BasicOperationSuite", Seconds(1))
+ val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4))
+ val stream = new TestInputStream[Int](ssc, input, 2)
+ ssc.registerInputStream(stream)
+ stream.foreach(_ => {}) // Dummy output stream
+ ssc.start()
+ Thread.sleep(2000)
+ def getInputFromSlice(fromMillis: Long, toMillis: Long) = {
+ stream.slice(new Time(fromMillis), new Time(toMillis)).flatMap(_.collect()).toSet
+ }
+
+ assert(getInputFromSlice(0, 1000) == Set(1))
+ assert(getInputFromSlice(0, 2000) == Set(1, 2))
+ assert(getInputFromSlice(1000, 2000) == Set(1, 2))
+ assert(getInputFromSlice(2000, 4000) == Set(2, 3, 4))
+ ssc.stop()
+ Thread.sleep(1000)
+ }
+
test("forgetting of RDDs - map and window operations") {
assert(batchDuration === Seconds(1), "Batch duration has changed from 1 second")
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index 920388bba9..cac86deeaf 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -1,5 +1,6 @@
package spark.streaming
+import dstream.FileInputDStream
import spark.streaming.StreamingContext._
import java.io.File
import runtime.RichInt
@@ -7,17 +8,29 @@ import org.scalatest.BeforeAndAfter
import org.apache.commons.io.FileUtils
import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import util.{Clock, ManualClock}
+import scala.util.Random
+import com.google.common.io.Files
+
+/**
+ * This test suite tests the checkpointing functionality of DStreams -
+ * the checkpointing of a DStream's RDDs as well as the checkpointing of
+ * the whole DStream graph.
+ */
class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+
before {
FileUtils.deleteDirectory(new File(checkpointDir))
}
after {
-
if (ssc != null) ssc.stop()
FileUtils.deleteDirectory(new File(checkpointDir))
+
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.driver.port")
}
var ssc: StreamingContext = null
@@ -26,23 +39,18 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
override def batchDuration = Milliseconds(500)
- override def checkpointDir = "checkpoint"
-
- override def checkpointInterval = batchDuration
-
override def actuallyWait = true
- test("basic stream+rdd recovery") {
+ test("basic rdd checkpoints + dstream graph checkpoint recovery") {
assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 500 milliseconds")
- assert(checkpointInterval === batchDuration, "checkpointInterval for this test much be same as batchDuration")
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
val stateStreamCheckpointInterval = Seconds(1)
// this ensures checkpointing occurs at least once
- val firstNumBatches = (stateStreamCheckpointInterval / batchDuration) * 2
+ val firstNumBatches = (stateStreamCheckpointInterval / batchDuration).toLong * 2
val secondNumBatches = firstNumBatches
// Setup the streams
@@ -62,10 +70,10 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run till a time such that at least one RDD in the stream should have been checkpointed,
// then check whether some RDD has been checkpointed or not
ssc.start()
- runStreamsWithRealDelay(ssc, firstNumBatches)
- logInfo("Checkpoint data of state stream = \n[" + stateStream.checkpointData.rdds.mkString(",\n") + "]")
- assert(!stateStream.checkpointData.rdds.isEmpty, "No checkpointed RDDs in state stream before first failure")
- stateStream.checkpointData.rdds.foreach {
+ advanceTimeWithRealDelay(ssc, firstNumBatches)
+ logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData)
+ assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before first failure")
+ stateStream.checkpointData.checkpointFiles.foreach {
case (time, data) => {
val file = new File(data.toString)
assert(file.exists(), "Checkpoint file '" + file +"' for time " + time + " for state stream before first failure does not exist")
@@ -74,8 +82,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run till a further time such that previous checkpoint files in the stream would be deleted
// and check whether the earlier checkpoint files are deleted
- val checkpointFiles = stateStream.checkpointData.rdds.map(x => new File(x._2.toString))
- runStreamsWithRealDelay(ssc, secondNumBatches)
+ val checkpointFiles = stateStream.checkpointData.checkpointFiles.map(x => new File(x._2))
+ advanceTimeWithRealDelay(ssc, secondNumBatches)
checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
ssc.stop()
@@ -90,9 +98,9 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run one batch to generate a new checkpoint file and check whether some RDD
// is present in the checkpoint data or not
ssc.start()
- runStreamsWithRealDelay(ssc, 1)
- assert(!stateStream.checkpointData.rdds.isEmpty, "No checkpointed RDDs in state stream before second failure")
- stateStream.checkpointData.rdds.foreach {
+ advanceTimeWithRealDelay(ssc, 1)
+ assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure")
+ stateStream.checkpointData.checkpointFiles.foreach {
case (time, data) => {
val file = new File(data.toString)
assert(file.exists(),
@@ -111,13 +119,16 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Adjust manual clock time as if it is being restarted after a delay
System.setProperty("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
ssc.start()
- runStreamsWithRealDelay(ssc, 4)
+ advanceTimeWithRealDelay(ssc, 4)
ssc.stop()
System.clearProperty("spark.streaming.manualClock.jump")
ssc = null
}
- test("map and reduceByKey") {
+ // This tests whether the system can recover from a master failure with simple
+ // non-stateful operations. This assumes a reliable, replayable input
+ // source - TestInputDStream.
+ test("recovery with map and reduceByKey operations") {
testCheckpointedOperation(
Seq( Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq() ),
(s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _),
@@ -126,7 +137,11 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
)
}
- test("reduceByKeyAndWindowInv") {
+
+ // This tests whether the ReducedWindowedDStream's RDD checkpointing works correctly
+ // such that the system can recover from a master failure. This assumes a reliable,
+ // replayable input source - TestInputDStream.
+ test("recovery with invertible reduceByKeyAndWindow operation") {
val n = 10
val w = 4
val input = (1 to n).map(_ => Seq("a")).toSeq
@@ -139,7 +154,11 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
testCheckpointedOperation(input, operation, output, 7)
}
- test("updateStateByKey") {
+
+ // This tests whether the StateDStream's RDD checkpointing works correctly such
+ // that the system can recover from a master failure. This assumes a reliable,
+ // replayable input source - TestInputDStream.
+ test("recovery with updateStateByKey operation") {
val input = (1 to 10).map(_ => Seq("a")).toSeq
val output = (1 to 10).map(x => Seq(("a", x))).toSeq
val operation = (st: DStream[String]) => {
@@ -154,11 +173,126 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
testCheckpointedOperation(input, operation, output, 7)
}
+ // This tests whether the file input stream remembers what files were seen before
+ // the master failure and uses them again to process a large window operation.
+ // It also tests whether batches whose processing was incomplete due to the
+ // failure are re-processed or not.
+ test("recovery with file input stream") {
+ // Disable manual clock as FileInputDStream does not work with manual clock
+ val clockProperty = System.getProperty("spark.streaming.clock")
+ System.clearProperty("spark.streaming.clock")
+
+ // Set up the streaming context and input streams
+ val testDir = Files.createTempDir()
+ var ssc = new StreamingContext(master, framework, Seconds(1))
+ ssc.checkpoint(checkpointDir)
+ val fileStream = ssc.textFileStream(testDir.toString)
+ // Make value 3 take a long time to process, to ensure that the master
+ // shuts down in the middle of processing the 3rd batch
+ val mappedStream = fileStream.map(s => {
+ val i = s.toInt
+ if (i == 3) Thread.sleep(2000)
+ i
+ })
+
+ // Reducing over a large window to ensure that recovery from master failure
+ // requires reprocessing of all the files seen before the failure
+ val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
+ val outputBuffer = new ArrayBuffer[Seq[Int]]
+ var outputStream = new TestOutputStream(reducedStream, outputBuffer)
+ ssc.registerOutputStream(outputStream)
+ ssc.start()
+
+ // Create files and wait for the real-clock batches to process them
+ Thread.sleep(1000)
+ for (i <- Seq(1, 2, 3)) {
+ FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
+ // Wait to make sure the file is fully written, so that it shows up in directory listings
+ Thread.sleep(1000)
+ }
+ logInfo("Output = " + outputStream.output.mkString(","))
+ assert(outputStream.output.size > 0, "No files processed before restart")
+ ssc.stop()
+
+ // Verify whether files created have been recorded correctly or not
+ var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
+ def recordedFiles = fileInputDStream.files.values.flatMap(x => x)
+ assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+
+ // Create files while the master is down
+ for (i <- Seq(4, 5, 6)) {
+ FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
+ Thread.sleep(1000)
+ }
+
+ // Recover context from checkpoint file and verify whether the files that were
+ // recorded before failure were saved and successfully recovered
+ logInfo("*********** RESTARTING ************")
+ ssc = new StreamingContext(checkpointDir)
+ fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
+ assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+
+ // Restart stream computation
+ ssc.start()
+ for (i <- Seq(7, 8, 9)) {
+ FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
+ Thread.sleep(1000)
+ }
+ Thread.sleep(1000)
+ logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
+ assert(outputStream.output.size > 0, "No files processed after restart")
+ ssc.stop()
+
+ // Verify whether files created while the driver was down have been recorded or not
+ assert(!recordedFiles.filter(_.endsWith("4")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("5")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("6")).isEmpty)
+
+ // Verify whether new files created after recovery have been recorded or not
+ assert(!recordedFiles.filter(_.endsWith("7")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("8")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("9")).isEmpty)
+
+ // Append the new output to the old buffer
+ outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
+ outputBuffer ++= outputStream.output
+
+ val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
+ logInfo("--------------------------------")
+ logInfo("output, size = " + outputBuffer.size)
+ outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ logInfo("expected output, size = " + expectedOutput.size)
+ expectedOutput.foreach(x => logInfo("[" + x + "]"))
+ logInfo("--------------------------------")
+
+ // Verify whether all the elements received are as expected
+ val output = outputBuffer.flatMap(x => x)
+ assert(output.contains(6)) // To ensure that the 3rd input (i.e., 3) was processed
+ output.foreach(o => // To ensure all the inputs are correctly added cumulatively
+ assert(expectedOutput.contains(o), "Expected value " + o + " not found")
+ )
+ // To ensure that all the inputs were received correctly
+ assert(expectedOutput.last === output.last)
+
+ // Re-enable the manual clock for other tests
+ if (clockProperty != null)
+ System.setProperty("spark.streaming.clock", clockProperty)
+ }
+
+
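A quick worked check of expectedOutput above: the 30-second reduceByWindow covers every file written during the test, so the k-th emitted value is the running sum 1 + 2 + ... + k. For the nine files 1 through 9 that gives 1, 3, 6, 10, 15, 21, 28, 36, 45, and the final assertion that output.last equals expectedOutput.last (45) confirms every input was folded in exactly once.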
/**
- * Tests a streaming operation under checkpointing, by restart the operation
+ * Tests a streaming operation under checkpointing, by restarting the operation
* from checkpoint file and verifying whether the final output is correct.
 * The output is assumed to have come from a reliable queue which can replay
* data as required.
+ *
+ * NOTE: This takes into consideration that the last batch processed before
+ * master failure will be re-processed after restart/recovery.
*/
def testCheckpointedOperation[U: ClassManifest, V: ClassManifest](
input: Seq[Seq[U]],
@@ -172,11 +306,14 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
val totalNumBatches = input.size
val nextNumBatches = totalNumBatches - initialNumBatches
val initialNumExpectedOutputs = initialNumBatches
- val nextNumExpectedOutputs = expectedOutput.size - initialNumExpectedOutputs
+ val nextNumExpectedOutputs = expectedOutput.size - initialNumExpectedOutputs + 1
+ // because the last batch will be processed again
// Do the computation for initial number of batches, create checkpoint file and quit
ssc = setupStreams[U, V](input, operation)
- val output = runStreams[V](ssc, initialNumBatches, initialNumExpectedOutputs)
+ ssc.start()
+ val output = advanceTimeWithRealDelay[V](ssc, initialNumBatches)
+ ssc.stop()
verifyOutput[V](output, expectedOutput.take(initialNumBatches), true)
Thread.sleep(1000)
@@ -187,16 +324,20 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
"\n-------------------------------------------\n"
)
ssc = new StreamingContext(checkpointDir)
- val outputNew = runStreams[V](ssc, nextNumBatches, nextNumExpectedOutputs)
+ System.clearProperty("spark.driver.port")
+ ssc.start()
+ val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches)
+ // the first element will be the re-processed output of the last batch before restart
verifyOutput[V](outputNew, expectedOutput.takeRight(nextNumExpectedOutputs), true)
+ ssc.stop()
ssc = null
}
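To make the "+ 1" in nextNumExpectedOutputs concrete: with input.size = 10 and initialNumBatches = 7, the restarted context re-runs the last pre-failure batch and then the remaining three, so advanceTimeWithRealDelay is given nextNumBatches = 3 but verifyOutput compares against expectedOutput.takeRight((10 - 7) + 1), the last 4 expected outputs.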
/**
* Advances the manual clock on the streaming scheduler by given number of batches.
- * It also wait for the expected amount of time for each batch.
+ * It also waits for the expected amount of time for each batch.
*/
- def runStreamsWithRealDelay(ssc: StreamingContext, numBatches: Long) {
+ def advanceTimeWithRealDelay[V: ClassManifest](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
logInfo("Manual clock before advancing = " + clock.time)
for (i <- 1 to numBatches.toInt) {
@@ -205,6 +346,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
}
logInfo("Manual clock after advancing = " + clock.time)
Thread.sleep(batchDuration.milliseconds)
- }
+ val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
+ outputStream.output
+ }
}
\ No newline at end of file
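The loop body of advanceTimeWithRealDelay is elided between the last two hunks above; a sketch of what it plausibly contains, based on how ManualClock.addToTime is used in InputStreamsSuite below and on actuallyWait being true for this suite:

for (i <- 1 to numBatches.toInt) {
  clock.addToTime(batchDuration.milliseconds)   // jump the manual clock one batch forward
  Thread.sleep(batchDuration.milliseconds)      // give the batch real time, since actuallyWait is true
}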
diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
index 4aa428bf64..a5fa7ab92d 100644
--- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
@@ -1,188 +1,40 @@
package spark.streaming
-import org.scalatest.BeforeAndAfter
-import org.apache.commons.io.FileUtils
+import spark.Logging
+import spark.streaming.util.MasterFailureTest
+import StreamingContext._
+
+import org.scalatest.{FunSuite, BeforeAndAfter}
+import com.google.common.io.Files
import java.io.File
-import scala.runtime.RichInt
-import scala.util.Random
-import spark.streaming.StreamingContext._
+import org.apache.commons.io.FileUtils
import collection.mutable.ArrayBuffer
-import spark.Logging
+
/**
* This testsuite tests master failures at random times while the stream is running using
* the real clock.
*/
-class FailureSuite extends TestSuiteBase with BeforeAndAfter {
+class FailureSuite extends FunSuite with BeforeAndAfter with Logging {
+
+ var directory = "FailureSuite"
+ val numBatches = 30
+ val batchDuration = Milliseconds(1000)
before {
- FileUtils.deleteDirectory(new File(checkpointDir))
+ FileUtils.deleteDirectory(new File(directory))
}
after {
- FailureSuite.reset()
- FileUtils.deleteDirectory(new File(checkpointDir))
- }
-
- override def framework = "CheckpointSuite"
-
- override def batchDuration = Milliseconds(500)
-
- override def checkpointDir = "checkpoint"
-
- override def checkpointInterval = batchDuration
-
- test("multiple failures with updateStateByKey") {
- val n = 30
- // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
- val input = (1 to n).map(i => (1 to i).map(_ =>"a").toSeq).toSeq
- // Last output: [ (a, 465) ] for n=30
- val lastOutput = Seq( ("a", (1 to n).reduce(_ + _)) )
-
- val operation = (st: DStream[String]) => {
- val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
- Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
- }
- st.map(x => (x, 1))
- .updateStateByKey[RichInt](updateFunc)
- .checkpoint(Seconds(2))
- .map(t => (t._1, t._2.self))
- }
-
- testOperationWithMultipleFailures(input, operation, lastOutput, n, n)
+ FileUtils.deleteDirectory(new File(directory))
}
- test("multiple failures with reduceByKeyAndWindow") {
- val n = 30
- val w = 100
- assert(w > n, "Window should be much larger than the number of input sets in this test")
- // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
- val input = (1 to n).map(i => (1 to i).map(_ =>"a").toSeq).toSeq
- // Last output: [ (a, 465) ]
- val lastOutput = Seq( ("a", (1 to n).reduce(_ + _)) )
-
- val operation = (st: DStream[String]) => {
- st.map(x => (x, 1))
- .reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration)
- .checkpoint(Seconds(2))
- }
-
- testOperationWithMultipleFailures(input, operation, lastOutput, n, n)
+ test("multiple failures with map") {
+ MasterFailureTest.testMap(directory, numBatches, batchDuration)
}
-
- /**
- * Tests stream operation with multiple master failures, and verifies whether the
- * final set of output values is as expected or not. Checking the final value is
- * proof that no intermediate data was lost due to master failures.
- */
- def testOperationWithMultipleFailures[U: ClassManifest, V: ClassManifest](
- input: Seq[Seq[U]],
- operation: DStream[U] => DStream[V],
- lastExpectedOutput: Seq[V],
- numBatches: Int,
- numExpectedOutput: Int
- ) {
- var ssc = setupStreams[U, V](input, operation)
- val mergedOutput = new ArrayBuffer[Seq[V]]()
-
- var totalTimeRan = 0L
- while(totalTimeRan <= numBatches * batchDuration.milliseconds * 2) {
- new KillingThread(ssc, numBatches * batchDuration.milliseconds.toInt / 4).start()
- val (output, timeRan) = runStreamsWithRealClock[V](ssc, numBatches, numExpectedOutput)
-
- mergedOutput ++= output
- totalTimeRan += timeRan
- logInfo("New output = " + output)
- logInfo("Merged output = " + mergedOutput)
- logInfo("Total time spent = " + totalTimeRan)
- val sleepTime = Random.nextInt(numBatches * batchDuration.milliseconds.toInt / 8)
- logInfo(
- "\n-------------------------------------------\n" +
- " Restarting stream computation in " + sleepTime + " ms " +
- "\n-------------------------------------------\n"
- )
- Thread.sleep(sleepTime)
- FailureSuite.failed = false
- ssc = new StreamingContext(checkpointDir)
- }
- ssc.stop()
- ssc = null
-
- // Verify whether the last output is the expected one
- val lastOutput = mergedOutput(mergedOutput.lastIndexWhere(!_.isEmpty))
- assert(lastOutput.toSet === lastExpectedOutput.toSet)
- logInfo("Finished computation after " + FailureSuite.failureCount + " failures")
- }
-
- /**
- * Runs the streams set up in `ssc` on real clock until the expected max number of
- */
- def runStreamsWithRealClock[V: ClassManifest](
- ssc: StreamingContext,
- numBatches: Int,
- maxExpectedOutput: Int
- ): (Seq[Seq[V]], Long) = {
-
- System.clearProperty("spark.streaming.clock")
-
- assert(numBatches > 0, "Number of batches to run stream computation is zero")
- assert(maxExpectedOutput > 0, "Max expected outputs after " + numBatches + " is zero")
- logInfo("numBatches = " + numBatches + ", maxExpectedOutput = " + maxExpectedOutput)
-
- // Get the output buffer
- val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
- val output = outputStream.output
- val waitTime = (batchDuration.milliseconds * (numBatches.toDouble + 0.5)).toLong
- val startTime = System.currentTimeMillis()
-
- try {
- // Start computation
- ssc.start()
-
- // Wait until expected number of output items have been generated
- while (output.size < maxExpectedOutput && System.currentTimeMillis() - startTime < waitTime && !FailureSuite.failed) {
- logInfo("output.size = " + output.size + ", maxExpectedOutput = " + maxExpectedOutput)
- Thread.sleep(100)
- }
- } catch {
- case e: Exception => logInfo("Exception while running streams: " + e)
- } finally {
- ssc.stop()
- }
- val timeTaken = System.currentTimeMillis() - startTime
- logInfo("" + output.size + " sets of output generated in " + timeTaken + " ms")
- (output, timeTaken)
- }
-
-
-}
-
-object FailureSuite {
- var failed = false
- var failureCount = 0
-
- def reset() {
- failed = false
- failureCount = 0
+ test("multiple failures with updateStateByKey") {
+ MasterFailureTest.testUpdateStateByKey(directory, numBatches, batchDuration)
}
}
-class KillingThread(ssc: StreamingContext, maxKillWaitTime: Int) extends Thread with Logging {
- initLogging()
-
- override def run() {
- var minKillWaitTime = if (FailureSuite.failureCount == 0) 3000 else 1000 // to allow the first checkpoint
- val killWaitTime = minKillWaitTime + Random.nextInt(maxKillWaitTime)
- logInfo("Kill wait time = " + killWaitTime)
- Thread.sleep(killWaitTime.toLong)
- logInfo(
- "\n---------------------------------------\n" +
- "Killing streaming context after " + killWaitTime + " ms" +
- "\n---------------------------------------\n"
- )
- if (ssc != null) ssc.stop()
- FailureSuite.failed = true
- FailureSuite.failureCount += 1
- }
-}
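The deleted harness above is the best guide to what the extracted MasterFailureTest presumably does; compressed into a sketch (all names come from the removed code, not from MasterFailureTest itself):

// Run, kill at a random point, recover from checkpoint, repeat, then
// verify the merged output. KillingThread stops ssc after a random delay.
var ssc = setupStreams(input, operation)
val mergedOutput = new ArrayBuffer[Seq[V]]()
var totalTimeRan = 0L
while (totalTimeRan <= numBatches * batchDuration.milliseconds * 2) {
  new KillingThread(ssc, maxKillWaitTime).start()
  val (output, timeRan) = runStreamsWithRealClock(ssc, numBatches, numExpectedOutput)
  mergedOutput ++= output                          // keep everything seen so far
  totalTimeRan += timeRan
  ssc = new StreamingContext(checkpointDir)        // recover from the checkpoint
}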
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index e71ba6ddc1..7c1c2e1040 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -19,32 +19,24 @@ import org.apache.avro.ipc.specific.SpecificRequestor
import java.nio.ByteBuffer
import collection.JavaConversions._
import java.nio.charset.Charset
+import com.google.common.io.Files
class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
- val testPort = 9999
- var testServer: TestServer = null
- var testDir: File = null
-
override def checkpointDir = "checkpoint"
after {
- FileUtils.deleteDirectory(new File(checkpointDir))
- if (testServer != null) {
- testServer.stop()
- testServer = null
- }
- if (testDir != null && testDir.exists()) {
- FileUtils.deleteDirectory(testDir)
- testDir = null
- }
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.driver.port")
}
+
test("network input stream") {
// Start the server
- testServer = new TestServer(testPort)
+ val testPort = 9999
+ val testServer = new TestServer(testPort)
testServer.start()
// Set up the streaming context and input streams
@@ -90,46 +82,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
}
- test("network input stream with checkpoint") {
- // Start the server
- testServer = new TestServer(testPort)
- testServer.start()
-
- // Set up the streaming context and input streams
- var ssc = new StreamingContext(master, framework, batchDuration)
- ssc.checkpoint(checkpointDir, checkpointInterval)
- val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
- var outputStream = new TestOutputStream(networkStream, new ArrayBuffer[Seq[String]])
- ssc.registerOutputStream(outputStream)
- ssc.start()
-
- // Feed data to the server to send to the network receiver
- var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- for (i <- Seq(1, 2, 3)) {
- testServer.send(i.toString + "\n")
- Thread.sleep(100)
- clock.addToTime(batchDuration.milliseconds)
- }
- Thread.sleep(500)
- assert(outputStream.output.size > 0)
- ssc.stop()
-
- // Restart stream computation from checkpoint and feed more data to see whether
- // they are being received and processed
- logInfo("*********** RESTARTING ************")
- ssc = new StreamingContext(checkpointDir)
- ssc.start()
- clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- for (i <- Seq(4, 5, 6)) {
- testServer.send(i.toString + "\n")
- Thread.sleep(100)
- clock.addToTime(batchDuration.milliseconds)
- }
- Thread.sleep(500)
- outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]]
- assert(outputStream.output.size > 0)
- ssc.stop()
- }
test("flume input stream") {
// Set up the streaming context and input streams
@@ -143,7 +95,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
-
+ Thread.sleep(1000)
val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", 33333));
val client = SpecificRequestor.getClient(
classOf[AvroSourceProtocol], transceiver);
@@ -179,42 +131,33 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
}
- test("file input stream") {
- // Create a temporary directory
- testDir = {
- var temp = File.createTempFile(".temp.", Random.nextInt().toString)
- temp.delete()
- temp.mkdirs()
- logInfo("Created temp dir " + temp)
- temp
- }
+ test("file input stream") {
+ // Disable manual clock as FileInputDStream does not work with manual clock
+ System.clearProperty("spark.streaming.clock")
// Set up the streaming context and input streams
+ val testDir = Files.createTempDir()
val ssc = new StreamingContext(master, framework, batchDuration)
- val filestream = ssc.textFileStream(testDir.toString)
+ val fileStream = ssc.textFileStream(testDir.toString)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
def output = outputBuffer.flatMap(x => x)
- val outputStream = new TestOutputStream(filestream, outputBuffer)
+ val outputStream = new TestOutputStream(fileStream, outputBuffer)
ssc.registerOutputStream(outputStream)
ssc.start()
// Create files in the temporary directory so that Spark Streaming can read data from it
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
val expectedOutput = input.map(_.toString)
Thread.sleep(1000)
for (i <- 0 until input.size) {
- FileUtils.writeStringToFile(new File(testDir, i.toString), input(i).toString + "\n")
- Thread.sleep(500)
- clock.addToTime(batchDuration.milliseconds)
- //Thread.sleep(100)
+ val file = new File(testDir, i.toString)
+ FileUtils.writeStringToFile(file, input(i).toString + "\n")
+ logInfo("Created file " + file)
+ Thread.sleep(batchDuration.milliseconds)
+ Thread.sleep(1000)
}
val startTime = System.currentTimeMillis()
- /*while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
- logInfo("output.size = " + output.size + ", expectedOutput.size = " + expectedOutput.size)
- Thread.sleep(100)
- }*/
Thread.sleep(1000)
val timeTaken = System.currentTimeMillis() - startTime
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
@@ -223,75 +166,24 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Verify whether data received by Spark Streaming was as expected
logInfo("--------------------------------")
- logInfo("output.size = " + outputBuffer.size)
- logInfo("output")
+ logInfo("output, size = " + outputBuffer.size)
outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
- logInfo("expected output.size = " + expectedOutput.size)
- logInfo("expected output")
+ logInfo("expected output, size = " + expectedOutput.size)
expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("--------------------------------")
// Verify whether all the elements received are as expected
// (whether the elements were received one in each interval is not verified)
- assert(output.size === expectedOutput.size)
- for (i <- 0 until output.size) {
- assert(output(i).size === 1)
- assert(output(i).head.toString === expectedOutput(i))
- }
- }
+ assert(output.toList === expectedOutput.toList)
- test("file input stream with checkpoint") {
- // Create a temporary directory
- testDir = {
- var temp = File.createTempFile(".temp.", Random.nextInt().toString)
- temp.delete()
- temp.mkdirs()
- logInfo("Created temp dir " + temp)
- temp
- }
+ FileUtils.deleteDirectory(testDir)
- // Set up the streaming context and input streams
- var ssc = new StreamingContext(master, framework, batchDuration)
- ssc.checkpoint(checkpointDir, checkpointInterval)
- val filestream = ssc.textFileStream(testDir.toString)
- var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]])
- ssc.registerOutputStream(outputStream)
- ssc.start()
-
- // Create files and advance manual clock to process them
- var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- Thread.sleep(1000)
- for (i <- Seq(1, 2, 3)) {
- FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
- Thread.sleep(100)
- clock.addToTime(batchDuration.milliseconds)
- }
- Thread.sleep(500)
- logInfo("Output = " + outputStream.output.mkString(","))
- assert(outputStream.output.size > 0)
- ssc.stop()
-
- // Restart stream computation from checkpoint and create more files to see whether
- // they are being processed
- logInfo("*********** RESTARTING ************")
- ssc = new StreamingContext(checkpointDir)
- ssc.start()
- clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- Thread.sleep(500)
- for (i <- Seq(4, 5, 6)) {
- FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
- Thread.sleep(100)
- clock.addToTime(batchDuration.milliseconds)
- }
- Thread.sleep(500)
- outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]]
- logInfo("Output = " + outputStream.output.mkString(","))
- assert(outputStream.output.size > 0)
- ssc.stop()
+ // Re-enable the manual clock for other tests
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
}
}
-
+/** This is a server to test the network input stream */
class TestServer(port: Int) extends Logging {
val queue = new ArrayBlockingQueue[String](100)
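Only TestServer's queue is visible here; the rest of the class falls outside this hunk. A hypothetical body consistent with how the tests call start(), send() and stop() (the socket plumbing below is assumed, not taken from the diff):

def start() {
  new Thread() {
    override def run() {
      val client = new java.net.ServerSocket(port).accept()  // one test client
      val out = new java.io.PrintWriter(client.getOutputStream)
      while (true) {
        out.print(queue.take())  // send() enqueues complete lines such as "1\n"
        out.flush()
      }
    }
  }.start()
}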
diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
index a76f61d4ad..ad6aa79d10 100644
--- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
@@ -10,7 +10,7 @@ import collection.mutable.SynchronizedBuffer
import java.io.{ObjectInputStream, IOException}
-import org.scalatest.FunSuite
+import org.scalatest.{BeforeAndAfter, FunSuite}
/**
 * This is an input stream just for the testsuites. This is equivalent to a checkpointable,
@@ -28,6 +28,11 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[
logInfo("Computing RDD for time " + validTime)
val index = ((validTime - zeroTime) / slideDuration - 1).toInt
val selectedInput = if (index < input.size) input(index) else Seq[T]()
+
+ // lets us test cases where RDDs are not created
+ if (selectedInput == null)
+ return None
+
val rdd = ssc.sc.makeRDD(selectedInput, numPartitions)
logInfo("Created RDD " + rdd.id + " with " + selectedInput)
Some(rdd)
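The null guard added above lets a test slot yield no RDD at all for a batch; a brief usage sketch (the input literal is illustrative, and the third constructor argument is assumed to be the partition count, as numPartitions in compute suggests):

// The second slot makes compute() return None instead of throwing an NPE:
val input = Seq(Seq("a", "b"), null, Seq("c"))
val inputStream = new TestInputStream(ssc, input, 2)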
@@ -56,22 +61,27 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBu
* This is the base trait for Spark Streaming testsuites. This provides basic functionality
 * to run a user-defined set of input on user-defined stream operations, and verify the output.
*/
-trait TestSuiteBase extends FunSuite with Logging {
+trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
+ // Name of the framework for Spark context
def framework = "TestSuiteBase"
+ // Master for Spark context
def master = "local[2]"
+ // Batch duration
def batchDuration = Seconds(1)
- def checkpointDir = null.asInstanceOf[String]
-
- def checkpointInterval = batchDuration
+ // Directory where the checkpoint data will be saved
+ def checkpointDir = "checkpoint"
+ // Number of partitions of the input parallel collections created for testing
def numInputPartitions = 2
+ // Maximum time to wait before the test times out
def maxWaitTimeMillis = 10000
+ // Whether to actually wait in real time before changing manual clock
def actuallyWait = false
/**
@@ -86,7 +96,7 @@ trait TestSuiteBase extends FunSuite with Logging {
// Create StreamingContext
val ssc = new StreamingContext(master, framework, batchDuration)
if (checkpointDir != null) {
- ssc.checkpoint(checkpointDir, checkpointInterval)
+ ssc.checkpoint(checkpointDir)
}
// Setup the stream computation
@@ -111,7 +121,7 @@ trait TestSuiteBase extends FunSuite with Logging {
// Create StreamingContext
val ssc = new StreamingContext(master, framework, batchDuration)
if (checkpointDir != null) {
- ssc.checkpoint(checkpointDir, checkpointInterval)
+ ssc.checkpoint(checkpointDir)
}
// Setup the stream computation
@@ -135,9 +145,6 @@ trait TestSuiteBase extends FunSuite with Logging {
numBatches: Int,
numExpectedOutput: Int
): Seq[Seq[V]] = {
-
- System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
-
assert(numBatches > 0, "Number of batches to run stream computation is zero")
assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero")
logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput)
@@ -181,7 +188,6 @@ trait TestSuiteBase extends FunSuite with Logging {
} finally {
ssc.stop()
}
-
output
}
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index f9ba1f20f0..1b66f3bda2 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -5,12 +5,19 @@ import collection.mutable.ArrayBuffer
class WindowOperationsSuite extends TestSuiteBase {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+
override def framework = "WindowOperationsSuite"
override def maxWaitTimeMillis = 20000
override def batchDuration = Seconds(1)
+ after {
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.driver.port")
+ }
+
val largerSlideInput = Seq(
Seq(("a", 1)),
Seq(("a", 2)), // 1st window from here
@@ -77,12 +84,9 @@ class WindowOperationsSuite extends TestSuiteBase {
)
/*
- The output of the reduceByKeyAndWindow with inverse reduce function is
- different from the naive reduceByKeyAndWindow. Even if the count of a
- particular key is 0, the key does not get eliminated from the RDDs of
- ReducedWindowedDStream. This causes the number of keys in these RDDs to
- increase forever. A more generalized version that allows elimination of
- keys should be considered.
+ The output of the reduceByKeyAndWindow with inverse function but without a filter
+ function will be different from the naive reduceByKeyAndWindow, as no keys get
+ eliminated from the ReducedWindowedDStream even if the value of a key becomes 0.
*/
val bigReduceInvOutput = Seq(
@@ -170,31 +174,31 @@ class WindowOperationsSuite extends TestSuiteBase {
// Testing reduceByKeyAndWindow (with invertible reduce function)
- testReduceByKeyAndWindowInv(
+ testReduceByKeyAndWindowWithInverse(
"basic reduction",
Seq(Seq(("a", 1), ("a", 3)) ),
Seq(Seq(("a", 4)) )
)
- testReduceByKeyAndWindowInv(
+ testReduceByKeyAndWindowWithInverse(
"key already in window and new value added into window",
Seq( Seq(("a", 1)), Seq(("a", 1)) ),
Seq( Seq(("a", 1)), Seq(("a", 2)) )
)
- testReduceByKeyAndWindowInv(
+ testReduceByKeyAndWindowWithInverse(
"new key added into window",
Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)) ),
Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)) )
)
- testReduceByKeyAndWindowInv(
+ testReduceByKeyAndWindowWithInverse(
"key removed from window",
Seq( Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq() ),
Seq( Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq(("a", 0)) )
)
- testReduceByKeyAndWindowInv(
+ testReduceByKeyAndWindowWithInverse(
"larger slide time",
largerSlideInput,
largerSlideReduceOutput,
@@ -202,7 +206,9 @@ class WindowOperationsSuite extends TestSuiteBase {
Seconds(2)
)
- testReduceByKeyAndWindowInv("big test", bigInput, bigReduceInvOutput)
+ testReduceByKeyAndWindowWithInverse("big test", bigInput, bigReduceInvOutput)
+
+ testReduceByKeyAndWindowWithFilteredInverse("big test", bigInput, bigReduceOutput)
test("groupByKeyAndWindow") {
val input = bigInput
@@ -230,14 +236,14 @@ class WindowOperationsSuite extends TestSuiteBase {
testOperation(input, operation, expectedOutput, numBatches, true)
}
- test("countByKeyAndWindow") {
- val input = Seq(Seq(("a", 1)), Seq(("b", 1), ("b", 2)), Seq(("a", 10), ("b", 20)))
+ test("countByValueAndWindow") {
+ val input = Seq(Seq("a"), Seq("b", "b"), Seq("a", "b"))
val expectedOutput = Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 2)), Seq(("a", 1), ("b", 3)))
val windowDuration = Seconds(2)
val slideDuration = Seconds(1)
val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
- val operation = (s: DStream[(String, Int)]) => {
- s.countByKeyAndWindow(windowDuration, slideDuration).map(x => (x._1, x._2.toInt))
+ val operation = (s: DStream[String]) => {
+ s.countByValueAndWindow(windowDuration, slideDuration).map(x => (x._1, x._2.toInt))
}
testOperation(input, operation, expectedOutput, numBatches, true)
}
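A worked reading of the expected counts above: with a 2-second window over 1-second batches, the window at time 2 covers Seq("a") ++ Seq("b", "b"), giving ("a", 1) and ("b", 2); at time 3 it covers Seq("b", "b") ++ Seq("a", "b"), giving ("a", 1) and ("b", 3), exactly matching expectedOutput.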
@@ -267,29 +273,50 @@ class WindowOperationsSuite extends TestSuiteBase {
slideDuration: Duration = Seconds(1)
) {
test("reduceByKeyAndWindow - " + name) {
+ logInfo("reduceByKeyAndWindow - " + name)
val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
- s.reduceByKeyAndWindow(_ + _, windowDuration, slideDuration).persist()
+ s.reduceByKeyAndWindow((x: Int, y: Int) => x + y, windowDuration, slideDuration)
}
testOperation(input, operation, expectedOutput, numBatches, true)
}
}
- def testReduceByKeyAndWindowInv(
+ def testReduceByKeyAndWindowWithInverse(
name: String,
input: Seq[Seq[(String, Int)]],
expectedOutput: Seq[Seq[(String, Int)]],
windowDuration: Duration = Seconds(2),
slideDuration: Duration = Seconds(1)
) {
- test("reduceByKeyAndWindowInv - " + name) {
+ test("reduceByKeyAndWindow with inverse function - " + name) {
+ logInfo("reduceByKeyAndWindow with inverse function - " + name)
val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
s.reduceByKeyAndWindow(_ + _, _ - _, windowDuration, slideDuration)
- .persist()
.checkpoint(Seconds(100)) // Large value to avoid effect of RDD checkpointing
}
testOperation(input, operation, expectedOutput, numBatches, true)
}
}
+
+ def testReduceByKeyAndWindowWithFilteredInverse(
+ name: String,
+ input: Seq[Seq[(String, Int)]],
+ expectedOutput: Seq[Seq[(String, Int)]],
+ windowDuration: Duration = Seconds(2),
+ slideDuration: Duration = Seconds(1)
+ ) {
+ test("reduceByKeyAndWindow with inverse and filter functions - " + name) {
+ logInfo("reduceByKeyAndWindow with inverse and filter functions - " + name)
+ val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
+ val filterFunc = (p: (String, Int)) => p._2 != 0
+ val operation = (s: DStream[(String, Int)]) => {
+ s.reduceByKeyAndWindow(_ + _, _ - _, windowDuration, slideDuration, filterFunc = filterFunc)
+ .persist()
+ .checkpoint(Seconds(100)) // Large value to avoid effect of RDD checkpointing
+ }
+ testOperation(input, operation, expectedOutput, numBatches, true)
+ }
+ }
}
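A closing usage sketch of the filterFunc parameter exercised above (the pairs stream is illustrative; the named-argument call mirrors the one in testReduceByKeyAndWindowWithFilteredInverse):

// Dropping keys whose windowed value falls to 0 keeps the key set of the
// ReducedWindowedDStream from growing without bound:
val windowed = pairs.reduceByKeyAndWindow(
  _ + _, _ - _, Seconds(2), Seconds(1),
  filterFunc = (kv: (String, Int)) => kv._2 != 0
)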