diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2014-03-28 00:21:49 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-03-28 00:21:49 -0700 |
commit | 60abc252545ec7a5d59957a32e764cd18f6c16b4 (patch) | |
tree | b15a68100607a429149327355f680882e5a6baa3 /streaming | |
parent | 632c322036b123c6f72e0c8b87d50e08bec3a1ab (diff) | |
download | spark-60abc252545ec7a5d59957a32e764cd18f6c16b4.tar.gz spark-60abc252545ec7a5d59957a32e764cd18f6c16b4.tar.bz2 spark-60abc252545ec7a5d59957a32e764cd18f6c16b4.zip |
SPARK-1096, a space after comment start style checker.
Author: Prashant Sharma <prashant.s@imaginea.com>
Closes #124 from ScrapCodes/SPARK-1096/scalastyle-comment-check and squashes the following commits:
214135a [Prashant Sharma] Review feedback.
5eba88c [Prashant Sharma] Fixed style checks for ///+ comments.
e54b2f8 [Prashant Sharma] improved message, work around.
83e7144 [Prashant Sharma] removed dependency on scalastyle in plugin, since scalastyle sbt plugin already depends on the right version. Incase we update the plugin we will have to adjust our spark-style project to depend on right scalastyle version.
810a1d6 [Prashant Sharma] SPARK-1096, a space after comment style checker.
ba33193 [Prashant Sharma] scala style as a project
Diffstat (limited to 'streaming')
7 files changed, 10 insertions, 11 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index fde46705d8..d3339063cc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -153,7 +153,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { def validate() { this.synchronized { assert(batchDuration != null, "Batch duration has not been set") - //assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration + + // assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration + // " is very low") assert(getOutputStreams().size > 0, "No output streams registered, so nothing to execute") } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala index 0dc6704603..72ad0bae75 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala @@ -128,7 +128,6 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging } catch { case ie: InterruptedException => logInfo("Receiving thread interrupted") - //println("Receiving thread interrupted") case e: Exception => stopOnError(e) } @@ -142,7 +141,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging def stop() { receivingThread.interrupt() onStop() - //TODO: terminate the actor + // TODO: terminate the actor } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala index ca0a8ae478..b334d68bf9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala @@ -78,7 +78,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag]( override def checkpoint(interval: Duration): DStream[(K, V)] = { super.checkpoint(interval) - //reducedStream.checkpoint(interval) + // reducedStream.checkpoint(interval) this } @@ -128,7 +128,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag]( // Cogroup the reduced RDDs and merge the reduced values val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]], partitioner) - //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _ + // val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _ val numOldValues = oldRDDs.size val numNewValues = newRDDs.size diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala index 9d8889b655..5f7d3ba26c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala @@ -64,7 +64,6 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( } val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner) val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning) - //logDebug("Generating state RDD for time " + validTime) Some(stateRDD) } case None => { // If parent RDD does not exist @@ -97,11 +96,11 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( val groupedRDD = parentRDD.groupByKey(partitioner) val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning) - //logDebug("Generating state RDD for time " + validTime + " (first)") + // logDebug("Generating state RDD for time " + validTime + " (first)") Some(sessionRDD) } case None => { // If parent RDD does not exist, then nothing to do! - //logDebug("Not generating state RDD (no previous state, no parent)") + // logDebug("Not generating state RDD (no previous state, no parent)") None } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala index e4fa163f2e..cad68e248a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala @@ -126,7 +126,7 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { receiverInfo -= streamId logError("De-registered receiver for network stream " + streamId + " with message " + msg) - //TODO: Do something about the corresponding NetworkInputDStream + // TODO: Do something about the corresponding NetworkInputDStream } } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 0784e562ac..25739956cb 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -252,7 +252,7 @@ class CheckpointSuite extends TestSuiteBase { ssc.start() // Create files and advance manual clock to process them - //var clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock] Thread.sleep(1000) for (i <- Seq(1, 2, 3)) { Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8")) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 74e73ebb34..7df206241b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -154,7 +154,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Set up the streaming context and input streams val ssc = new StreamingContext(conf, batchDuration) val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor", - StorageLevel.MEMORY_AND_DISK) //Had to pass the local value of port to prevent from closing over entire scope + // Had to pass the local value of port to prevent from closing over entire scope + StorageLevel.MEMORY_AND_DISK) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] val outputStream = new TestOutputStream(networkStream, outputBuffer) def output = outputBuffer.flatMap(x => x) |