Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala                2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala             2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala           4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala             4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala   1
5 files changed, 10 insertions(+), 3 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 192aa6a139..1da0b0a54d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -720,12 +720,14 @@ abstract class DStream[T: ClassTag] (
def foreachFunc: (RDD[T], Time) => Unit = {
(rdd: RDD[T], time: Time) => {
val firstNum = rdd.take(num + 1)
+ // scalastyle:off println
println("-------------------------------------------")
println("Time: " + time)
println("-------------------------------------------")
firstNum.take(num).foreach(println)
if (firstNum.length > num) println("...")
println()
+ // scalastyle:on println
}
}
new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
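
Note: the pattern applied here (and again in RawTextSender.scala and MasterFailureTest.scala below) is Scalastyle's inline suppression. A "// scalastyle:off println" comment disables the println check from that point forward and "// scalastyle:on println" re-enables it, so deliberate console output such as DStream.print() passes the style check while stray printlns elsewhere still fail the build. A minimal standalone sketch of the same pattern (the object and method names are illustrative, not from this diff):

    object UsageExample {
      def printBanner(msg: String): Unit = {
        // scalastyle:off println
        // Intentional console output; the suppression is scoped to these lines only.
        println("-------------------------------------------")
        println(msg)
        println("-------------------------------------------")
        // scalastyle:on println
      }
    }
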
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
index ca2f319f17..6addb96752 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
@@ -35,7 +35,9 @@ private[streaming]
object RawTextSender extends Logging {
def main(args: Array[String]) {
if (args.length != 4) {
+ // scalastyle:off println
System.err.println("Usage: RawTextSender <port> <file> <blockSize> <bytesPerSec>")
+ // scalastyle:on println
System.exit(1)
}
// Parse the arguments using a pattern match
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
index c8eef833eb..dd32ad5ad8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
@@ -106,7 +106,7 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
}
private[streaming]
-object RecurringTimer {
+object RecurringTimer extends Logging {
def main(args: Array[String]) {
var lastRecurTime = 0L
@@ -114,7 +114,7 @@ object RecurringTimer {
def onRecur(time: Long) {
val currentTime = System.currentTimeMillis()
- println("" + currentTime + ": " + (currentTime - lastRecurTime))
+ logInfo("" + currentTime + ": " + (currentTime - lastRecurTime))
lastRecurTime = currentTime
}
val timer = new RecurringTimer(new SystemClock(), period, onRecur, "Test")
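
Note: unlike the files above, RecurringTimer does not suppress the check; it replaces the raw println with logInfo by mixing in Spark's Logging trait, so the test harness reports through the configured logger instead of stdout. A rough sketch of the shape of such a trait and its use, assuming an SLF4J backend as Spark uses (trait body simplified, not Spark's actual implementation):

    import org.slf4j.LoggerFactory

    trait Logging {
      // Lazily create a logger named after the concrete class, dropping
      // the "$" that Scala appends to object class names.
      private lazy val log = LoggerFactory.getLogger(getClass.getName.stripSuffix("$"))
      protected def logInfo(msg: => String): Unit = if (log.isInfoEnabled) log.info(msg)
    }

    object TimerHarness extends Logging {
      def onRecur(time: Long): Unit = logInfo("recurred at " + time)
    }
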
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index e0f14fd954..6e9d443109 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -43,6 +43,7 @@ object MasterFailureTest extends Logging {
@volatile var setupCalled = false
def main(args: Array[String]) {
+ // scalastyle:off println
if (args.size < 2) {
println(
"Usage: MasterFailureTest <local/HDFS directory> <# batches> " +
@@ -60,6 +61,7 @@ object MasterFailureTest extends Logging {
testUpdateStateByKey(directory, numBatches, batchDuration)
println("\n\nSUCCESS\n\n")
+ // scalastyle:on println
}
def testMap(directory: String, numBatches: Int, batchDuration: Duration) {
@@ -291,10 +293,12 @@ object MasterFailureTest extends Logging {
}
// Log the output
+ // scalastyle:off println
println("Expected output, size = " + expectedOutput.size)
println(expectedOutput.mkString("[", ",", "]"))
println("Output, size = " + output.size)
println(output.mkString("[", ",", "]"))
+ // scalastyle:on println
// Match the output with the expected output
output.foreach(o =>
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
index 7865b06c2e..a2dbae149f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
@@ -76,7 +76,6 @@ class JobGeneratorSuite extends TestSuiteBase {
if (time.milliseconds == longBatchTime) {
while (waitLatch.getCount() > 0) {
waitLatch.await()
- println("Await over")
}
}
})
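
Note: the JobGeneratorSuite change only drops a leftover debugging println from the latch wait loop; the loop itself is the standard java.util.concurrent.CountDownLatch pattern, where await() blocks until countDown() calls bring the count to zero. A self-contained sketch of that pattern (names and timings are illustrative):

    import java.util.concurrent.CountDownLatch

    object LatchExample {
      def main(args: Array[String]): Unit = {
        val waitLatch = new CountDownLatch(1)
        val worker = new Thread(new Runnable {
          def run(): Unit = {
            Thread.sleep(100)     // simulate work
            waitLatch.countDown() // release any waiters
          }
        })
        worker.start()
        // Blocks until the count reaches zero; the guard mirrors the
        // test's loop and exits once the latch has been released.
        while (waitLatch.getCount() > 0) {
          waitLatch.await()
        }
      }
    }
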