author     Denny <dennybritz@gmail.com>  2012-11-13 13:16:18 -0800
committer  Denny <dennybritz@gmail.com>  2012-11-13 13:16:18 -0800
commit     2aceae25be0562726a24b1b9e596e975f764cf30 (patch)
tree       a0cb1c645e2f00ea65ab1b35230602790bc66fe0 /streaming/src
parent     b6f7ba813e93916dad9dbb0f06819362a5fb7cf7 (diff)
parent     26fec8f0b850e7eb0b6cfe63770f2e68cd50441b (diff)
Merge branch 'dev' into kafka
Conflicts:
    streaming/src/main/scala/spark/streaming/DStream.scala
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/spark/streaming/Checkpoint.scala              | 73
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala                  | 12
-rw-r--r--  streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala   |  2
-rw-r--r--  streaming/src/main/scala/spark/streaming/Scheduler.scala                | 28
-rw-r--r--  streaming/src/main/scala/spark/streaming/StateDStream.scala             | 15
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala         | 33
-rw-r--r--  streaming/src/test/resources/log4j.properties                           |  2
-rw-r--r--  streaming/src/test/scala/spark/streaming/CheckpointSuite.scala          | 25
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala        | 88
9 files changed, 155 insertions(+), 123 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index a70fb8f73a..770f7b0cc0 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -5,7 +5,7 @@ import spark.{Logging, Utils}
import org.apache.hadoop.fs.{FileUtil, Path}
import org.apache.hadoop.conf.Configuration
-import java.io.{InputStream, ObjectStreamClass, ObjectInputStream, ObjectOutputStream}
+import java.io._
class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
@@ -18,8 +18,6 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
val checkpointDir = ssc.checkpointDir
val checkpointInterval = ssc.checkpointInterval
- validate()
-
def validate() {
assert(master != null, "Checkpoint.master is null")
assert(framework != null, "Checkpoint.framework is null")
@@ -27,35 +25,50 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
assert(checkpointTime != null, "Checkpoint.checkpointTime is null")
logInfo("Checkpoint for time " + checkpointTime + " validated")
}
+}
- def save(path: String) {
- val file = new Path(path, "graph")
- val conf = new Configuration()
- val fs = file.getFileSystem(conf)
- logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
- if (fs.exists(file)) {
- val bkFile = new Path(file.getParent, file.getName + ".bk")
- FileUtil.copy(fs, file, fs, bkFile, true, true, conf)
- logDebug("Moved existing checkpoint file to " + bkFile)
+/**
+ * Convenience class to speed up the writing of the graph checkpoint to a file
+ */
+class CheckpointWriter(checkpointDir: String) extends Logging {
+ val file = new Path(checkpointDir, "graph")
+ val conf = new Configuration()
+ var fs = file.getFileSystem(conf)
+ val maxAttempts = 3
+
+ def write(checkpoint: Checkpoint) {
+ // TODO: maybe do this in a different thread from the main stream execution thread
+ var attempts = 0
+ while (attempts < maxAttempts) {
+ attempts += 1
+ try {
+ logDebug("Saving checkpoint for time " + checkpoint.checkpointTime + " to file '" + file + "'")
+ if (fs.exists(file)) {
+ val bkFile = new Path(file.getParent, file.getName + ".bk")
+ FileUtil.copy(fs, file, fs, bkFile, true, true, conf)
+ logDebug("Moved existing checkpoint file to " + bkFile)
+ }
+ val fos = fs.create(file)
+ val oos = new ObjectOutputStream(fos)
+ oos.writeObject(checkpoint)
+ oos.close()
+ logInfo("Checkpoint for time " + checkpoint.checkpointTime + " saved to file '" + file + "'")
+ fos.close()
+ return
+ } catch {
+ case ioe: IOException =>
+ logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe)
+ }
}
- val fos = fs.create(file)
- val oos = new ObjectOutputStream(fos)
- oos.writeObject(this)
- oos.close()
- fs.close()
- logInfo("Checkpoint of streaming context for time " + checkpointTime + " saved successfully to file '" + file + "'")
- }
-
- def toBytes(): Array[Byte] = {
- val bytes = Utils.serialize(this)
- bytes
+ logError("Could not write checkpoint for time " + checkpoint.checkpointTime + " to file '" + file + "'")
}
}
-object Checkpoint extends Logging {
- def load(path: String): Checkpoint = {
+object CheckpointReader extends Logging {
+
+ def read(path: String): Checkpoint = {
val fs = new Path(path).getFileSystem(new Configuration())
val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk"))
@@ -82,17 +95,11 @@ object Checkpoint extends Logging {
logError("Error loading checkpoint from file '" + file + "'", e)
}
} else {
- logWarning("Could not load checkpoint from file '" + file + "' as it does not exist")
+ logWarning("Could not read checkpoint from file '" + file + "' as it does not exist")
}
})
- throw new Exception("Could not load checkpoint from path '" + path + "'")
- }
-
- def fromBytes(bytes: Array[Byte]): Checkpoint = {
- val cp = Utils.deserialize[Checkpoint](bytes)
- cp.validate()
- cp
+ throw new Exception("Could not read checkpoint from path '" + path + "'")
}
}
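
Taken together, this hunk moves serialization out of Checkpoint itself: writes now go through the reusable CheckpointWriter (which keeps a ".bk" copy of the previous file and retries up to maxAttempts times on IOException), and reads go through CheckpointReader. A minimal usage sketch based on the classes in this diff; the directory path and the currentTime value are placeholders:

    // Writing: one CheckpointWriter per scheduler, reused at every checkpoint interval.
    val writer = new CheckpointWriter("/tmp/streaming-ckpt")   // hypothetical directory
    writer.write(new Checkpoint(ssc, currentTime))

    // Reading: rebuild a StreamingContext from the same directory after a restart.
    val cp = CheckpointReader.read("/tmp/streaming-ckpt")
    val recoveredSsc = new StreamingContext(cp)
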
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index e8bbf7d1c0..1235ce3e4d 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -233,7 +233,7 @@ extends Serializable with Logging {
}
if (checkpointInterval != null && (time - zeroTime).isMultipleOf(checkpointInterval)) {
newRDD.checkpoint()
- logInfo("Marking RDD for time " + time + " for checkpointing at time " + time)
+ logInfo("Marking RDD " + newRDD + " for time " + time + " for checkpointing at time " + time)
}
generatedRDDs.put(time, newRDD)
Some(newRDD)
@@ -300,6 +300,9 @@ extends Serializable with Logging {
* this method to save custom checkpoint data.
*/
protected[streaming] def updateCheckpointData(currentTime: Time) {
+
+ logInfo("Updating checkpoint data for time " + currentTime)
+
// Get the checkpointed RDDs from the generated RDDs
val newRdds = generatedRDDs.filter(_._2.getCheckpointData() != null)
.map(x => (x._1, x._2.getCheckpointData()))
@@ -342,8 +345,11 @@ extends Serializable with Logging {
logInfo("Restoring checkpoint data from " + checkpointData.rdds.size + " checkpointed RDDs")
checkpointData.rdds.foreach {
case(time, data) => {
- logInfo("Restoring checkpointed RDD for time " + time + " from file")
- generatedRDDs += ((time, ssc.sc.objectFile[T](data.toString)))
+ logInfo("Restoring checkpointed RDD for time " + time + " from file '" + data.toString + "'")
+ val rdd = ssc.sc.objectFile[T](data.toString)
+ // Set the checkpoint file name so that updateCheckpointData() identifies this RDD as a checkpointed RDD
+ rdd.checkpointFile = data.toString
+ generatedRDDs += ((time, rdd))
}
}
dependencies.foreach(_.restoreCheckpointData())
diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
index b07d51fa6b..8b484e6acf 100644
--- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
@@ -57,7 +57,7 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
override def checkpoint(interval: Time): DStream[(K, V)] = {
super.checkpoint(interval)
- reducedStream.checkpoint(interval)
+ //reducedStream.checkpoint(interval)
this
}
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index de0fb1f3ad..e2dca91179 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -16,8 +16,16 @@ extends Logging {
initLogging()
val graph = ssc.graph
+
val concurrentJobs = System.getProperty("spark.stream.concurrentJobs", "1").toInt
val jobManager = new JobManager(ssc, concurrentJobs)
+
+ val checkpointWriter = if (ssc.checkpointInterval != null && ssc.checkpointDir != null) {
+ new CheckpointWriter(ssc.checkpointDir)
+ } else {
+ null
+ }
+
val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock")
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
val timer = new RecurringTimer(clock, ssc.graph.batchDuration, generateRDDs(_))
@@ -52,19 +60,23 @@ extends Logging {
logInfo("Scheduler stopped")
}
- def generateRDDs(time: Time) {
+ private def generateRDDs(time: Time) {
SparkEnv.set(ssc.env)
logInfo("\n-----------------------------------------------------\n")
- graph.generateRDDs(time).foreach(submitJob)
- logInfo("Generated RDDs for time " + time)
+ graph.generateRDDs(time).foreach(jobManager.runJob)
graph.forgetOldRDDs(time)
- if (ssc.checkpointInterval != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointInterval)) {
- ssc.doCheckpoint(time)
- }
+ doCheckpoint(time)
+ logInfo("Generated RDDs for time " + time)
}
- def submitJob(job: Job) {
- jobManager.runJob(job)
+ private def doCheckpoint(time: Time) {
+ if (ssc.checkpointInterval != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointInterval)) {
+ val startTime = System.currentTimeMillis()
+ ssc.graph.updateCheckpointData(time)
+ checkpointWriter.write(new Checkpoint(ssc, time))
+ val stopTime = System.currentTimeMillis()
+ logInfo("Checkpointing the graph took " + (stopTime - startTime) + " ms")
+ }
}
}
diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/StateDStream.scala
index cb261808f5..b7e4c1c30c 100644
--- a/streaming/src/main/scala/spark/streaming/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/StateDStream.scala
@@ -7,20 +7,11 @@ import spark.rdd.MapPartitionsRDD
import spark.SparkContext._
import spark.storage.StorageLevel
-
-class StateRDD[U: ClassManifest, T: ClassManifest](
- prev: RDD[T],
- f: Iterator[T] => Iterator[U],
- rememberPartitioner: Boolean
- ) extends MapPartitionsRDD[U, T](prev, f) {
- override val partitioner = if (rememberPartitioner) prev.partitioner else None
-}
-
class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
parent: DStream[(K, V)],
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,
- rememberPartitioner: Boolean
+ preservePartitioning: Boolean
) extends DStream[(K, S)](parent.ssc) {
super.persist(StorageLevel.MEMORY_ONLY_SER)
@@ -53,7 +44,7 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife
updateFuncLocal(i)
}
val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
- val stateRDD = new StateRDD(cogroupedRDD, finalFunc, rememberPartitioner)
+ val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
//logDebug("Generating state RDD for time " + validTime)
return Some(stateRDD)
}
@@ -78,7 +69,7 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife
}
val groupedRDD = parentRDD.groupByKey(partitioner)
- val sessionRDD = new StateRDD(groupedRDD, finalFunc, rememberPartitioner)
+ val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning)
//logDebug("Generating state RDD for time " + validTime + " (first)")
return Some(sessionRDD)
}
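
The dedicated StateRDD subclass is no longer needed because RDD.mapPartitions can keep the parent's partitioner when told that the function does not change keys. A standalone sketch of that technique; it uses the current org.apache.spark package names rather than the plain spark package of this tree, and all values are illustrative:

    import org.apache.spark.{HashPartitioner, SparkContext}

    val sc = new SparkContext("local[2]", "preserve-partitioning-sketch")
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
                  .partitionBy(new HashPartitioner(4))

    // preservesPartitioning = true keeps the HashPartitioner on the result,
    // so a later cogroup/groupByKey against it avoids a reshuffle.
    val updated = pairs.mapPartitions(
      iter => iter.map { case (k, v) => (k, v + 1) },
      preservesPartitioning = true)

    assert(updated.partitioner == pairs.partitioner)
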
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 4cba525f86..5e11e6d734 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -28,7 +28,7 @@ final class StreamingContext (
def this(master: String, frameworkName: String, sparkHome: String = null, jars: Seq[String] = Nil) =
this(new SparkContext(master, frameworkName, sparkHome, jars), null)
- def this(path: String) = this(null, Checkpoint.load(path))
+ def this(path: String) = this(null, CheckpointReader.read(path))
def this(cp_ : Checkpoint) = this(null, cp_)
@@ -85,7 +85,7 @@ final class StreamingContext (
graph.setRememberDuration(duration)
}
- def checkpoint(dir: String, interval: Time) {
+ def checkpoint(dir: String, interval: Time = null) {
if (dir != null) {
sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(dir))
checkpointDir = dir
@@ -211,12 +211,29 @@ final class StreamingContext (
graph.addOutputStream(outputStream)
}
+ def validate() {
+ assert(graph != null, "Graph is null")
+ graph.validate()
+
+ assert(
+ checkpointDir == null || checkpointInterval != null,
+ "Checkpoint directory has been set, but the graph checkpointing interval has " +
+ "not been set. Please use StreamingContext.checkpoint() to set the interval."
+ )
+
+
+ }
+
+
/**
* This function starts the execution of the streams.
*/
def start() {
- assert(graph != null, "Graph is null")
- graph.validate()
+ if (checkpointDir != null && checkpointInterval == null && graph != null) {
+ checkpointInterval = graph.batchDuration
+ }
+
+ validate()
val networkInputStreams = graph.getInputStreams().filter(s => s match {
case n: NetworkInputDStream[_] => true
@@ -250,14 +267,6 @@ final class StreamingContext (
case e: Exception => logWarning("Error while stopping", e)
}
}
-
- def doCheckpoint(currentTime: Time) {
- val startTime = System.currentTimeMillis()
- graph.updateCheckpointData(currentTime)
- new Checkpoint(this, currentTime).save(checkpointDir)
- val stopTime = System.currentTimeMillis()
- logInfo("Checkpointing the graph took " + (stopTime - startTime) + " ms")
- }
}
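
With the default argument on checkpoint(), the interval may now be left out; if it is still null when start() runs, it falls back to the graph's batch duration before validate() checks the configuration. A short driver-side sketch using only the constructors and methods visible in this diff; the directory is a placeholder:

    val ssc = new StreamingContext("local[2]", "checkpoint-example")
    ssc.setBatchDuration(Seconds(1))

    // No interval given: it defaults to the batch duration when start() is called.
    ssc.checkpoint("/tmp/streaming-ckpt")

    // Equivalent explicit form:
    // ssc.checkpoint("/tmp/streaming-ckpt", Seconds(1))

    ssc.start()
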
diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties
index 02fe16866e..33774b463d 100644
--- a/streaming/src/test/resources/log4j.properties
+++ b/streaming/src/test/resources/log4j.properties
@@ -1,5 +1,5 @@
# Set everything to be logged to the console
-log4j.rootCategory=WARN, console
+log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index 0bcf207082..0d82b2f1ea 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -24,7 +24,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
override def framework = "CheckpointSuite"
- override def batchDuration = Milliseconds(200)
+ override def batchDuration = Milliseconds(500)
override def checkpointDir = "checkpoint"
@@ -34,7 +34,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
test("basic stream+rdd recovery") {
- assert(batchDuration === Milliseconds(200), "batchDuration for this test must be 1 second")
+ assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 500 milliseconds")
assert(checkpointInterval === batchDuration, "checkpointInterval for this test must be the same as batchDuration")
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
@@ -134,9 +134,9 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
val operation = (st: DStream[String]) => {
st.map(x => (x, 1))
.reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration)
- .checkpoint(Seconds(2))
+ .checkpoint(batchDuration * 2)
}
- testCheckpointedOperation(input, operation, output, 3)
+ testCheckpointedOperation(input, operation, output, 7)
}
test("updateStateByKey") {
@@ -148,14 +148,18 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
}
st.map(x => (x, 1))
.updateStateByKey[RichInt](updateFunc)
- .checkpoint(Seconds(2))
+ .checkpoint(batchDuration * 2)
.map(t => (t._1, t._2.self))
}
- testCheckpointedOperation(input, operation, output, 3)
+ testCheckpointedOperation(input, operation, output, 7)
}
-
-
+ /**
+ * Tests a streaming operation under checkpointing, by restarting the operation
+ * from the checkpoint file and verifying whether the final output is correct.
+ * The output is assumed to have come from a reliable queue which can replay
+ * data as required.
+ */
def testCheckpointedOperation[U: ClassManifest, V: ClassManifest](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
@@ -170,8 +174,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
val initialNumExpectedOutputs = initialNumBatches
val nextNumExpectedOutputs = expectedOutput.size - initialNumExpectedOutputs
- // Do half the computation (half the number of batches), create checkpoint file and quit
-
+ // Do the computation for the initial number of batches, create the checkpoint file and quit
ssc = setupStreams[U, V](input, operation)
val output = runStreams[V](ssc, initialNumBatches, initialNumExpectedOutputs)
verifyOutput[V](output, expectedOutput.take(initialNumBatches), true)
@@ -193,8 +196,6 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
* Advances the manual clock on the streaming scheduler by the given number of batches.
* It also waits for the expected amount of time for each batch.
*/
-
-
def runStreamsWithRealDelay(ssc: StreamingContext, numBatches: Long) {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
logInfo("Manual clock before advancing = " + clock.time)
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 0957748603..3e99440226 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -16,24 +16,36 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+ val testPort = 9999
+ var testServer: TestServer = null
+ var testDir: File = null
+
override def checkpointDir = "checkpoint"
after {
FileUtils.deleteDirectory(new File(checkpointDir))
+ if (testServer != null) {
+ testServer.stop()
+ testServer = null
+ }
+ if (testDir != null && testDir.exists()) {
+ FileUtils.deleteDirectory(testDir)
+ testDir = null
+ }
}
test("network input stream") {
// Start the server
- val serverPort = 9999
- val server = new TestServer(9999)
- server.start()
+ testServer = new TestServer(testPort)
+ testServer.start()
// Set up the streaming context and input streams
val ssc = new StreamingContext(master, framework)
ssc.setBatchDuration(batchDuration)
- val networkStream = ssc.networkTextStream("localhost", serverPort, StorageLevel.MEMORY_AND_DISK)
+ val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
+ def output = outputBuffer.flatMap(x => x)
ssc.registerOutputStream(outputStream)
ssc.start()
@@ -41,21 +53,15 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
val expectedOutput = input.map(_.toString)
+ Thread.sleep(1000)
for (i <- 0 until input.size) {
- server.send(input(i).toString + "\n")
+ testServer.send(input(i).toString + "\n")
Thread.sleep(500)
clock.addToTime(batchDuration.milliseconds)
}
- val startTime = System.currentTimeMillis()
- while (outputBuffer.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
- logInfo("output.size = " + outputBuffer.size + ", expectedOutput.size = " + expectedOutput.size)
- Thread.sleep(100)
- }
Thread.sleep(1000)
- val timeTaken = System.currentTimeMillis() - startTime
- assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
logInfo("Stopping server")
- server.stop()
+ testServer.stop()
logInfo("Stopping context")
ssc.stop()
@@ -69,24 +75,24 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("--------------------------------")
- assert(outputBuffer.size === expectedOutput.size)
- for (i <- 0 until outputBuffer.size) {
- assert(outputBuffer(i).size === 1)
- assert(outputBuffer(i).head === expectedOutput(i))
+ // Verify whether all the elements received are as expected
+ // (whether the elements were received one in each interval is not verified)
+ assert(output.size === expectedOutput.size)
+ for (i <- 0 until output.size) {
+ assert(output(i) === expectedOutput(i))
}
}
test("network input stream with checkpoint") {
// Start the server
- val serverPort = 9999
- val server = new TestServer(9999)
- server.start()
+ testServer = new TestServer(testPort)
+ testServer.start()
// Set up the streaming context and input streams
var ssc = new StreamingContext(master, framework)
ssc.setBatchDuration(batchDuration)
ssc.checkpoint(checkpointDir, checkpointInterval)
- val networkStream = ssc.networkTextStream("localhost", serverPort, StorageLevel.MEMORY_AND_DISK)
+ val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
var outputStream = new TestOutputStream(networkStream, new ArrayBuffer[Seq[String]])
ssc.registerOutputStream(outputStream)
ssc.start()
@@ -94,7 +100,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Feed data to the server to send to the network receiver
var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
for (i <- Seq(1, 2, 3)) {
- server.send(i.toString + "\n")
+ testServer.send(i.toString + "\n")
Thread.sleep(100)
clock.addToTime(batchDuration.milliseconds)
}
@@ -109,7 +115,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
ssc.start()
clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
for (i <- Seq(4, 5, 6)) {
- server.send(i.toString + "\n")
+ testServer.send(i.toString + "\n")
Thread.sleep(100)
clock.addToTime(batchDuration.milliseconds)
}
@@ -120,12 +126,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
test("file input stream") {
+
// Create a temporary directory
- val dir = {
+ testDir = {
var temp = File.createTempFile(".temp.", Random.nextInt().toString)
temp.delete()
temp.mkdirs()
- temp.deleteOnExit()
logInfo("Created temp dir " + temp)
temp
}
@@ -133,10 +139,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Set up the streaming context and input streams
val ssc = new StreamingContext(master, framework)
ssc.setBatchDuration(batchDuration)
- val filestream = ssc.textFileStream(dir.toString)
+ val filestream = ssc.textFileStream(testDir.toString)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
def output = outputBuffer.flatMap(x => x)
-
val outputStream = new TestOutputStream(filestream, outputBuffer)
ssc.registerOutputStream(outputStream)
ssc.start()
@@ -147,16 +152,16 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val expectedOutput = input.map(_.toString)
Thread.sleep(1000)
for (i <- 0 until input.size) {
- FileUtils.writeStringToFile(new File(dir, i.toString), input(i).toString + "\n")
- Thread.sleep(100)
+ FileUtils.writeStringToFile(new File(testDir, i.toString), input(i).toString + "\n")
+ Thread.sleep(500)
clock.addToTime(batchDuration.milliseconds)
- Thread.sleep(100)
+ //Thread.sleep(100)
}
val startTime = System.currentTimeMillis()
- while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
- //println("output.size = " + output.size + ", expectedOutput.size = " + expectedOutput.size)
+ /*while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+ logInfo("output.size = " + output.size + ", expectedOutput.size = " + expectedOutput.size)
Thread.sleep(100)
- }
+ }*/
Thread.sleep(1000)
val timeTaken = System.currentTimeMillis() - startTime
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
@@ -165,14 +170,16 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Verify whether data received by Spark Streaming was as expected
logInfo("--------------------------------")
- logInfo("output.size = " + output.size)
+ logInfo("output.size = " + outputBuffer.size)
logInfo("output")
- output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("expected output.size = " + expectedOutput.size)
logInfo("expected output")
expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("--------------------------------")
+ // Verify whether all the elements received are as expected
+ // (whether the elements were received one in each interval is not verified)
assert(output.size === expectedOutput.size)
for (i <- 0 until output.size) {
assert(output(i).size === 1)
@@ -182,12 +189,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("file input stream with checkpoint") {
// Create a temporary directory
- val dir = {
+ testDir = {
var temp = File.createTempFile(".temp.", Random.nextInt().toString)
temp.delete()
temp.mkdirs()
- temp.deleteOnExit()
- println("Created temp dir " + temp)
+ logInfo("Created temp dir " + temp)
temp
}
@@ -195,7 +201,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
var ssc = new StreamingContext(master, framework)
ssc.setBatchDuration(batchDuration)
ssc.checkpoint(checkpointDir, checkpointInterval)
- val filestream = ssc.textFileStream(dir.toString)
+ val filestream = ssc.textFileStream(testDir.toString)
var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]])
ssc.registerOutputStream(outputStream)
ssc.start()
@@ -204,7 +210,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
Thread.sleep(1000)
for (i <- Seq(1, 2, 3)) {
- FileUtils.writeStringToFile(new File(dir, i.toString), i.toString + "\n")
+ FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
Thread.sleep(100)
clock.addToTime(batchDuration.milliseconds)
}
@@ -221,7 +227,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
Thread.sleep(500)
for (i <- Seq(4, 5, 6)) {
- FileUtils.writeStringToFile(new File(dir, i.toString), i.toString + "\n")
+ FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
Thread.sleep(100)
clock.addToTime(batchDuration.milliseconds)
}
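
All of these input-stream tests follow the same ManualClock pattern: the clock implementation is swapped in via a system property, and each piece of input is followed by a short real sleep (so the receiver or file scanner can pick it up) plus an explicit clock advance that fires the next batch deterministically. Condensed from the loops above; TestServer, ssc, and batchDuration come from the suite itself:

    // Make batch boundaries deterministic instead of wall-clock driven.
    System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")

    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    for (i <- Seq(1, 2, 3)) {
      testServer.send(i.toString + "\n")            // or FileUtils.writeStringToFile(...) for file streams
      Thread.sleep(500)                             // give the receiver time to pick the item up
      clock.addToTime(batchDuration.milliseconds)   // trigger the next batch
    }
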