From 930b70f0523e96fe01c1317ef7fad1b76b36d4d9 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Thu, 10 Apr 2014 15:04:13 -0700 Subject: Remove Unnecessary Whitespace's stack these together in a commit else they show up chunk by chunk in different commits. Author: Sandeep Closes #380 from techaddict/white_space and squashes the following commits: b58f294 [Sandeep] Remove Unnecessary Whitespace's --- .../org/apache/spark/streaming/Checkpoint.scala | 14 ++++++------ .../org/apache/spark/streaming/Interval.scala | 8 +++---- .../scala/org/apache/spark/streaming/Time.scala | 4 ++-- .../streaming/dstream/DStreamCheckpointData.scala | 2 +- .../spark/streaming/dstream/FileInputDStream.scala | 2 +- .../streaming/dstream/QueueInputDStream.scala | 8 +++---- .../spark/streaming/receivers/ActorReceiver.scala | 2 +- .../org/apache/spark/streaming/util/Clock.scala | 26 +++++++++++----------- .../spark/streaming/util/RawTextHelper.scala | 18 +++++++-------- .../spark/streaming/util/RecurringTimer.scala | 8 +++---- .../org/apache/spark/streaming/JavaAPISuite.java | 2 +- 11 files changed, 47 insertions(+), 47 deletions(-) (limited to 'streaming/src') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 93023e8dce..ac56ff709c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -59,7 +59,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) } } -private[streaming] +private[streaming] object Checkpoint extends Logging { val PREFIX = "checkpoint-" val REGEX = (PREFIX + """([\d]+)([\w\.]*)""").r @@ -79,7 +79,7 @@ object Checkpoint extends Logging { def sortFunc(path1: Path, path2: Path): Boolean = { val (time1, bk1) = path1.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) } val (time2, bk2) = path2.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) } - (time1 < time2) || (time1 == time2 && bk1) + (time1 < time2) || (time1 == time2 && bk1) } val path = new Path(checkpointDir) @@ -95,7 +95,7 @@ object Checkpoint extends Logging { } } else { logInfo("Checkpoint directory " + path + " does not exist") - Seq.empty + Seq.empty } } } @@ -160,7 +160,7 @@ class CheckpointWriter( }) } - // All done, print success + // All done, print success val finishTime = System.currentTimeMillis() logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + checkpointFile + "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms") @@ -227,14 +227,14 @@ object CheckpointReader extends Logging { { val checkpointPath = new Path(checkpointDir) def fs = checkpointPath.getFileSystem(hadoopConf) - - // Try to find the checkpoint files + + // Try to find the checkpoint files val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs).reverse if (checkpointFiles.isEmpty) { return None } - // Try to read the checkpoint files in the order + // Try to read the checkpoint files in the order logInfo("Checkpoint files found: " + checkpointFiles.mkString(",")) val compressionCodec = CompressionCodec.createCodec(conf) checkpointFiles.foreach(file => { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala index 16479a0127..ad4f3fdd14 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala @@ -20,11 +20,11 @@ package org.apache.spark.streaming private[streaming] class Interval(val beginTime: Time, val endTime: Time) { def this(beginMs: Long, endMs: Long) = this(new Time(beginMs), new Time(endMs)) - + def duration(): Duration = endTime - beginTime def + (time: Duration): Interval = { - new Interval(beginTime + time, endTime + time) + new Interval(beginTime + time, endTime + time) } def - (time: Duration): Interval = { @@ -40,9 +40,9 @@ class Interval(val beginTime: Time, val endTime: Time) { } def <= (that: Interval) = (this < that || this == that) - + def > (that: Interval) = !(this <= that) - + def >= (that: Interval) = !(this < that) override def toString = "[" + beginTime + ", " + endTime + "]" diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala index 2678334f53..6a6b00a778 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala @@ -32,7 +32,7 @@ case class Time(private val millis: Long) { def <= (that: Time): Boolean = (this.millis <= that.millis) def > (that: Time): Boolean = (this.millis > that.millis) - + def >= (that: Time): Boolean = (this.millis >= that.millis) def + (that: Duration): Time = new Time(millis + that.milliseconds) @@ -43,7 +43,7 @@ case class Time(private val millis: Long) { def floor(that: Duration): Time = { val t = that.milliseconds - val m = math.floor(this.millis / t).toLong + val m = math.floor(this.millis / t).toLong new Time(m * t) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala index 903e3f3c9b..f33c0ceafd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala @@ -51,7 +51,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) .map(x => (x._1, x._2.getCheckpointFile.get)) logDebug("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n")) - // Add the checkpoint files to the data to be serialized + // Add the checkpoint files to the data to be serialized if (!checkpointFiles.isEmpty) { currentCheckpointFiles.clear() currentCheckpointFiles ++= checkpointFiles diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 8a6051622e..e878285f6a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -232,7 +232,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas } logDebug("Accepted " + path) } catch { - case fnfe: java.io.FileNotFoundException => + case fnfe: java.io.FileNotFoundException => logWarning("Error finding new files", fnfe) reset() return false diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala index 97325f8ea3..6376cff78b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala @@ -31,11 +31,11 @@ class QueueInputDStream[T: ClassTag]( oneAtATime: Boolean, defaultRDD: RDD[T] ) extends InputDStream[T](ssc) { - + override def start() { } - + override def stop() { } - + override def compute(validTime: Time): Option[RDD[T]] = { val buffer = new ArrayBuffer[RDD[T]]() if (oneAtATime && queue.size > 0) { @@ -55,5 +55,5 @@ class QueueInputDStream[T: ClassTag]( None } } - + } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala index 44eb2750c6..f5984d03c5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala @@ -47,7 +47,7 @@ object ReceiverSupervisorStrategy { * the API for pushing received data into Spark Streaming for being processed. * * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html - * + * * @example {{{ * class MyActor extends Actor with Receiver{ * def receive { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala index c5ef2cc8c3..39145a3ab0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala @@ -19,34 +19,34 @@ package org.apache.spark.streaming.util private[streaming] trait Clock { - def currentTime(): Long + def currentTime(): Long def waitTillTime(targetTime: Long): Long } private[streaming] class SystemClock() extends Clock { - + val minPollTime = 25L - + def currentTime(): Long = { System.currentTimeMillis() - } - + } + def waitTillTime(targetTime: Long): Long = { var currentTime = 0L currentTime = System.currentTimeMillis() - + var waitTime = targetTime - currentTime if (waitTime <= 0) { return currentTime } - + val pollTime = { if (waitTime / 10.0 > minPollTime) { (waitTime / 10.0).toLong } else { - minPollTime - } + minPollTime + } } while (true) { @@ -55,7 +55,7 @@ class SystemClock() extends Clock { if (waitTime <= 0) { return currentTime } - val sleepTime = + val sleepTime = if (waitTime < pollTime) { waitTime } else { @@ -69,7 +69,7 @@ class SystemClock() extends Clock { private[streaming] class ManualClock() extends Clock { - + var time = 0L def currentTime() = time @@ -85,13 +85,13 @@ class ManualClock() extends Clock { this.synchronized { time += timeToAdd this.notifyAll() - } + } } def waitTillTime(targetTime: Long): Long = { this.synchronized { while (time < targetTime) { this.wait(100) - } + } } currentTime() } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala index 07021ebb58..bd1df55cf7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala @@ -25,8 +25,8 @@ import scala.collection.JavaConversions.mapAsScalaMap private[streaming] object RawTextHelper { - /** - * Splits lines and counts the words in them using specialized object-to-long hashmap + /** + * Splits lines and counts the words in them using specialized object-to-long hashmap * (to avoid boxing-unboxing overhead of Long in java/scala HashMap) */ def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = { @@ -55,13 +55,13 @@ object RawTextHelper { map.toIterator.map{case (k, v) => (k, v)} } - /** + /** * Gets the top k words in terms of word counts. Assumes that each word exists only once * in the `data` iterator (that is, the counts have been reduced). */ def topK(data: Iterator[(String, Long)], k: Int): Iterator[(String, Long)] = { val taken = new Array[(String, Long)](k) - + var i = 0 var len = 0 var done = false @@ -93,7 +93,7 @@ object RawTextHelper { } taken.toIterator } - + /** * Warms up the SparkContext in master and slave by running tasks to force JIT kick in * before real workload starts. @@ -106,11 +106,11 @@ object RawTextHelper { .count() } } - - def add(v1: Long, v2: Long) = (v1 + v2) - def subtract(v1: Long, v2: Long) = (v1 - v2) + def add(v1: Long, v2: Long) = (v1 + v2) + + def subtract(v1: Long, v2: Long) = (v1 - v2) - def max(v1: Long, v2: Long) = math.max(v1, v2) + def max(v1: Long, v2: Long) = math.max(v1, v2) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala index f71938ac55..e016377c94 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala @@ -22,10 +22,10 @@ import org.apache.spark.Logging private[streaming] class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String) extends Logging { - + private val thread = new Thread("RecurringTimer - " + name) { setDaemon(true) - override def run() { loop } + override def run() { loop } } @volatile private var prevTime = -1L @@ -104,11 +104,11 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: private[streaming] object RecurringTimer { - + def main(args: Array[String]) { var lastRecurTime = 0L val period = 1000 - + def onRecur(time: Long) { val currentTime = System.currentTimeMillis() println("" + currentTime + ": " + (currentTime - lastRecurTime)) diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 13fa64894b..a0b1bbc34f 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -1673,7 +1673,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa @Test public void testSocketString() { - + class Converter implements Function> { public Iterable call(InputStream in) throws IOException { BufferedReader reader = new BufferedReader(new InputStreamReader(in)); -- cgit v1.2.3