Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala                    | 14
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Interval.scala                      |  8
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Time.scala                          |  4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala |  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala      |  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala     |  8
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala       |  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala                    | 26
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala            | 18
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala           |  8
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java                    |  2
11 files changed, 47 insertions(+), 47 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 93023e8dce..ac56ff709c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -59,7 +59,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
}
}
-private[streaming]
+private[streaming]
object Checkpoint extends Logging {
val PREFIX = "checkpoint-"
val REGEX = (PREFIX + """([\d]+)([\w\.]*)""").r
@@ -79,7 +79,7 @@ object Checkpoint extends Logging {
def sortFunc(path1: Path, path2: Path): Boolean = {
val (time1, bk1) = path1.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
val (time2, bk2) = path2.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
- (time1 < time2) || (time1 == time2 && bk1)
+ (time1 < time2) || (time1 == time2 && bk1)
}
val path = new Path(checkpointDir)
@@ -95,7 +95,7 @@ object Checkpoint extends Logging {
}
} else {
logInfo("Checkpoint directory " + path + " does not exist")
- Seq.empty
+ Seq.empty
}
}
}
@@ -160,7 +160,7 @@ class CheckpointWriter(
})
}
- // All done, print success
+ // All done, print success
val finishTime = System.currentTimeMillis()
logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + checkpointFile +
"', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms")
@@ -227,14 +227,14 @@ object CheckpointReader extends Logging {
{
val checkpointPath = new Path(checkpointDir)
def fs = checkpointPath.getFileSystem(hadoopConf)
-
- // Try to find the checkpoint files
+
+ // Try to find the checkpoint files
val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs).reverse
if (checkpointFiles.isEmpty) {
return None
}
- // Try to read the checkpoint files in the order
+ // Try to read the checkpoint files in the order
logInfo("Checkpoint files found: " + checkpointFiles.mkString(","))
val compressionCodec = CompressionCodec.createCodec(conf)
checkpointFiles.foreach(file => {
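The sortFunc change above is whitespace-only, but the ordering logic it touches is worth seeing in isolation: checkpoint files are sorted by the timestamp embedded in their name, and at equal timestamps the backup file (non-empty suffix, e.g. ".bk") sorts first. A minimal standalone sketch of that ordering, using plain strings in place of Hadoop Path objects:

// Standalone sketch of the checkpoint-file ordering shown in the hunk above.
object CheckpointSortSketch {
  val PREFIX = "checkpoint-"
  val REGEX = (PREFIX + """([\d]+)([\w\.]*)""").r

  // true => name1 sorts before name2: earlier timestamp first,
  // and at equal timestamps the backup (suffixed) file first
  def sortFunc(name1: String, name2: String): Boolean = {
    val (time1, bk1) = name1 match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
    val (time2, bk2) = name2 match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
    (time1 < time2) || (time1 == time2 && bk1)
  }

  def main(args: Array[String]): Unit = {
    val files = Seq("checkpoint-2000", "checkpoint-1000.bk", "checkpoint-1000")
    // Prints: List(checkpoint-1000.bk, checkpoint-1000, checkpoint-2000)
    println(files.sortWith(sortFunc))
  }
}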
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
index 16479a0127..ad4f3fdd14 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
@@ -20,11 +20,11 @@ package org.apache.spark.streaming
private[streaming]
class Interval(val beginTime: Time, val endTime: Time) {
def this(beginMs: Long, endMs: Long) = this(new Time(beginMs), new Time(endMs))
-
+
def duration(): Duration = endTime - beginTime
def + (time: Duration): Interval = {
- new Interval(beginTime + time, endTime + time)
+ new Interval(beginTime + time, endTime + time)
}
def - (time: Duration): Interval = {
@@ -40,9 +40,9 @@ class Interval(val beginTime: Time, val endTime: Time) {
}
def <= (that: Interval) = (this < that || this == that)
-
+
def > (that: Interval) = !(this <= that)
-
+
def >= (that: Interval) = !(this < that)
override def toString = "[" + beginTime + ", " + endTime + "]"
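For context on the operators whose surrounding whitespace is trimmed here: an Interval is just a [beginTime, endTime] pair, and `+ (time: Duration)` shifts the whole window forward. A hedged sketch, with Long millisecond values standing in for Spark's Time and Duration types:

// Illustrative sketch only; not Spark's Interval API.
object IntervalSketch {
  case class Interval(beginMs: Long, endMs: Long) {
    // shift the whole window forward, as Interval's `+` does
    def +(durationMs: Long): Interval = Interval(beginMs + durationMs, endMs + durationMs)
    def duration: Long = endMs - beginMs
  }

  def main(args: Array[String]): Unit = {
    val batch = Interval(0L, 1000L)
    println(batch + 1000L)  // Interval(1000,2000): the next batch window
    println(batch.duration) // 1000
  }
}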
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
index 2678334f53..6a6b00a778 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
@@ -32,7 +32,7 @@ case class Time(private val millis: Long) {
def <= (that: Time): Boolean = (this.millis <= that.millis)
def > (that: Time): Boolean = (this.millis > that.millis)
-
+
def >= (that: Time): Boolean = (this.millis >= that.millis)
def + (that: Duration): Time = new Time(millis + that.milliseconds)
@@ -43,7 +43,7 @@ case class Time(private val millis: Long) {
def floor(that: Duration): Time = {
val t = that.milliseconds
- val m = math.floor(this.millis / t).toLong
+ val m = math.floor(this.millis / t).toLong
new Time(m * t)
}
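The floor hunk is again whitespace-only, but note what the line computes: since this.millis / t is Long integer division, the math.floor call is effectively a no-op for non-negative times, and the expression rounds a timestamp down to the nearest batch boundary. A minimal illustration:

// Sketch of the Time.floor behavior touched above.
object TimeFloorSketch {
  // round a timestamp down to the nearest multiple of a batch duration;
  // millis / durationMs is already integer division for Longs
  def floor(millis: Long, durationMs: Long): Long =
    (millis / durationMs) * durationMs

  def main(args: Array[String]): Unit = {
    println(floor(2500L, 1000L)) // 2000
    println(floor(3000L, 1000L)) // 3000
  }
}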
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 903e3f3c9b..f33c0ceafd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -51,7 +51,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
.map(x => (x._1, x._2.getCheckpointFile.get))
logDebug("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n"))
- // Add the checkpoint files to the data to be serialized
+ // Add the checkpoint files to the data to be serialized
if (!checkpointFiles.isEmpty) {
currentCheckpointFiles.clear()
currentCheckpointFiles ++= checkpointFiles
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 8a6051622e..e878285f6a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -232,7 +232,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
}
logDebug("Accepted " + path)
} catch {
- case fnfe: java.io.FileNotFoundException =>
+ case fnfe: java.io.FileNotFoundException =>
logWarning("Error finding new files", fnfe)
reset()
return false
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index 97325f8ea3..6376cff78b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -31,11 +31,11 @@ class QueueInputDStream[T: ClassTag](
oneAtATime: Boolean,
defaultRDD: RDD[T]
) extends InputDStream[T](ssc) {
-
+
override def start() { }
-
+
override def stop() { }
-
+
override def compute(validTime: Time): Option[RDD[T]] = {
val buffer = new ArrayBuffer[RDD[T]]()
if (oneAtATime && queue.size > 0) {
@@ -55,5 +55,5 @@ class QueueInputDStream[T: ClassTag](
None
}
}
-
+
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
index 44eb2750c6..f5984d03c5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
@@ -47,7 +47,7 @@ object ReceiverSupervisorStrategy {
* the API for pushing received data into Spark Streaming for being processed.
*
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- *
+ *
* @example {{{
* class MyActor extends Actor with Receiver{
* def receive {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
index c5ef2cc8c3..39145a3ab0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
@@ -19,34 +19,34 @@ package org.apache.spark.streaming.util
private[streaming]
trait Clock {
- def currentTime(): Long
+ def currentTime(): Long
def waitTillTime(targetTime: Long): Long
}
private[streaming]
class SystemClock() extends Clock {
-
+
val minPollTime = 25L
-
+
def currentTime(): Long = {
System.currentTimeMillis()
- }
-
+ }
+
def waitTillTime(targetTime: Long): Long = {
var currentTime = 0L
currentTime = System.currentTimeMillis()
-
+
var waitTime = targetTime - currentTime
if (waitTime <= 0) {
return currentTime
}
-
+
val pollTime = {
if (waitTime / 10.0 > minPollTime) {
(waitTime / 10.0).toLong
} else {
- minPollTime
- }
+ minPollTime
+ }
}
while (true) {
@@ -55,7 +55,7 @@ class SystemClock() extends Clock {
if (waitTime <= 0) {
return currentTime
}
- val sleepTime =
+ val sleepTime =
if (waitTime < pollTime) {
waitTime
} else {
@@ -69,7 +69,7 @@ class SystemClock() extends Clock {
private[streaming]
class ManualClock() extends Clock {
-
+
var time = 0L
def currentTime() = time
@@ -85,13 +85,13 @@ class ManualClock() extends Clock {
this.synchronized {
time += timeToAdd
this.notifyAll()
- }
+ }
}
def waitTillTime(targetTime: Long): Long = {
this.synchronized {
while (time < targetTime) {
this.wait(100)
- }
+ }
}
currentTime()
}
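The Clock trait exists so that SystemClock can be swapped for ManualClock in tests: a thread blocked in waitTillTime is released deterministically by addToTime rather than by real elapsed time. A self-contained sketch (not Spark's own test code) of that interaction:

// Minimal sketch of the ManualClock wait/notify pattern shown above.
object ManualClockSketch {
  class ManualClock {
    var time = 0L
    def addToTime(timeToAdd: Long): Unit = this.synchronized {
      time += timeToAdd
      this.notifyAll() // wake any thread blocked in waitTillTime
    }
    def waitTillTime(targetTime: Long): Long = {
      this.synchronized {
        while (time < targetTime) this.wait(100)
      }
      time
    }
  }

  def main(args: Array[String]): Unit = {
    val clock = new ManualClock
    val waiter = new Thread(new Runnable {
      def run(): Unit = println("woke at " + clock.waitTillTime(500L))
    })
    waiter.start()
    Thread.sleep(50)      // give the waiter a chance to block
    clock.addToTime(500L) // advance the clock; releases the waiter
    waiter.join()
  }
}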
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
index 07021ebb58..bd1df55cf7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
@@ -25,8 +25,8 @@ import scala.collection.JavaConversions.mapAsScalaMap
private[streaming]
object RawTextHelper {
- /**
- * Splits lines and counts the words in them using specialized object-to-long hashmap
+ /**
+ * Splits lines and counts the words in them using specialized object-to-long hashmap
* (to avoid boxing-unboxing overhead of Long in java/scala HashMap)
*/
def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = {
@@ -55,13 +55,13 @@ object RawTextHelper {
map.toIterator.map{case (k, v) => (k, v)}
}
- /**
+ /**
* Gets the top k words in terms of word counts. Assumes that each word exists only once
* in the `data` iterator (that is, the counts have been reduced).
*/
def topK(data: Iterator[(String, Long)], k: Int): Iterator[(String, Long)] = {
val taken = new Array[(String, Long)](k)
-
+
var i = 0
var len = 0
var done = false
@@ -93,7 +93,7 @@ object RawTextHelper {
}
taken.toIterator
}
-
+
/**
* Warms up the SparkContext in master and slave by running tasks to force JIT kick in
* before real workload starts.
@@ -106,11 +106,11 @@ object RawTextHelper {
.count()
}
}
-
- def add(v1: Long, v2: Long) = (v1 + v2)
- def subtract(v1: Long, v2: Long) = (v1 - v2)
+ def add(v1: Long, v2: Long) = (v1 + v2)
+
+ def subtract(v1: Long, v2: Long) = (v1 - v2)
- def max(v1: Long, v2: Long) = math.max(v1, v2)
+ def max(v1: Long, v2: Long) = math.max(v1, v2)
}
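For reference, the topK method documented above selects the k largest word counts without sorting the full iterator; its hand-rolled loop avoids a full sort, which matters for large partitions. A hedged sketch of the same contract using the standard library instead:

// Same input/output contract as topK above, but via a plain sort;
// illustrative only, not the original's selection algorithm.
object TopKSketch {
  def topK(data: Iterator[(String, Long)], k: Int): Iterator[(String, Long)] =
    data.toSeq.sortBy(-_._2).take(k).iterator

  def main(args: Array[String]): Unit = {
    val counts = Iterator("spark" -> 10L, "streaming" -> 7L, "scala" -> 3L)
    println(topK(counts, 2).toList) // List((spark,10), (streaming,7))
  }
}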
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
index f71938ac55..e016377c94 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
@@ -22,10 +22,10 @@ import org.apache.spark.Logging
private[streaming]
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
extends Logging {
-
+
private val thread = new Thread("RecurringTimer - " + name) {
setDaemon(true)
- override def run() { loop }
+ override def run() { loop }
}
@volatile private var prevTime = -1L
@@ -104,11 +104,11 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
private[streaming]
object RecurringTimer {
-
+
def main(args: Array[String]) {
var lastRecurTime = 0L
val period = 1000
-
+
def onRecur(time: Long) {
val currentTime = System.currentTimeMillis()
println("" + currentTime + ": " + (currentTime - lastRecurTime))
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 13fa64894b..a0b1bbc34f 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -1673,7 +1673,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@Test
public void testSocketString() {
-
+
class Converter implements Function<InputStream, Iterable<String>> {
public Iterable<String> call(InputStream in) throws IOException {
BufferedReader reader = new BufferedReader(new InputStreamReader(in));