about summary refs log tree commit diff
path: root/streaming
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2014-02-09 10:09:19 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-02-09 10:09:19 -0800
commitb69f8b2a01669851c656739b6886efe4cddef31a (patch)
tree9ed0b8a6883f0ae64ce1d3f5c206306731a5f93f /streaming
parentb6dba10ae59215b5c4e40f7632563f592f138c87 (diff)
downloadspark-b69f8b2a01669851c656739b6886efe4cddef31a.tar.gz
spark-b69f8b2a01669851c656739b6886efe4cddef31a.tar.bz2
spark-b69f8b2a01669851c656739b6886efe4cddef31a.zip
Merge pull request #557 from ScrapCodes/style. Closes #557.
SPARK-1058, Fix Style Errors and Add Scala Style to Spark Build. Author: Patrick Wendell <pwendell@gmail.com> Author: Prashant Sharma <scrapcodes@gmail.com> == Merge branch commits == commit 1a8bd1c059b842cb95cc246aaea74a79fec684f4 Author: Prashant Sharma <scrapcodes@gmail.com> Date: Sun Feb 9 17:39:07 2014 +0530 scala style fixes commit f91709887a8e0b608c5c2b282db19b8a44d53a43 Author: Patrick Wendell <pwendell@gmail.com> Date: Fri Jan 24 11:22:53 2014 -0800 Adding scalastyle snapshot
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala17
1 files changed, 10 insertions, 7 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index 54813934b8..6a45bc2f8a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -47,7 +47,8 @@ object MasterFailureTest extends Logging {
def main(args: Array[String]) {
if (args.size < 2) {
println(
- "Usage: MasterFailureTest <local/HDFS directory> <# batches> [<batch size in milliseconds>]")
+ "Usage: MasterFailureTest <local/HDFS directory> <# batches> " +
+ "[<batch size in milliseconds>]")
System.exit(1)
}
val directory = args(0)
@@ -186,7 +187,8 @@ object MasterFailureTest extends Logging {
// Setup the streaming computation with the given operation
System.clearProperty("spark.driver.port")
- val ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map())
+ val ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil,
+ Map())
ssc.checkpoint(checkpointDir.toString)
val inputStream = ssc.textFileStream(testDir.toString)
val operatedStream = operation(inputStream)
@@ -287,7 +289,7 @@ object MasterFailureTest extends Logging {
private def verifyOutput[T: ClassTag](output: Seq[T], expectedOutput: Seq[T]) {
// Verify whether expected outputs do not consecutive batches with same output
for (i <- 0 until expectedOutput.size - 1) {
- assert(expectedOutput(i) != expectedOutput(i+1),
+ assert(expectedOutput(i) != expectedOutput(i + 1),
"Expected output has consecutive duplicate sequence of values")
}
@@ -384,9 +386,9 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
Thread.sleep(5000) // To make sure that all the streaming context has been set up
for (i <- 0 until input.size) {
// Write the data to a local file and then move it to the target test directory
- val localFile = new File(localTestDir, (i+1).toString)
- val hadoopFile = new Path(testDir, (i+1).toString)
- val tempHadoopFile = new Path(testDir, ".tmp_" + (i+1).toString)
+ val localFile = new File(localTestDir, (i + 1).toString)
+ val hadoopFile = new Path(testDir, (i + 1).toString)
+ val tempHadoopFile = new Path(testDir, ".tmp_" + (i + 1).toString)
FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
var tries = 0
var done = false
@@ -400,7 +402,8 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
} catch {
case ioe: IOException => {
fs = testDir.getFileSystem(new Configuration())
- logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe)
+ logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.",
+ ioe)
}
}
}