diff options
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala | 17 |
1 files changed, 10 insertions, 7 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala index 54813934b8..6a45bc2f8a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala @@ -47,7 +47,8 @@ object MasterFailureTest extends Logging { def main(args: Array[String]) { if (args.size < 2) { println( - "Usage: MasterFailureTest <local/HDFS directory> <# batches> [<batch size in milliseconds>]") + "Usage: MasterFailureTest <local/HDFS directory> <# batches> " + + "[<batch size in milliseconds>]") System.exit(1) } val directory = args(0) @@ -186,7 +187,8 @@ object MasterFailureTest extends Logging { // Setup the streaming computation with the given operation System.clearProperty("spark.driver.port") - val ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map()) + val ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, + Map()) ssc.checkpoint(checkpointDir.toString) val inputStream = ssc.textFileStream(testDir.toString) val operatedStream = operation(inputStream) @@ -287,7 +289,7 @@ object MasterFailureTest extends Logging { private def verifyOutput[T: ClassTag](output: Seq[T], expectedOutput: Seq[T]) { // Verify whether expected outputs do not consecutive batches with same output for (i <- 0 until expectedOutput.size - 1) { - assert(expectedOutput(i) != expectedOutput(i+1), + assert(expectedOutput(i) != expectedOutput(i + 1), "Expected output has consecutive duplicate sequence of values") } @@ -384,9 +386,9 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) Thread.sleep(5000) // To make sure that all the streaming context has been set up for (i <- 0 until input.size) { // Write the data to a local file and then move it to the target test directory - val localFile = new File(localTestDir, (i+1).toString) - val hadoopFile = new Path(testDir, (i+1).toString) - val tempHadoopFile = new Path(testDir, ".tmp_" + (i+1).toString) + val localFile = new File(localTestDir, (i + 1).toString) + val hadoopFile = new Path(testDir, (i + 1).toString) + val tempHadoopFile = new Path(testDir, ".tmp_" + (i + 1).toString) FileUtils.writeStringToFile(localFile, input(i).toString + "\n") var tries = 0 var done = false @@ -400,7 +402,8 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) } catch { case ioe: IOException => { fs = testDir.getFileSystem(new Configuration()) - logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe) + logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", + ioe) } } } |