author     Tathagata Das <tathagata.das1565@gmail.com>  2012-11-13 02:16:28 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2012-11-13 02:16:28 -0800
commit     8a25d530edfa3abcdbe2effcd6bfbe484ac40acb (patch)
tree       d6e65f4ecc2a74ccdb2941e4c271cf7a384aab6c /streaming
parent     564dd8c3f415746a68f05bde6ea2a0e7a7760b4c (diff)
Optimized checkpoint writing by reusing the FileSystem object. Fixed a bug in DStream's updating of checkpoint data, where the checkpointed RDDs, upon recovery, were not recognized as checkpointed RDDs and were therefore deleted from HDFS. Made InputStreamsSuite more robust to timing delays.
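The core of the optimization: the Hadoop FileSystem handle is obtained once and then reused for every checkpoint write, instead of being re-created (and closed) on each save. A minimal sketch of that pattern follows, with an illustrative class name; the real implementation is the CheckpointWriter added in Checkpoint.scala below.

    import java.io.ObjectOutputStream
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    // Sketch only: obtain the FileSystem once in the constructor and reuse it for
    // every write, rather than per checkpoint as the old Checkpoint.save() did.
    class ReusableCheckpointWriter(checkpointDir: String) {
      private val file = new Path(checkpointDir, "graph")
      private val conf = new Configuration()
      private val fs = file.getFileSystem(conf) // created once, reused for all writes

      def write(checkpoint: AnyRef): Unit = {
        val oos = new ObjectOutputStream(fs.create(file))
        try oos.writeObject(checkpoint) finally oos.close()
      }
    }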
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/Checkpoint.scala         | 73
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala            |  8
-rw-r--r--  streaming/src/main/scala/spark/streaming/Scheduler.scala          | 28
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala   | 10
-rw-r--r--  streaming/src/test/resources/log4j.properties                     |  2
-rw-r--r--  streaming/src/test/scala/spark/streaming/CheckpointSuite.scala    | 25
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala  | 88
7 files changed, 128 insertions, 106 deletions
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index a70fb8f73a..770f7b0cc0 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -5,7 +5,7 @@ import spark.{Logging, Utils}
import org.apache.hadoop.fs.{FileUtil, Path}
import org.apache.hadoop.conf.Configuration
-import java.io.{InputStream, ObjectStreamClass, ObjectInputStream, ObjectOutputStream}
+import java.io._
class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
@@ -18,8 +18,6 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
val checkpointDir = ssc.checkpointDir
val checkpointInterval = ssc.checkpointInterval
- validate()
-
def validate() {
assert(master != null, "Checkpoint.master is null")
assert(framework != null, "Checkpoint.framework is null")
@@ -27,35 +25,50 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
assert(checkpointTime != null, "Checkpoint.checkpointTime is null")
logInfo("Checkpoint for time " + checkpointTime + " validated")
}
+}
- def save(path: String) {
- val file = new Path(path, "graph")
- val conf = new Configuration()
- val fs = file.getFileSystem(conf)
- logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
- if (fs.exists(file)) {
- val bkFile = new Path(file.getParent, file.getName + ".bk")
- FileUtil.copy(fs, file, fs, bkFile, true, true, conf)
- logDebug("Moved existing checkpoint file to " + bkFile)
+/**
+ * Convenience class to speed up writing the graph checkpoint to a file
+ */
+class CheckpointWriter(checkpointDir: String) extends Logging {
+ val file = new Path(checkpointDir, "graph")
+ val conf = new Configuration()
+ var fs = file.getFileSystem(conf)
+ val maxAttempts = 3
+
+ def write(checkpoint: Checkpoint) {
+ // TODO: maybe do this in a different thread from the main stream execution thread
+ var attempts = 0
+ while (attempts < maxAttempts) {
+ attempts += 1
+ try {
+ logDebug("Saving checkpoint for time " + checkpoint.checkpointTime + " to file '" + file + "'")
+ if (fs.exists(file)) {
+ val bkFile = new Path(file.getParent, file.getName + ".bk")
+ FileUtil.copy(fs, file, fs, bkFile, true, true, conf)
+ logDebug("Moved existing checkpoint file to " + bkFile)
+ }
+ val fos = fs.create(file)
+ val oos = new ObjectOutputStream(fos)
+ oos.writeObject(checkpoint)
+ oos.close()
+ logInfo("Checkpoint for time " + checkpoint.checkpointTime + " saved to file '" + file + "'")
+ fos.close()
+ return
+ } catch {
+ case ioe: IOException =>
+ logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe)
+ }
}
- val fos = fs.create(file)
- val oos = new ObjectOutputStream(fos)
- oos.writeObject(this)
- oos.close()
- fs.close()
- logInfo("Checkpoint of streaming context for time " + checkpointTime + " saved successfully to file '" + file + "'")
- }
-
- def toBytes(): Array[Byte] = {
- val bytes = Utils.serialize(this)
- bytes
+ logError("Could not write checkpoint for time " + checkpoint.checkpointTime + " to file '" + file + "'")
}
}
-object Checkpoint extends Logging {
- def load(path: String): Checkpoint = {
+object CheckpointReader extends Logging {
+
+ def read(path: String): Checkpoint = {
val fs = new Path(path).getFileSystem(new Configuration())
val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk"))
@@ -82,17 +95,11 @@ object Checkpoint extends Logging {
logError("Error loading checkpoint from file '" + file + "'", e)
}
} else {
- logWarning("Could not load checkpoint from file '" + file + "' as it does not exist")
+ logWarning("Could not read checkpoint from file '" + file + "' as it does not exist")
}
})
- throw new Exception("Could not load checkpoint from path '" + path + "'")
- }
-
- def fromBytes(bytes: Array[Byte]): Checkpoint = {
- val cp = Utils.deserialize[Checkpoint](bytes)
- cp.validate()
- cp
+ throw new Exception("Could not read checkpoint from path '" + path + "'")
}
}
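With checkpoint persistence split into a long-lived CheckpointWriter and a one-shot CheckpointReader, the calling side looks roughly like the sketch below. This is a usage sketch inferred from the signatures in the diff above; the checkpoint directory path and the ssc/time values are placeholders, not code from the Spark tree.

    // One writer per StreamingContext, reused on every checkpoint interval.
    val writer = new CheckpointWriter("/tmp/streaming-checkpoint")  // placeholder path
    writer.write(new Checkpoint(ssc, time))  // retried up to maxAttempts on IOException

    // On restart, recover the serialized graph from the same directory.
    val recovered: Checkpoint = CheckpointReader.read("/tmp/streaming-checkpoint")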
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index abf132e45e..7e6f73dd7d 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -289,6 +289,7 @@ extends Serializable with Logging {
*/
protected[streaming] def updateCheckpointData(currentTime: Time) {
logInfo("Updating checkpoint data for time " + currentTime)
+
// Get the checkpointed RDDs from the generated RDDs
val newCheckpointData = generatedRDDs.filter(_._2.getCheckpointData() != null)
.map(x => (x._1, x._2.getCheckpointData()))
@@ -334,8 +335,11 @@ extends Serializable with Logging {
logInfo("Restoring checkpoint data from " + checkpointData.size + " checkpointed RDDs")
checkpointData.foreach {
case(time, data) => {
- logInfo("Restoring checkpointed RDD for time " + time + " from file")
- generatedRDDs += ((time, ssc.sc.objectFile[T](data.toString)))
+ logInfo("Restoring checkpointed RDD for time " + time + " from file '" + data.toString + "'")
+ val rdd = ssc.sc.objectFile[T](data.toString)
+ // Set the checkpoint file name so that updateCheckpointData() identifies this RDD as a checkpointed RDD
+ rdd.checkpointFile = data.toString
+ generatedRDDs += ((time, rdd))
}
}
dependencies.foreach(_.restoreCheckpointData())
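Why setting rdd.checkpointFile matters: updateCheckpointData() keeps only the RDDs whose checkpoint data is non-null, and checkpoint files that drop out of that set are treated as stale and removed from HDFS. The sketch below paraphrases that interaction; the field and method names follow the diff, while the cleanup behaviour is summarized from the commit message rather than copied from the Spark source.

    // Simplified sketch of the recovery path and the bookkeeping it must satisfy.
    // Without the checkpointFile assignment, getCheckpointData() returns null for
    // the recovered RDD, so its file drops out of newCheckpointData and is later
    // deleted from HDFS as if it were stale.
    val rdd = ssc.sc.objectFile[T](checkpointFileName)  // checkpointFileName: placeholder
    rdd.checkpointFile = checkpointFileName              // mark as already checkpointed
    generatedRDDs += ((time, rdd))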
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index de0fb1f3ad..e2dca91179 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -16,8 +16,16 @@ extends Logging {
initLogging()
val graph = ssc.graph
+
val concurrentJobs = System.getProperty("spark.stream.concurrentJobs", "1").toInt
val jobManager = new JobManager(ssc, concurrentJobs)
+
+ val checkpointWriter = if (ssc.checkpointInterval != null && ssc.checkpointDir != null) {
+ new CheckpointWriter(ssc.checkpointDir)
+ } else {
+ null
+ }
+
val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock")
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
val timer = new RecurringTimer(clock, ssc.graph.batchDuration, generateRDDs(_))
@@ -52,19 +60,23 @@ extends Logging {
logInfo("Scheduler stopped")
}
- def generateRDDs(time: Time) {
+ private def generateRDDs(time: Time) {
SparkEnv.set(ssc.env)
logInfo("\n-----------------------------------------------------\n")
- graph.generateRDDs(time).foreach(submitJob)
- logInfo("Generated RDDs for time " + time)
+ graph.generateRDDs(time).foreach(jobManager.runJob)
graph.forgetOldRDDs(time)
- if (ssc.checkpointInterval != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointInterval)) {
- ssc.doCheckpoint(time)
- }
+ doCheckpoint(time)
+ logInfo("Generated RDDs for time " + time)
}
- def submitJob(job: Job) {
- jobManager.runJob(job)
+ private def doCheckpoint(time: Time) {
+ if (ssc.checkpointInterval != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointInterval)) {
+ val startTime = System.currentTimeMillis()
+ ssc.graph.updateCheckpointData(time)
+ checkpointWriter.write(new Checkpoint(ssc, time))
+ val stopTime = System.currentTimeMillis()
+ logInfo("Checkpointing the graph took " + (stopTime - startTime) + " ms")
+ }
}
}
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index ab6d6e8dea..ef6a05a392 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -28,7 +28,7 @@ final class StreamingContext (
def this(master: String, frameworkName: String, sparkHome: String = null, jars: Seq[String] = Nil) =
this(new SparkContext(master, frameworkName, sparkHome, jars), null)
- def this(path: String) = this(null, Checkpoint.load(path))
+ def this(path: String) = this(null, CheckpointReader.read(path))
def this(cp_ : Checkpoint) = this(null, cp_)
@@ -225,14 +225,6 @@ final class StreamingContext (
case e: Exception => logWarning("Error while stopping", e)
}
}
-
- def doCheckpoint(currentTime: Time) {
- val startTime = System.currentTimeMillis()
- graph.updateCheckpointData(currentTime)
- new Checkpoint(this, currentTime).save(checkpointDir)
- val stopTime = System.currentTimeMillis()
- logInfo("Checkpointing the graph took " + (stopTime - startTime) + " ms")
- }
}
diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties
index 02fe16866e..33774b463d 100644
--- a/streaming/src/test/resources/log4j.properties
+++ b/streaming/src/test/resources/log4j.properties
@@ -1,5 +1,5 @@
# Set everything to be logged to the console
-log4j.rootCategory=WARN, console
+log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index 0ad57e38b9..b3afedf39f 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -24,7 +24,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
override def framework = "CheckpointSuite"
- override def batchDuration = Milliseconds(200)
+ override def batchDuration = Milliseconds(500)
override def checkpointDir = "checkpoint"
@@ -34,7 +34,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
test("basic stream+rdd recovery") {
- assert(batchDuration === Milliseconds(200), "batchDuration for this test must be 1 second")
+ assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 500 milliseconds")
assert(checkpointInterval === batchDuration, "checkpointInterval for this test must be the same as batchDuration")
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
@@ -134,9 +134,9 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
val operation = (st: DStream[String]) => {
st.map(x => (x, 1))
.reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration)
- .checkpoint(Seconds(2))
+ .checkpoint(batchDuration * 2)
}
- testCheckpointedOperation(input, operation, output, 3)
+ testCheckpointedOperation(input, operation, output, 7)
}
test("updateStateByKey") {
@@ -148,14 +148,18 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
}
st.map(x => (x, 1))
.updateStateByKey[RichInt](updateFunc)
- .checkpoint(Seconds(2))
+ .checkpoint(batchDuration * 2)
.map(t => (t._1, t._2.self))
}
- testCheckpointedOperation(input, operation, output, 3)
+ testCheckpointedOperation(input, operation, output, 7)
}
-
-
+ /**
+ * Tests a streaming operation under checkpointing, by restarting the operation
+ * from the checkpoint file and verifying whether the final output is correct.
+ * The output is assumed to have come from a reliable queue which can replay
+ * data as required.
+ */
def testCheckpointedOperation[U: ClassManifest, V: ClassManifest](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
@@ -170,8 +174,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
val initialNumExpectedOutputs = initialNumBatches
val nextNumExpectedOutputs = expectedOutput.size - initialNumExpectedOutputs
- // Do half the computation (half the number of batches), create checkpoint file and quit
-
+ // Do the computation for initial number of batches, create checkpoint file and quit
ssc = setupStreams[U, V](input, operation)
val output = runStreams[V](ssc, initialNumBatches, initialNumExpectedOutputs)
verifyOutput[V](output, expectedOutput.take(initialNumBatches), true)
@@ -193,8 +196,6 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
* Advances the manual clock on the streaming scheduler by a given number of batches.
* It also waits for the expected amount of time for each batch.
*/
-
-
def runStreamsWithRealDelay(ssc: StreamingContext, numBatches: Long) {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
logInfo("Manual clock before advancing = " + clock.time)
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 0957748603..3e99440226 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -16,24 +16,36 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+ val testPort = 9999
+ var testServer: TestServer = null
+ var testDir: File = null
+
override def checkpointDir = "checkpoint"
after {
FileUtils.deleteDirectory(new File(checkpointDir))
+ if (testServer != null) {
+ testServer.stop()
+ testServer = null
+ }
+ if (testDir != null && testDir.exists()) {
+ FileUtils.deleteDirectory(testDir)
+ testDir = null
+ }
}
test("network input stream") {
// Start the server
- val serverPort = 9999
- val server = new TestServer(9999)
- server.start()
+ testServer = new TestServer(testPort)
+ testServer.start()
// Set up the streaming context and input streams
val ssc = new StreamingContext(master, framework)
ssc.setBatchDuration(batchDuration)
- val networkStream = ssc.networkTextStream("localhost", serverPort, StorageLevel.MEMORY_AND_DISK)
+ val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
+ def output = outputBuffer.flatMap(x => x)
ssc.registerOutputStream(outputStream)
ssc.start()
@@ -41,21 +53,15 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
val expectedOutput = input.map(_.toString)
+ Thread.sleep(1000)
for (i <- 0 until input.size) {
- server.send(input(i).toString + "\n")
+ testServer.send(input(i).toString + "\n")
Thread.sleep(500)
clock.addToTime(batchDuration.milliseconds)
}
- val startTime = System.currentTimeMillis()
- while (outputBuffer.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
- logInfo("output.size = " + outputBuffer.size + ", expectedOutput.size = " + expectedOutput.size)
- Thread.sleep(100)
- }
Thread.sleep(1000)
- val timeTaken = System.currentTimeMillis() - startTime
- assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
logInfo("Stopping server")
- server.stop()
+ testServer.stop()
logInfo("Stopping context")
ssc.stop()
@@ -69,24 +75,24 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("--------------------------------")
- assert(outputBuffer.size === expectedOutput.size)
- for (i <- 0 until outputBuffer.size) {
- assert(outputBuffer(i).size === 1)
- assert(outputBuffer(i).head === expectedOutput(i))
+ // Verify whether all the elements received are as expected
+ // (whether the elements were received one in each interval is not verified)
+ assert(output.size === expectedOutput.size)
+ for (i <- 0 until output.size) {
+ assert(output(i) === expectedOutput(i))
}
}
test("network input stream with checkpoint") {
// Start the server
- val serverPort = 9999
- val server = new TestServer(9999)
- server.start()
+ testServer = new TestServer(testPort)
+ testServer.start()
// Set up the streaming context and input streams
var ssc = new StreamingContext(master, framework)
ssc.setBatchDuration(batchDuration)
ssc.checkpoint(checkpointDir, checkpointInterval)
- val networkStream = ssc.networkTextStream("localhost", serverPort, StorageLevel.MEMORY_AND_DISK)
+ val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
var outputStream = new TestOutputStream(networkStream, new ArrayBuffer[Seq[String]])
ssc.registerOutputStream(outputStream)
ssc.start()
@@ -94,7 +100,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Feed data to the server to send to the network receiver
var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
for (i <- Seq(1, 2, 3)) {
- server.send(i.toString + "\n")
+ testServer.send(i.toString + "\n")
Thread.sleep(100)
clock.addToTime(batchDuration.milliseconds)
}
@@ -109,7 +115,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
ssc.start()
clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
for (i <- Seq(4, 5, 6)) {
- server.send(i.toString + "\n")
+ testServer.send(i.toString + "\n")
Thread.sleep(100)
clock.addToTime(batchDuration.milliseconds)
}
@@ -120,12 +126,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
test("file input stream") {
+
// Create a temporary directory
- val dir = {
+ testDir = {
var temp = File.createTempFile(".temp.", Random.nextInt().toString)
temp.delete()
temp.mkdirs()
- temp.deleteOnExit()
logInfo("Created temp dir " + temp)
temp
}
@@ -133,10 +139,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Set up the streaming context and input streams
val ssc = new StreamingContext(master, framework)
ssc.setBatchDuration(batchDuration)
- val filestream = ssc.textFileStream(dir.toString)
+ val filestream = ssc.textFileStream(testDir.toString)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
def output = outputBuffer.flatMap(x => x)
-
val outputStream = new TestOutputStream(filestream, outputBuffer)
ssc.registerOutputStream(outputStream)
ssc.start()
@@ -147,16 +152,16 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val expectedOutput = input.map(_.toString)
Thread.sleep(1000)
for (i <- 0 until input.size) {
- FileUtils.writeStringToFile(new File(dir, i.toString), input(i).toString + "\n")
- Thread.sleep(100)
+ FileUtils.writeStringToFile(new File(testDir, i.toString), input(i).toString + "\n")
+ Thread.sleep(500)
clock.addToTime(batchDuration.milliseconds)
- Thread.sleep(100)
+ //Thread.sleep(100)
}
val startTime = System.currentTimeMillis()
- while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
- //println("output.size = " + output.size + ", expectedOutput.size = " + expectedOutput.size)
+ /*while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+ logInfo("output.size = " + output.size + ", expectedOutput.size = " + expectedOutput.size)
Thread.sleep(100)
- }
+ }*/
Thread.sleep(1000)
val timeTaken = System.currentTimeMillis() - startTime
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
@@ -165,14 +170,16 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Verify whether data received by Spark Streaming was as expected
logInfo("--------------------------------")
- logInfo("output.size = " + output.size)
+ logInfo("output.size = " + outputBuffer.size)
logInfo("output")
- output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("expected output.size = " + expectedOutput.size)
logInfo("expected output")
expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("--------------------------------")
+ // Verify whether all the elements received are as expected
+ // (whether the elements were received one in each interval is not verified)
assert(output.size === expectedOutput.size)
for (i <- 0 until output.size) {
assert(output(i).size === 1)
@@ -182,12 +189,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("file input stream with checkpoint") {
// Create a temporary directory
- val dir = {
+ testDir = {
var temp = File.createTempFile(".temp.", Random.nextInt().toString)
temp.delete()
temp.mkdirs()
- temp.deleteOnExit()
- println("Created temp dir " + temp)
+ logInfo("Created temp dir " + temp)
temp
}
@@ -195,7 +201,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
var ssc = new StreamingContext(master, framework)
ssc.setBatchDuration(batchDuration)
ssc.checkpoint(checkpointDir, checkpointInterval)
- val filestream = ssc.textFileStream(dir.toString)
+ val filestream = ssc.textFileStream(testDir.toString)
var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]])
ssc.registerOutputStream(outputStream)
ssc.start()
@@ -204,7 +210,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
Thread.sleep(1000)
for (i <- Seq(1, 2, 3)) {
- FileUtils.writeStringToFile(new File(dir, i.toString), i.toString + "\n")
+ FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
Thread.sleep(100)
clock.addToTime(batchDuration.milliseconds)
}
@@ -221,7 +227,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
Thread.sleep(500)
for (i <- Seq(4, 5, 6)) {
- FileUtils.writeStringToFile(new File(dir, i.toString), i.toString + "\n")
+ FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
Thread.sleep(100)
clock.addToTime(batchDuration.milliseconds)
}