Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala | 17
1 file changed, 10 insertions(+), 7 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index 54813934b8..6a45bc2f8a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -47,7 +47,8 @@ object MasterFailureTest extends Logging {
   def main(args: Array[String]) {
     if (args.size < 2) {
       println(
-        "Usage: MasterFailureTest <local/HDFS directory> <# batches> [<batch size in milliseconds>]")
+        "Usage: MasterFailureTest <local/HDFS directory> <# batches> " +
+          "[<batch size in milliseconds>]")
       System.exit(1)
     }
     val directory = args(0)
@@ -186,7 +187,8 @@ object MasterFailureTest extends Logging {
     // Setup the streaming computation with the given operation
     System.clearProperty("spark.driver.port")
-    val ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map())
+    val ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil,
+      Map())
     ssc.checkpoint(checkpointDir.toString)
     val inputStream = ssc.textFileStream(testDir.toString)
     val operatedStream = operation(inputStream)
@@ -287,7 +289,7 @@ object MasterFailureTest extends Logging {
   private def verifyOutput[T: ClassTag](output: Seq[T], expectedOutput: Seq[T]) {
     // Verify that the expected output does not have consecutive batches with the same output
     for (i <- 0 until expectedOutput.size - 1) {
-      assert(expectedOutput(i) != expectedOutput(i+1),
+      assert(expectedOutput(i) != expectedOutput(i + 1),
         "Expected output has consecutive duplicate sequence of values")
     }
 
@@ -384,9 +386,9 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
       Thread.sleep(5000) // To make sure that the streaming context has been fully set up
       for (i <- 0 until input.size) {
         // Write the data to a local file and then move it to the target test directory
-        val localFile = new File(localTestDir, (i+1).toString)
-        val hadoopFile = new Path(testDir, (i+1).toString)
-        val tempHadoopFile = new Path(testDir, ".tmp_" + (i+1).toString)
+        val localFile = new File(localTestDir, (i + 1).toString)
+        val hadoopFile = new Path(testDir, (i + 1).toString)
+        val tempHadoopFile = new Path(testDir, ".tmp_" + (i + 1).toString)
         FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
         var tries = 0
         var done = false
@@ -400,7 +402,8 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
           } catch {
             case ioe: IOException => {
               fs = testDir.getFileSystem(new Configuration())
-              logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe)
+              logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.",
+                ioe)
             }
           }
         }
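
The last two hunks fall inside FileGeneratingThread's retry loop, which stages each batch in a hidden temporary file and then renames it into the test directory so that the file appears atomically to the text file stream. The diff elides the body of that loop, so the sketch below reconstructs the pattern from the visible lines only; the copyFromLocalFile/rename calls, the maxTries bound, and the stageFile wrapper are assumptions for illustration, not part of this change.

// Sketch of the stage-and-rename retry pattern around the changed lines.
// Only localFile/hadoopFile/tempHadoopFile, the tries/done flags, and the
// IOException handler are visible in the diff; the rest is assumed.
import java.io.{File, IOException}

import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object FileStagingSketch {
  def stageFile(localTestDir: File, testDir: Path, i: Int, data: String) {
    val localFile = new File(localTestDir, (i + 1).toString)
    val hadoopFile = new Path(testDir, (i + 1).toString)
    // Dot-prefixed names are ignored by Spark's file input stream, so the
    // file only becomes visible once the rename below completes.
    val tempHadoopFile = new Path(testDir, ".tmp_" + (i + 1).toString)
    FileUtils.writeStringToFile(localFile, data + "\n")

    var fs = testDir.getFileSystem(new Configuration())
    val maxTries = 3 // assumed retry bound; the diff only shows `tries`
    var tries = 0
    var done = false
    while (!done && tries < maxTries) {
      tries += 1
      try {
        // Assumed transfer steps: copy to the hidden temp path, then rename.
        fs.copyFromLocalFile(new Path(localFile.toString), tempHadoopFile)
        fs.rename(tempHadoopFile, hadoopFile)
        done = true
      } catch {
        case ioe: IOException =>
          // Re-acquire the FileSystem handle, as the hunk above does, in case
          // the old handle died with the restarted cluster.
          fs = testDir.getFileSystem(new Configuration())
          println("Attempt " + tries + " at generating file " + hadoopFile + " failed: " + ioe)
      }
    }
  }
}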