aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorJacek Laskowski <jacek@japila.pl>2016-01-23 12:14:16 -0800
committerReynold Xin <rxin@databricks.com>2016-01-23 12:14:16 -0800
commitcfdcef70ddd25484f1cb1791e529210d602c2283 (patch)
tree3b8bc3a8d8781b1eefb901c078af7683d53e0ad4 /streaming
parent423783a08bb8730852973aca19603e444d15040d (diff)
downloadspark-cfdcef70ddd25484f1cb1791e529210d602c2283.tar.gz
spark-cfdcef70ddd25484f1cb1791e529210d602c2283.tar.bz2
spark-cfdcef70ddd25484f1cb1791e529210d602c2283.zip
[STREAMING][MINOR] Scaladoc + logs
Found while doing code review Author: Jacek Laskowski <jacek@japila.pl> Closes #10878 from jaceklaskowski/streaming-scaladoc-logs-tiny-fixes.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala5
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala2
4 files changed, 5 insertions, 6 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala b/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
index f1114c1e5a..66f646d7dc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
@@ -30,9 +30,8 @@ import org.apache.spark.util.ClosureCleaner
* `mapWithState` operation of a
* [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair DStream]] (Scala) or a
* [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] (Java).
- * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or
- * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create instances of
- * this class.
+ * Use [[org.apache.spark.streaming.StateSpec.function() StateSpec.function]] factory methods
+ * to create instances of this class.
*
* Example in Scala:
* {{{
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala
index 36ff9c7e61..ed08191f41 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala
@@ -90,7 +90,7 @@ private[streaming] class MapWithStateDStreamImpl[
}
/**
- * A DStream that allows per-key state to be maintains, and arbitrary records to be generated
+ * A DStream that allows per-key state to be maintained, and arbitrary records to be generated
* based on updates to the state. This is the main DStream that implements the `mapWithState`
* operation on DStreams.
*
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 60b5c838e9..5f1c671c3c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -166,7 +166,7 @@ private[streaming] class ReceivedBlockTracker(
def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
- logInfo("Deleting batches " + timesToCleanup)
+ logInfo(s"Deleting batches: ${timesToCleanup.mkString(" ")}")
if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
timeToAllocatedBlocks --= timesToCleanup
writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 4908be0536..cacd430cf3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -91,7 +91,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = synchronized {
val batchUIData = BatchUIData(batchStarted.batchInfo)
- runningBatchUIData(batchStarted.batchInfo.batchTime) = BatchUIData(batchStarted.batchInfo)
+ runningBatchUIData(batchStarted.batchInfo.batchTime) = batchUIData
waitingBatchUIData.remove(batchStarted.batchInfo.batchTime)
totalReceivedRecords += batchUIData.numRecords