author     Prashant Sharma <prashant.s@imaginea.com>  2014-03-28 00:21:49 -0700
committer  Patrick Wendell <pwendell@gmail.com>       2014-03-28 00:21:49 -0700
commit     60abc252545ec7a5d59957a32e764cd18f6c16b4 (patch)
tree       b15a68100607a429149327355f680882e5a6baa3 /streaming/src
parent     632c322036b123c6f72e0c8b87d50e08bec3a1ab (diff)
SPARK-1096, a space after comment start style checker.
Author: Prashant Sharma <prashant.s@imaginea.com>

Closes #124 from ScrapCodes/SPARK-1096/scalastyle-comment-check and squashes the following commits:

214135a [Prashant Sharma] Review feedback.
5eba88c [Prashant Sharma] Fixed style checks for ///+ comments.
e54b2f8 [Prashant Sharma] Improved message, work around.
83e7144 [Prashant Sharma] Removed dependency on scalastyle in plugin, since the scalastyle sbt plugin already depends on the right version. In case we update the plugin, we will have to adjust our spark-style project to depend on the right scalastyle version.
810a1d6 [Prashant Sharma] SPARK-1096, a space after comment style checker.
ba33193 [Prashant Sharma] Scala style as a project.
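For context, the checker this commit introduces enforces a single space after a comment opener ("// foo" rather than "//foo"), while exempting banner runs such as "////////" (the ///+ case fixed in 5eba88c). Below is a minimal sketch of how such a rule can be written against the scalastyle/scalariform checker API; the class name, error key, and regexes here are illustrative, not the exact code merged in #124.

import org.scalastyle.{PositionError, ScalariformChecker, ScalastyleError}
import scalariform.lexer.{SingleLineComment, Token}
import scalariform.parser.CompilationUnit

class SpaceAfterCommentStartChecker extends ScalariformChecker {
  val errorKey: String = "space.after.comment.start"

  // Flag "//foo" but allow "// foo", bare "//", and banner runs like "///////".
  private def missingSpace(comment: Token): Boolean = {
    val text = comment.text.trim
    text.matches("""//\S+.*""") && !text.matches("""///+""")
  }

  override def verify(ast: CompilationUnit): List[ScalastyleError] = {
    for {
      token <- ast.tokens.toList
      // Comments are attached to the following token as hidden tokens.
      hidden = token.associatedWhitespaceAndComments
      if hidden != null
      comment <- hidden.comments.collect { case c: SingleLineComment => c.token }
      if missingSpace(comment)
    } yield PositionError(comment.offset)
  }
}

Once registered as a <check class="..."/> entry in scalastyle-config.xml, a rule like this fails the style check on comments such as the ones corrected in the hunks below.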
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala                  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala   3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala          5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala 2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala               2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala             3
7 files changed, 10 insertions, 11 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index fde46705d8..d3339063cc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -153,7 +153,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
def validate() {
this.synchronized {
assert(batchDuration != null, "Batch duration has not been set")
- //assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration +
+ // assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration +
// " is very low")
assert(getOutputStreams().size > 0, "No output streams registered, so nothing to execute")
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index 0dc6704603..72ad0bae75 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -128,7 +128,6 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
} catch {
case ie: InterruptedException =>
logInfo("Receiving thread interrupted")
- //println("Receiving thread interrupted")
case e: Exception =>
stopOnError(e)
}
@@ -142,7 +141,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
def stop() {
receivingThread.interrupt()
onStop()
- //TODO: terminate the actor
+ // TODO: terminate the actor
}
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index ca0a8ae478..b334d68bf9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -78,7 +78,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
override def checkpoint(interval: Duration): DStream[(K, V)] = {
super.checkpoint(interval)
- //reducedStream.checkpoint(interval)
+ // reducedStream.checkpoint(interval)
this
}
@@ -128,7 +128,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
// Cogroup the reduced RDDs and merge the reduced values
val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]],
partitioner)
- //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _
+ // val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _
val numOldValues = oldRDDs.size
val numNewValues = newRDDs.size
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index 9d8889b655..5f7d3ba26c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -64,7 +64,6 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
}
val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
- //logDebug("Generating state RDD for time " + validTime)
Some(stateRDD)
}
case None => { // If parent RDD does not exist
@@ -97,11 +96,11 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
val groupedRDD = parentRDD.groupByKey(partitioner)
val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning)
- //logDebug("Generating state RDD for time " + validTime + " (first)")
+ // logDebug("Generating state RDD for time " + validTime + " (first)")
Some(sessionRDD)
}
case None => { // If parent RDD does not exist, then nothing to do!
- //logDebug("Not generating state RDD (no previous state, no parent)")
+ // logDebug("Not generating state RDD (no previous state, no parent)")
None
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
index e4fa163f2e..cad68e248a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
@@ -126,7 +126,7 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
receiverInfo -= streamId
logError("De-registered receiver for network stream " + streamId
+ " with message " + msg)
- //TODO: Do something about the corresponding NetworkInputDStream
+ // TODO: Do something about the corresponding NetworkInputDStream
}
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 0784e562ac..25739956cb 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -252,7 +252,7 @@ class CheckpointSuite extends TestSuiteBase {
ssc.start()
// Create files and advance manual clock to process them
- //var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
Thread.sleep(1000)
for (i <- Seq(1, 2, 3)) {
Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 74e73ebb34..7df206241b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -154,7 +154,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor",
- StorageLevel.MEMORY_AND_DISK) //Had to pass the local value of port to prevent from closing over entire scope
+ // Had to pass the local value of port to prevent from closing over entire scope
+ StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
def output = outputBuffer.flatMap(x => x)