diff options
author | Patrick Wendell <pwendell@gmail.com> | 2014-02-09 10:09:19 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-02-09 10:09:19 -0800 |
commit | b69f8b2a01669851c656739b6886efe4cddef31a (patch) | |
tree | 9ed0b8a6883f0ae64ce1d3f5c206306731a5f93f /streaming/src | |
parent | b6dba10ae59215b5c4e40f7632563f592f138c87 (diff) | |
download | spark-b69f8b2a01669851c656739b6886efe4cddef31a.tar.gz spark-b69f8b2a01669851c656739b6886efe4cddef31a.tar.bz2 spark-b69f8b2a01669851c656739b6886efe4cddef31a.zip |
Merge pull request #557 from ScrapCodes/style. Closes #557.
SPARK-1058, Fix Style Errors and Add Scala Style to Spark Build.
Author: Patrick Wendell <pwendell@gmail.com>
Author: Prashant Sharma <scrapcodes@gmail.com>
== Merge branch commits ==
commit 1a8bd1c059b842cb95cc246aaea74a79fec684f4
Author: Prashant Sharma <scrapcodes@gmail.com>
Date: Sun Feb 9 17:39:07 2014 +0530
scala style fixes
commit f91709887a8e0b608c5c2b282db19b8a44d53a43
Author: Patrick Wendell <pwendell@gmail.com>
Date: Fri Jan 24 11:22:53 2014 -0800
Adding scalastyle snapshot
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala | 17 |
1 files changed, 10 insertions, 7 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala index 54813934b8..6a45bc2f8a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala @@ -47,7 +47,8 @@ object MasterFailureTest extends Logging { def main(args: Array[String]) { if (args.size < 2) { println( - "Usage: MasterFailureTest <local/HDFS directory> <# batches> [<batch size in milliseconds>]") + "Usage: MasterFailureTest <local/HDFS directory> <# batches> " + + "[<batch size in milliseconds>]") System.exit(1) } val directory = args(0) @@ -186,7 +187,8 @@ object MasterFailureTest extends Logging { // Setup the streaming computation with the given operation System.clearProperty("spark.driver.port") - val ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map()) + val ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, + Map()) ssc.checkpoint(checkpointDir.toString) val inputStream = ssc.textFileStream(testDir.toString) val operatedStream = operation(inputStream) @@ -287,7 +289,7 @@ object MasterFailureTest extends Logging { private def verifyOutput[T: ClassTag](output: Seq[T], expectedOutput: Seq[T]) { // Verify whether expected outputs do not consecutive batches with same output for (i <- 0 until expectedOutput.size - 1) { - assert(expectedOutput(i) != expectedOutput(i+1), + assert(expectedOutput(i) != expectedOutput(i + 1), "Expected output has consecutive duplicate sequence of values") } @@ -384,9 +386,9 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) Thread.sleep(5000) // To make sure that all the streaming context has been set up for (i <- 0 until input.size) { // Write the data to a local file and then move it to the target test directory - val localFile = new File(localTestDir, (i+1).toString) - val hadoopFile = new Path(testDir, (i+1).toString) - val tempHadoopFile = new Path(testDir, ".tmp_" + (i+1).toString) + val localFile = new File(localTestDir, (i + 1).toString) + val hadoopFile = new Path(testDir, (i + 1).toString) + val tempHadoopFile = new Path(testDir, ".tmp_" + (i + 1).toString) FileUtils.writeStringToFile(localFile, input(i).toString + "\n") var tries = 0 var done = false @@ -400,7 +402,8 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) } catch { case ioe: IOException => { fs = testDir.getFileSystem(new Configuration()) - logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe) + logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", + ioe) } } } |